Use payloads for TRUNK and SUBPORTS callbacks

This patch switches the code over to the payload style of callbacks [1]
for TRUNK and SUBPORTS events. As needed existing callbacks are shimmed
to support both payload and kwarg style callbacks. These shims will be
removed once all callbacks are switched over to payloads.
Also the neutron.services.trunk.callback module is removed as consumers
will no longer need the TrunkPayload therein.

NeutronLibImpact

[1]
https://docs.openstack.org/neutron-lib/latest/contributor/callbacks.html

Change-Id: Ie302b48b283f8780072b5c9e2bc8787d87c11794
This commit is contained in:
Nurmatov Mamatisa 2021-04-20 14:35:34 +03:00
parent 77b7d8b452
commit 69ef824069
10 changed files with 171 additions and 147 deletions

View File

@ -1,34 +0,0 @@
# (c) Copyright 2016 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO(boden): remove this once moved over to neutron-lib payloads
class TrunkPayload(object):
"""Payload for trunk-related callback registry notifications."""
def __init__(self, context, trunk_id, current_trunk=None,
original_trunk=None, subports=None):
self.context = context
self.trunk_id = trunk_id
self.current_trunk = current_trunk
self.original_trunk = original_trunk
self.subports = subports if subports else []
def __eq__(self, other):
return (isinstance(other, self.__class__) and
self.__dict__ == other.__dict__)
def __ne__(self, other):
return not self.__eq__(other)

View File

