Fix health manager miss policy action name when node recover

Change-Id: Ica60114ca72c33d6a9ecb6194582c7ddce736714
Closes-Bug: 1732821
This commit is contained in:
Yuanbin.Chen 2017-11-17 14:38:20 +08:00
parent 47eb4dde05
commit bc843245b5
4 changed files with 126 additions and 51 deletions

View File

@ -62,7 +62,7 @@ class NovaNotificationEndpoint(object):
'compute.instance.soft_delete.end': 'SOFT_DELETE', 'compute.instance.soft_delete.end': 'SOFT_DELETE',
} }
def __init__(self, project_id, cluster_id): def __init__(self, project_id, cluster_id, recover_action):
self.filter_rule = messaging.NotificationFilter( self.filter_rule = messaging.NotificationFilter(
publisher_id='^compute.*', publisher_id='^compute.*',
event_type='^compute\.instance\..*', event_type='^compute\.instance\..*',
@ -70,6 +70,7 @@ class NovaNotificationEndpoint(object):
self.project_id = project_id self.project_id = project_id
self.cluster_id = cluster_id self.cluster_id = cluster_id
self.rpc = rpc_client.EngineClient() self.rpc = rpc_client.EngineClient()
self.recover_action = recover_action
def info(self, ctxt, publisher_id, event_type, payload, metadata): def info(self, ctxt, publisher_id, event_type, payload, metadata):
meta = payload['metadata'] meta = payload['metadata']
@ -82,6 +83,7 @@ class NovaNotificationEndpoint(object):
'instance_id': payload.get('instance_id', 'Unknown'), 'instance_id': payload.get('instance_id', 'Unknown'),
'timestamp': metadata['timestamp'], 'timestamp': metadata['timestamp'],
'publisher': publisher_id, 'publisher': publisher_id,
'operation': self.recover_action['operation'],
} }
node_id = meta.get('cluster_node_id') node_id = meta.get('cluster_node_id')
if node_id: if node_id:
@ -111,7 +113,7 @@ class HeatNotificationEndpoint(object):
'orchestration.stack.delete.end': 'DELETE', 'orchestration.stack.delete.end': 'DELETE',
} }
def __init__(self, project_id, cluster_id): def __init__(self, project_id, cluster_id, recover_action):
self.filter_rule = messaging.NotificationFilter( self.filter_rule = messaging.NotificationFilter(
publisher_id='^orchestration.*', publisher_id='^orchestration.*',
event_type='^orchestration\.stack\..*', event_type='^orchestration\.stack\..*',
@ -119,6 +121,7 @@ class HeatNotificationEndpoint(object):
self.project_id = project_id self.project_id = project_id
self.cluster_id = cluster_id self.cluster_id = cluster_id
self.rpc = rpc_client.EngineClient() self.rpc = rpc_client.EngineClient()
self.recover_action = recover_action
def info(self, ctxt, publisher_id, event_type, payload, metadata): def info(self, ctxt, publisher_id, event_type, payload, metadata):
if event_type not in self.STACK_FAILURE_EVENTS: if event_type not in self.STACK_FAILURE_EVENTS:
@ -149,6 +152,7 @@ class HeatNotificationEndpoint(object):
'stack_id': payload.get('stack_identity', 'Unknown'), 'stack_id': payload.get('stack_identity', 'Unknown'),
'timestamp': metadata['timestamp'], 'timestamp': metadata['timestamp'],
'publisher': publisher_id, 'publisher': publisher_id,
'operation': self.recover_action['operation'],
} }
LOG.info("Requesting stack recovery: %s", node_id) LOG.info("Requesting stack recovery: %s", node_id)
ctx = context.get_service_context(project=self.project_id, ctx = context.get_service_context(project=self.project_id,
@ -157,12 +161,13 @@ class HeatNotificationEndpoint(object):
self.rpc.call(ctx, 'node_recover', req) self.rpc.call(ctx, 'node_recover', req)
def ListenerProc(exchange, project_id, cluster_id): def ListenerProc(exchange, project_id, cluster_id, recover_action):
"""Thread procedure for running a event listener. """Thread procedure for running a event listener.
:param exchange: The control exchange for a target service. :param exchange: The control exchange for a target service.
:param project_id: The ID of the project to filter. :param project_id: The ID of the project to filter.
:param cluster_id: The ID of the cluster to filter. :param cluster_id: The ID of the cluster to filter.
:param recover_action: The health policy action name.
""" """
transport = messaging.get_notification_transport(cfg.CONF) transport = messaging.get_notification_transport(cfg.CONF)
@ -172,14 +177,14 @@ def ListenerProc(exchange, project_id, cluster_id):
exchange=exchange), exchange=exchange),
] ]
endpoints = [ endpoints = [
NovaNotificationEndpoint(project_id, cluster_id), NovaNotificationEndpoint(project_id, cluster_id, recover_action),
] ]
else: # heat notification else: # heat notification
targets = [ targets = [
messaging.Target(topic='notifications', exchange=exchange), messaging.Target(topic='notifications', exchange=exchange),
] ]
endpoints = [ endpoints = [
HeatNotificationEndpoint(project_id, cluster_id), HeatNotificationEndpoint(project_id, cluster_id, recover_action),
] ]
listener = messaging.get_notification_listener( listener = messaging.get_notification_listener(
@ -232,11 +237,12 @@ class HealthManager(service.Service):
else: else:
return False, "Cluster check action failed or cancelled" return False, "Cluster check action failed or cancelled"
def _poll_cluster(self, cluster_id, timeout): def _poll_cluster(self, cluster_id, timeout, recover_action):
"""Routine to be executed for polling cluster status. """Routine to be executed for polling cluster status.
:param cluster_id: The UUID of the cluster to be checked. :param cluster_id: The UUID of the cluster to be checked.
:param timeout: The maximum number of seconds to wait. :param timeout: The maximum number of seconds to wait.
:param recover_action: The health policy action name.
:returns: Nothing. :returns: Nothing.
""" """
start_time = timeutils.utcnow(True) start_time = timeutils.utcnow(True)
@ -269,15 +275,17 @@ class HealthManager(service.Service):
for node in nodes: for node in nodes:
if node.status != 'ACTIVE': if node.status != 'ACTIVE':
LOG.info("Requesting node recovery: %s", node.id) LOG.info("Requesting node recovery: %s", node.id)
req = objects.NodeRecoverRequest(identity=node.id) req = objects.NodeRecoverRequest(identity=node.id,
params=recover_action)
self.rpc_client.call(ctx, 'node_recover', req) self.rpc_client.call(ctx, 'node_recover', req)
return _chase_up(start_time, timeout) return _chase_up(start_time, timeout)
def _add_listener(self, cluster_id): def _add_listener(self, cluster_id, recover_action):
"""Routine to be executed for adding cluster listener. """Routine to be executed for adding cluster listener.
:param cluster_id: The UUID of the cluster to be filtered. :param cluster_id: The UUID of the cluster to be filtered.
:param recover_action: The health policy action name.
:returns: Nothing. :returns: Nothing.
""" """
cluster = objects.Cluster.get(self.ctx, cluster_id, project_safe=False) cluster = objects.Cluster.get(self.ctx, cluster_id, project_safe=False)
@ -295,7 +303,8 @@ class HealthManager(service.Service):
return None return None
project = cluster.project project = cluster.project
return self.TG.add_thread(ListenerProc, exchange, project, cluster_id) return self.TG.add_thread(ListenerProc, exchange, project, cluster_id,
recover_action)
def _start_check(self, entry): def _start_check(self, entry):
"""Routine for starting the checking for a cluster. """Routine for starting the checking for a cluster.
@ -305,16 +314,24 @@ class HealthManager(service.Service):
""" """
cid = entry['cluster_id'] cid = entry['cluster_id']
ctype = entry['check_type'] ctype = entry['check_type']
# Get the recover action parameter from the entry params
params = entry['params']
recover_action = {}
if 'recover_action' in params:
rac = params['recover_action']
for operation in rac:
recover_action['operation'] = operation.get('name')
if ctype == consts.NODE_STATUS_POLLING: if ctype == consts.NODE_STATUS_POLLING:
interval = min(entry['interval'], cfg.CONF.check_interval_max) interval = min(entry['interval'], cfg.CONF.check_interval_max)
timer = self.TG.add_dynamic_timer(self._poll_cluster, timer = self.TG.add_dynamic_timer(self._poll_cluster,
None, # initial_delay None, # initial_delay
None, # check_interval_max None, # check_interval_max
cid, interval) cid, interval, recover_action)
entry['timer'] = timer entry['timer'] = timer
elif ctype == consts.LIFECYCLE_EVENTS: elif ctype == consts.LIFECYCLE_EVENTS:
LOG.info("Start listening events for cluster (%s).", cid) LOG.info("Start listening events for cluster (%s).", cid)
listener = self._add_listener(cid) listener = self._add_listener(cid, recover_action)
if listener: if listener:
entry['listener'] = listener entry['listener'] = listener
else: else:

View File

@ -191,7 +191,7 @@ class HealthPolicy(base.Policy):
kwargs = { kwargs = {
'check_type': self.check_type, 'check_type': self.check_type,
'interval': self.interval, 'interval': self.interval,
'params': {}, 'params': {'recover_action': self.recover_actions},
'enabled': enabled 'enabled': enabled
} }

View File

@ -63,8 +63,8 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
'compute.instance.shutdown.end': 'SHUTDOWN', 'compute.instance.shutdown.end': 'SHUTDOWN',
'compute.instance.soft_delete.end': 'SOFT_DELETE', 'compute.instance.soft_delete.end': 'SOFT_DELETE',
} }
recover_action = {'operation': 'REBUILD'}
obj = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER') obj = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER', recover_action)
mock_filter.assert_called_once_with( mock_filter.assert_called_once_with(
publisher_id='^compute.*', publisher_id='^compute.*',
@ -83,7 +83,9 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient') @mock.patch('senlin.rpc.client.EngineClient')
def test_info(self, mock_rpc, mock_context, mock_filter): def test_info(self, mock_rpc, mock_context, mock_filter):
x_rpc = mock_rpc.return_value x_rpc = mock_rpc.return_value
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock() ctx = mock.Mock()
payload = { payload = {
'metadata': { 'metadata': {
@ -113,13 +115,16 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
'instance_id': 'PHYSICAL_ID', 'instance_id': 'PHYSICAL_ID',
'timestamp': 'TIMESTAMP', 'timestamp': 'TIMESTAMP',
'publisher': 'PUBLISHER', 'publisher': 'PUBLISHER',
'operation': 'REBUILD'
} }
self.assertEqual(expected_params, req.params) self.assertEqual(expected_params, req.params)
@mock.patch('senlin.rpc.client.EngineClient') @mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_metadata(self, mock_rpc, mock_filter): def test_info_no_metadata(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value x_rpc = mock_rpc.return_value
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock() ctx = mock.Mock()
payload = {'metadata': {}} payload = {'metadata': {}}
metadata = {'timestamp': 'TIMESTAMP'} metadata = {'timestamp': 'TIMESTAMP'}
@ -133,7 +138,9 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient') @mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_cluster_in_metadata(self, mock_rpc, mock_filter): def test_info_no_cluster_in_metadata(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value x_rpc = mock_rpc.return_value
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock() ctx = mock.Mock()
payload = {'metadata': {'foo': 'bar'}} payload = {'metadata': {'foo': 'bar'}}
metadata = {'timestamp': 'TIMESTAMP'} metadata = {'timestamp': 'TIMESTAMP'}
@ -147,7 +154,9 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient') @mock.patch('senlin.rpc.client.EngineClient')
def test_info_cluster_id_not_match(self, mock_rpc, mock_filter): def test_info_cluster_id_not_match(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value x_rpc = mock_rpc.return_value
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock() ctx = mock.Mock()
payload = {'metadata': {'cluster_id': 'FOOBAR'}} payload = {'metadata': {'cluster_id': 'FOOBAR'}}
metadata = {'timestamp': 'TIMESTAMP'} metadata = {'timestamp': 'TIMESTAMP'}
@ -161,7 +170,9 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient') @mock.patch('senlin.rpc.client.EngineClient')
def test_info_event_type_not_interested(self, mock_rpc, mock_filter): def test_info_event_type_not_interested(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value x_rpc = mock_rpc.return_value
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock() ctx = mock.Mock()
payload = {'metadata': {'cluster_id': 'CLUSTER_ID'}} payload = {'metadata': {'cluster_id': 'CLUSTER_ID'}}
metadata = {'timestamp': 'TIMESTAMP'} metadata = {'timestamp': 'TIMESTAMP'}
@ -175,7 +186,9 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient') @mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_node_id(self, mock_rpc, mock_filter): def test_info_no_node_id(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value x_rpc = mock_rpc.return_value
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock() ctx = mock.Mock()
payload = {'metadata': {'cluster_id': 'CLUSTER_ID'}} payload = {'metadata': {'cluster_id': 'CLUSTER_ID'}}
metadata = {'timestamp': 'TIMESTAMP'} metadata = {'timestamp': 'TIMESTAMP'}
@ -190,7 +203,9 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient') @mock.patch('senlin.rpc.client.EngineClient')
def test_info_default_values(self, mock_rpc, mock_context, mock_filter): def test_info_default_values(self, mock_rpc, mock_context, mock_filter):
x_rpc = mock_rpc.return_value x_rpc = mock_rpc.return_value
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock() ctx = mock.Mock()
payload = { payload = {
'metadata': { 'metadata': {
@ -217,6 +232,7 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
'instance_id': 'Unknown', 'instance_id': 'Unknown',
'timestamp': 'TIMESTAMP', 'timestamp': 'TIMESTAMP',
'publisher': 'PUBLISHER', 'publisher': 'PUBLISHER',
'operation': 'REBUILD',
} }
self.assertEqual(expected_params, req.params) self.assertEqual(expected_params, req.params)
@ -230,8 +246,8 @@ class TestHeatNotificationEndpoint(base.SenlinTestCase):
event_map = { event_map = {
'orchestration.stack.delete.end': 'DELETE', 'orchestration.stack.delete.end': 'DELETE',
} }
recover_action = {'operation': 'REBUILD'}
obj = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER') obj = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER', recover_action)
mock_filter.assert_called_once_with( mock_filter.assert_called_once_with(
publisher_id='^orchestration.*', publisher_id='^orchestration.*',
@ -250,7 +266,9 @@ class TestHeatNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient') @mock.patch('senlin.rpc.client.EngineClient')
def test_info(self, mock_rpc, mock_context, mock_filter): def test_info(self, mock_rpc, mock_context, mock_filter):
x_rpc = mock_rpc.return_value x_rpc = mock_rpc.return_value
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock() ctx = mock.Mock()
payload = { payload = {
'tags': { 'tags': {
@ -280,13 +298,16 @@ class TestHeatNotificationEndpoint(base.SenlinTestCase):
'stack_id': 'PHYSICAL_ID', 'stack_id': 'PHYSICAL_ID',
'timestamp': 'TIMESTAMP', 'timestamp': 'TIMESTAMP',
'publisher': 'PUBLISHER', 'publisher': 'PUBLISHER',
'operation': 'REBUILD',
} }
self.assertEqual(expected_params, req.params) self.assertEqual(expected_params, req.params)
@mock.patch('senlin.rpc.client.EngineClient') @mock.patch('senlin.rpc.client.EngineClient')
def test_info_event_type_not_interested(self, mock_rpc, mock_filter): def test_info_event_type_not_interested(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value x_rpc = mock_rpc.return_value
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock() ctx = mock.Mock()
payload = {'tags': {'cluster_id': 'CLUSTER_ID'}} payload = {'tags': {'cluster_id': 'CLUSTER_ID'}}
metadata = {'timestamp': 'TIMESTAMP'} metadata = {'timestamp': 'TIMESTAMP'}
@ -301,7 +322,9 @@ class TestHeatNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient') @mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_tag(self, mock_rpc, mock_filter): def test_info_no_tag(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value x_rpc = mock_rpc.return_value
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock() ctx = mock.Mock()
payload = {'tags': None} payload = {'tags': None}
metadata = {'timestamp': 'TIMESTAMP'} metadata = {'timestamp': 'TIMESTAMP'}
@ -315,7 +338,9 @@ class TestHeatNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient') @mock.patch('senlin.rpc.client.EngineClient')
def test_info_empty_tag(self, mock_rpc, mock_filter): def test_info_empty_tag(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value x_rpc = mock_rpc.return_value
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock() ctx = mock.Mock()
payload = {'tags': []} payload = {'tags': []}
metadata = {'timestamp': 'TIMESTAMP'} metadata = {'timestamp': 'TIMESTAMP'}
@ -329,7 +354,9 @@ class TestHeatNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient') @mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_cluster_in_tag(self, mock_rpc, mock_filter): def test_info_no_cluster_in_tag(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value x_rpc = mock_rpc.return_value
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock() ctx = mock.Mock()
payload = {'tags': ['foo', 'bar']} payload = {'tags': ['foo', 'bar']}
metadata = {'timestamp': 'TIMESTAMP'} metadata = {'timestamp': 'TIMESTAMP'}
@ -343,7 +370,9 @@ class TestHeatNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient') @mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_node_in_tag(self, mock_rpc, mock_filter): def test_info_no_node_in_tag(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value x_rpc = mock_rpc.return_value
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock() ctx = mock.Mock()
payload = {'tags': ['cluster_id=C1ID']} payload = {'tags': ['cluster_id=C1ID']}
metadata = {'timestamp': 'TIMESTAMP'} metadata = {'timestamp': 'TIMESTAMP'}
@ -357,7 +386,9 @@ class TestHeatNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient') @mock.patch('senlin.rpc.client.EngineClient')
def test_info_cluster_id_not_match(self, mock_rpc, mock_filter): def test_info_cluster_id_not_match(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value x_rpc = mock_rpc.return_value
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock() ctx = mock.Mock()
payload = {'tags': ['cluster_id=FOOBAR', 'cluster_node_id=N2']} payload = {'tags': ['cluster_id=FOOBAR', 'cluster_node_id=N2']}
metadata = {'timestamp': 'TIMESTAMP'} metadata = {'timestamp': 'TIMESTAMP'}
@ -372,7 +403,9 @@ class TestHeatNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient') @mock.patch('senlin.rpc.client.EngineClient')
def test_info_default_values(self, mock_rpc, mock_context, mock_filter): def test_info_default_values(self, mock_rpc, mock_context, mock_filter):
x_rpc = mock_rpc.return_value x_rpc = mock_rpc.return_value
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock() ctx = mock.Mock()
payload = { payload = {
'tags': [ 'tags': [
@ -399,6 +432,7 @@ class TestHeatNotificationEndpoint(base.SenlinTestCase):
'stack_id': 'Unknown', 'stack_id': 'Unknown',
'timestamp': 'TIMESTAMP', 'timestamp': 'TIMESTAMP',
'publisher': 'PUBLISHER', 'publisher': 'PUBLISHER',
'operation': 'REBUILD',
} }
self.assertEqual(expected_params, req.params) self.assertEqual(expected_params, req.params)
@ -425,13 +459,16 @@ class TestListenerProc(base.SenlinTestCase):
x_endpoint = mock.Mock() x_endpoint = mock.Mock()
mock_novaendpoint.return_value = x_endpoint mock_novaendpoint.return_value = x_endpoint
res = hm.ListenerProc('FAKE_EXCHANGE', 'PROJECT_ID', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
res = hm.ListenerProc('FAKE_EXCHANGE', 'PROJECT_ID', 'CLUSTER_ID',
recover_action)
self.assertIsNone(res) self.assertIsNone(res)
mock_transport.assert_called_once_with(cfg.CONF) mock_transport.assert_called_once_with(cfg.CONF)
mock_target.assert_called_once_with(topic="versioned_notifications", mock_target.assert_called_once_with(topic="versioned_notifications",
exchange='FAKE_EXCHANGE') exchange='FAKE_EXCHANGE')
mock_novaendpoint.assert_called_once_with('PROJECT_ID', 'CLUSTER_ID') mock_novaendpoint.assert_called_once_with('PROJECT_ID', 'CLUSTER_ID',
recover_action)
mock_listener.assert_called_once_with( mock_listener.assert_called_once_with(
x_transport, [x_target], [x_endpoint], x_transport, [x_target], [x_endpoint],
executor='threading', pool="senlin-listeners") executor='threading', pool="senlin-listeners")
@ -449,13 +486,16 @@ class TestListenerProc(base.SenlinTestCase):
x_endpoint = mock.Mock() x_endpoint = mock.Mock()
mock_heatendpoint.return_value = x_endpoint mock_heatendpoint.return_value = x_endpoint
res = hm.ListenerProc('heat', 'PROJECT_ID', 'CLUSTER_ID') recover_action = {'operation': 'REBUILD'}
res = hm.ListenerProc('heat', 'PROJECT_ID', 'CLUSTER_ID',
recover_action)
self.assertIsNone(res) self.assertIsNone(res)
mock_transport.assert_called_once_with(cfg.CONF) mock_transport.assert_called_once_with(cfg.CONF)
mock_target.assert_called_once_with(topic="notifications", mock_target.assert_called_once_with(topic="notifications",
exchange='heat') exchange='heat')
mock_heatendpoint.assert_called_once_with('PROJECT_ID', 'CLUSTER_ID') mock_heatendpoint.assert_called_once_with('PROJECT_ID', 'CLUSTER_ID',
recover_action)
mock_listener.assert_called_once_with( mock_listener.assert_called_once_with(
x_transport, [x_target], [x_endpoint], x_transport, [x_target], [x_endpoint],
executor='threading', pool="senlin-listeners") executor='threading', pool="senlin-listeners")
@ -516,7 +556,7 @@ class TestHealthManager(base.SenlinTestCase):
# assertions # assertions
mock_claim.assert_called_once_with(self.hm.ctx, self.hm.engine_id) mock_claim.assert_called_once_with(self.hm.ctx, self.hm.engine_id)
mock_calls = [ mock_calls = [
mock.call(self.hm._poll_cluster, None, None, 'CID1', 12) mock.call(self.hm._poll_cluster, None, None, 'CID1', 12, {})
] ]
mock_add_timer.assert_has_calls(mock_calls) mock_add_timer.assert_has_calls(mock_calls)
self.assertEqual(2, len(self.hm.registries)) self.assertEqual(2, len(self.hm.registries))
@ -559,8 +599,9 @@ class TestHealthManager(base.SenlinTestCase):
x_action_recover = {'action': 'RECOVER_ID'} x_action_recover = {'action': 'RECOVER_ID'}
mock_rpc.side_effect = [x_action_check, x_action_recover] mock_rpc.side_effect = [x_action_check, x_action_recover]
recover_action = {'operation': 'REBUILD'}
# do it # do it
res = self.hm._poll_cluster('CLUSTER_ID', 456) res = self.hm._poll_cluster('CLUSTER_ID', 456, recover_action)
self.assertEqual(mock_chase.return_value, res) self.assertEqual(mock_chase.return_value, res)
mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID',
@ -580,8 +621,9 @@ class TestHealthManager(base.SenlinTestCase):
def test__poll_cluster_not_found(self, mock_check, mock_get, mock_chase): def test__poll_cluster_not_found(self, mock_check, mock_get, mock_chase):
mock_get.return_value = None mock_get.return_value = None
recover_action = {'operation': 'REBUILD'}
# do it # do it
res = self.hm._poll_cluster('CLUSTER_ID', 123) res = self.hm._poll_cluster('CLUSTER_ID', 123, recover_action)
self.assertEqual(mock_chase.return_value, res) self.assertEqual(mock_chase.return_value, res)
self.assertEqual(0, mock_check.call_count) self.assertEqual(0, mock_check.call_count)
@ -599,8 +641,9 @@ class TestHealthManager(base.SenlinTestCase):
mock_ctx.return_value = ctx mock_ctx.return_value = ctx
mock_check.side_effect = Exception("boom") mock_check.side_effect = Exception("boom")
recover_action = {'operation': 'REBUILD'}
# do it # do it
res = self.hm._poll_cluster('CLUSTER_ID', 123) res = self.hm._poll_cluster('CLUSTER_ID', 123, recover_action)
self.assertEqual(mock_chase.return_value, res) self.assertEqual(mock_chase.return_value, res)
mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID',
@ -624,8 +667,9 @@ class TestHealthManager(base.SenlinTestCase):
x_action_check = {'action': 'CHECK_ID'} x_action_check = {'action': 'CHECK_ID'}
mock_rpc.return_value = x_action_check mock_rpc.return_value = x_action_check
recover_action = {'operation': 'REBUILD'}
# do it # do it
res = self.hm._poll_cluster('CLUSTER_ID', 456) res = self.hm._poll_cluster('CLUSTER_ID', 456, recover_action)
self.assertEqual(mock_chase.return_value, res) self.assertEqual(mock_chase.return_value, res)
mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID', mock_get.assert_called_once_with(self.hm.ctx, 'CLUSTER_ID',
@ -648,8 +692,9 @@ class TestHealthManager(base.SenlinTestCase):
x_profile = mock.Mock(type='os.nova.server-1.0') x_profile = mock.Mock(type='os.nova.server-1.0')
mock_profile.return_value = x_profile mock_profile.return_value = x_profile
recover_action = {'operation': 'REBUILD'}
# do it # do it
res = self.hm._add_listener('CLUSTER_ID') res = self.hm._add_listener('CLUSTER_ID', recover_action)
# assertions # assertions
self.assertEqual(x_listener, res) self.assertEqual(x_listener, res)
@ -658,7 +703,8 @@ class TestHealthManager(base.SenlinTestCase):
mock_profile.assert_called_once_with(self.hm.ctx, 'PROFILE_ID', mock_profile.assert_called_once_with(self.hm.ctx, 'PROFILE_ID',
project_safe=False) project_safe=False)
mock_add_thread.assert_called_once_with( mock_add_thread.assert_called_once_with(
hm.ListenerProc, 'FAKE_NOVA_EXCHANGE', 'PROJECT_ID', 'CLUSTER_ID') hm.ListenerProc, 'FAKE_NOVA_EXCHANGE', 'PROJECT_ID', 'CLUSTER_ID',
recover_action)
@mock.patch.object(obj_profile.Profile, 'get') @mock.patch.object(obj_profile.Profile, 'get')
@mock.patch.object(obj_cluster.Cluster, 'get') @mock.patch.object(obj_cluster.Cluster, 'get')
@ -673,8 +719,9 @@ class TestHealthManager(base.SenlinTestCase):
x_profile = mock.Mock(type='os.heat.stack-1.0') x_profile = mock.Mock(type='os.heat.stack-1.0')
mock_profile.return_value = x_profile mock_profile.return_value = x_profile
recover_action = {'operation': 'REBUILD'}
# do it # do it
res = self.hm._add_listener('CLUSTER_ID') res = self.hm._add_listener('CLUSTER_ID', recover_action)
# assertions # assertions
self.assertEqual(x_listener, res) self.assertEqual(x_listener, res)
@ -683,7 +730,8 @@ class TestHealthManager(base.SenlinTestCase):
mock_profile.assert_called_once_with(self.hm.ctx, 'PROFILE_ID', mock_profile.assert_called_once_with(self.hm.ctx, 'PROFILE_ID',
project_safe=False) project_safe=False)
mock_add_thread.assert_called_once_with( mock_add_thread.assert_called_once_with(
hm.ListenerProc, 'FAKE_HEAT_EXCHANGE', 'PROJECT_ID', 'CLUSTER_ID') hm.ListenerProc, 'FAKE_HEAT_EXCHANGE', 'PROJECT_ID', 'CLUSTER_ID',
recover_action)
@mock.patch.object(obj_profile.Profile, 'get') @mock.patch.object(obj_profile.Profile, 'get')
@mock.patch.object(obj_cluster.Cluster, 'get') @mock.patch.object(obj_cluster.Cluster, 'get')
@ -694,8 +742,9 @@ class TestHealthManager(base.SenlinTestCase):
x_profile = mock.Mock(type='other.types-1.0') x_profile = mock.Mock(type='other.types-1.0')
mock_profile.return_value = x_profile mock_profile.return_value = x_profile
recover_action = {'operation': 'REBUILD'}
# do it # do it
res = self.hm._add_listener('CLUSTER_ID') res = self.hm._add_listener('CLUSTER_ID', recover_action)
# assertions # assertions
self.assertIsNone(res) self.assertIsNone(res)
@ -710,8 +759,9 @@ class TestHealthManager(base.SenlinTestCase):
mock_get.return_value = None mock_get.return_value = None
mock_add_thread = self.patchobject(self.hm.TG, 'add_thread') mock_add_thread = self.patchobject(self.hm.TG, 'add_thread')
recover_action = {'operation': 'REBUILD'}
# do it # do it
res = self.hm._add_listener('CLUSTER_ID') res = self.hm._add_listener('CLUSTER_ID', recover_action)
# assertions # assertions
self.assertIsNone(res) self.assertIsNone(res)
@ -728,14 +778,16 @@ class TestHealthManager(base.SenlinTestCase):
'cluster_id': 'CCID', 'cluster_id': 'CCID',
'interval': 12, 'interval': 12,
'check_type': consts.NODE_STATUS_POLLING, 'check_type': consts.NODE_STATUS_POLLING,
'params': {'recover_action': [{'name': 'REBUILD'}]},
} }
recover_action = {'operation': 'REBUILD'}
res = self.hm._start_check(entry) res = self.hm._start_check(entry)
expected = copy.deepcopy(entry) expected = copy.deepcopy(entry)
expected['timer'] = x_timer expected['timer'] = x_timer
self.assertEqual(expected, res) self.assertEqual(expected, res)
mock_add_timer.assert_called_once_with( mock_add_timer.assert_called_once_with(
self.hm._poll_cluster, None, None, 'CCID', 12) self.hm._poll_cluster, None, None, 'CCID', 12, recover_action)
def test__start_check_for_listening(self): def test__start_check_for_listening(self):
x_listener = mock.Mock() x_listener = mock.Mock()
@ -745,13 +797,15 @@ class TestHealthManager(base.SenlinTestCase):
entry = { entry = {
'cluster_id': 'CCID', 'cluster_id': 'CCID',
'check_type': consts.LIFECYCLE_EVENTS, 'check_type': consts.LIFECYCLE_EVENTS,
'params': {'recover_action': [{'name': 'REBUILD'}]},
} }
recover_action = {'operation': 'REBUILD'}
res = self.hm._start_check(entry) res = self.hm._start_check(entry)
expected = copy.deepcopy(entry) expected = copy.deepcopy(entry)
expected['listener'] = x_listener expected['listener'] = x_listener
self.assertEqual(expected, res) self.assertEqual(expected, res)
mock_add_listener.assert_called_once_with('CCID') mock_add_listener.assert_called_once_with('CCID', recover_action)
def test__start_check_for_listening_failed(self): def test__start_check_for_listening_failed(self):
mock_add_listener = self.patchobject(self.hm, '_add_listener', mock_add_listener = self.patchobject(self.hm, '_add_listener',
@ -760,16 +814,19 @@ class TestHealthManager(base.SenlinTestCase):
entry = { entry = {
'cluster_id': 'CCID', 'cluster_id': 'CCID',
'check_type': consts.LIFECYCLE_EVENTS, 'check_type': consts.LIFECYCLE_EVENTS,
'params': {'recover_action': [{'name': 'REBUILD'}]},
} }
recover_action = {'operation': 'REBUILD'}
res = self.hm._start_check(entry) res = self.hm._start_check(entry)
self.assertIsNone(res) self.assertIsNone(res)
mock_add_listener.assert_called_once_with('CCID') mock_add_listener.assert_called_once_with('CCID', recover_action)
def test__start_check_other_types(self): def test__start_check_other_types(self):
entry = { entry = {
'cluster_id': 'CCID', 'cluster_id': 'CCID',
'check_type': 'BOGUS TYPE', 'check_type': 'BOGUS TYPE',
'params': {'recover_action': [{'name': 'REBUILD'}]},
} }
res = self.hm._start_check(entry) res = self.hm._start_check(entry)
@ -844,7 +901,8 @@ class TestHealthManager(base.SenlinTestCase):
mock_reg_create.assert_called_once_with( mock_reg_create.assert_called_once_with(
ctx, 'CLUSTER_ID', consts.NODE_STATUS_POLLING, 50, {}, 'ENGINE_ID', ctx, 'CLUSTER_ID', consts.NODE_STATUS_POLLING, 50, {}, 'ENGINE_ID',
enabled=True) enabled=True)
mock_add_tm.assert_called_with(mock_poll, None, None, 'CLUSTER_ID', 50) mock_add_tm.assert_called_with(mock_poll, None, None, 'CLUSTER_ID', 50,
{})
self.assertEqual(1, len(self.hm.registries)) self.assertEqual(1, len(self.hm.registries))
@mock.patch.object(hr.HealthRegistry, 'create') @mock.patch.object(hr.HealthRegistry, 'create')

View File

@ -100,7 +100,7 @@ class TestHealthPolicy(base.SenlinTestCase):
kwargs = { kwargs = {
'check_type': self.hp.check_type, 'check_type': self.hp.check_type,
'interval': self.hp.interval, 'interval': self.hp.interval,
'params': {}, 'params': {'recover_action': self.hp.recover_actions},
'enabled': True 'enabled': True
} }
mock_hm_reg.assert_called_once_with('CLUSTER_ID', mock_hm_reg.assert_called_once_with('CLUSTER_ID',