Consolidate the fernet provider issue_v3_token()

When the Fernet token provider was implemented, it extended the
provider.common.py:BaseProvider class. It also overrode most all common methods
the BaseProvider implemented. Other token providers in Keystone (like the UUID
an PKI providers) just implemente a _get_token_id method because token ids may
be different across providers.

This commit removes the issue_v3_token() method from the fernet.Provider. This
ensure that the Fernet provider uses the same issue_v3_token() that all the
other token providers use.

Subsequent patches will do the same for issue_v2_token, validate_v3_token, and
validate_v2_token.

Change-Id: I03f56c9c84389a6d6cdb3a6863fcbfca486af337
This commit is contained in:
Lance Bragstad 2015-06-29 18:12:37 +00:00
parent 7a88015a9f
commit 91a0b29809
10 changed files with 137 additions and 77 deletions

View File

@ -4033,7 +4033,7 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase):
trustor_user_id=self.user_id,
trustee_user_id=trustee_user['id'],
project_id=self.project_id,
impersonation=True,
impersonation=False,
role_ids=[self.role_id])
# Create a trust

View File

@ -37,11 +37,8 @@ class TestFernetTokenProvider(tests.TestCase):
self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
self.provider = fernet.Provider()
def test_get_token_id_raises_not_implemented(self):
"""Test that an exception is raised when calling _get_token_id."""
token_data = {}
self.assertRaises(exception.NotImplemented,
self.provider._get_token_id, token_data)
def test_supports_bind_authentication_returns_false(self):
self.assertFalse(self.provider._supports_bind_authentication)
def test_invalid_v3_token_raises_401(self):
self.assertRaises(

View File

@ -0,0 +1,26 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.tests import unit as tests
from keystone.token.providers import pki
class TestPkiTokenProvider(tests.TestCase):
def setUp(self):
super(TestPkiTokenProvider, self).setUp()
self.provider = pki.Provider()
def test_supports_bind_authentication_returns_true(self):
self.assertTrue(self.provider._supports_bind_authentication)
def test_need_persistence_return_true(self):
self.assertTrue(self.provider.needs_persistence)

View File

@ -0,0 +1,26 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.tests import unit as tests
from keystone.token.providers import pkiz
class TestPkizTokenProvider(tests.TestCase):
def setUp(self):
super(TestPkizTokenProvider, self).setUp()
self.provider = pkiz.Provider()
def test_supports_bind_authentication_returns_true(self):
self.assertTrue(self.provider._supports_bind_authentication)
def test_need_persistence_return_true(self):
self.assertTrue(self.provider.needs_persistence)

View File

@ -0,0 +1,26 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.tests import unit as tests
from keystone.token.providers import uuid
class TestUuidTokenProvider(tests.TestCase):
def setUp(self):
super(TestUuidTokenProvider, self).setUp()
self.provider = uuid.Provider()
def test_supports_bind_authentication_returns_true(self):
self.assertTrue(self.provider._supports_bind_authentication)
def test_need_persistence_return_true(self):
self.assertTrue(self.provider.needs_persistence)

View File

@ -497,6 +497,12 @@ class BaseProvider(provider.Provider):
project_id=None, domain_id=None, auth_context=None,
trust=None, metadata_ref=None, include_catalog=True,
parent_audit_id=None):
if auth_context and auth_context.get('bind'):
# NOTE(lbragstad): Check if the token provider being used actually
# supports bind authentication methods before proceeding.
if not self._supports_bind_authentication:
raise exception.NotImplemented()
# for V2, trust is stashed in metadata_ref
if (CONF.trust.enabled and not trust and metadata_ref and
'trust_id' in metadata_ref):

View File

