Add infra for versioned notifications

Versioned notifications uses oslo versioned objects to model the
notification. The wire format of the paylod also uses the serialized
format of the oslo versioned object.

Partially-Implements: bp versioned-notification-api
Change-Id: I32a0f5280eb333e7e21de9ce175e1f435f2e35d5
This commit is contained in:
Balazs Gibizer 2015-11-18 16:18:57 +01:00
parent 3900adee73
commit 1d6451e619
6 changed files with 531 additions and 1 deletions

View File

@ -53,6 +53,7 @@ def register_all():
__import__('nova.objects.monitor_metric')
__import__('nova.objects.network')
__import__('nova.objects.network_request')
__import__('nova.objects.notification')
__import__('nova.objects.numa')
__import__('nova.objects.pci_device')
__import__('nova.objects.pci_device_pool')

View File

@ -483,6 +483,44 @@ class DiskFormat(Enum):
valid_values=DiskFormat.ALL)
class NotificationPriority(Enum):
AUDIT = 'audit'
CRITICAL = 'critical'
DEBUG = 'debug'
INFO = 'info'
ERROR = 'error'
SAMPLE = 'sample'
WARN = 'warn'
ALL = (AUDIT, CRITICAL, DEBUG, INFO, ERROR, SAMPLE, WARN)
def __init__(self):
super(NotificationPriority, self).__init__(
valid_values=NotificationPriority.ALL)
class NotificationPhase(Enum):
START = 'start'
END = 'end'
ERROR = 'error'
ALL = (START, END, ERROR)
def __init__(self):
super(NotificationPhase, self).__init__(
valid_values=NotificationPhase.ALL)
class NotificationAction(Enum):
UPDATE = 'update'
ALL = (UPDATE,)
def __init__(self):
super(NotificationAction, self).__init__(
valid_values=NotificationAction.ALL)
class IPAddress(FieldType):
@staticmethod
def coerce(obj, attr, value):
@ -737,6 +775,18 @@ class DiskFormatField(BaseEnumField):
AUTO_TYPE = DiskFormat()
class NotificationPriorityField(BaseEnumField):
AUTO_TYPE = NotificationPriority()
class NotificationPhaseField(BaseEnumField):
AUTO_TYPE = NotificationPhase()
class NotificationActionField(BaseEnumField):
AUTO_TYPE = NotificationAction()
class IPAddressField(AutoTypedField):
AUTO_TYPE = IPAddress()

View File

