Catch exceptions for all rpc casts
I15cc2d6ae48e505c2da121880e27481dedf36d3b catches exceptions for specific RPC endpoints affected by a recent change related to push-notifications. There may be more changes like that in the future, so instead of fixing them one by one, this patch consistently catches exceptions from all cast calls for all RPC clients. Change-Id: Ia7e6cd717758a9d5b18fe9cb07c55938f52040ce Partial-Bug: #1705351
This commit is contained in:
parent
cbe0f03f08
commit
85d8f6f1c8
@ -73,7 +73,7 @@ def cleanup():
|
||||
assert NOTIFIER is not None
|
||||
TRANSPORT.cleanup()
|
||||
NOTIFICATION_TRANSPORT.cleanup()
|
||||
_ContextWrapper.reset_timeouts()
|
||||
_BackingOffContextWrapper.reset_timeouts()
|
||||
TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None
|
||||
|
||||
|
||||
@ -98,6 +98,24 @@ def _get_default_method_timeouts():
|
||||
|
||||
|
||||
class _ContextWrapper(object):
|
||||
def __init__(self, original_context):
|
||||
self._original_context = original_context
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._original_context, name)
|
||||
|
||||
def cast(self, ctxt, method, **kwargs):
|
||||
try:
|
||||
self._original_context.cast(ctxt, method, **kwargs)
|
||||
except Exception:
|
||||
# TODO(kevinbenton): make catch specific to missing exchange once
|
||||
# bug/1705351 is resolved on the oslo.messaging side; if
|
||||
# oslo.messaging auto-creates the exchange, then just remove the
|
||||
# code completely
|
||||
LOG.exception("Ignored exception during cast")
|
||||
|
||||
|
||||
class _BackingOffContextWrapper(_ContextWrapper):
|
||||
"""Wraps oslo messaging contexts to set the timeout for calls.
|
||||
|
||||
This intercepts RPC calls and sets the timeout value to the globally
|
||||
@ -130,12 +148,6 @@ class _ContextWrapper(object):
|
||||
})
|
||||
cls._max_timeout = max_timeout
|
||||
|
||||
def __init__(self, original_context):
|
||||
self._original_context = original_context
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._original_context, name)
|
||||
|
||||
def call(self, ctxt, method, **kwargs):
|
||||
# two methods with the same name in different namespaces should
|
||||
# be tracked independently
|
||||
@ -178,19 +190,21 @@ class BackingOffClient(oslo_messaging.RPCClient):
|
||||
"""An oslo messaging RPC Client that implements a timeout backoff.
|
||||
|
||||
This has all of the same interfaces as oslo_messaging.RPCClient but
|
||||
if the timeout parameter is not specified, the _ContextWrapper returned
|
||||
will track when call timeout exceptions occur and exponentially increase
|
||||
the timeout for the given call method.
|
||||
if the timeout parameter is not specified, the _BackingOffContextWrapper
|
||||
returned will track when call timeout exceptions occur and exponentially
|
||||
increase the timeout for the given call method.
|
||||
"""
|
||||
def prepare(self, *args, **kwargs):
|
||||
ctx = super(BackingOffClient, self).prepare(*args, **kwargs)
|
||||
# don't enclose Contexts that explicitly set a timeout
|
||||
return _ContextWrapper(ctx) if 'timeout' not in kwargs else ctx
|
||||
# don't back off contexts that explicitly set a timeout
|
||||
if 'timeout' in kwargs:
|
||||
return _ContextWrapper(ctx)
|
||||
return _BackingOffContextWrapper(ctx)
|
||||
|
||||
@staticmethod
|
||||
def set_max_timeout(max_timeout):
|
||||
'''Set RPC timeout ceiling for all backing-off RPC clients.'''
|
||||
_ContextWrapper.set_max_timeout(max_timeout)
|
||||
_BackingOffContextWrapper.set_max_timeout(max_timeout)
|
||||
|
||||
|
||||
def get_client(target, version_cap=None, serializer=None):
|
||||
|
@ -22,7 +22,6 @@ from neutron_lib.plugins import directory
|
||||
from neutron_lib.plugins.ml2 import api
|
||||
from oslo_log import log
|
||||
import oslo_messaging
|
||||
import six
|
||||
from sqlalchemy.orm import exc
|
||||
|
||||
from neutron._i18n import _LE, _LW
|
||||
@ -364,19 +363,6 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
||||
'failed_devices_down': failed_devices_down}
|
||||
|
||||
|
||||
def _suppress_cast_exceptions(f):
|
||||
"""Decorator to ignore exchange not found exceptions."""
|
||||
@six.wraps(f)
|
||||
def wrapped(*args, **kwargs):
|
||||
try:
|
||||
return f(*args, **kwargs)
|
||||
except Exception:
|
||||
# TODO(kevinbenton): make catch specific to missing exchange once
|
||||
# bug/1705351 is resolved on the oslo.messaging side.
|
||||
LOG.exception("Ignored exception during cast")
|
||||
return wrapped
|
||||
|
||||
|
||||
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
|
||||
sg_rpc.SecurityGroupAgentRpcApiMixin,
|
||||
type_tunnel.TunnelAgentRpcApiMixin):
|
||||
@ -407,13 +393,11 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
|
||||
target = oslo_messaging.Target(topic=topic, version='1.0')
|
||||
self.client = n_rpc.get_client(target)
|
||||
|
||||
@_suppress_cast_exceptions
|
||||
def network_delete(self, context, network_id):
|
||||
cctxt = self.client.prepare(topic=self.topic_network_delete,
|
||||
fanout=True)
|
||||
cctxt.cast(context, 'network_delete', network_id=network_id)
|
||||
|
||||
@_suppress_cast_exceptions
|
||||
def port_update(self, context, port, network_type, segmentation_id,
|
||||
physical_network):
|
||||
cctxt = self.client.prepare(topic=self.topic_port_update,
|
||||
@ -422,13 +406,11 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
|
||||
network_type=network_type, segmentation_id=segmentation_id,
|
||||
physical_network=physical_network)
|
||||
|
||||
@_suppress_cast_exceptions
|
||||
def port_delete(self, context, port_id):
|
||||
cctxt = self.client.prepare(topic=self.topic_port_delete,
|
||||
fanout=True)
|
||||
cctxt.cast(context, 'port_delete', port_id=port_id)
|
||||
|
||||
@_suppress_cast_exceptions
|
||||
def network_update(self, context, network):
|
||||
cctxt = self.client.prepare(topic=self.topic_network_update,
|
||||
fanout=True, version='1.4')
|
||||
|
@ -335,21 +335,26 @@ class TimeoutTestCase(base.DietTestCase):
|
||||
# ensure that the timeout was not increased and the back-off sleep
|
||||
# wasn't called
|
||||
self.assertEqual(
|
||||
5, rpc._ContextWrapper._METHOD_TIMEOUTS['create_pb_and_j'])
|
||||
5,
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['create_pb_and_j'])
|
||||
self.assertFalse(self.sleep.called)
|
||||
|
||||
def test_timeout_store_defaults(self):
|
||||
# any method should default to the configured timeout
|
||||
self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout,
|
||||
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
|
||||
self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout,
|
||||
rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'])
|
||||
self.assertEqual(
|
||||
rpc.TRANSPORT.conf.rpc_response_timeout,
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
|
||||
self.assertEqual(
|
||||
rpc.TRANSPORT.conf.rpc_response_timeout,
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'])
|
||||
# a change to an existing should not affect new or existing ones
|
||||
rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'] = 7000
|
||||
self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout,
|
||||
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
|
||||
self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout,
|
||||
rpc._ContextWrapper._METHOD_TIMEOUTS['method_3'])
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 7000
|
||||
self.assertEqual(
|
||||
rpc.TRANSPORT.conf.rpc_response_timeout,
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
|
||||
self.assertEqual(
|
||||
rpc.TRANSPORT.conf.rpc_response_timeout,
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_3'])
|
||||
|
||||
def test_method_timeout_sleep(self):
|
||||
rpc.TRANSPORT.conf.rpc_response_timeout = 2
|
||||
@ -362,7 +367,7 @@ class TimeoutTestCase(base.DietTestCase):
|
||||
self.sleep.reset_mock()
|
||||
|
||||
def test_method_timeout_increases_on_timeout_exception(self):
|
||||
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
|
||||
for i in range(5):
|
||||
with testtools.ExpectedException(messaging.MessagingTimeout):
|
||||
self.client.call(self.call_context, 'method_1')
|
||||
@ -378,15 +383,17 @@ class TimeoutTestCase(base.DietTestCase):
|
||||
for i in range(5):
|
||||
with testtools.ExpectedException(messaging.MessagingTimeout):
|
||||
self.client.call(self.call_context, 'method_1')
|
||||
self.assertEqual(10 * rpc.TRANSPORT.conf.rpc_response_timeout,
|
||||
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
|
||||
self.assertEqual(
|
||||
10 * rpc.TRANSPORT.conf.rpc_response_timeout,
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
|
||||
with testtools.ExpectedException(messaging.MessagingTimeout):
|
||||
self.client.call(self.call_context, 'method_1')
|
||||
self.assertEqual(10 * rpc.TRANSPORT.conf.rpc_response_timeout,
|
||||
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
|
||||
self.assertEqual(
|
||||
10 * rpc.TRANSPORT.conf.rpc_response_timeout,
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
|
||||
|
||||
def test_timeout_unchanged_on_other_exception(self):
|
||||
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
|
||||
rpc.TRANSPORT._send.side_effect = ValueError
|
||||
with testtools.ExpectedException(ValueError):
|
||||
self.client.call(self.call_context, 'method_1')
|
||||
@ -398,8 +405,8 @@ class TimeoutTestCase(base.DietTestCase):
|
||||
self.assertEqual([1, 1], timeouts)
|
||||
|
||||
def test_timeouts_for_methods_tracked_independently(self):
|
||||
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
|
||||
rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'] = 1
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 1
|
||||
for method in ('method_1', 'method_1', 'method_2',
|
||||
'method_1', 'method_2'):
|
||||
with testtools.ExpectedException(messaging.MessagingTimeout):
|
||||
@ -409,8 +416,8 @@ class TimeoutTestCase(base.DietTestCase):
|
||||
self.assertEqual([1, 2, 1, 4, 2], timeouts)
|
||||
|
||||
def test_timeouts_for_namespaces_tracked_independently(self):
|
||||
rpc._ContextWrapper._METHOD_TIMEOUTS['ns1.method'] = 1
|
||||
rpc._ContextWrapper._METHOD_TIMEOUTS['ns2.method'] = 1
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns1.method'] = 1
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns2.method'] = 1
|
||||
for ns in ('ns1', 'ns2'):
|
||||
self.client.target.namespace = ns
|
||||
for i in range(4):
|
||||
@ -421,7 +428,7 @@ class TimeoutTestCase(base.DietTestCase):
|
||||
self.assertEqual([1, 2, 4, 8, 1, 2, 4, 8], timeouts)
|
||||
|
||||
def test_method_timeout_increases_with_prepare(self):
|
||||
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
|
||||
ctx = self.client.prepare(version='1.4')
|
||||
with testtools.ExpectedException(messaging.MessagingTimeout):
|
||||
ctx.call(self.call_context, 'method_1')
|
||||
@ -435,23 +442,48 @@ class TimeoutTestCase(base.DietTestCase):
|
||||
|
||||
def test_set_max_timeout_caps_all_methods(self):
|
||||
rpc.TRANSPORT.conf.rpc_response_timeout = 300
|
||||
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 100
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 100
|
||||
rpc.BackingOffClient.set_max_timeout(50)
|
||||
# both explicitly tracked
|
||||
self.assertEqual(50, rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
|
||||
self.assertEqual(
|
||||
50, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
|
||||
# as well as new methods
|
||||
self.assertEqual(50, rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'])
|
||||
self.assertEqual(
|
||||
50, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'])
|
||||
|
||||
def test_set_max_timeout_retains_lower_timeouts(self):
|
||||
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 10
|
||||
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 10
|
||||
rpc.BackingOffClient.set_max_timeout(50)
|
||||
self.assertEqual(10, rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
|
||||
self.assertEqual(
|
||||
10, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
|
||||
|
||||
def test_set_max_timeout_overrides_default_timeout(self):
|
||||
rpc.TRANSPORT.conf.rpc_response_timeout = 10
|
||||
self.assertEqual(10 * 10, rpc._ContextWrapper.get_max_timeout())
|
||||
rpc._ContextWrapper.set_max_timeout(10)
|
||||
self.assertEqual(10, rpc._ContextWrapper.get_max_timeout())
|
||||
self.assertEqual(
|
||||
10 * 10, rpc._BackingOffContextWrapper.get_max_timeout())
|
||||
rpc._BackingOffContextWrapper.set_max_timeout(10)
|
||||
self.assertEqual(10, rpc._BackingOffContextWrapper.get_max_timeout())
|
||||
|
||||
|
||||
class CastExceptionTestCase(base.DietTestCase):
|
||||
def setUp(self):
|
||||
super(CastExceptionTestCase, self).setUp()
|
||||
|
||||
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
|
||||
self.messaging_conf.transport_driver = 'fake'
|
||||
self.messaging_conf.response_timeout = 0
|
||||
self.useFixture(self.messaging_conf)
|
||||
|
||||
self.addCleanup(rpc.cleanup)
|
||||
rpc.init(CONF)
|
||||
rpc.TRANSPORT = mock.MagicMock()
|
||||
rpc.TRANSPORT._send.side_effect = Exception
|
||||
target = messaging.Target(version='1.0', topic='testing')
|
||||
self.client = rpc.get_client(target)
|
||||
self.cast_context = mock.Mock()
|
||||
|
||||
def test_cast_catches_exception(self):
|
||||
self.client.cast(self.cast_context, 'method_1')
|
||||
|
||||
|
||||
class TestConnection(base.DietTestCase):
|
||||
|
Loading…
x
Reference in New Issue
Block a user