keystone/keystone/tests/test_v3.py

1281 lines
47 KiB
Python

# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import uuid
from oslo.serialization import jsonutils
from oslo_utils import timeutils
import six
from testtools import matchers
from keystone import auth
from keystone.common import authorization
from keystone.common import cache
from keystone import config
from keystone import exception
from keystone import middleware
from keystone.policy.backends import rules
from keystone import tests
from keystone.tests import rest
CONF = config.CONF
DEFAULT_DOMAIN_ID = 'default'
TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
class AuthTestMixin(object):
    """Helpers that assemble v3 authentication request bodies."""

    def build_auth_scope(self, project_id=None, project_name=None,
                         project_domain_id=None, project_domain_name=None,
                         domain_id=None, domain_name=None, trust_id=None):
        """Return the ``scope`` dict for the requested targets.

        Wherever both an id and a name are supplied, the id wins. An empty
        dict is returned when no target is given.
        """
        scope = {}

        if project_id:
            scope['project'] = {'id': project_id}
        elif project_name:
            project = {'name': project_name}
            # The domain qualifier is only attached to name-based lookups.
            if project_domain_id:
                project['domain'] = {'id': project_domain_id}
            elif project_domain_name:
                project['domain'] = {'name': project_domain_name}
            scope['project'] = project

        if domain_id:
            scope['domain'] = {'id': domain_id}
        elif domain_name:
            scope['domain'] = {'name': domain_name}

        if trust_id:
            scope['OS-TRUST:trust'] = {'id': trust_id}

        return scope

    def build_password_auth(self, user_id=None, username=None,
                            user_domain_id=None, user_domain_name=None,
                            password=None):
        """Return the ``password`` method payload for one user."""
        user = {}
        if user_id:
            user['id'] = user_id
        else:
            user['name'] = username
            # The domain qualifier is only attached to name-based lookups.
            if user_domain_id:
                user['domain'] = {'id': user_domain_id}
            elif user_domain_name:
                user['domain'] = {'name': user_domain_name}
        user['password'] = password
        return {'user': user}

    def build_token_auth(self, token):
        """Return the ``token`` method payload wrapping a token id."""
        return {'id': token}

    def build_authentication_request(self, token=None, user_id=None,
                                     username=None, user_domain_id=None,
                                     user_domain_name=None, password=None,
                                     kerberos=False, **kwargs):
        """Build a complete ``{'auth': ...}`` request body.

        Each supplied credential adds its method name (in the order
        kerberos, token, password) and its payload to the identity section.
        Any remaining keyword arguments are forwarded to
        ``build_auth_scope`` to produce the scope section.
        """
        methods = []
        identity = {'methods': methods}

        if kerberos:
            methods.append('kerberos')
            identity['kerberos'] = {}
        if token:
            methods.append('token')
            identity['token'] = self.build_token_auth(token)
        if user_id or username:
            methods.append('password')
            identity['password'] = self.build_password_auth(
                user_id, username, user_domain_id, user_domain_name, password)

        auth_data = {'identity': identity}
        if kwargs:
            auth_data['scope'] = self.build_auth_scope(**kwargs)
        return {'auth': auth_data}
class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase,
AuthTestMixin):
def config_files(self):
    """Return the parent's config files plus the SQL backend config."""
    files = super(RestfulTestCase, self).config_files()
    sql_conf = tests.dirs.tests_conf('backend_sql.conf')
    files.append(sql_conf)
    return files
def get_extensions(self):
    """Return the extension names to load.

    'revoke' is always included; EXTENSION_NAME is added when the test
    class defines it.
    """
    names = {'revoke'}
    if hasattr(self, 'EXTENSION_NAME'):
        names.add(self.EXTENSION_NAME)
    return names
def generate_paste_config(self):
    """Generate a paste config for EXTENSION_TO_ADD, if one is declared.

    Returns the value of ``tests.generate_paste_config`` or None when the
    test class does not define EXTENSION_TO_ADD (the common case).

    Unlike a ``return`` inside ``finally``, real failures raised while
    generating the file are allowed to propagate instead of being silently
    discarded.
    """
    if not hasattr(self, 'EXTENSION_TO_ADD'):
        # Most tests do not add an extension; nothing to generate.
        return None
    return tests.generate_paste_config(self.EXTENSION_TO_ADD)
def remove_generated_paste_config(self):
    """Remove the paste config generated for EXTENSION_TO_ADD, if any.

    Does nothing when the test class does not define EXTENSION_TO_ADD.
    Only the missing attribute is tolerated; an AttributeError raised
    inside the removal helper itself is no longer hidden.
    """
    if not hasattr(self, 'EXTENSION_TO_ADD'):
        return
    tests.remove_generated_paste_config(self.EXTENSION_TO_ADD)
def setUp(self, app_conf='keystone'):
    """Set up a v3 RESTful test case.

    When an extension-specific paste file is generated it replaces
    ``app_conf``. Cleanups are registered to remove that file and to reset
    the policy rules.
    """
    paste_file = self.generate_paste_config()
    self.addCleanup(self.remove_generated_paste_config)
    if paste_file:
        app_conf = 'config:%s' % (paste_file)

    super(RestfulTestCase, self).setUp(app_conf=app_conf)

    self.empty_context = {'environment': {}}

    # Policy rules must not leak from one test into the next.
    self.addCleanup(rules.reset)
def load_backends(self):
    """Configure the cache region, then load backends via the parent."""
    # The cache region has to be configured before the backends load.
    cache.configure_cache_region(cache.REGION)
    super(RestfulTestCase, self).load_backends()
def load_fixtures(self, fixtures):
    """Load the v3 sample data; the ``fixtures`` argument is not used."""
    self.load_sample_data()
def _populate_default_domain(self):
    """Create the default domain when running against the in-memory DB."""
    if CONF.database.connection != tests.IN_MEM_DB_CONN_STRING:
        return

    # NOTE(morganfainberg): The default domain is normally created by a
    # migration, but the in-memory db builds its schema from the model
    # definitions and never runs migrations, so create it here if missing.
    try:
        self.assignment_api.get_domain(DEFAULT_DOMAIN_ID)
    except exception.DomainNotFound:
        default_domain = {
            'description': (u'Owns users and tenants (i.e. '
                            u'projects) available on Identity '
                            u'API v2.'),
            'enabled': True,
            'id': DEFAULT_DOMAIN_ID,
            'name': u'Default',
        }
        self.assignment_api.create_domain(DEFAULT_DOMAIN_ID, default_domain)
