Handle federated tokens

Federated tokens don't include domains in the user object.
Keystoneclient should be able to estimate whether the token is a
federated one and, if so, don't expect user domain information.
In case of the federated token keystoneclient returns None in response
to user_domain_name and user_domain_id calls.

Co-Authored-By: Steve Martinelli <stevemar@ca.ibm.com>

Closes-Bug: #1346820
Change-Id: I3453275fa1b0a41b1c015b0c3a92895a77d69a41
This commit is contained in:
Marek Denis
2014-09-12 17:24:59 +02:00
committed by Steve Martinelli
parent 3305c7be4b
commit 7006f9b008
5 changed files with 89 additions and 2 deletions

View File

@@ -388,6 +388,14 @@ class AccessInfo(dict):
""" """
raise NotImplementedError() raise NotImplementedError()
@property
def is_federated(self):
"""Returns true if federation was used to get the token.
:returns: boolean
"""
raise NotImplementedError()
class AccessInfoV2(AccessInfo): class AccessInfoV2(AccessInfo):
"""An object for encapsulating a raw v2 auth token from identity """An object for encapsulating a raw v2 auth token from identity
@@ -576,6 +584,10 @@ class AccessInfoV2(AccessInfo):
def oauth_consumer_id(self): def oauth_consumer_id(self):
return None return None
@property
def is_federated(self):
return False
class AccessInfoV3(AccessInfo): class AccessInfoV3(AccessInfo):
"""An object for encapsulating a raw v3 auth token from identity """An object for encapsulating a raw v3 auth token from identity
@@ -604,6 +616,10 @@ class AccessInfoV3(AccessInfo):
def has_service_catalog(self): def has_service_catalog(self):
return 'catalog' in self return 'catalog' in self
@property
def is_federated(self):
return 'OS-FEDERATION' in self['user']
@property @property
def expires(self): def expires(self):
return timeutils.parse_isotime(self['expires_at']) return timeutils.parse_isotime(self['expires_at'])
@@ -618,11 +634,21 @@ class AccessInfoV3(AccessInfo):
@property @property
def user_domain_id(self): def user_domain_id(self):
return self['user']['domain']['id'] try:
return self['user']['domain']['id']
except KeyError:
if self.is_federated:
return None
raise
@property @property
def user_domain_name(self): def user_domain_name(self):
return self['user']['domain']['name'] try:
return self['user']['domain']['name']
except KeyError:
if self.is_federated:
return None
raise
@property @property
def role_ids(self): def role_ids(self):

View File

@@ -25,6 +25,7 @@ from keystoneclient.fixture.discovery import * # noqa
from keystoneclient.fixture.exception import FixtureValidationError # noqa from keystoneclient.fixture.exception import FixtureValidationError # noqa
from keystoneclient.fixture.v2 import Token as V2Token # noqa from keystoneclient.fixture.v2 import Token as V2Token # noqa
from keystoneclient.fixture.v3 import Token as V3Token # noqa from keystoneclient.fixture.v3 import Token as V3Token # noqa
from keystoneclient.fixture.v3 import V3FederationToken # noqa
__all__ = ['DiscoveryList', __all__ = ['DiscoveryList',
'FixtureValidationError', 'FixtureValidationError',
@@ -32,4 +33,5 @@ __all__ = ['DiscoveryList',
'V3Discovery', 'V3Discovery',
'V2Token', 'V2Token',
'V3Token', 'V3Token',
'V3FederationToken',
] ]

View File

@@ -352,3 +352,31 @@ class Token(dict):
def set_oauth(self, access_token_id=None, consumer_id=None): def set_oauth(self, access_token_id=None, consumer_id=None):
self.oauth_access_token_id = access_token_id or uuid.uuid4().hex self.oauth_access_token_id = access_token_id or uuid.uuid4().hex
self.oauth_consumer_id = consumer_id or uuid.uuid4().hex self.oauth_consumer_id = consumer_id or uuid.uuid4().hex
class V3FederationToken(Token):
"""A V3 Keystone Federation token that can be used for testing.
Similar to V3Token, this object is designed to allow clients to generate
a correct V3 federation token for use in test code.
"""
def __init__(self, methods=None, identity_provider=None, protocol=None,
groups=None):
methods = methods or ['saml2']
super(V3FederationToken, self).__init__(methods=methods)
# NOTE(stevemar): Federated tokens do not have a domain for the user
del self._user['domain']
self.add_federation_info_to_user(identity_provider, protocol, groups)
def add_federation_info_to_user(self, identity_provider=None,
protocol=None, groups=None):
data = {
"OS-FEDERATION": {
"identity_provider": identity_provider or uuid.uuid4().hex,
"protocol": protocol or uuid.uuid4().hex,
"groups": groups or [{"id": uuid.uuid4().hex}]
}
}
self._user.update(data)
return data

View File

@@ -181,3 +181,10 @@ class AccessInfoTest(utils.TestCase):
auth_ref = access.AccessInfo.factory(body=token, auth_ref = access.AccessInfo.factory(body=token,
auth_token=new_auth_token) auth_token=new_auth_token)
self.assertEqual(new_auth_token, auth_ref.auth_token) self.assertEqual(new_auth_token, auth_ref.auth_token)
def test_federated_property_standard_token(self):
"""Check if is_federated property returns expected value."""
token = fixture.V3Token()
token.set_project_scope()
auth_ref = access.AccessInfo.factory(body=token)
self.assertFalse(auth_ref.is_federated)

View File

@@ -13,7 +13,9 @@
import copy import copy
import uuid import uuid
from keystoneclient import access
from keystoneclient import exceptions from keystoneclient import exceptions
from keystoneclient import fixture
from keystoneclient.tests.v3 import utils from keystoneclient.tests.v3 import utils
from keystoneclient.v3.contrib.federation import base from keystoneclient.v3.contrib.federation import base
from keystoneclient.v3.contrib.federation import identity_providers from keystoneclient.v3.contrib.federation import identity_providers
@@ -385,3 +387,25 @@ class FederationDomainTests(utils.TestCase):
self.assertEqual(len(domains_ref), len(returned_list)) self.assertEqual(len(domains_ref), len(returned_list))
for domain in returned_list: for domain in returned_list:
self.assertIsInstance(domain, self.model) self.assertIsInstance(domain, self.model)
class FederatedTokenTests(utils.TestCase):
def setUp(self):
super(FederatedTokenTests, self).setUp()
token = fixture.V3FederationToken()
token.set_project_scope()
token.add_role()
self.federated_token = access.AccessInfo.factory(body=token)
def test_federated_property_federated_token(self):
"""Check if is_federated property returns expected value."""
self.assertTrue(self.federated_token.is_federated)
def test_get_user_domain_name(self):
"""Ensure a federated user's domain name does not exist."""
self.assertIsNone(self.federated_token.user_domain_name)
def test_get_user_domain_id(self):
"""Ensure a federated user's domain ID does not exist."""
self.assertIsNone(self.federated_token.user_domain_id)