retire the NSX MH plugin
This patch retires the NSX MH plugin by: - Deleting the nsx_mh plugin and unit test code. - Using the NSX-V and V3 plugin test base classes where needed. - Removing any extensions that are MH specific. Change-Id: Idf65e44c301e790ca4ea69a6a8735aa0309a0dcc
This commit is contained in:
parent
b5f59ece91
commit
26135f34ac
@ -11,6 +11,7 @@
|
||||
# under the License.
|
||||
|
||||
from neutron.common import eventlet_utils
|
||||
from neutron.db.models import securitygroup # noqa
|
||||
|
||||
eventlet_utils.monkey_patch()
|
||||
|
||||
|
@ -1,162 +0,0 @@
|
||||
# Copyright 2013 VMware, Inc.
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import sys
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from neutron.common import config
|
||||
|
||||
from vmware_nsx._i18n import _
|
||||
from vmware_nsx.common import config as nsx_config # noqa
|
||||
from vmware_nsx.common import nsx_utils
|
||||
from vmware_nsx.nsxlib import mh as nsxlib
|
||||
|
||||
config.setup_logging()
|
||||
|
||||
|
||||
def help(name):
|
||||
print("Usage: %s path/to/neutron/plugin/ini/config/file" % name)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def get_nsx_controllers(cluster):
|
||||
return cluster.nsx_controllers
|
||||
|
||||
|
||||
def config_helper(config_entity, cluster):
|
||||
try:
|
||||
return nsxlib.do_request('GET',
|
||||
"/ws.v1/%s?fields=uuid" % config_entity,
|
||||
cluster=cluster).get('results', [])
|
||||
except Exception as e:
|
||||
msg = (_("Error '%(err)s' when connecting to controller(s): %(ctl)s.")
|
||||
% {'err': str(e),
|
||||
'ctl': ', '.join(get_nsx_controllers(cluster))})
|
||||
raise Exception(msg)
|
||||
|
||||
|
||||
def get_control_cluster_nodes(cluster):
|
||||
return config_helper("control-cluster/node", cluster)
|
||||
|
||||
|
||||
def get_gateway_services(cluster):
|
||||
ret_gw_services = {"L2GatewayServiceConfig": [],
|
||||
"L3GatewayServiceConfig": []}
|
||||
gw_services = config_helper("gateway-service", cluster)
|
||||
for gw_service in gw_services:
|
||||
ret_gw_services[gw_service['type']].append(gw_service['uuid'])
|
||||
return ret_gw_services
|
||||
|
||||
|
||||
def get_transport_zones(cluster):
|
||||
transport_zones = config_helper("transport-zone", cluster)
|
||||
return [transport_zone['uuid'] for transport_zone in transport_zones]
|
||||
|
||||
|
||||
def get_transport_nodes(cluster):
|
||||
transport_nodes = config_helper("transport-node", cluster)
|
||||
return [transport_node['uuid'] for transport_node in transport_nodes]
|
||||
|
||||
|
||||
def is_transport_node_connected(cluster, node_uuid):
|
||||
try:
|
||||
return nsxlib.do_request('GET',
|
||||
"/ws.v1/transport-node/%s/status" % node_uuid,
|
||||
cluster=cluster)['connection']['connected']
|
||||
except Exception as e:
|
||||
msg = (_("Error '%(err)s' when connecting to controller(s): %(ctl)s.")
|
||||
% {'err': str(e),
|
||||
'ctl': ', '.join(get_nsx_controllers(cluster))})
|
||||
raise Exception(msg)
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) != 2:
|
||||
help(sys.argv[0])
|
||||
args = ['--config-file']
|
||||
args.append(sys.argv[1])
|
||||
config.init(args)
|
||||
print("----------------------- Database Options -----------------------")
|
||||
print("\tconnection: %s" % cfg.CONF.database.connection)
|
||||
print("\tretry_interval: %d" % cfg.CONF.database.retry_interval)
|
||||
print("\tmax_retries: %d" % cfg.CONF.database.max_retries)
|
||||
print("----------------------- NSX Options -----------------------")
|
||||
print("\tNSX Generation Timeout %d" % cfg.CONF.NSX.nsx_gen_timeout)
|
||||
print("\tNumber of concurrent connections to each controller %d" %
|
||||
cfg.CONF.NSX.concurrent_connections)
|
||||
print("\tmax_lp_per_bridged_ls: %s" % cfg.CONF.NSX.max_lp_per_bridged_ls)
|
||||
print("\tmax_lp_per_overlay_ls: %s" % cfg.CONF.NSX.max_lp_per_overlay_ls)
|
||||
print("----------------------- Cluster Options -----------------------")
|
||||
print("\tretries: %s" % cfg.CONF.retries)
|
||||
print("\tredirects: %s" % cfg.CONF.redirects)
|
||||
print("\thttp_timeout: %s" % cfg.CONF.http_timeout)
|
||||
cluster = nsx_utils.create_nsx_cluster(
|
||||
cfg.CONF,
|
||||
cfg.CONF.NSX.concurrent_connections,
|
||||
cfg.CONF.NSX.nsx_gen_timeout)
|
||||
nsx_controllers = get_nsx_controllers(cluster)
|
||||
num_controllers = len(nsx_controllers)
|
||||
print("Number of controllers found: %s" % num_controllers)
|
||||
if num_controllers == 0:
|
||||
print("You must specify at least one controller!")
|
||||
sys.exit(1)
|
||||
|
||||
get_control_cluster_nodes(cluster)
|
||||
for controller in nsx_controllers:
|
||||
print("\tController endpoint: %s" % controller)
|
||||
gateway_services = get_gateway_services(cluster)
|
||||
default_gateways = {
|
||||
"L2GatewayServiceConfig": cfg.CONF.default_l2_gw_service_uuid,
|
||||
"L3GatewayServiceConfig": cfg.CONF.default_l3_gw_service_uuid}
|
||||
errors = 0
|
||||
for svc_type in default_gateways.keys():
|
||||
for uuid in gateway_services[svc_type]:
|
||||
print("\t\tGateway(%s) uuid: %s" % (svc_type, uuid))
|
||||
if (default_gateways[svc_type] and
|
||||
default_gateways[svc_type] not in gateway_services[svc_type]):
|
||||
print("\t\t\tError: specified default %s gateway (%s) is "
|
||||
"missing from NSX Gateway Services!" % (
|
||||
svc_type,
|
||||
default_gateways[svc_type]))
|
||||
errors += 1
|
||||
transport_zones = get_transport_zones(cluster)
|
||||
print("\tTransport zones: %s" % transport_zones)
|
||||
if cfg.CONF.default_tz_uuid not in transport_zones:
|
||||
print("\t\tError: specified default transport zone "
|
||||
"(%s) is missing from NSX transport zones!"
|
||||
% cfg.CONF.default_tz_uuid)
|
||||
errors += 1
|
||||
transport_nodes = get_transport_nodes(cluster)
|
||||
print("\tTransport nodes: %s" % transport_nodes)
|
||||
node_errors = []
|
||||
for node in transport_nodes:
|
||||
if not is_transport_node_connected(cluster, node):
|
||||
node_errors.append(node)
|
||||
|
||||
# Use different exit codes, so that we can distinguish
|
||||
# between config and runtime errors
|
||||
if len(node_errors):
|
||||
print("\nThere are one or more transport nodes that are "
|
||||
"not connected: %s. Please, revise!" % node_errors)
|
||||
sys.exit(10)
|
||||
elif errors:
|
||||
print("\nThere are %d errors with your configuration. "
|
||||
"Please, revise!" % errors)
|
||||
sys.exit(12)
|
||||
else:
|
||||
print("Done.")
|
@ -17,42 +17,17 @@ from neutron_lib.api.definitions import multiprovidernet as mpnet_apidef
|
||||
from neutron_lib.api.definitions import provider_net as pnet
|
||||
from neutron_lib.api import validators
|
||||
from neutron_lib import constants
|
||||
from neutron_lib import exceptions as n_exc
|
||||
from oslo_log import log
|
||||
import six
|
||||
|
||||
from vmware_nsx.api_client import client
|
||||
from vmware_nsx.api_client import exception as api_exc
|
||||
from vmware_nsx.common import utils as vmw_utils
|
||||
from vmware_nsx.db import db as nsx_db
|
||||
from vmware_nsx.db import networkgw_db
|
||||
from vmware_nsx import nsx_cluster
|
||||
from vmware_nsx.nsxlib.mh import l2gateway as l2gwlib
|
||||
from vmware_nsx.nsxlib.mh import router as routerlib
|
||||
from vmware_nsx.nsxlib.mh import secgroup as secgrouplib
|
||||
from vmware_nsx.nsxlib.mh import switch as switchlib
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def fetch_nsx_switches(session, cluster, neutron_net_id):
|
||||
"""Retrieve logical switches for a neutron network.
|
||||
|
||||
This function is optimized for fetching all the lswitches always
|
||||
with a single NSX query.
|
||||
If there is more than 1 logical switch (chained switches use case)
|
||||
NSX lswitches are queried by 'quantum_net_id' tag. Otherwise the NSX
|
||||
lswitch is directly retrieved by id (more efficient).
|
||||
"""
|
||||
nsx_switch_ids = get_nsx_switch_ids(session, cluster, neutron_net_id)
|
||||
if len(nsx_switch_ids) > 1:
|
||||
lswitches = switchlib.get_lswitches(cluster, neutron_net_id)
|
||||
else:
|
||||
lswitches = [switchlib.get_lswitch_by_id(
|
||||
cluster, nsx_switch_ids[0])]
|
||||
return lswitches
|
||||
|
||||
|
||||
def get_nsx_switch_ids(session, cluster, neutron_network_id):
|
||||
"""Return the NSX switch id for a given neutron network.
|
||||
|
||||
@ -134,77 +109,6 @@ def get_nsx_switch_and_port_id(session, cluster, neutron_port_id):
|
||||
return nsx_switch_id, nsx_port_id
|
||||
|
||||
|
||||
def get_nsx_security_group_id(session, cluster, neutron_id):
|
||||
"""Return the NSX sec profile uuid for a given neutron sec group.
|
||||
|
||||
First, look up the Neutron database. If not found, execute
|
||||
a query on NSX platform as the mapping might be missing.
|
||||
NOTE: Security groups are called 'security profiles' on the NSX backend.
|
||||
"""
|
||||
nsx_id = nsx_db.get_nsx_security_group_id(session, neutron_id)
|
||||
if not nsx_id:
|
||||
# Find security profile on backend.
|
||||
# This is a rather expensive query, but it won't be executed
|
||||
# more than once for each security group in Neutron's lifetime
|
||||
nsx_sec_profiles = secgrouplib.query_security_profiles(
|
||||
cluster, '*',
|
||||
filters={'tag': neutron_id,
|
||||
'tag_scope': 'q_sec_group_id'})
|
||||
# Only one result expected
|
||||
# NOTE(salv-orlando): Not handling the case where more than one
|
||||
# security profile is found with the same neutron port tag
|
||||
if not nsx_sec_profiles:
|
||||
LOG.warning("Unable to find NSX security profile for Neutron "
|
||||
"security group %s", neutron_id)
|
||||
return
|
||||
elif len(nsx_sec_profiles) > 1:
|
||||
LOG.warning("Multiple NSX security profiles found for Neutron "
|
||||
"security group %s", neutron_id)
|
||||
nsx_sec_profile = nsx_sec_profiles[0]
|
||||
nsx_id = nsx_sec_profile['uuid']
|
||||
with session.begin(subtransactions=True):
|
||||
# Create DB mapping
|
||||
nsx_db.add_neutron_nsx_security_group_mapping(
|
||||
session, neutron_id, nsx_id)
|
||||
return nsx_id
|
||||
|
||||
|
||||
def get_nsx_router_id(session, cluster, neutron_router_id):
|
||||
"""Return the NSX router uuid for a given neutron router.
|
||||
|
||||
First, look up the Neutron database. If not found, execute
|
||||
a query on NSX platform as the mapping might be missing.
|
||||
"""
|
||||
if not neutron_router_id:
|
||||
return
|
||||
nsx_router_id = nsx_db.get_nsx_router_id(
|
||||
session, neutron_router_id)
|
||||
if not nsx_router_id:
|
||||
# Find logical router from backend.
|
||||
# This is a rather expensive query, but it won't be executed
|
||||
# more than once for each router in Neutron's lifetime
|
||||
nsx_routers = routerlib.query_lrouters(
|
||||
cluster, '*',
|
||||
filters={'tag': neutron_router_id,
|
||||
'tag_scope': 'q_router_id'})
|
||||
# Only one result expected
|
||||
# NOTE(salv-orlando): Not handling the case where more than one
|
||||
# port is found with the same neutron port tag
|
||||
if not nsx_routers:
|
||||
LOG.warning("Unable to find NSX router for Neutron router %s",
|
||||
neutron_router_id)
|
||||
return
|
||||
nsx_router = nsx_routers[0]
|
||||
nsx_router_id = nsx_router['uuid']
|
||||
with session.begin(subtransactions=True):
|
||||
# Create DB mapping
|
||||
nsx_db.add_neutron_nsx_router_mapping(
|
||||
session,
|
||||
neutron_router_id,
|
||||
nsx_router_id)
|
||||
return nsx_router_id
|
||||
|
||||
|
||||
def create_nsx_cluster(cluster_opts, concurrent_connections, gen_timeout):
|
||||
cluster = nsx_cluster.NSXCluster(**cluster_opts)
|
||||
|
||||
@ -223,39 +127,6 @@ def create_nsx_cluster(cluster_opts, concurrent_connections, gen_timeout):
|
||||
return cluster
|
||||
|
||||
|
||||
def get_nsx_device_status(cluster, nsx_uuid):
|
||||
try:
|
||||
status_up = l2gwlib.get_gateway_device_status(
|
||||
cluster, nsx_uuid)
|
||||
if status_up:
|
||||
return networkgw_db.STATUS_ACTIVE
|
||||
else:
|
||||
return networkgw_db.STATUS_DOWN
|
||||
except api_exc.NsxApiException:
|
||||
return networkgw_db.STATUS_UNKNOWN
|
||||
except n_exc.NotFound:
|
||||
return networkgw_db.ERROR
|
||||
|
||||
|
||||
def get_nsx_device_statuses(cluster, tenant_id):
|
||||
try:
|
||||
status_dict = l2gwlib.get_gateway_devices_status(
|
||||
cluster, tenant_id)
|
||||
return dict((nsx_device_id,
|
||||
networkgw_db.STATUS_ACTIVE if connected
|
||||
else networkgw_db.STATUS_DOWN) for
|
||||
(nsx_device_id, connected) in six.iteritems(status_dict))
|
||||
except api_exc.NsxApiException:
|
||||
# Do not make a NSX API exception fatal
|
||||
if tenant_id:
|
||||
LOG.warning("Unable to retrieve operational status for "
|
||||
"gateway devices belonging to tenant: %s",
|
||||
tenant_id)
|
||||
else:
|
||||
LOG.warning("Unable to retrieve operational status for "
|
||||
"gateway devices")
|
||||
|
||||
|
||||
def _convert_bindings_to_nsx_transport_zones(bindings):
|
||||
nsx_transport_zones_config = []
|
||||
for binding in bindings:
|
||||
|
@ -1,136 +0,0 @@
|
||||
# Copyright 2013 VMware, Inc.
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log
|
||||
import six
|
||||
|
||||
from vmware_nsx.common import nsx_utils
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
# Protocol number look up for supported protocols
|
||||
protocol_num_look_up = {'tcp': 6, 'icmp': 1, 'udp': 17, 'ipv6-icmp': 58}
|
||||
|
||||
|
||||
def _convert_to_nsx_rule(session, cluster, rule, with_id=False):
|
||||
"""Converts a Neutron security group rule to the NSX format.
|
||||
|
||||
This routine also replaces Neutron IDs with NSX UUIDs.
|
||||
"""
|
||||
nsx_rule = {}
|
||||
params = ['remote_ip_prefix', 'protocol',
|
||||
'remote_group_id', 'port_range_min',
|
||||
'port_range_max', 'ethertype']
|
||||
if with_id:
|
||||
params.append('id')
|
||||
|
||||
for param in params:
|
||||
value = rule.get(param)
|
||||
if param not in rule:
|
||||
nsx_rule[param] = value
|
||||
elif not value:
|
||||
pass
|
||||
elif param == 'remote_ip_prefix':
|
||||
nsx_rule['ip_prefix'] = rule['remote_ip_prefix']
|
||||
elif param == 'remote_group_id':
|
||||
nsx_rule['profile_uuid'] = nsx_utils.get_nsx_security_group_id(
|
||||
session, cluster, rule['remote_group_id'])
|
||||
|
||||
elif param == 'protocol':
|
||||
try:
|
||||
nsx_rule['protocol'] = int(rule['protocol'])
|
||||
except (ValueError, TypeError):
|
||||
nsx_rule['protocol'] = (
|
||||
protocol_num_look_up[rule['protocol']])
|
||||
else:
|
||||
nsx_rule[param] = value
|
||||
return nsx_rule
|
||||
|
||||
|
||||
def _convert_to_nsx_rules(session, cluster, rules, with_id=False):
|
||||
"""Converts a list of Neutron security group rules to the NSX format."""
|
||||
nsx_rules = {'logical_port_ingress_rules': [],
|
||||
'logical_port_egress_rules': []}
|
||||
for direction in ['logical_port_ingress_rules',
|
||||
'logical_port_egress_rules']:
|
||||
for rule in rules[direction]:
|
||||
nsx_rules[direction].append(
|
||||
_convert_to_nsx_rule(session, cluster, rule, with_id))
|
||||
return nsx_rules
|
||||
|
||||
|
||||
def get_security_group_rules_nsx_format(session, cluster,
|
||||
security_group_rules, with_id=False):
|
||||
"""Convert neutron security group rules into NSX format.
|
||||
|
||||
This routine splits Neutron security group rules into two lists, one
|
||||
for ingress rules and the other for egress rules.
|
||||
"""
|
||||
|
||||
def fields(rule):
|
||||
_fields = ['remote_ip_prefix', 'remote_group_id', 'protocol',
|
||||
'port_range_min', 'port_range_max', 'protocol', 'ethertype']
|
||||
if with_id:
|
||||
_fields.append('id')
|
||||
return dict((k, v) for k, v in six.iteritems(rule) if k in _fields)
|
||||
|
||||
ingress_rules = []
|
||||
egress_rules = []
|
||||
for rule in security_group_rules:
|
||||
if rule.get('souce_group_id'):
|
||||
rule['remote_group_id'] = nsx_utils.get_nsx_security_group_id(
|
||||
session, cluster, rule['remote_group_id'])
|
||||
|
||||
if rule['direction'] == 'ingress':
|
||||
ingress_rules.append(fields(rule))
|
||||
elif rule['direction'] == 'egress':
|
||||
egress_rules.append(fields(rule))
|
||||
rules = {'logical_port_ingress_rules': egress_rules,
|
||||
'logical_port_egress_rules': ingress_rules}
|
||||
return _convert_to_nsx_rules(session, cluster, rules, with_id)
|
||||
|
||||
|
||||
def merge_security_group_rules_with_current(session, cluster,
|
||||
new_rules, current_rules):
|
||||
merged_rules = get_security_group_rules_nsx_format(
|
||||
session, cluster, current_rules)
|
||||
for new_rule in new_rules:
|
||||
rule = new_rule['security_group_rule']
|
||||
if rule['direction'] == 'ingress':
|
||||
merged_rules['logical_port_egress_rules'].append(
|
||||
_convert_to_nsx_rule(session, cluster, rule))
|
||||
elif rule['direction'] == 'egress':
|
||||
merged_rules['logical_port_ingress_rules'].append(
|
||||
_convert_to_nsx_rule(session, cluster, rule))
|
||||
return merged_rules
|
||||
|
||||
|
||||
def remove_security_group_with_id_and_id_field(rules, rule_id):
|
||||
"""Remove rule by rule_id.
|
||||
|
||||
This function receives all of the current rule associated with a
|
||||
security group and then removes the rule that matches the rule_id. In
|
||||
addition it removes the id field in the dict with each rule since that
|
||||
should not be passed to nsx.
|
||||
"""
|
||||
for rule_direction in rules.values():
|
||||
item_to_remove = None
|
||||
for port_rule in rule_direction:
|
||||
if port_rule['id'] == rule_id:
|
||||
item_to_remove = port_rule
|
||||
else:
|
||||
# remove key from dictionary for NSX
|
||||
del port_rule['id']
|
||||
if item_to_remove:
|
||||
rule_direction.remove(item_to_remove)
|
@ -1,688 +0,0 @@
|
||||
# Copyright 2013 VMware, Inc.
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import random
|
||||
|
||||
from neutron_lib import constants
|
||||
from neutron_lib import context as n_context
|
||||
from neutron_lib.db import api as db_api
|
||||
from neutron_lib.db import model_query
|
||||
from neutron_lib import exceptions
|
||||
from neutron_lib.exceptions import l3 as l3_exc
|
||||
from oslo_log import log
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_service import loopingcall
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
|
||||
from neutron.db.models import external_net as external_net_db
|
||||
from neutron.db.models import l3 as l3_db
|
||||
from neutron.db import models_v2
|
||||
|
||||
from vmware_nsx._i18n import _
|
||||
from vmware_nsx.api_client import exception as api_exc
|
||||
from vmware_nsx.common import exceptions as nsx_exc
|
||||
from vmware_nsx.common import nsx_utils
|
||||
from vmware_nsx.nsxlib import mh as nsxlib
|
||||
from vmware_nsx.nsxlib.mh import router as routerlib
|
||||
from vmware_nsx.nsxlib.mh import switch as switchlib
|
||||
|
||||
# Maximum page size for a single request
|
||||
# NOTE(salv-orlando): This might become a version-dependent map should the
|
||||
# limit be raised in future versions
|
||||
MAX_PAGE_SIZE = 5000
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class NsxCache(object):
|
||||
"""A simple Cache for NSX resources.
|
||||
|
||||
Associates resource id with resource hash to rapidly identify
|
||||
updated resources.
|
||||
Each entry in the cache also stores the following information:
|
||||
- changed: the resource in the cache has been altered following
|
||||
an update or a delete
|
||||
- hit: the resource has been visited during an update (and possibly
|
||||
left unchanged)
|
||||
- data: current resource data
|
||||
- data_bk: backup of resource data prior to its removal
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# Maps an uuid to the dict containing it
|
||||
self._uuid_dict_mappings = {}
|
||||
# Dicts for NSX cached resources
|
||||
self._lswitches = {}
|
||||
self._lswitchports = {}
|
||||
self._lrouters = {}
|
||||
|
||||
def __getitem__(self, key):
|
||||
# uuids are unique across the various types of resources
|
||||
# TODO(salv-orlando): Avoid lookups over all dictionaries
|
||||
# when retrieving items
|
||||
# Fetch lswitches, lports, or lrouters
|
||||
resources = self._uuid_dict_mappings[key]
|
||||
return resources[key]
|
||||
|
||||
def _clear_changed_flag_and_remove_from_cache(self, resources):
|
||||
# Clear the 'changed' attribute for all items
|
||||
# NOTE(arosen): the copy.copy is to avoid: 'RuntimeError:
|
||||
# dictionary changed size during iteration' for py3
|
||||
|
||||
for uuid, item in copy.copy(resources).items():
|
||||
if item.pop('changed', None) and not item.get('data'):
|
||||
# The item is not anymore in NSX, so delete it
|
||||
del resources[uuid]
|
||||
del self._uuid_dict_mappings[uuid]
|
||||
LOG.debug("Removed item %s from NSX object cache", uuid)
|
||||
|
||||
def _update_resources(self, resources, new_resources, clear_changed=True):
|
||||
if clear_changed:
|
||||
self._clear_changed_flag_and_remove_from_cache(resources)
|
||||
|
||||
def do_hash(item):
|
||||
return hash(jsonutils.dumps(item))
|
||||
|
||||
# Parse new data and identify new, deleted, and updated resources
|
||||
for item in new_resources:
|
||||
item_id = item['uuid']
|
||||
if resources.get(item_id):
|
||||
new_hash = do_hash(item)
|
||||
if new_hash != resources[item_id]['hash']:
|
||||
resources[item_id]['hash'] = new_hash
|
||||
resources[item_id]['changed'] = True
|
||||
resources[item_id]['data_bk'] = (
|
||||
resources[item_id]['data'])
|
||||
resources[item_id]['data'] = item
|
||||
# Mark the item as hit in any case
|
||||
resources[item_id]['hit'] = True
|
||||
LOG.debug("Updating item %s in NSX object cache", item_id)
|
||||
else:
|
||||
resources[item_id] = {'hash': do_hash(item)}
|
||||
resources[item_id]['hit'] = True
|
||||
resources[item_id]['changed'] = True
|
||||
resources[item_id]['data'] = item
|
||||
# add an uuid to dict mapping for easy retrieval
|
||||
# with __getitem__
|
||||
self._uuid_dict_mappings[item_id] = resources
|
||||
LOG.debug("Added item %s to NSX object cache", item_id)
|
||||
|
||||
def _delete_resources(self, resources):
|
||||
# Mark for removal all the elements which have not been visited.
|
||||
# And clear the 'hit' attribute.
|
||||
for to_delete in [k for (k, v) in six.iteritems(resources)
|
||||
if not v.pop('hit', False)]:
|
||||
resources[to_delete]['changed'] = True
|
||||
resources[to_delete]['data_bk'] = (
|
||||
resources[to_delete].pop('data', None))
|
||||
|
||||
def _get_resource_ids(self, resources, changed_only):
|
||||
if changed_only:
|
||||
return [k for (k, v) in six.iteritems(resources)
|
||||
if v.get('changed')]
|
||||
return resources.keys()
|
||||
|
||||
def get_lswitches(self, changed_only=False):
|
||||
return self._get_resource_ids(self._lswitches, changed_only)
|
||||
|
||||
def get_lrouters(self, changed_only=False):
|
||||
return self._get_resource_ids(self._lrouters, changed_only)
|
||||
|
||||
def get_lswitchports(self, changed_only=False):
|
||||
return self._get_resource_ids(self._lswitchports, changed_only)
|
||||
|
||||
def update_lswitch(self, lswitch):
|
||||
self._update_resources(self._lswitches, [lswitch], clear_changed=False)
|
||||
|
||||
def update_lrouter(self, lrouter):
|
||||
self._update_resources(self._lrouters, [lrouter], clear_changed=False)
|
||||
|
||||
def update_lswitchport(self, lswitchport):
|
||||
self._update_resources(self._lswitchports, [lswitchport],
|
||||
clear_changed=False)
|
||||
|
||||
def process_updates(self, lswitches=None,
|
||||
lrouters=None, lswitchports=None):
|
||||
self._update_resources(self._lswitches, lswitches)
|
||||
self._update_resources(self._lrouters, lrouters)
|
||||
self._update_resources(self._lswitchports, lswitchports)
|
||||
return (self._get_resource_ids(self._lswitches, changed_only=True),
|
||||
self._get_resource_ids(self._lrouters, changed_only=True),
|
||||
self._get_resource_ids(self._lswitchports, changed_only=True))
|
||||
|
||||
def process_deletes(self):
|
||||
self._delete_resources(self._lswitches)
|
||||
self._delete_resources(self._lrouters)
|
||||
self._delete_resources(self._lswitchports)
|
||||
return (self._get_resource_ids(self._lswitches, changed_only=True),
|
||||
self._get_resource_ids(self._lrouters, changed_only=True),
|
||||
self._get_resource_ids(self._lswitchports, changed_only=True))
|
||||
|
||||
|
||||
class SyncParameters(object):
|
||||
"""Defines attributes used by the synchronization procedure.
|
||||
|
||||
chunk_size: Actual chunk size
|
||||
extra_chunk_size: Additional data to fetch because of chunk size
|
||||
adjustment
|
||||
current_chunk: Counter of the current data chunk being synchronized
|
||||
Page cursors: markers for the next resource to fetch.
|
||||
'start' means page cursor unset for fetching 1st page
|
||||
init_sync_performed: True if the initial synchronization concluded
|
||||
"""
|
||||
|
||||
def __init__(self, min_chunk_size):
|
||||
self.chunk_size = min_chunk_size
|
||||
self.extra_chunk_size = 0
|
||||
self.current_chunk = 0
|
||||
self.ls_cursor = 'start'
|
||||
self.lr_cursor = 'start'
|
||||
self.lp_cursor = 'start'
|
||||
self.init_sync_performed = False
|
||||
self.total_size = 0
|
||||
|
||||
|
||||
def _start_loopingcall(min_chunk_size, state_sync_interval, func,
|
||||
initial_delay=5):
|
||||
"""Start a loopingcall for the synchronization task."""
|
||||
# Start a looping call to synchronize operational status
|
||||
# for neutron resources
|
||||
if not state_sync_interval:
|
||||
# do not start the looping call if specified
|
||||
# sync interval is 0
|
||||
return
|
||||
state_synchronizer = loopingcall.DynamicLoopingCall(
|
||||
func, sp=SyncParameters(min_chunk_size))
|
||||
state_synchronizer.start(
|
||||
initial_delay=initial_delay,
|
||||
periodic_interval_max=state_sync_interval)
|
||||
return state_synchronizer
|
||||
|
||||
|
||||
class NsxSynchronizer(object):
|
||||
|
||||
LS_URI = nsxlib._build_uri_path(
|
||||
switchlib.LSWITCH_RESOURCE, fields='uuid,tags,fabric_status',
|
||||
relations='LogicalSwitchStatus')
|
||||
LR_URI = nsxlib._build_uri_path(
|
||||
routerlib.LROUTER_RESOURCE, fields='uuid,tags,fabric_status',
|
||||
relations='LogicalRouterStatus')
|
||||
LP_URI = nsxlib._build_uri_path(
|
||||
switchlib.LSWITCHPORT_RESOURCE,
|
||||
parent_resource_id='*',
|
||||
fields='uuid,tags,fabric_status_up',
|
||||
relations='LogicalPortStatus')
|
||||
|
||||
def __init__(self, plugin, cluster, state_sync_interval,
|
||||
req_delay, min_chunk_size, max_rand_delay=0,
|
||||
initial_delay=5):
|
||||
random.seed()
|
||||
self._nsx_cache = NsxCache()
|
||||
# Store parameters as instance members
|
||||
# NOTE(salv-orlando): apologies if it looks java-ish
|
||||
self._plugin = plugin
|
||||
self._cluster = cluster
|
||||
self._req_delay = req_delay
|
||||
self._sync_interval = state_sync_interval
|
||||
self._max_rand_delay = max_rand_delay
|
||||
# Validate parameters
|
||||
if self._sync_interval < self._req_delay:
|
||||
err_msg = (_("Minimum request delay:%(req_delay)s must not "
|
||||
"exceed synchronization interval:%(sync_interval)s") %
|
||||
{'req_delay': self._req_delay,
|
||||
'sync_interval': self._sync_interval})
|
||||
LOG.error(err_msg)
|
||||
raise nsx_exc.NsxPluginException(err_msg=err_msg)
|
||||
# Backoff time in case of failures while fetching sync data
|
||||
self._sync_backoff = 1
|
||||
# Store the looping call in an instance variable to allow unit tests
|
||||
# for controlling its lifecycle
|
||||
self._sync_looping_call = _start_loopingcall(
|
||||
min_chunk_size, state_sync_interval,
|
||||
self._synchronize_state, initial_delay=initial_delay)
|
||||
|
||||
def _get_tag_dict(self, tags):
|
||||
return dict((tag.get('scope'), tag['tag']) for tag in tags)
|
||||
|
||||
def synchronize_network(self, context, neutron_network_data,
|
||||
lswitches=None):
|
||||
"""Synchronize a Neutron network with its NSX counterpart.
|
||||
|
||||
This routine synchronizes a set of switches when a Neutron
|
||||
network is mapped to multiple lswitches.
|
||||
"""
|
||||
if not lswitches:
|
||||
# Try to get logical switches from nsx
|
||||
try:
|
||||
lswitches = nsx_utils.fetch_nsx_switches(
|
||||
context.session, self._cluster,
|
||||
neutron_network_data['id'])
|
||||
except exceptions.NetworkNotFound:
|
||||
# TODO(salv-orlando): We should be catching
|
||||
# api_exc.ResourceNotFound here
|
||||
# The logical switch was not found
|
||||
LOG.warning("Logical switch for neutron network %s not "
|
||||
"found on NSX.", neutron_network_data['id'])
|
||||
lswitches = []
|
||||
else:
|
||||
for lswitch in lswitches:
|
||||
self._nsx_cache.update_lswitch(lswitch)
|
||||
# By default assume things go wrong
|
||||
status = constants.NET_STATUS_ERROR
|
||||
# In most cases lswitches will contain a single element
|
||||
for ls in lswitches:
|
||||
if not ls:
|
||||
# Logical switch was deleted
|
||||
break
|
||||
ls_status = ls['_relations']['LogicalSwitchStatus']
|
||||
if not ls_status['fabric_status']:
|
||||
status = constants.NET_STATUS_DOWN
|
||||
break
|
||||
else:
|
||||
# No switch was down or missing. Set status to ACTIVE unless
|
||||
# there were no switches in the first place!
|
||||
if lswitches:
|
||||
status = constants.NET_STATUS_ACTIVE
|
||||
# Update db object
|
||||
if status == neutron_network_data['status']:
|
||||
# do nothing
|
||||
return
|
||||
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
try:
|
||||
network = self._plugin._get_network(context,
|
||||
neutron_network_data['id'])
|
||||
except exceptions.NetworkNotFound:
|
||||
pass
|
||||
else:
|
||||
network.status = status
|
||||
LOG.debug("Updating status for neutron resource %(q_id)s to:"
|
||||
" %(status)s",
|
||||
{'q_id': neutron_network_data['id'],
|
||||
'status': status})
|
||||
|
||||
def _synchronize_lswitches(self, ctx, ls_uuids, scan_missing=False):
|
||||
if not ls_uuids and not scan_missing:
|
||||
return
|
||||
neutron_net_ids = set()
|
||||
neutron_nsx_mappings = {}
|
||||
# TODO(salvatore-orlando): Deal with the case the tag
|
||||
# has been tampered with
|
||||
for ls_uuid in ls_uuids:
|
||||
# If the lswitch has been deleted, get backup copy of data
|
||||
lswitch = (self._nsx_cache[ls_uuid].get('data') or
|
||||
self._nsx_cache[ls_uuid].get('data_bk'))
|
||||
tags = self._get_tag_dict(lswitch['tags'])
|
||||
neutron_id = tags.get('quantum_net_id')
|
||||
neutron_net_ids.add(neutron_id)
|
||||
neutron_nsx_mappings[neutron_id] = (
|
||||
neutron_nsx_mappings.get(neutron_id, []) +
|
||||
[self._nsx_cache[ls_uuid]])
|
||||
# Fetch neutron networks from database
|
||||
filters = {'router:external': [False]}
|
||||
if not scan_missing:
|
||||
filters['id'] = neutron_net_ids
|
||||
|
||||
networks = model_query.get_collection(
|
||||
ctx, models_v2.Network, self._plugin._make_network_dict,
|
||||
filters=filters)
|
||||
|
||||
for network in networks:
|
||||
lswitches = neutron_nsx_mappings.get(network['id'], [])
|
||||
lswitches = [lsw.get('data') for lsw in lswitches]
|
||||
self.synchronize_network(ctx, network, lswitches)
|
||||
|
||||
def synchronize_router(self, context, neutron_router_data,
|
||||
lrouter=None):
|
||||
"""Synchronize a neutron router with its NSX counterpart."""
|
||||
if not lrouter:
|
||||
# Try to get router from nsx
|
||||
try:
|
||||
# This query will return the logical router status too
|
||||
nsx_router_id = nsx_utils.get_nsx_router_id(
|
||||
context.session, self._cluster, neutron_router_data['id'])
|
||||
if nsx_router_id:
|
||||
lrouter = routerlib.get_lrouter(
|
||||
self._cluster, nsx_router_id)
|
||||
except exceptions.NotFound:
|
||||
# NOTE(salv-orlando): We should be catching
|
||||
# api_exc.ResourceNotFound here
|
||||
# The logical router was not found
|
||||
LOG.warning("Logical router for neutron router %s not "
|
||||
"found on NSX.", neutron_router_data['id'])
|
||||
if lrouter:
|
||||
# Update the cache
|
||||
self._nsx_cache.update_lrouter(lrouter)
|
||||
|
||||
# Note(salv-orlando): It might worth adding a check to verify neutron
|
||||
# resource tag in nsx entity matches a Neutron id.
|
||||
# By default assume things go wrong
|
||||
status = constants.NET_STATUS_ERROR
|
||||
if lrouter:
|
||||
lr_status = (lrouter['_relations']
|
||||
['LogicalRouterStatus']
|
||||
['fabric_status'])
|
||||
status = (lr_status and
|
||||
constants.NET_STATUS_ACTIVE or
|
||||
constants.NET_STATUS_DOWN)
|
||||
# Update db object
|
||||
if status == neutron_router_data['status']:
|
||||
# do nothing
|
||||
return
|
||||
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
try:
|
||||
router = self._plugin._get_router(context,
|
||||
neutron_router_data['id'])
|
||||
except l3_exc.RouterNotFound:
|
||||
pass
|
||||
else:
|
||||
router.status = status
|
||||
LOG.debug("Updating status for neutron resource %(q_id)s to:"
|
||||
" %(status)s",
|
||||
{'q_id': neutron_router_data['id'],
|
||||
'status': status})
|
||||
|
||||
def _synchronize_lrouters(self, ctx, lr_uuids, scan_missing=False):
|
||||
if not lr_uuids and not scan_missing:
|
||||
return
|
||||
# TODO(salvatore-orlando): Deal with the case the tag
|
||||
# has been tampered with
|
||||
neutron_router_mappings = {}
|
||||
for lr_uuid in lr_uuids:
|
||||
lrouter = (self._nsx_cache[lr_uuid].get('data') or
|
||||
self._nsx_cache[lr_uuid].get('data_bk'))
|
||||
tags = self._get_tag_dict(lrouter['tags'])
|
||||
neutron_router_id = tags.get('q_router_id')
|
||||
if neutron_router_id:
|
||||
neutron_router_mappings[neutron_router_id] = (
|
||||
self._nsx_cache[lr_uuid])
|
||||
else:
|
||||
LOG.warning("Unable to find Neutron router id for "
|
||||
"NSX logical router: %s", lr_uuid)
|
||||
# Fetch neutron routers from database
|
||||
filters = ({} if scan_missing else
|
||||
{'id': neutron_router_mappings.keys()})
|
||||
routers = model_query.get_collection(
|
||||
ctx, l3_db.Router, self._plugin._make_router_dict,
|
||||
filters=filters)
|
||||
for router in routers:
|
||||
lrouter = neutron_router_mappings.get(router['id'])
|
||||
self.synchronize_router(
|
||||
ctx, router, lrouter and lrouter.get('data'))
|
||||
|
||||
def synchronize_port(self, context, neutron_port_data,
|
||||
lswitchport=None, ext_networks=None):
|
||||
"""Synchronize a Neutron port with its NSX counterpart."""
|
||||
# Skip synchronization for ports on external networks
|
||||
if not ext_networks:
|
||||
ext_networks = [net['id'] for net in context.session.query(
|
||||
models_v2.Network).join(
|
||||
external_net_db.ExternalNetwork,
|
||||
(models_v2.Network.id ==
|
||||
external_net_db.ExternalNetwork.network_id))]
|
||||
if neutron_port_data['network_id'] in ext_networks:
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
neutron_port_data['status'] = constants.PORT_STATUS_ACTIVE
|
||||
return
|
||||
|
||||
if not lswitchport:
|
||||
# Try to get port from nsx
|
||||
try:
|
||||
ls_uuid, lp_uuid = nsx_utils.get_nsx_switch_and_port_id(
|
||||
context.session, self._cluster, neutron_port_data['id'])
|
||||
if lp_uuid:
|
||||
lswitchport = switchlib.get_port(
|
||||
self._cluster, ls_uuid, lp_uuid,
|
||||
relations='LogicalPortStatus')
|
||||
except (exceptions.PortNotFoundOnNetwork):
|
||||
# NOTE(salv-orlando): We should be catching
|
||||
# api_exc.ResourceNotFound here instead
|
||||
# of PortNotFoundOnNetwork when the id exists but
|
||||
# the logical switch port was not found
|
||||
LOG.warning("Logical switch port for neutron port %s "
|
||||
"not found on NSX.", neutron_port_data['id'])
|
||||
lswitchport = None
|
||||
else:
|
||||
# If lswitchport is not None, update the cache.
|
||||
# It could be none if the port was deleted from the backend
|
||||
if lswitchport:
|
||||
self._nsx_cache.update_lswitchport(lswitchport)
|
||||
# Note(salv-orlando): It might worth adding a check to verify neutron
|
||||
# resource tag in nsx entity matches Neutron id.
|
||||
# By default assume things go wrong
|
||||
status = constants.PORT_STATUS_ERROR
|
||||
if lswitchport:
|
||||
lp_status = (lswitchport['_relations']
|
||||
['LogicalPortStatus']
|
||||
['fabric_status_up'])
|
||||
status = (lp_status and
|
||||
constants.PORT_STATUS_ACTIVE or
|
||||
constants.PORT_STATUS_DOWN)
|
||||
|
||||
# Update db object
|
||||
if status == neutron_port_data['status']:
|
||||
# do nothing
|
||||
return
|
||||
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
try:
|
||||
port = self._plugin._get_port(context,
|
||||
neutron_port_data['id'])
|
||||
except exceptions.PortNotFound:
|
||||
pass
|
||||
else:
|
||||
port.status = status
|
||||
LOG.debug("Updating status for neutron resource %(q_id)s to:"
|
||||
" %(status)s",
|
||||
{'q_id': neutron_port_data['id'],
|
||||
'status': status})
|
||||
|
||||
def _synchronize_lswitchports(self, ctx, lp_uuids, scan_missing=False):
|
||||
if not lp_uuids and not scan_missing:
|
||||
return
|
||||
# Find Neutron port id by tag - the tag is already
|
||||
# loaded in memory, no reason for doing a db query
|
||||
# TODO(salvatore-orlando): Deal with the case the tag
|
||||
# has been tampered with
|
||||
neutron_port_mappings = {}
|
||||
for lp_uuid in lp_uuids:
|
||||
lport = (self._nsx_cache[lp_uuid].get('data') or
|
||||
self._nsx_cache[lp_uuid].get('data_bk'))
|
||||
tags = self._get_tag_dict(lport['tags'])
|
||||
neutron_port_id = tags.get('q_port_id')
|
||||
if neutron_port_id:
|
||||
neutron_port_mappings[neutron_port_id] = (
|
||||
self._nsx_cache[lp_uuid])
|
||||
# Fetch neutron ports from database
|
||||
# At the first sync we need to fetch all ports
|
||||
filters = ({} if scan_missing else
|
||||
{'id': neutron_port_mappings.keys()})
|
||||
# TODO(salv-orlando): Work out a solution for avoiding
|
||||
# this query
|
||||
ext_nets = [net['id'] for net in ctx.session.query(
|
||||
models_v2.Network).join(
|
||||
external_net_db.ExternalNetwork,
|
||||
(models_v2.Network.id ==
|
||||
external_net_db.ExternalNetwork.network_id))]
|
||||
ports = model_query.get_collection(
|
||||
ctx, models_v2.Port, self._plugin._make_port_dict,
|
||||
filters=filters)
|
||||
for port in ports:
|
||||
lswitchport = neutron_port_mappings.get(port['id'])
|
||||
self.synchronize_port(
|
||||
ctx, port, lswitchport and lswitchport.get('data'),
|
||||
ext_networks=ext_nets)
|
||||
|
||||
def _get_chunk_size(self, sp):
|
||||
# NOTE(salv-orlando): Try to use __future__ for this routine only?
|
||||
ratio = ((float(sp.total_size) / float(sp.chunk_size)) /
|
||||
(float(self._sync_interval) / float(self._req_delay)))
|
||||
new_size = max(1.0, ratio) * float(sp.chunk_size)
|
||||
return int(new_size) + (new_size - int(new_size) > 0)
|
||||
|
||||
def _fetch_data(self, uri, cursor, page_size):
|
||||
# If not cursor there is nothing to retrieve
|
||||
if cursor:
|
||||
if cursor == 'start':
|
||||
cursor = None
|
||||
# Chunk size tuning might, in some conditions, make it larger
|
||||
# than 5,000, which is the maximum page size allowed by the NSX
|
||||
# API. In this case the request should be split in multiple
|
||||
# requests. This is not ideal, and therefore a log warning will
|
||||
# be emitted.
|
||||
num_requests = page_size // (MAX_PAGE_SIZE + 1) + 1
|
||||
if num_requests > 1:
|
||||
LOG.warning("Requested page size is %(cur_chunk_size)d. "
|
||||
"It might be necessary to do %(num_requests)d "
|
||||
"round-trips to NSX for fetching data. Please "
|
||||
"tune sync parameters to ensure chunk size "
|
||||
"is less than %(max_page_size)d",
|
||||
{'cur_chunk_size': page_size,
|
||||
'num_requests': num_requests,
|
||||
'max_page_size': MAX_PAGE_SIZE})
|
||||
# Only the first request might return the total size,
|
||||
# subsequent requests will definitely not
|
||||
results, cursor, total_size = nsxlib.get_single_query_page(
|
||||
uri, self._cluster, cursor,
|
||||
min(page_size, MAX_PAGE_SIZE))
|
||||
for _req in range(num_requests - 1):
|
||||
# If no cursor is returned break the cycle as there is no
|
||||
# actual need to perform multiple requests (all fetched)
|
||||
# This happens when the overall size of resources exceeds
|
||||
# the maximum page size, but the number for each single
|
||||
# resource type is below this threshold
|
||||
if not cursor:
|
||||
break
|
||||
req_results, cursor = nsxlib.get_single_query_page(
|
||||
uri, self._cluster, cursor,
|
||||
min(page_size, MAX_PAGE_SIZE))[:2]
|
||||
results.extend(req_results)
|
||||
# reset cursor before returning if we queried just to
|
||||
# know the number of entities
|
||||
return results, cursor if page_size else 'start', total_size
|
||||
return [], cursor, None
|
||||
|
||||
def _fetch_nsx_data_chunk(self, sp):
|
||||
base_chunk_size = sp.chunk_size
|
||||
chunk_size = base_chunk_size + sp.extra_chunk_size
|
||||
LOG.info("Fetching up to %s resources "
|
||||
"from NSX backend", chunk_size)
|
||||
fetched = ls_count = lr_count = lp_count = 0
|
||||
lswitches = lrouters = lswitchports = []
|
||||
if sp.ls_cursor or sp.ls_cursor == 'start':
|
||||
(lswitches, sp.ls_cursor, ls_count) = self._fetch_data(
|
||||
self.LS_URI, sp.ls_cursor, chunk_size)
|
||||
fetched = len(lswitches)
|
||||
if fetched < chunk_size and sp.lr_cursor or sp.lr_cursor == 'start':
|
||||
(lrouters, sp.lr_cursor, lr_count) = self._fetch_data(
|
||||
self.LR_URI, sp.lr_cursor, max(chunk_size - fetched, 0))
|
||||
fetched += len(lrouters)
|
||||
if fetched < chunk_size and sp.lp_cursor or sp.lp_cursor == 'start':
|
||||
(lswitchports, sp.lp_cursor, lp_count) = self._fetch_data(
|
||||
self.LP_URI, sp.lp_cursor, max(chunk_size - fetched, 0))
|
||||
fetched += len(lswitchports)
|
||||
if sp.current_chunk == 0:
|
||||
# No cursors were provided. Then it must be possible to
|
||||
# calculate the total amount of data to fetch
|
||||
sp.total_size = ls_count + lr_count + lp_count
|
||||
LOG.debug("Total data size: %d", sp.total_size)
|
||||
sp.chunk_size = self._get_chunk_size(sp)
|
||||
# Calculate chunk size adjustment
|
||||
sp.extra_chunk_size = sp.chunk_size - base_chunk_size
|
||||
LOG.debug("Fetched %(num_lswitches)d logical switches, "
|
||||
"%(num_lswitchports)d logical switch ports,"
|
||||
"%(num_lrouters)d logical routers",
|
||||
{'num_lswitches': len(lswitches),
|
||||
'num_lswitchports': len(lswitchports),
|
||||
'num_lrouters': len(lrouters)})
|
||||
return (lswitches, lrouters, lswitchports)
|
||||
|
||||
def _synchronize_state(self, sp):
|
||||
# If the plugin has been destroyed, stop the LoopingCall
|
||||
if not self._plugin:
|
||||
raise loopingcall.LoopingCallDone()
|
||||
start = timeutils.utcnow()
|
||||
# Reset page cursor variables if necessary
|
||||
if sp.current_chunk == 0:
|
||||
sp.ls_cursor = sp.lr_cursor = sp.lp_cursor = 'start'
|
||||
LOG.info("Running state synchronization task. Chunk: %s",
|
||||
sp.current_chunk)
|
||||
# Fetch chunk_size data from NSX
|
||||
try:
|
||||
(lswitches, lrouters, lswitchports) = (
|
||||
self._fetch_nsx_data_chunk(sp))
|
||||
except (api_exc.RequestTimeout, api_exc.NsxApiException):
|
||||
sleep_interval = self._sync_backoff
|
||||
# Cap max back off to 64 seconds
|
||||
self._sync_backoff = min(self._sync_backoff * 2, 64)
|
||||
LOG.exception("An error occurred while communicating with "
|
||||
"NSX backend. Will retry synchronization "
|
||||
"in %d seconds", sleep_interval)
|
||||
return sleep_interval
|
||||
LOG.debug("Time elapsed querying NSX: %s",
|
||||
timeutils.utcnow() - start)
|
||||
if sp.total_size:
|
||||
num_chunks = ((sp.total_size / sp.chunk_size) +
|
||||
(sp.total_size % sp.chunk_size != 0))
|
||||
else:
|
||||
num_chunks = 1
|
||||
LOG.debug("Number of chunks: %d", num_chunks)
|
||||
# Find objects which have changed on NSX side and need
|
||||
# to be synchronized
|
||||
LOG.debug("Processing NSX cache for updated objects")
|
||||
(ls_uuids, lr_uuids, lp_uuids) = self._nsx_cache.process_updates(
|
||||
lswitches, lrouters, lswitchports)
|
||||
# Process removed objects only at the last chunk
|
||||
scan_missing = (sp.current_chunk == num_chunks - 1 and
|
||||
not sp.init_sync_performed)
|
||||
if sp.current_chunk == num_chunks - 1:
|
||||
LOG.debug("Processing NSX cache for deleted objects")
|
||||
self._nsx_cache.process_deletes()
|
||||
ls_uuids = self._nsx_cache.get_lswitches(
|
||||
changed_only=not scan_missing)
|
||||
lr_uuids = self._nsx_cache.get_lrouters(
|
||||
changed_only=not scan_missing)
|
||||
lp_uuids = self._nsx_cache.get_lswitchports(
|
||||
changed_only=not scan_missing)
|
||||
LOG.debug("Time elapsed hashing data: %s",
|
||||
timeutils.utcnow() - start)
|
||||
# Get an admin context
|
||||
ctx = n_context.get_admin_context()
|
||||
# Synchronize with database
|
||||
self._synchronize_lswitches(ctx, ls_uuids,
|
||||
scan_missing=scan_missing)
|
||||
self._synchronize_lrouters(ctx, lr_uuids,
|
||||
scan_missing=scan_missing)
|
||||
self._synchronize_lswitchports(ctx, lp_uuids,
|
||||
scan_missing=scan_missing)
|
||||
# Increase chunk counter
|
||||
LOG.info("Synchronization for chunk %(chunk_num)d of "
|
||||
"%(total_chunks)d performed",
|
||||
{'chunk_num': sp.current_chunk + 1,
|
||||
'total_chunks': num_chunks})
|
||||
sp.current_chunk = (sp.current_chunk + 1) % num_chunks
|
||||
added_delay = 0
|
||||
if sp.current_chunk == 0:
|
||||
# Ensure init_sync_performed is True
|
||||
if not sp.init_sync_performed:
|
||||
sp.init_sync_performed = True
|
||||
# Add additional random delay
|
||||
added_delay = random.randint(0, self._max_rand_delay)
|
||||
LOG.debug("Time elapsed at end of sync: %s",
|
||||
timeutils.utcnow() - start)
|
||||
return self._sync_interval / num_chunks + added_delay
|
@ -1,475 +0,0 @@
|
||||
# Copyright 2013 VMware, Inc. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from sqlalchemy.orm import exc as sa_orm_exc
|
||||
|
||||
from neutron_lib import constants
|
||||
from neutron_lib.db import api as db_api
|
||||
from neutron_lib.db import model_query
|
||||
from neutron_lib.db import utils as db_utils
|
||||
from neutron_lib import exceptions
|
||||
from neutron_lib.plugins import utils
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
|
||||
from vmware_nsx._i18n import _
|
||||
from vmware_nsx.db import nsx_models
|
||||
from vmware_nsx.extensions import networkgw
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
DEVICE_OWNER_NET_GW_INTF = 'network:gateway-interface'
|
||||
NETWORK_ID = 'network_id'
|
||||
SEGMENTATION_TYPE = 'segmentation_type'
|
||||
SEGMENTATION_ID = 'segmentation_id'
|
||||
ALLOWED_CONNECTION_ATTRIBUTES = set((NETWORK_ID,
|
||||
SEGMENTATION_TYPE,
|
||||
SEGMENTATION_ID))
|
||||
# Constants for gateway device operational status
|
||||
STATUS_UNKNOWN = "UNKNOWN"
|
||||
STATUS_ERROR = "ERROR"
|
||||
STATUS_ACTIVE = "ACTIVE"
|
||||
STATUS_DOWN = "DOWN"
|
||||
|
||||
|
||||
class GatewayInUse(exceptions.InUse):
|
||||
message = _("Network Gateway '%(gateway_id)s' still has active mappings "
|
||||
"with one or more neutron networks.")
|
||||
|
||||
|
||||
class GatewayNotFound(exceptions.NotFound):
|
||||
message = _("Network Gateway %(gateway_id)s could not be found")
|
||||
|
||||
|
||||
class GatewayDeviceInUse(exceptions.InUse):
|
||||
message = _("Network Gateway Device '%(device_id)s' is still used by "
|
||||
"one or more network gateways.")
|
||||
|
||||
|
||||
class GatewayDeviceNotFound(exceptions.NotFound):
|
||||
message = _("Network Gateway Device %(device_id)s could not be found.")
|
||||
|
||||
|
||||
class GatewayDevicesNotFound(exceptions.NotFound):
|
||||
message = _("One or more Network Gateway Devices could not be found: "
|
||||
"%(device_ids)s.")
|
||||
|
||||
|
||||
class NetworkGatewayPortInUse(exceptions.InUse):
|
||||
message = _("Port '%(port_id)s' is owned by '%(device_owner)s' and "
|
||||
"therefore cannot be deleted directly via the port API.")
|
||||
|
||||
|
||||
class GatewayConnectionInUse(exceptions.InUse):
|
||||
message = _("The specified mapping '%(mapping)s' is already in use on "
|
||||
"network gateway '%(gateway_id)s'.")
|
||||
|
||||
|
||||
class MultipleGatewayConnections(exceptions.Conflict):
|
||||
message = _("Multiple network connections found on '%(gateway_id)s' "
|
||||
"with provided criteria.")
|
||||
|
||||
|
||||
class GatewayConnectionNotFound(exceptions.NotFound):
|
||||
message = _("The connection %(network_mapping_info)s was not found on the "
|
||||
"network gateway '%(network_gateway_id)s'")
|
||||
|
||||
|
||||
class NetworkGatewayUnchangeable(exceptions.InUse):
|
||||
message = _("The network gateway %(gateway_id)s "
|
||||
"cannot be updated or deleted")
|
||||
|
||||
|
||||
class NetworkGatewayMixin(networkgw.NetworkGatewayPluginBase):
|
||||
|
||||
gateway_resource = networkgw.GATEWAY_RESOURCE_NAME
|
||||
device_resource = networkgw.DEVICE_RESOURCE_NAME
|
||||
|
||||
def _get_network_gateway(self, context, gw_id):
|
||||
try:
|
||||
gw = model_query.get_by_id(context, nsx_models.NetworkGateway,
|
||||
gw_id)
|
||||
except sa_orm_exc.NoResultFound:
|
||||
raise GatewayNotFound(gateway_id=gw_id)
|
||||
return gw
|
||||
|
||||
def _make_gw_connection_dict(self, gw_conn):
|
||||
return {'port_id': gw_conn['port_id'],
|
||||
'segmentation_type': gw_conn['segmentation_type'],
|
||||
'segmentation_id': gw_conn['segmentation_id']}
|
||||
|
||||
def _make_network_gateway_dict(self, network_gateway, fields=None):
|
||||
device_list = []
|
||||
for d in network_gateway['devices']:
|
||||
device_list.append({'id': d['id'],
|
||||
'interface_name': d['interface_name']})
|
||||
res = {'id': network_gateway['id'],
|
||||
'name': network_gateway['name'],
|
||||
'default': network_gateway['default'],
|
||||
'devices': device_list,
|
||||
'tenant_id': network_gateway['tenant_id']}
|
||||
# Query gateway connections only if needed
|
||||
if not fields or 'ports' in fields:
|
||||
res['ports'] = [self._make_gw_connection_dict(conn)
|
||||
for conn in network_gateway.network_connections]
|
||||
return db_utils.resource_fields(res, fields)
|
||||
|
||||
def _set_mapping_info_defaults(self, mapping_info):
|
||||
if not mapping_info.get('segmentation_type'):
|
||||
mapping_info['segmentation_type'] = 'flat'
|
||||
if not mapping_info.get('segmentation_id'):
|
||||
mapping_info['segmentation_id'] = 0
|
||||
|
||||
def _validate_network_mapping_info(self, network_mapping_info):
|
||||
self._set_mapping_info_defaults(network_mapping_info)
|
||||
network_id = network_mapping_info.get(NETWORK_ID)
|
||||
if not network_id:
|
||||
raise exceptions.InvalidInput(
|
||||
error_message=_("A network identifier must be specified "
|
||||
"when connecting a network to a network "
|
||||
"gateway. Unable to complete operation"))
|
||||
connection_attrs = set(network_mapping_info.keys())
|
||||
if not connection_attrs.issubset(ALLOWED_CONNECTION_ATTRIBUTES):
|
||||
raise exceptions.InvalidInput(
|
||||
error_message=(_("Invalid keys found among the ones provided "
|
||||
"in request body: %(connection_attrs)s."),
|
||||
connection_attrs))
|
||||
seg_type = network_mapping_info.get(SEGMENTATION_TYPE)
|
||||
seg_id = network_mapping_info.get(SEGMENTATION_ID)
|
||||
# It is important to validate that the segmentation ID is actually an
|
||||
# integer value
|
||||
try:
|
||||
seg_id = int(seg_id)
|
||||
except ValueError:
|
||||
msg = _("An invalid segmentation ID was specified. The "
|
||||
"segmentation ID must be a positive integer number")
|
||||
raise exceptions.InvalidInput(error_message=msg)
|
||||
# The NSX plugin accepts 0 as a valid vlan tag
|
||||
seg_id_valid = seg_id == 0 or utils.is_valid_vlan_tag(seg_id)
|
||||
if seg_type.lower() == 'flat' and seg_id:
|
||||
msg = _("Cannot specify a segmentation id when "
|
||||
"the segmentation type is flat")
|
||||
raise exceptions.InvalidInput(error_message=msg)
|
||||
elif (seg_type.lower() == 'vlan' and not seg_id_valid):
|
||||
msg = _("Invalid segmentation id (%s) for "
|
||||
"vlan segmentation type") % seg_id
|
||||
raise exceptions.InvalidInput(error_message=msg)
|
||||
return network_id
|
||||
|
||||
def _retrieve_gateway_connections(self, context, gateway_id,
|
||||
mapping_info=None, only_one=False):
|
||||
mapping_info = mapping_info or {}
|
||||
filters = {'network_gateway_id': [gateway_id]}
|
||||
for k, v in six.iteritems(mapping_info):
|
||||
if v and k != NETWORK_ID:
|
||||
filters[k] = [v]
|
||||
query = model_query.get_collection_query(context,
|
||||
nsx_models.NetworkConnection,
|
||||
filters)
|
||||
return query.one() if only_one else query.all()
|
||||
|
||||
def _unset_default_network_gateways(self, context):
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
context.session.query(nsx_models.NetworkGateway).update(
|
||||
{nsx_models.NetworkGateway.default: False})
|
||||
|
||||
def _set_default_network_gateway(self, context, gw_id):
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
gw = (context.session.query(nsx_models.NetworkGateway).
|
||||
filter_by(id=gw_id).one())
|
||||
gw['default'] = True
|
||||
|
||||
def prevent_network_gateway_port_deletion(self, context, port):
|
||||
"""Pre-deletion check.
|
||||
|
||||
Ensures a port will not be deleted if is being used by a network
|
||||
gateway. In that case an exception will be raised.
|
||||
"""
|
||||
if port['device_owner'] == DEVICE_OWNER_NET_GW_INTF:
|
||||
raise NetworkGatewayPortInUse(port_id=port['id'],
|
||||
device_owner=port['device_owner'])
|
||||
|
||||
def _validate_device_list(self, context, tenant_id, gateway_data):
|
||||
device_query = self._query_gateway_devices(
|
||||
context, filters={'id': [device['id']
|
||||
for device in gateway_data['devices']]})
|
||||
retrieved_device_ids = set()
|
||||
for device in device_query:
|
||||
retrieved_device_ids.add(device['id'])
|
||||
if device['tenant_id'] != tenant_id:
|
||||
raise GatewayDeviceNotFound(device_id=device['id'])
|
||||
missing_device_ids = (
|
||||
set(device['id'] for device in gateway_data['devices']) -
|
||||
retrieved_device_ids)
|
||||
if missing_device_ids:
|
||||
raise GatewayDevicesNotFound(
|
||||
device_ids=",".join(missing_device_ids))
|
||||
|
||||
def create_network_gateway(self, context, network_gateway,
|
||||
validate_device_list=True):
|
||||
gw_data = network_gateway[self.gateway_resource]
|
||||
tenant_id = gw_data['tenant_id']
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
gw_db = nsx_models.NetworkGateway(
|
||||
id=gw_data.get('id', uuidutils.generate_uuid()),
|
||||
tenant_id=tenant_id,
|
||||
name=gw_data.get('name'))
|
||||
# Device list is guaranteed to be a valid list, but some devices
|
||||
# might still either not exist or belong to a different tenant
|
||||
if validate_device_list:
|
||||
self._validate_device_list(context, tenant_id, gw_data)
|
||||
gw_db.devices.extend(
|
||||
[nsx_models.NetworkGatewayDeviceReference(**device)
|
||||
for device in gw_data['devices']])
|
||||
context.session.add(gw_db)
|
||||
LOG.debug("Created network gateway with id:%s", gw_db['id'])
|
||||
return self._make_network_gateway_dict(gw_db)
|
||||
|
||||
def update_network_gateway(self, context, id, network_gateway):
|
||||
gw_data = network_gateway[self.gateway_resource]
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
gw_db = self._get_network_gateway(context, id)
|
||||
if gw_db.default:
|
||||
raise NetworkGatewayUnchangeable(gateway_id=id)
|
||||
# Ensure there is something to update before doing it
|
||||
if any([gw_db[k] != gw_data[k] for k in gw_data]):
|
||||
gw_db.update(gw_data)
|
||||
LOG.debug("Updated network gateway with id:%s", id)
|
||||
return self._make_network_gateway_dict(gw_db)
|
||||
|
||||
def get_network_gateway(self, context, id, fields=None):
|
||||
gw_db = self._get_network_gateway(context, |