Forbid new legacy notification event_type
As we agreed on the Mitaka midcycle no new legacy notification event_type are allowed in Nova and every new notification shall use the versioned notification framework. This patch adds a validation step into the legacy notification sending call chain. It checks the event_type against a whitelist. During unit and functional testing an exception is raised if the event_type is not known and therefore forbiden. This check is not 100% foolproof as it only detects the new event_type if there is a test that triggers the sending of it and this check is not capable of detecting such event_types if the test directly mocks nova.rpc.get_notifier. During normal operation a WARNING is logged so that a logstash query can be set up to detect the rest of the cases. Change-Id: Ia6600868aa7b3a22cb8c020503f49dce6a4c2c2b
This commit is contained in:
parent
38c3f883db
commit
590d40c804
167
nova/rpc.py
167
nova/rpc.py
@ -26,7 +26,11 @@ __all__ = [
|
|||||||
'TRANSPORT_ALIASES',
|
'TRANSPORT_ALIASES',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
import functools
|
||||||
|
|
||||||
|
from nova.i18n import _
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
from oslo_log import log as logging
|
||||||
import oslo_messaging as messaging
|
import oslo_messaging as messaging
|
||||||
from oslo_serialization import jsonutils
|
from oslo_serialization import jsonutils
|
||||||
|
|
||||||
@ -45,6 +49,8 @@ notification_opts = [
|
|||||||
|
|
||||||
CONF.register_opts(notification_opts)
|
CONF.register_opts(notification_opts)
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
TRANSPORT = None
|
TRANSPORT = None
|
||||||
LEGACY_NOTIFIER = None
|
LEGACY_NOTIFIER = None
|
||||||
NOTIFICATION_TRANSPORT = None
|
NOTIFICATION_TRANSPORT = None
|
||||||
@ -179,9 +185,168 @@ def get_notifier(service, host=None, publisher_id=None):
|
|||||||
assert LEGACY_NOTIFIER is not None
|
assert LEGACY_NOTIFIER is not None
|
||||||
if not publisher_id:
|
if not publisher_id:
|
||||||
publisher_id = "%s.%s" % (service, host or CONF.host)
|
publisher_id = "%s.%s" % (service, host or CONF.host)
|
||||||
return LEGACY_NOTIFIER.prepare(publisher_id=publisher_id)
|
return LegacyValidatingNotifier(
|
||||||
|
LEGACY_NOTIFIER.prepare(publisher_id=publisher_id))
|
||||||
|
|
||||||
|
|
||||||
def get_versioned_notifier(publisher_id):
|
def get_versioned_notifier(publisher_id):
|
||||||
assert NOTIFIER is not None
|
assert NOTIFIER is not None
|
||||||
return NOTIFIER.prepare(publisher_id=publisher_id)
|
return NOTIFIER.prepare(publisher_id=publisher_id)
|
||||||
|
|
||||||
|
|
||||||
|
class LegacyValidatingNotifier(object):
|
||||||
|
"""Wraps an oslo.messaging Notifier and checks for allowed event_types."""
|
||||||
|
|
||||||
|
# If true an exception is thrown if the event_type is not allowed, if false
|
||||||
|
# then only a WARNING is logged
|
||||||
|
fatal = False
|
||||||
|
|
||||||
|
# This list contains the already existing therefore allowed legacy
|
||||||
|
# notification event_types. New items shall not be added to the list as
|
||||||
|
# Nova does not allow new legacy notifications any more. This list will be
|
||||||
|
# removed when all the notification is transformed to versioned
|
||||||
|
# notifications.
|
||||||
|
allowed_legacy_notification_event_types = [
|
||||||
|
'aggregate.addhost.end',
|
||||||
|
'aggregate.addhost.start',
|
||||||
|
'aggregate.create.end',
|
||||||
|
'aggregate.create.start',
|
||||||
|
'aggregate.delete.end',
|
||||||
|
'aggregate.delete.start',
|
||||||
|
'aggregate.removehost.end',
|
||||||
|
'aggregate.removehost.start',
|
||||||
|
'aggregate.updatemetadata.end',
|
||||||
|
'aggregate.updatemetadata.start',
|
||||||
|
'aggregate.updateprop.end',
|
||||||
|
'aggregate.updateprop.start',
|
||||||
|
'api.fault',
|
||||||
|
'compute.instance.create.end',
|
||||||
|
'compute.instance.create.error',
|
||||||
|
'compute.instance.create_ip.end',
|
||||||
|
'compute.instance.create_ip.start',
|
||||||
|
'compute.instance.create.start',
|
||||||
|
'compute.instance.delete.end',
|
||||||
|
'compute.instance.delete_ip.end',
|
||||||
|
'compute.instance.delete_ip.start',
|
||||||
|
'compute.instance.delete.start',
|
||||||
|
'compute.instance.evacuate',
|
||||||
|
'compute.instance.exists',
|
||||||
|
'compute.instance.finish_resize.end',
|
||||||
|
'compute.instance.finish_resize.start',
|
||||||
|
'compute.instance.live.migration.abort.start',
|
||||||
|
'compute.instance.live.migration.abort.end',
|
||||||
|
'compute.instance.live_migration.post.dest.end',
|
||||||
|
'compute.instance.live_migration.post.dest.start',
|
||||||
|
'compute.instance.live_migration._post.end',
|
||||||
|
'compute.instance.live_migration._post.start',
|
||||||
|
'compute.instance.live_migration.pre.end',
|
||||||
|
'compute.instance.live_migration.pre.start',
|
||||||
|
'compute.instance.live_migration.rollback.dest.end',
|
||||||
|
'compute.instance.live_migration.rollback.dest.start',
|
||||||
|
'compute.instance.live_migration._rollback.end',
|
||||||
|
'compute.instance.live_migration._rollback.start',
|
||||||
|
'compute.instance.pause.end',
|
||||||
|
'compute.instance.pause.start',
|
||||||
|
'compute.instance.power_off.end',
|
||||||
|
'compute.instance.power_off.start',
|
||||||
|
'compute.instance.power_on.end',
|
||||||
|
'compute.instance.power_on.start',
|
||||||
|
'compute.instance.reboot.end',
|
||||||
|
'compute.instance.reboot.start',
|
||||||
|
'compute.instance.rebuild.end',
|
||||||
|
'compute.instance.rebuild.error',
|
||||||
|
'compute.instance.rebuild.scheduled',
|
||||||
|
'compute.instance.rebuild.start',
|
||||||
|
'compute.instance.rescue.end',
|
||||||
|
'compute.instance.rescue.start',
|
||||||
|
'compute.instance.resize.confirm.end',
|
||||||
|
'compute.instance.resize.confirm.start',
|
||||||
|
'compute.instance.resize.end',
|
||||||
|
'compute.instance.resize.error',
|
||||||
|
'compute.instance.resize.prep.end',
|
||||||
|
'compute.instance.resize.prep.start',
|
||||||
|
'compute.instance.resize.revert.end',
|
||||||
|
'compute.instance.resize.revert.start',
|
||||||
|
'compute.instance.resize.start',
|
||||||
|
'compute.instance.restore.end',
|
||||||
|
'compute.instance.restore.start',
|
||||||
|
'compute.instance.resume.end',
|
||||||
|
'compute.instance.resume.start',
|
||||||
|
'compute.instance.shelve.end',
|
||||||
|
'compute.instance.shelve_offload.end',
|
||||||
|
'compute.instance.shelve_offload.start',
|
||||||
|
'compute.instance.shelve.start',
|
||||||
|
'compute.instance.shutdown.end',
|
||||||
|
'compute.instance.shutdown.start',
|
||||||
|
'compute.instance.snapshot.end',
|
||||||
|
'compute.instance.snapshot.start',
|
||||||
|
'compute.instance.soft_delete.end',
|
||||||
|
'compute.instance.soft_delete.start',
|
||||||
|
'compute.instance.suspend.end',
|
||||||
|
'compute.instance.suspend.start',
|
||||||
|
'compute.instance.trigger_crash_dump.end',
|
||||||
|
'compute.instance.trigger_crash_dump.start',
|
||||||
|
'compute.instance.unpause.end',
|
||||||
|
'compute.instance.unpause.start',
|
||||||
|
'compute.instance.unrescue.end',
|
||||||
|
'compute.instance.unrescue.start',
|
||||||
|
'compute.instance.unshelve.start',
|
||||||
|
'compute.instance.unshelve.end',
|
||||||
|
'compute.instance.update',
|
||||||
|
'compute.instance.volume.attach',
|
||||||
|
'compute.instance.volume.detach',
|
||||||
|
'compute.libvirt.error',
|
||||||
|
'compute_task.build_instances',
|
||||||
|
'compute_task.migrate_server',
|
||||||
|
'compute_task.rebuild_server',
|
||||||
|
'HostAPI.power_action.end',
|
||||||
|
'HostAPI.power_action.start',
|
||||||
|
'HostAPI.set_enabled.end',
|
||||||
|
'HostAPI.set_enabled.start',
|
||||||
|
'HostAPI.set_maintenance.end',
|
||||||
|
'HostAPI.set_maintenance.start',
|
||||||
|
'keypair.create.start',
|
||||||
|
'keypair.create.end',
|
||||||
|
'keypair.delete.start',
|
||||||
|
'keypair.delete.end',
|
||||||
|
'keypair.import.start',
|
||||||
|
'keypair.import.end',
|
||||||
|
'network.floating_ip.allocate',
|
||||||
|
'network.floating_ip.associate',
|
||||||
|
'network.floating_ip.deallocate',
|
||||||
|
'network.floating_ip.disassociate',
|
||||||
|
'scheduler.select_destinations.end',
|
||||||
|
'scheduler.select_destinations.start',
|
||||||
|
'servergroup.addmember',
|
||||||
|
'servergroup.create',
|
||||||
|
'servergroup.delete',
|
||||||
|
'volume.usage',
|
||||||
|
]
|
||||||
|
|
||||||
|
message = _('%(event_type)s is not a versioned notification and not '
|
||||||
|
'whitelisted. See ./doc/source/notification.rst')
|
||||||
|
|
||||||
|
def __init__(self, notifier):
|
||||||
|
self.notifier = notifier
|
||||||
|
for priority in ['debug', 'info', 'warn', 'error', 'critical']:
|
||||||
|
setattr(self, priority,
|
||||||
|
functools.partial(self._notify, priority))
|
||||||
|
|
||||||
|
def _is_wrap_exception_notification(self, payload):
|
||||||
|
# nova.exception.wrap_exception decorator emits notification where the
|
||||||
|
# event_type is the name of the decorated function. This is used in
|
||||||
|
# many places but it will be converted to versioned notification in one
|
||||||
|
# run by updating the decorator so it is pointless to white list all
|
||||||
|
# the function names here we white list the notification itself
|
||||||
|
# detected by the special payload keys.
|
||||||
|
return {'exception', 'args'} == set(payload.keys())
|
||||||
|
|
||||||
|
def _notify(self, priority, ctxt, event_type, payload):
|
||||||
|
if (event_type not in self.allowed_legacy_notification_event_types and
|
||||||
|
not self._is_wrap_exception_notification(payload)):
|
||||||
|
if self.fatal:
|
||||||
|
raise AssertionError(self.message % {'event_type': event_type})
|
||||||
|
else:
|
||||||
|
LOG.warning(self.message, {'event_type': event_type})
|
||||||
|
|
||||||
|
getattr(self.notifier, priority)(ctxt, event_type, payload)
|
||||||
|
@ -244,6 +244,8 @@ class TestCase(testtools.TestCase):
|
|||||||
|
|
||||||
openstack_driver.DRIVER_CACHE = {}
|
openstack_driver.DRIVER_CACHE = {}
|
||||||
|
|
||||||
|
self.useFixture(nova_fixtures.ForbidNewLegacyNotificationFixture())
|
||||||
|
|
||||||
def _restore_obj_registry(self):
|
def _restore_obj_registry(self):
|
||||||
objects_base.NovaObjectRegistry._registry._obj_classes = \
|
objects_base.NovaObjectRegistry._registry._obj_classes = \
|
||||||
self._base_test_obj_backup
|
self._base_test_obj_backup
|
||||||
|
@ -524,3 +524,26 @@ class EngineFacadeFixture(fixtures.Fixture):
|
|||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
self._ctx_manager._root_factory = self._existing_factory
|
self._ctx_manager._root_factory = self._existing_factory
|
||||||
|
|
||||||
|
|
||||||
|
class ForbidNewLegacyNotificationFixture(fixtures.Fixture):
|
||||||
|
"""Make sure the test fails if new legacy notification is added"""
|
||||||
|
def __init__(self):
|
||||||
|
super(ForbidNewLegacyNotificationFixture, self).__init__()
|
||||||
|
self.notifier = rpc.LegacyValidatingNotifier
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(ForbidNewLegacyNotificationFixture, self).setUp()
|
||||||
|
self.notifier.fatal = True
|
||||||
|
|
||||||
|
# allow the special test value used in
|
||||||
|
# nova.tests.unit.test_notifications.NotificationsTestCase
|
||||||
|
self.notifier.allowed_legacy_notification_event_types.append(
|
||||||
|
'_decorated_function')
|
||||||
|
|
||||||
|
self.addCleanup(self.cleanup)
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
self.notifier.fatal = False
|
||||||
|
self.notifier.allowed_legacy_notification_event_types.remove(
|
||||||
|
'_decorated_function')
|
||||||
|
@ -222,7 +222,8 @@ class TestRPC(testtools.TestCase):
|
|||||||
notifier = rpc.get_notifier('service', publisher_id='foo')
|
notifier = rpc.get_notifier('service', publisher_id='foo')
|
||||||
|
|
||||||
mock_prep.assert_called_once_with(publisher_id='foo')
|
mock_prep.assert_called_once_with(publisher_id='foo')
|
||||||
self.assertEqual('notifier', notifier)
|
self.assertIsInstance(notifier, rpc.LegacyValidatingNotifier)
|
||||||
|
self.assertEqual('notifier', notifier.notifier)
|
||||||
|
|
||||||
def test_get_notifier_null_publisher(self):
|
def test_get_notifier_null_publisher(self):
|
||||||
rpc.LEGACY_NOTIFIER = mock.Mock()
|
rpc.LEGACY_NOTIFIER = mock.Mock()
|
||||||
@ -233,7 +234,8 @@ class TestRPC(testtools.TestCase):
|
|||||||
notifier = rpc.get_notifier('service', host='bar')
|
notifier = rpc.get_notifier('service', host='bar')
|
||||||
|
|
||||||
mock_prep.assert_called_once_with(publisher_id='service.bar')
|
mock_prep.assert_called_once_with(publisher_id='service.bar')
|
||||||
self.assertEqual('notifier', notifier)
|
self.assertIsInstance(notifier, rpc.LegacyValidatingNotifier)
|
||||||
|
self.assertEqual('notifier', notifier.notifier)
|
||||||
|
|
||||||
def test_get_versioned_notifier(self):
|
def test_get_versioned_notifier(self):
|
||||||
rpc.NOTIFIER = mock.Mock()
|
rpc.NOTIFIER = mock.Mock()
|
||||||
|
Loading…
Reference in New Issue
Block a user