Renamed parallel to greenpool.

This commit is contained in:
Ryan Williams
2010-01-01 15:03:58 -08:00
parent 19acf22454
commit ed301bd5bd
5 changed files with 32 additions and 27 deletions

View File

@@ -10,7 +10,7 @@ Module Reference
modules/coros modules/coros
modules/db_pool modules/db_pool
modules/greenio modules/greenio
modules/parallel modules/greenpool
modules/pool modules/pool
modules/pools modules/pools
modules/processes modules/processes

View File

@@ -0,0 +1,5 @@
:mod:`greenpool` -- Green Thread Pools
========================================
.. automodule:: eventlet.greenpool
:members:

View File

@@ -2,7 +2,7 @@ version_info = (0, 9, '3pre')
__version__ = '%s.%s.%s' % version_info __version__ = '%s.%s.%s' % version_info
from eventlet import greenthread from eventlet import greenthread
from eventlet import parallel from eventlet import greenpool
__all__ = ['sleep', 'spawn', 'spawn_n', 'Event', 'GreenPool', 'GreenPile'] __all__ = ['sleep', 'spawn', 'spawn_n', 'Event', 'GreenPool', 'GreenPile']
@@ -12,5 +12,5 @@ spawn = greenthread.spawn
spawn_n = greenthread.spawn_n spawn_n = greenthread.spawn_n
Event = greenthread.Event Event = greenthread.Event
GreenPool = parallel.GreenPool GreenPool = greenpool.GreenPool
GreenPile = parallel.GreenPile GreenPile = greenpool.GreenPile

View File

