Use NotificationPlugin as an oslo.msg endpoint

This refactor the NotificationPlugin code to have
NotificationPlugin acting as a oslo.messaging endpoint

Change-Id: Iecdce3513a22a1c1d5037ed2969c6333e893791d
This commit is contained in:
Mehdi Abaakouk
2014-02-26 17:17:43 +01:00
parent 9733a883b6
commit aab1800d0d
15 changed files with 382 additions and 224 deletions

View File

@@ -0,0 +1,69 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012-2014 eNovance <licensing@enovance.com>
#
# Author: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo.config import cfg
import oslo.messaging
from stevedore import extension
from ceilometer.event import converter as event_converter
from ceilometer import messaging
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer import service
from ceilometer.storage import models
LOG = logging.getLogger(__name__)
class EventsNotificationEndpoint(service.DispatchedService):
def __init__(self):
LOG.debug(_('Loading event definitions'))
self.event_converter = event_converter.setup_events(
extension.ExtensionManager(
namespace='ceilometer.event.trait_plugin'))
def info(self, ctxt, publisher_id, event_type, payload, metadata):
"""Convert message to Ceilometer Event.
:param ctxt: oslo.messaging context
:param publisher_id: publisher of the notification
:param event_type: type of notification
:param payload: notification payload
:param metadata: metadata about the notification
"""
#NOTE: the rpc layer currently rips out the notification
#delivery_info, which is critical to determining the
#source of the notification. This will have to get added back later.
notification = messaging.convert_to_old_notification_format(
'info', ctxt, publisher_id, event_type, payload, metadata)
self.process_notification(notification)
def process_notification(self, notification):
event = self.event_converter.to_event(notification)
if event is not None:
LOG.debug(_('Saving event "%s"'), event.event_type)
problem_events = []
for dispatcher in self.dispatcher_manager:
problem_events.extend(dispatcher.obj.record_events(event))
if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]:
if not cfg.CONF.notification.ack_on_event_error:
return oslo.messaging.NotificationResult.REQUEUE
return oslo.messaging.NotificationResult.HANDLED

View File

@@ -63,11 +63,11 @@ def get_rpc_client(**kwargs):
return oslo.messaging.RPCClient(TRANSPORT, target)
def get_notification_listener(targets, endpoint):
def get_notification_listener(targets, endpoints):
"""Return a configured oslo.messaging notification listener."""
global TRANSPORT
return oslo.messaging.get_notification_listener(
TRANSPORT, targets, [endpoint], executor='eventlet')
TRANSPORT, targets, endpoints, executor='eventlet')
def get_notifier(publisher_id):

View File

@@ -17,18 +17,14 @@
# under the License.
from oslo.config import cfg
import oslo.messaging
from stevedore import extension
from ceilometer.event import converter as event_converter
from ceilometer.event import endpoint as event_endpoint
from ceilometer import messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
from ceilometer import pipeline
from ceilometer import service
from ceilometer.storage import models
LOG = log.getLogger(__name__)
@@ -48,25 +44,34 @@ OPTS = [
cfg.CONF.register_opts(OPTS, group="notification")
class NotificationService(service.DispatchedService, os_service.Service):
class NotificationService(os_service.Service):
NOTIFICATION_NAMESPACE = 'ceilometer.notification'
@classmethod
def _get_notifications_manager(cls, pm):
return extension.ExtensionManager(
namespace=cls.NOTIFICATION_NAMESPACE,
invoke_on_load=True,
invoke_args=(pm, )
)
def start(self):
super(NotificationService, self).start()
self.pipeline_manager = pipeline.setup_pipeline()
self.notification_manager = extension.ExtensionManager(
namespace=self.NOTIFICATION_NAMESPACE,
invoke_on_load=True,
)
self.notification_manager = self._get_notifications_manager(
self.pipeline_manager)
if not list(self.notification_manager):
LOG.warning(_('Failed to load any notification handlers for %s'),
self.NOTIFICATION_NAMESPACE)
ack_on_error = cfg.CONF.notification.ack_on_event_error
endpoints = []
if cfg.CONF.notification.store_events:
endpoints = [event_endpoint.EventsNotificationEndpoint()]
targets = []
for ext in self.notification_manager:
handler = ext.obj
@@ -76,71 +81,14 @@ class NotificationService(service.DispatchedService, os_service.Service):
'type': ', '.join(handler.event_types),
'error': ack_on_error})
targets.extend(handler.get_targets(cfg.CONF))
endpoints.append(handler)
self.listener = messaging.get_notification_listener(targets, self)
LOG.debug(_('Loading event definitions'))
self.event_converter = event_converter.setup_events(
extension.ExtensionManager(
namespace='ceilometer.event.trait_plugin'))
self.listener = messaging.get_notification_listener(targets, endpoints)
self.listener.start()
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
def stop(self):
self.listener.stop()
super(NotificationService, self).stop()
def info(self, ctxt, publisher_id, event_type, payload, metadata):
notification = messaging.convert_to_old_notification_format(
'info', ctxt, publisher_id, event_type, payload, metadata)
self.process_notification(notification)
def process_notification(self, notification):
"""RPC endpoint for notification messages
When another service sends a notification over the message
bus, this method receives it. See _setup_subscription().
"""
LOG.debug(_('notification %r'), notification.get('event_type'))
self.notification_manager.map(self._process_notification_for_ext,
notification=notification)
if cfg.CONF.notification.store_events:
return self._message_to_event(notification)
else:
return oslo.messaging.NotificationResult.HANDLED
def _message_to_event(self, body):
"""Convert message to Ceilometer Event.
NOTE: the rpc layer currently rips out the notification
delivery_info, which is critical to determining the
source of the notification. This will have to get added back later.
"""
event = self.event_converter.to_event(body)
if event is not None:
LOG.debug(_('Saving event "%s"'), event.event_type)
problem_events = []
for dispatcher in self.dispatcher_manager:
problem_events.extend(dispatcher.obj.record_events(event))
if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]:
if not cfg.CONF.notification.ack_on_event_error:
return oslo.messaging.NotificationResult.REQUEUE
return oslo.messaging.NotificationResult.HANDLED
def _process_notification_for_ext(self, ext, notification):
"""Wrapper for calling pipelines when a notification arrives
When a message is received by process_notification(), it calls
this method with each notification plugin to allow all the
plugins process the notification.
"""
with self.pipeline_manager.publisher(context.get_admin_context()) as p:
# FIXME(dhellmann): Spawn green thread?
p(list(ext.obj.to_samples(notification)))

