Merge "Implementation of vlan-aware-vms for Linux Bridge"

This commit is contained in:
Jenkins 2016-09-15 01:10:50 +00:00 committed by Gerrit Code Review
commit b7fdd64cda
5 changed files with 477 additions and 0 deletions

View File

@ -40,6 +40,7 @@ from neutron.common import constants as n_const
from neutron.common import topics
from neutron import context
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
from neutron.plugins.ml2.drivers.agent import capabilities
from neutron.plugins.ml2.drivers.agent import config as cagt_config # noqa
LOG = logging.getLogger(__name__)
@ -105,6 +106,7 @@ class CommonAgentLoop(service.Service):
self._report_state)
heartbeat.start(interval=report_interval)
capabilities.notify_init_event(self.agent_type, self)
# The initialization is complete; we can start receiving messages
self.connection.consume_in_threads()

View File

@ -0,0 +1,25 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from neutron_lib import constants
from neutron.plugins.ml2.drivers.agent import capabilities
from neutron.services.trunk.drivers.linuxbridge.agent import driver
def register():
"""Register Linux Bridge capabilities."""
# Add capabilities to be loaded during agent initialization
capabilities.register(driver.init_handler,
constants.AGENT_TYPE_LINUXBRIDGE)

View File

@ -53,6 +53,8 @@ from neutron.plugins.ml2.drivers.linuxbridge.agent.common \
import constants as lconst
from neutron.plugins.ml2.drivers.linuxbridge.agent.common \
import utils as lb_utils
from neutron.plugins.ml2.drivers.linuxbridge.agent \
import linuxbridge_capabilities
LOG = logging.getLogger(__name__)
@ -927,6 +929,7 @@ def main():
LOG.info(_LI("Bridge mappings: %s"), bridge_mappings)
manager = LinuxBridgeManager(bridge_mappings, interface_mappings)
linuxbridge_capabilities.register()
polling_interval = cfg.CONF.AGENT.polling_interval
quitting_rpc_timeout = cfg.CONF.AGENT.quitting_rpc_timeout

View File

