Moved imap to GreenPool, verified its memory-boundedness, and tinkered with the stress tests.
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
import itertools
|
||||
|
||||
from eventlet import greenthread
|
||||
from eventlet import coros
|
||||
|
||||
@@ -108,7 +110,37 @@ class GreenPool(object):
|
||||
if self.sem.balance < 0:
|
||||
return -self.sem.balance
|
||||
else:
|
||||
return 0
|
||||
return 0
|
||||
|
||||
def _do_imap(self, func, it, q):
|
||||
while True:
|
||||
try:
|
||||
args = it.next()
|
||||
q.send(self.spawn(func, *args))
|
||||
except StopIteration:
|
||||
q.send(self.spawn(raise_stop_iteration))
|
||||
return
|
||||
|
||||
def imap(self, function, *iterables):
|
||||
"""This is the same as itertools.imap, except that *func* is
|
||||
executed in separate green threads, with the specified concurrency
|
||||
control. Using imap consumes a constant amount of memory,
|
||||
proportional to the size of the pool, and is thus suited for iterating
|
||||
over extremely long input lists.
|
||||
|
||||
One caveat: if *function* raises an exception, the caller of imap
|
||||
will see a StopIteration exception, not the actual raised exception.
|
||||
This is a bug.
|
||||
"""
|
||||
if function is None:
|
||||
function = lambda *a: a
|
||||
it = itertools.izip(*iterables)
|
||||
q = coros.Channel(max_size=self.size)
|
||||
greenthread.spawn_n(self._do_imap, function, it, q)
|
||||
while True:
|
||||
# FIX: if wait() raises an exception the caller
|
||||
# sees a stopiteration, should see the exception
|
||||
yield q.wait().wait()
|
||||
|
||||
|
||||
try:
|
||||
@@ -155,31 +187,6 @@ class GreenPile(object):
|
||||
finally:
|
||||
self.counter -= 1
|
||||
|
||||
def _do_map(self, func, iterables):
|
||||
while True:
|
||||
try:
|
||||
i = map(next, iterables)
|
||||
self.spawn(func, *i)
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
def imap(self, function, *iterables):
|
||||
"""This is the same as itertools.imap, except that *func* is
|
||||
executed in separate green threads, with the specified concurrency
|
||||
control.
|
||||
"""
|
||||
if function is None:
|
||||
function = lambda *a: a
|
||||
# spawn first item to prime the pump
|
||||
try:
|
||||
it = map(iter, iterables)
|
||||
i = map(next, it)
|
||||
self.spawn(function, *i)
|
||||
except StopIteration:
|
||||
# if the iterable has no items, we need
|
||||
# to defer the StopIteration till someone
|
||||
# iterates over us
|
||||
self.spawn(lambda: next(iter([])))
|
||||
# spin off a coroutine to launch the rest of the items
|
||||
greenthread.spawn(self._do_map, function, it)
|
||||
return self
|
||||
|
||||
def raise_stop_iteration():
|
||||
raise StopIteration()
|
||||
@@ -4,7 +4,7 @@ import random
|
||||
|
||||
import eventlet
|
||||
from eventlet import api
|
||||
from eventlet import hubs, parallel, coros
|
||||
from eventlet import hubs, parallel, coros, greenthread
|
||||
import tests
|
||||
|
||||
class Spawn(tests.LimitedTestCase):
|
||||
@@ -42,7 +42,7 @@ class GreenPool(tests.LimitedTestCase):
|
||||
|
||||
def test_waiting(self):
|
||||
pool = parallel.GreenPool(1)
|
||||
done = coros.Event()
|
||||
done = greenthread.Event()
|
||||
def consume():
|
||||
done.wait()
|
||||
def waiter(pool):
|
||||
@@ -68,7 +68,7 @@ class GreenPool(tests.LimitedTestCase):
|
||||
self.assertEqual(pool.running(), 0)
|
||||
|
||||
def test_multiple_coros(self):
|
||||
evt = coros.Event()
|
||||
evt = greenthread.Event()
|
||||
results = []
|
||||
def producer():
|
||||
results.append('prod')
|
||||
@@ -108,7 +108,7 @@ class GreenPool(tests.LimitedTestCase):
|
||||
outer_waiter = pool.spawn(reenter)
|
||||
outer_waiter.wait()
|
||||
|
||||
evt = coros.Event()
|
||||
evt = greenthread.Event()
|
||||
def reenter_async():
|
||||
pool.spawn_n(lambda a: a, 'reenter')
|
||||
evt.send('done')
|
||||
@@ -121,7 +121,7 @@ class GreenPool(tests.LimitedTestCase):
|
||||
e.wait()
|
||||
timer = api.exc_after(1, api.TimeoutError)
|
||||
try:
|
||||
evt = coros.Event()
|
||||
evt = greenthread.Event()
|
||||
for x in xrange(num_free):
|
||||
pool.spawn(wait_long_time, evt)
|
||||
# if the pool has fewer free than we expect,
|
||||
@@ -144,7 +144,7 @@ class GreenPool(tests.LimitedTestCase):
|
||||
|
||||
def test_resize(self):
|
||||
pool = parallel.GreenPool(2)
|
||||
evt = coros.Event()
|
||||
evt = greenthread.Event()
|
||||
def wait_long_time(e):
|
||||
e.wait()
|
||||
pool.spawn(wait_long_time, evt)
|
||||
@@ -238,22 +238,22 @@ class GreenPool(tests.LimitedTestCase):
|
||||
eventlet.sleep(0)
|
||||
self.assertEqual(set(r), set([1,2,3,4]))
|
||||
|
||||
class GreenPile(tests.LimitedTestCase):
|
||||
def test_imap(self):
|
||||
p = parallel.GreenPile(4)
|
||||
p = parallel.GreenPool(4)
|
||||
result_list = list(p.imap(passthru, xrange(10)))
|
||||
self.assertEquals(result_list, list(xrange(10)))
|
||||
|
||||
def test_empty_map(self):
|
||||
p = parallel.GreenPile(4)
|
||||
def test_empty_imap(self):
|
||||
p = parallel.GreenPool(4)
|
||||
result_iter = p.imap(passthru, [])
|
||||
self.assertRaises(StopIteration, result_iter.next)
|
||||
|
||||
def test_imap_nonefunc(self):
|
||||
p = parallel.GreenPile(4)
|
||||
p = parallel.GreenPool(4)
|
||||
result_list = list(p.imap(None, xrange(10)))
|
||||
self.assertEquals(result_list, [(x,) for x in xrange(10)])
|
||||
|
||||
class GreenPile(tests.LimitedTestCase):
|
||||
def test_pile(self):
|
||||
p = parallel.GreenPile(4)
|
||||
for i in xrange(10):
|
||||
@@ -299,55 +299,90 @@ def pressure(arg):
|
||||
return arg
|
||||
else:
|
||||
raise StressException(arg)
|
||||
|
||||
def passthru(arg):
|
||||
while r.random() < 0.5:
|
||||
eventlet.sleep(r.random() * 0.001)
|
||||
return arg
|
||||
|
||||
class Stress(tests.SilencedTestCase):
|
||||
# tests will take extra-long
|
||||
TEST_TIMEOUT=10
|
||||
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
|
||||
def spawn_memory(self, concurrency):
|
||||
def spawn_order_check(self, concurrency):
|
||||
# checks that piles are strictly ordered
|
||||
# and bounded in memory
|
||||
p = parallel.GreenPile(concurrency)
|
||||
def makework(count, unique):
|
||||
for i in xrange(count):
|
||||
token = (unique, i)
|
||||
p.spawn(pressure, token)
|
||||
|
||||
eventlet.spawn(makework, 1000, 1)
|
||||
eventlet.spawn(makework, 1000, 2)
|
||||
eventlet.spawn(makework, 1000, 3)
|
||||
iters = 1000
|
||||
eventlet.spawn(makework, iters, 1)
|
||||
eventlet.spawn(makework, iters, 2)
|
||||
eventlet.spawn(makework, iters, 3)
|
||||
p.spawn(pressure, (0,0))
|
||||
latest = [-1] * 4
|
||||
received = 0
|
||||
it = iter(p)
|
||||
initial_obj_count = len(gc.get_objects())
|
||||
while True:
|
||||
try:
|
||||
i = it.next()
|
||||
received += 1
|
||||
if received % 10 == 0:
|
||||
gc.collect()
|
||||
objs_created = len(gc.get_objects()) - initial_obj_count
|
||||
self.assert_(objs_created < 200 * concurrency, objs_created)
|
||||
except StressException, exc:
|
||||
i = exc[0]
|
||||
except StopIteration:
|
||||
break
|
||||
received += 1
|
||||
if received % 5 == 0:
|
||||
api.sleep(0.0001)
|
||||
unique, order = i
|
||||
self.assert_(latest[unique] < order)
|
||||
latest[unique] = order
|
||||
for l in latest[1:]:
|
||||
self.assertEquals(l, iters - 1)
|
||||
|
||||
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
|
||||
def test_memory_5(self):
|
||||
self.spawn_memory(5)
|
||||
def test_ordering_5(self):
|
||||
self.spawn_order_check(5)
|
||||
|
||||
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
|
||||
def test_memory_50(self):
|
||||
self.spawn_memory(50)
|
||||
def test_ordering_50(self):
|
||||
self.spawn_order_check(50)
|
||||
|
||||
def imap_memory_check(self, concurrency):
|
||||
# checks that imap is strictly
|
||||
# ordered and consumes a constant amount of memory
|
||||
p = parallel.GreenPool(concurrency)
|
||||
count = 1000
|
||||
it = p.imap(passthru, xrange(count))
|
||||
latest = -1
|
||||
while True:
|
||||
try:
|
||||
i = it.next()
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
if latest == -1:
|
||||
gc.collect()
|
||||
initial_obj_count = len(gc.get_objects())
|
||||
self.assert_(i > latest)
|
||||
latest = i
|
||||
if latest % 5 == 0:
|
||||
api.sleep(0.001)
|
||||
if latest % 10 == 0:
|
||||
gc.collect()
|
||||
objs_created = len(gc.get_objects()) - initial_obj_count
|
||||
self.assert_(objs_created < 25 * concurrency, objs_created)
|
||||
# make sure we got to the end
|
||||
self.assertEquals(latest, count - 1)
|
||||
|
||||
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
|
||||
def test_imap_50(self):
|
||||
self.imap_memory_check(50)
|
||||
|
||||
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
|
||||
def test_memory_500(self):
|
||||
self.spawn_memory(50)
|
||||
def test_imap_500(self):
|
||||
self.imap_memory_check(500)
|
||||
|
||||
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
|
||||
def test_with_intpool(self):
|
||||
|
||||
Reference in New Issue
Block a user