Refactored imap to make it even simpler and fix the limitation, added test for multiple iterators, removed SilencedTestCase because there was no need for it anymore.

This commit is contained in:
Ryan Williams
2009-12-31 10:35:58 -08:00
parent 41d172c742
commit 3ddbba23de
9 changed files with 83 additions and 66 deletions

View File

@@ -117,7 +117,6 @@ class GreenThread(greenlet.greenlet):
# ca and ckw are the curried function arguments
for f, ca, ckw in getattr(self, '_exit_funcs', []):
f(exc=sys.exc_info(), *ca, **ckw)
raise
else:
self._exit_event.send(result)
for f, ca, ckw in getattr(self, '_exit_funcs', []):

View File

@@ -61,7 +61,6 @@ class BaseHub(object):
'exit': [],
}
self.lclass = FdListener
self.silent_timer_exceptions = False
def add(self, evtype, fileno, cb):
""" Signals an intent to or write a particular file descriptor.
@@ -221,7 +220,7 @@ class BaseHub(object):
self.squelch_observer_exception(observer, sys.exc_info())
def squelch_timer_exception(self, timer, exc_info):
if not self.silent_timer_exceptions:
if self.debug:
traceback.print_exception(*exc_info)
print >>sys.stderr, "Timer raised: %r" % (timer,)
@@ -309,6 +308,6 @@ class BaseHub(object):
self.lclass = FdListener
def _getdebug(self):
return self.lclass == DebugListener
return self.lclass is DebugListener
debug = property(_getdebug, _setdebug)

View File

@@ -4,7 +4,16 @@ from eventlet import greenthread
from eventlet import coros
__all__ = ['GreenPool', 'GreenPile']
try:
next
except NameError:
def next(it):
try:
return it.next()
except AttributeError:
raise TypeError("%s object is not an iterator" % type(it))
class GreenPool(object):
""" The GreenPool class is a pool of green threads.
"""
@@ -88,7 +97,7 @@ class GreenPool(object):
self.sem.acquire()
g = greenthread.spawn_n(self._spawn_n_impl, func, args, kwargs, coro=True)
if not self.coroutines_running:
self.no_coros_running = coros.Event()
self.no_coros_running = greenthread.Event()
self.coroutines_running.add(g)
def waitall(self):
@@ -112,14 +121,10 @@ class GreenPool(object):
else:
return 0
def _do_imap(self, func, it, q):
while True:
try:
args = it.next()
q.send(self.spawn(func, *args))
except StopIteration:
q.send(self.spawn(raise_stop_iteration))
return
def _do_imap(self, func, it, gi):
for args in it:
gi.spawn(func, *args)
gi.spawn(raise_stop_iteration)
def imap(self, function, *iterables):
"""This is the same as itertools.imap, except that *func* is
@@ -127,30 +132,18 @@ class GreenPool(object):
control. Using imap consumes a constant amount of memory,
proportional to the size of the pool, and is thus suited for iterating
over extremely long input lists.
One caveat: if *function* raises an exception, the caller of imap
will see a StopIteration exception, not the actual raised exception.
This is a bug.
"""
if function is None:
function = lambda *a: a
it = itertools.izip(*iterables)
q = coros.Channel(max_size=self.size)
greenthread.spawn_n(self._do_imap, function, it, q)
while True:
# FIX: if wait() raises an exception the caller
# sees a stopiteration, should see the exception
yield q.wait().wait()
try:
next
except NameError:
def next(it):
try:
return it.next()
except AttributeError:
raise TypeError("%s object is not an iterator" % type(it))
gi = GreenImap(self.size)
greenthread.spawn_n(self._do_imap, function, it, gi)
return gi
def raise_stop_iteration():
raise StopIteration()
class GreenPile(object):
"""GreenPile is an abstraction representing a bunch of I/O-related tasks.
@@ -161,11 +154,13 @@ class GreenPile(object):
else:
self.pool = GreenPool(size_or_pool)
self.waiters = coros.Queue()
self.used = False
self.counter = 0
def spawn(self, func, *args, **kw):
"""Runs *func* in its own green thread, with the result available by
iterating over the GreenPile object."""
self.used = True
self.counter += 1
try:
gt = self.pool.spawn(func, *args, **kw)
@@ -180,13 +175,16 @@ class GreenPile(object):
def next(self):
"""Wait for the next result, suspending the current coroutine until it
is available. Raises StopIteration when there are no more results."""
if self.counter == 0:
if self.counter == 0 and self.used:
raise StopIteration()
try:
return self.waiters.wait().wait()
finally:
self.counter -= 1
def raise_stop_iteration():
raise StopIteration()
# this is identical to GreenPile but it blocks on spawn if the results
# aren't consumed
class GreenImap(GreenPile):
def __init__(self, size_or_pool):
super(GreenImap, self).__init__(size_or_pool)
self.waiters = coros.Channel(max_size=self.pool.size)

View File

@@ -106,20 +106,6 @@ class LimitedTestCase(unittest.TestCase):
self.timer.cancel()
class SilencedTestCase(LimitedTestCase):
""" Subclass of LimitedTestCase that also silences the printing of timer
exceptions."""
def setUp(self):
from eventlet import hubs
super(SilencedTestCase, self).setUp()
hubs.get_hub().silent_timer_exceptions = True
def tearDown(self):
from eventlet import hubs
super(SilencedTestCase, self).tearDown()
hubs.get_hub().silent_timer_exceptions = False
def find_command(command):
for dir in os.getenv('PATH', '/usr/bin:/usr/sbin').split(os.pathsep):
p = os.path.join(dir, command)

