diff --git a/eventlet/coros.py b/eventlet/coros.py index 98eb31a..03bb15b 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -26,6 +26,7 @@ import collections import sys import time import traceback +import weakref from eventlet import api @@ -236,36 +237,83 @@ class event(object): # the arguments and the same as for greenlet.throw return self.send(None, args) +class _AsyncJobPollError(Exception): + pass -class async_result(object): +class AsyncJob(object): - def __init__(self, greenlet, event): - self.greenlet = greenlet + def __init__(self, greenlet_ref, event): + self.greenlet_ref = greenlet_ref self.event = event - def __bool__(self): - return not self.greenlet.dead + @property + def greenlet(self): + return self.greenlet_ref() + + def __nonzero__(self): + greenlet = self.greenlet_ref() + if greenlet is not None and not greenlet.dead: + return True + return False + + def __repr__(self): + klass = type(self).__name__ + if self.greenlet is not None and self.greenlet.dead: + dead = '(dead)' + else: + dead = '' + return '<%s greenlet=%r%s event=%s>' % (klass, self.greenlet, dead, self.event) def wait(self): return self.event.wait() - def kill(self): - return api.kill(self.greenlet) + def poll(self, marker=None): + error = _AsyncJobPollError() + timer = api.exc_after(0, error) + try: + return self.wait() + except _AsyncJobPollError, ex: + if ex is error: + return marker + raise + finally: + timer.cancel() -def _wrap_result_in_event(event, func, *args, **kwargs): + def kill(self): + greenlet = self.greenlet_ref() + if greenlet is not None: + return api.kill(greenlet) + + def kill_after(self, seconds): + api.call_after_global(seconds, _kill_by_ref, weakref.ref(self)) + +def _kill_by_ref(async_job_ref): + async_job = async_job_ref() + if async_job is not None: + async_job.kill() + +def _wrap_result_in_event(event_ref, func, *args, **kwargs): try: result = func(*args, **kwargs) except api.GreenletExit, ex: - event.send(ex) + event = event_ref() + if event is not None: + event.send(ex) except: - event.send_exception(*sys.exc_info()) + event = event_ref() + if event is not None: + event.send_exception(*sys.exc_info()) + else: + raise # let hub log the exception else: - event.send(result) + event = event_ref() + if event is not None: + event.send(result) def spawn_link(func, *args, **kwargs): result = event() - g = api.spawn(_wrap_result_in_event, result, func, *args, **kwargs) - return async_result(g, result) + g = api.spawn(_wrap_result_in_event, weakref.ref(result), func, *args, **kwargs) + return AsyncJob(weakref.ref(g), result) class multievent(object): diff --git a/greentest/test__event.py b/greentest/test__event.py index f05274d..c1f5eac 100644 --- a/greentest/test__event.py +++ b/greentest/test__event.py @@ -1,7 +1,7 @@ import unittest import sys from eventlet.coros import event, spawn_link -from eventlet.api import spawn, sleep, GreenletExit +from eventlet.api import spawn, sleep, GreenletExit, exc_after class TestEvent(unittest.TestCase): @@ -41,12 +41,14 @@ class TestSpawnLink(unittest.TestCase): sleep(0.1) return 101 res = spawn_link(func) + assert res if sync: res.kill() else: spawn(res.kill) wait_result = res.wait() - assert isinstance(wait_result, GreenletExit), `wait_result` + assert not res, repr(res) + assert isinstance(wait_result, GreenletExit), repr(wait_result) def test_kill_sync(self): return self._test_kill(True) @@ -54,9 +56,39 @@ class TestSpawnLink(unittest.TestCase): def test_kill_async(self): return self._test_kill(False) + def test_poll(self): + def func(): + sleep(0.1) + return 25 + job = spawn_link(func) + self.assertEqual(job.poll(), None) + assert job, repr(job) + self.assertEqual(job.wait(), 25) + self.assertEqual(job.poll(), 25) + assert not job, repr(job) + + job = spawn_link(func) + self.assertEqual(job.poll(5), 5) + assert job, repr(job) + self.assertEqual(job.wait(), 25) + self.assertEqual(job.poll(5), 25) + assert not job, repr(job) + + def test_kill_after(self): + def func(): + sleep(0.1) + return 25 + job = spawn_link(func) + job.kill_after(0.05) + result = job.wait() + assert isinstance(result, GreenletExit), repr(result) + + job = spawn_link(func) + job.kill_after(0.2) + self.assertEqual(job.wait(), 25) + sleep(0.2) + self.assertEqual(job.wait(), 25) + if __name__=='__main__': unittest.main() - - -