From 911b973d7056b8bbbffef1f4d36bd662173a4f91 Mon Sep 17 00:00:00 2001 From: gord chung <gord@live.ca> Date: Fri, 3 Nov 2017 22:18:33 +0000 Subject: [PATCH] queues per manager rather than per pipeline currently we create queue per pipeline which is not necessary. it creates more memory usage and doesn't necessarily distribute work more effectively. this hashes data to queues based on manager but still internally, the data is destined to specific pipeline based on event_type. this will minimise queue usage while keeping internal code path the same. Change-Id: I0ccd51f13457f208fe2ccedb6e680c91e132f78f --- ceilometer/notification.py | 14 ++++----- ceilometer/pipeline/base.py | 29 +++++++++---------- ceilometer/tests/unit/test_notification.py | 6 ++-- ...ger-based-ipc-queues-85e3bf59ffdfb0ac.yaml | 17 +++++++++++ 4 files changed, 37 insertions(+), 29 deletions(-) create mode 100644 releasenotes/notes/manager-based-ipc-queues-85e3bf59ffdfb0ac.yaml diff --git a/ceilometer/notification.py b/ceilometer/notification.py index b6d6b288ee..c6c71f6bc9 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -105,7 +105,7 @@ class NotificationService(cotyledon.Service): proper active/active HA. """ - NOTIFICATION_NAMESPACE = 'ceilometer.notification' + NOTIFICATION_NAMESPACE = 'ceilometer.notification.v2' def __init__(self, worker_id, conf, coordination_id=None): super(NotificationService, self).__init__(worker_id) @@ -229,14 +229,10 @@ class NotificationService(cotyledon.Service): endpoints.extend(pipe_mgr.get_interim_endpoints()) targets = [] - for mgr in self.managers: - for pipe_set, pipe in itertools.product(partitioned, - mgr.pipelines): - LOG.debug('Pipeline endpoint: %s from set: %s', - pipe.name, pipe_set) - topic = '%s-%s-%s' % (pipe.NOTIFICATION_IPC, - pipe.name, pipe_set) - targets.append(oslo_messaging.Target(topic=topic)) + for mgr, hash_id in itertools.product(self.managers, partitioned): + topic = '-'.join([mgr.NOTIFICATION_IPC, mgr.pm_type, str(hash_id)]) + LOG.debug('Listening to queue: %s', topic) + targets.append(oslo_messaging.Target(topic=topic)) if self.pipeline_listener: self.pipeline_listener.stop() diff --git a/ceilometer/pipeline/base.py b/ceilometer/pipeline/base.py index 506997eebb..99a7b70907 100644 --- a/ceilometer/pipeline/base.py +++ b/ceilometer/pipeline/base.py @@ -48,21 +48,19 @@ class PipelineException(agent.ConfigException): class InterimPublishContext(object): """Publisher to hash/shard data to pipelines""" - def __init__(self, conf, pipelines): + def __init__(self, conf, mgr): self.conf = conf - self.pipe_notifiers = [] - transport = messaging.get_transport(conf) - for pipe in pipelines: - self.pipe_notifiers.append( - (pipe, self._get_notifiers(transport, pipe))) + self.mgr = mgr + self.notifiers = self._get_notifiers(messaging.get_transport(conf)) - def _get_notifiers(self, transport, pipe): + def _get_notifiers(self, transport): notifiers = [] for x in range(self.conf.notification.pipeline_processing_queues): notifiers.append(oslo_messaging.Notifier( transport, driver=self.conf.publisher_notifier.telemetry_driver, - topics=['%s-%s-%s' % (pipe.NOTIFICATION_IPC, pipe.name, x)])) + topics=['-'.join( + [self.mgr.NOTIFICATION_IPC, self.mgr.pm_type, str(x)])])) return notifiers @staticmethod @@ -79,15 +77,14 @@ class InterimPublishContext(object): def p(data): data = [data] if not isinstance(data, list) else data for datapoint in data: - for pipe, notifiers in self.pipe_notifiers: + for pipe in self.mgr.pipelines: if pipe.supported(datapoint): serialized_data = pipe.serializer(datapoint) key = (self.hash_grouping(serialized_data, pipe.get_grouping_key()) - % len(notifiers)) - notifier = notifiers[key] - notifier.sample({}, event_type=pipe.name, - payload=[serialized_data]) + % len(self.notifiers)) + self.notifiers[key].sample({}, event_type=pipe.name, + payload=[serialized_data]) return p def __exit__(self, exc_type, exc_value, traceback): @@ -221,8 +218,6 @@ class Sink(object): class Pipeline(object): """Represents a coupling between a sink and a corresponding source.""" - NOTIFICATION_IPC = 'ceilometer-pipe' - def __init__(self, conf, source, sink): self.conf = conf self.source = source @@ -284,6 +279,8 @@ class PipelineManager(agent.ConfigManagerBase): Pipeline manager sets up pipelines according to config file """ + NOTIFICATION_IPC = 'ceilometer_ipc' + def __init__(self, conf, cfg_file, transformer_manager, partition): """Setup the pipelines according to config. @@ -408,7 +405,7 @@ class PipelineManager(agent.ConfigManagerBase): def interim_publisher(self): """Build publishing context for IPC.""" - return InterimPublishContext(self.conf, self.pipelines) + return InterimPublishContext(self.conf, self) def get_main_publisher(self): """Return the publishing context to use""" diff --git a/ceilometer/tests/unit/test_notification.py b/ceilometer/tests/unit/test_notification.py index f7b18447a7..a1f95b9281 100644 --- a/ceilometer/tests/unit/test_notification.py +++ b/ceilometer/tests/unit/test_notification.py @@ -300,10 +300,8 @@ class TestRealNotificationHA(BaseRealNotification): topics = [target.topic for target in mock_listener.call_args[0][1]] self.assertEqual(4, len(topics)) self.assertEqual( - {'ceilometer-pipe-test_pipeline:test_sink-4', - 'ceilometer-pipe-event:test_event:test_sink-4', - 'ceilometer-pipe-event:test_event:test_sink-9', - 'ceilometer-pipe-test_pipeline:test_sink-9'}, + {'ceilometer_ipc-sample-4', 'ceilometer_ipc-sample-9', + 'ceilometer_ipc-event-4', 'ceilometer_ipc-event-9'}, set(topics)) @mock.patch('oslo_messaging.get_batch_notification_listener') diff --git a/releasenotes/notes/manager-based-ipc-queues-85e3bf59ffdfb0ac.yaml b/releasenotes/notes/manager-based-ipc-queues-85e3bf59ffdfb0ac.yaml new file mode 100644 index 0000000000..b4478ddebb --- /dev/null +++ b/releasenotes/notes/manager-based-ipc-queues-85e3bf59ffdfb0ac.yaml @@ -0,0 +1,17 @@ +--- +features: + - | + Workload partitioning of notification agent is now split into queues + based on pipeline type (sample, event, etc...) rather than per individual + pipeline. This will save some memory usage specifically for pipeline + definitions with many source/sink combinations. +upgrade: + - | + If workload partitioning of the notification agent is enabled, the + notification agent should not run alongside pre-Queens agents. Doing so + may result in missed samples when leveraging transformations. To upgrade + without loss of data, set `notification_control_exchanges` option to + empty so only existing `ceilometer-pipe-*` queues are processed. Once + cleared, reset `notification_control_exchanges` option and launch the new + notification agent(s). If `workload_partitioning` is not enabled, no + special steps are required.