From f01b395fdf7150d59b4c0813f7c922f58c8fb599 Mon Sep 17 00:00:00 2001 From: Denis Bilenko Date: Sun, 11 Jan 2009 19:51:18 +0600 Subject: [PATCH] removed Job & friends from coros; use proc instead --- eventlet/coros.py | 217 +-------------------------------------- greentest/test__event.py | 128 ----------------------- 2 files changed, 1 insertion(+), 344 deletions(-) diff --git a/eventlet/coros.py b/eventlet/coros.py index 80aa11b..b2cd0fa 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -266,226 +266,12 @@ class event(object): return self.send(None, args) -class Job(object): - """Spawn a greenlet, control its execution and collect the result. - - use spawn_new() classmethod to spawn a new coroutine and get a new Job instance; - use kill() method to kill the greenlet running the function; - use wait() method to collect the result of the function. - """ - - def __init__(self, ev=None): - if ev is None: - ev = event() - self.event = event() - - @classmethod - def spawn_new(cls, function, *args, **kwargs): - job = cls() - job.spawn(function, *args, **kwargs) - return job - - def spawn(self, function, *args, **kwargs): - assert not hasattr(self, 'greenlet_ref'), 'spawn can only be used once per instance' - g = api.spawn(_collect_result, weakref.ref(self), function, *args, **kwargs) - self.greenlet_ref = weakref.ref(g) - - # spawn_later can be also implemented here - - @property - def greenlet(self): - return self.greenlet_ref() - - def __nonzero__(self): - greenlet = self.greenlet_ref() - if greenlet is not None and not greenlet.dead: - return True - return False - - def __repr__(self): - klass = type(self).__name__ - if self.greenlet is not None and self.greenlet.dead: - dead = '(dead)' - else: - dead = '' - return '<%s greenlet=%r%s event=%s>' % (klass, self.greenlet, dead, self.event) - - def wait(self): - """Wait for the spawned greenlet to exit. - Return the result of the function if it completed without errors; - re-raise the exception otherwise. - - Return GreenletExit() object if the greenlet was killed. - """ - return self.event.wait() - - def poll(self, notready=None): - return self.event.poll(notready) - - def poll_result(self, notready=None): - return self.event.poll_result(notready) - - def poll_exception(self, notready=None): - return self.event.poll_exception(notready) - - def ready(self): - return self.event.ready() - - def has_result(self): - return self.event.has_result() - - def has_exception(self): - return self.event.has_exception() - - def _send(self, result): - self.event.send(result) - - def _send_exception(self, *throw_args): - self.event.send_exception(*throw_args) - - def kill(self, *throw_args): - greenlet = self.greenlet_ref() - if greenlet is not None: - return api.kill(greenlet, *throw_args) - - def kill_after(self, seconds): - return api.call_after_global(seconds, _kill_by_ref, weakref.ref(self)) - -def _kill_by_ref(async_job_ref): - async_job = async_job_ref() - if async_job is not None: - async_job.kill() - - -def _collect_result(job_ref, function, *args, **kwargs): - """Execute *function* and send its result to job_ref(). - - If function raises GreenletExit() it's trapped and sent as a regular value. - If job_ref points to a dead object or if DEBUG is true the exception - will be re-raised. - """ - try: - result = function(*args, **kwargs) - except api.GreenletExit, ex: - job = job_ref() - if job is not None: - job._send(ex) - except: - job = job_ref() - if job is not None: - job._send_exception(*sys.exc_info()) - if not DEBUG: - return - raise # let hub log the exception - else: - job = job_ref() - if job is not None: - job._send(result) - -class GroupMemberJob(Job): - - def __init__(self, group_queue, event=None): - self._group_queue = group_queue - Job.__init__(self, event) - - def _send(self, result): - self._group_queue.send((self, result, None)) - - def _send_exception(self, *throw_args): - self._group_queue.send((self, None, throw_args)) - - -class JobGroupExit(api.GreenletExit): - pass - -class JobGroup(object): - """Spawn jobs in the context of the group: when one job raises an exception, - all other jobs are killed immediatelly. - - To spawn a job use spawn_job method which returns a Job instance. - >>> group = JobGroup() - >>> job = group.spawn_new(api.get_hub().switch) # give up control to hub forever - >>> _ = group.spawn_new(int, 'bad') # raise ValueError - >>> job.wait() - JobGroupExit('Killed because of ValueError in the group',) - """ - - def __init__(self): - self._queue = queue() - self._jobs = [] - self._waiter_job = Job.spawn_new(self._waiter) - self._killerror = None - - def spawn_new(self, function, *args, **kwargs): - assert self._waiter_job.poll('run') == 'run' - job = GroupMemberJob(self._queue) - self._jobs.append(job) - if self._killerror is None: - job.spawn(function, *args, **kwargs) - else: - job.event.send(self._killerror) - return job - - def kill_all(self, *throw_args): - assert self._waiter_job.poll('run') == 'run', '_waiter_job must live' - for job in self._jobs: - g = job.greenlet - if g is not None: - api.get_hub().schedule_call(0, g.throw, *throw_args) - api.sleep(0) - - # QQQ: add kill_all_later(seconds, throw_args) - # add kill_delay attribute - - def complete(self, *jobs): - assert self._waiter_job.poll('run') == 'run' - left = set(jobs) - for job in jobs: - if job.ready(): - left.remove(job) - for job in left: - job.wait() - - # XXX make jobs a list, because wait methods will have timeout parameter soon - def wait(self, *jobs): - self.complete(*jobs) - return [x.wait() for x in jobs] - - def complete_all(self): - while True: - count = len(self._jobs) - self.complete(*self._jobs) - # it's possible that more jobs were added while we were waiting - if count == len(self._jobs): - break - - def wait_all(self): - self.complete_all() - return [x.wait() for x in self._jobs] - - def _waiter(self): - # XXX: this lives forever, fix it to exit after all jobs died - # XXX: add __nonzero__ method that returns whether JobGroup is alive - # XXX: 3 states: True (alive), finishing, False (all dead) - while True: - job, result, throw_args = self._queue.wait() - if throw_args is None: - if not job.event.ready(): - job.event.send(result) - else: - if not job.event.ready(): - job.event.send_exception(*throw_args) - if self._killerror is None: - type = throw_args[0] - self._killerror = JobGroupExit('Killed because of %s in the group' % type.__name__) - self.kill_all(self._killerror) - # cannot exit here, as I need to deliver GreenExits - class multievent(object): """is an event that can hold more than one value (it cannot be cancelled though) is like a queue, but if there're waiters blocked, send/send_exception will wake up all of them, just like an event will do (queue will wake up only one) """ + # XXX to be removed def __init__(self): self.items = collections.deque() @@ -589,7 +375,6 @@ class BoundedSemaphore(object): the calling coroutine until count becomes nonzero again. Attempting to release() after count has reached limit suspends the calling coroutine until count becomes less than limit again. - """ def __init__(self, count, limit): if count > limit: diff --git a/greentest/test__event.py b/greentest/test__event.py index 6ee09cb..89d28eb 100644 --- a/greentest/test__event.py +++ b/greentest/test__event.py @@ -57,134 +57,6 @@ class TestEvent(unittest.TestCase): result = event2.wait() raise AssertionError('Nobody sent anything to event2 yet it received %r' % (result, )) -class CommonJobTests: - - def test_simple_return(self): - res = self.Job.spawn_new(lambda: 25).wait() - assert res==25, res - - def test_exception(self): - try: - self.Job.spawn_new(sys.exit, 'bye').wait() - except SystemExit, ex: - assert ex.args == ('bye', ) - else: - assert False, "Shouldn't get there" - - def _test_kill(self, sync): - def func(): - sleep(DELAY) - return 101 - res = self.Job.spawn_new(func) - assert res - if sync: - res.kill() - else: - spawn(res.kill) - wait_result = res.wait() - assert not res, repr(res) - assert isinstance(wait_result, GreenletExit), repr(wait_result) - - def test_kill_sync(self): - return self._test_kill(True) - - def test_kill_async(self): - return self._test_kill(False) - - def test_poll(self): - def func(): - sleep(DELAY) - return 25 - job = self.Job.spawn_new(func) - self.assertEqual(job.poll(), None) - assert job, repr(job) - self.assertEqual(job.wait(), 25) - self.assertEqual(job.poll(), 25) - assert not job, repr(job) - - job = self.Job.spawn_new(func) - self.assertEqual(job.poll(5), 5) - assert job, repr(job) - self.assertEqual(job.wait(), 25) - self.assertEqual(job.poll(5), 25) - assert not job, repr(job) - - def test_kill_after(self): - def func(): - sleep(DELAY) - return 25 - job = self.Job.spawn_new(func) - job.kill_after(DELAY/2) - result = job.wait() - assert isinstance(result, GreenletExit), repr(result) - - job = self.Job.spawn_new(func) - job.kill_after(DELAY*2) - self.assertEqual(job.wait(), 25) - sleep(DELAY*2) - self.assertEqual(job.wait(), 25) - -class TestJob(CommonJobTests, unittest.TestCase): - - def setUp(self): - self.Job = Job - -class TestJobGroup(CommonJobTests, unittest.TestCase): - - def setUp(self): - self.Job = JobGroup() - - def tearDown(self): - del self.Job - - def check_raises_badint(self, wait): - try: - wait() - except ValueError, ex: - assert 'badint' in str(ex), str(ex) - else: - raise AssertionError('must raise ValueError') - - def check_killed(self, wait, text=''): - result = wait() - assert isinstance(result, GreenletExit), repr(result) - assert str(result) == text, str(result) - - def test_group_error(self): - x = self.Job.spawn_new(int, 'badint') - y = self.Job.spawn_new(sleep, DELAY) - self.check_killed(y.wait, 'Killed because of ValueError in the group') - self.check_raises_badint(x.wait) - z = self.Job.spawn_new(sleep, DELAY) - self.check_killed(z.wait, 'Killed because of ValueError in the group') - - def test_wait_all(self): - x = self.Job.spawn_new(lambda : 1) - y = self.Job.spawn_new(lambda : 2) - z = self.Job.spawn_new(lambda : 3) - assert self.Job.wait_all() == [1, 2, 3], repr(self.Job.wait_all()) - assert [x.wait(), y.wait(), z.wait()] == [1, 2, 3], [x.wait(), y.wait(), z.wait()] - - def test_error_wait_all(self): - def x(): - sleep(DELAY) - return 1 - # x will be killed - x = self.Job.spawn_new(x) - # y will raise ValueError - y = self.Job.spawn_new(int, 'badint') - # z cannot be killed because it does not yield. it will finish successfully - z = self.Job.spawn_new(lambda : 3) - self.check_raises_badint(self.Job.wait_all) - self.check_killed(x.poll, 'Killed because of ValueError in the group') - self.check_killed(x.wait, 'Killed because of ValueError in the group') - assert z.wait() == 3, repr(z.wait()) - self.check_raises_badint(y.wait) - - # zz won't be even started, because there's already an error in the group - zz = self.Job.spawn_new(lambda : 4) - self.check_killed(x.poll, 'Killed because of ValueError in the group') - self.check_killed(x.wait, 'Killed because of ValueError in the group') if __name__=='__main__': unittest.main()