Create a test token generator and use it

All the clients are currently storing samples of keystone tokens so that
they can use them in testing. This is bad as they are often out of date
or contain data that they shouldn't.

Create a V2 Token generator and make use of that for generating tokens
within our tests.

Change-Id: I72928692142c967d13391752ba57b3bdf7c1feab
blueprint: share-tokens
This commit is contained in:
Jamie Lennox 2014-03-07 13:20:40 +10:00
parent b6cdfff5f8
commit d69461b18f
9 changed files with 345 additions and 201 deletions

View File

@ -372,7 +372,7 @@ class AccessInfoV2(AccessInfo):
@property
def role_names(self):
return [r['name'] for r in self['user']['roles']]
return [r['name'] for r in self['user'].get('roles', [])]
@property
def domain_name(self):

View File

@ -0,0 +1,27 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The generators in this directory produce keystone compliant tokens for use in
testing.
They should be considered part of the public API because they may be relied
upon to generate test tokens for other clients. However they should never be
imported into the main client (keystoneclient or other). Because of this there
may be dependencies from this module on libraries that are only available in
testing.
"""
from keystoneclient.fixture.exception import FixtureValidationError # noqa
from keystoneclient.fixture.v2 import Token as V2Token # noqa
__all__ = ['V2Token', 'FixtureValidationError']

View File

@ -0,0 +1,20 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class FixtureValidationError(Exception):
"""The token you created is not legitimate.
The data contained in the token that was generated is not valid and would
not have been returned from a keystone server. You should not do testing
with this token.
"""

View File

@ -0,0 +1,160 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import uuid
from keystoneclient.fixture import exception
from keystoneclient.openstack.common import timeutils
class _Service(dict):
def add_endpoint(self, public, admin=None, internal=None,
tenant_id=None, **kwargs):
kwargs['tenantId'] = tenant_id or uuid.uuid4().hex
kwargs['publicURL'] = public
kwargs['adminURL'] = admin or public
kwargs['internalURL'] = internal or public
self['endpoints'].append(kwargs)
return kwargs
class Token(dict):
"""A V2 Keystone token that can be used for testing.
This object is designed to allow clients to generate a correct V2 token for
use in there test code. It should prevent clients from having to know the
correct token format and allow them to test the portions of token handling
that matter to them and not copy and paste sample.
"""
def __init__(self, token_id=None,
expires=None, tenant_id=None, tenant_name=None, user_id=None,
user_name=None, **kwargs):
super(Token, self).__init__(access=kwargs)
self.token_id = token_id or uuid.uuid4().hex
self.user_id = user_id or uuid.uuid4().hex
self.user_name = user_name or uuid.uuid4().hex
if not expires:
expires = timeutils.utcnow() + datetime.timedelta(days=1)
try:
self.expires = expires
except (TypeError, AttributeError):
# expires should be able to be passed as a string so ignore
self.expires_str = expires
if tenant_id or tenant_name:
self.set_scope(tenant_id, tenant_name)
@property
def root(self):
return self['access']
@property
def _token(self):
return self.root.setdefault('token', {})
@property
def token_id(self):
return self._token['id']
@token_id.setter
def token_id(self, value):
self._token['id'] = value
@property
def expires_str(self):
return self._token['expires']
@expires_str.setter
def expires_str(self, value):
self._token['expires'] = value
@property
def expires(self):
return timeutils.parse_isotime(self.expires_str)
@expires.setter
def expires(self, value):
self.expires_str = timeutils.isotime(value)
@property
def _user(self):
return self.root.setdefault('user', {})
@property
def user_id(self):
return self._user['id']
@user_id.setter
def user_id(self, value):
self._user['id'] = value
@property
def user_name(self):
return self._user['name']
@user_name.setter
def user_name(self, value):
self._user['name'] = value
@property
def tenant_id(self):
return self._token.get('tenant', {}).get('id')
@tenant_id.setter
def tenant_id(self, value):
self._token.setdefault('tenant', {})['id'] = value
@property
def tenant_name(self):
return self._token.get('tenant', {}).get('name')
@tenant_name.setter
def tenant_name(self, value):
self._token.setdefault('tenant', {})['name'] = value
def validate(self):
scoped = 'tenant' in self.token
catalog = self.root.get('serviceCatalog')
if catalog and not scoped:
msg = 'You cannot have a service catalog on an unscoped token'
raise exception.FixtureValidationError(msg)
if scoped and not self.user.get('roles'):
msg = 'You must have roles on a token to scope it'
raise exception.FixtureValidationError(msg)
def add_role(self, name=None, id=None, **kwargs):
roles = self._user.setdefault('roles', [])
kwargs['id'] = id or uuid.uuid4().hex
kwargs['name'] = name or uuid.uuid4().hex
roles.append(kwargs)
return kwargs
def add_service(self, type, name=None, **kwargs):
kwargs.setdefault('endpoints', [])
kwargs['name'] = name or uuid.uuid4().hex
service = _Service(type=type, **kwargs)
self.root.setdefault('serviceCatalog', []).append(service)
return service
def set_scope(self, id=None, name=None, **kwargs):
self._token['tenant'] = kwargs
self.tenant_id = id or uuid.uuid4().hex
self.tenant_name = name or uuid.uuid4().hex

View File

@ -27,7 +27,7 @@ except ImportError:
keyring = None
PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN
PROJECT_SCOPED_TOKEN = client_fixtures.project_scoped_token()
# These mirror values from PROJECT_SCOPED_TOKEN
USERNAME = 'exampleuser'

View File

@ -12,171 +12,111 @@
from __future__ import unicode_literals
from keystoneclient import fixture
UNSCOPED_TOKEN = {
'access': {'serviceCatalog': {},
'token': {'expires': '2012-10-03T16:58:01Z',
'id': '3e2813b7ba0b4006840c3825860b86ed'},
'user': {'id': 'c4da488862bd435c9e6c0275a0d0e49a',
'name': 'exampleuser',
'roles': [],
'roles_links': [],
'username': 'exampleuser'}
}
}
_TENANT_ID = '225da22d3ce34b15877ea70b2a575f58'
def unscoped_token():
return fixture.V2Token(token_id='3e2813b7ba0b4006840c3825860b86ed',
expires='2012-10-03T16:58:01Z',
user_id='c4da488862bd435c9e6c0275a0d0e49a',
user_name='exampleuser')
PROJECT_SCOPED_TOKEN = {
'access': {
'serviceCatalog': [{
'endpoints': [{
'adminURL': 'http://admin:8776/v1/%s' % _TENANT_ID,
'internalURL': 'http://internal:8776/v1/%s' % _TENANT_ID,
'publicURL': 'http://public.com:8776/v1/%s' % _TENANT_ID,
'region': 'RegionOne'
}],
'endpoints_links': [],
'name': 'Volume Service',
'type': 'volume'},
{'endpoints': [{
'adminURL': 'http://admin:9292/v1',
'internalURL': 'http://internal:9292/v1',
'publicURL': 'http://public.com:9292/v1',
'region': 'RegionOne'
}],
'endpoints_links': [],
'name': 'Image Service',
'type': 'image'},
{'endpoints': [{
'adminURL': 'http://admin:8774/v2/%s' % _TENANT_ID,
'internalURL': 'http://internal:8774/v2/%s' % _TENANT_ID,
'publicURL': 'http://public.com:8774/v2/%s' % _TENANT_ID,
'region': 'RegionOne'
}],
'endpoints_links': [],
'name': 'Compute Service',
'type': 'compute'},
{'endpoints': [{
'adminURL': 'http://admin:8773/services/Admin',
'internalURL': 'http://internal:8773/services/Cloud',
'publicURL': 'http://public.com:8773/services/Cloud',
'region': 'RegionOne'
}],
'endpoints_links': [],
'name': 'EC2 Service',
'type': 'ec2'},
{'endpoints': [{
'adminURL': 'http://admin:35357/v2.0',
'internalURL': 'http://internal:5000/v2.0',
'publicURL': 'http://public.com:5000/v2.0',
'region': 'RegionOne'
}],
'endpoints_links': [],
'name': 'Identity Service',
'type': 'identity'}],
'token': {'expires': '2012-10-03T16:53:36Z',
'id': '04c7d5ffaeef485f9dc69c06db285bdb',
'tenant': {'description': '',
'enabled': True,
'id': '225da22d3ce34b15877ea70b2a575f58',
'name': 'exampleproject'}},
'user': {'id': 'c4da488862bd435c9e6c0275a0d0e49a',
'name': 'exampleuser',
'roles': [{'id': 'edc12489faa74ee0aca0b8a0b4d74a74',
'name': 'Member'}],
'roles_links': [],
'username': 'exampleuser'}
}
}
AUTH_RESPONSE_BODY = {
'access': {
'token': {
'id': 'ab48a9efdfedb23ty3494',
'expires': '2010-11-01T03:32:15-05:00',
'tenant': {
'id': '345',
'name': 'My Project'
}
},
'user': {
'id': '123',
'name': 'jqsmith',
'roles': [{
'id': '234',
'name': 'compute:admin'
}, {
'id': '235',
'name': 'object-store:admin',
'tenantId': '1'
}],
'roles_links': []
},
'serviceCatalog': [{
'name': 'Cloud Servers',
'type': 'compute',
'endpoints': [{
'tenantId': '1',
'publicURL': 'https://compute.north.host/v1/1234',
'internalURL': 'https://compute.north.host/v1/1234',
'region': 'North',
'versionId': '1.0',
'versionInfo': 'https://compute.north.host/v1.0/',
'versionList': 'https://compute.north.host/'
}, {
'tenantId': '2',
'publicURL': 'https://compute.north.host/v1.1/3456',
'internalURL': 'https://compute.north.host/v1.1/3456',
'region': 'North',
'versionId': '1.1',
'versionInfo': 'https://compute.north.host/v1.1/',
'versionList': 'https://compute.north.host/'
}],
'endpoints_links': []
}, {
'name': 'Cloud Files',
'type': 'object-store',
'endpoints': [{
'tenantId': '11',
'publicURL': 'https://swift.north.host/v1/blah',
'internalURL': 'https://swift.north.host/v1/blah',
'region': 'South',
'versionId': '1.0',
'versionInfo': 'uri',
'versionList': 'uri'
}, {
'tenantId': '2',
'publicURL': 'https://swift.north.host/v1.1/blah',
'internalURL': 'https://compute.north.host/v1.1/blah',
'region': 'South',
'versionId': '1.1',
'versionInfo': 'https://swift.north.host/v1.1/',
'versionList': 'https://swift.north.host/'
}],
'endpoints_links': [{
'rel': 'next',
'href': 'https://identity.north.host/v2.0/'
'endpoints?marker=2'
}]
}, {
'name': 'Image Servers',
'type': 'image',
'endpoints': [{
'publicURL': 'https://image.north.host/v1/',
'internalURL': 'https://image-internal.north.host/v1/',
'region': 'North'
}, {
'publicURL': 'https://image.south.host/v1/',
'internalURL': 'https://image-internal.south.host/v1/',
'region': 'South'
}],
'endpoints_links': []
}],
'serviceCatalog_links': [{
'rel': 'next',
'href': ('https://identity.host/v2.0/endpoints?'
'session=2hfh8Ar&marker=2')
}]
}
}
def project_scoped_token():
_TENANT_ID = '225da22d3ce34b15877ea70b2a575f58'
f = fixture.V2Token(token_id='04c7d5ffaeef485f9dc69c06db285bdb',
expires='2012-10-03T16:53:36Z',
tenant_id='225da22d3ce34b15877ea70b2a575f58',
tenant_name='exampleproject',
user_id='c4da488862bd435c9e6c0275a0d0e49a',
user_name='exampleuser')
f.add_role(id='edc12489faa74ee0aca0b8a0b4d74a74',
name='Member')
s = f.add_service('volume', 'Volume Service')
s.add_endpoint(public='http://public.com:8776/v1/%s' % _TENANT_ID,
admin='http://admin:8776/v1/%s' % _TENANT_ID,
internal='http://internal:8776/v1/%s' % _TENANT_ID,
region='RegionOne')
s = f.add_service('image', 'Image Service')
s.add_endpoint(public='http://public.com:9292/v1',
admin='http://admin:9292/v1',
internal='http://internal:9292/v1',
region='RegionOne')
s = f.add_service('compute', 'Compute Service')
s.add_endpoint(public='http://public.com:8774/v2/%s' % _TENANT_ID,
admin='http://admin:8774/v2/%s' % _TENANT_ID,
internal='http://internal:8774/v2/%s' % _TENANT_ID,
region='RegionOne')
s = f.add_service('ec2', 'EC2 Service')
s.add_endpoint(public='http://public.com:8773/services/Cloud',
admin='http://admin:8773/services/Admin',
internal='http://internal:8773/services/Cloud',
region='RegionOne')
s = f.add_service('identity', 'Identity Service')
s.add_endpoint(public='http://public.com:5000/v2.0',
admin='http://admin:35357/v2.0',
internal='http://internal:5000/v2.0',
region='RegionOne')
return f
def auth_response_body():
f = fixture.V2Token(token_id='ab48a9efdfedb23ty3494',
expires='2010-11-01T03:32:15-05:00',
tenant_id='345',
tenant_name='My Project',
user_id='123',
user_name='jqsmith')
f.add_role(id='234', name='compute:admin')
f.add_role(id='235', name='object-store:admin', tenantId='1')
s = f.add_service('compute', 'Cloud Servers')
s.add_endpoint(public='https://compute.north.host/v1/1234',
internal='https://compute.north.host/v1/1234',
region='North',
tenant_id='1',
versionId='1.0',
versionInfo='https://compute.north.host/v1.0/',
versionList='https://compute.north.host/')
s.add_endpoint(public='https://compute.north.host/v1.1/3456',
internal='https://compute.north.host/v1.1/3456',
region='North',
tenant_id='2',
versionId='1.1',
versionInfo='https://compute.north.host/v1.1/',
versionList='https://compute.north.host/')
s = f.add_service('object-store', 'Cloud Files')
s.add_endpoint(public='https://swift.north.host/v1/blah',
internal='https://swift.north.host/v1/blah',
region='South',
tenant_id='11',
versionId='1.0',
versionInfo='uri',
versionList='uri')
s.add_endpoint(public='https://swift.north.host/v1.1/blah',
internal='https://compute.north.host/v1.1/blah',
region='South',
tenant_id='2',
versionId='1.1',
versionInfo='https://swift.north.host/v1.1/',
versionList='https://swift.north.host/')
s = f.add_service('image', 'Image Servers')
s.add_endpoint(public='https://image.north.host/v1/',
internal='https://image-internal.north.host/v1/',
region='North')
s.add_endpoint(public='https://image.south.host/v1/',
internal='https://image-internal.south.host/v1/',
region='South')
return f

View File

@ -21,21 +21,16 @@ from keystoneclient.tests.v2_0 import client_fixtures
from keystoneclient.tests.v2_0 import utils
UNSCOPED_TOKEN = client_fixtures.UNSCOPED_TOKEN
PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN
class AccessInfoTest(utils.TestCase, testresources.ResourcedTestCase):
resources = [('examples', token_data.EXAMPLES_RESOURCE)]
def test_building_unscoped_accessinfo(self):
auth_ref = access.AccessInfo.factory(body=UNSCOPED_TOKEN)
token = client_fixtures.unscoped_token()
auth_ref = access.AccessInfo.factory(body=token)
self.assertTrue(auth_ref)
self.assertIn('token', auth_ref)
self.assertIn('serviceCatalog', auth_ref)
self.assertFalse(auth_ref['serviceCatalog'])
self.assertEqual(auth_ref.auth_token,
'3e2813b7ba0b4006840c3825860b86ed')
@ -60,19 +55,20 @@ class AccessInfoTest(utils.TestCase, testresources.ResourcedTestCase):
self.assertEqual(auth_ref.user_domain_id, 'default')
self.assertEqual(auth_ref.user_domain_name, 'Default')
self.assertEqual(auth_ref.expires, timeutils.parse_isotime(
UNSCOPED_TOKEN['access']['token']['expires']))
self.assertEqual(auth_ref.expires, token.expires)
def test_will_expire_soon(self):
token = client_fixtures.unscoped_token()
expires = timeutils.utcnow() + datetime.timedelta(minutes=5)
UNSCOPED_TOKEN['access']['token']['expires'] = expires.isoformat()
auth_ref = access.AccessInfo.factory(body=UNSCOPED_TOKEN)
token.expires = expires
auth_ref = access.AccessInfo.factory(body=token)
self.assertFalse(auth_ref.will_expire_soon(stale_duration=120))
self.assertTrue(auth_ref.will_expire_soon(stale_duration=300))
self.assertFalse(auth_ref.will_expire_soon())
def test_building_scoped_accessinfo(self):
auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN)
auth_ref = access.AccessInfo.factory(
body=client_fixtures.project_scoped_token())
self.assertTrue(auth_ref)
self.assertIn('token', auth_ref)

View File

@ -10,12 +10,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import json
import httpretty
from keystoneclient import exceptions
from keystoneclient import fixture
from keystoneclient.tests.v2_0 import client_fixtures
from keystoneclient.tests.v2_0 import utils
from keystoneclient.v2_0 import client
@ -25,7 +25,7 @@ class KeystoneClientTest(utils.TestCase):
@httpretty.activate
def test_unscoped_init(self):
self.stub_auth(json=client_fixtures.UNSCOPED_TOKEN)
self.stub_auth(json=client_fixtures.unscoped_token())
c = client.Client(username='exampleuser',
password='password',
@ -39,7 +39,7 @@ class KeystoneClientTest(utils.TestCase):
@httpretty.activate
def test_scoped_init(self):
self.stub_auth(json=client_fixtures.PROJECT_SCOPED_TOKEN)
self.stub_auth(json=client_fixtures.project_scoped_token())
c = client.Client(username='exampleuser',
password='password',
@ -54,7 +54,7 @@ class KeystoneClientTest(utils.TestCase):
@httpretty.activate
def test_auth_ref_load(self):
self.stub_auth(json=client_fixtures.PROJECT_SCOPED_TOKEN)
self.stub_auth(json=client_fixtures.project_scoped_token())
cl = client.Client(username='exampleuser',
password='password',
@ -75,7 +75,7 @@ class KeystoneClientTest(utils.TestCase):
@httpretty.activate
def test_auth_ref_load_with_overridden_arguments(self):
self.stub_auth(json=client_fixtures.PROJECT_SCOPED_TOKEN)
self.stub_auth(json=client_fixtures.project_scoped_token())
cl = client.Client(username='exampleuser',
password='password',
@ -106,34 +106,38 @@ class KeystoneClientTest(utils.TestCase):
@httpretty.activate
def test_management_url_is_updated(self):
second = copy.deepcopy(client_fixtures.PROJECT_SCOPED_TOKEN)
first_url = 'http://admin:35357/v2.0'
second_url = "http://secondurl:%d/v2.0'"
first = fixture.V2Token()
first.set_scope()
admin_url = 'http://admin:35357/v2.0'
second_url = 'http://secondurl:35357/v2.0'
for entry in second['access']['serviceCatalog']:
if entry['type'] == 'identity':
entry['endpoints'] = [{'adminURL': second_url % 35357,
'internalURL': second_url % 5000,
'publicURL': second_url % 6000,
'region': 'RegionOne'}]
s = first.add_service('identity')
s.add_endpoint(public='http://public.com:5000/v2.0',
admin=admin_url)
self.stub_auth(json=client_fixtures.PROJECT_SCOPED_TOKEN)
second = fixture.V2Token()
second.set_scope()
s = second.add_service('identity')
s.add_endpoint(public='http://secondurl:5000/v2.0',
admin=second_url)
self.stub_auth(json=first)
cl = client.Client(username='exampleuser',
password='password',
tenant_name='exampleproject',
auth_url=self.TEST_URL)
self.assertEqual(cl.management_url, first_url)
cl.authenticate()
self.assertEqual(cl.management_url, admin_url)
self.stub_auth(json=second)
cl.authenticate()
self.assertEqual(cl.management_url, second_url % 35357)
self.assertEqual(cl.management_url, second_url)
@httpretty.activate
def test_client_with_region_name_passes_to_service_catalog(self):
# NOTE(jamielennox): this is deprecated behaviour that should be
# removed ASAP, however must remain compatible.
self.stub_auth(json=client_fixtures.AUTH_RESPONSE_BODY)
self.stub_auth(json=client_fixtures.auth_response_body())
cl = client.Client(username='exampleuser',
password='password',

View File

@ -10,8 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
from keystoneclient import access
from keystoneclient import exceptions
from keystoneclient.tests.v2_0 import client_fixtures
@ -21,8 +19,7 @@ from keystoneclient.tests.v2_0 import utils
class ServiceCatalogTest(utils.TestCase):
def setUp(self):
super(ServiceCatalogTest, self).setUp()
self.AUTH_RESPONSE_BODY = copy.deepcopy(
client_fixtures.AUTH_RESPONSE_BODY)
self.AUTH_RESPONSE_BODY = client_fixtures.auth_response_body()
def test_building_a_service_catalog(self):
auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY)