diff --git a/tests/zmq_test.py b/tests/zmq_test.py index de6a44c..734dff4 100644 --- a/tests/zmq_test.py +++ b/tests/zmq_test.py @@ -1,9 +1,3 @@ -from __future__ import with_statement - -from eventlet import event, spawn, sleep, semaphore -from nose.tools import * -from tests import check_idle_cpu_usage, LimitedTestCase, using_pyevent, skip_unless - try: from eventlet.green import zmq except ImportError: @@ -11,23 +5,27 @@ except ImportError: else: RECV_ON_CLOSED_SOCKET_ERRNOS = (zmq.ENOTSUP, zmq.ENOTSOCK) +import eventlet +from eventlet import event, spawn, sleep, semaphore +import tests + def zmq_supported(_): try: import zmq except ImportError: return False - return not using_pyevent(_) + return not tests.using_pyevent(_) -class TestUpstreamDownStream(LimitedTestCase): - @skip_unless(zmq_supported) +class TestUpstreamDownStream(tests.LimitedTestCase): + @tests.skip_unless(zmq_supported) def setUp(self): super(TestUpstreamDownStream, self).setUp() self.context = zmq.Context() self.sockets = [] - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def tearDown(self): self.clear_up_sockets() super(TestUpstreamDownStream, self).tearDown() @@ -65,7 +63,7 @@ class TestUpstreamDownStream(LimitedTestCase): else: self.fail("Function did not raise any error") - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_close_linger(self): """Socket.close() must support linger argument. @@ -75,7 +73,7 @@ class TestUpstreamDownStream(LimitedTestCase): sock1.close(1) sock2.close(linger=0) - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_recv_spawned_before_send_is_non_blocking(self): req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR) # req.connect(ipc) @@ -93,7 +91,7 @@ class TestUpstreamDownStream(LimitedTestCase): done.wait() self.assertEqual(msg['res'], b'test') - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_close_socket_raises_enotsup(self): req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR) @@ -102,7 +100,7 @@ class TestUpstreamDownStream(LimitedTestCase): self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, rep.recv) self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, req.send, b'test') - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_close_xsocket_raises_enotsup(self): req, rep, port = self.create_bound_pair(zmq.XREQ, zmq.XREP) @@ -111,7 +109,7 @@ class TestUpstreamDownStream(LimitedTestCase): self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, rep.recv) self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, req.send, b'test') - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_send_1k_req_rep(self): req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP) sleep() @@ -137,7 +135,7 @@ class TestUpstreamDownStream(LimitedTestCase): final_i = done.wait() self.assertEqual(final_i, 0) - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_send_1k_push_pull(self): down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL) sleep() @@ -161,7 +159,7 @@ class TestUpstreamDownStream(LimitedTestCase): final_i = done.wait() self.assertEqual(final_i, 0) - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_send_1k_pub_sub(self): pub, sub_all, port = self.create_bound_pair(zmq.PUB, zmq.SUB) sub1 = self.context.socket(zmq.SUB) @@ -210,46 +208,62 @@ class TestUpstreamDownStream(LimitedTestCase): self.assertEqual(sub2_count, 500) self.assertEqual(sub_all_count, 1000) - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_change_subscription(self): + # FIXME: Extensive testing showed this particular test is the root cause + # of sporadic failures on Travis. pub, sub, port = self.create_bound_pair(zmq.PUB, zmq.SUB) sub.setsockopt(zmq.SUBSCRIBE, b'test') - - sleep(0.2) + sleep(0) + sub_ready = event.Event() + sub_last = event.Event() sub_done = event.Event() - def rx(sock, done_evt): + def rx(): + while sub.recv() != b'test BEGIN': + sleep(0) + sub_ready.send() count = 0 - sub = b'test' while True: - msg = sock.recv() - sleep() - if b'DONE' in msg: + msg = sub.recv() + if msg == b'test BEGIN': + # BEGIN may come many times + continue + if msg == b'test LAST': + sub.setsockopt(zmq.SUBSCRIBE, b'done') + sub.setsockopt(zmq.UNSUBSCRIBE, b'test') + sleep(0) + # In real application you should either sync + # or tolerate loss of messages. + sub_last.send() + if msg == b'done DONE': break - if b'LAST' in msg and sub == b'test': - sock.setsockopt(zmq.UNSUBSCRIBE, b'test') - sock.setsockopt(zmq.SUBSCRIBE, b'done') - sub = b'done' count += 1 - done_evt.send(count) + sub_done.send(count) - def tx(sock): + def tx(): + # Sync receiver ready to avoid loss of first packets + while not sub_ready.ready(): + pub.send(b'test BEGIN') + sleep(0.005) for i in range(1, 101): - msg = ("test %s" % i).encode() + msg = 'test {0}'.format(i).encode() if i != 50: - sock.send(msg) + pub.send(msg) else: - sock.send(b'test LAST') - sleep() - sock.send(b'done DONE') - - spawn(rx, sub, sub_done) - spawn(tx, pub) + pub.send(b'test LAST') + sub_last.wait() + # XXX: putting a real delay of 1ms here fixes sporadic failures on Travis + # just yield sleep(0) doesn't cut it + sleep(0.001) + pub.send(b'done DONE') + eventlet.spawn(rx) + eventlet.spawn(tx) rx_count = sub_done.wait() self.assertEqual(rx_count, 50) - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_recv_multipart_bug68(self): req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP) msg = [b''] @@ -267,13 +281,13 @@ class TestUpstreamDownStream(LimitedTestCase): # but it's private __str__ appears to be the way to go self.assertEqual([m.bytes for m in recieved_msg], msg2) - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_recv_noblock_bug76(self): req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP) self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK) self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK, True) - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_send_during_recv(self): sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ) sleep() @@ -308,7 +322,7 @@ class TestUpstreamDownStream(LimitedTestCase): for evt in done_evts: self.assertEqual(evt.wait(), 0) - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_send_during_recv_multipart(self): sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ) sleep() @@ -347,7 +361,7 @@ class TestUpstreamDownStream(LimitedTestCase): self.assertEqual(final_i, 0) # Need someway to ensure a thread is blocked on send... This isn't working - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_recv_during_send(self): sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ) sleep() @@ -376,7 +390,7 @@ class TestUpstreamDownStream(LimitedTestCase): final_i = done.wait() self.assertEqual(final_i, 0) - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_close_during_recv(self): sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ) sleep() @@ -396,7 +410,7 @@ class TestUpstreamDownStream(LimitedTestCase): done1.wait() done2.wait() - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_getsockopt_events(self): sock1, sock2, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER) sleep() @@ -415,7 +429,7 @@ class TestUpstreamDownStream(LimitedTestCase): events = sock2.getsockopt(zmq.EVENTS) self.assertEqual(events & zmq.POLLIN, zmq.POLLIN) - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_cpu_usage_after_bind(self): """zmq eats CPU after PUB socket .bind() @@ -432,9 +446,9 @@ class TestUpstreamDownStream(LimitedTestCase): self.sockets.append(sock) sock.bind_to_random_port("tcp://127.0.0.1") sleep() - check_idle_cpu_usage(0.2, 0.1) + tests.check_idle_cpu_usage(0.2, 0.1) - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_cpu_usage_after_pub_send_or_dealer_recv(self): """zmq eats CPU after PUB send or DEALER recv. @@ -444,18 +458,18 @@ class TestUpstreamDownStream(LimitedTestCase): sub.setsockopt(zmq.SUBSCRIBE, b"") sleep() pub.send(b'test_send') - check_idle_cpu_usage(0.2, 0.1) + tests.check_idle_cpu_usage(0.2, 0.1) sender, receiver, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER) sleep() sender.send(b'test_recv') msg = receiver.recv() self.assertEqual(msg, b'test_recv') - check_idle_cpu_usage(0.2, 0.1) + tests.check_idle_cpu_usage(0.2, 0.1) -class TestQueueLock(LimitedTestCase): - @skip_unless(zmq_supported) +class TestQueueLock(tests.LimitedTestCase): + @tests.skip_unless(zmq_supported) def test_queue_lock_order(self): q = zmq._QueueLock() s = semaphore.Semaphore(0) @@ -482,7 +496,7 @@ class TestQueueLock(LimitedTestCase): s.acquire() self.assertEqual(results, [1, 2, 3]) - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_count(self): q = zmq._QueueLock() self.assertFalse(q) @@ -495,7 +509,7 @@ class TestQueueLock(LimitedTestCase): self.assertTrue(q) self.assertFalse(q) - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_errors(self): q = zmq._QueueLock() @@ -506,7 +520,7 @@ class TestQueueLock(LimitedTestCase): self.assertRaises(zmq.LockReleaseError, q.release) - @skip_unless(zmq_supported) + @tests.skip_unless(zmq_supported) def test_nested_acquire(self): q = zmq._QueueLock() self.assertFalse(q) @@ -534,8 +548,8 @@ class TestQueueLock(LimitedTestCase): self.assertEqual(results, [1]) -class TestBlockedThread(LimitedTestCase): - @skip_unless(zmq_supported) +class TestBlockedThread(tests.LimitedTestCase): + @tests.skip_unless(zmq_supported) def test_block(self): e = zmq._BlockedThread() done = event.Event()