Remove server queue creating if target's server is empty

If target passed to MessageHandlerServer doesn't contain server,
It is not needed to create queue for server.
Also this patch removes mandatory flag for sending message during fanout
cast because is is possible to cast fanout messages when nobody listen
queue and this situation is ok.

Change-Id: I4f8930e7b939c62bc1e97b4acaba6884fe1b97b2
This commit is contained in:
dukhlov 2016-02-10 17:40:09 +02:00 committed by Dmitriy Ukhlov
parent 2a226ab270
commit 36c3965dae
2 changed files with 25 additions and 21 deletions

View File

@ -548,7 +548,7 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
else: else:
self._do_send( self._do_send(
exchange=exchange, routing_key=queue, msg_dict=msg_dict, exchange=exchange, routing_key=queue, msg_dict=msg_dict,
msg_props=msg_props, confirm=True, mandatory=True, msg_props=msg_props, confirm=True, mandatory=not target.fanout,
persistent=False, expiration_time=expiration_time, persistent=False, expiration_time=expiration_time,
retrier=retrier retrier=retrier
) )

View File

@ -263,34 +263,38 @@ class RpcServicePikaPoller(PikaPoller):
exchange = self._pika_engine.get_rpc_exchange_name( exchange = self._pika_engine.get_rpc_exchange_name(
self._target.exchange, self._target.topic, False, no_ack self._target.exchange, self._target.topic, False, no_ack
) )
fanout_exchange = self._pika_engine.get_rpc_exchange_name(
self._target.exchange, self._target.topic, True, no_ack
)
queue = self._pika_engine.get_rpc_queue_name( queue = self._pika_engine.get_rpc_queue_name(
self._target.topic, None, no_ack self._target.topic, None, no_ack
) )
server_queue = self._pika_engine.get_rpc_queue_name(
self._target.topic, self._target.server, no_ack
)
queues_to_consume[queue] = no_ack
queues_to_consume[server_queue] = no_ack
self._pika_engine.declare_queue_binding_by_channel( self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel, exchange=exchange, queue=queue, channel=self._channel, exchange=exchange, queue=queue,
routing_key=queue, exchange_type='direct', durable=False, routing_key=queue, exchange_type='direct', durable=False,
queue_expiration=queue_expiration queue_expiration=queue_expiration
) )
queues_to_consume[queue] = no_ack
if self._target.server:
server_queue = self._pika_engine.get_rpc_queue_name(
self._target.topic, self._target.server, no_ack
)
self._pika_engine.declare_queue_binding_by_channel( self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel, exchange=exchange, queue=server_queue, channel=self._channel, exchange=exchange, durable=False,
routing_key=server_queue, exchange_type='direct', queue=server_queue, routing_key=server_queue,
exchange_type='direct', queue_expiration=queue_expiration
)
queues_to_consume[server_queue] = no_ack
fanout_exchange = self._pika_engine.get_rpc_exchange_name(
self._target.exchange, self._target.topic, True, no_ack
)
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel, exchange=fanout_exchange,
queue=server_queue, routing_key="", exchange_type='fanout',
queue_expiration=queue_expiration, durable=False queue_expiration=queue_expiration, durable=False
) )
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel, exchange=fanout_exchange, durable=False,
queue=server_queue, routing_key="", exchange_type='fanout',
queue_expiration=queue_expiration
)
return queues_to_consume return queues_to_consume