API validators and converters

This brings over the validation and convert_to methods from Neutron.
Additional tests were added to improve coverage, and some supporting
methods were added for these.

Renamed validators to remove underscore, so that they can be imported
into neutron/api/v2/attributes.py.

Added devref docs to help developers in creating and using validators
and converters.

Change-Id: I81394dff69b816146e521bcd3e9641761178d6fd
Implements: blueprint neutron-lib
changes/53/259053/7
Paul Michali 7 years ago
parent 42c178cd3d
commit 7d9980f7ca
  1. 86
      doc/source/devref/api_converters.rst
  2. 102
      doc/source/devref/api_validators.rst
  3. 2
      doc/source/devref/index.rst
  4. 0
      neutron_lib/api/__init__.py
  5. 119
      neutron_lib/api/converters.py
  6. 536
      neutron_lib/api/validators.py
  7. 14
      neutron_lib/constants.py
  8. 38
      neutron_lib/tests/tools.py
  9. 0
      neutron_lib/tests/unit/api/__init__.py
  10. 169
      neutron_lib/tests/unit/api/test_conversions.py
  11. 840
      neutron_lib/tests/unit/api/test_validators.py

@ -0,0 +1,86 @@
..
Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
Convention for heading levels in Neutron devref:
======= Heading 0 (reserved for the title in a document)
------- Heading 1
~~~~~~~ Heading 2
+++++++ Heading 3
''''''' Heading 4
(Avoid deeper levels because they do not render well.)
API Converters
==============
Defintions for REST API attributes, can include conversion methods
to help normalize user input or transform the input into a form that
can be used.
Defining A Converter Method
---------------------------
By convention, the name should start with ``convert_to_``, and will
take a single argument for the data to be converted. The method
should return the converted data (which, if the input is None,
and no conversion is performed, the implicit None returned by the
method may be used). If the conversion is impossible, an
InvalidInput exception should be raised, indicating what is wrong.
For example, here is one that converts a variety of user inputs
to a boolean value.
::
def convert_to_boolean(data):
if isinstance(data, six.string_types):
val = data.lower()
if val == "true" or val == "1":
return True
if val == "false" or val == "0":
return False
elif isinstance(data, bool):
return data
elif isinstance(data, int):
if data == 0:
return False
elif data == 1:
return True
msg = _("'%s' cannot be converted to boolean") % data
raise n_exc.InvalidInput(error_message=msg)
Using Validators
----------------
In client code, the conversion can be used in a REST API
definition, by specifying the name of the method as a value for
the 'convert_to' key on an attribute. For example:
::
'admin_state_up': {'allow_post': True, 'allow_put': True,
'default': True,
'convert_to': conversions.convert_to_boolean,
'is_visible': True},
Here, the admin_state_up is a boolean, so the converter is used to
take user's (string) input and transform it to a boolean.
Test The Validator
------------------
Do the right thing, and make sure you've created a unit test for any
converter that you add to verify that it works as expected.

@ -0,0 +1,102 @@
..
Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
Convention for heading levels in Neutron devref:
======= Heading 0 (reserved for the title in a document)
------- Heading 1
~~~~~~~ Heading 2
+++++++ Heading 3
''''''' Heading 4
(Avoid deeper levels because they do not render well.)
API Validators
==============
For the REST API, attributes may have custom validators defined. Each
validator will have a method to perform the validation, and a type
definition string, so that the validator can be referenced.
Defining A Validator Method
---------------------------
The validation method will have a positional argument for the data to
be validated, and may have additional (optional) keyword arguments that
can be used during validation. The method must handle any exceptions
and either return None (success) or a i18n string indicating the
validation failure message. By convention, the method name is prefixed
with ``validate_`` and then includes the data type. For example:
::
def validate_uuid(data, valid_values=None):
if not uuidutils.is_uuid_like(data):
msg = _("'%s' is not a valid UUID") % data
LOG.debug(msg)
return msg
There is a validation dictionary that maps the method to a validation
type that can be referred to in REST API definitions. An entry in the
dictionary would look like the following:
::
'type:uuid': validate_uuid,
Using Validators
----------------
In client code, the valdiator can be used in a REST API by using the
dictionary key for the validator. For example:
::
RESOURCE_ATTRIBUTE_MAP = {
NETWORKS: {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True,
'primary_key': True},
'name': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': NAME_MAX_LEN},
'default': '', 'is_visible': True},
Here, the networks resource has an 'id' attribute with a UUID validator,
as seen by the 'validate' key containing a dictionary with a key of
'type:uuid'.
Any addition arguments for the validator can be specified as values for
the dictionary entry (None in this case, NAME_MAX_LEN in the 'name'
attribute that uses a string validator). In a IP version attribute, one
could have a validator defined as follows:
::
'ip_version': {'allow_post': True, 'allow_put': False,
'convert_to': conversions.convert_to_int,
'validate': {'type:values': [4, 6]},
'is_visible': True},
Here, the valdiate_values() method will take the list of values as the
allowable values that can be specified for this attribute.
Test The Validator
------------------
Do the right thing, and make sure you've created a unit test for any
validator that you add to verify that it works as expected, even for
simple validators.

