[rabbitmq] Implement active call monitoring

This adds an optional call_monitor_timeout parameter to the RPC client,
which if specified, will enable heartbeating of long-running calls by
the server. This enables the user to increase the regular timeout to
a much larger value, allowing calls to take a very long time, but
with heartbeating to indicate that they are still running on the server
side. If the server stops heartbeating, then the call_monitor_timeout
takes over and we fail with the usual MessagingTimeout instead of waiting
for the longer overall timeout to expire.

Change-Id: I60334aaf019f177a984583528b71d00859d31f84
This commit is contained in:
Dan Smith 2018-02-21 11:03:38 -08:00
parent d38ad3e465
commit b34ab8b1cc
13 changed files with 221 additions and 44 deletions

View File

@ -76,6 +76,7 @@ def unpack_context(msg):
context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
context_dict['reply_q'] = msg.pop('_reply_q', None)
context_dict['client_timeout'] = msg.pop('_timeout', None)
return RpcContext.from_dict(context_dict)

View File

@ -88,19 +88,21 @@ class MessageOperationsHandler(object):
class AMQPIncomingMessage(base.RpcIncomingMessage):
def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q,
obsolete_reply_queues, message_operations_handler):
client_timeout, obsolete_reply_queues,
message_operations_handler):
super(AMQPIncomingMessage, self).__init__(ctxt, message)
self.listener = listener
self.unique_id = unique_id
self.msg_id = msg_id
self.reply_q = reply_q
self.client_timeout = client_timeout
self._obsolete_reply_queues = obsolete_reply_queues
self._message_operations_handler = message_operations_handler
self.stopwatch = timeutils.StopWatch()
self.stopwatch.start()
def _send_reply(self, conn, reply=None, failure=None):
def _send_reply(self, conn, reply=None, failure=None, ending=True):
if not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
self.msg_id):
return
@ -109,7 +111,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
failure = rpc_common.serialize_remote_exception(failure)
# NOTE(sileht): ending can be removed in N*, see Listener.wait()
# for more detail.
msg = {'result': reply, 'failure': failure, 'ending': True,
msg = {'result': reply, 'failure': failure, 'ending': ending,
'_msg_id': self.msg_id}
rpc_amqp._add_unique_id(msg)
unique_id = msg[rpc_amqp.UNIQUE_ID]
@ -179,7 +181,9 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
self._message_operations_handler.do(self.message.requeue)
def heartbeat(self):
LOG.debug("Message heartbeat not implemented")
with self.listener.driver._get_connection(
rpc_common.PURPOSE_SEND) as conn:
self._send_reply(conn, None, None, ending=False)
class ObsoleteReplyQueuesCache(object):
@ -259,6 +263,7 @@ class AMQPListener(base.PollStyleListener):
unique_id,
ctxt.msg_id,
ctxt.reply_q,
ctxt.client_timeout,
self._obsolete_reply_queues,
self._message_operations_handler))
@ -426,7 +431,7 @@ class ReplyWaiter(object):
ending = data.get('ending', False)
return result, ending
def wait(self, msg_id, timeout):
def wait(self, msg_id, timeout, call_monitor_timeout):
# NOTE(sileht): for each msg_id we receive two amqp message
# first one with the payload, a second one to ensure the other
# have finish to send the payload
@ -435,10 +440,21 @@ class ReplyWaiter(object):
# support both cases for now.
timer = rpc_common.DecayingTimer(duration=timeout)
timer.start()
if call_monitor_timeout:
call_monitor_timer = rpc_common.DecayingTimer(
duration=call_monitor_timeout)
call_monitor_timer.start()
else:
call_monitor_timer = None
final_reply = None
ending = False
while not ending:
timeout = timer.check_return(self._raise_timeout_exception, msg_id)
if call_monitor_timer and timeout > 0:
cm_timeout = call_monitor_timer.check_return(
self._raise_timeout_exception, msg_id)
if cm_timeout < timeout:
timeout = cm_timeout
try:
message = self.waiters.get(msg_id, timeout=timeout)
except moves.queue.Empty:
@ -450,6 +466,10 @@ class ReplyWaiter(object):
# empty `result` field or a second _send_reply() with
# ending=True and no `result` field.
final_reply = reply
elif ending is False:
LOG.debug('Call monitor heartbeat received; '
'renewing timeout timer')
call_monitor_timer.restart()
return final_reply
@ -495,7 +515,7 @@ class AMQPDriverBase(base.BaseDriver):
return self._reply_q
def _send(self, target, ctxt, message,
wait_for_reply=None, timeout=None,
wait_for_reply=None, timeout=None, call_monitor_timeout=None,
envelope=True, notify=False, retry=None):
msg = message
@ -504,6 +524,7 @@ class AMQPDriverBase(base.BaseDriver):
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
msg.update({'_reply_q': self._get_reply_q()})
msg.update({'_timeout': call_monitor_timeout})
rpc_amqp._add_unique_id(msg)
unique_id = msg[rpc_amqp.UNIQUE_ID]
@ -548,7 +569,8 @@ class AMQPDriverBase(base.BaseDriver):
msg=msg, timeout=timeout, retry=retry)
if wait_for_reply:
result = self._waiter.wait(msg_id, timeout)
result = self._waiter.wait(msg_id, timeout,
call_monitor_timeout)
if isinstance(result, Exception):
raise result
return result
@ -557,9 +579,9 @@ class AMQPDriverBase(base.BaseDriver):
self._waiter.unlisten(msg_id)
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
call_monitor_timeout=None, retry=None):
return self._send(target, ctxt, message, wait_for_reply, timeout,
retry=retry)
call_monitor_timeout, retry=retry)
def send_notification(self, target, ctxt, message, version, retry=None):
return self._send(target, ctxt, message,

View File

@ -81,6 +81,7 @@ class IncomingMessage(object):
def __init__(self, ctxt, message):
self.ctxt = ctxt
self.message = message
self.client_timeout = None
def acknowledge(self):
"""Called by the server to acknowledge receipt of the message. When
@ -362,7 +363,8 @@ class BaseDriver(object):
@abc.abstractmethod
def send(self, target, ctxt, message,
wait_for_reply=None, timeout=None, retry=None):
wait_for_reply=None, timeout=None, call_monitor_timeout=None,
retry=None):
"""Send a message to the given target and optionally wait for a reply.
This method is used by the RPC client when sending RPC requests to a
server.
@ -426,6 +428,10 @@ class BaseDriver(object):
operation to complete. Should this expire the :py:meth:`send` must
raise a :py:exc:`MessagingTimeout` exception
:type timeout: float
:param call_monitor_timeout: Maximum time the client will wait for the
call to complete or receive a message heartbeat indicating the
remote side is still executing.
:type call_monitor_timeout: float
:param retry: maximum message send attempts permitted
:type retry: int
:returns: A reply message or None if no reply expected

View File

@ -268,7 +268,8 @@ class ProtonDriver(base.BaseDriver):
@_ensure_connect_called
def send(self, target, ctxt, message,
wait_for_reply=False, timeout=None, retry=None):
wait_for_reply=False, timeout=None, call_monitor_timeout=None,
retry=None):
"""Send a message to the given target.
:param target: destination for message
@ -282,6 +283,10 @@ class ProtonDriver(base.BaseDriver):
:param timeout: raise exception if send does not complete within
timeout seconds. None == no timeout.
:type timeout: float
:param call_monitor_timeout: Maximum time the client will wait for the
call to complete or receive a message heartbeat indicating the
remote side is still executing.
:type call_monitor_timeout: float
:param retry: (optional) maximum re-send attempts on recoverable error
None or -1 means to retry forever
0 means no retry

View File

@ -216,7 +216,7 @@ class FakeDriver(base.BaseDriver):
return None
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
call_monitor_timeout=None, retry=None):
# NOTE(sileht): retry doesn't need to be implemented, the fake
# transport always works
return self._send(target, ctxt, message, wait_for_reply, timeout)

View File

@ -375,7 +375,7 @@ class KafkaDriver(base.BaseDriver):
self.listeners = []
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
call_monitor_timeout=None, retry=None):
raise NotImplementedError(
'The RPC implementation for Kafka is not implemented')
@ -390,6 +390,10 @@ class KafkaDriver(base.BaseDriver):
:type message: dict
:param version: Messaging API version (currently not used)
:type version: str
:param call_monitor_timeout: Maximum time the client will wait for the
call to complete before or receive a message heartbeat indicating
the remote side is still executing.
:type call_monitor_timeout: float
:param retry: an optional default kafka consumer retries configuration
None means to retry forever
0 means no retry

View File

@ -94,13 +94,15 @@ class _BaseCallContext(object):
_marker = object()
def __init__(self, transport, target, serializer,
timeout=None, version_cap=None, retry=None):
timeout=None, version_cap=None, retry=None,
call_monitor_timeout=None):
self.conf = transport.conf
self.transport = transport
self.target = target
self.serializer = serializer
self.timeout = timeout
self.call_monitor_timeout = call_monitor_timeout
self.retry = retry
self.version_cap = version_cap
@ -166,11 +168,14 @@ class _BaseCallContext(object):
if self.timeout is None:
timeout = self.conf.rpc_response_timeout
cm_timeout = self.call_monitor_timeout
self._check_version_cap(msg.get('version'))
try:
result = self.transport._send(self.target, msg_ctxt, msg,
wait_for_reply=True, timeout=timeout,
call_monitor_timeout=cm_timeout,
retry=self.retry)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
@ -180,7 +185,8 @@ class _BaseCallContext(object):
@abc.abstractmethod
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, version_cap=_marker, retry=_marker):
timeout=_marker, version_cap=_marker, retry=_marker,
call_monitor_timeout=_marker):
"""Prepare a method invocation context. See RPCClient.prepare()."""
@ -192,7 +198,8 @@ class _CallContext(_BaseCallContext):
def _prepare(cls, call_context,
exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, version_cap=_marker, retry=_marker):
timeout=_marker, version_cap=_marker, retry=_marker,
call_monitor_timeout=_marker):
cls._check_version(version)
kwargs = dict(
exchange=exchange,
@ -211,18 +218,23 @@ class _CallContext(_BaseCallContext):
version_cap = call_context.version_cap
if retry is cls._marker:
retry = call_context.retry
if call_monitor_timeout is cls._marker:
call_monitor_timeout = call_context.call_monitor_timeout
return _CallContext(call_context.transport, target,
call_context.serializer,
timeout, version_cap, retry)
timeout, version_cap, retry,
call_monitor_timeout)
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, version_cap=_marker, retry=_marker):
timeout=_marker, version_cap=_marker, retry=_marker,
call_monitor_timeout=_marker):
return _CallContext._prepare(self,
exchange, topic, namespace,
version, server, fanout,
timeout, version_cap, retry)
timeout, version_cap, retry,
call_monitor_timeout)
class RPCClient(_BaseCallContext):
@ -314,7 +326,8 @@ class RPCClient(_BaseCallContext):
_marker = _BaseCallContext._marker
def __init__(self, transport, target,
timeout=None, version_cap=None, serializer=None, retry=None):
timeout=None, version_cap=None, serializer=None, retry=None,
call_monitor_timeout=None):
"""Construct an RPC client.
:param transport: a messaging transport handle
@ -332,6 +345,13 @@ class RPCClient(_BaseCallContext):
0 means no retry is attempted.
N means attempt at most N retries.
:type retry: int
:param call_monitor_timeout: an optional timeout (in seconds) for
active call heartbeating. If specified,
requires the server to heartbeat
long-running calls at this interval
(less than the overall timeout
parameter).
:type call_monitor_timeout: int
"""
if serializer is None:
serializer = msg_serializer.NoOpSerializer()
@ -342,14 +362,16 @@ class RPCClient(_BaseCallContext):
"instance.")
super(RPCClient, self).__init__(
transport, target, serializer, timeout, version_cap, retry
transport, target, serializer, timeout, version_cap, retry,
call_monitor_timeout
)
self.conf.register_opts(_client_opts)
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, version_cap=_marker, retry=_marker):
timeout=_marker, version_cap=_marker, retry=_marker,
call_monitor_timeout=_marker):
"""Prepare a method invocation context.
Use this method to override client properties for an individual method
@ -380,11 +402,19 @@ class RPCClient(_BaseCallContext):
0 means no retry is attempted.
N means attempt at most N retries.
:type retry: int
:param call_monitor_timeout: an optional timeout (in seconds) for
active call heartbeating. If specified,
requires the server to heartbeat
long-running calls at this interval
(less than the overall timeout
parameter).
:type call_monitor_timeout: int
"""
return _CallContext._prepare(self,
exchange, topic, namespace,
version, server, fanout,
timeout, version_cap, retry)
timeout, version_cap, retry,
call_monitor_timeout)
def cast(self, ctxt, method, **kwargs):
"""Invoke a method without blocking for a return value.

