diff --git a/oslo_messaging/_drivers/pika_driver/pika_listener.py b/oslo_messaging/_drivers/pika_driver/pika_listener.py index 8eff1feb7..2c33168e5 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_listener.py +++ b/oslo_messaging/_drivers/pika_driver/pika_listener.py @@ -97,17 +97,27 @@ class RpcReplyPikaListener(object): """ while self._reply_poller: try: - message = self._reply_poller.poll() - if message is None: + try: + messages = self._reply_poller.poll() + except pika_drv_exc.EstablishConnectionException: + LOG.exception("Problem during establishing connection for " + "reply polling") + time.sleep( + self._pika_engine.host_connection_reconnect_delay + ) continue - message.acknowledge() - future = self._reply_waiting_futures.pop(message.msg_id, None) - if future is not None: - future.set_result(message) - except pika_drv_exc.EstablishConnectionException: - LOG.exception("Problem during establishing connection for " - "reply polling") - time.sleep(self._pika_engine.host_connection_reconnect_delay) + + for message in messages: + try: + message.acknowledge() + future = self._reply_waiting_futures.pop( + message.msg_id, None + ) + if future is not None: + future.set_result(message) + except Exception: + LOG.exception("Unexpected exception during processing" + "reply message") except BaseException: LOG.exception("Unexpected exception during reply polling") diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py index 185c8d02a..1390ced75 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_poller.py +++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py @@ -12,7 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -import collections import threading import time @@ -31,27 +30,30 @@ class PikaPoller(object): connectivity related problem detected """ - def __init__(self, pika_engine, prefetch_count): + def __init__(self, pika_engine, prefetch_count, + incoming_message_class=pika_drv_msg.PikaIncomingMessage): """Initialize required fields :param pika_engine: PikaEngine, shared object with configuration and shared driver functionality :param prefetch_count: Integer, maximum count of unacknowledged messages which RabbitMQ broker sends to this consumer + :param incoming_message_class: PikaIncomingMessage, wrapper for + consumed RabbitMQ message """ self._pika_engine = pika_engine + self._prefetch_count = prefetch_count + self._incoming_message_class = incoming_message_class self._connection = None self._channel = None self._lock = threading.Lock() - self._prefetch_count = prefetch_count - self._started = False self._queues_to_consume = None - self._message_queue = collections.deque() + self._message_queue = [] def _reconnect(self): """Performs reconnection to the broker. It is unsafe method for @@ -106,7 +108,10 @@ class PikaPoller(object): no_ack=True mode """ self._message_queue.append( - (self._channel, method, properties, body, True) + self._incoming_message_class( + self._pika_engine, self._channel, method, properties, body, + True + ) ) def _on_message_with_ack_callback(self, unused, method, properties, body): @@ -114,7 +119,10 @@ class PikaPoller(object): no_ack=False mode """ self._message_queue.append( - (self._channel, method, properties, body, False) + self._incoming_message_class( + self._pika_engine, self._channel, method, properties, body, + False + ) ) def _cleanup(self): @@ -137,17 +145,19 @@ class PikaPoller(object): LOG.exception("Unexpected error during closing connection") self._connection = None - def poll(self, timeout=None): + def poll(self, timeout=None, prefetch_size=1): """Main method of this class - consumes message from RabbitMQ :param: timeout: float, seconds, timeout for waiting new incoming message, None means wait forever - :return: tuple, RabbitMQ message related data - (channel, method, properties, body, no_ack) + :param: prefetch_size: Integer, count of messages which we are want to + poll. It blocks until prefetch_size messages are consumed or until + timeout gets expired + :return: list of PikaIncomingMessage, RabbitMQ messages """ expiration_time = time.time() + timeout if timeout else None - while not self._message_queue: + while len(self._message_queue) < prefetch_size: with self._lock: if not self._started: return None @@ -162,15 +172,17 @@ class PikaPoller(object): self._connection.process_data_events( time_limit=0.25 ) - except Exception: + except Exception as e: + LOG.warn("Exception during consuming message. " + str(e)) self._cleanup() - raise if timeout is not None: timeout = expiration_time - time.time() if timeout <= 0: - return None + break - return self._message_queue.popleft() + result = self._message_queue[:prefetch_size] + self._message_queue = self._message_queue[prefetch_size:] + return result def start(self): """Starts poller. Should be called before polling to allow message @@ -226,7 +238,9 @@ class RpcServicePikaPoller(PikaPoller): self._target = target super(RpcServicePikaPoller, self).__init__( - pika_engine, prefetch_count=prefetch_count) + pika_engine, prefetch_count=prefetch_count, + incoming_message_class=pika_drv_msg.RpcPikaIncomingMessage + ) def _declare_queue_binding(self): """Overrides base method and perform declaration of RabbitMQ exchanges @@ -274,21 +288,6 @@ class RpcServicePikaPoller(PikaPoller): ) return queues_to_consume - def poll(self, timeout=None): - """Overrides base method and wrap RabbitMQ message into - RpcPikaIncomingMessage - - :param: timeout: float, seconds, timeout for waiting new incoming - message, None means wait forever - :return: RpcPikaIncomingMessage, consumed RPC message - """ - msg = super(RpcServicePikaPoller, self).poll(timeout) - if msg is None: - return None - return pika_drv_msg.RpcPikaIncomingMessage( - self._pika_engine, *msg - ) - class RpcReplyPikaPoller(PikaPoller): """PikaPoller implementation for polling RPC reply messages. Overrides @@ -309,7 +308,8 @@ class RpcReplyPikaPoller(PikaPoller): self._queue = queue super(RpcReplyPikaPoller, self).__init__( - pika_engine, prefetch_count + pika_engine=pika_engine, prefetch_count=prefetch_count, + incoming_message_class=pika_drv_msg.RpcReplyPikaIncomingMessage ) def _declare_queue_binding(self): @@ -355,21 +355,6 @@ class RpcReplyPikaPoller(PikaPoller): retrier(self.reconnect)() - def poll(self, timeout=None): - """Overrides base method and wrap RabbitMQ message into - RpcReplyPikaIncomingMessage - - :param: timeout: float, seconds, timeout for waiting new incoming - message, None means wait forever - :return: RpcReplyPikaIncomingMessage, consumed RPC reply message - """ - msg = super(RpcReplyPikaPoller, self).poll(timeout) - if msg is None: - return None - return pika_drv_msg.RpcReplyPikaIncomingMessage( - self._pika_engine, *msg - ) - class NotificationPikaPoller(PikaPoller): """PikaPoller implementation for polling Notification messages. Overrides @@ -421,18 +406,3 @@ class NotificationPikaPoller(PikaPoller): queues_to_consume[queue] = False return queues_to_consume - - def poll(self, timeout=None): - """Overrides base method and wrap RabbitMQ message into - PikaIncomingMessage - - :param: timeout: float, seconds, timeout for waiting new incoming - message, None means wait forever - :return: PikaIncomingMessage, consumed Notification message - """ - msg = super(NotificationPikaPoller, self).poll(timeout) - if msg is None: - return None - return pika_drv_msg.PikaIncomingMessage( - self._pika_engine, *msg - )