Fix for migrating installation structures

The sync strategy for installation structures has changed: the
modification_date field is now used as the migration DB sync field.
A db_sync_field_name field is added to the migration sync data, and
the migration algorithm is refactored accordingly.

Change-Id: I070546ad39922008b213ec58a740cddc745c9b07
Blueprint: send-anon-usage
Alexander Kislitsky 2014-11-13 21:02:08 +04:00
parent 3a2f16b5ba
commit 5314bf60e9
4 changed files with 176 additions and 54 deletions
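
In outline, the refactored algorithm migrates each table in two passes over
the configured sync field. A minimal sketch of the flow, condensed from the
migrator diff below (session/client wiring, logging, and error handling
omitted; chunk_size stands in for config.DB_SYNC_CHUNK_SIZE):

    import datetime

    from elasticsearch import helpers

    def make_migration(db_session, es, model, sync_info, mapping_rule,
                       chunk_size):
        sync_field = getattr(model, sync_info.db_sync_field_name)
        id_field = getattr(model, sync_info.db_id_name)

        def migrate_chunk(objs):
            # Index a chunk and advance the watermark to the last object's
            # sync field value; False means nothing was left to migrate.
            if not objs:
                return False
            docs = [mapping_rule.make_doc(sync_info.index_name,
                                          sync_info.doc_type_name, obj)
                    for obj in objs]
            helpers.bulk(es, docs)
            last = getattr(objs[-1], sync_info.db_sync_field_name)
            if last is not None:
                sync_info.last_sync_value = last
            return True

        # Pass 1: rows with a NULL sync field can never match the
        # "> last_sync_value" filter, so they are paged by offset.
        offset = 0
        while True:
            sync_info.last_sync_time = datetime.datetime.utcnow()
            objs = (db_session.query(model).filter(sync_field.is_(None))
                    .order_by(id_field.asc())
                    .limit(chunk_size).offset(offset).all())
            offset += len(objs)
            if not migrate_chunk(objs):
                break

        # Pass 2: rows whose sync field moved past the stored watermark.
        while True:
            sync_info.last_sync_time = datetime.datetime.utcnow()
            objs = (db_session.query(model)
                    .filter(sync_field > sync_info.last_sync_value)
                    .order_by(id_field.asc()).limit(chunk_size).all())
            if not migrate_chunk(objs):
                break

The committed code additionally persists the sync info after every chunk and
aborts a pass when the bulk call reports errors.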


@@ -43,6 +43,10 @@ MAPPING_MIGRATION = {
             'type': 'string',
             'index': 'not_analyzed'
         },
+        'db_sync_field_name': {
+            'type': 'string',
+            'index': 'not_analyzed'
+        },
         # to
         'index_name': {
             'type': 'string',
@@ -53,8 +57,8 @@ MAPPING_MIGRATION = {
             'index': 'not_analyzed'
         },
         # status
-        'last_sync_id': {
-            'type': 'long'
+        'last_sync_value': {
+            'enabled': False
         },
         'last_sync_time': {
             'type': 'date'
@@ -149,7 +153,8 @@ INFO_TEMPLATES = {
     STRUCTURES_DB_TABLE_NAME: {
         'db_table_name': STRUCTURES_DB_TABLE_NAME,
         'db_id_name': 'id',
-        'last_sync_id': 0,
+        'db_sync_field_name': 'modification_date',
+        'last_sync_value': '1970-01-01T00:00:00',
         'index_name': 'fuel',
         'doc_type_name': 'structure',
         'last_sync_time': None
@@ -157,7 +162,8 @@ INFO_TEMPLATES = {
     ACTION_LOGS_DB_TABLE_NAME: {
         'db_table_name': ACTION_LOGS_DB_TABLE_NAME,
         'db_id_name': 'id',
-        'last_sync_id': 0,
+        'db_sync_field_name': 'id',
+        'last_sync_value': 0,
         'index_name': 'fuel',
         'doc_type_name': 'action_logs',
         'last_sync_time': None
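
A note on the mapping change above: last_sync_value replaces the typed
last_sync_id and is deliberately left unindexed ('enabled': False), because
its type now varies per table, as the INFO_TEMPLATES show. A small
illustration (hypothetical variable names, not part of the commit):

    # The sync watermark is an ISO-8601 string when the sync field is
    # modification_date, but a plain integer when the sync field is id.
    structures_seed = {'db_sync_field_name': 'modification_date',
                       'last_sync_value': '1970-01-01T00:00:00'}
    action_logs_seed = {'db_sync_field_name': 'id', 'last_sync_value': 0}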


@@ -29,9 +29,10 @@ class SyncInfo(dict):
     # explicit properties definition
     db_table_name = None
     db_id_name = None
+    db_sync_field_name = None
     index_name = None
     doc_type_name = None
-    last_sync_id = None
+    last_sync_value = None
     last_sync_time = None
 
     def __init__(self, *args, **kwargs):
@@ -136,11 +137,14 @@ class Migrator(object):
                 dest='modification_date')
         ))
         info = self.get_sync_info(config.STRUCTURES_DB_TABLE_NAME)
-        self.make_migration(InstallationStructure, info, mapping_rule)
-        logger.info("Migration of installation structures is finished")
+        try:
+            self.make_migration(InstallationStructure, info, mapping_rule)
+            logger.info("Migration of installation structures is finished")
+        except Exception:
+            logger.exception("Migration of installation structures is failed")
 
     def migrate_action_logs(self):
-        logger.info('Migration of action logs is started')
+        logger.info("Migration of action logs is started")
         mapping_rule = MappingRule(
             'master_node_uid',
             json_fields=(),
@@ -148,40 +152,74 @@ class Migrator(object):
             NameMapping(source='master_node_uid', dest='master_node_uid'),
         ))
         info = self.get_sync_info(config.ACTION_LOGS_DB_TABLE_NAME)
-        self.make_migration(ActionLog, info, mapping_rule)
-        logger.info('Migration of action logs is finished')
+        try:
+            self.make_migration(ActionLog, info, mapping_rule)
+            logger.info("Migration of action logs is finished")
+        except Exception:
+            logger.exception("Migration of action logs is failed")
 
-    def make_migration(self, model, sync_info, mapping_rule):
+    def _migrate_objs(self, objs, sync_info, mapping_rule):
+        if len(objs) == 0:
+            logger.info("Nothing to be migrated for %s",
+                        sync_info.db_table_name)
+            self.put_sync_info(sync_info)
+            return False
+        logger.info("%d %s to be migrated", len(objs),
+                    sync_info.db_table_name)
+        docs = []
+        for obj in objs:
+            doc = mapping_rule.make_doc(sync_info.index_name,
+                                        sync_info.doc_type_name, obj)
+            docs.append(doc)
+            last_sync_value = getattr(obj, sync_info.db_sync_field_name)
+        processed, errors = helpers.bulk(self.es, docs)
+        if errors:
+            logger.error("Migration of %s failed: %s",
+                         sync_info.db_table_name, errors)
+            return False
+        else:
+            if last_sync_value is not None:
+                sync_info.last_sync_value = last_sync_value
+            logger.info("Chunk of %s of size %d is migrated",
+                        sync_info.db_table_name, len(objs))
+            self.put_sync_info(sync_info)
+            return True
+
+    def migrate_with_null_sync_field(self, model, sync_info, mapping_rule):
+        logger.debug("Migrating %s with NULL %s", sync_info.db_table_name,
+                     sync_info.db_sync_field_name)
+        sync_field = getattr(model, sync_info.db_sync_field_name)
+        id_field = getattr(model, sync_info.db_id_name)
+        offset = 0
+        while True:
+            sync_info.last_sync_time = datetime.datetime.utcnow()
+            objs = self.db_session.query(model). \
+                filter(sync_field.is_(None)). \
+                order_by(id_field.asc()). \
+                limit(config.DB_SYNC_CHUNK_SIZE).offset(offset).all()
+            offset += len(objs)
+            if not self._migrate_objs(objs, sync_info, mapping_rule):
+                break
+        logger.debug("%s with NULL %s migrated", sync_info.db_table_name,
+                     sync_info.db_sync_field_name)
+
+    def migrate_by_sync_field(self, model, sync_info, mapping_rule):
+        logger.debug("Migrating %s with %s > %s", sync_info.db_table_name,
+                     sync_info.db_sync_field_name, sync_info.last_sync_value)
+        sync_field = getattr(model, sync_info.db_sync_field_name)
         id_field = getattr(model, sync_info.db_id_name)
         while True:
             sync_info.last_sync_time = datetime.datetime.utcnow()
-            objs = self.db_session.query(model).\
-                filter(id_field > sync_info.last_sync_id).\
-                order_by(id_field.asc()).\
+            objs = self.db_session.query(model). \
+                filter(sync_field > sync_info.last_sync_value). \
+                order_by(id_field.asc()). \
                 limit(config.DB_SYNC_CHUNK_SIZE).all()
-            if len(objs) == 0:
-                logger.info("Nothing to be migrated for %s",
-                            model.__tablename__)
-                self.put_sync_info(sync_info)
+            if not self._migrate_objs(objs, sync_info, mapping_rule):
                 break
+        logger.debug("%s with %s > %s migrated", sync_info.db_table_name,
+                     sync_info.db_sync_field_name, sync_info.last_sync_value)
-            logger.info("%d %s to be migrated", len(objs),
-                        model.__tablename__)
-            docs = []
-            for obj in objs:
-                doc = mapping_rule.make_doc(sync_info.index_name,
-                                            sync_info.doc_type_name, obj)
-                docs.append(doc)
-                from_id = obj.id
-            processed, errors = helpers.bulk(self.es, docs)
-            if errors:
-                logger.error("Migration of %s failed: %s",
-                             model.__tablename__, errors)
-                break
-            else:
-                last_sync_id = from_id
-                sync_info.last_sync_id = last_sync_id
-                logger.info("Chunk of %s of size %d is migrated",
-                            model.__tablename__, len(objs))
-                self.put_sync_info(sync_info)
+
+    def make_migration(self, model, sync_info, mapping_rule):
+        self.migrate_with_null_sync_field(model, sync_info, mapping_rule)
+        self.migrate_by_sync_field(model, sync_info, mapping_rule)
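
For orientation, the refactored entry points are invoked exactly as the tests
below do; a minimal usage sketch (assuming Migrator wires its DB session and
Elasticsearch client from migration.config):

    from migration.migrator import Migrator

    migrator = Migrator()
    # Each call now runs both passes: rows with a NULL sync field first,
    # then rows with sync field > last_sync_value, chunk by chunk.
    migrator.migrate_installation_structure()
    migrator.migrate_action_logs()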


@@ -13,6 +13,7 @@
 # under the License.
 
 from collections import namedtuple
+import datetime
 from elasticsearch import Elasticsearch
 from random import randint
 from unittest2.case import TestCase
@@ -69,7 +70,7 @@ class MigrationTest(ElasticTest, DbTest):
     def setUp(self):
         super(MigrationTest, self).setUp()
 
-    def create_dumb_structure(self):
+    def create_dumb_structure(self, set_md=True):
         mn_uid = '{}'.format(uuid.uuid4())
         structure = {
             'master_node_uid': mn_uid,
@@ -87,8 +88,15 @@ class MigrationTest(ElasticTest, DbTest):
             'clusters_num': 1,
             'clusters': []
         }
+        now = datetime.datetime.utcnow()
+        if set_md:
+            m_date = now
+        else:
+            m_date = None
         db_session.add(InstallationStructure(master_node_uid=mn_uid,
-                                             structure=structure))
+                                             structure=structure,
+                                             creation_date=now,
+                                             modification_date=m_date))
         db_session.commit()
         return mn_uid


@@ -21,7 +21,6 @@ from migration.test.base import MigrationTest
 from migration.migrator import Migrator
 from migration.model import ActionLog
 from migration.model import InstallationStructure
 
 
 class MigratorTest(MigrationTest):
@@ -64,12 +63,14 @@ class MigratorTest(MigrationTest):
     def test_empty_installation_structure_migration(self):
         migrator = Migrator()
         time_before = datetime.datetime.utcnow()
+        info_before = migrator.get_sync_info(config.STRUCTURES_DB_TABLE_NAME)
         migrator.migrate_installation_structure()
-        info = migrator.get_sync_info(config.STRUCTURES_DB_TABLE_NAME)
-        time_of_sync = parser.parse(info.last_sync_time)
+        info_after = migrator.get_sync_info(config.STRUCTURES_DB_TABLE_NAME)
+        time_of_sync = parser.parse(info_after.last_sync_time)
         self.assertLessEqual(time_before, time_of_sync)
         self.assertGreaterEqual(datetime.datetime.utcnow(), time_of_sync)
-        self.assertEquals(0, info.last_sync_id)
+        self.assertEquals(info_before.last_sync_value,
+                          info_after.last_sync_value)
 
     def get_indexed_docs_num(self, sync_info):
         resp = self.es.count(index=sync_info.index_name,
@@ -78,9 +79,38 @@ class MigratorTest(MigrationTest):
         return resp['count']
 
     @patch('migration.config.DB_SYNC_CHUNK_SIZE', 2)
-    def test_installation_structure_migration(self):
+    def test_migrations_chain(self):
+        migrator = Migrator()
+        migrator.migrate_installation_structure()
+        migrator.migrate_action_logs()
+
         docs_num = 3
-        mn_uids = [self.create_dumb_structure() for _ in xrange(docs_num)]
+        for _ in xrange(docs_num):
+            self.create_dumb_structure()
+            self.create_dumb_action_log()
+
+        is_sync_info = migrator.get_sync_info(config.STRUCTURES_DB_TABLE_NAME)
+        is_indexed_docs_before = self.get_indexed_docs_num(is_sync_info)
+        al_sync_info = migrator.get_sync_info(config.ACTION_LOGS_DB_TABLE_NAME)
+        al_indexed_docs_before = self.get_indexed_docs_num(al_sync_info)
+
+        migrator.migrate_installation_structure()
+        migrator.migrate_action_logs()
+        self.es.indices.refresh(index=config.INDEX_FUEL)
+
+        is_indexed_docs_after = self.get_indexed_docs_num(is_sync_info)
+        self.assertEquals(is_indexed_docs_before + docs_num,
+                          is_indexed_docs_after)
+        al_indexed_docs_after = self.get_indexed_docs_num(al_sync_info)
+        self.assertEquals(al_indexed_docs_before + docs_num,
+                          al_indexed_docs_after)
+
+    @patch('migration.config.DB_SYNC_CHUNK_SIZE', 2)
+    def test_installation_structure_migration(self):
+        mn_uids = set([self.create_dumb_structure() for _ in xrange(3)])
+        null_md_uids = set([self.create_dumb_structure(set_md=False)
+                            for _ in xrange(3)])
+        mn_uids.update(null_md_uids)
+        mn_uids.update(set([self.create_dumb_structure() for _ in xrange(3)]))
         migrator = Migrator()
         sync_info = migrator.get_sync_info(config.STRUCTURES_DB_TABLE_NAME)
@@ -92,25 +122,65 @@ class MigratorTest(MigrationTest):
         self.es.indices.refresh(index=config.INDEX_MIGRATION)
         new_sync_info = migrator.get_sync_info(config.STRUCTURES_DB_TABLE_NAME)
         time_of_sync = parser.parse(new_sync_info.last_sync_time)
-        last_obj = migrator.db_session.query(InstallationStructure).order_by(
-            InstallationStructure.id.desc()).first()
 
         # checking sync time is updated
         self.assertLessEqual(time_before, time_of_sync)
         self.assertGreaterEqual(datetime.datetime.utcnow(), time_of_sync)
 
         # checking last sync id is updated
-        self.assertEquals(last_obj.id, new_sync_info.last_sync_id)
+        last_md = parser.parse(new_sync_info.last_sync_value)
+        initial_md = parser.parse(sync_info.last_sync_value)
+        self.assertGreater(last_md, initial_md)
 
         # checking all docs are migrated
         self.es.indices.refresh(index=sync_info.index_name)
-        self.assertEquals(indexed_docs_before + docs_num,
+        self.assertEquals(indexed_docs_before + len(mn_uids),
                           self.get_indexed_docs_num(sync_info))
 
         # checking new docs are indexed
-        for mn_uid in mn_uids:
-            self.es.get(sync_info.index_name, mn_uid,
-                        doc_type=sync_info.doc_type_name)
+        for mn_uid in mn_uids - null_md_uids:
+            doc = self.es.get(sync_info.index_name, mn_uid,
+                              doc_type=sync_info.doc_type_name)
+            # checking datetimes are migrated
+            source = doc['_source']
+            self.assertIsNotNone(source['creation_date'])
+            self.assertIsNotNone(source['modification_date'])
+
+        # checking new docs are indexed
+        for mn_uid in null_md_uids:
+            doc = self.es.get(sync_info.index_name, mn_uid,
+                              doc_type=sync_info.doc_type_name)
+            # checking datetimes are migrated
+            source = doc['_source']
+            self.assertIsNotNone(source['creation_date'])
+            self.assertIsNone(source['modification_date'])
+
+    @patch('migration.config.DB_SYNC_CHUNK_SIZE', 2)
+    def test_null_modification_date_migration(self):
+        docs_num = 5
+        for _ in xrange(docs_num):
+            self.create_dumb_structure(set_md=False)
+        migrator = Migrator()
+        sync_info_before = migrator.get_sync_info(
+            config.STRUCTURES_DB_TABLE_NAME)
+
+        # checking sync info before migrations
+        self.assertIsNotNone(sync_info_before.last_sync_value)
+        self.assertIsNone(sync_info_before.last_sync_time)
+        indexed_docs_before = self.get_indexed_docs_num(sync_info_before)
+
+        # migrating data
+        migrator.migrate_installation_structure()
+        self.es.indices.refresh(config.INDEX_FUEL)
+        self.es.indices.refresh(config.INDEX_MIGRATION)
+
+        # checking sync info after migrations
+        sync_info_after = migrator.get_sync_info(
+            config.STRUCTURES_DB_TABLE_NAME)
+        self.assertIsNotNone(sync_info_after.last_sync_value)
+        self.assertIsNotNone(sync_info_after.last_sync_time)
+        indexed_docs_after = self.get_indexed_docs_num(sync_info_after)
+        self.assertEquals(indexed_docs_before + docs_num, indexed_docs_after)
 
     def test_empty_action_logs_migration(self):
         migrator = Migrator()
@@ -120,7 +190,7 @@ class MigratorTest(MigrationTest):
         time_of_sync = parser.parse(info.last_sync_time)
         self.assertLessEqual(time_before, time_of_sync)
         self.assertGreaterEqual(datetime.datetime.utcnow(), time_of_sync)
-        self.assertEquals(0, info.last_sync_id)
+        self.assertEquals(0, info.last_sync_value)
 
     @patch('migration.config.DB_SYNC_CHUNK_SIZE', 2)
     def test_action_logs_migration(self):
@@ -147,7 +217,7 @@ class MigratorTest(MigrationTest):
         self.assertGreaterEqual(datetime.datetime.utcnow(), time_of_sync)
 
         # checking last sync id is updated
-        self.assertEquals(last_obj.id, new_sync_info.last_sync_id)
+        self.assertEquals(last_obj.id, new_sync_info.last_sync_value)
 
         # checking all docs are migrated
         self.es.indices.refresh(index=sync_info.index_name)