Merge "pipeline: manager publish multiple counters"

This commit is contained in:
Jenkins 2013-02-21 02:57:43 +00:00 committed by Gerrit Code Review
commit 8bca547c0e
6 changed files with 123 additions and 131 deletions

View File

@ -47,12 +47,9 @@ class AgentManager(object):
context, context,
cfg.CONF.counter_source, cfg.CONF.counter_source,
) )
with publisher: with publisher as p:
LOG.info('Polling %s', ext.name) LOG.debug('Polling and publishing %s', ext.name)
for c in ext.obj.get_counters(manager, *args, **kwargs): p(ext.obj.get_counters(manager, *args, **kwargs))
LOG.debug('Publishing counter: %s', c)
publisher(c)
except Exception as err: except Exception as err:
LOG.warning('Continuing after error from %s: %s', LOG.warning('Continuing after error from %s: %s',
ext.name, err) ext.name, err)

View File

@ -109,16 +109,11 @@ class CollectorService(service.PeriodicService):
def _process_notification_for_ext(self, ext, notification): def _process_notification_for_ext(self, ext, notification):
handler = ext.obj handler = ext.obj
if notification['event_type'] in handler.get_event_types(): if notification['event_type'] in handler.get_event_types():
for c in handler.process_notification(notification):
LOG.info('COUNTER: %s', c)
# FIXME(dhellmann): Spawn green thread?
self.publish_counter(c)
def publish_counter(self, counter):
"""Create a metering message for the counter and publish it."""
ctxt = context.get_admin_context() ctxt = context.get_admin_context()
self.pipeline_manager.publish_counter(ctxt, counter, with self.pipeline_manager.publisher(ctxt,
cfg.CONF.counter_source) cfg.CONF.counter_source) as p:
# FIXME(dhellmann): Spawn green thread?
p(list(handler.process_notification(notification)))
def record_metering_data(self, context, data): def record_metering_data(self, context, data):
"""This method is triggered when metering data is """This method is triggered when metering data is

View File

@ -66,20 +66,22 @@ class TransformerExtensionManager(extension.ExtensionManager):
class Publisher(object): class Publisher(object):
def __init__(self, pipeline, context, source): def __init__(self, pipelines, context, source):
self.pipeline = pipeline self.pipelines = pipelines
self.context = context self.context = context
self.source = source self.source = source
def __enter__(self): def __enter__(self):
def p(counters): def p(counters):
return self.pipeline.publish_counters(self.context, for p in self.pipelines:
p.publish_counters(self.context,
counters, counters,
self.source) self.source)
return p return p
def __exit__(self, exc_type, exc_value, traceback): def __exit__(self, exc_type, exc_value, traceback):
self.pipeline.flush(self.context, self.source) for p in self.pipelines:
p.flush(self.context, self.source)
class Pipeline(object): class Pipeline(object):
@ -241,14 +243,6 @@ class Pipeline(object):
LOG.audit("Pipeline %s: Published counters", self) LOG.audit("Pipeline %s: Published counters", self)
def publisher(self, context, source):
"""Build a new Publisher for this pipeline.
:param context: The context.
:param source: Counter source.
"""
return Publisher(self, context, source)
def publish_counter(self, ctxt, counter, source): def publish_counter(self, ctxt, counter, source):
self.publish_counters(ctxt, [counter], source) self.publish_counters(ctxt, [counter], source)
@ -285,9 +279,16 @@ class Pipeline(object):
LOG.audit("Flush pipeline %s", self) LOG.audit("Flush pipeline %s", self)
for (i, transformer) in enumerate(self.transformers): for (i, transformer) in enumerate(self.transformers):
try:
self._publish_counters(i + 1, ctxt, self._publish_counters(i + 1, ctxt,
list(transformer.flush(ctxt, source)), list(transformer.flush(ctxt, source)),
source) source)
except Exception as err:
LOG.warning(
"Pipeline %s: Error flushing "
"transformer %s",
self, transformer)
LOG.exception(err)
def get_interval(self): def get_interval(self):
return self.interval return self.interval
@ -353,24 +354,13 @@ class PipelineManager(object):
transformer_manager) transformer_manager)
for pipedef in cfg] for pipedef in cfg]
def pipelines_for_counter(self, counter_name): def publisher(self, context, source):
"""Get all pipelines that support counter""" """Build a new Publisher for these manager pipelines.
return [p for p in self.pipelines if p.support_counter(counter_name)]
def publish_counter(self, ctxt, counter, source):
"""Publish counter through pipelines
This is helpful to notification mechanism, so that they don't need
to maintain the private mapping cache from counter to pipelines.
For polling based data collector, they may need keep private
mapping cache for different interval support.
:param context: The context.
:param source: Counter source.
""" """
# TODO(yjiang5) Utilize a cache return Publisher(self.pipelines, context, source)
for p in self.pipelines:
if p.support_counter(counter.name):
p.publish_counter(ctxt, counter, source)
def setup_pipeline(publisher_manager): def setup_pipeline(publisher_manager):

