Merge "Optimize searching of upstream task executions"
This commit is contained in:
commit
fefdcd77cd
@ -20,6 +20,7 @@ import abc
|
||||
from oslo_log import log as logging
|
||||
from osprofiler import profiler
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral import exceptions as exc
|
||||
from mistral.lang import parser as spec_parser
|
||||
from mistral import utils as u
|
||||
@ -263,3 +264,10 @@ class WorkflowController(object):
|
||||
|
||||
def _is_paused_or_completed(self):
|
||||
return states.is_paused_or_completed(self.wf_ex.state)
|
||||
|
||||
def _get_task_executions(self, **kwargs):
|
||||
return db_api.get_task_executions(
|
||||
workflow_execution_id=self.wf_ex.id,
|
||||
sort_keys=[], # disable sorting
|
||||
**kwargs
|
||||
)
|
||||
|
@ -45,15 +45,12 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
__workflow_type__ = "direct"
|
||||
|
||||
def _get_upstream_task_executions(self, task_spec):
|
||||
return list(
|
||||
filter(
|
||||
lambda t_e: self._is_upstream_task_execution(task_spec, t_e),
|
||||
lookup_utils.find_task_executions_by_specs(
|
||||
self.wf_ex.id,
|
||||
self.wf_spec.find_inbound_task_specs(task_spec)
|
||||
)
|
||||
)
|
||||
)
|
||||
t_specs_names = [t_spec.get_name() for t_spec in
|
||||
self.wf_spec.find_inbound_task_specs(task_spec)]
|
||||
t_execs = self._get_task_executions(name={'in': t_specs_names})
|
||||
|
||||
return [t_ex for t_ex in t_execs
|
||||
if self._is_upstream_task_execution(task_spec, t_ex)]
|
||||
|
||||
def _is_upstream_task_execution(self, t_spec, t_ex_candidate):
|
||||
if not states.is_completed(t_ex_candidate.state):
|
||||
@ -366,10 +363,8 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
names = self._find_all_parent_task_names(task_spec)
|
||||
|
||||
t_execs_cache = {
|
||||
t_ex.name: t_ex for t_ex in db_api.get_task_executions(
|
||||
t_ex.name: t_ex for t_ex in self._get_task_executions(
|
||||
fields=('id', 'name', 'state'),
|
||||
sort_keys=[],
|
||||
workflow_execution_id=self.wf_ex.id,
|
||||
name={'in': names}
|
||||
)
|
||||
} if names else {} # don't perform a db request if 'names' are empty
|
||||
|
@ -83,21 +83,10 @@ class ReverseWorkflowController(base.WorkflowController):
|
||||
return task_spec
|
||||
|
||||
def _get_upstream_task_executions(self, task_spec):
|
||||
t_specs = [
|
||||
self.wf_spec.get_tasks()[t_name]
|
||||
for t_name in self.wf_spec.get_task_requires(task_spec)
|
||||
or []
|
||||
]
|
||||
t_specs_names = self.wf_spec.get_task_requires(task_spec) or []
|
||||
t_execs = self._get_task_executions(name={'in': t_specs_names})
|
||||
|
||||
return list(
|
||||
filter(
|
||||
lambda t_e: t_e.state == states.SUCCESS,
|
||||
lookup_utils.find_task_executions_by_specs(
|
||||
self.wf_ex.id,
|
||||
t_specs
|
||||
)
|
||||
)
|
||||
)
|
||||
return [t_ex for t_ex in t_execs if t_ex.state == states.SUCCESS]
|
||||
|
||||
def evaluate_workflow_final_context(self):
|
||||
task_execs = lookup_utils.find_task_executions_by_spec(
|
||||
|
Loading…
Reference in New Issue
Block a user