diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 9baa5b40..d3fd1764 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -12,6 +12,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import itertools from oslo_config import cfg from oslo_context import context @@ -233,14 +234,9 @@ class NotificationService(service_base.BaseService): self.listeners.append(listener) def _refresh_agent(self, event): - self._refresh_listeners() + self._configure_pipeline_listeners(True) - def _refresh_listeners(self): - utils.kill_listeners(self.pipeline_listeners) - self._configure_pipeline_listeners() - - def _configure_pipeline_listeners(self): - self.pipeline_listeners = [] + def _configure_pipeline_listeners(self, reuse_listeners=False): ev_pipes = [] if cfg.CONF.notification.store_events: ev_pipes = self.event_pipeline_manager.pipelines @@ -249,21 +245,39 @@ class NotificationService(service_base.BaseService): partitioned = self.partition_coordinator.extract_my_subset( self.group_id, range(cfg.CONF.notification.pipeline_processing_queues)) - for pipe_set in partitioned: - for pipe in pipelines: - LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name, - pipe_set) - pipe_endpoint = (pipeline.EventPipelineEndpoint - if isinstance(pipe, pipeline.EventPipeline) - else pipeline.SamplePipelineEndpoint) - listener = messaging.get_notification_listener( - transport, - [oslo_messaging.Target( - topic='%s-%s-%s' % (self.NOTIFICATION_IPC, - pipe.name, pipe_set))], - [pipe_endpoint(self.ctxt, pipe)]) - listener.start() - self.pipeline_listeners.append(listener) + + queue_set = {} + for pipe_set, pipe in itertools.product(partitioned, pipelines): + queue_set['%s-%s-%s' % + (self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe + + if reuse_listeners: + topics = queue_set.keys() + kill_list = [] + for listener in self.pipeline_listeners: + if listener.dispatcher.targets[0].topic in topics: + queue_set.pop(listener.dispatcher.targets[0].topic) + else: + kill_list.append(listener) + for listener in kill_list: + utils.kill_listeners([listener]) + self.pipeline_listeners.remove(listener) + else: + utils.kill_listeners(self.pipeline_listeners) + self.pipeline_listeners = [] + + for topic, pipe in queue_set.items(): + LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name, + pipe_set) + pipe_endpoint = (pipeline.EventPipelineEndpoint + if isinstance(pipe, pipeline.EventPipeline) + else pipeline.SamplePipelineEndpoint) + listener = messaging.get_notification_listener( + transport, + [oslo_messaging.Target(topic=topic)], + [pipe_endpoint(self.ctxt, pipe)]) + listener.start() + self.pipeline_listeners.append(listener) def stop(self): if self.partition_coordinator: @@ -290,4 +304,4 @@ class NotificationService(service_base.BaseService): # re-start the pipeline listeners if workload partitioning # is enabled. if cfg.CONF.notification.workload_partitioning: - self._refresh_listeners() + self._configure_pipeline_listeners() diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py index 31b85ad7..c0ebe6b5 100644 --- a/ceilometer/tests/functional/test_notification.py +++ b/ceilometer/tests/functional/test_notification.py @@ -455,9 +455,29 @@ class TestRealNotificationHA(BaseRealNotification): def test_reset_listeners_on_refresh(self): self.srv.start() + listeners = self.srv.pipeline_listeners + self.assertEqual(20, len(listeners)) + self.srv._configure_pipeline_listeners() self.assertEqual(20, len(self.srv.pipeline_listeners)) - self.srv._refresh_listeners() - self.assertEqual(20, len(self.srv.pipeline_listeners)) + for listener in listeners: + self.assertNotIn(listeners, set(self.srv.pipeline_listeners)) + self.srv.stop() + + def test_retain_common_listeners_on_refresh(self): + with mock.patch('ceilometer.coordination.PartitionCoordinator' + '.extract_my_subset', return_value=[1, 2]): + self.srv.start() + self.assertEqual(4, len(self.srv.pipeline_listeners)) + listeners = [listener for listener in self.srv.pipeline_listeners] + with mock.patch('ceilometer.coordination.PartitionCoordinator' + '.extract_my_subset', return_value=[1, 3]): + self.srv._refresh_agent(None) + self.assertEqual(4, len(self.srv.pipeline_listeners)) + for listener in listeners: + if listener.dispatcher.targets[0].topic.endswith('1'): + self.assertIn(listener, set(self.srv.pipeline_listeners)) + else: + self.assertNotIn(listener, set(self.srv.pipeline_listeners)) self.srv.stop() @mock.patch('oslo_messaging.Notifier.sample')