Merge "Run vertices cleanup at the end of get_all"

This commit is contained in:
Zuul 2019-08-20 09:08:37 +00:00 committed by Gerrit Code Review
commit 405c406d50
3 changed files with 51 additions and 2 deletions

View File

@ -69,6 +69,7 @@ class DriverBase(object):
cls._add_entity_type(entity, entity_type) cls._add_entity_type(entity, entity_type)
cls._add_datasource_action(entity, datasource_action) cls._add_datasource_action(entity, datasource_action)
cls._add_sampling_time(entity) cls._add_sampling_time(entity)
entity[VProps.VITRAGE_DATASOURCE_NAME] = cls._datasource_name
yield entity yield entity
@staticmethod @staticmethod

View File

@ -18,8 +18,14 @@ import time
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
import oslo_messaging import oslo_messaging
from vitrage.utils.datetime import format_utcnow
from vitrage.common.constants import DatasourceAction from vitrage.common.constants import DatasourceAction
from vitrage.common.constants import DatasourceProperties as DSProps
from vitrage.common.constants import GraphAction
from vitrage.common.constants import VertexProperties as VProps
from vitrage.datasources.consistency import CONSISTENCY_DATASOURCE
from vitrage.datasources import utils from vitrage.datasources import utils
from vitrage import messaging from vitrage import messaging
@ -29,7 +35,8 @@ LOG = log.getLogger(__name__)
class DriverExec(object): class DriverExec(object):
def __init__(self, process_output_func, persist): def __init__(self, process_output_func, persist, graph):
self.graph = graph
self.process_output_func = process_output_func self.process_output_func = process_output_func
self.persist = persist self.persist = persist
@ -49,10 +56,21 @@ class DriverExec(object):
LOCK_BY_DRIVER.acquire(driver_name) LOCK_BY_DRIVER.acquire(driver_name)
driver = utils.get_drivers_by_name([driver_name])[0] driver = utils.get_drivers_by_name([driver_name])[0]
LOG.info("run driver get_all: %s", driver_name) LOG.info("run driver get_all: %s", driver_name)
now = format_utcnow()
events = driver.get_all(action) events = driver.get_all(action)
count = self.process_output_func(events) count = self.process_output_func(events)
LOG.info("run driver get_all: %s done (%s events)", LOG.info("run driver get_all: %s done (%s events)",
driver_name, count) driver_name, count)
if driver.should_delete_outdated_entities():
vertices_to_delete = \
self._find_vertices_to_delete(driver_name, now)
if vertices_to_delete:
delete_events = self._to_events(vertices_to_delete)
count_deleted = self.process_output_func(delete_events)
LOG.info("run driver delete outdated vertices: %s done "
"(%s events)", driver_name, count_deleted)
count += count_deleted
return count return count
except Exception: except Exception:
LOG.exception("run driver get_all: %s Failed", driver_name) LOG.exception("run driver get_all: %s Failed", driver_name)
@ -79,6 +97,34 @@ class DriverExec(object):
LOCK_BY_DRIVER.release(driver_name) LOCK_BY_DRIVER.release(driver_name)
return 0 return 0
@staticmethod
def _to_events(vertices):
return (
{
DSProps.ENTITY_TYPE: CONSISTENCY_DATASOURCE,
DSProps.DATASOURCE_ACTION: DatasourceAction.UPDATE,
DSProps.SAMPLE_DATE: format_utcnow(),
DSProps.EVENT_TYPE: GraphAction.DELETE_ENTITY,
VProps.VITRAGE_ID: vertex[VProps.VITRAGE_ID],
VProps.ID: vertex.get(VProps.ID, None),
VProps.VITRAGE_TYPE: vertex[VProps.VITRAGE_TYPE],
VProps.VITRAGE_CATEGORY: vertex[VProps.VITRAGE_CATEGORY],
VProps.IS_REAL_VITRAGE_ID: True
}
for vertex in vertices
)
def _find_vertices_to_delete(self, driver_name, now):
query = {
'and': [
{'==': {VProps.VITRAGE_DATASOURCE_NAME: driver_name}},
{'<': {VProps.VITRAGE_SAMPLE_TIMESTAMP: now}},
{'==': {VProps.VITRAGE_IS_DELETED: False}},
]
}
return self.graph.get_vertices(query_dict=query)
class DriversNotificationEndpoint(object): class DriversNotificationEndpoint(object):

View File

@ -47,7 +47,7 @@ class VitrageGraphInit(object):
self.persist = GraphPersistency(db_connection, self.graph) self.persist = GraphPersistency(db_connection, self.graph)
self.driver_exec = driver_exec.DriverExec( self.driver_exec = driver_exec.DriverExec(
self.events_coordination.handle_multiple_low_priority, self.events_coordination.handle_multiple_low_priority,
self.persist) self.persist, self.graph)
self.scheduler = Scheduler(self.graph, self.driver_exec, self.scheduler = Scheduler(self.graph, self.driver_exec,
self.persist) self.persist)
self.processor = Processor(self.graph) self.processor = Processor(self.graph)
@ -182,6 +182,8 @@ class EventsCoordination(object):
def handle_multiple_low_priority(self, events): def handle_multiple_low_priority(self, events):
index = 0 index = 0
if events is None:
events = []
for index, e in enumerate(events): for index, e in enumerate(events):
self._do_low_priority_work(e) self._do_low_priority_work(e)
return index return index