diff --git a/eventlet/api.py b/eventlet/api.py index 9459228..2985c9e 100644 --- a/eventlet/api.py +++ b/eventlet/api.py @@ -128,31 +128,44 @@ def _spawn_startup(cb, args, kw, cancel=None): cancel() return cb(*args, **kw) -def _spawn(g): - g.parent = greenlet.getcurrent() - g.switch() + +class ResultGreenlet(Greenlet): + def __init__(self): + Greenlet.__init__(self, self.main) + from eventlet import coros + self._exit_event = coros.event() + + def wait(self): + return self._exit_event.wait() + + def link(self, func): + self._exit_funcs = getattr(self, '_exit_funcs', []) + self._exit_funcs.append(func) + + def main(self, *a): + function, args, kwargs = a + try: + result = function(*args, **kwargs) + except: + self._exit_event.send_exception(*sys.exc_info()) + for f in getattr(self, '_exit_funcs', []): + f(self, exc=sys.exc_info()) + else: + self._exit_event.send(result) + for f in getattr(self, '_exit_funcs', []): + f(self, result) -def spawn(function, *args, **kwds): - """Create a new coroutine, or cooperative thread of control, within which - to execute *function*. - - The *function* will be called with the given *args* and keyword arguments - *kwds* and will remain in control unless it cooperatively yields by - calling a socket method or ``sleep()``. - - :func:`spawn` returns control to the caller immediately, and *function* - will be called in a future main loop iteration. - - An uncaught exception in *function* or any child will terminate the new - coroutine with a log message. +def spawn(func, *args, **kwargs): + """ Create a coroutine to run func(*args, **kwargs) without any + way to retrieve the results. Returns the greenlet object. """ - # killable - t = None - g = Greenlet(_spawn_startup) - t = get_hub_().schedule_call_global(0, _spawn, g) - g.switch(function, args, kwds, t.cancel) + g = ResultGreenlet() + hub = get_hub() + g.parent = hub.greenlet + hub.schedule_call_global(0, g.switch, func, args, kwargs) return g + def kill(g, *throw_args): get_hub_().schedule_call_global(0, g.throw, *throw_args) diff --git a/eventlet/coros.py b/eventlet/coros.py index 713ea3b..63288bd 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -469,6 +469,12 @@ class Queue(object): def waiting(self): return len(self._waiters) + + def __iter__(self): + return self + + def next(self): + return self.wait() class Channel(object): diff --git a/eventlet/parallel.py b/eventlet/parallel.py new file mode 100644 index 0000000..2247e92 --- /dev/null +++ b/eventlet/parallel.py @@ -0,0 +1,87 @@ +from eventlet.coros import Semaphore, Queue +from eventlet.api import spawn, getcurrent +import sys + +__all__ = ['Parallel'] + +class Parallel(object): + """ The Parallel class allows you to easily control coroutine concurrency. + """ + def __init__(self, max_size): + self.max_size = max_size + self.coroutines_running = set() + self.sem = Semaphore(max_size) + self._results = Queue() + + def resize(self, new_max_size): + """ Change the max number of coroutines doing work at any given time. + + If resize is called when there are more than *new_max_size* + coroutines already working on tasks, they will be allowed to complete but no + new tasks will be allowed to get launched until enough coroutines finish their + tasks to drop the overall quantity below *new_max_size*. Until then, the + return value of free() will be negative. + """ + max_size_delta = new_max_size - self.max_size + self.sem.counter += max_size_delta + self.max_size = new_max_size + + @property + def current_size(self): + """ The current size is the number of coroutines that are currently + executing functions in the Parallel's pool.""" + return len(self.coroutines_running) + + def free(self): + """ Returns the number of coroutines available for use.""" + return self.sem.counter + + def _coro_done(self, coro, result, exc=None): + self.sem.release() + self.coroutines_running.remove(coro) + self._results.send(result) + # if done processing (no more work is being done), + # send StopIteration so that the queue knows it's done + if self.sem.balance == self.max_size: + self._results.send_exception(StopIteration) + + def spawn(self, func, *args, **kwargs): + """ Create a coroutine to run func(*args, **kwargs). Returns a + Coro object that can be used to retrieve the results of the function. + """ + # if reentering an empty pool, don't try to wait on a coroutine freeing + # itself -- instead, just execute in the current coroutine + current = getcurrent() + if self.sem.locked() and current in self.coroutines_running: + func(*args, **kwargs) + else: + self.sem.acquire() + p = spawn(func, *args, **kwargs) + self.coroutines_running.add(p) + p.link(self._coro_done) + + return p + + def wait(self): + """Wait for the next execute in the pool to complete, + and return the result.""" + return self.results.wait() + + def results(self): + """ Returns an iterator over the results from the worker coroutines.""" + return self._results + + def _do_spawn_all(self, func, iterable): + for i in iterable: + if not isinstance(i, tuple): + self.spawn(func, i) + else: + self.spawn(func, *i) + + def spawn_all(self, func, iterable): + """ Applies *func* over every item in *iterable* using the concurrency + present in the pool. This function is a generator which yields the + results of *func* as applied to the members of the iterable.""" + + spawn(self._do_spawn_all, func, iterable) + return self.results() \ No newline at end of file diff --git a/tests/parallel_test.py b/tests/parallel_test.py new file mode 100644 index 0000000..0096288 --- /dev/null +++ b/tests/parallel_test.py @@ -0,0 +1,27 @@ +from eventlet import api, parallel +import unittest + +class Spawn(unittest.TestCase): + def test_simple(self): + def f(a, b=None): + return (a,b) + + coro = parallel.spawn(f, 1, b=2) + self.assertEquals(coro.wait(), (1,2)) + +def passthru(a): + api.sleep(0.01) + return a + +class Parallel(unittest.TestCase): + def test_parallel(self): + p = parallel.Parallel(4) + for i in xrange(10): + p.spawn(passthru, i) + result_list = list(p.results()) + self.assertEquals(result_list, range(10)) + + def test_spawn_all(self): + p = parallel.Parallel(4) + result_list = list(p.spawn_all(passthru, xrange(10))) + self.assertEquals(result_list, range(10))