From 11e275a3cb520c8034b643194cccd7e45cf8e765 Mon Sep 17 00:00:00 2001 From: Istvan Imre Date: Tue, 28 Feb 2017 11:52:03 +0100 Subject: [PATCH] Correction in workflow state change handling In case of workflow definition is deleted attached execution could not change their state. Solution is to remove unnecessary workflow definition queries before execution state changes. Change-Id: I2900388c8155e5ff0c117e0799df975588f80379 Closes-Bug: #1668523 --- mistral/engine/workflow_handler.py | 36 ++++++------------ mistral/engine/workflows.py | 37 +++++++++---------- .../tests/unit/engine/test_workflow_cancel.py | 27 ++++++++++++++ 3 files changed, 57 insertions(+), 43 deletions(-) diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index b861a981..ec1d2cb8 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -34,11 +34,14 @@ _CHECK_AND_COMPLETE_PATH = ( @profiler.trace('workflow-handler-start-workflow') def start_workflow(wf_identifier, wf_input, desc, params): - wf = workflows.Workflow( - db_api.get_workflow_definition(wf_identifier) - ) + wf = workflows.Workflow() - wf.start(wf_input, desc=desc, params=params) + wf.start( + wf_def=db_api.get_workflow_definition(wf_identifier), + input_dict=wf_input, + desc=desc, + params=params + ) _schedule_check_and_complete(wf.wf_ex) @@ -46,10 +49,7 @@ def start_workflow(wf_identifier, wf_input, desc, params): def stop_workflow(wf_ex, state, msg=None): - wf = workflows.Workflow( - db_api.get_workflow_definition(wf_ex.workflow_id), - wf_ex=wf_ex - ) + wf = workflows.Workflow(wf_ex=wf_ex) # In this case we should not try to handle possible errors. Instead, # we need to let them pop up since the typical way of failing objects @@ -86,10 +86,7 @@ def _check_and_complete(wf_ex_id): if not wf_ex or states.is_completed(wf_ex.state): return - wf = workflows.Workflow( - db_api.get_workflow_definition(wf_ex.workflow_id), - wf_ex=wf_ex - ) + wf = workflows.Workflow(wf_ex=wf_ex) try: incomplete_tasks_count = wf.check_and_complete() @@ -121,10 +118,7 @@ def _check_and_complete(wf_ex_id): def pause_workflow(wf_ex, msg=None): - wf = workflows.Workflow( - db_api.get_workflow_definition(wf_ex.workflow_id), - wf_ex=wf_ex - ) + wf = workflows.Workflow(wf_ex=wf_ex) wf.set_state(states.PAUSED, msg) @@ -133,10 +127,7 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None): if wf_ex.state == states.PAUSED: return wf_ex.get_clone() - wf = workflows.Workflow( - db_api.get_workflow_definition(wf_ex.workflow_id), - wf_ex=wf_ex - ) + wf = workflows.Workflow(wf_ex=wf_ex) wf.rerun(task_ex, reset=reset, env=env) @@ -150,10 +141,7 @@ def resume_workflow(wf_ex, env=None): if not states.is_paused_or_idle(wf_ex.state): return wf_ex.get_clone() - wf = workflows.Workflow( - db_api.get_workflow_definition(wf_ex.workflow_id), - wf_ex=wf_ex - ) + wf = workflows.Workflow(wf_ex=wf_ex) wf.resume(env=env) diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index fd73b237..eb09012d 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -54,8 +54,7 @@ class Workflow(object): Mistral engine or its components in order to manipulate with workflows. """ - def __init__(self, wf_def, wf_ex=None): - self.wf_def = wf_def + def __init__(self, wf_ex=None): self.wf_ex = wf_ex if wf_ex: @@ -64,16 +63,13 @@ class Workflow(object): wf_ex.id ) else: - # New workflow execution. - self.wf_spec = spec_parser.get_workflow_spec_by_definition_id( - wf_def.id, - wf_def.updated_at - ) + self.wf_spec = None @profiler.trace('workflow-start') - def start(self, input_dict, desc='', params=None): + def start(self, wf_def, input_dict, desc='', params=None): """Start workflow. + :param wf_def: Workflow definition. :param input_dict: Workflow input. :param desc: Workflow execution description. :param params: Workflow type specific parameters. @@ -81,17 +77,23 @@ class Workflow(object): assert not self.wf_ex + # New workflow execution. + self.wf_spec = spec_parser.get_workflow_spec_by_definition_id( + wf_def.id, + wf_def.updated_at + ) + wf_trace.info( self.wf_ex, "Starting workflow [name=%s, input=%s]" % - (self.wf_def.name, utils.cut(input_dict)) + (wf_def.name, utils.cut(input_dict)) ) # TODO(rakhmerov): This call implicitly changes input_dict! Fix it! # After fix we need to move validation after adding risky fields. - eng_utils.validate_input(self.wf_def, input_dict, self.wf_spec) + eng_utils.validate_input(wf_def, input_dict, self.wf_spec) - self._create_execution(input_dict, desc, params) + self._create_execution(wf_def, input_dict, desc, params) self.set_state(states.RUNNING) @@ -202,12 +204,12 @@ class Workflow(object): ) return final_context - def _create_execution(self, input_dict, desc, params): + def _create_execution(self, wf_def, input_dict, desc, params): self.wf_ex = db_api.create_workflow_execution({ - 'name': self.wf_def.name, + 'name': wf_def.name, 'description': desc, - 'workflow_name': self.wf_def.name, - 'workflow_id': self.wf_def.id, + 'workflow_name': wf_def.name, + 'workflow_id': wf_def.id, 'spec': self.wf_spec.to_dict(), 'state': states.IDLE, 'output': {}, @@ -272,10 +274,7 @@ class Workflow(object): self.wf_ex.task_execution_id ) - parent_wf = Workflow( - db_api.get_workflow_definition(parent_task_ex.workflow_id), - parent_task_ex.workflow_execution - ) + parent_wf = Workflow(wf_ex=parent_task_ex.workflow_execution) parent_wf.lock() diff --git a/mistral/tests/unit/engine/test_workflow_cancel.py b/mistral/tests/unit/engine/test_workflow_cancel.py index afac2b96..bc9ebdf2 100644 --- a/mistral/tests/unit/engine/test_workflow_cancel.py +++ b/mistral/tests/unit/engine/test_workflow_cancel.py @@ -71,6 +71,33 @@ class WorkflowCancelTest(base.EngineTestCase): self.assertEqual(1, len(task_execs)) self.assertEqual(states.SUCCESS, task_1_ex.state) + def test_cancel_workflow_if_definition_deleted(self): + workflow = """ + version: '2.0' + + wf: + type: direct + tasks: + task1: + action: std.echo output="foo" + wait-before: 5 + """ + + wf = wf_service.create_workflows(workflow)[0] + + wf_ex = self.engine.start_workflow('wf', {}) + + with db_api.transaction(): + db_api.delete_workflow_definition(wf.id) + + self.engine.stop_workflow( + wf_ex.id, + states.CANCELLED, + "Cancelled by user." + ) + + self.await_workflow_cancelled(wf_ex.id) + def test_cancel_paused_workflow(self): workflow = """ version: '2.0'