From aab1800d0d08189a3f2bbe008edccd4298ad2a86 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Wed, 26 Feb 2014 17:17:43 +0100 Subject: [PATCH] Use NotificationPlugin as an oslo.msg endpoint This refactor the NotificationPlugin code to have NotificationPlugin acting as a oslo.messaging endpoint Change-Id: Iecdce3513a22a1c1d5037ed2969c6333e893791d --- ceilometer/event/endpoint.py | 69 ++++++++++ ceilometer/messaging.py | 4 +- ceilometer/notification.py | 90 +++---------- ceilometer/notifier.py | 20 ++- ceilometer/plugin.py | 42 +++++- .../tests/compute/notifications/test_cpu.py | 22 ++-- .../compute/notifications/test_instance.py | 44 +++---- ceilometer/tests/event/test_endpoint.py | 123 ++++++++++++++++++ ceilometer/tests/image/test_notifications.py | 24 ++-- .../tests/network/test_notifications.py | 32 ++--- .../tests/orchestration/test_notifications.py | 3 +- ceilometer/tests/test_middleware.py | 11 +- ceilometer/tests/test_notification.py | 82 ++++-------- ceilometer/tests/test_plugin.py | 26 +++- ceilometer/tests/volume/test_notifications.py | 14 +- 15 files changed, 382 insertions(+), 224 deletions(-) create mode 100644 ceilometer/event/endpoint.py create mode 100644 ceilometer/tests/event/test_endpoint.py diff --git a/ceilometer/event/endpoint.py b/ceilometer/event/endpoint.py new file mode 100644 index 000000000..56f2a27e8 --- /dev/null +++ b/ceilometer/event/endpoint.py @@ -0,0 +1,69 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2012-2014 eNovance +# +# Author: Mehdi Abaakouk +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo.config import cfg +import oslo.messaging +from stevedore import extension + +from ceilometer.event import converter as event_converter +from ceilometer import messaging +from ceilometer.openstack.common.gettextutils import _ # noqa +from ceilometer import service +from ceilometer.storage import models + +LOG = logging.getLogger(__name__) + + +class EventsNotificationEndpoint(service.DispatchedService): + def __init__(self): + LOG.debug(_('Loading event definitions')) + self.event_converter = event_converter.setup_events( + extension.ExtensionManager( + namespace='ceilometer.event.trait_plugin')) + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + """Convert message to Ceilometer Event. + + :param ctxt: oslo.messaging context + :param publisher_id: publisher of the notification + :param event_type: type of notification + :param payload: notification payload + :param metadata: metadata about the notification + """ + + #NOTE: the rpc layer currently rips out the notification + #delivery_info, which is critical to determining the + #source of the notification. This will have to get added back later. + notification = messaging.convert_to_old_notification_format( + 'info', ctxt, publisher_id, event_type, payload, metadata) + self.process_notification(notification) + + def process_notification(self, notification): + event = self.event_converter.to_event(notification) + + if event is not None: + LOG.debug(_('Saving event "%s"'), event.event_type) + problem_events = [] + for dispatcher in self.dispatcher_manager: + problem_events.extend(dispatcher.obj.record_events(event)) + if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]: + if not cfg.CONF.notification.ack_on_event_error: + return oslo.messaging.NotificationResult.REQUEUE + return oslo.messaging.NotificationResult.HANDLED diff --git a/ceilometer/messaging.py b/ceilometer/messaging.py index 4b66e3eb8..cd0c874ef 100644 --- a/ceilometer/messaging.py +++ b/ceilometer/messaging.py @@ -63,11 +63,11 @@ def get_rpc_client(**kwargs): return oslo.messaging.RPCClient(TRANSPORT, target) -def get_notification_listener(targets, endpoint): +def get_notification_listener(targets, endpoints): """Return a configured oslo.messaging notification listener.""" global TRANSPORT return oslo.messaging.get_notification_listener( - TRANSPORT, targets, [endpoint], executor='eventlet') + TRANSPORT, targets, endpoints, executor='eventlet') def get_notifier(publisher_id): diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 95abebf5a..509df13ca 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -17,18 +17,14 @@ # under the License. from oslo.config import cfg -import oslo.messaging from stevedore import extension -from ceilometer.event import converter as event_converter +from ceilometer.event import endpoint as event_endpoint from ceilometer import messaging -from ceilometer.openstack.common import context from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service from ceilometer import pipeline -from ceilometer import service -from ceilometer.storage import models LOG = log.getLogger(__name__) @@ -48,25 +44,34 @@ OPTS = [ cfg.CONF.register_opts(OPTS, group="notification") -class NotificationService(service.DispatchedService, os_service.Service): +class NotificationService(os_service.Service): NOTIFICATION_NAMESPACE = 'ceilometer.notification' + @classmethod + def _get_notifications_manager(cls, pm): + return extension.ExtensionManager( + namespace=cls.NOTIFICATION_NAMESPACE, + invoke_on_load=True, + invoke_args=(pm, ) + ) + def start(self): super(NotificationService, self).start() self.pipeline_manager = pipeline.setup_pipeline() - self.notification_manager = extension.ExtensionManager( - namespace=self.NOTIFICATION_NAMESPACE, - invoke_on_load=True, - ) - + self.notification_manager = self._get_notifications_manager( + self.pipeline_manager) if not list(self.notification_manager): LOG.warning(_('Failed to load any notification handlers for %s'), self.NOTIFICATION_NAMESPACE) ack_on_error = cfg.CONF.notification.ack_on_event_error + endpoints = [] + if cfg.CONF.notification.store_events: + endpoints = [event_endpoint.EventsNotificationEndpoint()] + targets = [] for ext in self.notification_manager: handler = ext.obj @@ -76,71 +81,14 @@ class NotificationService(service.DispatchedService, os_service.Service): 'type': ', '.join(handler.event_types), 'error': ack_on_error}) targets.extend(handler.get_targets(cfg.CONF)) + endpoints.append(handler) - self.listener = messaging.get_notification_listener(targets, self) - - LOG.debug(_('Loading event definitions')) - self.event_converter = event_converter.setup_events( - extension.ExtensionManager( - namespace='ceilometer.event.trait_plugin')) - + self.listener = messaging.get_notification_listener(targets, endpoints) self.listener.start() + # Add a dummy thread to have wait() working self.tg.add_timer(604800, lambda: None) def stop(self): self.listener.stop() super(NotificationService, self).stop() - - def info(self, ctxt, publisher_id, event_type, payload, metadata): - notification = messaging.convert_to_old_notification_format( - 'info', ctxt, publisher_id, event_type, payload, metadata) - self.process_notification(notification) - - def process_notification(self, notification): - """RPC endpoint for notification messages - - When another service sends a notification over the message - bus, this method receives it. See _setup_subscription(). - - """ - LOG.debug(_('notification %r'), notification.get('event_type')) - self.notification_manager.map(self._process_notification_for_ext, - notification=notification) - - if cfg.CONF.notification.store_events: - return self._message_to_event(notification) - else: - return oslo.messaging.NotificationResult.HANDLED - - def _message_to_event(self, body): - """Convert message to Ceilometer Event. - - NOTE: the rpc layer currently rips out the notification - delivery_info, which is critical to determining the - source of the notification. This will have to get added back later. - - """ - event = self.event_converter.to_event(body) - - if event is not None: - LOG.debug(_('Saving event "%s"'), event.event_type) - problem_events = [] - for dispatcher in self.dispatcher_manager: - problem_events.extend(dispatcher.obj.record_events(event)) - if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]: - if not cfg.CONF.notification.ack_on_event_error: - return oslo.messaging.NotificationResult.REQUEUE - return oslo.messaging.NotificationResult.HANDLED - - def _process_notification_for_ext(self, ext, notification): - """Wrapper for calling pipelines when a notification arrives - - When a message is received by process_notification(), it calls - this method with each notification plugin to allow all the - plugins process the notification. - - """ - with self.pipeline_manager.publisher(context.get_admin_context()) as p: - # FIXME(dhellmann): Spawn green thread? - p(list(ext.obj.to_samples(notification))) diff --git a/ceilometer/notifier.py b/ceilometer/notifier.py index fa510fc67..fb84c7686 100644 --- a/ceilometer/notifier.py +++ b/ceilometer/notifier.py @@ -33,7 +33,7 @@ _pipeline_manager = None def _load_notification_manager(): - global _notification_manager + global _notification_manager, _pipeline_manager namespace = 'ceilometer.notification' @@ -41,7 +41,9 @@ def _load_notification_manager(): _notification_manager = extension.ExtensionManager( namespace=namespace, - invoke_on_load=True) + invoke_on_load=True, + invoke_args=(_pipeline_manager, ) + ) if not list(_notification_manager): LOG.warning(_('Failed to load any notification handlers for %s'), @@ -58,19 +60,13 @@ def _load_pipeline_manager(): ) -def _process_notification_for_ext(ext, context, notification): - with _pipeline_manager.publisher(context) as p: - # FIXME(dhellmann): Spawn green thread? - p(list(ext.obj.to_samples(notification))) - - def notify(context, message): """Sends a notification as a meter using Ceilometer pipelines.""" - if not _notification_manager: - _load_notification_manager() if not _pipeline_manager: _load_pipeline_manager() - _notification_manager.map( - _process_notification_for_ext, + if not _notification_manager: + _load_notification_manager() + _notification_manager.map_method( + 'to_samples_and_publish', context=context or req_context.get_admin_context(), notification=message) diff --git a/ceilometer/plugin.py b/ceilometer/plugin.py index 53d6de619..a017dbc89 100644 --- a/ceilometer/plugin.py +++ b/ceilometer/plugin.py @@ -25,6 +25,8 @@ import fnmatch import oslo.messaging import six +from ceilometer import messaging +from ceilometer.openstack.common import context from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common import log @@ -42,6 +44,9 @@ class PluginBase(object): @six.add_metaclass(abc.ABCMeta) class NotificationBase(PluginBase): """Base class for plugins that support the notification API.""" + def __init__(self, pipeline_manager): + super(NotificationBase, self).__init__() + self.pipeline_manager = pipeline_manager @abc.abstractproperty def event_types(self): @@ -84,17 +89,40 @@ class NotificationBase(PluginBase): return any(map(lambda e: fnmatch.fnmatch(event_type, e), event_type_to_handle)) - def to_samples(self, notification): - """Return samples produced by *process_notification* for the given - notification, if it's handled by this notification handler. + def info(self, ctxt, publisher_id, event_type, payload, metadata): + """RPC endpoint for notification messages + When another service sends a notification over the message + bus, this method receives it. + + :param ctxt: oslo.messaging context + :param publisher_id: publisher of the notification + :param event_type: type of notification + :param payload: notification payload + :param metadata: metadata about the notification + + """ + notification = messaging.convert_to_old_notification_format( + 'info', ctxt, publisher_id, event_type, payload, metadata) + self.to_samples_and_publish(context.get_admin_context(), notification) + + def to_samples_and_publish(self, context, notification): + """Return samples produced by *process_notification* for the given + notification. + + :param context: Execution context from the service or RPC call :param notification: The notification to process. """ - if self._handle_event_type(notification['event_type'], - self.event_types): - return self.process_notification(notification) - return [] + + #TODO(sileht): this will be moved into oslo.messaging + #see oslo.messaging bp notification-dispatcher-filter + if not self._handle_event_type(notification['event_type'], + self.event_types): + return + + with self.pipeline_manager.publisher(context) as p: + p(list(self.process_notification(notification))) @six.add_metaclass(abc.ABCMeta) diff --git a/ceilometer/tests/compute/notifications/test_cpu.py b/ceilometer/tests/compute/notifications/test_cpu.py index a7629361e..0ce3f8a53 100644 --- a/ceilometer/tests/compute/notifications/test_cpu.py +++ b/ceilometer/tests/compute/notifications/test_cpu.py @@ -100,7 +100,7 @@ class TestMetricsNotifications(test.BaseTestCase): def test_compute_metrics(self): ERROR_METRICS = copy.copy(METRICS_UPDATE) ERROR_METRICS['payload'] = {"metric_err": []} - ic = cpu.CpuFrequency() + ic = cpu.CpuFrequency(None) info = ic._get_sample(METRICS_UPDATE, 'cpu.frequency') info_none = ic._get_sample(METRICS_UPDATE, 'abc.efg') info_error = ic._get_sample(ERROR_METRICS, 'cpu.frequency') @@ -109,51 +109,51 @@ class TestMetricsNotifications(test.BaseTestCase): self.assertIsNone(info_error) def test_compute_cpu_frequency(self): - c = self._process_notification(cpu.CpuFrequency()) + c = self._process_notification(cpu.CpuFrequency(None)) self.assertEqual('compute.node.cpu.frequency', c.name) self.assertEqual(1600, c.volume) def test_compute_cpu_user_time(self): - c = self._process_notification(cpu.CpuUserTime()) + c = self._process_notification(cpu.CpuUserTime(None)) self.assertEqual('compute.node.cpu.user.time', c.name) self.assertEqual(17421440000000L, c.volume) def test_compute_cpu_kernel_time(self): - c = self._process_notification(cpu.CpuKernelTime()) + c = self._process_notification(cpu.CpuKernelTime(None)) self.assertEqual('compute.node.cpu.kernel.time', c.name) self.assertEqual(7852600000000L, c.volume) def test_compute_cpu_idle_time(self): - c = self._process_notification(cpu.CpuIdleTime()) + c = self._process_notification(cpu.CpuIdleTime(None)) self.assertEqual('compute.node.cpu.idle.time', c.name) self.assertEqual(1307374400000000L, c.volume) def test_compute_cpu_iowait_time(self): - c = self._process_notification(cpu.CpuIowaitTime()) + c = self._process_notification(cpu.CpuIowaitTime(None)) self.assertEqual('compute.node.cpu.iowait.time', c.name) self.assertEqual(11697470000000L, c.volume) def test_compute_cpu_kernel_percent(self): - c = self._process_notification(cpu.CpuKernelPercent()) + c = self._process_notification(cpu.CpuKernelPercent(None)) self.assertEqual('compute.node.cpu.kernel.percent', c.name) self.assertEqual(0.5841204961898534, c.volume) def test_compute_cpu_idle_percent(self): - c = self._process_notification(cpu.CpuIdlePercent()) + c = self._process_notification(cpu.CpuIdlePercent(None)) self.assertEqual('compute.node.cpu.idle.percent', c.name) self.assertEqual(97.24985141658965, c.volume) def test_compute_cpu_user_percent(self): - c = self._process_notification(cpu.CpuUserPercent()) + c = self._process_notification(cpu.CpuUserPercent(None)) self.assertEqual('compute.node.cpu.user.percent', c.name) self.assertEqual(1.2959045637294348, c.volume) def test_compute_cpu_iowait_percent(self): - c = self._process_notification(cpu.CpuIowaitPercent()) + c = self._process_notification(cpu.CpuIowaitPercent(None)) self.assertEqual('compute.node.cpu.iowait.percent', c.name) self.assertEqual(0.8701235234910634, c.volume) def test_compute_cpu_percent(self): - c = self._process_notification(cpu.CpuPercent()) + c = self._process_notification(cpu.CpuPercent(None)) self.assertEqual('compute.node.cpu.percent', c.name) self.assertEqual(2.7501485834103515, c.volume) diff --git a/ceilometer/tests/compute/notifications/test_instance.py b/ceilometer/tests/compute/notifications/test_instance.py index b63af6d79..3fce11c19 100644 --- a/ceilometer/tests/compute/notifications/test_instance.py +++ b/ceilometer/tests/compute/notifications/test_instance.py @@ -597,7 +597,7 @@ INSTANCE_SCHEDULED = { class TestNotifications(test.BaseTestCase): def test_process_notification(self): - info = list(instance.Instance().process_notification( + info = list(instance.Instance(None).process_notification( INSTANCE_CREATE_END ))[0] for name, actual, expected in [ @@ -629,42 +629,42 @@ class TestNotifications(test.BaseTestCase): self.assertNotIn('foo.bar', user_meta) def test_instance_create_instance(self): - ic = instance.Instance() + ic = instance.Instance(None) counters = list(ic.process_notification(INSTANCE_CREATE_END)) self.assertEqual(1, len(counters)) c = counters[0] self.assertEqual(1, c.volume) def test_instance_create_flavor(self): - ic = instance.InstanceFlavor() + ic = instance.InstanceFlavor(None) counters = list(ic.process_notification(INSTANCE_CREATE_END)) self.assertEqual(1, len(counters)) c = counters[0] self.assertEqual(1, c.volume) def test_instance_create_memory(self): - ic = instance.Memory() + ic = instance.Memory(None) counters = list(ic.process_notification(INSTANCE_CREATE_END)) self.assertEqual(1, len(counters)) c = counters[0] self.assertEqual(INSTANCE_CREATE_END['payload']['memory_mb'], c.volume) def test_instance_create_vcpus(self): - ic = instance.VCpus() + ic = instance.VCpus(None) counters = list(ic.process_notification(INSTANCE_CREATE_END)) self.assertEqual(1, len(counters)) c = counters[0] self.assertEqual(INSTANCE_CREATE_END['payload']['vcpus'], c.volume) def test_instance_create_root_disk_size(self): - ic = instance.RootDiskSize() + ic = instance.RootDiskSize(None) counters = list(ic.process_notification(INSTANCE_CREATE_END)) self.assertEqual(1, len(counters)) c = counters[0] self.assertEqual(INSTANCE_CREATE_END['payload']['root_gb'], c.volume) def test_instance_create_ephemeral_disk_size(self): - ic = instance.EphemeralDiskSize() + ic = instance.EphemeralDiskSize(None) counters = list(ic.process_notification(INSTANCE_CREATE_END)) self.assertEqual(1, len(counters)) c = counters[0] @@ -672,32 +672,32 @@ class TestNotifications(test.BaseTestCase): c.volume) def test_instance_exists_instance(self): - ic = instance.Instance() + ic = instance.Instance(None) counters = list(ic.process_notification(INSTANCE_EXISTS)) self.assertEqual(1, len(counters)) def test_instance_exists_metadata_list(self): - ic = instance.Instance() + ic = instance.Instance(None) counters = list(ic.process_notification(INSTANCE_EXISTS_METADATA_LIST)) self.assertEqual(1, len(counters)) def test_instance_exists_flavor(self): - ic = instance.Instance() + ic = instance.Instance(None) counters = list(ic.process_notification(INSTANCE_EXISTS)) self.assertEqual(1, len(counters)) def test_instance_delete_instance(self): - ic = instance.Instance() + ic = instance.Instance(None) counters = list(ic.process_notification(INSTANCE_DELETE_START)) self.assertEqual(1, len(counters)) def test_instance_delete_flavor(self): - ic = instance.Instance() + ic = instance.Instance(None) counters = list(ic.process_notification(INSTANCE_DELETE_START)) self.assertEqual(1, len(counters)) def test_instance_finish_resize_instance(self): - ic = instance.Instance() + ic = instance.Instance(None) counters = list(ic.process_notification(INSTANCE_FINISH_RESIZE_END)) self.assertEqual(1, len(counters)) c = counters[0] @@ -705,7 +705,7 @@ class TestNotifications(test.BaseTestCase): self._verify_user_metadata(c.resource_metadata) def test_instance_finish_resize_flavor(self): - ic = instance.InstanceFlavor() + ic = instance.InstanceFlavor(None) counters = list(ic.process_notification(INSTANCE_FINISH_RESIZE_END)) self.assertEqual(1, len(counters)) c = counters[0] @@ -714,7 +714,7 @@ class TestNotifications(test.BaseTestCase): self._verify_user_metadata(c.resource_metadata) def test_instance_finish_resize_memory(self): - ic = instance.Memory() + ic = instance.Memory(None) counters = list(ic.process_notification(INSTANCE_FINISH_RESIZE_END)) self.assertEqual(1, len(counters)) c = counters[0] @@ -723,7 +723,7 @@ class TestNotifications(test.BaseTestCase): self._verify_user_metadata(c.resource_metadata) def test_instance_finish_resize_vcpus(self): - ic = instance.VCpus() + ic = instance.VCpus(None) counters = list(ic.process_notification(INSTANCE_FINISH_RESIZE_END)) self.assertEqual(1, len(counters)) c = counters[0] @@ -732,7 +732,7 @@ class TestNotifications(test.BaseTestCase): self._verify_user_metadata(c.resource_metadata) def test_instance_resize_finish_instance(self): - ic = instance.Instance() + ic = instance.Instance(None) counters = list(ic.process_notification(INSTANCE_FINISH_RESIZE_END)) self.assertEqual(1, len(counters)) c = counters[0] @@ -740,7 +740,7 @@ class TestNotifications(test.BaseTestCase): self._verify_user_metadata(c.resource_metadata) def test_instance_resize_finish_flavor(self): - ic = instance.InstanceFlavor() + ic = instance.InstanceFlavor(None) counters = list(ic.process_notification(INSTANCE_RESIZE_REVERT_END)) self.assertEqual(1, len(counters)) c = counters[0] @@ -749,7 +749,7 @@ class TestNotifications(test.BaseTestCase): self._verify_user_metadata(c.resource_metadata) def test_instance_resize_finish_memory(self): - ic = instance.Memory() + ic = instance.Memory(None) counters = list(ic.process_notification(INSTANCE_RESIZE_REVERT_END)) self.assertEqual(1, len(counters)) c = counters[0] @@ -758,7 +758,7 @@ class TestNotifications(test.BaseTestCase): self._verify_user_metadata(c.resource_metadata) def test_instance_resize_finish_vcpus(self): - ic = instance.VCpus() + ic = instance.VCpus(None) counters = list(ic.process_notification(INSTANCE_RESIZE_REVERT_END)) self.assertEqual(1, len(counters)) c = counters[0] @@ -767,7 +767,7 @@ class TestNotifications(test.BaseTestCase): self._verify_user_metadata(c.resource_metadata) def test_instance_delete_samples(self): - ic = instance.InstanceDelete() + ic = instance.InstanceDelete(None) counters = list(ic.process_notification(INSTANCE_DELETE_SAMPLES)) self.assertEqual(2, len(counters)) names = [c.name for c in counters] @@ -776,7 +776,7 @@ class TestNotifications(test.BaseTestCase): self._verify_user_metadata(c.resource_metadata) def test_instance_scheduled(self): - ic = instance.InstanceScheduled() + ic = instance.InstanceScheduled(None) self.assertIn(INSTANCE_SCHEDULED['event_type'], ic.event_types) diff --git a/ceilometer/tests/event/test_endpoint.py b/ceilometer/tests/event/test_endpoint.py new file mode 100644 index 000000000..0d9980d52 --- /dev/null +++ b/ceilometer/tests/event/test_endpoint.py @@ -0,0 +1,123 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2012 New Dream Network, LLC (DreamHost) +# +# Author: Doug Hellmann +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Tests for Ceilometer notify daemon.""" + +import mock + +import oslo.messaging +from stevedore import extension + +from ceilometer.event import endpoint as event_endpoint +from ceilometer import messaging +from ceilometer.openstack.common.fixture import config +from ceilometer.storage import models +from ceilometer.tests import base as tests_base + +TEST_NOTICE_CTXT = { + u'auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2', + u'is_admin': True, + u'project_id': u'7c150a59fe714e6f9263774af9688f0e', + u'quota_class': None, + u'read_deleted': u'no', + u'remote_address': u'10.0.2.15', + u'request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66', + u'roles': [u'admin'], + u'timestamp': u'2012-05-08T20:23:41.425105', + u'user_id': u'1e3ce043029547f1a61c1996d1a531a2', +} + +TEST_NOTICE_METADATA = { + u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451', + u'timestamp': u'2012-05-08 20:23:48.028195', +} + +TEST_NOTICE_PAYLOAD = { + u'created_at': u'2012-05-08 20:23:41', + u'deleted_at': u'', + u'disk_gb': 0, + u'display_name': u'testme', + u'fixed_ips': [{u'address': u'10.0.0.2', + u'floating_ips': [], + u'meta': {}, + u'type': u'fixed', + u'version': 4}], + u'image_ref_url': u'http://10.0.2.15:9292/images/UUID', + u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1', + u'instance_type': u'm1.tiny', + u'instance_type_id': 2, + u'launched_at': u'2012-05-08 20:23:47.985999', + u'memory_mb': 512, + u'state': u'active', + u'state_description': u'', + u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e', + u'user_id': u'1e3ce043029547f1a61c1996d1a531a2', + u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3', + u'vcpus': 1, + u'root_gb': 0, + u'ephemeral_gb': 0, + u'host': u'compute-host-name', + u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4', + u'os_type': u'linux?', + u'architecture': u'x86', + u'image_ref': u'UUID', + u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5', + u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6', +} + + +class TestEventEndpoint(tests_base.BaseTestCase): + + def setUp(self): + super(TestEventEndpoint, self).setUp() + self.CONF = self.useFixture(config.Config()).conf + self.CONF([]) + messaging.setup('fake://') + self.addCleanup(messaging.cleanup) + self.CONF.set_override("connection", "log://", group='database') + self.CONF.set_override("store_events", True, group="notification") + + self.endpoint = event_endpoint.EventsNotificationEndpoint() + + self.mock_dispatcher = mock.MagicMock() + self.endpoint.event_converter = mock.MagicMock() + self.endpoint.event_converter.to_event.return_value = mock.MagicMock( + event_type='test.test') + self.endpoint.dispatcher_manager = \ + extension.ExtensionManager.make_test_instance([ + extension.Extension('test', None, None, self.mock_dispatcher) + ]) + + def test_message_to_event(self): + self.endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise', + 'compute.instance.create.end', + TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA) + + def test_message_to_event_duplicate(self): + self.mock_dispatcher.record_events.return_value = [ + (models.Event.DUPLICATE, object())] + message = {'event_type': "foo", 'message_id': "abc"} + self.endpoint.process_notification(message) # Should return silently. + + def test_message_to_event_bad_event(self): + self.CONF.set_override("ack_on_event_error", False, + group="notification") + self.mock_dispatcher.record_events.return_value = [ + (models.Event.UNKNOWN_PROBLEM, object())] + message = {'event_type': "foo", 'message_id': "abc"} + ret = self.endpoint.process_notification(message) + self.assertEqual(oslo.messaging.NotificationResult.REQUEUE, ret) diff --git a/ceilometer/tests/image/test_notifications.py b/ceilometer/tests/image/test_notifications.py index af4352e38..fb8d895de 100644 --- a/ceilometer/tests/image/test_notifications.py +++ b/ceilometer/tests/image/test_notifications.py @@ -19,6 +19,8 @@ import datetime +import mock + from ceilometer.image import notifications from ceilometer.openstack.common import test from ceilometer import sample @@ -101,7 +103,7 @@ class TestNotification(test.BaseTestCase): self.assertEqual(u'images.example.com', metadata.get('host')) def test_image_download(self): - handler = notifications.ImageDownload() + handler = notifications.ImageDownload(mock.Mock()) counters = list(handler.process_notification(NOTIFICATION_SEND)) self.assertEqual(1, len(counters)) download = counters[0] @@ -111,7 +113,7 @@ class TestNotification(test.BaseTestCase): self.assertEqual(sample.TYPE_DELTA, download.type) def test_image_serve(self): - handler = notifications.ImageServe() + handler = notifications.ImageServe(mock.Mock()) counters = list(handler.process_notification(NOTIFICATION_SEND)) self.assertEqual(1, len(counters)) serve = counters[0] @@ -124,7 +126,7 @@ class TestNotification(test.BaseTestCase): self.assertEqual(sample.TYPE_DELTA, serve.type) def test_image_crud_on_update(self): - handler = notifications.ImageCRUD() + handler = notifications.ImageCRUD(mock.Mock()) counters = list(handler.process_notification(NOTIFICATION_UPDATE)) self.assertEqual(1, len(counters)) update = counters[0] @@ -132,7 +134,7 @@ class TestNotification(test.BaseTestCase): self.assertEqual(sample.TYPE_DELTA, update.type) def test_image_on_update(self): - handler = notifications.Image() + handler = notifications.Image(mock.Mock()) counters = list(handler.process_notification(NOTIFICATION_UPDATE)) self.assertEqual(1, len(counters)) update = counters[0] @@ -140,7 +142,7 @@ class TestNotification(test.BaseTestCase): self.assertEqual(sample.TYPE_GAUGE, update.type) def test_image_size_on_update(self): - handler = notifications.ImageSize() + handler = notifications.ImageSize(mock.Mock()) counters = list(handler.process_notification(NOTIFICATION_UPDATE)) self.assertEqual(1, len(counters)) update = counters[0] @@ -149,7 +151,7 @@ class TestNotification(test.BaseTestCase): self.assertEqual(sample.TYPE_GAUGE, update.type) def test_image_crud_on_upload(self): - handler = notifications.ImageCRUD() + handler = notifications.ImageCRUD(mock.Mock()) counters = list(handler.process_notification(NOTIFICATION_UPLOAD)) self.assertEqual(1, len(counters)) upload = counters[0] @@ -157,7 +159,7 @@ class TestNotification(test.BaseTestCase): self.assertEqual(sample.TYPE_DELTA, upload.type) def test_image_on_upload(self): - handler = notifications.Image() + handler = notifications.Image(mock.Mock()) counters = list(handler.process_notification(NOTIFICATION_UPLOAD)) self.assertEqual(1, len(counters)) upload = counters[0] @@ -165,7 +167,7 @@ class TestNotification(test.BaseTestCase): self.assertEqual(sample.TYPE_GAUGE, upload.type) def test_image_size_on_upload(self): - handler = notifications.ImageSize() + handler = notifications.ImageSize(mock.Mock()) counters = list(handler.process_notification(NOTIFICATION_UPLOAD)) self.assertEqual(1, len(counters)) upload = counters[0] @@ -174,7 +176,7 @@ class TestNotification(test.BaseTestCase): self.assertEqual(sample.TYPE_GAUGE, upload.type) def test_image_crud_on_delete(self): - handler = notifications.ImageCRUD() + handler = notifications.ImageCRUD(mock.Mock()) counters = list(handler.process_notification(NOTIFICATION_DELETE)) self.assertEqual(1, len(counters)) delete = counters[0] @@ -182,7 +184,7 @@ class TestNotification(test.BaseTestCase): self.assertEqual(sample.TYPE_DELTA, delete.type) def test_image_on_delete(self): - handler = notifications.Image() + handler = notifications.Image(mock.Mock()) counters = list(handler.process_notification(NOTIFICATION_DELETE)) self.assertEqual(1, len(counters)) delete = counters[0] @@ -190,7 +192,7 @@ class TestNotification(test.BaseTestCase): self.assertEqual(sample.TYPE_GAUGE, delete.type) def test_image_size_on_delete(self): - handler = notifications.ImageSize() + handler = notifications.ImageSize(mock.Mock()) counters = list(handler.process_notification(NOTIFICATION_DELETE)) self.assertEqual(1, len(counters)) delete = counters[0] diff --git a/ceilometer/tests/network/test_notifications.py b/ceilometer/tests/network/test_notifications.py index 4cc9289b0..64653ac32 100644 --- a/ceilometer/tests/network/test_notifications.py +++ b/ceilometer/tests/network/test_notifications.py @@ -18,6 +18,8 @@ """Tests for ceilometer.network.notifications """ +import mock + from ceilometer.network import notifications from ceilometer.tests import base as test @@ -250,53 +252,53 @@ NOTIFICATION_L3_METER = { class TestNotifications(test.BaseTestCase): def test_network_create(self): - v = notifications.Network() + v = notifications.Network(mock.Mock()) samples = list(v.process_notification(NOTIFICATION_NETWORK_CREATE)) self.assertEqual(2, len(samples)) self.assertEqual("network.create", samples[1].name) def test_subnet_create(self): - v = notifications.Subnet() + v = notifications.Subnet(mock.Mock()) samples = list(v.process_notification(NOTIFICATION_SUBNET_CREATE)) self.assertEqual(2, len(samples)) self.assertEqual("subnet.create", samples[1].name) def test_port_create(self): - v = notifications.Port() + v = notifications.Port(mock.Mock()) samples = list(v.process_notification(NOTIFICATION_PORT_CREATE)) self.assertEqual(2, len(samples)) self.assertEqual("port.create", samples[1].name) def test_port_update(self): - v = notifications.Port() + v = notifications.Port(mock.Mock()) samples = list(v.process_notification(NOTIFICATION_PORT_UPDATE)) self.assertEqual(2, len(samples)) self.assertEqual("port.update", samples[1].name) def test_network_exists(self): - v = notifications.Network() + v = notifications.Network(mock.Mock()) samples = v.process_notification(NOTIFICATION_NETWORK_EXISTS) self.assertEqual(1, len(list(samples))) def test_router_exists(self): - v = notifications.Router() + v = notifications.Router(mock.Mock()) samples = v.process_notification(NOTIFICATION_ROUTER_EXISTS) self.assertEqual(1, len(list(samples))) def test_floatingip_exists(self): - v = notifications.FloatingIP() + v = notifications.FloatingIP(mock.Mock()) samples = list(v.process_notification(NOTIFICATION_FLOATINGIP_EXISTS)) self.assertEqual(1, len(samples)) self.assertEqual("ip.floating", samples[0].name) def test_floatingip_update(self): - v = notifications.FloatingIP() + v = notifications.FloatingIP(mock.Mock()) samples = list(v.process_notification(NOTIFICATION_FLOATINGIP_UPDATE)) self.assertEqual(len(samples), 2) self.assertEqual(samples[0].name, "ip.floating") def test_metering_report(self): - v = notifications.Bandwidth() + v = notifications.Bandwidth(mock.Mock()) samples = list(v.process_notification(NOTIFICATION_L3_METER)) self.assertEqual(1, len(samples)) self.assertEqual("bandwidth", samples[0].name) @@ -305,25 +307,25 @@ class TestNotifications(test.BaseTestCase): class TestEventTypes(test.BaseTestCase): def test_network(self): - v = notifications.Network() + v = notifications.Network(mock.Mock()) events = v.event_types self.assertIsNotEmpty(events) def test_subnet(self): - v = notifications.Subnet() + v = notifications.Subnet(mock.Mock()) events = v.event_types self.assertIsNotEmpty(events) def test_port(self): - v = notifications.Port() + v = notifications.Port(mock.Mock()) events = v.event_types self.assertIsNotEmpty(events) def test_router(self): - self.assertTrue(notifications.Router().event_types) + self.assertTrue(notifications.Router(mock.Mock()).event_types) def test_floatingip(self): - self.assertTrue(notifications.FloatingIP().event_types) + self.assertTrue(notifications.FloatingIP(mock.Mock()).event_types) def test_bandwidth(self): - self.assertTrue(notifications.Bandwidth().event_types) + self.assertTrue(notifications.Bandwidth(mock.Mock()).event_types) diff --git a/ceilometer/tests/orchestration/test_notifications.py b/ceilometer/tests/orchestration/test_notifications.py index f0d03f1a2..ebcc9fcd4 100644 --- a/ceilometer/tests/orchestration/test_notifications.py +++ b/ceilometer/tests/orchestration/test_notifications.py @@ -14,6 +14,7 @@ import datetime +import mock from oslo.config import cfg from ceilometer.openstack.common import test @@ -102,7 +103,7 @@ class TestNotification(test.BaseTestCase): def _test_operation(self, operation, trust=None): notif = stack_notification_for(operation, trust) - handler = notifications.StackCRUD() + handler = notifications.StackCRUD(mock.Mock()) data = list(handler.process_notification(notif)) self.assertEqual(len(data), 1) if trust: diff --git a/ceilometer/tests/test_middleware.py b/ceilometer/tests/test_middleware.py index 992ab6492..196943db7 100644 --- a/ceilometer/tests/test_middleware.py +++ b/ceilometer/tests/test_middleware.py @@ -16,6 +16,8 @@ # License for the specific language governing permissions and limitations # under the License. +import mock + from ceilometer import middleware from ceilometer.openstack.common.fixture import config from ceilometer.openstack.common import test @@ -74,7 +76,7 @@ class TestNotifications(test.BaseTestCase): self.CONF = self.useFixture(config.Config()).conf def test_process_request_notification(self): - sample = list(middleware.HTTPRequest().process_notification( + sample = list(middleware.HTTPRequest(mock.Mock()).process_notification( HTTP_REQUEST ))[0] self.assertEqual(HTTP_REQUEST['payload']['request']['HTTP_X_USER_ID'], @@ -86,9 +88,8 @@ class TestNotifications(test.BaseTestCase): self.assertEqual(1, sample.volume) def test_process_response_notification(self): - sample = list(middleware.HTTPResponse().process_notification( - HTTP_RESPONSE - ))[0] + sample = list(middleware.HTTPResponse( + mock.Mock()).process_notification(HTTP_RESPONSE))[0] self.assertEqual(HTTP_RESPONSE['payload']['request']['HTTP_X_USER_ID'], sample.user_id) self.assertEqual(HTTP_RESPONSE['payload']['request'] @@ -98,5 +99,5 @@ class TestNotifications(test.BaseTestCase): self.assertEqual(1, sample.volume) def test_targets(self): - targets = middleware.HTTPRequest().get_targets(self.CONF) + targets = middleware.HTTPRequest(mock.Mock()).get_targets(self.CONF) self.assertEqual(4, len(targets)) diff --git a/ceilometer/tests/test_notification.py b/ceilometer/tests/test_notification.py index cc3690562..71bfe3c3d 100644 --- a/ceilometer/tests/test_notification.py +++ b/ceilometer/tests/test_notification.py @@ -26,7 +26,6 @@ from ceilometer.compute.notifications import instance from ceilometer import messaging from ceilometer import notification from ceilometer.openstack.common.fixture import config -from ceilometer.storage import models from ceilometer.tests import base as tests_base TEST_NOTICE_CTXT = { @@ -89,79 +88,52 @@ class TestNotification(tests_base.BaseTestCase): messaging.setup('fake://') self.addCleanup(messaging.cleanup) self.CONF.set_override("connection", "log://", group='database') + self.CONF.set_override("store_events", False, group="notification") self.srv = notification.NotificationService() - def _make_test_manager(self, plugin): + def fake_get_notifications_manager(self, pm): + self.plugin = instance.Instance(pm) return extension.ExtensionManager.make_test_instance( [ extension.Extension('test', None, None, - plugin), + self.plugin) ] ) @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) - @mock.patch('ceilometer.event.converter.setup_events', mock.MagicMock()) @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start', mock.MagicMock()) + @mock.patch('ceilometer.event.endpoint.EventsNotificationEndpoint') + def _do_process_notification_manager_start(self, + fake_event_endpoint_class): + with mock.patch.object(self.srv, '_get_notifications_manager') \ + as get_nm: + get_nm.side_effect = self.fake_get_notifications_manager + self.srv.start() + self.fake_event_endpoint = fake_event_endpoint_class.return_value + def test_process_notification(self): - # If we try to create a real RPC connection, init_host() never - # returns. Mock it out so we can establish the service - # configuration. - self.CONF.set_override("store_events", False, group="notification") - self.srv.start() + self._do_process_notification_manager_start() self.srv.pipeline_manager.pipelines[0] = mock.MagicMock() - self.srv.notification_manager = self._make_test_manager( - instance.Instance() - ) - self.srv.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise', - 'compute.instance.create.end', - TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA) + self.plugin.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise', + 'compute.instance.create.end', + TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA) - self.assertTrue( - self.srv.pipeline_manager.publisher.called) + self.assertEqual(1, len(self.srv.listener.dispatcher.endpoints)) + self.assertTrue(self.srv.pipeline_manager.publisher.called) def test_process_notification_no_events(self): - self.CONF.set_override("store_events", False, group="notification") - self.srv.notification_manager = mock.MagicMock() - with mock.patch.object(self.srv, - '_message_to_event') as fake_msg_to_event: - self.srv.process_notification({}) - self.assertFalse(fake_msg_to_event.called) + self._do_process_notification_manager_start() + self.assertEqual(1, len(self.srv.listener.dispatcher.endpoints)) + self.assertNotEqual(self.fake_event_endpoint, + self.srv.listener.dispatcher.endpoints[0]) def test_process_notification_with_events(self): self.CONF.set_override("store_events", True, group="notification") - self.srv.notification_manager = mock.MagicMock() - with mock.patch.object(self.srv, - '_message_to_event') as fake_msg_to_event: - self.srv.process_notification({}) - self.assertTrue(fake_msg_to_event.called) - - def test_message_to_event_duplicate(self): - self.CONF.set_override("store_events", True, group="notification") - mock_dispatcher = mock.MagicMock() - self.srv.event_converter = mock.MagicMock() - self.srv.event_converter.to_event.return_value = mock.MagicMock( - event_type='test.test') - self.srv.dispatcher_manager = self._make_test_manager(mock_dispatcher) - mock_dispatcher.record_events.return_value = [ - (models.Event.DUPLICATE, object())] - message = {'event_type': "foo", 'message_id': "abc"} - self.srv._message_to_event(message) # Should return silently. - - def test_message_to_event_bad_event(self): - self.CONF.set_override("store_events", True, group="notification") - self.CONF.set_override("ack_on_event_error", False, - group="notification") - mock_dispatcher = mock.MagicMock() - self.srv.event_converter = mock.MagicMock() - self.srv.event_converter.to_event.return_value = mock.MagicMock( - event_type='test.test') - self.srv.dispatcher_manager = self._make_test_manager(mock_dispatcher) - mock_dispatcher.record_events.return_value = [ - (models.Event.UNKNOWN_PROBLEM, object())] - message = {'event_type': "foo", 'message_id': "abc"} - ret = self.srv._message_to_event(message) - self.assertEqual(oslo.messaging.NotificationResult.REQUEUE, ret) + self._do_process_notification_manager_start() + self.assertEqual(2, len(self.srv.listener.dispatcher.endpoints)) + self.assertEqual(self.fake_event_endpoint, + self.srv.listener.dispatcher.endpoints[0]) diff --git a/ceilometer/tests/test_plugin.py b/ceilometer/tests/test_plugin.py index 4f206444f..344c27123 100644 --- a/ceilometer/tests/test_plugin.py +++ b/ceilometer/tests/test_plugin.py @@ -16,6 +16,8 @@ # License for the specific language governing permissions and limitations # under the License. +import mock + from ceilometer.openstack.common.fixture import config from ceilometer.openstack.common import test from ceilometer import plugin @@ -110,14 +112,26 @@ class NotificationBaseTestCase(test.BaseTestCase): class FakeNetworkPlugin(FakePlugin): event_types = ['network.*'] - def test_to_samples(self): - c = self.FakeComputePlugin() - n = self.FakeNetworkPlugin() - self.assertTrue(len(list(c.to_samples(TEST_NOTIFICATION))) > 0) - self.assertEqual(0, len(list(n.to_samples(TEST_NOTIFICATION)))) + def _do_test_to_samples(self, plugin_class, match): + pm = mock.MagicMock() + plug = plugin_class(pm) + publish = pm.publisher.return_value.__enter__.return_value + + plug.to_samples_and_publish(mock.Mock(), TEST_NOTIFICATION) + + if match: + publish.assert_called_once_with(list(TEST_NOTIFICATION)) + else: + self.assertEqual(0, publish.call_count) + + def test_to_samples_match(self): + self._do_test_to_samples(self.FakeComputePlugin, True) + + def test_to_samples_no_match(self): + self._do_test_to_samples(self.FakeNetworkPlugin, False) def test_get_targets_compat(self): - targets = self.FakeComputePlugin().get_targets(self.CONF) + targets = self.FakeComputePlugin(mock.Mock()).get_targets(self.CONF) self.assertEqual(3, len(targets)) self.assertEqual('t1', targets[0].topic) self.assertEqual('exchange1', targets[0].exchange) diff --git a/ceilometer/tests/volume/test_notifications.py b/ceilometer/tests/volume/test_notifications.py index 4feeed2b1..9602fd710 100644 --- a/ceilometer/tests/volume/test_notifications.py +++ b/ceilometer/tests/volume/test_notifications.py @@ -10,6 +10,8 @@ # License for the specific language governing permissions and limitations # under the License. +import mock + from ceilometer.openstack.common import test from ceilometer.volume import notifications @@ -111,7 +113,7 @@ class TestNotifications(test.BaseTestCase): self.assertEqual(notification['publisher_id'], metadata.get('host')) def test_volume_exists(self): - v = notifications.Volume() + v = notifications.Volume(mock.Mock()) samples = list(v.process_notification(NOTIFICATION_VOLUME_EXISTS)) self.assertEqual(1, len(samples)) s = samples[0] @@ -119,7 +121,7 @@ class TestNotifications(test.BaseTestCase): self.assertEqual(1, s.volume) def test_volume_size_exists(self): - v = notifications.VolumeSize() + v = notifications.VolumeSize(mock.Mock()) samples = list(v.process_notification(NOTIFICATION_VOLUME_EXISTS)) self.assertEqual(1, len(samples)) s = samples[0] @@ -129,7 +131,7 @@ class TestNotifications(test.BaseTestCase): s.volume) def test_volume_delete(self): - v = notifications.Volume() + v = notifications.Volume(mock.Mock()) samples = list(v.process_notification(NOTIFICATION_VOLUME_DELETE)) self.assertEqual(1, len(samples)) s = samples[0] @@ -137,7 +139,7 @@ class TestNotifications(test.BaseTestCase): self.assertEqual(1, s.volume) def test_volume_size_delete(self): - v = notifications.VolumeSize() + v = notifications.VolumeSize(mock.Mock()) samples = list(v.process_notification(NOTIFICATION_VOLUME_DELETE)) self.assertEqual(1, len(samples)) s = samples[0] @@ -147,7 +149,7 @@ class TestNotifications(test.BaseTestCase): s.volume) def test_volume_resize(self): - v = notifications.Volume() + v = notifications.Volume(mock.Mock()) samples = list(v.process_notification(NOTIFICATION_VOLUME_RESIZE)) self.assertEqual(1, len(samples)) s = samples[0] @@ -155,7 +157,7 @@ class TestNotifications(test.BaseTestCase): self.assertEqual(1, s.volume) def test_volume_size_resize(self): - v = notifications.VolumeSize() + v = notifications.VolumeSize(mock.Mock()) samples = list(v.process_notification(NOTIFICATION_VOLUME_RESIZE)) self.assertEqual(1, len(samples)) s = samples[0]