Merge "add event pipeline"

This commit is contained in:
Jenkins 2015-01-29 14:27:16 +00:00 committed by Gerrit Code Review
commit 479ac5feea
10 changed files with 531 additions and 25 deletions

View File

@ -17,10 +17,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import fnmatch
import os
from oslo_config import cfg
import six
import yaml
from ceilometer.i18n import _
@ -35,6 +37,10 @@ OPTS = [
default="pipeline.yaml",
help="Configuration file for pipeline definition."
),
cfg.StrOpt('event_pipeline_cfg_file',
default="event_pipeline.yaml",
help="Configuration file for event pipeline definition."
),
]
cfg.CONF.register_opts(OPTS)
@ -104,6 +110,7 @@ class Source(object):
try:
self.name = cfg['name']
self.sinks = cfg.get('sinks')
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
@ -143,6 +150,41 @@ class Source(object):
'Included %s specified with wildcard' % d_type,
self.cfg)
@staticmethod
def is_supported(dataset, data_name):
# Support wildcard like storage.* and !disk.*
# Start with negation, we consider that the order is deny, allow
if any(fnmatch.fnmatch(data_name, datapoint[1:])
for datapoint in dataset if datapoint[0] == '!'):
return False
if any(fnmatch.fnmatch(data_name, datapoint)
for datapoint in dataset if datapoint[0] != '!'):
return True
# if we only have negation, we suppose the default is allow
return all(datapoint.startswith('!') for datapoint in dataset)
class EventSource(Source):
"""Represents a source of events.
In effect it is a set of notification handlers capturing events for a set
of matching notifications.
"""
def __init__(self, cfg):
super(EventSource, self).__init__(cfg)
try:
self.events = cfg['events']
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
self.check_source_filtering(self.events, 'events')
def support_event(self, event_name):
return self.is_supported(self.events, event_name)
class SampleSource(Source):
"""Represents a source of samples.
@ -162,7 +204,6 @@ class SampleSource(Source):
raise PipelineException("Invalid interval value", cfg)
# Support 'counters' for backward compatibility
self.meters = cfg.get('meters', cfg.get('counters'))
self.sinks = cfg.get('sinks')
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
@ -191,24 +232,7 @@ class SampleSource(Source):
def support_meter(self, meter_name):
meter_name = self._variable_meter_name(meter_name)
# Special case: if we only have negation, we suppose the default is
# allow
default = all(meter.startswith('!') for meter in self.meters)
# Support wildcard like storage.* and !disk.*
# Start with negation, we consider that the order is deny, allow
if any(fnmatch.fnmatch(meter_name, meter[1:])
for meter in self.meters
if meter[0] == '!'):
return False
if any(fnmatch.fnmatch(meter_name, meter)
for meter in self.meters
if meter[0] != '!'):
return True
return default
return self.is_supported(self.meters, meter_name)
class Sink(object):
@ -285,6 +309,24 @@ class Sink(object):
return transformers
class EventSink(Sink):
def publish_events(self, ctxt, events):
if events:
for p in self.publishers:
try:
p.publish_events(ctxt, events)
except Exception:
LOG.exception(_(
"Pipeline %(pipeline)s: Continue after error "
"from publisher %(pub)s") % ({'pipeline': self,
'pub': p}))
def flush(self, ctxt):
"""Flush data after all events have been injected to pipeline."""
pass
class SampleSink(Sink):
def _transform_sample(self, start, ctxt, sample):
@ -299,6 +341,7 @@ class SampleSink(Sink):
return
return sample
except Exception as err:
# TODO(gordc): only use one log level.
LOG.warning(_("Pipeline %(pipeline)s: "
"Exit after error from transformer "
"%(trans)s for %(smp)s") % ({'pipeline': self,
@ -359,6 +402,7 @@ class SampleSink(Sink):
LOG.exception(err)
@six.add_metaclass(abc.ABCMeta)
class Pipeline(object):
"""Represents a coupling between a sink and a corresponding source."""
@ -378,6 +422,29 @@ class Pipeline(object):
def publishers(self):
return self.sink.publishers
@abc.abstractmethod
def publish_data(self, ctxt, data):
"""Publish data from pipeline."""
class EventPipeline(Pipeline):
"""Represents a pipeline for Events."""
def __str__(self):
# NOTE(gordc): prepend a namespace so we ensure event and sample
# pipelines do not have the same name.
return 'event:%s' % super(EventPipeline, self).__str__()
def support_event(self, event_type):
return self.source.support_event(event_type)
def publish_data(self, ctxt, events):
if not isinstance(events, list):
events = [events]
supported = [e for e in events
if self.source.support_event(e.event_type)]
self.sink.publish_events(ctxt, supported)
class SamplePipeline(Pipeline):
"""Represents a pipeline for Samples."""
@ -407,6 +474,10 @@ SAMPLE_TYPE = {'pipeline': SamplePipeline,
'source': SampleSource,
'sink': SampleSink}
EVENT_TYPE = {'pipeline': EventPipeline,
'source': EventSource,
'sink': EventSink}
class PipelineManager(object):
"""Pipeline Manager
@ -556,9 +627,7 @@ class PipelineManager(object):
return PublishContext(context, self.pipelines)
def setup_pipeline(transformer_manager=None):
"""Setup pipeline manager according to yaml config file."""
cfg_file = cfg.CONF.pipeline_cfg_file
def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE):
if not os.path.exists(cfg_file):
cfg_file = cfg.CONF.find_file(cfg_file)
@ -574,4 +643,16 @@ def setup_pipeline(transformer_manager=None):
transformer_manager or
xformer.TransformerExtensionManager(
'ceilometer.transformer',
))
), p_type)
def setup_event_pipeline(transformer_manager=None):
"""Setup event pipeline manager according to yaml config file."""
cfg_file = cfg.CONF.event_pipeline_cfg_file
return _setup_pipeline_manager(cfg_file, transformer_manager, EVENT_TYPE)
def setup_pipeline(transformer_manager=None):
"""Setup pipeline manager according to yaml config file."""
cfg_file = cfg.CONF.pipeline_cfg_file
return _setup_pipeline_manager(cfg_file, transformer_manager)