@@ -5,7 +5,7 @@ import random
import eventlet import eventlet
from eventlet import api from eventlet import api
from eventlet import hubs, parallel, coros, greenthread from eventlet import hubs, greenpool, coros, greenthread
import tests import tests
class Spawn(tests.LimitedTestCase): class Spawn(tests.LimitedTestCase):
@@ -27,7 +27,7 @@ def passthru2(a, b):
class GreenPool(tests.LimitedTestCase): class GreenPool(tests.LimitedTestCase):
def test_spawn(self): def test_spawn(self):
p = parallel.GreenPool(4) p = greenpool.GreenPool(4)
waiters = [] waiters = []
for i in xrange(10): for i in xrange(10):
waiters.append(p.spawn(passthru, i)) waiters.append(p.spawn(passthru, i))
@@ -35,7 +35,7 @@ class GreenPool(tests.LimitedTestCase):
self.assertEquals(results, list(xrange(10))) self.assertEquals(results, list(xrange(10)))
def test_spawn_n(self): def test_spawn_n(self):
p = parallel.GreenPool(4) p = greenpool.GreenPool(4)
results_closure = [] results_closure = []
def do_something(a): def do_something(a):
eventlet.sleep(0.01) eventlet.sleep(0.01)
@@ -46,7 +46,7 @@ class GreenPool(tests.LimitedTestCase):
self.assertEquals(results_closure, range(10)) self.assertEquals(results_closure, range(10))
def test_waiting(self): def test_waiting(self):
pool = parallel.GreenPool(1) pool = greenpool.GreenPool(1)
done = greenthread.Event() done = greenthread.Event()
def consume(): def consume():
done.wait() done.wait()
@@ -83,7 +83,7 @@ class GreenPool(tests.LimitedTestCase):
evt.wait() evt.wait()
results.append('cons2') results.append('cons2')
pool = parallel.GreenPool(2) pool = greenpool.GreenPool(2)
done = pool.spawn(consumer) done = pool.spawn(consumer)
pool.spawn_n(producer) pool.spawn_n(producer)
done.wait() done.wait()
@@ -97,7 +97,7 @@ class GreenPool(tests.LimitedTestCase):
timer_fired.append(True) timer_fired.append(True)
def some_work(): def some_work():
hubs.get_hub().schedule_call_local(0, fire_timer) hubs.get_hub().schedule_call_local(0, fire_timer)
pool = parallel.GreenPool(2) pool = greenpool.GreenPool(2)
worker = pool.spawn(some_work) worker = pool.spawn(some_work)
worker.wait() worker.wait()
eventlet.sleep(0) eventlet.sleep(0)
@@ -105,7 +105,7 @@ class GreenPool(tests.LimitedTestCase):
self.assertEquals(timer_fired, []) self.assertEquals(timer_fired, [])
def test_reentrant(self): def test_reentrant(self):
pool = parallel.GreenPool(1) pool = greenpool.GreenPool(1)
def reenter(): def reenter():
waiter = pool.spawn(lambda a: a, 'reenter') waiter = pool.spawn(lambda a: a, 'reenter')
self.assertEqual('reenter', waiter.wait()) self.assertEqual('reenter', waiter.wait())
@@ -148,7 +148,7 @@ class GreenPool(tests.LimitedTestCase):
eventlet.sleep(0) eventlet.sleep(0)
def test_resize(self): def test_resize(self):
pool = parallel.GreenPool(2) pool = greenpool.GreenPool(2)
evt = greenthread.Event() evt = greenthread.Event()
def wait_long_time(e): def wait_long_time(e):
e.wait() e.wait()
@@ -182,7 +182,7 @@ class GreenPool(tests.LimitedTestCase):
# of a token pool but times out before getting the token. We verify # of a token pool but times out before getting the token. We verify
# that neither pool is adversely affected by this situation. # that neither pool is adversely affected by this situation.
from eventlet import pools from eventlet import pools
pool = parallel.GreenPool(1) pool = greenpool.GreenPool(1)
tp = pools.TokenPool(max_size=1) tp = pools.TokenPool(max_size=1)
token = tp.get() # empty out the pool token = tp.get() # empty out the pool
def do_receive(tp): def do_receive(tp):
@@ -217,7 +217,7 @@ class GreenPool(tests.LimitedTestCase):
gt.wait() gt.wait()
def test_spawn_n_2(self): def test_spawn_n_2(self):
p = parallel.GreenPool(2) p = greenpool.GreenPool(2)
self.assertEqual(p.free(), 2) self.assertEqual(p.free(), 2)
r = [] r = []
def foo(a): def foo(a):
@@ -244,22 +244,22 @@ class GreenPool(tests.LimitedTestCase):
self.assertEqual(set(r), set([1,2,3,4])) self.assertEqual(set(r), set([1,2,3,4]))
def test_imap(self): def test_imap(self):
p = parallel.GreenPool(4) p = greenpool.GreenPool(4)
result_list = list(p.imap(passthru, xrange(10))) result_list = list(p.imap(passthru, xrange(10)))
self.assertEquals(result_list, list(xrange(10))) self.assertEquals(result_list, list(xrange(10)))
def test_empty_imap(self): def test_empty_imap(self):
p = parallel.GreenPool(4) p = greenpool.GreenPool(4)
result_iter = p.imap(passthru, []) result_iter = p.imap(passthru, [])
self.assertRaises(StopIteration, result_iter.next) self.assertRaises(StopIteration, result_iter.next)
def test_imap_nonefunc(self): def test_imap_nonefunc(self):
p = parallel.GreenPool(4) p = greenpool.GreenPool(4)
result_list = list(p.imap(None, xrange(10))) result_list = list(p.imap(None, xrange(10)))
self.assertEquals(result_list, [(x,) for x in xrange(10)]) self.assertEquals(result_list, [(x,) for x in xrange(10)])
def test_imap_multi_args(self): def test_imap_multi_args(self):
p = parallel.GreenPool(4) p = greenpool.GreenPool(4)
result_list = list(p.imap(passthru2, xrange(10), xrange(10, 20))) result_list = list(p.imap(passthru2, xrange(10), xrange(10, 20)))
self.assertEquals(result_list, list(itertools.izip(xrange(10), xrange(10,20)))) self.assertEquals(result_list, list(itertools.izip(xrange(10), xrange(10,20))))
@@ -267,7 +267,7 @@ class GreenPool(tests.LimitedTestCase):
# testing the case where the function raises an exception; # testing the case where the function raises an exception;
# both that the caller sees that exception, and that the iterator # both that the caller sees that exception, and that the iterator
# continues to be usable to get the rest of the items # continues to be usable to get the rest of the items
p = parallel.GreenPool(4) p = greenpool.GreenPool(4)
def raiser(item): def raiser(item):
if item == 1 or item == 7: if item == 1 or item == 7:
raise RuntimeError("intentional error") raise RuntimeError("intentional error")
@@ -287,14 +287,14 @@ class GreenPool(tests.LimitedTestCase):
class GreenPile(tests.LimitedTestCase): class GreenPile(tests.LimitedTestCase):
def test_pile(self): def test_pile(self):
p = parallel.GreenPile(4) p = greenpool.GreenPile(4)
for i in xrange(10): for i in xrange(10):
p.spawn(passthru, i) p.spawn(passthru, i)
result_list = list(p) result_list = list(p)
self.assertEquals(result_list, list(xrange(10))) self.assertEquals(result_list, list(xrange(10)))
def test_pile_spawn_times_out(self): def test_pile_spawn_times_out(self):
p = parallel.GreenPile(4) p = greenpool.GreenPile(4)
for i in xrange(4): for i in xrange(4):
p.spawn(passthru, i) p.spawn(passthru, i)
# now it should be full and this should time out # now it should be full and this should time out
@@ -307,9 +307,9 @@ class GreenPile(tests.LimitedTestCase):
self.assertEquals(list(p), list(xrange(10))) self.assertEquals(list(p), list(xrange(10)))
def test_constructing_from_pool(self): def test_constructing_from_pool(self):
pool = parallel.GreenPool(2) pool = greenpool.GreenPool(2)
pile1 = parallel.GreenPile(pool) pile1 = greenpool.GreenPile(pool)
pile2 = parallel.GreenPile(pool) pile2 = greenpool.GreenPile(pool)
def bunch_of_work(pile, unique): def bunch_of_work(pile, unique):
for i in xrange(10): for i in xrange(10):
pile.spawn(passthru, i + unique) pile.spawn(passthru, i + unique)
@@ -343,7 +343,7 @@ class Stress(tests.LimitedTestCase):
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def spawn_order_check(self, concurrency): def spawn_order_check(self, concurrency):
# checks that piles are strictly ordered # checks that piles are strictly ordered
p = parallel.GreenPile(concurrency) p = greenpool.GreenPile(concurrency)
def makework(count, unique): def makework(count, unique):
for i in xrange(count): for i in xrange(count):
token = (unique, i) token = (unique, i)
@@ -384,7 +384,7 @@ class Stress(tests.LimitedTestCase):
def imap_memory_check(self, concurrency): def imap_memory_check(self, concurrency):
# checks that imap is strictly # checks that imap is strictly
# ordered and consumes a constant amount of memory # ordered and consumes a constant amount of memory
p = parallel.GreenPool(concurrency) p = greenpool.GreenPool(concurrency)
count = 1000 count = 1000
it = p.imap(passthru, xrange(count)) it = p.imap(passthru, xrange(count))
latest = -1 latest = -1
@@ -432,7 +432,7 @@ class Stress(tests.LimitedTestCase):
return token return token
int_pool = IntPool(max_size=intpool_size) int_pool = IntPool(max_size=intpool_size)
pool = parallel.GreenPool(pool_size) pool = greenpool.GreenPool(pool_size)
for ix in xrange(num_executes): for ix in xrange(num_executes):
pool.spawn(run, int_pool) pool.spawn(run, int_pool)
pool.waitall() pool.waitall()