diff --git a/senlin/common/config.py b/senlin/common/config.py index 807e6d2f1..921842431 100644 --- a/senlin/common/config.py +++ b/senlin/common/config.py @@ -146,9 +146,10 @@ cfg.CONF.register_opts(authentication_opts, group=authentication_group) # Health Manager Group healthmgr_group = cfg.OptGroup('health_manager') healthmgr_opts = [ - cfg.StrOpt('nova_control_exchange', - default='nova', - help="Exchange name for nova notifications"), + cfg.StrOpt('nova_control_exchange', default='nova', + help=_("Exchange name for nova notifications")), + cfg.StrOpt('heat_control_exchange', default='heat', + help=_("Exchange name for heat notifications")), ] cfg.CONF.register_group(healthmgr_group) cfg.CONF.register_opts(healthmgr_opts, group=healthmgr_group) diff --git a/senlin/engine/health_manager.py b/senlin/engine/health_manager.py index e01c7345f..c22551bd3 100644 --- a/senlin/engine/health_manager.py +++ b/senlin/engine/health_manager.py @@ -37,7 +37,7 @@ from senlin.rpc import client as rpc_client LOG = logging.getLogger(__name__) -class NotificationEndpoint(object): +class NovaNotificationEndpoint(object): VM_FAILURE_EVENTS = { 'compute.instance.delete.end': 'DELETE', @@ -93,13 +93,26 @@ class NotificationEndpoint(object): def ListenerProc(exchange, project_id, cluster_id): + """Thread procedure for running a event listener. + + :param exchange: The control exchange for a target service. + :param project_id: The ID of the project to filter. + :param cluster_id: The ID of the cluster to filter. + """ transport = messaging.get_notification_transport(cfg.CONF) - targets = [ - messaging.Target(topic='versioned_notifications', exchange=exchange), - ] - endpoints = [ - NotificationEndpoint(project_id, cluster_id), - ] + + if exchange == cfg.CONF.health_manager.nova_control_exchange: + targets = [ + messaging.Target(topic='versioned_notifications', + exchange=exchange), + ] + endpoints = [ + NovaNotificationEndpoint(project_id, cluster_id), + ] + else: # heat notification + LOG.warning(_LW("Heat listener to be added.")) + return + listener = messaging.get_notification_listener( transport, targets, endpoints, executor='threading', pool="senlin-listeners") @@ -197,12 +210,18 @@ class HealthManager(service.Service): if not cluster: LOG.warning(_LW("Cluster (%s) is not found."), cluster_id) return + profile = objects.Profile.get(self.ctx, cluster.profile_id, + project_safe=False) + profile_type = profile.type_name.split('-')[0] + if profile_type == 'os.nova.server': + exchange = cfg.CONF.health_manager.nova_control_exchange + elif profile_type == 'os.heat.stack': + exchange = cfg.CONF.health_manager.heat_control_exchange + else: + return None project = cluster.project - nova_exchange = cfg.CONF.health_manager.nova_control_exchange - return self.TG.add_thread(ListenerProc, - nova_exchange, - project, cluster_id) + return self.TG.add_thread(ListenerProc, exchange, project, cluster_id) def _start_check(self, entry): """Routine for starting the checking for a cluster. @@ -210,25 +229,25 @@ class HealthManager(service.Service): :param entry: A dict containing the data associated with the cluster. :returns: An updated registry entry record. """ - if entry['check_type'] == consts.NODE_STATUS_POLLING: + cid = entry['cluster_id'] + ctype = entry['check_type'] + if ctype == consts.NODE_STATUS_POLLING: # TODO(anyone): Improve this to use one-shot flavor of timer interval = min(entry['interval'], cfg.CONF.periodic_interval_max) timer = self.TG.add_timer(interval, self._poll_cluster, None, - entry['cluster_id'], interval) + cid, interval) entry['timer'] = timer - elif entry['check_type'] == consts.LIFECYCLE_EVENTS: - LOG.info(_LI("Start listening events for cluster (%s)."), - entry['cluster_id']) - listener = self._add_listener(entry['cluster_id']) + elif ctype == consts.LIFECYCLE_EVENTS: + LOG.info(_LI("Start listening events for cluster (%s)."), cid) + listener = self._add_listener(cid) if listener: entry['listener'] = listener else: + LOG.warning(_LW("Error creating listener for cluster %s"), cid) return None else: - LOG.warning(_LW("Cluster (%(id)s) check type (%(type)s) is " - "invalid."), - {'id': entry['cluster_id'], - 'type': entry['check_type']}) + LOG.warning(_LW("Cluster %(id)s check type %(type)s is invalid."), + {'id': cid, 'type': ctype}) return None return entry diff --git a/senlin/tests/unit/engine/test_health_manager.py b/senlin/tests/unit/engine/test_health_manager.py index e1b1fe8f9..5e67b7ed2 100644 --- a/senlin/tests/unit/engine/test_health_manager.py +++ b/senlin/tests/unit/engine/test_health_manager.py @@ -18,17 +18,18 @@ from oslo_config import cfg from senlin.common import consts from senlin.common import context from senlin.common import messaging -from senlin.engine import health_manager +from senlin.engine import health_manager as hm from senlin.objects import cluster as obj_cluster from senlin.objects import health_registry as hr from senlin.objects import node as obj_node +from senlin.objects import profile as obj_profile from senlin.objects.requests import nodes as vorn from senlin.rpc import client as rpc_client from senlin.tests.unit.common import base @mock.patch('oslo_messaging.NotificationFilter') -class TestNotificationEndpoint(base.SenlinTestCase): +class TestNovaNotificationEndpoint(base.SenlinTestCase): @mock.patch('senlin.rpc.client.EngineClient') def test_init(self, mock_rpc, mock_filter): @@ -42,7 +43,7 @@ class TestNotificationEndpoint(base.SenlinTestCase): 'compute.instance.soft_delete.end': 'SOFT_DELETE', } - obj = health_manager.NotificationEndpoint('PROJECT', 'CLUSTER') + obj = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER') mock_filter.assert_called_once_with( publisher_id='^compute.*', @@ -61,7 +62,7 @@ class TestNotificationEndpoint(base.SenlinTestCase): @mock.patch('senlin.rpc.client.EngineClient') def test_info(self, mock_rpc, mock_context, mock_filter): x_rpc = mock_rpc.return_value - endpoint = health_manager.NotificationEndpoint('PROJECT', 'CLUSTER_ID') + endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID') ctx = mock.Mock() payload = { 'metadata': { @@ -97,7 +98,7 @@ class TestNotificationEndpoint(base.SenlinTestCase): @mock.patch('senlin.rpc.client.EngineClient') def test_info_no_metadata(self, mock_rpc, mock_filter): x_rpc = mock_rpc.return_value - endpoint = health_manager.NotificationEndpoint('PROJECT', 'CLUSTER_ID') + endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID') ctx = mock.Mock() payload = {'metadata': {}} metadata = {'timestamp': 'TIMESTAMP'} @@ -111,7 +112,7 @@ class TestNotificationEndpoint(base.SenlinTestCase): @mock.patch('senlin.rpc.client.EngineClient') def test_info_no_cluster_in_metadata(self, mock_rpc, mock_filter): x_rpc = mock_rpc.return_value - endpoint = health_manager.NotificationEndpoint('PROJECT', 'CLUSTER_ID') + endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID') ctx = mock.Mock() payload = {'metadata': {'foo': 'bar'}} metadata = {'timestamp': 'TIMESTAMP'} @@ -125,7 +126,7 @@ class TestNotificationEndpoint(base.SenlinTestCase): @mock.patch('senlin.rpc.client.EngineClient') def test_info_cluster_id_not_match(self, mock_rpc, mock_filter): x_rpc = mock_rpc.return_value - endpoint = health_manager.NotificationEndpoint('PROJECT', 'CLUSTER_ID') + endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID') ctx = mock.Mock() payload = {'metadata': {'cluster_id': 'FOOBAR'}} metadata = {'timestamp': 'TIMESTAMP'} @@ -139,7 +140,7 @@ class TestNotificationEndpoint(base.SenlinTestCase): @mock.patch('senlin.rpc.client.EngineClient') def test_info_event_type_not_interested(self, mock_rpc, mock_filter): x_rpc = mock_rpc.return_value - endpoint = health_manager.NotificationEndpoint('PROJECT', 'CLUSTER_ID') + endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID') ctx = mock.Mock() payload = {'metadata': {'cluster_id': 'CLUSTER_ID'}} metadata = {'timestamp': 'TIMESTAMP'} @@ -153,7 +154,7 @@ class TestNotificationEndpoint(base.SenlinTestCase): @mock.patch('senlin.rpc.client.EngineClient') def test_info_no_node_id(self, mock_rpc, mock_filter): x_rpc = mock_rpc.return_value - endpoint = health_manager.NotificationEndpoint('PROJECT', 'CLUSTER_ID') + endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID') ctx = mock.Mock() payload = {'metadata': {'cluster_id': 'CLUSTER_ID'}} metadata = {'timestamp': 'TIMESTAMP'} @@ -168,7 +169,7 @@ class TestNotificationEndpoint(base.SenlinTestCase): @mock.patch('senlin.rpc.client.EngineClient') def test_info_default_values(self, mock_rpc, mock_context, mock_filter): x_rpc = mock_rpc.return_value - endpoint = health_manager.NotificationEndpoint('PROJECT', 'CLUSTER_ID') + endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID') ctx = mock.Mock() payload = { 'metadata': { @@ -199,14 +200,17 @@ class TestNotificationEndpoint(base.SenlinTestCase): self.assertEqual(expected_params, req.params) -@mock.patch('senlin.engine.health_manager.NotificationEndpoint') +@mock.patch('senlin.engine.health_manager.NovaNotificationEndpoint') @mock.patch('oslo_messaging.Target') @mock.patch('oslo_messaging.get_notification_transport') @mock.patch('oslo_messaging.get_notification_listener') class TestListenerProc(base.SenlinTestCase): - def test_listener_proc(self, mock_listener, mock_transport, mock_target, - mock_endpoint): + def test_listener_proc_nova(self, mock_listener, mock_transport, + mock_target, mock_endpoint): + cfg.CONF.set_override('nova_control_exchange', 'FAKE_EXCHANGE', + group='health_manager', enforce_type=True) + x_listener = mock.Mock() mock_listener.return_value = x_listener x_transport = mock.Mock() @@ -216,19 +220,32 @@ class TestListenerProc(base.SenlinTestCase): x_endpoint = mock.Mock() mock_endpoint.return_value = x_endpoint - res = health_manager.ListenerProc('EXCHANGE', 'PROJECT_ID', - 'CLUSTER_ID') + res = hm.ListenerProc('FAKE_EXCHANGE', 'PROJECT_ID', 'CLUSTER_ID') self.assertIsNone(res) mock_transport.assert_called_once_with(cfg.CONF) mock_target.assert_called_once_with(topic="versioned_notifications", - exchange='EXCHANGE') + exchange='FAKE_EXCHANGE') mock_endpoint.assert_called_once_with('PROJECT_ID', 'CLUSTER_ID') mock_listener.assert_called_once_with( x_transport, [x_target], [x_endpoint], executor='threading', pool="senlin-listeners") x_listener.start.assert_called_once_with() + def test_listener_proc_others(self, mock_listener, mock_transport, + mock_target, mock_endpoint): + x_transport = mock.Mock() + mock_transport.return_value = x_transport + + res = hm.ListenerProc('BOGUS', 'PROJECT_ID', 'CLUSTER_ID') + + self.assertIsNone(res) + + mock_transport.assert_called_once_with(cfg.CONF) + self.assertFalse(mock_listener.called) + self.assertFalse(mock_target.called) + self.assertFalse(mock_endpoint.called) + class TestHealthManager(base.SenlinTestCase): @@ -239,7 +256,7 @@ class TestHealthManager(base.SenlinTestCase): mock_eng.engine_id = 'ENGINE_ID' topic = consts.HEALTH_MANAGER_TOPIC version = consts.RPC_API_VERSION - self.hm = health_manager.HealthManager(mock_eng, topic, version) + self.hm = hm.HealthManager(mock_eng, topic, version) def test_init(self): self.assertEqual('ENGINE_ID', self.hm.engine_id) @@ -306,7 +323,7 @@ class TestHealthManager(base.SenlinTestCase): self.hm.registries[1]) @mock.patch.object(obj_node.Node, 'get_all') - @mock.patch.object(health_manager.HealthManager, "_wait_for_action") + @mock.patch.object(hm.HealthManager, "_wait_for_action") @mock.patch.object(obj_cluster.Cluster, 'get') @mock.patch.object(context, 'get_service_context') @mock.patch.object(context.RequestContext, 'from_dict') @@ -382,7 +399,7 @@ class TestHealthManager(base.SenlinTestCase): self.assertEqual('PROJECT_ID', ctx.project) mock_check.assert_called_once_with(ctx, 'cluster_check', mock.ANY) - @mock.patch.object(health_manager.HealthManager, "_wait_for_action") + @mock.patch.object(hm.HealthManager, "_wait_for_action") @mock.patch.object(obj_cluster.Cluster, 'get') @mock.patch.object(context, 'get_service_context') @mock.patch.object(context.RequestContext, 'from_dict') @@ -414,27 +431,75 @@ class TestHealthManager(base.SenlinTestCase): mock_rpc.assert_called_once_with(ctx, 'cluster_check', mock.ANY) mock_wait.assert_called_once_with(ctx, "CHECK_ID", 456) + @mock.patch.object(obj_profile.Profile, 'get') @mock.patch.object(obj_cluster.Cluster, 'get') - def test__add_listener(self, mock_get): + def test__add_listener_nova(self, mock_cluster, mock_profile): cfg.CONF.set_override('nova_control_exchange', 'FAKE_NOVA_EXCHANGE', group='health_manager', enforce_type=True) x_listener = mock.Mock() mock_add_thread = self.patchobject(self.hm.TG, 'add_thread', return_value=x_listener) - x_cluster = mock.Mock(project='PROJECT_ID') - mock_get.return_value = x_cluster + x_cluster = mock.Mock(project='PROJECT_ID', profile_id='PROFILE_ID') + mock_cluster.return_value = x_cluster + x_profile = mock.Mock(type_name='os.nova.server-1.0') + mock_profile.return_value = x_profile # do it res = self.hm._add_listener('CLUSTER_ID') # assertions self.assertEqual(x_listener, res) - mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', - project_safe=False) - mock_add_thread.assert_called_once_with(health_manager.ListenerProc, - 'FAKE_NOVA_EXCHANGE', - 'PROJECT_ID', - 'CLUSTER_ID') + mock_cluster.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', + project_safe=False) + mock_profile.assert_called_once_with(self.hm.ctx, 'PROFILE_ID', + project_safe=False) + mock_add_thread.assert_called_once_with( + hm.ListenerProc, 'FAKE_NOVA_EXCHANGE', 'PROJECT_ID', 'CLUSTER_ID') + + @mock.patch.object(obj_profile.Profile, 'get') + @mock.patch.object(obj_cluster.Cluster, 'get') + def test__add_listener_heat(self, mock_cluster, mock_profile): + cfg.CONF.set_override('heat_control_exchange', 'FAKE_HEAT_EXCHANGE', + group='health_manager', enforce_type=True) + x_listener = mock.Mock() + mock_add_thread = self.patchobject(self.hm.TG, 'add_thread', + return_value=x_listener) + x_cluster = mock.Mock(project='PROJECT_ID', profile_id='PROFILE_ID') + mock_cluster.return_value = x_cluster + x_profile = mock.Mock(type_name='os.heat.stack-1.0') + mock_profile.return_value = x_profile + + # do it + res = self.hm._add_listener('CLUSTER_ID') + + # assertions + self.assertEqual(x_listener, res) + mock_cluster.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', + project_safe=False) + mock_profile.assert_called_once_with(self.hm.ctx, 'PROFILE_ID', + project_safe=False) + mock_add_thread.assert_called_once_with( + hm.ListenerProc, 'FAKE_HEAT_EXCHANGE', 'PROJECT_ID', 'CLUSTER_ID') + + @mock.patch.object(obj_profile.Profile, 'get') + @mock.patch.object(obj_cluster.Cluster, 'get') + def test__add_listener_other_types(self, mock_cluster, mock_profile): + mock_add_thread = self.patchobject(self.hm.TG, 'add_thread') + x_cluster = mock.Mock(project='PROJECT_ID', profile_id='PROFILE_ID') + mock_cluster.return_value = x_cluster + x_profile = mock.Mock(type_name='other.types-1.0') + mock_profile.return_value = x_profile + + # do it + res = self.hm._add_listener('CLUSTER_ID') + + # assertions + self.assertIsNone(res) + mock_cluster.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', + project_safe=False) + mock_profile.assert_called_once_with(self.hm.ctx, 'PROFILE_ID', + project_safe=False) + self.assertFalse(mock_add_thread.called) @mock.patch.object(obj_cluster.Cluster, 'get') def test__add_listener_cluster_not_found(self, mock_get): @@ -580,7 +645,7 @@ class TestHealthManager(base.SenlinTestCase): mock_add_tm.assert_called_with(50, mock_poll, None, 'CLUSTER_ID', 50) self.assertEqual(1, len(self.hm.registries)) - @mock.patch.object(health_manager.HealthManager, '_stop_check') + @mock.patch.object(hm.HealthManager, '_stop_check') @mock.patch.object(hr.HealthRegistry, 'delete') def test_unregister_cluster(self, mock_delete, mock_stop): ctx = mock.Mock() @@ -602,7 +667,7 @@ class TestHealthManager(base.SenlinTestCase): mock_delete.assert_called_once_with(ctx, 'CLUSTER_ID') @mock.patch.object(hr.HealthRegistry, 'update') - @mock.patch.object(health_manager.HealthManager, '_start_check') + @mock.patch.object(hm.HealthManager, '_start_check') def test_enable_cluster(self, mock_start, mock_update): ctx = mock.Mock() entry1 = {'cluster_id': 'FAKE_ID', 'enabled': False} @@ -617,7 +682,7 @@ class TestHealthManager(base.SenlinTestCase): mock_update.assert_called_once_with(ctx, 'FAKE_ID', {'enabled': True}) @mock.patch.object(hr.HealthRegistry, 'update') - @mock.patch.object(health_manager.HealthManager, '_stop_check') + @mock.patch.object(hm.HealthManager, '_stop_check') def test_disable_cluster(self, mock_stop, mock_update): ctx = mock.Mock() entry1 = {'cluster_id': 'FAKE_ID', 'enabled': True}