Remove deprecated warnings for neutron_lib

neutron_lib should be used instead of the attributes and constants
imports. This patch moves to using neutron_lib. This removes all of
the deprecated warnings (there are still some from neutron and
l2gw - those are addressed in other patches).

Change-Id: I796d749c46a69107a1a484e8774c5d501fc4704f
This commit is contained in:
Gary Kotton 2016-05-09 23:11:55 -07:00
parent 833772c009
commit 0613e7773f
28 changed files with 161 additions and 142 deletions

View File

@ -13,9 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.v2 import attributes as attr
from neutron.extensions import multiprovidernet as mpnet
from neutron.extensions import providernet as pnet
from neutron_lib.api import validators
from neutron_lib import constants
from neutron_lib import exceptions as n_exc
from oslo_log import log
import six
@ -281,7 +282,7 @@ def _convert_segments_to_nsx_transport_zones(segments, default_tz_uuid):
for transport_zone in segments:
for value in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
pnet.SEGMENTATION_ID]:
if transport_zone.get(value) == attr.ATTR_NOT_SPECIFIED:
if transport_zone.get(value) == constants.ATTR_NOT_SPECIFIED:
transport_zone[value] = None
transport_entry = {}
@ -308,7 +309,7 @@ def convert_to_nsx_transport_zones(
default_transport_type=None):
# Convert fields from provider request to nsx format
if (network and not attr.is_attr_set(
if (network and not validators.is_attr_set(
network.get(mpnet.SEGMENTS))):
return [{"zone_uuid": default_tz_uuid,
"transport_type": default_transport_type}]

View File

@ -17,8 +17,8 @@ import functools
import hashlib
import eventlet
from neutron.api.v2 import attributes
from neutron import version
from neutron_lib.api import validators
from neutron_lib import exceptions
from oslo_config import cfg
from oslo_context import context as common_context
@ -89,7 +89,7 @@ def device_id_to_vm_id(device_id, obfuscate=False):
def check_and_truncate(display_name):
if (attributes.is_attr_set(display_name) and
if (validators.is_attr_set(display_name) and
len(display_name) > MAX_DISPLAY_NAME_LEN):
LOG.debug("Specified name:'%s' exceeds maximum length. "
"It will be truncated on NSX", display_name)

View File

@ -16,11 +16,11 @@
import sqlalchemy as sa
from sqlalchemy import orm
from neutron.api.v2 import attributes as attr
from neutron.db import db_base_plugin_v2
from neutron.db import model_base
from neutron.db import securitygroups_db
from neutron.extensions import securitygroup as ext_sg
from neutron_lib.api import validators
from neutron_lib import exceptions as nexception
from vmware_nsx._i18n import _
from vmware_nsx.extensions import secgroup_rule_local_ip_prefix as ext_local_ip
@ -54,7 +54,7 @@ class NsxExtendedSecurityGroupRuleProperties(model_base.BASEV2):
class ExtendedSecurityGroupRuleMixin(object):
def _check_local_ip_prefix(self, context, rule):
rule_specify_local_ip_prefix = attr.is_attr_set(
rule_specify_local_ip_prefix = validators.is_attr_set(
rule.get(ext_local_ip.LOCAL_IP_PREFIX))
if rule_specify_local_ip_prefix and rule['direction'] != 'ingress':
@ -68,7 +68,8 @@ class ExtendedSecurityGroupRuleMixin(object):
def _process_security_group_rule_properties(self, context,
rule_res, rule_req):
rule_res[ext_local_ip.LOCAL_IP_PREFIX] = None
if not attr.is_attr_set(rule_req.get(ext_local_ip.LOCAL_IP_PREFIX)):
if not validators.is_attr_set(
rule_req.get(ext_local_ip.LOCAL_IP_PREFIX)):
return
with context.session.begin(subtransactions=True):

View File

@ -14,8 +14,8 @@
from sqlalchemy.orm import exc as sa_orm_exc
from neutron.api.v2 import attributes
from neutron.plugins.common import utils
from neutron_lib import constants
from neutron_lib import exceptions
from oslo_log import log as logging
from oslo_utils import uuidutils
@ -305,7 +305,7 @@ class NetworkGatewayMixin(networkgw.NetworkGatewayPluginBase):
'port':
{'tenant_id': tenant_id,
'network_id': network_id,
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'mac_address': constants.ATTR_NOT_SPECIFIED,
'admin_state_up': True,
'fixed_ips': [],
'device_id': network_gateway_id,

View File

@ -21,7 +21,6 @@ from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from neutron.api.v2 import attributes as attr
from neutron.db import db_base_plugin_v2
from neutron.db import l3_db
from neutron.extensions import external_net
@ -131,7 +130,7 @@ class DhcpAgentNotifyAPI(object):
"device_owner": const.DEVICE_OWNER_DHCP,
"network_id": network_id,
"tenant_id": subnet["tenant_id"],
"mac_address": attr.ATTR_NOT_SPECIFIED,
"mac_address": const.ATTR_NOT_SPECIFIED,
"fixed_ips": [{"subnet_id": subnet['id']}]
}
try:

View File

@ -21,7 +21,6 @@ from oslo_config import cfg
from oslo_log import log as logging
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.db import db_base_plugin_v2
from neutron.db import models_v2
@ -194,7 +193,7 @@ def _create_metadata_access_network(plugin, context, router_id):
'cidr': METADATA_SUBNET_CIDR,
'enable_dhcp': True,
# Ensure default allocation pool is generated
'allocation_pools': attributes.ATTR_NOT_SPECIFIED,
'allocation_pools': const.ATTR_NOT_SPECIFIED,
'gateway_ip': METADATA_GATEWAY_IP,
'dns_nameservers': [],
'host_routes': []}

View File

@ -17,6 +17,9 @@ import re
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron_lib.api import validators
from neutron_lib import constants
from vmware_nsx._i18n import _
DNS_LABEL_MAX_LEN = 63
@ -51,7 +54,7 @@ def _validate_dns_format(data):
def _validate_dns_search_domain(data, max_len=attributes.NAME_MAX_LEN):
msg = attributes._validate_string(data, max_len)
msg = validators.validate_string(data, max_len)
if msg:
return msg
if not data:
@ -61,7 +64,7 @@ def _validate_dns_search_domain(data, max_len=attributes.NAME_MAX_LEN):
return msg
attributes.validators['type:dns_search_domain'] = (_validate_dns_search_domain)
validators.validators['type:dns_search_domain'] = (_validate_dns_search_domain)
DNS_SEARCH_DOMAIN = 'dns_search_domain'
@ -69,7 +72,7 @@ EXTENDED_ATTRIBUTES_2_0 = {
'subnets': {
DNS_SEARCH_DOMAIN: {
'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
'default': constants.ATTR_NOT_SPECIFIED,
'validate': {'type:dns_search_domain': attributes.NAME_MAX_LEN},
'is_visible': True},
}

View File

@ -13,15 +13,17 @@
# under the License.
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron_lib.api import converters
from neutron_lib import constants
MAC_LEARNING = 'mac_learning_enabled'
EXTENDED_ATTRIBUTES_2_0 = {
'ports': {
MAC_LEARNING: {'allow_post': True, 'allow_put': True,
'convert_to': attributes.convert_to_boolean,
'default': attributes.ATTR_NOT_SPECIFIED,
'convert_to': converters.convert_to_boolean,
'default': constants.ATTR_NOT_SPECIFIED,
'is_visible': True},
}
}

View File

@ -21,6 +21,9 @@ from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron.api.v2 import resource_helper
from neutron_lib.api import validators
from neutron_lib import constants
from vmware_nsx._i18n import _
GATEWAY_RESOURCE_NAME = "network_gateway"
@ -104,12 +107,12 @@ def _validate_device_list(data, valid_values=None):
try:
for device in data:
key_specs = {DEVICE_ID_ATTR:
{'type:regex': attributes.UUID_PATTERN,
{'type:regex': constants.UUID_PATTERN,
'required': True},
IFACE_NAME_ATTR:
{'type:string': None,
'required': False}}
err_msg = attributes._validate_dict(
err_msg = validators.validate_dict(
device, key_specs=key_specs)
if err_msg:
return err_msg
@ -148,8 +151,8 @@ nw_gw_quota_opts = [
cfg.CONF.register_opts(nw_gw_quota_opts, 'QUOTAS')
attributes.validators['type:device_list'] = _validate_device_list
attributes.validators['type:connector_type'] = _validate_connector_type
validators.validators['type:device_list'] = _validate_device_list
validators.validators['type:connector_type'] = _validate_connector_type
class Networkgw(extensions.ExtensionDescriptor):

View File

@ -21,6 +21,7 @@ from neutron.api.v2 import attributes as attr
from neutron.api.v2 import base
from neutron import manager
from neutron_lib.api import converters
from neutron_lib import exceptions as nexception
from vmware_nsx._i18n import _
@ -111,7 +112,7 @@ RESOURCE_ATTRIBUTE_MAP = {
'id': {'allow_post': False, 'allow_put': False,
'is_visible': True},
'default': {'allow_post': True, 'allow_put': False,
'convert_to': attr.convert_to_boolean,
'convert_to': converters.convert_to_boolean,
'is_visible': True, 'default': False},
'name': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': attr.NAME_MAX_LEN},
@ -147,7 +148,8 @@ EXTENDED_ATTRIBUTES_2_0 = {
'is_visible': False,
'default': 1,
'enforce_policy': True,
'convert_to': attr.convert_to_positive_float_or_none},
'convert_to':
converters.convert_to_positive_float_or_none},
QUEUE: {'allow_post': False,
'allow_put': False,

View File

@ -13,7 +13,7 @@
# under the License.
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron_lib import constants
ROUTER_SIZE = 'router_size'
@ -22,7 +22,7 @@ EXTENDED_ATTRIBUTES_2_0 = {
'routers': {
ROUTER_SIZE: {'allow_post': True, 'allow_put': False,
'validate': {'type:values': VALID_EDGE_SIZES},
'default': attributes.ATTR_NOT_SPECIFIED,
'default': constants.ATTR_NOT_SPECIFIED,
'is_visible': True},
}
}

View File

@ -13,7 +13,7 @@
# under the License.
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron_lib import constants
ROUTER_TYPE = 'router_type'
@ -21,7 +21,7 @@ EXTENDED_ATTRIBUTES_2_0 = {
'routers': {
ROUTER_TYPE: {'allow_post': True, 'allow_put': True,
'validate': {'type:values': ['shared', 'exclusive']},
'default': attributes.ATTR_NOT_SPECIFIED,
'default': constants.ATTR_NOT_SPECIFIED,
'is_visible': True},
}
}

View File

@ -13,9 +13,10 @@
# under the License.
from neutron.api import extensions
from neutron.api.v2 import attributes as attr
from neutron.extensions import securitygroup
from neutron_lib import constants
LOCAL_IP_PREFIX = 'local_ip_prefix'
RESOURCE_ATTRIBUTE_MAP = {
@ -24,7 +25,7 @@ RESOURCE_ATTRIBUTE_MAP = {
'allow_post': True,
'allow_put': False,
'convert_to': securitygroup.convert_ip_prefix_to_cidr,
'default': attr.ATTR_NOT_SPECIFIED,
'default': constants.ATTR_NOT_SPECIFIED,
'enforce_policy': True,
'is_visible': True}
}

View File

@ -13,7 +13,7 @@
# under the License.
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron_lib.api import converters
# Attribute Map
VNIC_INDEX = 'vnic_index'
@ -26,7 +26,7 @@ EXTENDED_ATTRIBUTES_2_0 = {
'allow_put': True,
'is_visible': True,
'default': None,
'convert_to': attributes.convert_to_int_if_not_none}}}
'convert_to': converters.convert_to_int_if_not_none}}}
class Vnicindex(extensions.ExtensionDescriptor):

View File

@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.v2 import attributes as attr
from neutron_lib.api import validators
from neutron_lib import exceptions as exception
from oslo_log import log
from oslo_serialization import jsonutils
@ -43,7 +43,7 @@ def create_lqueue(cluster, queue_data):
queue_obj = dict(
(nsx_name, queue_data.get(api_name))
for api_name, nsx_name in six.iteritems(params)
if attr.is_attr_set(queue_data.get(api_name))
if validators.is_attr_set(queue_data.get(api_name))
)
if 'display_name' in queue_obj:
queue_obj['display_name'] = utils.check_and_truncate(

View File

@ -19,7 +19,6 @@ from oslo_log import log as logging
from oslo_utils import excutils
from neutron.api import extensions as neutron_extensions
from neutron.api.v2 import attributes as attr
from neutron.db import agentschedulers_db
from neutron.db import allowedaddresspairs_db as addr_pair_db
from neutron.db import db_base_plugin_v2
@ -38,6 +37,7 @@ from neutron.extensions import securitygroup as ext_sg
from neutron.plugins.common import constants
from neutron.plugins.common import utils
from neutron.quota import resource_registry
from neutron_lib.api import validators
from neutron_lib import exceptions as n_exc
import vmware_nsx
@ -183,7 +183,7 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
def _validate_network(self, context, net_data):
network_type = net_data.get(pnet.NETWORK_TYPE)
segmentation_id = net_data.get(pnet.SEGMENTATION_ID)
segmentation_id_set = attr.is_attr_set(segmentation_id)
segmentation_id_set = validators.is_attr_set(segmentation_id)
if not context.is_admin:
err_msg = _("Only and admin can create a DVS provider "
"network")
@ -286,7 +286,7 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
# security group extension checks
if has_ip:
self._ensure_default_security_group_on_port(context, port)
elif attr.is_attr_set(port_data.get(ext_sg.SECURITYGROUPS)):
elif validators.is_attr_set(port_data.get(ext_sg.SECURITYGROUPS)):
raise psec.PortSecurityAndIPRequiredForSecurityGroups()
port_data[ext_sg.SECURITYGROUPS] = (
self._get_security_groups_on_port(context, port))
@ -297,7 +297,7 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
port_data)
# allowed address pair checks
if attr.is_attr_set(port_data.get(addr_pair.ADDRESS_PAIRS)):
if validators.is_attr_set(port_data.get(addr_pair.ADDRESS_PAIRS)):
if not port_security:
raise addr_pair.AddressPairAndPortSecurityRequired()
else:

View File

@ -15,6 +15,7 @@
import uuid
from neutron_lib.api import validators
from neutron_lib import constants
from neutron_lib import exceptions as n_exc
from oslo_concurrency import lockutils
@ -766,17 +767,17 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
def _validate_provider_create(self, context, network):
segments = network.get(mpnet.SEGMENTS)
if not attr.is_attr_set(segments):
if not validators.is_attr_set(segments):
return
mpnet.check_duplicate_segments(segments)
for segment in segments:
network_type = segment.get(pnet.NETWORK_TYPE)
physical_network = segment.get(pnet.PHYSICAL_NETWORK)
physical_network_set = attr.is_attr_set(physical_network)
physical_network_set = validators.is_attr_set(physical_network)
segmentation_id = segment.get(pnet.SEGMENTATION_ID)
network_type_set = attr.is_attr_set(network_type)
segmentation_id_set = attr.is_attr_set(segmentation_id)
network_type_set = validators.is_attr_set(network_type)
segmentation_id_set = validators.is_attr_set(segmentation_id)
# If the physical_network_uuid isn't passed in use the default one.
if not physical_network_set:
@ -903,10 +904,10 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
Returns: True if request is multiprovider False if provider
and None if neither.
"""
if any(attr.is_attr_set(network.get(f))
if any(validators.is_attr_set(network.get(f))
for f in (pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
pnet.SEGMENTATION_ID)):
if attr.is_attr_set(network.get(mpnet.SEGMENTS)):
if validators.is_attr_set(network.get(mpnet.SEGMENTS)):
raise mpnet.SegmentsSetInConjunctionWithProviders()
# convert to transport zone list
network[mpnet.SEGMENTS] = [
@ -917,7 +918,7 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
del network[pnet.PHYSICAL_NETWORK]
del network[pnet.SEGMENTATION_ID]
return False
if attr.is_attr_set(mpnet.SEGMENTS):
if validators.is_attr_set(mpnet.SEGMENTS):
return True
def create_network(self, context, network):
@ -929,7 +930,7 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
self._validate_provider_create(context, net_data)
# Replace ATTR_NOT_SPECIFIED with None before sending to NSX
for key, value in six.iteritems(network['network']):
if value is attr.ATTR_NOT_SPECIFIED:
if value is constants.ATTR_NOT_SPECIFIED:
net_data[key] = None
# FIXME(arosen) implement admin_state_up = False in NSX
if net_data['admin_state_up'] is False:
@ -943,8 +944,8 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
# network. This will be removed once the network create operation
# becomes an asynchronous task
net_data['id'] = str(uuid.uuid4())
if (not attr.is_attr_set(external) or
attr.is_attr_set(external) and not external):
if (not validators.is_attr_set(external) or
validators.is_attr_set(external) and not external):
lswitch = switchlib.create_lswitch(
self.cluster, net_data['id'],
tenant_id, net_data.get('name'),
@ -967,8 +968,8 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
self._process_network_queue_mapping(
context, new_net, net_queue_id)
# Add mapping between neutron network and NSX switch
if (not attr.is_attr_set(external) or
attr.is_attr_set(external) and not external):
if (not validators.is_attr_set(external) or
validators.is_attr_set(external) and not external):
nsx_db.add_neutron_nsx_network_mapping(
context.session, new_net['id'],
lswitch['uuid'])
@ -977,7 +978,8 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
net_bindings = []
for tz in net_data[mpnet.SEGMENTS]:
segmentation_id = tz.get(pnet.SEGMENTATION_ID, 0)
segmentation_id_set = attr.is_attr_set(segmentation_id)
segmentation_id_set = validators.is_attr_set(
segmentation_id)
if not segmentation_id_set:
segmentation_id = 0
net_bindings.append(nsx_db.add_network_binding(
@ -1116,7 +1118,7 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
self._process_port_port_security_create(
context, port_data, neutron_db)
# allowed address pair checks
if attr.is_attr_set(port_data.get(addr_pair.ADDRESS_PAIRS)):
if validators.is_attr_set(port_data.get(addr_pair.ADDRESS_PAIRS)):
if not port_security:
raise addr_pair.AddressPairAndPortSecurityRequired()
else:
@ -1389,7 +1391,8 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
lrouter = routerlib.create_lrouter(
self.cluster, router['id'],
tenant_id, router['name'], nexthop,
distributed=attr.is_attr_set(distributed) and distributed)
distributed=(validators.is_attr_set(distributed)
and distributed))
except nsx_exc.InvalidVersion:
msg = _("Cannot create a distributed router with the NSX "
"platform currently in execution. Please, try "

View File

@ -15,9 +15,9 @@
from oslo_log import log as logging
from oslo_utils import excutils
from neutron.api.v2 import attributes as attr
from neutron.db import l3_db
from neutron_lib import constants
from neutron_lib import exceptions as n_exc
from vmware_nsx._i18n import _, _LE
@ -103,7 +103,7 @@ class RouterDistributedDriver(router_driver.RouterBaseDriver):
is_extract=True)
super(nsx_v.NsxVPluginV2, self.plugin).update_router(
context, router_id, router)
if gw_info != attr.ATTR_NOT_SPECIFIED:
if gw_info != constants.ATTR_NOT_SPECIFIED:
self.plugin._update_router_gw_info(context, router_id, gw_info,
is_routes_update)
elif is_routes_update:

View File

@ -14,8 +14,8 @@
from oslo_log import log as logging
from neutron.api.v2 import attributes as attr
from neutron.plugins.common import constants
from neutron_lib import constants as n_consts
from vmware_nsx._i18n import _
from vmware_nsx.common import exceptions as nsxv_exc
@ -50,7 +50,7 @@ class RouterExclusiveDriver(router_driver.RouterBaseDriver):
is_extract=True)
super(nsx_v.NsxVPluginV2, self.plugin).update_router(
context, router_id, router)
if gw_info != attr.ATTR_NOT_SPECIFIED:
if gw_info != n_consts.ATTR_NOT_SPECIFIED:
self.plugin._update_router_gw_info(context, router_id, gw_info,
is_routes_update)
elif is_routes_update:

View File

@ -15,9 +15,10 @@
import netaddr
from oslo_config import cfg
from neutron.api.v2 import attributes as attr
from neutron.db import l3_db
from neutron.db import models_v2
from neutron_lib.api import validators
from neutron_lib import constants
from neutron_lib import exceptions as n_exc
from oslo_log import log as logging
@ -47,7 +48,7 @@ class RouterSharedDriver(router_driver.RouterBaseDriver):
pass
def _validate_no_routes(self, router):
if (attr.is_attr_set(router.get('routes')) and
if (validators.is_attr_set(router.get('routes')) and
len(router['routes']) > 0):
msg = _("Cannot configure static routes on a shared router")
raise n_exc.InvalidInput(error_message=msg)
@ -66,7 +67,7 @@ class RouterSharedDriver(router_driver.RouterBaseDriver):
super(nsx_v.NsxVPluginV2, self.plugin).update_router(
context, router_id, router)
if gw_info != attr.ATTR_NOT_SPECIFIED:
if gw_info != constants.ATTR_NOT_SPECIFIED:
self.plugin._update_router_gw_info(context, router_id, gw_info)
if 'admin_state_up' in r:
# If router was deployed on a different edge then

View File

@ -18,7 +18,6 @@ import hashlib
import hmac
import netaddr
from neutron.api.v2 import attributes as attr
from neutron import context as neutron_context
from neutron_lib import constants
from oslo_config import cfg
@ -129,11 +128,11 @@ class NsxVMetadataProxyHandler:
subnet_data = {'subnet':
{'cidr': cidr,
'name': 'inter-edge-subnet',
'gateway_ip': attr.ATTR_NOT_SPECIFIED,
'allocation_pools': attr.ATTR_NOT_SPECIFIED,
'gateway_ip': constants.ATTR_NOT_SPECIFIED,
'allocation_pools': constants.ATTR_NOT_SPECIFIED,
'ip_version': 4,
'dns_nameservers': attr.ATTR_NOT_SPECIFIED,
'host_routes': attr.ATTR_NOT_SPECIFIED,
'dns_nameservers': constants.ATTR_NOT_SPECIFIED,
'host_routes': constants.ATTR_NOT_SPECIFIED,
'enable_dhcp': False,
'network_id': net['id'],
'tenant_id': tenant_id}}
@ -403,8 +402,8 @@ class NsxVMetadataProxyHandler:
'admin_state_up': True,
'device_id': rtr_id,
'device_owner': constants.DEVICE_OWNER_ROUTER_INTF,
'fixed_ips': attr.ATTR_NOT_SPECIFIED,
'mac_address': attr.ATTR_NOT_SPECIFIED,
'fixed_ips': constants.ATTR_NOT_SPECIFIED,
'mac_address': constants.ATTR_NOT_SPECIFIED,
'port_security_enabled': False,
'tenant_id': None}}
@ -632,8 +631,8 @@ class NsxVMetadataProxyHandler:
'admin_state_up': True,
'device_id': rtr_id,
'device_owner': constants.DEVICE_OWNER_ROUTER_GW,
'fixed_ips': attr.ATTR_NOT_SPECIFIED,
'mac_address': attr.ATTR_NOT_SPECIFIED,
'fixed_ips': constants.ATTR_NOT_SPECIFIED,
'mac_address': constants.ATTR_NOT_SPECIFIED,
'port_security_enabled': False,
'tenant_id': None}}

View File

@ -18,6 +18,7 @@ import six
import uuid
import netaddr
from neutron_lib.api import validators
from neutron_lib import constants
from neutron_lib import exceptions as n_exc
from oslo_config import cfg
@ -252,13 +253,14 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
def _decide_router_type(self, context, r):
router_type = None
if attr.is_attr_set(r.get("distributed")) and r.get("distributed"):
if (validators.is_attr_set(r.get("distributed")) and
r.get("distributed")):
router_type = "distributed"
if attr.is_attr_set(r.get("router_type")):
if validators.is_attr_set(r.get("router_type")):
err_msg = _('Can not support router_type extension for '
'distributed router')
raise n_exc.InvalidInput(error_message=err_msg)
elif attr.is_attr_set(r.get("router_type")):
elif validators.is_attr_set(r.get("router_type")):
router_type = r.get("router_type")
router_type = self._router_managers.decide_tenant_router_type(
@ -373,7 +375,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
def _validate_network_qos(self, network, backend_network):
err_msg = None
if attr.is_attr_set(network.get(qos_consts.QOS_POLICY_ID)):
if validators.is_attr_set(network.get(qos_consts.QOS_POLICY_ID)):
if not backend_network:
err_msg = (_("Cannot configure QOS on external networks"))
if not cfg.CONF.nsxv.use_dvs_features:
@ -384,16 +386,16 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
raise n_exc.InvalidInput(error_message=err_msg)
def _validate_provider_create(self, context, network):
if not attr.is_attr_set(network.get(mpnet.SEGMENTS)):
if not validators.is_attr_set(network.get(mpnet.SEGMENTS)):
return
for segment in network[mpnet.SEGMENTS]:
network_type = segment.get(pnet.NETWORK_TYPE)
physical_network = segment.get(pnet.PHYSICAL_NETWORK)
segmentation_id = segment.get(pnet.SEGMENTATION_ID)
network_type_set = attr.is_attr_set(network_type)
segmentation_id_set = attr.is_attr_set(segmentation_id)
physical_network_set = attr.is_attr_set(physical_network)
network_type_set = validators.is_attr_set(network_type)
segmentation_id_set = validators.is_attr_set(segmentation_id)
physical_network_set = validators.is_attr_set(physical_network)
err_msg = None
if not network_type_set:
@ -442,15 +444,15 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
external = network.get(ext_net_extn.EXTERNAL)
if segmentation_id_set:
err_msg = _("Segmentation ID cannot be set with portgroup")
physical_net_set = attr.is_attr_set(physical_network)
physical_net_set = validators.is_attr_set(physical_network)
if not physical_net_set:
err_msg = _("Physical network must be set!")
elif not self.nsx_v.vcns.validate_network(physical_network):
err_msg = _("Physical network doesn't exist")
# A provider network portgroup will need the network name to
# match the portgroup name
elif ((not attr.is_attr_set(external) or
attr.is_attr_set(external) and not external) and
elif ((not validators.is_attr_set(external) or
validators.is_attr_set(external) and not external) and
not self.nsx_v.vcns.validate_network_name(
physical_network, network['name'])):
err_msg = _("Portgroup name must match network name")
@ -552,10 +554,10 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
Returns: True if request is multiprovider False if provider
and None if neither.
"""
if any(attr.is_attr_set(network.get(f))
if any(validators.is_attr_set(network.get(f))
for f in (pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
pnet.SEGMENTATION_ID)):
if attr.is_attr_set(network.get(mpnet.SEGMENTS)):
if validators.is_attr_set(network.get(mpnet.SEGMENTS)):
raise mpnet.SegmentsSetInConjunctionWithProviders()
# convert to transport zone list
network[mpnet.SEGMENTS] = [
@ -566,7 +568,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
del network[pnet.PHYSICAL_NETWORK]
del network[pnet.SEGMENTATION_ID]
return False
if attr.is_attr_set(network.get(mpnet.SEGMENTS)):
if validators.is_attr_set(network.get(mpnet.SEGMENTS)):
return True
def _delete_backend_network(self, moref):
@ -621,7 +623,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
dvs-id from nsx.ini file, otherwise convert physical network string
to a list of unique DVS-IDs.
"""
if not attr.is_attr_set(physical_network):
if not validators.is_attr_set(physical_network):
return [self.dvs_id]
# Return unique DVS-IDs only and ignore duplicates
return list(set(
@ -717,8 +719,8 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
net_data['id'] = str(uuid.uuid4())
external = net_data.get(ext_net_extn.EXTERNAL)
backend_network = (not attr.is_attr_set(external) or
attr.is_attr_set(external) and not external)
backend_network = (not validators.is_attr_set(external) or
validators.is_attr_set(external) and not external)
self._validate_network_qos(net_data, backend_network)
if backend_network:
@ -737,7 +739,8 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
vdn_scope_id = self.vdn_scope_id
if provider_type is not None:
segment = net_data[mpnet.SEGMENTS][0]
if attr.is_attr_set(segment.get(pnet.PHYSICAL_NETWORK)):
if validators.is_attr_set(
segment.get(pnet.PHYSICAL_NETWORK)):
vdn_scope_id = segment.get(pnet.PHYSICAL_NETWORK)
if not (self.nsx_v.vcns.
validate_vdn_scope(vdn_scope_id)):
@ -802,11 +805,13 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
for tz in net_data[mpnet.SEGMENTS]:
network_type = tz.get(pnet.NETWORK_TYPE)
segmentation_id = tz.get(pnet.SEGMENTATION_ID, 0)
segmentation_id_set = attr.is_attr_set(segmentation_id)
segmentation_id_set = validators.is_attr_set(
segmentation_id)
if not segmentation_id_set:
segmentation_id = 0
physical_network = tz.get(pnet.PHYSICAL_NETWORK, '')
physical_net_set = attr.is_attr_set(physical_network)
physical_net_set = validators.is_attr_set(
physical_network)
if not physical_net_set:
physical_network = self.dvs_id
net_bindings.append(nsxv_db.add_network_binding(
@ -861,7 +866,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
return new_net
def _update_network_qos(self, context, net_data, dvs_net_ids, net_moref):
if attr.is_attr_set(net_data.get(qos_consts.QOS_POLICY_ID)):
if validators.is_attr_set(net_data.get(qos_consts.QOS_POLICY_ID)):
# Translate the QoS rule data into Nsx values
qos_data = qos_utils.NsxVQosRule(
context=context,
@ -1094,7 +1099,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
# security group extension checks
if has_ip:
self._ensure_default_security_group_on_port(context, port)
elif attr.is_attr_set(port_data.get(ext_sg.SECURITYGROUPS)):
elif validators.is_attr_set(port_data.get(ext_sg.SECURITYGROUPS)):
raise psec.PortSecurityAndIPRequiredForSecurityGroups()
port_data[ext_sg.SECURITYGROUPS] = (
self._get_security_groups_on_port(context, port))
@ -1163,7 +1168,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
# Process update for vnic-index
vnic_idx = port_data.get(ext_vnic_idx.VNIC_INDEX)
# Only set the vnic index for a compute VM
if attr.is_attr_set(vnic_idx) and is_compute_port:
if validators.is_attr_set(vnic_idx) and is_compute_port:
# Update database only if vnic index was changed
if original_port.get(ext_vnic_idx.VNIC_INDEX) != vnic_idx:
self._set_port_vnic_index_mapping(
@ -1256,7 +1261,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
new_ip = self._get_port_fixed_ip_addr(ret_port)
if ((old_ip is not None or new_ip is not None) and
(old_ip != new_ip)):
if attr.is_attr_set(original_port.get('device_id')):
if validators.is_attr_set(original_port.get('device_id')):
router_id = original_port['device_id']
router_driver = self._find_router_driver(context,
router_id)
@ -1276,7 +1281,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
# Processing compute port update
vnic_idx = original_port.get(ext_vnic_idx.VNIC_INDEX)
if attr.is_attr_set(vnic_idx) and is_compute_port:
if validators.is_attr_set(vnic_idx) and is_compute_port:
vnic_id = self._get_port_vnic_id(vnic_idx, device_id)
curr_sgids = original_port.get(ext_sg.SECURITYGROUPS)
if ret_port['device_id'] != device_id:
@ -1338,7 +1343,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
# If this port is attached to a device, remove the corresponding vnic
# from all NSXv Security-Groups and the spoofguard policy
port_index = neutron_db_port.get(ext_vnic_idx.VNIC_INDEX)
if attr.is_attr_set(port_index):
if validators.is_attr_set(port_index):
vnic_id = self._get_port_vnic_id(port_index,
neutron_db_port['device_id'])
sgids = neutron_db_port.get(ext_sg.SECURITYGROUPS)
@ -1410,7 +1415,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
# translate the given subnet to a range object
data = subnet['subnet']
if data['cidr'] not in (attr.ATTR_NOT_SPECIFIED, None):
if data['cidr'] not in (constants.ATTR_NOT_SPECIFIED, None):
range = netaddr.IPNetwork(data['cidr'])
# Check each reserved subnet for intersection
@ -1446,9 +1451,9 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
orig_enable_dhcp=None,
orig_host_routes=None):
s = subnet_input['subnet']
request_host_routes = (attr.is_attr_set(s.get('host_routes')) and
request_host_routes = (validators.is_attr_set(s.get('host_routes')) and
s['host_routes'])
clear_host_routes = (attr.is_attr_set(s.get('host_routes')) and
clear_host_routes = (validators.is_attr_set(s.get('host_routes')) and
not s['host_routes'])
request_enable_dhcp = s.get('enable_dhcp')
if request_enable_dhcp is False:
@ -1484,7 +1489,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
raise n_exc.InvalidInput(error_message=err_msg)
data = subnet['subnet']
if (data.get('ip_version') == 6 or
(data['cidr'] not in (attr.ATTR_NOT_SPECIFIED, None)
(data['cidr'] not in (constants.ATTR_NOT_SPECIFIED, None)
and netaddr.IPNetwork(data['cidr']).version == 6)):
err_msg = _("No support for DHCP for IPv6")
raise n_exc.InvalidInput(error_message=err_msg)
@ -1511,7 +1516,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
# Verify if dns search domain for subnet is configured
dns_search_domain = subnet_req.get(
ext_dns_search_domain.DNS_SEARCH_DOMAIN)
if not attr.is_attr_set(dns_search_domain):
if not validators.is_attr_set(dns_search_domain):
return
sub_binding = nsxv_db.get_nsxv_subnet_ext_attributes(
session=session,
@ -1592,7 +1597,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
'fixed_ips': [{'subnet_id': subnet['id']}],
'device_owner': constants.DEVICE_OWNER_DHCP,
'device_id': '',
'mac_address': attr.ATTR_NOT_SPECIFIED
'mac_address': constants.ATTR_NOT_SPECIFIED
}
self.create_port(context, {'port': port_dict})
# First time binding network with dhcp edge
@ -1671,7 +1676,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
'fixed_ips': [{'subnet_id': subnet['id']}],
'device_owner': constants.DEVICE_OWNER_DHCP,
'device_id': '',
'mac_address': attr.ATTR_NOT_SPECIFIED
'mac_address': constants.ATTR_NOT_SPECIFIED
}
self.create_port(context, {'port': port_dict})
@ -1751,7 +1756,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
def _extract_external_gw(self, context, router, is_extract=True):
r = router['router']
gw_info = attr.ATTR_NOT_SPECIFIED
gw_info = constants.ATTR_NOT_SPECIFIED
# First extract the gateway info in case of updating
# gateway before edge is deployed.
if 'external_gateway_info' in r:
@ -1773,7 +1778,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
# for an exclusive non-distributed router; else raise a BadRequest
# exception.
r = router['router']
if attr.is_attr_set(r.get(ROUTER_SIZE)):
if validators.is_attr_set(r.get(ROUTER_SIZE)):
if r.get('router_type') == nsxv_constants.SHARED:
msg = _("Cannot specify router-size for shared router")
raise n_exc.BadRequest(resource="router", msg=msg)
@ -1809,7 +1814,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
context, lrouter,
allow_metadata=(allow_metadata and
self.metadata_proxy_handler))
if gw_info != attr.ATTR_NOT_SPECIFIED and gw_info is not None:
if gw_info != constants.ATTR_NOT_SPECIFIED and gw_info is not None:
self._update_router_gw_info(
context, lrouter['id'], gw_info)
except Exception:
@ -1827,7 +1832,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
if len(routes) > 0:
raise n_exc.InvalidInput(error_message=err_msg)
# verify that the updated router does not have static routes
if (attr.is_attr_set(router.get("routes")) and
if (validators.is_attr_set(router.get("routes")) and
len(router['routes']) > 0):
raise n_exc.InvalidInput(error_message=err_msg)

View File

@ -21,7 +21,6 @@ from neutron.api.rpc.callbacks import resources as callbacks_resources
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import resources_rpc
from neutron.api.v2 import attributes
from neutron.callbacks import events
from neutron.callbacks import exceptions as callback_exc
from neutron.callbacks import registry
@ -57,6 +56,7 @@ from neutron.plugins.common import constants as plugin_const
from neutron.plugins.common import utils as n_utils
from neutron.quota import resource_registry
from neutron.services.qos import qos_consts
from neutron_lib.api import validators
from neutron_lib import constants as const
from neutron_lib import exceptions as n_exc
from oslo_config import cfg
@ -369,22 +369,22 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
def _validate_provider_create(self, context, network_data):
is_provider_net = any(
attributes.is_attr_set(network_data.get(f))
validators.is_attr_set(network_data.get(f))
for f in (pnet.NETWORK_TYPE,
pnet.PHYSICAL_NETWORK,
pnet.SEGMENTATION_ID))
physical_net = network_data.get(pnet.PHYSICAL_NETWORK)
if not attributes.is_attr_set(physical_net):
if not validators.is_attr_set(physical_net):
physical_net = None
vlan_id = network_data.get(pnet.SEGMENTATION_ID)
if not attributes.is_attr_set(vlan_id):
if not validators.is_attr_set(vlan_id):
vlan_id = None
err_msg = None
net_type = network_data.get(pnet.NETWORK_TYPE)
if attributes.is_attr_set(net_type):
if validators.is_attr_set(net_type):
if net_type == utils.NsxV3NetworkTypes.FLAT:
if vlan_id is not None:
err_msg = (_("Segmentation ID cannot be specified with "
@ -456,7 +456,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
def _validate_external_net_create(self, net_data):
is_provider_net = False
if not attributes.is_attr_set(net_data.get(pnet.PHYSICAL_NETWORK)):
if not validators.is_attr_set(net_data.get(pnet.PHYSICAL_NETWORK)):
tier0_uuid = self._default_tier0_router
else:
tier0_uuid = net_data[pnet.PHYSICAL_NETWORK]
@ -510,7 +510,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
def _assert_on_external_net_with_qos(self, net_data):
# Prevent creating/update external network with QoS policy
if attributes.is_attr_set(net_data.get(qos_consts.QOS_POLICY_ID)):
if validators.is_attr_set(net_data.get(qos_consts.QOS_POLICY_ID)):
err_msg = _("Cannot configure QOS on networks")
raise n_exc.InvalidInput(error_message=err_msg)
@ -521,7 +521,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
tenant_id = net_data['tenant_id']
self._ensure_default_security_group(context, tenant_id)
if attributes.is_attr_set(external) and external:
if validators.is_attr_set(external) and external:
self._assert_on_external_net_with_qos(net_data)
is_provider_net, net_type, physical_net, vlan_id = (
self._validate_external_net_create(net_data))
@ -773,7 +773,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
def _get_data_from_binding_profile(self, context, port):
if (pbin.PROFILE not in port or
not attributes.is_attr_set(port[pbin.PROFILE])):
not validators.is_attr_set(port[pbin.PROFILE])):
return None, None
parent_name = (
@ -879,7 +879,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
# Add QoS switching profile, if exists
qos_policy_id = None
if attributes.is_attr_set(port_data.get(qos_consts.QOS_POLICY_ID)):
if validators.is_attr_set(port_data.get(qos_consts.QOS_POLICY_ID)):
qos_policy_id = port_data[qos_consts.QOS_POLICY_ID]
elif device_owner.startswith(const.DEVICE_OWNER_COMPUTE_PREFIX):
# check if the network of this port has a policy
@ -932,7 +932,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
context, port_data, neutron_db)
# allowed address pair checks
address_pairs = port_data.get(addr_pair.ADDRESS_PAIRS)
if attributes.is_attr_set(address_pairs):
if validators.is_attr_set(address_pairs):
if not port_security:
raise addr_pair.AddressPairAndPortSecurityRequired()
else:
@ -972,7 +972,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
def _assert_on_external_net_port_with_qos(self, port_data):
# Prevent creating/update port with QoS policy
# on external networks.
if attributes.is_attr_set(port_data.get(qos_consts.QOS_POLICY_ID)):
if validators.is_attr_set(port_data.get(qos_consts.QOS_POLICY_ID)):
err_msg = _("Unable to update/create a port with an external "
"network and a QoS policy")
LOG.warning(err_msg)
@ -1364,7 +1364,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
def _extract_external_gw(self, context, router, is_extract=True):
r = router['router']
gw_info = attributes.ATTR_NOT_SPECIFIED
gw_info = const.ATTR_NOT_SPECIFIED
# First extract the gateway info in case of updating
# gateway before edge is deployed.
if 'external_gateway_info' in r:
@ -1528,7 +1528,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
nsx_db.add_neutron_nsx_router_mapping(
context.session, router['id'], result['id'])
if gw_info != attributes.ATTR_NOT_SPECIFIED:
if gw_info != const.ATTR_NOT_SPECIFIED:
try:
self._update_router_gw_info(context, router['id'], gw_info)
except nsx_exc.ManagerError:
@ -1578,7 +1578,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
def _validate_ext_routes(self, context, router_id, gw_info, new_routes):
ext_net_id = (gw_info['network_id']
if attributes.is_attr_set(gw_info) and gw_info else None)
if validators.is_attr_set(gw_info) and gw_info else None)
if not ext_net_id:
port_filters = {'device_id': [router_id],
'device_owner': [l3_db.DEVICE_OWNER_ROUTER_GW]}

View File

@ -23,7 +23,6 @@ from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import uuidutils
from neutron.api.v2 import attributes
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
@ -31,6 +30,7 @@ from neutron import context
from neutron.extensions import providernet
from neutron import manager
from neutron.plugins.common import utils as n_utils
from neutron_lib import constants
from neutron_lib import exceptions as n_exc
from vmware_nsx._i18n import _, _LE, _LI
@ -207,7 +207,7 @@ class NsxV3Driver(l2gateway_db.L2GatewayMixin):
port_dict = {'port': {
'tenant_id': tenant_id,
'network_id': network_id,
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'mac_address': constants.ATTR_NOT_SPECIFIED,
'admin_state_up': True,
'fixed_ips': [],
'device_id': bridge_endpoint['id'],

View File

@ -15,9 +15,9 @@
# under the License.
from neutron.api.rpc.callbacks import events as callbacks_events
from neutron.api.v2 import attributes
from neutron import context as n_context
from neutron.objects.qos import policy as qos_policy
from neutron_lib.api import validators
from oslo_log import log as logging
from vmware_nsx._i18n import _, _LW
@ -97,7 +97,7 @@ class QosNotificationsHandler(object):
result = nsxlib.create_qos_switching_profile(
tags=tags, name=policy.name,
description=policy.description)
if not result or not attributes.is_attr_set(result.get('id')):
if not result or not validators.is_attr_set(result.get('id')):
msg = _("Unable to create QoS switching profile on the backend")
raise nsx_exc.NsxPluginException(err_msg=msg)
profile_id = result['id']

View File

@ -17,11 +17,11 @@ from oslo_config import cfg
from oslo_db import exception as d_exc
from oslo_utils import uuidutils
from neutron.api.v2 import attributes as attr
from neutron import context as neutron_context
from neutron.db import db_base_plugin_v2
from neutron import manager
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_db_plugin
from neutron_lib.api import validators
from vmware_nsx.db import vnic_index_db
from vmware_nsx.extensions import vnicindex as vnicidx
from vmware_nsx.tests import unit as vmware
@ -43,7 +43,7 @@ class VnicIndexTestPlugin(db_base_plugin_v2.NeutronDbPluginV2,
current_port = super(VnicIndexTestPlugin, self).get_port(context, id)
vnic_idx = p.get(vnicidx.VNIC_INDEX)
device_id = current_port['device_id']
if attr.is_attr_set(vnic_idx) and device_id != '':
if validators.is_attr_set(vnic_idx) and device_id != '':
self._set_port_vnic_index_mapping(
context, id, device_id, vnic_idx)
@ -52,7 +52,7 @@ class VnicIndexTestPlugin(db_base_plugin_v2.NeutronDbPluginV2,
ret_port = super(VnicIndexTestPlugin, self).update_port(
context, id, port)
vnic_idx = current_port.get(vnicidx.VNIC_INDEX)
if (attr.is_attr_set(vnic_idx) and
if (validators.is_attr_set(vnic_idx) and
device_id != ret_port['device_id']):
self._delete_port_vnic_index_mapping(
context, id)
@ -61,7 +61,7 @@ class VnicIndexTestPlugin(db_base_plugin_v2.NeutronDbPluginV2,
def delete_port(self, context, id):
port_db = self.get_port(context, id)
vnic_idx = port_db.get(vnicidx.VNIC_INDEX)
if attr.is_attr_set(vnic_idx):
if validators.is_attr_set(vnic_idx):
self._delete_port_vnic_index_mapping(context, id)
with context.session.begin(subtransactions=True):
super(VnicIndexTestPlugin, self).delete_port(context, id)

View File

@ -24,7 +24,6 @@ from oslo_config import cfg
from oslo_log import log
from oslo_serialization import jsonutils
from neutron.api.v2 import attributes as attr
from neutron import context
from neutron.extensions import l3
from neutron.tests import base
@ -320,11 +319,11 @@ class SyncTestCase(testlib_api.SqlTestCase):
return {'subnet':
{'cidr': '10.10.%s.0/24' % idx,
'name': 'sub-%s' % idx,
'gateway_ip': attr.ATTR_NOT_SPECIFIED,
'allocation_pools': attr.ATTR_NOT_SPECIFIED,
'gateway_ip': constants.ATTR_NOT_SPECIFIED,
'allocation_pools': constants.ATTR_NOT_SPECIFIED,
'ip_version': 4,
'dns_nameservers': attr.ATTR_NOT_SPECIFIED,
'host_routes': attr.ATTR_NOT_SPECIFIED,
'dns_nameservers': constants.ATTR_NOT_SPECIFIED,
'host_routes': constants.ATTR_NOT_SPECIFIED,
'enable_dhcp': True,
'network_id': net_id,
'tenant_id': 'foo'}}
@ -335,8 +334,8 @@ class SyncTestCase(testlib_api.SqlTestCase):
'admin_state_up': True,
'device_id': 'miao',
'device_owner': 'bau',
'fixed_ips': attr.ATTR_NOT_SPECIFIED,
'mac_address': attr.ATTR_NOT_SPECIFIED,
'fixed_ips': constants.ATTR_NOT_SPECIFIED,
'mac_address': constants.ATTR_NOT_SPECIFIED,
'tenant_id': 'foo'}}
def router(idx):

View File

@ -39,6 +39,7 @@ import neutron.tests.unit.extensions.test_l3_ext_gw_mode as test_ext_gw_mode
import neutron.tests.unit.extensions.test_portsecurity as test_psec
import neutron.tests.unit.extensions.test_securitygroup as ext_sg
from neutron.tests.unit import testlib_api
from neutron_lib.api import validators
from neutron_lib import constants
from neutron_lib import exceptions as n_exc
from oslo_config import cfg
@ -419,7 +420,7 @@ class TestNetworksV2(test_plugin.TestNetworksV2, NsxVPluginV2TestCase):
p = manager.NeutronManager.get_plugin()
# If no DVS-ID is provided as part of physical network, return
# global DVS-ID configured in nsx.ini
physical_network = attributes.ATTR_NOT_SPECIFIED
physical_network = constants.ATTR_NOT_SPECIFIED
self.assertEqual(['fake_dvs_id'], p._get_dvs_ids(physical_network))
# If DVS-IDs are provided as part of physical network as a comma
# separated string, return them as a list of DVS-IDs.
@ -458,7 +459,7 @@ class TestNetworksV2(test_plugin.TestNetworksV2, NsxVPluginV2TestCase):
data = {'network': {
'name': name,
'tenant_id': self._tenant_id,
pnet.SEGMENTATION_ID: attributes.ATTR_NOT_SPECIFIED,
pnet.SEGMENTATION_ID: constants.ATTR_NOT_SPECIFIED,
pnet.NETWORK_TYPE: 'vxlan',
pnet.PHYSICAL_NETWORK: 'vdnscope-2'}}
p = manager.NeutronManager.get_plugin()
@ -1174,7 +1175,7 @@ class TestPortsV2(NsxVPluginV2TestCase,
net_id=net_id,
cidr='2607:f0d0:1002:51::/124',
ip_version=6,
gateway_ip=attributes.ATTR_NOT_SPECIFIED,
gateway_ip=constants.ATTR_NOT_SPECIFIED,
enable_dhcp=False)
subnet2 = self.deserialize(self.fmt, res)
kwargs = {"fixed_ips":
@ -2823,7 +2824,7 @@ class TestExclusiveRouterTestCase(L3NatTest, L3NatTestCaseBase,
fw_rules = mock.call_args[0][3]['firewall_rule_list']
rule_found = False
for fw_rule in fw_rules:
if (attributes.is_attr_set(fw_rule.get("name")) and
if (validators.is_attr_set(fw_rule.get("name")) and
fw_rule['name'] == rule_name):
self.assertEqual(md_srvip, fw_rule)
rule_found = True