@ -32,6 +32,8 @@ Neutron Lib Internals
.. toctree::
:maxdepth: 3
api_converters
api_validators
callbacks

@ -0,0 +1,119 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from neutron_lib._i18n import _
from neutron_lib import exceptions as n_exc
def convert_to_boolean(data):
if isinstance(data, six.string_types):
val = data.lower()
if val == "true" or val == "1":
return True
if val == "false" or val == "0":
return False
elif isinstance(data, bool):
return data
elif isinstance(data, int):
if data == 0:
return False
elif data == 1:
return True
msg = _("'%s' cannot be converted to boolean") % data
raise n_exc.InvalidInput(error_message=msg)
def convert_to_boolean_if_not_none(data):
if data is not None:
return convert_to_boolean(data)
def convert_to_int(data):
try:
return int(data)
except (ValueError, TypeError):
msg = _("'%s' is not an integer") % data
raise n_exc.InvalidInput(error_message=msg)
def convert_to_int_if_not_none(data):
if data is not None:
return convert_to_int(data)
return data
def convert_to_positive_float_or_none(val):
# NOTE(salv-orlando): This conversion function is currently used by
# a vendor specific extension only at the moment It is used for
# port's RXTX factor in neutron.plugins.vmware.extensions.qos.
# It is deemed however generic enough to be in this module as it
# might be used in future for other API attributes.
if val is None:
return
try:
val = float(val)
if val < 0:
raise ValueError()
except (ValueError, TypeError):
msg = _("'%s' must be a non negative decimal.") % val
raise n_exc.InvalidInput(error_message=msg)
return val
def convert_kvp_str_to_list(data):
"""Convert a value of the form 'key=value' to ['key', 'value'].
:raises: n_exc.InvalidInput if any of the strings are malformed
(e.g. do not contain a key).
"""
kvp = [x.strip() for x in data.split('=', 1)]
if len(kvp) == 2 and kvp[0]:
return kvp
msg = _("'%s' is not of the form <key>=[value]") % data
raise n_exc.InvalidInput(error_message=msg)
def convert_kvp_list_to_dict(kvp_list):
"""Convert a list of 'key=value' strings to a dict.
:raises: n_exc.InvalidInput if any of the strings are malformed
(e.g. do not contain a key) or if any
of the keys appear more than once.
"""
if kvp_list == ['True']:
# No values were provided (i.e. '--flag-name')
return {}
kvp_map = {}
for kvp_str in kvp_list:
key, value = convert_kvp_str_to_list(kvp_str)
kvp_map.setdefault(key, set())
kvp_map[key].add(value)
return dict((x, list(y)) for x, y in six.iteritems(kvp_map))
def convert_none_to_empty_list(value):
return [] if value is None else value
def convert_none_to_empty_dict(value):
return {} if value is None else value
def convert_to_list(data):
if data is None:
return []
elif hasattr(data, '__iter__') and not isinstance(data, six.string_types):
return list(data)
else:
return [data]

