diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 401ed941..c95a4c02 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -148,7 +148,7 @@ class DefaultEngine(base.Engine, coordination.Service): output=output ) - def on_task_state_change(self, task_ex_id, state): + def on_task_state_change(self, task_ex_id, state, state_info=None): with db_api.transaction(): task_ex = db_api.get_task_execution(task_ex_id) # TODO(rakhmerov): The method is mostly needed for policy and @@ -160,11 +160,12 @@ class DefaultEngine(base.Engine, coordination.Service): wf_trace.info( task_ex, - "Task '%s' [%s -> %s]" - % (task_ex.name, task_ex.state, state) + "Task '%s' [%s -> %s] state_info : %s" + % (task_ex.name, task_ex.state, state, state_info) ) task_ex.state = state + task_ex.state_info = state_info self._on_task_state_change(task_ex, wf_ex) diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index 828309a1..dafa9dac 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -439,5 +439,6 @@ def fail_task_if_incomplete(task_ex_id, timeout): rpc.get_engine_client().on_task_state_change( task_ex_id, - states.ERROR + states.ERROR, + msg ) diff --git a/mistral/engine/rpc.py b/mistral/engine/rpc.py index c114e1a7..847607a9 100644 --- a/mistral/engine/rpc.py +++ b/mistral/engine/rpc.py @@ -122,8 +122,9 @@ class EngineServer(object): **params ) - def on_task_state_change(self, rpc_ctx, task_ex_id, state): - return self._engine.on_task_state_change(task_ex_id, state) + def on_task_state_change(self, rpc_ctx, task_ex_id, state, + state_info=None): + return self._engine.on_task_state_change(task_ex_id, state, state_info) def on_action_complete(self, rpc_ctx, action_ex_id, result_data, result_error): @@ -308,12 +309,13 @@ class EngineClient(base.Engine): params=params ) - def on_task_state_change(self, task_ex_id, state): + def on_task_state_change(self, task_ex_id, state, state_info=None): return self._client.call( auth_ctx.ctx(), 'on_task_state_change', task_ex_id=task_ex_id, - state=state + state=state, + state_info=state_info ) @wrap_messaging_exception diff --git a/mistral/tests/unit/engine/test_task_defaults.py b/mistral/tests/unit/engine/test_task_defaults.py index 1f0e9c80..42bb0696 100644 --- a/mistral/tests/unit/engine/test_task_defaults.py +++ b/mistral/tests/unit/engine/test_task_defaults.py @@ -15,7 +15,6 @@ import datetime as dt from oslo_config import cfg from oslo_log import log as logging -import testtools from mistral.db.v2 import api as db_api from mistral.services import scheduler @@ -129,7 +128,6 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase): task1.runtime_context['retry_task_policy']['retry_no'] > 0 ) - @testtools.skip("Fix 'timeout' policy.") def test_task_defaults_timeout_policy(self): wf_text = """--- version: '2.0' @@ -165,6 +163,9 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase): self._assert_single_item(tasks, name='task1', state=states.ERROR) + task_ex = db_api.get_task_execution(tasks[0].id) + self.assertIn("Task timed out", task_ex.state_info) + def test_task_defaults_wait_policies(self): wf_text = """--- version: '2.0'