From 7d9980f7cab92cc70b194c09b8aae510e945bfaf Mon Sep 17 00:00:00 2001 From: Paul Michali Date: Thu, 17 Dec 2015 14:45:03 +0000 Subject: [PATCH] API validators and converters This brings over the validation and convert_to methods from Neutron. Additional tests were added to improve coverage, and some supporting methods were added for these. Renamed validators to remove underscore, so that they can be imported into neutron/api/v2/attributes.py. Added devref docs to help developers in creating and using validators and converters. Change-Id: I81394dff69b816146e521bcd3e9641761178d6fd Implements: blueprint neutron-lib --- doc/source/devref/api_converters.rst | 86 ++ doc/source/devref/api_validators.rst | 102 +++ doc/source/devref/index.rst | 2 + neutron_lib/api/__init__.py | 0 neutron_lib/api/converters.py | 119 +++ neutron_lib/api/validators.py | 536 +++++++++++ neutron_lib/constants.py | 14 + neutron_lib/tests/tools.py | 38 + neutron_lib/tests/unit/api/__init__.py | 0 .../tests/unit/api/test_conversions.py | 169 ++++ neutron_lib/tests/unit/api/test_validators.py | 840 ++++++++++++++++++ 11 files changed, 1906 insertions(+) create mode 100644 doc/source/devref/api_converters.rst create mode 100644 doc/source/devref/api_validators.rst create mode 100644 neutron_lib/api/__init__.py create mode 100644 neutron_lib/api/converters.py create mode 100644 neutron_lib/api/validators.py create mode 100644 neutron_lib/tests/tools.py create mode 100644 neutron_lib/tests/unit/api/__init__.py create mode 100644 neutron_lib/tests/unit/api/test_conversions.py create mode 100644 neutron_lib/tests/unit/api/test_validators.py diff --git a/doc/source/devref/api_converters.rst b/doc/source/devref/api_converters.rst new file mode 100644 index 000000000..a44cf5a99 --- /dev/null +++ b/doc/source/devref/api_converters.rst @@ -0,0 +1,86 @@ +.. + Licensed under the Apache License, Version 2.0 (the "License"); you may + not use this file except in compliance with the License. You may obtain + a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + + + Convention for heading levels in Neutron devref: + ======= Heading 0 (reserved for the title in a document) + ------- Heading 1 + ~~~~~~~ Heading 2 + +++++++ Heading 3 + ''''''' Heading 4 + (Avoid deeper levels because they do not render well.) + + +API Converters +============== + +Defintions for REST API attributes, can include conversion methods +to help normalize user input or transform the input into a form that +can be used. + + +Defining A Converter Method +--------------------------- + +By convention, the name should start with ``convert_to_``, and will +take a single argument for the data to be converted. The method +should return the converted data (which, if the input is None, +and no conversion is performed, the implicit None returned by the +method may be used). If the conversion is impossible, an +InvalidInput exception should be raised, indicating what is wrong. +For example, here is one that converts a variety of user inputs +to a boolean value. +:: + + def convert_to_boolean(data): + if isinstance(data, six.string_types): + val = data.lower() + if val == "true" or val == "1": + return True + if val == "false" or val == "0": + return False + elif isinstance(data, bool): + return data + elif isinstance(data, int): + if data == 0: + return False + elif data == 1: + return True + msg = _("'%s' cannot be converted to boolean") % data + raise n_exc.InvalidInput(error_message=msg) + + +Using Validators +---------------- + +In client code, the conversion can be used in a REST API +definition, by specifying the name of the method as a value for +the 'convert_to' key on an attribute. For example: + +:: + + 'admin_state_up': {'allow_post': True, 'allow_put': True, + 'default': True, + 'convert_to': conversions.convert_to_boolean, + 'is_visible': True}, + +Here, the admin_state_up is a boolean, so the converter is used to +take user's (string) input and transform it to a boolean. + + +Test The Validator +------------------ + +Do the right thing, and make sure you've created a unit test for any +converter that you add to verify that it works as expected. + diff --git a/doc/source/devref/api_validators.rst b/doc/source/devref/api_validators.rst new file mode 100644 index 000000000..cf87d2f30 --- /dev/null +++ b/doc/source/devref/api_validators.rst @@ -0,0 +1,102 @@ +.. + Licensed under the Apache License, Version 2.0 (the "License"); you may + not use this file except in compliance with the License. You may obtain + a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + + + Convention for heading levels in Neutron devref: + ======= Heading 0 (reserved for the title in a document) + ------- Heading 1 + ~~~~~~~ Heading 2 + +++++++ Heading 3 + ''''''' Heading 4 + (Avoid deeper levels because they do not render well.) + + +API Validators +============== + +For the REST API, attributes may have custom validators defined. Each +validator will have a method to perform the validation, and a type +definition string, so that the validator can be referenced. + + +Defining A Validator Method +--------------------------- + +The validation method will have a positional argument for the data to +be validated, and may have additional (optional) keyword arguments that +can be used during validation. The method must handle any exceptions +and either return None (success) or a i18n string indicating the +validation failure message. By convention, the method name is prefixed +with ``validate_`` and then includes the data type. For example: + +:: + + def validate_uuid(data, valid_values=None): + if not uuidutils.is_uuid_like(data): + msg = _("'%s' is not a valid UUID") % data + LOG.debug(msg) + return msg + +There is a validation dictionary that maps the method to a validation +type that can be referred to in REST API definitions. An entry in the +dictionary would look like the following: + +:: + + 'type:uuid': validate_uuid, + + +Using Validators +---------------- + +In client code, the valdiator can be used in a REST API by using the +dictionary key for the validator. For example: + +:: + + RESOURCE_ATTRIBUTE_MAP = { + NETWORKS: { + 'id': {'allow_post': False, 'allow_put': False, + 'validate': {'type:uuid': None}, + 'is_visible': True, + 'primary_key': True}, + 'name': {'allow_post': True, 'allow_put': True, + 'validate': {'type:string': NAME_MAX_LEN}, + 'default': '', 'is_visible': True}, + +Here, the networks resource has an 'id' attribute with a UUID validator, +as seen by the 'validate' key containing a dictionary with a key of +'type:uuid'. + +Any addition arguments for the validator can be specified as values for +the dictionary entry (None in this case, NAME_MAX_LEN in the 'name' +attribute that uses a string validator). In a IP version attribute, one +could have a validator defined as follows: + +:: + + 'ip_version': {'allow_post': True, 'allow_put': False, + 'convert_to': conversions.convert_to_int, + 'validate': {'type:values': [4, 6]}, + 'is_visible': True}, + +Here, the valdiate_values() method will take the list of values as the +allowable values that can be specified for this attribute. + +Test The Validator +------------------ + +Do the right thing, and make sure you've created a unit test for any +validator that you add to verify that it works as expected, even for +simple validators. + diff --git a/doc/source/devref/index.rst b/doc/source/devref/index.rst index 835ec344c..948445de8 100644 --- a/doc/source/devref/index.rst +++ b/doc/source/devref/index.rst @@ -32,6 +32,8 @@ Neutron Lib Internals .. toctree:: :maxdepth: 3 + api_converters + api_validators callbacks diff --git a/neutron_lib/api/__init__.py b/neutron_lib/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neutron_lib/api/converters.py b/neutron_lib/api/converters.py new file mode 100644 index 000000000..ad4b61a0f --- /dev/null +++ b/neutron_lib/api/converters.py @@ -0,0 +1,119 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six + +from neutron_lib._i18n import _ +from neutron_lib import exceptions as n_exc + + +def convert_to_boolean(data): + if isinstance(data, six.string_types): + val = data.lower() + if val == "true" or val == "1": + return True + if val == "false" or val == "0": + return False + elif isinstance(data, bool): + return data + elif isinstance(data, int): + if data == 0: + return False + elif data == 1: + return True + msg = _("'%s' cannot be converted to boolean") % data + raise n_exc.InvalidInput(error_message=msg) + + +def convert_to_boolean_if_not_none(data): + if data is not None: + return convert_to_boolean(data) + + +def convert_to_int(data): + try: + return int(data) + except (ValueError, TypeError): + msg = _("'%s' is not an integer") % data + raise n_exc.InvalidInput(error_message=msg) + + +def convert_to_int_if_not_none(data): + if data is not None: + return convert_to_int(data) + return data + + +def convert_to_positive_float_or_none(val): + # NOTE(salv-orlando): This conversion function is currently used by + # a vendor specific extension only at the moment It is used for + # port's RXTX factor in neutron.plugins.vmware.extensions.qos. + # It is deemed however generic enough to be in this module as it + # might be used in future for other API attributes. + if val is None: + return + try: + val = float(val) + if val < 0: + raise ValueError() + except (ValueError, TypeError): + msg = _("'%s' must be a non negative decimal.") % val + raise n_exc.InvalidInput(error_message=msg) + return val + + +def convert_kvp_str_to_list(data): + """Convert a value of the form 'key=value' to ['key', 'value']. + + :raises: n_exc.InvalidInput if any of the strings are malformed + (e.g. do not contain a key). + """ + kvp = [x.strip() for x in data.split('=', 1)] + if len(kvp) == 2 and kvp[0]: + return kvp + msg = _("'%s' is not of the form =[value]") % data + raise n_exc.InvalidInput(error_message=msg) + + +def convert_kvp_list_to_dict(kvp_list): + """Convert a list of 'key=value' strings to a dict. + + :raises: n_exc.InvalidInput if any of the strings are malformed + (e.g. do not contain a key) or if any + of the keys appear more than once. + """ + if kvp_list == ['True']: + # No values were provided (i.e. '--flag-name') + return {} + kvp_map = {} + for kvp_str in kvp_list: + key, value = convert_kvp_str_to_list(kvp_str) + kvp_map.setdefault(key, set()) + kvp_map[key].add(value) + return dict((x, list(y)) for x, y in six.iteritems(kvp_map)) + + +def convert_none_to_empty_list(value): + return [] if value is None else value + + +def convert_none_to_empty_dict(value): + return {} if value is None else value + + +def convert_to_list(data): + if data is None: + return [] + elif hasattr(data, '__iter__') and not isinstance(data, six.string_types): + return list(data) + else: + return [data] diff --git a/neutron_lib/api/validators.py b/neutron_lib/api/validators.py new file mode 100644 index 000000000..0a36b7a77 --- /dev/null +++ b/neutron_lib/api/validators.py @@ -0,0 +1,536 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import re + +import functools +import netaddr +from oslo_log import log as logging +from oslo_utils import uuidutils +import six + +from neutron_lib._i18n import _ +from neutron_lib.api import converters +from neutron_lib import constants +from neutron_lib import exceptions as n_exc + +LOG = logging.getLogger(__name__) + +# Used by range check to indicate no limit for a bound. +UNLIMITED = None + +# Note: In order to ensure that the MAC address is unicast the first byte +# must be even. +MAC_PATTERN = "^%s[aceACE02468](:%s{2}){5}$" % (constants.HEX_ELEM, + constants.HEX_ELEM) + + +def _verify_dict_keys(expected_keys, target_dict, strict=True): + """Allows to verify keys in a dictionary. + + :param expected_keys: A list of keys expected to be present. + :param target_dict: The dictionary which should be verified. + :param strict: Specifies whether additional keys are allowed to be present. + :return: True, if keys in the dictionary correspond to the specification. + """ + if not isinstance(target_dict, dict): + msg = (_("Invalid input. '%(target_dict)s' must be a dictionary " + "with keys: %(expected_keys)s") % + {'target_dict': target_dict, 'expected_keys': expected_keys}) + LOG.debug(msg) + return msg + + expected_keys = set(expected_keys) + provided_keys = set(target_dict.keys()) + + predicate = expected_keys.__eq__ if strict else expected_keys.issubset + + if not predicate(provided_keys): + msg = (_("Validation of dictionary's keys failed. " + "Expected keys: %(expected_keys)s " + "Provided keys: %(provided_keys)s") % + {'expected_keys': expected_keys, + 'provided_keys': provided_keys}) + LOG.debug(msg) + return msg + + +def is_attr_set(attribute): + return not (attribute is None or + attribute is constants.ATTR_NOT_SPECIFIED) + + +def _validate_list_of_items(item_validator, data, *args, **kwargs): + if not isinstance(data, list): + msg = _("'%s' is not a list") % data + return msg + + if len(set(data)) != len(data): + msg = _("Duplicate items in the list: '%s'") % ', '.join(data) + return msg + + for item in data: + msg = item_validator(item, *args, **kwargs) + if msg: + return msg + + +def validate_values(data, valid_values=None): + if data not in valid_values: + msg = (_("'%(data)s' is not in %(valid_values)s") % + {'data': data, 'valid_values': valid_values}) + LOG.debug(msg) + return msg + + +def validate_not_empty_string_or_none(data, max_len=None): + if data is not None: + return validate_not_empty_string(data, max_len=max_len) + + +def validate_not_empty_string(data, max_len=None): + msg = validate_string(data, max_len=max_len) + if msg: + return msg + if not data.strip(): + msg = _("'%s' Blank strings are not permitted") % data + LOG.debug(msg) + return msg + + +def validate_string_or_none(data, max_len=None): + if data is not None: + return validate_string(data, max_len=max_len) + + +def validate_string(data, max_len=None): + if not isinstance(data, six.string_types): + msg = _("'%s' is not a valid string") % data + LOG.debug(msg) + return msg + + if max_len is not None and len(data) > max_len: + msg = (_("'%(data)s' exceeds maximum length of %(max_len)s") % + {'data': data, 'max_len': max_len}) + LOG.debug(msg) + return msg + + +validate_list_of_unique_strings = functools.partial(_validate_list_of_items, + validate_string) + + +def validate_boolean(data, valid_values=None): + try: + converters.convert_to_boolean(data) + except n_exc.InvalidInput: + msg = _("'%s' is not a valid boolean value") % data + LOG.debug(msg) + return msg + + +def validate_range(data, valid_values=None): + """Check that integer value is within a range provided. + + Test is inclusive. Allows either limit to be ignored, to allow + checking ranges where only the lower or upper limit matter. + It is expected that the limits provided are valid integers or + the value None. + """ + + min_value = valid_values[0] + max_value = valid_values[1] + try: + data = int(data) + except (ValueError, TypeError): + msg = _("'%s' is not an integer") % data + LOG.debug(msg) + return msg + if min_value is not UNLIMITED and data < min_value: + msg = _("'%(data)s' is too small - must be at least " + "'%(limit)d'") % {'data': data, 'limit': min_value} + LOG.debug(msg) + return msg + if max_value is not UNLIMITED and data > max_value: + msg = _("'%(data)s' is too large - must be no larger than " + "'%(limit)d'") % {'data': data, 'limit': max_value} + LOG.debug(msg) + return msg + + +def validate_no_whitespace(data): + """Validates that input has no whitespace.""" + if re.search(r'\s', data): + msg = _("'%s' contains whitespace") % data + LOG.debug(msg) + raise n_exc.InvalidInput(error_message=msg) + return data + + +def validate_mac_address(data, valid_values=None): + try: + valid_mac = netaddr.valid_mac(validate_no_whitespace(data)) + except Exception: + valid_mac = False + + if valid_mac: + valid_mac = (not netaddr.EUI(data) in + map(netaddr.EUI, constants.INVALID_MAC_ADDRESSES)) + # TODO(arosen): The code in this file should be refactored + # so it catches the correct exceptions. validate_no_whitespace + # raises AttributeError if data is None. + if not valid_mac: + msg = _("'%s' is not a valid MAC address") % data + LOG.debug(msg) + return msg + + +def validate_mac_address_or_none(data, valid_values=None): + if data is not None: + return validate_mac_address(data, valid_values) + + +def validate_ip_address(data, valid_values=None): + msg = None + try: + # netaddr.core.ZEROFILL is only applicable to IPv4. + # it will remove leading zeros from IPv4 address octets. + ip = netaddr.IPAddress(validate_no_whitespace(data), + flags=netaddr.core.ZEROFILL) + # The followings are quick checks for IPv6 (has ':') and + # IPv4. (has 3 periods like 'xx.xx.xx.xx') + # NOTE(yamamoto): netaddr uses libraries provided by the underlying + # platform to convert addresses. For example, inet_aton(3). + # Some platforms, including NetBSD and OS X, have inet_aton + # implementation which accepts more varying forms of addresses than + # we want to accept here. The following check is to reject such + # addresses. For Example: + # >>> netaddr.IPAddress('1' * 59) + # IPAddress('199.28.113.199') + # >>> netaddr.IPAddress(str(int('1' * 59) & 0xffffffff)) + # IPAddress('199.28.113.199') + # >>> + if ':' not in data and data.count('.') != 3: + msg = _("'%s' is not a valid IP address") % data + # A leading '0' in IPv4 address may be interpreted as an octal number, + # e.g. 011 octal is 9 decimal. Since there is no standard saying + # whether IP address with leading '0's should be interpreted as octal + # or decimal, hence we reject leading '0's to avoid ambiguity. + if ip.version == 4 and str(ip) != data: + msg = _("'%(data)s' is not an accepted IP address, " + "'%(ip)s' is recommended") % {"data": data, "ip": ip} + except Exception: + msg = _("'%s' is not a valid IP address") % data + if msg: + LOG.debug(msg) + return msg + + +def validate_ip_pools(data, valid_values=None): + """Validate that start and end IP addresses are present. + + In addition to this the IP addresses will also be validated + """ + if not isinstance(data, list): + msg = _("Invalid data format for IP pool: '%s'") % data + LOG.debug(msg) + return msg + + expected_keys = ['start', 'end'] + for ip_pool in data: + msg = _verify_dict_keys(expected_keys, ip_pool) + if msg: + return msg + for k in expected_keys: + msg = validate_ip_address(ip_pool[k]) + if msg: + return msg + + +def validate_fixed_ips(data, valid_values=None): + if not isinstance(data, list): + msg = _("Invalid data format for fixed IP: '%s'") % data + LOG.debug(msg) + return msg + + ips = [] + for fixed_ip in data: + if not isinstance(fixed_ip, dict): + msg = _("Invalid data format for fixed IP: '%s'") % fixed_ip + LOG.debug(msg) + return msg + if 'ip_address' in fixed_ip: + # Ensure that duplicate entries are not set - just checking IP + # suffices. Duplicate subnet_id's are legitimate. + fixed_ip_address = fixed_ip['ip_address'] + if fixed_ip_address in ips: + msg = _("Duplicate IP address '%s'") % fixed_ip_address + LOG.debug(msg) + else: + msg = validate_ip_address(fixed_ip_address) + if msg: + return msg + ips.append(fixed_ip_address) + if 'subnet_id' in fixed_ip: + msg = validate_uuid(fixed_ip['subnet_id']) + if msg: + return msg + + +def validate_nameservers(data, valid_values=None): + if not hasattr(data, '__iter__'): + msg = _("Invalid data format for nameserver: '%s'") % data + LOG.debug(msg) + return msg + + hosts = [] + for host in data: + # This must be an IP address only + msg = validate_ip_address(host) + if msg: + msg = _("'%(host)s' is not a valid nameserver. %(msg)s") % { + 'host': host, 'msg': msg} + LOG.debug(msg) + return msg + if host in hosts: + msg = _("Duplicate nameserver '%s'") % host + LOG.debug(msg) + return msg + hosts.append(host) + + +def validate_hostroutes(data, valid_values=None): + if not isinstance(data, list): + msg = _("Invalid data format for hostroute: '%s'") % data + LOG.debug(msg) + return msg + + expected_keys = ['destination', 'nexthop'] + hostroutes = [] + for hostroute in data: + msg = _verify_dict_keys(expected_keys, hostroute) + if msg: + return msg + msg = validate_subnet(hostroute['destination']) + if msg: + return msg + msg = validate_ip_address(hostroute['nexthop']) + if msg: + return msg + if hostroute in hostroutes: + msg = _("Duplicate hostroute '%s'") % hostroute + LOG.debug(msg) + return msg + hostroutes.append(hostroute) + + +def validate_ip_address_or_none(data, valid_values=None): + if data is not None: + return validate_ip_address(data, valid_values) + + +def validate_subnet(data, valid_values=None): + msg = None + try: + net = netaddr.IPNetwork(validate_no_whitespace(data)) + if '/' not in data or (net.version == 4 and str(net) != data): + msg = _("'%(data)s' isn't a recognized IP subnet cidr," + " '%(cidr)s' is recommended") % {"data": data, + "cidr": net.cidr} + else: + return + except Exception: + msg = _("'%s' is not a valid IP subnet") % data + if msg: + LOG.debug(msg) + return msg + + +def validate_subnet_or_none(data, valid_values=None): + if data is not None: + return validate_subnet(data, valid_values) + + +validate_subnet_list = functools.partial(_validate_list_of_items, + validate_subnet) + + +def validate_regex(data, valid_values=None): + try: + if re.match(valid_values, data): + return + except TypeError: + pass + + msg = _("'%s' is not a valid input") % data + LOG.debug(msg) + return msg + + +def validate_regex_or_none(data, valid_values=None): + if data is not None: + return validate_regex(data, valid_values) + + +def validate_subnetpool_id(data, valid_values=None): + if data != constants.IPV6_PD_POOL_ID: + return validate_uuid_or_none(data, valid_values) + + +def validate_subnetpool_id_or_none(data, valid_values=None): + if data is not None: + return validate_subnetpool_id(data, valid_values) + + +def validate_uuid(data, valid_values=None): + if not uuidutils.is_uuid_like(data): + msg = _("'%s' is not a valid UUID") % data + LOG.debug(msg) + return msg + + +def validate_uuid_or_none(data, valid_values=None): + if data is not None: + return validate_uuid(data) + + +validate_uuid_list = functools.partial(_validate_list_of_items, + validate_uuid) + + +def _validate_dict_item(key, key_validator, data): + # Find conversion function, if any, and apply it + conv_func = key_validator.get('convert_to') + if conv_func: + data[key] = conv_func(data.get(key)) + # Find validator function + # TODO(salv-orlando): Structure of dict attributes should be improved + # to avoid iterating over items + val_func = val_params = None + for (k, v) in six.iteritems(key_validator): + if k.startswith('type:'): + # ask forgiveness, not permission + try: + val_func = validators[k] + except KeyError: + msg = _("Validator '%s' does not exist.") % k + LOG.debug(msg) + return msg + val_params = v + break + # Process validation + if val_func: + return val_func(data.get(key), val_params) + + +def validate_dict(data, key_specs=None): + if not isinstance(data, dict): + msg = _("'%s' is not a dictionary") % data + LOG.debug(msg) + return msg + # Do not perform any further validation, if no constraints are supplied + if not key_specs: + return + + # Check whether all required keys are present + required_keys = [key for key, spec in six.iteritems(key_specs) + if spec.get('required')] + + if required_keys: + msg = _verify_dict_keys(required_keys, data, False) + if msg: + return msg + + # Perform validation and conversion of all values + # according to the specifications. + for key, key_validator in [(k, v) for k, v in six.iteritems(key_specs) + if k in data]: + msg = _validate_dict_item(key, key_validator, data) + if msg: + return msg + + +def validate_dict_or_none(data, key_specs=None): + if data is not None: + return validate_dict(data, key_specs) + + +def validate_dict_or_empty(data, key_specs=None): + if data != {}: + return validate_dict(data, key_specs) + + +def validate_dict_or_nodata(data, key_specs=None): + if data: + return validate_dict(data, key_specs) + + +def validate_non_negative(data, valid_values=None): + try: + data = int(data) + except (ValueError, TypeError): + msg = _("'%s' is not an integer") % data + LOG.debug(msg) + return msg + + if data < 0: + msg = _("'%s' should be non-negative") % data + LOG.debug(msg) + return msg + + +# Dictionary that maintains a list of validation functions +validators = {'type:dict': validate_dict, + 'type:dict_or_none': validate_dict_or_none, + 'type:dict_or_empty': validate_dict_or_empty, + 'type:dict_or_nodata': validate_dict_or_nodata, + 'type:fixed_ips': validate_fixed_ips, + 'type:hostroutes': validate_hostroutes, + 'type:ip_address': validate_ip_address, + 'type:ip_address_or_none': validate_ip_address_or_none, + 'type:ip_pools': validate_ip_pools, + 'type:mac_address': validate_mac_address, + 'type:mac_address_or_none': validate_mac_address_or_none, + 'type:nameservers': validate_nameservers, + 'type:non_negative': validate_non_negative, + 'type:range': validate_range, + 'type:regex': validate_regex, + 'type:regex_or_none': validate_regex_or_none, + 'type:string': validate_string, + 'type:string_or_none': validate_string_or_none, + 'type:not_empty_string': validate_not_empty_string, + 'type:not_empty_string_or_none': + validate_not_empty_string_or_none, + 'type:subnet': validate_subnet, + 'type:subnet_list': validate_subnet_list, + 'type:subnet_or_none': validate_subnet_or_none, + 'type:subnetpool_id': validate_subnetpool_id, + 'type:subnetpool_id_or_none': validate_subnetpool_id_or_none, + 'type:uuid': validate_uuid, + 'type:uuid_or_none': validate_uuid_or_none, + 'type:uuid_list': validate_uuid_list, + 'type:values': validate_values, + 'type:boolean': validate_boolean, + 'type:list_of_unique_strings': validate_list_of_unique_strings} + + +def add_validator(validation_type, validator): + """Dynamically add a validator. + + This can be used by clients to add their own, private validators, rather + than directly modifying the data structure. The clients can NOT modify + existing validators. + """ + key = 'type:' + validation_type + if key in validators: + raise KeyError("Validator type %s is already defined", validation_type) + validators[key] = validator diff --git a/neutron_lib/constants.py b/neutron_lib/constants.py index f0ac4b21a..c70901841 100644 --- a/neutron_lib/constants.py +++ b/neutron_lib/constants.py @@ -129,8 +129,22 @@ ICMPV6_ALLOWED_TYPES = [130, 131, 132, 135, 136] ICMPV6_TYPE_RA = 134 ICMPV6_TYPE_NA = 136 +# Human-readable ID to which the subnetpool ID should be set to +# indicate that IPv6 Prefix Delegation is enabled for a given subnetpool +IPV6_PD_POOL_ID = 'prefix_delegation' + # Device names start with "tap" TAP_DEVICE_PREFIX = 'tap' # Time format ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f' + +############################# +# Attribute related constants +############################# +ATTR_NOT_SPECIFIED = object() + +HEX_ELEM = '[0-9A-Fa-f]' +UUID_PATTERN = '-'.join([HEX_ELEM + '{8}', HEX_ELEM + '{4}', + HEX_ELEM + '{4}', HEX_ELEM + '{4}', + HEX_ELEM + '{12}']) diff --git a/neutron_lib/tests/tools.py b/neutron_lib/tests/tools.py new file mode 100644 index 000000000..24cde5cf0 --- /dev/null +++ b/neutron_lib/tests/tools.py @@ -0,0 +1,38 @@ +# Copyright (c) 2013 NEC Corporation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# Note: _safe_sort_key came from neutron/common/utils.py. For neutron-lib +# it is only used for testing, so is placed here. +import collections + + +def _safe_sort_key(value): + """Return value hash or build one for dictionaries.""" + if isinstance(value, collections.Mapping): + return sorted(value.items()) + return value + + +class UnorderedList(list): + """A list that is equals to any permutation of itself.""" + + def __eq__(self, other): + if not isinstance(other, list): + return False + return (sorted(self, key=_safe_sort_key) == + sorted(other, key=_safe_sort_key)) + + def __neq__(self, other): + return not self == other diff --git a/neutron_lib/tests/unit/api/__init__.py b/neutron_lib/tests/unit/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neutron_lib/tests/unit/api/test_conversions.py b/neutron_lib/tests/unit/api/test_conversions.py new file mode 100644 index 000000000..f3a30f20a --- /dev/null +++ b/neutron_lib/tests/unit/api/test_conversions.py @@ -0,0 +1,169 @@ +# Copyright 2012 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools + +from neutron_lib.api import converters +from neutron_lib import exceptions as n_exc +from neutron_lib.tests import base +from neutron_lib.tests import tools + + +class TestConvertToBoolean(base.TestCase): + + def test_convert_to_boolean_bool(self): + self.assertIs(converters.convert_to_boolean(True), True) + self.assertIs(converters.convert_to_boolean(False), False) + + def test_convert_to_boolean_int(self): + self.assertIs(converters.convert_to_boolean(0), False) + self.assertIs(converters.convert_to_boolean(1), True) + self.assertRaises(n_exc.InvalidInput, + converters.convert_to_boolean, + 7) + + def test_convert_to_boolean_str(self): + self.assertIs(converters.convert_to_boolean('True'), True) + self.assertIs(converters.convert_to_boolean('true'), True) + self.assertIs(converters.convert_to_boolean('False'), False) + self.assertIs(converters.convert_to_boolean('false'), False) + self.assertIs(converters.convert_to_boolean('0'), False) + self.assertIs(converters.convert_to_boolean('1'), True) + self.assertRaises(n_exc.InvalidInput, + converters.convert_to_boolean, + '7') + + def test_convert_to_boolean_if_not_none(self): + self.assertIsNone(converters.convert_to_boolean_if_not_none(None)) + self.assertIs(converters.convert_to_boolean_if_not_none(1), True) + + +class TestConvertToInt(base.TestCase): + + def test_convert_to_int_int(self): + self.assertEqual(-1, converters.convert_to_int(-1)) + self.assertEqual(0, converters.convert_to_int(0)) + self.assertEqual(1, converters.convert_to_int(1)) + + def test_convert_to_int_if_not_none(self): + self.assertEqual(-1, converters.convert_to_int_if_not_none(-1)) + self.assertEqual(0, converters.convert_to_int_if_not_none(0)) + self.assertEqual(1, converters.convert_to_int_if_not_none(1)) + self.assertIsNone(converters.convert_to_int_if_not_none(None)) + + def test_convert_to_int_str(self): + self.assertEqual(4, converters.convert_to_int('4')) + self.assertEqual(6, converters.convert_to_int('6')) + self.assertRaises(n_exc.InvalidInput, + converters.convert_to_int, + 'garbage') + + def test_convert_to_int_none(self): + self.assertRaises(n_exc.InvalidInput, + converters.convert_to_int, + None) + + def test_convert_none_to_empty_list_none(self): + self.assertEqual([], converters.convert_none_to_empty_list(None)) + + def test_convert_none_to_empty_dict(self): + self.assertEqual({}, converters.convert_none_to_empty_dict(None)) + + def test_convert_none_to_empty_list_value(self): + values = ['1', 3, [], [1], {}, {'a': 3}] + for value in values: + self.assertEqual( + value, converters.convert_none_to_empty_list(value)) + + +class TestConvertToFloat(base.TestCase): + # NOTE: the routine being tested here is a plugin-specific extension + # module. As the plugin split proceed towards its second phase this + # test should either be remove, or the validation routine moved into + # neutron.api.v2.attributes + + def test_convert_to_float_positve_value(self): + self.assertEqual( + 1.111, converters.convert_to_positive_float_or_none(1.111)) + self.assertEqual(1, converters.convert_to_positive_float_or_none(1)) + self.assertEqual(0, converters.convert_to_positive_float_or_none(0)) + + def test_convert_to_float_negative_value(self): + self.assertRaises(n_exc.InvalidInput, + converters.convert_to_positive_float_or_none, + -1.11) + + def test_convert_to_float_string(self): + self.assertEqual(4, converters.convert_to_positive_float_or_none('4')) + self.assertEqual( + 4.44, converters.convert_to_positive_float_or_none('4.44')) + self.assertRaises(n_exc.InvalidInput, + converters.convert_to_positive_float_or_none, + 'garbage') + + def test_convert_to_float_none_value(self): + self.assertIsNone(converters.convert_to_positive_float_or_none(None)) + + +class TestConvertKvp(base.TestCase): + + def test_convert_kvp_list_to_dict_succeeds_for_missing_values(self): + result = converters.convert_kvp_list_to_dict(['True']) + self.assertEqual({}, result) + + def test_convert_kvp_list_to_dict_succeeds_for_multiple_values(self): + result = converters.convert_kvp_list_to_dict( + ['a=b', 'a=c', 'a=c', 'b=a']) + expected = {'a': tools.UnorderedList(['c', 'b']), 'b': ['a']} + self.assertEqual(expected, result) + + def test_convert_kvp_list_to_dict_succeeds_for_values(self): + result = converters.convert_kvp_list_to_dict(['a=b', 'c=d']) + self.assertEqual({'a': ['b'], 'c': ['d']}, result) + + def test_convert_kvp_str_to_list_fails_for_missing_key(self): + with testtools.ExpectedException(n_exc.InvalidInput): + converters.convert_kvp_str_to_list('=a') + + def test_convert_kvp_str_to_list_fails_for_missing_equals(self): + with testtools.ExpectedException(n_exc.InvalidInput): + converters.convert_kvp_str_to_list('a') + + def test_convert_kvp_str_to_list_succeeds_for_one_equals(self): + result = converters.convert_kvp_str_to_list('a=') + self.assertEqual(['a', ''], result) + + def test_convert_kvp_str_to_list_succeeds_for_two_equals(self): + result = converters.convert_kvp_str_to_list('a=a=a') + self.assertEqual(['a', 'a=a'], result) + + +class TestConvertToList(base.TestCase): + + def test_convert_to_empty_list(self): + for item in (None, [], (), {}): + self.assertEqual([], converters.convert_to_list(item)) + + def test_convert_to_list_string(self): + for item in ('', 'foo'): + self.assertEqual([item], converters.convert_to_list(item)) + + def test_convert_to_list_iterable(self): + for item in ([None], [1, 2, 3], (1, 2, 3), set([1, 2, 3]), ['foo']): + self.assertEqual(list(item), converters.convert_to_list(item)) + + def test_convert_to_list_non_iterable(self): + for item in (True, False, 1, 1.2, object()): + self.assertEqual([item], converters.convert_to_list(item)) diff --git a/neutron_lib/tests/unit/api/test_validators.py b/neutron_lib/tests/unit/api/test_validators.py new file mode 100644 index 000000000..7dad65661 --- /dev/null +++ b/neutron_lib/tests/unit/api/test_validators.py @@ -0,0 +1,840 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import string + +import mock +import netaddr + +from neutron_lib._i18n import _ +from neutron_lib.api import converters +from neutron_lib.api import validators +from neutron_lib import constants +from neutron_lib import exceptions as n_exc +from neutron_lib.tests import base + + +def dummy_validator(data, valid_values=None): + pass + + +class TestAttributeValidation(base.TestCase): + + def _construct_dict_and_constraints(self): + """Constructs a test dictionary and a definition of constraints. + + :return: A (dictionary, constraint) tuple + """ + + constraints = {'key1': {'type:values': ['val1', 'val2'], + 'required': True}, + 'key2': {'type:string': None, + 'required': False}, + 'key3': {'type:dict': {'k4': {'type:string': None, + 'required': True}}, + 'required': True}} + + dictionary = {'key1': 'val1', + 'key2': 'a string value', + 'key3': {'k4': 'a string value'}} + + return dictionary, constraints + + def test_adding_validator(self): + validators.add_validator('new_type', dummy_validator) + self.assertIn('type:new_type', validators.validators) + self.assertEqual(dummy_validator, + validators.validators['type:new_type']) + + def test_fail_adding_duplicate_validator(self): + self.assertRaises(KeyError, + validators.add_validator, + 'dict', dummy_validator) + + def test_is_attr_set(self): + data = constants.ATTR_NOT_SPECIFIED + self.assertIs(validators.is_attr_set(data), False) + + data = None + self.assertIs(validators.is_attr_set(data), False) + + data = "I'm set" + self.assertIs(validators.is_attr_set(data), True) + + def test_validate_values(self): + msg = validators.validate_values(4, [4, 6]) + self.assertIsNone(msg) + + msg = validators.validate_values(4, (4, 6)) + self.assertIsNone(msg) + + msg = validators.validate_values(7, [4, 6]) + self.assertEqual("'7' is not in [4, 6]", msg) + + msg = validators.validate_values(7, (4, 6)) + self.assertEqual("'7' is not in (4, 6)", msg) + + def test_validate_not_empty_string(self): + msg = validators.validate_not_empty_string(' ', None) + self.assertEqual(u"' ' Blank strings are not permitted", msg) + msg = validators.validate_not_empty_string(123, None) + self.assertEqual(u"'123' is not a valid string", msg) + + def test_validate_not_empty_string_or_none(self): + msg = validators.validate_not_empty_string_or_none(' ', None) + self.assertEqual(u"' ' Blank strings are not permitted", msg) + + msg = validators.validate_not_empty_string_or_none(None, None) + self.assertIsNone(msg) + + def test_validate_string_or_none(self): + msg = validators.validate_string_or_none('test', None) + self.assertIsNone(msg) + + msg = validators.validate_string_or_none(None, None) + self.assertIsNone(msg) + + def test_validate_string(self): + msg = validators.validate_string(None, None) + self.assertEqual("'None' is not a valid string", msg) + + # 0 == len(data) == max_len + msg = validators.validate_string("", 0) + self.assertIsNone(msg) + + # 0 == len(data) < max_len + msg = validators.validate_string("", 9) + self.assertIsNone(msg) + + # 0 < len(data) < max_len + msg = validators.validate_string("123456789", 10) + self.assertIsNone(msg) + + # 0 < len(data) == max_len + msg = validators.validate_string("123456789", 9) + self.assertIsNone(msg) + + # 0 < max_len < len(data) + msg = validators.validate_string("1234567890", 9) + self.assertEqual("'1234567890' exceeds maximum length of 9", msg) + + msg = validators.validate_string("123456789", None) + self.assertIsNone(msg) + + def test_validate_list_of_unique_strings(self): + data = "TEST" + msg = validators.validate_list_of_unique_strings(data, None) + self.assertEqual("'TEST' is not a list", msg) + + data = ["TEST01", "TEST02", "TEST01"] + msg = validators.validate_list_of_unique_strings(data, None) + self.assertEqual( + "Duplicate items in the list: 'TEST01, TEST02, TEST01'", msg) + + data = ["12345678", "123456789"] + msg = validators.validate_list_of_unique_strings(data, 8) + self.assertEqual("'123456789' exceeds maximum length of 8", msg) + + data = ["TEST01", "TEST02", "TEST03"] + msg = validators.validate_list_of_unique_strings(data, None) + self.assertIsNone(msg) + + def test_validate_boolean(self): + msg = validators.validate_boolean(True) + self.assertIsNone(msg) + msg = validators.validate_boolean(0) + self.assertIsNone(msg) + msg = validators.validate_boolean("false") + self.assertIsNone(msg) + msg = validators.validate_boolean("fasle") + self.assertEqual("'fasle' is not a valid boolean value", msg) + + def test_validate_no_whitespace(self): + data = 'no_white_space' + result = validators.validate_no_whitespace(data) + self.assertEqual(data, result) + + self.assertRaises(n_exc.InvalidInput, + validators.validate_no_whitespace, + 'i have whitespace') + + self.assertRaises(n_exc.InvalidInput, + validators.validate_no_whitespace, + 'i\thave\twhitespace') + + for ws in string.whitespace: + self.assertRaises(n_exc.InvalidInput, + validators.validate_no_whitespace, + '%swhitespace-at-head' % ws) + self.assertRaises(n_exc.InvalidInput, + validators.validate_no_whitespace, + 'whitespace-at-tail%s' % ws) + + def test_validate_range(self): + msg = validators.validate_range(1, [1, 9]) + self.assertIsNone(msg) + + msg = validators.validate_range(5, [1, 9]) + self.assertIsNone(msg) + + msg = validators.validate_range(9, [1, 9]) + self.assertIsNone(msg) + + msg = validators.validate_range(1, (1, 9)) + self.assertIsNone(msg) + + msg = validators.validate_range(5, (1, 9)) + self.assertIsNone(msg) + + msg = validators.validate_range(9, (1, 9)) + self.assertIsNone(msg) + + msg = validators.validate_range(0, [1, 9]) + self.assertEqual("'0' is too small - must be at least '1'", msg) + + msg = validators.validate_range(10, (1, 9)) + self.assertEqual("'10' is too large - must be no larger than '9'", msg) + + msg = validators.validate_range("bogus", (1, 9)) + self.assertEqual("'bogus' is not an integer", msg) + + msg = validators.validate_range(10, (validators.UNLIMITED, + validators.UNLIMITED)) + self.assertIsNone(msg) + + msg = validators.validate_range(10, (1, validators.UNLIMITED)) + self.assertIsNone(msg) + + msg = validators.validate_range(1, (validators.UNLIMITED, 9)) + self.assertIsNone(msg) + + msg = validators.validate_range(-1, (0, validators.UNLIMITED)) + self.assertEqual("'-1' is too small - must be at least '0'", msg) + + msg = validators.validate_range(10, (validators.UNLIMITED, 9)) + self.assertEqual("'10' is too large - must be no larger than '9'", msg) + + def _test_validate_mac_address(self, validator, allow_none=False): + mac_addr = "ff:16:3e:4f:00:00" + msg = validator(mac_addr) + self.assertIsNone(msg) + + mac_addr = "ffa:16:3e:4f:00:00" + msg = validator(mac_addr) + err_msg = "'%s' is not a valid MAC address" + self.assertEqual(err_msg % mac_addr, msg) + + for invalid_mac_addr in constants.INVALID_MAC_ADDRESSES: + msg = validator(invalid_mac_addr) + self.assertEqual(err_msg % invalid_mac_addr, msg) + + mac_addr = "123" + msg = validator(mac_addr) + self.assertEqual(err_msg % mac_addr, msg) + + mac_addr = None + msg = validator(mac_addr) + if allow_none: + self.assertIsNone(msg) + else: + self.assertEqual(err_msg % mac_addr, msg) + + mac_addr = "ff:16:3e:4f:00:00\r" + msg = validator(mac_addr) + self.assertEqual(err_msg % mac_addr, msg) + + def test_validate_mac_address(self): + self._test_validate_mac_address(validators.validate_mac_address) + + def test_validate_mac_address_or_none(self): + self._test_validate_mac_address( + validators.validate_mac_address_or_none, allow_none=True) + + def test_validate_ip_address(self): + ip_addr = '1.1.1.1' + msg = validators.validate_ip_address(ip_addr) + self.assertIsNone(msg) + + ip_addr = '1111.1.1.1' + msg = validators.validate_ip_address(ip_addr) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + # Depending on platform to run UTs, this case might or might not be + # an equivalent to test_validate_ip_address_bsd. + ip_addr = '1' * 59 + msg = validators.validate_ip_address(ip_addr) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + ip_addr = '1.1.1.1 has whitespace' + msg = validators.validate_ip_address(ip_addr) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + ip_addr = '111.1.1.1\twhitespace' + msg = validators.validate_ip_address(ip_addr) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + ip_addr = '111.1.1.1\nwhitespace' + msg = validators.validate_ip_address(ip_addr) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + for ws in string.whitespace: + ip_addr = '%s111.1.1.1' % ws + msg = validators.validate_ip_address(ip_addr) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + for ws in string.whitespace: + ip_addr = '111.1.1.1%s' % ws + msg = validators.validate_ip_address(ip_addr) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + def test_validate_ip_address_with_leading_zero(self): + ip_addr = '1.1.1.01' + expected_msg = ("'%(data)s' is not an accepted IP address, " + "'%(ip)s' is recommended") + msg = validators.validate_ip_address(ip_addr) + self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.1'}, + msg) + + ip_addr = '1.1.1.011' + msg = validators.validate_ip_address(ip_addr) + self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.11'}, + msg) + + ip_addr = '1.1.1.09' + msg = validators.validate_ip_address(ip_addr) + self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.9'}, + msg) + + ip_addr = "fe80:0:0:0:0:0:0:0001" + msg = validators.validate_ip_address(ip_addr) + self.assertIsNone(msg) + + def test_validate_ip_address_bsd(self): + # NOTE(yamamoto): On NetBSD and OS X, netaddr.IPAddress() accepts + # '1' * 59 as a valid address. The behaviour is inherited from + # libc behaviour there. This test ensures that our validator reject + # such addresses on such platforms by mocking netaddr to emulate + # the behaviour. + ip_addr = '1' * 59 + with mock.patch('netaddr.IPAddress') as ip_address_cls: + msg = validators.validate_ip_address(ip_addr) + ip_address_cls.assert_called_once_with(ip_addr, + flags=netaddr.core.ZEROFILL) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + def test_validate_ip_pools(self): + pools = [[{'end': '10.0.0.254'}], + [{'start': '10.0.0.254'}], + [{'start': '1000.0.0.254', + 'end': '1.1.1.1'}], + [{'start': '10.0.0.2', 'end': '10.0.0.254', + 'forza': 'juve'}], + [{'start': '10.0.0.2', 'end': '10.0.0.254'}, + {'end': '10.0.0.254'}], + [None], + None] + for pool in pools: + msg = validators.validate_ip_pools(pool) + self.assertIsNotNone(msg) + + pools = [[{'end': '10.0.0.254', 'start': '10.0.0.2'}, + {'start': '11.0.0.2', 'end': '11.1.1.1'}], + [{'start': '11.0.0.2', 'end': '11.0.0.100'}]] + for pool in pools: + msg = validators.validate_ip_pools(pool) + self.assertIsNone(msg) + + invalid_ip = '10.0.0.2\r' + pools = [[{'end': '10.0.0.254', 'start': invalid_ip}]] + for pool in pools: + msg = validators.validate_ip_pools(pool) + self.assertEqual("'%s' is not a valid IP address" % invalid_ip, + msg) + + def test_validate_fixed_ips(self): + fixed_ips = [ + {'data': [{'subnet_id': '00000000-ffff-ffff-ffff-000000000000', + 'ip_address': '1111.1.1.1'}], + 'error_msg': "'1111.1.1.1' is not a valid IP address"}, + {'data': [{'subnet_id': 'invalid', + 'ip_address': '1.1.1.1'}], + 'error_msg': "'invalid' is not a valid UUID"}, + {'data': None, + 'error_msg': "Invalid data format for fixed IP: 'None'"}, + {'data': "1.1.1.1", + 'error_msg': "Invalid data format for fixed IP: '1.1.1.1'"}, + {'data': ['00000000-ffff-ffff-ffff-000000000000', '1.1.1.1'], + 'error_msg': "Invalid data format for fixed IP: " + "'00000000-ffff-ffff-ffff-000000000000'"}, + {'data': [['00000000-ffff-ffff-ffff-000000000000', '1.1.1.1']], + 'error_msg': "Invalid data format for fixed IP: " + "'['00000000-ffff-ffff-ffff-000000000000', " + "'1.1.1.1']'"}, + {'data': [{'subnet_id': '00000000-0fff-ffff-ffff-000000000000', + 'ip_address': '1.1.1.1'}, + {'subnet_id': '00000000-ffff-ffff-ffff-000000000000', + 'ip_address': '1.1.1.1'}], + 'error_msg': "Duplicate IP address '1.1.1.1'"}] + for fixed in fixed_ips: + msg = validators.validate_fixed_ips(fixed['data']) + self.assertEqual(fixed['error_msg'], msg) + + fixed_ips = [[{'subnet_id': '00000000-ffff-ffff-ffff-000000000000', + 'ip_address': '1.1.1.1'}], + [{'subnet_id': '00000000-0fff-ffff-ffff-000000000000', + 'ip_address': '1.1.1.1'}, + {'subnet_id': '00000000-ffff-ffff-ffff-000000000000', + 'ip_address': '1.1.1.2'}]] + for fixed in fixed_ips: + msg = validators.validate_fixed_ips(fixed) + self.assertIsNone(msg) + + def test_validate_nameservers(self): + ns_pools = [['1.1.1.2', '1.1.1.2'], + ['www.hostname.com', 'www.hostname.com'], + ['1000.0.0.1'], + ['www.hostname.com'], + ['www.great.marathons.to.travel'], + ['valid'], + ['77.hostname.com'], + ['1' * 59], + ['www.internal.hostname.com'], + None] + + for ns in ns_pools: + msg = validators.validate_nameservers(ns, None) + self.assertIsNotNone(msg) + + ns_pools = [['100.0.0.2'], + ['1.1.1.1', '1.1.1.2']] + + for ns in ns_pools: + msg = validators.validate_nameservers(ns, None) + self.assertIsNone(msg) + + def test_validate_hostroutes(self): + hostroute_pools = [[{'destination': '100.0.0.0/24'}], + [{'nexthop': '10.0.2.20'}], + [{'nexthop': '10.0.2.20', + 'forza': 'juve', + 'destination': '100.0.0.0/8'}], + [{'nexthop': '1110.0.2.20', + 'destination': '100.0.0.0/8'}], + [{'nexthop': '10.0.2.20', + 'destination': '100.0.0.0'}], + [{'nexthop': '10.0.2.20', + 'destination': '100.0.0.0/8'}, + {'nexthop': '10.0.2.20', + 'destination': '100.0.0.0/8'}], + [None], + None] + for host_routes in hostroute_pools: + msg = validators.validate_hostroutes(host_routes, None) + self.assertIsNotNone(msg) + + hostroute_pools = [[{'destination': '100.0.0.0/24', + 'nexthop': '10.0.2.20'}], + [{'nexthop': '10.0.2.20', + 'destination': '100.0.0.0/8'}, + {'nexthop': '10.0.2.20', + 'destination': '101.0.0.0/8'}]] + for host_routes in hostroute_pools: + msg = validators.validate_hostroutes(host_routes, None) + self.assertIsNone(msg) + + def test_validate_ip_address_or_none(self): + ip_addr = None + msg = validators.validate_ip_address_or_none(ip_addr) + self.assertIsNone(msg) + + ip_addr = '1.1.1.1' + msg = validators.validate_ip_address_or_none(ip_addr) + self.assertIsNone(msg) + + ip_addr = '1111.1.1.1' + msg = validators.validate_ip_address_or_none(ip_addr) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + def test_uuid_pattern(self): + data = 'garbage' + msg = validators.validate_regex(data, constants.UUID_PATTERN) + self.assertIsNotNone(msg) + + data = '00000000-ffff-ffff-ffff-000000000000' + msg = validators.validate_regex(data, constants.UUID_PATTERN) + self.assertIsNone(msg) + + def test_mac_pattern(self): + # Valid - 3 octets + base_mac = "fa:16:3e:00:00:00" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNone(msg) + + # Valid - 4 octets + base_mac = "fa:16:3e:4f:00:00" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNone(msg) + + # Invalid - not unicast + base_mac = "01:16:3e:4f:00:00" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNotNone(msg) + + # Invalid - invalid format + base_mac = "a:16:3e:4f:00:00" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNotNone(msg) + + # Invalid - invalid format + base_mac = "ffa:16:3e:4f:00:00" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNotNone(msg) + + # Invalid - invalid format + base_mac = "01163e4f0000" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNotNone(msg) + + # Invalid - invalid format + base_mac = "01-16-3e-4f-00-00" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNotNone(msg) + + # Invalid - invalid format + base_mac = "00:16:3:f:00:00" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNotNone(msg) + + # Invalid - invalid format + base_mac = "12:3:4:5:67:89ab" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNotNone(msg) + + def _test_validate_subnet(self, validator, allow_none=False): + # Valid - IPv4 + cidr = "10.0.2.0/24" + msg = validator(cidr, None) + self.assertIsNone(msg) + + # Valid - IPv6 without final octets + cidr = "fe80::/24" + msg = validator(cidr, None) + self.assertIsNone(msg) + + # Valid - IPv6 with final octets + cidr = "fe80::/24" + msg = validator(cidr, None) + self.assertIsNone(msg) + + # Valid - uncompressed ipv6 address + cidr = "fe80:0:0:0:0:0:0:0/128" + msg = validator(cidr, None) + self.assertIsNone(msg) + + # Valid - ipv6 address with multiple consecutive zero + cidr = "2001:0db8:0:0:1::1/128" + msg = validator(cidr, None) + self.assertIsNone(msg) + + # Valid - ipv6 address with multiple consecutive zero + cidr = "2001:0db8::1:0:0:1/128" + msg = validator(cidr, None) + self.assertIsNone(msg) + + # Valid - ipv6 address with multiple consecutive zero + cidr = "2001::0:1:0:0:1100/120" + msg = validator(cidr, None) + self.assertIsNone(msg) + + # Invalid - abbreviated ipv4 address + cidr = "10/24" + msg = validator(cidr, None) + error = _("'%(data)s' isn't a recognized IP subnet cidr," + " '%(cidr)s' is recommended") % {"data": cidr, + "cidr": "10.0.0.0/24"} + self.assertEqual(error, msg) + + # Invalid - IPv4 missing mask + cidr = "10.0.2.0" + msg = validator(cidr, None) + error = _("'%(data)s' isn't a recognized IP subnet cidr," + " '%(cidr)s' is recommended") % {"data": cidr, + "cidr": "10.0.2.0/32"} + self.assertEqual(error, msg) + + # Valid - IPv4 with non-zero masked bits is ok + for i in range(1, 255): + cidr = "192.168.1.%s/24" % i + msg = validator(cidr, None) + self.assertIsNone(msg) + + # Invalid - IPv6 without final octets, missing mask + cidr = "fe80::" + msg = validator(cidr, None) + error = _("'%(data)s' isn't a recognized IP subnet cidr," + " '%(cidr)s' is recommended") % {"data": cidr, + "cidr": "fe80::/128"} + self.assertEqual(error, msg) + + # Invalid - IPv6 with final octets, missing mask + cidr = "fe80::0" + msg = validator(cidr, None) + error = _("'%(data)s' isn't a recognized IP subnet cidr," + " '%(cidr)s' is recommended") % {"data": cidr, + "cidr": "fe80::/128"} + self.assertEqual(error, msg) + + # Invalid - Address format error + cidr = 'invalid' + msg = validator(cidr, None) + error = "'%s' is not a valid IP subnet" % cidr + self.assertEqual(error, msg) + + cidr = None + msg = validator(cidr, None) + if allow_none: + self.assertIsNone(msg) + else: + error = "'%s' is not a valid IP subnet" % cidr + self.assertEqual(error, msg) + + # Invalid - IPv4 with trailing CR + cidr = "10.0.2.0/24\r" + msg = validator(cidr, None) + error = "'%s' is not a valid IP subnet" % cidr + self.assertEqual(error, msg) + + def test_validate_subnet(self): + self._test_validate_subnet(validators.validate_subnet) + + def test_validate_subnet_or_none(self): + self._test_validate_subnet(validators.validate_subnet_or_none, + allow_none=True) + + def test_validate_subnet_list(self): + msg = validators.validate_subnet_list('abc') + self.assertEqual(u"'abc' is not a list", msg) + msg = validators.validate_subnet_list(['10.1.0.0/24', + '10.2.0.0/24', + '10.1.0.0/24']) + self.assertEqual(u"Duplicate items in the list: '10.1.0.0/24, " + u"10.2.0.0/24, 10.1.0.0/24'", msg) + msg = validators.validate_subnet_list(['10.1.0.0/24', '10.2.0.0']) + self.assertEqual(u"'10.2.0.0' isn't a recognized IP subnet cidr, " + u"'10.2.0.0/32' is recommended", msg) + + def _test_validate_regex(self, validator, allow_none=False): + pattern = '[hc]at' + + data = None + msg = validator(data, pattern) + if allow_none: + self.assertIsNone(msg) + else: + self.assertEqual("'None' is not a valid input", msg) + + data = 'bat' + msg = validator(data, pattern) + self.assertEqual("'%s' is not a valid input" % data, msg) + + data = 'hat' + msg = validator(data, pattern) + self.assertIsNone(msg) + + data = 'cat' + msg = validator(data, pattern) + self.assertIsNone(msg) + + def test_validate_regex(self): + self._test_validate_regex(validators.validate_regex) + + def test_validate_regex_or_none(self): + self._test_validate_regex(validators.validate_regex_or_none, + allow_none=True) + + def test_validate_subnetpool_id(self): + msg = validators.validate_subnetpool_id(constants.IPV6_PD_POOL_ID) + self.assertIsNone(msg) + + msg = validators.validate_subnetpool_id( + '00000000-ffff-ffff-ffff-000000000000') + self.assertIsNone(msg) + + def test_validate_subnetpool_id_or_none(self): + msg = validators.validate_subnetpool_id_or_none(None) + self.assertIsNone(msg) + + msg = validators.validate_subnetpool_id_or_none( + '00000000-ffff-ffff-ffff-000000000000') + self.assertIsNone(msg) + + def test_validate_uuid(self): + invalid_uuids = [None, + 123, + '123', + 't5069610-744b-42a7-8bd8-ceac1a229cd4', + 'e5069610-744bb-42a7-8bd8-ceac1a229cd4'] + for uuid in invalid_uuids: + msg = validators.validate_uuid(uuid) + error = "'%s' is not a valid UUID" % uuid + self.assertEqual(error, msg) + + msg = validators.validate_uuid('00000000-ffff-ffff-ffff-000000000000') + self.assertIsNone(msg) + + def test__validate_list_of_items(self): + # check not a list + items = [None, + 123, + 'e5069610-744b-42a7-8bd8-ceac1a229cd4', + '12345678123456781234567812345678', + {'uuid': 'e5069610-744b-42a7-8bd8-ceac1a229cd4'}] + for item in items: + msg = validators._validate_list_of_items(mock.Mock(), item) + error = "'%s' is not a list" % item + self.assertEqual(error, msg) + + # check duplicate items in a list + duplicate_items = ['e5069610-744b-42a7-8bd8-ceac1a229cd4', + 'f3eeab00-8367-4524-b662-55e64d4cacb5', + 'e5069610-744b-42a7-8bd8-ceac1a229cd4'] + msg = validators._validate_list_of_items(mock.Mock(), duplicate_items) + error = ("Duplicate items in the list: " + "'%s'" % ', '.join(duplicate_items)) + self.assertEqual(error, msg) + + # check valid lists + valid_lists = [[], + [1, 2, 3], + ['a', 'b', 'c']] + for list_obj in valid_lists: + msg = validators._validate_list_of_items( + mock.Mock(return_value=None), list_obj) + self.assertIsNone(msg) + + def test_validate_dict_type(self): + for value in (None, True, '1', []): + self.assertEqual("'%s' is not a dictionary" % value, + validators.validate_dict(value)) + + def test_validate_dict_without_constraints(self): + msg = validators.validate_dict({}) + self.assertIsNone(msg) + + # Validate a dictionary without constraints. + msg = validators.validate_dict({'key': 'value'}) + self.assertIsNone(msg) + + def test_validate_a_valid_dict_with_constraints(self): + dictionary, constraints = self._construct_dict_and_constraints() + + msg = validators.validate_dict(dictionary, constraints) + self.assertIsNone(msg, 'Validation of a valid dictionary failed.') + + def test_validate_dict_with_invalid_validator(self): + dictionary, constraints = self._construct_dict_and_constraints() + + constraints['key1'] = {'type:unsupported': None, 'required': True} + msg = validators.validate_dict(dictionary, constraints) + self.assertEqual("Validator 'type:unsupported' does not exist.", msg) + + def test_validate_dict_not_required_keys(self): + dictionary, constraints = self._construct_dict_and_constraints() + + del dictionary['key2'] + msg = validators.validate_dict(dictionary, constraints) + self.assertIsNone(msg, 'Field that was not required by the specs was' + 'required by the validator.') + + def test_validate_dict_required_keys(self): + dictionary, constraints = self._construct_dict_and_constraints() + + del dictionary['key1'] + msg = validators.validate_dict(dictionary, constraints) + self.assertIn('Expected keys:', msg) + + def test_validate_dict_wrong_values(self): + dictionary, constraints = self._construct_dict_and_constraints() + + dictionary['key1'] = 'UNSUPPORTED' + msg = validators.validate_dict(dictionary, constraints) + self.assertIsNotNone(msg) + + def test_validate_dict_convert_boolean(self): + dictionary, constraints = self._construct_dict_and_constraints() + + constraints['key_bool'] = { + 'type:boolean': None, + 'required': False, + 'convert_to': converters.convert_to_boolean} + dictionary['key_bool'] = 'true' + msg = validators.validate_dict(dictionary, constraints) + self.assertIsNone(msg) + # Explicitly comparing with literal 'True' as assertTrue + # succeeds also for 'true' + self.assertIs(True, dictionary['key_bool']) + + def test_subdictionary(self): + dictionary, constraints = self._construct_dict_and_constraints() + + del dictionary['key3']['k4'] + dictionary['key3']['k5'] = 'a string value' + msg = validators.validate_dict(dictionary, constraints) + self.assertIn('Expected keys:', msg) + + def test_validate_dict_or_none(self): + dictionary, constraints = self._construct_dict_and_constraints() + + # Check whether None is a valid value. + msg = validators.validate_dict_or_none(None, constraints) + self.assertIsNone(msg, 'Validation of a None dictionary failed.') + + # Check validation of a regular dictionary. + msg = validators.validate_dict_or_none(dictionary, constraints) + self.assertIsNone(msg, 'Validation of a valid dictionary failed.') + + def test_validate_dict_or_empty(self): + dictionary, constraints = self._construct_dict_and_constraints() + + # Check whether an empty dictionary is valid. + msg = validators.validate_dict_or_empty({}, constraints) + self.assertIsNone(msg, 'Validation of a None dictionary failed.') + + # Check validation of a regular dictionary. + msg = validators.validate_dict_or_empty(dictionary, constraints) + self.assertIsNone(msg, 'Validation of a valid dictionary failed.') + + def test_validate_dict_or_nodata(self): + dictionary, constraints = self._construct_dict_and_constraints() + + # Check whether no data is a valid value. + msg = validators.validate_dict_or_nodata(None, constraints) + self.assertIsNone(msg, 'Validation of None for no-data failed.') + msg = validators.validate_dict_or_nodata({}, constraints) + self.assertIsNone(msg, 'Validation of empty dict for no-data failed.') + + # Check validation of a regular dictionary. + msg = validators.validate_dict_or_nodata(dictionary, constraints) + self.assertIsNone(msg, 'Validation of a valid dictionary failed.') + + def test_validate_non_negative(self): + msg = validators.validate_non_negative('abc') + self.assertEqual("'abc' is not an integer", msg) + + for value in (-1, '-2'): + self.assertEqual("'%s' should be non-negative" % value, + validators.validate_non_negative(value)) + + for value in (0, 1, '2', True, False): + msg = validators.validate_non_negative(value) + self.assertIsNone(msg)