OVSDBHandler for trunk ports
Handler processes the events passed by OVSDBMonitor and reacts to events related to trunk ports, calling to trunk manager to wire/unwire trunks and subports. Co-Authored-By: Jakub Libosvar <libosvar@redhat.com> Partially-implements: blueprint vlan-aware-vms Change-Id: I97487e9d7647b4110a2cdd48d0f129340d59a40d
This commit is contained in:
parent
0eec50b12f
commit
52300f285a
|
@ -145,3 +145,6 @@ DEAD_VLAN_TAG = p_const.MAX_VLAN_TAG + 1
|
|||
|
||||
# callback resource for setting 'bridge_name' in the 'binding:vif_details'
|
||||
OVS_BRIDGE_NAME = 'ovs_bridge_name'
|
||||
|
||||
# callback resource for notifying to ovsdb handler
|
||||
OVSDB_RESOURCE = 'ovsdb'
|
||||
|
|
|
@ -44,6 +44,8 @@ from neutron.agent import rpc as agent_rpc
|
|||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
from neutron.api.rpc.handlers import dvr_rpc
|
||||
from neutron.callbacks import events as callback_events
|
||||
from neutron.callbacks import registry
|
||||
from neutron.common import config
|
||||
from neutron.common import constants as c_const
|
||||
from neutron.common import topics
|
||||
|
@ -1866,6 +1868,12 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
failed_devices,
|
||||
failed_ancillary_devices,
|
||||
updated_ports_copy))
|
||||
registry.notify(
|
||||
constants.OVSDB_RESOURCE,
|
||||
callback_events.AFTER_READ,
|
||||
self,
|
||||
ovsdb_events=events)
|
||||
|
||||
return (port_info, ancillary_port_info, consecutive_resyncs,
|
||||
ports_not_ready_yet)
|
||||
|
||||
|
|
|
@ -13,6 +13,8 @@ from oslo_log import log as logging
|
|||
|
||||
from neutron.api.rpc.callbacks.consumer import registry
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
from neutron.services.trunk.drivers.openvswitch.agent import ovsdb_handler
|
||||
from neutron.services.trunk.drivers.openvswitch.agent import trunk_manager
|
||||
from neutron.services.trunk.rpc import agent
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
@ -21,9 +23,15 @@ TRUNK_SKELETON = None
|
|||
|
||||
|
||||
class OVSTrunkSkeleton(agent.TrunkSkeleton):
|
||||
"""It processes Neutron Server events to create the physical resources
|
||||
associated to a logical trunk in response to user initiated API events
|
||||
(such as trunk subport add/remove). It collaborates with the OVSDBHandler
|
||||
to implement the trunk control plane.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, ovsdb_handler):
|
||||
super(OVSTrunkSkeleton, self).__init__()
|
||||
self.ovsdb_handler = ovsdb_handler
|
||||
registry.unsubscribe(self.handle_trunks, resources.TRUNK)
|
||||
|
||||
def handle_trunks(self, trunk, event_type):
|
||||
|
@ -43,4 +51,7 @@ def init_handler(resource, event, trigger, agent=None):
|
|||
# Set up agent-side RPC for receiving trunk events; we may want to
|
||||
# make this setup conditional based on server-side capabilities.
|
||||
global TRUNK_SKELETON
|
||||
TRUNK_SKELETON = OVSTrunkSkeleton()
|
||||
|
||||
manager = trunk_manager.TrunkManager(agent.int_br)
|
||||
handler = ovsdb_handler.OVSDBHandler(manager)
|
||||
TRUNK_SKELETON = OVSTrunkSkeleton(handler)
|
||||
|
|
|
@ -0,0 +1,327 @@
|
|||
# Copyright (c) 2016 SUSE Linux Products GmbH
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import functools
|
||||
|
||||
import eventlet
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_context import context
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from neutron._i18n import _LE
|
||||
from neutron.agent.common import ovs_lib
|
||||
from neutron.api.rpc.handlers import resources_rpc
|
||||
from neutron.callbacks import events
|
||||
from neutron.callbacks import registry
|
||||
from neutron.common import utils as common_utils
|
||||
from neutron import context as n_context
|
||||
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
|
||||
import constants as ovs_agent_constants
|
||||
from neutron.services.trunk import constants
|
||||
from neutron.services.trunk.drivers.openvswitch.agent \
|
||||
import trunk_manager as tman
|
||||
from neutron.services.trunk.drivers.openvswitch import constants as t_const
|
||||
from neutron.services.trunk.rpc import agent
|
||||
from neutron.services.trunk import utils as trunk_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
WAIT_FOR_PORT_TIMEOUT = 60
|
||||
|
||||
|
||||
def lock_on_bridge_name(f):
|
||||
@functools.wraps(f)
|
||||
def inner(bridge_name, *args, **kwargs):
|
||||
with lockutils.lock(bridge_name):
|
||||
return f(bridge_name, *args, **kwargs)
|
||||
return inner
|
||||
|
||||
|
||||
def is_trunk_bridge(port_name):
|
||||
return port_name.startswith(t_const.TRUNK_BR_PREFIX)
|
||||
|
||||
|
||||
def is_trunk_service_port(port_name):
|
||||
"""True if the port is any of the ports used to realize a trunk."""
|
||||
return is_trunk_bridge(port_name) or port_name[:2] in (
|
||||
tman.TrunkParentPort.DEV_PREFIX,
|
||||
tman.SubPort.DEV_PREFIX)
|
||||
|
||||
|
||||
def bridge_has_instance_port(bridge):
|
||||
"""True if there is an OVS port that doesn't have bridge or patch ports
|
||||
prefix.
|
||||
"""
|
||||
ifaces = bridge.get_iface_name_list()
|
||||
return any(iface for iface in ifaces
|
||||
if not is_trunk_service_port(iface))
|
||||
|
||||
|
||||
class OVSDBHandler(object):
|
||||
"""It listens to OVSDB events to create the physical resources associated
|
||||
to a logical trunk in response to OVSDB events (such as VM boot and/or
|
||||
delete).
|
||||
"""
|
||||
|
||||
def __init__(self, trunk_manager):
|
||||
self._context = n_context.get_admin_context_without_session()
|
||||
self.trunk_manager = trunk_manager
|
||||
self.trunk_rpc = agent.TrunkStub()
|
||||
|
||||
registry.subscribe(self.process_trunk_port_events,
|
||||
ovs_agent_constants.OVSDB_RESOURCE,
|
||||
events.AFTER_READ)
|
||||
|
||||
@property
|
||||
def context(self):
|
||||
self._context.request_id = context.generate_request_id()
|
||||
return self._context
|
||||
|
||||
def process_trunk_port_events(
|
||||
self, resource, event, trigger, ovsdb_events):
|
||||
"""Process added and removed port events coming from OVSDB monitor."""
|
||||
for port_event in ovsdb_events['added']:
|
||||
port_name = port_event['name']
|
||||
if is_trunk_bridge(port_name):
|
||||
LOG.debug("Processing trunk bridge %s", port_name)
|
||||
# As there is active waiting for port to appear, it's handled
|
||||
# in a separate greenthread.
|
||||
# NOTE: port_name is equal to bridge_name at this point.
|
||||
eventlet.spawn_n(self.handle_trunk_add, port_name)
|
||||
|
||||
for port_event in ovsdb_events['removed']:
|
||||
bridge_name = port_event['external_ids'].get('bridge_name')
|
||||
if bridge_name and is_trunk_bridge(bridge_name):
|
||||
eventlet.spawn_n(
|
||||
self.handle_trunk_remove, port_event)
|
||||
|
||||
@lock_on_bridge_name
|
||||
def handle_trunk_add(self, bridge_name):
|
||||
"""Create trunk bridge based on parent port ID.
|
||||
|
||||
This method is decorated with a lock that prevents processing deletion
|
||||
while creation hasn't been finished yet. It's based on the bridge name
|
||||
so we can keep processing other bridges in parallel.
|
||||
|
||||
:param bridge_name: Name of the created trunk bridge.
|
||||
"""
|
||||
# Wait for the VM's port, i.e. the trunk parent port, to show up.
|
||||
# If the VM fails to show up, i.e. this fails with a timeout,
|
||||
# then we clean the dangling bridge.
|
||||
bridge = ovs_lib.OVSBridge(bridge_name)
|
||||
bridge_has_port_predicate = functools.partial(
|
||||
bridge_has_instance_port, bridge)
|
||||
try:
|
||||
common_utils.wait_until_true(
|
||||
bridge_has_port_predicate,
|
||||
timeout=WAIT_FOR_PORT_TIMEOUT)
|
||||
except eventlet.TimeoutError:
|
||||
LOG.error(
|
||||
_LE('No port appeared on trunk bridge %(br_name)s '
|
||||
'in %(timeout)d seconds. Cleaning up the bridge'),
|
||||
{'br_name': bridge.br_name,
|
||||
'timeout': WAIT_FOR_PORT_TIMEOUT})
|
||||
bridge.destroy()
|
||||
return
|
||||
|
||||
# Once we get hold of the trunk parent port, we can provision
|
||||
# the OVS dataplane for the trunk.
|
||||
try:
|
||||
self._wire_trunk(bridge, self._get_parent_port(bridge))
|
||||
except oslo_messaging.MessagingException as e:
|
||||
LOG.error(_LE("Got messaging error while processing trunk bridge "
|
||||
"%(bridge_name)s: %(err)s"),
|
||||
{'bridge_name': bridge.br_name,
|
||||
'err': e})
|
||||
except RuntimeError as e:
|
||||
LOG.error(_LE("Failed to get parent port for bridge "
|
||||
"%(bridge_name)s: %(err)s"),
|
||||
{'bridge_name': bridge.br_name,
|
||||
'err': e})
|
||||
|
||||
@lock_on_bridge_name
|
||||
def handle_trunk_remove(self, port):
|
||||
"""Remove wiring between trunk bridge and integration bridge.
|
||||
|
||||
The method calls into trunk manager to remove patch ports on
|
||||
integration bridge side and to delete the trunk bridge. It's decorated
|
||||
with a lock to prevent deletion of bridge while creation is still in
|
||||
process.
|
||||
|
||||
:param port: Parent port dict.
|
||||
"""
|
||||
try:
|
||||
parent_port_id, trunk_id, subport_ids = self._get_trunk_metadata(
|
||||
port)
|
||||
self.unwire_subports_for_trunk(trunk_id, subport_ids)
|
||||
self.trunk_manager.remove_trunk(trunk_id, parent_port_id)
|
||||
except tman.TrunkManagerError as te:
|
||||
LOG.error(_LE("Removing trunk %(trunk_id)s failed: %(err)s"),
|
||||
{'trunk_id': port['external_ids']['trunk_id'],
|
||||
'err': te})
|
||||
else:
|
||||
LOG.debug("Deleted resources associated to trunk: %s", trunk_id)
|
||||
|
||||
def manages_this_trunk(self, trunk_id):
|
||||
"""True if this OVSDB handler manages trunk based on given ID."""
|
||||
bridge_name = trunk_utils.gen_trunk_br_name(trunk_id)
|
||||
return ovs_lib.BaseOVS().bridge_exists(bridge_name)
|
||||
|
||||
def wire_subports_for_trunk(self, context, trunk_id, subports,
|
||||
trunk_bridge=None, parent_port=None):
|
||||
"""Create OVS ports associated to the logical subports."""
|
||||
# Tell the server that subports must be bound to this host.
|
||||
# If this fails at the very beginning of the OVS trunk bridge
|
||||
# lifecycle (trunk_bridge != None), then destroy the bridge
|
||||
# and give up.
|
||||
subport_bindings = self.trunk_rpc.update_subport_bindings(
|
||||
context, subports)
|
||||
|
||||
# Bindings were successful: create the OVS subports.
|
||||
subport_bindings = subport_bindings.get(trunk_id, [])
|
||||
subports_mac = {p['id']: p['mac_address'] for p in subport_bindings}
|
||||
subport_ids = []
|
||||
for subport in subports:
|
||||
try:
|
||||
self.trunk_manager.add_sub_port(trunk_id, subport.port_id,
|
||||
subports_mac[subport.port_id],
|
||||
subport.segmentation_id)
|
||||
except tman.TrunkManagerError as te:
|
||||
LOG.error(_LE("Failed to add subport with port ID "
|
||||
"%(subport_port_id)s to trunk with ID "
|
||||
"%(trunk_id)s: %(err)s"),
|
||||
{'subport_port_id': subport.port_id,
|
||||
'trunk_id': trunk_id,
|
||||
'err': te})
|
||||
else:
|
||||
subport_ids.append(subport.port_id)
|
||||
|
||||
try:
|
||||
self._set_trunk_metadata(
|
||||
trunk_bridge, parent_port, trunk_id, subport_ids)
|
||||
except RuntimeError:
|
||||
LOG.error(_LE("Failed to set metadata for trunk %s"), trunk_id)
|
||||
# NOTE(status_police): Trunk bridge has missing metadata now, it
|
||||
# will cause troubles during deletion.
|
||||
# TODO(jlibosva): Investigate how to proceed during removal of
|
||||
# trunk bridge that doesn't have metadata stored.
|
||||
self.trunk_rpc.update_trunk_status(
|
||||
context, trunk_id, constants.DEGRADED_STATUS)
|
||||
return
|
||||
|
||||
# Set trunk status to DEGRADED if not all subports were created
|
||||
# succesfully
|
||||
if len(subport_ids) != len(subports):
|
||||
# NOTE(status_police): Not all subports were processed so trunk
|
||||
# is changed to DEGRADED status to reflect it.
|
||||
self.trunk_rpc.update_trunk_status(
|
||||
context, trunk_id, constants.DEGRADED_STATUS)
|
||||
else:
|
||||
# NOTE(status_police): All new resources were processed and thus
|
||||
# trunk should be set back to ACTIVE status.
|
||||
self.trunk_rpc.update_trunk_status(context, trunk_id,
|
||||
constants.ACTIVE_STATUS)
|
||||
|
||||
LOG.debug("Added trunk: %s", trunk_id)
|
||||
|
||||
def unwire_subports_for_trunk(self, trunk_id, subport_ids):
|
||||
"""Destroy OVS ports associated to the logical subports."""
|
||||
for subport_id in subport_ids:
|
||||
try:
|
||||
self.trunk_manager.remove_sub_port(trunk_id, subport_id)
|
||||
except tman.TrunkManagerError as te:
|
||||
LOG.error(_LE("Removing subport %(subport_id)s from trunk "
|
||||
"%(trunk_id)s failed: %(err)s"),
|
||||
{'subport_id': subport_id,
|
||||
'trunk_id': trunk_id,
|
||||
'err': te})
|
||||
|
||||
def _get_parent_port(self, trunk_bridge):
|
||||
"""Return the OVS trunk parent port plugged on trunk_bridge."""
|
||||
trunk_br_ports = trunk_bridge.get_ports_attributes(
|
||||
'Interface', columns=['name', 'external_ids'],
|
||||
if_exists=True)
|
||||
for trunk_br_port in trunk_br_ports:
|
||||
if not is_trunk_service_port(trunk_br_port['name']):
|
||||
return trunk_br_port
|
||||
raise RuntimeError(
|
||||
"Can't find parent port for trunk bridge %s" %
|
||||
trunk_bridge.br_name)
|
||||
|
||||
def _wire_trunk(self, trunk_br, port):
|
||||
"""Wire trunk bridge with integration bridge.
|
||||
|
||||
The method calls into trunk manager to create patch ports for trunk and
|
||||
patch ports for all subports associated with this trunk.
|
||||
|
||||
:param trunk_br: OVSBridge object representing the trunk bridge.
|
||||
:param port: Parent port dict.
|
||||
"""
|
||||
ctx = self.context
|
||||
try:
|
||||
parent_port_id = (
|
||||
self.trunk_manager.get_port_uuid_from_external_ids(port))
|
||||
trunk = self.trunk_rpc.get_trunk_details(ctx, parent_port_id)
|
||||
except tman.TrunkManagerError as te:
|
||||
LOG.error(_LE("Can't obtain parent port ID from port %s"),
|
||||
port['name'])
|
||||
return
|
||||
except resources_rpc.ResourceNotFound:
|
||||
LOG.error(_LE("Port %s has no trunk associated."), parent_port_id)
|
||||
return
|
||||
|
||||
try:
|
||||
self.trunk_manager.create_trunk(
|
||||
trunk.id, trunk.port_id,
|
||||
port['external_ids'].get('attached-mac'))
|
||||
except tman.TrunkManagerError as te:
|
||||
LOG.error(_LE("Failed to create trunk %(trunk_id)s: %(err)s"),
|
||||
{'trunk_id': trunk.id,
|
||||
'err': te})
|
||||
# NOTE(status_police): Trunk couldn't be created so it ends in
|
||||
# ERROR status and resync can fix that later.
|
||||
self.trunk_rpc.update_trunk_status(context, trunk.id,
|
||||
constants.ERROR_STATUS)
|
||||
return
|
||||
|
||||
self.wire_subports_for_trunk(
|
||||
ctx, trunk.id, trunk.sub_ports, trunk_bridge=trunk_br,
|
||||
parent_port=port)
|
||||
|
||||
def _set_trunk_metadata(self, trunk_br, port, trunk_id, subport_ids):
|
||||
"""Set trunk metadata in OVS port for trunk parent port."""
|
||||
# update the parent port external_ids to store the trunk bridge
|
||||
# name, trunk id and subport ids so we can easily remove the trunk
|
||||
# bridge and service ports once this port is removed
|
||||
trunk_bridge = trunk_br or ovs_lib.OVSBridge(
|
||||
trunk_utils.gen_trunk_br_name(trunk_id))
|
||||
port = port or self._get_parent_port(trunk_bridge)
|
||||
|
||||
port['external_ids']['bridge_name'] = trunk_br.br_name
|
||||
port['external_ids']['trunk_id'] = trunk_id
|
||||
port['external_ids']['subport_ids'] = jsonutils.dumps(subport_ids)
|
||||
trunk_br.set_db_attribute(
|
||||
'Interface', port['name'], 'external_ids', port['external_ids'])
|
||||
|
||||
def _get_trunk_metadata(self, port):
|
||||
"""Get trunk metadata from OVS port."""
|
||||
parent_port_id = (
|
||||
self.trunk_manager.get_port_uuid_from_external_ids(port))
|
||||
trunk_id = port['external_ids']['trunk_id']
|
||||
subport_ids = jsonutils.loads(port['external_ids']['subport_ids'])
|
||||
|
||||
return parent_port_id, trunk_id, subport_ids
|
|
@ -13,8 +13,10 @@
|
|||
import contextlib
|
||||
|
||||
from neutron_lib import constants
|
||||
from neutron_lib import exceptions
|
||||
from oslo_log import log as logging
|
||||
|
||||
from neutron._i18n import _
|
||||
from neutron.agent.common import ovs_lib
|
||||
from neutron.services.trunk.drivers.openvswitch.agent import exceptions as exc
|
||||
from neutron.services.trunk import utils
|
||||
|
@ -22,6 +24,10 @@ from neutron.services.trunk import utils
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TrunkManagerError(exceptions.NeutronException):
|
||||
message = _("Error while communicating with OVSDB: %(error)s")
|
||||
|
||||
|
||||
def get_br_int_port_name(prefix, port_id):
|
||||
"""Return the OVS port name for the given port ID.
|
||||
|
||||
|
@ -188,6 +194,10 @@ class SubPort(TrunkParentPort):
|
|||
|
||||
|
||||
class TrunkManager(object):
|
||||
"""It implements the OVS trunk dataplane.
|
||||
|
||||
It interfaces with the OVSDB server to execute OVS commands.
|
||||
"""
|
||||
|
||||
def __init__(self, br_int):
|
||||
self.br_int = br_int
|
||||
|
@ -205,22 +215,29 @@ class TrunkManager(object):
|
|||
|
||||
"""
|
||||
trunk = TrunkParentPort(trunk_id, port_id, port_mac)
|
||||
if not trunk.bridge.exists():
|
||||
raise exc.TrunkBridgeNotFound(bridge=trunk.bridge.br_name)
|
||||
# Once the bridges are connected with the following patch ports,
|
||||
# the ovs agent will recognize the ports for processing and it will
|
||||
# take over the wiring process and everything that entails.
|
||||
# REVISIT(rossella_s): revisit this integration part, should tighter
|
||||
# control over the wiring logic for trunk ports be required.
|
||||
trunk.plug(self.br_int)
|
||||
try:
|
||||
if not trunk.bridge.exists():
|
||||
raise exc.TrunkBridgeNotFound(bridge=trunk.bridge.br_name)
|
||||
# Once the bridges are connected with the following patch ports,
|
||||
# the ovs agent will recognize the ports for processing and it will
|
||||
# take over the wiring process and everything that entails.
|
||||
# REVISIT(rossella_s): revisit this integration part, should
|
||||
# tighter control over the wiring logic for trunk ports be
|
||||
# required.
|
||||
trunk.plug(self.br_int)
|
||||
except RuntimeError as e:
|
||||
raise TrunkManagerError(error=e)
|
||||
|
||||
def remove_trunk(self, trunk_id, port_id):
|
||||
"""Remove the trunk bridge."""
|
||||
trunk = TrunkParentPort(trunk_id, port_id)
|
||||
if trunk.bridge.exists():
|
||||
trunk.unplug(self.br_int)
|
||||
else:
|
||||
LOG.debug("Trunk bridge with ID %s doesn't exist.", trunk_id)
|
||||
try:
|
||||
if trunk.bridge.exists():
|
||||
trunk.unplug(self.br_int)
|
||||
else:
|
||||
LOG.debug("Trunk bridge with ID %s doesn't exist.", trunk_id)
|
||||
except RuntimeError as e:
|
||||
raise TrunkManagerError(error=e)
|
||||
|
||||
def add_sub_port(self, trunk_id, port_id, port_mac, segmentation_id):
|
||||
"""Create a sub_port.
|
||||
|
@ -234,9 +251,12 @@ class TrunkManager(object):
|
|||
sub_port = SubPort(trunk_id, port_id, port_mac, segmentation_id)
|
||||
# If creating of parent trunk bridge takes longer than API call for
|
||||
# creating subport then bridge doesn't exist yet.
|
||||
if not sub_port.bridge.exists():
|
||||
raise exc.TrunkBridgeNotFound(bridge=sub_port.bridge.br_name)
|
||||
sub_port.plug(self.br_int)
|
||||
try:
|
||||
if not sub_port.bridge.exists():
|
||||
raise exc.TrunkBridgeNotFound(bridge=sub_port.bridge.br_name)
|
||||
sub_port.plug(self.br_int)
|
||||
except RuntimeError as e:
|
||||
raise TrunkManagerError(error=e)
|
||||
|
||||
def remove_sub_port(self, trunk_id, port_id):
|
||||
"""Remove a sub_port.
|
||||
|
@ -248,7 +268,18 @@ class TrunkManager(object):
|
|||
|
||||
# Trunk bridge might have been deleted by calling delete_trunk() before
|
||||
# remove_sub_port().
|
||||
if sub_port.bridge.exists():
|
||||
sub_port.unplug(self.br_int)
|
||||
else:
|
||||
LOG.debug("Trunk bridge with ID %s doesn't exist.", trunk_id)
|
||||
try:
|
||||
if sub_port.bridge.exists():
|
||||
sub_port.unplug(self.br_int)
|
||||
else:
|
||||
LOG.debug("Trunk bridge with ID %s doesn't exist.", trunk_id)
|
||||
except RuntimeError as e:
|
||||
raise TrunkManagerError(error=e)
|
||||
|
||||
def get_port_uuid_from_external_ids(self, port):
|
||||
"""Return the port UUID from the port metadata."""
|
||||
try:
|
||||
return self.br_int.portid_from_external_ids(
|
||||
port['external_ids'])
|
||||
except RuntimeError as e:
|
||||
raise TrunkManagerError(error=e)
|
||||
|
|
|
@ -0,0 +1,147 @@
|
|||
# Copyright (c) 2016 SUSE Linux Products GmbH
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from neutron_lib import constants as n_consts
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron.agent.common import ovs_lib
|
||||
from neutron.common import utils as common_utils
|
||||
from neutron.objects import trunk as trunk_obj
|
||||
from neutron.services.trunk.drivers.openvswitch.agent import ovsdb_handler
|
||||
from neutron.services.trunk.drivers.openvswitch.agent import trunk_manager
|
||||
from neutron.tests.functional.agent.l2 import base
|
||||
|
||||
|
||||
def generate_tap_device_name():
|
||||
return n_consts.TAP_DEVICE_PREFIX + common_utils.get_random_string(
|
||||
n_consts.DEVICE_NAME_MAX_LEN - len(n_consts.TAP_DEVICE_PREFIX))
|
||||
|
||||
|
||||
class OVSDBHandlerTestCase(base.OVSAgentTestFramework):
|
||||
"""Test funcionality of OVSDBHandler.
|
||||
|
||||
This suite aims for interaction between events coming from OVSDB monitor,
|
||||
agent and wiring ports via trunk bridge to integration bridge.
|
||||
"""
|
||||
def setUp(self):
|
||||
"""Prepare resources.
|
||||
|
||||
Set up trunk_dict representing incoming data from Neutron-server when
|
||||
fetching for trunk details. Another resource trunk_br represents the
|
||||
trunk bridge which its creation is simulated when creating a port in l2
|
||||
agent framework.
|
||||
"""
|
||||
super(OVSDBHandlerTestCase, self).setUp()
|
||||
trunk_id = uuidutils.generate_uuid()
|
||||
self.trunk_dict = {
|
||||
'id': trunk_id,
|
||||
'mac_address': common_utils.get_random_mac(
|
||||
'fa:16:3e:00:00:00'.split(':')),
|
||||
'sub_ports': []}
|
||||
self.trunk_port_name = generate_tap_device_name()
|
||||
self.trunk_br = trunk_manager.TrunkBridge(trunk_id)
|
||||
self.ovsdb_handler = self._prepare_mocked_ovsdb_handler()
|
||||
|
||||
def _prepare_mocked_ovsdb_handler(self):
|
||||
handler = ovsdb_handler.OVSDBHandler(
|
||||
trunk_manager.TrunkManager(ovs_lib.OVSBridge(self.br_int)))
|
||||
mock.patch.object(handler, 'trunk_rpc').start()
|
||||
|
||||
handler.trunk_rpc.get_trunk_details.side_effect = (
|
||||
self._mock_get_trunk_details)
|
||||
handler.trunk_rpc.update_subport_bindings.side_effect = (
|
||||
self._mock_update_subport_binding)
|
||||
|
||||
return handler
|
||||
|
||||
def _mock_get_trunk_details(self, context, parent_port_id):
|
||||
if parent_port_id == self.trunk_dict['port_id']:
|
||||
return trunk_obj.Trunk(**self.trunk_dict)
|
||||
|
||||
def _mock_update_subport_binding(self, context, subports):
|
||||
return {self.trunk_dict['id']: [
|
||||
{'id': subport['port_id'], 'mac_address': subport['mac_address']}
|
||||
for subport in subports]
|
||||
}
|
||||
|
||||
def _plug_ports(self, network, ports, agent, bridge=None, namespace=None):
|
||||
# creates only the trunk, the sub_port will be plugged by the
|
||||
# trunk manager
|
||||
if not self.trunk_br.exists():
|
||||
self.trunk_br.create()
|
||||
self.addCleanup(self.trunk_br.destroy)
|
||||
self.driver.plug(
|
||||
network['id'],
|
||||
self.trunk_dict['port_id'],
|
||||
self.trunk_port_name,
|
||||
self.trunk_dict['mac_address'],
|
||||
self.trunk_br.br_name)
|
||||
|
||||
def _mock_get_events(self, agent, polling_manager, ports):
|
||||
get_events = polling_manager.get_events
|
||||
p_ids = [p['id'] for p in ports]
|
||||
|
||||
def filter_events():
|
||||
events = get_events()
|
||||
filtered_events = {
|
||||
'added': [],
|
||||
'removed': []
|
||||
}
|
||||
for event_type in filtered_events:
|
||||
for dev in events[event_type]:
|
||||
iface_id = agent.int_br.portid_from_external_ids(
|
||||
dev.get('external_ids', []))
|
||||
is_for_this_test = (
|
||||
iface_id in p_ids or
|
||||
iface_id == self.trunk_dict['port_id'] or
|
||||
dev['name'] == self.trunk_br.br_name)
|
||||
if is_for_this_test:
|
||||
# if the event is not about a port that was created by
|
||||
# this test, we filter the event out. Since these tests
|
||||
# are not run in isolation processing all the events
|
||||
# might make some test fail ( e.g. the agent might keep
|
||||
# resycing because it keeps finding not ready ports
|
||||
# that are created by other tests)
|
||||
filtered_events[event_type].append(dev)
|
||||
return filtered_events
|
||||
mock.patch.object(polling_manager, 'get_events',
|
||||
side_effect=filter_events).start()
|
||||
|
||||
def _test_trunk_creation_helper(self, ports):
|
||||
self.setup_agent_and_ports(port_dicts=ports)
|
||||
self.wait_until_ports_state(self.ports, up=True)
|
||||
self.trunk_br.delete_port(self.trunk_port_name)
|
||||
self.wait_until_ports_state(self.ports, up=False)
|
||||
common_utils.wait_until_true(lambda:
|
||||
not self.trunk_br.bridge_exists(self.trunk_br.br_name))
|
||||
|
||||
def test_trunk_creation_with_subports(self):
|
||||
ports = self.create_test_ports(amount=3)
|
||||
self.trunk_dict['port_id'] = ports[0]['id']
|
||||
self.trunk_dict['sub_ports'] = [trunk_obj.SubPort(
|
||||
id=uuidutils.generate_uuid(),
|
||||
port_id=ports[i]['id'],
|
||||
mac_address=ports[i]['mac_address'],
|
||||
segmentation_id=i,
|
||||
trunk_id=self.trunk_dict['id'])
|
||||
for i in range(1, 3)]
|
||||
self._test_trunk_creation_helper(ports[:1])
|
||||
|
||||
def test_trunk_creation_with_no_subports(self):
|
||||
ports = self.create_test_ports(amount=1)
|
||||
self.trunk_dict['port_id'] = ports[0]['id']
|
||||
self._test_trunk_creation_helper(ports)
|
|
@ -23,6 +23,6 @@ class OvsTrunkSkeletonTest(base.BaseTestCase):
|
|||
@mock.patch("neutron.api.rpc.callbacks.resource_manager."
|
||||
"ConsumerResourceCallbacksManager.unregister")
|
||||
def test___init__(self, mocked_unregister):
|
||||
test_obj = driver.OVSTrunkSkeleton()
|
||||
test_obj = driver.OVSTrunkSkeleton(mock.ANY)
|
||||
mocked_unregister.assert_called_with(test_obj.handle_trunks,
|
||||
resources.TRUNK)
|
||||
|
|
|
@ -0,0 +1,183 @@
|
|||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
import eventlet
|
||||
import oslo_messaging
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron.api.rpc.handlers import resources_rpc
|
||||
from neutron.objects import trunk as trunk_obj
|
||||
from neutron.services.trunk import constants
|
||||
from neutron.services.trunk.drivers.openvswitch.agent import ovsdb_handler
|
||||
from neutron.services.trunk.drivers.openvswitch.agent import trunk_manager
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
class TestIsTrunkServicePort(base.BaseTestCase):
|
||||
def test_with_bridge_name(self):
|
||||
observed = ovsdb_handler.is_trunk_service_port('tbr-foo')
|
||||
self.assertTrue(observed)
|
||||
|
||||
def test_with_subport_patch_port_int_side(self):
|
||||
observed = ovsdb_handler.is_trunk_service_port('spi-foo')
|
||||
self.assertTrue(observed)
|
||||
|
||||
def test_with_subport_patch_port_trunk_side(self):
|
||||
observed = ovsdb_handler.is_trunk_service_port('spt-foo')
|
||||
self.assertTrue(observed)
|
||||
|
||||
def test_with_trunk_patch_port_int_side(self):
|
||||
observed = ovsdb_handler.is_trunk_service_port('tpi-foo')
|
||||
self.assertTrue(observed)
|
||||
|
||||
def test_with_trunk_patch_port_trunk_side(self):
|
||||
observed = ovsdb_handler.is_trunk_service_port('tpt-foo')
|
||||
self.assertTrue(observed)
|
||||
|
||||
def test_with_random_string(self):
|
||||
observed = ovsdb_handler.is_trunk_service_port('foo')
|
||||
self.assertFalse(observed)
|
||||
|
||||
|
||||
class TestBridgeHasInstancePort(base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestBridgeHasInstancePort, self).setUp()
|
||||
self.bridge = mock.Mock()
|
||||
self.present_interfaces = []
|
||||
self.bridge.get_iface_name_list.return_value = self.present_interfaces
|
||||
|
||||
def test_only_service_ports_on_bridge(self):
|
||||
"""Test when only with patch ports and bridge name are on trunk bridge.
|
||||
"""
|
||||
self.present_interfaces.extend(
|
||||
['tbr-foo', 'spt-foo', 'tpt-foo'])
|
||||
self.assertFalse(ovsdb_handler.bridge_has_instance_port(self.bridge))
|
||||
|
||||
def test_device_on_bridge(self):
|
||||
"""Condition is True decause of foo device is present on bridge."""
|
||||
self.present_interfaces.extend(
|
||||
['tbr-foo', 'spt-foo', 'tpt-foo', 'foo'])
|
||||
self.assertTrue(ovsdb_handler.bridge_has_instance_port(self.bridge))
|
||||
|
||||
|
||||
class TestOVSDBHandler(base.BaseTestCase):
|
||||
"""Test that RPC or OVSDB failures do not cause crash."""
|
||||
def setUp(self):
|
||||
super(TestOVSDBHandler, self).setUp()
|
||||
self.ovsdb_handler = ovsdb_handler.OVSDBHandler(mock.sentinel.manager)
|
||||
mock.patch.object(self.ovsdb_handler, 'trunk_rpc').start()
|
||||
mock.patch.object(self.ovsdb_handler, 'trunk_manager').start()
|
||||
self.trunk_manager = self.ovsdb_handler.trunk_manager
|
||||
self.trunk_id = uuidutils.generate_uuid()
|
||||
self.fake_subports = [
|
||||
trunk_obj.SubPort(
|
||||
id=uuidutils.generate_uuid(),
|
||||
port_id=uuidutils.generate_uuid(),
|
||||
segmentation_id=1)]
|
||||
self.fake_port = {
|
||||
'name': 'foo',
|
||||
'external_ids': {
|
||||
'trunk_id': 'trunk_id',
|
||||
'subport_ids': jsonutils.dumps(
|
||||
[s.id for s in self.fake_subports]),
|
||||
}
|
||||
}
|
||||
self.subport_bindings = {
|
||||
'trunk_id': [
|
||||
{'id': subport.port_id,
|
||||
'mac_address': 'mac'} for subport in self.fake_subports]}
|
||||
|
||||
@mock.patch('neutron.agent.common.ovs_lib.OVSBridge')
|
||||
@mock.patch('neutron.common.utils.wait_until_true',
|
||||
side_effect=eventlet.TimeoutError)
|
||||
def test_handle_trunk_add_interface_wont_appear(self, wut, br):
|
||||
mock_br = br.return_value
|
||||
self.ovsdb_handler.handle_trunk_add('foo')
|
||||
self.assertTrue(mock_br.destroy.called)
|
||||
|
||||
@mock.patch('neutron.agent.common.ovs_lib.OVSBridge')
|
||||
def test_handle_trunk_add_rpc_failure(self, br):
|
||||
with mock.patch.object(self.ovsdb_handler, '_wire_trunk',
|
||||
side_effect=oslo_messaging.MessagingException):
|
||||
with mock.patch.object(ovsdb_handler, 'bridge_has_instance_port',
|
||||
return_value=True):
|
||||
self.ovsdb_handler.handle_trunk_add('foo')
|
||||
|
||||
@mock.patch('neutron.agent.common.ovs_lib.OVSBridge')
|
||||
def test_handle_trunk_add_ovsdb_failure(self, br):
|
||||
with mock.patch.object(self.ovsdb_handler, '_wire_trunk',
|
||||
side_effect=RuntimeError):
|
||||
with mock.patch.object(ovsdb_handler, 'bridge_has_instance_port',
|
||||
return_value=True):
|
||||
self.ovsdb_handler.handle_trunk_add('foo')
|
||||
|
||||
def test_handle_trunk_remove_trunk_manager_failure(self):
|
||||
with mock.patch.object(self.ovsdb_handler, '_get_trunk_metadata',
|
||||
side_effect=trunk_manager.TrunkManagerError(error='error')):
|
||||
self.ovsdb_handler.handle_trunk_remove(self.fake_port)
|
||||
|
||||
def test_handle_trunk_remove_rpc_failure(self):
|
||||
self.ovsdb_handler.trunk_rpc.update_trunk_status = (
|
||||
oslo_messaging.MessagingException)
|
||||
self.ovsdb_handler.handle_trunk_remove(self.fake_port)
|
||||
|
||||
@mock.patch('neutron.agent.common.ovs_lib.OVSBridge')
|
||||
def test_wire_subports_for_trunk_trunk_manager_failure(self, br):
|
||||
trunk_rpc = self.ovsdb_handler.trunk_rpc
|
||||
trunk_rpc.update_subport_bindings.return_value = self.subport_bindings
|
||||
self.trunk_manager.add_sub_port.side_effect = (
|
||||
trunk_manager.TrunkManagerError(error='error'))
|
||||
|
||||
self.ovsdb_handler.wire_subports_for_trunk(
|
||||
None, 'trunk_id', self.fake_subports)
|
||||
|
||||
trunk_rpc.update_trunk_status.assert_called_once_with(
|
||||
mock.ANY, mock.ANY, constants.DEGRADED_STATUS)
|
||||
|
||||
@mock.patch('neutron.agent.common.ovs_lib.OVSBridge')
|
||||
def test_wire_subports_for_trunk_ovsdb_failure(self, br):
|
||||
self.ovsdb_handler.trunk_rpc.update_subport_bindings.return_value = (
|
||||
self.subport_bindings)
|
||||
with mock.patch.object(self.ovsdb_handler, '_set_trunk_metadata',
|
||||
side_effect=RuntimeError):
|
||||
self.ovsdb_handler.wire_subports_for_trunk(
|
||||
None, 'trunk_id', self.fake_subports)
|
||||
|
||||
def test_unwire_subports_for_trunk_trunk_manager_failure(self):
|
||||
self.trunk_manager.remove_sub_port.side_effect = (
|
||||
trunk_manager.TrunkManagerError(error='error'))
|
||||
self.ovsdb_handler.unwire_subports_for_trunk(None, ['subport_id'])
|
||||
|
||||
def test__wire_trunk_get_trunk_details_failure(self):
|
||||
self.trunk_manager.get_port_uuid_from_external_ids.side_effect = (
|
||||
trunk_manager.TrunkManagerError(error='error'))
|
||||
self.ovsdb_handler._wire_trunk(mock.Mock(), self.fake_port)
|
||||
|
||||
def test__wire_trunk_trunk_not_associated(self):
|
||||
self.ovsdb_handler.trunk_rpc.get_trunk_details.side_effect = (
|
||||
resources_rpc.ResourceNotFound(
|
||||
resource_id='id', resource_type='type'))
|
||||
self.ovsdb_handler._wire_trunk(mock.Mock(), self.fake_port)
|
||||
|
||||
def test__wire_trunk_create_trunk_failure(self):
|
||||
self.trunk_manager.create_trunk.side_effect = (
|
||||
trunk_manager.TrunkManagerError(error='error'))
|
||||
self.ovsdb_handler._wire_trunk(mock.Mock(), self.fake_port)
|
||||
trunk_rpc = self.ovsdb_handler.trunk_rpc
|
||||
trunk_rpc.update_trunk_status.assert_called_once_with(
|
||||
mock.ANY, mock.ANY, constants.ERROR_STATUS)
|
|
@ -13,6 +13,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import mock
|
||||
|
||||
from oslo_utils import uuidutils
|
||||
|
@ -65,3 +66,36 @@ class TrunkParentPortTestCase(base.BaseTestCase):
|
|||
with self.trunk.ovsdb_transaction() as txn2:
|
||||
mock.patch.object(txn2, 'commit').start()
|
||||
self.assertIsNot(txn1, txn2)
|
||||
|
||||
|
||||
class TrunkManagerTestCase(base.BaseTestCase):
|
||||
"""Tests are aimed to cover negative cases to make sure there is no typo in
|
||||
the logging.
|
||||
"""
|
||||
def setUp(self):
|
||||
super(TrunkManagerTestCase, self).setUp()
|
||||
self.trunk_manager = trunk_manager.TrunkManager(mock.sentinel.br_int)
|
||||
mock.patch.object(trunk_manager, 'TrunkBridge').start()
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _resource_fails(self, resource, method_name):
|
||||
with mock.patch.object(resource, method_name,
|
||||
side_effect=RuntimeError):
|
||||
with testtools.ExpectedException(trunk_manager.TrunkManagerError):
|
||||
yield
|
||||
|
||||
def test_create_trunk_plug_fails(self):
|
||||
with self._resource_fails(trunk_manager.TrunkParentPort, 'plug'):
|
||||
self.trunk_manager.create_trunk(None, None, None)
|
||||
|
||||
def test_remove_trunk_unplug_fails(self):
|
||||
with self._resource_fails(trunk_manager.TrunkParentPort, 'unplug'):
|
||||
self.trunk_manager.remove_trunk(None, None)
|
||||
|
||||
def test_add_sub_port_plug_fails(self):
|
||||
with self._resource_fails(trunk_manager.SubPort, 'plug'):
|
||||
self.trunk_manager.add_sub_port(None, None, None, None)
|
||||
|
||||
def test_remove_sub_port_unplug_fails(self):
|
||||
with self._resource_fails(trunk_manager.SubPort, 'unplug'):
|
||||
self.trunk_manager.remove_sub_port(None, None)
|
||||
|
|
Loading…
Reference in New Issue