Cleanup keystone.token.providers.common

This module was a hodge-podge of common utility methods and a basic
implementation of the token provider API interface. In theory, if
something should be done for all providers, we should try and pull
it into a higher layer, like the token provider Manager. This makes
things easier to share without having to worry about reimplementing
something if we override a specific method of the interface. This is
the pattern we're working towards with the TokenModel object.

It was also home to the V3TokenDataHelper, which was ultimately
responsible for making sure the token API contracts were honored. Now
that we've moved token behavior into the TokenModel and the
representation of a token into the controllers, we don't need this
anymore. We should be able to make this much more clear and clean up
the interfaces for people providing their own token providers.

Partial-Bug: 1778945
Change-Id: I6f069c8c94e625ae553e9b41f0c54fd25bad9408
This commit is contained in:
Lance Bragstad 2018-06-22 18:52:25 +00:00
parent 7ba3be57a1
commit 3dbf4be06f
9 changed files with 43 additions and 717 deletions

View File

@ -25,7 +25,6 @@ from keystone.federation import constants as federation_constants
from keystone.federation import utils
from keystone.i18n import _
from keystone.models import token_model
from keystone.token.providers import common
CONF = keystone.conf.CONF
LOG = log.getLogger(__name__)
@ -83,12 +82,11 @@ class AuthContextMiddleware(provider_api.ProviderAPIMixin,
auth_context['roles'] = user_ref['roles']
else:
# it's the local user, so token data is needed.
token_helper = common.V3TokenDataHelper()
token_data = token_helper.get_token_data(
user_id=user_ref['id'],
method_names=[CONF.tokenless_auth.protocol],
domain_id=domain_id,
project_id=project_id)
token = token_model.TokenModel()
token.user_id = user_ref['id']
token.methods = [CONF.tokenless_auth.protocol]
token.domain_id = domain_id
token.project_id = project_id
auth_context = {'user_id': user_ref['id']}
auth_context['is_delegated_auth'] = False
@ -96,8 +94,7 @@ class AuthContextMiddleware(provider_api.ProviderAPIMixin,
auth_context['domain_id'] = domain_id
if project_id:
auth_context['project_id'] = project_id
auth_context['roles'] = [role['name'] for role
in token_data['token']['roles']]
auth_context['roles'] = [role['name'] for role in token.roles]
return auth_context
def _validate_trusted_issuer(self, request):

View File

@ -27,7 +27,7 @@ from keystone.revoke.backends import sql
from keystone.tests import unit
from keystone.tests.unit import ksfixtures
from keystone.tests.unit import test_backend_sql
from keystone.token.providers import common
from keystone.token import provider
CONF = keystone.conf.CONF
@ -168,7 +168,7 @@ class RevokeTests(object):
# check to make sure that list_events matches the token to the event we
# just revoked.
first_token = _sample_blank_token()
first_token['audit_id'] = common.random_urlsafe_str()
first_token['audit_id'] = provider.random_urlsafe_str()
PROVIDERS.revoke_api.revoke_by_audit_id(
audit_id=first_token['audit_id'])
self._assertTokenRevoked(first_token)
@ -179,7 +179,7 @@ class RevokeTests(object):
# sure that list events only finds 1 match since there are 2 and they
# dont both have different populated audit_id fields
second_token = _sample_blank_token()
second_token['audit_id'] = common.random_urlsafe_str()
second_token['audit_id'] = provider.random_urlsafe_str()
PROVIDERS.revoke_api.revoke_by_audit_id(
audit_id=second_token['audit_id'])
self._assertTokenRevoked(second_token)
@ -212,7 +212,7 @@ class RevokeTests(object):
first_token = _sample_blank_token()
first_token['user_id'] = uuid.uuid4().hex
first_token['project_id'] = uuid.uuid4().hex
first_token['audit_id'] = common.random_urlsafe_str()
first_token['audit_id'] = provider.random_urlsafe_str()
# revoke event and then verify that there is only one revocation
# and verify the only revoked event is the token
PROVIDERS.revoke_api.revoke(revoke_model.RevokeEvent(
@ -241,7 +241,7 @@ class RevokeTests(object):
fourth_token = _sample_blank_token()
fourth_token['user_id'] = uuid.uuid4().hex
fourth_token['project_id'] = uuid.uuid4().hex
fourth_token['audit_id'] = common.random_urlsafe_str()
fourth_token['audit_id'] = provider.random_urlsafe_str()
PROVIDERS.revoke_api.revoke(revoke_model.RevokeEvent(
project_id=fourth_token['project_id'],
audit_id=fourth_token['audit_id']))
@ -380,7 +380,7 @@ class RevokeTests(object):
revocation_backend = sql.Revoke()
# Create our first token with audit_id
audit_id = common.build_audit_info(parent_audit_id=None)[0]
audit_id = provider.random_urlsafe_str()
token = _sample_blank_token()
# Audit ID and Audit Chain ID are populated with the same value
# if the token is an original token

View File

@ -15,6 +15,7 @@
import datetime
from oslo_utils import timeutils
from six.moves import urllib
from keystone.common import provider_api
from keystone.common import utils
@ -25,6 +26,7 @@ from keystone.tests import unit
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import database
from keystone import token
from keystone.token import provider
CONF = keystone.conf.CONF
@ -452,6 +454,10 @@ class TestTokenProvider(unit.TestCase):
)
self.load_backends()
def test_strings_are_url_safe(self):
s = provider.random_urlsafe_str()
self.assertEqual(s, urllib.parse.quote_plus(s))
def test_unsupported_token_provider(self):
self.config_fixture.config(group='token',
provider='MyProvider')

View File

@ -33,11 +33,13 @@ if not xmldsig:
xmldsig = importutils.try_import("xmldsig")
from keystone.auth import controllers as auth_controllers
from keystone.common import controller
from keystone.common import provider_api
import keystone.conf
from keystone import exception
from keystone.federation import controllers as federation_controllers
from keystone.federation import idp as keystone_idp
from keystone.models import token_model
from keystone import notifications
from keystone.tests import unit
from keystone.tests.unit import core
@ -45,7 +47,6 @@ from keystone.tests.unit import federation_fixtures
from keystone.tests.unit import ksfixtures
from keystone.tests.unit import mapping_fixtures
from keystone.tests.unit import test_v3
from keystone.token.providers import common as token_common
CONF = keystone.conf.CONF
@ -4741,8 +4742,6 @@ class K2KServiceCatalogTests(test_v3.RestfulTestCase):
PROVIDERS.federation_api.create_sp(self.SP3, sp)
self.sp_gamma = {self.SP3: sp}
self.token_v3_helper = token_common.V3TokenDataHelper()
def sp_response(self, id, ref):
ref.pop('enabled')
ref.pop('description')
@ -4774,7 +4773,10 @@ class K2KServiceCatalogTests(test_v3.RestfulTestCase):
def test_service_providers_in_token(self):
"""Check if service providers are listed in service catalog."""
token = self.token_v3_helper.get_token_data(self.user_id, ['password'])
model = token_model.TokenModel()
model.user_id = self.user_id
model.methods = ['password']
token = controller.render_token_response_from_model(model)
ref = {}
for r in (self.sp_alpha, self.sp_beta, self.sp_gamma):
ref.update(r)
@ -4791,7 +4793,10 @@ class K2KServiceCatalogTests(test_v3.RestfulTestCase):
sp_ref = {'enabled': False}
PROVIDERS.federation_api.update_sp(self.SP1, sp_ref)
token = self.token_v3_helper.get_token_data(self.user_id, ['password'])
model = token_model.TokenModel()
model.user_id = self.user_id
model.methods = ['password']
token = controller.render_token_response_from_model(model)
ref = {}
for r in (self.sp_beta, self.sp_gamma):
ref.update(r)
@ -4808,7 +4813,10 @@ class K2KServiceCatalogTests(test_v3.RestfulTestCase):
for sp in (self.SP1, self.SP2, self.SP3):
PROVIDERS.federation_api.update_sp(sp, sp_ref)
token = self.token_v3_helper.get_token_data(self.user_id, ['password'])
model = token_model.TokenModel()
model.user_id = self.user_id
model.methods = ['password']
token = controller.render_token_response_from_model(model)
self.assertNotIn('service_providers', token['token'],
message=('Expected Service Catalog not to have '
'service_providers'))

View File

@ -1,36 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from six.moves import urllib
from keystone.tests import unit
from keystone.token import provider
from keystone.token.providers import common
class TestTokenProvidersCommon(unit.TestCase):
def test_strings_are_url_safe(self):
s = common.random_urlsafe_str()
self.assertEqual(s, urllib.parse.quote_plus(s))
def test_unsupported_provider_raises_import_error(self):
namespace = "keystone.token.provider"
# Generate a random name
driver = uuid.uuid4().hex
self.config_fixture.config(group='token', provider=driver)
msg = "Unable to find '%(driver)s' driver in '%(namespace)s'." % {
'namespace': namespace, 'driver': driver
}
self.assertRaisesRegex(ImportError, msg, provider.Manager)

View File

@ -30,7 +30,7 @@ from keystone.federation import constants as federation_constants
from keystone.tests import unit
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import database
from keystone.token.providers import common
from keystone.token import provider
from keystone.token.providers import fernet
from keystone.token import token_formatters
@ -236,7 +236,7 @@ class TestPayloads(unit.TestCase):
delta=1e-05)
def test_strings_can_be_converted_to_bytes(self):
s = common.random_urlsafe_str()
s = provider.random_urlsafe_str()
self.assertIsInstance(s, six.text_type)
b = token_formatters.BasePayload.random_urlsafe_str_to_bytes(s)
@ -293,7 +293,7 @@ class TestPayloads(unit.TestCase):
exp_user_id = exp_user_id or uuid.uuid4().hex
exp_methods = exp_methods or ['password']
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [common.random_urlsafe_str()]
exp_audit_ids = [provider.random_urlsafe_str()]
payload = payload_class.assemble(
exp_user_id, exp_methods, exp_system, exp_project_id,

View File

@ -1,56 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import uuid
from testtools import matchers
from keystone import exception
from keystone.tests import unit
from keystone.token.providers import common
class TestTokenDataHelper(unit.TestCase):
def setUp(self):
super(TestTokenDataHelper, self).setUp()
self.load_backends()
self.v3_data_helper = common.V3TokenDataHelper()
def test_v3_token_data_helper_populate_audit_info_string(self):
token_data = {}
audit_info_bytes = base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2]
audit_info = audit_info_bytes.decode('utf-8')
self.v3_data_helper._populate_audit_info(token_data, audit_info)
self.assertIn(audit_info, token_data['audit_ids'])
self.assertThat(token_data['audit_ids'], matchers.HasLength(2))
def test_v3_token_data_helper_populate_audit_info_none(self):
token_data = {}
self.v3_data_helper._populate_audit_info(token_data, audit_info=None)
self.assertThat(token_data['audit_ids'], matchers.HasLength(1))
self.assertNotIn(None, token_data['audit_ids'])
def test_v3_token_data_helper_populate_audit_info_list(self):
token_data = {}
audit_info = [base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2],
base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2]]
self.v3_data_helper._populate_audit_info(token_data, audit_info)
self.assertEqual(audit_info, token_data['audit_ids'])
def test_v3_token_data_helper_populate_audit_info_invalid(self):
token_data = {}
audit_info = dict()
self.assertRaises(exception.UnexpectedError,
self.v3_data_helper._populate_audit_info,
token_data=token_data,
audit_info=audit_info)