@ -0,0 +1,137 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova.objects import base
from nova.objects import fields
from nova import rpc
@base.NovaObjectRegistry.register
class EventType(base.NovaObject):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'object': fields.StringField(nullable=False),
'action': fields.NotificationActionField(nullable=False),
'phase': fields.NotificationPhaseField(nullable=True),
}
def to_notification_event_type_field(self):
"""Serialize the object to the wire format."""
s = '%s.%s' % (self.object, self.action)
if self.obj_attr_is_set('phase'):
s += '.%s' % self.phase
return s
# Note(gibi): It is explicitly not registered as this class shall not be used
# directly, it is just a base class for notification payloads.
@base.NovaObjectRegistry.register_if(False)
class NotificationPayloadBase(base.NovaObject):
"""Base class for the payload of versioned notifications."""
# SCHEMA defines how to populate the payload fields. It is a dictionary
# where every key value pair has the following format:
# <payload_field_name>: (<data_source_name>,
# <field_of_the_data_source>)
# The <payload_field_name> is the name where the data will be stored in the
# payload object, this field has to be defined as a field of the payload.
# The <data_source_name> shall refer to name of the parameter passed as
# kwarg to the payload's populate_schema() call and this object will be
# used as the source of the data. The <field_of_the_data_source> shall be
# a valid field of the passed argument.
# The SCHEMA needs to be applied with the populate_schema() call before the
# notification can be emitted.
# The value of the payload.<payload_field_name> field will be set by the
# <data_source_name>.<field_of_the_data_source> field. The
# <data_source_name> will not be part of the payload object internal or
# external representation.
# Payload fields that are not set by the SCHEMA can be filled in the same
# way as in any versioned object.
SCHEMA = {}
# Version 1.0: Initial version
VERSION = '1.0'
def __init__(self, *args, **kwargs):
super(NotificationPayloadBase, self).__init__(*args, **kwargs)
self.populated = not self.SCHEMA
def populate_schema(self, **kwargs):
"""Populate the object based on the SCHEMA and the source objects
:param kwargs: A dict contains the source object at the key defined in
the SCHEMA
"""
for key, (obj, field) in self.SCHEMA.items():
source = kwargs[obj]
if source.obj_attr_is_set(field):
setattr(self, key, getattr(source, field))
self.populated = True
@base.NovaObjectRegistry.register
class NotificationPublisher(base.NovaObject):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'host': fields.StringField(nullable=False),
'binary': fields.StringField(nullable=False),
}
@classmethod
def from_service_obj(cls, service):
return cls(host=service.host, binary=service.binary)
# Note(gibi): It is explicitly not registered as this class shall not be used
# directly, it is just a base class for notification.
@base.NovaObjectRegistry.register_if(False)
class NotificationBase(base.NovaObject):
"""Base class for versioned notifications.
Every subclass shall define a 'payload' field.
"""
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'priority': fields.NotificationPriorityField(),
'event_type': fields.ObjectField('EventType'),
'publisher': fields.ObjectField('NotificationPublisher'),
}
def _emit(self, context, event_type, publisher_id, payload):
assert rpc.NOTIFIER is not None
notifier = rpc.NOTIFIER.prepare(publisher_id=publisher_id)
notify = getattr(notifier, self.priority)
notify(context, event_type=event_type, payload=payload)
def emit(self, context):
"""Send the notification."""
assert self.payload.populated
# Note(gibi): notification payload will be a newly populated object
# therefore every field of it will look changed so this does not carry
# any extra information so we drop this from the payload.
self.payload.obj_reset_changes(recursive=False)
self._emit(context,
event_type=
self.event_type.to_notification_event_type_field(),
publisher_id='%s:%s' %
(self.publisher.binary,
self.publisher.host),
payload=self.payload.obj_to_primitive())

View File

@ -941,3 +941,58 @@ class TestIPV6Network(TestField):
for x in good]
self.from_primitive_values = [(x, netaddr.IPNetwork(x))
for x in good]
class TestNotificationPriority(TestField):
def setUp(self):
super(TestNotificationPriority, self).setUp()
self.field = fields.NotificationPriorityField()
self.coerce_good_values = [('audit', 'audit'),
('critical', 'critical'),
('debug', 'debug'),
('error', 'error'),
('sample', 'sample'),
('warn', 'warn')]
self.coerce_bad_values = ['warning']
self.to_primitive_values = self.coerce_good_values[0:1]
self.from_primitive_values = self.coerce_good_values[0:1]
def test_stringify(self):
self.assertEqual("'warn'", self.field.stringify('warn'))
def test_stringify_invalid(self):
self.assertRaises(ValueError, self.field.stringify, 'warning')
class TestNotificationPhase(TestField):
def setUp(self):
super(TestNotificationPhase, self).setUp()
self.field = fields.NotificationPhaseField()
self.coerce_good_values = [('start', 'start'),
('end', 'end'),
('error', 'error')]
self.coerce_bad_values = ['begin']
self.to_primitive_values = self.coerce_good_values[0:1]
self.from_primitive_values = self.coerce_good_values[0:1]
def test_stringify(self):
self.assertEqual("'error'", self.field.stringify('error'))
def test_stringify_invalid(self):
self.assertRaises(ValueError, self.field.stringify, 'begin')
class TestNotificationAction(TestField):
def setUp(self):
super(TestNotificationAction, self).setUp()
self.field = fields.NotificationActionField()
self.coerce_good_values = [('update', 'update')]
self.coerce_bad_values = ['magic']
self.to_primitive_values = self.coerce_good_values[0:1]
self.from_primitive_values = self.coerce_good_values[0:1]
def test_stringify(self):
self.assertEqual("'update'", self.field.stringify('update'))
def test_stringify_invalid(self):
self.assertRaises(ValueError, self.field.stringify, 'magic')

