Add JSON schema and validation for role resource

Change-Id: Ie52dcda09f76bfd86b8e1a8b6d71dfd7078ecf22
This commit is contained in:
0weng 2024-12-10 13:06:37 -08:00
parent 7dc3199363
commit 0a690d47b2
3 changed files with 203 additions and 84 deletions
keystone

View File

@ -18,11 +18,11 @@ import flask
import flask_restful
from keystone.api._shared import implied_roles as shared
from keystone.api import validation
from keystone.assignment import schema
from keystone.common import json_home
from keystone.common import provider_api
from keystone.common import rbac_enforcer
from keystone.common import validation
import keystone.conf
from keystone.server import flask as ks_flask
@ -31,6 +31,69 @@ ENFORCER = rbac_enforcer.RBACEnforcer
PROVIDERS = provider_api.ProviderAPIs
class RolesResource(ks_flask.ResourceBase):
collection_key = 'roles'
member_key = 'role'
get_member_from_driver = PROVIDERS.deferred_provider_lookup(
api='role_api', method='get_role'
)
def _is_domain_role(self, role):
return bool(role.get('domain_id'))
@validation.request_query_schema(schema.roles_index_request_query)
@validation.response_body_schema(schema.roles_index_response_body)
def get(self):
"""List roles.
GET/HEAD /v3/roles
"""
filters = ['name', 'domain_id']
domain_filter = flask.request.args.get('domain_id')
if domain_filter:
ENFORCER.enforce_call(
action='identity:list_domain_roles', filters=filters
)
else:
ENFORCER.enforce_call(
action='identity:list_roles', filters=filters
)
hints = self.build_driver_hints(filters)
if not domain_filter:
# NOTE(jamielennox): To handle the default case of not domain_id
# defined the role_assignment backend does some hackery to
# distinguish between global and domain scoped roles. This backend
# behaviour relies upon a value of domain_id being set (not just
# defaulting to None). Manually set the filter if its not
# provided.
hints.add_filter('domain_id', None)
refs = PROVIDERS.role_api.list_roles(hints=hints)
return self.wrap_collection(refs, hints=hints)
@validation.request_body_schema(schema.role_create_request_body)
@validation.response_body_schema(schema.role_show_response_body)
def post(self):
"""Create role.
POST /v3/roles
"""
role = self.request_body_json.get('role', {})
if self._is_domain_role(role):
target = {'role': role}
ENFORCER.enforce_call(
action='identity:create_domain_role', target_attr=target
)
else:
ENFORCER.enforce_call(action='identity:create_role')
role = self._assign_unique_id(role)
role = self._normalize_dict(role)
ref = PROVIDERS.role_api.create_role(
role['id'], role, initiator=self.audit_initiator
)
return self.wrap_member(ref), http.client.CREATED
class RoleResource(ks_flask.ResourceBase):
collection_key = 'roles'
member_key = 'role'
@ -41,17 +104,13 @@ class RoleResource(ks_flask.ResourceBase):
def _is_domain_role(self, role):
return bool(role.get('domain_id'))
def get(self, role_id=None):
"""Get role or list roles.
@validation.request_body_schema(None)
@validation.response_body_schema(schema.role_show_response_body)
def get(self, role_id):
"""Get role.
GET/HEAD /v3/roles
GET/HEAD /v3/roles/{role_id}
"""
if role_id is not None:
return self._get_role(role_id)
return self._list_roles()
def _get_role(self, role_id):
err = None
role = {}
try:
@ -80,51 +139,8 @@ class RoleResource(ks_flask.ResourceBase):
)
return self.wrap_member(role)
def _list_roles(self):
filters = ['name', 'domain_id']
domain_filter = flask.request.args.get('domain_id')
if domain_filter:
ENFORCER.enforce_call(
action='identity:list_domain_roles', filters=filters
)
else:
ENFORCER.enforce_call(
action='identity:list_roles', filters=filters
)
hints = self.build_driver_hints(filters)
if not domain_filter:
# NOTE(jamielennox): To handle the default case of not domain_id
# defined the role_assignment backend does some hackery to
# distinguish between global and domain scoped roles. This backend
# behaviour relies upon a value of domain_id being set (not just
# defaulting to None). Manually set the filter if its not
# provided.
hints.add_filter('domain_id', None)
refs = PROVIDERS.role_api.list_roles(hints=hints)
return self.wrap_collection(refs, hints=hints)
def post(self):
"""Create role.
POST /v3/roles
"""
role = self.request_body_json.get('role', {})
if self._is_domain_role(role):
target = {'role': role}
ENFORCER.enforce_call(
action='identity:create_domain_role', target_attr=target
)
else:
ENFORCER.enforce_call(action='identity:create_role')
validation.lazy_validate(schema.role_create, role)
role = self._assign_unique_id(role)
role = self._normalize_dict(role)
ref = PROVIDERS.role_api.create_role(
role['id'], role, initiator=self.audit_initiator
)
return self.wrap_member(ref), http.client.CREATED
@validation.request_body_schema(schema.role_update_request_body)
@validation.response_body_schema(schema.role_show_response_body)
def patch(self, role_id):
"""Update role.
@ -151,13 +167,14 @@ class RoleResource(ks_flask.ResourceBase):
member_target=role,
)
request_body_role = self.request_body_json.get('role', {})
validation.lazy_validate(schema.role_update, request_body_role)
self._require_matching_id(request_body_role)
ref = PROVIDERS.role_api.update_role(
role_id, request_body_role, initiator=self.audit_initiator
)
return self.wrap_member(ref)
@validation.request_body_schema(None)
@validation.response_body_schema(None)
def delete(self, role_id):
"""Delete role.
@ -299,8 +316,23 @@ class RoleImplicationResource(flask_restful.Resource):
class RoleAPI(ks_flask.APIBase):
_name = 'roles'
_import_name = __name__
resources = [RoleResource]
resource_mapping = [
ks_flask.construct_resource_map(
resource=RolesResource,
url='/roles',
resource_kwargs={},
rel="roles",
path_vars=None,
),
ks_flask.construct_resource_map(
resource=RoleResource,
url='/roles/<string:role_id>',
resource_kwargs={},
rel="role",
path_vars={
'role_id': json_home.build_v3_parameter_relation("role_id")
},
),
ks_flask.construct_resource_map(
resource=RoleImplicationListResource,
url='/roles/<string:prior_role_id>/implies',

View File

@ -10,27 +10,113 @@
# License for the specific language governing permissions and limitations
# under the License.
from typing import Any
from keystone.api.validation import parameter_types
from keystone.api.validation import response_types
from keystone.assignment.role_backends import resource_options as ro
from keystone.common.validation import parameter_types
from keystone.common import validation
# Schema for Identity v3 API
_role_properties = {
'name': parameter_types.name,
'description': parameter_types.description,
'options': ro.ROLE_OPTIONS_REGISTRY.json_schema,
_role_properties: dict[str, Any] = {
"name": parameter_types.name,
"description": validation.nullable(parameter_types.description),
"domain_id": validation.nullable(parameter_types.domain_id),
"options": ro.ROLE_OPTIONS_REGISTRY.json_schema,
}
# NOTE(0weng): Multiple response body examples in the docs are
# incorrectly missing the `options` field.
# Common schema of `Role` resource
role_schema: dict[str, Any] = {
"type": "object",
"description": "A role object.",
"properties": {
"id": {
"type": "string",
"format": "uuid",
"description": "The role ID.",
"readOnly": True,
},
"links": response_types.resource_links,
**_role_properties,
},
"additionalProperties": True,
}
role_create = {
'type': 'object',
'properties': _role_properties,
'required': ['name'],
'additionalProperties': True,
# Response body of API operations returning a single role
# `GET /roles/{role_id}`, `POST /roles`, and `PATCH /roles/{role_id}`
role_show_response_body: dict[str, Any] = {
"type": "object",
"properties": {"role": role_schema},
"additionalProperties": False,
}
role_update = {
'type': 'object',
'properties': _role_properties,
'minProperties': 1,
'additionalProperties': True,
# Query parameters of the `GET /roles` API operation
# returning a list of roles
roles_index_request_query: dict[str, Any] = {
"type": "object",
"properties": {
"name": parameter_types.name,
"domain_id": parameter_types.domain_id,
},
"additionalProperties": False,
}
# Response body of the `GET /roles` API operation
# returning a list of roles
roles_index_response_body: dict[str, Any] = {
"type": "object",
"properties": {
"links": response_types.links,
"roles": {
"type": "array",
"items": role_schema,
"description": "A list of role objects.",
},
"truncated": response_types.truncated,
},
"additionalProperties": False,
}
# Request body of the `POST /roles` API operation
role_create_request_body = {
"type": "object",
"properties": {
"role": {
"type": "object",
"properties": _role_properties,
"description": "A role object.",
"required": ["name"],
"additionalProperties": True,
}
},
"required": ["role"],
"additionalProperties": False,
}
# FIXME(0weng): There's no error if additional properties are added
# at the top level, e.g. POST/PATCH with this body:
# {"role": {"some_key":"some_value"}, "no_error": "no_error_here"}
# Is this intended, or should it be disallowed by the schema (as it is here)?
# 400 errors do occur if the "role" property is missing
# (the error message is that '{}' is not enough properties,
# so I imagine extra properties are removed)
# or no properties are provided.
# Request body of the `PATCH /roles/{role_id}` operation
role_update_request_body = {
"type": "object",
"properties": {
"role": {
"type": "object",
"properties": _role_properties,
"description": "A role object.",
"minProperties": 1,
"additionalProperties": True,
}
},
"required": ["role"],
"additionalProperties": False,
}

View File

@ -883,19 +883,19 @@ class RoleValidationTestCase(unit.BaseTestCase):
self.role_name = 'My Role'
create = assignment_schema.role_create
update = assignment_schema.role_update
create = assignment_schema.role_create_request_body
update = assignment_schema.role_update_request_body
self.create_role_validator = validators.SchemaValidator(create)
self.update_role_validator = validators.SchemaValidator(update)
def test_validate_role_request(self):
"""Test we can successfully validate a create role request."""
request_to_validate = {'name': self.role_name}
request_to_validate = {"role": {'name': self.role_name}}
self.create_role_validator.validate(request_to_validate)
def test_validate_role_create_without_name_raises_exception(self):
"""Test that we raise an exception when `name` isn't included."""
request_to_validate = {'enabled': True}
request_to_validate = {"role": {'enabled': True}}
self.assertRaises(
exception.SchemaValidationError,
self.create_role_validator.validate,
@ -905,7 +905,7 @@ class RoleValidationTestCase(unit.BaseTestCase):
def test_validate_role_create_fails_with_invalid_name(self):
"""Exception when validating a create request with invalid `name`."""
for invalid_name in _INVALID_NAMES:
request_to_validate = {'name': invalid_name}
request_to_validate = {"role": {'name': invalid_name}}
self.assertRaises(
exception.SchemaValidationError,
self.create_role_validator.validate,
@ -915,7 +915,7 @@ class RoleValidationTestCase(unit.BaseTestCase):
def test_validate_role_create_request_with_name_too_long_fails(self):
"""Exception raised when creating a role with `name` too long."""
long_role_name = 'a' * 256
request_to_validate = {'name': long_role_name}
request_to_validate = {"role": {'name': long_role_name}}
self.assertRaises(
exception.SchemaValidationError,
self.create_role_validator.validate,
@ -925,14 +925,15 @@ class RoleValidationTestCase(unit.BaseTestCase):
def test_validate_role_request_with_valid_description(self):
"""Test we can validate `description` in create role request."""
request_to_validate = {
'name': self.role_name,
'description': 'My Role',
"role": {'name': self.role_name, 'description': 'My Role'}
}
self.create_role_validator.validate(request_to_validate)
def test_validate_role_request_fails_with_invalid_description(self):
"""Exception is raised when `description` as a non-string value."""
request_to_validate = {'name': self.role_name, 'description': False}
request_to_validate = {
"role": {'name': self.role_name, 'description': False}
}
self.assertRaises(
exception.SchemaValidationError,
self.create_role_validator.validate,
@ -941,13 +942,13 @@ class RoleValidationTestCase(unit.BaseTestCase):
def test_validate_role_update_request(self):
"""Test that we validate a role update request."""
request_to_validate = {'name': 'My New Role'}
request_to_validate = {"role": {'name': 'My New Role'}}
self.update_role_validator.validate(request_to_validate)
def test_validate_role_update_fails_with_invalid_name(self):
"""Exception when validating an update request with invalid `name`."""
for invalid_name in _INVALID_NAMES:
request_to_validate = {'name': invalid_name}
request_to_validate = {"role": {'name': invalid_name}}
self.assertRaises(
exception.SchemaValidationError,
self.update_role_validator.validate,
@ -957,7 +958,7 @@ class RoleValidationTestCase(unit.BaseTestCase):
def test_validate_role_update_request_with_name_too_long_fails(self):
"""Exception raised when updating a role with `name` too long."""
long_role_name = 'a' * 256
request_to_validate = {'name': long_role_name}
request_to_validate = {"role": {'name': long_role_name}}
self.assertRaises(
exception.SchemaValidationError,
self.update_role_validator.validate,