From 36c3965daeb6557fa07e1318d5a9acf906401519 Mon Sep 17 00:00:00 2001 From: dukhlov Date: Wed, 10 Feb 2016 17:40:09 +0200 Subject: [PATCH] Remove server queue creating if target's server is empty If target passed to MessageHandlerServer doesn't contain server, It is not needed to create queue for server. Also this patch removes mandatory flag for sending message during fanout cast because is is possible to cast fanout messages when nobody listen queue and this situation is ok. Change-Id: I4f8930e7b939c62bc1e97b4acaba6884fe1b97b2 --- .../_drivers/pika_driver/pika_message.py | 2 +- .../_drivers/pika_driver/pika_poller.py | 44 ++++++++++--------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py index 5bffd514a..526167e1f 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_message.py +++ b/oslo_messaging/_drivers/pika_driver/pika_message.py @@ -548,7 +548,7 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): else: self._do_send( exchange=exchange, routing_key=queue, msg_dict=msg_dict, - msg_props=msg_props, confirm=True, mandatory=True, + msg_props=msg_props, confirm=True, mandatory=not target.fanout, persistent=False, expiration_time=expiration_time, retrier=retrier ) diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py index 1bb21c576..9bace76f6 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_poller.py +++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py @@ -263,34 +263,38 @@ class RpcServicePikaPoller(PikaPoller): exchange = self._pika_engine.get_rpc_exchange_name( self._target.exchange, self._target.topic, False, no_ack ) - fanout_exchange = self._pika_engine.get_rpc_exchange_name( - self._target.exchange, self._target.topic, True, no_ack - ) + queue = self._pika_engine.get_rpc_queue_name( self._target.topic, None, no_ack ) - server_queue = self._pika_engine.get_rpc_queue_name( - self._target.topic, self._target.server, no_ack - ) - - queues_to_consume[queue] = no_ack - queues_to_consume[server_queue] = no_ack - self._pika_engine.declare_queue_binding_by_channel( channel=self._channel, exchange=exchange, queue=queue, routing_key=queue, exchange_type='direct', durable=False, queue_expiration=queue_expiration ) - self._pika_engine.declare_queue_binding_by_channel( - channel=self._channel, exchange=exchange, queue=server_queue, - routing_key=server_queue, exchange_type='direct', - queue_expiration=queue_expiration, durable=False - ) - self._pika_engine.declare_queue_binding_by_channel( - channel=self._channel, exchange=fanout_exchange, durable=False, - queue=server_queue, routing_key="", exchange_type='fanout', - queue_expiration=queue_expiration - ) + queues_to_consume[queue] = no_ack + + if self._target.server: + server_queue = self._pika_engine.get_rpc_queue_name( + self._target.topic, self._target.server, no_ack + ) + self._pika_engine.declare_queue_binding_by_channel( + channel=self._channel, exchange=exchange, durable=False, + queue=server_queue, routing_key=server_queue, + exchange_type='direct', queue_expiration=queue_expiration + ) + queues_to_consume[server_queue] = no_ack + + fanout_exchange = self._pika_engine.get_rpc_exchange_name( + self._target.exchange, self._target.topic, True, no_ack + ) + + self._pika_engine.declare_queue_binding_by_channel( + channel=self._channel, exchange=fanout_exchange, + queue=server_queue, routing_key="", exchange_type='fanout', + queue_expiration=queue_expiration, durable=False + ) + return queues_to_consume