First stab at zeromq support. This consists of:

A new hub: This closely mirrors the poll hub with some of the internal logic changed to reflect zmq's flags.
A green module for zmq: This subclasses Context and Socket to ensure calls are non blocking.
A (very sparse) beginings of a test module.
An example: A melding of the pyzmq chat example and the eventlet telnet chat example.


TODO

zmq_poll chokes if the sockets passed to it come from different contexts. As context is the entry point to everything else then it would make sense to include a check in here that each thread has only one context instance. By context being the entry point I mean:

ctx = zmq.Context()
socket = ctx.socket(zmq.<type-of-socket>)

This call to socket is repeated for each socket you want and ctx must be the same one for each thread.

Tests. I'd like to get to the point f having all zmq socket pairs tested - and perhaps a nice benchmark suite too.
This commit is contained in:
Ben Ford
2010-09-20 07:08:27 +01:00
parent 8972567f0a
commit b75e83a35c
4 changed files with 263 additions and 0 deletions

67
eventlet/green/zmq.py Normal file
View File

@@ -0,0 +1,67 @@
__zmq__ = __import__('zmq')
from eventlet.hubs import trampoline
__patched__ = ['Context', 'Socket']
globals().update(dict([(var, getattr(__zmq__, var))
for var in __zmq__.__all__
if not (var.startswith('__')
or
var in __patched__)
]))
class Context(__zmq__.Context):
def socket(self, socket_type):
return Socket(self, socket_type)
class Socket(__zmq__.Socket):
def _send_message(self, data, flags=0, copy=True):
# flags |= __zmq__.NOBLOCK
print 'send'
while True:
try:
return super(Socket, self)._send_message(data, flags)
except __zmq__.ZMQError, e:
if e.errno != EAGAIN:
raise
trampoline(self, read=True)
def _send_copy(self, data, flags=0, copy=True):
# flags |= __zmq__.NOBLOCK
while True:
try:
return super(Socket, self)._send_copy(data, flags)
except __zmq__.ZMQError, e:
if e.errno != EAGAIN:
raise
trampoline(self, write=True)
def _recv_message(self, flags=0):
flags |= __zmq__.NOBLOCK
while True:
try:
m = super(Socket, self)._recv_message(flags)
if m:
return m
except __zmq__.ZMQError, e:
if e.errno != EAGAIN:
raise
trampoline(self, read=True)
def _recv_copy(self, flags=0):
flags |= __zmq__.NOBLOCK
while True:
try:
m = super(Socket, self)._recv_copy(flags)
if m:
return m
except __zmq__.ZMQError, e:
if e.errno != EAGAIN:
raise
trampoline(self, read=True)

83
eventlet/hubs/zeromq.py Normal file
View File

@@ -0,0 +1,83 @@
from eventlet import patcher
from eventlet.green import zmq
from eventlet.hubs import poll
from eventlet.hubs.hub import BaseHub, noop
from eventlet.hubs.poll import READ, WRITE
from eventlet.support import clear_sys_exc_info
import sys
time = patcher.original('time')
select = patcher.original('select')
sleep = time.sleep
EXC_MASK = zmq.POLLERR
READ_MASK = zmq.POLLIN
WRITE_MASK = zmq.POLLOUT
class Hub(poll.Hub):
def __init__(self, clock=time.time):
BaseHub.__init__(self, clock)
self.poll = zmq.Poller()
def register(self, fileno, new=False):
mask = 0
if self.listeners[READ].get(fileno):
mask |= READ_MASK
if self.listeners[WRITE].get(fileno):
mask |= WRITE_MASK
if mask:
self.poll.register(fileno, mask)
else:
self.poll.unregister(fileno)
def wait(self, seconds=None):
readers = self.listeners[READ]
writers = self.listeners[WRITE]
if not readers and not writers:
if seconds:
sleep(seconds)
return
try:
presult = self.do_poll(seconds)
except zmq.ZMQError, e:
# In the poll hub this part exists to special case some exceptions
# from socket. There may be some error numbers that wider use of
# this hub will throw up as needing special treatment so leaving
# this block and this comment as a remineder
raise
SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS
if self.debug_blocking:
self.block_detect_pre()
for fileno, event in presult:
try:
if event & READ_MASK:
readers.get(fileno, noop).cb(fileno)
if event & WRITE_MASK:
writers.get(fileno, noop).cb(fileno)
if event & EXC_MASK:
# zmq.POLLERR is returned for any error condition in the
# underlying fd (as passed through to poll/epoll)
readers.get(fileno, noop).cb(fileno)
writers.get(fileno, noop).cb(fileno)
except SYSTEM_EXCEPTIONS:
raise
except:
self.squelch_exception(fileno, sys.exc_info())
clear_sys_exc_info()
if self.debug_blocking:
self.block_detect_post()
# def do_poll(self, seconds):
# print 'poll: ', seconds
# if seconds < 0:
# seconds = 500
# return self.poll.poll(seconds)

