Add transport_options parameter

With this new parameter is possible to pass other parameters
from the client to the drivers.
So it is possible to tune the driver behavior.

For example can be used to send the mandatory flag in RabbitMQ

Note:
  - The transport_options parameter is not actually used (yet).
  - This part of blueprint transport-options  (first part)

Implements: blueprint transport-options
The blueprint link is
https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options

Change-Id: Iff23a9344c2c87259cf36b0d14c0a9fc075a2a72
This commit is contained in:
Gabriele 2019-05-21 14:39:55 +02:00
parent 40c25c2bde
commit 83266cc6ea
No known key found for this signature in database
GPG Key ID: 2DEE5B9E783BBCFA
8 changed files with 102 additions and 46 deletions

View File

@ -580,7 +580,7 @@ class AMQPDriverBase(base.BaseDriver):
def _send(self, target, ctxt, message,
wait_for_reply=None, timeout=None, call_monitor_timeout=None,
envelope=True, notify=False, retry=None):
envelope=True, notify=False, retry=None, transport_options=None):
msg = message
@ -626,7 +626,8 @@ class AMQPDriverBase(base.BaseDriver):
" topic '%(topic)s'", {'exchange': exchange,
'topic': topic})
conn.topic_send(exchange_name=exchange, topic=topic,
msg=msg, timeout=timeout, retry=retry)
msg=msg, timeout=timeout, retry=retry,
transport_options=transport_options)
if wait_for_reply:
result = self._waiter.wait(msg_id, timeout,
@ -639,9 +640,10 @@ class AMQPDriverBase(base.BaseDriver):
self._waiter.unlisten(msg_id)
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
call_monitor_timeout=None, retry=None):
call_monitor_timeout=None, retry=None, transport_options=None):
return self._send(target, ctxt, message, wait_for_reply, timeout,
call_monitor_timeout, retry=retry)
call_monitor_timeout, retry=retry,
transport_options=transport_options)
def send_notification(self, target, ctxt, message, version, retry=None):
return self._send(target, ctxt, message,

View File

@ -20,7 +20,6 @@ from oslo_utils import excutils
from oslo_utils import timeutils
import six
from oslo_messaging import exceptions
base_opts = [
@ -41,6 +40,7 @@ def batch_poll_helper(func):
:py:meth:`PollStyleListener.poll` implementation that only polls for a
single message per call.
"""
def wrapper(in_self, timeout=None, batch_size=1, batch_timeout=None):
incomings = []
driver_prefetch = in_self.prefetch_size
@ -57,6 +57,7 @@ def batch_poll_helper(func):
break
return incomings
return wrapper
@ -244,9 +245,9 @@ class Listener(object):
all backend implementations.
:type prefetch_size: int
"""
def __init__(self, batch_size, batch_timeout,
prefetch_size=-1):
self.on_incoming_callback = None
self.batch_timeout = batch_timeout
self.prefetch_size = prefetch_size
@ -283,6 +284,7 @@ class PollStyleListenerAdapter(Listener):
"""A Listener that uses a PollStyleListener for message transfer. A
dedicated thread is created to do message polling.
"""
def __init__(self, poll_style_listener, batch_size, batch_timeout):
super(PollStyleListenerAdapter, self).__init__(
batch_size, batch_timeout, poll_style_listener.prefetch_size
@ -364,7 +366,7 @@ class BaseDriver(object):
@abc.abstractmethod
def send(self, target, ctxt, message,
wait_for_reply=None, timeout=None, call_monitor_timeout=None,
retry=None):
retry=None, transport_options=None):
"""Send a message to the given target and optionally wait for a reply.
This method is used by the RPC client when sending RPC requests to a
server.
@ -434,6 +436,10 @@ class BaseDriver(object):
:type call_monitor_timeout: float
:param retry: maximum message send attempts permitted
:type retry: int
:param transport_options: additional parameters to configure the driver
for example to send parameters as "mandatory"
flag in RabbitMQ
:type transport_options: dictionary
:returns: A reply message or None if no reply expected
:raises: :py:exc:`MessagingException`, any exception thrown by the
remote server when executing the RPC call.

View File

@ -188,7 +188,8 @@ class FakeDriver(base.BaseDriver):
"""
json.dumps(message)
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
transport_options=None):
self._check_serialize(message)
exchange = self._exchange_manager.get_exchange(target.exchange)
@ -216,10 +217,11 @@ class FakeDriver(base.BaseDriver):
return None
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
call_monitor_timeout=None, retry=None):
call_monitor_timeout=None, retry=None, transport_options=None):
# NOTE(sileht): retry doesn't need to be implemented, the fake
# transport always works
return self._send(target, ctxt, message, wait_for_reply, timeout)
return self._send(target, ctxt, message, wait_for_reply, timeout,
transport_options)
def send_notification(self, target, ctxt, message, version, retry=None):
# NOTE(sileht): retry doesn't need to be implemented, the fake

