From 85d8f6f1c8c51f32b6ec2f6fdc8cc09bbcc060eb Mon Sep 17 00:00:00 2001 From: Ihar Hrachyshka Date: Mon, 24 Jul 2017 09:22:58 -0700 Subject: [PATCH] Catch exceptions for all rpc casts I15cc2d6ae48e505c2da121880e27481dedf36d3b catches exceptions for specific RPC endpoints affected by a recent change related to push-notifications. There may be more changes like that in the future, so instead of fixing them one by one, this patch consistently catches exceptions from all cast calls for all RPC clients. Change-Id: Ia7e6cd717758a9d5b18fe9cb07c55938f52040ce Partial-Bug: #1705351 --- neutron/common/rpc.py | 40 ++++++++---- neutron/plugins/ml2/rpc.py | 18 ------ neutron/tests/unit/common/test_rpc.py | 90 ++++++++++++++++++--------- 3 files changed, 88 insertions(+), 60 deletions(-) diff --git a/neutron/common/rpc.py b/neutron/common/rpc.py index bb9ddcdb8c7..a8c729f8c62 100644 --- a/neutron/common/rpc.py +++ b/neutron/common/rpc.py @@ -73,7 +73,7 @@ def cleanup(): assert NOTIFIER is not None TRANSPORT.cleanup() NOTIFICATION_TRANSPORT.cleanup() - _ContextWrapper.reset_timeouts() + _BackingOffContextWrapper.reset_timeouts() TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None @@ -98,6 +98,24 @@ def _get_default_method_timeouts(): class _ContextWrapper(object): + def __init__(self, original_context): + self._original_context = original_context + + def __getattr__(self, name): + return getattr(self._original_context, name) + + def cast(self, ctxt, method, **kwargs): + try: + self._original_context.cast(ctxt, method, **kwargs) + except Exception: + # TODO(kevinbenton): make catch specific to missing exchange once + # bug/1705351 is resolved on the oslo.messaging side; if + # oslo.messaging auto-creates the exchange, then just remove the + # code completely + LOG.exception("Ignored exception during cast") + + +class _BackingOffContextWrapper(_ContextWrapper): """Wraps oslo messaging contexts to set the timeout for calls. This intercepts RPC calls and sets the timeout value to the globally @@ -130,12 +148,6 @@ class _ContextWrapper(object): }) cls._max_timeout = max_timeout - def __init__(self, original_context): - self._original_context = original_context - - def __getattr__(self, name): - return getattr(self._original_context, name) - def call(self, ctxt, method, **kwargs): # two methods with the same name in different namespaces should # be tracked independently @@ -178,19 +190,21 @@ class BackingOffClient(oslo_messaging.RPCClient): """An oslo messaging RPC Client that implements a timeout backoff. This has all of the same interfaces as oslo_messaging.RPCClient but - if the timeout parameter is not specified, the _ContextWrapper returned - will track when call timeout exceptions occur and exponentially increase - the timeout for the given call method. + if the timeout parameter is not specified, the _BackingOffContextWrapper + returned will track when call timeout exceptions occur and exponentially + increase the timeout for the given call method. """ def prepare(self, *args, **kwargs): ctx = super(BackingOffClient, self).prepare(*args, **kwargs) - # don't enclose Contexts that explicitly set a timeout - return _ContextWrapper(ctx) if 'timeout' not in kwargs else ctx + # don't back off contexts that explicitly set a timeout + if 'timeout' in kwargs: + return _ContextWrapper(ctx) + return _BackingOffContextWrapper(ctx) @staticmethod def set_max_timeout(max_timeout): '''Set RPC timeout ceiling for all backing-off RPC clients.''' - _ContextWrapper.set_max_timeout(max_timeout) + _BackingOffContextWrapper.set_max_timeout(max_timeout) def get_client(target, version_cap=None, serializer=None): diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index 293eccbe705..22ea803e20e 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -22,7 +22,6 @@ from neutron_lib.plugins import directory from neutron_lib.plugins.ml2 import api from oslo_log import log import oslo_messaging -import six from sqlalchemy.orm import exc from neutron._i18n import _LE, _LW @@ -364,19 +363,6 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin): 'failed_devices_down': failed_devices_down} -def _suppress_cast_exceptions(f): - """Decorator to ignore exchange not found exceptions.""" - @six.wraps(f) - def wrapped(*args, **kwargs): - try: - return f(*args, **kwargs) - except Exception: - # TODO(kevinbenton): make catch specific to missing exchange once - # bug/1705351 is resolved on the oslo.messaging side. - LOG.exception("Ignored exception during cast") - return wrapped - - class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin, sg_rpc.SecurityGroupAgentRpcApiMixin, type_tunnel.TunnelAgentRpcApiMixin): @@ -407,13 +393,11 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin, target = oslo_messaging.Target(topic=topic, version='1.0') self.client = n_rpc.get_client(target) - @_suppress_cast_exceptions def network_delete(self, context, network_id): cctxt = self.client.prepare(topic=self.topic_network_delete, fanout=True) cctxt.cast(context, 'network_delete', network_id=network_id) - @_suppress_cast_exceptions def port_update(self, context, port, network_type, segmentation_id, physical_network): cctxt = self.client.prepare(topic=self.topic_port_update, @@ -422,13 +406,11 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin, network_type=network_type, segmentation_id=segmentation_id, physical_network=physical_network) - @_suppress_cast_exceptions def port_delete(self, context, port_id): cctxt = self.client.prepare(topic=self.topic_port_delete, fanout=True) cctxt.cast(context, 'port_delete', port_id=port_id) - @_suppress_cast_exceptions def network_update(self, context, network): cctxt = self.client.prepare(topic=self.topic_network_update, fanout=True, version='1.4') diff --git a/neutron/tests/unit/common/test_rpc.py b/neutron/tests/unit/common/test_rpc.py index 204a524fbde..dc4bfc4c0c0 100644 --- a/neutron/tests/unit/common/test_rpc.py +++ b/neutron/tests/unit/common/test_rpc.py @@ -335,21 +335,26 @@ class TimeoutTestCase(base.DietTestCase): # ensure that the timeout was not increased and the back-off sleep # wasn't called self.assertEqual( - 5, rpc._ContextWrapper._METHOD_TIMEOUTS['create_pb_and_j']) + 5, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['create_pb_and_j']) self.assertFalse(self.sleep.called) def test_timeout_store_defaults(self): # any method should default to the configured timeout - self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1']) - self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._ContextWrapper._METHOD_TIMEOUTS['method_2']) + self.assertEqual( + rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) + self.assertEqual( + rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2']) # a change to an existing should not affect new or existing ones - rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'] = 7000 - self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1']) - self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._ContextWrapper._METHOD_TIMEOUTS['method_3']) + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 7000 + self.assertEqual( + rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) + self.assertEqual( + rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_3']) def test_method_timeout_sleep(self): rpc.TRANSPORT.conf.rpc_response_timeout = 2 @@ -362,7 +367,7 @@ class TimeoutTestCase(base.DietTestCase): self.sleep.reset_mock() def test_method_timeout_increases_on_timeout_exception(self): - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 for i in range(5): with testtools.ExpectedException(messaging.MessagingTimeout): self.client.call(self.call_context, 'method_1') @@ -378,15 +383,17 @@ class TimeoutTestCase(base.DietTestCase): for i in range(5): with testtools.ExpectedException(messaging.MessagingTimeout): self.client.call(self.call_context, 'method_1') - self.assertEqual(10 * rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1']) + self.assertEqual( + 10 * rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) with testtools.ExpectedException(messaging.MessagingTimeout): self.client.call(self.call_context, 'method_1') - self.assertEqual(10 * rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1']) + self.assertEqual( + 10 * rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) def test_timeout_unchanged_on_other_exception(self): - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 rpc.TRANSPORT._send.side_effect = ValueError with testtools.ExpectedException(ValueError): self.client.call(self.call_context, 'method_1') @@ -398,8 +405,8 @@ class TimeoutTestCase(base.DietTestCase): self.assertEqual([1, 1], timeouts) def test_timeouts_for_methods_tracked_independently(self): - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 - rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'] = 1 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 1 for method in ('method_1', 'method_1', 'method_2', 'method_1', 'method_2'): with testtools.ExpectedException(messaging.MessagingTimeout): @@ -409,8 +416,8 @@ class TimeoutTestCase(base.DietTestCase): self.assertEqual([1, 2, 1, 4, 2], timeouts) def test_timeouts_for_namespaces_tracked_independently(self): - rpc._ContextWrapper._METHOD_TIMEOUTS['ns1.method'] = 1 - rpc._ContextWrapper._METHOD_TIMEOUTS['ns2.method'] = 1 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns1.method'] = 1 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns2.method'] = 1 for ns in ('ns1', 'ns2'): self.client.target.namespace = ns for i in range(4): @@ -421,7 +428,7 @@ class TimeoutTestCase(base.DietTestCase): self.assertEqual([1, 2, 4, 8, 1, 2, 4, 8], timeouts) def test_method_timeout_increases_with_prepare(self): - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 ctx = self.client.prepare(version='1.4') with testtools.ExpectedException(messaging.MessagingTimeout): ctx.call(self.call_context, 'method_1') @@ -435,23 +442,48 @@ class TimeoutTestCase(base.DietTestCase): def test_set_max_timeout_caps_all_methods(self): rpc.TRANSPORT.conf.rpc_response_timeout = 300 - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 100 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 100 rpc.BackingOffClient.set_max_timeout(50) # both explicitly tracked - self.assertEqual(50, rpc._ContextWrapper._METHOD_TIMEOUTS['method_1']) + self.assertEqual( + 50, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) # as well as new methods - self.assertEqual(50, rpc._ContextWrapper._METHOD_TIMEOUTS['method_2']) + self.assertEqual( + 50, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2']) def test_set_max_timeout_retains_lower_timeouts(self): - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 10 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 10 rpc.BackingOffClient.set_max_timeout(50) - self.assertEqual(10, rpc._ContextWrapper._METHOD_TIMEOUTS['method_1']) + self.assertEqual( + 10, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) def test_set_max_timeout_overrides_default_timeout(self): rpc.TRANSPORT.conf.rpc_response_timeout = 10 - self.assertEqual(10 * 10, rpc._ContextWrapper.get_max_timeout()) - rpc._ContextWrapper.set_max_timeout(10) - self.assertEqual(10, rpc._ContextWrapper.get_max_timeout()) + self.assertEqual( + 10 * 10, rpc._BackingOffContextWrapper.get_max_timeout()) + rpc._BackingOffContextWrapper.set_max_timeout(10) + self.assertEqual(10, rpc._BackingOffContextWrapper.get_max_timeout()) + + +class CastExceptionTestCase(base.DietTestCase): + def setUp(self): + super(CastExceptionTestCase, self).setUp() + + self.messaging_conf = messaging_conffixture.ConfFixture(CONF) + self.messaging_conf.transport_driver = 'fake' + self.messaging_conf.response_timeout = 0 + self.useFixture(self.messaging_conf) + + self.addCleanup(rpc.cleanup) + rpc.init(CONF) + rpc.TRANSPORT = mock.MagicMock() + rpc.TRANSPORT._send.side_effect = Exception + target = messaging.Target(version='1.0', topic='testing') + self.client = rpc.get_client(target) + self.cast_context = mock.Mock() + + def test_cast_catches_exception(self): + self.client.cast(self.cast_context, 'method_1') class TestConnection(base.DietTestCase):