-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtt.py
More file actions
1361 lines (1182 loc) · 45.1 KB
/
tt.py
File metadata and controls
1361 lines (1182 loc) · 45.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
Tracking time against various projects from the CLI.
Exit codes:
- 0: Time successfully logged
- 1: Stopwatch running when 'start' command issued
- 2: Stopwatch not running when 'stop' command issued
- 3: Interruption attempted with ongoing interruption
- 4: Stopwatch resumption from interruption without ongoing stopwatch
- 5: Attempt to resume from an interruption without an open interruption
- 6: Attempt to log a finite interval where neither endpoint was specified
- 7: Attempt to log a finite interval where the start does not strictly precede the end
- 8: Unable to parse a given timespec
- 9: Attempt to edit a record with an ID that doesn't exist.
- 10: A hook failed to execute properly
- 11: A sortkey was specified for `tt ls`, but the key doesn't exist for one or more items in the log
- 13: An attempt was made to stop a tracked interval with an ongoing interruption.
- 127: A dryrun was specified.
"""
import os
import sys
import copy
import json
import time
import base64
from os.path import isdir, isfile # pylint: disable=C0412
from os.path import join as pathjoin # pylint: disable=C0412
from argparse import ArgumentParser
try:
import yaml
except ImportError:
pass
# Record fields that `tt ls` may sort on; validated by __record_sort_keys.
VALID_RECORD_SORT_KEYS = [
    "CommitTime", "StartTime", "EndTime", "Description", "ID", "Detail"
]
def __record_sort_keys(k):
    """Validate a `tt ls` sort key, returning it unchanged when legal.

    Prints to stderr and raises ValueError (for argparse) otherwise.
    """
    if k not in VALID_RECORD_SORT_KEYS:
        message = "Sort key must be one of: %s" % str(VALID_RECORD_SORT_KEYS)
        print(message, file=sys.stderr)
        raise ValueError(message)
    return k
def __dotdir():
    """Return the per-user dot-directory path holding the tt database."""
    if sys.platform == "win32":
        drive = os.environ.get("HOMEDRIVE", "")
        home = os.environ.get("HOMEPATH", "")
        return pathjoin(drive, home, ".litt")
    return pathjoin(os.environ.get("HOME", ""), ".litt")
def check_dotfile():
    """
    Check the dotfiles for an existing database structure.

    Returns True only when the dot-directory exists and contains both
    events.json and config.json; any filesystem error yields False.
    """
    try:
        # Hoist the repeated __dotdir() calls; a single boolean chain
        # replaces the nested ifs.
        dotdir = __dotdir()
        return (isdir(dotdir) and isfile("%s/events.json" % dotdir)
                and isfile("%s/config.json" % dotdir))
    except OSError:
        # Narrowed from a bare except: an unreadable dot-directory counts
        # as "not initialized"; anything else is a real bug and propagates.
        return False
def init_dotfiles():
    """
    Initialize the dotfiles for the first time.

    Creates the dot-directory (if missing) plus an empty events database
    and a default configuration file.
    """
    # exist_ok replaces the old try/except-pass around makedirs: an
    # already-existing directory is fine, but other OS errors (e.g.
    # permissions) now propagate instead of being silently swallowed only
    # to fail at open() below anyway.
    os.makedirs(__dotdir(), mode=0o755, exist_ok=True)
    with open("%s/events.json" % __dotdir(), "w") as ofp:
        ofp.write(
            json.dumps(
                {
                    "Stopwatch": None,
                    "Interruption": None,
                    "Aliases": dict(),
                    "Records": dict()
                },
                indent=2,
                sort_keys=True))
    with open("%s/config.json" % __dotdir(), "w") as ofp:
        ofp.write(
            json.dumps({"OutputFormat": "json"}, indent=2, sort_keys=True))
def load_hooks():
    """
    Scan the hook directories under the dot-directory and collect, for each
    hook event, the executable files suitable to run as hooks.
    """
    from os import listdir, access, X_OK
    hook_events = ("pre_load", "pre_commit", "post_commit",
                   "pre_config_write", "post_config_write")
    hooks = {event: [] for event in hook_events}
    for event, binaries in hooks.items():
        hook_dir = "%s/hooks/%s" % (__dotdir(), event)
        try:
            entries = listdir(hook_dir)
        except FileNotFoundError:
            # Missing hook directory: no hooks for this event.
            continue
        for entry in entries:
            full_path = "%s/%s" % (hook_dir, entry)
            if isfile(full_path) and access(full_path, X_OK):
                binaries.append(full_path)
        binaries.sort()
    return hooks
def run_hooks(hookevent, hooks, data):
    """
    Run all hooks for a given event name, passing the JSON serialized data
    into each hook on stdin.

    A hook exiting non-zero aborts the program with exit code 10 after
    echoing the hook's stdout/stderr.
    """
    import subprocess
    payload = json.dumps(data, sort_keys=True).encode("utf-8")
    for hookfile in hooks[hookevent]:
        proc = subprocess.Popen((hookfile, hookevent),
                                stdout=subprocess.PIPE,
                                stdin=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                cwd=__dotdir())
        # Feed the payload via communicate(input=...) instead of writing
        # to proc.stdin directly: a direct write can deadlock when the
        # hook fills its stdout/stderr pipes before draining stdin.
        stdout, stderr = proc.communicate(input=payload)
        if proc.returncode != 0:
            print("%s hook returned non-zero, aborting." % hookevent,
                  file=sys.stderr)
            print(stdout.decode("utf-8"))
            print(stderr.decode("utf-8"))
            sys.exit(10)
def __human_alias(alias):
    """Render an alias definition as human-readable text.

    Only the properties present on the alias (Description, Tags, Detail)
    are emitted, one per line.
    """
    parts = []
    if "Description" in alias:
        parts.append(" Description: %s\n" % alias["Description"])
    if "Tags" in alias:
        parts.append(" Tags: %s\n" % ",".join(alias["Tags"]))
    if "Detail" in alias:
        parts.append(" Details:\n %s\n" % alias["Detail"])
    return "".join(parts)
def __seconds_to_hhmmss(seconds):
    """Format a duration in seconds as a short human-readable string.

    Under 10s: two-decimal seconds; under 60s: whole seconds; otherwise
    hours plus zero-padded minutes.
    """
    if seconds < 10:
        return "%.2fs" % seconds
    if seconds < 60:
        return "%ds" % seconds
    return "%dh %02dm" % (seconds // 3600, (seconds % 3600) // 60)
def __human_record(record, context=None):
    """
    Render a time record as human-readable text.

    `context` maps record IDs to records; it is only needed to resolve the
    durations of interruption records referenced by this record.
    """
    ret = ""
    if "StartTime" in record:
        ret += "Record started at: %s\n" % __timestamp_to_iso(
            record["StartTime"])
    if "EndTime" in record:
        if record["EndTime"] is None:
            # An open record (stopwatch still running): report elapsed
            # time against "now" rather than an end timestamp.
            ret += "Recording is still ongoing.\n"
            ret += "Elapsed wall-clock time: %s\n" % __seconds_to_hhmmss(
                __parse_time("now") - record["StartTime"])
        else:
            ret += "Record ended at: %s\n" % __timestamp_to_iso(
                record["EndTime"])
            ret += "Elapsed wall-clock time: %s\n" % __seconds_to_hhmmss(
                record["EndTime"] - record["StartTime"])
            if "Interruptions" in record and record["Interruptions"] != []:
                ret += "Number of interruptions: %s\n" % len(
                    record["Interruptions"])
                # Interruptions are stored as separate records; resolve
                # each one through `context` to total their durations.
                interruption_duration = sum([
                    context[i["Id"]]["EndTime"] - context[i["Id"]]["StartTime"]
                    for i in record["Interruptions"]
                ])
                ret += "Total interruption duration: %s\n" % __seconds_to_hhmmss(
                    interruption_duration)
                # Activity time excludes the time spent interrupted.
                ret += "Activity duration: %s\n" % __seconds_to_hhmmss(
                    record["EndTime"] - record["StartTime"] -
                    interruption_duration)
    if "Description" in record:
        ret += "Description: %s\n" % record["Description"]
    if "Tags" in record:
        ret += "Tags: %s\n" % ",".join(record["Tags"])
    if "Detail" in record:
        ret += "Details:\n %s\n" % record["Detail"]
    if "StructuredData" in record:
        ret += "StructuredData:\n %s\n" % record["StructuredData"]
    return ret + "\n"
def __write_output(obj,
                   pargs,
                   config,
                   human_hint,
                   dict_as_entries=False,
                   outfile=sys.stdout):
    """
    Serialize `obj` to `outfile` in the selected output format.

    The format is pargs.output_format when given, otherwise
    config["OutputFormat"]; values seen here are json, json-compact, yaml,
    and human. `human_hint` tells the human formatter what it is printing
    ("ID", "Config", "Alias*", or "Record*"). When `dict_as_entries` is
    True, `obj` is a list of {"key": ..., "value": ...} entries standing
    in for a dict.
    """
    # Command-line format overrides the persisted configuration.
    fmt = config[
        "OutputFormat"] if pargs.output_format is None else pargs.output_format
    if fmt == "json":
        print(json.dumps(({i["key"]: i["value"]
                           for i in obj} if dict_as_entries else obj),
                         sort_keys=True,
                         indent=4),
              file=outfile)
    elif fmt == "json-compact":
        print(json.dumps(({i["key"]: i["value"]
                           for i in obj} if dict_as_entries else obj),
                         sort_keys=True),
              file=outfile)
    elif fmt == "yaml":
        # Requires PyYAML; the top-of-file import is guarded by
        # try/except ImportError, so this raises NameError if missing.
        print(yaml.dump(({i["key"]: i["value"]
                          for i in obj} if dict_as_entries else obj),
                        default_flow_style=False).strip(),
              file=outfile)
    elif fmt == "human":
        # For human readable output, we need to know what the human hint is.
        if human_hint == "ID":
            # It's just an ID string, so just print it out saying so.
            print("Committed Record ID: %s" % obj, file=outfile)
        elif human_hint == "Config":
            # It is the configuration object, so print it in pseudo-yaml
            for k, v in obj.items():
                print("%s: %s" % (str(k), str(v)), file=outfile)
        elif human_hint.startswith("Alias"):
            # This will always be a dictionary mapping alias keys to parameter sets.
            for key, alias in obj.items():
                print("Alias \"%s\"" % key, file=outfile)
                print(__human_alias(alias), file=outfile)
        elif human_hint.startswith("Record"):
            # This will always be a dictionary, or dictionary as entries,
            # but may sometimes be a bare record.
            if "StartTime" in obj:
                print(__human_record(obj), file=outfile)
            else:
                if dict_as_entries:
                    # Build a plain dict so __human_record can resolve
                    # interruption references by ID.
                    entry_dict = {
                        entry["key"]: entry["value"]
                        for entry in obj
                    }
                    for entry in obj:
                        key = entry["key"]
                        value = entry["value"]
                        # Records flagged __Hidden were pulled in only as
                        # interruption context; skip printing them.
                        if value.get("__Hidden", False):
                            continue
                        print("Record \"%s\"" % key, file=outfile)
                        print(__human_record(value, entry_dict), file=outfile)
                else:
                    for key, value in obj.items():
                        if value.get("__Hidden", False):
                            continue
                        print("Record \"%s\"" % key, file=outfile)
                        print(__human_record(value, obj), file=outfile)
    else:
        # NOTE(review): this branch fires for an unknown output *format*,
        # though the message says "human hint" -- confirm intended wording.
        raise ValueError("Undefined human hint '%s'" % human_hint)
def __load_state():
    """
    Load the events database and configuration from the dot-directory.

    Returns a (state, config) tuple of parsed JSON objects.
    """
    dotdir = __dotdir()
    with open(pathjoin(dotdir, "events.json"), "r") as handle:
        state = json.load(handle)
    with open(pathjoin(dotdir, "config.json"), "r") as handle:
        config = json.load(handle)
    return state, config
def __write_state(state, hooks):
    """
    Persist the time-tracking state to events.json in the dot-directory.

    NOTE(review): `hooks` is accepted but unused here, unlike
    __write_config which runs pre/post hooks around its write -- confirm
    the asymmetry is intentional.
    """
    serialized = json.dumps(state, indent=2, sort_keys=True)
    with open("%s/events.json" % __dotdir(), "w") as ofp:
        ofp.write(serialized)
def __write_config(config, hooks):
    """
    Persist the configuration file, running the pre/post config-write
    hooks around the write.
    """
    run_hooks("pre_config_write", hooks, config)
    serialized = json.dumps(config, indent=2, sort_keys=True)
    with open("%s/config.json" % __dotdir(), "w") as ofp:
        ofp.write(serialized)
    run_hooks("post_config_write", hooks, config)
def cmd_base(pargs, state, config, outfile=sys.stdout):
    """
    Print the status of any currently running stopwatch and/or
    interruption timer; prints nothing when neither is active.
    """
    for slot in ("Stopwatch", "Interruption"):
        if state[slot] is not None:
            __write_output(state[slot],
                           pargs,
                           config,
                           "Record.Incomplete",
                           outfile=outfile)
    return None
def cmd_config(pargs, _, config, hooks):
    """
    Print the configuration when no option was supplied; otherwise apply
    the supplied values and persist the configuration.
    """
    # Maps argparse attribute names to configuration keys.
    options = dict(output_format="OutputFormat")
    opts = {optkey: getattr(pargs, optkey) for optkey in options}
    if set(opts.values()) == set([None]):
        # Every supported option is unspecified: just display the config.
        __write_output(config, pargs, config, "Config")
    else:
        # Apply each explicitly-provided option, then write back.
        for key, val in opts.items():
            if val is not None:
                config[options[key]] = val
        __write_config(config, hooks)
    return None
def cmd_alias(pargs, state, config):
    """
    Create, modify, delete, and list aliases.

    With no key, lists all aliases (returns None). With a key and no
    properties, deletes the alias. Otherwise creates/replaces the alias.
    Returns {"OldImage": ..., "NewImage": ...} for hook consumption.
    """
    images = None
    if pargs.key is None:
        __write_output(state["Aliases"], pargs, config, "Alias.List")
    else:
        images = dict()
        alias = dict()
        if pargs.description is not None:
            alias["Description"] = pargs.description
        if pargs.detail is not None:
            alias["Detail"] = pargs.detail
        if pargs.structured_data is not None:
            # BUGFIX: b64encode requires bytes and returns bytes; the old
            # code passed the CLI string directly (TypeError) and would
            # have stored non-JSON-serializable bytes. Encode first and
            # decode the result to a str.
            alias["StructuredData"] = base64.b64encode(
                pargs.structured_data.encode("utf-8")).decode("ascii")
        if pargs.tag != []:
            alias["Tags"] = pargs.tag
        # Track the old image, which is the old alias config. If the alias
        # doesn't exist yet, the old image value is None.
        images["OldImage"] = {pargs.key: state["Aliases"].get(pargs.key, None)}
        if not alias:
            # No properties supplied: treat as a deletion request.
            if pargs.key in state["Aliases"]:
                del state["Aliases"][pargs.key]
            images["NewImage"] = {pargs.key: None}
        else:
            state["Aliases"][pargs.key] = alias
            images["NewImage"] = {pargs.key: alias}
    return images
def __resolve_positional_arg(pargs, state):
    """
    Resolve the optional positional argument: treat it as an alias key
    when it matches a known alias, otherwise use it as the description
    (unless --description was given explicitly).
    """
    quicktext = pargs.quicktext
    if quicktext is None:
        return
    if quicktext in state["Aliases"]:
        pargs.alias = quicktext
    elif pargs.description is None:
        pargs.description = quicktext
def __resolve_alias(pargs, state):
    """
    Map the properties defined in the alias into the pending record
    arguments.

    Unspecified properties (description, detail, structured data) are
    filled from the alias; explicitly-given values are never overwritten.
    Alias tags are merged into pargs.tag, any pargs.untag entries are
    removed (when that attribute exists), and the result is de-duplicated.
    """
    if pargs.alias not in state["Aliases"]:
        return
    alias = state["Aliases"][pargs.alias]
    property_map = {
        "description": "Description",
        "detail": "Detail",
        "structured_data": "StructuredData",
    }
    for arg_name, alias_key in property_map.items():
        # Fill only values the user did not specify; the alias never
        # clobbers an explicit option. Not every key exists on the alias.
        if getattr(pargs, arg_name) is None and alias.get(alias_key) is not None:
            setattr(pargs, arg_name, alias[alias_key])
    # Merge alias tags with the input tags, then drop requested removals.
    merged = set(pargs.tag) | set(alias.get("Tags", []))
    if hasattr(pargs, "untag"):
        merged -= set(pargs.untag)
    pargs.tag = list(merged)
def __create_record(pargs, state):
    """Build a record dict from the parsed arguments.

    Resolves the positional argument and any alias first; all three
    timestamps are initialized to the current time.
    """
    __resolve_positional_arg(pargs, state)
    if pargs.alias is not None:
        __resolve_alias(pargs, state)
    now = time.time()
    return {
        "CommitTime": now,
        "StartTime": now,
        "EndTime": now,
        "Tags": pargs.tag,
        "Description": pargs.description,
        "Detail": pargs.detail,
        "StructuredData": pargs.structured_data,
    }
def cmd_start(pargs, state, _, outfile=sys.stdout):
    """
    Start a stopwatch to track time against a task.

    Refuses with exit code 1 (or returns None in non-interactive mode)
    when a stopwatch is already running.
    """
    if state["Stopwatch"] is not None:
        print("Stopwatch currently running, ignoring current request",
              file=sys.stderr)
        # Only hard-exit when writing to the terminal.
        if outfile != sys.stdout:
            return None
        sys.exit(1)
    record = __create_record(pargs, state)
    if pargs.start_time is not None:
        record["StartTime"] = __parse_time(pargs.start_time)
    # An open stopwatch has no end/commit time and no interruptions yet.
    record["EndTime"] = None
    record["CommitTime"] = None
    record["Interruptions"] = []
    state["Stopwatch"] = record
    return None
def __generate_id(state):
    """Generate a record ID of the form YYYYMMDD-XXXX (four random
    uppercase letters), retrying until it is unique among the records."""
    while True:
        suffix = "".join(chr(ord('A') + (b % 26)) for b in os.urandom(4))
        candidate = "%s-%s" % (time.strftime("%Y%m%d"), suffix)
        if candidate not in state["Records"]:
            return candidate
def __update_record(old_record_prototype, pargs, state):
    """Return a deep copy of the old record with values supplied in pargs
    overwriting the originals.

    None values in the freshly-built record mean "unspecified" and are
    dropped so they never clobber existing values; StartTime is never
    taken from the fresh record; tags are merged then filtered through
    pargs.untag.
    """
    fresh = __create_record(pargs, state)
    overrides = {key: value for key, value in fresh.items()
                 if value is not None}
    # StartTime always comes from the original record here.
    del overrides["StartTime"]
    updated = copy.deepcopy(old_record_prototype)
    merged_tags = set(updated["Tags"]) | set(overrides["Tags"])
    overrides["Tags"] = list(merged_tags - set(pargs.untag))
    updated.update(overrides)
    return updated
def cmd_stop(pargs, state, config, outfile=sys.stdout):
    """
    Stop the running stopwatch and commit it to the ledger.

    Exits 13 when an interruption is still open, 2 when no stopwatch is
    running (returns None instead in non-interactive mode). Returns
    old/new images for hook consumption.
    """
    if state["Interruption"] is not None:
        print("Interruption is in progress, ignoring current request",
              file=sys.stderr)
        if outfile != sys.stdout:
            return None
        sys.exit(13)
    if state["Stopwatch"] is None:
        print("Stopwatch not currently running, ignoring current request",
              file=sys.stderr)
        if outfile != sys.stdout:
            return None
        sys.exit(2)
    if pargs.id is None:
        pargs.id = __generate_id(state)
    record = __update_record(state["Stopwatch"], pargs, state)
    if pargs.end_time is not None:
        record["EndTime"] = __parse_time(pargs.end_time)
    state["Stopwatch"] = None
    state["Records"][pargs.id] = record
    __write_output(pargs.id, pargs, config, "ID", outfile=outfile)
    # No previously-committed record exists, so the old image is None.
    return dict(OldImage=None, NewImage={pargs.id: record})
def cmd_sw(pargs, state, config, outfile=sys.stdout):
    """
    Smart stopwatch: delegates to `start` when no stopwatch is running,
    otherwise to `stop`.
    """
    delegate = cmd_start if state["Stopwatch"] is None else cmd_stop
    return delegate(pargs, state, config, outfile)
def cmd_isw(pargs, state, config, outfile=sys.stdout):
    """
    Smart interruption stopwatch: delegates to `interrupt` when no
    interruption is open, otherwise to `resume`.
    """
    delegate = cmd_interrupt if state["Interruption"] is None else cmd_resume
    return delegate(pargs, state, config, outfile)
def cmd_cancel(pargs, state, config, outfile=sys.stdout):
    """
    Discard the open interruption if one exists, otherwise discard the
    running stopwatch. Nothing is committed to the ledger.
    """
    slot = "Interruption" if state["Interruption"] is not None else "Stopwatch"
    state[slot] = None
def cmd_interrupt(pargs, state, config, outfile=sys.stdout):
    """
    Interrupt the running stopwatch with a one-at-a-time interruption
    timer; with no stopwatch running, behaves like `start` instead.
    Exits 3 when an interruption is already open.
    """
    if state["Stopwatch"] is None:
        # Nothing to interrupt: fall back to starting a stopwatch.
        # cmd_start reads pargs.start_time, which the interrupt argument
        # parser may not define.
        if not hasattr(pargs, "start_time"):
            pargs.start_time = None
        cmd_start(pargs, state, config, outfile)
        return
    # Confirm that there are no ongoing interruptions.
    if state["Interruption"] is not None:
        print(
            "Unable to interrupt task, as existing interruption is in progress.",
            file=sys.stderr)
        if outfile != sys.stdout:
            return None
        sys.exit(3)
    # No ongoing interruption: open a new one (no end/commit time yet).
    record = __create_record(pargs, state)
    record["EndTime"] = None
    record["CommitTime"] = None
    state["Interruption"] = record
def cmd_resume(pargs, state, config, outfile=sys.stdout):
    """
    Resume an interrupted stopwatch: close the open interruption, commit
    it to the ledger, and link it to the running stopwatch.

    Exits 4 when no stopwatch is running and 5 when no interruption is
    open (returns None instead in non-interactive mode).
    """
    if state["Stopwatch"] is None:
        print("Unable to resume from interruption with no stopwatch running.",
              file=sys.stderr)
        if outfile != sys.stdout:
            return None
        sys.exit(4)
    if state["Interruption"] is None:
        print("Unable to resume without an open interruption.",
              file=sys.stderr)
        if outfile != sys.stdout:
            return None
        sys.exit(5)
    # There is an open interruption: close it out and commit it.
    if pargs.id is None:
        pargs.id = __generate_id(state)
    record = __update_record(state["Interruption"], pargs, state)
    state["Records"][pargs.id] = record
    state["Interruption"] = None
    # Link the committed interruption back to the ongoing stopwatch.
    state["Stopwatch"]["Interruptions"].append(dict(Id=pargs.id))
    __write_output(pargs.id, pargs, config, "ID", outfile=outfile)
    # Return the images, with None for the old image.
    return dict(OldImage=None, NewImage={pargs.id: record})
def __parse_time(timespec):
    """Parse a natural-language timespec into a Unix timestamp.

    Naive results are interpreted in the local timezone. Exits with code 8
    when the timespec cannot be parsed.
    """
    from dateparser import parse as datetimeparser
    from dateutil.tz import tzlocal
    parsed = datetimeparser(timespec)
    if parsed is None:
        print("Unable to parse your timespec \"%s\"." % timespec,
              file=sys.stderr)
        sys.exit(8)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=tzlocal())
    return parsed.timestamp()
def cmd_track(pargs, state, config, outfile=sys.stdout):
    """
    Track a fixed interval of time.

    At least one of start/end must be given (exit 6); the missing endpoint
    defaults to "now UTC". The start must strictly precede the end
    (exit 7). With --dryrun the record is printed but not stored
    (exit 127). Returns old/new images for hooks.
    """
    if pargs.id is None:
        pargs.id = __generate_id(state)
    cur_time = time.time()
    if pargs.start_time is None and pargs.end_time is None:
        print(
            "At least one of start and end of a finite interval must be specified.",
            file=sys.stderr)
        # When not writing to stdout (e.g. driven by the server), return
        # instead of terminating the process.
        if outfile == sys.stdout:
            sys.exit(6)
        else:
            return None
    # Since the above check guarantees at least one was specified, set the other to
    # a timezone aware "now" (in UTC).
    if pargs.start_time is None:
        pargs.start_time = "now UTC"
    if pargs.end_time is None:
        pargs.end_time = "now UTC"
    start_time = __parse_time(pargs.start_time)
    end_time = __parse_time(pargs.end_time)
    if end_time - start_time <= 0:
        print(
            "For finite-interval tracking, the end time must be strictly after the start time.",
            file=sys.stderr)
        if outfile == sys.stdout:
            sys.exit(7)
        else:
            return None
    record = __create_record(pargs, state)
    record["StartTime"] = start_time
    record["EndTime"] = end_time
    record["CommitTime"] = cur_time
    if pargs.dryrun:
        # Show what would have been committed, then bail without touching
        # the ledger.
        __write_output(record, pargs, config, "Record.Complete")
        if outfile == sys.stdout:
            sys.exit(127)
        else:
            return None
    else:
        state["Records"][pargs.id] = record
    __write_output(pargs.id, pargs, config, "ID", outfile=outfile)
    # Return the images, with None for the old image.
    return dict(OldImage=None, NewImage={pargs.id: record})
def cmd_amend(pargs, state, config, outfile=sys.stdout):
    """
    Amend the properties of a tracked record.

    With no --id, amends the running stopwatch (exit 9 when none is
    running); with an --id absent from the ledger, exit 9. Returns
    old/new images when a ledger record was amended, else None.
    """
    if pargs.id is None and state["Stopwatch"] is None:
        print(
            "No record ID specified, and no stopwatch running to default to.",
            file=sys.stderr)
        if outfile == sys.stdout:
            sys.exit(9)
        else:
            return None
    # First, check that the ID specified exists.
    if pargs.id is not None and pargs.id not in state["Records"]:
        print("Specified record ID does not exist.", file=sys.stderr)
        if outfile == sys.stdout:
            sys.exit(9)
        else:
            return None
    # If the ID exists, create a new record from the arguments, and then clobber the record pulled
    # from the ledger.
    if pargs.id is None:
        # Amending the live stopwatch produces no hook images.
        images = None
        old_record = state["Stopwatch"]
        record = __update_record(copy.deepcopy(state["Stopwatch"]), pargs,
                                 state)
    else:
        images = dict()
        images["OldImage"] = {
            pargs.id: copy.deepcopy(state["Records"][pargs.id])
        }
        old_record = state["Records"][pargs.id]
        record = __update_record(old_record, pargs, state)
    # Reset the timestamps in the record to what the original record was, then we'll replace as
    # necessary based on what is provided in the pargs attributes.
    record["StartTime"] = old_record["StartTime"]
    record["EndTime"] = old_record["EndTime"]
    # Fix the weirdness that's going to go on with the timestamps
    if pargs.start_time is not None:
        record["StartTime"] = __parse_time(pargs.start_time)
    if pargs.end_time is not None:
        record["EndTime"] = __parse_time(pargs.end_time)
    if pargs.id is None:
        state["Stopwatch"] = record
    else:
        images["NewImage"] = {pargs.id: record}
        state["Records"][pargs.id] = record
    return images
def __check_tag_filter(tags, taglist):
    """Return True when the record's tags equal the filter list or share
    at least one tag with it."""
    if tags == taglist:
        return True
    return bool(set(tags) & set(taglist))
def __check_timespec_filter(timestamp, timespeclist):
    """
    Return True when `timestamp` satisfies any filter in `timespeclist`.

    Each filter is a dict with a "Timespec" (parsed to a timestamp) and a
    "Condition" comparison operator: "<", "<=", ">", ">=", "==", "!=".
    """
    import operator
    # Dispatch on the condition string instead of eval()-ing a synthesized
    # expression: identical semantics for the supported comparison
    # operators, without executing arbitrary filter text.
    comparisons = {
        "<": operator.lt,
        "<=": operator.le,
        ">": operator.gt,
        ">=": operator.ge,
        "==": operator.eq,
        "!=": operator.ne,
    }
    for timespec in timespeclist:
        reference = __parse_time(timespec["Timespec"])
        compare = comparisons[timespec["Condition"].strip()]
        if compare(timestamp, reference):
            return True
    return False
def __check_regex_filter(stringval, regexlist):
    """Return True when any regex in `regexlist` matches `stringval`.

    A None string never matches anything.
    """
    import re
    if stringval is None:
        return False
    return any(re.search(pattern, stringval) for pattern in regexlist)
def __filter_records(sieve, records):
    """
    Apply one sieve to the records, keeping each record for which at
    least one of the sieve's criteria matches the corresponding field.
    """
    checkers = {
        "Tags": __check_tag_filter,
        "StartTime": __check_timespec_filter,
        "EndTime": __check_timespec_filter,
        "Description": __check_regex_filter,
        "Detail": __check_regex_filter,
    }
    kept = dict()
    for record_id, record in records.items():
        for field, matches in checkers.items():
            # Only evaluate fields present on both record and sieve.
            if field in record and field in sieve:
                if matches(record[field], sieve[field]):
                    kept[record_id] = record
    return kept
def __timestamp_to_iso(timestamp):
from datetime import datetime
from dateutil.tz import tzlocal
return datetime.fromtimestamp(timestamp).replace(
tzinfo=tzlocal()).strftime("%FT%T%z")
def __csv_format(records, allrecords, pargs, outfile=sys.stdout):
    """
    Write the records to `outfile` as CSV.

    Header structure:
      RecordID StartTime EndTime CommitTime Duration InterruptionDuration
      Description Detail [StructuredData] Tag1 Tag2 ...

    Tags that appear in all records are omitted; 'x' marks the column for
    rows that carry a tag. Timestamps are converted from Unix timestamps
    to ISO strings in the local timezone. Durations are in hours, with
    interruption time subtracted from Duration, so Duration may not match
    the difference between start and end time.
    """
    from collections import Counter
    from csv import DictWriter
    tags = list()
    rows = copy.deepcopy(records)
    # The input is always a list of dictionary items as entries of the form
    # [{"key": ..., "value": ...}, ...]
    for entry in rows:
        key = entry["key"]
        val = entry["value"]
        val["RecordId"] = key
        # Durations are reported in hours.
        val["Duration"] = (val["EndTime"] - val["StartTime"]) / 3600
        val["InterruptionDuration"] = 0.0
        for event in val.get("Interruptions", []):
            val["InterruptionDuration"] += (
                allrecords[event["Id"]]["EndTime"] -
                allrecords[event["Id"]]["StartTime"]) / 3600
        # BUGFIX: InterruptionDuration is already expressed in hours at
        # this point; the old code divided it by 3600 a second time, so
        # interruption time was effectively never subtracted.
        val["Duration"] -= val["InterruptionDuration"]
        val["StartTime"] = __timestamp_to_iso(val["StartTime"])
        val["EndTime"] = __timestamp_to_iso(val["EndTime"])
        val["CommitTime"] = __timestamp_to_iso(val["CommitTime"])
        if "Tags" in val:
            tags += val["Tags"]
            # Each tag becomes its own column, marked with 'x'.
            for tag in val["Tags"]:
                val[tag] = "x"
            del val["Tags"]
    tag_counts = Counter(tags)
    # Tags present on every record carry no information; drop them.
    ignored_tags = [k for k, v in tag_counts.items() if v == len(records)]
    tag_columns = sorted(
        list(set(list(tag_counts.keys())).difference(set(ignored_tags))))
    column_names = [
        "RecordId", "StartTime", "EndTime", "CommitTime", "Duration",
        "InterruptionDuration", "Description"
    ] + ([] if pargs.without_detail else ["Detail"]) + (
        ["StructuredData"] if pargs.with_structured_data else []) + tag_columns
    csv = DictWriter(outfile, column_names)
    csv.writeheader()
    csv.writerows([{
        col_name: entry["value"].get(col_name, "")
        for col_name in column_names
    } for entry in rows])
def cmd_ls(pargs, state, config, outfile=sys.stdout):
    """
    Retrieve and filter the records based on the input options, sorting the output by the provided
    sorting key.

    Records referenced as interruptions by a selected record are pulled in
    too (marked __Hidden so the human formatter skips them). Output is CSV
    when --csv is given, otherwise the configured format.
    """
    if pargs.pos_id is not None:
        pargs.id.append(pargs.pos_id)
    if pargs.id != []:
        # Explicit IDs: keep only those that actually exist in the ledger.
        results = {
            rid: copy.deepcopy(state["Records"].get(rid, None))
            for rid in pargs.id if rid in state["Records"]
        }
    else:
        results = copy.deepcopy(state["Records"])
    for sieve in pargs.filter:
        results = __filter_records(sieve, results)
    # Now, loop through to find the smallest set containing all of the specified records
    # and any of their referenced records (i.e. interruptions)
    more_ids = set([
        i["Id"] for rec_id, rec in results.items()
        for i in rec.get("Interruptions", list())
    ])
    more_ids = more_ids.difference(set(results.keys()))
    while more_ids != set():
        for rec_id in more_ids:
            results[rec_id] = copy.deepcopy(state["Records"][rec_id])
            # Hidden records provide interruption context but are not shown.
            results[rec_id]["__Hidden"] = True
        more_ids = set([
            i["Id"] for rec_id, rec in results.items()
            for i in rec.get("Interruptions", list())
        ])
        more_ids = more_ids.difference(set(results.keys()))
    if not pargs.with_structured_data:
        for _, val in results.items():
            if "StructuredData" in val:
                del val["StructuredData"]
    if pargs.without_detail:
        for _, val in results.items():
            if "Detail" in val:
                del val["Detail"]
    results_list = [{"key": k, "value": v} for k, v in results.items()]
    try:
        results_list.sort(key=lambda d: d["value"][pargs.sort_by]
                          if pargs.sort_by != "ID" else d["key"])
        if pargs.last is not None:
            # If --last was specified, then only take the last N (or first N)
            # entries, keeping all of the hidden ones.
            results_list = [
                result
                for result in results_list if result.get("__Hidden", False)
            ] + (lambda l: l[-pargs.last:]
                 if pargs.last > 0 else l[:-pargs.last])([
                     result for result in results_list
                     if not result.get("__Hidden", False)
                 ])
    except Exception as e:
        # NOTE(review): exits with code 12 here, but the module docstring
        # documents code 11 for a bad sort key -- confirm which is intended.
        print("Error sorting log output.", repr(e), file=sys.stderr)
        sys.exit(12)
    if pargs.csv:
        __csv_format(results_list, state["Records"], pargs, outfile)
    else:
        __write_output(results_list,
                       pargs,
                       config,
                       "Record.Complete.List",
                       dict_as_entries=True,
                       outfile=outfile)
    return None
def cmd_serve(pargs, state, config):
    """
    Start the HTTP server from the tt_serve module on the requested port,
    protected by the supplied pre-shared key. Blocks until the server
    stops.

    NOTE(review): `state` and `config` are unused here -- presumably
    tt_serve loads state itself; confirm.
    """
    import tt_serve
    server = tt_serve.create_server(pargs.preshared_key)
    server.run(port=pargs.port)
def __positional_argument(parser):
    """Register the optional positional 'quicktext' argument on `parser`.

    The value is later resolved as an alias key or, failing that, as the
    description.
    """
    parser.add_argument(
        dest="quicktext",
        nargs="?",
        default=None,
        help=
        """Positional argument that is first checked as an alias key and, failing that, used as the
    description.""")
def __property_options(parser):
    """Register the record-property options (-d/-D/-t/-S) on `parser`."""
    parser.add_argument(
        "-d",
        "--description",
        default=None,
        required=False,
        metavar="<description>",
        help="""A short description of the work done during the time tracked."""
    )
    parser.add_argument(
        "-D",
        "--detail",
        default=None,
        required=False,
        metavar="<detail>",
        help=
        """A detailed description of the work done during the time tracked.""")
    # -t may be repeated; values accumulate into a list.
    parser.add_argument(
        "-t",
        "--tag",
        default=[],
        action="append",
        required=False,
        metavar="<tag>",
        help=
        """Include a tag for the tracked unit of time. Specify multiple times to include multiple
    tags.""")
    parser.add_argument(
        "-S",
        "--structured-data",
        default=None,
        required=False,
        metavar="<data>",
        help=
        """Arbitrary data to associate with the time record, base64 encoded before storing."""
    )
def __alias_option(parser):