def load_sample_data(self):
    """Create the sample entities shared by the v3 tests.

    Populates a domain, a project and user in it, a project and user in the
    default domain, an 'admin' role granted on those projects, and a
    region/service/endpoint triple. Each entity and its id are stored on
    ``self``; user dicts keep their plain-text password.
    """
    self._populate_default_domain()

    # --- domain ---
    self.domain_id = uuid.uuid4().hex
    domain = self.new_domain_ref()
    domain['id'] = self.domain_id
    self.domain = domain
    self.assignment_api.create_domain(self.domain_id, domain)

    # --- project in that domain ---
    self.project_id = uuid.uuid4().hex
    project = self.new_project_ref(domain_id=self.domain_id)
    project['id'] = self.project_id
    self.project = project
    self.assignment_api.create_project(self.project_id, project)

    # --- user in that domain ---
    user_ref = self.new_user_ref(domain_id=self.domain_id)
    plaintext = user_ref['password']
    self.user = self.identity_api.create_user(user_ref)
    # Put the plain-text password back on the returned user.
    self.user['password'] = plaintext
    self.user_id = self.user['id']

    # --- project in the default domain ---
    self.default_domain_project_id = uuid.uuid4().hex
    default_project = self.new_project_ref(domain_id=DEFAULT_DOMAIN_ID)
    default_project['id'] = self.default_domain_project_id
    self.default_domain_project = default_project
    self.assignment_api.create_project(self.default_domain_project_id,
                                       default_project)

    # --- user in the default domain ---
    default_user_ref = self.new_user_ref(domain_id=DEFAULT_DOMAIN_ID)
    plaintext = default_user_ref['password']
    self.default_domain_user = self.identity_api.create_user(
        default_user_ref)
    self.default_domain_user['password'] = plaintext
    self.default_domain_user_id = self.default_domain_user['id']

    # --- policy.json's default role for admin_required, plus grants ---
    self.role_id = uuid.uuid4().hex
    role = self.new_role_ref()
    role['id'] = self.role_id
    role['name'] = 'admin'
    self.role = role
    self.role_api.create_role(self.role_id, role)

    grants = (
        (self.user_id, self.project_id),
        (self.default_domain_user_id, self.default_domain_project_id),
        (self.default_domain_user_id, self.project_id),
    )
    for grantee_id, target_project_id in grants:
        self.assignment_api.add_role_to_user_and_project(
            grantee_id, target_project_id, self.role_id)

    # --- region ---
    self.region_id = uuid.uuid4().hex
    region = self.new_region_ref()
    region['id'] = self.region_id
    self.region = region
    self.catalog_api.create_region(region.copy())

    # --- service ---
    self.service_id = uuid.uuid4().hex
    service = self.new_service_ref()
    service['id'] = self.service_id
    self.service = service
    self.catalog_api.create_service(self.service_id, service.copy())

    # --- endpoint (new_endpoint_ref reads self.region_id set above) ---
    self.endpoint_id = uuid.uuid4().hex
    endpoint = self.new_endpoint_ref(service_id=self.service_id)
    endpoint['id'] = self.endpoint_id
    endpoint['region_id'] = self.region['id']
    self.endpoint = endpoint
    self.catalog_api.create_endpoint(self.endpoint_id, endpoint.copy())
    # The server adds 'enabled' and defaults to True.
    endpoint['enabled'] = True
def new_ref(self):
    """Return a ref carrying the attributes common to all API entities.

    'id', 'name' and 'description' are random hex strings and 'enabled'
    is True.
    """
    ref = dict((field, uuid.uuid4().hex)
               for field in ('id', 'name', 'description'))
    ref['enabled'] = True
    return ref
def new_region_ref(self):
    """Return a region ref: id and description, with no parent region."""
    ref = self.new_ref()
    # Regions have neither a name nor an enabled flag.
    for unsupported in ('name', 'enabled'):
        del ref[unsupported]
    ref['parent_region_id'] = None
    return ref
def new_service_ref(self):
    """Return a service ref with a random type and name."""
    ref = self.new_ref()
    service_type = uuid.uuid4().hex
    service_name = uuid.uuid4().hex
    ref.update(type=service_type, name=service_name)
    return ref
def new_endpoint_ref(self, service_id, interface='public', **kwargs):
    """Return an endpoint ref for ``service_id`` in ``self.region_id``.

    The ref has a random https URL and no 'enabled' key. Extra keyword
    arguments are applied last and override the generated attributes.
    """
    ref = self.new_ref()
    ref.pop('enabled')  # enabled is optional
    ref.update(
        interface=interface,
        service_id=service_id,
        url='https://' + uuid.uuid4().hex + '.com',
        region_id=self.region_id,
    )
    ref.update(kwargs)
    return ref
def new_domain_ref(self):
    """Return a domain ref; domains use only the common attributes."""
    return self.new_ref()
def new_project_ref(self, domain_id, parent_id=None):
    """Return a project ref owned by ``domain_id``.

    'parent_id' is always present in the ref and is None by default.
    """
    ref = self.new_ref()
    ref.update(domain_id=domain_id, parent_id=parent_id)
    return ref
def new_user_ref(self, domain_id, project_id=None):
    """Return a user ref in ``domain_id`` with random email and password.

    'default_project_id' is only set when ``project_id`` is truthy.
    """
    ref = self.new_ref()
    ref['domain_id'] = domain_id
    for generated in ('email', 'password'):
        ref[generated] = uuid.uuid4().hex
    if project_id:
        ref['default_project_id'] = project_id
    return ref
def new_group_ref(self, domain_id):
    """Return a group ref owned by ``domain_id``."""
    ref = self.new_ref()
    ref.update(domain_id=domain_id)
    return ref
def new_credential_ref(self, user_id, project_id=None, cred_type=None):
    """Return a credential ref for ``user_id``.

    ``cred_type='ec2'`` yields an 'ec2' credential with a fixed dict blob;
    anything else yields a 'cert' credential with a random hex blob.
    'project_id' is only set when ``project_id`` is truthy.
    """
    ref = {'id': uuid.uuid4().hex, 'user_id': user_id}
    if cred_type == 'ec2':
        ref.update(type='ec2', blob={'blah': 'test'})
    else:
        ref.update(type='cert', blob=uuid.uuid4().hex)
    if project_id:
        ref['project_id'] = project_id
    return ref
def new_role_ref(self):
    """Return a role ref consisting of just an id and a name."""
    ref = self.new_ref()
    # Roles don't have a description or the enabled flag
    for unsupported in ('description', 'enabled'):
        del ref[unsupported]
    return ref
def new_policy_ref(self):
    """Return a policy ref with a random blob and type."""
    ref = self.new_ref()
    for generated in ('blob', 'type'):
        ref[generated] = uuid.uuid4().hex
    return ref
