tests: half-hearted fix sporadic zmq_test failures on Travis
Real fix must work without any sleep(), even 0 delay. With all these sync events the code makes perfect sense and execution is properly organized. So I am 99% sure there is a problem in green/zmq. Hint for the person willing to take on this in future: send/recv seems not to switch to zmq trampolining
This commit is contained in:
@@ -1,9 +1,3 @@
|
|||||||
from __future__ import with_statement
|
|
||||||
|
|
||||||
from eventlet import event, spawn, sleep, semaphore
|
|
||||||
from nose.tools import *
|
|
||||||
from tests import check_idle_cpu_usage, LimitedTestCase, using_pyevent, skip_unless
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from eventlet.green import zmq
|
from eventlet.green import zmq
|
||||||
except ImportError:
|
except ImportError:
|
||||||
@@ -11,23 +5,27 @@ except ImportError:
|
|||||||
else:
|
else:
|
||||||
RECV_ON_CLOSED_SOCKET_ERRNOS = (zmq.ENOTSUP, zmq.ENOTSOCK)
|
RECV_ON_CLOSED_SOCKET_ERRNOS = (zmq.ENOTSUP, zmq.ENOTSOCK)
|
||||||
|
|
||||||
|
import eventlet
|
||||||
|
from eventlet import event, spawn, sleep, semaphore
|
||||||
|
import tests
|
||||||
|
|
||||||
|
|
||||||
def zmq_supported(_):
|
def zmq_supported(_):
|
||||||
try:
|
try:
|
||||||
import zmq
|
import zmq
|
||||||
except ImportError:
|
except ImportError:
|
||||||
return False
|
return False
|
||||||
return not using_pyevent(_)
|
return not tests.using_pyevent(_)
|
||||||
|
|
||||||
|
|
||||||
class TestUpstreamDownStream(LimitedTestCase):
|
class TestUpstreamDownStream(tests.LimitedTestCase):
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestUpstreamDownStream, self).setUp()
|
super(TestUpstreamDownStream, self).setUp()
|
||||||
self.context = zmq.Context()
|
self.context = zmq.Context()
|
||||||
self.sockets = []
|
self.sockets = []
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.clear_up_sockets()
|
self.clear_up_sockets()
|
||||||
super(TestUpstreamDownStream, self).tearDown()
|
super(TestUpstreamDownStream, self).tearDown()
|
||||||
@@ -65,7 +63,7 @@ class TestUpstreamDownStream(LimitedTestCase):
|
|||||||
else:
|
else:
|
||||||
self.fail("Function did not raise any error")
|
self.fail("Function did not raise any error")
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_close_linger(self):
|
def test_close_linger(self):
|
||||||
"""Socket.close() must support linger argument.
|
"""Socket.close() must support linger argument.
|
||||||
|
|
||||||
@@ -75,7 +73,7 @@ class TestUpstreamDownStream(LimitedTestCase):
|
|||||||
sock1.close(1)
|
sock1.close(1)
|
||||||
sock2.close(linger=0)
|
sock2.close(linger=0)
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_recv_spawned_before_send_is_non_blocking(self):
|
def test_recv_spawned_before_send_is_non_blocking(self):
|
||||||
req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
|
req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
|
||||||
# req.connect(ipc)
|
# req.connect(ipc)
|
||||||
@@ -93,7 +91,7 @@ class TestUpstreamDownStream(LimitedTestCase):
|
|||||||
done.wait()
|
done.wait()
|
||||||
self.assertEqual(msg['res'], b'test')
|
self.assertEqual(msg['res'], b'test')
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_close_socket_raises_enotsup(self):
|
def test_close_socket_raises_enotsup(self):
|
||||||
req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
|
req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
|
||||||
|
|
||||||
@@ -102,7 +100,7 @@ class TestUpstreamDownStream(LimitedTestCase):
|
|||||||
self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, rep.recv)
|
self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, rep.recv)
|
||||||
self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, req.send, b'test')
|
self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, req.send, b'test')
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_close_xsocket_raises_enotsup(self):
|
def test_close_xsocket_raises_enotsup(self):
|
||||||
req, rep, port = self.create_bound_pair(zmq.XREQ, zmq.XREP)
|
req, rep, port = self.create_bound_pair(zmq.XREQ, zmq.XREP)
|
||||||
|
|
||||||
@@ -111,7 +109,7 @@ class TestUpstreamDownStream(LimitedTestCase):
|
|||||||
self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, rep.recv)
|
self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, rep.recv)
|
||||||
self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, req.send, b'test')
|
self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, req.send, b'test')
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_send_1k_req_rep(self):
|
def test_send_1k_req_rep(self):
|
||||||
req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
|
req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
|
||||||
sleep()
|
sleep()
|
||||||
@@ -137,7 +135,7 @@ class TestUpstreamDownStream(LimitedTestCase):
|
|||||||
final_i = done.wait()
|
final_i = done.wait()
|
||||||
self.assertEqual(final_i, 0)
|
self.assertEqual(final_i, 0)
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_send_1k_push_pull(self):
|
def test_send_1k_push_pull(self):
|
||||||
down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL)
|
down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL)
|
||||||
sleep()
|
sleep()
|
||||||
@@ -161,7 +159,7 @@ class TestUpstreamDownStream(LimitedTestCase):
|
|||||||
final_i = done.wait()
|
final_i = done.wait()
|
||||||
self.assertEqual(final_i, 0)
|
self.assertEqual(final_i, 0)
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_send_1k_pub_sub(self):
|
def test_send_1k_pub_sub(self):
|
||||||
pub, sub_all, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
|
pub, sub_all, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
|
||||||
sub1 = self.context.socket(zmq.SUB)
|
sub1 = self.context.socket(zmq.SUB)
|
||||||
@@ -210,46 +208,62 @@ class TestUpstreamDownStream(LimitedTestCase):
|
|||||||
self.assertEqual(sub2_count, 500)
|
self.assertEqual(sub2_count, 500)
|
||||||
self.assertEqual(sub_all_count, 1000)
|
self.assertEqual(sub_all_count, 1000)
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_change_subscription(self):
|
def test_change_subscription(self):
|
||||||
|
# FIXME: Extensive testing showed this particular test is the root cause
|
||||||
|
# of sporadic failures on Travis.
|
||||||
pub, sub, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
|
pub, sub, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
|
||||||
sub.setsockopt(zmq.SUBSCRIBE, b'test')
|
sub.setsockopt(zmq.SUBSCRIBE, b'test')
|
||||||
|
sleep(0)
|
||||||
sleep(0.2)
|
sub_ready = event.Event()
|
||||||
|
sub_last = event.Event()
|
||||||
sub_done = event.Event()
|
sub_done = event.Event()
|
||||||
|
|
||||||
def rx(sock, done_evt):
|
def rx():
|
||||||
|
while sub.recv() != b'test BEGIN':
|
||||||
|
sleep(0)
|
||||||
|
sub_ready.send()
|
||||||
count = 0
|
count = 0
|
||||||
sub = b'test'
|
|
||||||
while True:
|
while True:
|
||||||
msg = sock.recv()
|
msg = sub.recv()
|
||||||
sleep()
|
if msg == b'test BEGIN':
|
||||||
if b'DONE' in msg:
|
# BEGIN may come many times
|
||||||
|
continue
|
||||||
|
if msg == b'test LAST':
|
||||||
|
sub.setsockopt(zmq.SUBSCRIBE, b'done')
|
||||||
|
sub.setsockopt(zmq.UNSUBSCRIBE, b'test')
|
||||||
|
sleep(0)
|
||||||
|
# In real application you should either sync
|
||||||
|
# or tolerate loss of messages.
|
||||||
|
sub_last.send()
|
||||||
|
if msg == b'done DONE':
|
||||||
break
|
break
|
||||||
if b'LAST' in msg and sub == b'test':
|
|
||||||
sock.setsockopt(zmq.UNSUBSCRIBE, b'test')
|
|
||||||
sock.setsockopt(zmq.SUBSCRIBE, b'done')
|
|
||||||
sub = b'done'
|
|
||||||
count += 1
|
count += 1
|
||||||
done_evt.send(count)
|
sub_done.send(count)
|
||||||
|
|
||||||
def tx(sock):
|
def tx():
|
||||||
|
# Sync receiver ready to avoid loss of first packets
|
||||||
|
while not sub_ready.ready():
|
||||||
|
pub.send(b'test BEGIN')
|
||||||
|
sleep(0.005)
|
||||||
for i in range(1, 101):
|
for i in range(1, 101):
|
||||||
msg = ("test %s" % i).encode()
|
msg = 'test {0}'.format(i).encode()
|
||||||
if i != 50:
|
if i != 50:
|
||||||
sock.send(msg)
|
pub.send(msg)
|
||||||
else:
|
else:
|
||||||
sock.send(b'test LAST')
|
pub.send(b'test LAST')
|
||||||
sleep()
|
sub_last.wait()
|
||||||
sock.send(b'done DONE')
|
# XXX: putting a real delay of 1ms here fixes sporadic failures on Travis
|
||||||
|
# just yield sleep(0) doesn't cut it
|
||||||
spawn(rx, sub, sub_done)
|
sleep(0.001)
|
||||||
spawn(tx, pub)
|
pub.send(b'done DONE')
|
||||||
|
|
||||||
|
eventlet.spawn(rx)
|
||||||
|
eventlet.spawn(tx)
|
||||||
rx_count = sub_done.wait()
|
rx_count = sub_done.wait()
|
||||||
self.assertEqual(rx_count, 50)
|
self.assertEqual(rx_count, 50)
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_recv_multipart_bug68(self):
|
def test_recv_multipart_bug68(self):
|
||||||
req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
|
req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
|
||||||
msg = [b'']
|
msg = [b'']
|
||||||
@@ -267,13 +281,13 @@ class TestUpstreamDownStream(LimitedTestCase):
|
|||||||
# but it's private __str__ appears to be the way to go
|
# but it's private __str__ appears to be the way to go
|
||||||
self.assertEqual([m.bytes for m in recieved_msg], msg2)
|
self.assertEqual([m.bytes for m in recieved_msg], msg2)
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_recv_noblock_bug76(self):
|
def test_recv_noblock_bug76(self):
|
||||||
req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
|
req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
|
||||||
self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
|
self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
|
||||||
self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK, True)
|
self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK, True)
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_send_during_recv(self):
|
def test_send_during_recv(self):
|
||||||
sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
|
sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
|
||||||
sleep()
|
sleep()
|
||||||
@@ -308,7 +322,7 @@ class TestUpstreamDownStream(LimitedTestCase):
|
|||||||
for evt in done_evts:
|
for evt in done_evts:
|
||||||
self.assertEqual(evt.wait(), 0)
|
self.assertEqual(evt.wait(), 0)
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_send_during_recv_multipart(self):
|
def test_send_during_recv_multipart(self):
|
||||||
sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
|
sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
|
||||||
sleep()
|
sleep()
|
||||||
@@ -347,7 +361,7 @@ class TestUpstreamDownStream(LimitedTestCase):
|
|||||||
self.assertEqual(final_i, 0)
|
self.assertEqual(final_i, 0)
|
||||||
|
|
||||||
# Need someway to ensure a thread is blocked on send... This isn't working
|
# Need someway to ensure a thread is blocked on send... This isn't working
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_recv_during_send(self):
|
def test_recv_during_send(self):
|
||||||
sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
|
sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
|
||||||
sleep()
|
sleep()
|
||||||
@@ -376,7 +390,7 @@ class TestUpstreamDownStream(LimitedTestCase):
|
|||||||
final_i = done.wait()
|
final_i = done.wait()
|
||||||
self.assertEqual(final_i, 0)
|
self.assertEqual(final_i, 0)
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_close_during_recv(self):
|
def test_close_during_recv(self):
|
||||||
sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
|
sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
|
||||||
sleep()
|
sleep()
|
||||||
@@ -396,7 +410,7 @@ class TestUpstreamDownStream(LimitedTestCase):
|
|||||||
done1.wait()
|
done1.wait()
|
||||||
done2.wait()
|
done2.wait()
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_getsockopt_events(self):
|
def test_getsockopt_events(self):
|
||||||
sock1, sock2, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
|
sock1, sock2, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
|
||||||
sleep()
|
sleep()
|
||||||
@@ -415,7 +429,7 @@ class TestUpstreamDownStream(LimitedTestCase):
|
|||||||
events = sock2.getsockopt(zmq.EVENTS)
|
events = sock2.getsockopt(zmq.EVENTS)
|
||||||
self.assertEqual(events & zmq.POLLIN, zmq.POLLIN)
|
self.assertEqual(events & zmq.POLLIN, zmq.POLLIN)
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_cpu_usage_after_bind(self):
|
def test_cpu_usage_after_bind(self):
|
||||||
"""zmq eats CPU after PUB socket .bind()
|
"""zmq eats CPU after PUB socket .bind()
|
||||||
|
|
||||||
@@ -432,9 +446,9 @@ class TestUpstreamDownStream(LimitedTestCase):
|
|||||||
self.sockets.append(sock)
|
self.sockets.append(sock)
|
||||||
sock.bind_to_random_port("tcp://127.0.0.1")
|
sock.bind_to_random_port("tcp://127.0.0.1")
|
||||||
sleep()
|
sleep()
|
||||||
check_idle_cpu_usage(0.2, 0.1)
|
tests.check_idle_cpu_usage(0.2, 0.1)
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_cpu_usage_after_pub_send_or_dealer_recv(self):
|
def test_cpu_usage_after_pub_send_or_dealer_recv(self):
|
||||||
"""zmq eats CPU after PUB send or DEALER recv.
|
"""zmq eats CPU after PUB send or DEALER recv.
|
||||||
|
|
||||||
@@ -444,18 +458,18 @@ class TestUpstreamDownStream(LimitedTestCase):
|
|||||||
sub.setsockopt(zmq.SUBSCRIBE, b"")
|
sub.setsockopt(zmq.SUBSCRIBE, b"")
|
||||||
sleep()
|
sleep()
|
||||||
pub.send(b'test_send')
|
pub.send(b'test_send')
|
||||||
check_idle_cpu_usage(0.2, 0.1)
|
tests.check_idle_cpu_usage(0.2, 0.1)
|
||||||
|
|
||||||
sender, receiver, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
|
sender, receiver, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
|
||||||
sleep()
|
sleep()
|
||||||
sender.send(b'test_recv')
|
sender.send(b'test_recv')
|
||||||
msg = receiver.recv()
|
msg = receiver.recv()
|
||||||
self.assertEqual(msg, b'test_recv')
|
self.assertEqual(msg, b'test_recv')
|
||||||
check_idle_cpu_usage(0.2, 0.1)
|
tests.check_idle_cpu_usage(0.2, 0.1)
|
||||||
|
|
||||||
|
|
||||||
class TestQueueLock(LimitedTestCase):
|
class TestQueueLock(tests.LimitedTestCase):
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_queue_lock_order(self):
|
def test_queue_lock_order(self):
|
||||||
q = zmq._QueueLock()
|
q = zmq._QueueLock()
|
||||||
s = semaphore.Semaphore(0)
|
s = semaphore.Semaphore(0)
|
||||||
@@ -482,7 +496,7 @@ class TestQueueLock(LimitedTestCase):
|
|||||||
s.acquire()
|
s.acquire()
|
||||||
self.assertEqual(results, [1, 2, 3])
|
self.assertEqual(results, [1, 2, 3])
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_count(self):
|
def test_count(self):
|
||||||
q = zmq._QueueLock()
|
q = zmq._QueueLock()
|
||||||
self.assertFalse(q)
|
self.assertFalse(q)
|
||||||
@@ -495,7 +509,7 @@ class TestQueueLock(LimitedTestCase):
|
|||||||
self.assertTrue(q)
|
self.assertTrue(q)
|
||||||
self.assertFalse(q)
|
self.assertFalse(q)
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_errors(self):
|
def test_errors(self):
|
||||||
q = zmq._QueueLock()
|
q = zmq._QueueLock()
|
||||||
|
|
||||||
@@ -506,7 +520,7 @@ class TestQueueLock(LimitedTestCase):
|
|||||||
|
|
||||||
self.assertRaises(zmq.LockReleaseError, q.release)
|
self.assertRaises(zmq.LockReleaseError, q.release)
|
||||||
|
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_nested_acquire(self):
|
def test_nested_acquire(self):
|
||||||
q = zmq._QueueLock()
|
q = zmq._QueueLock()
|
||||||
self.assertFalse(q)
|
self.assertFalse(q)
|
||||||
@@ -534,8 +548,8 @@ class TestQueueLock(LimitedTestCase):
|
|||||||
self.assertEqual(results, [1])
|
self.assertEqual(results, [1])
|
||||||
|
|
||||||
|
|
||||||
class TestBlockedThread(LimitedTestCase):
|
class TestBlockedThread(tests.LimitedTestCase):
|
||||||
@skip_unless(zmq_supported)
|
@tests.skip_unless(zmq_supported)
|
||||||
def test_block(self):
|
def test_block(self):
|
||||||
e = zmq._BlockedThread()
|
e = zmq._BlockedThread()
|
||||||
done = event.Event()
|
done = event.Event()
|
||||||
|
Reference in New Issue
Block a user