diff --git a/ceilometer/notification.py b/ceilometer/notification.py index b6d6b288ee..c6c71f6bc9 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -105,7 +105,7 @@ class NotificationService(cotyledon.Service): proper active/active HA. """ - NOTIFICATION_NAMESPACE = 'ceilometer.notification' + NOTIFICATION_NAMESPACE = 'ceilometer.notification.v2' def __init__(self, worker_id, conf, coordination_id=None): super(NotificationService, self).__init__(worker_id) @@ -229,14 +229,10 @@ class NotificationService(cotyledon.Service): endpoints.extend(pipe_mgr.get_interim_endpoints()) targets = [] - for mgr in self.managers: - for pipe_set, pipe in itertools.product(partitioned, - mgr.pipelines): - LOG.debug('Pipeline endpoint: %s from set: %s', - pipe.name, pipe_set) - topic = '%s-%s-%s' % (pipe.NOTIFICATION_IPC, - pipe.name, pipe_set) - targets.append(oslo_messaging.Target(topic=topic)) + for mgr, hash_id in itertools.product(self.managers, partitioned): + topic = '-'.join([mgr.NOTIFICATION_IPC, mgr.pm_type, str(hash_id)]) + LOG.debug('Listening to queue: %s', topic) + targets.append(oslo_messaging.Target(topic=topic)) if self.pipeline_listener: self.pipeline_listener.stop() diff --git a/ceilometer/pipeline/base.py b/ceilometer/pipeline/base.py index 506997eebb..99a7b70907 100644 --- a/ceilometer/pipeline/base.py +++ b/ceilometer/pipeline/base.py @@ -48,21 +48,19 @@ class PipelineException(agent.ConfigException): class InterimPublishContext(object): """Publisher to hash/shard data to pipelines""" - def __init__(self, conf, pipelines): + def __init__(self, conf, mgr): self.conf = conf - self.pipe_notifiers = [] - transport = messaging.get_transport(conf) - for pipe in pipelines: - self.pipe_notifiers.append( - (pipe, self._get_notifiers(transport, pipe))) + self.mgr = mgr + self.notifiers = self._get_notifiers(messaging.get_transport(conf)) - def _get_notifiers(self, transport, pipe): + def _get_notifiers(self, transport): notifiers = [] for x in range(self.conf.notification.pipeline_processing_queues): notifiers.append(oslo_messaging.Notifier( transport, driver=self.conf.publisher_notifier.telemetry_driver, - topics=['%s-%s-%s' % (pipe.NOTIFICATION_IPC, pipe.name, x)])) + topics=['-'.join( + [self.mgr.NOTIFICATION_IPC, self.mgr.pm_type, str(x)])])) return notifiers @staticmethod @@ -79,15 +77,14 @@ class InterimPublishContext(object): def p(data): data = [data] if not isinstance(data, list) else data for datapoint in data: - for pipe, notifiers in self.pipe_notifiers: + for pipe in self.mgr.pipelines: if pipe.supported(datapoint): serialized_data = pipe.serializer(datapoint) key = (self.hash_grouping(serialized_data, pipe.get_grouping_key()) - % len(notifiers)) - notifier = notifiers[key] - notifier.sample({}, event_type=pipe.name, - payload=[serialized_data]) + % len(self.notifiers)) + self.notifiers[key].sample({}, event_type=pipe.name, + payload=[serialized_data]) return p def __exit__(self, exc_type, exc_value, traceback): @@ -221,8 +218,6 @@ class Sink(object): class Pipeline(object): """Represents a coupling between a sink and a corresponding source.""" - NOTIFICATION_IPC = 'ceilometer-pipe' - def __init__(self, conf, source, sink): self.conf = conf self.source = source @@ -284,6 +279,8 @@ class PipelineManager(agent.ConfigManagerBase): Pipeline manager sets up pipelines according to config file """ + NOTIFICATION_IPC = 'ceilometer_ipc' + def __init__(self, conf, cfg_file, transformer_manager, partition): """Setup the pipelines according to config. @@ -408,7 +405,7 @@ class PipelineManager(agent.ConfigManagerBase): def interim_publisher(self): """Build publishing context for IPC.""" - return InterimPublishContext(self.conf, self.pipelines) + return InterimPublishContext(self.conf, self) def get_main_publisher(self): """Return the publishing context to use""" diff --git a/ceilometer/tests/unit/test_notification.py b/ceilometer/tests/unit/test_notification.py index f7b18447a7..a1f95b9281 100644 --- a/ceilometer/tests/unit/test_notification.py +++ b/ceilometer/tests/unit/test_notification.py @@ -300,10 +300,8 @@ class TestRealNotificationHA(BaseRealNotification): topics = [target.topic for target in mock_listener.call_args[0][1]] self.assertEqual(4, len(topics)) self.assertEqual( - {'ceilometer-pipe-test_pipeline:test_sink-4', - 'ceilometer-pipe-event:test_event:test_sink-4', - 'ceilometer-pipe-event:test_event:test_sink-9', - 'ceilometer-pipe-test_pipeline:test_sink-9'}, + {'ceilometer_ipc-sample-4', 'ceilometer_ipc-sample-9', + 'ceilometer_ipc-event-4', 'ceilometer_ipc-event-9'}, set(topics)) @mock.patch('oslo_messaging.get_batch_notification_listener') diff --git a/releasenotes/notes/manager-based-ipc-queues-85e3bf59ffdfb0ac.yaml b/releasenotes/notes/manager-based-ipc-queues-85e3bf59ffdfb0ac.yaml new file mode 100644 index 0000000000..b4478ddebb --- /dev/null +++ b/releasenotes/notes/manager-based-ipc-queues-85e3bf59ffdfb0ac.yaml @@ -0,0 +1,17 @@ +--- +features: + - | + Workload partitioning of notification agent is now split into queues + based on pipeline type (sample, event, etc...) rather than per individual + pipeline. This will save some memory usage specifically for pipeline + definitions with many source/sink combinations. +upgrade: + - | + If workload partitioning of the notification agent is enabled, the + notification agent should not run alongside pre-Queens agents. Doing so + may result in missed samples when leveraging transformations. To upgrade + without loss of data, set `notification_control_exchanges` option to + empty so only existing `ceilometer-pipe-*` queues are processed. Once + cleared, reset `notification_control_exchanges` option and launch the new + notification agent(s). If `workload_partitioning` is not enabled, no + special steps are required.