Implement backend logic for system roles

Introduce the plumbing so that we can store system role assignments.

bp system-scope

Change-Id: If9f56e4568fa528f201030edabdf30ac6961682e
This commit is contained in:
Lance Bragstad 2017-09-27 21:22:15 +00:00
parent bd729623f5
commit f86db08962
2 changed files with 144 additions and 0 deletions

View File

@ -145,3 +145,65 @@ class AssignmentDriverBase(object):
def delete_domain_assignments(self, domain_id):
"""Delete all assignments for a domain."""
raise exception.NotImplemented()
@abc.abstractmethod
def create_system_grant(self, role_id, actor_id, target_id,
assignment_type, inherited):
"""Grant a user or group a role on the system.
:param role_id: the unique ID of the role to grant to the user
:param actor_id: the unique ID of the user or group
:param target_id: the unique ID or string representing the target
:param assignment_type: a string describing the relationship of the
assignment
:param inherited: a boolean denoting if the assignment is inherited or
not
"""
raise exception.NotImplemented() # pragma: no cover
@abc.abstractmethod
def list_system_grants(self, actor_id, target_id, assignment_type):
"""Return a list of all system assignments for a specific entity.
:param actor_id: the unique ID of the actor
:param target_id: the unique ID of the target
:param assignment_type: the type of assignment to return
"""
raise exception.NotImplemented() # pragma: no cover
@abc.abstractmethod
def list_system_grants_by_role(self, role_id):
"""Return a list of system assignments associated to a role.
:param role_id: the unique ID of the role to grant to the user
"""
raise exception.NotImplemented() # pragma: no cover
@abc.abstractmethod
def check_system_grant(self, role_id, actor_id, target_id, inherited):
"""Check if a user or group has a specific role on the system.
:param role_id: the unique ID of the role to grant to the user
:param actor_id: the unique ID of the user or group
:param target_id: the unique ID or string representing the target
:param inherited: a boolean denoting if the assignment is inherited or
not
"""
raise exception.NotImplemented() # pragma: no cover
@abc.abstractmethod
def delete_system_grant(self, role_id, actor_id, target_id, inherited):
"""Remove a system assignment from a user or group.
:param role_id: the unique ID of the role to grant to the user
:param actor_id: the unique ID of the user or group
:param target_id: the unique ID or string representing the target
:param inherited: a boolean denoting if the assignment is inherited or
not
"""
raise exception.NotImplemented() # pragma: no cover

View File

@ -288,6 +288,65 @@ class Assignment(base.AssignmentDriverBase):
)
q.delete(False)
def create_system_grant(self, role_id, actor_id, target_id,
assignment_type, inherited):
try:
with sql.session_for_write() as session:
session.add(
SystemRoleAssignment(
type=assignment_type,
actor_id=actor_id,
target_id=target_id,
role_id=role_id,
inherited=inherited
)
)
except sql.DBDuplicateEntry: # nosec : The v3 grant APIs are silent if
# the assignment already exists
pass
def list_system_grants(self, actor_id, target_id, assignment_type):
with sql.session_for_read() as session:
query = session.query(SystemRoleAssignment)
query = query.filter_by(actor_id=actor_id)
query = query.filter_by(target_id=target_id)
query = query.filter_by(type=assignment_type)
results = query.all()
return [role.to_dict() for role in results]
def list_system_grants_by_role(self, role_id):
with sql.session_for_read() as session:
query = session.query(SystemRoleAssignment)
query = query.filter_by(role_id=role_id)
return query.all()
def check_system_grant(self, role_id, actor_id, target_id, inherited):
with sql.session_for_read() as session:
try:
q = session.query(SystemRoleAssignment)
q = q.filter_by(actor_id=actor_id)
q = q.filter_by(target_id=target_id)
q = q.filter_by(role_id=role_id)
q = q.filter_by(inherited=inherited)
q.one()
except sql.NotFound:
raise exception.RoleAssignmentNotFound(
role_id=role_id, actor_id=actor_id, target_id=target_id
)
def delete_system_grant(self, role_id, actor_id, target_id, inherited):
with sql.session_for_write() as session:
q = session.query(SystemRoleAssignment)
q = q.filter_by(actor_id=actor_id)
q = q.filter_by(target_id=target_id)
q = q.filter_by(role_id=role_id)
q = q.filter_by(inherited=inherited)
if not q.delete(False):
raise exception.RoleAssignmentNotFound(
role_id=role_id, actor_id=actor_id, target_id=target_id
)
class RoleAssignment(sql.ModelBase, sql.ModelDictMixin):
__tablename__ = 'assignment'
@ -315,3 +374,26 @@ class RoleAssignment(sql.ModelBase, sql.ModelDictMixin):
parent implementation is not applicable.
"""
return dict(self.items())
class SystemRoleAssignment(sql.ModelBase, sql.ModelDictMixin):
__tablename__ = 'system_assignment'
attributes = ['type', 'actor_id', 'target_id', 'role_id', 'inherited']
type = sql.Column(sql.String(64), nullable=False)
actor_id = sql.Column(sql.String(64), nullable=False)
target_id = sql.Column(sql.String(64), nullable=False)
role_id = sql.Column(sql.String(64), nullable=False)
inherited = sql.Column(sql.Boolean, default=False, nullable=False)
__table_args__ = (
sql.PrimaryKeyConstraint('type', 'actor_id', 'target_id', 'role_id',
'inherited'),
sql.Index('ix_system_actor_id', 'actor_id'),
)
def to_dict(self):
"""Override parent method with a simpler implementation.
RoleAssignment doesn't have non-indexed 'extra' attributes, so the
parent implementation is not applicable.
"""
return dict(self.items())