A mixin for jsonpatch requests validation

A JSONPatch-enabled API requires validation logic which will check the
correctness of incoming request body according to schema. A mixin is
added which will do such validations.

Such validation logic may be useful not only for Artifacts API but also
for regular v2 images API, as it also uses JSONPatch notation in its
update calls. So this mixin may be used in future in Images API as well.

Implements-blueprint: artifact-repository
Change-Id: I1b7b72bcc0a0455803a4e99dd17a9732b0dd7a8d
This commit is contained in:
Inessa Vasilevskaya 2015-01-20 18:52:01 +04:00 committed by Mike Fedosin
parent 8e4b2dad0b
commit 65682fc81a
3 changed files with 210 additions and 0 deletions

View File

@ -537,3 +537,20 @@ class UnknownArtifactType(NotFound):
class ArtifactInvalidStateTransition(Invalid):
message = _("Artifact state cannot be changed from %(curr)s to %(to)s")
class JsonPatchException(GlanceException):
message = _("Invalid jsonpatch request")
class InvalidJsonPatchBody(JsonPatchException):
message = _("The provided body %(body)s is invalid "
"under given schema: %(schema)s")
class InvalidJsonPatchPath(JsonPatchException):
message = _("The provided path '%(path)s' is invalid: %(explanation)s")
def __init__(self, message=None, *args, **kwargs):
self.explanation = kwargs.get("explanation")
super(InvalidJsonPatchPath, self).__init__(message, *args, **kwargs)

View File

@ -0,0 +1,122 @@
# Copyright 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A mixin that validates the given body for jsonpatch-compatibility.
The methods supported are limited to listed in METHODS_ALLOWED
"""
import re
import jsonschema
import glance.common.exception as exc
from glance.openstack.common._i18n import _
class JsonPatchValidatorMixin(object):
# a list of allowed methods allowed according to RFC 6902
ALLOWED = ["replace", "test", "remove", "add", "copy"]
PATH_REGEX_COMPILED = re.compile("^/[^/]+(/[^/]+)*$")
def __init__(self, methods_allowed=["replace", "remove"]):
self.schema = self._gen_schema(methods_allowed)
self.methods_allowed = [m for m in methods_allowed
if m in self.ALLOWED]
@staticmethod
def _gen_schema(methods_allowed):
"""
Generates a jsonschema for jsonpatch request based on methods_allowed
"""
# op replace needs no 'value' param, so needs a special schema if
# present in methods_allowed
basic_schema = {
"type": "array",
"items": {"properties": {"op": {"type": "string",
"enum": methods_allowed},
"path": {"type": "string"},
"value": {"type": ["string",
"object",
"integer",
"array",
"boolean"]}
},
"required": ["op", "path", "value"],
"type": "object"},
"$schema": "http://json-schema.org/draft-04/schema#"
}
if "remove" in methods_allowed:
methods_allowed.remove("remove")
no_remove_op_schema = {
"type": "object",
"properties": {
"op": {"type": "string", "enum": methods_allowed},
"path": {"type": "string"},
"value": {"type": ["string", "object",
"integer", "array", "boolean"]}
},
"required": ["op", "path", "value"]}
op_remove_only_schema = {
"type": "object",
"properties": {
"op": {"type": "string", "enum": ["remove"]},
"path": {"type": "string"}
},
"required": ["op", "path"]}
basic_schema = {
"type": "array",
"items": {
"oneOf": [no_remove_op_schema, op_remove_only_schema]},
"$schema": "http://json-schema.org/draft-04/schema#"
}
return basic_schema
def validate_body(self, body):
try:
jsonschema.validate(body, self.schema)
# now make sure everything is ok with path
return [{"path": self._decode_json_pointer(e["path"]),
"value": e.get("value", None),
"op": e["op"]} for e in body]
except jsonschema.ValidationError:
raise exc.InvalidJsonPatchBody(body=body, schema=self.schema)
def _check_for_path_errors(self, pointer):
if not re.match(self.PATH_REGEX_COMPILED, pointer):
msg = _("Json path should start with a '/', "
"end with no '/', no 2 subsequent '/' are allowed.")
raise exc.InvalidJsonPatchPath(path=pointer, explanation=msg)
if re.search('~[^01]', pointer) or pointer.endswith('~'):
msg = _("Pointer contains '~' which is not part of"
" a recognized escape sequence [~0, ~1].")
raise exc.InvalidJsonPatchPath(path=pointer, explanation=msg)
def _decode_json_pointer(self, pointer):
"""Parses a json pointer. Returns a pointer as a string.
Json Pointers are defined in
http://tools.ietf.org/html/draft-pbryan-zyp-json-pointer .
The pointers use '/' for separation between object attributes.
A '/' character in an attribute name is encoded as "~1" and
a '~' character is encoded as "~0".
"""
self._check_for_path_errors(pointer)
ret = []
for part in pointer.lstrip('/').split('/'):
ret.append(part.replace('~1', '/').replace('~0', '~').strip())
return '/'.join(ret)

View File

@ -0,0 +1,71 @@
# Copyright (c) 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import glance.common.exception as exc
import glance.common.jsonpatchvalidator as jpv
import glance.tests.utils as utils
class TestValidator(jpv.JsonPatchValidatorMixin):
def __init__(self, methods_allowed=["replace", "add"]):
super(TestValidator, self).__init__(methods_allowed)
class TestJsonPatchMixin(utils.BaseTestCase):
def test_body_validation(self):
validator = TestValidator()
validator.validate_body(
[{"op": "replace", "path": "/param", "value": "ok"}])
# invalid if not a list of [{"op": "", "path": "", "value": ""}]
# is passed
self.assertRaises(exc.JsonPatchException, validator.validate_body,
{"op": "replace", "path": "/me",
"value": "should be a list"})
def test_value_validation(self):
# a string, a list and a dict are valid value types
validator = TestValidator()
validator.validate_body(
[{"op": "replace", "path": "/param", "value": "ok string"}])
validator.validate_body(
[{"op": "replace", "path": "/param",
"value": ["ok list", "really ok"]}])
validator.validate_body(
[{"op": "replace", "path": "/param", "value": {"ok": "dict"}}])
def test_op_validation(self):
validator = TestValidator(methods_allowed=["replace", "add", "copy"])
validator.validate_body(
[{"op": "copy", "path": "/param", "value": "ok"},
{"op": "replace", "path": "/param/1", "value": "ok"}])
self.assertRaises(
exc.JsonPatchException, validator.validate_body,
[{"op": "test", "path": "/param", "value": "not allowed"}])
self.assertRaises(exc.JsonPatchException, validator.validate_body,
[{"op": "nosuchmethodatall", "path": "/param",
"value": "no way"}])
def test_path_validation(self):
validator = TestValidator()
bad_body_part = {"op": "add", "value": "bad path"}
for bad_path in ["/param/", "param", "//param", "/param~2", "/param~"]:
bad_body_part["path"] = bad_path
bad_body = [bad_body_part]
self.assertRaises(exc.JsonPatchException,
validator.validate_body, bad_body)
ok_body = [{"op": "add", "value": "some value",
"path": "/param~1/param~0"}]
body = validator.validate_body(ok_body)[0]
self.assertEqual("param//param~", body["path"])