Add Heat and Nova notification topic config

Both the Heat and Nova topics are fully configurable
and we can never assume that the topic name is static.

In addition other services might be consuming these
queues.

Change-Id: Id036dc7e0750095d4ffbbe91cfc4d655f3c8651f
This commit is contained in:
Erik Olof Gunnar Andersson 2019-11-15 00:13:52 -08:00
parent 3d20220f34
commit 3a31cca254
5 changed files with 27 additions and 41 deletions

View File

@ -204,8 +204,12 @@ healthmgr_group = cfg.OptGroup('health_manager')
healthmgr_opts = [
cfg.StrOpt('nova_control_exchange', default='nova',
help=_("Exchange name for nova notifications.")),
cfg.StrOpt('nova_notification_topic', default='versioned_notifications',
help=_("Topic name for nova notifications.")),
cfg.StrOpt('heat_control_exchange', default='heat',
help=_("Exchange name for heat notifications.")),
cfg.StrOpt('heat_notification_topic', default='notifications',
help=_("Topic name for heat notifications.")),
cfg.MultiStrOpt("enabled_endpoints", default=['nova', 'heat'],
help=_("Notification endpoints to enable.")),
cfg.IntOpt('workers',

View File

@ -67,28 +67,19 @@ def ListenerProc(exchange, project_id, cluster_id, recover_action):
transport = messaging.get_notification_transport(cfg.CONF)
if exchange == cfg.CONF.health_manager.nova_control_exchange:
targets = [
messaging.Target(topic='versioned_notifications',
exchange=exchange),
]
endpoints = [
nova_endpoint.NovaNotificationEndpoint(
project_id, cluster_id, recover_action
),
]
else: # heat notification
targets = [
messaging.Target(topic='notifications', exchange=exchange),
]
endpoints = [
heat_endpoint.HeatNotificationEndpoint(
project_id, cluster_id, recover_action
),
]
endpoint = nova_endpoint.NovaNotificationEndpoint(
project_id, cluster_id, recover_action
)
else:
endpoint = heat_endpoint.HeatNotificationEndpoint(
project_id, cluster_id, recover_action
)
listener = messaging.get_notification_listener(
transport, targets, endpoints, executor='threading',
pool="senlin-listeners")
transport, [endpoint.target], [endpoint], executor='threading',
pool='senlin-listeners'
)
listener.start()

View File

@ -37,9 +37,10 @@ class HeatNotificationEndpoint(base.Endpoints):
event_type='^orchestration\.stack\..*',
context={'project_id': '^%s$' % project_id})
self.rpc = rpc_client.EngineClient()
self.exchange = cfg.CONF.health_manager.heat_control_exchange
self.target = messaging.Target(topic='notifications',
exchange=self.exchange)
self.target = messaging.Target(
topic=cfg.CONF.health_manager.heat_notification_topic,
exchange=cfg.CONF.health_manager.heat_control_exchange,
)
def info(self, ctxt, publisher_id, event_type, payload, metadata):
if event_type not in self.STACK_FAILURE_EVENTS:

View File

@ -41,9 +41,10 @@ class NovaNotificationEndpoint(base.Endpoints):
event_type='^compute\.instance\..*',
context={'project_id': '^%s$' % project_id})
self.rpc = rpc_client.EngineClient()
self.exchange = cfg.CONF.health_manager.nova_control_exchange
self.target = messaging.Target(topic='versioned_notifications',
exchange=self.exchange)
self.target = messaging.Target(
topic=cfg.CONF.health_manager.nova_notification_topic,
exchange=cfg.CONF.health_manager.nova_control_exchange,
)
def info(self, ctxt, publisher_id, event_type, payload, metadata):
meta = payload['metadata']

View File

@ -251,14 +251,12 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
'senlin.engine.notifications.heat_endpoint.HeatNotificationEndpoint')
@mock.patch(
'senlin.engine.notifications.nova_endpoint.NovaNotificationEndpoint')
@mock.patch('oslo_messaging.Target')
@mock.patch('oslo_messaging.get_notification_transport')
@mock.patch('oslo_messaging.get_notification_listener')
class TestListenerProc(base.SenlinTestCase):
def test_listener_proc_nova(self, mock_listener, mock_transport,
mock_target, mock_novaendpoint,
mock_heatendpoint):
mock_novaendpoint, mock_heatendpoint):
cfg.CONF.set_override('nova_control_exchange', 'FAKE_EXCHANGE',
group='health_manager')
@ -266,8 +264,6 @@ class TestListenerProc(base.SenlinTestCase):
mock_listener.return_value = x_listener
x_transport = mock.Mock()
mock_transport.return_value = x_transport
x_target = mock.Mock()
mock_target.return_value = x_target
x_endpoint = mock.Mock()
mock_novaendpoint.return_value = x_endpoint
@ -277,24 +273,19 @@ class TestListenerProc(base.SenlinTestCase):
self.assertIsNone(res)
mock_transport.assert_called_once_with(cfg.CONF)
mock_target.assert_called_once_with(topic="versioned_notifications",
exchange='FAKE_EXCHANGE')
mock_novaendpoint.assert_called_once_with('PROJECT_ID', 'CLUSTER_ID',
recover_action)
mock_listener.assert_called_once_with(
x_transport, [x_target], [x_endpoint],
x_transport, [mock_novaendpoint().target], [x_endpoint],
executor='threading', pool="senlin-listeners")
x_listener.start.assert_called_once_with()
def test_listener_proc_heat(self, mock_listener, mock_transport,
mock_target, mock_novaendpoint,
mock_heatendpoint):
mock_novaendpoint, mock_heatendpoint):
x_listener = mock.Mock()
mock_listener.return_value = x_listener
x_transport = mock.Mock()
mock_transport.return_value = x_transport
x_target = mock.Mock()
mock_target.return_value = x_target
x_endpoint = mock.Mock()
mock_heatendpoint.return_value = x_endpoint
@ -304,12 +295,10 @@ class TestListenerProc(base.SenlinTestCase):
self.assertIsNone(res)
mock_transport.assert_called_once_with(cfg.CONF)
mock_target.assert_called_once_with(topic="notifications",
exchange='heat')
mock_heatendpoint.assert_called_once_with('PROJECT_ID', 'CLUSTER_ID',
recover_action)
mock_listener.assert_called_once_with(
x_transport, [x_target], [x_endpoint],
x_transport, [mock_heatendpoint().target], [x_endpoint],
executor='threading', pool="senlin-listeners")
x_listener.start.assert_called_once_with()