diff --git a/senlin/common/config.py b/senlin/common/config.py index 809320166..58e7110c7 100755 --- a/senlin/common/config.py +++ b/senlin/common/config.py @@ -204,8 +204,12 @@ healthmgr_group = cfg.OptGroup('health_manager') healthmgr_opts = [ cfg.StrOpt('nova_control_exchange', default='nova', help=_("Exchange name for nova notifications.")), + cfg.StrOpt('nova_notification_topic', default='versioned_notifications', + help=_("Topic name for nova notifications.")), cfg.StrOpt('heat_control_exchange', default='heat', help=_("Exchange name for heat notifications.")), + cfg.StrOpt('heat_notification_topic', default='notifications', + help=_("Topic name for heat notifications.")), cfg.MultiStrOpt("enabled_endpoints", default=['nova', 'heat'], help=_("Notification endpoints to enable.")), cfg.IntOpt('workers', diff --git a/senlin/engine/health_manager.py b/senlin/engine/health_manager.py index 5762ccf67..e824de01b 100644 --- a/senlin/engine/health_manager.py +++ b/senlin/engine/health_manager.py @@ -67,28 +67,19 @@ def ListenerProc(exchange, project_id, cluster_id, recover_action): transport = messaging.get_notification_transport(cfg.CONF) if exchange == cfg.CONF.health_manager.nova_control_exchange: - targets = [ - messaging.Target(topic='versioned_notifications', - exchange=exchange), - ] - endpoints = [ - nova_endpoint.NovaNotificationEndpoint( - project_id, cluster_id, recover_action - ), - ] - else: # heat notification - targets = [ - messaging.Target(topic='notifications', exchange=exchange), - ] - endpoints = [ - heat_endpoint.HeatNotificationEndpoint( - project_id, cluster_id, recover_action - ), - ] + endpoint = nova_endpoint.NovaNotificationEndpoint( + project_id, cluster_id, recover_action + ) + + else: + endpoint = heat_endpoint.HeatNotificationEndpoint( + project_id, cluster_id, recover_action + ) listener = messaging.get_notification_listener( - transport, targets, endpoints, executor='threading', - pool="senlin-listeners") + transport, [endpoint.target], [endpoint], executor='threading', + pool='senlin-listeners' + ) listener.start() diff --git a/senlin/engine/notifications/heat_endpoint.py b/senlin/engine/notifications/heat_endpoint.py index bd160db8d..a30353676 100644 --- a/senlin/engine/notifications/heat_endpoint.py +++ b/senlin/engine/notifications/heat_endpoint.py @@ -37,9 +37,10 @@ class HeatNotificationEndpoint(base.Endpoints): event_type='^orchestration\.stack\..*', context={'project_id': '^%s$' % project_id}) self.rpc = rpc_client.EngineClient() - self.exchange = cfg.CONF.health_manager.heat_control_exchange - self.target = messaging.Target(topic='notifications', - exchange=self.exchange) + self.target = messaging.Target( + topic=cfg.CONF.health_manager.heat_notification_topic, + exchange=cfg.CONF.health_manager.heat_control_exchange, + ) def info(self, ctxt, publisher_id, event_type, payload, metadata): if event_type not in self.STACK_FAILURE_EVENTS: diff --git a/senlin/engine/notifications/nova_endpoint.py b/senlin/engine/notifications/nova_endpoint.py index f5b4f0630..c631d7385 100644 --- a/senlin/engine/notifications/nova_endpoint.py +++ b/senlin/engine/notifications/nova_endpoint.py @@ -41,9 +41,10 @@ class NovaNotificationEndpoint(base.Endpoints): event_type='^compute\.instance\..*', context={'project_id': '^%s$' % project_id}) self.rpc = rpc_client.EngineClient() - self.exchange = cfg.CONF.health_manager.nova_control_exchange - self.target = messaging.Target(topic='versioned_notifications', - exchange=self.exchange) + self.target = messaging.Target( + topic=cfg.CONF.health_manager.nova_notification_topic, + exchange=cfg.CONF.health_manager.nova_control_exchange, + ) def info(self, ctxt, publisher_id, event_type, payload, metadata): meta = payload['metadata'] diff --git a/senlin/tests/unit/engine/test_health_manager.py b/senlin/tests/unit/engine/test_health_manager.py index 710ae3544..b74bf7f1b 100644 --- a/senlin/tests/unit/engine/test_health_manager.py +++ b/senlin/tests/unit/engine/test_health_manager.py @@ -251,14 +251,12 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase): 'senlin.engine.notifications.heat_endpoint.HeatNotificationEndpoint') @mock.patch( 'senlin.engine.notifications.nova_endpoint.NovaNotificationEndpoint') -@mock.patch('oslo_messaging.Target') @mock.patch('oslo_messaging.get_notification_transport') @mock.patch('oslo_messaging.get_notification_listener') class TestListenerProc(base.SenlinTestCase): def test_listener_proc_nova(self, mock_listener, mock_transport, - mock_target, mock_novaendpoint, - mock_heatendpoint): + mock_novaendpoint, mock_heatendpoint): cfg.CONF.set_override('nova_control_exchange', 'FAKE_EXCHANGE', group='health_manager') @@ -266,8 +264,6 @@ class TestListenerProc(base.SenlinTestCase): mock_listener.return_value = x_listener x_transport = mock.Mock() mock_transport.return_value = x_transport - x_target = mock.Mock() - mock_target.return_value = x_target x_endpoint = mock.Mock() mock_novaendpoint.return_value = x_endpoint @@ -277,24 +273,19 @@ class TestListenerProc(base.SenlinTestCase): self.assertIsNone(res) mock_transport.assert_called_once_with(cfg.CONF) - mock_target.assert_called_once_with(topic="versioned_notifications", - exchange='FAKE_EXCHANGE') mock_novaendpoint.assert_called_once_with('PROJECT_ID', 'CLUSTER_ID', recover_action) mock_listener.assert_called_once_with( - x_transport, [x_target], [x_endpoint], + x_transport, [mock_novaendpoint().target], [x_endpoint], executor='threading', pool="senlin-listeners") x_listener.start.assert_called_once_with() def test_listener_proc_heat(self, mock_listener, mock_transport, - mock_target, mock_novaendpoint, - mock_heatendpoint): + mock_novaendpoint, mock_heatendpoint): x_listener = mock.Mock() mock_listener.return_value = x_listener x_transport = mock.Mock() mock_transport.return_value = x_transport - x_target = mock.Mock() - mock_target.return_value = x_target x_endpoint = mock.Mock() mock_heatendpoint.return_value = x_endpoint @@ -304,12 +295,10 @@ class TestListenerProc(base.SenlinTestCase): self.assertIsNone(res) mock_transport.assert_called_once_with(cfg.CONF) - mock_target.assert_called_once_with(topic="notifications", - exchange='heat') mock_heatendpoint.assert_called_once_with('PROJECT_ID', 'CLUSTER_ID', recover_action) mock_listener.assert_called_once_with( - x_transport, [x_target], [x_endpoint], + x_transport, [mock_heatendpoint().target], [x_endpoint], executor='threading', pool="senlin-listeners") x_listener.start.assert_called_once_with()