Sending TASK_FAILED event in case of MistralException
If task was failed by unhandled exceptions, Mistral was not sending TASK_FAILED event, which could be critical for Mistral's clients, who use notifications mechanism to handle issues. Change-Id: I460686c2852e3eb642506049ad5c33705697ecd8 Closes-Bug: #1803746 Signed-off-by: Oleg Ovcharuk <vgvoleg@gmail.com>
This commit is contained in:
parent
81af1b4838
commit
87200f6aea
@ -70,12 +70,7 @@ def run_task(wf_cmd):
|
||||
(e, wf_ex.name, task_spec.get_name(), tb.format_exc())
|
||||
)
|
||||
|
||||
LOG.error(msg)
|
||||
|
||||
task.set_state(states.ERROR, msg)
|
||||
task.save_finished_time()
|
||||
|
||||
wf_handler.force_fail_workflow(wf_ex, msg)
|
||||
force_fail_task(task.task_ex, msg, task=task)
|
||||
|
||||
return
|
||||
|
||||
@ -123,12 +118,7 @@ def _on_action_complete(action_ex):
|
||||
" action=%s]:\n%s" %
|
||||
(e, wf_ex.name, task_ex.name, action_ex.name, tb.format_exc()))
|
||||
|
||||
LOG.error(msg)
|
||||
|
||||
task.set_state(states.ERROR, msg)
|
||||
task.save_finished_time()
|
||||
|
||||
wf_handler.force_fail_workflow(wf_ex, msg)
|
||||
force_fail_task(task_ex, msg, task=task)
|
||||
|
||||
return
|
||||
|
||||
@ -183,19 +173,14 @@ def _on_action_update(action_ex):
|
||||
" action=%s]:\n%s" %
|
||||
(e, wf_ex.name, task_ex.name, action_ex.name, tb.format_exc()))
|
||||
|
||||
LOG.error(msg)
|
||||
|
||||
task.set_state(states.ERROR, msg)
|
||||
task.save_finished_time()
|
||||
|
||||
wf_handler.force_fail_workflow(wf_ex, msg)
|
||||
force_fail_task(task_ex, msg, task=task)
|
||||
|
||||
return
|
||||
|
||||
_check_affected_tasks(task)
|
||||
|
||||
|
||||
def force_fail_task(task_ex, msg):
|
||||
def force_fail_task(task_ex, msg, task=None):
|
||||
"""Forces the given task to fail.
|
||||
|
||||
This method implements the 'forced' task fail without giving a chance
|
||||
@ -207,14 +192,22 @@ def force_fail_task(task_ex, msg):
|
||||
|
||||
:param task_ex: Task execution.
|
||||
:param msg: Error message.
|
||||
:param task: Task object. Optional.
|
||||
"""
|
||||
wf_spec = spec_parser.get_workflow_spec_by_execution_id(
|
||||
task_ex.workflow_execution_id
|
||||
)
|
||||
|
||||
task = _build_task_from_execution(wf_spec, task_ex)
|
||||
LOG.error(msg)
|
||||
|
||||
if not task:
|
||||
wf_spec = spec_parser.get_workflow_spec_by_execution_id(
|
||||
task_ex.workflow_execution_id
|
||||
)
|
||||
|
||||
task = _build_task_from_execution(wf_spec, task_ex)
|
||||
|
||||
old_task_state = task_ex.state
|
||||
task.set_state(states.ERROR, msg)
|
||||
task.notify(old_task_state, states.ERROR)
|
||||
|
||||
task.save_finished_time()
|
||||
|
||||
wf_handler.force_fail_workflow(task_ex.workflow_execution, msg)
|
||||
@ -239,12 +232,7 @@ def continue_task(task_ex):
|
||||
(e, wf_ex.name, task_ex.name, tb.format_exc())
|
||||
)
|
||||
|
||||
LOG.error(msg)
|
||||
|
||||
task.set_state(states.ERROR, msg)
|
||||
task.save_finished_time()
|
||||
|
||||
wf_handler.force_fail_workflow(wf_ex, msg)
|
||||
force_fail_task(task_ex, msg, task=task)
|
||||
|
||||
return
|
||||
|
||||
@ -268,12 +256,7 @@ def complete_task(task_ex, state, state_info):
|
||||
(e, wf_ex.name, task_ex.name, tb.format_exc())
|
||||
)
|
||||
|
||||
LOG.error(msg)
|
||||
|
||||
task.set_state(states.ERROR, msg)
|
||||
task.save_finished_time()
|
||||
|
||||
wf_handler.force_fail_workflow(wf_ex, msg)
|
||||
force_fail_task(task_ex, msg, task=task)
|
||||
|
||||
return
|
||||
|
||||
|
@ -1035,3 +1035,45 @@ class NotifyEventsTest(base.NotifierTestCase):
|
||||
|
||||
self.assertTrue(self.publishers['wbhk'].publish.called)
|
||||
self.assertListEqual(expected_order, EVENT_LOGS)
|
||||
|
||||
def test_notify_task_input_error(self):
|
||||
wf_def = """---
|
||||
version: '2.0'
|
||||
wf:
|
||||
tasks:
|
||||
task1:
|
||||
input:
|
||||
url: <% $.ItWillBeError %>
|
||||
action: std.http
|
||||
on-error: task2
|
||||
task2:
|
||||
action: std.noop
|
||||
"""
|
||||
|
||||
wf_svc.create_workflows(wf_def)
|
||||
|
||||
notify_options = [{'type': 'webhook'}]
|
||||
params = {'notify': notify_options}
|
||||
|
||||
wf_ex = self.engine.start_workflow('wf', '', **params)
|
||||
|
||||
self.await_workflow_error(wf_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
task_exs = wf_ex.task_executions
|
||||
|
||||
self.assertEqual(1, len(task_exs))
|
||||
|
||||
t1_ex = self._assert_single_item(task_exs, name='task1')
|
||||
self.assertEqual(states.ERROR, t1_ex.state)
|
||||
|
||||
expected_order = [
|
||||
(wf_ex.id, events.WORKFLOW_LAUNCHED),
|
||||
(t1_ex.id, events.TASK_LAUNCHED),
|
||||
(t1_ex.id, events.TASK_FAILED),
|
||||
(wf_ex.id, events.WORKFLOW_FAILED)
|
||||
]
|
||||
|
||||
self.assertTrue(self.publishers['wbhk'].publish.called)
|
||||
self.assertListEqual(expected_order, EVENT_LOGS)
|
||||
|
Loading…
x
Reference in New Issue
Block a user