Use keystone.common.provider_api for token APIs

This change converts the usage of self.<provider_api> to
keystone.common.providers_api.ProviderAPIs.<provider_api> in manager
and controller logic. This is the correct way to reference
providers from other managers and controllers now that dependency
injection has been eliminated.

Change-Id: I853dd2d1fec159316b3101750b87aae9368bac58
This commit is contained in:
Lance Bragstad 2017-12-27 16:02:44 +00:00
parent eb7f1e885e
commit 114edb4108
3 changed files with 64 additions and 49 deletions

View File

@ -24,6 +24,7 @@ from keystone.token.providers import common
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
def authentication_method_generator(request, auth):
@ -87,7 +88,7 @@ class BaseAuthenticationMethod(provider_api.ProviderAPIMixin, object):
msg = _('Tenant name cannot contain reserved characters.')
raise exception.Unauthorized(message=msg)
try:
project_id = self.resource_api.get_project_by_name(
project_id = PROVIDERS.resource_api.get_project_by_name(
project_name, CONF.identity.default_domain_id
)['id']
except exception.ProjectNotFound as e:
@ -137,7 +138,7 @@ class TokenAuthenticationMethod(BaseAuthenticationMethod):
size=CONF.max_token_size)
try:
v3_token_data = self.token_provider_api.validate_token(
v3_token_data = PROVIDERS.token_provider_api.validate_token(
old_token
)
# NOTE(lbragstad): Even though we are not using the v2.0 token
@ -168,7 +169,7 @@ class TokenAuthenticationMethod(BaseAuthenticationMethod):
raise exception.Forbidden('Trusts are disabled.')
elif CONF.trust.enabled and 'trust_id' in auth:
try:
trust_ref = self.trust_api.get_trust(auth['trust_id'])
trust_ref = PROVIDERS.trust_api.get_trust(auth['trust_id'])
except exception.TrustNotFound:
raise exception.Forbidden()
# If a trust is being used to obtain access to another project and
@ -179,7 +180,7 @@ class TokenAuthenticationMethod(BaseAuthenticationMethod):
raise exception.Forbidden()
expiry = token_model_ref.expires
user_ref = self.identity_api.get_user(user_id)
user_ref = PROVIDERS.identity_api.get_user(user_id)
bind = token_model_ref.bind
original_audit_id = token_model_ref.audit_chain_id
return (user_ref, project_id, expiry, bind, original_audit_id)
@ -224,14 +225,14 @@ class LocalAuthenticationMethod(BaseAuthenticationMethod):
raise exception.ValidationSizeError(attribute='username',
size=CONF.max_param_size)
try:
user_ref = self.identity_api.get_user_by_name(
user_ref = PROVIDERS.identity_api.get_user_by_name(
username, CONF.identity.default_domain_id)
user_id = user_ref['id']
except exception.UserNotFound as e:
raise exception.Unauthorized(e)
try:
user_ref = self.identity_api.authenticate(
user_ref = PROVIDERS.identity_api.authenticate(
request,
user_id=user_id,
password=password)
@ -263,7 +264,7 @@ class ExternalAuthenticationMethod(BaseAuthenticationMethod):
raise ExternalAuthNotApplicable()
try:
user_ref = self.identity_api.get_user_by_name(
user_ref = PROVIDERS.identity_api.get_user_by_name(
username, CONF.identity.default_domain_id)
except exception.UserNotFound as e:
raise exception.Unauthorized(e)
@ -317,7 +318,7 @@ class V2TokenDataHelper(provider_api.ProviderAPIMixin, object):
if 'project' in v3_token:
# v3 token_data does not contain all tenant attributes
tenant = self.resource_api.get_project(
tenant = PROVIDERS.resource_api.get_project(
v3_token['project']['id'])
# Drop domain specific fields since v2 calls are not domain-aware.
token['tenant'] = controller.V2Controller.v3_to_v2_project(
@ -337,21 +338,21 @@ class V2TokenDataHelper(provider_api.ProviderAPIMixin, object):
msg = _('Non-default domain is not supported')
if CONF.trust.enabled:
try:
trust_ref = self.trust_api.get_trust(v3_trust['id'])
trust_ref = PROVIDERS.trust_api.get_trust(v3_trust['id'])
except exception.TrustNotFound:
raise exception.TokenNotFound(token_id=token_id)
trustee_user_ref = self.identity_api.get_user(
trustee_user_ref = PROVIDERS.identity_api.get_user(
trust_ref['trustee_user_id'])
if (trustee_user_ref['domain_id'] !=
CONF.identity.default_domain_id):
raise exception.Unauthorized(msg)
trustor_user_ref = self.identity_api.get_user(
trustor_user_ref = PROVIDERS.identity_api.get_user(
trust_ref['trustor_user_id'])
if (trustor_user_ref['domain_id'] !=
CONF.identity.default_domain_id):
raise exception.Unauthorized(msg)
if trust_ref.get('project_id'):
project_ref = self.resource_api.get_project(
project_ref = PROVIDERS.resource_api.get_project(
trust_ref['project_id'])
if (project_ref['domain_id'] !=
CONF.identity.default_domain_id):
@ -387,7 +388,7 @@ class V2TokenDataHelper(provider_api.ProviderAPIMixin, object):
# Get and build v2 service catalog
token_data['serviceCatalog'] = []
if 'tenant' in token:
catalog_ref = self.catalog_api.get_catalog(
catalog_ref = PROVIDERS.catalog_api.get_catalog(
user['id'], token['tenant']['id'])
if catalog_ref:
token_data['serviceCatalog'] = self.format_catalog(catalog_ref)

View File

@ -23,6 +23,7 @@ import six
from keystone.common import cache
from keystone.common import manager
from keystone.common import provider_api
import keystone.conf
from keystone import exception
from keystone.i18n import _
@ -32,6 +33,7 @@ from keystone import notifications
CONF = keystone.conf.CONF
LOG = log.getLogger(__name__)
PROVIDERS = provider_api.ProviderAPIs
TOKENS_REGION = cache.create_region(name='tokens')
MEMOIZE_TOKENS = cache.get_memoization_decorator(
@ -132,7 +134,7 @@ class Manager(manager.Manager):
except KeyError:
raise exception.TokenNotFound(_('Failed to validate token'))
token_values = self.revoke_api.model.build_token_values(token_data)
self.revoke_api.check_token(token_values)
PROVIDERS.revoke_api.check_token(token_values)
def check_revocation(self, token):
return self.check_revocation_v3(token)
@ -242,11 +244,12 @@ class Manager(manager.Manager):
domain_id = token_ref.domain_id if token_ref.domain_scoped else None
if revoke_chain:
self.revoke_api.revoke_by_audit_chain_id(token_ref.audit_chain_id,
project_id=project_id,
domain_id=domain_id)
PROVIDERS.revoke_api.revoke_by_audit_chain_id(
token_ref.audit_chain_id, project_id=project_id,
domain_id=domain_id
)
else:
self.revoke_api.revoke_by_audit_id(token_ref.audit_id)
PROVIDERS.revoke_api.revoke_by_audit_id(token_ref.audit_id)
if CONF.token.revoke_by_id and self._needs_persistence:
self._persistence.delete_token(token_id=token_id)
@ -264,7 +267,7 @@ class Manager(manager.Manager):
payload):
if CONF.token.revoke_by_id:
trust_id = payload['resource_info']
trust = self.trust_api.get_trust(trust_id, deleted=True)
trust = PROVIDERS.trust_api.get_trust(trust_id, deleted=True)
self._persistence.delete_tokens(user_id=trust['trustor_user_id'],
trust_id=trust_id)
if CONF.token.cache_on_issue:
@ -305,7 +308,7 @@ class Manager(manager.Manager):
if CONF.token.revoke_by_id:
project_id = payload['resource_info']
self._persistence.delete_tokens_for_users(
self.assignment_api.list_user_ids_for_project(project_id),
PROVIDERS.assignment_api.list_user_ids_for_project(project_id),
project_id=project_id)
if CONF.token.cache_on_issue:
# NOTE(amakarov): preserving behavior

View File

@ -36,6 +36,7 @@ from keystone.token.providers import base
LOG = log.getLogger(__name__)
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
def default_expire_time():
@ -96,7 +97,7 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
:returns: A dictionary containing two keys, the `id` of the domain and
the `name` of the domain.
"""
domain_ref = self.resource_api.get_domain(domain_id)
domain_ref = PROVIDERS.resource_api.get_domain(domain_id)
if not domain_ref.get('enabled'):
msg = _('Unable to validate token because domain %(id)s is '
'disabled') % {'id': domain_ref['id']}
@ -111,7 +112,7 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
:return: A dictionary containing up to three keys, the `id` of the
project, the `name` of the project, and the parent `domain`.
"""
project_ref = self.resource_api.get_project(project_id)
project_ref = PROVIDERS.resource_api.get_project(project_id)
if not project_ref.get('enabled'):
msg = _('Unable to validate token because project %(id)s is '
'disabled') % {'id': project_ref['id']}
@ -137,7 +138,7 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
token_data['domain'] = self._get_filtered_domain(domain_id)
if project_id:
token_data['project'] = self._get_filtered_project(project_id)
project_ref = self.resource_api.get_project(project_id)
project_ref = PROVIDERS.resource_api.get_project(project_id)
token_data['is_domain'] = project_ref['is_domain']
def _populate_is_admin_project(self, token_data):
@ -161,12 +162,12 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
def _get_roles_for_user(self, user_id, domain_id, project_id):
roles = []
if domain_id:
roles = self.assignment_api.get_roles_for_user_and_domain(
roles = PROVIDERS.assignment_api.get_roles_for_user_and_domain(
user_id, domain_id)
if project_id:
roles = self.assignment_api.get_roles_for_user_and_project(
roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
user_id, project_id)
return [self.role_api.get_role(role_id) for role_id in roles]
return [PROVIDERS.role_api.get_role(role_id) for role_id in roles]
def populate_roles_for_federated_user(self, token_data, group_ids,
project_id=None, domain_id=None,
@ -205,9 +206,9 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
# appropriate error message.
raise exception.Unauthorized(msg)
roles = self.assignment_api.get_roles_for_groups(group_ids,
project_id,
domain_id)
roles = PROVIDERS.assignment_api.get_roles_for_groups(
group_ids, project_id, domain_id
)
roles = roles + self._get_roles_for_user(user_id, domain_id,
project_id)
@ -236,25 +237,27 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
# no need to repopulate user if it already exists
return
user_ref = self.identity_api.get_user(user_id)
user_ref = PROVIDERS.identity_api.get_user(user_id)
if CONF.trust.enabled and trust and 'OS-TRUST:trust' not in token_data:
trustor_user_ref = (self.identity_api.get_user(
trustor_user_ref = (PROVIDERS.identity_api.get_user(
trust['trustor_user_id']))
trustee_user_ref = (self.identity_api.get_user(
trustee_user_ref = (PROVIDERS.identity_api.get_user(
trust['trustee_user_id']))
try:
self.resource_api.assert_domain_enabled(
PROVIDERS.resource_api.assert_domain_enabled(
trustor_user_ref['domain_id'])
except AssertionError:
raise exception.TokenNotFound(_('Trustor domain is disabled.'))
try:
self.resource_api.assert_domain_enabled(
PROVIDERS.resource_api.assert_domain_enabled(
trustee_user_ref['domain_id'])
except AssertionError:
raise exception.TokenNotFound(_('Trustee domain is disabled.'))
try:
self.identity_api.assert_user_enabled(trust['trustor_user_id'])
PROVIDERS.identity_api.assert_user_enabled(
trust['trustor_user_id']
)
except AssertionError:
raise exception.Forbidden(_('Trustor is disabled.'))
if trust['impersonation']:
@ -288,11 +291,11 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
if access_token:
filtered_roles = []
access_token_ref = self.oauth_api.get_access_token(
access_token_ref = PROVIDERS.oauth_api.get_access_token(
access_token['id']
)
authed_role_ids = jsonutils.loads(access_token_ref['role_ids'])
all_roles = self.role_api.list_roles()
all_roles = PROVIDERS.role_api.list_roles()
for role in all_roles:
for authed_role in authed_role_ids:
if authed_role == role['id']:
@ -307,7 +310,9 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
# need to do this because the user ID of the original trustor helps
# us determine scope in the redelegated context.
if trust.get('redelegated_trust_id'):
trust_chain = self.trust_api.get_trust_pedigree(trust['id'])
trust_chain = PROVIDERS.trust_api.get_trust_pedigree(
trust['id']
)
token_user_id = trust_chain[-1]['trustor_user_id']
else:
token_user_id = trust['trustor_user_id']
@ -327,15 +332,15 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
# any implied roles, whether global or domain specific
refs = [{'role_id': role['id']} for role in trust['roles']]
effective_trust_roles = (
self.assignment_api.add_implied_roles(refs))
PROVIDERS.assignment_api.add_implied_roles(refs))
# Now get the current role assignments for the trustor,
# including any domain specific roles.
assignment_list = self.assignment_api.list_role_assignments(
assignments = PROVIDERS.assignment_api.list_role_assignments(
user_id=token_user_id,
project_id=token_project_id,
effective=True, strip_domain_roles=False)
current_effective_trustor_roles = (
list(set([x['role_id'] for x in assignment_list])))
list(set([x['role_id'] for x in assignments])))
# Go through each of the effective trust roles, making sure the
# trustor still has them, if any have been removed, then we
# will treat the trust as invalid
@ -344,7 +349,7 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
match_roles = [x for x in current_effective_trustor_roles
if x == trust_role['role_id']]
if match_roles:
role = self.role_api.get_role(match_roles[0])
role = PROVIDERS.role_api.get_role(match_roles[0])
if role['domain_id'] is None:
filtered_roles.append(role)
else:
@ -383,7 +388,7 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
if CONF.trust.enabled and trust:
user_id = trust['trustor_user_id']
if project_id or domain_id:
service_catalog = self.catalog_api.get_v3_catalog(
service_catalog = PROVIDERS.catalog_api.get_v3_catalog(
user_id, project_id)
token_data['catalog'] = service_catalog
@ -391,7 +396,9 @@ class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
if 'service_providers' in token_data:
return
service_providers = self.federation_api.get_enabled_service_providers()
service_providers = (
PROVIDERS.federation_api.get_enabled_service_providers()
)
if service_providers:
token_data['service_providers'] = service_providers
@ -496,7 +503,9 @@ class BaseProvider(provider_api.ProviderAPIMixin, base.Provider):
access_token = None
if 'oauth1' in method_names:
access_token_id = auth_context['access_token_id']
access_token = self.oauth_api.get_access_token(access_token_id)
access_token = PROVIDERS.oauth_api.get_access_token(
access_token_id
)
token_data = self.v3_token_data_helper.get_token_data(
user_id,
@ -520,7 +529,7 @@ class BaseProvider(provider_api.ProviderAPIMixin, base.Provider):
idp = auth_context[federation_constants.IDENTITY_PROVIDER]
protocol = auth_context[federation_constants.PROTOCOL]
user_dict = self.identity_api.get_user(user_id)
user_dict = PROVIDERS.identity_api.get_user(user_id)
user_name = user_dict['name']
token_data = {
@ -576,7 +585,7 @@ class BaseProvider(provider_api.ProviderAPIMixin, base.Provider):
trust_ref = None
trust_id = token_ref.get('trust_id')
if trust_id:
trust_ref = self.trust_api.get_trust(trust_id)
trust_ref = PROVIDERS.trust_api.get_trust(trust_id)
token_dict = None
if token_data['token']['user'].get(
federation_constants.FEDERATION):
@ -613,11 +622,13 @@ class BaseProvider(provider_api.ProviderAPIMixin, base.Provider):
domain_id
)
if trust_id:
trust_ref = self.trust_api.get_trust(trust_id)
trust_ref = PROVIDERS.trust_api.get_trust(trust_id)
access_token = None
if access_token_id:
access_token = self.oauth_api.get_access_token(access_token_id)
access_token = PROVIDERS.oauth_api.get_access_token(
access_token_id
)
return self.v3_token_data_helper.get_token_data(
user_id,