diff --git a/keystone/identity/backends/ldap/core.py b/keystone/identity/backends/ldap/core.py
index cd84a146e3..cc60d0b95b 100644
--- a/keystone/identity/backends/ldap/core.py
+++ b/keystone/identity/backends/ldap/core.py
@@ -122,7 +122,11 @@ class Identity(identity.Driver):
 # in LDAP backend
 return identity.filter_user(self.user.get_by_name(user_name))
- def get_metadata(self, user_id, tenant_id):
+ def get_metadata(self, user_id=None, tenant_id=None,
+ domain_id=None, group_id=None):
+ # FIXME(henry-nash): Use domain_id and group_id once domains
+ # and groups are implemented in LDAP backend
+
 if not self.get_project(tenant_id) or not self.get_user(user_id):
 return {}
diff --git a/keystone/identity/backends/sql.py b/keystone/identity/backends/sql.py
index b06e6f7983..b61a1322d1 100644
--- a/keystone/identity/backends/sql.py
+++ b/keystone/identity/backends/sql.py
@@ -389,24 +389,6 @@ class Identity(sql.Base, identity.Driver):
 except exception.MetadataNotFound:
 pass
- def _get_user_group_domain_roles(self, metadata_ref, user_id, domain_id):
- group_refs = self.list_groups_for_user(user_id=user_id)
- for x in group_refs:
- try:
- metadata_ref.update(
- self.get_metadata(group_id=x['id'],
- domain_id=domain_id))
- except exception.MetadataNotFound:
- # no group grant, skip
- pass
-
- def _get_user_domain_roles(self, metadata_ref, user_id, domain_id):
- try:
- metadata_ref.update(self.get_metadata(user_id,
- domain_id=domain_id))
- except exception.MetadataNotFound:
- pass
-
 def get_roles_for_user_and_project(self, user_id, tenant_id):
 self.get_user(user_id)
 self.get_project(tenant_id)
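Review bot: The new `domain_id` and `group_id` parameters of `get_metadata` are accepted but never read, so a caller that passes only `group_id` and `domain_id` (as the generic domain-role lookup added to `keystone/identity/core.py` in this same commit does) falls through to `self.get_project(None)` / `self.get_user(None)`, and whatever those raise is not the `MetadataNotFound` that caller catches. Until domains and groups exist in this backend it is safer to refuse than to silently ignore authorization-relevant arguments: right after the FIXME add `if domain_id is not None or group_id is not None:` / `raise exception.NotImplemented()`, assuming `exception` is imported in this module as it is in `core.py`. The LDAP test class skips both new domain tests in this commit, so this path is currently unexercised. The body of `get_metadata` continues outside the hunk.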
@@ -415,14 +397,6 @@ class Identity(sql.Base, identity.Driver):
 self._get_user_group_project_roles(metadata_ref, user_id, tenant_id)
 return list(set(metadata_ref.get('roles', [])))
- def get_roles_for_user_and_domain(self, user_id, domain_id):
- self.get_user(user_id)
- self.get_domain(domain_id)
- metadata_ref = {}
- self._get_user_domain_roles(metadata_ref, user_id, domain_id)
- self._get_user_group_domain_roles(metadata_ref, user_id, domain_id)
- return list(set(metadata_ref.get('roles', [])))
-
 def add_role_to_user_and_project(self, user_id, tenant_id, role_id):
 self.get_user(user_id)
 self.get_project(tenant_id)
diff --git a/keystone/identity/core.py b/keystone/identity/core.py
index 22de41e74c..30516e3f45 100644
--- a/keystone/identity/core.py
+++ b/keystone/identity/core.py
@@ -122,15 +122,15 @@ class Driver(object):
 raise exception.NotImplemented()
 def add_user_to_project(self, tenant_id, user_id):
- """Add user to a tenant by creating a default role relationship.
+ """Add user to a tenant by creating a default role relationship.
- :raises: keystone.exception.ProjectNotFound,
- keystone.exception.UserNotFound
+ :raises: keystone.exception.ProjectNotFound,
+ keystone.exception.UserNotFound
- """
- self.add_role_to_user_and_project(user_id,
- tenant_id,
- config.CONF.member_role_id)
+ """
+ self.add_role_to_user_and_project(user_id,
+ tenant_id,
+ config.CONF.member_role_id)
 def remove_user_from_project(self, tenant_id, user_id):
 """Remove user from a tenant
@@ -181,7 +181,35 @@ class Driver(object):
 keystone.exception.ProjectNotFound
 """
- raise exception.NotImplemented()
+
+ def update_metadata_for_group_domain_roles(self, metadata_ref,
+ user_id, domain_id):
+ group_refs = self.list_groups_for_user(user_id=user_id)
+ for x in group_refs:
+ try:
+ metadata_ref.update(
+ self.get_metadata(group_id=x['id'],
+ domain_id=domain_id))
+ except exception.MetadataNotFound:
+ # no group grant, skip
+ pass
+
+ def update_metadata_for_user_domain_roles(self, metadata_ref,
+ user_id, domain_id):
+ try:
+ metadata_ref.update(self.get_metadata(user_id=user_id,
+ domain_id=domain_id))
+ except exception.MetadataNotFound:
+ pass
+
+ self.get_user(user_id)
+ self.get_domain(domain_id)
+ metadata_ref = {}
+ update_metadata_for_user_domain_roles(self, metadata_ref,
+ user_id, domain_id)
+ update_metadata_for_group_domain_roles(self, metadata_ref,
+ user_id, domain_id)
+ return list(set(metadata_ref.get('roles', [])))
 def add_role_to_user_and_project(self, user_id, tenant_id, role_id):
 """Add a role to a user within given tenant.
diff --git a/tests/test_backend.py b/tests/test_backend.py
index 25cbd6cf45..6efdb17e62 100644
--- a/tests/test_backend.py
+++ b/tests/test_backend.py
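Review bot: `metadata_ref.update(...)` in both new helpers replaces the whole `'roles'` entry with the last grant looked up, so if `get_metadata` returns `{'roles': [...]}` (the key the final `return` reads), a user holding a direct domain role plus one or more group roles gets back only the last metadata found instead of the union the trailing `set()` suggests; the helpers should accumulate `metadata.get('roles', [])` into a set rather than calling `update`. The helpers are also re-created as nested functions on every call and take `self` explicitly even though they are closures, which keeps a driver subclass from overriding them — they read better as private `Driver` methods, as in the rewrite below. The enclosing method's header and docstring lie outside the hunk (the SQL driver deletion and the new test suggest it is `get_roles_for_user_and_domain`); its visible context line documents `keystone.exception.ProjectNotFound` while the new body calls `self.get_domain(domain_id)` and the new test expects `DomainNotFound`, so that docstring line looks stale. The test added in `tests/test_backend.py` grants roles only directly to the user, so the overwrite would not be caught.
```python
    def _update_metadata_for_user_domain_roles(self, metadata_ref,
                                               user_id, domain_id):
        """Fold the user's own grants on the domain into metadata_ref['roles']."""
        try:
            user_metadata = self.get_metadata(user_id=user_id,
                                              domain_id=domain_id)
        except exception.MetadataNotFound:
            return
        roles = set(metadata_ref.get('roles', []))
        roles.update(user_metadata.get('roles', []))
        metadata_ref['roles'] = list(roles)

    def _update_metadata_for_group_domain_roles(self, metadata_ref,
                                                user_id, domain_id):
        """Fold the grants of every group the user belongs to into metadata_ref['roles']."""
        roles = set(metadata_ref.get('roles', []))
        for group_ref in self.list_groups_for_user(user_id=user_id):
            try:
                group_metadata = self.get_metadata(group_id=group_ref['id'],
                                                   domain_id=domain_id)
            except exception.MetadataNotFound:
                # no group grant, skip
                continue
            roles.update(group_metadata.get('roles', []))
        metadata_ref['roles'] = list(roles)

    # … unchanged: enclosing method header and docstring …
        self.get_user(user_id)
        self.get_domain(domain_id)
        metadata_ref = {}
        self._update_metadata_for_user_domain_roles(metadata_ref,
                                                    user_id, domain_id)
        self._update_metadata_for_group_domain_roles(metadata_ref,
                                                     user_id, domain_id)
        return list(set(metadata_ref.get('roles', [])))
```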
@@ -497,6 +497,87 @@ class IdentityTests(object):
 self.assertIn(self.role_admin['id'], roles_ref)
 self.assertIn('member', roles_ref)
+ def test_get_roles_for_user_and_domain(self):
+ """ Test for getting roles for user on a domain.
+
+ Test Plan:
+ - Create a domain, with 2 users
+ - Check no roles yet exit
+ - Give user1 two roles on the domain, user2 one role
+ - Get roles on user1 and the domain - maybe sure we only
+ get back the 2 roles on user1
+ - Delete both roles from user1
+ - Check we get no roles back for user1 on domain
+
+ """
+ new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(new_domain['id'], new_domain)
+ new_user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex, 'enabled': True,
+ 'domain_id': new_domain['id']}
+ self.identity_api.create_user(new_user1['id'], new_user1)
+ new_user2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex, 'enabled': True,
+ 'domain_id': new_domain['id']}
+ self.identity_api.create_user(new_user2['id'], new_user2)
+ roles_ref = self.identity_api.list_grants(
+ user_id=new_user1['id'],
+ domain_id=new_domain['id'])
+ self.assertEquals(len(roles_ref), 0)
+ # Now create the grants (roles are defined in default_fixtures)
+ self.identity_api.create_grant(user_id=new_user1['id'],
+ domain_id=new_domain['id'],
+ role_id='member')
+ self.identity_api.create_grant(user_id=new_user1['id'],
+ domain_id=new_domain['id'],
+ role_id='other')
+ self.identity_api.create_grant(user_id=new_user2['id'],
+ domain_id=new_domain['id'],
+ role_id='admin')
+ # Read back the roles for user1 on domain
+ roles_ids = self.identity_api.get_roles_for_user_and_domain(
+ new_user1['id'], new_domain['id'])
+ self.assertEqual(len(roles_ids), 2)
+ self.assertIn(self.role_member['id'], roles_ids)
+ self.assertIn(self.role_other['id'], roles_ids)
+
+ # Now delete both grants for user1
+ self.identity_api.delete_grant(user_id=new_user1['id'],
+ domain_id=new_domain['id'],
+ role_id='member')
+ self.identity_api.delete_grant(user_id=new_user1['id'],
+ domain_id=new_domain['id'],
+ role_id='other')
+ roles_ref = self.identity_api.list_grants(
+ user_id=new_user1['id'],
+ domain_id=new_domain['id'])
+ self.assertEquals(len(roles_ref), 0)
+
+ def test_get_roles_for_user_and_domain_404(self):
+ """ Test errors raised when getting roles for user on a domain.
+
+ Test Plan:
+ - Check non-existing user gives UserNotFound
+ - Check non-existing domain gives DomainNotFound
+
+ """
+ new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(new_domain['id'], new_domain)
+ new_user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex, 'enabled': True,
+ 'domain_id': new_domain['id']}
+ self.identity_api.create_user(new_user1['id'], new_user1)
+
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.get_roles_for_user_and_domain,
+ uuid.uuid4().hex,
+ new_domain['id'])
+
+ self.assertRaises(exception.DomainNotFound,
+ self.identity_api.get_roles_for_user_and_domain,
+ new_user1['id'],
+ uuid.uuid4().hex)
+
 def test_get_roles_for_user_and_project_404(self):
 self.assertRaises(exception.UserNotFound,
 self.identity_api.get_roles_for_user_and_project,
diff --git a/tests/test_backend_ldap.py b/tests/test_backend_ldap.py
index bd94f00452..36f6b184dd 100644
--- a/tests/test_backend_ldap.py
+++ b/tests/test_backend_ldap.py
@@ -407,6 +407,12 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
 def test_get_and_remove_correct_role_grant_from_a_mix(self):
 raise nose.exc.SkipTest('Blocked by bug 1101287')
+ def test_get_roles_for_user_and_domain(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
+ def test_get_roles_for_user_and_domain_404(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
 def test_domain_crud(self):
 raise nose.exc.SkipTest('Blocked by bug 1101276')
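Review bot: The docstring's last step, "Check we get no roles back for user1 on domain", is not what `test_get_roles_for_user_and_domain` asserts after the two `delete_grant` calls — it re-checks `list_grants`, which the first assertion already covered, while the method under test, `get_roles_for_user_and_domain`, is never called again. Add after the final `assertEquals`: `roles_ids = self.identity_api.get_roles_for_user_and_domain(new_user1['id'], new_domain['id'])` / `self.assertEqual(len(roles_ids), 0)`. Neither new test grants a role through a group, so the group branch of the driver code added in `keystone/identity/core.py` stays uncovered. The docstring also has two typos: `exit` should be `exist` and `maybe sure` should be `make sure`.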