Rework finding indirectly affected created joins

Current algorithm of finding the joins requires a recursive search
through the execution graph, which leads to a large number of calls
to the database.

To optimize it we introduce a new algorithm that requires only one db
request. It downloads all potencial join task ids and names, and then
analyzes them without any additional db calls.

Change-Id: Ic73f2112406e681ae8a2aa67bcbccebe488fc03c
This commit is contained in:
Mike Fedosin 2019-05-29 23:58:00 +02:00
parent 49a2765383
commit c52688523c
1 changed files with 43 additions and 52 deletions

View File

@ -187,8 +187,49 @@ class DirectWorkflowController(base.WorkflowController):
return self._get_join_logical_state(task_spec)
def find_indirectly_affected_task_executions(self, task_name):
return self._find_indirectly_affected_created_joins(task_name)
def _get_next_clauses(self, task_name):
res = self.wf_spec.get_on_success_clause(task_name)[:]
res += self.wf_spec.get_on_error_clause(task_name)
res += self.wf_spec.get_on_complete_clause(task_name)
return res
def find_indirectly_affected_task_executions(self, t_name):
all_joins = {task_spec.get_name()
for task_spec in self.wf_spec.get_tasks()
if task_spec.get_join()}
t_execs_cache = {
t_ex.name: t_ex for t_ex in self._get_task_executions(
fields=('id', 'name'),
name={'in': all_joins}
)
} if all_joins else {}
visited_task_names = set()
clauses = self._get_next_clauses(t_name)
res = set()
while clauses:
visited_task_names.add(t_name)
t_name, _, _ = clauses.pop()
# Handle cycles.
if t_name in visited_task_names:
continue
# Encountered an engine command.
if not self.wf_spec.get_tasks()[t_name]:
continue
if t_name in all_joins:
if t_name in t_execs_cache:
res.add(t_execs_cache[t_name])
continue
clauses += self._get_next_clauses(t_name)
return res
def is_error_handled_for(self, task_ex):
# TODO(rakhmerov): The method works in a different way than
@ -286,56 +327,6 @@ class DirectWorkflowController(base.WorkflowController):
if not condition or expr.evaluate(condition, ctx)
]
@profiler.trace('direct-wf-controller-find-downstream-joins')
def _find_indirectly_affected_created_joins(self, task_name, result=None,
visited_task_names=None):
visited_task_names = (
set() if visited_task_names is None else visited_task_names
)
if task_name in visited_task_names:
return
visited_task_names.add(task_name)
result = set() if result is None else result
def _process_clause(clause):
for t_name, condition, params in clause:
t_spec = self.wf_spec.get_tasks()[t_name]
# Encountered an engine command.
if not t_spec:
continue
if t_spec.get_join():
# TODO(rakhmerov): This is a fundamental limitation
# that prevents us having cycles within workflows
# that contain joins because we assume that there
# can be only one "join" task with a given name.
t_ex = self._find_task_execution_by_name(t_name)
if t_ex:
result.add(t_ex)
# If we found a "join" we don't need to go further
# because completion of the found join will handle
# other deeper joins.
continue
# Recursion.
self._find_indirectly_affected_created_joins(
t_name,
result=result,
visited_task_names=visited_task_names
)
_process_clause(self.wf_spec.get_on_success_clause(task_name))
_process_clause(self.wf_spec.get_on_error_clause(task_name))
_process_clause(self.wf_spec.get_on_complete_clause(task_name))
return result
@profiler.trace('direct-wf-controller-get-join-logical-state')
def _get_join_logical_state(self, task_spec):
"""Evaluates logical state of 'join' task.