View File

@ -189,6 +189,7 @@ class TestCollectorService(tests_base.TestCase):
# configuration. # configuration.
with patch('ceilometer.openstack.common.rpc.create_connection'): with patch('ceilometer.openstack.common.rpc.create_connection'):
self.srv.start() self.srv.start()
self.srv.pipeline_manager.pipelines[0] = MagicMock()
self.srv.notification_manager = test_manager.TestExtensionManager( self.srv.notification_manager = test_manager.TestExtensionManager(
[extension.Extension('test', [extension.Extension('test',
None, None,
@ -197,4 +198,5 @@ class TestCollectorService(tests_base.TestCase):
), ),
]) ])
self.srv.process_notification(TEST_NOTICE) self.srv.process_notification(TEST_NOTICE)
assert self.srv.pipeline_manager.publish_counter.called self.assertTrue(
self.srv.pipeline_manager.publisher.called)

View File

@ -43,15 +43,22 @@ class FakeApp(object):
class TestSwiftMiddleware(base.TestCase): class TestSwiftMiddleware(base.TestCase):
class _faux_pipeline_manager(): class _faux_pipeline_manager(object):
class _faux_pipeline(object):
def __init__(self): def __init__(self):
self.counters = [] self.counters = []
def publish_counters(self, context, counters, source): def publish_counters(self, ctxt, counters, source):
self.counters.extend(counters) self.counters.extend(counters)
def flush(self, ctx, source):
pass
def __init__(self):
self.pipelines = [self._faux_pipeline()]
def publisher(self, context, source): def publisher(self, context, source):
return pipeline.Publisher(self, context, source) return pipeline.Publisher(self.pipelines, context, source)
def flush(self, context, source): def flush(self, context, source):
pass pass
@ -78,8 +85,9 @@ class TestSwiftMiddleware(base.TestCase):
environ={'REQUEST_METHOD': 'GET'}) environ={'REQUEST_METHOD': 'GET'})
resp = app(req.environ, self.start_response) resp = app(req.environ, self.start_response)
self.assertEqual(list(resp), ["This string is 28 bytes long"]) self.assertEqual(list(resp), ["This string is 28 bytes long"])
self.assertEqual(len(self.pipeline_manager.counters), 1) counters = self.pipeline_manager.pipelines[0].counters
data = self.pipeline_manager.counters[0] self.assertEqual(len(counters), 1)
data = counters[0]
self.assertEqual(data.volume, 28) self.assertEqual(data.volume, 28)
self.assertEqual(data.resource_metadata['version'], '1.0') self.assertEqual(data.resource_metadata['version'], '1.0')
self.assertEqual(data.resource_metadata['container'], 'container') self.assertEqual(data.resource_metadata['container'], 'container')
@ -92,8 +100,9 @@ class TestSwiftMiddleware(base.TestCase):
'wsgi.input': 'wsgi.input':
StringIO.StringIO('some stuff')}) StringIO.StringIO('some stuff')})
resp = list(app(req.environ, self.start_response)) resp = list(app(req.environ, self.start_response))
self.assertEqual(len(self.pipeline_manager.counters), 1) counters = self.pipeline_manager.pipelines[0].counters
data = self.pipeline_manager.counters[0] self.assertEqual(len(counters), 1)
data = counters[0]
self.assertEqual(data.volume, 10) self.assertEqual(data.volume, 10)
self.assertEqual(data.resource_metadata['version'], '1.0') self.assertEqual(data.resource_metadata['version'], '1.0')
self.assertEqual(data.resource_metadata['container'], 'container') self.assertEqual(data.resource_metadata['container'], 'container')
@ -106,8 +115,9 @@ class TestSwiftMiddleware(base.TestCase):
'wsgi.input': 'wsgi.input':
StringIO.StringIO('some other stuff')}) StringIO.StringIO('some other stuff')})
resp = list(app(req.environ, self.start_response)) resp = list(app(req.environ, self.start_response))
self.assertEqual(len(self.pipeline_manager.counters), 1) counters = self.pipeline_manager.pipelines[0].counters
data = self.pipeline_manager.counters[0] self.assertEqual(len(counters), 1)
data = counters[0]
self.assertEqual(data.volume, 16) self.assertEqual(data.volume, 16)
self.assertEqual(data.resource_metadata['version'], '1.0') self.assertEqual(data.resource_metadata['version'], '1.0')
self.assertEqual(data.resource_metadata['container'], 'container') self.assertEqual(data.resource_metadata['container'], 'container')
@ -118,8 +128,9 @@ class TestSwiftMiddleware(base.TestCase):
req = Request.blank('/1.0/account/container', req = Request.blank('/1.0/account/container',
environ={'REQUEST_METHOD': 'GET'}) environ={'REQUEST_METHOD': 'GET'})
resp = list(app(req.environ, self.start_response)) resp = list(app(req.environ, self.start_response))
self.assertEqual(len(self.pipeline_manager.counters), 1) counters = self.pipeline_manager.pipelines[0].counters
data = self.pipeline_manager.counters[0] self.assertEqual(len(counters), 1)
data = counters[0]
self.assertEqual(data.volume, 28) self.assertEqual(data.volume, 28)
self.assertEqual(data.resource_metadata['version'], '1.0') self.assertEqual(data.resource_metadata['version'], '1.0')
self.assertEqual(data.resource_metadata['container'], 'container') self.assertEqual(data.resource_metadata['container'], 'container')

