Make getting user-domain roles backend independant

There is nothing backend specific in geting the list of roles
for a user-domain, so we should move this function into backends
core.  This also has the affect of now ensuring that the kvs and ldap
support will work, provided the specific backend supports roles on
users and domains.  This is true today for kvs, but support in ldap
for domains is gated by other bugs.

Fixes bug #1131769

Change-Id: Id99accb33fd7cd8d6c37e64e140552c5bfe68349
This commit is contained in:
Henry Nash 2013-02-22 15:34:58 +00:00
parent 2f916c6d54
commit 1e64378f42
5 changed files with 128 additions and 35 deletions

View File

@ -134,7 +134,11 @@ class Identity(identity.Driver):
except exception.NotFound:
raise exception.UserNotFound(user_id=user_name)
def get_metadata(self, user_id, tenant_id):
def get_metadata(self, user_id=None, tenant_id=None,
domain_id=None, group_id=None):
# FIXME(henry-nash): Use domain_id and group_id once domains
# and groups are implemented in LDAP backend
if not self.get_project(tenant_id) or not self.get_user(user_id):
return {}

View File

@ -403,24 +403,6 @@ class Identity(sql.Base, identity.Driver):
except exception.MetadataNotFound:
pass
def _get_user_group_domain_roles(self, metadata_ref, user_id, domain_id):
group_refs = self.list_groups_for_user(user_id=user_id)
for x in group_refs:
try:
metadata_ref.update(
self.get_metadata(group_id=x['id'],
domain_id=domain_id))
except exception.MetadataNotFound:
# no group grant, skip
pass
def _get_user_domain_roles(self, metadata_ref, user_id, domain_id):
try:
metadata_ref.update(self.get_metadata(user_id,
domain_id=domain_id))
except exception.MetadataNotFound:
pass
def get_roles_for_user_and_project(self, user_id, tenant_id):
self.get_user(user_id)
self.get_project(tenant_id)
@ -429,14 +411,6 @@ class Identity(sql.Base, identity.Driver):
self._get_user_group_project_roles(metadata_ref, user_id, tenant_id)
return list(set(metadata_ref.get('roles', [])))
def get_roles_for_user_and_domain(self, user_id, domain_id):
self.get_user(user_id)
self.get_domain(domain_id)
metadata_ref = {}
self._get_user_domain_roles(metadata_ref, user_id, domain_id)
self._get_user_group_domain_roles(metadata_ref, user_id, domain_id)
return list(set(metadata_ref.get('roles', [])))
def add_role_to_user_and_project(self, user_id, tenant_id, role_id):
self.get_user(user_id)
self.get_project(tenant_id)

View File

