Merge "add delta transfomer support"
This commit is contained in:
commit
bf3e38085d
@ -61,6 +61,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
'unit_conversion': conversions.ScalingTransformer,
|
'unit_conversion': conversions.ScalingTransformer,
|
||||||
'rate_of_change': conversions.RateOfChangeTransformer,
|
'rate_of_change': conversions.RateOfChangeTransformer,
|
||||||
'arithmetic': arithmetic.ArithmeticTransformer,
|
'arithmetic': arithmetic.ArithmeticTransformer,
|
||||||
|
'delta': conversions.DeltaTransformer,
|
||||||
}
|
}
|
||||||
|
|
||||||
if name in class_name_ext:
|
if name in class_name_ext:
|
||||||
@ -1897,6 +1898,164 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(0, len(publisher.samples))
|
self.assertEqual(0, len(publisher.samples))
|
||||||
|
|
||||||
|
def _do_test_delta(self, data, expected, growth_only=False):
|
||||||
|
transformer_cfg = [
|
||||||
|
{
|
||||||
|
'name': 'delta',
|
||||||
|
'parameters': {
|
||||||
|
'target': {'name': 'new_meter'},
|
||||||
|
'growth_only': growth_only,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
]
|
||||||
|
self._set_pipeline_cfg('transformers', transformer_cfg)
|
||||||
|
self._set_pipeline_cfg('counters', ['cpu'])
|
||||||
|
|
||||||
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
|
self.transformer_manager)
|
||||||
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
|
pipe.publish_data(None, data)
|
||||||
|
pipe.flush(None)
|
||||||
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
|
self.assertEqual(expected, len(publisher.samples))
|
||||||
|
return publisher.samples
|
||||||
|
|
||||||
|
def test_delta_transformer(self):
|
||||||
|
samples = [
|
||||||
|
sample.Sample(
|
||||||
|
name='cpu',
|
||||||
|
type=sample.TYPE_CUMULATIVE,
|
||||||
|
volume=26,
|
||||||
|
unit='ns',
|
||||||
|
user_id='test_user',
|
||||||
|
project_id='test_proj',
|
||||||
|
resource_id='test_resource',
|
||||||
|
timestamp=timeutils.utcnow().isoformat(),
|
||||||
|
resource_metadata={'version': '1.0'}
|
||||||
|
),
|
||||||
|
sample.Sample(
|
||||||
|
name='cpu',
|
||||||
|
type=sample.TYPE_CUMULATIVE,
|
||||||
|
volume=16,
|
||||||
|
unit='ns',
|
||||||
|
user_id='test_user',
|
||||||
|
project_id='test_proj',
|
||||||
|
resource_id='test_resource',
|
||||||
|
timestamp=timeutils.utcnow().isoformat(),
|
||||||
|
resource_metadata={'version': '2.0'}
|
||||||
|
),
|
||||||
|
sample.Sample(
|
||||||
|
name='cpu',
|
||||||
|
type=sample.TYPE_CUMULATIVE,
|
||||||
|
volume=53,
|
||||||
|
unit='ns',
|
||||||
|
user_id='test_user_bis',
|
||||||
|
project_id='test_proj_bis',
|
||||||
|
resource_id='test_resource',
|
||||||
|
timestamp=timeutils.utcnow().isoformat(),
|
||||||
|
resource_metadata={'version': '1.0'}
|
||||||
|
),
|
||||||
|
]
|
||||||
|
deltas = self._do_test_delta(samples, 2)
|
||||||
|
self.assertEqual('new_meter', deltas[0].name)
|
||||||
|
self.assertEqual('delta', deltas[0].type)
|
||||||
|
self.assertEqual('ns', deltas[0].unit)
|
||||||
|
self.assertEqual({'version': '2.0'}, deltas[0].resource_metadata)
|
||||||
|
self.assertEqual(-10, deltas[0].volume)
|
||||||
|
self.assertEqual('new_meter', deltas[1].name)
|
||||||
|
self.assertEqual('delta', deltas[1].type)
|
||||||
|
self.assertEqual('ns', deltas[1].unit)
|
||||||
|
self.assertEqual({'version': '1.0'}, deltas[1].resource_metadata)
|
||||||
|
self.assertEqual(37, deltas[1].volume)
|
||||||
|
|
||||||
|
def test_delta_transformer_out_of_order(self):
|
||||||
|
samples = [
|
||||||
|
sample.Sample(
|
||||||
|
name='cpu',
|
||||||
|
type=sample.TYPE_CUMULATIVE,
|
||||||
|
volume=26,
|
||||||
|
unit='ns',
|
||||||
|
user_id='test_user',
|
||||||
|
project_id='test_proj',
|
||||||
|
resource_id='test_resource',
|
||||||
|
timestamp=timeutils.utcnow().isoformat(),
|
||||||
|
resource_metadata={'version': '1.0'}
|
||||||
|
),
|
||||||
|
sample.Sample(
|
||||||
|
name='cpu',
|
||||||
|
type=sample.TYPE_CUMULATIVE,
|
||||||
|
volume=16,
|
||||||
|
unit='ns',
|
||||||
|
user_id='test_user',
|
||||||
|
project_id='test_proj',
|
||||||
|
resource_id='test_resource',
|
||||||
|
timestamp=((timeutils.utcnow() - datetime.timedelta(minutes=5))
|
||||||
|
.isoformat()),
|
||||||
|
resource_metadata={'version': '2.0'}
|
||||||
|
),
|
||||||
|
sample.Sample(
|
||||||
|
name='cpu',
|
||||||
|
type=sample.TYPE_CUMULATIVE,
|
||||||
|
volume=53,
|
||||||
|
unit='ns',
|
||||||
|
user_id='test_user_bis',
|
||||||
|
project_id='test_proj_bis',
|
||||||
|
resource_id='test_resource',
|
||||||
|
timestamp=timeutils.utcnow().isoformat(),
|
||||||
|
resource_metadata={'version': '1.0'}
|
||||||
|
),
|
||||||
|
]
|
||||||
|
deltas = self._do_test_delta(samples, 1)
|
||||||
|
self.assertEqual('new_meter', deltas[0].name)
|
||||||
|
self.assertEqual('delta', deltas[0].type)
|
||||||
|
self.assertEqual('ns', deltas[0].unit)
|
||||||
|
self.assertEqual({'version': '1.0'}, deltas[0].resource_metadata)
|
||||||
|
self.assertEqual(27, deltas[0].volume)
|
||||||
|
|
||||||
|
def test_delta_transformer_growth_only(self):
|
||||||
|
samples = [
|
||||||
|
sample.Sample(
|
||||||
|
name='cpu',
|
||||||
|
type=sample.TYPE_CUMULATIVE,
|
||||||
|
volume=26,
|
||||||
|
unit='ns',
|
||||||
|
user_id='test_user',
|
||||||
|
project_id='test_proj',
|
||||||
|
resource_id='test_resource',
|
||||||
|
timestamp=timeutils.utcnow().isoformat(),
|
||||||
|
resource_metadata={'version': '1.0'}
|
||||||
|
),
|
||||||
|
sample.Sample(
|
||||||
|
name='cpu',
|
||||||
|
type=sample.TYPE_CUMULATIVE,
|
||||||
|
volume=16,
|
||||||
|
unit='ns',
|
||||||
|
user_id='test_user',
|
||||||
|
project_id='test_proj',
|
||||||
|
resource_id='test_resource',
|
||||||
|
timestamp=timeutils.utcnow().isoformat(),
|
||||||
|
resource_metadata={'version': '2.0'}
|
||||||
|
),
|
||||||
|
sample.Sample(
|
||||||
|
name='cpu',
|
||||||
|
type=sample.TYPE_CUMULATIVE,
|
||||||
|
volume=53,
|
||||||
|
unit='ns',
|
||||||
|
user_id='test_user_bis',
|
||||||
|
project_id='test_proj_bis',
|
||||||
|
resource_id='test_resource',
|
||||||
|
timestamp=timeutils.utcnow().isoformat(),
|
||||||
|
resource_metadata={'version': '1.0'}
|
||||||
|
),
|
||||||
|
]
|
||||||
|
deltas = self._do_test_delta(samples, 1, True)
|
||||||
|
self.assertEqual('new_meter', deltas[0].name)
|
||||||
|
self.assertEqual('delta', deltas[0].type)
|
||||||
|
self.assertEqual('ns', deltas[0].unit)
|
||||||
|
self.assertEqual({'version': '1.0'}, deltas[0].resource_metadata)
|
||||||
|
self.assertEqual(37, deltas[0].volume)
|
||||||
|
|
||||||
def test_unique_pipeline_names(self):
|
def test_unique_pipeline_names(self):
|
||||||
self._dup_pipeline_name_cfg()
|
self._dup_pipeline_name_cfg()
|
||||||
self._exception_create_pipelinemanager()
|
self._exception_create_pipelinemanager()
|
||||||
|
@ -229,42 +229,42 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
|||||||
def test_rate_of_change_boilerplate_disk_read_cfg(self):
|
def test_rate_of_change_boilerplate_disk_read_cfg(self):
|
||||||
meters = ('disk.read.bytes', 'disk.read.requests')
|
meters = ('disk.read.bytes', 'disk.read.requests')
|
||||||
units = ('B', 'request')
|
units = ('B', 'request')
|
||||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
|
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
|
||||||
meters,
|
meters,
|
||||||
units)
|
units)
|
||||||
|
|
||||||
def test_rate_of_change_boilerplate_disk_write_cfg(self):
|
def test_rate_of_change_boilerplate_disk_write_cfg(self):
|
||||||
meters = ('disk.write.bytes', 'disk.write.requests')
|
meters = ('disk.write.bytes', 'disk.write.requests')
|
||||||
units = ('B', 'request')
|
units = ('B', 'request')
|
||||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
|
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
|
||||||
meters,
|
meters,
|
||||||
units)
|
units)
|
||||||
|
|
||||||
def test_rate_of_change_boilerplate_network_incoming_cfg(self):
|
def test_rate_of_change_boilerplate_network_incoming_cfg(self):
|
||||||
meters = ('network.incoming.bytes', 'network.incoming.packets')
|
meters = ('network.incoming.bytes', 'network.incoming.packets')
|
||||||
units = ('B', 'packet')
|
units = ('B', 'packet')
|
||||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
|
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(4,
|
||||||
meters,
|
meters,
|
||||||
units)
|
units)
|
||||||
|
|
||||||
def test_rate_of_change_boilerplate_per_disk_device_read_cfg(self):
|
def test_rate_of_change_boilerplate_per_disk_device_read_cfg(self):
|
||||||
meters = ('disk.device.read.bytes', 'disk.device.read.requests')
|
meters = ('disk.device.read.bytes', 'disk.device.read.requests')
|
||||||
units = ('B', 'request')
|
units = ('B', 'request')
|
||||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
|
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
|
||||||
meters,
|
meters,
|
||||||
units)
|
units)
|
||||||
|
|
||||||
def test_rate_of_change_boilerplate_per_disk_device_write_cfg(self):
|
def test_rate_of_change_boilerplate_per_disk_device_write_cfg(self):
|
||||||
meters = ('disk.device.write.bytes', 'disk.device.write.requests')
|
meters = ('disk.device.write.bytes', 'disk.device.write.requests')
|
||||||
units = ('B', 'request')
|
units = ('B', 'request')
|
||||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
|
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
|
||||||
meters,
|
meters,
|
||||||
units)
|
units)
|
||||||
|
|
||||||
def test_rate_of_change_boilerplate_network_outgoing_cfg(self):
|
def test_rate_of_change_boilerplate_network_outgoing_cfg(self):
|
||||||
meters = ('network.outgoing.bytes', 'network.outgoing.packets')
|
meters = ('network.outgoing.bytes', 'network.outgoing.packets')
|
||||||
units = ('B', 'packet')
|
units = ('B', 'packet')
|
||||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
|
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(4,
|
||||||
meters,
|
meters,
|
||||||
units)
|
units)
|
||||||
|
|
||||||
|
@ -20,15 +20,15 @@ from oslo_log import log
|
|||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
import six
|
import six
|
||||||
|
|
||||||
from ceilometer.i18n import _
|
from ceilometer.i18n import _, _LW
|
||||||
from ceilometer import sample
|
from ceilometer import sample
|
||||||
from ceilometer import transformer
|
from ceilometer import transformer
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ScalingTransformer(transformer.TransformerBase):
|
class BaseConversionTransformer(transformer.TransformerBase):
|
||||||
"""Transformer to apply a scaling conversion."""
|
"""Transformer to derive conversion."""
|
||||||
|
|
||||||
grouping_keys = ['resource_id']
|
grouping_keys = ['resource_id']
|
||||||
|
|
||||||
@ -44,22 +44,7 @@ class ScalingTransformer(transformer.TransformerBase):
|
|||||||
target = target or {}
|
target = target or {}
|
||||||
self.source = source
|
self.source = source
|
||||||
self.target = target
|
self.target = target
|
||||||
self.scale = target.get('scale')
|
super(BaseConversionTransformer, self).__init__(**kwargs)
|
||||||
LOG.debug('scaling conversion transformer with source:'
|
|
||||||
' %(source)s target: %(target)s:', {'source': source,
|
|
||||||
'target': target})
|
|
||||||
super(ScalingTransformer, self).__init__(**kwargs)
|
|
||||||
|
|
||||||
def _scale(self, s):
|
|
||||||
"""Apply the scaling factor.
|
|
||||||
|
|
||||||
Either a straight multiplicative factor or else a string to be eval'd.
|
|
||||||
"""
|
|
||||||
ns = transformer.Namespace(s.as_dict())
|
|
||||||
|
|
||||||
scale = self.scale
|
|
||||||
return ((eval(scale, {}, ns) if isinstance(scale, six.string_types)
|
|
||||||
else s.volume * scale) if scale else s.volume)
|
|
||||||
|
|
||||||
def _map(self, s, attr):
|
def _map(self, s, attr):
|
||||||
"""Apply the name or unit mapping if configured."""
|
"""Apply the name or unit mapping if configured."""
|
||||||
@ -74,6 +59,93 @@ class ScalingTransformer(transformer.TransformerBase):
|
|||||||
pass
|
pass
|
||||||
return mapped or self.target.get(attr, getattr(s, attr))
|
return mapped or self.target.get(attr, getattr(s, attr))
|
||||||
|
|
||||||
|
|
||||||
|
class DeltaTransformer(BaseConversionTransformer):
|
||||||
|
"""Transformer based on the delta of a sample volume."""
|
||||||
|
|
||||||
|
def __init__(self, target=None, growth_only=False, **kwargs):
|
||||||
|
"""Initialize transformer with configured parameters.
|
||||||
|
|
||||||
|
:param growth_only: capture only positive deltas
|
||||||
|
"""
|
||||||
|
super(DeltaTransformer, self).__init__(target=target, **kwargs)
|
||||||
|
self.growth_only = growth_only
|
||||||
|
self.cache = {}
|
||||||
|
|
||||||
|
def handle_sample(self, context, s):
|
||||||
|
"""Handle a sample, converting if necessary."""
|
||||||
|
key = s.name + s.resource_id
|
||||||
|
prev = self.cache.get(key)
|
||||||
|
timestamp = timeutils.parse_isotime(s.timestamp)
|
||||||
|
self.cache[key] = (s.volume, timestamp)
|
||||||
|
|
||||||
|
if prev:
|
||||||
|
prev_volume = prev[0]
|
||||||
|
prev_timestamp = prev[1]
|
||||||
|
time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
|
||||||
|
# disallow violations of the arrow of time
|
||||||
|
if time_delta < 0:
|
||||||
|
LOG.warn(_LW('Dropping out of time order sample: %s'), (s,))
|
||||||
|
# Reset the cache to the newer sample.
|
||||||
|
self.cache[key] = prev
|
||||||
|
return None
|
||||||
|
volume_delta = s.volume - prev_volume
|
||||||
|
if self.growth_only and volume_delta < 0:
|
||||||
|
LOG.warn(_LW('Negative delta detected, dropping value'))
|
||||||
|
s = None
|
||||||
|
else:
|
||||||
|
s = self._convert(s, volume_delta)
|
||||||
|
LOG.debug('Converted to: %s', s)
|
||||||
|
else:
|
||||||
|
LOG.warn(_LW('Dropping sample with no predecessor: %s'),
|
||||||
|
(s,))
|
||||||
|
s = None
|
||||||
|
return s
|
||||||
|
|
||||||
|
def _convert(self, s, delta):
|
||||||
|
"""Transform the appropriate sample fields."""
|
||||||
|
return sample.Sample(
|
||||||
|
name=self._map(s, 'name'),
|
||||||
|
unit=s.unit,
|
||||||
|
type=sample.TYPE_DELTA,
|
||||||
|
volume=delta,
|
||||||
|
user_id=s.user_id,
|
||||||
|
project_id=s.project_id,
|
||||||
|
resource_id=s.resource_id,
|
||||||
|
timestamp=s.timestamp,
|
||||||
|
resource_metadata=s.resource_metadata
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ScalingTransformer(BaseConversionTransformer):
|
||||||
|
"""Transformer to apply a scaling conversion."""
|
||||||
|
|
||||||
|
def __init__(self, source=None, target=None, **kwargs):
|
||||||
|
"""Initialize transformer with configured parameters.
|
||||||
|
|
||||||
|
:param source: dict containing source sample unit
|
||||||
|
:param target: dict containing target sample name, type,
|
||||||
|
unit and scaling factor (a missing value
|
||||||
|
connotes no change)
|
||||||
|
"""
|
||||||
|
super(ScalingTransformer, self).__init__(source=source, target=target,
|
||||||
|
**kwargs)
|
||||||
|
self.scale = self.target.get('scale')
|
||||||
|
LOG.debug('scaling conversion transformer with source:'
|
||||||
|
' %(source)s target: %(target)s:', {'source': self.source,
|
||||||
|
'target': self.target})
|
||||||
|
|
||||||
|
def _scale(self, s):
|
||||||
|
"""Apply the scaling factor.
|
||||||
|
|
||||||
|
Either a straight multiplicative factor or else a string to be eval'd.
|
||||||
|
"""
|
||||||
|
ns = transformer.Namespace(s.as_dict())
|
||||||
|
|
||||||
|
scale = self.scale
|
||||||
|
return ((eval(scale, {}, ns) if isinstance(scale, six.string_types)
|
||||||
|
else s.volume * scale) if scale else s.volume)
|
||||||
|
|
||||||
def _convert(self, s, growth=1):
|
def _convert(self, s, growth=1):
|
||||||
"""Transform the appropriate sample fields."""
|
"""Transform the appropriate sample fields."""
|
||||||
return sample.Sample(
|
return sample.Sample(
|
||||||
|
@ -12,6 +12,7 @@ sources:
|
|||||||
- "cpu"
|
- "cpu"
|
||||||
sinks:
|
sinks:
|
||||||
- cpu_sink
|
- cpu_sink
|
||||||
|
- cpu_delta_sink
|
||||||
- name: disk_source
|
- name: disk_source
|
||||||
interval: 600
|
interval: 600
|
||||||
meters:
|
meters:
|
||||||
@ -50,6 +51,15 @@ sinks:
|
|||||||
scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
|
scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
|
||||||
publishers:
|
publishers:
|
||||||
- notifier://
|
- notifier://
|
||||||
|
- name: cpu_delta_sink
|
||||||
|
transformers:
|
||||||
|
- name: "delta"
|
||||||
|
parameters:
|
||||||
|
target:
|
||||||
|
name: "cpu.delta"
|
||||||
|
growth_only: True
|
||||||
|
publishers:
|
||||||
|
- notifier://
|
||||||
- name: disk_sink
|
- name: disk_sink
|
||||||
transformers:
|
transformers:
|
||||||
- name: "rate_of_change"
|
- name: "rate_of_change"
|
||||||
|
@ -220,6 +220,7 @@ ceilometer.hardware.inspectors =
|
|||||||
|
|
||||||
ceilometer.transformer =
|
ceilometer.transformer =
|
||||||
accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
|
accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
|
||||||
|
delta = ceilometer.transformer.conversions:DeltaTransformer
|
||||||
unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
|
unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
|
||||||
rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
|
rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
|
||||||
aggregator = ceilometer.transformer.conversions:AggregatorTransformer
|
aggregator = ceilometer.transformer.conversions:AggregatorTransformer
|
||||||
|
Loading…
Reference in New Issue
Block a user