From 8549aeaf664831195f56be8b2fc977e0d136287d Mon Sep 17 00:00:00 2001 From: Mike Fedosin Date: Mon, 20 May 2019 18:09:55 +0200 Subject: [PATCH] Optimize searching of upstream task executions Now to find upstream task executions mistral does a lookup for each inbound task spec. This is not optimal, because it leads to a significant number of db requests. To optimize this behavior we prepare a list of task names in advance, and then search executions using 'in' filter. Change-Id: Ia7bf62c45b889f753671bdda048f91c46af41039 --- mistral/workflow/base.py | 8 ++++++++ mistral/workflow/direct_workflow.py | 19 +++++++------------ mistral/workflow/reverse_workflow.py | 17 +++-------------- 3 files changed, 18 insertions(+), 26 deletions(-) diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index 4e302f3b2..1df67e060 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -20,6 +20,7 @@ import abc from oslo_log import log as logging from osprofiler import profiler +from mistral.db.v2 import api as db_api from mistral import exceptions as exc from mistral.lang import parser as spec_parser from mistral import utils as u @@ -263,3 +264,10 @@ class WorkflowController(object): def _is_paused_or_completed(self): return states.is_paused_or_completed(self.wf_ex.state) + + def _get_task_executions(self, **kwargs): + return db_api.get_task_executions( + workflow_execution_id=self.wf_ex.id, + sort_keys=[], # disable sorting + **kwargs + ) diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index 20cde942e..a646fe4a6 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -45,15 +45,12 @@ class DirectWorkflowController(base.WorkflowController): __workflow_type__ = "direct" def _get_upstream_task_executions(self, task_spec): - return list( - filter( - lambda t_e: self._is_upstream_task_execution(task_spec, t_e), - lookup_utils.find_task_executions_by_specs( - self.wf_ex.id, - self.wf_spec.find_inbound_task_specs(task_spec) - ) - ) - ) + t_specs_names = [t_spec.get_name() for t_spec in + self.wf_spec.find_inbound_task_specs(task_spec)] + t_execs = self._get_task_executions(name={'in': t_specs_names}) + + return [t_ex for t_ex in t_execs + if self._is_upstream_task_execution(task_spec, t_ex)] def _is_upstream_task_execution(self, t_spec, t_ex_candidate): if not states.is_completed(t_ex_candidate.state): @@ -366,10 +363,8 @@ class DirectWorkflowController(base.WorkflowController): names = self._find_all_parent_task_names(task_spec) t_execs_cache = { - t_ex.name: t_ex for t_ex in db_api.get_task_executions( + t_ex.name: t_ex for t_ex in self._get_task_executions( fields=('id', 'name', 'state'), - sort_keys=[], - workflow_execution_id=self.wf_ex.id, name={'in': names} ) } if names else {} # don't perform a db request if 'names' are empty diff --git a/mistral/workflow/reverse_workflow.py b/mistral/workflow/reverse_workflow.py index 8411bbf7c..6efb2ce5a 100644 --- a/mistral/workflow/reverse_workflow.py +++ b/mistral/workflow/reverse_workflow.py @@ -83,21 +83,10 @@ class ReverseWorkflowController(base.WorkflowController): return task_spec def _get_upstream_task_executions(self, task_spec): - t_specs = [ - self.wf_spec.get_tasks()[t_name] - for t_name in self.wf_spec.get_task_requires(task_spec) - or [] - ] + t_specs_names = self.wf_spec.get_task_requires(task_spec) or [] + t_execs = self._get_task_executions(name={'in': t_specs_names}) - return list( - filter( - lambda t_e: t_e.state == states.SUCCESS, - lookup_utils.find_task_executions_by_specs( - self.wf_ex.id, - t_specs - ) - ) - ) + return [t_ex for t_ex in t_execs if t_ex.state == states.SUCCESS] def evaluate_workflow_final_context(self): task_execs = lookup_utils.find_task_executions_by_spec(