Improve tests for api protection and filtering

Fills out some more tests for both these areas.  To make it easier for
any v3 tests to do their own authentication, moved the auth builder
utility functions from test_v3_auth into test_v3.RestfulTestCase.

Also fixed an issue that meant the api filtering on a boolean
would not work.

Fixes Bug #1132080
Fixes Bug #1132372

Change-Id: I7bb7b5ba61adfc6a9c496a5547a0ca3fcfbab209
This commit is contained in:
Henry Nash 2013-02-24 08:34:07 +00:00
parent 557cb9411a
commit b4adb6a56f
4 changed files with 393 additions and 190 deletions

View File

@ -243,9 +243,28 @@ class V3Controller(V2Controller):
@classmethod
def filter_by_attribute(cls, context, refs, attr):
"""Filters a list of references by query string value."""
def _attr_match(ref_attr, val_attr):
"""Matches attributes allowing for booleans as strings.
We test explicitly for a value that defines it as 'False',
which also means that the existence of the attribute with
no value implies 'True'
"""
if type(ref_attr) is bool:
if (isinstance(val_attr, basestring) and
val_attr == '0'):
val = False
else:
val = True
return (ref_attr == val)
else:
return (ref_attr == val_attr)
if attr in context['query_string']:
value = context['query_string'][attr]
return [r for r in refs if r[attr] == value]
return [r for r in refs if _attr_match(r[attr], value)]
return refs
def _require_matching_id(self, value, ref):

View File

@ -20,9 +20,14 @@ TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
class RestfulTestCase(test_content_types.RestfulTestCase):
def setUp(self):
rules.reset()
def setUp(self, load_sample_data=True):
"""Setup for v3 Restful Test Cases.
If a child class wants to create their own sample data
and provide their own auth data to obtain tokens, then
load_sample_data should be set to false.
"""
self.config([
test.etcdir('keystone.conf.sample'),
test.testsdir('test_overrides.conf'),
@ -32,32 +37,33 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
sql_util.setup_test_database()
self.load_backends()
self.domain_id = uuid.uuid4().hex
self.domain = self.new_domain_ref()
self.domain['id'] = self.domain_id
self.identity_api.create_domain(self.domain_id, self.domain)
if load_sample_data:
self.domain_id = uuid.uuid4().hex
self.domain = self.new_domain_ref()
self.domain['id'] = self.domain_id
self.identity_api.create_domain(self.domain_id, self.domain)
self.project_id = uuid.uuid4().hex
self.project = self.new_project_ref(
domain_id=self.domain_id)
self.project['id'] = self.project_id
self.identity_api.create_project(self.project_id, self.project)
self.project_id = uuid.uuid4().hex
self.project = self.new_project_ref(
domain_id=self.domain_id)
self.project['id'] = self.project_id
self.identity_api.create_project(self.project_id, self.project)
self.user_id = uuid.uuid4().hex
self.user = self.new_user_ref(
domain_id=self.domain_id,
project_id=self.project_id)
self.user['id'] = self.user_id
self.identity_api.create_user(self.user_id, self.user)
self.user_id = uuid.uuid4().hex
self.user = self.new_user_ref(
domain_id=self.domain_id,
project_id=self.project_id)
self.user['id'] = self.user_id
self.identity_api.create_user(self.user_id, self.user)
# create & grant policy.json's default role for admin_required
self.role_id = uuid.uuid4().hex
self.role = self.new_role_ref()
self.role['id'] = self.role_id
self.role['name'] = 'admin'
self.identity_api.create_role(self.role_id, self.role)
self.identity_api.add_role_to_user_and_project(
self.user_id, self.project_id, self.role_id)
# create & grant policy.json's default role for admin_required
self.role_id = uuid.uuid4().hex
self.role = self.new_role_ref()
self.role['id'] = self.role_id
self.role['name'] = 'admin'
self.identity_api.create_role(self.role_id, self.role)
self.identity_api.add_role_to_user_and_project(
self.user_id, self.project_id, self.role_id)
self.public_server = self.serveapp('keystone', name='main')
self.admin_server = self.serveapp('keystone', name='admin')
@ -752,6 +758,76 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
return entity
def build_auth_scope(self, project_id=None, project_name=None,
project_domain_id=None, project_domain_name=None,
domain_id=None, domain_name=None, trust_id=None):
scope_data = {}
if project_id or project_name:
scope_data['project'] = {}
if project_id:
scope_data['project']['id'] = project_id
else:
scope_data['project']['name'] = project_name
if project_domain_id or project_domain_name:
project_domain_json = {}
if project_domain_id:
project_domain_json['id'] = project_domain_id
else:
project_domain_json['name'] = project_domain_name
scope_data['project']['domain'] = project_domain_json
if domain_id or domain_name:
scope_data['domain'] = {}
if domain_id:
scope_data['domain']['id'] = domain_id
else:
scope_data['domain']['name'] = domain_name
if trust_id:
scope_data['trust'] = {}
scope_data['trust']['id'] = trust_id
return scope_data
def build_password_auth(self, user_id=None, username=None,
user_domain_id=None, user_domain_name=None,
password=None):
password_data = {'user': {}}
if user_id:
password_data['user']['id'] = user_id
else:
password_data['user']['name'] = username
if user_domain_id or user_domain_name:
password_data['user']['domain'] = {}
if user_domain_id:
password_data['user']['domain']['id'] = user_domain_id
else:
password_data['user']['domain']['name'] = user_domain_name
password_data['user']['password'] = password
return password_data
def build_token_auth(self, token):
return {'id': token}
def build_authentication_request(self, token=None, user_id=None,
username=None, user_domain_id=None,
user_domain_name=None, password=None,
**kwargs):
"""Build auth dictionary.
It will create an auth dictionary based on all the arguments
that it receives.
"""
auth_data = {}
auth_data['identity'] = {'methods': []}
if token:
auth_data['identity']['methods'].append('token')
auth_data['identity']['token'] = self.build_token_auth(token)
if user_id or username:
auth_data['identity']['methods'].append('password')
auth_data['identity']['password'] = self.build_password_auth(
user_id, username, user_domain_id, user_domain_name, password)
if kwargs:
auth_data['scope'] = self.build_auth_scope(**kwargs)
return {'auth': auth_data}
class VersionTestCase(RestfulTestCase):
def test_get_version(self):

View File

@ -29,80 +29,15 @@ import test_v3
CONF = config.CONF
def _build_auth_scope(project_id=None, project_name=None,
project_domain_id=None, project_domain_name=None,
domain_id=None, domain_name=None, trust_id=None):
scope_data = {}
if project_id or project_name:
scope_data['project'] = {}
if project_id:
scope_data['project']['id'] = project_id
else:
scope_data['project']['name'] = project_name
if project_domain_id or project_domain_name:
project_domain_json = {}
if project_domain_id:
project_domain_json['id'] = project_domain_id
else:
project_domain_json['name'] = project_domain_name
scope_data['project']['domain'] = project_domain_json
if domain_id or domain_name:
scope_data['domain'] = {}
if domain_id:
scope_data['domain']['id'] = domain_id
else:
scope_data['domain']['name'] = domain_name
if trust_id:
scope_data['trust'] = {}
scope_data['trust']['id'] = trust_id
return scope_data
class TestAuthInfo(test_v3.RestfulTestCase):
# TDOD(henry-nash) These tests are somewhat inefficient, since by
# using the test_v3.RestfulTestCase class to gain access to the auth
# building helper functions, they cause backend databases and fixtures
# to be loaded unnecessarily. Separating out the helper functions from
# this base class would improve efficiency (Bug #1134836)
def setUp(self):
super(TestAuthInfo, self).setUp(load_sample_data=False)
def _build_password_auth(user_id=None, username=None,
user_domain_id=None, user_domain_name=None,
password=None):
password_data = {'user': {}}
if user_id:
password_data['user']['id'] = user_id
else:
password_data['user']['name'] = username
if user_domain_id or user_domain_name:
password_data['user']['domain'] = {}
if user_domain_id:
password_data['user']['domain']['id'] = user_domain_id
else:
password_data['user']['domain']['name'] = user_domain_name
password_data['user']['password'] = password
return password_data
def _build_token_auth(token):
return {'id': token}
def _build_authentication_request(token=None, user_id=None, username=None,
user_domain_id=None, user_domain_name=None,
password=None, **kwargs):
"""Build auth dictionary.
It will create an auth dictionary based on all the arguments
that it receives.
"""
auth_data = {}
auth_data['identity'] = {'methods': []}
if token:
auth_data['identity']['methods'].append('token')
auth_data['identity']['token'] = _build_token_auth(token)
if user_id or username:
auth_data['identity']['methods'].append('password')
auth_data['identity']['password'] = _build_password_auth(
user_id, username, user_domain_id, user_domain_name, password)
if kwargs:
auth_data['scope'] = _build_auth_scope(**kwargs)
return {'auth': auth_data}
class TestAuthInfo(test.TestCase):
def test_missing_auth_methods(self):
auth_data = {'identity': {}}
auth_data['identity']['token'] = {'id': uuid.uuid4().hex}
@ -129,19 +64,21 @@ class TestAuthInfo(test.TestCase):
auth_data)
def test_project_name_no_domain(self):
auth_data = _build_authentication_request(username='test',
password='test',
project_name='abc')['auth']
auth_data = self.build_authentication_request(
username='test',
password='test',
project_name='abc')['auth']
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo,
None,
auth_data)
def test_both_project_and_domain_in_scope(self):
auth_data = _build_authentication_request(user_id='test',
password='test',
project_name='test',
domain_name='test')['auth']
auth_data = self.build_authentication_request(
user_id='test',
password='test',
project_name='test',
domain_name='test')['auth']
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo,
None,
@ -151,7 +88,7 @@ class TestAuthInfo(test.TestCase):
class TestTokenAPIs(test_v3.RestfulTestCase):
def setUp(self):
super(TestTokenAPIs, self).setUp()
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain_id,
password=self.user['password'])
@ -165,7 +102,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
def test_v3_pki_token_id(self):
self.opt_in_group('signing', token_format='PKI')
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
resp = self.post('/auth/tokens', body=auth_data)
@ -181,7 +118,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
# FIXME(gyee): PKI tokens are not interchangeable because token
# data is baked into the token itself.
self.opt_in_group('signing', token_format='UUID')
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
@ -208,7 +145,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
# FIXME(gyee): PKI tokens are not interchangeable because token
# data is baked into the token itself.
self.opt_in_group('signing', token_format='PKI')
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
@ -287,7 +224,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
def test_rescoping_token(self):
expires = self.token_data['token']['expires_at']
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
token=self.token,
project_id=self.project_id)
r = self.post('/auth/tokens', body=auth_data)
@ -316,14 +253,14 @@ class TestAuthJSON(test_v3.RestfulTestCase):
content_type = 'json'
def test_unscoped_token_with_user_id(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_unscoped_token_with_user_domain_id(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'])
@ -331,7 +268,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.assertValidUnscopedTokenResponse(r)
def test_unscoped_token_with_user_domain_name(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'])
@ -339,7 +276,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.assertValidUnscopedTokenResponse(r)
def test_project_id_scoped_token_with_user_id(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
@ -351,14 +288,14 @@ class TestAuthJSON(test_v3.RestfulTestCase):
project = self.new_project_ref(domain_id=self.domain_id)
self.identity_api.create_project(project_id, project)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=project['id'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_project_id_scoped_token_with_user_domain_id(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'],
@ -367,7 +304,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.assertValidProjectScopedTokenResponse(r)
def test_project_id_scoped_token_with_user_domain_name(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'],
@ -380,7 +317,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
@ -392,7 +329,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'],
@ -405,7 +342,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'],
@ -418,7 +355,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_name=self.domain['name'])
@ -430,7 +367,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'],
@ -443,7 +380,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'],
@ -467,7 +404,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.put(path=path)
# now get a domain-scoped token
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
@ -480,7 +417,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
# now get a domain-scoped token
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_name=self.domain['name'])
@ -488,14 +425,14 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.assertValidDomainScopedTokenResponse(r)
def test_domain_scope_failed(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_auth_with_id(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
r = self.post('/auth/tokens', body=auth_data)
@ -505,45 +442,45 @@ class TestAuthJSON(test_v3.RestfulTestCase):
headers = {'X-Subject-Token': r.getheader('X-Subject-Token')}
# test token auth
auth_data = _build_authentication_request(token=token)
auth_data = self.build_authentication_request(token=token)
r = self.post('/auth/tokens', body=auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_invalid_user_id(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=uuid.uuid4().hex,
password=self.user['password'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_invalid_user_name(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=uuid.uuid4().hex,
user_domain_id=self.domain['id'],
password=self.user['password'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_invalid_domain_id(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=uuid.uuid4().hex,
password=self.user['password'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_invalid_domain_name(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=uuid.uuid4().hex,
password=self.user['password'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_invalid_password(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=uuid.uuid4().hex)
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_remote_user(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])['auth']
api = auth.controllers.Auth()
@ -554,7 +491,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.assertEqual(auth_context['user_id'], self.user['id'])
def test_remote_user_no_domain(self):
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
username=self.user['name'],
password=self.user['password'])['auth']
api = auth.controllers.Auth()
@ -701,7 +638,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
'trust_id': trust['id']},
expected_status=404)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
@ -720,7 +657,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
r = self.post('/trusts', body={'trust': ref})
trust = self.assertValidTrustResponse(r)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
@ -751,7 +688,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
r = self.post('/trusts', body={'trust': ref})
trust = self.assertValidTrustResponse(r)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
@ -793,7 +730,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
'trust_id': trust['id']},
expected_status=404)
auth_data = _build_authentication_request(
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])

View File

@ -22,54 +22,87 @@ import uuid
import nose.exc
from keystone import config
from keystone import exception
from keystone.openstack.common import jsonutils
from keystone.policy.backends import rules
import test_v3
CONF = config.CONF
DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
class IdentityTestProtectedCase(test_v3.RestfulTestCase):
"""Test policy protection of a sample of v3 identity apis"""
def setUp(self):
super(IdentityTestProtectedCase, self).setUp()
"""Setup for Identity Protection Test Cases.
As well as the usual housekeeping, create a set of domains,
users, roles and projects for the subsequent tests:
- Three domains: A,B & C. C is disabled.
- DomainA has user1, DomainB has user2 and user3
- DomainA has group1 and group2, DomainB has group3
- User1 has a role on DomainA
Remember that there will also be a fourth domain in existence,
the default domain.
"""
# Ensure that test_v3.RestfulTestCase doesn't load its own
# sample data, which would make checking the results of our
# tests harder
super(IdentityTestProtectedCase, self).setUp(load_sample_data=False)
# Start by creating a couple of domains
self.domainA = self.new_domain_ref()
domainA_ref = self.identity_api.create_domain(self.domainA['id'],
self.domainA)
self.domainB = self.new_domain_ref()
domainB_ref = self.identity_api.create_domain(self.domainB['id'],
self.domainB)
self.domainC = self.new_domain_ref()
self.domainC['enabled'] = False
domainC_ref = self.identity_api.create_domain(self.domainC['id'],
self.domainC)
# Now create some users, one in domainA and two of them in domainB
self.user1 = self.new_user_ref(
domain_id=self.domainA['id'])
self.user1['password'] = uuid.uuid4().hex
user_ref = self.identity_api.create_user(self.user1['id'], self.user1)
user_ref = self.identity_api.create_user(self.user1['id'],
self.user1)
self.user2 = self.new_user_ref(
domain_id=self.domainB['id'])
self.user2['password'] = uuid.uuid4().hex
user_ref = self.identity_api.create_user(self.user2['id'], self.user2)
user_ref = self.identity_api.create_user(self.user2['id'],
self.user2)
self.user3 = self.new_user_ref(
domain_id=self.domainB['id'])
self.user3['password'] = uuid.uuid4().hex
user_ref = self.identity_api.create_user(self.user3['id'], self.user3)
user_ref = self.identity_api.create_user(self.user3['id'],
self.user3)
self.project = self.new_project_ref(
self.group1 = self.new_group_ref(
domain_id=self.domainA['id'])
project_ref = self.identity_api.create_project(self.project['id'],
self.project)
user_ref = self.identity_api.create_group(self.group1['id'],
self.group1)
self.group2 = self.new_group_ref(
domain_id=self.domainA['id'])
user_ref = self.identity_api.create_group(self.group2['id'],
self.group2)
self.group3 = self.new_group_ref(
domain_id=self.domainB['id'])
user_ref = self.identity_api.create_group(self.group3['id'],
self.group3)
self.role = self.new_role_ref()
self.identity_api.create_role(self.role['id'], self.role)
self.identity_api.add_role_to_user_and_project(self.user1['id'],
self.project['id'],
self.role['id'])
self.identity_api.create_grant(self.role['id'],
user_id=self.user1['id'],
domain_id=self.domainA['id'])
@ -82,15 +115,9 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
self.opt(policy_file=self.tmpfilename)
# A default auth request we can use - un-scoped user token
self.auth = {}
self.auth['identity'] = {'methods': []}
self.auth['identity']['methods'].append('password')
self.auth['identity']['password'] = {'user': {}}
self.auth['identity']['password']['user']['id'] = (
self.user1['id'])
self.auth['identity']['password']['user']['password'] = (
self.user1['password'])
self.auth = {'auth': self.auth}
self.auth = self.build_authentication_request(
user_id=self.user1['id'],
password=self.user1['password'])
def tearDown(self):
super(IdentityTestProtectedCase, self).tearDown()
@ -103,13 +130,20 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
result_list.append(x['id'])
return result_list
def test_list_users_unprotected(self):
"""GET /users (unprotected)"""
# Make sure we get all the users back if no protection
# or filtering
def _set_policy(self, new_policy):
with open(self.tmpfilename, "w") as policyfile:
policyfile.write("""{"identity:list_users": []}""")
policyfile.write(jsonutils.dumps(new_policy))
def test_list_users_unprotected(self):
"""GET /users (unprotected)
Test Plan:
- Update policy so api is unprotected
- Use an un-scoped token to make sure we can get back all
the users independent of domain
"""
self._set_policy({"identity:list_users": []})
r = self.get('/users', auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.body.get('users'))
self.assertIn(self.user1['id'], id_list)
@ -117,11 +151,15 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
self.assertIn(self.user3['id'], id_list)
def test_list_users_filtered_by_domain(self):
"""GET /users?domain_id=mydomain """
"""GET /users?domain_id=mydomain (filtered)
# Using no protection, make sure filtering works
with open(self.tmpfilename, "w") as policyfile:
policyfile.write("""{"identity:list_users": []}""")
Test Plan:
- Update policy so api is unprotected
- Use an un-scoped token to make sure we can filter the
users by domainB, getting back the 2 users in that domain
"""
self._set_policy({"identity:list_users": []})
url_by_name = '/users?domain_id=%s' % self.domainB['id']
r = self.get(url_by_name, auth=self.auth)
# We should get back two users, those in DomainB
@ -129,28 +167,161 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
self.assertIn(self.user2['id'], id_list)
self.assertIn(self.user3['id'], id_list)
def test_list_users_protected_by_domain(self):
"""GET /users?domain_id=mydomain (protected)"""
# Update policy to protect by domain, and then use a domain
# scoped token
new_policy = """{"identity:list_users": ["domain_id:%(domain_id)s"]}"""
with open(self.tmpfilename, "w") as policyfile:
policyfile.write(new_policy)
self.auth['auth']['scope'] = {'domain': {'id': self.domainA['id']}}
url_by_name = '/users?domain_id=%s' % self.user1['domain_id']
r = self.get(url_by_name, auth=self.auth)
# We should only get back one user, the one in DomainA
id_list = self._get_id_list_from_ref_list(r.body.get('users'))
self.assertIn(self.user1['id'], id_list)
def test_get_user_protected_match_id(self):
"""GET /users/{id} (match payload)"""
# Tests the flattening of the payload
policy = {"identity:get_user": [["user_id:%(user_id)s"]]}
with open(self.tmpfilename, "w") as policyfile:
policyfile.write(json.dumps(policy))
"""GET /users/{id} (match payload)
Test Plan:
- Update policy to protect api by user_id
- List users with user_id of user1 as filter, to check that
this will correctly match user_id in the flattened
payload
"""
# TODO (henry-nash, ayoung): It would be good to expand this
# test for further test flattening, e.g. protect on, say, an
# attribute of an object being created
new_policy = {"identity:get_user": [["user_id:%(user_id)s"]]}
self._set_policy(new_policy)
url_by_name = '/users/%s' % self.user1['id']
r = self.get(url_by_name, auth=self.auth)
body = r.body
self.assertEquals(self.user1['id'], body['user']['id'])
def test_list_users_protected_by_domain(self):
"""GET /users?domain_id=mydomain (protected)
Test Plan:
- Update policy to protect api by domain_id
- List groups using a token scoped to domainA with a filter
specifying domainA - we should only get back the one user
that is in domainA.
- Try and read the users from domainB - this should fail since
we don't have a token scoped for domainB
"""
new_policy = {"identity:list_users": ["domain_id:%(domain_id)s"]}
self._set_policy(new_policy)
self.auth = self.build_authentication_request(
user_id=self.user1['id'],
password=self.user1['password'],
domain_id=self.domainA['id'])
url_by_name = '/users?domain_id=%s' % self.domainA['id']
r = self.get(url_by_name, auth=self.auth)
# We should only get back one user, the one in DomainA
id_list = self._get_id_list_from_ref_list(r.body.get('users'))
self.assertEqual(len(id_list), 1)
self.assertIn(self.user1['id'], id_list)
# Now try for domainB, which should fail
url_by_name = '/users?domain_id=%s' % self.domainB['id']
r = self.get(url_by_name, auth=self.auth,
expected_status=exception.ForbiddenAction.code)
def test_list_groups_protected_by_domain(self):
"""GET /groups?domain_id=mydomain (protected)
Test Plan:
- Update policy to protect api by domain_id
- List groups using a token scoped to domainA and make sure
we only get back the two groups that are in domainA
- Try and read the groups from domainB - this should fail since
we don't have a token scoped for domainB
"""
new_policy = {"identity:list_groups": ["domain_id:%(domain_id)s"]}
self._set_policy(new_policy)
self.auth = self.build_authentication_request(
user_id=self.user1['id'],
password=self.user1['password'],
domain_id=self.domainA['id'])
url_by_name = '/groups?domain_id=%s' % self.domainA['id']
r = self.get(url_by_name, auth=self.auth)
# We should only get back two groups, the ones in DomainA
id_list = self._get_id_list_from_ref_list(r.body.get('groups'))
self.assertEqual(len(id_list), 2)
self.assertIn(self.group1['id'], id_list)
self.assertIn(self.group2['id'], id_list)
# Now try for domainB, which should fail
url_by_name = '/groups?domain_id=%s' % self.domainB['id']
r = self.get(url_by_name, auth=self.auth,
expected_status=exception.ForbiddenAction.code)
def test_list_groups_protected_by_domain_and_filtered(self):
"""GET /groups?domain_id=mydomain&name=myname (protected)
Test Plan:
- Update policy to protect api by domain_id
- List groups using a token scoped to domainA with a filter
specifying both domainA and the name of group.
- We should only get back the group in domainA that matches
the name
"""
new_policy = {"identity:list_groups": ["domain_id:%(domain_id)s"]}
self._set_policy(new_policy)
self.auth = self.build_authentication_request(
user_id=self.user1['id'],
password=self.user1['password'],
domain_id=self.domainA['id'])
url_by_name = '/groups?domain_id=%s&name=%s' % (
self.domainA['id'], self.group2['name'])
r = self.get(url_by_name, auth=self.auth)
# We should only get back one user, the one in DomainA that matches
# the name supplied
id_list = self._get_id_list_from_ref_list(r.body.get('groups'))
self.assertEqual(len(id_list), 1)
self.assertIn(self.group2['id'], id_list)
def test_list_filtered_domains(self):
"""GET /domains?enabled=0
Test Plan:
- Update policy for no protection on api
- Filter by the 'enabled' boolean to get disabled domains, which
should return just domainC
- Try the filter using different ways of specifying 'true'
to test that our handling of booleans in filter matching is
correct
"""
new_policy = {"identity:list_domains": []}
self._set_policy(new_policy)
r = self.get('/domains?enabled=0', auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.body.get('domains'))
self.assertEqual(len(id_list), 1)
self.assertIn(self.domainC['id'], id_list)
# Now try a few ways of specifying 'true' when we should get back
# the other two domains, plus the default domain
r = self.get('/domains?enabled=1', auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.body.get('domains'))
self.assertEqual(len(id_list), 3)
self.assertIn(self.domainA['id'], id_list)
self.assertIn(self.domainB['id'], id_list)
self.assertIn(DEFAULT_DOMAIN_ID, id_list)
r = self.get('/domains?enabled', auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.body.get('domains'))
self.assertEqual(len(id_list), 3)
self.assertIn(self.domainA['id'], id_list)
self.assertIn(self.domainB['id'], id_list)
self.assertIn(DEFAULT_DOMAIN_ID, id_list)
def test_multiple_filters(self):
"""GET /domains?enabled&name=myname
Test Plan:
- Update policy for no protection on api
- Filter by the 'enabled' boolean and name - this should
return a single domain
"""
new_policy = {"identity:list_domains": []}
self._set_policy(new_policy)
my_url = '/domains?enableds&name=%s' % self.domainA['name']
r = self.get(my_url, auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.body.get('domains'))
self.assertEqual(len(id_list), 1)
self.assertIn(self.domainA['id'], id_list)