diff --git a/ceilometer/notification.py b/ceilometer/notification.py index dced7a93..680dc994 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -34,6 +34,12 @@ LOG = log.getLogger(__name__) OPTS = [ + cfg.IntOpt('pipeline_processing_queues', + default=10, + min=1, + help='Number of queues to parallelize workload across. This ' + 'value should be larger than the number of active ' + 'notification agents for optimal results.'), cfg.BoolOpt('ack_on_event_error', default=True, deprecated_group='collector', @@ -96,12 +102,12 @@ class NotificationService(service_base.BaseService): def _get_notifiers(self, transport, pipe): notifiers = [] - for agent in self.partition_coordinator._get_members(self.group_id): + for x in range(cfg.CONF.notification.pipeline_processing_queues): notifiers.append(oslo_messaging.Notifier( transport, driver=cfg.CONF.publisher_notifier.telemetry_driver, publisher_id='ceilometer.notification', - topic='%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, agent))) + topic='%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, x))) return notifiers def _get_pipe_manager(self, transport, pipeline_manager): @@ -238,20 +244,24 @@ class NotificationService(service_base.BaseService): ev_pipes = self.event_pipeline_manager.pipelines pipelines = self.pipeline_manager.pipelines + ev_pipes transport = messaging.get_transport() - for pipe in pipelines: - LOG.debug('Pipeline endpoint: %s', pipe.name) - pipe_endpoint = (pipeline.EventPipelineEndpoint - if isinstance(pipe, pipeline.EventPipeline) else - pipeline.SamplePipelineEndpoint) - listener = messaging.get_notification_listener( - transport, - [oslo_messaging.Target( - topic='%s-%s-%s' % (self.NOTIFICATION_IPC, - pipe.name, - self.partition_coordinator._my_id))], - [pipe_endpoint(self.ctxt, pipe)]) - listener.start() - self.pipeline_listeners.append(listener) + partitioned = self.partition_coordinator.extract_my_subset( + self.group_id, + range(cfg.CONF.notification.pipeline_processing_queues)) + for pipe_set in partitioned: + for pipe in pipelines: + LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name, + pipe_set) + pipe_endpoint = (pipeline.EventPipelineEndpoint + if isinstance(pipe, pipeline.EventPipeline) + else pipeline.SamplePipelineEndpoint) + listener = messaging.get_notification_listener( + transport, + [oslo_messaging.Target( + topic='%s-%s-%s' % (self.NOTIFICATION_IPC, + pipe.name, pipe_set))], + [pipe_endpoint(self.ctxt, pipe)]) + listener.start() + self.pipeline_listeners.append(listener) def stop(self): if self.partition_coordinator: diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py index e053540c..5aa9c32b 100644 --- a/ceilometer/tests/functional/test_notification.py +++ b/ceilometer/tests/functional/test_notification.py @@ -390,9 +390,9 @@ class TestRealNotificationHA(BaseRealNotification): def test_reset_listeners_on_refresh(self): self.srv.start() - self.assertEqual(2, len(self.srv.pipeline_listeners)) + self.assertEqual(20, len(self.srv.pipeline_listeners)) self.srv._refresh_listeners() - self.assertEqual(2, len(self.srv.pipeline_listeners)) + self.assertEqual(20, len(self.srv.pipeline_listeners)) self.srv.stop() @mock.patch('oslo_messaging.Notifier.sample') @@ -463,6 +463,8 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase): group="notification") self.CONF.set_override('workload_partitioning', True, group='notification') + self.CONF.set_override('pipeline_processing_queues', 2, + group='notification') self.publisher = test_publisher.TestPublisher("") self.publisher2 = test_publisher.TestPublisher("")