From b55dbdea68561d65ed3ef52d3ccb7bdbc1a283f9 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Wed, 3 Jun 2020 15:37:56 +0700 Subject: [PATCH] Refactor task notifications * All calls to a notifier within the Task class have now been moved into the method set_state() so that the relation between a state change and a notification is now straightforward and the notification calls don't have to be spread out across different modules. Change-Id: I9c0647235e1439049d3e7db13f19bef542f10508 --- mistral/engine/task_handler.py | 8 - mistral/engine/tasks.py | 68 ++----- mistral/engine/workflows.py | 3 + mistral/tests/unit/notifiers/base.py | 7 +- .../unit/notifiers/test_notifier_servers.py | 18 +- mistral/tests/unit/notifiers/test_notify.py | 185 ++++++++++++------ 6 files changed, 167 insertions(+), 122 deletions(-) diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 2a7b9c351..778656d37 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -85,12 +85,8 @@ def run_task(wf_cmd): def mark_task_running(task_ex, wf_spec): task = build_task_from_execution(wf_spec, task_ex) - old_task_state = task_ex.state - task.set_state(states.RUNNING, None, False) - task.notify(old_task_state, states.RUNNING) - @profiler.trace('task-handler-on-action-complete', hide_args=True) def _on_action_complete(action_ex): @@ -211,12 +207,8 @@ def force_fail_task(task_ex, msg, task=None): task = build_task_from_execution(wf_spec, task_ex) - old_task_state = task_ex.state - task.set_state(states.ERROR, msg) - task.notify(old_task_state, states.ERROR) - wf_handler.force_fail_workflow(task_ex.workflow_execution, msg) diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index ea85d3915..e8bc6e24c 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -68,16 +68,17 @@ class Task(object): self.created = False self.state_changed = False - def notify(self, old_task_state, new_task_state): + def _notify(self, from_state, to_state): publishers = self.wf_ex.params.get('notify') if not publishers and not isinstance(publishers, list): return notifier = notif.get_notifier(cfg.CONF.notifier.type) - event = events.identify_task_event(old_task_state, new_task_state) + event = events.identify_task_event(from_state, to_state) filtered_publishers = [] + for publisher in publishers: if not isinstance(publisher, dict): continue @@ -303,20 +304,6 @@ class Task(object): cur_state = self.task_ex.state if cur_state != state or self.task_ex.state_info != state_info: - # Recalculating "started_at" timestamp only if the state - # was WAITING (all preconditions are satisfied and it's - # ready to start) or the task is being rerun. So we treat - # all iterations of "retry" policy as one run. - if state == states.RUNNING and \ - (cur_state == states.WAITING or self.rerun): - self.task_ex.started_at = utils.utc_now_sec() - - if states.is_completed(state): - self.task_ex.finished_at = utils.utc_now_sec() - - if self.rerun: - self.task_ex.finished_at = None - task_ex = db_api.update_task_execution_state( id=self.task_ex.id, cur_state=cur_state, @@ -332,9 +319,25 @@ class Task(object): if isinstance(state_info, dict) else state_info self.state_changed = True + # Recalculating "started_at" timestamp only if the state + # was WAITING (all preconditions are satisfied and it's + # ready to start) or the task is being rerun. So we treat + # all iterations of "retry" policy as one run. + if state == states.RUNNING and \ + (cur_state == states.WAITING or self.rerun): + self.task_ex.started_at = utils.utc_now_sec() + + if states.is_completed(state): + self.task_ex.finished_at = utils.utc_now_sec() + + if self.rerun: + self.task_ex.finished_at = None + if processed is not None: self.task_ex.processed = processed + self._notify(cur_state, state) + wf_trace.info( self.task_ex.workflow_execution, "Task '%s' (%s) [%s -> %s, msg=%s]" % @@ -361,15 +364,8 @@ class Task(object): assert self.task_ex - # Record the current task state. - old_task_state = self.task_ex.state - # Ignore if task already completed. if self.is_completed(): - # Publish task event again so subscribers know - # task completed state is being processed again. - self.notify(old_task_state, self.task_ex.state) - return # If we were unable to change the task state it means that it was @@ -419,9 +415,6 @@ class Task(object): # If workflow is paused we shouldn't schedule new commands # and mark task as processed. if states.is_paused(self.wf_ex.state): - # Publish task event even if the workflow is paused. - self.notify(old_task_state, self.task_ex.state) - return # Mark task as processed after all decisions have been made @@ -430,9 +423,6 @@ class Task(object): self.register_workflow_completion_check() - # Publish task event. - self.notify(old_task_state, self.task_ex.state) - dispatcher.dispatch_workflow_commands(self.wf_ex, cmds) def register_workflow_completion_check(self): @@ -459,15 +449,8 @@ class Task(object): assert self.task_ex - # Record the current task state. - old_task_state = self.task_ex.state - # Ignore if task already completed. if states.is_completed(self.task_ex.state): - # Publish task event again so subscribers know - # task completed state is being processed again. - self.notify(old_task_state, self.task_ex.state) - return # Update only if state transition is valid. @@ -486,9 +469,6 @@ class Task(object): if states.is_completed(self.task_ex.state): self.register_workflow_completion_check() - # Publish event. - self.notify(old_task_state, self.task_ex.state) - def _before_task_start(self): policies_spec = self.task_spec.get_policies() @@ -601,8 +581,8 @@ class RegularTask(Task): self._create_task_execution() - # Publish event. - self.notify(None, self.task_ex.state) + # Notify about the initial state change. + self._notify(None, self.task_ex.state) LOG.debug( 'Starting task [name=%s, init_state=%s, workflow_name=%s,' @@ -633,14 +613,8 @@ class RegularTask(Task): 'Rerunning succeeded tasks is not supported.' ) - # Record the current task state. - old_task_state = self.task_ex.state - self.set_state(states.RUNNING, None, processed=False) - # Publish event. - self.notify(old_task_state, self.task_ex.state) - if self.rerun: self._before_task_start() diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index c1f0cd2cd..c3c97f6f4 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -301,6 +301,9 @@ class Workflow(object): parent_wf._recursive_rerun() + # TODO(rakhmerov): this is a design issue again as in many places. + # Ideally, we should just build (or get) an instance of Task and + # call set_state() on it. from mistral.engine import task_handler task_handler.mark_task_running(parent_task_ex, parent_wf.wf_spec) diff --git a/mistral/tests/unit/notifiers/base.py b/mistral/tests/unit/notifiers/base.py index 46db73c40..6a7d9575a 100644 --- a/mistral/tests/unit/notifiers/base.py +++ b/mistral/tests/unit/notifiers/base.py @@ -21,27 +21,32 @@ LOG = logging.getLogger(__name__) class NotifierTestCase(engine_test_base.EngineTestCase): - + # TODO(rakhmerov): All these method have a different signature comparing to + # their base versions. Must be fixed. def await_workflow_success(self, wf_ex_id, post_delay=1): # Override the original wait method to add a delay to allow enough # time for the notification events to get processed. super(NotifierTestCase, self).await_workflow_success(wf_ex_id) + self._sleep(post_delay) def await_workflow_error(self, wf_ex_id, post_delay=1): # Override the original wait method to add a delay to allow enough # time for the notification events to get processed. super(NotifierTestCase, self).await_workflow_error(wf_ex_id) + self._sleep(post_delay) def await_workflow_paused(self, wf_ex_id, post_delay=1): # Override the original wait method to add a delay to allow enough # time for the notification events to get processed. super(NotifierTestCase, self).await_workflow_paused(wf_ex_id) + self._sleep(post_delay) def await_workflow_cancelled(self, wf_ex_id, post_delay=1): # Override the original wait method to add a delay to allow enough # time for the notification events to get processed. super(NotifierTestCase, self).await_workflow_cancelled(wf_ex_id) + self._sleep(post_delay) diff --git a/mistral/tests/unit/notifiers/test_notifier_servers.py b/mistral/tests/unit/notifiers/test_notifier_servers.py index bf9d68ebd..b37803377 100644 --- a/mistral/tests/unit/notifiers/test_notifier_servers.py +++ b/mistral/tests/unit/notifiers/test_notifier_servers.py @@ -46,11 +46,11 @@ def notifier_process(ex_id, data, event, timestamp, publishers): class ServerPluginTest(base.NotifierTestCase): - def tearDown(self): - notif.cleanup() super(ServerPluginTest, self).tearDown() + notif.cleanup() + def test_get_bad_notifier(self): self.assertRaises(sd_exc.NoMatches, notif.get_notifier, 'foobar') @@ -61,28 +61,32 @@ class ServerPluginTest(base.NotifierTestCase): mock.MagicMock(return_value=None) ) class LocalNotifServerTest(base.NotifierTestCase): - @classmethod def setUpClass(cls): super(LocalNotifServerTest, cls).setUpClass() + cfg.CONF.set_default('type', 'local', group='notifier') @classmethod def tearDownClass(cls): cfg.CONF.set_default('type', 'remote', group='notifier') + super(LocalNotifServerTest, cls).tearDownClass() def setUp(self): super(LocalNotifServerTest, self).setUp() + self.publisher = notif.get_notification_publisher('webhook') self.publisher.publish = mock.MagicMock(side_effect=publisher_process) self.publisher.publish.reset_mock() + del EVENT_LOGS[:] def tearDown(self): - notif.cleanup() super(LocalNotifServerTest, self).tearDown() + notif.cleanup() + def test_get_notifier(self): notifier = notif.get_notifier(cfg.CONF.notifier.type) @@ -150,20 +154,22 @@ class LocalNotifServerTest(base.NotifierTestCase): mock.MagicMock(side_effect=notifier_process) ) class RemoteNotifServerTest(base.NotifierTestCase): - @classmethod def setUpClass(cls): super(RemoteNotifServerTest, cls).setUpClass() + cfg.CONF.set_default('type', 'remote', group='notifier') def setUp(self): super(RemoteNotifServerTest, self).setUp() + del EVENT_LOGS[:] def tearDown(self): - notif.cleanup() super(RemoteNotifServerTest, self).tearDown() + notif.cleanup() + def test_get_notifier(self): notifier = notif.get_notifier(cfg.CONF.notifier.type) diff --git a/mistral/tests/unit/notifiers/test_notify.py b/mistral/tests/unit/notifiers/test_notify.py index 0040ae7c6..b255448c9 100644 --- a/mistral/tests/unit/notifiers/test_notify.py +++ b/mistral/tests/unit/notifiers/test_notify.py @@ -65,8 +65,9 @@ class NotifyEventsTest(base.NotifierTestCase): cfg.CONF.set_default('notify', None, group='notifier') def test_notify_all_explicit(self): - wf_def = """ + wf_text = """ version: '2.0' + wf: tasks: t1: @@ -77,7 +78,7 @@ class NotifyEventsTest(base.NotifierTestCase): action: std.noop """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) notify_options = [ { @@ -119,8 +120,9 @@ class NotifyEventsTest(base.NotifierTestCase): self.assertIn((wf_ex.id, events.WORKFLOW_SUCCEEDED), EVENT_LOGS) def test_notify_all_implicit(self): - wf_def = """ + wf_text = """ version: '2.0' + wf: tasks: t1: @@ -131,7 +133,7 @@ class NotifyEventsTest(base.NotifierTestCase): action: std.noop """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) notify_options = [{'type': 'webhook'}] params = {'notify': notify_options} @@ -166,8 +168,9 @@ class NotifyEventsTest(base.NotifierTestCase): self.assertIn((wf_ex.id, events.WORKFLOW_SUCCEEDED), EVENT_LOGS) def test_notify_order(self): - wf_def = """ + wf_text = """ version: '2.0' + wf: tasks: t1: @@ -178,11 +181,9 @@ class NotifyEventsTest(base.NotifierTestCase): action: std.noop """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) - notify_options = [ - {'type': 'webhook'} - ] + notify_options = [{'type': 'webhook'}] params = {'notify': notify_options} @@ -219,8 +220,9 @@ class NotifyEventsTest(base.NotifierTestCase): self.assertListEqual(expected_order, EVENT_LOGS) def test_notify_with_event_filter(self): - wf_def = """ + wf_text = """ version: '2.0' + wf: tasks: t1: @@ -231,7 +233,7 @@ class NotifyEventsTest(base.NotifierTestCase): action: std.noop """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) notify_options = [ { @@ -274,8 +276,9 @@ class NotifyEventsTest(base.NotifierTestCase): self.assertFalse(self.publishers['wbhk'].publish.called) self.assertFalse(self.publishers['noop'].publish.called) - wf_def = """ + wf_text = """ version: '2.0' + wf: tasks: t1: @@ -286,7 +289,7 @@ class NotifyEventsTest(base.NotifierTestCase): action: std.noop """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) notify_options = [ {'type': 'webhook'}, @@ -338,8 +341,9 @@ class NotifyEventsTest(base.NotifierTestCase): self.assertFalse(self.publishers['wbhk'].publish.called) self.assertFalse(self.publishers['noop'].publish.called) - wf_def = """ + wf_text = """ version: '2.0' + wf: tasks: t1: @@ -350,7 +354,7 @@ class NotifyEventsTest(base.NotifierTestCase): action: std.noop """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) notify_options = [ {'type': 'webhook'}, @@ -406,8 +410,9 @@ class NotifyEventsTest(base.NotifierTestCase): self.assertFalse(self.publishers['wbhk'].publish.called) self.assertFalse(self.publishers['noop'].publish.called) - wf_def = """ + wf_text = """ version: '2.0' + wf: tasks: t1: @@ -418,7 +423,7 @@ class NotifyEventsTest(base.NotifierTestCase): action: std.noop """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) cfg.CONF.set_default( 'notify', @@ -468,9 +473,11 @@ class NotifyEventsTest(base.NotifierTestCase): self.assertListEqual(expected_order, EVENT_LOGS) def test_workbook_notify(self): - wb_def = """ + wb_text = """ version: '2.0' + name: wb + workflows: wf1: tasks: @@ -480,13 +487,14 @@ class NotifyEventsTest(base.NotifierTestCase): - t2 t2: action: std.noop + wf2: tasks: t1: action: std.noop """ - wb_svc.create_workbook_v2(wb_def) + wb_svc.create_workbook_v2(wb_text) notify_options = [{'type': 'webhook'}] params = {'notify': notify_options} @@ -546,8 +554,9 @@ class NotifyEventsTest(base.NotifierTestCase): self.assertListEqual(expected_order, EVENT_LOGS) def test_notify_task_error(self): - wf_def = """ + wf_text = """ version: '2.0' + wf: tasks: t1: @@ -558,7 +567,7 @@ class NotifyEventsTest(base.NotifierTestCase): action: std.fail """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) notify_options = [{'type': 'webhook'}] params = {'notify': notify_options} @@ -596,8 +605,9 @@ class NotifyEventsTest(base.NotifierTestCase): self.assertListEqual(expected_order, EVENT_LOGS) def test_notify_task_transition_fail(self): - wf_def = """ + wf_text = """ version: '2.0' + wf: tasks: t1: @@ -606,7 +616,7 @@ class NotifyEventsTest(base.NotifierTestCase): - fail """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) notify_options = [{'type': 'webhook'}] params = {'notify': notify_options} @@ -639,8 +649,9 @@ class NotifyEventsTest(base.NotifierTestCase): self.assertListEqual(expected_order, EVENT_LOGS) def test_notify_with_items_task(self): - wf_def = """ + wf_text = """ version: '2.0' + wf: tasks: t1: @@ -652,7 +663,7 @@ class NotifyEventsTest(base.NotifierTestCase): action: std.noop """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) notify_options = [{'type': 'webhook'}] params = {'notify': notify_options} @@ -691,8 +702,9 @@ class NotifyEventsTest(base.NotifierTestCase): self.assertListEqual(expected_order, EVENT_LOGS) def test_notify_pause_resume(self): - wf_def = """ + wf_text = """ version: '2.0' + wf: tasks: t1: @@ -703,7 +715,7 @@ class NotifyEventsTest(base.NotifierTestCase): action: std.noop """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) notify_options = [{'type': 'webhook'}] params = {'notify': notify_options} @@ -727,6 +739,7 @@ class NotifyEventsTest(base.NotifierTestCase): # Pause the workflow. self.engine.pause_workflow(wf_ex.id) + self.await_workflow_paused(wf_ex.id) with db_api.transaction(): @@ -734,6 +747,7 @@ class NotifyEventsTest(base.NotifierTestCase): task_exs = wf_ex.task_executions t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) # Workflow is paused but the task is still running as expected. @@ -818,8 +832,9 @@ class NotifyEventsTest(base.NotifierTestCase): self.assertListEqual(expected_order, EVENT_LOGS) def test_notify_pause_resume_task(self): - wf_def = """ + wf_text = """ version: '2.0' + wf: tasks: t1: @@ -830,7 +845,7 @@ class NotifyEventsTest(base.NotifierTestCase): action: std.noop """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) notify_options = [{'type': 'webhook'}] params = {'notify': notify_options} @@ -854,6 +869,7 @@ class NotifyEventsTest(base.NotifierTestCase): # Pause the action execution of task 1. self.engine.on_action_update(t1_act_exs[0].id, states.PAUSED) + self.await_workflow_paused(wf_ex.id) with db_api.transaction(): @@ -940,8 +956,9 @@ class NotifyEventsTest(base.NotifierTestCase): self.assertListEqual(expected_order, EVENT_LOGS) def test_notify_cancel(self): - wf_def = """ + wf_text = """ version: '2.0' + wf: tasks: t1: @@ -952,7 +969,7 @@ class NotifyEventsTest(base.NotifierTestCase): action: std.noop """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) notify_options = [{'type': 'webhook'}] params = {'notify': notify_options} @@ -1033,8 +1050,9 @@ class NotifyEventsTest(base.NotifierTestCase): self.assertListEqual(expected_order, EVENT_LOGS) def test_notify_cancel_task(self): - wf_def = """ + wf_text = """ version: '2.0' + wf: tasks: t1: @@ -1045,7 +1063,7 @@ class NotifyEventsTest(base.NotifierTestCase): action: std.noop """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) notify_options = [{'type': 'webhook'}] params = {'notify': notify_options} @@ -1095,8 +1113,9 @@ class NotifyEventsTest(base.NotifierTestCase): self.assertListEqual(expected_order, EVENT_LOGS) def test_notify_task_input_error(self): - wf_def = """--- + wf_text = """--- version: '2.0' + wf: tasks: task1: @@ -1108,7 +1127,7 @@ class NotifyEventsTest(base.NotifierTestCase): action: std.noop """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) notify_options = [{'type': 'webhook'}] params = {'notify': notify_options} @@ -1139,8 +1158,9 @@ class NotifyEventsTest(base.NotifierTestCase): @mock.patch('mistral.actions.std_actions.NoOpAction.run', mock.MagicMock( side_effect=[Exception(), None, None])) def test_notify_rerun_task(self): - wf_def = """ + wf_text = """ version: '2.0' + wf: tasks: t1: @@ -1151,7 +1171,7 @@ class NotifyEventsTest(base.NotifierTestCase): action: std.noop """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) notify_options = [{'type': 'webhook'}] params = {'notify': notify_options} @@ -1169,7 +1189,9 @@ class NotifyEventsTest(base.NotifierTestCase): self.assertEqual(states.ERROR, t1_ex.state) self.assertEqual(1, len(task_exs)) + # Rerun the workflow. self.engine.rerun_workflow(t1_ex.id) + self.await_workflow_success(wf_ex.id) with db_api.transaction(): @@ -1203,27 +1225,31 @@ class NotifyEventsTest(base.NotifierTestCase): @mock.patch('mistral.actions.std_actions.NoOpAction.run', mock.MagicMock( side_effect=[Exception(), None, None, None])) def test_notify_rerun_nested_workflow(self): - wf_def = """ + wf_text = """ + version: '2.0' + wf_1: tasks: wf_1_t1: workflow: wf_2 on-success: - wf_1_t2 + wf_1_t2: action: std.noop - version: '2.0' + wf_2: tasks: wf_2_t1: action: std.noop on-success: - wf_2_t2 + wf_2_t2: action: std.noop """ - wf_svc.create_workflows(wf_def) + wf_svc.create_workflows(wf_text) notify_options = [{'type': 'webhook'}] params = {'notify': notify_options} @@ -1234,41 +1260,79 @@ class NotifyEventsTest(base.NotifierTestCase): with db_api.transaction(): wf_exs = db_api.get_workflow_executions() - self._assert_single_item(wf_exs, name='wf_1', - state=states.ERROR) - self._assert_single_item(wf_exs, name='wf_2', - state=states.ERROR) + + self._assert_single_item( + wf_exs, + name='wf_1', + state=states.ERROR + ) + self._assert_single_item( + wf_exs, + name='wf_2', + state=states.ERROR + ) task_exs = db_api.get_task_executions() - self._assert_single_item(task_exs, name='wf_1_t1', - state=states.ERROR) - wf_2_t1 = self._assert_single_item(task_exs, name='wf_2_t1', - state=states.ERROR) + + self._assert_single_item( + task_exs, + name='wf_1_t1', + state=states.ERROR + ) + + wf_2_t1 = self._assert_single_item( + task_exs, + name='wf_2_t1', + state=states.ERROR + ) self.assertEqual(2, len(task_exs)) self.assertEqual(2, len(wf_exs)) + # Rerun the nested workflow. self.engine.rerun_workflow(wf_2_t1.id) self.await_workflow_success(wf_1_ex.id) with db_api.transaction(): wf_exs = db_api.get_workflow_executions() - wf_1_ex = self._assert_single_item(wf_exs, name='wf_1', - state=states.SUCCESS) - wf_2_ex = self._assert_single_item(wf_exs, name='wf_2', - state=states.SUCCESS) + + wf_1_ex = self._assert_single_item( + wf_exs, + name='wf_1', + state=states.SUCCESS + ) + wf_2_ex = self._assert_single_item( + wf_exs, + name='wf_2', + state=states.SUCCESS + ) task_wf_1_exs = wf_1_ex.task_executions - wf_1_t1 = self._assert_single_item(task_wf_1_exs, name='wf_1_t1', - state=states.SUCCESS) - wf_1_t2 = self._assert_single_item(task_wf_1_exs, name='wf_1_t2', - state=states.SUCCESS) + + wf_1_t1 = self._assert_single_item( + task_wf_1_exs, + name='wf_1_t1', + state=states.SUCCESS + ) + wf_1_t2 = self._assert_single_item( + task_wf_1_exs, + name='wf_1_t2', + state=states.SUCCESS + ) + task_wf_2_exs = wf_2_ex.task_executions - wf_2_t1 = self._assert_single_item(task_wf_2_exs, name='wf_2_t1', - state=states.SUCCESS) - wf_2_t2 = self._assert_single_item(task_wf_2_exs, name='wf_2_t2', - state=states.SUCCESS) + + wf_2_t1 = self._assert_single_item( + task_wf_2_exs, + name='wf_2_t1', + state=states.SUCCESS + ) + wf_2_t2 = self._assert_single_item( + task_wf_2_exs, + name='wf_2_t2', + state=states.SUCCESS + ) self.assertEqual(2, len(task_wf_1_exs)) self.assertEqual(2, len(task_wf_2_exs)) @@ -1297,5 +1361,6 @@ class NotifyEventsTest(base.NotifierTestCase): (wf_1_t2.id, events.TASK_SUCCEEDED), (wf_1_ex.id, events.WORKFLOW_SUCCEEDED), ] + self.assertTrue(self.publishers['wbhk'].publish.called) self.assertListEqual(expected_order, EVENT_LOGS)