diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index de1d0b78c..589ee7443 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -62,6 +62,10 @@ def refresh(model): IMPL.refresh(model) +def expire_all(): + IMPL.expire_all() + + # Locking. diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index cbde52486..14b46531f 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -125,6 +125,11 @@ def refresh(model, session=None): session.refresh(model) +@b.session_aware() +def expire_all(session=None): + session.expire_all() + + @b.session_aware() def acquire_lock(model, id, session=None): # Expire all so all objects queried after lock is acquired diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index e2fd62ffe..0e8bef3e7 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -414,6 +414,16 @@ class Workflow(object): if incomplete_tasks_count > 0: return incomplete_tasks_count + LOG.debug("Workflow completed [id=%s]", self.wf_ex.id) + + # NOTE(rakhmerov): Once we know that the workflow has completed, + # we need to expire all the objects in the DB session to make sure + # to read the most relevant data from the DB (that's already been + # committed in parallel transactions). Otherwise, some data like + # workflow context may be stale and decisions made upon it will be + # wrong. 
+ db_api.expire_all() + wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec) if wf_ctrl.any_cancels(): diff --git a/mistral/tests/unit/engine/test_direct_workflow.py b/mistral/tests/unit/engine/test_direct_workflow.py index 0612f16ee..364542fef 100644 --- a/mistral/tests/unit/engine/test_direct_workflow.py +++ b/mistral/tests/unit/engine/test_direct_workflow.py @@ -816,6 +816,44 @@ class DirectWorkflowEngineTest(base.EngineTestCase): self.assertDictEqual({}, wf_ex.output) + def test_output_expression(self): + wf_text = """--- + version: '2.0' + + wf: + output: + continue_flag: <% $.continue_flag %> + + task-defaults: + on-error: + - task2 + + tasks: + task1: + action: std.fail + on-success: task3 + + task2: + action: std.noop + publish: + continue_flag: false + + task3: + action: std.noop + """ + + wf_service.create_workflows(wf_text) + + wf_ex = self.engine.start_workflow('wf') + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(2, len(wf_ex.task_executions)) + self.assertDictEqual({'continue_flag': False}, wf_ex.output) + def test_triggered_by(self): wf_text = """--- version: '2.0' diff --git a/releasenotes/notes/fix_workflow_output-cee5df431679de6b.yaml b/releasenotes/notes/fix_workflow_output-cee5df431679de6b.yaml new file mode 100644 index 000000000..46dec0a4d --- /dev/null +++ b/releasenotes/notes/fix_workflow_output-cee5df431679de6b.yaml @@ -0,0 +1,13 @@ +--- + +fixes: + - | + Workflow output sometimes was not calculated correctly due to + the race condition between different transactions: the one that + checks workflow completion (i.e. calls "check_and_complete") and + the one that processes action execution completion (i.e. calls + "on_action_complete"). Calculating output sometimes was based on + stale data cached by the SQLAlchemy session. 
To fix this, all + objects in the session are now expired once the workflow is known + to be complete, so that their state is re-read from the database + the next time it is accessed to make the required calculations.