def new_trust_ref(self, trustor_user_id, trustee_user_id, project_id=None,
                  impersonation=None, expires=None, role_ids=None,
                  role_names=None, remaining_uses=None,
                  allow_redelegation=False):
    """Return a trust ref between the given trustor and trustee.

    ``expires`` may be a string (used verbatim as 'expires_at'), a dict of
    ``datetime.timedelta`` keyword arguments (offset from the current UTC
    time, formatted with TIME_FORMAT), or None (no 'expires_at' key). Any
    other value raises NotImplementedError. A 'roles' list is added when
    ``role_ids`` or ``role_names`` is given, ids first.
    """
    ref = {
        'id': uuid.uuid4().hex,
        'trustor_user_id': trustor_user_id,
        'trustee_user_id': trustee_user_id,
        'impersonation': impersonation or False,
        'project_id': project_id,
        'remaining_uses': remaining_uses,
        'allow_redelegation': allow_redelegation,
    }

    if isinstance(expires, six.string_types):
        ref['expires_at'] = expires
    elif isinstance(expires, dict):
        expiry = timeutils.utcnow() + datetime.timedelta(**expires)
        ref['expires_at'] = timeutils.strtime(expiry, fmt=TIME_FORMAT)
    elif expires is not None:
        raise NotImplementedError('Unexpected value for "expires"')

    role_ids = role_ids or []
    role_names = role_names or []
    if role_ids or role_names:
        roles = [{'id': role_id} for role_id in role_ids]
        roles.extend({'name': role_name} for role_name in role_names)
        ref['roles'] = roles

    return ref
def create_new_default_project_for_user(self, user_id, domain_id,
                                        enable_project=True):
    """Create a project via the API and make it the user's default.

    Returns the created project as validated from the POST response.
    """
    project_ref = self.new_project_ref(domain_id=domain_id)
    project_ref['enabled'] = enable_project
    created = self.post('/projects', body={'project': project_ref})
    project = self.assertValidProjectResponse(created, project_ref)

    # set the user's preferred project
    user_url = '/users/%(user_id)s' % {'user_id': user_id}
    update = {'user': {'default_project_id': project['id']}}
    patched = self.patch(user_url, body=update)
    self.assertValidUserResponse(patched)

    return project
def admin_request(self, *args, **kwargs):
    """Issue an admin request through the parent class.

    The parent's response is returned unchanged; this override performs
    no translation of its own.
    """
    return super(RestfulTestCase, self).admin_request(*args, **kwargs)
def get_scoped_token(self):
    """Convenience method so that we can test authenticated requests.

    Authenticates ``self.user`` by name and password, scoped to
    ``self.project``, and returns the X-Subject-Token header value.
    """
    user = {
        'name': self.user['name'],
        'password': self.user['password'],
        'domain': {'id': self.user['domain_id']},
    }
    identity = {
        'methods': ['password'],
        'password': {'user': user},
    }
    scope = {'project': {'id': self.project['id']}}

    response = self.admin_request(
        method='POST',
        path='/v3/auth/tokens',
        body={'auth': {'identity': identity, 'scope': scope}})
    return response.headers.get('X-Subject-Token')
def get_requested_token(self, auth):
    """Authenticate with ``auth`` and return the issued token id."""
    response = self.v3_authenticate_token(auth)
    return response.headers.get('X-Subject-Token')
def v3_authenticate_token(self, auth, expected_status=201):
    """POST ``auth`` to /v3/auth/tokens and return the response."""
    request = {
        'method': 'POST',
        'path': '/v3/auth/tokens',
        'body': auth,
        'expected_status': expected_status,
    }
    return self.admin_request(**request)
def v3_noauth_request(self, path, **kwargs):
    """Issue a /v3 request without obtaining an auth token first."""
    # request does not require auth token header
    return self.admin_request(path='/v3' + path, **kwargs)
def v3_request(self, path, **kwargs):
    """Issue an authenticated request against the /v3 API.

    Recognised keyword arguments (removed before the request is sent):

    * ``noauth`` - when truthy, send the request without a token.
    * ``auth`` - auth body used to request a token for this call.
    * ``token`` - an existing token id to use.

    ``auth`` takes precedence over ``token``; when neither is given a
    project-scoped token for the sample user is requested. All other
    keyword arguments are passed to ``admin_request``.
    """
    # check to see if caller requires token for the API call.
    if kwargs.pop('noauth', None):
        return self.v3_noauth_request(path, **kwargs)

    # Always pop both keys: leaving 'token' in kwargs when 'auth' is also
    # given would pass ``token`` to admin_request twice (TypeError).
    auth_arg = kwargs.pop('auth', None)
    token = kwargs.pop('token', None)

    if auth_arg:
        token = self.get_requested_token(auth_arg)
    elif not token:
        token = self.get_scoped_token()

    return self.admin_request(path='/v3' + path, token=token, **kwargs)
def get(self, path, **kwargs):
    """GET ``path``; expect 200 unless ``expected_status`` is passed."""
    response = self.v3_request(method='GET', path=path, **kwargs)
    if 'expected_status' in kwargs:
        return response
    self.assertResponseStatus(response, 200)
    return response
def head(self, path, **kwargs):
    """HEAD ``path``; expect 204 unless ``expected_status`` is passed.

    The response body is always asserted to be empty.
    """
    response = self.v3_request(method='HEAD', path=path, **kwargs)
    status_overridden = 'expected_status' in kwargs
    if not status_overridden:
        self.assertResponseStatus(response, 204)
    self.assertEqual('', response.body)
    return response
def post(self, path, **kwargs):
    """POST to ``path``; expect 201 unless ``expected_status`` is passed."""
    response = self.v3_request(method='POST', path=path, **kwargs)
    if 'expected_status' in kwargs:
        return response
    self.assertResponseStatus(response, 201)
    return response
def put(self, path, **kwargs):
    """PUT to ``path``; expect 204 unless ``expected_status`` is passed."""
    response = self.v3_request(method='PUT', path=path, **kwargs)
    if 'expected_status' in kwargs:
        return response
    self.assertResponseStatus(response, 204)
    return response
def patch(self, path, **kwargs):
    """PATCH ``path``; expect 200 unless ``expected_status`` is passed."""
    response = self.v3_request(method='PATCH', path=path, **kwargs)
    if 'expected_status' in kwargs:
        return response
    self.assertResponseStatus(response, 200)
    return response
def delete(self, path, **kwargs):
    """DELETE ``path``; expect 204 unless ``expected_status`` is passed."""
    response = self.v3_request(method='DELETE', path=path, **kwargs)
    if 'expected_status' in kwargs:
        return response
    self.assertResponseStatus(response, 204)
    return response
