From 0316c27062adc09bb5837a433640d20a6552c7f2 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 24 Dec 2009 15:29:34 -0500 Subject: [PATCH] Renamed Parallel to GreenPool, added GreenPile abstraction for handling batches of work. --- eventlet/parallel.py | 169 ++++++++++++------- tests/parallel_test.py | 361 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 452 insertions(+), 78 deletions(-) diff --git a/eventlet/parallel.py b/eventlet/parallel.py index 180177e..dda970f 100644 --- a/eventlet/parallel.py +++ b/eventlet/parallel.py @@ -1,36 +1,35 @@ from eventlet.coros import Semaphore, Queue, Event -from eventlet.api import spawn, getcurrent +from eventlet import api +from collections import deque import sys -__all__ = ['Parallel'] +__all__ = ['GreenPool', 'GreenPile'] -class Parallel(object): +class GreenPool(object): """ The Parallel class allows you to easily control coroutine concurrency. """ - def __init__(self, max_size): - self.max_size = max_size + def __init__(self, size): + self.size = size self.coroutines_running = set() - self.sem = Semaphore(max_size) + self.sem = Semaphore(size) self.no_coros_running = Event() - self._results = Queue() - def resize(self, new_max_size): + def resize(self, new_size): """ Change the max number of coroutines doing work at any given time. - If resize is called when there are more than *new_max_size* + If resize is called when there are more than *new_size* coroutines already working on tasks, they will be allowed to complete but no new tasks will be allowed to get launched until enough coroutines finish their - tasks to drop the overall quantity below *new_max_size*. Until then, the + tasks to drop the overall quantity below *new_size*. Until then, the return value of free() will be negative. """ - max_size_delta = new_max_size - self.max_size - self.sem.counter += max_size_delta - self.max_size = new_max_size + size_delta = new_size - self.size + self.sem.counter += size_delta + self.size = new_size - @property - def current_size(self): - """ The current size is the number of coroutines that are currently - executing functions in the Parallel's pool.""" + def running(self): + """ Returns the number of coroutines that are currently executing + functions in the Parallel's pool.""" return len(self.coroutines_running) def free(self): @@ -40,14 +39,7 @@ class Parallel(object): def spawn(self, func, *args, **kwargs): """Run func(*args, **kwargs) in its own green thread. """ - return self._spawn(False, func, *args, **kwargs) - - def spawn_q(self, func, *args, **kwargs): - """Run func(*args, **kwargs) in its own green thread. - - The results of func are stuck in the results() iterator. - """ - self._spawn(True, func, *args, **kwargs) + return self._spawn(func, *args, **kwargs) def spawn_n(self, func, *args, **kwargs): """ Create a coroutine to run func(*args, **kwargs). @@ -55,60 +47,115 @@ class Parallel(object): Returns None; the results of the function are not retrievable. The results of the function are not put into the results() iterator. """ - self._spawn(False, func, *args, **kwargs) + self._spawn(func, *args, **kwargs) - def _spawn(self, send_result, func, *args, **kwargs): + def _spawn(self, func, *args, **kwargs): # if reentering an empty pool, don't try to wait on a coroutine freeing # itself -- instead, just execute in the current coroutine - current = getcurrent() + current = api.getcurrent() if self.sem.locked() and current in self.coroutines_running: - func(*args, **kwargs) + # a bit hacky to use the GT without switching to it + gt = api.GreenThread(current) + gt.main(func, args, kwargs) + return gt else: self.sem.acquire() - p = spawn(func, *args, **kwargs) + gt = api.spawn(func, *args, **kwargs) if not self.coroutines_running: self.no_coros_running = Event() - self.coroutines_running.add(p) - p.link(self._spawn_done, send_result=send_result, coro=p) - return p + self.coroutines_running.add(gt) + gt.link(self._spawn_done, coro=gt) + return gt def waitall(self): """Waits until all coroutines in the pool are finished working.""" self.no_coros_running.wait() - def _spawn_done(self, result=None, exc=None, send_result=False, coro=None): + def _spawn_done(self, result=None, exc=None, coro=None): self.sem.release() self.coroutines_running.remove(coro) - if send_result: - self._results.send(result) # if done processing (no more work is waiting for processing), # send StopIteration so that the queue knows it's done - if self.sem.balance == self.max_size: - if send_result: - self._results.send_exception(StopIteration) - self.no_coros_running.send(None) - - def wait(self): - """Wait for the next execute in the pool to complete, - and return the result.""" - return self.results.wait() + if self.sem.balance == self.size: + self.no_coros_running.send(None) + + def waiting(self): + """Return the number of coroutines waiting to execute. + """ + if self.sem.balance < 0: + return -self.sem.balance + else: + return 0 + + +try: + next +except NameError: + def next(it): + try: + it.next() + except AttributeError: + raise TypeError("%s object is not an iterator" % type(it)) - def results(self): - """ Returns an iterator over the results from the worker coroutines.""" - return self._results +class GreenPile(object): + def __init__(self, size_or_pool): + if isinstance(size_or_pool, GreenPool): + self.pool = size_or_pool + else: + self.pool = GreenPool(size_or_pool) + self.waiters = Queue() + self.counter = 0 + + def spawn(self, func, *args, **kw): + self.counter += 1 + try: + gt = self.pool.spawn(func, *args, **kw) + self.waiters.send(gt) + except: + self.counter -= 1 + raise - def _do_spawn_all(self, func, iterable): - for i in iterable: - # if the list is composed of single arguments, use those - if not isinstance(i, (tuple, list)): - self.spawn_q(func, i) - else: - self.spawn_q(func, *i) + def __iter__(self): + return self - def spawn_all(self, func, iterable): - """ Applies *func* over every item in *iterable* using the concurrency - present in the pool. This function is a generator which yields the - results of *func* as applied to the members of the iterable.""" + def next(self): + if self.counter == 0: + raise StopIteration() + try: + return self.waiters.wait().wait() + finally: + self.counter -= 1 + + def _do_map(self, func, iterables): + while True: + try: + i = map(next, iterables) + self.spawn(func, *i) + except StopIteration: + break + + def imap(self, function, *iterables): + """This is the same as itertools.imap, except that *func* is executed + with the specified concurrency. - spawn(self._do_spawn_all, func, iterable) - return self.results() \ No newline at end of file + Make an iterator that computes the *function* using arguments from + each of the *iterables*, and using the coroutine concurrency specified + in the GreenPile's constructor. Like map() except that it returns an + iterator instead of a list and that it stops when the shortest iterable + is exhausted instead of filling in None for shorter iterables. + """ + if function is None: + function = lambda *a: a + # spawn first item to prime the pump + try: + it = map(iter, iterables) + i = map(next, it) + self.spawn(function, *i) + except StopIteration: + # if the iterable has no items, we need + # to defer the StopIteration till someone + # iterates over us + self.spawn(lambda: next(iter([]))) + # spin off a coroutine to launch the rest of the items + api.spawn(self._do_map, function, it) + return self diff --git a/tests/parallel_test.py b/tests/parallel_test.py index 497bcf3..22afcae 100644 --- a/tests/parallel_test.py +++ b/tests/parallel_test.py @@ -1,33 +1,33 @@ -from eventlet import api, parallel -import unittest +import gc +import random -class Spawn(unittest.TestCase): +from eventlet import api, hubs, parallel, coros +import tests + +class Spawn(tests.LimitedTestCase): + # TODO: move this test elsewhere def test_simple(self): def f(a, b=None): return (a,b) - coro = parallel.spawn(f, 1, b=2) - self.assertEquals(coro.wait(), (1,2)) + gt = parallel.api. spawn(f, 1, b=2) + self.assertEquals(gt.wait(), (1,2)) def passthru(a): api.sleep(0.01) return a -class Parallel(unittest.TestCase): - def test_parallel(self): - p = parallel.Parallel(4) +class GreenPool(tests.LimitedTestCase): + def test_spawn(self): + p = parallel.GreenPool(4) + waiters = [] for i in xrange(10): - p.spawn_q(passthru, i) - result_list = list(p.results()) - self.assertEquals(result_list, range(10)) - - def test_spawn_all(self): - p = parallel.Parallel(4) - result_list = list(p.spawn_all(passthru, xrange(10))) - self.assertEquals(result_list, range(10)) + waiters.append(p.spawn(passthru, i)) + results = [waiter.wait() for waiter in waiters] + self.assertEquals(results, list(xrange(10))) def test_spawn_n(self): - p = parallel.Parallel(4) + p = parallel.GreenPool(4) results_closure = [] def do_something(a): api.sleep(0.01) @@ -36,4 +36,331 @@ class Parallel(unittest.TestCase): p.spawn(do_something, i) p.waitall() self.assertEquals(results_closure, range(10)) + + def test_waiting(self): + pool = parallel.GreenPool(1) + done = coros.Event() + def consume(): + done.wait() + def waiter(pool): + gt = pool.spawn(consume) + gt.wait() + + waiters = [] + self.assertEqual(pool.running(), 0) + waiters.append(api.spawn(waiter, pool)) + api.sleep(0) + self.assertEqual(pool.waiting(), 0) + waiters.append(api.spawn(waiter, pool)) + api.sleep(0) + self.assertEqual(pool.waiting(), 1) + waiters.append(api.spawn(waiter, pool)) + api.sleep(0) + self.assertEqual(pool.waiting(), 2) + self.assertEqual(pool.running(), 1) + done.send(None) + for w in waiters: + w.wait() + self.assertEqual(pool.waiting(), 0) + self.assertEqual(pool.running(), 0) + + def test_multiple_coros(self): + evt = coros.Event() + results = [] + def producer(): + results.append('prod') + evt.send() + def consumer(): + results.append('cons1') + evt.wait() + results.append('cons2') + pool = parallel.GreenPool(2) + done = pool.spawn(consumer) + pool.spawn_n(producer) + done.wait() + self.assertEquals(['cons1', 'prod', 'cons2'], results) + + def test_timer_cancel(self): + # this test verifies that local timers are not fired + # outside of the context of the spawn + timer_fired = [] + def fire_timer(): + timer_fired.append(True) + def some_work(): + hubs.get_hub().schedule_call_local(0, fire_timer) + pool = parallel.GreenPool(2) + worker = pool.spawn(some_work) + worker.wait() + api.sleep(0) + api.sleep(0) + self.assertEquals(timer_fired, []) + + def test_reentrant(self): + pool = parallel.GreenPool(1) + def reenter(): + waiter = pool.spawn(lambda a: a, 'reenter') + self.assertEqual('reenter', waiter.wait()) + + outer_waiter = pool.spawn(reenter) + outer_waiter.wait() + + evt = coros.Event() + def reenter_async(): + pool.spawn_n(lambda a: a, 'reenter') + evt.send('done') + + pool.spawn_n(reenter_async) + self.assertEquals('done', evt.wait()) + + def assert_pool_has_free(self, pool, num_free): + def wait_long_time(e): + e.wait() + timer = api.exc_after(1, api.TimeoutError) + try: + evt = coros.Event() + for x in xrange(num_free): + pool.spawn(wait_long_time, evt) + # if the pool has fewer free than we expect, + # then we'll hit the timeout error + finally: + timer.cancel() + + # if the runtime error is not raised it means the pool had + # some unexpected free items + timer = api.exc_after(0, RuntimeError) + try: + self.assertRaises(RuntimeError, pool.spawn, wait_long_time, evt) + finally: + timer.cancel() + + # clean up by causing all the wait_long_time functions to return + evt.send(None) + api.sleep(0) + api.sleep(0) + + def test_resize(self): + pool = parallel.GreenPool(2) + evt = coros.Event() + def wait_long_time(e): + e.wait() + pool.spawn(wait_long_time, evt) + pool.spawn(wait_long_time, evt) + self.assertEquals(pool.free(), 0) + self.assertEquals(pool.running(), 2) + self.assert_pool_has_free(pool, 0) + + # verify that the pool discards excess items put into it + pool.resize(1) + + # cause the wait_long_time functions to return, which will + # trigger puts to the pool + evt.send(None) + api.sleep(0) + api.sleep(0) + + self.assertEquals(pool.free(), 1) + self.assertEquals(pool.running(), 0) + self.assert_pool_has_free(pool, 1) + + # resize larger and assert that there are more free items + pool.resize(2) + self.assertEquals(pool.free(), 2) + self.assertEquals(pool.running(), 0) + self.assert_pool_has_free(pool, 2) + + def test_pool_smash(self): + # The premise is that a coroutine in a Pool tries to get a token out + # of a token pool but times out before getting the token. We verify + # that neither pool is adversely affected by this situation. + from eventlet import pools + pool = parallel.GreenPool(1) + tp = pools.TokenPool(max_size=1) + token = tp.get() # empty out the pool + def do_receive(tp): + timer = api.exc_after(0, RuntimeError()) + try: + t = tp.get() + self.fail("Shouldn't have recieved anything from the pool") + except RuntimeError: + return 'timed out' + else: + timer.cancel() + + # the spawn makes the token pool expect that coroutine, but then + # immediately cuts bait + e1 = pool.spawn(do_receive, tp) + self.assertEquals(e1.wait(), 'timed out') + + # the pool can get some random item back + def send_wakeup(tp): + tp.put('wakeup') + gt = api.spawn(send_wakeup, tp) + + # now we ask the pool to run something else, which should not + # be affected by the previous send at all + def resume(): + return 'resumed' + e2 = pool.spawn(resume) + self.assertEquals(e2.wait(), 'resumed') + + # we should be able to get out the thing we put in there, too + self.assertEquals(tp.get(), 'wakeup') + gt.wait() + + def test_spawn_n_2(self): + p = parallel.GreenPool(2) + self.assertEqual(p.free(), 2) + r = [] + def foo(a): + r.append(a) + gt = p.spawn(foo, 1) + self.assertEqual(p.free(), 1) + gt.wait() + self.assertEqual(r, [1]) + api.sleep(0) + self.assertEqual(p.free(), 2) + + #Once the pool is exhausted, spawning forces a yield. + p.spawn_n(foo, 2) + self.assertEqual(1, p.free()) + self.assertEqual(r, [1]) + + p.spawn_n(foo, 3) + self.assertEqual(0, p.free()) + self.assertEqual(r, [1]) + + p.spawn_n(foo, 4) + self.assertEqual(set(r), set([1,2,3])) + api.sleep(0) + self.assertEqual(set(r), set([1,2,3,4])) + +class GreenPile(tests.LimitedTestCase): + def test_imap(self): + p = parallel.GreenPile(4) + result_list = list(p.imap(passthru, xrange(10))) + self.assertEquals(result_list, list(xrange(10))) + + def test_empty_map(self): + p = parallel.GreenPile(4) + result_iter = p.imap(passthru, []) + self.assertRaises(StopIteration, result_iter.next) + + def test_pile(self): + p = parallel.GreenPile(4) + for i in xrange(10): + p.spawn(passthru, i) + result_list = list(p) + self.assertEquals(result_list, list(xrange(10))) + + def test_pile_spawn_times_out(self): + p = parallel.GreenPile(4) + for i in xrange(4): + p.spawn(passthru, i) + # now it should be full and this should time out + api.exc_after(0, api.TimeoutError) + self.assertRaises(api.TimeoutError, p.spawn, passthru, "time out") + # verify that the spawn breakage didn't interrupt the sequence + # and terminates properly + for i in xrange(4,10): + p.spawn(passthru, i) + self.assertEquals(list(p), list(xrange(10))) + + def test_constructing_from_pool(self): + pool = parallel.GreenPool(2) + pile1 = parallel.GreenPile(pool) + pile2 = parallel.GreenPile(pool) + def bunch_of_work(pile, unique): + for i in xrange(10): + pile.spawn(passthru, i + unique) + api.spawn(bunch_of_work, pile1, 0) + api.spawn(bunch_of_work, pile2, 100) + api.sleep(0) + self.assertEquals(list(pile2), list(xrange(100,110))) + self.assertEquals(list(pile1), list(xrange(10))) + + + +class StressException(Exception): + pass + +r = random.Random(0) +def pressure(arg): + while r.random() < 0.5: + api.sleep(r.random() * 0.001) + if r.random() < 0.8: + return arg + else: + raise StressException(arg) + +# TODO: skip these unless explicitly demanded by the user +class Stress(tests.SilencedTestCase): + # tests will take extra-long + TEST_TIMEOUT=10 + def spawn_memory(self, concurrency): + # checks that piles are strictly ordered + # and bounded in memory + p = parallel.GreenPile(concurrency) + def makework(count, unique): + for i in xrange(count): + token = (unique, i) + p.spawn(pressure, token) + + api.spawn(makework, 1000, 1) + api.spawn(makework, 1000, 2) + api.spawn(makework, 1000, 3) + p.spawn(pressure, (0,0)) + latest = [-1] * 4 + received = 0 + it = iter(p) + initial_obj_count = len(gc.get_objects()) + while True: + try: + i = it.next() + received += 1 + if received % 10 == 0: + gc.collect() + objs_created = len(gc.get_objects()) - initial_obj_count + self.assert_(objs_created < 200 * concurrency, objs_created) + except StressException, exc: + i = exc[0] + except StopIteration: + break + unique, order = i + self.assert_(latest[unique] < order) + latest[unique] = order + + def test_memory_5(self): + self.spawn_memory(5) + + def test_memory_50(self): + self.spawn_memory(50) + + def test_memory_500(self): + self.spawn_memory(50) + + def test_with_intpool(self): + from eventlet import pools + class IntPool(pools.Pool): + def create(self): + self.current_integer = getattr(self, 'current_integer', 0) + 1 + return self.current_integer + + def subtest(intpool_size, pool_size, num_executes): + def run(int_pool): + token = int_pool.get() + api.sleep(0.0001) + int_pool.put(token) + return token + + int_pool = IntPool(max_size=intpool_size) + pool = parallel.GreenPool(pool_size) + for ix in xrange(num_executes): + pool.spawn(run, int_pool) + pool.waitall() + + subtest(4, 7, 7) + subtest(50, 75, 100) + for isize in (10, 20, 30, 40, 50): + for psize in (5, 25, 35, 50): + subtest(isize, psize, psize) \ No newline at end of file