drop deprecated pipeline

The old pipeline format was deprecated as of Icehouse. This patch
switches all tests to the current pipeline format and drops support
for the old format.

Change-Id: Ide53c1c5beab4a586324c4727dba3a9e200f8082
gordon chung 2015-07-02 21:39:39 -04:00
parent 54f33311ca
commit d2ae9d6dc6
7 changed files with 199 additions and 445 deletions
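
For orientation before reading the diff: the deprecated layout was a flat list of pipeline dictionaries, while the current layout splits sources from sinks and links them by name. A minimal sketch of the two shapes (meter names and publishers are illustrative, following the examples in the files changed below):

    # Deprecated (Icehouse-era) shape: a flat list, each entry conflating
    # source settings (what to poll, how often) with sink settings
    # (transformers, publishers).
    deprecated_cfg = [{
        'name': 'cpu_pipeline',
        'interval': 600,
        'counters': ['cpu'],   # the tests below rename this key to 'meters'
        'transformers': [],
        'publishers': ['rpc://'],
    }]

    # Current (decoupled) shape: sources reference sinks by name, so one
    # sink definition can be shared by many sources.
    decoupled_cfg = {
        'sources': [{
            'name': 'cpu_source',
            'interval': 600,
            'meters': ['cpu'],
            'sinks': ['cpu_sink'],
        }],
        'sinks': [{
            'name': 'cpu_sink',
            'transformers': [],
            'publishers': ['rpc://'],
        }],
    }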


@@ -578,74 +578,44 @@ class PipelineManager(object):
     def __init__(self, cfg, transformer_manager, p_type=SAMPLE_TYPE):
         """Setup the pipelines according to config.

-        The configuration is supported in one of two forms:
-
-        1. Deprecated: the source and sink configuration are conflated
-           as a list of consolidated pipelines.
-
-           The pipelines are defined as a list of dictionaries each
-           specifying the target samples, the transformers involved,
-           and the target publishers, for example:
-
-           [{"name": pipeline_1,
-             "interval": interval_time,
-             "meters" : ["meter_1", "meter_2"],
-             "resources": ["resource_uri1", "resource_uri2"],
-             "transformers": [
-                 {"name": "Transformer_1",
-                  "parameters": {"p1": "value"}},
-                 {"name": "Transformer_2",
-                  "parameters": {"p1": "value"}},
-             ],
-             "publishers": ["publisher_1", "publisher_2"]
-            },
-            {"name": pipeline_2,
-             "interval": interval_time,
-             "meters" : ["meter_3"],
-             "publishers": ["publisher_3"]
-            },
-           ]
-
-        2. Decoupled: the source and sink configuration are separately
+        The configuration is supported as follows:
+
+        Decoupled: the source and sink configuration are separately
         specified before being linked together. This allows source-
         specific configuration, such as resource discovery, to be
         kept focused only on the fine-grained source while avoiding
         the necessity for wide duplication of sink-related config.

         The configuration is provided in the form of separate lists
         of dictionaries defining sources and sinks, for example:

         {"sources": [{"name": source_1,
                       "interval": interval_time,
                       "meters" : ["meter_1", "meter_2"],
                       "resources": ["resource_uri1", "resource_uri2"],
                       "sinks" : ["sink_1", "sink_2"]
                      },
                      {"name": source_2,
                       "interval": interval_time,
                       "meters" : ["meter_3"],
                       "sinks" : ["sink_2"]
                      },
                     ],
          "sinks": [{"name": sink_1,
                     "transformers": [
                         {"name": "Transformer_1",
                          "parameters": {"p1": "value"}},
                         {"name": "Transformer_2",
                          "parameters": {"p1": "value"}},
                     ],
                     "publishers": ["publisher_1", "publisher_2"]
                    },
                    {"name": sink_2,
                     "publishers": ["publisher_3"]
                    },
                   ]
         }
-
-        The semantics of the common individual configuration elements
-        are identical in the deprecated and decoupled version.

         The interval determines the cadence of sample injection into
         the pipeline where samples are produced under the direct control
@@ -674,60 +644,47 @@ class PipelineManager(object):
         """
         self.pipelines = []
-        if 'sources' in cfg or 'sinks' in cfg:
-            if not ('sources' in cfg and 'sinks' in cfg):
-                raise PipelineException("Both sources & sinks are required",
-                                        cfg)
-            LOG.info(_('detected decoupled pipeline config format'))
-            unique_names = set()
-            sources = []
-            for s in cfg.get('sources', []):
-                name = s.get('name')
-                if name in unique_names:
-                    raise PipelineException("Duplicated source names: %s" %
-                                            name, self)
-                else:
-                    unique_names.add(name)
-                    sources.append(p_type['source'](s))
-            unique_names.clear()
-
-            sinks = {}
-            for s in cfg.get('sinks', []):
-                name = s.get('name')
-                if name in unique_names:
-                    raise PipelineException("Duplicated sink names: %s" %
-                                            name, self)
-                else:
-                    unique_names.add(name)
-                    sinks[s['name']] = p_type['sink'](s, transformer_manager)
-            unique_names.clear()
-
-            for source in sources:
-                source.check_sinks(sinks)
-                for target in source.sinks:
-                    pipe = p_type['pipeline'](source, sinks[target])
-                    if pipe.name in unique_names:
-                        raise PipelineException(
-                            "Duplicate pipeline name: %s. Ensure pipeline"
-                            " names are unique. (name is the source and sink"
-                            " names combined)" % pipe.name, cfg)
-                    else:
-                        unique_names.add(pipe.name)
-                        self.pipelines.append(pipe)
-            unique_names.clear()
-        else:
-            LOG.warning(_('detected deprecated pipeline config format'))
-            for pipedef in cfg:
-                source = p_type['source'](pipedef)
-                sink = p_type['sink'](pipedef, transformer_manager)
-                pipe = p_type['pipeline'](source, sink)
-                if pipe.name in [p.name for p in self.pipelines]:
-                    raise PipelineException(
-                        "Duplicate pipeline name: %s. Ensure pipeline"
-                        " names are unique" % pipe.name, cfg)
-                else:
-                    self.pipelines.append(pipe)
+        if not ('sources' in cfg and 'sinks' in cfg):
+            raise PipelineException("Both sources & sinks are required",
+                                    cfg)
+        LOG.info(_('detected decoupled pipeline config format'))
+
+        unique_names = set()
+        sources = []
+        for s in cfg.get('sources', []):
+            name = s.get('name')
+            if name in unique_names:
+                raise PipelineException("Duplicated source names: %s" %
+                                        name, self)
+            else:
+                unique_names.add(name)
+                sources.append(p_type['source'](s))
+        unique_names.clear()
+
+        sinks = {}
+        for s in cfg.get('sinks', []):
+            name = s.get('name')
+            if name in unique_names:
+                raise PipelineException("Duplicated sink names: %s" %
+                                        name, self)
+            else:
+                unique_names.add(name)
+                sinks[s['name']] = p_type['sink'](s, transformer_manager)
+        unique_names.clear()
+
+        for source in sources:
+            source.check_sinks(sinks)
+            for target in source.sinks:
+                pipe = p_type['pipeline'](source, sinks[target])
+                if pipe.name in unique_names:
+                    raise PipelineException(
+                        "Duplicate pipeline name: %s. Ensure pipeline"
+                        " names are unique. (name is the source and sink"
+                        " names combined)" % pipe.name, cfg)
+                else:
+                    unique_names.add(pipe.name)
+                    self.pipelines.append(pipe)
+        unique_names.clear()

     def publisher(self, context):
         """Build a new Publisher for these manager pipelines.


@@ -232,14 +232,18 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         p_coord = self.mgr.partition_coordinator
         p_coord.extract_my_subset.side_effect = fake_subset
         self.mgr.tg = mock.MagicMock()
-        self.pipeline_cfg = [{
-            'name': "test_pipeline",
-            'interval': 60,
-            'counters': ['test'],
-            'resources': ['test://'] if self.source_resources else [],
-            'transformers': [],
-            'publishers': ["test"],
-        }, ]
+        self.pipeline_cfg = {
+            'sources': [{
+                'name': 'test_pipeline',
+                'interval': 60,
+                'meters': ['test'],
+                'resources': ['test://'] if self.source_resources else [],
+                'sinks': ['test_sink']}],
+            'sinks': [{
+                'name': 'test_sink',
+                'transformers': [],
+                'publishers': ["test"]}]
+        }
         self.setup_pipeline()
         self.CONF = self.useFixture(fixture_config.Config()).conf
         self.CONF.set_override(
@@ -294,7 +298,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         self.mgr.join_partitioning_groups()
         p_coord = self.mgr.partition_coordinator
         static_group_ids = [utils.hash_of_set(p['resources'])
-                            for p in self.pipeline_cfg
+                            for p in self.pipeline_cfg['sources']
                             if p['resources']]
         expected = [mock.call(self.mgr.construct_group_id(g))
                     for g in ['another_group', 'global'] + static_group_ids]
@@ -308,7 +312,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         self.assertTrue(60 in polling_tasks.keys())
         per_task_resources = polling_tasks[60].resources
         self.assertEqual(1, len(per_task_resources))
-        self.assertEqual(set(self.pipeline_cfg[0]['resources']),
+        self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']),
                          set(per_task_resources['test_pipeline-test'].get({})))
         task = list(polling_tasks.values())[0]
         self.mgr.interval_task(task)
@@ -317,13 +321,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         self.assertEqual(self.Pollster.test_data, pub.samples[0])

     def test_setup_polling_tasks_multiple_interval(self):
-        self.pipeline_cfg.append({
-            'name': "test_pipeline_1",
+        self.pipeline_cfg['sources'].append({
+            'name': 'test_pipeline_1',
             'interval': 10,
-            'counters': ['test'],
+            'meters': ['test'],
             'resources': ['test://'] if self.source_resources else [],
-            'transformers': [],
-            'publishers': ["test"],
+            'sinks': ['test_sink']
         })
         self.setup_pipeline()
         polling_tasks = self.mgr.setup_polling_tasks()
@@ -332,27 +335,24 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         self.assertTrue(10 in polling_tasks.keys())

     def test_setup_polling_tasks_mismatch_counter(self):
-        self.pipeline_cfg.append(
-            {
-                'name': "test_pipeline_1",
-                'interval': 10,
-                'counters': ['test_invalid'],
-                'resources': ['invalid://'],
-                'transformers': [],
-                'publishers': ["test"],
-            })
+        self.pipeline_cfg['sources'].append({
+            'name': 'test_pipeline_1',
+            'interval': 10,
+            'meters': ['test_invalid'],
+            'resources': ['invalid://'],
+            'sinks': ['test_sink']
+        })
         polling_tasks = self.mgr.setup_polling_tasks()
         self.assertEqual(1, len(polling_tasks))
         self.assertTrue(60 in polling_tasks.keys())

     def test_setup_polling_task_same_interval(self):
-        self.pipeline_cfg.append({
-            'name': "test_pipeline_1",
+        self.pipeline_cfg['sources'].append({
+            'name': 'test_pipeline_1',
             'interval': 60,
-            'counters': ['testanother'],
+            'meters': ['testanother'],
             'resources': ['testanother://'] if self.source_resources else [],
-            'transformers': [],
-            'publishers': ["test"],
+            'sinks': ['test_sink']
         })
         self.setup_pipeline()
         polling_tasks = self.mgr.setup_polling_tasks()
@@ -362,31 +362,30 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         per_task_resources = polling_tasks[60].resources
         self.assertEqual(2, len(per_task_resources))
         key = 'test_pipeline-test'
-        self.assertEqual(set(self.pipeline_cfg[0]['resources']),
+        self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']),
                          set(per_task_resources[key].get({})))
         key = 'test_pipeline_1-testanother'
-        self.assertEqual(set(self.pipeline_cfg[1]['resources']),
+        self.assertEqual(set(self.pipeline_cfg['sources'][1]['resources']),
                          set(per_task_resources[key].get({})))

     def test_interval_exception_isolation(self):
-        self.pipeline_cfg = [
-            {
-                'name': "test_pipeline_1",
-                'interval': 10,
-                'counters': ['testexceptionanother'],
-                'resources': ['test://'] if self.source_resources else [],
-                'transformers': [],
-                'publishers': ["test"],
-            },
-            {
-                'name': "test_pipeline_2",
-                'interval': 10,
-                'counters': ['testexception'],
-                'resources': ['test://'] if self.source_resources else [],
-                'transformers': [],
-                'publishers': ["test"],
-            },
-        ]
+        self.pipeline_cfg = {
+            'sources': [{
+                'name': 'test_pipeline_1',
+                'interval': 10,
+                'meters': ['testexceptionanother'],
+                'resources': ['test://'] if self.source_resources else [],
+                'sinks': ['test_sink']},
+                {'name': 'test_pipeline_2',
+                 'interval': 10,
+                 'meters': ['testexception'],
+                 'resources': ['test://'] if self.source_resources else [],
+                 'sinks': ['test_sink']}],
+            'sinks': [{
+                'name': 'test_sink',
+                'transformers': [],
+                'publishers': ["test"]}]
+        }
         self.mgr.pipeline_manager = pipeline.PipelineManager(
             self.pipeline_cfg,
             self.transformer_manager)
@@ -407,12 +406,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         self.assertTrue(mgr.tg.add_timer.called)

     def test_manager_exception_persistency(self):
-        self.pipeline_cfg.append({
-            'name': "test_pipeline_1",
+        self.pipeline_cfg['sources'].append({
+            'name': 'test_pipeline_1',
             'interval': 60,
-            'counters': ['testanother'],
-            'transformers': [],
-            'publishers': ["test"],
+            'meters': ['testanother'],
+            'sinks': ['test_sink']
         })
         self.setup_pipeline()
@@ -431,10 +429,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         if static_resources:
             # just so we can test that static + pre_pipeline amalgamated
             # override per_pollster
-            self.pipeline_cfg[0]['discovery'] = ['testdiscoveryanother',
-                                                 'testdiscoverynonexistent',
-                                                 'testdiscoveryexception']
-            self.pipeline_cfg[0]['resources'] = static_resources
+            self.pipeline_cfg['sources'][0]['discovery'] = [
+                'testdiscoveryanother',
+                'testdiscoverynonexistent',
+                'testdiscoveryexception']
+            self.pipeline_cfg['sources'][0]['resources'] = static_resources
         self.setup_pipeline()
         polling_tasks = self.mgr.setup_polling_tasks()
         self.mgr.interval_task(polling_tasks.get(60))
@@ -475,8 +474,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         self.PollsterAnother.discovery = 'testdiscovery'
         self.mgr.discovery_manager = self.create_discovery_manager()
         self.Discovery.resources = discovered_resources
-        self.pipeline_cfg[0]['counters'].append('testanother')
-        self.pipeline_cfg[0]['resources'] = []
+        self.pipeline_cfg['sources'][0]['meters'].append('testanother')
+        self.pipeline_cfg['sources'][0]['resources'] = []
         self.setup_pipeline()
         polling_tasks = self.mgr.setup_polling_tasks()
         self.mgr.interval_task(polling_tasks.get(60))
@@ -491,11 +490,10 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         self.Discovery.resources = discovered_resources
         self.DiscoveryAnother.resources = [d[::-1]
                                            for d in discovered_resources]
-        self.pipeline_cfg[0]['discovery'] = ['testdiscovery',
-                                             'testdiscoveryanother',
-                                             'testdiscoverynonexistent',
-                                             'testdiscoveryexception']
-        self.pipeline_cfg[0]['resources'] = static_resources
+        self.pipeline_cfg['sources'][0]['discovery'] = [
+            'testdiscovery', 'testdiscoveryanother',
+            'testdiscoverynonexistent', 'testdiscoveryexception']
+        self.pipeline_cfg['sources'][0]['resources'] = static_resources
         self.setup_pipeline()
         polling_tasks = self.mgr.setup_polling_tasks()
         self.mgr.interval_task(polling_tasks.get(60))
@@ -528,14 +526,18 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         # assert that the individual lists of static and discovered resources
         # for each pipeline with a common interval are passed to individual
         # pollsters matching each pipeline
-        self.pipeline_cfg[0]['resources'] = ['test://']
-        self.pipeline_cfg[0]['discovery'] = ['testdiscovery']
-        self.pipeline_cfg.append({
-            'name': "another_pipeline",
+        self.pipeline_cfg['sources'][0]['resources'] = ['test://']
+        self.pipeline_cfg['sources'][0]['discovery'] = ['testdiscovery']
+        self.pipeline_cfg['sources'].append({
+            'name': 'another_pipeline',
             'interval': 60,
-            'counters': ['test'],
+            'meters': ['test'],
             'resources': ['another://'],
             'discovery': ['testdiscoveryanother'],
+            'sinks': ['test_sink_new']
+        })
+        self.pipeline_cfg['sinks'].append({
+            'name': "test_sink_new",
             'transformers': [],
             'publishers': ["new"],
         })
@@ -561,8 +563,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
                 self.fail('unexpected sample resources %s' % samples)
         all_resources = set(test_resources)
         all_resources.update(another_resources)
-        expected_pipelines = {'test://': 'test_pipeline',
-                              'another://': 'another_pipeline'}
+        expected_pipelines = {'test://': 'test_pipeline:test_sink',
+                              'another://': 'another_pipeline:test_sink_new'}
         sunk_resources = []
         for pipe_line in self.mgr.pipeline_manager.pipelines:
             self.assertEqual(1, len(pipe_line.publishers[0].samples))
@@ -582,12 +584,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         self.DiscoveryAnother.resources = ['discovered_3', 'discovered_4']
         sources = [{'name': 'test_source_1',
                     'interval': 60,
-                    'counters': ['test'],
+                    'meters': ['test'],
                     'discovery': ['testdiscovery'],
                     'sinks': ['test_sink_1']},
                    {'name': 'test_source_2',
                     'interval': 60,
-                    'counters': ['testanother'],
+                    'meters': ['testanother'],
                     'discovery': ['testdiscoveryanother'],
                     'sinks': ['test_sink_2']}]
         sinks = [{'name': 'test_sink_1',
@@ -614,7 +616,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         self.Discovery.resources = ['discovered_1', 'discovered_2']
         sources = [{'name': 'test_source_1',
                     'interval': 60,
-                    'counters': ['test'],
+                    'meters': ['test'],
                     'discovery': ['testdiscovery'],
                     'sinks': ['test_sink_1', 'test_sink_2']}]
         sinks = [{'name': 'test_sink_1',
@@ -637,11 +639,10 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
     def test_discovery_partitioning(self):
         self.mgr.discovery_manager = self.create_discovery_manager()
         p_coord = self.mgr.partition_coordinator
-        self.pipeline_cfg[0]['discovery'] = ['testdiscovery',
-                                             'testdiscoveryanother',
-                                             'testdiscoverynonexistent',
-                                             'testdiscoveryexception']
-        self.pipeline_cfg[0]['resources'] = []
+        self.pipeline_cfg['sources'][0]['discovery'] = [
+            'testdiscovery', 'testdiscoveryanother',
+            'testdiscoverynonexistent', 'testdiscoveryexception']
+        self.pipeline_cfg['sources'][0]['resources'] = []
         self.setup_pipeline()
         polling_tasks = self.mgr.setup_polling_tasks()
         self.mgr.interval_task(polling_tasks.get(60))
@@ -658,24 +659,22 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         p_coord = self.mgr.partition_coordinator
         static_resources = ['static_1', 'static_2']
         static_resources2 = ['static_3', 'static_4']
-        self.pipeline_cfg[0]['resources'] = static_resources
-        self.pipeline_cfg.append({
-            'name': "test_pipeline2",
-            'interval': 60,
-            'counters': ['test', 'test2'],
-            'resources': static_resources2,
-            'transformers': [],
-            'publishers': ["test"],
-        })
+        self.pipeline_cfg['sources'][0]['resources'] = static_resources
+        self.pipeline_cfg['sources'].append({
+            'name': 'test_pipeline2',
+            'interval': 60,
+            'meters': ['test', 'test2'],
+            'resources': static_resources2,
+            'sinks': ['test_sink']
+        })
         # have one pipeline without static resources defined
-        self.pipeline_cfg.append({
-            'name': "test_pipeline3",
-            'interval': 60,
-            'counters': ['test', 'test2'],
-            'resources': [],
-            'transformers': [],
-            'publishers': ["test"],
-        })
+        self.pipeline_cfg['sources'].append({
+            'name': 'test_pipeline3',
+            'interval': 60,
+            'meters': ['test', 'test2'],
+            'resources': [],
+            'sinks': ['test_sink']
+        })
         self.setup_pipeline()
         polling_tasks = self.mgr.setup_polling_tasks()
         self.mgr.interval_task(polling_tasks.get(60))
@@ -692,8 +691,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
             self.assertIn(c, p_coord.extract_my_subset.call_args_list)

     def test_arithmetic_transformer(self):
-        self.pipeline_cfg[0]['counters'] = ['test', 'testanother']
-        self.pipeline_cfg[0]['transformers'] = [
+        self.pipeline_cfg['sources'][0]['meters'] = ['test', 'testanother']
+        self.pipeline_cfg['sinks'][0]['transformers'] = [
             {'name': 'arithmetic',
              'parameters': {
                  'target': {'name': 'test_sum',
@@ -714,7 +713,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
     @mock.patch('ceilometer.tests.agent.agentbase.TestPollster.get_samples')
     def test_skip_polling_and_publish_with_no_resources(
             self, get_samples, LOG):
-        self.pipeline_cfg[0]['resources'] = []
+        self.pipeline_cfg['sources'][0]['resources'] = []
         self.setup_pipeline()
         polling_task = list(self.mgr.setup_polling_tasks().values())[0]
         pollster = list(polling_task.pollster_matches['test_pipeline'])[0]


@@ -213,16 +213,18 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
         self.useFixture(mockpatch.Patch(
             'keystoneclient.v2_0.client.Client',
             side_effect=Exception))
-        self.pipeline_cfg = [
-            {
-                'name': "test_keystone",
-                'interval': 10,
-                'counters': ['testkeystone'],
-                'resources': ['test://'] if self.source_resources else [],
-                'transformers': [],
-                'publishers': ["test"],
-            },
-        ]
+        self.pipeline_cfg = {
+            'sources': [{
+                'name': "test_keystone",
+                'interval': 10,
+                'meters': ['testkeystone'],
+                'resources': ['test://'] if self.source_resources else [],
+                'sinks': ['test_sink']}],
+            'sinks': [{
+                'name': 'test_sink',
+                'transformers': [],
+                'publishers': ["test"]}]
+        }
         self.mgr.pipeline_manager = pipeline.PipelineManager(
             self.pipeline_cfg,
             self.transformer_manager)
@@ -239,16 +241,18 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
     @mock.patch('ceilometer.agent.base.LOG')
     def test_polling_exception(self, LOG):
         source_name = 'test_pollingexception'
-        self.pipeline_cfg = [
-            {
-                'name': source_name,
-                'interval': 10,
-                'counters': ['testpollingexception'],
-                'resources': ['test://'] if self.source_resources else [],
-                'transformers': [],
-                'publishers': ["test"],
-            },
-        ]
+        self.pipeline_cfg = {
+            'sources': [{
+                'name': source_name,
+                'interval': 10,
+                'meters': ['testpollingexception'],
+                'resources': ['test://'] if self.source_resources else [],
+                'sinks': ['test_sink']}],
+            'sinks': [{
+                'name': 'test_sink',
+                'transformers': [],
+                'publishers': ["test"]}]
+        }
         self.mgr.pipeline_manager = pipeline.PipelineManager(
             self.pipeline_cfg,
             self.transformer_manager)


@@ -750,16 +750,12 @@ class BasePipelineTestCase(base.BaseTestCase):
                          getattr(publisher.samples[0], 'name'))

     def test_variable_counter(self):
-        self.pipeline_cfg = [{
-            'name': "test_pipeline",
-            'interval': 5,
-            'counters': ['a:*'],
-            'transformers': [
-                {'name': "update",
-                 'parameters': {}}
-            ],
-            'publishers': ["test://"],
-        }, ]
+        transformer_cfg = [{
+            'name': "update",
+            'parameters': {}
+        }]
+        self._set_pipeline_cfg('transformers', transformer_cfg)
+        self._set_pipeline_cfg('counters', ['a:*'])
         pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
                                                     self.transformer_manager)


@@ -1,135 +0,0 @@
-#
-# Copyright 2014 Red Hat, Inc
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import yaml
-
-from ceilometer import pipeline
-from ceilometer.tests import pipeline_base
-
-
-class TestDeprecatedPipeline(pipeline_base.BasePipelineTestCase):
-    def _setup_pipeline_cfg(self):
-        self.pipeline_cfg = [{
-            'name': 'test_pipeline',
-            'interval': 5,
-            'counters': ['a'],
-            'transformers': [
-                {'name': 'update',
-                 'parameters': {}}
-            ],
-            'publishers': ['test://'],
-        }, ]
-
-    def _augment_pipeline_cfg(self):
-        self.pipeline_cfg.append({
-            'name': 'second_pipeline',
-            'interval': 5,
-            'counters': ['b'],
-            'transformers': [{
-                'name': 'update',
-                'parameters':
-                {
-                    'append_name': '_new',
-                }
-            }],
-            'publishers': ['new'],
-        })
-
-    def _break_pipeline_cfg(self):
-        self.pipeline_cfg.append({
-            'name': 'second_pipeline',
-            'interval': 5,
-            'counters': ['b'],
-            'transformers': [{
-                'name': 'update',
-                'parameters':
-                {
-                    'append_name': '_new',
-                }
-            }],
-            'publishers': ['except'],
-        })
-
-    def _dup_pipeline_name_cfg(self):
-        self.pipeline_cfg.append({
-            'name': 'test_pipeline',
-            'interval': 5,
-            'counters': ['b'],
-            'transformers': [],
-            'publishers': ['except'],
-        })
-
-    def _set_pipeline_cfg(self, field, value):
-        self.pipeline_cfg[0][field] = value
-
-    def _extend_pipeline_cfg(self, field, value):
-        self.pipeline_cfg[0][field].extend(value)
-
-    def _unset_pipeline_cfg(self, field):
-        del self.pipeline_cfg[0][field]
-
-    def _do_test_rate_of_change_in_boilerplate_pipeline_cfg(self, index,
-                                                            meters, units):
-        with open('etc/ceilometer/deprecated_pipeline.yaml') as fap:
-            data = fap.read()
-        pipeline_cfg = yaml.safe_load(data)
-        for p in pipeline_cfg:
-            p['publishers'] = ['test://']
-        pipeline_manager = pipeline.PipelineManager(pipeline_cfg,
-                                                    self.transformer_manager)
-        pipe = pipeline_manager.pipelines[index]
-        self._do_test_rate_of_change_mapping(pipe, meters, units)
-
-    def test_rate_of_change_boilerplate_disk_read_cfg(self):
-        meters = ('disk.read.bytes', 'disk.read.requests')
-        units = ('B', 'request')
-        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
-                                                                 meters,
-                                                                 units)
-
-    def test_rate_of_change_boilerplate_disk_write_cfg(self):
-        meters = ('disk.write.bytes', 'disk.write.requests')
-        units = ('B', 'request')
-        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
-                                                                 meters,
-                                                                 units)
-
-    def test_rate_of_change_boilerplate_per_disk_device_read_cfg(self):
-        meters = ('disk.device.read.bytes', 'disk.device.read.requests')
-        units = ('B', 'request')
-        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
-                                                                 meters,
-                                                                 units)
-
-    def test_rate_of_change_boilerplate_per_disk_device_write_cfg(self):
-        meters = ('disk.device.write.bytes', 'disk.device.write.requests')
-        units = ('B', 'request')
-        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
-                                                                 meters,
-                                                                 units)
-
-    def test_rate_of_change_boilerplate_network_incoming_cfg(self):
-        meters = ('network.incoming.bytes', 'network.incoming.packets')
-        units = ('B', 'packet')
-        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
-                                                                 meters,
-                                                                 units)
-
-    def test_rate_of_change_boilerplate_network_outgoing_cfg(self):
-        meters = ('network.outgoing.bytes', 'network.outgoing.packets')
-        units = ('B', 'packet')
-        self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
-                                                                 meters,
-                                                                 units)


@@ -176,13 +176,19 @@ class BaseRealNotification(tests_base.BaseTestCase):
         self.CONF = self.useFixture(fixture_config.Config()).conf
         self.setup_messaging(self.CONF, 'nova')
-        pipeline = yaml.dump([{
-            'name': 'test_pipeline',
-            'interval': 5,
-            'counters': ['instance', 'memory'],
-            'transformers': [],
-            'publishers': ['test://'],
-        }])
+        pipeline = yaml.dump({
+            'sources': [{
+                'name': 'test_pipeline',
+                'interval': 5,
+                'meters': ['instance', 'memory'],
+                'sinks': ['test_sink']
+            }],
+            'sinks': [{
+                'name': 'test_sink',
+                'transformers': [],
+                'publishers': ['test://']
+            }]
+        })
         if six.PY3:
             pipeline = pipeline.encode('utf-8')
         self.expected_samples = 2


@@ -1,73 +0,0 @@
----
--
-    name: meter_pipeline
-    interval: 600
-    meters:
-        - "*"
-    resources:
-    transformers:
-    publishers:
-        - rpc://
--
-    name: cpu_pipeline
-    interval: 600
-    meters:
-        - "cpu"
-    transformers:
-        - name: "rate_of_change"
-          parameters:
-              target:
-                  name: "cpu_util"
-                  unit: "%"
-                  type: "gauge"
-                  scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
-    publishers:
-        - rpc://
--
-    name: disk_pipeline
-    interval: 600
-    meters:
-        - "disk.read.bytes"
-        - "disk.read.requests"
-        - "disk.write.bytes"
-        - "disk.write.requests"
-        - "disk.device.read.bytes"
-        - "disk.device.read.requests"
-        - "disk.device.write.bytes"
-        - "disk.device.write.requests"
-    transformers:
-        - name: "rate_of_change"
-          parameters:
-              source:
-                  map_from:
-                      name: "(disk\\.device|disk)\\.(read|write)\\.(bytes|requests)"
-                      unit: "(B|request)"
-              target:
-                  map_to:
-                      name: "\\1.\\2.\\3.rate"
-                      unit: "\\1/s"
-                  type: "gauge"
-    publishers:
-        - rpc://
--
-    name: network_pipeline
-    interval: 600
-    meters:
-        - "network.incoming.bytes"
-        - "network.incoming.packets"
-        - "network.outgoing.bytes"
-        - "network.outgoing.packets"
-    transformers:
-        - name: "rate_of_change"
-          parameters:
-              source:
-                  map_from:
-                      name: "network\\.(incoming|outgoing)\\.(bytes|packets)"
-                      unit: "(B|packet)"
-              target:
-                  map_to:
-                      name: "network.\\1.\\2.rate"
-                      unit: "\\1/s"
-                  type: "gauge"
-    publishers:
-        - rpc://
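
For comparison, the cpu_pipeline entry from the file deleted above, re-expressed as the decoupled dict PipelineManager now expects and dumped the same way the notification tests do (a sketch; the source and sink names are illustrative):

    import yaml

    cpu_cfg = {
        'sources': [{
            'name': 'cpu_source',
            'interval': 600,
            'meters': ['cpu'],
            'sinks': ['cpu_sink'],
        }],
        'sinks': [{
            'name': 'cpu_sink',
            'transformers': [{
                'name': 'rate_of_change',
                'parameters': {
                    'target': {'name': 'cpu_util',
                               'unit': '%',
                               'type': 'gauge',
                               'scale': '100.0 / (10**9 * '
                                        '(resource_metadata.cpu_number or 1))'},
                },
            }],
            'publishers': ['rpc://'],
        }],
    }
    print(yaml.safe_dump(cpu_cfg, default_flow_style=False))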