Add notifications for trunk CRUD and standardize payload

Registry notifications should be added for trunk create/update/delete,
and SDN controllers as subscribers will need more callbacks as PRECOMMIT
events.

A standardized notification payload will consist of the following:
- context
- trunk_id
- current_trunk
- original_trunk
- subports

This standardized payload will be used in all trunk-related
notifications (trunk create/update/delete and subport add/remove) and
represents the maximum possible amount of information that can be given
in order to future-proof them.

As a bug fix, AFTER_REMOVE event on subport shouldn't be called
during the db transaction, but after the db commit.

Partially-implements: blueprint vlan-aware-vms
Change-Id: Ic1f44fa53cf9f10bd029ea47824e8ba91a8ab485
Co-Authored-By: Isaku Yamahata <isaku.yamahata@intel.com>
This commit is contained in:
Rawlin Peters 2016-07-14 15:04:24 -07:00
parent f7ec19ad01
commit 6877b47105
3 changed files with 202 additions and 19 deletions

View File

@ -0,0 +1,33 @@
# (c) Copyright 2016 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class TrunkPayload(object):
"""Payload for trunk-related callback registry notifications."""
def __init__(self, context, trunk_id, current_trunk=None,
original_trunk=None, subports=None):
self.context = context
self.trunk_id = trunk_id
self.current_trunk = current_trunk
self.original_trunk = original_trunk
self.subports = subports if subports else []
def __eq__(self, other):
return (isinstance(other, self.__class__) and
self.__dict__ == other.__dict__)
def __ne__(self, other):
return not self.__eq__(other)

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import copy
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import uuidutils from oslo_utils import uuidutils
@ -26,6 +28,7 @@ from neutron.db import db_base_plugin_v2
from neutron.objects import base as objects_base from neutron.objects import base as objects_base
from neutron.objects import trunk as trunk_objects from neutron.objects import trunk as trunk_objects
from neutron.services import service_base from neutron.services import service_base
from neutron.services.trunk import callbacks
from neutron.services.trunk import constants from neutron.services.trunk import constants
from neutron.services.trunk import exceptions as trunk_exc from neutron.services.trunk import exceptions as trunk_exc
from neutron.services.trunk import rules from neutron.services.trunk import rules
@ -122,7 +125,15 @@ class TrunkPlugin(service_base.ServicePluginBase,
tenant_id=trunk['tenant_id'], tenant_id=trunk['tenant_id'],
port_id=trunk['port_id'], port_id=trunk['port_id'],
sub_ports=sub_ports) sub_ports=sub_ports)
trunk_obj.create() with db_api.autonested_transaction(context.session):
trunk_obj.create()
payload = callbacks.TrunkPayload(context, trunk_obj.id,
current_trunk=trunk_obj)
registry.notify(
constants.TRUNK, events.PRECOMMIT_CREATE, self,
payload=payload)
registry.notify(
constants.TRUNK, events.AFTER_CREATE, self, payload=payload)
return trunk_obj return trunk_obj
@db_base_plugin_common.convert_result_to_dict @db_base_plugin_common.convert_result_to_dict
@ -131,9 +142,17 @@ class TrunkPlugin(service_base.ServicePluginBase,
trunk_data = trunk['trunk'] trunk_data = trunk['trunk']
with db_api.autonested_transaction(context.session): with db_api.autonested_transaction(context.session):
trunk_obj = self._get_trunk(context, trunk_id) trunk_obj = self._get_trunk(context, trunk_id)
original_trunk = copy.deepcopy(trunk_obj)
trunk_obj.update_fields(trunk_data, reset_changes=True) trunk_obj.update_fields(trunk_data, reset_changes=True)
trunk_obj.update() trunk_obj.update()
return trunk_obj payload = callbacks.TrunkPayload(context, trunk_id,
original_trunk=original_trunk,
current_trunk=trunk_obj)
registry.notify(constants.TRUNK, events.PRECOMMIT_UPDATE, self,
payload=payload)
registry.notify(constants.TRUNK, events.AFTER_UPDATE, self,
payload=payload)
return trunk_obj
def delete_trunk(self, context, trunk_id): def delete_trunk(self, context, trunk_id):
"""Delete the specified trunk.""" """Delete the specified trunk."""
@ -143,8 +162,14 @@ class TrunkPlugin(service_base.ServicePluginBase,
trunk_port_validator = rules.TrunkPortValidator(trunk.port_id) trunk_port_validator = rules.TrunkPortValidator(trunk.port_id)
if not trunk_port_validator.is_bound(context): if not trunk_port_validator.is_bound(context):
trunk.delete() trunk.delete()
payload = callbacks.TrunkPayload(context, trunk_id,
original_trunk=trunk)
registry.notify(constants.TRUNK, events.PRECOMMIT_DELETE, self,
payload=payload)
else: else:
raise trunk_exc.TrunkInUse(trunk_id=trunk_id) raise trunk_exc.TrunkInUse(trunk_id=trunk_id)
registry.notify(constants.TRUNK, events.AFTER_DELETE, self,
payload=payload)
@db_base_plugin_common.convert_result_to_dict @db_base_plugin_common.convert_result_to_dict
def add_subports(self, context, trunk_id, subports): def add_subports(self, context, trunk_id, subports):
@ -159,6 +184,7 @@ class TrunkPlugin(service_base.ServicePluginBase,
with db_api.autonested_transaction(context.session): with db_api.autonested_transaction(context.session):
trunk = self._get_trunk(context, trunk_id) trunk = self._get_trunk(context, trunk_id)
original_trunk = copy.deepcopy(trunk)
rules.trunk_can_be_managed(context, trunk) rules.trunk_can_be_managed(context, trunk)
for subport in subports: for subport in subports:
obj = trunk_objects.SubPort( obj = trunk_objects.SubPort(
@ -170,10 +196,16 @@ class TrunkPlugin(service_base.ServicePluginBase,
obj.create() obj.create()
trunk['sub_ports'].append(obj) trunk['sub_ports'].append(obj)
added_subports.append(obj) added_subports.append(obj)
payload = callbacks.TrunkPayload(context, trunk_id,
registry.notify( current_trunk=trunk,
constants.SUBPORTS, events.AFTER_CREATE, self, original_trunk=original_trunk,
added_subports=added_subports) subports=added_subports)
if added_subports:
registry.notify(constants.SUBPORTS, events.PRECOMMIT_CREATE,
self, payload=payload)
if added_subports:
registry.notify(
constants.SUBPORTS, events.AFTER_CREATE, self, payload=payload)
return trunk return trunk
@db_base_plugin_common.convert_result_to_dict @db_base_plugin_common.convert_result_to_dict
@ -182,6 +214,7 @@ class TrunkPlugin(service_base.ServicePluginBase,
subports = subports['sub_ports'] subports = subports['sub_ports']
with db_api.autonested_transaction(context.session): with db_api.autonested_transaction(context.session):
trunk = self._get_trunk(context, trunk_id) trunk = self._get_trunk(context, trunk_id)
original_trunk = copy.deepcopy(trunk)
rules.trunk_can_be_managed(context, trunk) rules.trunk_can_be_managed(context, trunk)
subports_validator = rules.SubPortsValidator( subports_validator = rules.SubPortsValidator(
@ -205,11 +238,19 @@ class TrunkPlugin(service_base.ServicePluginBase,
subport_obj.delete() subport_obj.delete()
removed_subports.append(subport_obj) removed_subports.append(subport_obj)
trunk.sub_ports = list(current_subports.values()) del trunk.sub_ports[:]
trunk.sub_ports.extend(current_subports.values())
payload = callbacks.TrunkPayload(context, trunk_id,
current_trunk=trunk,
original_trunk=original_trunk,
subports=removed_subports)
if removed_subports:
registry.notify(constants.SUBPORTS, events.PRECOMMIT_DELETE,
self, payload=payload)
if removed_subports:
registry.notify( registry.notify(
constants.SUBPORTS, events.AFTER_DELETE, self, constants.SUBPORTS, events.AFTER_DELETE, self, payload=payload)
removed_subports=removed_subports) return trunk
return trunk
@db_base_plugin_common.filter_fields @db_base_plugin_common.filter_fields
def get_subports(self, context, trunk_id, fields=None): def get_subports(self, context, trunk_id, fields=None):

View File

@ -19,6 +19,7 @@ from neutron.callbacks import events
from neutron.callbacks import registry from neutron.callbacks import registry
from neutron import manager from neutron import manager
from neutron.objects import trunk as trunk_objects from neutron.objects import trunk as trunk_objects
from neutron.services.trunk import callbacks
from neutron.services.trunk import constants from neutron.services.trunk import constants
from neutron.services.trunk import exceptions as trunk_exc from neutron.services.trunk import exceptions as trunk_exc
from neutron.services.trunk import plugin as trunk_plugin from neutron.services.trunk import plugin as trunk_plugin
@ -53,6 +54,9 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
self.trunk_plugin.create_trunk(self.context, {'trunk': trunk})) self.trunk_plugin.create_trunk(self.context, {'trunk': trunk}))
return response return response
def _get_trunk_obj(self, trunk_id):
return trunk_objects.Trunk.get_object(self.context, id=trunk_id)
def _get_subport_obj(self, port_id): def _get_subport_obj(self, port_id):
subports = trunk_objects.SubPort.get_objects( subports = trunk_objects.SubPort.get_objects(
self.context, port_id=port_id) self.context, port_id=port_id)
@ -88,24 +92,129 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
self.trunk_plugin.delete_trunk, self.trunk_plugin.delete_trunk,
self.context, trunk['id']) self.context, trunk['id'])
def _test_subports_action_notify(self, event, payload_key): def _test_trunk_create_notify(self, event):
with self.port() as parent_port:
callback = register_mock_callback(constants.TRUNK, event)
trunk = self._create_test_trunk(parent_port)
trunk_obj = self._get_trunk_obj(trunk['id'])
payload = callbacks.TrunkPayload(self.context, trunk['id'],
current_trunk=trunk_obj)
callback.assert_called_once_with(
constants.TRUNK, event, self.trunk_plugin, payload=payload)
def test_create_trunk_notify_after_create(self):
self._test_trunk_create_notify(events.AFTER_CREATE)
def test_create_trunk_notify_precommit_create(self):
self._test_trunk_create_notify(events.PRECOMMIT_CREATE)
def _test_trunk_update_notify(self, event):
with self.port() as parent_port:
callback = register_mock_callback(constants.TRUNK, event)
trunk = self._create_test_trunk(parent_port)
orig_trunk_obj = self._get_trunk_obj(trunk['id'])
trunk_req = {'trunk': {'name': 'foo'}}
self.trunk_plugin.update_trunk(self.context, trunk['id'],
trunk_req)
trunk_obj = self._get_trunk_obj(trunk['id'])
payload = callbacks.TrunkPayload(self.context, trunk['id'],
original_trunk=orig_trunk_obj,
current_trunk=trunk_obj)
callback.assert_called_once_with(
constants.TRUNK, event, self.trunk_plugin, payload=payload)
def test_trunk_update_notify_after_update(self):
self._test_trunk_update_notify(events.AFTER_UPDATE)
def test_trunk_update_notify_precommit_update(self):
self._test_trunk_update_notify(events.PRECOMMIT_UPDATE)
def _test_trunk_delete_notify(self, event):
with self.port() as parent_port:
callback = register_mock_callback(constants.TRUNK, event)
trunk = self._create_test_trunk(parent_port)
trunk_obj = self._get_trunk_obj(trunk['id'])
self.trunk_plugin.delete_trunk(self.context, trunk['id'])
payload = callbacks.TrunkPayload(self.context, trunk['id'],
original_trunk=trunk_obj)
callback.assert_called_once_with(
constants.TRUNK, event, self.trunk_plugin, payload=payload)
def test_delete_trunk_notify_after_delete(self):
self._test_trunk_delete_notify(events.AFTER_DELETE)
def test_delete_trunk_notify_precommit_delete(self):
self._test_trunk_delete_notify(events.PRECOMMIT_DELETE)
def _test_subport_action_empty_list_no_notify(self, event, subport_method):
with self.port() as parent_port:
trunk = self._create_test_trunk(parent_port)
callback = register_mock_callback(constants.SUBPORTS, event)
subport_method(self.context, trunk['id'], {'sub_ports': []})
callback.assert_not_called()
def _test_add_subports_no_notification(self, event):
self._test_subport_action_empty_list_no_notify(
event, self.trunk_plugin.add_subports)
def test_add_subports_notify_after_create_empty_list(self):
self._test_add_subports_no_notification(events.AFTER_CREATE)
def test_add_subports_notify_precommit_create_empty_list(self):
self._test_add_subports_no_notification(events.PRECOMMIT_CREATE)
def _test_remove_subports_no_notification(self, event):
self._test_subport_action_empty_list_no_notify(
event, self.trunk_plugin.remove_subports)
def test_remove_subports_notify_after_delete_empty_list(self):
self._test_remove_subports_no_notification(events.AFTER_DELETE)
def test_remove_subports_notify_precommit_delete_empty_list(self):
self._test_remove_subports_no_notification(events.PRECOMMIT_DELETE)
def _test_add_subports_notify(self, event):
with self.port() as parent_port, self.port() as child_port: with self.port() as parent_port, self.port() as child_port:
trunk = self._create_test_trunk(parent_port) trunk = self._create_test_trunk(parent_port)
orig_trunk_obj = self._get_trunk_obj(trunk['id'])
subport = create_subport_dict(child_port['port']['id']) subport = create_subport_dict(child_port['port']['id'])
callback = register_mock_callback(constants.SUBPORTS, event) callback = register_mock_callback(constants.SUBPORTS, event)
self.trunk_plugin.add_subports( self.trunk_plugin.add_subports(
self.context, trunk['id'], {'sub_ports': [subport]}) self.context, trunk['id'], {'sub_ports': [subport]})
trunk_obj = self._get_trunk_obj(trunk['id'])
subport_obj = self._get_subport_obj(subport['port_id'])
payload = callbacks.TrunkPayload(self.context, trunk['id'],
current_trunk=trunk_obj,
original_trunk=orig_trunk_obj,
subports=[subport_obj])
callback.assert_called_once_with(
constants.SUBPORTS, event, self.trunk_plugin, payload=payload)
def test_add_subports_notify_after_create(self):
self._test_add_subports_notify(events.AFTER_CREATE)
def test_add_subports_notify_precommit_create(self):
self._test_add_subports_notify(events.PRECOMMIT_CREATE)
def _test_remove_subports_notify(self, event):
with self.port() as parent_port, self.port() as child_port:
subport = create_subport_dict(child_port['port']['id'])
trunk = self._create_test_trunk(parent_port, [subport])
orig_trunk_obj = self._get_trunk_obj(trunk['id'])
callback = register_mock_callback(constants.SUBPORTS, event)
subport_obj = self._get_subport_obj(subport['port_id']) subport_obj = self._get_subport_obj(subport['port_id'])
self.trunk_plugin.remove_subports( self.trunk_plugin.remove_subports(
self.context, trunk['id'], {'sub_ports': [subport]}) self.context, trunk['id'], {'sub_ports': [subport]})
payload = {payload_key: [subport_obj]} trunk_obj = self._get_trunk_obj(trunk['id'])
payload = callbacks.TrunkPayload(self.context, trunk['id'],
current_trunk=trunk_obj,
original_trunk=orig_trunk_obj,
subports=[subport_obj])
callback.assert_called_once_with( callback.assert_called_once_with(
constants.SUBPORTS, event, self.trunk_plugin, **payload) constants.SUBPORTS, event, self.trunk_plugin, payload=payload)
def test_add_subports_notify(self): def test_remove_subports_notify_after_delete(self):
self._test_subports_action_notify(events.AFTER_CREATE, self._test_remove_subports_notify(events.AFTER_DELETE)
'added_subports')
def test_remove_subports_notify(self): def test_remove_subports_notify_precommit_delete(self):
self._test_subports_action_notify(events.AFTER_DELETE, self._test_remove_subports_notify(events.PRECOMMIT_DELETE)
'removed_subports')