Browse Source

Add the mandatory flag for direct send

With this feature, the server will raise and log a Message Undeliverable
exception. So it is possible to log immediately an error in case the
reply queue does not exist for some reason.

This is part of blueprint transport-options
The blueprint link is [1]
Please follow the link [2] to use and test the feature.

1-
https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options
2- https://github.com/Gsantomaggio/rabbitmq-utils/
tree/master/openstack/mandatory_test

Change-Id: Iac7474c06ef425a2afe5bcd912e51510ba1c8fb3
tags/10.2.0^0
Gabriele 3 months ago
parent
commit
b7e9faf659
No account linked to committer's email address
2 changed files with 25 additions and 7 deletions
  1. 11
    2
      oslo_messaging/_drivers/impl_rabbit.py
  2. 14
    5
      oslo_messaging/rpc/server.py

+ 11
- 2
oslo_messaging/_drivers/impl_rabbit.py View File

@@ -168,6 +168,12 @@ rabbit_opts = [
168 168
                default=2,
169 169
                help='How often times during the heartbeat_timeout_threshold '
170 170
                'we check the heartbeat.'),
171
+    cfg.IntOpt('direct_mandatory_flag',
172
+               default=True,
173
+               help='Enable/Disable the RabbitMQ mandatory flag '
174
+               'for direct send. The direct send is used as reply,'
175
+               'so the MessageUndeliverable exception is raised'
176
+               ' in case the client queue does not exist.'),
171 177
 ]
172 178
 
173 179
 LOG = logging.getLogger(__name__)
@@ -492,6 +498,7 @@ class Connection(object):
492 498
             # if it was already monkey patched by eventlet/greenlet.
493 499
             global threading
494 500
             threading = stdlib_threading
501
+        self.direct_mandatory_flag = driver_conf.direct_mandatory_flag
495 502
 
496 503
         if self.ssl:
497 504
             self.ssl_version = driver_conf.ssl_version
@@ -1291,9 +1298,11 @@ class Connection(object):
1291 1298
                                          durable=False,
1292 1299
                                          auto_delete=True,
1293 1300
                                          passive=True)
1294
-
1301
+        options = oslo_messaging.TransportOptions(
1302
+            at_least_once=self.direct_mandatory_flag)
1295 1303
         self._ensure_publishing(self._publish_and_raises_on_missing_exchange,
1296
-                                exchange, msg, routing_key=msg_id)
1304
+                                exchange, msg, routing_key=msg_id,
1305
+                                transport_options=options)
1297 1306
 
1298 1307
     def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None,
1299 1308
                    transport_options=None):

+ 14
- 5
oslo_messaging/rpc/server.py View File

@@ -1,4 +1,3 @@
1
-
2 1
 # Copyright 2013 Red Hat, Inc.
3 2
 #
4 3
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -124,6 +123,7 @@ A simple example of an RPC server with multiple endpoints might be::
124 123
 import logging
125 124
 import sys
126 125
 
126
+from oslo_messaging import exceptions
127 127
 from oslo_messaging.rpc import dispatcher as rpc_dispatcher
128 128
 from oslo_messaging import server as msg_server
129 129
 from oslo_messaging import transport as msg_transport
@@ -178,13 +178,19 @@ class RPCServer(msg_server.MessageHandlingServer):
178 178
                 message.reply(res)
179 179
             else:
180 180
                 message.reply(failure=failure)
181
+        except exceptions.MessageUndeliverable as e:
182
+            LOG.exception(
183
+                "MessageUndeliverable error, "
184
+                "source exception: %s, routing_key: %s, exchange: %s: ",
185
+                e.exception, e.routing_key, e.exchange
186
+            )
181 187
         except Exception:
182 188
             LOG.exception("Can not send reply for message")
183 189
         finally:
184
-                # NOTE(dhellmann): Remove circular object reference
185
-                # between the current stack frame and the traceback in
186
-                # exc_info.
187
-                del failure
190
+            # NOTE(dhellmann): Remove circular object reference
191
+            # between the current stack frame and the traceback in
192
+            # exc_info.
193
+            del failure
188 194
 
189 195
 
190 196
 def get_rpc_server(transport, target, endpoints,
@@ -222,6 +228,7 @@ def expected_exceptions(*exceptions):
222 228
     ExpectedException, which is used internally by the RPC sever. The RPC
223 229
     client will see the original exception type.
224 230
     """
231
+
225 232
     def outer(func):
226 233
         def inner(*args, **kwargs):
227 234
             try:
@@ -234,7 +241,9 @@ def expected_exceptions(*exceptions):
234 241
             # ignored and thrown as normal.
235 242
             except exceptions:
236 243
                 raise rpc_dispatcher.ExpectedException()
244
+
237 245
         return inner
246
+
238 247
     return outer
239 248
 
240 249
 

Loading…
Cancel
Save