Implementation of vlan-aware-vms for Linux Bridge

This is the agent-side implementation of vlan-aware-vms for
the Linux Bridge agent. It implements the feature using
vlan subinterfaces.

Whenever subports are required, the linux bridge trunk driver
will create vlan devices off of the parent port device following
the same naming scheme as normal ports. This allows the normal
agent loop to see these VLAN ports and wire them like any other
port so the trunk logic doesn't have to concern itself with things
like firewall rules, anti-spoofing, or encapsulation onto the
physical network.

How to try:

* enable the Linux Bridge mech driver and install the Linux Bridge agent
* enable the 'trunk' service plugin
* make a port, turn it into a trunk, attach it to a VM (or boot a VM with it)
* add subports and configure your guest on the corresponding VLAN
* don't forget these subports have security groups so add allow rules!

Partially-implements: blueprint vlan-aware-vms
Change-Id: I688d5b25885c1c3938185467b15502ccf65cf935
This commit is contained in:
Kevin Benton 2016-07-31 08:22:47 -07:00
parent 19e4b107f0
commit 2e882a9496
5 changed files with 477 additions and 0 deletions

View File

@ -40,6 +40,7 @@ from neutron.common import constants as n_const
from neutron.common import topics
from neutron import context
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
from neutron.plugins.ml2.drivers.agent import capabilities
from neutron.plugins.ml2.drivers.agent import config as cagt_config # noqa
LOG = logging.getLogger(__name__)
@ -105,6 +106,7 @@ class CommonAgentLoop(service.Service):
self._report_state)
heartbeat.start(interval=report_interval)
capabilities.notify_init_event(self.agent_type, self)
# The initialization is complete; we can start receiving messages
self.connection.consume_in_threads()

View File

@ -0,0 +1,25 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from neutron_lib import constants
from neutron.plugins.ml2.drivers.agent import capabilities
from neutron.services.trunk.drivers.linuxbridge.agent import driver
def register():
"""Register Linux Bridge capabilities."""
# Add capabilities to be loaded during agent initialization
capabilities.register(driver.init_handler,
constants.AGENT_TYPE_LINUXBRIDGE)

View File

@ -53,6 +53,8 @@ from neutron.plugins.ml2.drivers.linuxbridge.agent.common \
import constants as lconst
from neutron.plugins.ml2.drivers.linuxbridge.agent.common \
import utils as lb_utils
from neutron.plugins.ml2.drivers.linuxbridge.agent \
import linuxbridge_capabilities
LOG = logging.getLogger(__name__)
@ -933,6 +935,7 @@ def main():
LOG.info(_LI("Bridge mappings: %s"), bridge_mappings)
manager = LinuxBridgeManager(bridge_mappings, interface_mappings)
linuxbridge_capabilities.register()
polling_interval = cfg.CONF.AGENT.polling_interval
quitting_rpc_timeout = cfg.CONF.AGENT.quitting_rpc_timeout

View File

