Renamed Parallel to GreenPool, added GreenPile abstraction for handling batches of work.

This commit is contained in:
Ryan Williams
2009-12-24 15:29:34 -05:00
parent 36a10df5a2
commit 0316c27062
2 changed files with 452 additions and 78 deletions

View File

@@ -1,36 +1,35 @@
from eventlet.coros import Semaphore, Queue, Event
from eventlet.api import spawn, getcurrent
from eventlet import api
from collections import deque
import sys
__all__ = ['Parallel']
__all__ = ['GreenPool', 'GreenPile']
class Parallel(object):
class GreenPool(object):
""" The Parallel class allows you to easily control coroutine concurrency.
"""
def __init__(self, max_size):
self.max_size = max_size
def __init__(self, size):
self.size = size
self.coroutines_running = set()
self.sem = Semaphore(max_size)
self.sem = Semaphore(size)
self.no_coros_running = Event()
self._results = Queue()
def resize(self, new_max_size):
def resize(self, new_size):
""" Change the max number of coroutines doing work at any given time.
If resize is called when there are more than *new_max_size*
If resize is called when there are more than *new_size*
coroutines already working on tasks, they will be allowed to complete but no
new tasks will be allowed to get launched until enough coroutines finish their
tasks to drop the overall quantity below *new_max_size*. Until then, the
tasks to drop the overall quantity below *new_size*. Until then, the
return value of free() will be negative.
"""
max_size_delta = new_max_size - self.max_size
self.sem.counter += max_size_delta
self.max_size = new_max_size
size_delta = new_size - self.size
self.sem.counter += size_delta
self.size = new_size
@property
def current_size(self):
""" The current size is the number of coroutines that are currently
executing functions in the Parallel's pool."""
def running(self):
""" Returns the number of coroutines that are currently executing
functions in the GreenPool's pool."""
return len(self.coroutines_running)
def free(self):
@@ -40,14 +39,7 @@ class Parallel(object):
def spawn(self, func, *args, **kwargs):
"""Run func(*args, **kwargs) in its own green thread.
"""
return self._spawn(False, func, *args, **kwargs)
def spawn_q(self, func, *args, **kwargs):
"""Run func(*args, **kwargs) in its own green thread.
The results of func are stuck in the results() iterator.
"""
self._spawn(True, func, *args, **kwargs)
return self._spawn(func, *args, **kwargs)
def spawn_n(self, func, *args, **kwargs):
""" Create a coroutine to run func(*args, **kwargs).
@@ -55,60 +47,115 @@ class Parallel(object):
Returns None; the results of the function are not retrievable.
The results of the function are not put into the results() iterator.
"""
self._spawn(False, func, *args, **kwargs)
self._spawn(func, *args, **kwargs)
def _spawn(self, send_result, func, *args, **kwargs):
def _spawn(self, func, *args, **kwargs):
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
current = getcurrent()
current = api.getcurrent()
if self.sem.locked() and current in self.coroutines_running:
func(*args, **kwargs)
# a bit hacky to use the GT without switching to it
gt = api.GreenThread(current)
gt.main(func, args, kwargs)
return gt
else:
self.sem.acquire()
p = spawn(func, *args, **kwargs)
gt = api.spawn(func, *args, **kwargs)
if not self.coroutines_running:
self.no_coros_running = Event()
self.coroutines_running.add(p)
p.link(self._spawn_done, send_result=send_result, coro=p)
return p
self.coroutines_running.add(gt)
gt.link(self._spawn_done, coro=gt)
return gt
def waitall(self):
"""Waits until all coroutines in the pool are finished working."""
self.no_coros_running.wait()
def _spawn_done(self, result=None, exc=None, send_result=False, coro=None):
def _spawn_done(self, result=None, exc=None, coro=None):
self.sem.release()
self.coroutines_running.remove(coro)
if send_result:
self._results.send(result)
# if done processing (no more work is waiting for processing),
# send StopIteration so that the queue knows it's done
if self.sem.balance == self.max_size:
if send_result:
self._results.send_exception(StopIteration)
self.no_coros_running.send(None)
def wait(self):
"""Wait for the next execute in the pool to complete,
and return the result."""
return self.results.wait()
if self.sem.balance == self.size:
self.no_coros_running.send(None)
def waiting(self):
"""Return the number of coroutines waiting to execute.
"""
if self.sem.balance < 0:
return -self.sem.balance
else:
return 0
try:
next
except NameError:
def next(it):
try:
it.next()
except AttributeError:
raise TypeError("%s object is not an iterator" % type(it))
def results(self):
""" Returns an iterator over the results from the worker coroutines."""
return self._results
class GreenPile(object):
def __init__(self, size_or_pool):
if isinstance(size_or_pool, GreenPool):
self.pool = size_or_pool
else:
self.pool = GreenPool(size_or_pool)
self.waiters = Queue()
self.counter = 0
def spawn(self, func, *args, **kw):
self.counter += 1
try:
gt = self.pool.spawn(func, *args, **kw)
self.waiters.send(gt)
except:
self.counter -= 1
raise
def _do_spawn_all(self, func, iterable):
for i in iterable:
# if the list is composed of single arguments, use those
if not isinstance(i, (tuple, list)):
self.spawn_q(func, i)
else:
self.spawn_q(func, *i)
def __iter__(self):
return self
def spawn_all(self, func, iterable):
""" Applies *func* over every item in *iterable* using the concurrency
present in the pool. This function is a generator which yields the
results of *func* as applied to the members of the iterable."""
def next(self):
if self.counter == 0:
raise StopIteration()
try:
return self.waiters.wait().wait()
finally:
self.counter -= 1
def _do_map(self, func, iterables):
while True:
try:
i = map(next, iterables)
self.spawn(func, *i)
except StopIteration:
break
def imap(self, function, *iterables):
"""This is the same as itertools.imap, except that *func* is executed
with the specified concurrency.
spawn(self._do_spawn_all, func, iterable)
return self.results()
Make an iterator that computes the *function* using arguments from
each of the *iterables*, and using the coroutine concurrency specified
in the GreenPile's constructor. Like map() except that it returns an
iterator instead of a list and that it stops when the shortest iterable
is exhausted instead of filling in None for shorter iterables.
"""
if function is None:
function = lambda *a: a
# spawn first item to prime the pump
try:
it = map(iter, iterables)
i = map(next, it)
self.spawn(function, *i)
except StopIteration:
# if the iterable has no items, we need
# to defer the StopIteration till someone
# iterates over us
self.spawn(lambda: next(iter([])))
# spin off a coroutine to launch the rest of the items
api.spawn(self._do_map, function, it)
return self