View File

@ -1086,7 +1086,7 @@ class Connection(object):
self.declare_consumer(consumer)
def _ensure_publishing(self, method, exchange, msg, routing_key=None,
timeout=None, retry=None):
timeout=None, retry=None, transport_options=None):
"""Send to a publisher based on the publisher class."""
def _error_callback(exc):
@ -1095,7 +1095,8 @@ class Connection(object):
"'%(topic)s': %(err_str)s", log_info)
LOG.debug('Exception', exc_info=exc)
method = functools.partial(method, exchange, msg, routing_key, timeout)
method = functools.partial(method, exchange, msg, routing_key,
timeout, transport_options)
with self._connection_lock:
self.ensure(method, retry=retry, error_callback=_error_callback)
@ -1117,7 +1118,8 @@ class Connection(object):
'connection_id': self.connection_id})
return info
def _publish(self, exchange, msg, routing_key=None, timeout=None):
def _publish(self, exchange, msg, routing_key=None, timeout=None,
transport_options=None):
"""Publish a message."""
if not (exchange.passive or exchange.name in self._declared_exchanges):
@ -1126,7 +1128,8 @@ class Connection(object):
log_info = {'msg': msg,
'who': exchange or 'default',
'key': routing_key}
'key': routing_key,
'transport_options': str(transport_options)}
LOG.trace('Connection._publish: sending message %(msg)s to'
' %(who)s with routing key %(key)s', log_info)
@ -1140,7 +1143,8 @@ class Connection(object):
compression=self.kombu_compression)
def _publish_and_creates_default_queue(self, exchange, msg,
routing_key=None, timeout=None):
routing_key=None, timeout=None,
transport_options=None):
"""Publisher that declares a default queue
When the exchange is missing instead of silently creates an exchange
@ -1177,7 +1181,8 @@ class Connection(object):
def _publish_and_raises_on_missing_exchange(self, exchange, msg,
routing_key=None,
timeout=None):
timeout=None,
transport_options=None):
"""Publisher that raises exception if exchange is missing."""
if not exchange.passive:
raise RuntimeError("_publish_and_retry_on_missing_exchange() must "
@ -1185,7 +1190,7 @@ class Connection(object):
try:
self._publish(exchange, msg, routing_key=routing_key,
timeout=timeout)
timeout=timeout, transport_options=transport_options)
return
except self.connection.channel_errors as exc:
if exc.code == 404:
@ -1212,7 +1217,8 @@ class Connection(object):
self._ensure_publishing(self._publish_and_raises_on_missing_exchange,
exchange, msg, routing_key=msg_id)
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None,
transport_options=None):
"""Send a 'topic' message."""
exchange = kombu.entity.Exchange(
name=exchange_name,
@ -1222,7 +1228,8 @@ class Connection(object):
self._ensure_publishing(self._publish, exchange, msg,
routing_key=topic, timeout=timeout,
retry=retry)
retry=retry,
transport_options=transport_options)
def fanout_send(self, topic, msg, retry=None):
"""Send a 'fanout' message."""

View File

@ -1,4 +1,3 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
@ -45,7 +44,6 @@ _client_opts = [
class RemoteError(exceptions.MessagingException):
"""Signifies that a remote endpoint method has raised an exception.
Contains a string representation of the type of the original exception,
@ -94,7 +92,7 @@ class _BaseCallContext(object):
def __init__(self, transport, target, serializer,
timeout=None, version_cap=None, retry=None,
call_monitor_timeout=None):
call_monitor_timeout=None, transport_options=None):
self.conf = transport.conf
self.transport = transport
@ -104,6 +102,7 @@ class _BaseCallContext(object):
self.call_monitor_timeout = call_monitor_timeout
self.retry = retry
self.version_cap = version_cap
self.transport_options = transport_options
super(_BaseCallContext, self).__init__()
@ -150,7 +149,9 @@ class _BaseCallContext(object):
self._check_version_cap(msg.get('version'))
try:
self.transport._send(self.target, msg_ctxt, msg, retry=self.retry)
self.transport._send(self.target, msg_ctxt, msg,
retry=self.retry,
transport_options=self.transport_options)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
@ -172,10 +173,12 @@ class _BaseCallContext(object):
self._check_version_cap(msg.get('version'))
try:
result = self.transport._send(self.target, msg_ctxt, msg,
wait_for_reply=True, timeout=timeout,
call_monitor_timeout=cm_timeout,
retry=self.retry)
result = \
self.transport._send(self.target, msg_ctxt, msg,
wait_for_reply=True, timeout=timeout,
call_monitor_timeout=cm_timeout,
retry=self.retry,
transport_options=self.transport_options)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
@ -190,7 +193,6 @@ class _BaseCallContext(object):
class _CallContext(_BaseCallContext):
_marker = _BaseCallContext._marker
@classmethod
@ -198,7 +200,7 @@ class _CallContext(_BaseCallContext):
exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, version_cap=_marker, retry=_marker,
call_monitor_timeout=_marker):
call_monitor_timeout=_marker, transport_options=_marker):
cls._check_version(version)
kwargs = dict(
exchange=exchange,
@ -219,11 +221,13 @@ class _CallContext(_BaseCallContext):
retry = call_context.retry
if call_monitor_timeout is cls._marker:
call_monitor_timeout = call_context.call_monitor_timeout
if transport_options is cls._marker:
transport_options = call_context.transport_options
return _CallContext(call_context.transport, target,
call_context.serializer,
timeout, version_cap, retry,
call_monitor_timeout)
call_monitor_timeout, transport_options)
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
@ -237,7 +241,6 @@ class _CallContext(_BaseCallContext):
class RPCClient(_BaseCallContext):
"""A class for invoking methods on remote RPC servers.
The RPCClient class is responsible for sending method invocations to and
@ -326,7 +329,7 @@ class RPCClient(_BaseCallContext):
def __init__(self, transport, target,
timeout=None, version_cap=None, serializer=None, retry=None,
call_monitor_timeout=None):
call_monitor_timeout=None, transport_options=None):
"""Construct an RPC client.
:param transport: a messaging transport handle
@ -362,7 +365,7 @@ class RPCClient(_BaseCallContext):
super(RPCClient, self).__init__(
transport, target, serializer, timeout, version_cap, retry,
call_monitor_timeout
call_monitor_timeout, transport_options
)
self.conf.register_opts(_client_opts)
@ -370,7 +373,7 @@ class RPCClient(_BaseCallContext):
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, version_cap=_marker, retry=_marker,
call_monitor_timeout=_marker):
call_monitor_timeout=_marker, transport_options=_marker):
"""Prepare a method invocation context.
Use this method to override client properties for an individual method
@ -401,6 +404,10 @@ class RPCClient(_BaseCallContext):
0 means no retry is attempted.
N means attempt at most N retries.
:type retry: int
:param transport_options: additional parameters to configure the driver
for example to send parameters as "mandatory"
flag in RabbitMQ
:type transport_options: dictionary
:param call_monitor_timeout: an optional timeout (in seconds) for
active call heartbeating. If specified,
requires the server to heartbeat
@ -413,7 +420,7 @@ class RPCClient(_BaseCallContext):
exchange, topic, namespace,
version, server, fanout,
timeout, version_cap, retry,
call_monitor_timeout)
call_monitor_timeout, transport_options)
def cast(self, ctxt, method, **kwargs):
"""Invoke a method without blocking for a return value.

