Remove manager-driver assignment metadata construct

The old kvs-inspired metadata construct was still hanging around
between the assignment manager and driver. This patch removes this
and re-uses other existing methods in its place.

Implements: blueprint remove-role-metadata

Change-Id: Ic7737fc44c1d1a86521c428216316d06b67dd1f9
This commit is contained in:
Henry Nash 2015-01-21 15:31:04 +00:00
parent 9ee582a48c
commit b2ed78c7dd
4 changed files with 38 additions and 258 deletions

View File

@ -70,41 +70,6 @@ class Assignment(assignment.AssignmentDriverV8):
# logic.
return role_list
def _get_metadata(self, user_id=None, tenant_id=None,
domain_id=None, group_id=None):
def _get_roles_for_just_user_and_project(user_id, tenant_id):
user_dn = self.user._id_to_dn(user_id)
return [self.role._dn_to_id(a.role_dn)
for a in self.role.get_role_assignments
(self.project._id_to_dn(tenant_id))
if common_ldap.is_dn_equal(a.user_dn, user_dn)]
def _get_roles_for_group_and_project(group_id, project_id):
group_dn = self.group._id_to_dn(group_id)
return [self.role._dn_to_id(a.role_dn)
for a in self.role.get_role_assignments
(self.project._id_to_dn(project_id))
if common_ldap.is_dn_equal(a.user_dn, group_dn)]
if domain_id is not None:
msg = _('Domain metadata not supported by LDAP')
raise exception.NotImplemented(message=msg)
if group_id is None and user_id is None:
return {}
if tenant_id is None:
return {}
if user_id is None:
metadata_ref = _get_roles_for_group_and_project(group_id,
tenant_id)
else:
metadata_ref = _get_roles_for_just_user_and_project(user_id,
tenant_id)
if not metadata_ref:
return {}
return {'roles': [self._role_to_dict(r, False) for r in metadata_ref]}
def list_project_ids_for_user(self, user_id, group_ids, hints,
inherited=False):
# TODO(henry-nash): The ldap driver does not support inherited
@ -203,35 +168,34 @@ class Assignment(assignment.AssignmentDriverV8):
self.role.delete_user(ref.role_dn, ref.group_dn,
self.role._dn_to_id(ref.role_dn))
def _assert_not_domain_grant(self, domain_id):
if domain_id is not None:
msg = _('Domain grants not supported by LDAP')
raise exception.NotImplemented(message=msg)
def create_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None,
inherited_to_projects=False):
try:
metadata_ref = self._get_metadata(user_id, project_id,
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
self._assert_not_domain_grant(domain_id)
if user_id is None:
metadata_ref['roles'] = self._add_role_to_group_and_project(
self._add_role_to_group_and_project(
group_id, project_id, role_id)
else:
metadata_ref['roles'] = self.add_role_to_user_and_project(
self.add_role_to_user_and_project(
user_id, project_id, role_id)
def check_grant_role_id(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None,
inherited_to_projects=False):
try:
metadata_ref = self._get_metadata(user_id, project_id,
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
role_ids = set(self._roles_from_role_dicts(
metadata_ref.get('roles', []), inherited_to_projects))
if role_id not in role_ids:
self._assert_not_domain_grant(domain_id)
if not self.list_role_assignments(
role_id=role_id, user_id=user_id,
group_ids=[group_id] if group_id else [],
project_ids=[project_id] if project_id else []):
actor_id = user_id or group_id
target_id = domain_id or project_id
raise exception.RoleAssignmentNotFound(role_id=role_id,
@ -242,19 +206,17 @@ class Assignment(assignment.AssignmentDriverV8):
domain_id=None, project_id=None,
inherited_to_projects=False):
try:
metadata_ref = self._get_metadata(user_id, project_id,
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
self.check_grant_role_id(
role_id, user_id=user_id, group_id=group_id,
domain_id=domain_id, project_id=project_id,
inherited_to_projects=inherited_to_projects)
try:
if user_id is None:
metadata_ref['roles'] = (
self._remove_role_from_group_and_project(
group_id, project_id, role_id))
self._remove_role_from_group_and_project(
group_id, project_id, role_id)
else:
metadata_ref['roles'] = self.remove_role_from_user_and_project(
self.remove_role_from_user_and_project(
user_id, project_id, role_id)
except (exception.RoleNotFound, KeyError):
actor_id = user_id or group_id
@ -267,14 +229,12 @@ class Assignment(assignment.AssignmentDriverV8):
domain_id=None, project_id=None,
inherited_to_projects=False):
try:
metadata_ref = self._get_metadata(user_id, project_id,
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
return self._roles_from_role_dicts(metadata_ref.get('roles', []),
inherited_to_projects)
self._assert_not_domain_grant(domain_id)
assignment_list = self.list_role_assignments(
user_id=user_id, group_ids=[group_id] if group_id else [],
project_ids=[project_id] if project_id else [])
# Use set() to process the list to remove any duplicates
return list(set([x['role_id'] for x in assignment_list]))
def list_role_assignments(self, role_id=None,
user_id=None, group_ids=None,

View File

@ -66,51 +66,6 @@ class Assignment(keystone_assignment.AssignmentDriverV8):
assignments = query.all()
return [assignment.actor_id for assignment in assignments]
def _get_metadata(self, user_id=None, tenant_id=None,
domain_id=None, group_id=None, session=None):
# TODO(henry-nash): This method represents the last vestiges of the old
# metadata concept in this driver. Although we no longer need it here,
# since the Manager layer uses the metadata concept across all
# assignment drivers, we need to remove it from all of them in order to
# finally remove this method.
# We aren't given a session when called by the manager directly.
if session is None:
session = sql.get_session()
q = session.query(RoleAssignment)
def _calc_assignment_type():
# Figure out the assignment type we're checking for from the args.
if user_id:
if tenant_id:
return AssignmentType.USER_PROJECT
else:
return AssignmentType.USER_DOMAIN
else:
if tenant_id:
return AssignmentType.GROUP_PROJECT
else:
return AssignmentType.GROUP_DOMAIN
q = q.filter_by(type=_calc_assignment_type())
q = q.filter_by(actor_id=user_id or group_id)
q = q.filter_by(target_id=tenant_id or domain_id)
refs = q.all()
if not refs:
raise exception.MetadataNotFound()
metadata_ref = {}
metadata_ref['roles'] = []
for assignment in refs:
role_ref = {}
role_ref['id'] = assignment.role_id
if assignment.inherited:
role_ref['inherited_to'] = 'projects'
metadata_ref['roles'].append(role_ref)
return metadata_ref
def create_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None,
inherited_to_projects=False):

View File

@ -87,106 +87,31 @@ class Manager(manager.Manager):
"""Get the roles associated with a user within given project.
This includes roles directly assigned to the user on the
project, as well as those by virtue of group membership. If
the OS-INHERIT extension is enabled, then this will also
include roles inherited from the domain.
project, as well as those by virtue of group membership or
inheritance.
:returns: a list of role ids.
:raises: keystone.exception.UserNotFound,
keystone.exception.ProjectNotFound
:raises: keystone.exception.ProjectNotFound
"""
def _get_group_project_roles(user_id, project_ref):
group_ids = self._get_group_ids_for_user_id(user_id)
return self.list_role_ids_for_groups_on_project(
group_ids,
project_ref['id'],
project_ref['domain_id'],
self._list_parent_ids_of_project(project_ref['id']))
def _get_user_project_roles(user_id, project_ref):
role_list = []
try:
metadata_ref = self._get_metadata(user_id=user_id,
tenant_id=project_ref['id'])
role_list = self._roles_from_role_dicts(
metadata_ref.get('roles', {}), False)
except exception.MetadataNotFound: # nosec: No metadata so no
# roles.
pass
if CONF.os_inherit.enabled:
# Now get any inherited roles for the owning domain
try:
metadata_ref = self._get_metadata(
user_id=user_id, domain_id=project_ref['domain_id'])
role_list += self._roles_from_role_dicts(
metadata_ref.get('roles', {}), True)
except (exception.MetadataNotFound, # nosec : No metadata or
# the backend doesn't support the role ops, so no
# roles.
exception.NotImplemented):
pass
# As well inherited roles from parent projects
for p in self.resource_api.list_project_parents(
project_ref['id']):
p_roles = self.list_grants(
user_id=user_id, project_id=p['id'],
inherited_to_projects=True)
role_list += [x['id'] for x in p_roles]
return role_list
project_ref = self.resource_api.get_project(tenant_id)
user_role_list = _get_user_project_roles(user_id, project_ref)
group_role_list = _get_group_project_roles(user_id, project_ref)
self.resource_api.get_project(tenant_id)
assignment_list = self.list_role_assignments(
user_id=user_id, project_id=tenant_id, effective=True)
# Use set() to process the list to remove any duplicates
return list(set(user_role_list + group_role_list))
return list(set([x['role_id'] for x in assignment_list]))
def get_roles_for_user_and_domain(self, user_id, domain_id):
"""Get the roles associated with a user within given domain.
:returns: a list of role ids.
:raises: keystone.exception.UserNotFound,
keystone.exception.DomainNotFound
:raises: keystone.exception.DomainNotFound
"""
def _get_group_domain_roles(user_id, domain_id):
role_list = []
group_ids = self._get_group_ids_for_user_id(user_id)
for group_id in group_ids:
try:
metadata_ref = self._get_metadata(group_id=group_id,
domain_id=domain_id)
role_list += self._roles_from_role_dicts(
metadata_ref.get('roles', {}), False)
except (exception.MetadataNotFound, # nosec
exception.NotImplemented):
# MetadataNotFound implies no group grant, so skip.
# Ignore NotImplemented since not all backends support
# domains.
pass
return role_list
def _get_user_domain_roles(user_id, domain_id):
metadata_ref = {}
try:
metadata_ref = self._get_metadata(user_id=user_id,
domain_id=domain_id)
except (exception.MetadataNotFound, # nosec
exception.NotImplemented):
# MetadataNotFound implies no user grants.
# Ignore NotImplemented since not all backends support
# domains
pass
return self._roles_from_role_dicts(
metadata_ref.get('roles', {}), False)
self.resource_api.get_domain(domain_id)
user_role_list = _get_user_domain_roles(user_id, domain_id)
group_role_list = _get_group_domain_roles(user_id, domain_id)
assignment_list = self.list_role_assignments(
user_id=user_id, domain_id=domain_id, effective=True)
# Use set() to process the list to remove any duplicates
return list(set(user_role_list + group_role_list))
return list(set([x['role_id'] for x in assignment_list]))
def get_roles_for_groups(self, group_ids, project_id=None, domain_id=None):
"""Get a list of roles for this group on domain and/or project."""
@ -915,20 +840,6 @@ class Manager(manager.Manager):
@six.add_metaclass(abc.ABCMeta)
class AssignmentDriverV8(object):
def _role_to_dict(self, role_id, inherited):
role_dict = {'id': role_id}
if inherited:
role_dict['inherited_to'] = 'projects'
return role_dict
def _roles_from_role_dicts(self, dict_list, inherited):
role_list = []
for d in dict_list:
if ((not d.get('inherited_to') and not inherited) or
(d.get('inherited_to') == 'projects' and inherited)):
role_list.append(d['id'])
return role_list
def _get_list_limit(self):
return CONF.assignment.list_limit or CONF.list_limit

View File

@ -261,52 +261,6 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests):
tenants = self.assignment_api.list_projects_for_user(user['id'])
self.assertEqual([], tenants)
def test_metadata_removed_on_delete_user(self):
# A test to check that the internal representation
# or roles is correctly updated when a user is deleted
user = {'name': uuid.uuid4().hex,
'domain_id': DEFAULT_DOMAIN_ID,
'password': 'passwd'}
user = self.identity_api.create_user(user)
role = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
self.role_api.create_role(role['id'], role)
self.assignment_api.add_role_to_user_and_project(
user['id'],
self.tenant_bar['id'],
role['id'])
self.identity_api.delete_user(user['id'])
# Now check whether the internal representation of roles
# has been deleted
self.assertRaises(exception.MetadataNotFound,
self.assignment_api._get_metadata,
user['id'],
self.tenant_bar['id'])
def test_metadata_removed_on_delete_project(self):
# A test to check that the internal representation
# or roles is correctly updated when a project is deleted
user = {'name': uuid.uuid4().hex,
'domain_id': DEFAULT_DOMAIN_ID,
'password': 'passwd'}
user = self.identity_api.create_user(user)
role = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
self.role_api.create_role(role['id'], role)
self.assignment_api.add_role_to_user_and_project(
user['id'],
self.tenant_bar['id'],
role['id'])
self.resource_api.delete_project(self.tenant_bar['id'])
# Now check whether the internal representation of roles
# has been deleted
self.assertRaises(exception.MetadataNotFound,
self.assignment_api._get_metadata,
user['id'],
self.tenant_bar['id'])
def test_update_project_returns_extra(self):
"""This tests for backwards-compatibility with an essex/folsom bug.