Moved Event to its own module. More cycle-breaking!

This commit is contained in:
Ryan Williams
2010-01-17 21:28:48 -08:00
parent 674e216aea
commit 7349ead03d
11 changed files with 231 additions and 223 deletions

View File

@@ -4,6 +4,7 @@ import traceback
import warnings
from eventlet import api
from eventlet import event as _event
from eventlet import hubs
from eventlet import greenthread
from eventlet import semaphore as semaphoremod
@@ -15,17 +16,17 @@ class NOT_USED:
NOT_USED = NOT_USED()
def Event(*a, **kw):
warnings.warn("The Event class has been moved to the greenthread module! "
"Please construct greenthread.Event objects instead.",
warnings.warn("The Event class has been moved to the event module! "
"Please construct event.Event objects instead.",
DeprecationWarning, stacklevel=2)
return greenthread.Event(*a, **kw)
return _event.Event(*a, **kw)
def event(*a, **kw):
warnings.warn("The event class has been capitalized and moved! Please "
"construct greenthread.Event objects instead.",
"construct event.Event objects instead.",
DeprecationWarning, stacklevel=2)
return greenthread.Event(*a, **kw)
return _event.Event(*a, **kw)
def Semaphore(count):
@@ -70,7 +71,7 @@ class metaphore(object):
"""
def __init__(self):
self.counter = 0
self.event = greenthread.Event()
self.event = _event.Event()
# send() right away, else we'd wait on the default 0 count!
self.event.send()
@@ -317,7 +318,7 @@ class Actor(object):
serially.
"""
self._mailbox = collections.deque()
self._event = greenthread.Event()
self._event = _event.Event()
self._killer = api.spawn(self.run_forever)
self._pool = CoroutinePool(min_size=0, max_size=concurrency)
@@ -326,7 +327,7 @@ class Actor(object):
while True:
if not self._mailbox:
self._event.wait()
self._event = greenthread.Event()
self._event = _event.Event()
else:
# leave the message in the mailbox until after it's
# been processed so the event doesn't get triggered
@@ -367,7 +368,7 @@ class Actor(object):
coroutine in a predictable manner, but this kinda defeats the point of
the :class:`Actor`, so don't do it in a real application.
>>> evt = greenthread.Event()
>>> evt = event.Event()
>>> a.cast( ("message 1", evt) )
>>> evt.wait() # force it to run at this exact moment
received message 1

179
eventlet/event.py Normal file
View File

@@ -0,0 +1,179 @@
from eventlet import hubs
from eventlet.support import greenlets as greenlet
__all__ = ['Event']
class NOT_USED:
def __repr__(self):
return 'NOT_USED'
NOT_USED = NOT_USED()
class Event(object):
"""An abstraction where an arbitrary number of coroutines
can wait for one event from another.
Events differ from channels in two ways:
1. calling :meth:`send` does not unschedule the current coroutine
2. :meth:`send` can only be called once; use :meth:`reset` to prepare the
event for another :meth:`send`
They are ideal for communicating return values between coroutines.
>>> from eventlet import coros, api
>>> evt = coros.Event()
>>> def baz(b):
... evt.send(b + 1)
...
>>> _ = api.spawn(baz, 3)
>>> evt.wait()
4
"""
_result = None
def __init__(self):
self._waiters = set()
self.reset()
def __str__(self):
params = (self.__class__.__name__, hex(id(self)), self._result, self._exc, len(self._waiters))
return '<%s at %s result=%r _exc=%r _waiters[%d]>' % params
def reset(self):
""" Reset this event so it can be used to send again.
Can only be called after :meth:`send` has been called.
>>> from eventlet import coros
>>> evt = coros.Event()
>>> evt.send(1)
>>> evt.reset()
>>> evt.send(2)
>>> evt.wait()
2
Calling reset multiple times in a row is an error.
>>> evt.reset()
>>> evt.reset()
Traceback (most recent call last):
...
AssertionError: Trying to re-reset() a fresh event.
"""
assert self._result is not NOT_USED, 'Trying to re-reset() a fresh event.'
self._result = NOT_USED
self._exc = None
def ready(self):
""" Return true if the :meth:`wait` call will return immediately.
Used to avoid waiting for things that might take a while to time out.
For example, you can put a bunch of events into a list, and then visit
them all repeatedly, calling :meth:`ready` until one returns ``True``,
and then you can :meth:`wait` on that one."""
return self._result is not NOT_USED
def has_exception(self):
return self._exc is not None
def has_result(self):
return self._result is not NOT_USED and self._exc is None
def poll(self, notready=None):
if self.ready():
return self.wait()
return notready
# QQQ make it return tuple (type, value, tb) instead of raising
# because
# 1) "poll" does not imply raising
# 2) it's better not to screw up caller's sys.exc_info() by default
# (e.g. if caller wants to calls the function in except or finally)
def poll_exception(self, notready=None):
if self.has_exception():
return self.wait()
return notready
def poll_result(self, notready=None):
if self.has_result():
return self.wait()
return notready
def wait(self):
"""Wait until another coroutine calls :meth:`send`.
Returns the value the other coroutine passed to
:meth:`send`.
>>> from eventlet import coros, api
>>> evt = coros.Event()
>>> def wait_on():
... retval = evt.wait()
... print "waited for", retval
>>> _ = api.spawn(wait_on)
>>> evt.send('result')
>>> api.sleep(0)
waited for result
Returns immediately if the event has already
occured.
>>> evt.wait()
'result'
"""
current = greenlet.getcurrent()
if self._result is NOT_USED:
self._waiters.add(current)
try:
return hubs.get_hub().switch()
finally:
self._waiters.discard(current)
if self._exc is not None:
current.throw(*self._exc)
return self._result
def send(self, result=None, exc=None):
"""Makes arrangements for the waiters to be woken with the
result and then returns immediately to the parent.
>>> from eventlet import coros, api
>>> evt = coros.Event()
>>> def waiter():
... print 'about to wait'
... result = evt.wait()
... print 'waited for', result
>>> _ = api.spawn(waiter)
>>> api.sleep(0)
about to wait
>>> evt.send('a')
>>> api.sleep(0)
waited for a
It is an error to call :meth:`send` multiple times on the same event.
>>> evt.send('whoops')
Traceback (most recent call last):
...
AssertionError: Trying to re-send() an already-triggered event.
Use :meth:`reset` between :meth:`send` s to reuse an event object.
"""
assert self._result is NOT_USED, 'Trying to re-send() an already-triggered event.'
self._result = result
if exc is not None and not isinstance(exc, tuple):
exc = (exc, )
self._exc = exc
hub = hubs.get_hub()
if self._waiters:
hub.schedule_call_global(0, self._do_send, self._result, self._exc, self._waiters.copy())
def _do_send(self, result, exc, waiters):
while waiters:
waiter = waiters.pop()
if waiter in self._waiters:
if exc is None:
waiter.switch(result)
else:
waiter.throw(*exc)
def send_exception(self, *args):
# the arguments and the same as for greenlet.throw
return self.send(None, args)

View File

@@ -1,6 +1,7 @@
import itertools
from eventlet import coros
from eventlet import event
from eventlet import greenthread
from eventlet import semaphore
@@ -22,7 +23,7 @@ class GreenPool(object):
self.size = size
self.coroutines_running = set()
self.sem = semaphore.Semaphore(size)
self.no_coros_running = greenthread.Event()
self.no_coros_running = event.Event()
def resize(self, new_size):
""" Change the max number of coroutines doing work at any given time.
@@ -66,7 +67,7 @@ class GreenPool(object):
self.sem.acquire()
gt = greenthread.spawn(function, *args, **kwargs)
if not self.coroutines_running:
self.no_coros_running = greenthread.Event()
self.no_coros_running = event.Event()
self.coroutines_running.add(gt)
gt.link(self._spawn_done, coro=gt)
return gt
@@ -100,7 +101,7 @@ class GreenPool(object):
self.sem.acquire()
g = greenthread.spawn_n(self._spawn_n_impl, func, args, kwargs, coro=True)
if not self.coroutines_running:
self.no_coros_running = greenthread.Event()
self.no_coros_running = event.Event()
self.coroutines_running.add(g)
def waitall(self):

View File

@@ -1,10 +1,11 @@
import sys
from eventlet import event
from eventlet import hubs
from eventlet import timer
from eventlet.support import greenlets as greenlet
__all__ = ['getcurrent', 'sleep', 'spawn', 'spawn_n', 'call_after_global', 'call_after_local', 'GreenThread', 'Event']
__all__ = ['getcurrent', 'sleep', 'spawn', 'spawn_n', 'call_after_global', 'call_after_local', 'GreenThread']
getcurrent = greenlet.getcurrent
@@ -190,7 +191,7 @@ def _spawn_n(seconds, func, args, kwargs):
class GreenThread(greenlet.greenlet):
def __init__(self, parent):
greenlet.greenlet.__init__(self, self.main, parent)
self._exit_event = Event()
self._exit_event = event.Event()
def wait(self):
return self._exit_event.wait()
@@ -219,182 +220,4 @@ class GreenThread(greenlet.greenlet):
f(result, *ca, **ckw)
def kill(self):
return kill(self)
class NOT_USED:
def __repr__(self):
return 'NOT_USED'
NOT_USED = NOT_USED()
class Event(object):
"""An abstraction where an arbitrary number of coroutines
can wait for one event from another.
Events differ from channels in two ways:
1. calling :meth:`send` does not unschedule the current coroutine
2. :meth:`send` can only be called once; use :meth:`reset` to prepare the
event for another :meth:`send`
They are ideal for communicating return values between coroutines.
>>> from eventlet import coros, api
>>> evt = coros.Event()
>>> def baz(b):
... evt.send(b + 1)
...
>>> _ = api.spawn(baz, 3)
>>> evt.wait()
4
"""
_result = None
def __init__(self):
self._waiters = set()
self.reset()
def __str__(self):
params = (self.__class__.__name__, hex(id(self)), self._result, self._exc, len(self._waiters))
return '<%s at %s result=%r _exc=%r _waiters[%d]>' % params
def reset(self):
""" Reset this event so it can be used to send again.
Can only be called after :meth:`send` has been called.
>>> from eventlet import coros
>>> evt = coros.Event()
>>> evt.send(1)
>>> evt.reset()
>>> evt.send(2)
>>> evt.wait()
2
Calling reset multiple times in a row is an error.
>>> evt.reset()
>>> evt.reset()
Traceback (most recent call last):
...
AssertionError: Trying to re-reset() a fresh event.
"""
assert self._result is not NOT_USED, 'Trying to re-reset() a fresh event.'
self._result = NOT_USED
self._exc = None
def ready(self):
""" Return true if the :meth:`wait` call will return immediately.
Used to avoid waiting for things that might take a while to time out.
For example, you can put a bunch of events into a list, and then visit
them all repeatedly, calling :meth:`ready` until one returns ``True``,
and then you can :meth:`wait` on that one."""
return self._result is not NOT_USED
def has_exception(self):
return self._exc is not None
def has_result(self):
return self._result is not NOT_USED and self._exc is None
def poll(self, notready=None):
if self.ready():
return self.wait()
return notready
# QQQ make it return tuple (type, value, tb) instead of raising
# because
# 1) "poll" does not imply raising
# 2) it's better not to screw up caller's sys.exc_info() by default
# (e.g. if caller wants to calls the function in except or finally)
def poll_exception(self, notready=None):
if self.has_exception():
return self.wait()
return notready
def poll_result(self, notready=None):
if self.has_result():
return self.wait()
return notready
def wait(self):
"""Wait until another coroutine calls :meth:`send`.
Returns the value the other coroutine passed to
:meth:`send`.
>>> from eventlet import coros, api
>>> evt = coros.Event()
>>> def wait_on():
... retval = evt.wait()
... print "waited for", retval
>>> _ = api.spawn(wait_on)
>>> evt.send('result')
>>> api.sleep(0)
waited for result
Returns immediately if the event has already
occured.
>>> evt.wait()
'result'
"""
current = getcurrent()
if self._result is NOT_USED:
self._waiters.add(current)
try:
return hubs.get_hub().switch()
finally:
self._waiters.discard(current)
if self._exc is not None:
current.throw(*self._exc)
return self._result
def send(self, result=None, exc=None):
"""Makes arrangements for the waiters to be woken with the
result and then returns immediately to the parent.
>>> from eventlet import coros, api
>>> evt = coros.Event()
>>> def waiter():
... print 'about to wait'
... result = evt.wait()
... print 'waited for', result
>>> _ = api.spawn(waiter)
>>> api.sleep(0)
about to wait
>>> evt.send('a')
>>> api.sleep(0)
waited for a
It is an error to call :meth:`send` multiple times on the same event.
>>> evt.send('whoops')
Traceback (most recent call last):
...
AssertionError: Trying to re-send() an already-triggered event.
Use :meth:`reset` between :meth:`send` s to reuse an event object.
"""
assert self._result is NOT_USED, 'Trying to re-send() an already-triggered event.'
self._result = result
if exc is not None and not isinstance(exc, tuple):
exc = (exc, )
self._exc = exc
hub = hubs.get_hub()
if self._waiters:
hub.schedule_call_global(0, self._do_send, self._result, self._exc, self._waiters.copy())
def _do_send(self, result, exc, waiters):
while waiters:
waiter = waiters.pop()
if waiter in self._waiters:
if exc is None:
waiter.switch(result)
else:
waiter.throw(*exc)
def send_exception(self, *args):
# the arguments and the same as for greenlet.throw
return self.send(None, args)
return kill(self)

View File

@@ -18,7 +18,8 @@ from Queue import Full, Empty
_NONE = object()
from eventlet.hubs import get_hub
from eventlet.greenthread import getcurrent, exc_after, Event
from eventlet.greenthread import getcurrent, exc_after
from eventlet.event import Event
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue']

View File

@@ -20,6 +20,7 @@ import sys
from Queue import Empty, Queue
from eventlet import api
from eventlet import event
from eventlet import greenio
from eventlet import greenthread
@@ -54,7 +55,7 @@ def tpool_trampoline():
def esend(meth,*args, **kwargs):
global _reqq, _rspq
e = greenthread.Event()
e = event.Event()
_reqq.put((e,meth,args,kwargs))
return e

View File

@@ -9,7 +9,7 @@ from twisted.python import failure
from eventlet import proc
from eventlet.api import getcurrent
from eventlet.coros import Queue
from eventlet.greenthread import Event
from eventlet.event import Event
class ValueQueue(Queue):

View File

@@ -1,7 +1,9 @@
from unittest import main, TestCase
from tests import LimitedTestCase
import eventlet
from eventlet import greenthread, api, coros
from eventlet import coros
from eventlet import event
from eventlet import greenthread
class IncrActor(coros.Actor):
def received(self, evt):
@@ -17,10 +19,10 @@ class TestActor(LimitedTestCase):
def tearDown(self):
super(TestActor, self).tearDown()
api.kill(self.actor._killer)
greenthread.kill(self.actor._killer)
def test_cast(self):
evt = greenthread.Event()
evt = event.Event()
self.actor.cast(evt)
evt.wait()
evt.reset()
@@ -31,8 +33,8 @@ class TestActor(LimitedTestCase):
def test_cast_multi_1(self):
# make sure that both messages make it in there
evt = greenthread.Event()
evt1 = greenthread.Event()
evt = event.Event()
evt1 = event.Event()
self.actor.cast(evt)
self.actor.cast(evt1)
evt.wait()
@@ -56,17 +58,17 @@ class TestActor(LimitedTestCase):
evt.send()
self.actor.received = received
waiters.append(greenthread.Event())
waiters.append(event.Event())
self.actor.cast( (1, waiters[-1]))
eventlet.sleep(0)
waiters.append(greenthread.Event())
waiters.append(event.Event())
self.actor.cast( (2, waiters[-1]) )
waiters.append(greenthread.Event())
waiters.append(event.Event())
self.actor.cast( (3, waiters[-1]) )
eventlet.sleep(0)
waiters.append(greenthread.Event())
waiters.append(event.Event())
self.actor.cast( (4, waiters[-1]) )
waiters.append(greenthread.Event())
waiters.append(event.Event())
self.actor.cast( (5, waiters[-1]) )
for evt in waiters:
evt.wait()
@@ -83,7 +85,7 @@ class TestActor(LimitedTestCase):
self.actor.received = received
evt = greenthread.Event()
evt = event.Event()
self.actor.cast( ('fail', evt) )
evt.wait()
evt.reset()
@@ -103,8 +105,8 @@ class TestActor(LimitedTestCase):
def onemoment():
eventlet.sleep(0.1)
evt = greenthread.Event()
evt1 = greenthread.Event()
evt = event.Event()
evt1 = event.Event()
self.actor.cast( (onemoment, evt, 1) )
self.actor.cast( (lambda: None, evt1, 2) )

View File

@@ -1,10 +1,10 @@
import eventlet
from eventlet import greenthread
from eventlet import event
from tests import LimitedTestCase
class TestEvent(LimitedTestCase):
def test_waiting_for_event(self):
evt = greenthread.Event()
evt = event.Event()
value = 'some stuff'
def send_to_event():
evt.send(value)
@@ -12,7 +12,7 @@ class TestEvent(LimitedTestCase):
self.assertEqual(evt.wait(), value)
def test_multiple_waiters(self):
evt = greenthread.Event()
evt = event.Event()
value = 'some stuff'
results = []
def wait_on_event(i_am_done):
@@ -23,7 +23,7 @@ class TestEvent(LimitedTestCase):
waiters = []
count = 5
for i in range(count):
waiters.append(greenthread.Event())
waiters.append(event.Event())
eventlet.spawn_n(wait_on_event, waiters[-1])
evt.send()
@@ -33,7 +33,7 @@ class TestEvent(LimitedTestCase):
self.assertEqual(len(results), count)
def test_reset(self):
evt = greenthread.Event()
evt = event.Event()
# calling reset before send should throw
self.assertRaises(AssertionError, evt.reset)
@@ -58,7 +58,7 @@ class TestEvent(LimitedTestCase):
self.assertEqual(evt.wait(), value2)
def test_double_exception(self):
evt = greenthread.Event()
evt = event.Event()
# send an exception through the event
evt.send(exc=RuntimeError('from test_double_exception'))
self.assertRaises(RuntimeError, evt.wait)

View File

@@ -5,7 +5,7 @@ import random
import eventlet
from eventlet import api
from eventlet import hubs, greenpool, coros, greenthread
from eventlet import hubs, greenpool, coros, event
import tests
class Spawn(tests.LimitedTestCase):
@@ -47,7 +47,7 @@ class GreenPool(tests.LimitedTestCase):
def test_waiting(self):
pool = greenpool.GreenPool(1)
done = greenthread.Event()
done = event.Event()
def consume():
done.wait()
def waiter(pool):
@@ -73,7 +73,7 @@ class GreenPool(tests.LimitedTestCase):
self.assertEqual(pool.running(), 0)
def test_multiple_coros(self):
evt = greenthread.Event()
evt = event.Event()
results = []
def producer():
results.append('prod')
@@ -113,7 +113,7 @@ class GreenPool(tests.LimitedTestCase):
outer_waiter = pool.spawn(reenter)
outer_waiter.wait()
evt = greenthread.Event()
evt = event.Event()
def reenter_async():
pool.spawn_n(lambda a: a, 'reenter')
evt.send('done')
@@ -126,7 +126,7 @@ class GreenPool(tests.LimitedTestCase):
e.wait()
timer = api.exc_after(1, api.TimeoutError)
try:
evt = greenthread.Event()
evt = event.Event()
for x in xrange(num_free):
pool.spawn(wait_long_time, evt)
# if the pool has fewer free than we expect,
@@ -149,7 +149,7 @@ class GreenPool(tests.LimitedTestCase):
def test_resize(self):
pool = greenpool.GreenPool(2)
evt = greenthread.Event()
evt = event.Event()
def wait_long_time(e):
e.wait()
pool.spawn(wait_long_time, evt)

View File

@@ -1,6 +1,6 @@
from tests import LimitedTestCase, main
import eventlet
from eventlet import greenthread
from eventlet import event
def do_bail(q):
eventlet.exc_after(0, RuntimeError())
@@ -59,7 +59,7 @@ class TestQueue(LimitedTestCase):
x = q.get()
return x
evt = greenthread.Event()
evt = event.Event()
gt = eventlet.spawn(sender, evt, q)
eventlet.sleep(0)
self.assert_(not evt.ready())