Browse Source

Split arista_ml2 into separate files for sync and RPC wrappers

arista_ml2.py was becoming quite large and a dumping ground for
code. This change begins the process of organizing the driver in
a more structured, and manageable way.

Change-Id: I050cdbdc028d7acda940eb932df7052ad2698967
changes/27/490227/5
Mitchell Jameson 5 years ago
parent
commit
310066fa17
  1. 50
      networking_arista/common/constants.py
  2. 2255
      networking_arista/ml2/arista_ml2.py
  3. 358
      networking_arista/ml2/arista_sync.py
  4. 2
      networking_arista/ml2/drivers/driver_helpers.py
  5. 4
      networking_arista/ml2/drivers/type_arista_vlan.py
  6. 119
      networking_arista/ml2/mechanism_arista.py
  7. 0
      networking_arista/ml2/rpc/__init__.py
  8. 871
      networking_arista/ml2/rpc/arista_eapi.py
  9. 674
      networking_arista/ml2/rpc/arista_json.py
  10. 464
      networking_arista/ml2/rpc/base.py
  11. 2
      networking_arista/tests/unit/ml2/drivers/test_arista_type_driver.py
  12. 0
      networking_arista/tests/unit/ml2/rpc/__init__.py
  13. 736
      networking_arista/tests/unit/ml2/rpc/test_arista_eapi_rpc_wrapper.py
  14. 657
      networking_arista/tests/unit/ml2/rpc/test_arista_json_rpc_wrapper.py
  15. 1640
      networking_arista/tests/unit/ml2/test_arista_mechanism_driver.py
  16. 320
      networking_arista/tests/unit/ml2/test_arista_sync.py
  17. 9
      networking_arista/tests/unit/ml2/utils.py

50
networking_arista/common/constants.py

@ -0,0 +1,50 @@
# Copyright (c) 2017 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from networking_arista._i18n import _

# Translated log/error messages shared by the sync service and the
# mechanism driver.
EOS_UNREACHABLE_MSG = _('Unable to reach EOS')
UNABLE_TO_DELETE_PORT_MSG = _('Unable to delete port from EOS')
UNABLE_TO_DELETE_DEVICE_MSG = _('Unable to delete device')

# Constants
INTERNAL_TENANT_ID = 'INTERNAL-TENANT-ID'
MECHANISM_DRV_NAME = 'arista'

# Insert a heartbeat command every 100 commands
HEARTBEAT_INTERVAL = 100

# Commands dict keys
CMD_SYNC_HEARTBEAT = 'SYNC_HEARTBEAT'
CMD_REGION_SYNC = 'REGION_SYNC'
CMD_INSTANCE = 'INSTANCE'

# EAPI error messages of interest.  ERR_CVX_NOT_LEADER is substring-matched
# against EAPI error payloads; the two *_NOT_SUPPORTED strings are only used
# as log messages in the visible callers.
ERR_CVX_NOT_LEADER = 'only available on cluster leader'
ERR_DVR_NOT_SUPPORTED = 'EOS version on CVX does not support DVR'
# Fixed typo ('dpes' -> 'does') in this operator-facing message.
# NOTE(review): confirm no caller substring-matches this against EOS output.
BAREMETAL_NOT_SUPPORTED = 'EOS version on CVX does not support Baremetal'

# Flat network constant
NETWORK_TYPE_FLAT = 'flat'
class InstanceType(object):
    """Classification of the instance types the driver provisions on EOS."""

    BAREMETAL = 'baremetal'
    DHCP = 'dhcp'
    ROUTER = 'router'
    VM = 'vm'

    # Instance types that run behind a hypervisor host.
    VIRTUAL_INSTANCE_TYPES = [DHCP, ROUTER, VM]
    # Instance types provisioned directly onto hardware.
    BAREMETAL_INSTANCE_TYPES = [BAREMETAL]

2255
networking_arista/ml2/arista_ml2.py

File diff suppressed because it is too large Load Diff

358
networking_arista/ml2/arista_sync.py

@ -0,0 +1,358 @@
# Copyright (c) 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
from neutron_lib import worker
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from networking_arista._i18n import _LI
from networking_arista.common import constants
from networking_arista.common import db_lib
from networking_arista.common import exceptions as arista_exc
LOG = logging.getLogger(__name__)
class AristaSyncWorker(worker.BaseWorker):
    """Neutron worker that periodically reconciles Neutron state with EOS.

    Runs a FixedIntervalLoopingCall in-process (worker_process_count=0)
    that invokes SyncService.do_synchronize on every tick.
    """

    def __init__(self, rpc, ndb):
        super(AristaSyncWorker, self).__init__(worker_process_count=0)
        self.ndb = ndb
        self.rpc = rpc
        self.sync_service = SyncService(rpc, ndb)
        # Back-reference so the RPC wrapper can reach the sync service.
        rpc.sync_service = self.sync_service
        self._loop = None

    def start(self):
        """Begin the periodic synchronization loop."""
        super(AristaSyncWorker, self).start()
        self._sync_running = True
        self._sync_event = threading.Event()

        self._cleanup_db()

        # Registering with EOS updates self.rpc.region_updated_time.  Clear
        # it so the first loop iteration performs a full sync.
        self.rpc.clear_region_updated_time()

        if self._loop is None:
            self._loop = loopingcall.FixedIntervalLoopingCall(
                self.sync_service.do_synchronize
            )
        self._loop.start(interval=cfg.CONF.ml2_arista.sync_interval)

    def stop(self, graceful=False):
        """Stop the loop if it was ever started."""
        if self._loop is not None:
            self._loop.stop()

    def wait(self):
        """Block until the loop has fully stopped."""
        if self._loop is not None:
            self._loop.wait()

    def reset(self):
        """Restart the worker: stop, drain, start again."""
        self.stop()
        self.wait()
        self.start()

    def _cleanup_db(self):
        """Drop Arista DB rows for networks Neutron no longer knows about."""
        LOG.info('Arista Sync: DB Cleanup')
        known_net_ids = {net['id'] for net in self.ndb.get_all_networks()}
        arista_db_nets = db_lib.get_networks(tenant_id='any')

        # Remove networks from the Arista DB if the network does not exist
        # in the Neutron DB.
        for net_id in set(arista_db_nets) - known_net_ids:
            owner = arista_db_nets[net_id]['tenantId']
            db_lib.forget_network_segment(owner, net_id)
            db_lib.forget_all_ports_for_network(net_id)
