Add Application Credentials manager
Add the manager layer for application credentials. This handles generating CADF notifications on create/delete and listening for notifications that affect application credentials' lifetime. On create, the manager keeps a copy of the initial secret so that it may be returned to the user, but it is otherwise never stored. The secret hash is stored and must be filtered out before being returned to the user. bp application-credentials Change-Id: Iae6377e78d2b8e15472d378ef54e29a946dc51b5
This commit is contained in:
parent
52a32aa583
commit
716abfca59
|
@ -10,4 +10,4 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
pass
|
||||
from keystone.application_credential.core import * # noqa
|
||||
|
|
|
@ -0,0 +1,200 @@
|
|||
# Copyright 2018 SUSE Linux GmbH
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Main entry point into the Application Credential service."""
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from keystone.common import cache
|
||||
from keystone.common import driver_hints
|
||||
from keystone.common import manager
|
||||
from keystone.common import provider_api
|
||||
import keystone.conf
|
||||
from keystone import exception
|
||||
from keystone import notifications
|
||||
|
||||
|
||||
CONF = keystone.conf.CONF
|
||||
MEMOIZE = cache.get_memoization_decorator(group='application_credential')
|
||||
LOG = log.getLogger(__name__)
|
||||
PROVIDERS = provider_api.ProviderAPIs
|
||||
|
||||
|
||||
class Manager(manager.Manager):
|
||||
"""Default pivot point for the Application Credential backend.
|
||||
|
||||
See :mod:`keystone.common.manager.Manager` for more details on how this
|
||||
dynamically calls the backend.
|
||||
|
||||
"""
|
||||
|
||||
driver_namespace = 'keystone.application_credential'
|
||||
_provides_api = 'application_credential_api'
|
||||
|
||||
_APP_CRED = 'application_credential'
|
||||
|
||||
def __init__(self):
|
||||
super(Manager, self).__init__(CONF.application_credential.driver)
|
||||
self._register_callback_listeners()
|
||||
|
||||
def _register_callback_listeners(self):
|
||||
notifications.register_event_callback(
|
||||
notifications.ACTIONS.deleted, 'user',
|
||||
self._delete_app_creds_on_user_delete_callback)
|
||||
notifications.register_event_callback(
|
||||
notifications.ACTIONS.disabled, 'user',
|
||||
self._delete_app_creds_on_user_delete_callback)
|
||||
notifications.register_event_callback(
|
||||
notifications.ACTIONS.internal,
|
||||
# This notification is emitted when a role assignment is removed,
|
||||
# we can take advantage of it even though we're not a token.
|
||||
notifications.INVALIDATE_USER_PROJECT_TOKEN_PERSISTENCE,
|
||||
self._delete_app_creds_on_assignment_removal)
|
||||
|
||||
def _delete_app_creds_on_user_delete_callback(
|
||||
self, service, resource_type, operation, payload):
|
||||
user_id = payload['resource_info']
|
||||
self._delete_application_credentials_for_user(user_id)
|
||||
|
||||
def _delete_app_creds_on_assignment_removal(
|
||||
self, service, resource_type, operation, payload):
|
||||
user_id = payload['resource_info']['user_id']
|
||||
project_id = payload['resource_info']['project_id']
|
||||
self._delete_application_credentials_for_user_on_project(user_id,
|
||||
project_id)
|
||||
|
||||
def _get_user_roles(self, user_id, project_id):
|
||||
assignment_list = self.assignment_api.list_role_assignments(
|
||||
user_id=user_id,
|
||||
project_id=project_id,
|
||||
effective=True)
|
||||
return list(set([x['role_id'] for x in assignment_list]))
|
||||
|
||||
def _require_user_has_role_in_project(self, roles, user_id, project_id):
|
||||
user_roles = self._get_user_roles(user_id, project_id)
|
||||
for role in roles:
|
||||
matching_roles = [x for x in user_roles
|
||||
if x == role['id']]
|
||||
if not matching_roles:
|
||||
raise exception.RoleAssignmentNotFound(role_id=role['id'],
|
||||
actor_id=user_id,
|
||||
target_id=project_id)
|
||||
|
||||
def _get_role_list(self, app_cred_roles):
|
||||
roles = []
|
||||
for role in app_cred_roles:
|
||||
roles.append(PROVIDERS.role_api.get_role(role['id']))
|
||||
return roles
|
||||
|
||||
def authenticate(self, request, application_credential_id, secret):
|
||||
"""Authenticate with an application credential.
|
||||
|
||||
:param str application_credential_id: Application Credential ID
|
||||
:param str secret: Application Credential secret
|
||||
|
||||
"""
|
||||
self.driver.authenticate(application_credential_id, secret)
|
||||
|
||||
def _process_app_cred(self, app_cred_ref):
|
||||
app_cred_ref = app_cred_ref.copy()
|
||||
app_cred_ref.pop('secret_hash')
|
||||
app_cred_ref['roles'] = self._get_role_list(
|
||||
app_cred_ref['roles'])
|
||||
return app_cred_ref
|
||||
|
||||
def create_application_credential(self, application_credential,
|
||||
initiator=None):
|
||||
"""Create a new application credential.
|
||||
|
||||
:param dict application_credential: Application Credential data
|
||||
:param initiator: CADF initiator
|
||||
|
||||
:returns: a new application credential
|
||||
"""
|
||||
application_credential = application_credential.copy()
|
||||
user_id = application_credential['user_id']
|
||||
project_id = application_credential['project_id']
|
||||
roles = application_credential.pop('roles', [])
|
||||
self._require_user_has_role_in_project(roles, user_id, project_id)
|
||||
unhashed_secret = application_credential['secret']
|
||||
ref = self.driver.create_application_credential(
|
||||
application_credential, roles)
|
||||
ref['secret'] = unhashed_secret
|
||||
ref = self._process_app_cred(ref)
|
||||
notifications.Audit.created(
|
||||
self._APP_CRED,
|
||||
application_credential['id'],
|
||||
initiator)
|
||||
return ref
|
||||
|
||||
@MEMOIZE
|
||||
def get_application_credential(self, application_credential_id):
|
||||
"""Get application credential details.
|
||||
|
||||
:param str application_credential_id: Application Credential ID
|
||||
|
||||
:returns: an application credential
|
||||
"""
|
||||
app_cred = self.driver.get_application_credential(
|
||||
application_credential_id)
|
||||
return self._process_app_cred(app_cred)
|
||||
|
||||
def list_application_credentials(self, user_id, hints=None):
|
||||
"""List application credentials for a user.
|
||||
|
||||
:param str user_id: User ID
|
||||
:param dict hints: Properties to filter on
|
||||
|
||||
:returns: a list of application credentials
|
||||
"""
|
||||
hints = hints or driver_hints.Hints()
|
||||
app_cred_list = self.driver.list_application_credentials_for_user(
|
||||
user_id, hints)
|
||||
return [self._process_app_cred(app_cred) for app_cred in app_cred_list]
|
||||
|
||||
def delete_application_credential(self, application_credential_id,
|
||||
initiator=None):
|
||||
"""Delete an application credential.
|
||||
|
||||
:param str application_credential_id: Application Credential ID
|
||||
:param initiator: CADF initiator
|
||||
|
||||
:raises keystone.exception.ApplicationCredentialNotFound: If the
|
||||
application credential doesn't exist.
|
||||
"""
|
||||
self.get_application_credential.invalidate()
|
||||
self.driver.delete_application_credential(application_credential_id)
|
||||
notifications.Audit.deleted(
|
||||
self._APP_CRED, application_credential_id, initiator)
|
||||
|
||||
def _delete_application_credentials_for_user(self, user_id):
|
||||
"""Delete all application credentials for a user.
|
||||
|
||||
:param str user_id: User ID
|
||||
|
||||
This is triggered when a user is deleted.
|
||||
"""
|
||||
self.driver.delete_application_credentials_for_user(user_id)
|
||||
|
||||
def _delete_application_credentials_for_user_on_project(self, user_id,
|
||||
project_id):
|
||||
"""Delete all application credentials for a user on a given project.
|
||||
|
||||
:param str user_id: User ID
|
||||
:param str project_id: Project ID
|
||||
|
||||
This is triggered when a user loses a role assignment on a project.
|
||||
"""
|
||||
self.driver.delete_application_credentials_for_user_on_project(
|
||||
user_id, project_id)
|
|
@ -19,6 +19,7 @@ import oslo_messaging
|
|||
from oslo_middleware import cors
|
||||
from osprofiler import opts as profiler
|
||||
|
||||
from keystone.conf import application_credential
|
||||
from keystone.conf import assignment
|
||||
from keystone.conf import auth
|
||||
from keystone.conf import catalog
|
||||
|
@ -53,6 +54,7 @@ CONF = cfg.CONF
|
|||
|
||||
|
||||
conf_modules = [
|
||||
application_credential,
|
||||
assignment,
|
||||
auth,
|
||||
catalog,
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from keystone.conf import utils
|
||||
|
||||
|
||||
driver = cfg.StrOpt(
|
||||
'driver',
|
||||
default='sql',
|
||||
help=utils.fmt("""
|
||||
Entry point for the application credential backend driver in the
|
||||
`keystone.application_credential` namespace. Keystone only provides a `sql`
|
||||
driver, so there is no reason to change this unless you are providing a custom
|
||||
entry point.
|
||||
"""))
|
||||
|
||||
caching = cfg.BoolOpt(
|
||||
'caching',
|
||||
default=True,
|
||||
help=utils.fmt("""
|
||||
Toggle for application credential caching. This has no effect unless global
|
||||
caching is enabled.
|
||||
"""))
|
||||
|
||||
cache_time = cfg.IntOpt(
|
||||
'cache_time',
|
||||
help=utils.fmt("""
|
||||
Time to cache application credential data in seconds. This has no effect
|
||||
unless global caching is enabled.
|
||||
"""))
|
||||
|
||||
|
||||
GROUP_NAME = __name__.split('.')[-1]
|
||||
ALL_OPTS = [
|
||||
driver,
|
||||
caching,
|
||||
cache_time,
|
||||
]
|
||||
|
||||
|
||||
def register_opts(conf):
|
||||
conf.register_opts(ALL_OPTS, group=GROUP_NAME)
|
||||
|
||||
|
||||
def list_opts():
|
||||
return {GROUP_NAME: ALL_OPTS}
|
|
@ -481,6 +481,11 @@ class ConfigRegistrationNotFound(Exception):
|
|||
pass
|
||||
|
||||
|
||||
class ApplicationCredentialNotFound(NotFound):
|
||||
message_format = _("Could not find Application Credential: "
|
||||
"%(application_credential_id)s.")
|
||||
|
||||
|
||||
class Conflict(Error):
|
||||
message_format = _("Conflict occurred attempting to store %(type)s -"
|
||||
" %(details)s.")
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from keystone import application_credential
|
||||
from keystone import assignment
|
||||
from keystone import auth
|
||||
from keystone import catalog
|
||||
|
@ -39,7 +40,8 @@ def load_backends():
|
|||
cache.configure_cache(region=identity.ID_MAPPING_REGION)
|
||||
cache.configure_invalidation_region()
|
||||
|
||||
managers = [assignment.Manager, catalog.Manager, credential.Manager,
|
||||
managers = [application_credential.Manager, assignment.Manager,
|
||||
catalog.Manager, credential.Manager,
|
||||
credential.provider.Manager, resource.DomainConfigManager,
|
||||
endpoint_policy.Manager, federation.Manager,
|
||||
identity.generator.Manager, identity.MappingManager,
|
||||
|
|
|
@ -10,8 +10,15 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from keystone.application_credential.backends import sql as sql_driver
|
||||
from keystone.common import provider_api
|
||||
from keystone.common import sql
|
||||
from keystone.tests.unit.application_credential import test_backends
|
||||
from keystone.tests.unit.backend import core_sql
|
||||
from keystone.tests.unit.ksfixtures import database
|
||||
|
||||
|
||||
PROVIDERS = provider_api.ProviderAPIs
|
||||
|
||||
|
||||
class SQLModelTestCase(core_sql.BaseBackendSqlModels):
|
||||
|
@ -30,3 +37,13 @@ class SQLModelTestCase(core_sql.BaseBackendSqlModels):
|
|||
cols = (('application_credential_id', sql.Integer, None),
|
||||
('role_id', sql.String, 64))
|
||||
self.assertExpectedSchema('application_credential_role', cols)
|
||||
|
||||
|
||||
class SQLDriverTestCase(core_sql.BaseBackendSqlTests,
|
||||
test_backends.ApplicationCredentialTests):
|
||||
def setUp(self):
|
||||
self.useFixture(database.Database())
|
||||
self.driver = sql_driver.ApplicationCredential()
|
||||
super(SQLDriverTestCase, self).setUp()
|
||||
|
||||
self.app_cred_api = PROVIDERS.application_credential_api
|
||||
|
|
|
@ -0,0 +1,232 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
from keystone.common import driver_hints
|
||||
from keystone.common import provider_api
|
||||
import keystone.conf
|
||||
from keystone import exception
|
||||
|
||||
|
||||
CONF = keystone.conf.CONF
|
||||
PROVIDERS = provider_api.ProviderAPIs
|
||||
|
||||
|
||||
class ApplicationCredentialTests(object):
|
||||
|
||||
def _new_app_cred_data(self, user_id, project_id, name=None,
|
||||
expires=None):
|
||||
if not name:
|
||||
name = uuid.uuid4().hex
|
||||
if not expires:
|
||||
expires = datetime.datetime.utcnow() + datetime.timedelta(days=365)
|
||||
app_cred_data = {
|
||||
'id': uuid.uuid4().hex,
|
||||
'name': name,
|
||||
'description': uuid.uuid4().hex,
|
||||
'user_id': user_id,
|
||||
'project_id': project_id,
|
||||
'expires_at': expires,
|
||||
'roles': [
|
||||
{'id': self.role__member_['id']},
|
||||
],
|
||||
'secret': uuid.uuid4().hex,
|
||||
'allow_application_credential_creation': False
|
||||
}
|
||||
return app_cred_data
|
||||
|
||||
def test_create_application_credential(self):
|
||||
app_cred = self._new_app_cred_data(self.user_foo['id'],
|
||||
self.tenant_bar['id'])
|
||||
resp = self.app_cred_api.create_application_credential(app_cred)
|
||||
resp_roles = resp.pop('roles')
|
||||
orig_roles = app_cred.pop('roles')
|
||||
self.assertDictEqual(app_cred, resp)
|
||||
self.assertEqual(orig_roles[0]['id'], resp_roles[0]['id'])
|
||||
|
||||
def test_create_duplicate_application_credential_fails(self):
|
||||
# Ensure a user can't create two application credentials with the same
|
||||
# name
|
||||
app_cred = self._new_app_cred_data(self.user_foo['id'],
|
||||
self.tenant_bar['id'])
|
||||
name = app_cred['name']
|
||||
self.app_cred_api.create_application_credential(app_cred)
|
||||
app_cred = self._new_app_cred_data(self.user_foo['id'],
|
||||
self.tenant_bar['id'],
|
||||
name=name)
|
||||
self.assertRaises(exception.Conflict,
|
||||
self.app_cred_api.create_application_credential,
|
||||
app_cred)
|
||||
|
||||
def test_create_application_credential_require_role_assignments(self):
|
||||
# Ensure a user can't create an application credential for a project
|
||||
# they don't have a role assignment on
|
||||
app_cred = self._new_app_cred_data(self.user_foo['id'],
|
||||
self.tenant_baz['id'])
|
||||
self.assertRaises(exception.RoleAssignmentNotFound,
|
||||
self.app_cred_api.create_application_credential,
|
||||
app_cred)
|
||||
|
||||
def test_application_credential_allow_recursion(self):
|
||||
app_cred = self._new_app_cred_data(self.user_foo['id'],
|
||||
self.tenant_bar['id'])
|
||||
app_cred['allow_application_credential_creation'] = True
|
||||
resp = self.app_cred_api.create_application_credential(app_cred)
|
||||
resp.pop('roles')
|
||||
app_cred.pop('roles')
|
||||
self.assertDictEqual(app_cred, resp)
|
||||
|
||||
def test_get_application_credential(self):
|
||||
app_cred = self._new_app_cred_data(self.user_foo['id'],
|
||||
self.tenant_bar['id'])
|
||||
create_resp = self.app_cred_api.create_application_credential(app_cred)
|
||||
app_cred_id = create_resp['id']
|
||||
get_resp = self.app_cred_api.get_application_credential(app_cred_id)
|
||||
create_resp.pop('secret')
|
||||
self.assertDictEqual(create_resp, get_resp)
|
||||
|
||||
def test_get_application_credential_not_found(self):
|
||||
self.assertRaises(exception.ApplicationCredentialNotFound,
|
||||
self.app_cred_api.get_application_credential,
|
||||
uuid.uuid4().hex)
|
||||
|
||||
def test_list_application_credentials(self):
|
||||
app_cred_1 = self._new_app_cred_data(self.user_foo['id'],
|
||||
self.tenant_bar['id'],
|
||||
name='app1')
|
||||
app_cred_2 = self._new_app_cred_data(self.user_foo['id'],
|
||||
self.tenant_bar['id'],
|
||||
name='app2')
|
||||
app_cred_3 = self._new_app_cred_data(self.user_two['id'],
|
||||
self.tenant_baz['id'],
|
||||
name='app3')
|
||||
resp1 = self.app_cred_api.create_application_credential(app_cred_1)
|
||||
resp2 = self.app_cred_api.create_application_credential(app_cred_2)
|
||||
resp3 = self.app_cred_api.create_application_credential(app_cred_3)
|
||||
hints = driver_hints.Hints()
|
||||
resp = self.app_cred_api.list_application_credentials(
|
||||
self.user_foo['id'], hints)
|
||||
resp_ids = [ac['id'] for ac in resp]
|
||||
self.assertIn(resp1['id'], resp_ids)
|
||||
self.assertIn(resp2['id'], resp_ids)
|
||||
self.assertNotIn(resp3['id'], resp_ids)
|
||||
for ac in resp:
|
||||
self.assertNotIn('secret_hash', ac)
|
||||
|
||||
def _list_ids(self, user):
|
||||
hints = driver_hints.Hints()
|
||||
resp = self.app_cred_api.list_application_credentials(user['id'],
|
||||
hints)
|
||||
return [ac['id'] for ac in resp]
|
||||
|
||||
def test_delete_application_credential(self):
|
||||
app_cred = self._new_app_cred_data(self.user_foo['id'],
|
||||
self.tenant_bar['id'])
|
||||
self.app_cred_api.create_application_credential(app_cred)
|
||||
self.assertIn(app_cred['id'], self._list_ids(self.user_foo))
|
||||
self.app_cred_api.delete_application_credential(app_cred['id'])
|
||||
self.assertNotIn(app_cred['id'], self._list_ids(self.user_foo))
|
||||
|
||||
def test_delete_application_credential_not_found(self):
|
||||
self.assertRaises(exception.ApplicationCredentialNotFound,
|
||||
self.app_cred_api.delete_application_credential,
|
||||
uuid.uuid4().hex)
|
||||
|
||||
def test_deleting_a_user_deletes_application_credentials(self):
|
||||
app_cred_1 = self._new_app_cred_data(self.user_foo['id'],
|
||||
self.tenant_bar['id'],
|
||||
name='app1')
|
||||
app_cred_2 = self._new_app_cred_data(self.user_foo['id'],
|
||||
self.tenant_bar['id'],
|
||||
name='app2')
|
||||
self.app_cred_api.create_application_credential(app_cred_1)
|
||||
self.app_cred_api.create_application_credential(app_cred_2)
|
||||
self.assertIn(app_cred_1['id'], self._list_ids(self.user_foo))
|
||||
self.assertIn(app_cred_2['id'], self._list_ids(self.user_foo))
|
||||
# This should trigger a notification which should invoke a callback in
|
||||
# the application credential Manager to cleanup user_foo's application
|
||||
# credentials.
|
||||
PROVIDERS.identity_api.delete_user(self.user_foo['id'])
|
||||
hints = driver_hints.Hints()
|
||||
self.assertListEqual(
|
||||
[], self.app_cred_api.list_application_credentials(
|
||||
self.user_foo['id'], hints))
|
||||
|
||||
def test_removing_user_from_project_deletes_application_credentials(self):
|
||||
app_cred_proj_A_1 = self._new_app_cred_data(
|
||||
self.user_foo['id'], self.tenant_bar['id'], name='app1')
|
||||
app_cred_proj_A_2 = self._new_app_cred_data(
|
||||
self.user_foo['id'], self.tenant_bar['id'], name='app2')
|
||||
app_cred_proj_B = self._new_app_cred_data(
|
||||
self.user_foo['id'], self.tenant_baz['id'], name='app3')
|
||||
PROVIDERS.assignment_api.add_role_to_user_and_project(
|
||||
tenant_id=self.tenant_baz['id'],
|
||||
user_id=self.user_foo['id'],
|
||||
role_id=self.role__member_['id'])
|
||||
self.app_cred_api.create_application_credential(app_cred_proj_A_1)
|
||||
self.app_cred_api.create_application_credential(app_cred_proj_A_2)
|
||||
self.app_cred_api.create_application_credential(app_cred_proj_B)
|
||||
self.assertIn(app_cred_proj_A_1['id'], self._list_ids(self.user_foo))
|
||||
self.assertIn(app_cred_proj_A_2['id'], self._list_ids(self.user_foo))
|
||||
self.assertIn(app_cred_proj_B['id'], self._list_ids(self.user_foo))
|
||||
# This should trigger a notification which should invoke a callback in
|
||||
# the application credential Manager to cleanup all of user_foo's
|
||||
# application credentials on project bar.
|
||||
PROVIDERS.assignment_api.remove_role_from_user_and_project(
|
||||
user_id=self.user_foo['id'],
|
||||
tenant_id=self.tenant_bar['id'],
|
||||
role_id=self.role__member_['id'])
|
||||
self.assertNotIn(app_cred_proj_A_1['id'],
|
||||
self._list_ids(self.user_foo))
|
||||
self.assertNotIn(app_cred_proj_A_2['id'],
|
||||
self._list_ids(self.user_foo))
|
||||
self.assertIn(app_cred_proj_B['id'], self._list_ids(self.user_foo))
|
||||
|
||||
def test_authenticate(self):
|
||||
app_cred = self._new_app_cred_data(self.user_foo['id'],
|
||||
self.tenant_bar['id'])
|
||||
resp = self.app_cred_api.create_application_credential(app_cred)
|
||||
self.app_cred_api.authenticate(
|
||||
self.make_request(), resp['id'], resp['secret'])
|
||||
|
||||
def test_authenticate_not_found(self):
|
||||
self.assertRaises(AssertionError,
|
||||
self.app_cred_api.authenticate,
|
||||
self.make_request(),
|
||||
uuid.uuid4().hex,
|
||||
uuid.uuid4().hex)
|
||||
|
||||
def test_authenticate_expired(self):
|
||||
yesterday = datetime.datetime.utcnow() - datetime.timedelta(days=1)
|
||||
app_cred = self._new_app_cred_data(self.user_foo['id'],
|
||||
self.tenant_bar['id'],
|
||||
expires=yesterday)
|
||||
resp = self.app_cred_api.create_application_credential(app_cred)
|
||||
self.assertRaises(AssertionError,
|
||||
self.app_cred_api.authenticate,
|
||||
self.make_request(),
|
||||
resp['id'],
|
||||
resp['secret'])
|
||||
|
||||
def test_authenticate_bad_secret(self):
|
||||
app_cred = self._new_app_cred_data(self.user_foo['id'],
|
||||
self.tenant_bar['id'])
|
||||
resp = self.app_cred_api.create_application_credential(app_cred)
|
||||
badpass = 'badpass'
|
||||
self.assertNotEqual(badpass, resp['secret'])
|
||||
self.assertRaises(AssertionError,
|
||||
self.app_cred_api.authenticate,
|
||||
self.make_request(),
|
||||
resp['id'],
|
||||
badpass)
|
Loading…
Reference in New Issue