test__coros_queue: check that queue(0) behaves like a channel (thanks to Marcus Cavanaugh for pointing this out)

This commit is contained in:
Denis Bilenko
2009-06-22 15:49:00 +07:00
parent f1500d666a
commit 68f3d11deb

View File

@@ -191,5 +191,52 @@ class TestQueue(LimitedTestCase):
self.assertEquals('hi', e1.wait())
self.assertEquals(0, waiting(q))
class TestChannel(LimitedTestCase):
def test_send(self):
api.sleep(0.1)
channel = coros.queue(0)
events = []
def another_greenlet():
events.append(channel.wait())
events.append(channel.wait())
proc.spawn(another_greenlet)
events.append('sending')
channel.send('hello')
events.append('sent hello')
channel.send('world')
events.append('sent world')
self.assertEqual(['sending', 'hello', 'sent hello', 'world', 'sent world'], events)
def test_wait(self):
api.sleep(0.1)
channel = coros.queue(0)
events = []
def another_greenlet():
events.append('sending hello')
channel.send('hello')
events.append('sending world')
channel.send('world')
events.append('sent world')
proc.spawn(another_greenlet)
events.append('waiting')
events.append(channel.wait())
events.append(channel.wait())
self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world'], events)
api.sleep(0)
self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world', 'sent world'], events)
if __name__=='__main__':
main()