As the events API gets fleshed out and more robust, it will become increasingly necessary to ensure that the database schema modeling events has the ability to quickly retrieve a list of event types. With the current schema having a foreign key from event to unique_name (the monolithic generic string key lookup table), in order to get a simple list of event types, one needs to do the following: SELECT DISTINCT e.unique_name FROM event; Not only is this inefficient, but the readability of the model and database schema suffers from the generalization of the unique name field relationship. With this patch, a new event_type table is added to the schema to allow for quick and easy lookups of event types like so: SELECT id, desc FROM event_type; This will lead to future ability to categorize event types with ease. Instead of adding columns to the monolithic unique_name table, we would only need to add a foreign key relationship to the new event_type table tying the event_type to a future event_type_class table... This patch also renames "event_name" to "event_type" everywhere to avoid confusion. Change-Id: I6e630ec534f16ba1bb9370d1859ae7640fc6b05b Partial-Bug: 1211015
		
			
				
	
	
		
			300 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			300 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- encoding: utf-8 -*-
 | 
						|
#
 | 
						|
# Copyright © 2012-2013 eNovance <licensing@enovance.com>
 | 
						|
#
 | 
						|
# Author: Julien Danjou <julien@danjou.info>
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
						|
# not use this file except in compliance with the License. You may obtain
 | 
						|
# a copy of the License at
 | 
						|
#
 | 
						|
#      http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
						|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
						|
# License for the specific language governing permissions and limitations
 | 
						|
# under the License.
 | 
						|
 | 
						|
import socket
 | 
						|
 | 
						|
import msgpack
 | 
						|
from oslo.config import cfg
 | 
						|
from stevedore import extension
 | 
						|
from stevedore import named
 | 
						|
 | 
						|
from ceilometer.openstack.common import context
 | 
						|
from ceilometer.openstack.common.gettextutils import _  # noqa
 | 
						|
from ceilometer.openstack.common import log
 | 
						|
from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher
 | 
						|
from ceilometer.openstack.common.rpc import service as rpc_service
 | 
						|
from ceilometer.openstack.common import service as os_service
 | 
						|
from ceilometer.openstack.common import timeutils
 | 
						|
from ceilometer import pipeline
 | 
						|
from ceilometer import service
 | 
						|
from ceilometer.storage import models
 | 
						|
from ceilometer import transformer
 | 
						|
 | 
						|
OPTS = [
 | 
						|
    cfg.StrOpt('udp_address',
 | 
						|
               default='0.0.0.0',
 | 
						|
               help='address to bind the UDP socket to'
 | 
						|
               'disabled if set to an empty string'),
 | 
						|
    cfg.IntOpt('udp_port',
 | 
						|
               default=4952,
 | 
						|
               help='port to bind the UDP socket to'),
 | 
						|
    cfg.BoolOpt('ack_on_event_error',
 | 
						|
                default=True,
 | 
						|
                help='Acknowledge message when event persistence fails'),
 | 
						|
    cfg.BoolOpt('store_events',
 | 
						|
                default=False,
 | 
						|
                help='Save event details'),
 | 
						|
    cfg.MultiStrOpt('dispatcher',
 | 
						|
                    default=['database'],
 | 
						|
                    help='dispatcher to process metering data'),
 | 
						|
]
 | 
						|
 | 
						|
cfg.CONF.register_opts(OPTS, group="collector")
 | 
						|
 | 
						|
LOG = log.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
class CollectorBase(object):
 | 
						|
 | 
						|
    DISPATCHER_NAMESPACE = 'ceilometer.dispatcher'
 | 
						|
 | 
						|
    def __init__(self, *args, **kwargs):
 | 
						|
        super(CollectorBase, self).__init__(*args, **kwargs)
 | 
						|
        LOG.debug('loading dispatchers from %s',
 | 
						|
                  self.DISPATCHER_NAMESPACE)
 | 
						|
        self.dispatcher_manager = named.NamedExtensionManager(
 | 
						|
            namespace=self.DISPATCHER_NAMESPACE,
 | 
						|
            names=cfg.CONF.collector.dispatcher,
 | 
						|
            invoke_on_load=True,
 | 
						|
            invoke_args=[cfg.CONF])
 | 
						|
        if not list(self.dispatcher_manager):
 | 
						|
            LOG.warning('Failed to load any dispatchers for %s',
 | 
						|
                        self.DISPATCHER_NAMESPACE)
 | 
						|
 | 
						|
 | 
						|
