zmq: cache super() calls. fix calls to getsockopt. remove unnecessary __zmq__ references.

This commit is contained in:
Geoff Salmon
2011-12-29 11:01:43 -05:00
parent 38c4201674
commit 4545bf1713

View File

@@ -1,8 +1,8 @@
"""The :mod:`zmq` module wraps the :class:`Socket` and :class:`Context` found in :mod:`pyzmq <zmq>` to be non blocking
"""
__zmq__ = __import__('zmq')
from eventlet import sleep, hubs
from eventlet.hubs import trampoline, _threadlocal
from eventlet import hubs
from eventlet.hubs import trampoline
from eventlet.patcher import slurp_properties
from eventlet.support import greenlets as greenlet
@@ -184,7 +184,14 @@ def _wraps(source_fn):
# - What should happen to threads blocked on send/recv when socket is
# closed?
class Socket(__zmq__.Socket):
_Socket = __zmq__.Socket
_Socket_recv = _Socket.recv
_Socket_send = _Socket.send
_Socket_send_multipart = _Socket.send_multipart
_Socket_recv_multipart = _Socket.recv_multipart
_Socket_getsockopt = _Socket.getsockopt
class Socket(_Socket):
"""Green version of :class:`zmq.core.socket.Socket
The following three methods are always overridden:
@@ -227,7 +234,7 @@ class Socket(__zmq__.Socket):
else:
self.recv = self.recv_multipart = self._send_not_supported
def _trampoline(self, is_send):
def _trampoline(self, evt):
"""Wait for events on the zmq socket. After this method
returns it is still possible that send and recv will return
EAGAIN.
@@ -238,7 +245,6 @@ class Socket(__zmq__.Socket):
still block.
"""
evt = self._send_event if is_send else self._recv_event
if self._in_trampoline:
# Already a thread blocked in trampoline.
evt.block()
@@ -247,95 +253,95 @@ class Socket(__zmq__.Socket):
self._in_trampoline = True
with evt:
# Only trampoline on read events for zmq FDs, never write.
trampoline(self.getsockopt(__zmq__.FD), read=True)
trampoline(self.getsockopt(FD), read=True)
finally:
self._in_trampoline = False
@_wraps(__zmq__.Socket.send)
@_wraps(_Socket.send)
def send(self, msg, flags=0, copy=True, track=False):
"""Send method used by REP and REQ sockets. The lock-step
send->recv->send->recv restriction of these sockets makes this
implementation simple.
"""
if flags & __zmq__.NOBLOCK:
return super(Socket, self).send(msg, flags, copy, track)
if flags & NOBLOCK:
return _Socket_send(self, msg, flags, copy, track)
flags |= __zmq__.NOBLOCK
flags |= NOBLOCK
while True:
try:
return super(Socket, self).send(msg, flags, copy, track)
except __zmq__.ZMQError, e:
return _Socket_send(self, msg, flags, copy, track)
except ZMQError, e:
if e.errno == EAGAIN:
self._trampoline(True)
self._trampoline(self._send_event)
else:
raise
@_wraps(__zmq__.Socket.recv)
@_wraps(_Socket.recv)
def recv(self, flags=0, copy=True, track=False):
"""Recv method used by REP and REQ sockets. The lock-step
send->recv->send->recv restriction of these sockets makes this
implementation simple.
"""
if flags & __zmq__.NOBLOCK:
return super(Socket, self).recv(flags, copy, track)
if flags & NOBLOCK:
return _Socket_recv(self, flags, copy, track)
flags |= __zmq__.NOBLOCK
flags |= NOBLOCK
while True:
try:
return super(Socket, self).recv(flags, copy, track)
except __zmq__.ZMQError, e:
return _Socket_recv(self, flags, copy, track)
except ZMQError, e:
if e.errno == EAGAIN:
self._trampoline(False)
self._trampoline(self._recv_event)
else:
raise
@_wraps(__zmq__.Socket.getsockopt)
@_wraps(_Socket.getsockopt)
def getsockopt(self, option):
result = super(Socket, self).getsockopt(option)
if option == __zmq__.EVENTS:
result = _Socket_getsockopt(self, option)
if option == EVENTS:
# Getting the events causes the zmq socket to process
# events which may mean a msg can be sent or received. If
# there is a greenthread blocked and waiting for events,
# it will miss the edge-triggered read event, so wake it
# up.
if self._send_evt and (result & __zmq__.POLLOUT):
if self._send_evt and (result & POLLOUT):
self._send_evt.wake()
if self._recv_evt and (result & __zmq__.POLLIN):
if self._recv_evt and (result & POLLIN):
self._recv_evt.wake()
return result
def _send_not_supported(self, msg, flags, copy, track):
raise __zmq__.ZMQError(__zmq__.ENOTSUP)
raise ZMQError(ENOTSUP)
def _recv_not_supported(self, flags, copy, track):
raise __zmq__.ZMQError(__zmq__.ENOTSUP)
raise ZMQError(ENOTSUP)
@_wraps(__zmq__.Socket.send)
@_wraps(_Socket.send)
def _xsafe_send(self, msg, flags=0, copy=True, track=False):
"""A send method that's safe to use when multiple greenthreads
are calling send, send_multipart, recv and recv_multipart on
the same socket.
"""
if flags & __zmq__.NOBLOCK:
result = super(Socket, self).send(msg, flags, copy, track)
if flags & NOBLOCK:
result = _Socket_send(self, msg, flags, copy, track)
if self._send_event or self._recv_event:
getsockopt(__zmq__.EVENTS) # triggers wakeups
self.getsockopt(EVENTS) # triggers wakeups
return result
# TODO: pyzmq will copy the message buffer and create Message
# objects under some circumstances. We could do that work here
# once to avoid doing it every time the send is retried.
flags |= __zmq__.NOBLOCK
flags |= NOBLOCK
with self._send_lock:
while True:
try:
return super(Socket, self).send(msg, flags, copy, track)
except __zmq__.ZMQError, e:
return _Socket_send(self, msg, flags, copy, track)
except ZMQError, e:
if e.errno == EAGAIN:
self._trampoline(True)
self._trampoline(self._send_event)
else:
raise
finally:
@@ -346,63 +352,63 @@ class Socket(__zmq__.Socket):
self._recv_event.wake()
@_wraps(__zmq__.Socket.send_multipart)
@_wraps(_Socket.send_multipart)
def _xsafe_send_multipart(self, msg_parts, flags=0, copy=True, track=False):
"""A send_multipart method that's safe to use when multiple
greenthreads are calling send, send_multipart, recv and
recv_multipart on the same socket.
"""
if flags & __zmq__.NOBLOCK:
return super(Socket, self).send_multipart(msg_parts, flags, copy, track)
if flags & NOBLOCK:
return _Socket_send_multipart(self, msg_parts, flags, copy, track)
# acquire lock here so the subsequent calls to send for the
# message parts after the first don't block
with self._send_lock:
return super(Socket, self).send_multipart(msg_parts, flags, copy, track)
return _Socket_send_multipart(self, msg_parts, flags, copy, track)
@_wraps(__zmq__.Socket.recv)
@_wraps(_Socket.recv)
def _xsafe_recv(self, flags=0, copy=True, track=False):
"""A recv method that's safe to use when multiple greenthreads
are calling send, send_multipart, recv and recv_multipart on
the same socket.
"""
if flags & __zmq__.NOBLOCK:
msg = super(Socket, self).recv(flags, copy, track)
if flags & NOBLOCK:
msg = _Socket_recv(self, flags, copy, track)
if self._send_event or self._recv_event:
getsockopt(__zmq__.EVENTS) # triggers wakeups
self.getsockopt(EVENTS) # triggers wakeups
return msg
flags |= __zmq__.NOBLOCK
flags |= NOBLOCK
with self._recv_lock:
while True:
try:
try:
return super(Socket, self).recv(flags, copy, track)
return _Socket_recv(self, flags, copy, track)
finally:
# The call to recv processes 0mq events and may
# make the socket ready to send. Wake the next
# receiver. (Could check EVENTS for POLLOUT here)
if self._send_event:
self._send_event.wake()
except __zmq__.ZMQError, e:
except ZMQError, e:
if e.errno == EAGAIN:
self._trampoline(False)
self._trampoline(self._recv_event)
else:
raise
@_wraps(__zmq__.Socket.recv_multipart)
@_wraps(_Socket.recv_multipart)
def _xsafe_recv_multipart(self, flags=0, copy=True, track=False):
"""A recv_multipart method that's safe to use when multiple
greenthreads are calling send, send_multipart, recv and
recv_multipart on the same socket.
"""
if flags & __zmq__.NOBLOCK:
return super(Socket, self).recv_multipart(flags, copy, track)
if flags & NOBLOCK:
return _Socket_recv_multipart(self, flags, copy, track)
# acquire lock here so the subsequent calls to recv for the
# message parts after the first don't block
with self._recv_lock:
return super(Socket, self).recv_multipart(flags, copy, track)
return _Socket_recv_multipart(self, flags, copy, track)
# The behavior of the send and recv methods depends on the socket
# type. See http://api.zeromq.org/2-1:zmq-socket for explanation
@@ -414,19 +420,19 @@ class Socket(__zmq__.Socket):
_full_ops = (_xsafe_send, _xsafe_send_multipart, _xsafe_recv, _xsafe_recv_multipart)
_eventlet_ops = {
__zmq__.PUB: _send_only_ops,
__zmq__.SUB: _recv_only_ops,
PUB: _send_only_ops,
SUB: _recv_only_ops,
__zmq__.PUSH: _send_only_ops,
__zmq__.PULL: _recv_only_ops,
PUSH: _send_only_ops,
PULL: _recv_only_ops,
__zmq__.PAIR: _full_ops
PAIR: _full_ops
}
try:
_eventlet_ops[__zmq__.XREP] = _full_ops
_eventlet_ops[__zmq__.XREQ] = _full_ops
_eventlet_ops[XREP] = _full_ops
_eventlet_ops[XREQ] = _full_ops
except AttributeError:
# XREP and XREQ are being renamed ROUTER and DEALER
_eventlet_ops[__zmq__.ROUTER] = _full_ops
_eventlet_ops[__zmq__.DEALER] = _full_ops
_eventlet_ops[ROUTER] = _full_ops
_eventlet_ops[DEALER] = _full_ops