diff --git a/neutron_lib/callbacks/events.py b/neutron_lib/callbacks/events.py index 7f5d1ca0d..122798bb2 100644 --- a/neutron_lib/callbacks/events.py +++ b/neutron_lib/callbacks/events.py @@ -51,6 +51,12 @@ PRECOMMIT = 'precommit_' OVS_RESTARTED = 'ovs_restarted' +def is_cancellable_event(event): + """Return if an event is cancellable by definition""" + return (event.startswith(BEFORE) or + event.startswith(PRECOMMIT)) + + class EventPayload(object): """Base event payload object. diff --git a/neutron_lib/callbacks/exceptions.py b/neutron_lib/callbacks/exceptions.py index f2bea0d41..867f2e80e 100644 --- a/neutron_lib/callbacks/exceptions.py +++ b/neutron_lib/callbacks/exceptions.py @@ -50,9 +50,14 @@ class CallbackFailure(exceptions.MultipleExceptions): class NotificationError(object): - def __init__(self, callback_id, error): + def __init__(self, callback_id, error, cancellable=False): self.callback_id = callback_id self.error = error + self._cancellable = cancellable def __str__(self): return 'Callback %s failed with "%s"' % (self.callback_id, self.error) + + @property + def is_cancellable(self): + return self._cancellable diff --git a/neutron_lib/callbacks/manager.py b/neutron_lib/callbacks/manager.py index 5908f666b..ef40022a4 100644 --- a/neutron_lib/callbacks/manager.py +++ b/neutron_lib/callbacks/manager.py @@ -11,7 +11,6 @@ # under the License. import collections -import itertools from oslo_log import log as logging from oslo_utils import reflection @@ -22,6 +21,10 @@ from neutron_lib.callbacks import priority_group from neutron_lib.db import utils as db_utils LOG = logging.getLogger(__name__) +PriorityCallbacks = collections.namedtuple( + 'PriorityCallbacks', ['priority', 'pri_callbacks', 'cancellable']) +Callback = collections.namedtuple( + 'Callback', ['id', 'method', 'cancellable']) class CallbacksManager(object): @@ -31,7 +34,8 @@ class CallbacksManager(object): self.clear() def subscribe(self, callback, resource, event, - priority=priority_group.PRIORITY_DEFAULT): + priority=priority_group.PRIORITY_DEFAULT, + cancellable=False): """Subscribe callback for a resource event. The same callback may register for more than one event. @@ -41,22 +45,26 @@ class CallbacksManager(object): :param event: the event. It must be a valid event. :param priority: the priority. Callbacks are sorted by priority to be called. Smaller one is called earlier. + :param cancellable: if the callback is "cancellable", in case of + returning an exception, the callback manager will + raise a ``CallbackFailure`` exception. """ LOG.debug("Subscribe: %(callback)s %(resource)s %(event)s " - "%(priority)d", + "%(priority)d, %(cancellable)s", {'callback': callback, 'resource': resource, 'event': event, - 'priority': priority}) + 'priority': priority, 'cancellable': cancellable}) callback_id = _get_id(callback) - callbacks_list = self._callbacks[resource].setdefault(event, []) - for pc_pair in callbacks_list: - if pc_pair[0] == priority: - pri_callbacks = pc_pair[1] + pri_callbacks_list = self._callbacks[resource].setdefault(event, []) + for pri_callbacks in pri_callbacks_list: + if pri_callbacks.priority == priority: + pri_callbacks = pri_callbacks.pri_callbacks break else: pri_callbacks = {} - callbacks_list.append((priority, pri_callbacks)) - callbacks_list.sort(key=lambda x: x[0]) + pri_callbacks_list.append( + PriorityCallbacks(priority, pri_callbacks, cancellable)) + pri_callbacks_list.sort(key=lambda x: x.priority) pri_callbacks[callback_id] = callback # We keep a copy of callbacks to speed the unsubscribe operation. @@ -64,13 +72,12 @@ class CallbacksManager(object): self._index[callback_id] = collections.defaultdict(set) self._index[callback_id][resource].add(event) - def _del_callback(self, callbacks_list, callback_id): - for pc_pair in callbacks_list: - pri_callbacks = pc_pair[1] - if callback_id in pri_callbacks: - del pri_callbacks[callback_id] - if not pri_callbacks: - callbacks_list.remove(pc_pair) + def _del_callback(self, pri_callbacks, callback_id): + for pri_callback in pri_callbacks: + if callback_id in pri_callback.pri_callbacks: + del pri_callback.pri_callbacks[callback_id] + if not pri_callback.pri_callbacks: + pri_callbacks.remove(pri_callback) break def unsubscribe(self, callback, resource, event): @@ -156,7 +163,8 @@ class CallbacksManager(object): raise exceptions.CallbackFailure(errors=errors) - if event.startswith(events.PRECOMMIT): + if (event.startswith(events.PRECOMMIT) or + any(error.is_cancellable for error in errors)): raise exceptions.CallbackFailure(errors=errors) def clear(self): @@ -167,32 +175,30 @@ class CallbacksManager(object): def _notify_loop(self, resource, event, trigger, payload): """The notification loop.""" errors = [] - # NOTE(yamahata): Since callback may unsubscribe it, - # convert iterator to list to avoid runtime error. - callbacks = list(itertools.chain( - *[pri_callbacks.items() for (priority, pri_callbacks) - in self._callbacks[resource].get(event, [])])) + callbacks = [] + for pri_callbacks in self._callbacks[resource].get(event, []): + for cb_id, cb_method in pri_callbacks.pri_callbacks.items(): + cb = Callback(cb_id, cb_method, pri_callbacks.cancellable) + callbacks.append(cb) resource_id = getattr(payload, "resource_id", None) LOG.debug("Publish callbacks %s for %s (%s), %s", - [c[0] for c in callbacks], resource, resource_id, event) + [c.id for c in callbacks], resource, resource_id, event) # TODO(armax): consider using a GreenPile - for callback_id, callback in callbacks: + for callback in callbacks: try: - callback(resource, event, trigger, payload=payload) + callback.method(resource, event, trigger, payload=payload) except Exception as e: - abortable_event = ( - event.startswith(events.BEFORE) or - event.startswith(events.PRECOMMIT) - ) - if not abortable_event: + if not (events.is_cancellable_event(event) or + callback.cancellable): LOG.exception("Error during notification for " "%(callback)s %(resource)s, %(event)s", - {'callback': callback_id, + {'callback': callback.id, 'resource': resource, 'event': event}) else: LOG.debug("Callback %(callback)s raised %(error)s", - {'callback': callback_id, 'error': e}) - errors.append(exceptions.NotificationError(callback_id, e)) + {'callback': callback.id, 'error': e}) + errors.append(exceptions.NotificationError( + callback.id, e, cancellable=callback.cancellable)) return errors def _find(self, callback): diff --git a/neutron_lib/callbacks/registry.py b/neutron_lib/callbacks/registry.py index 1863da6f6..a618a124c 100644 --- a/neutron_lib/callbacks/registry.py +++ b/neutron_lib/callbacks/registry.py @@ -34,8 +34,10 @@ def _get_callback_manager(): def subscribe(callback, resource, event, - priority=priority_group.PRIORITY_DEFAULT): - _get_callback_manager().subscribe(callback, resource, event, priority) + priority=priority_group.PRIORITY_DEFAULT, + cancellable=False): + _get_callback_manager().subscribe(callback, resource, event, priority, + cancellable) def unsubscribe(callback, resource, event): diff --git a/neutron_lib/tests/unit/callbacks/test_manager.py b/neutron_lib/tests/unit/callbacks/test_manager.py index 19229823b..3b70ca83b 100644 --- a/neutron_lib/tests/unit/callbacks/test_manager.py +++ b/neutron_lib/tests/unit/callbacks/test_manager.py @@ -14,6 +14,7 @@ from unittest import mock +import ddt from oslo_db import exception as db_exc from oslotest import base @@ -64,10 +65,15 @@ def callback_raise_retriable(*args, **kwargs): raise db_exc.DBDeadlock() +def callback_raise_not_retriable(*args, **kwargs): + raise Exception() + + def callback_3(resource, event, trigger, payload): callback_3.counter += 1 +@ddt.ddt class CallBacksManagerTestCase(base.BaseTestCase): def setUp(self): @@ -78,14 +84,19 @@ class CallBacksManagerTestCase(base.BaseTestCase): callback_2.counter = 0 callback_3.counter = 0 - def test_subscribe(self): + @ddt.data(True, False) + def test_subscribe(self, cancellable): self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) + callback_1, resources.PORT, events.BEFORE_CREATE, + cancellable=cancellable) self.assertIsNotNone( self.manager._callbacks[resources.PORT][events.BEFORE_CREATE]) self.assertIn(callback_id_1, self.manager._index) self.assertEqual(self.__module__ + '.callback_1-%s' % hash(callback_1), callback_id_1) + self.assertEqual(cancellable, + self.manager._callbacks[resources.PORT] + [events.BEFORE_CREATE][0][2]) def test_subscribe_unknown(self): self.manager.subscribe( @@ -95,13 +106,19 @@ class CallBacksManagerTestCase(base.BaseTestCase): self.assertIn(callback_id_1, self.manager._index) def test_subscribe_is_idempotent(self): - self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) - self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) + for cancellable in (True, False): + self.manager.subscribe( + callback_1, resources.PORT, events.BEFORE_CREATE, + cancellable=cancellable) + self.manager.subscribe( + callback_1, resources.PORT, events.BEFORE_CREATE, + cancellable=cancellable) self.assertEqual( 1, len(self.manager._callbacks[resources.PORT][events.BEFORE_CREATE])) + # The first event registered had cancellable=True. + self.assertTrue(self.manager._callbacks[resources.PORT] + [events.BEFORE_CREATE][0][2]) callbacks = self.manager._index[callback_id_1][resources.PORT] self.assertEqual(1, len(callbacks)) @@ -129,9 +146,11 @@ class CallBacksManagerTestCase(base.BaseTestCase): payload=self.event_payload) self.assertNotIn(unsub, self.manager._index) - def test_unsubscribe(self): + @ddt.data(True, False) + def test_unsubscribe(self, cancellable): self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) + callback_1, resources.PORT, events.BEFORE_CREATE, + cancellable=cancellable) self.manager.unsubscribe( callback_1, resources.PORT, events.BEFORE_CREATE) self.assertNotIn( @@ -155,9 +174,11 @@ class CallBacksManagerTestCase(base.BaseTestCase): self.manager.unsubscribe, callback_1, None, events.BEFORE_CREATE) - def test_unsubscribe_is_idempotent(self): + @ddt.data(True, False) + def test_unsubscribe_is_idempotent(self, cancellable): self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) + callback_1, resources.PORT, events.BEFORE_CREATE, + cancellable=cancellable) self.manager.unsubscribe( callback_1, resources.PORT, events.BEFORE_CREATE) self.manager.unsubscribe( @@ -256,6 +277,28 @@ class CallBacksManagerTestCase(base.BaseTestCase): resources.PORT, events.BEFORE_CREATE, self, payload=self.event_payload) + def test_publish_handle_not_retriable_exception(self): + self.manager.subscribe( + callback_raise_not_retriable, resources.PORT, events.BEFORE_CREATE) + self.assertRaises(exceptions.CallbackFailure, self.manager.publish, + resources.PORT, events.BEFORE_CREATE, self, + payload=self.event_payload) + + def test_publish_handle_not_retriable_exception_no_cancellable_flag(self): + self.manager.subscribe( + callback_raise_not_retriable, resources.PORT, events.AFTER_INIT) + # No exception is raised. + self.manager.publish(resources.PORT, events.AFTER_INIT, self, + payload=self.event_payload) + + def test_publish_handle_not_retriable_exception_cancellable_flag(self): + self.manager.subscribe( + callback_raise_not_retriable, resources.PORT, events.AFTER_INIT, + cancellable=True) + self.assertRaises(exceptions.CallbackFailure, self.manager.publish, + resources.PORT, events.AFTER_INIT, self, + payload=self.event_payload) + def test_publish_called_once_with_no_failures(self): with mock.patch.object(self.manager, '_notify_loop') as n: n.return_value = False diff --git a/neutron_lib/tests/unit/callbacks/test_registry.py b/neutron_lib/tests/unit/callbacks/test_registry.py index 8cce84499..5ede69754 100644 --- a/neutron_lib/tests/unit/callbacks/test_registry.py +++ b/neutron_lib/tests/unit/callbacks/test_registry.py @@ -132,13 +132,13 @@ class TestCallbackRegistryDispatching(base.BaseTestCase): registry.subscribe(my_callback, 'my-resource', 'my-event') self.callback_manager.subscribe.assert_called_with( my_callback, 'my-resource', 'my-event', - priority_group.PRIORITY_DEFAULT) + priority_group.PRIORITY_DEFAULT, False) def test_subscribe_explicit_priority(self): registry.subscribe(my_callback, 'my-resource', 'my-event', PRI_CALLBACK) self.callback_manager.subscribe.assert_called_with( - my_callback, 'my-resource', 'my-event', PRI_CALLBACK) + my_callback, 'my-resource', 'my-event', PRI_CALLBACK, False) def test_unsubscribe(self): registry.unsubscribe(my_callback, 'my-resource', 'my-event') diff --git a/releasenotes/notes/callbacksmanager-cancellable-events-966d76925db919a8.yaml b/releasenotes/notes/callbacksmanager-cancellable-events-966d76925db919a8.yaml new file mode 100644 index 000000000..91461f307 --- /dev/null +++ b/releasenotes/notes/callbacksmanager-cancellable-events-966d76925db919a8.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + ``CallbacksManager`` can now subscribe cancellable events. By default, + only ``before_`` and ``precommit_`` events, in case of error, can raise a + ``CallbackFailure`` exception. Now, if the event is subscribed with + the flag ``cancellable`` enabled, the ``publish`` method will raise this + exception if the callback fails and returns an error. diff --git a/test-requirements.txt b/test-requirements.txt index 036b8a912..652d30185 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -6,6 +6,7 @@ hacking>=3.0.1,<3.1.0 # Apache-2.0 bandit!=1.6.0,>=1.1.0 # Apache-2.0 coverage!=4.4,>=4.0 # Apache-2.0 +ddt>=1.0.1 # MIT fixtures>=3.0.0 # Apache-2.0/BSD flake8-import-order==0.12 # LGPLv3 pylint>=2.2.0 # GPLv2