# networking-arista/networking_arista/ml2/mechanism_arista.py
# Copyright (c) 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json

from neutron_lib.api.definitions import portbindings
from neutron_lib import constants as n_const
from neutron_lib.plugins.ml2 import api as driver_api
from neutron_lib.services.trunk import constants as trunk_consts
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging

from neutron.db import segments_db

from networking_arista.common import constants as a_const
from networking_arista.common import db_lib
from networking_arista.ml2 import arista_trunk
from networking_arista.ml2.rpc.arista_eapi import AristaRPCWrapperEapi
# When used as a Neutron plugin, neutron-lib imports this code. However, by
# that point neutron-lib has already imported 'multiprocessing'. This means
# the python module cache (sys.modules) contains a version of
# 'multiprocessing' where select.poll() exists.
#
# Further down we import arista_sync, which spawns a greenthread. This
# greenthread then uses a green version of 'multiprocessing' where
# select.poll() has been removed.
#
# Calling multiprocessing.Queue.put() here and
# multiprocessing.Queue.get(timeout=...) in the greenthread leads to:
# AttributeError: module 'select' has no attribute 'poll'
#
# We can't do eventlet.monkey_patch() early enough (before the first
# 'multiprocessing' import) as we would have to do it in neutron-lib, and
# that's forbidden, see https://review.opendev.org/#/c/333017/
#
# The solution is to make the python module cache forget the already
# imported 'multiprocessing' and re-import a green one. Here again,
# eventlet.monkey_patch() doesn't seem to help as it doesn't seem to touch
# 'multiprocessing'. Thus we use eventlet.import_patched() instead:
import eventlet
import sys

modules_to_forget = []
for imported_module_name in sys.modules:
    if imported_module_name.startswith('multiprocessing'):
        modules_to_forget.append(imported_module_name)

for module_to_forget in modules_to_forget:
    del sys.modules[module_to_forget]

for module_to_forget in modules_to_forget:
    try:
        eventlet.import_patched(module_to_forget)
    except ImportError:
        pass

# import a green 'multiprocessing':
multiprocessing = eventlet.import_patched('multiprocessing')
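# With the green 'multiprocessing' in place, the producer/consumer pattern
# described above works as intended. A minimal sketch (illustrative only,
# not part of the driver):
#
#     queue = multiprocessing.Queue()
#     queue.put(item)                # main process, e.g. this driver
#     item = queue.get(timeout=10)   # consumer greenthread, e.g. arista_sync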
from networking_arista.ml2 import arista_sync  # noqa: E402

LOG = logging.getLogger(__name__)

cfg.CONF.import_group('ml2_arista', 'networking_arista.common.config')


def log_context(function, context):
    pretty_context = json.dumps(context, sort_keys=True, indent=4)
    LOG.debug(function)
    LOG.debug(pretty_context)


class MechResource(object):
    """Container class for passing data to sync worker"""

    def __init__(self, id, resource_type, action, related_resources=None):
        self.id = id
        self.resource_type = resource_type
        self.action = action
        self.related_resources = related_resources or list()

    def __str__(self):
        return "%s %s ID: %s" % (self.action, self.resource_type, self.id)

class AristaDriver(driver_api.MechanismDriver):
    """Ml2 Mechanism driver for Arista networking hardware.

    Remembers all networks and VMs that are provisioned on Arista Hardware.
    Does not send network provisioning request if the network has already been
    provisioned before for the given port.
    """

    def __init__(self):
        confg = cfg.CONF.ml2_arista
        self.managed_physnets = confg['managed_physnets']
        self.manage_fabric = confg['manage_fabric']
        self.eapi = AristaRPCWrapperEapi()
        self.mlag_pairs = dict()
        self.provision_queue = multiprocessing.Queue()
        self.trunk_driver = None
        self.vif_details = {portbindings.VIF_DETAILS_CONNECTIVITY:
                            self.connectivity}

    @property
    def connectivity(self):
        return portbindings.CONNECTIVITY_L2

    def initialize(self):
        self.mlag_pairs = db_lib.get_mlag_physnets()
        self.trunk_driver = arista_trunk.AristaTrunkDriver.create()

    def get_workers(self):
        return [arista_sync.AristaSyncWorker(self.provision_queue)]
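    # Note: the create_*/delete_* helpers below only enqueue MechResource
    # items on self.provision_queue; AristaSyncWorker (see get_workers) drains
    # the queue and applies the changes to CVX asynchronously.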
    def create_network(self, network, segments):
        """Enqueue network create"""
        tenant_id = network['project_id']
        action = a_const.CREATE if tenant_id else a_const.FULL_SYNC
        n_res = MechResource(network['id'], a_const.NETWORK_RESOURCE, action)
        n_res.related_resources.append((a_const.TENANT_RESOURCE, tenant_id))
        for segment in segments:
            n_res.related_resources.append(
                (a_const.SEGMENT_RESOURCE, segment['id']))
        self.provision_queue.put(n_res)

    def delete_network(self, network, segments):
        """Enqueue network delete"""
        tenant_id = network['project_id']
        action = a_const.DELETE if tenant_id else a_const.FULL_SYNC
        n_res = MechResource(network['id'], a_const.NETWORK_RESOURCE, action)
        # Delete tenant if this was the last tenant resource
        if not db_lib.tenant_provisioned(tenant_id):
            n_res.related_resources.append(
                (a_const.TENANT_RESOURCE, tenant_id))
        for segment in segments:
            n_res.related_resources.append(
                (a_const.SEGMENT_RESOURCE, segment['id']))
        self.provision_queue.put(n_res)

    def delete_segment(self, segment):
        """Enqueue segment delete"""
        s_res = MechResource(segment['id'], a_const.SEGMENT_RESOURCE,
                             a_const.DELETE)
        self.provision_queue.put(s_res)
    def get_instance_type(self, port):
        """Determine the port type based on device owner and vnic type"""
        if port[portbindings.VNIC_TYPE] == portbindings.VNIC_BAREMETAL:
            return a_const.BAREMETAL_RESOURCE
        owner_to_type = {
            n_const.DEVICE_OWNER_DHCP: a_const.DHCP_RESOURCE,
            n_const.DEVICE_OWNER_DVR_INTERFACE: a_const.ROUTER_RESOURCE,
            n_const.DEVICE_OWNER_ROUTER_INTF: a_const.ROUTER_RESOURCE,
            n_const.DEVICE_OWNER_ROUTER_HA_INTF: a_const.ROUTER_RESOURCE,
            n_const.DEVICE_OWNER_ROUTER_GW: a_const.ROUTER_RESOURCE,
            trunk_consts.TRUNK_SUBPORT_OWNER: a_const.VM_RESOURCE}
        if port['device_owner'] in owner_to_type.keys():
            return owner_to_type[port['device_owner']]
        elif port['device_owner'].startswith(
                n_const.DEVICE_OWNER_COMPUTE_PREFIX):
            return a_const.VM_RESOURCE
        return None
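    # For example, a port with device_owner 'compute:nova' (anything under
    # n_const.DEVICE_OWNER_COMPUTE_PREFIX, i.e. 'compute:') maps to
    # a_const.VM_RESOURCE, while 'network:dhcp' maps to DHCP_RESOURCE.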
    def _get_binding_keys(self, port, host):
        """Get binding keys from the port binding"""
        binding_keys = list()
        switch_binding = port[portbindings.PROFILE].get(
            'local_link_information', None)
        if switch_binding:
            for binding in switch_binding:
                switch_id = binding.get('switch_id')
                port_id = binding.get('port_id')
                binding_keys.append((port['id'], (switch_id, port_id)))
        else:
            binding_keys.append((port['id'], host))
        return binding_keys
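    # Illustrative binding profile for a baremetal port (values are examples
    # only):
    #     {'local_link_information': [
    #         {'switch_id': '00:11:22:33:44:55', 'port_id': 'Ethernet1'}]}
    # which yields the binding key (port['id'], ('00:11:22:33:44:55',
    # 'Ethernet1')); non-baremetal ports fall back to (port['id'], host).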
    def create_port_binding(self, port, host):
        """Enqueue port binding create"""
        tenant_id = port['project_id']
        instance_type = self.get_instance_type(port)
        if not instance_type:
            return
        port_type = instance_type + a_const.PORT_SUFFIX
        action = a_const.CREATE if tenant_id else a_const.FULL_SYNC
        related_resources = list()
        related_resources.append((a_const.TENANT_RESOURCE, tenant_id))
        related_resources.append((instance_type, port['device_id']))
        related_resources.append((port_type, port['id']))
        for pb_key in self._get_binding_keys(port, host):
            pb_res = MechResource(pb_key, a_const.PORT_BINDING_RESOURCE,
                                  action, related_resources=related_resources)
            self.provision_queue.put(pb_res)

    def delete_port_binding(self, port, host):
        """Enqueue port binding delete"""
        tenant_id = port['project_id']
        instance_type = self.get_instance_type(port)
        if not instance_type:
            return
        port_type = instance_type + a_const.PORT_SUFFIX
        action = a_const.DELETE if tenant_id else a_const.FULL_SYNC
        related_resources = list()
        # Delete tenant if this was the last tenant resource
        if not db_lib.tenant_provisioned(tenant_id):
            related_resources.append((a_const.TENANT_RESOURCE, tenant_id))
        # Delete instance if this was the last instance port
        if not db_lib.instance_provisioned(port['device_id']):
            related_resources.append((instance_type, port['device_id']))
        # Delete port if this was the last port binding
        if not db_lib.port_provisioned(port['id']):
            related_resources.append((port_type, port['id']))
        for pb_key in self._get_binding_keys(port, host):
            pb_res = MechResource(pb_key, a_const.PORT_BINDING_RESOURCE,
                                  action, related_resources=related_resources)
            self.provision_queue.put(pb_res)
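    # Example teardown (illustrative): removing the last binding of a VM
    # whose tenant still owns other resources enqueues a PORT_BINDING delete
    # whose related_resources carry the instance and port entries but not the
    # tenant, so only resources that are no longer provisioned get removed.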
    def create_network_postcommit(self, context):
        """Provision the network on CVX"""
        network = context.current
        log_context("create_network_postcommit: network", network)
        segments = context.network_segments
        self.create_network(network, segments)

    def update_network_postcommit(self, context):
        """Send network updates to CVX:

        - Update the network name
        - Add new segments
        """
        network = context.current
        orig_network = context.original
        log_context("update_network_postcommit: network", network)
        log_context("update_network_postcommit: orig", orig_network)
        segments = context.network_segments
        # New segments may have been added
        self.create_network(network, segments)

    def delete_network_postcommit(self, context):
        """Delete the network from CVX"""
        network = context.current
        log_context("delete_network_postcommit: network", network)
        segments = context.network_segments
        self.delete_network(network, segments)
    def update_port_postcommit(self, context):
        """Send port updates to CVX

        This method is also responsible for the initial creation of ports
        as we wait until after a port is bound to send the port data to CVX
        """
        port = context.current
        orig_port = context.original
        network = context.network.current
        log_context("update_port_postcommit: port", port)
        log_context("update_port_postcommit: orig", orig_port)
        # Device id can change without a port going DOWN, but the new device
        # id may not be supported
        if orig_port and port['device_id'] != orig_port['device_id']:
            self.delete_port_binding(orig_port, context.original_host)
        if context.status in [n_const.PORT_STATUS_ACTIVE,
                              n_const.PORT_STATUS_BUILD]:
            if context.binding_levels:
                segments = [
                    level['bound_segment'] for level in context.binding_levels]
                self.create_network(network, segments)
            self.create_port_binding(port, context.host)
        else:
            if (context.original_host and
                    context.status != context.original_status):
                self.delete_port_binding(orig_port, context.original_host)
                self._try_to_release_dynamic_segment(context, migration=True)

    def delete_port_postcommit(self, context):
        """Delete the port from CVX"""
        port = context.current
        log_context("delete_port_postcommit: port", port)
        self.delete_port_binding(port, context.host)
        self._try_to_release_dynamic_segment(context)
    def _bind_baremetal_port(self, context, segment):
        """Bind the baremetal port to the segment"""
        port = context.current
        vif_details = {
            portbindings.VIF_DETAILS_VLAN: str(
                segment[driver_api.SEGMENTATION_ID])
        }
        context.set_binding(segment[driver_api.ID],
                            portbindings.VIF_TYPE_OTHER,
                            vif_details,
                            n_const.ACTIVE)
        LOG.debug("AristaDriver: bound port info: port ID %(id)s "
                  "on network %(network)s",
                  {'id': port['id'],
                   'network': context.network.current['id']})
        if port.get('trunk_details'):
            self.trunk_driver.bind_port(port)
        return True
    def _get_physnet(self, context):
        """Find the appropriate physnet for the host

        - Baremetal ports' physnet is determined by looking at the
          local_link_information contained in the binding profile
        - Other ports' physnet is determined by looking for the host in the
          topology
        """
        port = context.current
        physnet = None
        if port.get(portbindings.VNIC_TYPE) == portbindings.VNIC_BAREMETAL:
            physnet = self.eapi.get_baremetal_physnet(context)
        else:
            physnet = self.eapi.get_host_physnet(context)
        # If the switch is part of an mlag pair, the physnet is called
        # peer1_peer2
        physnet = self.mlag_pairs.get(physnet, physnet)
        return physnet
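    # Illustrative mlag_pairs mapping (shape assumed for exposition; it is
    # built by db_lib.get_mlag_physnets() in initialize()):
    #     {'leaf1': 'leaf1_leaf2', 'leaf2': 'leaf1_leaf2'}
    # so a host wired to either MLAG peer lands on the combined physnet.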
    def _bind_fabric(self, context, segment):
        """Allocate dynamic segments for the port

        Segment physnets are based on the switch to which the host is
        connected.
        """
        port_id = context.current['id']
        physnet = self._get_physnet(context)
        if not physnet:
            LOG.debug("bind_port for port %(port)s: no physical_network "
                      "found", {'port': port_id})
            return False
        with lockutils.lock(physnet, external=True):
            context.allocate_dynamic_segment(
                {'network_id': context.network.current['id'],
                 'network_type': n_const.TYPE_VLAN,
                 'physical_network': physnet})
            next_segment = segments_db.get_dynamic_segment(
                context._plugin_context, context.network.current['id'],
                physical_network=physnet)
        LOG.debug("bind_port for port %(port)s: "
                  "current_segment=%(current_seg)s, "
                  "next_segment=%(next_seg)s",
                  {'port': port_id, 'current_seg': segment,
                   'next_seg': next_segment})
        context.continue_binding(segment['id'], [next_segment])
        return True
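    # In hierarchical port binding terms: this driver binds the top-level
    # VXLAN segment down to a dynamically allocated VLAN segment on the
    # host's physnet, and continue_binding() hands that VLAN segment to the
    # next-level driver to complete the binding.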
    def bind_port(self, context):
        """Bind port to a network segment.

        The provisioning request that plugs the host into the appropriate
        network on the Arista hardware is made when the port is created;
        this simply tells the ML2 plugin that we are binding the port.
        """
        port = context.current
        log_context("bind_port: port", port)
        for segment in context.segments_to_bind:
            physnet = segment.get(driver_api.PHYSICAL_NETWORK)
            segment_type = segment[driver_api.NETWORK_TYPE]
            if not physnet:
                if (segment_type == n_const.TYPE_VXLAN and self.manage_fabric):
                    if self._bind_fabric(context, segment):
                        continue
            elif (port.get(portbindings.VNIC_TYPE) ==
                    portbindings.VNIC_BAREMETAL):
                if (not self.managed_physnets or
                        physnet in self.managed_physnets):
                    if self._bind_baremetal_port(context, segment):
                        continue
            LOG.debug("Arista mech driver unable to bind port %(port)s to "
                      "%(seg_type)s segment on physical_network %(physnet)s",
                      {'port': port.get('id'), 'seg_type': segment_type,
                       'physnet': physnet})
    def _try_to_release_dynamic_segment(self, context, migration=False):
        """Release dynamic segment if necessary

        If this port was the last port using a segment and the segment was
        allocated by this driver, it should be released
        """
        if migration:
            binding_levels = context.original_binding_levels
        else:
            binding_levels = context.binding_levels
        LOG.debug("_try_to_release_dynamic_segment: "
                  "binding_levels=%(bl)s", {'bl': binding_levels})
        if not binding_levels:
            return
        for prior_level, binding in enumerate(binding_levels[1:]):
            # The segment bound at this level was allocated by the driver
            # bound one level up, so check whether that driver was us
            allocating_driver = binding_levels[prior_level].get(
                driver_api.BOUND_DRIVER)
            if allocating_driver != a_const.MECHANISM_DRV_NAME:
                continue
            bound_segment = binding.get(driver_api.BOUND_SEGMENT, {})
            segment_id = bound_segment.get('id')
            if not db_lib.segment_is_dynamic(segment_id):
                continue
            if not db_lib.segment_bound(segment_id):
                context.release_dynamic_segment(segment_id)
                self.delete_segment(bound_segment)
                LOG.debug("Released dynamic segment %(seg)s allocated "
                          "by %(drv)s", {'seg': segment_id,
                                         'drv': allocating_driver})