diff --git a/AUTHORS b/AUTHORS index 4b105ff..70f27da 100644 --- a/AUTHORS +++ b/AUTHORS @@ -143,3 +143,4 @@ Thanks To * Matthew D. Pagel * Matt Yule-Bennett * Artur Stawiarski +* Tal Wrii diff --git a/eventlet/green/zmq.py b/eventlet/green/zmq.py index 97cf788..b3e70e2 100644 --- a/eventlet/green/zmq.py +++ b/eventlet/green/zmq.py @@ -95,11 +95,14 @@ class _BlockedThread(object): __bool__ = __nonzero__ - def block(self): + def block(self, deadline=None): if self._blocked_thread is not None: raise Exception("Cannot block more than one thread on one BlockedThread") self._blocked_thread = greenlet.getcurrent() + if deadline is not None: + self._hub.schedule_call_local(deadline - self._hub.clock(), self.wake) + try: self._hub.switch() finally: @@ -245,6 +248,7 @@ class Socket(_Socket): event, lambda _: None, lambda: None) + self.__dict__['_eventlet_clock'] = hub.clock @_wraps(_Socket.close) def close(self, linger=None): @@ -376,6 +380,16 @@ class Socket(_Socket): self._eventlet_recv_event.wake() return msg + deadline = None + if hasattr(__zmq__, 'RCVTIMEO'): + sock_timeout = self.getsockopt(__zmq__.RCVTIMEO) + if sock_timeout == -1: + pass + elif sock_timeout > 0: + deadline = self._eventlet_clock() + sock_timeout / 1000.0 + else: + raise ValueError(sock_timeout) + flags |= NOBLOCK with self._eventlet_recv_lock: while True: @@ -383,7 +397,12 @@ class Socket(_Socket): return _Socket_recv(self, flags, copy, track) except ZMQError as e: if e.errno == EAGAIN: - self._eventlet_recv_event.block() + # zmq in its wisdom decided to reuse EAGAIN for timeouts + if deadline is not None and self._eventlet_clock() > deadline: + e.is_timeout = True + raise + + self._eventlet_recv_event.block(deadline=deadline) else: raise finally: diff --git a/tests/zmq_test.py b/tests/zmq_test.py index 5c1a25f..601878f 100644 --- a/tests/zmq_test.py +++ b/tests/zmq_test.py @@ -597,3 +597,16 @@ def test_recv_json_no_args(): with clean_pair(zmq.REQ, zmq.REP) as (s1, s2, _): eventlet.spawn(s1.send_json, {}) s2.recv_json() + + +@tests.skip_unless(zmq_supported) +def test_recv_timeout(): + # https://github.com/eventlet/eventlet/issues/282 + with clean_pair(zmq.PUB, zmq.SUB) as (_, sub, _): + sub.setsockopt(zmq.RCVTIMEO, 100) + try: + with eventlet.Timeout(1, False): + sub.recv() + assert False + except zmq.ZMQError as e: + assert eventlet.is_timeout(e)