Merge "Allow archiving deleted rows to shadow tables, for performance."
This commit is contained in: commit 2ecf9e8bf1
@@ -770,6 +770,17 @@ class DbCommands(object):
        """Print the current database version."""
        print migration.db_version()

    @args('--max_rows', dest='max_rows', metavar='<number>',
            help='Maximum number of deleted rows to archive')
    def archive_deleted_rows(self, max_rows=None):
        """Move up to max_rows deleted rows from production tables to shadow
        tables.
        """
        if max_rows is not None:
            max_rows = int(max_rows)
        admin_context = context.get_admin_context()
        db.archive_deleted_rows(admin_context, max_rows)


class InstanceTypeCommands(object):
    """Class for managing instance types / flavors."""

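For context (an annotation, not part of the diff): nova-manage maps command classes and their methods onto CLI categories and actions, so the method added above would be invoked roughly as "nova-manage db archive_deleted_rows --max_rows 1000", with the flag spelling taken from the @args decorator; omitting the flag falls through to the default batch size defined in the DB layer below. A usage sketch against the underlying db API follows the next hunk.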
@@ -1715,3 +1715,25 @@ def task_log_get(context, task_name, period_beginning,
                 period_ending, host, state=None):
    return IMPL.task_log_get(context, task_name, period_beginning,
                             period_ending, host, state)


####################


def archive_deleted_rows(context, max_rows=None):
    """Move up to max_rows rows from production tables to corresponding shadow
    tables.

    :returns: number of rows archived.
    """
    return IMPL.archive_deleted_rows(context, max_rows=max_rows)


def archive_deleted_rows_for_table(context, tablename, max_rows=None):
    """Move up to max_rows rows from tablename to corresponding shadow
    table.

    :returns: number of rows archived.
    """
    return IMPL.archive_deleted_rows_for_table(context, tablename,
                                               max_rows=max_rows)

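For illustration only (a sketch, not part of this commit): because each call returns the number of rows it moved, a caller with admin credentials and a configured Nova database could drain all soft-deleted rows by looping on the return value.

    from nova import context
    from nova import db

    admin_ctxt = context.get_admin_context()
    # Each pass archives at most 1000 rows across all tables; a return
    # value of 0 means no archivable rows remain, which ends the loop.
    while db.archive_deleted_rows(admin_ctxt, max_rows=1000):
        pass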
@@ -26,13 +26,20 @@ import functools
import uuid

from sqlalchemy import and_
from sqlalchemy import Boolean
from sqlalchemy.exc import IntegrityError
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import joinedload_all
from sqlalchemy.schema import Table
from sqlalchemy.sql.expression import asc
from sqlalchemy.sql.expression import desc
from sqlalchemy.sql.expression import select
from sqlalchemy.sql import func
from sqlalchemy import String

from nova import block_device
from nova.compute import task_states
@@ -63,6 +70,7 @@ CONF.import_opt('sql_connection',

LOG = logging.getLogger(__name__)

get_engine = db_session.get_engine
get_session = db_session.get_session

@@ -4786,3 +4794,94 @@ def task_log_end_task(context, task_name, period_beginning, period_ending,
    if rows == 0:
        # It's not running!
        raise exception.TaskNotRunning(task_name=task_name, host=host)


def _get_default_deleted_value(table):
    # TODO(dripton): It would be better to introspect the actual default value
    # from the column, but I don't see a way to do that in the low-level APIs
    # of SQLAlchemy 0.7.  0.8 has better introspection APIs, which we should
    # use when Nova is ready to require 0.8.
    deleted_column_type = table.c.deleted.type
    if isinstance(deleted_column_type, Integer):
        return 0
    elif isinstance(deleted_column_type, Boolean):
        return False
    elif isinstance(deleted_column_type, String):
        return ""
    else:
        return None


@require_admin_context
def archive_deleted_rows_for_table(context, tablename, max_rows=None):
    """Move up to max_rows rows from one table to the corresponding
    shadow table.

    :returns: number of rows archived
    """
    # The context argument is only used for the decorator.
    if max_rows is None:
        max_rows = 5000
    engine = get_engine()
    conn = engine.connect()
    metadata = MetaData()
    metadata.bind = engine
    table = Table(tablename, metadata, autoload=True)
    default_deleted_value = _get_default_deleted_value(table)
    shadow_tablename = "shadow_" + tablename
    rows_archived = 0
    try:
        shadow_table = Table(shadow_tablename, metadata, autoload=True)
    except NoSuchTableError:
        # No corresponding shadow table; skip it.
        return rows_archived
    # Group the insert and delete in a transaction.
    with conn.begin():
        # TODO(dripton): It would be more efficient to insert(select) and then
        # delete(same select) without ever returning the selected rows back to
        # Python.  sqlalchemy does not support that directly, but we have
        # nova.db.sqlalchemy.utils.InsertFromSelect for the insert side.  We
        # need a corresponding function for the delete side.
        try:
            column = table.c.id
            column_name = "id"
        except AttributeError:
            # We have one table (dns_domains) where the key is called
            # "domain" rather than "id"
            column = table.c.domain
            column_name = "domain"
        query = select([table],
                       table.c.deleted != default_deleted_value).\
                       order_by(column).limit(max_rows)
        rows = conn.execute(query).fetchall()
        if rows:
            insert_statement = shadow_table.insert()
            conn.execute(insert_statement, rows)
            keys = [getattr(row, column_name) for row in rows]
            delete_statement = table.delete(column.in_(keys))
            result = conn.execute(delete_statement)
            rows_archived = result.rowcount
    return rows_archived


@require_admin_context
def archive_deleted_rows(context, max_rows=None):
    """Move up to max_rows rows from production tables to the corresponding
    shadow tables.

    :returns: Number of rows archived.
    """
    # The context argument is only used for the decorator.
    if max_rows is None:
        max_rows = 5000
    tablenames = []
    for model_class in models.__dict__.itervalues():
        if hasattr(model_class, "__tablename__"):
            tablenames.append(model_class.__tablename__)
    rows_archived = 0
    for tablename in tablenames:
        rows_archived += archive_deleted_rows_for_table(context, tablename,
                                        max_rows=max_rows - rows_archived)
        if rows_archived >= max_rows:
            break
    return rows_archived

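As a quick illustration of _get_default_deleted_value, here is a standalone sketch with three hypothetical table definitions, one per flavor of 'deleted' column the function handles (none of these tables come from the commit):

    from sqlalchemy import Boolean, Column, Integer, MetaData, String, Table

    metadata = MetaData()
    int_style = Table('t_int', metadata, Column('deleted', Integer))
    bool_style = Table('t_bool', metadata, Column('deleted', Boolean))
    str_style = Table('t_str', metadata, Column('deleted', String(36)))

    print _get_default_deleted_value(int_style)   # 0
    print _get_default_deleted_value(bool_style)  # False
    print _get_default_deleted_value(str_style)   # ""

Note also how archive_deleted_rows treats max_rows as a shared budget: each per-table call is capped at max_rows minus what has already been archived, so the total across all tables never exceeds the limit.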
@@ -0,0 +1,77 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2013 Red Hat, Inc.
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.  You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
# License for the specific language governing permissions and limitations
# under the License.

from sqlalchemy import BigInteger, Column, MetaData, Table
from sqlalchemy.types import NullType

from nova.openstack.common import log as logging

LOG = logging.getLogger(__name__)


def upgrade(migrate_engine):
    meta = MetaData(migrate_engine)
    meta.reflect(migrate_engine)
    table_names = meta.tables.keys()

    meta.bind = migrate_engine

    for table_name in table_names:
        if table_name.startswith('shadow'):
            continue
        table = Table(table_name, meta, autoload=True)

        columns = []
        for column in table.columns:
            column_copy = None
            # NOTE(boris-42): BigInteger is not supported by sqlite, so
            #                 after copy it will have NullType, other
            #                 types that are used in Nova are supported by
            #                 sqlite.
            if isinstance(column.type, NullType):
                column_copy = Column(column.name, BigInteger(), default=0)
            else:
                column_copy = column.copy()
            columns.append(column_copy)

        shadow_table_name = 'shadow_' + table_name
        shadow_table = Table(shadow_table_name, meta, *columns,
                             mysql_engine='InnoDB')
        try:
            shadow_table.create()
        except Exception:
            LOG.info(repr(shadow_table))
            LOG.exception(_('Exception while creating table.'))
            raise


def downgrade(migrate_engine):
    meta = MetaData(migrate_engine)
    meta.reflect(migrate_engine)
    table_names = meta.tables.keys()

    meta.bind = migrate_engine

    for table_name in table_names:
        if table_name.startswith('shadow'):
            continue
        shadow_table_name = 'shadow_' + table_name
        shadow_table = Table(shadow_table_name, meta, autoload=True)
        try:
            shadow_table.drop()
        except Exception:
            LOG.error(_("table '%s' not dropped") % shadow_table_name)

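The upgrade above is, in essence, a column-for-column copy of every production table under a shadow_ prefix. A minimal self-contained sketch of the same idea for a single made-up table (illustrative, not code from the commit):

    from sqlalchemy import create_engine
    from sqlalchemy import Column, Integer, MetaData, String, Table

    engine = create_engine('sqlite://')
    meta = MetaData(bind=engine)

    # A stand-in for one of Nova's production tables.
    instances = Table('instances', meta,
                      Column('id', Integer, primary_key=True),
                      Column('uuid', String(36)))
    instances.create()

    # Copy each column and create the twin, as upgrade() does in a loop.
    columns = [column.copy() for column in instances.columns]
    shadow = Table('shadow_instances', meta, *columns,
                   mysql_engine='InnoDB')
    shadow.create()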
@@ -22,10 +22,15 @@
import datetime
import uuid as stdlib_uuid

from sqlalchemy import MetaData
from sqlalchemy.schema import Table
from sqlalchemy.sql.expression import select

from nova import context
from nova import db
from nova import exception
from nova.openstack.common import cfg
from nova.openstack.common.db.sqlalchemy import session as db_session
from nova.openstack.common import timeutils
from nova import test
from nova.tests import matchers
@@ -36,6 +41,9 @@ CONF = cfg.CONF
CONF.import_opt('reserved_host_memory_mb', 'nova.compute.resource_tracker')
CONF.import_opt('reserved_host_disk_mb', 'nova.compute.resource_tracker')

get_engine = db_session.get_engine
get_session = db_session.get_session


class DbApiTestCase(test.TestCase):
    def setUp(self):
@@ -1791,3 +1799,156 @@ class TaskLogTestCase(test.TestCase):
        result = db.task_log_get(self.context, self.task_name, self.begin,
                                 self.end, self.host)
        self.assertEqual(result['errors'], 1)


class ArchiveTestCase(test.TestCase):

    def setUp(self):
        super(ArchiveTestCase, self).setUp()
        self.context = context.get_admin_context()
        engine = get_engine()
        self.conn = engine.connect()
        self.metadata = MetaData()
        self.metadata.bind = engine
        self.table1 = Table("instance_id_mappings",
                            self.metadata,
                            autoload=True)
        self.shadow_table1 = Table("shadow_instance_id_mappings",
                                   self.metadata,
                                   autoload=True)
        self.table2 = Table("dns_domains",
                            self.metadata,
                            autoload=True)
        self.shadow_table2 = Table("shadow_dns_domains",
                                   self.metadata,
                                   autoload=True)
        self.uuidstrs = []
        for unused in xrange(6):
            self.uuidstrs.append(stdlib_uuid.uuid4().hex)

    def tearDown(self):
        super(ArchiveTestCase, self).tearDown()
        delete_statement1 = self.table1.delete(
                self.table1.c.uuid.in_(self.uuidstrs))
        self.conn.execute(delete_statement1)
        delete_statement2 = self.shadow_table1.delete(
                self.shadow_table1.c.uuid.in_(self.uuidstrs))
        self.conn.execute(delete_statement2)
        delete_statement3 = self.table2.delete(self.table2.c.domain.in_(
                self.uuidstrs))
        self.conn.execute(delete_statement3)
        delete_statement4 = self.shadow_table2.delete(
                self.shadow_table2.c.domain.in_(self.uuidstrs))
        self.conn.execute(delete_statement4)

    def test_archive_deleted_rows(self):
        # Add 6 rows to table
        for uuidstr in self.uuidstrs:
            insert_statement = self.table1.insert().values(uuid=uuidstr)
            self.conn.execute(insert_statement)
        # Set 4 to deleted
        update_statement = self.table1.update().\
                where(self.table1.c.uuid.in_(self.uuidstrs[:4]))\
                .values(deleted=True)
        self.conn.execute(update_statement)
        query1 = select([self.table1]).where(self.table1.c.uuid.in_(
                self.uuidstrs))
        rows1 = self.conn.execute(query1).fetchall()
        # Verify we have 6 in main
        self.assertEqual(len(rows1), 6)
        query2 = select([self.shadow_table1]).\
                where(self.shadow_table1.c.uuid.in_(self.uuidstrs))
        rows2 = self.conn.execute(query2).fetchall()
        # Verify we have 0 in shadow
        self.assertEqual(len(rows2), 0)
        # Archive 2 rows
        db.archive_deleted_rows(self.context, max_rows=2)
        rows3 = self.conn.execute(query1).fetchall()
        # Verify we have 4 left in main
        self.assertEqual(len(rows3), 4)
        rows4 = self.conn.execute(query2).fetchall()
        # Verify we have 2 in shadow
        self.assertEqual(len(rows4), 2)
        # Archive 2 more rows
        db.archive_deleted_rows(self.context, max_rows=2)
        rows5 = self.conn.execute(query1).fetchall()
        # Verify we have 2 left in main
        self.assertEqual(len(rows5), 2)
        rows6 = self.conn.execute(query2).fetchall()
        # Verify we have 4 in shadow
        self.assertEqual(len(rows6), 4)
        # Try to archive more, but there are no deleted rows left.
        db.archive_deleted_rows(self.context, max_rows=2)
        rows7 = self.conn.execute(query1).fetchall()
        # Verify we still have 2 left in main
        self.assertEqual(len(rows7), 2)
        rows8 = self.conn.execute(query2).fetchall()
        # Verify we still have 4 in shadow
        self.assertEqual(len(rows8), 4)

    def test_archive_deleted_rows_for_table(self):
        tablename = "instance_id_mappings"
        # Add 6 rows to table
        for uuidstr in self.uuidstrs:
            insert_statement = self.table1.insert().values(uuid=uuidstr)
            self.conn.execute(insert_statement)
        # Set 4 to deleted
        update_statement = self.table1.update().\
                where(self.table1.c.uuid.in_(self.uuidstrs[:4]))\
                .values(deleted=True)
        self.conn.execute(update_statement)
        query1 = select([self.table1]).where(self.table1.c.uuid.in_(
                self.uuidstrs))
        rows1 = self.conn.execute(query1).fetchall()
        # Verify we have 6 in main
        self.assertEqual(len(rows1), 6)
        query2 = select([self.shadow_table1]).\
                where(self.shadow_table1.c.uuid.in_(self.uuidstrs))
        rows2 = self.conn.execute(query2).fetchall()
        # Verify we have 0 in shadow
        self.assertEqual(len(rows2), 0)
        # Archive 2 rows
        db.archive_deleted_rows_for_table(self.context, tablename, max_rows=2)
        rows3 = self.conn.execute(query1).fetchall()
        # Verify we have 4 left in main
        self.assertEqual(len(rows3), 4)
        rows4 = self.conn.execute(query2).fetchall()
        # Verify we have 2 in shadow
        self.assertEqual(len(rows4), 2)
        # Archive 2 more rows
        db.archive_deleted_rows_for_table(self.context, tablename, max_rows=2)
        rows5 = self.conn.execute(query1).fetchall()
        # Verify we have 2 left in main
        self.assertEqual(len(rows5), 2)
        rows6 = self.conn.execute(query2).fetchall()
        # Verify we have 4 in shadow
        self.assertEqual(len(rows6), 4)
        # Try to archive more, but there are no deleted rows left.
        db.archive_deleted_rows_for_table(self.context, tablename, max_rows=2)
        rows7 = self.conn.execute(query1).fetchall()
        # Verify we still have 2 left in main
        self.assertEqual(len(rows7), 2)
        rows8 = self.conn.execute(query2).fetchall()
        # Verify we still have 4 in shadow
        self.assertEqual(len(rows8), 4)

    def test_archive_deleted_rows_no_id_column(self):
        uuidstr0 = self.uuidstrs[0]
        insert_statement = self.table2.insert().values(domain=uuidstr0)
        self.conn.execute(insert_statement)
        update_statement = self.table2.update().\
                where(self.table2.c.domain == uuidstr0).\
                values(deleted=True)
        self.conn.execute(update_statement)
        query1 = select([self.table2], self.table2.c.domain == uuidstr0)
        rows1 = self.conn.execute(query1).fetchall()
        self.assertEqual(len(rows1), 1)
        query2 = select([self.shadow_table2],
                        self.shadow_table2.c.domain == uuidstr0)
        rows2 = self.conn.execute(query2).fetchall()
        self.assertEqual(len(rows2), 0)
        db.archive_deleted_rows(self.context, max_rows=1)
        rows3 = self.conn.execute(query1).fetchall()
        self.assertEqual(len(rows3), 0)
        rows4 = self.conn.execute(query2).fetchall()
        self.assertEqual(len(rows4), 1)

@@ -631,3 +631,49 @@ class TestMigrations(BaseMigrationTestCase):
            self.assertIn(prop_name, inst_sys_meta)
            self.assertEqual(str(inst_sys_meta[prop_name]),
                             str(inst_type[prop]))

    # migration 154, add shadow tables for deleted data
    # There are 53 shadow tables but we only test one
    # There are additional tests in test_db_api.py
    def _prerun_154(self, engine):
        meta = sqlalchemy.schema.MetaData()
        meta.reflect(engine)
        table_names = meta.tables.keys()
        for table_name in table_names:
            self.assertFalse(table_name.startswith("_shadow"))

    def _check_154(self, engine, data):
        meta = sqlalchemy.schema.MetaData()
        meta.reflect(engine)
        table_names = set(meta.tables.keys())
        for table_name in table_names:
            print table_name
            if table_name.startswith("shadow_"):
                shadow_name = table_name
                base_name = table_name.replace("shadow_", "")
                self.assertIn(base_name, table_names)
            else:
                base_name = table_name
                shadow_name = "shadow_" + table_name
                self.assertIn(shadow_name, table_names)
            shadow_table = get_table(engine, shadow_name)
            base_table = get_table(engine, base_name)
            base_columns = []
            shadow_columns = []
            for column in base_table.columns:
                base_columns.append(column)
            for column in shadow_table.columns:
                shadow_columns.append(column)
            for ii, base_column in enumerate(base_columns):
                shadow_column = shadow_columns[ii]
                self.assertEqual(base_column.name, shadow_column.name)
                # NullType needs a special case.  We end up with NullType on
                # sqlite where bigint is not defined.
                if isinstance(base_column.type, sqlalchemy.types.NullType):
                    self.assertTrue(isinstance(shadow_column.type,
                                               sqlalchemy.types.NullType))
                else:
                    # Identical types do not test equal because sqlalchemy
                    # does not override __eq__, but if we stringify them then
                    # they do.
                    self.assertEqual(str(base_column.type),
                                     str(shadow_column.type))

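The stringify trick in _check_154 deserves a note: as the comment says, the SQLAlchemy of this era does not override __eq__ on type objects, so comparing two separately constructed types falls back to object identity. A two-line illustration (behavior assumed for the SQLAlchemy versions contemporary with this commit):

    from sqlalchemy import Integer

    print Integer() == Integer()            # False: falls back to identity
    print str(Integer()) == str(Integer())  # True: both render as "INTEGER"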