Sync neutron callbacks into lib
Update neutron-lib callbacks with the latest from neutron in preparation for updates to the callback notification interface. The callback updates also require a few module level functions in the db package. A few additional unit tests are also added for coverage as is python docstrings for the public API contained herein. Change-Id: I74effcc4a564a92c418132042bbdd5bed22c4950
This commit is contained in:
parent
d88406462e
commit
4dcf5971c8
@ -10,7 +10,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# String literals representing core events.
|
||||
# String literals representing events associated to data store operations
|
||||
BEFORE_CREATE = 'before_create'
|
||||
BEFORE_READ = 'before_read'
|
||||
BEFORE_UPDATE = 'before_update'
|
||||
@ -25,6 +25,15 @@ AFTER_READ = 'after_read'
|
||||
AFTER_UPDATE = 'after_update'
|
||||
AFTER_DELETE = 'after_delete'
|
||||
|
||||
# String literals representing events associated to API operations
|
||||
BEFORE_RESPONSE = 'before_response'
|
||||
AFTER_REQUEST = 'after_request'
|
||||
|
||||
# String literals representing events associated to process operations
|
||||
BEFORE_INIT = 'before_init'
|
||||
AFTER_INIT = 'after_init'
|
||||
|
||||
# String literals representing events associated to error conditions
|
||||
ABORT_CREATE = 'abort_create'
|
||||
ABORT_READ = 'abort_read'
|
||||
ABORT_UPDATE = 'abort_update'
|
||||
|
@ -18,7 +18,7 @@ class Invalid(exceptions.NeutronException):
|
||||
message = _("The value '%(value)s' for %(element)s is not valid.")
|
||||
|
||||
|
||||
class CallbackFailure(Exception):
|
||||
class CallbackFailure(exceptions.MultipleExceptions):
|
||||
|
||||
def __init__(self, errors):
|
||||
self.errors = errors
|
||||
@ -29,6 +29,24 @@ class CallbackFailure(Exception):
|
||||
else:
|
||||
return str(self.errors)
|
||||
|
||||
@property
|
||||
def inner_exceptions(self):
|
||||
"""The list of unpacked errors for this exception.
|
||||
|
||||
:return: A list of unpacked errors for this exception. An unpacked
|
||||
error is the Exception's 'error' attribute if it inherits from
|
||||
NotificationError, otherwise it's the exception itself.
|
||||
"""
|
||||
if isinstance(self.errors, list):
|
||||
return [self._unpack_if_notification_error(e) for e in self.errors]
|
||||
return [self._unpack_if_notification_error(self.errors)]
|
||||
|
||||
@staticmethod
|
||||
def _unpack_if_notification_error(exc):
|
||||
if isinstance(exc, NotificationError):
|
||||
return exc.error
|
||||
return exc
|
||||
|
||||
|
||||
class NotificationError(object):
|
||||
|
||||
|
@ -18,6 +18,7 @@ from oslo_utils import reflection
|
||||
from neutron_lib._callbacks import events
|
||||
from neutron_lib._callbacks import exceptions
|
||||
from neutron_lib._i18n import _LE
|
||||
from neutron_lib.db import utils as db_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -106,6 +107,7 @@ class CallbacksManager(object):
|
||||
del self._callbacks[resource][event][callback_id]
|
||||
del self._index[callback_id]
|
||||
|
||||
@db_utils.reraise_as_retryrequest
|
||||
def notify(self, resource, event, trigger, **kwargs):
|
||||
"""Notify all subscribed callback(s).
|
||||
|
||||
@ -138,7 +140,7 @@ class CallbacksManager(object):
|
||||
{'resource': resource, 'event': event})
|
||||
|
||||
errors = []
|
||||
callbacks = self._callbacks[resource].get(event, {}).items()
|
||||
callbacks = list(self._callbacks[resource].get(event, {}).items())
|
||||
# TODO(armax): consider using a GreenPile
|
||||
for callback_id, callback in callbacks:
|
||||
try:
|
||||
|
@ -11,13 +11,21 @@
|
||||
# under the License.
|
||||
|
||||
# String literals representing core resources.
|
||||
AGENT = 'agent'
|
||||
EXTERNAL_NETWORK = 'external_network'
|
||||
FLOATING_IP = 'floating_ip'
|
||||
NETWORK = 'network'
|
||||
NETWORKS = 'networks'
|
||||
PORT = 'port'
|
||||
PORTS = 'ports'
|
||||
PROCESS = 'process'
|
||||
ROUTER = 'router'
|
||||
ROUTER_GATEWAY = 'router_gateway'
|
||||
ROUTER_INTERFACE = 'router_interface'
|
||||
SECURITY_GROUP = 'security_group'
|
||||
SECURITY_GROUP_RULE = 'security_group_rule'
|
||||
SEGMENT = 'segment'
|
||||
SUBNET = 'subnet'
|
||||
SUBNETS = 'subnets'
|
||||
SUBNET_GATEWAY = 'subnet_gateway'
|
||||
SUBNETPOOL_ADDRESS_SCOPE = 'subnetpool_address_scope'
|
||||
|
@ -10,10 +10,15 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import six
|
||||
|
||||
from neutron_lib import exceptions as n_exc
|
||||
from sqlalchemy.orm import properties
|
||||
|
||||
from neutron_lib._i18n import _
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_utils import excutils
|
||||
from sqlalchemy.orm import exc
|
||||
|
||||
|
||||
def get_and_validate_sort_keys(sorts, model):
|
||||
@ -51,3 +56,46 @@ def get_sort_dirs(sorts, page_reverse=False):
|
||||
if page_reverse:
|
||||
return ['desc' if s[1] else 'asc' for s in sorts]
|
||||
return ['asc' if s[1] else 'desc' for s in sorts]
|
||||
|
||||
|
||||
def _is_nested_instance(exception, etypes):
|
||||
"""Check if exception or its inner excepts are an instance of etypes."""
|
||||
return (isinstance(exception, etypes) or
|
||||
isinstance(exception, n_exc.MultipleExceptions) and
|
||||
any(_is_nested_instance(i, etypes)
|
||||
for i in exception.inner_exceptions))
|
||||
|
||||
|
||||
def is_retriable(exception):
|
||||
"""Determine if the said exception is retriable.
|
||||
|
||||
:param exception: The exception to check.
|
||||
:return: True if 'exception' is retriable, otherwise False.
|
||||
"""
|
||||
if _is_nested_instance(exception,
|
||||
(db_exc.DBDeadlock, exc.StaleDataError,
|
||||
db_exc.DBConnectionError,
|
||||
db_exc.DBDuplicateEntry, db_exc.RetryRequest)):
|
||||
return True
|
||||
# Look for savepoints mangled by deadlocks. See bug/1590298 for details.
|
||||
return (_is_nested_instance(exception, db_exc.DBError) and
|
||||
'1305' in str(exception))
|
||||
|
||||
|
||||
def reraise_as_retryrequest(function):
|
||||
"""Wrap the said function with a RetryRequest upon error.
|
||||
|
||||
:param function: The function to wrap/decorate.
|
||||
:return: The 'function' wrapped in a try block that will reraise any
|
||||
Exception's as a RetryRequest.
|
||||
"""
|
||||
@six.wraps(function)
|
||||
def _wrapped(*args, **kwargs):
|
||||
try:
|
||||
return function(*args, **kwargs)
|
||||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception() as ctx:
|
||||
if is_retriable(e):
|
||||
ctx.reraise = False
|
||||
raise db_exc.RetryRequest(e)
|
||||
return _wrapped
|
||||
|
@ -248,3 +248,16 @@ class PolicyInitError(NeutronException):
|
||||
|
||||
class PolicyCheckError(NeutronException):
|
||||
message = _("Failed to check policy %(policy)s because %(reason)s.")
|
||||
|
||||
|
||||
class MultipleExceptions(Exception):
|
||||
"""Container for multiple exceptions encountered.
|
||||
|
||||
The API layer of Neutron will automatically unpack, translate,
|
||||
filter, and combine the inner exceptions in any exception derived
|
||||
from this class.
|
||||
"""
|
||||
|
||||
def __init__(self, exceptions, *args, **kwargs):
|
||||
super(MultipleExceptions, self).__init__(*args, **kwargs)
|
||||
self.inner_exceptions = exceptions
|
||||
|
@ -54,3 +54,9 @@ class TestCallbackExceptions(test_exceptions.TestExceptions):
|
||||
'''Test that correct message is created for this error class.'''
|
||||
error = ex.NotificationError('abc', 'boom')
|
||||
self.assertEqual('Callback abc failed with "boom"', str(error))
|
||||
|
||||
def test_inner_exceptions(self):
|
||||
key_err = KeyError()
|
||||
n_key_err = ex.NotificationError('cb1', key_err)
|
||||
err = ex.CallbackFailure([key_err, n_key_err])
|
||||
self.assertEqual([key_err, n_key_err.error], err.inner_exceptions)
|
||||
|
@ -14,6 +14,7 @@
|
||||
|
||||
import mock
|
||||
|
||||
from oslo_db import exception as db_exc
|
||||
from oslotest import base
|
||||
|
||||
from neutron_lib._callbacks import events
|
||||
@ -36,6 +37,10 @@ def callback_raise(*args, **kwargs):
|
||||
raise Exception()
|
||||
|
||||
|
||||
def callback_raise_retriable(*args, **kwargs):
|
||||
raise db_exc.DBDeadlock()
|
||||
|
||||
|
||||
class CallBacksManagerTestCase(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
@ -80,6 +85,13 @@ class CallBacksManagerTestCase(base.BaseTestCase):
|
||||
2,
|
||||
len(self.manager._callbacks[resources.PORT][events.BEFORE_CREATE]))
|
||||
|
||||
def test_unsubscribe_during_iteration(self):
|
||||
unsub = lambda r, e, *a, **k: self.manager.unsubscribe(unsub, r, e)
|
||||
self.manager.subscribe(unsub, resources.PORT,
|
||||
events.BEFORE_CREATE)
|
||||
self.manager.notify(resources.PORT, events.BEFORE_CREATE, mock.ANY)
|
||||
self.assertNotIn(unsub, self.manager._index)
|
||||
|
||||
def test_unsubscribe(self):
|
||||
self.manager.subscribe(
|
||||
callback_1, resources.PORT, events.BEFORE_CREATE)
|
||||
@ -169,6 +181,17 @@ class CallBacksManagerTestCase(base.BaseTestCase):
|
||||
]
|
||||
n.assert_has_calls(expected_calls)
|
||||
|
||||
def test_notify_with_precommit_exception(self):
|
||||
with mock.patch.object(self.manager, '_notify_loop') as n:
|
||||
n.return_value = ['error']
|
||||
self.assertRaises(exceptions.CallbackFailure,
|
||||
self.manager.notify,
|
||||
mock.ANY, events.PRECOMMIT_UPDATE, mock.ANY)
|
||||
expected_calls = [
|
||||
mock.call(mock.ANY, 'precommit_update', mock.ANY),
|
||||
]
|
||||
n.assert_has_calls(expected_calls)
|
||||
|
||||
def test_notify_handle_exception(self):
|
||||
self.manager.subscribe(
|
||||
callback_raise, resources.PORT, events.BEFORE_CREATE)
|
||||
@ -176,6 +199,12 @@ class CallBacksManagerTestCase(base.BaseTestCase):
|
||||
resources.PORT, events.BEFORE_CREATE, self)
|
||||
self.assertIsInstance(e.errors[0], exceptions.NotificationError)
|
||||
|
||||
def test_notify_handle_retriable_exception(self):
|
||||
self.manager.subscribe(
|
||||
callback_raise_retriable, resources.PORT, events.BEFORE_CREATE)
|
||||
self.assertRaises(db_exc.RetryRequest, self.manager.notify,
|
||||
resources.PORT, events.BEFORE_CREATE, self)
|
||||
|
||||
def test_notify_called_once_with_no_failures(self):
|
||||
with mock.patch.object(self.manager, '_notify_loop') as n:
|
||||
n.return_value = False
|
||||
|
@ -57,3 +57,10 @@ class TestCallbackRegistryDispatching(base.BaseTestCase):
|
||||
def test_clear(self):
|
||||
registry.clear()
|
||||
registry.CALLBACK_MANAGER.clear.assert_called_with()
|
||||
|
||||
def test_get_callback_manager(self):
|
||||
with mock.patch.object(registry.manager,
|
||||
'CallbacksManager') as mock_mgr:
|
||||
registry.CALLBACK_MANAGER = None
|
||||
registry._get_callback_manager()
|
||||
mock_mgr.assert_called_once_with()
|
||||
|
Loading…
Reference in New Issue
Block a user