From 4545bf1713b3d84fdf291e33460426b01fea7fa1 Mon Sep 17 00:00:00 2001 From: Geoff Salmon Date: Thu, 29 Dec 2011 11:01:43 -0500 Subject: [PATCH] zmq: cache super() calls. fix calls to getsockopt. remove unnecessary __zmq__ references. --- eventlet/green/zmq.py | 126 ++++++++++++++++++++++-------------------- 1 file changed, 66 insertions(+), 60 deletions(-) diff --git a/eventlet/green/zmq.py b/eventlet/green/zmq.py index 6634277..f1ebe95 100644 --- a/eventlet/green/zmq.py +++ b/eventlet/green/zmq.py @@ -1,8 +1,8 @@ """The :mod:`zmq` module wraps the :class:`Socket` and :class:`Context` found in :mod:`pyzmq ` to be non blocking """ __zmq__ = __import__('zmq') -from eventlet import sleep, hubs -from eventlet.hubs import trampoline, _threadlocal +from eventlet import hubs +from eventlet.hubs import trampoline from eventlet.patcher import slurp_properties from eventlet.support import greenlets as greenlet @@ -184,7 +184,14 @@ def _wraps(source_fn): # - What should happen to threads blocked on send/recv when socket is # closed? -class Socket(__zmq__.Socket): +_Socket = __zmq__.Socket +_Socket_recv = _Socket.recv +_Socket_send = _Socket.send +_Socket_send_multipart = _Socket.send_multipart +_Socket_recv_multipart = _Socket.recv_multipart +_Socket_getsockopt = _Socket.getsockopt + +class Socket(_Socket): """Green version of :class:`zmq.core.socket.Socket The following three methods are always overridden: @@ -227,7 +234,7 @@ class Socket(__zmq__.Socket): else: self.recv = self.recv_multipart = self._send_not_supported - def _trampoline(self, is_send): + def _trampoline(self, evt): """Wait for events on the zmq socket. After this method returns it is still possible that send and recv will return EAGAIN. @@ -238,7 +245,6 @@ class Socket(__zmq__.Socket): still block. """ - evt = self._send_event if is_send else self._recv_event if self._in_trampoline: # Already a thread blocked in trampoline. evt.block() @@ -247,95 +253,95 @@ class Socket(__zmq__.Socket): self._in_trampoline = True with evt: # Only trampoline on read events for zmq FDs, never write. - trampoline(self.getsockopt(__zmq__.FD), read=True) + trampoline(self.getsockopt(FD), read=True) finally: self._in_trampoline = False - @_wraps(__zmq__.Socket.send) + @_wraps(_Socket.send) def send(self, msg, flags=0, copy=True, track=False): """Send method used by REP and REQ sockets. The lock-step send->recv->send->recv restriction of these sockets makes this implementation simple. """ - if flags & __zmq__.NOBLOCK: - return super(Socket, self).send(msg, flags, copy, track) + if flags & NOBLOCK: + return _Socket_send(self, msg, flags, copy, track) - flags |= __zmq__.NOBLOCK + flags |= NOBLOCK while True: try: - return super(Socket, self).send(msg, flags, copy, track) - except __zmq__.ZMQError, e: + return _Socket_send(self, msg, flags, copy, track) + except ZMQError, e: if e.errno == EAGAIN: - self._trampoline(True) + self._trampoline(self._send_event) else: raise - @_wraps(__zmq__.Socket.recv) + @_wraps(_Socket.recv) def recv(self, flags=0, copy=True, track=False): """Recv method used by REP and REQ sockets. The lock-step send->recv->send->recv restriction of these sockets makes this implementation simple. """ - if flags & __zmq__.NOBLOCK: - return super(Socket, self).recv(flags, copy, track) + if flags & NOBLOCK: + return _Socket_recv(self, flags, copy, track) - flags |= __zmq__.NOBLOCK + flags |= NOBLOCK while True: try: - return super(Socket, self).recv(flags, copy, track) - except __zmq__.ZMQError, e: + return _Socket_recv(self, flags, copy, track) + except ZMQError, e: if e.errno == EAGAIN: - self._trampoline(False) + self._trampoline(self._recv_event) else: raise - @_wraps(__zmq__.Socket.getsockopt) + @_wraps(_Socket.getsockopt) def getsockopt(self, option): - result = super(Socket, self).getsockopt(option) - if option == __zmq__.EVENTS: + result = _Socket_getsockopt(self, option) + if option == EVENTS: # Getting the events causes the zmq socket to process # events which may mean a msg can be sent or received. If # there is a greenthread blocked and waiting for events, # it will miss the edge-triggered read event, so wake it # up. - if self._send_evt and (result & __zmq__.POLLOUT): + if self._send_evt and (result & POLLOUT): self._send_evt.wake() - if self._recv_evt and (result & __zmq__.POLLIN): + if self._recv_evt and (result & POLLIN): self._recv_evt.wake() return result def _send_not_supported(self, msg, flags, copy, track): - raise __zmq__.ZMQError(__zmq__.ENOTSUP) + raise ZMQError(ENOTSUP) def _recv_not_supported(self, flags, copy, track): - raise __zmq__.ZMQError(__zmq__.ENOTSUP) + raise ZMQError(ENOTSUP) - @_wraps(__zmq__.Socket.send) + @_wraps(_Socket.send) def _xsafe_send(self, msg, flags=0, copy=True, track=False): """A send method that's safe to use when multiple greenthreads are calling send, send_multipart, recv and recv_multipart on the same socket. """ - if flags & __zmq__.NOBLOCK: - result = super(Socket, self).send(msg, flags, copy, track) + if flags & NOBLOCK: + result = _Socket_send(self, msg, flags, copy, track) if self._send_event or self._recv_event: - getsockopt(__zmq__.EVENTS) # triggers wakeups + self.getsockopt(EVENTS) # triggers wakeups return result # TODO: pyzmq will copy the message buffer and create Message # objects under some circumstances. We could do that work here # once to avoid doing it every time the send is retried. - flags |= __zmq__.NOBLOCK + flags |= NOBLOCK with self._send_lock: while True: try: - return super(Socket, self).send(msg, flags, copy, track) - except __zmq__.ZMQError, e: + return _Socket_send(self, msg, flags, copy, track) + except ZMQError, e: if e.errno == EAGAIN: - self._trampoline(True) + self._trampoline(self._send_event) else: raise finally: @@ -346,63 +352,63 @@ class Socket(__zmq__.Socket): self._recv_event.wake() - @_wraps(__zmq__.Socket.send_multipart) + @_wraps(_Socket.send_multipart) def _xsafe_send_multipart(self, msg_parts, flags=0, copy=True, track=False): """A send_multipart method that's safe to use when multiple greenthreads are calling send, send_multipart, recv and recv_multipart on the same socket. """ - if flags & __zmq__.NOBLOCK: - return super(Socket, self).send_multipart(msg_parts, flags, copy, track) + if flags & NOBLOCK: + return _Socket_send_multipart(self, msg_parts, flags, copy, track) # acquire lock here so the subsequent calls to send for the # message parts after the first don't block with self._send_lock: - return super(Socket, self).send_multipart(msg_parts, flags, copy, track) + return _Socket_send_multipart(self, msg_parts, flags, copy, track) - @_wraps(__zmq__.Socket.recv) + @_wraps(_Socket.recv) def _xsafe_recv(self, flags=0, copy=True, track=False): """A recv method that's safe to use when multiple greenthreads are calling send, send_multipart, recv and recv_multipart on the same socket. """ - if flags & __zmq__.NOBLOCK: - msg = super(Socket, self).recv(flags, copy, track) + if flags & NOBLOCK: + msg = _Socket_recv(self, flags, copy, track) if self._send_event or self._recv_event: - getsockopt(__zmq__.EVENTS) # triggers wakeups + self.getsockopt(EVENTS) # triggers wakeups return msg - flags |= __zmq__.NOBLOCK + flags |= NOBLOCK with self._recv_lock: while True: try: try: - return super(Socket, self).recv(flags, copy, track) + return _Socket_recv(self, flags, copy, track) finally: # The call to recv processes 0mq events and may # make the socket ready to send. Wake the next # receiver. (Could check EVENTS for POLLOUT here) if self._send_event: self._send_event.wake() - except __zmq__.ZMQError, e: + except ZMQError, e: if e.errno == EAGAIN: - self._trampoline(False) + self._trampoline(self._recv_event) else: raise - @_wraps(__zmq__.Socket.recv_multipart) + @_wraps(_Socket.recv_multipart) def _xsafe_recv_multipart(self, flags=0, copy=True, track=False): """A recv_multipart method that's safe to use when multiple greenthreads are calling send, send_multipart, recv and recv_multipart on the same socket. """ - if flags & __zmq__.NOBLOCK: - return super(Socket, self).recv_multipart(flags, copy, track) + if flags & NOBLOCK: + return _Socket_recv_multipart(self, flags, copy, track) # acquire lock here so the subsequent calls to recv for the # message parts after the first don't block with self._recv_lock: - return super(Socket, self).recv_multipart(flags, copy, track) + return _Socket_recv_multipart(self, flags, copy, track) # The behavior of the send and recv methods depends on the socket # type. See http://api.zeromq.org/2-1:zmq-socket for explanation @@ -414,19 +420,19 @@ class Socket(__zmq__.Socket): _full_ops = (_xsafe_send, _xsafe_send_multipart, _xsafe_recv, _xsafe_recv_multipart) _eventlet_ops = { - __zmq__.PUB: _send_only_ops, - __zmq__.SUB: _recv_only_ops, + PUB: _send_only_ops, + SUB: _recv_only_ops, - __zmq__.PUSH: _send_only_ops, - __zmq__.PULL: _recv_only_ops, + PUSH: _send_only_ops, + PULL: _recv_only_ops, - __zmq__.PAIR: _full_ops + PAIR: _full_ops } try: - _eventlet_ops[__zmq__.XREP] = _full_ops - _eventlet_ops[__zmq__.XREQ] = _full_ops + _eventlet_ops[XREP] = _full_ops + _eventlet_ops[XREQ] = _full_ops except AttributeError: # XREP and XREQ are being renamed ROUTER and DEALER - _eventlet_ops[__zmq__.ROUTER] = _full_ops - _eventlet_ops[__zmq__.DEALER] = _full_ops + _eventlet_ops[ROUTER] = _full_ops + _eventlet_ops[DEALER] = _full_ops