Add an optional timeout parameter to Listener.poll

For asynchronous programming, a timeout parameter is required on the listener
to allow to stop it at exit. poll() returns None on timeout.

It plan to use it in my new asyncio (Trollius) executor:
https://review.openstack.org/#/c/70983/

See also the related blueprint for the rationale:
https://wiki.openstack.org/wiki/Oslo/blueprints/asyncio

Change-Id: I918ae3c267743a0eaed1d6a210c79fb4a0eb8733
This commit is contained in:
Victor Stinner 2014-02-04 16:06:42 +01:00
parent 7d927e5e10
commit 7fe2ef7334
4 changed files with 38 additions and 9 deletions

View File

@ -17,6 +17,7 @@ __all__ = ['AMQPDriverBase']
import logging
import threading
import time
import uuid
from six import moves
@ -103,11 +104,21 @@ class AMQPListener(base.Listener):
ctxt.msg_id,
ctxt.reply_q))
def poll(self):
def poll(self, timeout=None):
if timeout is not None:
deadline = time.time() + timeout
else:
deadline = None
while True:
if self.incoming:
return self.incoming.pop(0)
self.conn.consume(limit=1)
if deadline is not None:
timeout = deadline - time.time()
if timeout < 0:
return None
self.conn.consume(limit=1, timeout=timeout)
else:
self.conn.consume(limit=1)
class ReplyWaiters(object):

View File

@ -53,8 +53,11 @@ class Listener(object):
self.driver = driver
@abc.abstractmethod
def poll(self):
"Blocking until a message is pending and return IncomingMessage."
def poll(self, timeout=None):
"""Blocking until a message is pending and return IncomingMessage.
Return None after timeout seconds if timeout is set and no message is
ending.
"""
@six.add_metaclass(abc.ABCMeta)

View File

@ -45,7 +45,11 @@ class FakeListener(base.Listener):
self._exchange_manager = exchange_manager
self._targets = targets
def poll(self):
def poll(self, timeout=None):
if timeout is not None:
deadline = time.time() + timeout
else:
deadline = None
while True:
for target in self._targets:
exchange = self._exchange_manager.get_exchange(target.exchange)
@ -54,7 +58,15 @@ class FakeListener(base.Listener):
message = FakeIncomingMessage(self, ctxt, message,
reply_q, requeue)
return message
time.sleep(.05)
if deadline is not None:
pause = deadline - time.time()
if pause < 0:
break
pause = min(pause, 0.050)
else:
pause = 0.050
time.sleep(pause)
return None
class FakeExchange(object):

View File

@ -879,9 +879,12 @@ class ZmqListener(base.Listener):
else:
return incoming.received.reply
def poll(self):
while True:
return self.incoming_queue.get()
def poll(self, timeout=None):
try:
return self.incoming_queue.get(timeout=timeout)
except six.moves.queue.Empty:
# timeout
return None
class ZmqDriver(base.BaseDriver):