@ -0,0 +1,209 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
import oslo_messaging
from neutron._i18n import _LE
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.handlers import resources_rpc
from neutron.callbacks import events as local_events
from neutron.callbacks import registry
from neutron.callbacks import resources as local_resources
from neutron import context as n_ctx
from neutron.services.trunk import constants as t_const
from neutron.services.trunk.drivers.linuxbridge.agent import trunk_plumber
from neutron.services.trunk.rpc import agent as trunk_rpc
LOG = logging.getLogger(__name__)
def init_handler(resource, event, trigger, agent=None):
"""Handler for agent init event."""
if agent:
LinuxBridgeTrunkDriver()
class LinuxBridgeTrunkDriver(trunk_rpc.TrunkSkeleton):
"""Driver responsible for handling trunk/subport/port events.
Receives data model events from the server and VIF events
from the agent and uses these to drive a Plumber instance
to wire up VLAN subinterfaces for any trunks.
"""
def __init__(self, plumber=None, trunk_api=None):
self._plumber = plumber or trunk_plumber.Plumber()
self._tapi = trunk_api or _TrunkAPI(trunk_rpc.TrunkStub())
registry.subscribe(self.agent_port_change,
local_resources.PORT_DEVICE,
local_events.AFTER_UPDATE)
registry.subscribe(self.agent_port_delete,
local_resources.PORT_DEVICE,
local_events.AFTER_DELETE)
super(LinuxBridgeTrunkDriver, self).__init__()
def handle_trunks(self, trunks, event_type):
"""Trunk data model change from the server."""
context = n_ctx.get_admin_context()
for trunk in trunks:
if event_type in (events.UPDATED, events.CREATED):
self._tapi.put_trunk(trunk.port_id, trunk)
self.wire_trunk(context, trunk)
elif event_type == events.DELETED:
self._tapi.put_trunk(trunk.port_id, None)
self._plumber.delete_trunk_subports(trunk)
def handle_subports(self, subports, event_type):
"""Subport data model change from the server."""
context = n_ctx.get_admin_context()
affected_trunks = set()
if event_type == events.DELETED:
method = self._tapi.delete_trunk_subport
else:
method = self._tapi.put_trunk_subport
for s in subports:
affected_trunks.add(s['trunk_id'])
method(s['trunk_id'], s)
for trunk_id in affected_trunks:
trunk = self._tapi.get_trunk(context, trunk_id)
if not trunk:
continue
self.wire_trunk(context, trunk)
def agent_port_delete(self, resource, event, trigger, context, port_id,
**kwargs):
"""Agent informed us a VIF was removed."""
# NOTE(kevinbenton): we don't need to do anything to cleanup VLAN
# interfaces if a trunk was removed because the kernel will do that
# for us. We also don't update the trunk status to DOWN because we
# don't want to race with another agent that the trunk may have been
# moved to.
def agent_port_change(self, resource, event, trigger, context,
device_details, **kwargs):
"""The agent hath informed us thusly of a port update or create."""
trunk = self._tapi.get_trunk(context, device_details['port_id'])
if trunk:
# a wild trunk has appeared! make its children
self.wire_trunk(context, trunk)
return
# clear any VLANs in case this was a trunk that changed status while
# agent was offline.
self._plumber.delete_subports_by_port_id(device_details['port_id'])
if self._tapi.get_trunk_for_subport(context,
device_details['port_id']):
# This is a subport. We need to ensure the correct mac address is
# set now that we have the port data to see the data model MAC.
self._plumber.set_port_mac(device_details['port_id'],
device_details['mac_address'])
def wire_trunk(self, context, trunk):
"""Wire up subports while keeping the server trunk status apprised."""
if not self._plumber.trunk_on_host(trunk):
LOG.debug("Trunk %s not present on this host", trunk.port_id)
return
self._tapi.bind_subports_to_host(context, trunk)
try:
self._plumber.ensure_trunk_subports(trunk)
self._tapi.set_trunk_status(context, trunk, t_const.ACTIVE_STATUS)
except Exception:
if not self._plumber.trunk_on_host(trunk):
LOG.debug("Trunk %s removed during wiring", trunk.port_id)
return
# something broke
LOG.exception(_LE("Failure setting up subports for %s"),
trunk.port_id)
self._tapi.set_trunk_status(context, trunk,
t_const.DEGRADED_STATUS)
class _TrunkAPI(object):
"""Our secret stash of trunks stored by port ID. Tell no one."""
def __init__(self, trunk_stub):
self.server_api = trunk_stub
self._trunk_by_port_id = {}
self._trunk_by_id = {}
self._sub_port_id_to_trunk_port_id = {}
def _fetch_trunk(self, context, port_id):
try:
t = self.server_api.get_trunk_details(context, port_id)
LOG.debug("Found trunk %(t)s for port %(p)s", dict(p=port_id, t=t))
return t
except resources_rpc.ResourceNotFound:
return None
except oslo_messaging.RemoteError as e:
if 'CallbackNotFound' not in str(e):
raise
LOG.debug("Trunk plugin disabled on server. Assuming port %s is "
"not a trunk.", port_id)
return None
def set_trunk_status(self, context, trunk, status):
self.server_api.update_trunk_status(context, trunk.id, status)
def bind_subports_to_host(self, context, trunk):
self.server_api.update_subport_bindings(context, trunk.sub_ports)
def put_trunk_subport(self, trunk_id, subport):
LOG.debug("Adding subport %(sub)s to trunk %(trunk)s",
dict(sub=subport, trunk=trunk_id))
if trunk_id not in self._trunk_by_id:
# not on this agent
return
trunk = self._trunk_by_id[trunk_id]
trunk.sub_ports = [s for s in trunk.sub_ports
if s.port_id != subport.port_id] + [subport]
def delete_trunk_subport(self, trunk_id, subport):
LOG.debug("Removing subport %(sub)s from trunk %(trunk)s",
dict(sub=subport, trunk=trunk_id))
if trunk_id not in self._trunk_by_id:
# not on this agent
return
trunk = self._trunk_by_id[trunk_id]
trunk.sub_ports = [s for s in trunk.sub_ports
if s.port_id != subport.port_id]
def put_trunk(self, port_id, trunk):
if port_id in self._trunk_by_port_id:
# already existed. expunge sub_port cross ref
self._sub_port_id_to_trunk_port_id = {
s: p for s, p in self._sub_port_id_to_trunk_port_id.items()
if p != port_id}
self._trunk_by_port_id[port_id] = trunk
if not trunk:
return
self._trunk_by_id[trunk.id] = trunk
for sub in trunk.sub_ports:
self._sub_port_id_to_trunk_port_id[sub.port_id] = trunk.port_id
def get_trunk(self, context, port_id):
"""Gets trunk object for port_id. None if not trunk."""
if port_id not in self._trunk_by_port_id:
# TODO(kevinbenton): ask the server for *all* trunk port IDs on
# start and eliminate asking the server if every port is a trunk
# TODO(kevinbenton): clear this on AMQP reconnect
LOG.debug("Cache miss for port %s, fetching from server", port_id)
self.put_trunk(port_id, self._fetch_trunk(context, port_id))
return self.get_trunk(context, port_id)
return self._trunk_by_port_id[port_id]
def get_trunk_for_subport(self, context, port_id):
"""Returns trunk if port_id is a subport, else None."""
trunk_port = self._sub_port_id_to_trunk_port_id.get(port_id)
if trunk_port:
return self.get_trunk(context, trunk_port)