@ -0,0 +1,209 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
import oslo_messaging
from neutron._i18n import _LE
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.handlers import resources_rpc
from neutron.callbacks import events as local_events
from neutron.callbacks import registry
from neutron.callbacks import resources as local_resources
from neutron import context as n_ctx
from neutron.services.trunk import constants as t_const
from neutron.services.trunk.drivers.linuxbridge.agent import trunk_plumber
from neutron.services.trunk.rpc import agent as trunk_rpc
LOG = logging.getLogger(__name__)
def init_handler(resource, event, trigger, agent=None):
"""Handler for agent init event."""
if agent:
LinuxBridgeTrunkDriver()
class LinuxBridgeTrunkDriver(trunk_rpc.TrunkSkeleton):
"""Driver responsible for handling trunk/subport/port events.
Receives data model events from the server and VIF events
from the agent and uses these to drive a Plumber instance
to wire up VLAN subinterfaces for any trunks.
"""
def __init__(self, plumber=None, trunk_api=None):
self._plumber = plumber or trunk_plumber.Plumber()
self._tapi = trunk_api or _TrunkAPI(trunk_rpc.TrunkStub())
registry.subscribe(self.agent_port_change,
local_resources.PORT_DEVICE,
local_events.AFTER_UPDATE)
registry.subscribe(self.agent_port_delete,
local_resources.PORT_DEVICE,
local_events.AFTER_DELETE)
super(LinuxBridgeTrunkDriver, self).__init__()
def handle_trunks(self, trunks, event_type):
"""Trunk data model change from the server."""
context = n_ctx.get_admin_context()
for trunk in trunks:
if event_type in (events.UPDATED, events.CREATED):
self._tapi.put_trunk(trunk.port_id, trunk)
self.wire_trunk(context, trunk)
elif event_type == events.DELETED:
self._tapi.put_trunk(trunk.port_id, None)
self._plumber.delete_trunk_subports(trunk)
def handle_subports(self, subports, event_type):
"""Subport data model change from the server."""
context = n_ctx.get_admin_context()
affected_trunks = set()
if event_type == events.DELETED:
method = self._tapi.delete_trunk_subport
else:
method = self._tapi.put_trunk_subport
for s in subports:
affected_trunks.add(s['trunk_id'])
method(s['trunk_id'], s)
for trunk_id in affected_trunks:
trunk = self._tapi.get_trunk(context, trunk_id)
if not trunk:
continue
self.wire_trunk(context, trunk)
def agent_port_delete(self, resource, event, trigger, context, port_id,
**kwargs):
"""Agent informed us a VIF was removed."""
# NOTE(kevinbenton): we don't need to do anything to cleanup VLAN
# interfaces if a trunk was removed because the kernel will do that
# for us. We also don't update the trunk status to DOWN because we
# don't want to race with another agent that the trunk may have been
# moved to.
def agent_port_change(self, resource, event, trigger, context,
device_details, **kwargs):
"""The agent hath informed us thusly of a port update or create."""
trunk = self._tapi.get_trunk(context, device_details['port_id'])
if trunk:
# a wild trunk has appeared! make its children
self.wire_trunk(context, trunk)
return
# clear any VLANs in case this was a trunk that changed status while
# agent was offline.
self._plumber.delete_subports_by_port_id(device_details['port_id'])
if self._tapi.get_trunk_for_subport(context,
device_details['port_id']):
# This is a subport. We need to ensure the correct mac address is
# set now that we have the port data to see the data model MAC.
self._plumber.set_port_mac(device_details['port_id'],
device_details['mac_address'])
def wire_trunk(self, context, trunk):
"""Wire up subports while keeping the server trunk status apprised."""
if not self._plumber.trunk_on_host(trunk):
LOG.debug("Trunk %s not present on this host", trunk.port_id)
return
self._tapi.bind_subports_to_host(context, trunk)
try:
self._plumber.ensure_trunk_subports(trunk)
self._tapi.set_trunk_status(context, trunk, t_const.ACTIVE_STATUS)
except Exception:
if not self._plumber.trunk_on_host(trunk):
LOG.debug("Trunk %s removed during wiring", trunk.port_id)
return
# something broke
LOG.exception(_LE("Failure setting up subports for %s"),
trunk.port_id)
self._tapi.set_trunk_status(context, trunk,
t_const.DEGRADED_STATUS)
class _TrunkAPI(object):
"""Our secret stash of trunks stored by port ID. Tell no one."""
def __init__(self, trunk_stub):
self.server_api = trunk_stub
self._trunk_by_port_id = {}
self._trunk_by_id = {}
self._sub_port_id_to_trunk_port_id = {}
def _fetch_trunk(self, context, port_id):
try:
t = self.server_api.get_trunk_details(context, port_id)
LOG.debug("Found trunk %(t)s for port %(p)s", dict(p=port_id, t=t))
return t
except resources_rpc.ResourceNotFound:
return None
except oslo_messaging.RemoteError as e:
if 'CallbackNotFound' not in str(e):
raise
LOG.debug("Trunk plugin disabled on server. Assuming port %s is "
"not a trunk.", port_id)
return None
def set_trunk_status(self, context, trunk, status):
self.server_api.update_trunk_status(context, trunk.id, status)
def bind_subports_to_host(self, context, trunk):
self.server_api.update_subport_bindings(context, trunk.sub_ports)
def put_trunk_subport(self, trunk_id, subport):
LOG.debug("Adding subport %(sub)s to trunk %(trunk)s",
dict(sub=subport, trunk=trunk_id))
if trunk_id not in self._trunk_by_id:
# not on this agent
return
trunk = self._trunk_by_id[trunk_id]
trunk.sub_ports = [s for s in trunk.sub_ports
if s.port_id != subport.port_id] + [subport]
def delete_trunk_subport(self, trunk_id, subport):
LOG.debug("Removing subport %(sub)s from trunk %(trunk)s",
dict(sub=subport, trunk=trunk_id))
if trunk_id not in self._trunk_by_id:
# not on this agent
return
trunk = self._trunk_by_id[trunk_id]
trunk.sub_ports = [s for s in trunk.sub_ports
if s.port_id != subport.port_id]
def put_trunk(self, port_id, trunk):
if port_id in self._trunk_by_port_id:
# already existed. expunge sub_port cross ref
self._sub_port_id_to_trunk_port_id = {
s: p for s, p in self._sub_port_id_to_trunk_port_id.items()
if p != port_id}
self._trunk_by_port_id[port_id] = trunk
if not trunk:
return
self._trunk_by_id[trunk.id] = trunk
for sub in trunk.sub_ports:
self._sub_port_id_to_trunk_port_id[sub.port_id] = trunk.port_id
def get_trunk(self, context, port_id):
"""Gets trunk object for port_id. None if not trunk."""
if port_id not in self._trunk_by_port_id:
# TODO(kevinbenton): ask the server for *all* trunk port IDs on
# start and eliminate asking the server if every port is a trunk
# TODO(kevinbenton): clear this on AMQP reconnect
LOG.debug("Cache miss for port %s, fetching from server", port_id)
self.put_trunk(port_id, self._fetch_trunk(context, port_id))
return self.get_trunk(context, port_id)
return self._trunk_by_port_id[port_id]
def get_trunk_for_subport(self, context, port_id):
"""Returns trunk if port_id is a subport, else None."""
trunk_port = self._sub_port_id_to_trunk_port_id.get(port_id)
if trunk_port:
return self.get_trunk(context, trunk_port)

