From 9c5dacf42914209143570d32b6198f3494d0873d Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Mon, 17 Apr 2017 15:43:49 +0700 Subject: [PATCH] Add 'triggered_by' into task execution runtime context * Added 'triggered_by' into 'runtime_context' of task execution in the form: [ { 'task_id': '123-123-123', 'event': 'on-success' }, ... ] This allows to do backtracking of task execution history, i.e. we can see what task and on what event initiated the run of the given task. * Minor style changes. TODO: * Support 'join' task use case * Add 'runtime_context' into task execution REST resource Change-Id: I20e5d0d282527ea7996ff5f84eb11bc9c6e843fe Partially implements: blueprint mistral-previous-tasks --- mistral/engine/task_handler.py | 16 +++- mistral/engine/tasks.py | 11 ++- .../tests/unit/engine/test_direct_workflow.py | 94 ++++++++++++++++--- mistral/tests/unit/engine/test_task_cancel.py | 7 +- .../tests/unit/utils/test_inspect_utils.py | 5 +- mistral/workflow/commands.py | 57 ++++++++--- mistral/workflow/direct_workflow.py | 64 +++++++------ mistral/workflow/lookup_utils.py | 2 +- 8 files changed, 189 insertions(+), 67 deletions(-) diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index d02915034..bd663c877 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -232,7 +232,8 @@ def _build_task_from_command(cmd): cmd.task_spec, cmd.ctx, unique_key=cmd.unique_key, - waiting=cmd.is_waiting() + waiting=cmd.is_waiting(), + triggered_by=cmd.triggered_by ) return task @@ -241,13 +242,22 @@ def _build_task_from_command(cmd): def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None, - unique_key=None, waiting=False): + unique_key=None, waiting=False, triggered_by=None): if task_spec.get_with_items(): cls = tasks.WithItemsTask else: cls = tasks.RegularTask - return cls(wf_ex, wf_spec, task_spec, ctx, task_ex, unique_key, waiting) + return cls( + wf_ex, + wf_spec, + task_spec, + ctx, + task_ex=task_ex, + unique_key=unique_key, + waiting=waiting, + triggered_by=triggered_by + ) @action_queue.process diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index 2864450f5..4f872ee94 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -44,7 +44,7 @@ class Task(object): """ def __init__(self, wf_ex, wf_spec, task_spec, ctx, task_ex=None, - unique_key=None, waiting=False): + unique_key=None, waiting=False, triggered_by=None): self.wf_ex = wf_ex self.task_spec = task_spec self.ctx = ctx @@ -52,6 +52,7 @@ class Task(object): self.wf_spec = wf_spec self.unique_key = unique_key self.waiting = waiting + self.triggered_by = triggered_by self.reset_flag = False self.created = False self.state_changed = False @@ -227,6 +228,14 @@ class Task(object): 'type': task_type } + if self.triggered_by: + values['runtime_context']['triggered_by'] = [ + { + 'task_id': self.triggered_by[0].id, + 'event': self.triggered_by[1] + } + ] + self.task_ex = db_api.create_task_execution(values) # Add to collection explicitly so that it's in a proper diff --git a/mistral/tests/unit/engine/test_direct_workflow.py b/mistral/tests/unit/engine/test_direct_workflow.py index dff33e5a6..bb1c2e1f8 100644 --- a/mistral/tests/unit/engine/test_direct_workflow.py +++ b/mistral/tests/unit/engine/test_direct_workflow.py @@ -38,7 +38,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): return db_api.get_workflow_execution(wf_ex.id) - def test_direct_workflow_on_closures(self): + def test_on_closures(self): wf_text = """ version: '2.0' @@ -48,7 +48,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): tasks: task1: description: | - Explicit 'fail' command should lead to workflow failure. + Explicit 'succeed' command should lead to workflow success. action: std.echo output="Echo" on-success: - task2 @@ -72,7 +72,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): action: std.noop """ - wf_ex = self._run_workflow(wf_text) + wf_ex = self._run_workflow(wf_text, expected_state=states.SUCCESS) with db_api.transaction(): wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -80,18 +80,16 @@ class DirectWorkflowEngineTest(base.EngineTestCase): tasks = wf_ex.task_executions task1 = self._assert_single_item(tasks, name='task1') - task3 = self._assert_single_item(tasks, name='task3') - task4 = self._assert_single_item(tasks, name='task4') + task2 = self._assert_single_item(tasks, name='task2') - self.assertEqual(3, len(tasks)) + self.assertEqual(2, len(tasks)) self.await_task_success(task1.id) - self.await_task_success(task3.id) - self.await_task_success(task4.id) + self.await_task_success(task2.id) self.assertTrue(wf_ex.state, states.ERROR) - def test_direct_workflow_condition_transition_not_triggering(self): + def test_condition_transition_not_triggering(self): wf_text = """--- version: '2.0' @@ -132,7 +130,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): self.assertTrue(wf_ex.state, states.ERROR) - def test_direct_workflow_change_state_after_success(self): + def test_change_state_after_success(self): wf_text = """ version: '2.0' @@ -656,7 +654,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): len(db_api.get_delayed_calls(target_method_name=mtd_name)) == 0 ) - def test_direct_workfow_output(self): + def test_output(self): wf_text = """--- version: '2.0' @@ -680,3 +678,77 @@ class DirectWorkflowEngineTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertDictEqual({}, wf_ex.output) + + def test_triggered_by(self): + wf_text = """--- + version: '2.0' + + wf: + tasks: + task1: + action: std.noop + on-success: task2 + + task2: + action: std.fail + on-error: task3 + + task3: + action: std.fail + on-error: noop + on-success: task4 + on-complete: task4 + + task4: + action: std.noop + """ + + wf_service.create_workflows(wf_text) + + wf_ex = self.engine.start_workflow('wf', {}) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + task_execs = wf_ex.task_executions + + task1 = self._assert_single_item(task_execs, name='task1') + task2 = self._assert_single_item(task_execs, name='task2') + task3 = self._assert_single_item(task_execs, name='task3') + task4 = self._assert_single_item(task_execs, name='task4') + + key = 'triggered_by' + + self.assertIsNone(task1.runtime_context.get(key)) + + self.assertListEqual( + [ + { + "task_id": task1.id, + "event": "on-success" + } + ], + task2.runtime_context.get(key) + ) + + self.assertListEqual( + [ + { + "task_id": task2.id, + "event": "on-error" + } + ], + task3.runtime_context.get(key) + ) + + self.assertListEqual( + [ + { + "task_id": task3.id, + "event": "on-complete" + } + ], + task4.runtime_context.get(key) + ) diff --git a/mistral/tests/unit/engine/test_task_cancel.py b/mistral/tests/unit/engine/test_task_cancel.py index 184c95935..e89801669 100644 --- a/mistral/tests/unit/engine/test_task_cancel.py +++ b/mistral/tests/unit/engine/test_task_cancel.py @@ -32,7 +32,6 @@ class TaskCancelTest(base.EngineTestCase): version: '2.0' wf: - type: direct tasks: task1: action: std.async_noop @@ -119,13 +118,11 @@ class TaskCancelTest(base.EngineTestCase): workflows: wf: - type: direct tasks: taskx: workflow: subwf subwf: - type: direct tasks: task1: action: std.async_noop @@ -207,7 +204,6 @@ class TaskCancelTest(base.EngineTestCase): version: '2.0' wf: - type: direct tasks: task1: action: std.async_noop @@ -294,10 +290,11 @@ class TaskCancelTest(base.EngineTestCase): def test_cancel_with_items_concurrency(self): wb_def = """ version: '2.0' + name: wb1 + workflows: wf1: - type: direct tasks: t1: with-items: i in <% list(range(0, 4)) %> diff --git a/mistral/tests/unit/utils/test_inspect_utils.py b/mistral/tests/unit/utils/test_inspect_utils.py index c4c794f1b..4654957db 100644 --- a/mistral/tests/unit/utils/test_inspect_utils.py +++ b/mistral/tests/unit/utils/test_inspect_utils.py @@ -38,7 +38,10 @@ class InspectUtilsTest(base.BaseTest): clazz = commands.RunTask parameters_str = i_u.get_arg_list_as_str(clazz.__init__) - self.assertEqual('wf_ex, wf_spec, task_spec, ctx', parameters_str) + self.assertEqual( + 'wf_ex, wf_spec, task_spec, ctx, triggered_by=null', + parameters_str + ) def test_get_parameters_str_with_function_parameter(self): diff --git a/mistral/workflow/commands.py b/mistral/workflow/commands.py index 45ee50465..f00987c34 100644 --- a/mistral/workflow/commands.py +++ b/mistral/workflow/commands.py @@ -22,16 +22,17 @@ class WorkflowCommand(object): """Workflow command. A set of workflow commands form a communication protocol between workflow - handler and its clients. When workflow handler makes a decision about + controller and its clients. When workflow controller makes a decision about how to continue a workflow it returns a set of commands so that a caller knows what to do next. """ - def __init__(self, wf_ex, wf_spec, task_spec, ctx): + def __init__(self, wf_ex, wf_spec, task_spec, ctx, triggered_by=None): self.wf_ex = wf_ex self.wf_spec = wf_spec self.task_spec = task_spec self.ctx = ctx or {} + self.triggered_by = triggered_by class Noop(WorkflowCommand): @@ -44,8 +45,14 @@ class Noop(WorkflowCommand): class RunTask(WorkflowCommand): """Instruction to run a workflow task.""" - def __init__(self, wf_ex, wf_spec, task_spec, ctx): - super(RunTask, self).__init__(wf_ex, wf_spec, task_spec, ctx) + def __init__(self, wf_ex, wf_spec, task_spec, ctx, triggered_by=None): + super(RunTask, self).__init__( + wf_ex, + wf_spec, + task_spec, + ctx, + triggered_by=triggered_by + ) self.wait = False self.unique_key = None @@ -82,8 +89,15 @@ class RunExistingTask(WorkflowCommand): class SetWorkflowState(WorkflowCommand): """Instruction to change a workflow state.""" - def __init__(self, wf_ex, wf_spec, task_spec, ctx, new_state, msg): - super(SetWorkflowState, self).__init__(wf_ex, wf_spec, task_spec, ctx) + def __init__(self, wf_ex, wf_spec, task_spec, ctx, new_state, msg=None, + triggered_by=None): + super(SetWorkflowState, self).__init__( + wf_ex, + wf_spec, + task_spec, + ctx, + triggered_by=triggered_by + ) self.new_state = new_state self.msg = msg @@ -92,14 +106,16 @@ class SetWorkflowState(WorkflowCommand): class FailWorkflow(SetWorkflowState): """Instruction to fail a workflow.""" - def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None): + def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None, + triggered_by=None): super(FailWorkflow, self).__init__( wf_ex, wf_spec, task_spec, ctx, states.ERROR, - msg + msg=msg, + triggered_by=triggered_by ) def __repr__(self): @@ -109,14 +125,16 @@ class FailWorkflow(SetWorkflowState): class SucceedWorkflow(SetWorkflowState): """Instruction to succeed a workflow.""" - def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None): + def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None, + triggered_by=None): super(SucceedWorkflow, self).__init__( wf_ex, wf_spec, task_spec, ctx, states.SUCCESS, - msg + msg=msg, + triggered_by=triggered_by ) def __repr__(self): @@ -126,14 +144,16 @@ class SucceedWorkflow(SetWorkflowState): class PauseWorkflow(SetWorkflowState): """Instruction to pause a workflow.""" - def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None): + def __init__(self, wf_ex, wf_spec, task_spec, ctx, msg=None, + triggered_by=None): super(PauseWorkflow, self).__init__( wf_ex, wf_spec, task_spec, ctx, states.PAUSED, - msg + msg=msg, + triggered_by=triggered_by ) def __repr__(self): @@ -155,7 +175,7 @@ def get_command_class(cmd_name): def create_command(cmd_name, wf_ex, wf_spec, task_spec, ctx, - explicit_params=None): + params=None, triggered_by=None): cmd_cls = get_command_class(cmd_name) or RunTask if issubclass(cmd_cls, SetWorkflowState): @@ -164,7 +184,14 @@ def create_command(cmd_name, wf_ex, wf_spec, task_spec, ctx, wf_spec, task_spec, ctx, - explicit_params.get('msg') + msg=params.get('msg'), + triggered_by=triggered_by ) else: - return cmd_cls(wf_ex, wf_spec, task_spec, ctx) + return cmd_cls( + wf_ex, + wf_spec, + task_spec, + ctx, + triggered_by=triggered_by + ) diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index 3767152be..b302fb429 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -116,7 +116,7 @@ class DirectWorkflowController(base.WorkflowController): ctx = data_flow.evaluate_task_outbound_context(task_ex) - for t_n, params in self._find_next_tasks(task_ex, ctx=ctx): + for t_n, params, event_name in self._find_next_tasks(task_ex, ctx=ctx): t_s = self.wf_spec.get_tasks()[t_n] if not (t_s or t_n in commands.RESERVED_CMDS): @@ -132,7 +132,8 @@ class DirectWorkflowController(base.WorkflowController): self.wf_spec, t_s, ctx, - params + params=params, + triggered_by=(task_ex, event_name) ) self._configure_if_join(cmd) @@ -161,11 +162,12 @@ class DirectWorkflowController(base.WorkflowController): def evaluate_workflow_final_context(self): ctx = {} - for t_ex in self._find_end_tasks(): + for t_ex in self._find_end_task_executions(): ctx = utils.merge_dicts( ctx, data_flow.evaluate_task_outbound_context(t_ex) ) + data_flow.remove_internal_data_from_context(ctx) return ctx @@ -202,7 +204,7 @@ class DirectWorkflowController(base.WorkflowController): return True - def _find_end_tasks(self): + def _find_end_task_executions(self): def is_end_task(t_ex): try: return not self._has_outbound_tasks(t_ex) @@ -214,8 +216,10 @@ class DirectWorkflowController(base.WorkflowController): return True return list( - filter(is_end_task, - lookup_utils.find_completed_tasks(self.wf_ex.id)) + filter( + is_end_task, + lookup_utils.find_completed_task_executions(self.wf_ex.id) + ) ) def _has_outbound_tasks(self, task_ex): @@ -241,33 +245,33 @@ class DirectWorkflowController(base.WorkflowController): self.wf_ex.input ) - t_names_and_params = [] + # [(task_name, 'on-success'|'on-error'|'on-complete', params), ...] + result = [] + + def process_clause(clause, event_name): + task_tuples = self._find_next_tasks_for_clause(clause, ctx_view) + + for t in task_tuples: + result.append((t[0], t[1], event_name)) + + if t_state == states.SUCCESS: + process_clause( + self.wf_spec.get_on_success_clause(t_name), + 'on-success' + ) + elif t_state == states.ERROR: + process_clause( + self.wf_spec.get_on_error_clause(t_name), + 'on-error' + ) if states.is_completed(t_state) and not states.is_cancelled(t_state): - t_names_and_params += ( - self._find_next_tasks_for_clause( - self.wf_spec.get_on_complete_clause(t_name), - ctx_view - ) + process_clause( + self.wf_spec.get_on_complete_clause(t_name), + 'on-complete' ) - if t_state == states.ERROR: - t_names_and_params += ( - self._find_next_tasks_for_clause( - self.wf_spec.get_on_error_clause(t_name), - ctx_view - ) - ) - - elif t_state == states.SUCCESS: - t_names_and_params += ( - self._find_next_tasks_for_clause( - self.wf_spec.get_on_success_clause(t_name), - ctx_view - ) - ) - - return t_names_and_params + return result @staticmethod def _find_next_tasks_for_clause(clause, ctx): @@ -276,7 +280,7 @@ class DirectWorkflowController(base.WorkflowController): This method finds next task(command) base on given {name: condition} dictionary. - :param clause: Dictionary {task_name: condition} taken from + :param clause: Tuple (task_name, condition, parameters) taken from 'on-complete', 'on-success' or 'on-error' clause. :param ctx: Context that clause expressions should be evaluated against of. diff --git a/mistral/workflow/lookup_utils.py b/mistral/workflow/lookup_utils.py index 54709d5f8..89ebe647c 100644 --- a/mistral/workflow/lookup_utils.py +++ b/mistral/workflow/lookup_utils.py @@ -115,7 +115,7 @@ def find_cancelled_task_executions(wf_ex_id): return find_task_executions_with_state(wf_ex_id, states.CANCELLED) -def find_completed_tasks(wf_ex_id): +def find_completed_task_executions(wf_ex_id): return db_api.get_completed_task_executions(workflow_execution_id=wf_ex_id)