Merge "If task fails on timeout - there is no clear message of failure"
This commit is contained in:
commit
ce372c7c8b
@ -148,7 +148,7 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||||||
output=output
|
output=output
|
||||||
)
|
)
|
||||||
|
|
||||||
def on_task_state_change(self, task_ex_id, state):
|
def on_task_state_change(self, task_ex_id, state, state_info=None):
|
||||||
with db_api.transaction():
|
with db_api.transaction():
|
||||||
task_ex = db_api.get_task_execution(task_ex_id)
|
task_ex = db_api.get_task_execution(task_ex_id)
|
||||||
# TODO(rakhmerov): The method is mostly needed for policy and
|
# TODO(rakhmerov): The method is mostly needed for policy and
|
||||||
@ -160,11 +160,12 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||||||
|
|
||||||
wf_trace.info(
|
wf_trace.info(
|
||||||
task_ex,
|
task_ex,
|
||||||
"Task '%s' [%s -> %s]"
|
"Task '%s' [%s -> %s] state_info : %s"
|
||||||
% (task_ex.name, task_ex.state, state)
|
% (task_ex.name, task_ex.state, state, state_info)
|
||||||
)
|
)
|
||||||
|
|
||||||
task_ex.state = state
|
task_ex.state = state
|
||||||
|
task_ex.state_info = state_info
|
||||||
|
|
||||||
self._on_task_state_change(task_ex, wf_ex)
|
self._on_task_state_change(task_ex, wf_ex)
|
||||||
|
|
||||||
|
@ -439,5 +439,6 @@ def fail_task_if_incomplete(task_ex_id, timeout):
|
|||||||
|
|
||||||
rpc.get_engine_client().on_task_state_change(
|
rpc.get_engine_client().on_task_state_change(
|
||||||
task_ex_id,
|
task_ex_id,
|
||||||
states.ERROR
|
states.ERROR,
|
||||||
|
msg
|
||||||
)
|
)
|
||||||
|
@ -122,8 +122,9 @@ class EngineServer(object):
|
|||||||
**params
|
**params
|
||||||
)
|
)
|
||||||
|
|
||||||
def on_task_state_change(self, rpc_ctx, task_ex_id, state):
|
def on_task_state_change(self, rpc_ctx, task_ex_id, state,
|
||||||
return self._engine.on_task_state_change(task_ex_id, state)
|
state_info=None):
|
||||||
|
return self._engine.on_task_state_change(task_ex_id, state, state_info)
|
||||||
|
|
||||||
def on_action_complete(self, rpc_ctx, action_ex_id, result_data,
|
def on_action_complete(self, rpc_ctx, action_ex_id, result_data,
|
||||||
result_error):
|
result_error):
|
||||||
@ -308,12 +309,13 @@ class EngineClient(base.Engine):
|
|||||||
params=params
|
params=params
|
||||||
)
|
)
|
||||||
|
|
||||||
def on_task_state_change(self, task_ex_id, state):
|
def on_task_state_change(self, task_ex_id, state, state_info=None):
|
||||||
return self._client.call(
|
return self._client.call(
|
||||||
auth_ctx.ctx(),
|
auth_ctx.ctx(),
|
||||||
'on_task_state_change',
|
'on_task_state_change',
|
||||||
task_ex_id=task_ex_id,
|
task_ex_id=task_ex_id,
|
||||||
state=state
|
state=state,
|
||||||
|
state_info=state_info
|
||||||
)
|
)
|
||||||
|
|
||||||
@wrap_messaging_exception
|
@wrap_messaging_exception
|
||||||
|
@ -15,7 +15,6 @@
|
|||||||
import datetime as dt
|
import datetime as dt
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
import testtools
|
|
||||||
|
|
||||||
from mistral.db.v2 import api as db_api
|
from mistral.db.v2 import api as db_api
|
||||||
from mistral.services import scheduler
|
from mistral.services import scheduler
|
||||||
@ -129,7 +128,6 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
|
|||||||
task1.runtime_context['retry_task_policy']['retry_no'] > 0
|
task1.runtime_context['retry_task_policy']['retry_no'] > 0
|
||||||
)
|
)
|
||||||
|
|
||||||
@testtools.skip("Fix 'timeout' policy.")
|
|
||||||
def test_task_defaults_timeout_policy(self):
|
def test_task_defaults_timeout_policy(self):
|
||||||
wf_text = """---
|
wf_text = """---
|
||||||
version: '2.0'
|
version: '2.0'
|
||||||
@ -165,6 +163,9 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
|
|||||||
|
|
||||||
self._assert_single_item(tasks, name='task1', state=states.ERROR)
|
self._assert_single_item(tasks, name='task1', state=states.ERROR)
|
||||||
|
|
||||||
|
task_ex = db_api.get_task_execution(tasks[0].id)
|
||||||
|
self.assertIn("Task timed out", task_ex.state_info)
|
||||||
|
|
||||||
def test_task_defaults_wait_policies(self):
|
def test_task_defaults_wait_policies(self):
|
||||||
wf_text = """---
|
wf_text = """---
|
||||||
version: '2.0'
|
version: '2.0'
|
||||||
|
Loading…
Reference in New Issue
Block a user