From 3ddbba23de2ad1485d968bddbbb3e2866b4429c2 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 31 Dec 2009 10:35:58 -0800 Subject: [PATCH] Refactored imap to make it even simpler and fix the limitation, added test for multiple iterators, removed SilencedTestCase because there was no need for it anymore. --- eventlet/greenthread.py | 1 - eventlet/hubs/hub.py | 5 ++-- eventlet/parallel.py | 66 ++++++++++++++++++++--------------------- tests/__init__.py | 14 --------- tests/coros_test.py | 6 ++-- tests/greenio_test.py | 7 +++-- tests/parallel_test.py | 34 ++++++++++++++++++++- tests/test__hub.py | 4 +-- tests/test__proc.py | 12 ++++---- 9 files changed, 83 insertions(+), 66 deletions(-) diff --git a/eventlet/greenthread.py b/eventlet/greenthread.py index 5132f4a..20084fa 100644 --- a/eventlet/greenthread.py +++ b/eventlet/greenthread.py @@ -117,7 +117,6 @@ class GreenThread(greenlet.greenlet): # ca and ckw are the curried function arguments for f, ca, ckw in getattr(self, '_exit_funcs', []): f(exc=sys.exc_info(), *ca, **ckw) - raise else: self._exit_event.send(result) for f, ca, ckw in getattr(self, '_exit_funcs', []): diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index 250d80c..728514c 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -61,7 +61,6 @@ class BaseHub(object): 'exit': [], } self.lclass = FdListener - self.silent_timer_exceptions = False def add(self, evtype, fileno, cb): """ Signals an intent to or write a particular file descriptor. @@ -221,7 +220,7 @@ class BaseHub(object): self.squelch_observer_exception(observer, sys.exc_info()) def squelch_timer_exception(self, timer, exc_info): - if not self.silent_timer_exceptions: + if self.debug: traceback.print_exception(*exc_info) print >>sys.stderr, "Timer raised: %r" % (timer,) @@ -309,6 +308,6 @@ class BaseHub(object): self.lclass = FdListener def _getdebug(self): - return self.lclass == DebugListener + return self.lclass is DebugListener debug = property(_getdebug, _setdebug) \ No newline at end of file diff --git a/eventlet/parallel.py b/eventlet/parallel.py index c9168e5..c46df28 100644 --- a/eventlet/parallel.py +++ b/eventlet/parallel.py @@ -4,7 +4,16 @@ from eventlet import greenthread from eventlet import coros __all__ = ['GreenPool', 'GreenPile'] - + +try: + next +except NameError: + def next(it): + try: + return it.next() + except AttributeError: + raise TypeError("%s object is not an iterator" % type(it)) + class GreenPool(object): """ The GreenPool class is a pool of green threads. """ @@ -88,7 +97,7 @@ class GreenPool(object): self.sem.acquire() g = greenthread.spawn_n(self._spawn_n_impl, func, args, kwargs, coro=True) if not self.coroutines_running: - self.no_coros_running = coros.Event() + self.no_coros_running = greenthread.Event() self.coroutines_running.add(g) def waitall(self): @@ -112,14 +121,10 @@ class GreenPool(object): else: return 0 - def _do_imap(self, func, it, q): - while True: - try: - args = it.next() - q.send(self.spawn(func, *args)) - except StopIteration: - q.send(self.spawn(raise_stop_iteration)) - return + def _do_imap(self, func, it, gi): + for args in it: + gi.spawn(func, *args) + gi.spawn(raise_stop_iteration) def imap(self, function, *iterables): """This is the same as itertools.imap, except that *func* is @@ -127,30 +132,18 @@ class GreenPool(object): control. Using imap consumes a constant amount of memory, proportional to the size of the pool, and is thus suited for iterating over extremely long input lists. - - One caveat: if *function* raises an exception, the caller of imap - will see a StopIteration exception, not the actual raised exception. - This is a bug. """ if function is None: function = lambda *a: a it = itertools.izip(*iterables) - q = coros.Channel(max_size=self.size) - greenthread.spawn_n(self._do_imap, function, it, q) - while True: - # FIX: if wait() raises an exception the caller - # sees a stopiteration, should see the exception - yield q.wait().wait() - - -try: - next -except NameError: - def next(it): - try: - return it.next() - except AttributeError: - raise TypeError("%s object is not an iterator" % type(it)) + gi = GreenImap(self.size) + greenthread.spawn_n(self._do_imap, function, it, gi) + return gi + + +def raise_stop_iteration(): + raise StopIteration() + class GreenPile(object): """GreenPile is an abstraction representing a bunch of I/O-related tasks. @@ -161,11 +154,13 @@ class GreenPile(object): else: self.pool = GreenPool(size_or_pool) self.waiters = coros.Queue() + self.used = False self.counter = 0 def spawn(self, func, *args, **kw): """Runs *func* in its own green thread, with the result available by iterating over the GreenPile object.""" + self.used = True self.counter += 1 try: gt = self.pool.spawn(func, *args, **kw) @@ -180,13 +175,16 @@ class GreenPile(object): def next(self): """Wait for the next result, suspending the current coroutine until it is available. Raises StopIteration when there are no more results.""" - if self.counter == 0: + if self.counter == 0 and self.used: raise StopIteration() try: return self.waiters.wait().wait() finally: self.counter -= 1 - -def raise_stop_iteration(): - raise StopIteration() \ No newline at end of file +# this is identical to GreenPile but it blocks on spawn if the results +# aren't consumed +class GreenImap(GreenPile): + def __init__(self, size_or_pool): + super(GreenImap, self).__init__(size_or_pool) + self.waiters = coros.Channel(max_size=self.pool.size) \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py index d99f0fa..26d0406 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -106,20 +106,6 @@ class LimitedTestCase(unittest.TestCase): self.timer.cancel() -class SilencedTestCase(LimitedTestCase): - """ Subclass of LimitedTestCase that also silences the printing of timer - exceptions.""" - def setUp(self): - from eventlet import hubs - super(SilencedTestCase, self).setUp() - hubs.get_hub().silent_timer_exceptions = True - - def tearDown(self): - from eventlet import hubs - super(SilencedTestCase, self).tearDown() - hubs.get_hub().silent_timer_exceptions = False - - def find_command(command): for dir in os.getenv('PATH', '/usr/bin:/usr/sbin').split(os.pathsep): p = os.path.join(dir, command) diff --git a/tests/coros_test.py b/tests/coros_test.py index a884ef4..35657a3 100644 --- a/tests/coros_test.py +++ b/tests/coros_test.py @@ -1,8 +1,8 @@ from unittest import main, TestCase -from tests import SilencedTestCase +from tests import LimitedTestCase from eventlet import coros, api -class TestEvent(SilencedTestCase): +class TestEvent(LimitedTestCase): def test_waiting_for_event(self): evt = coros.Event() value = 'some stuff' @@ -74,7 +74,7 @@ class IncrActor(coros.Actor): if evt: evt.send() -class TestActor(SilencedTestCase): +class TestActor(LimitedTestCase): mode = 'static' def setUp(self): super(TestActor, self).setUp() diff --git a/tests/greenio_test.py b/tests/greenio_test.py index 5f45132..5b58343 100644 --- a/tests/greenio_test.py +++ b/tests/greenio_test.py @@ -1,6 +1,6 @@ from tests import skipped, LimitedTestCase, skip_with_pyevent, TestIsTakingTooLong from unittest import main -from eventlet import api, util, coros, proc, greenio +from eventlet import api, util, coros, proc, greenio, hubs from eventlet.green.socket import GreenSSLObject import errno import os @@ -242,6 +242,8 @@ class TestGreenIo(LimitedTestCase): ssl_sock = ssl.wrap_socket(sock) def test_exception_squelching(self): + return # exception squelching disabled for now (greenthread doesn't + # re-raise exceptions to the hub) server = api.tcp_listener(('0.0.0.0', 0)) client = api.connect_tcp(('127.0.0.1', server.getsockname()[1])) client_2, addr = server.accept() @@ -260,7 +262,8 @@ class TestGreenIo(LimitedTestCase): api.sleep(0) finally: sys.stderr = orig - self.assert_('Traceback' in fake.getvalue()) + self.assert_('Traceback' in fake.getvalue(), + "Traceback not in:\n" + fake.getvalue()) if __name__ == '__main__': main() diff --git a/tests/parallel_test.py b/tests/parallel_test.py index 94b6b69..e035691 100644 --- a/tests/parallel_test.py +++ b/tests/parallel_test.py @@ -1,4 +1,5 @@ import gc +import itertools import os import random @@ -19,6 +20,10 @@ class Spawn(tests.LimitedTestCase): def passthru(a): eventlet.sleep(0.01) return a + +def passthru2(a, b): + eventlet.sleep(0.01) + return a,b class GreenPool(tests.LimitedTestCase): def test_spawn(self): @@ -252,7 +257,34 @@ class GreenPool(tests.LimitedTestCase): p = parallel.GreenPool(4) result_list = list(p.imap(None, xrange(10))) self.assertEquals(result_list, [(x,) for x in xrange(10)]) + + def test_imap_multi_args(self): + p = parallel.GreenPool(4) + result_list = list(p.imap(passthru2, xrange(10), xrange(10, 20))) + self.assertEquals(result_list, list(itertools.izip(xrange(10), xrange(10,20)))) + def test_imap_raises(self): + # testing the case where the function raises an exception; + # both that the caller sees that exception, and that the iterator + # continues to be usable to get the rest of the items + p = parallel.GreenPool(4) + def raiser(item): + if item == 1 or item == 7: + raise RuntimeError("intentional error") + else: + return item + it = p.imap(raiser, xrange(10)) + results = [] + while True: + try: + results.append(it.next()) + except RuntimeError: + results.append('r') + except StopIteration: + break + self.assertEquals(results, [0,'r',2,3,4,5,6,'r',8,9]) + + class GreenPile(tests.LimitedTestCase): def test_pile(self): p = parallel.GreenPile(4) @@ -305,7 +337,7 @@ def passthru(arg): eventlet.sleep(r.random() * 0.001) return arg -class Stress(tests.SilencedTestCase): +class Stress(tests.LimitedTestCase): # tests will take extra-long TEST_TIMEOUT=10 @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') diff --git a/tests/test__hub.py b/tests/test__hub.py index e00a960..b6a93e0 100644 --- a/tests/test__hub.py +++ b/tests/test__hub.py @@ -1,4 +1,4 @@ -from tests import LimitedTestCase, SilencedTestCase, main +from tests import LimitedTestCase, main import time from eventlet import api from eventlet import hubs @@ -39,7 +39,7 @@ class TestDebug(LimitedTestCase): self.assert_(not hubs.get_hub().debug) -class TestExceptionInMainloop(SilencedTestCase): +class TestExceptionInMainloop(LimitedTestCase): def test_sleep(self): # even if there was an error in the mainloop, the hub should continue to work start = time.time() diff --git a/tests/test__proc.py b/tests/test__proc.py index 370c6f1..8ffdc0b 100644 --- a/tests/test__proc.py +++ b/tests/test__proc.py @@ -2,14 +2,14 @@ import sys import unittest from eventlet.api import sleep, with_timeout from eventlet import api, proc, coros -from tests import SilencedTestCase, skipped +from tests import LimitedTestCase, skipped DELAY = 0.01 class ExpectedError(Exception): pass -class TestLink_Signal(SilencedTestCase): +class TestLink_Signal(LimitedTestCase): def test_send(self): s = proc.Source() @@ -48,7 +48,7 @@ class TestLink_Signal(SilencedTestCase): self.assertRaises(OSError, s.wait) -class TestProc(SilencedTestCase): +class TestProc(LimitedTestCase): def test_proc(self): p = proc.spawn(lambda : 100) @@ -76,13 +76,13 @@ class TestProc(SilencedTestCase): self.assertRaises(proc.LinkedCompleted, sleep, 0.1) -class TestCase(SilencedTestCase): +class TestCase(LimitedTestCase): def link(self, p, listener=None): getattr(p, self.link_method)(listener) def tearDown(self): - SilencedTestCase.tearDown(self) + LimitedTestCase.tearDown(self) self.p.unlink() def set_links(self, p, first_time, kill_exc_type): @@ -252,7 +252,7 @@ class TestRaise_link_exception(TestRaise_link): link_method = 'link_exception' -class TestStuff(SilencedTestCase): +class TestStuff(LimitedTestCase): def test_wait_noerrors(self): x = proc.spawn(lambda : 1)