Merge "callback: priority to specify calling order"

This commit is contained in:
Zuul 2018-04-05 23:27:25 +00:00 committed by Gerrit Code Review
commit f46d20bc7d
6 changed files with 173 additions and 28 deletions

View File

@ -11,12 +11,14 @@
# under the License.
import collections
import itertools
from oslo_log import log as logging
from oslo_utils import reflection
from neutron_lib.callbacks import events
from neutron_lib.callbacks import exceptions
from neutron_lib.callbacks import priority_group
from neutron_lib.db import utils as db_utils
LOG = logging.getLogger(__name__)
@ -28,7 +30,8 @@ class CallbacksManager(object):
def __init__(self):
self.clear()
def subscribe(self, callback, resource, event):
def subscribe(self, callback, resource, event,
priority=priority_group.PRIORITY_DEFAULT):
"""Subscribe callback for a resource event.
The same callback may register for more than one event.
@ -36,23 +39,40 @@ class CallbacksManager(object):
:param callback: the callback. It must raise or return a boolean.
:param resource: the resource. It must be a valid resource.
:param event: the event. It must be a valid event.
:param priority: the priority. Callbacks are sorted by priority
to be called. Smaller one is called earlier.
"""
LOG.debug("Subscribe: %(callback)s %(resource)s %(event)s",
{'callback': callback, 'resource': resource, 'event': event})
LOG.debug("Subscribe: %(callback)s %(resource)s %(event)s "
"%(priority)d",
{'callback': callback, 'resource': resource, 'event': event,
'priority': priority})
callback_id = _get_id(callback)
try:
self._callbacks[resource][event][callback_id] = callback
except KeyError:
# Initialize the registry for unknown resources and/or events
# prior to enlisting the callback.
self._callbacks[resource][event] = {}
self._callbacks[resource][event][callback_id] = callback
callbacks_list = self._callbacks[resource].setdefault(event, [])
for pc_pair in callbacks_list:
if pc_pair[0] == priority:
pri_callbacks = pc_pair[1]
break
else:
pri_callbacks = {}
callbacks_list.append((priority, pri_callbacks))
callbacks_list.sort(key=lambda x: x[0])
pri_callbacks[callback_id] = callback
# We keep a copy of callbacks to speed the unsubscribe operation.
if callback_id not in self._index:
self._index[callback_id] = collections.defaultdict(set)
self._index[callback_id][resource].add(event)
def _del_callback(self, callbacks_list, callback_id):
for pc_pair in callbacks_list:
pri_callbacks = pc_pair[1]
if callback_id in pri_callbacks:
del pri_callbacks[callback_id]
if not pri_callbacks:
callbacks_list.remove(pc_pair)
break
def unsubscribe(self, callback, resource, event):
"""Unsubscribe callback from the registry.
@ -68,7 +88,7 @@ class CallbacksManager(object):
LOG.debug("Callback %s not found", callback_id)
return
if resource and event:
del self._callbacks[resource][event][callback_id]
self._del_callback(self._callbacks[resource][event], callback_id)
self._index[callback_id][resource].discard(event)
if not self._index[callback_id][resource]:
del self._index[callback_id][resource]
@ -88,7 +108,8 @@ class CallbacksManager(object):
if callback_id:
if resource in self._index[callback_id]:
for event in self._index[callback_id][resource]:
del self._callbacks[resource][event][callback_id]
self._del_callback(self._callbacks[resource][event],
callback_id)
del self._index[callback_id][resource]
if not self._index[callback_id]:
del self._index[callback_id]
@ -103,7 +124,8 @@ class CallbacksManager(object):
if callback_id:
for resource, resource_events in self._index[callback_id].items():
for event in resource_events:
del self._callbacks[resource][event][callback_id]
self._del_callback(self._callbacks[resource][event],
callback_id)
del self._index[callback_id]
def publish(self, resource, event, trigger, payload=None):
@ -162,7 +184,11 @@ class CallbacksManager(object):
def _notify_loop(self, resource, event, trigger, **kwargs):
"""The notification loop."""
errors = []
callbacks = list(self._callbacks[resource].get(event, {}).items())
# NOTE(yamahata): Since callback may unsubscribe it,
# convert iterator to list to avoid runtime error.
callbacks = list(itertools.chain(
*[pri_callbacks.items() for (priority, pri_callbacks)
in self._callbacks[resource].get(event, [])]))
LOG.debug("Notify callbacks %s for %s, %s",
[c[0] for c in callbacks], resource, event)
# TODO(armax): consider using a GreenPile

View File

@ -0,0 +1,34 @@
# Copyright (c) 2018 Intel Corporation.
# Copyright (c) 2018 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# NOTE(yamahata):smallest one is called first.
# we are using a big enough number for default that can
# be manipulated to increase or decrease the priorities.
# for all the callbacks, to reduce the priority division
# can be used for reducing that number for higher priority.
# each resources would want to define their own symbolic values for their use.
PRIORITY_DEFAULT = 55550000
# For l3 plugin and flavor
PRIORITY_ROUTER_EXTENDED_ATTRIBUTE = PRIORITY_DEFAULT - 100
# DEFAULT is reserved for third party which may not know priority yet
PRIORITY_ROUTER_DEFAULT = PRIORITY_DEFAULT
PRIORITY_ROUTER_CONTROLLER = PRIORITY_DEFAULT + 100
PRIORITY_ROUTER_DRIVER = PRIORITY_DEFAULT + 200

View File

@ -14,6 +14,7 @@ import collections
import inspect
from neutron_lib.callbacks import manager
from neutron_lib.callbacks import priority_group
# TODO(armax): consider adding locking
@ -31,8 +32,9 @@ def _get_callback_manager():
return _CALLBACK_MANAGER
def subscribe(callback, resource, event):
_get_callback_manager().subscribe(callback, resource, event)
def subscribe(callback, resource, event,
priority=priority_group.PRIORITY_DEFAULT):
_get_callback_manager().subscribe(callback, resource, event, priority)
def unsubscribe(callback, resource, event):
@ -61,7 +63,7 @@ def clear():
_get_callback_manager().clear()
def receives(resource, events):
def receives(resource, events, priority=priority_group.PRIORITY_DEFAULT):
"""Use to decorate methods on classes before initialization.
Any classes that use this must themselves be decorated with the
@ -72,7 +74,7 @@ def receives(resource, events):
def decorator(f):
for e in events:
_REGISTERED_CLASS_METHODS[f].append((resource, e))
_REGISTERED_CLASS_METHODS[f].append((resource, e, priority))
return f
return decorator
@ -112,9 +114,9 @@ def has_registry_receivers(klass):
func = getattr(unbound_method, 'im_func', unbound_method)
if func not in _REGISTERED_CLASS_METHODS:
continue
for resource, event in _REGISTERED_CLASS_METHODS[func]:
for resource, event, priority in _REGISTERED_CLASS_METHODS[func]:
# subscribe the bound method
subscribe(getattr(instance, name), resource, event)
subscribe(getattr(instance, name), resource, event, priority)
setattr(instance, '_DECORATED_METHODS_SUBSCRIBED', True)
return instance
klass.__new__ = replacement_new

