# vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2012 OpenStack LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import datetime import default_fixtures import uuid import nose.exc from keystone.catalog import core from keystone import config from keystone import exception from keystone.openstack.common import timeutils from keystone import config from keystone import test CONF = config.CONF DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id class IdentityTests(object): def test_project_add_and_remove_user_role(self): user_refs = self.identity_api.get_project_users(self.tenant_bar['id']) self.assertNotIn(self.user_two['id'], [x['id'] for x in user_refs]) self.identity_api.add_role_to_user_and_project( tenant_id=self.tenant_bar['id'], user_id=self.user_two['id'], role_id=self.role_other['id']) user_refs = self.identity_api.get_project_users(self.tenant_bar['id']) self.assertIn(self.user_two['id'], [x['id'] for x in user_refs]) self.identity_api.remove_role_from_user_and_project( tenant_id=self.tenant_bar['id'], user_id=self.user_two['id'], role_id=self.role_other['id']) user_refs = self.identity_api.get_project_users(self.tenant_bar['id']) self.assertNotIn(self.user_two['id'], [x['id'] for x in user_refs]) def test_authenticate_bad_user(self): self.assertRaises(AssertionError, self.identity_api.authenticate, user_id=uuid.uuid4().hex, tenant_id=self.tenant_bar['id'], password=self.user_foo['password']) def test_authenticate_bad_password(self): self.assertRaises(AssertionError, self.identity_api.authenticate, user_id=self.user_foo['id'], tenant_id=self.tenant_bar['id'], password=uuid.uuid4().hex) def test_authenticate_bad_project(self): self.assertRaises(AssertionError, self.identity_api.authenticate, user_id=self.user_foo['id'], tenant_id=uuid.uuid4().hex, password=self.user_foo['password']) def test_authenticate_no_project(self): user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate( user_id=self.user_foo['id'], password=self.user_foo['password']) # NOTE(termie): the password field is left in user_foo to make # it easier to authenticate in tests, but should # not be returned by the api self.user_foo.pop('password') self.assertDictEqual(user_ref, self.user_foo) self.assert_(tenant_ref is None) self.assert_(not metadata_ref) def test_authenticate(self): user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate( user_id=self.user_sna['id'], tenant_id=self.tenant_bar['id'], password=self.user_sna['password']) # NOTE(termie): the password field is left in user_foo to make # it easier to authenticate in tests, but should # not be returned by the api self.user_sna.pop('password') self.user_sna['enabled'] = True self.assertDictEqual(user_ref, self.user_sna) self.assertDictEqual(tenant_ref, self.tenant_bar) metadata_ref.pop('roles') self.assertDictEqual(metadata_ref, self.metadata_snamtu) def test_authenticate_role_return(self): self.identity_api.add_role_to_user_and_project( self.user_foo['id'], self.tenant_baz['id'], 'keystone_admin') user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate( user_id=self.user_foo['id'], tenant_id=self.tenant_baz['id'], password=self.user_foo['password']) self.assertIn('roles', metadata_ref) self.assertIn('keystone_admin', metadata_ref['roles']) def test_authenticate_no_metadata(self): user = { 'id': 'no_meta', 'name': 'NO_META', 'domain_id': DEFAULT_DOMAIN_ID, 'password': 'no_meta2', } self.identity_api.create_user(user['id'], user) self.identity_api.add_user_to_project(self.tenant_baz['id'], user['id']) user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate( user_id=user['id'], tenant_id=self.tenant_baz['id'], password=user['password']) # NOTE(termie): the password field is left in user_foo to make # it easier to authenticate in tests, but should # not be returned by the api user.pop('password') self.assertEquals(metadata_ref, {"roles": [CONF.member_role_id]}) self.assertDictEqual(user_ref, user) self.assertDictEqual(tenant_ref, self.tenant_baz) def test_password_hashed(self): user_ref = self.identity_api._get_user(self.user_foo['id']) self.assertNotEqual(user_ref['password'], self.user_foo['password']) def test_get_project(self): tenant_ref = self.identity_api.get_project( tenant_id=self.tenant_bar['id']) self.assertDictEqual(tenant_ref, self.tenant_bar) def test_get_project_404(self): self.assertRaises(exception.ProjectNotFound, self.identity_api.get_project, tenant_id=uuid.uuid4().hex) def test_get_project_by_name(self): tenant_ref = self.identity_api.get_project_by_name( tenant_name=self.tenant_bar['name'], domain_id=DEFAULT_DOMAIN_ID) self.assertDictEqual(tenant_ref, self.tenant_bar) def test_get_project_by_name_404(self): self.assertRaises(exception.ProjectNotFound, self.identity_api.get_project_by_name, tenant_name=uuid.uuid4().hex, domain_id=DEFAULT_DOMAIN_ID) def test_get_project_users(self): tenant_ref = self.identity_api.get_project_users(self.tenant_baz['id']) user_ids = [] for user in tenant_ref: self.assertNotIn('password', user) user_ids.append(user.get('id')) self.assertEquals(len(user_ids), 2) self.assertIn(self.user_two['id'], user_ids) self.assertIn(self.user_badguy['id'], user_ids) def test_get_project_users_404(self): self.assertRaises(exception.ProjectNotFound, self.identity_api.get_project_users, tenant_id=uuid.uuid4().hex) def test_get_user(self): user_ref = self.identity_api.get_user(user_id=self.user_foo['id']) # NOTE(termie): the password field is left in user_foo to make # it easier to authenticate in tests, but should # not be returned by the api self.user_foo.pop('password') self.assertDictEqual(user_ref, self.user_foo) def test_get_user_404(self): self.assertRaises(exception.UserNotFound, self.identity_api.get_user, user_id=uuid.uuid4().hex) def test_get_user_by_name(self): user_ref = self.identity_api.get_user_by_name( user_name=self.user_foo['name'], domain_id=DEFAULT_DOMAIN_ID) # NOTE(termie): the password field is left in user_foo to make # it easier to authenticate in tests, but should # not be returned by the api self.user_foo.pop('password') self.assertDictEqual(user_ref, self.user_foo) def test_get_user_by_name_404(self): self.assertRaises(exception.UserNotFound, self.identity_api.get_user_by_name, user_name=uuid.uuid4().hex, domain_id=DEFAULT_DOMAIN_ID) def test_get_metadata(self): metadata_ref = self.identity_api.get_metadata( user_id=self.user_sna['id'], tenant_id=self.tenant_bar['id']) metadata_ref.pop('roles') self.assertDictEqual(metadata_ref, self.metadata_snamtu) def test_get_metadata_404(self): # FIXME(dolph): these exceptions could be more specific self.assertRaises(exception.NotFound, self.identity_api.get_metadata, user_id=uuid.uuid4().hex, tenant_id=self.tenant_bar['id']) self.assertRaises(exception.NotFound, self.identity_api.get_metadata, user_id=self.user_foo['id'], tenant_id=uuid.uuid4().hex) def test_get_role(self): role_ref = self.identity_api.get_role( role_id=self.role_keystone_admin['id']) role_ref_dict = dict((x, role_ref[x]) for x in role_ref) self.assertDictEqual(role_ref_dict, self.role_keystone_admin) def test_get_role_404(self): self.assertRaises(exception.RoleNotFound, self.identity_api.get_role, role_id=uuid.uuid4().hex) def test_create_duplicate_role_name_fails(self): role = {'id': 'fake1', 'name': 'fake1name'} self.identity_api.create_role('fake1', role) role['id'] = 'fake2' self.assertRaises(exception.Conflict, self.identity_api.create_role, 'fake2', role) def test_rename_duplicate_role_name_fails(self): role1 = { 'id': 'fake1', 'name': 'fake1name' } role2 = { 'id': 'fake2', 'name': 'fake2name' } self.identity_api.create_role('fake1', role1) self.identity_api.create_role('fake2', role2) role1['name'] = 'fake2name' self.assertRaises(exception.Conflict, self.identity_api.update_role, 'fake1', role1) def test_create_duplicate_user_id_fails(self): user = {'id': 'fake1', 'name': 'fake1', 'domain_id': DEFAULT_DOMAIN_ID, 'password': 'fakepass', 'tenants': ['bar']} self.identity_api.create_user('fake1', user) user['name'] = 'fake2' self.assertRaises(exception.Conflict, self.identity_api.create_user, 'fake1', user) def test_create_duplicate_user_name_fails(self): user = {'id': 'fake1', 'name': 'fake1', 'domain_id': DEFAULT_DOMAIN_ID, 'password': 'fakepass', 'tenants': ['bar']} self.identity_api.create_user('fake1', user) user['id'] = 'fake2' self.assertRaises(exception.Conflict, self.identity_api.create_user, 'fake2', user) def test_create_duplicate_user_name_in_different_domains(self): new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(new_domain['id'], new_domain) user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID, 'password': uuid.uuid4().hex} user2 = {'id': uuid.uuid4().hex, 'name': user1['name'], 'domain_id': new_domain['id'], 'password': uuid.uuid4().hex} self.identity_api.create_user(user1['id'], user1) self.identity_api.create_user(user2['id'], user2) def test_move_user_between_domains(self): domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain1['id'], domain1) domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain2['id'], domain2) user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id'], 'password': uuid.uuid4().hex} self.identity_api.create_user(user['id'], user) user['domain_id'] = domain2['id'] self.identity_api.update_user(user['id'], user) def test_move_user_between_domains_with_clashing_names_fails(self): domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain1['id'], domain1) domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain2['id'], domain2) # First, create a user in domain1 user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id'], 'password': uuid.uuid4().hex} self.identity_api.create_user(user1['id'], user1) # Now create a user in domain2 with a potentially clashing # name - which should work since we have domain separation user2 = {'id': uuid.uuid4().hex, 'name': user1['name'], 'domain_id': domain2['id'], 'password': uuid.uuid4().hex} self.identity_api.create_user(user2['id'], user2) # Now try and move user1 into the 2nd domain - which should # fail since the names clash user1['domain_id'] = domain2['id'] self.assertRaises(exception.Conflict, self.identity_api.update_user, user1['id'], user1) def test_rename_duplicate_user_name_fails(self): user1 = {'id': 'fake1', 'name': 'fake1', 'domain_id': DEFAULT_DOMAIN_ID, 'password': 'fakepass', 'tenants': ['bar']} user2 = {'id': 'fake2', 'name': 'fake2', 'domain_id': DEFAULT_DOMAIN_ID, 'password': 'fakepass', 'tenants': ['bar']} self.identity_api.create_user('fake1', user1) self.identity_api.create_user('fake2', user2) user2['name'] = 'fake1' self.assertRaises(exception.Conflict, self.identity_api.update_user, 'fake2', user2) def test_update_user_id_fails(self): user = {'id': 'fake1', 'name': 'fake1', 'domain_id': DEFAULT_DOMAIN_ID, 'password': 'fakepass', 'tenants': ['bar']} self.identity_api.create_user('fake1', user) user['id'] = 'fake2' self.assertRaises(exception.ValidationError, self.identity_api.update_user, 'fake1', user) user_ref = self.identity_api.get_user('fake1') self.assertEqual(user_ref['id'], 'fake1') self.assertRaises(exception.UserNotFound, self.identity_api.get_user, 'fake2') def test_create_duplicate_project_id_fails(self): tenant = {'id': 'fake1', 'name': 'fake1', 'domain_id': DEFAULT_DOMAIN_ID} self.identity_api.create_project('fake1', tenant) tenant['name'] = 'fake2' self.assertRaises(exception.Conflict, self.identity_api.create_project, 'fake1', tenant) def test_create_duplicate_project_name_fails(self): tenant = {'id': 'fake1', 'name': 'fake', 'domain_id': DEFAULT_DOMAIN_ID} self.identity_api.create_project('fake1', tenant) tenant['id'] = 'fake2' self.assertRaises(exception.Conflict, self.identity_api.create_project, 'fake1', tenant) def test_create_duplicate_project_name_in_different_domains(self): new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(new_domain['id'], new_domain) tenant1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID} tenant2 = {'id': uuid.uuid4().hex, 'name': tenant1['name'], 'domain_id': new_domain['id']} self.identity_api.create_project(tenant1['id'], tenant1) self.identity_api.create_project(tenant2['id'], tenant2) def test_move_project_between_domains(self): domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain1['id'], domain1) domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain2['id'], domain2) project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id']} self.identity_api.create_project(project['id'], project) project['domain_id'] = domain2['id'] self.identity_api.update_project(project['id'], project) def test_move_project_between_domains_with_clashing_names_fails(self): domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain1['id'], domain1) domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain2['id'], domain2) # First, create a project in domain1 project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id']} self.identity_api.create_project(project1['id'], project1) # Now create a project in domain2 with a potentially clashing # name - which should work since we have domain separation project2 = {'id': uuid.uuid4().hex, 'name': project1['name'], 'domain_id': domain2['id']} self.identity_api.create_project(project2['id'], project2) # Now try and move project1 into the 2nd domain - which should # fail since the names clash project1['domain_id'] = domain2['id'] self.assertRaises(exception.Conflict, self.identity_api.update_project, project1['id'], project1) def test_rename_duplicate_project_name_fails(self): tenant1 = {'id': 'fake1', 'name': 'fake1', 'domain_id': DEFAULT_DOMAIN_ID} tenant2 = {'id': 'fake2', 'name': 'fake2', 'domain_id': DEFAULT_DOMAIN_ID} self.identity_api.create_project('fake1', tenant1) self.identity_api.create_project('fake2', tenant2) tenant2['name'] = 'fake1' self.assertRaises(exception.Error, self.identity_api.update_project, 'fake2', tenant2) def test_update_project_id_does_nothing(self): tenant = {'id': 'fake1', 'name': 'fake1', 'domain_id': DEFAULT_DOMAIN_ID} self.identity_api.create_project('fake1', tenant) tenant['id'] = 'fake2' self.identity_api.update_project('fake1', tenant) tenant_ref = self.identity_api.get_project('fake1') self.assertEqual(tenant_ref['id'], 'fake1') self.assertRaises(exception.ProjectNotFound, self.identity_api.get_project, 'fake2') def test_add_duplicate_role_grant(self): roles_ref = self.identity_api.get_roles_for_user_and_project( self.user_foo['id'], self.tenant_bar['id']) self.assertNotIn('keystone_admin', roles_ref) self.identity_api.add_role_to_user_and_project( self.user_foo['id'], self.tenant_bar['id'], 'keystone_admin') self.assertRaises(exception.Conflict, self.identity_api.add_role_to_user_and_project, self.user_foo['id'], self.tenant_bar['id'], 'keystone_admin') def test_get_role_by_user_and_project(self): roles_ref = self.identity_api.get_roles_for_user_and_project( self.user_foo['id'], self.tenant_bar['id']) self.assertNotIn('keystone_admin', roles_ref) self.identity_api.add_role_to_user_and_project( self.user_foo['id'], self.tenant_bar['id'], 'keystone_admin') roles_ref = self.identity_api.get_roles_for_user_and_project( self.user_foo['id'], self.tenant_bar['id']) self.assertIn('keystone_admin', roles_ref) self.assertNotIn('member', roles_ref) self.identity_api.add_role_to_user_and_project( self.user_foo['id'], self.tenant_bar['id'], 'member') roles_ref = self.identity_api.get_roles_for_user_and_project( self.user_foo['id'], self.tenant_bar['id']) self.assertIn('keystone_admin', roles_ref) self.assertIn('member', roles_ref) def test_get_roles_for_user_and_project_404(self): self.assertRaises(exception.UserNotFound, self.identity_api.get_roles_for_user_and_project, uuid.uuid4().hex, self.tenant_bar['id']) self.assertRaises(exception.ProjectNotFound, self.identity_api.get_roles_for_user_and_project, self.user_foo['id'], uuid.uuid4().hex) def test_add_role_to_user_and_project_404(self): self.assertRaises(exception.UserNotFound, self.identity_api.add_role_to_user_and_project, uuid.uuid4().hex, self.tenant_bar['id'], 'keystone_admin') self.assertRaises(exception.ProjectNotFound, self.identity_api.add_role_to_user_and_project, self.user_foo['id'], uuid.uuid4().hex, 'keystone_admin') self.assertRaises(exception.RoleNotFound, self.identity_api.add_role_to_user_and_project, self.user_foo['id'], self.tenant_bar['id'], uuid.uuid4().hex) def test_remove_role_from_user_and_project(self): self.identity_api.add_role_to_user_and_project( self.user_foo['id'], self.tenant_bar['id'], 'member') self.identity_api.remove_role_from_user_and_project( self.user_foo['id'], self.tenant_bar['id'], 'member') roles_ref = self.identity_api.get_roles_for_user_and_project( self.user_foo['id'], self.tenant_bar['id']) self.assertNotIn('member', roles_ref) self.assertRaises(exception.NotFound, self.identity_api.remove_role_from_user_and_project, self.user_foo['id'], self.tenant_bar['id'], 'member') def test_get_role_grant_by_user_and_project(self): roles_ref = self.identity_api.list_grants( user_id=self.user_foo['id'], project_id=self.tenant_bar['id']) self.assertEquals(len(roles_ref), 1) self.identity_api.create_grant(user_id=self.user_foo['id'], project_id=self.tenant_bar['id'], role_id='keystone_admin') roles_ref = self.identity_api.list_grants( user_id=self.user_foo['id'], project_id=self.tenant_bar['id']) self.assertDictEqual(roles_ref[1], self.role_keystone_admin) self.identity_api.create_grant(user_id=self.user_foo['id'], project_id=self.tenant_bar['id'], role_id='member') roles_ref = self.identity_api.list_grants( user_id=self.user_foo['id'], project_id=self.tenant_bar['id']) roles_ref_ids = [] for i, ref in enumerate(roles_ref): roles_ref_ids.append(ref['id']) self.assertIn('keystone_admin', roles_ref_ids) self.assertIn('member', roles_ref_ids) def test_get_role_grants_for_user_and_project_404(self): self.assertRaises(exception.UserNotFound, self.identity_api.list_grants, user_id=uuid.uuid4().hex, project_id=self.tenant_bar['id']) self.assertRaises(exception.ProjectNotFound, self.identity_api.list_grants, user_id=self.user_foo['id'], project_id=uuid.uuid4().hex) def test_add_role_grant_to_user_and_project_404(self): self.assertRaises(exception.UserNotFound, self.identity_api.create_grant, user_id=uuid.uuid4().hex, project_id=self.tenant_bar['id'], role_id='keystone_admin') self.assertRaises(exception.ProjectNotFound, self.identity_api.create_grant, user_id=self.user_foo['id'], project_id=uuid.uuid4().hex, role_id='keystone_admin') self.assertRaises(exception.RoleNotFound, self.identity_api.create_grant, user_id=self.user_foo['id'], project_id=self.tenant_bar['id'], role_id=uuid.uuid4().hex) def test_remove_role_grant_from_user_and_project(self): self.identity_api.create_grant(user_id=self.user_foo['id'], project_id=self.tenant_baz['id'], role_id='member') roles_ref = self.identity_api.list_grants( user_id=self.user_foo['id'], project_id=self.tenant_baz['id']) self.assertDictEqual(roles_ref[0], self.role_member) self.identity_api.delete_grant(user_id=self.user_foo['id'], project_id=self.tenant_baz['id'], role_id='member') roles_ref = self.identity_api.list_grants( user_id=self.user_foo['id'], project_id=self.tenant_baz['id']) self.assertEquals(len(roles_ref), 0) self.assertRaises(exception.NotFound, self.identity_api.delete_grant, user_id=self.user_foo['id'], project_id=self.tenant_baz['id'], role_id='member') def test_get_and_remove_role_grant_by_group_and_project(self): new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(new_domain['id'], new_domain) new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_group(new_group['id'], new_group) new_user = {'id': uuid.uuid4().hex, 'name': 'new_user', 'password': 'secret', 'enabled': True, 'domain_id': new_domain['id']} self.identity_api.create_user(new_user['id'], new_user) self.identity_api.add_user_to_group(new_user['id'], new_group['id']) roles_ref = self.identity_api.list_grants( group_id=new_group['id'], project_id=self.tenant_bar['id']) self.assertEquals(len(roles_ref), 0) self.identity_api.create_grant(group_id=new_group['id'], project_id=self.tenant_bar['id'], role_id='member') roles_ref = self.identity_api.list_grants( group_id=new_group['id'], project_id=self.tenant_bar['id']) self.assertDictEqual(roles_ref[0], self.role_member) self.identity_api.delete_grant(group_id=new_group['id'], project_id=self.tenant_bar['id'], role_id='member') roles_ref = self.identity_api.list_grants( group_id=new_group['id'], project_id=self.tenant_bar['id']) self.assertEquals(len(roles_ref), 0) self.assertRaises(exception.NotFound, self.identity_api.delete_grant, group_id=new_group['id'], project_id=self.tenant_bar['id'], role_id='member') def test_get_and_remove_role_grant_by_group_and_domain(self): new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(new_domain['id'], new_domain) new_group = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'], 'name': uuid.uuid4().hex} self.identity_api.create_group(new_group['id'], new_group) new_user = {'id': uuid.uuid4().hex, 'name': 'new_user', 'password': uuid.uuid4().hex, 'enabled': True, 'domain_id': new_domain['id']} self.identity_api.create_user(new_user['id'], new_user) self.identity_api.add_user_to_group(new_user['id'], new_group['id']) roles_ref = self.identity_api.list_grants( group_id=new_group['id'], domain_id=new_domain['id']) self.assertEquals(len(roles_ref), 0) self.identity_api.create_grant(group_id=new_group['id'], domain_id=new_domain['id'], role_id='member') roles_ref = self.identity_api.list_grants( group_id=new_group['id'], domain_id=new_domain['id']) self.assertDictEqual(roles_ref[0], self.role_member) self.identity_api.delete_grant(group_id=new_group['id'], domain_id=new_domain['id'], role_id='member') roles_ref = self.identity_api.list_grants( group_id=new_group['id'], domain_id=new_domain['id']) self.assertEquals(len(roles_ref), 0) self.assertRaises(exception.NotFound, self.identity_api.delete_grant, group_id=new_group['id'], domain_id=new_domain['id'], role_id='member') def test_get_and_remove_correct_role_grant_from_a_mix(self): new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(new_domain['id'], new_domain) new_project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': new_domain['id']} self.identity_api.create_project(new_project['id'], new_project) new_group = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'], 'name': uuid.uuid4().hex} self.identity_api.create_group(new_group['id'], new_group) new_group2 = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'], 'name': uuid.uuid4().hex} self.identity_api.create_group(new_group2['id'], new_group2) new_user = {'id': uuid.uuid4().hex, 'name': 'new_user', 'password': uuid.uuid4().hex, 'enabled': True, 'domain_id': new_domain['id']} self.identity_api.create_user(new_user['id'], new_user) new_user2 = {'id': uuid.uuid4().hex, 'name': 'new_user2', 'password': uuid.uuid4().hex, 'enabled': True, 'domain_id': new_domain['id']} self.identity_api.create_user(new_user2['id'], new_user2) self.identity_api.add_user_to_group(new_user['id'], new_group['id']) # First check we have no grants roles_ref = self.identity_api.list_grants( group_id=new_group['id'], domain_id=new_domain['id']) self.assertEquals(len(roles_ref), 0) # Now add the grant we are going to test for, and some others as # well just to make sure we get back the right one self.identity_api.create_grant(group_id=new_group['id'], domain_id=new_domain['id'], role_id='member') self.identity_api.create_grant(group_id=new_group2['id'], domain_id=new_domain['id'], role_id='keystone_admin') self.identity_api.create_grant(user_id=new_user2['id'], domain_id=new_domain['id'], role_id='keystone_admin') self.identity_api.create_grant(group_id=new_group['id'], project_id=new_project['id'], role_id='keystone_admin') roles_ref = self.identity_api.list_grants( group_id=new_group['id'], domain_id=new_domain['id']) self.assertDictEqual(roles_ref[0], self.role_member) self.identity_api.delete_grant(group_id=new_group['id'], domain_id=new_domain['id'], role_id='member') roles_ref = self.identity_api.list_grants( group_id=new_group['id'], domain_id=new_domain['id']) self.assertEquals(len(roles_ref), 0) self.assertRaises(exception.NotFound, self.identity_api.delete_grant, group_id=new_group['id'], domain_id=new_domain['id'], role_id='member') def test_get_and_remove_role_grant_by_user_and_domain(self): new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(new_domain['id'], new_domain) new_user = {'id': uuid.uuid4().hex, 'name': 'new_user', 'password': 'secret', 'enabled': True, 'domain_id': new_domain['id']} self.identity_api.create_user(new_user['id'], new_user) roles_ref = self.identity_api.list_grants( user_id=new_user['id'], domain_id=new_domain['id']) self.assertEquals(len(roles_ref), 0) self.identity_api.create_grant(user_id=new_user['id'], domain_id=new_domain['id'], role_id='member') roles_ref = self.identity_api.list_grants( user_id=new_user['id'], domain_id=new_domain['id']) self.assertDictEqual(roles_ref[0], self.role_member) self.identity_api.delete_grant(user_id=new_user['id'], domain_id=new_domain['id'], role_id='member') roles_ref = self.identity_api.list_grants( user_id=new_user['id'], domain_id=new_domain['id']) self.assertEquals(len(roles_ref), 0) self.assertRaises(exception.NotFound, self.identity_api.delete_grant, user_id=new_user['id'], domain_id=new_domain['id'], role_id='member') def test_get_and_remove_role_grant_by_group_and_cross_domain(self): group1_domain1_role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(group1_domain1_role['id'], group1_domain1_role) group1_domain2_role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(group1_domain2_role['id'], group1_domain2_role) domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain1['id'], domain1) domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain2['id'], domain2) group1 = {'id': uuid.uuid4().hex, 'domain_id': domain1['id'], 'name': uuid.uuid4().hex} self.identity_api.create_group(group1['id'], group1) roles_ref = self.identity_api.list_grants( group_id=group1['id'], domain_id=domain1['id']) self.assertEquals(len(roles_ref), 0) roles_ref = self.identity_api.list_grants( group_id=group1['id'], domain_id=domain2['id']) self.assertEquals(len(roles_ref), 0) self.identity_api.create_grant(group_id=group1['id'], domain_id=domain1['id'], role_id=group1_domain1_role['id']) self.identity_api.create_grant(group_id=group1['id'], domain_id=domain2['id'], role_id=group1_domain2_role['id']) roles_ref = self.identity_api.list_grants( group_id=group1['id'], domain_id=domain1['id']) self.assertDictEqual(roles_ref[0], group1_domain1_role) roles_ref = self.identity_api.list_grants( group_id=group1['id'], domain_id=domain2['id']) self.assertDictEqual(roles_ref[0], group1_domain2_role) self.identity_api.delete_grant(group_id=group1['id'], domain_id=domain2['id'], role_id=group1_domain2_role['id']) roles_ref = self.identity_api.list_grants( group_id=group1['id'], domain_id=domain2['id']) self.assertEquals(len(roles_ref), 0) self.assertRaises(exception.NotFound, self.identity_api.delete_grant, group_id=group1['id'], domain_id=domain2['id'], role_id=group1_domain2_role['id']) def test_get_and_remove_role_grant_by_user_and_cross_domain(self): user1_domain1_role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(user1_domain1_role['id'], user1_domain1_role) user1_domain2_role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(user1_domain2_role['id'], user1_domain2_role) domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain1['id'], domain1) domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain2['id'], domain2) user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id'], 'password': uuid.uuid4().hex, 'enabled': True} self.identity_api.create_user(user1['id'], user1) roles_ref = self.identity_api.list_grants( user_id=user1['id'], domain_id=domain1['id']) self.assertEquals(len(roles_ref), 0) roles_ref = self.identity_api.list_grants( user_id=user1['id'], domain_id=domain2['id']) self.assertEquals(len(roles_ref), 0) self.identity_api.create_grant(user_id=user1['id'], domain_id=domain1['id'], role_id=user1_domain1_role['id']) self.identity_api.create_grant(user_id=user1['id'], domain_id=domain2['id'], role_id=user1_domain2_role['id']) roles_ref = self.identity_api.list_grants( user_id=user1['id'], domain_id=domain1['id']) self.assertDictEqual(roles_ref[0], user1_domain1_role) roles_ref = self.identity_api.list_grants( user_id=user1['id'], domain_id=domain2['id']) self.assertDictEqual(roles_ref[0], user1_domain2_role) self.identity_api.delete_grant(user_id=user1['id'], domain_id=domain2['id'], role_id=user1_domain2_role['id']) roles_ref = self.identity_api.list_grants( user_id=user1['id'], domain_id=domain2['id']) self.assertEquals(len(roles_ref), 0) self.assertRaises(exception.NotFound, self.identity_api.delete_grant, user_id=user1['id'], domain_id=domain2['id'], role_id=user1_domain2_role['id']) def test_role_grant_by_group_and_cross_domain_project(self): role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(role1['id'], role1) role2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(role2['id'], role2) domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain1['id'], domain1) domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain2['id'], domain2) group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id'], 'enabled': True} self.identity_api.create_group(group1['id'], group1) project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain2['id']} self.identity_api.create_project(project1['id'], project1) roles_ref = self.identity_api.list_grants( group_id=group1['id'], project_id=project1['id']) self.assertEquals(len(roles_ref), 0) self.identity_api.create_grant(group_id=group1['id'], project_id=project1['id'], role_id=role1['id']) self.identity_api.create_grant(group_id=group1['id'], project_id=project1['id'], role_id=role2['id']) roles_ref = self.identity_api.list_grants( group_id=group1['id'], project_id=project1['id']) roles_ref_ids = [] for i, ref in enumerate(roles_ref): roles_ref_ids.append(ref['id']) self.assertIn(role1['id'], roles_ref_ids) self.assertIn(role2['id'], roles_ref_ids) self.identity_api.delete_grant(group_id=group1['id'], project_id=project1['id'], role_id=role1['id']) roles_ref = self.identity_api.list_grants( group_id=group1['id'], project_id=project1['id']) self.assertEquals(len(roles_ref), 1) self.assertDictEqual(roles_ref[0], role2) def test_role_grant_by_user_and_cross_domain_project(self): role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(role1['id'], role1) role2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(role2['id'], role2) domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain1['id'], domain1) domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain2['id'], domain2) user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id'], 'password': uuid.uuid4().hex, 'enabled': True} self.identity_api.create_user(user1['id'], user1) project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain2['id']} self.identity_api.create_project(project1['id'], project1) roles_ref = self.identity_api.list_grants( user_id=user1['id'], project_id=project1['id']) self.assertEquals(len(roles_ref), 0) self.identity_api.create_grant(user_id=user1['id'], project_id=project1['id'], role_id=role1['id']) self.identity_api.create_grant(user_id=user1['id'], project_id=project1['id'], role_id=role2['id']) roles_ref = self.identity_api.list_grants( user_id=user1['id'], project_id=project1['id']) roles_ref_ids = [] for i, ref in enumerate(roles_ref): roles_ref_ids.append(ref['id']) self.assertIn(role1['id'], roles_ref_ids) self.assertIn(role2['id'], roles_ref_ids) self.identity_api.delete_grant(user_id=user1['id'], project_id=project1['id'], role_id=role1['id']) roles_ref = self.identity_api.list_grants( user_id=user1['id'], project_id=project1['id']) self.assertEquals(len(roles_ref), 1) self.assertDictEqual(roles_ref[0], role2) def test_multi_role_grant_by_user_group_on_project_domain(self): role_list = [] for _ in range(8): role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(role['id'], role) role_list.append(role) domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain1['id'], domain1) user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id'], 'password': uuid.uuid4().hex, 'enabled': True} self.identity_api.create_user(user1['id'], user1) group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id'], 'enabled': True} self.identity_api.create_group(group1['id'], group1) project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id']} self.identity_api.create_project(project1['id'], project1) self.identity_api.add_user_to_group(user1['id'], group1['id']) roles_ref = self.identity_api.list_grants( user_id=user1['id'], project_id=project1['id']) self.assertEquals(len(roles_ref), 0) self.identity_api.create_grant(user_id=user1['id'], domain_id=domain1['id'], role_id=role_list[0]['id']) self.identity_api.create_grant(user_id=user1['id'], domain_id=domain1['id'], role_id=role_list[1]['id']) self.identity_api.create_grant(group_id=group1['id'], domain_id=domain1['id'], role_id=role_list[2]['id']) self.identity_api.create_grant(group_id=group1['id'], domain_id=domain1['id'], role_id=role_list[3]['id']) self.identity_api.create_grant(user_id=user1['id'], project_id=project1['id'], role_id=role_list[4]['id']) self.identity_api.create_grant(user_id=user1['id'], project_id=project1['id'], role_id=role_list[5]['id']) self.identity_api.create_grant(group_id=group1['id'], project_id=project1['id'], role_id=role_list[6]['id']) self.identity_api.create_grant(group_id=group1['id'], project_id=project1['id'], role_id=role_list[7]['id']) roles_ref = self.identity_api.list_grants(user_id=user1['id'], domain_id=domain1['id']) self.assertEquals(len(roles_ref), 2) self.assertIn(role_list[0], roles_ref) self.assertIn(role_list[1], roles_ref) roles_ref = self.identity_api.list_grants(group_id=group1['id'], domain_id=domain1['id']) self.assertEquals(len(roles_ref), 2) self.assertIn(role_list[2], roles_ref) self.assertIn(role_list[3], roles_ref) roles_ref = self.identity_api.list_grants(user_id=user1['id'], project_id=project1['id']) self.assertEquals(len(roles_ref), 2) self.assertIn(role_list[4], roles_ref) self.assertIn(role_list[5], roles_ref) roles_ref = self.identity_api.list_grants(group_id=group1['id'], project_id=project1['id']) self.assertEquals(len(roles_ref), 2) self.assertIn(role_list[6], roles_ref) self.assertIn(role_list[7], roles_ref) def test_delete_role_with_user_and_group_grants(self): raise nose.exc.SkipTest('Blocked by bug 1097472') role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(role1['id'], role1) domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain1['id'], domain1) project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id']} self.identity_api.create_project(project1['id'], project1) user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id'], 'password': uuid.uuid4().hex, 'enabled': True} self.identity_api.create_user(user1['id'], user1) group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id'], 'enabled': True} self.identity_api.create_group(group1['id'], group1) self.identity_api.create_grant(user_id=user1['id'], project_id=project1['id'], role_id=role1['id']) self.identity_api.create_grant(user_id=user1['id'], domain_id=domain1['id'], role_id=role1['id']) self.identity_api.create_grant(group_id=group1['id'], project_id=project1['id'], role_id=role1['id']) self.identity_api.create_grant(group_id=group1['id'], domain_id=domain1['id'], role_id=role1['id']) roles_ref = self.identity_api.list_grants( user_id=user1['id'], project_id=project1['id']) self.assertEquals(len(roles_ref), 1) roles_ref = self.identity_api.list_grants( group_id=group1['id'], project_id=project1['id']) self.assertEquals(len(roles_ref), 1) roles_ref = self.identity_api.list_grants( user_id=user1['id'], domain_id=domain1['id']) self.assertEquals(len(roles_ref), 1) roles_ref = self.identity_api.list_grants( group_id=group1['id'], domain_id=domain1['id']) self.assertEquals(len(roles_ref), 1) self.identity_api.delete_role(role1['id']) self.assertRaises(exception.RoleNotFound, self.identity_api.list_grants, user_id=user1['id'], project_id=project1['id']) self.assertRaises(exception.RoleNotFound, self.identity_api.list_grants, group_id=group1['id'], project_id=project1['id']) self.assertRaises(exception.RoleNotFound, self.identity_api.list_grants, user_id=user1['id'], domain_id=domain1['id']) self.assertRaises(exception.RoleNotFound, self.identity_api.list_grants, group_id=group1['id'], domain_id=domain1['id']) def test_delete_user_with_group_project_domain_links(self): role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(role1['id'], role1) domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain1['id'], domain1) project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id']} self.identity_api.create_project(project1['id'], project1) user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id'], 'password': uuid.uuid4().hex, 'enabled': True} self.identity_api.create_user(user1['id'], user1) group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id'], 'enabled': True} self.identity_api.create_group(group1['id'], group1) self.identity_api.create_grant(user_id=user1['id'], project_id=project1['id'], role_id=role1['id']) self.identity_api.create_grant(user_id=user1['id'], domain_id=domain1['id'], role_id=role1['id']) self.identity_api.add_user_to_group(user_id=user1['id'], group_id=group1['id']) roles_ref = self.identity_api.list_grants( user_id=user1['id'], project_id=project1['id']) self.assertEquals(len(roles_ref), 1) roles_ref = self.identity_api.list_grants( user_id=user1['id'], domain_id=domain1['id']) self.assertEquals(len(roles_ref), 1) self.identity_api.check_user_in_group( user_id=user1['id'], group_id=group1['id']) self.identity_api.delete_user(user1['id']) self.assertRaises(exception.UserNotFound, self.identity_api.list_grants, user_id=user1['id'], project_id=project1['id']) self.assertRaises(exception.UserNotFound, self.identity_api.list_grants, user_id=user1['id'], domain_id=domain1['id']) self.assertRaises(exception.NotFound, self.identity_api.check_user_in_group, user1['id'], group1['id']) def test_delete_group_with_user_project_domain_links(self): role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(role1['id'], role1) domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain1['id'], domain1) project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id']} self.identity_api.create_project(project1['id'], project1) user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id'], 'password': uuid.uuid4().hex, 'enabled': True} self.identity_api.create_user(user1['id'], user1) group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id'], 'enabled': True} self.identity_api.create_group(group1['id'], group1) self.identity_api.create_grant(group_id=group1['id'], project_id=project1['id'], role_id=role1['id']) self.identity_api.create_grant(group_id=group1['id'], domain_id=domain1['id'], role_id=role1['id']) self.identity_api.add_user_to_group(user_id=user1['id'], group_id=group1['id']) roles_ref = self.identity_api.list_grants( group_id=group1['id'], project_id=project1['id']) self.assertEquals(len(roles_ref), 1) roles_ref = self.identity_api.list_grants( group_id=group1['id'], domain_id=domain1['id']) self.assertEquals(len(roles_ref), 1) self.identity_api.check_user_in_group( user_id=user1['id'], group_id=group1['id']) self.identity_api.delete_group(group1['id']) self.assertRaises(exception.GroupNotFound, self.identity_api.list_grants, group_id=group1['id'], project_id=project1['id']) self.assertRaises(exception.GroupNotFound, self.identity_api.list_grants, group_id=group1['id'], domain_id=domain1['id']) self.identity_api.get_user(user1['id']) def test_delete_domain_with_user_group_project_links(self): #TODO(chungg):add test case once expected behaviour defined pass def test_role_crud(self): role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(role['id'], role) role_ref = self.identity_api.get_role(role['id']) role_ref_dict = dict((x, role_ref[x]) for x in role_ref) self.assertDictEqual(role_ref_dict, role) role['name'] = uuid.uuid4().hex self.identity_api.update_role(role['id'], role) role_ref = self.identity_api.get_role(role['id']) role_ref_dict = dict((x, role_ref[x]) for x in role_ref) self.assertDictEqual(role_ref_dict, role) self.identity_api.delete_role(role['id']) self.assertRaises(exception.RoleNotFound, self.identity_api.get_role, role['id']) def test_update_role_404(self): role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.assertRaises(exception.RoleNotFound, self.identity_api.update_role, role['id'], role) def test_add_user_to_project(self): self.identity_api.add_user_to_project(self.tenant_baz['id'], self.user_foo['id']) tenants = self.identity_api.get_projects_for_user(self.user_foo['id']) self.assertIn(self.tenant_baz['id'], tenants) def test_add_user_to_project_404(self): self.assertRaises(exception.ProjectNotFound, self.identity_api.add_user_to_project, uuid.uuid4().hex, self.user_foo['id']) self.assertRaises(exception.UserNotFound, self.identity_api.add_user_to_project, self.tenant_bar['id'], uuid.uuid4().hex) def test_remove_user_from_project(self): self.identity_api.add_user_to_project(self.tenant_baz['id'], self.user_foo['id']) self.identity_api.remove_user_from_project(self.tenant_baz['id'], self.user_foo['id']) tenants = self.identity_api.get_projects_for_user(self.user_foo['id']) self.assertNotIn(self.tenant_baz['id'], tenants) def test_remove_user_from_project_404(self): self.assertRaises(exception.ProjectNotFound, self.identity_api.remove_user_from_project, uuid.uuid4().hex, self.user_foo['id']) self.assertRaises(exception.UserNotFound, self.identity_api.remove_user_from_project, self.tenant_bar['id'], uuid.uuid4().hex) self.assertRaises(exception.NotFound, self.identity_api.remove_user_from_project, self.tenant_baz['id'], self.user_foo['id']) def test_get_projects_for_user_404(self): self.assertRaises(exception.UserNotFound, self.identity_api.get_projects_for_user, uuid.uuid4().hex) def test_update_project_404(self): self.assertRaises(exception.ProjectNotFound, self.identity_api.update_project, uuid.uuid4().hex, dict()) def test_delete_project_404(self): self.assertRaises(exception.ProjectNotFound, self.identity_api.delete_project, uuid.uuid4().hex) def test_update_user_404(self): user_id = uuid.uuid4().hex self.assertRaises(exception.UserNotFound, self.identity_api.update_user, user_id, {'id': user_id}) def test_delete_user_with_project_association(self): user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID, 'password': uuid.uuid4().hex} self.identity_api.create_user(user['id'], user) self.identity_api.add_user_to_project(self.tenant_bar['id'], user['id']) self.identity_api.delete_user(user['id']) self.assertRaises(exception.UserNotFound, self.identity_api.get_projects_for_user, user['id']) def test_delete_user_with_project_roles(self): user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID, 'password': uuid.uuid4().hex} self.identity_api.create_user(user['id'], user) self.identity_api.add_role_to_user_and_project( user['id'], self.tenant_bar['id'], self.role_member['id']) self.identity_api.delete_user(user['id']) self.assertRaises(exception.UserNotFound, self.identity_api.get_projects_for_user, user['id']) def test_delete_user_404(self): self.assertRaises(exception.UserNotFound, self.identity_api.delete_user, uuid.uuid4().hex) def test_delete_role_404(self): self.assertRaises(exception.RoleNotFound, self.identity_api.delete_role, uuid.uuid4().hex) def test_create_project_long_name_fails(self): tenant = {'id': 'fake1', 'name': 'a' * 65, 'domain_id': DEFAULT_DOMAIN_ID} self.assertRaises(exception.ValidationError, self.identity_api.create_project, tenant['id'], tenant) def test_create_project_blank_name_fails(self): tenant = {'id': 'fake1', 'name': '', 'domain_id': DEFAULT_DOMAIN_ID} self.assertRaises(exception.ValidationError, self.identity_api.create_project, tenant['id'], tenant) def test_create_project_invalid_name_fails(self): tenant = {'id': 'fake1', 'name': None, 'domain_id': DEFAULT_DOMAIN_ID} self.assertRaises(exception.ValidationError, self.identity_api.create_project, tenant['id'], tenant) tenant = {'id': 'fake1', 'name': 123, 'domain_id': DEFAULT_DOMAIN_ID} self.assertRaises(exception.ValidationError, self.identity_api.create_project, tenant['id'], tenant) def test_update_project_blank_name_fails(self): tenant = {'id': 'fake1', 'name': 'fake1', 'domain_id': DEFAULT_DOMAIN_ID} self.identity_api.create_project('fake1', tenant) tenant['name'] = '' self.assertRaises(exception.ValidationError, self.identity_api.update_project, tenant['id'], tenant) def test_update_project_long_name_fails(self): tenant = {'id': 'fake1', 'name': 'fake1', 'domain_id': DEFAULT_DOMAIN_ID} self.identity_api.create_project('fake1', tenant) tenant['name'] = 'a' * 65 self.assertRaises(exception.ValidationError, self.identity_api.update_project, tenant['id'], tenant) def test_update_project_invalid_name_fails(self): tenant = {'id': 'fake1', 'name': 'fake1', 'domain_id': DEFAULT_DOMAIN_ID} self.identity_api.create_project('fake1', tenant) tenant['name'] = None self.assertRaises(exception.ValidationError, self.identity_api.update_project, tenant['id'], tenant) tenant['name'] = 123 self.assertRaises(exception.ValidationError, self.identity_api.update_project, tenant['id'], tenant) def test_create_user_long_name_fails(self): user = {'id': 'fake1', 'name': 'a' * 65, 'domain_id': DEFAULT_DOMAIN_ID} self.assertRaises(exception.ValidationError, self.identity_api.create_user, 'fake1', user) def test_create_user_blank_name_fails(self): user = {'id': 'fake1', 'name': '', 'domain_id': DEFAULT_DOMAIN_ID} self.assertRaises(exception.ValidationError, self.identity_api.create_user, 'fake1', user) def test_create_user_invalid_name_fails(self): user = {'id': 'fake1', 'name': None, 'domain_id': DEFAULT_DOMAIN_ID} self.assertRaises(exception.ValidationError, self.identity_api.create_user, 'fake1', user) user = {'id': 'fake1', 'name': 123, 'domain_id': DEFAULT_DOMAIN_ID} self.assertRaises(exception.ValidationError, self.identity_api.create_user, 'fake1', user) def test_update_user_long_name_fails(self): user = {'id': 'fake1', 'name': 'fake1', 'domain_id': DEFAULT_DOMAIN_ID} self.identity_api.create_user('fake1', user) user['name'] = 'a' * 65 self.assertRaises(exception.ValidationError, self.identity_api.update_user, 'fake1', user) def test_update_user_blank_name_fails(self): user = {'id': 'fake1', 'name': 'fake1', 'domain_id': DEFAULT_DOMAIN_ID} self.identity_api.create_user('fake1', user) user['name'] = '' self.assertRaises(exception.ValidationError, self.identity_api.update_user, 'fake1', user) def test_update_user_invalid_name_fails(self): user = {'id': 'fake1', 'name': 'fake1', 'domain_id': DEFAULT_DOMAIN_ID} self.identity_api.create_user('fake1', user) user['name'] = None self.assertRaises(exception.ValidationError, self.identity_api.update_user, 'fake1', user) user['name'] = 123 self.assertRaises(exception.ValidationError, self.identity_api.update_user, 'fake1', user) def test_list_users(self): users = self.identity_api.list_users() for test_user in default_fixtures.USERS: self.assertTrue(x for x in users if x['id'] == test_user['id']) def test_list_groups(self): group1 = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} group2 = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_group(group1['id'], group1) self.identity_api.create_group(group2['id'], group2) groups = self.identity_api.list_groups() self.assertEquals(len(groups), 2) group_ids = [] for group in groups: group_ids.append(group.get('id')) self.assertIn(group1['id'], group_ids) self.assertIn(group2['id'], group_ids) def test_list_domains(self): domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain1['id'], domain1) self.identity_api.create_domain(domain2['id'], domain2) domains = self.identity_api.list_domains() self.assertEquals(len(domains), 2) domain_ids = [] for domain in domains: domain_ids.append(domain.get('id')) self.assertIn(domain1['id'], domain_ids) self.assertIn(domain2['id'], domain_ids) def test_list_projects(self): projects = self.identity_api.list_projects() self.assertEquals(len(projects), 3) project_ids = [] for project in projects: project_ids.append(project.get('id')) self.assertIn(self.tenant_bar['id'], project_ids) self.assertIn(self.tenant_baz['id'], project_ids) def test_list_roles(self): roles = self.identity_api.list_roles() for test_role in default_fixtures.ROLES: self.assertTrue(x for x in roles if x['id'] == test_role['id']) def test_delete_project_with_role_assignments(self): tenant = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID} self.identity_api.create_project(tenant['id'], tenant) self.identity_api.add_role_to_user_and_project( self.user_foo['id'], tenant['id'], 'member') self.identity_api.delete_project(tenant['id']) self.assertRaises(exception.NotFound, self.identity_api.get_project, tenant['id']) def test_delete_role_check_role_grant(self): role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} alt_role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_role(role['id'], role) self.identity_api.create_role(alt_role['id'], alt_role) self.identity_api.add_role_to_user_and_project( self.user_foo['id'], self.tenant_bar['id'], role['id']) self.identity_api.add_role_to_user_and_project( self.user_foo['id'], self.tenant_bar['id'], alt_role['id']) self.identity_api.delete_role(role['id']) roles_ref = self.identity_api.get_roles_for_user_and_project( self.user_foo['id'], self.tenant_bar['id']) self.assertNotIn(role['id'], roles_ref) self.assertIn(alt_role['id'], roles_ref) def test_create_project_doesnt_modify_passed_in_dict(self): new_project = {'id': 'tenant_id', 'name': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID} original_project = new_project.copy() self.identity_api.create_project('tenant_id', new_project) self.assertDictEqual(original_project, new_project) def test_create_user_doesnt_modify_passed_in_dict(self): new_user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, 'enabled': True, 'domain_id': DEFAULT_DOMAIN_ID} original_user = new_user.copy() self.identity_api.create_user('user_id', new_user) self.assertDictEqual(original_user, new_user) def test_update_user_enable(self): user = {'id': 'fake1', 'name': 'fake1', 'enabled': True, 'domain_id': DEFAULT_DOMAIN_ID} self.identity_api.create_user('fake1', user) user_ref = self.identity_api.get_user('fake1') self.assertEqual(user_ref['enabled'], True) user['enabled'] = False self.identity_api.update_user('fake1', user) user_ref = self.identity_api.get_user('fake1') self.assertEqual(user_ref['enabled'], user['enabled']) user['enabled'] = True self.identity_api.update_user('fake1', user) user_ref = self.identity_api.get_user('fake1') self.assertEqual(user_ref['enabled'], user['enabled']) def test_update_project_enable(self): tenant = {'id': 'fake1', 'name': 'fake1', 'enabled': True, 'domain_id': DEFAULT_DOMAIN_ID} self.identity_api.create_project('fake1', tenant) tenant_ref = self.identity_api.get_project('fake1') self.assertEqual(tenant_ref['enabled'], True) tenant['enabled'] = False self.identity_api.update_project('fake1', tenant) tenant_ref = self.identity_api.get_project('fake1') self.assertEqual(tenant_ref['enabled'], tenant['enabled']) tenant['enabled'] = True self.identity_api.update_project('fake1', tenant) tenant_ref = self.identity_api.get_project('fake1') self.assertEqual(tenant_ref['enabled'], tenant['enabled']) def test_add_user_to_group(self): domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain['id'], domain) new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_group(new_group['id'], new_group) new_user = {'id': uuid.uuid4().hex, 'name': 'new_user', 'password': uuid.uuid4().hex, 'enabled': True, 'domain_id': domain['id']} self.identity_api.create_user(new_user['id'], new_user) self.identity_api.add_user_to_group(new_user['id'], new_group['id']) groups = self.identity_api.list_groups_for_user(new_user['id']) found = False for x in groups: if (x['id'] == new_group['id']): found = True self.assertTrue(found) def test_add_user_to_group_404(self): domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain['id'], domain) new_user = {'id': uuid.uuid4().hex, 'name': 'new_user', 'password': uuid.uuid4().hex, 'enabled': True, 'domain_id': domain['id']} self.identity_api.create_user(new_user['id'], new_user) self.assertRaises(exception.GroupNotFound, self.identity_api.add_user_to_group, new_user['id'], uuid.uuid4().hex) new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_group(new_group['id'], new_group) self.assertRaises(exception.UserNotFound, self.identity_api.add_user_to_group, uuid.uuid4().hex, new_group['id']) def test_check_user_in_group(self): domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain['id'], domain) new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_group(new_group['id'], new_group) new_user = {'id': uuid.uuid4().hex, 'name': 'new_user', 'password': uuid.uuid4().hex, 'enabled': True, 'domain_id': domain['id']} self.identity_api.create_user(new_user['id'], new_user) self.identity_api.add_user_to_group(new_user['id'], new_group['id']) self.identity_api.check_user_in_group(new_user['id'], new_group['id']) def test_check_user_not_in_group(self): new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_group(new_group['id'], new_group) self.assertRaises(exception.UserNotFound, self.identity_api.check_user_in_group, uuid.uuid4().hex, new_group['id']) def test_list_users_in_group(self): domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain['id'], domain) new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_group(new_group['id'], new_group) new_user = {'id': uuid.uuid4().hex, 'name': 'new_user', 'password': uuid.uuid4().hex, 'enabled': True, 'domain_id': domain['id']} self.identity_api.create_user(new_user['id'], new_user) self.identity_api.add_user_to_group(new_user['id'], new_group['id']) user_refs = self.identity_api.list_users_in_group(new_group['id']) found = False for x in user_refs: if (x['id'] == new_user['id']): found = True self.assertTrue(found) def test_remove_user_from_group(self): domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain['id'], domain) new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_group(new_group['id'], new_group) new_user = {'id': uuid.uuid4().hex, 'name': 'new_user', 'password': uuid.uuid4().hex, 'enabled': True, 'domain_id': domain['id']} self.identity_api.create_user(new_user['id'], new_user) self.identity_api.add_user_to_group(new_user['id'], new_group['id']) agroups = self.identity_api.list_groups_for_user(new_user['id']) self.identity_api.remove_user_from_group(new_user['id'], new_group['id']) groups = self.identity_api.list_groups_for_user(new_user['id']) for x in groups: self.assertFalse(x['id'] == new_group['id']) def test_remove_user_from_group_404(self): domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain['id'], domain) new_user = {'id': uuid.uuid4().hex, 'name': 'new_user', 'password': uuid.uuid4().hex, 'enabled': True, 'domain_id': domain['id']} self.identity_api.create_user(new_user['id'], new_user) new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_group(new_group['id'], new_group) self.assertRaises(exception.NotFound, self.identity_api.remove_user_from_group, new_user['id'], uuid.uuid4().hex) self.assertRaises(exception.NotFound, self.identity_api.remove_user_from_group, uuid.uuid4().hex, new_group['id']) self.assertRaises(exception.NotFound, self.identity_api.remove_user_from_group, uuid.uuid4().hex, uuid.uuid4().hex) def test_group_crud(self): group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_group(group['id'], group) group_ref = self.identity_api.get_group(group['id']) self.assertDictEqual(group_ref, group) group['name'] = uuid.uuid4().hex self.identity_api.update_group(group['id'], group) group_ref = self.identity_api.get_group(group['id']) self.assertDictEqual(group_ref, group) self.identity_api.delete_group(group['id']) self.assertRaises(exception.GroupNotFound, self.identity_api.get_group, group['id']) def test_create_duplicate_group_name_fails(self): group1 = {'id': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID, 'name': uuid.uuid4().hex} group2 = {'id': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID, 'name': group1['name']} self.identity_api.create_group(group1['id'], group1) self.assertRaises(exception.Conflict, self.identity_api.create_group, group2['id'], group2) def test_create_duplicate_group_name_in_different_domains(self): new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(new_domain['id'], new_domain) group1 = {'id': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID, 'name': uuid.uuid4().hex} group2 = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'], 'name': group1['name']} self.identity_api.create_group(group1['id'], group1) self.identity_api.create_group(group2['id'], group2) def test_move_group_between_domains(self): domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain1['id'], domain1) domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain2['id'], domain2) group = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id']} self.identity_api.create_group(group['id'], group) group['domain_id'] = domain2['id'] self.identity_api.update_group(group['id'], group) def test_move_group_between_domains_with_clashing_names_fails(self): domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain1['id'], domain1) domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.identity_api.create_domain(domain2['id'], domain2) # First, create a group in domain1 group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': domain1['id']} self.identity_api.create_group(group1['id'], group1) # Now create a group in domain2 with a potentially clashing # name - which should work since we have domain separation group2 = {'id': uuid.uuid4().hex, 'name': group1['name'], 'domain_id': domain2['id']} self.identity_api.create_group(group2['id'], group2) # Now try and move group1 into the 2nd domain - which should # fail since the names clash group1['domain_id'] = domain2['id'] self.assertRaises(exception.Conflict, self.identity_api.update_group, group1['id'], group1) def test_project_crud(self): project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex} self.identity_api.create_project(project['id'], project) project_ref = self.identity_api.get_project(project['id']) self.assertDictEqual(project_ref, project) project['name'] = uuid.uuid4().hex self.identity_api.update_project(project['id'], project) project_ref = self.identity_api.get_project(project['id']) self.assertDictEqual(project_ref, project) self.identity_api.delete_project(project['id']) self.assertRaises(exception.ProjectNotFound, self.identity_api.get_project, project['id']) def test_domain_crud(self): domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'enabled': True} self.identity_api.create_domain(domain['id'], domain) domain_ref = self.identity_api.get_domain(domain['id']) self.assertDictEqual(domain_ref, domain) domain['name'] = uuid.uuid4().hex self.identity_api.update_domain(domain['id'], domain) domain_ref = self.identity_api.get_domain(domain['id']) self.assertDictEqual(domain_ref, domain) self.identity_api.delete_domain(domain['id']) self.assertRaises(exception.DomainNotFound, self.identity_api.get_domain, domain['id']) def test_user_crud(self): user = {'domain_id': uuid.uuid4().hex, 'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'password': 'passw0rd'} self.identity_api.create_user(user['id'], user) user_ref = self.identity_api.get_user(user['id']) del user['password'] user_ref_dict = dict((x, user_ref[x]) for x in user_ref) self.assertDictEqual(user_ref_dict, user) user['password'] = uuid.uuid4().hex self.identity_api.update_user(user['id'], user) user_ref = self.identity_api.get_user(user['id']) del user['password'] user_ref_dict = dict((x, user_ref[x]) for x in user_ref) self.assertDictEqual(user_ref_dict, user) self.identity_api.delete_user(user['id']) self.assertRaises(exception.UserNotFound, self.identity_api.get_user, user['id']) def test_list_user_projects(self): user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex, 'enabled': True} self.identity_api.create_user(user1['id'], user1) user_projects = self.identity_api.list_user_projects(user1['id']) self.assertEquals(len(user_projects), 0) self.identity_api.create_grant(user_id=user1['id'], project_id=self.tenant_bar['id'], role_id=self.role_member['id']) self.identity_api.create_grant(user_id=user1['id'], project_id=self.tenant_baz['id'], role_id=self.role_member['id']) user_projects = self.identity_api.list_user_projects(user1['id']) self.assertEquals(len(user_projects), 2) class TokenTests(object): def test_token_crud(self): token_id = uuid.uuid4().hex data = {'id': token_id, 'a': 'b', 'user': {'id': 'testuserid'}} data_ref = self.token_api.create_token(token_id, data) expires = data_ref.pop('expires') self.assertTrue(isinstance(expires, datetime.datetime)) self.assertDictEqual(data_ref, data) new_data_ref = self.token_api.get_token(token_id) expires = new_data_ref.pop('expires') self.assertTrue(isinstance(expires, datetime.datetime)) self.assertEquals(new_data_ref, data) self.token_api.delete_token(token_id) self.assertRaises(exception.TokenNotFound, self.token_api.get_token, token_id) self.assertRaises(exception.TokenNotFound, self.token_api.delete_token, token_id) def create_token_sample_data(self, tenant_id=None): token_id = uuid.uuid4().hex data = {'id': token_id, 'a': 'b', 'user': {'id': 'testuserid'}} if tenant_id is not None: data['tenant'] = {'id': tenant_id, 'name': tenant_id} self.token_api.create_token(token_id, data) return token_id def test_token_list(self): tokens = self.token_api.list_tokens('testuserid') self.assertEquals(len(tokens), 0) token_id1 = self.create_token_sample_data() tokens = self.token_api.list_tokens('testuserid') self.assertEquals(len(tokens), 1) self.assertIn(token_id1, tokens) token_id2 = self.create_token_sample_data() tokens = self.token_api.list_tokens('testuserid') self.assertEquals(len(tokens), 2) self.assertIn(token_id2, tokens) self.assertIn(token_id1, tokens) self.token_api.delete_token(token_id1) tokens = self.token_api.list_tokens('testuserid') self.assertIn(token_id2, tokens) self.assertNotIn(token_id1, tokens) self.token_api.delete_token(token_id2) tokens = self.token_api.list_tokens('testuserid') self.assertNotIn(token_id2, tokens) self.assertNotIn(token_id1, tokens) # tenant-specific tokens tenant1 = uuid.uuid4().hex tenant2 = uuid.uuid4().hex token_id3 = self.create_token_sample_data(tenant_id=tenant1) token_id4 = self.create_token_sample_data(tenant_id=tenant2) tokens = self.token_api.list_tokens('testuserid') self.assertEquals(len(tokens), 2) self.assertNotIn(token_id1, tokens) self.assertNotIn(token_id2, tokens) self.assertIn(token_id3, tokens) self.assertIn(token_id4, tokens) tokens = self.token_api.list_tokens('testuserid', tenant2) self.assertEquals(len(tokens), 1) self.assertNotIn(token_id1, tokens) self.assertNotIn(token_id2, tokens) self.assertNotIn(token_id3, tokens) self.assertIn(token_id4, tokens) def test_get_token_404(self): self.assertRaises(exception.TokenNotFound, self.token_api.get_token, uuid.uuid4().hex) self.assertRaises(exception.TokenNotFound, self.token_api.get_token, None) def test_delete_token_404(self): self.assertRaises(exception.TokenNotFound, self.token_api.delete_token, uuid.uuid4().hex) def test_expired_token(self): token_id = uuid.uuid4().hex expire_time = timeutils.utcnow() - datetime.timedelta(minutes=1) data = {'id_hash': token_id, 'id': token_id, 'a': 'b', 'expires': expire_time, 'user': {'id': 'testuserid'}} data_ref = self.token_api.create_token(token_id, data) self.assertDictEqual(data_ref, data) self.assertRaises(exception.TokenNotFound, self.token_api.get_token, token_id) def test_null_expires_token(self): token_id = uuid.uuid4().hex data = {'id': token_id, 'id_hash': token_id, 'a': 'b', 'expires': None, 'user': {'id': 'testuserid'}} data_ref = self.token_api.create_token(token_id, data) self.assertDictEqual(data_ref, data) new_data_ref = self.token_api.get_token(token_id) self.assertEqual(data_ref, new_data_ref) def check_list_revoked_tokens(self, token_ids): revoked_ids = [x['id'] for x in self.token_api.list_revoked_tokens()] for token_id in token_ids: self.assertIn(token_id, revoked_ids) def delete_token(self): token_id = uuid.uuid4().hex data = {'id_hash': token_id, 'id': token_id, 'a': 'b', 'user': {'id': 'testuserid'}} data_ref = self.token_api.create_token(token_id, data) self.token_api.delete_token(token_id) self.assertRaises( exception.TokenNotFound, self.token_api.get_token, data_ref['id']) self.assertRaises( exception.TokenNotFound, self.token_api.delete_token, data_ref['id']) return token_id def test_list_revoked_tokens_returns_empty_list(self): revoked_ids = [x['id'] for x in self.token_api.list_revoked_tokens()] self.assertEqual(revoked_ids, []) def test_list_revoked_tokens_for_single_token(self): self.check_list_revoked_tokens([self.delete_token()]) def test_list_revoked_tokens_for_multiple_tokens(self): self.check_list_revoked_tokens([self.delete_token() for x in xrange(2)]) class CommonHelperTests(test.TestCase): def test_format_helper_raises_malformed_on_missing_key(self): with self.assertRaises(exception.MalformedEndpoint): core.format_url("http://%(foo)s/%(bar)s", {"foo": "1"}) def test_format_helper_raises_malformed_on_wrong_type(self): with self.assertRaises(exception.MalformedEndpoint): core.format_url("http://%foo%s", {"foo": "1"}) def test_format_helper_raises_malformed_on_incomplete_format(self): with self.assertRaises(exception.MalformedEndpoint): core.format_url("http://%(foo)", {"foo": "1"}) class CatalogTests(object): def test_service_crud(self): # create service_id = uuid.uuid4().hex new_service = { 'id': service_id, 'type': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'description': uuid.uuid4().hex, } res = self.catalog_api.create_service( service_id, new_service.copy()) self.assertDictEqual(res, new_service) # list services = self.catalog_api.list_services() self.assertIn(service_id, [x['id'] for x in services]) # delete self.catalog_api.delete_service(service_id) self.assertRaises(exception.ServiceNotFound, self.catalog_man.delete_service, {}, service_id) self.assertRaises(exception.ServiceNotFound, self.catalog_man.get_service, {}, service_id) def test_delete_service_with_endpoint(self): # create a service service = { 'id': uuid.uuid4().hex, 'type': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'description': uuid.uuid4().hex, } self.catalog_api.create_service(service['id'], service) # create an endpoint attached to the service endpoint = { 'id': uuid.uuid4().hex, 'region': uuid.uuid4().hex, 'interface': uuid.uuid4().hex[:8], 'url': uuid.uuid4().hex, 'service_id': service['id'], } self.catalog_api.create_endpoint(endpoint['id'], endpoint) # deleting the service should also delete the endpoint self.catalog_api.delete_service(service['id']) self.assertRaises(exception.EndpointNotFound, self.catalog_man.get_endpoint, {}, endpoint['id']) self.assertRaises(exception.EndpointNotFound, self.catalog_man.delete_endpoint, {}, endpoint['id']) def test_get_service_404(self): self.assertRaises(exception.ServiceNotFound, self.catalog_man.get_service, {}, uuid.uuid4().hex) def test_delete_service_404(self): self.assertRaises(exception.ServiceNotFound, self.catalog_man.delete_service, {}, uuid.uuid4().hex) def test_create_endpoint_404(self): endpoint = { 'id': uuid.uuid4().hex, 'service_id': uuid.uuid4().hex, } self.assertRaises(exception.ServiceNotFound, self.catalog_man.create_endpoint, {}, endpoint['id'], endpoint) def test_get_endpoint_404(self): self.assertRaises(exception.EndpointNotFound, self.catalog_man.get_endpoint, {}, uuid.uuid4().hex) def test_delete_endpoint_404(self): self.assertRaises(exception.EndpointNotFound, self.catalog_man.delete_endpoint, {}, uuid.uuid4().hex) def test_create_endpoint(self): service = { 'id': uuid.uuid4().hex, 'type': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'description': uuid.uuid4().hex, } self.catalog_api.create_service(service['id'], service.copy()) endpoint = { 'id': uuid.uuid4().hex, 'region': "0" * 255, 'service_id': service['id'], 'interface': 'public', 'url': uuid.uuid4().hex, } self.catalog_api.create_endpoint(endpoint['id'], endpoint.copy()) class PolicyTests(object): def _new_policy_ref(self): return { 'id': uuid.uuid4().hex, 'policy': uuid.uuid4().hex, 'type': uuid.uuid4().hex, 'endpoint_id': uuid.uuid4().hex, } def assertEqualPolicies(self, a, b): self.assertEqual(a['id'], b['id']) self.assertEqual(a['endpoint_id'], b['endpoint_id']) self.assertEqual(a['policy'], b['policy']) self.assertEqual(a['type'], b['type']) def test_create(self): ref = self._new_policy_ref() res = self.policy_api.create_policy(ref['id'], ref) self.assertEqualPolicies(ref, res) def test_get(self): ref = self._new_policy_ref() res = self.policy_api.create_policy(ref['id'], ref) res = self.policy_api.get_policy(ref['id']) self.assertEqualPolicies(ref, res) def test_list(self): ref = self._new_policy_ref() self.policy_api.create_policy(ref['id'], ref) res = self.policy_api.list_policies() res = [x for x in res if x['id'] == ref['id']][0] self.assertEqualPolicies(ref, res) def test_update(self): ref = self._new_policy_ref() self.policy_api.create_policy(ref['id'], ref) orig = ref ref = self._new_policy_ref() # (cannot change policy ID) self.assertRaises(exception.ValidationError, self.policy_man.update_policy, {}, orig['id'], ref) ref['id'] = orig['id'] res = self.policy_api.update_policy(orig['id'], ref) self.assertEqualPolicies(ref, res) def test_delete(self): ref = self._new_policy_ref() self.policy_api.create_policy(ref['id'], ref) self.policy_api.delete_policy(ref['id']) self.assertRaises(exception.PolicyNotFound, self.policy_man.delete_policy, {}, ref['id']) self.assertRaises(exception.PolicyNotFound, self.policy_man.get_policy, {}, ref['id']) res = self.policy_api.list_policies() self.assertFalse(len([x for x in res if x['id'] == ref['id']])) def test_get_policy_404(self): self.assertRaises(exception.PolicyNotFound, self.policy_man.get_policy, {}, uuid.uuid4().hex) def test_update_policy_404(self): self.assertRaises(exception.PolicyNotFound, self.policy_man.update_policy, {}, uuid.uuid4().hex, {}) def test_delete_policy_404(self): self.assertRaises(exception.PolicyNotFound, self.policy_man.delete_policy, {}, uuid.uuid4().hex)