From 4f87a14c7e204c0a52f47615314c08b06956a3f1 Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Mon, 29 Jul 2013 15:01:59 +0200 Subject: [PATCH] Remove source as a publisher argument We used to have source as a publisher argument, but this anyway ends being included in its Sample itself. So rather than complicating the internal publishing API, simplify it and just include source as a Sample field. Change-Id: Idb9d0c2b5969318dd16394a81d7d61c67f613892 --- bin/ceilometer-send-counter | 3 +- ceilometer/agent.py | 5 +- ceilometer/api/controllers/v2.py | 11 +- ceilometer/collector/service.py | 3 +- ceilometer/notifier.py | 6 +- ceilometer/objectstore/swift_middleware.py | 8 +- ceilometer/pipeline.py | 36 ++--- ceilometer/publisher/__init__.py | 2 +- ceilometer/publisher/file.py | 3 +- ceilometer/publisher/rpc.py | 5 +- ceilometer/publisher/test.py | 3 +- ceilometer/publisher/udp.py | 6 +- ceilometer/sample.py | 71 ++++----- ceilometer/transformer/__init__.py | 6 +- ceilometer/transformer/accumulator.py | 4 +- ceilometer/transformer/conversions.py | 6 +- tests/agentbase.py | 33 +++- tests/collector/test_service.py | 4 +- tests/objectstore/test_swift_middleware.py | 9 +- tests/publisher/test_file.py | 11 +- tests/publisher/test_rpc_publisher.py | 37 ++--- tests/publisher/test_udp.py | 22 +-- tests/test_pipeline.py | 167 +++++++++++++++------ 23 files changed, 261 insertions(+), 200 deletions(-) diff --git a/bin/ceilometer-send-counter b/bin/ceilometer-send-counter index a98a90ec..9a53c727 100755 --- a/bin/ceilometer-send-counter +++ b/bin/ceilometer-send-counter @@ -87,8 +87,7 @@ pipeline_manager = pipeline.setup_pipeline( ), ) -with pipeline_manager.publisher(context.get_admin_context(), - cfg.CONF.sample_source) as p: +with pipeline_manager.publisher(context.get_admin_context()) as p: p([sample.Sample( name=cfg.CONF.counter_name, type=cfg.CONF.counter_type, diff --git a/ceilometer/agent.py b/ceilometer/agent.py index 6cc85cd3..d1d690cd 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -19,8 +19,6 @@ import abc import itertools -from oslo.config import cfg - from ceilometer.openstack.common import context from ceilometer.openstack.common import log from ceilometer import pipeline @@ -39,8 +37,7 @@ class PollingTask(object): self.manager = agent_manager self.pollsters = set() self.publish_context = pipeline.PublishContext( - agent_manager.context, - cfg.CONF.sample_source) + agent_manager.context) def add(self, pollster, pipelines): self.publish_context.add_pipelines(pipelines) diff --git a/ceilometer/api/controllers/v2.py b/ceilometer/api/controllers/v2.py index 3a9ef263..a9602500 100644 --- a/ceilometer/api/controllers/v2.py +++ b/ceilometer/api/controllers/v2.py @@ -42,7 +42,6 @@ from ceilometer.openstack.common import context from ceilometer.openstack.common import log from ceilometer.openstack.common import timeutils from ceilometer import sample -from ceilometer import pipeline from ceilometer import storage from ceilometer.api import acl @@ -529,11 +528,8 @@ class MeterController(rest.RestController): s.timestamp = now s.source = '%s:%s' % (s.project_id, source) - with pipeline.PublishContext( - context.get_admin_context(), - source, - pecan.request.pipeline_manager.pipelines, - ) as publisher: + with pecan.request.pipeline_manager.publisher( + context.get_admin_context()) as publisher: publisher([sample.Sample( name=s.counter_name, type=s.counter_type, @@ -543,7 +539,8 @@ class MeterController(rest.RestController): project_id=s.project_id, resource_id=s.resource_id, timestamp=s.timestamp.isoformat(), - resource_metadata=s.resource_metadata) for s in samples]) + resource_metadata=s.resource_metadata, + source=source) for s in samples]) # TODO(asalkeld) this is not ideal, it would be nice if the publisher # returned the created sample message with message id (or at least the diff --git a/ceilometer/collector/service.py b/ceilometer/collector/service.py index eea642b2..6f387fdc 100644 --- a/ceilometer/collector/service.py +++ b/ceilometer/collector/service.py @@ -250,8 +250,7 @@ class CollectorService(rpc_service.Service): handler = ext.obj if notification['event_type'] in handler.get_event_types(): ctxt = context.get_admin_context() - with self.pipeline_manager.publisher(ctxt, - cfg.CONF.sample_source) as p: + with self.pipeline_manager.publisher(ctxt) as p: # FIXME(dhellmann): Spawn green thread? p(list(handler.process_notification(notification))) diff --git a/ceilometer/notifier.py b/ceilometer/notifier.py index eae2d4ad..eeea2a69 100644 --- a/ceilometer/notifier.py +++ b/ceilometer/notifier.py @@ -20,14 +20,11 @@ from ceilometer import pipeline from ceilometer import transformer from ceilometer.openstack.common import context as req_context from ceilometer.openstack.common import log as logging -from oslo.config import cfg from stevedore import extension LOG = logging.getLogger(__name__) -cfg.CONF.import_opt('sample_source', 'ceilometer.sample') - _notification_manager = None _pipeline_manager = None @@ -63,8 +60,7 @@ def _process_notification_for_ext(ext, context, notification): handler = ext.obj if notification['event_type'] in handler.get_event_types(): - with _pipeline_manager.publisher(context, - cfg.CONF.sample_source) as p: + with _pipeline_manager.publisher(context) as p: # FIXME(dhellmann): Spawn green thread? p(list(handler.process_notification(notification))) diff --git a/ceilometer/objectstore/swift_middleware.py b/ceilometer/objectstore/swift_middleware.py index e1ef5fce..3919ab42 100644 --- a/ceilometer/objectstore/swift_middleware.py +++ b/ceilometer/objectstore/swift_middleware.py @@ -37,7 +37,6 @@ metadata_headers = X-TEST from __future__ import absolute_import -from oslo.config import cfg from swift.common.utils import split_path import webob @@ -130,11 +129,8 @@ class CeilometerMiddleware(object): resource_metadata['http_header_%s' % header] = req.headers.get( header.upper()) - with pipeline.PublishContext( - context.get_admin_context(), - cfg.CONF.sample_source, - self.pipeline_manager.pipelines, - ) as publisher: + with self.pipeline_manager.publisher( + context.get_admin_context()) as publisher: if bytes_received: publisher([sample.Sample( name='storage.objects.incoming.bytes', diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 8d4dea27..f8f49c09 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -49,10 +49,9 @@ class PipelineException(Exception): class PublishContext(object): - def __init__(self, context, source, pipelines=[]): + def __init__(self, context, pipelines=[]): self.pipelines = set(pipelines) self.context = context - self.source = source def add_pipelines(self, pipelines): self.pipelines.update(pipelines) @@ -61,13 +60,12 @@ class PublishContext(object): def p(counters): for p in self.pipelines: p.publish_counters(self.context, - counters, - self.source) + counters) return p def __exit__(self, exc_type, exc_value, traceback): for p in self.pipelines: - p.flush(self.context, self.source) + p.flush(self.context) class Pipeline(object): @@ -175,10 +173,10 @@ class Pipeline(object): return transformers - def _transform_counter(self, start, ctxt, counter, source): + def _transform_counter(self, start, ctxt, counter): try: for transformer in self.transformers[start:]: - counter = transformer.handle_sample(ctxt, counter, source) + counter = transformer.handle_sample(ctxt, counter) if not counter: LOG.debug("Pipeline %s: Counter dropped by transformer %s", self, transformer) @@ -190,7 +188,7 @@ class Pipeline(object): self, transformer, counter) LOG.exception(err) - def _publish_counters(self, start, ctxt, counters, source): + def _publish_counters(self, start, ctxt, counters): """Push counter into pipeline for publishing. param start: the first transformer that the counter will be injected. @@ -198,7 +196,6 @@ class Pipeline(object): may emit counters param ctxt: execution context from the manager or service param counters: counter list - param source: counter source """ @@ -206,7 +203,7 @@ class Pipeline(object): for counter in counters: LOG.debug("Pipeline %s: Transform counter %s from %s transformer", self, counter, start) - counter = self._transform_counter(start, ctxt, counter, source) + counter = self._transform_counter(start, ctxt, counter) if counter: transformed_counters.append(counter) @@ -214,22 +211,22 @@ class Pipeline(object): for p in self.publishers: try: - p.publish_counters(ctxt, transformed_counters, source) + p.publish_counters(ctxt, transformed_counters) except Exception: LOG.exception("Pipeline %s: Continue after error " "from publisher %s", self, p) LOG.audit("Pipeline %s: Published counters", self) - def publish_counter(self, ctxt, counter, source): - self.publish_counters(ctxt, [counter], source) + def publish_counter(self, ctxt, counter): + self.publish_counters(ctxt, [counter]) - def publish_counters(self, ctxt, counters, source): + def publish_counters(self, ctxt, counters): for counter_name, counters in itertools.groupby( sorted(counters, key=lambda c: c.name), lambda c: c.name): if self.support_counter(counter_name): - self._publish_counters(0, ctxt, counters, source) + self._publish_counters(0, ctxt, counters) # (yjiang5) To support counters like instance:m1.tiny, # which include variable part at the end starting with ':'. @@ -252,15 +249,14 @@ class Pipeline(object): else: return counter_name in self.counters - def flush(self, ctxt, source): + def flush(self, ctxt): """Flush data after all counter have been injected to pipeline.""" LOG.audit("Flush pipeline %s", self) for (i, transformer) in enumerate(self.transformers): try: self._publish_counters(i + 1, ctxt, - list(transformer.flush(ctxt, source)), - source) + list(transformer.flush(ctxt))) except Exception as err: LOG.warning( "Pipeline %s: Error flushing " @@ -327,13 +323,13 @@ class PipelineManager(object): self.pipelines = [Pipeline(pipedef, transformer_manager) for pipedef in cfg] - def publisher(self, context, source): + def publisher(self, context): """Build a new Publisher for these manager pipelines. :param context: The context. :param source: Counter source. """ - return PublishContext(context, source, self.pipelines) + return PublishContext(context, self.pipelines) def setup_pipeline(transformer_manager): diff --git a/ceilometer/publisher/__init__.py b/ceilometer/publisher/__init__.py index 9783ac13..9ffbae2b 100644 --- a/ceilometer/publisher/__init__.py +++ b/ceilometer/publisher/__init__.py @@ -43,5 +43,5 @@ class PublisherBase(object): pass @abc.abstractmethod - def publish_counters(self, context, counters, source): + def publish_counters(self, context, counters): "Publish counters into final conduit." diff --git a/ceilometer/publisher/file.py b/ceilometer/publisher/file.py index fb046ee7..919db539 100644 --- a/ceilometer/publisher/file.py +++ b/ceilometer/publisher/file.py @@ -86,12 +86,11 @@ class FilePublisher(publisher.PublisherBase): rfh.setLevel(logging.INFO) self.publisher_logger.addHandler(rfh) - def publish_counters(self, context, counters, source): + def publish_counters(self, context, counters): """Send a metering message for publishing :param context: Execution context from the service or RPC call :param counter: Counter from pipeline after transformation - :param source: counter source """ if self.publisher_logger: self.publisher_logger.info(counters) diff --git a/ceilometer/publisher/rpc.py b/ceilometer/publisher/rpc.py index 603caad3..8df2d52e 100644 --- a/ceilometer/publisher/rpc.py +++ b/ceilometer/publisher/rpc.py @@ -137,12 +137,11 @@ class RPCPublisher(publisher.PublisherBase): % self.policy) self.policy = 'default' - def publish_counters(self, context, counters, source): + def publish_counters(self, context, counters): """Publish counters on RPC. :param context: Execution context from the service or RPC call. :param counters: Counters from pipeline after transformation. - :param source: Counter source. """ @@ -150,7 +149,7 @@ class RPCPublisher(publisher.PublisherBase): meter_message_from_counter( counter, cfg.CONF.publisher_rpc.metering_secret, - source) + counter.source) for counter in counters ] diff --git a/ceilometer/publisher/test.py b/ceilometer/publisher/test.py index 2b2ffe3a..8dd0bf30 100644 --- a/ceilometer/publisher/test.py +++ b/ceilometer/publisher/test.py @@ -27,11 +27,10 @@ class TestPublisher(publisher.PublisherBase): def __init__(self, parsed_url): self.counters = [] - def publish_counters(self, context, counters, source): + def publish_counters(self, context, counters): """Send a metering message for publishing :param context: Execution context from the service or RPC call :param counter: Counter from pipeline after transformation - :param source: counter source """ self.counters.extend(counters) diff --git a/ceilometer/publisher/udp.py b/ceilometer/publisher/udp.py index a6b8806b..6cf9feda 100644 --- a/ceilometer/publisher/udp.py +++ b/ceilometer/publisher/udp.py @@ -41,17 +41,15 @@ class UDPPublisher(publisher.PublisherBase): self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - def publish_counters(self, context, counters, source): + def publish_counters(self, context, counters): """Send a metering message for publishing :param context: Execution context from the service or RPC call :param counter: Counter from pipeline after transformation - :param source: counter source """ for counter in counters: - msg = counter._asdict() - msg['source'] = source + msg = counter.as_dict() host = self.host port = self.port LOG.debug(_("Publishing counter %(msg)s over UDP to " diff --git a/ceilometer/sample.py b/ceilometer/sample.py index 60c63b67..7309468c 100644 --- a/ceilometer/sample.py +++ b/ceilometer/sample.py @@ -1,8 +1,10 @@ # -*- encoding: utf-8 -*- # # Copyright © 2012 New Dream Network, LLC (DreamHost) +# Copyright © 2013 eNovance # -# Author: Doug Hellmann +# Authors: Doug Hellmann +# Julien Danjou # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -22,7 +24,6 @@ ensure that all of the appropriate fields have been filled in by the plugins that create them. """ -import collections import copy from oslo.config import cfg @@ -40,6 +41,7 @@ cfg.CONF.register_opts(OPTS) # Fields explanation: # +# Source: the source of this sample # Name: the name of the meter, must be unique # Type: the type of the meter, must be either: # - cumulative: the value is incremented and never reset to 0 @@ -52,40 +54,41 @@ cfg.CONF.register_opts(OPTS) # Resource ID: the resource ID # Timestamp: when the sample has been read # Resource metadata: various metadata -Sample = collections.namedtuple('Sample', - ' '.join([ - 'name', - 'type', - 'unit', - 'volume', - 'user_id', - 'project_id', - 'resource_id', - 'timestamp', - 'resource_metadata', - ])) +class Sample(object): + def __init__(self, name, type, unit, volume, user_id, project_id, + resource_id, timestamp, resource_metadata, source=None): + self.name = name + self.type = type + self.unit = unit + self.volume = volume + self.user_id = user_id + self.project_id = project_id + self.resource_id = resource_id + self.timestamp = timestamp + self.resource_metadata = resource_metadata + self.source = source or cfg.CONF.sample_source + + def as_dict(self): + return copy.copy(self.__dict__) + + @classmethod + def from_notification(cls, name, type, volume, unit, + user_id, project_id, resource_id, + message): + metadata = copy.copy(message['payload']) + metadata['event_type'] = message['event_type'] + metadata['host'] = message['publisher_id'] + return cls(name=name, + type=type, + volume=volume, + unit=unit, + user_id=user_id, + project_id=project_id, + resource_id=resource_id, + timestamp=message['timestamp'], + resource_metadata=metadata) TYPE_GAUGE = 'gauge' TYPE_DELTA = 'delta' TYPE_CUMULATIVE = 'cumulative' - - -def from_notification(cls, name, type, volume, unit, - user_id, project_id, resource_id, - message): - metadata = copy.copy(message['payload']) - metadata['event_type'] = message['event_type'] - metadata['host'] = message['publisher_id'] - return cls(name=name, - type=type, - volume=volume, - unit=unit, - user_id=user_id, - project_id=project_id, - resource_id=resource_id, - timestamp=message['timestamp'], - resource_metadata=metadata) - - -Sample.from_notification = classmethod(from_notification) diff --git a/ceilometer/transformer/__init__.py b/ceilometer/transformer/__init__.py index 176ef0b4..7a37962a 100644 --- a/ceilometer/transformer/__init__.py +++ b/ceilometer/transformer/__init__.py @@ -53,18 +53,16 @@ class TransformerBase(object): super(TransformerBase, self).__init__() @abc.abstractmethod - def handle_sample(self, context, counter, source): + def handle_sample(self, context, counter): """Transform a counter. :param context: Passed from the data collector. :param counter: A counter. - :param source: Passed from data collector. """ - def flush(self, context, source): + def flush(self, context): """Flush counters cached previously. :param context: Passed from the data collector. - :param source: Source of counters that are being published. """ return [] diff --git a/ceilometer/transformer/accumulator.py b/ceilometer/transformer/accumulator.py index 1ae28008..15a9b4ae 100644 --- a/ceilometer/transformer/accumulator.py +++ b/ceilometer/transformer/accumulator.py @@ -31,13 +31,13 @@ class TransformerAccumulator(transformer.TransformerBase): self.size = size super(TransformerAccumulator, self).__init__(**kwargs) - def handle_sample(self, context, counter, source): + def handle_sample(self, context, counter): if self.size >= 1: self.counters.append(counter) else: return counter - def flush(self, context, source): + def flush(self, context): if len(self.counters) >= self.size: x = self.counters self.counters = [] diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py index a4fef549..d0a0d5d1 100644 --- a/ceilometer/transformer/conversions.py +++ b/ceilometer/transformer/conversions.py @@ -74,7 +74,7 @@ class ScalingTransformer(transformer.TransformerBase): """Apply the scaling factor (either a straight multiplicative factor or else a string to be eval'd). """ - ns = Namespace(counter._asdict()) + ns = Namespace(counter.as_dict()) return ((eval(scale, {}, ns) if isinstance(scale, basestring) else counter.volume * scale) if scale else counter.volume) @@ -95,7 +95,7 @@ class ScalingTransformer(transformer.TransformerBase): resource_metadata=counter.resource_metadata ) - def handle_sample(self, context, counter, source): + def handle_sample(self, context, counter): """Handle a sample, converting if necessary.""" LOG.debug('handling counter %s', (counter,)) if (self.source.get('unit', counter.unit) == counter.unit): @@ -117,7 +117,7 @@ class RateOfChangeTransformer(ScalingTransformer): self.cache = {} super(RateOfChangeTransformer, self).__init__(**kwargs) - def handle_sample(self, context, counter, source): + def handle_sample(self, context, counter): """Handle a sample, converting if necessary.""" LOG.debug('handling counter %s', (counter,)) key = counter.name + counter.resource_id diff --git a/tests/agentbase.py b/tests/agentbase.py index 3bd89f93..d1c38e31 100644 --- a/tests/agentbase.py +++ b/tests/agentbase.py @@ -71,15 +71,42 @@ class BaseAgentManagerTestCase(base.TestCase): class PollsterAnother(TestPollster): counters = [] - test_data = default_test_data._replace(name='testanother') + test_data = sample.Sample( + name='testanother', + type=default_test_data.type, + unit=default_test_data.unit, + volume=default_test_data.volume, + user_id=default_test_data.user_id, + project_id=default_test_data.project_id, + resource_id=default_test_data.resource_id, + timestamp=default_test_data.timestamp, + resource_metadata=default_test_data.resource_metadata) class PollsterException(TestPollsterException): counters = [] - test_data = default_test_data._replace(name='testexception') + test_data = sample.Sample( + name='testexception', + type=default_test_data.type, + unit=default_test_data.unit, + volume=default_test_data.volume, + user_id=default_test_data.user_id, + project_id=default_test_data.project_id, + resource_id=default_test_data.resource_id, + timestamp=default_test_data.timestamp, + resource_metadata=default_test_data.resource_metadata) class PollsterExceptionAnother(TestPollsterException): counters = [] - test_data = default_test_data._replace(name='testexceptionanother') + test_data = sample.Sample( + name='testexceptionanother', + type=default_test_data.type, + unit=default_test_data.unit, + volume=default_test_data.volume, + user_id=default_test_data.user_id, + project_id=default_test_data.project_id, + resource_id=default_test_data.resource_id, + timestamp=default_test_data.timestamp, + resource_metadata=default_test_data.resource_metadata) def setup_pipeline(self): self.transformer_manager = transformer.TransformerExtensionManager( diff --git a/tests/collector/test_service.py b/tests/collector/test_service.py index 127dd86e..8d4cdab6 100644 --- a/tests/collector/test_service.py +++ b/tests/collector/test_service.py @@ -115,7 +115,7 @@ class TestUDPCollectorService(TestCollector): def setUp(self): super(TestUDPCollectorService, self).setUp() self.srv = service.UDPCollectorService() - self.counter = dict(sample.Sample( + self.counter = sample.Sample( name='foobar', type='bad', unit='F', @@ -125,7 +125,7 @@ class TestUDPCollectorService(TestCollector): resource_id='cat', timestamp='NOW!', resource_metadata={}, - )._asdict()) + ).as_dict() def test_service_has_storage_conn(self): srv = service.UDPCollectorService() diff --git a/tests/objectstore/test_swift_middleware.py b/tests/objectstore/test_swift_middleware.py index 3860eb81..e3ccbeb7 100644 --- a/tests/objectstore/test_swift_middleware.py +++ b/tests/objectstore/test_swift_middleware.py @@ -43,24 +43,21 @@ class FakeApp(object): class TestSwiftMiddleware(base.TestCase): - class _faux_pipeline_manager(object): + class _faux_pipeline_manager(pipeline.PipelineManager): class _faux_pipeline(object): def __init__(self, pipeline_manager): self.pipeline_manager = pipeline_manager self.counters = [] - def publish_counters(self, ctxt, counters, source): + def publish_counters(self, ctxt, counters): self.counters.extend(counters) - def flush(self, context, source): + def flush(self, context): pass def __init__(self): self.pipelines = [self._faux_pipeline(self)] - def flush(self, ctx, source): - pass - def _faux_setup_pipeline(self, transformer_manager): return self.pipeline_manager diff --git a/tests/publisher/test_file.py b/tests/publisher/test_file.py index bf895602..62249f21 100644 --- a/tests/publisher/test_file.py +++ b/tests/publisher/test_file.py @@ -66,16 +66,13 @@ class TestFilePublisher(base.TestCase): ), ] - COUNTER_SOURCE = 'testsource' - def test_file_publisher(self): # Test valid configurations parsed_url = urlsplit( 'file:///tmp/log_file?max_bytes=50&backup_count=3') publisher = file.FilePublisher(parsed_url) publisher.publish_counters(None, - self.test_data, - self.COUNTER_SOURCE) + self.test_data) handler = publisher.publisher_logger.handlers[0] self.assertTrue(isinstance(handler, @@ -91,8 +88,7 @@ class TestFilePublisher(base.TestCase): 'file:///tmp/log_file_plain') publisher = file.FilePublisher(parsed_url) publisher.publish_counters(None, - self.test_data, - self.COUNTER_SOURCE) + self.test_data) handler = publisher.publisher_logger.handlers[0] self.assertTrue(isinstance(handler, @@ -109,7 +105,6 @@ class TestFilePublisher(base.TestCase): 'file:///tmp/log_file_bad?max_bytes=yus&backup_count=5y') publisher = file.FilePublisher(parsed_url) publisher.publish_counters(None, - self.test_data, - self.COUNTER_SOURCE) + self.test_data) self.assertIsNone(publisher.publisher_logger) diff --git a/tests/publisher/test_rpc_publisher.py b/tests/publisher/test_rpc_publisher.py index d3474e7c..602a0088 100644 --- a/tests/publisher/test_rpc_publisher.py +++ b/tests/publisher/test_rpc_publisher.py @@ -222,8 +222,7 @@ class TestPublish(base.TestCase): publisher = rpc.RPCPublisher( network_utils.urlsplit('rpc://')) publisher.publish_counters(None, - self.test_data, - 'test') + self.test_data) self.assertEqual(len(self.published), 1) self.assertEqual(self.published[0][0], cfg.CONF.publisher_rpc.metering_topic) @@ -235,8 +234,7 @@ class TestPublish(base.TestCase): publisher = rpc.RPCPublisher( network_utils.urlsplit('rpc://?target=custom_procedure_call')) publisher.publish_counters(None, - self.test_data, - 'test') + self.test_data) self.assertEqual(len(self.published), 1) self.assertEqual(self.published[0][0], cfg.CONF.publisher_rpc.metering_topic) @@ -248,8 +246,7 @@ class TestPublish(base.TestCase): publisher = rpc.RPCPublisher( network_utils.urlsplit('rpc://?per_meter_topic=1')) publisher.publish_counters(None, - self.test_data, - 'test') + self.test_data) self.assertEqual(len(self.published), 4) for topic, rpc_call in self.published: meters = rpc_call['args']['data'] @@ -276,7 +273,7 @@ class TestPublish(base.TestCase): self.assertRaises( SystemExit, publisher.publish_counters, - None, self.test_data, 'test') + None, self.test_data) self.assertEqual(publisher.policy, 'default') self.assertEqual(len(self.published), 0) self.assertEqual(len(publisher.local_queue), 0) @@ -288,7 +285,7 @@ class TestPublish(base.TestCase): self.assertRaises( SystemExit, publisher.publish_counters, - None, self.test_data, 'test') + None, self.test_data) self.assertEqual(len(self.published), 0) self.assertEqual(len(publisher.local_queue), 0) @@ -299,7 +296,7 @@ class TestPublish(base.TestCase): self.assertRaises( SystemExit, publisher.publish_counters, - None, self.test_data, 'test') + None, self.test_data) self.assertEqual(publisher.policy, 'default') self.assertEqual(len(self.published), 0) self.assertEqual(len(publisher.local_queue), 0) @@ -309,8 +306,7 @@ class TestPublish(base.TestCase): publisher = rpc.RPCPublisher( network_utils.urlsplit('rpc://?policy=drop')) publisher.publish_counters(None, - self.test_data, - 'test') + self.test_data) self.assertEqual(len(self.published), 0) self.assertEqual(len(publisher.local_queue), 0) @@ -319,8 +315,7 @@ class TestPublish(base.TestCase): publisher = rpc.RPCPublisher( network_utils.urlsplit('rpc://?policy=queue')) publisher.publish_counters(None, - self.test_data, - 'test') + self.test_data) self.assertEqual(len(self.published), 0) self.assertEqual(len(publisher.local_queue), 1) @@ -329,15 +324,13 @@ class TestPublish(base.TestCase): publisher = rpc.RPCPublisher( network_utils.urlsplit('rpc://?policy=queue')) publisher.publish_counters(None, - self.test_data, - 'test') + self.test_data) self.assertEqual(len(self.published), 0) self.assertEqual(len(publisher.local_queue), 1) self.rpc_unreachable = False publisher.publish_counters(None, - self.test_data, - 'test') + self.test_data) self.assertEqual(len(self.published), 2) self.assertEqual(len(publisher.local_queue), 0) @@ -347,9 +340,10 @@ class TestPublish(base.TestCase): publisher = rpc.RPCPublisher( network_utils.urlsplit('rpc://?policy=queue&max_queue_length=3')) for i in range(0, 5): + for s in self.test_data: + s.source = 'test-%d' % i publisher.publish_counters(None, - self.test_data, - 'test-%d' % i) + self.test_data) self.assertEqual(len(self.published), 0) self.assertEqual(len(publisher.local_queue), 3) self.assertEqual( @@ -370,9 +364,10 @@ class TestPublish(base.TestCase): publisher = rpc.RPCPublisher( network_utils.urlsplit('rpc://?policy=queue')) for i in range(0, 2000): + for s in self.test_data: + s.source = 'test-%d' % i publisher.publish_counters(None, - self.test_data, - 'test-%d' % i) + self.test_data) self.assertEqual(len(self.published), 0) self.assertEqual(len(publisher.local_queue), 1024) self.assertEqual( diff --git a/tests/publisher/test_udp.py b/tests/publisher/test_udp.py index 5678900b..db9722f6 100644 --- a/tests/publisher/test_udp.py +++ b/tests/publisher/test_udp.py @@ -29,6 +29,9 @@ from ceilometer.tests import base from ceilometer.openstack.common import network_utils +COUNTER_SOURCE = 'testsource' + + class TestUDPPublisher(base.TestCase): test_data = [ @@ -42,6 +45,7 @@ class TestUDPPublisher(base.TestCase): resource_id='test_run_tasks', timestamp=datetime.datetime.utcnow().isoformat(), resource_metadata={'name': 'TestPublish'}, + source=COUNTER_SOURCE, ), sample.Sample( name='test', @@ -53,6 +57,7 @@ class TestUDPPublisher(base.TestCase): resource_id='test_run_tasks', timestamp=datetime.datetime.utcnow().isoformat(), resource_metadata={'name': 'TestPublish'}, + source=COUNTER_SOURCE, ), sample.Sample( name='test2', @@ -64,6 +69,7 @@ class TestUDPPublisher(base.TestCase): resource_id='test_run_tasks', timestamp=datetime.datetime.utcnow().isoformat(), resource_metadata={'name': 'TestPublish'}, + source=COUNTER_SOURCE, ), sample.Sample( name='test2', @@ -75,6 +81,7 @@ class TestUDPPublisher(base.TestCase): resource_id='test_run_tasks', timestamp=datetime.datetime.utcnow().isoformat(), resource_metadata={'name': 'TestPublish'}, + source=COUNTER_SOURCE, ), sample.Sample( name='test3', @@ -86,6 +93,7 @@ class TestUDPPublisher(base.TestCase): resource_id='test_run_tasks', timestamp=datetime.datetime.utcnow().isoformat(), resource_metadata={'name': 'TestPublish'}, + source=COUNTER_SOURCE, ), ] @@ -99,8 +107,6 @@ class TestUDPPublisher(base.TestCase): return udp_socket return _fake_socket_socket - COUNTER_SOURCE = 'testsource' - def test_published(self): self.data_sent = [] with mock.patch('socket.socket', @@ -108,8 +114,7 @@ class TestUDPPublisher(base.TestCase): publisher = udp.UDPPublisher( network_utils.urlsplit('udp://somehost')) publisher.publish_counters(None, - self.test_data, - self.COUNTER_SOURCE) + self.test_data) self.assertEqual(len(self.data_sent), 5) @@ -117,10 +122,6 @@ class TestUDPPublisher(base.TestCase): for data, dest in self.data_sent: counter = msgpack.loads(data) - self.assertEqual(counter['source'], self.COUNTER_SOURCE) - # Remove source because our test Counters don't have it, so the - # comparison would fail later - del counter['source'] sent_counters.append(counter) # Check destination @@ -129,7 +130,7 @@ class TestUDPPublisher(base.TestCase): # Check that counters are equal self.assertEqual(sorted(sent_counters), - sorted([dict(d._asdict()) for d in self.test_data])) + sorted([dict(d.as_dict()) for d in self.test_data])) @staticmethod def _raise_ioerror(): @@ -146,5 +147,4 @@ class TestUDPPublisher(base.TestCase): publisher = udp.UDPPublisher( network_utils.urlsplit('udp://localhost')) publisher.publish_counters(None, - self.test_data, - self.COUNTER_SOURCE) + self.test_data) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 80e9ca45..c779a6e2 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -67,7 +67,7 @@ class TestPipeline(base.TestCase): return fake_drivers[url](url) class PublisherClassException(publisher.PublisherBase): - def publish_counters(self, ctxt, counters, source): + def publish_counters(self, ctxt, counters): raise Exception() class TransformerClass(transformer.TransformerBase): @@ -77,13 +77,23 @@ class TestPipeline(base.TestCase): self.__class__.samples = [] self.append_name = append_name - def flush(self, ctxt, source): + def flush(self, ctxt): return [] - def handle_sample(self, ctxt, counter, source): + def handle_sample(self, ctxt, counter): self.__class__.samples.append(counter) newname = getattr(counter, 'name') + self.append_name - return counter._replace(name=newname) + return sample.Sample( + name=newname, + type=counter.type, + volume=counter.volume, + unit=counter.unit, + user_id=counter.user_id, + project_id=counter.project_id, + resource_id=counter.resource_id, + timestamp=counter.timestamp, + resource_metadata=counter.resource_metadata, + ) class TransformerClassDrop(transformer.TransformerBase): samples = [] @@ -91,11 +101,11 @@ class TestPipeline(base.TestCase): def __init__(self): self.__class__.samples = [] - def handle_sample(self, ctxt, counter, source): + def handle_sample(self, ctxt, counter): self.__class__.samples.append(counter) class TransformerClassException(object): - def handle_sample(self, ctxt, counter, source): + def handle_sample(self, ctxt, counter): raise Exception() def setUp(self): @@ -204,7 +214,7 @@ class TestPipeline(base.TestCase): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] @@ -220,14 +230,25 @@ class TestPipeline(base.TestCase): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 1) - self.test_counter = self.test_counter._replace(name='b') - with pipeline_manager.publisher(None, None) as p: + self.test_counter = sample.Sample( + name='b', + type=self.test_counter.type, + volume=self.test_counter.volume, + unit=self.test_counter.unit, + user_id=self.test_counter.user_id, + project_id=self.test_counter.project_id, + resource_id=self.test_counter.resource_id, + timestamp=self.test_counter.timestamp, + resource_metadata=self.test_counter.resource_metadata, + ) + + with pipeline_manager.publisher(None) as p: p([self.test_counter]) self.assertEqual(len(publisher.counters), 2) @@ -240,7 +261,7 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] @@ -260,7 +281,7 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 1) @@ -273,7 +294,7 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] @@ -309,12 +330,22 @@ class TestPipeline(base.TestCase): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) - self.test_counter = self.test_counter._replace(name='b') + self.test_counter = sample.Sample( + name='b', + type=self.test_counter.type, + volume=self.test_counter.volume, + unit=self.test_counter.unit, + user_id=self.test_counter.user_id, + project_id=self.test_counter.project_id, + resource_id=self.test_counter.resource_id, + timestamp=self.test_counter.timestamp, + resource_metadata=self.test_counter.resource_metadata, + ) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] @@ -349,12 +380,22 @@ class TestPipeline(base.TestCase): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) - self.test_counter = self.test_counter._replace(name='b') + self.test_counter = sample.Sample( + name='b', + type=self.test_counter.type, + volume=self.test_counter.volume, + unit=self.test_counter.unit, + user_id=self.test_counter.user_id, + project_id=self.test_counter.project_id, + resource_id=self.test_counter.resource_id, + timestamp=self.test_counter.timestamp, + resource_metadata=self.test_counter.resource_metadata, + ) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] @@ -370,7 +411,7 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['transformers'] = None pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 1) @@ -380,7 +421,7 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['transformers'] = [] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 1) @@ -400,7 +441,7 @@ class TestPipeline(base.TestCase): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] @@ -432,7 +473,7 @@ class TestPipeline(base.TestCase): ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) self.assertTrue(len(self.TransformerClass.samples) == 2) @@ -468,7 +509,7 @@ class TestPipeline(base.TestCase): ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] @@ -485,7 +526,7 @@ class TestPipeline(base.TestCase): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] @@ -501,7 +542,7 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['publishers'] = ['except://', 'new://'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) new_publisher = pipeline_manager.pipelines[0].publishers[1] @@ -513,9 +554,19 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['counters'] = ['a', 'b'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter, - self.test_counter._replace(name='b')]) + sample.Sample( + name='b', + type=self.test_counter.type, + volume=self.test_counter.volume, + unit=self.test_counter.unit, + user_id=self.test_counter.user_id, + project_id=self.test_counter.project_id, + resource_id=self.test_counter.resource_id, + timestamp=self.test_counter.timestamp, + resource_metadata=self.test_counter.resource_metadata, + )]) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 2) @@ -543,17 +594,17 @@ class TestPipeline(base.TestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_counter(None, self.test_counter, None) + pipe.publish_counter(None, self.test_counter) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 0) - pipe.flush(None, None) + pipe.flush(None) self.assertEqual(len(publisher.counters), 0) - pipe.publish_counter(None, self.test_counter, None) - pipe.flush(None, None) + pipe.publish_counter(None, self.test_counter) + pipe.flush(None) self.assertEqual(len(publisher.counters), 0) for i in range(CACHE_SIZE - 2): - pipe.publish_counter(None, self.test_counter, None) - pipe.flush(None, None) + pipe.publish_counter(None, self.test_counter) + pipe.flush(None) self.assertEqual(len(publisher.counters), CACHE_SIZE) self.assertTrue(getattr(publisher.counters[0], 'name') == 'a_update_new') @@ -578,14 +629,24 @@ class TestPipeline(base.TestCase): self.pipeline_cfg[0]['counters'] = ['a', 'b'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter, - self.test_counter._replace(name='b')]) + sample.Sample( + name='b', + type=self.test_counter.type, + volume=self.test_counter.volume, + unit=self.test_counter.unit, + user_id=self.test_counter.user_id, + project_id=self.test_counter.project_id, + resource_id=self.test_counter.resource_id, + timestamp=self.test_counter.timestamp, + resource_metadata=self.test_counter.resource_metadata, + )]) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 0) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) self.assertEqual(len(publisher.counters), CACHE_SIZE) @@ -604,9 +665,9 @@ class TestPipeline(base.TestCase): pipe = pipeline_manager.pipelines[0] publisher = pipe.publishers[0] - pipe.publish_counter(None, self.test_counter, None) + pipe.publish_counter(None, self.test_counter) self.assertEqual(len(publisher.counters), 0) - pipe.flush(None, None) + pipe.flush(None) self.assertEqual(len(publisher.counters), 1) self.assertEqual(getattr(publisher.counters[0], 'name'), 'a_update') @@ -625,9 +686,19 @@ class TestPipeline(base.TestCase): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) - self.test_counter = self.test_counter._replace(name='a:b') + self.test_counter = sample.Sample( + name='a:b', + type=self.test_counter.type, + volume=self.test_counter.volume, + unit=self.test_counter.unit, + user_id=self.test_counter.user_id, + project_id=self.test_counter.project_id, + resource_id=self.test_counter.resource_id, + timestamp=self.test_counter.timestamp, + resource_metadata=self.test_counter.resource_metadata, + ) - with pipeline_manager.publisher(None, None) as p: + with pipeline_manager.publisher(None) as p: p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] @@ -670,10 +741,10 @@ class TestPipeline(base.TestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_counters(None, counters, None) + pipe.publish_counters(None, counters) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 1) - pipe.flush(None, None) + pipe.flush(None) self.assertEqual(len(publisher.counters), 1) cpu_mins = publisher.counters[-1] self.assertEquals(getattr(cpu_mins, 'name'), 'cpu_mins') @@ -723,7 +794,7 @@ class TestPipeline(base.TestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_counters(None, counters, None) + pipe.publish_counters(None, counters) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 2) core_temp = publisher.counters[1] @@ -812,10 +883,10 @@ class TestPipeline(base.TestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_counters(None, counters, None) + pipe.publish_counters(None, counters) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 2) - pipe.flush(None, None) + pipe.flush(None) self.assertEqual(len(publisher.counters), 2) cpu_util = publisher.counters[0] self.assertEquals(getattr(cpu_util, 'name'), 'cpu_util') @@ -896,8 +967,8 @@ class TestPipeline(base.TestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - pipe.publish_counters(None, counters, None) + pipe.publish_counters(None, counters) publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 0) - pipe.flush(None, None) + pipe.flush(None) self.assertEqual(len(publisher.counters), 0)