View File

@ -0,0 +1,238 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import oslo_messaging
from oslo_utils import uuidutils
import testtools
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.handlers import resources_rpc
from neutron.objects import trunk
from neutron.services.trunk import constants as t_const
from neutron.services.trunk.drivers.linuxbridge.agent import driver
from neutron.services.trunk.drivers.linuxbridge.agent import trunk_plumber
from neutron.tests import base
class LinuxBridgeTrunkDriverTestCase(base.BaseTestCase):
def setUp(self):
super(LinuxBridgeTrunkDriverTestCase, self).setUp()
self.plumber = mock.create_autospec(trunk_plumber.Plumber())
self.stub = mock.create_autospec(driver.trunk_rpc.TrunkStub())
self.tapi = mock.create_autospec(driver._TrunkAPI(self.stub))
self.lbd = driver.LinuxBridgeTrunkDriver(self.plumber, self.tapi)
self.trunk = trunk.Trunk(id=uuidutils.generate_uuid(),
port_id=uuidutils.generate_uuid(),
tenant_id=uuidutils.generate_uuid())
self.subports = [trunk.SubPort(id=uuidutils.generate_uuid(),
port_id=uuidutils.generate_uuid(),
segmentation_type='vlan',
trunk_id=self.trunk.id,
segmentation_id=i)
for i in range(20)]
self.trunk.sub_ports = self.subports
def test_handle_trunks_created(self):
self._test_handle_trunks_wire_event(events.CREATED)
def test_handle_trunks_updated(self):
self._test_handle_trunks_wire_event(events.UPDATED)
def _test_handle_trunks_wire_event(self, event):
self.plumber.trunk_on_host.return_value = True
self.lbd.handle_trunks([self.trunk], event)
self.tapi.put_trunk.assert_called_once_with(
self.trunk.port_id, self.trunk)
self.tapi.bind_subports_to_host.assert_called_once_with(
mock.ANY, self.trunk)
self.assertFalse(self.plumber.delete_trunk_subports.called)
def test_handle_trunks_deleted(self):
self.lbd.handle_trunks([self.trunk], events.DELETED)
self.tapi.put_trunk.assert_called_once_with(
self.trunk.port_id, None)
self.plumber.delete_trunk_subports.assert_called_once_with(self.trunk)
def test_handle_subports_deleted(self):
self.tapi.get_trunk.return_value = self.trunk
self.lbd.handle_subports(self.trunk.sub_ports, events.DELETED)
self.assertEqual(20, len(self.tapi.delete_trunk_subport.mock_calls))
# should have tried to wire trunk at the end with state
self.plumber.trunk_on_host.assert_called_once_with(self.trunk)
def test_handle_subports_created(self):
self.tapi.get_trunk.return_value = self.trunk
self.lbd.handle_subports(self.trunk.sub_ports, events.CREATED)
self.assertEqual(20, len(self.tapi.put_trunk_subport.mock_calls))
# should have tried to wire trunk at the end with state
self.plumber.trunk_on_host.assert_called_once_with(self.trunk)
def test_agent_port_change_is_trunk(self):
self.tapi.get_trunk.return_value = self.trunk
self.lbd.agent_port_change('resource', 'event', 'trigger', 'context',
{'port_id': self.trunk.port_id})
# should have tried to wire trunk
self.plumber.trunk_on_host.assert_called_once_with(self.trunk)
def test_agent_port_change_not_trunk(self):
self.tapi.get_trunk.return_value = None
self.tapi.get_trunk_for_subport.return_value = None
other_port_id = uuidutils.generate_uuid()
self.lbd.agent_port_change('resource', 'event', 'trigger', 'context',
{'port_id': other_port_id})
self.plumber.delete_subports_by_port_id.assert_called_once_with(
other_port_id)
def test_agent_port_change_is_subport(self):
self.tapi.get_trunk.return_value = None
self.tapi.get_trunk_for_subport.return_value = self.trunk
self.lbd.agent_port_change('resource', 'event', 'trigger', 'context',
{'port_id': self.trunk.sub_ports[0].port_id,
'mac_address': 'mac_addr'})
self.plumber.delete_subports_by_port_id.assert_called_once_with(
self.trunk.sub_ports[0].port_id)
self.tapi.get_trunk_for_subport.assert_called_once_with(
mock.ANY, self.trunk.sub_ports[0].port_id)
self.plumber.set_port_mac.assert_called_once_with(
self.trunk.sub_ports[0].port_id, 'mac_addr')
def test_wire_trunk_happy_path(self):
self.lbd.wire_trunk('ctx', self.trunk)
self.tapi.bind_subports_to_host.assert_called_once_with(
'ctx', self.trunk)
self.plumber.ensure_trunk_subports.assert_called_once_with(self.trunk)
self.tapi.set_trunk_status.assert_called_once_with(
'ctx', self.trunk, t_const.ACTIVE_STATUS)
def test_wire_trunk_not_on_host(self):
# trunk device not on host
self.plumber.trunk_on_host.return_value = False
self.lbd.wire_trunk('ctx', self.trunk)
# don't bind and don't set status
self.assertFalse(self.tapi.bind_subports_to_host.called)
self.assertFalse(self.tapi.set_trunk_status.called)
def test_wire_trunk_concurrent_removal(self):
self.plumber.trunk_on_host.side_effect = [True, False]
self.plumber.ensure_trunk_subports.side_effect = ValueError()
self.lbd.wire_trunk('ctx', self.trunk)
# we don't change status if port was just removed
self.assertFalse(self.tapi.set_trunk_status.called)
def test_wire_trunk_other_exception(self):
self.plumber.ensure_trunk_subports.side_effect = ValueError()
self.lbd.wire_trunk('ctx', self.trunk)
# degraded due to dataplane failure
self.tapi.set_trunk_status.assert_called_once_with(
'ctx', self.trunk, t_const.DEGRADED_STATUS)
class TrunkAPITestCase(base.BaseTestCase):
def setUp(self):
super(TrunkAPITestCase, self).setUp()
self.stub = mock.create_autospec(driver.trunk_rpc.TrunkStub())
self.tapi = driver._TrunkAPI(self.stub)
self.trunk = trunk.Trunk(id=uuidutils.generate_uuid(),
port_id=uuidutils.generate_uuid(),
tenant_id=uuidutils.generate_uuid())
self.subports = [trunk.SubPort(id=uuidutils.generate_uuid(),
port_id=uuidutils.generate_uuid(),
segmentation_type='vlan',
trunk_id=self.trunk.id,
segmentation_id=i)
for i in range(20)]
self.trunk.sub_ports = self.subports
self.stub.get_trunk_details.return_value = self.trunk
def test_fetch_trunk(self):
self.assertEqual(self.trunk, self.tapi._fetch_trunk('ctx', 'port'))
self.stub.get_trunk_details.assert_called_once_with('ctx', 'port')
def test_fetch_trunk_missing(self):
self.stub.get_trunk_details.side_effect = (
resources_rpc.ResourceNotFound(resource_id='1', resource_type='1'))
self.assertIsNone(self.tapi._fetch_trunk('ctx', 'port'))
def test_fetch_trunk_plugin_disabled(self):
self.stub.get_trunk_details.side_effect = (
oslo_messaging.RemoteError('CallbackNotFound'))
self.assertIsNone(self.tapi._fetch_trunk('ctx', 'port'))
def test_fetch_trunk_plugin_other_error(self):
self.stub.get_trunk_details.side_effect = (
oslo_messaging.RemoteError('vacuum full'))
with testtools.ExpectedException(oslo_messaging.RemoteError):
self.tapi._fetch_trunk('ctx', 'port')
def test_set_trunk_status(self):
self.tapi.set_trunk_status('ctx', self.trunk, 'STATUS')
self.stub.update_trunk_status.assert_called_once_with(
'ctx', self.trunk.id, 'STATUS')
def test_bind_subports_to_host(self):
self.tapi.bind_subports_to_host('ctx', self.trunk)
self.stub.update_subport_bindings.assert_called_once_with(
'ctx', self.trunk.sub_ports)
def test_put_trunk_subport_non_existent_trunk(self):
# trunks not registered are ignored
self.tapi.put_trunk_subport(
'non_trunk_id', self.trunk.sub_ports[0])
def test_put_trunk_subport(self):
self.tapi.put_trunk(self.trunk.port_id, self.trunk)
new = trunk.SubPort(id=uuidutils.generate_uuid(),
port_id=uuidutils.generate_uuid(),
segmentation_type='vlan',
trunk_id=self.trunk.id,
segmentation_id=1010)
self.tapi.put_trunk_subport(self.trunk.id, new)
subs = self.tapi.get_trunk('ctx', self.trunk.port_id).sub_ports
self.assertEqual(21, len(subs))
self.assertEqual(new, subs[-1])
def test_delete_trunk_subport(self):
self.tapi.put_trunk(self.trunk.port_id, self.trunk)
sub = self.trunk.sub_ports[10]
self.tapi.delete_trunk_subport(self.trunk.id, sub)
subs = self.tapi.get_trunk('ctx', self.trunk.port_id).sub_ports
self.assertNotIn(sub, subs)
self.assertEqual(19, len(subs))
def test_get_trunk(self):
self.tapi.put_trunk(self.trunk.port_id, self.trunk)
self.assertEqual(self.trunk,
self.tapi.get_trunk('ctx', self.trunk.port_id))
self.tapi.get_trunk('ctx', self.trunk.port_id)
self.assertFalse(self.stub.get_trunk_details.called)
def test_get_trunk_cache_miss(self):
self.assertEqual(self.trunk,
self.tapi.get_trunk('ctx', self.trunk.port_id))
self.tapi.get_trunk('ctx', self.trunk.port_id)
self.assertEqual(1, len(self.stub.get_trunk_details.mock_calls))
def test_get_trunk_not_found(self):
self.stub.get_trunk_details.side_effect = (
resources_rpc.ResourceNotFound(resource_id='1', resource_type='1'))
self.assertIsNone(self.tapi.get_trunk('ctx', self.trunk.port_id))
self.tapi.get_trunk('ctx', self.trunk.port_id)
self.assertEqual(1, len(self.stub.get_trunk_details.mock_calls))
def test_get_trunk_for_subport(self):
self.tapi.put_trunk(self.trunk.port_id, self.trunk)
t = self.tapi.get_trunk_for_subport(
'ctx', self.trunk.sub_ports[0].port_id)
self.assertEqual(self.trunk, t)