This commit is contained in:
Ryan Williams
2010-10-18 00:16:14 -07:00
10 changed files with 684 additions and 11 deletions

View File

@@ -7,6 +7,7 @@ dist
build build
*.esproj *.esproj
.DS_Store .DS_Store
.idea
doc/_build doc/_build
annotated annotated
cover cover
@@ -17,6 +18,7 @@ lib*
bin bin
include include
.noseids .noseids
pip-log.txt
syntax: re syntax: re
^.ropeproject/.*$ ^.ropeproject/.*$

81
eventlet/green/zmq.py Normal file
View File

@@ -0,0 +1,81 @@
__zmq__ = __import__('zmq')
from eventlet import sleep
from eventlet.hubs import trampoline, get_hub
__patched__ = ['Context', 'Socket']
globals().update(dict([(var, getattr(__zmq__, var))
for var in __zmq__.__all__
if not (var.startswith('__')
or
var in __patched__)
]))
def get_hub_name_from_instance(hub):
return hub.__class__.__module__.rsplit('.',1)[-1]
def Context(io_threads=1):
hub = get_hub()
hub_name = get_hub_name_from_instance(hub)
if hub_name != 'zeromq':
raise RuntimeError("Hub must be 'zeromq', got '%s'" % hub_name)
return hub.get_context(io_threads)
class _Context(__zmq__.Context):
def socket(self, socket_type):
return Socket(self, socket_type)
class Socket(__zmq__.Socket):
def _send_message(self, data, flags=0, copy=True):
flags |= __zmq__.NOBLOCK
while True:
try:
super(Socket, self)._send_message(data, flags)
return
except __zmq__.ZMQError, e:
if e.errno != EAGAIN:
raise
trampoline(self, write=True)
def _send_copy(self, data, flags=0, copy=True):
flags |= __zmq__.NOBLOCK
while True:
try:
super(Socket, self)._send_copy(data, flags)
return
except __zmq__.ZMQError, e:
if e.errno != EAGAIN:
raise
trampoline(self, write=True)
def _recv_message(self, flags=0):
flags |= __zmq__.NOBLOCK
while True:
try:
m = super(Socket, self)._recv_message(flags)
if m:
return m
except __zmq__.ZMQError, e:
if e.errno != EAGAIN:
raise
trampoline(self, read=True)
def _recv_copy(self, flags=0):
flags |= __zmq__.NOBLOCK
while True:
try:
m = super(Socket, self)._recv_copy(flags)
if m:
return m
except __zmq__.ZMQError, e:
if e.errno != EAGAIN:
raise
trampoline(self, read=True)

99
eventlet/hubs/zeromq.py Normal file
View File

@@ -0,0 +1,99 @@
from eventlet import patcher
from eventlet.green import zmq
from eventlet.hubs import poll, _threadlocal
from eventlet.hubs.hub import BaseHub, noop
from eventlet.hubs.poll import READ, WRITE
from eventlet.support import clear_sys_exc_info
import sys
time = patcher.original('time')
select = patcher.original('select')
sleep = time.sleep
EXC_MASK = zmq.POLLERR
READ_MASK = zmq.POLLIN
WRITE_MASK = zmq.POLLOUT
class Hub(poll.Hub):
def __init__(self, clock=time.time):
BaseHub.__init__(self, clock)
self.poll = zmq.Poller()
def get_context(self, io_threads=1):
"""zmq's Context must be unique within a hub
The zeromq API documentation states:
All zmq sockets passed to the zmq_poll() function must share the same
zmq context and must belong to the thread calling zmq_poll()
As zmq_poll is what's eventually being called then we need to insure
that all sockets that are going to be passed to zmq_poll (via
hub.do_poll) are in the same context
"""
try:
return _threadlocal.context
except AttributeError:
_threadlocal.context = zmq._Context(io_threads)
return _threadlocal.context
def register(self, fileno, new=False):
mask = 0
if self.listeners[READ].get(fileno):
mask |= READ_MASK
if self.listeners[WRITE].get(fileno):
mask |= WRITE_MASK
if mask:
self.poll.register(fileno, mask)
else:
self.poll.unregister(fileno)
def wait(self, seconds=None):
readers = self.listeners[READ]
writers = self.listeners[WRITE]
if not readers and not writers:
if seconds:
sleep(seconds)
return
try:
presult = self.do_poll(seconds)
except zmq.ZMQError, e:
# In the poll hub this part exists to special case some exceptions
# from socket. There may be some error numbers that wider use of
# this hub will throw up as needing special treatment so leaving
# this block and this comment as a remineder
raise
SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS
if self.debug_blocking:
self.block_detect_pre()
for fileno, event in presult:
try:
if event & READ_MASK:
readers.get(fileno, noop).cb(fileno)
if event & WRITE_MASK:
writers.get(fileno, noop).cb(fileno)
if event & EXC_MASK:
# zmq.POLLERR is returned for any error condition in the
# underlying fd (as passed through to poll/epoll)
readers.get(fileno, noop).cb(fileno)
writers.get(fileno, noop).cb(fileno)
except SYSTEM_EXCEPTIONS:
raise
except:
self.squelch_exception(fileno, sys.exc_info())
clear_sys_exc_info()
if self.debug_blocking:
self.block_detect_post()
# def do_poll(self, seconds):
# print 'poll: ', seconds
# if seconds < 0:
# seconds = 500
# return self.poll.poll(seconds)

20
examples/chat_bridge.py Normal file
View File

@@ -0,0 +1,20 @@
import sys
from zmq import FORWARDER, PUB, SUB, SUBSCRIBE
from zmq.devices import Device
if __name__ == "__main__":
usage = 'usage: chat_bridge sub_address pub_address'
if len (sys.argv) != 3:
print usage
sys.exit(1)
sub_addr = sys.argv[1]
pub_addr = sys.argv[2]
print "Recieving on %s" % sub_addr
print "Sending on %s" % pub_addr
device = Device(FORWARDER, SUB, PUB)
device.bind_in(sub_addr)
device.setsockopt_in(SUBSCRIBE, "")
device.bind_out(pub_addr)
device.start()

View File

@@ -0,0 +1,127 @@
"""This is a websocket chat example with many servers. A client can connect to
any of the servers and their messages will be received by all clients connected
to any of the servers.
Run the examples like this:
$ python examples/chat_bridge.py tcp://127.0.0.1:12345 tcp://127.0.0.1:12346
and the servers like this (changing the port for each one obviously):
$ python examples/distributed_websocket_chat.py -p tcp://127.0.0.1:12345 -s tcp://127.0.0.1:12346 7000
So all messages are published to port 12345 and the device forwards all the
messages to 12346 where they are subscribed to
"""
import os, sys
import eventlet
from collections import defaultdict
from eventlet import spawn_n, sleep
from eventlet import wsgi
from eventlet import websocket
from eventlet.green import zmq
from eventlet.hubs import get_hub, use_hub
from uuid import uuid1
use_hub('zeromq')
ctx = zmq.Context()
class IDName(object):
def __init__(self):
self.id = uuid1()
self.name = None
def __str__(self):
if self.name:
return self.name
else:
return str(self.id)
def pack_message(self, msg):
return self, msg
def unpack_message(self, msg):
sender, message = msg
sender_name = 'you said' if sender.id == self.id \
else '%s says' % sender
return "%s: %s" % (sender_name, message)
participants = defaultdict(IDName)
def subscribe_and_distribute(sub_socket):
global participants
while True:
msg = sub_socket.recv_pyobj()
for ws, name_id in participants.items():
to_send = name_id.unpack_message(msg)
if to_send:
try:
ws.send(to_send)
except:
del participants[ws]
@websocket.WebSocketWSGI
def handle(ws):
global pub_socket
name_id = participants[ws]
ws.send("Connected as %s, change name with 'name: new_name'" % name_id)
try:
while True:
m = ws.wait()
if m is None:
break
if m.startswith('name:'):
old_name = str(name_id)
new_name = m.split(':', 1)[1].strip()
name_id.name = new_name
m = 'Changed name from %s' % old_name
pub_socket.send_pyobj(name_id.pack_message(m))
sleep()
finally:
del participants[ws]
def dispatch(environ, start_response):
"""Resolves to the web page or the websocket depending on the path."""
global port
if environ['PATH_INFO'] == '/chat':
return handle(environ, start_response)
else:
start_response('200 OK', [('content-type', 'text/html')])
return [open(os.path.join(
os.path.dirname(__file__),
'websocket_chat.html')).read() % dict(port=port)]
port = None
if __name__ == "__main__":
usage = 'usage: websocket_chat -p pub address -s sub address port number'
if len (sys.argv) != 6:
print usage
sys.exit(1)
pub_addr = sys.argv[2]
sub_addr = sys.argv[4]
try:
port = int(sys.argv[5])
except ValueError:
print "Error port supplied couldn't be converted to int\n", usage
sys.exit(1)
try:
pub_socket = ctx.socket(zmq.PUB)
pub_socket.connect(pub_addr)
print "Publishing to %s" % pub_addr
sub_socket = ctx.socket(zmq.SUB)
sub_socket.connect(sub_addr)
sub_socket.setsockopt(zmq.SUBSCRIBE, "")
print "Subscribing to %s" % sub_addr
except:
print "Couldn't create sockets\n", usage
sys.exit(1)
spawn_n(subscribe_and_distribute, sub_socket)
listener = eventlet.listen(('127.0.0.1', port))
print "\nVisit http://localhost:%s/ in your websocket-capable browser.\n" % port
wsgi.server(listener, dispatch)

