packetary/packetary/api/validators.py

112 lines
3.6 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2016 Mirantis, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import functools
import inspect
import jsonschema
_SENTINEL = object()
def _get_default_arguments(func):
try:
signature = inspect.signature(func)
return {p.name: p.default for p in signature.parameters.values()}
except AttributeError:
pass
args = inspect.getargspec(func)
if args.defaults:
return {
k: v for k, v in zip(reversed(args.args), reversed(args.defaults))
}
return {}
def _get_args_count(func):
try:
return len(inspect.signature(func).parameters)
except AttributeError:
pass
return len(inspect.getargspec(func).args)
def _validate_data(data, schema):
"""Validate the input data using jsonschema validation.
:param data: a data to validate represented as a dict
:param schema: a schema to validate represented as a dict;
must be in JSON Schema Draft 4 format.
"""
try:
jsonschema.validate(data, schema)
except jsonschema.ValidationError as e:
_raise_validation_error("data", e.message, e.path)
except jsonschema.SchemaError as e:
_raise_validation_error("schema", e.message, e.schema_path)
def _raise_validation_error(what, details, path):
message = "Invalid {0}: {1}.".format(what, details)
if path:
message += "\nField: [{0}]".format(
"][".join(repr(p) for p in path)
)
raise ValueError(message)
def _build_validator(schema):
# check that schema is method of class and expected self argument
if callable(schema) and _get_args_count(schema) > 0:
def validator(self, value):
_validate_data(value, schema(self))
elif callable(schema):
def validator(_, value):
_validate_data(value, schema())
else:
def validator(_, value):
return _validate_data(value, schema)
return validator
def declare_schema(**schemas):
"""Declares data schema for function arguments.
:param schemas: the mapping, where key is argument name, value is schema
the schema may be callable object or method of class
in this case the wrapped function should be method of
same class
:raises ValueError: if the passed data does not fit declared schema
"""
def decorator(func):
validators = {k: _build_validator(v) for k, v in schemas.items()}
defaults = _get_default_arguments(func)
@functools.wraps(func)
def wrapper(*args, **kwargs):
bound_args = inspect.getcallargs(func, *args, **kwargs)
for n, v in bound_args.items():
if v is not defaults.get(n, _SENTINEL) and n in validators:
validators[n](args and args[0] or None, v)
return func(*args, **kwargs)
return wrapper
return decorator