renamed coros.async_result to coros.AsyncJob; added poll, kill_after, __nonzero__ methods; made it access greenlet via weakref
This commit is contained in:
@@ -26,6 +26,7 @@ import collections
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
import weakref
|
||||
|
||||
|
||||
from eventlet import api
|
||||
@@ -236,36 +237,83 @@ class event(object):
|
||||
# the arguments and the same as for greenlet.throw
|
||||
return self.send(None, args)
|
||||
|
||||
class _AsyncJobPollError(Exception):
|
||||
pass
|
||||
|
||||
class async_result(object):
|
||||
class AsyncJob(object):
|
||||
|
||||
def __init__(self, greenlet, event):
|
||||
self.greenlet = greenlet
|
||||
def __init__(self, greenlet_ref, event):
|
||||
self.greenlet_ref = greenlet_ref
|
||||
self.event = event
|
||||
|
||||
def __bool__(self):
|
||||
return not self.greenlet.dead
|
||||
@property
|
||||
def greenlet(self):
|
||||
return self.greenlet_ref()
|
||||
|
||||
def __nonzero__(self):
|
||||
greenlet = self.greenlet_ref()
|
||||
if greenlet is not None and not greenlet.dead:
|
||||
return True
|
||||
return False
|
||||
|
||||
def __repr__(self):
|
||||
klass = type(self).__name__
|
||||
if self.greenlet is not None and self.greenlet.dead:
|
||||
dead = '(dead)'
|
||||
else:
|
||||
dead = ''
|
||||
return '<%s greenlet=%r%s event=%s>' % (klass, self.greenlet, dead, self.event)
|
||||
|
||||
def wait(self):
|
||||
return self.event.wait()
|
||||
|
||||
def kill(self):
|
||||
return api.kill(self.greenlet)
|
||||
def poll(self, marker=None):
|
||||
error = _AsyncJobPollError()
|
||||
timer = api.exc_after(0, error)
|
||||
try:
|
||||
return self.wait()
|
||||
except _AsyncJobPollError, ex:
|
||||
if ex is error:
|
||||
return marker
|
||||
raise
|
||||
finally:
|
||||
timer.cancel()
|
||||
|
||||
def _wrap_result_in_event(event, func, *args, **kwargs):
|
||||
def kill(self):
|
||||
greenlet = self.greenlet_ref()
|
||||
if greenlet is not None:
|
||||
return api.kill(greenlet)
|
||||
|
||||
def kill_after(self, seconds):
|
||||
api.call_after_global(seconds, _kill_by_ref, weakref.ref(self))
|
||||
|
||||
def _kill_by_ref(async_job_ref):
|
||||
async_job = async_job_ref()
|
||||
if async_job is not None:
|
||||
async_job.kill()
|
||||
|
||||
def _wrap_result_in_event(event_ref, func, *args, **kwargs):
|
||||
try:
|
||||
result = func(*args, **kwargs)
|
||||
except api.GreenletExit, ex:
|
||||
event.send(ex)
|
||||
event = event_ref()
|
||||
if event is not None:
|
||||
event.send(ex)
|
||||
except:
|
||||
event.send_exception(*sys.exc_info())
|
||||
event = event_ref()
|
||||
if event is not None:
|
||||
event.send_exception(*sys.exc_info())
|
||||
else:
|
||||
raise # let hub log the exception
|
||||
else:
|
||||
event.send(result)
|
||||
event = event_ref()
|
||||
if event is not None:
|
||||
event.send(result)
|
||||
|
||||
def spawn_link(func, *args, **kwargs):
|
||||
result = event()
|
||||
g = api.spawn(_wrap_result_in_event, result, func, *args, **kwargs)
|
||||
return async_result(g, result)
|
||||
g = api.spawn(_wrap_result_in_event, weakref.ref(result), func, *args, **kwargs)
|
||||
return AsyncJob(weakref.ref(g), result)
|
||||
|
||||
|
||||
class multievent(object):
|
||||
|
@@ -1,7 +1,7 @@
|
||||
import unittest
|
||||
import sys
|
||||
from eventlet.coros import event, spawn_link
|
||||
from eventlet.api import spawn, sleep, GreenletExit
|
||||
from eventlet.api import spawn, sleep, GreenletExit, exc_after
|
||||
|
||||
class TestEvent(unittest.TestCase):
|
||||
|
||||
@@ -41,12 +41,14 @@ class TestSpawnLink(unittest.TestCase):
|
||||
sleep(0.1)
|
||||
return 101
|
||||
res = spawn_link(func)
|
||||
assert res
|
||||
if sync:
|
||||
res.kill()
|
||||
else:
|
||||
spawn(res.kill)
|
||||
wait_result = res.wait()
|
||||
assert isinstance(wait_result, GreenletExit), `wait_result`
|
||||
assert not res, repr(res)
|
||||
assert isinstance(wait_result, GreenletExit), repr(wait_result)
|
||||
|
||||
def test_kill_sync(self):
|
||||
return self._test_kill(True)
|
||||
@@ -54,9 +56,39 @@ class TestSpawnLink(unittest.TestCase):
|
||||
def test_kill_async(self):
|
||||
return self._test_kill(False)
|
||||
|
||||
def test_poll(self):
|
||||
def func():
|
||||
sleep(0.1)
|
||||
return 25
|
||||
job = spawn_link(func)
|
||||
self.assertEqual(job.poll(), None)
|
||||
assert job, repr(job)
|
||||
self.assertEqual(job.wait(), 25)
|
||||
self.assertEqual(job.poll(), 25)
|
||||
assert not job, repr(job)
|
||||
|
||||
job = spawn_link(func)
|
||||
self.assertEqual(job.poll(5), 5)
|
||||
assert job, repr(job)
|
||||
self.assertEqual(job.wait(), 25)
|
||||
self.assertEqual(job.poll(5), 25)
|
||||
assert not job, repr(job)
|
||||
|
||||
def test_kill_after(self):
|
||||
def func():
|
||||
sleep(0.1)
|
||||
return 25
|
||||
job = spawn_link(func)
|
||||
job.kill_after(0.05)
|
||||
result = job.wait()
|
||||
assert isinstance(result, GreenletExit), repr(result)
|
||||
|
||||
job = spawn_link(func)
|
||||
job.kill_after(0.2)
|
||||
self.assertEqual(job.wait(), 25)
|
||||
sleep(0.2)
|
||||
self.assertEqual(job.wait(), 25)
|
||||
|
||||
|
||||
if __name__=='__main__':
|
||||
unittest.main()
|
||||
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user