def assertValidErrorResponse(self, r):
    """Assert ``r`` carries a well-formed error body.

    The body must contain an 'error' with a code, title and message, and
    the code must equal the HTTP status of the response.
    """
    body = r.result
    self.assertIsNotNone(body.get('error'))
    error = body['error']
    for field in ('code', 'title', 'message'):
        self.assertIsNotNone(error.get(field))
    self.assertEqual(int(error['code']), r.status_code)
def assertValidListLinks(self, links, resource_url=None):
    """Assert the ``links`` section of a collection is well formed.

    'self' must be a localhost URL (ending in ``resource_url`` when given);
    'next' and 'previous' must be present and, unless None, localhost URLs.
    """
    on_localhost = matchers.StartsWith('http://localhost')

    self.assertIsNotNone(links)
    self.assertIsNotNone(links.get('self'))
    self.assertThat(links['self'], on_localhost)
    if resource_url:
        self.assertThat(links['self'], matchers.EndsWith(resource_url))

    for relation in ('next', 'previous'):
        self.assertIn(relation, links)
        if links[relation] is not None:
            self.assertThat(links[relation], on_localhost)
def assertValidListResponse(self, resp, key, entity_validator, ref=None,
                            expected_length=None, keys_to_check=None,
                            resource_url=None):
    """Make assertions common to all API list responses.

    :param resp: response whose ``result`` holds the collection
    :param key: name of the collection inside the response body
    :param entity_validator: callable invoked as ``validator(entity)`` for
        every entity and as ``validator(entity, ref)`` for the one matching
        ``ref``
    :param ref: optional reference; its ID is searched for in the response
        and the matching entity is compared against it
    :param expected_length: optional exact number of entities expected
    :param keys_to_check: forwarded to ``assertValidEntity``
    :param resource_url: forwarded to ``assertValidListLinks``
    :returns: the list of entities from the response
    """
    entities = resp.result.get(key)
    self.assertIsNotNone(entities)

    if expected_length is not None:
        self.assertEqual(expected_length, len(entities))
    elif ref is not None:
        # we're at least expecting the ref
        self.assertNotEmpty(entities)

    # collections should have relational links
    self.assertValidListLinks(resp.result.get('links'),
                              resource_url=resource_url)

    for entity in entities:
        self.assertIsNotNone(entity)
        self.assertValidEntity(entity, keys_to_check=keys_to_check)
        entity_validator(entity)

    if ref:
        matches = [x for x in entities if x['id'] == ref['id']]
        # Fail with an assertion rather than an IndexError when the
        # reference is missing from the collection.
        self.assertTrue(
            matches,
            'ref %s not found in %s list response' % (ref['id'], key))
        entity = matches[0]
        self.assertValidEntity(entity, ref=ref,
                               keys_to_check=keys_to_check)
        entity_validator(entity, ref)

    return entities
def assertValidResponse(self, resp, key, entity_validator, *args,
                        **kwargs):
    """Make assertions common to all single-entity API responses.

    ``keys_to_check`` is consumed here and only handed to
    ``assertValidEntity``; every other argument is passed to both the
    generic entity check and ``entity_validator``. Returns the entity.
    """
    entity = resp.result.get(key)
    self.assertIsNotNone(entity)

    keys_to_check = kwargs.pop('keys_to_check', None)
    self.assertValidEntity(entity, *args, keys_to_check=keys_to_check,
                           **kwargs)
    entity_validator(entity, *args, **kwargs)

    return entity
def assertValidEntity(self, entity, ref=None, keys_to_check=None):
    """Make assertions common to all API entities.

    :param entity: the entity dict returned by the API
    :param ref: optional reference; when given, every checked key must be
        equal between ``ref`` and ``entity``
    :param keys_to_check: list of keys (besides 'id') that must be present
        and not None; defaults to name, description and enabled
    :returns: the entity
    """
    if keys_to_check is not None:
        keys = keys_to_check
    else:
        keys = ['name', 'description', 'enabled']

    for k in ['id'] + keys:
        msg = '%s unexpectedly None in %s' % (k, entity)
        self.assertIsNotNone(entity.get(k), msg)

    self.assertIsNotNone(entity.get('links'))
    self.assertIsNotNone(entity['links'].get('self'))
    self.assertThat(entity['links']['self'],
                    matchers.StartsWith('http://localhost'))
    self.assertIn(entity['id'], entity['links']['self'])

    if ref:
        for k in keys:
            msg = '%s not equal: %s != %s' % (k, ref[k], entity[k])
            # Pass the message so a failure names the offending key.
            self.assertEqual(ref[k], entity[k], msg)

    return entity
# auth validation
def assertValidISO8601ExtendedFormatDatetime(self, dt):
    """Assert ``dt`` parses with TIME_FORMAT and return the parsed value.

    :param dt: timestamp string to validate
    :returns: the ``datetime.datetime`` parsed from ``dt``
    :raises AssertionError: if ``dt`` cannot be parsed
    """
    try:
        parsed = timeutils.parse_strtime(dt, fmt=TIME_FORMAT)
    except Exception:
        msg = '%s is not a valid ISO 8601 extended format date time.' % dt
        raise AssertionError(msg)
    # Previously this check sat after the return/raise and could never
    # run; it now verifies the parsed value before handing it back.
    self.assertIsInstance(parsed, datetime.datetime)
    return parsed
def assertValidTokenResponse(self, r, user=None):
    """Assert ``r`` is a well-formed token response and return the token.

    Checks the X-Subject-Token header, that the token was issued before it
    expires, and the user section. When ``user`` is given its id, name and
    domain_id must match the token's user.
    """
    self.assertTrue(r.headers.get('X-Subject-Token'))
    token = r.result['token']

    def parsed_timestamp(field):
        self.assertIsNotNone(token.get(field))
        return self.assertValidISO8601ExtendedFormatDatetime(token[field])

    expires_at = parsed_timestamp('expires_at')
    issued_at = parsed_timestamp('issued_at')
    self.assertTrue(issued_at < expires_at)

    self.assertIn('user', token)
    token_user = token['user']
    for field in ('id', 'name', 'domain'):
        self.assertIn(field, token_user)
    self.assertIn('id', token_user['domain'])

    if user is not None:
        self.assertEqual(user['id'], token_user['id'])
        self.assertEqual(user['name'], token_user['name'])
        self.assertEqual(user['domain_id'], token_user['domain']['id'])

    return token
def assertValidUnscopedTokenResponse(self, r, *args, **kwargs):
    """Assert ``r`` holds a valid token carrying no scope information."""
    token = self.assertValidTokenResponse(r, *args, **kwargs)
    for scoped_only in ('roles', 'catalog', 'project', 'domain'):
        self.assertNotIn(scoped_only, token)
    return token
