Merge "API validators and converters"
This commit is contained in:
commit
30a745250d
86
doc/source/devref/api_converters.rst
Normal file
86
doc/source/devref/api_converters.rst
Normal file
@ -0,0 +1,86 @@
|
||||
..
|
||||
Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
not use this file except in compliance with the License. You may obtain
|
||||
a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
License for the specific language governing permissions and limitations
|
||||
under the License.
|
||||
|
||||
|
||||
Convention for heading levels in Neutron devref:
|
||||
======= Heading 0 (reserved for the title in a document)
|
||||
------- Heading 1
|
||||
~~~~~~~ Heading 2
|
||||
+++++++ Heading 3
|
||||
''''''' Heading 4
|
||||
(Avoid deeper levels because they do not render well.)
|
||||
|
||||
|
||||
API Converters
|
||||
==============
|
||||
|
||||
Defintions for REST API attributes, can include conversion methods
|
||||
to help normalize user input or transform the input into a form that
|
||||
can be used.
|
||||
|
||||
|
||||
Defining A Converter Method
|
||||
---------------------------
|
||||
|
||||
By convention, the name should start with ``convert_to_``, and will
|
||||
take a single argument for the data to be converted. The method
|
||||
should return the converted data (which, if the input is None,
|
||||
and no conversion is performed, the implicit None returned by the
|
||||
method may be used). If the conversion is impossible, an
|
||||
InvalidInput exception should be raised, indicating what is wrong.
|
||||
For example, here is one that converts a variety of user inputs
|
||||
to a boolean value.
|
||||
::
|
||||
|
||||
def convert_to_boolean(data):
|
||||
if isinstance(data, six.string_types):
|
||||
val = data.lower()
|
||||
if val == "true" or val == "1":
|
||||
return True
|
||||
if val == "false" or val == "0":
|
||||
return False
|
||||
elif isinstance(data, bool):
|
||||
return data
|
||||
elif isinstance(data, int):
|
||||
if data == 0:
|
||||
return False
|
||||
elif data == 1:
|
||||
return True
|
||||
msg = _("'%s' cannot be converted to boolean") % data
|
||||
raise n_exc.InvalidInput(error_message=msg)
|
||||
|
||||
|
||||
Using Validators
|
||||
----------------
|
||||
|
||||
In client code, the conversion can be used in a REST API
|
||||
definition, by specifying the name of the method as a value for
|
||||
the 'convert_to' key on an attribute. For example:
|
||||
|
||||
::
|
||||
|
||||
'admin_state_up': {'allow_post': True, 'allow_put': True,
|
||||
'default': True,
|
||||
'convert_to': conversions.convert_to_boolean,
|
||||
'is_visible': True},
|
||||
|
||||
Here, the admin_state_up is a boolean, so the converter is used to
|
||||
take user's (string) input and transform it to a boolean.
|
||||
|
||||
|
||||
Test The Validator
|
||||
------------------
|
||||
|
||||
Do the right thing, and make sure you've created a unit test for any
|
||||
converter that you add to verify that it works as expected.
|
||||
|
102
doc/source/devref/api_validators.rst
Normal file
102
doc/source/devref/api_validators.rst
Normal file
@ -0,0 +1,102 @@
|
||||
..
|
||||
Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
not use this file except in compliance with the License. You may obtain
|
||||
a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
License for the specific language governing permissions and limitations
|
||||
under the License.
|
||||
|
||||
|
||||
Convention for heading levels in Neutron devref:
|
||||
======= Heading 0 (reserved for the title in a document)
|
||||
------- Heading 1
|
||||
~~~~~~~ Heading 2
|
||||
+++++++ Heading 3
|
||||
''''''' Heading 4
|
||||
(Avoid deeper levels because they do not render well.)
|
||||
|
||||
|
||||
API Validators
|
||||
==============
|
||||
|
||||
For the REST API, attributes may have custom validators defined. Each
|
||||
validator will have a method to perform the validation, and a type
|
||||
definition string, so that the validator can be referenced.
|
||||
|
||||
|
||||
Defining A Validator Method
|
||||
---------------------------
|
||||
|
||||
The validation method will have a positional argument for the data to
|
||||
be validated, and may have additional (optional) keyword arguments that
|
||||
can be used during validation. The method must handle any exceptions
|
||||
and either return None (success) or a i18n string indicating the
|
||||
validation failure message. By convention, the method name is prefixed
|
||||
with ``validate_`` and then includes the data type. For example:
|
||||
|
||||
::
|
||||
|
||||
def validate_uuid(data, valid_values=None):
|
||||
if not uuidutils.is_uuid_like(data):
|
||||
msg = _("'%s' is not a valid UUID") % data
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
There is a validation dictionary that maps the method to a validation
|
||||
type that can be referred to in REST API definitions. An entry in the
|
||||
dictionary would look like the following:
|
||||
|
||||
::
|
||||
|
||||
'type:uuid': validate_uuid,
|
||||
|
||||
|
||||
Using Validators
|
||||
----------------
|
||||
|
||||
In client code, the valdiator can be used in a REST API by using the
|
||||
dictionary key for the validator. For example:
|
||||
|
||||
::
|
||||
|
||||
RESOURCE_ATTRIBUTE_MAP = {
|
||||
NETWORKS: {
|
||||
'id': {'allow_post': False, 'allow_put': False,
|
||||
'validate': {'type:uuid': None},
|
||||
'is_visible': True,
|
||||
'primary_key': True},
|
||||
'name': {'allow_post': True, 'allow_put': True,
|
||||
'validate': {'type:string': NAME_MAX_LEN},
|
||||
'default': '', 'is_visible': True},
|
||||
|
||||
Here, the networks resource has an 'id' attribute with a UUID validator,
|
||||
as seen by the 'validate' key containing a dictionary with a key of
|
||||
'type:uuid'.
|
||||
|
||||
Any addition arguments for the validator can be specified as values for
|
||||
the dictionary entry (None in this case, NAME_MAX_LEN in the 'name'
|
||||
attribute that uses a string validator). In a IP version attribute, one
|
||||
could have a validator defined as follows:
|
||||
|
||||
::
|
||||
|
||||
'ip_version': {'allow_post': True, 'allow_put': False,
|
||||
'convert_to': conversions.convert_to_int,
|
||||
'validate': {'type:values': [4, 6]},
|
||||
'is_visible': True},
|
||||
|
||||
Here, the valdiate_values() method will take the list of values as the
|
||||
allowable values that can be specified for this attribute.
|
||||
|
||||
Test The Validator
|
||||
------------------
|
||||
|
||||
Do the right thing, and make sure you've created a unit test for any
|
||||
validator that you add to verify that it works as expected, even for
|
||||
simple validators.
|
||||
|
@ -32,6 +32,8 @@ Neutron Lib Internals
|
||||
.. toctree::
|
||||
:maxdepth: 3
|
||||
|
||||
api_converters
|
||||
api_validators
|
||||
callbacks
|
||||
|
||||
|
||||
|
0
neutron_lib/api/__init__.py
Normal file
0
neutron_lib/api/__init__.py
Normal file
119
neutron_lib/api/converters.py
Normal file
119
neutron_lib/api/converters.py
Normal file
@ -0,0 +1,119 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import six
|
||||
|
||||
from neutron_lib._i18n import _
|
||||
from neutron_lib import exceptions as n_exc
|
||||
|
||||
|
||||
def convert_to_boolean(data):
|
||||
if isinstance(data, six.string_types):
|
||||
val = data.lower()
|
||||
if val == "true" or val == "1":
|
||||
return True
|
||||
if val == "false" or val == "0":
|
||||
return False
|
||||
elif isinstance(data, bool):
|
||||
return data
|
||||
elif isinstance(data, int):
|
||||
if data == 0:
|
||||
return False
|
||||
elif data == 1:
|
||||
return True
|
||||
msg = _("'%s' cannot be converted to boolean") % data
|
||||
raise n_exc.InvalidInput(error_message=msg)
|
||||
|
||||
|
||||
def convert_to_boolean_if_not_none(data):
|
||||
if data is not None:
|
||||
return convert_to_boolean(data)
|
||||
|
||||
|
||||
def convert_to_int(data):
|
||||
try:
|
||||
return int(data)
|
||||
except (ValueError, TypeError):
|
||||
msg = _("'%s' is not an integer") % data
|
||||
raise n_exc.InvalidInput(error_message=msg)
|
||||
|
||||
|
||||
def convert_to_int_if_not_none(data):
|
||||
if data is not None:
|
||||
return convert_to_int(data)
|
||||
return data
|
||||
|
||||
|
||||
def convert_to_positive_float_or_none(val):
|
||||
# NOTE(salv-orlando): This conversion function is currently used by
|
||||
# a vendor specific extension only at the moment It is used for
|
||||
# port's RXTX factor in neutron.plugins.vmware.extensions.qos.
|
||||
# It is deemed however generic enough to be in this module as it
|
||||
# might be used in future for other API attributes.
|
||||
if val is None:
|
||||
return
|
||||
try:
|
||||
val = float(val)
|
||||
if val < 0:
|
||||
raise ValueError()
|
||||
except (ValueError, TypeError):
|
||||
msg = _("'%s' must be a non negative decimal.") % val
|
||||
raise n_exc.InvalidInput(error_message=msg)
|
||||
return val
|
||||
|
||||
|
||||
def convert_kvp_str_to_list(data):
|
||||
"""Convert a value of the form 'key=value' to ['key', 'value'].
|
||||
|
||||
:raises: n_exc.InvalidInput if any of the strings are malformed
|
||||
(e.g. do not contain a key).
|
||||
"""
|
||||
kvp = [x.strip() for x in data.split('=', 1)]
|
||||
if len(kvp) == 2 and kvp[0]:
|
||||
return kvp
|
||||
msg = _("'%s' is not of the form <key>=[value]") % data
|
||||
raise n_exc.InvalidInput(error_message=msg)
|
||||
|
||||
|
||||
def convert_kvp_list_to_dict(kvp_list):
|
||||
"""Convert a list of 'key=value' strings to a dict.
|
||||
|
||||
:raises: n_exc.InvalidInput if any of the strings are malformed
|
||||
(e.g. do not contain a key) or if any
|
||||
of the keys appear more than once.
|
||||
"""
|
||||
if kvp_list == ['True']:
|
||||
# No values were provided (i.e. '--flag-name')
|
||||
return {}
|
||||
kvp_map = {}
|
||||
for kvp_str in kvp_list:
|
||||
key, value = convert_kvp_str_to_list(kvp_str)
|
||||
kvp_map.setdefault(key, set())
|
||||
kvp_map[key].add(value)
|
||||
return dict((x, list(y)) for x, y in six.iteritems(kvp_map))
|
||||
|
||||
|
||||
def convert_none_to_empty_list(value):
|
||||
return [] if value is None else value
|
||||
|
||||
|
||||
def convert_none_to_empty_dict(value):
|
||||
return {} if value is None else value
|
||||
|
||||
|
||||
def convert_to_list(data):
|
||||
if data is None:
|
||||
return []
|
||||
elif hasattr(data, '__iter__') and not isinstance(data, six.string_types):
|
||||
return list(data)
|
||||
else:
|
||||
return [data]
|
536
neutron_lib/api/validators.py
Normal file
536
neutron_lib/api/validators.py
Normal file
@ -0,0 +1,536 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import re
|
||||
|
||||
import functools
|
||||
import netaddr
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
|
||||
from neutron_lib._i18n import _
|
||||
from neutron_lib.api import converters
|
||||
from neutron_lib import constants
|
||||
from neutron_lib import exceptions as n_exc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
# Used by range check to indicate no limit for a bound.
|
||||
UNLIMITED = None
|
||||
|
||||
# Note: In order to ensure that the MAC address is unicast the first byte
|
||||
# must be even.
|
||||
MAC_PATTERN = "^%s[aceACE02468](:%s{2}){5}$" % (constants.HEX_ELEM,
|
||||
constants.HEX_ELEM)
|
||||
|
||||
|
||||
def _verify_dict_keys(expected_keys, target_dict, strict=True):
|
||||
"""Allows to verify keys in a dictionary.
|
||||
|
||||
:param expected_keys: A list of keys expected to be present.
|
||||
:param target_dict: The dictionary which should be verified.
|
||||
:param strict: Specifies whether additional keys are allowed to be present.
|
||||
:return: True, if keys in the dictionary correspond to the specification.
|
||||
"""
|
||||
if not isinstance(target_dict, dict):
|
||||
msg = (_("Invalid input. '%(target_dict)s' must be a dictionary "
|
||||
"with keys: %(expected_keys)s") %
|
||||
{'target_dict': target_dict, 'expected_keys': expected_keys})
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
expected_keys = set(expected_keys)
|
||||
provided_keys = set(target_dict.keys())
|
||||
|
||||
predicate = expected_keys.__eq__ if strict else expected_keys.issubset
|
||||
|
||||
if not predicate(provided_keys):
|
||||
msg = (_("Validation of dictionary's keys failed. "
|
||||
"Expected keys: %(expected_keys)s "
|
||||
"Provided keys: %(provided_keys)s") %
|
||||
{'expected_keys': expected_keys,
|
||||
'provided_keys': provided_keys})
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
|
||||
def is_attr_set(attribute):
|
||||
return not (attribute is None or
|
||||
attribute is constants.ATTR_NOT_SPECIFIED)
|
||||
|
||||
|
||||
def _validate_list_of_items(item_validator, data, *args, **kwargs):
|
||||
if not isinstance(data, list):
|
||||
msg = _("'%s' is not a list") % data
|
||||
return msg
|
||||
|
||||
if len(set(data)) != len(data):
|
||||
msg = _("Duplicate items in the list: '%s'") % ', '.join(data)
|
||||
return msg
|
||||
|
||||
for item in data:
|
||||
msg = item_validator(item, *args, **kwargs)
|
||||
if msg:
|
||||
return msg
|
||||
|
||||
|
||||
def validate_values(data, valid_values=None):
|
||||
if data not in valid_values:
|
||||
msg = (_("'%(data)s' is not in %(valid_values)s") %
|
||||
{'data': data, 'valid_values': valid_values})
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
|
||||
def validate_not_empty_string_or_none(data, max_len=None):
|
||||
if data is not None:
|
||||
return validate_not_empty_string(data, max_len=max_len)
|
||||
|
||||
|
||||
def validate_not_empty_string(data, max_len=None):
|
||||
msg = validate_string(data, max_len=max_len)
|
||||
if msg:
|
||||
return msg
|
||||
if not data.strip():
|
||||
msg = _("'%s' Blank strings are not permitted") % data
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
|
||||
def validate_string_or_none(data, max_len=None):
|
||||
if data is not None:
|
||||
return validate_string(data, max_len=max_len)
|
||||
|
||||
|
||||
def validate_string(data, max_len=None):
|
||||
if not isinstance(data, six.string_types):
|
||||
msg = _("'%s' is not a valid string") % data
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
if max_len is not None and len(data) > max_len:
|
||||
msg = (_("'%(data)s' exceeds maximum length of %(max_len)s") %
|
||||
{'data': data, 'max_len': max_len})
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
|
||||
validate_list_of_unique_strings = functools.partial(_validate_list_of_items,
|
||||
validate_string)
|
||||
|
||||
|
||||
def validate_boolean(data, valid_values=None):
|
||||
try:
|
||||
converters.convert_to_boolean(data)
|
||||
except n_exc.InvalidInput:
|
||||
msg = _("'%s' is not a valid boolean value") % data
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
|
||||
def validate_range(data, valid_values=None):
|
||||
"""Check that integer value is within a range provided.
|
||||
|
||||
Test is inclusive. Allows either limit to be ignored, to allow
|
||||
checking ranges where only the lower or upper limit matter.
|
||||
It is expected that the limits provided are valid integers or
|
||||
the value None.
|
||||
"""
|
||||
|
||||
min_value = valid_values[0]
|
||||
max_value = valid_values[1]
|
||||
try:
|
||||
data = int(data)
|
||||
except (ValueError, TypeError):
|
||||
msg = _("'%s' is not an integer") % data
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
if min_value is not UNLIMITED and data < min_value:
|
||||
msg = _("'%(data)s' is too small - must be at least "
|
||||
"'%(limit)d'") % {'data': data, 'limit': min_value}
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
if max_value is not UNLIMITED and data > max_value:
|
||||
msg = _("'%(data)s' is too large - must be no larger than "
|
||||
"'%(limit)d'") % {'data': data, 'limit': max_value}
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
|
||||
def validate_no_whitespace(data):
|
||||
"""Validates that input has no whitespace."""
|
||||
if re.search(r'\s', data):
|
||||
msg = _("'%s' contains whitespace") % data
|
||||
LOG.debug(msg)
|
||||
raise n_exc.InvalidInput(error_message=msg)
|
||||
return data
|
||||
|
||||
|
||||
def validate_mac_address(data, valid_values=None):
|
||||
try:
|
||||
valid_mac = netaddr.valid_mac(validate_no_whitespace(data))
|
||||
except Exception:
|
||||
valid_mac = False
|
||||
|
||||
if valid_mac:
|
||||
valid_mac = (not netaddr.EUI(data) in
|
||||
map(netaddr.EUI, constants.INVALID_MAC_ADDRESSES))
|
||||
# TODO(arosen): The code in this file should be refactored
|
||||
# so it catches the correct exceptions. validate_no_whitespace
|
||||
# raises AttributeError if data is None.
|
||||
if not valid_mac:
|
||||
msg = _("'%s' is not a valid MAC address") % data
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
|
||||
def validate_mac_address_or_none(data, valid_values=None):
|
||||
if data is not None:
|
||||
return validate_mac_address(data, valid_values)
|
||||
|
||||
|
||||
def validate_ip_address(data, valid_values=None):
|
||||
msg = None
|
||||
try:
|
||||
# netaddr.core.ZEROFILL is only applicable to IPv4.
|
||||
# it will remove leading zeros from IPv4 address octets.
|
||||
ip = netaddr.IPAddress(validate_no_whitespace(data),
|
||||
flags=netaddr.core.ZEROFILL)
|
||||
# The followings are quick checks for IPv6 (has ':') and
|
||||
# IPv4. (has 3 periods like 'xx.xx.xx.xx')
|
||||
# NOTE(yamamoto): netaddr uses libraries provided by the underlying
|
||||
# platform to convert addresses. For example, inet_aton(3).
|
||||
# Some platforms, including NetBSD and OS X, have inet_aton
|
||||
# implementation which accepts more varying forms of addresses than
|
||||
# we want to accept here. The following check is to reject such
|
||||
# addresses. For Example:
|
||||
# >>> netaddr.IPAddress('1' * 59)
|
||||
# IPAddress('199.28.113.199')
|
||||
# >>> netaddr.IPAddress(str(int('1' * 59) & 0xffffffff))
|
||||
# IPAddress('199.28.113.199')
|
||||
# >>>
|
||||
if ':' not in data and data.count('.') != 3:
|
||||
msg = _("'%s' is not a valid IP address") % data
|
||||
# A leading '0' in IPv4 address may be interpreted as an octal number,
|
||||
# e.g. 011 octal is 9 decimal. Since there is no standard saying
|
||||
# whether IP address with leading '0's should be interpreted as octal
|
||||
# or decimal, hence we reject leading '0's to avoid ambiguity.
|
||||
if ip.version == 4 and str(ip) != data:
|
||||
msg = _("'%(data)s' is not an accepted IP address, "
|
||||
"'%(ip)s' is recommended") % {"data": data, "ip": ip}
|
||||
except Exception:
|
||||
msg = _("'%s' is not a valid IP address") % data
|
||||
if msg:
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
|
||||
def validate_ip_pools(data, valid_values=None):
|
||||
"""Validate that start and end IP addresses are present.
|
||||
|
||||
In addition to this the IP addresses will also be validated
|
||||
"""
|
||||
if not isinstance(data, list):
|
||||
msg = _("Invalid data format for IP pool: '%s'") % data
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
expected_keys = ['start', 'end']
|
||||
for ip_pool in data:
|
||||
msg = _verify_dict_keys(expected_keys, ip_pool)
|
||||
if msg:
|
||||
return msg
|
||||
for k in expected_keys:
|
||||
msg = validate_ip_address(ip_pool[k])
|
||||
if msg:
|
||||
return msg
|
||||
|
||||
|
||||
def validate_fixed_ips(data, valid_values=None):
|
||||
if not isinstance(data, list):
|
||||
msg = _("Invalid data format for fixed IP: '%s'") % data
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
ips = []
|
||||
for fixed_ip in data:
|
||||
if not isinstance(fixed_ip, dict):
|
||||
msg = _("Invalid data format for fixed IP: '%s'") % fixed_ip
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
if 'ip_address' in fixed_ip:
|
||||
# Ensure that duplicate entries are not set - just checking IP
|
||||
# suffices. Duplicate subnet_id's are legitimate.
|
||||
fixed_ip_address = fixed_ip['ip_address']
|
||||
if fixed_ip_address in ips:
|
||||
msg = _("Duplicate IP address '%s'") % fixed_ip_address
|
||||
LOG.debug(msg)
|
||||
else:
|
||||
msg = validate_ip_address(fixed_ip_address)
|
||||
if msg:
|
||||
return msg
|
||||
ips.append(fixed_ip_address)
|
||||
if 'subnet_id' in fixed_ip:
|
||||
msg = validate_uuid(fixed_ip['subnet_id'])
|
||||
if msg:
|
||||
return msg
|
||||
|
||||
|
||||
def validate_nameservers(data, valid_values=None):
|
||||
if not hasattr(data, '__iter__'):
|
||||
msg = _("Invalid data format for nameserver: '%s'") % data
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
hosts = []
|
||||
for host in data:
|
||||
# This must be an IP address only
|
||||
msg = validate_ip_address(host)
|
||||
if msg:
|
||||
msg = _("'%(host)s' is not a valid nameserver. %(msg)s") % {
|
||||
'host': host, 'msg': msg}
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
if host in hosts:
|
||||
msg = _("Duplicate nameserver '%s'") % host
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
hosts.append(host)
|
||||
|
||||
|
||||
def validate_hostroutes(data, valid_values=None):
|
||||
if not isinstance(data, list):
|
||||
msg = _("Invalid data format for hostroute: '%s'") % data
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
expected_keys = ['destination', 'nexthop']
|
||||
hostroutes = []
|
||||
for hostroute in data:
|
||||
msg = _verify_dict_keys(expected_keys, hostroute)
|
||||
if msg:
|
||||
return msg
|
||||
msg = validate_subnet(hostroute['destination'])
|
||||
if msg:
|
||||
return msg
|
||||
msg = validate_ip_address(hostroute['nexthop'])
|
||||
if msg:
|
||||
return msg
|
||||
if hostroute in hostroutes:
|
||||
msg = _("Duplicate hostroute '%s'") % hostroute
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
hostroutes.append(hostroute)
|
||||
|
||||
|
||||
def validate_ip_address_or_none(data, valid_values=None):
|
||||
if data is not None:
|
||||
return validate_ip_address(data, valid_values)
|
||||
|
||||
|
||||
def validate_subnet(data, valid_values=None):
|
||||
msg = None
|
||||
try:
|
||||
net = netaddr.IPNetwork(validate_no_whitespace(data))
|
||||
if '/' not in data or (net.version == 4 and str(net) != data):
|
||||
msg = _("'%(data)s' isn't a recognized IP subnet cidr,"
|
||||
" '%(cidr)s' is recommended") % {"data": data,
|
||||
"cidr": net.cidr}
|
||||
else:
|
||||
return
|
||||
except Exception:
|
||||
msg = _("'%s' is not a valid IP subnet") % data
|
||||
if msg:
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
|
||||
def validate_subnet_or_none(data, valid_values=None):
|
||||
if data is not None:
|
||||
return validate_subnet(data, valid_values)
|
||||
|
||||
|
||||
validate_subnet_list = functools.partial(_validate_list_of_items,
|
||||
validate_subnet)
|
||||
|
||||
|
||||
def validate_regex(data, valid_values=None):
|
||||
try:
|
||||
if re.match(valid_values, data):
|
||||
return
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
msg = _("'%s' is not a valid input") % data
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
|
||||
def validate_regex_or_none(data, valid_values=None):
|
||||
if data is not None:
|
||||
return validate_regex(data, valid_values)
|
||||
|
||||
|
||||
def validate_subnetpool_id(data, valid_values=None):
|
||||
if data != constants.IPV6_PD_POOL_ID:
|
||||
return validate_uuid_or_none(data, valid_values)
|
||||
|
||||
|
||||
def validate_subnetpool_id_or_none(data, valid_values=None):
|
||||
if data is not None:
|
||||
return validate_subnetpool_id(data, valid_values)
|
||||
|
||||
|
||||
def validate_uuid(data, valid_values=None):
|
||||
if not uuidutils.is_uuid_like(data):
|
||||
msg = _("'%s' is not a valid UUID") % data
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
|
||||
def validate_uuid_or_none(data, valid_values=None):
|
||||
if data is not None:
|
||||
return validate_uuid(data)
|
||||
|
||||
|
||||
validate_uuid_list = functools.partial(_validate_list_of_items,
|
||||
validate_uuid)
|
||||
|
||||
|
||||
def _validate_dict_item(key, key_validator, data):
|
||||
# Find conversion function, if any, and apply it
|
||||
conv_func = key_validator.get('convert_to')
|
||||
if conv_func:
|
||||
data[key] = conv_func(data.get(key))
|
||||
# Find validator function
|
||||
# TODO(salv-orlando): Structure of dict attributes should be improved
|
||||
# to avoid iterating over items
|
||||
val_func = val_params = None
|
||||
for (k, v) in six.iteritems(key_validator):
|
||||
if k.startswith('type:'):
|
||||
# ask forgiveness, not permission
|
||||
try:
|
||||
val_func = validators[k]
|
||||
except KeyError:
|
||||
msg = _("Validator '%s' does not exist.") % k
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
val_params = v
|
||||
break
|
||||
# Process validation
|
||||
if val_func:
|
||||
return val_func(data.get(key), val_params)
|
||||
|
||||
|
||||
def validate_dict(data, key_specs=None):
|
||||
if not isinstance(data, dict):
|
||||
msg = _("'%s' is not a dictionary") % data
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
# Do not perform any further validation, if no constraints are supplied
|
||||
if not key_specs:
|
||||
return
|
||||
|
||||
# Check whether all required keys are present
|
||||
required_keys = [key for key, spec in six.iteritems(key_specs)
|
||||
if spec.get('required')]
|
||||
|
||||
if required_keys:
|
||||
msg = _verify_dict_keys(required_keys, data, False)
|
||||
if msg:
|
||||
return msg
|
||||
|
||||
# Perform validation and conversion of all values
|
||||
# according to the specifications.
|
||||
for key, key_validator in [(k, v) for k, v in six.iteritems(key_specs)
|
||||
if k in data]:
|
||||
msg = _validate_dict_item(key, key_validator, data)
|
||||
if msg:
|
||||
return msg
|
||||
|
||||
|
||||
def validate_dict_or_none(data, key_specs=None):
|
||||
if data is not None:
|
||||
return validate_dict(data, key_specs)
|
||||
|
||||
|
||||
def validate_dict_or_empty(data, key_specs=None):
|
||||
if data != {}:
|
||||
return validate_dict(data, key_specs)
|
||||
|
||||
|
||||
def validate_dict_or_nodata(data, key_specs=None):
|
||||
if data:
|
||||
return validate_dict(data, key_specs)
|
||||
|
||||
|
||||
def validate_non_negative(data, valid_values=None):
|
||||
try:
|
||||
data = int(data)
|
||||
except (ValueError, TypeError):
|
||||
msg = _("'%s' is not an integer") % data
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
if data < 0:
|
||||
msg = _("'%s' should be non-negative") % data
|
||||
LOG.debug(msg)
|
||||
return msg
|
||||
|
||||
|
||||
# Dictionary that maintains a list of validation functions
|
||||
validators = {'type:dict': validate_dict,
|
||||
'type:dict_or_none': validate_dict_or_none,
|
||||
'type:dict_or_empty': validate_dict_or_empty,
|
||||
'type:dict_or_nodata': validate_dict_or_nodata,
|
||||
'type:fixed_ips': validate_fixed_ips,
|
||||
'type:hostroutes': validate_hostroutes,
|
||||
'type:ip_address': validate_ip_address,
|
||||
'type:ip_address_or_none': validate_ip_address_or_none,
|
||||
'type:ip_pools': validate_ip_pools,
|
||||
'type:mac_address': validate_mac_address,
|
||||
'type:mac_address_or_none': validate_mac_address_or_none,
|
||||
'type:nameservers': validate_nameservers,
|
||||
'type:non_negative': validate_non_negative,
|
||||
'type:range': validate_range,
|
||||
'type:regex': validate_regex,
|
||||
'type:regex_or_none': validate_regex_or_none,
|
||||
'type:string': validate_string,
|
||||
'type:string_or_none': validate_string_or_none,
|
||||
'type:not_empty_string': validate_not_empty_string,
|
||||
'type:not_empty_string_or_none':
|
||||
validate_not_empty_string_or_none,
|
||||
'type:subnet': validate_subnet,
|
||||
'type:subnet_list': validate_subnet_list,
|
||||
'type:subnet_or_none': validate_subnet_or_none,
|
||||
'type:subnetpool_id': validate_subnetpool_id,
|
||||
'type:subnetpool_id_or_none': validate_subnetpool_id_or_none,
|
||||
'type:uuid': validate_uuid,
|
||||
'type:uuid_or_none': validate_uuid_or_none,
|
||||
'type:uuid_list': validate_uuid_list,
|
||||
'type:values': validate_values,
|
||||
'type:boolean': validate_boolean,
|
||||
'type:list_of_unique_strings': validate_list_of_unique_strings}
|
||||
|
||||
|
||||
def add_validator(validation_type, validator):
|
||||
"""Dynamically add a validator.
|
||||
|
||||
This can be used by clients to add their own, private validators, rather
|
||||
than directly modifying the data structure. The clients can NOT modify
|
||||
existing validators.
|
||||
"""
|
||||
key = 'type:' + validation_type
|
||||
if key in validators:
|
||||
raise KeyError("Validator type %s is already defined", validation_type)
|
||||
validators[key] = validator
|
@ -129,8 +129,22 @@ ICMPV6_ALLOWED_TYPES = [130, 131, 132, 135, 136]
|
||||
ICMPV6_TYPE_RA = 134
|
||||
ICMPV6_TYPE_NA = 136
|
||||
|
||||
# Human-readable ID to which the subnetpool ID should be set to
|
||||
# indicate that IPv6 Prefix Delegation is enabled for a given subnetpool
|
||||
IPV6_PD_POOL_ID = 'prefix_delegation'
|
||||
|
||||
# Device names start with "tap"
|
||||
TAP_DEVICE_PREFIX = 'tap'
|
||||
|
||||
# Time format
|
||||
ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
|
||||
|
||||
#############################
|
||||
# Attribute related constants
|
||||
#############################
|
||||
ATTR_NOT_SPECIFIED = object()
|
||||
|
||||
HEX_ELEM = '[0-9A-Fa-f]'
|
||||
UUID_PATTERN = '-'.join([HEX_ELEM + '{8}', HEX_ELEM + '{4}',
|
||||
HEX_ELEM + '{4}', HEX_ELEM + '{4}',
|
||||
HEX_ELEM + '{12}'])
|
||||
|
38
neutron_lib/tests/tools.py
Normal file
38
neutron_lib/tests/tools.py
Normal file
@ -0,0 +1,38 @@
|
||||
# Copyright (c) 2013 NEC Corporation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# Note: _safe_sort_key came from neutron/common/utils.py. For neutron-lib
|
||||
# it is only used for testing, so is placed here.
|
||||
import collections
|
||||
|
||||
|
||||
def _safe_sort_key(value):
|
||||
"""Return value hash or build one for dictionaries."""
|
||||
if isinstance(value, collections.Mapping):
|
||||
return sorted(value.items())
|
||||
return value
|
||||
|
||||
|
||||
class UnorderedList(list):
|
||||
"""A list that is equals to any permutation of itself."""
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, list):
|
||||
return False
|
||||
return (sorted(self, key=_safe_sort_key) ==
|
||||
sorted(other, key=_safe_sort_key))
|
||||
|
||||
def __neq__(self, other):
|
||||
return not self == other
|
0
neutron_lib/tests/unit/api/__init__.py
Normal file
0
neutron_lib/tests/unit/api/__init__.py
Normal file
169
neutron_lib/tests/unit/api/test_conversions.py
Normal file
169
neutron_lib/tests/unit/api/test_conversions.py
Normal file
@ -0,0 +1,169 @@
|
||||
# Copyright 2012 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import testtools
|
||||
|
||||
from neutron_lib.api import converters
|
||||
from neutron_lib import exceptions as n_exc
|
||||
from neutron_lib.tests import base
|
||||
from neutron_lib.tests import tools
|
||||
|
||||
|
||||
class TestConvertToBoolean(base.TestCase):
|
||||
|
||||
def test_convert_to_boolean_bool(self):
|
||||
self.assertIs(converters.convert_to_boolean(True), True)
|
||||
self.assertIs(converters.convert_to_boolean(False), False)
|
||||
|
||||
def test_convert_to_boolean_int(self):
|
||||
self.assertIs(converters.convert_to_boolean(0), False)
|
||||
self.assertIs(converters.convert_to_boolean(1), True)
|
||||
self.assertRaises(n_exc.InvalidInput,
|
||||
converters.convert_to_boolean,
|
||||
7)
|
||||
|
||||
def test_convert_to_boolean_str(self):
|
||||
self.assertIs(converters.convert_to_boolean('True'), True)
|
||||
self.assertIs(converters.convert_to_boolean('true'), True)
|
||||
self.assertIs(converters.convert_to_boolean('False'), False)
|
||||
self.assertIs(converters.convert_to_boolean('false'), False)
|
||||
self.assertIs(converters.convert_to_boolean('0'), False)
|
||||
self.assertIs(converters.convert_to_boolean('1'), True)
|
||||
self.assertRaises(n_exc.InvalidInput,
|
||||
converters.convert_to_boolean,
|
||||
'7')
|
||||
|
||||
def test_convert_to_boolean_if_not_none(self):
|
||||
self.assertIsNone(converters.convert_to_boolean_if_not_none(None))
|
||||
self.assertIs(converters.convert_to_boolean_if_not_none(1), True)
|
||||
|
||||
|
||||
class TestConvertToInt(base.TestCase):
|
||||
|
||||
def test_convert_to_int_int(self):
|
||||
self.assertEqual(-1, converters.convert_to_int(-1))
|
||||
self.assertEqual(0, converters.convert_to_int(0))
|
||||
self.assertEqual(1, converters.convert_to_int(1))
|
||||
|
||||
def test_convert_to_int_if_not_none(self):
|
||||
self.assertEqual(-1, converters.convert_to_int_if_not_none(-1))
|
||||
self.assertEqual(0, converters.convert_to_int_if_not_none(0))
|
||||
self.assertEqual(1, converters.convert_to_int_if_not_none(1))
|
||||
self.assertIsNone(converters.convert_to_int_if_not_none(None))
|
||||
|
||||
def test_convert_to_int_str(self):
|
||||
self.assertEqual(4, converters.convert_to_int('4'))
|
||||
self.assertEqual(6, converters.convert_to_int('6'))
|
||||
self.assertRaises(n_exc.InvalidInput,
|
||||
converters.convert_to_int,
|
||||
'garbage')
|
||||
|
||||
def test_convert_to_int_none(self):
|
||||
self.assertRaises(n_exc.InvalidInput,
|
||||
converters.convert_to_int,
|
||||
None)
|
||||
|
||||
def test_convert_none_to_empty_list_none(self):
|
||||
self.assertEqual([], converters.convert_none_to_empty_list(None))
|
||||
|
||||
def test_convert_none_to_empty_dict(self):
|
||||
self.assertEqual({}, converters.convert_none_to_empty_dict(None))
|
||||
|
||||
def test_convert_none_to_empty_list_value(self):
|
||||
values = ['1', 3, [], [1], {}, {'a': 3}]
|
||||
for value in values:
|
||||
self.assertEqual(
|
||||
value, converters.convert_none_to_empty_list(value))
|
||||
|
||||
|
||||
class TestConvertToFloat(base.TestCase):
|
||||
# NOTE: the routine being tested here is a plugin-specific extension
|
||||
# module. As the plugin split proceed towards its second phase this
|
||||
# test should either be remove, or the validation routine moved into
|
||||
# neutron.api.v2.attributes
|
||||
|
||||
def test_convert_to_float_positve_value(self):
|
||||
self.assertEqual(
|
||||
1.111, converters.convert_to_positive_float_or_none(1.111))
|
||||
self.assertEqual(1, converters.convert_to_positive_float_or_none(1))
|
||||
self.assertEqual(0, converters.convert_to_positive_float_or_none(0))
|
||||
|
||||
def test_convert_to_float_negative_value(self):
|
||||
self.assertRaises(n_exc.InvalidInput,
|
||||
converters.convert_to_positive_float_or_none,
|
||||
-1.11)
|
||||
|
||||
def test_convert_to_float_string(self):
|
||||
self.assertEqual(4, converters.convert_to_positive_float_or_none('4'))
|
||||
self.assertEqual(
|
||||
4.44, converters.convert_to_positive_float_or_none('4.44'))
|
||||
self.assertRaises(n_exc.InvalidInput,
|
||||
converters.convert_to_positive_float_or_none,
|
||||
'garbage')
|
||||
|
||||
def test_convert_to_float_none_value(self):
|
||||
self.assertIsNone(converters.convert_to_positive_float_or_none(None))
|
||||
|
||||
|
||||
class TestConvertKvp(base.TestCase):
|
||||
|
||||
def test_convert_kvp_list_to_dict_succeeds_for_missing_values(self):
|
||||
result = converters.convert_kvp_list_to_dict(['True'])
|
||||
self.assertEqual({}, result)
|
||||
|
||||
def test_convert_kvp_list_to_dict_succeeds_for_multiple_values(self):
|
||||
result = converters.convert_kvp_list_to_dict(
|
||||
['a=b', 'a=c', 'a=c', 'b=a'])
|
||||
expected = {'a': tools.UnorderedList(['c', 'b']), 'b': ['a']}
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
def test_convert_kvp_list_to_dict_succeeds_for_values(self):
|
||||
result = converters.convert_kvp_list_to_dict(['a=b', 'c=d'])
|
||||
self.assertEqual({'a': ['b'], 'c': ['d']}, result)
|
||||
|
||||
def test_convert_kvp_str_to_list_fails_for_missing_key(self):
|
||||
with testtools.ExpectedException(n_exc.InvalidInput):
|
||||
converters.convert_kvp_str_to_list('=a')
|
||||
|
||||
def test_convert_kvp_str_to_list_fails_for_missing_equals(self):
|
||||
with testtools.ExpectedException(n_exc.InvalidInput):
|
||||
converters.convert_kvp_str_to_list('a')
|
||||
|
||||
def test_convert_kvp_str_to_list_succeeds_for_one_equals(self):
|
||||
result = converters.convert_kvp_str_to_list('a=')
|
||||
self.assertEqual(['a', ''], result)
|
||||
|
||||
def test_convert_kvp_str_to_list_succeeds_for_two_equals(self):
|
||||
result = converters.convert_kvp_str_to_list('a=a=a')
|
||||
self.assertEqual(['a', 'a=a'], result)
|
||||
|
||||
|
||||
class TestConvertToList(base.TestCase):
|
||||
|
||||
def test_convert_to_empty_list(self):
|
||||
for item in (None, [], (), {}):
|
||||
self.assertEqual([], converters.convert_to_list(item))
|
||||
|
||||
def test_convert_to_list_string(self):
|
||||
for item in ('', 'foo'):
|
||||
self.assertEqual([item], converters.convert_to_list(item))
|
||||
|
||||
def test_convert_to_list_iterable(self):
|
||||
for item in ([None], [1, 2, 3], (1, 2, 3), set([1, 2, 3]), ['foo']):
|
||||
self.assertEqual(list(item), converters.convert_to_list(item))
|
||||
|
||||
def test_convert_to_list_non_iterable(self):
|
||||
for item in (True, False, 1, 1.2, object()):
|
||||
self.assertEqual([item], converters.convert_to_list(item))
|
840
neutron_lib/tests/unit/api/test_validators.py
Normal file
840
neutron_lib/tests/unit/api/test_validators.py
Normal file
@ -0,0 +1,840 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import string
|
||||
|
||||
import mock
|
||||
import netaddr
|
||||
|
||||
from neutron_lib._i18n import _
|
||||
from neutron_lib.api import converters
|
||||
from neutron_lib.api import validators
|
||||
from neutron_lib import constants
|
||||
from neutron_lib import exceptions as n_exc
|
||||
from neutron_lib.tests import base
|
||||
|
||||
|
||||
def dummy_validator(data, valid_values=None):
|
||||
pass
|
||||
|
||||
|
||||
class TestAttributeValidation(base.TestCase):
|
||||
|
||||
def _construct_dict_and_constraints(self):
|
||||
"""Constructs a test dictionary and a definition of constraints.
|
||||
|
||||
:return: A (dictionary, constraint) tuple
|
||||
"""
|
||||
|
||||
constraints = {'key1': {'type:values': ['val1', 'val2'],
|
||||
'required': True},
|
||||
'key2': {'type:string': None,
|
||||
'required': False},
|
||||
'key3': {'type:dict': {'k4': {'type:string': None,
|
||||
'required': True}},
|
||||
'required': True}}
|
||||
|
||||
dictionary = {'key1': 'val1',
|
||||
'key2': 'a string value',
|
||||
'key3': {'k4': 'a string value'}}
|
||||
|
||||
return dictionary, constraints
|
||||
|
||||
def test_adding_validator(self):
|
||||
validators.add_validator('new_type', dummy_validator)
|
||||
self.assertIn('type:new_type', validators.validators)
|
||||
self.assertEqual(dummy_validator,
|
||||
validators.validators['type:new_type'])
|
||||
|
||||
def test_fail_adding_duplicate_validator(self):
|
||||
self.assertRaises(KeyError,
|
||||
validators.add_validator,
|
||||
'dict', dummy_validator)
|
||||
|
||||
def test_is_attr_set(self):
|
||||
data = constants.ATTR_NOT_SPECIFIED
|
||||
self.assertIs(validators.is_attr_set(data), False)
|
||||
|
||||
data = None
|
||||
self.assertIs(validators.is_attr_set(data), False)
|
||||
|
||||
data = "I'm set"
|
||||
self.assertIs(validators.is_attr_set(data), True)
|
||||
|
||||
def test_validate_values(self):
|
||||
msg = validators.validate_values(4, [4, 6])
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = validators.validate_values(4, (4, 6))
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = validators.validate_values(7, [4, 6])
|
||||
self.assertEqual("'7' is not in [4, 6]", msg)
|
||||
|
||||
msg = validators.validate_values(7, (4, 6))
|
||||
self.assertEqual("'7' is not in (4, 6)", msg)
|
||||
|
||||
def test_validate_not_empty_string(self):
|
||||
msg = validators.validate_not_empty_string(' ', None)
|
||||
self.assertEqual(u"' ' Blank strings are not permitted", msg)
|
||||
msg = validators.validate_not_empty_string(123, None)
|
||||
self.assertEqual(u"'123' is not a valid string", msg)
|
||||
|
||||
def test_validate_not_empty_string_or_none(self):
|
||||
msg = validators.validate_not_empty_string_or_none(' ', None)
|
||||
self.assertEqual(u"' ' Blank strings are not permitted", msg)
|
||||
|
||||
msg = validators.validate_not_empty_string_or_none(None, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
def test_validate_string_or_none(self):
|
||||
msg = validators.validate_string_or_none('test', None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = validators.validate_string_or_none(None, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
def test_validate_string(self):
|
||||
msg = validators.validate_string(None, None)
|
||||
self.assertEqual("'None' is not a valid string", msg)
|
||||
|
||||
# 0 == len(data) == max_len
|
||||
msg = validators.validate_string("", 0)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# 0 == len(data) < max_len
|
||||
msg = validators.validate_string("", 9)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# 0 < len(data) < max_len
|
||||
msg = validators.validate_string("123456789", 10)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# 0 < len(data) == max_len
|
||||
msg = validators.validate_string("123456789", 9)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# 0 < max_len < len(data)
|
||||
msg = validators.validate_string("1234567890", 9)
|
||||
self.assertEqual("'1234567890' exceeds maximum length of 9", msg)
|
||||
|
||||
msg = validators.validate_string("123456789", None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
def test_validate_list_of_unique_strings(self):
|
||||
data = "TEST"
|
||||
msg = validators.validate_list_of_unique_strings(data, None)
|
||||
self.assertEqual("'TEST' is not a list", msg)
|
||||
|
||||
data = ["TEST01", "TEST02", "TEST01"]
|
||||
msg = validators.validate_list_of_unique_strings(data, None)
|
||||
self.assertEqual(
|
||||
"Duplicate items in the list: 'TEST01, TEST02, TEST01'", msg)
|
||||
|
||||
data = ["12345678", "123456789"]
|
||||
msg = validators.validate_list_of_unique_strings(data, 8)
|
||||
self.assertEqual("'123456789' exceeds maximum length of 8", msg)
|
||||
|
||||
data = ["TEST01", "TEST02", "TEST03"]
|
||||
msg = validators.validate_list_of_unique_strings(data, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
def test_validate_boolean(self):
|
||||
msg = validators.validate_boolean(True)
|
||||
self.assertIsNone(msg)
|
||||
msg = validators.validate_boolean(0)
|
||||
self.assertIsNone(msg)
|
||||
msg = validators.validate_boolean("false")
|
||||
self.assertIsNone(msg)
|
||||
msg = validators.validate_boolean("fasle")
|
||||
self.assertEqual("'fasle' is not a valid boolean value", msg)
|
||||
|
||||
def test_validate_no_whitespace(self):
|
||||
data = 'no_white_space'
|
||||
result = validators.validate_no_whitespace(data)
|
||||
self.assertEqual(data, result)
|
||||
|
||||
self.assertRaises(n_exc.InvalidInput,
|
||||
validators.validate_no_whitespace,
|
||||
'i have whitespace')
|
||||
|
||||
self.assertRaises(n_exc.InvalidInput,
|
||||
validators.validate_no_whitespace,
|
||||
'i\thave\twhitespace')
|
||||
|
||||
for ws in string.whitespace:
|
||||
self.assertRaises(n_exc.InvalidInput,
|
||||
validators.validate_no_whitespace,
|
||||
'%swhitespace-at-head' % ws)
|
||||
self.assertRaises(n_exc.InvalidInput,
|
||||
validators.validate_no_whitespace,
|
||||
'whitespace-at-tail%s' % ws)
|
||||
|
||||
def test_validate_range(self):
|
||||
msg = validators.validate_range(1, [1, 9])
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = validators.validate_range(5, [1, 9])
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = validators.validate_range(9, [1, 9])
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = validators.validate_range(1, (1, 9))
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = validators.validate_range(5, (1, 9))
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = validators.validate_range(9, (1, 9))
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = validators.validate_range(0, [1, 9])
|
||||
self.assertEqual("'0' is too small - must be at least '1'", msg)
|
||||
|
||||
msg = validators.validate_range(10, (1, 9))
|
||||
self.assertEqual("'10' is too large - must be no larger than '9'", msg)
|
||||
|
||||
msg = validators.validate_range("bogus", (1, 9))
|
||||
self.assertEqual("'bogus' is not an integer", msg)
|
||||
|
||||
msg = validators.validate_range(10, (validators.UNLIMITED,
|
||||
validators.UNLIMITED))
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = validators.validate_range(10, (1, validators.UNLIMITED))
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = validators.validate_range(1, (validators.UNLIMITED, 9))
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = validators.validate_range(-1, (0, validators.UNLIMITED))
|
||||
self.assertEqual("'-1' is too small - must be at least '0'", msg)
|
||||
|
||||
msg = validators.validate_range(10, (validators.UNLIMITED, 9))
|
||||
self.assertEqual("'10' is too large - must be no larger than '9'", msg)
|
||||
|
||||
def _test_validate_mac_address(self, validator, allow_none=False):
|
||||
mac_addr = "ff:16:3e:4f:00:00"
|
||||
msg = validator(mac_addr)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
mac_addr = "ffa:16:3e:4f:00:00"
|
||||
msg = validator(mac_addr)
|
||||
err_msg = "'%s' is not a valid MAC address"
|
||||
self.assertEqual(err_msg % mac_addr, msg)
|
||||
|
||||
for invalid_mac_addr in constants.INVALID_MAC_ADDRESSES:
|
||||
msg = validator(invalid_mac_addr)
|
||||
self.assertEqual(err_msg % invalid_mac_addr, msg)
|
||||
|
||||
mac_addr = "123"
|
||||
msg = validator(mac_addr)
|
||||
self.assertEqual(err_msg % mac_addr, msg)
|
||||
|
||||
mac_addr = None
|
||||
msg = validator(mac_addr)
|
||||
if allow_none:
|
||||
self.assertIsNone(msg)
|
||||
else:
|
||||
self.assertEqual(err_msg % mac_addr, msg)
|
||||
|
||||
mac_addr = "ff:16:3e:4f:00:00\r"
|
||||
msg = validator(mac_addr)
|
||||
self.assertEqual(err_msg % mac_addr, msg)
|
||||
|
||||
def test_validate_mac_address(self):
|
||||
self._test_validate_mac_address(validators.validate_mac_address)
|
||||
|
||||
def test_validate_mac_address_or_none(self):
|
||||
self._test_validate_mac_address(
|
||||
validators.validate_mac_address_or_none, allow_none=True)
|
||||
|
||||
def test_validate_ip_address(self):
|
||||
ip_addr = '1.1.1.1'
|
||||
msg = validators.validate_ip_address(ip_addr)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
ip_addr = '1111.1.1.1'
|
||||
msg = validators.validate_ip_address(ip_addr)
|
||||
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
|
||||
|
||||
# Depending on platform to run UTs, this case might or might not be
|
||||
# an equivalent to test_validate_ip_address_bsd.
|
||||
ip_addr = '1' * 59
|
||||
msg = validators.validate_ip_address(ip_addr)
|
||||
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
|
||||
|
||||
ip_addr = '1.1.1.1 has whitespace'
|
||||
msg = validators.validate_ip_address(ip_addr)
|
||||
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
|
||||
|
||||
ip_addr = '111.1.1.1\twhitespace'
|
||||
msg = validators.validate_ip_address(ip_addr)
|
||||
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
|
||||
|
||||
ip_addr = '111.1.1.1\nwhitespace'
|
||||
msg = validators.validate_ip_address(ip_addr)
|
||||
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
|
||||
|
||||
for ws in string.whitespace:
|
||||
ip_addr = '%s111.1.1.1' % ws
|
||||
msg = validators.validate_ip_address(ip_addr)
|
||||
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
|
||||
|
||||
for ws in string.whitespace:
|
||||
ip_addr = '111.1.1.1%s' % ws
|
||||
msg = validators.validate_ip_address(ip_addr)
|
||||
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
|
||||
|
||||
def test_validate_ip_address_with_leading_zero(self):
|
||||
ip_addr = '1.1.1.01'
|
||||
expected_msg = ("'%(data)s' is not an accepted IP address, "
|
||||
"'%(ip)s' is recommended")
|
||||
msg = validators.validate_ip_address(ip_addr)
|
||||
self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.1'},
|
||||
msg)
|
||||
|
||||
ip_addr = '1.1.1.011'
|
||||
msg = validators.validate_ip_address(ip_addr)
|
||||
self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.11'},
|
||||
msg)
|
||||
|
||||
ip_addr = '1.1.1.09'
|
||||
msg = validators.validate_ip_address(ip_addr)
|
||||
self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.9'},
|
||||
msg)
|
||||
|
||||
ip_addr = "fe80:0:0:0:0:0:0:0001"
|
||||
msg = validators.validate_ip_address(ip_addr)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
def test_validate_ip_address_bsd(self):
|
||||
# NOTE(yamamoto): On NetBSD and OS X, netaddr.IPAddress() accepts
|
||||
# '1' * 59 as a valid address. The behaviour is inherited from
|
||||
# libc behaviour there. This test ensures that our validator reject
|
||||
# such addresses on such platforms by mocking netaddr to emulate
|
||||
# the behaviour.
|
||||
ip_addr = '1' * 59
|
||||
with mock.patch('netaddr.IPAddress') as ip_address_cls:
|
||||
msg = validators.validate_ip_address(ip_addr)
|
||||
ip_address_cls.assert_called_once_with(ip_addr,
|
||||
flags=netaddr.core.ZEROFILL)
|
||||
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
|
||||
|
||||
def test_validate_ip_pools(self):
|
||||
pools = [[{'end': '10.0.0.254'}],
|
||||
[{'start': '10.0.0.254'}],
|
||||
[{'start': '1000.0.0.254',
|
||||
'end': '1.1.1.1'}],
|
||||
[{'start': '10.0.0.2', 'end': '10.0.0.254',
|
||||
'forza': 'juve'}],
|
||||
[{'start': '10.0.0.2', 'end': '10.0.0.254'},
|
||||
{'end': '10.0.0.254'}],
|
||||
[None],
|
||||
None]
|
||||
for pool in pools:
|
||||
msg = validators.validate_ip_pools(pool)
|
||||
self.assertIsNotNone(msg)
|
||||
|
||||
pools = [[{'end': '10.0.0.254', 'start': '10.0.0.2'},
|
||||
{'start': '11.0.0.2', 'end': '11.1.1.1'}],
|
||||
[{'start': '11.0.0.2', 'end': '11.0.0.100'}]]
|
||||
for pool in pools:
|
||||
msg = validators.validate_ip_pools(pool)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
invalid_ip = '10.0.0.2\r'
|
||||
pools = [[{'end': '10.0.0.254', 'start': invalid_ip}]]
|
||||
for pool in pools:
|
||||
msg = validators.validate_ip_pools(pool)
|
||||
self.assertEqual("'%s' is not a valid IP address" % invalid_ip,
|
||||
msg)
|
||||
|
||||
def test_validate_fixed_ips(self):
|
||||
fixed_ips = [
|
||||
{'data': [{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
|
||||
'ip_address': '1111.1.1.1'}],
|
||||
'error_msg': "'1111.1.1.1' is not a valid IP address"},
|
||||
{'data': [{'subnet_id': 'invalid',
|
||||
'ip_address': '1.1.1.1'}],
|
||||
'error_msg': "'invalid' is not a valid UUID"},
|
||||
{'data': None,
|
||||
'error_msg': "Invalid data format for fixed IP: 'None'"},
|
||||
{'data': "1.1.1.1",
|
||||
'error_msg': "Invalid data format for fixed IP: '1.1.1.1'"},
|
||||
{'data': ['00000000-ffff-ffff-ffff-000000000000', '1.1.1.1'],
|
||||
'error_msg': "Invalid data format for fixed IP: "
|
||||
"'00000000-ffff-ffff-ffff-000000000000'"},
|
||||
{'data': [['00000000-ffff-ffff-ffff-000000000000', '1.1.1.1']],
|
||||
'error_msg': "Invalid data format for fixed IP: "
|
||||
"'['00000000-ffff-ffff-ffff-000000000000', "
|
||||
"'1.1.1.1']'"},
|
||||
{'data': [{'subnet_id': '00000000-0fff-ffff-ffff-000000000000',
|
||||
'ip_address': '1.1.1.1'},
|
||||
{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
|
||||
'ip_address': '1.1.1.1'}],
|
||||
'error_msg': "Duplicate IP address '1.1.1.1'"}]
|
||||
for fixed in fixed_ips:
|
||||
msg = validators.validate_fixed_ips(fixed['data'])
|
||||
self.assertEqual(fixed['error_msg'], msg)
|
||||
|
||||
fixed_ips = [[{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
|
||||
'ip_address': '1.1.1.1'}],
|
||||
[{'subnet_id': '00000000-0fff-ffff-ffff-000000000000',
|
||||
'ip_address': '1.1.1.1'},
|
||||
{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
|
||||
'ip_address': '1.1.1.2'}]]
|
||||
for fixed in fixed_ips:
|
||||
msg = validators.validate_fixed_ips(fixed)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
def test_validate_nameservers(self):
|
||||
ns_pools = [['1.1.1.2', '1.1.1.2'],
|
||||
['www.hostname.com', 'www.hostname.com'],
|
||||
['1000.0.0.1'],
|
||||
['www.hostname.com'],
|
||||
['www.great.marathons.to.travel'],
|
||||
['valid'],
|
||||
['77.hostname.com'],
|
||||
['1' * 59],
|
||||
['www.internal.hostname.com'],
|
||||
None]
|
||||
|
||||
for ns in ns_pools:
|
||||
msg = validators.validate_nameservers(ns, None)
|
||||
self.assertIsNotNone(msg)
|
||||
|
||||
ns_pools = [['100.0.0.2'],
|
||||
['1.1.1.1', '1.1.1.2']]
|
||||
|
||||
for ns in ns_pools:
|
||||
msg = validators.validate_nameservers(ns, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
def test_validate_hostroutes(self):
|
||||
hostroute_pools = [[{'destination': '100.0.0.0/24'}],
|
||||
[{'nexthop': '10.0.2.20'}],
|
||||
[{'nexthop': '10.0.2.20',
|
||||
'forza': 'juve',
|
||||
'destination': '100.0.0.0/8'}],
|
||||
[{'nexthop': '1110.0.2.20',
|
||||
'destination': '100.0.0.0/8'}],
|
||||
[{'nexthop': '10.0.2.20',
|
||||
'destination': '100.0.0.0'}],
|
||||
[{'nexthop': '10.0.2.20',
|
||||
'destination': '100.0.0.0/8'},
|
||||
{'nexthop': '10.0.2.20',
|
||||
'destination': '100.0.0.0/8'}],
|
||||
[None],
|
||||
None]
|
||||
for host_routes in hostroute_pools:
|
||||
msg = validators.validate_hostroutes(host_routes, None)
|
||||
self.assertIsNotNone(msg)
|
||||
|
||||
hostroute_pools = [[{'destination': '100.0.0.0/24',
|
||||
'nexthop': '10.0.2.20'}],
|
||||
[{'nexthop': '10.0.2.20',
|
||||
'destination': '100.0.0.0/8'},
|
||||
{'nexthop': '10.0.2.20',
|
||||
'destination': '101.0.0.0/8'}]]
|
||||
for host_routes in hostroute_pools:
|
||||
msg = validators.validate_hostroutes(host_routes, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
def test_validate_ip_address_or_none(self):
|
||||
ip_addr = None
|
||||
msg = validators.validate_ip_address_or_none(ip_addr)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
ip_addr = '1.1.1.1'
|
||||
msg = validators.validate_ip_address_or_none(ip_addr)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
ip_addr = '1111.1.1.1'
|
||||
msg = validators.validate_ip_address_or_none(ip_addr)
|
||||
self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
|
||||
|
||||
def test_uuid_pattern(self):
|
||||
data = 'garbage'
|
||||
msg = validators.validate_regex(data, constants.UUID_PATTERN)
|
||||
self.assertIsNotNone(msg)
|
||||
|
||||
data = '00000000-ffff-ffff-ffff-000000000000'
|
||||
msg = validators.validate_regex(data, constants.UUID_PATTERN)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
def test_mac_pattern(self):
|
||||
# Valid - 3 octets
|
||||
base_mac = "fa:16:3e:00:00:00"
|
||||
msg = validators.validate_regex(base_mac, validators.MAC_PATTERN)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# Valid - 4 octets
|
||||
base_mac = "fa:16:3e:4f:00:00"
|
||||
msg = validators.validate_regex(base_mac, validators.MAC_PATTERN)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# Invalid - not unicast
|
||||
base_mac = "01:16:3e:4f:00:00"
|
||||
msg = validators.validate_regex(base_mac, validators.MAC_PATTERN)
|
||||
self.assertIsNotNone(msg)
|
||||
|
||||
# Invalid - invalid format
|
||||
base_mac = "a:16:3e:4f:00:00"
|
||||
msg = validators.validate_regex(base_mac, validators.MAC_PATTERN)
|
||||
self.assertIsNotNone(msg)
|
||||
|
||||
# Invalid - invalid format
|
||||
base_mac = "ffa:16:3e:4f:00:00"
|
||||
msg = validators.validate_regex(base_mac, validators.MAC_PATTERN)
|
||||
self.assertIsNotNone(msg)
|
||||
|
||||
# Invalid - invalid format
|
||||
base_mac = "01163e4f0000"
|
||||
msg = validators.validate_regex(base_mac, validators.MAC_PATTERN)
|
||||
self.assertIsNotNone(msg)
|
||||
|
||||
# Invalid - invalid format
|
||||
base_mac = "01-16-3e-4f-00-00"
|
||||
msg = validators.validate_regex(base_mac, validators.MAC_PATTERN)
|
||||
self.assertIsNotNone(msg)
|
||||
|
||||
# Invalid - invalid format
|
||||
base_mac = "00:16:3:f:00:00"
|
||||
msg = validators.validate_regex(base_mac, validators.MAC_PATTERN)
|
||||
self.assertIsNotNone(msg)
|
||||
|
||||
# Invalid - invalid format
|
||||
base_mac = "12:3:4:5:67:89ab"
|
||||
msg = validators.validate_regex(base_mac, validators.MAC_PATTERN)
|
||||
self.assertIsNotNone(msg)
|
||||
|
||||
def _test_validate_subnet(self, validator, allow_none=False):
|
||||
# Valid - IPv4
|
||||
cidr = "10.0.2.0/24"
|
||||
msg = validator(cidr, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# Valid - IPv6 without final octets
|
||||
cidr = "fe80::/24"
|
||||
msg = validator(cidr, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# Valid - IPv6 with final octets
|
||||
cidr = "fe80::/24"
|
||||
msg = validator(cidr, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# Valid - uncompressed ipv6 address
|
||||
cidr = "fe80:0:0:0:0:0:0:0/128"
|
||||
msg = validator(cidr, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# Valid - ipv6 address with multiple consecutive zero
|
||||
cidr = "2001:0db8:0:0:1::1/128"
|
||||
msg = validator(cidr, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# Valid - ipv6 address with multiple consecutive zero
|
||||
cidr = "2001:0db8::1:0:0:1/128"
|
||||
msg = validator(cidr, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# Valid - ipv6 address with multiple consecutive zero
|
||||
cidr = "2001::0:1:0:0:1100/120"
|
||||
msg = validator(cidr, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# Invalid - abbreviated ipv4 address
|
||||
cidr = "10/24"
|
||||
msg = validator(cidr, None)
|
||||
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
|
||||
" '%(cidr)s' is recommended") % {"data": cidr,
|
||||
"cidr": "10.0.0.0/24"}
|
||||
self.assertEqual(error, msg)
|
||||
|
||||
# Invalid - IPv4 missing mask
|
||||
cidr = "10.0.2.0"
|
||||
msg = validator(cidr, None)
|
||||
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
|
||||
" '%(cidr)s' is recommended") % {"data": cidr,
|
||||
"cidr": "10.0.2.0/32"}
|
||||
self.assertEqual(error, msg)
|
||||
|
||||
# Valid - IPv4 with non-zero masked bits is ok
|
||||
for i in range(1, 255):
|
||||
cidr = "192.168.1.%s/24" % i
|
||||
msg = validator(cidr, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# Invalid - IPv6 without final octets, missing mask
|
||||
cidr = "fe80::"
|
||||
msg = validator(cidr, None)
|
||||
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
|
||||
" '%(cidr)s' is recommended") % {"data": cidr,
|
||||
"cidr": "fe80::/128"}
|
||||
self.assertEqual(error, msg)
|
||||
|
||||
# Invalid - IPv6 with final octets, missing mask
|
||||
cidr = "fe80::0"
|
||||
msg = validator(cidr, None)
|
||||
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
|
||||
" '%(cidr)s' is recommended") % {"data": cidr,
|
||||
"cidr": "fe80::/128"}
|
||||
self.assertEqual(error, msg)
|
||||
|
||||
# Invalid - Address format error
|
||||
cidr = 'invalid'
|
||||
msg = validator(cidr, None)
|
||||
error = "'%s' is not a valid IP subnet" % cidr
|
||||
self.assertEqual(error, msg)
|
||||
|
||||
cidr = None
|
||||
msg = validator(cidr, None)
|
||||
if allow_none:
|
||||
self.assertIsNone(msg)
|
||||
else:
|
||||
error = "'%s' is not a valid IP subnet" % cidr
|
||||
self.assertEqual(error, msg)
|
||||
|
||||
# Invalid - IPv4 with trailing CR
|
||||
cidr = "10.0.2.0/24\r"
|
||||
msg = validator(cidr, None)
|
||||
error = "'%s' is not a valid IP subnet" % cidr
|
||||
self.assertEqual(error, msg)
|
||||
|
||||
def test_validate_subnet(self):
|
||||
self._test_validate_subnet(validators.validate_subnet)
|
||||
|
||||
def test_validate_subnet_or_none(self):
|
||||
self._test_validate_subnet(validators.validate_subnet_or_none,
|
||||
allow_none=True)
|
||||
|
||||
def test_validate_subnet_list(self):
|
||||
msg = validators.validate_subnet_list('abc')
|
||||
self.assertEqual(u"'abc' is not a list", msg)
|
||||
msg = validators.validate_subnet_list(['10.1.0.0/24',
|
||||
'10.2.0.0/24',
|
||||
'10.1.0.0/24'])
|
||||
self.assertEqual(u"Duplicate items in the list: '10.1.0.0/24, "
|
||||
u"10.2.0.0/24, 10.1.0.0/24'", msg)
|
||||
msg = validators.validate_subnet_list(['10.1.0.0/24', '10.2.0.0'])
|
||||
self.assertEqual(u"'10.2.0.0' isn't a recognized IP subnet cidr, "
|
||||
u"'10.2.0.0/32' is recommended", msg)
|
||||
|
||||
def _test_validate_regex(self, validator, allow_none=False):
|
||||
pattern = '[hc]at'
|
||||
|
||||
data = None
|
||||
msg = validator(data, pattern)
|
||||
if allow_none:
|
||||
self.assertIsNone(msg)
|
||||
else:
|
||||
self.assertEqual("'None' is not a valid input", msg)
|
||||
|
||||
data = 'bat'
|
||||
msg = validator(data, pattern)
|
||||
self.assertEqual("'%s' is not a valid input" % data, msg)
|
||||
|
||||
data = 'hat'
|
||||
msg = validator(data, pattern)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
data = 'cat'
|
||||
msg = validator(data, pattern)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
def test_validate_regex(self):
|
||||
self._test_validate_regex(validators.validate_regex)
|
||||
|
||||
def test_validate_regex_or_none(self):
|
||||
self._test_validate_regex(validators.validate_regex_or_none,
|
||||
allow_none=True)
|
||||
|
||||
def test_validate_subnetpool_id(self):
|
||||
msg = validators.validate_subnetpool_id(constants.IPV6_PD_POOL_ID)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = validators.validate_subnetpool_id(
|
||||
'00000000-ffff-ffff-ffff-000000000000')
|
||||
self.assertIsNone(msg)
|
||||
|
||||
def test_validate_subnetpool_id_or_none(self):
|
||||
msg = validators.validate_subnetpool_id_or_none(None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = validators.validate_subnetpool_id_or_none(
|
||||
'00000000-ffff-ffff-ffff-000000000000')
|
||||
self.assertIsNone(msg)
|
||||
|
||||
def test_validate_uuid(self):
|
||||
invalid_uuids = [None,
|
||||
123,
|
||||
'123',
|
||||
't5069610-744b-42a7-8bd8-ceac1a229cd4',
|
||||
'e5069610-744bb-42a7-8bd8-ceac1a229cd4']
|
||||
for uuid in invalid_uuids:
|
||||
msg = validators.validate_uuid(uuid)
|
||||
error = "'%s' is not a valid UUID" % uuid
|
||||
self.assertEqual(error, msg)
|
||||
|
||||
msg = validators.validate_uuid('00000000-ffff-ffff-ffff-000000000000')
|
||||
self.assertIsNone(msg)
|
||||
|
||||
def test__validate_list_of_items(self):
|
||||
# check not a list
|
||||
items = [None,
|
||||
123,
|
||||
'e5069610-744b-42a7-8bd8-ceac1a229cd4',
|
||||
'12345678123456781234567812345678',
|
||||
{'uuid': 'e5069610-744b-42a7-8bd8-ceac1a229cd4'}]
|
||||
for item in items:
|
||||
msg = validators._validate_list_of_items(mock.Mock(), item)
|
||||
error = "'%s' is not a list" % item
|
||||
self.assertEqual(error, msg)
|
||||
|
||||
# check duplicate items in a list
|
||||
duplicate_items = ['e5069610-744b-42a7-8bd8-ceac1a229cd4',
|
||||
'f3eeab00-8367-4524-b662-55e64d4cacb5',
|
||||
'e5069610-744b-42a7-8bd8-ceac1a229cd4']
|
||||
msg = validators._validate_list_of_items(mock.Mock(), duplicate_items)
|
||||
error = ("Duplicate items in the list: "
|
||||
"'%s'" % ', '.join(duplicate_items))
|
||||
self.assertEqual(error, msg)
|
||||
|
||||
# check valid lists
|
||||
valid_lists = [[],
|
||||
[1, 2, 3],
|
||||
['a', 'b', 'c']]
|
||||
for list_obj in valid_lists:
|
||||
msg = validators._validate_list_of_items(
|
||||
mock.Mock(return_value=None), list_obj)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
def test_validate_dict_type(self):
|
||||
for value in (None, True, '1', []):
|
||||
self.assertEqual("'%s' is not a dictionary" % value,
|
||||
validators.validate_dict(value))
|
||||
|
||||
def test_validate_dict_without_constraints(self):
|
||||
msg = validators.validate_dict({})
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# Validate a dictionary without constraints.
|
||||
msg = validators.validate_dict({'key': 'value'})
|
||||
self.assertIsNone(msg)
|
||||
|
||||
def test_validate_a_valid_dict_with_constraints(self):
|
||||
dictionary, constraints = self._construct_dict_and_constraints()
|
||||
|
||||
msg = validators.validate_dict(dictionary, constraints)
|
||||
self.assertIsNone(msg, 'Validation of a valid dictionary failed.')
|
||||
|
||||
def test_validate_dict_with_invalid_validator(self):
|
||||
dictionary, constraints = self._construct_dict_and_constraints()
|
||||
|
||||
constraints['key1'] = {'type:unsupported': None, 'required': True}
|
||||
msg = validators.validate_dict(dictionary, constraints)
|
||||
self.assertEqual("Validator 'type:unsupported' does not exist.", msg)
|
||||
|
||||
def test_validate_dict_not_required_keys(self):
|
||||
dictionary, constraints = self._construct_dict_and_constraints()
|
||||
|
||||
del dictionary['key2']
|
||||
msg = validators.validate_dict(dictionary, constraints)
|
||||
self.assertIsNone(msg, 'Field that was not required by the specs was'
|
||||
'required by the validator.')
|
||||
|
||||
def test_validate_dict_required_keys(self):
|
||||
dictionary, constraints = self._construct_dict_and_constraints()
|
||||
|
||||
del dictionary['key1']
|
||||
msg = validators.validate_dict(dictionary, constraints)
|
||||
self.assertIn('Expected keys:', msg)
|
||||
|
||||
def test_validate_dict_wrong_values(self):
|
||||
dictionary, constraints = self._construct_dict_and_constraints()
|
||||
|
||||
dictionary['key1'] = 'UNSUPPORTED'
|
||||
msg = validators.validate_dict(dictionary, constraints)
|
||||
self.assertIsNotNone(msg)
|
||||
|
||||
def test_validate_dict_convert_boolean(self):
|
||||
dictionary, constraints = self._construct_dict_and_constraints()
|
||||
|
||||
constraints['key_bool'] = {
|
||||
'type:boolean': None,
|
||||
'required': False,
|
||||
'convert_to': converters.convert_to_boolean}
|
||||
dictionary['key_bool'] = 'true'
|
||||
msg = validators.validate_dict(dictionary, constraints)
|
||||
self.assertIsNone(msg)
|
||||
# Explicitly comparing with literal 'True' as assertTrue
|
||||
# succeeds also for 'true'
|
||||
self.assertIs(True, dictionary['key_bool'])
|
||||
|
||||
def test_subdictionary(self):
|
||||
dictionary, constraints = self._construct_dict_and_constraints()
|
||||
|
||||
del dictionary['key3']['k4']
|
||||
dictionary['key3']['k5'] = 'a string value'
|
||||
msg = validators.validate_dict(dictionary, constraints)
|
||||
self.assertIn('Expected keys:', msg)
|
||||
|
||||
def test_validate_dict_or_none(self):
|
||||
dictionary, constraints = self._construct_dict_and_constraints()
|
||||
|
||||
# Check whether None is a valid value.
|
||||
msg = validators.validate_dict_or_none(None, constraints)
|
||||
self.assertIsNone(msg, 'Validation of a None dictionary failed.')
|
||||
|
||||
# Check validation of a regular dictionary.
|
||||
msg = validators.validate_dict_or_none(dictionary, constraints)
|
||||
self.assertIsNone(msg, 'Validation of a valid dictionary failed.')
|
||||
|
||||
def test_validate_dict_or_empty(self):
|
||||
dictionary, constraints = self._construct_dict_and_constraints()
|
||||
|
||||
# Check whether an empty dictionary is valid.
|
||||
msg = validators.validate_dict_or_empty({}, constraints)
|
||||
self.assertIsNone(msg, 'Validation of a None dictionary failed.')
|
||||
|
||||
# Check validation of a regular dictionary.
|
||||
msg = validators.validate_dict_or_empty(dictionary, constraints)
|
||||
self.assertIsNone(msg, 'Validation of a valid dictionary failed.')
|
||||
|
||||
def test_validate_dict_or_nodata(self):
|
||||
dictionary, constraints = self._construct_dict_and_constraints()
|
||||
|
||||
# Check whether no data is a valid value.
|
||||
msg = validators.validate_dict_or_nodata(None, constraints)
|
||||
self.assertIsNone(msg, 'Validation of None for no-data failed.')
|
||||
msg = validators.validate_dict_or_nodata({}, constraints)
|
||||
self.assertIsNone(msg, 'Validation of empty dict for no-data failed.')
|
||||
|
||||
# Check validation of a regular dictionary.
|
||||
msg = validators.validate_dict_or_nodata(dictionary, constraints)
|
||||
self.assertIsNone(msg, 'Validation of a valid dictionary failed.')
|
||||
|
||||
def test_validate_non_negative(self):
|
||||
msg = validators.validate_non_negative('abc')
|
||||
self.assertEqual("'abc' is not an integer", msg)
|
||||
|
||||
for value in (-1, '-2'):
|
||||
self.assertEqual("'%s' should be non-negative" % value,
|
||||
validators.validate_non_negative(value))
|
||||
|
||||
for value in (0, 1, '2', True, False):
|
||||
msg = validators.validate_non_negative(value)
|
||||
self.assertIsNone(msg)
|
Loading…
x
Reference in New Issue
Block a user