Transformer to measure rate of change

Addresses BP transformer-unit

Provide a new transformer to emit derived counters that represent
the rate of change of existing counters, by retaining the previous
volume and timestamp.

This transformer will supercede the direct emission of derived counters
by pollsters (e.g. cpu_util calculated by the compute CPUPollster).

The target counters are identified either by name.

The scaling can be expressed as either a straight multiplicative
factor or as a string expression to be eval'd.

Configured as per usual via the pipeline.yaml, for example:

    counters:
        - "cpu"
    transformers:
        - name: "rate_of_change"
          parameters:
              target:
                  name: "cpu_util"
                  unit: "%"
                  type: "gauge"
                  scale: "100.0 * (10**9 / resource_metadata.get('cpu_number', 1))"
              replace: False

Change-Id: I0affa8d13a4fd72db08f818db809023d2f74217a
This commit is contained in:
Eoghan Glynn
2013-07-11 15:28:06 +01:00
parent a26b7cfbe6
commit 5ace235107
3 changed files with 191 additions and 2 deletions

View File

@@ -20,6 +20,7 @@ import copy
from ceilometer import counter as ceilocounter
from ceilometer.openstack.common import log
from ceilometer.openstack.common import timeutils
from ceilometer import transformer
LOG = log.getLogger(__name__)
@@ -57,7 +58,7 @@ class ScalingTransformer(transformer.TransformerBase):
return ((eval(scale, {}, ns) if isinstance(scale, basestring)
else counter.volume * scale) if scale else counter.volume)
def _convert(self, counter):
def _convert(self, counter, growth=1):
"""Transform the appropriate counter fields.
"""
scale = self.target.get('scale')
@@ -65,7 +66,7 @@ class ScalingTransformer(transformer.TransformerBase):
name=self.target.get('name', counter.name),
unit=self.target.get('unit', counter.unit),
type=self.target.get('type', counter.type),
volume=self._scale(counter, scale),
volume=self._scale(counter, scale) * growth,
user_id=counter.user_id,
project_id=counter.project_id,
resource_id=counter.resource_id,
@@ -102,3 +103,47 @@ class ScalingTransformer(transformer.TransformerBase):
counters.append(self.preserved)
self.preserved = None
return counters
class RateOfChangeTransformer(ScalingTransformer):
"""Transformer based on the rate of change of a counter volume,
for example taking the current and previous volumes of a
cumulative counter and producing a gauge value based on the
proportion of some maximum used.
"""
def __init__(self, **kwargs):
"""Initialize transformer with configured parameters.
"""
self.cache = {}
super(RateOfChangeTransformer, self).__init__(**kwargs)
def handle_sample(self, context, counter, source):
"""Handle a sample, converting if necessary."""
LOG.debug('handling counter %s', (counter,))
key = counter.name + counter.resource_id
prev = self.cache.get(key)
timestamp = timeutils.parse_isotime(counter.timestamp)
self.cache[key] = (counter.volume, timestamp)
if prev:
prev_volume = prev[0]
prev_timestamp = prev[1]
time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
# we only allow negative deltas for noncumulative counters, whereas
# for cumulative we assume that a reset has occurred in the interim
# so that the current volume gives a lower bound on growth
volume_delta = (counter.volume - prev_volume
if (prev_volume <= counter.volume or
counter.type != ceilocounter.TYPE_CUMULATIVE)
else counter.volume)
rate_of_change = ((1.0 * volume_delta / time_delta)
if time_delta else 0.0)
transformed = self._convert(counter, rate_of_change)
LOG.debug(_('converted to: %s') % (transformed,))
counter = self._keep(counter, transformed)
elif self.replace:
LOG.warn(_('dropping counter with no predecessor: %s') % counter)
counter = None
return counter

View File

@@ -76,6 +76,7 @@ ceilometer.compute.virt =
ceilometer.transformer =
accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
ceilometer.publisher =
test = ceilometer.publisher.test:TestPublisher

View File