@ -0,0 +1,536 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import functools
import netaddr
from oslo_log import log as logging
from oslo_utils import uuidutils
import six
from neutron_lib._i18n import _
from neutron_lib.api import converters
from neutron_lib import constants
from neutron_lib import exceptions as n_exc
LOG = logging.getLogger(__name__)
# Used by range check to indicate no limit for a bound.
UNLIMITED = None
# Note: In order to ensure that the MAC address is unicast the first byte
# must be even.
MAC_PATTERN = "^%s[aceACE02468](:%s{2}){5}$" % (constants.HEX_ELEM,
constants.HEX_ELEM)
def _verify_dict_keys(expected_keys, target_dict, strict=True):
"""Allows to verify keys in a dictionary.
:param expected_keys: A list of keys expected to be present.
:param target_dict: The dictionary which should be verified.
:param strict: Specifies whether additional keys are allowed to be present.
:return: True, if keys in the dictionary correspond to the specification.
"""
if not isinstance(target_dict, dict):
msg = (_("Invalid input. '%(target_dict)s' must be a dictionary "
"with keys: %(expected_keys)s") %
{'target_dict': target_dict, 'expected_keys': expected_keys})
LOG.debug(msg)
return msg
expected_keys = set(expected_keys)
provided_keys = set(target_dict.keys())
predicate = expected_keys.__eq__ if strict else expected_keys.issubset
if not predicate(provided_keys):
msg = (_("Validation of dictionary's keys failed. "
"Expected keys: %(expected_keys)s "
"Provided keys: %(provided_keys)s") %
{'expected_keys': expected_keys,
'provided_keys': provided_keys})
LOG.debug(msg)
return msg
def is_attr_set(attribute):
return not (attribute is None or
attribute is constants.ATTR_NOT_SPECIFIED)
def _validate_list_of_items(item_validator, data, *args, **kwargs):
if not isinstance(data, list):
msg = _("'%s' is not a list") % data
return msg
if len(set(data)) != len(data):
msg = _("Duplicate items in the list: '%s'") % ', '.join(data)
return msg
for item in data:
msg = item_validator(item, *args, **kwargs)
if msg:
return msg
def validate_values(data, valid_values=None):
if data not in valid_values:
msg = (_("'%(data)s' is not in %(valid_values)s") %
{'data': data, 'valid_values': valid_values})
LOG.debug(msg)
return msg
def validate_not_empty_string_or_none(data, max_len=None):
if data is not None:
return validate_not_empty_string(data, max_len=max_len)
def validate_not_empty_string(data, max_len=None):
msg = validate_string(data, max_len=max_len)
if msg:
return msg
if not data.strip():
msg = _("'%s' Blank strings are not permitted") % data
LOG.debug(msg)
return msg
def validate_string_or_none(data, max_len=None):
if data is not None:
return validate_string(data, max_len=max_len)
def validate_string(data, max_len=None):
if not isinstance(data, six.string_types):
msg = _("'%s' is not a valid string") % data
LOG.debug(msg)
return msg
if max_len is not None and len(data) > max_len:
msg = (_("'%(data)s' exceeds maximum length of %(max_len)s") %
{'data': data, 'max_len': max_len})
LOG.debug(msg)
return msg
validate_list_of_unique_strings = functools.partial(_validate_list_of_items,
validate_string)
def validate_boolean(data, valid_values=None):
try:
converters.convert_to_boolean(data)
except n_exc.InvalidInput:
msg = _("'%s' is not a valid boolean value") % data
LOG.debug(msg)
return msg
def validate_range(data, valid_values=None):
"""Check that integer value is within a range provided.
Test is inclusive. Allows either limit to be ignored, to allow
checking ranges where only the lower or upper limit matter.
It is expected that the limits provided are valid integers or
the value None.
"""
min_value = valid_values[0]
max_value = valid_values[1]
try:
data = int(data)
except (ValueError, TypeError):
msg = _("'%s' is not an integer") % data
LOG.debug(msg)
return msg
if min_value is not UNLIMITED and data < min_value:
msg = _("'%(data)s' is too small - must be at least "
"'%(limit)d'") % {'data': data, 'limit': min_value}
LOG.debug(msg)
return msg
if max_value is not UNLIMITED and data > max_value:
msg = _("'%(data)s' is too large - must be no larger than "
"'%(limit)d'") % {'data': data, 'limit': max_value}
LOG.debug(msg)
return msg
def validate_no_whitespace(data):
"""Validates that input has no whitespace."""
if re.search(r'\s', data):
msg = _("'%s' contains whitespace") % data
LOG.debug(msg)
raise n_exc.InvalidInput(error_message=msg)
return data
def validate_mac_address(data, valid_values=None):
try:
valid_mac = netaddr.valid_mac(validate_no_whitespace(data))
except Exception:
valid_mac = False
if valid_mac:
valid_mac = (not netaddr.EUI(data) in
map(netaddr.EUI, constants.INVALID_MAC_ADDRESSES))
# TODO(arosen): The code in this file should be refactored
# so it catches the correct exceptions. validate_no_whitespace
# raises AttributeError if data is None.
if not valid_mac:
msg = _("'%s' is not a valid MAC address") % data
LOG.debug(msg)
return msg
def validate_mac_address_or_none(data, valid_values=None):
if data is not None:
return validate_mac_address(data, valid_values)
def validate_ip_address(data, valid_values=None):
msg = None
try:
# netaddr.core.ZEROFILL is only applicable to IPv4.
# it will remove leading zeros from IPv4 address octets.
ip = netaddr.IPAddress(validate_no_whitespace(data),
flags=netaddr.core.ZEROFILL)
# The followings are quick checks for IPv6 (has ':') and
# IPv4. (has 3 periods like 'xx.xx.xx.xx')
# NOTE(yamamoto): netaddr uses libraries provided by the underlying
# platform to convert addresses. For example, inet_aton(3).
# Some platforms, including NetBSD and OS X, have inet_aton
# implementation which accepts more varying forms of addresses than
# we want to accept here. The following check is to reject such
# addresses. For Example:
# >>> netaddr.IPAddress('1' * 59)
# IPAddress('199.28.113.199')
# >>> netaddr.IPAddress(str(int('1' * 59) & 0xffffffff))
# IPAddress('199.28.113.199')
# >>>
if ':' not in data and data.count('.') != 3:
msg = _("'%s' is not a valid IP address") % data
# A leading '0' in IPv4 address may be interpreted as an octal number,
# e.g. 011 octal is 9 decimal. Since there is no standard saying
# whether IP address with leading '0's should be interpreted as octal
# or decimal, hence we reject leading '0's to avoid ambiguity.
if ip.version == 4 and str(ip) != data:
msg = _("'%(data)s' is not an accepted IP address, "
"'%(ip)s' is recommended") % {"data": data, "ip": ip}
except Exception:
msg = _("'%s' is not a valid IP address") % data
if msg:
LOG.debug(msg)
return msg
def validate_ip_pools(data, valid_values=None):
"""Validate that start and end IP addresses are present.
In addition to this the IP addresses will also be validated
"""
if not isinstance(data, list):
msg = _("Invalid data format for IP pool: '%s'") % data
LOG.debug(msg)
return msg
expected_keys = ['start', 'end']
for ip_pool in data:
msg = _verify_dict_keys(expected_keys, ip_pool)
if msg:
return msg
for k in expected_keys:
msg = validate_ip_address(ip_pool[k])
if msg:
return msg
def validate_fixed_ips(data, valid_values=None):
if not isinstance(data, list):
msg = _("Invalid data format for fixed IP: '%s'") % data
LOG.debug(msg)
return msg
ips = []
for fixed_ip in data:
if not isinstance(fixed_ip, dict):
msg = _("Invalid data format for fixed IP: '%s'") % fixed_ip
LOG.debug(msg)
return msg
if 'ip_address' in fixed_ip:
# Ensure that duplicate entries are not set - just checking IP
# suffices. Duplicate subnet_id's are legitimate.
fixed_ip_address = fixed_ip['ip_address']
if fixed_ip_address in ips:
msg = _("Duplicate IP address '%s'") % fixed_ip_address
LOG.debug(msg)
else:
msg = validate_ip_address(fixed_ip_address)
if msg:
return msg
ips.append(fixed_ip_address)
if 'subnet_id' in fixed_ip:
msg = validate_uuid(fixed_ip['subnet_id'])
if msg:
return msg
def validate_nameservers(data, valid_values=None):
if not hasattr(data, '__iter__'):
msg = _("Invalid data format for nameserver: '%s'") % data
LOG.debug(msg)
return msg
hosts = []
for host in data:
# This must be an IP address only
msg = validate_ip_address(host)
if msg:
msg = _("'%(host)s' is not a valid nameserver. %(msg)s") % {
'host': host, 'msg': msg}
LOG.debug(msg)
return msg
if host in hosts:
msg = _("Duplicate nameserver '%s'") % host
LOG.debug(msg)
return msg
hosts.append(host)
def validate_hostroutes(data, valid_values=None):
if not isinstance(data, list):
msg = _("Invalid data format for hostroute: '%s'") % data
LOG.debug(msg)
return msg
expected_keys = ['destination', 'nexthop']
hostroutes = []
for hostroute in data:
msg = _verify_dict_keys(expected_keys, hostroute)
if msg:
return msg
msg = validate_subnet(hostroute['destination'])
if msg:
return msg
msg = validate_ip_address(hostroute['nexthop'])
if msg:
return msg
if hostroute in hostroutes:
msg = _("Duplicate hostroute '%s'") % hostroute
LOG.debug(msg)
return msg
hostroutes.append(hostroute)
def validate_ip_address_or_none(data, valid_values=None):
if data is not None:
return validate_ip_address(data, valid_values)
def validate_subnet(data, valid_values=None):
msg = None
try:
net = netaddr.IPNetwork(validate_no_whitespace(data))
if '/' not in data or (net.version == 4 and str(net) != data):
msg = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": data,
"cidr": net.cidr}
else:
return
except Exception:
msg = _("'%s' is not a valid IP subnet") % data
if msg:
LOG.debug(msg)
return msg
def validate_subnet_or_none(data, valid_values=None):
if data is not None:
return validate_subnet(data, valid_values)
validate_subnet_list = functools.partial(_validate_list_of_items,
validate_subnet)
def validate_regex(data, valid_values=None):
try:
if re.match(valid_values, data):
return
except TypeError:
pass
msg = _("'%s' is not a valid input") % data
LOG.debug(msg)
return msg
def validate_regex_or_none(data, valid_values=None):
if data is not None:
return validate_regex(data, valid_values)
def validate_subnetpool_id(data, valid_values=None):
if data != constants.IPV6_PD_POOL_ID:
return validate_uuid_or_none(data, valid_values)
def validate_subnetpool_id_or_none(data, valid_values=None):
if data is not None:
return validate_subnetpool_id(data, valid_values)
def validate_uuid(data, valid_values=None):
if not uuidutils.is_uuid_like(data):
msg = _("'%s' is not a valid UUID") % data
LOG.debug(msg)
return msg
def validate_uuid_or_none(data, valid_values=None):
if data is not None:
return validate_uuid(data)
validate_uuid_list = functools.partial(_validate_list_of_items,
validate_uuid)
def _validate_dict_item(key, key_validator, data):
# Find conversion function, if any, and apply it
conv_func = key_validator.get('convert_to')
if conv_func:
data[key] = conv_func(data.get(key))
# Find validator function
# TODO(salv-orlando): Structure of dict attributes should be improved
# to avoid iterating over items
val_func = val_params = None
for (k, v) in six.iteritems(key_validator):
if k.startswith('type:'):
# ask forgiveness, not permission
try:
val_func = validators[k]
except KeyError:
msg = _("Validator '%s' does not exist.") % k
LOG.debug(msg)
return msg
val_params = v
break
# Process validation
if val_func:
return val_func(data.get(key), val_params)
def validate_dict(data, key_specs=None):
if not isinstance(data, dict):
msg = _("'%s' is not a dictionary") % data
LOG.debug(msg)
return msg
# Do not perform any further validation, if no constraints are supplied
if not key_specs:
return
# Check whether all required keys are present
required_keys = [key for key, spec in six.iteritems(key_specs)
if spec.get('required')]
if required_keys:
msg = _verify_dict_keys(required_keys, data, False)
if msg:
return msg
# Perform validation and conversion of all values
# according to the specifications.
for key, key_validator in [(k, v) for k, v in six.iteritems(key_specs)
if k in data]:
msg = _validate_dict_item(key, key_validator, data)
if msg:
return msg
def validate_dict_or_none(data, key_specs=None):
if data is not None:
return validate_dict(data, key_specs)
def validate_dict_or_empty(data, key_specs=None):
if data != {}:
return validate_dict(data, key_specs)
def validate_dict_or_nodata(data, key_specs=None):
if data:
return validate_dict(data, key_specs)
def validate_non_negative(data, valid_values=None):
try:
data = int(data)
except (ValueError, TypeError):
msg = _("'%s' is not an integer") % data
LOG.debug(msg)
return msg
if data < 0:
msg = _("'%s' should be non-negative") % data
LOG.debug(msg)
return msg
# Dictionary that maintains a list of validation functions
validators = {'type:dict': validate_dict,
'type:dict_or_none': validate_dict_or_none,
'type:dict_or_empty': validate_dict_or_empty,
'type:dict_or_nodata': validate_dict_or_nodata,
'type:fixed_ips': validate_fixed_ips,
'type:hostroutes': validate_hostroutes,
'type:ip_address': validate_ip_address,
'type:ip_address_or_none': validate_ip_address_or_none,
'type:ip_pools': validate_ip_pools,
'type:mac_address': validate_mac_address,
'type:mac_address_or_none': validate_mac_address_or_none,
'type:nameservers': validate_nameservers,
'type:non_negative': validate_non_negative,
'type:range': validate_range,
'type:regex': validate_regex,
'type:regex_or_none': validate_regex_or_none,
'type:string': validate_string,
'type:string_or_none': validate_string_or_none,
'type:not_empty_string': validate_not_empty_string,
'type:not_empty_string_or_none':
validate_not_empty_string_or_none,
'type:subnet': validate_subnet,
'type:subnet_list': validate_subnet_list,
'type:subnet_or_none': validate_subnet_or_none,
'type:subnetpool_id': validate_subnetpool_id,
'type:subnetpool_id_or_none': validate_subnetpool_id_or_none,
'type:uuid': validate_uuid,
'type:uuid_or_none': validate_uuid_or_none,
'type:uuid_list': validate_uuid_list,
'type:values': validate_values,
'type:boolean': validate_boolean,
'type:list_of_unique_strings': validate_list_of_unique_strings}
def add_validator(validation_type, validator):
"""Dynamically add a validator.
This can be used by clients to add their own, private validators, rather
than directly modifying the data structure. The clients can NOT modify
existing validators.
"""
key = 'type:' + validation_type
if key in validators:
raise KeyError("Validator type %s is already defined", validation_type)
validators[key] = validator

