Merge
This commit is contained in:
@@ -128,31 +128,44 @@ def _spawn_startup(cb, args, kw, cancel=None):
|
||||
cancel()
|
||||
return cb(*args, **kw)
|
||||
|
||||
def _spawn(g):
|
||||
g.parent = greenlet.getcurrent()
|
||||
g.switch()
|
||||
|
||||
class ResultGreenlet(Greenlet):
|
||||
def __init__(self):
|
||||
Greenlet.__init__(self, self.main)
|
||||
from eventlet import coros
|
||||
self._exit_event = coros.event()
|
||||
|
||||
def wait(self):
|
||||
return self._exit_event.wait()
|
||||
|
||||
def link(self, func):
|
||||
self._exit_funcs = getattr(self, '_exit_funcs', [])
|
||||
self._exit_funcs.append(func)
|
||||
|
||||
def main(self, *a):
|
||||
function, args, kwargs = a
|
||||
try:
|
||||
result = function(*args, **kwargs)
|
||||
except:
|
||||
self._exit_event.send_exception(*sys.exc_info())
|
||||
for f in getattr(self, '_exit_funcs', []):
|
||||
f(self, exc=sys.exc_info())
|
||||
else:
|
||||
self._exit_event.send(result)
|
||||
for f in getattr(self, '_exit_funcs', []):
|
||||
f(self, result)
|
||||
|
||||
|
||||
def spawn(function, *args, **kwds):
|
||||
"""Create a new coroutine, or cooperative thread of control, within which
|
||||
to execute *function*.
|
||||
|
||||
The *function* will be called with the given *args* and keyword arguments
|
||||
*kwds* and will remain in control unless it cooperatively yields by
|
||||
calling a socket method or ``sleep()``.
|
||||
|
||||
:func:`spawn` returns control to the caller immediately, and *function*
|
||||
will be called in a future main loop iteration.
|
||||
|
||||
An uncaught exception in *function* or any child will terminate the new
|
||||
coroutine with a log message.
|
||||
def spawn(func, *args, **kwargs):
|
||||
""" Create a coroutine to run func(*args, **kwargs) without any
|
||||
way to retrieve the results. Returns the greenlet object.
|
||||
"""
|
||||
# killable
|
||||
t = None
|
||||
g = Greenlet(_spawn_startup)
|
||||
t = get_hub_().schedule_call_global(0, _spawn, g)
|
||||
g.switch(function, args, kwds, t.cancel)
|
||||
g = ResultGreenlet()
|
||||
hub = get_hub()
|
||||
g.parent = hub.greenlet
|
||||
hub.schedule_call_global(0, g.switch, func, args, kwargs)
|
||||
return g
|
||||
|
||||
|
||||
def kill(g, *throw_args):
|
||||
get_hub_().schedule_call_global(0, g.throw, *throw_args)
|
||||
|
||||
@@ -469,6 +469,12 @@ class Queue(object):
|
||||
|
||||
def waiting(self):
|
||||
return len(self._waiters)
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def next(self):
|
||||
return self.wait()
|
||||
|
||||
|
||||
class Channel(object):
|
||||
|
||||
87
eventlet/parallel.py
Normal file
87
eventlet/parallel.py
Normal file
@@ -0,0 +1,87 @@
|
||||
from eventlet.coros import Semaphore, Queue
|
||||
from eventlet.api import spawn, getcurrent
|
||||
import sys
|
||||
|
||||
__all__ = ['Parallel']
|
||||
|
||||
class Parallel(object):
|
||||
""" The Parallel class allows you to easily control coroutine concurrency.
|
||||
"""
|
||||
def __init__(self, max_size):
|
||||
self.max_size = max_size
|
||||
self.coroutines_running = set()
|
||||
self.sem = Semaphore(max_size)
|
||||
self._results = Queue()
|
||||
|
||||
def resize(self, new_max_size):
|
||||
""" Change the max number of coroutines doing work at any given time.
|
||||
|
||||
If resize is called when there are more than *new_max_size*
|
||||
coroutines already working on tasks, they will be allowed to complete but no
|
||||
new tasks will be allowed to get launched until enough coroutines finish their
|
||||
tasks to drop the overall quantity below *new_max_size*. Until then, the
|
||||
return value of free() will be negative.
|
||||
"""
|
||||
max_size_delta = new_max_size - self.max_size
|
||||
self.sem.counter += max_size_delta
|
||||
self.max_size = new_max_size
|
||||
|
||||
@property
|
||||
def current_size(self):
|
||||
""" The current size is the number of coroutines that are currently
|
||||
executing functions in the Parallel's pool."""
|
||||
return len(self.coroutines_running)
|
||||
|
||||
def free(self):
|
||||
""" Returns the number of coroutines available for use."""
|
||||
return self.sem.counter
|
||||
|
||||
def _coro_done(self, coro, result, exc=None):
|
||||
self.sem.release()
|
||||
self.coroutines_running.remove(coro)
|
||||
self._results.send(result)
|
||||
# if done processing (no more work is being done),
|
||||
# send StopIteration so that the queue knows it's done
|
||||
if self.sem.balance == self.max_size:
|
||||
self._results.send_exception(StopIteration)
|
||||
|
||||
def spawn(self, func, *args, **kwargs):
|
||||
""" Create a coroutine to run func(*args, **kwargs). Returns a
|
||||
Coro object that can be used to retrieve the results of the function.
|
||||
"""
|
||||
# if reentering an empty pool, don't try to wait on a coroutine freeing
|
||||
# itself -- instead, just execute in the current coroutine
|
||||
current = getcurrent()
|
||||
if self.sem.locked() and current in self.coroutines_running:
|
||||
func(*args, **kwargs)
|
||||
else:
|
||||
self.sem.acquire()
|
||||
p = spawn(func, *args, **kwargs)
|
||||
self.coroutines_running.add(p)
|
||||
p.link(self._coro_done)
|
||||
|
||||
return p
|
||||
|
||||
def wait(self):
|
||||
"""Wait for the next execute in the pool to complete,
|
||||
and return the result."""
|
||||
return self.results.wait()
|
||||
|
||||
def results(self):
|
||||
""" Returns an iterator over the results from the worker coroutines."""
|
||||
return self._results
|
||||
|
||||
def _do_spawn_all(self, func, iterable):
|
||||
for i in iterable:
|
||||
if not isinstance(i, tuple):
|
||||
self.spawn(func, i)
|
||||
else:
|
||||
self.spawn(func, *i)
|
||||
|
||||
def spawn_all(self, func, iterable):
|
||||
""" Applies *func* over every item in *iterable* using the concurrency
|
||||
present in the pool. This function is a generator which yields the
|
||||
results of *func* as applied to the members of the iterable."""
|
||||
|
||||
spawn(self._do_spawn_all, func, iterable)
|
||||
return self.results()
|
||||
27
tests/parallel_test.py
Normal file
27
tests/parallel_test.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from eventlet import api, parallel
|
||||
import unittest
|
||||
|
||||
class Spawn(unittest.TestCase):
|
||||
def test_simple(self):
|
||||
def f(a, b=None):
|
||||
return (a,b)
|
||||
|
||||
coro = parallel.spawn(f, 1, b=2)
|
||||
self.assertEquals(coro.wait(), (1,2))
|
||||
|
||||
def passthru(a):
|
||||
api.sleep(0.01)
|
||||
return a
|
||||
|
||||
class Parallel(unittest.TestCase):
|
||||
def test_parallel(self):
|
||||
p = parallel.Parallel(4)
|
||||
for i in xrange(10):
|
||||
p.spawn(passthru, i)
|
||||
result_list = list(p.results())
|
||||
self.assertEquals(result_list, range(10))
|
||||
|
||||
def test_spawn_all(self):
|
||||
p = parallel.Parallel(4)
|
||||
result_list = list(p.spawn_all(passthru, xrange(10)))
|
||||
self.assertEquals(result_list, range(10))
|
||||
Reference in New Issue
Block a user