Merge "Implied roles driver and manager"
This commit is contained in:
commit
1053b63e8c
keystone
@ -594,6 +594,61 @@ class Manager(manager.Manager):
|
|||||||
return expand_group_assignment(ref, user_id)
|
return expand_group_assignment(ref, user_id)
|
||||||
return [ref]
|
return [ref]
|
||||||
|
|
||||||
|
def _add_implied_roles(self, role_refs):
|
||||||
|
"""Expand out implied roles.
|
||||||
|
|
||||||
|
The role_refs passed in have had all inheritance and group assignments
|
||||||
|
expanded out. We now need to look at the role_id in each ref and see
|
||||||
|
if it is a prior role for some implied roles. If it is, then we need to
|
||||||
|
duplicate that ref, one for each implied role. We store the prior role
|
||||||
|
in the indirect dict that is part of such a duplicated ref, so that a
|
||||||
|
caller can determine where the assignment came from.
|
||||||
|
|
||||||
|
"""
|
||||||
|
def _make_implied_ref_copy(prior_ref, implied_role_id):
|
||||||
|
# Create a ref for an implied role from the ref of a prior role,
|
||||||
|
# setting the new role_id to be the implied role and the indirect
|
||||||
|
# role_id to be the prior role
|
||||||
|
implied_ref = copy.deepcopy(prior_ref)
|
||||||
|
implied_ref['role_id'] = implied_role_id
|
||||||
|
indirect = implied_ref.setdefault('indirect', {})
|
||||||
|
indirect['role_id'] = prior_ref['role_id']
|
||||||
|
return implied_ref
|
||||||
|
|
||||||
|
if not CONF.token.infer_roles:
|
||||||
|
return role_refs
|
||||||
|
try:
|
||||||
|
implied_roles_cache = {}
|
||||||
|
role_refs_to_check = list(role_refs)
|
||||||
|
ref_results = list(role_refs)
|
||||||
|
while(role_refs_to_check):
|
||||||
|
next_ref = role_refs_to_check.pop()
|
||||||
|
next_role_id = next_ref['role_id']
|
||||||
|
if next_role_id in implied_roles_cache:
|
||||||
|
implied_roles = implied_roles_cache[next_role_id]
|
||||||
|
else:
|
||||||
|
implied_roles = (
|
||||||
|
self.role_api.list_implied_roles(next_role_id))
|
||||||
|
implied_roles_cache[next_role_id] = implied_roles
|
||||||
|
for implied_role in implied_roles:
|
||||||
|
implied_ref = (
|
||||||
|
_make_implied_ref_copy(
|
||||||
|
next_ref, implied_role['implied_role_id']))
|
||||||
|
ref_results.append(implied_ref)
|
||||||
|
role_refs_to_check.append(implied_ref)
|
||||||
|
except exception.NotImplemented:
|
||||||
|
LOG.debug('Role driver does not support implied roles.')
|
||||||
|
|
||||||
|
return ref_results
|
||||||
|
|
||||||
|
def _filter_by_role_id(self, role_id, ref_results):
|
||||||
|
# if we arrive here, we need to filer by role_id.
|
||||||
|
filter_results = []
|
||||||
|
for ref in ref_results:
|
||||||
|
if ref['role_id'] == role_id:
|
||||||
|
filter_results.append(ref)
|
||||||
|
return filter_results
|
||||||
|
|
||||||
def _list_effective_role_assignments(self, role_id, user_id, group_id,
|
def _list_effective_role_assignments(self, role_id, user_id, group_id,
|
||||||
domain_id, project_id, subtree_ids,
|
domain_id, project_id, subtree_ids,
|
||||||
inherited):
|
inherited):
|
||||||
@ -717,19 +772,24 @@ class Manager(manager.Manager):
|
|||||||
# relevant, since domains don't inherit assignments
|
# relevant, since domains don't inherit assignments
|
||||||
inherited = False if domain_id else inherited
|
inherited = False if domain_id else inherited
|
||||||
|
|
||||||
# List user assignments
|
# List user assignments.
|
||||||
|
# Due to the need to expand implied roles, this call will skip
|
||||||
|
# filtering by role_id and instead return the whole set of roles.
|
||||||
|
# Matching on the specified role is performed at the end.
|
||||||
|
|
||||||
direct_refs = list_role_assignments_for_actor(
|
direct_refs = list_role_assignments_for_actor(
|
||||||
role_id=role_id, user_id=user_id, project_id=project_id,
|
role_id=None, user_id=user_id, project_id=project_id,
|
||||||
subtree_ids=subtree_ids, domain_id=domain_id,
|
subtree_ids=subtree_ids, domain_id=domain_id,
|
||||||
inherited=inherited)
|
inherited=inherited)
|
||||||
|
|
||||||
# And those from the user's groups
|
# And those from the user's groups. Again, role_id is not
|
||||||
|
# used to filter here
|
||||||
group_refs = []
|
group_refs = []
|
||||||
if user_id:
|
if user_id:
|
||||||
group_ids = self._get_group_ids_for_user_id(user_id)
|
group_ids = self._get_group_ids_for_user_id(user_id)
|
||||||
if group_ids:
|
if group_ids:
|
||||||
group_refs = list_role_assignments_for_actor(
|
group_refs = list_role_assignments_for_actor(
|
||||||
role_id=role_id, project_id=project_id,
|
role_id=None, project_id=project_id,
|
||||||
subtree_ids=subtree_ids, group_ids=group_ids,
|
subtree_ids=subtree_ids, group_ids=group_ids,
|
||||||
domain_id=domain_id, inherited=inherited)
|
domain_id=domain_id, inherited=inherited)
|
||||||
|
|
||||||
@ -740,6 +800,10 @@ class Manager(manager.Manager):
|
|||||||
ref=ref, user_id=user_id, project_id=project_id,
|
ref=ref, user_id=user_id, project_id=project_id,
|
||||||
subtree_ids=subtree_ids)
|
subtree_ids=subtree_ids)
|
||||||
|
|
||||||
|
refs = self._add_implied_roles(refs)
|
||||||
|
if role_id:
|
||||||
|
refs = self._filter_by_role_id(role_id, refs)
|
||||||
|
|
||||||
return refs
|
return refs
|
||||||
|
|
||||||
def _list_direct_role_assignments(self, role_id, user_id, group_id,
|
def _list_direct_role_assignments(self, role_id, user_id, group_id,
|
||||||
@ -1421,7 +1485,39 @@ class RoleDriverV9(RoleDriverBase):
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
pass
|
@abc.abstractmethod
|
||||||
|
def get_implied_role(self, prior_role_id, implied_role_id):
|
||||||
|
"""Fetches a role inference rule
|
||||||
|
|
||||||
|
:raises keystone.exception.ImpliedRoleNotFound: If the implied role
|
||||||
|
doesn't exist.
|
||||||
|
|
||||||
|
"""
|
||||||
|
raise exception.NotImplemented() # pragma: no cover
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def create_implied_role(self, prior_role_id, implied_role_id):
|
||||||
|
"""Creates a role inference rule
|
||||||
|
|
||||||
|
:raises: keystone.exception.RoleNotFound: If the role doesn't exist.
|
||||||
|
|
||||||
|
"""
|
||||||
|
raise exception.NotImplemented() # pragma: no cover
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def delete_implied_role(self, prior_role_id, implied_role_id):
|
||||||
|
"""Deletes a role inference rule"""
|
||||||
|
raise exception.NotImplemented() # pragma: no cover
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def list_role_inference_rules(self):
|
||||||
|
"""Lists all the rules used to imply one role from another"""
|
||||||
|
raise exception.NotImplemented() # pragma: no cover
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def list_implied_roles(self, prior_role_id):
|
||||||
|
"""Lists roles implied from the prior role id"""
|
||||||
|
raise exception.NotImplemented() # pragma: no cover
|
||||||
|
|
||||||
|
|
||||||
class V9RoleWrapperForV8Driver(RoleDriverV9):
|
class V9RoleWrapperForV8Driver(RoleDriverV9):
|
||||||
@ -1472,5 +1568,19 @@ class V9RoleWrapperForV8Driver(RoleDriverV9):
|
|||||||
def delete_role(self, role_id):
|
def delete_role(self, role_id):
|
||||||
self.driver.delete_role(role_id)
|
self.driver.delete_role(role_id)
|
||||||
|
|
||||||
|
def get_implied_role(self, prior_role_id, implied_role_id):
|
||||||
|
raise exception.NotImplemented() # pragma: no cover
|
||||||
|
|
||||||
|
def create_implied_role(self, prior_role_id, implied_role_id):
|
||||||
|
raise exception.NotImplemented() # pragma: no cover
|
||||||
|
|
||||||
|
def delete_implied_role(self, prior_role_id, implied_role_id):
|
||||||
|
raise exception.NotImplemented() # pragma: no cover
|
||||||
|
|
||||||
|
def list_implied_roles(self, prior_role_id):
|
||||||
|
raise exception.NotImplemented() # pragma: no cover
|
||||||
|
|
||||||
|
def list_role_inference_rules(self):
|
||||||
|
raise exception.NotImplemented() # pragma: no cover
|
||||||
|
|
||||||
RoleDriver = manager.create_legacy_driver(RoleDriverV8)
|
RoleDriver = manager.create_legacy_driver(RoleDriverV8)
|
||||||
|
@ -96,8 +96,23 @@ class Role(assignment.RoleDriverV9):
|
|||||||
self.get_role(role_id)
|
self.get_role(role_id)
|
||||||
return self.role.update(role_id, role)
|
return self.role.update(role_id, role)
|
||||||
|
|
||||||
|
def create_implied_role(self, prior_role_id, implied_role_id):
|
||||||
|
raise exception.NotImplemented() # pragma: no cover
|
||||||
|
|
||||||
# NOTE(heny-nash): A mixin class to enable the sharing of the LDAP structure
|
def delete_implied_role(self, prior_role_id, implied_role_id):
|
||||||
|
raise exception.NotImplemented() # pragma: no cover
|
||||||
|
|
||||||
|
def list_implied_roles(self, prior_role_id):
|
||||||
|
raise exception.NotImplemented() # pragma: no cover
|
||||||
|
|
||||||
|
def list_role_inference_rules(self):
|
||||||
|
raise exception.NotImplemented() # pragma: no cover
|
||||||
|
|
||||||
|
def get_implied_role(self, prior_role_id, implied_role_id):
|
||||||
|
raise exception.NotImplemented() # pragma: no cover
|
||||||
|
|
||||||
|
|
||||||
|
# NOTE(henry-nash): A mixin class to enable the sharing of the LDAP structure
|
||||||
# between here and the assignment LDAP.
|
# between here and the assignment LDAP.
|
||||||
class RoleLdapStructureMixin(object):
|
class RoleLdapStructureMixin(object):
|
||||||
DEFAULT_OU = 'ou=Roles'
|
DEFAULT_OU = 'ou=Roles'
|
||||||
|
@ -9,6 +9,8 @@
|
|||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
from oslo_db import exception as db_exception
|
||||||
|
from sqlalchemy import and_
|
||||||
|
|
||||||
from keystone import assignment
|
from keystone import assignment
|
||||||
from keystone.common import driver_hints
|
from keystone.common import driver_hints
|
||||||
@ -71,6 +73,80 @@ class Role(assignment.RoleDriverV9):
|
|||||||
ref = self._get_role(session, role_id)
|
ref = self._get_role(session, role_id)
|
||||||
session.delete(ref)
|
session.delete(ref)
|
||||||
|
|
||||||
|
@sql.handle_conflicts(conflict_type='implied_role')
|
||||||
|
def create_implied_role(self, prior_role_id, implied_role_id):
|
||||||
|
with sql.transaction() as session:
|
||||||
|
inference = {'prior_role_id': prior_role_id,
|
||||||
|
'implied_role_id': implied_role_id}
|
||||||
|
ref = ImpliedRoleTable.from_dict(inference)
|
||||||
|
try:
|
||||||
|
session.add(ref)
|
||||||
|
except db_exception.DBReferenceError:
|
||||||
|
# We don't know which role threw this.
|
||||||
|
# Query each to trigger the exception.
|
||||||
|
self._get_role(prior_role_id)
|
||||||
|
self._get_role(implied_role_id)
|
||||||
|
return ref.to_dict()
|
||||||
|
|
||||||
|
def delete_implied_role(self, prior_role_id, implied_role_id):
|
||||||
|
with sql.transaction() as session:
|
||||||
|
query = session.query(ImpliedRoleTable).filter(and_(
|
||||||
|
ImpliedRoleTable.prior_role_id == prior_role_id,
|
||||||
|
ImpliedRoleTable.implied_role_id == implied_role_id))
|
||||||
|
query.delete(synchronize_session='fetch')
|
||||||
|
|
||||||
|
def list_implied_roles(self, prior_role_id):
|
||||||
|
with sql.transaction() as session:
|
||||||
|
query = session.query(
|
||||||
|
ImpliedRoleTable).filter(
|
||||||
|
ImpliedRoleTable.prior_role_id == prior_role_id)
|
||||||
|
refs = query.all()
|
||||||
|
return [ref.to_dict() for ref in refs]
|
||||||
|
|
||||||
|
def list_role_inference_rules(self):
|
||||||
|
with sql.transaction() as session:
|
||||||
|
query = session.query(ImpliedRoleTable)
|
||||||
|
refs = query.all()
|
||||||
|
return [ref.to_dict() for ref in refs]
|
||||||
|
|
||||||
|
def get_implied_role(self, prior_role_id, implied_role_id):
|
||||||
|
with sql.transaction() as session:
|
||||||
|
query = session.query(
|
||||||
|
ImpliedRoleTable).filter(
|
||||||
|
ImpliedRoleTable.prior_role_id == prior_role_id).filter(
|
||||||
|
ImpliedRoleTable.implied_role_id == implied_role_id)
|
||||||
|
ref = query.all()
|
||||||
|
if len(ref) < 1:
|
||||||
|
raise exception.ImpliedRoleNotFound(
|
||||||
|
prior_role_id=prior_role_id,
|
||||||
|
implied_role_id=implied_role_id)
|
||||||
|
return ref[0].to_dict()
|
||||||
|
|
||||||
|
|
||||||
|
class ImpliedRoleTable(sql.ModelBase, sql.DictBase):
|
||||||
|
__tablename__ = 'implied_role'
|
||||||
|
attributes = ['prior_role_id', 'implied_role_id']
|
||||||
|
prior_role_id = sql.Column(sql.String(64), sql.ForeignKey('role.id'),
|
||||||
|
primary_key=True)
|
||||||
|
implied_role_id = sql.Column(sql.String(64), sql.ForeignKey('role.id'),
|
||||||
|
primary_key=True)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_dict(cls, dictionary):
|
||||||
|
new_dictionary = dictionary.copy()
|
||||||
|
return cls(**new_dictionary)
|
||||||
|
|
||||||
|
def to_dict(self):
|
||||||
|
"""Return a dictionary with model's attributes.
|
||||||
|
|
||||||
|
overrides the `to_dict` function from the base class
|
||||||
|
to avoid having an `extra` field.
|
||||||
|
"""
|
||||||
|
d = dict()
|
||||||
|
for attr in self.__class__.attributes:
|
||||||
|
d[attr] = getattr(self, attr)
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
class RoleTable(sql.ModelBase, sql.DictBase):
|
class RoleTable(sql.ModelBase, sql.DictBase):
|
||||||
__tablename__ = 'role'
|
__tablename__ = 'role'
|
||||||
|
@ -301,6 +301,9 @@ FILE_OPTIONS = {
|
|||||||
'middleware must be configured with the '
|
'middleware must be configured with the '
|
||||||
'hash_algorithms, otherwise token revocation will '
|
'hash_algorithms, otherwise token revocation will '
|
||||||
'not be processed correctly.'),
|
'not be processed correctly.'),
|
||||||
|
cfg.BoolOpt('infer_roles', default=True,
|
||||||
|
help='Add roles to token that are not explicitly added, '
|
||||||
|
'but that are linked implicitly to other roles.'),
|
||||||
],
|
],
|
||||||
'revoke': [
|
'revoke': [
|
||||||
cfg.StrOpt('driver',
|
cfg.StrOpt('driver',
|
||||||
|
@ -152,6 +152,18 @@ class Role(Model):
|
|||||||
optional_keys = tuple()
|
optional_keys = tuple()
|
||||||
|
|
||||||
|
|
||||||
|
class ImpliedRole(Model):
|
||||||
|
"""ImpliedRole object.
|
||||||
|
|
||||||
|
Required keys:
|
||||||
|
prior_role_id
|
||||||
|
implied_role_id
|
||||||
|
"""
|
||||||
|
|
||||||
|
required_keys = ('prior_role_id', 'implied_role_id')
|
||||||
|
optional_keys = tuple()
|
||||||
|
|
||||||
|
|
||||||
class Trust(Model):
|
class Trust(Model):
|
||||||
"""Trust object.
|
"""Trust object.
|
||||||
|
|
||||||
|
@ -285,6 +285,10 @@ class RoleNotFound(NotFound):
|
|||||||
message_format = _("Could not find role: %(role_id)s")
|
message_format = _("Could not find role: %(role_id)s")
|
||||||
|
|
||||||
|
|
||||||
|
class ImpliedRoleNotFound(NotFound):
|
||||||
|
message_format = _("%(prior_role_id)s does not imply %(implied_role_id)s")
|
||||||
|
|
||||||
|
|
||||||
class RoleAssignmentNotFound(NotFound):
|
class RoleAssignmentNotFound(NotFound):
|
||||||
message_format = _("Could not find role assignment with role: "
|
message_format = _("Could not find role assignment with role: "
|
||||||
"%(role_id)s, user or group: %(actor_id)s, "
|
"%(role_id)s, user or group: %(actor_id)s, "
|
||||||
|
@ -101,6 +101,12 @@ class AssignmentTestHelperMixin(object):
|
|||||||
{'project': {'project': 3}},
|
{'project': {'project': 3}},
|
||||||
{'project': {'project': 3}}]
|
{'project': {'project': 3}}]
|
||||||
|
|
||||||
|
# A set of implied role specifications. In this case, prior role
|
||||||
|
# index 0 implies role index 1, and role 1 implies roles 2 and 3.
|
||||||
|
|
||||||
|
'roles': [{'role': 0, 'implied_roles': [1]},
|
||||||
|
{'role': 1, 'implied_roles': [2, 3]}]
|
||||||
|
|
||||||
# A list of groups and their members. In this case make users with
|
# A list of groups and their members. In this case make users with
|
||||||
# index 0 and 1 members of group with index 0. Users and Groups are
|
# index 0 and 1 members of group with index 0. Users and Groups are
|
||||||
# indexed in the order they appear in the 'entities' key above.
|
# indexed in the order they appear in the 'entities' key above.
|
||||||
@ -286,6 +292,23 @@ class AssignmentTestHelperMixin(object):
|
|||||||
reference_data[reference_index][shorthand_data[key]]['id'])
|
reference_data[reference_index][shorthand_data[key]]['id'])
|
||||||
return expanded_key, index_value
|
return expanded_key, index_value
|
||||||
|
|
||||||
|
def create_implied_roles(self, implied_pattern, test_data):
|
||||||
|
"""Create the implied roles specified in the test plan."""
|
||||||
|
for implied_spec in implied_pattern:
|
||||||
|
# Each implied role specification is a dict of the form:
|
||||||
|
#
|
||||||
|
# {'role': 0, 'implied_roles': list of roles}
|
||||||
|
|
||||||
|
prior_role = test_data['roles'][implied_spec['role']]['id']
|
||||||
|
if isinstance(implied_spec['implied_roles'], list):
|
||||||
|
for this_role in implied_spec['implied_roles']:
|
||||||
|
implied_role = test_data['roles'][this_role]['id']
|
||||||
|
self.role_api.create_implied_role(prior_role, implied_role)
|
||||||
|
else:
|
||||||
|
implied_role = (
|
||||||
|
test_data['roles'][implied_spec['implied_roles']]['id'])
|
||||||
|
self.role_api.create_implied_role(prior_role, implied_role)
|
||||||
|
|
||||||
def create_group_memberships(self, group_pattern, test_data):
|
def create_group_memberships(self, group_pattern, test_data):
|
||||||
"""Create the group memberships specified in the test plan."""
|
"""Create the group memberships specified in the test plan."""
|
||||||
for group_spec in group_pattern:
|
for group_spec in group_pattern:
|
||||||
@ -399,6 +422,8 @@ class AssignmentTestHelperMixin(object):
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
test_data = self.create_entities(test_plan['entities'])
|
test_data = self.create_entities(test_plan['entities'])
|
||||||
|
if 'implied_roles' in test_plan:
|
||||||
|
self.create_implied_roles(test_plan['implied_roles'], test_data)
|
||||||
if 'group_memberships' in test_plan:
|
if 'group_memberships' in test_plan:
|
||||||
self.create_group_memberships(test_plan['group_memberships'],
|
self.create_group_memberships(test_plan['group_memberships'],
|
||||||
test_data)
|
test_data)
|
||||||
@ -6397,6 +6422,162 @@ class InheritanceTests(AssignmentTestHelperMixin):
|
|||||||
self.assertIn(test_data['users'][x]['id'], user_ids)
|
self.assertIn(test_data['users'][x]['id'], user_ids)
|
||||||
|
|
||||||
|
|
||||||
|
class ImpliedRoleTests(AssignmentTestHelperMixin):
|
||||||
|
|
||||||
|
def test_role_assignments_simple_tree_of_implied_roles(self):
|
||||||
|
"""Test that implied roles are expanded out."""
|
||||||
|
test_plan = {
|
||||||
|
'entities': {'domains': {'users': 1, 'projects': 1},
|
||||||
|
'roles': 4},
|
||||||
|
# Three level tree of implied roles
|
||||||
|
'implied_roles': [{'role': 0, 'implied_roles': [1]},
|
||||||
|
{'role': 1, 'implied_roles': [2, 3]}],
|
||||||
|
'assignments': [{'user': 0, 'role': 0, 'project': 0}],
|
||||||
|
'tests': [
|
||||||
|
# List all direct assignments for user[0], this should just
|
||||||
|
# show the one top level role assignment
|
||||||
|
{'params': {'user': 0},
|
||||||
|
'results': [{'user': 0, 'role': 0, 'project': 0}]},
|
||||||
|
# Listing in effective mode should show the implied roles
|
||||||
|
# expanded out
|
||||||
|
{'params': {'user': 0, 'effective': True},
|
||||||
|
'results': [{'user': 0, 'role': 0, 'project': 0},
|
||||||
|
{'user': 0, 'role': 1, 'project': 0,
|
||||||
|
'indirect': {'role': 0}},
|
||||||
|
{'user': 0, 'role': 2, 'project': 0,
|
||||||
|
'indirect': {'role': 1}},
|
||||||
|
{'user': 0, 'role': 3, 'project': 0,
|
||||||
|
'indirect': {'role': 1}}]},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
self.execute_assignment_plan(test_plan)
|
||||||
|
|
||||||
|
def test_role_assignments_directed_graph_of_implied_roles(self):
|
||||||
|
"""Test that a role can have multiple, different prior roles."""
|
||||||
|
test_plan = {
|
||||||
|
'entities': {'domains': {'users': 1, 'projects': 1},
|
||||||
|
'roles': 6},
|
||||||
|
# Three level tree of implied roles, where one of the roles at the
|
||||||
|
# bottom is implied by more than one top level role
|
||||||
|
'implied_roles': [{'role': 0, 'implied_roles': [1, 2]},
|
||||||
|
{'role': 1, 'implied_roles': [3, 4]},
|
||||||
|
{'role': 5, 'implied_roles': [4]}],
|
||||||
|
# The use gets both top level roles
|
||||||
|
'assignments': [{'user': 0, 'role': 0, 'project': 0},
|
||||||
|
{'user': 0, 'role': 5, 'project': 0}],
|
||||||
|
'tests': [
|
||||||
|
# The implied roles should be expanded out and there should be
|
||||||
|
# two entries for the role that had two different prior roles.
|
||||||
|
{'params': {'user': 0, 'effective': True},
|
||||||
|
'results': [{'user': 0, 'role': 0, 'project': 0},
|
||||||
|
{'user': 0, 'role': 5, 'project': 0},
|
||||||
|
{'user': 0, 'role': 1, 'project': 0,
|
||||||
|
'indirect': {'role': 0}},
|
||||||
|
{'user': 0, 'role': 2, 'project': 0,
|
||||||
|
'indirect': {'role': 0}},
|
||||||
|
{'user': 0, 'role': 3, 'project': 0,
|
||||||
|
'indirect': {'role': 1}},
|
||||||
|
{'user': 0, 'role': 4, 'project': 0,
|
||||||
|
'indirect': {'role': 1}},
|
||||||
|
{'user': 0, 'role': 4, 'project': 0,
|
||||||
|
'indirect': {'role': 5}}]},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
test_data = self.execute_assignment_plan(test_plan)
|
||||||
|
|
||||||
|
# We should also be able to get a similar (yet summarized) answer to
|
||||||
|
# the above by calling get_roles_for_user_and_project(), which should
|
||||||
|
# list the role_ids, yet remove any duplicates
|
||||||
|
role_ids = self.assignment_api.get_roles_for_user_and_project(
|
||||||
|
test_data['users'][0]['id'], test_data['projects'][0]['id'])
|
||||||
|
# We should see 6 entries, not 7, since role index 5 appeared twice in
|
||||||
|
# the answer from list_role_assignments
|
||||||
|
self.assertThat(role_ids, matchers.HasLength(6))
|
||||||
|
for x in range(0, 5):
|
||||||
|
self.assertIn(test_data['roles'][x]['id'], role_ids)
|
||||||
|
|
||||||
|
def test_role_assignments_implied_roles_filtered_by_role(self):
|
||||||
|
"""Test that you can filter by role even if roles are implied."""
|
||||||
|
test_plan = {
|
||||||
|
'entities': {'domains': {'users': 1, 'projects': 2},
|
||||||
|
'roles': 4},
|
||||||
|
# Three level tree of implied roles
|
||||||
|
'implied_roles': [{'role': 0, 'implied_roles': [1]},
|
||||||
|
{'role': 1, 'implied_roles': [2, 3]}],
|
||||||
|
'assignments': [{'user': 0, 'role': 0, 'project': 0},
|
||||||
|
{'user': 0, 'role': 3, 'project': 1}],
|
||||||
|
'tests': [
|
||||||
|
# List effective roles filtering by one of the implied roles,
|
||||||
|
# showing that the filter was implied post expansion of
|
||||||
|
# implied roles (and that non impled roles are included in
|
||||||
|
# the filter
|
||||||
|
{'params': {'role': 3, 'effective': True},
|
||||||
|
'results': [{'user': 0, 'role': 3, 'project': 0,
|
||||||
|
'indirect': {'role': 1}},
|
||||||
|
{'user': 0, 'role': 3, 'project': 1}]},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
self.execute_assignment_plan(test_plan)
|
||||||
|
|
||||||
|
def test_role_assignments_simple_tree_of_implied_roles_on_domain(self):
|
||||||
|
"""Test that implied roles are expanded out when placed on a domain."""
|
||||||
|
test_plan = {
|
||||||
|
'entities': {'domains': {'users': 1},
|
||||||
|
'roles': 4},
|
||||||
|
# Three level tree of implied roles
|
||||||
|
'implied_roles': [{'role': 0, 'implied_roles': [1]},
|
||||||
|
{'role': 1, 'implied_roles': [2, 3]}],
|
||||||
|
'assignments': [{'user': 0, 'role': 0, 'domain': 0}],
|
||||||
|
'tests': [
|
||||||
|
# List all direct assignments for user[0], this should just
|
||||||
|
# show the one top level role assignment
|
||||||
|
{'params': {'user': 0},
|
||||||
|
'results': [{'user': 0, 'role': 0, 'domain': 0}]},
|
||||||
|
# Listing in effective mode should how the implied roles
|
||||||
|
# expanded out
|
||||||
|
{'params': {'user': 0, 'effective': True},
|
||||||
|
'results': [{'user': 0, 'role': 0, 'domain': 0},
|
||||||
|
{'user': 0, 'role': 1, 'domain': 0,
|
||||||
|
'indirect': {'role': 0}},
|
||||||
|
{'user': 0, 'role': 2, 'domain': 0,
|
||||||
|
'indirect': {'role': 1}},
|
||||||
|
{'user': 0, 'role': 3, 'domain': 0,
|
||||||
|
'indirect': {'role': 1}}]},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
self.execute_assignment_plan(test_plan)
|
||||||
|
|
||||||
|
def test_role_assignments_inherited_implied_roles(self):
|
||||||
|
"""Test that you can intermix inherited and implied roles."""
|
||||||
|
test_plan = {
|
||||||
|
'entities': {'domains': {'users': 1, 'projects': 1},
|
||||||
|
'roles': 4},
|
||||||
|
# Simply one level of implied roles
|
||||||
|
'implied_roles': [{'role': 0, 'implied_roles': [1]}],
|
||||||
|
# Assign to top level role as an inherited assignment to the
|
||||||
|
# domain
|
||||||
|
'assignments': [{'user': 0, 'role': 0, 'domain': 0,
|
||||||
|
'inherited_to_projects': True}],
|
||||||
|
'tests': [
|
||||||
|
# List all direct assignments for user[0], this should just
|
||||||
|
# show the one top level role assignment
|
||||||
|
{'params': {'user': 0},
|
||||||
|
'results': [{'user': 0, 'role': 0, 'domain': 0,
|
||||||
|
'inherited_to_projects': 'projects'}]},
|
||||||
|
# List in effective mode - we should only see the inital and
|
||||||
|
# implied role on the project (since inherited roles are not
|
||||||
|
# active on their anchor point).
|
||||||
|
{'params': {'user': 0, 'effective': True},
|
||||||
|
'results': [{'user': 0, 'role': 0, 'project': 0,
|
||||||
|
'indirect': {'domain': 0}},
|
||||||
|
{'user': 0, 'role': 1, 'project': 0,
|
||||||
|
'indirect': {'domain': 0, 'role': 0}}]},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
self.config_fixture.config(group='os_inherit', enabled=True)
|
||||||
|
self.execute_assignment_plan(test_plan)
|
||||||
|
|
||||||
|
|
||||||
class FilterTests(filtering.FilterTests):
|
class FilterTests(filtering.FilterTests):
|
||||||
def test_list_entities_filtered(self):
|
def test_list_entities_filtered(self):
|
||||||
for entity in ['user', 'group', 'project']:
|
for entity in ['user', 'group', 'project']:
|
||||||
|
@ -611,6 +611,10 @@ class SqlInheritance(SqlTests, test_backend.InheritanceTests):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class SqlImpliedRoles(SqlTests, test_backend.ImpliedRoleTests):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class SqlTokenCacheInvalidation(SqlTests, test_backend.TokenCacheInvalidation):
|
class SqlTokenCacheInvalidation(SqlTests, test_backend.TokenCacheInvalidation):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(SqlTokenCacheInvalidation, self).setUp()
|
super(SqlTokenCacheInvalidation, self).setUp()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user