diff --git a/bin/ceilometer-send-counter b/bin/ceilometer-send-counter index 9cb93ea8..a5eefd86 100755 --- a/bin/ceilometer-send-counter +++ b/bin/ceilometer-send-counter @@ -28,7 +28,6 @@ from stevedore import dispatch from ceilometer import counter from ceilometer import pipeline -from ceilometer import publisher from ceilometer import service from ceilometer import transformer from ceilometer.openstack.common import timeutils @@ -87,9 +86,6 @@ pipeline_manager = pipeline.setup_pipeline( transformer.TransformerExtensionManager( 'ceilometer.transformer', ), - publisher.PublisherExtensionManager( - 'ceilometer.publisher', - ), ) with pipeline_manager.publisher(context.get_admin_context(), diff --git a/ceilometer/agent.py b/ceilometer/agent.py index bc215fb8..af72576b 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -24,7 +24,6 @@ from oslo.config import cfg from ceilometer.openstack.common import context from ceilometer.openstack.common import log from ceilometer import pipeline -from ceilometer import publisher from ceilometer import transformer LOG = log.getLogger(__name__) @@ -57,9 +56,6 @@ class AgentManager(object): transformer.TransformerExtensionManager( 'ceilometer.transformer', ), - publisher.PublisherExtensionManager( - 'ceilometer.publisher', - ), ) self.pollster_manager = extension_manager diff --git a/ceilometer/api/hooks.py b/ceilometer/api/hooks.py index 7455e5c9..1e0d7bca 100644 --- a/ceilometer/api/hooks.py +++ b/ceilometer/api/hooks.py @@ -21,7 +21,6 @@ from oslo.config import cfg from pecan import hooks from ceilometer import pipeline -from ceilometer import publisher from ceilometer import storage from ceilometer import transformer @@ -61,9 +60,7 @@ class PipelineHook(hooks.PecanHook): # when the file is imported. self.__class__.pipeline_manager = pipeline.setup_pipeline( transformer.TransformerExtensionManager( - 'ceilometer.transformer'), - publisher.PublisherExtensionManager( - 'ceilometer.publisher')) + 'ceilometer.transformer')) def before(self, state): state.request.pipeline_manager = self.pipeline_manager diff --git a/ceilometer/collector/service.py b/ceilometer/collector/service.py index ddbbe865..a3dfac8e 100644 --- a/ceilometer/collector/service.py +++ b/ceilometer/collector/service.py @@ -31,7 +31,6 @@ from ceilometer.openstack.common.rpc import service as rpc_service from ceilometer.openstack.common import timeutils from ceilometer import pipeline -from ceilometer import publisher from ceilometer import storage from ceilometer import transformer @@ -127,9 +126,6 @@ class CollectorService(rpc_service.Service): transformer.TransformerExtensionManager( 'ceilometer.transformer', ), - publisher.PublisherExtensionManager( - 'ceilometer.publisher', - ), ) LOG.debug('loading notification handlers from %s', diff --git a/ceilometer/notifier.py b/ceilometer/notifier.py index 3670a5e2..8ec78eee 100644 --- a/ceilometer/notifier.py +++ b/ceilometer/notifier.py @@ -17,7 +17,6 @@ # under the License. from ceilometer import pipeline -from ceilometer import publisher from ceilometer import transformer from ceilometer.openstack.common import context as req_context from ceilometer.openstack.common import log as logging @@ -57,9 +56,6 @@ def _load_pipeline_manager(): transformer.TransformerExtensionManager( 'ceilometer.transformer', ), - publisher.PublisherExtensionManager( - 'ceilometer.publisher', - ), ) diff --git a/ceilometer/objectstore/swift_middleware.py b/ceilometer/objectstore/swift_middleware.py index b7447ec9..1e45bed8 100644 --- a/ceilometer/objectstore/swift_middleware.py +++ b/ceilometer/objectstore/swift_middleware.py @@ -60,7 +60,6 @@ from ceilometer import counter from ceilometer.openstack.common import context from ceilometer.openstack.common import timeutils from ceilometer import pipeline -from ceilometer import publisher from ceilometer import service from ceilometer import transformer @@ -84,9 +83,6 @@ class CeilometerMiddleware(object): transformer.TransformerExtensionManager( 'ceilometer.transformer', ), - publisher.PublisherExtensionManager( - 'ceilometer.publisher', - ), ) def __call__(self, env, start_response): diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index bf59c045..664d1470 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -23,6 +23,8 @@ from oslo.config import cfg import yaml from ceilometer.openstack.common import log +from ceilometer import publisher + OPTS = [ cfg.StrOpt('pipeline_cfg_file', @@ -90,7 +92,7 @@ class Pipeline(object): """ - def __init__(self, cfg, publisher_manager, transformer_manager): + def __init__(self, cfg, transformer_manager): self.cfg = cfg try: @@ -100,10 +102,8 @@ class Pipeline(object): except ValueError: raise PipelineException("Invalid interval value", cfg) self.counters = cfg['counters'] - self.publishers = cfg['publishers'] # It's legal to have no transformer specified self.transformer_cfg = cfg['transformers'] or [] - self.publisher_manager = publisher_manager except KeyError as err: raise PipelineException( "Required field %s not specified" % err.args[0], cfg) @@ -113,7 +113,15 @@ class Pipeline(object): self._check_counters() - self._check_publishers(cfg, publisher_manager) + if not cfg.get('publishers'): + raise PipelineException("No publisher specified", cfg) + + self.publishers = [] + for p in cfg['publishers']: + try: + self.publishers.append(publisher.get_publisher(p)) + except Exception: + LOG.exception("Unable to load publisher %s", p) self.transformers = self._setup_transformers(cfg, transformer_manager) @@ -144,16 +152,6 @@ class Pipeline(object): "Included counters specified with wildcard", self.cfg) - def _check_publishers(self, cfg, publisher_manager): - if not self.publishers: - raise PipelineException( - "No publisher specified", cfg) - if not set(self.publishers).issubset(set(publisher_manager.names())): - raise PipelineException( - "Publishers %s invalid" % - set(self.publishers).difference( - set(self.publisher_manager.names())), cfg) - def _setup_transformers(self, cfg, transformer_manager): transformer_cfg = cfg['transformers'] or [] transformers = [] @@ -174,14 +172,6 @@ class Pipeline(object): return transformers - def _publish_counters_to_one_publisher(self, ext, ctxt, counters, source): - try: - ext.obj.publish_counters(ctxt, counters, source) - except Exception as err: - LOG.warning("Pipeline %s: Continue after error " - "from publisher %s", self, ext.name) - LOG.exception(err) - def _transform_counter(self, start, ctxt, counter, source): try: for transformer in self.transformers[start:]: @@ -218,12 +208,13 @@ class Pipeline(object): transformed_counters.append(counter) LOG.audit("Pipeline %s: Publishing counters", self) - self.publisher_manager.map(self.publishers, - self._publish_counters_to_one_publisher, - ctxt=ctxt, - counters=transformed_counters, - source=source, - ) + + for p in self.publishers: + try: + p.publish_counters(ctxt, transformed_counters, source) + except Exception: + LOG.exception("Pipeline %s: Continue after error " + "from publisher %s", self, p) LOG.audit("Pipeline %s: Published counters", self) @@ -288,8 +279,7 @@ class PipelineManager(object): """ def __init__(self, cfg, - transformer_manager, - publisher_manager): + transformer_manager): """Setup the pipelines according to config. The top of the cfg is a list of pipeline definitions. @@ -331,8 +321,7 @@ class PipelineManager(object): Publisher's name is plugin name in setup.py """ - self.pipelines = [Pipeline(pipedef, publisher_manager, - transformer_manager) + self.pipelines = [Pipeline(pipedef, transformer_manager) for pipedef in cfg] def publisher(self, context, source): @@ -344,7 +333,7 @@ class PipelineManager(object): return PublishContext(context, source, self.pipelines) -def setup_pipeline(transformer_manager, publisher_manager): +def setup_pipeline(transformer_manager): """Setup pipeline manager according to yaml config file.""" cfg_file = cfg.CONF.pipeline_cfg_file if not os.path.exists(cfg_file): @@ -359,5 +348,4 @@ def setup_pipeline(transformer_manager, publisher_manager): LOG.info("Pipeline config: %s", pipeline_cfg) return PipelineManager(pipeline_cfg, - transformer_manager, - publisher_manager) + transformer_manager) diff --git a/ceilometer/publisher/__init__.py b/ceilometer/publisher/__init__.py index b60797fc..33fe6a6e 100644 --- a/ceilometer/publisher/__init__.py +++ b/ceilometer/publisher/__init__.py @@ -19,17 +19,17 @@ # under the License. import abc -from stevedore import dispatch +from stevedore import driver -class PublisherExtensionManager(dispatch.NameDispatchExtensionManager): +def get_publisher(name, namespace='ceilometer.publisher'): + """Get publisher driver and load it. - def __init__(self, namespace): - super(PublisherExtensionManager, self).__init__( - namespace=namespace, - check_func=lambda x: True, - invoke_on_load=True, - ) + :param name: Name of the publisher driver. + :param namespace: Namespace to use to look for drivers. + """ + loaded_driver = driver.DriverManager(namespace, name) + return loaded_driver.driver() class PublisherBase(object): diff --git a/ceilometer/publisher/test.py b/ceilometer/publisher/test.py new file mode 100644 index 00000000..8d12d8d5 --- /dev/null +++ b/ceilometer/publisher/test.py @@ -0,0 +1,37 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2013 eNovance +# +# Author: Julien Danjou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Publish a counter in memory, useful for testing +""" + +from ceilometer import publisher + + +class TestPublisher(publisher.PublisherBase): + """Publisher used in unit testing.""" + + def __init__(self): + self.counters = [] + + def publish_counters(self, context, counters, source): + """Send a metering message for publishing + + :param context: Execution context from the service or RPC call + :param counter: Counter from pipeline after transformation + :param source: counter source + """ + self.counters.extend(counters) diff --git a/setup.cfg b/setup.cfg index d13ac584..28fe5474 100644 --- a/setup.cfg +++ b/setup.cfg @@ -77,6 +77,7 @@ ceilometer.transformer = accumulator = ceilometer.transformer.accumulator:TransformerAccumulator ceilometer.publisher = + test = ceilometer.publisher.test:TestPublisher meter_publisher = ceilometer.publisher.meter:MeterPublisher meter = ceilometer.publisher.meter:MeterPublisher udp = ceilometer.publisher.udp:UDPPublisher diff --git a/tests/agentbase.py b/tests/agentbase.py index a8ff0df1..18a1f8c3 100644 --- a/tests/agentbase.py +++ b/tests/agentbase.py @@ -28,7 +28,6 @@ from stevedore.tests import manager as extension_tests from ceilometer import counter from ceilometer import pipeline -from ceilometer import publisher from ceilometer.tests import base from ceilometer import transformer @@ -70,13 +69,6 @@ class TestPollsterException(TestPollster): class BaseAgentManagerTestCase(base.TestCase): - class PublisherClass(): - def __init__(self): - self.counters = [] - - def publish_counters(self, ctxt, counter, source): - self.counters.extend(counter) - class Pollster(TestPollster): counters = [] test_data = default_test_data @@ -94,29 +86,12 @@ class BaseAgentManagerTestCase(base.TestCase): test_data = default_test_data._replace(name='testexceptionanother') def setup_pipeline(self): - self.publisher = self.PublisherClass() self.transformer_manager = transformer.TransformerExtensionManager( 'ceilometer.transformer', ) - self.publisher_manager = publisher.PublisherExtensionManager( - 'fake', - ) - self.publisher_manager.extensions = [ - extension.Extension( - 'test_pub', - None, - None, - self.publisher, - ), ] - self.publisher_manager.by_name = dict( - (e.name, e) - for e - in self.publisher_manager.extensions) - self.mgr.pipeline_manager = pipeline.PipelineManager( self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) def create_extension_manager(self): return extension_tests.TestExtensionManager( @@ -160,7 +135,7 @@ class BaseAgentManagerTestCase(base.TestCase): 'interval': 60, 'counters': ['test'], 'transformers': [], - 'publishers': ["test_pub"], + 'publishers': ["test"], }, ] self.setup_pipeline() @@ -176,7 +151,8 @@ class BaseAgentManagerTestCase(base.TestCase): self.assertEqual(len(polling_tasks), 1) self.assertTrue(60 in polling_tasks.keys()) self.mgr.interval_task(polling_tasks.values()[0]) - self.assertEqual(self.publisher.counters[0], self.Pollster.test_data) + pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(pub.counters[0], self.Pollster.test_data) def test_setup_polling_tasks_multiple_interval(self): self.pipeline_cfg.append({ @@ -184,7 +160,7 @@ class BaseAgentManagerTestCase(base.TestCase): 'interval': 10, 'counters': ['test'], 'transformers': [], - 'publishers': ["test_pub"], + 'publishers': ["test"], }) self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() @@ -199,7 +175,7 @@ class BaseAgentManagerTestCase(base.TestCase): 'interval': 10, 'counters': ['test_invalid'], 'transformers': [], - 'publishers': ["test_pub"], + 'publishers': ["test"], }) polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(len(polling_tasks), 1) @@ -211,7 +187,7 @@ class BaseAgentManagerTestCase(base.TestCase): 'interval': 60, 'counters': ['testanother'], 'transformers': [], - 'publishers': ["test_pub"], + 'publishers': ["test"], }) self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() @@ -226,23 +202,23 @@ class BaseAgentManagerTestCase(base.TestCase): 'interval': 10, 'counters': ['testexceptionanother'], 'transformers': [], - 'publishers': ["test_pub"], + 'publishers': ["test"], }, { 'name': "test_pipeline_2", 'interval': 10, 'counters': ['testexception'], 'transformers': [], - 'publishers': ["test_pub"], + 'publishers': ["test"], }, ] self.mgr.pipeline_manager = pipeline.PipelineManager( self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(len(polling_tasks.keys()), 1) polling_tasks.get(10) self.mgr.interval_task(polling_tasks.get(10)) - self.assertEqual(len(self.publisher.counters), 0) + pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(pub.counters), 0) diff --git a/tests/compute/test_manager.py b/tests/compute/test_manager.py index 314a149b..90305e94 100644 --- a/tests/compute/test_manager.py +++ b/tests/compute/test_manager.py @@ -59,7 +59,8 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): self.mgr.setup_notifier_task() self.mgr.poll_instance(None, self.instance) self.assertEqual(len(self.Pollster.counters), 1) - assert self.publisher.counters[0] == self.Pollster.test_data + pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(pub.counters[0], self.Pollster.test_data) def test_setup_polling_tasks(self): super(TestRunTasks, self).test_setup_polling_tasks() diff --git a/tests/objectstore/test_swift_middleware.py b/tests/objectstore/test_swift_middleware.py index a3138d34..506c3ac7 100644 --- a/tests/objectstore/test_swift_middleware.py +++ b/tests/objectstore/test_swift_middleware.py @@ -61,7 +61,7 @@ class TestSwiftMiddleware(base.TestCase): def flush(self, ctx, source): pass - def _faux_setup_pipeline(self, transformer_manager, publisher_manager): + def _faux_setup_pipeline(self, transformer_manager): return self.pipeline_manager def setUp(self): diff --git a/tests/test_notifier.py b/tests/test_notifier.py index fd883c21..048ee0fe 100644 --- a/tests/test_notifier.py +++ b/tests/test_notifier.py @@ -20,10 +20,8 @@ from ceilometer import notifier from ceilometer import pipeline -from ceilometer import publisher from ceilometer import transformer from ceilometer.tests import base as tests_base -from stevedore import extension MESSAGE = { @@ -68,44 +66,21 @@ MESSAGE = { class TestNotifier(tests_base.TestCase): - class PublisherClass(): - def __init__(self): - self.counters = [] - - def publish_counters(self, ctxt, counter, source): - self.counters.extend(counter) - def test_process_notification(self): - pub = self.PublisherClass() transformer_manager = transformer.TransformerExtensionManager( 'ceilometer.transformer', ) - publisher_manager = publisher.PublisherExtensionManager( - 'fake', - ) - publisher_manager.extensions = [ - extension.Extension( - 'test_pub', - None, - None, - pub, - ), ] - publisher_manager.by_name = dict( - (e.name, e) - for e - in publisher_manager.extensions) - notifier._pipeline_manager = pipeline.PipelineManager( [{ 'name': "test_pipeline", 'interval': 60, 'counters': ['*'], 'transformers': [], - 'publishers': ["test_pub"], + 'publishers': ["test"], }], - transformer_manager, - publisher_manager) + transformer_manager) + pub = notifier._pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(pub.counters), 0) notifier.notify(None, MESSAGE) self.assertTrue(len(pub.counters) > 0) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index c0d18bd2..8a2785c7 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -21,6 +21,7 @@ from stevedore import extension from ceilometer import counter from ceilometer import publisher +from ceilometer.publisher import test as test_publisher from ceilometer import transformer from ceilometer.transformer import accumulator from ceilometer.openstack.common import timeutils @@ -53,12 +54,11 @@ class TestPipeline(base.TestCase): raise KeyError(name) - class PublisherClass(): - def __init__(self): - self.counters = [] - - def publish_counters(self, ctxt, counters, source): - self.counters.extend(counters) + def get_publisher(self, name, namespace=''): + fake_drivers = {'test': test_publisher.TestPublisher, + 'new': test_publisher.TestPublisher, + 'except': self.PublisherClassException} + return fake_drivers[name]() class PublisherClassException(): def publish_counters(self, ctxt, counters, source): @@ -115,40 +115,10 @@ class TestPipeline(base.TestCase): "get_ext", self.fake_tem_get_ext) - self.publisher_manager = publisher.PublisherExtensionManager( - 'fake', - ) + self.stubs.Set(publisher, 'get_publisher', self.get_publisher) self.transformer_manager = transformer.TransformerExtensionManager() - self.publisher = self.PublisherClass() - self.new_publisher = self.PublisherClass() - self.publisher_exception = self.PublisherClassException() - self.publisher_manager.extensions = [ - extension.Extension( - 'test', - None, - None, - self.publisher, - ), - extension.Extension( - 'new', - None, - None, - self.new_publisher, - ), - extension.Extension( - 'except', - None, - None, - self.publisher_exception, - ), - ] - self.publisher_manager.by_name = dict( - (e.name, e) - for e - in self.publisher_manager.extensions) - self.pipeline_cfg = [{ 'name': "test_pipeline", 'interval': 5, @@ -164,8 +134,7 @@ class TestPipeline(base.TestCase): self.assertRaises(pipeline.PipelineException, pipeline.PipelineManager, self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) def test_no_counters(self): del self.pipeline_cfg[0]['counters'] @@ -205,7 +174,6 @@ class TestPipeline(base.TestCase): def test_check_publishers_invalid_publisher(self): publisher_cfg = ['test_invalid'] self.pipeline_cfg[0]['publishers'] = publisher_cfg - self._exception_create_pipelinemanager() def test_invalid_string_interval(self): self.pipeline_cfg[0]['interval'] = 'string' @@ -221,24 +189,22 @@ class TestPipeline(base.TestCase): def test_get_interval(self): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) pipe = pipeline_manager.pipelines[0] self.assertTrue(pipe.get_interval() == 5) def test_publisher_transformer_invoked(self): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) self.assertTrue(len(self.TransformerClass.samples) == 1) - self.assertTrue(getattr(self.publisher.counters[0], "name") - == 'a_update') + self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update') self.assertTrue(getattr(self.TransformerClass.samples[0], "name") == 'a') @@ -246,73 +212,68 @@ class TestPipeline(base.TestCase): counter_cfg = ['a', 'b'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) self.test_counter = self.test_counter._replace(name='b') with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 2) + self.assertEqual(len(publisher.counters), 2) self.assertTrue(len(self.TransformerClass.samples) == 2) - self.assertTrue(getattr(self.publisher.counters[0], "name") - == 'a_update') - self.assertTrue(getattr(self.publisher.counters[1], "name") - == 'b_update') + self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update') + self.assertEqual(getattr(publisher.counters[1], "name"), 'b_update') def test_wildcard_counter(self): counter_cfg = ['*'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) self.assertTrue(len(self.TransformerClass.samples) == 1) - self.assertTrue(getattr(self.publisher.counters[0], "name") - == 'a_update') + self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update') def test_wildcard_excluded_counters(self): counter_cfg = ['*', '!a'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) self.assertFalse(pipeline_manager.pipelines[0].support_counter('a')) def test_wildcard_excluded_counters_not_excluded(self): counter_cfg = ['*', '!b'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(len(self.TransformerClass.samples) == 1) - self.assertTrue(getattr(self.publisher.counters[0], "name") - == 'a_update') + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(len(self.TransformerClass.samples), 1) + self.assertEqual(getattr(publisher.counters[0], "name"), + 'a_update') def test_all_excluded_counters_not_excluded(self): counter_cfg = ['!b', '!c'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) self.assertTrue(len(self.TransformerClass.samples) == 1) - self.assertTrue(getattr(self.publisher.counters[0], "name") - == 'a_update') + self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update') self.assertTrue(getattr(self.TransformerClass.samples[0], "name") == 'a') @@ -320,8 +281,7 @@ class TestPipeline(base.TestCase): counter_cfg = ['!a', '!c'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) self.assertFalse(pipeline_manager.pipelines[0].support_counter('a')) self.assertTrue(pipeline_manager.pipelines[0].support_counter('b')) self.assertFalse(pipeline_manager.pipelines[0].support_counter('c')) @@ -342,8 +302,7 @@ class TestPipeline(base.TestCase): }) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) @@ -352,12 +311,12 @@ class TestPipeline(base.TestCase): with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(getattr(self.publisher.counters[0], "name") - == 'a_update') - self.assertTrue(len(self.new_publisher.counters) == 1) - self.assertTrue(getattr(self.new_publisher.counters[0], "name") - == 'b_new') + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update') + new_publisher = pipeline_manager.pipelines[1].publishers[0] + self.assertEqual(len(new_publisher.counters), 1) + self.assertEqual(getattr(new_publisher.counters[0], "name"), 'b_new') self.assertTrue(getattr(self.TransformerClass.samples[0], "name") == 'a') @@ -382,8 +341,7 @@ class TestPipeline(base.TestCase): 'publishers': ['except'], }) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) @@ -393,10 +351,9 @@ class TestPipeline(base.TestCase): with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(getattr(self.publisher.counters[0], "name") - == 'a_update') - self.assertTrue(len(self.new_publisher.counters) == 0) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update') self.assertTrue(len(self.TransformerClass.samples) == 2) self.assertTrue(getattr(self.TransformerClass.samples[0], "name") == 'a') @@ -406,22 +363,22 @@ class TestPipeline(base.TestCase): def test_none_transformer_pipeline(self): self.pipeline_cfg[0]['transformers'] = None pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a') + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(getattr(publisher.counters[0], 'name'), 'a') def test_empty_transformer_pipeline(self): self.pipeline_cfg[0]['transformers'] = [] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a') + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(getattr(publisher.counters[0], 'name'), 'a') def test_multiple_transformer_same_class(self): self.pipeline_cfg[0]['transformers'] = [ @@ -435,15 +392,15 @@ class TestPipeline(base.TestCase): }, ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(getattr(self.publisher.counters[0], 'name') - == 'a_update_update') + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(getattr(publisher.counters[0], 'name'), + 'a_update_update') self.assertTrue(len(self.TransformerClass.samples) == 2) self.assertTrue(getattr(self.TransformerClass.samples[0], 'name') == 'a') @@ -468,8 +425,7 @@ class TestPipeline(base.TestCase): }, ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) @@ -478,9 +434,10 @@ class TestPipeline(base.TestCase): == 'a') self.assertTrue(getattr(self.TransformerClass.samples[1], 'name') == 'a_update') - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(getattr(self.publisher.counters[0], 'name') - == 'a_update_new') + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(getattr(publisher.counters[0], 'name'), + 'a_update_new') def test_multiple_transformer_drop_transformer(self): self.pipeline_cfg[0]['transformers'] = [ @@ -504,12 +461,12 @@ class TestPipeline(base.TestCase): }, ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 0) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 0) self.assertTrue(len(self.TransformerClass.samples) == 1) self.assertTrue(getattr(self.TransformerClass.samples[0], 'name') == 'a') @@ -520,45 +477,44 @@ class TestPipeline(base.TestCase): def test_multiple_publisher(self): self.pipeline_cfg[0]['publishers'] = ['test', 'new'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(len(self.new_publisher.counters) == 1) - self.assertTrue(getattr(self.new_publisher.counters[0], 'name') - == 'a_update') - self.assertTrue(getattr(self.publisher.counters[0], 'name') - == 'a_update') + publisher = pipeline_manager.pipelines[0].publishers[0] + new_publisher = pipeline_manager.pipelines[0].publishers[1] + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(len(new_publisher.counters), 1) + self.assertEqual(getattr(new_publisher.counters[0], 'name'), + 'a_update') + self.assertEqual(getattr(publisher.counters[0], 'name'), + 'a_update') def test_multiple_publisher_isolation(self): self.pipeline_cfg[0]['publishers'] = ['except', 'new'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.new_publisher.counters) == 1) - self.assertTrue(getattr(self.new_publisher.counters[0], 'name') - == 'a_update') + new_publisher = pipeline_manager.pipelines[0].publishers[1] + self.assertEqual(len(new_publisher.counters), 1) + self.assertEqual(getattr(new_publisher.counters[0], 'name'), + 'a_update') def test_multiple_counter_pipeline(self): self.pipeline_cfg[0]['counters'] = ['a', 'b'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter, self.test_counter._replace(name='b')]) - self.assertTrue(len(self.publisher.counters) == 2) - self.assertTrue(getattr(self.publisher.counters[0], 'name') - == 'a_update') - self.assertTrue(getattr(self.publisher.counters[1], 'name') - == 'b_update') + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 2) + self.assertEqual(getattr(publisher.counters[0], 'name'), 'a_update') + self.assertEqual(getattr(publisher.counters[1], 'name'), 'b_update') def test_flush_pipeline_cache(self): CACHE_SIZE = 10 @@ -578,22 +534,22 @@ class TestPipeline(base.TestCase): }, ] ) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) pipe = pipeline_manager.pipelines[0] pipe.publish_counter(None, self.test_counter, None) - self.assertTrue(len(self.publisher.counters) == 0) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 0) pipe.flush(None, None) - self.assertEqual(len(self.publisher.counters), 0) + self.assertEqual(len(publisher.counters), 0) pipe.publish_counter(None, self.test_counter, None) pipe.flush(None, None) - self.assertEqual(len(self.publisher.counters), 0) + self.assertEqual(len(publisher.counters), 0) for i in range(CACHE_SIZE - 2): pipe.publish_counter(None, self.test_counter, None) pipe.flush(None, None) - self.assertEqual(len(self.publisher.counters), CACHE_SIZE) - self.assertTrue(getattr(self.publisher.counters[0], 'name') + self.assertEqual(len(publisher.counters), CACHE_SIZE) + self.assertTrue(getattr(publisher.counters[0], 'name') == 'a_update_new') def test_flush_pipeline_cache_multiple_counter(self): @@ -615,23 +571,22 @@ class TestPipeline(base.TestCase): ) self.pipeline_cfg[0]['counters'] = ['a', 'b'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter, self.test_counter._replace(name='b')]) - self.assertTrue(len(self.publisher.counters) == 0) - self.assertEqual(len(self.publisher.counters), 0) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 0) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertEqual(len(self.publisher.counters), CACHE_SIZE) - self.assertTrue(getattr(self.publisher.counters[0], 'name') - == 'a_update_new') - self.assertTrue(getattr(self.publisher.counters[1], 'name') - == 'b_update_new') + self.assertEqual(len(publisher.counters), CACHE_SIZE) + self.assertEqual(getattr(publisher.counters[0], 'name'), + 'a_update_new') + self.assertEqual(getattr(publisher.counters[1], 'name'), + 'b_update_new') def test_flush_pipeline_cache_before_publisher(self): self.pipeline_cfg[0]['transformers'].append({ @@ -639,16 +594,16 @@ class TestPipeline(base.TestCase): 'parameters': {} }) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) pipe = pipeline_manager.pipelines[0] + publisher = pipe.publishers[0] pipe.publish_counter(None, self.test_counter, None) - self.assertTrue(len(self.publisher.counters) == 0) + self.assertEqual(len(publisher.counters), 0) pipe.flush(None, None) - self.assertTrue(len(self.publisher.counters) == 1) - self.assertTrue(getattr(self.publisher.counters[0], 'name') - == 'a_update') + self.assertEqual(len(publisher.counters), 1) + self.assertEqual(getattr(publisher.counters[0], 'name'), + 'a_update') def test_variable_counter(self): self.pipeline_cfg = [{ @@ -662,17 +617,17 @@ class TestPipeline(base.TestCase): 'publishers': ["test"], }, ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager, - self.publisher_manager) + self.transformer_manager) self.test_counter = self.test_counter._replace(name='a:b') with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) - self.assertTrue(len(self.publisher.counters) == 1) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 1) self.assertTrue(len(self.TransformerClass.samples) == 1) - self.assertTrue(getattr(self.publisher.counters[0], "name") - == 'a:b_update') + self.assertEqual(getattr(publisher.counters[0], "name"), + 'a:b_update') self.assertTrue(getattr(self.TransformerClass.samples[0], "name") == 'a:b')