diff --git a/greentest/coros_test.py b/greentest/coros_test.py index 4f52344..6bf9c56 100644 --- a/greentest/coros_test.py +++ b/greentest/coros_test.py @@ -114,121 +114,6 @@ class TestEvent(tests.TestCase): self.assertRaises(api.TimeoutError, evt.wait) -class TestCoroutinePool(tests.TestCase): - mode = 'static' - def setUp(self): - # raise an exception if we're waiting forever - self._cancel_timeout = api.exc_after(1, api.TimeoutError) - - def tearDown(self): - self._cancel_timeout.cancel() - - def test_execute_async(self): - done = coros.event() - def some_work(): - done.send() - pool = coros.CoroutinePool(0, 2) - pool.execute_async(some_work) - done.wait() - - def test_execute(self): - value = 'return value' - def some_work(): - return value - pool = coros.CoroutinePool(0, 2) - worker = pool.execute(some_work) - self.assertEqual(value, worker.wait()) - - def test_multiple_coros(self): - evt = coros.event() - results = [] - def producer(): - results.append('prod') - evt.send() - - def consumer(): - results.append('cons1') - evt.wait() - results.append('cons2') - - pool = coros.CoroutinePool(0, 2) - done = pool.execute(consumer) - pool.execute_async(producer) - done.wait() - self.assertEquals(['cons1', 'prod', 'cons2'], results) - -# since CoroutinePool does not kill the greenlet, the following does not work -# def test_timer_cancel(self): -# def some_work(): -# t = timer.LocalTimer(5, lambda: None) -# t.schedule() -# return t -# pool = coros.CoroutinePool(0, 2) -# worker = pool.execute(some_work) -# t = worker.wait() -# api.sleep(0) -# self.assertEquals(t.cancelled, True) - - def test_reentrant(self): - pool = coros.CoroutinePool(0,1) - def reenter(): - waiter = pool.execute(lambda a: a, 'reenter') - self.assertEqual('reenter', waiter.wait()) - - outer_waiter = pool.execute(reenter) - outer_waiter.wait() - - evt = coros.event() - def reenter_async(): - pool.execute_async(lambda a: a, 'reenter') - evt.send('done') - - pool.execute_async(reenter_async) - evt.wait() - - def test_horrible_main_loop_death(self): - # testing the case that causes the run_forever - # method to exit unwantedly - pool = coros.CoroutinePool(min_size=1, max_size=1) - def crash(*args, **kw): - raise RuntimeError("Whoa") - class FakeFile(object): - write = crash - - # we're going to do this by causing the traceback.print_exc in - # safe_apply to raise an exception and thus exit _main_loop - normal_err = sys.stderr - try: - sys.stderr = FakeFile() - waiter = pool.execute(crash) - self.assertRaises(RuntimeError, waiter.wait) - # the pool should have something free at this point since the - # waiter returned - self.assertEqual(pool.free(), 1) - # shouldn't block when trying to get - t = api.exc_after(0.1, api.TimeoutError) - self.assert_(pool.get()) - t.cancel() - finally: - sys.stderr = normal_err - - def test_track_events(self): - pool = coros.CoroutinePool(track_events=True) - for x in range(6): - pool.execute(lambda n: n, x) - for y in range(6): - print "wait", y - pool.wait() - - def test_track_slow_event(self): - pool = coros.CoroutinePool(track_events=True) - def slow(): - api.sleep(0.1) - return 'ok' - pool.execute(slow) - self.assertEquals(pool.wait(), 'ok') - - class IncrActor(coros.Actor): def received(self, evt): self.value = getattr(self, 'value', 0) + 1