removed Job & friends from coros; use proc instead

This commit is contained in:
Denis Bilenko
2009-01-11 19:51:18 +06:00
parent f42cdd32fa
commit f01b395fdf
2 changed files with 1 additions and 344 deletions

View File

@@ -266,226 +266,12 @@ class event(object):
return self.send(None, args)
class Job(object):
"""Spawn a greenlet, control its execution and collect the result.
use spawn_new() classmethod to spawn a new coroutine and get a new Job instance;
use kill() method to kill the greenlet running the function;
use wait() method to collect the result of the function.
"""
def __init__(self, ev=None):
if ev is None:
ev = event()
self.event = event()
@classmethod
def spawn_new(cls, function, *args, **kwargs):
job = cls()
job.spawn(function, *args, **kwargs)
return job
def spawn(self, function, *args, **kwargs):
assert not hasattr(self, 'greenlet_ref'), 'spawn can only be used once per instance'
g = api.spawn(_collect_result, weakref.ref(self), function, *args, **kwargs)
self.greenlet_ref = weakref.ref(g)
# spawn_later can be also implemented here
@property
def greenlet(self):
return self.greenlet_ref()
def __nonzero__(self):
greenlet = self.greenlet_ref()
if greenlet is not None and not greenlet.dead:
return True
return False
def __repr__(self):
klass = type(self).__name__
if self.greenlet is not None and self.greenlet.dead:
dead = '(dead)'
else:
dead = ''
return '<%s greenlet=%r%s event=%s>' % (klass, self.greenlet, dead, self.event)
def wait(self):
"""Wait for the spawned greenlet to exit.
Return the result of the function if it completed without errors;
re-raise the exception otherwise.
Return GreenletExit() object if the greenlet was killed.
"""
return self.event.wait()
def poll(self, notready=None):
return self.event.poll(notready)
def poll_result(self, notready=None):
return self.event.poll_result(notready)
def poll_exception(self, notready=None):
return self.event.poll_exception(notready)
def ready(self):
return self.event.ready()
def has_result(self):
return self.event.has_result()
def has_exception(self):
return self.event.has_exception()
def _send(self, result):
self.event.send(result)
def _send_exception(self, *throw_args):
self.event.send_exception(*throw_args)
def kill(self, *throw_args):
greenlet = self.greenlet_ref()
if greenlet is not None:
return api.kill(greenlet, *throw_args)
def kill_after(self, seconds):
return api.call_after_global(seconds, _kill_by_ref, weakref.ref(self))
def _kill_by_ref(async_job_ref):
async_job = async_job_ref()
if async_job is not None:
async_job.kill()
def _collect_result(job_ref, function, *args, **kwargs):
"""Execute *function* and send its result to job_ref().
If function raises GreenletExit() it's trapped and sent as a regular value.
If job_ref points to a dead object or if DEBUG is true the exception
will be re-raised.
"""
try:
result = function(*args, **kwargs)
except api.GreenletExit, ex:
job = job_ref()
if job is not None:
job._send(ex)
except:
job = job_ref()
if job is not None:
job._send_exception(*sys.exc_info())
if not DEBUG:
return
raise # let hub log the exception
else:
job = job_ref()
if job is not None:
job._send(result)
class GroupMemberJob(Job):
def __init__(self, group_queue, event=None):
self._group_queue = group_queue
Job.__init__(self, event)
def _send(self, result):
self._group_queue.send((self, result, None))
def _send_exception(self, *throw_args):
self._group_queue.send((self, None, throw_args))
class JobGroupExit(api.GreenletExit):
pass
class JobGroup(object):
"""Spawn jobs in the context of the group: when one job raises an exception,
all other jobs are killed immediatelly.
To spawn a job use spawn_job method which returns a Job instance.
>>> group = JobGroup()
>>> job = group.spawn_new(api.get_hub().switch) # give up control to hub forever
>>> _ = group.spawn_new(int, 'bad') # raise ValueError
>>> job.wait()
JobGroupExit('Killed because of ValueError in the group',)
"""
def __init__(self):
self._queue = queue()
self._jobs = []
self._waiter_job = Job.spawn_new(self._waiter)
self._killerror = None
def spawn_new(self, function, *args, **kwargs):
assert self._waiter_job.poll('run') == 'run'
job = GroupMemberJob(self._queue)
self._jobs.append(job)
if self._killerror is None:
job.spawn(function, *args, **kwargs)
else:
job.event.send(self._killerror)
return job
def kill_all(self, *throw_args):
assert self._waiter_job.poll('run') == 'run', '_waiter_job must live'
for job in self._jobs:
g = job.greenlet
if g is not None:
api.get_hub().schedule_call(0, g.throw, *throw_args)
api.sleep(0)
# QQQ: add kill_all_later(seconds, throw_args)
# add kill_delay attribute
def complete(self, *jobs):
assert self._waiter_job.poll('run') == 'run'
left = set(jobs)
for job in jobs:
if job.ready():
left.remove(job)
for job in left:
job.wait()
# XXX make jobs a list, because wait methods will have timeout parameter soon
def wait(self, *jobs):
self.complete(*jobs)
return [x.wait() for x in jobs]
def complete_all(self):
while True:
count = len(self._jobs)
self.complete(*self._jobs)
# it's possible that more jobs were added while we were waiting
if count == len(self._jobs):
break
def wait_all(self):
self.complete_all()
return [x.wait() for x in self._jobs]
def _waiter(self):
# XXX: this lives forever, fix it to exit after all jobs died
# XXX: add __nonzero__ method that returns whether JobGroup is alive
# XXX: 3 states: True (alive), finishing, False (all dead)
while True:
job, result, throw_args = self._queue.wait()
if throw_args is None:
if not job.event.ready():
job.event.send(result)
else:
if not job.event.ready():
job.event.send_exception(*throw_args)
if self._killerror is None:
type = throw_args[0]
self._killerror = JobGroupExit('Killed because of %s in the group' % type.__name__)
self.kill_all(self._killerror)
# cannot exit here, as I need to deliver GreenExits
class multievent(object):
"""is an event that can hold more than one value (it cannot be cancelled though)
is like a queue, but if there're waiters blocked, send/send_exception will wake up
all of them, just like an event will do (queue will wake up only one)
"""
# XXX to be removed
def __init__(self):
self.items = collections.deque()
@@ -589,7 +375,6 @@ class BoundedSemaphore(object):
the calling coroutine until count becomes nonzero again. Attempting to
release() after count has reached limit suspends the calling coroutine until
count becomes less than limit again.
"""
def __init__(self, count, limit):
if count > limit:

View File

@@ -57,134 +57,6 @@ class TestEvent(unittest.TestCase):
result = event2.wait()
raise AssertionError('Nobody sent anything to event2 yet it received %r' % (result, ))
class CommonJobTests:
def test_simple_return(self):
res = self.Job.spawn_new(lambda: 25).wait()
assert res==25, res
def test_exception(self):
try:
self.Job.spawn_new(sys.exit, 'bye').wait()
except SystemExit, ex:
assert ex.args == ('bye', )
else:
assert False, "Shouldn't get there"
def _test_kill(self, sync):
def func():
sleep(DELAY)
return 101
res = self.Job.spawn_new(func)
assert res
if sync:
res.kill()
else:
spawn(res.kill)
wait_result = res.wait()
assert not res, repr(res)
assert isinstance(wait_result, GreenletExit), repr(wait_result)
def test_kill_sync(self):
return self._test_kill(True)
def test_kill_async(self):
return self._test_kill(False)
def test_poll(self):
def func():
sleep(DELAY)
return 25
job = self.Job.spawn_new(func)
self.assertEqual(job.poll(), None)
assert job, repr(job)
self.assertEqual(job.wait(), 25)
self.assertEqual(job.poll(), 25)
assert not job, repr(job)
job = self.Job.spawn_new(func)
self.assertEqual(job.poll(5), 5)
assert job, repr(job)
self.assertEqual(job.wait(), 25)
self.assertEqual(job.poll(5), 25)
assert not job, repr(job)
def test_kill_after(self):
def func():
sleep(DELAY)
return 25
job = self.Job.spawn_new(func)
job.kill_after(DELAY/2)
result = job.wait()
assert isinstance(result, GreenletExit), repr(result)
job = self.Job.spawn_new(func)
job.kill_after(DELAY*2)
self.assertEqual(job.wait(), 25)
sleep(DELAY*2)
self.assertEqual(job.wait(), 25)
class TestJob(CommonJobTests, unittest.TestCase):
def setUp(self):
self.Job = Job
class TestJobGroup(CommonJobTests, unittest.TestCase):
def setUp(self):
self.Job = JobGroup()
def tearDown(self):
del self.Job
def check_raises_badint(self, wait):
try:
wait()
except ValueError, ex:
assert 'badint' in str(ex), str(ex)
else:
raise AssertionError('must raise ValueError')
def check_killed(self, wait, text=''):
result = wait()
assert isinstance(result, GreenletExit), repr(result)
assert str(result) == text, str(result)
def test_group_error(self):
x = self.Job.spawn_new(int, 'badint')
y = self.Job.spawn_new(sleep, DELAY)
self.check_killed(y.wait, 'Killed because of ValueError in the group')
self.check_raises_badint(x.wait)
z = self.Job.spawn_new(sleep, DELAY)
self.check_killed(z.wait, 'Killed because of ValueError in the group')
def test_wait_all(self):
x = self.Job.spawn_new(lambda : 1)
y = self.Job.spawn_new(lambda : 2)
z = self.Job.spawn_new(lambda : 3)
assert self.Job.wait_all() == [1, 2, 3], repr(self.Job.wait_all())
assert [x.wait(), y.wait(), z.wait()] == [1, 2, 3], [x.wait(), y.wait(), z.wait()]
def test_error_wait_all(self):
def x():
sleep(DELAY)
return 1
# x will be killed
x = self.Job.spawn_new(x)
# y will raise ValueError
y = self.Job.spawn_new(int, 'badint')
# z cannot be killed because it does not yield. it will finish successfully
z = self.Job.spawn_new(lambda : 3)
self.check_raises_badint(self.Job.wait_all)
self.check_killed(x.poll, 'Killed because of ValueError in the group')
self.check_killed(x.wait, 'Killed because of ValueError in the group')
assert z.wait() == 3, repr(z.wait())
self.check_raises_badint(y.wait)
# zz won't be even started, because there's already an error in the group
zz = self.Job.spawn_new(lambda : 4)
self.check_killed(x.poll, 'Killed because of ValueError in the group')
self.check_killed(x.wait, 'Killed because of ValueError in the group')
if __name__=='__main__':
unittest.main()