View File

@@ -1,33 +1,33 @@
from eventlet import api, parallel
import unittest
import gc
import random
class Spawn(unittest.TestCase):
from eventlet import api, hubs, parallel, coros
import tests
class Spawn(tests.LimitedTestCase):
# TODO: move this test elsewhere
def test_simple(self):
def f(a, b=None):
return (a,b)
coro = parallel.spawn(f, 1, b=2)
self.assertEquals(coro.wait(), (1,2))
gt = parallel.api.spawn(f, 1, b=2)
self.assertEquals(gt.wait(), (1,2))
def passthru(a):
api.sleep(0.01)
return a
class Parallel(unittest.TestCase):
def test_parallel(self):
p = parallel.Parallel(4)
class GreenPool(tests.LimitedTestCase):
def test_spawn(self):
p = parallel.GreenPool(4)
waiters = []
for i in xrange(10):
p.spawn_q(passthru, i)
result_list = list(p.results())
self.assertEquals(result_list, range(10))
def test_spawn_all(self):
p = parallel.Parallel(4)
result_list = list(p.spawn_all(passthru, xrange(10)))
self.assertEquals(result_list, range(10))
waiters.append(p.spawn(passthru, i))
results = [waiter.wait() for waiter in waiters]
self.assertEquals(results, list(xrange(10)))
def test_spawn_n(self):
p = parallel.Parallel(4)
p = parallel.GreenPool(4)
results_closure = []
def do_something(a):
api.sleep(0.01)
@@ -36,4 +36,331 @@ class Parallel(unittest.TestCase):
p.spawn(do_something, i)
p.waitall()
self.assertEquals(results_closure, range(10))
def test_waiting(self):
pool = parallel.GreenPool(1)
done = coros.Event()
def consume():
done.wait()
def waiter(pool):
gt = pool.spawn(consume)
gt.wait()
waiters = []
self.assertEqual(pool.running(), 0)
waiters.append(api.spawn(waiter, pool))
api.sleep(0)
self.assertEqual(pool.waiting(), 0)
waiters.append(api.spawn(waiter, pool))
api.sleep(0)
self.assertEqual(pool.waiting(), 1)
waiters.append(api.spawn(waiter, pool))
api.sleep(0)
self.assertEqual(pool.waiting(), 2)
self.assertEqual(pool.running(), 1)
done.send(None)
for w in waiters:
w.wait()
self.assertEqual(pool.waiting(), 0)
self.assertEqual(pool.running(), 0)
def test_multiple_coros(self):
evt = coros.Event()
results = []
def producer():
results.append('prod')
evt.send()
def consumer():
results.append('cons1')
evt.wait()
results.append('cons2')
pool = parallel.GreenPool(2)
done = pool.spawn(consumer)
pool.spawn_n(producer)
done.wait()
self.assertEquals(['cons1', 'prod', 'cons2'], results)
def test_timer_cancel(self):
# this test verifies that local timers are not fired
# outside of the context of the spawn
timer_fired = []
def fire_timer():
timer_fired.append(True)
def some_work():
hubs.get_hub().schedule_call_local(0, fire_timer)
pool = parallel.GreenPool(2)
worker = pool.spawn(some_work)
worker.wait()
api.sleep(0)
api.sleep(0)
self.assertEquals(timer_fired, [])
def test_reentrant(self):
pool = parallel.GreenPool(1)
def reenter():
waiter = pool.spawn(lambda a: a, 'reenter')
self.assertEqual('reenter', waiter.wait())
outer_waiter = pool.spawn(reenter)
outer_waiter.wait()
evt = coros.Event()
def reenter_async():
pool.spawn_n(lambda a: a, 'reenter')
evt.send('done')
pool.spawn_n(reenter_async)
self.assertEquals('done', evt.wait())
def assert_pool_has_free(self, pool, num_free):
def wait_long_time(e):
e.wait()
timer = api.exc_after(1, api.TimeoutError)
try:
evt = coros.Event()
for x in xrange(num_free):
pool.spawn(wait_long_time, evt)
# if the pool has fewer free than we expect,
# then we'll hit the timeout error
finally:
timer.cancel()
# if the runtime error is not raised it means the pool had
# some unexpected free items
timer = api.exc_after(0, RuntimeError)
try:
self.assertRaises(RuntimeError, pool.spawn, wait_long_time, evt)
finally:
timer.cancel()
# clean up by causing all the wait_long_time functions to return
evt.send(None)
api.sleep(0)
api.sleep(0)
def test_resize(self):
pool = parallel.GreenPool(2)
evt = coros.Event()
def wait_long_time(e):
e.wait()
pool.spawn(wait_long_time, evt)
pool.spawn(wait_long_time, evt)
self.assertEquals(pool.free(), 0)
self.assertEquals(pool.running(), 2)
self.assert_pool_has_free(pool, 0)
# verify that the pool discards excess items put into it
pool.resize(1)
# cause the wait_long_time functions to return, which will
# trigger puts to the pool
evt.send(None)
api.sleep(0)
api.sleep(0)
self.assertEquals(pool.free(), 1)
self.assertEquals(pool.running(), 0)
self.assert_pool_has_free(pool, 1)
# resize larger and assert that there are more free items
pool.resize(2)
self.assertEquals(pool.free(), 2)
self.assertEquals(pool.running(), 0)
self.assert_pool_has_free(pool, 2)
def test_pool_smash(self):
# The premise is that a coroutine in a Pool tries to get a token out
# of a token pool but times out before getting the token. We verify
# that neither pool is adversely affected by this situation.
from eventlet import pools
pool = parallel.GreenPool(1)
tp = pools.TokenPool(max_size=1)
token = tp.get() # empty out the pool
def do_receive(tp):
timer = api.exc_after(0, RuntimeError())
try:
t = tp.get()
self.fail("Shouldn't have received anything from the pool")
except RuntimeError:
return 'timed out'
else:
timer.cancel()
# the spawn makes the token pool expect that coroutine, but then
# immediately cuts bait
e1 = pool.spawn(do_receive, tp)
self.assertEquals(e1.wait(), 'timed out')
# the pool can get some random item back
def send_wakeup(tp):
tp.put('wakeup')
gt = api.spawn(send_wakeup, tp)
# now we ask the pool to run something else, which should not
# be affected by the previous send at all
def resume():
return 'resumed'
e2 = pool.spawn(resume)
self.assertEquals(e2.wait(), 'resumed')
# we should be able to get out the thing we put in there, too
self.assertEquals(tp.get(), 'wakeup')
gt.wait()
def test_spawn_n_2(self):
p = parallel.GreenPool(2)
self.assertEqual(p.free(), 2)
r = []
def foo(a):
r.append(a)
gt = p.spawn(foo, 1)
self.assertEqual(p.free(), 1)
gt.wait()
self.assertEqual(r, [1])
api.sleep(0)
self.assertEqual(p.free(), 2)
#Once the pool is exhausted, spawning forces a yield.
p.spawn_n(foo, 2)
self.assertEqual(1, p.free())
self.assertEqual(r, [1])
p.spawn_n(foo, 3)
self.assertEqual(0, p.free())
self.assertEqual(r, [1])
p.spawn_n(foo, 4)
self.assertEqual(set(r), set([1,2,3]))
api.sleep(0)
self.assertEqual(set(r), set([1,2,3,4]))
class GreenPile(tests.LimitedTestCase):
def test_imap(self):
p = parallel.GreenPile(4)
result_list = list(p.imap(passthru, xrange(10)))
self.assertEquals(result_list, list(xrange(10)))
def test_empty_map(self):
p = parallel.GreenPile(4)
result_iter = p.imap(passthru, [])
self.assertRaises(StopIteration, result_iter.next)
def test_pile(self):
p = parallel.GreenPile(4)
for i in xrange(10):
p.spawn(passthru, i)
result_list = list(p)
self.assertEquals(result_list, list(xrange(10)))
def test_pile_spawn_times_out(self):
p = parallel.GreenPile(4)
for i in xrange(4):
p.spawn(passthru, i)
# now it should be full and this should time out
api.exc_after(0, api.TimeoutError)
self.assertRaises(api.TimeoutError, p.spawn, passthru, "time out")
# verify that the spawn breakage didn't interrupt the sequence
# and terminates properly
for i in xrange(4,10):
p.spawn(passthru, i)
self.assertEquals(list(p), list(xrange(10)))
def test_constructing_from_pool(self):
pool = parallel.GreenPool(2)
pile1 = parallel.GreenPile(pool)
pile2 = parallel.GreenPile(pool)
def bunch_of_work(pile, unique):
for i in xrange(10):
pile.spawn(passthru, i + unique)
api.spawn(bunch_of_work, pile1, 0)
api.spawn(bunch_of_work, pile2, 100)
api.sleep(0)
self.assertEquals(list(pile2), list(xrange(100,110)))
self.assertEquals(list(pile1), list(xrange(10)))
class StressException(Exception):
pass
r = random.Random(0)
def pressure(arg):
while r.random() < 0.5:
api.sleep(r.random() * 0.001)
if r.random() < 0.8:
return arg
else:
raise StressException(arg)
# TODO: skip these unless explicitly demanded by the user
class Stress(tests.SilencedTestCase):
# tests will take extra-long
TEST_TIMEOUT=10
def spawn_memory(self, concurrency):
# checks that piles are strictly ordered
# and bounded in memory
p = parallel.GreenPile(concurrency)
def makework(count, unique):
for i in xrange(count):
token = (unique, i)
p.spawn(pressure, token)
api.spawn(makework, 1000, 1)
api.spawn(makework, 1000, 2)
api.spawn(makework, 1000, 3)
p.spawn(pressure, (0,0))
latest = [-1] * 4
received = 0
it = iter(p)
initial_obj_count = len(gc.get_objects())
while True:
try:
i = it.next()
received += 1
if received % 10 == 0:
gc.collect()
objs_created = len(gc.get_objects()) - initial_obj_count
self.assert_(objs_created < 200 * concurrency, objs_created)
except StressException, exc:
i = exc[0]
except StopIteration:
break
unique, order = i
self.assert_(latest[unique] < order)
latest[unique] = order
def test_memory_5(self):
self.spawn_memory(5)
def test_memory_50(self):
self.spawn_memory(50)
def test_memory_500(self):
self.spawn_memory(500)
def test_with_intpool(self):
from eventlet import pools
class IntPool(pools.Pool):
def create(self):
self.current_integer = getattr(self, 'current_integer', 0) + 1
return self.current_integer
def subtest(intpool_size, pool_size, num_executes):
def run(int_pool):
token = int_pool.get()
api.sleep(0.0001)
int_pool.put(token)
return token
int_pool = IntPool(max_size=intpool_size)
pool = parallel.GreenPool(pool_size)
for ix in xrange(num_executes):
pool.spawn(run, int_pool)
pool.waitall()
subtest(4, 7, 7)
subtest(50, 75, 100)
for isize in (10, 20, 30, 40, 50):
for psize in (5, 25, 35, 50):
subtest(isize, psize, psize)