From bd2444f8bffbca5e2c1e639a2c50e03604755e89 Mon Sep 17 00:00:00 2001 From: Denis Bilenko Date: Wed, 10 Dec 2008 18:00:27 +0600 Subject: [PATCH] Removed spawn_link and AsyncJob; added Job class that does the same --- eventlet/coros.py | 42 ++++++++++++++++++++++++--------- examples/connect.py | 8 +++---- examples/twisted_portforward.py | 6 ++--- greentest/test__event.py | 18 +++++++------- 4 files changed, 47 insertions(+), 27 deletions(-) diff --git a/eventlet/coros.py b/eventlet/coros.py index 2257015..35a6f6c 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -238,11 +238,23 @@ class event(object): return self.send(None, args) -class AsyncJob(object): +class Job(object): + """Spawn a new coroutine to execute a function and collect its result. - def __init__(self, greenlet_ref, event): - self.greenlet_ref = greenlet_ref - self.event = event + use wait() method to collect the result of the function; + use kill() method to kill the greenlet running the function. + """ + + def __init__(self, function, *args, **kwargs): + """Create a new coroutine, or cooperative thread of control, within which + to execute *function*. + + ``Job()`` returns control to the caller immediately, and *function* + will be called in a future main loop iteration. + """ + self.event = event() + g = api.spawn(wrap_result_in_event, weakref.ref(self.event), function, *args, **kwargs) + self.greenlet_ref = weakref.ref(g) @property def greenlet(self): @@ -263,6 +275,13 @@ class AsyncJob(object): return '<%s greenlet=%r%s event=%s>' % (klass, self.greenlet, dead, self.event) def wait(self): + """Wait for the function to return or raise. + Return the return value of the function if it has returned one, + re-raise the exception that was raised by the function otherwise. + + If the greenlet was killed(), e.g. with kill() method, GreenletExit() + object is returned. + """ return self.event.wait() def poll(self, notready=None): @@ -287,9 +306,15 @@ def _kill_by_ref(async_job_ref): if async_job is not None: async_job.kill() -def _wrap_result_in_event(event_ref, func, *args, **kwargs): +def wrap_result_in_event(event_ref, function, *args, **kwargs): + """Execute *function*, send its result to event_ref(). + If function raises GreenletExit() it's trapped and sent as a regular value. + If event_ref() is not available (event was GC-ed) the return value + is thrown away and the exception is re-raised (allowing hub to log + the exception) + """ try: - result = func(*args, **kwargs) + result = function(*args, **kwargs) except api.GreenletExit, ex: event = event_ref() if event is not None: @@ -305,11 +330,6 @@ def _wrap_result_in_event(event_ref, func, *args, **kwargs): if event is not None: event.send(result) -def spawn_link(func, *args, **kwargs): - result = event() - g = api.spawn(_wrap_result_in_event, weakref.ref(result), func, *args, **kwargs) - return AsyncJob(weakref.ref(g), result) - class multievent(object): """is an event that can hold more than one value (it cannot be cancelled though) diff --git a/examples/connect.py b/examples/connect.py index d38ee9f..045bc86 100644 --- a/examples/connect.py +++ b/examples/connect.py @@ -1,12 +1,12 @@ """Spawn multiple greenlet-workers and collect their results. -Demonstrates how to use spawn_link. +Demonstrates how to use coros.Job. """ import sys import string from eventlet.api import sleep from eventlet.green import socket -from eventlet.coros import spawn_link +from eventlet.coros import Job # this example works with both standard eventlet hubs and with twisted-based hub # comment out the following line to use standard eventlet hub @@ -24,10 +24,10 @@ def progress_indicator(): sys.stderr.write('.') sleep(0.5) -spawn_link(progress_indicator) +Job(progress_indicator) urls = ['www.%s.com' % (x*3) for x in string.letters] -jobs = [spawn_link(geturl, x) for x in urls] +jobs = [Job(geturl, x) for x in urls] print 'spawned %s jobs' % len(jobs) diff --git a/examples/twisted_portforward.py b/examples/twisted_portforward.py index 95d22e5..b8efdd1 100644 --- a/examples/twisted_portforward.py +++ b/examples/twisted_portforward.py @@ -1,6 +1,6 @@ import sys from twisted.internet import reactor -from eventlet.coros import event, spawn_link +from eventlet.coros import event, Job from eventlet.twistedutil import join_reactor from eventlet.twistedutil.protocol import GreenClientCreator, SpawnFactory, UnbufferedTransport @@ -18,8 +18,8 @@ def forward(from_, to): def handler(local): remote = GreenClientCreator(reactor, UnbufferedTransport).connectTCP(remote_host, remote_port) error = event() - a = spawn_link(forward, remote, local) - b = spawn_link(forward, local, remote) + a = Job(forward, remote, local) + b = Job(forward, local, remote) a.wait() b.wait() diff --git a/greentest/test__event.py b/greentest/test__event.py index c1f5eac..6f9f500 100644 --- a/greentest/test__event.py +++ b/greentest/test__event.py @@ -1,6 +1,6 @@ import unittest import sys -from eventlet.coros import event, spawn_link +from eventlet.coros import event, Job from eventlet.api import spawn, sleep, GreenletExit, exc_after class TestEvent(unittest.TestCase): @@ -22,15 +22,15 @@ class TestEvent(unittest.TestCase): assert log == [('catched', 'Exception')], log -class TestSpawnLink(unittest.TestCase): +class TestJob(unittest.TestCase): def test_simple_return(self): - res = spawn_link(lambda: 25).wait() + res = Job(lambda: 25).wait() assert res==25, res def test_exception(self): try: - spawn_link(sys.exit, 'bye').wait() + Job(sys.exit, 'bye').wait() except SystemExit, ex: assert ex.args == ('bye', ) else: @@ -40,7 +40,7 @@ class TestSpawnLink(unittest.TestCase): def func(): sleep(0.1) return 101 - res = spawn_link(func) + res = Job(func) assert res if sync: res.kill() @@ -60,14 +60,14 @@ class TestSpawnLink(unittest.TestCase): def func(): sleep(0.1) return 25 - job = spawn_link(func) + job = Job(func) self.assertEqual(job.poll(), None) assert job, repr(job) self.assertEqual(job.wait(), 25) self.assertEqual(job.poll(), 25) assert not job, repr(job) - job = spawn_link(func) + job = Job(func) self.assertEqual(job.poll(5), 5) assert job, repr(job) self.assertEqual(job.wait(), 25) @@ -78,12 +78,12 @@ class TestSpawnLink(unittest.TestCase): def func(): sleep(0.1) return 25 - job = spawn_link(func) + job = Job(func) job.kill_after(0.05) result = job.wait() assert isinstance(result, GreenletExit), repr(result) - job = spawn_link(func) + job = Job(func) job.kill_after(0.2) self.assertEqual(job.wait(), 25) sleep(0.2)