def assertValidScopedTokenResponse(self, r, *args, **kwargs):
    """Assert ``r`` holds a valid scoped token and return the token.

    Keyword options consumed here:

    * ``require_catalog`` (default True) - whether a catalog must be
      present (True) or absent (False).
    * ``endpoint_filter`` (default False) - check the catalog length.
    * ``ep_filter_assoc`` (default 0) - expected catalog length when
      ``endpoint_filter`` is set.
    """
    require_catalog = kwargs.pop('require_catalog', True)
    endpoint_filter = kwargs.pop('endpoint_filter', False)
    ep_filter_assoc = kwargs.pop('ep_filter_assoc', 0)

    token = self.assertValidTokenResponse(r, *args, **kwargs)

    if not require_catalog:
        self.assertNotIn('catalog', token)
    else:
        self.assertIn('catalog', token)
        catalog = token['catalog']

        if isinstance(catalog, list):
            # only test JSON
            hidden_keys = ('enabled', 'legacy_endpoint_id', 'service_id')
            for service in catalog:
                for endpoint in service['endpoints']:
                    for hidden in hidden_keys:
                        self.assertNotIn(hidden, endpoint)

        # sub test for the OS-EP-FILTER extension enabled
        if endpoint_filter:
            self.assertThat(catalog, matchers.HasLength(ep_filter_assoc))

    self.assertIn('roles', token)
    self.assertTrue(token['roles'])
    for role in token['roles']:
        self.assertIn('id', role)
        self.assertIn('name', role)

    return token
def assertValidProjectScopedTokenResponse(self, r, *args, **kwargs):
    """Assert ``r`` holds a valid project-scoped token and return it.

    The first role in the token must be ``self.role_id``.
    """
    token = self.assertValidScopedTokenResponse(r, *args, **kwargs)

    self.assertIn('project', token)
    project = token['project']
    for field in ('id', 'name', 'domain'):
        self.assertIn(field, project)
    for field in ('id', 'name'):
        self.assertIn(field, project['domain'])

    self.assertEqual(self.role_id, token['roles'][0]['id'])

    return token
def assertValidProjectTrustScopedTokenResponse(self, r, *args, **kwargs):
    """Assert ``r`` holds a valid trust-scoped project token.

    On top of the project-scoped checks, the token must carry an
    'OS-TRUST:trust' section with an id, a boolean impersonation flag and
    the ids of both trustor and trustee.

    :returns: the token, like the other token validators
    """
    token = self.assertValidProjectScopedTokenResponse(r, *args, **kwargs)

    trust = token.get('OS-TRUST:trust')
    self.assertIsNotNone(trust)
    self.assertIsNotNone(trust.get('id'))
    self.assertIsInstance(trust.get('impersonation'), bool)
    self.assertIsNotNone(trust.get('trustor_user'))
    self.assertIsNotNone(trust.get('trustee_user'))
    self.assertIsNotNone(trust['trustor_user'].get('id'))
    self.assertIsNotNone(trust['trustee_user'].get('id'))

    # Return the token for consistency with the sibling validators.
    return token
def assertValidDomainScopedTokenResponse(self, r, *args, **kwargs):
    """Assert ``r`` holds a valid domain-scoped token and return it."""
    token = self.assertValidScopedTokenResponse(r, *args, **kwargs)
    self.assertIn('domain', token)
    for field in ('id', 'name'):
        self.assertIn(field, token['domain'])
    return token
def assertEqualTokens(self, a, b):
    """Assert that two token responses are equal.

    'expires_at' and 'issued_at' only have to be close to each other (see
    ``assertCloseEnoughForGovernmentWork``); everything else must match
    exactly. The tokens passed in are not modified.
    """
    timestamp_fields = ('expires_at', 'issued_at')

    def without_timestamps(token):
        # Build a copy rather than deleting keys from the caller's dict.
        stripped = dict(token)
        stripped['token'] = dict(
            (k, v) for k, v in token['token'].items()
            if k not in timestamp_fields)
        return stripped

    for field in timestamp_fields:
        a_time = self.assertValidISO8601ExtendedFormatDatetime(
            a['token'][field])
        b_time = self.assertValidISO8601ExtendedFormatDatetime(
            b['token'][field])
        self.assertCloseEnoughForGovernmentWork(a_time, b_time)

    return self.assertDictEqual(without_timestamps(a),
                                without_timestamps(b))
# catalog validation
def assertValidCatalogResponse(self, resp, *args, **kwargs):
    """Assert ``resp`` is a well-formed GET /v3/auth/catalog response.

    The body must contain exactly 'catalog' and 'links', the catalog must
    be valid, and 'links' must contain only the expected 'self' URL.
    Extra positional and keyword arguments are accepted but unused.
    """
    # Sort the keys: dict ordering is not guaranteed, and on Python 3
    # ``keys()`` is a view that never compares equal to a list.
    self.assertEqual(['catalog', 'links'], sorted(resp.json.keys()))
    self.assertValidCatalog(resp.json['catalog'])
    self.assertIn('links', resp.json)
    self.assertIsInstance(resp.json['links'], dict)
    self.assertEqual(['self'], list(resp.json['links'].keys()))
    self.assertEqual(
        'http://localhost/v3/auth/catalog',
        resp.json['links']['self'])
def assertValidCatalog(self, entity):
    """Assert ``entity`` is a non-empty, well-formed service catalog.

    Every service needs an id, name, type and at least one endpoint; every
    endpoint needs an id, interface and url. Internal attributes must not
    be exposed on either.
    """
    self.assertIsInstance(entity, list)
    self.assertTrue(len(entity) > 0)

    for service in entity:
        for required in ('id', 'name', 'type'):
            self.assertIsNotNone(service.get(required))
        self.assertNotIn('enabled', service)

        endpoints = service['endpoints']
        self.assertTrue(len(endpoints) > 0)
        for endpoint in endpoints:
            for required in ('id', 'interface', 'url'):
                self.assertIsNotNone(endpoint.get(required))
            for hidden in ('enabled', 'legacy_endpoint_id', 'service_id'):
                self.assertNotIn(hidden, endpoint)
# region validation
def assertValidRegionListResponse(self, resp, *args, **kwargs):
    """Validate a list-of-regions response and return the regions."""
    # NOTE(jaypipes): keys_to_check is deliberately empty; otherwise the
    # base assertValidEntity would look for "name" and "enabled" keys,
    # which region refs do not have.
    validator = self.assertValidRegion
    return self.assertValidListResponse(
        resp, 'regions', validator, *args, keys_to_check=[], **kwargs)
