first stab at supporting multiple senders and receivers. incomplete.

This commit is contained in:
Geoff Salmon
2011-09-02 15:51:04 -04:00
parent f5e5b2bda7
commit 30f3265012
2 changed files with 190 additions and 8 deletions

View File

@@ -1,13 +1,15 @@
"""The :mod:`zmq` module wraps the :class:`Socket` and :class:`Context` found in :mod:`pyzmq <zmq>` to be non blocking
"""
__zmq__ = __import__('zmq')
from eventlet import sleep
from eventlet import sleep, hubs
from eventlet.hubs import trampoline, _threadlocal
from eventlet.patcher import slurp_properties
from eventlet.support import greenlets as greenlet
__patched__ = ['Context', 'Socket']
slurp_properties(__zmq__, globals(), ignore=__patched__)
from collections import deque
def Context(io_threads=1):
"""Factory function replacement for :class:`zmq.core.context.Context`
@@ -42,21 +44,44 @@ class _Context(__zmq__.Context):
"""
return Socket(self, socket_type)
# see http://api.zeromq.org/2-1:zmq-socket for explanation of socket types
_multi_reader_types = set([__zmq__.XREP, __zmq__.XREQ, __zmq__.SUB, __zmq__.PULL, __zmq__.PAIR])
_multi_writer_types = set([__zmq__.XREP, __zmq__.XREQ, __zmq__.PUB, __zmq__.PUSH, __zmq__.PAIR])
class Socket(__zmq__.Socket):
"""Green version of :class:`zmq.core.socket.Socket
The following four methods are overridden:
* _send_message
* _send_copy
* _recv_message
* _recv_copy
The following two methods are always overridden:
* send
* recv
To ensure that the ``zmq.NOBLOCK`` flag is set and that sending or recieving
is deferred to the hub (using :func:`eventlet.hubs.trampoline`) if a
``zmq.EAGAIN`` (retry) error is raised
For some socket types, where multiple greenthreads could be
calling send or recv at the same time, these methods are also
overridden:
* send_multipart
* recv_multipart
"""
def __init__(self, *args, **kwargs):
super(Socket, self).__init__(*args, **kwargs)
if False and self.socket_type in _multi_writer_types:
# support multiple greenthreads writing at the same time
self._writers = deque()
self.send = self._xsafe_send
self.send_multipart = self._xsafe_send_multipart
if False and self.socket_type in _multi_reader_types:
# support multiple greenthreads reading at the same time
self._readers = deque()
self.recv = self._xsafe_recv
self.recv_multipart = self._xsafe_recv_multipart
def _sock_wait(self, read=False, write=False):
"""
First checks if there are events in the socket, to avoid
@@ -118,4 +143,104 @@ class Socket(__zmq__.Socket):
if e.errno != EAGAIN:
raise
def _xsafe_send(self, msg, flags=0, copy=True, track=False):
"""
A send method that's safe to use when multiple greenthreads
are calling send, send_multipart, recv and recv_multipart on
the same socket.
"""
if flags & __zmq__.NOBLOCK:
super(Socket, self).send(msg, flags=flags, track=track, copy=copy)
return
flags |= __zmq__.NOBLOCK
if self._writers:
self._writers.append((msg, flags, copy, track, greenlet.getcurrent()))
if hubs.get_hub().switch():
# msg was sent by another greenthread
return
else:
pass
else:
self._writers.append((msg, flags, copy, track, greenlet.getcurrent()))
while True:
try:
if (self.getsockopt(__zmq__.EVENTS) & __zmq__.POLLOUT):
super(Socket, self).send(msg, flags=flags, track=track,
copy=copy)
self._sock_wait(write=True)
super(Socket, self).send(msg, flags=flags, track=track,
copy=copy)
return
except __zmq__.ZMQError, e:
if e.errno != EAGAIN:
raise
def _xsafe_send_multipart(self, msg_parts, flags=0, copy=True, track=False):
"""
A send_multipart method that's safe to use when multiple
greenthreads are calling send, send_multipart, recv and
recv_multipart on the same socket.
Ensure multipart messages are not interleaved.
"""
self._writers.append((list(reversed(msg)), flags, copy, track, greenlet.getcurrent()))
if len(self._writers) == 1:
# no blocked writers
pass
def _send_queued(self, ):
"""
Send as many msgs from the writers deque as possible. Wake up
the greenthreads for messages that are sent.
"""
writers = self.writers
hub = hubs.get_hub()
while writers:
msg, flags, copy, track, writer = writers[0]
if isinstance(msg, list):
is_list = True
m = msg[-1]
else:
is_list = False
m = msg
try:
super(Socket, self).send(m, flags=flags, track=track,
copy=copy)
hub.schedule_call_global(0, writer.switch, True)
except (SystemExit, KeyboardInterrupt):
raise
except __zmq__.ZMQError, e:
if e.errno == EAGAIN:
pass
else:
hub.schedule_call_global(0, writer.throw, e)
def _xsafe_recv(self, flags=0, copy=True, track=False):
"""
A recv method that's safe to use when multiple greenthreads
are calling send, send_multipart, recv and recv_multipart on
the same socket.
"""
pass
def _xsafe_recv_multipart(self, flags=0, copy=True, track=False):
"""
A recv method that's safe to use when multiple greenthreads
are calling send, send_multipart, recv and recv_multipart on
the same socket.
"""
pass

View File

@@ -238,6 +238,63 @@ got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK, True)
@skip_unless(zmq_supported)
def test_send_during_recv(self):
sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
sleep()
done = event.Event()
def slow_rx():
self.assertEqual(sender.recv(), "done")
done.send(0)
def tx():
tx_i = 0
while tx_i <= 1000:
sender.send(str(tx_i))
tx_i += 1
def rx():
while True:
rx_i = receiver.recv()
if rx_i == "1000":
receiver.send('done')
return
spawn(slow_rx)
spawn(tx)
spawn(rx)
final_i = done.wait()
self.assertEqual(final_i, 0)
# Need someway to ensure a thread is blocked on send. This method
# below uses too much memory. Try adjust watermarks or other
# socket opts?
# @skip_unless(zmq_supported)
# def test_recv_during_send(self):
# sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
# sleep()
# done = event.Event()
# def tx():
# msg = "0" * 1024
# while True:
# sender.send(msg)
# def rx():
# self.assertEqual(sender.recv(), "done")
# sender_thread.kill()
# done.send(0)
# def single_tx():
# receiver.send("done")
# sender_thread = spawn(tx)
# sleep()
# spawn(rx)
# spawn(single_tx)
# final_i = done.wait()
# self.assertEqual(final_i, 0)
class TestThreadedContextAccess(TestCase):