From 01d7f6dfc83605a2255676b5942851619a7ced26 Mon Sep 17 00:00:00 2001 From: Denis Bilenko Date: Mon, 8 Jun 2009 14:50:38 +0700 Subject: [PATCH] remove CoroutinePool from pools --- eventlet/pools.py | 456 ---------------------------------------- greentest/pools_test.py | 184 ---------------- 2 files changed, 640 deletions(-) diff --git a/eventlet/pools.py b/eventlet/pools.py index 0ddbd1d..7f65764 100644 --- a/eventlet/pools.py +++ b/eventlet/pools.py @@ -213,462 +213,6 @@ class ExceptionWrapper(object): self.e = e -class CoroutinePool(Pool): - """ Like a thread pool, but with coroutines. - - Coroutine pools are useful for splitting up tasks or globally controlling - concurrency. You don't retrieve the coroutines directly with get() -- - instead use the execute() and execute_async() methods to run code. - - >>> from eventlet import coros, api - >>> p = coros.CoroutinePool(max_size=2) - >>> def foo(a): - ... print "foo", a - ... - >>> evt = p.execute(foo, 1) - >>> evt.wait() - foo 1 - - Once the pool is exhausted, calling an execute forces a yield. - - >>> p.execute_async(foo, 2) - >>> p.execute_async(foo, 3) - >>> p.free() - 0 - >>> p.execute_async(foo, 4) - foo 2 - foo 3 - - >>> api.sleep(0) - foo 4 - """ - - def __init__(self, min_size=0, max_size=4, track_events=False): - self._greenlets = set() - if track_events: - self._tracked_events = [] - self._next_event = None - else: - self._tracked_events = None - self.requested = coros.metaphore() - super(CoroutinePool, self).__init__(min_size, max_size) - -## This doesn't yet pass its own doctest -- but I'm not even sure it's a -## wonderful idea. -## def __del__(self): -## """Experimental: try to prevent the calling script from exiting until -## all coroutines in this pool have run to completion. - -## >>> from eventlet import coros -## >>> pool = coros.CoroutinePool() -## >>> def saw(x): print "I saw %s!" -## ... -## >>> pool.launch_all(saw, "GHI") -## >>> del pool -## I saw G! -## I saw H! -## I saw I! -## """ -## self.wait_all() - - def _main_loop(self, sender): - """ Private, infinite loop run by a pooled coroutine. """ - try: - while True: - recvd = sender.wait() - # Delete the sender's result here because the very - # first event through the loop is referenced by - # spawn_startup, and therefore is not itself deleted. - # This means that we have to free up its argument - # because otherwise said argument persists in memory - # forever. This is generally only a problem in unit - # tests. - sender._result = coros.NOT_USED - - sender = coros.event() - (evt, func, args, kw) = recvd - self._safe_apply(evt, func, args, kw) - #api.get_hub().cancel_timers(api.getcurrent()) - # Likewise, delete these variables or else they will - # be referenced by this frame until replaced by the - # next recvd, which may or may not be a long time from - # now. - del evt, func, args, kw, recvd - - self.put(sender) - finally: - # if we get here, something broke badly, and all we can really - # do is try to keep the pool from leaking items. - # Shouldn't even try to print the exception. - self.put(self.create()) - - def _safe_apply(self, evt, func, args, kw): - """ Private method that runs the function, catches exceptions, and - passes back the return value in the event.""" - try: - result = func(*args, **kw) - if evt is not None: - evt.send(result) - if self._tracked_events is not None: - if self._next_event is None: - self._tracked_events.append(result) - else: - - ne = self._next_event - self._next_event = None - ne.send(result) - except api.GreenletExit, e: - # we're printing this out to see if it ever happens - # in practice - print "GreenletExit raised in coroutine pool", e - if evt is not None: - evt.send(e) # sent as a return value, not an exception - except KeyboardInterrupt: - raise # allow program to exit - except Exception, e: - traceback.print_exc() - if evt is not None: - evt.send(exc=e) - if self._tracked_events is not None: - if self._next_event is None: - self._tracked_events.append(ExceptionWrapper(e)) - else: - ne = self._next_event - self._next_event = None - ne.send(exc=e) - - def _execute(self, evt, func, args, kw): - """ Private implementation of the execute methods. - """ - # if reentering an empty pool, don't try to wait on a coroutine freeing - # itself -- instead, just execute in the current coroutine - if self.free() == 0 and api.getcurrent() in self._greenlets: - self._safe_apply(evt, func, args, kw) - else: - sender = self.get() - sender.send((evt, func, args, kw)) - - def create(self): - """Private implementation of eventlet.pools.Pool - interface. Creates an event and spawns the - _main_loop coroutine, passing the event. - The event is used to send a callable into the - new coroutine, to be executed. - """ - sender = coros.event() - self._greenlets.add(api.spawn(self._main_loop, sender)) - return sender - - def get(self): - """Override of eventlet.pools.Pool interface""" - # Track the number of requested CoroutinePool coroutines - self.requested.inc() - # forward call to base class - return super(CoroutinePool, self).get() - - def put(self, item): - """Override of eventlet.pools.Pool interface""" - # forward call to base class - super(CoroutinePool, self).put(item) - # Track the number of outstanding CoroutinePool coroutines - self.requested.dec() - - def execute(self, func, *args, **kw): - """Execute func in one of the coroutines maintained - by the pool, when one is free. - - Immediately returns an eventlet.coros.event object which - func's result will be sent to when it is available. - - >>> from eventlet import coros - >>> p = coros.CoroutinePool() - >>> evt = p.execute(lambda a: ('foo', a), 1) - >>> evt.wait() - ('foo', 1) - """ - receiver = coros.event() - self._execute(receiver, func, args, kw) - return receiver - - def execute_async(self, func, *args, **kw): - """Execute func in one of the coroutines maintained - by the pool, when one is free. - - No return value is provided. - >>> from eventlet import coros, api - >>> p = coros.CoroutinePool() - >>> def foo(a): - ... print "foo", a - ... - >>> p.execute_async(foo, 1) - >>> api.sleep(0) - foo 1 - """ - self._execute(None, func, args, kw) - - def wait(self): - """Wait for the next execute in the pool to complete, - and return the result. - - You must pass track_events=True to the CoroutinePool constructor - in order to use this method. - """ - assert self._tracked_events is not None, ( - "Must pass track_events=True to the constructor to use CoroutinePool.wait()") - if self._next_event is not None: - return self._next_event.wait() - - if not self._tracked_events: - self._next_event = coros.event() - return self._next_event.wait() - - result = self._tracked_events.pop(0) - if isinstance(result, ExceptionWrapper): - raise result.e - - if not self._tracked_events: - self._next_event = coros.event() - return result - - def killall(self): - for g in self._greenlets: - api.kill(g) - - def wait_all(self): - """Wait until all coroutines started either by execute() or - execute_async() have completed. If you kept the event objects returned - by execute(), you can then call their individual wait() methods to - retrieve results with no further actual waiting. - - >>> from eventlet import coros - >>> pool = coros.CoroutinePool() - >>> pool.wait_all() - >>> def hi(name): - ... print "Hello, %s!" % name - ... return name - ... - >>> evt = pool.execute(hi, "world") - >>> pool.execute_async(hi, "darkness, my old friend") - >>> pool.wait_all() - Hello, world! - Hello, darkness, my old friend! - >>> evt.wait() - 'world' - >>> pool.wait_all() - """ - self.requested.wait() - - def launch_all(self, function, iterable): - """For each tuple (sequence) in iterable, launch function(*tuple) in - its own coroutine -- like itertools.starmap(), but in parallel. - Discard values returned by function(). You should call wait_all() to - wait for all coroutines, newly-launched plus any previously-submitted - execute() or execute_async() calls, to complete. - - >>> from eventlet import coros - >>> pool = coros.CoroutinePool() - >>> def saw(x): - ... print "I saw %s!" % x - ... - >>> pool.launch_all(saw, "ABC") - >>> pool.wait_all() - I saw A! - I saw B! - I saw C! - """ - for tup in iterable: - self.execute_async(function, *tup) - - def process_all(self, function, iterable): - """For each tuple (sequence) in iterable, launch function(*tuple) in - its own coroutine -- like itertools.starmap(), but in parallel. - Discard values returned by function(). Don't return until all - coroutines, newly-launched plus any previously-submitted execute() or - execute_async() calls, have completed. - - >>> from eventlet import coros - >>> pool = coros.CoroutinePool() - >>> def saw(x): print "I saw %s!" % x - ... - >>> pool.process_all(saw, "DEF") - I saw D! - I saw E! - I saw F! - """ - self.launch_all(function, iterable) - self.wait_all() - - def generate_results(self, function, iterable, qsize=None): - """For each tuple (sequence) in iterable, launch function(*tuple) in - its own coroutine -- like itertools.starmap(), but in parallel. - Yield each of the values returned by function(), in the order they're - completed rather than the order the coroutines were launched. - - Iteration stops when we've yielded results for each arguments tuple in - iterable. Unlike wait_all() and process_all(), this function does not - wait for any previously-submitted execute() or execute_async() calls. - - Results are temporarily buffered in a queue. If you pass qsize=, this - value is used to limit the max size of the queue: an attempt to buffer - too many results will suspend the completed CoroutinePool coroutine - until the requesting coroutine (the caller of generate_results()) has - retrieved one or more results by calling this generator-iterator's - next(). - - If any coroutine raises an uncaught exception, that exception will - propagate to the requesting coroutine via the corresponding next() call. - - What I particularly want these tests to illustrate is that using this - generator function: - - for result in generate_results(function, iterable): - # ... do something with result ... - - executes coroutines at least as aggressively as the classic eventlet - idiom: - - events = [pool.execute(function, *args) for args in iterable] - for event in events: - result = event.wait() - # ... do something with result ... - - even without a distinct event object for every arg tuple in iterable, - and despite the funny flow control from interleaving launches of new - coroutines with yields of completed coroutines' results. - - (The use case that makes this function preferable to the classic idiom - above is when the iterable, which may itself be a generator, produces - millions of items.) - - >>> from eventlet import coros - >>> import string - >>> pool = coros.CoroutinePool(max_size=5) - >>> pausers = [coros.event() for x in xrange(2)] - >>> def longtask(evt, desc): - ... print "%s woke up with %s" % (desc, evt.wait()) - ... - >>> pool.launch_all(longtask, zip(pausers, "AB")) - >>> def quicktask(desc): - ... print "returning %s" % desc - ... return desc - ... - - (Instead of using a for loop, step through generate_results() - items individually to illustrate timing) - - >>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase)) - >>> print step.next() - returning a - returning b - returning c - a - >>> print step.next() - b - >>> print step.next() - c - >>> print step.next() - returning d - returning e - returning f - d - >>> pausers[0].send("A") - >>> print step.next() - e - >>> print step.next() - f - >>> print step.next() - A woke up with A - returning g - returning h - returning i - g - >>> print "".join([step.next() for x in xrange(3)]) - returning j - returning k - returning l - returning m - hij - >>> pausers[1].send("B") - >>> print "".join([step.next() for x in xrange(4)]) - B woke up with B - returning n - returning o - returning p - returning q - klmn - """ - # Get an iterator because of our funny nested loop below. Wrap the - # iterable in enumerate() so we count items that come through. - tuples = iter(enumerate(iterable)) - # If the iterable is empty, this whole function is a no-op, and we can - # save ourselves some grief by just quitting out. In particular, once - # we enter the outer loop below, we're going to wait on the queue -- - # but if we launched no coroutines with that queue as the destination, - # we could end up waiting a very long time. - try: - index, args = tuples.next() - except StopIteration: - return - # From this point forward, 'args' is the current arguments tuple and - # 'index+1' counts how many such tuples we've seen. - # This implementation relies on the fact that _execute() accepts an - # event-like object, and -- unless it's None -- the completed - # coroutine calls send(result). We slyly pass a queue rather than an - # event -- the same queue instance for all coroutines. This is why our - # queue interface intentionally resembles the event interface. - q = coros.queue(max_size=qsize) - # How many results have we yielded so far? - finished = 0 - # This first loop is only until we've launched all the coroutines. Its - # complexity is because if iterable contains more args tuples than the - # size of our pool, attempting to _execute() the (poolsize+1)th - # coroutine would suspend until something completes and send()s its - # result to our queue. But to keep down queue overhead and to maximize - # responsiveness to our caller, we'd rather suspend on reading the - # queue. So we stuff the pool as full as we can, then wait for - # something to finish, then stuff more coroutines into the pool. - try: - while True: - # Before each yield, start as many new coroutines as we can fit. - # (The self.free() test isn't 100% accurate: if we happen to be - # executing in one of the pool's coroutines, we could _execute() - # without waiting even if self.free() reports 0. See _execute().) - # The point is that we don't want to wait in the _execute() call, - # we want to wait in the q.wait() call. - # IMPORTANT: at start, and whenever we've caught up with all - # coroutines we've launched so far, we MUST iterate this inner - # loop at least once, regardless of self.free() -- otherwise the - # q.wait() call below will deadlock! - # Recall that index is the index of the NEXT args tuple that we - # haven't yet launched. Therefore it counts how many args tuples - # we've launched so far. - while self.free() > 0 or finished == index: - # Just like the implementation of execute_async(), save that - # we're passing our queue instead of None as the "event" to - # which to send() the result. - self._execute(q, function, args, {}) - # We've consumed that args tuple, advance to next. - index, args = tuples.next() - # Okay, we've filled up the pool again, yield a result -- which - # will probably wait for a coroutine to complete. Although we do - # have q.ready(), so we could iterate without waiting, we avoid - # that because every yield could involve considerable real time. - # We don't know how long it takes to return from yield, so every - # time we do, take the opportunity to stuff more requests into the - # pool before yielding again. - yield q.wait() - # Be sure to count results so we know when to stop! - finished += 1 - except StopIteration: - pass - # Here we've exhausted the input iterable. index+1 is the total number - # of coroutines we've launched. We probably haven't yielded that many - # results yet. Wait for the rest of the results, yielding them as they - # arrive. - while finished < index + 1: - yield q.wait() - finished += 1 - if __name__=='__main__': import doctest doctest.testmod() diff --git a/greentest/pools_test.py b/greentest/pools_test.py index 3b70560..3fffdf0 100644 --- a/greentest/pools_test.py +++ b/greentest/pools_test.py @@ -240,190 +240,6 @@ class TestFan(tests.TestCase): self.assertRaises(pools.SomeFailed, self.pool.fan, my_failing_callable, range(4)) -class TestCoroutinePool(tests.TestCase): - mode = 'static' - def setUp(self): - # raise an exception if we're waiting forever - self._cancel_timeout = api.exc_after(1, TestTookTooLong()) - - def tearDown(self): - self._cancel_timeout.cancel() - - def test_execute_async(self): - done = coros.event() - def some_work(): - done.send() - pool = pools.CoroutinePool(0, 2) - pool.execute_async(some_work) - done.wait() - - def test_execute(self): - value = 'return value' - def some_work(): - return value - pool = pools.CoroutinePool(0, 2) - worker = pool.execute(some_work) - self.assertEqual(value, worker.wait()) - - def test_multiple_coros(self): - evt = coros.event() - results = [] - def producer(): - results.append('prod') - evt.send() - - def consumer(): - results.append('cons1') - evt.wait() - results.append('cons2') - - pool = pools.CoroutinePool(0, 2) - done = pool.execute(consumer) - pool.execute_async(producer) - done.wait() - self.assertEquals(['cons1', 'prod', 'cons2'], results) - - def test_timer_cancel(self): - def some_work(): - t = timer.Timer(5, lambda: None) - t.autocancellable = True - t.schedule() - return t - pool = pools.CoroutinePool(0, 2) - worker = pool.execute(some_work) - t = worker.wait() - api.sleep(0) - self.assertEquals(t.cancelled, True) - - def test_reentrant(self): - pool = pools.CoroutinePool(0,1) - def reenter(): - waiter = pool.execute(lambda a: a, 'reenter') - self.assertEqual('reenter', waiter.wait()) - - outer_waiter = pool.execute(reenter) - outer_waiter.wait() - - evt = coros.event() - def reenter_async(): - pool.execute_async(lambda a: a, 'reenter') - evt.send('done') - - pool.execute_async(reenter_async) - evt.wait() - - def test_horrible_main_loop_death(self): - # testing the case that causes the run_forever - # method to exit unwantedly - pool = pools.CoroutinePool(min_size=1, max_size=1) - def crash(*args, **kw): - raise RuntimeError("Whoa") - class FakeFile(object): - write = crash - - # we're going to do this by causing the traceback.print_exc in - # safe_apply to raise an exception and thus exit _main_loop - normal_err = sys.stderr - try: - sys.stderr = FakeFile() - waiter = pool.execute(crash) - self.assertRaises(RuntimeError, waiter.wait) - # the pool should have something free at this point since the - # waiter returned - self.assertEqual(pool.free(), 1) - # shouldn't block when trying to get - t = api.exc_after(0.1, api.TimeoutError) - self.assert_(pool.get()) - t.cancel() - finally: - sys.stderr = normal_err - - def test_track_events(self): - pool = pools.CoroutinePool(track_events=True) - for x in range(6): - pool.execute(lambda n: n, x) - for y in range(6): - pool.wait() - - def test_track_slow_event(self): - pool = pools.CoroutinePool(track_events=True) - def slow(): - api.sleep(0.1) - return 'ok' - pool.execute(slow) - self.assertEquals(pool.wait(), 'ok') - - def test_channel_smash(self): - # The premise is that the coroutine in the pool exhibits an - # interest in receiving data from the channel, but then times - # out and gets recycled, so it ceases to care about what gets - # sent over the channel. The pool should be able to tell the - # channel about the sudden change of heart, or else, when we - # eventually do send something into the channel it will catch - # the coroutine pool's coroutine in an awkward place, losing - # the data that we're sending. - from eventlet import pools - pool = pools.CoroutinePool(min_size=1, max_size=1) - tp = pools.TokenPool(max_size=1) - token = tp.get() # empty pool - def do_receive(tp): - api.exc_after(0, RuntimeError()) - try: - t = tp.get() - self.fail("Shouldn't have recieved anything from the pool") - except RuntimeError: - return 'timed out' - - # the execute makes the pool expect that coroutine, but then - # immediately cuts bait - e1 = pool.execute(do_receive, tp) - self.assertEquals(e1.wait(), 'timed out') - - # the pool can get some random item back - def send_wakeup(tp): - tp.put('wakeup') - api.spawn(send_wakeup, tp) - - # now we ask the pool to run something else, which should not - # be affected by the previous send at all - def resume(): - return 'resumed' - e2 = pool.execute(resume) - self.assertEquals(e2.wait(), 'resumed') - - # we should be able to get out the thing we put in there, too - self.assertEquals(tp.get(), 'wakeup') - - def test_channel_death(self): - # In here, we have a coroutine trying to receive data from a - # channel, but timing out immediately and dying. The channel - # should be smart enough to not try to send data to a dead - # coroutine, because if it tries to, it'll lose the data. - from eventlet import pools - tp = pools.TokenPool(max_size=1) - token = tp.get() - e1 = coros.event() - def do_receive(evt, tp): - api.exc_after(0, RuntimeError()) - try: - t = tp.get() - evt.send(t) - except RuntimeError: - evt.send('timed out') - - # the execute gets the pool to add a waiter, but then kills - # itself off - api.spawn(do_receive, e1, tp) - self.assertEquals(e1.wait(), 'timed out') - - def send_wakeup(tp): - tp.put('wakeup') - api.spawn(send_wakeup, tp) - - # should be able to retrieve the message - self.assertEquals(tp.get(), 'wakeup') - - if __name__ == '__main__': tests.main()