View File

@@ -33,7 +33,7 @@ _pipeline_manager = None
def _load_notification_manager():
global _notification_manager
global _notification_manager, _pipeline_manager
namespace = 'ceilometer.notification'
@@ -41,7 +41,9 @@ def _load_notification_manager():
_notification_manager = extension.ExtensionManager(
namespace=namespace,
invoke_on_load=True)
invoke_on_load=True,
invoke_args=(_pipeline_manager, )
)
if not list(_notification_manager):
LOG.warning(_('Failed to load any notification handlers for %s'),
@@ -58,19 +60,13 @@ def _load_pipeline_manager():
)
def _process_notification_for_ext(ext, context, notification):
with _pipeline_manager.publisher(context) as p:
# FIXME(dhellmann): Spawn green thread?
p(list(ext.obj.to_samples(notification)))
def notify(context, message):
"""Sends a notification as a meter using Ceilometer pipelines."""
if not _notification_manager:
_load_notification_manager()
if not _pipeline_manager:
_load_pipeline_manager()
_notification_manager.map(
_process_notification_for_ext,
if not _notification_manager:
_load_notification_manager()
_notification_manager.map_method(
'to_samples_and_publish',
context=context or req_context.get_admin_context(),
notification=message)

View File

@@ -25,6 +25,8 @@ import fnmatch
import oslo.messaging
import six
from ceilometer import messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log
@@ -42,6 +44,9 @@ class PluginBase(object):
@six.add_metaclass(abc.ABCMeta)
class NotificationBase(PluginBase):
"""Base class for plugins that support the notification API."""
def __init__(self, pipeline_manager):
super(NotificationBase, self).__init__()
self.pipeline_manager = pipeline_manager
@abc.abstractproperty
def event_types(self):
@@ -84,17 +89,40 @@ class NotificationBase(PluginBase):
return any(map(lambda e: fnmatch.fnmatch(event_type, e),
event_type_to_handle))
def to_samples(self, notification):
"""Return samples produced by *process_notification* for the given
notification, if it's handled by this notification handler.
def info(self, ctxt, publisher_id, event_type, payload, metadata):
"""RPC endpoint for notification messages
When another service sends a notification over the message
bus, this method receives it.
:param ctxt: oslo.messaging context
:param publisher_id: publisher of the notification
:param event_type: type of notification
:param payload: notification payload
:param metadata: metadata about the notification
"""
notification = messaging.convert_to_old_notification_format(
'info', ctxt, publisher_id, event_type, payload, metadata)
self.to_samples_and_publish(context.get_admin_context(), notification)
def to_samples_and_publish(self, context, notification):
"""Return samples produced by *process_notification* for the given
notification.
:param context: Execution context from the service or RPC call
:param notification: The notification to process.
"""
if self._handle_event_type(notification['event_type'],
self.event_types):
return self.process_notification(notification)
return []
#TODO(sileht): this will be moved into oslo.messaging
#see oslo.messaging bp notification-dispatcher-filter
if not self._handle_event_type(notification['event_type'],
self.event_types):
return
with self.pipeline_manager.publisher(context) as p:
p(list(self.process_notification(notification)))
@six.add_metaclass(abc.ABCMeta)

View File

@@ -100,7 +100,7 @@ class TestMetricsNotifications(test.BaseTestCase):
def test_compute_metrics(self):
ERROR_METRICS = copy.copy(METRICS_UPDATE)
ERROR_METRICS['payload'] = {"metric_err": []}
ic = cpu.CpuFrequency()
ic = cpu.CpuFrequency(None)
info = ic._get_sample(METRICS_UPDATE, 'cpu.frequency')
info_none = ic._get_sample(METRICS_UPDATE, 'abc.efg')
info_error = ic._get_sample(ERROR_METRICS, 'cpu.frequency')
@@ -109,51 +109,51 @@ class TestMetricsNotifications(test.BaseTestCase):
self.assertIsNone(info_error)
def test_compute_cpu_frequency(self):
c = self._process_notification(cpu.CpuFrequency())
c = self._process_notification(cpu.CpuFrequency(None))
self.assertEqual('compute.node.cpu.frequency', c.name)
self.assertEqual(1600, c.volume)
def test_compute_cpu_user_time(self):
c = self._process_notification(cpu.CpuUserTime())
c = self._process_notification(cpu.CpuUserTime(None))
self.assertEqual('compute.node.cpu.user.time', c.name)
self.assertEqual(17421440000000L, c.volume)
def test_compute_cpu_kernel_time(self):
c = self._process_notification(cpu.CpuKernelTime())
c = self._process_notification(cpu.CpuKernelTime(None))
self.assertEqual('compute.node.cpu.kernel.time', c.name)
self.assertEqual(7852600000000L, c.volume)
def test_compute_cpu_idle_time(self):
c = self._process_notification(cpu.CpuIdleTime())
c = self._process_notification(cpu.CpuIdleTime(None))
self.assertEqual('compute.node.cpu.idle.time', c.name)
self.assertEqual(1307374400000000L, c.volume)
def test_compute_cpu_iowait_time(self):
c = self._process_notification(cpu.CpuIowaitTime())
c = self._process_notification(cpu.CpuIowaitTime(None))
self.assertEqual('compute.node.cpu.iowait.time', c.name)
self.assertEqual(11697470000000L, c.volume)
def test_compute_cpu_kernel_percent(self):
c = self._process_notification(cpu.CpuKernelPercent())
c = self._process_notification(cpu.CpuKernelPercent(None))
self.assertEqual('compute.node.cpu.kernel.percent', c.name)
self.assertEqual(0.5841204961898534, c.volume)
def test_compute_cpu_idle_percent(self):
c = self._process_notification(cpu.CpuIdlePercent())
c = self._process_notification(cpu.CpuIdlePercent(None))
self.assertEqual('compute.node.cpu.idle.percent', c.name)
self.assertEqual(97.24985141658965, c.volume)
def test_compute_cpu_user_percent(self):
c = self._process_notification(cpu.CpuUserPercent())
c = self._process_notification(cpu.CpuUserPercent(None))
self.assertEqual('compute.node.cpu.user.percent', c.name)
self.assertEqual(1.2959045637294348, c.volume)
def test_compute_cpu_iowait_percent(self):
c = self._process_notification(cpu.CpuIowaitPercent())
c = self._process_notification(cpu.CpuIowaitPercent(None))
self.assertEqual('compute.node.cpu.iowait.percent', c.name)
self.assertEqual(0.8701235234910634, c.volume)
def test_compute_cpu_percent(self):
c = self._process_notification(cpu.CpuPercent())
c = self._process_notification(cpu.CpuPercent(None))
self.assertEqual('compute.node.cpu.percent', c.name)
self.assertEqual(2.7501485834103515, c.volume)

View File

@@ -597,7 +597,7 @@ INSTANCE_SCHEDULED = {
class TestNotifications(test.BaseTestCase):
def test_process_notification(self):
info = list(instance.Instance().process_notification(
info = list(instance.Instance(None).process_notification(
INSTANCE_CREATE_END
))[0]
for name, actual, expected in [
@@ -629,42 +629,42 @@ class TestNotifications(test.BaseTestCase):
self.assertNotIn('foo.bar', user_meta)
def test_instance_create_instance(self):
ic = instance.Instance()
ic = instance.Instance(None)
counters = list(ic.process_notification(INSTANCE_CREATE_END))
self.assertEqual(1, len(counters))
c = counters[0]
self.assertEqual(1, c.volume)
def test_instance_create_flavor(self):
ic = instance.InstanceFlavor()
ic = instance.InstanceFlavor(None)
counters = list(ic.process_notification(INSTANCE_CREATE_END))
self.assertEqual(1, len(counters))
c = counters[0]
self.assertEqual(1, c.volume)
def test_instance_create_memory(self):
ic = instance.Memory()
ic = instance.Memory(None)
counters = list(ic.process_notification(INSTANCE_CREATE_END))
self.assertEqual(1, len(counters))
c = counters[0]
self.assertEqual(INSTANCE_CREATE_END['payload']['memory_mb'], c.volume)
def test_instance_create_vcpus(self):
ic = instance.VCpus()
ic = instance.VCpus(None)
counters = list(ic.process_notification(INSTANCE_CREATE_END))
self.assertEqual(1, len(counters))
c = counters[0]
self.assertEqual(INSTANCE_CREATE_END['payload']['vcpus'], c.volume)
def test_instance_create_root_disk_size(self):
ic = instance.RootDiskSize()
ic = instance.RootDiskSize(None)
counters = list(ic.process_notification(INSTANCE_CREATE_END))
self.assertEqual(1, len(counters))
c = counters[0]
self.assertEqual(INSTANCE_CREATE_END['payload']['root_gb'], c.volume)
def test_instance_create_ephemeral_disk_size(self):
ic = instance.EphemeralDiskSize()
ic = instance.EphemeralDiskSize(None)
counters = list(ic.process_notification(INSTANCE_CREATE_END))
self.assertEqual(1, len(counters))
c = counters[0]
@@ -672,32 +672,32 @@ class TestNotifications(test.BaseTestCase):
c.volume)
def test_instance_exists_instance(self):
ic = instance.Instance()
ic = instance.Instance(None)
counters = list(ic.process_notification(INSTANCE_EXISTS))
self.assertEqual(1, len(counters))
def test_instance_exists_metadata_list(self):
ic = instance.Instance()
ic = instance.Instance(None)
counters = list(ic.process_notification(INSTANCE_EXISTS_METADATA_LIST))
self.assertEqual(1, len(counters))
def test_instance_exists_flavor(self):
ic = instance.Instance()
ic = instance.Instance(None)
counters = list(ic.process_notification(INSTANCE_EXISTS))
self.assertEqual(1, len(counters))
def test_instance_delete_instance(self):
ic = instance.Instance()
ic = instance.Instance(None)
counters = list(ic.process_notification(INSTANCE_DELETE_START))
self.assertEqual(1, len(counters))
def test_instance_delete_flavor(self):
ic = instance.Instance()
ic = instance.Instance(None)
counters = list(ic.process_notification(INSTANCE_DELETE_START))
self.assertEqual(1, len(counters))
def test_instance_finish_resize_instance(self):
ic = instance.Instance()
ic = instance.Instance(None)
counters = list(ic.process_notification(INSTANCE_FINISH_RESIZE_END))
self.assertEqual(1, len(counters))
c = counters[0]
@@ -705,7 +705,7 @@ class TestNotifications(test.BaseTestCase):
self._verify_user_metadata(c.resource_metadata)
def test_instance_finish_resize_flavor(self):
ic = instance.InstanceFlavor()
ic = instance.InstanceFlavor(None)
counters = list(ic.process_notification(INSTANCE_FINISH_RESIZE_END))
self.assertEqual(1, len(counters))
c = counters[0]
@@ -714,7 +714,7 @@ class TestNotifications(test.BaseTestCase):
self._verify_user_metadata(c.resource_metadata)
def test_instance_finish_resize_memory(self):
ic = instance.Memory()
ic = instance.Memory(None)
counters = list(ic.process_notification(INSTANCE_FINISH_RESIZE_END))
self.assertEqual(1, len(counters))
c = counters[0]
@@ -723,7 +723,7 @@ class TestNotifications(test.BaseTestCase):
self._verify_user_metadata(c.resource_metadata)
def test_instance_finish_resize_vcpus(self):
ic = instance.VCpus()
ic = instance.VCpus(None)
counters = list(ic.process_notification(INSTANCE_FINISH_RESIZE_END))
self.assertEqual(1, len(counters))
c = counters[0]
@@ -732,7 +732,7 @@ class TestNotifications(test.BaseTestCase):
self._verify_user_metadata(c.resource_metadata)
def test_instance_resize_finish_instance(self):
ic = instance.Instance()
ic = instance.Instance(None)
counters = list(ic.process_notification(INSTANCE_FINISH_RESIZE_END))
self.assertEqual(1, len(counters))
c = counters[0]
@@ -740,7 +740,7 @@ class TestNotifications(test.BaseTestCase):
self._verify_user_metadata(c.resource_metadata)
def test_instance_resize_finish_flavor(self):
ic = instance.InstanceFlavor()
ic = instance.InstanceFlavor(None)
counters = list(ic.process_notification(INSTANCE_RESIZE_REVERT_END))
self.assertEqual(1, len(counters))
c = counters[0]
@@ -749,7 +749,7 @@ class TestNotifications(test.BaseTestCase):
self._verify_user_metadata(c.resource_metadata)
def test_instance_resize_finish_memory(self):
ic = instance.Memory()
ic = instance.Memory(None)
counters = list(ic.process_notification(INSTANCE_RESIZE_REVERT_END))
self.assertEqual(1, len(counters))
c = counters[0]
@@ -758,7 +758,7 @@ class TestNotifications(test.BaseTestCase):
self._verify_user_metadata(c.resource_metadata)
def test_instance_resize_finish_vcpus(self):
ic = instance.VCpus()
ic = instance.VCpus(None)
counters = list(ic.process_notification(INSTANCE_RESIZE_REVERT_END))
self.assertEqual(1, len(counters))
c = counters[0]
@@ -767,7 +767,7 @@ class TestNotifications(test.BaseTestCase):
self._verify_user_metadata(c.resource_metadata)
def test_instance_delete_samples(self):
ic = instance.InstanceDelete()
ic = instance.InstanceDelete(None)
counters = list(ic.process_notification(INSTANCE_DELETE_SAMPLES))
self.assertEqual(2, len(counters))
names = [c.name for c in counters]
@@ -776,7 +776,7 @@ class TestNotifications(test.BaseTestCase):
self._verify_user_metadata(c.resource_metadata)
def test_instance_scheduled(self):
ic = instance.InstanceScheduled()
ic = instance.InstanceScheduled(None)
self.assertIn(INSTANCE_SCHEDULED['event_type'],
ic.event_types)

View File

@@ -0,0 +1,123 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for Ceilometer notify daemon."""
import mock
import oslo.messaging
from stevedore import extension
from ceilometer.event import endpoint as event_endpoint
from ceilometer import messaging
from ceilometer.openstack.common.fixture import config
from ceilometer.storage import models
from ceilometer.tests import base as tests_base
TEST_NOTICE_CTXT = {
u'auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2',
u'is_admin': True,
u'project_id': u'7c150a59fe714e6f9263774af9688f0e',
u'quota_class': None,
u'read_deleted': u'no',
u'remote_address': u'10.0.2.15',
u'request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66',
u'roles': [u'admin'],
u'timestamp': u'2012-05-08T20:23:41.425105',
u'user_id': u'1e3ce043029547f1a61c1996d1a531a2',
}
TEST_NOTICE_METADATA = {
u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451',
u'timestamp': u'2012-05-08 20:23:48.028195',
}
TEST_NOTICE_PAYLOAD = {
u'created_at': u'2012-05-08 20:23:41',
u'deleted_at': u'',
u'disk_gb': 0,
u'display_name': u'testme',
u'fixed_ips': [{u'address': u'10.0.0.2',
u'floating_ips': [],
u'meta': {},
u'type': u'fixed',
u'version': 4}],
u'image_ref_url': u'http://10.0.2.15:9292/images/UUID',
u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1',
u'instance_type': u'm1.tiny',
u'instance_type_id': 2,
u'launched_at': u'2012-05-08 20:23:47.985999',
u'memory_mb': 512,
u'state': u'active',
u'state_description': u'',
u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e',
u'user_id': u'1e3ce043029547f1a61c1996d1a531a2',
u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3',
u'vcpus': 1,
u'root_gb': 0,
u'ephemeral_gb': 0,
u'host': u'compute-host-name',
u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4',
u'os_type': u'linux?',
u'architecture': u'x86',
u'image_ref': u'UUID',
u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5',
u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6',
}
class TestEventEndpoint(tests_base.BaseTestCase):
def setUp(self):
super(TestEventEndpoint, self).setUp()
self.CONF = self.useFixture(config.Config()).conf
self.CONF([])
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.CONF.set_override("connection", "log://", group='database')
self.CONF.set_override("store_events", True, group="notification")
self.endpoint = event_endpoint.EventsNotificationEndpoint()
self.mock_dispatcher = mock.MagicMock()
self.endpoint.event_converter = mock.MagicMock()
self.endpoint.event_converter.to_event.return_value = mock.MagicMock(
event_type='test.test')
self.endpoint.dispatcher_manager = \
extension.ExtensionManager.make_test_instance([
extension.Extension('test', None, None, self.mock_dispatcher)
])
def test_message_to_event(self):
self.endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise',
'compute.instance.create.end',
TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA)
def test_message_to_event_duplicate(self):
self.mock_dispatcher.record_events.return_value = [
(models.Event.DUPLICATE, object())]
message = {'event_type': "foo", 'message_id': "abc"}
self.endpoint.process_notification(message) # Should return silently.
def test_message_to_event_bad_event(self):
self.CONF.set_override("ack_on_event_error", False,
group="notification")
self.mock_dispatcher.record_events.return_value = [
(models.Event.UNKNOWN_PROBLEM, object())]
message = {'event_type': "foo", 'message_id': "abc"}
ret = self.endpoint.process_notification(message)
self.assertEqual(oslo.messaging.NotificationResult.REQUEUE, ret)