64
examples/zmq_chat.py Normal file
View File

@@ -0,0 +1,64 @@
import eventlet, sys
from eventlet.green import socket, zmq
from eventlet.hubs import use_hub
use_hub('zeromq')
ADDR = 'ipc:///tmp/chat'
ctx = zmq.Context()
def publish(writer):
print "connected"
socket = ctx.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, "")
socket.connect(ADDR)
eventlet.sleep(0.1)
while True:
msg = socket.recv_pyobj()
str_msg = "%s: %s" % msg
writer.write(str_msg)
writer.flush()
PORT=3001
def read_chat_forever(reader, pub_socket):
line = reader.readline()
who = 'someone'
while line:
print "Chat:", line.strip()
if line.startswith('name:'):
who = line.split(':')[-1].strip()
try:
pub_socket.send_pyobj((who, line))
except socket.error, e:
# ignore broken pipes, they just mean the participant
# closed its connection already
if e[0] != 32:
raise
line = reader.readline()
print "Participant left chat."
try:
print "ChatServer starting up on port %s" % PORT
server = eventlet.listen(('0.0.0.0', PORT))
pub_socket = ctx.socket(zmq.PUB)
pub_socket.bind(ADDR)
eventlet.spawn_n(publish,
sys.stdout)
while True:
new_connection, address = server.accept()
print "Participant joined chat."
eventlet.spawn_n(publish,
new_connection.makefile('w'))
eventlet.spawn_n(read_chat_forever,
new_connection.makefile('r'),
pub_socket)
except (KeyboardInterrupt, SystemExit):
print "ChatServer exiting."

49
tests/zeromq_test.py Normal file
View File

@@ -0,0 +1,49 @@
from eventlet import spawn, sleep, getcurrent
from eventlet.hubs import use_hub, get_hub
from eventlet.green import zmq
from nose.tools import *
from tests import mock, LimitedTestCase
from eventlet.hubs.hub import READ, WRITE
class _TestZMQ(LimitedTestCase):
def setUp(self):
use_hub('zeromq')
super(_TestZMQ, self).setUp()
# self.timer.cancel()
def tearDown(self):
super(_TestZMQ, self).tearDown()
use_hub()
class TestUpstreamDownStream(_TestZMQ):
def _get_socket_pair(self):
return (zmq.Context().socket(zmq.PAIR),
zmq.Context().socket(zmq.PAIR))
def test_recv_non_blocking(self):
ipc = 'ipc:///tmp/tests'
req, rep = self._get_socket_pair()
req.connect(ipc)
rep.bind(ipc)
sleep(0.2)
# req.send('test')
# set_trace()
hub = get_hub()
# hub.add(READ, rep, getcurrent().switch)
msg = {}
def rx():
msg['res'] = rep.recv()
spawn(rx)
req.send('test')
sleep(0.2)
self.assertEqual(msg['res'], 'test')