From f948e24294dfb7dfa1342d64da749c3adeb5e2e5 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Wed, 20 Feb 2019 12:08:04 -0500 Subject: [PATCH] Handle unexpected failures during call monitor heartbeat Change-Id: Iec04c18ac3565a3610377d94caf128c6704a89eb Closes-Bug: #1816816 --- oslo_messaging/_drivers/amqpdriver.py | 9 ++++- oslo_messaging/rpc/dispatcher.py | 11 +++++- oslo_messaging/tests/rpc/test_dispatcher.py | 40 ++++++++++++++++++++- 3 files changed, 57 insertions(+), 3 deletions(-) diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 47967d71c..ebbc35d8d 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -32,6 +32,7 @@ from oslo_messaging._i18n import _ from oslo_messaging._i18n import _LE from oslo_messaging._i18n import _LI from oslo_messaging._i18n import _LW +from oslo_messaging import MessageDeliveryFailure LOG = logging.getLogger(__name__) @@ -168,9 +169,15 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): return def heartbeat(self): + # generate a keep alive for RPC call monitoring with self.listener.driver._get_connection( rpc_common.PURPOSE_SEND) as conn: - self._send_reply(conn, None, None, ending=False) + try: + self._send_reply(conn, None, None, ending=False) + except rpc_amqp.AMQPDestinationNotFound: + # internal exception that indicates queue/exchange gone - + # broker unreachable. + raise MessageDeliveryFailure("Heartbeat send failed") # NOTE(sileht): Those have already be ack in RpcListener IO thread # We keep them as noop until all drivers do the same diff --git a/oslo_messaging/rpc/dispatcher.py b/oslo_messaging/rpc/dispatcher.py index 8eb1273fb..54cc15ded 100644 --- a/oslo_messaging/rpc/dispatcher.py +++ b/oslo_messaging/rpc/dispatcher.py @@ -218,7 +218,16 @@ class RPCDispatcher(dispatcher.DispatcherBase): '(interval=%(interval)i)' % ( {'method': incoming.message.get('method'), 'interval': cm_heartbeat_interval})) - incoming.heartbeat() + try: + incoming.heartbeat() + except Exception as exc: + # The heartbeat message failed to send. Likely the broker or + # client has died. Nothing to do here but exit the watchdog + # thread. If the client is still alive (dead broker) then its + # RPC will timeout as expected. + LOG.debug("Call-monitor heartbeat failed: %(exc)s" + % ({'exc': exc})) + break def dispatch(self, incoming): """Dispatch an RPC message to the appropriate endpoint method. diff --git a/oslo_messaging/tests/rpc/test_dispatcher.py b/oslo_messaging/tests/rpc/test_dispatcher.py index 00b36e718..0bc201fb0 100755 --- a/oslo_messaging/tests/rpc/test_dispatcher.py +++ b/oslo_messaging/tests/rpc/test_dispatcher.py @@ -13,6 +13,7 @@ # under the License. import testscenarios +import time import oslo_messaging from oslo_messaging import rpc @@ -180,7 +181,8 @@ class TestDispatcher(test_utils.BaseTestCase): dispatcher = oslo_messaging.RPCDispatcher(endpoints, serializer, self.access_policy) - incoming = mock.Mock(ctxt=self.ctxt, message=self.msg) + incoming = mock.Mock(ctxt=self.ctxt, message=self.msg, + client_timeout=0) res = None @@ -252,6 +254,7 @@ class TestSerializer(test_utils.BaseTestCase): incoming = mock.Mock() incoming.ctxt = self.ctxt incoming.message = dict(method='foo', args=self.args) + incoming.client_timeout = 0 retval = dispatcher.dispatch(incoming) if self.retval is not None: self.assertEqual('s' + self.retval, retval) @@ -265,3 +268,38 @@ class TestSerializer(test_utils.BaseTestCase): serializer.serialize_entity.assert_called_once_with(self.dctxt, self.retval) + + +class TestMonitorFailure(test_utils.BaseTestCase): + """Test what happens when the call monitor watchdog hits an exception when + sending the heartbeat. + """ + + class _SleepyEndpoint(object): + def __init__(self, target=None): + self.target = target + + def sleep(self, ctxt, **kwargs): + time.sleep(kwargs['timeout']) + return True + + def test_heartbeat_failure(self): + + endpoints = [self._SleepyEndpoint()] + dispatcher = oslo_messaging.RPCDispatcher(endpoints, + serializer=None) + + # sleep long enough for the client_timeout to expire multiple times + # the timeout is (client_timeout/2) and must be > 1.0 + message = {'method': 'sleep', + 'args': {'timeout': 3.5}} + ctxt = {'test': 'value'} + + incoming = mock.Mock(ctxt=ctxt, message=message, client_timeout=2.0) + incoming.heartbeat = mock.Mock(side_effect=Exception('BOOM!')) + res = dispatcher.dispatch(incoming) + self.assertTrue(res) + + # only one call to heartbeat should be made since the watchdog thread + # should exit on the first exception thrown + self.assertEqual(1, incoming.heartbeat.call_count)