diff --git a/eventlet/coropool.py b/eventlet/coropool.py
index 75c6b3b..0436d8b 100644
--- a/eventlet/coropool.py
+++ b/eventlet/coropool.py
@@ -49,6 +49,11 @@ class Pool(object):
 
     execute_async = execute
 
+    def _execute(self, evt, func, args, kw):
+        p = self.execute(func, *args, **kw)
+        p.link(evt)
+        return p
+
     def waitall(self):
         return self.procs.waitall()
 
@@ -62,4 +67,217 @@ class Pool(object):
 
     def killall(self):
         return self.procs.killall()
-
+    def launch_all(self, function, iterable):
+        """For each tuple (sequence) in iterable, launch function(*tuple) in
+        its own coroutine -- like itertools.starmap(), but in parallel.
+        Discard values returned by function(). You should call wait_all() to
+        wait for all coroutines, newly-launched plus any previously-submitted
+        execute() or execute_async() calls, to complete.
+
+        >>> pool = Pool()
+        >>> def saw(x):
+        ...     print "I saw %s!" % x
+        ...
+        >>> pool.launch_all(saw, "ABC")
+        >>> pool.wait_all()
+        I saw A!
+        I saw B!
+        I saw C!
+        """
+        for tup in iterable:
+            self.execute(function, *tup)
+
+    def process_all(self, function, iterable):
+        """For each tuple (sequence) in iterable, launch function(*tuple) in
+        its own coroutine -- like itertools.starmap(), but in parallel.
+        Discard values returned by function(). Don't return until all
+        coroutines, newly-launched plus any previously-submitted execute() or
+        execute_async() calls, have completed.
+
+        >>> from eventlet import coros
+        >>> pool = coros.CoroutinePool()
+        >>> def saw(x): print "I saw %s!" % x
+        ...
+        >>> pool.process_all(saw, "DEF")
+        I saw D!
+        I saw E!
+        I saw F!
+        """
+        self.launch_all(function, iterable)
+        self.wait_all()
+
+    def generate_results(self, function, iterable, qsize=None):
+        """For each tuple (sequence) in iterable, launch function(*tuple) in
+        its own coroutine -- like itertools.starmap(), but in parallel.
+        Yield each of the values returned by function(), in the order they're
+        completed rather than the order the coroutines were launched.
+
+        Iteration stops when we've yielded results for each arguments tuple in
+        iterable. Unlike wait_all() and process_all(), this function does not
+        wait for any previously-submitted execute() or execute_async() calls.
+
+        Results are temporarily buffered in a queue. If you pass qsize=, this
+        value is used to limit the max size of the queue: an attempt to buffer
+        too many results will suspend the completed CoroutinePool coroutine
+        until the requesting coroutine (the caller of generate_results()) has
+        retrieved one or more results by calling this generator-iterator's
+        next().
+
+        If any coroutine raises an uncaught exception, that exception will
+        propagate to the requesting coroutine via the corresponding next() call.
+
+        What I particularly want these tests to illustrate is that using this
+        generator function:
+
+            for result in generate_results(function, iterable):
+                # ... do something with result ...
+
+        executes coroutines at least as aggressively as the classic eventlet
+        idiom:
+
+            events = [pool.execute(function, *args) for args in iterable]
+            for event in events:
+                result = event.wait()
+                # ... do something with result ...
+
+        even without a distinct event object for every arg tuple in iterable,
+        and despite the funny flow control from interleaving launches of new
+        coroutines with yields of completed coroutines' results.
+
+        (The use case that makes this function preferable to the classic idiom
+        above is when the iterable, which may itself be a generator, produces
+        millions of items.)
+
+        >>> from eventlet import coros
+        >>> import string
+        >>> pool = coros.CoroutinePool(max_size=5)
+        >>> pausers = [coros.event() for x in xrange(2)]
+        >>> def longtask(evt, desc):
+        ...     print "%s woke up with %s" % (desc, evt.wait())
+        ...
+        >>> pool.launch_all(longtask, zip(pausers, "AB"))
+        >>> def quicktask(desc):
+        ...     print "returning %s" % desc
+        ...     return desc
+        ...
+
+        (Instead of using a for loop, step through generate_results()
+        items individually to illustrate timing)
+
+        >>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase))
+        >>> print step.next()
+        returning a
+        returning b
+        returning c
+        a
+        >>> print step.next()
+        b
+        >>> print step.next()
+        c
+        >>> print step.next()
+        returning d
+        returning e
+        returning f
+        d
+        >>> pausers[0].send("A")
+        >>> print step.next()
+        e
+        >>> print step.next()
+        f
+        >>> print step.next()
+        A woke up with A
+        returning g
+        returning h
+        returning i
+        g
+        >>> print "".join([step.next() for x in xrange(3)])
+        returning j
+        returning k
+        returning l
+        returning m
+        hij
+        >>> pausers[1].send("B")
+        >>> print "".join([step.next() for x in xrange(4)])
+        B woke up with B
+        returning n
+        returning o
+        returning p
+        returning q
+        klmn
+        """
+        # Get an iterator because of our funny nested loop below. Wrap the
+        # iterable in enumerate() so we count items that come through.
+        tuples = iter(enumerate(iterable))
+        # If the iterable is empty, this whole function is a no-op, and we can
+        # save ourselves some grief by just quitting out. In particular, once
+        # we enter the outer loop below, we're going to wait on the queue --
+        # but if we launched no coroutines with that queue as the destination,
+        # we could end up waiting a very long time.
+        try:
+            index, args = tuples.next()
+        except StopIteration:
+            return
+        # From this point forward, 'args' is the current arguments tuple and
+        # 'index+1' counts how many such tuples we've seen.
+        # This implementation relies on the fact that _execute() accepts an
+        # event-like object, and -- unless it's None -- the completed
+        # coroutine calls send(result). We slyly pass a queue rather than an
+        # event -- the same queue instance for all coroutines. This is why our
+        # queue interface intentionally resembles the event interface.
+        q = coros.queue(max_size=qsize)
+        # How many results have we yielded so far?
+        finished = 0
+        # This first loop is only until we've launched all the coroutines. Its
+        # complexity is because if iterable contains more args tuples than the
+        # size of our pool, attempting to _execute() the (poolsize+1)th
+        # coroutine would suspend until something completes and send()s its
+        # result to our queue. But to keep down queue overhead and to maximize
+        # responsiveness to our caller, we'd rather suspend on reading the
+        # queue. So we stuff the pool as full as we can, then wait for
+        # something to finish, then stuff more coroutines into the pool.
+        try:
+            while True:
+                # Before each yield, start as many new coroutines as we can fit.
+                # (The self.free() test isn't 100% accurate: if we happen to be
+                # executing in one of the pool's coroutines, we could _execute()
+                # without waiting even if self.free() reports 0. See _execute().)
+                # The point is that we don't want to wait in the _execute() call,
+                # we want to wait in the q.wait() call.
+                # IMPORTANT: at start, and whenever we've caught up with all
+                # coroutines we've launched so far, we MUST iterate this inner
+                # loop at least once, regardless of self.free() -- otherwise the
+                # q.wait() call below will deadlock!
+                # Recall that index is the index of the NEXT args tuple that we
+                # haven't yet launched. Therefore it counts how many args tuples
+                # we've launched so far.
+                while self.free() > 0 or finished == index:
+                    # Just like the implementation of execute_async(), save that
+                    # we're passing our queue instead of None as the "event" to
+                    # which to send() the result.
+                    self._execute(q, function, args, {})
+                    # We've consumed that args tuple, advance to next.
+                    index, args = tuples.next()
+                # Okay, we've filled up the pool again, yield a result -- which
+                # will probably wait for a coroutine to complete. Although we do
+                # have q.ready(), so we could iterate without waiting, we avoid
+                # that because every yield could involve considerable real time.
+                # We don't know how long it takes to return from yield, so every
+                # time we do, take the opportunity to stuff more requests into the
+                # pool before yielding again.
+                yield q.wait()
+                # Be sure to count results so we know when to stop!
+                finished += 1
+        except StopIteration:
+            pass
+        # Here we've exhausted the input iterable. index+1 is the total number
+        # of coroutines we've launched. We probably haven't yielded that many
+        # results yet. Wait for the rest of the results, yielding them as they
+        # arrive.
+        while finished < index + 1:
+            yield q.wait()
+            finished += 1
+
+
+if __name__ == '__main__':
+    import doctest
+    doctest.testmod()
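
The generate_results() docstring contrasts the generator with the classic
one-event-per-task idiom but leaves the comparison abstract. Below is a
minimal standalone sketch of both patterns, assuming only the eventlet.coros
API the doctests above already exercise (CoroutinePool, execute(), and wait()
on the object execute() returns); the double() worker and the sizes are made
up for illustration:

    from eventlet import coros

    pool = coros.CoroutinePool(max_size=4)

    def double(n):
        return n * 2

    # Classic idiom: one waitable object per task, all held in a list, with
    # results consumed in launch order.
    events = [pool.execute(double, n) for n in xrange(10)]
    for event in events:
        print event.wait()

    # generate_results(): one shared queue behind the scenes, results yielded
    # in completion order; the iterable of argument tuples may be a generator
    # of arbitrary length, since nothing is accumulated per task.
    for result in pool.generate_results(double, ((n,) for n in xrange(10))):
        print result

The second form is the one that stays flat in memory when the input runs to
millions of items, which is the use case the docstring calls out.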
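The implementation comment about slyly passing a queue where an event is
expected comes down to both objects exposing the same send()/wait() pair. A
sketch of that duck typing, assuming the coros.event() and coros.queue()
constructors used in the patch; finish() here is a hypothetical stand-in for
the link target that _execute() installs:

    from eventlet import coros

    def finish(waitable, result):
        # _execute() hands the completed coroutine's result to its "event"
        # the same way regardless of the actual type: by calling send().
        waitable.send(result)

    evt = coros.event()
    finish(evt, "via event")
    print evt.wait()            # via event

    q = coros.queue(max_size=None)
    finish(q, "first")
    finish(q, "second")         # the queue buffers repeated send() calls,
    print q.wait(), q.wait()    # which is why one instance serves all tasks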