View File

@ -0,0 +1,238 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import oslo_messaging
from oslo_utils import uuidutils
import testtools
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.handlers import resources_rpc
from neutron.objects import trunk
from neutron.services.trunk import constants as t_const
from neutron.services.trunk.drivers.linuxbridge.agent import driver
from neutron.services.trunk.drivers.linuxbridge.agent import trunk_plumber
from neutron.tests import base
class LinuxBridgeTrunkDriverTestCase(base.BaseTestCase):
def setUp(self):
super(LinuxBridgeTrunkDriverTestCase, self).setUp()
self.plumber = mock.create_autospec(trunk_plumber.Plumber())
self.stub = mock.create_autospec(driver.trunk_rpc.TrunkStub())
self.tapi = mock.create_autospec(driver._TrunkAPI(self.stub))
self.lbd = driver.LinuxBridgeTrunkDriver(self.plumber, self.tapi)
self.trunk = trunk.Trunk(id=uuidutils.generate_uuid(),
port_id=uuidutils.generate_uuid(),
tenant_id=uuidutils.generate_uuid())
self.subports = [trunk.SubPort(id=uuidutils.generate_uuid(),
port_id=uuidutils.generate_uuid(),
segmentation_type='vlan',
trunk_id=self.trunk.id,
segmentation_id=i)
for i in range(20)]
self.trunk.sub_ports = self.subports
def test_handle_trunks_created(self):
self._test_handle_trunks_wire_event(events.CREATED)
def test_handle_trunks_updated(self):
self._test_handle_trunks_wire_event(events.UPDATED)
def _test_handle_trunks_wire_event(self, event):
self.plumber.trunk_on_host.return_value = True
self.lbd.handle_trunks([self.trunk], event)
self.tapi.put_trunk.assert_called_once_with(
self.trunk.port_id, self.trunk)
self.tapi.bind_subports_to_host.assert_called_once_with(
mock.ANY, self.trunk)
self.assertFalse(self.plumber.delete_trunk_subports.called)
def test_handle_trunks_deleted(self):
self.lbd.handle_trunks([self.trunk], events.DELETED)
self.tapi.put_trunk.assert_called_once_with(
self.trunk.port_id, None)
self.plumber.delete_trunk_subports.assert_called_once_with(self.trunk)
def test_handle_subports_deleted(self):
self.tapi.get_trunk.return_value = self.trunk
self.lbd.handle_subports(self.trunk.sub_ports, events.DELETED)
self.assertEqual(20, len(self.tapi.delete_trunk_subport.mock_calls))
# should have tried to wire trunk at the end with state
self.plumber.trunk_on_host.assert_called_once_with(self.trunk)
def test_handle_subports_created(self):
self.tapi.get_trunk.return_value = self.trunk
self.lbd.handle_subports(self.trunk.sub_ports, events.CREATED)
self.assertEqual(20, len(self.tapi.put_trunk_subport.mock_calls))
# should have tried to wire trunk at the end with state
self.plumber.trunk_on_host.assert_called_once_with(self.trunk)
def test_agent_port_change_is_trunk(self):
self.tapi.get_trunk.return_value = self.trunk
self.lbd.agent_port_change('resource', 'event', 'trigger', 'context',
{'port_id': self.trunk.port_id})
# should have tried to wire trunk
self.plumber.trunk_on_host.assert_called_once_with(self.trunk)
def test_agent_port_change_not_trunk(self):
self.tapi.get_trunk.return_value = None
self.tapi.get_trunk_for_subport.return_value = None
other_port_id = uuidutils.generate_uuid()
self.lbd.agent_port_change('resource', 'event', 'trigger', 'context',
{'port_id': other_port_id})
self.plumber.delete_subports_by_port_id.assert_called_once_with(
other_port_id)
def test_agent_port_change_is_subport(self):
self.tapi.get_trunk.return_value = None
self.tapi.get_trunk_for_subport.return_value = self.trunk
self.lbd.agent_port_change('resource', 'event', 'trigger', 'context',
{'port_id': self.trunk.sub_ports[0].port_id,
'mac_address': 'mac_addr'})
self.plumber.delete_subports_by_port_id.assert_called_once_with(
self.trunk.sub_ports[0].port_id)
self.tapi.get_trunk_for_subport.assert_called_once_with(
mock.ANY, self.trunk.sub_ports[0].port_id)
self.plumber.set_port_mac.assert_called_once_with(
self.trunk.sub_ports[0].port_id, 'mac_addr')
def test_wire_trunk_happy_path(self):
self.lbd.wire_trunk('ctx', self.trunk)
self.tapi.bind_subports_to_host.assert_called_once_with(
'ctx', self.trunk)
self.plumber.ensure_trunk_subports.assert_called_once_with(self.trunk)
self.tapi.set_trunk_status.assert_called_once_with(
'ctx', self.trunk, t_const.ACTIVE_STATUS)
def test_wire_trunk_not_on_host(self):
# trunk device not on host
self.plumber.trunk_on_host.return_value = False
self.lbd.wire_trunk('ctx', self.trunk)
# don't bind and don't set status
self.assertFalse(self.tapi.bind_subports_to_host.called)
self.assertFalse(self.tapi.set_trunk_status.called)
def test_wire_trunk_concurrent_removal(self):
self.plumber.trunk_on_host.side_effect = [True, False]
self.plumber.ensure_trunk_subports.side_effect = ValueError()
self.lbd.wire_trunk('ctx', self.trunk)
# we don't change status if port was just removed
self.assertFalse(self.tapi.set_trunk_status.called)
def test_wire_trunk_other_exception(self):
self.plumber.ensure_trunk_subports.side_effect = ValueError()
self.lbd.wire_trunk('ctx', self.trunk)
# degraded due to dataplane failure
self.tapi.set_trunk_status.assert_called_once_with(
'ctx', self.trunk, t_const.DEGRADED_STATUS)
class TrunkAPITestCase(base.BaseTestCase):
def setUp(self):
super(TrunkAPITestCase, self).setUp()
self.stub = mock.create_autospec(driver.trunk_rpc.TrunkStub())
self.tapi = driver._TrunkAPI(self.stub)
self.trunk = trunk.Trunk(id=uuidutils.generate_uuid(),
port_id=uuidutils.generate_uuid(),
tenant_id=uuidutils.generate_uuid())
self.subports = [trunk.SubPort(id=uuidutils.generate_uuid(),
port_id=uuidutils.generate_uuid(),
segmentation_type='vlan',
trunk_id=self.trunk.id,
segmentation_id=i)
for i in range(20)]
self.trunk.sub_ports = self.subports
self.stub.get_trunk_details.return_value = self.trunk
def test_fetch_trunk(self):
self.assertEqual(self.trunk, self.tapi._fetch_trunk('ctx', 'port'))
self.stub.get_trunk_details.assert_called_once_with('ctx', 'port')
def test_fetch_trunk_missing(self):
self.stub.get_trunk_details.side_effect = (
resources_rpc.ResourceNotFound(resource_id='1', resource_type='1'))
self.assertIsNone(self.tapi._fetch_trunk('ctx', 'port'))
def test_fetch_trunk_plugin_disabled(self):
self.stub.get_trunk_details.side_effect = (
oslo_messaging.RemoteError('CallbackNotFound'))
self.assertIsNone(self.tapi._fetch_trunk('ctx', 'port'))
def test_fetch_trunk_plugin_other_error(self):
self.stub.get_trunk_details.side_effect = (
oslo_messaging.RemoteError('vacuum full'))
with testtools.ExpectedException(oslo_messaging.RemoteError):
self.tapi._fetch_trunk('ctx', 'port')
def test_set_trunk_status(self):
self.tapi.set_trunk_status('ctx', self.trunk, 'STATUS')
self.stub.update_trunk_status.assert_called_once_with(
'ctx', self.trunk.id, 'STATUS')
def test_bind_subports_to_host(self):
self.tapi.bind_subports_to_host('ctx', self.trunk)
self.stub.update_subport_bindings.assert_called_once_with(
'ctx', self.trunk.sub_ports)
def test_put_trunk_subport_non_existent_trunk(self):
# trunks not registered are ignored
self.tapi.put_trunk_subport(
'non_trunk_id', self.trunk.sub_ports[0])
def test_put_trunk_subport(self):
self.tapi.put_trunk(self.trunk.port_id, self.trunk)
new = trunk.SubPort(id=uuidutils.generate_uuid(),
port_id=uuidutils.generate_uuid(),
segmentation_type='vlan',
trunk_id=self.trunk.id,
segmentation_id=1010)
self.tapi.put_trunk_subport(self.trunk.id, new)
subs = self.tapi.get_trunk('ctx', self.trunk.port_id).sub_ports
self.assertEqual(21, len(subs))
self.assertEqual(new, subs[-1])
def test_delete_trunk_subport(self):
self.tapi.put_trunk(self.trunk.port_id, self.trunk)
sub = self.trunk.sub_ports[10]
self.tapi.delete_trunk_subport(self.trunk.id, sub)
subs = self.tapi.get_trunk('ctx', self.trunk.port_id).sub_ports
self.assertNotIn(sub, subs)
self.assertEqual(19, len(subs))
def test_get_trunk(self):
self.tapi.put_trunk(self.trunk.port_id, self.trunk)
self.assertEqual(self.trunk,
self.tapi.get_trunk('ctx', self.trunk.port_id))
self.tapi.get_trunk('ctx', self.trunk.port_id)
self.assertFalse(self.stub.get_trunk_details.called)
def test_get_trunk_cache_miss(self):
self.assertEqual(self.trunk,
self.tapi.get_trunk('ctx', self.trunk.port_id))
self.tapi.get_trunk('ctx', self.trunk.port_id)
self.assertEqual(1, len(self.stub.get_trunk_details.mock_calls))
def test_get_trunk_not_found(self):
self.stub.get_trunk_details.side_effect = (
resources_rpc.ResourceNotFound(resource_id='1', resource_type='1'))
self.assertIsNone(self.tapi.get_trunk('ctx', self.trunk.port_id))
self.tapi.get_trunk('ctx', self.trunk.port_id)
self.assertEqual(1, len(self.stub.get_trunk_details.mock_calls))
def test_get_trunk_for_subport(self):
self.tapi.put_trunk(self.trunk.port_id, self.trunk)
t = self.tapi.get_trunk_for_subport(
'ctx', self.trunk.sub_ports[0].port_id)
self.assertEqual(self.trunk, t)