From 83266cc6ea7dbe682f8fdb960a548c68fd1235b5 Mon Sep 17 00:00:00 2001 From: Gabriele Date: Tue, 21 May 2019 14:39:55 +0200 Subject: [PATCH] Add transport_options parameter With this new parameter is possible to pass other parameters from the client to the drivers. So it is possible to tune the driver behavior. For example can be used to send the mandatory flag in RabbitMQ Note: - The transport_options parameter is not actually used (yet). - This part of blueprint transport-options (first part) Implements: blueprint transport-options The blueprint link is https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options Change-Id: Iff23a9344c2c87259cf36b0d14c0a9fc075a2a72 --- oslo_messaging/_drivers/amqpdriver.py | 10 +++--- oslo_messaging/_drivers/base.py | 12 +++++-- oslo_messaging/_drivers/impl_fake.py | 8 +++-- oslo_messaging/_drivers/impl_rabbit.py | 25 ++++++++------ oslo_messaging/rpc/client.py | 39 +++++++++++++--------- oslo_messaging/tests/rpc/test_client.py | 43 +++++++++++++++++++++---- oslo_messaging/tests/test_transport.py | 6 ++-- oslo_messaging/transport.py | 5 +-- 8 files changed, 102 insertions(+), 46 deletions(-) diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 30b4d2bf8..1cdea3a27 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -580,7 +580,7 @@ class AMQPDriverBase(base.BaseDriver): def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None, call_monitor_timeout=None, - envelope=True, notify=False, retry=None): + envelope=True, notify=False, retry=None, transport_options=None): msg = message @@ -626,7 +626,8 @@ class AMQPDriverBase(base.BaseDriver): " topic '%(topic)s'", {'exchange': exchange, 'topic': topic}) conn.topic_send(exchange_name=exchange, topic=topic, - msg=msg, timeout=timeout, retry=retry) + msg=msg, timeout=timeout, retry=retry, + transport_options=transport_options) if wait_for_reply: result = self._waiter.wait(msg_id, timeout, @@ -639,9 +640,10 @@ class AMQPDriverBase(base.BaseDriver): self._waiter.unlisten(msg_id) def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, - call_monitor_timeout=None, retry=None): + call_monitor_timeout=None, retry=None, transport_options=None): return self._send(target, ctxt, message, wait_for_reply, timeout, - call_monitor_timeout, retry=retry) + call_monitor_timeout, retry=retry, + transport_options=transport_options) def send_notification(self, target, ctxt, message, version, retry=None): return self._send(target, ctxt, message, diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index 82dcda5b7..fb44540e9 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -20,7 +20,6 @@ from oslo_utils import excutils from oslo_utils import timeutils import six - from oslo_messaging import exceptions base_opts = [ @@ -41,6 +40,7 @@ def batch_poll_helper(func): :py:meth:`PollStyleListener.poll` implementation that only polls for a single message per call. """ + def wrapper(in_self, timeout=None, batch_size=1, batch_timeout=None): incomings = [] driver_prefetch = in_self.prefetch_size @@ -57,6 +57,7 @@ def batch_poll_helper(func): break return incomings + return wrapper @@ -244,9 +245,9 @@ class Listener(object): all backend implementations. :type prefetch_size: int """ + def __init__(self, batch_size, batch_timeout, prefetch_size=-1): - self.on_incoming_callback = None self.batch_timeout = batch_timeout self.prefetch_size = prefetch_size @@ -283,6 +284,7 @@ class PollStyleListenerAdapter(Listener): """A Listener that uses a PollStyleListener for message transfer. A dedicated thread is created to do message polling. """ + def __init__(self, poll_style_listener, batch_size, batch_timeout): super(PollStyleListenerAdapter, self).__init__( batch_size, batch_timeout, poll_style_listener.prefetch_size @@ -364,7 +366,7 @@ class BaseDriver(object): @abc.abstractmethod def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, call_monitor_timeout=None, - retry=None): + retry=None, transport_options=None): """Send a message to the given target and optionally wait for a reply. This method is used by the RPC client when sending RPC requests to a server. @@ -434,6 +436,10 @@ class BaseDriver(object): :type call_monitor_timeout: float :param retry: maximum message send attempts permitted :type retry: int + :param transport_options: additional parameters to configure the driver + for example to send parameters as "mandatory" + flag in RabbitMQ + :type transport_options: dictionary :returns: A reply message or None if no reply expected :raises: :py:exc:`MessagingException`, any exception thrown by the remote server when executing the RPC call. diff --git a/oslo_messaging/_drivers/impl_fake.py b/oslo_messaging/_drivers/impl_fake.py index fccfebe08..05300e7b1 100644 --- a/oslo_messaging/_drivers/impl_fake.py +++ b/oslo_messaging/_drivers/impl_fake.py @@ -188,7 +188,8 @@ class FakeDriver(base.BaseDriver): """ json.dumps(message) - def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None): + def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None, + transport_options=None): self._check_serialize(message) exchange = self._exchange_manager.get_exchange(target.exchange) @@ -216,10 +217,11 @@ class FakeDriver(base.BaseDriver): return None def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, - call_monitor_timeout=None, retry=None): + call_monitor_timeout=None, retry=None, transport_options=None): # NOTE(sileht): retry doesn't need to be implemented, the fake # transport always works - return self._send(target, ctxt, message, wait_for_reply, timeout) + return self._send(target, ctxt, message, wait_for_reply, timeout, + transport_options) def send_notification(self, target, ctxt, message, version, retry=None): # NOTE(sileht): retry doesn't need to be implemented, the fake diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 817e7be55..3ea43b036 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -1086,7 +1086,7 @@ class Connection(object): self.declare_consumer(consumer) def _ensure_publishing(self, method, exchange, msg, routing_key=None, - timeout=None, retry=None): + timeout=None, retry=None, transport_options=None): """Send to a publisher based on the publisher class.""" def _error_callback(exc): @@ -1095,7 +1095,8 @@ class Connection(object): "'%(topic)s': %(err_str)s", log_info) LOG.debug('Exception', exc_info=exc) - method = functools.partial(method, exchange, msg, routing_key, timeout) + method = functools.partial(method, exchange, msg, routing_key, + timeout, transport_options) with self._connection_lock: self.ensure(method, retry=retry, error_callback=_error_callback) @@ -1117,7 +1118,8 @@ class Connection(object): 'connection_id': self.connection_id}) return info - def _publish(self, exchange, msg, routing_key=None, timeout=None): + def _publish(self, exchange, msg, routing_key=None, timeout=None, + transport_options=None): """Publish a message.""" if not (exchange.passive or exchange.name in self._declared_exchanges): @@ -1126,7 +1128,8 @@ class Connection(object): log_info = {'msg': msg, 'who': exchange or 'default', - 'key': routing_key} + 'key': routing_key, + 'transport_options': str(transport_options)} LOG.trace('Connection._publish: sending message %(msg)s to' ' %(who)s with routing key %(key)s', log_info) @@ -1140,7 +1143,8 @@ class Connection(object): compression=self.kombu_compression) def _publish_and_creates_default_queue(self, exchange, msg, - routing_key=None, timeout=None): + routing_key=None, timeout=None, + transport_options=None): """Publisher that declares a default queue When the exchange is missing instead of silently creates an exchange @@ -1177,7 +1181,8 @@ class Connection(object): def _publish_and_raises_on_missing_exchange(self, exchange, msg, routing_key=None, - timeout=None): + timeout=None, + transport_options=None): """Publisher that raises exception if exchange is missing.""" if not exchange.passive: raise RuntimeError("_publish_and_retry_on_missing_exchange() must " @@ -1185,7 +1190,7 @@ class Connection(object): try: self._publish(exchange, msg, routing_key=routing_key, - timeout=timeout) + timeout=timeout, transport_options=transport_options) return except self.connection.channel_errors as exc: if exc.code == 404: @@ -1212,7 +1217,8 @@ class Connection(object): self._ensure_publishing(self._publish_and_raises_on_missing_exchange, exchange, msg, routing_key=msg_id) - def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): + def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None, + transport_options=None): """Send a 'topic' message.""" exchange = kombu.entity.Exchange( name=exchange_name, @@ -1222,7 +1228,8 @@ class Connection(object): self._ensure_publishing(self._publish, exchange, msg, routing_key=topic, timeout=timeout, - retry=retry) + retry=retry, + transport_options=transport_options) def fanout_send(self, topic, msg, retry=None): """Send a 'fanout' message.""" diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py index 636b2a194..ea5f54e2c 100644 --- a/oslo_messaging/rpc/client.py +++ b/oslo_messaging/rpc/client.py @@ -1,4 +1,3 @@ - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. @@ -45,7 +44,6 @@ _client_opts = [ class RemoteError(exceptions.MessagingException): - """Signifies that a remote endpoint method has raised an exception. Contains a string representation of the type of the original exception, @@ -94,7 +92,7 @@ class _BaseCallContext(object): def __init__(self, transport, target, serializer, timeout=None, version_cap=None, retry=None, - call_monitor_timeout=None): + call_monitor_timeout=None, transport_options=None): self.conf = transport.conf self.transport = transport @@ -104,6 +102,7 @@ class _BaseCallContext(object): self.call_monitor_timeout = call_monitor_timeout self.retry = retry self.version_cap = version_cap + self.transport_options = transport_options super(_BaseCallContext, self).__init__() @@ -150,7 +149,9 @@ class _BaseCallContext(object): self._check_version_cap(msg.get('version')) try: - self.transport._send(self.target, msg_ctxt, msg, retry=self.retry) + self.transport._send(self.target, msg_ctxt, msg, + retry=self.retry, + transport_options=self.transport_options) except driver_base.TransportDriverError as ex: raise ClientSendError(self.target, ex) @@ -172,10 +173,12 @@ class _BaseCallContext(object): self._check_version_cap(msg.get('version')) try: - result = self.transport._send(self.target, msg_ctxt, msg, - wait_for_reply=True, timeout=timeout, - call_monitor_timeout=cm_timeout, - retry=self.retry) + result = \ + self.transport._send(self.target, msg_ctxt, msg, + wait_for_reply=True, timeout=timeout, + call_monitor_timeout=cm_timeout, + retry=self.retry, + transport_options=self.transport_options) except driver_base.TransportDriverError as ex: raise ClientSendError(self.target, ex) @@ -190,7 +193,6 @@ class _BaseCallContext(object): class _CallContext(_BaseCallContext): - _marker = _BaseCallContext._marker @classmethod @@ -198,7 +200,7 @@ class _CallContext(_BaseCallContext): exchange=_marker, topic=_marker, namespace=_marker, version=_marker, server=_marker, fanout=_marker, timeout=_marker, version_cap=_marker, retry=_marker, - call_monitor_timeout=_marker): + call_monitor_timeout=_marker, transport_options=_marker): cls._check_version(version) kwargs = dict( exchange=exchange, @@ -219,11 +221,13 @@ class _CallContext(_BaseCallContext): retry = call_context.retry if call_monitor_timeout is cls._marker: call_monitor_timeout = call_context.call_monitor_timeout + if transport_options is cls._marker: + transport_options = call_context.transport_options return _CallContext(call_context.transport, target, call_context.serializer, timeout, version_cap, retry, - call_monitor_timeout) + call_monitor_timeout, transport_options) def prepare(self, exchange=_marker, topic=_marker, namespace=_marker, version=_marker, server=_marker, fanout=_marker, @@ -237,7 +241,6 @@ class _CallContext(_BaseCallContext): class RPCClient(_BaseCallContext): - """A class for invoking methods on remote RPC servers. The RPCClient class is responsible for sending method invocations to and @@ -326,7 +329,7 @@ class RPCClient(_BaseCallContext): def __init__(self, transport, target, timeout=None, version_cap=None, serializer=None, retry=None, - call_monitor_timeout=None): + call_monitor_timeout=None, transport_options=None): """Construct an RPC client. :param transport: a messaging transport handle @@ -362,7 +365,7 @@ class RPCClient(_BaseCallContext): super(RPCClient, self).__init__( transport, target, serializer, timeout, version_cap, retry, - call_monitor_timeout + call_monitor_timeout, transport_options ) self.conf.register_opts(_client_opts) @@ -370,7 +373,7 @@ class RPCClient(_BaseCallContext): def prepare(self, exchange=_marker, topic=_marker, namespace=_marker, version=_marker, server=_marker, fanout=_marker, timeout=_marker, version_cap=_marker, retry=_marker, - call_monitor_timeout=_marker): + call_monitor_timeout=_marker, transport_options=_marker): """Prepare a method invocation context. Use this method to override client properties for an individual method @@ -401,6 +404,10 @@ class RPCClient(_BaseCallContext): 0 means no retry is attempted. N means attempt at most N retries. :type retry: int + :param transport_options: additional parameters to configure the driver + for example to send parameters as "mandatory" + flag in RabbitMQ + :type transport_options: dictionary :param call_monitor_timeout: an optional timeout (in seconds) for active call heartbeating. If specified, requires the server to heartbeat @@ -413,7 +420,7 @@ class RPCClient(_BaseCallContext): exchange, topic, namespace, version, server, fanout, timeout, version_cap, retry, - call_monitor_timeout) + call_monitor_timeout, transport_options) def cast(self, ctxt, method, **kwargs): """Invoke a method without blocking for a return value. diff --git a/oslo_messaging/tests/rpc/test_client.py b/oslo_messaging/tests/rpc/test_client.py index 9fa40db05..ec22d705c 100755 --- a/oslo_messaging/tests/rpc/test_client.py +++ b/oslo_messaging/tests/rpc/test_client.py @@ -49,7 +49,31 @@ class TestCastCall(test_utils.BaseTestCase): transport._send = mock.Mock() msg = dict(method='foo', args=self.args) - kwargs = {'retry': None} + kwargs = {'retry': None, 'transport_options': None} + if self.call: + kwargs['wait_for_reply'] = True + kwargs['timeout'] = None + kwargs['call_monitor_timeout'] = None + + method = client.call if self.call else client.cast + method(self.ctxt, 'foo', **self.args) + + transport._send.assert_called_once_with(oslo_messaging.Target(), + self.ctxt, + msg, + **kwargs) + + def test_cast_call_with_transport_options(self): + self.config(rpc_response_timeout=None) + + transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') + client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), + transport_options={'my_k': 'my_val'}) + + transport._send = mock.Mock() + + msg = dict(method='foo', args=self.args) + kwargs = {'retry': None, 'transport_options': {'my_k': 'my_val'}} if self.call: kwargs['wait_for_reply'] = True kwargs['timeout'] = None @@ -203,7 +227,8 @@ class TestCastToTarget(test_utils.BaseTestCase): transport._send.assert_called_once_with(expect_target, {}, msg, - retry=None) + retry=None, + transport_options=None) TestCastToTarget.generate_scenarios() @@ -245,7 +270,7 @@ class TestCallTimeout(test_utils.BaseTestCase): msg = dict(method='foo', args={}) kwargs = dict(wait_for_reply=True, timeout=self.expect, retry=None, - call_monitor_timeout=self.cm) + call_monitor_timeout=self.cm, transport_options=None) if self.prepare is not _notset: client = client.prepare(timeout=self.prepare) @@ -277,7 +302,8 @@ class TestCallRetry(test_utils.BaseTestCase): msg = dict(method='foo', args={}) kwargs = dict(wait_for_reply=True, timeout=60, - retry=self.expect, call_monitor_timeout=None) + retry=self.expect, call_monitor_timeout=None, + transport_options=None) if self.prepare is not _notset: client = client.prepare(retry=self.prepare) @@ -334,8 +360,8 @@ class TestSerializer(test_utils.BaseTestCase): serializer=serializer) transport._send = mock.Mock() - - kwargs = dict(wait_for_reply=True, timeout=None) if self.call else {} + kwargs = dict(wait_for_reply=True, + timeout=None) if self.call else {} kwargs['retry'] = None if self.call: kwargs['call_monitor_timeout'] = None @@ -367,6 +393,7 @@ class TestSerializer(test_utils.BaseTestCase): transport._send.assert_called_once_with(oslo_messaging.Target(), dict(user='alice'), msg, + transport_options=None, **kwargs) expected_calls = [mock.call(self.ctxt, arg) for arg in self.args] self.assertEqual(expected_calls, @@ -466,7 +493,9 @@ class TestVersionCap(test_utils.BaseTestCase): self.assertFalse(self.success) else: self.assertTrue(self.success) - transport._send.assert_called_once_with(target, {}, msg, **kwargs) + transport._send.assert_called_once_with(target, {}, msg, + transport_options=None, + **kwargs) TestVersionCap.generate_scenarios() diff --git a/oslo_messaging/tests/test_transport.py b/oslo_messaging/tests/test_transport.py index a2c17f9c4..02a19f784 100755 --- a/oslo_messaging/tests/test_transport.py +++ b/oslo_messaging/tests/test_transport.py @@ -231,7 +231,8 @@ class TestTransportMethodArgs(test_utils.BaseTestCase): wait_for_reply=None, timeout=None, call_monitor_timeout=None, - retry=None) + retry=None, + transport_options=None) def test_send_all_args(self): t = transport.Transport(_FakeDriver(cfg.CONF)) @@ -250,7 +251,8 @@ class TestTransportMethodArgs(test_utils.BaseTestCase): wait_for_reply='wait_for_reply', timeout='timeout', call_monitor_timeout='cm_timeout', - retry='retry') + retry='retry', + transport_options=None) def test_send_notification(self): t = transport.Transport(_FakeDriver(cfg.CONF)) diff --git a/oslo_messaging/transport.py b/oslo_messaging/transport.py index e44cb26b6..c263e3541 100644 --- a/oslo_messaging/transport.py +++ b/oslo_messaging/transport.py @@ -116,7 +116,7 @@ class Transport(object): self._driver.require_features(requeue=requeue) def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None, - call_monitor_timeout=None, retry=None): + call_monitor_timeout=None, retry=None, transport_options=None): if not target.topic: raise exceptions.InvalidTarget('A topic is required to send', target) @@ -124,7 +124,8 @@ class Transport(object): wait_for_reply=wait_for_reply, timeout=timeout, call_monitor_timeout=call_monitor_timeout, - retry=retry) + retry=retry, + transport_options=transport_options) def _send_notification(self, target, ctxt, message, version, retry=None): if not target.topic: