General tidy up and addition of long overdue docstrings
This commit is contained in:
@@ -12,9 +12,19 @@ globals().update(dict([(var, getattr(__zmq__, var))
|
||||
|
||||
|
||||
def get_hub_name_from_instance(hub):
|
||||
"""Get the string name the eventlet uses to refer to hub
|
||||
|
||||
:param hub: An eventlet hub
|
||||
"""
|
||||
return hub.__class__.__module__.rsplit('.',1)[-1]
|
||||
|
||||
def Context(io_threads=1):
|
||||
"""Factory function replacement for :class:`zmq.core.context.Context`
|
||||
|
||||
This factory ensures the :class:`zeromq hub <eventlet.hubs.zeromq.Hub>`
|
||||
is the active hub, and defers creation (or retreival) of the ``Context``
|
||||
to the hub's :meth:`~eventlet.hubs.zeromq.Hub.get_context` method
|
||||
"""
|
||||
hub = get_hub()
|
||||
hub_name = get_hub_name_from_instance(hub)
|
||||
if hub_name != 'zeromq':
|
||||
@@ -22,11 +32,35 @@ def Context(io_threads=1):
|
||||
return hub.get_context(io_threads)
|
||||
|
||||
class _Context(__zmq__.Context):
|
||||
"""Internal subclass of :class:`zmq.core.context.Context`
|
||||
|
||||
.. warning:: Do not grab one of these yourself, use the factory function
|
||||
:func:`eventlet.green.zmq.Context`
|
||||
"""
|
||||
|
||||
def socket(self, socket_type):
|
||||
"""Overridden method to ensure that the green version of socket is used
|
||||
|
||||
Behaves the same as :meth:`zmq.core.context.Context.socket`, but ensures
|
||||
that a :class:`Socket` with all of its send and recv methods set to be
|
||||
non-blocking is returned
|
||||
"""
|
||||
return Socket(self, socket_type)
|
||||
|
||||
class Socket(__zmq__.Socket):
|
||||
"""Green version of :class:`zmq.core.socket.Socket
|
||||
|
||||
The following four methods are overridden:
|
||||
|
||||
* _send_message
|
||||
* _send_copy
|
||||
* _recv_message
|
||||
* _recv_copy
|
||||
|
||||
To ensure that the ``zmq.NOBLOCK`` flag is set and that sending or recieving
|
||||
is deferred to the hub (using :func:`eventlet.hubs.trampoline`) if a
|
||||
``zmq.EAGAIN`` (retry) error is raised
|
||||
"""
|
||||
|
||||
|
||||
def _send_message(self, msg, flags=0):
|
||||
|
||||
@@ -58,9 +58,11 @@ got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
|
||||
sleep()
|
||||
msg = dict(res=None)
|
||||
done = event.Event()
|
||||
|
||||
def rx():
|
||||
msg['res'] = rep.recv()
|
||||
done.send('done')
|
||||
|
||||
spawn(rx)
|
||||
req.send('test')
|
||||
done.wait()
|
||||
@@ -69,6 +71,7 @@ got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
|
||||
@skip_unless_zmq
|
||||
def test_close_socket_raises_enotsup(self):
|
||||
req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
|
||||
|
||||
rep.close()
|
||||
req.close()
|
||||
self.assertRaisesErrno(zmq.ENOTSUP, rep.recv)
|
||||
@@ -79,12 +82,14 @@ got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
|
||||
req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
|
||||
sleep()
|
||||
done = event.Event()
|
||||
|
||||
def tx():
|
||||
tx_i = 0
|
||||
req.send(str(tx_i))
|
||||
while req.recv() != 'done':
|
||||
tx_i += 1
|
||||
req.send(str(tx_i))
|
||||
|
||||
def rx():
|
||||
while True:
|
||||
rx_i = rep.recv()
|
||||
@@ -103,12 +108,15 @@ got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
|
||||
def test_send_1k_push_pull(self):
|
||||
down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL)
|
||||
sleep()
|
||||
|
||||
done = event.Event()
|
||||
|
||||
def tx():
|
||||
tx_i = 0
|
||||
while tx_i <= 1000:
|
||||
tx_i += 1
|
||||
down.send(str(tx_i))
|
||||
|
||||
def rx():
|
||||
while True:
|
||||
rx_i = up.recv()
|
||||
@@ -132,10 +140,13 @@ got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
|
||||
sub_all.setsockopt(zmq.SUBSCRIBE, '')
|
||||
sub1.setsockopt(zmq.SUBSCRIBE, 'sub1')
|
||||
sub2.setsockopt(zmq.SUBSCRIBE, 'sub2')
|
||||
|
||||
sub_all_done = event.Event()
|
||||
sub1_done = event.Event()
|
||||
sub2_done = event.Event()
|
||||
|
||||
sleep(0.2)
|
||||
|
||||
def rx(sock, done_evt, msg_count=10000):
|
||||
count = 0
|
||||
while count < msg_count:
|
||||
@@ -173,6 +184,7 @@ got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
|
||||
|
||||
sub_done = event.Event()
|
||||
sleep(0.2)
|
||||
|
||||
def rx(sock, done_evt):
|
||||
count = 0
|
||||
sub = 'test'
|
||||
@@ -185,7 +197,6 @@ got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
|
||||
sock.setsockopt(zmq.UNSUBSCRIBE, 'test')
|
||||
sock.setsockopt(zmq.SUBSCRIBE, 'done')
|
||||
sub = 'done'
|
||||
#continue # We don't want to count this message
|
||||
count += 1
|
||||
done_evt.send(count)
|
||||
|
||||
@@ -231,7 +242,7 @@ class TestThreadedContextAccess(TestCase):
|
||||
All zmq sockets passed to the zmq_poll() function must share the same zmq
|
||||
context and must belong to the thread calling zmq_poll()
|
||||
|
||||
As zmq_poll is what's eventually being called then we need to insure that
|
||||
As zmq_poll is what's eventually being called then we need to ensure that
|
||||
all sockets that are going to be passed to zmq_poll (via hub.do_poll) are
|
||||
in the same context
|
||||
"""
|
||||
@@ -257,9 +268,6 @@ class TestThreadedContextAccess(TestCase):
|
||||
context = zmq.Context()
|
||||
test_result = []
|
||||
def assert_different(ctx):
|
||||
# assert not hasattr(_threadlocal, 'hub')
|
||||
# import os
|
||||
# os.environ['EVENTLET_HUB'] = 'zeromq'
|
||||
hub = get_hub()
|
||||
try:
|
||||
this_thread_context = zmq.Context()
|
||||
@@ -272,7 +280,9 @@ class TestThreadedContextAccess(TestCase):
|
||||
sleep(0.1)
|
||||
self.assertFalse(test_result[0])
|
||||
|
||||
|
||||
class TestCheckingForZMQHub(TestCase):
|
||||
|
||||
@skip_unless_zmq
|
||||
def setUp(self):
|
||||
self.orig_hub = zmq.get_hub_name_from_instance(get_hub())
|
||||
|
||||
Reference in New Issue
Block a user