From 9ea04388a68e524e1e0b52b0cd521affa962394a Mon Sep 17 00:00:00 2001 From: Idan Hefetz Date: Wed, 15 Nov 2017 09:19:28 +0000 Subject: [PATCH] parallel evaluation fixes worker call task_done also on failure. manager to close queues once not needed. Change-Id: Id5e001a020c91320d4176ccbd67c4a814a3ba6a8 --- vitrage/evaluator/evaluator_service.py | 26 +++++++++++++------------- vitrage/graph/algo_driver/algorithm.py | 1 - vitrage/storage/impl_sqlalchemy.py | 2 +- 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/vitrage/evaluator/evaluator_service.py b/vitrage/evaluator/evaluator_service.py index 0943dd096..5d1bdb1e3 100644 --- a/vitrage/evaluator/evaluator_service.py +++ b/vitrage/evaluator/evaluator_service.py @@ -27,6 +27,7 @@ LOG = log.getLogger(__name__) START_EVALUATION = 'start_evaluation' +POISON_PILL = None class EvaluatorManager(EvaluatorBase): @@ -85,7 +86,9 @@ class EvaluatorManager(EvaluatorBase): q.join() def stop_all_workers(self): - self._notify_and_wait(None) + self._notify_and_wait(POISON_PILL) + for q in self._worker_queues: + q.close() self._worker_queues = list() def reload_all_workers(self, enabled=True): @@ -126,20 +129,17 @@ class EvaluatorWorker(os_service.Service): def _read_queue(self): while True: - try: - next_task = self._task_queue.get() - if next_task is None: - self._task_queue.task_done() - break # poison pill - self._do_task(next_task) + next_task = self._task_queue.get() + if next_task is POISON_PILL: self._task_queue.task_done() - # Evaluator queue may have been updated, thus the sleep: - time.sleep(0) + break + try: + self._do_task(next_task) except Exception as e: - # TODO(ihefetz): an exception here may break all the - # TODO(ihefetz): evaluators. If task_done was not called, - # TODO(ihefetz): evaluator manager will wait forever. - LOG.exception("Exception: %s", e) + LOG.exception("Graph may not be in sync: exception %s", e) + self._task_queue.task_done() + # Evaluator queue may have been updated, thus the sleep: + time.sleep(0) def _do_task(self, task): (before, current, is_vertex, action) = task diff --git a/vitrage/graph/algo_driver/algorithm.py b/vitrage/graph/algo_driver/algorithm.py index 9cf9616b3..7b4cc7859 100644 --- a/vitrage/graph/algo_driver/algorithm.py +++ b/vitrage/graph/algo_driver/algorithm.py @@ -59,7 +59,6 @@ class GraphAlgorithm(object): In sub-graph matching algorithms complexity is high in the general case Here it is considerably mitigated as we have an anchor in the graph. - TODO(ihefetz) document this :type known_mappings: list :type sub_graph: driver.Graph diff --git a/vitrage/storage/impl_sqlalchemy.py b/vitrage/storage/impl_sqlalchemy.py index 787979507..12e68f6bc 100644 --- a/vitrage/storage/impl_sqlalchemy.py +++ b/vitrage/storage/impl_sqlalchemy.py @@ -58,7 +58,7 @@ class Connection(base.Connection): engine = self._engine_facade.get_engine() engine.connect() models.Base.metadata.create_all(engine, checkfirst=False) - # TODO(ihefetz) upgrade logic is missing + # TODO(idan_hefetz) upgrade logic is missing def disconnect(self): self._engine_facade.get_engine().dispose()