Allow to filter event notifications by their type

Change-Id: I396bbe8d2933121eb9d65b4804d72b3a5811cc2a
Implements: blueprint mistral-filter-event-notifications
Signed-off-by: Oleg Ovcharuk <vgvoleg@gmail.com>
This commit is contained in:
Oleg Ovcharuk 2019-07-12 03:00:05 +03:00
parent 1a2ff63733
commit 12c84dc936
3 changed files with 81 additions and 3 deletions

View File

@ -76,6 +76,19 @@ class Task(object):
notifier = notif.get_notifier(cfg.CONF.notifier.type)
event = events.identify_task_event(old_task_state, new_task_state)
filtered_publishers = []
for publisher in publishers:
if not isinstance(publisher, dict):
continue
target_events = publisher.get('event_types', [])
if not target_events or event in target_events:
filtered_publishers.append(publisher)
if not filtered_publishers:
return
def _convert_to_notification_data():
return {
"id": self.task_ex.id,
@ -96,7 +109,7 @@ class Task(object):
_convert_to_notification_data(),
event,
self.task_ex.updated_at,
publishers
filtered_publishers
)
post_tx_queue.register_operation(_send_notification)

View File

@ -74,6 +74,19 @@ class Workflow(object):
notifier = notif.get_notifier(cfg.CONF.notifier.type)
filtered_publishers = []
for publisher in publishers:
if not isinstance(publisher, dict):
continue
target_events = publisher.get('event_types', [])
if not target_events or event in target_events:
filtered_publishers.append(publisher)
if not filtered_publishers:
return
def _convert_to_notification_data():
return {
"id": self.wf_ex.id,
@ -92,7 +105,7 @@ class Workflow(object):
_convert_to_notification_data(),
event,
self.wf_ex.updated_at,
publishers
filtered_publishers
)
post_tx_queue.register_operation(_send_notification)

View File

@ -81,7 +81,7 @@ class NotifyEventsTest(base.NotifierTestCase):
notify_options = [
{
'type': 'webhook',
'events': events.EVENTS
'event_types': events.EVENTS
}
]
@ -216,6 +216,58 @@ class NotifyEventsTest(base.NotifierTestCase):
self.assertTrue(self.publishers['wbhk'].publish.called)
self.assertListEqual(expected_order, EVENT_LOGS)
def test_notify_with_event_filter(self):
wf_def = """
version: '2.0'
wf:
tasks:
t1:
action: std.noop
on-success:
- t2
t2:
action: std.noop
"""
wf_svc.create_workflows(wf_def)
notify_options = [
{
'type': 'webhook',
'event_types': [
events.WORKFLOW_LAUNCHED,
events.WORKFLOW_SUCCEEDED
]
}
]
params = {'notify': notify_options}
wf_ex = self.engine.start_workflow('wf', '', **params)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_exs = wf_ex.task_executions
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(2, len(task_exs))
t1_ex = self._assert_single_item(task_exs, name='t1')
t2_ex = self._assert_single_item(task_exs, name='t2')
self.assertEqual(states.SUCCESS, t1_ex.state)
self.assertIsNone(t1_ex.state_info)
self.assertEqual(states.SUCCESS, t2_ex.state)
self.assertIsNone(t2_ex.state_info)
self.assertTrue(self.publishers['wbhk'].publish.called)
self.assertEqual(2, len(EVENT_LOGS))
self.assertIn((wf_ex.id, events.WORKFLOW_LAUNCHED), EVENT_LOGS)
self.assertIn((wf_ex.id, events.WORKFLOW_SUCCEEDED), EVENT_LOGS)
def test_notify_multiple(self):
self.assertFalse(self.publishers['wbhk'].publish.called)
self.assertFalse(self.publishers['noop'].publish.called)