@ -129,8 +129,22 @@ ICMPV6_ALLOWED_TYPES = [130, 131, 132, 135, 136]
ICMPV6_TYPE_RA = 134
ICMPV6_TYPE_NA = 136
# Human-readable ID to which the subnetpool ID should be set to
# indicate that IPv6 Prefix Delegation is enabled for a given subnetpool
IPV6_PD_POOL_ID = 'prefix_delegation'
# Device names start with "tap"
TAP_DEVICE_PREFIX = 'tap'
# Time format
ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
#############################
# Attribute related constants
#############################
ATTR_NOT_SPECIFIED = object()
HEX_ELEM = '[0-9A-Fa-f]'
UUID_PATTERN = '-'.join([HEX_ELEM + '{8}', HEX_ELEM + '{4}',
HEX_ELEM + '{4}', HEX_ELEM + '{4}',
HEX_ELEM + '{12}'])

@ -0,0 +1,38 @@
# Copyright (c) 2013 NEC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Note: _safe_sort_key came from neutron/common/utils.py. For neutron-lib
# it is only used for testing, so is placed here.
import collections
def _safe_sort_key(value):
"""Return value hash or build one for dictionaries."""
if isinstance(value, collections.Mapping):
return sorted(value.items())
return value
class UnorderedList(list):
"""A list that is equals to any permutation of itself."""
def __eq__(self, other):
if not isinstance(other, list):
return False
return (sorted(self, key=_safe_sort_key) ==
sorted(other, key=_safe_sort_key))
def __neq__(self, other):
return not self == other

