coropool.Pool: add _execute, launch_all, process_all and generate_results (from the old CoroutinePool)
@@ -49,6 +49,11 @@ class Pool(object):
     execute_async = execute

+    def _execute(self, evt, func, args, kw):
+        p = self.execute(func, *args, **kw)
+        p.link(evt)
+        return p
+
     def waitall(self):
         return self.procs.waitall()
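The evt argument to _execute() is only assumed to be "event-like": when the launched proc finishes, the link() arranges for evt.send(result) to be called (per the comments inside generate_results() below, which pass a shared queue here instead of a real event). A minimal sketch of that contract, not part of the commit; _Recorder is a hypothetical stand-in, not anything from eventlet:

    class _Recorder(object):
        # hypothetical stand-in: anything with a send() method satisfies
        # the "event-like" contract that _execute() relies on
        def __init__(self):
            self.sent = []

        def send(self, value):
            self.sent.append(value)

    # Illustration only: pool._execute(_Recorder(), divmod, (7, 3), {})
    # would run divmod(7, 3) in a pool coroutine and deliver (2, 1) to
    # the recorder via send() when the coroutine completes.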
@@ -62,4 +67,217 @@ class Pool(object):
     def killall(self):
         return self.procs.killall()

+    def launch_all(self, function, iterable):
+        """For each tuple (sequence) in iterable, launch function(*tuple) in
+        its own coroutine -- like itertools.starmap(), but in parallel.
+        Discard values returned by function(). You should call wait_all() to
+        wait for all coroutines, newly-launched plus any previously-submitted
+        execute() or execute_async() calls, to complete.
+
+        >>> pool = Pool()
+        >>> def saw(x):
+        ...     print "I saw %s!" % x
+        ...
+        >>> pool.launch_all(saw, "ABC")
+        >>> pool.wait_all()
+        I saw A!
+        I saw B!
+        I saw C!
+        """
+        for tup in iterable:
+            self.execute(function, *tup)
+
+    def process_all(self, function, iterable):
+        """For each tuple (sequence) in iterable, launch function(*tuple) in
+        its own coroutine -- like itertools.starmap(), but in parallel.
+        Discard values returned by function(). Don't return until all
+        coroutines, newly-launched plus any previously-submitted execute() or
+        execute_async() calls, have completed.
+
+        >>> from eventlet import coros
+        >>> pool = coros.CoroutinePool()
+        >>> def saw(x): print "I saw %s!" % x
+        ...
+        >>> pool.process_all(saw, "DEF")
+        I saw D!
+        I saw E!
+        I saw F!
+        """
+        self.launch_all(function, iterable)
+        self.wait_all()
+
+    def generate_results(self, function, iterable, qsize=None):
+        """For each tuple (sequence) in iterable, launch function(*tuple) in
+        its own coroutine -- like itertools.starmap(), but in parallel.
+        Yield each of the values returned by function(), in the order they're
+        completed rather than the order the coroutines were launched.
+
+        Iteration stops when we've yielded results for each arguments tuple in
+        iterable. Unlike wait_all() and process_all(), this function does not
+        wait for any previously-submitted execute() or execute_async() calls.
+
+        Results are temporarily buffered in a queue. If you pass qsize=, this
+        value is used to limit the max size of the queue: an attempt to buffer
+        too many results will suspend the completed CoroutinePool coroutine
+        until the requesting coroutine (the caller of generate_results()) has
+        retrieved one or more results by calling this generator-iterator's
+        next().
+
+        If any coroutine raises an uncaught exception, that exception will
+        propagate to the requesting coroutine via the corresponding next()
+        call.
+
+        What I particularly want these tests to illustrate is that using this
+        generator function:
+
+            for result in generate_results(function, iterable):
+                # ... do something with result ...
+
+        executes coroutines at least as aggressively as the classic eventlet
+        idiom:
+
+            events = [pool.execute(function, *args) for args in iterable]
+            for event in events:
+                result = event.wait()
+                # ... do something with result ...
+
+        even without a distinct event object for every arg tuple in iterable,
+        and despite the funny flow control from interleaving launches of new
+        coroutines with yields of completed coroutines' results.
+
+        (The use case that makes this function preferable to the classic idiom
+        above is when the iterable, which may itself be a generator, produces
+        millions of items.)
+
+        >>> from eventlet import coros
+        >>> import string
+        >>> pool = coros.CoroutinePool(max_size=5)
+        >>> pausers = [coros.event() for x in xrange(2)]
+        >>> def longtask(evt, desc):
+        ...     print "%s woke up with %s" % (desc, evt.wait())
+        ...
+        >>> pool.launch_all(longtask, zip(pausers, "AB"))
+        >>> def quicktask(desc):
+        ...     print "returning %s" % desc
+        ...     return desc
+        ...
+
+        (Instead of using a for loop, step through generate_results()
+        items individually to illustrate timing)
+
+        >>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase))
+        >>> print step.next()
+        returning a
+        returning b
+        returning c
+        a
+        >>> print step.next()
+        b
+        >>> print step.next()
+        c
+        >>> print step.next()
+        returning d
+        returning e
+        returning f
+        d
+        >>> pausers[0].send("A")
+        >>> print step.next()
+        e
+        >>> print step.next()
+        f
+        >>> print step.next()
+        A woke up with A
+        returning g
+        returning h
+        returning i
+        g
+        >>> print "".join([step.next() for x in xrange(3)])
+        returning j
+        returning k
+        returning l
+        returning m
+        hij
+        >>> pausers[1].send("B")
+        >>> print "".join([step.next() for x in xrange(4)])
+        B woke up with B
+        returning n
+        returning o
+        returning p
+        returning q
+        klmn
+        """
+        # Get an iterator because of our funny nested loop below. Wrap the
+        # iterable in enumerate() so we count items that come through.
+        tuples = iter(enumerate(iterable))
+        # If the iterable is empty, this whole function is a no-op, and we can
+        # save ourselves some grief by just quitting out. In particular, once
+        # we enter the outer loop below, we're going to wait on the queue --
+        # but if we launched no coroutines with that queue as the destination,
+        # we could end up waiting a very long time.
+        try:
+            index, args = tuples.next()
+        except StopIteration:
+            return
+        # From this point forward, 'args' is the current arguments tuple and
+        # 'index+1' counts how many such tuples we've seen.
+        # This implementation relies on the fact that _execute() accepts an
+        # event-like object, and -- unless it's None -- the completed
+        # coroutine calls send(result). We slyly pass a queue rather than an
+        # event -- the same queue instance for all coroutines. This is why our
+        # queue interface intentionally resembles the event interface.
+        q = coros.queue(max_size=qsize)
+        # How many results have we yielded so far?
+        finished = 0
+        # This first loop is only until we've launched all the coroutines. Its
+        # complexity is because if iterable contains more args tuples than the
+        # size of our pool, attempting to _execute() the (poolsize+1)th
+        # coroutine would suspend until something completes and send()s its
+        # result to our queue. But to keep down queue overhead and to maximize
+        # responsiveness to our caller, we'd rather suspend on reading the
+        # queue. So we stuff the pool as full as we can, then wait for
+        # something to finish, then stuff more coroutines into the pool.
+        try:
+            while True:
+                # Before each yield, start as many new coroutines as we can
+                # fit. (The self.free() test isn't 100% accurate: if we happen
+                # to be executing in one of the pool's coroutines, we could
+                # _execute() without waiting even if self.free() reports 0.
+                # See _execute().)
+                # The point is that we don't want to wait in the _execute()
+                # call, we want to wait in the q.wait() call.
+                # IMPORTANT: at start, and whenever we've caught up with all
+                # coroutines we've launched so far, we MUST iterate this inner
+                # loop at least once, regardless of self.free() -- otherwise
+                # the q.wait() call below will deadlock!
+                # Recall that index is the index of the NEXT args tuple that
+                # we haven't yet launched. Therefore it counts how many args
+                # tuples we've launched so far.
+                while self.free() > 0 or finished == index:
+                    # Just like the implementation of execute_async(), save
+                    # that we're passing our queue instead of None as the
+                    # "event" to which to send() the result.
+                    self._execute(q, function, args, {})
+                    # We've consumed that args tuple, advance to next.
+                    index, args = tuples.next()
+                # Okay, we've filled up the pool again, yield a result --
+                # which will probably wait for a coroutine to complete.
+                # Although we do have q.ready(), so we could iterate without
+                # waiting, we avoid that because every yield could involve
+                # considerable real time. We don't know how long it takes to
+                # return from yield, so every time we do, take the opportunity
+                # to stuff more requests into the pool before yielding again.
+                yield q.wait()
+                # Be sure to count results so we know when to stop!
+                finished += 1
+        except StopIteration:
+            pass
+        # Here we've exhausted the input iterable. index+1 is the total number
+        # of coroutines we've launched. We probably haven't yielded that many
+        # results yet. Wait for the rest of the results, yielding them as they
+        # arrive.
+        while finished < index + 1:
+            yield q.wait()
+            finished += 1
+
+
+if __name__=='__main__':
+    import doctest
+    doctest.testmod()
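A rough usage sketch of the new API (not part of the commit): it assumes this module is importable as eventlet.coropool and that Pool's constructor takes max_size like CoroutinePool's in the doctests above; fetch() and the urls generator are made up for illustration.

    from eventlet import coropool

    def fetch(url):
        # placeholder for real per-item work
        return len(url)

    pool = coropool.Pool(max_size=10)
    # the input may itself be a generator producing millions of 1-tuples
    urls = (("http://example.com/%d" % i,) for i in xrange(1000000))

    # qsize bounds the result buffer: at most 20 unread results queue up,
    # and completed pool coroutines suspend until the caller catches up
    for result in pool.generate_results(fetch, urls, qsize=20):
        pass  # ... do something with each result, in completion order ...

Unlike the classic events-list idiom quoted in the generate_results() docstring, this never materializes one event object per input item.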