diff --git a/neutron/common/rpc.py b/neutron/common/rpc.py index bb9ddcdb8c7..a8c729f8c62 100644 --- a/neutron/common/rpc.py +++ b/neutron/common/rpc.py @@ -73,7 +73,7 @@ def cleanup(): assert NOTIFIER is not None TRANSPORT.cleanup() NOTIFICATION_TRANSPORT.cleanup() - _ContextWrapper.reset_timeouts() + _BackingOffContextWrapper.reset_timeouts() TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None @@ -98,6 +98,24 @@ def _get_default_method_timeouts(): class _ContextWrapper(object): + def __init__(self, original_context): + self._original_context = original_context + + def __getattr__(self, name): + return getattr(self._original_context, name) + + def cast(self, ctxt, method, **kwargs): + try: + self._original_context.cast(ctxt, method, **kwargs) + except Exception: + # TODO(kevinbenton): make catch specific to missing exchange once + # bug/1705351 is resolved on the oslo.messaging side; if + # oslo.messaging auto-creates the exchange, then just remove the + # code completely + LOG.exception("Ignored exception during cast") + + +class _BackingOffContextWrapper(_ContextWrapper): """Wraps oslo messaging contexts to set the timeout for calls. This intercepts RPC calls and sets the timeout value to the globally @@ -130,12 +148,6 @@ class _ContextWrapper(object): }) cls._max_timeout = max_timeout - def __init__(self, original_context): - self._original_context = original_context - - def __getattr__(self, name): - return getattr(self._original_context, name) - def call(self, ctxt, method, **kwargs): # two methods with the same name in different namespaces should # be tracked independently @@ -178,19 +190,21 @@ class BackingOffClient(oslo_messaging.RPCClient): """An oslo messaging RPC Client that implements a timeout backoff. This has all of the same interfaces as oslo_messaging.RPCClient but - if the timeout parameter is not specified, the _ContextWrapper returned - will track when call timeout exceptions occur and exponentially increase - the timeout for the given call method. + if the timeout parameter is not specified, the _BackingOffContextWrapper + returned will track when call timeout exceptions occur and exponentially + increase the timeout for the given call method. """ def prepare(self, *args, **kwargs): ctx = super(BackingOffClient, self).prepare(*args, **kwargs) - # don't enclose Contexts that explicitly set a timeout - return _ContextWrapper(ctx) if 'timeout' not in kwargs else ctx + # don't back off contexts that explicitly set a timeout + if 'timeout' in kwargs: + return _ContextWrapper(ctx) + return _BackingOffContextWrapper(ctx) @staticmethod def set_max_timeout(max_timeout): '''Set RPC timeout ceiling for all backing-off RPC clients.''' - _ContextWrapper.set_max_timeout(max_timeout) + _BackingOffContextWrapper.set_max_timeout(max_timeout) def get_client(target, version_cap=None, serializer=None): diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index feb84e8c494..cd27928a632 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -22,7 +22,6 @@ from neutron_lib.plugins import directory from neutron_lib.plugins.ml2 import api from oslo_log import log import oslo_messaging -import six from sqlalchemy.orm import exc from neutron._i18n import _LE, _LW @@ -371,19 +370,6 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin): 'failed_devices_down': failed_devices_down} -def _suppress_cast_exceptions(f): - """Decorator to ignore exchange not found exceptions.""" - @six.wraps(f) - def wrapped(*args, **kwargs): - try: - return f(*args, **kwargs) - except Exception: - # TODO(kevinbenton): make catch specific to missing exchange once - # bug/1705351 is resolved on the oslo.messaging side. - LOG.exception("Ignored exception during cast") - return wrapped - - class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin, sg_rpc.SecurityGroupAgentRpcApiMixin, type_tunnel.TunnelAgentRpcApiMixin): @@ -414,13 +400,11 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin, target = oslo_messaging.Target(topic=topic, version='1.0') self.client = n_rpc.get_client(target) - @_suppress_cast_exceptions def network_delete(self, context, network_id): cctxt = self.client.prepare(topic=self.topic_network_delete, fanout=True) cctxt.cast(context, 'network_delete', network_id=network_id) - @_suppress_cast_exceptions def port_update(self, context, port, network_type, segmentation_id, physical_network): cctxt = self.client.prepare(topic=self.topic_port_update, @@ -429,13 +413,11 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin, network_type=network_type, segmentation_id=segmentation_id, physical_network=physical_network) - @_suppress_cast_exceptions def port_delete(self, context, port_id): cctxt = self.client.prepare(topic=self.topic_port_delete, fanout=True) cctxt.cast(context, 'port_delete', port_id=port_id) - @_suppress_cast_exceptions def network_update(self, context, network): cctxt = self.client.prepare(topic=self.topic_network_update, fanout=True, version='1.4') diff --git a/neutron/tests/unit/common/test_rpc.py b/neutron/tests/unit/common/test_rpc.py index 204a524fbde..dc4bfc4c0c0 100644 --- a/neutron/tests/unit/common/test_rpc.py +++ b/neutron/tests/unit/common/test_rpc.py @@ -335,21 +335,26 @@ class TimeoutTestCase(base.DietTestCase): # ensure that the timeout was not increased and the back-off sleep # wasn't called self.assertEqual( - 5, rpc._ContextWrapper._METHOD_TIMEOUTS['create_pb_and_j']) + 5, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['create_pb_and_j']) self.assertFalse(self.sleep.called) def test_timeout_store_defaults(self): # any method should default to the configured timeout - self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1']) - self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._ContextWrapper._METHOD_TIMEOUTS['method_2']) + self.assertEqual( + rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) + self.assertEqual( + rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2']) # a change to an existing should not affect new or existing ones - rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'] = 7000 - self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1']) - self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._ContextWrapper._METHOD_TIMEOUTS['method_3']) + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 7000 + self.assertEqual( + rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) + self.assertEqual( + rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_3']) def test_method_timeout_sleep(self): rpc.TRANSPORT.conf.rpc_response_timeout = 2 @@ -362,7 +367,7 @@ class TimeoutTestCase(base.DietTestCase): self.sleep.reset_mock() def test_method_timeout_increases_on_timeout_exception(self): - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 for i in range(5): with testtools.ExpectedException(messaging.MessagingTimeout): self.client.call(self.call_context, 'method_1') @@ -378,15 +383,17 @@ class TimeoutTestCase(base.DietTestCase): for i in range(5): with testtools.ExpectedException(messaging.MessagingTimeout): self.client.call(self.call_context, 'method_1') - self.assertEqual(10 * rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1']) + self.assertEqual( + 10 * rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) with testtools.ExpectedException(messaging.MessagingTimeout): self.client.call(self.call_context, 'method_1') - self.assertEqual(10 * rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1']) + self.assertEqual( + 10 * rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) def test_timeout_unchanged_on_other_exception(self): - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 rpc.TRANSPORT._send.side_effect = ValueError with testtools.ExpectedException(ValueError): self.client.call(self.call_context, 'method_1') @@ -398,8 +405,8 @@ class TimeoutTestCase(base.DietTestCase): self.assertEqual([1, 1], timeouts) def test_timeouts_for_methods_tracked_independently(self): - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 - rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'] = 1 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 1 for method in ('method_1', 'method_1', 'method_2', 'method_1', 'method_2'): with testtools.ExpectedException(messaging.MessagingTimeout): @@ -409,8 +416,8 @@ class TimeoutTestCase(base.DietTestCase): self.assertEqual([1, 2, 1, 4, 2], timeouts) def test_timeouts_for_namespaces_tracked_independently(self): - rpc._ContextWrapper._METHOD_TIMEOUTS['ns1.method'] = 1 - rpc._ContextWrapper._METHOD_TIMEOUTS['ns2.method'] = 1 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns1.method'] = 1 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns2.method'] = 1 for ns in ('ns1', 'ns2'): self.client.target.namespace = ns for i in range(4): @@ -421,7 +428,7 @@ class TimeoutTestCase(base.DietTestCase): self.assertEqual([1, 2, 4, 8, 1, 2, 4, 8], timeouts) def test_method_timeout_increases_with_prepare(self): - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 ctx = self.client.prepare(version='1.4') with testtools.ExpectedException(messaging.MessagingTimeout): ctx.call(self.call_context, 'method_1') @@ -435,23 +442,48 @@ class TimeoutTestCase(base.DietTestCase): def test_set_max_timeout_caps_all_methods(self): rpc.TRANSPORT.conf.rpc_response_timeout = 300 - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 100 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 100 rpc.BackingOffClient.set_max_timeout(50) # both explicitly tracked - self.assertEqual(50, rpc._ContextWrapper._METHOD_TIMEOUTS['method_1']) + self.assertEqual( + 50, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) # as well as new methods - self.assertEqual(50, rpc._ContextWrapper._METHOD_TIMEOUTS['method_2']) + self.assertEqual( + 50, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2']) def test_set_max_timeout_retains_lower_timeouts(self): - rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 10 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 10 rpc.BackingOffClient.set_max_timeout(50) - self.assertEqual(10, rpc._ContextWrapper._METHOD_TIMEOUTS['method_1']) + self.assertEqual( + 10, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) def test_set_max_timeout_overrides_default_timeout(self): rpc.TRANSPORT.conf.rpc_response_timeout = 10 - self.assertEqual(10 * 10, rpc._ContextWrapper.get_max_timeout()) - rpc._ContextWrapper.set_max_timeout(10) - self.assertEqual(10, rpc._ContextWrapper.get_max_timeout()) + self.assertEqual( + 10 * 10, rpc._BackingOffContextWrapper.get_max_timeout()) + rpc._BackingOffContextWrapper.set_max_timeout(10) + self.assertEqual(10, rpc._BackingOffContextWrapper.get_max_timeout()) + + +class CastExceptionTestCase(base.DietTestCase): + def setUp(self): + super(CastExceptionTestCase, self).setUp() + + self.messaging_conf = messaging_conffixture.ConfFixture(CONF) + self.messaging_conf.transport_driver = 'fake' + self.messaging_conf.response_timeout = 0 + self.useFixture(self.messaging_conf) + + self.addCleanup(rpc.cleanup) + rpc.init(CONF) + rpc.TRANSPORT = mock.MagicMock() + rpc.TRANSPORT._send.side_effect = Exception + target = messaging.Target(version='1.0', topic='testing') + self.client = rpc.get_client(target) + self.cast_context = mock.Mock() + + def test_cast_catches_exception(self): + self.client.cast(self.cast_context, 'method_1') class TestConnection(base.DietTestCase):