Make room from heat event listener

This patch makes room for listeners capable of listening to heat events.

Change-Id: Iae3fcd60e5396b9ed26a02d54a825aad7ae35062
This commit is contained in:
tengqm 2017-02-02 08:16:37 -05:00
parent cd119ea1f6
commit 39c8e92b73
3 changed files with 140 additions and 55 deletions

View File

@ -146,9 +146,10 @@ cfg.CONF.register_opts(authentication_opts, group=authentication_group)
# Health Manager Group
healthmgr_group = cfg.OptGroup('health_manager')
healthmgr_opts = [
cfg.StrOpt('nova_control_exchange',
default='nova',
help="Exchange name for nova notifications"),
cfg.StrOpt('nova_control_exchange', default='nova',
help=_("Exchange name for nova notifications")),
cfg.StrOpt('heat_control_exchange', default='heat',
help=_("Exchange name for heat notifications")),
]
cfg.CONF.register_group(healthmgr_group)
cfg.CONF.register_opts(healthmgr_opts, group=healthmgr_group)

View File

@ -37,7 +37,7 @@ from senlin.rpc import client as rpc_client
LOG = logging.getLogger(__name__)
class NotificationEndpoint(object):
class NovaNotificationEndpoint(object):
VM_FAILURE_EVENTS = {
'compute.instance.delete.end': 'DELETE',
@ -93,13 +93,26 @@ class NotificationEndpoint(object):
def ListenerProc(exchange, project_id, cluster_id):
"""Thread procedure for running a event listener.
:param exchange: The control exchange for a target service.
:param project_id: The ID of the project to filter.
:param cluster_id: The ID of the cluster to filter.
"""
transport = messaging.get_notification_transport(cfg.CONF)
targets = [
messaging.Target(topic='versioned_notifications', exchange=exchange),
]
endpoints = [
NotificationEndpoint(project_id, cluster_id),
]
if exchange == cfg.CONF.health_manager.nova_control_exchange:
targets = [
messaging.Target(topic='versioned_notifications',
exchange=exchange),
]
endpoints = [
NovaNotificationEndpoint(project_id, cluster_id),
]
else: # heat notification
LOG.warning(_LW("Heat listener to be added."))
return
listener = messaging.get_notification_listener(
transport, targets, endpoints, executor='threading',
pool="senlin-listeners")
@ -197,12 +210,18 @@ class HealthManager(service.Service):
if not cluster:
LOG.warning(_LW("Cluster (%s) is not found."), cluster_id)
return
profile = objects.Profile.get(self.ctx, cluster.profile_id,
project_safe=False)
profile_type = profile.type_name.split('-')[0]
if profile_type == 'os.nova.server':
exchange = cfg.CONF.health_manager.nova_control_exchange
elif profile_type == 'os.heat.stack':
exchange = cfg.CONF.health_manager.heat_control_exchange
else:
return None
project = cluster.project
nova_exchange = cfg.CONF.health_manager.nova_control_exchange
return self.TG.add_thread(ListenerProc,
nova_exchange,
project, cluster_id)
return self.TG.add_thread(ListenerProc, exchange, project, cluster_id)
def _start_check(self, entry):
"""Routine for starting the checking for a cluster.
@ -210,25 +229,25 @@ class HealthManager(service.Service):
:param entry: A dict containing the data associated with the cluster.
:returns: An updated registry entry record.
"""
if entry['check_type'] == consts.NODE_STATUS_POLLING:
cid = entry['cluster_id']
ctype = entry['check_type']
if ctype == consts.NODE_STATUS_POLLING:
# TODO(anyone): Improve this to use one-shot flavor of timer
interval = min(entry['interval'], cfg.CONF.periodic_interval_max)
timer = self.TG.add_timer(interval, self._poll_cluster, None,
entry['cluster_id'], interval)
cid, interval)
entry['timer'] = timer
elif entry['check_type'] == consts.LIFECYCLE_EVENTS:
LOG.info(_LI("Start listening events for cluster (%s)."),
entry['cluster_id'])
listener = self._add_listener(entry['cluster_id'])
elif ctype == consts.LIFECYCLE_EVENTS:
LOG.info(_LI("Start listening events for cluster (%s)."), cid)
listener = self._add_listener(cid)
if listener:
entry['listener'] = listener
else:
LOG.warning(_LW("Error creating listener for cluster %s"), cid)
return None
else:
LOG.warning(_LW("Cluster (%(id)s) check type (%(type)s) is "
"invalid."),
{'id': entry['cluster_id'],
'type': entry['check_type']})
LOG.warning(_LW("Cluster %(id)s check type %(type)s is invalid."),
{'id': cid, 'type': ctype})
return None
return entry

