diff --git a/eventlet/coros.py b/eventlet/coros.py index 03bb15b..995eb7f 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -56,18 +56,18 @@ NOT_USED = NOT_USED() class event(object): """An abstraction where an arbitrary number of coroutines can wait for one event from another. - + Events differ from channels in two ways: 1) calling send() does not unschedule the current coroutine - 2) send() can only be called once; use reset() to prepare the event for + 2) send() can only be called once; use reset() to prepare the event for another send() They are ideal for communicating return values between coroutines. - + >>> from eventlet import coros, api >>> evt = coros.event() >>> def baz(b): ... evt.send(b + 1) - ... + ... >>> _ = api.spawn(baz, 3) >>> evt.wait() 4 @@ -83,7 +83,7 @@ class event(object): def reset(self): """ Reset this event so it can be used to send again. Can only be called after send has been called. - + >>> from eventlet import coros >>> evt = coros.event() >>> evt.send(1) @@ -91,24 +91,24 @@ class event(object): >>> evt.send(2) >>> evt.wait() 2 - + Calling reset multiple times in a row is an error. - + >>> evt.reset() >>> evt.reset() Traceback (most recent call last): ... AssertionError: Trying to re-reset() a fresh event. - + """ assert self._result is not NOT_USED, 'Trying to re-reset() a fresh event.' self.epoch = time.time() self._result = NOT_USED self._exc = None self._waiters = {} - + def ready(self): - """ Return true if the wait() call will return immediately. + """ Return true if the wait() call will return immediately. Used to avoid waiting for things that might take a while to time out. For example, you can put a bunch of events into a list, and then visit them all repeatedly, calling ready() until one returns True, and then @@ -119,7 +119,7 @@ class event(object): """Wait until another coroutine calls send. Returns the value the other coroutine passed to send. - + >>> from eventlet import coros, api >>> evt = coros.event() >>> def wait_on(): @@ -132,7 +132,7 @@ class event(object): Returns immediately if the event has already occured. - + >>> evt.wait() 'result' """ @@ -154,9 +154,9 @@ class event(object): from wait. Sends the eventlet.coros.Cancelled exception - waiter: The greenlet (greenlet.getcurrent()) of the + waiter: The greenlet (greenlet.getcurrent()) of the coroutine to cancel - + >>> from eventlet import coros, api >>> evt = coros.event() >>> def wait_on(): @@ -166,23 +166,23 @@ class event(object): ... print "Cancelled" ... >>> waiter = api.spawn(wait_on) - + The cancel call works on coroutines that are in the wait() call. - + >>> api.sleep(0) # enter the wait() >>> evt.cancel(waiter) >>> api.sleep(0) # receive the exception Cancelled - + The cancel is invisible to coroutines that call wait() after cancel() is called. This is different from send()'s behavior, where the result is passed to any waiter regardless of the ordering of the calls. - + >>> waiter = api.spawn(wait_on) >>> api.sleep(0) - + Cancels have no effect on the ability to send() to the event. - + >>> evt.send('stuff') >>> api.sleep(0) received stuff @@ -195,7 +195,7 @@ class event(object): def send(self, result=None, exc=None): """Makes arrangements for the waiters to be woken with the result and then returns immediately to the parent. - + >>> from eventlet import coros, api >>> evt = coros.event() >>> def waiter(): @@ -208,14 +208,14 @@ class event(object): >>> evt.send('a') >>> api.sleep(0) waited for a - + It is an error to call send() multiple times on the same event. - + >>> evt.send('whoops') Traceback (most recent call last): ... AssertionError: Trying to re-send() an already-triggered event. - + Use reset() between send()s to reuse an event object. """ assert self._result is NOT_USED, 'Trying to re-send() an already-triggered event.' @@ -464,7 +464,7 @@ class semaphore(object): self.relevent.reset() if self.counter == 1: # If self.counter was 0 before we incremented it, then wake up - # anybody who was waiting + # anybody who was waiting self.acqevent.send() def __exit__(self, typ, val, tb): @@ -525,16 +525,16 @@ class metaphore(object): """Suspend the caller only if our count is nonzero. In that case, resume the caller once the count decrements to zero again. """ - self.event.wait() + self.event.wait() def execute(func, *args, **kw): """ Executes an operation asynchronously in a new coroutine, returning an event to retrieve the return value. - This has the same api as the CoroutinePool.execute method; the only + This has the same api as the CoroutinePool.execute method; the only difference is that this one creates a new coroutine instead of drawing from a pool. - + >>> from eventlet import coros >>> evt = coros.execute(lambda a: ('foo', a), 1) >>> evt.wait() @@ -548,23 +548,23 @@ def execute(func, *args, **kw): class CoroutinePool(pools.Pool): - """ Like a thread pool, but with coroutines. - + """ Like a thread pool, but with coroutines. + Coroutine pools are useful for splitting up tasks or globally controlling - concurrency. You don't retrieve the coroutines directly with get() -- + concurrency. You don't retrieve the coroutines directly with get() -- instead use the execute() and execute_async() methods to run code. - + >>> from eventlet import coros, api >>> p = coros.CoroutinePool(max_size=2) >>> def foo(a): ... print "foo", a - ... + ... >>> evt = p.execute(foo, 1) >>> evt.wait() foo 1 - + Once the pool is exhausted, calling an execute forces a yield. - + >>> p.execute_async(foo, 2) >>> p.execute_async(foo, 3) >>> p.free() @@ -572,11 +572,11 @@ class CoroutinePool(pools.Pool): >>> p.execute_async(foo, 4) foo 2 foo 3 - + >>> api.sleep(0) foo 4 """ - + def __init__(self, min_size=0, max_size=4, track_events=False): self._greenlets = set() if track_events: @@ -618,7 +618,7 @@ class CoroutinePool(pools.Pool): # forever. This is generally only a problem in unit # tests. sender._result = NOT_USED - + sender = event() (evt, func, args, kw) = recvd self._safe_apply(evt, func, args, kw) @@ -647,7 +647,7 @@ class CoroutinePool(pools.Pool): if self._next_event is None: self._tracked_events.append(result) else: - + ne = self._next_event self._next_event = None ne.send(result) @@ -674,7 +674,7 @@ class CoroutinePool(pools.Pool): def _execute(self, evt, func, args, kw): """ Private implementation of the execute methods. """ - # if reentering an empty pool, don't try to wait on a coroutine freeing + # if reentering an empty pool, don't try to wait on a coroutine freeing # itself -- instead, just execute in the current coroutine if self.free() == 0 and api.getcurrent() in self._greenlets: self._safe_apply(evt, func, args, kw) @@ -706,14 +706,14 @@ class CoroutinePool(pools.Pool): super(CoroutinePool, self).put(item) # Track the number of outstanding CoroutinePool coroutines self.requested.dec() - + def execute(self, func, *args, **kw): """Execute func in one of the coroutines maintained by the pool, when one is free. Immediately returns an eventlet.coros.event object which func's result will be sent to when it is available. - + >>> from eventlet import coros >>> p = coros.CoroutinePool() >>> evt = p.execute(lambda a: ('foo', a), 1) @@ -733,7 +733,7 @@ class CoroutinePool(pools.Pool): >>> p = coros.CoroutinePool() >>> def foo(a): ... print "foo", a - ... + ... >>> p.execute_async(foo, 1) >>> api.sleep(0) foo 1 @@ -880,16 +880,16 @@ class CoroutinePool(pools.Pool): >>> pausers = [coros.event() for x in xrange(2)] >>> def longtask(evt, desc): ... print "%s woke up with %s" % (desc, evt.wait()) - ... + ... >>> pool.launch_all(longtask, zip(pausers, "AB")) >>> def quicktask(desc): ... print "returning %s" % desc ... return desc ... - + (Instead of using a for loop, step through generate_results() items individually to illustrate timing) - + >>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase)) >>> print step.next() returning a @@ -1097,7 +1097,7 @@ class queue(object): raise exc[0], exc[1], exc[2] raise exc return result - + def ready(self): # could also base this on self.sem.counter... return len(self.items) > 0 @@ -1116,7 +1116,7 @@ class Actor(object): """ def __init__(self, concurrency = 1): """ Constructs an Actor, kicking off a new coroutine to process the messages. - + The concurrency argument specifies how many messages the actor will try to process concurrently. If it is 1, the actor will process messages serially. @@ -1160,18 +1160,18 @@ class Actor(object): The default implementation just raises an exception, so replace it with something useful! - + >>> class Greeter(Actor): ... def received(self, (message, evt) ): ... print "received", message ... if evt: evt.send() ... >>> a = Greeter() - + This example uses events to synchronize between the actor and the main coroutine in a predictable manner, but this kinda defeats the point of the Actor, so don't do it in a real application. - + >>> evt = event() >>> a.cast( ("message 1", evt) ) >>> evt.wait() # force it to run at this exact moment @@ -1182,7 +1182,7 @@ class Actor(object): >>> evt.wait() received message 2 received message 3 - + >>> api.kill(a._killer) # test cleanup """ raise NotImplementedError()