def assertValidRegionResponse(self, resp, *args, **kwargs):
    """Validate a single-region response and return the region."""
    validator = self.assertValidRegion
    # No generic keys are checked for regions (keys_to_check is empty).
    return self.assertValidResponse(
        resp, 'region', validator, *args, keys_to_check=[], **kwargs)
def assertValidRegion(self, entity, ref=None):
    """Assert the region has a description, matching ``ref`` if given."""
    description = entity.get('description')
    self.assertIsNotNone(description)
    if ref:
        self.assertEqual(ref['description'], entity['description'])
    return entity
# service validation
def assertValidServiceListResponse(self, resp, *args, **kwargs):
    """Validate a list-of-services response and return the services."""
    validator = self.assertValidService
    return self.assertValidListResponse(
        resp, 'services', validator, *args, **kwargs)
def assertValidServiceResponse(self, resp, *args, **kwargs):
    """Validate a single-service response and return the service."""
    validator = self.assertValidService
    return self.assertValidResponse(
        resp, 'service', validator, *args, **kwargs)
def assertValidService(self, entity, ref=None):
    """Assert the service has a type and a boolean 'enabled' flag.

    When ``ref`` is given the type must match it. Returns the entity.
    """
    service_type = entity.get('type')
    self.assertIsNotNone(service_type)
    self.assertIsInstance(entity.get('enabled'), bool)
    if ref:
        self.assertEqual(ref['type'], entity['type'])
    return entity
# endpoint validation
def assertValidEndpointListResponse(self, resp, *args, **kwargs):
    """Validate a list-of-endpoints response and return the endpoints."""
    validator = self.assertValidEndpoint
    return self.assertValidListResponse(
        resp, 'endpoints', validator, *args, **kwargs)
def assertValidEndpointResponse(self, resp, *args, **kwargs):
    """Validate a single-endpoint response and return the endpoint."""
    validator = self.assertValidEndpoint
    return self.assertValidResponse(
        resp, 'endpoint', validator, *args, **kwargs)
def assertValidEndpoint(self, entity, ref=None):
    """Assert ``entity`` is a well-formed endpoint.

    The endpoint needs an interface, a service_id and a boolean 'enabled'
    flag, and must not expose 'legacy_endpoint_id'. When ``ref`` is given,
    interface and service_id must match it, and so must region_id if the
    ref specifies one.

    :returns: the entity
    """
    self.assertIsNotNone(entity.get('interface'))
    self.assertIsNotNone(entity.get('service_id'))
    self.assertIsInstance(entity['enabled'], bool)

    # this is intended to be an unexposed implementation detail
    self.assertNotIn('legacy_endpoint_id', entity)

    if ref:
        self.assertEqual(ref['interface'], entity['interface'])
        self.assertEqual(ref['service_id'], entity['service_id'])
        # Guard on the key that is actually compared. The old guard looked
        # at 'region', which refs built by new_endpoint_ref never set, so
        # the region was never verified.
        if ref.get('region_id') is not None:
            self.assertEqual(ref['region_id'], entity.get('region_id'))

    return entity
# domain validation
def assertValidDomainListResponse(self, resp, *args, **kwargs):
    """Validate a list-of-domains response and return the domains."""
    validator = self.assertValidDomain
    return self.assertValidListResponse(
        resp, 'domains', validator, *args, **kwargs)
def assertValidDomainResponse(self, resp, *args, **kwargs):
    """Validate a single-domain response and return the domain."""
    validator = self.assertValidDomain
    return self.assertValidResponse(
        resp, 'domain', validator, *args, **kwargs)
def assertValidDomain(self, entity, ref=None):
    """Return ``entity`` unchanged.

    No domain-specific assertions are made here, with or without ``ref``.
    """
    return entity
# project validation
def assertValidProjectListResponse(self, resp, *args, **kwargs):
    """Validate a list-of-projects response and return the projects."""
    validator = self.assertValidProject
    return self.assertValidListResponse(
        resp, 'projects', validator, *args, **kwargs)
def assertValidProjectResponse(self, resp, *args, **kwargs):
    """Validate a single-project response and return the project."""
    validator = self.assertValidProject
    return self.assertValidResponse(
        resp, 'project', validator, *args, **kwargs)
def assertValidProject(self, entity, ref=None):
    """Assert the project has a domain_id, matching ``ref`` if given."""
    owning_domain = entity.get('domain_id')
    self.assertIsNotNone(owning_domain)
    if ref:
        self.assertEqual(ref['domain_id'], entity['domain_id'])
    return entity
# user validation
def assertValidUserListResponse(self, resp, *args, **kwargs):
    """Validate a list-of-users response and return the users."""
    validator = self.assertValidUser
    return self.assertValidListResponse(
        resp, 'users', validator, *args, **kwargs)
def assertValidUserResponse(self, resp, *args, **kwargs):
    """Validate a single-user response and return the user."""
    validator = self.assertValidUser
    return self.assertValidResponse(
        resp, 'user', validator, *args, **kwargs)
def assertValidUser(self, entity, ref=None):
    """Assert ``entity`` is a well-formed user and return it.

    The user needs a domain_id and email, and must expose neither a
    password nor 'tenantId'. When ``ref`` is given, domain_id and email
    must match, as must default_project_id if the ref has one.
    """
    for required in ('domain_id', 'email'):
        self.assertIsNotNone(entity.get(required))
    self.assertIsNone(entity.get('password'))
    self.assertNotIn('tenantId', entity)

    if ref:
        for field in ('domain_id', 'email'):
            self.assertEqual(ref[field], entity[field])
        if 'default_project_id' in ref:
            expected_project = ref['default_project_id']
            self.assertIsNotNone(expected_project)
            self.assertEqual(expected_project,
                             entity['default_project_id'])

    return entity
# group validation
def assertValidGroupListResponse(self, resp, *args, **kwargs):
    """Validate a list-of-groups response and return the groups."""
    validator = self.assertValidGroup
    return self.assertValidListResponse(
        resp, 'groups', validator, *args, **kwargs)
def assertValidGroupResponse(self, resp, *args, **kwargs):
    """Validate a single-group response and return the group."""
    validator = self.assertValidGroup
    return self.assertValidResponse(
        resp, 'group', validator, *args, **kwargs)
def assertValidGroup(self, entity, ref=None):
    """Assert the group has a name, matching ``ref`` if given."""
    group_name = entity.get('name')
    self.assertIsNotNone(group_name)
    if ref:
        self.assertEqual(ref['name'], entity['name'])
    return entity
# credential validation
def assertValidCredentialListResponse(self, resp, *args, **kwargs):
    """Validate a list-of-credentials response; return the credentials."""
    credential_keys = ['blob', 'user_id', 'type']
    return self.assertValidListResponse(
        resp, 'credentials', self.assertValidCredential, *args,
        keys_to_check=credential_keys, **kwargs)
