Merge "Add '__task_execution' structure to task execution context on the fly"
This commit is contained in:
commit
135353093b
mistral
engine
tests/unit/engine
workflow
@ -338,6 +338,7 @@ class RetryPolicy(base.TaskPolicy):
|
||||
wf_ex = task_ex.workflow_execution
|
||||
|
||||
ctx_view = data_flow.ContextView(
|
||||
data_flow.get_current_task_dict(task_ex),
|
||||
data_flow.evaluate_task_outbound_context(task_ex),
|
||||
wf_ex.context,
|
||||
wf_ex.input
|
||||
|
@ -311,8 +311,6 @@ class Task(object):
|
||||
task_name = self.task_spec.get_name()
|
||||
task_type = self.task_spec.get_type()
|
||||
|
||||
data_flow.add_current_task_to_context(self.ctx, task_id, task_name)
|
||||
|
||||
values = {
|
||||
'id': task_id,
|
||||
'name': task_name,
|
||||
@ -430,16 +428,13 @@ class RegularTask(Task):
|
||||
self._schedule_actions()
|
||||
|
||||
def _update_inbound_context(self):
|
||||
task_ex = self.task_ex
|
||||
assert task_ex
|
||||
assert self.task_ex
|
||||
|
||||
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
|
||||
|
||||
self.ctx = wf_ctrl.get_task_inbound_context(self.task_spec)
|
||||
data_flow.add_current_task_to_context(self.ctx, task_ex.id,
|
||||
task_ex.name)
|
||||
|
||||
utils.update_dict(task_ex.in_context, self.ctx)
|
||||
utils.update_dict(self.task_ex.in_context, self.ctx)
|
||||
|
||||
def _update_triggered_by(self):
|
||||
assert self.task_ex
|
||||
@ -515,17 +510,17 @@ class RegularTask(Task):
|
||||
)
|
||||
|
||||
def _evaluate_expression(self, expression, ctx=None):
|
||||
ctx = ctx or self.ctx
|
||||
ctx_view = data_flow.ContextView(
|
||||
ctx,
|
||||
data_flow.get_current_task_dict(self.task_ex),
|
||||
ctx or self.ctx,
|
||||
self.wf_ex.context,
|
||||
self.wf_ex.input
|
||||
)
|
||||
input_dict = expr.evaluate_recursively(
|
||||
|
||||
return expr.evaluate_recursively(
|
||||
expression,
|
||||
ctx_view
|
||||
)
|
||||
return input_dict
|
||||
|
||||
def _build_action(self):
|
||||
action_name = self.task_spec.get_action_name()
|
||||
|
@ -846,14 +846,6 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
||||
task2_1_ex = self._assert_single_item(tasks_execs, name='task2_1')
|
||||
task2_2_ex = self._assert_single_item(tasks_execs, name='task2_2')
|
||||
|
||||
# TODO(rakhmerov): Find out why '__task_execution' is still
|
||||
# in the inbound context
|
||||
del task0_ex.in_context['__task_execution']
|
||||
del task1_1_ex.in_context['__task_execution']
|
||||
del task1_2_ex.in_context['__task_execution']
|
||||
del task2_1_ex.in_context['__task_execution']
|
||||
del task2_2_ex.in_context['__task_execution']
|
||||
|
||||
self.assertDictEqual({}, task0_ex.in_context)
|
||||
self.assertDictEqual({'var0': 'val0'}, task1_1_ex.in_context)
|
||||
self.assertDictEqual(
|
||||
|
@ -206,10 +206,12 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase):
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
task1_ex = self._assert_single_item(
|
||||
wf_ex.task_executions, name='task1'
|
||||
wf_ex.task_executions,
|
||||
name='task1'
|
||||
)
|
||||
task2_ex = self._assert_single_item(
|
||||
wf_ex.task_executions, name='task2'
|
||||
wf_ex.task_executions,
|
||||
name='task2'
|
||||
)
|
||||
|
||||
self.assertDictEqual(
|
||||
@ -229,6 +231,11 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase):
|
||||
task2_ex.published
|
||||
)
|
||||
|
||||
# The internal data needed for evaluation of the task() function
|
||||
# should not be persisted to DB.
|
||||
self.assertNotIn('__task_execution', task1_ex.in_context)
|
||||
self.assertNotIn('__task_execution', task2_ex.in_context)
|
||||
|
||||
def test_task_function_no_name_on_complete_case(self):
|
||||
wf_text = """---
|
||||
version: '2.0'
|
||||
|
@ -190,7 +190,12 @@ def publish_variables(task_ex, task_spec):
|
||||
|
||||
wf_ex = task_ex.workflow_execution
|
||||
|
||||
expr_ctx = ContextView(task_ex.in_context, wf_ex.context, wf_ex.input)
|
||||
expr_ctx = ContextView(
|
||||
get_current_task_dict(task_ex),
|
||||
task_ex.in_context,
|
||||
wf_ex.context,
|
||||
wf_ex.input
|
||||
)
|
||||
|
||||
if task_ex.name in expr_ctx:
|
||||
LOG.warning(
|
||||
@ -268,19 +273,14 @@ def evaluate_workflow_output(wf_ex, wf_output, ctx):
|
||||
return output or ctx
|
||||
|
||||
|
||||
def add_current_task_to_context(ctx, task_id, task_name):
|
||||
ctx['__task_execution'] = {
|
||||
'id': task_id,
|
||||
'name': task_name
|
||||
def get_current_task_dict(task_ex):
|
||||
return {
|
||||
'__task_execution': {
|
||||
'id': task_ex.id,
|
||||
'name': task_ex.name
|
||||
}
|
||||
}
|
||||
|
||||
return ctx
|
||||
|
||||
|
||||
def remove_internal_data_from_context(ctx):
|
||||
if '__task_execution' in ctx:
|
||||
del ctx['__task_execution']
|
||||
|
||||
|
||||
def add_openstack_data_to_context(wf_ex):
|
||||
wf_ex.context = wf_ex.context or {}
|
||||
|
@ -125,8 +125,6 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
elif not t_s:
|
||||
t_s = self.wf_spec.get_tasks()[task_ex.name]
|
||||
|
||||
data_flow.remove_internal_data_from_context(ctx)
|
||||
|
||||
triggered_by = [
|
||||
{
|
||||
'task_id': task_ex.id,
|
||||
@ -176,8 +174,6 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
data_flow.evaluate_task_outbound_context(t_ex)
|
||||
)
|
||||
|
||||
data_flow.remove_internal_data_from_context(ctx)
|
||||
|
||||
return ctx
|
||||
|
||||
def get_logical_task_state(self, task_ex):
|
||||
@ -248,6 +244,7 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
t_name = task_ex.name
|
||||
|
||||
ctx_view = data_flow.ContextView(
|
||||
data_flow.get_current_task_dict(task_ex),
|
||||
ctx or data_flow.evaluate_task_outbound_context(task_ex),
|
||||
self.wf_ex.context,
|
||||
self.wf_ex.input
|
||||
|
Loading…
x
Reference in New Issue
Block a user