@ -0,0 +1,169 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from neutron_lib.api import converters
from neutron_lib import exceptions as n_exc
from neutron_lib.tests import base
from neutron_lib.tests import tools
class TestConvertToBoolean(base.TestCase):
def test_convert_to_boolean_bool(self):
self.assertIs(converters.convert_to_boolean(True), True)
self.assertIs(converters.convert_to_boolean(False), False)
def test_convert_to_boolean_int(self):
self.assertIs(converters.convert_to_boolean(0), False)
self.assertIs(converters.convert_to_boolean(1), True)
self.assertRaises(n_exc.InvalidInput,
converters.convert_to_boolean,
7)
def test_convert_to_boolean_str(self):
self.assertIs(converters.convert_to_boolean('True'), True)
self.assertIs(converters.convert_to_boolean('true'), True)
self.assertIs(converters.convert_to_boolean('False'), False)
self.assertIs(converters.convert_to_boolean('false'), False)
self.assertIs(converters.convert_to_boolean('0'), False)
self.assertIs(converters.convert_to_boolean('1'), True)
self.assertRaises(n_exc.InvalidInput,
converters.convert_to_boolean,
'7')
def test_convert_to_boolean_if_not_none(self):
self.assertIsNone(converters.convert_to_boolean_if_not_none(None))
self.assertIs(converters.convert_to_boolean_if_not_none(1), True)
class TestConvertToInt(base.TestCase):
def test_convert_to_int_int(self):
self.assertEqual(-1, converters.convert_to_int(-1))
self.assertEqual(0, converters.convert_to_int(0))
self.assertEqual(1, converters.convert_to_int(1))
def test_convert_to_int_if_not_none(self):
self.assertEqual(-1, converters.convert_to_int_if_not_none(-1))
self.assertEqual(0, converters.convert_to_int_if_not_none(0))
self.assertEqual(1, converters.convert_to_int_if_not_none(1))
self.assertIsNone(converters.convert_to_int_if_not_none(None))
def test_convert_to_int_str(self):
self.assertEqual(4, converters.convert_to_int('4'))
self.assertEqual(6, converters.convert_to_int('6'))
self.assertRaises(n_exc.InvalidInput,
converters.convert_to_int,
'garbage')
def test_convert_to_int_none(self):
self.assertRaises(n_exc.InvalidInput,
converters.convert_to_int,
None)
def test_convert_none_to_empty_list_none(self):
self.assertEqual([], converters.convert_none_to_empty_list(None))
def test_convert_none_to_empty_dict(self):
self.assertEqual({}, converters.convert_none_to_empty_dict(None))
def test_convert_none_to_empty_list_value(self):
values = ['1', 3, [], [1], {}, {'a': 3}]
for value in values:
self.assertEqual(
value, converters.convert_none_to_empty_list(value))
class TestConvertToFloat(base.TestCase):
# NOTE: the routine being tested here is a plugin-specific extension
# module. As the plugin split proceed towards its second phase this
# test should either be remove, or the validation routine moved into
# neutron.api.v2.attributes
def test_convert_to_float_positve_value(self):
self.assertEqual(
1.111, converters.convert_to_positive_float_or_none(1.111))
self.assertEqual(1, converters.convert_to_positive_float_or_none(1))
self.assertEqual(0, converters.convert_to_positive_float_or_none(0))
def test_convert_to_float_negative_value(self):
self.assertRaises(n_exc.InvalidInput,
converters.convert_to_positive_float_or_none,
-1.11)
def test_convert_to_float_string(self):
self.assertEqual(4, converters.convert_to_positive_float_or_none('4'))
self.assertEqual(
4.44, converters.convert_to_positive_float_or_none('4.44'))
self.assertRaises(n_exc.InvalidInput,
converters.convert_to_positive_float_or_none,
'garbage')
def test_convert_to_float_none_value(self):
self.assertIsNone(converters.convert_to_positive_float_or_none(None))
class TestConvertKvp(base.TestCase):
def test_convert_kvp_list_to_dict_succeeds_for_missing_values(self):
result = converters.convert_kvp_list_to_dict(['True'])
self.assertEqual({}, result)
def test_convert_kvp_list_to_dict_succeeds_for_multiple_values(self):
result = converters.convert_kvp_list_to_dict(
['a=b', 'a=c', 'a=c', 'b=a'])
expected = {'a': tools.UnorderedList(['c', 'b']), 'b': ['a']}
self.assertEqual(expected, result)
def test_convert_kvp_list_to_dict_succeeds_for_values(self):
result = converters.convert_kvp_list_to_dict(['a=b', 'c=d'])
self.assertEqual({'a': ['b'], 'c': ['d']}, result)
def test_convert_kvp_str_to_list_fails_for_missing_key(self):
with testtools.ExpectedException(n_exc.InvalidInput):
converters.convert_kvp_str_to_list('=a')
def test_convert_kvp_str_to_list_fails_for_missing_equals(self):
with testtools.ExpectedException(n_exc.InvalidInput):
converters.convert_kvp_str_to_list('a')
def test_convert_kvp_str_to_list_succeeds_for_one_equals(self):
result = converters.convert_kvp_str_to_list('a=')
self.assertEqual(['a', ''], result)
def test_convert_kvp_str_to_list_succeeds_for_two_equals(self):
result = converters.convert_kvp_str_to_list('a=a=a')
self.assertEqual(['a', 'a=a'], result)
class TestConvertToList(base.TestCase):
def test_convert_to_empty_list(self):
for item in (None, [], (), {}):
self.assertEqual([], converters.convert_to_list(item))
def test_convert_to_list_string(self):
for item in ('', 'foo'):
self.assertEqual([item], converters.convert_to_list(item))
def test_convert_to_list_iterable(self):
for item in ([None], [1, 2, 3], (1, 2, 3), set([1, 2, 3]), ['foo']):
self.assertEqual(list(item), converters.convert_to_list(item))
def test_convert_to_list_non_iterable(self):
for item in (True, False, 1, 1.2, object()):
self.assertEqual([item], converters.convert_to_list(item))

