diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index b861a9810..ec1d2cb8d 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -34,11 +34,14 @@ _CHECK_AND_COMPLETE_PATH = ( @profiler.trace('workflow-handler-start-workflow') def start_workflow(wf_identifier, wf_input, desc, params): - wf = workflows.Workflow( - db_api.get_workflow_definition(wf_identifier) - ) + wf = workflows.Workflow() - wf.start(wf_input, desc=desc, params=params) + wf.start( + wf_def=db_api.get_workflow_definition(wf_identifier), + input_dict=wf_input, + desc=desc, + params=params + ) _schedule_check_and_complete(wf.wf_ex) @@ -46,10 +49,7 @@ def start_workflow(wf_identifier, wf_input, desc, params): def stop_workflow(wf_ex, state, msg=None): - wf = workflows.Workflow( - db_api.get_workflow_definition(wf_ex.workflow_id), - wf_ex=wf_ex - ) + wf = workflows.Workflow(wf_ex=wf_ex) # In this case we should not try to handle possible errors. Instead, # we need to let them pop up since the typical way of failing objects @@ -86,10 +86,7 @@ def _check_and_complete(wf_ex_id): if not wf_ex or states.is_completed(wf_ex.state): return - wf = workflows.Workflow( - db_api.get_workflow_definition(wf_ex.workflow_id), - wf_ex=wf_ex - ) + wf = workflows.Workflow(wf_ex=wf_ex) try: incomplete_tasks_count = wf.check_and_complete() @@ -121,10 +118,7 @@ def _check_and_complete(wf_ex_id): def pause_workflow(wf_ex, msg=None): - wf = workflows.Workflow( - db_api.get_workflow_definition(wf_ex.workflow_id), - wf_ex=wf_ex - ) + wf = workflows.Workflow(wf_ex=wf_ex) wf.set_state(states.PAUSED, msg) @@ -133,10 +127,7 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None): if wf_ex.state == states.PAUSED: return wf_ex.get_clone() - wf = workflows.Workflow( - db_api.get_workflow_definition(wf_ex.workflow_id), - wf_ex=wf_ex - ) + wf = workflows.Workflow(wf_ex=wf_ex) wf.rerun(task_ex, reset=reset, env=env) @@ -150,10 +141,7 @@ def resume_workflow(wf_ex, env=None): if not states.is_paused_or_idle(wf_ex.state): return wf_ex.get_clone() - wf = workflows.Workflow( - db_api.get_workflow_definition(wf_ex.workflow_id), - wf_ex=wf_ex - ) + wf = workflows.Workflow(wf_ex=wf_ex) wf.resume(env=env) diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index fd73b2377..eb09012dc 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -54,8 +54,7 @@ class Workflow(object): Mistral engine or its components in order to manipulate with workflows. """ - def __init__(self, wf_def, wf_ex=None): - self.wf_def = wf_def + def __init__(self, wf_ex=None): self.wf_ex = wf_ex if wf_ex: @@ -64,16 +63,13 @@ class Workflow(object): wf_ex.id ) else: - # New workflow execution. - self.wf_spec = spec_parser.get_workflow_spec_by_definition_id( - wf_def.id, - wf_def.updated_at - ) + self.wf_spec = None @profiler.trace('workflow-start') - def start(self, input_dict, desc='', params=None): + def start(self, wf_def, input_dict, desc='', params=None): """Start workflow. + :param wf_def: Workflow definition. :param input_dict: Workflow input. :param desc: Workflow execution description. :param params: Workflow type specific parameters. @@ -81,17 +77,23 @@ class Workflow(object): assert not self.wf_ex + # New workflow execution. + self.wf_spec = spec_parser.get_workflow_spec_by_definition_id( + wf_def.id, + wf_def.updated_at + ) + wf_trace.info( self.wf_ex, "Starting workflow [name=%s, input=%s]" % - (self.wf_def.name, utils.cut(input_dict)) + (wf_def.name, utils.cut(input_dict)) ) # TODO(rakhmerov): This call implicitly changes input_dict! Fix it! # After fix we need to move validation after adding risky fields. - eng_utils.validate_input(self.wf_def, input_dict, self.wf_spec) + eng_utils.validate_input(wf_def, input_dict, self.wf_spec) - self._create_execution(input_dict, desc, params) + self._create_execution(wf_def, input_dict, desc, params) self.set_state(states.RUNNING) @@ -202,12 +204,12 @@ class Workflow(object): ) return final_context - def _create_execution(self, input_dict, desc, params): + def _create_execution(self, wf_def, input_dict, desc, params): self.wf_ex = db_api.create_workflow_execution({ - 'name': self.wf_def.name, + 'name': wf_def.name, 'description': desc, - 'workflow_name': self.wf_def.name, - 'workflow_id': self.wf_def.id, + 'workflow_name': wf_def.name, + 'workflow_id': wf_def.id, 'spec': self.wf_spec.to_dict(), 'state': states.IDLE, 'output': {}, @@ -272,10 +274,7 @@ class Workflow(object): self.wf_ex.task_execution_id ) - parent_wf = Workflow( - db_api.get_workflow_definition(parent_task_ex.workflow_id), - parent_task_ex.workflow_execution - ) + parent_wf = Workflow(wf_ex=parent_task_ex.workflow_execution) parent_wf.lock() diff --git a/mistral/tests/unit/engine/test_workflow_cancel.py b/mistral/tests/unit/engine/test_workflow_cancel.py index afac2b962..bc9ebdf26 100644 --- a/mistral/tests/unit/engine/test_workflow_cancel.py +++ b/mistral/tests/unit/engine/test_workflow_cancel.py @@ -71,6 +71,33 @@ class WorkflowCancelTest(base.EngineTestCase): self.assertEqual(1, len(task_execs)) self.assertEqual(states.SUCCESS, task_1_ex.state) + def test_cancel_workflow_if_definition_deleted(self): + workflow = """ + version: '2.0' + + wf: + type: direct + tasks: + task1: + action: std.echo output="foo" + wait-before: 5 + """ + + wf = wf_service.create_workflows(workflow)[0] + + wf_ex = self.engine.start_workflow('wf', {}) + + with db_api.transaction(): + db_api.delete_workflow_definition(wf.id) + + self.engine.stop_workflow( + wf_ex.id, + states.CANCELLED, + "Cancelled by user." + ) + + self.await_workflow_cancelled(wf_ex.id) + def test_cancel_paused_workflow(self): workflow = """ version: '2.0'