View File

@ -34,7 +34,7 @@ def get_publisher(url, namespace='ceilometer.publisher'):
@six.add_metaclass(abc.ABCMeta)
class PublisherBase(object):
"""Base class for plugins that publish the sampler."""
"""Base class for plugins that publish data."""
def __init__(self, parsed_url):
pass
@ -42,3 +42,7 @@ class PublisherBase(object):
@abc.abstractmethod
def publish_samples(self, context, samples):
"""Publish samples into final conduit."""
@abc.abstractmethod
def publish_events(self, context, events):
"""Publish events into final conduit."""

View File

@ -17,6 +17,7 @@
from oslo.config import cfg
from oslo.utils import timeutils
import ceilometer
from ceilometer.dispatcher import database
from ceilometer import publisher
from ceilometer.publisher import utils
@ -51,3 +52,6 @@ class DirectPublisher(publisher.PublisherBase):
ts = timeutils.parse_isotime(meter['timestamp'])
meter['timestamp'] = timeutils.normalize_time(ts)
self.meter_conn.record_metering_data(meter)
def publish_events(self, context, events):
raise ceilometer.NotImplementedError

View File

@ -18,6 +18,7 @@ import logging.handlers
from six.moves.urllib import parse as urlparse
import ceilometer
from ceilometer.i18n import _
from ceilometer.openstack.common import log
from ceilometer import publisher
@ -95,3 +96,11 @@ class FilePublisher(publisher.PublisherBase):
if self.publisher_logger:
for sample in samples:
self.publisher_logger.info(sample.as_dict())
def publish_events(self, context, events):
"""Send an event message for publishing
:param context: Execution context from the service or RPC call
:param events: events from pipeline after transformation
"""
raise ceilometer.NotImplementedError

View File