@@ -17,6 +17,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from stevedore import extension
from ceilometer import counter
@@ -47,6 +49,7 @@ class TestPipeline(base.TestCase):
'drop': self.TransformerClassDrop,
'cache': accumulator.TransformerAccumulator,
'unit_conversion': conversions.ScalingTransformer,
'rate_of_change': conversions.RateOfChangeTransformer,
}
if name in class_name_ext:
@@ -742,3 +745,143 @@ class TestPipeline(base.TestCase):
self.assertEquals(getattr(amb_temp, 'name'), 'ambient_temperature')
self.assertEquals(getattr(amb_temp, 'unit'), '°F')
self.assertEquals(getattr(amb_temp, 'volume'), 88.8)
self.assertEquals(getattr(core_temp, 'volume'), 96.8)
def _do_test_rate_of_change_conversion(self, prev, curr, offset,
type, expected):
s = "100.0 / (10**9 * resource_metadata.get('cpu_number', 1))"
self.pipeline_cfg[0]['transformers'] = [
{
'name': 'rate_of_change',
'parameters': {
'source': {},
'target': {'name': 'cpu_util',
'unit': '%',
'type': counter.TYPE_GAUGE,
'scale': s},
'replace': False
}
},
]
self.pipeline_cfg[0]['counters'] = ['cpu']
now = timeutils.utcnow()
later = now + datetime.timedelta(minutes=offset)
counters = [
counter.Counter(
name='cpu',
type=type,
volume=prev,
unit='ns',
user_id='test_user',
project_id='test_proj',
resource_id='test_resource',
timestamp=now.isoformat(),
resource_metadata={'cpu_number': 4}
),
counter.Counter(
name='cpu',
type=type,
volume=curr,
unit='ns',
user_id='test_user',
project_id='test_proj',
resource_id='test_resource',
timestamp=later.isoformat(),
resource_metadata={'cpu_number': 4}
),
]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_counters(None, counters, None)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 2)
# original counters are passed thru' unmolested
self.assertEquals(publisher.counters[0], counters[0])
self.assertEquals(publisher.counters[1], counters[1])
pipe.flush(None, None)
self.assertEqual(len(publisher.counters), 3)
cpu_util = publisher.counters[-1]
self.assertEquals(getattr(cpu_util, 'name'), 'cpu_util')
self.assertEquals(getattr(cpu_util, 'unit'), '%')
self.assertEquals(getattr(cpu_util, 'type'), counter.TYPE_GAUGE)
self.assertEquals(getattr(cpu_util, 'volume'), expected)
def test_rate_of_change_conversion(self):
self._do_test_rate_of_change_conversion(120000000000,
180000000000,
1,
counter.TYPE_CUMULATIVE,
25.0)
def test_rate_of_change_conversion_negative_cumulative_delta(self):
self._do_test_rate_of_change_conversion(180000000000,
120000000000,
1,
counter.TYPE_CUMULATIVE,
50.0)
def test_rate_of_change_conversion_negative_gauge_delta(self):
self._do_test_rate_of_change_conversion(180000000000,
120000000000,
1,
counter.TYPE_GAUGE,
-25.0)
def test_rate_of_change_conversion_zero_delay(self):
self._do_test_rate_of_change_conversion(120000000000,
120000000000,
0,
counter.TYPE_CUMULATIVE,
0.0)
def _do_test_rate_of_change_no_predecessor(self, replace):
s = "100.0 / (10**9 * resource_metadata.get('cpu_number', 1))"
self.pipeline_cfg[0]['transformers'] = [
{
'name': 'rate_of_change',
'parameters': {
'source': {},
'target': {'name': 'cpu_util',
'unit': '%',
'type': counter.TYPE_GAUGE,
'scale': s},
'replace': replace
}
},
]
self.pipeline_cfg[0]['counters'] = ['cpu']
now = timeutils.utcnow()
counters = [
counter.Counter(
name='cpu',
type=counter.TYPE_CUMULATIVE,
volume=120000000000,
unit='ns',
user_id='test_user',
project_id='test_proj',
resource_id='test_resource',
timestamp=now.isoformat(),
resource_metadata={'cpu_number': 4}
),
]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_counters(None, counters, None)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 0 if replace else 1)
pipe.flush(None, None)
self.assertEqual(len(publisher.counters), 0 if replace else 1)
if not replace:
self.assertEquals(publisher.counters[0], counters[0])
def _do_test_rate_of_change_no_predecessor_discard(self):
self._do_test_rate_of_change_no_predecessor(True)
def _do_test_rate_of_change_no_predecessor_preserve(self):
self._do_test_rate_of_change_no_predecessor(False)