Consume notif. from multiple message bus
This patch allow to consume notification from multiple message bus servers at the same times with configuration like: [DEFAULT] transport_url = rabbit://rabbit-ceilo:5772/ [notification] messaging_urls = rabbit://rabbit-nova1:5772,rabbit-nova2:5772/ messaging_urls = rabbit://rabbit-cinder:5772/ messaging_urls = rabbit://rabbit-neutron:5772/ Implements blueprint multiple-rabbitmq Co-Authored-By: Ala Rezmerita <ala.rezmerita@cloudwatt.com> Change-Id: If3bf6f080ce15f2403c3516b433953b0bdf6ea40
This commit is contained in:
@@ -63,11 +63,16 @@ def get_rpc_client(**kwargs):
|
|||||||
return oslo.messaging.RPCClient(TRANSPORT, target)
|
return oslo.messaging.RPCClient(TRANSPORT, target)
|
||||||
|
|
||||||
|
|
||||||
def get_notification_listener(targets, endpoints):
|
def get_notification_listener(targets, endpoints, url=None):
|
||||||
"""Return a configured oslo.messaging notification listener."""
|
"""Return a configured oslo.messaging notification listener."""
|
||||||
global TRANSPORT
|
global TRANSPORT
|
||||||
|
if url:
|
||||||
|
transport = oslo.messaging.get_transport(cfg.CONF, url,
|
||||||
|
_ALIASES)
|
||||||
|
else:
|
||||||
|
transport = TRANSPORT
|
||||||
return oslo.messaging.get_notification_listener(
|
return oslo.messaging.get_notification_listener(
|
||||||
TRANSPORT, targets, endpoints, executor='eventlet')
|
transport, targets, endpoints, executor='eventlet')
|
||||||
|
|
||||||
|
|
||||||
def get_notifier(publisher_id):
|
def get_notifier(publisher_id):
|
||||||
|
|||||||
@@ -39,6 +39,12 @@ OPTS = [
|
|||||||
deprecated_group='collector',
|
deprecated_group='collector',
|
||||||
default=False,
|
default=False,
|
||||||
help='Save event details.'),
|
help='Save event details.'),
|
||||||
|
cfg.MultiStrOpt('messaging_urls',
|
||||||
|
default=[],
|
||||||
|
help="Messaging URLs to listen for notifications. "
|
||||||
|
"Example: transport://user:pass@host1:port"
|
||||||
|
"[,hostN:portN]/virtual_host "
|
||||||
|
"(DEFAULT/transport_url is used if empty)"),
|
||||||
]
|
]
|
||||||
|
|
||||||
cfg.CONF.register_opts(OPTS, group="notification")
|
cfg.CONF.register_opts(OPTS, group="notification")
|
||||||
@@ -83,12 +89,18 @@ class NotificationService(os_service.Service):
|
|||||||
targets.extend(handler.get_targets(cfg.CONF))
|
targets.extend(handler.get_targets(cfg.CONF))
|
||||||
endpoints.append(handler)
|
endpoints.append(handler)
|
||||||
|
|
||||||
self.listener = messaging.get_notification_listener(targets, endpoints)
|
urls = cfg.CONF.notification.messaging_urls or [None]
|
||||||
self.listener.start()
|
self.listeners = []
|
||||||
|
for url in urls:
|
||||||
|
listener = messaging.get_notification_listener(targets,
|
||||||
|
endpoints,
|
||||||
|
url)
|
||||||
|
listener.start()
|
||||||
|
self.listeners.append(listener)
|
||||||
|
|
||||||
# Add a dummy thread to have wait() working
|
# Add a dummy thread to have wait() working
|
||||||
self.tg.add_timer(604800, lambda: None)
|
self.tg.add_timer(604800, lambda: None)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self.listener.stop()
|
map(lambda x: x.stop(), self.listeners)
|
||||||
super(NotificationService, self).stop()
|
super(NotificationService, self).stop()
|
||||||
|
|||||||
@@ -114,6 +114,12 @@ class TestNotification(tests_base.BaseTestCase):
|
|||||||
self.srv.start()
|
self.srv.start()
|
||||||
self.fake_event_endpoint = fake_event_endpoint_class.return_value
|
self.fake_event_endpoint = fake_event_endpoint_class.return_value
|
||||||
|
|
||||||
|
def test_start_multiple_listeners(self):
|
||||||
|
urls = ["fake://vhost1", "fake://vhost2"]
|
||||||
|
self.CONF.set_override("messaging_urls", urls, group="notification")
|
||||||
|
self._do_process_notification_manager_start()
|
||||||
|
self.assertEqual(2, len(self.srv.listeners))
|
||||||
|
|
||||||
def test_process_notification(self):
|
def test_process_notification(self):
|
||||||
self._do_process_notification_manager_start()
|
self._do_process_notification_manager_start()
|
||||||
self.srv.pipeline_manager.pipelines[0] = mock.MagicMock()
|
self.srv.pipeline_manager.pipelines[0] = mock.MagicMock()
|
||||||
@@ -122,18 +128,18 @@ class TestNotification(tests_base.BaseTestCase):
|
|||||||
'compute.instance.create.end',
|
'compute.instance.create.end',
|
||||||
TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA)
|
TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA)
|
||||||
|
|
||||||
self.assertEqual(1, len(self.srv.listener.dispatcher.endpoints))
|
self.assertEqual(1, len(self.srv.listeners[0].dispatcher.endpoints))
|
||||||
self.assertTrue(self.srv.pipeline_manager.publisher.called)
|
self.assertTrue(self.srv.pipeline_manager.publisher.called)
|
||||||
|
|
||||||
def test_process_notification_no_events(self):
|
def test_process_notification_no_events(self):
|
||||||
self._do_process_notification_manager_start()
|
self._do_process_notification_manager_start()
|
||||||
self.assertEqual(1, len(self.srv.listener.dispatcher.endpoints))
|
self.assertEqual(1, len(self.srv.listeners[0].dispatcher.endpoints))
|
||||||
self.assertNotEqual(self.fake_event_endpoint,
|
self.assertNotEqual(self.fake_event_endpoint,
|
||||||
self.srv.listener.dispatcher.endpoints[0])
|
self.srv.listeners[0].dispatcher.endpoints[0])
|
||||||
|
|
||||||
def test_process_notification_with_events(self):
|
def test_process_notification_with_events(self):
|
||||||
self.CONF.set_override("store_events", True, group="notification")
|
self.CONF.set_override("store_events", True, group="notification")
|
||||||
self._do_process_notification_manager_start()
|
self._do_process_notification_manager_start()
|
||||||
self.assertEqual(2, len(self.srv.listener.dispatcher.endpoints))
|
self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints))
|
||||||
self.assertEqual(self.fake_event_endpoint,
|
self.assertEqual(self.fake_event_endpoint,
|
||||||
self.srv.listener.dispatcher.endpoints[0])
|
self.srv.listeners[0].dispatcher.endpoints[0])
|
||||||
|
|||||||
Reference in New Issue
Block a user