From ba0d8d0c5b5fc1389c83e8c9983d02f72c61e7b8 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Sat, 3 Oct 2009 19:48:29 -0700 Subject: [PATCH 1/3] Initial commit of Parallel implementation --- eventlet/coros.py | 6 +++ eventlet/parallel.py | 111 +++++++++++++++++++++++++++++++++++++++++ tests/parallel_test.py | 21 ++++++++ 3 files changed, 138 insertions(+) create mode 100644 eventlet/parallel.py create mode 100644 tests/parallel_test.py diff --git a/eventlet/coros.py b/eventlet/coros.py index 3e36c73..7b359de 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -486,6 +486,12 @@ class Queue(object): def waiting(self): return len(self._waiters) + + def __iter__(self): + return self + + def next(self): + return self.wait() class Channel(object): diff --git a/eventlet/parallel.py b/eventlet/parallel.py new file mode 100644 index 0000000..b435948 --- /dev/null +++ b/eventlet/parallel.py @@ -0,0 +1,111 @@ +from eventlet import api,coros +import sys + +__all__ = ['spawn', 'detach', 'Parallel'] + +class ResultGreenlet(api.Greenlet): + def __init__(self): + api.Greenlet.__init__(self, self.main) + from eventlet import coros + self._exit_event = coros.event() + + def wait(self): + return self._exit_event.wait() + + def link(self, func): + self._exit_funcs = getattr(self, '_exit_funcs', []) + self._exit_funcs.append(func) + + def main(self, *a): + function, args, kwargs = a + try: + result = function(*args, **kwargs) + except: + self._exit_event.send_exception(*sys.exc_info()) + for f in getattr(self, '_exit_funcs', []): + f(self, exc=sys.exc_info()) + else: + self._exit_event.send(result) + for f in getattr(self, '_exit_funcs', []): + f(self, result) + + + +def spawn(func, *args, **kwargs): + """ Create a coroutine to run func(*args, **kwargs) without any + way to retrieve the results. Returns the greenlet object. + """ + # TODO: relying on the existence of hub.greenlet may lead to sadness? + g = ResultGreenlet() + g.parent = api.get_hub().greenlet + api.get_hub().schedule_call_global(0, g.switch, func, args, kwargs) + return g + + +class Parallel(object): + """ The Parallel class allows you to easily control coroutine concurrency. + """ + def __init__(self, max_size): + self.max_size = max_size + self.coroutines_running = set() + self.sem = coros.Semaphore(max_size) + self._results = coros.Queue() + + def resize(self, new_max_size): + """ Change the max number of coroutines doing work at any given time. + + If resize is called when there are more than *new_max_size* + coroutines already working on tasks, they will be allowed to complete but no + new tasks will be allowed to get launched until enough coroutines finish their + tasks to drop the overall quantity below *new_max_size*. Until then, the + return value of free() will be negative. + """ + max_size_delta = new_max_size - self.max_size + self.sem.counter += max_size_delta + self.max_size = new_max_size + + @property + def current_size(self): + """ The current size is the number of coroutines that are currently + executing functions in the Parallel's pool.""" + return len(self.coroutines_running) + + def free(self): + """ Returns the number of coroutines available for use.""" + return self.sem.counter + + def _coro_done(self, coro, result, exc=None): + self.sem.release() + self.coroutines_running.remove(coro) + self._results.send(result) + # if done processing (no more work is being done), + # send StopIteration so that the queue knows it's done + if self.sem.balance == self.max_size: + self._results.send_exception(StopIteration) + + def spawn(self, func, *args, **kwargs): + """ Create a coroutine to run func(*args, **kwargs). Returns a + Coro object that can be used to retrieve the results of the function. + """ + # if reentering an empty pool, don't try to wait on a coroutine freeing + # itself -- instead, just execute in the current coroutine + current = api.getcurrent() + if self.sem.locked() and current in self.coroutines_running: + func(*args, **kwargs) + else: + self.sem.acquire() + p = spawn(func, *args, **kwargs) + self.coroutines_running.add(p) + p.link(self._coro_done) + + return p + + def wait(self): + """Wait for the next execute in the pool to complete, + and return the result.""" + return self.results.wait() + + def results(self): + """ Returns an iterator over the results collected so far; when iterating over + the result set, """ + return self._results diff --git a/tests/parallel_test.py b/tests/parallel_test.py new file mode 100644 index 0000000..50e5656 --- /dev/null +++ b/tests/parallel_test.py @@ -0,0 +1,21 @@ +from eventlet import api, parallel +import unittest + +class Spawn(unittest.TestCase): + def test_simple(self): + def f(a, b=None): + return (a,b) + + coro = parallel.spawn(f, 1, b=2) + self.assertEquals(coro.wait(), (1,2)) + +class Parallel(unittest.TestCase): + def test_parallel(self): + def f(a): + api.sleep(0.01) + return a + p = parallel.Parallel(4) + for i in xrange(10): + p.spawn(f, i) + result_list = list(p.results()) + self.assertEquals(result_list, range(10)) \ No newline at end of file From ab0ea083766d3188fa8344a772ac4bcde3451646 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Sat, 3 Oct 2009 19:56:00 -0700 Subject: [PATCH 2/3] Replaced api.spawn with Parallel spawn. Not real happy about keeping API around, but refactoring to remove the api module will have to wait. --- eventlet/api.py | 55 +++++++++++++++++++++++++++----------------- eventlet/parallel.py | 50 +++++----------------------------------- 2 files changed, 40 insertions(+), 65 deletions(-) diff --git a/eventlet/api.py b/eventlet/api.py index fe67d7b..7e1a609 100644 --- a/eventlet/api.py +++ b/eventlet/api.py @@ -225,31 +225,44 @@ def _spawn_startup(cb, args, kw, cancel=None): cancel() return cb(*args, **kw) -def _spawn(g): - g.parent = greenlet.getcurrent() - g.switch() + +class ResultGreenlet(Greenlet): + def __init__(self): + Greenlet.__init__(self, self.main) + from eventlet import coros + self._exit_event = coros.event() + + def wait(self): + return self._exit_event.wait() + + def link(self, func): + self._exit_funcs = getattr(self, '_exit_funcs', []) + self._exit_funcs.append(func) + + def main(self, *a): + function, args, kwargs = a + try: + result = function(*args, **kwargs) + except: + self._exit_event.send_exception(*sys.exc_info()) + for f in getattr(self, '_exit_funcs', []): + f(self, exc=sys.exc_info()) + else: + self._exit_event.send(result) + for f in getattr(self, '_exit_funcs', []): + f(self, result) -def spawn(function, *args, **kwds): - """Create a new coroutine, or cooperative thread of control, within which - to execute *function*. - - The *function* will be called with the given *args* and keyword arguments - *kwds* and will remain in control unless it cooperatively yields by - calling a socket method or ``sleep()``. - - :func:`spawn` returns control to the caller immediately, and *function* - will be called in a future main loop iteration. - - An uncaught exception in *function* or any child will terminate the new - coroutine with a log message. +def spawn(func, *args, **kwargs): + """ Create a coroutine to run func(*args, **kwargs) without any + way to retrieve the results. Returns the greenlet object. """ - # killable - t = None - g = Greenlet(_spawn_startup) - t = get_hub().schedule_call_global(0, _spawn, g) - g.switch(function, args, kwds, t.cancel) + g = ResultGreenlet() + hub = get_hub() + g.parent = hub.greenlet + hub.schedule_call_global(0, g.switch, func, args, kwargs) return g + def kill(g, *throw_args): get_hub().schedule_call_global(0, g.throw, *throw_args) diff --git a/eventlet/parallel.py b/eventlet/parallel.py index b435948..702da4a 100644 --- a/eventlet/parallel.py +++ b/eventlet/parallel.py @@ -1,46 +1,8 @@ -from eventlet import api,coros +from eventlet.coros import Semaphore, Queue +from eventlet.api import spawn, getcurrent import sys -__all__ = ['spawn', 'detach', 'Parallel'] - -class ResultGreenlet(api.Greenlet): - def __init__(self): - api.Greenlet.__init__(self, self.main) - from eventlet import coros - self._exit_event = coros.event() - - def wait(self): - return self._exit_event.wait() - - def link(self, func): - self._exit_funcs = getattr(self, '_exit_funcs', []) - self._exit_funcs.append(func) - - def main(self, *a): - function, args, kwargs = a - try: - result = function(*args, **kwargs) - except: - self._exit_event.send_exception(*sys.exc_info()) - for f in getattr(self, '_exit_funcs', []): - f(self, exc=sys.exc_info()) - else: - self._exit_event.send(result) - for f in getattr(self, '_exit_funcs', []): - f(self, result) - - - -def spawn(func, *args, **kwargs): - """ Create a coroutine to run func(*args, **kwargs) without any - way to retrieve the results. Returns the greenlet object. - """ - # TODO: relying on the existence of hub.greenlet may lead to sadness? - g = ResultGreenlet() - g.parent = api.get_hub().greenlet - api.get_hub().schedule_call_global(0, g.switch, func, args, kwargs) - return g - +__all__ = ['Parallel'] class Parallel(object): """ The Parallel class allows you to easily control coroutine concurrency. @@ -48,8 +10,8 @@ class Parallel(object): def __init__(self, max_size): self.max_size = max_size self.coroutines_running = set() - self.sem = coros.Semaphore(max_size) - self._results = coros.Queue() + self.sem = Semaphore(max_size) + self._results = Queue() def resize(self, new_max_size): """ Change the max number of coroutines doing work at any given time. @@ -89,7 +51,7 @@ class Parallel(object): """ # if reentering an empty pool, don't try to wait on a coroutine freeing # itself -- instead, just execute in the current coroutine - current = api.getcurrent() + current = getcurrent() if self.sem.locked() and current in self.coroutines_running: func(*args, **kwargs) else: From dc9b3e4e294351770b22d17f31e6db459b85fade Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Sun, 4 Oct 2009 00:17:49 -0700 Subject: [PATCH 3/3] Added spawn_all function and rudimentary test for it. --- eventlet/parallel.py | 18 ++++++++++++++++-- tests/parallel_test.py | 16 +++++++++++----- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/eventlet/parallel.py b/eventlet/parallel.py index 702da4a..2247e92 100644 --- a/eventlet/parallel.py +++ b/eventlet/parallel.py @@ -68,6 +68,20 @@ class Parallel(object): return self.results.wait() def results(self): - """ Returns an iterator over the results collected so far; when iterating over - the result set, """ + """ Returns an iterator over the results from the worker coroutines.""" return self._results + + def _do_spawn_all(self, func, iterable): + for i in iterable: + if not isinstance(i, tuple): + self.spawn(func, i) + else: + self.spawn(func, *i) + + def spawn_all(self, func, iterable): + """ Applies *func* over every item in *iterable* using the concurrency + present in the pool. This function is a generator which yields the + results of *func* as applied to the members of the iterable.""" + + spawn(self._do_spawn_all, func, iterable) + return self.results() \ No newline at end of file diff --git a/tests/parallel_test.py b/tests/parallel_test.py index 50e5656..0096288 100644 --- a/tests/parallel_test.py +++ b/tests/parallel_test.py @@ -8,14 +8,20 @@ class Spawn(unittest.TestCase): coro = parallel.spawn(f, 1, b=2) self.assertEquals(coro.wait(), (1,2)) + +def passthru(a): + api.sleep(0.01) + return a class Parallel(unittest.TestCase): def test_parallel(self): - def f(a): - api.sleep(0.01) - return a p = parallel.Parallel(4) for i in xrange(10): - p.spawn(f, i) + p.spawn(passthru, i) result_list = list(p.results()) - self.assertEquals(result_list, range(10)) \ No newline at end of file + self.assertEquals(result_list, range(10)) + + def test_spawn_all(self): + p = parallel.Parallel(4) + result_list = list(p.spawn_all(passthru, xrange(10))) + self.assertEquals(result_list, range(10))