diff --git a/eventlet/proc.py b/eventlet/proc.py index 20c064b..f995ac0 100644 --- a/eventlet/proc.py +++ b/eventlet/proc.py @@ -425,11 +425,7 @@ class Source(object): EXC = True try: try: - # QQQ for successive send() calls, event can be notified more than once here - # QQQ forbid to call send()/send_exception() more than once - # QQQ use smart_switch() (i.e. switch that can be cancelled in finally) here, - # it won't have this problem - event = coros.event() + event = Waiter() self.link(event) try: return event.wait() @@ -444,6 +440,41 @@ class Source(object): timer.__exit__(None, None, None) +class Waiter(object): + + def __init__(self): + self.greenlet = None + + def send(self, value): + """Make greenlet calling wait() wake up (if there is a wait()). + Can only be called from get_hub().greenlet. + """ + assert api.getcurrent() is api.get_hub().greenlet + if self.greenlet is not None: + self.greenlet.switch(value) + + def send_exception(self, *throw_args): + """Make greenlet calling wait() wake up (if there is a wait()). + Can only be called from get_hub().greenlet. + """ + assert api.getcurrent() is api.get_hub().greenlet + if self.greenlet is not None: + self.greenlet.throw(*throw_args) + + def wait(self): + """Wait until send or send_exception is called. Return value passed + into send() or raise exception passed into send_exception(). + """ + assert self.greenlet is None + current = api.getcurrent() + assert current is not api.get_hub().greenlet + self.greenlet = current + try: + return api.get_hub().switch() + finally: + self.greenlet = None + + class Proc(Source): """A linkable coroutine based on Source. Upon completion, delivers coroutine's result to the listeners.