Merge "move listener targets to notification agent"
This commit is contained in:
commit
6a97e62c54
|
@ -175,6 +175,19 @@ class NotificationService(cotyledon.Service):
|
|||
|
||||
return event_pipe_manager
|
||||
|
||||
def get_targets(self):
|
||||
"""Return a sequence of oslo_messaging.Target
|
||||
|
||||
This sequence is defining the exchange and topics to be connected.
|
||||
"""
|
||||
topics = (self.conf.notification_topics
|
||||
if 'notification_topics' in self.conf
|
||||
else self.conf.oslo_messaging_notifications.topics)
|
||||
return [oslo_messaging.Target(topic=topic, exchange=exchange)
|
||||
for topic in set(topics)
|
||||
for exchange in
|
||||
set(self.conf.notification.notification_control_exchanges)]
|
||||
|
||||
def run(self):
|
||||
# Delay startup so workers are jittered
|
||||
time.sleep(self.startup_delay)
|
||||
|
@ -236,7 +249,7 @@ class NotificationService(cotyledon.Service):
|
|||
endpoints.append(
|
||||
event_endpoint.EventEndpoint(event_pipe_manager))
|
||||
|
||||
targets = []
|
||||
targets = self.get_targets()
|
||||
for ext in notification_manager:
|
||||
handler = ext.obj
|
||||
LOG.debug('Event types from %(name)s: %(type)s'
|
||||
|
@ -244,12 +257,6 @@ class NotificationService(cotyledon.Service):
|
|||
{'name': ext.name,
|
||||
'type': ', '.join(handler.event_types),
|
||||
'error': ack_on_error})
|
||||
# NOTE(gordc): this could be a set check but oslo_messaging issue
|
||||
# https://bugs.launchpad.net/oslo.messaging/+bug/1398511
|
||||
# This ensures we don't create multiple duplicate consumers.
|
||||
for new_tar in handler.get_targets(self.conf):
|
||||
if new_tar not in targets:
|
||||
targets.append(new_tar)
|
||||
endpoints.append(handler)
|
||||
|
||||
urls = self.conf.notification.messaging_urls or [None]
|
||||
|
|
|
@ -920,23 +920,6 @@ class NotificationEndpoint(object):
|
|||
event_type='|'.join(self.event_types))
|
||||
self.manager = manager
|
||||
|
||||
@staticmethod
|
||||
def get_notification_topics(conf):
|
||||
if 'notification_topics' in conf:
|
||||
return conf.notification_topics
|
||||
return conf.oslo_messaging_notifications.topics
|
||||
|
||||
def get_targets(self, conf):
|
||||
"""Return a sequence of oslo_messaging.Target
|
||||
|
||||
This sequence is defining the exchange and topics to be connected for
|
||||
this plugin.
|
||||
"""
|
||||
return [oslo_messaging.Target(topic=topic, exchange=exchange)
|
||||
for topic in self.get_notification_topics(conf)
|
||||
for exchange in
|
||||
conf.notification.notification_control_exchanges]
|
||||
|
||||
@abc.abstractproperty
|
||||
def event_types(self):
|
||||
"""Return a sequence of strings to filter on.
|
||||
|
|
|
@ -94,7 +94,3 @@ class TestNotifications(base.BaseTestCase):
|
|||
self.assertEqual(HTTP_RESPONSE['payload']['request']
|
||||
['HTTP_X_SERVICE_NAME'], sample.resource_id)
|
||||
self.assertEqual(1, sample.volume)
|
||||
|
||||
def test_targets(self):
|
||||
targets = middleware.HTTPRequest(mock.Mock()).get_targets(self.CONF)
|
||||
self.assertEqual(15, len(targets))
|
||||
|
|
|
@ -85,14 +85,6 @@ TEST_NOTICE_PAYLOAD = {
|
|||
class _FakeNotificationPlugin(pipeline.NotificationEndpoint):
|
||||
event_types = ['fake.event']
|
||||
|
||||
def get_targets(self, conf):
|
||||
return [
|
||||
oslo_messaging.Target(
|
||||
topic=topic,
|
||||
exchange=conf.notification.notification_control_exchanges[0])
|
||||
for topic in self.get_notification_topics(conf)
|
||||
]
|
||||
|
||||
def build_sample(self, message):
|
||||
return []
|
||||
|
||||
|
@ -106,6 +98,9 @@ class TestNotification(tests_base.BaseTestCase):
|
|||
self.setup_messaging(self.CONF)
|
||||
self.srv = notification.NotificationService(0, self.CONF)
|
||||
|
||||
def test_targets(self):
|
||||
self.assertEqual(15, len(self.srv.get_targets()))
|
||||
|
||||
def fake_get_notifications_manager(self, pm):
|
||||
self.plugin = _FakeNotificationPlugin(pm)
|
||||
return extension.ExtensionManager.make_test_instance(
|
||||
|
@ -153,21 +148,13 @@ class TestNotification(tests_base.BaseTestCase):
|
|||
|
||||
@mock.patch('oslo_messaging.get_batch_notification_listener')
|
||||
def test_unique_consumers(self, mock_listener):
|
||||
|
||||
def fake_get_notifications_manager_dup_targets(pm):
|
||||
plugin = _FakeNotificationPlugin(pm)
|
||||
return extension.ExtensionManager.make_test_instance(
|
||||
[extension.Extension('test', None, None, plugin),
|
||||
extension.Extension('test', None, None, plugin)])
|
||||
|
||||
with mock.patch.object(self.srv,
|
||||
'_get_notifications_manager') as get_nm:
|
||||
get_nm.side_effect = fake_get_notifications_manager_dup_targets
|
||||
self.srv.run()
|
||||
self.addCleanup(self.srv.terminate)
|
||||
# 1 target, 1 listener
|
||||
self.assertEqual(1, len(mock_listener.call_args_list[0][0][1]))
|
||||
self.assertEqual(1, len(self.srv.listeners))
|
||||
self.CONF.set_override('notification_control_exchanges', ['dup'] * 2,
|
||||
group='notification')
|
||||
self.srv.run()
|
||||
self.addCleanup(self.srv.terminate)
|
||||
# 1 target, 1 listener
|
||||
self.assertEqual(1, len(mock_listener.call_args_list[0][0][1]))
|
||||
self.assertEqual(1, len(self.srv.listeners))
|
||||
|
||||
|
||||
class BaseRealNotification(tests_base.BaseTestCase):
|
||||
|
|
Loading…
Reference in New Issue