From a5785c1e9be9a2187de10a402a7b45952bc0a268 Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Tue, 4 Jun 2013 17:10:45 +0200 Subject: [PATCH] pipeline: switch publisher loading model to driver This change modify the loading of the publisher so it's driver based, like our current storage model. That will allow to use the same publisher multiple times with different parameters in the future. This change also adds a new publisher called 'test' that comes from a factorisation of some of the testing code. Blueprint: pipeline-publisher-url Change-Id: Ie26beac213383fff2db759e2df216ba1f454ef9c Signed-off-by: Julien Danjou --- bin/ceilometer-send-counter | 4 - ceilometer/agent.py | 4 - ceilometer/api/hooks.py | 5 +- ceilometer/collector/service.py | 4 - ceilometer/notifier.py | 4 - ceilometer/objectstore/swift_middleware.py | 4 - ceilometer/pipeline.py | 58 ++--- ceilometer/publisher/__init__.py | 16 +- ceilometer/publisher/test.py | 37 +++ setup.cfg | 1 + tests/agentbase.py | 48 +--- tests/compute/test_manager.py | 3 +- tests/objectstore/test_swift_middleware.py | 2 +- tests/test_notifier.py | 31 +-- tests/test_pipeline.py | 269 +++++++++------------ 15 files changed, 200 insertions(+), 290 deletions(-) create mode 100644 ceilometer/publisher/test.py diff --git a/bin/ceilometer-send-counter b/bin/ceilometer-send-counter index 9cb93ea8..a5eefd86 100755 --- a/bin/ceilometer-send-counter +++ b/bin/ceilometer-send-counter @@ -28,7 +28,6 @@ from stevedore import dispatch from ceilometer import counter from ceilometer import pipeline -from ceilometer import publisher from ceilometer import service from ceilometer import transformer from ceilometer.openstack.common import timeutils @@ -87,9 +86,6 @@ pipeline_manager = pipeline.setup_pipeline( transformer.TransformerExtensionManager( 'ceilometer.transformer', ), - publisher.PublisherExtensionManager( - 'ceilometer.publisher', - ), ) with pipeline_manager.publisher(context.get_admin_context(), diff --git a/ceilometer/agent.py b/ceilometer/agent.py index bc215fb8..af72576b 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -24,7 +24,6 @@ from oslo.config import cfg from ceilometer.openstack.common import context from ceilometer.openstack.common import log from ceilometer import pipeline -from ceilometer import publisher from ceilometer import transformer LOG = log.getLogger(__name__) @@ -57,9 +56,6 @@ class AgentManager(object): transformer.TransformerExtensionManager( 'ceilometer.transformer', ), - publisher.PublisherExtensionManager( - 'ceilometer.publisher', - ), ) self.pollster_manager = extension_manager diff --git a/ceilometer/api/hooks.py b/ceilometer/api/hooks.py index 7455e5c9..1e0d7bca 100644 --- a/ceilometer/api/hooks.py +++ b/ceilometer/api/hooks.py @@ -21,7 +21,6 @@ from oslo.config import cfg from pecan import hooks from ceilometer import pipeline -from ceilometer import publisher from ceilometer import storage from ceilometer import transformer @@ -61,9 +60,7 @@ class PipelineHook(hooks.PecanHook): # when the file is imported. self.__class__.pipeline_manager = pipeline.setup_pipeline( transformer.TransformerExtensionManager( - 'ceilometer.transformer'), - publisher.PublisherExtensionManager( - 'ceilometer.publisher')) + 'ceilometer.transformer')) def before(self, state): state.request.pipeline_manager = self.pipeline_manager diff --git a/ceilometer/collector/service.py b/ceilometer/collector/service.py index ddbbe865..a3dfac8e 100644 --- a/ceilometer/collector/service.py +++ b/ceilometer/collector/service.py @@ -31,7 +31,6 @@ from ceilometer.openstack.common.rpc import service as rpc_service from ceilometer.openstack.common import timeutils from ceilometer import pipeline -from ceilometer import publisher from ceilometer import storage from ceilometer import transformer @@ -127,9 +126,6 @@ class CollectorService(rpc_service.Service): transformer.TransformerExtensionManager( 'ceilometer.transformer', ), - publisher.PublisherExtensionManager( - 'ceilometer.publisher', - ), ) LOG.debug('loading notification handlers from %s', diff --git a/ceilometer/notifier.py b/ceilometer/notifier.py index 3670a5e2..8ec78eee 100644 --- a/ceilometer/notifier.py +++ b/ceilometer/notifier.py @@ -17,7 +17,6 @@ # under the License. from ceilometer import pipeline -from ceilometer import publisher from ceilometer import transformer from ceilometer.openstack.common import context as req_context from ceilometer.openstack.common import log as logging @@ -57,9 +56,6 @@ def _load_pipeline_manager(): transformer.TransformerExtensionManager( 'ceilometer.transformer', ), - publisher.PublisherExtensionManager( - 'ceilometer.publisher', - ), ) diff --git a/ceilometer/objectstore/swift_middleware.py b/ceilometer/objectstore/swift_middleware.py index b7447ec9..1e45bed8 100644 --- a/ceilometer/objectstore/swift_middleware.py +++ b/ceilometer/objectstore/swift_middleware.py @@ -60,7 +60,6 @@ from ceilometer import counter from ceilometer.openstack.common import context from ceilometer.openstack.common import timeutils from ceilometer import pipeline -from ceilometer import publisher from ceilometer import service from ceilometer import transformer @@ -84,9 +83,6 @@ class CeilometerMiddleware(object): transformer.TransformerExtensionManager( 'ceilometer.transformer', ), - publisher.PublisherExtensionManager( - 'ceilometer.publisher', - ), ) def __call__(self, env, start_response): diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index bf59c045..664d1470 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -23,6 +23,8 @@ from oslo.config import cfg import yaml from ceilometer.openstack.common import log +from ceilometer import publisher + OPTS = [ cfg.StrOpt('pipeline_cfg_file', @@ -90,7 +92,7 @@ class Pipeline(object): """ - def __init__(self, cfg, publisher_manager, transformer_manager): + def __init__(self, cfg, transformer_manager): self.cfg = cfg try: @@ -100,10 +102,8 @@ class Pipeline(object): except ValueError: raise PipelineException("Invalid interval value", cfg) self.counters = cfg['counters'] - self.publishers = cfg['publishers'] # It's legal to have no transformer specified self.transformer_cfg = cfg['transformers'] or [] - self.publisher_manager = publisher_manager except KeyError as err: raise PipelineException( "Required field %s not specified" % err.args[0], cfg) @@ -113,7 +113,15 @@ class Pipeline(object): self._check_counters() - self._check_publishers(cfg, publisher_manager) + if not cfg.get('publishers'): + raise PipelineException("No publisher specified", cfg) + + self.publishers = [] + for p in cfg['publishers']: + try: + self.publishers.append(publisher.get_publisher(p)) + except Exception: + LOG.exception("Unable to load publisher %s", p) self.transformers = self._setup_transformers(cfg, transformer_manager) @@ -144,16 +152,6 @@ class Pipeline(object): "Included counters specified with wildcard", self.cfg) - def _check_publishers(self, cfg, publisher_manager): - if not self.publishers: - raise PipelineException( - "No publisher specified", cfg) - if not set(self.publishers).issubset(set(publisher_manager.names())): - raise PipelineException( - "Publishers %s invalid" % - set(self.publishers).difference( - set(self.publisher_manager.names())), cfg) - def _setup_transformers(self, cfg, transformer_manager): transformer_cfg = cfg['transformers'] or [] transformers = [] @@ -174,14 +172,6 @@ class Pipeline(object): return transformers - def _publish_counters_to_one_publisher(self, ext, ctxt, counters, source): - try: - ext.obj.publish_counters(ctxt, counters, source) - except Exception as err: - LOG.warning("Pipeline %s: Continue after error " - "from publisher %s", self, ext.name) - LOG.exception(err) - def _transform_counter(self, start, ctxt, counter, source): try: for transformer in self.transformers[start:]: @@ -218,12 +208,13 @@ class Pipeline(object): transformed_counters.append(counter) LOG.audit("Pipeline %s: Publishing counters", self) - self.publisher_manager.map(self.publishers, - self._publish_counters_to_one_publisher, - ctxt=ctxt, - counters=transformed_counters, - source=source, - ) + + for p in self.publishers: + try: + p.publish_counters(ctxt, transformed_counters, source) + except Exception: + LOG.exception("Pipeline %s: Continue after error " + "from publisher %s", self, p) LOG.audit("Pipeline %s: Published counters", self) @@ -288,8 +279,7 @@ class PipelineManager(object): """ def __init__(self, cfg, - transformer_manager, - publisher_manager): + transformer_manager): """Setup the pipelines according to config. The top of the cfg is a list of pipeline definitions. @@ -331,8 +321,7 @@ class PipelineManager(object): Publisher's name is plugin name in setup.py """ - self.pipelines = [Pipeline(pipedef, publisher_manager, - transformer_manager) + self.pipelines = [Pipeline(pipedef, transformer_manager) for pipedef in cfg] def publisher(self, context, source): @@ -344,7 +333,7 @@ class PipelineManager(object): return PublishContext(context, source, self.pipelines) -def setup_pipeline(transformer_manager, publisher_manager): +def setup_pipeline(transformer_manager): """Setup pipeline manager according to yaml config file.""" cfg_file = cfg.CONF.pipeline_cfg_file if not os.path.exists(cfg_file): @@ -359,5 +348,4 @@ def setup_pipeline(transformer_manager, publisher_manager): LOG.info("Pipeline config: %s", pipeline_cfg) return PipelineManager(pipeline_cfg, - transformer_manager, - publisher_manager) + transformer_manager) diff --git a/ceilometer/publisher/__init__.py b/ceilometer/publisher/__init__.py index b60797fc..33fe6a6e 100644 --- a/ceilometer/publisher/__init__.py +++ b/ceilometer/publisher/__init__.py @@ -19,17 +19,17 @@ # under the License. import abc -from stevedore import dispatch +from stevedore import driver -class PublisherExtensionManager(dispatch.NameDispatchExtensionManager): +def get_publisher(name, namespace='ceilometer.publisher'): + """Get publisher driver and load it. - def __init__(self, namespace): - super(PublisherExtensionManager, self).__init__( - namespace=namespace, - check_func=lambda x: True, - invoke_on_load=True, - ) + :param name: Name of the publisher driver. + :param namespace: Namespace to use to look for drivers. + """ + loaded_driver = driver.DriverManager(namespace, name) + return loaded_driver.driver() class PublisherBase(object): diff --git a/ceilometer/publisher/test.py b/ceilometer/publisher/test.py new file mode 100644 index 00000000..8d12d8d5 --- /dev/null +++ b/ceilometer/publisher/test.py @@ -0,0 +1,37 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2013 eNovance +# +# Author: Julien Danjou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Publish a counter in memory, useful for testing +""" + +from ceilometer import publisher + + +class TestPublisher(publisher.PublisherBase): + """Publisher used in unit testing.""" + + def __init__(self): + self.counters = [] + + def publish_counters(self, context, counters, source): + """Send a metering message for publishing + + :param context: Execution context from the service or RPC call + :param counter: Counter from pipeline after transformation + :param source: counter source + """ + self.counters.extend(counters) diff --git a/setup.cfg b/setup.cfg index d13ac584..28fe5474 100644 --- a/setup.cfg +++ b/setup.cfg @@ -77,6 +77,7 @@ ceilometer.transformer = accumulator = ceilometer.transformer.accumulator:TransformerAccumulator ceilometer.publisher = + test = ceilometer.publisher.test:TestPublisher meter_publisher = ceilometer.publisher.meter:MeterPublisher meter = ceilometer.publisher.meter:MeterPublisher udp = ceilometer.publisher.udp:UDPPublisher diff --git a/tests/agentbase.py b/tests/agentbase.py index a8ff0df1..18a1f8c3 100644 --- a/tests/agentbase.py +++ b/tests/agentbase.py @@ -28,7 +28,6 @@ from stevedore.tests import manager as extension_tests from ceilometer import counter from ceilometer import pipeline -from ceilometer import publisher from ceilometer.tests import base from ceilometer import transformer @@ -70,13 +69,6 @@ class TestPollsterException(TestPollster): class BaseAgentManagerTestCase(base.TestCase): - class PublisherClass(): - def __init__(self): - self.counters = [] - - def publish_counters(self, ctxt, counter, source): - self.counters.extend(counter) - class Pollster(TestPollster): counters = [] test_data = default_test_data @@ -94,29 +86,12 @@ class BaseAgentManagerTestCase(base.TestCase): test_data = default_test_data._replace(name='testexceptionanother') def setup_pipeline(self): - self.publisher = self.PublisherClass() self.transformer_manager = transformer.TransformerExtensionManager( 'ceilometer.transformer', ) - self.publisher_manager = publisher.PublisherExtensionManager( - 'fake', - ) - self.publisher_manager.extensions = [ - extension.Extension( - 'test_pub', - None, - None, - self.publisher, - ), ] - self.publisher_manager.by_name = dict( - (e.name, e) - for e - in self.publisher_manager.extensions) - self.mgr.pipeline_manager = pipeline.PipelineManager( self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) def create_extension_manager(self): return extension_tests.TestExtensionManager( @@ -160,7 +135,7 @@ class BaseAgentManagerTestCase(base.TestCase): 'interval': 60, 'counters': ['test'], 'transformers': [], - 'publishers': ["test_pub"], + 'publishers': ["test"], }, ] self.setup_pipeline() @@ -176,7 +151,8 @@ class BaseAgentManagerTestCase(base.TestCase): self.assertEqual(len(polling_tasks), 1) self.assertTrue(60 in polling_tasks.keys()) self.mgr.interval_task(polling_tasks.values()[0]) - self.assertEqual(self.publisher.counters[0], self.Pollster.test_data) + pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(pub.counters[0], self.Pollster.test_data) def test_setup_polling_tasks_multiple_interval(self): self.pipeline_cfg.append({ @@ -184,7 +160,7 @@ class BaseAgentManagerTestCase(base.TestCase): 'interval': 10, 'counters': ['test'], 'transformers': [], - 'publishers': ["test_pub"], + 'publishers': ["test"], }) self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() @@ -199,7 +175,7 @@ class BaseAgentManagerTestCase(base.TestCase): 'interval': 10, 'counters': ['test_invalid'], 'transformers': [], - 'publishers': ["test_pub"], + 'publishers': ["test"], }) polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(len(polling_tasks), 1) @@ -211,7 +187,7 @@ class BaseAgentManagerTestCase(base.TestCase): 'interval': 60, 'counters': ['testanother'], 'transformers': [], - 'publishers': ["test_pub"], + 'publishers': ["test"], }) self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() @@ -226,23 +202,23 @@ class BaseAgentManagerTestCase(base.TestCase): 'interval': 10, 'counters': ['testexceptionanother'], 'transformers': [], - 'publishers': ["test_pub"], + 'publishers': ["test"], }, { 'name': "test_pipeline_2", 'interval': 10, 'counters': ['testexception'], 'transformers': [], - 'publishers': ["test_pub"], + 'publishers': ["test"], }, ] self.mgr.pipeline_manager = pipeline.PipelineManager( self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(len(polling_tasks.keys()), 1) polling_tasks.get(10) self.mgr.interval_task(polling_tasks.get(10)) - self.assertEqual(len(self.publisher.counters), 0) + pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(pub.counters), 0) diff --git a/tests/compute/test_manager.py b/tests/compute/test_manager.py index 314a149b..90305e94 100644 --- a/tests/compute/test_manager.py +++ b/tests/compute/test_manager.py @@ -59,7 +59,8 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): self.mgr.setup_notifier_task() self.mgr.poll_instance(None, self.instance) self.assertEqual(len(self.Pollster.counters), 1) - assert self.publisher.counters[0] == self.Pollster.test_data + pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(pub.counters[0], self.Pollster.test_data) def test_setup_polling_tasks(self): super(TestRunTasks, self).test_setup_polling_tasks() diff --git a/tests/objectstore/test_swift_middleware.py b/tests/objectstore/test_swift_middleware.py index a3138d34..506c3ac7 100644 --- a/tests/objectstore/test_swift_middleware.py +++ b/tests/objectstore/test_swift_middleware.py @@ -61,7 +61,7 @@ class TestSwiftMiddleware(base.TestCase): def flush(self, ctx, source): pass - def _faux_setup_pipeline(self, transformer_manager, publisher_manager): + def _faux_setup_pipeline(self, transformer_manager): return self.pipeline_manager def setUp(self): diff --git a/tests/test_notifier.py b/tests/test_notifier.py index fd883c21..048ee0fe 100644 --- a/tests/test_notifier.py +++ b/tests/test_notifier.py @@ -20,10 +20,8 @@ from ceilometer import notifier from ceilometer import pipeline -from ceilometer import publisher from ceilometer import transformer from ceilometer.tests import base as tests_base -from stevedore import extension MESSAGE = { @@ -68,44 +66,21 @@ MESSAGE = { class TestNotifier(tests_base.TestCase): - class PublisherClass(): - def __init__(self): - self.counters = [] - - def publish_counters(self, ctxt, counter, source): - self.counters.extend(counter) - def test_process_notification(self): - pub = self.PublisherClass() transformer_manager = transformer.TransformerExtensionManager( 'ceilometer.transformer', ) - publisher_manager = publisher.PublisherExtensionManager( - 'fake', - ) - publisher_manager.extensions = [ - extension.Extension( - 'test_pub', - None, - None, - pub, - ), ] - publisher_manager.by_name = dict( - (e.name, e) - for e - in publisher_manager.extensions) - notifier._pipeline_manager = pipeline.PipelineManager( [{ 'name': "test_pipeline", 'interval': 60, 'counters': ['*'], 'transformers': [], - 'publishers': ["test_pub"], + 'publishers': ["test"], }], - transformer_manager, - publisher_manager) + transformer_manager) + pub = notifier._pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(pub.counters), 0) notifier.notify(None, MESSAGE) self.assertTrue(len(pub.counters) > 0) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index c0d18bd2..8a2785c7 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -21,6 +21,7 @@ from stevedore import extension from ceilometer import counter from ceilometer import publisher +from ceilometer.publisher import test as test_publisher from ceilometer import transformer from ceilometer.transformer import accumulator from ceilometer.openstack.common import timeutils @@ -53,12 +54,11 @@ class TestPipeline(base.TestCase): raise KeyError(name) - class PublisherClass(): - def __init__(self): - self.counters = [] - - def publish_counters(self, ctxt, counters, source): - self.counters.extend(counters) + def get_publisher(self, name, namespace=''): + fake_drivers = {'test': test_publisher.TestPublisher, + 'new': test_publisher.TestPublisher, + 'except': self.PublisherClassException} + return fake_drivers[name]() class PublisherClassException(): def publish_counters(self, ctxt, counters, source): @@ -115,40 +115,10 @@ class TestPipeline(base.TestCase): "get_ext", self.fake_tem_get_ext) - self.publisher_manager = publisher.PublisherExtensionManager( - 'fake', - ) + self.stubs.Set(publisher, 'get_publisher', self.get_publisher) self.transformer_manager = transformer.TransformerExtensionManager() - self.publisher = self.PublisherClass() - self.new_publisher = self.PublisherClass() - self.publisher_exception = self.PublisherClassException() - self.publisher_manager.extensions = [ - extension.Extension( - 'test', - None, - None, - self.publisher, - ), - extension.Extension( - 'new', - None, - None, - self.new_publisher, - ), - extension.Extension( - 'except', - None, - None, - self.publisher_exception, - ), - ] - self.publisher_manager.by_name = dict( - (e.name, e) - for e - in self.publisher_manager.extensions) - self.pipeline_cfg = [{ 'name': "test_pipeline", 'interval': 5, @@ -164,8 +134,7 @@ class TestPipeline(base.TestCase): self.assertRaises(pipeline.PipelineException, pipeline.PipelineManager, self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) def test_no_counters(self): del self.pipeline_cfg[0]['counters'] @@ -205,7 +174,6 @@ class TestPipeline(base.TestCase): def test_check_publishers_invalid_publisher(self): publisher_cfg = ['test_invalid'] self.pipeline_cfg[0]['publishers'] = publisher_cfg - self._exception_create_pipelinemanager() def test_invalid_string_interval(self): self.pipeline_cfg[0]['interval'] = 'string' @@ -221,24 +189,22 @@ class TestPipeline(base.TestCase): def test_get_interval(self): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) pipe = pipeline_manager.pipelines[0] self.assertTrue(pipe.get_interval() == 5) def test_publisher_transformer_invoked(self): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) self.assertTrue(len(self.TransformerClass.samples) == 1) - self.assertTrue(getattr(self.publisher.counters[0], "name") - == 'a_update') + self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update') self.assertTrue(getattr(self.TransformerClass.samples[0], "name") == 'a') @@ -246,73 +212,68 @@ class TestPipeline(base.TestCase): counter_cfg = ['a', 'b'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) self.test_counter = self.test_counter._replace(name='b') with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 2) + self.assertEqual(len(publisher.counters), 2) self.assertTrue(len(self.TransformerClass.samples) == 2) - self.assertTrue(getattr(self.publisher.counters[0], "name") - == 'a_update') - self.assertTrue(getattr(self.publisher.counters[1], "name") - == 'b_update') + self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update') + self.assertEqual(getattr(publisher.counters[1], "name"), 'b_update') def test_wildcard_counter(self): counter_cfg = ['*'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) self.assertTrue(len(self.TransformerClass.samples) == 1) - self.assertTrue(getattr(self.publisher.counters[0], "name") - == 'a_update') + self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update') def test_wildcard_excluded_counters(self): counter_cfg = ['*', '!a'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) self.assertFalse(pipeline_manager.pipelines[0].support_counter('a')) def test_wildcard_excluded_counters_not_excluded(self): counter_cfg = ['*', '!b'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(len(self.TransformerClass.samples) == 1) - self.assertTrue(getattr(self.publisher.counters[0], "name") - == 'a_update') + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(len(self.TransformerClass.samples), 1) + self.assertEqual(getattr(publisher.counters[0], "name"), + 'a_update') def test_all_excluded_counters_not_excluded(self): counter_cfg = ['!b', '!c'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) self.assertTrue(len(self.TransformerClass.samples) == 1) - self.assertTrue(getattr(self.publisher.counters[0], "name") - == 'a_update') + self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update') self.assertTrue(getattr(self.TransformerClass.samples[0], "name") == 'a') @@ -320,8 +281,7 @@ class TestPipeline(base.TestCase): counter_cfg = ['!a', '!c'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) self.assertFalse(pipeline_manager.pipelines[0].support_counter('a')) self.assertTrue(pipeline_manager.pipelines[0].support_counter('b')) self.assertFalse(pipeline_manager.pipelines[0].support_counter('c')) @@ -342,8 +302,7 @@ class TestPipeline(base.TestCase): }) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) @@ -352,12 +311,12 @@ class TestPipeline(base.TestCase): with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(getattr(self.publisher.counters[0], "name") - == 'a_update') - self.assertTrue(len(self.new_publisher.counters) == 1) - self.assertTrue(getattr(self.new_publisher.counters[0], "name") - == 'b_new') + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update') + new_publisher = pipeline_manager.pipelines[1].publishers[0] + self.assertEqual(len(new_publisher.counters), 1) + self.assertEqual(getattr(new_publisher.counters[0], "name"), 'b_new') self.assertTrue(getattr(self.TransformerClass.samples[0], "name") == 'a') @@ -382,8 +341,7 @@ class TestPipeline(base.TestCase): 'publishers': ['except'], }) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) @@ -393,10 +351,9 @@ class TestPipeline(base.TestCase): with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(getattr(self.publisher.counters[0], "name") - == 'a_update') - self.assertTrue(len(self.new_publisher.counters) == 0) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update') self.assertTrue(len(self.TransformerClass.samples) == 2) self.assertTrue(getattr(self.TransformerClass.samples[0], "name") == 'a') @@ -406,22 +363,22 @@ class TestPipeline(base.TestCase): def test_none_transformer_pipeline(self): self.pipeline_cfg[0]['transformers'] = None pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a') + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(getattr(publisher.counters[0], 'name'), 'a') def test_empty_transformer_pipeline(self): self.pipeline_cfg[0]['transformers'] = [] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a') + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(getattr(publisher.counters[0], 'name'), 'a') def test_multiple_transformer_same_class(self): self.pipeline_cfg[0]['transformers'] = [ @@ -435,15 +392,15 @@ class TestPipeline(base.TestCase): }, ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(getattr(self.publisher.counters[0], 'name') - == 'a_update_update') + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(getattr(publisher.counters[0], 'name'), + 'a_update_update') self.assertTrue(len(self.TransformerClass.samples) == 2) self.assertTrue(getattr(self.TransformerClass.samples[0], 'name') == 'a') @@ -468,8 +425,7 @@ class TestPipeline(base.TestCase): }, ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) @@ -478,9 +434,10 @@ class TestPipeline(base.TestCase): == 'a') self.assertTrue(getattr(self.TransformerClass.samples[1], 'name') == 'a_update') - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(getattr(self.publisher.counters[0], 'name') - == 'a_update_new') + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(getattr(publisher.counters[0], 'name'), + 'a_update_new') def test_multiple_transformer_drop_transformer(self): self.pipeline_cfg[0]['transformers'] = [ @@ -504,12 +461,12 @@ class TestPipeline(base.TestCase): }, ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 0) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 0) self.assertTrue(len(self.TransformerClass.samples) == 1) self.assertTrue(getattr(self.TransformerClass.samples[0], 'name') == 'a') @@ -520,45 +477,44 @@ class TestPipeline(base.TestCase): def test_multiple_publisher(self): self.pipeline_cfg[0]['publishers'] = ['test', 'new'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(len(self.new_publisher.counters) == 1) - self.assertTrue(getattr(self.new_publisher.counters[0], 'name') - == 'a_update') - self.assertTrue(getattr(self.publisher.counters[0], 'name') - == 'a_update') + publisher = pipeline_manager.pipelines[0].publishers[0] + new_publisher = pipeline_manager.pipelines[0].publishers[1] + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(len(new_publisher.counters), 1) + self.assertEqual(getattr(new_publisher.counters[0], 'name'), + 'a_update') + self.assertEqual(getattr(publisher.counters[0], 'name'), + 'a_update') def test_multiple_publisher_isolation(self): self.pipeline_cfg[0]['publishers'] = ['except', 'new'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.new_publisher.counters) == 1) - self.assertTrue(getattr(self.new_publisher.counters[0], 'name') - == 'a_update') + new_publisher = pipeline_manager.pipelines[0].publishers[1] + self.assertEqual(len(new_publisher.counters), 1) + self.assertEqual(getattr(new_publisher.counters[0], 'name'), + 'a_update') def test_multiple_counter_pipeline(self): self.pipeline_cfg[0]['counters'] = ['a', 'b'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter, self.test_counter._replace(name='b')]) - self.assertTrue(len(self.publisher.counters) == 2) - self.assertTrue(getattr(self.publisher.counters[0], 'name') - == 'a_update') - self.assertTrue(getattr(self.publisher.counters[1], 'name') - == 'b_update') + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 2) + self.assertEqual(getattr(publisher.counters[0], 'name'), 'a_update') + self.assertEqual(getattr(publisher.counters[1], 'name'), 'b_update') def test_flush_pipeline_cache(self): CACHE_SIZE = 10 @@ -578,22 +534,22 @@ class TestPipeline(base.TestCase): }, ] ) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) pipe = pipeline_manager.pipelines[0] pipe.publish_counter(None, self.test_counter, None) - self.assertTrue(len(self.publisher.counters) == 0) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 0) pipe.flush(None, None) - self.assertEqual(len(self.publisher.counters), 0) + self.assertEqual(len(publisher.counters), 0) pipe.publish_counter(None, self.test_counter, None) pipe.flush(None, None) - self.assertEqual(len(self.publisher.counters), 0) + self.assertEqual(len(publisher.counters), 0) for i in range(CACHE_SIZE - 2): pipe.publish_counter(None, self.test_counter, None) pipe.flush(None, None) - self.assertEqual(len(self.publisher.counters), CACHE_SIZE) - self.assertTrue(getattr(self.publisher.counters[0], 'name') + self.assertEqual(len(publisher.counters), CACHE_SIZE) + self.assertTrue(getattr(publisher.counters[0], 'name') == 'a_update_new') def test_flush_pipeline_cache_multiple_counter(self): @@ -615,23 +571,22 @@ class TestPipeline(base.TestCase): ) self.pipeline_cfg[0]['counters'] = ['a', 'b'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter, self.test_counter._replace(name='b')]) - self.assertTrue(len(self.publisher.counters) == 0) - self.assertEqual(len(self.publisher.counters), 0) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 0) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertEqual(len(self.publisher.counters), CACHE_SIZE) - self.assertTrue(getattr(self.publisher.counters[0], 'name') - == 'a_update_new') - self.assertTrue(getattr(self.publisher.counters[1], 'name') - == 'b_update_new') + self.assertEqual(len(publisher.counters), CACHE_SIZE) + self.assertEqual(getattr(publisher.counters[0], 'name'), + 'a_update_new') + self.assertEqual(getattr(publisher.counters[1], 'name'), + 'b_update_new') def test_flush_pipeline_cache_before_publisher(self): self.pipeline_cfg[0]['transformers'].append({ @@ -639,16 +594,16 @@ class TestPipeline(base.TestCase): 'parameters': {} }) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) pipe = pipeline_manager.pipelines[0] + publisher = pipe.publishers[0] pipe.publish_counter(None, self.test_counter, None) - self.assertTrue(len(self.publisher.counters) == 0) + self.assertEqual(len(publisher.counters), 0) pipe.flush(None, None) - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(getattr(self.publisher.counters[0], 'name') - == 'a_update') + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(getattr(publisher.counters[0], 'name'), + 'a_update') def test_variable_counter(self): self.pipeline_cfg = [{ @@ -662,17 +617,17 @@ class TestPipeline(base.TestCase): 'publishers': ["test"], }, ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) self.test_counter = self.test_counter._replace(name='a:b') with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) self.assertTrue(len(self.TransformerClass.samples) == 1) - self.assertTrue(getattr(self.publisher.counters[0], "name") - == 'a:b_update') + self.assertEqual(getattr(publisher.counters[0], "name"), + 'a:b_update') self.assertTrue(getattr(self.TransformerClass.samples[0], "name") == 'a:b')