From c79b5982596317751e555c90b3888a17fee21d51 Mon Sep 17 00:00:00 2001
From: gordon chung
Date: Tue, 21 Jul 2015 10:52:39 -0400
Subject: [PATCH] add flexible grouping key

all transformers will now require a grouping key. the pipeline will take
grouping keys from all transformers in its definition. using the set of
keys, the values will be pulled from the datapoint, hashed, and sent to a
pipeline. if no transformers are applied, samples will be grouped by
resource_id and events will be grouped by event_type.

Implement blueprint distributed-coordinated-notifications

Change-Id: Ief462d19655c238e5951881a58e183084d37ac13
---
 ceilometer/notification.py                        |   5 +-
 ceilometer/pipeline.py                            |  31 +++--
 .../tests/functional/test_notification.py         | 116 +++++++++++++++---
 ceilometer/tests/pipeline_base.py                 |  49 ++++++++
 ceilometer/transformer/__init__.py                |   4 +
 ceilometer/transformer/accumulator.py             |   2 +
 ceilometer/transformer/arithmetic.py              |   2 +
 ceilometer/transformer/conversions.py             |   2 +
 8 files changed, 182 insertions(+), 29 deletions(-)

diff --git a/ceilometer/notification.py b/ceilometer/notification.py
index 41924696..ca0e47e4 100644
--- a/ceilometer/notification.py
+++ b/ceilometer/notification.py
@@ -109,8 +109,9 @@ class NotificationService(service_base.BaseService):
         if cfg.CONF.notification.workload_partitioning:
             pipe_manager = pipeline.SamplePipelineTransportManager()
             for pipe in pipeline_manager.pipelines:
+                key = pipeline.get_pipeline_grouping_key(pipe)
                 pipe_manager.add_transporter(
-                    (pipe.source.support_meter,
+                    (pipe.source.support_meter, key or ['resource_id'],
                      self._get_notifiers(transport, pipe)))
         else:
             pipe_manager = pipeline_manager
@@ -126,7 +127,7 @@ class NotificationService(service_base.BaseService):
             event_pipe_manager = pipeline.EventPipelineTransportManager()
             for pipe in self.event_pipeline_manager.pipelines:
                 event_pipe_manager.add_transporter(
-                    (pipe.source.support_event,
+                    (pipe.source.support_event, ['event_type'],
                      self._get_notifiers(transport, pipe)))
         else:
             event_pipe_manager = self.event_pipeline_manager
diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py
index 37c43f4b..d8a8b665 100644
--- a/ceilometer/pipeline.py
+++ b/ceilometer/pipeline.py
@@ -125,12 +125,19 @@ class _PipelineTransportManager(object):
     def __init__(self):
         self.transporters = []
 
+    @staticmethod
+    def hash_grouping(datapoint, grouping_keys):
+        value = ''
+        for key in grouping_keys or []:
+            value += datapoint.get(key) if datapoint.get(key) else ''
+        return hash(value)
+
     def add_transporter(self, transporter):
         self.transporters.append(transporter)
 
     def publisher(self, context):
         serializer = self.serializer
-        hash_to_bucketise = self.hash_to_bucketise
+        hash_grouping = self.hash_grouping
         transporters = self.transporters
         filter_attr = self.filter_attr
         event_type = self.event_type
@@ -144,10 +151,11 @@
                 data = [data] if not isinstance(data, list) else data
                 for datapoint in data:
                     serialized_data = serializer(datapoint)
-                    for d_filter, notifiers in transporters:
+                    for d_filter, grouping_keys, notifiers in transporters:
                         if d_filter(serialized_data[filter_attr]):
-                            key = (hash_to_bucketise(serialized_data) %
-                                   len(notifiers))
+                            key = (hash_grouping(serialized_data,
+                                                 grouping_keys)
+                                   % len(notifiers))
                             notifier = notifiers[key]
                             notifier.sample(context.to_dict(),
                                             event_type=event_type,
@@ -164,10 +172,6 @@ class SamplePipelineTransportManager(_PipelineTransportManager):
     filter_attr = 'counter_name'
     event_type = 'ceilometer.pipeline'
 
-    @staticmethod
-    def hash_to_bucketise(datapoint):
-        return hash(datapoint['resource_id'])
-
     @staticmethod
     def serializer(data):
         return publisher_utils.meter_message_from_counter(
@@ -178,10 +182,6 @@ class EventPipelineTransportManager(_PipelineTransportManager):
     filter_attr = 'event_type'
     event_type = 'pipeline.event'
 
-    @staticmethod
-    def hash_to_bucketise(datapoint):
-        return hash(datapoint['event_type'])
-
     @staticmethod
     def serializer(data):
         return publisher_utils.message_from_event(
@@ -849,3 +849,10 @@ def setup_polling():
     """Setup polling manager according to yaml config file."""
     cfg_file = cfg.CONF.pipeline_cfg_file
     return _setup_polling_manager(cfg_file)
+
+
+def get_pipeline_grouping_key(pipe):
+    keys = []
+    for transformer in pipe.sink.transformers:
+        keys += transformer.grouping_keys
+    return list(set(keys))
diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py
index 5fe14fa2..e55b6f5b 100644
--- a/ceilometer/tests/functional/test_notification.py
+++ b/ceilometer/tests/functional/test_notification.py
@@ -425,10 +425,50 @@ class TestRealNotificationHA(BaseRealNotification):
             mock_notifier.call_args_list[2][1]['event_type'])
         self.srv.stop()
 
-    @mock.patch('ceilometer.publisher.test.TestPublisher')
-    def test_multiple_agents(self, fake_publisher_cls):
-        fake_publisher_cls.return_value = self.publisher
-
+
+class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
+    def setup_pipeline(self, transformers):
+        pipeline = yaml.dump({
+            'sources': [{
+                'name': 'test_pipeline',
+                'interval': 5,
+                'meters': ['instance', 'memory'],
+                'sinks': ['test_sink']
+            }],
+            'sinks': [{
+                'name': 'test_sink',
+                'transformers': transformers,
+                'publishers': ['test://']
+            }]
+        })
+        if six.PY3:
+            pipeline = pipeline.encode('utf-8')
+
+        pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline,
+                                                        prefix="pipeline",
+                                                        suffix="yaml")
+        return pipeline_cfg_file
+
+    def setUp(self):
+        super(TestRealNotificationMultipleAgents, self).setUp()
+        self.CONF = self.useFixture(fixture_config.Config()).conf
+        service.prepare_service([])
+        self.setup_messaging(self.CONF, 'nova')
+
+        pipeline_cfg_file = self.setup_pipeline(['instance', 'memory'])
+        self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
+        self.CONF.set_override("store_events", False, group="notification")
+        self.CONF.set_override("disable_non_metric_meters", False,
+                               group="notification")
+        self.CONF.set_override('workload_partitioning', True,
+                               group='notification')
+        self.publisher = test_publisher.TestPublisher("")
+        self.publisher2 = test_publisher.TestPublisher("")
+
+    def _check_notifications(self, fake_publisher_cls):
+        fake_publisher_cls.side_effect = [self.publisher, self.publisher2]
+
+        self.srv = notification.NotificationService()
         self.srv2 = notification.NotificationService()
         with mock.patch('ceilometer.coordination.PartitionCoordinator'
                         '._get_members', return_value=['harry', 'lloyd']):
@@ -448,17 +488,63 @@
             notifier.info(context.RequestContext(),
                           'compute.instance.create.end', payload2)
         self.expected_samples = 4
-        self.expected_events = 2
         start = timeutils.utcnow()
-        while timeutils.delta_seconds(start, timeutils.utcnow()) < 60:
-            if (len(self.publisher.samples) >= self.expected_samples and
-                    len(self.publisher.events) >= self.expected_events):
-                break
-            eventlet.sleep(0)
-        self.srv.stop()
-        self.srv2.stop()
+        with mock.patch('six.moves.builtins.hash', lambda x: int(x)):
+            while timeutils.delta_seconds(start, timeutils.utcnow()) < 60:
+                if (len(self.publisher.samples + self.publisher2.samples) >=
+                        self.expected_samples):
+                    break
+                eventlet.sleep(0)
+            self.srv.stop()
+            self.srv2.stop()
 
-        resources = set(s.resource_id for s in self.publisher.samples)
-        self.assertEqual(self.expected_samples, len(self.publisher.samples))
-        self.assertEqual(self.expected_events, len(self.publisher.events))
-        self.assertEqual(set(['1', '0']), resources)
+        self.assertEqual(2, len(self.publisher.samples))
+        self.assertEqual(2, len(self.publisher2.samples))
+        self.assertEqual(1, len(set(
+            s.resource_id for s in self.publisher.samples)))
+        self.assertEqual(1, len(set(
+            s.resource_id for s in self.publisher2.samples)))
+
+    @mock.patch('ceilometer.publisher.test.TestPublisher')
+    def test_multiple_agents_no_transform(self, fake_publisher_cls):
+        pipeline_cfg_file = self.setup_pipeline([])
+        self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
+        self._check_notifications(fake_publisher_cls)
+
+    @mock.patch('ceilometer.publisher.test.TestPublisher')
+    def test_multiple_agents_transform(self, fake_publisher_cls):
+        pipeline_cfg_file = self.setup_pipeline(
+            [{
+                'name': 'unit_conversion',
+                'parameters': {
+                    'source': {},
+                    'target': {'name': 'cpu_mins',
+                               'unit': 'min',
+                               'scale': 'volume'},
+                }
+            }])
+        self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
+        self._check_notifications(fake_publisher_cls)
+
+    @mock.patch('ceilometer.publisher.test.TestPublisher')
+    def test_multiple_agents_multiple_transform(self, fake_publisher_cls):
+        pipeline_cfg_file = self.setup_pipeline(
+            [{
+                'name': 'unit_conversion',
+                'parameters': {
+                    'source': {},
+                    'target': {'name': 'cpu_mins',
+                               'unit': 'min',
+                               'scale': 'volume'},
+                }
+            }, {
+                'name': 'unit_conversion',
+                'parameters': {
+                    'source': {},
+                    'target': {'name': 'cpu_mins',
+                               'unit': 'min',
+                               'scale': 'volume'},
+                }
+            }])
+        self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
+        self._check_notifications(fake_publisher_cls)
diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py
index e4f0a813..4ff335ab 100644
--- a/ceilometer/tests/pipeline_base.py
+++ b/ceilometer/tests/pipeline_base.py
@@ -86,6 +86,7 @@ class BasePipelineTestCase(base.BaseTestCase):
 
     class TransformerClass(transformer.TransformerBase):
         samples = []
+        grouping_keys = ['counter_name']
 
         def __init__(self, append_name='_update'):
             self.__class__.samples = []
@@ -111,6 +112,7 @@ class BasePipelineTestCase(base.BaseTestCase):
 
     class TransformerClassDrop(transformer.TransformerBase):
         samples = []
+        grouping_keys = ['resource_id']
 
         def __init__(self):
             self.__class__.samples = []
@@ -119,6 +121,8 @@ class BasePipelineTestCase(base.BaseTestCase):
             self.__class__.samples.append(counter)
 
     class TransformerClassException(object):
+        grouping_keys = ['resource_id']
+
         @staticmethod
         def handle_sample(ctxt, counter):
             raise Exception()
@@ -1897,3 +1901,48 @@
     def test_unique_pipeline_names(self):
         self._dup_pipeline_name_cfg()
         self._exception_create_pipelinemanager()
+
+    def test_get_pipeline_grouping_key(self):
+        transformer_cfg = [
+            {
+                'name': 'update',
+                'parameters': {}
+            },
+            {
+                'name': 'unit_conversion',
+                'parameters': {
+                    'source': {},
+                    'target': {'name': 'cpu_mins',
+                               'unit': 'min',
+                               'scale': 'volume'},
+                }
+            },
+            {
+                'name': 'update',
+                'parameters': {}
+            },
+        ]
+        self._set_pipeline_cfg('transformers', transformer_cfg)
+        pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
+                                                    self.transformer_manager)
+        self.assertEqual(set(['resource_id', 'counter_name']),
+                         set(pipeline.get_pipeline_grouping_key(
+                             pipeline_manager.pipelines[0])))
+
+    def test_get_pipeline_duplicate_grouping_key(self):
+        transformer_cfg = [
+            {
+                'name': 'update',
+                'parameters': {}
+            },
+            {
+                'name': 'update',
+                'parameters': {}
+            },
+        ]
+        self._set_pipeline_cfg('transformers', transformer_cfg)
+        pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
+                                                    self.transformer_manager)
+        self.assertEqual(['counter_name'],
+                         pipeline.get_pipeline_grouping_key(
+                             pipeline_manager.pipelines[0]))
diff --git a/ceilometer/transformer/__init__.py b/ceilometer/transformer/__init__.py
index c3124c2e..2525e774 100644
--- a/ceilometer/transformer/__init__.py
+++ b/ceilometer/transformer/__init__.py
@@ -43,6 +43,10 @@ class TransformerBase(object):
         :param sample: A sample.
         """
 
+    @abc.abstractproperty
+    def grouping_keys(self):
+        """Keys used to group transformer."""
+
     def flush(self, context):
         """Flush samples cached previously.
 
diff --git a/ceilometer/transformer/accumulator.py b/ceilometer/transformer/accumulator.py
index 0ae9ce9a..2479511d 100644
--- a/ceilometer/transformer/accumulator.py
+++ b/ceilometer/transformer/accumulator.py
@@ -22,6 +22,8 @@ class TransformerAccumulator(transformer.TransformerBase):
     And then flushes them out into the wild.
     """
 
+    grouping_keys = ['resource_id']
+
     def __init__(self, size=1, **kwargs):
         if size >= 1:
             self.samples = []
diff --git a/ceilometer/transformer/arithmetic.py b/ceilometer/transformer/arithmetic.py
index 557ac83d..365c7848 100644
--- a/ceilometer/transformer/arithmetic.py
+++ b/ceilometer/transformer/arithmetic.py
@@ -35,6 +35,8 @@ class ArithmeticTransformer(transformer.TransformerBase):
     over one or more meters and/or their metadata.
     """
 
+    grouping_keys = ['resource_id']
+
     meter_name_re = re.compile(r'\$\(([\w\.\-]+)\)')
 
     def __init__(self, target=None, **kwargs):
diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py
index f4068d11..44e6f54c 100644
--- a/ceilometer/transformer/conversions.py
+++ b/ceilometer/transformer/conversions.py
@@ -30,6 +30,8 @@ LOG = log.getLogger(__name__)
 
 class ScalingTransformer(transformer.TransformerBase):
     """Transformer to apply a scaling conversion."""
 
+    grouping_keys = ['resource_id']
+
     def __init__(self, source=None, target=None, **kwargs):
         """Initialize transformer with configured parameters.
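
Note for reviewers (not part of the patch): a minimal standalone sketch of the routing this change introduces. The grouping-key values of a datapoint are concatenated and hashed, and the hash picks one notifier out of the list registered for the pipeline, so datapoints sharing the same key values always land on the same notification agent. hash_grouping below is copied from the pipeline.py hunk above; the two-element notifier list and the sample dict are made-up illustration values.

def hash_grouping(datapoint, grouping_keys):
    # concatenate the values of every grouping key present on the datapoint
    value = ''
    for key in grouping_keys or []:
        value += datapoint.get(key) if datapoint.get(key) else ''
    return hash(value)

# a sample pipeline with no transformers falls back to ['resource_id']
grouping_keys = ['resource_id']
notifiers = ['queue-agent-0', 'queue-agent-1']  # made-up stand-ins, one per agent

sample = {'counter_name': 'cpu', 'resource_id': 'instance-0001'}

# every sample for this resource hashes to the same bucket, so it is
# always routed to the same notification agent
bucket = hash_grouping(sample, grouping_keys) % len(notifiers)
print(notifiers[bucket])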