Add Application Credentials controller
Add the controller, router, schema, and policies for application credentials. If a secret is not provided, one is generated at the controller layer. bp application-credentials Depends-on: Id26a2790acae25f80bd28a8cb121c80cb5064645 Depends-on: Icbd58464182b082854fb5d73ccc93c900ede020c Change-Id: I7a371d59c19a11e55f17baf12d92327c1258533d
This commit is contained in:
parent
c2b93dcb36
commit
166eced28b
@ -236,6 +236,10 @@ identity:delete_domain_config DELETE /v3/domains/{d
|
||||
identity:get_domain_config_default GET /v3/domains/config/default
|
||||
GET /v3/domains/config/{group}/default
|
||||
GET /v3/domains/config/{group}/{option}/default
|
||||
identity:get_application_credential GET /v3/users/{user_id}/application_credentials/{application_credential_id}
|
||||
identity:list_application_credentials GET /v3/users/{user_id}/application_credentials
|
||||
identity:create_application_credential POST /v3/users/{user_id}/application_credential
|
||||
identity:delete_application_credential DELETE /v3/users/{user_id}/application_credential/{application_credential_id}
|
||||
========================================================= ===
|
||||
|
||||
.. _grant_resources:
|
||||
|
@ -251,5 +251,10 @@
|
||||
"identity:get_security_compliance_domain_config": "",
|
||||
"identity:update_domain_config": "rule:cloud_admin",
|
||||
"identity:delete_domain_config": "rule:cloud_admin",
|
||||
"identity:get_domain_config_default": "rule:cloud_admin"
|
||||
"identity:get_domain_config_default": "rule:cloud_admin",
|
||||
|
||||
"identity:get_application_credential": "rule:admin_or_owner",
|
||||
"identity:list_application_credentials": "rule:admin_or_owner",
|
||||
"identity:create_application_credential": "rule:admin_or_owner",
|
||||
"identity:delete_application_credential": "rule:admin_or_owner"
|
||||
}
|
||||
|
@ -10,4 +10,5 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from keystone.application_credential import controllers # noqa
|
||||
from keystone.application_credential.core import * # noqa
|
||||
|
143
keystone/application_credential/controllers.py
Normal file
143
keystone/application_credential/controllers.py
Normal file
@ -0,0 +1,143 @@
|
||||
# Copyright 2018 SUSE Linux GmbH
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Workflow Logic the Application Credential service."""
|
||||
|
||||
import base64
|
||||
import os
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from keystone.application_credential import schema
|
||||
from keystone.common import controller
|
||||
from keystone.common import provider_api
|
||||
from keystone.common import utils
|
||||
from keystone.common import validation
|
||||
import keystone.conf
|
||||
from keystone import exception
|
||||
from keystone.i18n import _
|
||||
|
||||
|
||||
CONF = keystone.conf.CONF
|
||||
LOG = log.getLogger(__name__)
|
||||
PROVIDERS = provider_api.ProviderAPIs
|
||||
|
||||
|
||||
class ApplicationCredentialV3(controller.V3Controller):
|
||||
collection_name = 'application_credentials'
|
||||
member_name = 'application_credential'
|
||||
_public_parameters = frozenset([
|
||||
'id',
|
||||
'name',
|
||||
'description',
|
||||
'expires_at',
|
||||
'project_id',
|
||||
'roles',
|
||||
# secret is only exposed after create, it is not stored
|
||||
'secret',
|
||||
'links',
|
||||
'unrestricted'
|
||||
])
|
||||
|
||||
def _normalize_role_list(self, app_cred_roles):
|
||||
roles = []
|
||||
for role in app_cred_roles:
|
||||
if role.get('id'):
|
||||
roles.append(role)
|
||||
else:
|
||||
roles.append(PROVIDERS.role_api.get_unique_role_by_name(
|
||||
role['name']))
|
||||
return roles
|
||||
|
||||
def _generate_secret(self):
|
||||
length = 64
|
||||
secret = os.urandom(length)
|
||||
secret = base64.urlsafe_b64encode(secret)
|
||||
secret = secret.rstrip(b'=')
|
||||
secret = secret.decode('utf-8')
|
||||
return secret
|
||||
|
||||
@classmethod
|
||||
def _add_self_referential_link(cls, context, ref):
|
||||
path = ('/users/%(user_id)s/application_credentials') % {
|
||||
'user_id': ref['user_id']}
|
||||
ref.setdefault('links', {})
|
||||
ref['links']['self'] = cls.base_url(
|
||||
context, path=path) + '/' + ref['id']
|
||||
return ref
|
||||
|
||||
@classmethod
|
||||
def wrap_member(cls, context, ref):
|
||||
cls._add_self_referential_link(context, ref)
|
||||
ref = cls.filter_params(ref)
|
||||
return {cls.member_name: ref}
|
||||
|
||||
@controller.protected()
|
||||
def create_application_credential(self, request, user_id,
|
||||
application_credential):
|
||||
validation.lazy_validate(schema.application_credential_create,
|
||||
application_credential)
|
||||
|
||||
token = request.auth_context['token']
|
||||
|
||||
if request.context.user_id != user_id:
|
||||
action = _("Cannot create an application credential for another "
|
||||
"user")
|
||||
raise exception.ForbiddenAction(action=action)
|
||||
project_id = request.context.project_id
|
||||
app_cred = self._assign_unique_id(application_credential)
|
||||
if not app_cred.get('secret'):
|
||||
app_cred['secret'] = self._generate_secret()
|
||||
app_cred['user_id'] = user_id
|
||||
app_cred['project_id'] = project_id
|
||||
app_cred['roles'] = self._normalize_role_list(
|
||||
app_cred.get('roles', token['roles']))
|
||||
if app_cred.get('expires_at'):
|
||||
app_cred['expires_at'] = utils.parse_expiration_date(
|
||||
app_cred['expires_at'])
|
||||
app_cred = self._normalize_dict(app_cred)
|
||||
app_cred_api = PROVIDERS.application_credential_api
|
||||
try:
|
||||
ref = app_cred_api.create_application_credential(
|
||||
app_cred, initiator=request.audit_initiator
|
||||
)
|
||||
except exception.RoleAssignmentNotFound as e:
|
||||
# Raise a Bad Request, not a Not Found, in accordance with the
|
||||
# API-SIG recommendations:
|
||||
# https://specs.openstack.org/openstack/api-wg/guidelines/http.html#failure-code-clarifications
|
||||
raise exception.ApplicationCredentialValidationError(
|
||||
detail=str(e))
|
||||
return ApplicationCredentialV3.wrap_member(request.context_dict, ref)
|
||||
|
||||
@controller.filterprotected('name')
|
||||
def list_application_credentials(self, request, filters, user_id):
|
||||
app_cred_api = PROVIDERS.application_credential_api
|
||||
hints = ApplicationCredentialV3.build_driver_hints(request, filters)
|
||||
refs = app_cred_api.list_application_credentials(user_id, hints=hints)
|
||||
return ApplicationCredentialV3.wrap_collection(request.context_dict,
|
||||
refs)
|
||||
|
||||
@controller.protected()
|
||||
def get_application_credential(self, request, user_id,
|
||||
application_credential_id):
|
||||
ref = PROVIDERS.application_credential_api.get_application_credential(
|
||||
application_credential_id)
|
||||
return ApplicationCredentialV3.wrap_member(request.context_dict, ref)
|
||||
|
||||
@controller.protected()
|
||||
def delete_application_credential(self, request, user_id,
|
||||
application_credential_id):
|
||||
PROVIDERS.application_credential_api.delete_application_credential(
|
||||
application_credential_id, initiator=request.audit_initiator
|
||||
)
|
54
keystone/application_credential/routers.py
Normal file
54
keystone/application_credential/routers.py
Normal file
@ -0,0 +1,54 @@
|
||||
# Copyright 2018 SUSE Linux GmbH
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""WSGI Routers for the Application Credential service."""
|
||||
|
||||
from keystone.application_credential import controllers
|
||||
from keystone.common import json_home
|
||||
from keystone.common import wsgi
|
||||
|
||||
APP_CRED_RESOURCE_RELATION = json_home.build_v3_resource_relation(
|
||||
'application_credential')
|
||||
APP_CRED_PARAMETER_RELATION = json_home.build_v3_parameter_relation(
|
||||
'application_credential_id')
|
||||
APP_CRED_COLLECTION_PATH = '/users/{user_id}/application_credentials'
|
||||
APP_CRED_RESOURCE_PATH = (
|
||||
'/users/{user_id}/application_credentials/{application_credential_id}'
|
||||
)
|
||||
|
||||
|
||||
class Routers(wsgi.RoutersBase):
|
||||
|
||||
def append_v3_routers(self, mapper, routers):
|
||||
app_cred_controller = controllers.ApplicationCredentialV3()
|
||||
|
||||
self._add_resource(
|
||||
mapper, app_cred_controller,
|
||||
path=APP_CRED_COLLECTION_PATH,
|
||||
get_head_action='list_application_credentials',
|
||||
post_action='create_application_credential',
|
||||
rel=APP_CRED_RESOURCE_RELATION,
|
||||
path_vars={
|
||||
'user_id': json_home.Parameters.USER_ID,
|
||||
})
|
||||
|
||||
self._add_resource(
|
||||
mapper, app_cred_controller,
|
||||
path=APP_CRED_RESOURCE_PATH,
|
||||
get_head_action='get_application_credential',
|
||||
delete_action='delete_application_credential',
|
||||
rel=APP_CRED_RESOURCE_RELATION,
|
||||
path_vars={
|
||||
'user_id': json_home.Parameters.USER_ID,
|
||||
'application_credential_id': APP_CRED_PARAMETER_RELATION,
|
||||
})
|
50
keystone/application_credential/schema.py
Normal file
50
keystone/application_credential/schema.py
Normal file
@ -0,0 +1,50 @@
|
||||
# Copyright 2018 SUSE Linux GmbH
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from keystone.common import validation
|
||||
from keystone.common.validation import parameter_types
|
||||
|
||||
_role_properties = {
|
||||
'type': 'array',
|
||||
'items': {
|
||||
'type': 'object',
|
||||
'properties': {
|
||||
'id': parameter_types.id_string,
|
||||
'name': parameter_types.name
|
||||
},
|
||||
'minProperties': 1,
|
||||
'maxProperties': 1,
|
||||
'additionalProperties': False
|
||||
}
|
||||
}
|
||||
|
||||
_application_credential_properties = {
|
||||
'name': parameter_types.name,
|
||||
'description': validation.nullable(parameter_types.description),
|
||||
'secret': {
|
||||
'type': ['null', 'string']
|
||||
},
|
||||
'expires_at': {
|
||||
'type': ['null', 'string']
|
||||
},
|
||||
'roles': _role_properties,
|
||||
'unrestricted': parameter_types.boolean
|
||||
}
|
||||
|
||||
application_credential_create = {
|
||||
'type': 'object',
|
||||
'properties': _application_credential_properties,
|
||||
'required': ['name'],
|
||||
'additionanlProperties': True
|
||||
}
|
@ -13,6 +13,7 @@
|
||||
import itertools
|
||||
|
||||
from keystone.common.policies import access_token
|
||||
from keystone.common.policies import application_credential
|
||||
from keystone.common.policies import auth
|
||||
from keystone.common.policies import base
|
||||
from keystone.common.policies import consumer
|
||||
@ -49,6 +50,7 @@ from keystone.common.policies import user
|
||||
def list_rules():
|
||||
return itertools.chain(
|
||||
base.list_rules(),
|
||||
application_credential.list_rules(),
|
||||
access_token.list_rules(),
|
||||
auth.list_rules(),
|
||||
consumer.list_rules(),
|
||||
|
65
keystone/common/policies/application_credential.py
Normal file
65
keystone/common/policies/application_credential.py
Normal file
@ -0,0 +1,65 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_policy import policy
|
||||
|
||||
from keystone.common.policies import base
|
||||
|
||||
collection_path = '/v3/users/{user_id}/application_credentials'
|
||||
resource_path = collection_path + '/{application_credential_id}'
|
||||
|
||||
application_credential_policies = [
|
||||
policy.DocumentedRuleDefault(
|
||||
name=base.IDENTITY % 'get_application_credential',
|
||||
check_str=base.RULE_ADMIN_OR_OWNER,
|
||||
# FIXME(cmurphy) A system administrator should be able to manage any
|
||||
# application credential. A user with a role on a project should be
|
||||
# able to manage their own application credential. We don't currently
|
||||
# have a way of describing how a project administrator should or should
|
||||
# not be able to manage application credentials related to their
|
||||
# project. scope_types will remain commented out for now and will be
|
||||
# updated when we have an answer for this. The same applies to the
|
||||
# other policies in this file.
|
||||
# scope_types=['system', 'project'],
|
||||
description='Show application credential details.',
|
||||
operations=[{'path': resource_path,
|
||||
'method': 'GET'},
|
||||
{'path': resource_path,
|
||||
'method': 'HEAD'}]),
|
||||
policy.DocumentedRuleDefault(
|
||||
name=base.IDENTITY % 'list_application_credentials',
|
||||
check_str=base.RULE_ADMIN_OR_OWNER,
|
||||
# scope_types=['system', 'project'],
|
||||
description='List application credentials for a user.',
|
||||
operations=[{'path': collection_path,
|
||||
'method': 'GET'},
|
||||
{'path': collection_path,
|
||||
'method': 'HEAD'}]),
|
||||
policy.DocumentedRuleDefault(
|
||||
name=base.IDENTITY % 'create_application_credential',
|
||||
check_str=base.RULE_ADMIN_OR_OWNER,
|
||||
# scope_types=['system', 'project'],
|
||||
description='Create an application credential.',
|
||||
operations=[{'path': collection_path,
|
||||
'method': 'POST'}]),
|
||||
policy.DocumentedRuleDefault(
|
||||
name=base.IDENTITY % 'delete_application_credential',
|
||||
check_str=base.RULE_ADMIN_OR_OWNER,
|
||||
# scope_types=['system', 'project'],
|
||||
description='Delete an application credential.',
|
||||
operations=[{'path': resource_path,
|
||||
'method': 'DELETE'}])
|
||||
]
|
||||
|
||||
|
||||
def list_rules():
|
||||
return application_credential_policies
|
@ -171,6 +171,10 @@ class AmbiguityError(ValidationError):
|
||||
" resolve the ambiguity.")
|
||||
|
||||
|
||||
class ApplicationCredentialValidationError(ValidationError):
|
||||
message_format = _("Invalid application credential: %(detail)s")
|
||||
|
||||
|
||||
class CircularRegionHierarchyError(Error):
|
||||
message_format = _("The specified parent region %(parent_region_id)s "
|
||||
"would create a circular region hierarchy.")
|
||||
|
231
keystone/tests/unit/test_v3_application_credential.py
Normal file
231
keystone/tests/unit/test_v3_application_credential.py
Normal file
@ -0,0 +1,231 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
from testtools import matchers
|
||||
import uuid
|
||||
|
||||
from six.moves import http_client
|
||||
|
||||
from keystone.common import provider_api
|
||||
import keystone.conf
|
||||
from keystone.tests import unit
|
||||
from keystone.tests.unit import test_v3
|
||||
|
||||
|
||||
CONF = keystone.conf.CONF
|
||||
PROVIDERS = provider_api.ProviderAPIs
|
||||
MEMBER_PATH_FMT = '/users/%(user_id)s/application_credentials/%(app_cred_id)s'
|
||||
|
||||
|
||||
class ApplicationCredentialTestCase(test_v3.RestfulTestCase):
|
||||
"""Test CRUD operations for application credentials."""
|
||||
|
||||
def _app_cred_body(self, roles=None, name=None, expires=None, secret=None):
|
||||
name = name or uuid.uuid4().hex
|
||||
description = 'Credential for backups'
|
||||
app_cred_data = {
|
||||
'name': name,
|
||||
'description': description
|
||||
}
|
||||
if roles:
|
||||
app_cred_data['roles'] = roles
|
||||
if expires:
|
||||
app_cred_data['expires_at'] = expires
|
||||
if secret:
|
||||
app_cred_data['secret'] = secret
|
||||
return {'application_credential': app_cred_data}
|
||||
|
||||
def test_create_application_credential(self):
|
||||
roles = [{'id': self.role_id}]
|
||||
app_cred_body = self._app_cred_body(roles=roles)
|
||||
resp = self.post('/users/%s/application_credentials' % self.user_id,
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.CREATED)
|
||||
# Create operation returns the secret
|
||||
self.assertIn('secret', resp.json['application_credential'])
|
||||
# But not the stored hash
|
||||
self.assertNotIn('secret_hash', resp.json['application_credential'])
|
||||
|
||||
def test_create_application_credential_with_secret(self):
|
||||
secret = 'supersecuresecret'
|
||||
roles = [{'id': self.role_id}]
|
||||
app_cred_body = self._app_cred_body(roles=roles, secret=secret)
|
||||
resp = self.post('/users/%s/application_credentials' % self.user_id,
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.CREATED)
|
||||
self.assertEqual(secret, resp.json['application_credential']['secret'])
|
||||
|
||||
def test_create_application_credential_roles_from_token(self):
|
||||
app_cred_body = self._app_cred_body()
|
||||
resp = self.post('/users/%s/application_credentials' % self.user_id,
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.CREATED)
|
||||
self.assertThat(resp.json['application_credential']['roles'],
|
||||
matchers.HasLength(1))
|
||||
self.assertEqual(resp.json['application_credential']['roles'][0]['id'],
|
||||
self.role_id)
|
||||
|
||||
def test_create_application_credential_wrong_user(self):
|
||||
wrong_user = unit.create_user(PROVIDERS.identity_api,
|
||||
test_v3.DEFAULT_DOMAIN_ID)
|
||||
roles = [{'id': self.role_id}]
|
||||
app_cred_body = self._app_cred_body(roles=roles)
|
||||
self.post('/users/%s/application_credentials' % wrong_user['id'],
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.FORBIDDEN)
|
||||
|
||||
def test_create_application_credential_bad_role(self):
|
||||
roles = [{'id': uuid.uuid4().hex}]
|
||||
app_cred_body = self._app_cred_body(roles=roles)
|
||||
self.post('/users/%s/application_credentials' % self.user_id,
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.BAD_REQUEST)
|
||||
|
||||
def test_create_application_credential_with_expiration(self):
|
||||
roles = [{'id': self.role_id}]
|
||||
expires = datetime.datetime.utcnow() + datetime.timedelta(days=365)
|
||||
app_cred_body = self._app_cred_body(roles=roles, expires=expires)
|
||||
self.post('/users/%s/application_credentials' % self.user_id,
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.CREATED)
|
||||
|
||||
def test_create_application_credential_invalid_expiration_fmt(self):
|
||||
roles = [{'id': self.role_id}]
|
||||
expires = 'next tuesday'
|
||||
app_cred_body = self._app_cred_body(roles=roles, expires=expires)
|
||||
self.post('/users/%s/application_credentials' % self.user_id,
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.BAD_REQUEST)
|
||||
|
||||
def test_create_application_credential_already_expired(self):
|
||||
roles = [{'id': self.role_id}]
|
||||
expires = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
|
||||
app_cred_body = self._app_cred_body(roles=roles, expires=expires)
|
||||
self.post('/users/%s/application_credentials' % self.user_id,
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.BAD_REQUEST)
|
||||
|
||||
def test_create_application_credential_allow_recursion(self):
|
||||
roles = [{'id': self.role_id}]
|
||||
app_cred_body = self._app_cred_body(roles=roles)
|
||||
app_cred_body['application_credential']['unrestricted'] = True
|
||||
self.post('/users/%s/application_credentials' % self.user_id,
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.CREATED)
|
||||
|
||||
def test_list_application_credentials(self):
|
||||
resp = self.get('/users/%s/application_credentials' % self.user_id,
|
||||
expected_status=http_client.OK)
|
||||
self.assertEqual([], resp.json['application_credentials'])
|
||||
roles = [{'id': self.role_id}]
|
||||
app_cred_body = self._app_cred_body(roles=roles)
|
||||
self.post('/users/%s/application_credentials' % self.user_id,
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.CREATED)
|
||||
resp = self.get('/users/%s/application_credentials' % self.user_id,
|
||||
expected_status=http_client.OK)
|
||||
self.assertEqual(1, len(resp.json['application_credentials']))
|
||||
self.assertNotIn('secret', resp.json['application_credentials'][0])
|
||||
self.assertNotIn('secret_hash',
|
||||
resp.json['application_credentials'][0])
|
||||
app_cred_body['application_credential']['name'] = 'two'
|
||||
self.post('/users/%s/application_credentials' % self.user_id,
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.CREATED)
|
||||
resp = self.get('/users/%s/application_credentials' % self.user_id,
|
||||
expected_status=http_client.OK)
|
||||
self.assertEqual(2, len(resp.json['application_credentials']))
|
||||
for ac in resp.json['application_credentials']:
|
||||
self.assertNotIn('secret', ac)
|
||||
self.assertNotIn('secret_hash', ac)
|
||||
|
||||
def test_list_application_credentials_by_name(self):
|
||||
roles = [{'id': self.role_id}]
|
||||
app_cred_body = self._app_cred_body(roles=roles)
|
||||
name = app_cred_body['application_credential']['name']
|
||||
search_path = ('/users/%(user_id)s/application_credentials?'
|
||||
'name=%(name)s') % {'user_id': self.user_id,
|
||||
'name': name}
|
||||
resp = self.get(search_path, expected_status=http_client.OK)
|
||||
self.assertEqual([], resp.json['application_credentials'])
|
||||
self.post('/users/%s/application_credentials' % self.user_id,
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.CREATED)
|
||||
resp = self.get(search_path, expected_status=http_client.OK)
|
||||
self.assertEqual(1, len(resp.json['application_credentials']))
|
||||
self.assertNotIn('secret', resp.json['application_credentials'][0])
|
||||
self.assertNotIn('secret_hash',
|
||||
resp.json['application_credentials'][0])
|
||||
app_cred_body['application_credential']['name'] = 'two'
|
||||
self.post('/users/%s/application_credentials' % self.user_id,
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.CREATED)
|
||||
resp = self.get(search_path, expected_status=http_client.OK)
|
||||
self.assertEqual(1, len(resp.json['application_credentials']))
|
||||
self.assertEqual(resp.json['application_credentials'][0]['name'], name)
|
||||
|
||||
def test_get_head_application_credential(self):
|
||||
roles = [{'id': self.role_id}]
|
||||
app_cred_body = self._app_cred_body(roles=roles)
|
||||
resp = self.post('/users/%s/application_credentials' % self.user_id,
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.CREATED)
|
||||
|
||||
app_cred_id = resp.json['application_credential']['id']
|
||||
self.head(MEMBER_PATH_FMT % {'user_id': self.user_id,
|
||||
'app_cred_id': app_cred_id},
|
||||
expected_status=http_client.OK)
|
||||
expected_response = resp.json
|
||||
expected_response['application_credential'].pop('secret')
|
||||
resp = self.get(MEMBER_PATH_FMT % {'user_id': self.user_id,
|
||||
'app_cred_id': app_cred_id},
|
||||
expected_status=http_client.OK)
|
||||
self.assertDictEqual(resp.json, expected_response)
|
||||
|
||||
def test_get_head_application_credential_not_found(self):
|
||||
self.head(MEMBER_PATH_FMT % {'user_id': self.user_id,
|
||||
'app_cred_id': uuid.uuid4().hex},
|
||||
expected_status=http_client.NOT_FOUND)
|
||||
self.get(MEMBER_PATH_FMT % {'user_id': self.user_id,
|
||||
'app_cred_id': uuid.uuid4().hex},
|
||||
expected_status=http_client.NOT_FOUND)
|
||||
|
||||
def test_delete_application_credential(self):
|
||||
roles = [{'id': self.role_id}]
|
||||
app_cred_body = self._app_cred_body(roles=roles)
|
||||
resp = self.post('/users/%s/application_credentials' % self.user_id,
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.CREATED)
|
||||
app_cred_id = resp.json['application_credential']['id']
|
||||
self.delete(MEMBER_PATH_FMT % {'user_id': self.user_id,
|
||||
'app_cred_id': app_cred_id},
|
||||
expected_status=http_client.NO_CONTENT)
|
||||
|
||||
def test_delete_application_credential_not_found(self):
|
||||
self.delete(MEMBER_PATH_FMT % {'user_id': self.user_id,
|
||||
'app_cred_id': uuid.uuid4().hex},
|
||||
expected_status=http_client.NOT_FOUND)
|
||||
|
||||
def test_update_application_credential(self):
|
||||
roles = [{'id': self.role_id}]
|
||||
app_cred_body = self._app_cred_body(roles=roles)
|
||||
resp = self.post('/users/%s/application_credentials' % self.user_id,
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.CREATED)
|
||||
# Application credentials are immutable
|
||||
app_cred_body['application_credential']['description'] = "New Things"
|
||||
app_cred_id = resp.json['application_credential']['id']
|
||||
self.patch(MEMBER_PATH_FMT % {'user_id': self.user_id,
|
||||
'app_cred_id': app_cred_id},
|
||||
body=app_cred_body,
|
||||
expected_status=http_client.NOT_FOUND)
|
@ -13,6 +13,7 @@
|
||||
|
||||
import uuid
|
||||
|
||||
from keystone.application_credential import schema as app_cred_schema
|
||||
from keystone.assignment import schema as assignment_schema
|
||||
from keystone.catalog import schema as catalog_schema
|
||||
from keystone.common import validation
|
||||
@ -2672,3 +2673,84 @@ class LimitValidationTestCase(unit.BaseTestCase):
|
||||
self.assertRaises(exception.SchemaValidationError,
|
||||
self.update_limits_validator.validate,
|
||||
request_to_validate)
|
||||
|
||||
|
||||
class ApplicationCredentialValidatorTestCase(unit.TestCase):
|
||||
_valid_roles = [{'name': 'member'},
|
||||
{'id': uuid.uuid4().hex},
|
||||
{'id': str(uuid.uuid4())},
|
||||
{'name': '_member_'}]
|
||||
_invalid_roles = [True, 123, None, {'badkey': 'badval'}]
|
||||
|
||||
def setUp(self):
|
||||
super(ApplicationCredentialValidatorTestCase, self).setUp()
|
||||
|
||||
create = app_cred_schema.application_credential_create
|
||||
self.create_app_cred_validator = validators.SchemaValidator(create)
|
||||
|
||||
def test_validate_app_cred_request(self):
|
||||
request_to_validate = {
|
||||
'name': 'myappcred',
|
||||
'description': 'My App Cred',
|
||||
'roles': [{'name': 'member'}],
|
||||
'expires_at': 'tomorrow'
|
||||
}
|
||||
self.create_app_cred_validator.validate(request_to_validate)
|
||||
|
||||
def test_validate_app_cred_request_without_name_fails(self):
|
||||
request_to_validate = {
|
||||
'description': 'My App Cred',
|
||||
'roles': [{'name': 'member'}],
|
||||
'expires_at': 'tomorrow'
|
||||
}
|
||||
self.assertRaises(exception.SchemaValidationError,
|
||||
self.create_app_cred_validator.validate,
|
||||
request_to_validate)
|
||||
|
||||
def test_validate_app_cred_with_invalid_expires_at_fails(self):
|
||||
request_to_validate = {
|
||||
'name': 'myappcred',
|
||||
'description': 'My App Cred',
|
||||
'roles': [{'name': 'member'}],
|
||||
'expires_at': 3
|
||||
}
|
||||
self.assertRaises(exception.SchemaValidationError,
|
||||
self.create_app_cred_validator.validate,
|
||||
request_to_validate)
|
||||
|
||||
def test_validate_app_cred_with_null_expires_at_succeeds(self):
|
||||
request_to_validate = {
|
||||
'name': 'myappcred',
|
||||
'description': 'My App Cred',
|
||||
'roles': [{'name': 'member'}],
|
||||
}
|
||||
self.create_app_cred_validator.validate(request_to_validate)
|
||||
|
||||
def test_validate_app_cred_with_unrestricted_flag_succeeds(self):
|
||||
request_to_validate = {
|
||||
'name': 'myappcred',
|
||||
'description': 'My App Cred',
|
||||
'roles': [{'name': 'member'}],
|
||||
'unrestricted': True
|
||||
}
|
||||
self.create_app_cred_validator.validate(request_to_validate)
|
||||
|
||||
def test_validate_app_cred_with_secret_succeeds(self):
|
||||
request_to_validate = {
|
||||
'name': 'myappcred',
|
||||
'description': 'My App Cred',
|
||||
'roles': [{'name': 'member'}],
|
||||
'secret': 'secretsecretsecretsecret'
|
||||
}
|
||||
self.create_app_cred_validator.validate(request_to_validate)
|
||||
|
||||
def test_validate_app_cred_invalid_roles_fails(self):
|
||||
for role in self._invalid_roles:
|
||||
request_to_validate = {
|
||||
'name': 'myappcred',
|
||||
'description': 'My App Cred',
|
||||
'roles': [role]
|
||||
}
|
||||
self.assertRaises(exception.SchemaValidationError,
|
||||
self.create_app_cred_validator.validate,
|
||||
request_to_validate)
|
||||
|
@ -175,6 +175,11 @@ FEDERATED_AUTH_URL = ('/OS-FEDERATION/identity_providers/{idp_id}'
|
||||
FEDERATED_IDP_SPECIFIC_WEBSSO = ('/auth/OS-FEDERATION/identity_providers/'
|
||||
'{idp_id}/protocols/{protocol_id}/websso')
|
||||
|
||||
APPLICATION_CREDENTIAL = ('/users/{user_id}/application_credentials/'
|
||||
'{application_credential_id}')
|
||||
APPLICATION_CREDENTIAL_RELATION = (
|
||||
json_home.build_v3_parameter_relation('application_credential_id'))
|
||||
|
||||
V3_JSON_HOME_RESOURCES = {
|
||||
json_home.build_v3_resource_relation('auth_tokens'): {
|
||||
'href': '/auth/tokens'},
|
||||
@ -653,6 +658,11 @@ V3_JSON_HOME_RESOURCES = {
|
||||
},
|
||||
'hints': {'status': 'experimental'}
|
||||
},
|
||||
json_home.build_v3_resource_relation('application_credential'): {
|
||||
'href-template': APPLICATION_CREDENTIAL,
|
||||
'href-vars': {
|
||||
'application_credential_id': APPLICATION_CREDENTIAL_RELATION,
|
||||
'user_id': json_home.build_v3_parameter_relation('user_id')}}
|
||||
}
|
||||
|
||||
|
||||
|
@ -19,6 +19,7 @@ from oslo_log import log
|
||||
from paste import deploy
|
||||
import routes
|
||||
|
||||
from keystone.application_credential import routers as app_cred_routers
|
||||
from keystone.assignment import routers as assignment_routers
|
||||
from keystone.auth import routers as auth_routers
|
||||
from keystone.catalog import routers as catalog_routers
|
||||
@ -127,6 +128,7 @@ def v3_app_factory(global_conf, **local_conf):
|
||||
catalog_routers,
|
||||
credential_routers,
|
||||
identity_routers,
|
||||
app_cred_routers,
|
||||
limit_routers,
|
||||
policy_routers,
|
||||
resource_routers,
|
||||
|
Loading…
Reference in New Issue
Block a user