Cleaned up link implementation in greenthread, added greenthread test module, fixed kill's implementation to resolve the race condition.

This commit is contained in:
Ryan Williams
2010-01-18 01:22:08 -08:00
parent b5a1ef7138
commit b9a0269b0a
5 changed files with 135 additions and 37 deletions

View File

@@ -1,4 +1,5 @@
import itertools
import traceback
from eventlet import coros
from eventlet import event
@@ -69,24 +70,23 @@ class GreenPool(object):
if not self.coroutines_running:
self.no_coros_running = event.Event()
self.coroutines_running.add(gt)
gt.link(self._spawn_done, coro=gt)
gt.link(self._spawn_done)
return gt
def _spawn_n_impl(self, func, args, kwargs, coro=None):
try:
try:
func(*args, **kwargs)
except (KeyboardInterrupt, SystemExit):
except (KeyboardInterrupt, SystemExit, GreenletExit):
raise
except:
# TODO in debug mode print these
pass
traceback.print_exc()
finally:
if coro is None:
return
else:
coro = greenthread.getcurrent()
self._spawn_done(coro=coro)
self._spawn_done(coro)
def spawn_n(self, func, *args, **kwargs):
""" Create a coroutine to run the *function*. Returns None; the results
@@ -108,12 +108,12 @@ class GreenPool(object):
"""Waits until all coroutines in the pool are finished working."""
self.no_coros_running.wait()
def _spawn_done(self, result=None, exc=None, coro=None):
def _spawn_done(self, coro):
self.sem.release()
if coro is not None:
self.coroutines_running.remove(coro)
# if done processing (no more work is waiting for processing),
# send StopIteration so that the queue knows it's done
# we can finish off any waitall() calls that might be pending
if self.sem.balance == self.size:
self.no_coros_running.send(None)

View File

@@ -9,17 +9,6 @@ __all__ = ['getcurrent', 'sleep', 'spawn', 'spawn_n', 'call_after_global', 'call
getcurrent = greenlet.getcurrent
def kill(g, *throw_args):
"""Terminates the target greenthread by raising an exception into it.
By default, this exception is GreenletExit, but a specific exception
may be specified in the *throw_args*.
"""
hub = hubs.get_hub()
hub.schedule_call_global(0, g.throw, *throw_args)
if getcurrent() is not hub.greenlet:
sleep(0)
def sleep(seconds=0):
"""Yield control to another eligible coroutine until at least *seconds* have
elapsed.
@@ -189,35 +178,82 @@ def _spawn_n(seconds, func, args, kwargs):
class GreenThread(greenlet.greenlet):
"""The GreenThread class is a type of Greenlet which has the additional
property of having a retrievable result. Do not construct GreenThread
objects directly; call :func:greenthread.spawn to get one.
"""
def __init__(self, parent):
greenlet.greenlet.__init__(self, self.main, parent)
self._exit_event = event.Event()
def wait(self):
""" Returns the result of the main function of this GreenThread. If the
result is a normal return value, wait() returns it. If it raised
an exception, wait() will also raise an exception."""
return self._exit_event.wait()
def link(self, func, *curried_args, **curried_kwargs):
""" Set up a function to be called with the results of the GreenThread.
The function must have the following signature:
def f(result=None, exc=None, [curried args/kwargs]):
The function must have the following signature::
def func(gt, [curried args/kwargs]):
When the GreenThread finishes its run, it calls *func* with itself
and with the arguments supplied at link-time. If the function wants
to retrieve the result of the GreenThread, it should call wait()
on its first argument.
"""
self._exit_funcs = getattr(self, '_exit_funcs', [])
self._exit_funcs.append((func, curried_args, curried_kwargs))
if self._exit_event.ready():
self._resolve_links()
def main(self, function, args, kwargs):
try:
result = function(*args, **kwargs)
except:
self._exit_event.send_exception(*sys.exc_info())
# ca and ckw are the curried function arguments
for f, ca, ckw in getattr(self, '_exit_funcs', []):
f(exc=sys.exc_info(), *ca, **ckw)
self._resolve_links()
raise
else:
self._exit_event.send(result)
for f, ca, ckw in getattr(self, '_exit_funcs', []):
f(result, *ca, **ckw)
self._resolve_links()
def _resolve_links(self):
# ca and ckw are the curried function arguments
for f, ca, ckw in getattr(self, '_exit_funcs', []):
f(self, *ca, **ckw)
self._exit_funcs = [] # so they don't get called again
def kill(self):
return kill(self)
return kill(self)
def kill(g, *throw_args):
"""Terminates the target greenthread by raising an exception into it.
By default, this exception is GreenletExit, but a specific exception
may be specified. *throw_args* should be the same as the arguments to
raise; either an exception instance or an exc_info tuple.
"""
if g.dead:
return
hub = hubs.get_hub()
if not g:
# greenlet hasn't started yet and therefore throw won't work
# on its own; semantically we want it to be as though the main
# method never got called
def just_raise(*a, **kw):
raise throw_args or greenlet.GreenletExit
if hasattr(g, '_exit_event'):
# it's a GreenThread object, so we want to call its main
# method to take advantage of the notification
def raise_main(*a, **kw):
g.main(just_raise, (), {})
g.run = raise_main
else:
# regular greenlet; just want to replace its run method so
# that whatever it was going to run, doesn't
g.run = just_raise
hub.schedule_call_global(0, g.throw, *throw_args)
if getcurrent() is not hub.greenlet:
sleep(0)

