Merge "Dynamically archive FK related records in archive_deleted_rows"

Zuul 2021-03-23 13:19:38 +00:00 committed by Gerrit Code Review
commit 50f4840048
3 changed files with 164 additions and 89 deletions

nova/db/sqlalchemy/api.py

@@ -4052,64 +4052,132 @@ def task_log_end_task(context, task_name, period_beginning, period_ending,
 ##################
 
 
-def _archive_if_instance_deleted(table, shadow_table, instances, conn,
-                                 max_rows, before):
-    """Look for records that pertain to deleted instances, but may not be
-    deleted themselves. This catches cases where we delete an instance,
-    but leave some residue because of a failure in a cleanup path or
-    similar.
-
-    Logic is: if I have a column called instance_uuid, and that instance
-    is deleted, then I can be deleted.
-    """
-    # NOTE(jake): handle instance_actions_events differently as it relies on
-    # instance_actions.id not instances.uuid
-    if table.name == "instance_actions_events":
-        instance_actions = models.BASE.metadata.tables["instance_actions"]
-        query_select = sql.select(
-            [table],
-            and_(instances.c.deleted != instances.c.deleted.default.arg,
-                 instances.c.uuid == instance_actions.c.instance_uuid,
-                 instance_actions.c.id == table.c.action_id))
-    else:
-        query_select = sql.select(
-            [table],
-            and_(instances.c.deleted != instances.c.deleted.default.arg,
-                 instances.c.uuid == table.c.instance_uuid))
-
-    if before:
-        query_select = query_select.where(instances.c.deleted_at < before)
-
-    query_select = query_select.order_by(table.c.id).limit(max_rows)
-
-    query_insert = shadow_table.insert(inline=True).\
-        from_select([c.name for c in table.c], query_select)
-
-    delete_statement = DeleteFromSelect(table, query_select,
-                                        table.c.id)
-
-    try:
-        with conn.begin():
-            conn.execute(query_insert)
-            result_delete = conn.execute(delete_statement)
-        return result_delete.rowcount
-    except db_exc.DBReferenceError as ex:
-        LOG.warning('Failed to archive %(table)s: %(error)s',
-                    {'table': table.name,
-                     'error': str(ex)})
-        return 0
+def _get_tables_with_fk_to_table(table):
+    """Get a list of tables that refer to the given table by foreign key (FK).
+
+    :param table: Table object (parent) for which to find references by FK
+
+    :returns: A list of Table objects that refer to the specified table by FK
+    """
+    tables = []
+    for t in models.BASE.metadata.tables.values():
+        for fk in t.foreign_keys:
+            if fk.references(table):
+                tables.append(t)
+    return tables
+
+
+def _get_fk_stmts(metadata, conn, table, column, records):
+    """Find records related to this table by foreign key (FK) and create and
+    return insert/delete statements for them.
+
+    Logic is: find the tables that reference the table passed to this method
+    and walk the tree of references by FK. As child records are found, prepend
+    them to deques to execute later in a single database transaction (to
+    avoid orphaning related records if any one insert/delete fails or the
+    archive process is otherwise interrupted).
+
+    :param metadata: Metadata object to use to construct a shadow Table object
+    :param conn: Connection object to use to select records related by FK
+    :param table: Table object (parent) for which to find references by FK
+    :param column: Column object (parent) to use to select records related by
+        FK
+    :param records: A list of records (column values) to use to select records
+        related by FK
+
+    :returns: tuple of (insert statements, delete statements) for records
+        related by FK to insert into shadow tables and delete from main tables
+    """
+    inserts = collections.deque()
+    deletes = collections.deque()
+    fk_tables = _get_tables_with_fk_to_table(table)
+    for fk_table in fk_tables:
+        # Create the shadow table for the referencing table.
+        fk_shadow_tablename = _SHADOW_TABLE_PREFIX + fk_table.name
+        try:
+            fk_shadow_table = Table(fk_shadow_tablename, metadata,
+                                    autoload=True)
+        except NoSuchTableError:
+            # No corresponding shadow table; skip it.
+            continue
+
+        # TODO(stephenfin): Drop this when we drop the table
+        if fk_table.name == "dns_domains":
+            # We have one table (dns_domains) where the key is called
+            # "domain" rather than "id"
+            fk_column = fk_table.c.domain
+        else:
+            fk_column = fk_table.c.id
+
+        for fk in fk_table.foreign_keys:
+            # We need to find the records in the referring (child) table that
+            # correspond to the records in our (parent) table so we can
+            # archive them.
+            # First, select the column in the parent referenced by the child
+            # table that corresponds to the parent table records that were
+            # passed in.
+            # Example: table = 'instances' and fk_table = 'instance_extra'
+            #          fk.parent = instance_extra.instance_uuid
+            #          fk.column = instances.uuid
+            #          SELECT instances.uuid FROM instances, instance_extra
+            #              WHERE instance_extra.instance_uuid = instances.uuid
+            #              AND instances.id IN (<ids>)
+            #          We need the instance uuids for the <ids> in order to
+            #          look up the matching instance_extra records.
+            select = sql.select([fk.column]).where(
+                sql.and_(fk.parent == fk.column, column.in_(records)))
+            rows = conn.execute(select).fetchall()
+            p_records = [r[0] for r in rows]
+            # Then, select rows in the child table that correspond to the
+            # parent table records that were passed in.
+            # Example: table = 'instances' and fk_table = 'instance_extra'
+            #          fk.parent = instance_extra.instance_uuid
+            #          fk.column = instances.uuid
+            #          SELECT instance_extra.id FROM instance_extra, instances
+            #              WHERE instance_extra.instance_uuid = instances.uuid
+            #              AND instances.uuid IN (<uuids>)
+            #          We will get the instance_extra ids we need to archive
+            #          them.
+            fk_select = sql.select([fk_column]).where(
+                sql.and_(fk.parent == fk.column, fk.column.in_(p_records)))
+            fk_rows = conn.execute(fk_select).fetchall()
+            fk_records = [r[0] for r in fk_rows]
+            if fk_records:
+                # If we found any records in the child table, create shadow
+                # table insert statements for them and prepend them to the
+                # deque.
+                fk_columns = [c.name for c in fk_table.c]
+                fk_insert = fk_shadow_table.insert(inline=True).\
+                    from_select(fk_columns, sql.select([fk_table],
+                                fk_column.in_(fk_records)))
+                inserts.appendleft(fk_insert)
+                # Create main table delete statements and prepend them to the
+                # deque.
+                fk_delete = fk_table.delete().where(fk_column.in_(fk_records))
+                deletes.appendleft(fk_delete)
+            # Repeat for any possible nested child tables.
+            i, d = _get_fk_stmts(metadata, conn, fk_table, fk_column,
+                                 fk_records)
+            inserts.extendleft(i)
+            deletes.extendleft(d)
+
+    return inserts, deletes
 
 
 def _archive_deleted_rows_for_table(metadata, tablename, max_rows, before):
     """Move up to max_rows rows from one table to the corresponding
     shadow table.
 
-    :returns: 2-item tuple:
+    Will also follow FK constraints and archive all referring rows.
+    Example: archiving a record from the 'instances' table will also archive
+    the 'instance_extra' record before archiving the 'instances' record.
+
+    :returns: 3-item tuple:
 
         - number of rows archived
         - list of UUIDs of instances that were archived
+        - number of extra rows archived (due to FK constraints)
+          dict of {tablename: rows_archived}
     """
     conn = metadata.bind.connect()
     # NOTE(tdurakov): table metadata should be received
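The FK walk above is easier to see in isolation. The following is a minimal,
self-contained sketch of the two core moves: the fk.references() discovery
used by _get_tables_with_fk_to_table, and the two-step parent-to-child lookup
used by _get_fk_stmts. The parent/child schema is an invented toy, not Nova's
models, and the code assumes SQLAlchemy 1.x-style Core as used by the patch.

import sqlalchemy as sa

engine = sa.create_engine('sqlite://')
metadata = sa.MetaData()
parent = sa.Table('parent', metadata,
                  sa.Column('id', sa.Integer, primary_key=True),
                  sa.Column('uuid', sa.String(36), unique=True))
child = sa.Table('child', metadata,
                 sa.Column('id', sa.Integer, primary_key=True),
                 sa.Column('parent_uuid', sa.String(36),
                           sa.ForeignKey('parent.uuid')))
metadata.create_all(engine)
conn = engine.connect()
conn.execute(parent.insert(), [{'id': 1, 'uuid': 'u-1'}])
conn.execute(child.insert(), [{'id': 10, 'parent_uuid': 'u-1'}])

# Discovery: which tables refer to 'parent' by FK?
referrers = [t.name for t in metadata.tables.values()
             if any(fk.references(parent) for fk in t.foreign_keys)]
print(referrers)                  # ['child']

records = [1]                     # parent ids selected for archiving
fk = list(child.foreign_keys)[0]  # child.parent_uuid -> parent.uuid
# Step 1: translate the parent ids into the values the child references.
p_records = [r[0] for r in conn.execute(
    sa.select([fk.column]).where(
        sa.and_(fk.parent == fk.column, parent.c.id.in_(records))))]
# Step 2: select the child rows that point at those values.
fk_records = [r[0] for r in conn.execute(
    sa.select([child.c.id]).where(
        sa.and_(fk.parent == fk.column, fk.column.in_(p_records))))]
print(p_records, fk_records)      # ['u-1'] [10]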
@@ -4125,7 +4193,7 @@ def _archive_deleted_rows_for_table(metadata, tablename, max_rows, before):
         shadow_table = Table(shadow_tablename, metadata, autoload=True)
     except NoSuchTableError:
         # No corresponding shadow table; skip it.
-        return rows_archived, deleted_instance_uuids
+        return rows_archived, deleted_instance_uuids, {}
 
     # TODO(stephenfin): Drop this when we drop the table
     if tablename == "dns_domains":
@@ -4148,10 +4216,29 @@ def _archive_deleted_rows_for_table(metadata, tablename, max_rows, before):
     rows = conn.execute(select).fetchall()
     records = [r[0] for r in rows]
 
+    # We will archive deleted rows for this table and also generate insert and
+    # delete statements for extra rows we may archive by following FK
+    # relationships. Because we are iterating over the sorted_tables (list of
+    # Table objects sorted in order of foreign key dependency), new inserts
+    # and deletes ("leaves") will be added to the fronts of the deques created
+    # in _get_fk_stmts. This way, we make sure we delete child table records
+    # before we delete their parent table records.
+
+    # Keep track of a mapping of extra tablename to number of rows that we
+    # archive by following FK relationships.
+    #
+    # {tablename: extra_rows_archived}
+    extras = collections.defaultdict(int)
     if records:
         insert = shadow_table.insert(inline=True).\
             from_select(columns, sql.select([table], column.in_(records)))
         delete = table.delete().where(column.in_(records))
+        # Walk FK relationships and add insert/delete statements for rows
+        # that refer to this table via FK constraints. fk_inserts and
+        # fk_deletes will be prepended to by _get_fk_stmts if referring rows
+        # are found by FK constraints.
+        fk_inserts, fk_deletes = _get_fk_stmts(
+            metadata, conn, table, column, records)
         # NOTE(tssurya): In order to facilitate the deletion of records from
         # instance_mappings, request_specs and instance_group_member tables in
         # the nova_api DB, the rows of deleted instances from the instances
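The deque discipline the comments describe can also be modeled on its own.
Below is a toy illustration (plain strings standing in for the insert/delete
statements, and one child per table, since deque.extendleft reverses
multi-element batches) of how prepending pushes the deepest leaves to the
front:

import collections

def collect_deletes(table, tree):
    """Prepend child deletes so the deepest leaves end up at the front."""
    deletes = collections.deque()
    for child in tree.get(table, []):
        deletes.appendleft('DELETE FROM %s' % child)
        # Recurse, placing any grandchildren in front of the child.
        deletes.extendleft(collect_deletes(child, tree))
    return deletes

# instances <- instance_actions <- instance_actions_events
tree = {'instances': ['instance_actions'],
        'instance_actions': ['instance_actions_events']}
print(list(collect_deletes('instances', tree)))
# ['DELETE FROM instance_actions_events', 'DELETE FROM instance_actions']
# Executing front to back deletes the grandchild before its parent.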
@@ -4165,9 +4252,14 @@ def _archive_deleted_rows_for_table(metadata, tablename, max_rows, before):
         try:
             # Group the insert and delete in a transaction.
             with conn.begin():
+                for fk_insert in fk_inserts:
+                    conn.execute(fk_insert)
+                for fk_delete in fk_deletes:
+                    result_fk_delete = conn.execute(fk_delete)
+                    extras[fk_delete.table.name] += result_fk_delete.rowcount
                 conn.execute(insert)
                 result_delete = conn.execute(delete)
-                rows_archived = result_delete.rowcount
+                rows_archived += result_delete.rowcount
         except db_exc.DBReferenceError as ex:
             # A foreign key constraint keeps us from deleting some of
             # these rows until we clean up a dependent table. Just
@@ -4176,22 +4268,7 @@ def _archive_deleted_rows_for_table(metadata, tablename, max_rows, before):
                         "%(tablename)s: %(error)s",
                         {'tablename': tablename, 'error': str(ex)})
 
-    # NOTE(jake): instance_actions_events doesn't have a instance_uuid column
-    # but still needs to be archived as it is a FK constraint
-    if ((max_rows is None or rows_archived < max_rows) and
-            # NOTE(melwitt): The pci_devices table uses the 'instance_uuid'
-            # column to track the allocated association of a PCI device and
-            # its records are not tied to the lifecycles of instance records.
-            (tablename != 'pci_devices' and
-                'instance_uuid' in columns or
-                tablename == 'instance_actions_events')):
-        instances = models.BASE.metadata.tables['instances']
-        limit = max_rows - rows_archived if max_rows is not None else None
-        extra = _archive_if_instance_deleted(table, shadow_table, instances,
-                                             conn, limit, before)
-        rows_archived += extra
-
-    return rows_archived, deleted_instance_uuids
+    return rows_archived, deleted_instance_uuids, extras
 
 
 def archive_deleted_rows(context=None, max_rows=None, before=None):
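The single transaction around all of the statements is the crux of the fix:
either every shadow-table insert and main-table delete lands, or none do, so
a failure mid-archive can no longer separate related records. Here is a
minimal sketch of the insert-from-select-then-delete pattern, using a toy
main/shadow table pair (not Nova's schema) and SQLAlchemy 1.x-style Core:

import sqlalchemy as sa

engine = sa.create_engine('sqlite://')
meta = sa.MetaData()
main = sa.Table('main', meta, sa.Column('id', sa.Integer, primary_key=True))
shadow = sa.Table('shadow_main', meta,
                  sa.Column('id', sa.Integer, primary_key=True))
meta.create_all(engine)
conn = engine.connect()
conn.execute(main.insert(), [{'id': 1}, {'id': 2}])

records = [1, 2]
insert = shadow.insert(inline=True).from_select(
    ['id'], sa.select([main], main.c.id.in_(records)))
delete = main.delete().where(main.c.id.in_(records))
with conn.begin():  # both statements commit, or roll back, together
    conn.execute(insert)
    result = conn.execute(delete)
print(result.rowcount)  # 2 rows archived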
@@ -4215,13 +4292,18 @@ def archive_deleted_rows(context=None, max_rows=None, before=None):
         - list of UUIDs of instances that were archived
         - total number of rows that were archived
     """
-    table_to_rows_archived = {}
+    table_to_rows_archived = collections.defaultdict(int)
     deleted_instance_uuids = []
     total_rows_archived = 0
     meta = MetaData(get_engine(use_slave=True, context=context))
     meta.reflect()
-    # Reverse sort the tables so we get the leaf nodes first for processing.
-    for table in reversed(meta.sorted_tables):
+    # Get the sorted list of tables in order of foreign key dependency.
+    # Process the parent tables and find their dependent records in order to
+    # archive the related records in a single database transaction. The goal
+    # is to avoid a situation where, for example, an 'instances' table record
+    # is missing its corresponding 'instance_extra' record due to running the
+    # archive_deleted_rows command with max_rows.
+    for table in meta.sorted_tables:
         tablename = table.name
         rows_archived = 0
         # skip the special sqlalchemy-migrate migrate_version table and any
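The new iteration order leans on a SQLAlchemy guarantee: MetaData.sorted_tables
yields tables sorted by FK dependency, parents before the tables that
reference them. A two-table sketch (Nova-like names, toy columns):

import sqlalchemy as sa

meta = sa.MetaData()
instances = sa.Table('instances', meta,
                     sa.Column('uuid', sa.String(36), primary_key=True))
instance_extra = sa.Table('instance_extra', meta,
                          sa.Column('id', sa.Integer, primary_key=True),
                          sa.Column('instance_uuid', sa.String(36),
                                    sa.ForeignKey('instances.uuid')))
print([t.name for t in meta.sorted_tables])
# ['instances', 'instance_extra'] -- the parent comes first, so its
# dependent records can be found and archived in the same pass.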
@@ -4229,7 +4311,7 @@ def archive_deleted_rows(context=None, max_rows=None, before=None):
         if (tablename == 'migrate_version' or
                 tablename.startswith(_SHADOW_TABLE_PREFIX)):
             continue
-        rows_archived, _deleted_instance_uuids = (
+        rows_archived, _deleted_instance_uuids, extras = (
             _archive_deleted_rows_for_table(
                 meta, tablename,
                 max_rows=max_rows - total_rows_archived,
@@ -4240,6 +4322,9 @@ def archive_deleted_rows(context=None, max_rows=None, before=None):
         # Only report results for tables that had updates.
         if rows_archived:
             table_to_rows_archived[tablename] = rows_archived
+        for tablename, extra_rows_archived in extras.items():
+            table_to_rows_archived[tablename] += extra_rows_archived
+            total_rows_archived += extra_rows_archived
         if total_rows_archived >= max_rows:
             break
     return table_to_rows_archived, deleted_instance_uuids, total_rows_archived

nova/tests/functional/db/test_archive.py

@@ -167,13 +167,7 @@ class TestDatabaseArchive(integrated_helpers._IntegratedTestBase):
                 exceptions.append(ex)
             if archived == 0:
                 break
-        # FIXME(melwitt): OrphanedObjectError is raised because of the bug.
-        self.assertTrue(exceptions)
-        for ex in exceptions:
-            self.assertEqual(500, ex.response.status_code)
-            self.assertIn('OrphanedObjectError', str(ex))
-        # FIXME(melwitt): Uncomment when the bug is fixed.
-        # self.assertFalse(exceptions)
+        self.assertFalse(exceptions)
 
     def _get_table_counts(self):
         engine = sqlalchemy_api.get_engine()

nova/tests/unit/db/test_db_api.py

@@ -6075,17 +6075,24 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
                                     instance_actions_events=1)
         self._assertEqualObjects(expected, results[0])
 
-        # Archive 1 row deleted before 2017-01-03. instance_action_events
-        # should be the table with row deleted due to FK contraints
+        # Archive 1 row deleted before 2017-01-03
+        # Because the instances table will be processed first, tables that
+        # refer to it (instance_actions and instance_actions_events) will be
+        # visited and archived in the same transaction as the instance, to
+        # avoid orphaning the instance record (archive dependent records in
+        # one transaction)
         before_date = dateutil_parser.parse('2017-01-03', fuzzy=True)
         results = db.archive_deleted_rows(max_rows=1, before=before_date)
-        expected = dict(instance_actions_events=1)
+        expected = dict(instances=1,
+                        instance_actions=1,
+                        instance_actions_events=1)
         self._assertEqualObjects(expected, results[0])
-        # Archive all other rows deleted before 2017-01-03. This should
-        # delete row in instance_actions, then row in instances due to FK
-        # constraints
+        # Try to archive all other rows deleted before 2017-01-03. This should
+        # not archive anything because the instances table and tables that
+        # refer to it (instance_actions and instance_actions_events) were all
+        # archived in the last run.
         results = db.archive_deleted_rows(max_rows=100, before=before_date)
-        expected = dict(instances=1, instance_actions=1)
+        expected = {}
         self._assertEqualObjects(expected, results[0])
 
         # Verify we have 4 left in main
@@ -6233,19 +6240,8 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
         ins_stmt = self.migrations.insert().values(instance_uuid=instance_uuid,
                                                    deleted=0)
         self.conn.execute(ins_stmt)
-        # The first try to archive instances should fail, due to FK.
-        num = sqlalchemy_api._archive_deleted_rows_for_table(self.metadata,
-                                                             "instances",
-                                                             max_rows=None,
-                                                             before=None)
-        self.assertEqual(0, num[0])
-        # Then archiving migrations should work.
-        num = sqlalchemy_api._archive_deleted_rows_for_table(self.metadata,
-                                                             "migrations",
-                                                             max_rows=None,
-                                                             before=None)
-        self.assertEqual(1, num[0])
-        # Then archiving instances should work.
+        # Archiving instances should result in migrations related to the
+        # instances also being archived.
         num = sqlalchemy_api._archive_deleted_rows_for_table(self.metadata,
                                                              "instances",
                                                              max_rows=None,