View File

@ -20,9 +20,15 @@ from oslotest import base
from neutron_lib.callbacks import events
from neutron_lib.callbacks import exceptions
from neutron_lib.callbacks import manager
from neutron_lib.callbacks import priority_group
from neutron_lib.callbacks import resources
PRI_HIGH = 0
PRI_MED = 5000
PRI_LOW = 10000
class ObjectWithCallback(object):
def __init__(self):
@ -101,8 +107,12 @@ class CallBacksManagerTestCase(base.BaseTestCase):
callback_2, resources.PORT, events.BEFORE_CREATE)
self.assertEqual(2, len(self.manager._index))
self.assertEqual(
2,
1,
len(self.manager._callbacks[resources.PORT][events.BEFORE_CREATE]))
self.assertEqual(
2,
len(self.manager._callbacks
[resources.PORT][events.BEFORE_CREATE][0][1]))
def test_unsubscribe_during_iteration(self):
unsub = lambda r, e, *a, **k: self.manager.unsubscribe(unsub, r, e)
@ -157,12 +167,17 @@ class CallBacksManagerTestCase(base.BaseTestCase):
self.manager.subscribe(
callback_2, resources.PORT, events.BEFORE_DELETE)
self.manager.unsubscribe_by_resource(callback_1, resources.PORT)
self.assertNotIn(
callback_id_1,
self.manager._callbacks[resources.PORT][events.BEFORE_CREATE])
self.assertEqual(
0,
len(self.manager._callbacks
[resources.PORT][events.BEFORE_CREATE]))
self.assertEqual(
1,
len(self.manager._callbacks[resources.PORT][events.BEFORE_DELETE]))
self.assertIn(
callback_id_2,
self.manager._callbacks[resources.PORT][events.BEFORE_DELETE])
self.manager._callbacks
[resources.PORT][events.BEFORE_DELETE][0][1])
self.assertNotIn(callback_id_1, self.manager._index)
def test_unsubscribe_all(self):
@ -266,6 +281,52 @@ class CallBacksManagerTestCase(base.BaseTestCase):
self.assertEqual(0, len(self.manager._callbacks))
self.assertEqual(0, len(self.manager._index))
def test_callback_priority(self):
pri_first = priority_group.PRIORITY_DEFAULT - 100
pri_last = priority_group.PRIORITY_DEFAULT + 100
# lowest priority value should be first in the _callbacks
self.manager.subscribe(callback_1, 'my-resource', 'my-event')
self.manager.subscribe(callback_2, 'my-resource',
'my-event', pri_last)
self.manager.subscribe(callback_3, 'my-resource',
'my-event', pri_first)
callbacks = self.manager._callbacks['my-resource']['my-event']
# callbacks should be sorted based on priority for resource and event
self.assertEqual(3, len(callbacks))
self.assertEqual(pri_first, callbacks[0][0])
self.assertEqual(priority_group.PRIORITY_DEFAULT, callbacks[1][0])
self.assertEqual(pri_last, callbacks[2][0])
@mock.patch('neutron_lib.callbacks.manager.CallbacksManager._del_callback')
def test_del_callback_called_on_unsubscribe(self, mock_cb):
self.manager.subscribe(callback_1, 'my-resource', 'my-event')
callback_id = self.manager._find(callback_1)
callbacks = self.manager._callbacks['my-resource']['my-event']
self.assertEqual(1, len(callbacks))
self.manager.unsubscribe(callback_1, 'my-resource', 'my-event')
mock_cb.assert_called_once_with(callbacks, callback_id)
@mock.patch("neutron_lib.callbacks.manager.LOG")
def test_callback_order(self, _logger):
self.manager.subscribe(callback_1, 'my-resource', 'my-event', PRI_MED)
self.manager.subscribe(callback_2, 'my-resource', 'my-event', PRI_HIGH)
self.manager.subscribe(callback_3, 'my-resource', 'my-event', PRI_LOW)
self.assertEqual(
3, len(self.manager._callbacks['my-resource']['my-event']))
self.manager.unsubscribe(callback_3, 'my-resource', 'my-event')
self.manager.notify('my-resource', 'my-event', mock.ANY)
# callback_3 should be deleted and not executed
self.assertEqual(
2, len(self.manager._callbacks['my-resource']['my-event']))
self.assertEqual(0, callback_3.counter)
# executed callbacks should have counter incremented
self.assertEqual(1, callback_2.counter)
self.assertEqual(1, callback_1.counter)
callback_ids = _logger.debug.mock_calls[4][1][1]
# callback_2 should be first in exceution as it has higher priority
self.assertEqual(callback_id_2, callback_ids[0])
self.assertEqual(callback_id_1, callback_ids[1])
@mock.patch("neutron_lib.callbacks.manager.LOG")
def test__notify_loop_skip_log_errors(self, _logger):
self.manager.subscribe(

View File

@ -18,10 +18,13 @@ import testtools
from oslotest import base
from neutron_lib.callbacks import events
from neutron_lib.callbacks import priority_group
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import fixture
PRI_CALLBACK = 20
@registry.has_registry_receivers
class ObjectWithDecoratedCallback(object):
@ -51,7 +54,7 @@ class AnotherObjectWithDecoratedCallback(ObjectWithDecoratedCallback,
super(AnotherObjectWithDecoratedCallback, self).__init__()
self.counter2 = 0
@registry.receives(resources.NETWORK, [events.AFTER_DELETE])
@registry.receives(resources.NETWORK, [events.AFTER_DELETE], PRI_CALLBACK)
def callback2(self, *args, **kwargs):
self.counter2 += 1
@ -89,10 +92,14 @@ class CallBacksManagerTestCase(base.BaseTestCase):
def test_object_inheriting_others_no_double_subscribe(self):
with mock.patch.object(registry, 'subscribe') as sub:
AnotherObjectWithDecoratedCallback()
callback = AnotherObjectWithDecoratedCallback()
# there are 3 methods (2 in parent and one in child) and 1
# subscribes to 2 events, so we expect 4 subscribes
priority_call = [mock.call(
callback.callback2,
resources.NETWORK, events.AFTER_DELETE, PRI_CALLBACK)]
self.assertEqual(4, len(sub.mock_calls))
sub.assert_has_calls(priority_call)
def test_new_inheritance_not_broken(self):
self.assertTrue(AnotherObjectWithDecoratedCallback().new_called)
@ -117,7 +124,14 @@ class TestCallbackRegistryDispatching(base.BaseTestCase):
def test_subscribe(self):
registry.subscribe(my_callback, 'my-resource', 'my-event')
self.callback_manager.subscribe.assert_called_with(
my_callback, 'my-resource', 'my-event')
my_callback, 'my-resource', 'my-event',
priority_group.PRIORITY_DEFAULT)
def test_subscribe_explicit_priority(self):
registry.subscribe(my_callback, 'my-resource', 'my-event',
PRI_CALLBACK)
self.callback_manager.subscribe.assert_called_with(
my_callback, 'my-resource', 'my-event', PRI_CALLBACK)
def test_unsubscribe(self):
registry.unsubscribe(my_callback, 'my-resource', 'my-event')

View File

@ -0,0 +1,8 @@
---
features:
- |
Introduced priority to callback subscription. An integer value can be
associated with each callback so that callbacks can be executed in
specified order for same resources and events. Every callback will have
priority value by default. To execute callbacks in specified order, priorities
should be defined explicitly, lower priority value would be executed first.