Merge "Catch exceptions for all rpc casts"

This commit is contained in:
Jenkins 2017-08-02 06:43:02 +00:00 committed by Gerrit Code Review
commit 90feab379a
3 changed files with 88 additions and 60 deletions

View File

@ -73,7 +73,7 @@ def cleanup():
assert NOTIFIER is not None
TRANSPORT.cleanup()
NOTIFICATION_TRANSPORT.cleanup()
_ContextWrapper.reset_timeouts()
_BackingOffContextWrapper.reset_timeouts()
TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None
@ -98,6 +98,24 @@ def _get_default_method_timeouts():
class _ContextWrapper(object):
def __init__(self, original_context):
self._original_context = original_context
def __getattr__(self, name):
return getattr(self._original_context, name)
def cast(self, ctxt, method, **kwargs):
try:
self._original_context.cast(ctxt, method, **kwargs)
except Exception:
# TODO(kevinbenton): make catch specific to missing exchange once
# bug/1705351 is resolved on the oslo.messaging side; if
# oslo.messaging auto-creates the exchange, then just remove the
# code completely
LOG.exception("Ignored exception during cast")
class _BackingOffContextWrapper(_ContextWrapper):
"""Wraps oslo messaging contexts to set the timeout for calls.
This intercepts RPC calls and sets the timeout value to the globally
@ -130,12 +148,6 @@ class _ContextWrapper(object):
})
cls._max_timeout = max_timeout
def __init__(self, original_context):
self._original_context = original_context
def __getattr__(self, name):
return getattr(self._original_context, name)
def call(self, ctxt, method, **kwargs):
# two methods with the same name in different namespaces should
# be tracked independently
@ -178,19 +190,21 @@ class BackingOffClient(oslo_messaging.RPCClient):
"""An oslo messaging RPC Client that implements a timeout backoff.
This has all of the same interfaces as oslo_messaging.RPCClient but
if the timeout parameter is not specified, the _ContextWrapper returned
will track when call timeout exceptions occur and exponentially increase
the timeout for the given call method.
if the timeout parameter is not specified, the _BackingOffContextWrapper
returned will track when call timeout exceptions occur and exponentially
increase the timeout for the given call method.
"""
def prepare(self, *args, **kwargs):
ctx = super(BackingOffClient, self).prepare(*args, **kwargs)
# don't enclose Contexts that explicitly set a timeout
return _ContextWrapper(ctx) if 'timeout' not in kwargs else ctx
# don't back off contexts that explicitly set a timeout
if 'timeout' in kwargs:
return _ContextWrapper(ctx)
return _BackingOffContextWrapper(ctx)
@staticmethod
def set_max_timeout(max_timeout):
'''Set RPC timeout ceiling for all backing-off RPC clients.'''
_ContextWrapper.set_max_timeout(max_timeout)
_BackingOffContextWrapper.set_max_timeout(max_timeout)
def get_client(target, version_cap=None, serializer=None):

View File

@ -22,7 +22,6 @@ from neutron_lib.plugins import directory
from neutron_lib.plugins.ml2 import api
from oslo_log import log
import oslo_messaging
import six
from sqlalchemy.orm import exc
from neutron._i18n import _LE, _LW
@ -371,19 +370,6 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
'failed_devices_down': failed_devices_down}
def _suppress_cast_exceptions(f):
"""Decorator to ignore exchange not found exceptions."""
@six.wraps(f)
def wrapped(*args, **kwargs):
try:
return f(*args, **kwargs)
except Exception:
# TODO(kevinbenton): make catch specific to missing exchange once
# bug/1705351 is resolved on the oslo.messaging side.
LOG.exception("Ignored exception during cast")
return wrapped
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
sg_rpc.SecurityGroupAgentRpcApiMixin,
type_tunnel.TunnelAgentRpcApiMixin):
@ -414,13 +400,11 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
@_suppress_cast_exceptions
def network_delete(self, context, network_id):
cctxt = self.client.prepare(topic=self.topic_network_delete,
fanout=True)
cctxt.cast(context, 'network_delete', network_id=network_id)
@_suppress_cast_exceptions
def port_update(self, context, port, network_type, segmentation_id,
physical_network):
cctxt = self.client.prepare(topic=self.topic_port_update,
@ -429,13 +413,11 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
network_type=network_type, segmentation_id=segmentation_id,
physical_network=physical_network)
@_suppress_cast_exceptions
def port_delete(self, context, port_id):
cctxt = self.client.prepare(topic=self.topic_port_delete,
fanout=True)
cctxt.cast(context, 'port_delete', port_id=port_id)
@_suppress_cast_exceptions
def network_update(self, context, network):
cctxt = self.client.prepare(topic=self.topic_network_update,
fanout=True, version='1.4')

View File

@ -335,21 +335,26 @@ class TimeoutTestCase(base.DietTestCase):
# ensure that the timeout was not increased and the back-off sleep
# wasn't called
self.assertEqual(
5, rpc._ContextWrapper._METHOD_TIMEOUTS['create_pb_and_j'])
5,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['create_pb_and_j'])
self.assertFalse(self.sleep.called)
def test_timeout_store_defaults(self):
# any method should default to the configured timeout
self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'])
self.assertEqual(
rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(
rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'])
# a change to an existing should not affect new or existing ones
rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'] = 7000
self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_3'])
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 7000
self.assertEqual(
rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(
rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_3'])
def test_method_timeout_sleep(self):
rpc.TRANSPORT.conf.rpc_response_timeout = 2
@ -362,7 +367,7 @@ class TimeoutTestCase(base.DietTestCase):
self.sleep.reset_mock()
def test_method_timeout_increases_on_timeout_exception(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
for i in range(5):
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method_1')
@ -378,15 +383,17 @@ class TimeoutTestCase(base.DietTestCase):
for i in range(5):
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method_1')
self.assertEqual(10 * rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(
10 * rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method_1')
self.assertEqual(10 * rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(
10 * rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
def test_timeout_unchanged_on_other_exception(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
rpc.TRANSPORT._send.side_effect = ValueError
with testtools.ExpectedException(ValueError):
self.client.call(self.call_context, 'method_1')
@ -398,8 +405,8 @@ class TimeoutTestCase(base.DietTestCase):
self.assertEqual([1, 1], timeouts)
def test_timeouts_for_methods_tracked_independently(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'] = 1
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 1
for method in ('method_1', 'method_1', 'method_2',
'method_1', 'method_2'):
with testtools.ExpectedException(messaging.MessagingTimeout):
@ -409,8 +416,8 @@ class TimeoutTestCase(base.DietTestCase):
self.assertEqual([1, 2, 1, 4, 2], timeouts)
def test_timeouts_for_namespaces_tracked_independently(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['ns1.method'] = 1
rpc._ContextWrapper._METHOD_TIMEOUTS['ns2.method'] = 1
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns1.method'] = 1
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns2.method'] = 1
for ns in ('ns1', 'ns2'):
self.client.target.namespace = ns
for i in range(4):
@ -421,7 +428,7 @@ class TimeoutTestCase(base.DietTestCase):
self.assertEqual([1, 2, 4, 8, 1, 2, 4, 8], timeouts)
def test_method_timeout_increases_with_prepare(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
ctx = self.client.prepare(version='1.4')
with testtools.ExpectedException(messaging.MessagingTimeout):
ctx.call(self.call_context, 'method_1')
@ -435,23 +442,48 @@ class TimeoutTestCase(base.DietTestCase):
def test_set_max_timeout_caps_all_methods(self):
rpc.TRANSPORT.conf.rpc_response_timeout = 300
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 100
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 100
rpc.BackingOffClient.set_max_timeout(50)
# both explicitly tracked
self.assertEqual(50, rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(
50, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
# as well as new methods
self.assertEqual(50, rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'])
self.assertEqual(
50, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'])
def test_set_max_timeout_retains_lower_timeouts(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 10
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 10
rpc.BackingOffClient.set_max_timeout(50)
self.assertEqual(10, rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(
10, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
def test_set_max_timeout_overrides_default_timeout(self):
rpc.TRANSPORT.conf.rpc_response_timeout = 10
self.assertEqual(10 * 10, rpc._ContextWrapper.get_max_timeout())
rpc._ContextWrapper.set_max_timeout(10)
self.assertEqual(10, rpc._ContextWrapper.get_max_timeout())
self.assertEqual(
10 * 10, rpc._BackingOffContextWrapper.get_max_timeout())
rpc._BackingOffContextWrapper.set_max_timeout(10)
self.assertEqual(10, rpc._BackingOffContextWrapper.get_max_timeout())
class CastExceptionTestCase(base.DietTestCase):
def setUp(self):
super(CastExceptionTestCase, self).setUp()
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_driver = 'fake'
self.messaging_conf.response_timeout = 0
self.useFixture(self.messaging_conf)
self.addCleanup(rpc.cleanup)
rpc.init(CONF)
rpc.TRANSPORT = mock.MagicMock()
rpc.TRANSPORT._send.side_effect = Exception
target = messaging.Target(version='1.0', topic='testing')
self.client = rpc.get_client(target)
self.cast_context = mock.Mock()
def test_cast_catches_exception(self):
self.client.cast(self.cast_context, 'method_1')
class TestConnection(base.DietTestCase):