From 338e7a655d5d422dc9db878d2f0952549e54e711 Mon Sep 17 00:00:00 2001 From: gordon chung Date: Fri, 18 Sep 2015 14:39:25 -0400 Subject: [PATCH] retain existing listeners on refresh there is an overhead to killing and recreating listeners. this patch keeps the pipeline listeners we already have after rebalancing rather than blindly killing and recreating all of them. Change-Id: Ic7c23fd7649ca0b828cc4266582163bd326c2b80 Closes-Bug: #1496459 --- ceilometer/notification.py | 60 ++++++++++++------- .../tests/functional/test_notification.py | 24 +++++++- 2 files changed, 59 insertions(+), 25 deletions(-) diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 9baa5b40..d3fd1764 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -12,6 +12,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import itertools from oslo_config import cfg from oslo_context import context @@ -233,14 +234,9 @@ class NotificationService(service_base.BaseService): self.listeners.append(listener) def _refresh_agent(self, event): - self._refresh_listeners() + self._configure_pipeline_listeners(True) - def _refresh_listeners(self): - utils.kill_listeners(self.pipeline_listeners) - self._configure_pipeline_listeners() - - def _configure_pipeline_listeners(self): - self.pipeline_listeners = [] + def _configure_pipeline_listeners(self, reuse_listeners=False): ev_pipes = [] if cfg.CONF.notification.store_events: ev_pipes = self.event_pipeline_manager.pipelines @@ -249,21 +245,39 @@ class NotificationService(service_base.BaseService): partitioned = self.partition_coordinator.extract_my_subset( self.group_id, range(cfg.CONF.notification.pipeline_processing_queues)) - for pipe_set in partitioned: - for pipe in pipelines: - LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name, - pipe_set) - pipe_endpoint = (pipeline.EventPipelineEndpoint - if isinstance(pipe, pipeline.EventPipeline) - else pipeline.SamplePipelineEndpoint) - listener = messaging.get_notification_listener( - transport, - [oslo_messaging.Target( - topic='%s-%s-%s' % (self.NOTIFICATION_IPC, - pipe.name, pipe_set))], - [pipe_endpoint(self.ctxt, pipe)]) - listener.start() - self.pipeline_listeners.append(listener) + + queue_set = {} + for pipe_set, pipe in itertools.product(partitioned, pipelines): + queue_set['%s-%s-%s' % + (self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe + + if reuse_listeners: + topics = queue_set.keys() + kill_list = [] + for listener in self.pipeline_listeners: + if listener.dispatcher.targets[0].topic in topics: + queue_set.pop(listener.dispatcher.targets[0].topic) + else: + kill_list.append(listener) + for listener in kill_list: + utils.kill_listeners([listener]) + self.pipeline_listeners.remove(listener) + else: + utils.kill_listeners(self.pipeline_listeners) + self.pipeline_listeners = [] + + for topic, pipe in queue_set.items(): + LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name, + pipe_set) + pipe_endpoint = (pipeline.EventPipelineEndpoint + if isinstance(pipe, pipeline.EventPipeline) + else pipeline.SamplePipelineEndpoint) + listener = messaging.get_notification_listener( + transport, + [oslo_messaging.Target(topic=topic)], + [pipe_endpoint(self.ctxt, pipe)]) + listener.start() + self.pipeline_listeners.append(listener) def stop(self): if self.partition_coordinator: @@ -290,4 +304,4 @@ class NotificationService(service_base.BaseService): # re-start the pipeline listeners if workload partitioning # is enabled. if cfg.CONF.notification.workload_partitioning: - self._refresh_listeners() + self._configure_pipeline_listeners() diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py index 31b85ad7..c0ebe6b5 100644 --- a/ceilometer/tests/functional/test_notification.py +++ b/ceilometer/tests/functional/test_notification.py @@ -455,9 +455,29 @@ class TestRealNotificationHA(BaseRealNotification): def test_reset_listeners_on_refresh(self): self.srv.start() + listeners = self.srv.pipeline_listeners + self.assertEqual(20, len(listeners)) + self.srv._configure_pipeline_listeners() self.assertEqual(20, len(self.srv.pipeline_listeners)) - self.srv._refresh_listeners() - self.assertEqual(20, len(self.srv.pipeline_listeners)) + for listener in listeners: + self.assertNotIn(listeners, set(self.srv.pipeline_listeners)) + self.srv.stop() + + def test_retain_common_listeners_on_refresh(self): + with mock.patch('ceilometer.coordination.PartitionCoordinator' + '.extract_my_subset', return_value=[1, 2]): + self.srv.start() + self.assertEqual(4, len(self.srv.pipeline_listeners)) + listeners = [listener for listener in self.srv.pipeline_listeners] + with mock.patch('ceilometer.coordination.PartitionCoordinator' + '.extract_my_subset', return_value=[1, 3]): + self.srv._refresh_agent(None) + self.assertEqual(4, len(self.srv.pipeline_listeners)) + for listener in listeners: + if listener.dispatcher.targets[0].topic.endswith('1'): + self.assertIn(listener, set(self.srv.pipeline_listeners)) + else: + self.assertNotIn(listener, set(self.srv.pipeline_listeners)) self.srv.stop() @mock.patch('oslo_messaging.Notifier.sample')