View File

@ -30,7 +30,9 @@ __all__ = [
from abc import ABCMeta
from abc import abstractmethod
import logging
import sys
import threading
import six
@ -40,6 +42,8 @@ from oslo_messaging import serializer as msg_serializer
from oslo_messaging import server as msg_server
from oslo_messaging import target as msg_target
LOG = logging.getLogger(__name__)
class ExpectedException(Exception):
"""Encapsulates an expected exception raised by an RPC endpoint
@ -190,6 +194,32 @@ class RPCDispatcher(dispatcher.DispatcherBase):
result = func(ctxt, **new_args)
return self.serializer.serialize_entity(ctxt, result)
def _watchdog(self, event, incoming):
# NOTE(danms): If the client declared that they are going to
# time out after N seconds, send the call-monitor heartbeat
# every N/2 seconds to make sure there is plenty of time to
# account for inbound and outbound queuing delays. Client
# timeouts must be integral and positive, otherwise we log and
# ignore.
try:
client_timeout = int(incoming.client_timeout)
cm_heartbeat_interval = client_timeout / 2
except ValueError:
client_timeout = cm_heartbeat_interval = 0
if cm_heartbeat_interval < 1:
LOG.warning('Client provided an invalid timeout value of %r' % (
incoming.client_timeout))
return
while not event.wait(cm_heartbeat_interval):
LOG.debug(
'Sending call-monitor heartbeat for active call to %(method)s '
'(interval=%(interval)i)' % (
{'method': incoming.message.get('method'),
'interval': cm_heartbeat_interval}))
incoming.heartbeat()
def dispatch(self, incoming):
"""Dispatch an RPC message to the appropriate endpoint method.
@ -205,6 +235,20 @@ class RPCDispatcher(dispatcher.DispatcherBase):
namespace = message.get('namespace')
version = message.get('version', '1.0')
# NOTE(danms): This event and watchdog thread are used to send
# call-monitoring heartbeats for this message while the call
# is executing if it runs for some time. The thread will wait
# for the event to be signaled, which we do explicitly below
# after dispatching the method call.
completion_event = threading.Event()
watchdog_thread = threading.Thread(target=self._watchdog,
args=(completion_event, incoming))
if incoming.client_timeout:
# NOTE(danms): The client provided a timeout, so we start
# the watchdog thread. If the client is old or didn't send
# a timeout, we just never start the watchdog thread.
watchdog_thread.start()
found_compatible = False
for endpoint in self.endpoints:
target = getattr(endpoint, 'target', None)
@ -217,7 +261,12 @@ class RPCDispatcher(dispatcher.DispatcherBase):
if hasattr(endpoint, method):
if self.access_policy.is_allowed(endpoint, method):
return self._do_dispatch(endpoint, method, ctxt, args)
try:
return self._do_dispatch(endpoint, method, ctxt, args)
finally:
completion_event.set()
if incoming.client_timeout:
watchdog_thread.join()
found_compatible = True

View File

@ -469,8 +469,11 @@ class TestSendReceive(test_utils.BaseTestCase):
]
_timeout = [
('no_timeout', dict(timeout=None)),
('timeout', dict(timeout=0.01)), # FIXME(markmc): timeout=0 is broken?
('no_timeout', dict(timeout=None, call_monitor_timeout=None)),
('timeout', dict(timeout=0.01, # FIXME(markmc): timeout=0 is broken?
call_monitor_timeout=None)),
('call_monitor_timeout', dict(timeout=0.01,
call_monitor_timeout=0.02)),
]
@classmethod
@ -500,15 +503,20 @@ class TestSendReceive(test_utils.BaseTestCase):
replies = []
msgs = []
# FIXME(danms): Surely this is not the right way to do this...
self.ctxt['client_timeout'] = self.call_monitor_timeout
def send_and_wait_for_reply(i):
try:
timeout = self.timeout
cm_timeout = self.call_monitor_timeout
replies.append(driver.send(target,
self.ctxt,
{'tx_id': i},
wait_for_reply=True,
timeout=timeout))
timeout=timeout,
call_monitor_timeout=cm_timeout))
self.assertFalse(self.failure)
self.assertIsNone(self.timeout)
except (ZeroDivisionError, oslo_messaging.MessagingTimeout) as e:
@ -594,14 +602,15 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
wait_conditions = []
orig_reply_waiter = amqpdriver.ReplyWaiter.wait
def reply_waiter(self, msg_id, timeout):
def reply_waiter(self, msg_id, timeout, call_monitor_timeout):
if wait_conditions:
cond = wait_conditions.pop()
with cond:
cond.notify()
with cond:
cond.wait()
return orig_reply_waiter(self, msg_id, timeout)
return orig_reply_waiter(self, msg_id, timeout,
call_monitor_timeout)
self.useFixture(fixtures.MockPatchObject(
amqpdriver.ReplyWaiter, 'wait', reply_waiter))
@ -891,11 +900,12 @@ class TestReplyWireFormat(test_utils.BaseTestCase):
]
_context = [
('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
('empty_ctxt', dict(ctxt={}, expected_ctxt={'client_timeout': None})),
('user_project_ctxt',
dict(ctxt={'_context_user': 'mark',
'_context_project': 'snarkybunch'},
expected_ctxt={'user': 'mark', 'project': 'snarkybunch'})),
expected_ctxt={'user': 'mark', 'project': 'snarkybunch',
'client_timeout': None})),
]
_compression = [
@ -940,6 +950,7 @@ class TestReplyWireFormat(test_utils.BaseTestCase):
'_msg_id': uuid.uuid4().hex,
'_unique_id': uuid.uuid4().hex,
'_reply_q': 'reply_' + uuid.uuid4().hex,
'_timeout': None,
})
msg['oslo.message'] = jsonutils.dumps(msg['oslo.message'])