View File

@@ -1,8 +1,8 @@
from unittest import main, TestCase
from tests import SilencedTestCase
from tests import LimitedTestCase
from eventlet import coros, api
class TestEvent(SilencedTestCase):
class TestEvent(LimitedTestCase):
def test_waiting_for_event(self):
evt = coros.Event()
value = 'some stuff'
@@ -74,7 +74,7 @@ class IncrActor(coros.Actor):
if evt: evt.send()
class TestActor(SilencedTestCase):
class TestActor(LimitedTestCase):
mode = 'static'
def setUp(self):
super(TestActor, self).setUp()

View File

@@ -1,6 +1,6 @@
from tests import skipped, LimitedTestCase, skip_with_pyevent, TestIsTakingTooLong
from unittest import main
from eventlet import api, util, coros, proc, greenio
from eventlet import api, util, coros, proc, greenio, hubs
from eventlet.green.socket import GreenSSLObject
import errno
import os
@@ -242,6 +242,8 @@ class TestGreenIo(LimitedTestCase):
ssl_sock = ssl.wrap_socket(sock)
def test_exception_squelching(self):
return # exception squelching disabled for now (greenthread doesn't
# re-raise exceptions to the hub)
server = api.tcp_listener(('0.0.0.0', 0))
client = api.connect_tcp(('127.0.0.1', server.getsockname()[1]))
client_2, addr = server.accept()
@@ -260,7 +262,8 @@ class TestGreenIo(LimitedTestCase):
api.sleep(0)
finally:
sys.stderr = orig
self.assert_('Traceback' in fake.getvalue())
self.assert_('Traceback' in fake.getvalue(),
"Traceback not in:\n" + fake.getvalue())
if __name__ == '__main__':
main()

View File

@@ -1,4 +1,5 @@
import gc
import itertools
import os
import random
@@ -19,6 +20,10 @@ class Spawn(tests.LimitedTestCase):
def passthru(a):
eventlet.sleep(0.01)
return a
def passthru2(a, b):
eventlet.sleep(0.01)
return a,b
class GreenPool(tests.LimitedTestCase):
def test_spawn(self):
@@ -252,7 +257,34 @@ class GreenPool(tests.LimitedTestCase):
p = parallel.GreenPool(4)
result_list = list(p.imap(None, xrange(10)))
self.assertEquals(result_list, [(x,) for x in xrange(10)])
def test_imap_multi_args(self):
p = parallel.GreenPool(4)
result_list = list(p.imap(passthru2, xrange(10), xrange(10, 20)))
self.assertEquals(result_list, list(itertools.izip(xrange(10), xrange(10,20))))
def test_imap_raises(self):
# testing the case where the function raises an exception;
# both that the caller sees that exception, and that the iterator
# continues to be usable to get the rest of the items
p = parallel.GreenPool(4)
def raiser(item):
if item == 1 or item == 7:
raise RuntimeError("intentional error")
else:
return item
it = p.imap(raiser, xrange(10))
results = []
while True:
try:
results.append(it.next())
except RuntimeError:
results.append('r')
except StopIteration:
break
self.assertEquals(results, [0,'r',2,3,4,5,6,'r',8,9])
class GreenPile(tests.LimitedTestCase):
def test_pile(self):
p = parallel.GreenPile(4)
@@ -305,7 +337,7 @@ def passthru(arg):
eventlet.sleep(r.random() * 0.001)
return arg
class Stress(tests.SilencedTestCase):
class Stress(tests.LimitedTestCase):
# tests will take extra-long
TEST_TIMEOUT=10
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')

View File

@@ -1,4 +1,4 @@
from tests import LimitedTestCase, SilencedTestCase, main
from tests import LimitedTestCase, main
import time
from eventlet import api
from eventlet import hubs
@@ -39,7 +39,7 @@ class TestDebug(LimitedTestCase):
self.assert_(not hubs.get_hub().debug)
class TestExceptionInMainloop(SilencedTestCase):
class TestExceptionInMainloop(LimitedTestCase):
def test_sleep(self):
# even if there was an error in the mainloop, the hub should continue to work
start = time.time()

View File

@@ -2,14 +2,14 @@ import sys
import unittest
from eventlet.api import sleep, with_timeout
from eventlet import api, proc, coros
from tests import SilencedTestCase, skipped
from tests import LimitedTestCase, skipped
DELAY = 0.01
class ExpectedError(Exception):
pass
class TestLink_Signal(SilencedTestCase):
class TestLink_Signal(LimitedTestCase):
def test_send(self):
s = proc.Source()
@@ -48,7 +48,7 @@ class TestLink_Signal(SilencedTestCase):
self.assertRaises(OSError, s.wait)
class TestProc(SilencedTestCase):
class TestProc(LimitedTestCase):
def test_proc(self):
p = proc.spawn(lambda : 100)
@@ -76,13 +76,13 @@ class TestProc(SilencedTestCase):
self.assertRaises(proc.LinkedCompleted, sleep, 0.1)
class TestCase(SilencedTestCase):
class TestCase(LimitedTestCase):
def link(self, p, listener=None):
getattr(p, self.link_method)(listener)
def tearDown(self):
SilencedTestCase.tearDown(self)
LimitedTestCase.tearDown(self)
self.p.unlink()
def set_links(self, p, first_time, kill_exc_type):
@@ -252,7 +252,7 @@ class TestRaise_link_exception(TestRaise_link):
link_method = 'link_exception'
class TestStuff(SilencedTestCase):
class TestStuff(LimitedTestCase):
def test_wait_noerrors(self):
x = proc.spawn(lambda : 1)