From 3dbf4be06f674f132608b7a18da2f44d40e9953a Mon Sep 17 00:00:00 2001 From: Lance Bragstad Date: Fri, 22 Jun 2018 18:52:25 +0000 Subject: [PATCH] Cleanup keystone.token.providers.common This module was a hodge-podge of common utility methods and a basic implementation of the token provider API interface. In theory, if something should be done for all providers, we should try and pull it into a higher layer, like the token provider Manager. This makes things easier to share without having to worry about reimplementing something if we override a specific method of the interface. This is the pattern we're working towards with the TokenModel object. It was also home to the V3TokenDataHelper, which was ultimately responsible for making sure the token API contracts were honored. Now that we've moved token behavior into the TokenModel and the representation of a token into the controllers, we don't need this anymore. We should be able to make this much more clear and clean up the interfaces for people providing their own token providers. Partial-Bug: 1778945 Change-Id: I6f069c8c94e625ae553e9b41f0c54fd25bad9408 --- keystone/middleware/auth.py | 15 +- keystone/tests/unit/test_revoke.py | 12 +- keystone/tests/unit/test_token_provider.py | 6 + keystone/tests/unit/test_v3_federation.py | 20 +- keystone/tests/unit/token/test_common.py | 36 -- .../tests/unit/token/test_fernet_provider.py | 6 +- .../unit/token/test_token_data_helper.py | 56 -- keystone/tests/unit/token/test_token_model.py | 16 +- keystone/token/providers/common.py | 593 ------------------ 9 files changed, 43 insertions(+), 717 deletions(-) delete mode 100644 keystone/tests/unit/token/test_common.py delete mode 100644 keystone/tests/unit/token/test_token_data_helper.py delete mode 100644 keystone/token/providers/common.py diff --git a/keystone/middleware/auth.py b/keystone/middleware/auth.py index 9969604be5..b7553e9478 100644 --- a/keystone/middleware/auth.py +++ b/keystone/middleware/auth.py @@ -25,7 +25,6 @@ from keystone.federation import constants as federation_constants from keystone.federation import utils from keystone.i18n import _ from keystone.models import token_model -from keystone.token.providers import common CONF = keystone.conf.CONF LOG = log.getLogger(__name__) @@ -83,12 +82,11 @@ class AuthContextMiddleware(provider_api.ProviderAPIMixin, auth_context['roles'] = user_ref['roles'] else: # it's the local user, so token data is needed. - token_helper = common.V3TokenDataHelper() - token_data = token_helper.get_token_data( - user_id=user_ref['id'], - method_names=[CONF.tokenless_auth.protocol], - domain_id=domain_id, - project_id=project_id) + token = token_model.TokenModel() + token.user_id = user_ref['id'] + token.methods = [CONF.tokenless_auth.protocol] + token.domain_id = domain_id + token.project_id = project_id auth_context = {'user_id': user_ref['id']} auth_context['is_delegated_auth'] = False @@ -96,8 +94,7 @@ class AuthContextMiddleware(provider_api.ProviderAPIMixin, auth_context['domain_id'] = domain_id if project_id: auth_context['project_id'] = project_id - auth_context['roles'] = [role['name'] for role - in token_data['token']['roles']] + auth_context['roles'] = [role['name'] for role in token.roles] return auth_context def _validate_trusted_issuer(self, request): diff --git a/keystone/tests/unit/test_revoke.py b/keystone/tests/unit/test_revoke.py index 41d5a247ec..1faad0270d 100644 --- a/keystone/tests/unit/test_revoke.py +++ b/keystone/tests/unit/test_revoke.py @@ -27,7 +27,7 @@ from keystone.revoke.backends import sql from keystone.tests import unit from keystone.tests.unit import ksfixtures from keystone.tests.unit import test_backend_sql -from keystone.token.providers import common +from keystone.token import provider CONF = keystone.conf.CONF @@ -168,7 +168,7 @@ class RevokeTests(object): # check to make sure that list_events matches the token to the event we # just revoked. first_token = _sample_blank_token() - first_token['audit_id'] = common.random_urlsafe_str() + first_token['audit_id'] = provider.random_urlsafe_str() PROVIDERS.revoke_api.revoke_by_audit_id( audit_id=first_token['audit_id']) self._assertTokenRevoked(first_token) @@ -179,7 +179,7 @@ class RevokeTests(object): # sure that list events only finds 1 match since there are 2 and they # dont both have different populated audit_id fields second_token = _sample_blank_token() - second_token['audit_id'] = common.random_urlsafe_str() + second_token['audit_id'] = provider.random_urlsafe_str() PROVIDERS.revoke_api.revoke_by_audit_id( audit_id=second_token['audit_id']) self._assertTokenRevoked(second_token) @@ -212,7 +212,7 @@ class RevokeTests(object): first_token = _sample_blank_token() first_token['user_id'] = uuid.uuid4().hex first_token['project_id'] = uuid.uuid4().hex - first_token['audit_id'] = common.random_urlsafe_str() + first_token['audit_id'] = provider.random_urlsafe_str() # revoke event and then verify that there is only one revocation # and verify the only revoked event is the token PROVIDERS.revoke_api.revoke(revoke_model.RevokeEvent( @@ -241,7 +241,7 @@ class RevokeTests(object): fourth_token = _sample_blank_token() fourth_token['user_id'] = uuid.uuid4().hex fourth_token['project_id'] = uuid.uuid4().hex - fourth_token['audit_id'] = common.random_urlsafe_str() + fourth_token['audit_id'] = provider.random_urlsafe_str() PROVIDERS.revoke_api.revoke(revoke_model.RevokeEvent( project_id=fourth_token['project_id'], audit_id=fourth_token['audit_id'])) @@ -380,7 +380,7 @@ class RevokeTests(object): revocation_backend = sql.Revoke() # Create our first token with audit_id - audit_id = common.build_audit_info(parent_audit_id=None)[0] + audit_id = provider.random_urlsafe_str() token = _sample_blank_token() # Audit ID and Audit Chain ID are populated with the same value # if the token is an original token diff --git a/keystone/tests/unit/test_token_provider.py b/keystone/tests/unit/test_token_provider.py index dfb0495790..f8bda8f6b7 100644 --- a/keystone/tests/unit/test_token_provider.py +++ b/keystone/tests/unit/test_token_provider.py @@ -15,6 +15,7 @@ import datetime from oslo_utils import timeutils +from six.moves import urllib from keystone.common import provider_api from keystone.common import utils @@ -25,6 +26,7 @@ from keystone.tests import unit from keystone.tests.unit import ksfixtures from keystone.tests.unit.ksfixtures import database from keystone import token +from keystone.token import provider CONF = keystone.conf.CONF @@ -452,6 +454,10 @@ class TestTokenProvider(unit.TestCase): ) self.load_backends() + def test_strings_are_url_safe(self): + s = provider.random_urlsafe_str() + self.assertEqual(s, urllib.parse.quote_plus(s)) + def test_unsupported_token_provider(self): self.config_fixture.config(group='token', provider='MyProvider') diff --git a/keystone/tests/unit/test_v3_federation.py b/keystone/tests/unit/test_v3_federation.py index a00afa6c41..92689582e8 100644 --- a/keystone/tests/unit/test_v3_federation.py +++ b/keystone/tests/unit/test_v3_federation.py @@ -33,11 +33,13 @@ if not xmldsig: xmldsig = importutils.try_import("xmldsig") from keystone.auth import controllers as auth_controllers +from keystone.common import controller from keystone.common import provider_api import keystone.conf from keystone import exception from keystone.federation import controllers as federation_controllers from keystone.federation import idp as keystone_idp +from keystone.models import token_model from keystone import notifications from keystone.tests import unit from keystone.tests.unit import core @@ -45,7 +47,6 @@ from keystone.tests.unit import federation_fixtures from keystone.tests.unit import ksfixtures from keystone.tests.unit import mapping_fixtures from keystone.tests.unit import test_v3 -from keystone.token.providers import common as token_common CONF = keystone.conf.CONF @@ -4741,8 +4742,6 @@ class K2KServiceCatalogTests(test_v3.RestfulTestCase): PROVIDERS.federation_api.create_sp(self.SP3, sp) self.sp_gamma = {self.SP3: sp} - self.token_v3_helper = token_common.V3TokenDataHelper() - def sp_response(self, id, ref): ref.pop('enabled') ref.pop('description') @@ -4774,7 +4773,10 @@ class K2KServiceCatalogTests(test_v3.RestfulTestCase): def test_service_providers_in_token(self): """Check if service providers are listed in service catalog.""" - token = self.token_v3_helper.get_token_data(self.user_id, ['password']) + model = token_model.TokenModel() + model.user_id = self.user_id + model.methods = ['password'] + token = controller.render_token_response_from_model(model) ref = {} for r in (self.sp_alpha, self.sp_beta, self.sp_gamma): ref.update(r) @@ -4791,7 +4793,10 @@ class K2KServiceCatalogTests(test_v3.RestfulTestCase): sp_ref = {'enabled': False} PROVIDERS.federation_api.update_sp(self.SP1, sp_ref) - token = self.token_v3_helper.get_token_data(self.user_id, ['password']) + model = token_model.TokenModel() + model.user_id = self.user_id + model.methods = ['password'] + token = controller.render_token_response_from_model(model) ref = {} for r in (self.sp_beta, self.sp_gamma): ref.update(r) @@ -4808,7 +4813,10 @@ class K2KServiceCatalogTests(test_v3.RestfulTestCase): for sp in (self.SP1, self.SP2, self.SP3): PROVIDERS.federation_api.update_sp(sp, sp_ref) - token = self.token_v3_helper.get_token_data(self.user_id, ['password']) + model = token_model.TokenModel() + model.user_id = self.user_id + model.methods = ['password'] + token = controller.render_token_response_from_model(model) self.assertNotIn('service_providers', token['token'], message=('Expected Service Catalog not to have ' 'service_providers')) diff --git a/keystone/tests/unit/token/test_common.py b/keystone/tests/unit/token/test_common.py deleted file mode 100644 index ac835b9dcc..0000000000 --- a/keystone/tests/unit/token/test_common.py +++ /dev/null @@ -1,36 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import uuid - -from six.moves import urllib - -from keystone.tests import unit -from keystone.token import provider -from keystone.token.providers import common - - -class TestTokenProvidersCommon(unit.TestCase): - def test_strings_are_url_safe(self): - s = common.random_urlsafe_str() - self.assertEqual(s, urllib.parse.quote_plus(s)) - - def test_unsupported_provider_raises_import_error(self): - namespace = "keystone.token.provider" - # Generate a random name - driver = uuid.uuid4().hex - self.config_fixture.config(group='token', provider=driver) - msg = "Unable to find '%(driver)s' driver in '%(namespace)s'." % { - 'namespace': namespace, 'driver': driver - } - - self.assertRaisesRegex(ImportError, msg, provider.Manager) diff --git a/keystone/tests/unit/token/test_fernet_provider.py b/keystone/tests/unit/token/test_fernet_provider.py index 6ca28a4515..45ddce2649 100644 --- a/keystone/tests/unit/token/test_fernet_provider.py +++ b/keystone/tests/unit/token/test_fernet_provider.py @@ -30,7 +30,7 @@ from keystone.federation import constants as federation_constants from keystone.tests import unit from keystone.tests.unit import ksfixtures from keystone.tests.unit.ksfixtures import database -from keystone.token.providers import common +from keystone.token import provider from keystone.token.providers import fernet from keystone.token import token_formatters @@ -236,7 +236,7 @@ class TestPayloads(unit.TestCase): delta=1e-05) def test_strings_can_be_converted_to_bytes(self): - s = common.random_urlsafe_str() + s = provider.random_urlsafe_str() self.assertIsInstance(s, six.text_type) b = token_formatters.BasePayload.random_urlsafe_str_to_bytes(s) @@ -293,7 +293,7 @@ class TestPayloads(unit.TestCase): exp_user_id = exp_user_id or uuid.uuid4().hex exp_methods = exp_methods or ['password'] exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [common.random_urlsafe_str()] + exp_audit_ids = [provider.random_urlsafe_str()] payload = payload_class.assemble( exp_user_id, exp_methods, exp_system, exp_project_id, diff --git a/keystone/tests/unit/token/test_token_data_helper.py b/keystone/tests/unit/token/test_token_data_helper.py deleted file mode 100644 index 9e8c3889a6..0000000000 --- a/keystone/tests/unit/token/test_token_data_helper.py +++ /dev/null @@ -1,56 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import base64 -import uuid - -from testtools import matchers - -from keystone import exception -from keystone.tests import unit -from keystone.token.providers import common - - -class TestTokenDataHelper(unit.TestCase): - def setUp(self): - super(TestTokenDataHelper, self).setUp() - self.load_backends() - self.v3_data_helper = common.V3TokenDataHelper() - - def test_v3_token_data_helper_populate_audit_info_string(self): - token_data = {} - audit_info_bytes = base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2] - audit_info = audit_info_bytes.decode('utf-8') - self.v3_data_helper._populate_audit_info(token_data, audit_info) - self.assertIn(audit_info, token_data['audit_ids']) - self.assertThat(token_data['audit_ids'], matchers.HasLength(2)) - - def test_v3_token_data_helper_populate_audit_info_none(self): - token_data = {} - self.v3_data_helper._populate_audit_info(token_data, audit_info=None) - self.assertThat(token_data['audit_ids'], matchers.HasLength(1)) - self.assertNotIn(None, token_data['audit_ids']) - - def test_v3_token_data_helper_populate_audit_info_list(self): - token_data = {} - audit_info = [base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2], - base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2]] - self.v3_data_helper._populate_audit_info(token_data, audit_info) - self.assertEqual(audit_info, token_data['audit_ids']) - - def test_v3_token_data_helper_populate_audit_info_invalid(self): - token_data = {} - audit_info = dict() - self.assertRaises(exception.UnexpectedError, - self.v3_data_helper._populate_audit_info, - token_data=token_data, - audit_info=audit_info) diff --git a/keystone/tests/unit/token/test_token_model.py b/keystone/tests/unit/token/test_token_model.py index 87a6ddeb49..10f457d7e4 100644 --- a/keystone/tests/unit/token/test_token_model.py +++ b/keystone/tests/unit/token/test_token_model.py @@ -26,7 +26,7 @@ from keystone.models import token_model from keystone.tests.unit import base_classes from keystone.tests.unit import core from keystone.tests.unit import test_token_provider -from keystone.token.providers import common as provider_common +from keystone.token import provider CONF = keystone.conf.CONF PROVIDERS = provider_api.ProviderAPIs @@ -206,12 +206,12 @@ class TokenModelTests(base_classes.TestCaseWithBootstrap): def test_audit_id_attributes(self): token = token_model.TokenModel() - audit_id = provider_common.random_urlsafe_str() + audit_id = provider.random_urlsafe_str() token.audit_id = audit_id self.assertTrue(len(token.audit_ids) == 1) - parent_audit_id = provider_common.random_urlsafe_str() + parent_audit_id = provider.random_urlsafe_str() token.parent_audit_id = parent_audit_id self.assertTrue(len(token.audit_ids) == 2) @@ -291,7 +291,7 @@ class TokenModelTests(base_classes.TestCaseWithBootstrap): token = token_model.TokenModel() token.user_id = user['id'] token.system = 'all' - token.audit_id = provider_common.random_urlsafe_str() + token.audit_id = provider.random_urlsafe_str() self.assertRaises( exception.Unauthorized, token.mint, self.token_id, self.issued_at @@ -360,7 +360,7 @@ class TokenModelTests(base_classes.TestCaseWithBootstrap): token = token_model.TokenModel() token.user_id = user['id'] token.domain_id = CONF.identity.default_domain_id - token.audit_id = provider_common.random_urlsafe_str() + token.audit_id = provider.random_urlsafe_str() self.assertRaises( exception.Unauthorized, token.mint, self.token_id, self.issued_at @@ -398,7 +398,7 @@ class TokenModelTests(base_classes.TestCaseWithBootstrap): token = token_model.TokenModel() token.user_id = user['id'] token.project_id = self.project_id - token.audit_id = provider_common.random_urlsafe_str() + token.audit_id = provider.random_urlsafe_str() self.assertRaises( exception.Unauthorized, token.mint, self.token_id, self.issued_at @@ -412,7 +412,7 @@ class TokenModelTests(base_classes.TestCaseWithBootstrap): token = token_model.TokenModel() token.user_id = self.admin_user_id token.project_id = self.project_id - token.audit_id = provider_common.random_urlsafe_str() + token.audit_id = provider.random_urlsafe_str() self.assertRaises( exception.ProjectNotFound, token.mint, self.token_id, @@ -428,7 +428,7 @@ class TokenModelTests(base_classes.TestCaseWithBootstrap): token = token_model.TokenModel() token.user_id = self.admin_user_id token.project_id = self.project_id - token.audit_id = provider_common.random_urlsafe_str() + token.audit_id = provider.random_urlsafe_str() self.assertRaises( exception.DomainNotFound, token.mint, self.token_id, self.issued_at diff --git a/keystone/token/providers/common.py b/keystone/token/providers/common.py deleted file mode 100644 index 583aaecba0..0000000000 --- a/keystone/token/providers/common.py +++ /dev/null @@ -1,593 +0,0 @@ -# Copyright 2013 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from __future__ import absolute_import - -import base64 -import datetime -import itertools -import uuid - -from oslo_log import log -from oslo_serialization import jsonutils -from oslo_utils import timeutils -import six -from six.moves.urllib import parse - -from keystone.common import provider_api -from keystone.common import utils -import keystone.conf -from keystone import exception -from keystone.federation import constants as federation_constants -from keystone.i18n import _ -from keystone.models import token_model -from keystone.token.providers import base - - -LOG = log.getLogger(__name__) -CONF = keystone.conf.CONF -PROVIDERS = provider_api.ProviderAPIs - - -def default_expire_time(): - """Determine when a fresh token should expire. - - Expiration time varies based on configuration (see ``[token] expiration``). - - :returns: a naive UTC datetime.datetime object - - """ - expire_delta = datetime.timedelta(seconds=CONF.token.expiration) - expires_at = timeutils.utcnow() + expire_delta - return expires_at.replace(microsecond=0) - - -def random_urlsafe_str(): - """Generate a random URL-safe string. - - :rtype: six.text_type - - """ - # chop the padding (==) off the end of the encoding to save space - return base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2].decode('utf-8') - - -def build_audit_info(parent_audit_id=None): - """Build the audit data for a token. - - If ``parent_audit_id`` is None, the list will be one element in length - containing a newly generated audit_id. - - If ``parent_audit_id`` is supplied, the list will be two elements in length - containing a newly generated audit_id and the ``parent_audit_id``. The - ``parent_audit_id`` will always be element index 1 in the resulting - list. - - :param parent_audit_id: the audit of the original token in the chain - :type parent_audit_id: str - :returns: Keystone token audit data - """ - audit_id = random_urlsafe_str() - if parent_audit_id is not None: - return [audit_id, parent_audit_id] - return [audit_id] - - -class V3TokenDataHelper(provider_api.ProviderAPIMixin, object): - """Token data helper.""" - - def __init__(self): - # Keep __init__ around to ensure dependency injection works. - super(V3TokenDataHelper, self).__init__() - - def _get_filtered_domain(self, domain_id): - """Ensure the domain is enabled and return domain id and name. - - :param domain_id: The ID of the domain to validate - :returns: A dictionary containing two keys, the `id` of the domain and - the `name` of the domain. - """ - domain_ref = PROVIDERS.resource_api.get_domain(domain_id) - if not domain_ref.get('enabled'): - msg = _('Unable to validate token because domain %(id)s is ' - 'disabled') % {'id': domain_ref['id']} - LOG.warning(msg) - raise exception.DomainNotFound(msg) - return {'id': domain_ref['id'], 'name': domain_ref['name']} - - def _get_filtered_project(self, project_id): - """Ensure the project and parent domain is enabled. - - :param project_id: The ID of the project to validate - :return: A dictionary containing up to three keys, the `id` of the - project, the `name` of the project, and the parent `domain`. - """ - project_ref = PROVIDERS.resource_api.get_project(project_id) - if not project_ref.get('enabled'): - msg = _('Unable to validate token because project %(id)s is ' - 'disabled') % {'id': project_ref['id']} - LOG.warning(msg) - raise exception.ProjectNotFound(msg) - filtered_project = { - 'id': project_ref['id'], - 'name': project_ref['name']} - if project_ref['domain_id'] is not None: - filtered_project['domain'] = ( - self._get_filtered_domain(project_ref['domain_id'])) - else: - # Projects acting as a domain do not have a domain_id attribute - filtered_project['domain'] = None - return filtered_project - - def _populate_scope(self, token_data, system, domain_id, project_id): - if 'domain' in token_data or 'project' in token_data: - # scope already exist, no need to populate it again - return - - if domain_id: - token_data['domain'] = self._get_filtered_domain(domain_id) - elif project_id: - token_data['project'] = self._get_filtered_project(project_id) - project_ref = PROVIDERS.resource_api.get_project(project_id) - token_data['is_domain'] = project_ref['is_domain'] - elif system == 'all': - # NOTE(lbragstad): This might have to be more elegant in the future - # if, or when, keystone supports scoping a token to a specific - # service or region. - token_data['system'] = {'all': True} - - def _populate_is_admin_project(self, token_data): - # TODO(ayoung): Support the ability for a project acting as a domain - # to be the admin project once the rest of the code for projects - # acting as domains is merged. Code will likely be: - # (r.admin_project_name == None and project['is_domain'] == True - # and project['name'] == r.admin_project_domain_name) - admin_project_name = CONF.resource.admin_project_name - admin_project_domain_name = CONF.resource.admin_project_domain_name - - if not (admin_project_name and admin_project_domain_name): - return # admin project not enabled - - project = token_data['project'] - - token_data['is_admin_project'] = ( - project['name'] == admin_project_name and - project['domain']['name'] == admin_project_domain_name) - - def _get_roles_for_user(self, user_id, system, domain_id, project_id): - roles = [] - if system: - group_ids = [ - group['id'] for - group in PROVIDERS.identity_api.list_groups_for_user(user_id) - ] - group_roles = [] - for group_id in group_ids: - roles = PROVIDERS.assignment_api.list_system_grants_for_group( - group_id - ) - for role in roles: - group_roles.append(role) - - user_roles = PROVIDERS.assignment_api.list_system_grants_for_user( - user_id - ) - return itertools.chain(group_roles, user_roles) - if domain_id: - roles = PROVIDERS.assignment_api.get_roles_for_user_and_domain( - user_id, domain_id) - if project_id: - roles = PROVIDERS.assignment_api.get_roles_for_user_and_project( - user_id, project_id) - return [PROVIDERS.role_api.get_role(role_id) for role_id in roles] - - def _get_app_cred_roles(self, app_cred, user_id, domain_id, project_id): - roles = app_cred['roles'] - token_roles = [] - for role in roles: - try: - role_ref = PROVIDERS.assignment_api.get_grant( - role['id'], user_id=user_id, domain_id=domain_id, - project_id=project_id) - token_roles.append(role_ref) - except exception.RoleAssignmentNotFound: - pass - return [ - PROVIDERS.role_api.get_role(role['id']) for role in token_roles] - - def populate_roles_for_federated_user(self, token_data, group_ids, - project_id=None, domain_id=None, - user_id=None, system=None): - """Populate roles basing on provided groups and assignments. - - Used for federated users with dynamically assigned groups. - This method does not return anything, yet it modifies token_data in - place. - - :param token_data: a dictionary used for building token response - :param group_ids: list of group IDs a user is a member of - :param project_id: project ID to scope to - :param domain_id: domain ID to scope to - :param user_id: user ID - :param system: system scope if applicable - - :raises keystone.exception.Unauthorized: when no roles were found - - """ - def check_roles(roles, user_id, project_id, domain_id): - # User was granted roles so simply exit this function. - if roles: - return - if project_id: - msg = _('User %(user_id)s has no access ' - 'to project %(project_id)s') % { - 'user_id': user_id, - 'project_id': project_id} - elif domain_id: - msg = _('User %(user_id)s has no access ' - 'to domain %(domain_id)s') % { - 'user_id': user_id, - 'domain_id': domain_id} - # Since no roles were found a user is not authorized to - # perform any operations. Raise an exception with - # appropriate error message. - raise exception.Unauthorized(msg) - - roles = PROVIDERS.assignment_api.get_roles_for_groups( - group_ids, project_id, domain_id - ) - roles = roles + self._get_roles_for_user( - user_id, system, domain_id, project_id - ) - - # NOTE(lbragstad): Remove duplicate role references from a list of - # roles. It is often suggested that this be done with: - # - # roles = [dict(t) for t in set([tuple(d.items()) for d in roles])] - # - # But that doesn't actually remove duplicates in all cases and causes - # transient failures because dictionaries are unordered objects. This - # means {'id': 1, 'foo': 'bar'} and {'foo': 'bar', 'id': 1} won't - # actually resolve to a single entity in the above logic since they are - # both considered unique. By using `in` we're performing a containment - # check, which also does a deep comparison of the objects, which is - # what we want. - unique_roles = [] - for role in roles: - if role not in unique_roles: - unique_roles.append(role) - - check_roles(unique_roles, user_id, project_id, domain_id) - token_data['roles'] = unique_roles - - def _populate_user(self, token_data, user_id, trust): - if 'user' in token_data: - # no need to repopulate user if it already exists - return - - user_ref = PROVIDERS.identity_api.get_user(user_id) - if trust and 'OS-TRUST:trust' not in token_data: - trustor_user_ref = (PROVIDERS.identity_api.get_user( - trust['trustor_user_id'])) - trustee_user_ref = (PROVIDERS.identity_api.get_user( - trust['trustee_user_id'])) - try: - PROVIDERS.resource_api.assert_domain_enabled( - trustor_user_ref['domain_id']) - except AssertionError: - raise exception.TokenNotFound(_('Trustor domain is disabled.')) - try: - PROVIDERS.resource_api.assert_domain_enabled( - trustee_user_ref['domain_id']) - except AssertionError: - raise exception.TokenNotFound(_('Trustee domain is disabled.')) - - try: - PROVIDERS.identity_api.assert_user_enabled( - trust['trustor_user_id'] - ) - except AssertionError: - raise exception.Forbidden(_('Trustor is disabled.')) - if trust['impersonation']: - user_ref = trustor_user_ref - token_data['OS-TRUST:trust'] = ( - { - 'id': trust['id'], - 'trustor_user': {'id': trust['trustor_user_id']}, - 'trustee_user': {'id': trust['trustee_user_id']}, - 'impersonation': trust['impersonation'] - }) - filtered_user = { - 'id': user_ref['id'], - 'name': user_ref['name'], - 'domain': self._get_filtered_domain(user_ref['domain_id']), - 'password_expires_at': user_ref['password_expires_at']} - token_data['user'] = filtered_user - - def _populate_oauth_section(self, token_data, access_token): - if access_token: - access_token_id = access_token['id'] - consumer_id = access_token['consumer_id'] - token_data['OS-OAUTH1'] = ({'access_token_id': access_token_id, - 'consumer_id': consumer_id}) - - def _populate_roles(self, token_data, user_id, system, domain_id, - project_id, trust, app_cred_id, access_token): - if 'roles' in token_data: - # no need to repopulate roles - return - - if access_token: - filtered_roles = [] - access_token_ref = PROVIDERS.oauth_api.get_access_token( - access_token['id'] - ) - authed_role_ids = jsonutils.loads(access_token_ref['role_ids']) - all_roles = PROVIDERS.role_api.list_roles() - for role in all_roles: - for authed_role in authed_role_ids: - if authed_role == role['id']: - filtered_roles.append({'id': role['id'], - 'name': role['name']}) - token_data['roles'] = filtered_roles - return - - if trust: - # If redelegated_trust_id is set, then we must traverse the - # trust_chain in order to determine who the original trustor is. We - # need to do this because the user ID of the original trustor helps - # us determine scope in the redelegated context. - if trust.get('redelegated_trust_id'): - trust_chain = PROVIDERS.trust_api.get_trust_pedigree( - trust['id'] - ) - token_user_id = trust_chain[-1]['trustor_user_id'] - else: - token_user_id = trust['trustor_user_id'] - - token_project_id = trust['project_id'] - # trusts do not support domains yet - token_domain_id = None - else: - token_user_id = user_id - token_project_id = project_id - token_domain_id = domain_id - - if system or token_domain_id or token_project_id: - filtered_roles = [] - if trust: - # First expand out any roles that were in the trust to include - # any implied roles, whether global or domain specific - refs = [{'role_id': role['id']} for role in trust['roles']] - effective_trust_roles = ( - PROVIDERS.assignment_api.add_implied_roles(refs)) - effective_trust_role_ids = ( - set([r['role_id'] for r in effective_trust_roles]) - ) - # Now get the current role assignments for the trustor, - # including any domain specific roles. - assignments = PROVIDERS.assignment_api.list_role_assignments( - user_id=token_user_id, - system=system, - project_id=token_project_id, - effective=True, strip_domain_roles=False) - current_effective_trustor_roles = ( - set([x['role_id'] for x in assignments])) - # Go through each of the effective trust roles, making sure the - # trustor still has them, if any have been removed, then we - # will treat the trust as invalid - for trust_role_id in effective_trust_role_ids: - if trust_role_id in current_effective_trustor_roles: - role = PROVIDERS.role_api.get_role(trust_role_id) - if role['domain_id'] is None: - filtered_roles.append(role) - else: - raise exception.Forbidden( - _('Trustee has no delegated roles.')) - elif app_cred_id: - app_cred_api = PROVIDERS.application_credential_api - app_cred_ref = app_cred_api.get_application_credential( - app_cred_id) - for role in self._get_app_cred_roles(app_cred_ref, - token_user_id, - token_domain_id, - token_project_id): - filtered_roles.append({'id': role['id'], - 'name': role['name']}) - else: - for role in self._get_roles_for_user(token_user_id, - system, - token_domain_id, - token_project_id): - filtered_roles.append({'id': role['id'], - 'name': role['name']}) - - # user has no project or domain roles, therefore access denied - if not filtered_roles: - if token_project_id: - msg = _('User %(user_id)s has no access ' - 'to project %(project_id)s') % { - 'user_id': user_id, - 'project_id': token_project_id} - elif token_domain_id: - msg = _('User %(user_id)s has no access ' - 'to domain %(domain_id)s') % { - 'user_id': user_id, - 'domain_id': token_domain_id} - elif system: - msg = _('User %(user_id)s has no access ' - 'to the system') % {'user_id': user_id} - LOG.debug(msg) - raise exception.Unauthorized(msg) - - token_data['roles'] = filtered_roles - - def _populate_service_catalog(self, token_data, user_id, system, domain_id, - project_id, trust): - if 'catalog' in token_data: - # no need to repopulate service catalog - return - - if trust: - user_id = trust['trustor_user_id'] - - # NOTE(lbragstad): The catalog API requires a project in order to - # generate a service catalog, but that appears to be only if there are - # endpoint -> project relationships. In the event we're dealing with a - # system_scoped token, we should pass None to the catalog API and just - # get a catalog anyway. - if project_id or domain_id or system: - service_catalog = PROVIDERS.catalog_api.get_v3_catalog( - user_id, project_id) - token_data['catalog'] = service_catalog - - def _populate_service_providers(self, token_data): - if 'service_providers' in token_data: - return - - service_providers = ( - PROVIDERS.federation_api.get_enabled_service_providers() - ) - if service_providers: - token_data['service_providers'] = service_providers - - def _validate_identity_provider(self, token_data): - federated_info = token_data['user'].get('OS-FEDERATION') - if federated_info: - idp_id = federated_info['identity_provider']['id'] - # FIXME(lbragstad): This isn't working properly because somewhere - # along the line we *were* encoding and decoding properly. This - # is needed to get some tests to pass in python 3. This will likely - # be fixed when the validate token path is moved over to using the - # token model, just like authenticate. - if isinstance(idp_id, bytes): - idp_id = idp_id.decode('utf-8') - PROVIDERS.federation_api.get_idp(idp_id) - - def _populate_token_dates(self, token_data, expires=None, issued_at=None): - if not expires: - expires = default_expire_time() - if not isinstance(expires, six.string_types): - expires = utils.isotime(expires, subsecond=True) - token_data['expires_at'] = expires - token_data['issued_at'] = (issued_at or - utils.isotime(subsecond=True)) - - def _populate_audit_info(self, token_data, audit_info=None): - if audit_info is None or isinstance(audit_info, six.string_types): - token_data['audit_ids'] = build_audit_info(audit_info) - elif isinstance(audit_info, list): - token_data['audit_ids'] = audit_info - else: - msg = (_('Invalid audit info data type: %(data)s (%(type)s)') % - {'data': audit_info, 'type': type(audit_info)}) - LOG.error(msg) - raise exception.UnexpectedError(msg) - - def _populate_app_cred(self, token_data, app_cred_id): - if app_cred_id: - app_cred_api = PROVIDERS.application_credential_api - app_cred = app_cred_api.get_application_credential(app_cred_id) - restricted = not app_cred['unrestricted'] - token_data['application_credential'] = {} - token_data['application_credential']['id'] = app_cred['id'] - token_data['application_credential']['name'] = app_cred['name'] - token_data['application_credential']['restricted'] = restricted - - def get_token_data(self, user_id, method_names, system=None, - domain_id=None, project_id=None, expires=None, - app_cred_id=None, trust=None, token=None, - include_catalog=True, bind=None, access_token=None, - issued_at=None, audit_info=None): - token_data = {'methods': method_names} - - # We've probably already written these to the token - if token: - for x in ('roles', 'user', 'catalog', 'project', 'domain'): - if x in token: - token_data[x] = token[x] - - if bind: - token_data['bind'] = bind - - self._populate_scope(token_data, system, domain_id, project_id) - if token_data.get('project'): - self._populate_is_admin_project(token_data) - self._populate_user(token_data, user_id, trust) - self._populate_roles(token_data, user_id, system, domain_id, - project_id, trust, app_cred_id, access_token) - self._populate_audit_info(token_data, audit_info) - - if include_catalog: - self._populate_service_catalog( - token_data, user_id, system, domain_id, project_id, trust - ) - self._populate_service_providers(token_data) - self._validate_identity_provider(token_data) - self._populate_token_dates(token_data, expires=expires, - issued_at=issued_at) - self._populate_oauth_section(token_data, access_token) - self._populate_app_cred(token_data, app_cred_id) - return {'token': token_data} - - -class BaseProvider(provider_api.ProviderAPIMixin, base.Provider): - def __init__(self, *args, **kwargs): - super(BaseProvider, self).__init__(*args, **kwargs) - self.v3_token_data_helper = V3TokenDataHelper() - - def get_token_version(self, token_data): - if token_data and isinstance(token_data, dict): - if 'token_version' in token_data: - if token_data['token_version'] in token_model.VERSIONS: - return token_data['token_version'] - if 'token' in token_data and 'methods' in token_data['token']: - return token_model.V3 - raise exception.UnsupportedTokenVersionException() - - def _is_mapped_token(self, auth_context): - return (federation_constants.IDENTITY_PROVIDER in auth_context and - federation_constants.PROTOCOL in auth_context) - - def _handle_mapped_tokens(self, auth_context, project_id, domain_id): - user_id = auth_context['user_id'] - group_ids = auth_context['group_ids'] - idp = auth_context[federation_constants.IDENTITY_PROVIDER] - protocol = auth_context[federation_constants.PROTOCOL] - - user_dict = PROVIDERS.identity_api.get_user(user_id) - user_name = user_dict['name'] - - token_data = { - 'user': { - 'id': user_id, - 'name': parse.unquote(user_name), - federation_constants.FEDERATION: { - 'groups': [{'id': x} for x in group_ids], - 'identity_provider': {'id': idp}, - 'protocol': {'id': protocol} - }, - 'domain': { - 'id': CONF.federation.federated_domain_name, - 'name': CONF.federation.federated_domain_name - } - } - } - - # FIXME(lbragstad): This will have to account for system-scoping, too. - if project_id or domain_id: - self.v3_token_data_helper.populate_roles_for_federated_user( - token_data, group_ids, project_id, domain_id, user_id) - - return token_data