Add DeleteFromSelect to avoid database's limit
nova-manage db archive_deleted_rows fails if max_rows is a large number. Database has a limit of maximum sql variables in one SQL statement. It is more efficient to insert(select) directly and then delete(same select) without ever returning the selected rows back to Python. This also can avoid database's limit. Closes-Bug: #1214720 Change-Id: I29e3a5ce14c59dd2979e45e8d31fc3df04c70266
This commit is contained in:
@@ -5516,11 +5516,14 @@ def _get_default_deleted_value(table):
|
||||
@require_admin_context
|
||||
def archive_deleted_rows_for_table(context, tablename, max_rows):
|
||||
"""Move up to max_rows rows from one tables to the corresponding
|
||||
shadow table.
|
||||
shadow table. The context argument is only used for the decorator.
|
||||
|
||||
:returns: number of rows archived
|
||||
"""
|
||||
# The context argument is only used for the decorator.
|
||||
# NOTE(guochbo): There is a circular import, nova.db.sqlalchemy.utils
|
||||
# imports nova.db.sqlalchemy.api.
|
||||
from nova.db.sqlalchemy import utils as db_utils
|
||||
|
||||
engine = get_engine()
|
||||
conn = engine.connect()
|
||||
metadata = MetaData()
|
||||
@@ -5534,38 +5537,41 @@ def archive_deleted_rows_for_table(context, tablename, max_rows):
|
||||
except NoSuchTableError:
|
||||
# No corresponding shadow table; skip it.
|
||||
return rows_archived
|
||||
# Group the insert and delete in a transaction.
|
||||
with conn.begin():
|
||||
# TODO(dripton): It would be more efficient to insert(select) and then
|
||||
# delete(same select) without ever returning the selected rows back to
|
||||
# Python. sqlalchemy does not support that directly, but we have
|
||||
# nova.db.sqlalchemy.utils.InsertFromSelect for the insert side. We
|
||||
# need a corresponding function for the delete side.
|
||||
try:
|
||||
column = table.c.id
|
||||
column_name = "id"
|
||||
except AttributeError:
|
||||
# We have one table (dns_domains) where the key is called
|
||||
# "domain" rather than "id"
|
||||
column = table.c.domain
|
||||
column_name = "domain"
|
||||
query = select([table],
|
||||
table.c.deleted != default_deleted_value).\
|
||||
order_by(column).limit(max_rows)
|
||||
rows = conn.execute(query).fetchall()
|
||||
if rows:
|
||||
keys = [getattr(row, column_name) for row in rows]
|
||||
delete_statement = table.delete(column.in_(keys))
|
||||
try:
|
||||
result = conn.execute(delete_statement)
|
||||
except IntegrityError:
|
||||
# A foreign key constraint keeps us from deleting some of
|
||||
# these rows until we clean up a dependent table. Just
|
||||
# skip this table for now; we'll come back to it later.
|
||||
return rows_archived
|
||||
insert_statement = shadow_table.insert()
|
||||
conn.execute(insert_statement, rows)
|
||||
rows_archived = result.rowcount
|
||||
|
||||
if tablename == "dns_domains":
|
||||
# We have one table (dns_domains) where the key is called
|
||||
# "domain" rather than "id"
|
||||
column = table.c.domain
|
||||
column_name = "domain"
|
||||
else:
|
||||
column = table.c.id
|
||||
column_name = "id"
|
||||
# NOTE(guochbo): Use InsertFromSelect and DeleteFromSelect to avoid
|
||||
# database's limit of maximum parameter in one SQL statment.
|
||||
query_insert = select([table],
|
||||
table.c.deleted != default_deleted_value).\
|
||||
order_by(column).limit(max_rows)
|
||||
query_delete = select([column],
|
||||
table.c.deleted != default_deleted_value).\
|
||||
order_by(column).limit(max_rows)
|
||||
|
||||
insert_statement = db_utils.InsertFromSelect(shadow_table, query_insert)
|
||||
delete_statement = db_utils.DeleteFromSelect(table, query_delete, column)
|
||||
try:
|
||||
# Group the insert and delete in a transaction.
|
||||
with conn.begin():
|
||||
result_insert = conn.execute(insert_statement)
|
||||
result_delete = conn.execute(delete_statement)
|
||||
except IntegrityError:
|
||||
# A foreign key constraint keeps us from deleting some of
|
||||
# these rows until we clean up a dependent table. Just
|
||||
# skip this table for now; we'll come back to it later.
|
||||
msg = _("IntegrityError detected when archiving table %s") % tablename
|
||||
LOG.warn(msg)
|
||||
return rows_archived
|
||||
|
||||
rows_archived = result_delete.rowcount
|
||||
|
||||
return rows_archived
|
||||
|
||||
|
||||
|
||||
@@ -71,6 +71,24 @@ def visit_insert_from_select(element, compiler, **kw):
|
||||
compiler.process(element.select))
|
||||
|
||||
|
||||
class DeleteFromSelect(UpdateBase):
|
||||
def __init__(self, table, select, column):
|
||||
self.table = table
|
||||
self.select = select
|
||||
self.column = column
|
||||
|
||||
|
||||
# NOTE(guochbo): some verions of MySQL doesn't yet support subquery with
|
||||
# 'LIMIT & IN/ALL/ANY/SOME' We need work around this with nesting select .
|
||||
@compiles(DeleteFromSelect)
|
||||
def visit_delete_from_select(element, compiler, **kw):
|
||||
return "DELETE FROM %s WHERE %s in (SELECT T1.%s FROM (%s) as T1)" % (
|
||||
compiler.process(element.table, asfrom=True),
|
||||
compiler.process(element.column),
|
||||
element.column.name,
|
||||
compiler.process(element.select))
|
||||
|
||||
|
||||
def _get_not_supported_column(col_name_col_instance, column_name):
|
||||
try:
|
||||
column = col_name_col_instance[column_name]
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import uuid
|
||||
import warnings
|
||||
|
||||
from migrate.changeset import UniqueConstraint
|
||||
@@ -41,6 +43,43 @@ class CustomType(UserDefinedType):
|
||||
class TestMigrationUtils(test_migrations.BaseMigrationTestCase):
|
||||
"""Class for testing utils that are used in db migrations."""
|
||||
|
||||
def test_delete_from_select(self):
|
||||
table_name = "__test_deletefromselect_table__"
|
||||
uuidstrs = []
|
||||
for unused in range(10):
|
||||
uuidstrs.append(uuid.uuid4().hex)
|
||||
for key, engine in self.engines.items():
|
||||
meta = MetaData()
|
||||
meta.bind = engine
|
||||
conn = engine.connect()
|
||||
test_table = Table(table_name, meta,
|
||||
Column('id', Integer, primary_key=True,
|
||||
nullable=False, autoincrement=True),
|
||||
Column('uuid', String(36), nullable=False))
|
||||
test_table.create()
|
||||
# Add 10 rows to table
|
||||
for uuidstr in uuidstrs:
|
||||
ins_stmt = test_table.insert().values(uuid=uuidstr)
|
||||
conn.execute(ins_stmt)
|
||||
|
||||
# Delete 4 rows in one chunk
|
||||
column = test_table.c.id
|
||||
query_delete = select([column],
|
||||
test_table.c.id < 5).order_by(column)
|
||||
delete_statement = utils.DeleteFromSelect(test_table,
|
||||
query_delete, column)
|
||||
result_delete = conn.execute(delete_statement)
|
||||
# Verify we delete 4 rows
|
||||
self.assertEqual(result_delete.rowcount, 4)
|
||||
|
||||
query_all = select([test_table]).\
|
||||
where(test_table.c.uuid.in_(uuidstrs))
|
||||
rows = conn.execute(query_all).fetchall()
|
||||
# Verify we still have 6 rows in table
|
||||
self.assertEqual(len(rows), 6)
|
||||
|
||||
test_table.drop()
|
||||
|
||||
def test_utils_drop_unique_constraint(self):
|
||||
table_name = "__test_tmp_table__"
|
||||
uc_name = 'uniq_foo'
|
||||
|
||||
Reference in New Issue
Block a user