added spawn_link function (with tests)

This commit is contained in:
Denis Bilenko
2008-11-24 20:59:34 +06:00
parent 1b7cb386a4
commit 0c91a6b1bc
2 changed files with 67 additions and 3 deletions

View File

@@ -236,6 +236,35 @@ class event(object):
# the arguments and the same as for greenlet.throw
return self.send(None, args)
class async_result(object):
def __init__(self, greenlet, event):
self.greenlet = greenlet
self.event = event
def wait(self):
return self.event.wait()
def kill(self):
return api.kill(self.greenlet)
def _wrap_result_in_event(event, func, *args, **kwargs):
try:
result = func(*args, **kwargs)
except api.GreenletExit, ex:
event.send(ex)
except:
event.send_exception(*sys.exc_info())
else:
event.send(result)
def spawn_link(func, *args, **kwargs):
result = event()
g = api.spawn(_wrap_result_in_event, result, func, *args, **kwargs)
return async_result(g, result)
class semaphore(object):
"""Classic semaphore implemented with a counter and an event.
Optionally initialize with a resource count, then acquire() and release()

View File

@@ -1,8 +1,9 @@
import unittest
from eventlet.coros import event
from eventlet.api import spawn, sleep
import sys
from eventlet.coros import event, spawn_link
from eventlet.api import spawn, sleep, GreenletExit
class Test(unittest.TestCase):
class TestEvent(unittest.TestCase):
def test_send_exc(self):
log = []
@@ -20,6 +21,40 @@ class Test(unittest.TestCase):
sleep(0)
assert log == [('catched', 'Exception')], log
class TestSpawnLink(unittest.TestCase):
def test_simple_return(self):
res = spawn_link(lambda: 25).wait()
assert res==25, res
def test_exception(self):
try:
spawn_link(sys.exit, 'bye').wait()
except SystemExit, ex:
assert ex.args == ('bye', )
else:
assert False, "Shouldn't get there"
def _test_kill(self, sync):
def func():
sleep(0.1)
return 101
res = spawn_link(func)
if sync:
res.kill()
else:
spawn(res.kill)
wait_result = res.wait()
assert isinstance(wait_result, GreenletExit), `wait_result`
def test_kill_sync(self):
return self._test_kill(True)
def test_kill_async(self):
return self._test_kill(False)
if __name__=='__main__':
unittest.main()