class UDPCollectorService(CollectorBase, os_service.Service):
 | 
						|
    """UDP listener for the collector service."""
 | 
						|
 | 
						|
    def start(self):
 | 
						|
        """Bind the UDP socket and handle incoming data."""
 | 
						|
        super(UDPCollectorService, self).start()
 | 
						|
 | 
						|
        udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 | 
						|
        udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 | 
						|
        udp.bind((cfg.CONF.collector.udp_address,
 | 
						|
                  cfg.CONF.collector.udp_port))
 | 
						|
 | 
						|
        self.running = True
 | 
						|
        while self.running:
 | 
						|
            # NOTE(jd) Arbitrary limit of 64K because that ought to be
 | 
						|
            # enough for anybody.
 | 
						|
            data, source = udp.recvfrom(64 * 1024)
 | 
						|
            try:
 | 
						|
                sample = msgpack.loads(data)
 | 
						|
            except Exception:
 | 
						|
                LOG.warn(_("UDP: Cannot decode data sent by %s"), str(source))
 | 
						|
            else:
 | 
						|
                try:
 | 
						|
                    sample['counter_name'] = sample['name']
 | 
						|
                    sample['counter_volume'] = sample['volume']
 | 
						|
                    sample['counter_unit'] = sample['unit']
 | 
						|
                    sample['counter_type'] = sample['type']
 | 
						|
                    LOG.debug("UDP: Storing %s", str(sample))
 | 
						|
                    self.dispatcher_manager.map(
 | 
						|
                        lambda ext, data: ext.obj.record_metering_data(data),
 | 
						|
                        sample)
 | 
						|
                except Exception:
 | 
						|
                    LOG.exception(_("UDP: Unable to store meter"))
 | 
						|
 | 
						|
    def stop(self):
 | 
						|
        self.running = False
 | 
						|
        super(UDPCollectorService, self).stop()
 | 
						|
 | 
						|
 | 
						|
def udp_collector():
 | 
						|
    service.prepare_service()
 | 
						|
    os_service.launch(UDPCollectorService()).wait()
 | 
						|
 | 
						|
 | 
						|
