from unittest import main, TestCase from tests import SilencedTestCase from eventlet import coros, api class TestEvent(SilencedTestCase): def test_waiting_for_event(self): evt = coros.event() value = 'some stuff' def send_to_event(): evt.send(value) api.spawn(send_to_event) self.assertEqual(evt.wait(), value) def test_multiple_waiters(self): evt = coros.event() value = 'some stuff' results = [] def wait_on_event(i_am_done): evt.wait() results.append(True) i_am_done.send() waiters = [] count = 5 for i in range(count): waiters.append(coros.event()) api.spawn(wait_on_event, waiters[-1]) evt.send() for w in waiters: w.wait() self.assertEqual(len(results), count) def test_reset(self): evt = coros.event() # calling reset before send should throw self.assertRaises(AssertionError, evt.reset) value = 'some stuff' def send_to_event(): evt.send(value) api.spawn(send_to_event) self.assertEqual(evt.wait(), value) # now try it again, and we should get the same exact value, # and we shouldn't be allowed to resend without resetting value2 = 'second stuff' self.assertRaises(AssertionError, evt.send, value2) self.assertEqual(evt.wait(), value) # reset and everything should be happy evt.reset() def send_to_event2(): evt.send(value2) api.spawn(send_to_event2) self.assertEqual(evt.wait(), value2) def test_double_exception(self): evt = coros.event() # send an exception through the event evt.send(exc=RuntimeError('from test_double_exception')) self.assertRaises(RuntimeError, evt.wait) evt.reset() # shouldn't see the RuntimeError again api.exc_after(0.001, api.TimeoutError('from test_double_exception')) self.assertRaises(api.TimeoutError, evt.wait) class IncrActor(coros.Actor): def received(self, evt): self.value = getattr(self, 'value', 0) + 1 if evt: evt.send() class TestActor(SilencedTestCase): mode = 'static' def setUp(self): super(TestActor, self).setUp() self.actor = IncrActor() def tearDown(self): super(TestActor, self).tearDown() api.kill(self.actor._killer) def test_cast(self): evt = coros.event() self.actor.cast(evt) evt.wait() evt.reset() self.assertEqual(self.actor.value, 1) self.actor.cast(evt) evt.wait() self.assertEqual(self.actor.value, 2) def test_cast_multi_1(self): # make sure that both messages make it in there evt = coros.event() evt1 = coros.event() self.actor.cast(evt) self.actor.cast(evt1) evt.wait() evt1.wait() self.assertEqual(self.actor.value, 2) def test_cast_multi_2(self): # the actor goes through a slightly different code path if it # is forced to enter its event loop prior to any cast()s api.sleep(0) self.test_cast_multi_1() def test_sleeping_during_received(self): # ensure that even if the received method cooperatively # yields, eventually all messages are delivered msgs = [] waiters = [] def received( (message, evt) ): api.sleep(0) msgs.append(message) evt.send() self.actor.received = received waiters.append(coros.event()) self.actor.cast( (1, waiters[-1])) api.sleep(0) waiters.append(coros.event()) self.actor.cast( (2, waiters[-1]) ) waiters.append(coros.event()) self.actor.cast( (3, waiters[-1]) ) api.sleep(0) waiters.append(coros.event()) self.actor.cast( (4, waiters[-1]) ) waiters.append(coros.event()) self.actor.cast( (5, waiters[-1]) ) for evt in waiters: evt.wait() self.assertEqual(msgs, [1,2,3,4,5]) def test_raising_received(self): msgs = [] def received( (message, evt) ): evt.send() if message == 'fail': raise RuntimeError() else: msgs.append(message) self.actor.received = received evt = coros.event() self.actor.cast( ('fail', evt) ) evt.wait() evt.reset() self.actor.cast( ('should_appear', evt) ) evt.wait() self.assertEqual(['should_appear'], msgs) def test_multiple(self): self.actor = IncrActor(concurrency=2) total = [0] def received( (func, ev, value) ): func() total[0] += value ev.send() self.actor.received = received def onemoment(): api.sleep(0.1) evt = coros.event() evt1 = coros.event() self.actor.cast( (onemoment, evt, 1) ) self.actor.cast( (lambda: None, evt1, 2) ) evt1.wait() self.assertEqual(total[0], 2) # both coroutines should have been used self.assertEqual(self.actor._pool.current_size, 2) api.sleep(0) self.assertEqual(self.actor._pool.free(), 1) evt.wait() self.assertEqual(total[0], 3) api.sleep(0) self.assertEqual(self.actor._pool.free(), 2) if __name__ == '__main__': main()