diff --git a/ceilometer/notification.py b/ceilometer/notification.py index bfa581bd..dced7a93 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -109,8 +109,9 @@ class NotificationService(service_base.BaseService): if cfg.CONF.notification.workload_partitioning: pipe_manager = pipeline.SamplePipelineTransportManager() for pipe in pipeline_manager.pipelines: + key = pipeline.get_pipeline_grouping_key(pipe) pipe_manager.add_transporter( - (pipe.source.support_meter, + (pipe.source.support_meter, key or ['resource_id'], self._get_notifiers(transport, pipe))) else: pipe_manager = pipeline_manager @@ -126,7 +127,7 @@ class NotificationService(service_base.BaseService): event_pipe_manager = pipeline.EventPipelineTransportManager() for pipe in self.event_pipeline_manager.pipelines: event_pipe_manager.add_transporter( - (pipe.source.support_event, + (pipe.source.support_event, ['event_type'], self._get_notifiers(transport, pipe))) else: event_pipe_manager = self.event_pipeline_manager diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 620933dd..95863aef 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -125,12 +125,19 @@ class _PipelineTransportManager(object): def __init__(self): self.transporters = [] + @staticmethod + def hash_grouping(datapoint, grouping_keys): + value = '' + for key in grouping_keys or []: + value += datapoint.get(key) if datapoint.get(key) else '' + return hash(value) + def add_transporter(self, transporter): self.transporters.append(transporter) def publisher(self, context): serializer = self.serializer - hash_to_bucketise = self.hash_to_bucketise + hash_grouping = self.hash_grouping transporters = self.transporters filter_attr = self.filter_attr event_type = self.event_type @@ -144,10 +151,11 @@ class _PipelineTransportManager(object): data = [data] if not isinstance(data, list) else data for datapoint in data: serialized_data = serializer(datapoint) - for d_filter, notifiers in transporters: + for d_filter, grouping_keys, notifiers in transporters: if d_filter(serialized_data[filter_attr]): - key = (hash_to_bucketise(serialized_data) % - len(notifiers)) + key = (hash_grouping(serialized_data, + grouping_keys) + % len(notifiers)) notifier = notifiers[key] notifier.sample(context.to_dict(), event_type=event_type, @@ -164,10 +172,6 @@ class SamplePipelineTransportManager(_PipelineTransportManager): filter_attr = 'counter_name' event_type = 'ceilometer.pipeline' - @staticmethod - def hash_to_bucketise(datapoint): - return hash(datapoint['resource_id']) - @staticmethod def serializer(data): return publisher_utils.meter_message_from_counter( @@ -178,10 +182,6 @@ class EventPipelineTransportManager(_PipelineTransportManager): filter_attr = 'event_type' event_type = 'pipeline.event' - @staticmethod - def hash_to_bucketise(datapoint): - return hash(datapoint['event_type']) - @staticmethod def serializer(data): return publisher_utils.message_from_event( @@ -853,3 +853,10 @@ def setup_polling(): """Setup polling manager according to yaml config file.""" cfg_file = cfg.CONF.pipeline_cfg_file return _setup_polling_manager(cfg_file) + + +def get_pipeline_grouping_key(pipe): + keys = [] + for transformer in pipe.sink.transformers: + keys += transformer.grouping_keys + return list(set(keys)) diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py index 5fe14fa2..e55b6f5b 100644 --- a/ceilometer/tests/functional/test_notification.py +++ b/ceilometer/tests/functional/test_notification.py @@ -425,10 +425,50 @@ class TestRealNotificationHA(BaseRealNotification): mock_notifier.call_args_list[2][1]['event_type']) self.srv.stop() - @mock.patch('ceilometer.publisher.test.TestPublisher') - def test_multiple_agents(self, fake_publisher_cls): - fake_publisher_cls.return_value = self.publisher +class TestRealNotificationMultipleAgents(tests_base.BaseTestCase): + def setup_pipeline(self, transformers): + pipeline = yaml.dump({ + 'sources': [{ + 'name': 'test_pipeline', + 'interval': 5, + 'meters': ['instance', 'memory'], + 'sinks': ['test_sink'] + }], + 'sinks': [{ + 'name': 'test_sink', + 'transformers': transformers, + 'publishers': ['test://'] + }] + }) + if six.PY3: + pipeline = pipeline.encode('utf-8') + + pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline, + prefix="pipeline", + suffix="yaml") + return pipeline_cfg_file + + def setUp(self): + super(TestRealNotificationMultipleAgents, self).setUp() + self.CONF = self.useFixture(fixture_config.Config()).conf + service.prepare_service([]) + self.setup_messaging(self.CONF, 'nova') + + pipeline_cfg_file = self.setup_pipeline(['instance', 'memory']) + self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) + self.CONF.set_override("store_events", False, group="notification") + self.CONF.set_override("disable_non_metric_meters", False, + group="notification") + self.CONF.set_override('workload_partitioning', True, + group='notification') + self.publisher = test_publisher.TestPublisher("") + self.publisher2 = test_publisher.TestPublisher("") + + def _check_notifications(self, fake_publisher_cls): + fake_publisher_cls.side_effect = [self.publisher, self.publisher2] + + self.srv = notification.NotificationService() self.srv2 = notification.NotificationService() with mock.patch('ceilometer.coordination.PartitionCoordinator' '._get_members', return_value=['harry', 'lloyd']): @@ -448,17 +488,63 @@ class TestRealNotificationHA(BaseRealNotification): notifier.info(context.RequestContext(), 'compute.instance.create.end', payload2) self.expected_samples = 4 - self.expected_events = 2 start = timeutils.utcnow() - while timeutils.delta_seconds(start, timeutils.utcnow()) < 60: - if (len(self.publisher.samples) >= self.expected_samples and - len(self.publisher.events) >= self.expected_events): - break - eventlet.sleep(0) - self.srv.stop() - self.srv2.stop() + with mock.patch('six.moves.builtins.hash', lambda x: int(x)): + while timeutils.delta_seconds(start, timeutils.utcnow()) < 60: + if (len(self.publisher.samples + self.publisher2.samples) >= + self.expected_samples): + break + eventlet.sleep(0) + self.srv.stop() + self.srv2.stop() - resources = set(s.resource_id for s in self.publisher.samples) - self.assertEqual(self.expected_samples, len(self.publisher.samples)) - self.assertEqual(self.expected_events, len(self.publisher.events)) - self.assertEqual(set(['1', '0']), resources) + self.assertEqual(2, len(self.publisher.samples)) + self.assertEqual(2, len(self.publisher2.samples)) + self.assertEqual(1, len(set( + s.resource_id for s in self.publisher.samples))) + self.assertEqual(1, len(set( + s.resource_id for s in self.publisher2.samples))) + + @mock.patch('ceilometer.publisher.test.TestPublisher') + def test_multiple_agents_no_transform(self, fake_publisher_cls): + pipeline_cfg_file = self.setup_pipeline([]) + self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) + self._check_notifications(fake_publisher_cls) + + @mock.patch('ceilometer.publisher.test.TestPublisher') + def test_multiple_agents_transform(self, fake_publisher_cls): + pipeline_cfg_file = self.setup_pipeline( + [{ + 'name': 'unit_conversion', + 'parameters': { + 'source': {}, + 'target': {'name': 'cpu_mins', + 'unit': 'min', + 'scale': 'volume'}, + } + }]) + self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) + self._check_notifications(fake_publisher_cls) + + @mock.patch('ceilometer.publisher.test.TestPublisher') + def test_multiple_agents_multiple_transform(self, fake_publisher_cls): + pipeline_cfg_file = self.setup_pipeline( + [{ + 'name': 'unit_conversion', + 'parameters': { + 'source': {}, + 'target': {'name': 'cpu_mins', + 'unit': 'min', + 'scale': 'volume'}, + } + }, { + 'name': 'unit_conversion', + 'parameters': { + 'source': {}, + 'target': {'name': 'cpu_mins', + 'unit': 'min', + 'scale': 'volume'}, + } + }]) + self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) + self._check_notifications(fake_publisher_cls) diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py index e4f0a813..4ff335ab 100644 --- a/ceilometer/tests/pipeline_base.py +++ b/ceilometer/tests/pipeline_base.py @@ -86,6 +86,7 @@ class BasePipelineTestCase(base.BaseTestCase): class TransformerClass(transformer.TransformerBase): samples = [] + grouping_keys = ['counter_name'] def __init__(self, append_name='_update'): self.__class__.samples = [] @@ -111,6 +112,7 @@ class BasePipelineTestCase(base.BaseTestCase): class TransformerClassDrop(transformer.TransformerBase): samples = [] + grouping_keys = ['resource_id'] def __init__(self): self.__class__.samples = [] @@ -119,6 +121,8 @@ class BasePipelineTestCase(base.BaseTestCase): self.__class__.samples.append(counter) class TransformerClassException(object): + grouping_keys = ['resource_id'] + @staticmethod def handle_sample(ctxt, counter): raise Exception() @@ -1897,3 +1901,48 @@ class BasePipelineTestCase(base.BaseTestCase): def test_unique_pipeline_names(self): self._dup_pipeline_name_cfg() self._exception_create_pipelinemanager() + + def test_get_pipeline_grouping_key(self): + transformer_cfg = [ + { + 'name': 'update', + 'parameters': {} + }, + { + 'name': 'unit_conversion', + 'parameters': { + 'source': {}, + 'target': {'name': 'cpu_mins', + 'unit': 'min', + 'scale': 'volume'}, + } + }, + { + 'name': 'update', + 'parameters': {} + }, + ] + self._set_pipeline_cfg('transformers', transformer_cfg) + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager) + self.assertEqual(set(['resource_id', 'counter_name']), + set(pipeline.get_pipeline_grouping_key( + pipeline_manager.pipelines[0]))) + + def test_get_pipeline_duplicate_grouping_key(self): + transformer_cfg = [ + { + 'name': 'update', + 'parameters': {} + }, + { + 'name': 'update', + 'parameters': {} + }, + ] + self._set_pipeline_cfg('transformers', transformer_cfg) + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager) + self.assertEqual(['counter_name'], + pipeline.get_pipeline_grouping_key( + pipeline_manager.pipelines[0])) diff --git a/ceilometer/transformer/__init__.py b/ceilometer/transformer/__init__.py index c3124c2e..2525e774 100644 --- a/ceilometer/transformer/__init__.py +++ b/ceilometer/transformer/__init__.py @@ -43,6 +43,10 @@ class TransformerBase(object): :param sample: A sample. """ + @abc.abstractproperty + def grouping_keys(self): + """Keys used to group transformer.""" + def flush(self, context): """Flush samples cached previously. diff --git a/ceilometer/transformer/accumulator.py b/ceilometer/transformer/accumulator.py index 0ae9ce9a..2479511d 100644 --- a/ceilometer/transformer/accumulator.py +++ b/ceilometer/transformer/accumulator.py @@ -22,6 +22,8 @@ class TransformerAccumulator(transformer.TransformerBase): And then flushes them out into the wild. """ + grouping_keys = ['resource_id'] + def __init__(self, size=1, **kwargs): if size >= 1: self.samples = [] diff --git a/ceilometer/transformer/arithmetic.py b/ceilometer/transformer/arithmetic.py index 557ac83d..365c7848 100644 --- a/ceilometer/transformer/arithmetic.py +++ b/ceilometer/transformer/arithmetic.py @@ -35,6 +35,8 @@ class ArithmeticTransformer(transformer.TransformerBase): over one or more meters and/or their metadata. """ + grouping_keys = ['resource_id'] + meter_name_re = re.compile(r'\$\(([\w\.\-]+)\)') def __init__(self, target=None, **kwargs): diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py index f4068d11..44e6f54c 100644 --- a/ceilometer/transformer/conversions.py +++ b/ceilometer/transformer/conversions.py @@ -30,6 +30,8 @@ LOG = log.getLogger(__name__) class ScalingTransformer(transformer.TransformerBase): """Transformer to apply a scaling conversion.""" + grouping_keys = ['resource_id'] + def __init__(self, source=None, target=None, **kwargs): """Initialize transformer with configured parameters.