Remove dead code for auth_context
Change-Id: Id96bdb9dc51640cef98c98c93400a503dd880371
This commit is contained in:
parent
cfbc2aa30b
commit
e9332a29b5
|
@ -22,7 +22,6 @@ from keystone.common.policies import base as pol_base
|
|||
from keystone.common import utils
|
||||
from keystone import conf
|
||||
from keystone import exception
|
||||
from keystone.i18n import _
|
||||
from keystone.models import token_model
|
||||
|
||||
|
||||
|
@ -35,107 +34,14 @@ SUBJECT_TOKEN_HEADER = 'X-Subject-Token'
|
|||
|
||||
|
||||
CONF = conf.CONF
|
||||
|
||||
# Environment variable used to convey the Keystone auth context,
|
||||
# the user credential used for policy enforcement.
|
||||
AUTH_CONTEXT_ENV = 'KEYSTONE_AUTH_CONTEXT'
|
||||
"""Environment variable used to convey the Keystone auth context.
|
||||
|
||||
Auth context is essentially the user credential used for policy enforcement.
|
||||
It is a dictionary with the following attributes:
|
||||
|
||||
* ``token``: Token from the request
|
||||
* ``user_id``: user ID of the principal
|
||||
* ``user_name``: user name of the principal
|
||||
* ``user_domain_id`` (optional): Domain ID of the principal if the principal
|
||||
has a domain.
|
||||
* ``user_domain_name`` (optional): Domain name of the principal if the
|
||||
principal has a domain.
|
||||
* ``project_id`` (optional): project ID of the scoped project if auth is
|
||||
project-scoped
|
||||
* ``project_name`` (optional): project name of the scoped project if auth is
|
||||
project-scoped
|
||||
* ``project_domain_id`` (optional): Domain ID of the scoped project if auth is
|
||||
project-scoped.
|
||||
* ``project_domain_name`` (optional): Domain name of the scoped project if auth
|
||||
is project-scoped.
|
||||
* ``domain_id`` (optional): domain ID of the scoped domain if auth is
|
||||
domain-scoped
|
||||
* ``domain_name`` (optional): domain name of the scoped domain if auth is
|
||||
domain-scoped
|
||||
* ``is_delegated_auth``: True if this is delegated (via trust or oauth)
|
||||
* ``trust_id``: Trust ID if trust-scoped, or None
|
||||
* ``trustor_id``: Trustor ID if trust-scoped, or None
|
||||
* ``trustee_id``: Trustee ID if trust-scoped, or None
|
||||
* ``consumer_id``: OAuth consumer ID, or None
|
||||
* ``access_token_id``: OAuth access token ID, or None
|
||||
* ``roles`` (optional): list of role names for the given scope
|
||||
* ``group_ids`` (optional): list of group IDs for which the API user has
|
||||
membership if token was for a federated user
|
||||
|
||||
"""
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def token_to_auth_context(token):
|
||||
if not isinstance(token, token_model.KeystoneToken):
|
||||
raise exception.UnexpectedError(_('token reference must be a '
|
||||
'KeystoneToken type, got: %s') %
|
||||
type(token))
|
||||
auth_context = {'token': token,
|
||||
'is_delegated_auth': False}
|
||||
try:
|
||||
auth_context['user_id'] = token.user_id
|
||||
except KeyError:
|
||||
LOG.warning('RBAC: Invalid user data in token')
|
||||
raise exception.Unauthorized(_('No user_id in token'))
|
||||
auth_context['user_name'] = token.user_name
|
||||
auth_context['user_domain_id'] = token.user_domain_id
|
||||
auth_context['user_domain_name'] = token.user_domain_name
|
||||
|
||||
if token.project_scoped:
|
||||
auth_context['project_id'] = token.project_id
|
||||
auth_context['project_name'] = token.project_name
|
||||
auth_context['project_domain_id'] = token.project_domain_id
|
||||
auth_context['project_domain_name'] = token.project_domain_name
|
||||
auth_context['is_domain'] = token.is_domain
|
||||
elif token.domain_scoped:
|
||||
auth_context['domain_id'] = token.domain_id
|
||||
auth_context['domain_name'] = token.domain_name
|
||||
else:
|
||||
LOG.debug('RBAC: Proceeding without project or domain scope')
|
||||
|
||||
if token.trust_scoped:
|
||||
auth_context['is_delegated_auth'] = True
|
||||
auth_context['trust_id'] = token.trust_id
|
||||
auth_context['trustor_id'] = token.trustor_user_id
|
||||
auth_context['trustee_id'] = token.trustee_user_id
|
||||
else:
|
||||
# NOTE(lbragstad): These variables will already be set to None but we
|
||||
# add the else statement here for readability.
|
||||
auth_context['trust_id'] = None
|
||||
auth_context['trustor_id'] = None
|
||||
auth_context['trustee_id'] = None
|
||||
|
||||
roles = token.role_names
|
||||
if roles:
|
||||
auth_context['roles'] = roles
|
||||
|
||||
if token.oauth_scoped:
|
||||
auth_context['is_delegated_auth'] = True
|
||||
auth_context['consumer_id'] = token.oauth_consumer_id
|
||||
auth_context['access_token_id'] = token.oauth_access_token_id
|
||||
else:
|
||||
# NOTE(lbragstad): These variables will already be set to None but we
|
||||
# add the else statement here for readability.
|
||||
auth_context['consumer_id'] = None
|
||||
auth_context['access_token_id'] = None
|
||||
|
||||
if token.is_federated_user:
|
||||
auth_context['group_ids'] = token.federation_group_ids
|
||||
|
||||
auth_context['is_admin_project'] = token.is_admin_project
|
||||
return auth_context
|
||||
|
||||
|
||||
def assert_admin(app, request):
|
||||
"""Ensure the user is an admin.
|
||||
|
||||
|
|
|
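Review bot: Dropping `AUTH_CONTEXT_ENV = 'KEYSTONE_AUTH_CONTEXT'` removes a public module-level constant from keystone.common.authorization, and nothing in this hunk shows that readers of `authorization.AUTH_CONTEXT_ENV` (the environ key the removed docstring says carries the credential used for policy enforcement) were migrated; if any in-tree or middleware code still imports that name the failure will surface at import time rather than as a policy decision, so a repo-wide grep for the constant and the literal `'KEYSTONE_AUTH_CONTEXT'` is worth confirming before merge. The removed docstring was also the only place in this module that listed the auth-context attributes policy rules key on (`user_id`, `project_id`, `roles`, `is_admin_project`, trust and OAuth fields); presumably that schema is now documented wherever the request context is built, and a one-line pointer next to `LOG = log.getLogger(__name__)` such as `# NOTE: the auth-context attribute schema used by policy enforcement is documented with the request-context builder, not here.` would keep rule authors from reading this module for it. The removed `token_to_auth_context` raised `Unauthorized` on a token without `user_id`; whichever path replaces it should preserve that reject-on-missing-principal behaviour, which cannot be verified from this hunk. `assert_admin` begins at the end of the hunk and its body continues outside it.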
@ -1,167 +0,0 @@
|
|||
# Copyright 2015 IBM Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import copy
|
||||
import uuid
|
||||
|
||||
from keystone.common import authorization
|
||||
from keystone import exception
|
||||
from keystone.federation import constants as federation_constants
|
||||
from keystone.models import token_model
|
||||
from keystone.tests import unit
|
||||
from keystone.tests.unit import test_token_provider
|
||||
|
||||
|
||||
class TestTokenToAuthContext(unit.BaseTestCase):
|
||||
def test_token_is_project_scoped_with_trust(self):
|
||||
# Check auth_context result when the token is project-scoped and has
|
||||
# trust info.
|
||||
|
||||
# SAMPLE_V3_TOKEN has OS-TRUST:trust in it.
|
||||
token_data = test_token_provider.SAMPLE_V3_TOKEN
|
||||
token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
|
||||
token_data=token_data)
|
||||
|
||||
auth_context = authorization.token_to_auth_context(token)
|
||||
|
||||
self.assertEqual(token, auth_context['token'])
|
||||
self.assertTrue(auth_context['is_delegated_auth'])
|
||||
self.assertEqual(token_data['token']['user']['id'],
|
||||
auth_context['user_id'])
|
||||
self.assertEqual(token_data['token']['user']['name'],
|
||||
auth_context['user_name'])
|
||||
self.assertEqual(token_data['token']['user']['domain']['id'],
|
||||
auth_context['user_domain_id'])
|
||||
self.assertEqual(token_data['token']['user']['domain']['name'],
|
||||
auth_context['user_domain_name'])
|
||||
self.assertEqual(token_data['token']['project']['id'],
|
||||
auth_context['project_id'])
|
||||
self.assertEqual(token_data['token']['project']['domain']['id'],
|
||||
auth_context['project_domain_id'])
|
||||
self.assertEqual(token_data['token']['project']['domain']['name'],
|
||||
auth_context['project_domain_name'])
|
||||
self.assertNotIn('domain_id', auth_context)
|
||||
self.assertNotIn('domain_name', auth_context)
|
||||
self.assertEqual(token_data['token']['OS-TRUST:trust']['id'],
|
||||
auth_context['trust_id'])
|
||||
self.assertEqual(
|
||||
token_data['token']['OS-TRUST:trust']['trustor_user_id'],
|
||||
auth_context['trustor_id'])
|
||||
self.assertEqual(
|
||||
token_data['token']['OS-TRUST:trust']['trustee_user_id'],
|
||||
auth_context['trustee_id'])
|
||||
self.assertItemsEqual(
|
||||
[r['name'] for r in token_data['token']['roles']],
|
||||
auth_context['roles'])
|
||||
self.assertIsNone(auth_context['consumer_id'])
|
||||
self.assertIsNone(auth_context['access_token_id'])
|
||||
self.assertNotIn('group_ids', auth_context)
|
||||
|
||||
def test_token_is_domain_scoped(self):
|
||||
# Check contents of auth_context when token is domain-scoped.
|
||||
token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
|
||||
del token_data['token']['project']
|
||||
|
||||
domain_id = uuid.uuid4().hex
|
||||
domain_name = uuid.uuid4().hex
|
||||
token_data['token']['domain'] = {'id': domain_id, 'name': domain_name}
|
||||
|
||||
token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
|
||||
token_data=token_data)
|
||||
|
||||
auth_context = authorization.token_to_auth_context(token)
|
||||
|
||||
self.assertNotIn('project_id', auth_context)
|
||||
self.assertNotIn('project_domain_id', auth_context)
|
||||
|
||||
self.assertEqual(domain_id, auth_context['domain_id'])
|
||||
self.assertEqual(domain_name, auth_context['domain_name'])
|
||||
|
||||
def test_token_is_unscoped(self):
|
||||
# Check contents of auth_context when the token is unscoped.
|
||||
token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
|
||||
del token_data['token']['project']
|
||||
|
||||
token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
|
||||
token_data=token_data)
|
||||
|
||||
auth_context = authorization.token_to_auth_context(token)
|
||||
|
||||
self.assertNotIn('project_id', auth_context)
|
||||
self.assertNotIn('project_domain_id', auth_context)
|
||||
self.assertNotIn('domain_id', auth_context)
|
||||
self.assertNotIn('domain_name', auth_context)
|
||||
|
||||
def test_token_is_for_federated_user(self):
|
||||
# When the token is for a federated user then group_ids is in
|
||||
# auth_context.
|
||||
token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
|
||||
|
||||
group_ids = [uuid.uuid4().hex for x in range(1, 5)]
|
||||
|
||||
federation_data = {'identity_provider': {'id': uuid.uuid4().hex},
|
||||
'protocol': {'id': 'saml2'},
|
||||
'groups': [{'id': gid} for gid in group_ids]}
|
||||
token_data['token']['user'][federation_constants.FEDERATION] = (
|
||||
federation_data)
|
||||
|
||||
token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
|
||||
token_data=token_data)
|
||||
|
||||
auth_context = authorization.token_to_auth_context(token)
|
||||
|
||||
self.assertItemsEqual(group_ids, auth_context['group_ids'])
|
||||
|
||||
def test_oauth_variables_set_for_oauth_token(self):
|
||||
token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
|
||||
access_token_id = uuid.uuid4().hex
|
||||
consumer_id = uuid.uuid4().hex
|
||||
token_data['token']['OS-OAUTH1'] = {'access_token_id': access_token_id,
|
||||
'consumer_id': consumer_id}
|
||||
token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
|
||||
token_data=token_data)
|
||||
|
||||
auth_context = authorization.token_to_auth_context(token)
|
||||
|
||||
self.assertEqual(access_token_id, auth_context['access_token_id'])
|
||||
self.assertEqual(consumer_id, auth_context['consumer_id'])
|
||||
|
||||
def test_oauth_variables_not_set(self):
|
||||
token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
|
||||
token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
|
||||
token_data=token_data)
|
||||
|
||||
auth_context = authorization.token_to_auth_context(token)
|
||||
|
||||
self.assertIsNone(auth_context['access_token_id'])
|
||||
self.assertIsNone(auth_context['consumer_id'])
|
||||
|
||||
def test_token_is_not_KeystoneToken_raises_exception(self):
|
||||
# If the token isn't a KeystoneToken then an UnexpectedError exception
|
||||
# is raised.
|
||||
self.assertRaises(exception.UnexpectedError,
|
||||
authorization.token_to_auth_context, {})
|
||||
|
||||
def test_user_id_missing_in_token_raises_exception(self):
|
||||
# If there's no user ID in the token then an Unauthorized
|
||||
# exception is raised.
|
||||
token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
|
||||
del token_data['token']['user']['id']
|
||||
|
||||
token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
|
||||
token_data=token_data)
|
||||
|
||||
self.assertRaises(exception.Unauthorized,
|
||||
authorization.token_to_auth_context, token)
|