View File

@@ -19,6 +19,8 @@
import datetime
import mock
from ceilometer.image import notifications
from ceilometer.openstack.common import test
from ceilometer import sample
@@ -101,7 +103,7 @@ class TestNotification(test.BaseTestCase):
self.assertEqual(u'images.example.com', metadata.get('host'))
def test_image_download(self):
handler = notifications.ImageDownload()
handler = notifications.ImageDownload(mock.Mock())
counters = list(handler.process_notification(NOTIFICATION_SEND))
self.assertEqual(1, len(counters))
download = counters[0]
@@ -111,7 +113,7 @@ class TestNotification(test.BaseTestCase):
self.assertEqual(sample.TYPE_DELTA, download.type)
def test_image_serve(self):
handler = notifications.ImageServe()
handler = notifications.ImageServe(mock.Mock())
counters = list(handler.process_notification(NOTIFICATION_SEND))
self.assertEqual(1, len(counters))
serve = counters[0]
@@ -124,7 +126,7 @@ class TestNotification(test.BaseTestCase):
self.assertEqual(sample.TYPE_DELTA, serve.type)
def test_image_crud_on_update(self):
handler = notifications.ImageCRUD()
handler = notifications.ImageCRUD(mock.Mock())
counters = list(handler.process_notification(NOTIFICATION_UPDATE))
self.assertEqual(1, len(counters))
update = counters[0]
@@ -132,7 +134,7 @@ class TestNotification(test.BaseTestCase):
self.assertEqual(sample.TYPE_DELTA, update.type)
def test_image_on_update(self):
handler = notifications.Image()
handler = notifications.Image(mock.Mock())
counters = list(handler.process_notification(NOTIFICATION_UPDATE))
self.assertEqual(1, len(counters))
update = counters[0]
@@ -140,7 +142,7 @@ class TestNotification(test.BaseTestCase):
self.assertEqual(sample.TYPE_GAUGE, update.type)
def test_image_size_on_update(self):
handler = notifications.ImageSize()
handler = notifications.ImageSize(mock.Mock())
counters = list(handler.process_notification(NOTIFICATION_UPDATE))
self.assertEqual(1, len(counters))
update = counters[0]
@@ -149,7 +151,7 @@ class TestNotification(test.BaseTestCase):
self.assertEqual(sample.TYPE_GAUGE, update.type)
def test_image_crud_on_upload(self):
handler = notifications.ImageCRUD()
handler = notifications.ImageCRUD(mock.Mock())
counters = list(handler.process_notification(NOTIFICATION_UPLOAD))
self.assertEqual(1, len(counters))
upload = counters[0]
@@ -157,7 +159,7 @@ class TestNotification(test.BaseTestCase):
self.assertEqual(sample.TYPE_DELTA, upload.type)
def test_image_on_upload(self):
handler = notifications.Image()
handler = notifications.Image(mock.Mock())
counters = list(handler.process_notification(NOTIFICATION_UPLOAD))
self.assertEqual(1, len(counters))
upload = counters[0]
@@ -165,7 +167,7 @@ class TestNotification(test.BaseTestCase):
self.assertEqual(sample.TYPE_GAUGE, upload.type)
def test_image_size_on_upload(self):
handler = notifications.ImageSize()
handler = notifications.ImageSize(mock.Mock())
counters = list(handler.process_notification(NOTIFICATION_UPLOAD))
self.assertEqual(1, len(counters))
upload = counters[0]
@@ -174,7 +176,7 @@ class TestNotification(test.BaseTestCase):
self.assertEqual(sample.TYPE_GAUGE, upload.type)
def test_image_crud_on_delete(self):
handler = notifications.ImageCRUD()
handler = notifications.ImageCRUD(mock.Mock())
counters = list(handler.process_notification(NOTIFICATION_DELETE))
self.assertEqual(1, len(counters))
delete = counters[0]
@@ -182,7 +184,7 @@ class TestNotification(test.BaseTestCase):
self.assertEqual(sample.TYPE_DELTA, delete.type)
def test_image_on_delete(self):
handler = notifications.Image()
handler = notifications.Image(mock.Mock())
counters = list(handler.process_notification(NOTIFICATION_DELETE))
self.assertEqual(1, len(counters))
delete = counters[0]
@@ -190,7 +192,7 @@ class TestNotification(test.BaseTestCase):
self.assertEqual(sample.TYPE_GAUGE, delete.type)
def test_image_size_on_delete(self):
handler = notifications.ImageSize()
handler = notifications.ImageSize(mock.Mock())
counters = list(handler.process_notification(NOTIFICATION_DELETE))
self.assertEqual(1, len(counters))
delete = counters[0]

