From 21af2e30b3134ca2aa91fc01e1af2cd4bce28e0c Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Wed, 13 Feb 2013 13:46:07 +0100 Subject: [PATCH] pipeline: manager publish multiple counters This makes the polling agent publish all counters in a row. This fixes bug #1126990 and bug #1130475. This moves the publisher() method to the *manager*. No agent/middleware interacts with only one pipeline, this one an implementation mistake. Change-Id: I45246849830066e39491f762b457adbdfa8d0e2e Signed-off-by: Julien Danjou --- ceilometer/agent.py | 9 +- ceilometer/collector/service.py | 13 +- ceilometer/pipeline.py | 56 ++++----- tests/collector/test_manager.py | 4 +- tests/objectstore/test_swift_middleware.py | 39 +++--- tests/test_pipeline.py | 133 ++++++++++----------- 6 files changed, 123 insertions(+), 131 deletions(-) diff --git a/ceilometer/agent.py b/ceilometer/agent.py index 67cff5dc..b631b3e9 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -47,12 +47,9 @@ class AgentManager(object): context, cfg.CONF.counter_source, ) - with publisher: - LOG.info('Polling %s', ext.name) - for c in ext.obj.get_counters(manager, *args, **kwargs): - LOG.debug('Publishing counter: %s', c) - publisher(c) - + with publisher as p: + LOG.debug('Polling and publishing %s', ext.name) + p(ext.obj.get_counters(manager, *args, **kwargs)) except Exception as err: LOG.warning('Continuing after error from %s: %s', ext.name, err) diff --git a/ceilometer/collector/service.py b/ceilometer/collector/service.py index e38c6788..c6237cd3 100644 --- a/ceilometer/collector/service.py +++ b/ceilometer/collector/service.py @@ -109,16 +109,11 @@ class CollectorService(service.PeriodicService): def _process_notification_for_ext(self, ext, notification): handler = ext.obj if notification['event_type'] in handler.get_event_types(): - for c in handler.process_notification(notification): - LOG.info('COUNTER: %s', c) + ctxt = context.get_admin_context() + with self.pipeline_manager.publisher(ctxt, + cfg.CONF.counter_source) as p: # FIXME(dhellmann): Spawn green thread? - self.publish_counter(c) - - def publish_counter(self, counter): - """Create a metering message for the counter and publish it.""" - ctxt = context.get_admin_context() - self.pipeline_manager.publish_counter(ctxt, counter, - cfg.CONF.counter_source) + p(list(handler.process_notification(notification))) def record_metering_data(self, context, data): """This method is triggered when metering data is diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index a091e531..5fcf4541 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -66,20 +66,22 @@ class TransformerExtensionManager(extension.ExtensionManager): class Publisher(object): - def __init__(self, pipeline, context, source): - self.pipeline = pipeline + def __init__(self, pipelines, context, source): + self.pipelines = pipelines self.context = context self.source = source def __enter__(self): def p(counters): - return self.pipeline.publish_counters(self.context, - counters, - self.source) + for p in self.pipelines: + p.publish_counters(self.context, + counters, + self.source) return p def __exit__(self, exc_type, exc_value, traceback): - self.pipeline.flush(self.context, self.source) + for p in self.pipelines: + p.flush(self.context, self.source) class Pipeline(object): @@ -241,14 +243,6 @@ class Pipeline(object): LOG.audit("Pipeline %s: Published counters", self) - def publisher(self, context, source): - """Build a new Publisher for this pipeline. - - :param context: The context. - :param source: Counter source. - """ - return Publisher(self, context, source) - def publish_counter(self, ctxt, counter, source): self.publish_counters(ctxt, [counter], source) @@ -285,9 +279,16 @@ class Pipeline(object): LOG.audit("Flush pipeline %s", self) for (i, transformer) in enumerate(self.transformers): - self._publish_counters(i + 1, ctxt, - list(transformer.flush(ctxt, source)), - source) + try: + self._publish_counters(i + 1, ctxt, + list(transformer.flush(ctxt, source)), + source) + except Exception as err: + LOG.warning( + "Pipeline %s: Error flushing " + "transformer %s", + self, transformer) + LOG.exception(err) def get_interval(self): return self.interval @@ -353,24 +354,13 @@ class PipelineManager(object): transformer_manager) for pipedef in cfg] - def pipelines_for_counter(self, counter_name): - """Get all pipelines that support counter""" - return [p for p in self.pipelines if p.support_counter(counter_name)] - - def publish_counter(self, ctxt, counter, source): - """Publish counter through pipelines - - This is helpful to notification mechanism, so that they don't need - to maintain the private mapping cache from counter to pipelines. - - For polling based data collector, they may need keep private - mapping cache for different interval support. + def publisher(self, context, source): + """Build a new Publisher for these manager pipelines. + :param context: The context. + :param source: Counter source. """ - # TODO(yjiang5) Utilize a cache - for p in self.pipelines: - if p.support_counter(counter.name): - p.publish_counter(ctxt, counter, source) + return Publisher(self.pipelines, context, source) def setup_pipeline(publisher_manager): diff --git a/tests/collector/test_manager.py b/tests/collector/test_manager.py index a30760ac..88b308ed 100644 --- a/tests/collector/test_manager.py +++ b/tests/collector/test_manager.py @@ -189,6 +189,7 @@ class TestCollectorService(tests_base.TestCase): # configuration. with patch('ceilometer.openstack.common.rpc.create_connection'): self.srv.start() + self.srv.pipeline_manager.pipelines[0] = MagicMock() self.srv.notification_manager = test_manager.TestExtensionManager( [extension.Extension('test', None, @@ -197,4 +198,5 @@ class TestCollectorService(tests_base.TestCase): ), ]) self.srv.process_notification(TEST_NOTICE) - assert self.srv.pipeline_manager.publish_counter.called + self.assertTrue( + self.srv.pipeline_manager.publisher.called) diff --git a/tests/objectstore/test_swift_middleware.py b/tests/objectstore/test_swift_middleware.py index 9b513371..586b8a70 100644 --- a/tests/objectstore/test_swift_middleware.py +++ b/tests/objectstore/test_swift_middleware.py @@ -43,15 +43,22 @@ class FakeApp(object): class TestSwiftMiddleware(base.TestCase): - class _faux_pipeline_manager(): - def __init__(self): - self.counters = [] + class _faux_pipeline_manager(object): + class _faux_pipeline(object): + def __init__(self): + self.counters = [] - def publish_counters(self, context, counters, source): - self.counters.extend(counters) + def publish_counters(self, ctxt, counters, source): + self.counters.extend(counters) + + def flush(self, ctx, source): + pass + + def __init__(self): + self.pipelines = [self._faux_pipeline()] def publisher(self, context, source): - return pipeline.Publisher(self, context, source) + return pipeline.Publisher(self.pipelines, context, source) def flush(self, context, source): pass @@ -78,8 +85,9 @@ class TestSwiftMiddleware(base.TestCase): environ={'REQUEST_METHOD': 'GET'}) resp = app(req.environ, self.start_response) self.assertEqual(list(resp), ["This string is 28 bytes long"]) - self.assertEqual(len(self.pipeline_manager.counters), 1) - data = self.pipeline_manager.counters[0] + counters = self.pipeline_manager.pipelines[0].counters + self.assertEqual(len(counters), 1) + data = counters[0] self.assertEqual(data.volume, 28) self.assertEqual(data.resource_metadata['version'], '1.0') self.assertEqual(data.resource_metadata['container'], 'container') @@ -92,8 +100,9 @@ class TestSwiftMiddleware(base.TestCase): 'wsgi.input': StringIO.StringIO('some stuff')}) resp = list(app(req.environ, self.start_response)) - self.assertEqual(len(self.pipeline_manager.counters), 1) - data = self.pipeline_manager.counters[0] + counters = self.pipeline_manager.pipelines[0].counters + self.assertEqual(len(counters), 1) + data = counters[0] self.assertEqual(data.volume, 10) self.assertEqual(data.resource_metadata['version'], '1.0') self.assertEqual(data.resource_metadata['container'], 'container') @@ -106,8 +115,9 @@ class TestSwiftMiddleware(base.TestCase): 'wsgi.input': StringIO.StringIO('some other stuff')}) resp = list(app(req.environ, self.start_response)) - self.assertEqual(len(self.pipeline_manager.counters), 1) - data = self.pipeline_manager.counters[0] + counters = self.pipeline_manager.pipelines[0].counters + self.assertEqual(len(counters), 1) + data = counters[0] self.assertEqual(data.volume, 16) self.assertEqual(data.resource_metadata['version'], '1.0') self.assertEqual(data.resource_metadata['container'], 'container') @@ -118,8 +128,9 @@ class TestSwiftMiddleware(base.TestCase): req = Request.blank('/1.0/account/container', environ={'REQUEST_METHOD': 'GET'}) resp = list(app(req.environ, self.start_response)) - self.assertEqual(len(self.pipeline_manager.counters), 1) - data = self.pipeline_manager.counters[0] + counters = self.pipeline_manager.pipelines[0].counters + self.assertEqual(len(counters), 1) + data = counters[0] self.assertEqual(data.volume, 28) self.assertEqual(data.resource_metadata['version'], '1.0') self.assertEqual(data.resource_metadata['container'], 'container') diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index d0d95045..27e466f0 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -220,26 +220,19 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['transformers'] = transformer_cfg self._exception_create_pipelinemanager() - def test_pipelines_for_counter(self): - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.publisher_manager) - self.assertTrue(len(pipeline_manager.pipelines_for_counter('a')) - == 1) - self.assertTrue(len(pipeline_manager.pipelines_for_counter('b')) - == 0) - def test_get_interval(self): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] + + pipe = pipeline_manager.pipelines[0] self.assertTrue(pipe.get_interval() == 5) def test_publisher_transformer_invoked(self): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] - pipe.publish_counter(None, self.test_counter, None) + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.TransformerClass.samples) == 1) @@ -253,12 +246,15 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] - pipe.publish_counter(None, self.test_counter, None) + + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) + self.assertTrue(len(self.publisher.counters) == 1) - pipe = pipeline_manager.pipelines_for_counter('b')[0] + self.test_counter = self.test_counter._replace(name='b') - pipe.publish_counter(None, self.test_counter, None) + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) self.assertTrue(len(self.publisher.counters) == 2) self.assertTrue(len(self.TransformerClass.samples) == 2) @@ -272,8 +268,8 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] - pipe.publish_counter(None, self.test_counter, None) + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.TransformerClass.samples) == 1) @@ -285,17 +281,15 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a') - self.assertTrue(len(pipe) == 0) + self.assertFalse(pipeline_manager.pipelines[0].support_counter('a')) def test_wildcard_excluded_counters_not_excluded(self): counter_cfg = ['*', '!b'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] - pipe.publish_counter(None, self.test_counter, None) - + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.TransformerClass.samples) == 1) self.assertTrue(getattr(self.publisher.counters[0], "name") @@ -306,8 +300,8 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] - pipe.publish_counter(None, self.test_counter, None) + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.TransformerClass.samples) == 1) @@ -321,10 +315,9 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a') - self.assertTrue(len(pipe) == 0) - pipe_1 = pipeline_manager.pipelines_for_counter('c') - self.assertTrue(len(pipe_1) == 0) + self.assertFalse(pipeline_manager.pipelines[0].support_counter('a')) + self.assertTrue(pipeline_manager.pipelines[0].support_counter('b')) + self.assertFalse(pipeline_manager.pipelines[0].support_counter('c')) def test_multiple_pipeline(self): self.pipeline_cfg.append({ @@ -343,13 +336,13 @@ class TestPipeline(base.TestCase): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) - pipe = pipeline_manager.pipelines_for_counter('a')[0] - pipe_1 = pipeline_manager.pipelines_for_counter('b')[0] - - pipe.publish_counter(None, self.test_counter, None) self.test_counter = self.test_counter._replace(name='b') - pipe_1.publish_counter(None, self.test_counter, None) + + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(getattr(self.publisher.counters[0], "name") @@ -382,12 +375,14 @@ class TestPipeline(base.TestCase): }) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] - pipe_1 = pipeline_manager.pipelines_for_counter('b')[0] - pipe.publish_counter(None, self.test_counter, None) + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) + self.test_counter = self.test_counter._replace(name='b') - pipe_1.publish_counter(None, self.test_counter, None) + + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(getattr(self.publisher.counters[0], "name") @@ -403,8 +398,8 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['transformers'] = None pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] - pipe.publish_counter(None, self.test_counter, None) + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a') @@ -412,8 +407,8 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['transformers'] = [] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] - pipe.publish_counter(None, self.test_counter, None) + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a') @@ -430,8 +425,9 @@ class TestPipeline(base.TestCase): ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] - pipe.publish_counter(None, self.test_counter, None) + + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(getattr(self.publisher.counters[0], 'name') @@ -461,8 +457,8 @@ class TestPipeline(base.TestCase): ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] - pipe.publish_counter(None, self.test_counter, None) + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) self.assertTrue(len(self.TransformerClass.samples) == 2) self.assertTrue(getattr(self.TransformerClass.samples[0], 'name') @@ -496,8 +492,8 @@ class TestPipeline(base.TestCase): ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] - pipe.publish_counter(None, self.test_counter, None) + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) self.assertTrue(len(self.publisher.counters) == 0) self.assertTrue(len(self.TransformerClass.samples) == 1) @@ -511,8 +507,9 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['publishers'] = ['test', 'new'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] - pipe.publish_counter(None, self.test_counter, None) + + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.new_publisher.counters) == 1) @@ -525,8 +522,8 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['publishers'] = ['except', 'new'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] - pipe.publish_counter(None, self.test_counter, None) + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) self.assertTrue(len(self.new_publisher.counters) == 1) self.assertTrue(getattr(self.new_publisher.counters[0], 'name') @@ -536,12 +533,10 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['counters'] = ['a', 'b'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] - pipe_1 = pipeline_manager.pipelines_for_counter('b')[0] - self.assertTrue(pipe is pipe_1) - pipe.publish_counter(None, self.test_counter, None) - self.test_counter = self.test_counter._replace(name='b') - pipe_1.publish_counter(None, self.test_counter, None) + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter, + self.test_counter._replace(name='b')]) + self.assertTrue(len(self.publisher.counters) == 2) self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a_update') @@ -567,7 +562,7 @@ class TestPipeline(base.TestCase): ) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] + pipe = pipeline_manager.pipelines[0] pipe.publish_counter(None, self.test_counter, None) self.assertTrue(len(self.publisher.counters) == 0) @@ -603,15 +598,16 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['counters'] = ['a', 'b'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] - pipe.publish_counter(None, self.test_counter, None) - self.test_counter = self.test_counter._replace(name='b') - pipe.publish_counter(None, self.test_counter, None) + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter, + self.test_counter._replace(name='b')]) + self.assertTrue(len(self.publisher.counters) == 0) - pipe.flush(None, None) self.assertEqual(len(self.publisher.counters), 0) - pipe.publish_counter(None, self.test_counter, None) - pipe.flush(None, None) + + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) + self.assertEqual(len(self.publisher.counters), CACHE_SIZE) self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a_update_new') @@ -625,7 +621,7 @@ class TestPipeline(base.TestCase): }) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a')[0] + pipe = pipeline_manager.pipelines[0] pipe.publish_counter(None, self.test_counter, None) self.assertTrue(len(self.publisher.counters) == 0) @@ -647,10 +643,11 @@ class TestPipeline(base.TestCase): }, ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.publisher_manager) - pipe = pipeline_manager.pipelines_for_counter('a:*')[0] self.test_counter = self.test_counter._replace(name='a:b') - pipe.publish_counter(None, self.test_counter, None) + + with pipeline_manager.publisher(None, None) as p: + p([self.test_counter]) self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.TransformerClass.samples) == 1)