Merge "Direct workflow code cleanup and refactoring"

This commit is contained in:
Zuul 2019-06-14 16:35:26 +00:00 committed by Gerrit Code Review
commit f4416678ba
1 changed files with 26 additions and 89 deletions

View File

@ -119,7 +119,7 @@ class DirectWorkflowController(base.WorkflowController):
ctx = data_flow.evaluate_task_outbound_context(task_ex)
for t_n, params, event_name in self._find_next_tasks(task_ex, ctx=ctx):
for t_n, params, event_name in self._find_next_tasks(task_ex, ctx):
t_s = self.wf_spec.get_tasks()[t_n]
if not (t_s or t_n in commands.ENGINE_CMD_CLS):
@ -260,87 +260,41 @@ class DirectWorkflowController(base.WorkflowController):
return res and not task_ex.has_next_tasks
def _find_next_tasks(self, task_ex, ctx=None):
t_state = task_ex.state
t_name = task_ex.name
ctx_view = {}
def _find_next_tasks(self, task_ex, ctx):
t_n = task_ex.name
t_s = task_ex.state
# Evaluate context only for conditional transitions or if it's
# explicitly provided.
if self._is_conditional_transition(task_ex) or ctx is not None:
# By default we don't download task context from the database,
# but just basic fields: 'id', 'name' and 'state'. It's a good
# optimization, because contexts can be too heavy and we don't
# need them most of the time.
# But sometimes we need it for conditional transitions (when
# the decision where to go is based on the current context),
# and if this is the case, we download full task execution
# and then evaluate its context to find the route.
# TODO(mfedosin): Think of a way to avoid this.
if not hasattr(task_ex, "in_context"):
task_ex = db_api.get_task_execution(task_ex.id)
ctx_view = data_flow.ContextView(
data_flow.get_current_task_dict(task_ex),
ctx or data_flow.evaluate_task_outbound_context(task_ex),
data_flow.get_workflow_environment_dict(self.wf_ex),
self.wf_ex.context,
self.wf_ex.input
)
ctx_view = data_flow.ContextView(
data_flow.get_current_task_dict(task_ex),
ctx,
data_flow.get_workflow_environment_dict(self.wf_ex),
self.wf_ex.context,
self.wf_ex.input
)
# [(task_name, params, 'on-success'|'on-error'|'on-complete'), ...]
result = []
def process_clause(clause, event_name):
task_tuples = self._find_next_tasks_for_clause(clause, ctx_view)
if t_s == states.ERROR:
for name, cond, params in self.wf_spec.get_on_error_clause(t_n):
if not cond or expr.evaluate(cond, ctx_view):
params = expr.evaluate_recursively(params, ctx_view)
result.append((name, params, 'on-error'))
for t in task_tuples:
result.append((t[0], t[1], event_name))
if t_s == states.SUCCESS:
for name, cond, params in self.wf_spec.get_on_success_clause(t_n):
if not cond or expr.evaluate(cond, ctx_view):
params = expr.evaluate_recursively(params, ctx_view)
result.append((name, params, 'on-success'))
if t_state == states.SUCCESS:
process_clause(
self.wf_spec.get_on_success_clause(t_name),
'on-success'
)
elif t_state == states.ERROR:
process_clause(
self.wf_spec.get_on_error_clause(t_name),
'on-error'
)
if states.is_completed(t_state) and not states.is_cancelled(t_state):
process_clause(
self.wf_spec.get_on_complete_clause(t_name),
'on-complete'
)
if states.is_completed(t_s) and not states.is_cancelled(t_s):
for name, cond, params in self.wf_spec.get_on_complete_clause(t_n):
if not cond or expr.evaluate(cond, ctx_view):
params = expr.evaluate_recursively(params, ctx_view)
result.append((name, params, 'on-complete'))
return result
@staticmethod
def _find_next_tasks_for_clause(clause, ctx):
"""Finds next tasks names.
This method finds next tasks(commands) base on given {name: condition}
dictionary.
:param clause: Tuple (task_name, condition, parameters) taken from
'on-complete', 'on-success' or 'on-error' clause.
:param ctx: Context that clause expressions should be evaluated
against of.
:return: List of task(command) names.
"""
if not clause:
return []
if not ctx:
return [(t_name, params) for t_name, _, params in clause]
return [
(t_name, expr.evaluate_recursively(params, ctx))
for t_name, condition, params in clause
if not condition or expr.evaluate(condition, ctx)
]
@profiler.trace(
'direct-wf-controller-get-join-logical-state',
hide_args=True
@ -578,20 +532,3 @@ class DirectWorkflowController(base.WorkflowController):
t_execs_cache[name] = None
return t_execs_cache
def _is_conditional_transition(self, t_ex):
if t_ex.state == states.ERROR:
for _, cond, _ in self.wf_spec.get_on_error_clause(t_ex.name):
if cond:
return True
if t_ex.state == states.SUCCESS:
for _, cond, _ in self.wf_spec.get_on_success_clause(t_ex.name):
if cond:
return True
for _, cond, _ in self.wf_spec.get_on_complete_clause(t_ex.name):
if cond:
return True
return False