# -*- encoding: utf-8 -*-
#
# Copyright © 2012-2013 eNovance
#
# Author: Julien Danjou
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from oslo.config import cfg
import oslo.messaging
from stevedore import extension

from ceilometer.event import converter as event_converter
from ceilometer import messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _  # noqa
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
from ceilometer import pipeline
from ceilometer import service
from ceilometer.storage import models

LOG = log.getLogger(__name__)

# Options registered under the [notification] group; both were previously
# read from the [collector] group (see deprecated_group).
OPTS = [
    cfg.BoolOpt('ack_on_event_error',
                default=True,
                deprecated_group='collector',
                help='Acknowledge message when event persistence fails.'),
    cfg.BoolOpt('store_events',
                deprecated_group='collector',
                default=False,
                help='Save event details.'),
]

cfg.CONF.register_opts(OPTS, group="notification")


class NotificationService(service.DispatchedService, os_service.Service):
    """Service that listens for notifications and feeds them to plugins.

    Every plugin found in NOTIFICATION_NAMESPACE is asked to turn each
    incoming notification into samples, which are published through the
    pipeline manager. When the ``store_events`` option is enabled, the
    notification is additionally converted to an event and handed to the
    dispatchers for recording.
    """

    # stevedore namespace the notification handler plugins are loaded from.
    NOTIFICATION_NAMESPACE = 'ceilometer.notification'

    def start(self):
        """Load plugins, build the notification listener and start it."""
        super(NotificationService, self).start()
        self.pipeline_manager = pipeline.setup_pipeline()

        self.notification_manager = extension.ExtensionManager(
            namespace=self.NOTIFICATION_NAMESPACE,
            invoke_on_load=True,
        )

        if not list(self.notification_manager):
            LOG.warning(_('Failed to load any notification handlers for %s'),
                        self.NOTIFICATION_NAMESPACE)

        ack_on_error = cfg.CONF.notification.ack_on_event_error

        # Collect the targets of every loaded handler so that a single
        # listener covers all of them.
        targets = []
        for ext in self.notification_manager:
            handler = ext.obj
            # Arguments are passed to the logger instead of being formatted
            # eagerly, like every other log call in this module.
            LOG.debug(_('Event types from %(name)s: %(type)s'
                        ' (ack_on_error=%(error)s)'),
                      {'name': ext.name,
                       'type': ', '.join(handler.event_types),
                       'error': ack_on_error})
            targets.extend(handler.get_targets(cfg.CONF))

        # This service object itself is handed to the listener; the listener
        # presumably calls back into info() below for each notification.
        self.listener = messaging.get_notification_listener(targets, self)

        LOG.debug(_('Loading event definitions'))
        self.event_converter = event_converter.setup_events(
            extension.ExtensionManager(
                namespace='ceilometer.event.trait_plugin'))

        self.listener.start()
        # Add a dummy thread to have wait() working
        # (604800 == 7 * 24 * 3600; the callback does nothing).
        self.tg.add_timer(604800, lambda: None)

    def stop(self):
        """Stop the notification listener, then the underlying service."""
        self.listener.stop()
        super(NotificationService, self).stop()

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Handle a notification delivered at the 'info' priority.

        The arguments are repacked into the old single-dict notification
        format and passed to process_notification().

        :returns: the value returned by process_notification(), i.e. an
                  oslo.messaging.NotificationResult. Returning it (rather
                  than dropping it) is what lets a REQUEUE decision made in
                  _message_to_event() reach the caller.
        """
        notification = messaging.convert_to_old_notification_format(
            'info', ctxt, publisher_id, event_type, payload, metadata)
        return self.process_notification(notification)

    def process_notification(self, notification):
        """Run one notification through every plugin and, optionally, storage.

        Each loaded notification plugin is invoked via
        _process_notification_for_ext(). If the ``store_events`` option is
        set, the notification is also converted to an event and recorded.

        :param notification: notification in the old dict format; only its
                             'event_type' key is read here (for logging).
        :returns: the result of _message_to_event() when ``store_events`` is
                  enabled, otherwise NotificationResult.HANDLED.
        """
        LOG.debug(_('notification %r'), notification.get('event_type'))
        self.notification_manager.map(self._process_notification_for_ext,
                                      notification=notification)

        if cfg.CONF.notification.store_events:
            return self._message_to_event(notification)
        return oslo.messaging.NotificationResult.HANDLED

    def _message_to_event(self, body):
        """Convert message to Ceilometer Event.

        NOTE: the rpc layer currently rips out the notification
        delivery_info, which is critical to determining the
        source of the notification. This will have to get added back later.

        :param body: notification dict handed to the event converter.
        :returns: NotificationResult.REQUEUE when a dispatcher reported
                  models.Event.UNKNOWN_PROBLEM and the ``ack_on_event_error``
                  option is disabled; NotificationResult.HANDLED otherwise
                  (including when the converter yields no event).
        """
        event = self.event_converter.to_event(body)

        if event is not None:
            LOG.debug(_('Saving event "%s"'), event.event_type)
            problem_events = []
            for dispatcher in self.dispatcher_manager:
                problem_events.extend(dispatcher.obj.record_events(event))

            # The first item of each reported problem is compared against
            # UNKNOWN_PROBLEM; any other problem code is acknowledged.
            unknown_problem = any(
                problem[0] == models.Event.UNKNOWN_PROBLEM
                for problem in problem_events)
            if (unknown_problem and
                    not cfg.CONF.notification.ack_on_event_error):
                return oslo.messaging.NotificationResult.REQUEUE

        return oslo.messaging.NotificationResult.HANDLED

    def _process_notification_for_ext(self, ext, notification):
        """Wrapper for calling pipelines when a notification arrives

        When a message is received by process_notification(), it calls
        this method with each notification plugin to allow all the
        plugins process the notification.

        :param ext: loaded extension whose ``obj.to_samples()`` converts the
                    notification into samples.
        :param notification: the notification being processed.
        """
        with self.pipeline_manager.publisher(context.get_admin_context()) as p:
            # FIXME(dhellmann): Spawn green thread?
            p(list(ext.obj.to_samples(notification)))