Add driver support for app cred access rules

Change-Id: Iff51313de8b2dc8c71efa901d4eab5ab417234d3
This commit is contained in:
Colleen Murphy 2019-01-19 20:30:28 +01:00
parent 182524d971
commit e8aa678a2b
2 changed files with 63 additions and 1 deletions

View File

@ -45,6 +45,10 @@ class ApplicationCredentialModel(sql.ModelBase, sql.ModelDictMixin):
'ApplicationCredentialRoleModel',
backref=sqlalchemy.orm.backref('application_credential'),
cascade='all, delete-orphan')
access_rules = sqlalchemy.orm.relationship(
'ApplicationCredentialAccessRuleModel',
backref=sqlalchemy.orm.backref('application_credential'),
cascade='all, delete-orphan')
class ApplicationCredentialRoleModel(sql.ModelBase, sql.ModelDictMixin):
@ -59,6 +63,35 @@ class ApplicationCredentialRoleModel(sql.ModelBase, sql.ModelDictMixin):
role_id = sql.Column(sql.String(64), primary_key=True, nullable=False)
class AccessRuleModel(sql.ModelBase, sql.ModelDictMixin):
__tablename__ = 'access_rule'
attributes = ['service', 'path', 'method']
id = sql.Column(sql.Integer, primary_key=True, nullable=False)
service = sql.Column(sql.String(64))
path = sql.Column(sql.String(128))
method = sql.Column(sql.String(16))
application_credential = sqlalchemy.orm.relationship(
'ApplicationCredentialAccessRuleModel',
backref=sqlalchemy.orm.backref('access_rule'),
cascade='all, delete-orphan')
class ApplicationCredentialAccessRuleModel(sql.ModelBase, sql.ModelDictMixin):
__tablename__ = 'application_credential_access_rule'
attributes = ['application_credential_id', 'access_rule_id']
application_credential_id = sql.Column(
sql.Integer,
sql.ForeignKey('application_credential.internal_id',
ondelete='cascade'),
primary_key=True,
nullable=False)
access_rule_id = sql.Column(
sql.Integer,
sql.ForeignKey('access_rule.id'),
primary_key=True,
nullable=False)
class ApplicationCredential(base.ApplicationCredentialDriverBase):
def _check_secret(self, secret, app_cred_ref):
@ -88,7 +121,8 @@ class ApplicationCredential(base.ApplicationCredentialDriverBase):
app_cred_ref['secret_hash'] = hashed_secret
@sql.handle_conflicts(conflict_type='application_credential')
def create_application_credential(self, application_credential, roles):
def create_application_credential(self, application_credential, roles,
access_rules=None):
app_cred = application_credential.copy()
self._hash_secret(app_cred)
with sql.session_for_write() as session:
@ -99,15 +133,31 @@ class ApplicationCredential(base.ApplicationCredentialDriverBase):
app_cred_role.application_credential = ref
app_cred_role.role_id = role['id']
session.add(app_cred_role)
if access_rules:
for access_rule in access_rules:
access_rule_ref = AccessRuleModel.from_dict(access_rule)
session.add(access_rule_ref)
app_cred_access_rule = ApplicationCredentialAccessRuleModel()
app_cred_access_rule.application_credential = ref
app_cred_access_rule.access_rule = access_rule_ref
session.add(app_cred_access_rule)
application_credential_dict = ref.to_dict()
application_credential_dict.pop('internal_id')
application_credential_dict['roles'] = roles
if access_rules is not None:
application_credential_dict['access_rules'] = access_rules
return application_credential_dict
def _to_dict(self, ref):
app_cred = ref.to_dict()
roles = [{'id': r.to_dict()['role_id']} for r in ref.roles]
app_cred['roles'] = roles
if ref.access_rules:
access_rules = [
{k: v for k, v in c.access_rule.to_dict().items() if k != 'id'}
for c in ref.access_rules
]
app_cred['access_rules'] = access_rules
app_cred.pop('internal_id')
return app_cred

View File

@ -39,6 +39,18 @@ class SQLModelTestCase(core_sql.BaseBackendSqlModels):
('role_id', sql.String, 64))
self.assertExpectedSchema('application_credential_role', cols)
def test_access_rule_model(self):
cols = (('id', sql.Integer, None),
('service', sql.String, 64),
('path', sql.String, 128),
('method', sql.String, 16))
self.assertExpectedSchema('access_rule', cols)
def test_application_credential_access_rule_model(self):
cols = (('application_credential_id', sql.Integer, None),
('access_rule_id', sql.Integer, None))
self.assertExpectedSchema('application_credential_access_rule', cols)
class SQLDriverTestCase(core_sql.BaseBackendSqlTests,
test_backends.ApplicationCredentialTests):