Merge "Use payloads for TRUNK and SUBPORTS callbacks"

This commit is contained in:
Zuul 2021-05-14 18:02:07 +00:00 committed by Gerrit Code Review
commit 0f3e04b00f
10 changed files with 171 additions and 147 deletions

View File

@ -1,34 +0,0 @@
# (c) Copyright 2016 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO(boden): remove this once moved over to neutron-lib payloads
class TrunkPayload(object):
"""Payload for trunk-related callback registry notifications."""
def __init__(self, context, trunk_id, current_trunk=None,
original_trunk=None, subports=None):
self.context = context
self.trunk_id = trunk_id
self.current_trunk = current_trunk
self.original_trunk = original_trunk
self.subports = subports if subports else []
def __eq__(self, other):
return (isinstance(other, self.__class__) and
self.__dict__ == other.__dict__)
def __ne__(self, other):
return not self.__eq__(other)

View File

@ -76,7 +76,7 @@ class DriverBase(object):
trunk plugin so that they can integrate without an explicit
register() method invocation.
:param resource: neutron.services.trunk.constants.TRUNK_PLUGIN
:param resource: neutron_lib.callbacks.resources.TRUNK_PLUGIN
:param event: neutron_lib.callbacks.events.AFTER_INIT
:param trigger: neutron.service.trunks.plugin.TrunkPlugin
"""

View File

@ -75,7 +75,8 @@ class OVSTrunkSkeleton(agent.TrunkSkeleton):
{'event': event_type, 'subports': subports, 'err': e})
@local_registry.receives(resources.TRUNK, [local_events.BEFORE_CREATE])
def check_trunk_dependencies(self, resource, event, trigger, **kwargs):
def check_trunk_dependencies(
self, resource, event, trigger, payload=None):
# The OVS trunk driver does not work with iptables firewall and QoS.
# We should validate the environment configuration and signal that
# something might be wrong.
@ -87,7 +88,7 @@ class OVSTrunkSkeleton(agent.TrunkSkeleton):
LOG.warning(
"Firewall driver iptables_hybrid is not compatible with "
"trunk ports. Trunk %(trunk_id)s may be insecure.",
{'trunk_id': kwargs['trunk'].id})
{'trunk_id': payload.resource_id})
def init_handler(resource, event, trigger, payload=None):

View File

@ -382,9 +382,10 @@ class OVSDBHandler(object):
return
try:
registry.notify(
registry.publish(
resources.TRUNK, events.BEFORE_CREATE, self,
context=ctx, trunk=trunk)
payload=events.DBEventPayload(ctx, resource_id=trunk.id,
desired_state=trunk))
self.trunk_manager.create_trunk(
trunk.id, trunk.port_id,
port['external_ids'].get('attached-mac'))

View File

@ -148,17 +148,17 @@ class OVNTrunkHandler(object):
def trunk_event(self, resource, event, trunk_plugin, payload):
if event == events.AFTER_CREATE:
self.trunk_created(payload.current_trunk)
self.trunk_created(payload.states[0])
elif event == events.AFTER_DELETE:
self.trunk_deleted(payload.original_trunk)
self.trunk_deleted(payload.states[0])
def subport_event(self, resource, event, trunk_plugin, payload):
if event == events.AFTER_CREATE:
self.subports_added(payload.original_trunk,
payload.subports)
self.subports_added(payload.states[0],
payload.metadata['subports'])
elif event == events.AFTER_DELETE:
self.subports_deleted(payload.original_trunk,
payload.subports)
self.subports_deleted(payload.states[0],
payload.metadata['subports'])
class OVNTrunkDriver(trunk_base.DriverBase):

View File

@ -34,7 +34,6 @@ from oslo_utils import uuidutils
from neutron.db import db_base_plugin_common
from neutron.objects import base as objects_base
from neutron.objects import trunk as trunk_objects
from neutron.services.trunk import callbacks
from neutron.services.trunk import drivers
from neutron.services.trunk import exceptions as trunk_exc
from neutron.services.trunk import rules
@ -252,12 +251,14 @@ class TrunkPlugin(service_base.ServicePluginBase):
sub_ports=sub_ports)
with db_api.CONTEXT_WRITER.using(context):
trunk_obj.create()
payload = callbacks.TrunkPayload(context, trunk_obj.id,
current_trunk=trunk_obj)
registry.notify(
payload = events.DBEventPayload(
context, resource_id=trunk_obj.id, desired_state=trunk_obj)
registry.publish(
resources.TRUNK, events.PRECOMMIT_CREATE, self,
payload=payload)
registry.notify(
payload = events.DBEventPayload(
context, resource_id=trunk_obj.id, states=(trunk_obj,))
registry.publish(
resources.TRUNK, events.AFTER_CREATE, self, payload=payload)
return trunk_obj
@ -279,11 +280,11 @@ class TrunkPlugin(service_base.ServicePluginBase):
desired_state=trunk_obj, request_body=trunk_data)
registry.publish(resources.TRUNK, events.PRECOMMIT_UPDATE, self,
payload=payload)
registry.notify(resources.TRUNK, events.AFTER_UPDATE, self,
payload=callbacks.TrunkPayload(
context, trunk_id,
original_trunk=original_trunk,
current_trunk=trunk_obj))
payload = events.DBEventPayload(
context, resource_id=trunk_id, states=(original_trunk, trunk_obj,),
request_body=trunk_data)
registry.publish(resources.TRUNK, events.AFTER_UPDATE, self,
payload=payload)
return trunk_obj
def delete_trunk(self, context, trunk_id):
@ -304,17 +305,18 @@ class TrunkPlugin(service_base.ServicePluginBase):
LOG.warning('Trunk driver raised exception when '
'deleting trunk port %s: %s', trunk_id,
str(e))
payload = callbacks.TrunkPayload(context, trunk_id,
original_trunk=trunk)
registry.notify(resources.TRUNK,
events.PRECOMMIT_DELETE,
self, payload=payload)
payload = events.DBEventPayload(context, resource_id=trunk_id,
states=(trunk,))
registry.publish(resources.TRUNK, events.PRECOMMIT_DELETE,
self, payload=payload)
else:
LOG.info('Trunk driver does not consider trunk %s '
'untrunkable', trunk_id)
raise trunk_exc.TrunkInUse(trunk_id=trunk_id)
registry.notify(resources.TRUNK, events.AFTER_DELETE, self,
payload=payload)
registry.publish(resources.TRUNK, events.AFTER_DELETE, self,
payload=events.DBEventPayload(
context, resource_id=trunk_id,
states=(trunk,)))
@db_base_plugin_common.convert_result_to_dict
def add_subports(self, context, trunk_id, subports):
@ -356,15 +358,21 @@ class TrunkPlugin(service_base.ServicePluginBase):
obj.create()
trunk['sub_ports'].append(obj)
added_subports.append(obj)
payload = callbacks.TrunkPayload(context, trunk_id,
current_trunk=trunk,
original_trunk=original_trunk,
subports=added_subports)
payload = events.DBEventPayload(context, resource_id=trunk_id,
states=(original_trunk, trunk,),
metadata={
'subports': added_subports
})
if added_subports:
registry.notify(resources.SUBPORTS, events.PRECOMMIT_CREATE,
self, payload=payload)
registry.publish(resources.SUBPORTS, events.PRECOMMIT_CREATE,
self, payload=payload)
if added_subports:
registry.notify(
payload = events.DBEventPayload(context, resource_id=trunk_id,
states=(original_trunk, trunk,),
metadata={
'subports': added_subports
})
registry.publish(
resources.SUBPORTS, events.AFTER_CREATE, self, payload=payload)
return trunk
@ -408,15 +416,21 @@ class TrunkPlugin(service_base.ServicePluginBase):
# with multiple concurrent requests), the status is still forced
# to DOWN. See add_subports() for more details.
trunk.update(status=constants.TRUNK_DOWN_STATUS)
payload = callbacks.TrunkPayload(context, trunk_id,
current_trunk=trunk,
original_trunk=original_trunk,
subports=removed_subports)
payload = events.DBEventPayload(context, resource_id=trunk_id,
states=(original_trunk, trunk,),
metadata={
'subports': removed_subports
})
if removed_subports:
registry.notify(resources.SUBPORTS, events.PRECOMMIT_DELETE,
self, payload=payload)
registry.publish(resources.SUBPORTS, events.PRECOMMIT_DELETE,
self, payload=payload)
if removed_subports:
registry.notify(
payload = events.DBEventPayload(context, resource_id=trunk_id,
states=(original_trunk, trunk,),
metadata={
'subports': removed_subports
})
registry.publish(
resources.SUBPORTS, events.AFTER_DELETE, self, payload=payload)
return trunk

View File

@ -33,30 +33,41 @@ class ServerSideRpcBackend(object):
LOG.debug("RPC backend initialized for trunk plugin")
@registry.receives(resources.TRUNK,
[events.AFTER_DELETE, events.AFTER_CREATE])
def process_trunk_payload_event(self, resource, event,
trunk_plugin, payload=None):
"""Emit RPC notifications to registered subscribers."""
# TODO(boden): refactor back into process_event once all events use
# callback payloads
context = payload.context
LOG.debug("RPC notification needed for trunk %s", payload.resource_id)
# On AFTER_DELETE event, current_trunk is None
payload = payload.latest_state
method = {
events.AFTER_CREATE: self._stub.trunk_created,
events.AFTER_DELETE: self._stub.trunk_deleted,
}
LOG.debug("Emitting event %s for resource %s", event, resource)
method[event](context, payload)
# Set up listeners to trunk events: they dispatch RPC messages
# to agents as needed. These are designed to work with any
# agent-based driver that may integrate with the trunk service
# plugin, e.g. linux bridge or ovs.
@registry.receives(resources.TRUNK,
[events.AFTER_CREATE, events.AFTER_DELETE])
@registry.receives(resources.SUBPORTS,
[events.AFTER_CREATE, events.AFTER_DELETE])
def process_event(self, resource, event, trunk_plugin, payload):
def process_event(self, resource, event, trunk_plugin, payload=None):
"""Emit RPC notifications to registered subscribers."""
context = payload.context
LOG.debug("RPC notification needed for trunk %s", payload.trunk_id)
if resource == resources.SUBPORTS:
payload = payload.subports
method = {
events.AFTER_CREATE: self._stub.subports_added,
events.AFTER_DELETE: self._stub.subports_deleted,
}
elif resource == resources.TRUNK:
# On AFTER_DELETE event, current_trunk is None
payload = payload.current_trunk or payload.original_trunk
method = {
events.AFTER_CREATE: self._stub.trunk_created,
events.AFTER_DELETE: self._stub.trunk_deleted,
}
LOG.debug("RPC notification needed for trunk %s", payload.resource_id)
payload = payload.metadata['subports']
method = {
events.AFTER_CREATE: self._stub.subports_added,
events.AFTER_DELETE: self._stub.subports_deleted,
}
LOG.debug("Emitting event %s for resource %s", event, resource)
method[event](context, payload)

View File

@ -263,11 +263,14 @@ class TestTrunkHandler(base.BaseTestCase):
status=trunk_consts.TRUNK_ACTIVE_STATUS)
def _fake_trunk_event_payload(self):
original_trunk = mock.Mock()
original_trunk.port_id = 'original_trunk_port_id'
current_trunk = mock.Mock()
current_trunk.port_id = 'current_trunk_port_id'
payload = mock.Mock()
payload.current_trunk = mock.Mock()
payload.current_trunk.port_id = 'current_trunk_port_id'
payload.original_trunk = mock.Mock()
payload.original_trunk.port_id = 'original_trunk_port_id'
payload.states = (original_trunk, current_trunk)
current_subport = mock.Mock()
current_subport.segmentation_id = 40
current_subport.trunk_id = 'current_trunk_port_id'
@ -276,8 +279,9 @@ class TestTrunkHandler(base.BaseTestCase):
original_subport.segmentation_id = 41
original_subport.trunk_id = 'original_trunk_port_id'
original_subport.port_id = 'original_subport_port_id'
payload.current_trunk.sub_ports = [current_subport]
payload.original_trunk.sub_ports = [original_subport]
current_trunk.sub_ports = [current_subport]
original_trunk.sub_ports = [original_subport]
return payload
@mock.patch.object(trunk_driver.OVNTrunkHandler, '_set_sub_ports')
@ -286,9 +290,9 @@ class TestTrunkHandler(base.BaseTestCase):
self.handler.trunk_event(
mock.ANY, events.AFTER_CREATE, mock.ANY, fake_payload)
set_subports.assert_called_once_with(
fake_payload.current_trunk.port_id,
fake_payload.current_trunk.sub_ports)
fake_payload.current_trunk.update.assert_called_once_with(
fake_payload.states[0].port_id,
fake_payload.states[0].sub_ports)
fake_payload.states[0].update.assert_called_once_with(
status=trunk_consts.TRUNK_ACTIVE_STATUS)
@mock.patch.object(trunk_driver.OVNTrunkHandler, '_unset_sub_ports')
@ -297,7 +301,7 @@ class TestTrunkHandler(base.BaseTestCase):
self.handler.trunk_event(
mock.ANY, events.AFTER_DELETE, mock.ANY, fake_payload)
unset_subports.assert_called_once_with(
fake_payload.original_trunk.sub_ports)
fake_payload.states[0].sub_ports)
@mock.patch.object(trunk_driver.OVNTrunkHandler, '_set_sub_ports')
@mock.patch.object(trunk_driver.OVNTrunkHandler, '_unset_sub_ports')
@ -309,14 +313,18 @@ class TestTrunkHandler(base.BaseTestCase):
unset_subports.assert_not_called()
def _fake_subport_event_payload(self):
original_trunk = mock.Mock()
original_trunk.port_id = 'original_trunk_port_id'
payload = mock.Mock()
payload.original_trunk = mock.Mock()
payload.original_trunk.port_id = 'original_trunk_port_id'
payload.states = (original_trunk,)
original_subport = mock.Mock()
original_subport.segmentation_id = 41
original_subport.trunk_id = 'original_trunk_port_id'
original_subport.port_id = 'original_subport_port_id'
payload.subports = [original_subport]
payload.metadata = {'subports': [original_subport]}
return payload
@mock.patch.object(trunk_driver.OVNTrunkHandler, 'subports_added')
@ -325,7 +333,7 @@ class TestTrunkHandler(base.BaseTestCase):
self.handler.subport_event(
mock.ANY, events.AFTER_CREATE, mock.ANY, fake_payload)
s_added.assert_called_once_with(
fake_payload.original_trunk, fake_payload.subports)
fake_payload.states[0], fake_payload.metadata['subports'])
@mock.patch.object(trunk_driver.OVNTrunkHandler, 'subports_deleted')
def test_subport_event_delete(self, s_deleted):
@ -333,7 +341,7 @@ class TestTrunkHandler(base.BaseTestCase):
self.handler.subport_event(
mock.ANY, events.AFTER_DELETE, mock.ANY, fake_payload)
s_deleted.assert_called_once_with(
fake_payload.original_trunk, fake_payload.subports)
fake_payload.states[0], fake_payload.metadata['subports'])
@mock.patch.object(trunk_driver.OVNTrunkHandler, 'subports_added')
@mock.patch.object(trunk_driver.OVNTrunkHandler, 'subports_deleted')

View File

@ -18,7 +18,6 @@ from neutron_lib.callbacks import resources
from neutron_lib import fixture
from neutron.api.rpc.callbacks import resource_manager
from neutron.services.trunk import callbacks
from neutron.services.trunk.rpc import backend
from neutron.tests import base
from neutron.tests import tools
@ -39,12 +38,12 @@ class ServerSideRpcBackendTest(base.BaseTestCase):
calls = [mock.call(
*tools.get_subscribe_args(
test_obj.process_event,
test_obj.process_trunk_payload_event,
resources.TRUNK,
events.AFTER_CREATE)),
mock.call(
*tools.get_subscribe_args(
test_obj.process_event,
test_obj.process_trunk_payload_event,
resources.TRUNK,
events.AFTER_DELETE)),
mock.call(
@ -65,19 +64,20 @@ class ServerSideRpcBackendTest(base.BaseTestCase):
test_obj._stub = mock_stub = mock.Mock()
trunk_plugin = mock.Mock()
test_obj.process_event(
test_obj.process_trunk_payload_event(
resources.TRUNK, events.AFTER_CREATE, trunk_plugin,
callbacks.TrunkPayload("context",
"id",
current_trunk="current_trunk"))
test_obj.process_event(
events.DBEventPayload("context",
resource_id="id",
states=("current_trunk",)))
test_obj.process_trunk_payload_event(
resources.TRUNK, events.AFTER_DELETE, trunk_plugin,
callbacks.TrunkPayload("context",
"id",
original_trunk="original_trunk"))
events.DBEventPayload("context",
resource_id="id",
states=("original_trunk",)))
calls = [mock.call.trunk_created("context",
"current_trunk"),
"current_trunk"),
mock.call.trunk_deleted("context",
"original_trunk")]
"original_trunk")]
mock_stub.assert_has_calls(calls, any_order=False)

View File

@ -24,7 +24,6 @@ from neutron_lib.services.trunk import constants
import testtools
from neutron.objects import trunk as trunk_objects
from neutron.services.trunk import callbacks
from neutron.services.trunk import drivers
from neutron.services.trunk import exceptions as trunk_exc
from neutron.services.trunk import plugin as trunk_plugin
@ -116,10 +115,12 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
callback = register_mock_callback(resources.TRUNK, event)
trunk = self._create_test_trunk(parent_port)
trunk_obj = self._get_trunk_obj(trunk['id'])
payload = callbacks.TrunkPayload(self.context, trunk['id'],
current_trunk=trunk_obj)
callback.assert_called_once_with(
resources.TRUNK, event, self.trunk_plugin, payload=payload)
resources.TRUNK, event, self.trunk_plugin, payload=mock.ANY)
payload = callback.mock_calls[0][2]['payload']
self.assertEqual(self.context, payload.context)
self.assertEqual(trunk_obj, payload.latest_state)
self.assertEqual(trunk['id'], payload.resource_id)
def test_create_trunk_notify_after_create(self):
self._test_trunk_create_notify(events.AFTER_CREATE)
@ -136,11 +137,13 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
self.trunk_plugin.update_trunk(self.context, trunk['id'],
trunk_req)
trunk_obj = self._get_trunk_obj(trunk['id'])
payload = callbacks.TrunkPayload(self.context, trunk['id'],
original_trunk=orig_trunk_obj,
current_trunk=trunk_obj)
callback.assert_called_once_with(
resources.TRUNK, event, self.trunk_plugin, payload=payload)
resources.TRUNK, event, self.trunk_plugin, payload=mock.ANY)
payload = callback.mock_calls[0][2]['payload']
self.assertEqual(self.context, payload.context)
self.assertEqual(trunk_obj, payload.latest_state)
self.assertEqual(trunk['id'], payload.resource_id)
self.assertEqual(orig_trunk_obj, payload.states[0])
def test_trunk_update_notify_after_update(self):
self._test_trunk_update_notify(events.AFTER_UPDATE)
@ -170,13 +173,29 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
trunk = self._create_test_trunk(parent_port)
trunk_obj = self._get_trunk_obj(trunk['id'])
self.trunk_plugin.delete_trunk(self.context, trunk['id'])
payload = callbacks.TrunkPayload(self.context, trunk['id'],
original_trunk=trunk_obj)
callback.assert_called_once_with(
resources.TRUNK, event, self.trunk_plugin, payload=payload)
resources.TRUNK, event, self.trunk_plugin, payload=mock.ANY)
payload = callback.mock_calls[0][2]['payload']
self.assertEqual(self.context, payload.context)
self.assertEqual(trunk_obj, payload.latest_state)
self.assertEqual(trunk['id'], payload.resource_id)
def test_delete_trunk_notify_after_delete(self):
self._test_trunk_delete_notify(events.AFTER_DELETE)
# TODO(boden): refactor into common method once all use payloads
with self.port() as parent_port:
callback = register_mock_callback(resources.TRUNK,
events.AFTER_DELETE)
trunk = self._create_test_trunk(parent_port)
trunk_obj = self._get_trunk_obj(trunk['id'])
self.trunk_plugin.delete_trunk(self.context, trunk['id'])
callback.assert_called_once_with(
resources.TRUNK, events.AFTER_DELETE,
self.trunk_plugin, payload=mock.ANY)
call_payload = callback.mock_calls[0][2]['payload']
self.assertEqual(trunk['id'], call_payload.resource_id)
self.assertEqual((trunk_obj,), call_payload.states)
def test_delete_trunk_notify_precommit_delete(self):
self._test_trunk_delete_notify(events.PRECOMMIT_DELETE)
@ -218,12 +237,13 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
self.context, trunk['id'], {'sub_ports': [subport]})
trunk_obj = self._get_trunk_obj(trunk['id'])
subport_obj = self._get_subport_obj(subport['port_id'])
payload = callbacks.TrunkPayload(self.context, trunk['id'],
current_trunk=trunk_obj,
original_trunk=orig_trunk_obj,
subports=[subport_obj])
callback.assert_called_once_with(
resources.SUBPORTS, event, self.trunk_plugin, payload=payload)
resources.SUBPORTS, event, self.trunk_plugin, payload=mock.ANY)
payload = callback.mock_calls[0][2]['payload']
self.assertEqual(trunk['id'], payload.resource_id)
self.assertEqual(trunk_obj, payload.latest_state)
self.assertEqual(orig_trunk_obj, payload.states[0])
self.assertEqual([subport_obj], payload.metadata['subports'])
def test_add_subports_notify_after_create(self):
self._test_add_subports_notify(events.AFTER_CREATE)
@ -241,12 +261,13 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
self.trunk_plugin.remove_subports(
self.context, trunk['id'], {'sub_ports': [subport]})
trunk_obj = self._get_trunk_obj(trunk['id'])
payload = callbacks.TrunkPayload(self.context, trunk['id'],
current_trunk=trunk_obj,
original_trunk=orig_trunk_obj,
subports=[subport_obj])
callback.assert_called_once_with(
resources.SUBPORTS, event, self.trunk_plugin, payload=payload)
resources.SUBPORTS, event, self.trunk_plugin, payload=mock.ANY)
payload = callback.mock_calls[0][2]['payload']
self.assertEqual(trunk['id'], payload.resource_id)
self.assertEqual(trunk_obj, payload.latest_state)
self.assertEqual(orig_trunk_obj, payload.states[0])
self.assertEqual([subport_obj], payload.metadata['subports'])
def test_remove_subports_notify_after_delete(self):
self._test_remove_subports_notify(events.AFTER_DELETE)
@ -304,12 +325,14 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
parent, original_port,
constants.TRUNK_ACTIVE_STATUS,
constants.TRUNK_DOWN_STATUS))
payload = callbacks.TrunkPayload(self.context, original_trunk['id'],
original_trunk=original_trunk,
current_trunk=current_trunk)
callback.assert_called_once_with(
resources.TRUNK, events.AFTER_UPDATE,
self.trunk_plugin, payload=payload)
self.trunk_plugin, payload=mock.ANY)
payload = callback.mock_calls[0][2]['payload']
self.assertEqual(self.context, payload.context)
self.assertEqual(current_trunk, payload.latest_state)
self.assertEqual(original_trunk['id'], payload.resource_id)
self.assertEqual(original_trunk, payload.states[0])
def test__trigger_trunk_status_change_vif_type_unchanged(self):
with self.port() as parent: