From 2e4b807131b4f21dbc4bcf34883592b738351168 Mon Sep 17 00:00:00 2001 From: gordon chung Date: Wed, 19 Aug 2015 17:33:41 -0400 Subject: [PATCH] allow configurable pipeline partitioning existing code will create queues from the cartesian product of the set of pipelines and the set of agents. these queues are identified by pipe_type-pipe_name-agent_id. the problem is that if an agent dies, any messages left on its queue will be lost, as agent_ids tend to be unique. this patch changes the logic so rather than using the number of agents as the set we build our cartesian product from, we use a set of numbers of a configurable size. by default, it will be a set of 10 so that queues are created from the product set of the set of pipelines and the set {0, 1, 2, 3, ... 9}. similarly, the pipeline queues will be identified by pipe_type-pipe_name-x, where x is a value from the numerical set. this way, if an agent dies, the queues are still known and can be redistributed and processed among the active agents. Change-Id: I35e3afdd2c9f97ab5bc6bb89531bd20308c5f8c0 Closes-Bug: #1486754 --- ceilometer/notification.py | 42 ++++++++++++------- .../tests/functional/test_notification.py | 6 ++- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/ceilometer/notification.py b/ceilometer/notification.py index dced7a93..680dc994 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -34,6 +34,12 @@ LOG = log.getLogger(__name__) OPTS = [ + cfg.IntOpt('pipeline_processing_queues', + default=10, + min=1, + help='Number of queues to parallelize workload across. 
This ' + 'value should be larger than the number of active ' + 'notification agents for optimal results.'), cfg.BoolOpt('ack_on_event_error', default=True, deprecated_group='collector', @@ -96,12 +102,12 @@ class NotificationService(service_base.BaseService): def _get_notifiers(self, transport, pipe): notifiers = [] - for agent in self.partition_coordinator._get_members(self.group_id): + for x in range(cfg.CONF.notification.pipeline_processing_queues): notifiers.append(oslo_messaging.Notifier( transport, driver=cfg.CONF.publisher_notifier.telemetry_driver, publisher_id='ceilometer.notification', - topic='%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, agent))) + topic='%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, x))) return notifiers def _get_pipe_manager(self, transport, pipeline_manager): @@ -238,20 +244,24 @@ class NotificationService(service_base.BaseService): ev_pipes = self.event_pipeline_manager.pipelines pipelines = self.pipeline_manager.pipelines + ev_pipes transport = messaging.get_transport() - for pipe in pipelines: - LOG.debug('Pipeline endpoint: %s', pipe.name) - pipe_endpoint = (pipeline.EventPipelineEndpoint - if isinstance(pipe, pipeline.EventPipeline) else - pipeline.SamplePipelineEndpoint) - listener = messaging.get_notification_listener( - transport, - [oslo_messaging.Target( - topic='%s-%s-%s' % (self.NOTIFICATION_IPC, - pipe.name, - self.partition_coordinator._my_id))], - [pipe_endpoint(self.ctxt, pipe)]) - listener.start() - self.pipeline_listeners.append(listener) + partitioned = self.partition_coordinator.extract_my_subset( + self.group_id, + range(cfg.CONF.notification.pipeline_processing_queues)) + for pipe_set in partitioned: + for pipe in pipelines: + LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name, + pipe_set) + pipe_endpoint = (pipeline.EventPipelineEndpoint + if isinstance(pipe, pipeline.EventPipeline) + else pipeline.SamplePipelineEndpoint) + listener = messaging.get_notification_listener( + transport, + 
[oslo_messaging.Target( + topic='%s-%s-%s' % (self.NOTIFICATION_IPC, + pipe.name, pipe_set))], + [pipe_endpoint(self.ctxt, pipe)]) + listener.start() + self.pipeline_listeners.append(listener) def stop(self): if self.partition_coordinator: diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py index e053540c..5aa9c32b 100644 --- a/ceilometer/tests/functional/test_notification.py +++ b/ceilometer/tests/functional/test_notification.py @@ -390,9 +390,9 @@ class TestRealNotificationHA(BaseRealNotification): def test_reset_listeners_on_refresh(self): self.srv.start() - self.assertEqual(2, len(self.srv.pipeline_listeners)) + self.assertEqual(20, len(self.srv.pipeline_listeners)) self.srv._refresh_listeners() - self.assertEqual(2, len(self.srv.pipeline_listeners)) + self.assertEqual(20, len(self.srv.pipeline_listeners)) self.srv.stop() @mock.patch('oslo_messaging.Notifier.sample') @@ -463,6 +463,8 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase): group="notification") self.CONF.set_override('workload_partitioning', True, group='notification') + self.CONF.set_override('pipeline_processing_queues', 2, + group='notification') self.publisher = test_publisher.TestPublisher("") self.publisher2 = test_publisher.TestPublisher("")