# Replacement for CoroutinePool, implemented with the proc module.
from eventlib import coros, proc, api


class Pool(object):

    def __init__(self, min_size=0, max_size=4, track_events=False):
        # min_size is accepted to match the CoroutinePool signature, but is
        # not otherwise used by this implementation.
        if min_size > max_size:
            raise ValueError('min_size cannot be bigger than max_size')
        self.max_size = max_size
        self.sem = coros.Semaphore(max_size)
        self.procs = proc.RunningProcSet()
        if track_events:
            self.results = coros.queue()
        else:
            self.results = None

    @property
    def current_size(self):
        """The number of coroutines currently tracked by the pool."""
        return len(self.procs)

    def free(self):
        """Return the number of free execute() slots in the pool."""
        return self.sem.counter
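
    # Illustrative note: a fresh Pool(max_size=4) reports free() == 4 and
    # current_size == 0; each in-flight execute() lowers free() by one until
    # the coroutine finishes and releases its semaphore slot.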

    def execute(self, func, *args, **kwargs):
        """Execute func in one of the coroutines maintained
        by the pool, when one is free.

        Immediately returns a Proc object which can be queried
        for the func's result.

        >>> pool = Pool()
        >>> task = pool.execute(lambda a: ('foo', a), 1)
        >>> task.wait()
        ('foo', 1)
        """
        # if reentering an empty pool, don't try to wait on a coroutine freeing
        # itself -- instead, just execute in the current coroutine
        if self.sem.locked() and api.getcurrent() in self.procs:
            # Use plain proc.spawn() here: this coroutine is deliberately not
            # added to self.procs and does not consume a semaphore slot.
            p = proc.spawn(func, *args, **kwargs)
            try:
                p.wait()
            except:
                # Any exception remains stored in the returned Proc; the
                # caller can retrieve it by calling p.wait() again.
                pass
        else:
            self.sem.acquire()
            p = self.procs.spawn(func, *args, **kwargs)
            # assuming the above line cannot raise
            p.link(lambda p: self.sem.release())
        if self.results is not None:
            p.link(self.results)
        return p

    execute_async = execute
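
    # Usage sketch (illustrative, assuming track_events=True was passed to
    # the constructor): each completed call also posts its result to
    # self.results, which wait() below consumes:
    #
    #     pool = Pool(track_events=True)
    #     pool.execute(lambda: 42)
    #     pool.wait()   # -> 42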

    def _execute(self, evt, func, args, kw):
        # Like execute(), but also link evt (an event-like object, e.g. a
        # queue) so that it receives the result; used by generate_results().
        p = self.execute(func, *args, **kw)
        p.link(evt)
        return p

    def waitall(self):
        """Wait until all coroutines in the pool have completed."""
        return self.procs.waitall()

    wait_all = waitall

    def wait(self):
        """Wait for the next execute() in the pool to complete, and return
        its result. Only available when the pool was created with
        track_events=True (otherwise self.results is None).
        """
        return self.results.wait()

    def killall(self):
        """Kill all coroutines currently running in the pool."""
        return self.procs.killall()

    def launch_all(self, function, iterable):
        """For each tuple (sequence) in iterable, launch function(*tuple) in
        its own coroutine -- like itertools.starmap(), but in parallel.
        Discard values returned by function(). You should call wait_all() to
        wait for all coroutines, newly-launched plus any previously-submitted
        execute() or execute_async() calls, to complete.

        >>> pool = Pool()
        >>> def saw(x):
        ...     print "I saw %s!" % x
        ...
        >>> pool.launch_all(saw, "ABC")
        >>> pool.wait_all()
        I saw A!
        I saw B!
        I saw C!
        """
        for tup in iterable:
            self.execute(function, *tup)

    def process_all(self, function, iterable):
        """For each tuple (sequence) in iterable, launch function(*tuple) in
        its own coroutine -- like itertools.starmap(), but in parallel.
        Discard values returned by function(). Don't return until all
        coroutines, newly-launched plus any previously-submitted execute() or
        execute_async() calls, have completed.

        >>> pool = Pool()
        >>> def saw(x): print "I saw %s!" % x
        ...
        >>> pool.process_all(saw, "DEF")
        I saw D!
        I saw E!
        I saw F!
        """
        self.launch_all(function, iterable)
        self.wait_all()

    def generate_results(self, function, iterable, qsize=None):
        """For each tuple (sequence) in iterable, launch function(*tuple) in
        its own coroutine -- like itertools.starmap(), but in parallel.
        Yield each of the values returned by function(), in the order they're
        completed rather than the order the coroutines were launched.

        Iteration stops when we've yielded results for each argument tuple in
        iterable. Unlike wait_all() and process_all(), this function does not
        wait for any previously-submitted execute() or execute_async() calls.

        Results are temporarily buffered in a queue. If you pass qsize=, this
        value is used to limit the max size of the queue: an attempt to buffer
        too many results will suspend the completed pool coroutine until the
        requesting coroutine (the caller of generate_results()) has retrieved
        one or more results by calling this generator-iterator's next().

        If any coroutine raises an uncaught exception, that exception will
        propagate to the requesting coroutine via the corresponding next()
        call.

        What I particularly want these tests to illustrate is that using this
        generator function:

            for result in generate_results(function, iterable):
                # ... do something with result ...

        executes coroutines at least as aggressively as the classic eventlib
        idiom:

            events = [pool.execute(function, *args) for args in iterable]
            for event in events:
                result = event.wait()
                # ... do something with result ...

        even without a distinct event object for every arg tuple in iterable,
        and despite the funny flow control from interleaving launches of new
        coroutines with yields of completed coroutines' results.

        (The use case that makes this function preferable to the classic idiom
        above is when the iterable, which may itself be a generator, produces
        millions of items.)

        >>> from eventlib import coros
        >>> import string
        >>> pool = Pool(max_size=5)
        >>> pausers = [coros.event() for x in xrange(2)]
        >>> def longtask(evt, desc):
        ...     print "%s woke up with %s" % (desc, evt.wait())
        ...
        >>> pool.launch_all(longtask, zip(pausers, "AB"))
        >>> def quicktask(desc):
        ...     print "returning %s" % desc
        ...     return desc
        ...

        (Instead of using a for loop, step through generate_results()
        items individually to illustrate timing.)

        >>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase))
        >>> print step.next()
        returning a
        returning b
        returning c
        a
        >>> print step.next()
        b
        >>> print step.next()
        c
        >>> print step.next()
        returning d
        returning e
        returning f
        d
        >>> pausers[0].send("A")
        >>> print step.next()
        e
        >>> print step.next()
        f
        >>> print step.next()
        A woke up with A
        returning g
        returning h
        returning i
        g
        >>> print "".join([step.next() for x in xrange(3)])
        returning j
        returning k
        returning l
        returning m
        hij
        >>> pausers[1].send("B")
        >>> print "".join([step.next() for x in xrange(4)])
        B woke up with B
        returning n
        returning o
        returning p
        returning q
        klmn
        """
        # Get an iterator because of our funny nested loop below. Wrap the
        # iterable in enumerate() so we count items that come through.
        tuples = iter(enumerate(iterable))
        # If the iterable is empty, this whole function is a no-op, and we can
        # save ourselves some grief by just quitting out. In particular, once
        # we enter the outer loop below, we're going to wait on the queue --
        # but if we launched no coroutines with that queue as the destination,
        # we could end up waiting a very long time.
        try:
            index, args = next(tuples)
        except StopIteration:
            return
        # From this point forward, 'args' is the current arguments tuple and
        # 'index+1' counts how many such tuples we've seen.
        # This implementation relies on the fact that _execute() accepts an
        # event-like object, and -- unless it's None -- the completed
        # coroutine calls send(result). We slyly pass a queue rather than an
        # event -- the same queue instance for all coroutines. This is why our
        # queue interface intentionally resembles the event interface.
        q = coros.queue(max_size=qsize)
        # How many results have we yielded so far?
        finished = 0
        # This first loop is only until we've launched all the coroutines. Its
        # complexity is because if iterable contains more args tuples than the
        # size of our pool, attempting to _execute() the (poolsize+1)th
        # coroutine would suspend until something completes and send()s its
        # result to our queue. But to keep down queue overhead and to maximize
        # responsiveness to our caller, we'd rather suspend on reading the
        # queue. So we stuff the pool as full as we can, then wait for
        # something to finish, then stuff more coroutines into the pool.
        try:
            while True:
                # Before each yield, start as many new coroutines as we can fit.
                # (The self.free() test isn't 100% accurate: if we happen to be
                # executing in one of the pool's coroutines, we could _execute()
                # without waiting even if self.free() reports 0. See _execute().)
                # The point is that we don't want to wait in the _execute() call,
                # we want to wait in the q.wait() call.
                # IMPORTANT: at start, and whenever we've caught up with all
                # coroutines we've launched so far, we MUST iterate this inner
                # loop at least once, regardless of self.free() -- otherwise the
                # q.wait() call below will deadlock!
                # Recall that index is the index of the NEXT args tuple that we
                # haven't yet launched. Therefore it counts how many args tuples
                # we've launched so far.
                while self.free() > 0 or finished == index:
                    # Just like the implementation of execute_async(), save that
                    # we're passing our queue instead of None as the "event" to
                    # which to send() the result.
                    self._execute(q, function, args, {})
                    # We've consumed that args tuple, advance to next.
                    index, args = next(tuples)
                # Okay, we've filled up the pool again, yield a result -- which
                # will probably wait for a coroutine to complete. Although we do
                # have q.ready(), so we could iterate without waiting, we avoid
                # that because every yield could involve considerable real time.
                # We don't know how long it takes to return from yield, so every
                # time we do, take the opportunity to stuff more requests into the
                # pool before yielding again.
                yield q.wait()
                # Be sure to count results so we know when to stop!
                finished += 1
        except StopIteration:
            pass
        # Here we've exhausted the input iterable. index+1 is the total number
        # of coroutines we've launched. We probably haven't yielded that many
        # results yet. Wait for the rest of the results, yielding them as they
        # arrive.
        while finished < index + 1:
            yield q.wait()
            finished += 1
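

# A minimal usage sketch (illustrative only, not part of the original API):
# submit a few jobs with execute(), wait for the pool to drain, then collect
# each Proc's result. The helper name _demo is hypothetical.
def _demo():
    pool = Pool(max_size=2)
    # execute() hands each call to a pooled coroutine and returns a Proc;
    # the lambda's default argument captures the current value of i.
    tasks = [pool.execute(lambda x=i: x * x) for i in range(4)]
    # Block until every submitted coroutine has finished ...
    pool.waitall()
    # ... then read the individual results off the Proc objects.
    return [t.wait() for t in tasks]   # [0, 1, 4, 9]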


if __name__ == '__main__':
    import doctest
    doctest.testmod()