diff --git a/ceilometer/event/endpoint.py b/ceilometer/event/endpoint.py index fadc8ee2..65cc7f6a 100644 --- a/ceilometer/event/endpoint.py +++ b/ceilometer/event/endpoint.py @@ -17,26 +17,29 @@ import logging import oslo.messaging from oslo_config import cfg +from oslo_context import context from stevedore import extension -import ceilometer -from ceilometer import dispatcher from ceilometer.event import converter as event_converter -from ceilometer.event.storage import models from ceilometer.i18n import _ from ceilometer import messaging +from ceilometer.publisher import utils LOG = logging.getLogger(__name__) class EventsNotificationEndpoint(object): - def __init__(self): + def __init__(self, transporter): super(EventsNotificationEndpoint, self).__init__() - self.dispatcher_manager = dispatcher.load_dispatcher_manager() LOG.debug(_('Loading event definitions')) + self.ctxt = context.get_admin_context() self.event_converter = event_converter.setup_events( extension.ExtensionManager( namespace='ceilometer.event.trait_plugin')) + self.transporter = transporter + # NOTE(gordc): if no publisher, this isn't a PipelineManager and + # data should be requeued. + self.requeue = not hasattr(transporter, 'publisher') def info(self, ctxt, publisher_id, event_type, payload, metadata): """Convert message to Ceilometer Event. @@ -56,19 +59,21 @@ class EventsNotificationEndpoint(object): self.process_notification(notification) def process_notification(self, notification): - event = self.event_converter.to_event(notification) - - if event is not None: - LOG.debug(_('Saving event "%s"'), event.event_type) - problem_events = [] - for dispatcher_ext in self.dispatcher_manager: - try: - problem_events.extend( - dispatcher_ext.obj.record_events(event)) - except ceilometer.NotImplementedError: - LOG.warn(_('Event is not implemented with the storage' - ' backend')) - if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]: - if not cfg.CONF.notification.ack_on_event_error: - return oslo.messaging.NotificationResult.REQUEUE + try: + event = self.event_converter.to_event(notification) + if event is not None: + if self.requeue: + for notifier in self.transporter: + notifier.sample( + self.ctxt.to_dict(), + event_type='pipeline.event', + payload=[utils.message_from_event( + event, cfg.CONF.publisher.metering_secret)]) + else: + with self.transporter.publisher(self.ctxt) as p: + p(event) + except Exception: + if not cfg.CONF.notification.ack_on_event_error: + return oslo.messaging.NotificationResult.REQUEUE + raise return oslo.messaging.NotificationResult.HANDLED diff --git a/ceilometer/event/storage/models.py b/ceilometer/event/storage/models.py index d31437d9..480643e6 100644 --- a/ceilometer/event/storage/models.py +++ b/ceilometer/event/storage/models.py @@ -17,6 +17,11 @@ from oslo_utils import timeutils from ceilometer.storage import base +def serialize_dt(value): + """Serializes parameter if it is datetime.""" + return value.isoformat() if hasattr(value, 'isoformat') else value + + class Event(base.Model): """A raw event from the source system. Events have Traits. @@ -51,6 +56,12 @@ class Event(base.Model): (self.message_id, self.event_type, self.generated, " ".join(trait_list))) + def serialize(self): + return {'message_id': self.message_id, + 'event_type': self.event_type, + 'generated': serialize_dt(self.generated), + 'traits': [trait.serialize() for trait in self.traits]} + class Trait(base.Model): """A Trait is a key/value pair of data on an Event. @@ -80,6 +91,9 @@ class Trait(base.Model): def __repr__(self): return "" % (self.name, self.dtype, self.value) + def serialize(self): + return self.name, self.dtype, serialize_dt(self.value) + def get_type_name(self): return self.get_name_by_type(self.dtype) diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 307e5dd0..09fa8382 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -77,22 +77,33 @@ class NotificationService(os_service.Service): invoke_args=(transporter, ) ) + def _get_notifier(self, transport, pipe): + return oslo.messaging.Notifier( + transport, + driver=cfg.CONF.publisher_notifier.metering_driver, + publisher_id='ceilometer.notification', + topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name)) + def start(self): super(NotificationService, self).start() self.pipeline_manager = pipeline.setup_pipeline() + if cfg.CONF.notification.store_events: + self.event_pipeline_manager = pipeline.setup_event_pipeline() transport = messaging.get_transport() self.partition_coordinator = coordination.PartitionCoordinator() self.partition_coordinator.start() + event_transporter = None if cfg.CONF.notification.workload_partitioning: transporter = [] for pipe in self.pipeline_manager.pipelines: - transporter.append(oslo.messaging.Notifier( - transport, - driver=cfg.CONF.publisher_notifier.metering_driver, - publisher_id='ceilometer.notification', - topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))) + transporter.append(self._get_notifier(transport, pipe)) + if cfg.CONF.notification.store_events: + event_transporter = [] + for pipe in self.event_pipeline_manager.pipelines: + event_transporter.append(self._get_notifier(transport, + pipe)) self.ctxt = context.get_admin_context() self.group_id = self.NOTIFICATION_NAMESPACE @@ -105,10 +116,12 @@ class NotificationService(os_service.Service): # beeen registered by oslo.messaging messaging.get_notifier(transport, '') transporter = self.pipeline_manager + if cfg.CONF.notification.store_events: + event_transporter = self.event_pipeline_manager self.group_id = None self.listeners = self.pipeline_listeners = [] - self._configure_main_queue_listeners(transporter) + self._configure_main_queue_listeners(transporter, event_transporter) if cfg.CONF.notification.workload_partitioning: self.partition_coordinator.join_group(self.group_id) @@ -124,10 +137,9 @@ class NotificationService(os_service.Service): # Add a dummy thread to have wait() working self.tg.add_timer(604800, lambda: None) - def _configure_main_queue_listeners(self, transporter): - self.notification_manager = self._get_notifications_manager( - transporter) - if not list(self.notification_manager): + def _configure_main_queue_listeners(self, transporter, event_transporter): + notification_manager = self._get_notifications_manager(transporter) + if not list(notification_manager): LOG.warning(_('Failed to load any notification handlers for %s'), self.NOTIFICATION_NAMESPACE) @@ -135,10 +147,11 @@ class NotificationService(os_service.Service): endpoints = [] if cfg.CONF.notification.store_events: - endpoints = [event_endpoint.EventsNotificationEndpoint()] + endpoints.append( + event_endpoint.EventsNotificationEndpoint(event_transporter)) targets = [] - for ext in self.notification_manager: + for ext in notification_manager: handler = ext.obj LOG.debug(_('Event types from %(name)s: %(type)s' ' (ack_on_error=%(error)s)') % @@ -176,16 +189,22 @@ class NotificationService(os_service.Service): def _configure_pipeline_listeners(self): self.pipeline_listeners = [] + ev_pipes = [] + if cfg.CONF.notification.store_events: + ev_pipes = self.event_pipeline_manager.pipelines partitioned = self.partition_coordinator.extract_my_subset( - self.group_id, self.pipeline_manager.pipelines) + self.group_id, self.pipeline_manager.pipelines + ev_pipes) transport = messaging.get_transport() for pipe in partitioned: LOG.debug(_('Pipeline endpoint: %s'), pipe.name) + pipe_endpoint = (pipeline.EventPipelineEndpoint + if isinstance(pipe, pipeline.EventPipeline) else + pipeline.SamplePipelineEndpoint) listener = messaging.get_notification_listener( transport, [oslo.messaging.Target( topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))], - [pipeline.PipelineEndpoint(self.ctxt, pipe)]) + [pipe_endpoint(self.ctxt, pipe)]) listener.start() self.pipeline_listeners.append(listener) diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index c83105d3..ce1a15d1 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -22,9 +22,11 @@ import fnmatch import os from oslo_config import cfg +from oslo_utils import timeutils import six import yaml +from ceilometer.event.storage import models from ceilometer.i18n import _ from ceilometer.openstack.common import log from ceilometer import publisher @@ -57,13 +59,19 @@ class PipelineException(Exception): return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg) +@six.add_metaclass(abc.ABCMeta) class PipelineEndpoint(object): def __init__(self, context, pipeline): self.publish_context = PublishContext(context, [pipeline]) + @abc.abstractmethod + def sample(self, ctxt, publisher_id, event_type, payload, metadata): + pass + + +class SamplePipelineEndpoint(PipelineEndpoint): def sample(self, ctxt, publisher_id, event_type, payload, metadata): - """RPC endpoint for pipeline messages.""" samples = [ sample_util.Sample(name=s['counter_name'], type=s['counter_type'], @@ -81,6 +89,23 @@ class PipelineEndpoint(object): p(samples) +class EventPipelineEndpoint(PipelineEndpoint): + def sample(self, ctxt, publisher_id, event_type, payload, metadata): + events = [ + models.Event( + message_id=ev['message_id'], + event_type=ev['event_type'], + generated=timeutils.normalize_time( + timeutils.parse_isotime(ev['generated'])), + traits=[models.Trait(name, dtype, + models.Trait.convert_value(dtype, value)) + for name, dtype, value in ev['traits']]) + for ev in payload + ] + with self.publish_context as p: + p(events) + + class PublishContext(object): def __init__(self, context, pipelines=None): @@ -280,7 +305,8 @@ class Sink(object): # Support old format without URL p = p + "://" try: - self.publishers.append(publisher.get_publisher(p)) + self.publishers.append(publisher.get_publisher(p, + self.NAMESPACE)) except Exception: LOG.exception(_("Unable to load publisher %s"), p) @@ -311,6 +337,8 @@ class Sink(object): class EventSink(Sink): + NAMESPACE = 'ceilometer.event.publisher' + def publish_events(self, ctxt, events): if events: for p in self.publishers: @@ -329,6 +357,8 @@ class EventSink(Sink): class SampleSink(Sink): + NAMESPACE = 'ceilometer.publisher' + def _transform_sample(self, start, ctxt, sample): try: for transformer in self.transformers[start:]: diff --git a/ceilometer/publisher/direct.py b/ceilometer/publisher/direct.py index 16efa161..7b3d071b 100644 --- a/ceilometer/publisher/direct.py +++ b/ceilometer/publisher/direct.py @@ -13,11 +13,9 @@ # License for the specific language governing permissions and limitations # under the License. - from oslo.config import cfg from oslo.utils import timeutils -import ceilometer from ceilometer.dispatcher import database from ceilometer import publisher from ceilometer.publisher import utils @@ -33,7 +31,9 @@ class DirectPublisher(publisher.PublisherBase): def __init__(self, parsed_url): super(DirectPublisher, self).__init__(parsed_url) - self.meter_conn = database.DatabaseDispatcher(cfg.CONF).meter_conn + dispatcher = database.DatabaseDispatcher(cfg.CONF) + self.meter_conn = dispatcher.meter_conn + self.event_conn = dispatcher.event_conn def publish_samples(self, context, samples): if not isinstance(samples, list): @@ -54,4 +54,7 @@ class DirectPublisher(publisher.PublisherBase): self.meter_conn.record_metering_data(meter) def publish_events(self, context, events): - raise ceilometer.NotImplementedError + if not isinstance(events, list): + events = [events] + + self.event_conn.record_events(events) diff --git a/ceilometer/publisher/utils.py b/ceilometer/publisher/utils.py index ab9a360b..21ddbd86 100644 --- a/ceilometer/publisher/utils.py +++ b/ceilometer/publisher/utils.py @@ -118,3 +118,13 @@ def meter_message_from_counter(sample, secret): } msg['message_signature'] = compute_signature(msg, secret) return msg + + +def message_from_event(event, secret): + """Make an event message ready to be published or stored. + + Returns a serialized model of Event containing an event message + """ + msg = event.serialize() + msg['message_signature'] = compute_signature(msg, secret) + return msg diff --git a/ceilometer/tests/event/test_endpoint.py b/ceilometer/tests/event/test_endpoint.py index 8ca24881..c66f6271 100644 --- a/ceilometer/tests/event/test_endpoint.py +++ b/ceilometer/tests/event/test_endpoint.py @@ -18,11 +18,8 @@ import mock import oslo.messaging from oslo_config import cfg from oslo_config import fixture as fixture_config -from stevedore import extension -import ceilometer from ceilometer.event import endpoint as event_endpoint -from ceilometer.event.storage import models from ceilometer.tests import base as tests_base TEST_NOTICE_CTXT = { @@ -91,46 +88,21 @@ class TestEventEndpoint(tests_base.BaseTestCase): self.CONF.set_override("store_events", True, group="notification") self.setup_messaging(self.CONF) - self.mock_dispatcher = mock.MagicMock() - self.endpoint = event_endpoint.EventsNotificationEndpoint() - (self.endpoint. - dispatcher_manager) = (extension.ExtensionManager. - make_test_instance([extension. - Extension('test', None, - None, - self. - mock_dispatcher) - ])) + self.mock_pm = mock.MagicMock() + self.endpoint = event_endpoint.EventsNotificationEndpoint(self.mock_pm) self.endpoint.event_converter = mock.MagicMock() self.endpoint.event_converter.to_event.return_value = mock.MagicMock( event_type='test.test') - @mock.patch('ceilometer.event.endpoint.LOG') - def test_event_not_implemented(self, log): - re = self.mock_dispatcher.record_events - re.side_effect = ceilometer.NotImplementedError - message = {'event_type': "foo", 'message_id': "abc"} - ret = self.endpoint.process_notification(message) - log.warn.assert_called_once_with( - 'Event is not implemented with the storage backend') - self.assertEqual(oslo.messaging.NotificationResult.HANDLED, ret) - def test_message_to_event(self): self.endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise', 'compute.instance.create.end', TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA) - def test_message_to_event_duplicate(self): - self.mock_dispatcher.record_events.return_value = [ - (models.Event.DUPLICATE, object())] - message = {'event_type': "foo", 'message_id': "abc"} - self.endpoint.process_notification(message) # Should return silently. - def test_message_to_event_bad_event(self): self.CONF.set_override("ack_on_event_error", False, group="notification") - self.mock_dispatcher.record_events.return_value = [ - (models.Event.UNKNOWN_PROBLEM, object())] + self.mock_pm.publisher.side_effect = Exception message = {'event_type': "foo", 'message_id': "abc"} ret = self.endpoint.process_notification(message) self.assertEqual(oslo.messaging.NotificationResult.REQUEUE, ret) diff --git a/ceilometer/tests/publisher/test_direct.py b/ceilometer/tests/publisher/test_direct.py index 5292bf0f..e8d7cdfa 100644 --- a/ceilometer/tests/publisher/test_direct.py +++ b/ceilometer/tests/publisher/test_direct.py @@ -20,6 +20,7 @@ import uuid from oslo.utils import netutils +from ceilometer.event.storage import models as event from ceilometer.publisher import direct from ceilometer import sample from ceilometer.tests import db as tests_db @@ -80,3 +81,22 @@ class TestDirectPublisher(tests_db.TestBase, self.assertEqual(3, len(meters), 'There should be 3 samples') self.assertEqual(['alpha', 'beta', 'gamma'], names) + + +class TestEventDirectPublisher(tests_db.TestBase, + tests_db.MixinTestsWithBackendScenarios): + + test_data = [event.Event(message_id=str(uuid.uuid4()), + event_type='event_%d' % i, + generated=datetime.datetime.utcnow(), + traits=[]) + for i in range(0, 5)] + + def test_direct_publisher(self, ): + parsed_url = netutils.urlsplit('direct://') + publisher = direct.DirectPublisher(parsed_url) + publisher.publish_events(None, self.test_data) + + e_types = list(self.event_conn.get_event_types()) + self.assertEqual(5, len(e_types)) + self.assertEqual(['event_%d' % i for i in range(0, 5)], e_types) diff --git a/ceilometer/tests/test_notification.py b/ceilometer/tests/test_notification.py index c9b41ef9..42f51704 100644 --- a/ceilometer/tests/test_notification.py +++ b/ceilometer/tests/test_notification.py @@ -139,6 +139,7 @@ class TestNotification(tests_base.BaseTestCase): self.assertNotEqual(self.fake_event_endpoint, self.srv.listeners[0].dispatcher.endpoints[0]) + @mock.patch('ceilometer.pipeline.setup_event_pipeline', mock.MagicMock()) def test_process_notification_with_events(self): self.CONF.set_override("store_events", True, group="notification") self._do_process_notification_manager_start() @@ -146,21 +147,6 @@ class TestNotification(tests_base.BaseTestCase): self.assertEqual(self.fake_event_endpoint, self.srv.listeners[0].dispatcher.endpoints[0]) - @mock.patch('ceilometer.event.converter.get_config_file', - mock.MagicMock(return_value=None)) - @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) - @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start', - mock.MagicMock()) - def test_event_dispatcher_loaded(self): - self.CONF.set_override("store_events", True, group="notification") - with mock.patch.object(self.srv, - '_get_notifications_manager') as get_nm: - get_nm.side_effect = self.fake_get_notifications_manager - self.srv.start() - self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints)) - event_endpoint = self.srv.listeners[0].dispatcher.endpoints[0] - self.assertEqual(1, len(list(event_endpoint.dispatcher_manager))) - @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start', mock.MagicMock()) @@ -193,13 +179,32 @@ class BaseRealNotification(tests_base.BaseTestCase): 'transformers': [], 'publishers': ['test://'], }]) - self.expected_samples = 2 - pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline, prefix="pipeline", suffix="yaml") self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) + + self.CONF.set_override("store_events", True, group="notification") + ev_pipeline = yaml.dump({ + 'sources': [{ + 'name': 'test_event', + 'events': '*', + 'sinks': ['test_sink'] + }], + 'sinks': [{ + 'name': 'test_sink', + 'publishers': ['test://'] + }] + }) + self.expected_events = 1 + ev_pipeline_cfg_file = fileutils.write_to_tempfile( + content=ev_pipeline, prefix="event_pipeline", suffix="yaml") + self.CONF.set_override("event_pipeline_cfg_file", ev_pipeline_cfg_file) + self.CONF.set_override( + "definitions_cfg_file", + self.path_get('etc/ceilometer/event_definitions.yaml'), + group='event') self.publisher = test_publisher.TestPublisher("") def _check_notification_service(self): @@ -211,7 +216,8 @@ class BaseRealNotification(tests_base.BaseTestCase): TEST_NOTICE_PAYLOAD) start = timeutils.utcnow() while timeutils.delta_seconds(start, timeutils.utcnow()) < 600: - if len(self.publisher.samples) >= self.expected_samples: + if (len(self.publisher.samples) >= self.expected_samples and + len(self.publisher.events) >= self.expected_events): break eventlet.sleep(0) @@ -219,6 +225,7 @@ class BaseRealNotification(tests_base.BaseTestCase): resources = list(set(s.resource_id for s in self.publisher.samples)) self.assertEqual(self.expected_samples, len(self.publisher.samples)) + self.assertEqual(self.expected_events, len(self.publisher.events)) self.assertEqual(["9f9d01b9-4a58-4271-9e27-398b21ab20d1"], resources) @@ -263,7 +270,7 @@ class TestRealNotificationHA(BaseRealNotification): def test_reset_listeners_on_refresh(self): self.srv.start() - self.assertEqual(1, len(self.srv.pipeline_listeners)) + self.assertEqual(2, len(self.srv.pipeline_listeners)) self.srv._refresh_agent(None) - self.assertEqual(1, len(self.srv.pipeline_listeners)) + self.assertEqual(2, len(self.srv.pipeline_listeners)) self.srv.stop() diff --git a/setup.cfg b/setup.cfg index 95c559d5..8a8975dc 100755 --- a/setup.cfg +++ b/setup.cfg @@ -261,6 +261,10 @@ ceilometer.publisher = file = ceilometer.publisher.file:FilePublisher direct = ceilometer.publisher.direct:DirectPublisher +ceilometer.event.publisher = + test = ceilometer.publisher.test:TestPublisher + direct = ceilometer.publisher.direct:DirectPublisher + ceilometer.alarm.evaluator = threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator