Automated merge with http://bitbucket.org/which_linden/eventlet
This commit is contained in:
@@ -7,6 +7,7 @@ dist
|
||||
build
|
||||
*.esproj
|
||||
.DS_Store
|
||||
.idea
|
||||
doc/_build
|
||||
annotated
|
||||
cover
|
||||
@@ -17,6 +18,7 @@ lib*
|
||||
bin
|
||||
include
|
||||
.noseids
|
||||
pip-log.txt
|
||||
|
||||
syntax: re
|
||||
^.ropeproject/.*$
|
||||
|
81
eventlet/green/zmq.py
Normal file
81
eventlet/green/zmq.py
Normal file
@@ -0,0 +1,81 @@
|
||||
__zmq__ = __import__('zmq')
|
||||
from eventlet import sleep
|
||||
from eventlet.hubs import trampoline, get_hub
|
||||
|
||||
__patched__ = ['Context', 'Socket']
|
||||
globals().update(dict([(var, getattr(__zmq__, var))
|
||||
for var in __zmq__.__all__
|
||||
if not (var.startswith('__')
|
||||
or
|
||||
var in __patched__)
|
||||
]))
|
||||
|
||||
|
||||
def get_hub_name_from_instance(hub):
|
||||
return hub.__class__.__module__.rsplit('.',1)[-1]
|
||||
|
||||
def Context(io_threads=1):
|
||||
hub = get_hub()
|
||||
hub_name = get_hub_name_from_instance(hub)
|
||||
if hub_name != 'zeromq':
|
||||
raise RuntimeError("Hub must be 'zeromq', got '%s'" % hub_name)
|
||||
return hub.get_context(io_threads)
|
||||
|
||||
class _Context(__zmq__.Context):
|
||||
|
||||
def socket(self, socket_type):
|
||||
return Socket(self, socket_type)
|
||||
|
||||
class Socket(__zmq__.Socket):
|
||||
|
||||
|
||||
def _send_message(self, data, flags=0, copy=True):
|
||||
flags |= __zmq__.NOBLOCK
|
||||
while True:
|
||||
try:
|
||||
super(Socket, self)._send_message(data, flags)
|
||||
return
|
||||
except __zmq__.ZMQError, e:
|
||||
if e.errno != EAGAIN:
|
||||
raise
|
||||
trampoline(self, write=True)
|
||||
|
||||
def _send_copy(self, data, flags=0, copy=True):
|
||||
flags |= __zmq__.NOBLOCK
|
||||
while True:
|
||||
try:
|
||||
super(Socket, self)._send_copy(data, flags)
|
||||
return
|
||||
except __zmq__.ZMQError, e:
|
||||
if e.errno != EAGAIN:
|
||||
raise
|
||||
trampoline(self, write=True)
|
||||
|
||||
def _recv_message(self, flags=0):
|
||||
|
||||
flags |= __zmq__.NOBLOCK
|
||||
while True:
|
||||
try:
|
||||
m = super(Socket, self)._recv_message(flags)
|
||||
if m:
|
||||
return m
|
||||
except __zmq__.ZMQError, e:
|
||||
if e.errno != EAGAIN:
|
||||
raise
|
||||
trampoline(self, read=True)
|
||||
|
||||
def _recv_copy(self, flags=0):
|
||||
flags |= __zmq__.NOBLOCK
|
||||
while True:
|
||||
try:
|
||||
m = super(Socket, self)._recv_copy(flags)
|
||||
if m:
|
||||
return m
|
||||
except __zmq__.ZMQError, e:
|
||||
if e.errno != EAGAIN:
|
||||
raise
|
||||
trampoline(self, read=True)
|
||||
|
||||
|
||||
|
||||
|
99
eventlet/hubs/zeromq.py
Normal file
99
eventlet/hubs/zeromq.py
Normal file
@@ -0,0 +1,99 @@
|
||||
from eventlet import patcher
|
||||
from eventlet.green import zmq
|
||||
from eventlet.hubs import poll, _threadlocal
|
||||
from eventlet.hubs.hub import BaseHub, noop
|
||||
from eventlet.hubs.poll import READ, WRITE
|
||||
from eventlet.support import clear_sys_exc_info
|
||||
import sys
|
||||
|
||||
time = patcher.original('time')
|
||||
select = patcher.original('select')
|
||||
sleep = time.sleep
|
||||
|
||||
EXC_MASK = zmq.POLLERR
|
||||
READ_MASK = zmq.POLLIN
|
||||
WRITE_MASK = zmq.POLLOUT
|
||||
|
||||
class Hub(poll.Hub):
|
||||
|
||||
|
||||
def __init__(self, clock=time.time):
|
||||
BaseHub.__init__(self, clock)
|
||||
self.poll = zmq.Poller()
|
||||
|
||||
def get_context(self, io_threads=1):
|
||||
"""zmq's Context must be unique within a hub
|
||||
|
||||
The zeromq API documentation states:
|
||||
All zmq sockets passed to the zmq_poll() function must share the same
|
||||
zmq context and must belong to the thread calling zmq_poll()
|
||||
|
||||
As zmq_poll is what's eventually being called then we need to insure
|
||||
that all sockets that are going to be passed to zmq_poll (via
|
||||
hub.do_poll) are in the same context
|
||||
"""
|
||||
try:
|
||||
return _threadlocal.context
|
||||
except AttributeError:
|
||||
_threadlocal.context = zmq._Context(io_threads)
|
||||
return _threadlocal.context
|
||||
|
||||
def register(self, fileno, new=False):
|
||||
mask = 0
|
||||
if self.listeners[READ].get(fileno):
|
||||
mask |= READ_MASK
|
||||
if self.listeners[WRITE].get(fileno):
|
||||
mask |= WRITE_MASK
|
||||
if mask:
|
||||
self.poll.register(fileno, mask)
|
||||
else:
|
||||
self.poll.unregister(fileno)
|
||||
|
||||
|
||||
def wait(self, seconds=None):
|
||||
readers = self.listeners[READ]
|
||||
writers = self.listeners[WRITE]
|
||||
|
||||
if not readers and not writers:
|
||||
if seconds:
|
||||
sleep(seconds)
|
||||
return
|
||||
try:
|
||||
presult = self.do_poll(seconds)
|
||||
except zmq.ZMQError, e:
|
||||
# In the poll hub this part exists to special case some exceptions
|
||||
# from socket. There may be some error numbers that wider use of
|
||||
# this hub will throw up as needing special treatment so leaving
|
||||
# this block and this comment as a remineder
|
||||
raise
|
||||
SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS
|
||||
|
||||
if self.debug_blocking:
|
||||
self.block_detect_pre()
|
||||
|
||||
for fileno, event in presult:
|
||||
try:
|
||||
if event & READ_MASK:
|
||||
readers.get(fileno, noop).cb(fileno)
|
||||
if event & WRITE_MASK:
|
||||
writers.get(fileno, noop).cb(fileno)
|
||||
if event & EXC_MASK:
|
||||
# zmq.POLLERR is returned for any error condition in the
|
||||
# underlying fd (as passed through to poll/epoll)
|
||||
readers.get(fileno, noop).cb(fileno)
|
||||
writers.get(fileno, noop).cb(fileno)
|
||||
except SYSTEM_EXCEPTIONS:
|
||||
raise
|
||||
except:
|
||||
self.squelch_exception(fileno, sys.exc_info())
|
||||
clear_sys_exc_info()
|
||||
|
||||
if self.debug_blocking:
|
||||
self.block_detect_post()
|
||||
|
||||
|
||||
# def do_poll(self, seconds):
|
||||
# print 'poll: ', seconds
|
||||
# if seconds < 0:
|
||||
# seconds = 500
|
||||
# return self.poll.poll(seconds)
|
20
examples/chat_bridge.py
Normal file
20
examples/chat_bridge.py
Normal file
@@ -0,0 +1,20 @@
|
||||
import sys
|
||||
from zmq import FORWARDER, PUB, SUB, SUBSCRIBE
|
||||
from zmq.devices import Device
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
usage = 'usage: chat_bridge sub_address pub_address'
|
||||
if len (sys.argv) != 3:
|
||||
print usage
|
||||
sys.exit(1)
|
||||
|
||||
sub_addr = sys.argv[1]
|
||||
pub_addr = sys.argv[2]
|
||||
print "Recieving on %s" % sub_addr
|
||||
print "Sending on %s" % pub_addr
|
||||
device = Device(FORWARDER, SUB, PUB)
|
||||
device.bind_in(sub_addr)
|
||||
device.setsockopt_in(SUBSCRIBE, "")
|
||||
device.bind_out(pub_addr)
|
||||
device.start()
|
127
examples/distributed_websocket_chat.py
Normal file
127
examples/distributed_websocket_chat.py
Normal file
@@ -0,0 +1,127 @@
|
||||
"""This is a websocket chat example with many servers. A client can connect to
|
||||
any of the servers and their messages will be received by all clients connected
|
||||
to any of the servers.
|
||||
|
||||
Run the examples like this:
|
||||
|
||||
$ python examples/chat_bridge.py tcp://127.0.0.1:12345 tcp://127.0.0.1:12346
|
||||
|
||||
and the servers like this (changing the port for each one obviously):
|
||||
|
||||
$ python examples/distributed_websocket_chat.py -p tcp://127.0.0.1:12345 -s tcp://127.0.0.1:12346 7000
|
||||
|
||||
So all messages are published to port 12345 and the device forwards all the
|
||||
messages to 12346 where they are subscribed to
|
||||
"""
|
||||
import os, sys
|
||||
import eventlet
|
||||
from collections import defaultdict
|
||||
from eventlet import spawn_n, sleep
|
||||
from eventlet import wsgi
|
||||
from eventlet import websocket
|
||||
from eventlet.green import zmq
|
||||
from eventlet.hubs import get_hub, use_hub
|
||||
from uuid import uuid1
|
||||
|
||||
use_hub('zeromq')
|
||||
ctx = zmq.Context()
|
||||
|
||||
class IDName(object):
|
||||
|
||||
def __init__(self):
|
||||
self.id = uuid1()
|
||||
self.name = None
|
||||
|
||||
def __str__(self):
|
||||
if self.name:
|
||||
return self.name
|
||||
else:
|
||||
return str(self.id)
|
||||
|
||||
def pack_message(self, msg):
|
||||
return self, msg
|
||||
|
||||
def unpack_message(self, msg):
|
||||
sender, message = msg
|
||||
sender_name = 'you said' if sender.id == self.id \
|
||||
else '%s says' % sender
|
||||
return "%s: %s" % (sender_name, message)
|
||||
|
||||
|
||||
participants = defaultdict(IDName)
|
||||
|
||||
def subscribe_and_distribute(sub_socket):
|
||||
global participants
|
||||
while True:
|
||||
msg = sub_socket.recv_pyobj()
|
||||
for ws, name_id in participants.items():
|
||||
to_send = name_id.unpack_message(msg)
|
||||
if to_send:
|
||||
try:
|
||||
ws.send(to_send)
|
||||
except:
|
||||
del participants[ws]
|
||||
|
||||
@websocket.WebSocketWSGI
|
||||
def handle(ws):
|
||||
global pub_socket
|
||||
name_id = participants[ws]
|
||||
ws.send("Connected as %s, change name with 'name: new_name'" % name_id)
|
||||
try:
|
||||
while True:
|
||||
m = ws.wait()
|
||||
if m is None:
|
||||
break
|
||||
if m.startswith('name:'):
|
||||
old_name = str(name_id)
|
||||
new_name = m.split(':', 1)[1].strip()
|
||||
name_id.name = new_name
|
||||
m = 'Changed name from %s' % old_name
|
||||
pub_socket.send_pyobj(name_id.pack_message(m))
|
||||
sleep()
|
||||
finally:
|
||||
del participants[ws]
|
||||
|
||||
def dispatch(environ, start_response):
|
||||
"""Resolves to the web page or the websocket depending on the path."""
|
||||
global port
|
||||
if environ['PATH_INFO'] == '/chat':
|
||||
return handle(environ, start_response)
|
||||
else:
|
||||
start_response('200 OK', [('content-type', 'text/html')])
|
||||
return [open(os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
'websocket_chat.html')).read() % dict(port=port)]
|
||||
|
||||
port = None
|
||||
|
||||
if __name__ == "__main__":
|
||||
usage = 'usage: websocket_chat -p pub address -s sub address port number'
|
||||
if len (sys.argv) != 6:
|
||||
print usage
|
||||
sys.exit(1)
|
||||
|
||||
pub_addr = sys.argv[2]
|
||||
sub_addr = sys.argv[4]
|
||||
try:
|
||||
port = int(sys.argv[5])
|
||||
except ValueError:
|
||||
print "Error port supplied couldn't be converted to int\n", usage
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
pub_socket = ctx.socket(zmq.PUB)
|
||||
pub_socket.connect(pub_addr)
|
||||
print "Publishing to %s" % pub_addr
|
||||
sub_socket = ctx.socket(zmq.SUB)
|
||||
sub_socket.connect(sub_addr)
|
||||
sub_socket.setsockopt(zmq.SUBSCRIBE, "")
|
||||
print "Subscribing to %s" % sub_addr
|
||||
except:
|
||||
print "Couldn't create sockets\n", usage
|
||||
sys.exit(1)
|
||||
|
||||
spawn_n(subscribe_and_distribute, sub_socket)
|
||||
listener = eventlet.listen(('127.0.0.1', port))
|
||||
print "\nVisit http://localhost:%s/ in your websocket-capable browser.\n" % port
|
||||
wsgi.server(listener, dispatch)
|
@@ -5,7 +5,7 @@
|
||||
<script>
|
||||
window.onload = function() {
|
||||
var data = {};
|
||||
var s = new WebSocket("ws://127.0.0.1:7000/chat");
|
||||
var s = new WebSocket("ws://127.0.0.1:%(port)s/chat");
|
||||
s.onopen = function() {
|
||||
s.send('New participant joined');
|
||||
};
|
||||
|
@@ -4,6 +4,8 @@ import eventlet
|
||||
from eventlet import wsgi
|
||||
from eventlet import websocket
|
||||
|
||||
PORT = 7000
|
||||
|
||||
participants = set()
|
||||
|
||||
@websocket.WebSocketWSGI
|
||||
@@ -27,10 +29,10 @@ def dispatch(environ, start_response):
|
||||
start_response('200 OK', [('content-type', 'text/html')])
|
||||
return [open(os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
'websocket_chat.html')).read()]
|
||||
'websocket_chat.html')).read() % PORT]
|
||||
|
||||
if __name__ == "__main__":
|
||||
# run an example app from the command line
|
||||
listener = eventlet.listen(('127.0.0.1', 7000))
|
||||
listener = eventlet.listen(('127.0.0.1', PORT))
|
||||
print "\nVisit http://localhost:7000/ in your websocket-capable browser.\n"
|
||||
wsgi.server(listener, dispatch)
|
||||
|
64
examples/zmq_chat.py
Normal file
64
examples/zmq_chat.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import eventlet, sys
|
||||
from eventlet.green import socket, zmq
|
||||
from eventlet.hubs import use_hub
|
||||
use_hub('zeromq')
|
||||
|
||||
ADDR = 'ipc:///tmp/chat'
|
||||
|
||||
ctx = zmq.Context()
|
||||
|
||||
def publish(writer):
|
||||
|
||||
print "connected"
|
||||
socket = ctx.socket(zmq.SUB)
|
||||
|
||||
socket.setsockopt(zmq.SUBSCRIBE, "")
|
||||
socket.connect(ADDR)
|
||||
eventlet.sleep(0.1)
|
||||
|
||||
while True:
|
||||
msg = socket.recv_pyobj()
|
||||
str_msg = "%s: %s" % msg
|
||||
writer.write(str_msg)
|
||||
writer.flush()
|
||||
|
||||
|
||||
PORT=3001
|
||||
|
||||
def read_chat_forever(reader, pub_socket):
|
||||
|
||||
line = reader.readline()
|
||||
who = 'someone'
|
||||
while line:
|
||||
print "Chat:", line.strip()
|
||||
if line.startswith('name:'):
|
||||
who = line.split(':')[-1].strip()
|
||||
|
||||
try:
|
||||
pub_socket.send_pyobj((who, line))
|
||||
except socket.error, e:
|
||||
# ignore broken pipes, they just mean the participant
|
||||
# closed its connection already
|
||||
if e[0] != 32:
|
||||
raise
|
||||
line = reader.readline()
|
||||
print "Participant left chat."
|
||||
|
||||
try:
|
||||
print "ChatServer starting up on port %s" % PORT
|
||||
server = eventlet.listen(('0.0.0.0', PORT))
|
||||
pub_socket = ctx.socket(zmq.PUB)
|
||||
pub_socket.bind(ADDR)
|
||||
eventlet.spawn_n(publish,
|
||||
sys.stdout)
|
||||
while True:
|
||||
new_connection, address = server.accept()
|
||||
|
||||
print "Participant joined chat."
|
||||
eventlet.spawn_n(publish,
|
||||
new_connection.makefile('w'))
|
||||
eventlet.spawn_n(read_chat_forever,
|
||||
new_connection.makefile('r'),
|
||||
pub_socket)
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
print "ChatServer exiting."
|
@@ -70,15 +70,25 @@ except AssertionError:
|
||||
|
||||
|
||||
class Hub(ProcessBase):
|
||||
|
||||
def setUp(self):
|
||||
super(Hub, self).setUp()
|
||||
self.old_environ = os.environ.get('EVENTLET_HUB')
|
||||
os.environ['EVENTLET_HUB'] = 'selects'
|
||||
|
||||
def tearDown(self):
|
||||
if self.old_environ:
|
||||
os.environ['EVENTLET_HUB'] = self.old_environ
|
||||
else:
|
||||
del os.environ['EVENTLET_HUB']
|
||||
super(Hub, self).tearDown()
|
||||
|
||||
def test_eventlet_hub(self):
|
||||
new_mod = """from eventlet import hubs
|
||||
print hubs.get_hub()
|
||||
"""
|
||||
os.environ['EVENTLET_HUB'] = 'selects'
|
||||
try:
|
||||
self.write_to_tempfile("newmod", new_mod)
|
||||
output, lines = self.launch_subprocess('newmod.py')
|
||||
self.assertEqual(len(lines), 2, "\n".join(lines))
|
||||
self.assert_("selects" in lines[0])
|
||||
finally:
|
||||
del os.environ['EVENTLET_HUB']
|
||||
self.write_to_tempfile("newmod", new_mod)
|
||||
output, lines = self.launch_subprocess('newmod.py')
|
||||
self.assertEqual(len(lines), 2, "\n".join(lines))
|
||||
self.assert_("selects" in lines[0])
|
||||
|
||||
|
268
tests/zmq_test.py
Normal file
268
tests/zmq_test.py
Normal file
@@ -0,0 +1,268 @@
|
||||
from eventlet import event, spawn, sleep, patcher
|
||||
from eventlet.hubs import get_hub, _threadlocal, use_hub
|
||||
from eventlet.green import zmq
|
||||
from nose.tools import *
|
||||
from tests import mock, LimitedTestCase, skip_unless
|
||||
from unittest import TestCase
|
||||
|
||||
from threading import Thread
|
||||
from eventlet.hubs.zeromq import Hub
|
||||
|
||||
def using_zmq(_f):
|
||||
return 'zeromq' in type(get_hub()).__module__
|
||||
|
||||
def skip_unless_zmq(func):
|
||||
""" Decorator that skips a test if we're using the pyevent hub."""
|
||||
return skip_unless(using_zmq)(func)
|
||||
|
||||
class TestUpstreamDownStream(LimitedTestCase):
|
||||
|
||||
sockets = []
|
||||
|
||||
def tearDown(self):
|
||||
self.clear_up_sockets()
|
||||
super(TestUpstreamDownStream, self).tearDown()
|
||||
|
||||
def create_bound_pair(self, type1, type2, interface='tcp://127.0.0.1'):
|
||||
"""Create a bound socket pair using a random port."""
|
||||
self.context = context = zmq.Context()
|
||||
s1 = context.socket(type1)
|
||||
port = s1.bind_to_random_port(interface)
|
||||
s2 = context.socket(type2)
|
||||
s2.connect('%s:%s' % (interface, port))
|
||||
self.sockets = [s1, s2]
|
||||
return s1, s2, port
|
||||
|
||||
def clear_up_sockets(self):
|
||||
for sock in self.sockets:
|
||||
sock.close()
|
||||
|
||||
def assertRaisesErrno(self, errno, func, *args):
|
||||
try:
|
||||
func(*args)
|
||||
except zmq.ZMQError, e:
|
||||
self.assertEqual(e.errno, errno, "wrong error raised, expected '%s' \
|
||||
got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
|
||||
else:
|
||||
self.fail("Function did not raise any error")
|
||||
|
||||
@skip_unless_zmq
|
||||
def test_recv_spawned_before_send_is_non_blocking(self):
|
||||
req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
|
||||
# req.connect(ipc)
|
||||
# rep.bind(ipc)
|
||||
sleep()
|
||||
msg = dict(res=None)
|
||||
done = event.Event()
|
||||
def rx():
|
||||
msg['res'] = rep.recv()
|
||||
done.send('done')
|
||||
spawn(rx)
|
||||
req.send('test')
|
||||
done.wait()
|
||||
self.assertEqual(msg['res'], 'test')
|
||||
|
||||
@skip_unless_zmq
|
||||
def test_close_socket_raises_enotsup(self):
|
||||
req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
|
||||
rep.close()
|
||||
req.close()
|
||||
self.assertRaisesErrno(zmq.ENOTSUP, rep.recv)
|
||||
self.assertRaisesErrno(zmq.ENOTSUP, req.send, 'test')
|
||||
|
||||
@skip_unless_zmq
|
||||
def test_send_1k_req_rep(self):
|
||||
req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
|
||||
sleep()
|
||||
done = event.Event()
|
||||
def tx():
|
||||
tx_i = 0
|
||||
req.send(str(tx_i))
|
||||
while req.recv() != 'done':
|
||||
tx_i += 1
|
||||
req.send(str(tx_i))
|
||||
def rx():
|
||||
while True:
|
||||
rx_i = rep.recv()
|
||||
if rx_i == "1000":
|
||||
rep.send('done')
|
||||
sleep()
|
||||
done.send(0)
|
||||
break
|
||||
rep.send('i')
|
||||
spawn(tx)
|
||||
spawn(rx)
|
||||
final_i = done.wait()
|
||||
self.assertEqual(final_i, 0)
|
||||
|
||||
@skip_unless_zmq
|
||||
def test_send_1k_push_pull(self):
|
||||
down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL)
|
||||
sleep()
|
||||
done = event.Event()
|
||||
def tx():
|
||||
tx_i = 0
|
||||
while tx_i <= 1000:
|
||||
tx_i += 1
|
||||
down.send(str(tx_i))
|
||||
def rx():
|
||||
while True:
|
||||
rx_i = up.recv()
|
||||
if rx_i == "1000":
|
||||
done.send(0)
|
||||
break
|
||||
spawn(tx)
|
||||
spawn(rx)
|
||||
final_i = done.wait()
|
||||
self.assertEqual(final_i, 0)
|
||||
|
||||
@skip_unless_zmq
|
||||
def test_send_1k_pub_sub(self):
|
||||
pub, sub_all, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
|
||||
sub1 = self.context.socket(zmq.SUB)
|
||||
sub2 = self.context.socket(zmq.SUB)
|
||||
self.sockets.extend([sub1, sub2])
|
||||
addr = 'tcp://127.0.0.1:%s' % port
|
||||
sub1.connect(addr)
|
||||
sub2.connect(addr)
|
||||
sub_all.setsockopt(zmq.SUBSCRIBE, '')
|
||||
sub1.setsockopt(zmq.SUBSCRIBE, 'sub1')
|
||||
sub2.setsockopt(zmq.SUBSCRIBE, 'sub2')
|
||||
sub_all_done = event.Event()
|
||||
sub1_done = event.Event()
|
||||
sub2_done = event.Event()
|
||||
sleep(0.2)
|
||||
def rx(sock, done_evt, msg_count=10000):
|
||||
count = 0
|
||||
while count < msg_count:
|
||||
msg = sock.recv()
|
||||
sleep()
|
||||
if 'LAST' in msg:
|
||||
break
|
||||
count += 1
|
||||
|
||||
done_evt.send(count)
|
||||
|
||||
def tx(sock):
|
||||
for i in range(1, 1001):
|
||||
msg = "sub%s %s" % (1 if i % 2 else 2, i)
|
||||
sock.send(msg)
|
||||
sleep()
|
||||
sock.send('sub1 LAST')
|
||||
sock.send('sub2 LAST')
|
||||
|
||||
spawn(rx, sub_all, sub_all_done)
|
||||
spawn(rx, sub1, sub1_done)
|
||||
spawn(rx, sub2, sub2_done)
|
||||
spawn(tx, pub)
|
||||
sub1_count = sub1_done.wait()
|
||||
sub2_count = sub2_done.wait()
|
||||
sub_all_count = sub_all_done.wait()
|
||||
self.assertEqual(sub1_count, 500)
|
||||
self.assertEqual(sub2_count, 500)
|
||||
self.assertEqual(sub_all_count, 1000)
|
||||
|
||||
@skip_unless_zmq
|
||||
def test_change_subscription(self):
|
||||
pub, sub, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
|
||||
sub.setsockopt(zmq.SUBSCRIBE, 'test')
|
||||
|
||||
sub_done = event.Event()
|
||||
sleep(0.2)
|
||||
def rx(sock, done_evt):
|
||||
count = 0
|
||||
sub = 'test'
|
||||
while True:
|
||||
msg = sock.recv()
|
||||
sleep()
|
||||
if 'DONE' in msg:
|
||||
break
|
||||
if 'LAST' in msg and sub == 'test':
|
||||
sock.setsockopt(zmq.UNSUBSCRIBE, 'test')
|
||||
sock.setsockopt(zmq.SUBSCRIBE, 'done')
|
||||
sub = 'done'
|
||||
#continue # We don't want to count this message
|
||||
count += 1
|
||||
done_evt.send(count)
|
||||
|
||||
def tx(sock):
|
||||
for i in range(1, 101):
|
||||
msg = "test %s" % i
|
||||
if i != 50:
|
||||
sock.send(msg)
|
||||
else:
|
||||
sock.send('test LAST')
|
||||
sleep()
|
||||
sock.send('done DONE')
|
||||
|
||||
spawn(rx, sub, sub_done)
|
||||
spawn(tx, pub)
|
||||
|
||||
rx_count = sub_done.wait()
|
||||
self.assertEqual(rx_count, 50)
|
||||
|
||||
|
||||
class TestThreadedContextAccess(TestCase):
|
||||
"""zmq's Context must be unique within a hub
|
||||
|
||||
The zeromq API documentation states:
|
||||
All zmq sockets passed to the zmq_poll() function must share the same zmq
|
||||
context and must belong to the thread calling zmq_poll()
|
||||
|
||||
As zmq_poll is what's eventually being called then we need to insure that
|
||||
all sockets that are going to be passed to zmq_poll (via hub.do_poll) are
|
||||
in the same context
|
||||
"""
|
||||
|
||||
@skip_unless_zmq
|
||||
@mock.patch('eventlet.green.zmq.get_hub_name_from_instance')
|
||||
@mock.patch('eventlet.green.zmq.get_hub', spec=Hub)
|
||||
def test_context_factory_funtion(self, get_hub_mock, hub_name_mock):
|
||||
hub_name_mock.return_value = 'zeromq'
|
||||
ctx = zmq.Context()
|
||||
self.assertTrue(get_hub_mock().get_context.called)
|
||||
|
||||
@skip_unless_zmq
|
||||
def test_threadlocal_context(self):
|
||||
hub = get_hub()
|
||||
context = zmq.Context()
|
||||
self.assertEqual(context, _threadlocal.context)
|
||||
next_context = hub.get_context()
|
||||
self.assertTrue(context is next_context)
|
||||
|
||||
@skip_unless_zmq
|
||||
def test_different_context_in_different_thread(self):
|
||||
context = zmq.Context()
|
||||
test_result = []
|
||||
def assert_different(ctx):
|
||||
# assert not hasattr(_threadlocal, 'hub')
|
||||
# import os
|
||||
# os.environ['EVENTLET_HUB'] = 'zeromq'
|
||||
hub = get_hub()
|
||||
try:
|
||||
this_thread_context = zmq.Context()
|
||||
except:
|
||||
test_result.append('fail')
|
||||
raise
|
||||
test_result.append(ctx is this_thread_context)
|
||||
Thread(target=assert_different, args=(context,)).start()
|
||||
while not test_result:
|
||||
sleep(0.1)
|
||||
self.assertFalse(test_result[0])
|
||||
|
||||
class TestCheckingForZMQHub(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.orig_hub = zmq.get_hub_name_from_instance(get_hub())
|
||||
use_hub('poll')
|
||||
|
||||
def tearDown(self):
|
||||
use_hub(self.orig_hub)
|
||||
|
||||
def test_assertionerror_raise_by_context(self):
|
||||
self.assertRaises(RuntimeError, zmq.Context)
|
||||
|
||||
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user