class SyncService(object):
    """Synchronization of information between Neutron and EOS

    Periodically (through configuration option), this service
    ensures that Networks and VMs configured on EOS/Arista HW
    are always in sync with Neutron DB.
    """

    def __init__(self, rpc_wrapper, neutron_db):
        # RPC wrapper used to talk to CVX (EAPI or JSON flavour).
        self._rpc = rpc_wrapper
        # Neutron DB helper for network/port lookups.
        self._ndb = neutron_db
        # Start forced so the very first cycle performs a full sync.
        self._force_sync = True
        # Region timestamp last read from EOS; None until first success.
        self._region_updated_time = None

    def force_sync(self):
        """Sets the force_sync flag."""
        self._force_sync = True

    def do_synchronize(self):
        """Periodically check whether EOS is in sync with ML2 driver.

        If ML2 database is not in sync with EOS, then compute the diff and
        send it down to EOS.
        """
        # Perform sync of Security Groups unconditionally; a failure here
        # must not abort the main network/instance sync.
        try:
            self._rpc.perform_sync_of_sg()
        except Exception as e:
            LOG.warning(e)

        # Check whether CVX is available before starting the sync.
        if not self._rpc.check_cvx_availability():
            LOG.warning("Not syncing as CVX is unreachable")
            self.force_sync()
            return

        if not self._sync_required():
            return

        LOG.info('Attempting to sync')
        # Send 'sync start' marker.
        if not self._rpc.sync_start():
            LOG.info(_LI('Not starting sync, setting force'))
            self._force_sync = True
            return

        # Perform the actual synchronization.
        self.synchronize()

        # Send 'sync end' marker.
        if not self._rpc.sync_end():
            LOG.info(_LI('Sync end failed, setting force'))
            self._force_sync = True
            return

        self._set_region_updated_time()

    def synchronize(self):
        """Sends data to EOS which differs from neutron DB."""

        LOG.info(_LI('Syncing Neutron <-> EOS'))
        try:
            # Register with EOS to ensure that it has correct credentials
            self._rpc.register_with_eos(sync=True)
            self._rpc.check_supported_features()
            eos_tenants = self._rpc.get_tenants()
        except arista_exc.AristaRpcError:
            LOG.warning(constants.EOS_UNREACHABLE_MSG)
            self._force_sync = True
            return

        db_tenants = db_lib.get_tenants()

        # Delete tenants that are in EOS, but not in the database
        tenants_to_delete = frozenset(eos_tenants.keys()).difference(
            db_tenants.keys())

        if tenants_to_delete:
            try:
                self._rpc.delete_tenant_bulk(tenants_to_delete, sync=True)
            except arista_exc.AristaRpcError:
                LOG.warning(constants.EOS_UNREACHABLE_MSG)
                self._force_sync = True
                return

        # None of the commands have failed till now. But if subsequent
        # operations fail, then force_sync is set to true
        self._force_sync = False

        # Create a dict of networks keyed by id.
        neutron_nets = dict(
            (network['id'], network) for network in
            self._ndb.get_all_networks()
        )

        # Get Baremetal port switch_bindings, if any
        port_profiles = db_lib.get_all_portbindings()
        # To support shared networks, split the sync loop in two parts:
        # In first loop, delete unwanted VM and networks and update networks
        # In second loop, update VMs. This is done to ensure that networks for
        # all tenants are updated before VMs are updated
        instances_to_update = {}
        for tenant in db_tenants.keys():
            db_nets = db_lib.get_networks(tenant)
            db_instances = db_lib.get_vms(tenant)

            eos_nets = self._get_eos_networks(eos_tenants, tenant)
            eos_vms, eos_bms, eos_routers = self._get_eos_vms(eos_tenants,
                                                              tenant)

            db_nets_key_set = frozenset(db_nets.keys())
            db_instances_key_set = frozenset(db_instances.keys())
            eos_nets_key_set = frozenset(eos_nets.keys())
            eos_vms_key_set = frozenset(eos_vms.keys())
            eos_routers_key_set = frozenset(eos_routers.keys())
            eos_bms_key_set = frozenset(eos_bms.keys())

            # Create a candidate list by incorporating all instances
            eos_instances_key_set = (eos_vms_key_set | eos_routers_key_set |
                                     eos_bms_key_set)

            # Find the networks that are present on EOS, but not in Neutron DB
            nets_to_delete = eos_nets_key_set.difference(db_nets_key_set)

            # Find the VMs that are present on EOS, but not in Neutron DB
            instances_to_delete = eos_instances_key_set.difference(
                db_instances_key_set)

            vms_to_delete = [
                vm for vm in eos_vms_key_set if vm in instances_to_delete]
            routers_to_delete = [
                r for r in eos_routers_key_set if r in instances_to_delete]
            bms_to_delete = [
                b for b in eos_bms_key_set if b in instances_to_delete]

            # Find the Networks that are present in Neutron DB, but not on EOS
            nets_to_update = db_nets_key_set.difference(eos_nets_key_set)

            # Find the VMs that are present in Neutron DB, but not on EOS
            instances_to_update[tenant] = db_instances_key_set.difference(
                eos_instances_key_set)

            try:
                if vms_to_delete:
                    self._rpc.delete_vm_bulk(tenant, vms_to_delete, sync=True)
                if routers_to_delete:
                    if self._rpc.bm_and_dvr_supported():
                        self._rpc.delete_instance_bulk(
                            tenant,
                            routers_to_delete,
                            constants.InstanceType.ROUTER,
                            sync=True)
                    else:
                        LOG.info(constants.ERR_DVR_NOT_SUPPORTED)

                if bms_to_delete:
                    if self._rpc.bm_and_dvr_supported():
                        self._rpc.delete_instance_bulk(
                            tenant,
                            bms_to_delete,
                            constants.InstanceType.BAREMETAL,
                            sync=True)
                    else:
                        LOG.info(constants.BAREMETAL_NOT_SUPPORTED)

                if nets_to_delete:
                    self._rpc.delete_network_bulk(tenant, nets_to_delete,
                                                  sync=True)
                if nets_to_update:
                    networks = [{
                        'network_id': net_id,
                        'network_name':
                            neutron_nets.get(net_id, {'name': ''})['name'],
                        'shared':
                            neutron_nets.get(net_id,
                                             {'shared': False})['shared'],
                        'segments': self._ndb.get_all_network_segments(net_id),
                    }
                        for net_id in nets_to_update
                    ]
                    self._rpc.create_network_bulk(tenant, networks, sync=True)
            except arista_exc.AristaRpcError:
                LOG.warning(constants.EOS_UNREACHABLE_MSG)
                self._force_sync = True

        # Now update the VMs
        for tenant in instances_to_update:
            if not instances_to_update[tenant]:
                continue
            try:
                # Filter the ports to only the vms that we are interested
                # in.
                ports_of_interest = {}
                for port in self._ndb.get_all_ports_for_tenant(tenant):
                    ports_of_interest.update(
                        self._port_dict_representation(port))

                if ports_of_interest:
                    db_vms = db_lib.get_vms(tenant)
                    if db_vms:
                        self._rpc.create_instance_bulk(tenant,
                                                       ports_of_interest,
                                                       db_vms,
                                                       port_profiles,
                                                       sync=True)
            except arista_exc.AristaRpcError:
                LOG.warning(constants.EOS_UNREACHABLE_MSG)
                self._force_sync = True

    def _region_in_sync(self):
        """Checks if the region is in sync with EOS.

        Checks whether the timestamp stored in EOS is the same as the
        timestamp stored locally.
        """
        eos_region_updated_times = self._rpc.get_region_updated_time()
        if eos_region_updated_times:
            return (self._region_updated_time and
                    (self._region_updated_time['regionTimestamp'] ==
                     eos_region_updated_times['regionTimestamp']))
        else:
            return False

    def _sync_required(self):
        """Check whether the sync is required."""
        try:
            # Get the time at which entities in the region were updated.
            # If the times match, then ML2 is in sync with EOS. Otherwise
            # perform a complete sync.
            if not self._force_sync and self._region_in_sync():
                LOG.info(_LI('OpenStack and EOS are in sync!'))
                return False
        except arista_exc.AristaRpcError:
            LOG.warning(constants.EOS_UNREACHABLE_MSG)
            # Force an update in case of an error.
            self._force_sync = True
        return True

    def _set_region_updated_time(self):
        """Get the region updated time from EOS and store it locally."""
        try:
            self._region_updated_time = self._rpc.get_region_updated_time()
        except arista_exc.AristaRpcError:
            # Force an update in case of an error.
            self._force_sync = True

    def _get_eos_networks(self, eos_tenants, tenant):
        """Return the tenant's network map from the EOS snapshot (or {})."""
        networks = {}
        if eos_tenants and tenant in eos_tenants:
            networks = eos_tenants[tenant]['tenantNetworks']
        return networks

    def _get_eos_vms(self, eos_tenants, tenant):
        """Return (vms, bms, routers) maps for tenant from the EOS snapshot.

        The baremetal and router maps may be absent on older CVX releases,
        in which case empty dicts are returned for them.
        """
        vms = {}
        bms = {}
        routers = {}
        if eos_tenants and tenant in eos_tenants:
            vms = eos_tenants[tenant]['tenantVmInstances']
            if 'tenantBaremetalInstances' in eos_tenants[tenant]:
                # Check if baremetal service is supported
                bms = eos_tenants[tenant]['tenantBaremetalInstances']
            if 'tenantRouterInstances' in eos_tenants[tenant]:
                routers = eos_tenants[tenant]['tenantRouterInstances']
        return vms, bms, routers

    def _port_dict_representation(self, port):
        """Map a Neutron port row to the {port_id: {...}} shape used above."""
        return {port['id']: {'device_owner': port['device_owner'],
                             'device_id': port['device_id'],
                             'name': port['name'],
                             'id': port['id'],
                             'tenant_id': port['tenant_id'],
                             'network_id': port['network_id']}}

