Removed duplicate notification implementations

We ended up with duplicate implementations
for nova and heat endpoints. This patch
removes the duplicates and merges the code.

Change-Id: I0858af84509356326323fd224498256ff585a150
This commit is contained in:
Erik Olof Gunnar Andersson 2019-10-26 23:22:12 -07:00
parent 29c0e6fbdf
commit 3d20220f34
7 changed files with 503 additions and 356 deletions

View File

@ -31,6 +31,8 @@ from senlin.common import context
from senlin.common import messaging as rpc
from senlin.common import utils
from senlin.engine import node as node_mod
from senlin.engine.notifications import heat_endpoint
from senlin.engine.notifications import nova_endpoint
from senlin import objects
from senlin.rpc import client as rpc_client
@ -54,115 +56,6 @@ def chase_up(start_time, interval, name='Poller'):
return (missed + 1) * interval - elapsed
class NovaNotificationEndpoint(object):
VM_FAILURE_EVENTS = {
'compute.instance.pause.end': 'PAUSE',
'compute.instance.power_off.end': 'POWER_OFF',
'compute.instance.rebuild.error': 'REBUILD',
'compute.instance.shutdown.end': 'SHUTDOWN',
'compute.instance.soft_delete.end': 'SOFT_DELETE',
}
def __init__(self, project_id, cluster_id, recover_action):
self.filter_rule = messaging.NotificationFilter(
publisher_id='^compute.*',
event_type='^compute\.instance\..*',
context={'project_id': '^%s$' % project_id})
self.project_id = project_id
self.cluster_id = cluster_id
self.rpc = rpc_client.EngineClient()
self.recover_action = recover_action
def info(self, ctxt, publisher_id, event_type, payload, metadata):
meta = payload['metadata']
if meta.get('cluster_id') == self.cluster_id:
if event_type not in self.VM_FAILURE_EVENTS:
return
params = {
'event': self.VM_FAILURE_EVENTS[event_type],
'state': payload.get('state', 'Unknown'),
'instance_id': payload.get('instance_id', 'Unknown'),
'timestamp': metadata['timestamp'],
'publisher': publisher_id,
'operation': self.recover_action['operation'],
}
node_id = meta.get('cluster_node_id')
if node_id:
LOG.info("Requesting node recovery: %s", node_id)
ctx = context.get_service_context(project_id=self.project_id,
user_id=payload['user_id'])
req = objects.NodeRecoverRequest(identity=node_id,
params=params)
self.rpc.call(ctx, 'node_recover', req)
def warn(self, ctxt, publisher_id, event_type, payload, metadata):
meta = payload.get('metadata', {})
if meta.get('cluster_id') == self.cluster_id:
LOG.warning("publisher=%s", publisher_id)
LOG.warning("event_type=%s", event_type)
def debug(self, ctxt, publisher_id, event_type, payload, metadata):
meta = payload.get('metadata', {})
if meta.get('cluster_id') == self.cluster_id:
LOG.debug("publisher=%s", publisher_id)
LOG.debug("event_type=%s", event_type)
class HeatNotificationEndpoint(object):
STACK_FAILURE_EVENTS = {
'orchestration.stack.delete.end': 'DELETE',
}
def __init__(self, project_id, cluster_id, recover_action):
self.filter_rule = messaging.NotificationFilter(
publisher_id='^orchestration.*',
event_type='^orchestration\.stack\..*',
context={'project_id': '^%s$' % project_id})
self.project_id = project_id
self.cluster_id = cluster_id
self.rpc = rpc_client.EngineClient()
self.recover_action = recover_action
def info(self, ctxt, publisher_id, event_type, payload, metadata):
if event_type not in self.STACK_FAILURE_EVENTS:
return
tags = payload['tags']
if tags is None or tags == []:
return
cluster_id = None
node_id = None
for tag in tags:
if cluster_id is None:
start = tag.find('cluster_id')
if start == 0 and tag[11:] == self.cluster_id:
cluster_id = tag[11:]
if node_id is None:
start = tag.find('cluster_node_id')
if start == 0:
node_id = tag[16:]
if cluster_id is None or node_id is None:
return
params = {
'event': self.STACK_FAILURE_EVENTS[event_type],
'state': payload.get('state', 'Unknown'),
'stack_id': payload.get('stack_identity', 'Unknown'),
'timestamp': metadata['timestamp'],
'publisher': publisher_id,
'operation': self.recover_action['operation'],
}
LOG.info("Requesting stack recovery: %s", node_id)
ctx = context.get_service_context(project_id=self.project_id,
user_id=payload['user_identity'])
req = objects.NodeRecoverRequest(identity=node_id, params=params)
self.rpc.call(ctx, 'node_recover', req)
def ListenerProc(exchange, project_id, cluster_id, recover_action):
"""Thread procedure for running an event listener.
@ -179,14 +72,18 @@ def ListenerProc(exchange, project_id, cluster_id, recover_action):
exchange=exchange),
]
endpoints = [
NovaNotificationEndpoint(project_id, cluster_id, recover_action),
nova_endpoint.NovaNotificationEndpoint(
project_id, cluster_id, recover_action
),
]
else: # heat notification
targets = [
messaging.Target(topic='notifications', exchange=exchange),
]
endpoints = [
HeatNotificationEndpoint(project_id, cluster_id, recover_action),
heat_endpoint.HeatNotificationEndpoint(
project_id, cluster_id, recover_action
),
]
listener = messaging.get_notification_listener(

View File

@ -19,8 +19,8 @@ LOG = logging.getLogger(__name__)
class Endpoints(object):
def __init__(self, project_id, engine_id, recover_action):
self.engine_id = engine_id
def __init__(self, project_id, cluster_id, recover_action):
self.cluster_id = cluster_id
self.project_id = project_id
self.recover_action = recover_action

View File

@ -28,15 +28,15 @@ class HeatNotificationEndpoint(base.Endpoints):
'orchestration.stack.delete.end': 'DELETE',
}
def __init__(self, project_id, engine_id, recover_action):
def __init__(self, project_id, cluster_id, recover_action):
super(HeatNotificationEndpoint, self).__init__(
project_id, cluster_id, recover_action
)
self.filter_rule = messaging.NotificationFilter(
publisher_id='^orchestration.*',
event_type='^orchestration\.stack\..*',
context={'project_id': '^%s$' % project_id})
self.project_id = project_id
self.engine_id = engine_id
self.rpc = rpc_client.EngineClient()
self.recover_action = recover_action
self.exchange = cfg.CONF.health_manager.heat_control_exchange
self.target = messaging.Target(topic='notifications',
exchange=self.exchange)
@ -54,7 +54,7 @@ class HeatNotificationEndpoint(base.Endpoints):
for tag in tags:
if cluster_id is None:
start = tag.find('cluster_id')
if start == 0 and tag[11:]:
if start == 0 and tag[11:] == self.cluster_id:
cluster_id = tag[11:]
if node_id is None:
start = tag.find('cluster_node_id')
@ -64,12 +64,6 @@ class HeatNotificationEndpoint(base.Endpoints):
if cluster_id is None or node_id is None:
return
ctx = context.get_service_context(project=self.project_id,
user=payload['user_identity'])
enabled = self._check_registry_status(ctx, self.engine_id, cluster_id)
if enabled is False:
return
params = {
'event': self.STACK_FAILURE_EVENTS[event_type],
'state': payload.get('state', 'Unknown'),
@ -79,5 +73,7 @@ class HeatNotificationEndpoint(base.Endpoints):
'operation': self.recover_action['operation'],
}
LOG.info("Requesting stack recovery: %s", node_id)
ctx = context.get_service_context(project_id=self.project_id,
user_id=payload['user_identity'])
req = objects.NodeRecoverRequest(identity=node_id, params=params)
self.rpc.call(ctx, 'node_recover', req)

View File

@ -32,15 +32,15 @@ class NovaNotificationEndpoint(base.Endpoints):
'compute.instance.soft_delete.end': 'SOFT_DELETE',
}
def __init__(self, project_id, engine_id, recover_action):
def __init__(self, project_id, cluster_id, recover_action):
super(NovaNotificationEndpoint, self).__init__(
project_id, cluster_id, recover_action
)
self.filter_rule = messaging.NotificationFilter(
publisher_id='^compute.*',
event_type='^compute\.instance\..*',
context={'project_id': '^%s$' % project_id})
self.project_id = project_id
self.engine_id = engine_id
self.rpc = rpc_client.EngineClient()
self.recover_action = recover_action
self.exchange = cfg.CONF.health_manager.nova_control_exchange
self.target = messaging.Target(topic='versioned_notifications',
exchange=self.exchange)
@ -51,13 +51,10 @@ class NovaNotificationEndpoint(base.Endpoints):
if not cluster_id:
return
if event_type not in self.VM_FAILURE_EVENTS:
if self.cluster_id != cluster_id:
return
ctx = context.get_service_context(project=self.project_id,
user=payload['user_id'])
enabled = self._check_registry_status(ctx, self.engine_id, cluster_id)
if enabled is False:
if event_type not in self.VM_FAILURE_EVENTS:
return
params = {
@ -71,6 +68,8 @@ class NovaNotificationEndpoint(base.Endpoints):
node_id = meta.get('cluster_node_id')
if node_id:
LOG.info("Requesting node recovery: %s", node_id)
ctx = context.get_service_context(project_id=self.project_id,
user_id=payload['user_id'])
req = objects.NodeRecoverRequest(identity=node_id,
params=params)
self.rpc.call(ctx, 'node_recover', req)

View File

@ -0,0 +1,230 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from senlin.common import context
from senlin.engine.notifications import heat_endpoint
from senlin import objects
from senlin.tests.unit.common import base
@mock.patch('oslo_messaging.NotificationFilter')
class TestHeatNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient')
def test_init(self, mock_rpc, mock_filter):
x_filter = mock_filter.return_value
event_map = {
'orchestration.stack.delete.end': 'DELETE',
}
recover_action = {'operation': 'REBUILD'}
endpoint = heat_endpoint.HeatNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
mock_filter.assert_called_once_with(
publisher_id='^orchestration.*',
event_type='^orchestration\.stack\..*',
context={'project_id': '^PROJECT$'})
mock_rpc.assert_called_once_with()
self.assertEqual(x_filter, endpoint.filter_rule)
self.assertEqual(mock_rpc.return_value, endpoint.rpc)
for e in event_map:
self.assertIn(e, endpoint.STACK_FAILURE_EVENTS)
self.assertEqual(event_map[e], endpoint.STACK_FAILURE_EVENTS[e])
self.assertEqual('PROJECT', endpoint.project_id)
self.assertEqual('CLUSTER_ID', endpoint.cluster_id)
@mock.patch.object(context.RequestContext, 'from_dict')
@mock.patch('senlin.rpc.client.EngineClient')
def test_info(self, mock_rpc, mock_context, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = heat_endpoint.HeatNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {
'tags': {
'cluster_id=CLUSTER_ID',
'cluster_node_id=FAKE_NODE',
'cluster_node_index=123',
},
'stack_identity': 'PHYSICAL_ID',
'user_identity': 'USER',
'state': 'DELETE_COMPLETE',
}
metadata = {'timestamp': 'TIMESTAMP'}
call_ctx = mock.Mock()
mock_context.return_value = call_ctx
res = endpoint.info(ctx, 'PUBLISHER', 'orchestration.stack.delete.end',
payload, metadata)
self.assertIsNone(res)
x_rpc.call.assert_called_once_with(call_ctx, 'node_recover', mock.ANY)
req = x_rpc.call.call_args[0][2]
self.assertIsInstance(req, objects.NodeRecoverRequest)
self.assertEqual('FAKE_NODE', req.identity)
expected_params = {
'event': 'DELETE',
'state': 'DELETE_COMPLETE',
'stack_id': 'PHYSICAL_ID',
'timestamp': 'TIMESTAMP',
'publisher': 'PUBLISHER',
'operation': 'REBUILD',
}
self.assertEqual(expected_params, req.params)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_event_type_not_interested(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = heat_endpoint.HeatNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {'tags': {'cluster_id': 'CLUSTER_ID'}}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER',
'orchestration.stack.create.start',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_tag(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = heat_endpoint.HeatNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {'tags': None}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER', 'orchestration.stack.delete.end',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_empty_tag(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = heat_endpoint.HeatNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {'tags': []}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER', 'orchestration.stack.delete.end',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_cluster_in_tag(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = heat_endpoint.HeatNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {'tags': ['foo', 'bar']}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER', 'orchestration.stack.delete.end',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_node_in_tag(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = heat_endpoint.HeatNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {'tags': ['cluster_id=C1ID']}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER', 'orchestration.stack.delete.end',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_cluster_id_not_match(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = heat_endpoint.HeatNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {
'tags': ['cluster_id=FOOBAR', 'cluster_node_id=N2'],
'user_identity': 'USER',
}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER', 'orchestration.stack.delete.end',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch.object(context.RequestContext, 'from_dict')
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_default_values(self, mock_rpc, mock_context, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = heat_endpoint.HeatNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {
'tags': [
'cluster_id=CLUSTER_ID',
'cluster_node_id=NODE_ID'
],
'user_identity': 'USER',
}
metadata = {'timestamp': 'TIMESTAMP'}
call_ctx = mock.Mock()
mock_context.return_value = call_ctx
res = endpoint.info(ctx, 'PUBLISHER', 'orchestration.stack.delete.end',
payload, metadata)
self.assertIsNone(res)
x_rpc.call.assert_called_once_with(call_ctx, 'node_recover', mock.ANY)
req = x_rpc.call.call_args[0][2]
self.assertIsInstance(req, objects.NodeRecoverRequest)
self.assertEqual('NODE_ID', req.identity)
expected_params = {
'event': 'DELETE',
'state': 'Unknown',
'stack_id': 'Unknown',
'timestamp': 'TIMESTAMP',
'publisher': 'PUBLISHER',
'operation': 'REBUILD',
}
self.assertEqual(expected_params, req.params)

View File

@ -0,0 +1,213 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from senlin.common import context
from senlin.engine.notifications import nova_endpoint
from senlin import objects
from senlin.tests.unit.common import base
@mock.patch('oslo_messaging.NotificationFilter')
class TestNovaNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient')
def test_init(self, mock_rpc, mock_filter):
x_filter = mock_filter.return_value
event_map = {
'compute.instance.pause.end': 'PAUSE',
'compute.instance.power_off.end': 'POWER_OFF',
'compute.instance.rebuild.error': 'REBUILD',
'compute.instance.shutdown.end': 'SHUTDOWN',
'compute.instance.soft_delete.end': 'SOFT_DELETE',
}
recover_action = {'operation': 'REBUILD'}
endpoint = nova_endpoint.NovaNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
mock_filter.assert_called_once_with(
publisher_id='^compute.*',
event_type='^compute\.instance\..*',
context={'project_id': '^PROJECT$'})
mock_rpc.assert_called_once_with()
self.assertEqual(x_filter, endpoint.filter_rule)
self.assertEqual(mock_rpc.return_value, endpoint.rpc)
for e in event_map:
self.assertIn(e, endpoint.VM_FAILURE_EVENTS)
self.assertEqual(event_map[e], endpoint.VM_FAILURE_EVENTS[e])
self.assertEqual('PROJECT', endpoint.project_id)
self.assertEqual('CLUSTER_ID', endpoint.cluster_id)
@mock.patch.object(context.RequestContext, 'from_dict')
@mock.patch('senlin.rpc.client.EngineClient')
def test_info(self, mock_rpc, mock_context, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = nova_endpoint.NovaNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {
'metadata': {
'cluster_id': 'CLUSTER_ID',
'cluster_node_id': 'FAKE_NODE',
'cluster_node_index': '123',
},
'instance_id': 'PHYSICAL_ID',
'user_id': 'USER',
'state': 'shutoff',
}
metadata = {'timestamp': 'TIMESTAMP'}
call_ctx = mock.Mock()
mock_context.return_value = call_ctx
res = endpoint.info(ctx, 'PUBLISHER', 'compute.instance.shutdown.end',
payload, metadata)
self.assertIsNone(res)
x_rpc.call.assert_called_once_with(call_ctx, 'node_recover', mock.ANY)
req = x_rpc.call.call_args[0][2]
self.assertIsInstance(req, objects.NodeRecoverRequest)
self.assertEqual('FAKE_NODE', req.identity)
expected_params = {
'event': 'SHUTDOWN',
'state': 'shutoff',
'instance_id': 'PHYSICAL_ID',
'timestamp': 'TIMESTAMP',
'publisher': 'PUBLISHER',
'operation': 'REBUILD'
}
self.assertEqual(expected_params, req.params)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_metadata(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = nova_endpoint.NovaNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {'metadata': {}}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER', 'compute.instance.delete.end',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_cluster_in_metadata(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = nova_endpoint.NovaNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {'metadata': {'foo': 'bar'}}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER', 'compute.instance.delete.end',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_cluster_id_not_match(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = nova_endpoint.NovaNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {'metadata': {'cluster_id': 'FOOBAR'}}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER', 'compute.instance.delete.end',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_event_type_not_interested(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = nova_endpoint.NovaNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {'metadata': {'cluster_id': 'CLUSTER_ID'}}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER', 'compute.instance.delete.start',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_node_id(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = nova_endpoint.NovaNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {'metadata': {'cluster_id': 'CLUSTER_ID'}}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER', 'compute.instance.delete.end',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch.object(context.RequestContext, 'from_dict')
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_default_values(self, mock_rpc, mock_context, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = nova_endpoint.NovaNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {
'metadata': {
'cluster_id': 'CLUSTER_ID',
'cluster_node_id': 'NODE_ID'
},
'user_id': 'USER',
}
metadata = {'timestamp': 'TIMESTAMP'}
call_ctx = mock.Mock()
mock_context.return_value = call_ctx
res = endpoint.info(ctx, 'PUBLISHER', 'compute.instance.shutdown.end',
payload, metadata)
self.assertIsNone(res)
x_rpc.call.assert_called_once_with(call_ctx, 'node_recover', mock.ANY)
req = x_rpc.call.call_args[0][2]
self.assertIsInstance(req, objects.NodeRecoverRequest)
self.assertEqual('NODE_ID', req.identity)
expected_params = {
'event': 'SHUTDOWN',
'state': 'Unknown',
'instance_id': 'Unknown',
'timestamp': 'TIMESTAMP',
'publisher': 'PUBLISHER',
'operation': 'REBUILD',
}
self.assertEqual(expected_params, req.params)

View File

@ -23,6 +23,7 @@ from senlin.common import exception as exc
from senlin.common import utils
from senlin.engine import health_manager as hm
from senlin.engine import node as node_mod
from senlin.engine.notifications import nova_endpoint
from senlin import objects
from senlin.objects import cluster as obj_cluster
from senlin.objects import node as obj_node
@ -64,28 +65,31 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
'compute.instance.soft_delete.end': 'SOFT_DELETE',
}
recover_action = {'operation': 'REBUILD'}
obj = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER', recover_action)
endpoint = nova_endpoint.NovaNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
mock_filter.assert_called_once_with(
publisher_id='^compute.*',
event_type='^compute\.instance\..*',
context={'project_id': '^PROJECT$'})
mock_rpc.assert_called_once_with()
self.assertEqual(x_filter, obj.filter_rule)
self.assertEqual(mock_rpc.return_value, obj.rpc)
self.assertEqual(x_filter, endpoint.filter_rule)
self.assertEqual(mock_rpc.return_value, endpoint.rpc)
for e in event_map:
self.assertIn(e, obj.VM_FAILURE_EVENTS)
self.assertEqual(event_map[e], obj.VM_FAILURE_EVENTS[e])
self.assertEqual('PROJECT', obj.project_id)
self.assertEqual('CLUSTER', obj.cluster_id)
self.assertIn(e, endpoint.VM_FAILURE_EVENTS)
self.assertEqual(event_map[e], endpoint.VM_FAILURE_EVENTS[e])
self.assertEqual('PROJECT', endpoint.project_id)
self.assertEqual('CLUSTER_ID', endpoint.cluster_id)
@mock.patch.object(context.RequestContext, 'from_dict')
@mock.patch('senlin.rpc.client.EngineClient')
def test_info(self, mock_rpc, mock_context, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
endpoint = nova_endpoint.NovaNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {
'metadata': {
@ -123,8 +127,9 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
def test_info_no_metadata(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
endpoint = nova_endpoint.NovaNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {'metadata': {}}
metadata = {'timestamp': 'TIMESTAMP'}
@ -139,8 +144,9 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
def test_info_no_cluster_in_metadata(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
endpoint = nova_endpoint.NovaNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {'metadata': {'foo': 'bar'}}
metadata = {'timestamp': 'TIMESTAMP'}
@ -155,8 +161,9 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
def test_info_cluster_id_not_match(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
endpoint = nova_endpoint.NovaNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {'metadata': {'cluster_id': 'FOOBAR'}}
metadata = {'timestamp': 'TIMESTAMP'}
@ -171,8 +178,9 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
def test_info_event_type_not_interested(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
endpoint = nova_endpoint.NovaNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {'metadata': {'cluster_id': 'CLUSTER_ID'}}
metadata = {'timestamp': 'TIMESTAMP'}
@ -187,8 +195,9 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
def test_info_no_node_id(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
endpoint = nova_endpoint.NovaNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {'metadata': {'cluster_id': 'CLUSTER_ID'}}
metadata = {'timestamp': 'TIMESTAMP'}
@ -204,8 +213,9 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
def test_info_default_values(self, mock_rpc, mock_context, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = hm.NovaNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
endpoint = nova_endpoint.NovaNotificationEndpoint(
'PROJECT', 'CLUSTER_ID', recover_action
)
ctx = mock.Mock()
payload = {
'metadata': {
@ -237,208 +247,10 @@ class TestNovaNotificationEndpoint(base.SenlinTestCase):
self.assertEqual(expected_params, req.params)
@mock.patch('oslo_messaging.NotificationFilter')
class TestHeatNotificationEndpoint(base.SenlinTestCase):
@mock.patch('senlin.rpc.client.EngineClient')
def test_init(self, mock_rpc, mock_filter):
x_filter = mock_filter.return_value
event_map = {
'orchestration.stack.delete.end': 'DELETE',
}
recover_action = {'operation': 'REBUILD'}
obj = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER', recover_action)
mock_filter.assert_called_once_with(
publisher_id='^orchestration.*',
event_type='^orchestration\.stack\..*',
context={'project_id': '^PROJECT$'})
mock_rpc.assert_called_once_with()
self.assertEqual(x_filter, obj.filter_rule)
self.assertEqual(mock_rpc.return_value, obj.rpc)
for e in event_map:
self.assertIn(e, obj.STACK_FAILURE_EVENTS)
self.assertEqual(event_map[e], obj.STACK_FAILURE_EVENTS[e])
self.assertEqual('PROJECT', obj.project_id)
self.assertEqual('CLUSTER', obj.cluster_id)
@mock.patch.object(context.RequestContext, 'from_dict')
@mock.patch('senlin.rpc.client.EngineClient')
def test_info(self, mock_rpc, mock_context, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock()
payload = {
'tags': {
'cluster_id=CLUSTER_ID',
'cluster_node_id=FAKE_NODE',
'cluster_node_index=123',
},
'stack_identity': 'PHYSICAL_ID',
'user_identity': 'USER',
'state': 'DELETE_COMPLETE',
}
metadata = {'timestamp': 'TIMESTAMP'}
call_ctx = mock.Mock()
mock_context.return_value = call_ctx
res = endpoint.info(ctx, 'PUBLISHER', 'orchestration.stack.delete.end',
payload, metadata)
self.assertIsNone(res)
x_rpc.call.assert_called_once_with(call_ctx, 'node_recover', mock.ANY)
req = x_rpc.call.call_args[0][2]
self.assertIsInstance(req, objects.NodeRecoverRequest)
self.assertEqual('FAKE_NODE', req.identity)
expected_params = {
'event': 'DELETE',
'state': 'DELETE_COMPLETE',
'stack_id': 'PHYSICAL_ID',
'timestamp': 'TIMESTAMP',
'publisher': 'PUBLISHER',
'operation': 'REBUILD',
}
self.assertEqual(expected_params, req.params)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_event_type_not_interested(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock()
payload = {'tags': {'cluster_id': 'CLUSTER_ID'}}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER',
'orchestration.stack.create.start',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_tag(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock()
payload = {'tags': None}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER', 'orchestration.stack.delete.end',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_empty_tag(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock()
payload = {'tags': []}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER', 'orchestration.stack.delete.end',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_cluster_in_tag(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock()
payload = {'tags': ['foo', 'bar']}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER', 'orchestration.stack.delete.end',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_no_node_in_tag(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock()
payload = {'tags': ['cluster_id=C1ID']}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER', 'orchestration.stack.delete.end',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_cluster_id_not_match(self, mock_rpc, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock()
payload = {'tags': ['cluster_id=FOOBAR', 'cluster_node_id=N2']}
metadata = {'timestamp': 'TIMESTAMP'}
res = endpoint.info(ctx, 'PUBLISHER', 'orchestration.stack.delete.end',
payload, metadata)
self.assertIsNone(res)
self.assertEqual(0, x_rpc.node_recover.call_count)
@mock.patch.object(context.RequestContext, 'from_dict')
@mock.patch('senlin.rpc.client.EngineClient')
def test_info_default_values(self, mock_rpc, mock_context, mock_filter):
x_rpc = mock_rpc.return_value
recover_action = {'operation': 'REBUILD'}
endpoint = hm.HeatNotificationEndpoint('PROJECT', 'CLUSTER_ID',
recover_action)
ctx = mock.Mock()
payload = {
'tags': [
'cluster_id=CLUSTER_ID',
'cluster_node_id=NODE_ID'
],
'user_identity': 'USER',
}
metadata = {'timestamp': 'TIMESTAMP'}
call_ctx = mock.Mock()
mock_context.return_value = call_ctx
res = endpoint.info(ctx, 'PUBLISHER', 'orchestration.stack.delete.end',
payload, metadata)
self.assertIsNone(res)
x_rpc.call.assert_called_once_with(call_ctx, 'node_recover', mock.ANY)
req = x_rpc.call.call_args[0][2]
self.assertIsInstance(req, objects.NodeRecoverRequest)
self.assertEqual('NODE_ID', req.identity)
expected_params = {
'event': 'DELETE',
'state': 'Unknown',
'stack_id': 'Unknown',
'timestamp': 'TIMESTAMP',
'publisher': 'PUBLISHER',
'operation': 'REBUILD',
}
self.assertEqual(expected_params, req.params)
@mock.patch('senlin.engine.health_manager.HeatNotificationEndpoint')
@mock.patch('senlin.engine.health_manager.NovaNotificationEndpoint')
@mock.patch(
'senlin.engine.notifications.heat_endpoint.HeatNotificationEndpoint')
@mock.patch(
'senlin.engine.notifications.nova_endpoint.NovaNotificationEndpoint')
@mock.patch('oslo_messaging.Target')
@mock.patch('oslo_messaging.get_notification_transport')
@mock.patch('oslo_messaging.get_notification_listener')