removed trailing space from eventlet/coros.py

This commit is contained in:
Denis Bilenko
2008-12-10 16:08:52 +06:00
parent f67efb944d
commit a40d7cfa52

View File

@@ -56,18 +56,18 @@ NOT_USED = NOT_USED()
class event(object): class event(object):
"""An abstraction where an arbitrary number of coroutines """An abstraction where an arbitrary number of coroutines
can wait for one event from another. can wait for one event from another.
Events differ from channels in two ways: Events differ from channels in two ways:
1) calling send() does not unschedule the current coroutine 1) calling send() does not unschedule the current coroutine
2) send() can only be called once; use reset() to prepare the event for 2) send() can only be called once; use reset() to prepare the event for
another send() another send()
They are ideal for communicating return values between coroutines. They are ideal for communicating return values between coroutines.
>>> from eventlet import coros, api >>> from eventlet import coros, api
>>> evt = coros.event() >>> evt = coros.event()
>>> def baz(b): >>> def baz(b):
... evt.send(b + 1) ... evt.send(b + 1)
... ...
>>> _ = api.spawn(baz, 3) >>> _ = api.spawn(baz, 3)
>>> evt.wait() >>> evt.wait()
4 4
@@ -83,7 +83,7 @@ class event(object):
def reset(self): def reset(self):
""" Reset this event so it can be used to send again. """ Reset this event so it can be used to send again.
Can only be called after send has been called. Can only be called after send has been called.
>>> from eventlet import coros >>> from eventlet import coros
>>> evt = coros.event() >>> evt = coros.event()
>>> evt.send(1) >>> evt.send(1)
@@ -91,24 +91,24 @@ class event(object):
>>> evt.send(2) >>> evt.send(2)
>>> evt.wait() >>> evt.wait()
2 2
Calling reset multiple times in a row is an error. Calling reset multiple times in a row is an error.
>>> evt.reset() >>> evt.reset()
>>> evt.reset() >>> evt.reset()
Traceback (most recent call last): Traceback (most recent call last):
... ...
AssertionError: Trying to re-reset() a fresh event. AssertionError: Trying to re-reset() a fresh event.
""" """
assert self._result is not NOT_USED, 'Trying to re-reset() a fresh event.' assert self._result is not NOT_USED, 'Trying to re-reset() a fresh event.'
self.epoch = time.time() self.epoch = time.time()
self._result = NOT_USED self._result = NOT_USED
self._exc = None self._exc = None
self._waiters = {} self._waiters = {}
def ready(self): def ready(self):
""" Return true if the wait() call will return immediately. """ Return true if the wait() call will return immediately.
Used to avoid waiting for things that might take a while to time out. Used to avoid waiting for things that might take a while to time out.
For example, you can put a bunch of events into a list, and then visit For example, you can put a bunch of events into a list, and then visit
them all repeatedly, calling ready() until one returns True, and then them all repeatedly, calling ready() until one returns True, and then
@@ -119,7 +119,7 @@ class event(object):
"""Wait until another coroutine calls send. """Wait until another coroutine calls send.
Returns the value the other coroutine passed to Returns the value the other coroutine passed to
send. send.
>>> from eventlet import coros, api >>> from eventlet import coros, api
>>> evt = coros.event() >>> evt = coros.event()
>>> def wait_on(): >>> def wait_on():
@@ -132,7 +132,7 @@ class event(object):
Returns immediately if the event has already Returns immediately if the event has already
occured. occured.
>>> evt.wait() >>> evt.wait()
'result' 'result'
""" """
@@ -154,9 +154,9 @@ class event(object):
from wait. Sends the eventlet.coros.Cancelled from wait. Sends the eventlet.coros.Cancelled
exception exception
waiter: The greenlet (greenlet.getcurrent()) of the waiter: The greenlet (greenlet.getcurrent()) of the
coroutine to cancel coroutine to cancel
>>> from eventlet import coros, api >>> from eventlet import coros, api
>>> evt = coros.event() >>> evt = coros.event()
>>> def wait_on(): >>> def wait_on():
@@ -166,23 +166,23 @@ class event(object):
... print "Cancelled" ... print "Cancelled"
... ...
>>> waiter = api.spawn(wait_on) >>> waiter = api.spawn(wait_on)
The cancel call works on coroutines that are in the wait() call. The cancel call works on coroutines that are in the wait() call.
>>> api.sleep(0) # enter the wait() >>> api.sleep(0) # enter the wait()
>>> evt.cancel(waiter) >>> evt.cancel(waiter)
>>> api.sleep(0) # receive the exception >>> api.sleep(0) # receive the exception
Cancelled Cancelled
The cancel is invisible to coroutines that call wait() after cancel() The cancel is invisible to coroutines that call wait() after cancel()
is called. This is different from send()'s behavior, where the result is called. This is different from send()'s behavior, where the result
is passed to any waiter regardless of the ordering of the calls. is passed to any waiter regardless of the ordering of the calls.
>>> waiter = api.spawn(wait_on) >>> waiter = api.spawn(wait_on)
>>> api.sleep(0) >>> api.sleep(0)
Cancels have no effect on the ability to send() to the event. Cancels have no effect on the ability to send() to the event.
>>> evt.send('stuff') >>> evt.send('stuff')
>>> api.sleep(0) >>> api.sleep(0)
received stuff received stuff
@@ -195,7 +195,7 @@ class event(object):
def send(self, result=None, exc=None): def send(self, result=None, exc=None):
"""Makes arrangements for the waiters to be woken with the """Makes arrangements for the waiters to be woken with the
result and then returns immediately to the parent. result and then returns immediately to the parent.
>>> from eventlet import coros, api >>> from eventlet import coros, api
>>> evt = coros.event() >>> evt = coros.event()
>>> def waiter(): >>> def waiter():
@@ -208,14 +208,14 @@ class event(object):
>>> evt.send('a') >>> evt.send('a')
>>> api.sleep(0) >>> api.sleep(0)
waited for a waited for a
It is an error to call send() multiple times on the same event. It is an error to call send() multiple times on the same event.
>>> evt.send('whoops') >>> evt.send('whoops')
Traceback (most recent call last): Traceback (most recent call last):
... ...
AssertionError: Trying to re-send() an already-triggered event. AssertionError: Trying to re-send() an already-triggered event.
Use reset() between send()s to reuse an event object. Use reset() between send()s to reuse an event object.
""" """
assert self._result is NOT_USED, 'Trying to re-send() an already-triggered event.' assert self._result is NOT_USED, 'Trying to re-send() an already-triggered event.'
@@ -464,7 +464,7 @@ class semaphore(object):
self.relevent.reset() self.relevent.reset()
if self.counter == 1: if self.counter == 1:
# If self.counter was 0 before we incremented it, then wake up # If self.counter was 0 before we incremented it, then wake up
# anybody who was waiting # anybody who was waiting
self.acqevent.send() self.acqevent.send()
def __exit__(self, typ, val, tb): def __exit__(self, typ, val, tb):
@@ -525,16 +525,16 @@ class metaphore(object):
"""Suspend the caller only if our count is nonzero. In that case, """Suspend the caller only if our count is nonzero. In that case,
resume the caller once the count decrements to zero again. resume the caller once the count decrements to zero again.
""" """
self.event.wait() self.event.wait()
def execute(func, *args, **kw): def execute(func, *args, **kw):
""" Executes an operation asynchronously in a new coroutine, returning """ Executes an operation asynchronously in a new coroutine, returning
an event to retrieve the return value. an event to retrieve the return value.
This has the same api as the CoroutinePool.execute method; the only This has the same api as the CoroutinePool.execute method; the only
difference is that this one creates a new coroutine instead of drawing difference is that this one creates a new coroutine instead of drawing
from a pool. from a pool.
>>> from eventlet import coros >>> from eventlet import coros
>>> evt = coros.execute(lambda a: ('foo', a), 1) >>> evt = coros.execute(lambda a: ('foo', a), 1)
>>> evt.wait() >>> evt.wait()
@@ -548,23 +548,23 @@ def execute(func, *args, **kw):
class CoroutinePool(pools.Pool): class CoroutinePool(pools.Pool):
""" Like a thread pool, but with coroutines. """ Like a thread pool, but with coroutines.
Coroutine pools are useful for splitting up tasks or globally controlling Coroutine pools are useful for splitting up tasks or globally controlling
concurrency. You don't retrieve the coroutines directly with get() -- concurrency. You don't retrieve the coroutines directly with get() --
instead use the execute() and execute_async() methods to run code. instead use the execute() and execute_async() methods to run code.
>>> from eventlet import coros, api >>> from eventlet import coros, api
>>> p = coros.CoroutinePool(max_size=2) >>> p = coros.CoroutinePool(max_size=2)
>>> def foo(a): >>> def foo(a):
... print "foo", a ... print "foo", a
... ...
>>> evt = p.execute(foo, 1) >>> evt = p.execute(foo, 1)
>>> evt.wait() >>> evt.wait()
foo 1 foo 1
Once the pool is exhausted, calling an execute forces a yield. Once the pool is exhausted, calling an execute forces a yield.
>>> p.execute_async(foo, 2) >>> p.execute_async(foo, 2)
>>> p.execute_async(foo, 3) >>> p.execute_async(foo, 3)
>>> p.free() >>> p.free()
@@ -572,11 +572,11 @@ class CoroutinePool(pools.Pool):
>>> p.execute_async(foo, 4) >>> p.execute_async(foo, 4)
foo 2 foo 2
foo 3 foo 3
>>> api.sleep(0) >>> api.sleep(0)
foo 4 foo 4
""" """
def __init__(self, min_size=0, max_size=4, track_events=False): def __init__(self, min_size=0, max_size=4, track_events=False):
self._greenlets = set() self._greenlets = set()
if track_events: if track_events:
@@ -618,7 +618,7 @@ class CoroutinePool(pools.Pool):
# forever. This is generally only a problem in unit # forever. This is generally only a problem in unit
# tests. # tests.
sender._result = NOT_USED sender._result = NOT_USED
sender = event() sender = event()
(evt, func, args, kw) = recvd (evt, func, args, kw) = recvd
self._safe_apply(evt, func, args, kw) self._safe_apply(evt, func, args, kw)
@@ -647,7 +647,7 @@ class CoroutinePool(pools.Pool):
if self._next_event is None: if self._next_event is None:
self._tracked_events.append(result) self._tracked_events.append(result)
else: else:
ne = self._next_event ne = self._next_event
self._next_event = None self._next_event = None
ne.send(result) ne.send(result)
@@ -674,7 +674,7 @@ class CoroutinePool(pools.Pool):
def _execute(self, evt, func, args, kw): def _execute(self, evt, func, args, kw):
""" Private implementation of the execute methods. """ Private implementation of the execute methods.
""" """
# if reentering an empty pool, don't try to wait on a coroutine freeing # if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine # itself -- instead, just execute in the current coroutine
if self.free() == 0 and api.getcurrent() in self._greenlets: if self.free() == 0 and api.getcurrent() in self._greenlets:
self._safe_apply(evt, func, args, kw) self._safe_apply(evt, func, args, kw)
@@ -706,14 +706,14 @@ class CoroutinePool(pools.Pool):
super(CoroutinePool, self).put(item) super(CoroutinePool, self).put(item)
# Track the number of outstanding CoroutinePool coroutines # Track the number of outstanding CoroutinePool coroutines
self.requested.dec() self.requested.dec()
def execute(self, func, *args, **kw): def execute(self, func, *args, **kw):
"""Execute func in one of the coroutines maintained """Execute func in one of the coroutines maintained
by the pool, when one is free. by the pool, when one is free.
Immediately returns an eventlet.coros.event object which Immediately returns an eventlet.coros.event object which
func's result will be sent to when it is available. func's result will be sent to when it is available.
>>> from eventlet import coros >>> from eventlet import coros
>>> p = coros.CoroutinePool() >>> p = coros.CoroutinePool()
>>> evt = p.execute(lambda a: ('foo', a), 1) >>> evt = p.execute(lambda a: ('foo', a), 1)
@@ -733,7 +733,7 @@ class CoroutinePool(pools.Pool):
>>> p = coros.CoroutinePool() >>> p = coros.CoroutinePool()
>>> def foo(a): >>> def foo(a):
... print "foo", a ... print "foo", a
... ...
>>> p.execute_async(foo, 1) >>> p.execute_async(foo, 1)
>>> api.sleep(0) >>> api.sleep(0)
foo 1 foo 1
@@ -880,16 +880,16 @@ class CoroutinePool(pools.Pool):
>>> pausers = [coros.event() for x in xrange(2)] >>> pausers = [coros.event() for x in xrange(2)]
>>> def longtask(evt, desc): >>> def longtask(evt, desc):
... print "%s woke up with %s" % (desc, evt.wait()) ... print "%s woke up with %s" % (desc, evt.wait())
... ...
>>> pool.launch_all(longtask, zip(pausers, "AB")) >>> pool.launch_all(longtask, zip(pausers, "AB"))
>>> def quicktask(desc): >>> def quicktask(desc):
... print "returning %s" % desc ... print "returning %s" % desc
... return desc ... return desc
... ...
(Instead of using a for loop, step through generate_results() (Instead of using a for loop, step through generate_results()
items individually to illustrate timing) items individually to illustrate timing)
>>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase)) >>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase))
>>> print step.next() >>> print step.next()
returning a returning a
@@ -1097,7 +1097,7 @@ class queue(object):
raise exc[0], exc[1], exc[2] raise exc[0], exc[1], exc[2]
raise exc raise exc
return result return result
def ready(self): def ready(self):
# could also base this on self.sem.counter... # could also base this on self.sem.counter...
return len(self.items) > 0 return len(self.items) > 0
@@ -1116,7 +1116,7 @@ class Actor(object):
""" """
def __init__(self, concurrency = 1): def __init__(self, concurrency = 1):
""" Constructs an Actor, kicking off a new coroutine to process the messages. """ Constructs an Actor, kicking off a new coroutine to process the messages.
The concurrency argument specifies how many messages the actor will try The concurrency argument specifies how many messages the actor will try
to process concurrently. If it is 1, the actor will process messages to process concurrently. If it is 1, the actor will process messages
serially. serially.
@@ -1160,18 +1160,18 @@ class Actor(object):
The default implementation just raises an exception, so The default implementation just raises an exception, so
replace it with something useful! replace it with something useful!
>>> class Greeter(Actor): >>> class Greeter(Actor):
... def received(self, (message, evt) ): ... def received(self, (message, evt) ):
... print "received", message ... print "received", message
... if evt: evt.send() ... if evt: evt.send()
... ...
>>> a = Greeter() >>> a = Greeter()
This example uses events to synchronize between the actor and the main This example uses events to synchronize between the actor and the main
coroutine in a predictable manner, but this kinda defeats the point of coroutine in a predictable manner, but this kinda defeats the point of
the Actor, so don't do it in a real application. the Actor, so don't do it in a real application.
>>> evt = event() >>> evt = event()
>>> a.cast( ("message 1", evt) ) >>> a.cast( ("message 1", evt) )
>>> evt.wait() # force it to run at this exact moment >>> evt.wait() # force it to run at this exact moment
@@ -1182,7 +1182,7 @@ class Actor(object):
>>> evt.wait() >>> evt.wait()
received message 2 received message 2
received message 3 received message 3
>>> api.kill(a._killer) # test cleanup >>> api.kill(a._killer) # test cleanup
""" """
raise NotImplementedError() raise NotImplementedError()