2
networking_arista/ml2/drivers/driver_helpers.py

@ -20,8 +20,8 @@ from six import moves
from neutron.db.models.plugins.ml2 import vlanallocation
from networking_arista._i18n import _LI
from networking_arista.common.constants import EOS_UNREACHABLE_MSG
from networking_arista.common import exceptions as arista_exc
from networking_arista.ml2.arista_ml2 import EOS_UNREACHABLE_MSG
LOG = log.getLogger(__name__)

4
networking_arista/ml2/drivers/type_arista_vlan.py

@ -23,8 +23,8 @@ from neutron.plugins.ml2.drivers import type_vlan
from networking_arista._i18n import _LI
from networking_arista.common import db_lib
from networking_arista.common import exceptions as exc
from networking_arista.ml2 import arista_ml2
from networking_arista.ml2.drivers import driver_helpers
from networking_arista.ml2.rpc.arista_eapi import AristaRPCWrapperEapi
LOG = log.getLogger(__name__)
cfg.CONF.import_group('arista_type_driver', 'networking_arista.common.config')
@ -44,7 +44,7 @@ class AristaVlanTypeDriver(type_vlan.VlanTypeDriver):
def __init__(self):
super(AristaVlanTypeDriver, self).__init__()
ndb = db_lib.NeutronNets()
self.rpc = arista_ml2.AristaRPCWrapperEapi(ndb)
self.rpc = AristaRPCWrapperEapi(ndb)
self.sync_service = driver_helpers.VlanSyncService(self.rpc)
self.network_vlan_ranges = dict()
self.sync_timeout = cfg.CONF.arista_type_driver['sync_interval']

119
networking_arista/ml2/mechanism_arista.py

@ -18,34 +18,26 @@ import threading
from neutron_lib.api.definitions import portbindings
from neutron_lib import constants as n_const
from neutron_lib.plugins.ml2 import api as driver_api
from neutron_lib import worker
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import excutils
from neutron.common import constants as neutron_const
from networking_arista._i18n import _, _LI, _LE
from networking_arista.common import constants
from networking_arista.common import db
from networking_arista.common import db_lib
from networking_arista.common import exceptions as arista_exc
from networking_arista.ml2 import arista_ml2
from networking_arista.ml2 import arista_sync
from networking_arista.ml2.rpc.arista_eapi import AristaRPCWrapperEapi
from networking_arista.ml2.rpc.arista_json import AristaRPCWrapperJSON
from networking_arista.ml2 import sec_group_callback
LOG = logging.getLogger(__name__)
cfg.CONF.import_group('ml2_arista', 'networking_arista.common.config')
# Messages
EOS_UNREACHABLE_MSG = _('Unable to reach EOS')
UNABLE_TO_DELETE_PORT_MSG = _('Unable to delete port from EOS')
UNABLE_TO_DELETE_DEVICE_MSG = _('Unable to delete device')
# Constants
INTERNAL_TENANT_ID = 'INTERNAL-TENANT-ID'
PORT_BINDING_HOST = 'binding:host_id'
MECHANISM_DRV_NAME = 'arista'
def pretty_log(tag, obj):
import json
@ -54,63 +46,6 @@ def pretty_log(tag, obj):
LOG.debug(log_data)
class AristaSyncWorker(worker.BaseWorker):
    """Neutron worker that runs the periodic Neutron <-> EOS sync loop.

    Runs in-process (worker_process_count=0) and drives
    arista_ml2.SyncService.do_synchronize on a fixed interval.
    """

    def __init__(self, rpc, ndb):
        super(AristaSyncWorker, self).__init__(worker_process_count=0)
        self.ndb = ndb
        self.rpc = rpc
        self.sync_service = arista_ml2.SyncService(rpc, ndb)
        # Back-reference so the RPC wrapper can reach the sync service.
        rpc.sync_service = self.sync_service
        self._loop = None

    def start(self):
        super(AristaSyncWorker, self).start()
        self._sync_running = True
        self._sync_event = threading.Event()

        self._cleanup_db()

        # Registering with EOS updates self.rpc.region_updated_time. Clear it
        # to force an initial sync
        self.rpc.clear_region_updated_time()

        if self._loop is None:
            self._loop = loopingcall.FixedIntervalLoopingCall(
                self.sync_service.do_synchronize
            )
        self._loop.start(interval=cfg.CONF.ml2_arista.sync_interval)

    def stop(self, graceful=False):
        # Idempotent: the loop may never have been started.
        if self._loop is not None:
            self._loop.stop()

    def wait(self):
        if self._loop is not None:
            self._loop.wait()

    def reset(self):
        self.stop()
        self.wait()
        self.start()

    def _cleanup_db(self):
        """Clean up any unnecessary entries in our DB."""
        LOG.info('Arista Sync: DB Cleanup')
        neutron_nets = self.ndb.get_all_networks()
        arista_db_nets = db_lib.get_networks(tenant_id='any')
        neutron_net_ids = set()
        for net in neutron_nets:
            neutron_net_ids.add(net['id'])

        # Remove networks from the Arista DB if the network does not exist in
        # Neutron DB
        for net_id in set(arista_db_nets.keys()).difference(neutron_net_ids):
            tenant_network = arista_db_nets[net_id]
            db_lib.forget_network_segment(tenant_network['tenantId'], net_id)
            db_lib.forget_all_ports_for_network(net_id)