View File

@@ -18,6 +18,8 @@
"""Tests for ceilometer.network.notifications
"""
import mock
from ceilometer.network import notifications
from ceilometer.tests import base as test
@@ -250,53 +252,53 @@ NOTIFICATION_L3_METER = {
class TestNotifications(test.BaseTestCase):
def test_network_create(self):
v = notifications.Network()
v = notifications.Network(mock.Mock())
samples = list(v.process_notification(NOTIFICATION_NETWORK_CREATE))
self.assertEqual(2, len(samples))
self.assertEqual("network.create", samples[1].name)
def test_subnet_create(self):
v = notifications.Subnet()
v = notifications.Subnet(mock.Mock())
samples = list(v.process_notification(NOTIFICATION_SUBNET_CREATE))
self.assertEqual(2, len(samples))
self.assertEqual("subnet.create", samples[1].name)
def test_port_create(self):
v = notifications.Port()
v = notifications.Port(mock.Mock())
samples = list(v.process_notification(NOTIFICATION_PORT_CREATE))
self.assertEqual(2, len(samples))
self.assertEqual("port.create", samples[1].name)
def test_port_update(self):
v = notifications.Port()
v = notifications.Port(mock.Mock())
samples = list(v.process_notification(NOTIFICATION_PORT_UPDATE))
self.assertEqual(2, len(samples))
self.assertEqual("port.update", samples[1].name)
def test_network_exists(self):
v = notifications.Network()
v = notifications.Network(mock.Mock())
samples = v.process_notification(NOTIFICATION_NETWORK_EXISTS)
self.assertEqual(1, len(list(samples)))
def test_router_exists(self):
v = notifications.Router()
v = notifications.Router(mock.Mock())
samples = v.process_notification(NOTIFICATION_ROUTER_EXISTS)
self.assertEqual(1, len(list(samples)))
def test_floatingip_exists(self):
v = notifications.FloatingIP()
v = notifications.FloatingIP(mock.Mock())
samples = list(v.process_notification(NOTIFICATION_FLOATINGIP_EXISTS))
self.assertEqual(1, len(samples))
self.assertEqual("ip.floating", samples[0].name)
def test_floatingip_update(self):
v = notifications.FloatingIP()
v = notifications.FloatingIP(mock.Mock())
samples = list(v.process_notification(NOTIFICATION_FLOATINGIP_UPDATE))
self.assertEqual(len(samples), 2)
self.assertEqual(samples[0].name, "ip.floating")
def test_metering_report(self):
v = notifications.Bandwidth()
v = notifications.Bandwidth(mock.Mock())
samples = list(v.process_notification(NOTIFICATION_L3_METER))
self.assertEqual(1, len(samples))
self.assertEqual("bandwidth", samples[0].name)
@@ -305,25 +307,25 @@ class TestNotifications(test.BaseTestCase):
class TestEventTypes(test.BaseTestCase):
def test_network(self):
v = notifications.Network()
v = notifications.Network(mock.Mock())
events = v.event_types
self.assertIsNotEmpty(events)
def test_subnet(self):
v = notifications.Subnet()
v = notifications.Subnet(mock.Mock())
events = v.event_types
self.assertIsNotEmpty(events)
def test_port(self):
v = notifications.Port()
v = notifications.Port(mock.Mock())
events = v.event_types
self.assertIsNotEmpty(events)
def test_router(self):
self.assertTrue(notifications.Router().event_types)
self.assertTrue(notifications.Router(mock.Mock()).event_types)
def test_floatingip(self):
self.assertTrue(notifications.FloatingIP().event_types)
self.assertTrue(notifications.FloatingIP(mock.Mock()).event_types)
def test_bandwidth(self):
self.assertTrue(notifications.Bandwidth().event_types)
self.assertTrue(notifications.Bandwidth(mock.Mock()).event_types)

View File

@@ -14,6 +14,7 @@
import datetime
import mock
from oslo.config import cfg
from ceilometer.openstack.common import test
@@ -102,7 +103,7 @@ class TestNotification(test.BaseTestCase):
def _test_operation(self, operation, trust=None):
notif = stack_notification_for(operation, trust)
handler = notifications.StackCRUD()
handler = notifications.StackCRUD(mock.Mock())
data = list(handler.process_notification(notif))
self.assertEqual(len(data), 1)
if trust:

View File

@@ -16,6 +16,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from ceilometer import middleware
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import test
@@ -74,7 +76,7 @@ class TestNotifications(test.BaseTestCase):
self.CONF = self.useFixture(config.Config()).conf
def test_process_request_notification(self):
sample = list(middleware.HTTPRequest().process_notification(
sample = list(middleware.HTTPRequest(mock.Mock()).process_notification(
HTTP_REQUEST
))[0]
self.assertEqual(HTTP_REQUEST['payload']['request']['HTTP_X_USER_ID'],
@@ -86,9 +88,8 @@ class TestNotifications(test.BaseTestCase):
self.assertEqual(1, sample.volume)
def test_process_response_notification(self):
sample = list(middleware.HTTPResponse().process_notification(
HTTP_RESPONSE
))[0]
sample = list(middleware.HTTPResponse(
mock.Mock()).process_notification(HTTP_RESPONSE))[0]
self.assertEqual(HTTP_RESPONSE['payload']['request']['HTTP_X_USER_ID'],
sample.user_id)
self.assertEqual(HTTP_RESPONSE['payload']['request']
@@ -98,5 +99,5 @@ class TestNotifications(test.BaseTestCase):
self.assertEqual(1, sample.volume)
def test_targets(self):
targets = middleware.HTTPRequest().get_targets(self.CONF)
targets = middleware.HTTPRequest(mock.Mock()).get_targets(self.CONF)
self.assertEqual(4, len(targets))

View File

@@ -26,7 +26,6 @@ from ceilometer.compute.notifications import instance
from ceilometer import messaging
from ceilometer import notification
from ceilometer.openstack.common.fixture import config
from ceilometer.storage import models
from ceilometer.tests import base as tests_base
TEST_NOTICE_CTXT = {
@@ -89,79 +88,52 @@ class TestNotification(tests_base.BaseTestCase):
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.CONF.set_override("connection", "log://", group='database')
self.CONF.set_override("store_events", False, group="notification")
self.srv = notification.NotificationService()
def _make_test_manager(self, plugin):
def fake_get_notifications_manager(self, pm):
self.plugin = instance.Instance(pm)
return extension.ExtensionManager.make_test_instance(
[
extension.Extension('test',
None,
None,
plugin),
self.plugin)
]
)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
@mock.patch('ceilometer.event.converter.setup_events', mock.MagicMock())
@mock.patch.object(oslo.messaging.MessageHandlingServer, 'start',
mock.MagicMock())
@mock.patch('ceilometer.event.endpoint.EventsNotificationEndpoint')
def _do_process_notification_manager_start(self,
fake_event_endpoint_class):
with mock.patch.object(self.srv, '_get_notifications_manager') \
as get_nm:
get_nm.side_effect = self.fake_get_notifications_manager
self.srv.start()
self.fake_event_endpoint = fake_event_endpoint_class.return_value
def test_process_notification(self):
# If we try to create a real RPC connection, init_host() never
# returns. Mock it out so we can establish the service
# configuration.
self.CONF.set_override("store_events", False, group="notification")
self.srv.start()
self._do_process_notification_manager_start()
self.srv.pipeline_manager.pipelines[0] = mock.MagicMock()
self.srv.notification_manager = self._make_test_manager(
instance.Instance()
)
self.srv.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise',
'compute.instance.create.end',
TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA)
self.plugin.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise',
'compute.instance.create.end',
TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA)
self.assertTrue(
self.srv.pipeline_manager.publisher.called)
self.assertEqual(1, len(self.srv.listener.dispatcher.endpoints))
self.assertTrue(self.srv.pipeline_manager.publisher.called)
def test_process_notification_no_events(self):
self.CONF.set_override("store_events", False, group="notification")
self.srv.notification_manager = mock.MagicMock()
with mock.patch.object(self.srv,
'_message_to_event') as fake_msg_to_event:
self.srv.process_notification({})
self.assertFalse(fake_msg_to_event.called)
self._do_process_notification_manager_start()
self.assertEqual(1, len(self.srv.listener.dispatcher.endpoints))
self.assertNotEqual(self.fake_event_endpoint,
self.srv.listener.dispatcher.endpoints[0])
def test_process_notification_with_events(self):
self.CONF.set_override("store_events", True, group="notification")
self.srv.notification_manager = mock.MagicMock()
with mock.patch.object(self.srv,
'_message_to_event') as fake_msg_to_event:
self.srv.process_notification({})
self.assertTrue(fake_msg_to_event.called)
def test_message_to_event_duplicate(self):
self.CONF.set_override("store_events", True, group="notification")
mock_dispatcher = mock.MagicMock()
self.srv.event_converter = mock.MagicMock()
self.srv.event_converter.to_event.return_value = mock.MagicMock(
event_type='test.test')
self.srv.dispatcher_manager = self._make_test_manager(mock_dispatcher)
mock_dispatcher.record_events.return_value = [
(models.Event.DUPLICATE, object())]
message = {'event_type': "foo", 'message_id': "abc"}
self.srv._message_to_event(message) # Should return silently.
def test_message_to_event_bad_event(self):
self.CONF.set_override("store_events", True, group="notification")
self.CONF.set_override("ack_on_event_error", False,
group="notification")
mock_dispatcher = mock.MagicMock()
self.srv.event_converter = mock.MagicMock()
self.srv.event_converter.to_event.return_value = mock.MagicMock(
event_type='test.test')
self.srv.dispatcher_manager = self._make_test_manager(mock_dispatcher)
mock_dispatcher.record_events.return_value = [
(models.Event.UNKNOWN_PROBLEM, object())]
message = {'event_type': "foo", 'message_id': "abc"}
ret = self.srv._message_to_event(message)
self.assertEqual(oslo.messaging.NotificationResult.REQUEUE, ret)
self._do_process_notification_manager_start()
self.assertEqual(2, len(self.srv.listener.dispatcher.endpoints))
self.assertEqual(self.fake_event_endpoint,
self.srv.listener.dispatcher.endpoints[0])

View File

@@ -16,6 +16,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import test
from ceilometer import plugin
@@ -110,14 +112,26 @@ class NotificationBaseTestCase(test.BaseTestCase):
class FakeNetworkPlugin(FakePlugin):
event_types = ['network.*']
def test_to_samples(self):
c = self.FakeComputePlugin()
n = self.FakeNetworkPlugin()
self.assertTrue(len(list(c.to_samples(TEST_NOTIFICATION))) > 0)
self.assertEqual(0, len(list(n.to_samples(TEST_NOTIFICATION))))
def _do_test_to_samples(self, plugin_class, match):
pm = mock.MagicMock()
plug = plugin_class(pm)
publish = pm.publisher.return_value.__enter__.return_value
plug.to_samples_and_publish(mock.Mock(), TEST_NOTIFICATION)
if match:
publish.assert_called_once_with(list(TEST_NOTIFICATION))
else:
self.assertEqual(0, publish.call_count)
def test_to_samples_match(self):
self._do_test_to_samples(self.FakeComputePlugin, True)
def test_to_samples_no_match(self):
self._do_test_to_samples(self.FakeNetworkPlugin, False)
def test_get_targets_compat(self):
targets = self.FakeComputePlugin().get_targets(self.CONF)
targets = self.FakeComputePlugin(mock.Mock()).get_targets(self.CONF)
self.assertEqual(3, len(targets))
self.assertEqual('t1', targets[0].topic)
self.assertEqual('exchange1', targets[0].exchange)

View File

@@ -10,6 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from ceilometer.openstack.common import test
from ceilometer.volume import notifications
@@ -111,7 +113,7 @@ class TestNotifications(test.BaseTestCase):
self.assertEqual(notification['publisher_id'], metadata.get('host'))
def test_volume_exists(self):
v = notifications.Volume()
v = notifications.Volume(mock.Mock())
samples = list(v.process_notification(NOTIFICATION_VOLUME_EXISTS))
self.assertEqual(1, len(samples))
s = samples[0]
@@ -119,7 +121,7 @@ class TestNotifications(test.BaseTestCase):
self.assertEqual(1, s.volume)
def test_volume_size_exists(self):
v = notifications.VolumeSize()
v = notifications.VolumeSize(mock.Mock())
samples = list(v.process_notification(NOTIFICATION_VOLUME_EXISTS))
self.assertEqual(1, len(samples))
s = samples[0]
@@ -129,7 +131,7 @@ class TestNotifications(test.BaseTestCase):
s.volume)
def test_volume_delete(self):
v = notifications.Volume()
v = notifications.Volume(mock.Mock())
samples = list(v.process_notification(NOTIFICATION_VOLUME_DELETE))
self.assertEqual(1, len(samples))
s = samples[0]
@@ -137,7 +139,7 @@ class TestNotifications(test.BaseTestCase):
self.assertEqual(1, s.volume)
def test_volume_size_delete(self):
v = notifications.VolumeSize()
v = notifications.VolumeSize(mock.Mock())
samples = list(v.process_notification(NOTIFICATION_VOLUME_DELETE))
self.assertEqual(1, len(samples))
s = samples[0]
@@ -147,7 +149,7 @@ class TestNotifications(test.BaseTestCase):
s.volume)
def test_volume_resize(self):
v = notifications.Volume()
v = notifications.Volume(mock.Mock())
samples = list(v.process_notification(NOTIFICATION_VOLUME_RESIZE))
self.assertEqual(1, len(samples))
s = samples[0]
@@ -155,7 +157,7 @@ class TestNotifications(test.BaseTestCase):
self.assertEqual(1, s.volume)
def test_volume_size_resize(self):
v = notifications.VolumeSize()
v = notifications.VolumeSize(mock.Mock())
samples = list(v.process_notification(NOTIFICATION_VOLUME_RESIZE))
self.assertEqual(1, len(samples))
s = samples[0]