move sample/event specifc pipeline models to own module

- move sample/event specifc pipeline models to own module
- make grouping key computation part of pipeline
- remove pipeline mocks from polling tests

Change-Id: I20349e48751090210f8a0074c4a735f1b7e74bc1
This commit is contained in:
gord chung 2017-10-31 15:37:21 +00:00
parent 9e58f1a6f4
commit cfbc3e00c2
26 changed files with 352 additions and 426 deletions

View File

@ -23,7 +23,7 @@ from oslo_config import cfg
from oslo_utils import timeutils
from stevedore import extension
from ceilometer import pipeline
from ceilometer.pipeline import sample as sample_pipe
from ceilometer import sample
from ceilometer import service
@ -77,7 +77,7 @@ def send_sample():
root_logger.addHandler(console)
root_logger.setLevel(logging.DEBUG)
pipeline_manager = pipeline.setup_pipeline(
pipeline_manager = sample_pipe.setup_pipeline(
conf, extension.ExtensionManager('ceilometer.transformer'))
with pipeline_manager.publisher() as p:

View File

@ -30,7 +30,8 @@ from tooz import coordination
from ceilometer.i18n import _
from ceilometer import messaging
from ceilometer import pipeline
from ceilometer.pipeline import event as event_endpoint
from ceilometer.pipeline import event as event_pipe
from ceilometer.pipeline import sample as sample_pipe
from ceilometer import utils
@ -153,9 +154,9 @@ class NotificationService(cotyledon.Service):
if self.conf.notification.workload_partitioning:
pipe_manager = pipeline.SamplePipelineTransportManager(self.conf)
for pipe in pipeline_manager.pipelines:
key = pipeline.get_pipeline_grouping_key(pipe)
key = pipe.get_grouping_key() or ['resource_id']
pipe_manager.add_transporter(
(pipe.source.support_meter, key or ['resource_id'],
(pipe.source.support_meter, key,
self._get_notifiers(transport, pipe)))
else:
pipe_manager = pipeline_manager
@ -195,9 +196,8 @@ class NotificationService(cotyledon.Service):
super(NotificationService, self).run()
self.coord_lock = threading.Lock()
self.pipeline_manager = pipeline.setup_pipeline(self.conf)
self.event_pipeline_manager = pipeline.setup_event_pipeline(self.conf)
self.pipeline_manager = sample_pipe.setup_pipeline(self.conf)
self.event_pipeline_manager = event_pipe.setup_pipeline(self.conf)
self.transport = messaging.get_transport(self.conf)
@ -247,7 +247,7 @@ class NotificationService(cotyledon.Service):
endpoints = []
endpoints.append(
event_endpoint.EventEndpoint(event_pipe_manager))
event_pipe.EventEndpoint(event_pipe_manager))
targets = self.get_targets()
for ext in notification_manager:
@ -290,7 +290,7 @@ class NotificationService(cotyledon.Service):
targets = []
for pipe in pipelines:
if isinstance(pipe, pipeline.EventPipeline):
if isinstance(pipe, event_pipe.EventPipeline):
endpoints.append(pipeline.EventPipelineEndpoint(pipe))
else:
endpoints.append(pipeline.SamplePipelineEndpoint(pipe))

View File

@ -23,7 +23,6 @@ from oslo_log import log
import oslo_messaging
from oslo_utils import timeutils
import six
from stevedore import extension
from ceilometer import agent
from ceilometer.event import models
@ -226,48 +225,6 @@ class PipelineSource(agent.Source):
self.cfg)
class EventSource(PipelineSource):
"""Represents a source of events.
In effect it is a set of notification handlers capturing events for a set
of matching notifications.
"""
def __init__(self, cfg):
super(EventSource, self).__init__(cfg)
self.events = cfg.get('events')
try:
self.check_source_filtering(self.events, 'events')
except agent.SourceException as err:
raise PipelineException(err.msg, cfg)
def support_event(self, event_name):
return self.is_supported(self.events, event_name)
class SampleSource(PipelineSource):
"""Represents a source of samples.
In effect it is a set of notification handlers processing
samples for a set of matching meters. Each source encapsulates meter name
matching and mapping to one or more sinks for publication.
"""
def __init__(self, cfg):
super(SampleSource, self).__init__(cfg)
try:
self.meters = cfg['meters']
except KeyError:
raise PipelineException("Missing meters value", cfg)
try:
self.check_source_filtering(self.meters, 'meters')
except agent.SourceException as err:
raise PipelineException(err.msg, cfg)
def support_meter(self, meter_name):
return self.is_supported(self.meters, meter_name)
class Sink(object):
"""Represents a sink for the transformation and publication of data.
@ -345,105 +302,11 @@ class Sink(object):
return transformers
class EventSink(Sink):
PUBLISHER_PURPOSE = 'event'
def publish_events(self, events):
if events:
for p in self.publishers:
try:
p.publish_events(events)
except Exception:
LOG.error("Pipeline %(pipeline)s: %(status)s "
"after error from publisher %(pub)s" %
{'pipeline': self,
'status': 'Continue' if
self.multi_publish else 'Exit', 'pub': p},
exc_info=True)
if not self.multi_publish:
raise
@staticmethod
def flush():
"""Flush data after all events have been injected to pipeline."""
class SampleSink(Sink):
PUBLISHER_PURPOSE = 'sample'
def _transform_sample(self, start, sample):
try:
for transformer in self.transformers[start:]:
sample = transformer.handle_sample(sample)
if not sample:
LOG.debug(
"Pipeline %(pipeline)s: Sample dropped by "
"transformer %(trans)s", {'pipeline': self,
'trans': transformer})
return
return sample
except Exception:
LOG.error("Pipeline %(pipeline)s: Exit after error "
"from transformer %(trans)s "
"for %(smp)s" % {'pipeline': self,
'trans': transformer,
'smp': sample},
exc_info=True)
def _publish_samples(self, start, samples):
"""Push samples into pipeline for publishing.
:param start: The first transformer that the sample will be injected.
This is mainly for flush() invocation that transformer
may emit samples.
:param samples: Sample list.
"""
transformed_samples = []
if not self.transformers:
transformed_samples = samples
else:
for sample in samples:
LOG.debug(
"Pipeline %(pipeline)s: Transform sample "
"%(smp)s from %(trans)s transformer", {'pipeline': self,
'smp': sample,
'trans': start})
sample = self._transform_sample(start, sample)
if sample:
transformed_samples.append(sample)
if transformed_samples:
for p in self.publishers:
try:
p.publish_samples(transformed_samples)
except Exception:
LOG.error("Pipeline %(pipeline)s: Continue after "
"error from publisher %(pub)s"
% {'pipeline': self, 'pub': p},
exc_info=True)
def publish_samples(self, samples):
self._publish_samples(0, samples)
def flush(self):
"""Flush data after all samples have been injected to pipeline."""
for (i, transformer) in enumerate(self.transformers):
try:
self._publish_samples(i + 1,
list(transformer.flush()))
except Exception:
LOG.error("Pipeline %(pipeline)s: Error "
"flushing transformer %(trans)s"
% {'pipeline': self, 'trans': transformer},
exc_info=True)
@six.add_metaclass(abc.ABCMeta)
class Pipeline(object):
"""Represents a coupling between a sink and a corresponding source."""
@ -469,78 +332,11 @@ class Pipeline(object):
def publish_data(self, data):
"""Publish data from pipeline."""
class EventPipeline(Pipeline):
"""Represents a pipeline for Events."""
def __str__(self):
# NOTE(gordc): prepend a namespace so we ensure event and sample
# pipelines do not have the same name.
return 'event:%s' % super(EventPipeline, self).__str__()
def support_event(self, event_type):
return self.source.support_event(event_type)
def publish_data(self, events):
if not isinstance(events, list):
events = [events]
supported = [e for e in events
if self.source.support_event(e.event_type)]
self.sink.publish_events(supported)
class SamplePipeline(Pipeline):
"""Represents a pipeline for Samples."""
def support_meter(self, meter_name):
return self.source.support_meter(meter_name)
def _validate_volume(self, s):
volume = s.volume
if volume is None:
LOG.warning(
'metering data %(counter_name)s for %(resource_id)s '
'@ %(timestamp)s has no volume (volume: None), the sample will'
' be dropped'
% {'counter_name': s.name,
'resource_id': s.resource_id,
'timestamp': s.timestamp if s.timestamp else 'NO TIMESTAMP'}
)
return False
if not isinstance(volume, (int, float)):
try:
volume = float(volume)
except ValueError:
LOG.warning(
'metering data %(counter_name)s for %(resource_id)s '
'@ %(timestamp)s has volume which is not a number '
'(volume: %(counter_volume)s), the sample will be dropped'
% {'counter_name': s.name,
'resource_id': s.resource_id,
'timestamp': (
s.timestamp if s.timestamp else 'NO TIMESTAMP'),
'counter_volume': volume}
)
return False
return True
def publish_data(self, samples):
if not isinstance(samples, list):
samples = [samples]
supported = [s for s in samples if self.source.support_meter(s.name)
and self._validate_volume(s)]
self.sink.publish_samples(supported)
SAMPLE_TYPE = {'name': 'sample',
'pipeline': SamplePipeline,
'source': SampleSource,
'sink': SampleSink}
EVENT_TYPE = {'name': 'event',
'pipeline': EventPipeline,
'source': EventSource,
'sink': EventSink}
def get_grouping_key(self):
keys = []
for transformer in self.sink.transformers:
keys += transformer.grouping_keys
return list(set(keys))
class PublisherManager(object):
@ -564,8 +360,7 @@ class PipelineManager(agent.ConfigManagerBase):
Pipeline manager sets up pipelines according to config file
"""
def __init__(self, conf, cfg_file, transformer_manager,
p_type=SAMPLE_TYPE):
def __init__(self, conf, cfg_file, transformer_manager, p_type):
"""Setup the pipelines according to config.
The configuration is supported as follows:
@ -674,29 +469,6 @@ class PipelineManager(agent.ConfigManagerBase):
return PublishContext(self.pipelines)
def setup_event_pipeline(conf, transformer_manager=None):
"""Setup event pipeline manager according to yaml config file."""
default = extension.ExtensionManager('ceilometer.transformer')
cfg_file = conf.event_pipeline_cfg_file
return PipelineManager(conf, cfg_file, transformer_manager or default,
EVENT_TYPE)
def setup_pipeline(conf, transformer_manager=None):
"""Setup pipeline manager according to yaml config file."""
default = extension.ExtensionManager('ceilometer.transformer')
cfg_file = conf.pipeline_cfg_file
return PipelineManager(conf, cfg_file, transformer_manager or default,
SAMPLE_TYPE)
def get_pipeline_grouping_key(pipe):
keys = []
for transformer in pipe.sink.transformers:
keys += transformer.grouping_keys
return list(set(keys))
class NotificationEndpoint(object):
"""Base Endpoint for plugins that support the notification API."""

View File

@ -16,6 +16,7 @@ from oslo_log import log
import oslo_messaging
from stevedore import extension
from ceilometer import agent
from ceilometer.event import converter
from ceilometer import pipeline
@ -60,3 +61,77 @@ class EventEndpoint(pipeline.NotificationEndpoint):
return oslo_messaging.NotificationResult.REQUEUE
LOG.error('Fail to process a notification', exc_info=True)
return oslo_messaging.NotificationResult.HANDLED
class EventSource(pipeline.PipelineSource):
"""Represents a source of events.
In effect it is a set of notification handlers capturing events for a set
of matching notifications.
"""
def __init__(self, cfg):
super(EventSource, self).__init__(cfg)
self.events = cfg.get('events')
try:
self.check_source_filtering(self.events, 'events')
except agent.SourceException as err:
raise pipeline.PipelineException(err.msg, cfg)
def support_event(self, event_name):
return self.is_supported(self.events, event_name)
class EventSink(pipeline.Sink):
def publish_events(self, events):
if events:
for p in self.publishers:
try:
p.publish_events(events)
except Exception:
LOG.error("Pipeline %(pipeline)s: %(status)s "
"after error from publisher %(pub)s" %
{'pipeline': self,
'status': 'Continue' if
self.multi_publish else 'Exit', 'pub': p},
exc_info=True)
if not self.multi_publish:
raise
class EventPipeline(pipeline.Pipeline):
"""Represents a pipeline for Events."""
def __str__(self):
# NOTE(gordc): prepend a namespace so we ensure event and sample
# pipelines do not have the same name.
return 'event:%s' % super(EventPipeline, self).__str__()
def support_event(self, event_type):
return self.source.support_event(event_type)
def publish_data(self, events):
if not isinstance(events, list):
events = [events]
supported = [e for e in events
if self.source.support_event(e.event_type)]
self.sink.publish_events(supported)
class EventPipelineManager(pipeline.PipelineManager):
def __init__(self, conf, cfg_file, transformer_manager):
# FIXME(gordc): improve how we set pipeline specific models
pipeline_types = {'name': 'event', 'pipeline': EventPipeline,
'source': EventSource, 'sink': EventSink}
super(EventPipelineManager, self).__init__(
conf, cfg_file, transformer_manager, pipeline_types)
def setup_pipeline(conf, transformer_manager=None):
"""Setup event pipeline manager according to yaml config file."""
default = extension.ExtensionManager('ceilometer.transformer')
cfg_file = conf.event_pipeline_cfg_file
return EventPipelineManager(
conf, cfg_file, transformer_manager or default)

View File

@ -11,7 +11,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from stevedore import extension
from ceilometer import agent
from ceilometer import pipeline
LOG = log.getLogger(__name__)
@ -44,3 +46,159 @@ class SampleEndpoint(pipeline.NotificationEndpoint):
def build_sample(notification):
"""Build sample from provided notification."""
pass
class SampleSource(pipeline.PipelineSource):
"""Represents a source of samples.
In effect it is a set of notification handlers processing
samples for a set of matching meters. Each source encapsulates meter name
matching and mapping to one or more sinks for publication.
"""
def __init__(self, cfg):
super(SampleSource, self).__init__(cfg)
try:
self.meters = cfg['meters']
except KeyError:
raise pipeline.PipelineException("Missing meters value", cfg)
try:
self.check_source_filtering(self.meters, 'meters')
except agent.SourceException as err:
raise pipeline.PipelineException(err.msg, cfg)
def support_meter(self, meter_name):
return self.is_supported(self.meters, meter_name)
class SampleSink(pipeline.Sink):
def _transform_sample(self, start, sample):
try:
for transformer in self.transformers[start:]:
sample = transformer.handle_sample(sample)
if not sample:
LOG.debug(
"Pipeline %(pipeline)s: Sample dropped by "
"transformer %(trans)s", {'pipeline': self,
'trans': transformer})
return
return sample
except Exception:
LOG.error("Pipeline %(pipeline)s: Exit after error "
"from transformer %(trans)s "
"for %(smp)s" % {'pipeline': self,
'trans': transformer,
'smp': sample},
exc_info=True)
def _publish_samples(self, start, samples):
"""Push samples into pipeline for publishing.
:param start: The first transformer that the sample will be injected.
This is mainly for flush() invocation that transformer
may emit samples.
:param samples: Sample list.
"""
transformed_samples = []
if not self.transformers:
transformed_samples = samples
else:
for sample in samples:
LOG.debug(
"Pipeline %(pipeline)s: Transform sample "
"%(smp)s from %(trans)s transformer", {'pipeline': self,
'smp': sample,
'trans': start})
sample = self._transform_sample(start, sample)
if sample:
transformed_samples.append(sample)
if transformed_samples:
for p in self.publishers:
try:
p.publish_samples(transformed_samples)
except Exception:
LOG.error("Pipeline %(pipeline)s: Continue after "
"error from publisher %(pub)s"
% {'pipeline': self, 'pub': p},
exc_info=True)
def publish_samples(self, samples):
self._publish_samples(0, samples)
def flush(self):
"""Flush data after all samples have been injected to pipeline."""
for (i, transformer) in enumerate(self.transformers):
try:
self._publish_samples(i + 1,
list(transformer.flush()))
except Exception:
LOG.error("Pipeline %(pipeline)s: Error "
"flushing transformer %(trans)s"
% {'pipeline': self, 'trans': transformer},
exc_info=True)
class SamplePipeline(pipeline.Pipeline):
"""Represents a pipeline for Samples."""
def support_meter(self, meter_name):
return self.source.support_meter(meter_name)
def _validate_volume(self, s):
volume = s.volume
if volume is None:
LOG.warning(
'metering data %(counter_name)s for %(resource_id)s '
'@ %(timestamp)s has no volume (volume: None), the sample will'
' be dropped'
% {'counter_name': s.name,
'resource_id': s.resource_id,
'timestamp': s.timestamp if s.timestamp else 'NO TIMESTAMP'}
)
return False
if not isinstance(volume, (int, float)):
try:
volume = float(volume)
except ValueError:
LOG.warning(
'metering data %(counter_name)s for %(resource_id)s '
'@ %(timestamp)s has volume which is not a number '
'(volume: %(counter_volume)s), the sample will be dropped'
% {'counter_name': s.name,
'resource_id': s.resource_id,
'timestamp': (
s.timestamp if s.timestamp else 'NO TIMESTAMP'),
'counter_volume': volume}
)
return False
return True
def publish_data(self, samples):
if not isinstance(samples, list):
samples = [samples]
supported = [s for s in samples if self.source.support_meter(s.name)
and self._validate_volume(s)]
self.sink.publish_samples(supported)
class SamplePipelineManager(pipeline.PipelineManager):
def __init__(self, conf, cfg_file, transformer_manager):
# FIXME(gordc): improve how we set pipeline specific models
pipeline_types = {'name': 'sample', 'pipeline': SamplePipeline,
'source': SampleSource, 'sink': SampleSink}
super(SamplePipelineManager, self).__init__(
conf, cfg_file, transformer_manager, pipeline_types)
def setup_pipeline(conf, transformer_manager=None):
"""Setup pipeline manager according to yaml config file."""
default = extension.ExtensionManager('ceilometer.transformer')
cfg_file = conf.pipeline_cfg_file
return SamplePipelineManager(
conf, cfg_file, transformer_manager or default)

View File

@ -16,8 +16,6 @@
import time
import mock
from ceilometer.compute.pollsters import instance_stats
from ceilometer.compute.virt import inspector as virt_inspector
from ceilometer.polling import manager
@ -26,7 +24,6 @@ from ceilometer.tests.unit.compute.pollsters import base
class TestCPUPollster(base.TestPollsterBase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._mock_inspect_instance(
virt_inspector.InstanceStats(cpu_time=1 * (10 ** 6), cpu_number=2),
@ -55,7 +52,6 @@ class TestCPUPollster(base.TestPollsterBase):
# the following apply to all instance resource pollsters but are tested
# here alone.
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_metadata(self):
mgr = manager.AgentManager(0, self.CONF)
pollster = instance_stats.CPUPollster(self.CONF)
@ -70,7 +66,6 @@ class TestCPUPollster(base.TestPollsterBase):
self.assertEqual('active', samples[0].resource_metadata['state'])
self.assertIsNone(samples[0].resource_metadata['task_state'])
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_reserved_metadata_with_keys(self):
self.CONF.set_override('reserved_metadata_keys', ['fqdn'])
@ -81,7 +76,6 @@ class TestCPUPollster(base.TestPollsterBase):
'stack': '2cadc4b4-8789-123c-b4eg-edd2f0a9c128'},
samples[0].resource_metadata['user_metadata'])
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_reserved_metadata_with_namespace(self):
mgr = manager.AgentManager(0, self.CONF)
pollster = instance_stats.CPUPollster(self.CONF)
@ -95,7 +89,6 @@ class TestCPUPollster(base.TestPollsterBase):
samples = list(pollster.get_samples(mgr, {}, [self.instance]))
self.assertNotIn('user_metadata', samples[0].resource_metadata)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_flavor_name_as_metadata_instance_type(self):
mgr = manager.AgentManager(0, self.CONF)
pollster = instance_stats.CPUPollster(self.CONF)
@ -107,7 +100,6 @@ class TestCPUPollster(base.TestPollsterBase):
class TestCPUUtilPollster(base.TestPollsterBase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._mock_inspect_instance(
virt_inspector.InstanceStats(cpu_util=40),
@ -131,7 +123,6 @@ class TestCPUUtilPollster(base.TestPollsterBase):
class TestCPUL3CachePollster(base.TestPollsterBase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._mock_inspect_instance(
virt_inspector.InstanceStats(cpu_l3_cache_usage=90112),

View File

@ -45,7 +45,6 @@ class TestBaseDiskIO(base.TestPollsterBase):
instances.append(instance)
return instances
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def _check_get_samples(self, factory, name, expected_count=2):
pollster = factory(self.CONF)

View File

@ -16,7 +16,6 @@
"""Tests for the compute pollsters.
"""
import mock
from oslotest import base
import six
@ -43,7 +42,6 @@ class FauxInstance(object):
class TestLocationMetadata(base.BaseTestCase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self):
self.CONF = service.prepare_service([], [])
self.manager = manager.AgentManager(0, self.CONF)

View File

@ -24,7 +24,6 @@ from ceilometer.tests.unit.compute.pollsters import base
class TestMemoryPollster(base.TestPollsterBase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._mock_inspect_instance(
virt_inspector.InstanceStats(memory_usage=1.0),
@ -54,7 +53,6 @@ class TestMemoryPollster(base.TestPollsterBase):
_verify_memory_metering(0, 0, 1)
_verify_memory_metering(0, 0, 0)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples_with_empty_stats(self):
self._mock_inspect_instance(virt_inspector.NoDataException())
@ -70,7 +68,6 @@ class TestMemoryPollster(base.TestPollsterBase):
class TestResidentMemoryPollster(base.TestPollsterBase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._mock_inspect_instance(
virt_inspector.InstanceStats(memory_resident=1.0),
@ -105,7 +102,6 @@ class TestResidentMemoryPollster(base.TestPollsterBase):
class TestMemorySwapPollster(base.TestPollsterBase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._mock_inspect_instance(
virt_inspector.InstanceStats(memory_swap_in=1.0,
@ -137,7 +133,6 @@ class TestMemorySwapPollster(base.TestPollsterBase):
_check_memory_swap_in(1.0)
_check_memory_swap_out(4.0)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples_with_empty_stats(self):
self._mock_inspect_instance(virt_inspector.NoDataException())
mgr = manager.AgentManager(0, self.CONF)
@ -151,7 +146,6 @@ class TestMemorySwapPollster(base.TestPollsterBase):
class TestMemoryBandwidthPollster(base.TestPollsterBase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._mock_inspect_instance(
virt_inspector.InstanceStats(memory_bandwidth_total=1892352,
@ -183,7 +177,6 @@ class TestMemoryBandwidthPollster(base.TestPollsterBase):
_check_memory_bandwidth_total(1892352)
_check_memory_bandwidth_local(90112)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples_with_empty_stats(self):
self._mock_inspect_instance(virt_inspector.NoDataException())
mgr = manager.AgentManager(0, self.CONF)

View File

@ -117,7 +117,6 @@ class TestNetPollster(base.TestPollsterBase):
self.faux_instance = FauxInstance(**self.INSTANCE_PROPERTIES)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def _check_get_samples(self, factory, expected):
mgr = manager.AgentManager(0, self.CONF)
pollster = factory(self.CONF)
@ -226,7 +225,6 @@ class TestNetPollster(base.TestPollsterBase):
],
)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_metadata(self):
factory = net.OutgoingBytesPollster
pollster = factory(self.CONF)
@ -284,7 +282,6 @@ class TestNetRatesPollster(base.TestPollsterBase):
]
self.inspector.inspect_vnic_rates = mock.Mock(return_value=vnics)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def _check_get_samples(self, factory, expected):
mgr = manager.AgentManager(0, self.CONF)
pollster = factory(self.CONF)

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from ceilometer.compute.pollsters import instance_stats
from ceilometer.compute.virt import inspector as virt_inspector
from ceilometer.polling import manager
@ -23,7 +21,6 @@ from ceilometer.tests.unit.compute.pollsters import base
class TestPerfPollster(base.TestPollsterBase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._mock_inspect_instance(
virt_inspector.InstanceStats(cpu_cycles=7259361,
@ -76,7 +73,6 @@ class TestPerfPollster(base.TestPollsterBase):
_check_perf_events_cache_references(74184)
_check_perf_events_cache_misses(16737)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples_with_empty_stats(self):
self._mock_inspect_instance(virt_inspector.NoDataException())
mgr = manager.AgentManager(0, self.CONF)

View File

@ -21,8 +21,7 @@ from oslo_utils import fileutils
import six
import yaml
from ceilometer import pipeline
from ceilometer.pipeline import event as event_endpoint
from ceilometer.pipeline import event as event_pipe
from ceilometer import publisher
from ceilometer.publisher import test
from ceilometer import service
@ -109,12 +108,12 @@ class TestEventEndpoint(tests_base.BaseTestCase):
self.CONF.set_override('event_pipeline_cfg_file',
ev_pipeline_cfg_file)
ev_pipeline_mgr = pipeline.setup_event_pipeline(self.CONF)
ev_pipeline_mgr = event_pipe.setup_pipeline(self.CONF)
return ev_pipeline_mgr
def _setup_endpoint(self, publishers):
ev_pipeline_mgr = self._setup_pipeline(publishers)
self.endpoint = event_endpoint.EventEndpoint(ev_pipeline_mgr)
self.endpoint = event_pipe.EventEndpoint(ev_pipeline_mgr)
self.endpoint.event_converter = mock.MagicMock()
self.endpoint.event_converter.to_event.return_value = mock.MagicMock(
@ -165,7 +164,7 @@ class TestEventEndpoint(tests_base.BaseTestCase):
'metadata': {},
'ctxt': {}
}
with mock.patch("ceilometer.pipeline.LOG") as mock_logger:
with mock.patch("ceilometer.pipeline.event.LOG") as mock_logger:
ret = self.endpoint.process_notifications('info', [message])
self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)
exception_mock = mock_logger.error
@ -185,7 +184,7 @@ class TestEventEndpoint(tests_base.BaseTestCase):
'metadata': {},
'ctxt': {}
}
with mock.patch("ceilometer.pipeline.LOG") as mock_logger:
with mock.patch("ceilometer.pipeline.event.LOG") as mock_logger:
ret = self.endpoint.process_notifications('info', [message])
self.assertEqual(oslo_messaging.NotificationResult.HANDLED, ret)
exception_mock = mock_logger.error

View File

@ -13,8 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from ceilometer.image import glance
from ceilometer.polling import manager
from ceilometer import service
@ -84,7 +82,6 @@ IMAGE_LIST = [
class TestImagePollsterPageSize(base.BaseTestCase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self):
super(TestImagePollsterPageSize, self).setUp()
conf = service.prepare_service([], [])

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from ceilometer.ipmi.pollsters import node
from ceilometer.tests.unit.ipmi.pollsters import base
@ -27,7 +25,6 @@ class TestPowerPollster(base.TestPollsterBase):
def make_pollster(self):
return node.PowerPollster(self.CONF)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._test_get_samples()
@ -44,7 +41,6 @@ class TestInletTemperaturePollster(base.TestPollsterBase):
def make_pollster(self):
return node.InletTemperaturePollster(self.CONF)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._test_get_samples()
@ -61,7 +57,6 @@ class TestOutletTemperaturePollster(base.TestPollsterBase):
def make_pollster(self):
return node.OutletTemperaturePollster(self.CONF)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._test_get_samples()
@ -78,7 +73,6 @@ class TestAirflowPollster(base.TestPollsterBase):
def make_pollster(self):
return node.AirflowPollster(self.CONF)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._test_get_samples()
@ -95,7 +89,6 @@ class TestCUPSIndexPollster(base.TestPollsterBase):
def make_pollster(self):
return node.CUPSIndexPollster(self.CONF)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._test_get_samples()
@ -113,7 +106,6 @@ class CPUUtilPollster(base.TestPollsterBase):
def make_pollster(self):
return node.CPUUtilPollster(self.CONF)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._test_get_samples()
@ -131,7 +123,6 @@ class MemUtilPollster(base.TestPollsterBase):
def make_pollster(self):
return node.MemUtilPollster(self.CONF)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._test_get_samples()
@ -149,7 +140,6 @@ class IOUtilPollster(base.TestPollsterBase):
def make_pollster(self):
return node.IOUtilPollster(self.CONF)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._test_get_samples()

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from ceilometer.ipmi.pollsters import sensor
from ceilometer.tests.unit.ipmi.notifications import ipmi_test_data
from ceilometer.tests.unit.ipmi.pollsters import base
@ -47,7 +45,6 @@ class TestTemperatureSensorPollster(base.TestPollsterBase):
def make_pollster(self):
return sensor.TemperatureSensorPollster(self.CONF)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._test_get_samples()
@ -62,7 +59,6 @@ class TestMissingSensorData(base.TestPollsterBase):
def make_pollster(self):
return sensor.TemperatureSensorPollster(self.CONF)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._test_get_samples()
self._verify_metering(0)
@ -76,7 +72,6 @@ class TestMalformedSensorData(base.TestPollsterBase):
def make_pollster(self):
return sensor.TemperatureSensorPollster(self.CONF)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._test_get_samples()
self._verify_metering(0)
@ -90,7 +85,6 @@ class TestMissingSensorId(base.TestPollsterBase):
def make_pollster(self):
return sensor.TemperatureSensorPollster(self.CONF)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._test_get_samples()
self._verify_metering(0)
@ -104,7 +98,6 @@ class TestFanSensorPollster(base.TestPollsterBase):
def make_pollster(self):
return sensor.FanSensorPollster(self.CONF)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._test_get_samples()
@ -119,7 +112,6 @@ class TestCurrentSensorPollster(base.TestPollsterBase):
def make_pollster(self):
return sensor.CurrentSensorPollster(self.CONF)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._test_get_samples()
@ -134,7 +126,6 @@ class TestVoltageSensorPollster(base.TestPollsterBase):
def make_pollster(self):
return sensor.VoltageSensorPollster(self.CONF)
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_get_samples(self):
self._test_get_samples()

View File

@ -26,7 +26,6 @@ from ceilometer import service
class _BaseTestFWPollster(base.BaseTestCase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self):
super(_BaseTestFWPollster, self).setUp()
self.addCleanup(mock.patch.stopall)

View File

@ -26,7 +26,6 @@ from ceilometer import service
class _BaseTestLBPollster(base.BaseTestCase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self):
super(_BaseTestLBPollster, self).setUp()
self.addCleanup(mock.patch.stopall)
@ -470,7 +469,6 @@ class TestLBStatsPollster(_BaseTestLBPollster):
}
}
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def _check_get_samples(self, factory, sample_name, expected_volume,
expected_type):
pollster = factory(self.CONF)

View File

@ -25,7 +25,6 @@ from ceilometer import service
class _BaseTestLBPollster(base.BaseTestCase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self):
super(_BaseTestLBPollster, self).setUp()
self.addCleanup(mock.patch.stopall)
@ -264,7 +263,6 @@ class TestLBStatsPollster(_BaseTestLBPollster):
'bytes_out': 3,
'total_connections': 4}
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def _check_get_samples(self, factory, sample_name, expected_volume,
expected_type):
pollster = factory(self.CONF)

View File

@ -26,7 +26,6 @@ from ceilometer import service
class _BaseTestVPNPollster(base.BaseTestCase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self):
super(_BaseTestVPNPollster, self).setUp()
self.addCleanup(mock.patch.stopall)

View File

@ -27,7 +27,6 @@ from ceilometer import service
class _BaseTestFloatingIPPollster(base.BaseTestCase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self):
super(_BaseTestFloatingIPPollster, self).setUp()
self.CONF = service.prepare_service([], [])

View File

@ -85,7 +85,6 @@ class TestRgwPollster(testscenarios.testcase.WithScenarios,
if i[0] in tenant_ids:
yield i
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self):
super(TestRgwPollster, self).setUp()
conf = service.prepare_service([], [])

View File

@ -102,7 +102,6 @@ class TestSwiftPollster(testscenarios.testcase.WithScenarios,
if i[0] in tenant_ids:
yield i
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self):
super(TestSwiftPollster, self).setUp()
self.CONF = service.prepare_service([], [])

View File

@ -27,7 +27,8 @@ from oslo_utils import timeutils
import six
from stevedore import extension
from ceilometer import pipeline
from ceilometer import pipeline as pipe_base
from ceilometer.pipeline import sample as pipeline
from ceilometer import publisher
from ceilometer.publisher import test as test_publisher
from ceilometer import sample
@ -194,8 +195,8 @@ class BasePipelineTestCase(base.BaseTestCase):
"""Clear an existing field in the pipeline config."""
def _exception_create_pipelinemanager(self):
self.assertRaises(pipeline.PipelineException,
pipeline.PipelineManager,
self.assertRaises(pipe_base.PipelineException,
pipeline.SamplePipelineManager,
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager)
@ -206,9 +207,9 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_no_transformers(self):
self._unset_pipeline_cfg('transformers')
pipeline.PipelineManager(self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager)
pipeline.SamplePipelineManager(self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager)
def test_no_name(self):
self._unset_pipeline_cfg('name')
@ -246,7 +247,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._exception_create_pipelinemanager()
def test_publisher_transformer_invoked(self):
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -262,7 +263,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_multiple_included_counters(self):
counter_cfg = ['a', 'b']
self._set_pipeline_cfg('meters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
@ -292,10 +293,10 @@ class BasePipelineTestCase(base.BaseTestCase):
self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
self.assertEqual('b_update', getattr(publisher.samples[1], "name"))
@mock.patch('ceilometer.pipeline.LOG')
@mock.patch('ceilometer.pipeline.sample.LOG')
def test_none_volume_counter(self, LOG):
self._set_pipeline_cfg('meters', ['empty_volume'])
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -326,10 +327,10 @@ class BasePipelineTestCase(base.BaseTestCase):
self.assertEqual(0, len(publisher.samples))
@mock.patch('ceilometer.pipeline.LOG')
@mock.patch('ceilometer.pipeline.sample.LOG')
def test_fake_volume_counter(self, LOG):
self._set_pipeline_cfg('meters', ['fake_volume'])
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -363,7 +364,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_counter_dont_match(self):
counter_cfg = ['nomatch']
self._set_pipeline_cfg('meters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -376,7 +377,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_wildcard_counter(self):
counter_cfg = ['*']
self._set_pipeline_cfg('meters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -390,7 +391,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_wildcard_excluded_counters(self):
counter_cfg = ['*', '!a']
self._set_pipeline_cfg('meters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].support_meter('a'))
@ -398,7 +399,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_wildcard_excluded_counters_not_excluded(self):
counter_cfg = ['*', '!b']
self._set_pipeline_cfg('meters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -411,7 +412,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_all_excluded_counters_not_excluded(self):
counter_cfg = ['!b', '!c']
self._set_pipeline_cfg('meters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -427,7 +428,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_all_excluded_counters_is_excluded(self):
counter_cfg = ['!a', '!c']
self._set_pipeline_cfg('meters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].support_meter('a'))
@ -437,7 +438,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_wildcard_and_excluded_wildcard_counters(self):
counter_cfg = ['*', '!disk.*']
self._set_pipeline_cfg('meters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].
@ -447,7 +448,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_included_counter_and_wildcard_counters(self):
counter_cfg = ['cpu', 'disk.*']
self._set_pipeline_cfg('meters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertTrue(pipeline_manager.pipelines[0].
@ -459,7 +460,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_excluded_counter_and_excluded_wildcard_counters(self):
counter_cfg = ['!cpu', '!disk.*']
self._set_pipeline_cfg('meters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].
@ -471,7 +472,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_multiple_pipeline(self):
self._augment_pipeline_cfg()
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -509,7 +510,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_multiple_pipeline_exception(self):
self._reraise_exception = False
self._break_pipeline_cfg()
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
@ -543,7 +544,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_none_transformer_pipeline(self):
self._set_pipeline_cfg('transformers', None)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -555,7 +556,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_empty_transformer_pipeline(self):
self._set_pipeline_cfg('transformers', [])
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -577,7 +578,7 @@ class BasePipelineTestCase(base.BaseTestCase):
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -612,7 +613,7 @@ class BasePipelineTestCase(base.BaseTestCase):
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -651,7 +652,7 @@ class BasePipelineTestCase(base.BaseTestCase):
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -669,7 +670,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_multiple_publisher(self):
self._set_pipeline_cfg('publishers', ['test://', 'new://'])
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -687,7 +688,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_multiple_publisher_isolation(self):
self._reraise_exception = False
self._set_pipeline_cfg('publishers', ['except://', 'new://'])
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -700,7 +701,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_multiple_counter_pipeline(self):
self._set_pipeline_cfg('meters', ['a', 'b'])
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -740,7 +741,7 @@ class BasePipelineTestCase(base.BaseTestCase):
},
]
self._extend_pipeline_cfg('transformers', extra_transformer_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -778,7 +779,7 @@ class BasePipelineTestCase(base.BaseTestCase):
]
self._extend_pipeline_cfg('transformers', extra_transformer_cfg)
self._set_pipeline_cfg('meters', ['a', 'b'])
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -813,7 +814,7 @@ class BasePipelineTestCase(base.BaseTestCase):
'parameters': {}
}]
self._extend_pipeline_cfg('transformers', extra_transformer_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -854,7 +855,7 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={}
),
]
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -911,7 +912,7 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={}
),
]
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager)
@ -1002,7 +1003,7 @@ class BasePipelineTestCase(base.BaseTestCase):
'user_metadata': um},
),
]
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1087,7 +1088,7 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={'cpu_number': 4}
),
]
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1114,7 +1115,7 @@ class BasePipelineTestCase(base.BaseTestCase):
]
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('meters', ['cpu'])
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1176,7 +1177,7 @@ class BasePipelineTestCase(base.BaseTestCase):
]
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('meters', ['cpu'])
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1234,7 +1235,7 @@ class BasePipelineTestCase(base.BaseTestCase):
]
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('meters', ['cpu'])
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1356,7 +1357,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('meters', ['disk.read.bytes',
'disk.write.requests'])
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1441,7 +1442,7 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={'version': '3.0'}
),
]
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager)
@ -1478,7 +1479,7 @@ class BasePipelineTestCase(base.BaseTestCase):
timestamp=timeutils.utcnow().isoformat(),
resource_metadata={'version': '1.0'}
))
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1674,7 +1675,7 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={'version': '2.0'}
)
]
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1714,7 +1715,7 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={'version': '1.0'}
),
]
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1762,7 +1763,7 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={'version': '2.0'}
)
]
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1909,7 +1910,7 @@ class BasePipelineTestCase(base.BaseTestCase):
timestamp=timeutils.utcnow().isoformat(),
resource_metadata=s.get('metadata')
))
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -2029,7 +2030,7 @@ class BasePipelineTestCase(base.BaseTestCase):
timestamp=timeutils.utcnow().isoformat(),
resource_metadata=None
)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -2062,7 +2063,7 @@ class BasePipelineTestCase(base.BaseTestCase):
]
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('meters', ['unrelated-sample'])
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
mock_utcnow.return_value = now + datetime.timedelta(seconds=200)
@ -2083,7 +2084,7 @@ class BasePipelineTestCase(base.BaseTestCase):
]
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('meters', ['cpu'])
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -2254,12 +2255,11 @@ class BasePipelineTestCase(base.BaseTestCase):
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertEqual(set(['resource_id', 'counter_name']),
set(pipeline.get_pipeline_grouping_key(
pipeline_manager.pipelines[0])))
set(pipeline_manager.pipelines[0].get_grouping_key()))
def test_get_pipeline_duplicate_grouping_key(self):
transformer_cfg = [
@ -2273,9 +2273,8 @@ class BasePipelineTestCase(base.BaseTestCase):
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertEqual(['counter_name'],
pipeline.get_pipeline_grouping_key(
pipeline_manager.pipelines[0]))
pipeline_manager.pipelines[0].get_grouping_key())

View File

@ -15,7 +15,8 @@
import yaml
from ceilometer import pipeline
from ceilometer import pipeline as pipe_base
from ceilometer.pipeline import sample as pipeline
from ceilometer import sample
from ceilometer.tests.unit import pipeline_base
@ -122,7 +123,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
'publishers': ['new'],
})
self.pipeline_cfg['sources'][0]['sinks'].append('second_sink')
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -163,7 +164,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
'meters': ['b'],
'sinks': ['test_sink']
})
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -209,7 +210,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
pipeline_cfg = yaml.safe_load(data)
for s in pipeline_cfg['sinks']:
s['publishers'] = ['test://']
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = pipeline.SamplePipelineManager(
self.CONF,
self.cfg2file(pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[index]
@ -262,8 +263,8 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
'name': 'test_sink',
'publishers': ['except'],
})
self.assertRaises(pipeline.PipelineException,
pipeline.PipelineManager,
self.assertRaises(pipe_base.PipelineException,
pipeline.SamplePipelineManager,
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager)
@ -274,8 +275,8 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
'meters': ['a'],
'sinks': ['test_sink']
})
self.assertRaises(pipeline.PipelineException,
pipeline.PipelineManager,
self.assertRaises(pipe_base.PipelineException,
pipeline.SamplePipelineManager,
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager)

View File

@ -21,6 +21,7 @@ import oslo_messaging
from ceilometer.event import models
from ceilometer import pipeline
from ceilometer.pipeline import event
from ceilometer import publisher
from ceilometer.publisher import test as test_publisher
from ceilometer.publisher import utils
@ -47,7 +48,6 @@ class EventPipelineTestCase(base.BaseTestCase):
super(EventPipelineTestCase, self).setUp()
self.CONF = service.prepare_service([], [])
self.p_type = pipeline.EVENT_TYPE
self.transformer_manager = None
self.test_event = models.Event(
@ -151,11 +151,10 @@ class EventPipelineTestCase(base.BaseTestCase):
def _exception_create_pipelinemanager(self):
self.assertRaises(pipeline.PipelineException,
pipeline.PipelineManager,
event.EventPipelineManager,
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager,
self.p_type)
self.transformer_manager)
def test_no_events(self):
self._unset_pipeline_cfg('events')
@ -166,11 +165,10 @@ class EventPipelineTestCase(base.BaseTestCase):
self._exception_create_pipelinemanager()
def test_name(self):
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = event.EventPipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager,
self.p_type)
self.transformer_manager)
for pipe in pipeline_manager.pipelines:
self.assertTrue(pipe.name.startswith('event:'))
@ -200,11 +198,10 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_multiple_included_events(self):
event_cfg = ['a', 'b']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = event.EventPipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager,
self.p_type)
self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_event])
@ -222,11 +219,10 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_event_non_match(self):
event_cfg = ['nomatch']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = event.EventPipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager,
self.p_type)
self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_event])
@ -237,11 +233,10 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_wildcard_event(self):
event_cfg = ['*']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = event.EventPipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager,
self.p_type)
self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_event])
@ -252,21 +247,19 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_wildcard_excluded_events(self):
event_cfg = ['*', '!a']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = event.EventPipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager,
self.p_type)
self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].support_event('a'))
def test_wildcard_excluded_events_not_excluded(self):
event_cfg = ['*', '!b']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = event.EventPipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager,
self.p_type)
self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_event])
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -276,11 +269,10 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_all_excluded_events_not_excluded(self):
event_cfg = ['!b', '!c']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = event.EventPipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager,
self.p_type)
self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_event])
@ -291,11 +283,10 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_all_excluded_events_excluded(self):
event_cfg = ['!a', '!c']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = event.EventPipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager,
self.p_type)
self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].support_event('a'))
self.assertTrue(pipeline_manager.pipelines[0].support_event('b'))
self.assertFalse(pipeline_manager.pipelines[0].support_event('c'))
@ -303,11 +294,10 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_wildcard_and_excluded_wildcard_events(self):
event_cfg = ['*', '!compute.*']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = event.EventPipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager,
self.p_type)
self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].
support_event('compute.instance.create.start'))
self.assertTrue(pipeline_manager.pipelines[0].
@ -316,11 +306,10 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_included_event_and_wildcard_events(self):
event_cfg = ['compute.instance.create.start', 'identity.*']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = event.EventPipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager,
self.p_type)
self.transformer_manager)
self.assertTrue(pipeline_manager.pipelines[0].
support_event('identity.user.create'))
self.assertTrue(pipeline_manager.pipelines[0].
@ -331,11 +320,10 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_excluded_event_and_excluded_wildcard_events(self):
event_cfg = ['!compute.instance.create.start', '!identity.*']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = event.EventPipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager,
self.p_type)
self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].
support_event('identity.user.create'))
self.assertFalse(pipeline_manager.pipelines[0].
@ -346,11 +334,10 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_multiple_pipeline(self):
self._augment_pipeline_cfg()
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = event.EventPipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager,
self.p_type)
self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_event, self.test_event2])
@ -365,11 +352,10 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_multiple_publisher(self):
self._set_pipeline_cfg('publishers', ['test://', 'new://'])
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = event.EventPipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager,
self.p_type)
self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_event])
@ -384,11 +370,10 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_multiple_publisher_isolation(self):
self._reraise_exception = False
self._set_pipeline_cfg('publishers', ['except://', 'new://'])
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = event.EventPipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager,
self.p_type)
self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_event])
@ -426,11 +411,10 @@ class EventPipelineTestCase(base.BaseTestCase):
'ceilometer.publisher.test.TestPublisher',
return_value=fake_publisher))
pipeline_manager = pipeline.PipelineManager(
pipeline_manager = event.EventPipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager,
self.p_type)
self.transformer_manager)
event_pipeline_endpoint = pipeline.EventPipelineEndpoint(
pipeline_manager.pipelines[0])

View File

@ -11,8 +11,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from ceilometer.polling import manager
from ceilometer import service
import ceilometer.tests.base as base
@ -104,7 +102,6 @@ BACKUP_LIST = [
class TestVolumeSizePollster(base.BaseTestCase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self):
super(TestVolumeSizePollster, self).setUp()
conf = service.prepare_service([], [])
@ -124,7 +121,6 @@ class TestVolumeSizePollster(base.BaseTestCase):
class TestVolumeSnapshotSizePollster(base.BaseTestCase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self):
super(TestVolumeSnapshotSizePollster, self).setUp()
conf = service.prepare_service([], [])
@ -146,7 +142,6 @@ class TestVolumeSnapshotSizePollster(base.BaseTestCase):
class TestVolumeBackupSizePollster(base.BaseTestCase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self):
super(TestVolumeBackupSizePollster, self).setUp()
conf = service.prepare_service([], [])