From 7dc164bc5dd2068499eac6e3173454f97f09d9d5 Mon Sep 17 00:00:00 2001 From: gordon chung Date: Tue, 30 Dec 2014 16:26:44 -0500 Subject: [PATCH] add event pipeline this patch adds base framework for a new event pipeline manager to allow developers/operators ability to transform and trigger incoming notifications similar to how pipeline manager works for samples. Change-Id: I607763213d9bf0121bf4182e236335546a1222f8 Implements: blueprint notification-pipelines --- ceilometer/pipeline.py | 127 +++++++-- ceilometer/publisher/__init__.py | 6 +- ceilometer/publisher/direct.py | 4 + ceilometer/publisher/file.py | 9 + ceilometer/publisher/messaging.py | 9 + ceilometer/publisher/test.py | 10 + ceilometer/publisher/udp.py | 9 + ceilometer/tests/pipeline_base.py | 5 +- ceilometer/tests/test_event_pipeline.py | 364 ++++++++++++++++++++++++ etc/ceilometer/event_pipeline.yaml | 13 + 10 files changed, 531 insertions(+), 25 deletions(-) create mode 100644 ceilometer/tests/test_event_pipeline.py create mode 100644 etc/ceilometer/event_pipeline.yaml diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 8fc5820e..c83105d3 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -17,10 +17,12 @@ # License for the specific language governing permissions and limitations # under the License. +import abc import fnmatch import os from oslo_config import cfg +import six import yaml from ceilometer.i18n import _ @@ -35,6 +37,10 @@ OPTS = [ default="pipeline.yaml", help="Configuration file for pipeline definition." ), + cfg.StrOpt('event_pipeline_cfg_file', + default="event_pipeline.yaml", + help="Configuration file for event pipeline definition." + ), ] cfg.CONF.register_opts(OPTS) @@ -104,6 +110,7 @@ class Source(object): try: self.name = cfg['name'] + self.sinks = cfg.get('sinks') except KeyError as err: raise PipelineException( "Required field %s not specified" % err.args[0], cfg) @@ -143,6 +150,41 @@ class Source(object): 'Included %s specified with wildcard' % d_type, self.cfg) + @staticmethod + def is_supported(dataset, data_name): + # Support wildcard like storage.* and !disk.* + # Start with negation, we consider that the order is deny, allow + if any(fnmatch.fnmatch(data_name, datapoint[1:]) + for datapoint in dataset if datapoint[0] == '!'): + return False + + if any(fnmatch.fnmatch(data_name, datapoint) + for datapoint in dataset if datapoint[0] != '!'): + return True + + # if we only have negation, we suppose the default is allow + return all(datapoint.startswith('!') for datapoint in dataset) + + +class EventSource(Source): + """Represents a source of events. + + In effect it is a set of notification handlers capturing events for a set + of matching notifications. + """ + + def __init__(self, cfg): + super(EventSource, self).__init__(cfg) + try: + self.events = cfg['events'] + except KeyError as err: + raise PipelineException( + "Required field %s not specified" % err.args[0], cfg) + self.check_source_filtering(self.events, 'events') + + def support_event(self, event_name): + return self.is_supported(self.events, event_name) + class SampleSource(Source): """Represents a source of samples. @@ -162,7 +204,6 @@ class SampleSource(Source): raise PipelineException("Invalid interval value", cfg) # Support 'counters' for backward compatibility self.meters = cfg.get('meters', cfg.get('counters')) - self.sinks = cfg.get('sinks') except KeyError as err: raise PipelineException( "Required field %s not specified" % err.args[0], cfg) @@ -191,24 +232,7 @@ class SampleSource(Source): def support_meter(self, meter_name): meter_name = self._variable_meter_name(meter_name) - - # Special case: if we only have negation, we suppose the default is - # allow - default = all(meter.startswith('!') for meter in self.meters) - - # Support wildcard like storage.* and !disk.* - # Start with negation, we consider that the order is deny, allow - if any(fnmatch.fnmatch(meter_name, meter[1:]) - for meter in self.meters - if meter[0] == '!'): - return False - - if any(fnmatch.fnmatch(meter_name, meter) - for meter in self.meters - if meter[0] != '!'): - return True - - return default + return self.is_supported(self.meters, meter_name) class Sink(object): @@ -285,6 +309,24 @@ class Sink(object): return transformers +class EventSink(Sink): + + def publish_events(self, ctxt, events): + if events: + for p in self.publishers: + try: + p.publish_events(ctxt, events) + except Exception: + LOG.exception(_( + "Pipeline %(pipeline)s: Continue after error " + "from publisher %(pub)s") % ({'pipeline': self, + 'pub': p})) + + def flush(self, ctxt): + """Flush data after all events have been injected to pipeline.""" + pass + + class SampleSink(Sink): def _transform_sample(self, start, ctxt, sample): @@ -299,6 +341,7 @@ class SampleSink(Sink): return return sample except Exception as err: + # TODO(gordc): only use one log level. LOG.warning(_("Pipeline %(pipeline)s: " "Exit after error from transformer " "%(trans)s for %(smp)s") % ({'pipeline': self, @@ -359,6 +402,7 @@ class SampleSink(Sink): LOG.exception(err) +@six.add_metaclass(abc.ABCMeta) class Pipeline(object): """Represents a coupling between a sink and a corresponding source.""" @@ -378,6 +422,29 @@ class Pipeline(object): def publishers(self): return self.sink.publishers + @abc.abstractmethod + def publish_data(self, ctxt, data): + """Publish data from pipeline.""" + + +class EventPipeline(Pipeline): + """Represents a pipeline for Events.""" + + def __str__(self): + # NOTE(gordc): prepend a namespace so we ensure event and sample + # pipelines do not have the same name. + return 'event:%s' % super(EventPipeline, self).__str__() + + def support_event(self, event_type): + return self.source.support_event(event_type) + + def publish_data(self, ctxt, events): + if not isinstance(events, list): + events = [events] + supported = [e for e in events + if self.source.support_event(e.event_type)] + self.sink.publish_events(ctxt, supported) + class SamplePipeline(Pipeline): """Represents a pipeline for Samples.""" @@ -407,6 +474,10 @@ SAMPLE_TYPE = {'pipeline': SamplePipeline, 'source': SampleSource, 'sink': SampleSink} +EVENT_TYPE = {'pipeline': EventPipeline, + 'source': EventSource, + 'sink': EventSink} + class PipelineManager(object): """Pipeline Manager @@ -556,9 +627,7 @@ class PipelineManager(object): return PublishContext(context, self.pipelines) -def setup_pipeline(transformer_manager=None): - """Setup pipeline manager according to yaml config file.""" - cfg_file = cfg.CONF.pipeline_cfg_file +def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE): if not os.path.exists(cfg_file): cfg_file = cfg.CONF.find_file(cfg_file) @@ -574,4 +643,16 @@ def setup_pipeline(transformer_manager=None): transformer_manager or xformer.TransformerExtensionManager( 'ceilometer.transformer', - )) + ), p_type) + + +def setup_event_pipeline(transformer_manager=None): + """Setup event pipeline manager according to yaml config file.""" + cfg_file = cfg.CONF.event_pipeline_cfg_file + return _setup_pipeline_manager(cfg_file, transformer_manager, EVENT_TYPE) + + +def setup_pipeline(transformer_manager=None): + """Setup pipeline manager according to yaml config file.""" + cfg_file = cfg.CONF.pipeline_cfg_file + return _setup_pipeline_manager(cfg_file, transformer_manager) diff --git a/ceilometer/publisher/__init__.py b/ceilometer/publisher/__init__.py index 70578833..4cc49b0b 100644 --- a/ceilometer/publisher/__init__.py +++ b/ceilometer/publisher/__init__.py @@ -37,7 +37,7 @@ def get_publisher(url, namespace='ceilometer.publisher'): @six.add_metaclass(abc.ABCMeta) class PublisherBase(object): - """Base class for plugins that publish the sampler.""" + """Base class for plugins that publish data.""" def __init__(self, parsed_url): pass @@ -45,3 +45,7 @@ class PublisherBase(object): @abc.abstractmethod def publish_samples(self, context, samples): """Publish samples into final conduit.""" + + @abc.abstractmethod + def publish_events(self, context, events): + """Publish events into final conduit.""" diff --git a/ceilometer/publisher/direct.py b/ceilometer/publisher/direct.py index 660d7932..fb46d15c 100644 --- a/ceilometer/publisher/direct.py +++ b/ceilometer/publisher/direct.py @@ -19,6 +19,7 @@ from oslo.config import cfg from oslo.utils import timeutils +import ceilometer from ceilometer.dispatcher import database from ceilometer import publisher from ceilometer.publisher import utils @@ -53,3 +54,6 @@ class DirectPublisher(publisher.PublisherBase): ts = timeutils.parse_isotime(meter['timestamp']) meter['timestamp'] = timeutils.normalize_time(ts) self.meter_conn.record_metering_data(meter) + + def publish_events(self, context, events): + raise ceilometer.NotImplementedError diff --git a/ceilometer/publisher/file.py b/ceilometer/publisher/file.py index af0971bd..ded7762a 100644 --- a/ceilometer/publisher/file.py +++ b/ceilometer/publisher/file.py @@ -20,6 +20,7 @@ import logging.handlers from six.moves.urllib import parse as urlparse +import ceilometer from ceilometer.i18n import _ from ceilometer.openstack.common import log from ceilometer import publisher @@ -97,3 +98,11 @@ class FilePublisher(publisher.PublisherBase): if self.publisher_logger: for sample in samples: self.publisher_logger.info(sample.as_dict()) + + def publish_events(self, context, events): + """Send an event message for publishing + + :param context: Execution context from the service or RPC call + :param events: events from pipeline after transformation + """ + raise ceilometer.NotImplementedError diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py index 6cfb46da..1e4c31ab 100644 --- a/ceilometer/publisher/messaging.py +++ b/ceilometer/publisher/messaging.py @@ -26,6 +26,7 @@ from oslo_config import cfg import six import six.moves.urllib.parse as urlparse +import ceilometer from ceilometer.i18n import _ from ceilometer import messaging from ceilometer.openstack.common import log @@ -162,6 +163,14 @@ class MessagingPublisher(publisher.PublisherBase): queue.pop(0) return [] + def publish_events(self, context, events): + """Send an event message for publishing + + :param context: Execution context from the service or RPC call + :param events: events from pipeline after transformation + """ + raise ceilometer.NotImplementedError + @abc.abstractmethod def _send(self, context, topic, meters): """Send the meters to the messaging topic.""" diff --git a/ceilometer/publisher/test.py b/ceilometer/publisher/test.py index 8164ec28..97a62936 100644 --- a/ceilometer/publisher/test.py +++ b/ceilometer/publisher/test.py @@ -25,6 +25,7 @@ class TestPublisher(publisher.PublisherBase): def __init__(self, parsed_url): self.samples = [] + self.events = [] self.calls = 0 def publish_samples(self, context, samples): @@ -35,3 +36,12 @@ class TestPublisher(publisher.PublisherBase): """ self.samples.extend(samples) self.calls += 1 + + def publish_events(self, context, events): + """Send an event message for publishing + + :param context: Execution context from the service or RPC call + :param events: events from pipeline after transformation + """ + self.events.extend(events) + self.calls += 1 diff --git a/ceilometer/publisher/udp.py b/ceilometer/publisher/udp.py index 9c3e2f93..10266a66 100644 --- a/ceilometer/publisher/udp.py +++ b/ceilometer/publisher/udp.py @@ -24,6 +24,7 @@ import msgpack from oslo_config import cfg from oslo_utils import netutils +import ceilometer from ceilometer.i18n import _ from ceilometer.openstack.common import log from ceilometer import publisher @@ -65,3 +66,11 @@ class UDPPublisher(publisher.PublisherBase): except Exception as e: LOG.warn(_("Unable to send sample over UDP")) LOG.exception(e) + + def publish_events(self, context, events): + """Send an event message for publishing + + :param context: Execution context from the service or RPC call + :param events: events from pipeline after transformation + """ + raise ceilometer.NotImplementedError diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py index 0b4d3377..fd747017 100644 --- a/ceilometer/tests/pipeline_base.py +++ b/ceilometer/tests/pipeline_base.py @@ -78,7 +78,10 @@ class BasePipelineTestCase(base.BaseTestCase): return fake_drivers[url](url) class PublisherClassException(publisher.PublisherBase): - def publish_samples(self, ctxt, counters): + def publish_samples(self, ctxt, samples): + raise Exception() + + def publish_events(self, ctxt, events): raise Exception() class TransformerClass(transformer.TransformerBase): diff --git a/ceilometer/tests/test_event_pipeline.py b/ceilometer/tests/test_event_pipeline.py new file mode 100644 index 00000000..6890f86e --- /dev/null +++ b/ceilometer/tests/test_event_pipeline.py @@ -0,0 +1,364 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import traceback +import uuid + +from oslotest import base +from oslotest import mockpatch + +from ceilometer.event.storage import models +from ceilometer import pipeline +from ceilometer import publisher +from ceilometer.publisher import test as test_publisher + + +class EventPipelineTestCase(base.BaseTestCase): + + def get_publisher(self, url, namespace=''): + fake_drivers = {'test://': test_publisher.TestPublisher, + 'new://': test_publisher.TestPublisher, + 'except://': self.PublisherClassException} + return fake_drivers[url](url) + + class PublisherClassException(publisher.PublisherBase): + def publish_samples(self, ctxt, samples): + pass + + def publish_events(self, ctxt, events): + raise Exception() + + def setUp(self): + super(EventPipelineTestCase, self).setUp() + self.p_type = pipeline.EVENT_TYPE + self.transformer_manager = None + + self.test_event = models.Event( + message_id=uuid.uuid4(), + event_type='a', + generated=datetime.datetime.utcnow(), + traits=[ + models.Trait('t_text', 1, 'text_trait'), + models.Trait('t_int', 2, 'int_trait'), + models.Trait('t_float', 3, 'float_trait'), + models.Trait('t_datetime', 4, 'datetime_trait') + ] + ) + + self.test_event2 = models.Event( + message_id=uuid.uuid4(), + event_type='b', + generated=datetime.datetime.utcnow(), + traits=[ + models.Trait('t_text', 1, 'text_trait'), + models.Trait('t_int', 2, 'int_trait'), + models.Trait('t_float', 3, 'float_trait'), + models.Trait('t_datetime', 4, 'datetime_trait') + ] + ) + + self.useFixture(mockpatch.PatchObject( + publisher, 'get_publisher', side_effect=self.get_publisher)) + + self._setup_pipeline_cfg() + + self._reraise_exception = True + self.useFixture(mockpatch.Patch( + 'ceilometer.pipeline.LOG.exception', + side_effect=self._handle_reraise_exception)) + + def _handle_reraise_exception(self, msg): + if self._reraise_exception: + raise Exception(traceback.format_exc()) + + def _setup_pipeline_cfg(self): + """Setup the appropriate form of pipeline config.""" + source = {'name': 'test_source', + 'events': ['a'], + 'sinks': ['test_sink']} + sink = {'name': 'test_sink', + 'publishers': ['test://']} + self.pipeline_cfg = {'sources': [source], 'sinks': [sink]} + + def _augment_pipeline_cfg(self): + """Augment the pipeline config with an additional element.""" + self.pipeline_cfg['sources'].append({ + 'name': 'second_source', + 'events': ['b'], + 'sinks': ['second_sink'] + }) + self.pipeline_cfg['sinks'].append({ + 'name': 'second_sink', + 'publishers': ['new://'], + }) + + def _break_pipeline_cfg(self): + """Break the pipeline config with a malformed element.""" + self.pipeline_cfg['sources'].append({ + 'name': 'second_source', + 'events': ['b'], + 'sinks': ['second_sink'] + }) + self.pipeline_cfg['sinks'].append({ + 'name': 'second_sink', + 'publishers': ['except'], + }) + + def _dup_pipeline_name_cfg(self): + """Break the pipeline config with duplicate pipeline name.""" + self.pipeline_cfg['sources'].append({ + 'name': 'test_source', + 'events': ['a'], + 'sinks': ['test_sink'] + }) + + def _set_pipeline_cfg(self, field, value): + if field in self.pipeline_cfg['sources'][0]: + self.pipeline_cfg['sources'][0][field] = value + else: + self.pipeline_cfg['sinks'][0][field] = value + + def _extend_pipeline_cfg(self, field, value): + if field in self.pipeline_cfg['sources'][0]: + self.pipeline_cfg['sources'][0][field].extend(value) + else: + self.pipeline_cfg['sinks'][0][field].extend(value) + + def _unset_pipeline_cfg(self, field): + if field in self.pipeline_cfg['sources'][0]: + del self.pipeline_cfg['sources'][0][field] + else: + del self.pipeline_cfg['sinks'][0][field] + + def _exception_create_pipelinemanager(self): + self.assertRaises(pipeline.PipelineException, + pipeline.PipelineManager, + self.pipeline_cfg, + self.transformer_manager, + self.p_type) + + def test_no_events(self): + self._unset_pipeline_cfg('events') + self._exception_create_pipelinemanager() + + def test_no_name(self): + self._unset_pipeline_cfg('name') + self._exception_create_pipelinemanager() + + def test_name(self): + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + for pipe in pipeline_manager.pipelines: + self.assertTrue(pipe.name.startswith('event:')) + + def test_no_publishers(self): + self._unset_pipeline_cfg('publishers') + self._exception_create_pipelinemanager() + + def test_check_events_include_exclude_same(self): + event_cfg = ['a', '!a'] + self._set_pipeline_cfg('events', event_cfg) + self._exception_create_pipelinemanager() + + def test_check_events_include_exclude(self): + event_cfg = ['a', '!b'] + self._set_pipeline_cfg('events', event_cfg) + self._exception_create_pipelinemanager() + + def test_check_events_wildcard_included(self): + event_cfg = ['a', '*'] + self._set_pipeline_cfg('events', event_cfg) + self._exception_create_pipelinemanager() + + def test_check_publishers_invalid_publisher(self): + publisher_cfg = ['test_invalid'] + self._set_pipeline_cfg('publishers', publisher_cfg) + + def test_multiple_included_events(self): + event_cfg = ['a', 'b'] + self._set_pipeline_cfg('events', event_cfg) + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + + with pipeline_manager.publisher(None) as p: + p([self.test_event]) + + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(1, len(publisher.events)) + + with pipeline_manager.publisher(None) as p: + p([self.test_event2]) + + self.assertEqual(2, len(publisher.events)) + self.assertEqual('a', getattr(publisher.events[0], 'event_type')) + self.assertEqual('b', getattr(publisher.events[1], 'event_type')) + + def test_event_non_match(self): + event_cfg = ['nomatch'] + self._set_pipeline_cfg('events', event_cfg) + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + with pipeline_manager.publisher(None) as p: + p([self.test_event]) + + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(0, len(publisher.events)) + self.assertEqual(0, publisher.calls) + + def test_wildcard_event(self): + event_cfg = ['*'] + self._set_pipeline_cfg('events', event_cfg) + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + with pipeline_manager.publisher(None) as p: + p([self.test_event]) + + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(1, len(publisher.events)) + self.assertEqual('a', getattr(publisher.events[0], 'event_type')) + + def test_wildcard_excluded_events(self): + event_cfg = ['*', '!a'] + self._set_pipeline_cfg('events', event_cfg) + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + self.assertFalse(pipeline_manager.pipelines[0].support_event('a')) + + def test_wildcard_excluded_events_not_excluded(self): + event_cfg = ['*', '!b'] + self._set_pipeline_cfg('events', event_cfg) + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + with pipeline_manager.publisher(None) as p: + p([self.test_event]) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(1, len(publisher.events)) + self.assertEqual('a', getattr(publisher.events[0], 'event_type')) + + def test_all_excluded_events_not_excluded(self): + event_cfg = ['!b', '!c'] + self._set_pipeline_cfg('events', event_cfg) + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + with pipeline_manager.publisher(None) as p: + p([self.test_event]) + + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(1, len(publisher.events)) + self.assertEqual('a', getattr(publisher.events[0], 'event_type')) + + def test_all_excluded_events_excluded(self): + event_cfg = ['!a', '!c'] + self._set_pipeline_cfg('events', event_cfg) + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + self.assertFalse(pipeline_manager.pipelines[0].support_event('a')) + self.assertTrue(pipeline_manager.pipelines[0].support_event('b')) + self.assertFalse(pipeline_manager.pipelines[0].support_event('c')) + + def test_wildcard_and_excluded_wildcard_events(self): + event_cfg = ['*', '!compute.*'] + self._set_pipeline_cfg('events', event_cfg) + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + self.assertFalse(pipeline_manager.pipelines[0]. + support_event('compute.instance.create.start')) + self.assertTrue(pipeline_manager.pipelines[0]. + support_event('identity.user.create')) + + def test_included_event_and_wildcard_events(self): + event_cfg = ['compute.instance.create.start', 'identity.*'] + self._set_pipeline_cfg('events', event_cfg) + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + self.assertTrue(pipeline_manager.pipelines[0]. + support_event('identity.user.create')) + self.assertTrue(pipeline_manager.pipelines[0]. + support_event('compute.instance.create.start')) + self.assertFalse(pipeline_manager.pipelines[0]. + support_event('compute.instance.create.stop')) + + def test_excluded_event_and_excluded_wildcard_events(self): + event_cfg = ['!compute.instance.create.start', '!identity.*'] + self._set_pipeline_cfg('events', event_cfg) + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + self.assertFalse(pipeline_manager.pipelines[0]. + support_event('identity.user.create')) + self.assertFalse(pipeline_manager.pipelines[0]. + support_event('compute.instance.create.start')) + self.assertTrue(pipeline_manager.pipelines[0]. + support_event('compute.instance.create.stop')) + + def test_multiple_pipeline(self): + self._augment_pipeline_cfg() + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + with pipeline_manager.publisher(None) as p: + p([self.test_event, self.test_event2]) + + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(1, len(publisher.events)) + self.assertEqual(1, publisher.calls) + self.assertEqual('a', getattr(publisher.events[0], 'event_type')) + new_publisher = pipeline_manager.pipelines[1].publishers[0] + self.assertEqual(1, len(new_publisher.events)) + self.assertEqual(1, new_publisher.calls) + self.assertEqual('b', getattr(new_publisher.events[0], 'event_type')) + + def test_multiple_publisher(self): + self._set_pipeline_cfg('publishers', ['test://', 'new://']) + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + + with pipeline_manager.publisher(None) as p: + p([self.test_event]) + + publisher = pipeline_manager.pipelines[0].publishers[0] + new_publisher = pipeline_manager.pipelines[0].publishers[1] + self.assertEqual(1, len(publisher.events)) + self.assertEqual(1, len(new_publisher.events)) + self.assertEqual('a', getattr(new_publisher.events[0], 'event_type')) + self.assertEqual('a', getattr(publisher.events[0], 'event_type')) + + def test_multiple_publisher_isolation(self): + self._reraise_exception = False + self._set_pipeline_cfg('publishers', ['except://', 'new://']) + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + with pipeline_manager.publisher(None) as p: + p([self.test_event]) + + publisher = pipeline_manager.pipelines[0].publishers[1] + self.assertEqual(1, len(publisher.events)) + self.assertEqual('a', getattr(publisher.events[0], 'event_type')) + + def test_unique_pipeline_names(self): + self._dup_pipeline_name_cfg() + self._exception_create_pipelinemanager() diff --git a/etc/ceilometer/event_pipeline.yaml b/etc/ceilometer/event_pipeline.yaml new file mode 100644 index 00000000..d6c5e256 --- /dev/null +++ b/etc/ceilometer/event_pipeline.yaml @@ -0,0 +1,13 @@ +--- +sources: + - name: event_source + events: + - "*" + sinks: + - event_sink +sinks: + - name: event_sink + transformers: + triggers: + publishers: + - direct://