def assertValidCredentialResponse(self, resp, *args, **kwargs):
    """Validate a single-credential response and return the credential."""
    credential_keys = ['blob', 'user_id', 'type']
    return self.assertValidResponse(
        resp, 'credential', self.assertValidCredential, *args,
        keys_to_check=credential_keys, **kwargs)
def assertValidCredential(self, entity, ref=None):
    """Assert ``entity`` is a well-formed credential and return it.

    user_id, blob and type must be set and, when ``ref`` is given, equal
    to the ref's values; project_id must match too (both may be absent).
    """
    required = ('user_id', 'blob', 'type')
    for field in required:
        self.assertIsNotNone(entity.get(field))

    if ref:
        for field in required:
            self.assertEqual(ref[field], entity[field])
        self.assertEqual(ref.get('project_id'), entity.get('project_id'))

    return entity
# role validation
def assertValidRoleListResponse(self, resp, *args, **kwargs):
    """Validate a list-of-roles response and return the roles."""
    role_keys = ['name']
    return self.assertValidListResponse(
        resp, 'roles', self.assertValidRole, *args,
        keys_to_check=role_keys, **kwargs)
def assertValidRoleResponse(self, resp, *args, **kwargs):
    """Validate a single-role response and return the role."""
    role_keys = ['name']
    return self.assertValidResponse(
        resp, 'role', self.assertValidRole, *args,
        keys_to_check=role_keys, **kwargs)
def assertValidRole(self, entity, ref=None):
    """Assert the role has a name, matching ``ref`` if given."""
    role_name = entity.get('name')
    self.assertIsNotNone(role_name)
    if ref:
        self.assertEqual(ref['name'], entity['name'])
    return entity
def assertValidRoleAssignmentListResponse(self, resp, expected_length=None,
                                          resource_url=None):
    """Validate a list-of-role-assignments response.

    :param resp: response whose result holds 'role_assignments'
    :param expected_length: optional exact number of assignments expected
    :param resource_url: forwarded to ``assertValidListLinks``
    :returns: the list of role assignment entities
    """
    entities = resp.result.get('role_assignments')
    # Fail clearly when the collection is missing instead of raising a
    # TypeError from len() or the loop below.
    self.assertIsNotNone(entities)

    if expected_length is not None:
        self.assertEqual(expected_length, len(entities))

    # collections should have relational links
    self.assertValidListLinks(resp.result.get('links'),
                              resource_url=resource_url)

    for entity in entities:
        self.assertIsNotNone(entity)
        self.assertValidRoleAssignment(entity)

    return entities
def assertValidRoleAssignment(self, entity, ref=None, url=None):
    """Assert ``entity`` is a well-formed role assignment.

    The assignment must name a role, one actor (user or group) and one
    target (project or domain, with an id), and carry an 'assignment'
    link. When ``ref`` is given, each actor, role and scope element the
    ref specifies must match by id, as must the inherited-to marker. When
    ``url`` is given it must occur in the assignment link.
    """
    self.assertIsNotNone(entity.get('role'))
    self.assertIsNotNone(entity.get('scope'))

    # Only one of user or group should be present
    user, group = entity.get('user'), entity.get('group')
    self.assertIsNotNone(user or group)
    self.assertIsNone(user and group)

    # Only one of domain or project should be present
    scope = entity['scope']
    project, domain = scope.get('project'), scope.get('domain')
    self.assertIsNotNone(project or domain)
    self.assertIsNone(project and domain)

    if project:
        self.assertIsNotNone(scope['project'].get('id'))
    else:
        self.assertIsNotNone(scope['domain'].get('id'))

    self.assertIsNotNone(entity.get('links'))
    self.assertIsNotNone(entity['links'].get('assignment'))

    if ref:
        for part in ('user', 'group', 'role'):
            if ref.get(part):
                self.assertEqual(ref[part]['id'], entity[part]['id'])

        ref_scope = ref['scope']
        for target in ('project', 'domain'):
            if ref_scope.get(target):
                self.assertEqual(ref_scope[target]['id'],
                                 entity['scope'][target]['id'])

        inherited_key = 'OS-INHERIT:inherited_to'
        if ref_scope.get(inherited_key):
            self.assertEqual(ref_scope[inherited_key],
                             entity['scope'][inherited_key])

    if url:
        self.assertIn(url, entity['links']['assignment'])
def assertRoleAssignmentInListResponse(
        self, resp, ref, link_url=None, expected=1):
    """Assert ``ref`` matches exactly ``expected`` listed assignments.

    An entity counts as a match when ``assertValidRoleAssignment`` accepts
    it against ``ref`` and ``link_url`` without raising.
    """
    def matches(entity):
        try:
            self.assertValidRoleAssignment(
                entity, ref=ref, url=link_url)
        except Exception:
            # It doesn't match, so let's go onto the next one
            return False
        return True

    found_count = sum(
        1 for entity in resp.result.get('role_assignments')
        if matches(entity))
    self.assertEqual(expected, found_count)
def assertRoleAssignmentNotInListResponse(
        self, resp, ref, link_url=None):
    """Assert no listed role assignment matches ``ref``."""
    no_matches = 0
    self.assertRoleAssignmentInListResponse(
        resp, ref=ref, link_url=link_url, expected=no_matches)
# policy validation
def assertValidPolicyListResponse(self, resp, *args, **kwargs):
    """Validate a list-of-policies response and return the policies."""
    validator = self.assertValidPolicy
    return self.assertValidListResponse(
        resp, 'policies', validator, *args, **kwargs)
def assertValidPolicyResponse(self, resp, *args, **kwargs):
    """Validate a single-policy response and return the policy."""
    validator = self.assertValidPolicy
    return self.assertValidResponse(
        resp, 'policy', validator, *args, **kwargs)
def assertValidPolicy(self, entity, ref=None):
    """Assert the policy has a blob and type, matching ``ref`` if given."""
    required = ('blob', 'type')
    for field in required:
        self.assertIsNotNone(entity.get(field))
    if ref:
        for field in required:
            self.assertEqual(ref[field], entity[field])
    return entity
# trust validation
def assertValidTrustListResponse(self, resp, *args, **kwargs):
    """Validate a list-of-trusts response and return the trusts.

    Listed trusts are validated as summaries.
    """
    trust_keys = ['trustor_user_id', 'trustee_user_id', 'impersonation']
    return self.assertValidListResponse(
        resp, 'trusts', self.assertValidTrustSummary, *args,
        keys_to_check=trust_keys, **kwargs)
