Make emitting versioned notifications configurable
A new config option is introduced called 'notification_format' which specifies the format of the notifications emitted, possible values are 'versioned', 'unversioned', 'both'. The default value is 'both'. Also to help consumers the versioned notifications will be emited to a separate topic called 'versioned_notifications'. DocImpact: new config option notification_format Partially-Implements: bp versioned-notification-api Change-Id: Ie45c03175800bb14269f3976b03a53488e084339
This commit is contained in:
parent
1d6451e619
commit
bd401d4cc8
@ -114,8 +114,7 @@ class NotificationBase(base.NovaObject):
|
|||||||
}
|
}
|
||||||
|
|
||||||
def _emit(self, context, event_type, publisher_id, payload):
|
def _emit(self, context, event_type, publisher_id, payload):
|
||||||
assert rpc.NOTIFIER is not None
|
notifier = rpc.get_versioned_notifier(publisher_id)
|
||||||
notifier = rpc.NOTIFIER.prepare(publisher_id=publisher_id)
|
|
||||||
notify = getattr(notifier, self.priority)
|
notify = getattr(notifier, self.priority)
|
||||||
notify(context, event_type=event_type, payload=payload)
|
notify(context, event_type=event_type, payload=payload)
|
||||||
|
|
||||||
|
40
nova/rpc.py
40
nova/rpc.py
@ -33,8 +33,20 @@ from oslo_serialization import jsonutils
|
|||||||
import nova.context
|
import nova.context
|
||||||
import nova.exception
|
import nova.exception
|
||||||
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
notification_opts = [
|
||||||
|
cfg.StrOpt('notification_format',
|
||||||
|
choices=['unversioned', 'versioned', 'both'],
|
||||||
|
default='both',
|
||||||
|
help='Specifies which notification format shall be used by '
|
||||||
|
'nova.'),
|
||||||
|
]
|
||||||
|
|
||||||
|
CONF.register_opts(notification_opts)
|
||||||
|
|
||||||
TRANSPORT = None
|
TRANSPORT = None
|
||||||
|
LEGACY_NOTIFIER = None
|
||||||
NOTIFIER = None
|
NOTIFIER = None
|
||||||
|
|
||||||
ALLOWED_EXMODS = [
|
ALLOWED_EXMODS = [
|
||||||
@ -56,21 +68,34 @@ TRANSPORT_ALIASES = {
|
|||||||
|
|
||||||
|
|
||||||
def init(conf):
|
def init(conf):
|
||||||
global TRANSPORT, NOTIFIER
|
global TRANSPORT, LEGACY_NOTIFIER, NOTIFIER
|
||||||
exmods = get_allowed_exmods()
|
exmods = get_allowed_exmods()
|
||||||
TRANSPORT = messaging.get_transport(conf,
|
TRANSPORT = messaging.get_transport(conf,
|
||||||
allowed_remote_exmods=exmods,
|
allowed_remote_exmods=exmods,
|
||||||
aliases=TRANSPORT_ALIASES)
|
aliases=TRANSPORT_ALIASES)
|
||||||
serializer = RequestContextSerializer(JsonPayloadSerializer())
|
serializer = RequestContextSerializer(JsonPayloadSerializer())
|
||||||
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer)
|
if conf.notification_format == 'unversioned':
|
||||||
|
LEGACY_NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer)
|
||||||
|
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer,
|
||||||
|
driver='noop')
|
||||||
|
elif conf.notification_format == 'both':
|
||||||
|
LEGACY_NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer)
|
||||||
|
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer,
|
||||||
|
topic='versioned_notifications')
|
||||||
|
else:
|
||||||
|
LEGACY_NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer,
|
||||||
|
driver='noop')
|
||||||
|
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer,
|
||||||
|
topic='versioned_notifications')
|
||||||
|
|
||||||
|
|
||||||
def cleanup():
|
def cleanup():
|
||||||
global TRANSPORT, NOTIFIER
|
global TRANSPORT, LEGACY_NOTIFIER, NOTIFIER
|
||||||
assert TRANSPORT is not None
|
assert TRANSPORT is not None
|
||||||
|
assert LEGACY_NOTIFIER is not None
|
||||||
assert NOTIFIER is not None
|
assert NOTIFIER is not None
|
||||||
TRANSPORT.cleanup()
|
TRANSPORT.cleanup()
|
||||||
TRANSPORT = NOTIFIER = None
|
TRANSPORT = LEGACY_NOTIFIER = NOTIFIER = None
|
||||||
|
|
||||||
|
|
||||||
def set_defaults(control_exchange):
|
def set_defaults(control_exchange):
|
||||||
@ -141,7 +166,12 @@ def get_server(target, endpoints, serializer=None):
|
|||||||
|
|
||||||
|
|
||||||
def get_notifier(service, host=None, publisher_id=None):
|
def get_notifier(service, host=None, publisher_id=None):
|
||||||
assert NOTIFIER is not None
|
assert LEGACY_NOTIFIER is not None
|
||||||
if not publisher_id:
|
if not publisher_id:
|
||||||
publisher_id = "%s.%s" % (service, host or CONF.host)
|
publisher_id = "%s.%s" % (service, host or CONF.host)
|
||||||
|
return LEGACY_NOTIFIER.prepare(publisher_id=publisher_id)
|
||||||
|
|
||||||
|
|
||||||
|
def get_versioned_notifier(publisher_id):
|
||||||
|
assert NOTIFIER is not None
|
||||||
return NOTIFIER.prepare(publisher_id=publisher_id)
|
return NOTIFIER.prepare(publisher_id=publisher_id)
|
||||||
|
@ -21,10 +21,12 @@ from oslo_serialization import jsonutils
|
|||||||
from nova import rpc
|
from nova import rpc
|
||||||
|
|
||||||
NOTIFICATIONS = []
|
NOTIFICATIONS = []
|
||||||
|
VERSIONED_NOTIFICATIONS = []
|
||||||
|
|
||||||
|
|
||||||
def reset():
|
def reset():
|
||||||
del NOTIFICATIONS[:]
|
del NOTIFICATIONS[:]
|
||||||
|
del VERSIONED_NOTIFICATIONS[:]
|
||||||
|
|
||||||
|
|
||||||
FakeMessage = collections.namedtuple('Message',
|
FakeMessage = collections.namedtuple('Message',
|
||||||
@ -64,11 +66,27 @@ class FakeNotifier(object):
|
|||||||
NOTIFICATIONS.append(msg)
|
NOTIFICATIONS.append(msg)
|
||||||
|
|
||||||
|
|
||||||
|
class FakeVersionedNotifier(FakeNotifier):
|
||||||
|
def _notify(self, priority, ctxt, event_type, payload):
|
||||||
|
payload = self._serializer.serialize_entity(ctxt, payload)
|
||||||
|
VERSIONED_NOTIFICATIONS.append({'publisher_id': self.publisher_id,
|
||||||
|
'priority': priority,
|
||||||
|
'event_type': event_type,
|
||||||
|
'payload': payload})
|
||||||
|
|
||||||
|
|
||||||
def stub_notifier(stubs):
|
def stub_notifier(stubs):
|
||||||
stubs.Set(messaging, 'Notifier', FakeNotifier)
|
stubs.Set(messaging, 'Notifier', FakeNotifier)
|
||||||
if rpc.NOTIFIER:
|
if rpc.LEGACY_NOTIFIER and rpc.NOTIFIER:
|
||||||
stubs.Set(rpc, 'NOTIFIER',
|
stubs.Set(rpc, 'LEGACY_NOTIFIER',
|
||||||
FakeNotifier(rpc.NOTIFIER.transport,
|
FakeNotifier(rpc.LEGACY_NOTIFIER.transport,
|
||||||
rpc.NOTIFIER.publisher_id,
|
rpc.LEGACY_NOTIFIER.publisher_id,
|
||||||
serializer=getattr(rpc.NOTIFIER, '_serializer',
|
serializer=getattr(rpc.LEGACY_NOTIFIER,
|
||||||
|
'_serializer',
|
||||||
|
None)))
|
||||||
|
stubs.Set(rpc, 'NOTIFIER',
|
||||||
|
FakeVersionedNotifier(rpc.NOTIFIER.transport,
|
||||||
|
rpc.NOTIFIER.publisher_id,
|
||||||
|
serializer=getattr(rpc.NOTIFIER,
|
||||||
|
'_serializer',
|
||||||
None)))
|
None)))
|
||||||
|
@ -140,8 +140,9 @@ class TestNotificationBase(test.NoDBTestCase):
|
|||||||
actual_payload = mock_notify.call_args[1]['payload']
|
actual_payload = mock_notify.call_args[1]['payload']
|
||||||
self.assertJsonEqual(expected_payload, actual_payload)
|
self.assertJsonEqual(expected_payload, actual_payload)
|
||||||
|
|
||||||
|
@mock.patch('nova.rpc.LEGACY_NOTIFIER')
|
||||||
@mock.patch('nova.rpc.NOTIFIER')
|
@mock.patch('nova.rpc.NOTIFIER')
|
||||||
def test_emit_notification(self, mock_notifier):
|
def test_emit_notification(self, mock_notifier, mock_legacy):
|
||||||
|
|
||||||
mock_context = mock.Mock()
|
mock_context = mock.Mock()
|
||||||
mock_context.to_dict.return_value = {}
|
mock_context.to_dict.return_value = {}
|
||||||
@ -152,6 +153,7 @@ class TestNotificationBase(test.NoDBTestCase):
|
|||||||
mock_context,
|
mock_context,
|
||||||
expected_event_type='test_object.update.start',
|
expected_event_type='test_object.update.start',
|
||||||
expected_payload=self.expected_payload)
|
expected_payload=self.expected_payload)
|
||||||
|
self.assertFalse(mock_legacy.called)
|
||||||
|
|
||||||
@mock.patch('nova.rpc.NOTIFIER')
|
@mock.patch('nova.rpc.NOTIFIER')
|
||||||
def test_emit_with_host_and_binary_as_publisher(self, mock_notifier):
|
def test_emit_with_host_and_binary_as_publisher(self, mock_notifier):
|
||||||
@ -174,8 +176,9 @@ class TestNotificationBase(test.NoDBTestCase):
|
|||||||
expected_event_type='test_object.update',
|
expected_event_type='test_object.update',
|
||||||
expected_payload=self.expected_payload)
|
expected_payload=self.expected_payload)
|
||||||
|
|
||||||
|
@mock.patch('nova.rpc.LEGACY_NOTIFIER')
|
||||||
@mock.patch('nova.rpc.NOTIFIER')
|
@mock.patch('nova.rpc.NOTIFIER')
|
||||||
def test_emit_event_type_without_phase(self, mock_notifier):
|
def test_emit_event_type_without_phase(self, mock_notifier, mock_legacy):
|
||||||
noti = self.TestNotification(
|
noti = self.TestNotification(
|
||||||
event_type=notification.EventType(
|
event_type=notification.EventType(
|
||||||
object='test_object',
|
object='test_object',
|
||||||
@ -194,6 +197,7 @@ class TestNotificationBase(test.NoDBTestCase):
|
|||||||
mock_context,
|
mock_context,
|
||||||
expected_event_type='test_object.update',
|
expected_event_type='test_object.update',
|
||||||
expected_payload=self.expected_payload)
|
expected_payload=self.expected_payload)
|
||||||
|
self.assertFalse(mock_legacy.called)
|
||||||
|
|
||||||
@mock.patch('nova.rpc.NOTIFIER')
|
@mock.patch('nova.rpc.NOTIFIER')
|
||||||
def test_not_possible_to_emit_if_not_populated(self, mock_notifier):
|
def test_not_possible_to_emit_if_not_populated(self, mock_notifier):
|
||||||
|
51
nova/tests/unit/test_notifier.py
Normal file
51
nova/tests/unit/test_notifier.py
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
# Copyright 2015 OpenStack Foundation
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import mock
|
||||||
|
|
||||||
|
from nova import rpc
|
||||||
|
from nova import test
|
||||||
|
|
||||||
|
|
||||||
|
class TestNotifier(test.NoDBTestCase):
|
||||||
|
|
||||||
|
@mock.patch('oslo_messaging.get_transport')
|
||||||
|
@mock.patch('oslo_messaging.Notifier')
|
||||||
|
def test_notification_format_affects_notification_driver(self,
|
||||||
|
mock_notifier,
|
||||||
|
mock_transport):
|
||||||
|
conf = mock.Mock()
|
||||||
|
|
||||||
|
cases = {
|
||||||
|
'unversioned': [
|
||||||
|
mock.call(mock.ANY, serializer=mock.ANY),
|
||||||
|
mock.call(mock.ANY, serializer=mock.ANY, driver='noop')],
|
||||||
|
'both': [
|
||||||
|
mock.call(mock.ANY, serializer=mock.ANY),
|
||||||
|
mock.call(mock.ANY, serializer=mock.ANY,
|
||||||
|
topic='versioned_notifications')],
|
||||||
|
'versioned': [
|
||||||
|
mock.call(mock.ANY, serializer=mock.ANY, driver='noop'),
|
||||||
|
mock.call(mock.ANY, serializer=mock.ANY,
|
||||||
|
topic='versioned_notifications')]}
|
||||||
|
|
||||||
|
for config in cases:
|
||||||
|
mock_notifier.reset_mock()
|
||||||
|
mock_notifier.side_effect = ['first', 'second']
|
||||||
|
conf.notification_format = config
|
||||||
|
rpc.init(conf)
|
||||||
|
self.assertEqual(cases[config], mock_notifier.call_args_list)
|
||||||
|
self.assertEqual('first', rpc.LEGACY_NOTIFIER)
|
||||||
|
self.assertEqual('second', rpc.NOTIFIER)
|
@ -0,0 +1,7 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- As part of refactoring the notification interface of Nova
|
||||||
|
a new config option 'notification_format' has been added to specifies
|
||||||
|
which notification format shall be used by nova. The possible values
|
||||||
|
are 'unversioned' (e.g. legacy), 'versioned', 'both'. The default
|
||||||
|
value is 'both'.
|
Loading…
Reference in New Issue
Block a user