class UnableToSaveEventException(Exception):
 | 
						|
    """Thrown when we want to requeue an event.
 | 
						|
 | 
						|
    Any exception is fine, but this one should make debugging
 | 
						|
    a little easier.
 | 
						|
    """
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
class CollectorService(CollectorBase, rpc_service.Service):
 | 
						|
 | 
						|
    COLLECTOR_NAMESPACE = 'ceilometer.collector'
 | 
						|
 | 
						|
    def start(self):
 | 
						|
        super(CollectorService, self).start()
 | 
						|
        # Add a dummy thread to have wait() working
 | 
						|
        self.tg.add_timer(604800, lambda: None)
 | 
						|
 | 
						|
    def initialize_service_hook(self, service):
 | 
						|
        '''Consumers must be declared before consume_thread start.'''
 | 
						|
        LOG.debug('initialize_service_hooks')
 | 
						|
        self.pipeline_manager = pipeline.setup_pipeline(
 | 
						|
            transformer.TransformerExtensionManager(
 | 
						|
                'ceilometer.transformer',
 | 
						|
            ),
 | 
						|
        )
 | 
						|
 | 
						|
        LOG.debug('loading notification handlers from %s',
 | 
						|
                  self.COLLECTOR_NAMESPACE)
 | 
						|
        self.notification_manager = \
 | 
						|
            extension.ExtensionManager(
 | 
						|
                namespace=self.COLLECTOR_NAMESPACE,
 | 
						|
                invoke_on_load=True,
 | 
						|
            )
 | 
						|
 | 
						|
        if not list(self.notification_manager):
 | 
						|
            LOG.warning('Failed to load any notification handlers for %s',
 | 
						|
                        self.COLLECTOR_NAMESPACE)
 | 
						|
        self.notification_manager.map(self._setup_subscription)
 | 
						|
 | 
						|
        # Set ourselves up as a separate worker for the metering data,
 | 
						|
        # since the default for service is to use create_consumer().
 | 
						|
        self.conn.create_worker(
 | 
						|
            cfg.CONF.publisher_rpc.metering_topic,
 | 
						|
            rpc_dispatcher.RpcDispatcher([self]),
 | 
						|
            'ceilometer.collector.' + cfg.CONF.publisher_rpc.metering_topic,
 | 
						|
        )
 | 
						|
 | 
						|
    def _setup_subscription(self, ext, *args, **kwds):
 | 
						|
        """Connect to message bus to get notifications
 | 
						|
 | 
						|
        Configure the RPC connection to listen for messages on the
 | 
						|
        right exchanges and topics so we receive all of the
 | 
						|
        notifications.
 | 
						|
 | 
						|
        Use a connection pool so that multiple collector instances can
 | 
						|
        run in parallel to share load and without competing with each
 | 
						|
        other for incoming messages.
 | 
						|
        """
 | 
						|
        handler = ext.obj
 | 
						|
        ack_on_error = cfg.CONF.collector.ack_on_event_error
 | 
						|
        LOG.debug('Event types from %s: %s (ack_on_error=%s)',
 | 
						|
                  ext.name, ', '.join(handler.event_types),
 | 
						|
                  ack_on_error)
 | 
						|
 | 
						|
        for exchange_topic in handler.get_exchange_topics(cfg.CONF):
 | 
						|
            for topic in exchange_topic.topics:
 | 
						|
                try:
 | 
						|
                    self.conn.join_consumer_pool(
 | 
						|
                        callback=self.process_notification,
 | 
						|
                        pool_name=topic,
 | 
						|
                        topic=topic,
 | 
						|
                        exchange_name=exchange_topic.exchange,
 | 
						|
                        ack_on_error=ack_on_error)
 | 
						|
                except Exception:
 | 
						|
                    LOG.exception('Could not join consumer pool %s/%s' %
 | 
						|
                                  (topic, exchange_topic.exchange))
 | 
						|
 | 
						|
    def record_metering_data(self, context, data):
 | 
						|
        """RPC endpoint for messages we send to ourself
 | 
						|
 | 
						|
        When the notification messages are re-published through the
 | 
						|
        RPC publisher, this method receives them for processing.
 | 
						|
        """
 | 
						|
        self.dispatcher_manager.map(self._record_metering_data_for_ext,
 | 
						|
                                    context=context,
 | 
						|
                                    data=data)
 | 
						|
 | 
						|
    def process_notification(self, notification):
 | 
						|
        """RPC endpoint for notification messages
 | 
						|
 | 
						|
        When another service sends a notification over the message
 | 
						|
        bus, this method receives it. See _setup_subscription().
 | 
						|
 | 
						|
        """
 | 
						|
        LOG.debug('notification %r', notification.get('event_type'))
 | 
						|
        self.notification_manager.map(self._process_notification_for_ext,
 | 
						|
                                      notification=notification)
 | 
						|
 | 
						|
        if cfg.CONF.collector.store_events:
 | 
						|
            self._message_to_event(notification)
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def _extract_when(body):
 | 
						|
        """Extract the generated datetime from the notification.
 | 
						|
        """
 | 
						|
        when = body.get('timestamp', body.get('_context_timestamp'))
 | 
						|
        if when:
 | 
						|
            return timeutils.normalize_time(timeutils.parse_isotime(when))
 | 
						|
 | 
						|
        return timeutils.utcnow()
 | 
						|
 | 
						|
    def _message_to_event(self, body):
 | 
						|
        """Convert message to Ceilometer Event.
 | 
						|
 | 
						|
        NOTE: this is currently based on the Nova notification format.
 | 
						|
        We will need to make this driver-based to support other formats.
 | 
						|
 | 
						|
        NOTE: the rpc layer currently rips out the notification
 | 
						|
        delivery_info, which is critical to determining the
 | 
						|
        source of the notification. This will have to get added back later.
 | 
						|
        """
 | 
						|
        message_id = body.get('message_id')
 | 
						|
        event_type = body['event_type']
 | 
						|
        when = self._extract_when(body)
 | 
						|
        LOG.debug('Saving event "%s"', event_type)
 | 
						|
 | 
						|
        publisher = body.get('publisher_id')
 | 
						|
        request_id = body.get('_context_request_id')
 | 
						|
        tenant_id = body.get('_context_tenant')
 | 
						|
 | 
						|
        text = models.Trait.TEXT_TYPE
 | 
						|
        all_traits = [models.Trait('service', text, publisher),
 | 
						|
                      models.Trait('request_id', text, request_id),
 | 
						|
                      models.Trait('tenant_id', text, tenant_id),
 | 
						|
                      ]
 | 
						|
        # Only store non-None value traits ...
 | 
						|
        traits = [trait for trait in all_traits if trait.value is not None]
 | 
						|
 | 
						|
        event = models.Event(message_id, event_type, when, traits)
 | 
						|
 | 
						|
        problem_events = []
 | 
						|
        for dispatcher in self.dispatcher_manager:
 | 
						|
            problem_events.extend(dispatcher.obj.record_events(event))
 | 
						|
        if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]:
 | 
						|
            # Don't ack the message, raise to requeue it
 | 
						|
            # if ack_on_error = False
 | 
						|
            raise UnableToSaveEventException()
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def _record_metering_data_for_ext(ext, context, data):
 | 
						|
        """Wrapper for calling dispatcher plugin when a sample arrives
 | 
						|
 | 
						|
        When a message is received by record_metering_data(), it calls
 | 
						|
        this method with each plugin to allow it to process the data.
 | 
						|
 | 
						|
        """
 | 
						|
        ext.obj.record_metering_data(context, data)
 | 
						|
 | 
						|
    def _process_notification_for_ext(self, ext, notification):
 | 
						|
        """Wrapper for calling pipelines when a notification arrives
 | 
						|
 | 
						|
        When a message is received by process_notification(), it calls
 | 
						|
        this method with each notification plugin to allow all the
 | 
						|
        plugins process the notification.
 | 
						|
 | 
						|
        """
 | 
						|
        with self.pipeline_manager.publisher(context.get_admin_context()) as p:
 | 
						|
            # FIXME(dhellmann): Spawn green thread?
 | 
						|
            p(list(ext.obj.to_samples(notification)))
 | 
						|
 | 
						|
 | 
						|
def collector():
 | 
						|
    service.prepare_service()
 | 
						|
    os_service.launch(CollectorService(cfg.CONF.host,
 | 
						|
                                       'ceilometer.collector')).wait()
 |