@ -76,7 +76,7 @@ class DriverBase(object):
trunk plugin so that they can integrate without an explicit
register() method invocation.
:param resource: neutron.services.trunk.constants.TRUNK_PLUGIN
:param resource: neutron_lib.callbacks.resources.TRUNK_PLUGIN
:param event: neutron_lib.callbacks.events.AFTER_INIT
:param trigger: neutron.service.trunks.plugin.TrunkPlugin
"""

View File

@ -75,7 +75,8 @@ class OVSTrunkSkeleton(agent.TrunkSkeleton):
{'event': event_type, 'subports': subports, 'err': e})
@local_registry.receives(resources.TRUNK, [local_events.BEFORE_CREATE])
def check_trunk_dependencies(self, resource, event, trigger, **kwargs):
def check_trunk_dependencies(
self, resource, event, trigger, payload=None):
# The OVS trunk driver does not work with iptables firewall and QoS.
# We should validate the environment configuration and signal that
# something might be wrong.
@ -87,7 +88,7 @@ class OVSTrunkSkeleton(agent.TrunkSkeleton):
LOG.warning(
"Firewall driver iptables_hybrid is not compatible with "
"trunk ports. Trunk %(trunk_id)s may be insecure.",
{'trunk_id': kwargs['trunk'].id})
{'trunk_id': payload.resource_id})
def init_handler(resource, event, trigger, payload=None):

View File

@ -382,9 +382,10 @@ class OVSDBHandler(object):
return
try:
registry.notify(
registry.publish(
resources.TRUNK, events.BEFORE_CREATE, self,
context=ctx, trunk=trunk)
payload=events.DBEventPayload(ctx, resource_id=trunk.id,
desired_state=trunk))
self.trunk_manager.create_trunk(
trunk.id, trunk.port_id,
port['external_ids'].get('attached-mac'))

View File

@ -148,17 +148,17 @@ class OVNTrunkHandler(object):
def trunk_event(self, resource, event, trunk_plugin, payload):
if event == events.AFTER_CREATE:
self.trunk_created(payload.current_trunk)
self.trunk_created(payload.states[0])
elif event == events.AFTER_DELETE:
self.trunk_deleted(payload.original_trunk)
self.trunk_deleted(payload.states[0])
def subport_event(self, resource, event, trunk_plugin, payload):
if event == events.AFTER_CREATE:
self.subports_added(payload.original_trunk,
payload.subports)
self.subports_added(payload.states[0],
payload.metadata['subports'])
elif event == events.AFTER_DELETE:
self.subports_deleted(payload.original_trunk,
payload.subports)
self.subports_deleted(payload.states[0],
payload.metadata['subports'])
class OVNTrunkDriver(trunk_base.DriverBase):

View File

@ -34,7 +34,6 @@ from oslo_utils import uuidutils
from neutron.db import db_base_plugin_common
from neutron.objects import base as objects_base
from neutron.objects import trunk as trunk_objects
from neutron.services.trunk import callbacks
from neutron.services.trunk import drivers
from neutron.services.trunk import exceptions as trunk_exc
from neutron.services.trunk import rules
@ -252,12 +251,14 @@ class TrunkPlugin(service_base.ServicePluginBase):
sub_ports=sub_ports)
with db_api.CONTEXT_WRITER.using(context):
trunk_obj.create()
payload = callbacks.TrunkPayload(context, trunk_obj.id,
current_trunk=trunk_obj)
registry.notify(
payload = events.DBEventPayload(
context, resource_id=trunk_obj.id, desired_state=trunk_obj)
registry.publish(
resources.TRUNK, events.PRECOMMIT_CREATE, self,
payload=payload)
registry.notify(
payload = events.DBEventPayload(
context, resource_id=trunk_obj.id, states=(trunk_obj,))
registry.publish(
resources.TRUNK, events.AFTER_CREATE, self, payload=payload)
return trunk_obj
@ -279,11 +280,11 @@ class TrunkPlugin(service_base.ServicePluginBase):
desired_state=trunk_obj, request_body=trunk_data)
registry.publish(resources.TRUNK, events.PRECOMMIT_UPDATE, self,
payload=payload)
registry.notify(resources.TRUNK, events.AFTER_UPDATE, self,
payload=callbacks.TrunkPayload(
context, trunk_id,
original_trunk=original_trunk,
current_trunk=trunk_obj))
payload = events.DBEventPayload(
context, resource_id=trunk_id, states=(original_trunk, trunk_obj,),
request_body=trunk_data)
registry.publish(resources.TRUNK, events.AFTER_UPDATE, self,
payload=payload)
return trunk_obj
def delete_trunk(self, context, trunk_id):
@ -304,17 +305,18 @@ class TrunkPlugin(service_base.ServicePluginBase):
LOG.warning('Trunk driver raised exception when '
'deleting trunk port %s: %s', trunk_id,
str(e))
payload = callbacks.TrunkPayload(context, trunk_id,
original_trunk=trunk)
registry.notify(resources.TRUNK,
events.PRECOMMIT_DELETE,
self, payload=payload)
payload = events.DBEventPayload(context, resource_id=trunk_id,
states=(trunk,))
registry.publish(resources.TRUNK, events.PRECOMMIT_DELETE,
self, payload=payload)
else:
LOG.info('Trunk driver does not consider trunk %s '
'untrunkable', trunk_id)
raise trunk_exc.TrunkInUse(trunk_id=trunk_id)
registry.notify(resources.TRUNK, events.AFTER_DELETE, self,
payload=payload)
registry.publish(resources.TRUNK, events.AFTER_DELETE, self,
payload=events.DBEventPayload(
context, resource_id=trunk_id,
states=(trunk,)))
@db_base_plugin_common.convert_result_to_dict
def add_subports(self, context, trunk_id, subports):
@ -356,15 +358,21 @@ class TrunkPlugin(service_base.ServicePluginBase):
obj.create()
trunk['sub_ports'].append(obj)
added_subports.append(obj)
payload = callbacks.TrunkPayload(context, trunk_id,
current_trunk=trunk,
original_trunk=original_trunk,
subports=added_subports)
payload = events.DBEventPayload(context, resource_id=trunk_id,
states=(original_trunk, trunk,),
metadata={
'subports': added_subports
})
if added_subports:
registry.notify(resources.SUBPORTS, events.PRECOMMIT_CREATE,
self, payload=payload)
registry.publish(resources.SUBPORTS, events.PRECOMMIT_CREATE,
self, payload=payload)
if added_subports:
registry.notify(
payload = events.DBEventPayload(context, resource_id=trunk_id,
states=(original_trunk, trunk,),
metadata={
'subports': added_subports
})
registry.publish(
resources.SUBPORTS, events.AFTER_CREATE, self, payload=payload)
return trunk
@ -408,15 +416,21 @@ class TrunkPlugin(service_base.ServicePluginBase):
# with multiple concurrent requests), the status is still forced
# to DOWN. See add_subports() for more details.
trunk.update(status=constants.TRUNK_DOWN_STATUS)
payload = callbacks.TrunkPayload(context, trunk_id,
current_trunk=trunk,
original_trunk=original_trunk,
subports=removed_subports)
payload = events.DBEventPayload(context, resource_id=trunk_id,
states=(original_trunk, trunk,),
metadata={
'subports': removed_subports
})
if removed_subports:
registry.notify(resources.SUBPORTS, events.PRECOMMIT_DELETE,
self, payload=payload)
registry.publish(resources.SUBPORTS, events.PRECOMMIT_DELETE,
self, payload=payload)
if removed_subports:
registry.notify(
payload = events.DBEventPayload(context, resource_id=trunk_id,
states=(original_trunk, trunk,),
metadata={
'subports': removed_subports
})
registry.publish(
resources.SUBPORTS, events.AFTER_DELETE, self, payload=payload)
return trunk

View File

@ -33,30 +33,41 @@ class ServerSideRpcBackend(object):
LOG.debug("RPC backend initialized for trunk plugin")
@registry.receives(resources.TRUNK,
[events.AFTER_DELETE, events.AFTER_CREATE])
def process_trunk_payload_event(self, resource, event,
trunk_plugin, payload=None):
"""Emit RPC notifications to registered subscribers."""
# TODO(boden): refactor back into process_event once all events use
# callback payloads
context = payload.context
LOG.debug("RPC notification needed for trunk %s", payload.resource_id)
# On AFTER_DELETE event, current_trunk is None
payload = payload.latest_state
method = {
events.AFTER_CREATE: self._stub.trunk_created,
events.AFTER_DELETE: self._stub.trunk_deleted,
}
LOG.debug("Emitting event %s for resource %s", event, resource)
method[event](context, payload)
# Set up listeners to trunk events: they dispatch RPC messages
# to agents as needed. These are designed to work with any
# agent-based driver that may integrate with the trunk service
# plugin, e.g. linux bridge or ovs.
@registry.receives(resources.TRUNK,
[events.AFTER_CREATE, events.AFTER_DELETE])
@registry.receives(resources.SUBPORTS,
[events.AFTER_CREATE, events.AFTER_DELETE])
def process_event(self, resource, event, trunk_plugin, payload):
def process_event(self, resource, event, trunk_plugin, payload=None):
"""Emit RPC notifications to registered subscribers."""
context = payload.context
LOG.debug("RPC notification needed for trunk %s", payload.trunk_id)
if resource == resources.SUBPORTS:
payload = payload.subports
method = {
events.AFTER_CREATE: self._stub.subports_added,
events.AFTER_DELETE: self._stub.subports_deleted,
}
elif resource == resources.TRUNK:
# On AFTER_DELETE event, current_trunk is None
payload = payload.current_trunk or payload.original_trunk
method = {
events.AFTER_CREATE: self._stub.trunk_created,
events.AFTER_DELETE: self._stub.trunk_deleted,
}
LOG.debug("RPC notification needed for trunk %s", payload.resource_id)
payload = payload.metadata['subports']
method = {
events.AFTER_CREATE: self._stub.subports_added,
events.AFTER_DELETE: self._stub.subports_deleted,
}
LOG.debug("Emitting event %s for resource %s", event, resource)
method[event](context, payload)

View File

@ -263,11 +263,14 @@ class TestTrunkHandler(base.BaseTestCase):
status=trunk_consts.TRUNK_ACTIVE_STATUS)
def _fake_trunk_event_payload(self):
original_trunk = mock.Mock()
original_trunk.port_id = 'original_trunk_port_id'
current_trunk = mock.Mock()
current_trunk.port_id = 'current_trunk_port_id'
payload = mock.Mock()
payload.current_trunk = mock.Mock()
payload.current_trunk.port_id = 'current_trunk_port_id'
payload.original_trunk = mock.Mock()
payload.original_trunk.port_id = 'original_trunk_port_id'
payload.states = (original_trunk, current_trunk)
current_subport = mock.Mock()
current_subport.segmentation_id = 40
current_subport.trunk_id = 'current_trunk_port_id'
@ -276,8 +279,9 @@ class TestTrunkHandler(base.BaseTestCase):
original_subport.segmentation_id = 41
original_subport.trunk_id = 'original_trunk_port_id'
original_subport.port_id = 'original_subport_port_id'
payload.current_trunk.sub_ports = [current_subport]
payload.original_trunk.sub_ports = [original_subport]
current_trunk.sub_ports = [current_subport]
original_trunk.sub_ports = [original_subport]
return payload
@mock.patch.object(trunk_driver.OVNTrunkHandler, '_set_sub_ports')
@ -286,9 +290,9 @@ class TestTrunkHandler(base.BaseTestCase):
self.handler.trunk_event(
mock.ANY, events.AFTER_CREATE, mock.ANY, fake_payload)
set_subports.assert_called_once_with(
fake_payload.current_trunk.port_id,
fake_payload.current_trunk.sub_ports)
fake_payload.current_trunk.update.assert_called_once_with(
fake_payload.states[0].port_id,
fake_payload.states[0].sub_ports)
fake_payload.states[0].update.assert_called_once_with(
status=trunk_consts.TRUNK_ACTIVE_STATUS)
@mock.patch.object(trunk_driver.OVNTrunkHandler, '_unset_sub_ports')
@ -297,7 +301,7 @@ class TestTrunkHandler(base.BaseTestCase):
self.handler.trunk_event(
mock.ANY, events.AFTER_DELETE, mock.ANY, fake_payload)
unset_subports.assert_called_once_with(
fake_payload.original_trunk.sub_ports)
fake_payload.states[0].sub_ports)
@mock.patch.object(trunk_driver.OVNTrunkHandler, '_set_sub_ports')
@mock.patch.object(trunk_driver.OVNTrunkHandler, '_unset_sub_ports')
@ -309,14 +313,18 @@ class TestTrunkHandler(base.BaseTestCase):
unset_subports.assert_not_called()
def _fake_subport_event_payload(self):
original_trunk = mock.Mock()
original_trunk.port_id = 'original_trunk_port_id'
payload = mock.Mock()
payload.original_trunk = mock.Mock()
payload.original_trunk.port_id = 'original_trunk_port_id'
payload.states = (original_trunk,)
original_subport = mock.Mock()
original_subport.segmentation_id = 41
original_subport.trunk_id = 'original_trunk_port_id'
original_subport.port_id = 'original_subport_port_id'
payload.subports = [original_subport]
payload.metadata = {'subports': [original_subport]}
return payload
@mock.patch.object(trunk_driver.OVNTrunkHandler, 'subports_added')
@ -325,7 +333,7 @@ class TestTrunkHandler(base.BaseTestCase):
self.handler.subport_event(
mock.ANY, events.AFTER_CREATE, mock.ANY, fake_payload)
s_added.assert_called_once_with(
fake_payload.original_trunk, fake_payload.subports)
fake_payload.states[0], fake_payload.metadata['subports'])
@mock.patch.object(trunk_driver.OVNTrunkHandler, 'subports_deleted')
def test_subport_event_delete(self, s_deleted):
@ -333,7 +341,7 @@ class TestTrunkHandler(base.BaseTestCase):
self.handler.subport_event(
mock.ANY, events.AFTER_DELETE, mock.ANY, fake_payload)
s_deleted.assert_called_once_with(
fake_payload.original_trunk, fake_payload.subports)
fake_payload.states[0], fake_payload.metadata['subports'])
@mock.patch.object(trunk_driver.OVNTrunkHandler, 'subports_added')
@mock.patch.object(trunk_driver.OVNTrunkHandler, 'subports_deleted')

View File

@ -18,7 +18,6 @@ from neutron_lib.callbacks import resources
from neutron_lib import fixture
from neutron.api.rpc.callbacks import resource_manager
from neutron.services.trunk import callbacks
from neutron.services.trunk.rpc import backend
from neutron.tests import base
from neutron.tests import tools
@ -39,12 +38,12 @@ class ServerSideRpcBackendTest(base.BaseTestCase):
calls = [mock.call(
*tools.get_subscribe_args(
test_obj.process_event,
test_obj.process_trunk_payload_event,
resources.TRUNK,
events.AFTER_CREATE)),
mock.call(
*tools.get_subscribe_args(
test_obj.process_event,
test_obj.process_trunk_payload_event,
resources.TRUNK,
events.AFTER_DELETE)),
mock.call(
@ -65,19 +64,20 @@ class ServerSideRpcBackendTest(base.BaseTestCase):
test_obj._stub = mock_stub = mock.Mock()
trunk_plugin = mock.Mock()
test_obj.process_event(
test_obj.process_trunk_payload_event(
resources.TRUNK, events.AFTER_CREATE, trunk_plugin,
callbacks.TrunkPayload("context",
"id",
current_trunk="current_trunk"))
test_obj.process_event(
events.DBEventPayload("context",
resource_id="id",
states=("current_trunk",)))
test_obj.process_trunk_payload_event(
resources.TRUNK, events.AFTER_DELETE, trunk_plugin,
callbacks.TrunkPayload("context",
"id",
original_trunk="original_trunk"))
events.DBEventPayload("context",
resource_id="id",
states=("original_trunk",)))
calls = [mock.call.trunk_created("context",
"current_trunk"),
"current_trunk"),
mock.call.trunk_deleted("context",
"original_trunk")]
"original_trunk")]
mock_stub.assert_has_calls(calls, any_order=False)

View File

@ -24,7 +24,6 @@ from neutron_lib.services.trunk import constants
import testtools
from neutron.objects import trunk as trunk_objects
from neutron.services.trunk import callbacks
from neutron.services.trunk import drivers
from neutron.services.trunk import exceptions as trunk_exc
from neutron.services.trunk import plugin as trunk_plugin
@ -116,10 +115,12 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
callback = register_mock_callback(resources.TRUNK, event)
trunk = self._create_test_trunk(parent_port)
trunk_obj = self._get_trunk_obj(trunk['id'])
payload = callbacks.TrunkPayload(self.context, trunk['id'],
current_trunk=trunk_obj)
callback.assert_called_once_with(
resources.TRUNK, event, self.trunk_plugin, payload=payload)
resources.TRUNK, event, self.trunk_plugin, payload=mock.ANY)
payload = callback.mock_calls[0][2]['payload']
self.assertEqual(self.context, payload.context)
self.assertEqual(trunk_obj, payload.latest_state)
self.assertEqual(trunk['id'], payload.resource_id)
def test_create_trunk_notify_after_create(self):
self._test_trunk_create_notify(events.AFTER_CREATE)
@ -136,11 +137,13 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
self.trunk_plugin.update_trunk(self.context, trunk['id'],
trunk_req)
trunk_obj = self._get_trunk_obj(trunk['id'])
payload = callbacks.TrunkPayload(self.context, trunk['id'],
original_trunk=orig_trunk_obj,
current_trunk=trunk_obj)
callback.assert_called_once_with(
resources.TRUNK, event, self.trunk_plugin, payload=payload)
resources.TRUNK, event, self.trunk_plugin, payload=mock.ANY)
payload = callback.mock_calls[0][2]['payload']
self.assertEqual(self.context, payload.context)
self.assertEqual(trunk_obj, payload.latest_state)
self.assertEqual(trunk['id'], payload.resource_id)
self.assertEqual(orig_trunk_obj, payload.states[0])
def test_trunk_update_notify_after_update(self):
self._test_trunk_update_notify(events.AFTER_UPDATE)
@ -170,13 +173,29 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
trunk = self._create_test_trunk(parent_port)
trunk_obj = self._get_trunk_obj(trunk['id'])
self.trunk_plugin.delete_trunk(self.context, trunk['id'])
payload = callbacks.TrunkPayload(self.context, trunk['id'],
original_trunk=trunk_obj)
callback.assert_called_once_with(
resources.TRUNK, event, self.trunk_plugin, payload=payload)
resources.TRUNK, event, self.trunk_plugin, payload=mock.ANY)
payload = callback.mock_calls[0][2]['payload']
self.assertEqual(self.context, payload.context)
self.assertEqual(trunk_obj, payload.latest_state)
self.assertEqual(trunk['id'], payload.resource_id)
def test_delete_trunk_notify_after_delete(self):
self._test_trunk_delete_notify(events.AFTER_DELETE)
# TODO(boden): refactor into common method once all use payloads
with self.port() as parent_port:
callback = register_mock_callback(resources.TRUNK,
events.AFTER_DELETE)
trunk = self._create_test_trunk(parent_port)
trunk_obj = self._get_trunk_obj(trunk['id'])
self.trunk_plugin.delete_trunk(self.context, trunk['id'])
callback.assert_called_once_with(
resources.TRUNK, events.AFTER_DELETE,
self.trunk_plugin, payload=mock.ANY)
call_payload = callback.mock_calls[0][2]['payload']
self.assertEqual(trunk['id'], call_payload.resource_id)
self.assertEqual((trunk_obj,), call_payload.states)
def test_delete_trunk_notify_precommit_delete(self):
self._test_trunk_delete_notify(events.PRECOMMIT_DELETE)
@ -218,12 +237,13 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
self.context, trunk['id'], {'sub_ports': [subport]})
trunk_obj = self._get_trunk_obj(trunk['id'])
subport_obj = self._get_subport_obj(subport['port_id'])
payload = callbacks.TrunkPayload(self.context, trunk['id'],
current_trunk=trunk_obj,
original_trunk=orig_trunk_obj,
subports=[subport_obj])
callback.assert_called_once_with(
resources.SUBPORTS, event, self.trunk_plugin, payload=payload)
resources.SUBPORTS, event, self.trunk_plugin, payload=mock.ANY)
payload = callback.mock_calls[0][2]['payload']
self.assertEqual(trunk['id'], payload.resource_id)
self.assertEqual(trunk_obj, payload.latest_state)
self.assertEqual(orig_trunk_obj, payload.states[0])
self.assertEqual([subport_obj], payload.metadata['subports'])
def test_add_subports_notify_after_create(self):
self._test_add_subports_notify(events.AFTER_CREATE)
@ -241,12 +261,13 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
self.trunk_plugin.remove_subports(
self.context, trunk['id'], {'sub_ports': [subport]})
trunk_obj = self._get_trunk_obj(trunk['id'])
payload = callbacks.TrunkPayload(self.context, trunk['id'],
current_trunk=trunk_obj,
original_trunk=orig_trunk_obj,
subports=[subport_obj])
callback.assert_called_once_with(
resources.SUBPORTS, event, self.trunk_plugin, payload=payload)
resources.SUBPORTS, event, self.trunk_plugin, payload=mock.ANY)
payload = callback.mock_calls[0][2]['payload']
self.assertEqual(trunk['id'], payload.resource_id)
self.assertEqual(trunk_obj, payload.latest_state)
self.assertEqual(orig_trunk_obj, payload.states[0])
self.assertEqual([subport_obj], payload.metadata['subports'])
def test_remove_subports_notify_after_delete(self):
self._test_remove_subports_notify(events.AFTER_DELETE)
@ -304,12 +325,14 @@ class TrunkPluginTestCase(test_plugin.Ml2PluginV2TestCase):
parent, original_port,
constants.TRUNK_ACTIVE_STATUS,
constants.TRUNK_DOWN_STATUS))
payload = callbacks.TrunkPayload(self.context, original_trunk['id'],
original_trunk=original_trunk,
current_trunk=current_trunk)
callback.assert_called_once_with(
resources.TRUNK, events.AFTER_UPDATE,
self.trunk_plugin, payload=payload)
self.trunk_plugin, payload=mock.ANY)
payload = callback.mock_calls[0][2]['payload']
self.assertEqual(self.context, payload.context)
self.assertEqual(current_trunk, payload.latest_state)
self.assertEqual(original_trunk['id'], payload.resource_id)
self.assertEqual(original_trunk, payload.states[0])
def test__trigger_trunk_status_change_vif_type_unchanged(self):
with self.port() as parent: