diff --git a/keystone/common/controller.py b/keystone/common/controller.py index f4f3c79d70..09da9d7bcf 100644 --- a/keystone/common/controller.py +++ b/keystone/common/controller.py @@ -243,9 +243,28 @@ class V3Controller(V2Controller): @classmethod def filter_by_attribute(cls, context, refs, attr): """Filters a list of references by query string value.""" + + def _attr_match(ref_attr, val_attr): + """Matches attributes allowing for booleans as strings. + + We test explicitly for a value that defines it as 'False', + which also means that the existence of the attribute with + no value implies 'True' + + """ + if type(ref_attr) is bool: + if (isinstance(val_attr, basestring) and + val_attr == '0'): + val = False + else: + val = True + return (ref_attr == val) + else: + return (ref_attr == val_attr) + if attr in context['query_string']: value = context['query_string'][attr] - return [r for r in refs if r[attr] == value] + return [r for r in refs if _attr_match(r[attr], value)] return refs def _require_matching_id(self, value, ref): diff --git a/tests/test_v3.py b/tests/test_v3.py index 9b4f7d34f6..26d46be7a4 100644 --- a/tests/test_v3.py +++ b/tests/test_v3.py @@ -20,9 +20,14 @@ TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' class RestfulTestCase(test_content_types.RestfulTestCase): - def setUp(self): - rules.reset() + def setUp(self, load_sample_data=True): + """Setup for v3 Restful Test Cases. + If a child class wants to create their own sample data + and provide their own auth data to obtain tokens, then + load_sample_data should be set to false. + + """ self.config([ test.etcdir('keystone.conf.sample'), test.testsdir('test_overrides.conf'), @@ -32,32 +37,33 @@ class RestfulTestCase(test_content_types.RestfulTestCase): sql_util.setup_test_database() self.load_backends() - self.domain_id = uuid.uuid4().hex - self.domain = self.new_domain_ref() - self.domain['id'] = self.domain_id - self.identity_api.create_domain(self.domain_id, self.domain) + if load_sample_data: + self.domain_id = uuid.uuid4().hex + self.domain = self.new_domain_ref() + self.domain['id'] = self.domain_id + self.identity_api.create_domain(self.domain_id, self.domain) - self.project_id = uuid.uuid4().hex - self.project = self.new_project_ref( - domain_id=self.domain_id) - self.project['id'] = self.project_id - self.identity_api.create_project(self.project_id, self.project) + self.project_id = uuid.uuid4().hex + self.project = self.new_project_ref( + domain_id=self.domain_id) + self.project['id'] = self.project_id + self.identity_api.create_project(self.project_id, self.project) - self.user_id = uuid.uuid4().hex - self.user = self.new_user_ref( - domain_id=self.domain_id, - project_id=self.project_id) - self.user['id'] = self.user_id - self.identity_api.create_user(self.user_id, self.user) + self.user_id = uuid.uuid4().hex + self.user = self.new_user_ref( + domain_id=self.domain_id, + project_id=self.project_id) + self.user['id'] = self.user_id + self.identity_api.create_user(self.user_id, self.user) - # create & grant policy.json's default role for admin_required - self.role_id = uuid.uuid4().hex - self.role = self.new_role_ref() - self.role['id'] = self.role_id - self.role['name'] = 'admin' - self.identity_api.create_role(self.role_id, self.role) - self.identity_api.add_role_to_user_and_project( - self.user_id, self.project_id, self.role_id) + # create & grant policy.json's default role for admin_required + self.role_id = uuid.uuid4().hex + self.role = self.new_role_ref() + self.role['id'] = self.role_id + self.role['name'] = 'admin' + self.identity_api.create_role(self.role_id, self.role) + self.identity_api.add_role_to_user_and_project( + self.user_id, self.project_id, self.role_id) self.public_server = self.serveapp('keystone', name='main') self.admin_server = self.serveapp('keystone', name='admin') @@ -752,6 +758,76 @@ class RestfulTestCase(test_content_types.RestfulTestCase): return entity + def build_auth_scope(self, project_id=None, project_name=None, + project_domain_id=None, project_domain_name=None, + domain_id=None, domain_name=None, trust_id=None): + scope_data = {} + if project_id or project_name: + scope_data['project'] = {} + if project_id: + scope_data['project']['id'] = project_id + else: + scope_data['project']['name'] = project_name + if project_domain_id or project_domain_name: + project_domain_json = {} + if project_domain_id: + project_domain_json['id'] = project_domain_id + else: + project_domain_json['name'] = project_domain_name + scope_data['project']['domain'] = project_domain_json + if domain_id or domain_name: + scope_data['domain'] = {} + if domain_id: + scope_data['domain']['id'] = domain_id + else: + scope_data['domain']['name'] = domain_name + if trust_id: + scope_data['trust'] = {} + scope_data['trust']['id'] = trust_id + return scope_data + + def build_password_auth(self, user_id=None, username=None, + user_domain_id=None, user_domain_name=None, + password=None): + password_data = {'user': {}} + if user_id: + password_data['user']['id'] = user_id + else: + password_data['user']['name'] = username + if user_domain_id or user_domain_name: + password_data['user']['domain'] = {} + if user_domain_id: + password_data['user']['domain']['id'] = user_domain_id + else: + password_data['user']['domain']['name'] = user_domain_name + password_data['user']['password'] = password + return password_data + + def build_token_auth(self, token): + return {'id': token} + + def build_authentication_request(self, token=None, user_id=None, + username=None, user_domain_id=None, + user_domain_name=None, password=None, + **kwargs): + """Build auth dictionary. + + It will create an auth dictionary based on all the arguments + that it receives. + """ + auth_data = {} + auth_data['identity'] = {'methods': []} + if token: + auth_data['identity']['methods'].append('token') + auth_data['identity']['token'] = self.build_token_auth(token) + if user_id or username: + auth_data['identity']['methods'].append('password') + auth_data['identity']['password'] = self.build_password_auth( + user_id, username, user_domain_id, user_domain_name, password) + if kwargs: + auth_data['scope'] = self.build_auth_scope(**kwargs) + return {'auth': auth_data} + class VersionTestCase(RestfulTestCase): def test_get_version(self): diff --git a/tests/test_v3_auth.py b/tests/test_v3_auth.py index 4d6e076107..df62a3e71a 100644 --- a/tests/test_v3_auth.py +++ b/tests/test_v3_auth.py @@ -29,80 +29,15 @@ import test_v3 CONF = config.CONF -def _build_auth_scope(project_id=None, project_name=None, - project_domain_id=None, project_domain_name=None, - domain_id=None, domain_name=None, trust_id=None): - scope_data = {} - if project_id or project_name: - scope_data['project'] = {} - if project_id: - scope_data['project']['id'] = project_id - else: - scope_data['project']['name'] = project_name - if project_domain_id or project_domain_name: - project_domain_json = {} - if project_domain_id: - project_domain_json['id'] = project_domain_id - else: - project_domain_json['name'] = project_domain_name - scope_data['project']['domain'] = project_domain_json - if domain_id or domain_name: - scope_data['domain'] = {} - if domain_id: - scope_data['domain']['id'] = domain_id - else: - scope_data['domain']['name'] = domain_name - if trust_id: - scope_data['trust'] = {} - scope_data['trust']['id'] = trust_id - return scope_data +class TestAuthInfo(test_v3.RestfulTestCase): + # TDOD(henry-nash) These tests are somewhat inefficient, since by + # using the test_v3.RestfulTestCase class to gain access to the auth + # building helper functions, they cause backend databases and fixtures + # to be loaded unnecessarily. Separating out the helper functions from + # this base class would improve efficiency (Bug #1134836) + def setUp(self): + super(TestAuthInfo, self).setUp(load_sample_data=False) - -def _build_password_auth(user_id=None, username=None, - user_domain_id=None, user_domain_name=None, - password=None): - password_data = {'user': {}} - if user_id: - password_data['user']['id'] = user_id - else: - password_data['user']['name'] = username - if user_domain_id or user_domain_name: - password_data['user']['domain'] = {} - if user_domain_id: - password_data['user']['domain']['id'] = user_domain_id - else: - password_data['user']['domain']['name'] = user_domain_name - password_data['user']['password'] = password - return password_data - - -def _build_token_auth(token): - return {'id': token} - - -def _build_authentication_request(token=None, user_id=None, username=None, - user_domain_id=None, user_domain_name=None, - password=None, **kwargs): - """Build auth dictionary. - - It will create an auth dictionary based on all the arguments - that it receives. - """ - auth_data = {} - auth_data['identity'] = {'methods': []} - if token: - auth_data['identity']['methods'].append('token') - auth_data['identity']['token'] = _build_token_auth(token) - if user_id or username: - auth_data['identity']['methods'].append('password') - auth_data['identity']['password'] = _build_password_auth( - user_id, username, user_domain_id, user_domain_name, password) - if kwargs: - auth_data['scope'] = _build_auth_scope(**kwargs) - return {'auth': auth_data} - - -class TestAuthInfo(test.TestCase): def test_missing_auth_methods(self): auth_data = {'identity': {}} auth_data['identity']['token'] = {'id': uuid.uuid4().hex} @@ -129,19 +64,21 @@ class TestAuthInfo(test.TestCase): auth_data) def test_project_name_no_domain(self): - auth_data = _build_authentication_request(username='test', - password='test', - project_name='abc')['auth'] + auth_data = self.build_authentication_request( + username='test', + password='test', + project_name='abc')['auth'] self.assertRaises(exception.ValidationError, auth.controllers.AuthInfo, None, auth_data) def test_both_project_and_domain_in_scope(self): - auth_data = _build_authentication_request(user_id='test', - password='test', - project_name='test', - domain_name='test')['auth'] + auth_data = self.build_authentication_request( + user_id='test', + password='test', + project_name='test', + domain_name='test')['auth'] self.assertRaises(exception.ValidationError, auth.controllers.AuthInfo, None, @@ -151,7 +88,7 @@ class TestAuthInfo(test.TestCase): class TestTokenAPIs(test_v3.RestfulTestCase): def setUp(self): super(TestTokenAPIs, self).setUp() - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( username=self.user['name'], user_domain_id=self.domain_id, password=self.user['password']) @@ -165,7 +102,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase): def test_v3_pki_token_id(self): self.opt_in_group('signing', token_format='PKI') - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) resp = self.post('/auth/tokens', body=auth_data) @@ -181,7 +118,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase): # FIXME(gyee): PKI tokens are not interchangeable because token # data is baked into the token itself. self.opt_in_group('signing', token_format='UUID') - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], project_id=self.project['id']) @@ -208,7 +145,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase): # FIXME(gyee): PKI tokens are not interchangeable because token # data is baked into the token itself. self.opt_in_group('signing', token_format='PKI') - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], project_id=self.project['id']) @@ -287,7 +224,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase): def test_rescoping_token(self): expires = self.token_data['token']['expires_at'] - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( token=self.token, project_id=self.project_id) r = self.post('/auth/tokens', body=auth_data) @@ -316,14 +253,14 @@ class TestAuthJSON(test_v3.RestfulTestCase): content_type = 'json' def test_unscoped_token_with_user_id(self): - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) r = self.post('/auth/tokens', body=auth_data) self.assertValidUnscopedTokenResponse(r) def test_unscoped_token_with_user_domain_id(self): - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( username=self.user['name'], user_domain_id=self.domain['id'], password=self.user['password']) @@ -331,7 +268,7 @@ class TestAuthJSON(test_v3.RestfulTestCase): self.assertValidUnscopedTokenResponse(r) def test_unscoped_token_with_user_domain_name(self): - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( username=self.user['name'], user_domain_name=self.domain['name'], password=self.user['password']) @@ -339,7 +276,7 @@ class TestAuthJSON(test_v3.RestfulTestCase): self.assertValidUnscopedTokenResponse(r) def test_project_id_scoped_token_with_user_id(self): - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], project_id=self.project['id']) @@ -351,14 +288,14 @@ class TestAuthJSON(test_v3.RestfulTestCase): project = self.new_project_ref(domain_id=self.domain_id) self.identity_api.create_project(project_id, project) - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], project_id=project['id']) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_project_id_scoped_token_with_user_domain_id(self): - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( username=self.user['name'], user_domain_id=self.domain['id'], password=self.user['password'], @@ -367,7 +304,7 @@ class TestAuthJSON(test_v3.RestfulTestCase): self.assertValidProjectScopedTokenResponse(r) def test_project_id_scoped_token_with_user_domain_name(self): - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( username=self.user['name'], user_domain_name=self.domain['name'], password=self.user['password'], @@ -380,7 +317,7 @@ class TestAuthJSON(test_v3.RestfulTestCase): self.domain['id'], self.user['id'], self.role['id']) self.put(path=path) - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], domain_id=self.domain['id']) @@ -392,7 +329,7 @@ class TestAuthJSON(test_v3.RestfulTestCase): self.domain['id'], self.user['id'], self.role['id']) self.put(path=path) - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( username=self.user['name'], user_domain_id=self.domain['id'], password=self.user['password'], @@ -405,7 +342,7 @@ class TestAuthJSON(test_v3.RestfulTestCase): self.domain['id'], self.user['id'], self.role['id']) self.put(path=path) - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( username=self.user['name'], user_domain_name=self.domain['name'], password=self.user['password'], @@ -418,7 +355,7 @@ class TestAuthJSON(test_v3.RestfulTestCase): self.domain['id'], self.user['id'], self.role['id']) self.put(path=path) - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], domain_name=self.domain['name']) @@ -430,7 +367,7 @@ class TestAuthJSON(test_v3.RestfulTestCase): self.domain['id'], self.user['id'], self.role['id']) self.put(path=path) - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( username=self.user['name'], user_domain_id=self.domain['id'], password=self.user['password'], @@ -443,7 +380,7 @@ class TestAuthJSON(test_v3.RestfulTestCase): self.domain['id'], self.user['id'], self.role['id']) self.put(path=path) - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( username=self.user['name'], user_domain_name=self.domain['name'], password=self.user['password'], @@ -467,7 +404,7 @@ class TestAuthJSON(test_v3.RestfulTestCase): self.put(path=path) # now get a domain-scoped token - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], domain_id=self.domain['id']) @@ -480,7 +417,7 @@ class TestAuthJSON(test_v3.RestfulTestCase): self.domain['id'], self.user['id'], self.role['id']) self.put(path=path) # now get a domain-scoped token - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], domain_name=self.domain['name']) @@ -488,14 +425,14 @@ class TestAuthJSON(test_v3.RestfulTestCase): self.assertValidDomainScopedTokenResponse(r) def test_domain_scope_failed(self): - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], domain_id=self.domain['id']) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_auth_with_id(self): - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) r = self.post('/auth/tokens', body=auth_data) @@ -505,45 +442,45 @@ class TestAuthJSON(test_v3.RestfulTestCase): headers = {'X-Subject-Token': r.getheader('X-Subject-Token')} # test token auth - auth_data = _build_authentication_request(token=token) + auth_data = self.build_authentication_request(token=token) r = self.post('/auth/tokens', body=auth_data) self.assertValidUnscopedTokenResponse(r) def test_invalid_user_id(self): - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=uuid.uuid4().hex, password=self.user['password']) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_invalid_user_name(self): - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( username=uuid.uuid4().hex, user_domain_id=self.domain['id'], password=self.user['password']) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_invalid_domain_id(self): - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( username=self.user['name'], user_domain_id=uuid.uuid4().hex, password=self.user['password']) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_invalid_domain_name(self): - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( username=self.user['name'], user_domain_name=uuid.uuid4().hex, password=self.user['password']) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_invalid_password(self): - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.user['id'], password=uuid.uuid4().hex) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_remote_user(self): - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'])['auth'] api = auth.controllers.Auth() @@ -554,7 +491,7 @@ class TestAuthJSON(test_v3.RestfulTestCase): self.assertEqual(auth_context['user_id'], self.user['id']) def test_remote_user_no_domain(self): - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( username=self.user['name'], password=self.user['password'])['auth'] api = auth.controllers.Auth() @@ -701,7 +638,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): 'trust_id': trust['id']}, expected_status=404) - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) @@ -720,7 +657,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): r = self.post('/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(r) - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) @@ -751,7 +688,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): r = self.post('/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(r) - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) @@ -793,7 +730,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): 'trust_id': trust['id']}, expected_status=404) - auth_data = _build_authentication_request( + auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) diff --git a/tests/test_v3_protection.py b/tests/test_v3_protection.py index 5b46169311..c68a203733 100644 --- a/tests/test_v3_protection.py +++ b/tests/test_v3_protection.py @@ -22,54 +22,87 @@ import uuid import nose.exc from keystone import config +from keystone import exception +from keystone.openstack.common import jsonutils from keystone.policy.backends import rules import test_v3 CONF = config.CONF +DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id class IdentityTestProtectedCase(test_v3.RestfulTestCase): """Test policy protection of a sample of v3 identity apis""" def setUp(self): - super(IdentityTestProtectedCase, self).setUp() + """Setup for Identity Protection Test Cases. + + As well as the usual housekeeping, create a set of domains, + users, roles and projects for the subsequent tests: + + - Three domains: A,B & C. C is disabled. + - DomainA has user1, DomainB has user2 and user3 + - DomainA has group1 and group2, DomainB has group3 + - User1 has a role on DomainA + + Remember that there will also be a fourth domain in existence, + the default domain. + + """ + # Ensure that test_v3.RestfulTestCase doesn't load its own + # sample data, which would make checking the results of our + # tests harder + super(IdentityTestProtectedCase, self).setUp(load_sample_data=False) # Start by creating a couple of domains self.domainA = self.new_domain_ref() domainA_ref = self.identity_api.create_domain(self.domainA['id'], self.domainA) - self.domainB = self.new_domain_ref() domainB_ref = self.identity_api.create_domain(self.domainB['id'], self.domainB) + self.domainC = self.new_domain_ref() + self.domainC['enabled'] = False + domainC_ref = self.identity_api.create_domain(self.domainC['id'], + self.domainC) # Now create some users, one in domainA and two of them in domainB self.user1 = self.new_user_ref( domain_id=self.domainA['id']) self.user1['password'] = uuid.uuid4().hex - user_ref = self.identity_api.create_user(self.user1['id'], self.user1) + user_ref = self.identity_api.create_user(self.user1['id'], + self.user1) self.user2 = self.new_user_ref( domain_id=self.domainB['id']) self.user2['password'] = uuid.uuid4().hex - user_ref = self.identity_api.create_user(self.user2['id'], self.user2) + user_ref = self.identity_api.create_user(self.user2['id'], + self.user2) self.user3 = self.new_user_ref( domain_id=self.domainB['id']) self.user3['password'] = uuid.uuid4().hex - user_ref = self.identity_api.create_user(self.user3['id'], self.user3) + user_ref = self.identity_api.create_user(self.user3['id'], + self.user3) - self.project = self.new_project_ref( + self.group1 = self.new_group_ref( domain_id=self.domainA['id']) - project_ref = self.identity_api.create_project(self.project['id'], - self.project) + user_ref = self.identity_api.create_group(self.group1['id'], + self.group1) + + self.group2 = self.new_group_ref( + domain_id=self.domainA['id']) + user_ref = self.identity_api.create_group(self.group2['id'], + self.group2) + + self.group3 = self.new_group_ref( + domain_id=self.domainB['id']) + user_ref = self.identity_api.create_group(self.group3['id'], + self.group3) self.role = self.new_role_ref() self.identity_api.create_role(self.role['id'], self.role) - self.identity_api.add_role_to_user_and_project(self.user1['id'], - self.project['id'], - self.role['id']) self.identity_api.create_grant(self.role['id'], user_id=self.user1['id'], domain_id=self.domainA['id']) @@ -82,15 +115,9 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase): self.opt(policy_file=self.tmpfilename) # A default auth request we can use - un-scoped user token - self.auth = {} - self.auth['identity'] = {'methods': []} - self.auth['identity']['methods'].append('password') - self.auth['identity']['password'] = {'user': {}} - self.auth['identity']['password']['user']['id'] = ( - self.user1['id']) - self.auth['identity']['password']['user']['password'] = ( - self.user1['password']) - self.auth = {'auth': self.auth} + self.auth = self.build_authentication_request( + user_id=self.user1['id'], + password=self.user1['password']) def tearDown(self): super(IdentityTestProtectedCase, self).tearDown() @@ -103,13 +130,20 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase): result_list.append(x['id']) return result_list - def test_list_users_unprotected(self): - """GET /users (unprotected)""" - - # Make sure we get all the users back if no protection - # or filtering + def _set_policy(self, new_policy): with open(self.tmpfilename, "w") as policyfile: - policyfile.write("""{"identity:list_users": []}""") + policyfile.write(jsonutils.dumps(new_policy)) + + def test_list_users_unprotected(self): + """GET /users (unprotected) + + Test Plan: + - Update policy so api is unprotected + - Use an un-scoped token to make sure we can get back all + the users independent of domain + + """ + self._set_policy({"identity:list_users": []}) r = self.get('/users', auth=self.auth) id_list = self._get_id_list_from_ref_list(r.body.get('users')) self.assertIn(self.user1['id'], id_list) @@ -117,11 +151,15 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase): self.assertIn(self.user3['id'], id_list) def test_list_users_filtered_by_domain(self): - """GET /users?domain_id=mydomain """ + """GET /users?domain_id=mydomain (filtered) - # Using no protection, make sure filtering works - with open(self.tmpfilename, "w") as policyfile: - policyfile.write("""{"identity:list_users": []}""") + Test Plan: + - Update policy so api is unprotected + - Use an un-scoped token to make sure we can filter the + users by domainB, getting back the 2 users in that domain + + """ + self._set_policy({"identity:list_users": []}) url_by_name = '/users?domain_id=%s' % self.domainB['id'] r = self.get(url_by_name, auth=self.auth) # We should get back two users, those in DomainB @@ -129,28 +167,161 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase): self.assertIn(self.user2['id'], id_list) self.assertIn(self.user3['id'], id_list) - def test_list_users_protected_by_domain(self): - """GET /users?domain_id=mydomain (protected)""" - - # Update policy to protect by domain, and then use a domain - # scoped token - new_policy = """{"identity:list_users": ["domain_id:%(domain_id)s"]}""" - with open(self.tmpfilename, "w") as policyfile: - policyfile.write(new_policy) - self.auth['auth']['scope'] = {'domain': {'id': self.domainA['id']}} - url_by_name = '/users?domain_id=%s' % self.user1['domain_id'] - r = self.get(url_by_name, auth=self.auth) - # We should only get back one user, the one in DomainA - id_list = self._get_id_list_from_ref_list(r.body.get('users')) - self.assertIn(self.user1['id'], id_list) - def test_get_user_protected_match_id(self): - """GET /users/{id} (match payload)""" - # Tests the flattening of the payload - policy = {"identity:get_user": [["user_id:%(user_id)s"]]} - with open(self.tmpfilename, "w") as policyfile: - policyfile.write(json.dumps(policy)) + """GET /users/{id} (match payload) + + Test Plan: + - Update policy to protect api by user_id + - List users with user_id of user1 as filter, to check that + this will correctly match user_id in the flattened + payload + + """ + # TODO (henry-nash, ayoung): It would be good to expand this + # test for further test flattening, e.g. protect on, say, an + # attribute of an object being created + new_policy = {"identity:get_user": [["user_id:%(user_id)s"]]} + self._set_policy(new_policy) url_by_name = '/users/%s' % self.user1['id'] r = self.get(url_by_name, auth=self.auth) body = r.body self.assertEquals(self.user1['id'], body['user']['id']) + + def test_list_users_protected_by_domain(self): + """GET /users?domain_id=mydomain (protected) + + Test Plan: + - Update policy to protect api by domain_id + - List groups using a token scoped to domainA with a filter + specifying domainA - we should only get back the one user + that is in domainA. + - Try and read the users from domainB - this should fail since + we don't have a token scoped for domainB + + """ + new_policy = {"identity:list_users": ["domain_id:%(domain_id)s"]} + self._set_policy(new_policy) + self.auth = self.build_authentication_request( + user_id=self.user1['id'], + password=self.user1['password'], + domain_id=self.domainA['id']) + url_by_name = '/users?domain_id=%s' % self.domainA['id'] + r = self.get(url_by_name, auth=self.auth) + # We should only get back one user, the one in DomainA + id_list = self._get_id_list_from_ref_list(r.body.get('users')) + self.assertEqual(len(id_list), 1) + self.assertIn(self.user1['id'], id_list) + + # Now try for domainB, which should fail + url_by_name = '/users?domain_id=%s' % self.domainB['id'] + r = self.get(url_by_name, auth=self.auth, + expected_status=exception.ForbiddenAction.code) + + def test_list_groups_protected_by_domain(self): + """GET /groups?domain_id=mydomain (protected) + + Test Plan: + - Update policy to protect api by domain_id + - List groups using a token scoped to domainA and make sure + we only get back the two groups that are in domainA + - Try and read the groups from domainB - this should fail since + we don't have a token scoped for domainB + + """ + new_policy = {"identity:list_groups": ["domain_id:%(domain_id)s"]} + self._set_policy(new_policy) + self.auth = self.build_authentication_request( + user_id=self.user1['id'], + password=self.user1['password'], + domain_id=self.domainA['id']) + url_by_name = '/groups?domain_id=%s' % self.domainA['id'] + r = self.get(url_by_name, auth=self.auth) + # We should only get back two groups, the ones in DomainA + id_list = self._get_id_list_from_ref_list(r.body.get('groups')) + self.assertEqual(len(id_list), 2) + self.assertIn(self.group1['id'], id_list) + self.assertIn(self.group2['id'], id_list) + + # Now try for domainB, which should fail + url_by_name = '/groups?domain_id=%s' % self.domainB['id'] + r = self.get(url_by_name, auth=self.auth, + expected_status=exception.ForbiddenAction.code) + + def test_list_groups_protected_by_domain_and_filtered(self): + """GET /groups?domain_id=mydomain&name=myname (protected) + + Test Plan: + - Update policy to protect api by domain_id + - List groups using a token scoped to domainA with a filter + specifying both domainA and the name of group. + - We should only get back the group in domainA that matches + the name + + """ + new_policy = {"identity:list_groups": ["domain_id:%(domain_id)s"]} + self._set_policy(new_policy) + self.auth = self.build_authentication_request( + user_id=self.user1['id'], + password=self.user1['password'], + domain_id=self.domainA['id']) + url_by_name = '/groups?domain_id=%s&name=%s' % ( + self.domainA['id'], self.group2['name']) + r = self.get(url_by_name, auth=self.auth) + # We should only get back one user, the one in DomainA that matches + # the name supplied + id_list = self._get_id_list_from_ref_list(r.body.get('groups')) + self.assertEqual(len(id_list), 1) + self.assertIn(self.group2['id'], id_list) + + def test_list_filtered_domains(self): + """GET /domains?enabled=0 + + Test Plan: + - Update policy for no protection on api + - Filter by the 'enabled' boolean to get disabled domains, which + should return just domainC + - Try the filter using different ways of specifying 'true' + to test that our handling of booleans in filter matching is + correct + + """ + new_policy = {"identity:list_domains": []} + self._set_policy(new_policy) + r = self.get('/domains?enabled=0', auth=self.auth) + id_list = self._get_id_list_from_ref_list(r.body.get('domains')) + self.assertEqual(len(id_list), 1) + self.assertIn(self.domainC['id'], id_list) + + # Now try a few ways of specifying 'true' when we should get back + # the other two domains, plus the default domain + r = self.get('/domains?enabled=1', auth=self.auth) + id_list = self._get_id_list_from_ref_list(r.body.get('domains')) + self.assertEqual(len(id_list), 3) + self.assertIn(self.domainA['id'], id_list) + self.assertIn(self.domainB['id'], id_list) + self.assertIn(DEFAULT_DOMAIN_ID, id_list) + + r = self.get('/domains?enabled', auth=self.auth) + id_list = self._get_id_list_from_ref_list(r.body.get('domains')) + self.assertEqual(len(id_list), 3) + self.assertIn(self.domainA['id'], id_list) + self.assertIn(self.domainB['id'], id_list) + self.assertIn(DEFAULT_DOMAIN_ID, id_list) + + def test_multiple_filters(self): + """GET /domains?enabled&name=myname + + Test Plan: + - Update policy for no protection on api + - Filter by the 'enabled' boolean and name - this should + return a single domain + + """ + new_policy = {"identity:list_domains": []} + self._set_policy(new_policy) + + my_url = '/domains?enableds&name=%s' % self.domainA['name'] + r = self.get(my_url, auth=self.auth) + id_list = self._get_id_list_from_ref_list(r.body.get('domains')) + self.assertEqual(len(id_list), 1) + self.assertIn(self.domainA['id'], id_list)