@ -102,15 +102,15 @@ class Driver(object):
raise exception.NotImplemented()
def add_user_to_project(self, tenant_id, user_id):
"""Add user to a tenant by creating a default role relationship.
"""Add user to a tenant by creating a default role relationship.
:raises: keystone.exception.ProjectNotFound,
keystone.exception.UserNotFound
:raises: keystone.exception.ProjectNotFound,
keystone.exception.UserNotFound
"""
self.add_role_to_user_and_project(user_id,
tenant_id,
config.CONF.member_role_id)
"""
self.add_role_to_user_and_project(user_id,
tenant_id,
config.CONF.member_role_id)
def remove_user_from_project(self, tenant_id, user_id):
"""Remove user from a tenant
@ -161,7 +161,35 @@ class Driver(object):
keystone.exception.ProjectNotFound
"""
raise exception.NotImplemented()
def update_metadata_for_group_domain_roles(self, metadata_ref,
user_id, domain_id):
group_refs = self.list_groups_for_user(user_id=user_id)
for x in group_refs:
try:
metadata_ref.update(
self.get_metadata(group_id=x['id'],
domain_id=domain_id))
except exception.MetadataNotFound:
# no group grant, skip
pass
def update_metadata_for_user_domain_roles(self, metadata_ref,
user_id, domain_id):
try:
metadata_ref.update(self.get_metadata(user_id=user_id,
domain_id=domain_id))
except exception.MetadataNotFound:
pass
self.get_user(user_id)
self.get_domain(domain_id)
metadata_ref = {}
update_metadata_for_user_domain_roles(self, metadata_ref,
user_id, domain_id)
update_metadata_for_group_domain_roles(self, metadata_ref,
user_id, domain_id)
return list(set(metadata_ref.get('roles', [])))
def add_role_to_user_and_project(self, user_id, tenant_id, role_id):
"""Add a role to a user within given tenant.

View File

@ -496,6 +496,87 @@ class IdentityTests(object):
self.assertIn(self.role_admin['id'], roles_ref)
self.assertIn('member', roles_ref)
def test_get_roles_for_user_and_domain(self):
""" Test for getting roles for user on a domain.
Test Plan:
- Create a domain, with 2 users
- Check no roles yet exit
- Give user1 two roles on the domain, user2 one role
- Get roles on user1 and the domain - maybe sure we only
get back the 2 roles on user1
- Delete both roles from user1
- Check we get no roles back for user1 on domain
"""
new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_domain(new_domain['id'], new_domain)
new_user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'password': uuid.uuid4().hex, 'enabled': True,
'domain_id': new_domain['id']}
self.identity_api.create_user(new_user1['id'], new_user1)
new_user2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'password': uuid.uuid4().hex, 'enabled': True,
'domain_id': new_domain['id']}
self.identity_api.create_user(new_user2['id'], new_user2)
roles_ref = self.identity_api.list_grants(
user_id=new_user1['id'],
domain_id=new_domain['id'])
self.assertEquals(len(roles_ref), 0)
# Now create the grants (roles are defined in default_fixtures)
self.identity_api.create_grant(user_id=new_user1['id'],
domain_id=new_domain['id'],
role_id='member')
self.identity_api.create_grant(user_id=new_user1['id'],
domain_id=new_domain['id'],
role_id='other')
self.identity_api.create_grant(user_id=new_user2['id'],
domain_id=new_domain['id'],
role_id='admin')
# Read back the roles for user1 on domain
roles_ids = self.identity_api.get_roles_for_user_and_domain(
new_user1['id'], new_domain['id'])
self.assertEqual(len(roles_ids), 2)
self.assertIn(self.role_member['id'], roles_ids)
self.assertIn(self.role_other['id'], roles_ids)
# Now delete both grants for user1
self.identity_api.delete_grant(user_id=new_user1['id'],
domain_id=new_domain['id'],
role_id='member')
self.identity_api.delete_grant(user_id=new_user1['id'],
domain_id=new_domain['id'],
role_id='other')
roles_ref = self.identity_api.list_grants(
user_id=new_user1['id'],
domain_id=new_domain['id'])
self.assertEquals(len(roles_ref), 0)
def test_get_roles_for_user_and_domain_404(self):
""" Test errors raised when getting roles for user on a domain.
Test Plan:
- Check non-existing user gives UserNotFound
- Check non-existing domain gives DomainNotFound
"""
new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_domain(new_domain['id'], new_domain)
new_user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'password': uuid.uuid4().hex, 'enabled': True,
'domain_id': new_domain['id']}
self.identity_api.create_user(new_user1['id'], new_user1)
self.assertRaises(exception.UserNotFound,
self.identity_api.get_roles_for_user_and_domain,
uuid.uuid4().hex,
new_domain['id'])
self.assertRaises(exception.DomainNotFound,
self.identity_api.get_roles_for_user_and_domain,
new_user1['id'],
uuid.uuid4().hex)
def test_get_roles_for_user_and_project_404(self):
self.assertRaises(exception.UserNotFound,
self.identity_api.get_roles_for_user_and_project,

View File

@ -406,6 +406,12 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
def test_get_and_remove_correct_role_grant_from_a_mix(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
def test_get_roles_for_user_and_domain(self):
raise nose.exc.SkipTest('Blocked by bug 1101276')
def test_get_roles_for_user_and_domain_404(self):
raise nose.exc.SkipTest('Blocked by bug 1101276')
def test_domain_crud(self):
raise nose.exc.SkipTest('Blocked by bug 1101276')