remove CoroutinePool's tests from coros_test.py (they are in test__pool.py now)

This commit is contained in:
Denis Bilenko
2009-06-08 14:51:13 +07:00
parent 01d7f6dfc8
commit 59b6f7605a

View File

@@ -114,121 +114,6 @@ class TestEvent(tests.TestCase):
self.assertRaises(api.TimeoutError, evt.wait)
class TestCoroutinePool(tests.TestCase):
mode = 'static'
def setUp(self):
# raise an exception if we're waiting forever
self._cancel_timeout = api.exc_after(1, api.TimeoutError)
def tearDown(self):
self._cancel_timeout.cancel()
def test_execute_async(self):
done = coros.event()
def some_work():
done.send()
pool = coros.CoroutinePool(0, 2)
pool.execute_async(some_work)
done.wait()
def test_execute(self):
value = 'return value'
def some_work():
return value
pool = coros.CoroutinePool(0, 2)
worker = pool.execute(some_work)
self.assertEqual(value, worker.wait())
def test_multiple_coros(self):
evt = coros.event()
results = []
def producer():
results.append('prod')
evt.send()
def consumer():
results.append('cons1')
evt.wait()
results.append('cons2')
pool = coros.CoroutinePool(0, 2)
done = pool.execute(consumer)
pool.execute_async(producer)
done.wait()
self.assertEquals(['cons1', 'prod', 'cons2'], results)
# since CoroutinePool does not kill the greenlet, the following does not work
# def test_timer_cancel(self):
# def some_work():
# t = timer.LocalTimer(5, lambda: None)
# t.schedule()
# return t
# pool = coros.CoroutinePool(0, 2)
# worker = pool.execute(some_work)
# t = worker.wait()
# api.sleep(0)
# self.assertEquals(t.cancelled, True)
def test_reentrant(self):
pool = coros.CoroutinePool(0,1)
def reenter():
waiter = pool.execute(lambda a: a, 'reenter')
self.assertEqual('reenter', waiter.wait())
outer_waiter = pool.execute(reenter)
outer_waiter.wait()
evt = coros.event()
def reenter_async():
pool.execute_async(lambda a: a, 'reenter')
evt.send('done')
pool.execute_async(reenter_async)
evt.wait()
def test_horrible_main_loop_death(self):
# testing the case that causes the run_forever
# method to exit unwantedly
pool = coros.CoroutinePool(min_size=1, max_size=1)
def crash(*args, **kw):
raise RuntimeError("Whoa")
class FakeFile(object):
write = crash
# we're going to do this by causing the traceback.print_exc in
# safe_apply to raise an exception and thus exit _main_loop
normal_err = sys.stderr
try:
sys.stderr = FakeFile()
waiter = pool.execute(crash)
self.assertRaises(RuntimeError, waiter.wait)
# the pool should have something free at this point since the
# waiter returned
self.assertEqual(pool.free(), 1)
# shouldn't block when trying to get
t = api.exc_after(0.1, api.TimeoutError)
self.assert_(pool.get())
t.cancel()
finally:
sys.stderr = normal_err
def test_track_events(self):
pool = coros.CoroutinePool(track_events=True)
for x in range(6):
pool.execute(lambda n: n, x)
for y in range(6):
print "wait", y
pool.wait()
def test_track_slow_event(self):
pool = coros.CoroutinePool(track_events=True)
def slow():
api.sleep(0.1)
return 'ok'
pool.execute(slow)
self.assertEquals(pool.wait(), 'ok')
class IncrActor(coros.Actor):
def received(self, evt):
self.value = getattr(self, 'value', 0) + 1