forked from DonJayamanne/pythonVSCode
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtaskhandle.py
More file actions
131 lines (92 loc) · 2.81 KB
/
taskhandle.py
File metadata and controls
131 lines (92 loc) · 2.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from rope.base import exceptions
class TaskHandle(object):
def __init__(self, name='Task', interrupts=True):
"""Construct a TaskHandle
If `interrupts` is `False` the task won't be interrupted by
calling `TaskHandle.stop()`.
"""
self.name = name
self.interrupts = interrupts
self.stopped = False
self.job_sets = []
self.observers = []
def stop(self):
"""Interrupts the refactoring"""
if self.interrupts:
self.stopped = True
self._inform_observers()
def current_jobset(self):
"""Return the current `JobSet`"""
if self.job_sets:
return self.job_sets[-1]
def add_observer(self, observer):
"""Register an observer for this task handle
The observer is notified whenever the task is stopped or
a job gets finished.
"""
self.observers.append(observer)
def is_stopped(self):
return self.stopped
def get_jobsets(self):
return self.job_sets
def create_jobset(self, name='JobSet', count=None):
result = JobSet(self, name=name, count=count)
self.job_sets.append(result)
self._inform_observers()
return result
def _inform_observers(self):
for observer in list(self.observers):
observer()
class JobSet(object):
def __init__(self, handle, name, count):
self.handle = handle
self.name = name
self.count = count
self.done = 0
self.job_name = None
def started_job(self, name):
self.check_status()
self.job_name = name
self.handle._inform_observers()
def finished_job(self):
self.check_status()
self.done += 1
self.handle._inform_observers()
self.job_name = None
def check_status(self):
if self.handle.is_stopped():
raise exceptions.InterruptedTaskError()
def get_active_job_name(self):
return self.job_name
def get_percent_done(self):
if self.count is not None and self.count > 0:
percent = self.done * 100 // self.count
return min(percent, 100)
def get_name(self):
return self.name
class NullTaskHandle(object):
def __init__(self):
pass
def is_stopped(self):
return False
def stop(self):
pass
def create_jobset(self, *args, **kwds):
return NullJobSet()
def get_jobsets(self):
return []
def add_observer(self, observer):
pass
class NullJobSet(object):
def started_job(self, name):
pass
def finished_job(self):
pass
def check_status(self):
pass
def get_active_job_name(self):
pass
def get_percent_done(self):
pass
def get_name(self):
pass