Merge "Expose/Improve callback notification interface"
This commit is contained in:
commit
8b3a74c8b9
@ -74,6 +74,146 @@ Rather than keeping the conversation abstract, let us delve into some examples,
|
||||
help understand better some of the principles behind the provided mechanism.
|
||||
|
||||
|
||||
Event payloads
|
||||
--------------
|
||||
|
||||
The use of ``**kwargs`` for callback event payloads is deprecated (slated to be
|
||||
removed in 'Queens') in favor of standardized event payload objects as
|
||||
described herein.
|
||||
|
||||
The event payloads are defined in ``neutron_lib.callbacks.events`` and define a
|
||||
set of set of payload objects based on consumption pattern. The following event
|
||||
objects are defined today:
|
||||
|
||||
- ``EventPayload``: Base object for all other payloads and define the common set
|
||||
of attributes used by events. The ``EventPayload`` can also be used directly
|
||||
for basic payloads that don't need to transport additional values.
|
||||
- ``DBEventPayload``: Payloads pertaining to database callbacks. These objects
|
||||
capture both the pre and post state (among other things) for database
|
||||
changes.
|
||||
- ``APIEventPayload``: Payloads pertaining to API callbacks. These objects
|
||||
capture details relating to an API event; such as the method name and API
|
||||
action.
|
||||
|
||||
Each event object is described in greater detail in its own subsection below.
|
||||
|
||||
For backwards compatibility the callback registry and manager still provide
|
||||
the ``notify`` method for passing ``**kwargs``, but also provide the
|
||||
``publish`` method for passing an event object.
|
||||
|
||||
|
||||
Event objects: EventPayload
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The ``EventPayload`` object is the parent class of all other payload objects
|
||||
and defines the common set of attributes applicable to most events. For
|
||||
example, the ``EventPayload`` contains the ``context``, ``request_body``, etc.
|
||||
In addition, a ``metadata`` attribute is available to transport event data
|
||||
that's not yet standardized. While the ``metadata`` attribute is there for
|
||||
use, it should only be used in special cases like phasing in new payload
|
||||
attributes.
|
||||
|
||||
Payload objects also transport resource state via the ``states`` attribute.
|
||||
This collection of resource objects tracks the state changes for the respective
|
||||
resource related to the event. For example database changes might have a
|
||||
pre and post updated resource that's used as ``states``. Tracking states
|
||||
allows consumers to inspect the various changes in the resource and take
|
||||
action as needed; for example checking the pre and post object to determine
|
||||
the delta. State object types are event specific; API events may use python
|
||||
``dicts`` as state objects whereas database events use resource/OVO model objects.
|
||||
|
||||
Note that states as well as any other event payload attributes are not copied;
|
||||
subscribers obtain a direct reference to event payload objects (states,
|
||||
metadata, etc.) and should not be modified by subscribers.
|
||||
|
||||
|
||||
Event objects: DBEventPayload
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
For datastore/database events, ``DBEventPayload`` can be used as the payload
|
||||
event object. In addition to the attributes inherited from ``EventPayload``,
|
||||
database payloads also contain an additional ``desired_state``. The desired state
|
||||
is intended for use with pre create/commit scenarios where the publisher
|
||||
has a resource object (yet to be persisted) that's used in the event payload.
|
||||
|
||||
These event objects are suitable for the standard before/after database
|
||||
events we have today as well as any that might arise in the future.
|
||||
|
||||
Example usage::
|
||||
|
||||
# BEFORE_CREATE:
|
||||
DBEventPayload(context,
|
||||
request_body=params_of_create_request,
|
||||
resource_id=id_of_resource_if_avail,
|
||||
desired_state=db_resource_to_commit)
|
||||
|
||||
# AFTER_CREATE:
|
||||
DBEventPayload(context,
|
||||
request_body=params_of_create_request,
|
||||
states=[my_new_copy_after_create],
|
||||
resource_id=id_of_resource)
|
||||
|
||||
# PRECOMMIT_CREATE:
|
||||
DBEventPayload(context,
|
||||
request_body=params_of_create_request,
|
||||
resource_id=id_of_resource_if_avail,
|
||||
desired_state=db_resource_to_commit)
|
||||
|
||||
# BEFORE_DELETE:
|
||||
DBEventPayload(context,
|
||||
states=[resource_to_delete],
|
||||
resource_id=id_of_resource)
|
||||
|
||||
# AFTER_DELETE:
|
||||
DBEventPayload(context,
|
||||
states=[copy_of_deleted_resource],
|
||||
resource_id=id_of_resource)
|
||||
|
||||
# BEFORE_UPDATE:
|
||||
DBEventPayload(context,
|
||||
request_body=body_of_update_request,
|
||||
states=[original_db_resource],
|
||||
resource_id=id_of_resource
|
||||
desired_state=updated_db_resource_to_commit)
|
||||
|
||||
# AFTER_UPDATE:
|
||||
DBEventPayload(context,
|
||||
request_body=body_of_update_request,
|
||||
states=[original_db_resource, updated_db_resource],
|
||||
resource_id=id_of_resource)
|
||||
|
||||
|
||||
Event objects: APIEventPayload
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
For API related callbacks, the ``APIEventPayload`` object can be used to
|
||||
transport callback payloads. For example, the REST API resource controller can
|
||||
use API events for pre/post operation callbacks.
|
||||
|
||||
In addition to transporting all the attributes of ``EventPayload``, the
|
||||
``APIEventPayload`` object also includes the ``action``, ``method_name`` and
|
||||
``collection_name`` payload attributes permitting API components to
|
||||
pass along API controller specifics.
|
||||
|
||||
Sample usage::
|
||||
|
||||
# BEFORE_RESPONSE for create:
|
||||
APIEventPayload(context, notifier_method, action,
|
||||
request_body=req_body,
|
||||
states=[create_result],
|
||||
collection_name=self._collection_name)
|
||||
|
||||
# BEFORE_RESPONSE for delete:
|
||||
APIEventPayload(context, notifier_method, action,
|
||||
states=[copy_of_deleted_resource],
|
||||
collection_name=self._collection_name)
|
||||
|
||||
# BEFORE_RESPONSE for update:
|
||||
APIEventPayload(context, notifier_method, action,
|
||||
states=[original, updated],
|
||||
collection_name=self._collection_name)
|
||||
|
||||
|
||||
Subscribing to events
|
||||
---------------------
|
||||
|
||||
@ -115,13 +255,13 @@ In practical terms this scenario would be translated in the code below:
|
||||
from neutron_lib.callbacks import registry
|
||||
|
||||
|
||||
def callback1(resource, event, trigger, **kwargs):
|
||||
def callback1(resource, event, trigger, payload):
|
||||
print('Callback1 called by trigger: ', trigger)
|
||||
print('kwargs: ', kwargs)
|
||||
print('payload: ', payload)
|
||||
|
||||
def callback2(resource, event, trigger, **kwargs):
|
||||
def callback2(resource, event, trigger, payload):
|
||||
print('Callback2 called by trigger: ', trigger)
|
||||
print('kwargs: ', kwargs)
|
||||
print('payload: ', payload)
|
||||
|
||||
|
||||
# B and C express interest with I
|
||||
@ -132,8 +272,8 @@ In practical terms this scenario would be translated in the code below:
|
||||
|
||||
# A notifies
|
||||
def do_notify():
|
||||
kwargs = {'foo': 'bar'}
|
||||
registry.notify(resources.ROUTER, events.BEFORE_CREATE, do_notify, **kwargs)
|
||||
registry.publish(resources.ROUTER, events.BEFORE_CREATE,
|
||||
do_notify, events.EventPayload(None))
|
||||
|
||||
|
||||
print('Notifying...')
|
||||
@ -147,9 +287,9 @@ The output is:
|
||||
> Subscribed
|
||||
> Notifying...
|
||||
> Callback2 called by trigger: <function do_notify at 0x7f2a5d663410>
|
||||
> kwargs: {'foo': 'bar'}
|
||||
> payload: <neutron_lib._callbacks.events.EventPayload object at 0x7ff9ed253510>
|
||||
> Callback1 called by trigger: <function do_notify at 0x7f2a5d663410>
|
||||
> kwargs: {'foo': 'bar'}
|
||||
> payload: <neutron_lib._callbacks.events.EventPayload object at 0x7ff9ed253510>
|
||||
|
||||
Thanks to the intermediary existence throughout the life of the system, A, B, and C
|
||||
are flexible to evolve their internals, dynamics, and lifecycles.
|
||||
@ -189,10 +329,10 @@ to abort events are ignored. The snippet below shows this in action:
|
||||
from neutron_lib.callbacks import registry
|
||||
|
||||
|
||||
def callback1(resource, event, trigger, **kwargs):
|
||||
def callback1(resource, event, trigger, payload=None):
|
||||
raise Exception('I am failing!')
|
||||
|
||||
def callback2(resource, event, trigger, **kwargs):
|
||||
def callback2(resource, event, trigger, payload=None):
|
||||
print('Callback2 called by %s on event %s' % (trigger, event))
|
||||
|
||||
|
||||
@ -203,15 +343,13 @@ to abort events are ignored. The snippet below shows this in action:
|
||||
|
||||
|
||||
def do_notify():
|
||||
kwargs = {'foo': 'bar'}
|
||||
registry.notify(resources.ROUTER, events.BEFORE_CREATE, do_notify, **kwargs)
|
||||
|
||||
registry.publish(resources.ROUTER, events.BEFORE_CREATE, do_notify)
|
||||
|
||||
print('Notifying...')
|
||||
try:
|
||||
do_notify()
|
||||
except exceptions.CallbackFailure as e:
|
||||
print('Error: ', e)
|
||||
print("Error: %s" % e)
|
||||
|
||||
The output is:
|
||||
|
||||
@ -258,11 +396,11 @@ The snippet below shows these concepts in action:
|
||||
from neutron_lib.callbacks import registry
|
||||
|
||||
|
||||
def callback1(resource, event, trigger, **kwargs):
|
||||
def callback1(resource, event, trigger, payload=None):
|
||||
print('Callback1 called by %s on event %s for resource %s' % (trigger, event, resource))
|
||||
|
||||
|
||||
def callback2(resource, event, trigger, **kwargs):
|
||||
def callback2(resource, event, trigger, payload=None):
|
||||
print('Callback2 called by %s on event %s for resource %s' % (trigger, event, resource))
|
||||
|
||||
|
||||
@ -276,12 +414,11 @@ The snippet below shows these concepts in action:
|
||||
|
||||
def do_notify():
|
||||
print('Notifying...')
|
||||
kwargs = {'foo': 'bar'}
|
||||
registry.notify(resources.ROUTER, events.BEFORE_READ, do_notify, **kwargs)
|
||||
registry.notify(resources.ROUTER, events.BEFORE_CREATE, do_notify, **kwargs)
|
||||
registry.notify(resources.ROUTER, events.AFTER_DELETE, do_notify, **kwargs)
|
||||
registry.notify(resources.PORT, events.BEFORE_UPDATE, do_notify, **kwargs)
|
||||
registry.notify(resources.ROUTER_GATEWAY, events.BEFORE_UPDATE, do_notify, **kwargs)
|
||||
registry.publish(resources.ROUTER, events.BEFORE_READ, do_notify)
|
||||
registry.publish(resources.ROUTER, events.BEFORE_CREATE, do_notify)
|
||||
registry.publish(resources.ROUTER, events.AFTER_DELETE, do_notify)
|
||||
registry.publish(resources.PORT, events.BEFORE_UPDATE, do_notify)
|
||||
registry.publish(resources.ROUTER_GATEWAY, events.BEFORE_UPDATE, do_notify)
|
||||
|
||||
|
||||
do_notify()
|
||||
@ -319,6 +456,23 @@ The output is:
|
||||
Notifying...
|
||||
|
||||
|
||||
Testing with callbacks
|
||||
----------------------
|
||||
|
||||
A python `fixture <https://pypi.python.org/pypi/fixtures>`_ is provided for implementations that need to
|
||||
unit test and mock the callback registry. This can be used for example, when your code publishes callback events
|
||||
that you need to verify. Consumers can use ``neutron_lib.tests.unit.callbacks.base.CallbackRegistryFixture``
|
||||
in their unit test classes with the ``useFixture()`` method passing along a ``CallbackRegistryFixture`` instance.
|
||||
If mocking of the actual singleton callback manager is necessary, consumers can pass a value to
|
||||
with the ``callback_manager`` kwarg. For example::
|
||||
|
||||
def setUp(self):
|
||||
super(MyTestClass, self).setUp()
|
||||
self.registry_fixture = callback_base.CallbackRegistryFixture()
|
||||
self.useFixture(self.registry_fixture)
|
||||
# each test now uses an isolated callback manager
|
||||
|
||||
|
||||
FAQ
|
||||
---
|
||||
|
||||
@ -377,17 +531,17 @@ What kind of function can be a callback?
|
||||
from neutron_lib.callbacks import registry
|
||||
|
||||
|
||||
def callback1(resource, event, trigger, **kwargs):
|
||||
def callback1(resource, event, trigger, payload):
|
||||
print('module callback')
|
||||
|
||||
|
||||
class MyCallback(object):
|
||||
|
||||
def callback2(self, resource, event, trigger, **kwargs):
|
||||
def callback2(self, resource, event, trigger, payload):
|
||||
print('object callback')
|
||||
|
||||
@classmethod
|
||||
def callback3(cls, resource, event, trigger, **kwargs):
|
||||
def callback3(cls, resource, event, trigger, payload):
|
||||
print('class callback')
|
||||
|
||||
|
||||
@ -397,13 +551,13 @@ What kind of function can be a callback?
|
||||
registry.subscribe(MyCallback.callback3, resources.ROUTER, events.BEFORE_CREATE)
|
||||
|
||||
def do_notify():
|
||||
def nested_subscribe(resource, event, trigger, **kwargs):
|
||||
def nested_subscribe(resource, event, trigger, payload):
|
||||
print('nested callback')
|
||||
|
||||
registry.subscribe(nested_subscribe, resources.ROUTER, events.BEFORE_CREATE)
|
||||
|
||||
kwargs = {'foo': 'bar'}
|
||||
registry.notify(resources.ROUTER, events.BEFORE_CREATE, do_notify, **kwargs)
|
||||
registry.publish(resources.ROUTER, events.BEFORE_CREATE,
|
||||
do_notify, events.EventPayload(None))
|
||||
|
||||
|
||||
print('Notifying...')
|
||||
|
@ -1,45 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# String literals representing events associated to data store operations
|
||||
BEFORE_CREATE = 'before_create'
|
||||
BEFORE_READ = 'before_read'
|
||||
BEFORE_UPDATE = 'before_update'
|
||||
BEFORE_DELETE = 'before_delete'
|
||||
|
||||
PRECOMMIT_CREATE = 'precommit_create'
|
||||
PRECOMMIT_UPDATE = 'precommit_update'
|
||||
PRECOMMIT_DELETE = 'precommit_delete'
|
||||
|
||||
AFTER_CREATE = 'after_create'
|
||||
AFTER_READ = 'after_read'
|
||||
AFTER_UPDATE = 'after_update'
|
||||
AFTER_DELETE = 'after_delete'
|
||||
|
||||
# String literals representing events associated to API operations
|
||||
BEFORE_RESPONSE = 'before_response'
|
||||
AFTER_REQUEST = 'after_request'
|
||||
|
||||
# String literals representing events associated to process operations
|
||||
BEFORE_INIT = 'before_init'
|
||||
BEFORE_SPAWN = 'before_spawn' # sent per process
|
||||
AFTER_INIT = 'after_init' # sent per worker
|
||||
|
||||
# String literals representing events associated to error conditions
|
||||
ABORT_CREATE = 'abort_create'
|
||||
ABORT_READ = 'abort_read'
|
||||
ABORT_UPDATE = 'abort_update'
|
||||
ABORT_DELETE = 'abort_delete'
|
||||
|
||||
ABORT = 'abort_'
|
||||
BEFORE = 'before_'
|
||||
PRECOMMIT = 'precommit_'
|
153
neutron_lib/callbacks/events.py
Normal file
153
neutron_lib/callbacks/events.py
Normal file
@ -0,0 +1,153 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# String literals representing events associated to data store operations
|
||||
BEFORE_CREATE = 'before_create'
|
||||
BEFORE_READ = 'before_read'
|
||||
BEFORE_UPDATE = 'before_update'
|
||||
BEFORE_DELETE = 'before_delete'
|
||||
|
||||
PRECOMMIT_CREATE = 'precommit_create'
|
||||
PRECOMMIT_UPDATE = 'precommit_update'
|
||||
PRECOMMIT_DELETE = 'precommit_delete'
|
||||
|
||||
AFTER_CREATE = 'after_create'
|
||||
AFTER_READ = 'after_read'
|
||||
AFTER_UPDATE = 'after_update'
|
||||
AFTER_DELETE = 'after_delete'
|
||||
|
||||
# String literals representing events associated to API operations
|
||||
BEFORE_RESPONSE = 'before_response'
|
||||
AFTER_REQUEST = 'after_request'
|
||||
|
||||
# String literals representing events associated to process operations
|
||||
BEFORE_INIT = 'before_init'
|
||||
BEFORE_SPAWN = 'before_spawn' # sent per process
|
||||
AFTER_INIT = 'after_init' # sent per worker
|
||||
|
||||
# String literals representing events associated to error conditions
|
||||
ABORT_CREATE = 'abort_create'
|
||||
ABORT_READ = 'abort_read'
|
||||
ABORT_UPDATE = 'abort_update'
|
||||
ABORT_DELETE = 'abort_delete'
|
||||
|
||||
ABORT = 'abort_'
|
||||
BEFORE = 'before_'
|
||||
PRECOMMIT = 'precommit_'
|
||||
|
||||
|
||||
class EventPayload(object):
|
||||
"""Base event payload object.
|
||||
|
||||
This class is intended to be the super class for all event payloads. As
|
||||
such, it defines common attributes many events are likely to use in their
|
||||
payload. Note that event attributes are passed by reference; no copying
|
||||
of states, metadata or request_body is performed and thus consumers should
|
||||
not modify payload references.
|
||||
|
||||
For more information, see the callbacks dev-ref documentation for this
|
||||
project.
|
||||
"""
|
||||
|
||||
def __init__(self, context, metadata=None, request_body=None,
|
||||
states=None, resource_id=None):
|
||||
# the event context
|
||||
self.context = context
|
||||
|
||||
# NOTE(boden): longer term we should consider removing metadata
|
||||
# optional 'unstructured' (key,value) pairs for special needs
|
||||
self.metadata = metadata if metadata else {}
|
||||
|
||||
# the request body associated to the resource
|
||||
self.request_body = request_body
|
||||
|
||||
# an iterable of states for the resource from the newest to the oldest
|
||||
# for example db states or api request/response
|
||||
# the actual object type for states will vary depending on event caller
|
||||
self.states = states if states else []
|
||||
|
||||
# a unique ID for the event resource; may be None if the resource
|
||||
# isn't created yet
|
||||
self.resource_id = resource_id
|
||||
|
||||
@property
|
||||
def has_states(self):
|
||||
"""Determines if this event payload has any states.
|
||||
|
||||
:returns: True if this event payload has states, otherwise False.
|
||||
"""
|
||||
return len(self.states) > 0
|
||||
|
||||
@property
|
||||
def latest_state(self):
|
||||
"""Returns the latest state for the event payload.
|
||||
|
||||
:returns: The last state of this event payload if has_state else None.
|
||||
"""
|
||||
return self.states[-1] if self.has_states else None
|
||||
|
||||
|
||||
class DBEventPayload(EventPayload):
|
||||
"""The payload for data store events payloads."""
|
||||
|
||||
def __init__(self, context, metadata=None, request_body=None,
|
||||
states=None, resource_id=None, desired_state=None):
|
||||
|
||||
super(DBEventPayload, self).__init__(
|
||||
context, metadata=metadata, request_body=request_body,
|
||||
states=states, resource_id=resource_id)
|
||||
|
||||
# the model object to be persisted in pre create/commit payloads
|
||||
self.desired_state = desired_state
|
||||
|
||||
@property
|
||||
def is_persisted(self):
|
||||
"""Determine if the resource for this event payload is persisted.
|
||||
|
||||
:returns: True if this payload's resource is persisted, otherwise
|
||||
False.
|
||||
"""
|
||||
return self.resource_id is not None and self.has_states
|
||||
|
||||
@property
|
||||
def is_to_be_committed(self):
|
||||
""""Determine if the event payload resource is to be committed.
|
||||
|
||||
:returns: True if the desired state has been populated, else False.
|
||||
"""
|
||||
return self.desired_state is not None
|
||||
|
||||
@property
|
||||
def latest_state(self):
|
||||
"""Returns the latest state for the event payload resource.
|
||||
|
||||
:returns: If this payload has a desired_state its returned, otherwise
|
||||
latest_state is returned.
|
||||
"""
|
||||
return (self.desired_state or
|
||||
super(DBEventPayload, self).latest_state)
|
||||
|
||||
|
||||
class APIEventPayload(EventPayload):
|
||||
"""The payload for API events."""
|
||||
|
||||
def __init__(self, context, method_name, action,
|
||||
metadata=None, request_body=None, states=None,
|
||||
resource_id=None, collection_name=None):
|
||||
|
||||
super(APIEventPayload, self).__init__(
|
||||
context, metadata=metadata, request_body=request_body,
|
||||
states=states, resource_id=resource_id)
|
||||
|
||||
self.method_name = method_name
|
||||
self.action = action
|
||||
self.collection_name = collection_name
|
@ -15,9 +15,9 @@ import collections
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import reflection
|
||||
|
||||
from neutron_lib._callbacks import events
|
||||
from neutron_lib._callbacks import exceptions
|
||||
from neutron_lib._i18n import _LE
|
||||
from neutron_lib.callbacks import events
|
||||
from neutron_lib.callbacks import exceptions
|
||||
from neutron_lib.db import utils as db_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -107,15 +107,43 @@ class CallbacksManager(object):
|
||||
del self._callbacks[resource][event][callback_id]
|
||||
del self._index[callback_id]
|
||||
|
||||
def publish(self, resource, event, trigger, payload=None):
|
||||
"""Notify all subscribed callback(s) with a payload.
|
||||
|
||||
Dispatch the resource's event to the subscribed callbacks.
|
||||
|
||||
:param resource: The resource for the event.
|
||||
:param event: The event.
|
||||
:param trigger: The trigger. A reference to the sender of the event.
|
||||
:param payload: The optional event object to send to subscribers. If
|
||||
passed this must be an instance of BaseEvent.
|
||||
:raises Invalid, CallbackFailure: The Invalid exception is raised if
|
||||
the payload object is not an instance of BaseEvent. CallbackFailure
|
||||
is raise if the underlying callback has errors.
|
||||
"""
|
||||
kwargs = {}
|
||||
if payload:
|
||||
if not isinstance(payload, events.EventPayload):
|
||||
raise exceptions.Invalid(element='event payload',
|
||||
value=type(payload))
|
||||
kwargs['payload'] = payload
|
||||
return self.notify(resource, event, trigger, **kwargs)
|
||||
|
||||
# NOTE(boden): We plan to deprecate the usage of this method and **kwargs
|
||||
# as the payload in Queens, but no warning here to avoid log flooding
|
||||
@db_utils.reraise_as_retryrequest
|
||||
def notify(self, resource, event, trigger, **kwargs):
|
||||
"""Notify all subscribed callback(s).
|
||||
|
||||
Dispatch the resource's event to the subscribed callbacks.
|
||||
|
||||
:param resource: the resource.
|
||||
:param event: the event.
|
||||
:param trigger: the trigger. A reference to the sender of the event.
|
||||
:param resource: The resource for the event.
|
||||
:param event: The event.
|
||||
:param trigger: The trigger. A reference to the sender of the event.
|
||||
:param kwargs: (deprecated) Unstructured key/value pairs to invoke
|
||||
the callback with. Using event objects with publish() is preferred.
|
||||
:raises CallbackFailure: CallbackFailure is raised if the underlying
|
||||
callback has errors.
|
||||
"""
|
||||
errors = self._notify_loop(resource, event, trigger, **kwargs)
|
||||
if errors:
|
@ -10,18 +10,18 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from neutron_lib._callbacks import manager
|
||||
from neutron_lib.callbacks import manager
|
||||
|
||||
|
||||
# TODO(armax): consider adding locking
|
||||
CALLBACK_MANAGER = None
|
||||
_CALLBACK_MANAGER = None
|
||||
|
||||
|
||||
def _get_callback_manager():
|
||||
global CALLBACK_MANAGER
|
||||
if CALLBACK_MANAGER is None:
|
||||
CALLBACK_MANAGER = manager.CallbacksManager()
|
||||
return CALLBACK_MANAGER
|
||||
global _CALLBACK_MANAGER
|
||||
if _CALLBACK_MANAGER is None:
|
||||
_CALLBACK_MANAGER = manager.CallbacksManager()
|
||||
return _CALLBACK_MANAGER
|
||||
|
||||
|
||||
def subscribe(callback, resource, event):
|
||||
@ -40,9 +40,15 @@ def unsubscribe_all(callback):
|
||||
_get_callback_manager().unsubscribe_all(callback)
|
||||
|
||||
|
||||
# NOTE(boden): This method is deprecated in favor of publish() and will be
|
||||
# removed in Queens, but not deprecation message to reduce log flooding
|
||||
def notify(resource, event, trigger, **kwargs):
|
||||
_get_callback_manager().notify(resource, event, trigger, **kwargs)
|
||||
|
||||
|
||||
def publish(resource, event, trigger, payload=None):
|
||||
_get_callback_manager().publish(resource, event, trigger, payload=payload)
|
||||
|
||||
|
||||
def clear():
|
||||
_get_callback_manager().clear()
|
@ -13,7 +13,7 @@
|
||||
# String literals representing core resources.
|
||||
AGENT = 'agent'
|
||||
EXTERNAL_NETWORK = 'external_network'
|
||||
FLOATING_IP = 'floating_ip'
|
||||
FLOATING_IP = 'floatingip'
|
||||
NETWORK = 'network'
|
||||
NETWORKS = 'networks'
|
||||
PORT = 'port'
|
@ -12,6 +12,8 @@
|
||||
|
||||
import fixtures
|
||||
|
||||
from neutron_lib.callbacks import manager
|
||||
from neutron_lib.callbacks import registry
|
||||
from neutron_lib.plugins import directory
|
||||
|
||||
|
||||
@ -29,3 +31,31 @@ class PluginDirectoryFixture(fixtures.Fixture):
|
||||
|
||||
def _restore(self):
|
||||
directory._PLUGIN_DIRECTORY = self._orig_directory
|
||||
|
||||
|
||||
class CallbackRegistryFixture(fixtures.Fixture):
|
||||
"""Callback registry fixture.
|
||||
|
||||
This class is intended to be used as a fixture within unit tests and
|
||||
therefore consumers must register it using useFixture() within their
|
||||
unit test class. The implementation optionally allows consumers to pass
|
||||
in the CallbacksManager manager to use for your tests.
|
||||
"""
|
||||
|
||||
def __init__(self, callback_manager=None):
|
||||
"""Creates a new RegistryFixture.
|
||||
|
||||
:param callback_manager: If specified, the return value to use for
|
||||
_get_callback_manager(). Otherwise a new instance of CallbacksManager
|
||||
is used.
|
||||
"""
|
||||
super(CallbackRegistryFixture, self).__init__()
|
||||
self.callback_manager = callback_manager or manager.CallbacksManager()
|
||||
|
||||
def _setUp(self):
|
||||
self._orig_manager = registry._CALLBACK_MANAGER
|
||||
registry._CALLBACK_MANAGER = self.callback_manager
|
||||
self.addCleanup(self._restore)
|
||||
|
||||
def _restore(self):
|
||||
registry._CALLBACK_MANAGER = self._orig_manager
|
||||
|
@ -19,7 +19,7 @@ Tests for `neutron_lib.callback.exceptions` module.
|
||||
|
||||
import functools
|
||||
|
||||
import neutron_lib._callbacks.exceptions as ex
|
||||
import neutron_lib.callbacks.exceptions as ex
|
||||
from neutron_lib.tests.unit import test_exceptions
|
||||
|
||||
|
||||
|
116
neutron_lib/tests/unit/callbacks/test_events.py
Normal file
116
neutron_lib/tests/unit/callbacks/test_events.py
Normal file
@ -0,0 +1,116 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from neutron_lib.callbacks import events
|
||||
from oslotest import base
|
||||
|
||||
|
||||
class EventPayloadTestCase(base.BaseTestCase):
|
||||
|
||||
def test_context(self):
|
||||
e = events.EventPayload(mock.ANY)
|
||||
self.assertEqual(mock.ANY, e.context)
|
||||
|
||||
def test_metadata(self):
|
||||
meta = {'k1': 'v1', 'k2': mock.ANY}
|
||||
e = events.EventPayload(mock.ANY, metadata=meta)
|
||||
self.assertEqual(meta, e.metadata)
|
||||
event_meta = e.metadata
|
||||
event_meta['k3'] = 'v3'
|
||||
self.assertTrue('k3' in e.metadata)
|
||||
|
||||
def test_request_body(self):
|
||||
e = events.EventPayload(mock.ANY, request_body={'k', 'v'})
|
||||
self.assertEqual({'k', 'v'}, e.request_body)
|
||||
|
||||
def test_states(self):
|
||||
e = events.EventPayload(mock.ANY, states=['s1', 's2'])
|
||||
self.assertEqual(['s1', 's2'], e.states)
|
||||
e.states.append('state')
|
||||
self.assertTrue('state' in e.states)
|
||||
|
||||
def test_resource_id(self):
|
||||
e = events.EventPayload(mock.ANY, resource_id='id1')
|
||||
self.assertEqual('id1', e.resource_id)
|
||||
|
||||
def test_has_no_states(self):
|
||||
e = events.EventPayload(mock.ANY)
|
||||
self.assertFalse(e.has_states)
|
||||
|
||||
def test_has_states(self):
|
||||
e = events.EventPayload(mock.ANY, states=['s1'])
|
||||
self.assertTrue(e.has_states)
|
||||
|
||||
def test_latest_state_with_states(self):
|
||||
body = object()
|
||||
states = [object(), object()]
|
||||
e = events.EventPayload(mock.ANY, request_body=body, states=states)
|
||||
self.assertEqual(states[-1], e.latest_state)
|
||||
|
||||
def test_latest_state_without_states(self):
|
||||
body = object()
|
||||
e = events.EventPayload(mock.ANY, request_body=body)
|
||||
self.assertIsNone(e.latest_state)
|
||||
|
||||
|
||||
class DataStoreEventPayloadTestCase(base.BaseTestCase):
|
||||
|
||||
def test_states(self):
|
||||
e = events.DBEventPayload(mock.ANY, states=['s1'])
|
||||
self.assertEqual(['s1'], e.states)
|
||||
|
||||
def test_desired_state(self):
|
||||
desired_state = {'k': object()}
|
||||
e = events.DBEventPayload(mock.ANY, desired_state=desired_state)
|
||||
self.assertEqual(desired_state, e.desired_state)
|
||||
desired_state['a'] = 'A'
|
||||
self.assertEqual(desired_state, e.desired_state)
|
||||
|
||||
def test_is_not_persisted(self):
|
||||
e = events.DBEventPayload(mock.ANY, states=['s1'])
|
||||
self.assertFalse(e.is_persisted)
|
||||
e = events.DBEventPayload(mock.ANY, resource_id='1a')
|
||||
self.assertFalse(e.is_persisted)
|
||||
|
||||
def test_is_persisted(self):
|
||||
e = events.DBEventPayload(mock.ANY, states=['s1'],
|
||||
resource_id='1a')
|
||||
self.assertTrue(e.is_persisted)
|
||||
|
||||
def test_is_not_to_be_committed(self):
|
||||
e = events.DBEventPayload(mock.ANY, states=['s1'],
|
||||
resource_id='1a')
|
||||
self.assertFalse(e.is_to_be_committed)
|
||||
|
||||
def test_is_to_be_committed(self):
|
||||
e = events.DBEventPayload(mock.ANY, states=[mock.ANY],
|
||||
resource_id='1a', desired_state=object())
|
||||
self.assertTrue(e.is_to_be_committed)
|
||||
|
||||
def test_latest_state_with_desired_state(self):
|
||||
desired_state = object()
|
||||
e = events.DBEventPayload(mock.ANY, states=[object()],
|
||||
desired_state=desired_state)
|
||||
self.assertEqual(desired_state, e.latest_state)
|
||||
|
||||
|
||||
class APIEventPayloadTestCase(base.BaseTestCase):
|
||||
|
||||
def test_action(self):
|
||||
e = events.APIEventPayload(mock.ANY, 'post.end', 'POST')
|
||||
self.assertEqual('POST', e.action)
|
||||
|
||||
def test_method_name(self):
|
||||
e = events.APIEventPayload(mock.ANY, 'post.end', 'POST')
|
||||
self.assertEqual('post.end', e.method_name)
|
@ -17,10 +17,10 @@ import mock
|
||||
from oslo_db import exception as db_exc
|
||||
from oslotest import base
|
||||
|
||||
from neutron_lib._callbacks import events
|
||||
from neutron_lib._callbacks import exceptions
|
||||
from neutron_lib._callbacks import manager
|
||||
from neutron_lib._callbacks import resources
|
||||
from neutron_lib.callbacks import events
|
||||
from neutron_lib.callbacks import exceptions
|
||||
from neutron_lib.callbacks import manager
|
||||
from neutron_lib.callbacks import resources
|
||||
|
||||
|
||||
class ObjectWithCallback(object):
|
||||
@ -54,6 +54,10 @@ def callback_raise_retriable(*args, **kwargs):
|
||||
raise db_exc.DBDeadlock()
|
||||
|
||||
|
||||
def callback_3(resource, event, trigger, payload):
|
||||
callback_3.counter += 1
|
||||
|
||||
|
||||
class CallBacksManagerTestCase(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
@ -61,6 +65,7 @@ class CallBacksManagerTestCase(base.BaseTestCase):
|
||||
self.manager = manager.CallbacksManager()
|
||||
callback_1.counter = 0
|
||||
callback_2.counter = 0
|
||||
callback_3.counter = 0
|
||||
|
||||
def test_subscribe(self):
|
||||
self.manager.subscribe(
|
||||
@ -261,7 +266,7 @@ class CallBacksManagerTestCase(base.BaseTestCase):
|
||||
self.assertEqual(0, len(self.manager._callbacks))
|
||||
self.assertEqual(0, len(self.manager._index))
|
||||
|
||||
@mock.patch("neutron_lib._callbacks.manager.LOG")
|
||||
@mock.patch("neutron_lib.callbacks.manager.LOG")
|
||||
def test__notify_loop_skip_log_errors(self, _logger):
|
||||
self.manager.subscribe(
|
||||
callback_raise, resources.PORT, events.BEFORE_CREATE)
|
||||
@ -289,3 +294,29 @@ class CallBacksManagerTestCase(base.BaseTestCase):
|
||||
self.assertEqual(1, a.counter)
|
||||
self.assertEqual(1, b.counter)
|
||||
self.assertEqual(1, c.counter)
|
||||
|
||||
def test_publish_invalid_payload(self):
|
||||
self.assertRaises(exceptions.Invalid, self.manager.publish,
|
||||
resources.PORT, events.AFTER_DELETE, self,
|
||||
payload=object())
|
||||
|
||||
def test_publish_empty_payload(self):
|
||||
notify_payload = []
|
||||
|
||||
def _memo(resource, event, trigger, payload=None):
|
||||
notify_payload.append(payload)
|
||||
|
||||
self.manager.subscribe(_memo, 'x', 'y')
|
||||
self.manager.publish('x', 'y', self)
|
||||
self.assertIsNone(notify_payload[0])
|
||||
|
||||
def test_publish_payload(self):
|
||||
notify_payload = []
|
||||
|
||||
def _memo(resource, event, trigger, payload=None):
|
||||
notify_payload.append(payload)
|
||||
|
||||
self.manager.subscribe(_memo, 'x', 'y')
|
||||
payload = events.EventPayload(object())
|
||||
self.manager.publish('x', 'y', self, payload=payload)
|
||||
self.assertEqual(payload, notify_payload[0])
|
||||
|
@ -16,7 +16,9 @@ import mock
|
||||
|
||||
from oslotest import base
|
||||
|
||||
from neutron_lib._callbacks import registry
|
||||
from neutron_lib.callbacks import events
|
||||
from neutron_lib.callbacks import registry
|
||||
from neutron_lib import fixture
|
||||
|
||||
|
||||
def my_callback():
|
||||
@ -27,40 +29,42 @@ class TestCallbackRegistryDispatching(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestCallbackRegistryDispatching, self).setUp()
|
||||
registry.CALLBACK_MANAGER = mock.Mock()
|
||||
self.callback_manager = mock.Mock()
|
||||
self.registry_fixture = fixture.CallbackRegistryFixture(
|
||||
callback_manager=self.callback_manager)
|
||||
self.useFixture(self.registry_fixture)
|
||||
|
||||
def test_subscribe(self):
|
||||
registry.subscribe(my_callback, 'my-resource', 'my-event')
|
||||
registry.CALLBACK_MANAGER.subscribe.assert_called_with(
|
||||
self.callback_manager.subscribe.assert_called_with(
|
||||
my_callback, 'my-resource', 'my-event')
|
||||
|
||||
def test_unsubscribe(self):
|
||||
registry.unsubscribe(my_callback, 'my-resource', 'my-event')
|
||||
registry.CALLBACK_MANAGER.unsubscribe.assert_called_with(
|
||||
self.callback_manager.unsubscribe.assert_called_with(
|
||||
my_callback, 'my-resource', 'my-event')
|
||||
|
||||
def test_unsubscribe_by_resource(self):
|
||||
registry.unsubscribe_by_resource(my_callback, 'my-resource')
|
||||
registry.CALLBACK_MANAGER.unsubscribe_by_resource.assert_called_with(
|
||||
self.callback_manager.unsubscribe_by_resource.assert_called_with(
|
||||
my_callback, 'my-resource')
|
||||
|
||||
def test_unsubscribe_all(self):
|
||||
registry.unsubscribe_all(my_callback)
|
||||
registry.CALLBACK_MANAGER.unsubscribe_all.assert_called_with(
|
||||
self.callback_manager.unsubscribe_all.assert_called_with(
|
||||
my_callback)
|
||||
|
||||
def test_notify(self):
|
||||
registry.notify('my-resource', 'my-event', mock.ANY)
|
||||
registry.CALLBACK_MANAGER.notify.assert_called_with(
|
||||
self.callback_manager.notify.assert_called_with(
|
||||
'my-resource', 'my-event', mock.ANY)
|
||||
|
||||
def test_clear(self):
|
||||
registry.clear()
|
||||
registry.CALLBACK_MANAGER.clear.assert_called_with()
|
||||
self.callback_manager.clear.assert_called_with()
|
||||
|
||||
def test_get_callback_manager(self):
|
||||
with mock.patch.object(registry.manager,
|
||||
'CallbacksManager') as mock_mgr:
|
||||
registry.CALLBACK_MANAGER = None
|
||||
registry._get_callback_manager()
|
||||
mock_mgr.assert_called_once_with()
|
||||
def test_publish_payload(self):
|
||||
event_payload = events.EventPayload(mock.ANY)
|
||||
registry.publish('x', 'y', self, payload=event_payload)
|
||||
self.callback_manager.publish.assert_called_with(
|
||||
'x', 'y', self, payload=event_payload)
|
||||
|
@ -14,6 +14,7 @@ import mock
|
||||
|
||||
from oslotest import base
|
||||
|
||||
from neutron_lib.callbacks import registry
|
||||
from neutron_lib import fixture
|
||||
from neutron_lib.plugins import directory
|
||||
|
||||
@ -29,3 +30,16 @@ class PluginDirectoryFixtureTestCase(base.BaseTestCase):
|
||||
def test_fixture(self):
|
||||
directory.add_plugin('foo', 'foo')
|
||||
self.assertTrue(self.directory.add_plugin.called)
|
||||
|
||||
|
||||
class CallbackRegistryFixtureTestCase(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(CallbackRegistryFixtureTestCase, self).setUp()
|
||||
self.manager = mock.Mock()
|
||||
self.useFixture(fixture.CallbackRegistryFixture(
|
||||
callback_manager=self.manager))
|
||||
|
||||
def test_fixture(self):
|
||||
registry.notify('a', 'b', self)
|
||||
self.assertTrue(self.manager.notify.called)
|
||||
|
@ -0,0 +1,14 @@
|
||||
---
|
||||
features:
|
||||
- Neutron's callback API found in ``neutron.callbacks.*`` is now exposed
|
||||
in ``neutron_lib.callbacks.*``. In addition, a set of event payload objects
|
||||
are now available for use in transporting event payload data in a
|
||||
standardized way.
|
||||
- A test fixture is provided for isolating the global callback manager in
|
||||
``neutron_lib.callbacks.registry``. For more details see the comments in
|
||||
``neutron_lib.tests.unti.callbacks.base``.
|
||||
deprecations:
|
||||
- The use of ``neutron_lib.callbacks.registry.notify()`` and
|
||||
``neutron_lib.callbacks.manager.CallbacksManager.notify()`` is deprecated in
|
||||
favor of their ``publish()`` counterparts and will be removed in
|
||||
the "Queens" release time-frame.
|
Loading…
Reference in New Issue
Block a user