From 86d5f8e1a3697e26fd490c07b8558bd20bdeadce Mon Sep 17 00:00:00 2001 From: Ryan Tidwell Date: Wed, 13 Jul 2016 14:10:43 -0700 Subject: [PATCH] Add RPC layer for Trunk Plugin and driver plumbing This patch introduces the RPC backbone required for the trunk plugin to work with agent-based L2 drivers (like Open vSwitch); to this aim it uses the RPC callback registry to send/receive trunk and subports OVOs over the wire. More patches will follow up to complete the RPC integration; some stuff is still work in progress, but there is enough substance that make this patch worth it. Partially-implements: blueprint vlan-aware-vms Co-Authored-By: Ryan Tidwell Co-Authored-By: Armando Migliaccio Co-Authored-By: Adolfo Duarte Change-Id: I3c749e9287cc778e12d3e022ddfd157ac9c1569b --- neutron/api/rpc/callbacks/resources.py | 9 ++ neutron/services/trunk/drivers/base.py | 11 +- neutron/services/trunk/plugin.py | 7 + neutron/services/trunk/rpc/__init__.py | 0 neutron/services/trunk/rpc/agent.py | 101 ++++++++++++ neutron/services/trunk/rpc/backend.py | 63 ++++++++ neutron/services/trunk/rpc/constants.py | 15 ++ neutron/services/trunk/rpc/server.py | 133 ++++++++++++++++ .../tests/unit/services/trunk/rpc/__init__.py | 0 .../unit/services/trunk/rpc/test_agent.py | 48 ++++++ .../unit/services/trunk/rpc/test_backend.py | 43 +++++ .../unit/services/trunk/rpc/test_server.py | 148 ++++++++++++++++++ 12 files changed, 577 insertions(+), 1 deletion(-) create mode 100644 neutron/services/trunk/rpc/__init__.py create mode 100644 neutron/services/trunk/rpc/agent.py create mode 100644 neutron/services/trunk/rpc/backend.py create mode 100644 neutron/services/trunk/rpc/constants.py create mode 100644 neutron/services/trunk/rpc/server.py create mode 100644 neutron/tests/unit/services/trunk/rpc/__init__.py create mode 100644 neutron/tests/unit/services/trunk/rpc/test_agent.py create mode 100644 neutron/tests/unit/services/trunk/rpc/test_backend.py create mode 100644 neutron/tests/unit/services/trunk/rpc/test_server.py diff --git a/neutron/api/rpc/callbacks/resources.py b/neutron/api/rpc/callbacks/resources.py index 552f026d05c..7e5d79b5504 100644 --- a/neutron/api/rpc/callbacks/resources.py +++ b/neutron/api/rpc/callbacks/resources.py @@ -11,23 +11,32 @@ # under the License. from neutron.objects.qos import policy +from neutron.objects import trunk +_TRUNK_CLS = trunk.Trunk _QOS_POLICY_CLS = policy.QosPolicy +_SUBPORT_CLS = trunk.SubPort _VALID_CLS = ( + _TRUNK_CLS, _QOS_POLICY_CLS, + _SUBPORT_CLS, ) _VALID_TYPES = [cls.obj_name() for cls in _VALID_CLS] # Supported types +TRUNK = _TRUNK_CLS.obj_name() QOS_POLICY = _QOS_POLICY_CLS.obj_name() +SUBPORT = _SUBPORT_CLS.obj_name() _TYPE_TO_CLS_MAP = { + TRUNK: _TRUNK_CLS, QOS_POLICY: _QOS_POLICY_CLS, + SUBPORT: _SUBPORT_CLS, } LOCAL_RESOURCE_VERSIONS = { diff --git a/neutron/services/trunk/drivers/base.py b/neutron/services/trunk/drivers/base.py index 896c7e32638..a5bfe08feac 100644 --- a/neutron/services/trunk/drivers/base.py +++ b/neutron/services/trunk/drivers/base.py @@ -18,6 +18,7 @@ import abc from neutron.callbacks import events from neutron.callbacks import registry from neutron.services.trunk import constants as trunk_consts +from neutron.services.trunk.rpc import backend class DriverBase(object): @@ -65,6 +66,14 @@ class DriverBase(object): External drivers must subscribe to the AFTER_INIT event for the trunk plugin so that they can integrate without an explicit register() method invocation. + + :param resource: neutron.services.trunk.constants.TRUNK_PLUGIN + :param event: neutron.callbacks.events.AFTER_INIT + :param trigger: neutron.service.trunks.plugin.TrunkPlugin """ - # Advertise yourself! + trigger.register_driver(self) + # Set up the server-side RPC backend if the driver is loaded, + # it is agent based, and the RPC backend is not already initialized. + if self.is_loaded and self.agent_type and not trigger.is_rpc_enabled(): + trigger.set_rpc_backend(backend.ServerSideRpcBackend()) diff --git a/neutron/services/trunk/plugin.py b/neutron/services/trunk/plugin.py index 7791c7415de..9a63428e7ee 100644 --- a/neutron/services/trunk/plugin.py +++ b/neutron/services/trunk/plugin.py @@ -62,6 +62,7 @@ class TrunkPlugin(service_base.ServicePluginBase, def __init__(self): db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs( attributes.PORTS, [_extend_port_trunk_details]) + self._rpc_backend = None self._drivers = [] self._segmentation_types = {} self._interfaces = set() @@ -79,6 +80,12 @@ class TrunkPlugin(service_base.ServicePluginBase, if not any([driver.is_loaded for driver in self._drivers]): raise trunk_exc.IncompatibleTrunkPluginConfiguration() + def set_rpc_backend(self, backend): + self._rpc_backend = backend + + def is_rpc_enabled(self): + return self._rpc_backend is not None + def register_driver(self, driver): """Register driver with trunk plugin.""" if driver.agent_type: diff --git a/neutron/services/trunk/rpc/__init__.py b/neutron/services/trunk/rpc/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/services/trunk/rpc/agent.py b/neutron/services/trunk/rpc/agent.py new file mode 100644 index 00000000000..19348712101 --- /dev/null +++ b/neutron/services/trunk/rpc/agent.py @@ -0,0 +1,101 @@ +# Copyright 2016 Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +import oslo_messaging + +from neutron.api.rpc.callbacks.consumer import registry +from neutron.api.rpc.callbacks import resources +from neutron.api.rpc.handlers import resources_rpc +from neutron.common import rpc as n_rpc +from neutron.services.trunk.rpc import constants as trunk_consts + +# This module contains stub (client-side) and skeleton (server-side) +# proxy code that executes in the Neutron L2 Agent process space. This +# is needed if trunk service plugin drivers have a remote component +# (e.g. agent), that needs to communicate with the Neutron Server. + +# The Agent side exposes the following remote methods: +# +# - update methods to learn about a trunk and its subports: these +# methods are used by the server to tell the agent about trunk +# updates; agents may selectively choose to listen to either +# trunk or subports updates or both. +# +# For server-side stub and skeleton proxy code, please look at server.py + + +class TrunkSkeleton(object): + """Skeleton proxy code for server->agent communication.""" + + def __init__(self): + registry.subscribe(self.handle_trunks, resources.TRUNK) + registry.subscribe(self.handle_subports, resources.SUBPORT) + + self._connection = n_rpc.create_connection() + endpoints = [resources_rpc.ResourcesPushRpcCallback()] + topic = resources_rpc.resource_type_versioned_topic(resources.SUBPORT) + self._connection.create_consumer(topic, endpoints, fanout=True) + topic = resources_rpc.resource_type_versioned_topic(resources.TRUNK) + self._connection.create_consumer(topic, endpoints, fanout=True) + self._connection.consume_in_threads() + + @abc.abstractmethod + def handle_trunks(self, trunks, event_type): + """Handle trunk events.""" + # if common logic may be extracted out, consider making a base + # version of this method that can be overidden by the inherited + # skeleton. + # NOTE: If trunk is not managed by the agent, the notification can + # either be ignored or cached for future use. + + @abc.abstractmethod + def handle_subports(self, subports, event_type): + """Handle subports event.""" + # if common logic may be extracted out, consider making a base + # version of this method that can be overidden by the inherited + # skeleton. + # NOTE: If the subport belongs to a trunk which the agent does not + # manage, the notification should be ignored. + + +class TrunkStub(object): + """Stub proxy code for agent->server communication.""" + # API HISTORY + # 1.0 - initial version + VERSION = '1.0' + + def __init__(self): + self.stub = resources_rpc.ResourcesPullRpcApi() + target = oslo_messaging.Target( + topic=trunk_consts.TRUNK_BASE_TOPIC, + version=self.VERSION, + namespace=trunk_consts.TRUNK_BASE_NAMESPACE) + self.rpc_client = n_rpc.get_client(target) + + def get_trunk_details(self, context, parent_port_id): + """Get information about the trunk for the given parent port.""" + return self.stub.pull(context, resources.TRUNK, parent_port_id) + + def update_trunk_status(self, context, trunk_id, status): + """Update the trunk status to reflect outcome of data plane wiring.""" + return self.rpc_client.prepare().call( + context, 'update_trunk_status', + trunk_id=trunk_id, status=status) + + def update_subport_bindings(self, context, subports): + """Update subport bindings to match parent port host binding.""" + return self.rpc_client.prepare().call( + context, 'update_subport_bindings', subports=subports) diff --git a/neutron/services/trunk/rpc/backend.py b/neutron/services/trunk/rpc/backend.py new file mode 100644 index 00000000000..7ad70653a3f --- /dev/null +++ b/neutron/services/trunk/rpc/backend.py @@ -0,0 +1,63 @@ +# Copyright 2016 Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging + +from neutron.callbacks import events +from neutron.callbacks import registry +from neutron.services.trunk import constants as trunk_consts +from neutron.services.trunk.rpc import server + +LOG = logging.getLogger(__name__) + + +class ServerSideRpcBackend(object): + """The Neutron Server RPC backend.""" + + def __init__(self): + """Initialize an RPC backend for the Neutron Server.""" + self._skeleton = server.TrunkSkeleton() + self._stub = server.TrunkStub() + + # Set up listeners to trunk events: they dispatch RPC messages + # to agents as needed. These are designed to work with any + # agent-based driver that may integrate with the trunk service + # plugin, e.g. linux bridge or ovs. + for event in (events.AFTER_CREATE, events.AFTER_DELETE): + registry.subscribe(self.process_event, + trunk_consts.TRUNK, + event) + registry.subscribe(self.process_event, + trunk_consts.SUBPORTS, + event) + LOG.debug("RPC backend initialized for trunk plugin") + + def process_event(self, resource, event, trunk_plugin, payload): + """Emit RPC notifications to registered subscribers.""" + context = payload.context + LOG.debug("RPC notification needed for trunk %s", payload.trunk_id) + if resource == trunk_consts.SUBPORTS: + payload = payload.subports + method = { + events.AFTER_CREATE: self._stub.subports_added, + events.AFTER_DELETE: self._stub.subports_deleted, + } + elif resource == trunk_consts.TRUNK: + payload = payload.current_trunk + method = { + events.AFTER_CREATE: self._stub.trunk_created, + events.AFTER_DELETE: self._stub.trunk_deleted, + } + LOG.debug("Emitting event %s for resource %s", event, resource) + method[event](context, payload) diff --git a/neutron/services/trunk/rpc/constants.py b/neutron/services/trunk/rpc/constants.py new file mode 100644 index 00000000000..699d6d9e3b6 --- /dev/null +++ b/neutron/services/trunk/rpc/constants.py @@ -0,0 +1,15 @@ +# Copyright 2016 Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +TRUNK_BASE_TOPIC = 'trunk' +TRUNK_BASE_NAMESPACE = 'trunk' diff --git a/neutron/services/trunk/rpc/server.py b/neutron/services/trunk/rpc/server.py new file mode 100644 index 00000000000..1ddb13be04d --- /dev/null +++ b/neutron/services/trunk/rpc/server.py @@ -0,0 +1,133 @@ +# Copyright 2016 Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections + +from oslo_log import log as logging +import oslo_messaging + +from neutron.api.rpc.callbacks import events +from neutron.api.rpc.callbacks.producer import registry +from neutron.api.rpc.callbacks import resources +from neutron.api.rpc.handlers import resources_rpc +from neutron.common import rpc as n_rpc +from neutron.db import api as db_api +from neutron.extensions import portbindings +from neutron import manager +from neutron.objects import trunk as trunk_objects +from neutron.services.trunk.rpc import constants + +LOG = logging.getLogger(__name__) + +# This module contains stub (client-side) and skeleton (server-side) +# proxy code that executes in the Neutron server process space. This +# is needed if any of the trunk service plugin drivers has a remote +# component (e.g. agent), that needs to communicate with the Neutron +# Server. + +# The Server side exposes the following remote methods: +# +# - lookup method to retrieve trunk details: used by the agent to learn +# about the trunk. +# - update methods for trunk and its subports: used by the agent to +# inform the server about local trunk status changes. +# +# For agent-side stub and skeleton proxy code, please look at agent.py + + +def trunk_by_port_provider(resource, port_id, context, **kwargs): + """Provider callback to supply trunk information by parent port.""" + return trunk_objects.Trunk.get_object(context, port_id=port_id) + + +class TrunkSkeleton(object): + """Skeleton proxy code for agent->server communication.""" + + # API version history: + # 1.0 Initial version + target = oslo_messaging.Target(version='1.0', + namespace=constants.TRUNK_BASE_NAMESPACE) + + _core_plugin = None + _trunk_plugin = None + + def __init__(self): + # Used to provide trunk lookups for the agent. + registry.provide(trunk_by_port_provider, resources.TRUNK) + self._connection = n_rpc.create_connection() + self._connection.create_consumer( + constants.TRUNK_BASE_TOPIC, [self], fanout=False) + self._connection.consume_in_threads() + + @property + def core_plugin(self): + # TODO(armax): consider getting rid of this property if we + # can get access to the Port object + if not self._core_plugin: + self._core_plugin = manager.NeutronManager.get_plugin() + return self._core_plugin + + def update_subport_bindings(self, context, subports): + """Update subport bindings to match trunk host binding.""" + el = context.elevated() + ports_by_trunk_id = collections.defaultdict(list) + updated_ports = collections.defaultdict(list) + for s in subports: + ports_by_trunk_id[s['trunk_id']].append(s['port_id']) + for trunk_id, subport_ids in ports_by_trunk_id.items(): + trunk = trunk_objects.Trunk.get_object(el, id=trunk_id) + if not trunk: + LOG.debug("Trunk not found. id: %s", trunk_id) + continue + trunk_port_id = trunk.port_id + trunk_port = self.core_plugin.get_port(el, trunk_port_id) + trunk_host = trunk_port.get(portbindings.HOST_ID) + for port_id in subport_ids: + updated_port = self.core_plugin.update_port( + el, port_id, {'port': {portbindings.HOST_ID: trunk_host}}) + # NOTE(fitoduarte): consider trimming down the content + # of the port data structure. + updated_ports[trunk_id].append(updated_port) + return updated_ports + + def update_trunk_status(self, context, trunk_id, status): + """Update the trunk status to reflect outcome of data plane wiring.""" + with db_api.autonested_transaction(context.session): + trunk = trunk_objects.Trunk.get_object(context, id=trunk_id) + if trunk: + trunk.status = status + trunk.update() + + +class TrunkStub(object): + """Stub proxy code for server->agent communication.""" + + def __init__(self): + self._resource_rpc = resources_rpc.ResourcesPushRpcApi() + + def trunk_created(self, context, trunk): + """Tell the agent about a trunk being created.""" + self._resource_rpc.push(context, [trunk], events.CREATED) + + def trunk_deleted(self, context, trunk): + """Tell the agent about a trunk being deleted.""" + self._resource_rpc.push(context, [trunk], events.DELETED) + + def subports_added(self, context, subports): + """Tell the agent about new subports to add.""" + self._resource_rpc.push(context, subports, events.CREATED) + + def subports_deleted(self, context, subports): + """Tell the agent about existing subports to remove.""" + self._resource_rpc.push(context, subports, events.DELETED) diff --git a/neutron/tests/unit/services/trunk/rpc/__init__.py b/neutron/tests/unit/services/trunk/rpc/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/tests/unit/services/trunk/rpc/test_agent.py b/neutron/tests/unit/services/trunk/rpc/test_agent.py new file mode 100644 index 00000000000..8ed87c2ce65 --- /dev/null +++ b/neutron/tests/unit/services/trunk/rpc/test_agent.py @@ -0,0 +1,48 @@ +# Copyright 2016 Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from oslo_config import cfg +import oslo_messaging + +from neutron.api.rpc.callbacks import resources +from neutron.api.rpc.handlers import resources_rpc +from neutron.services.trunk.rpc import agent +from neutron.tests import base + + +class TrunkSkeletonTest(base.BaseTestCase): + # TODO(fitoduarte): add more test to improve coverage of module + @mock.patch("neutron.api.rpc.callbacks.resource_manager." + "ConsumerResourceCallbacksManager.register") + @mock.patch("neutron.common.rpc.get_server") + def test___init__(self, mocked_get_server, mocked_register): + test_obj = agent.TrunkSkeleton() + self.assertEqual(2, mocked_register.call_count) + calls = [mock.call(test_obj.handle_trunks, resources.TRUNK), + mock.call(test_obj.handle_subports, resources.SUBPORT)] + mocked_register.assert_has_calls(calls, any_order=True) + + # Test to see if the call to rpc.get_server has the correct + # target and the correct endpoints + topic = resources_rpc.resource_type_versioned_topic(resources.SUBPORT) + subport_target = oslo_messaging.Target( + topic=topic, server=cfg.CONF.host, fanout=True) + topic = resources_rpc.resource_type_versioned_topic(resources.TRUNK) + trunk_target = oslo_messaging.Target( + topic=topic, server=cfg.CONF.host, fanout=True) + calls = [mock.call(subport_target, mock.ANY), + mock.call(trunk_target, mock.ANY)] + mocked_get_server.assert_has_calls(calls, any_order=True) + self.assertIn("ResourcesPushRpcCallback", + str(mocked_get_server.call_args_list)) diff --git a/neutron/tests/unit/services/trunk/rpc/test_backend.py b/neutron/tests/unit/services/trunk/rpc/test_backend.py new file mode 100644 index 00000000000..e82fe8991c1 --- /dev/null +++ b/neutron/tests/unit/services/trunk/rpc/test_backend.py @@ -0,0 +1,43 @@ +# Copyright 2016 Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from neutron.callbacks import events +from neutron.services.trunk import constants as trunk_consts +from neutron.services.trunk.rpc import backend +from neutron.tests import base + + +class ServerSideRpcBackendTest(base.BaseTestCase): + # TODO(fitoduarte): add more test to improve coverage of module + @mock.patch("neutron.api.rpc.callbacks.resource_manager." + "ResourceCallbacksManager.register") + @mock.patch("neutron.callbacks.manager.CallbacksManager.subscribe") + def test___init__(self, mocked_subscribe, mocked_register): + test_obj = backend.ServerSideRpcBackend() + + calls = [mock.call(test_obj.process_event, + trunk_consts.TRUNK, + events.AFTER_CREATE), + mock.call(test_obj.process_event, + trunk_consts.TRUNK, + events.AFTER_DELETE), + mock.call(test_obj.process_event, + trunk_consts.SUBPORTS, + events.AFTER_CREATE), + mock.call(test_obj.process_event, + trunk_consts.SUBPORTS, + events.AFTER_DELETE) + ] + mocked_subscribe.assert_has_calls(calls, any_order=True) diff --git a/neutron/tests/unit/services/trunk/rpc/test_server.py b/neutron/tests/unit/services/trunk/rpc/test_server.py new file mode 100644 index 00000000000..8bd2e01a9d8 --- /dev/null +++ b/neutron/tests/unit/services/trunk/rpc/test_server.py @@ -0,0 +1,148 @@ +# Copyright 2016 Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from oslo_config import cfg +import oslo_messaging + +from neutron.api.rpc.callbacks import events +from neutron.api.rpc.callbacks import resources +from neutron.api.rpc.handlers import resources_rpc +from neutron.extensions import portbindings +from neutron import manager +from neutron.objects import trunk as trunk_obj +from neutron.services.trunk import constants +from neutron.services.trunk import drivers +from neutron.services.trunk import plugin as trunk_plugin +from neutron.services.trunk.rpc import constants as rpc_consts +from neutron.services.trunk.rpc import server +from neutron.tests import base +from neutron.tests.unit.plugins.ml2 import test_plugin + + +class TrunkSkeletonTest(test_plugin.Ml2PluginV2TestCase): + def setUp(self): + super(TrunkSkeletonTest, self).setUp() + self.drivers_patch = mock.patch.object(drivers, 'register').start() + self.compat_patch = mock.patch.object( + trunk_plugin.TrunkPlugin, 'check_compatibility').start() + self.trunk_plugin = trunk_plugin.TrunkPlugin() + self.trunk_plugin.add_segmentation_type('vlan', lambda x: True) + self.core_plugin = manager.NeutronManager.get_plugin() + + def _create_test_trunk(self, port, subports=None): + subports = subports if subports else [] + trunk = {'port_id': port['port']['id'], + 'tenant_id': 'test_tenant', + 'sub_ports': subports + } + response = ( + self.trunk_plugin.create_trunk(self.context, {'trunk': trunk})) + return response + + @mock.patch("neutron.api.rpc.callbacks.resource_manager." + "ResourceCallbacksManager.register") + @mock.patch("neutron.common.rpc.get_server") + def test___init__(self, mocked_get_server, mocked_registered): + test_obj = server.TrunkSkeleton() + mocked_registered.assert_called_with(server.trunk_by_port_provider, + resources.TRUNK) + trunk_target = oslo_messaging.Target(topic=rpc_consts.TRUNK_BASE_TOPIC, + server=cfg.CONF.host, + fanout=False) + mocked_get_server.assert_called_with(trunk_target, [test_obj]) + + def test_update_subport_bindings(self): + with self.port() as _parent_port: + parent_port = _parent_port + trunk = self._create_test_trunk(parent_port) + port_data = {portbindings.HOST_ID: 'trunk_host_id'} + self.core_plugin.update_port( + self.context, parent_port['port']['id'], {'port': port_data}) + subports = [] + for vid in range(0, 3): + with self.port() as new_port: + obj = trunk_obj.SubPort( + context=self.context, + trunk_id=trunk['id'], + port_id=new_port['port']['id'], + segmentation_type='vlan', + segmentation_id=vid) + subports.append(obj) + + test_obj = server.TrunkSkeleton() + test_obj._trunk_plugin = self.trunk_plugin + test_obj._core_plugin = self.core_plugin + updated_subports = test_obj.update_subport_bindings(self.context, + subports=subports) + self.assertIn(trunk['id'], updated_subports) + for port in updated_subports[trunk['id']]: + self.assertEqual('trunk_host_id', port[portbindings.HOST_ID]) + + @mock.patch('neutron.api.rpc.callbacks.producer.registry.provide') + def test_update_trunk_status(self, _): + with self.port() as _parent_port: + parent_port = _parent_port + trunk = self._create_test_trunk(parent_port) + trunk_id = trunk['id'] + + test_obj = server.TrunkSkeleton() + test_obj._trunk_plugin = self.trunk_plugin + self.assertEqual(constants.PENDING_STATUS, trunk['status']) + test_obj.update_trunk_status(self.context, + trunk_id, + constants.ACTIVE_STATUS) + updated_trunk = self.trunk_plugin.get_trunk(self.context, trunk_id) + self.assertEqual(constants.ACTIVE_STATUS, updated_trunk['status']) + + +class TrunkStubTest(base.BaseTestCase): + def setUp(self): + super(TrunkStubTest, self).setUp() + self.test_obj = server.TrunkStub() + + def test___init__(self): + self.assertIsInstance(self.test_obj._resource_rpc, + resources_rpc.ResourcesPushRpcApi) + + @mock.patch("neutron.api.rpc.handlers.resources_rpc.ResourcesPushRpcApi." + "push") + def test_trunk_created(self, mocked_push): + m_context = mock.Mock() + m_trunk = mock.Mock() + self.test_obj.trunk_created(m_context, m_trunk) + mocked_push.assert_called_with(m_context, [m_trunk], events.CREATED) + + @mock.patch("neutron.api.rpc.handlers.resources_rpc.ResourcesPushRpcApi." + "push") + def test_trunk_deleted(self, mocked_push): + m_context = mock.Mock() + m_trunk = mock.Mock() + self.test_obj.trunk_deleted(m_context, m_trunk) + mocked_push.assert_called_with(m_context, [m_trunk], events.DELETED) + + @mock.patch("neutron.api.rpc.handlers.resources_rpc.ResourcesPushRpcApi." + "push") + def test_subports_added(self, mocked_push): + m_context = mock.Mock() + m_subports = mock.Mock() + self.test_obj.subports_added(m_context, m_subports) + mocked_push.assert_called_with(m_context, m_subports, events.CREATED) + + @mock.patch("neutron.api.rpc.handlers.resources_rpc.ResourcesPushRpcApi." + "push") + def test_subports_deleted(self, mocked_push): + m_context = mock.Mock() + m_subports = mock.Mock() + self.test_obj.subports_deleted(m_context, m_subports) + mocked_push.assert_called_with(m_context, m_subports, events.DELETED)