Merge "Allow archiving deleted rows to shadow tables, for performance."
This commit is contained in:
@@ -770,6 +770,17 @@ class DbCommands(object):
|
||||
"""Print the current database version."""
|
||||
print migration.db_version()
|
||||
|
||||
@args('--max_rows', dest='max_rows', metavar='<number>',
|
||||
help='Maximum number of deleted rows to archive')
|
||||
def archive_deleted_rows(self, max_rows=None):
|
||||
"""Move up to max_rows deleted rows from production tables to shadow
|
||||
tables.
|
||||
"""
|
||||
if max_rows is not None:
|
||||
max_rows = int(max_rows)
|
||||
admin_context = context.get_admin_context()
|
||||
db.archive_deleted_rows(admin_context, max_rows)
|
||||
|
||||
|
||||
class InstanceTypeCommands(object):
|
||||
"""Class for managing instance types / flavors."""
|
||||
|
@@ -22,10 +22,15 @@
|
||||
import datetime
|
||||
import uuid as stdlib_uuid
|
||||
|
||||
from sqlalchemy import MetaData
|
||||
from sqlalchemy.schema import Table
|
||||
from sqlalchemy.sql.expression import select
|
||||
|
||||
from nova import context
|
||||
from nova import db
|
||||
from nova import exception
|
||||
from nova.openstack.common import cfg
|
||||
from nova.openstack.common.db.sqlalchemy import session as db_session
|
||||
from nova.openstack.common import timeutils
|
||||
from nova import test
|
||||
from nova.tests import matchers
|
||||
@@ -36,6 +41,9 @@ CONF = cfg.CONF
|
||||
CONF.import_opt('reserved_host_memory_mb', 'nova.compute.resource_tracker')
|
||||
CONF.import_opt('reserved_host_disk_mb', 'nova.compute.resource_tracker')
|
||||
|
||||
get_engine = db_session.get_engine
|
||||
get_session = db_session.get_session
|
||||
|
||||
|
||||
class DbApiTestCase(test.TestCase):
|
||||
def setUp(self):
|
||||
@@ -1791,3 +1799,156 @@ class TaskLogTestCase(test.TestCase):
|
||||
result = db.task_log_get(self.context, self.task_name, self.begin,
|
||||
self.end, self.host)
|
||||
self.assertEqual(result['errors'], 1)
|
||||
|
||||
|
||||
class ArchiveTestCase(test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(ArchiveTestCase, self).setUp()
|
||||
self.context = context.get_admin_context()
|
||||
engine = get_engine()
|
||||
self.conn = engine.connect()
|
||||
self.metadata = MetaData()
|
||||
self.metadata.bind = engine
|
||||
self.table1 = Table("instance_id_mappings",
|
||||
self.metadata,
|
||||
autoload=True)
|
||||
self.shadow_table1 = Table("shadow_instance_id_mappings",
|
||||
self.metadata,
|
||||
autoload=True)
|
||||
self.table2 = Table("dns_domains",
|
||||
self.metadata,
|
||||
autoload=True)
|
||||
self.shadow_table2 = Table("shadow_dns_domains",
|
||||
self.metadata,
|
||||
autoload=True)
|
||||
self.uuidstrs = []
|
||||
for unused in xrange(6):
|
||||
self.uuidstrs.append(stdlib_uuid.uuid4().hex)
|
||||
|
||||
def tearDown(self):
|
||||
super(ArchiveTestCase, self).tearDown()
|
||||
delete_statement1 = self.table1.delete(
|
||||
self.table1.c.uuid.in_(self.uuidstrs))
|
||||
self.conn.execute(delete_statement1)
|
||||
delete_statement2 = self.shadow_table1.delete(
|
||||
self.shadow_table1.c.uuid.in_(self.uuidstrs))
|
||||
self.conn.execute(delete_statement2)
|
||||
delete_statement3 = self.table2.delete(self.table2.c.domain.in_(
|
||||
self.uuidstrs))
|
||||
self.conn.execute(delete_statement3)
|
||||
delete_statement4 = self.shadow_table2.delete(
|
||||
self.shadow_table2.c.domain.in_(self.uuidstrs))
|
||||
self.conn.execute(delete_statement4)
|
||||
|
||||
def test_archive_deleted_rows(self):
|
||||
# Add 6 rows to table
|
||||
for uuidstr in self.uuidstrs:
|
||||
insert_statement = self.table1.insert().values(uuid=uuidstr)
|
||||
self.conn.execute(insert_statement)
|
||||
# Set 4 to deleted
|
||||
update_statement = self.table1.update().\
|
||||
where(self.table1.c.uuid.in_(self.uuidstrs[:4]))\
|
||||
.values(deleted=True)
|
||||
self.conn.execute(update_statement)
|
||||
query1 = select([self.table1]).where(self.table1.c.uuid.in_(
|
||||
self.uuidstrs))
|
||||
rows1 = self.conn.execute(query1).fetchall()
|
||||
# Verify we have 6 in main
|
||||
self.assertEqual(len(rows1), 6)
|
||||
query2 = select([self.shadow_table1]).\
|
||||
where(self.shadow_table1.c.uuid.in_(self.uuidstrs))
|
||||
rows2 = self.conn.execute(query2).fetchall()
|
||||
# Verify we have 0 in shadow
|
||||
self.assertEqual(len(rows2), 0)
|
||||
# Archive 2 rows
|
||||
db.archive_deleted_rows(self.context, max_rows=2)
|
||||
rows3 = self.conn.execute(query1).fetchall()
|
||||
# Verify we have 4 left in main
|
||||
self.assertEqual(len(rows3), 4)
|
||||
rows4 = self.conn.execute(query2).fetchall()
|
||||
# Verify we have 2 in shadow
|
||||
self.assertEqual(len(rows4), 2)
|
||||
# Archive 2 more rows
|
||||
db.archive_deleted_rows(self.context, max_rows=2)
|
||||
rows5 = self.conn.execute(query1).fetchall()
|
||||
# Verify we have 2 left in main
|
||||
self.assertEqual(len(rows5), 2)
|
||||
rows6 = self.conn.execute(query2).fetchall()
|
||||
# Verify we have 4 in shadow
|
||||
self.assertEqual(len(rows6), 4)
|
||||
# Try to archive more, but there are no deleted rows left.
|
||||
db.archive_deleted_rows(self.context, max_rows=2)
|
||||
rows7 = self.conn.execute(query1).fetchall()
|
||||
# Verify we still have 2 left in main
|
||||
self.assertEqual(len(rows7), 2)
|
||||
rows8 = self.conn.execute(query2).fetchall()
|
||||
# Verify we still have 4 in shadow
|
||||
self.assertEqual(len(rows8), 4)
|
||||
|
||||
def test_archive_deleted_rows_for_table(self):
|
||||
tablename = "instance_id_mappings"
|
||||
# Add 6 rows to table
|
||||
for uuidstr in self.uuidstrs:
|
||||
insert_statement = self.table1.insert().values(uuid=uuidstr)
|
||||
self.conn.execute(insert_statement)
|
||||
# Set 4 to deleted
|
||||
update_statement = self.table1.update().\
|
||||
where(self.table1.c.uuid.in_(self.uuidstrs[:4]))\
|
||||
.values(deleted=True)
|
||||
self.conn.execute(update_statement)
|
||||
query1 = select([self.table1]).where(self.table1.c.uuid.in_(
|
||||
self.uuidstrs))
|
||||
rows1 = self.conn.execute(query1).fetchall()
|
||||
# Verify we have 6 in main
|
||||
self.assertEqual(len(rows1), 6)
|
||||
query2 = select([self.shadow_table1]).\
|
||||
where(self.shadow_table1.c.uuid.in_(self.uuidstrs))
|
||||
rows2 = self.conn.execute(query2).fetchall()
|
||||
# Verify we have 0 in shadow
|
||||
self.assertEqual(len(rows2), 0)
|
||||
# Archive 2 rows
|
||||
db.archive_deleted_rows_for_table(self.context, tablename, max_rows=2)
|
||||
rows3 = self.conn.execute(query1).fetchall()
|
||||
# Verify we have 4 left in main
|
||||
self.assertEqual(len(rows3), 4)
|
||||
rows4 = self.conn.execute(query2).fetchall()
|
||||
# Verify we have 2 in shadow
|
||||
self.assertEqual(len(rows4), 2)
|
||||
# Archive 2 more rows
|
||||
db.archive_deleted_rows_for_table(self.context, tablename, max_rows=2)
|
||||
rows5 = self.conn.execute(query1).fetchall()
|
||||
# Verify we have 2 left in main
|
||||
self.assertEqual(len(rows5), 2)
|
||||
rows6 = self.conn.execute(query2).fetchall()
|
||||
# Verify we have 4 in shadow
|
||||
self.assertEqual(len(rows6), 4)
|
||||
# Try to archive more, but there are no deleted rows left.
|
||||
db.archive_deleted_rows_for_table(self.context, tablename, max_rows=2)
|
||||
rows7 = self.conn.execute(query1).fetchall()
|
||||
# Verify we still have 2 left in main
|
||||
self.assertEqual(len(rows7), 2)
|
||||
rows8 = self.conn.execute(query2).fetchall()
|
||||
# Verify we still have 4 in shadow
|
||||
self.assertEqual(len(rows8), 4)
|
||||
|
||||
def test_archive_deleted_rows_no_id_column(self):
|
||||
uuidstr0 = self.uuidstrs[0]
|
||||
insert_statement = self.table2.insert().values(domain=uuidstr0)
|
||||
self.conn.execute(insert_statement)
|
||||
update_statement = self.table2.update().\
|
||||
where(self.table2.c.domain == uuidstr0).\
|
||||
values(deleted=True)
|
||||
self.conn.execute(update_statement)
|
||||
query1 = select([self.table2], self.table2.c.domain == uuidstr0)
|
||||
rows1 = self.conn.execute(query1).fetchall()
|
||||
self.assertEqual(len(rows1), 1)
|
||||
query2 = select([self.shadow_table2],
|
||||
self.shadow_table2.c.domain == uuidstr0)
|
||||
rows2 = self.conn.execute(query2).fetchall()
|
||||
self.assertEqual(len(rows2), 0)
|
||||
db.archive_deleted_rows(self.context, max_rows=1)
|
||||
rows3 = self.conn.execute(query1).fetchall()
|
||||
self.assertEqual(len(rows3), 0)
|
||||
rows4 = self.conn.execute(query2).fetchall()
|
||||
self.assertEqual(len(rows4), 1)
|
||||
|
@@ -631,3 +631,49 @@ class TestMigrations(BaseMigrationTestCase):
|
||||
self.assertIn(prop_name, inst_sys_meta)
|
||||
self.assertEqual(str(inst_sys_meta[prop_name]),
|
||||
str(inst_type[prop]))
|
||||
|
||||
# migration 154, add shadow tables for deleted data
|
||||
# There are 53 shadow tables but we only test one
|
||||
# There are additional tests in test_db_api.py
|
||||
def _prerun_154(self, engine):
|
||||
meta = sqlalchemy.schema.MetaData()
|
||||
meta.reflect(engine)
|
||||
table_names = meta.tables.keys()
|
||||
for table_name in table_names:
|
||||
self.assertFalse(table_name.startswith("_shadow"))
|
||||
|
||||
def _check_154(self, engine, data):
|
||||
meta = sqlalchemy.schema.MetaData()
|
||||
meta.reflect(engine)
|
||||
table_names = set(meta.tables.keys())
|
||||
for table_name in table_names:
|
||||
print table_name
|
||||
if table_name.startswith("shadow_"):
|
||||
shadow_name = table_name
|
||||
base_name = table_name.replace("shadow_", "")
|
||||
self.assertIn(base_name, table_names)
|
||||
else:
|
||||
base_name = table_name
|
||||
shadow_name = "shadow_" + table_name
|
||||
self.assertIn(shadow_name, table_names)
|
||||
shadow_table = get_table(engine, shadow_name)
|
||||
base_table = get_table(engine, base_name)
|
||||
base_columns = []
|
||||
shadow_columns = []
|
||||
for column in base_table.columns:
|
||||
base_columns.append(column)
|
||||
for column in shadow_table.columns:
|
||||
shadow_columns.append(column)
|
||||
for ii, base_column in enumerate(base_columns):
|
||||
shadow_column = shadow_columns[ii]
|
||||
self.assertEqual(base_column.name, shadow_column.name)
|
||||
# NullType needs a special case. We end up with NullType on sqlite
|
||||
# where bigint is not defined.
|
||||
if isinstance(base_column.type, sqlalchemy.types.NullType):
|
||||
self.assertTrue(isinstance(shadow_column.type,
|
||||
sqlalchemy.types.NullType))
|
||||
else:
|
||||
# Identical types do not test equal because sqlalchemy does not
|
||||
# override __eq__, but if we stringify them then they do.
|
||||
self.assertEqual(str(base_column.type),
|
||||
str(shadow_column.type))
|
||||
|
Reference in New Issue
Block a user