@ -24,6 +24,7 @@ from oslo_config import cfg
import six
import six.moves.urllib.parse as urlparse
import ceilometer
from ceilometer.i18n import _
from ceilometer import messaging
from ceilometer.openstack.common import log
@ -160,6 +161,14 @@ class MessagingPublisher(publisher.PublisherBase):
queue.pop(0)
return []
def publish_events(self, context, events):
"""Send an event message for publishing
:param context: Execution context from the service or RPC call
:param events: events from pipeline after transformation
"""
raise ceilometer.NotImplementedError
@abc.abstractmethod
def _send(self, context, topic, meters):
"""Send the meters to the messaging topic."""

View File

@ -23,6 +23,7 @@ class TestPublisher(publisher.PublisherBase):
def __init__(self, parsed_url):
self.samples = []
self.events = []
self.calls = 0
def publish_samples(self, context, samples):
@ -33,3 +34,12 @@ class TestPublisher(publisher.PublisherBase):
"""
self.samples.extend(samples)
self.calls += 1
def publish_events(self, context, events):
"""Send an event message for publishing
:param context: Execution context from the service or RPC call
:param events: events from pipeline after transformation
"""
self.events.extend(events)
self.calls += 1

View File

@ -21,6 +21,7 @@ import msgpack
from oslo_config import cfg
from oslo_utils import netutils
import ceilometer
from ceilometer.i18n import _
from ceilometer.openstack.common import log
from ceilometer import publisher
@ -62,3 +63,11 @@ class UDPPublisher(publisher.PublisherBase):
except Exception as e:
LOG.warn(_("Unable to send sample over UDP"))
LOG.exception(e)
def publish_events(self, context, events):
"""Send an event message for publishing
:param context: Execution context from the service or RPC call
:param events: events from pipeline after transformation
"""
raise ceilometer.NotImplementedError

View File

@ -78,7 +78,10 @@ class BasePipelineTestCase(base.BaseTestCase):
return fake_drivers[url](url)
class PublisherClassException(publisher.PublisherBase):
def publish_samples(self, ctxt, counters):
def publish_samples(self, ctxt, samples):
raise Exception()
def publish_events(self, ctxt, events):
raise Exception()
class TransformerClass(transformer.TransformerBase):

View File

@ -0,0 +1,364 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import traceback
import uuid
from oslotest import base
from oslotest import mockpatch
from ceilometer.event.storage import models
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer.publisher import test as test_publisher
class EventPipelineTestCase(base.BaseTestCase):
def get_publisher(self, url, namespace=''):
fake_drivers = {'test://': test_publisher.TestPublisher,
'new://': test_publisher.TestPublisher,
'except://': self.PublisherClassException}
return fake_drivers[url](url)
class PublisherClassException(publisher.PublisherBase):
def publish_samples(self, ctxt, samples):
pass
def publish_events(self, ctxt, events):
raise Exception()
def setUp(self):
super(EventPipelineTestCase, self).setUp()
self.p_type = pipeline.EVENT_TYPE
self.transformer_manager = None
self.test_event = models.Event(
message_id=uuid.uuid4(),
event_type='a',
generated=datetime.datetime.utcnow(),
traits=[
models.Trait('t_text', 1, 'text_trait'),
models.Trait('t_int', 2, 'int_trait'),
models.Trait('t_float', 3, 'float_trait'),
models.Trait('t_datetime', 4, 'datetime_trait')
]
)
self.test_event2 = models.Event(
message_id=uuid.uuid4(),
event_type='b',
generated=datetime.datetime.utcnow(),
traits=[
models.Trait('t_text', 1, 'text_trait'),
models.Trait('t_int', 2, 'int_trait'),
models.Trait('t_float', 3, 'float_trait'),
models.Trait('t_datetime', 4, 'datetime_trait')
]
)
self.useFixture(mockpatch.PatchObject(
publisher, 'get_publisher', side_effect=self.get_publisher))
self._setup_pipeline_cfg()
self._reraise_exception = True
self.useFixture(mockpatch.Patch(
'ceilometer.pipeline.LOG.exception',
side_effect=self._handle_reraise_exception))
def _handle_reraise_exception(self, msg):
if self._reraise_exception:
raise Exception(traceback.format_exc())
def _setup_pipeline_cfg(self):
"""Setup the appropriate form of pipeline config."""
source = {'name': 'test_source',
'events': ['a'],
'sinks': ['test_sink']}
sink = {'name': 'test_sink',
'publishers': ['test://']}
self.pipeline_cfg = {'sources': [source], 'sinks': [sink]}
def _augment_pipeline_cfg(self):
"""Augment the pipeline config with an additional element."""
self.pipeline_cfg['sources'].append({
'name': 'second_source',
'events': ['b'],
'sinks': ['second_sink']
})
self.pipeline_cfg['sinks'].append({
'name': 'second_sink',
'publishers': ['new://'],
})
def _break_pipeline_cfg(self):
"""Break the pipeline config with a malformed element."""
self.pipeline_cfg['sources'].append({
'name': 'second_source',
'events': ['b'],
'sinks': ['second_sink']
})
self.pipeline_cfg['sinks'].append({
'name': 'second_sink',
'publishers': ['except'],
})
def _dup_pipeline_name_cfg(self):
"""Break the pipeline config with duplicate pipeline name."""
self.pipeline_cfg['sources'].append({
'name': 'test_source',
'events': ['a'],
'sinks': ['test_sink']
})
def _set_pipeline_cfg(self, field, value):
if field in self.pipeline_cfg['sources'][0]:
self.pipeline_cfg['sources'][0][field] = value
else:
self.pipeline_cfg['sinks'][0][field] = value
def _extend_pipeline_cfg(self, field, value):
if field in self.pipeline_cfg['sources'][0]:
self.pipeline_cfg['sources'][0][field].extend(value)
else:
self.pipeline_cfg['sinks'][0][field].extend(value)
def _unset_pipeline_cfg(self, field):
if field in self.pipeline_cfg['sources'][0]:
del self.pipeline_cfg['sources'][0][field]
else:
del self.pipeline_cfg['sinks'][0][field]
def _exception_create_pipelinemanager(self):
self.assertRaises(pipeline.PipelineException,
pipeline.PipelineManager,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
def test_no_events(self):
self._unset_pipeline_cfg('events')
self._exception_create_pipelinemanager()
def test_no_name(self):
self._unset_pipeline_cfg('name')
self._exception_create_pipelinemanager()
def test_name(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
for pipe in pipeline_manager.pipelines:
self.assertTrue(pipe.name.startswith('event:'))
def test_no_publishers(self):
self._unset_pipeline_cfg('publishers')
self._exception_create_pipelinemanager()
def test_check_events_include_exclude_same(self):
event_cfg = ['a', '!a']
self._set_pipeline_cfg('events', event_cfg)
self._exception_create_pipelinemanager()
def test_check_events_include_exclude(self):
event_cfg = ['a', '!b']
self._set_pipeline_cfg('events', event_cfg)
self._exception_create_pipelinemanager()
def test_check_events_wildcard_included(self):
event_cfg = ['a', '*']
self._set_pipeline_cfg('events', event_cfg)
self._exception_create_pipelinemanager()
def test_check_publishers_invalid_publisher(self):
publisher_cfg = ['test_invalid']
self._set_pipeline_cfg('publishers', publisher_cfg)
def test_multiple_included_events(self):
event_cfg = ['a', 'b']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher(None) as p:
p([self.test_event])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.events))
with pipeline_manager.publisher(None) as p:
p([self.test_event2])
self.assertEqual(2, len(publisher.events))
self.assertEqual('a', getattr(publisher.events[0], 'event_type'))
self.assertEqual('b', getattr(publisher.events[1], 'event_type'))
def test_event_non_match(self):
event_cfg = ['nomatch']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher(None) as p:
p([self.test_event])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(0, len(publisher.events))
self.assertEqual(0, publisher.calls)
def test_wildcard_event(self):
event_cfg = ['*']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher(None) as p:
p([self.test_event])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.events))
self.assertEqual('a', getattr(publisher.events[0], 'event_type'))
def test_wildcard_excluded_events(self):
event_cfg = ['*', '!a']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
self.assertFalse(pipeline_manager.pipelines[0].support_event('a'))
def test_wildcard_excluded_events_not_excluded(self):
event_cfg = ['*', '!b']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher(None) as p:
p([self.test_event])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.events))
self.assertEqual('a', getattr(publisher.events[0], 'event_type'))
def test_all_excluded_events_not_excluded(self):
event_cfg = ['!b', '!c']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher(None) as p:
p([self.test_event])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.events))
self.assertEqual('a', getattr(publisher.events[0], 'event_type'))
def test_all_excluded_events_excluded(self):
event_cfg = ['!a', '!c']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
self.assertFalse(pipeline_manager.pipelines[0].support_event('a'))
self.assertTrue(pipeline_manager.pipelines[0].support_event('b'))
self.assertFalse(pipeline_manager.pipelines[0].support_event('c'))
def test_wildcard_and_excluded_wildcard_events(self):
event_cfg = ['*', '!compute.*']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
self.assertFalse(pipeline_manager.pipelines[0].
support_event('compute.instance.create.start'))
self.assertTrue(pipeline_manager.pipelines[0].
support_event('identity.user.create'))
def test_included_event_and_wildcard_events(self):
event_cfg = ['compute.instance.create.start', 'identity.*']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
self.assertTrue(pipeline_manager.pipelines[0].
support_event('identity.user.create'))
self.assertTrue(pipeline_manager.pipelines[0].
support_event('compute.instance.create.start'))
self.assertFalse(pipeline_manager.pipelines[0].
support_event('compute.instance.create.stop'))
def test_excluded_event_and_excluded_wildcard_events(self):
event_cfg = ['!compute.instance.create.start', '!identity.*']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
self.assertFalse(pipeline_manager.pipelines[0].
support_event('identity.user.create'))
self.assertFalse(pipeline_manager.pipelines[0].
support_event('compute.instance.create.start'))
self.assertTrue(pipeline_manager.pipelines[0].
support_event('compute.instance.create.stop'))
def test_multiple_pipeline(self):
self._augment_pipeline_cfg()
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher(None) as p:
p([self.test_event, self.test_event2])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.events))
self.assertEqual(1, publisher.calls)
self.assertEqual('a', getattr(publisher.events[0], 'event_type'))
new_publisher = pipeline_manager.pipelines[1].publishers[0]
self.assertEqual(1, len(new_publisher.events))
self.assertEqual(1, new_publisher.calls)
self.assertEqual('b', getattr(new_publisher.events[0], 'event_type'))
def test_multiple_publisher(self):
self._set_pipeline_cfg('publishers', ['test://', 'new://'])
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher(None) as p:
p([self.test_event])
publisher = pipeline_manager.pipelines[0].publishers[0]
new_publisher = pipeline_manager.pipelines[0].publishers[1]
self.assertEqual(1, len(publisher.events))
self.assertEqual(1, len(new_publisher.events))
self.assertEqual('a', getattr(new_publisher.events[0], 'event_type'))
self.assertEqual('a', getattr(publisher.events[0], 'event_type'))
def test_multiple_publisher_isolation(self):
self._reraise_exception = False
self._set_pipeline_cfg('publishers', ['except://', 'new://'])
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher(None) as p:
p([self.test_event])
publisher = pipeline_manager.pipelines[0].publishers[1]
self.assertEqual(1, len(publisher.events))
self.assertEqual('a', getattr(publisher.events[0], 'event_type'))
def test_unique_pipeline_names(self):
self._dup_pipeline_name_cfg()
self._exception_create_pipelinemanager()

View File

@ -0,0 +1,13 @@
---
sources:
- name: event_source
events:
- "*"
sinks:
- event_sink
sinks:
- name: event_sink
transformers:
triggers:
publishers:
- direct://