View File

@ -26,7 +26,7 @@ from keystone.models import token_model
from keystone.tests.unit import base_classes
from keystone.tests.unit import core
from keystone.tests.unit import test_token_provider
from keystone.token.providers import common as provider_common
from keystone.token import provider
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
@ -206,12 +206,12 @@ class TokenModelTests(base_classes.TestCaseWithBootstrap):
def test_audit_id_attributes(self):
token = token_model.TokenModel()
audit_id = provider_common.random_urlsafe_str()
audit_id = provider.random_urlsafe_str()
token.audit_id = audit_id
self.assertTrue(len(token.audit_ids) == 1)
parent_audit_id = provider_common.random_urlsafe_str()
parent_audit_id = provider.random_urlsafe_str()
token.parent_audit_id = parent_audit_id
self.assertTrue(len(token.audit_ids) == 2)
@ -291,7 +291,7 @@ class TokenModelTests(base_classes.TestCaseWithBootstrap):
token = token_model.TokenModel()
token.user_id = user['id']
token.system = 'all'
token.audit_id = provider_common.random_urlsafe_str()
token.audit_id = provider.random_urlsafe_str()
self.assertRaises(
exception.Unauthorized, token.mint, self.token_id, self.issued_at
@ -360,7 +360,7 @@ class TokenModelTests(base_classes.TestCaseWithBootstrap):
token = token_model.TokenModel()
token.user_id = user['id']
token.domain_id = CONF.identity.default_domain_id
token.audit_id = provider_common.random_urlsafe_str()
token.audit_id = provider.random_urlsafe_str()
self.assertRaises(
exception.Unauthorized, token.mint, self.token_id, self.issued_at
@ -398,7 +398,7 @@ class TokenModelTests(base_classes.TestCaseWithBootstrap):
token = token_model.TokenModel()
token.user_id = user['id']
token.project_id = self.project_id
token.audit_id = provider_common.random_urlsafe_str()
token.audit_id = provider.random_urlsafe_str()
self.assertRaises(
exception.Unauthorized, token.mint, self.token_id, self.issued_at
@ -412,7 +412,7 @@ class TokenModelTests(base_classes.TestCaseWithBootstrap):
token = token_model.TokenModel()
token.user_id = self.admin_user_id
token.project_id = self.project_id
token.audit_id = provider_common.random_urlsafe_str()
token.audit_id = provider.random_urlsafe_str()
self.assertRaises(
exception.ProjectNotFound, token.mint, self.token_id,
@ -428,7 +428,7 @@ class TokenModelTests(base_classes.TestCaseWithBootstrap):
token = token_model.TokenModel()
token.user_id = self.admin_user_id
token.project_id = self.project_id
token.audit_id = provider_common.random_urlsafe_str()
token.audit_id = provider.random_urlsafe_str()
self.assertRaises(
exception.DomainNotFound, token.mint, self.token_id, self.issued_at

View File

@ -1,593 +0,0 @@
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import base64
import datetime
import itertools
import uuid
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_utils import timeutils
import six
from six.moves.urllib import parse
from keystone.common import provider_api
from keystone.common import utils
import keystone.conf
from keystone import exception
from keystone.federation import constants as federation_constants
from keystone.i18n import _
from keystone.models import token_model
from keystone.token.providers import base
LOG = log.getLogger(__name__)
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
def default_expire_time():
"""Determine when a fresh token should expire.
Expiration time varies based on configuration (see ``[token] expiration``).
:returns: a naive UTC datetime.datetime object
"""
expire_delta = datetime.timedelta(seconds=CONF.token.expiration)
expires_at = timeutils.utcnow() + expire_delta
return expires_at.replace(microsecond=0)
def random_urlsafe_str():
"""Generate a random URL-safe string.
:rtype: six.text_type
"""
# chop the padding (==) off the end of the encoding to save space
return base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2].decode('utf-8')
def build_audit_info(parent_audit_id=None):
"""Build the audit data for a token.
If ``parent_audit_id`` is None, the list will be one element in length
containing a newly generated audit_id.
If ``parent_audit_id`` is supplied, the list will be two elements in length
containing a newly generated audit_id and the ``parent_audit_id``. The
``parent_audit_id`` will always be element index 1 in the resulting
list.
:param parent_audit_id: the audit of the original token in the chain
:type parent_audit_id: str
:returns: Keystone token audit data
"""
audit_id = random_urlsafe_str()
if parent_audit_id is not None:
return [audit_id, parent_audit_id]
return [audit_id]
class V3TokenDataHelper(provider_api.ProviderAPIMixin, object):
"""Token data helper."""
def __init__(self):
# Keep __init__ around to ensure dependency injection works.
super(V3TokenDataHelper, self).__init__()
def _get_filtered_domain(self, domain_id):
"""Ensure the domain is enabled and return domain id and name.
:param domain_id: The ID of the domain to validate
:returns: A dictionary containing two keys, the `id` of the domain and
the `name` of the domain.
"""
domain_ref = PROVIDERS.resource_api.get_domain(domain_id)
if not domain_ref.get('enabled'):
msg = _('Unable to validate token because domain %(id)s is '
'disabled') % {'id': domain_ref['id']}
LOG.warning(msg)
raise exception.DomainNotFound(msg)
return {'id': domain_ref['id'], 'name': domain_ref['name']}
def _get_filtered_project(self, project_id):
"""Ensure the project and parent domain is enabled.
:param project_id: The ID of the project to validate
:return: A dictionary containing up to three keys, the `id` of the
project, the `name` of the project, and the parent `domain`.
"""
project_ref = PROVIDERS.resource_api.get_project(project_id)
if not project_ref.get('enabled'):
msg = _('Unable to validate token because project %(id)s is '
'disabled') % {'id': project_ref['id']}
LOG.warning(msg)
raise exception.ProjectNotFound(msg)
filtered_project = {
'id': project_ref['id'],
'name': project_ref['name']}
if project_ref['domain_id'] is not None:
filtered_project['domain'] = (
self._get_filtered_domain(project_ref['domain_id']))
else:
# Projects acting as a domain do not have a domain_id attribute
filtered_project['domain'] = None
return filtered_project
def _populate_scope(self, token_data, system, domain_id, project_id):
if 'domain' in token_data or 'project' in token_data:
# scope already exist, no need to populate it again
return
if domain_id:
token_data['domain'] = self._get_filtered_domain(domain_id)
elif project_id:
token_data['project'] = self._get_filtered_project(project_id)
project_ref = PROVIDERS.resource_api.get_project(project_id)
token_data['is_domain'] = project_ref['is_domain']
elif system == 'all':
# NOTE(lbragstad): This might have to be more elegant in the future
# if, or when, keystone supports scoping a token to a specific
# service or region.
token_data['system'] = {'all': True}
def _populate_is_admin_project(self, token_data):
# TODO(ayoung): Support the ability for a project acting as a domain
# to be the admin project once the rest of the code for projects
# acting as domains is merged. Code will likely be:
# (r.admin_project_name == None and project['is_domain'] == True
# and project['name'] == r.admin_project_domain_name)
admin_project_name = CONF.resource.admin_project_name
admin_project_domain_name = CONF.resource.admin_project_domain_name
if not (admin_project_name and admin_project_domain_name):
return # admin project not enabled
project = token_data['project']
token_data['is_admin_project'] = (
project['name'] == admin_project_name and
project['domain']['name'] == admin_project_domain_name)
def _get_roles_for_user(self, user_id, system, domain_id, project_id):
roles = []
if system:
group_ids = [
group['id'] for
group in PROVIDERS.identity_api.list_groups_for_user(user_id)
]
group_roles = []
for group_id in group_ids:
roles = PROVIDERS.assignment_api.list_system_grants_for_group(
group_id
)
for role in roles:
group_roles.append(role)
user_roles = PROVIDERS.assignment_api.list_system_grants_for_user(
user_id
)
return itertools.chain(group_roles, user_roles)
if domain_id:
roles = PROVIDERS.assignment_api.get_roles_for_user_and_domain(
user_id, domain_id)
if project_id:
roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
user_id, project_id)
return [PROVIDERS.role_api.get_role(role_id) for role_id in roles]
def _get_app_cred_roles(self, app_cred, user_id, domain_id, project_id):
roles = app_cred['roles']
token_roles = []
for role in roles:
try:
role_ref = PROVIDERS.assignment_api.get_grant(
role['id'], user_id=user_id, domain_id=domain_id,
project_id=project_id)
token_roles.append(role_ref)
except exception.RoleAssignmentNotFound:
pass
return [
PROVIDERS.role_api.get_role(role['id']) for role in token_roles]
def populate_roles_for_federated_user(self, token_data, group_ids,
project_id=None, domain_id=None,
user_id=None, system=None):
"""Populate roles basing on provided groups and assignments.
Used for federated users with dynamically assigned groups.
This method does not return anything, yet it modifies token_data in
place.
:param token_data: a dictionary used for building token response
:param group_ids: list of group IDs a user is a member of
:param project_id: project ID to scope to
:param domain_id: domain ID to scope to
:param user_id: user ID
:param system: system scope if applicable
:raises keystone.exception.Unauthorized: when no roles were found
"""
def check_roles(roles, user_id, project_id, domain_id):
# User was granted roles so simply exit this function.
if roles:
return
if project_id:
msg = _('User %(user_id)s has no access '
'to project %(project_id)s') % {
'user_id': user_id,
'project_id': project_id}
elif domain_id:
msg = _('User %(user_id)s has no access '
'to domain %(domain_id)s') % {
'user_id': user_id,
'domain_id': domain_id}
# Since no roles were found a user is not authorized to
# perform any operations. Raise an exception with
# appropriate error message.
raise exception.Unauthorized(msg)
roles = PROVIDERS.assignment_api.get_roles_for_groups(
group_ids, project_id, domain_id
)
roles = roles + self._get_roles_for_user(
user_id, system, domain_id, project_id
)
# NOTE(lbragstad): Remove duplicate role references from a list of
# roles. It is often suggested that this be done with:
#
# roles = [dict(t) for t in set([tuple(d.items()) for d in roles])]
#
# But that doesn't actually remove duplicates in all cases and causes
# transient failures because dictionaries are unordered objects. This
# means {'id': 1, 'foo': 'bar'} and {'foo': 'bar', 'id': 1} won't
# actually resolve to a single entity in the above logic since they are
# both considered unique. By using `in` we're performing a containment
# check, which also does a deep comparison of the objects, which is
# what we want.
unique_roles = []
for role in roles:
if role not in unique_roles:
unique_roles.append(role)
check_roles(unique_roles, user_id, project_id, domain_id)
token_data['roles'] = unique_roles
def _populate_user(self, token_data, user_id, trust):
if 'user' in token_data:
# no need to repopulate user if it already exists
return
user_ref = PROVIDERS.identity_api.get_user(user_id)
if trust and 'OS-TRUST:trust' not in token_data:
trustor_user_ref = (PROVIDERS.identity_api.get_user(
trust['trustor_user_id']))
trustee_user_ref = (PROVIDERS.identity_api.get_user(
trust['trustee_user_id']))
try:
PROVIDERS.resource_api.assert_domain_enabled(
trustor_user_ref['domain_id'])
except AssertionError:
raise exception.TokenNotFound(_('Trustor domain is disabled.'))
try:
PROVIDERS.resource_api.assert_domain_enabled(
trustee_user_ref['domain_id'])
except AssertionError:
raise exception.TokenNotFound(_('Trustee domain is disabled.'))
try:
PROVIDERS.identity_api.assert_user_enabled(
trust['trustor_user_id']
)
except AssertionError:
raise exception.Forbidden(_('Trustor is disabled.'))
if trust['impersonation']:
user_ref = trustor_user_ref
token_data['OS-TRUST:trust'] = (
{
'id': trust['id'],
'trustor_user': {'id': trust['trustor_user_id']},
'trustee_user': {'id': trust['trustee_user_id']},
'impersonation': trust['impersonation']
})
filtered_user = {
'id': user_ref['id'],
'name': user_ref['name'],
'domain': self._get_filtered_domain(user_ref['domain_id']),
'password_expires_at': user_ref['password_expires_at']}
token_data['user'] = filtered_user
def _populate_oauth_section(self, token_data, access_token):
if access_token:
access_token_id = access_token['id']
consumer_id = access_token['consumer_id']
token_data['OS-OAUTH1'] = ({'access_token_id': access_token_id,
'consumer_id': consumer_id})
def _populate_roles(self, token_data, user_id, system, domain_id,
project_id, trust, app_cred_id, access_token):
if 'roles' in token_data:
# no need to repopulate roles
return
if access_token:
filtered_roles = []
access_token_ref = PROVIDERS.oauth_api.get_access_token(
access_token['id']
)
authed_role_ids = jsonutils.loads(access_token_ref['role_ids'])
all_roles = PROVIDERS.role_api.list_roles()
for role in all_roles:
for authed_role in authed_role_ids:
if authed_role == role['id']:
filtered_roles.append({'id': role['id'],
'name': role['name']})
token_data['roles'] = filtered_roles
return
if trust:
# If redelegated_trust_id is set, then we must traverse the
# trust_chain in order to determine who the original trustor is. We
# need to do this because the user ID of the original trustor helps
# us determine scope in the redelegated context.
if trust.get('redelegated_trust_id'):
trust_chain = PROVIDERS.trust_api.get_trust_pedigree(
trust['id']
)
token_user_id = trust_chain[-1]['trustor_user_id']
else:
token_user_id = trust['trustor_user_id']
token_project_id = trust['project_id']
# trusts do not support domains yet
token_domain_id = None
else:
token_user_id = user_id
token_project_id = project_id
token_domain_id = domain_id
if system or token_domain_id or token_project_id:
filtered_roles = []
if trust:
# First expand out any roles that were in the trust to include
# any implied roles, whether global or domain specific
refs = [{'role_id': role['id']} for role in trust['roles']]
effective_trust_roles = (
PROVIDERS.assignment_api.add_implied_roles(refs))
effective_trust_role_ids = (
set([r['role_id'] for r in effective_trust_roles])
)
# Now get the current role assignments for the trustor,
# including any domain specific roles.
assignments = PROVIDERS.assignment_api.list_role_assignments(
user_id=token_user_id,
system=system,
project_id=token_project_id,
effective=True, strip_domain_roles=False)
current_effective_trustor_roles = (
set([x['role_id'] for x in assignments]))
# Go through each of the effective trust roles, making sure the
# trustor still has them, if any have been removed, then we
# will treat the trust as invalid
for trust_role_id in effective_trust_role_ids:
if trust_role_id in current_effective_trustor_roles:
role = PROVIDERS.role_api.get_role(trust_role_id)
if role['domain_id'] is None:
filtered_roles.append(role)
else:
raise exception.Forbidden(
_('Trustee has no delegated roles.'))
elif app_cred_id:
app_cred_api = PROVIDERS.application_credential_api
app_cred_ref = app_cred_api.get_application_credential(
app_cred_id)
for role in self._get_app_cred_roles(app_cred_ref,
token_user_id,
token_domain_id,
token_project_id):
filtered_roles.append({'id': role['id'],
'name': role['name']})
else:
for role in self._get_roles_for_user(token_user_id,
system,
token_domain_id,
token_project_id):
filtered_roles.append({'id': role['id'],
'name': role['name']})
# user has no project or domain roles, therefore access denied
if not filtered_roles:
if token_project_id:
msg = _('User %(user_id)s has no access '
'to project %(project_id)s') % {
'user_id': user_id,
'project_id': token_project_id}
elif token_domain_id:
msg = _('User %(user_id)s has no access '
'to domain %(domain_id)s') % {
'user_id': user_id,
'domain_id': token_domain_id}
elif system:
msg = _('User %(user_id)s has no access '
'to the system') % {'user_id': user_id}
LOG.debug(msg)
raise exception.Unauthorized(msg)
token_data['roles'] = filtered_roles
def _populate_service_catalog(self, token_data, user_id, system, domain_id,
project_id, trust):
if 'catalog' in token_data:
# no need to repopulate service catalog
return
if trust:
user_id = trust['trustor_user_id']
# NOTE(lbragstad): The catalog API requires a project in order to
# generate a service catalog, but that appears to be only if there are
# endpoint -> project relationships. In the event we're dealing with a
# system_scoped token, we should pass None to the catalog API and just
# get a catalog anyway.
if project_id or domain_id or system:
service_catalog = PROVIDERS.catalog_api.get_v3_catalog(
user_id, project_id)
token_data['catalog'] = service_catalog
def _populate_service_providers(self, token_data):
if 'service_providers' in token_data:
return
service_providers = (
PROVIDERS.federation_api.get_enabled_service_providers()
)
if service_providers:
token_data['service_providers'] = service_providers
def _validate_identity_provider(self, token_data):
federated_info = token_data['user'].get('OS-FEDERATION')
if federated_info:
idp_id = federated_info['identity_provider']['id']
# FIXME(lbragstad): This isn't working properly because somewhere
# along the line we *were* encoding and decoding properly. This
# is needed to get some tests to pass in python 3. This will likely
# be fixed when the validate token path is moved over to using the
# token model, just like authenticate.
if isinstance(idp_id, bytes):
idp_id = idp_id.decode('utf-8')
PROVIDERS.federation_api.get_idp(idp_id)
def _populate_token_dates(self, token_data, expires=None, issued_at=None):
if not expires:
expires = default_expire_time()
if not isinstance(expires, six.string_types):
expires = utils.isotime(expires, subsecond=True)
token_data['expires_at'] = expires
token_data['issued_at'] = (issued_at or
utils.isotime(subsecond=True))
def _populate_audit_info(self, token_data, audit_info=None):
if audit_info is None or isinstance(audit_info, six.string_types):
token_data['audit_ids'] = build_audit_info(audit_info)
elif isinstance(audit_info, list):
token_data['audit_ids'] = audit_info
else:
msg = (_('Invalid audit info data type: %(data)s (%(type)s)') %
{'data': audit_info, 'type': type(audit_info)})
LOG.error(msg)
raise exception.UnexpectedError(msg)
def _populate_app_cred(self, token_data, app_cred_id):
if app_cred_id:
app_cred_api = PROVIDERS.application_credential_api
app_cred = app_cred_api.get_application_credential(app_cred_id)
restricted = not app_cred['unrestricted']
token_data['application_credential'] = {}
token_data['application_credential']['id'] = app_cred['id']
token_data['application_credential']['name'] = app_cred['name']
token_data['application_credential']['restricted'] = restricted
def get_token_data(self, user_id, method_names, system=None,
domain_id=None, project_id=None, expires=None,
app_cred_id=None, trust=None, token=None,
include_catalog=True, bind=None, access_token=None,
issued_at=None, audit_info=None):
token_data = {'methods': method_names}
# We've probably already written these to the token
if token:
for x in ('roles', 'user', 'catalog', 'project', 'domain'):
if x in token:
token_data[x] = token[x]
if bind:
token_data['bind'] = bind
self._populate_scope(token_data, system, domain_id, project_id)
if token_data.get('project'):
self._populate_is_admin_project(token_data)
self._populate_user(token_data, user_id, trust)
self._populate_roles(token_data, user_id, system, domain_id,
project_id, trust, app_cred_id, access_token)
self._populate_audit_info(token_data, audit_info)
if include_catalog:
self._populate_service_catalog(
token_data, user_id, system, domain_id, project_id, trust
)
self._populate_service_providers(token_data)
self._validate_identity_provider(token_data)
self._populate_token_dates(token_data, expires=expires,
issued_at=issued_at)
self._populate_oauth_section(token_data, access_token)
self._populate_app_cred(token_data, app_cred_id)
return {'token': token_data}
class BaseProvider(provider_api.ProviderAPIMixin, base.Provider):
def __init__(self, *args, **kwargs):
super(BaseProvider, self).__init__(*args, **kwargs)
self.v3_token_data_helper = V3TokenDataHelper()
def get_token_version(self, token_data):
if token_data and isinstance(token_data, dict):
if 'token_version' in token_data:
if token_data['token_version'] in token_model.VERSIONS:
return token_data['token_version']
if 'token' in token_data and 'methods' in token_data['token']:
return token_model.V3
raise exception.UnsupportedTokenVersionException()
def _is_mapped_token(self, auth_context):
return (federation_constants.IDENTITY_PROVIDER in auth_context and
federation_constants.PROTOCOL in auth_context)
def _handle_mapped_tokens(self, auth_context, project_id, domain_id):
user_id = auth_context['user_id']
group_ids = auth_context['group_ids']
idp = auth_context[federation_constants.IDENTITY_PROVIDER]
protocol = auth_context[federation_constants.PROTOCOL]
user_dict = PROVIDERS.identity_api.get_user(user_id)
user_name = user_dict['name']
token_data = {
'user': {
'id': user_id,
'name': parse.unquote(user_name),
federation_constants.FEDERATION: {
'groups': [{'id': x} for x in group_ids],
'identity_provider': {'id': idp},
'protocol': {'id': protocol}
},
'domain': {
'id': CONF.federation.federated_domain_name,
'name': CONF.federation.federated_domain_name
}
}
}
# FIXME(lbragstad): This will have to account for system-scoping, too.
if project_id or domain_id:
self.v3_token_data_helper.populate_roles_for_federated_user(
token_data, group_ids, project_id, domain_id, user_id)
return token_data