diff --git a/doc/source/devref/api_converters.rst b/doc/source/devref/api_converters.rst new file mode 100644 index 0000000..a44cf5a --- /dev/null +++ b/doc/source/devref/api_converters.rst @@ -0,0 +1,86 @@ +.. + Licensed under the Apache License, Version 2.0 (the "License"); you may + not use this file except in compliance with the License. You may obtain + a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + + + Convention for heading levels in Neutron devref: + ======= Heading 0 (reserved for the title in a document) + ------- Heading 1 + ~~~~~~~ Heading 2 + +++++++ Heading 3 + ''''''' Heading 4 + (Avoid deeper levels because they do not render well.) + + +API Converters +============== + +Defintions for REST API attributes, can include conversion methods +to help normalize user input or transform the input into a form that +can be used. + + +Defining A Converter Method +--------------------------- + +By convention, the name should start with ``convert_to_``, and will +take a single argument for the data to be converted. The method +should return the converted data (which, if the input is None, +and no conversion is performed, the implicit None returned by the +method may be used). If the conversion is impossible, an +InvalidInput exception should be raised, indicating what is wrong. +For example, here is one that converts a variety of user inputs +to a boolean value. +:: + + def convert_to_boolean(data): + if isinstance(data, six.string_types): + val = data.lower() + if val == "true" or val == "1": + return True + if val == "false" or val == "0": + return False + elif isinstance(data, bool): + return data + elif isinstance(data, int): + if data == 0: + return False + elif data == 1: + return True + msg = _("'%s' cannot be converted to boolean") % data + raise n_exc.InvalidInput(error_message=msg) + + +Using Validators +---------------- + +In client code, the conversion can be used in a REST API +definition, by specifying the name of the method as a value for +the 'convert_to' key on an attribute. For example: + +:: + + 'admin_state_up': {'allow_post': True, 'allow_put': True, + 'default': True, + 'convert_to': conversions.convert_to_boolean, + 'is_visible': True}, + +Here, the admin_state_up is a boolean, so the converter is used to +take user's (string) input and transform it to a boolean. + + +Test The Validator +------------------ + +Do the right thing, and make sure you've created a unit test for any +converter that you add to verify that it works as expected. + diff --git a/doc/source/devref/api_validators.rst b/doc/source/devref/api_validators.rst new file mode 100644 index 0000000..cf87d2f --- /dev/null +++ b/doc/source/devref/api_validators.rst @@ -0,0 +1,102 @@ +.. + Licensed under the Apache License, Version 2.0 (the "License"); you may + not use this file except in compliance with the License. You may obtain + a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + + + Convention for heading levels in Neutron devref: + ======= Heading 0 (reserved for the title in a document) + ------- Heading 1 + ~~~~~~~ Heading 2 + +++++++ Heading 3 + ''''''' Heading 4 + (Avoid deeper levels because they do not render well.) + + +API Validators +============== + +For the REST API, attributes may have custom validators defined. Each +validator will have a method to perform the validation, and a type +definition string, so that the validator can be referenced. + + +Defining A Validator Method +--------------------------- + +The validation method will have a positional argument for the data to +be validated, and may have additional (optional) keyword arguments that +can be used during validation. The method must handle any exceptions +and either return None (success) or a i18n string indicating the +validation failure message. By convention, the method name is prefixed +with ``validate_`` and then includes the data type. For example: + +:: + + def validate_uuid(data, valid_values=None): + if not uuidutils.is_uuid_like(data): + msg = _("'%s' is not a valid UUID") % data + LOG.debug(msg) + return msg + +There is a validation dictionary that maps the method to a validation +type that can be referred to in REST API definitions. An entry in the +dictionary would look like the following: + +:: + + 'type:uuid': validate_uuid, + + +Using Validators +---------------- + +In client code, the valdiator can be used in a REST API by using the +dictionary key for the validator. For example: + +:: + + RESOURCE_ATTRIBUTE_MAP = { + NETWORKS: { + 'id': {'allow_post': False, 'allow_put': False, + 'validate': {'type:uuid': None}, + 'is_visible': True, + 'primary_key': True}, + 'name': {'allow_post': True, 'allow_put': True, + 'validate': {'type:string': NAME_MAX_LEN}, + 'default': '', 'is_visible': True}, + +Here, the networks resource has an 'id' attribute with a UUID validator, +as seen by the 'validate' key containing a dictionary with a key of +'type:uuid'. + +Any addition arguments for the validator can be specified as values for +the dictionary entry (None in this case, NAME_MAX_LEN in the 'name' +attribute that uses a string validator). In a IP version attribute, one +could have a validator defined as follows: + +:: + + 'ip_version': {'allow_post': True, 'allow_put': False, + 'convert_to': conversions.convert_to_int, + 'validate': {'type:values': [4, 6]}, + 'is_visible': True}, + +Here, the valdiate_values() method will take the list of values as the +allowable values that can be specified for this attribute. + +Test The Validator +------------------ + +Do the right thing, and make sure you've created a unit test for any +validator that you add to verify that it works as expected, even for +simple validators. + diff --git a/doc/source/devref/index.rst b/doc/source/devref/index.rst index 835ec34..948445d 100644 --- a/doc/source/devref/index.rst +++ b/doc/source/devref/index.rst @@ -32,6 +32,8 @@ Neutron Lib Internals .. toctree:: :maxdepth: 3 + api_converters + api_validators callbacks diff --git a/neutron_lib/api/__init__.py b/neutron_lib/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/neutron_lib/api/converters.py b/neutron_lib/api/converters.py new file mode 100644 index 0000000..ad4b61a --- /dev/null +++ b/neutron_lib/api/converters.py @@ -0,0 +1,119 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six + +from neutron_lib._i18n import _ +from neutron_lib import exceptions as n_exc + + +def convert_to_boolean(data): + if isinstance(data, six.string_types): + val = data.lower() + if val == "true" or val == "1": + return True + if val == "false" or val == "0": + return False + elif isinstance(data, bool): + return data + elif isinstance(data, int): + if data == 0: + return False + elif data == 1: + return True + msg = _("'%s' cannot be converted to boolean") % data + raise n_exc.InvalidInput(error_message=msg) + + +def convert_to_boolean_if_not_none(data): + if data is not None: + return convert_to_boolean(data) + + +def convert_to_int(data): + try: + return int(data) + except (ValueError, TypeError): + msg = _("'%s' is not an integer") % data + raise n_exc.InvalidInput(error_message=msg) + + +def convert_to_int_if_not_none(data): + if data is not None: + return convert_to_int(data) + return data + + +def convert_to_positive_float_or_none(val): + # NOTE(salv-orlando): This conversion function is currently used by + # a vendor specific extension only at the moment It is used for + # port's RXTX factor in neutron.plugins.vmware.extensions.qos. + # It is deemed however generic enough to be in this module as it + # might be used in future for other API attributes. + if val is None: + return + try: + val = float(val) + if val < 0: + raise ValueError() + except (ValueError, TypeError): + msg = _("'%s' must be a non negative decimal.") % val + raise n_exc.InvalidInput(error_message=msg) + return val + + +def convert_kvp_str_to_list(data): + """Convert a value of the form 'key=value' to ['key', 'value']. + + :raises: n_exc.InvalidInput if any of the strings are malformed + (e.g. do not contain a key). + """ + kvp = [x.strip() for x in data.split('=', 1)] + if len(kvp) == 2 and kvp[0]: + return kvp + msg = _("'%s' is not of the form =[value]") % data + raise n_exc.InvalidInput(error_message=msg) + + +def convert_kvp_list_to_dict(kvp_list): + """Convert a list of 'key=value' strings to a dict. + + :raises: n_exc.InvalidInput if any of the strings are malformed + (e.g. do not contain a key) or if any + of the keys appear more than once. + """ + if kvp_list == ['True']: + # No values were provided (i.e. '--flag-name') + return {} + kvp_map = {} + for kvp_str in kvp_list: + key, value = convert_kvp_str_to_list(kvp_str) + kvp_map.setdefault(key, set()) + kvp_map[key].add(value) + return dict((x, list(y)) for x, y in six.iteritems(kvp_map)) + + +def convert_none_to_empty_list(value): + return [] if value is None else value + + +def convert_none_to_empty_dict(value): + return {} if value is None else value + + +def convert_to_list(data): + if data is None: + return [] + elif hasattr(data, '__iter__') and not isinstance(data, six.string_types): + return list(data) + else: + return [data] diff --git a/neutron_lib/api/validators.py b/neutron_lib/api/validators.py new file mode 100644 index 0000000..0a36b7a --- /dev/null +++ b/neutron_lib/api/validators.py @@ -0,0 +1,536 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import re + +import functools +import netaddr +from oslo_log import log as logging +from oslo_utils import uuidutils +import six + +from neutron_lib._i18n import _ +from neutron_lib.api import converters +from neutron_lib import constants +from neutron_lib import exceptions as n_exc + +LOG = logging.getLogger(__name__) + +# Used by range check to indicate no limit for a bound. +UNLIMITED = None + +# Note: In order to ensure that the MAC address is unicast the first byte +# must be even. +MAC_PATTERN = "^%s[aceACE02468](:%s{2}){5}$" % (constants.HEX_ELEM, + constants.HEX_ELEM) + + +def _verify_dict_keys(expected_keys, target_dict, strict=True): + """Allows to verify keys in a dictionary. + + :param expected_keys: A list of keys expected to be present. + :param target_dict: The dictionary which should be verified. + :param strict: Specifies whether additional keys are allowed to be present. + :return: True, if keys in the dictionary correspond to the specification. + """ + if not isinstance(target_dict, dict): + msg = (_("Invalid input. '%(target_dict)s' must be a dictionary " + "with keys: %(expected_keys)s") % + {'target_dict': target_dict, 'expected_keys': expected_keys}) + LOG.debug(msg) + return msg + + expected_keys = set(expected_keys) + provided_keys = set(target_dict.keys()) + + predicate = expected_keys.__eq__ if strict else expected_keys.issubset + + if not predicate(provided_keys): + msg = (_("Validation of dictionary's keys failed. " + "Expected keys: %(expected_keys)s " + "Provided keys: %(provided_keys)s") % + {'expected_keys': expected_keys, + 'provided_keys': provided_keys}) + LOG.debug(msg) + return msg + + +def is_attr_set(attribute): + return not (attribute is None or + attribute is constants.ATTR_NOT_SPECIFIED) + + +def _validate_list_of_items(item_validator, data, *args, **kwargs): + if not isinstance(data, list): + msg = _("'%s' is not a list") % data + return msg + + if len(set(data)) != len(data): + msg = _("Duplicate items in the list: '%s'") % ', '.join(data) + return msg + + for item in data: + msg = item_validator(item, *args, **kwargs) + if msg: + return msg + + +def validate_values(data, valid_values=None): + if data not in valid_values: + msg = (_("'%(data)s' is not in %(valid_values)s") % + {'data': data, 'valid_values': valid_values}) + LOG.debug(msg) + return msg + + +def validate_not_empty_string_or_none(data, max_len=None): + if data is not None: + return validate_not_empty_string(data, max_len=max_len) + + +def validate_not_empty_string(data, max_len=None): + msg = validate_string(data, max_len=max_len) + if msg: + return msg + if not data.strip(): + msg = _("'%s' Blank strings are not permitted") % data + LOG.debug(msg) + return msg + + +def validate_string_or_none(data, max_len=None): + if data is not None: + return validate_string(data, max_len=max_len) + + +def validate_string(data, max_len=None): + if not isinstance(data, six.string_types): + msg = _("'%s' is not a valid string") % data + LOG.debug(msg) + return msg + + if max_len is not None and len(data) > max_len: + msg = (_("'%(data)s' exceeds maximum length of %(max_len)s") % + {'data': data, 'max_len': max_len}) + LOG.debug(msg) + return msg + + +validate_list_of_unique_strings = functools.partial(_validate_list_of_items, + validate_string) + + +def validate_boolean(data, valid_values=None): + try: + converters.convert_to_boolean(data) + except n_exc.InvalidInput: + msg = _("'%s' is not a valid boolean value") % data + LOG.debug(msg) + return msg + + +def validate_range(data, valid_values=None): + """Check that integer value is within a range provided. + + Test is inclusive. Allows either limit to be ignored, to allow + checking ranges where only the lower or upper limit matter. + It is expected that the limits provided are valid integers or + the value None. + """ + + min_value = valid_values[0] + max_value = valid_values[1] + try: + data = int(data) + except (ValueError, TypeError): + msg = _("'%s' is not an integer") % data + LOG.debug(msg) + return msg + if min_value is not UNLIMITED and data < min_value: + msg = _("'%(data)s' is too small - must be at least " + "'%(limit)d'") % {'data': data, 'limit': min_value} + LOG.debug(msg) + return msg + if max_value is not UNLIMITED and data > max_value: + msg = _("'%(data)s' is too large - must be no larger than " + "'%(limit)d'") % {'data': data, 'limit': max_value} + LOG.debug(msg) + return msg + + +def validate_no_whitespace(data): + """Validates that input has no whitespace.""" + if re.search(r'\s', data): + msg = _("'%s' contains whitespace") % data + LOG.debug(msg) + raise n_exc.InvalidInput(error_message=msg) + return data + + +def validate_mac_address(data, valid_values=None): + try: + valid_mac = netaddr.valid_mac(validate_no_whitespace(data)) + except Exception: + valid_mac = False + + if valid_mac: + valid_mac = (not netaddr.EUI(data) in + map(netaddr.EUI, constants.INVALID_MAC_ADDRESSES)) + # TODO(arosen): The code in this file should be refactored + # so it catches the correct exceptions. validate_no_whitespace + # raises AttributeError if data is None. + if not valid_mac: + msg = _("'%s' is not a valid MAC address") % data + LOG.debug(msg) + return msg + + +def validate_mac_address_or_none(data, valid_values=None): + if data is not None: + return validate_mac_address(data, valid_values) + + +def validate_ip_address(data, valid_values=None): + msg = None + try: + # netaddr.core.ZEROFILL is only applicable to IPv4. + # it will remove leading zeros from IPv4 address octets. + ip = netaddr.IPAddress(validate_no_whitespace(data), + flags=netaddr.core.ZEROFILL) + # The followings are quick checks for IPv6 (has ':') and + # IPv4. (has 3 periods like 'xx.xx.xx.xx') + # NOTE(yamamoto): netaddr uses libraries provided by the underlying + # platform to convert addresses. For example, inet_aton(3). + # Some platforms, including NetBSD and OS X, have inet_aton + # implementation which accepts more varying forms of addresses than + # we want to accept here. The following check is to reject such + # addresses. For Example: + # >>> netaddr.IPAddress('1' * 59) + # IPAddress('199.28.113.199') + # >>> netaddr.IPAddress(str(int('1' * 59) & 0xffffffff)) + # IPAddress('199.28.113.199') + # >>> + if ':' not in data and data.count('.') != 3: + msg = _("'%s' is not a valid IP address") % data + # A leading '0' in IPv4 address may be interpreted as an octal number, + # e.g. 011 octal is 9 decimal. Since there is no standard saying + # whether IP address with leading '0's should be interpreted as octal + # or decimal, hence we reject leading '0's to avoid ambiguity. + if ip.version == 4 and str(ip) != data: + msg = _("'%(data)s' is not an accepted IP address, " + "'%(ip)s' is recommended") % {"data": data, "ip": ip} + except Exception: + msg = _("'%s' is not a valid IP address") % data + if msg: + LOG.debug(msg) + return msg + + +def validate_ip_pools(data, valid_values=None): + """Validate that start and end IP addresses are present. + + In addition to this the IP addresses will also be validated + """ + if not isinstance(data, list): + msg = _("Invalid data format for IP pool: '%s'") % data + LOG.debug(msg) + return msg + + expected_keys = ['start', 'end'] + for ip_pool in data: + msg = _verify_dict_keys(expected_keys, ip_pool) + if msg: + return msg + for k in expected_keys: + msg = validate_ip_address(ip_pool[k]) + if msg: + return msg + + +def validate_fixed_ips(data, valid_values=None): + if not isinstance(data, list): + msg = _("Invalid data format for fixed IP: '%s'") % data + LOG.debug(msg) + return msg + + ips = [] + for fixed_ip in data: + if not isinstance(fixed_ip, dict): + msg = _("Invalid data format for fixed IP: '%s'") % fixed_ip + LOG.debug(msg) + return msg + if 'ip_address' in fixed_ip: + # Ensure that duplicate entries are not set - just checking IP + # suffices. Duplicate subnet_id's are legitimate. + fixed_ip_address = fixed_ip['ip_address'] + if fixed_ip_address in ips: + msg = _("Duplicate IP address '%s'") % fixed_ip_address + LOG.debug(msg) + else: + msg = validate_ip_address(fixed_ip_address) + if msg: + return msg + ips.append(fixed_ip_address) + if 'subnet_id' in fixed_ip: + msg = validate_uuid(fixed_ip['subnet_id']) + if msg: + return msg + + +def validate_nameservers(data, valid_values=None): + if not hasattr(data, '__iter__'): + msg = _("Invalid data format for nameserver: '%s'") % data + LOG.debug(msg) + return msg + + hosts = [] + for host in data: + # This must be an IP address only + msg = validate_ip_address(host) + if msg: + msg = _("'%(host)s' is not a valid nameserver. %(msg)s") % { + 'host': host, 'msg': msg} + LOG.debug(msg) + return msg + if host in hosts: + msg = _("Duplicate nameserver '%s'") % host + LOG.debug(msg) + return msg + hosts.append(host) + + +def validate_hostroutes(data, valid_values=None): + if not isinstance(data, list): + msg = _("Invalid data format for hostroute: '%s'") % data + LOG.debug(msg) + return msg + + expected_keys = ['destination', 'nexthop'] + hostroutes = [] + for hostroute in data: + msg = _verify_dict_keys(expected_keys, hostroute) + if msg: + return msg + msg = validate_subnet(hostroute['destination']) + if msg: + return msg + msg = validate_ip_address(hostroute['nexthop']) + if msg: + return msg + if hostroute in hostroutes: + msg = _("Duplicate hostroute '%s'") % hostroute + LOG.debug(msg) + return msg + hostroutes.append(hostroute) + + +def validate_ip_address_or_none(data, valid_values=None): + if data is not None: + return validate_ip_address(data, valid_values) + + +def validate_subnet(data, valid_values=None): + msg = None + try: + net = netaddr.IPNetwork(validate_no_whitespace(data)) + if '/' not in data or (net.version == 4 and str(net) != data): + msg = _("'%(data)s' isn't a recognized IP subnet cidr," + " '%(cidr)s' is recommended") % {"data": data, + "cidr": net.cidr} + else: + return + except Exception: + msg = _("'%s' is not a valid IP subnet") % data + if msg: + LOG.debug(msg) + return msg + + +def validate_subnet_or_none(data, valid_values=None): + if data is not None: + return validate_subnet(data, valid_values) + + +validate_subnet_list = functools.partial(_validate_list_of_items, + validate_subnet) + + +def validate_regex(data, valid_values=None): + try: + if re.match(valid_values, data): + return + except TypeError: + pass + + msg = _("'%s' is not a valid input") % data + LOG.debug(msg) + return msg + + +def validate_regex_or_none(data, valid_values=None): + if data is not None: + return validate_regex(data, valid_values) + + +def validate_subnetpool_id(data, valid_values=None): + if data != constants.IPV6_PD_POOL_ID: + return validate_uuid_or_none(data, valid_values) + + +def validate_subnetpool_id_or_none(data, valid_values=None): + if data is not None: + return validate_subnetpool_id(data, valid_values) + + +def validate_uuid(data, valid_values=None): + if not uuidutils.is_uuid_like(data): + msg = _("'%s' is not a valid UUID") % data + LOG.debug(msg) + return msg + + +def validate_uuid_or_none(data, valid_values=None): + if data is not None: + return validate_uuid(data) + + +validate_uuid_list = functools.partial(_validate_list_of_items, + validate_uuid) + + +def _validate_dict_item(key, key_validator, data): + # Find conversion function, if any, and apply it + conv_func = key_validator.get('convert_to') + if conv_func: + data[key] = conv_func(data.get(key)) + # Find validator function + # TODO(salv-orlando): Structure of dict attributes should be improved + # to avoid iterating over items + val_func = val_params = None + for (k, v) in six.iteritems(key_validator): + if k.startswith('type:'): + # ask forgiveness, not permission + try: + val_func = validators[k] + except KeyError: + msg = _("Validator '%s' does not exist.") % k + LOG.debug(msg) + return msg + val_params = v + break + # Process validation + if val_func: + return val_func(data.get(key), val_params) + + +def validate_dict(data, key_specs=None): + if not isinstance(data, dict): + msg = _("'%s' is not a dictionary") % data + LOG.debug(msg) + return msg + # Do not perform any further validation, if no constraints are supplied + if not key_specs: + return + + # Check whether all required keys are present + required_keys = [key for key, spec in six.iteritems(key_specs) + if spec.get('required')] + + if required_keys: + msg = _verify_dict_keys(required_keys, data, False) + if msg: + return msg + + # Perform validation and conversion of all values + # according to the specifications. + for key, key_validator in [(k, v) for k, v in six.iteritems(key_specs) + if k in data]: + msg = _validate_dict_item(key, key_validator, data) + if msg: + return msg + + +def validate_dict_or_none(data, key_specs=None): + if data is not None: + return validate_dict(data, key_specs) + + +def validate_dict_or_empty(data, key_specs=None): + if data != {}: + return validate_dict(data, key_specs) + + +def validate_dict_or_nodata(data, key_specs=None): + if data: + return validate_dict(data, key_specs) + + +def validate_non_negative(data, valid_values=None): + try: + data = int(data) + except (ValueError, TypeError): + msg = _("'%s' is not an integer") % data + LOG.debug(msg) + return msg + + if data < 0: + msg = _("'%s' should be non-negative") % data + LOG.debug(msg) + return msg + + +# Dictionary that maintains a list of validation functions +validators = {'type:dict': validate_dict, + 'type:dict_or_none': validate_dict_or_none, + 'type:dict_or_empty': validate_dict_or_empty, + 'type:dict_or_nodata': validate_dict_or_nodata, + 'type:fixed_ips': validate_fixed_ips, + 'type:hostroutes': validate_hostroutes, + 'type:ip_address': validate_ip_address, + 'type:ip_address_or_none': validate_ip_address_or_none, + 'type:ip_pools': validate_ip_pools, + 'type:mac_address': validate_mac_address, + 'type:mac_address_or_none': validate_mac_address_or_none, + 'type:nameservers': validate_nameservers, + 'type:non_negative': validate_non_negative, + 'type:range': validate_range, + 'type:regex': validate_regex, + 'type:regex_or_none': validate_regex_or_none, + 'type:string': validate_string, + 'type:string_or_none': validate_string_or_none, + 'type:not_empty_string': validate_not_empty_string, + 'type:not_empty_string_or_none': + validate_not_empty_string_or_none, + 'type:subnet': validate_subnet, + 'type:subnet_list': validate_subnet_list, + 'type:subnet_or_none': validate_subnet_or_none, + 'type:subnetpool_id': validate_subnetpool_id, + 'type:subnetpool_id_or_none': validate_subnetpool_id_or_none, + 'type:uuid': validate_uuid, + 'type:uuid_or_none': validate_uuid_or_none, + 'type:uuid_list': validate_uuid_list, + 'type:values': validate_values, + 'type:boolean': validate_boolean, + 'type:list_of_unique_strings': validate_list_of_unique_strings} + + +def add_validator(validation_type, validator): + """Dynamically add a validator. + + This can be used by clients to add their own, private validators, rather + than directly modifying the data structure. The clients can NOT modify + existing validators. + """ + key = 'type:' + validation_type + if key in validators: + raise KeyError("Validator type %s is already defined", validation_type) + validators[key] = validator diff --git a/neutron_lib/constants.py b/neutron_lib/constants.py index f0ac4b2..c709018 100644 --- a/neutron_lib/constants.py +++ b/neutron_lib/constants.py @@ -129,8 +129,22 @@ ICMPV6_ALLOWED_TYPES = [130, 131, 132, 135, 136] ICMPV6_TYPE_RA = 134 ICMPV6_TYPE_NA = 136 +# Human-readable ID to which the subnetpool ID should be set to +# indicate that IPv6 Prefix Delegation is enabled for a given subnetpool +IPV6_PD_POOL_ID = 'prefix_delegation' + # Device names start with "tap" TAP_DEVICE_PREFIX = 'tap' # Time format ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f' + +############################# +# Attribute related constants +############################# +ATTR_NOT_SPECIFIED = object() + +HEX_ELEM = '[0-9A-Fa-f]' +UUID_PATTERN = '-'.join([HEX_ELEM + '{8}', HEX_ELEM + '{4}', + HEX_ELEM + '{4}', HEX_ELEM + '{4}', + HEX_ELEM + '{12}']) diff --git a/neutron_lib/tests/tools.py b/neutron_lib/tests/tools.py new file mode 100644 index 0000000..24cde5c --- /dev/null +++ b/neutron_lib/tests/tools.py @@ -0,0 +1,38 @@ +# Copyright (c) 2013 NEC Corporation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# Note: _safe_sort_key came from neutron/common/utils.py. For neutron-lib +# it is only used for testing, so is placed here. +import collections + + +def _safe_sort_key(value): + """Return value hash or build one for dictionaries.""" + if isinstance(value, collections.Mapping): + return sorted(value.items()) + return value + + +class UnorderedList(list): + """A list that is equals to any permutation of itself.""" + + def __eq__(self, other): + if not isinstance(other, list): + return False + return (sorted(self, key=_safe_sort_key) == + sorted(other, key=_safe_sort_key)) + + def __neq__(self, other): + return not self == other diff --git a/neutron_lib/tests/unit/api/__init__.py b/neutron_lib/tests/unit/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/neutron_lib/tests/unit/api/test_conversions.py b/neutron_lib/tests/unit/api/test_conversions.py new file mode 100644 index 0000000..f3a30f2 --- /dev/null +++ b/neutron_lib/tests/unit/api/test_conversions.py @@ -0,0 +1,169 @@ +# Copyright 2012 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools + +from neutron_lib.api import converters +from neutron_lib import exceptions as n_exc +from neutron_lib.tests import base +from neutron_lib.tests import tools + + +class TestConvertToBoolean(base.TestCase): + + def test_convert_to_boolean_bool(self): + self.assertIs(converters.convert_to_boolean(True), True) + self.assertIs(converters.convert_to_boolean(False), False) + + def test_convert_to_boolean_int(self): + self.assertIs(converters.convert_to_boolean(0), False) + self.assertIs(converters.convert_to_boolean(1), True) + self.assertRaises(n_exc.InvalidInput, + converters.convert_to_boolean, + 7) + + def test_convert_to_boolean_str(self): + self.assertIs(converters.convert_to_boolean('True'), True) + self.assertIs(converters.convert_to_boolean('true'), True) + self.assertIs(converters.convert_to_boolean('False'), False) + self.assertIs(converters.convert_to_boolean('false'), False) + self.assertIs(converters.convert_to_boolean('0'), False) + self.assertIs(converters.convert_to_boolean('1'), True) + self.assertRaises(n_exc.InvalidInput, + converters.convert_to_boolean, + '7') + + def test_convert_to_boolean_if_not_none(self): + self.assertIsNone(converters.convert_to_boolean_if_not_none(None)) + self.assertIs(converters.convert_to_boolean_if_not_none(1), True) + + +class TestConvertToInt(base.TestCase): + + def test_convert_to_int_int(self): + self.assertEqual(-1, converters.convert_to_int(-1)) + self.assertEqual(0, converters.convert_to_int(0)) + self.assertEqual(1, converters.convert_to_int(1)) + + def test_convert_to_int_if_not_none(self): + self.assertEqual(-1, converters.convert_to_int_if_not_none(-1)) + self.assertEqual(0, converters.convert_to_int_if_not_none(0)) + self.assertEqual(1, converters.convert_to_int_if_not_none(1)) + self.assertIsNone(converters.convert_to_int_if_not_none(None)) + + def test_convert_to_int_str(self): + self.assertEqual(4, converters.convert_to_int('4')) + self.assertEqual(6, converters.convert_to_int('6')) + self.assertRaises(n_exc.InvalidInput, + converters.convert_to_int, + 'garbage') + + def test_convert_to_int_none(self): + self.assertRaises(n_exc.InvalidInput, + converters.convert_to_int, + None) + + def test_convert_none_to_empty_list_none(self): + self.assertEqual([], converters.convert_none_to_empty_list(None)) + + def test_convert_none_to_empty_dict(self): + self.assertEqual({}, converters.convert_none_to_empty_dict(None)) + + def test_convert_none_to_empty_list_value(self): + values = ['1', 3, [], [1], {}, {'a': 3}] + for value in values: + self.assertEqual( + value, converters.convert_none_to_empty_list(value)) + + +class TestConvertToFloat(base.TestCase): + # NOTE: the routine being tested here is a plugin-specific extension + # module. As the plugin split proceed towards its second phase this + # test should either be remove, or the validation routine moved into + # neutron.api.v2.attributes + + def test_convert_to_float_positve_value(self): + self.assertEqual( + 1.111, converters.convert_to_positive_float_or_none(1.111)) + self.assertEqual(1, converters.convert_to_positive_float_or_none(1)) + self.assertEqual(0, converters.convert_to_positive_float_or_none(0)) + + def test_convert_to_float_negative_value(self): + self.assertRaises(n_exc.InvalidInput, + converters.convert_to_positive_float_or_none, + -1.11) + + def test_convert_to_float_string(self): + self.assertEqual(4, converters.convert_to_positive_float_or_none('4')) + self.assertEqual( + 4.44, converters.convert_to_positive_float_or_none('4.44')) + self.assertRaises(n_exc.InvalidInput, + converters.convert_to_positive_float_or_none, + 'garbage') + + def test_convert_to_float_none_value(self): + self.assertIsNone(converters.convert_to_positive_float_or_none(None)) + + +class TestConvertKvp(base.TestCase): + + def test_convert_kvp_list_to_dict_succeeds_for_missing_values(self): + result = converters.convert_kvp_list_to_dict(['True']) + self.assertEqual({}, result) + + def test_convert_kvp_list_to_dict_succeeds_for_multiple_values(self): + result = converters.convert_kvp_list_to_dict( + ['a=b', 'a=c', 'a=c', 'b=a']) + expected = {'a': tools.UnorderedList(['c', 'b']), 'b': ['a']} + self.assertEqual(expected, result) + + def test_convert_kvp_list_to_dict_succeeds_for_values(self): + result = converters.convert_kvp_list_to_dict(['a=b', 'c=d']) + self.assertEqual({'a': ['b'], 'c': ['d']}, result) + + def test_convert_kvp_str_to_list_fails_for_missing_key(self): + with testtools.ExpectedException(n_exc.InvalidInput): + converters.convert_kvp_str_to_list('=a') + + def test_convert_kvp_str_to_list_fails_for_missing_equals(self): + with testtools.ExpectedException(n_exc.InvalidInput): + converters.convert_kvp_str_to_list('a') + + def test_convert_kvp_str_to_list_succeeds_for_one_equals(self): + result = converters.convert_kvp_str_to_list('a=') + self.assertEqual(['a', ''], result) + + def test_convert_kvp_str_to_list_succeeds_for_two_equals(self): + result = converters.convert_kvp_str_to_list('a=a=a') + self.assertEqual(['a', 'a=a'], result) + + +class TestConvertToList(base.TestCase): + + def test_convert_to_empty_list(self): + for item in (None, [], (), {}): + self.assertEqual([], converters.convert_to_list(item)) + + def test_convert_to_list_string(self): + for item in ('', 'foo'): + self.assertEqual([item], converters.convert_to_list(item)) + + def test_convert_to_list_iterable(self): + for item in ([None], [1, 2, 3], (1, 2, 3), set([1, 2, 3]), ['foo']): + self.assertEqual(list(item), converters.convert_to_list(item)) + + def test_convert_to_list_non_iterable(self): + for item in (True, False, 1, 1.2, object()): + self.assertEqual([item], converters.convert_to_list(item)) diff --git a/neutron_lib/tests/unit/api/test_validators.py b/neutron_lib/tests/unit/api/test_validators.py new file mode 100644 index 0000000..7dad656 --- /dev/null +++ b/neutron_lib/tests/unit/api/test_validators.py @@ -0,0 +1,840 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import string + +import mock +import netaddr + +from neutron_lib._i18n import _ +from neutron_lib.api import converters +from neutron_lib.api import validators +from neutron_lib import constants +from neutron_lib import exceptions as n_exc +from neutron_lib.tests import base + + +def dummy_validator(data, valid_values=None): + pass + + +class TestAttributeValidation(base.TestCase): + + def _construct_dict_and_constraints(self): + """Constructs a test dictionary and a definition of constraints. + + :return: A (dictionary, constraint) tuple + """ + + constraints = {'key1': {'type:values': ['val1', 'val2'], + 'required': True}, + 'key2': {'type:string': None, + 'required': False}, + 'key3': {'type:dict': {'k4': {'type:string': None, + 'required': True}}, + 'required': True}} + + dictionary = {'key1': 'val1', + 'key2': 'a string value', + 'key3': {'k4': 'a string value'}} + + return dictionary, constraints + + def test_adding_validator(self): + validators.add_validator('new_type', dummy_validator) + self.assertIn('type:new_type', validators.validators) + self.assertEqual(dummy_validator, + validators.validators['type:new_type']) + + def test_fail_adding_duplicate_validator(self): + self.assertRaises(KeyError, + validators.add_validator, + 'dict', dummy_validator) + + def test_is_attr_set(self): + data = constants.ATTR_NOT_SPECIFIED + self.assertIs(validators.is_attr_set(data), False) + + data = None + self.assertIs(validators.is_attr_set(data), False) + + data = "I'm set" + self.assertIs(validators.is_attr_set(data), True) + + def test_validate_values(self): + msg = validators.validate_values(4, [4, 6]) + self.assertIsNone(msg) + + msg = validators.validate_values(4, (4, 6)) + self.assertIsNone(msg) + + msg = validators.validate_values(7, [4, 6]) + self.assertEqual("'7' is not in [4, 6]", msg) + + msg = validators.validate_values(7, (4, 6)) + self.assertEqual("'7' is not in (4, 6)", msg) + + def test_validate_not_empty_string(self): + msg = validators.validate_not_empty_string(' ', None) + self.assertEqual(u"' ' Blank strings are not permitted", msg) + msg = validators.validate_not_empty_string(123, None) + self.assertEqual(u"'123' is not a valid string", msg) + + def test_validate_not_empty_string_or_none(self): + msg = validators.validate_not_empty_string_or_none(' ', None) + self.assertEqual(u"' ' Blank strings are not permitted", msg) + + msg = validators.validate_not_empty_string_or_none(None, None) + self.assertIsNone(msg) + + def test_validate_string_or_none(self): + msg = validators.validate_string_or_none('test', None) + self.assertIsNone(msg) + + msg = validators.validate_string_or_none(None, None) + self.assertIsNone(msg) + + def test_validate_string(self): + msg = validators.validate_string(None, None) + self.assertEqual("'None' is not a valid string", msg) + + # 0 == len(data) == max_len + msg = validators.validate_string("", 0) + self.assertIsNone(msg) + + # 0 == len(data) < max_len + msg = validators.validate_string("", 9) + self.assertIsNone(msg) + + # 0 < len(data) < max_len + msg = validators.validate_string("123456789", 10) + self.assertIsNone(msg) + + # 0 < len(data) == max_len + msg = validators.validate_string("123456789", 9) + self.assertIsNone(msg) + + # 0 < max_len < len(data) + msg = validators.validate_string("1234567890", 9) + self.assertEqual("'1234567890' exceeds maximum length of 9", msg) + + msg = validators.validate_string("123456789", None) + self.assertIsNone(msg) + + def test_validate_list_of_unique_strings(self): + data = "TEST" + msg = validators.validate_list_of_unique_strings(data, None) + self.assertEqual("'TEST' is not a list", msg) + + data = ["TEST01", "TEST02", "TEST01"] + msg = validators.validate_list_of_unique_strings(data, None) + self.assertEqual( + "Duplicate items in the list: 'TEST01, TEST02, TEST01'", msg) + + data = ["12345678", "123456789"] + msg = validators.validate_list_of_unique_strings(data, 8) + self.assertEqual("'123456789' exceeds maximum length of 8", msg) + + data = ["TEST01", "TEST02", "TEST03"] + msg = validators.validate_list_of_unique_strings(data, None) + self.assertIsNone(msg) + + def test_validate_boolean(self): + msg = validators.validate_boolean(True) + self.assertIsNone(msg) + msg = validators.validate_boolean(0) + self.assertIsNone(msg) + msg = validators.validate_boolean("false") + self.assertIsNone(msg) + msg = validators.validate_boolean("fasle") + self.assertEqual("'fasle' is not a valid boolean value", msg) + + def test_validate_no_whitespace(self): + data = 'no_white_space' + result = validators.validate_no_whitespace(data) + self.assertEqual(data, result) + + self.assertRaises(n_exc.InvalidInput, + validators.validate_no_whitespace, + 'i have whitespace') + + self.assertRaises(n_exc.InvalidInput, + validators.validate_no_whitespace, + 'i\thave\twhitespace') + + for ws in string.whitespace: + self.assertRaises(n_exc.InvalidInput, + validators.validate_no_whitespace, + '%swhitespace-at-head' % ws) + self.assertRaises(n_exc.InvalidInput, + validators.validate_no_whitespace, + 'whitespace-at-tail%s' % ws) + + def test_validate_range(self): + msg = validators.validate_range(1, [1, 9]) + self.assertIsNone(msg) + + msg = validators.validate_range(5, [1, 9]) + self.assertIsNone(msg) + + msg = validators.validate_range(9, [1, 9]) + self.assertIsNone(msg) + + msg = validators.validate_range(1, (1, 9)) + self.assertIsNone(msg) + + msg = validators.validate_range(5, (1, 9)) + self.assertIsNone(msg) + + msg = validators.validate_range(9, (1, 9)) + self.assertIsNone(msg) + + msg = validators.validate_range(0, [1, 9]) + self.assertEqual("'0' is too small - must be at least '1'", msg) + + msg = validators.validate_range(10, (1, 9)) + self.assertEqual("'10' is too large - must be no larger than '9'", msg) + + msg = validators.validate_range("bogus", (1, 9)) + self.assertEqual("'bogus' is not an integer", msg) + + msg = validators.validate_range(10, (validators.UNLIMITED, + validators.UNLIMITED)) + self.assertIsNone(msg) + + msg = validators.validate_range(10, (1, validators.UNLIMITED)) + self.assertIsNone(msg) + + msg = validators.validate_range(1, (validators.UNLIMITED, 9)) + self.assertIsNone(msg) + + msg = validators.validate_range(-1, (0, validators.UNLIMITED)) + self.assertEqual("'-1' is too small - must be at least '0'", msg) + + msg = validators.validate_range(10, (validators.UNLIMITED, 9)) + self.assertEqual("'10' is too large - must be no larger than '9'", msg) + + def _test_validate_mac_address(self, validator, allow_none=False): + mac_addr = "ff:16:3e:4f:00:00" + msg = validator(mac_addr) + self.assertIsNone(msg) + + mac_addr = "ffa:16:3e:4f:00:00" + msg = validator(mac_addr) + err_msg = "'%s' is not a valid MAC address" + self.assertEqual(err_msg % mac_addr, msg) + + for invalid_mac_addr in constants.INVALID_MAC_ADDRESSES: + msg = validator(invalid_mac_addr) + self.assertEqual(err_msg % invalid_mac_addr, msg) + + mac_addr = "123" + msg = validator(mac_addr) + self.assertEqual(err_msg % mac_addr, msg) + + mac_addr = None + msg = validator(mac_addr) + if allow_none: + self.assertIsNone(msg) + else: + self.assertEqual(err_msg % mac_addr, msg) + + mac_addr = "ff:16:3e:4f:00:00\r" + msg = validator(mac_addr) + self.assertEqual(err_msg % mac_addr, msg) + + def test_validate_mac_address(self): + self._test_validate_mac_address(validators.validate_mac_address) + + def test_validate_mac_address_or_none(self): + self._test_validate_mac_address( + validators.validate_mac_address_or_none, allow_none=True) + + def test_validate_ip_address(self): + ip_addr = '1.1.1.1' + msg = validators.validate_ip_address(ip_addr) + self.assertIsNone(msg) + + ip_addr = '1111.1.1.1' + msg = validators.validate_ip_address(ip_addr) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + # Depending on platform to run UTs, this case might or might not be + # an equivalent to test_validate_ip_address_bsd. + ip_addr = '1' * 59 + msg = validators.validate_ip_address(ip_addr) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + ip_addr = '1.1.1.1 has whitespace' + msg = validators.validate_ip_address(ip_addr) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + ip_addr = '111.1.1.1\twhitespace' + msg = validators.validate_ip_address(ip_addr) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + ip_addr = '111.1.1.1\nwhitespace' + msg = validators.validate_ip_address(ip_addr) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + for ws in string.whitespace: + ip_addr = '%s111.1.1.1' % ws + msg = validators.validate_ip_address(ip_addr) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + for ws in string.whitespace: + ip_addr = '111.1.1.1%s' % ws + msg = validators.validate_ip_address(ip_addr) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + def test_validate_ip_address_with_leading_zero(self): + ip_addr = '1.1.1.01' + expected_msg = ("'%(data)s' is not an accepted IP address, " + "'%(ip)s' is recommended") + msg = validators.validate_ip_address(ip_addr) + self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.1'}, + msg) + + ip_addr = '1.1.1.011' + msg = validators.validate_ip_address(ip_addr) + self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.11'}, + msg) + + ip_addr = '1.1.1.09' + msg = validators.validate_ip_address(ip_addr) + self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.9'}, + msg) + + ip_addr = "fe80:0:0:0:0:0:0:0001" + msg = validators.validate_ip_address(ip_addr) + self.assertIsNone(msg) + + def test_validate_ip_address_bsd(self): + # NOTE(yamamoto): On NetBSD and OS X, netaddr.IPAddress() accepts + # '1' * 59 as a valid address. The behaviour is inherited from + # libc behaviour there. This test ensures that our validator reject + # such addresses on such platforms by mocking netaddr to emulate + # the behaviour. + ip_addr = '1' * 59 + with mock.patch('netaddr.IPAddress') as ip_address_cls: + msg = validators.validate_ip_address(ip_addr) + ip_address_cls.assert_called_once_with(ip_addr, + flags=netaddr.core.ZEROFILL) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + def test_validate_ip_pools(self): + pools = [[{'end': '10.0.0.254'}], + [{'start': '10.0.0.254'}], + [{'start': '1000.0.0.254', + 'end': '1.1.1.1'}], + [{'start': '10.0.0.2', 'end': '10.0.0.254', + 'forza': 'juve'}], + [{'start': '10.0.0.2', 'end': '10.0.0.254'}, + {'end': '10.0.0.254'}], + [None], + None] + for pool in pools: + msg = validators.validate_ip_pools(pool) + self.assertIsNotNone(msg) + + pools = [[{'end': '10.0.0.254', 'start': '10.0.0.2'}, + {'start': '11.0.0.2', 'end': '11.1.1.1'}], + [{'start': '11.0.0.2', 'end': '11.0.0.100'}]] + for pool in pools: + msg = validators.validate_ip_pools(pool) + self.assertIsNone(msg) + + invalid_ip = '10.0.0.2\r' + pools = [[{'end': '10.0.0.254', 'start': invalid_ip}]] + for pool in pools: + msg = validators.validate_ip_pools(pool) + self.assertEqual("'%s' is not a valid IP address" % invalid_ip, + msg) + + def test_validate_fixed_ips(self): + fixed_ips = [ + {'data': [{'subnet_id': '00000000-ffff-ffff-ffff-000000000000', + 'ip_address': '1111.1.1.1'}], + 'error_msg': "'1111.1.1.1' is not a valid IP address"}, + {'data': [{'subnet_id': 'invalid', + 'ip_address': '1.1.1.1'}], + 'error_msg': "'invalid' is not a valid UUID"}, + {'data': None, + 'error_msg': "Invalid data format for fixed IP: 'None'"}, + {'data': "1.1.1.1", + 'error_msg': "Invalid data format for fixed IP: '1.1.1.1'"}, + {'data': ['00000000-ffff-ffff-ffff-000000000000', '1.1.1.1'], + 'error_msg': "Invalid data format for fixed IP: " + "'00000000-ffff-ffff-ffff-000000000000'"}, + {'data': [['00000000-ffff-ffff-ffff-000000000000', '1.1.1.1']], + 'error_msg': "Invalid data format for fixed IP: " + "'['00000000-ffff-ffff-ffff-000000000000', " + "'1.1.1.1']'"}, + {'data': [{'subnet_id': '00000000-0fff-ffff-ffff-000000000000', + 'ip_address': '1.1.1.1'}, + {'subnet_id': '00000000-ffff-ffff-ffff-000000000000', + 'ip_address': '1.1.1.1'}], + 'error_msg': "Duplicate IP address '1.1.1.1'"}] + for fixed in fixed_ips: + msg = validators.validate_fixed_ips(fixed['data']) + self.assertEqual(fixed['error_msg'], msg) + + fixed_ips = [[{'subnet_id': '00000000-ffff-ffff-ffff-000000000000', + 'ip_address': '1.1.1.1'}], + [{'subnet_id': '00000000-0fff-ffff-ffff-000000000000', + 'ip_address': '1.1.1.1'}, + {'subnet_id': '00000000-ffff-ffff-ffff-000000000000', + 'ip_address': '1.1.1.2'}]] + for fixed in fixed_ips: + msg = validators.validate_fixed_ips(fixed) + self.assertIsNone(msg) + + def test_validate_nameservers(self): + ns_pools = [['1.1.1.2', '1.1.1.2'], + ['www.hostname.com', 'www.hostname.com'], + ['1000.0.0.1'], + ['www.hostname.com'], + ['www.great.marathons.to.travel'], + ['valid'], + ['77.hostname.com'], + ['1' * 59], + ['www.internal.hostname.com'], + None] + + for ns in ns_pools: + msg = validators.validate_nameservers(ns, None) + self.assertIsNotNone(msg) + + ns_pools = [['100.0.0.2'], + ['1.1.1.1', '1.1.1.2']] + + for ns in ns_pools: + msg = validators.validate_nameservers(ns, None) + self.assertIsNone(msg) + + def test_validate_hostroutes(self): + hostroute_pools = [[{'destination': '100.0.0.0/24'}], + [{'nexthop': '10.0.2.20'}], + [{'nexthop': '10.0.2.20', + 'forza': 'juve', + 'destination': '100.0.0.0/8'}], + [{'nexthop': '1110.0.2.20', + 'destination': '100.0.0.0/8'}], + [{'nexthop': '10.0.2.20', + 'destination': '100.0.0.0'}], + [{'nexthop': '10.0.2.20', + 'destination': '100.0.0.0/8'}, + {'nexthop': '10.0.2.20', + 'destination': '100.0.0.0/8'}], + [None], + None] + for host_routes in hostroute_pools: + msg = validators.validate_hostroutes(host_routes, None) + self.assertIsNotNone(msg) + + hostroute_pools = [[{'destination': '100.0.0.0/24', + 'nexthop': '10.0.2.20'}], + [{'nexthop': '10.0.2.20', + 'destination': '100.0.0.0/8'}, + {'nexthop': '10.0.2.20', + 'destination': '101.0.0.0/8'}]] + for host_routes in hostroute_pools: + msg = validators.validate_hostroutes(host_routes, None) + self.assertIsNone(msg) + + def test_validate_ip_address_or_none(self): + ip_addr = None + msg = validators.validate_ip_address_or_none(ip_addr) + self.assertIsNone(msg) + + ip_addr = '1.1.1.1' + msg = validators.validate_ip_address_or_none(ip_addr) + self.assertIsNone(msg) + + ip_addr = '1111.1.1.1' + msg = validators.validate_ip_address_or_none(ip_addr) + self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg) + + def test_uuid_pattern(self): + data = 'garbage' + msg = validators.validate_regex(data, constants.UUID_PATTERN) + self.assertIsNotNone(msg) + + data = '00000000-ffff-ffff-ffff-000000000000' + msg = validators.validate_regex(data, constants.UUID_PATTERN) + self.assertIsNone(msg) + + def test_mac_pattern(self): + # Valid - 3 octets + base_mac = "fa:16:3e:00:00:00" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNone(msg) + + # Valid - 4 octets + base_mac = "fa:16:3e:4f:00:00" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNone(msg) + + # Invalid - not unicast + base_mac = "01:16:3e:4f:00:00" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNotNone(msg) + + # Invalid - invalid format + base_mac = "a:16:3e:4f:00:00" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNotNone(msg) + + # Invalid - invalid format + base_mac = "ffa:16:3e:4f:00:00" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNotNone(msg) + + # Invalid - invalid format + base_mac = "01163e4f0000" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNotNone(msg) + + # Invalid - invalid format + base_mac = "01-16-3e-4f-00-00" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNotNone(msg) + + # Invalid - invalid format + base_mac = "00:16:3:f:00:00" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNotNone(msg) + + # Invalid - invalid format + base_mac = "12:3:4:5:67:89ab" + msg = validators.validate_regex(base_mac, validators.MAC_PATTERN) + self.assertIsNotNone(msg) + + def _test_validate_subnet(self, validator, allow_none=False): + # Valid - IPv4 + cidr = "10.0.2.0/24" + msg = validator(cidr, None) + self.assertIsNone(msg) + + # Valid - IPv6 without final octets + cidr = "fe80::/24" + msg = validator(cidr, None) + self.assertIsNone(msg) + + # Valid - IPv6 with final octets + cidr = "fe80::/24" + msg = validator(cidr, None) + self.assertIsNone(msg) + + # Valid - uncompressed ipv6 address + cidr = "fe80:0:0:0:0:0:0:0/128" + msg = validator(cidr, None) + self.assertIsNone(msg) + + # Valid - ipv6 address with multiple consecutive zero + cidr = "2001:0db8:0:0:1::1/128" + msg = validator(cidr, None) + self.assertIsNone(msg) + + # Valid - ipv6 address with multiple consecutive zero + cidr = "2001:0db8::1:0:0:1/128" + msg = validator(cidr, None) + self.assertIsNone(msg) + + # Valid - ipv6 address with multiple consecutive zero + cidr = "2001::0:1:0:0:1100/120" + msg = validator(cidr, None) + self.assertIsNone(msg) + + # Invalid - abbreviated ipv4 address + cidr = "10/24" + msg = validator(cidr, None) + error = _("'%(data)s' isn't a recognized IP subnet cidr," + " '%(cidr)s' is recommended") % {"data": cidr, + "cidr": "10.0.0.0/24"} + self.assertEqual(error, msg) + + # Invalid - IPv4 missing mask + cidr = "10.0.2.0" + msg = validator(cidr, None) + error = _("'%(data)s' isn't a recognized IP subnet cidr," + " '%(cidr)s' is recommended") % {"data": cidr, + "cidr": "10.0.2.0/32"} + self.assertEqual(error, msg) + + # Valid - IPv4 with non-zero masked bits is ok + for i in range(1, 255): + cidr = "192.168.1.%s/24" % i + msg = validator(cidr, None) + self.assertIsNone(msg) + + # Invalid - IPv6 without final octets, missing mask + cidr = "fe80::" + msg = validator(cidr, None) + error = _("'%(data)s' isn't a recognized IP subnet cidr," + " '%(cidr)s' is recommended") % {"data": cidr, + "cidr": "fe80::/128"} + self.assertEqual(error, msg) + + # Invalid - IPv6 with final octets, missing mask + cidr = "fe80::0" + msg = validator(cidr, None) + error = _("'%(data)s' isn't a recognized IP subnet cidr," + " '%(cidr)s' is recommended") % {"data": cidr, + "cidr": "fe80::/128"} + self.assertEqual(error, msg) + + # Invalid - Address format error + cidr = 'invalid' + msg = validator(cidr, None) + error = "'%s' is not a valid IP subnet" % cidr + self.assertEqual(error, msg) + + cidr = None + msg = validator(cidr, None) + if allow_none: + self.assertIsNone(msg) + else: + error = "'%s' is not a valid IP subnet" % cidr + self.assertEqual(error, msg) + + # Invalid - IPv4 with trailing CR + cidr = "10.0.2.0/24\r" + msg = validator(cidr, None) + error = "'%s' is not a valid IP subnet" % cidr + self.assertEqual(error, msg) + + def test_validate_subnet(self): + self._test_validate_subnet(validators.validate_subnet) + + def test_validate_subnet_or_none(self): + self._test_validate_subnet(validators.validate_subnet_or_none, + allow_none=True) + + def test_validate_subnet_list(self): + msg = validators.validate_subnet_list('abc') + self.assertEqual(u"'abc' is not a list", msg) + msg = validators.validate_subnet_list(['10.1.0.0/24', + '10.2.0.0/24', + '10.1.0.0/24']) + self.assertEqual(u"Duplicate items in the list: '10.1.0.0/24, " + u"10.2.0.0/24, 10.1.0.0/24'", msg) + msg = validators.validate_subnet_list(['10.1.0.0/24', '10.2.0.0']) + self.assertEqual(u"'10.2.0.0' isn't a recognized IP subnet cidr, " + u"'10.2.0.0/32' is recommended", msg) + + def _test_validate_regex(self, validator, allow_none=False): + pattern = '[hc]at' + + data = None + msg = validator(data, pattern) + if allow_none: + self.assertIsNone(msg) + else: + self.assertEqual("'None' is not a valid input", msg) + + data = 'bat' + msg = validator(data, pattern) + self.assertEqual("'%s' is not a valid input" % data, msg) + + data = 'hat' + msg = validator(data, pattern) + self.assertIsNone(msg) + + data = 'cat' + msg = validator(data, pattern) + self.assertIsNone(msg) + + def test_validate_regex(self): + self._test_validate_regex(validators.validate_regex) + + def test_validate_regex_or_none(self): + self._test_validate_regex(validators.validate_regex_or_none, + allow_none=True) + + def test_validate_subnetpool_id(self): + msg = validators.validate_subnetpool_id(constants.IPV6_PD_POOL_ID) + self.assertIsNone(msg) + + msg = validators.validate_subnetpool_id( + '00000000-ffff-ffff-ffff-000000000000') + self.assertIsNone(msg) + + def test_validate_subnetpool_id_or_none(self): + msg = validators.validate_subnetpool_id_or_none(None) + self.assertIsNone(msg) + + msg = validators.validate_subnetpool_id_or_none( + '00000000-ffff-ffff-ffff-000000000000') + self.assertIsNone(msg) + + def test_validate_uuid(self): + invalid_uuids = [None, + 123, + '123', + 't5069610-744b-42a7-8bd8-ceac1a229cd4', + 'e5069610-744bb-42a7-8bd8-ceac1a229cd4'] + for uuid in invalid_uuids: + msg = validators.validate_uuid(uuid) + error = "'%s' is not a valid UUID" % uuid + self.assertEqual(error, msg) + + msg = validators.validate_uuid('00000000-ffff-ffff-ffff-000000000000') + self.assertIsNone(msg) + + def test__validate_list_of_items(self): + # check not a list + items = [None, + 123, + 'e5069610-744b-42a7-8bd8-ceac1a229cd4', + '12345678123456781234567812345678', + {'uuid': 'e5069610-744b-42a7-8bd8-ceac1a229cd4'}] + for item in items: + msg = validators._validate_list_of_items(mock.Mock(), item) + error = "'%s' is not a list" % item + self.assertEqual(error, msg) + + # check duplicate items in a list + duplicate_items = ['e5069610-744b-42a7-8bd8-ceac1a229cd4', + 'f3eeab00-8367-4524-b662-55e64d4cacb5', + 'e5069610-744b-42a7-8bd8-ceac1a229cd4'] + msg = validators._validate_list_of_items(mock.Mock(), duplicate_items) + error = ("Duplicate items in the list: " + "'%s'" % ', '.join(duplicate_items)) + self.assertEqual(error, msg) + + # check valid lists + valid_lists = [[], + [1, 2, 3], + ['a', 'b', 'c']] + for list_obj in valid_lists: + msg = validators._validate_list_of_items( + mock.Mock(return_value=None), list_obj) + self.assertIsNone(msg) + + def test_validate_dict_type(self): + for value in (None, True, '1', []): + self.assertEqual("'%s' is not a dictionary" % value, + validators.validate_dict(value)) + + def test_validate_dict_without_constraints(self): + msg = validators.validate_dict({}) + self.assertIsNone(msg) + + # Validate a dictionary without constraints. + msg = validators.validate_dict({'key': 'value'}) + self.assertIsNone(msg) + + def test_validate_a_valid_dict_with_constraints(self): + dictionary, constraints = self._construct_dict_and_constraints() + + msg = validators.validate_dict(dictionary, constraints) + self.assertIsNone(msg, 'Validation of a valid dictionary failed.') + + def test_validate_dict_with_invalid_validator(self): + dictionary, constraints = self._construct_dict_and_constraints() + + constraints['key1'] = {'type:unsupported': None, 'required': True} + msg = validators.validate_dict(dictionary, constraints) + self.assertEqual("Validator 'type:unsupported' does not exist.", msg) + + def test_validate_dict_not_required_keys(self): + dictionary, constraints = self._construct_dict_and_constraints() + + del dictionary['key2'] + msg = validators.validate_dict(dictionary, constraints) + self.assertIsNone(msg, 'Field that was not required by the specs was' + 'required by the validator.') + + def test_validate_dict_required_keys(self): + dictionary, constraints = self._construct_dict_and_constraints() + + del dictionary['key1'] + msg = validators.validate_dict(dictionary, constraints) + self.assertIn('Expected keys:', msg) + + def test_validate_dict_wrong_values(self): + dictionary, constraints = self._construct_dict_and_constraints() + + dictionary['key1'] = 'UNSUPPORTED' + msg = validators.validate_dict(dictionary, constraints) + self.assertIsNotNone(msg) + + def test_validate_dict_convert_boolean(self): + dictionary, constraints = self._construct_dict_and_constraints() + + constraints['key_bool'] = { + 'type:boolean': None, + 'required': False, + 'convert_to': converters.convert_to_boolean} + dictionary['key_bool'] = 'true' + msg = validators.validate_dict(dictionary, constraints) + self.assertIsNone(msg) + # Explicitly comparing with literal 'True' as assertTrue + # succeeds also for 'true' + self.assertIs(True, dictionary['key_bool']) + + def test_subdictionary(self): + dictionary, constraints = self._construct_dict_and_constraints() + + del dictionary['key3']['k4'] + dictionary['key3']['k5'] = 'a string value' + msg = validators.validate_dict(dictionary, constraints) + self.assertIn('Expected keys:', msg) + + def test_validate_dict_or_none(self): + dictionary, constraints = self._construct_dict_and_constraints() + + # Check whether None is a valid value. + msg = validators.validate_dict_or_none(None, constraints) + self.assertIsNone(msg, 'Validation of a None dictionary failed.') + + # Check validation of a regular dictionary. + msg = validators.validate_dict_or_none(dictionary, constraints) + self.assertIsNone(msg, 'Validation of a valid dictionary failed.') + + def test_validate_dict_or_empty(self): + dictionary, constraints = self._construct_dict_and_constraints() + + # Check whether an empty dictionary is valid. + msg = validators.validate_dict_or_empty({}, constraints) + self.assertIsNone(msg, 'Validation of a None dictionary failed.') + + # Check validation of a regular dictionary. + msg = validators.validate_dict_or_empty(dictionary, constraints) + self.assertIsNone(msg, 'Validation of a valid dictionary failed.') + + def test_validate_dict_or_nodata(self): + dictionary, constraints = self._construct_dict_and_constraints() + + # Check whether no data is a valid value. + msg = validators.validate_dict_or_nodata(None, constraints) + self.assertIsNone(msg, 'Validation of None for no-data failed.') + msg = validators.validate_dict_or_nodata({}, constraints) + self.assertIsNone(msg, 'Validation of empty dict for no-data failed.') + + # Check validation of a regular dictionary. + msg = validators.validate_dict_or_nodata(dictionary, constraints) + self.assertIsNone(msg, 'Validation of a valid dictionary failed.') + + def test_validate_non_negative(self): + msg = validators.validate_non_negative('abc') + self.assertEqual("'abc' is not an integer", msg) + + for value in (-1, '-2'): + self.assertEqual("'%s' should be non-negative" % value, + validators.validate_non_negative(value)) + + for value in (0, 1, '2', True, False): + msg = validators.validate_non_negative(value) + self.assertIsNone(msg)