Remove 'counter' occurences in pipeline

Change-Id: I6f8c93b30c068811e88f7afcbebb45e61606ffd2
Blueprint: remove-counter
This commit is contained in:
Julien Danjou 2013-08-12 15:37:43 +02:00
parent de22ede604
commit 1d9f006959
14 changed files with 137 additions and 137 deletions

View File

@ -28,7 +28,7 @@ LOG = log.getLogger(__name__)
class PollingTask(object):
"""Polling task for polling counters and inject into pipeline.
"""Polling task for polling samples and inject into pipeline.
A polling task can be invoked periodically or only once.
"""
@ -45,7 +45,7 @@ class PollingTask(object):
@abc.abstractmethod
def poll_and_publish(self):
"""Polling counter and publish into pipeline."""
"""Polling sample and publish into pipeline."""
class AgentManager(object):
@ -70,7 +70,7 @@ class AgentManager(object):
for pipeline, pollster in itertools.product(
self.pipeline_manager.pipelines,
self.pollster_manager.extensions):
if pipeline.support_counter(pollster.name):
if pipeline.support_meter(pollster.name):
polling_task = polling_tasks.get(pipeline.interval, None)
if not polling_task:
polling_task = self.create_polling_task()

View File

@ -100,19 +100,19 @@ class CeilometerMiddleware(object):
bytes_sent += len(chunk)
yield chunk
finally:
self.publish_counter(env,
input_proxy.bytes_received,
bytes_sent)
self.publish_sample(env,
input_proxy.bytes_received,
bytes_sent)
try:
iterable = self.app(env, my_start_response)
except Exception:
self.publish_counter(env, input_proxy.bytes_received, 0)
self.publish_sample(env, input_proxy.bytes_received, 0)
raise
else:
return iter_response(iterable)
def publish_counter(self, env, bytes_received, bytes_sent):
def publish_sample(self, env, bytes_received, bytes_sent):
req = REQUEST.Request(env)
version, account, container, obj = split_path(req.path, 1, 4, True)
now = timeutils.utcnow().isoformat()

View File

@ -18,6 +18,7 @@
import itertools
import os
import operator
from oslo.config import cfg
import yaml
@ -57,10 +58,10 @@ class PublishContext(object):
self.pipelines.update(pipelines)
def __enter__(self):
def p(counters):
def p(samples):
for p in self.pipelines:
p.publish_counters(self.context,
counters)
p.publish_samples(self.context,
samples)
return p
def __exit__(self, exc_type, exc_value, traceback):
@ -74,9 +75,9 @@ class Pipeline(object):
Pipeline describes a chain of handlers. The chain starts with
tranformer and ends with one or more publishers.
The first transformer in the chain gets counter from data collector, i.e.
The first transformer in the chain gets sample from data collector, i.e.
pollster or notification handler, takes some action like dropping,
aggregation, changing field etc, then passes the updated counter
aggregation, changing field etc, then passes the updated sample
to next step.
The subsequent transformers, if any, handle the data similarly.
@ -85,7 +86,7 @@ class Pipeline(object):
method depends on publisher type, for example, pushing into data storage
through message bus, sending to external CW software through CW API call.
If no transformer is included in the chain, the publishers get counters
If no transformer is included in the chain, the publishers get samples
from data collector and publish them directly.
"""
@ -99,7 +100,8 @@ class Pipeline(object):
self.interval = int(cfg['interval'])
except ValueError:
raise PipelineException("Invalid interval value", cfg)
self.counters = cfg['counters']
# Support 'counters' for backward compatibility
self.meters = cfg.get('meters', cfg.get('counters'))
# It's legal to have no transformer specified
self.transformer_cfg = cfg['transformers'] or []
except KeyError as err:
@ -109,7 +111,7 @@ class Pipeline(object):
if self.interval <= 0:
raise PipelineException("Interval value should > 0", cfg)
self._check_counters()
self._check_meters()
if not cfg.get('publishers'):
raise PipelineException("No publisher specified", cfg)
@ -129,28 +131,28 @@ class Pipeline(object):
def __str__(self):
return self.name
def _check_counters(self):
"""Counter rules checking
def _check_meters(self):
"""Meter rules checking
At least one meaningful counter exist
Included type and excluded type counter can't co-exist at
At least one meaningful meter exist
Included type and excluded type meter can't co-exist at
the same pipeline
Included type counter and wildcard can't co-exist at same pipeline
Included type meter and wildcard can't co-exist at same pipeline
"""
counters = self.counters
if not counters:
raise PipelineException("No counter specified", self.cfg)
meters = self.meters
if not meters:
raise PipelineException("No meter specified", self.cfg)
if [x for x in counters if x[0] not in '!*'] and \
[x for x in counters if x[0] == '!']:
if [x for x in meters if x[0] not in '!*'] and \
[x for x in meters if x[0] == '!']:
raise PipelineException(
"Both included and excluded counters specified",
"Both included and excluded meters specified",
cfg)
if '*' in counters and [x for x in counters if x[0] not in '!*']:
if '*' in meters and [x for x in meters if x[0] not in '!*']:
raise PipelineException(
"Included counters specified with wildcard",
"Included meters specified with wildcard",
self.cfg)
def _setup_transformers(self, cfg, transformer_manager):
@ -173,90 +175,90 @@ class Pipeline(object):
return transformers
def _transform_counter(self, start, ctxt, counter):
def _transform_sample(self, start, ctxt, sample):
try:
for transformer in self.transformers[start:]:
counter = transformer.handle_sample(ctxt, counter)
if not counter:
LOG.debug("Pipeline %s: Counter dropped by transformer %s",
sample = transformer.handle_sample(ctxt, sample)
if not sample:
LOG.debug("Pipeline %s: Sample dropped by transformer %s",
self, transformer)
return
return counter
return sample
except Exception as err:
LOG.warning("Pipeline %s: Exit after error from transformer"
"%s for %s",
self, transformer, counter)
self, transformer, sample)
LOG.exception(err)
def _publish_counters(self, start, ctxt, counters):
"""Push counter into pipeline for publishing.
def _publish_samples(self, start, ctxt, samples):
"""Push samples into pipeline for publishing.
param start: the first transformer that the counter will be injected.
This is mainly for flush() invocation that transformer
may emit counters
param ctxt: execution context from the manager or service
param counters: counter list
:param start: The first transformer that the sample will be injected.
This is mainly for flush() invocation that transformer
may emit samples.
:param ctxt: Execution context from the manager or service.
:param samples: Sample list.
"""
transformed_counters = []
for counter in counters:
LOG.debug("Pipeline %s: Transform counter %s from %s transformer",
self, counter, start)
counter = self._transform_counter(start, ctxt, counter)
if counter:
transformed_counters.append(counter)
transformed_samples = []
for sample in samples:
LOG.debug("Pipeline %s: Transform sample %s from %s transformer",
self, sample, start)
sample = self._transform_sample(start, ctxt, sample)
if sample:
transformed_samples.append(sample)
LOG.audit("Pipeline %s: Publishing counters", self)
LOG.audit("Pipeline %s: Publishing samples", self)
for p in self.publishers:
try:
p.publish_counters(ctxt, transformed_counters)
p.publish_samples(ctxt, transformed_samples)
except Exception:
LOG.exception("Pipeline %s: Continue after error "
"from publisher %s", self, p)
LOG.audit("Pipeline %s: Published counters", self)
LOG.audit("Pipeline %s: Published samples", self)
def publish_counter(self, ctxt, counter):
self.publish_counters(ctxt, [counter])
def publish_sample(self, ctxt, sample):
self.publish_samples(ctxt, [sample])
def publish_counters(self, ctxt, counters):
for counter_name, counters in itertools.groupby(
sorted(counters, key=lambda c: c.name),
lambda c: c.name):
if self.support_counter(counter_name):
self._publish_counters(0, ctxt, counters)
def publish_samples(self, ctxt, samples):
for meter_name, samples in itertools.groupby(
sorted(samples, key=operator.attrgetter('name')),
operator.attrgetter('name')):
if self.support_meter(meter_name):
self._publish_samples(0, ctxt, samples)
# (yjiang5) To support counters like instance:m1.tiny,
# (yjiang5) To support meters like instance:m1.tiny,
# which include variable part at the end starting with ':'.
# Hope we will not add such counters in future.
def _variable_counter_name(self, name):
# Hope we will not add such meters in future.
def _variable_meter_name(self, name):
m = name.partition(':')
if m[1] == ':':
return m[1].join((m[0], '*'))
else:
return name
def support_counter(self, counter_name):
counter_name = self._variable_counter_name(counter_name)
if ('!' + counter_name) in self.counters:
def support_meter(self, meter_name):
meter_name = self._variable_meter_name(meter_name)
if ('!' + meter_name) in self.meters:
return False
if '*' in self.counters:
if '*' in self.meters:
return True
elif self.counters[0][0] == '!':
return not ('!' + counter_name) in self.counters
elif self.meters[0][0] == '!':
return not ('!' + meter_name) in self.meters
else:
return counter_name in self.counters
return meter_name in self.meters
def flush(self, ctxt):
"""Flush data after all counter have been injected to pipeline."""
"""Flush data after all samples have been injected to pipeline."""
LOG.audit("Flush pipeline %s", self)
for (i, transformer) in enumerate(self.transformers):
try:
self._publish_counters(i + 1, ctxt,
list(transformer.flush(ctxt)))
self._publish_samples(i + 1, ctxt,
list(transformer.flush(ctxt)))
except Exception as err:
LOG.warning(
"Pipeline %s: Error flushing "
@ -283,12 +285,12 @@ class PipelineManager(object):
The top of the cfg is a list of pipeline definitions.
Pipeline definition is an dictionary specifying the target counters,
Pipeline definition is an dictionary specifying the target samples,
the tranformers involved, and the target publishers:
{
"name": pipeline_name
"interval": interval_time
"counters" : ["counter_1", "counter_2"],
"meters" : ["meter_1", "meter_2"],
"tranformers":[
{"name": "Transformer_1",
"parameters": {"p1": "value"}},
@ -299,20 +301,19 @@ class PipelineManager(object):
"publishers": ["publisher_1", "publisher_2"]
}
Interval is how many seconds should the counters be injected to
Interval is how many seconds should the samples be injected to
the pipeline.
Valid counter format is '*', '!counter_name', or 'counter_name'.
'*' is wildcard symbol means any counters; '!counter_name' means
"counter_name" will be excluded; 'counter_name' means 'counter_name'
Valid meter format is '*', '!meter_name', or 'meter_name'.
'*' is wildcard symbol means any meters; '!meter_name' means
"meter_name" will be excluded; 'meter_name' means 'meter_name'
will be included.
The 'counter_name" is Counter namedtuple's name field. For counter
names with variable like "instance:m1.tiny", it's "instance:*", as
returned by get_counter_list().
The 'meter_name" is Sample name field. For meter names with
variable like "instance:m1.tiny", it's "instance:*".
Valid counters definition is all "included counter names", all
"excluded counter names", wildcard and "excluded counter names", or
Valid meters definition is all "included meter names", all
"excluded meter names", wildcard and "excluded meter names", or
only wildcard.
Transformer's name is plugin name in setup.py.
@ -327,7 +328,6 @@ class PipelineManager(object):
"""Build a new Publisher for these manager pipelines.
:param context: The context.
:param source: Counter source.
"""
return PublishContext(context, self.pipelines)

View File

@ -43,5 +43,5 @@ class PublisherBase(object):
pass
@abc.abstractmethod
def publish_counters(self, context, counters):
def publish_samples(self, context, samples):
"Publish counters into final conduit."

View File

@ -86,7 +86,7 @@ class FilePublisher(publisher.PublisherBase):
rfh.setLevel(logging.INFO)
self.publisher_logger.addHandler(rfh)
def publish_counters(self, context, counters):
def publish_samples(self, context, counters):
"""Send a metering message for publishing
:param context: Execution context from the service or RPC call

View File

@ -136,7 +136,7 @@ class RPCPublisher(publisher.PublisherBase):
% self.policy)
self.policy = 'default'
def publish_counters(self, context, counters):
def publish_samples(self, context, counters):
"""Publish counters on RPC.
:param context: Execution context from the service or RPC call.

View File

@ -27,7 +27,7 @@ class TestPublisher(publisher.PublisherBase):
def __init__(self, parsed_url):
self.counters = []
def publish_counters(self, context, counters):
def publish_samples(self, context, counters):
"""Send a metering message for publishing
:param context: Execution context from the service or RPC call

View File

@ -41,7 +41,7 @@ class UDPPublisher(publisher.PublisherBase):
self.socket = socket.socket(socket.AF_INET,
socket.SOCK_DGRAM)
def publish_counters(self, context, counters):
def publish_samples(self, context, counters):
"""Send a metering message for publishing
:param context: Execution context from the service or RPC call

View File

@ -2,7 +2,7 @@
-
name: meter_pipeline
interval: 600
counters:
meters:
- "*"
transformers:
publishers:
@ -10,7 +10,7 @@
-
name: cpu_pipeline
interval: 600
counters:
meters:
- "cpu"
transformers:
- name: "rate_of_change"

View File

@ -49,7 +49,7 @@ class TestSwiftMiddleware(base.TestCase):
self.pipeline_manager = pipeline_manager
self.counters = []
def publish_counters(self, ctxt, counters):
def publish_samples(self, ctxt, counters):
self.counters.extend(counters)
def flush(self, context):

View File

@ -71,8 +71,8 @@ class TestFilePublisher(base.TestCase):
parsed_url = urlsplit(
'file:///tmp/log_file?max_bytes=50&backup_count=3')
publisher = file.FilePublisher(parsed_url)
publisher.publish_counters(None,
self.test_data)
publisher.publish_samples(None,
self.test_data)
handler = publisher.publisher_logger.handlers[0]
self.assertTrue(isinstance(handler,
@ -87,8 +87,8 @@ class TestFilePublisher(base.TestCase):
parsed_url = urlsplit(
'file:///tmp/log_file_plain')
publisher = file.FilePublisher(parsed_url)
publisher.publish_counters(None,
self.test_data)
publisher.publish_samples(None,
self.test_data)
handler = publisher.publisher_logger.handlers[0]
self.assertTrue(isinstance(handler,
@ -104,7 +104,7 @@ class TestFilePublisher(base.TestCase):
parsed_url = urlsplit(
'file:///tmp/log_file_bad?max_bytes=yus&backup_count=5y')
publisher = file.FilePublisher(parsed_url)
publisher.publish_counters(None,
self.test_data)
publisher.publish_samples(None,
self.test_data)
self.assertIsNone(publisher.publisher_logger)

View File

@ -220,8 +220,8 @@ class TestPublish(base.TestCase):
def test_published(self):
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://'))
publisher.publish_counters(None,
self.test_data)
publisher.publish_samples(None,
self.test_data)
self.assertEqual(len(self.published), 1)
self.assertEqual(self.published[0][0],
cfg.CONF.publisher_rpc.metering_topic)
@ -232,8 +232,8 @@ class TestPublish(base.TestCase):
def test_publish_target(self):
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?target=custom_procedure_call'))
publisher.publish_counters(None,
self.test_data)
publisher.publish_samples(None,
self.test_data)
self.assertEqual(len(self.published), 1)
self.assertEqual(self.published[0][0],
cfg.CONF.publisher_rpc.metering_topic)
@ -244,8 +244,8 @@ class TestPublish(base.TestCase):
def test_published_with_per_meter_topic(self):
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?per_meter_topic=1'))
publisher.publish_counters(None,
self.test_data)
publisher.publish_samples(None,
self.test_data)
self.assertEqual(len(self.published), 4)
for topic, rpc_call in self.published:
meters = rpc_call['args']['data']
@ -271,7 +271,7 @@ class TestPublish(base.TestCase):
network_utils.urlsplit('rpc://'))
self.assertRaises(
SystemExit,
publisher.publish_counters,
publisher.publish_samples,
None, self.test_data)
self.assertEqual(publisher.policy, 'default')
self.assertEqual(len(self.published), 0)
@ -283,7 +283,7 @@ class TestPublish(base.TestCase):
network_utils.urlsplit('rpc://?policy=default'))
self.assertRaises(
SystemExit,
publisher.publish_counters,
publisher.publish_samples,
None, self.test_data)
self.assertEqual(len(self.published), 0)
self.assertEqual(len(publisher.local_queue), 0)
@ -294,7 +294,7 @@ class TestPublish(base.TestCase):
network_utils.urlsplit('rpc://?policy=notexist'))
self.assertRaises(
SystemExit,
publisher.publish_counters,
publisher.publish_samples,
None, self.test_data)
self.assertEqual(publisher.policy, 'default')
self.assertEqual(len(self.published), 0)
@ -304,8 +304,8 @@ class TestPublish(base.TestCase):
self.rpc_unreachable = True
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?policy=drop'))
publisher.publish_counters(None,
self.test_data)
publisher.publish_samples(None,
self.test_data)
self.assertEqual(len(self.published), 0)
self.assertEqual(len(publisher.local_queue), 0)
@ -313,8 +313,8 @@ class TestPublish(base.TestCase):
self.rpc_unreachable = True
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?policy=queue'))
publisher.publish_counters(None,
self.test_data)
publisher.publish_samples(None,
self.test_data)
self.assertEqual(len(self.published), 0)
self.assertEqual(len(publisher.local_queue), 1)
@ -322,14 +322,14 @@ class TestPublish(base.TestCase):
self.rpc_unreachable = True
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?policy=queue'))
publisher.publish_counters(None,
self.test_data)
publisher.publish_samples(None,
self.test_data)
self.assertEqual(len(self.published), 0)
self.assertEqual(len(publisher.local_queue), 1)
self.rpc_unreachable = False
publisher.publish_counters(None,
self.test_data)
publisher.publish_samples(None,
self.test_data)
self.assertEqual(len(self.published), 2)
self.assertEqual(len(publisher.local_queue), 0)
@ -341,8 +341,8 @@ class TestPublish(base.TestCase):
for i in range(0, 5):
for s in self.test_data:
s.source = 'test-%d' % i
publisher.publish_counters(None,
self.test_data)
publisher.publish_samples(None,
self.test_data)
self.assertEqual(len(self.published), 0)
self.assertEqual(len(publisher.local_queue), 3)
self.assertEqual(
@ -365,8 +365,8 @@ class TestPublish(base.TestCase):
for i in range(0, 2000):
for s in self.test_data:
s.source = 'test-%d' % i
publisher.publish_counters(None,
self.test_data)
publisher.publish_samples(None,
self.test_data)
self.assertEqual(len(self.published), 0)
self.assertEqual(len(publisher.local_queue), 1024)
self.assertEqual(

View File

@ -113,8 +113,8 @@ class TestUDPPublisher(base.TestCase):
self._make_fake_socket(self.data_sent)):
publisher = udp.UDPPublisher(
network_utils.urlsplit('udp://somehost'))
publisher.publish_counters(None,
self.test_data)
publisher.publish_samples(None,
self.test_data)
self.assertEqual(len(self.data_sent), 5)
@ -146,5 +146,5 @@ class TestUDPPublisher(base.TestCase):
self._make_broken_socket):
publisher = udp.UDPPublisher(
network_utils.urlsplit('udp://localhost'))
publisher.publish_counters(None,
self.test_data)
publisher.publish_samples(None,
self.test_data)

View File

@ -67,7 +67,7 @@ class TestPipeline(base.TestCase):
return fake_drivers[url](url)
class PublisherClassException(publisher.PublisherBase):
def publish_counters(self, ctxt, counters):
def publish_samples(self, ctxt, counters):
raise Exception()
class TransformerClass(transformer.TransformerBase):
@ -274,7 +274,7 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].support_counter('a'))
self.assertFalse(pipeline_manager.pipelines[0].support_meter('a'))
def test_wildcard_excluded_counters_not_excluded(self):
counter_cfg = ['*', '!b']
@ -309,9 +309,9 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].support_counter('a'))
self.assertTrue(pipeline_manager.pipelines[0].support_counter('b'))
self.assertFalse(pipeline_manager.pipelines[0].support_counter('c'))
self.assertFalse(pipeline_manager.pipelines[0].support_meter('a'))
self.assertTrue(pipeline_manager.pipelines[0].support_meter('b'))
self.assertFalse(pipeline_manager.pipelines[0].support_meter('c'))
def test_multiple_pipeline(self):
self.pipeline_cfg.append({
@ -594,16 +594,16 @@ class TestPipeline(base.TestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_counter(None, self.test_counter)
pipe.publish_sample(None, self.test_counter)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 0)
pipe.flush(None)
self.assertEqual(len(publisher.counters), 0)
pipe.publish_counter(None, self.test_counter)
pipe.publish_sample(None, self.test_counter)
pipe.flush(None)
self.assertEqual(len(publisher.counters), 0)
for i in range(CACHE_SIZE - 2):
pipe.publish_counter(None, self.test_counter)
pipe.publish_sample(None, self.test_counter)
pipe.flush(None)
self.assertEqual(len(publisher.counters), CACHE_SIZE)
self.assertTrue(getattr(publisher.counters[0], 'name')
@ -665,7 +665,7 @@ class TestPipeline(base.TestCase):
pipe = pipeline_manager.pipelines[0]
publisher = pipe.publishers[0]
pipe.publish_counter(None, self.test_counter)
pipe.publish_sample(None, self.test_counter)
self.assertEqual(len(publisher.counters), 0)
pipe.flush(None)
self.assertEqual(len(publisher.counters), 1)
@ -741,7 +741,7 @@ class TestPipeline(base.TestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_counters(None, counters)
pipe.publish_samples(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1)
pipe.flush(None)
@ -794,7 +794,7 @@ class TestPipeline(base.TestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_counters(None, counters)
pipe.publish_samples(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 2)
core_temp = publisher.counters[1]
@ -883,7 +883,7 @@ class TestPipeline(base.TestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_counters(None, counters)
pipe.publish_samples(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 2)
pipe.flush(None)
@ -967,7 +967,7 @@ class TestPipeline(base.TestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_counters(None, counters)
pipe.publish_samples(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 0)
pipe.flush(None)