Fix for action_logs migration

DB model fixed
Action logs JSON schema fixed
Task data generation added to tests
Fixed Elasticsearch mapping schema
Fixed collector manager run script
A set of fields is now used for ES id generation

Closes-Bug: #1397326
Change-Id: Ifdb47d5af6a75d4306711e3929146a9a82406dcc
Alexander Kislitsky 2014-11-20 18:28:05 +04:00
parent 6201c2bf17
commit 77bdc11b22
9 changed files with 245 additions and 111 deletions
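
The core change: Elasticsearch document ids for action logs are now built from several db fields instead of master_node_uid alone, so several logs from one master node no longer overwrite each other. A minimal sketch of the id scheme (field values are hypothetical):

    # Composite ES document id, as produced by MappingRule._get_es_id below.
    ID_FIELDS_GLUE = '_'
    es_id = ID_FIELDS_GLUE.join(['abc-uid', '42'])  # master_node_uid, external_id
    # -> 'abc-uid_42'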

View File

@@ -50,11 +50,13 @@
         "additional_info": {
             "type": "object",
             "properties": {
-                "parent_task_id": {"type": "number"},
+                "parent_task_id": {"type": ["number", "null"]},
                 "subtasks_ids": {"type": "array"},
                 "operation": {"type": "string"},
                 "nodes_from_resp": {"type": "array"},
-                "ended_with_status": {"type": "string"}
+                "ended_with_status": {"type": "string"},
+                "message": {"type": ["string", "null"]},
+                "output": {"type": ["object", "null"]}
             }
         },
         "is_sent": {"type": "boolean"},

View File

@@ -196,7 +196,7 @@ class TestActionLogs(DbTest):
                 # about 1/3 is incomplete
                 "end_timestamp": "2" if i % 3 else None,
                 "additional_info": {
-                    "parent_task_id": 0,
+                    "parent_task_id": i if i % 2 else None,
                     "subtasks_ids": [],
                     "operation": "deployment"
                 },

View File

@@ -31,7 +31,7 @@ def configure_app(mode=None):
         'prod': 'collector.api.config.Production'
     }
     app.config.from_object(mode_map.get(mode))
-    app.config.from_envvar('COLLECTOR_SETTINGS', silent=False)
+    app.config.from_envvar('COLLECTOR_SETTINGS', silent=True)
    setattr(app_module, 'db', flask_sqlalchemy.SQLAlchemy(app))
    log.init_logger()
    return app
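
With silent=True a missing COLLECTOR_SETTINGS variable no longer aborts start-up: Flask's from_envvar then returns False instead of raising RuntimeError, and the defaults loaded via from_object stay in effect. A minimal sketch of that behaviour with a standalone app:

    from flask import Flask

    app = Flask(__name__)
    # silent=False raises RuntimeError when COLLECTOR_SETTINGS is unset;
    # silent=True just reports failure and keeps the current config.
    loaded = app.config.from_envvar('COLLECTOR_SETTINGS', silent=True)
    if not loaded:
        pass  # running on the defaults loaded via from_object()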

View File

@@ -14,112 +14,112 @@
 import logging
 
-LOG_FILE = '/var/log/migration.log'
+LOG_FILE = "/var/log/migration.log"
 LOG_LEVEL = logging.INFO
 LOG_FILE_SIZE = 2048000
 LOG_FILES_COUNT = 20
 
-ELASTIC_HOST = 'localhost'
+ELASTIC_HOST = "localhost"
 ELASTIC_PORT = 9200
 SYNC_EVERY_SECONDS = 3600
 
-DB_CONNECTION_STRING = 'postgresql://collector:***@localhost/collector'
+DB_CONNECTION_STRING = "postgresql://collector:***@localhost/collector"
 
 # size of chunk for fetching objects for synchronization
 # into Elasticsearch
 DB_SYNC_CHUNK_SIZE = 1000
 
-INDEX_MIGRATION = 'migration'
-DOC_TYPE_MIGRATION_INFO = 'info'
+INDEX_MIGRATION = "migration"
+DOC_TYPE_MIGRATION_INFO = "info"
 MAPPING_MIGRATION = {
     DOC_TYPE_MIGRATION_INFO: {
-        'properties': {
+        "properties": {
             # from
-            'db_table_name': {
-                'type': 'string',
-                'index': 'not_analyzed'
+            "db_table_name": {
+                "type": "string",
+                "index": "not_analyzed"
             },
-            'db_id_name': {
-                'type': 'string',
-                'index': 'not_analyzed'
+            "db_id_name": {
+                "type": "string",
+                "index": "not_analyzed"
             },
-            'db_sync_field_name': {
-                'type': 'string',
-                'index': 'not_analyzed'
+            "db_sync_field_name": {
+                "type": "string",
+                "index": "not_analyzed"
             },
             # to
-            'index_name': {
-                'type': 'string',
-                'index': 'not_analyzed'
+            "index_name": {
+                "type": "string",
+                "index": "not_analyzed"
             },
-            'doc_type_name': {
-                'type': 'string',
-                'index': 'not_analyzed'
+            "doc_type_name": {
+                "type": "string",
+                "index": "not_analyzed"
             },
             # status
-            'last_sync_value': {
-                'enabled': False
+            "last_sync_value": {
+                "enabled": False
             },
-            'last_sync_time': {
-                'type': 'date'
+            "last_sync_time": {
+                "type": "date"
             }
         }
     }
 }
 
-INDEX_FUEL = 'fuel'
-DOC_TYPE_STRUCTURE = 'structure'
-DOC_TYPE_ACTION_LOGS = 'action_logs'
+INDEX_FUEL = "fuel"
+DOC_TYPE_STRUCTURE = "structure"
+DOC_TYPE_ACTION_LOGS = "action_logs"
 MAPPING_FUEL = {
     DOC_TYPE_STRUCTURE: {
-        'properties': {
-            'master_node_uid': {
-                'type': 'string',
-                'index': 'not_analyzed'
+        "properties": {
+            "master_node_uid": {
+                "type": "string",
+                "index": "not_analyzed"
             },
-            'allocated_nodes_num': {'type': 'long'},
-            'unallocated_nodes_num': {'type': 'long'},
-            'creation_date': {'type': 'date'},
-            'modification_date': {'type': 'date'},
-            'clusters': {
-                'type': 'nested',
-                'properties': {
-                    'id': {'type': 'long'},
-                    'status': {'type': 'string'},
-                    'release': {
-                        'type': 'nested',
-                        'properties': {
-                            'version': {
-                                'type': 'string',
-                                'index': 'not_analyzed'
+            "allocated_nodes_num": {"type": "long"},
+            "unallocated_nodes_num": {"type": "long"},
+            "creation_date": {"type": "date"},
+            "modification_date": {"type": "date"},
+            "clusters": {
+                "type": "nested",
+                "properties": {
+                    "id": {"type": "long"},
+                    "status": {"type": "string"},
+                    "release": {
+                        "type": "nested",
+                        "properties": {
+                            "version": {
+                                "type": "string",
+                                "index": "not_analyzed"
                             },
-                            'os': {
-                                'type': 'string',
-                                'index': 'analyzed',
-                                'analyzer': 'not_analyzed_lowercase'
+                            "os": {
+                                "type": "string",
+                                "index": "analyzed",
+                                "analyzer": "not_analyzed_lowercase"
                             },
-                            'name': {
-                                'type': 'string',
-                                'index': 'not_analyzed'
+                            "name": {
+                                "type": "string",
+                                "index": "not_analyzed"
                             }
                         }
                     },
-                    'attributes': {
-                        'type': 'nested',
-                        'properties': {
-                            'libvirt_type': {
-                                'type': 'string',
-                                'index': 'not_analyzed'
+                    "attributes": {
+                        "type": "nested",
+                        "properties": {
+                            "libvirt_type": {
+                                "type": "string",
+                                "index": "not_analyzed"
                             }
                         }
                     },
-                    'nodes_num': {'type': 'long'},
-                    'nodes': {
-                        'type': 'nested',
-                        'properties': {
-                            'id': {'type': 'long'},
-                            'manufacturer': {'type': 'string'}
+                    "nodes_num": {"type": "long"},
+                    "nodes": {
+                        "type": "nested",
+                        "properties": {
+                            "id": {"type": "long"},
+                            "manufacturer": {"type": "string"}
                         }
                     },
                 }
@@ -127,45 +127,71 @@ MAPPING_FUEL = {
         }
     },
     DOC_TYPE_ACTION_LOGS: {
-        'properties': {
-            'master_node_uid': {
-                'type': 'string',
-                'index': 'not_analyzed'
-            }
+        "properties": {
+            "master_node_uid": {
+                "type": "string",
+                "index": "not_analyzed"
+            },
+            "id": {"type": "long"},
+            "actor_id": {"type": "string"},
+            "action_group": {"type": "string"},
+            "action_name": {"type": "string"},
+            "action_type": {"type": "string"},
+            "start_timestamp": {"type": "date"},
+            "end_timestamp": {"type": "date"},
+            "additional_info": {
+                "type": "object",
+                "properties": {
+                    # http request
+                    "request_data": {"type": "object"},
+                    "response_data": {"type": "object"},
+                    # task
+                    "parent_task_id": {"type": "long"},
+                    "subtasks_ids": {"type": "long"},
+                    "operation": {"type": "string"},
+                    "nodes_from_resp": {"enabled": False},
+                    "ended_with_status": {"type": "string"},
+                    "message": {"type": "string"},
+                    "output": {"enabled": False}
+                }
+            },
+            "is_sent": {"type": "boolean"},
+            "cluster_id": {"type": "long"},
+            "task_uuid": {"type": "string"}
         }
     }
 }
 
 ANALYSIS_INDEX_FUEL = {
-    'analyzer': {
-        'not_analyzed_lowercase': {
-            'filter': ['lowercase'],
-            'type': 'custom',
-            'tokenizer': 'keyword'
+    "analyzer": {
+        "not_analyzed_lowercase": {
+            "filter": ["lowercase"],
+            "type": "custom",
+            "tokenizer": "keyword"
         }
     }
 }
 
-STRUCTURES_DB_TABLE_NAME = 'installation_structures'
-ACTION_LOGS_DB_TABLE_NAME = 'action_logs'
+STRUCTURES_DB_TABLE_NAME = "installation_structures"
+ACTION_LOGS_DB_TABLE_NAME = "action_logs"
 INFO_TEMPLATES = {
     STRUCTURES_DB_TABLE_NAME: {
-        'db_table_name': STRUCTURES_DB_TABLE_NAME,
-        'db_id_name': 'id',
-        'db_sync_field_name': 'modification_date',
-        'last_sync_value': '1970-01-01T00:00:00',
-        'index_name': 'fuel',
-        'doc_type_name': 'structure',
-        'last_sync_time': None
+        "db_table_name": STRUCTURES_DB_TABLE_NAME,
+        "db_id_name": "id",
+        "db_sync_field_name": "modification_date",
+        "last_sync_value": "1970-01-01T00:00:00",
+        "index_name": "fuel",
+        "doc_type_name": "structure",
+        "last_sync_time": None
     },
     ACTION_LOGS_DB_TABLE_NAME: {
-        'db_table_name': ACTION_LOGS_DB_TABLE_NAME,
-        'db_id_name': 'id',
-        'db_sync_field_name': 'id',
-        'last_sync_value': 0,
-        'index_name': 'fuel',
-        'doc_type_name': 'action_logs',
-        'last_sync_time': None
+        "db_table_name": ACTION_LOGS_DB_TABLE_NAME,
+        "db_id_name": "id",
+        "db_sync_field_name": "id",
+        "last_sync_value": 0,
+        "index_name": "fuel",
+        "doc_type_name": "action_logs",
+        "last_sync_time": None
     }
 }
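
For orientation, a sketch of how these settings are typically pushed to Elasticsearch with the elasticsearch-py client (the migrator's actual bootstrap code is not part of this diff):

    from elasticsearch import Elasticsearch

    import config  # the settings module shown above, assumed importable

    es = Elasticsearch([{'host': config.ELASTIC_HOST,
                         'port': config.ELASTIC_PORT}])
    # Create the index with the custom keyword-lowercase analyzer and
    # register the structure and action_logs mappings in one request.
    es.indices.create(index=config.INDEX_FUEL, body={
        'settings': {'analysis': config.ANALYSIS_INDEX_FUEL},
        'mappings': config.MAPPING_FUEL
    })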

View File

@@ -44,19 +44,25 @@ NameMapping = namedtuple('NameMapping', ['source', 'dest'])
 
 class MappingRule(object):
 
     ID_FIELDS_GLUE = '_'
 
-    def __init__(self, db_id_name, json_fields=(), mixed_fields_mapping=()):
+    def __init__(self, db_id_names, json_fields=(), mixed_fields_mapping=()):
         """Describes how a db object is mapped into an Elasticsearch document
-        :param db_id_name: NameMapping of db id field name
+        :param db_id_names: db field names used for the Elasticsearch
+            document _id
         :param json_fields: tuple of fields to be merged as dicts into
             the Elasticsearch document
         :param mixed_fields_mapping: tuple of NameMapping for adding into
             the Elasticsearch document
         """
-        self.db_id_name = db_id_name
+        self.db_id_names = db_id_names
         self.json_fields = json_fields
         self.mixed_fields_mapping = mixed_fields_mapping
 
+    def _get_es_id(self, db_object):
+        values = ('{}'.format(getattr(db_object, db_id_name)) for
+                  db_id_name in self.db_id_names)
+        return self.ID_FIELDS_GLUE.join(values)
+
     def make_doc(self, index_name, doc_type_name, db_object):
         """Returns dictionary for sending into Elasticsearch
         """
@@ -68,7 +74,7 @@ class MappingRule(object):
 
         return {
             '_index': index_name,
             '_type': doc_type_name,
-            '_id': getattr(db_object, self.db_id_name),
+            '_id': self._get_es_id(db_object),
             '_source': data
         }
@@ -129,7 +135,7 @@ class Migrator(object):
     def migrate_installation_structure(self):
         logger.info("Migration of installation structures is started")
         mapping_rule = MappingRule(
-            'master_node_uid',
+            ('master_node_uid',),
             json_fields=('structure',),
             mixed_fields_mapping=(
                 NameMapping(source='creation_date', dest='creation_date'),
@@ -146,8 +152,8 @@ class Migrator(object):
     def migrate_action_logs(self):
         logger.info("Migration of action logs is started")
         mapping_rule = MappingRule(
-            'master_node_uid',
-            json_fields=(),
+            ('master_node_uid', 'external_id'),
+            json_fields=('body',),
             mixed_fields_mapping=(
                 NameMapping(source='master_node_uid', dest='master_node_uid'),
             ))
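
Taken together, action logs are now indexed under a composite id and carry the whole body JSON column. A usage sketch of the new rule (db_object stands for a hypothetical ActionLog row):

    rule = MappingRule(
        ('master_node_uid', 'external_id'),
        json_fields=('body',),
        mixed_fields_mapping=(
            NameMapping(source='master_node_uid', dest='master_node_uid'),
        ))
    doc = rule.make_doc('fuel', 'action_logs', db_object)
    # doc['_id'] == 'abc-uid_42' for master_node_uid='abc-uid' and
    # external_id=42, so logs from one master node stay distinct.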

View File

@@ -35,6 +35,7 @@ class ActionLog(Base):
     id = Column(Integer, primary_key=True)
     master_node_uid = Column(String, nullable=False)
     external_id = Column(Integer, nullable=False)
+    body = Column(JSON, nullable=False)
 
 
 class InstallationStructure(Base):

View File

@@ -198,13 +198,78 @@ class MigrationTest(ElasticTest, DbTest):
         db_session.commit()
         return mn_uid
 
-    def create_dumb_action_log(self):
-        mn_uid = '{}'.format(uuid.uuid4())
-        external_id = random.randint(1, 10000)
+    def _action_name(self):
+        return random.choice([
+            'deploy',
+            'deployment',
+            'provision',
+            'stop_deployment',
+            'reset_environment',
+            'update',
+        ])
+
+    def _gen_id(self):
+        return random.randint(1, 10000)
+
+    def _task_status(self):
+        return random.choice([
+            'ready',
+            'running',
+            'error'
+        ])
+
+    def _nodes(self):
+        return [self._gen_id() for _ in xrange(0, 100)]
+
+    def _subtasks(self):
+        return [self._gen_id() for _ in xrange(0, 5)]
+
+    def create_dumb_action_log(self, mn_uid=None):
+        if mn_uid is None:
+            mn_uid = '{}'.format(uuid.uuid4())
+        external_id = self._gen_id()
+        body = {
+            'id': self._gen_id(),
+            'actor_id': '{}'.format(uuid.uuid4()),
+            'action_group': random.choice([
+                'cluster_changes',
+                'cluster_checking',
+                'operations'
+            ]),
+            'action_name': self._action_name(),
+            'action_type': random.choice(['http_request',
+                                          'nailgun_task']),
+            'start_timestamp': datetime.datetime.utcnow().isoformat(),
+            'end_timestamp': random.choice([
+                None,
+                (datetime.datetime.utcnow() + datetime.timedelta(
+                    seconds=random.randint(0, 60)
+                )).isoformat(),
+            ]),
+            'is_sent': random.choice([True, False]),
+            'cluster_id': self._gen_id(),
+            'task_uuid': '{}'.format(uuid.uuid4()),
+            'additional_info': random.choice([
+                {
+                    # http request
+                    'request_data': {},
+                    'response_data': {},
+                },
+                {
+                    # task
+                    'parent_task_id': self._gen_id(),
+                    'subtasks_ids': self._subtasks(),
+                    'operation': self._action_name(),
+                    'nodes_from_resp': self._nodes(),
+                    'ended_with_status': self._task_status()
+                }
+            ])
+        }
         db_session.add(ActionLog(master_node_uid=mn_uid,
-                                 external_id=external_id))
+                                 external_id=external_id,
+                                 body=body))
         db_session.commit()
-        return mn_uid
+        return '{0}_{1}'.format(mn_uid, external_id)
 
 
 AggsCheck = namedtuple('AggsCheck', ['key', 'doc_count'])

View File

@@ -29,7 +29,7 @@ class MappingRuleTest(MigrationTest):
         db_obj = db_session.query(InstallationStructure).filter(
             InstallationStructure.master_node_uid == mn_uid).one()
         rule = MappingRule(
-            'master_node_uid',
+            ('master_node_uid',),
             json_fields=('structure',),
             mixed_fields_mapping=(
                 NameMapping(source='creation_date', dest='creation_date'),

View File

@@ -225,6 +225,40 @@ class MigratorTest(MigrationTest):
                           self.get_indexed_docs_num(sync_info))
 
         # checking new docs are indexed
+        check_keys = [
+            'master_node_uid',
+            'id',
+            'actor_id',
+            'action_group',
+            'action_name',
+            'action_type',
+            'start_timestamp',
+            'end_timestamp',
+            'additional_info',
+            'is_sent',
+            'cluster_id',
+            'task_uuid'
+        ]
         for mn_uid in mn_uids:
-            self.es.get(sync_info.index_name, mn_uid,
-                        doc_type=sync_info.doc_type_name)
+            resp = self.es.get(sync_info.index_name, mn_uid,
+                               doc_type=sync_info.doc_type_name)
+            doc = resp['_source']
+            for k in check_keys:
+                self.assertTrue(k in doc)
+
+    @patch('migration.config.DB_SYNC_CHUNK_SIZE', 2)
+    def test_action_logs_one_node_migration(self):
+        docs_num = 5
+        mn_uid = 'xx'
+        for _ in xrange(docs_num):
+            self.create_dumb_action_log(mn_uid=mn_uid)
+
+        migrator = Migrator()
+        sync_info = migrator.get_sync_info(config.ACTION_LOGS_DB_TABLE_NAME)
+        indexed_docs_before = self.get_indexed_docs_num(sync_info)
+        migrator.migrate_action_logs()
+
+        # checking all docs are migrated
+        self.es.indices.refresh(index=sync_info.index_name)
+        self.assertEquals(indexed_docs_before + docs_num,
+                          self.get_indexed_docs_num(sync_info))