Refactor task notifications

* All calls to a notifier within the Task class have now been
  moved into the method set_state() so that the relation between
  a state change and a notification is now straightforward and the
  notification calls don't have to be spread out across different
  modules.

Change-Id: I9c0647235e1439049d3e7db13f19bef542f10508
This commit is contained in:
Renat Akhmerov
2020-06-03 15:37:56 +07:00
parent a620dabb78
commit b55dbdea68
6 changed files with 167 additions and 122 deletions

View File

@@ -85,12 +85,8 @@ def run_task(wf_cmd):
def mark_task_running(task_ex, wf_spec):
task = build_task_from_execution(wf_spec, task_ex)
old_task_state = task_ex.state
task.set_state(states.RUNNING, None, False)
task.notify(old_task_state, states.RUNNING)
@profiler.trace('task-handler-on-action-complete', hide_args=True)
def _on_action_complete(action_ex):
@@ -211,12 +207,8 @@ def force_fail_task(task_ex, msg, task=None):
task = build_task_from_execution(wf_spec, task_ex)
old_task_state = task_ex.state
task.set_state(states.ERROR, msg)
task.notify(old_task_state, states.ERROR)
wf_handler.force_fail_workflow(task_ex.workflow_execution, msg)

View File

@@ -68,16 +68,17 @@ class Task(object):
self.created = False
self.state_changed = False
def notify(self, old_task_state, new_task_state):
def _notify(self, from_state, to_state):
publishers = self.wf_ex.params.get('notify')
if not publishers and not isinstance(publishers, list):
return
notifier = notif.get_notifier(cfg.CONF.notifier.type)
event = events.identify_task_event(old_task_state, new_task_state)
event = events.identify_task_event(from_state, to_state)
filtered_publishers = []
for publisher in publishers:
if not isinstance(publisher, dict):
continue
@@ -303,20 +304,6 @@ class Task(object):
cur_state = self.task_ex.state
if cur_state != state or self.task_ex.state_info != state_info:
# Recalculating "started_at" timestamp only if the state
# was WAITING (all preconditions are satisfied and it's
# ready to start) or the task is being rerun. So we treat
# all iterations of "retry" policy as one run.
if state == states.RUNNING and \
(cur_state == states.WAITING or self.rerun):
self.task_ex.started_at = utils.utc_now_sec()
if states.is_completed(state):
self.task_ex.finished_at = utils.utc_now_sec()
if self.rerun:
self.task_ex.finished_at = None
task_ex = db_api.update_task_execution_state(
id=self.task_ex.id,
cur_state=cur_state,
@@ -332,9 +319,25 @@ class Task(object):
if isinstance(state_info, dict) else state_info
self.state_changed = True
# Recalculating "started_at" timestamp only if the state
# was WAITING (all preconditions are satisfied and it's
# ready to start) or the task is being rerun. So we treat
# all iterations of "retry" policy as one run.
if state == states.RUNNING and \
(cur_state == states.WAITING or self.rerun):
self.task_ex.started_at = utils.utc_now_sec()
if states.is_completed(state):
self.task_ex.finished_at = utils.utc_now_sec()
if self.rerun:
self.task_ex.finished_at = None
if processed is not None:
self.task_ex.processed = processed
self._notify(cur_state, state)
wf_trace.info(
self.task_ex.workflow_execution,
"Task '%s' (%s) [%s -> %s, msg=%s]" %
@@ -361,15 +364,8 @@ class Task(object):
assert self.task_ex
# Record the current task state.
old_task_state = self.task_ex.state
# Ignore if task already completed.
if self.is_completed():
# Publish task event again so subscribers know
# task completed state is being processed again.
self.notify(old_task_state, self.task_ex.state)
return
# If we were unable to change the task state it means that it was
@@ -419,9 +415,6 @@ class Task(object):
# If workflow is paused we shouldn't schedule new commands
# and mark task as processed.
if states.is_paused(self.wf_ex.state):
# Publish task event even if the workflow is paused.
self.notify(old_task_state, self.task_ex.state)
return
# Mark task as processed after all decisions have been made
@@ -430,9 +423,6 @@ class Task(object):
self.register_workflow_completion_check()
# Publish task event.
self.notify(old_task_state, self.task_ex.state)
dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
def register_workflow_completion_check(self):
@@ -459,15 +449,8 @@ class Task(object):
assert self.task_ex
# Record the current task state.
old_task_state = self.task_ex.state
# Ignore if task already completed.
if states.is_completed(self.task_ex.state):
# Publish task event again so subscribers know
# task completed state is being processed again.
self.notify(old_task_state, self.task_ex.state)
return
# Update only if state transition is valid.
@@ -486,9 +469,6 @@ class Task(object):
if states.is_completed(self.task_ex.state):
self.register_workflow_completion_check()
# Publish event.
self.notify(old_task_state, self.task_ex.state)
def _before_task_start(self):
policies_spec = self.task_spec.get_policies()
@@ -601,8 +581,8 @@ class RegularTask(Task):
self._create_task_execution()
# Publish event.
self.notify(None, self.task_ex.state)
# Notify about the initial state change.
self._notify(None, self.task_ex.state)
LOG.debug(
'Starting task [name=%s, init_state=%s, workflow_name=%s,'
@@ -633,14 +613,8 @@ class RegularTask(Task):
'Rerunning succeeded tasks is not supported.'
)
# Record the current task state.
old_task_state = self.task_ex.state
self.set_state(states.RUNNING, None, processed=False)
# Publish event.
self.notify(old_task_state, self.task_ex.state)
if self.rerun:
self._before_task_start()

View File

@@ -301,6 +301,9 @@ class Workflow(object):
parent_wf._recursive_rerun()
# TODO(rakhmerov): this is a design issue again as in many places.
# Ideally, we should just build (or get) an instance of Task and
# call set_state() on it.
from mistral.engine import task_handler
task_handler.mark_task_running(parent_task_ex, parent_wf.wf_spec)

View File

@@ -21,27 +21,32 @@ LOG = logging.getLogger(__name__)
class NotifierTestCase(engine_test_base.EngineTestCase):
# TODO(rakhmerov): All these methods have a different signature compared to
# their base versions. Must be fixed.
def await_workflow_success(self, wf_ex_id, post_delay=1):
# Override the original wait method to add a delay to allow enough
# time for the notification events to get processed.
super(NotifierTestCase, self).await_workflow_success(wf_ex_id)
self._sleep(post_delay)
def await_workflow_error(self, wf_ex_id, post_delay=1):
# Override the original wait method to add a delay to allow enough
# time for the notification events to get processed.
super(NotifierTestCase, self).await_workflow_error(wf_ex_id)
self._sleep(post_delay)
def await_workflow_paused(self, wf_ex_id, post_delay=1):
# Override the original wait method to add a delay to allow enough
# time for the notification events to get processed.
super(NotifierTestCase, self).await_workflow_paused(wf_ex_id)
self._sleep(post_delay)
def await_workflow_cancelled(self, wf_ex_id, post_delay=1):
# Override the original wait method to add a delay to allow enough
# time for the notification events to get processed.
super(NotifierTestCase, self).await_workflow_cancelled(wf_ex_id)
self._sleep(post_delay)

View File

@@ -46,11 +46,11 @@ def notifier_process(ex_id, data, event, timestamp, publishers):
class ServerPluginTest(base.NotifierTestCase):
def tearDown(self):
notif.cleanup()
super(ServerPluginTest, self).tearDown()
notif.cleanup()
def test_get_bad_notifier(self):
self.assertRaises(sd_exc.NoMatches, notif.get_notifier, 'foobar')
@@ -61,28 +61,32 @@ class ServerPluginTest(base.NotifierTestCase):
mock.MagicMock(return_value=None)
)
class LocalNotifServerTest(base.NotifierTestCase):
@classmethod
def setUpClass(cls):
super(LocalNotifServerTest, cls).setUpClass()
cfg.CONF.set_default('type', 'local', group='notifier')
@classmethod
def tearDownClass(cls):
cfg.CONF.set_default('type', 'remote', group='notifier')
super(LocalNotifServerTest, cls).tearDownClass()
def setUp(self):
super(LocalNotifServerTest, self).setUp()
self.publisher = notif.get_notification_publisher('webhook')
self.publisher.publish = mock.MagicMock(side_effect=publisher_process)
self.publisher.publish.reset_mock()
del EVENT_LOGS[:]
def tearDown(self):
notif.cleanup()
super(LocalNotifServerTest, self).tearDown()
notif.cleanup()
def test_get_notifier(self):
notifier = notif.get_notifier(cfg.CONF.notifier.type)
@@ -150,20 +154,22 @@ class LocalNotifServerTest(base.NotifierTestCase):
mock.MagicMock(side_effect=notifier_process)
)
class RemoteNotifServerTest(base.NotifierTestCase):
@classmethod
def setUpClass(cls):
super(RemoteNotifServerTest, cls).setUpClass()
cfg.CONF.set_default('type', 'remote', group='notifier')
def setUp(self):
super(RemoteNotifServerTest, self).setUp()
del EVENT_LOGS[:]
def tearDown(self):
notif.cleanup()
super(RemoteNotifServerTest, self).tearDown()
notif.cleanup()
def test_get_notifier(self):
notifier = notif.get_notifier(cfg.CONF.notifier.type)

