diff --git a/neutron/plugins/ml2/drivers/agent/_common_agent.py b/neutron/plugins/ml2/drivers/agent/_common_agent.py index 105f9cd10a4..340746dd3d7 100644 --- a/neutron/plugins/ml2/drivers/agent/_common_agent.py +++ b/neutron/plugins/ml2/drivers/agent/_common_agent.py @@ -40,6 +40,7 @@ from neutron.common import constants as n_const from neutron.common import topics from neutron import context from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb +from neutron.plugins.ml2.drivers.agent import capabilities from neutron.plugins.ml2.drivers.agent import config as cagt_config # noqa LOG = logging.getLogger(__name__) @@ -105,6 +106,7 @@ class CommonAgentLoop(service.Service): self._report_state) heartbeat.start(interval=report_interval) + capabilities.notify_init_event(self.agent_type, self) # The initialization is complete; we can start receiving messages self.connection.consume_in_threads() diff --git a/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_capabilities.py b/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_capabilities.py new file mode 100644 index 00000000000..aac3c6b1c92 --- /dev/null +++ b/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_capabilities.py @@ -0,0 +1,25 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from neutron_lib import constants + +from neutron.plugins.ml2.drivers.agent import capabilities +from neutron.services.trunk.drivers.linuxbridge.agent import driver + + +def register(): + """Register Linux Bridge capabilities.""" + # Add capabilities to be loaded during agent initialization + capabilities.register(driver.init_handler, + constants.AGENT_TYPE_LINUXBRIDGE) diff --git a/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py b/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py index aa65eec6b40..47ac0aead9d 100644 --- a/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py @@ -53,6 +53,8 @@ from neutron.plugins.ml2.drivers.linuxbridge.agent.common \ import constants as lconst from neutron.plugins.ml2.drivers.linuxbridge.agent.common \ import utils as lb_utils +from neutron.plugins.ml2.drivers.linuxbridge.agent \ + import linuxbridge_capabilities LOG = logging.getLogger(__name__) @@ -927,6 +929,7 @@ def main(): LOG.info(_LI("Bridge mappings: %s"), bridge_mappings) manager = LinuxBridgeManager(bridge_mappings, interface_mappings) + linuxbridge_capabilities.register() polling_interval = cfg.CONF.AGENT.polling_interval quitting_rpc_timeout = cfg.CONF.AGENT.quitting_rpc_timeout diff --git a/neutron/services/trunk/drivers/linuxbridge/agent/driver.py b/neutron/services/trunk/drivers/linuxbridge/agent/driver.py new file mode 100644 index 00000000000..0e16bf05591 --- /dev/null +++ b/neutron/services/trunk/drivers/linuxbridge/agent/driver.py @@ -0,0 +1,209 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging +import oslo_messaging + +from neutron._i18n import _LE +from neutron.api.rpc.callbacks import events +from neutron.api.rpc.handlers import resources_rpc +from neutron.callbacks import events as local_events +from neutron.callbacks import registry +from neutron.callbacks import resources as local_resources +from neutron import context as n_ctx +from neutron.services.trunk import constants as t_const +from neutron.services.trunk.drivers.linuxbridge.agent import trunk_plumber +from neutron.services.trunk.rpc import agent as trunk_rpc + + +LOG = logging.getLogger(__name__) + + +def init_handler(resource, event, trigger, agent=None): + """Handler for agent init event.""" + if agent: + LinuxBridgeTrunkDriver() + + +class LinuxBridgeTrunkDriver(trunk_rpc.TrunkSkeleton): + """Driver responsible for handling trunk/subport/port events. + + Receives data model events from the server and VIF events + from the agent and uses these to drive a Plumber instance + to wire up VLAN subinterfaces for any trunks. + """ + + def __init__(self, plumber=None, trunk_api=None): + self._plumber = plumber or trunk_plumber.Plumber() + self._tapi = trunk_api or _TrunkAPI(trunk_rpc.TrunkStub()) + registry.subscribe(self.agent_port_change, + local_resources.PORT_DEVICE, + local_events.AFTER_UPDATE) + registry.subscribe(self.agent_port_delete, + local_resources.PORT_DEVICE, + local_events.AFTER_DELETE) + super(LinuxBridgeTrunkDriver, self).__init__() + + def handle_trunks(self, trunks, event_type): + """Trunk data model change from the server.""" + context = n_ctx.get_admin_context() + for trunk in trunks: + if event_type in (events.UPDATED, events.CREATED): + self._tapi.put_trunk(trunk.port_id, trunk) + self.wire_trunk(context, trunk) + elif event_type == events.DELETED: + self._tapi.put_trunk(trunk.port_id, None) + self._plumber.delete_trunk_subports(trunk) + + def handle_subports(self, subports, event_type): + """Subport data model change from the server.""" + context = n_ctx.get_admin_context() + affected_trunks = set() + if event_type == events.DELETED: + method = self._tapi.delete_trunk_subport + else: + method = self._tapi.put_trunk_subport + for s in subports: + affected_trunks.add(s['trunk_id']) + method(s['trunk_id'], s) + for trunk_id in affected_trunks: + trunk = self._tapi.get_trunk(context, trunk_id) + if not trunk: + continue + self.wire_trunk(context, trunk) + + def agent_port_delete(self, resource, event, trigger, context, port_id, + **kwargs): + """Agent informed us a VIF was removed.""" + # NOTE(kevinbenton): we don't need to do anything to cleanup VLAN + # interfaces if a trunk was removed because the kernel will do that + # for us. We also don't update the trunk status to DOWN because we + # don't want to race with another agent that the trunk may have been + # moved to. + + def agent_port_change(self, resource, event, trigger, context, + device_details, **kwargs): + """The agent hath informed us thusly of a port update or create.""" + trunk = self._tapi.get_trunk(context, device_details['port_id']) + if trunk: + # a wild trunk has appeared! make its children + self.wire_trunk(context, trunk) + return + # clear any VLANs in case this was a trunk that changed status while + # agent was offline. + self._plumber.delete_subports_by_port_id(device_details['port_id']) + if self._tapi.get_trunk_for_subport(context, + device_details['port_id']): + # This is a subport. We need to ensure the correct mac address is + # set now that we have the port data to see the data model MAC. + self._plumber.set_port_mac(device_details['port_id'], + device_details['mac_address']) + + def wire_trunk(self, context, trunk): + """Wire up subports while keeping the server trunk status apprised.""" + if not self._plumber.trunk_on_host(trunk): + LOG.debug("Trunk %s not present on this host", trunk.port_id) + return + self._tapi.bind_subports_to_host(context, trunk) + try: + self._plumber.ensure_trunk_subports(trunk) + self._tapi.set_trunk_status(context, trunk, t_const.ACTIVE_STATUS) + except Exception: + if not self._plumber.trunk_on_host(trunk): + LOG.debug("Trunk %s removed during wiring", trunk.port_id) + return + # something broke + LOG.exception(_LE("Failure setting up subports for %s"), + trunk.port_id) + self._tapi.set_trunk_status(context, trunk, + t_const.DEGRADED_STATUS) + + +class _TrunkAPI(object): + """Our secret stash of trunks stored by port ID. Tell no one.""" + + def __init__(self, trunk_stub): + self.server_api = trunk_stub + self._trunk_by_port_id = {} + self._trunk_by_id = {} + self._sub_port_id_to_trunk_port_id = {} + + def _fetch_trunk(self, context, port_id): + try: + t = self.server_api.get_trunk_details(context, port_id) + LOG.debug("Found trunk %(t)s for port %(p)s", dict(p=port_id, t=t)) + return t + except resources_rpc.ResourceNotFound: + return None + except oslo_messaging.RemoteError as e: + if 'CallbackNotFound' not in str(e): + raise + LOG.debug("Trunk plugin disabled on server. Assuming port %s is " + "not a trunk.", port_id) + return None + + def set_trunk_status(self, context, trunk, status): + self.server_api.update_trunk_status(context, trunk.id, status) + + def bind_subports_to_host(self, context, trunk): + self.server_api.update_subport_bindings(context, trunk.sub_ports) + + def put_trunk_subport(self, trunk_id, subport): + LOG.debug("Adding subport %(sub)s to trunk %(trunk)s", + dict(sub=subport, trunk=trunk_id)) + if trunk_id not in self._trunk_by_id: + # not on this agent + return + trunk = self._trunk_by_id[trunk_id] + trunk.sub_ports = [s for s in trunk.sub_ports + if s.port_id != subport.port_id] + [subport] + + def delete_trunk_subport(self, trunk_id, subport): + LOG.debug("Removing subport %(sub)s from trunk %(trunk)s", + dict(sub=subport, trunk=trunk_id)) + if trunk_id not in self._trunk_by_id: + # not on this agent + return + trunk = self._trunk_by_id[trunk_id] + trunk.sub_ports = [s for s in trunk.sub_ports + if s.port_id != subport.port_id] + + def put_trunk(self, port_id, trunk): + if port_id in self._trunk_by_port_id: + # already existed. expunge sub_port cross ref + self._sub_port_id_to_trunk_port_id = { + s: p for s, p in self._sub_port_id_to_trunk_port_id.items() + if p != port_id} + self._trunk_by_port_id[port_id] = trunk + if not trunk: + return + self._trunk_by_id[trunk.id] = trunk + for sub in trunk.sub_ports: + self._sub_port_id_to_trunk_port_id[sub.port_id] = trunk.port_id + + def get_trunk(self, context, port_id): + """Gets trunk object for port_id. None if not trunk.""" + if port_id not in self._trunk_by_port_id: + # TODO(kevinbenton): ask the server for *all* trunk port IDs on + # start and eliminate asking the server if every port is a trunk + # TODO(kevinbenton): clear this on AMQP reconnect + LOG.debug("Cache miss for port %s, fetching from server", port_id) + self.put_trunk(port_id, self._fetch_trunk(context, port_id)) + return self.get_trunk(context, port_id) + return self._trunk_by_port_id[port_id] + + def get_trunk_for_subport(self, context, port_id): + """Returns trunk if port_id is a subport, else None.""" + trunk_port = self._sub_port_id_to_trunk_port_id.get(port_id) + if trunk_port: + return self.get_trunk(context, trunk_port) diff --git a/neutron/tests/unit/services/trunk/drivers/linuxbridge/agent/test_driver.py b/neutron/tests/unit/services/trunk/drivers/linuxbridge/agent/test_driver.py new file mode 100644 index 00000000000..ea2380ae616 --- /dev/null +++ b/neutron/tests/unit/services/trunk/drivers/linuxbridge/agent/test_driver.py @@ -0,0 +1,238 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import oslo_messaging +from oslo_utils import uuidutils +import testtools + +from neutron.api.rpc.callbacks import events +from neutron.api.rpc.handlers import resources_rpc +from neutron.objects import trunk +from neutron.services.trunk import constants as t_const +from neutron.services.trunk.drivers.linuxbridge.agent import driver +from neutron.services.trunk.drivers.linuxbridge.agent import trunk_plumber +from neutron.tests import base + + +class LinuxBridgeTrunkDriverTestCase(base.BaseTestCase): + def setUp(self): + super(LinuxBridgeTrunkDriverTestCase, self).setUp() + self.plumber = mock.create_autospec(trunk_plumber.Plumber()) + self.stub = mock.create_autospec(driver.trunk_rpc.TrunkStub()) + self.tapi = mock.create_autospec(driver._TrunkAPI(self.stub)) + self.lbd = driver.LinuxBridgeTrunkDriver(self.plumber, self.tapi) + self.trunk = trunk.Trunk(id=uuidutils.generate_uuid(), + port_id=uuidutils.generate_uuid(), + tenant_id=uuidutils.generate_uuid()) + self.subports = [trunk.SubPort(id=uuidutils.generate_uuid(), + port_id=uuidutils.generate_uuid(), + segmentation_type='vlan', + trunk_id=self.trunk.id, + segmentation_id=i) + for i in range(20)] + self.trunk.sub_ports = self.subports + + def test_handle_trunks_created(self): + self._test_handle_trunks_wire_event(events.CREATED) + + def test_handle_trunks_updated(self): + self._test_handle_trunks_wire_event(events.UPDATED) + + def _test_handle_trunks_wire_event(self, event): + self.plumber.trunk_on_host.return_value = True + self.lbd.handle_trunks([self.trunk], event) + self.tapi.put_trunk.assert_called_once_with( + self.trunk.port_id, self.trunk) + self.tapi.bind_subports_to_host.assert_called_once_with( + mock.ANY, self.trunk) + self.assertFalse(self.plumber.delete_trunk_subports.called) + + def test_handle_trunks_deleted(self): + self.lbd.handle_trunks([self.trunk], events.DELETED) + self.tapi.put_trunk.assert_called_once_with( + self.trunk.port_id, None) + self.plumber.delete_trunk_subports.assert_called_once_with(self.trunk) + + def test_handle_subports_deleted(self): + self.tapi.get_trunk.return_value = self.trunk + self.lbd.handle_subports(self.trunk.sub_ports, events.DELETED) + self.assertEqual(20, len(self.tapi.delete_trunk_subport.mock_calls)) + # should have tried to wire trunk at the end with state + self.plumber.trunk_on_host.assert_called_once_with(self.trunk) + + def test_handle_subports_created(self): + self.tapi.get_trunk.return_value = self.trunk + self.lbd.handle_subports(self.trunk.sub_ports, events.CREATED) + self.assertEqual(20, len(self.tapi.put_trunk_subport.mock_calls)) + # should have tried to wire trunk at the end with state + self.plumber.trunk_on_host.assert_called_once_with(self.trunk) + + def test_agent_port_change_is_trunk(self): + self.tapi.get_trunk.return_value = self.trunk + self.lbd.agent_port_change('resource', 'event', 'trigger', 'context', + {'port_id': self.trunk.port_id}) + # should have tried to wire trunk + self.plumber.trunk_on_host.assert_called_once_with(self.trunk) + + def test_agent_port_change_not_trunk(self): + self.tapi.get_trunk.return_value = None + self.tapi.get_trunk_for_subport.return_value = None + other_port_id = uuidutils.generate_uuid() + self.lbd.agent_port_change('resource', 'event', 'trigger', 'context', + {'port_id': other_port_id}) + self.plumber.delete_subports_by_port_id.assert_called_once_with( + other_port_id) + + def test_agent_port_change_is_subport(self): + self.tapi.get_trunk.return_value = None + self.tapi.get_trunk_for_subport.return_value = self.trunk + self.lbd.agent_port_change('resource', 'event', 'trigger', 'context', + {'port_id': self.trunk.sub_ports[0].port_id, + 'mac_address': 'mac_addr'}) + self.plumber.delete_subports_by_port_id.assert_called_once_with( + self.trunk.sub_ports[0].port_id) + self.tapi.get_trunk_for_subport.assert_called_once_with( + mock.ANY, self.trunk.sub_ports[0].port_id) + self.plumber.set_port_mac.assert_called_once_with( + self.trunk.sub_ports[0].port_id, 'mac_addr') + + def test_wire_trunk_happy_path(self): + self.lbd.wire_trunk('ctx', self.trunk) + self.tapi.bind_subports_to_host.assert_called_once_with( + 'ctx', self.trunk) + self.plumber.ensure_trunk_subports.assert_called_once_with(self.trunk) + self.tapi.set_trunk_status.assert_called_once_with( + 'ctx', self.trunk, t_const.ACTIVE_STATUS) + + def test_wire_trunk_not_on_host(self): + # trunk device not on host + self.plumber.trunk_on_host.return_value = False + self.lbd.wire_trunk('ctx', self.trunk) + # don't bind and don't set status + self.assertFalse(self.tapi.bind_subports_to_host.called) + self.assertFalse(self.tapi.set_trunk_status.called) + + def test_wire_trunk_concurrent_removal(self): + self.plumber.trunk_on_host.side_effect = [True, False] + self.plumber.ensure_trunk_subports.side_effect = ValueError() + self.lbd.wire_trunk('ctx', self.trunk) + # we don't change status if port was just removed + self.assertFalse(self.tapi.set_trunk_status.called) + + def test_wire_trunk_other_exception(self): + self.plumber.ensure_trunk_subports.side_effect = ValueError() + self.lbd.wire_trunk('ctx', self.trunk) + # degraded due to dataplane failure + self.tapi.set_trunk_status.assert_called_once_with( + 'ctx', self.trunk, t_const.DEGRADED_STATUS) + + +class TrunkAPITestCase(base.BaseTestCase): + def setUp(self): + super(TrunkAPITestCase, self).setUp() + self.stub = mock.create_autospec(driver.trunk_rpc.TrunkStub()) + self.tapi = driver._TrunkAPI(self.stub) + self.trunk = trunk.Trunk(id=uuidutils.generate_uuid(), + port_id=uuidutils.generate_uuid(), + tenant_id=uuidutils.generate_uuid()) + self.subports = [trunk.SubPort(id=uuidutils.generate_uuid(), + port_id=uuidutils.generate_uuid(), + segmentation_type='vlan', + trunk_id=self.trunk.id, + segmentation_id=i) + for i in range(20)] + self.trunk.sub_ports = self.subports + self.stub.get_trunk_details.return_value = self.trunk + + def test_fetch_trunk(self): + self.assertEqual(self.trunk, self.tapi._fetch_trunk('ctx', 'port')) + self.stub.get_trunk_details.assert_called_once_with('ctx', 'port') + + def test_fetch_trunk_missing(self): + self.stub.get_trunk_details.side_effect = ( + resources_rpc.ResourceNotFound(resource_id='1', resource_type='1')) + self.assertIsNone(self.tapi._fetch_trunk('ctx', 'port')) + + def test_fetch_trunk_plugin_disabled(self): + self.stub.get_trunk_details.side_effect = ( + oslo_messaging.RemoteError('CallbackNotFound')) + self.assertIsNone(self.tapi._fetch_trunk('ctx', 'port')) + + def test_fetch_trunk_plugin_other_error(self): + self.stub.get_trunk_details.side_effect = ( + oslo_messaging.RemoteError('vacuum full')) + with testtools.ExpectedException(oslo_messaging.RemoteError): + self.tapi._fetch_trunk('ctx', 'port') + + def test_set_trunk_status(self): + self.tapi.set_trunk_status('ctx', self.trunk, 'STATUS') + self.stub.update_trunk_status.assert_called_once_with( + 'ctx', self.trunk.id, 'STATUS') + + def test_bind_subports_to_host(self): + self.tapi.bind_subports_to_host('ctx', self.trunk) + self.stub.update_subport_bindings.assert_called_once_with( + 'ctx', self.trunk.sub_ports) + + def test_put_trunk_subport_non_existent_trunk(self): + # trunks not registered are ignored + self.tapi.put_trunk_subport( + 'non_trunk_id', self.trunk.sub_ports[0]) + + def test_put_trunk_subport(self): + self.tapi.put_trunk(self.trunk.port_id, self.trunk) + new = trunk.SubPort(id=uuidutils.generate_uuid(), + port_id=uuidutils.generate_uuid(), + segmentation_type='vlan', + trunk_id=self.trunk.id, + segmentation_id=1010) + self.tapi.put_trunk_subport(self.trunk.id, new) + subs = self.tapi.get_trunk('ctx', self.trunk.port_id).sub_ports + self.assertEqual(21, len(subs)) + self.assertEqual(new, subs[-1]) + + def test_delete_trunk_subport(self): + self.tapi.put_trunk(self.trunk.port_id, self.trunk) + sub = self.trunk.sub_ports[10] + self.tapi.delete_trunk_subport(self.trunk.id, sub) + subs = self.tapi.get_trunk('ctx', self.trunk.port_id).sub_ports + self.assertNotIn(sub, subs) + self.assertEqual(19, len(subs)) + + def test_get_trunk(self): + self.tapi.put_trunk(self.trunk.port_id, self.trunk) + self.assertEqual(self.trunk, + self.tapi.get_trunk('ctx', self.trunk.port_id)) + self.tapi.get_trunk('ctx', self.trunk.port_id) + self.assertFalse(self.stub.get_trunk_details.called) + + def test_get_trunk_cache_miss(self): + self.assertEqual(self.trunk, + self.tapi.get_trunk('ctx', self.trunk.port_id)) + self.tapi.get_trunk('ctx', self.trunk.port_id) + self.assertEqual(1, len(self.stub.get_trunk_details.mock_calls)) + + def test_get_trunk_not_found(self): + self.stub.get_trunk_details.side_effect = ( + resources_rpc.ResourceNotFound(resource_id='1', resource_type='1')) + self.assertIsNone(self.tapi.get_trunk('ctx', self.trunk.port_id)) + self.tapi.get_trunk('ctx', self.trunk.port_id) + self.assertEqual(1, len(self.stub.get_trunk_details.mock_calls)) + + def test_get_trunk_for_subport(self): + self.tapi.put_trunk(self.trunk.port_id, self.trunk) + t = self.tapi.get_trunk_for_subport( + 'ctx', self.trunk.sub_ports[0].port_id) + self.assertEqual(self.trunk, t)