View File

@ -18,17 +18,18 @@ from oslo_config import cfg
from senlin.common import consts
from senlin.common import context
from senlin.common import messaging
from senlin.engine import health_manager
from senlin.engine import health_manager as hm
from senlin.objects import cluster as obj_cluster
from senlin.objects import health_registry as hr
from senlin.objects import node as obj_node
from senlin.objects import profile as obj_profile
from senlin.objects.requests import nodes as vorn
from senlin.rpc import client as rpc_client
from senlin.tests.unit.common import base
@mock.patch('oslo_messaging.NotificationFilter')
class TestNotificationEndpoint(base.SenlinTestCase):
class TestNovaNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient')
def test_init(self, mock_rpc, mock_filter):
@ -42,7 +43,7 @@ class TestNotificationEndpoint(base.SenlinTestCase):
'compute.instance.soft_delete.end': 'SOFT_DELETE',
}
obj = health_manager.NotificationEndpoint('PROJECT', 'CLUSTER')
obj = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER')
mock_filter.assert_called_once_with(
publisher_id='^compute.*',
@ -61,7 +62,7 @@ class TestNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient')
def test_info(self, mock_rpc, mock_context, mock_filter):
x_rpc = mock_rpc.return_value
endpoint = health_manager.NotificationEndpoint('PROJECT', 'CLUSTER_ID')
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID')
ctx = mock.Mock()
payload = {
'metadata': {
@ -97,7 +98,7 @@ class TestNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_metadata(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
endpoint = health_manager.NotificationEndpoint('PROJECT', 'CLUSTER_ID')
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID')
ctx = mock.Mock()
payload = {'metadata': {}}
metadata = {'timestamp': 'TIMESTAMP'}
@ -111,7 +112,7 @@ class TestNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_cluster_in_metadata(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
endpoint = health_manager.NotificationEndpoint('PROJECT', 'CLUSTER_ID')
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID')
ctx = mock.Mock()
payload = {'metadata': {'foo': 'bar'}}
metadata = {'timestamp': 'TIMESTAMP'}
@ -125,7 +126,7 @@ class TestNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_cluster_id_not_match(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
endpoint = health_manager.NotificationEndpoint('PROJECT', 'CLUSTER_ID')
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID')
ctx = mock.Mock()
payload = {'metadata': {'cluster_id': 'FOOBAR'}}
metadata = {'timestamp': 'TIMESTAMP'}
@ -139,7 +140,7 @@ class TestNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_event_type_not_interested(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
endpoint = health_manager.NotificationEndpoint('PROJECT', 'CLUSTER_ID')
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID')
ctx = mock.Mock()
payload = {'metadata': {'cluster_id': 'CLUSTER_ID'}}
metadata = {'timestamp': 'TIMESTAMP'}
@ -153,7 +154,7 @@ class TestNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_node_id(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
endpoint = health_manager.NotificationEndpoint('PROJECT', 'CLUSTER_ID')
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID')
ctx = mock.Mock()
payload = {'metadata': {'cluster_id': 'CLUSTER_ID'}}
metadata = {'timestamp': 'TIMESTAMP'}
@ -168,7 +169,7 @@ class TestNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_default_values(self, mock_rpc, mock_context, mock_filter):
x_rpc = mock_rpc.return_value
endpoint = health_manager.NotificationEndpoint('PROJECT', 'CLUSTER_ID')
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID')
ctx = mock.Mock()
payload = {
'metadata': {
@ -199,14 +200,17 @@ class TestNotificationEndpoint(base.SenlinTestCase):
self.assertEqual(expected_params, req.params)
@mock.patch('senlin.engine.health_manager.NotificationEndpoint')
@mock.patch('senlin.engine.health_manager.NovaNotificationEndpoint')
@mock.patch('oslo_messaging.Target')
@mock.patch('oslo_messaging.get_notification_transport')
@mock.patch('oslo_messaging.get_notification_listener')
class TestListenerProc(base.SenlinTestCase):
def test_listener_proc(self, mock_listener, mock_transport, mock_target,
mock_endpoint):
def test_listener_proc_nova(self, mock_listener, mock_transport,
mock_target, mock_endpoint):
cfg.CONF.set_override('nova_control_exchange', 'FAKE_EXCHANGE',
group='health_manager', enforce_type=True)
x_listener = mock.Mock()
mock_listener.return_value = x_listener
x_transport = mock.Mock()
@ -216,19 +220,32 @@ class TestListenerProc(base.SenlinTestCase):
x_endpoint = mock.Mock()
mock_endpoint.return_value = x_endpoint
res = health_manager.ListenerProc('EXCHANGE', 'PROJECT_ID',
'CLUSTER_ID')
res = hm.ListenerProc('FAKE_EXCHANGE', 'PROJECT_ID', 'CLUSTER_ID')
self.assertIsNone(res)
mock_transport.assert_called_once_with(cfg.CONF)
mock_target.assert_called_once_with(topic="versioned_notifications",
exchange='EXCHANGE')
exchange='FAKE_EXCHANGE')
mock_endpoint.assert_called_once_with('PROJECT_ID', 'CLUSTER_ID')
mock_listener.assert_called_once_with(
x_transport, [x_target], [x_endpoint],
executor='threading', pool="senlin-listeners")
x_listener.start.assert_called_once_with()
def test_listener_proc_others(self, mock_listener, mock_transport,
mock_target, mock_endpoint):
x_transport = mock.Mock()
mock_transport.return_value = x_transport
res = hm.ListenerProc('BOGUS', 'PROJECT_ID', 'CLUSTER_ID')
self.assertIsNone(res)
mock_transport.assert_called_once_with(cfg.CONF)
self.assertFalse(mock_listener.called)
self.assertFalse(mock_target.called)
self.assertFalse(mock_endpoint.called)
class TestHealthManager(base.SenlinTestCase):
@ -239,7 +256,7 @@ class TestHealthManager(base.SenlinTestCase):
mock_eng.engine_id = 'ENGINE_ID'
topic = consts.HEALTH_MANAGER_TOPIC
version = consts.RPC_API_VERSION
self.hm = health_manager.HealthManager(mock_eng, topic, version)
self.hm = hm.HealthManager(mock_eng, topic, version)
def test_init(self):
self.assertEqual('ENGINE_ID', self.hm.engine_id)
@ -306,7 +323,7 @@ class TestHealthManager(base.SenlinTestCase):
self.hm.registries[1])
@mock.patch.object(obj_node.Node, 'get_all')
@mock.patch.object(health_manager.HealthManager, "_wait_for_action")
@mock.patch.object(hm.HealthManager, "_wait_for_action")
@mock.patch.object(obj_cluster.Cluster, 'get')
@mock.patch.object(context, 'get_service_context')
@mock.patch.object(context.RequestContext, 'from_dict')
@ -382,7 +399,7 @@ class TestHealthManager(base.SenlinTestCase):
self.assertEqual('PROJECT_ID', ctx.project)
mock_check.assert_called_once_with(ctx, 'cluster_check', mock.ANY)
@mock.patch.object(health_manager.HealthManager, "_wait_for_action")
@mock.patch.object(hm.HealthManager, "_wait_for_action")
@mock.patch.object(obj_cluster.Cluster, 'get')
@mock.patch.object(context, 'get_service_context')
@mock.patch.object(context.RequestContext, 'from_dict')
@ -414,27 +431,75 @@ class TestHealthManager(base.SenlinTestCase):
mock_rpc.assert_called_once_with(ctx, 'cluster_check', mock.ANY)
mock_wait.assert_called_once_with(ctx, "CHECK_ID", 456)
@mock.patch.object(obj_profile.Profile, 'get')
@mock.patch.object(obj_cluster.Cluster, 'get')
def test__add_listener(self, mock_get):
def test__add_listener_nova(self, mock_cluster, mock_profile):
cfg.CONF.set_override('nova_control_exchange', 'FAKE_NOVA_EXCHANGE',
group='health_manager', enforce_type=True)
x_listener = mock.Mock()
mock_add_thread = self.patchobject(self.hm.TG, 'add_thread',
return_value=x_listener)
x_cluster = mock.Mock(project='PROJECT_ID')
mock_get.return_value = x_cluster
x_cluster = mock.Mock(project='PROJECT_ID', profile_id='PROFILE_ID')
mock_cluster.return_value = x_cluster
x_profile = mock.Mock(type_name='os.nova.server-1.0')
mock_profile.return_value = x_profile
# do it
res = self.hm._add_listener('CLUSTER_ID')
# assertions
self.assertEqual(x_listener, res)
mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID',
project_safe=False)
mock_add_thread.assert_called_once_with(health_manager.ListenerProc,
'FAKE_NOVA_EXCHANGE',
'PROJECT_ID',
'CLUSTER_ID')
mock_cluster.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID',
project_safe=False)
mock_profile.assert_called_once_with(self.hm.ctx, 'PROFILE_ID',
project_safe=False)
mock_add_thread.assert_called_once_with(
hm.ListenerProc, 'FAKE_NOVA_EXCHANGE', 'PROJECT_ID', 'CLUSTER_ID')
@mock.patch.object(obj_profile.Profile, 'get')
@mock.patch.object(obj_cluster.Cluster, 'get')
def test__add_listener_heat(self, mock_cluster, mock_profile):
cfg.CONF.set_override('heat_control_exchange', 'FAKE_HEAT_EXCHANGE',
group='health_manager', enforce_type=True)
x_listener = mock.Mock()
mock_add_thread = self.patchobject(self.hm.TG, 'add_thread',
return_value=x_listener)
x_cluster = mock.Mock(project='PROJECT_ID', profile_id='PROFILE_ID')
mock_cluster.return_value = x_cluster
x_profile = mock.Mock(type_name='os.heat.stack-1.0')
mock_profile.return_value = x_profile
# do it
res = self.hm._add_listener('CLUSTER_ID')
# assertions
self.assertEqual(x_listener, res)
mock_cluster.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID',
project_safe=False)
mock_profile.assert_called_once_with(self.hm.ctx, 'PROFILE_ID',
project_safe=False)
mock_add_thread.assert_called_once_with(
hm.ListenerProc, 'FAKE_HEAT_EXCHANGE', 'PROJECT_ID', 'CLUSTER_ID')
@mock.patch.object(obj_profile.Profile, 'get')
@mock.patch.object(obj_cluster.Cluster, 'get')
def test__add_listener_other_types(self, mock_cluster, mock_profile):
mock_add_thread = self.patchobject(self.hm.TG, 'add_thread')
x_cluster = mock.Mock(project='PROJECT_ID', profile_id='PROFILE_ID')
mock_cluster.return_value = x_cluster
x_profile = mock.Mock(type_name='other.types-1.0')
mock_profile.return_value = x_profile
# do it
res = self.hm._add_listener('CLUSTER_ID')
# assertions
self.assertIsNone(res)
mock_cluster.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID',
project_safe=False)
mock_profile.assert_called_once_with(self.hm.ctx, 'PROFILE_ID',
project_safe=False)
self.assertFalse(mock_add_thread.called)
@mock.patch.object(obj_cluster.Cluster, 'get')
def test__add_listener_cluster_not_found(self, mock_get):
@ -580,7 +645,7 @@ class TestHealthManager(base.SenlinTestCase):
mock_add_tm.assert_called_with(50, mock_poll, None, 'CLUSTER_ID', 50)
self.assertEqual(1, len(self.hm.registries))
@mock.patch.object(health_manager.HealthManager, '_stop_check')
@mock.patch.object(hm.HealthManager, '_stop_check')
@mock.patch.object(hr.HealthRegistry, 'delete')
def test_unregister_cluster(self, mock_delete, mock_stop):
ctx = mock.Mock()
@ -602,7 +667,7 @@ class TestHealthManager(base.SenlinTestCase):
mock_delete.assert_called_once_with(ctx, 'CLUSTER_ID')
@mock.patch.object(hr.HealthRegistry, 'update')
@mock.patch.object(health_manager.HealthManager, '_start_check')
@mock.patch.object(hm.HealthManager, '_start_check')
def test_enable_cluster(self, mock_start, mock_update):
ctx = mock.Mock()
entry1 = {'cluster_id': 'FAKE_ID', 'enabled': False}
@ -617,7 +682,7 @@ class TestHealthManager(base.SenlinTestCase):
mock_update.assert_called_once_with(ctx, 'FAKE_ID', {'enabled': True})
@mock.patch.object(hr.HealthRegistry, 'update')
@mock.patch.object(health_manager.HealthManager, '_stop_check')
@mock.patch.object(hm.HealthManager, '_stop_check')
def test_disable_cluster(self, mock_stop, mock_update):
ctx = mock.Mock()
entry1 = {'cluster_id': 'FAKE_ID', 'enabled': True}