View File

@ -49,7 +49,31 @@ class TestCastCall(test_utils.BaseTestCase):
transport._send = mock.Mock()
msg = dict(method='foo', args=self.args)
kwargs = {'retry': None}
kwargs = {'retry': None, 'transport_options': None}
if self.call:
kwargs['wait_for_reply'] = True
kwargs['timeout'] = None
kwargs['call_monitor_timeout'] = None
method = client.call if self.call else client.cast
method(self.ctxt, 'foo', **self.args)
transport._send.assert_called_once_with(oslo_messaging.Target(),
self.ctxt,
msg,
**kwargs)
def test_cast_call_with_transport_options(self):
self.config(rpc_response_timeout=None)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
transport_options={'my_k': 'my_val'})
transport._send = mock.Mock()
msg = dict(method='foo', args=self.args)
kwargs = {'retry': None, 'transport_options': {'my_k': 'my_val'}}
if self.call:
kwargs['wait_for_reply'] = True
kwargs['timeout'] = None
@ -203,7 +227,8 @@ class TestCastToTarget(test_utils.BaseTestCase):
transport._send.assert_called_once_with(expect_target,
{},
msg,
retry=None)
retry=None,
transport_options=None)
TestCastToTarget.generate_scenarios()
@ -245,7 +270,7 @@ class TestCallTimeout(test_utils.BaseTestCase):
msg = dict(method='foo', args={})
kwargs = dict(wait_for_reply=True, timeout=self.expect, retry=None,
call_monitor_timeout=self.cm)
call_monitor_timeout=self.cm, transport_options=None)
if self.prepare is not _notset:
client = client.prepare(timeout=self.prepare)
@ -277,7 +302,8 @@ class TestCallRetry(test_utils.BaseTestCase):
msg = dict(method='foo', args={})
kwargs = dict(wait_for_reply=True, timeout=60,
retry=self.expect, call_monitor_timeout=None)
retry=self.expect, call_monitor_timeout=None,
transport_options=None)
if self.prepare is not _notset:
client = client.prepare(retry=self.prepare)
@ -334,8 +360,8 @@ class TestSerializer(test_utils.BaseTestCase):
serializer=serializer)
transport._send = mock.Mock()
kwargs = dict(wait_for_reply=True, timeout=None) if self.call else {}
kwargs = dict(wait_for_reply=True,
timeout=None) if self.call else {}
kwargs['retry'] = None
if self.call:
kwargs['call_monitor_timeout'] = None
@ -367,6 +393,7 @@ class TestSerializer(test_utils.BaseTestCase):
transport._send.assert_called_once_with(oslo_messaging.Target(),
dict(user='alice'),
msg,
transport_options=None,
**kwargs)
expected_calls = [mock.call(self.ctxt, arg) for arg in self.args]
self.assertEqual(expected_calls,
@ -466,7 +493,9 @@ class TestVersionCap(test_utils.BaseTestCase):
self.assertFalse(self.success)
else:
self.assertTrue(self.success)
transport._send.assert_called_once_with(target, {}, msg, **kwargs)
transport._send.assert_called_once_with(target, {}, msg,
transport_options=None,
**kwargs)
TestVersionCap.generate_scenarios()

View File

@ -231,7 +231,8 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
wait_for_reply=None,
timeout=None,
call_monitor_timeout=None,
retry=None)
retry=None,
transport_options=None)
def test_send_all_args(self):
t = transport.Transport(_FakeDriver(cfg.CONF))
@ -250,7 +251,8 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
wait_for_reply='wait_for_reply',
timeout='timeout',
call_monitor_timeout='cm_timeout',
retry='retry')
retry='retry',
transport_options=None)
def test_send_notification(self):
t = transport.Transport(_FakeDriver(cfg.CONF))

View File

@ -116,7 +116,7 @@ class Transport(object):
self._driver.require_features(requeue=requeue)
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
call_monitor_timeout=None, retry=None):
call_monitor_timeout=None, retry=None, transport_options=None):
if not target.topic:
raise exceptions.InvalidTarget('A topic is required to send',
target)
@ -124,7 +124,8 @@ class Transport(object):
wait_for_reply=wait_for_reply,
timeout=timeout,
call_monitor_timeout=call_monitor_timeout,
retry=retry)
retry=retry,
transport_options=transport_options)
def _send_notification(self, target, ctxt, message, version, retry=None):
if not target.topic: