diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index ca76aaae2..1f25da68c 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -168,6 +168,12 @@ rabbit_opts = [ default=2, help='How often times during the heartbeat_timeout_threshold ' 'we check the heartbeat.'), + cfg.IntOpt('direct_mandatory_flag', + default=True, + help='Enable/Disable the RabbitMQ mandatory flag ' + 'for direct send. The direct send is used as reply,' + 'so the MessageUndeliverable exception is raised' + ' in case the client queue does not exist.'), ] LOG = logging.getLogger(__name__) @@ -492,6 +498,7 @@ class Connection(object): # if it was already monkey patched by eventlet/greenlet. global threading threading = stdlib_threading + self.direct_mandatory_flag = driver_conf.direct_mandatory_flag if self.ssl: self.ssl_version = driver_conf.ssl_version @@ -1291,9 +1298,11 @@ class Connection(object): durable=False, auto_delete=True, passive=True) - + options = oslo_messaging.TransportOptions( + at_least_once=self.direct_mandatory_flag) self._ensure_publishing(self._publish_and_raises_on_missing_exchange, - exchange, msg, routing_key=msg_id) + exchange, msg, routing_key=msg_id, + transport_options=options) def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None, transport_options=None): diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py index 9bb6f6382..b16d77fa6 100644 --- a/oslo_messaging/rpc/server.py +++ b/oslo_messaging/rpc/server.py @@ -1,4 +1,3 @@ - # Copyright 2013 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -124,6 +123,7 @@ A simple example of an RPC server with multiple endpoints might be:: import logging import sys +from oslo_messaging import exceptions from oslo_messaging.rpc import dispatcher as rpc_dispatcher from oslo_messaging import server as msg_server from oslo_messaging import transport as msg_transport @@ -178,13 +178,19 @@ class RPCServer(msg_server.MessageHandlingServer): message.reply(res) else: message.reply(failure=failure) + except exceptions.MessageUndeliverable as e: + LOG.exception( + "MessageUndeliverable error, " + "source exception: %s, routing_key: %s, exchange: %s: ", + e.exception, e.routing_key, e.exchange + ) except Exception: LOG.exception("Can not send reply for message") finally: - # NOTE(dhellmann): Remove circular object reference - # between the current stack frame and the traceback in - # exc_info. - del failure + # NOTE(dhellmann): Remove circular object reference + # between the current stack frame and the traceback in + # exc_info. + del failure def get_rpc_server(transport, target, endpoints, @@ -222,6 +228,7 @@ def expected_exceptions(*exceptions): ExpectedException, which is used internally by the RPC sever. The RPC client will see the original exception type. """ + def outer(func): def inner(*args, **kwargs): try: @@ -234,7 +241,9 @@ def expected_exceptions(*exceptions): # ignored and thrown as normal. except exceptions: raise rpc_dispatcher.ExpectedException() + return inner + return outer