Implement validation on the Identity V3 API

Use JSONSchema to validate CRUD operations on the V3 Identity resources.
This includes wrapping the create and update methods for Users and
Groups.

Co-Authored-By: Lin Hua Cheng <os.lcheng@gmail.com>

Change-Id: Ia260838c85f897c52740217d8d222bb86edc11c6
bp: identity-api-validation
Closes-Bug: #999084
Closes-Bug: #1387605
This commit is contained in:
Lance Bragstad 2014-10-30 21:58:29 +00:00 committed by Brant Knudson
parent 885a49da6b
commit f2103ffdcd
3 changed files with 253 additions and 4 deletions

View File

@ -19,8 +19,10 @@ from oslo_log import log
from keystone.common import controller
from keystone.common import dependency
from keystone.common import validation
from keystone import exception
from keystone.i18n import _, _LW
from keystone.identity import schema
from keystone import notifications
@ -205,9 +207,8 @@ class UserV3(controller.V3Controller):
self.check_protection(context, prep_info, ref)
@controller.protected()
@validation.validated(schema.user_create, 'user')
def create_user(self, context, user):
self._require_attribute(user, 'name')
# The manager layer will generate the unique ID for users
ref = self._normalize_dict(user)
ref = self._normalize_domain_id(context, ref)
@ -243,6 +244,7 @@ class UserV3(controller.V3Controller):
return UserV3.wrap_member(context, ref)
@controller.protected()
@validation.validated(schema.user_update, 'user')
def update_user(self, context, user_id, user):
return self._update_user(context, user_id, user)
@ -291,9 +293,8 @@ class GroupV3(controller.V3Controller):
self.get_member_from_driver = self.identity_api.get_group
@controller.protected()
@validation.validated(schema.group_create, 'group')
def create_group(self, context, group):
self._require_attribute(group, 'name')
# The manager layer will generate the unique ID for groups
ref = self._normalize_dict(group)
ref = self._normalize_domain_id(context, ref)
@ -321,6 +322,7 @@ class GroupV3(controller.V3Controller):
return GroupV3.wrap_member(context, ref)
@controller.protected()
@validation.validated(schema.group_update, 'group')
def update_group(self, context, group_id, group):
self._require_matching_id(group_id, group)
self._require_matching_domain_id(

View File

@ -0,0 +1,67 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.common import validation
from keystone.common.validation import parameter_types
# NOTE(lhcheng): the max length is not applicable since it is specific
# to the SQL backend, LDAP does not have length limitation.
_identity_name = {
'type': 'string',
'minLength': 1
}
_user_properties = {
'default_project_id': validation.nullable(parameter_types.id_string),
'description': validation.nullable(parameter_types.description),
'domain_id': parameter_types.id_string,
'enabled': parameter_types.boolean,
'name': _identity_name,
'password': {
'type': ['string', 'null']
}
}
user_create = {
'type': 'object',
'properties': _user_properties,
'required': ['name'],
'additionalProperties': True
}
user_update = {
'type': 'object',
'properties': _user_properties,
'minProperties': 1,
'additionalProperties': True
}
_group_properties = {
'description': validation.nullable(parameter_types.description),
'domain_id': parameter_types.id_string,
'name': _identity_name
}
group_create = {
'type': 'object',
'properties': _group_properties,
'required': ['name'],
'additionalProperties': True
}
group_update = {
'type': 'object',
'properties': _group_properties,
'minProperties': 1,
'additionalProperties': True
}

View File

@ -22,6 +22,7 @@ from keystone.contrib.endpoint_filter import schema as endpoint_filter_schema
from keystone.contrib.federation import schema as federation_schema
from keystone.credential import schema as credential_schema
from keystone import exception
from keystone.identity import schema as identity_schema
from keystone.policy import schema as policy_schema
from keystone.resource import schema as resource_schema
from keystone.tests import unit
@ -1567,3 +1568,182 @@ class ServiceProviderValidationTestCase(unit.BaseTestCase):
self.assertRaises(exception.SchemaValidationError,
self.update_sp_validator.validate,
request_to_validate)
class UserValidationTestCase(unit.BaseTestCase):
"""Test for V3 User API validation."""
def setUp(self):
super(UserValidationTestCase, self).setUp()
self.user_name = uuid.uuid4().hex
create = identity_schema.user_create
update = identity_schema.user_update
self.create_user_validator = validators.SchemaValidator(create)
self.update_user_validator = validators.SchemaValidator(update)
def test_validate_user_create_request_succeeds(self):
"""Test that validating a user create request succeeds."""
request_to_validate = {'name': self.user_name}
self.create_user_validator.validate(request_to_validate)
def test_validate_user_create_with_all_valid_parameters_succeeds(self):
"""Test that validating a user create request succeeds."""
request_to_validate = {'name': self.user_name,
'default_project_id': uuid.uuid4().hex,
'domain_id': uuid.uuid4().hex,
'description': uuid.uuid4().hex,
'enabled': True,
'email': uuid.uuid4().hex,
'password': uuid.uuid4().hex}
self.create_user_validator.validate(request_to_validate)
def test_validate_user_create_fails_without_name(self):
"""Exception raised when validating a user without name."""
request_to_validate = {'email': uuid.uuid4().hex}
self.assertRaises(exception.SchemaValidationError,
self.create_user_validator.validate,
request_to_validate)
def test_validate_user_create_fails_with_name_of_zero_length(self):
"""Exception raised when validating a username with length of zero."""
request_to_validate = {'name': ''}
self.assertRaises(exception.SchemaValidationError,
self.create_user_validator.validate,
request_to_validate)
def test_validate_user_create_fails_with_name_of_wrong_type(self):
"""Exception raised when validating a username of wrong type."""
request_to_validate = {'name': True}
self.assertRaises(exception.SchemaValidationError,
self.create_user_validator.validate,
request_to_validate)
def test_validate_user_create_succeeds_with_valid_enabled_formats(self):
"""Validate acceptable enabled formats in create user requests."""
for enabled in _VALID_ENABLED_FORMATS:
request_to_validate = {'name': self.user_name,
'enabled': enabled}
self.create_user_validator.validate(request_to_validate)
def test_validate_user_create_fails_with_invalid_enabled_formats(self):
"""Exception raised when enabled is not an acceptable format."""
for invalid_enabled in _INVALID_ENABLED_FORMATS:
request_to_validate = {'name': self.user_name,
'enabled': invalid_enabled}
self.assertRaises(exception.SchemaValidationError,
self.create_user_validator.validate,
request_to_validate)
def test_validate_user_create_succeeds_with_extra_attributes(self):
"""Validate extra parameters on user create requests."""
request_to_validate = {'name': self.user_name,
'other_attr': uuid.uuid4().hex}
self.create_user_validator.validate(request_to_validate)
def test_validate_user_create_succeeds_with_password_of_zero_length(self):
"""Validate empty password on user create requests."""
request_to_validate = {'name': self.user_name,
'password': ''}
self.create_user_validator.validate(request_to_validate)
def test_validate_user_create_succeeds_with_null_password(self):
"""Validate that password is nullable on create user."""
request_to_validate = {'name': self.user_name,
'password': None}
self.create_user_validator.validate(request_to_validate)
def test_validate_user_create_fails_with_invalid_password_type(self):
"""Exception raised when user password is of the wrong type."""
request_to_validate = {'name': self.user_name,
'password': True}
self.assertRaises(exception.SchemaValidationError,
self.create_user_validator.validate,
request_to_validate)
def test_validate_user_create_succeeds_with_null_description(self):
"""Validate that description can be nullable on create user."""
request_to_validate = {'name': self.user_name,
'description': None}
self.create_user_validator.validate(request_to_validate)
def test_validate_user_update_succeeds(self):
"""Validate an update user request."""
request_to_validate = {'email': uuid.uuid4().hex}
self.update_user_validator.validate(request_to_validate)
def test_validate_user_update_fails_with_no_parameters(self):
"""Exception raised when updating nothing."""
request_to_validate = {}
self.assertRaises(exception.SchemaValidationError,
self.update_user_validator.validate,
request_to_validate)
def test_validate_user_update_succeeds_with_extra_parameters(self):
"""Validate user update requests with extra parameters."""
request_to_validate = {'other_attr': uuid.uuid4().hex}
self.update_user_validator.validate(request_to_validate)
class GroupValidationTestCase(unit.BaseTestCase):
"""Test for V3 Group API validation."""
def setUp(self):
super(GroupValidationTestCase, self).setUp()
self.group_name = uuid.uuid4().hex
create = identity_schema.group_create
update = identity_schema.group_update
self.create_group_validator = validators.SchemaValidator(create)
self.update_group_validator = validators.SchemaValidator(update)
def test_validate_group_create_succeeds(self):
"""Validate create group requests."""
request_to_validate = {'name': self.group_name}
self.create_group_validator.validate(request_to_validate)
def test_validate_group_create_succeeds_with_all_parameters(self):
"""Validate create group requests with all parameters."""
request_to_validate = {'name': self.group_name,
'description': uuid.uuid4().hex,
'domain_id': uuid.uuid4().hex}
self.create_group_validator.validate(request_to_validate)
def test_validate_group_create_fails_without_group_name(self):
"""Exception raised when group name is not provided in request."""
request_to_validate = {'description': uuid.uuid4().hex}
self.assertRaises(exception.SchemaValidationError,
self.create_group_validator.validate,
request_to_validate)
def test_validate_group_create_fails_when_group_name_is_too_short(self):
"""Exception raised when group name is equal to zero."""
request_to_validate = {'name': ''}
self.assertRaises(exception.SchemaValidationError,
self.create_group_validator.validate,
request_to_validate)
def test_validate_group_create_succeeds_with_extra_parameters(self):
"""Validate extra attributes on group create requests."""
request_to_validate = {'name': self.group_name,
'other_attr': uuid.uuid4().hex}
self.create_group_validator.validate(request_to_validate)
def test_validate_group_update_succeeds(self):
"""Validate group update requests."""
request_to_validate = {'description': uuid.uuid4().hex}
self.update_group_validator.validate(request_to_validate)
def test_validate_group_update_fails_with_no_parameters(self):
"""Exception raised when no parameters passed in on update."""
request_to_validate = {}
self.assertRaises(exception.SchemaValidationError,
self.update_group_validator.validate,
request_to_validate)
def test_validate_group_update_succeeds_with_extra_parameters(self):
"""Validate group update requests with extra parameters."""
request_to_validate = {'other_attr': uuid.uuid4().hex}
self.update_group_validator.validate(request_to_validate)