class AristaDriver(driver_api.MechanismDriver):
"""Ml2 Mechanism driver for Arista networking hardware.
@ -139,14 +74,14 @@ class AristaDriver(driver_api.MechanismDriver):
self.rpc = rpc
self.eapi = rpc
else:
self.eapi = arista_ml2.AristaRPCWrapperEapi(self.ndb)
self.eapi = AristaRPCWrapperEapi(self.ndb)
api_type = confg['api_type'].upper()
if api_type == 'EAPI':
LOG.info("Using EAPI for RPC")
self.rpc = arista_ml2.AristaRPCWrapperEapi(self.ndb)
self.rpc = AristaRPCWrapperEapi(self.ndb)
elif api_type == 'JSON':
LOG.info("Using JSON for RPC")
self.rpc = arista_ml2.AristaRPCWrapperJSON(self.ndb)
self.rpc = AristaRPCWrapperJSON(self.ndb)
else:
msg = "RPC mechanism %s not recognized" % api_type
LOG.error(msg)
@ -160,7 +95,7 @@ class AristaDriver(driver_api.MechanismDriver):
self.sg_handler = sec_group_callback.AristaSecurityGroupHandler(self)
def get_workers(self):
return [AristaSyncWorker(self.rpc, self.ndb)]
return [arista_sync.AristaSyncWorker(self.rpc, self.ndb)]
def create_network_precommit(self, context):
"""Remember the tenant, and network information."""
@ -174,7 +109,7 @@ class AristaDriver(driver_api.MechanismDriver):
segments[0][driver_api.NETWORK_TYPE] != n_const.TYPE_VLAN):
return
network_id = network['id']
tenant_id = network['tenant_id'] or INTERNAL_TENANT_ID
tenant_id = network['tenant_id'] or constants.INTERNAL_TENANT_ID
with self.eos_sync_lock:
db_lib.remember_tenant(tenant_id)
for segment in segments:
@ -189,7 +124,7 @@ class AristaDriver(driver_api.MechanismDriver):
network = context.current
network_id = network['id']
network_name = network['name']
tenant_id = network['tenant_id'] or INTERNAL_TENANT_ID
tenant_id = network['tenant_id'] or constants.INTERNAL_TENANT_ID
segments = context.network_segments
shared_net = network['shared']
with self.eos_sync_lock:
@ -233,7 +168,8 @@ class AristaDriver(driver_api.MechanismDriver):
(new_network['shared'] != orig_network['shared'])):
network_id = new_network['id']
network_name = new_network['name']
tenant_id = new_network['tenant_id'] or INTERNAL_TENANT_ID
tenant_id = (new_network['tenant_id'] or
constants.INTERNAL_TENANT_ID)
shared_net = new_network['shared']
with self.eos_sync_lock:
if db_lib.is_network_provisioned(tenant_id, network_id):
@ -257,7 +193,7 @@ class AristaDriver(driver_api.MechanismDriver):
"""Delete the network information from the DB."""
network = context.current
network_id = network['id']
tenant_id = network['tenant_id'] or INTERNAL_TENANT_ID
tenant_id = network['tenant_id'] or constants.INTERNAL_TENANT_ID
with self.eos_sync_lock:
if db_lib.is_network_provisioned(tenant_id, network_id):
if db_lib.are_ports_attached_to_network(network_id):
@ -281,7 +217,7 @@ class AristaDriver(driver_api.MechanismDriver):
# HPB is not supported.
segments = []
network_id = network['id']
tenant_id = network['tenant_id'] or INTERNAL_TENANT_ID
tenant_id = network['tenant_id'] or constants.INTERNAL_TENANT_ID
with self.eos_sync_lock:
# Succeed deleting network in case EOS is not accessible.
@ -504,7 +440,7 @@ class AristaDriver(driver_api.MechanismDriver):
if new_host and orig_host and new_host != orig_host:
LOG.debug("Handling port migration for: %s " % orig_port)
network_id = orig_port['network_id']
tenant_id = orig_port['tenant_id'] or INTERNAL_TENANT_ID
tenant_id = orig_port['tenant_id'] or constants.INTERNAL_TENANT_ID
# Ensure that we use tenant Id for the network owner
tenant_id = self._network_owner_tenant(context, network_id,
tenant_id)
@ -537,7 +473,7 @@ class AristaDriver(driver_api.MechanismDriver):
# 2. If segment_id is provisioned and it not bound to any port it
# should be removed from EOS.
network_id = orig_port['network_id']
tenant_id = orig_port['tenant_id'] or INTERNAL_TENANT_ID
tenant_id = orig_port['tenant_id'] or constants.INTERNAL_TENANT_ID
# Ensure that we use tenant Id for the network owner
tenant_id = self._network_owner_tenant(context, network_id,
tenant_id)
@ -570,7 +506,7 @@ class AristaDriver(driver_api.MechanismDriver):
tenant_id, network_id,
binding_level.segment_id)
except arista_exc.AristaRpcError:
LOG.info(EOS_UNREACHABLE_MSG)
LOG.info(constants.EOS_UNREACHABLE_MSG)
return True
@ -609,7 +545,7 @@ class AristaDriver(driver_api.MechanismDriver):
# device_id and device_owner are set on VM boot
port_id = new_port['id']
network_id = new_port['network_id']
tenant_id = new_port['tenant_id'] or INTERNAL_TENANT_ID
tenant_id = new_port['tenant_id'] or constants.INTERNAL_TENANT_ID
# Ensure that we use tenant Id for the network owner
tenant_id = self._network_owner_tenant(context, network_id, tenant_id)
@ -703,7 +639,7 @@ class AristaDriver(driver_api.MechanismDriver):
port_id = port['id']
port_name = port['name']
network_id = port['network_id']
tenant_id = port['tenant_id'] or INTERNAL_TENANT_ID
tenant_id = port['tenant_id'] or constants.INTERNAL_TENANT_ID
# Ensure that we use tenant Id for the network owner
tenant_id = self._network_owner_tenant(context, network_id, tenant_id)
sg = port['security_groups']
@ -816,7 +752,7 @@ class AristaDriver(driver_api.MechanismDriver):
host = context.host
network_id = port['network_id']
tenant_id = port['tenant_id'] or INTERNAL_TENANT_ID
tenant_id = port['tenant_id'] or constants.INTERNAL_TENANT_ID
# Ensure that we use tenant Id for the network owner
tenant_id = self._network_owner_tenant(context, network_id, tenant_id)
@ -834,7 +770,7 @@ class AristaDriver(driver_api.MechanismDriver):
except arista_exc.AristaRpcError:
# Can't do much if deleting a port failed.
# Log a warning and continue.
LOG.warning(UNABLE_TO_DELETE_PORT_MSG)
LOG.warning(constants.UNABLE_TO_DELETE_PORT_MSG)
def _delete_port(self, port, host, tenant_id):
"""Deletes the port from EOS.
@ -861,7 +797,7 @@ class AristaDriver(driver_api.MechanismDriver):
sg = port['security_groups']
if not device_id or not host:
LOG.warning(UNABLE_TO_DELETE_DEVICE_MSG)
LOG.warning(constants.UNABLE_TO_DELETE_DEVICE_MSG)
return
try:
@ -878,7 +814,7 @@ class AristaDriver(driver_api.MechanismDriver):
# if necessary, delete tenant as well.
self.delete_tenant(tenant_id)
except arista_exc.AristaRpcError:
LOG.info(EOS_UNREACHABLE_MSG)
LOG.info(constants.EOS_UNREACHABLE_MSG)
def _delete_segment(self, context, tenant_id):
"""Deletes a dynamic network segment from EOS.
@ -916,7 +852,7 @@ class AristaDriver(driver_api.MechanismDriver):
db_lib.forget_network_segment(
tenant_id, network_id, binding_level.segment_id)
except arista_exc.AristaRpcError:
LOG.info(EOS_UNREACHABLE_MSG)
LOG.info(constants.EOS_UNREACHABLE_MSG)
else:
LOG.debug("Cannot delete segment_id %(segid)s "
"segment is %(seg)s",
@ -959,7 +895,8 @@ class AristaDriver(driver_api.MechanismDriver):
# When Arista driver participate in port binding by allocating dynamic
# segment and then calling continue_binding, the driver should the
# second last driver in the bound drivers list.
if (segment_id and bound_drivers[-2:-1] == [MECHANISM_DRV_NAME]):
if (segment_id and bound_drivers[-2:-1] ==
[constants.MECHANISM_DRV_NAME]):
filters = {'segment_id': segment_id}
result = db_lib.get_port_binding_level(filters)
LOG.debug("Looking for entry with filters=%(filters)s "
@ -987,7 +924,7 @@ class AristaDriver(driver_api.MechanismDriver):
self.rpc.delete_tenant(tenant_id)
except arista_exc.AristaRpcError:
with excutils.save_and_reraise_exception():
LOG.info(EOS_UNREACHABLE_MSG)
LOG.info(constants.EOS_UNREACHABLE_MSG)
def _host_name(self, hostname):
fqdns_used = cfg.CONF.ml2_arista['use_fqdn']

0
networking_arista/ml2/rpc/__init__.py

871
networking_arista/ml2/rpc/arista_eapi.py

@ -0,0 +1,871 @@
# Copyright (c) 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import socket
from neutron_lib.api.definitions import portbindings
from neutron_lib import constants as n_const
from oslo_config import cfg
from oslo_log import log as logging
import requests
from networking_arista._i18n import _, _LI, _LW, _LE
from networking_arista.common import constants as const
from networking_arista.common import exceptions as arista_exc
from networking_arista.ml2.rpc.base import AristaRPCWrapperBase
LOG = logging.getLogger(__name__)
class AristaRPCWrapperEapi(AristaRPCWrapperBase):
def __init__(self, ndb):
    """Initialize the EAPI wrapper and its CLI command table.

    :param ndb: Neutron DB helper passed through to the base wrapper.
    """
    super(AristaRPCWrapperEapi, self).__init__(ndb)
    # The cli_commands dict stores the mapping between the CLI command key
    # and the actual CLI command.  Entries start from these defaults and
    # are refined by the check_supported_features /
    # check_vlan_type_driver_commands probes.
    self.cli_commands = {
        'timestamp': [
            'show openstack config region %s timestamp' % self.region],
        const.CMD_REGION_SYNC: 'region %s sync' % self.region,
        const.CMD_INSTANCE: None,
        const.CMD_SYNC_HEARTBEAT: 'sync heartbeat',
        'resource-pool': [],
        'features': {},
    }
def _send_eapi_req(self, cmds, commands_to_log=None):
    """Send an EAPI 'runCmds' request to the current CVX server.

    Handles all EAPI requests (using the requests library) and returns
    either None or response.json()['result'] from the EAPI request.

    Connection failures and timeouts are caught here and logged
    (returning None); other unexpected exceptions are logged and
    re-raised.

    :param cmds: list of CLI commands to execute on CVX.
    :param commands_to_log: optional sanitized copy of cmds used only
        for logging, so credentials are never written to the log.
    """
    request_headers = {}
    request_headers['Content-Type'] = 'application/json'
    request_headers['Accept'] = 'application/json'
    url = self._api_host_url(host=self._server_ip)

    params = {}
    params['timestamps'] = "false"
    params['format'] = "json"
    params['version'] = 1
    params['cmds'] = cmds

    data = {}
    data['id'] = "Arista ML2 driver"
    data['method'] = "runCmds"
    data['jsonrpc'] = "2.0"
    data['params'] = params

    response = None

    try:
        # NOTE(pbourke): shallow copy data and params to remove sensitive
        # information before logging
        log_data = dict(data)
        log_data['params'] = dict(params)
        log_data['params']['cmds'] = commands_to_log or cmds
        msg = (_('EAPI request to %(ip)s contains %(cmd)s') %
               {'ip': self._server_ip, 'cmd': json.dumps(log_data)})
        LOG.info(msg)
        # NOTE(review): verify=False disables TLS certificate checking;
        # kept for backward compatibility but should become configurable.
        response = requests.post(url, timeout=self.conn_timeout,
                                 verify=False, data=json.dumps(data))
        LOG.info(_LI('EAPI response contains: %s'), response.json())
        resp_json = response.json()
        try:
            return resp_json['result']
        except KeyError:
            if resp_json['error']['code'] == 1002:
                # 'err_data' renamed from 'data' to stop shadowing the
                # request payload dict built above.
                for err_data in resp_json['error']['data']:
                    if (isinstance(err_data, dict) and
                            'errors' in err_data):
                        if (const.ERR_CVX_NOT_LEADER in
                                err_data['errors'][0]):
                            # This server is a non-master CVX instance.
                            msg = ("%s is not the master" %
                                   self._server_ip)
                            LOG.info(msg)
                            return None
            msg = "Unexpected EAPI error"
            LOG.info(msg)
            raise arista_exc.AristaRpcError(msg=msg)
    except requests.exceptions.ConnectTimeout:
        # Must be caught before ConnectionError and Timeout:
        # ConnectTimeout subclasses both, so with the previous ordering
        # this branch was unreachable.
        msg = (_('Timed out while trying to connect to %(ip)s') %
               {'ip': self._server_ip})
        LOG.warning(msg)
        return None
    except requests.exceptions.ConnectionError:
        msg = (_('Error while trying to connect to %(ip)s') %
               {'ip': self._server_ip})
        LOG.warning(msg)
        return None
    except requests.exceptions.Timeout:
        msg = (_('Timed out during an EAPI request to %(ip)s') %
               {'ip': self._server_ip})
        LOG.warning(msg)
        return None
    except requests.exceptions.InvalidURL:
        msg = (_('Ignore attempt to connect to invalid URL %(ip)s') %
               {'ip': self._server_ip})
        LOG.warning(msg)
        return None
    except ValueError:
        # response.json() failed: the body was not valid JSON.
        LOG.info("Ignoring invalid JSON response")
        return None
    except Exception as error:
        # Python 3 fix: 'unicode' does not exist there; str() builds the
        # same log message on both interpreters.
        msg = str(error)
        LOG.warning(msg)
        raise
def check_supported_features(self):
    """Probe CVX for optional features and record what is available.

    Sets self.cli_commands[const.CMD_INSTANCE] to 'instance' when the
    command exists (None otherwise) and self.cli_commands['features']
    to the feature dict reported by CVX ({} when unavailable).
    """
    cmd = ['show openstack instances']
    try:
        self._run_eos_cmds(cmd)
        self.cli_commands[const.CMD_INSTANCE] = 'instance'
    except Exception as err:
        # The original 'except (arista_exc.AristaRpcError, Exception)'
        # was redundant: Exception already covers AristaRpcError.
        self.cli_commands[const.CMD_INSTANCE] = None
        LOG.warning(_LW("'instance' command is not available on EOS "
                        "because of %s"), err)

    # Get list of supported openstack features by CVX
    cmd = ['show openstack features']
    try:
        resp = self._run_eos_cmds(cmd)
        self.cli_commands['features'] = resp[0].get('features', {})
    except Exception:
        self.cli_commands['features'] = {}
def check_vlan_type_driver_commands(self):
    """Checks the validity of CLI commands for Arista's VLAN type driver.

    Executes the resource-pool query used exclusively by the arista_vlan
    type driver once; on success the command list is cached in
    self.cli_commands['resource-pool'], otherwise an empty list is stored
    and a warning is logged.
    """
    probe = ['show openstack resource-pool vlan region %s uuid'
             % self.region]
    try:
        self._run_eos_cmds(probe)
    except arista_exc.AristaRpcError:
        self.cli_commands['resource-pool'] = []
        LOG.warning(
            _LW("'resource-pool' command '%s' is not available on EOS"),
            probe)
    else:
        self.cli_commands['resource-pool'] = probe
def _heartbeat_required(self, sync, counter=0):
    """Tell whether a sync-heartbeat command is due at this point.

    Heartbeats are emitted only during a sync, only when CVX supports
    the heartbeat command, and only every const.HEARTBEAT_INTERVAL
    commands.
    """
    supports_heartbeat = self.cli_commands[const.CMD_SYNC_HEARTBEAT]
    return (sync and supports_heartbeat and
            counter % const.HEARTBEAT_INTERVAL == 0)
def get_vlan_assignment_uuid(self):
    """Returns the UUID for the region's vlan assignment on CVX

    :returns: string containing the region's vlan assignment UUID, or
        None when the resource-pool command is not available.
    """
    vlan_uuid_cmd = self.cli_commands['resource-pool']
    if not vlan_uuid_cmd:
        return None
    return self._run_eos_cmds(commands=vlan_uuid_cmd)[0]
def get_vlan_allocation(self):
    """Returns the status of the region's VLAN pool in CVX

    :returns: dictionary containing the assigned, allocated and
        available VLANs for the region; empty strings when CVX does not
        report this region, or None when unsupported.
    """
    if not self.cli_commands['resource-pool']:
        LOG.warning(_('The version of CVX you are using does not support'
                      'arista VLAN type driver.'))
        return None

    empty_pool = {'assignedVlans': '',
                  'availableVlans': '',
                  'allocatedVlans': ''}
    cmd = ['show openstack resource-pools region %s' % self.region]
    command_output = self._run_eos_cmds(cmd)
    if not command_output:
        return empty_pool
    regions = command_output[0]['physicalNetwork']
    if self.region in regions:
        return regions[self.region]['vlanPool']['default']
    return empty_pool
def get_tenants(self):
    """Fetch the per-tenant configuration for this region from CVX."""
    show_cmd = 'show openstack config region %s' % self.region
    output = self._run_eos_cmds([show_cmd])
    return output[0]['tenants']
def bm_and_dvr_supported(self):
    """True when CVX accepted the 'instance' CLI during feature probing."""
    return self.cli_commands[const.CMD_INSTANCE] == 'instance'
def _baremetal_support_check(self, vnic_type):
    """Raise AristaConfigError for baremetal ports on unsupported EOS."""
    # Basic error checking for baremetal deployments
    unsupported = (vnic_type == portbindings.VNIC_BAREMETAL and
                   not self.bm_and_dvr_supported())
    if not unsupported:
        return
    msg = _("Baremetal instances are not supported in this"
            " release of EOS")
    LOG.error(msg)
    raise arista_exc.AristaConfigError(msg=msg)
def plug_port_into_network(self, device_id, host_id, port_id,
net_id, tenant_id, port_name, device_owner,
sg, orig_sg, vnic_type, segments,
switch_bindings=None):
if device_owner == n_const.DEVICE_OWNER_DHCP:
self.plug_dhcp_port_into_network(device_id,
host_id,
port_id,
net_id,
tenant_id,
segments,
port_name)
elif (device_owner.startswith('compute') or
device_owner.startswith('baremetal')):
if vnic_type == 'baremetal':
self.plug_baremetal_into_network(device_id,
host_id,
port_id,
net_id,
tenant_id,
segments,
port_name,
sg, orig_sg,
vnic_type,
switch_bindings)
else:
self.plug_host_into_network(device_id,
host_id,
port_id,
net_id,
tenant_id,
segments,
port_name)
elif device_owner == n_const.DEVICE_OWNER_DVR_INTERFACE:
self.plug_distributed_router_port_into_network(device_id,
host_id,
port_id,
net_id,
tenant_id,
segments)
def unplug_port_from_network(self, device_id, device_owner, hostname,
port_id, network_id, tenant_id, sg, vnic_type,
switch_bindings=None):
if device_owner == n_const.DEVICE_OWNER_DHCP:
self.unplug_dhcp_port_from_network(device_id,
hostname,
port_id,
network_id,
tenant_id)
elif (device_owner.startswith('compute') or
device_owner.startswith('baremetal')):
if vnic_type == 'baremetal':
self.unplug_baremetal_from_network(device_id,
hostname,
port_id,
network_id,
tenant_id,
sg,
vnic_type,
switch_bindings)
else:
self.unplug_host_from_network(device_id,
hostname,
port_id,
network_id,
tenant_id)
elif device_owner == n_const.DEVICE_OWNER_DVR_INTERFACE:
self.unplug_distributed_router_port_from_network(device_id,
port_id,
hostname,
tenant_id)
def plug_host_into_network(self, vm_id, host, port_id,
network_id, tenant_id, segments, port_name):
cmds = ['tenant %s' % tenant_id,
'vm id %s hostid %s' % (vm_id, host)]
if port_name:
cmds.append('port id %s name "%s" network-id %s' %
(port_id, port_name, network_id))
else:
cmds.append('port id %s network-id %s' %
(port_id, network_id))
cmds.extend(
'segment level %d id %s' % (level, segment['id'])
for level, segment in enumerate(segments))
self._run_openstack_cmds(cmds)
    def plug_baremetal_into_network(self, vm_id, host, port_id,
                                    network_id, tenant_id, segments, port_name,
                                    sg=None, orig_sg=None,
                                    vnic_type=None, switch_bindings=None):
        """Creates a baremetal instance and its switchport bindings on CVX.

        Emits one 'port ... type native switch-id ... switchport ...'
        command per physical switch binding (each followed by the port's
        segments), pushes the batch to EOS, then applies or removes
        security-group ACLs on those switchports.

        :param vm_id: identifier of the baremetal instance
        :param host: host id the instance is bound to
        :param port_id: identifier of the neutron port
        :param network_id: identifier of the neutron network
        :param tenant_id: identifier of the owning tenant
        :param segments: network segments the port is bound to
        :param port_name: optional display name for the port
        :param sg: security group to apply to the switchports, if any
        :param orig_sg: previously applied security group, if any
        :param vnic_type: port's vnic_type; EOS is only programmed when
            this is VNIC_BAREMETAL and switch_bindings is non-empty
        :param switch_bindings: list of dicts carrying 'switch_id' and
            'port_id' of the physical switchports
        :raises AristaConfigError: if EOS lacks baremetal support, or a
            binding is missing its switch/port information
        """
        # Basic error checking for baremetal deployments.
        # Note that the following method throws an exception
        # if an error condition exists.
        self._baremetal_support_check(vnic_type)
        # For baremetal, add host to the topology
        if switch_bindings and vnic_type == portbindings.VNIC_BAREMETAL:
            cmds = ['tenant %s' % tenant_id]
            cmds.append('instance id %s hostid %s type baremetal' %
                        (vm_id, host))
            # NOTE(review): an earlier revision apparently tracked applied
            # ACLs here for rollback on a failed transaction; no such
            # rollback list exists anymore - failures simply propagate.
            for binding in switch_bindings:
                if not binding:
                    # skip all empty entries
                    continue
                # Ensure that binding contains switch and port ID info
                if binding['switch_id'] and binding['port_id']:
                    if port_name:
                        cmds.append('port id %s name "%s" network-id %s '
                                    'type native switch-id %s switchport %s' %
                                    (port_id, port_name, network_id,
                                     binding['switch_id'], binding['port_id']))
                    else:
                        cmds.append('port id %s network-id %s type native '
                                    'switch-id %s switchport %s' %
                                    (port_id, network_id, binding['switch_id'],
                                     binding['port_id']))
                    cmds.extend('segment level %d id %s' % (level,
                                                            segment['id'])
                                for level, segment in enumerate(segments))
                else:
                    msg = _('switch and port ID not specified for baremetal')
                    LOG.error(msg)
                    raise arista_exc.AristaConfigError(msg=msg)
            cmds.append('exit')
            self._run_openstack_cmds(cmds)
            if sg:
                self.apply_security_group(sg, switch_bindings)
            else:
                # Security group was removed. Clean up the existing security
                # groups.
                if orig_sg:
                    self.remove_security_group(orig_sg, switch_bindings)
def plug_dhcp_port_into_network(self, dhcp_id, host, port_id,
network_id, tenant_id, segments,
port_name):
cmds = ['tenant %s' % tenant_id,
'network id %s' % network_id]
if port_name:
cmds.append('dhcp id %s hostid %s port-id %s name "%s"' %
(dhcp_id, host, port_id, port_name))
else:
cmds.append('dhcp id %s hostid %s port-id %s' %
(dhcp_id, host, port_id))
cmds.extend('segment level %d id %s' % (level, segment['id'])
for level, segment in enumerate(segments))
self._run_openstack_cmds(cmds)
def plug_distributed_router_port_into_network(self, router_id, host,
port_id, net_id, tenant_id,
segments):
if not self.bm_and_dvr_supported():
LOG.info(const.ERR_DVR_NOT_SUPPORTED)
return
cmds = ['tenant %s' % tenant_id,
'instance id %s type router' % router_id,
'port id %s network-id %s hostid %s' % (port_id, net_id, host)]
cmds.extend('segment level %d id %s' % (level, segment['id'])
for level, segment in enumerate(segments))
self._run_openstack_cmds(cmds)
def unplug_host_from_network(self, vm_id, host, port_id,
network_id, tenant_id):
cmds = ['tenant %s' % tenant_id,
'vm id %s hostid %s' % (vm_id, host),
'no port id %s' % port_id,
]
self._run_openstack_cmds(cmds)
def unplug_baremetal_from_network(self, vm_id, host, port_id,
network_id, tenant_id, sg, vnic_type,
switch_bindings=None):
# Basic error checking for baremental deployments
# notice that the following method throws and exception
# if an error condition exists
self._baremetal_support_check(vnic_type)
# Following is a temporary code for native VLANs - should be removed
cmds = ['tenant %s' % tenant_id]
cmds.append('instance id %s hostid %s type baremetal' % (vm_id, host))
cmds.append('no port id %s' % port_id)
self._run_openstack_cmds(cmds)
# SG - Remove security group rules from the port
# after deleting the instance
for binding in switch_bindings:
if not binding:
continue
self.security_group_driver.remove_acl(sg, binding['switch_id'],
binding['port_id'],
binding['switch_info'])
def unplug_dhcp_port_from_network(self, dhcp_id, host, port_id,
network_id, tenant_id):
cmds = ['tenant %s' % tenant_id,
'network id %s' % network_id,
'no dhcp id %s port-id %s' % (dhcp_id, port_id),
]
self._run_openstack_cmds(cmds)
def unplug_distributed_router_port_from_network(self, router_id,
port_id, host, tenant_id):
if not self.bm_and_dvr_supported():
LOG.info(const.ERR_DVR_NOT_SUPPORTED)
return
# When the last router port is removed, the router is deleted from EOS.
cmds = ['tenant %s' % tenant_id,
'instance id %s type router' % router_id,
'no port id %s hostid %s' % (port_id, host)]
self._run_openstack_cmds(cmds)
    def create_network_bulk(self, tenant_id, network_list, sync=False):
        """Creates a batch of networks (and their segments) on CVX.

        :param tenant_id: identifier of the owning tenant
        :param network_list: list of dicts, each carrying 'network_id',
            'segments', 'shared' and optionally 'network_name'
        :param sync: True when called from the sync path; enables the
            periodic heartbeat commands for large batches
        """
        cmds = ['tenant %s' % tenant_id]
        # Create a reference to function to avoid name lookups in the loop
        append_cmd = cmds.append
        for counter, network in enumerate(network_list, 1):
            try:
                append_cmd('network id %s name "%s"' %
                           (network['network_id'], network['network_name']))
            except KeyError:
                # 'network_name' is optional - fall back to the id-only form.
                append_cmd('network id %s' % network['network_id'])
            # Flat segments are skipped entirely. Without HPB support the
            # segment id is forced to 1 and the dynamic/static qualifier
            # is omitted.
            cmds.extend(
                'segment %s type %s id %d %s' % (
                    seg['id'] if self.hpb_supported() else 1,
                    seg['network_type'], seg['segmentation_id'],
                    ('dynamic' if seg.get('is_dynamic', False) else 'static'
                     if self.hpb_supported() else ''))
                for seg in network['segments']
                if seg['network_type'] != const.NETWORK_TYPE_FLAT
            )
            shared_cmd = 'shared' if network['shared'] else 'no shared'
            append_cmd(shared_cmd)
            # Keep the sync session alive during large batches.
            if self._heartbeat_required(sync, counter):
                append_cmd(self.cli_commands[const.CMD_SYNC_HEARTBEAT])
        if self._heartbeat_required(sync):
            append_cmd(self.cli_commands[const.CMD_SYNC_HEARTBEAT])
        self._run_openstack_cmds(cmds, sync=sync)
def create_network_segments(self, tenant_id, network_id,
network_name, segments):
if segments:
cmds = ['tenant %s' % tenant_id,
'network id %s name "%s"' % (network_id, network_name)]
cmds.extend(
'segment %s type %s id %d %s' % (
seg['id'], seg['network_type'], seg['segmentation_id'],
('dynamic' if seg.get('is_dynamic', False) else 'static'
if self.hpb_supported() else ''))
for seg in segments)
self._run_openstack_cmds(cmds)
def delete_network_segments(self, tenant_id, segments):
if not segments:
return
cmds = ['tenant %s' % tenant_id]
for segment in segments:
cmds.append('network id %s' % segment['network_id'])
cmds.append('no segment %s' % segment['id'])
self._run_openstack_cmds(cmds)
def delete_network_bulk(self, tenant_id, network_id_list, sync=False):
cmds = ['tenant %s' % tenant_id]
for counter, network_id in enumerate(network_id_list, 1):
cmds.append('no network id %s' % network_id)
if self._heartbeat_required(sync, counter):
cmds.append(self.cli_commands[const.CMD_SYNC_HEARTBEAT])
if self._heartbeat_required(sync):
cmds.append(self.cli_commands[const.CMD_SYNC_HEARTBEAT])
self._run_openstack_cmds(cmds, sync=sync)
def delete_vm_bulk(self, tenant_id, vm_id_list, sync=False):
cmds = ['tenant %s' % tenant_id]
counter = 0
for vm_id in vm_id_list:
counter += 1
cmds.append('no vm id %s' % vm_id)
if self._heartbeat_required(sync, counter):
cmds.append(self.cli_commands[const.CMD_SYNC_HEARTBEAT])
if self._heartbeat_required(sync):
cmds.append(self.cli_commands[const.CMD_SYNC_HEARTBEAT])
self._run_openstack_cmds(cmds, sync=sync)
def delete_instance_bulk(self, tenant_id, instance_id_list, instance_type,
sync=False):
cmds = ['tenant %s' % tenant_id]
counter = 0
for instance in instance_id_list:
counter += 1
cmds.append('no instance id %s' % instance)
if self._heartbeat_required(sync, counter):
cmds.append(self.cli_commands[const.CMD_SYNC_HEARTBEAT])
if self._heartbeat_required(sync):
cmds.append(self.cli_commands[const.CMD_SYNC_HEARTBEAT])
self._run_openstack_cmds(cmds, sync=sync)
def create_instance_bulk(self, tenant_id, neutron_ports, vms,
port_profiles, sync=False):
cmds = ['tenant %s' % tenant_id]
# Create a reference to function to avoid name lookups in the loop
append_cmd = cmds.append
counter = 0
for vm in vms.values():
counter += 1
for v_port in vm['ports']:
port_id = v_port['portId']
if not v_port['hosts']:
# Skip all the ports that have no host associsted with them
continue
if port_id not in neutron_ports.keys():
continue
neutron_port = neutron_ports[port_id]
port_name = ''
if 'name' in neutron_port:
port_name = 'name "%s"' % neutron_port['name']
device_owner = neutron_port['device_owner']
vnic_type = port_profiles[port_id]['vnic_type']
network_id = neutron_port['network_id']
segments = []
if self.hpb_supported():
segments = self._ndb.get_all_network_segments(network_id)
if device_owner == n_const.DEVICE_OWNER_DHCP:
append_cmd('network id %s' % neutron_port['network_id'])
append_cmd('dhcp id %s hostid %s port-id %s %s' %
(vm['vmId'], v_port['hosts'][0],
neutron_port['id'], port_name))
cmds.extend(
'segment level %d id %s' % (level, segment['id'])
for level, segment in enumerate(segments))
elif (device_owner.startswith('compute') or
device_owner.startswith('baremetal')):
if vnic_type == 'baremetal':
append_cmd('instance id %s hostid %s type baremetal' %
(vm['vmId'], v_port['hosts'][0]))
profile = port_profiles[neutron_port['id']]
profile = json.loads(profile['profile'])
for binding in profile['local_link_information']:
if not binding or not isinstance(binding, dict):
# skip all