pipeline: remove transformer support
Transformers cannot work correctly on multiple nodes without workload partitioning, which has been removed. The transformation of data is no more the responsability of Ceilometer. The data storage used by default (Gnocchi), is able to handle the scenario that Ceilometer covered by default such as computing the rate of a metric. Change-Id: If3683318b998a37c40bc09314dd8ab3eef326ba7 Depends-On: Ifd1d04ce813028f115c19bc983e7dd1e63c6f8a5 Depends-On: I0330c09d72c20d63d08770b52d3071512a418260
This commit is contained in:
parent
cb8aee3945
commit
9db5c6c9bf
@ -91,33 +91,22 @@ class Sink(object):
|
||||
Each sink config is concerned *only* with the transformation rules
|
||||
and publication conduits for data.
|
||||
|
||||
In effect, a sink describes a chain of handlers. The chain starts
|
||||
with zero or more transformers and ends with one or more publishers.
|
||||
|
||||
The first transformer in the chain is passed data from the
|
||||
corresponding source, takes some action such as deriving rate of
|
||||
change, performing unit conversion, or aggregating, before passing
|
||||
the modified data to next step.
|
||||
|
||||
The subsequent transformers, if any, handle the data similarly.
|
||||
In effect, a sink describes a chain of handlers. The chain ends with one or
|
||||
more publishers.
|
||||
|
||||
At the end of the chain, publishers publish the data. The exact
|
||||
publishing method depends on publisher type, for example, pushing
|
||||
into data storage via the message bus providing guaranteed delivery,
|
||||
or for loss-tolerant data UDP may be used.
|
||||
|
||||
If no transformers are included in the chain, the publishers are
|
||||
passed data directly from the sink which are published unchanged.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, cfg, transformer_manager, publisher_manager):
|
||||
def __init__(self, conf, cfg, publisher_manager):
|
||||
self.conf = conf
|
||||
self.cfg = cfg
|
||||
|
||||
try:
|
||||
self.name = cfg['name']
|
||||
# It's legal to have no transformer specified
|
||||
self.transformer_cfg = cfg.get('transformers') or []
|
||||
except KeyError as err:
|
||||
raise PipelineException(
|
||||
"Required field %s not specified" % err.args[0], cfg)
|
||||
@ -138,30 +127,10 @@ class Sink(object):
|
||||
exc_info=True)
|
||||
|
||||
self.multi_publish = True if len(self.publishers) > 1 else False
|
||||
self.transformers = self._setup_transformers(cfg, transformer_manager)
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
def _setup_transformers(self, cfg, transformer_manager):
|
||||
transformers = []
|
||||
for transformer in self.transformer_cfg:
|
||||
parameter = transformer['parameters'] or {}
|
||||
try:
|
||||
ext = transformer_manager[transformer['name']]
|
||||
except KeyError:
|
||||
raise PipelineException(
|
||||
"No transformer named %s loaded" % transformer['name'],
|
||||
cfg)
|
||||
transformers.append(ext.plugin(**parameter))
|
||||
LOG.info(
|
||||
"Pipeline %(pipeline)s: Setup transformer instance %(name)s "
|
||||
"with parameter %(param)s" % ({'pipeline': self,
|
||||
'name': transformer['name'],
|
||||
'param': parameter}))
|
||||
|
||||
return transformers
|
||||
|
||||
@staticmethod
|
||||
def flush():
|
||||
"""Flush data after all events have been injected to pipeline."""
|
||||
@ -220,7 +189,7 @@ class PipelineManager(agent.ConfigManagerBase):
|
||||
|
||||
NOTIFICATION_IPC = 'ceilometer_ipc'
|
||||
|
||||
def __init__(self, conf, cfg_file, transformer_manager):
|
||||
def __init__(self, conf, cfg_file):
|
||||
"""Setup the pipelines according to config.
|
||||
|
||||
The configuration is supported as follows:
|
||||
@ -244,13 +213,6 @@ class PipelineManager(agent.ConfigManagerBase):
|
||||
},
|
||||
],
|
||||
"sinks": [{"name": sink_1,
|
||||
"transformers": [
|
||||
{"name": "Transformer_1",
|
||||
"parameters": {"p1": "value"}},
|
||||
|
||||
{"name": "Transformer_2",
|
||||
"parameters": {"p1": "value"}},
|
||||
],
|
||||
"publishers": ["publisher_1", "publisher_2"]
|
||||
},
|
||||
{"name": sink_2,
|
||||
@ -268,8 +230,6 @@ class PipelineManager(agent.ConfigManagerBase):
|
||||
"excluded meter names", wildcard and "excluded meter names", or
|
||||
only wildcard.
|
||||
|
||||
Transformer's name is plugin name in setup.cfg.
|
||||
|
||||
Publisher's name is plugin name in setup.cfg
|
||||
|
||||
"""
|
||||
@ -303,7 +263,6 @@ class PipelineManager(agent.ConfigManagerBase):
|
||||
else:
|
||||
unique_names.add(name)
|
||||
sinks[s['name']] = self.pm_sink(self.conf, s,
|
||||
transformer_manager,
|
||||
publisher_manager)
|
||||
unique_names.clear()
|
||||
|
||||
|
@ -5,92 +5,7 @@ sources:
|
||||
- "*"
|
||||
sinks:
|
||||
- meter_sink
|
||||
- name: cpu_source
|
||||
meters:
|
||||
- "cpu"
|
||||
sinks:
|
||||
- cpu_sink
|
||||
- cpu_delta_sink
|
||||
- name: disk_source
|
||||
meters:
|
||||
- "disk.read.bytes"
|
||||
- "disk.read.requests"
|
||||
- "disk.write.bytes"
|
||||
- "disk.write.requests"
|
||||
- "disk.device.read.bytes"
|
||||
- "disk.device.read.requests"
|
||||
- "disk.device.write.bytes"
|
||||
- "disk.device.write.requests"
|
||||
sinks:
|
||||
- disk_sink
|
||||
- name: network_source
|
||||
meters:
|
||||
- "network.incoming.bytes"
|
||||
- "network.incoming.packets"
|
||||
- "network.outgoing.bytes"
|
||||
- "network.outgoing.packets"
|
||||
sinks:
|
||||
- network_sink
|
||||
sinks:
|
||||
- name: meter_sink
|
||||
publishers:
|
||||
- gnocchi://
|
||||
|
||||
# All these transformers are deprecated, and will be removed in the future, don't use them.
|
||||
- name: cpu_sink
|
||||
transformers:
|
||||
- name: "rate_of_change"
|
||||
parameters:
|
||||
target:
|
||||
name: "cpu_util"
|
||||
unit: "%"
|
||||
type: "gauge"
|
||||
max: 100
|
||||
scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
|
||||
publishers:
|
||||
- gnocchi://
|
||||
|
||||
# All these transformers are deprecated, and will be removed in the future, don't use them.
|
||||
- name: cpu_delta_sink
|
||||
transformers:
|
||||
- name: "delta"
|
||||
parameters:
|
||||
target:
|
||||
name: "cpu.delta"
|
||||
growth_only: True
|
||||
publishers:
|
||||
- gnocchi://
|
||||
|
||||
# All these transformers are deprecated, and will be removed in the future, don't use them.
|
||||
- name: disk_sink
|
||||
transformers:
|
||||
- name: "rate_of_change"
|
||||
parameters:
|
||||
source:
|
||||
map_from:
|
||||
name: "(disk\\.device|disk)\\.(read|write)\\.(bytes|requests)"
|
||||
unit: "(B|request)"
|
||||
target:
|
||||
map_to:
|
||||
name: "\\1.\\2.\\3.rate"
|
||||
unit: "\\1/s"
|
||||
type: "gauge"
|
||||
publishers:
|
||||
- gnocchi://
|
||||
|
||||
# All these transformers are deprecated, and will be removed in the future, don't use them.
|
||||
- name: network_sink
|
||||
transformers:
|
||||
- name: "rate_of_change"
|
||||
parameters:
|
||||
source:
|
||||
map_from:
|
||||
name: "network\\.(incoming|outgoing)\\.(bytes|packets)"
|
||||
unit: "(B|packet)"
|
||||
target:
|
||||
map_to:
|
||||
name: "network.\\1.\\2.rate"
|
||||
unit: "\\1/s"
|
||||
type: "gauge"
|
||||
publishers:
|
||||
- gnocchi://
|
||||
|
@ -126,7 +126,7 @@ class EventPipelineManager(base.PipelineManager):
|
||||
|
||||
def __init__(self, conf):
|
||||
super(EventPipelineManager, self).__init__(
|
||||
conf, conf.event_pipeline_cfg_file, {})
|
||||
conf, conf.event_pipeline_cfg_file)
|
||||
|
||||
def get_main_endpoints(self):
|
||||
return [EventEndpoint(self.conf, self.publisher())]
|
||||
|
@ -73,74 +73,25 @@ class SampleSource(base.PipelineSource):
|
||||
|
||||
class SampleSink(base.Sink):
|
||||
|
||||
def _transform_sample(self, start, sample):
|
||||
try:
|
||||
for transformer in self.transformers[start:]:
|
||||
sample = transformer.handle_sample(sample)
|
||||
if not sample:
|
||||
LOG.debug(
|
||||
"Pipeline %(pipeline)s: Sample dropped by "
|
||||
"transformer %(trans)s", {'pipeline': self,
|
||||
'trans': transformer})
|
||||
return
|
||||
return sample
|
||||
except Exception:
|
||||
LOG.error("Pipeline %(pipeline)s: Exit after error "
|
||||
"from transformer %(trans)s "
|
||||
"for %(smp)s" % {'pipeline': self,
|
||||
'trans': transformer,
|
||||
'smp': sample},
|
||||
exc_info=True)
|
||||
|
||||
def _publish_samples(self, start, samples):
|
||||
def publish_samples(self, samples):
|
||||
"""Push samples into pipeline for publishing.
|
||||
|
||||
:param start: The first transformer that the sample will be injected.
|
||||
This is mainly for flush() invocation that transformer
|
||||
may emit samples.
|
||||
:param samples: Sample list.
|
||||
|
||||
"""
|
||||
|
||||
transformed_samples = []
|
||||
if not self.transformers:
|
||||
transformed_samples = samples
|
||||
else:
|
||||
for sample in samples:
|
||||
LOG.debug(
|
||||
"Pipeline %(pipeline)s: Transform sample "
|
||||
"%(smp)s from %(trans)s transformer", {'pipeline': self,
|
||||
'smp': sample,
|
||||
'trans': start})
|
||||
sample = self._transform_sample(start, sample)
|
||||
if sample:
|
||||
transformed_samples.append(sample)
|
||||
|
||||
if transformed_samples:
|
||||
if samples:
|
||||
for p in self.publishers:
|
||||
try:
|
||||
p.publish_samples(transformed_samples)
|
||||
p.publish_samples(samples)
|
||||
except Exception:
|
||||
LOG.error("Pipeline %(pipeline)s: Continue after "
|
||||
"error from publisher %(pub)s"
|
||||
% {'pipeline': self, 'pub': p},
|
||||
exc_info=True)
|
||||
|
||||
def publish_samples(self, samples):
|
||||
self._publish_samples(0, samples)
|
||||
|
||||
def flush(self):
|
||||
"""Flush data after all samples have been injected to pipeline."""
|
||||
|
||||
for (i, transformer) in enumerate(self.transformers):
|
||||
try:
|
||||
self._publish_samples(i + 1,
|
||||
list(transformer.flush()))
|
||||
except Exception:
|
||||
LOG.error("Pipeline %(pipeline)s: Error "
|
||||
"flushing transformer %(trans)s"
|
||||
% {'pipeline': self, 'trans': transformer},
|
||||
exc_info=True)
|
||||
@staticmethod
|
||||
def flush():
|
||||
pass
|
||||
|
||||
|
||||
class SamplePipeline(base.Pipeline):
|
||||
@ -195,11 +146,7 @@ class SamplePipelineManager(base.PipelineManager):
|
||||
|
||||
def __init__(self, conf):
|
||||
super(SamplePipelineManager, self).__init__(
|
||||
conf, conf.pipeline_cfg_file, self.get_transform_manager())
|
||||
|
||||
@staticmethod
|
||||
def get_transform_manager():
|
||||
return extension.ExtensionManager('ceilometer.transformer')
|
||||
conf, conf.pipeline_cfg_file)
|
||||
|
||||
def get_main_endpoints(self):
|
||||
exts = extension.ExtensionManager(
|
||||
|
@ -86,8 +86,6 @@ resources:
|
||||
vcpus:
|
||||
cpu:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
cpu.delta:
|
||||
cpu_util:
|
||||
cpu_l3_cache:
|
||||
disk.root.size:
|
||||
disk.ephemeral.size:
|
||||
@ -132,8 +130,6 @@ resources:
|
||||
|
||||
- resource_type: instance_network_interface
|
||||
metrics:
|
||||
network.outgoing.packets.rate:
|
||||
network.incoming.packets.rate:
|
||||
network.outgoing.packets:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
network.incoming.packets:
|
||||
@ -146,8 +142,6 @@ resources:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
network.incoming.packets.error:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
network.outgoing.bytes.rate:
|
||||
network.incoming.bytes.rate:
|
||||
network.outgoing.bytes:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
network.incoming.bytes:
|
||||
@ -160,16 +154,12 @@ resources:
|
||||
metrics:
|
||||
disk.device.read.requests:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
disk.device.read.requests.rate:
|
||||
disk.device.write.requests:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
disk.device.write.requests.rate:
|
||||
disk.device.read.bytes:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
disk.device.read.bytes.rate:
|
||||
disk.device.write.bytes:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
disk.device.write.bytes.rate:
|
||||
disk.device.latency:
|
||||
disk.device.read.latency:
|
||||
disk.device.write.latency:
|
||||
|
@ -63,7 +63,6 @@ class HttpPublisher(publisher.ConfigPublisherBase):
|
||||
the sinks like the following:
|
||||
|
||||
- name: event_sink
|
||||
transformers:
|
||||
publishers:
|
||||
- http://host:80/path?timeout=1&max_retries=2
|
||||
|
||||
|
@ -100,7 +100,7 @@ class MessagingPublisher(publisher.ConfigPublisherBase):
|
||||
def publish_samples(self, samples):
|
||||
"""Publish samples on RPC.
|
||||
|
||||
:param samples: Samples from pipeline after transformation.
|
||||
:param samples: Samples from pipeline.
|
||||
|
||||
"""
|
||||
|
||||
@ -172,7 +172,7 @@ class MessagingPublisher(publisher.ConfigPublisherBase):
|
||||
def publish_events(self, events):
|
||||
"""Send an event message for publishing
|
||||
|
||||
:param events: events from pipeline after transformation
|
||||
:param events: events from pipeline.
|
||||
"""
|
||||
ev_list = [utils.message_from_event(
|
||||
event, self.conf.publisher.telemetry_secret) for event in events]
|
||||
@ -216,7 +216,6 @@ class NotifierPublisher(MessagingPublisher):
|
||||
- notifier_sink
|
||||
sinks:
|
||||
- name: notifier_sink
|
||||
transformers:
|
||||
publishers:
|
||||
- notifier://[notifier_ip]:[notifier_port]?topic=[topic]&
|
||||
driver=driver&max_retry=100
|
||||
|
@ -36,7 +36,6 @@ class ZaqarPublisher(publisher.ConfigPublisherBase):
|
||||
- zaqar_sink
|
||||
sinks:
|
||||
- name: zaqar_sink
|
||||
transformers:
|
||||
publishers:
|
||||
- zaqar://?queue=meter_queue&ttl=1200
|
||||
|
||||
@ -63,7 +62,7 @@ class ZaqarPublisher(publisher.ConfigPublisherBase):
|
||||
def publish_samples(self, samples):
|
||||
"""Send a metering message for publishing
|
||||
|
||||
:param samples: Samples from pipeline after transformation
|
||||
:param samples: Samples from pipeline.
|
||||
"""
|
||||
queue = self.client.queue(self.queue_name)
|
||||
messages = [{'body': sample.as_dict(), 'ttl': self.ttl}
|
||||
@ -73,7 +72,7 @@ class ZaqarPublisher(publisher.ConfigPublisherBase):
|
||||
def publish_events(self, events):
|
||||
"""Send an event message for publishing
|
||||
|
||||
:param events: events from pipeline after transformation
|
||||
:param events: events from pipeline.
|
||||
"""
|
||||
queue = self.client.queue(self.queue_name)
|
||||
messages = [{'body': event.serialize(), 'ttl': self.ttl}
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -677,7 +677,6 @@ class TestPollingAgent(BaseAgent):
|
||||
'sinks': ['test_sink']}],
|
||||
'sinks': [{
|
||||
'name': 'test_sink',
|
||||
'transformers': [],
|
||||
'publishers': ["test"]}]
|
||||
}
|
||||
self.setup_polling(poll_cfg)
|
||||
@ -720,7 +719,6 @@ class TestPollingAgent(BaseAgent):
|
||||
'sinks': ['test_sink']}],
|
||||
'sinks': [{
|
||||
'name': 'test_sink',
|
||||
'transformers': [],
|
||||
'publishers': ["test"]}]
|
||||
}
|
||||
self.setup_polling(poll_cfg)
|
||||
@ -742,7 +740,6 @@ class TestPollingAgent(BaseAgent):
|
||||
'sinks': ['test_sink']}],
|
||||
'sinks': [{
|
||||
'name': 'test_sink',
|
||||
'transformers': [],
|
||||
'publishers': ["test"]}]
|
||||
}
|
||||
self.setup_polling(poll_cfg)
|
||||
@ -771,7 +768,6 @@ class TestPollingAgent(BaseAgent):
|
||||
'sinks': ['test_sink']}],
|
||||
'sinks': [{
|
||||
'name': 'test_sink',
|
||||
'transformers': [],
|
||||
'publishers': ["test"]}]
|
||||
}
|
||||
self.setup_polling(poll_cfg)
|
||||
@ -812,7 +808,6 @@ class TestPollingAgent(BaseAgent):
|
||||
'sinks': ['test_sink']}],
|
||||
'sinks': [{
|
||||
'name': 'test_sink',
|
||||
'transformers': [],
|
||||
'publishers': ["test"]}]
|
||||
}
|
||||
self.setup_polling(poll_cfg)
|
||||
|
@ -12,9 +12,6 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import yaml
|
||||
|
||||
from ceilometer.pipeline import base
|
||||
from ceilometer.pipeline import sample as pipeline
|
||||
from ceilometer import sample
|
||||
@ -27,7 +24,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
||||
'meters': ['a'],
|
||||
'sinks': ['test_sink']}
|
||||
sink = {'name': 'test_sink',
|
||||
'transformers': [{'name': 'update', 'parameters': {}}],
|
||||
'publishers': ['test://']}
|
||||
self.pipeline_cfg = {'sources': [source], 'sinks': [sink]}
|
||||
|
||||
@ -39,13 +35,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
||||
})
|
||||
self.pipeline_cfg['sinks'].append({
|
||||
'name': 'second_sink',
|
||||
'transformers': [{
|
||||
'name': 'update',
|
||||
'parameters':
|
||||
{
|
||||
'append_name': '_new',
|
||||
}
|
||||
}],
|
||||
'publishers': ['new'],
|
||||
})
|
||||
|
||||
@ -57,13 +46,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
||||
})
|
||||
self.pipeline_cfg['sinks'].append({
|
||||
'name': 'second_sink',
|
||||
'transformers': [{
|
||||
'name': 'update',
|
||||
'parameters':
|
||||
{
|
||||
'append_name': '_new',
|
||||
}
|
||||
}],
|
||||
'publishers': ['except'],
|
||||
})
|
||||
|
||||
@ -113,13 +95,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
||||
self._set_pipeline_cfg('meters', meter_cfg)
|
||||
self.pipeline_cfg['sinks'].append({
|
||||
'name': 'second_sink',
|
||||
'transformers': [{
|
||||
'name': 'update',
|
||||
'parameters':
|
||||
{
|
||||
'append_name': '_new',
|
||||
}
|
||||
}],
|
||||
'publishers': ['new'],
|
||||
})
|
||||
self.pipeline_cfg['sources'][0]['sinks'].append('second_sink')
|
||||
@ -150,12 +125,11 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
||||
str(pipeline_manager.pipelines[1]))
|
||||
test_publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||
new_publisher = pipeline_manager.pipelines[1].publishers[0]
|
||||
for publisher, sfx in [(test_publisher, '_update'),
|
||||
(new_publisher, '_new')]:
|
||||
for publisher in (test_publisher, new_publisher):
|
||||
self.assertEqual(2, len(publisher.samples))
|
||||
self.assertEqual(2, publisher.calls)
|
||||
self.assertEqual('a' + sfx, getattr(publisher.samples[0], "name"))
|
||||
self.assertEqual('b' + sfx, getattr(publisher.samples[1], "name"))
|
||||
self.assertEqual('a', getattr(publisher.samples[0], "name"))
|
||||
self.assertEqual('b', getattr(publisher.samples[1], "name"))
|
||||
|
||||
def test_multiple_sources_with_single_sink(self):
|
||||
self.pipeline_cfg['sources'].append({
|
||||
@ -193,68 +167,8 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
||||
for publisher in [test_publisher, another_publisher]:
|
||||
self.assertEqual(2, len(publisher.samples))
|
||||
self.assertEqual(2, publisher.calls)
|
||||
self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
|
||||
self.assertEqual('b_update', getattr(publisher.samples[1], "name"))
|
||||
|
||||
transformed_samples = self.TransformerClass.samples
|
||||
self.assertEqual(2, len(transformed_samples))
|
||||
self.assertEqual(['a', 'b'],
|
||||
[getattr(s, 'name') for s in transformed_samples])
|
||||
|
||||
def _do_test_rate_of_change_in_boilerplate_pipeline_cfg(self, index,
|
||||
meters, units):
|
||||
with open('ceilometer/pipeline/data/pipeline.yaml') as fap:
|
||||
data = fap.read()
|
||||
pipeline_cfg = yaml.safe_load(data)
|
||||
for s in pipeline_cfg['sinks']:
|
||||
s['publishers'] = ['test://']
|
||||
name = self.cfg2file(pipeline_cfg)
|
||||
self.CONF.set_override('pipeline_cfg_file', name)
|
||||
pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
|
||||
pipe = pipeline_manager.pipelines[index]
|
||||
self._do_test_rate_of_change_mapping(pipe, meters, units)
|
||||
|
||||
def test_rate_of_change_boilerplate_disk_read_cfg(self):
|
||||
meters = ('disk.read.bytes', 'disk.read.requests')
|
||||
units = ('B', 'request')
|
||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
|
||||
meters,
|
||||
units)
|
||||
|
||||
def test_rate_of_change_boilerplate_disk_write_cfg(self):
|
||||
meters = ('disk.write.bytes', 'disk.write.requests')
|
||||
units = ('B', 'request')
|
||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
|
||||
meters,
|
||||
units)
|
||||
|
||||
def test_rate_of_change_boilerplate_network_incoming_cfg(self):
|
||||
meters = ('network.incoming.bytes', 'network.incoming.packets')
|
||||
units = ('B', 'packet')
|
||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(4,
|
||||
meters,
|
||||
units)
|
||||
|
||||
def test_rate_of_change_boilerplate_per_disk_device_read_cfg(self):
|
||||
meters = ('disk.device.read.bytes', 'disk.device.read.requests')
|
||||
units = ('B', 'request')
|
||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
|
||||
meters,
|
||||
units)
|
||||
|
||||
def test_rate_of_change_boilerplate_per_disk_device_write_cfg(self):
|
||||
meters = ('disk.device.write.bytes', 'disk.device.write.requests')
|
||||
units = ('B', 'request')
|
||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
|
||||
meters,
|
||||
units)
|
||||
|
||||
def test_rate_of_change_boilerplate_network_outgoing_cfg(self):
|
||||
meters = ('network.outgoing.bytes', 'network.outgoing.packets')
|
||||
units = ('B', 'packet')
|
||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(4,
|
||||
meters,
|
||||
units)
|
||||
self.assertEqual('a', getattr(publisher.samples[0], "name"))
|
||||
self.assertEqual('b', getattr(publisher.samples[1], "name"))
|
||||
|
||||
def test_duplicated_sinks_names(self):
|
||||
self.pipeline_cfg['sinks'].append({
|
||||
|
@ -141,7 +141,6 @@ class BaseRealNotification(BaseNotificationTest):
|
||||
}],
|
||||
'sinks': [{
|
||||
'name': 'test_sink',
|
||||
'transformers': [],
|
||||
'publishers': ['test://']
|
||||
}]
|
||||
})
|
||||
|
@ -1,115 +0,0 @@
|
||||
#
|
||||
# Copyright 2016 IBM Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import copy
|
||||
import datetime
|
||||
|
||||
from oslo_utils import timeutils
|
||||
from oslotest import base
|
||||
|
||||
from ceilometer import sample
|
||||
from ceilometer.transformer import conversions
|
||||
|
||||
|
||||
class AggregatorTransformerTestCase(base.BaseTestCase):
|
||||
SAMPLE = sample.Sample(
|
||||
name='cpu',
|
||||
type=sample.TYPE_CUMULATIVE,
|
||||
unit='ns',
|
||||
volume='1234567',
|
||||
user_id='56c5692032f34041900342503fecab30',
|
||||
project_id='ac9494df2d9d4e709bac378cceabaf23',
|
||||
resource_id='1ca738a1-c49c-4401-8346-5c60ebdb03f4',
|
||||
timestamp="2015-10-29 14:12:15.485877+00:00",
|
||||
resource_metadata={}
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super(AggregatorTransformerTestCase, self).setUp()
|
||||
self._sample_offset = 0
|
||||
|
||||
def test_init_input_validation(self):
|
||||
aggregator = conversions.AggregatorTransformer("2", "15", None,
|
||||
None, None)
|
||||
self.assertEqual(2, aggregator.size)
|
||||
self.assertEqual(15, aggregator.retention_time)
|
||||
|
||||
def test_init_no_size_or_rention_time(self):
|
||||
aggregator = conversions.AggregatorTransformer()
|
||||
self.assertEqual(1, aggregator.size)
|
||||
self.assertIsNone(aggregator.retention_time)
|
||||
|
||||
def test_init_size_zero(self):
|
||||
aggregator = conversions.AggregatorTransformer(size="0")
|
||||
self.assertEqual(1, aggregator.size)
|
||||
self.assertIsNone(aggregator.retention_time)
|
||||
|
||||
def test_init_input_validation_size_invalid(self):
|
||||
self.assertRaises(ValueError, conversions.AggregatorTransformer,
|
||||
"abc", "15", None, None, None)
|
||||
|
||||
def test_init_input_validation_retention_time_invalid(self):
|
||||
self.assertRaises(ValueError, conversions.AggregatorTransformer,
|
||||
"2", "abc", None, None, None)
|
||||
|
||||
def test_init_no_timestamp(self):
|
||||
aggregator = conversions.AggregatorTransformer("1", "1", None,
|
||||
None, None)
|
||||
self.assertEqual("first", aggregator.timestamp)
|
||||
|
||||
def test_init_timestamp_none(self):
|
||||
aggregator = conversions.AggregatorTransformer("1", "1", None,
|
||||
None, None, None)
|
||||
self.assertEqual("first", aggregator.timestamp)
|
||||
|
||||
def test_init_timestamp_first(self):
|
||||
aggregator = conversions.AggregatorTransformer("1", "1", None,
|
||||
None, None, "first")
|
||||
self.assertEqual("first", aggregator.timestamp)
|
||||
|
||||
def test_init_timestamp_last(self):
|
||||
aggregator = conversions.AggregatorTransformer("1", "1", None,
|
||||
None, None, "last")
|
||||
self.assertEqual("last", aggregator.timestamp)
|
||||
|
||||
def test_init_timestamp_invalid(self):
|
||||
aggregator = conversions.AggregatorTransformer("1", "1", None,
|
||||
None, None,
|
||||
"invalid_option")
|
||||
self.assertEqual("first", aggregator.timestamp)
|
||||
|
||||
def test_size_unbounded(self):
|
||||
aggregator = conversions.AggregatorTransformer(size="0",
|
||||
retention_time="300")
|
||||
self._insert_sample_data(aggregator)
|
||||
|
||||
samples = aggregator.flush()
|
||||
|
||||
self.assertEqual([], samples)
|
||||
|
||||
def test_size_bounded(self):
|
||||
aggregator = conversions.AggregatorTransformer(size="100")
|
||||
self._insert_sample_data(aggregator)
|
||||
|
||||
samples = aggregator.flush()
|
||||
|
||||
self.assertEqual(100, len(samples))
|
||||
|
||||
def _insert_sample_data(self, aggregator):
|
||||
for _ in range(100):
|
||||
sample = copy.copy(self.SAMPLE)
|
||||
sample.resource_id = sample.resource_id + str(self._sample_offset)
|
||||
sample.timestamp = datetime.datetime.isoformat(timeutils.utcnow())
|
||||
aggregator.handle_sample(sample)
|
||||
self._sample_offset += 1
|
@ -1,73 +0,0 @@
|
||||
#
|
||||
# Copyright 2013 Intel Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import collections
|
||||
|
||||
import six
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class TransformerBase(object):
|
||||
"""Base class for plugins that transform the sample."""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
"""Setup transformer.
|
||||
|
||||
Each time a transformed is involved in a pipeline, a new transformer
|
||||
instance is created and chained into the pipeline. i.e. transformer
|
||||
instance is per pipeline. This helps if transformer need keep some
|
||||
cache and per-pipeline information.
|
||||
|
||||
:param kwargs: The parameters that are defined in pipeline config file.
|
||||
"""
|
||||
super(TransformerBase, self).__init__()
|
||||
|
||||
@abc.abstractmethod
|
||||
def handle_sample(self, sample):
|
||||
"""Transform a sample.
|
||||
|
||||
:param sample: A sample.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def flush():
|
||||
"""Flush samples cached previously."""
|
||||
return []
|
||||
|
||||
|
||||
class Namespace(object):
|
||||
"""Encapsulates the namespace.
|
||||
|
||||
Encapsulation is done by wrapping the evaluation of the configured rule.
|
||||
This allows nested dicts to be accessed in the attribute style,
|
||||
and missing attributes to yield false when used in a boolean expression.
|
||||
"""
|
||||
def __init__(self, seed):
|
||||
self.__dict__ = collections.defaultdict(lambda: Namespace({}))
|
||||
self.__dict__.update(seed)
|
||||
for k, v in six.iteritems(self.__dict__):
|
||||
if isinstance(v, dict):
|
||||
self.__dict__[k] = Namespace(v)
|
||||
|
||||
def __getattr__(self, attr):
|
||||
return self.__dict__[attr]
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.__dict__[key]
|
||||
|
||||
def __nonzero__(self):
|
||||
return len(self.__dict__) > 0
|
||||
__bool__ = __nonzero__
|
@ -1,42 +0,0 @@
|
||||
#
|
||||
# Copyright 2013 Julien Danjou
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from ceilometer import transformer
|
||||
|
||||
|
||||
class TransformerAccumulator(transformer.TransformerBase):
|
||||
"""Transformer that accumulates samples until a threshold.
|
||||
|
||||
And then flushes them out into the wild.
|
||||
"""
|
||||
|
||||
def __init__(self, size=1, **kwargs):
|
||||
if size >= 1:
|
||||
self.samples = []
|
||||
self.size = size
|
||||
super(TransformerAccumulator, self).__init__(**kwargs)
|
||||
|
||||
def handle_sample(self, sample):
|
||||
if self.size >= 1:
|
||||
self.samples.append(sample)
|
||||
else:
|
||||
return sample
|
||||
|
||||
def flush(self):
|
||||
if len(self.samples) >= self.size:
|
||||
x = self.samples
|
||||
self.samples = []
|
||||
return x
|
||||
return []
|
@ -1,157 +0,0 @@
|
||||
#
|
||||
# Copyright 2014 Red Hat, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import copy
|
||||
import keyword
|
||||
import math
|
||||
import re
|
||||
|
||||
from oslo_log import log
|
||||
import six
|
||||
|
||||
from ceilometer.i18n import _
|
||||
from ceilometer import sample
|
||||
from ceilometer import transformer
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class ArithmeticTransformer(transformer.TransformerBase):
|
||||
"""Multi meter arithmetic transformer.
|
||||
|
||||
Transformer that performs arithmetic operations
|
||||
over one or more meters and/or their metadata.
|
||||
"""
|
||||
|
||||
meter_name_re = re.compile(r'\$\(([\w\.\-]+)\)')
|
||||
|
||||
def __init__(self, target=None, **kwargs):
|
||||
super(ArithmeticTransformer, self).__init__(**kwargs)
|
||||
target = target or {}
|
||||
self.target = target
|
||||
self.expr = target.get('expr', '')
|
||||
self.expr_escaped, self.escaped_names = self.parse_expr(self.expr)
|
||||
self.required_meters = list(self.escaped_names.values())
|
||||
self.misconfigured = len(self.required_meters) == 0
|
||||
if not self.misconfigured:
|
||||
self.reference_meter = self.required_meters[0]
|
||||
# convert to set for more efficient contains operation
|
||||
self.required_meters = set(self.required_meters)
|
||||
self.cache = collections.defaultdict(dict)
|
||||
self.latest_timestamp = None
|
||||
else:
|
||||
LOG.warning(_('Arithmetic transformer must use at least one'
|
||||
' meter in expression \'%s\''), self.expr)
|
||||
|
||||
def _update_cache(self, _sample):
|
||||
"""Update the cache with the latest sample."""
|
||||
escaped_name = self.escaped_names.get(_sample.name, '')
|
||||
if escaped_name not in self.required_meters:
|
||||
return
|
||||
self.cache[_sample.resource_id][escaped_name] = _sample
|
||||
|
||||
def _check_requirements(self, resource_id):
|
||||
"""Check if all the required meters are available in the cache."""
|
||||
return len(self.cache[resource_id]) == len(self.required_meters)
|
||||
|
||||
def _calculate(self, resource_id):
|
||||
"""Evaluate the expression and return a new sample if successful."""
|
||||
ns_dict = dict((m, s.as_dict()) for m, s
|
||||
in six.iteritems(self.cache[resource_id]))
|
||||
ns = transformer.Namespace(ns_dict)
|
||||
try:
|
||||
new_volume = eval(self.expr_escaped, {}, ns)
|
||||
if math.isnan(new_volume):
|
||||
raise ArithmeticError(_('Expression evaluated to '
|
||||
'a NaN value!'))
|
||||
|
||||
reference_sample = self.cache[resource_id][self.reference_meter]
|
||||
return sample.Sample(
|
||||
name=self.target.get('name', reference_sample.name),
|
||||
unit=self.target.get('unit', reference_sample.unit),
|
||||
type=self.target.get('type', reference_sample.type),
|
||||
volume=float(new_volume),
|
||||
user_id=reference_sample.user_id,
|
||||
project_id=reference_sample.project_id,
|
||||
resource_id=reference_sample.resource_id,
|
||||
timestamp=self.latest_timestamp,
|
||||
resource_metadata=reference_sample.resource_metadata
|
||||
)
|
||||
except Exception as e:
|
||||
LOG.warning(_('Unable to evaluate expression %(expr)s: %(exc)s'),
|
||||
{'expr': self.expr, 'exc': e})
|
||||
|
||||
def handle_sample(self, _sample):
|
||||
self._update_cache(_sample)
|
||||
self.latest_timestamp = _sample.timestamp
|
||||
|
||||
def flush(self):
|
||||
new_samples = []
|
||||
if not self.misconfigured:
|
||||
# When loop self.cache, the dict could not be change by others.
|
||||
# If changed, will raise "RuntimeError: dictionary changed size
|
||||
# during iteration". so we make a tmp copy and just loop it.
|
||||
tmp_cache = copy.copy(self.cache)
|
||||
for resource_id in tmp_cache:
|
||||
if self._check_requirements(resource_id):
|
||||
new_samples.append(self._calculate(resource_id))
|
||||
if resource_id in self.cache:
|
||||
self.cache.pop(resource_id)
|
||||
return new_samples
|
||||
|
||||
@classmethod
|
||||
def parse_expr(cls, expr):
|
||||
"""Transforms meter names in the expression into valid identifiers.
|
||||
|
||||
:param expr: unescaped expression
|
||||
:return: A tuple of the escaped expression and a dict representing
|
||||
the translation of meter names into Python identifiers
|
||||
"""
|
||||
|
||||
class Replacer(object):
|
||||
"""Replaces matched meter names with escaped names.
|
||||
|
||||
If the meter name is not followed by parameter access in the
|
||||
expression, it defaults to accessing the 'volume' parameter.
|
||||
"""
|
||||
|
||||
def __init__(self, original_expr):
|
||||
self.original_expr = original_expr
|
||||
self.escaped_map = {}
|
||||
|
||||
def __call__(self, match):
|
||||
meter_name = match.group(1)
|
||||
escaped_name = self.escape(meter_name)
|
||||
self.escaped_map[meter_name] = escaped_name
|
||||
|
||||
if (match.end(0) == len(self.original_expr) or
|
||||
self.original_expr[match.end(0)] != '.'):
|
||||
escaped_name += '.volume'
|
||||
return escaped_name
|
||||
|
||||
@staticmethod
|
||||
def escape(name):
|
||||
has_dot = '.' in name
|
||||
if has_dot:
|
||||
name = name.replace('.', '_')
|
||||
|
||||
if has_dot or name.endswith('ESC') or name in keyword.kwlist:
|
||||
name = "_" + name + '_ESC'
|
||||
return name
|
||||
|
||||
replacer = Replacer(expr)
|
||||
expr = re.sub(cls.meter_name_re, replacer, expr)
|
||||
return expr, replacer.escaped_map
|
@ -1,344 +0,0 @@
|
||||
#
|
||||
# Copyright 2013 Red Hat, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import re
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
|
||||
from ceilometer.i18n import _
|
||||
from ceilometer import sample
|
||||
from ceilometer import transformer
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class BaseConversionTransformer(transformer.TransformerBase):
|
||||
"""Transformer to derive conversion."""
|
||||
|
||||
def __init__(self, source=None, target=None, **kwargs):
|
||||
"""Initialize transformer with configured parameters.
|
||||
|
||||
:param source: dict containing source sample unit
|
||||
:param target: dict containing target sample name, type,
|
||||
unit and scaling factor (a missing value
|
||||
connotes no change)
|
||||
"""
|
||||
self.source = source or {}
|
||||
self.target = target or {}
|
||||
super(BaseConversionTransformer, self).__init__(**kwargs)
|
||||
|
||||
def _map(self, s, attr):
|
||||
"""Apply the name or unit mapping if configured."""
|
||||
mapped = None
|
||||
from_ = self.source.get('map_from')
|
||||
to_ = self.target.get('map_to')
|
||||
if from_ and to_:
|
||||
if from_.get(attr) and to_.get(attr):
|
||||
try:
|
||||
mapped = re.sub(from_[attr], to_[attr], getattr(s, attr))
|
||||
except Exception:
|
||||
pass
|
||||
return mapped or self.target.get(attr, getattr(s, attr))
|
||||
|
||||
|
||||
class DeltaTransformer(BaseConversionTransformer):
|
||||
"""Transformer based on the delta of a sample volume."""
|
||||
|
||||
def __init__(self, target=None, growth_only=False, **kwargs):
|
||||
"""Initialize transformer with configured parameters.
|
||||
|
||||
:param growth_only: capture only positive deltas
|
||||
"""
|
||||
super(DeltaTransformer, self).__init__(target=target, **kwargs)
|
||||
self.growth_only = growth_only
|
||||
self.cache = {}
|
||||
|
||||
def handle_sample(self, s):
|
||||
"""Handle a sample, converting if necessary."""
|
||||
key = s.name + s.resource_id
|
||||
prev = self.cache.get(key)
|
||||
timestamp = timeutils.parse_isotime(s.timestamp)
|
||||
self.cache[key] = (s.volume, timestamp)
|
||||
|
||||
if prev:
|
||||
prev_volume = prev[0]
|
||||
prev_timestamp = prev[1]
|
||||
time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
|
||||
# disallow violations of the arrow of time
|
||||
if time_delta < 0:
|
||||
LOG.warning('Dropping out of time order sample: %s', (s,))
|
||||
# Reset the cache to the newer sample.
|
||||
self.cache[key] = prev
|
||||
return None
|
||||
volume_delta = s.volume - prev_volume
|
||||
if self.growth_only and volume_delta < 0:
|
||||
LOG.warning('Negative delta detected, dropping value')
|
||||
s = None
|
||||
else:
|
||||
s = self._convert(s, volume_delta)
|
||||
LOG.debug('Converted to: %s', s)
|
||||
else:
|
||||
LOG.warning('Dropping sample with no predecessor: %s', (s,))
|
||||
s = None
|
||||
return s
|
||||
|
||||
def _convert(self, s, delta):
|
||||
"""Transform the appropriate sample fields."""
|
||||
return sample.Sample(
|
||||
name=self._map(s, 'name'),
|
||||
unit=s.unit,
|
||||
type=sample.TYPE_DELTA,
|
||||
volume=delta,
|
||||
user_id=s.user_id,
|
||||
project_id=s.project_id,
|
||||
resource_id=s.resource_id,
|
||||
timestamp=s.timestamp,
|
||||
resource_metadata=s.resource_metadata
|
||||
)
|
||||
|
||||
|
||||
class ScalingTransformer(BaseConversionTransformer):
|
||||
"""Transformer to apply a scaling conversion."""
|
||||
|
||||
def __init__(self, source=None, target=None, **kwargs):
|
||||
"""Initialize transformer with configured parameters.
|
||||
|
||||
:param source: dict containing source sample unit
|
||||
:param target: dict containing target sample name, type,
|
||||
unit and scaling factor (a missing value
|
||||
connotes no change)
|
||||
"""
|
||||
super(ScalingTransformer, self).__init__(source=source, target=target,
|
||||
**kwargs)
|
||||
self.scale = self.target.get('scale')
|
||||
self.max = self.target.get('max')
|
||||
LOG.debug('scaling conversion transformer with source:'
|
||||
' %(source)s target: %(target)s:', {'source': self.source,
|
||||
'target': self.target})
|
||||
|
||||
def _scale(self, s):
|
||||
"""Apply the scaling factor.
|
||||
|
||||
Either a straight multiplicative factor or else a string to be eval'd.
|
||||
"""
|
||||
ns = transformer.Namespace(s.as_dict())
|
||||
|
||||
scale = self.scale
|
||||
return ((eval(scale, {}, ns) if isinstance(scale, six.string_types)
|
||||
else s.volume * scale) if scale else s.volume)
|
||||
|
||||
def _convert(self, s, growth=1):
|
||||
"""Transform the appropriate sample fields."""
|
||||
volume = self._scale(s) * growth
|
||||
return sample.Sample(
|
||||
name=self._map(s, 'name'),
|
||||
unit=self._map(s, 'unit'),
|
||||
type=self.target.get('type', s.type),
|
||||
volume=min(volume, self.max) if self.max else volume,
|
||||
user_id=s.user_id,
|
||||
project_id=s.project_id,
|
||||
resource_id=s.resource_id,
|
||||
timestamp=s.timestamp,
|
||||
resource_metadata=s.resource_metadata
|
||||
)
|
||||
|
||||
def handle_sample(self, s):
|
||||
"""Handle a sample, converting if necessary."""
|
||||
LOG.debug('handling sample %s', s)
|
||||
if self.source.get('unit', s.unit) == s.unit:
|
||||
s = self._convert(s)
|
||||
LOG.debug('converted to: %s', s)
|
||||
return s
|
||||
|
||||
|
||||
class RateOfChangeTransformer(ScalingTransformer):
|
||||
"""Transformer based on the rate of change of a sample volume.
|
||||
|
||||
For example, taking the current and previous volumes of a cumulative sample
|
||||
and producing a gauge value based on the proportion of some maximum used.
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
"""Initialize transformer with configured parameters."""
|
||||
super(RateOfChangeTransformer, self).__init__(**kwargs)
|
||||
self.cache = {}
|
||||
self.scale = self.scale or '1'
|
||||
|
||||
def handle_sample(self, s):
|
||||
"""Handle a sample, converting if necessary."""
|
||||
LOG.debug('handling sample %s', s)
|
||||
key = s.name + s.resource_id
|
||||
prev = self.cache.get(key)
|
||||
timestamp = timeutils.parse_isotime(s.timestamp)
|
||||
self.cache[key] = (s.volume, timestamp, s.monotonic_time)
|
||||
|
||||
if prev:
|
||||
prev_volume = prev[0]
|
||||
prev_timestamp = prev[1]
|
||||
prev_monotonic_time = prev[2]
|
||||
if (prev_monotonic_time is not None and
|
||||
s.monotonic_time is not None):
|
||||
# NOTE(sileht): Prefer high precision timer
|
||||
time_delta = s.monotonic_time - prev_monotonic_time
|
||||
else:
|
||||
time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
|
||||
# disallow violations of the arrow of time
|
||||
if time_delta < 0:
|
||||
LOG.warning(_('dropping out of time order sample: %s'), (s,))
|
||||
# Reset the cache to the newer sample.
|
||||
self.cache[key] = prev
|
||||
return None
|
||||
# we only allow negative volume deltas for noncumulative
|
||||
# samples, whereas for cumulative we assume that a reset has
|
||||
# occurred in the interim so that the current volume gives a
|
||||
# lower bound on growth
|
||||
volume_delta = (s.volume - prev_volume
|
||||
if (prev_volume <= s.volume or
|
||||
s.type != sample.TYPE_CUMULATIVE)
|
||||
else s.volume)
|
||||
rate_of_change = ((1.0 * volume_delta / time_delta)
|
||||
if time_delta else 0.0)
|
||||
|
||||
s = self._convert(s, rate_of_change)
|
||||
LOG.debug('converted to: %s', s)
|
||||
else:
|
||||
LOG.warning(_('dropping sample with no predecessor: %s'),
|
||||
(s,))
|
||||
s = None
|
||||
return s
|
||||
|
||||
|
||||
class AggregatorTransformer(ScalingTransformer):
|
||||
"""Transformer that aggregates samples.
|
||||
|
||||
Aggregation goes until a threshold or/and a retention_time, and then
|
||||
flushes them out into the wild.
|
||||
|
||||
Example:
|
||||
To aggregate sample by resource_metadata and keep the
|
||||
resource_metadata of the latest received sample;
|
||||
|
||||
AggregatorTransformer(retention_time=60, resource_metadata='last')
|
||||
|
||||
To aggregate sample by user_id and resource_metadata and keep the
|
||||
user_id of the first received sample and drop the resource_metadata.
|
||||
|
||||
AggregatorTransformer(size=15, user_id='first',
|
||||
resource_metadata='drop')
|
||||
|
||||
To keep the timestamp of the last received sample rather
|
||||
than the first:
|
||||
|
||||
AggregatorTransformer(timestamp="last")
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, size=1, retention_time=None,
|
||||
project_id=None, user_id=None, resource_metadata="last",
|
||||
timestamp="first", **kwargs):
|
||||
super(AggregatorTransformer, self).__init__(**kwargs)
|
||||
self.samples = {}
|
||||
self.counts = collections.defaultdict(int)
|
||||
self.size = int(size) if size else None
|
||||
self.retention_time = float(retention_time) if retention_time else None
|
||||
if not (self.size or self.retention_time):
|
||||
self.size = 1
|
||||
|
||||
if timestamp in ["first", "last"]:
|
||||
self.timestamp = timestamp
|
||||
else:
|
||||
self.timestamp = "first"
|
||||
|
||||
self.initial_timestamp = None
|
||||
self.aggregated_samples = 0
|
||||
|
||||
self.key_attributes = []
|
||||
self.merged_attribute_policy = {}
|
||||
|
||||
self._init_attribute('project_id', project_id)
|
||||
self._init_attribute('user_id', user_id)
|
||||
self._init_attribute('resource_metadata', resource_metadata,
|
||||
is_droppable=True, mandatory=True)
|
||||
|
||||
def _init_attribute(self, name, value, is_droppable=False,
|
||||
mandatory=False):
|
||||
drop = ['drop'] if is_droppable else []
|
||||
if value or mandatory:
|
||||
if value not in ['last', 'first'] + drop:
|
||||
LOG.warning('%s is unknown (%s), using last' % (name, value))
|
||||
value = 'last'
|
||||
self.merged_attribute_policy[name] = value
|
||||
else:
|
||||
self.key_attributes.append(name)
|
||||
|
||||
def _get_unique_key(self, s):
|
||||
# NOTE(arezmerita): in samples generated by ceilometer middleware,
|
||||
# when accessing without authentication publicly readable/writable
|
||||
# swift containers, the project_id and the user_id are missing.
|
||||
# They will be replaced by <undefined> for unique key construction.
|
||||
keys = ['<undefined>' if getattr(s, f) is None else getattr(s, f)
|
||||
for f in self.key_attributes]
|
||||
non_aggregated_keys = "-".join(keys)
|
||||
# NOTE(sileht): it assumes, a meter always have the same unit/type
|
||||
return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)
|
||||
|
||||
def handle_sample(self, sample_):
|
||||
if not self.initial_timestamp:
|
||||
self.initial_timestamp = timeutils.parse_isotime(sample_.timestamp)
|
||||
|
||||
self.aggregated_samples += 1
|
||||
key = self._get_unique_key(sample_)
|
||||
self.counts[key] += 1
|
||||
if key not in self.samples:
|
||||
self.samples[key] = self._convert(sample_)
|
||||
if self.merged_attribute_policy[
|
||||
'resource_metadata'] == 'drop':
|
||||
self.samples[key].resource_metadata = {}
|
||||
else:
|
||||
if self.timestamp == "last":
|
||||
self.samples[key].timestamp = sample_.timestamp
|
||||
if sample_.type == sample.TYPE_CUMULATIVE:
|
||||
self.samples[key].volume = self._scale(sample_)
|
||||
else:
|
||||
self.samples[key].volume += self._scale(sample_)
|
||||
for field in self.merged_attribute_policy:
|
||||
if self.merged_attribute_policy[field] == 'last':
|
||||
setattr(self.samples[key], field,
|
||||
getattr(sample_, field))
|
||||
|
||||
def flush(self):
|
||||
if not self.initial_timestamp:
|
||||
return []
|
||||
|
||||
expired = (self.retention_time and
|
||||
timeutils.is_older_than(self.initial_timestamp,
|
||||
self.retention_time))
|
||||
full = self.size and self.aggregated_samples >= self.size
|
||||
if full or expired:
|
||||
x = list(self.samples.values())
|
||||
# gauge aggregates need to be averages
|
||||
for s in x:
|
||||
if s.type == sample.TYPE_GAUGE:
|
||||
key = self._get_unique_key(s)
|
||||
s.volume /= self.counts[key]
|
||||
self.samples.clear()
|
||||
self.counts.clear()
|
||||
self.aggregated_samples = 0
|
||||
self.initial_timestamp = None
|
||||
return x
|
||||
return []
|
@ -405,6 +405,9 @@ if is_service_enabled ceilometer; then
|
||||
start_ceilometer
|
||||
elif [[ "$1" == "stack" && "$2" == "test-config" ]]; then
|
||||
iniset $TEMPEST_CONFIG telemetry alarm_granularity $CEILOMETER_ALARM_GRANULARITY
|
||||
iniset $TEMPEST_CONFIG telemetry alarm_threshold 10000000000
|
||||
iniset $TEMPEST_CONFIG telemetry alarm_metric_name cpu
|
||||
iniset $TEMPEST_CONFIG telemetry alarm_aggregation_method rate:mean
|
||||
fi
|
||||
|
||||
if [[ "$1" == "unstack" ]]; then
|
||||
|
@ -19,7 +19,11 @@ fi
|
||||
# Gnocchi default archive_policy for Ceilometer
|
||||
# TODO(sileht): when Gnocchi 4.0 is out use the tarball instead
|
||||
GNOCCHI_GIT_PATH=${GNOCCHI_GIT_PATH:-git+https://github.com/gnocchixyz/gnocchi#egg=gnocchi}
|
||||
GNOCCHI_ARCHIVE_POLICY=${GNOCCHI_ARCHIVE_POLICY:-ceilometer-low}
|
||||
if [ -n "$GNOCCHI_ARCHIVE_POLICY_TEMPEST" ]; then
|
||||
GNOCCHI_ARCHIVE_POLICY=$GNOCCHI_ARCHIVE_POLICY_TEMPEST
|
||||
else
|
||||
GNOCCHI_ARCHIVE_POLICY=${GNOCCHI_ARCHIVE_POLICY:-ceilometer-low}
|
||||
fi
|
||||
GNOCCHI_CONF_DIR=${GNOCCHI_CONF_DIR:-/etc/gnocchi}
|
||||
GNOCCHI_CONF=${GNOCCHI_CONF:-${GNOCCHI_CONF_DIR}/gnocchi.conf}
|
||||
GNOCCHI_COORDINATOR_URL=${CEILOMETER_COORDINATOR_URL:-redis://localhost:6379}
|
||||
|
@ -103,14 +103,6 @@ The following meters are collected for OpenStack Compute.
|
||||
| cpu | Cumu\ | ns | instance | Pollster | Libvirt,| CPU time used |
|
||||
| | lative| | ID | | Hyper-V | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| cpu.delta | Delta | ns | instance | Pollster | Libvirt,| CPU time used s\ |
|
||||
| | | | ID | | Hyper-V | ince previous d\ |
|
||||
| | | | | | | atapoint |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| cpu_util | Gauge | % | instance | Pollster | LibVirt,| Average CPU |
|
||||
| | | | ID | | vSphere,| utilization |
|
||||
| | | | | | XenAPI | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| vcpus | Gauge | vcpu | instance | Notific\ | Libvirt,| Number of virtual|
|
||||
| | | | ID | ation | Hyper-V | CPUs allocated to|
|
||||
| | | | | | | the instance |
|
||||
@ -118,17 +110,9 @@ The following meters are collected for OpenStack Compute.
|
||||
| disk.read\| Cumul\| req\ | instance | Pollster | Libvirt,| Number of read |
|
||||
| .requests | ative | uest | ID | | Hyper-V | requests |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.read\| Gauge | requ\| instance | Pollster | Libvirt,| Average rate of |
|
||||
| .requests\| | est/s| ID | | Hyper-V,| read requests |
|
||||
| .rate | | | | | vSphere | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.writ\| Cumul\| req\ | instance | Pollster | Libvirt,| Number of write |
|
||||
| e.requests| ative | uest | ID | | Hyper-V | requests |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.writ\| Gauge | requ\| instance | Pollster | Libvirt,| Average rate of |
|
||||
| e.request\| | est/s| ID | | Hyper-V,| write requests |
|
||||
| s.rate | | | | | vSphere | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.read\| Cumu\ | B | instance | Pollster | Libvirt,| Volume of reads |
|
||||
| .bytes | lative| | ID | | Hyper-V | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
@ -149,38 +133,18 @@ The following meters are collected for OpenStack Compute.
|
||||
| ice.read\ | lative| uest | | | Hyper-V | requests |
|
||||
| .requests | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.dev\ | Gauge | requ\| disk ID | Pollster | Libvirt,| Average rate of |
|
||||
| ice.read\ | | est/s| | | Hyper-V,| read requests |
|
||||
| .requests\| | | | | vSphere | |
|
||||
| .rate | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.dev\ | Cumu\ | req\ | disk ID | Pollster | Libvirt,| Number of write |
|
||||
| ice.write\| lative| uest | | | Hyper-V | requests |
|
||||
| .requests | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.dev\ | Gauge | requ\| disk ID | Pollster | Libvirt,| Average rate of |
|
||||
| ice.write\| | est/s| | | Hyper-V,| write requests |
|
||||
| .requests\| | | | | vSphere | |
|
||||
| .rate | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.dev\ | Cumu\ | B | disk ID | Pollster | Libvirt,| Volume of reads |
|
||||
| ice.read\ | lative| | | | Hyper-V | |
|
||||
| .bytes | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.dev\ | Gauge | B/s | disk ID | Pollster | Libvirt,| Average rate of |
|
||||
| ice.read\ | | | | | Hyper-V,| reads |
|
||||
| .bytes | | | | | vSphere | |
|
||||
| .rate | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.dev\ | Cumu\ | B | disk ID | Pollster | Libvirt,| Volume of writes |
|
||||
| ice.write\| lative| | | | Hyper-V | |
|
||||
| .bytes | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.dev\ | Gauge | B/s | disk ID | Pollster | Libvirt,| Average rate of |
|
||||
| ice.write\| | | | | Hyper-V,| writes |
|
||||
| .bytes | | | | | vSphere | |
|
||||
| .rate | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.root\| Gauge | GB | instance | Notific\ | Libvirt,| Size of root disk|
|
||||
| .size | | | ID | ation | Hyper-V | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
@ -236,38 +200,18 @@ The following meters are collected for OpenStack Compute.
|
||||
| incoming.\| lative| | ID | | Hyper-V | incoming bytes |
|
||||
| bytes | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| network.\ | Gauge | B/s | interface| Pollster | Libvirt,| Average rate of |
|
||||
| incoming.\| | | ID | | Hyper-V,| incoming bytes |
|
||||
| bytes.rate| | | | | vSphere,| |
|
||||
| | | | | | XenAPI | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| network.\ | Cumu\ | B | interface| Pollster | Libvirt,| Number of |
|
||||
| outgoing\ | lative| | ID | | Hyper-V | outgoing bytes |
|
||||
| .bytes | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| network.\ | Gauge | B/s | interface| Pollster | Libvirt,| Average rate of |
|
||||
| outgoing.\| | | ID | | Hyper-V,| outgoing bytes |
|
||||
| bytes.rate| | | | | vSphere,| |
|
||||
| | | | | | XenAPI | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| network.\ | Cumu\ | pac\ | interface| Pollster | Libvirt,| Number of |
|
||||
| incoming\ | lative| ket | ID | | Hyper-V | incoming packets |
|
||||
| .packets | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| network.\ | Gauge | pack\| interface| Pollster | Libvirt,| Average rate of |
|
||||
| incoming\ | | et/s | ID | | Hyper-V,| incoming packets |
|
||||
| .packets\ | | | | | vSphere,| |
|
||||
| .rate | | | | | XenAPI | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| network.\ | Cumu\ | pac\ | interface| Pollster | Libvirt,| Number of |
|
||||
| outgoing\ | lative| ket | ID | | Hyper-V | outgoing packets |
|
||||
| .packets | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| network.\ | Gauge | pac\ | interface| Pollster | Libvirt,| Average rate of |
|
||||
| outgoing\ | | ket/s| ID | | Hyper-V,| outgoing packets |
|
||||
| .packets\ | | | | | vSphere,| |
|
||||
| .rate | | | | | XenAPI | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| **Meters added in the Newton release** |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| cpu_l3_c\ | Gauge | B | instance | Pollster | Libvirt | L3 cache used b\ |
|
||||
@ -354,50 +298,6 @@ The following meters are collected for OpenStack Compute.
|
||||
To enable libvirt ``disk.*`` support when running on RBD-backed shared
|
||||
storage, you need to install libvirt version 1.2.16+.
|
||||
|
||||
The Telemetry service supports creating new meters by using transformers, but
|
||||
this is deprecated and discouraged to use. Among the meters gathered from
|
||||
libvirt and Hyper-V, there are a few which are derived from other meters. The
|
||||
list of meters that are created by using the ``rate_of_change`` transformer
|
||||
from the above table is the following:
|
||||
|
||||
- cpu_util
|
||||
|
||||
- cpu.delta
|
||||
|
||||
- disk.read.requests.rate
|
||||
|
||||
- disk.write.requests.rate
|
||||
|
||||
- disk.read.bytes.rate
|
||||
|
||||
- disk.write.bytes.rate
|
||||
|
||||
- disk.device.read.requests.rate
|
||||
|
||||
- disk.device.write.requests.rate
|
||||
|
||||
- disk.device.read.bytes.rate
|
||||
|
||||
- disk.device.write.bytes.rate
|
||||
|
||||
- network.incoming.bytes.rate
|
||||
|
||||
- network.outgoing.bytes.rate
|
||||
|
||||
- network.incoming.packets.rate
|
||||
|
||||
- network.outgoing.packets.rate
|
||||
|
||||
.. note::
|
||||
|
||||
If storing data in Gnocchi, derived rate_of_change metrics are also
|
||||
computed using Gnocchi in addition to Ceilometer transformers. It avoids
|
||||
missing data when Ceilometer services restart.
|
||||
To minimize Ceilometer memory requirements transformers can be disabled.
|
||||
These ``rate_of_change`` meters are deprecated and will be removed in
|
||||
default Ceilometer configuration in future release.
|
||||
|
||||
|
||||
OpenStack Compute is capable of collecting ``CPU`` related meters from
|
||||
the compute host machines. In order to use that you need to set the
|
||||
``compute_monitors`` option to ``cpu.virt_driver`` in the
|
||||
|
@ -0,0 +1,4 @@
|
||||
---
|
||||
upgrade:
|
||||
- |
|
||||
The support for transformers has been removed from the pipeline.
|
@ -222,14 +222,6 @@ ceilometer.compute.virt =
|
||||
ceilometer.hardware.inspectors =
|
||||
snmp = ceilometer.hardware.inspector.snmp:SNMPInspector
|
||||
|
||||
ceilometer.transformer =
|
||||
accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
|
||||
delta = ceilometer.transformer.conversions:DeltaTransformer
|
||||
unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
|
||||
rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
|
||||
aggregator = ceilometer.transformer.conversions:AggregatorTransformer
|
||||
arithmetic = ceilometer.transformer.arithmetic:ArithmeticTransformer
|
||||
|
||||
ceilometer.sample.publisher =
|
||||
test = ceilometer.publisher.test:TestPublisher
|
||||
notifier = ceilometer.publisher.messaging:SampleNotifierPublisher
|
||||
|
Loading…
Reference in New Issue
Block a user