e41893c9d0
This change addresses nit-level review comments from this task. Story: 1651346 Task: 10551 Change-Id: I01608004ce90facadb73e252203900a1e62cbea1
398 lines
13 KiB
Python
Executable File
398 lines
13 KiB
Python
Executable File
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import functools
|
|
import inspect
|
|
|
|
import jsonschema
|
|
from oslo_utils import strutils
|
|
from oslo_utils import uuidutils
|
|
|
|
from ironic.common import exception
|
|
from ironic.common.i18n import _
|
|
from ironic.common import utils
|
|
|
|
|
|
def string(name, value):
|
|
"""Validate that the value is a string
|
|
|
|
:param name: Name of the argument
|
|
:param value: A string value
|
|
:returns: The string value, or None if value is None
|
|
:raises: InvalidParameterValue if the value is not a string
|
|
"""
|
|
if value is None:
|
|
return
|
|
if not isinstance(value, str):
|
|
raise exception.InvalidParameterValue(
|
|
_('Expected string for %s: %s') % (name, value))
|
|
return value
|
|
|
|
|
|
def boolean(name, value):
|
|
"""Validate that the value is a string representing a boolean
|
|
|
|
:param name: Name of the argument
|
|
:param value: A string value
|
|
:returns: The boolean representation of the value, or None if value is None
|
|
:raises: InvalidParameterValue if the value cannot be converted to a
|
|
boolean
|
|
"""
|
|
if value is None:
|
|
return
|
|
try:
|
|
return strutils.bool_from_string(value, strict=True)
|
|
except ValueError as e:
|
|
raise exception.InvalidParameterValue(
|
|
_('Invalid %s: %s') % (name, e))
|
|
|
|
|
|
def uuid(name, value):
|
|
"""Validate that the value is a UUID
|
|
|
|
:param name: Name of the argument
|
|
:param value: A UUID string value
|
|
:returns: The value, or None if value is None
|
|
:raises: InvalidParameterValue if the value is not a valid UUID
|
|
"""
|
|
if value is None:
|
|
return
|
|
if not uuidutils.is_uuid_like(value):
|
|
raise exception.InvalidParameterValue(
|
|
_('Expected UUID for %s: %s') % (name, value))
|
|
return value
|
|
|
|
|
|
def name(name, value):
|
|
"""Validate that the value is a logical name
|
|
|
|
:param name: Name of the argument
|
|
:param value: A logical name string value
|
|
:returns: The value, or None if value is None
|
|
:raises: InvalidParameterValue if the value is not a valid logical name
|
|
"""
|
|
if value is None:
|
|
return
|
|
if not utils.is_valid_logical_name(value):
|
|
raise exception.InvalidParameterValue(
|
|
_('Expected name for %s: %s') % (name, value))
|
|
return value
|
|
|
|
|
|
def uuid_or_name(name, value):
|
|
"""Validate that the value is a UUID or logical name
|
|
|
|
:param name: Name of the argument
|
|
:param value: A UUID or logical name string value
|
|
:returns: The value, or None if value is None
|
|
:raises: InvalidParameterValue if the value is not a valid UUID or
|
|
logical name
|
|
"""
|
|
if value is None:
|
|
return
|
|
if (not utils.is_valid_logical_name(value)
|
|
and not uuidutils.is_uuid_like(value)):
|
|
raise exception.InvalidParameterValue(
|
|
_('Expected UUID or name for %s: %s') % (name, value))
|
|
return value
|
|
|
|
|
|
def string_list(name, value):
|
|
"""Validate and convert comma delimited string to a list.
|
|
|
|
:param name: Name of the argument
|
|
:param value: A comma separated string of values
|
|
:returns: A list of unique values (lower-cased), maintaining the
|
|
same order, or None if value is None
|
|
:raises: InvalidParameterValue if the value is not a string
|
|
"""
|
|
value = string(name, value)
|
|
if value is None:
|
|
return
|
|
items = []
|
|
for v in str(value).split(','):
|
|
v_norm = v.strip().lower()
|
|
if v_norm and v_norm not in items:
|
|
items.append(v_norm)
|
|
return items
|
|
|
|
|
|
def integer(name, value):
|
|
"""Validate that the value represents an integer
|
|
|
|
:param name: Name of the argument
|
|
:param value: A value representing an integer
|
|
:returns: The value as an int, or None if value is None
|
|
:raises: InvalidParameterValue if the value does not represent an integer
|
|
"""
|
|
if value is None:
|
|
return
|
|
try:
|
|
return int(value)
|
|
except (ValueError, TypeError):
|
|
raise exception.InvalidParameterValue(
|
|
_('Expected an integer for %s: %s') % (name, value))
|
|
|
|
|
|
def mac_address(name, value):
|
|
"""Validate that the value represents a MAC address
|
|
|
|
:param name: Name of the argument
|
|
:param value: A string value representing a MAC address
|
|
:returns: The value as a normalized MAC address, or None if value is None
|
|
:raises: InvalidParameterValue if the value is not a valid MAC address
|
|
"""
|
|
if value is None:
|
|
return
|
|
try:
|
|
return utils.validate_and_normalize_mac(value)
|
|
except exception.InvalidMAC:
|
|
raise exception.InvalidParameterValue(
|
|
_('Expected valid MAC address for %s: %s') % (name, value))
|
|
|
|
|
|
def _or(name, value, validators):
|
|
last_error = None
|
|
for v in validators:
|
|
try:
|
|
return v(name=name, value=value)
|
|
except exception.Invalid as e:
|
|
last_error = e
|
|
if last_error:
|
|
raise last_error
|
|
|
|
|
|
def or_valid(*validators):
|
|
"""Validates if at least one supplied validator passes
|
|
|
|
:param name: Name of the argument
|
|
:param value: A value
|
|
:returns: The value returned from the first successful validator
|
|
:raises: The error from the last validator when
|
|
every validation fails
|
|
"""
|
|
assert validators, 'No validators specified for or_valid'
|
|
return functools.partial(_or, validators=validators)
|
|
|
|
|
|
def _and(name, value, validators):
|
|
for v in validators:
|
|
value = v(name=name, value=value)
|
|
return value
|
|
|
|
|
|
def and_valid(*validators):
|
|
"""Validates that every supplied validator passes
|
|
|
|
The value returned from each validator is passed as the value to the next
|
|
one.
|
|
|
|
:param name: Name of the argument
|
|
:param value: A value
|
|
:returns: The value transformed through every supplied validator
|
|
:raises: The error from the first failed validator
|
|
"""
|
|
assert validators, 'No validators specified for or_valid'
|
|
return functools.partial(_and, validators=validators)
|
|
|
|
|
|
def _validate_schema(name, value, schema):
|
|
if value is None:
|
|
return
|
|
try:
|
|
jsonschema.validate(value, schema)
|
|
except jsonschema.exceptions.ValidationError as e:
|
|
|
|
# The error message includes the whole schema which can be very
|
|
# large and unhelpful, so truncate it to be brief and useful
|
|
error_msg = ' '.join(str(e).split("\n")[:3])[:-1]
|
|
raise exception.InvalidParameterValue(
|
|
_('Schema error for %s: %s') % (name, error_msg))
|
|
return value
|
|
|
|
|
|
def schema(schema):
|
|
"""Return a validator function which validates the value with jsonschema
|
|
|
|
:param: schema dict representing jsonschema to validate with
|
|
:returns: validator function which takes name and value arguments
|
|
"""
|
|
jsonschema.Draft4Validator.check_schema(schema)
|
|
|
|
return functools.partial(_validate_schema, schema=schema)
|
|
|
|
|
|
def _validate_dict(name, value, validators):
|
|
if value is None:
|
|
return
|
|
_validate_types(name, value, (dict, ))
|
|
|
|
for k, v in validators.items():
|
|
if k in value:
|
|
value[k] = v(name=k, value=value[k])
|
|
|
|
return value
|
|
|
|
|
|
def dict_valid(**validators):
|
|
"""Return a validator function which validates dict fields
|
|
|
|
Validators will replace the value with the validation result. Any dict
|
|
item which has no validator is ignored. When a key is missing in the value
|
|
then the corresponding validator will not be run.
|
|
|
|
:param: validators dict where the key is a dict key to validate and the
|
|
value is a validator function to run on that value
|
|
:returns: validator function which takes name and value arguments
|
|
"""
|
|
return functools.partial(_validate_dict, validators=validators)
|
|
|
|
|
|
def _validate_types(name, value, types):
|
|
if not isinstance(value, types):
|
|
str_types = ', '.join([str(t) for t in types])
|
|
raise exception.InvalidParameterValue(
|
|
_('Expected types %s for %s: %s') % (str_types, name, value))
|
|
return value
|
|
|
|
|
|
def types(*types):
|
|
"""Return a validator function which checks the value is one of the types
|
|
|
|
:param: types one or more types to use for the isinstance test
|
|
:returns: validator function which takes name and value arguments
|
|
"""
|
|
# Replace None with the None type
|
|
types = tuple((type(None) if tp is None else tp) for tp in types)
|
|
return functools.partial(_validate_types, types=types)
|
|
|
|
|
|
def _apply_validator(name, value, val_functions):
|
|
if callable(val_functions):
|
|
return val_functions(name, value)
|
|
|
|
for v in val_functions:
|
|
value = v(name, value)
|
|
return value
|
|
|
|
|
|
def _inspect(function):
|
|
sig = inspect.signature(function)
|
|
param_keyword = None # **kwargs parameter
|
|
param_positional = None # *args parameter
|
|
params = []
|
|
|
|
for param in sig.parameters.values():
|
|
if param.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD:
|
|
params.append(param)
|
|
elif param.kind == inspect.Parameter.VAR_KEYWORD:
|
|
param_keyword = param
|
|
elif param.kind == inspect.Parameter.VAR_POSITIONAL:
|
|
param_positional = param
|
|
else:
|
|
assert False, 'Unsupported parameter kind %s %s' % (
|
|
param.name, param.kind
|
|
)
|
|
return params, param_positional, param_keyword
|
|
|
|
|
|
def validate(*args, **kwargs):
|
|
"""Decorator which validates and transforms function arguments
|
|
|
|
"""
|
|
assert not args, 'Validators must be specifed by argument name'
|
|
assert kwargs, 'No validators specified'
|
|
validators = kwargs
|
|
|
|
def inner_function(function):
|
|
params, param_positional, param_keyword = _inspect(function)
|
|
|
|
@functools.wraps(function)
|
|
def inner_check_args(*args, **kwargs):
|
|
args = list(args)
|
|
args_len = len(args)
|
|
kwargs_next = {}
|
|
next_arg_index = 0
|
|
|
|
if not param_keyword:
|
|
# ensure each named argument belongs to a param
|
|
kwarg_keys = set(kwargs)
|
|
param_names = set(p.name for p in params)
|
|
extra_args = kwarg_keys.difference(param_names)
|
|
if extra_args:
|
|
raise exception.InvalidParameterValue(
|
|
_('Unexpected arguments: %s') % ', '.join(extra_args))
|
|
|
|
for i, param in enumerate(params):
|
|
|
|
if i == 0 and param.name == 'self':
|
|
# skip validating self
|
|
continue
|
|
|
|
val_function = validators.get(param.name)
|
|
if not val_function:
|
|
continue
|
|
|
|
if i < args_len:
|
|
# validate positional argument
|
|
args[i] = val_function(param.name, args[i])
|
|
next_arg_index = i + 1
|
|
|
|
elif param.name in kwargs:
|
|
# validate keyword argument
|
|
kwargs_next[param.name] = val_function(
|
|
param.name, kwargs.pop(param.name))
|
|
elif param.default == inspect.Parameter.empty:
|
|
# no argument was provided, and there is no default
|
|
# in the parameter, so this is a mandatory argument
|
|
raise exception.MissingParameterValue(
|
|
_('Missing mandatory parameter: %s') % param.name)
|
|
|
|
if param_positional:
|
|
# handle validating *args
|
|
val_function = validators.get(param_positional.name)
|
|
remaining = args[next_arg_index:]
|
|
if val_function and remaining:
|
|
args = args[:next_arg_index]
|
|
args.extend(val_function(param_positional.name, remaining))
|
|
|
|
# handle validating remaining **kwargs
|
|
if kwargs:
|
|
val_function = (param_keyword
|
|
and validators.get(param_keyword.name))
|
|
if val_function:
|
|
kwargs_next.update(
|
|
val_function(param_keyword.name, kwargs))
|
|
else:
|
|
# make sure unvalidated keyword arguments are kept
|
|
kwargs_next.update(kwargs)
|
|
|
|
return function(*args, **kwargs_next)
|
|
return inner_check_args
|
|
return inner_function
|
|
|
|
|
|
patch = schema({
|
|
'type': 'array',
|
|
'items': {
|
|
'type': 'object',
|
|
'properties': {
|
|
'path': {'type': 'string', 'pattern': '^(/[\\w-]+)+$'},
|
|
'op': {'type': 'string', 'enum': ['add', 'replace', 'remove']},
|
|
'value': {}
|
|
},
|
|
'additionalProperties': False,
|
|
'required': ['op', 'path']
|
|
}
|
|
})
|
|
"""Validate a patch API operation"""
|