View File

@ -220,26 +220,19 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['transformers'] = transformer_cfg self.pipeline_cfg[0]['transformers'] = transformer_cfg
self._exception_create_pipelinemanager() self._exception_create_pipelinemanager()
def test_pipelines_for_counter(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
self.assertTrue(len(pipeline_manager.pipelines_for_counter('a'))
== 1)
self.assertTrue(len(pipeline_manager.pipelines_for_counter('b'))
== 0)
def test_get_interval(self): def test_get_interval(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe = pipeline_manager.pipelines[0]
self.assertTrue(pipe.get_interval() == 5) self.assertTrue(pipe.get_interval() == 5)
def test_publisher_transformer_invoked(self): def test_publisher_transformer_invoked(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0] with pipeline_manager.publisher(None, None) as p:
pipe.publish_counter(None, self.test_counter, None) p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1) self.assertTrue(len(self.TransformerClass.samples) == 1)
@ -253,12 +246,15 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = counter_cfg self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None) with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.publisher.counters) == 1)
pipe = pipeline_manager.pipelines_for_counter('b')[0]
self.test_counter = self.test_counter._replace(name='b') self.test_counter = self.test_counter._replace(name='b')
pipe.publish_counter(None, self.test_counter, None) with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 2) self.assertTrue(len(self.publisher.counters) == 2)
self.assertTrue(len(self.TransformerClass.samples) == 2) self.assertTrue(len(self.TransformerClass.samples) == 2)
@ -272,8 +268,8 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = counter_cfg self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0] with pipeline_manager.publisher(None, None) as p:
pipe.publish_counter(None, self.test_counter, None) p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1) self.assertTrue(len(self.TransformerClass.samples) == 1)
@ -285,17 +281,15 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = counter_cfg self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a') self.assertFalse(pipeline_manager.pipelines[0].support_counter('a'))
self.assertTrue(len(pipe) == 0)
def test_wildcard_excluded_counters_not_excluded(self): def test_wildcard_excluded_counters_not_excluded(self):
counter_cfg = ['*', '!b'] counter_cfg = ['*', '!b']
self.pipeline_cfg[0]['counters'] = counter_cfg self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0] with pipeline_manager.publisher(None, None) as p:
pipe.publish_counter(None, self.test_counter, None) p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1) self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name") self.assertTrue(getattr(self.publisher.counters[0], "name")
@ -306,8 +300,8 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = counter_cfg self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0] with pipeline_manager.publisher(None, None) as p:
pipe.publish_counter(None, self.test_counter, None) p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1) self.assertTrue(len(self.TransformerClass.samples) == 1)
@ -321,10 +315,9 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = counter_cfg self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a') self.assertFalse(pipeline_manager.pipelines[0].support_counter('a'))
self.assertTrue(len(pipe) == 0) self.assertTrue(pipeline_manager.pipelines[0].support_counter('b'))
pipe_1 = pipeline_manager.pipelines_for_counter('c') self.assertFalse(pipeline_manager.pipelines[0].support_counter('c'))
self.assertTrue(len(pipe_1) == 0)
def test_multiple_pipeline(self): def test_multiple_pipeline(self):
self.pipeline_cfg.append({ self.pipeline_cfg.append({
@ -343,13 +336,13 @@ class TestPipeline(base.TestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe_1 = pipeline_manager.pipelines_for_counter('b')[0]
pipe.publish_counter(None, self.test_counter, None)
self.test_counter = self.test_counter._replace(name='b') self.test_counter = self.test_counter._replace(name='b')
pipe_1.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name") self.assertTrue(getattr(self.publisher.counters[0], "name")
@ -382,12 +375,14 @@ class TestPipeline(base.TestCase):
}) })
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe_1 = pipeline_manager.pipelines_for_counter('b')[0]
pipe.publish_counter(None, self.test_counter, None) with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.test_counter = self.test_counter._replace(name='b') self.test_counter = self.test_counter._replace(name='b')
pipe_1.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name") self.assertTrue(getattr(self.publisher.counters[0], "name")
@ -403,8 +398,8 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['transformers'] = None self.pipeline_cfg[0]['transformers'] = None
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0] with pipeline_manager.publisher(None, None) as p:
pipe.publish_counter(None, self.test_counter, None) p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a') self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a')
@ -412,8 +407,8 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['transformers'] = [] self.pipeline_cfg[0]['transformers'] = []
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0] with pipeline_manager.publisher(None, None) as p:
pipe.publish_counter(None, self.test_counter, None) p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a') self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a')
@ -430,8 +425,9 @@ class TestPipeline(base.TestCase):
] ]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None) with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name') self.assertTrue(getattr(self.publisher.counters[0], 'name')
@ -461,8 +457,8 @@ class TestPipeline(base.TestCase):
] ]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0] with pipeline_manager.publisher(None, None) as p:
pipe.publish_counter(None, self.test_counter, None) p([self.test_counter])
self.assertTrue(len(self.TransformerClass.samples) == 2) self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(getattr(self.TransformerClass.samples[0], 'name') self.assertTrue(getattr(self.TransformerClass.samples[0], 'name')
@ -496,8 +492,8 @@ class TestPipeline(base.TestCase):
] ]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0] with pipeline_manager.publisher(None, None) as p:
pipe.publish_counter(None, self.test_counter, None) p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 0) self.assertTrue(len(self.publisher.counters) == 0)
self.assertTrue(len(self.TransformerClass.samples) == 1) self.assertTrue(len(self.TransformerClass.samples) == 1)
@ -511,8 +507,9 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['publishers'] = ['test', 'new'] self.pipeline_cfg[0]['publishers'] = ['test', 'new']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None) with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.new_publisher.counters) == 1) self.assertTrue(len(self.new_publisher.counters) == 1)
@ -525,8 +522,8 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['publishers'] = ['except', 'new'] self.pipeline_cfg[0]['publishers'] = ['except', 'new']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0] with pipeline_manager.publisher(None, None) as p:
pipe.publish_counter(None, self.test_counter, None) p([self.test_counter])
self.assertTrue(len(self.new_publisher.counters) == 1) self.assertTrue(len(self.new_publisher.counters) == 1)
self.assertTrue(getattr(self.new_publisher.counters[0], 'name') self.assertTrue(getattr(self.new_publisher.counters[0], 'name')
@ -536,12 +533,10 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = ['a', 'b'] self.pipeline_cfg[0]['counters'] = ['a', 'b']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0] with pipeline_manager.publisher(None, None) as p:
pipe_1 = pipeline_manager.pipelines_for_counter('b')[0] p([self.test_counter,
self.assertTrue(pipe is pipe_1) self.test_counter._replace(name='b')])
pipe.publish_counter(None, self.test_counter, None)
self.test_counter = self.test_counter._replace(name='b')
pipe_1.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 2) self.assertTrue(len(self.publisher.counters) == 2)
self.assertTrue(getattr(self.publisher.counters[0], 'name') self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update') == 'a_update')
@ -567,7 +562,7 @@ class TestPipeline(base.TestCase):
) )
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0] pipe = pipeline_manager.pipelines[0]
pipe.publish_counter(None, self.test_counter, None) pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 0) self.assertTrue(len(self.publisher.counters) == 0)
@ -603,15 +598,16 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = ['a', 'b'] self.pipeline_cfg[0]['counters'] = ['a', 'b']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0] with pipeline_manager.publisher(None, None) as p:
pipe.publish_counter(None, self.test_counter, None) p([self.test_counter,
self.test_counter = self.test_counter._replace(name='b') self.test_counter._replace(name='b')])
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 0) self.assertTrue(len(self.publisher.counters) == 0)
pipe.flush(None, None)
self.assertEqual(len(self.publisher.counters), 0) self.assertEqual(len(self.publisher.counters), 0)
pipe.publish_counter(None, self.test_counter, None)
pipe.flush(None, None) with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertEqual(len(self.publisher.counters), CACHE_SIZE) self.assertEqual(len(self.publisher.counters), CACHE_SIZE)
self.assertTrue(getattr(self.publisher.counters[0], 'name') self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update_new') == 'a_update_new')
@ -625,7 +621,7 @@ class TestPipeline(base.TestCase):
}) })
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0] pipe = pipeline_manager.pipelines[0]
pipe.publish_counter(None, self.test_counter, None) pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 0) self.assertTrue(len(self.publisher.counters) == 0)
@ -647,10 +643,11 @@ class TestPipeline(base.TestCase):
}, ] }, ]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager) self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a:*')[0]
self.test_counter = self.test_counter._replace(name='a:b') self.test_counter = self.test_counter._replace(name='a:b')
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1) self.assertTrue(len(self.TransformerClass.samples) == 1)