def assertValidTrustResponse(self, resp, *args, **kwargs):
    """Validate a single-trust response and return the trust."""
    trust_keys = ['trustor_user_id', 'trustee_user_id', 'impersonation']
    return self.assertValidResponse(
        resp, 'trust', self.assertValidTrust, *args,
        keys_to_check=trust_keys, **kwargs)
def assertValidTrustSummary(self, entity, ref=None):
    """Validate a trust as it appears in a list (summary form)."""
    as_summary = True
    return self.assertValidTrust(entity, ref, summary=as_summary)
def assertValidTrust(self, entity, ref=None, summary=False):
    """Validate a trust entity, optionally against a reference trust.

    :param entity: trust dict to validate
    :param ref: optional reference trust; when truthy, the trustor,
                trustee, project and expiry of ``entity`` are compared
                with it
    :param summary: True when ``entity`` is in list (summary) form, which
                    must not carry ``roles``; False for the detailed form,
                    whose roles and ``roles_links`` are validated as well
    :returns: ``entity``, unchanged
    """
    for required in ('trustor_user_id', 'trustee_user_id', 'impersonation'):
        self.assertIsNotNone(entity.get(required))

    # The key must always be present, even when the trust never expires.
    self.assertIn('expires_at', entity)
    if entity['expires_at'] is not None:
        self.assertValidISO8601ExtendedFormatDatetime(entity['expires_at'])

    if not summary:
        for role in entity['roles']:
            self.assertIsNotNone(role)
            self.assertValidEntity(role, keys_to_check=['name'])
            self.assertValidRole(role)

        self.assertValidListLinks(entity.get('roles_links'))

        # always disallow role xor project_id (neither or both is allowed)
        roles_present = bool(entity.get('roles'))
        project_present = bool(entity.get('project_id'))
        self.assertFalse(roles_present != project_present)
    else:
        # Trust list contains no roles, but getting a specific
        # trust by ID provides the detailed response containing roles
        self.assertNotIn('roles', entity)
        self.assertIn('project_id', entity)

    if ref:
        for key in ('trustor_user_id', 'trustee_user_id', 'project_id'):
            self.assertEqual(ref[key], entity[key])

        if not (entity.get('expires_at') or ref.get('expires_at')):
            # Neither side has a usable expiry; they must still agree.
            self.assertEqual(ref.get('expires_at'),
                             entity.get('expires_at'))
        else:
            parsed_entity = self.assertValidISO8601ExtendedFormatDatetime(
                entity['expires_at'])
            parsed_ref = self.assertValidISO8601ExtendedFormatDatetime(
                ref['expires_at'])
            self.assertCloseEnoughForGovernmentWork(parsed_entity,
                                                    parsed_ref)

    return entity
def build_external_auth_request(self, remote_user,
                                remote_domain=None, auth_data=None,
                                kerberos=False):
    """Build the pieces needed to exercise external authentication.

    :param remote_user: value stored as ``REMOTE_USER`` in the environment
    :param remote_domain: optional value stored as ``REMOTE_DOMAIN``
    :param auth_data: auth request body; when falsy, one is generated with
                      ``build_authentication_request`` and its ``auth``
                      member is used
    :param kerberos: forwarded to ``build_authentication_request`` when
                     ``auth_data`` has to be generated
    :returns: a ``(context, auth_info, auth_context)`` tuple
    """
    environment = {'REMOTE_USER': remote_user,
                   'AUTH_TYPE': 'Negotiate'}
    if remote_domain:
        environment['REMOTE_DOMAIN'] = remote_domain
    context = {'environment': environment}

    if not auth_data:
        request_body = self.build_authentication_request(kerberos=kerberos)
        auth_data = request_body['auth']

    # AuthInfo is created without a request context.
    auth_info = auth.controllers.AuthInfo.create(None, auth_data)
    auth_context = {'extras': {}, 'method_names': []}
    return context, auth_info, auth_context
class VersionTestCase(RestfulTestCase):
    """Test case reserved for version checks."""

    def test_get_version(self):
        # Placeholder: no assertions are performed yet.
        pass
# NOTE(gyee): test AuthContextMiddleware here instead of test_middleware.py
# because we need the token
class AuthContextMiddlewareTestCase(RestfulTestCase):
    """Exercise ``AuthContextMiddleware.process_request``."""

    def _mock_request_object(self, token_id):
        """Return a minimal request stand-in carrying ``token_id``.

        The object exposes only ``headers`` (with the auth token header set
        to ``token_id``) and an initially empty ``environ`` dict.
        """
        class _FakeRequest(object):
            headers = {middleware.AUTH_TOKEN_HEADER: token_id}
            environ = {}

        return _FakeRequest()

    def _process(self, req):
        """Run ``req`` through a middleware built with no application."""
        application = None
        middleware.AuthContextMiddleware(application).process_request(req)

    def test_auth_context_build_by_middleware(self):
        # Make sure AuthContextMiddleware successfully builds the auth
        # context from the incoming auth token.
        req = self._mock_request_object(self.get_scoped_token())
        self._process(req)
        expected_user_id = self.user['id']
        built_context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
        self.assertEqual(expected_user_id, built_context['user_id'])

    def test_auth_context_override(self):
        overridden_context = 'OVERRIDDEN_CONTEXT'
        # This random token should never be used.
        req = self._mock_request_object(uuid.uuid4().hex)
        req.environ[authorization.AUTH_CONTEXT_ENV] = overridden_context
        self._process(req)
        # Make sure the overridden context takes precedence.
        resulting_context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
        self.assertEqual(overridden_context, resulting_context)

    def test_admin_token_auth_context(self):
        # Make sure AuthContextMiddleware does not attempt to build an
        # auth context if the incoming auth token is the special admin
        # token.
        req = self._mock_request_object(CONF.admin_token)
        self._process(req)
        resulting_context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
        self.assertDictEqual(resulting_context, {})
class JsonHomeTestMixin(object):
    """JSON Home test

    Mix this class in to get a test of the JSON-Home response for an
    extension.

    The class using this mixin must set JSON_HOME_DATA to a dict mapping
    relationship URLs (rels) to the JSON-Home data for the relationship.
    Each rel and its associated data must be in the response.

    """

    def test_get_json_home(self):
        json_home_type = 'application/json-home'
        resp = self.get('/', convert=False,
                        headers={'Accept': json_home_type})
        self.assertThat(resp.headers['Content-Type'],
                        matchers.Equals(json_home_type))

        resp_data = jsonutils.loads(resp.body)

        # Check that the example relationships are present.
        for rel, expected in self.JSON_HOME_DATA.items():
            self.assertThat(resp_data['resources'][rel],
                            matchers.Equals(expected))