@ -100,17 +100,19 @@ class Provider(common.BaseProvider):
which unpacks the values and builds the Fernet token.
"""
group_ids = token_data.get('user', {}).get(
group_ids = token_data['token'].get('user', {}).get(
federation.FEDERATION, {}).get('groups')
idp_id = token_data.get('user', {}).get(
idp_id = token_data['token'].get('user', {}).get(
federation.FEDERATION, {}).get('identity_provider', {}).get('id')
protocol_id = token_data.get('user', {}).get(
protocol_id = token_data['token'].get('user', {}).get(
federation.FEDERATION, {}).get('protocol', {}).get('id')
if not group_ids:
group_ids = list()
federated_dict = dict(group_ids=group_ids, idp_id=idp_id,
protocol_id=protocol_id)
return federated_dict
if group_ids:
federated_dict = dict(group_ids=group_ids, idp_id=idp_id,
protocol_id=protocol_id)
return federated_dict
return None
def _rebuild_federated_info(self, federated_dict, user_id):
"""Format federated information into the token reference.
@ -133,70 +135,6 @@ class Provider(common.BaseProvider):
token_dict['user']['name'] = user_id
return token_dict
def issue_v3_token(self, user_id, method_names, expires_at=None,
project_id=None, domain_id=None, auth_context=None,
trust=None, metadata_ref=None, include_catalog=True,
parent_audit_id=None):
"""Issue a V3 formatted token.
Here is where we need to detect what is given to us, and what kind of
token the user is expecting. Depending on the outcome of that, we can
pass all the information to be packed to the proper token format
handler.
:param user_id: ID of the user
:param method_names: method of authentication
:param expires_at: token expiration time
:param project_id: ID of the project being scoped to
:param domain_id: ID of the domain being scoped to
:param auth_context: authentication context
:param trust: ID of the trust
:param metadata_ref: metadata reference
:param include_catalog: return the catalog in the response if True,
otherwise don't return the catalog
:param parent_audit_id: ID of the parent audit entity
:returns: tuple containing the id of the token and the token data
"""
# TODO(lbragstad): Currently, Fernet tokens don't support bind in the
# token format. Raise a 501 if we're dealing with bind.
if auth_context.get('bind'):
raise exception.NotImplemented()
token_ref = None
# NOTE(lbragstad): This determines if we are dealing with a federated
# token or not. The groups for the user will be in the returned token
# reference.
federated_dict = None
if auth_context and self._is_mapped_token(auth_context):
token_ref = self._handle_mapped_tokens(
auth_context, project_id, domain_id)
federated_dict = self._build_federated_info(token_ref)
token_data = self.v3_token_data_helper.get_token_data(
user_id,
method_names,
auth_context.get('extras') if auth_context else None,
domain_id=domain_id,
project_id=project_id,
expires=expires_at,
trust=trust,
bind=auth_context.get('bind') if auth_context else None,
token=token_ref,
include_catalog=include_catalog,
audit_info=parent_audit_id)
token = self.token_formatter.create_token(
user_id,
token_data['token']['expires_at'],
token_data['token']['audit_ids'],
methods=method_names,
domain_id=domain_id,
project_id=project_id,
trust_id=token_data['token'].get('OS-TRUST:trust', {}).get('id'),
federated_info=federated_dict)
return token, token_data
def validate_v2_token(self, token_ref):
"""Validate a V2 formatted token.
@ -266,4 +204,21 @@ class Provider(common.BaseProvider):
:type token_data: dict
:raises keystone.exception.NotImplemented: when called
"""
raise exception.NotImplemented()
return self.token_formatter.create_token(
token_data['token']['user']['id'],
token_data['token']['expires_at'],
token_data['token']['audit_ids'],
methods=token_data['token'].get('methods'),
domain_id=token_data['token'].get('domain', {}).get('id'),
project_id=token_data['token'].get('project', {}).get('id'),
trust_id=token_data['token'].get('OS-TRUST:trust', {}).get('id'),
federated_info=self._build_federated_info(token_data)
)
@property
def _supports_bind_authentication(self):
"""Return if the token provider supports bind authentication methods.
:returns: False
"""
return False

View File

@ -48,6 +48,14 @@ class Provider(common.BaseProvider):
raise exception.UnexpectedError(_(
'Unable to sign token.'))
@property
def _supports_bind_authentication(self):
"""Return if the token provider supports bind authentication methods.
:returns: True
"""
return True
def needs_persistence(self):
"""Should the token be written to a backend."""
return True

View File

@ -46,6 +46,14 @@ class Provider(common.BaseProvider):
LOG.exception(ERROR_MESSAGE)
raise exception.UnexpectedError(ERROR_MESSAGE)
@property
def _supports_bind_authentication(self):
"""Return if the token provider supports bind authentication methods.
:returns: True
"""
return True
def needs_persistence(self):
"""Should the token be written to a backend."""
return True

View File

@ -28,6 +28,14 @@ class Provider(common.BaseProvider):
def _get_token_id(self, token_data):
return uuid.uuid4().hex
@property
def _supports_bind_authentication(self):
"""Return if the token provider supports bind authentication methods.
:returns: True
"""
return True
def needs_persistence(self):
"""Should the token be written to a backend."""
return True