@ -0,0 +1,840 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import string
import mock
import netaddr
from neutron_lib._i18n import _
from neutron_lib.api import converters
from neutron_lib.api import validators
from neutron_lib import constants
from neutron_lib import exceptions as n_exc
from neutron_lib.tests import base
def dummy_validator(data, valid_values=None):
pass
class TestAttributeValidation(base.TestCase):
def _construct_dict_and_constraints(self):
"""Constructs a test dictionary and a definition of constraints.
:return: A (dictionary, constraint) tuple
"""
constraints = {'key1': {'type:values': ['val1', 'val2'],
'required': True},
'key2': {'type:string': None,
'required': False},
'key3': {'type:dict': {'k4': {'type:string': None,
'required': True}},
'required': True}}
dictionary = {'key1': 'val1',
'key2': 'a string value',
'key3': {'k4': 'a string value'}}
return dictionary, constraints
def test_adding_validator(self):
validators.add_validator('new_type', dummy_validator)
self.assertIn('type:new_type', validators.validators)
self.assertEqual(dummy_validator,
validators.validators['type:new_type'])
def test_fail_adding_duplicate_validator(self):
self.assertRaises(KeyError,
validators.add_validator,
'dict', dummy_validator)
def test_is_attr_set(self):
data = constants.ATTR_NOT_SPECIFIED
self.assertIs(validators.is_attr_set(data), False)
data = None
self.assertIs(validators.is_attr_set(data), False)
data = "I'm set"
self.assertIs(validators.is_attr_set(data), True)
def test_validate_values(self):
msg = validators.validate_values(4, [4, 6])
self.assertIsNone(msg)
msg = validators.validate_values(4, (4, 6))
self.assertIsNone(msg)
msg = validators.validate_values(7, [4, 6])
self.assertEqual("'7' is not in [4, 6]", msg)
msg = validators.validate_values(7, (4, 6))
self.assertEqual("'7' is not in (4, 6)", msg)
def test_validate_not_empty_string(self):
msg = validators.validate_not_empty_string(' ', None)
self.assertEqual(u"' ' Blank strings are not permitted", msg)
msg = validators.validate_not_empty_string(123, None)
self.assertEqual(u"'123' is not a valid string", msg)
def test_validate_not_empty_string_or_none(self):
msg = validators.validate_not_empty_string_or_none(' ', None)
self.assertEqual(u"' ' Blank strings are not permitted", msg)
msg = validators.validate_not_empty_string_or_none(None, None)
self.assertIsNone(msg)
def test_validate_string_or_none(self):
msg = validators.validate_string_or_none('test', None)
self.assertIsNone(msg)
msg = validators.validate_string_or_none(None, None)
self.assertIsNone(msg)
def test_validate_string(self):
msg = validators.validate_string(None, None)
self.assertEqual("'None' is not a valid string", msg)
# 0 == len(data) == max_len
msg = validators.validate_string("", 0)
self.assertIsNone(msg)
# 0 == len(data) < max_len
msg = validators.validate_string("", 9)
self.assertIsNone(msg)
# 0 < len(data) < max_len
msg = validators.validate_string("123456789", 10)
self.assertIsNone(msg)
# 0 < len(data) == max_len
msg = validators.validate_string("123456789", 9)
self.assertIsNone(msg)
# 0 < max_len < len(data)
msg = validators.validate_string("1234567890", 9)
self.assertEqual("'1234567890' exceeds maximum length of 9", msg)
msg = validators.validate_string("123456789", None)
self.assertIsNone(msg)
def test_validate_list_of_unique_strings(self):
data = "TEST"
msg = validators.validate_list_of_unique_strings(data, None)
self.assertEqual("'TEST' is not a list", msg)
data = ["TEST01", "TEST02", "TEST01"]
msg = validators.validate_list_of_unique_strings(data, None)
self.assertEqual(
"Duplicate items in the list: 'TEST01, TEST02, TEST01'", msg)
data = ["12345678", "123456789"]
msg = validators.validate_list_of_unique_strings(data, 8)
self.assertEqual("'123456789' exceeds maximum length of 8", msg)
data = ["TEST01", "TEST02", "TEST03"]
msg = validators.validate_list_of_unique_strings(data, None)
self.assertIsNone(msg)
def test_validate_boolean(self):
msg = validators.validate_boolean(True)
self.assertIsNone(msg)
msg = validators.validate_boolean(0)
self.assertIsNone(msg)
msg = validators.validate_boolean("false")
self.assertIsNone(msg)
msg = validators.validate_boolean("fasle")
self.assertEqual("'fasle' is not a valid boolean value", msg)
def test_validate_no_whitespace(self):
data = 'no_white_space'
result = validators.validate_no_whitespace(data)
self.assertEqual(data, result)
self.assertRaises(n_exc.InvalidInput,
validators.validate_no_whitespace,
'i have whitespace')
self.assertRaises(n_exc.InvalidInput,
validators.validate_no_whitespace,
'i\thave\twhitespace')
for ws in string.whitespace:
self.assertRaises(n_exc.InvalidInput,
validators.validate_no_whitespace,
'%swhitespace-at-head' % ws)
self.assertRaises(n_exc.InvalidInput,
validators.validate_no_whitespace,
'whitespace-at-tail%s' % ws)
def test_validate_range(self):
msg = validators.validate_range(1, [1, 9])
self.assertIsNone(msg)
msg = validators.validate_range(5, [1, 9])
self.assertIsNone(msg)
msg = validators.validate_range(9, [1, 9])
self.assertIsNone(msg)
msg = validators.validate_range(1, (1, 9))
self.assertIsNone(msg)
msg = validators.validate_range(5, (1, 9))
self.assertIsNone(msg)
msg = validators.validate_range(9, (1, 9))
self.assertIsNone(msg)
msg = validators.validate_range(0, [1, 9])
self.assertEqual("'0' is too small - must be at least '1'", msg)
msg = validators.validate_range(10, (1, 9))
self.assertEqual("'10' is too large - must be no larger than '9'", msg)
msg = validators.validate_range("bogus", (1, 9))
self.assertEqual("'bogus' is not an integer", msg)
msg = validators.validate_range(10, (validators.UNLIMITED,
validators.UNLIMITED))
self.assertIsNone(msg)
msg = validators.validate_range(10, (1, validators.UNLIMITED))
self.assertIsNone(msg)
msg = validators.validate_range(1, (validators.UNLIMITED, 9))
self.assertIsNone(msg)
msg = validators.validate_range(-1, (0, validators.UNLIMITED))
self.assertEqual("'-1' is too small - must be at least '0'", msg)
msg = validators.validate_range(10, (validators.UNLIMITED, 9))
self.assertEqual("'10' is too large - must be no larger than '9'", msg)
def _test_validate_mac_address(self, validator, allow_none=False):
mac_addr = "ff:16:3e:4f:00:00"
msg = validator(mac_addr)
self.assertIsNone(msg)
mac_addr = "ffa:16:3e:4f:00:00"
msg = validator(mac_addr)
err_msg = "'%s' is not a valid MAC address"
self.assertEqual(err_msg % mac_addr, msg)
for invalid_mac_addr in constants.INVALID_MAC_ADDRESSES:
msg = validator(invalid_mac_addr)
self.assertEqual(err_msg % invalid_mac_addr, msg)
mac_addr = "123"
msg = validator(mac_addr)
self.assertEqual(err_msg % mac_addr, msg)
mac_addr = None
msg = validator(mac_addr)
if allow_none:
self.assertIsNone(msg)
else:
self.assertEqual(err_msg % mac_addr, msg)
mac_addr = "ff:16:3e:4f:00:00\r"
msg = validator(mac_addr)
self.assertEqual(err_msg % mac_addr, msg)
def test_validate_mac_address(self):
self._test_validate_mac_address(validators.validate_mac_address)
def test_validate_mac_address_or_none(self):
self._test_validate_mac_address(
validators.validate_mac_address_or_none, allow_none=True)
def test_validate_ip_address(self):
ip_addr = '1.1.1.1'
msg = validators.validate_ip_address(ip_addr)
self.assertIsNone(msg)
ip_addr = '1111.1.1.1'
msg = validators.validate_ip_address(ip_addr)
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
# Depending on platform to run UTs, this case might or might not be
# an equivalent to test_validate_ip_address_bsd.
ip_addr = '1' * 59
msg = validators.validate_ip_address(ip_addr)
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
ip_addr = '1.1.1.1 has whitespace'
msg = validators.validate_ip_address(ip_addr)
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
ip_addr = '111.1.1.1\twhitespace'
msg = validators.validate_ip_address(ip_addr)
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
ip_addr = '111.1.1.1\nwhitespace'
msg = validators.validate_ip_address(ip_addr)
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
for ws in string.whitespace:
ip_addr = '%s111.1.1.1' % ws
msg = validators.validate_ip_address(ip_addr)
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
for ws in string.whitespace:
ip_addr = '111.1.1.1%s' % ws
msg = validators.validate_ip_address(ip_addr)
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
def test_validate_ip_address_with_leading_zero(self):
ip_addr = '1.1.1.01'
expected_msg = ("'%(data)s' is not an accepted IP address, "
"'%(ip)s' is recommended")
msg = validators.validate_ip_address(ip_addr)
self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.1'},
msg)
ip_addr = '1.1.1.011'
msg = validators.validate_ip_address(ip_addr)
self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.11'},
msg)
ip_addr = '1.1.1.09'
msg = validators.validate_ip_address(ip_addr)
self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.9'},
msg)
ip_addr = "fe80:0:0:0:0:0:0:0001"
msg = validators.validate_ip_address(ip_addr)
self.assertIsNone(msg)
def test_validate_ip_address_bsd(self):
# NOTE(yamamoto): On NetBSD and OS X, netaddr.IPAddress() accepts
# '1' * 59 as a valid address. The behaviour is inherited from
# libc behaviour there. This test ensures that our validator reject