View File

@ -152,6 +152,42 @@ class CallTestCase(utils.SkipIfNoTransportURL):
self.assertEqual(10, server.endpoint.ival)
def test_monitor_long_call(self):
if not self.url.startswith("rabbit://"):
self.skipTest("backend does not support call monitoring")
transport = self.useFixture(utils.RPCTransportFixture(self.conf,
self.url))
target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()),
server='server_' + str(uuid.uuid4()))
class _endpoint(object):
def delay(self, ctxt, seconds):
time.sleep(seconds)
return seconds
self.useFixture(
utils.RpcServerFixture(self.conf, self.url, target,
executor='threading',
endpoint=_endpoint()))
# First case, no monitoring, ensure we timeout normally when the
# server side runs long
client1 = utils.ClientStub(transport.transport, target,
cast=False, timeout=1)
self.assertRaises(oslo_messaging.MessagingTimeout,
client1.delay, seconds=4)
# Second case, set a short call monitor timeout and a very
# long overall timeout. If we didn't honor the call monitor
# timeout, we would wait an hour, past the test timeout. If
# the server was not sending message heartbeats, we'd time out
# after two seconds.
client2 = utils.ClientStub(transport.transport, target,
cast=False, timeout=3600,
call_monitor_timeout=2)
self.assertEqual(4, client2.delay(seconds=4))
def test_endpoint_version_namespace(self):
# verify endpoint version and namespace are checked
target = oslo_messaging.Target(topic="topic_" + str(uuid.uuid4()),

View File

@ -53,6 +53,7 @@ class TestCastCall(test_utils.BaseTestCase):
if self.call:
kwargs['wait_for_reply'] = True
kwargs['timeout'] = None
kwargs['call_monitor_timeout'] = None
method = client.call if self.call else client.cast
method(self.ctxt, 'foo', **self.args)
@ -215,19 +216,21 @@ class TestCallTimeout(test_utils.BaseTestCase):
scenarios = [
('all_none',
dict(confval=None, ctor=None, prepare=_notset, expect=None)),
dict(confval=None, ctor=None, prepare=_notset, expect=None, cm=None)),
('confval',
dict(confval=21, ctor=None, prepare=_notset, expect=21)),
dict(confval=21, ctor=None, prepare=_notset, expect=21, cm=None)),
('ctor',
dict(confval=None, ctor=21.1, prepare=_notset, expect=21.1)),
dict(confval=None, ctor=21.1, prepare=_notset, expect=21.1, cm=None)),
('ctor_zero',
dict(confval=None, ctor=0, prepare=_notset, expect=0)),
dict(confval=None, ctor=0, prepare=_notset, expect=0, cm=None)),
('prepare',
dict(confval=None, ctor=None, prepare=21.1, expect=21.1)),
dict(confval=None, ctor=None, prepare=21.1, expect=21.1, cm=None)),
('prepare_override',
dict(confval=None, ctor=10.1, prepare=21.1, expect=21.1)),
dict(confval=None, ctor=10.1, prepare=21.1, expect=21.1, cm=None)),
('prepare_zero',
dict(confval=None, ctor=None, prepare=0, expect=0)),
dict(confval=None, ctor=None, prepare=0, expect=0, cm=None)),
('call_monitor',
dict(confval=None, ctor=None, prepare=60, expect=60, cm=30)),
]
def test_call_timeout(self):
@ -235,12 +238,14 @@ class TestCallTimeout(test_utils.BaseTestCase):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
timeout=self.ctor)
timeout=self.ctor,
call_monitor_timeout=self.cm)
transport._send = mock.Mock()
msg = dict(method='foo', args={})
kwargs = dict(wait_for_reply=True, timeout=self.expect, retry=None)
kwargs = dict(wait_for_reply=True, timeout=self.expect, retry=None,
call_monitor_timeout=self.cm)
if self.prepare is not _notset:
client = client.prepare(timeout=self.prepare)
@ -272,7 +277,7 @@ class TestCallRetry(test_utils.BaseTestCase):
msg = dict(method='foo', args={})
kwargs = dict(wait_for_reply=True, timeout=60,
retry=self.expect)
retry=self.expect, call_monitor_timeout=None)
if self.prepare is not _notset:
client = client.prepare(retry=self.prepare)
@ -332,6 +337,8 @@ class TestSerializer(test_utils.BaseTestCase):
kwargs = dict(wait_for_reply=True, timeout=None) if self.call else {}
kwargs['retry'] = None
if self.call:
kwargs['call_monitor_timeout'] = None
transport._send.return_value = self.retval
@ -441,6 +448,7 @@ class TestVersionCap(test_utils.BaseTestCase):
if self.call:
kwargs['wait_for_reply'] = True
kwargs['timeout'] = None
kwargs['call_monitor_timeout'] = None
prep_kwargs = {}
if self.prepare_cap is not _notset:

View File

@ -288,6 +288,7 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
'message',
wait_for_reply=None,
timeout=None,
call_monitor_timeout=None,
retry=None)
def test_send_all_args(self):
@ -297,7 +298,8 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
t._send(self._target, 'ctxt', 'message',
wait_for_reply='wait_for_reply',
timeout='timeout', retry='retry')
timeout='timeout', call_monitor_timeout='cm_timeout',
retry='retry')
t._driver.send.\
assert_called_once_with(self._target,
@ -305,6 +307,7 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
'message',
wait_for_reply='wait_for_reply',
timeout='timeout',
call_monitor_timeout='cm_timeout',
retry='retry')
def test_send_notification(self):

View File

@ -122,13 +122,15 @@ class Transport(object):
self._driver.require_features(requeue=requeue)
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
call_monitor_timeout=None, retry=None):
if not target.topic:
raise exceptions.InvalidTarget('A topic is required to send',
target)
return self._driver.send(target, ctxt, message,
wait_for_reply=wait_for_reply,
timeout=timeout, retry=retry)
timeout=timeout,
call_monitor_timeout=call_monitor_timeout,
retry=retry)
def _send_notification(self, target, ctxt, message, version, retry=None):
if not target.topic: