diff --git a/greentest/pools_test.py b/greentest/pools_test.py index 282067a..3b70560 100644 --- a/greentest/pools_test.py +++ b/greentest/pools_test.py @@ -22,13 +22,14 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ -import time -import unittest +import sys from eventlet import api +from eventlet import channel from eventlet import coros from eventlet import pools - +from greentest import tests +from eventlet import timer class IntPool(pools.Pool): def create(self): @@ -36,7 +37,7 @@ class IntPool(pools.Pool): return self.current_integer -class TestIntPool(unittest.TestCase): +class TestIntPool(tests.TestCase): mode = 'static' def setUp(self): self.pool = IntPool(min_size=0, max_size=4) @@ -66,19 +67,13 @@ class TestIntPool(unittest.TestCase): self.assertEquals(self.pool.free(), 4) def test_exhaustion(self): - waiter = coros.event() + waiter = channel.channel() def consumer(): gotten = None - cancel = api.exc_after(1, api.TimeoutError) try: - print time.asctime(), "getting" gotten = self.pool.get() - print time.asctime(), "got" finally: - cancel.cancel() - print "waiter send" waiter.send(gotten) - print "waiter sent" api.spawn(consumer) @@ -89,17 +84,14 @@ class TestIntPool(unittest.TestCase): # Let consumer run; nothing will be in the pool, so he will wait api.sleep(0) - print "put in pool", one # Wake consumer self.pool.put(one) - print "done put" # wait for the consumer - self.assertEquals(waiter.wait(), one) - print "done wait" + self.assertEquals(waiter.receive(), one) def test_blocks_on_pool(self): - waiter = coros.event() + waiter = channel.channel() def greedy(): self.pool.get() self.pool.get() @@ -108,9 +100,7 @@ class TestIntPool(unittest.TestCase): # No one should be waiting yet. self.assertEquals(self.pool.waiting(), 0) # The call to the next get will unschedule this routine. - print "calling get" self.pool.get() - print "called get" # So this send should never be called. waiter.send('Failed!') @@ -125,13 +115,40 @@ class TestIntPool(unittest.TestCase): ## Greedy should be blocking on the last get self.assertEquals(self.pool.waiting(), 1) - ## Send will never be called, so the event should not be ready. - self.assertEquals(waiter.ready(), False) + ## Send will never be called, so balance should be 0. + self.assertEquals(waiter.balance, 0) api.kill(killable) + def test_ordering(self): + # normal case is that items come back out in the + # same order they are put + one, two = self.pool.get(), self.pool.get() + self.pool.put(one) + self.pool.put(two) + self.assertEquals(self.pool.get(), one) + self.assertEquals(self.pool.get(), two) -class TestAbstract(unittest.TestCase): + def test_putting_to_queue(self): + timer = api.exc_after(0.1, api.TimeoutError) + size = 2 + self.pool = IntPool(min_size=0, max_size=size) + queue = coros.queue() + results = [] + def just_put(pool_item, index): + self.pool.put(pool_item) + queue.send(index) + for index in xrange(size + 1): + pool_item = self.pool.get() + api.spawn(just_put, pool_item, index) + + while results != range(size + 1): + x = queue.wait() + results.append(x) + timer.cancel() + + +class TestAbstract(tests.TestCase): mode = 'static' def test_abstract(self): ## Going for 100% coverage here @@ -140,7 +157,7 @@ class TestAbstract(unittest.TestCase): self.assertRaises(NotImplementedError, pool.get) -class TestIntPool2(unittest.TestCase): +class TestIntPool2(tests.TestCase): mode = 'static' def setUp(self): self.pool = IntPool(min_size=3, max_size=3) @@ -152,8 +169,261 @@ class TestIntPool2(unittest.TestCase): gotten = self.pool.get() self.assertEquals(gotten, 1) - - -if __name__ == '__main__': - unittest.main() + +class TestOrderAsStack(tests.TestCase): + mode = 'static' + def setUp(self): + self.pool = IntPool(max_size=3, order_as_stack=True) + + def test_ordering(self): + # items come out in the reverse order they are put + one, two = self.pool.get(), self.pool.get() + self.pool.put(one) + self.pool.put(two) + self.assertEquals(self.pool.get(), two) + self.assertEquals(self.pool.get(), one) + + +class RaisePool(pools.Pool): + def create(self): + raise RuntimeError() + + +class TestCreateRaises(tests.TestCase): + mode = 'static' + def setUp(self): + self.pool = RaisePool(max_size=3) + + def test_it(self): + self.assertEquals(self.pool.free(), 3) + self.assertRaises(RuntimeError, self.pool.get) + self.assertEquals(self.pool.free(), 3) + + +ALWAYS = RuntimeError('I always fail') +SOMETIMES = RuntimeError('I fail half the time') + + +class TestTookTooLong(Exception): + pass + +class TestFan(tests.TestCase): + mode = 'static' + def setUp(self): + self.timer = api.exc_after(1, TestTookTooLong()) + self.pool = IntPool(max_size=2) + + def tearDown(self): + self.timer.cancel() + + def test_with_list(self): + list_of_input = ['agent-one', 'agent-two', 'agent-three'] + + def my_callable(pool_item, next_thing): + ## Do some "blocking" (yielding) thing + api.sleep(0.01) + return next_thing + + output = self.pool.fan(my_callable, list_of_input) + self.assertEquals(list_of_input, output) + + def test_all_fail(self): + def my_failure(pool_item, next_thing): + raise ALWAYS + self.assertRaises(pools.AllFailed, self.pool.fan, my_failure, range(4)) + + def test_some_fail(self): + def my_failing_callable(pool_item, next_thing): + if next_thing % 2: + raise SOMETIMES + return next_thing + self.assertRaises(pools.SomeFailed, self.pool.fan, my_failing_callable, range(4)) + + +class TestCoroutinePool(tests.TestCase): + mode = 'static' + def setUp(self): + # raise an exception if we're waiting forever + self._cancel_timeout = api.exc_after(1, TestTookTooLong()) + + def tearDown(self): + self._cancel_timeout.cancel() + + def test_execute_async(self): + done = coros.event() + def some_work(): + done.send() + pool = pools.CoroutinePool(0, 2) + pool.execute_async(some_work) + done.wait() + + def test_execute(self): + value = 'return value' + def some_work(): + return value + pool = pools.CoroutinePool(0, 2) + worker = pool.execute(some_work) + self.assertEqual(value, worker.wait()) + + def test_multiple_coros(self): + evt = coros.event() + results = [] + def producer(): + results.append('prod') + evt.send() + + def consumer(): + results.append('cons1') + evt.wait() + results.append('cons2') + + pool = pools.CoroutinePool(0, 2) + done = pool.execute(consumer) + pool.execute_async(producer) + done.wait() + self.assertEquals(['cons1', 'prod', 'cons2'], results) + + def test_timer_cancel(self): + def some_work(): + t = timer.Timer(5, lambda: None) + t.autocancellable = True + t.schedule() + return t + pool = pools.CoroutinePool(0, 2) + worker = pool.execute(some_work) + t = worker.wait() + api.sleep(0) + self.assertEquals(t.cancelled, True) + + def test_reentrant(self): + pool = pools.CoroutinePool(0,1) + def reenter(): + waiter = pool.execute(lambda a: a, 'reenter') + self.assertEqual('reenter', waiter.wait()) + + outer_waiter = pool.execute(reenter) + outer_waiter.wait() + + evt = coros.event() + def reenter_async(): + pool.execute_async(lambda a: a, 'reenter') + evt.send('done') + + pool.execute_async(reenter_async) + evt.wait() + + def test_horrible_main_loop_death(self): + # testing the case that causes the run_forever + # method to exit unwantedly + pool = pools.CoroutinePool(min_size=1, max_size=1) + def crash(*args, **kw): + raise RuntimeError("Whoa") + class FakeFile(object): + write = crash + + # we're going to do this by causing the traceback.print_exc in + # safe_apply to raise an exception and thus exit _main_loop + normal_err = sys.stderr + try: + sys.stderr = FakeFile() + waiter = pool.execute(crash) + self.assertRaises(RuntimeError, waiter.wait) + # the pool should have something free at this point since the + # waiter returned + self.assertEqual(pool.free(), 1) + # shouldn't block when trying to get + t = api.exc_after(0.1, api.TimeoutError) + self.assert_(pool.get()) + t.cancel() + finally: + sys.stderr = normal_err + + def test_track_events(self): + pool = pools.CoroutinePool(track_events=True) + for x in range(6): + pool.execute(lambda n: n, x) + for y in range(6): + pool.wait() + + def test_track_slow_event(self): + pool = pools.CoroutinePool(track_events=True) + def slow(): + api.sleep(0.1) + return 'ok' + pool.execute(slow) + self.assertEquals(pool.wait(), 'ok') + + def test_channel_smash(self): + # The premise is that the coroutine in the pool exhibits an + # interest in receiving data from the channel, but then times + # out and gets recycled, so it ceases to care about what gets + # sent over the channel. The pool should be able to tell the + # channel about the sudden change of heart, or else, when we + # eventually do send something into the channel it will catch + # the coroutine pool's coroutine in an awkward place, losing + # the data that we're sending. + from eventlet import pools + pool = pools.CoroutinePool(min_size=1, max_size=1) + tp = pools.TokenPool(max_size=1) + token = tp.get() # empty pool + def do_receive(tp): + api.exc_after(0, RuntimeError()) + try: + t = tp.get() + self.fail("Shouldn't have recieved anything from the pool") + except RuntimeError: + return 'timed out' + + # the execute makes the pool expect that coroutine, but then + # immediately cuts bait + e1 = pool.execute(do_receive, tp) + self.assertEquals(e1.wait(), 'timed out') + + # the pool can get some random item back + def send_wakeup(tp): + tp.put('wakeup') + api.spawn(send_wakeup, tp) + + # now we ask the pool to run something else, which should not + # be affected by the previous send at all + def resume(): + return 'resumed' + e2 = pool.execute(resume) + self.assertEquals(e2.wait(), 'resumed') + + # we should be able to get out the thing we put in there, too + self.assertEquals(tp.get(), 'wakeup') + + def test_channel_death(self): + # In here, we have a coroutine trying to receive data from a + # channel, but timing out immediately and dying. The channel + # should be smart enough to not try to send data to a dead + # coroutine, because if it tries to, it'll lose the data. + from eventlet import pools + tp = pools.TokenPool(max_size=1) + token = tp.get() + e1 = coros.event() + def do_receive(evt, tp): + api.exc_after(0, RuntimeError()) + try: + t = tp.get() + evt.send(t) + except RuntimeError: + evt.send('timed out') + + # the execute gets the pool to add a waiter, but then kills + # itself off + api.spawn(do_receive, e1, tp) + self.assertEquals(e1.wait(), 'timed out') + + def send_wakeup(tp): + tp.put('wakeup') + api.spawn(send_wakeup, tp) + + # should be able to retrieve the message + self.assertEquals(tp.get(), 'wakeup') + + +if __name__ == '__main__': + tests.main()