View File

@@ -5,7 +5,7 @@
<script> <script>
window.onload = function() { window.onload = function() {
var data = {}; var data = {};
var s = new WebSocket("ws://127.0.0.1:7000/chat"); var s = new WebSocket("ws://127.0.0.1:%(port)s/chat");
s.onopen = function() { s.onopen = function() {
s.send('New participant joined'); s.send('New participant joined');
}; };

View File

@@ -4,6 +4,8 @@ import eventlet
from eventlet import wsgi from eventlet import wsgi
from eventlet import websocket from eventlet import websocket
PORT = 7000
participants = set() participants = set()
@websocket.WebSocketWSGI @websocket.WebSocketWSGI
@@ -27,10 +29,10 @@ def dispatch(environ, start_response):
start_response('200 OK', [('content-type', 'text/html')]) start_response('200 OK', [('content-type', 'text/html')])
return [open(os.path.join( return [open(os.path.join(
os.path.dirname(__file__), os.path.dirname(__file__),
'websocket_chat.html')).read()] 'websocket_chat.html')).read() % PORT]
if __name__ == "__main__": if __name__ == "__main__":
# run an example app from the command line # run an example app from the command line
listener = eventlet.listen(('127.0.0.1', 7000)) listener = eventlet.listen(('127.0.0.1', PORT))
print "\nVisit http://localhost:7000/ in your websocket-capable browser.\n" print "\nVisit http://localhost:7000/ in your websocket-capable browser.\n"
wsgi.server(listener, dispatch) wsgi.server(listener, dispatch)

64
examples/zmq_chat.py Normal file
View File

@@ -0,0 +1,64 @@
import eventlet, sys
from eventlet.green import socket, zmq
from eventlet.hubs import use_hub
use_hub('zeromq')
ADDR = 'ipc:///tmp/chat'
ctx = zmq.Context()
def publish(writer):
print "connected"
socket = ctx.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, "")
socket.connect(ADDR)
eventlet.sleep(0.1)
while True:
msg = socket.recv_pyobj()
str_msg = "%s: %s" % msg
writer.write(str_msg)
writer.flush()
PORT=3001
def read_chat_forever(reader, pub_socket):
line = reader.readline()
who = 'someone'
while line:
print "Chat:", line.strip()
if line.startswith('name:'):
who = line.split(':')[-1].strip()
try:
pub_socket.send_pyobj((who, line))
except socket.error, e:
# ignore broken pipes, they just mean the participant
# closed its connection already
if e[0] != 32:
raise
line = reader.readline()
print "Participant left chat."
try:
print "ChatServer starting up on port %s" % PORT
server = eventlet.listen(('0.0.0.0', PORT))
pub_socket = ctx.socket(zmq.PUB)
pub_socket.bind(ADDR)
eventlet.spawn_n(publish,
sys.stdout)
while True:
new_connection, address = server.accept()
print "Participant joined chat."
eventlet.spawn_n(publish,
new_connection.makefile('w'))
eventlet.spawn_n(read_chat_forever,
new_connection.makefile('r'),
pub_socket)
except (KeyboardInterrupt, SystemExit):
print "ChatServer exiting."

View File

@@ -70,15 +70,25 @@ except AssertionError:
class Hub(ProcessBase): class Hub(ProcessBase):
def setUp(self):
super(Hub, self).setUp()
self.old_environ = os.environ.get('EVENTLET_HUB')
os.environ['EVENTLET_HUB'] = 'selects'
def tearDown(self):
if self.old_environ:
os.environ['EVENTLET_HUB'] = self.old_environ
else:
del os.environ['EVENTLET_HUB']
super(Hub, self).tearDown()
def test_eventlet_hub(self): def test_eventlet_hub(self):
new_mod = """from eventlet import hubs new_mod = """from eventlet import hubs
print hubs.get_hub() print hubs.get_hub()
""" """
os.environ['EVENTLET_HUB'] = 'selects' self.write_to_tempfile("newmod", new_mod)
try: output, lines = self.launch_subprocess('newmod.py')
self.write_to_tempfile("newmod", new_mod) self.assertEqual(len(lines), 2, "\n".join(lines))
output, lines = self.launch_subprocess('newmod.py') self.assert_("selects" in lines[0])
self.assertEqual(len(lines), 2, "\n".join(lines))
self.assert_("selects" in lines[0])
finally:
del os.environ['EVENTLET_HUB']

268
tests/zmq_test.py Normal file
View File

@@ -0,0 +1,268 @@
from eventlet import event, spawn, sleep, patcher
from eventlet.hubs import get_hub, _threadlocal, use_hub
from eventlet.green import zmq
from nose.tools import *
from tests import mock, LimitedTestCase, skip_unless
from unittest import TestCase
from threading import Thread
from eventlet.hubs.zeromq import Hub
def using_zmq(_f):
return 'zeromq' in type(get_hub()).__module__
def skip_unless_zmq(func):
""" Decorator that skips a test if we're using the pyevent hub."""
return skip_unless(using_zmq)(func)
class TestUpstreamDownStream(LimitedTestCase):
sockets = []
def tearDown(self):
self.clear_up_sockets()
super(TestUpstreamDownStream, self).tearDown()
def create_bound_pair(self, type1, type2, interface='tcp://127.0.0.1'):
"""Create a bound socket pair using a random port."""
self.context = context = zmq.Context()
s1 = context.socket(type1)
port = s1.bind_to_random_port(interface)
s2 = context.socket(type2)
s2.connect('%s:%s' % (interface, port))
self.sockets = [s1, s2]
return s1, s2, port
def clear_up_sockets(self):
for sock in self.sockets:
sock.close()
def assertRaisesErrno(self, errno, func, *args):
try:
func(*args)
except zmq.ZMQError, e:
self.assertEqual(e.errno, errno, "wrong error raised, expected '%s' \
got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
else:
self.fail("Function did not raise any error")
@skip_unless_zmq
def test_recv_spawned_before_send_is_non_blocking(self):
req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
# req.connect(ipc)
# rep.bind(ipc)
sleep()
msg = dict(res=None)
done = event.Event()
def rx():
msg['res'] = rep.recv()
done.send('done')
spawn(rx)
req.send('test')
done.wait()
self.assertEqual(msg['res'], 'test')
@skip_unless_zmq
def test_close_socket_raises_enotsup(self):
req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
rep.close()
req.close()
self.assertRaisesErrno(zmq.ENOTSUP, rep.recv)
self.assertRaisesErrno(zmq.ENOTSUP, req.send, 'test')
@skip_unless_zmq
def test_send_1k_req_rep(self):
req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
sleep()
done = event.Event()
def tx():
tx_i = 0
req.send(str(tx_i))
while req.recv() != 'done':
tx_i += 1
req.send(str(tx_i))
def rx():
while True:
rx_i = rep.recv()
if rx_i == "1000":
rep.send('done')
sleep()
done.send(0)
break
rep.send('i')
spawn(tx)
spawn(rx)
final_i = done.wait()
self.assertEqual(final_i, 0)
@skip_unless_zmq
def test_send_1k_push_pull(self):
down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL)
sleep()
done = event.Event()
def tx():
tx_i = 0
while tx_i <= 1000:
tx_i += 1
down.send(str(tx_i))
def rx():
while True:
rx_i = up.recv()
if rx_i == "1000":
done.send(0)
break
spawn(tx)
spawn(rx)
final_i = done.wait()
self.assertEqual(final_i, 0)
@skip_unless_zmq
def test_send_1k_pub_sub(self):
pub, sub_all, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
sub1 = self.context.socket(zmq.SUB)
sub2 = self.context.socket(zmq.SUB)
self.sockets.extend([sub1, sub2])
addr = 'tcp://127.0.0.1:%s' % port
sub1.connect(addr)
sub2.connect(addr)
sub_all.setsockopt(zmq.SUBSCRIBE, '')
sub1.setsockopt(zmq.SUBSCRIBE, 'sub1')
sub2.setsockopt(zmq.SUBSCRIBE, 'sub2')
sub_all_done = event.Event()
sub1_done = event.Event()
sub2_done = event.Event()
sleep(0.2)
def rx(sock, done_evt, msg_count=10000):
count = 0
while count < msg_count:
msg = sock.recv()
sleep()
if 'LAST' in msg:
break
count += 1
done_evt.send(count)
def tx(sock):
for i in range(1, 1001):
msg = "sub%s %s" % (1 if i % 2 else 2, i)
sock.send(msg)
sleep()
sock.send('sub1 LAST')
sock.send('sub2 LAST')
spawn(rx, sub_all, sub_all_done)
spawn(rx, sub1, sub1_done)
spawn(rx, sub2, sub2_done)
spawn(tx, pub)
sub1_count = sub1_done.wait()
sub2_count = sub2_done.wait()
sub_all_count = sub_all_done.wait()
self.assertEqual(sub1_count, 500)
self.assertEqual(sub2_count, 500)
self.assertEqual(sub_all_count, 1000)
@skip_unless_zmq
def test_change_subscription(self):
pub, sub, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, 'test')
sub_done = event.Event()
sleep(0.2)
def rx(sock, done_evt):
count = 0
sub = 'test'
while True:
msg = sock.recv()
sleep()
if 'DONE' in msg:
break
if 'LAST' in msg and sub == 'test':
sock.setsockopt(zmq.UNSUBSCRIBE, 'test')
sock.setsockopt(zmq.SUBSCRIBE, 'done')
sub = 'done'
#continue # We don't want to count this message
count += 1
done_evt.send(count)
def tx(sock):
for i in range(1, 101):
msg = "test %s" % i
if i != 50:
sock.send(msg)
else:
sock.send('test LAST')
sleep()
sock.send('done DONE')
spawn(rx, sub, sub_done)
spawn(tx, pub)
rx_count = sub_done.wait()
self.assertEqual(rx_count, 50)
class TestThreadedContextAccess(TestCase):
"""zmq's Context must be unique within a hub
The zeromq API documentation states:
All zmq sockets passed to the zmq_poll() function must share the same zmq
context and must belong to the thread calling zmq_poll()
As zmq_poll is what's eventually being called then we need to insure that
all sockets that are going to be passed to zmq_poll (via hub.do_poll) are
in the same context
"""
@skip_unless_zmq
@mock.patch('eventlet.green.zmq.get_hub_name_from_instance')
@mock.patch('eventlet.green.zmq.get_hub', spec=Hub)
def test_context_factory_funtion(self, get_hub_mock, hub_name_mock):
hub_name_mock.return_value = 'zeromq'
ctx = zmq.Context()
self.assertTrue(get_hub_mock().get_context.called)
@skip_unless_zmq
def test_threadlocal_context(self):
hub = get_hub()
context = zmq.Context()
self.assertEqual(context, _threadlocal.context)
next_context = hub.get_context()
self.assertTrue(context is next_context)
@skip_unless_zmq
def test_different_context_in_different_thread(self):
context = zmq.Context()
test_result = []
def assert_different(ctx):
# assert not hasattr(_threadlocal, 'hub')
# import os
# os.environ['EVENTLET_HUB'] = 'zeromq'
hub = get_hub()
try:
this_thread_context = zmq.Context()
except:
test_result.append('fail')
raise
test_result.append(ctx is this_thread_context)
Thread(target=assert_different, args=(context,)).start()
while not test_result:
sleep(0.1)
self.assertFalse(test_result[0])
class TestCheckingForZMQHub(TestCase):
def setUp(self):
self.orig_hub = zmq.get_hub_name_from_instance(get_hub())
use_hub('poll')
def tearDown(self):
use_hub(self.orig_hub)
def test_assertionerror_raise_by_context(self):
self.assertRaises(RuntimeError, zmq.Context)