View File

@ -0,0 +1,240 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_utils import timeutils
from nova import objects
from nova.objects import base
from nova.objects import fields
from nova.objects import notification
from nova import test
class TestNotificationBase(test.NoDBTestCase):
@base.NovaObjectRegistry.register_if(False)
class TestObject(base.NovaObject):
VERSION = '1.0'
fields = {
'field_1': fields.StringField(),
'field_2': fields.IntegerField(),
'not_important_field': fields.IntegerField(),
}
@base.NovaObjectRegistry.register_if(False)
class TestNotificationPayload(notification.NotificationPayloadBase):
VERSION = '1.0'
SCHEMA = {
'field_1': ('source_field', 'field_1'),
'field_2': ('source_field', 'field_2'),
}
fields = {
'extra_field': fields.StringField(), # filled by ctor
'field_1': fields.StringField(), # filled by the schema
'field_2': fields.IntegerField(), # filled by the schema
}
def populate_schema(self, source_field):
super(TestNotificationBase.TestNotificationPayload,
self).populate_schema(source_field=source_field)
@base.NovaObjectRegistry.register_if(False)
class TestNotificationPayloadEmptySchema(
notification.NotificationPayloadBase):
VERSION = '1.0'
fields = {
'extra_field': fields.StringField(), # filled by ctor
}
@base.NovaObjectRegistry.register_if(False)
class TestNotification(notification.NotificationBase):
VERSION = '1.0'
fields = {
'payload': fields.ObjectField('TestNotificationPayload')
}
@base.NovaObjectRegistry.register_if(False)
class TestNotificationEmptySchema(notification.NotificationBase):
VERSION = '1.0'
fields = {
'payload': fields.ObjectField('TestNotificationPayloadEmptySchema')
}
fake_service = {
'created_at': timeutils.utcnow().replace(microsecond=0),
'updated_at': None,
'deleted_at': None,
'deleted': False,
'id': 123,
'host': 'fake-host',
'binary': 'nova-fake',
'topic': 'fake-service-topic',
'report_count': 1,
'forced_down': False,
'disabled': False,
'disabled_reason': None,
'last_seen_up': None,
'version': 1}
expected_payload = {
'nova_object.name': 'TestNotificationPayload',
'nova_object.data': {
'extra_field': 'test string',
'field_1': 'test1',
'field_2': 42},
'nova_object.version': '1.0',
'nova_object.namespace': 'nova'}
def setUp(self):
super(TestNotificationBase, self).setUp()
with mock.patch('nova.db.service_update') as mock_db_service_update:
self.service_obj = objects.Service(context=mock.sentinel.context,
id=self.fake_service['id'])
self.service_obj.obj_reset_changes(['version'])
mock_db_service_update.return_value = self.fake_service
self.service_obj.save()
self.my_obj = self.TestObject(field_1='test1',
field_2=42,
not_important_field=13)
self.payload = self.TestNotificationPayload(
extra_field='test string')
self.payload.populate_schema(source_field=self.my_obj)
self.notification = self.TestNotification(
event_type=notification.EventType(
object='test_object',
action=fields.NotificationAction.UPDATE,
phase=fields.NotificationPhase.START),
publisher=notification.NotificationPublisher.from_service_obj(
self.service_obj),
priority=fields.NotificationPriority.INFO,
payload=self.payload)
def _verify_notification(self, mock_notifier, mock_context,
expected_event_type,
expected_payload):
mock_notifier.prepare.assert_called_once_with(
publisher_id='nova-fake:fake-host')
mock_notify = mock_notifier.prepare.return_value.info
self.assertTrue(mock_notify.called)
self.assertEqual(mock_notify.call_args[0][0], mock_context)
self.assertEqual(mock_notify.call_args[1]['event_type'],
expected_event_type)
actual_payload = mock_notify.call_args[1]['payload']
self.assertJsonEqual(expected_payload, actual_payload)
@mock.patch('nova.rpc.NOTIFIER')
def test_emit_notification(self, mock_notifier):
mock_context = mock.Mock()
mock_context.to_dict.return_value = {}
self.notification.emit(mock_context)
self._verify_notification(
mock_notifier,
mock_context,
expected_event_type='test_object.update.start',
expected_payload=self.expected_payload)
@mock.patch('nova.rpc.NOTIFIER')
def test_emit_with_host_and_binary_as_publisher(self, mock_notifier):
noti = self.TestNotification(
event_type=notification.EventType(
object='test_object',
action=fields.NotificationAction.UPDATE),
publisher=notification.NotificationPublisher(host='fake-host',
binary='nova-fake'),
priority=fields.NotificationPriority.INFO,
payload=self.payload)
mock_context = mock.Mock()
mock_context.to_dict.return_value = {}
noti.emit(mock_context)
self._verify_notification(
mock_notifier,
mock_context,
expected_event_type='test_object.update',
expected_payload=self.expected_payload)
@mock.patch('nova.rpc.NOTIFIER')
def test_emit_event_type_without_phase(self, mock_notifier):
noti = self.TestNotification(
event_type=notification.EventType(
object='test_object',
action=fields.NotificationAction.UPDATE),
publisher=notification.NotificationPublisher.from_service_obj(
self.service_obj),
priority=fields.NotificationPriority.INFO,
payload=self.payload)
mock_context = mock.Mock()
mock_context.to_dict.return_value = {}
noti.emit(mock_context)
self._verify_notification(
mock_notifier,
mock_context,
expected_event_type='test_object.update',
expected_payload=self.expected_payload)
@mock.patch('nova.rpc.NOTIFIER')
def test_not_possible_to_emit_if_not_populated(self, mock_notifier):
non_populated_payload = self.TestNotificationPayload(
extra_field='test string')
noti = self.TestNotification(
event_type=notification.EventType(
object='test_object',
action=fields.NotificationAction.UPDATE),
publisher=notification.NotificationPublisher.from_service_obj(
self.service_obj),
priority=fields.NotificationPriority.INFO,
payload=non_populated_payload)
mock_context = mock.Mock()
self.assertRaises(AssertionError, noti.emit, mock_context)
self.assertFalse(mock_notifier.called)
@mock.patch('nova.rpc.NOTIFIER')
def test_empty_schema(self, mock_notifier):
non_populated_payload = self.TestNotificationPayloadEmptySchema(
extra_field='test string')
noti = self.TestNotificationEmptySchema(
event_type=notification.EventType(
object='test_object',
action=fields.NotificationAction.UPDATE),
publisher=notification.NotificationPublisher.from_service_obj(
self.service_obj),
priority=fields.NotificationPriority.INFO,
payload=non_populated_payload)
mock_context = mock.Mock()
mock_context.to_dict.return_value = {}
noti.emit(mock_context)
self._verify_notification(
mock_notifier,
mock_context,
expected_event_type='test_object.update',
expected_payload=
{'nova_object.name': 'TestNotificationPayloadEmptySchema',
'nova_object.data': {'extra_field': u'test string'},
'nova_object.version': '1.0',
'nova_object.namespace': 'nova'})

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import copy
import datetime
@ -35,6 +36,7 @@ from nova import exception
from nova import objects
from nova.objects import base
from nova.objects import fields
from nova.objects import notification
from nova import test
from nova.tests import fixtures as nova_fixtures
from nova.tests.unit import fake_notifier
@ -1124,6 +1126,7 @@ object_data = {
'EC2InstanceMapping': '1.0-a4556eb5c5e94c045fe84f49cf71644f',
'EC2SnapshotMapping': '1.0-47e7ddabe1af966dce0cfd0ed6cd7cd1',
'EC2VolumeMapping': '1.0-5b713751d6f97bad620f3378a521020d',
'EventType': '1.0-21dc35de314fc5fc0a7965211c0c00f7',
'FixedIP': '1.14-53e1c10b539f1a82fe83b1af4720efae',
'FixedIPList': '1.14-87a39361c8f08f059004d6b15103cdfd',
'Flavor': '1.1-b6bb7a730a79d720344accefafacf7ee',
@ -1161,6 +1164,7 @@ object_data = {
'MigrationList': '1.2-02c0ec0c50b75ca86a2a74c5e8c911cc',
'MonitorMetric': '1.1-53b1db7c4ae2c531db79761e7acc52ba',
'MonitorMetricList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e',
'NotificationPublisher': '1.0-bbbc1402fb0e443a3eb227cc52b61545',
'NUMACell': '1.2-74fc993ac5c83005e76e34e8487f1c05',
'NUMAPagesTopology': '1.0-c71d86317283266dc8364c149155e48e',
'NUMATopology': '1.2-c63fad38be73b6afd04715c9c1b29220',
@ -1201,7 +1205,7 @@ object_data = {
class TestObjectVersions(test.NoDBTestCase):
def test_versions(self):
checker = fixture.ObjectVersionChecker(
checker = NotificationAwareObjectVersionChecker(
base.NovaObjectRegistry.obj_classes())
fingerprints = checker.get_hashes()
@ -1217,6 +1221,32 @@ class TestObjectVersions(test.NoDBTestCase):
'versions have been bumped, and then update their '
'hashes here.')
def test_notification_payload_version_depends_on_the_schema(self):
@base.NovaObjectRegistry.register_if(False)
class TestNotificationPayload(notification.NotificationPayloadBase):
VERSION = '1.0'
SCHEMA = {
'field_1': ('source_field', 'field_1'),
'field_2': ('source_field', 'field_2'),
}
fields = {
'extra_field': fields.StringField(), # filled by ctor
'field_1': fields.StringField(), # filled by the schema
'field_2': fields.IntegerField(), # filled by the schema
}
checker = NotificationAwareObjectVersionChecker(
{'TestNotificationPayload': (TestNotificationPayload,)})
old_hash = checker._get_fingerprint('TestNotificationPayload')
TestNotificationPayload.SCHEMA['field_3'] = ('source_field',
'field_3')
new_hash = checker._get_fingerprint('TestNotificationPayload')
self.assertNotEqual(old_hash, new_hash)
def test_obj_make_compatible(self):
# Iterate all object classes and verify that we can run
# obj_make_compatible with every older version than current.
@ -1332,3 +1362,20 @@ class TestObjMethodOverrides(test.NoDBTestCase):
obj_class = obj_classes[obj_name][0]
self.assertEqual(args,
inspect.getargspec(obj_class.obj_reset_changes))
class NotificationAwareObjectVersionChecker(fixture.ObjectVersionChecker):
def _get_fingerprint(self, obj_name, extra_data_func=None):
obj_class = copy.deepcopy(self.obj_classes[obj_name][0])
if issubclass(obj_class, notification.NotificationPayloadBase):
# Note(gibi): to make NotificationPayload version dependent on the
# SCHEMA of the payload we inject the SCHEMA content to the hash
# calculation algorithm
extra_relevant_data = collections.OrderedDict(
sorted(obj_class.SCHEMA.items()))
fields = getattr(obj_class, 'fields', {})
fields['_schema_'] = extra_relevant_data
setattr(obj_class, 'fields', fields)
return super(NotificationAwareObjectVersionChecker,
self)._get_fingerprint(obj_name)