neutron/neutron/services/trunk/drivers/openvswitch/agent/ovsdb_handler.py

498 lines
22 KiB
Python

# Copyright (c) 2016 SUSE Linux Products GmbH
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import eventlet
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib import context as n_context
from oslo_concurrency import lockutils
from oslo_context import context as o_context
from oslo_log import log as logging
import oslo_messaging
from oslo_serialization import jsonutils
from neutron._i18n import _
from neutron.agent.common import ovs_lib
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import utils as common_utils
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
import constants as ovs_agent_constants
from neutron.services.trunk import constants
from neutron.services.trunk.drivers.openvswitch.agent import exceptions
from neutron.services.trunk.drivers.openvswitch.agent \
import trunk_manager as tman
from neutron.services.trunk.drivers.openvswitch import constants as t_const
from neutron.services.trunk.drivers.openvswitch import utils
from neutron.services.trunk.rpc import agent
LOG = logging.getLogger(__name__)
DEFAULT_WAIT_FOR_PORT_TIMEOUT = 60
def lock_on_bridge_name(required_parameter):
def func_decor(f):
try:
br_arg_index = f.__code__.co_varnames.index(required_parameter)
except ValueError:
raise RuntimeError(_("%s parameter is required for this decorator")
% required_parameter)
@functools.wraps(f)
def inner(*args, **kwargs):
try:
bridge_name = kwargs[required_parameter]
except KeyError:
bridge_name = args[br_arg_index]
with lockutils.lock(bridge_name):
return f(*args, **kwargs)
return inner
return func_decor
def is_trunk_bridge(port_name):
return port_name.startswith(t_const.TRUNK_BR_PREFIX)
def is_subport(port_name):
return port_name.startswith(tman.SubPort.DEV_PREFIX)
def is_trunk_service_port(port_name):
"""True if the port is any of the ports used to realize a trunk."""
return is_trunk_bridge(port_name) or port_name[:2] in (
tman.TrunkParentPort.DEV_PREFIX,
tman.SubPort.DEV_PREFIX)
def bridge_has_port(bridge, is_port_predicate):
"""True if there is an OVS port for which is_port_predicate is True.
"""
try:
ifaces = bridge.get_iface_name_list()
except RuntimeError as e:
LOG.error("Cannot obtain interface list for bridge %(bridge)s: "
"%(err)s",
{'bridge': bridge.br_name,
'err': e})
return False
return any(iface for iface in ifaces if is_port_predicate(iface))
def bridge_has_instance_port(bridge):
"""True if there is an OVS port that doesn't have bridge or patch ports
prefix.
"""
is_instance_port = lambda p: not is_trunk_service_port(p)
return bridge_has_port(bridge, is_instance_port)
def bridge_has_service_port(bridge):
"""True if there is an OVS port that is used to implement a trunk.
"""
return bridge_has_port(bridge, is_trunk_service_port)
@registry.has_registry_receivers
class OVSDBHandler(object):
"""It listens to OVSDB events to create the physical resources associated
to a logical trunk in response to OVSDB events (such as VM boot and/or
delete).
"""
def __init__(self, trunk_manager):
self.timeout = DEFAULT_WAIT_FOR_PORT_TIMEOUT
self._context = n_context.get_admin_context_without_session()
self.trunk_manager = trunk_manager
self.trunk_rpc = agent.TrunkStub()
@property
def context(self):
self._context.request_id = o_context.generate_request_id()
return self._context
@registry.receives(ovs_agent_constants.OVSDB_RESOURCE, [events.AFTER_READ])
def process_trunk_port_events(
self, resource, event, trigger, ovsdb_events):
"""Process added and removed port events coming from OVSDB monitor."""
for port_event in ovsdb_events['added']:
port_name = port_event['name']
if is_trunk_bridge(port_name):
LOG.debug("Processing trunk bridge %s", port_name)
# As there is active waiting for port to appear, it's handled
# in a separate greenthread.
# NOTE: port_name is equal to bridge_name at this point.
eventlet.spawn_n(self.handle_trunk_add, port_name)
for port_event in ovsdb_events['removed']:
bridge_name = port_event['external_ids'].get('bridge_name')
if bridge_name and is_trunk_bridge(bridge_name):
eventlet.spawn_n(
self.handle_trunk_remove, bridge_name, port_event)
@lock_on_bridge_name(required_parameter='bridge_name')
def handle_trunk_add(self, bridge_name):
"""Create trunk bridge based on parent port ID.
This method is decorated with a lock that prevents processing deletion
while creation hasn't been finished yet. It's based on the bridge name
so we can keep processing other bridges in parallel.
:param bridge_name: Name of the created trunk bridge.
"""
bridge = ovs_lib.OVSBridge(bridge_name)
# Handle condition when there was bridge in both added and removed
# events and handle_trunk_remove greenthread was executed before
# handle_trunk_add
if not bridge.bridge_exists(bridge_name):
LOG.debug("The bridge %s was deleted before it was handled.",
bridge_name)
return
# Determine the state of the trunk bridge by looking for the VM's port,
# i.e. the trunk parent port and/or patch ports to be present. If the
# VM is absent, then we clean the dangling bridge. If the VM is present
# the value of 'rewire' tells us whether or not the bridge was dealt
# with in a previous added event, and thus it has active patch ports.
if not self._is_vm_connected(bridge):
LOG.debug("No instance port associated to bridge %s could be "
"found. Deleting bridge and its resources.", bridge_name)
self.trunk_manager.dispose_trunk(bridge)
return
# Check if the trunk was provisioned in a previous run. This can happen
# at agent startup when existing trunks are notified as added events.
rewire = bridge_has_service_port(bridge)
# Once we get hold of the trunk parent port, we can provision
# the OVS dataplane for the trunk.
try:
self._wire_trunk(bridge, self._get_parent_port(bridge), rewire)
except oslo_messaging.MessagingException as e:
LOG.error("Got messaging error while processing trunk bridge "
"%(bridge_name)s: %(err)s",
{'bridge_name': bridge.br_name,
'err': e})
except exceptions.ParentPortNotFound as e:
LOG.error("Failed to get parent port for bridge "
"%(bridge_name)s: %(err)s",
{'bridge_name': bridge.br_name,
'err': e})
@lock_on_bridge_name(required_parameter='bridge_name')
def handle_trunk_remove(self, bridge_name, port):
"""Remove wiring between trunk bridge and integration bridge.
The method calls into trunk manager to remove patch ports on
integration bridge side and to delete the trunk bridge. It's decorated
with a lock to prevent deletion of bridge while creation is still in
process.
:param bridge_name: Name of the bridge used for locking purposes.
:param port: Parent port dict.
"""
# TODO(njohnston): In the case of DPDK with trunk ports, if nova
# deletes an interface and then re-adds it we can get a race
# condition where the port is re-added and then the bridge is
# deleted because we did not properly catch the re-addition. To
# solve this would require transitioning to ordered event
# resolution, like the L3 agent does with the
# ResourceProcessingQueue class. Until we can make that happen, we
# try to mitigate the issue by checking if there is a port on the
# bridge and if so then do not remove it.
bridge = ovs_lib.OVSBridge(bridge_name)
if bridge_has_instance_port(bridge):
LOG.debug("The bridge %s has instances attached so it will not "
"be deleted.", bridge_name)
return
try:
# TODO(jlibosva): Investigate how to proceed during removal of
# trunk bridge that doesn't have metadata stored.
parent_port_id, trunk_id, subport_ids = self._get_trunk_metadata(
port)
# NOTE(status_police): we do not report changes in trunk status on
# removal to avoid potential races between agents in case the event
# is due to a live migration or reassociation of a trunk to a new
# VM.
self.unwire_subports_for_trunk(trunk_id, subport_ids)
self.trunk_manager.remove_trunk(trunk_id, parent_port_id)
except tman.TrunkManagerError as te:
LOG.error("Removing trunk %(trunk_id)s failed: %(err)s",
{'trunk_id': port['external_ids']['trunk_id'],
'err': te})
else:
LOG.debug("Deleted resources associated to trunk: %s", trunk_id)
def manages_this_trunk(self, trunk_id):
"""True if this OVSDB handler manages trunk based on given ID."""
bridge_name = utils.gen_trunk_br_name(trunk_id)
return ovs_lib.BaseOVS().bridge_exists(bridge_name)
def get_connected_subports_for_trunk(self, trunk_id):
"""Return the list of subports present on the trunk bridge."""
bridge = ovs_lib.OVSBridge(utils.gen_trunk_br_name(trunk_id))
if not bridge.bridge_exists(bridge.br_name):
return []
try:
ports = bridge.get_ports_attributes(
'Interface', columns=['name', 'external_ids'])
return [
self.trunk_manager.get_port_uuid_from_external_ids(port)
for port in ports if is_subport(port['name'])
]
except (RuntimeError, tman.TrunkManagerError) as e:
LOG.error("Failed to get subports for bridge %(bridge)s: "
"%(err)s", {'bridge': bridge.br_name, 'err': e})
return []
def wire_subports_for_trunk(self, context, trunk_id, subports,
trunk_bridge=None, parent_port=None):
"""Create OVS ports associated to the logical subports."""
# Tell the server that subports must be bound to this host.
subport_bindings = self.trunk_rpc.update_subport_bindings(
context, subports)
# Bindings were successful: create the OVS subports.
subport_bindings = subport_bindings.get(trunk_id, [])
subports_mac = {p['id']: p['mac_address'] for p in subport_bindings}
subport_ids = []
for subport in subports:
try:
self.trunk_manager.add_sub_port(trunk_id, subport.port_id,
subports_mac[subport.port_id],
subport.segmentation_id)
except tman.TrunkManagerError as te:
LOG.error("Failed to add subport with port ID "
"%(subport_port_id)s to trunk with ID "
"%(trunk_id)s: %(err)s",
{'subport_port_id': subport.port_id,
'trunk_id': trunk_id,
'err': te})
else:
subport_ids.append(subport.port_id)
try:
self._update_trunk_metadata(
trunk_bridge, parent_port, trunk_id, subport_ids)
except (RuntimeError, exceptions.ParentPortNotFound) as e:
LOG.error("Failed to store metadata for trunk %(trunk_id)s: "
"%(reason)s", {'trunk_id': trunk_id, 'reason': e})
# NOTE(status_police): Trunk bridge has stale metadata now, it
# might cause troubles during deletion. Signal a DEGRADED status;
# if the user undo/redo the operation things may go back to
# normal.
return constants.DEGRADED_STATUS
LOG.debug("Added trunk: %s", trunk_id)
return self._get_current_status(subports, subport_ids)
def unwire_subports_for_trunk(self, trunk_id, subport_ids):
"""Destroy OVS ports associated to the logical subports."""
ids = []
for subport_id in subport_ids:
try:
self.trunk_manager.remove_sub_port(trunk_id, subport_id)
ids.append(subport_id)
except tman.TrunkManagerError as te:
LOG.error("Removing subport %(subport_id)s from trunk "
"%(trunk_id)s failed: %(err)s",
{'subport_id': subport_id,
'trunk_id': trunk_id,
'err': te})
try:
# OVS bridge and port to be determined by _update_trunk_metadata
bridge = None
port = None
self._update_trunk_metadata(
bridge, port, trunk_id, subport_ids, wire=False)
except RuntimeError as e:
# NOTE(status_police): Trunk bridge has stale metadata now, it
# might cause troubles during deletion. Signal a DEGRADED status;
# if the user undo/redo the operation things may go back to
# normal.
LOG.error("Failed to store metadata for trunk %(trunk_id)s: "
"%(reason)s", {'trunk_id': trunk_id, 'reason': e})
return constants.DEGRADED_STATUS
except exceptions.ParentPortNotFound as e:
# If a user deletes/migrates a VM and remove subports from a trunk
# in short sequence, there is a chance that we hit this spot in
# that the trunk may still be momentarily bound to the agent. We
# should not mark the status as DEGRADED in this case.
LOG.debug(e)
return self._get_current_status(subport_ids, ids)
def report_trunk_status(self, context, trunk_id, status):
"""Report trunk status to the server."""
self.trunk_rpc.update_trunk_status(context, trunk_id, status)
def _get_parent_port(self, trunk_bridge):
"""Return the OVS trunk parent port plugged on trunk_bridge."""
trunk_br_ports = trunk_bridge.get_ports_attributes(
'Interface', columns=['name', 'external_ids'],
if_exists=True)
for trunk_br_port in trunk_br_ports:
if not is_trunk_service_port(trunk_br_port['name']):
return trunk_br_port
raise exceptions.ParentPortNotFound(bridge=trunk_bridge.br_name)
def _wire_trunk(self, trunk_br, port, rewire=False):
"""Wire trunk bridge with integration bridge.
The method calls into trunk manager to create patch ports for trunk and
patch ports for all subports associated with this trunk. If rewire is
True, a diff is performed between desired state (the one got from the
server) and actual state (the patch ports present on the trunk bridge)
and subports are wired/unwired accordingly.
:param trunk_br: OVSBridge object representing the trunk bridge.
:param port: Parent port dict.
:param rewire: True if local trunk state must be reconciled with
server's state.
"""
ctx = self.context
try:
parent_port_id = (
self.trunk_manager.get_port_uuid_from_external_ids(port))
trunk = self.trunk_rpc.get_trunk_details(ctx, parent_port_id)
except tman.TrunkManagerError as te:
LOG.error("Can't obtain parent port ID from port %s",
port['name'])
return
except resources_rpc.ResourceNotFound:
LOG.error("Port %s has no trunk associated.", parent_port_id)
return
try:
registry.notify(
constants.TRUNK, events.BEFORE_CREATE, self,
context=ctx, trunk=trunk)
self.trunk_manager.create_trunk(
trunk.id, trunk.port_id,
port['external_ids'].get('attached-mac'))
except tman.TrunkManagerError as te:
LOG.error("Failed to create trunk %(trunk_id)s: %(err)s",
{'trunk_id': trunk.id,
'err': te})
# NOTE(status_police): Trunk couldn't be created so it ends in
# ERROR status and resync can fix that later.
self.report_trunk_status(ctx, trunk.id, constants.ERROR_STATUS)
return
# We need to remove stale subports
unwire_status = constants.ACTIVE_STATUS
if rewire:
old_subport_ids = self.get_connected_subports_for_trunk(trunk.id)
subports = {p['port_id'] for p in trunk.sub_ports}
subports_to_delete = set(old_subport_ids) - subports
if subports_to_delete:
unwire_status = self.unwire_subports_for_trunk(
trunk.id, subports_to_delete)
# NOTE(status_police): inform the server whether the operation
# was a partial or complete success. Do not inline status.
# NOTE: in case of rewiring we readd ports that are already present on
# the bridge because e.g. the segmentation ID might have changed (e.g.
# agent crashed, port was removed and readded with a different seg ID)
wire_status = self.wire_subports_for_trunk(
ctx, trunk.id, trunk.sub_ports,
trunk_bridge=trunk_br, parent_port=port)
if (unwire_status == wire_status and
wire_status == constants.ACTIVE_STATUS):
status = constants.ACTIVE_STATUS
else:
status = constants.DEGRADED_STATUS
self.report_trunk_status(ctx, trunk.id, status)
def _set_trunk_metadata(self, trunk_bridge, port, trunk_id, subport_ids):
"""Set trunk metadata in OVS port for trunk parent port."""
# update the parent port external_ids to store the trunk bridge
# name, trunk id and subport ids so we can easily remove the trunk
# bridge and service ports once this port is removed
trunk_bridge = trunk_bridge or ovs_lib.OVSBridge(
utils.gen_trunk_br_name(trunk_id))
port = port or self._get_parent_port(trunk_bridge)
port['external_ids']['bridge_name'] = trunk_bridge.br_name
port['external_ids']['trunk_id'] = trunk_id
port['external_ids']['subport_ids'] = jsonutils.dumps(subport_ids)
trunk_bridge.set_db_attribute(
'Interface', port['name'], 'external_ids', port['external_ids'])
def _get_trunk_metadata(self, port):
"""Get trunk metadata from OVS port."""
parent_port_id = (
self.trunk_manager.get_port_uuid_from_external_ids(port))
trunk_id = port['external_ids'].get('trunk_id')
subport_ids = jsonutils.loads(
port['external_ids'].get('subport_ids', '[]'))
return parent_port_id, trunk_id, subport_ids
def _update_trunk_metadata(self, trunk_bridge, port,
trunk_id, subport_ids, wire=True):
"""Update trunk metadata.
:param trunk_bridge: OVS trunk bridge.
:param port: OVS parent port.
:param trunk_id: trunk ID.
:param subport_ids: subports affecting the metadata.
:param wire: if True subport_ids are added, otherwise removed.
"""
trunk_bridge = trunk_bridge or ovs_lib.OVSBridge(
utils.gen_trunk_br_name(trunk_id))
port = port or self._get_parent_port(trunk_bridge)
_port_id, _trunk_id, old_subports = self._get_trunk_metadata(port)
if wire:
new_subports = set(old_subports) | set(subport_ids)
else:
new_subports = set(old_subports) - set(subport_ids)
self._set_trunk_metadata(trunk_bridge, port, trunk_id, new_subports)
def _get_current_status(self, expected_subports, actual_subports):
"""Return the current status of the trunk.
If the number of expected subports to be processed does not match the
number of subports successfully processed, the status returned is
DEGRADED, ACTIVE otherwise.
"""
# NOTE(status_police): a call to this method should be followed by
# a trunk_update_status to report the latest trunk status, but there
# can be exceptions (e.g. unwire_subports_for_trunk).
if len(expected_subports) != len(actual_subports):
return constants.DEGRADED_STATUS
else:
return constants.ACTIVE_STATUS
def _is_vm_connected(self, bridge):
"""True if an instance is connected to bridge, False otherwise."""
bridge_has_port_predicate = functools.partial(
bridge_has_instance_port, bridge)
try:
common_utils.wait_until_true(
bridge_has_port_predicate,
timeout=self.timeout)
return True
except common_utils.WaitTimeout:
LOG.error(
'No port present on trunk bridge %(br_name)s '
'in %(timeout)d seconds.',
{'br_name': bridge.br_name,
'timeout': self.timeout})
return False