Implied roles index with cascading delete

Currently, when deleting a role that has an implied role, only one
is removed from the database. This is because the initial FKs that
were created did not take delete and update into account.

Change-Id: I09ef96cf9e4a863337399ae880e0511125f5239c
Closes-Bug: 1546562
This commit is contained in:
Alexander Makarov 2016-02-18 18:57:40 +03:00 committed by Steve Martinelli
parent d2af8e0d6a
commit 15ca0f27f1
3 changed files with 83 additions and 4 deletions

View File

@ -148,10 +148,14 @@ class Role(assignment.RoleDriverV9):
class ImpliedRoleTable(sql.ModelBase, sql.DictBase):
__tablename__ = 'implied_role'
attributes = ['prior_role_id', 'implied_role_id']
prior_role_id = sql.Column(sql.String(64), sql.ForeignKey('role.id'),
primary_key=True)
implied_role_id = sql.Column(sql.String(64), sql.ForeignKey('role.id'),
primary_key=True)
prior_role_id = sql.Column(
sql.String(64),
sql.ForeignKey('role.id', ondelete="CASCADE"),
primary_key=True)
implied_role_id = sql.Column(
sql.String(64),
sql.ForeignKey('role.id', ondelete="CASCADE"),
primary_key=True)
@classmethod
def from_dict(cls, dictionary):

View File

@ -0,0 +1,46 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import migrate
import sqlalchemy as sql
ROLE_TABLE = 'role'
IMPLIED_ROLE_TABLE = 'implied_role'
def upgrade(migrate_engine):
meta = sql.MetaData()
meta.bind = migrate_engine
role = sql.Table(ROLE_TABLE, meta, autoload=True)
implied_role = sql.Table(IMPLIED_ROLE_TABLE, meta, autoload=True)
fkeys = [
{'columns': [implied_role.c.prior_role_id],
'references': [role.c.id]},
{'columns': [implied_role.c.implied_role_id],
'references': [role.c.id]},
]
# NOTE(stevemar): We need to divide these into two separate loops otherwise
# they may clobber each other and only end up with one foreign key.
for fkey in fkeys:
migrate.ForeignKeyConstraint(columns=fkey['columns'],
refcolumns=fkey['references'],
name=fkey.get('name')).drop()
for fkey in fkeys:
migrate.ForeignKeyConstraint(columns=fkey['columns'],
refcolumns=fkey['references'],
name=fkey.get('name'),
ondelete="CASCADE").create()

View File

@ -41,6 +41,7 @@ from oslo_db.sqlalchemy import session as db_session
from sqlalchemy.engine import reflection
import sqlalchemy.exc
from sqlalchemy import schema
from testtools import matchers
from keystone.common import sql
from keystone.common.sql import migrate_repo
@ -134,6 +135,7 @@ class SqlMigrateBase(unit.SQLDriverOverrides, unit.TestCase):
def setUp(self):
super(SqlMigrateBase, self).setUp()
self.load_backends()
database.initialize_sql_session()
conn_str = CONF.database.connection
if (conn_str != unit.IN_MEM_DB_CONN_STRING and
@ -816,6 +818,33 @@ class SqlUpgradeTests(SqlMigrateBase):
password_table)
self.assertListEqual(expected_users, actual_users)
def test_implied_roles_fk_on_delete_cascade(self):
if self.engine.name == 'sqlite':
self.skipTest('sqlite backend does not support foreign keys')
self.upgrade(92)
def _create_three_roles():
id_list = []
for _ in range(3):
role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
id_list.append(role['id'])
return id_list
role_id_list = _create_three_roles()
self.role_api.create_implied_role(role_id_list[0], role_id_list[1])
self.role_api.create_implied_role(role_id_list[0], role_id_list[2])
# assert that there are two roles implied by role 0.
implied_roles = self.role_api.list_implied_roles(role_id_list[0])
self.assertThat(implied_roles, matchers.HasLength(2))
self.role_api.delete_role(role_id_list[0])
# assert the cascade deletion is effective.
implied_roles = self.role_api.list_implied_roles(role_id_list[0])
self.assertThat(implied_roles, matchers.HasLength(0))
class VersionTests(SqlMigrateBase):