diff --git a/eventlet/__init__.py b/eventlet/__init__.py index 91ab2aa..48e0808 100644 --- a/eventlet/__init__.py +++ b/eventlet/__init__.py @@ -1,2 +1,16 @@ version_info = (0, 9, '3pre') __version__ = '%s.%s.%s' % version_info + +from eventlet import greenthread +from eventlet import parallel + +__all__ = ['sleep', 'spawn', 'spawn_n', 'Event', 'GreenPool', 'GreenPile'] + +sleep = greenthread.sleep + +spawn = greenthread.spawn +spawn_n = greenthread.spawn_n +Event = greenthread.Event + +GreenPool = parallel.GreenPool +GreenPile = parallel.GreenPile \ No newline at end of file diff --git a/eventlet/api.py b/eventlet/api.py index f3e79fe..692a9e2 100644 --- a/eventlet/api.py +++ b/eventlet/api.py @@ -119,6 +119,10 @@ def trampoline(fd, read=None, write=None, timeout=None, timeout_exc=TimeoutError t.cancel() +from eventlet import greenthread +spawn = greenthread.spawn +spawn_n = greenthread.spawn_n + def _spawn_startup(cb, args, kw, cancel=None): try: greenlet.getcurrent().parent.switch() @@ -128,69 +132,6 @@ def _spawn_startup(cb, args, kw, cancel=None): cancel() return cb(*args, **kw) - -class GreenThread(Greenlet): - def __init__(self, parent): - Greenlet.__init__(self, self.main, parent) - from eventlet import coros - self._exit_event = coros.Event() - - def wait(self): - return self._exit_event.wait() - - def link(self, func, *curried_args, **curried_kwargs): - """ Set up a function to be called with the results of the GreenThread. - - The function must have the following signature: - def f(result=None, exc=None, [curried args/kwargs]): - """ - self._exit_funcs = getattr(self, '_exit_funcs', []) - self._exit_funcs.append((func, curried_args, curried_kwargs)) - - def main(self, function, args, kwargs): - try: - result = function(*args, **kwargs) - except: - self._exit_event.send_exception(*sys.exc_info()) - # ca and ckw are the curried function arguments - for f, ca, ckw in getattr(self, '_exit_funcs', []): - f(exc=sys.exc_info(), *ca, **ckw) - raise - else: - self._exit_event.send(result) - for f, ca, ckw in getattr(self, '_exit_funcs', []): - f(result, *ca, **ckw) - - -def spawn(func, *args, **kwargs): - """Create a green thread to run func(*args, **kwargs). Returns a GreenThread - object which you can use to get the results of the call. - """ - hub = get_hub_() - g = GreenThread(hub.greenlet) - hub.schedule_call_global(0, g.switch, func, args, kwargs) - return g - - -def _main_wrapper(func, args, kwargs): - # function that gets around the fact that greenlet.switch - # doesn't accept keyword arguments - return func(*args, **kwargs) - -def spawn_n(func, *args, **kwargs): - """Same as spawn, but returns a greenlet object from which it is not possible - to retrieve the results. This is slightly faster than spawn; it is fastest - if there are no keyword arguments.""" - hub = get_hub_() - if kwargs: - g = Greenlet(_main_wrapper, parent=hub.greenlet) - hub.schedule_call_global(0, g.switch, func, args, kwargs) - else: - g = Greenlet(func, parent=hub.greenlet) - hub.schedule_call_global(0, g.switch, *args) - return g - - def kill(g, *throw_args): get_hub_().schedule_call_global(0, g.throw, *throw_args) if getcurrent() is not get_hub_().greenlet: @@ -362,26 +303,9 @@ def exc_after(seconds, *throw_args): timer.cancel() """ return call_after(seconds, getcurrent().throw, *throw_args) - -def sleep(seconds=0): - """Yield control to another eligible coroutine until at least *seconds* have - elapsed. - - *seconds* may be specified as an integer, or a float if fractional seconds - are desired. Calling :func:`~eventlet.api.sleep` with *seconds* of 0 is the - canonical way of expressing a cooperative yield. For example, if one is - looping over a large list performing an expensive calculation without - calling any socket methods, it's a good idea to call ``sleep(0)`` - occasionally; otherwise nothing else will run. - """ - hub = get_hub_() - assert hub.greenlet is not greenlet.getcurrent(), 'do not call blocking functions from the mainloop' - timer = hub.schedule_call_global(seconds, greenlet.getcurrent().switch) - try: - hub.switch() - finally: - timer.cancel() - + + +sleep = greenthread.sleep getcurrent = greenlet.getcurrent GreenletExit = greenlet.GreenletExit diff --git a/eventlet/coros.py b/eventlet/coros.py index 63288bd..b0db19e 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -5,11 +5,7 @@ import warnings from eventlet import api from eventlet import hubs - - -class Cancelled(RuntimeError): - pass - +from eventlet import greenthread class NOT_USED: def __repr__(self): @@ -17,179 +13,19 @@ class NOT_USED: NOT_USED = NOT_USED() -class Event(object): - """An abstraction where an arbitrary number of coroutines - can wait for one event from another. +def Event(*a, **kw): + warnings.warn("The Event class has been moved to the greenthread module! " + "Please construct greenthread.Event objects instead.", + DeprecationWarning, stacklevel=2) + return greenthread.Event(*a, **kw) - Events differ from channels in two ways: - - 1. calling :meth:`send` does not unschedule the current coroutine - 2. :meth:`send` can only be called once; use :meth:`reset` to prepare the - event for another :meth:`send` - - They are ideal for communicating return values between coroutines. - - >>> from eventlet import coros, api - >>> evt = coros.Event() - >>> def baz(b): - ... evt.send(b + 1) - ... - >>> _ = api.spawn(baz, 3) - >>> evt.wait() - 4 - """ - _result = None - def __init__(self): - self._waiters = set() - self.reset() - - def __str__(self): - params = (self.__class__.__name__, hex(id(self)), self._result, self._exc, len(self._waiters)) - return '<%s at %s result=%r _exc=%r _waiters[%d]>' % params - - def reset(self): - """ Reset this event so it can be used to send again. - Can only be called after :meth:`send` has been called. - - >>> from eventlet import coros - >>> evt = coros.Event() - >>> evt.send(1) - >>> evt.reset() - >>> evt.send(2) - >>> evt.wait() - 2 - - Calling reset multiple times in a row is an error. - - >>> evt.reset() - >>> evt.reset() - Traceback (most recent call last): - ... - AssertionError: Trying to re-reset() a fresh event. - - """ - assert self._result is not NOT_USED, 'Trying to re-reset() a fresh event.' - self.epoch = time.time() - self._result = NOT_USED - self._exc = None - - def ready(self): - """ Return true if the :meth:`wait` call will return immediately. - Used to avoid waiting for things that might take a while to time out. - For example, you can put a bunch of events into a list, and then visit - them all repeatedly, calling :meth:`ready` until one returns ``True``, - and then you can :meth:`wait` on that one.""" - return self._result is not NOT_USED - - def has_exception(self): - return self._exc is not None - - def has_result(self): - return self._result is not NOT_USED and self._exc is None - - def poll(self, notready=None): - if self.ready(): - return self.wait() - return notready - - # QQQ make it return tuple (type, value, tb) instead of raising - # because - # 1) "poll" does not imply raising - # 2) it's better not to screw up caller's sys.exc_info() by default - # (e.g. if caller wants to calls the function in except or finally) - def poll_exception(self, notready=None): - if self.has_exception(): - return self.wait() - return notready - - def poll_result(self, notready=None): - if self.has_result(): - return self.wait() - return notready - - def wait(self): - """Wait until another coroutine calls :meth:`send`. - Returns the value the other coroutine passed to - :meth:`send`. - - >>> from eventlet import coros, api - >>> evt = coros.Event() - >>> def wait_on(): - ... retval = evt.wait() - ... print "waited for", retval - >>> _ = api.spawn(wait_on) - >>> evt.send('result') - >>> api.sleep(0) - waited for result - - Returns immediately if the event has already - occured. - - >>> evt.wait() - 'result' - """ - if self._result is NOT_USED: - self._waiters.add(api.getcurrent()) - try: - return hubs.get_hub().switch() - finally: - self._waiters.discard(api.getcurrent()) - if self._exc is not None: - api.getcurrent().throw(*self._exc) - return self._result - - def send(self, result=None, exc=None): - """Makes arrangements for the waiters to be woken with the - result and then returns immediately to the parent. - - >>> from eventlet import coros, api - >>> evt = coros.Event() - >>> def waiter(): - ... print 'about to wait' - ... result = evt.wait() - ... print 'waited for', result - >>> _ = api.spawn(waiter) - >>> api.sleep(0) - about to wait - >>> evt.send('a') - >>> api.sleep(0) - waited for a - - It is an error to call :meth:`send` multiple times on the same event. - - >>> evt.send('whoops') - Traceback (most recent call last): - ... - AssertionError: Trying to re-send() an already-triggered event. - - Use :meth:`reset` between :meth:`send` s to reuse an event object. - """ - assert self._result is NOT_USED, 'Trying to re-send() an already-triggered event.' - self._result = result - if exc is not None and not isinstance(exc, tuple): - exc = (exc, ) - self._exc = exc - hub = hubs.get_hub() - if self._waiters: - hub.schedule_call_global(0, self._do_send, self._result, self._exc, self._waiters.copy()) - - def _do_send(self, result, exc, waiters): - while waiters: - waiter = waiters.pop() - if waiter in self._waiters: - if exc is None: - waiter.switch(result) - else: - waiter.throw(*exc) - - def send_exception(self, *args): - # the arguments and the same as for greenlet.throw - return self.send(None, args) def event(*a, **kw): - warnings.warn("The event class has been capitalized! Please construct" - " Event objects instead.", DeprecationWarning, stacklevel=2) - return Event(*a, **kw) + warnings.warn("The event class has been capitalized and moved! Please " + "construct greenthread.Event objects instead.", + DeprecationWarning, stacklevel=2) + return greenthread.Event(*a, **kw) + class Semaphore(object): """An unbounded semaphore. @@ -348,7 +184,7 @@ class metaphore(object): """ def __init__(self): self.counter = 0 - self.event = Event() + self.event = greenthread.Event() # send() right away, else we'd wait on the default 0 count! self.event.send() @@ -397,14 +233,14 @@ def execute(func, *args, **kw): >>> evt.wait() ('foo', 1) """ - evt = Event() - def _really_execute(): - evt.send(func(*args, **kw)) - api.spawn(_really_execute) - return evt + warnings.warn("Coros.execute is deprecated. Please use eventlet.spawn " + "instead.", DeprecationWarning, stacklevel=2) + return greenthread.spawn(func, *args, **kw) def CoroutinePool(*args, **kwargs): + warnings.warn("CoroutinePool is deprecated. Please use " + "eventlet.GreenPool instead.", DeprecationWarning, stacklevel=2) from eventlet.pool import Pool return Pool(*args, **kwargs) @@ -595,7 +431,7 @@ class Actor(object): serially. """ self._mailbox = collections.deque() - self._event = Event() + self._event = greenthread.Event() self._killer = api.spawn(self.run_forever) self._pool = CoroutinePool(min_size=0, max_size=concurrency) @@ -604,7 +440,7 @@ class Actor(object): while True: if not self._mailbox: self._event.wait() - self._event = Event() + self._event = greenthread.Event() else: # leave the message in the mailbox until after it's # been processed so the event doesn't get triggered @@ -645,7 +481,7 @@ class Actor(object): coroutine in a predictable manner, but this kinda defeats the point of the :class:`Actor`, so don't do it in a real application. - >>> evt = Event() + >>> evt = greenthread.Event() >>> a.cast( ("message 1", evt) ) >>> evt.wait() # force it to run at this exact moment received message 1 diff --git a/eventlet/greenthread.py b/eventlet/greenthread.py new file mode 100644 index 0000000..f496b1a --- /dev/null +++ b/eventlet/greenthread.py @@ -0,0 +1,267 @@ +import sys + +from eventlet import hubs +from eventlet.support import greenlets as greenlet + +__all__ = ['getcurrent', 'sleep', 'spawn', 'spawn_n', 'GreenThread', 'Event'] + +getcurrent = greenlet.getcurrent + +def sleep(seconds=0): + """Yield control to another eligible coroutine until at least *seconds* have + elapsed. + + *seconds* may be specified as an integer, or a float if fractional seconds + are desired. Calling :func:`~eventlet.api.sleep` with *seconds* of 0 is the + canonical way of expressing a cooperative yield. For example, if one is + looping over a large list performing an expensive calculation without + calling any socket methods, it's a good idea to call ``sleep(0)`` + occasionally; otherwise nothing else will run. + """ + hub = hubs.get_hub() + assert hub.greenlet is not greenlet.getcurrent(), 'do not call blocking functions from the mainloop' + timer = hub.schedule_call_global(seconds, greenlet.getcurrent().switch) + try: + hub.switch() + finally: + timer.cancel() + + +def spawn(func, *args, **kwargs): + """Create a green thread to run func(*args, **kwargs). Returns a GreenThread + object which you can use to get the results of the call. + """ + hub = hubs.get_hub() + g = GreenThread(hub.greenlet) + hub.schedule_call_global(0, g.switch, func, args, kwargs) + return g + + +def _main_wrapper(func, args, kwargs): + # function that gets around the fact that greenlet.switch + # doesn't accept keyword arguments + return func(*args, **kwargs) + + +def spawn_n(func, *args, **kwargs): + """Same as spawn, but returns a greenlet object from which it is not + possible to retrieve the results. This is slightly faster than spawn; it is + fastest if there are no keyword arguments.""" + hub = hubs.get_hub() + if kwargs: + g = greenlet.greenlet(_main_wrapper, parent=hub.greenlet) + hub.schedule_call_global(0, g.switch, func, args, kwargs) + else: + g = greenlet.greenlet(func, parent=hub.greenlet) + hub.schedule_call_global(0, g.switch, *args) + return g + + +class GreenThread(greenlet.greenlet): + def __init__(self, parent): + greenlet.greenlet.__init__(self, self.main, parent) + self._exit_event = Event() + + def wait(self): + return self._exit_event.wait() + + def link(self, func, *curried_args, **curried_kwargs): + """ Set up a function to be called with the results of the GreenThread. + + The function must have the following signature: + def f(result=None, exc=None, [curried args/kwargs]): + """ + self._exit_funcs = getattr(self, '_exit_funcs', []) + self._exit_funcs.append((func, curried_args, curried_kwargs)) + + def main(self, function, args, kwargs): + try: + result = function(*args, **kwargs) + except: + self._exit_event.send_exception(*sys.exc_info()) + # ca and ckw are the curried function arguments + for f, ca, ckw in getattr(self, '_exit_funcs', []): + f(exc=sys.exc_info(), *ca, **ckw) + raise + else: + self._exit_event.send(result) + for f, ca, ckw in getattr(self, '_exit_funcs', []): + f(result, *ca, **ckw) + + + +class NOT_USED: + def __repr__(self): + return 'NOT_USED' + +NOT_USED = NOT_USED() + +class Event(object): + """An abstraction where an arbitrary number of coroutines + can wait for one event from another. + + Events differ from channels in two ways: + + 1. calling :meth:`send` does not unschedule the current coroutine + 2. :meth:`send` can only be called once; use :meth:`reset` to prepare the + event for another :meth:`send` + + They are ideal for communicating return values between coroutines. + + >>> from eventlet import coros, api + >>> evt = coros.Event() + >>> def baz(b): + ... evt.send(b + 1) + ... + >>> _ = api.spawn(baz, 3) + >>> evt.wait() + 4 + """ + _result = None + def __init__(self): + self._waiters = set() + self.reset() + + def __str__(self): + params = (self.__class__.__name__, hex(id(self)), self._result, self._exc, len(self._waiters)) + return '<%s at %s result=%r _exc=%r _waiters[%d]>' % params + + def reset(self): + """ Reset this event so it can be used to send again. + Can only be called after :meth:`send` has been called. + + >>> from eventlet import coros + >>> evt = coros.Event() + >>> evt.send(1) + >>> evt.reset() + >>> evt.send(2) + >>> evt.wait() + 2 + + Calling reset multiple times in a row is an error. + + >>> evt.reset() + >>> evt.reset() + Traceback (most recent call last): + ... + AssertionError: Trying to re-reset() a fresh event. + + """ + assert self._result is not NOT_USED, 'Trying to re-reset() a fresh event.' + self._result = NOT_USED + self._exc = None + + def ready(self): + """ Return true if the :meth:`wait` call will return immediately. + Used to avoid waiting for things that might take a while to time out. + For example, you can put a bunch of events into a list, and then visit + them all repeatedly, calling :meth:`ready` until one returns ``True``, + and then you can :meth:`wait` on that one.""" + return self._result is not NOT_USED + + def has_exception(self): + return self._exc is not None + + def has_result(self): + return self._result is not NOT_USED and self._exc is None + + def poll(self, notready=None): + if self.ready(): + return self.wait() + return notready + + # QQQ make it return tuple (type, value, tb) instead of raising + # because + # 1) "poll" does not imply raising + # 2) it's better not to screw up caller's sys.exc_info() by default + # (e.g. if caller wants to calls the function in except or finally) + def poll_exception(self, notready=None): + if self.has_exception(): + return self.wait() + return notready + + def poll_result(self, notready=None): + if self.has_result(): + return self.wait() + return notready + + def wait(self): + """Wait until another coroutine calls :meth:`send`. + Returns the value the other coroutine passed to + :meth:`send`. + + >>> from eventlet import coros, api + >>> evt = coros.Event() + >>> def wait_on(): + ... retval = evt.wait() + ... print "waited for", retval + >>> _ = api.spawn(wait_on) + >>> evt.send('result') + >>> api.sleep(0) + waited for result + + Returns immediately if the event has already + occured. + + >>> evt.wait() + 'result' + """ + current = getcurrent() + if self._result is NOT_USED: + self._waiters.add(current) + try: + return hubs.get_hub().switch() + finally: + self._waiters.discard(current) + if self._exc is not None: + current.throw(*self._exc) + return self._result + + def send(self, result=None, exc=None): + """Makes arrangements for the waiters to be woken with the + result and then returns immediately to the parent. + + >>> from eventlet import coros, api + >>> evt = coros.Event() + >>> def waiter(): + ... print 'about to wait' + ... result = evt.wait() + ... print 'waited for', result + >>> _ = api.spawn(waiter) + >>> api.sleep(0) + about to wait + >>> evt.send('a') + >>> api.sleep(0) + waited for a + + It is an error to call :meth:`send` multiple times on the same event. + + >>> evt.send('whoops') + Traceback (most recent call last): + ... + AssertionError: Trying to re-send() an already-triggered event. + + Use :meth:`reset` between :meth:`send` s to reuse an event object. + """ + assert self._result is NOT_USED, 'Trying to re-send() an already-triggered event.' + self._result = result + if exc is not None and not isinstance(exc, tuple): + exc = (exc, ) + self._exc = exc + hub = hubs.get_hub() + if self._waiters: + hub.schedule_call_global(0, self._do_send, self._result, self._exc, self._waiters.copy()) + + def _do_send(self, result, exc, waiters): + while waiters: + waiter = waiters.pop() + if waiter in self._waiters: + if exc is None: + waiter.switch(result) + else: + waiter.throw(*exc) + + def send_exception(self, *args): + # the arguments and the same as for greenlet.throw + return self.send(None, args) + \ No newline at end of file diff --git a/eventlet/parallel.py b/eventlet/parallel.py index 1b6bd86..9ebfe6c 100644 --- a/eventlet/parallel.py +++ b/eventlet/parallel.py @@ -1,4 +1,4 @@ -from eventlet import api +from eventlet import greenthread from eventlet import coros __all__ = ['GreenPool', 'GreenPile'] @@ -10,7 +10,7 @@ class GreenPool(object): self.size = size self.coroutines_running = set() self.sem = coros.Semaphore(size) - self.no_coros_running = coros.Event() + self.no_coros_running = greenthread.Event() def resize(self, new_size): """ Change the max number of coroutines doing work at any given time. @@ -36,35 +36,58 @@ class GreenPool(object): def spawn(self, func, *args, **kwargs): """Run func(*args, **kwargs) in its own green thread. Returns the - GreenThread object that is running the function. + GreenThread object that is running the function, which can be used + to retrieve the results. """ - return self._spawn(func, *args, **kwargs) - - def spawn_n(self, func, *args, **kwargs): - """ Create a coroutine to run func(*args, **kwargs). - - Returns None; the results of the function are not retrievable. - The results of the function are not put into the results() iterator. - """ - self._spawn(func, *args, **kwargs) - - def _spawn(self, func, *args, **kwargs): # if reentering an empty pool, don't try to wait on a coroutine freeing # itself -- instead, just execute in the current coroutine - current = api.getcurrent() + current = greenthread.getcurrent() if self.sem.locked() and current in self.coroutines_running: # a bit hacky to use the GT without switching to it - gt = api.GreenThread(current) + gt = greenthread.GreenThread(current) gt.main(func, args, kwargs) return gt else: self.sem.acquire() - gt = api.spawn(func, *args, **kwargs) + gt = greenthread.spawn(func, *args, **kwargs) if not self.coroutines_running: - self.no_coros_running = coros.Event() + self.no_coros_running = greenthread.Event() self.coroutines_running.add(gt) gt.link(self._spawn_done, coro=gt) return gt + + def _spawn_n_impl(self, func, args, kwargs, coro=None): + try: + try: + func(*args, **kwargs) + except (KeyboardInterrupt, SystemExit): + raise + except: + # TODO in debug mode print these + pass + finally: + if coro is None: + return + else: + coro = greenthread.getcurrent() + self._spawn_done(coro=coro) + + def spawn_n(self, func, *args, **kwargs): + """ Create a coroutine to run func(*args, **kwargs). + + Returns None; the results of the function are not retrievable. + """ + # if reentering an empty pool, don't try to wait on a coroutine freeing + # itself -- instead, just execute in the current coroutine + current = greenthread.getcurrent() + if self.sem.locked() and current in self.coroutines_running: + self._spawn_n_impl(func, args, kwargs) + else: + self.sem.acquire() + g = greenthread.spawn_n(self._spawn_n_impl, func, args, kwargs, coro=True) + if not self.coroutines_running: + self.no_coros_running = coros.Event() + self.coroutines_running.add(g) def waitall(self): """Waits until all coroutines in the pool are finished working.""" @@ -72,7 +95,8 @@ class GreenPool(object): def _spawn_done(self, result=None, exc=None, coro=None): self.sem.release() - self.coroutines_running.remove(coro) + if coro is not None: + self.coroutines_running.remove(coro) # if done processing (no more work is waiting for processing), # send StopIteration so that the queue knows it's done if self.sem.balance == self.size: @@ -92,7 +116,7 @@ try: except NameError: def next(it): try: - it.next() + return it.next() except AttributeError: raise TypeError("%s object is not an iterator" % type(it)) @@ -157,5 +181,5 @@ class GreenPile(object): # iterates over us self.spawn(lambda: next(iter([]))) # spin off a coroutine to launch the rest of the items - api.spawn(self._do_map, function, it) + greenthread.spawn(self._do_map, function, it) return self diff --git a/eventlet/pool.py b/eventlet/pool.py index d22e8ff..841de47 100644 --- a/eventlet/pool.py +++ b/eventlet/pool.py @@ -1,8 +1,11 @@ -# replacement of CoroutinePool implemented with proc module from eventlet import coros, proc, api -class Pool(object): +import warnings +warnings.warn("The pool module is deprecated. Please use the " + "eventlet.GreenPool and eventlet.GreenPile classes instead.", + DeprecationWarning, stacklevel=2) +class Pool(object): def __init__(self, min_size=0, max_size=4, track_events=False): if min_size > max_size: raise ValueError('min_size cannot be bigger than max_size') diff --git a/eventlet/twistedutil/protocol.py b/eventlet/twistedutil/protocol.py index 382913c..ae2ec50 100644 --- a/eventlet/twistedutil/protocol.py +++ b/eventlet/twistedutil/protocol.py @@ -8,7 +8,8 @@ from twisted.python import failure from eventlet import proc from eventlet.api import getcurrent -from eventlet.coros import Queue, Event +from eventlet.coros import Queue +from eventlet.greenthread import Event class ValueQueue(Queue): diff --git a/tests/parallel_test.py b/tests/parallel_test.py index 22afcae..b32608c 100644 --- a/tests/parallel_test.py +++ b/tests/parallel_test.py @@ -1,7 +1,10 @@ import gc +import os import random -from eventlet import api, hubs, parallel, coros +import eventlet +from eventlet import api +from eventlet import hubs, parallel, coros import tests class Spawn(tests.LimitedTestCase): @@ -10,11 +13,11 @@ class Spawn(tests.LimitedTestCase): def f(a, b=None): return (a,b) - gt = parallel.api. spawn(f, 1, b=2) + gt = eventlet.spawn(f, 1, b=2) self.assertEquals(gt.wait(), (1,2)) def passthru(a): - api.sleep(0.01) + eventlet.sleep(0.01) return a class GreenPool(tests.LimitedTestCase): @@ -30,7 +33,7 @@ class GreenPool(tests.LimitedTestCase): p = parallel.GreenPool(4) results_closure = [] def do_something(a): - api.sleep(0.01) + eventlet.sleep(0.01) results_closure.append(a) for i in xrange(10): p.spawn(do_something, i) @@ -48,14 +51,14 @@ class GreenPool(tests.LimitedTestCase): waiters = [] self.assertEqual(pool.running(), 0) - waiters.append(api.spawn(waiter, pool)) - api.sleep(0) + waiters.append(eventlet.spawn(waiter, pool)) + eventlet.sleep(0) self.assertEqual(pool.waiting(), 0) - waiters.append(api.spawn(waiter, pool)) - api.sleep(0) + waiters.append(eventlet.spawn(waiter, pool)) + eventlet.sleep(0) self.assertEqual(pool.waiting(), 1) - waiters.append(api.spawn(waiter, pool)) - api.sleep(0) + waiters.append(eventlet.spawn(waiter, pool)) + eventlet.sleep(0) self.assertEqual(pool.waiting(), 2) self.assertEqual(pool.running(), 1) done.send(None) @@ -92,8 +95,8 @@ class GreenPool(tests.LimitedTestCase): pool = parallel.GreenPool(2) worker = pool.spawn(some_work) worker.wait() - api.sleep(0) - api.sleep(0) + eventlet.sleep(0) + eventlet.sleep(0) self.assertEquals(timer_fired, []) def test_reentrant(self): @@ -136,8 +139,8 @@ class GreenPool(tests.LimitedTestCase): # clean up by causing all the wait_long_time functions to return evt.send(None) - api.sleep(0) - api.sleep(0) + eventlet.sleep(0) + eventlet.sleep(0) def test_resize(self): pool = parallel.GreenPool(2) @@ -156,8 +159,8 @@ class GreenPool(tests.LimitedTestCase): # cause the wait_long_time functions to return, which will # trigger puts to the pool evt.send(None) - api.sleep(0) - api.sleep(0) + eventlet.sleep(0) + eventlet.sleep(0) self.assertEquals(pool.free(), 1) self.assertEquals(pool.running(), 0) @@ -195,7 +198,7 @@ class GreenPool(tests.LimitedTestCase): # the pool can get some random item back def send_wakeup(tp): tp.put('wakeup') - gt = api.spawn(send_wakeup, tp) + gt = eventlet.spawn(send_wakeup, tp) # now we ask the pool to run something else, which should not # be affected by the previous send at all @@ -218,7 +221,7 @@ class GreenPool(tests.LimitedTestCase): self.assertEqual(p.free(), 1) gt.wait() self.assertEqual(r, [1]) - api.sleep(0) + eventlet.sleep(0) self.assertEqual(p.free(), 2) #Once the pool is exhausted, spawning forces a yield. @@ -232,7 +235,7 @@ class GreenPool(tests.LimitedTestCase): p.spawn_n(foo, 4) self.assertEqual(set(r), set([1,2,3])) - api.sleep(0) + eventlet.sleep(0) self.assertEqual(set(r), set([1,2,3,4])) class GreenPile(tests.LimitedTestCase): @@ -245,6 +248,11 @@ class GreenPile(tests.LimitedTestCase): p = parallel.GreenPile(4) result_iter = p.imap(passthru, []) self.assertRaises(StopIteration, result_iter.next) + + def test_imap_nonefunc(self): + p = parallel.GreenPile(4) + result_list = list(p.imap(None, xrange(10))) + self.assertEquals(result_list, [(x,) for x in xrange(10)]) def test_pile(self): p = parallel.GreenPile(4) @@ -273,12 +281,11 @@ class GreenPile(tests.LimitedTestCase): def bunch_of_work(pile, unique): for i in xrange(10): pile.spawn(passthru, i + unique) - api.spawn(bunch_of_work, pile1, 0) - api.spawn(bunch_of_work, pile2, 100) - api.sleep(0) + eventlet.spawn(bunch_of_work, pile1, 0) + eventlet.spawn(bunch_of_work, pile2, 100) + eventlet.sleep(0) self.assertEquals(list(pile2), list(xrange(100,110))) self.assertEquals(list(pile1), list(xrange(10))) - class StressException(Exception): @@ -287,16 +294,16 @@ class StressException(Exception): r = random.Random(0) def pressure(arg): while r.random() < 0.5: - api.sleep(r.random() * 0.001) + eventlet.sleep(r.random() * 0.001) if r.random() < 0.8: return arg else: raise StressException(arg) -# TODO: skip these unless explicitly demanded by the user class Stress(tests.SilencedTestCase): # tests will take extra-long TEST_TIMEOUT=10 + @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') def spawn_memory(self, concurrency): # checks that piles are strictly ordered # and bounded in memory @@ -306,9 +313,9 @@ class Stress(tests.SilencedTestCase): token = (unique, i) p.spawn(pressure, token) - api.spawn(makework, 1000, 1) - api.spawn(makework, 1000, 2) - api.spawn(makework, 1000, 3) + eventlet.spawn(makework, 1000, 1) + eventlet.spawn(makework, 1000, 2) + eventlet.spawn(makework, 1000, 3) p.spawn(pressure, (0,0)) latest = [-1] * 4 received = 0 @@ -330,15 +337,19 @@ class Stress(tests.SilencedTestCase): self.assert_(latest[unique] < order) latest[unique] = order + @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') def test_memory_5(self): self.spawn_memory(5) + @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') def test_memory_50(self): self.spawn_memory(50) + @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') def test_memory_500(self): self.spawn_memory(50) - + + @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') def test_with_intpool(self): from eventlet import pools class IntPool(pools.Pool): @@ -349,7 +360,7 @@ class Stress(tests.SilencedTestCase): def subtest(intpool_size, pool_size, num_executes): def run(int_pool): token = int_pool.get() - api.sleep(0.0001) + eventlet.sleep(0.0001) int_pool.put(token) return token