View File

@@ -65,8 +65,9 @@ class NotifyEventsTest(base.NotifierTestCase):
cfg.CONF.set_default('notify', None, group='notifier')
def test_notify_all_explicit(self):
wf_def = """
wf_text = """
version: '2.0'
wf:
tasks:
t1:
@@ -77,7 +78,7 @@ class NotifyEventsTest(base.NotifierTestCase):
action: std.noop
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
notify_options = [
{
@@ -119,8 +120,9 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertIn((wf_ex.id, events.WORKFLOW_SUCCEEDED), EVENT_LOGS)
def test_notify_all_implicit(self):
wf_def = """
wf_text = """
version: '2.0'
wf:
tasks:
t1:
@@ -131,7 +133,7 @@ class NotifyEventsTest(base.NotifierTestCase):
action: std.noop
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
notify_options = [{'type': 'webhook'}]
params = {'notify': notify_options}
@@ -166,8 +168,9 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertIn((wf_ex.id, events.WORKFLOW_SUCCEEDED), EVENT_LOGS)
def test_notify_order(self):
wf_def = """
wf_text = """
version: '2.0'
wf:
tasks:
t1:
@@ -178,11 +181,9 @@ class NotifyEventsTest(base.NotifierTestCase):
action: std.noop
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
notify_options = [
{'type': 'webhook'}
]
notify_options = [{'type': 'webhook'}]
params = {'notify': notify_options}
@@ -219,8 +220,9 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertListEqual(expected_order, EVENT_LOGS)
def test_notify_with_event_filter(self):
wf_def = """
wf_text = """
version: '2.0'
wf:
tasks:
t1:
@@ -231,7 +233,7 @@ class NotifyEventsTest(base.NotifierTestCase):
action: std.noop
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
notify_options = [
{
@@ -274,8 +276,9 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertFalse(self.publishers['wbhk'].publish.called)
self.assertFalse(self.publishers['noop'].publish.called)
wf_def = """
wf_text = """
version: '2.0'
wf:
tasks:
t1:
@@ -286,7 +289,7 @@ class NotifyEventsTest(base.NotifierTestCase):
action: std.noop
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
notify_options = [
{'type': 'webhook'},
@@ -338,8 +341,9 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertFalse(self.publishers['wbhk'].publish.called)
self.assertFalse(self.publishers['noop'].publish.called)
wf_def = """
wf_text = """
version: '2.0'
wf:
tasks:
t1:
@@ -350,7 +354,7 @@ class NotifyEventsTest(base.NotifierTestCase):
action: std.noop
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
notify_options = [
{'type': 'webhook'},
@@ -406,8 +410,9 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertFalse(self.publishers['wbhk'].publish.called)
self.assertFalse(self.publishers['noop'].publish.called)
wf_def = """
wf_text = """
version: '2.0'
wf:
tasks:
t1:
@@ -418,7 +423,7 @@ class NotifyEventsTest(base.NotifierTestCase):
action: std.noop
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
cfg.CONF.set_default(
'notify',
@@ -468,9 +473,11 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertListEqual(expected_order, EVENT_LOGS)
def test_workbook_notify(self):
wb_def = """
wb_text = """
version: '2.0'
name: wb
workflows:
wf1:
tasks:
@@ -480,13 +487,14 @@ class NotifyEventsTest(base.NotifierTestCase):
- t2
t2:
action: std.noop
wf2:
tasks:
t1:
action: std.noop
"""
wb_svc.create_workbook_v2(wb_def)
wb_svc.create_workbook_v2(wb_text)
notify_options = [{'type': 'webhook'}]
params = {'notify': notify_options}
@@ -546,8 +554,9 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertListEqual(expected_order, EVENT_LOGS)
def test_notify_task_error(self):
wf_def = """
wf_text = """
version: '2.0'
wf:
tasks:
t1:
@@ -558,7 +567,7 @@ class NotifyEventsTest(base.NotifierTestCase):
action: std.fail
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
notify_options = [{'type': 'webhook'}]
params = {'notify': notify_options}
@@ -596,8 +605,9 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertListEqual(expected_order, EVENT_LOGS)
def test_notify_task_transition_fail(self):
wf_def = """
wf_text = """
version: '2.0'
wf:
tasks:
t1:
@@ -606,7 +616,7 @@ class NotifyEventsTest(base.NotifierTestCase):
- fail
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
notify_options = [{'type': 'webhook'}]
params = {'notify': notify_options}
@@ -639,8 +649,9 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertListEqual(expected_order, EVENT_LOGS)
def test_notify_with_items_task(self):
wf_def = """
wf_text = """
version: '2.0'
wf:
tasks:
t1:
@@ -652,7 +663,7 @@ class NotifyEventsTest(base.NotifierTestCase):
action: std.noop
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
notify_options = [{'type': 'webhook'}]
params = {'notify': notify_options}
@@ -691,8 +702,9 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertListEqual(expected_order, EVENT_LOGS)
def test_notify_pause_resume(self):
wf_def = """
wf_text = """
version: '2.0'
wf:
tasks:
t1:
@@ -703,7 +715,7 @@ class NotifyEventsTest(base.NotifierTestCase):
action: std.noop
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
notify_options = [{'type': 'webhook'}]
params = {'notify': notify_options}
@@ -727,6 +739,7 @@ class NotifyEventsTest(base.NotifierTestCase):
# Pause the workflow.
self.engine.pause_workflow(wf_ex.id)
self.await_workflow_paused(wf_ex.id)
with db_api.transaction():
@@ -734,6 +747,7 @@ class NotifyEventsTest(base.NotifierTestCase):
task_exs = wf_ex.task_executions
t1_ex = self._assert_single_item(task_exs, name='t1')
t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id)
# Workflow is paused but the task is still running as expected.
@@ -818,8 +832,9 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertListEqual(expected_order, EVENT_LOGS)
def test_notify_pause_resume_task(self):
wf_def = """
wf_text = """
version: '2.0'
wf:
tasks:
t1:
@@ -830,7 +845,7 @@ class NotifyEventsTest(base.NotifierTestCase):
action: std.noop
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
notify_options = [{'type': 'webhook'}]
params = {'notify': notify_options}
@@ -854,6 +869,7 @@ class NotifyEventsTest(base.NotifierTestCase):
# Pause the action execution of task 1.
self.engine.on_action_update(t1_act_exs[0].id, states.PAUSED)
self.await_workflow_paused(wf_ex.id)
with db_api.transaction():
@@ -940,8 +956,9 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertListEqual(expected_order, EVENT_LOGS)
def test_notify_cancel(self):
wf_def = """
wf_text = """
version: '2.0'
wf:
tasks:
t1:
@@ -952,7 +969,7 @@ class NotifyEventsTest(base.NotifierTestCase):
action: std.noop
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
notify_options = [{'type': 'webhook'}]
params = {'notify': notify_options}
@@ -1033,8 +1050,9 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertListEqual(expected_order, EVENT_LOGS)
def test_notify_cancel_task(self):
wf_def = """
wf_text = """
version: '2.0'
wf:
tasks:
t1:
@@ -1045,7 +1063,7 @@ class NotifyEventsTest(base.NotifierTestCase):
action: std.noop
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
notify_options = [{'type': 'webhook'}]
params = {'notify': notify_options}
@@ -1095,8 +1113,9 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertListEqual(expected_order, EVENT_LOGS)
def test_notify_task_input_error(self):
wf_def = """---
wf_text = """---
version: '2.0'
wf:
tasks:
task1:
@@ -1108,7 +1127,7 @@ class NotifyEventsTest(base.NotifierTestCase):
action: std.noop
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
notify_options = [{'type': 'webhook'}]
params = {'notify': notify_options}
@@ -1139,8 +1158,9 @@ class NotifyEventsTest(base.NotifierTestCase):
@mock.patch('mistral.actions.std_actions.NoOpAction.run', mock.MagicMock(
side_effect=[Exception(), None, None]))
def test_notify_rerun_task(self):
wf_def = """
wf_text = """
version: '2.0'
wf:
tasks:
t1:
@@ -1151,7 +1171,7 @@ class NotifyEventsTest(base.NotifierTestCase):
action: std.noop
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
notify_options = [{'type': 'webhook'}]
params = {'notify': notify_options}
@@ -1169,7 +1189,9 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertEqual(states.ERROR, t1_ex.state)
self.assertEqual(1, len(task_exs))
# Rerun the workflow.
self.engine.rerun_workflow(t1_ex.id)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
@@ -1203,27 +1225,31 @@ class NotifyEventsTest(base.NotifierTestCase):
@mock.patch('mistral.actions.std_actions.NoOpAction.run', mock.MagicMock(
side_effect=[Exception(), None, None, None]))
def test_notify_rerun_nested_workflow(self):
wf_def = """
wf_text = """
version: '2.0'
wf_1:
tasks:
wf_1_t1:
workflow: wf_2
on-success:
- wf_1_t2
wf_1_t2:
action: std.noop
version: '2.0'
wf_2:
tasks:
wf_2_t1:
action: std.noop
on-success:
- wf_2_t2
wf_2_t2:
action: std.noop
"""
wf_svc.create_workflows(wf_def)
wf_svc.create_workflows(wf_text)
notify_options = [{'type': 'webhook'}]
params = {'notify': notify_options}
@@ -1234,41 +1260,79 @@ class NotifyEventsTest(base.NotifierTestCase):
with db_api.transaction():
wf_exs = db_api.get_workflow_executions()
self._assert_single_item(wf_exs, name='wf_1',
state=states.ERROR)
self._assert_single_item(wf_exs, name='wf_2',
state=states.ERROR)
self._assert_single_item(
wf_exs,
name='wf_1',
state=states.ERROR
)
self._assert_single_item(
wf_exs,
name='wf_2',
state=states.ERROR
)
task_exs = db_api.get_task_executions()
self._assert_single_item(task_exs, name='wf_1_t1',
state=states.ERROR)
wf_2_t1 = self._assert_single_item(task_exs, name='wf_2_t1',
state=states.ERROR)
self._assert_single_item(
task_exs,
name='wf_1_t1',
state=states.ERROR
)
wf_2_t1 = self._assert_single_item(
task_exs,
name='wf_2_t1',
state=states.ERROR
)
self.assertEqual(2, len(task_exs))
self.assertEqual(2, len(wf_exs))
# Rerun the nested workflow.
self.engine.rerun_workflow(wf_2_t1.id)
self.await_workflow_success(wf_1_ex.id)
with db_api.transaction():
wf_exs = db_api.get_workflow_executions()
wf_1_ex = self._assert_single_item(wf_exs, name='wf_1',
state=states.SUCCESS)
wf_2_ex = self._assert_single_item(wf_exs, name='wf_2',
state=states.SUCCESS)
wf_1_ex = self._assert_single_item(
wf_exs,
name='wf_1',
state=states.SUCCESS
)
wf_2_ex = self._assert_single_item(
wf_exs,
name='wf_2',
state=states.SUCCESS
)
task_wf_1_exs = wf_1_ex.task_executions
wf_1_t1 = self._assert_single_item(task_wf_1_exs, name='wf_1_t1',
state=states.SUCCESS)
wf_1_t2 = self._assert_single_item(task_wf_1_exs, name='wf_1_t2',
state=states.SUCCESS)
wf_1_t1 = self._assert_single_item(
task_wf_1_exs,
name='wf_1_t1',
state=states.SUCCESS
)
wf_1_t2 = self._assert_single_item(
task_wf_1_exs,
name='wf_1_t2',
state=states.SUCCESS
)
task_wf_2_exs = wf_2_ex.task_executions
wf_2_t1 = self._assert_single_item(task_wf_2_exs, name='wf_2_t1',
state=states.SUCCESS)
wf_2_t2 = self._assert_single_item(task_wf_2_exs, name='wf_2_t2',
state=states.SUCCESS)
wf_2_t1 = self._assert_single_item(
task_wf_2_exs,
name='wf_2_t1',
state=states.SUCCESS
)
wf_2_t2 = self._assert_single_item(
task_wf_2_exs,
name='wf_2_t2',
state=states.SUCCESS
)
self.assertEqual(2, len(task_wf_1_exs))
self.assertEqual(2, len(task_wf_2_exs))
@@ -1297,5 +1361,6 @@ class NotifyEventsTest(base.NotifierTestCase):
(wf_1_t2.id, events.TASK_SUCCEEDED),
(wf_1_ex.id, events.WORKFLOW_SUCCEEDED),
]
self.assertTrue(self.publishers['wbhk'].publish.called)
self.assertListEqual(expected_order, EVENT_LOGS)