From 83a08d4b7ed36019f17a070a43ba6e4863cafe34 Mon Sep 17 00:00:00 2001 From: Dmitriy Ukhlov Date: Mon, 14 Dec 2015 11:36:28 +0200 Subject: [PATCH] Adds unit tests for pika_poll module Change-Id: I69cc0e0302382ab45ba464bb5993300d44679106 --- .../_drivers/pika_driver/pika_engine.py | 3 + .../_drivers/pika_driver/pika_message.py | 33 +- .../_drivers/pika_driver/pika_poller.py | 79 ++- .../tests/drivers/pika/test_message.py | 28 +- .../tests/drivers/pika/test_poller.py | 536 ++++++++++++++++++ 5 files changed, 605 insertions(+), 74 deletions(-) create mode 100644 oslo_messaging/tests/drivers/pika/test_poller.py diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py index 6e877bb2e..4f38295a8 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_engine.py +++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py @@ -200,6 +200,9 @@ class PikaEngine(object): self._connection_host_param_list = [] self._connection_host_status_list = [] + if not url.hosts: + raise ValueError("You should provide at least one RabbitMQ host") + for transport_host in url.hosts: pika_params = common_pika_params.copy() pika_params.update( diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py index edd5c7328..eac2be938 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_message.py +++ b/oslo_messaging/_drivers/pika_driver/pika_message.py @@ -72,18 +72,17 @@ class PikaIncomingMessage(object): information from RabbitMQ message and provide access to it """ - def __init__(self, pika_engine, channel, method, properties, body, no_ack): + def __init__(self, pika_engine, channel, method, properties, body): """Parse RabbitMQ message :param pika_engine: PikaEngine, shared object with configuration and shared driver functionality :param channel: Channel, RabbitMQ channel which was used for - this message delivery + this message delivery, used for sending ack back. + If None - ack is not required :param method: Method, RabbitMQ message method :param properties: Properties, RabbitMQ message properties :param body: Bytes, RabbitMQ message body - :param no_ack: Boolean, defines should this message be acked by - consumer or not """ headers = getattr(properties, "headers", {}) version = headers.get(_VERSION_HEADER, None) @@ -93,7 +92,6 @@ class PikaIncomingMessage(object): "{}".format(version, _VERSION)) self._pika_engine = pika_engine - self._no_ack = no_ack self._channel = channel self._delivery_tag = method.delivery_tag @@ -128,12 +126,15 @@ class PikaIncomingMessage(object): self.message = message_dict self.ctxt = context_dict + def need_ack(self): + return self._channel is not None + def acknowledge(self): """Ack the message. Should be called by message processing logic when it considered as consumed (means that we don't need redelivery of this message anymore) """ - if not self._no_ack: + if self.need_ack(): self._channel.basic_ack(delivery_tag=self._delivery_tag) def requeue(self): @@ -141,7 +142,7 @@ class PikaIncomingMessage(object): when it can not process the message right now and should be redelivered later if it is possible """ - if not self._no_ack: + if self.need_ack(): return self._channel.basic_nack(delivery_tag=self._delivery_tag, requeue=True) @@ -152,22 +153,21 @@ class RpcPikaIncomingMessage(PikaIncomingMessage): method added to allow consumer to send RPC reply back to the RPC client """ - def __init__(self, pika_engine, channel, method, properties, body, no_ack): + def __init__(self, pika_engine, channel, method, properties, body): """Defines default values of msg_id and reply_q fields and just call super.__init__ method :param pika_engine: PikaEngine, shared object with configuration and shared driver functionality :param channel: Channel, RabbitMQ channel which was used for - this message delivery + this message delivery, used for sending ack back. + If None - ack is not required :param method: Method, RabbitMQ message method :param properties: Properties, RabbitMQ message properties :param body: Bytes, RabbitMQ message body - :param no_ack: Boolean, defines should this message be acked by - consumer or not """ super(RpcPikaIncomingMessage, self).__init__( - pika_engine, channel, method, properties, body, no_ack + pika_engine, channel, method, properties, body ) self.reply_q = properties.reply_to self.msg_id = properties.correlation_id @@ -231,7 +231,7 @@ class RpcReplyPikaIncomingMessage(PikaIncomingMessage): """PikaIncomingMessage implementation for RPC reply messages. It expects extra RPC reply related fields in message body (result and failure). """ - def __init__(self, pika_engine, channel, method, properties, body, no_ack): + def __init__(self, pika_engine, channel, method, properties, body): """Defines default values of result and failure fields, call super.__init__ method and then construct Exception object if failure is not None @@ -239,15 +239,14 @@ class RpcReplyPikaIncomingMessage(PikaIncomingMessage): :param pika_engine: PikaEngine, shared object with configuration and shared driver functionality :param channel: Channel, RabbitMQ channel which was used for - this message delivery + this message delivery, used for sending ack back. + If None - ack is not required :param method: Method, RabbitMQ message method :param properties: Properties, RabbitMQ message properties :param body: Bytes, RabbitMQ message body - :param no_ack: Boolean, defines should this message be acked by - consumer or not """ super(RpcReplyPikaIncomingMessage, self).__init__( - pika_engine, channel, method, properties, body, no_ack + pika_engine, channel, method, properties, body ) self.msg_id = properties.correlation_id diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py index 5aa948a2e..3533dad2f 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_poller.py +++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py @@ -31,8 +31,7 @@ class PikaPoller(object): connectivity related problem detected """ - def __init__(self, pika_engine, prefetch_count, - incoming_message_class=pika_drv_msg.PikaIncomingMessage): + def __init__(self, pika_engine, prefetch_count, incoming_message_class): """Initialize required fields :param pika_engine: PikaEngine, shared object with configuration and @@ -110,8 +109,7 @@ class PikaPoller(object): """ self._message_queue.append( self._incoming_message_class( - self._pika_engine, self._channel, method, properties, body, - True + self._pika_engine, None, method, properties, body ) ) @@ -121,8 +119,7 @@ class PikaPoller(object): """ self._message_queue.append( self._incoming_message_class( - self._pika_engine, self._channel, method, properties, body, - False + self._pika_engine, self._channel, method, properties, body ) ) @@ -146,6 +143,11 @@ class PikaPoller(object): LOG.exception("Unexpected error during closing connection") self._connection = None + for i in xrange(len(self._message_queue) - 1, -1, -1): + message = self._message_queue[i] + if message.need_ack(): + del self._message_queue[i] + def poll(self, timeout=None, prefetch_size=1): """Main method of this class - consumes message from RabbitMQ @@ -158,32 +160,29 @@ class PikaPoller(object): """ expiration_time = time.time() + timeout if timeout else None - while len(self._message_queue) < prefetch_size: + while True: with self._lock: - if not self._started: - return None - - try: - if self._channel is None: - self._reconnect() - # we need some time_limit here, not too small to avoid a - # lot of not needed iterations but not too large to release - # lock time to time and give a chance to perform another - # method waiting this lock - self._connection.process_data_events( - time_limit=0.25 - ) - except Exception as e: - LOG.warn("Exception during consuming message. " + str(e)) - self._cleanup() - if timeout is not None: - timeout = expiration_time - time.time() - if timeout <= 0: - break - - result = self._message_queue[:prefetch_size] - self._message_queue = self._message_queue[prefetch_size:] - return result + if timeout is not None: + timeout = expiration_time - time.time() + if (len(self._message_queue) < prefetch_size and + self._started and ((timeout is None) or timeout > 0)): + try: + if self._channel is None: + self._reconnect() + # we need some time_limit here, not too small to avoid + # a lot of not needed iterations but not too large to + # release lock time to time and give a chance to + # perform another method waiting this lock + self._connection.process_data_events( + time_limit=0.25 + ) + except pika_pool.Connection.connectivity_errors: + self._cleanup() + raise + else: + result = self._message_queue[:prefetch_size] + del self._message_queue[:prefetch_size] + return result def start(self): """Starts poller. Should be called before polling to allow message @@ -201,7 +200,6 @@ class PikaPoller(object): return self._started = False - self._cleanup() def reconnect(self): """Safe version of _reconnect. Performs reconnection to the broker.""" @@ -249,9 +247,7 @@ class RpcServicePikaPoller(PikaPoller): :return Dictionary, declared_queue_name -> no_ack_mode """ - queue_expiration = ( - self._pika_engine.conf.oslo_messaging_pika.rpc_queue_expiration - ) + queue_expiration = self._pika_engine.rpc_queue_expiration queues_to_consume = {} @@ -319,15 +315,11 @@ class RpcReplyPikaPoller(PikaPoller): :return Dictionary, declared_queue_name -> no_ack_mode """ - queue_expiration = ( - self._pika_engine.conf.oslo_messaging_pika.rpc_queue_expiration - ) - self._pika_engine.declare_queue_binding_by_channel( channel=self._channel, exchange=self._exchange, queue=self._queue, routing_key=self._queue, exchange_type='direct', - queue_expiration=queue_expiration, + queue_expiration=self._pika_engine.rpc_queue_expiration, durable=False ) @@ -363,8 +355,8 @@ class NotificationPikaPoller(PikaPoller): """ def __init__(self, pika_engine, targets_and_priorities, queue_name=None, prefetch_count=100): - """Adds exchange and queue parameter for declaring exchange and queue - used for RPC reply delivery + """Adds targets_and_priorities and queue_name parameter + for declaring exchanges and queues used for notification delivery :param pika_engine: PikaEngine, shared object with configuration and shared driver functionality @@ -379,7 +371,8 @@ class NotificationPikaPoller(PikaPoller): self._queue_name = queue_name super(NotificationPikaPoller, self).__init__( - pika_engine, prefetch_count=prefetch_count + pika_engine, prefetch_count=prefetch_count, + incoming_message_class=pika_drv_msg.PikaIncomingMessage ) def _declare_queue_binding(self): diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py index 5008ce36e..3c3f87e39 100644 --- a/oslo_messaging/tests/drivers/pika/test_message.py +++ b/oslo_messaging/tests/drivers/pika/test_message.py @@ -46,7 +46,7 @@ class PikaIncomingMessageTestCase(unittest.TestCase): def test_message_body_parsing(self): message = pika_drv_msg.PikaIncomingMessage( self._pika_engine, self._channel, self._method, self._properties, - self._body, True + self._body ) self.assertEqual(message.ctxt.get("key_context", None), @@ -57,7 +57,7 @@ class PikaIncomingMessageTestCase(unittest.TestCase): def test_message_acknowledge(self): message = pika_drv_msg.PikaIncomingMessage( self._pika_engine, self._channel, self._method, self._properties, - self._body, False + self._body ) message.acknowledge() @@ -68,8 +68,8 @@ class PikaIncomingMessageTestCase(unittest.TestCase): def test_message_acknowledge_no_ack(self): message = pika_drv_msg.PikaIncomingMessage( - self._pika_engine, self._channel, self._method, self._properties, - self._body, True + self._pika_engine, None, self._method, self._properties, + self._body ) message.acknowledge() @@ -79,7 +79,7 @@ class PikaIncomingMessageTestCase(unittest.TestCase): def test_message_requeue(self): message = pika_drv_msg.PikaIncomingMessage( self._pika_engine, self._channel, self._method, self._properties, - self._body, False + self._body ) message.requeue() @@ -90,8 +90,8 @@ class PikaIncomingMessageTestCase(unittest.TestCase): def test_message_requeue_no_ack(self): message = pika_drv_msg.PikaIncomingMessage( - self._pika_engine, self._channel, self._method, self._properties, - self._body, True + self._pika_engine, None, self._method, self._properties, + self._body ) message.requeue() @@ -126,7 +126,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase): message = pika_drv_msg.RpcPikaIncomingMessage( self._pika_engine, self._channel, self._method, self._properties, - self._body, True + self._body ) self.assertEqual(message.ctxt.get("key_context", None), @@ -140,7 +140,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase): def test_cast_message_body_parsing(self): message = pika_drv_msg.RpcPikaIncomingMessage( self._pika_engine, self._channel, self._method, self._properties, - self._body, True + self._body ) self.assertEqual(message.ctxt.get("key_context", None), @@ -156,7 +156,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase): def test_reply_for_cast_message(self, send_reply_mock): message = pika_drv_msg.RpcPikaIncomingMessage( self._pika_engine, self._channel, self._method, self._properties, - self._body, True + self._body ) self.assertEqual(message.ctxt.get("key_context", None), @@ -182,7 +182,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase): message = pika_drv_msg.RpcPikaIncomingMessage( self._pika_engine, self._channel, self._method, self._properties, - self._body, True + self._body ) self.assertEqual(message.ctxt.get("key_context", None), @@ -218,7 +218,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase): message = pika_drv_msg.RpcPikaIncomingMessage( self._pika_engine, self._channel, self._method, self._properties, - self._body, True + self._body ) self.assertEqual(message.ctxt.get("key_context", None), @@ -274,7 +274,7 @@ class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase): message = pika_drv_msg.RpcReplyPikaIncomingMessage( self._pika_engine, self._channel, self._method, self._properties, - body, True + body ) self.assertEqual(message.msg_id, 123456789) @@ -294,7 +294,7 @@ class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase): message = pika_drv_msg.RpcReplyPikaIncomingMessage( self._pika_engine, self._channel, self._method, self._properties, - body, True + body ) self.assertEqual(message.msg_id, 123456789) diff --git a/oslo_messaging/tests/drivers/pika/test_poller.py b/oslo_messaging/tests/drivers/pika/test_poller.py new file mode 100644 index 000000000..77a3b6b29 --- /dev/null +++ b/oslo_messaging/tests/drivers/pika/test_poller.py @@ -0,0 +1,536 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time +import unittest + +import mock + +from oslo_messaging._drivers.pika_driver import pika_poller + + +class PikaPollerTestCase(unittest.TestCase): + def setUp(self): + self._pika_engine = mock.Mock() + self._poller_connection_mock = mock.Mock() + self._poller_channel_mock = mock.Mock() + self._poller_connection_mock.channel.return_value = ( + self._poller_channel_mock + ) + self._pika_engine.create_connection.return_value = ( + self._poller_connection_mock + ) + self._prefetch_count = 123 + + @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller." + "_declare_queue_binding") + def test_poll(self, declare_queue_binding_mock): + incoming_message_class_mock = mock.Mock() + poller = pika_poller.PikaPoller( + self._pika_engine, self._prefetch_count, + incoming_message_class=incoming_message_class_mock + ) + unused = object() + method = object() + properties = object() + body = object() + + self._poller_connection_mock.process_data_events.side_effect = ( + lambda time_limit: poller._on_message_with_ack_callback( + unused, method, properties, body + ) + ) + + poller.start() + res = poller.poll() + + self.assertEqual(len(res), 1) + + self.assertEqual(res[0], incoming_message_class_mock.return_value) + incoming_message_class_mock.assert_called_once_with( + self._pika_engine, self._poller_channel_mock, method, properties, + body + ) + + self.assertTrue(self._pika_engine.create_connection.called) + self.assertTrue(self._poller_connection_mock.channel.called) + + self.assertTrue(declare_queue_binding_mock.called) + + @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller." + "_declare_queue_binding") + def test_poll_after_stop(self, declare_queue_binding_mock): + incoming_message_class_mock = mock.Mock() + poller = pika_poller.PikaPoller( + self._pika_engine, self._prefetch_count, + incoming_message_class=incoming_message_class_mock + ) + + n = 10 + params = [] + + for i in range(n): + params.append((object(), object(), object(), object())) + + index = [0] + + def f(time_limit): + for i in range(10): + poller._on_message_no_ack_callback( + *params[index[0]] + ) + index[0] += 1 + + self._poller_connection_mock.process_data_events.side_effect = f + + poller.start() + res = poller.poll(prefetch_size=1) + self.assertEqual(len(res), 1) + self.assertEqual(res[0], incoming_message_class_mock.return_value) + self.assertEqual( + incoming_message_class_mock.call_args_list[0][0], + (self._pika_engine, None) + params[0][1:] + ) + + poller.stop() + + res2 = poller.poll(prefetch_size=n) + + self.assertEqual(len(res2), n-1) + self.assertEqual(incoming_message_class_mock.call_count, n) + + self.assertEqual( + self._poller_connection_mock.process_data_events.call_count, 1) + + for i in range(n-1): + self.assertEqual(res2[i], incoming_message_class_mock.return_value) + self.assertEqual( + incoming_message_class_mock.call_args_list[i+1][0], + (self._pika_engine, None) + params[i+1][1:] + ) + + self.assertTrue(self._pika_engine.create_connection.called) + self.assertTrue(self._poller_connection_mock.channel.called) + + self.assertTrue(declare_queue_binding_mock.called) + + @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller." + "_declare_queue_binding") + def test_poll_batch(self, declare_queue_binding_mock): + incoming_message_class_mock = mock.Mock() + poller = pika_poller.PikaPoller( + self._pika_engine, self._prefetch_count, + incoming_message_class=incoming_message_class_mock + ) + + n = 10 + params = [] + + for i in range(n): + params.append((object(), object(), object(), object())) + + index = [0] + + def f(time_limit): + poller._on_message_with_ack_callback( + *params[index[0]] + ) + index[0] += 1 + + self._poller_connection_mock.process_data_events.side_effect = f + + poller.start() + res = poller.poll(prefetch_size=n) + + self.assertEqual(len(res), n) + self.assertEqual(incoming_message_class_mock.call_count, n) + + for i in range(n): + self.assertEqual(res[i], incoming_message_class_mock.return_value) + self.assertEqual( + incoming_message_class_mock.call_args_list[i][0], + (self._pika_engine, self._poller_channel_mock) + params[i][1:] + ) + + self.assertTrue(self._pika_engine.create_connection.called) + self.assertTrue(self._poller_connection_mock.channel.called) + + self.assertTrue(declare_queue_binding_mock.called) + + @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller." + "_declare_queue_binding") + def test_poll_batch_with_timeout(self, declare_queue_binding_mock): + incoming_message_class_mock = mock.Mock() + poller = pika_poller.PikaPoller( + self._pika_engine, self._prefetch_count, + incoming_message_class=incoming_message_class_mock + ) + + n = 10 + timeout = 1 + sleep_time = 0.2 + params = [] + + success_count = 5 + + for i in range(n): + params.append((object(), object(), object(), object())) + + index = [0] + + def f(time_limit): + time.sleep(sleep_time) + poller._on_message_with_ack_callback( + *params[index[0]] + ) + index[0] += 1 + + self._poller_connection_mock.process_data_events.side_effect = f + + poller.start() + res = poller.poll(prefetch_size=n, timeout=timeout) + + self.assertEqual(len(res), success_count) + self.assertEqual(incoming_message_class_mock.call_count, success_count) + + for i in range(success_count): + self.assertEqual(res[i], incoming_message_class_mock.return_value) + self.assertEqual( + incoming_message_class_mock.call_args_list[i][0], + (self._pika_engine, self._poller_channel_mock) + params[i][1:] + ) + + self.assertTrue(self._pika_engine.create_connection.called) + self.assertTrue(self._poller_connection_mock.channel.called) + + self.assertTrue(declare_queue_binding_mock.called) + + +class RpcServicePikaPollerTestCase(unittest.TestCase): + def setUp(self): + self._pika_engine = mock.Mock() + self._poller_connection_mock = mock.Mock() + self._poller_channel_mock = mock.Mock() + self._poller_connection_mock.channel.return_value = ( + self._poller_channel_mock + ) + self._pika_engine.create_connection.return_value = ( + self._poller_connection_mock + ) + + self._pika_engine.get_rpc_queue_name.side_effect = ( + lambda topic, server, no_ack: "_".join( + [topic, str(server), str(no_ack)] + ) + ) + + self._pika_engine.get_rpc_exchange_name.side_effect = ( + lambda exchange, topic, fanout, no_ack: "_".join( + [exchange, topic, str(fanout), str(no_ack)] + ) + ) + + self._prefetch_count = 123 + self._target = mock.Mock(exchange="exchange", topic="topic", + server="server") + self._pika_engine.rpc_queue_expiration = 12345 + + @mock.patch("oslo_messaging._drivers.pika_driver.pika_message." + "RpcPikaIncomingMessage") + def test_declare_rpc_queue_bindings(self, rpc_pika_incoming_message_mock): + poller = pika_poller.RpcServicePikaPoller( + self._pika_engine, self._target, self._prefetch_count, + ) + self._poller_connection_mock.process_data_events.side_effect = ( + lambda time_limit: poller._on_message_with_ack_callback( + None, None, None, None + ) + ) + + poller.start() + res = poller.poll() + + self.assertEqual(len(res), 1) + + self.assertEqual(res[0], rpc_pika_incoming_message_mock.return_value) + + self.assertTrue(self._pika_engine.create_connection.called) + self.assertTrue(self._poller_connection_mock.channel.called) + + declare_queue_binding_by_channel_mock = ( + self._pika_engine.declare_queue_binding_by_channel + ) + + self.assertEqual( + declare_queue_binding_by_channel_mock.call_count, 6 + ) + + declare_queue_binding_by_channel_mock.assert_has_calls(( + mock.call( + channel=self._poller_channel_mock, durable=False, + exchange="exchange_topic_False_True", + exchange_type='direct', + queue="topic_None_True", + queue_expiration=12345, + routing_key="topic_None_True" + ), + mock.call( + channel=self._poller_channel_mock, durable=False, + exchange="exchange_topic_False_True", + exchange_type='direct', + queue="topic_server_True", + queue_expiration=12345, + routing_key="topic_server_True" + ), + mock.call( + channel=self._poller_channel_mock, durable=False, + exchange="exchange_topic_True_True", + exchange_type='fanout', + queue="topic_server_True", + queue_expiration=12345, + routing_key='' + ), + mock.call( + channel=self._poller_channel_mock, durable=False, + exchange="exchange_topic_False_False", + exchange_type='direct', + queue="topic_None_False", + queue_expiration=12345, + routing_key="topic_None_False" + ), + mock.call( + channel=self._poller_channel_mock, durable=False, + exchange="exchange_topic_False_False", + exchange_type='direct', + queue="topic_server_False", + queue_expiration=12345, + routing_key="topic_server_False" + ), + mock.call( + channel=self._poller_channel_mock, durable=False, + exchange="exchange_topic_True_False", + exchange_type='fanout', + queue="topic_server_False", + queue_expiration=12345, + routing_key='' + ), + )) + + +class RpcReplyServicePikaPollerTestCase(unittest.TestCase): + def setUp(self): + self._pika_engine = mock.Mock() + self._poller_connection_mock = mock.Mock() + self._poller_channel_mock = mock.Mock() + self._poller_connection_mock.channel.return_value = ( + self._poller_channel_mock + ) + self._pika_engine.create_connection.return_value = ( + self._poller_connection_mock + ) + + self._prefetch_count = 123 + self._exchange = "rpc_reply_exchange" + self._queue = "rpc_reply_queue" + + self._pika_engine.rpc_reply_retry_delay = 12132543456 + + self._pika_engine.rpc_queue_expiration = 12345 + self._pika_engine.rpc_reply_retry_attempts = 3 + + def test_start(self): + poller = pika_poller.RpcReplyPikaPoller( + self._pika_engine, self._exchange, self._queue, + self._prefetch_count, + ) + + poller.start() + + self.assertTrue(self._pika_engine.create_connection.called) + self.assertTrue(self._poller_connection_mock.channel.called) + + def test_declare_rpc_reply_queue_binding(self): + poller = pika_poller.RpcReplyPikaPoller( + self._pika_engine, self._exchange, self._queue, + self._prefetch_count, + ) + + poller.start() + + declare_queue_binding_by_channel_mock = ( + self._pika_engine.declare_queue_binding_by_channel + ) + + self.assertEqual( + declare_queue_binding_by_channel_mock.call_count, 1 + ) + + declare_queue_binding_by_channel_mock.assert_called_once_with( + channel=self._poller_channel_mock, durable=False, + exchange='rpc_reply_exchange', exchange_type='direct', + queue='rpc_reply_queue', queue_expiration=12345, + routing_key='rpc_reply_queue' + ) + + +class NotificationPikaPollerTestCase(unittest.TestCase): + def setUp(self): + self._pika_engine = mock.Mock() + self._poller_connection_mock = mock.Mock() + self._poller_channel_mock = mock.Mock() + self._poller_connection_mock.channel.return_value = ( + self._poller_channel_mock + ) + self._pika_engine.create_connection.return_value = ( + self._poller_connection_mock + ) + + self._prefetch_count = 123 + self._target_and_priorities = ( + ( + mock.Mock(exchange="exchange1", topic="topic1", + server="server1"), 1 + ), + ( + mock.Mock(exchange="exchange1", topic="topic1"), 2 + ), + ( + mock.Mock(exchange="exchange2", topic="topic2",), 1 + ), + ) + self._pika_engine.notification_persistence = object() + + @mock.patch("oslo_messaging._drivers.pika_driver.pika_message." + "PikaIncomingMessage") + def test_declare_notification_queue_bindings_default_queue( + self, pika_incoming_message_mock): + poller = pika_poller.NotificationPikaPoller( + self._pika_engine, self._target_and_priorities, None, + self._prefetch_count, + ) + self._poller_connection_mock.process_data_events.side_effect = ( + lambda time_limit: poller._on_message_with_ack_callback( + None, None, None, None + ) + ) + + poller.start() + res = poller.poll() + + self.assertEqual(len(res), 1) + + self.assertEqual(res[0], pika_incoming_message_mock.return_value) + + self.assertTrue(self._pika_engine.create_connection.called) + self.assertTrue(self._poller_connection_mock.channel.called) + + declare_queue_binding_by_channel_mock = ( + self._pika_engine.declare_queue_binding_by_channel + ) + + self.assertEqual( + declare_queue_binding_by_channel_mock.call_count, 3 + ) + + declare_queue_binding_by_channel_mock.assert_has_calls(( + mock.call( + channel=self._poller_channel_mock, + durable=self._pika_engine.notification_persistence, + exchange="exchange1", + exchange_type='direct', + queue="topic1.1", + queue_expiration=None, + routing_key="topic1.1" + ), + mock.call( + channel=self._poller_channel_mock, + durable=self._pika_engine.notification_persistence, + exchange="exchange1", + exchange_type='direct', + queue="topic1.2", + queue_expiration=None, + routing_key="topic1.2" + ), + mock.call( + channel=self._poller_channel_mock, + durable=self._pika_engine.notification_persistence, + exchange="exchange2", + exchange_type='direct', + queue="topic2.1", + queue_expiration=None, + routing_key="topic2.1" + ) + )) + + @mock.patch("oslo_messaging._drivers.pika_driver.pika_message." + "PikaIncomingMessage") + def test_declare_notification_queue_bindings_custom_queue( + self, pika_incoming_message_mock): + poller = pika_poller.NotificationPikaPoller( + self._pika_engine, self._target_and_priorities, + "custom_queue_name", self._prefetch_count + ) + self._poller_connection_mock.process_data_events.side_effect = ( + lambda time_limit: poller._on_message_with_ack_callback( + None, None, None, None + ) + ) + + poller.start() + res = poller.poll() + + self.assertEqual(len(res), 1) + + self.assertEqual(res[0], pika_incoming_message_mock.return_value) + + self.assertTrue(self._pika_engine.create_connection.called) + self.assertTrue(self._poller_connection_mock.channel.called) + + declare_queue_binding_by_channel_mock = ( + self._pika_engine.declare_queue_binding_by_channel + ) + + self.assertEqual( + declare_queue_binding_by_channel_mock.call_count, 3 + ) + + declare_queue_binding_by_channel_mock.assert_has_calls(( + mock.call( + channel=self._poller_channel_mock, + durable=self._pika_engine.notification_persistence, + exchange="exchange1", + exchange_type='direct', + queue="custom_queue_name", + queue_expiration=None, + routing_key="topic1.1" + ), + mock.call( + channel=self._poller_channel_mock, + durable=self._pika_engine.notification_persistence, + exchange="exchange1", + exchange_type='direct', + queue="custom_queue_name", + queue_expiration=None, + routing_key="topic1.2" + ), + mock.call( + channel=self._poller_channel_mock, + durable=self._pika_engine.notification_persistence, + exchange="exchange2", + exchange_type='direct', + queue="custom_queue_name", + queue_expiration=None, + routing_key="topic2.1" + ) + ))