View File

@@ -223,7 +223,7 @@ def setup():
_threads[i].start()
_coro = greenthread.spawn_n(tpool_trampoline)
api.sleep(0) # fix a race condition when calling killall immediately
def killall():
global _setup_already, _reqq, _rspq, _rfile, _wfile

View File

@@ -8,15 +8,6 @@ from eventlet import api
from eventlet import hubs, greenpool, coros, event
import tests
class Spawn(tests.LimitedTestCase):
# TODO: move this test elsewhere
def test_simple(self):
def f(a, b=None):
return (a,b)
gt = eventlet.spawn(f, 1, b=2)
self.assertEquals(gt.wait(), (1,2))
def passthru(a):
eventlet.sleep(0.01)
return a

71
tests/greenthread_test.py Normal file
View File

@@ -0,0 +1,71 @@
from tests import LimitedTestCase
from eventlet import greenthread
from eventlet.support import greenlets as greenlet
_g_results = []
def passthru(*args, **kw):
_g_results.append((args, kw))
return args, kw
class Spawn(LimitedTestCase):
def tearDown(self):
global _g_results
super(Spawn, self).tearDown()
_g_results = []
def test_simple(self):
gt = greenthread.spawn(passthru, 1, b=2)
self.assertEquals(gt.wait(), ((1,),{'b':2}))
self.assertEquals(_g_results, [((1,),{'b':2})])
def test_n(self):
gt = greenthread.spawn_n(passthru, 2, b=3)
self.assert_(not gt.dead)
greenthread.sleep(0)
self.assert_(gt.dead)
self.assertEquals(_g_results, [((2,),{'b':3})])
def test_kill(self):
gt = greenthread.spawn(passthru, 6)
greenthread.kill(gt)
self.assertRaises(greenlet.GreenletExit, gt.wait)
greenthread.sleep(0.001)
self.assertEquals(_g_results, [])
greenthread.kill(gt)
def test_kill_meth(self):
gt = greenthread.spawn(passthru, 6)
gt.kill()
self.assertRaises(greenlet.GreenletExit, gt.wait)
greenthread.sleep(0.001)
self.assertEquals(_g_results, [])
gt.kill()
def test_kill_n(self):
gt = greenthread.spawn_n(passthru, 7)
greenthread.kill(gt)
greenthread.sleep(0.001)
self.assertEquals(_g_results, [])
greenthread.kill(gt)
def test_link(self):
results = []
def link_func(g, *a, **kw):
results.append(g)
results.append(a)
results.append(kw)
gt = greenthread.spawn(passthru, 5)
gt.link(link_func, 4, b=5)
self.assertEquals(gt.wait(), ((5,), {}))
self.assertEquals(results, [gt, (4,), {'b':5}])
def test_link_after_exited(self):
results = []
def link_func(g, *a, **kw):
results.append(g)
results.append(a)
results.append(kw)
gt = greenthread.spawn(passthru, 5)
self.assertEquals(gt.wait(), ((5,), {}))
gt.link(link_func, 4, b=5)
self.assertEquals(results, [gt, (4,), {'b':5}])