diff --git a/eventlet/green/zmq.py b/eventlet/green/zmq.py index a4e03e0..ab936f2 100644 --- a/eventlet/green/zmq.py +++ b/eventlet/green/zmq.py @@ -63,8 +63,6 @@ _disable_recv_types = set([__zmq__.PUB, __zmq__.PUSH]) # TODO: # - Ensure that recv* and send* methods raise error when called on a # closed socket. They should not block. -# - Ensure that recv* and send* methods raise EFSM error when socket -# is in improper state. Avoid blocking. # - Return correct message tracker from send* methods # - Make MessageTracker.wait zmq friendly # - What should happen to threads blocked on send/recv when socket is @@ -117,23 +115,44 @@ class Socket(__zmq__.Socket): elif socket_type in _disable_recv_types: self.recv = self.recv_multipart = self._recv_not_supported - def _sock_wait(self, read=False, write=False): - """ - First checks if there are events in the socket, to avoid - edge trigger problems with race conditions. Then if there - are none it will trampoline and when coming back check - for the events. - """ - events = self._super_getsockopt(__zmq__.EVENTS) + def _trampoline(self): + """Wait for events on the zmq socket. After this method + returns it is still possible that send and recv will return + EAGAIN. - if read and (events & __zmq__.POLLIN): - return events - elif write and (events & __zmq__.POLLOUT): - return events - else: - # ONLY trampoline on read events for the zmq FD + Because the zmq FD is edge triggered, any call that causes the + zmq socket to process its events must wake the greenthread + that called trampoline by calling _wake_listener in case it + missed the event.""" + try: + self._blocked_thread = greenlet.getcurrent() + # Only trampoline on read events for zmq FDs, never write. trampoline(self._fd, read=True) - return self._super_getsockopt(__zmq__.EVENTS) + finally: + self._blocked_thread = None + # Either the fd is readable or we were woken by + # another thread. Cleanup the wakeup timer. + t = self._wakeup_timer + if t is not None: + # Important to cancel the timer so it doesn't + # spuriously wake this greenthread later on. + t.cancel() + self._wakeup_timer = None + + def _wake_listener(self): + """If a thread has called trampoline, wake it up. This can + safely be called multiple times and will have no effect if the + thread has already been woken up. + + Returns True if there is a listener thread that called + trampoline, False if not.""" + is_listener = self._blocked_thread is not None + + if is_listener and self._wakeup_timer is None: + self._wakeup_timer = hubs.get_hub().schedule_call_global(0, self._blocked_thread.switch) + return True + + return is_listener def send(self, msg, flags=0, copy=True, track=False): """ @@ -142,17 +161,17 @@ class Socket(__zmq__.Socket): called in real code. """ if flags & __zmq__.NOBLOCK: - super(Socket, self).send(msg, flags=flags, copy=copy, track=track) - return + return super(Socket, self).send(msg, flags, copy, track) flags |= __zmq__.NOBLOCK while True: try: - self._sock_wait(write=True) - return super(Socket, self).send(msg, flags=flags, copy=copy, track=track) + return super(Socket, self).send(msg, flags, copy, track) except __zmq__.ZMQError, e: - if e.errno != EAGAIN: + if e.errno == EAGAIN: + self._trampoline() + else: raise def recv(self, flags=0, copy=True, track=False): @@ -162,16 +181,17 @@ class Socket(__zmq__.Socket): called in real code. """ if flags & __zmq__.NOBLOCK: - return super(Socket, self).recv(flags=flags, copy=copy, track=track) + return super(Socket, self).recv(flags, copy, track) flags |= __zmq__.NOBLOCK while True: try: - self._sock_wait(read=True) - return super(Socket, self).recv(flags=flags, copy=copy, track=track) + return super(Socket, self).recv(flags, copy, track) except __zmq__.ZMQError, e: - if e.errno != EAGAIN: + if e.errno == EAGAIN: + self._trampoline() + else: raise def getsockopt(self, option): @@ -189,10 +209,10 @@ class Socket(__zmq__.Socket): return result - def _send_not_supported(self, msg, flags=0, copy=True, track=False): + def _send_not_supported(self, msg, flags, copy, track): raise __zmq__.ZMQError(__zmq__.ENOTSUP) - def _recv_not_supported(self, flags=0, copy=True, track=False): + def _recv_not_supported(self, flags, copy, track): raise __zmq__.ZMQError(__zmq__.ENOTSUP) def _xsafe_send(self, msg, flags=0, copy=True, track=False): @@ -203,7 +223,7 @@ class Socket(__zmq__.Socket): """ if flags & __zmq__.NOBLOCK: raise __zmq__.ZMQError(__zmq__.ENOTSUP) - result = super(Socket, self).send(msg, flags=flags, copy=copy, track=track) + result = super(Socket, self).send(msg, flags, copy, track) self._wake_listener() return result @@ -220,7 +240,7 @@ class Socket(__zmq__.Socket): if flags & __zmq__.NOBLOCK: raise __zmq__.ZMQError(__zmq__.ENOTSUP) - result = super(Socket, self).send_multipart(msg_parts, flags=flags, copy=copy, track=track) + result = super(Socket, self).send_multipart(msg_parts, flags, copy, track) self._wake_listener() return result @@ -233,11 +253,9 @@ class Socket(__zmq__.Socket): # immediately. This is the fast path. try: if multi: - r = super(Socket, self).send_multipart( - msg, flags=flags, copy=copy, track=track) + r = super(Socket, self).send_multipart(msg, flags, copy, track) else: - r = super(Socket, self).send( - msg, flags=flags, copy=copy, track=track) + r = super(Socket, self).send( msg, flags, copy, track) self._wake_listener() return r @@ -262,7 +280,7 @@ class Socket(__zmq__.Socket): if flags & __zmq__.NOBLOCK: raise __zmq__.ZMQError(__zmq__.ENOTSUP) - msg = super(Socket, self).recv(flags=flags, copy=copy, track=track) + msg = super(Socket, self).recv(flags, copy, track) self._wake_listener() return msg @@ -276,7 +294,7 @@ class Socket(__zmq__.Socket): """ if flags & __zmq__.NOBLOCK: raise __zmq__.ZMQError(__zmq__.ENOTSUP) - msg = super(Socket, self).recv_multipart(flags=flags, copy=copy, track=track) + msg = super(Socket, self).recv_multipart(flags, copy, track) self._wake_listener() return msg @@ -289,11 +307,9 @@ class Socket(__zmq__.Socket): # immediately. This is the fast path. try: if multi: - msg = super(Socket, self).recv_multipart( - flags=flags, copy=copy, track=track) + msg = super(Socket, self).recv_multipart(flags, copy, track) else: - msg = super(Socket, self).recv( - flags=flags, copy=copy, track=track) + msg = super(Socket, self).recv(flags, copy, track) self._wake_listener() return msg @@ -317,15 +333,6 @@ class Socket(__zmq__.Socket): return self._process_queues() - def _wake_listener(self): - is_listener = self._blocked_thread is not None - - if is_listener and self._wakeup_timer is None: - self._wakeup_timer = hubs.get_hub().schedule_call_global(0, self._blocked_thread.switch) - return True - - return is_listener - def _process_queues(self): """ If there are readers or writers queued, this method tries to recv or send messages and ensures processing continues @@ -383,21 +390,8 @@ class Socket(__zmq__.Socket): if next_thread: # Only trampoline if this thread is the next reader or writer if next_reader is current or next_writer is current: - try: - self._blocked_thread = current - # Only trampoline on read events for zmq FDs, never write. - trampoline(self._fd, read=True) - continue - finally: - self._blocked_thread = None - # Either the fd is readable or we were woken by - # another thread. Cleanup the wakeup timer. - t = self._wakeup_timer - if t is not None: - # Important to cancel the timer so it doesn't - # spuriously wake this greenthread later on. - t.cancel() - self._wakeup_timer = None + self._trampoline() + continue else: # This greenthread's work is done. Wake another to # continue processing the queues if there is one @@ -423,9 +417,9 @@ class Socket(__zmq__.Socket): writer, multi, msg, flags, copy, track = writers[0] try: if multi: - r = super_send_multipart(msg, flags=flags, copy=copy, track=track) + r = super_send_multipart(msg, flags, copy, track) else: - r = super_send(msg, flags=flags, copy=copy, track=track) + r = super_send(msg, flags, copy, track) # remember this thread's result if current is writer: