From e804874c50fd8cfbfca2d982a4932fdd8844c3f1 Mon Sep 17 00:00:00 2001 From: Gabriele Date: Wed, 19 Jun 2019 09:51:55 +0200 Subject: [PATCH] Implement the transport options With this feature, it is possible to specialize the parameters to send. `options = oslo_messaging.TransportOptions(at_least_once=True)` TransportOptions is used in every single driver, for example in RabbitMQ driver is used to handle the mandatory flag. Notes: - The idea of creating a new class TransportOptions is because I'd like to have an abstract class not related only to the RPCClient - at_least_once is the first parameter, when needed we can add the others. Implements: blueprint transport-options (second point) The blueprint link is [1] To test it you can use [2] 1- https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options 2- https://github.com/Gsantomaggio/rabbitmq-utils/ tree/master/openstack/mandatory_test Change-Id: I1858e4a990507d3c2bac2ef7fbef75d8c2dbfce2 --- oslo_messaging/_drivers/impl_rabbit.py | 15 ++++++------- .../tests/drivers/test_impl_rabbit.py | 2 ++ oslo_messaging/tests/rpc/test_client.py | 21 ++++++++++++------- oslo_messaging/transport.py | 11 ++++++++++ 4 files changed, 35 insertions(+), 14 deletions(-) diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index aedb163c3..0edf45a7f 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -49,7 +49,6 @@ from oslo_messaging import exceptions # NOTE(sileht): don't exists in py2 socket module TCP_USER_TIMEOUT = 18 - rabbit_opts = [ cfg.BoolOpt('ssl', default=False, @@ -1150,15 +1149,17 @@ class Connection(object): 'transport_options': str(transport_options)} LOG.trace('Connection._publish: sending message %(msg)s to' ' %(who)s with routing key %(key)s', log_info) - # NOTE(sileht): no need to wait more, caller expects # a answer before timeout is reached with self._transport_socket_timeout(timeout): - self._producer.publish(msg, - exchange=exchange, - routing_key=routing_key, - expiration=timeout, - compression=self.kombu_compression) + self._producer.publish( + msg, + mandatory=transport_options.at_least_once if + transport_options else False, + exchange=exchange, + routing_key=routing_key, + expiration=timeout, + compression=self.kombu_compression) def _publish_and_creates_default_queue(self, exchange, msg, routing_key=None, timeout=None, diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index a18da1535..5f302db30 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -199,6 +199,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase): 'msg', expiration=1, exchange=exchange_mock, compression=self.conf.oslo_messaging_rabbit.kombu_compression, + mandatory=False, routing_key='routing_key') @mock.patch('kombu.messaging.Producer.publish') @@ -212,6 +213,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase): conn._publish(exchange_mock, 'msg', routing_key='routing_key') fake_publish.assert_called_with( 'msg', expiration=None, + mandatory=False, compression=self.conf.oslo_messaging_rabbit.kombu_compression, exchange=exchange_mock, routing_key='routing_key') diff --git a/oslo_messaging/tests/rpc/test_client.py b/oslo_messaging/tests/rpc/test_client.py index ec22d705c..6c54625fe 100755 --- a/oslo_messaging/tests/rpc/test_client.py +++ b/oslo_messaging/tests/rpc/test_client.py @@ -42,14 +42,15 @@ class TestCastCall(test_utils.BaseTestCase): def test_cast_call(self): self.config(rpc_response_timeout=None) - + transport_options = oslo_messaging.TransportOptions() transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') - client = oslo_messaging.RPCClient(transport, oslo_messaging.Target()) + client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), + transport_options=transport_options) transport._send = mock.Mock() msg = dict(method='foo', args=self.args) - kwargs = {'retry': None, 'transport_options': None} + kwargs = {'retry': None, 'transport_options': transport_options} if self.call: kwargs['wait_for_reply'] = True kwargs['timeout'] = None @@ -57,7 +58,7 @@ class TestCastCall(test_utils.BaseTestCase): method = client.call if self.call else client.cast method(self.ctxt, 'foo', **self.args) - + self.assertFalse(transport_options.at_least_once) transport._send.assert_called_once_with(oslo_messaging.Target(), self.ctxt, msg, @@ -67,13 +68,18 @@ class TestCastCall(test_utils.BaseTestCase): self.config(rpc_response_timeout=None) transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') - client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), - transport_options={'my_k': 'my_val'}) + + transport_options = oslo_messaging.TransportOptions(at_least_once=True) + client = oslo_messaging.RPCClient( + transport, + oslo_messaging.Target(), + transport_options=transport_options) transport._send = mock.Mock() msg = dict(method='foo', args=self.args) - kwargs = {'retry': None, 'transport_options': {'my_k': 'my_val'}} + kwargs = {'retry': None, + 'transport_options': transport_options} if self.call: kwargs['wait_for_reply'] = True kwargs['timeout'] = None @@ -82,6 +88,7 @@ class TestCastCall(test_utils.BaseTestCase): method = client.call if self.call else client.cast method(self.ctxt, 'foo', **self.args) + self.assertTrue(transport_options.at_least_once) transport._send.assert_called_once_with(oslo_messaging.Target(), self.ctxt, msg, diff --git a/oslo_messaging/transport.py b/oslo_messaging/transport.py index c263e3541..48979a64b 100644 --- a/oslo_messaging/transport.py +++ b/oslo_messaging/transport.py @@ -33,6 +33,7 @@ __all__ = [ 'Transport', 'TransportHost', 'TransportURL', + 'TransportOptions', 'get_transport', 'set_transport_defaults', ] @@ -277,6 +278,16 @@ class TransportHost(object): return '' +class TransportOptions(object): + + def __init__(self, at_least_once=False): + self._at_least_once = at_least_once + + @property + def at_least_once(self): + return self._at_least_once + + class TransportURL(object): """A parsed transport URL.