diff --git a/ceilometer/notification.py b/ceilometer/notification.py index b4e1752218..bc70731524 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -109,7 +109,7 @@ class NotificationService(service_base.BaseService): notifiers.append(oslo_messaging.Notifier( transport, driver=cfg.CONF.publisher_notifier.telemetry_driver, - publisher_id='ceilometer.notification', + publisher_id=pipe.name, topic='%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, x))) return notifiers @@ -145,7 +145,12 @@ class NotificationService(service_base.BaseService): super(NotificationService, self).start() self.partition_coordinator = None self.coord_lock = threading.Lock() - self.listeners, self.pipeline_listeners = [], [] + + self.listeners = [] + + # NOTE(kbespalov): for the pipeline queues used a single amqp host + # hence only one listener is required + self.pipeline_listener = None self.pipeline_manager = pipeline.setup_pipeline() @@ -172,7 +177,6 @@ class NotificationService(service_base.BaseService): self.event_pipe_manager = self._get_event_pipeline_manager( self.transport) - self.listeners, self.pipeline_listeners = [], [] self._configure_main_queue_listeners(self.pipe_manager, self.event_pipe_manager) @@ -186,7 +190,7 @@ class NotificationService(service_base.BaseService): self.tg.add_timer(cfg.CONF.coordination.check_watchers, self.partition_coordinator.run_watchers) # configure pipelines after all coordination is configured. - self._configure_pipeline_listeners() + self._configure_pipeline_listener() if not cfg.CONF.notification.disable_non_metric_meters: LOG.warning(_LW('Non-metric meters may be collected. It is highly ' @@ -241,9 +245,9 @@ class NotificationService(service_base.BaseService): self.listeners.append(listener) def _refresh_agent(self, event): - self._configure_pipeline_listeners(True) + self._configure_pipeline_listener() - def _configure_pipeline_listeners(self, reuse_listeners=False): + def _configure_pipeline_listener(self): with self.coord_lock: ev_pipes = [] if cfg.CONF.notification.store_events: @@ -254,40 +258,33 @@ class NotificationService(service_base.BaseService): self.group_id, range(cfg.CONF.notification.pipeline_processing_queues)) - queue_set = {} + endpoints = [] + targets = [] + + for pipe in pipelines: + if isinstance(pipe, pipeline.EventPipeline): + endpoints.append(pipeline.EventPipelineEndpoint(pipe)) + else: + endpoints.append(pipeline.SamplePipelineEndpoint(pipe)) + for pipe_set, pipe in itertools.product(partitioned, pipelines): - queue_set['%s-%s-%s' % - (self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe + LOG.debug('Pipeline endpoint: %s from set: %s', + pipe.name, pipe_set) + topic = '%s-%s-%s' % (self.NOTIFICATION_IPC, + pipe.name, pipe_set) + targets.append(oslo_messaging.Target(topic=topic)) - if reuse_listeners: - topics = queue_set.keys() - kill_list = [] - for listener in self.pipeline_listeners: - if listener.dispatcher.targets[0].topic in topics: - queue_set.pop(listener.dispatcher.targets[0].topic) - else: - kill_list.append(listener) - for listener in kill_list: - utils.kill_listeners([listener]) - self.pipeline_listeners.remove(listener) - else: - utils.kill_listeners(self.pipeline_listeners) - self.pipeline_listeners = [] + if self.pipeline_listener: + self.pipeline_listener.stop() + self.pipeline_listener.wait() - for topic, pipe in queue_set.items(): - LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name, - pipe_set) - pipe_endpoint = (pipeline.EventPipelineEndpoint - if isinstance(pipe, pipeline.EventPipeline) - else pipeline.SamplePipelineEndpoint) - listener = messaging.get_batch_notification_listener( - transport, - [oslo_messaging.Target(topic=topic)], - [pipe_endpoint(pipe)], - batch_size=cfg.CONF.notification.batch_size, - batch_timeout=cfg.CONF.notification.batch_timeout) - listener.start() - self.pipeline_listeners.append(listener) + self.pipeline_listener = messaging.get_batch_notification_listener( + transport, + targets, + endpoints, + batch_size=cfg.CONF.notification.batch_size, + batch_timeout=cfg.CONF.notification.batch_timeout) + self.pipeline_listener.start() def stop(self): if getattr(self, 'partition_coordinator', None): @@ -295,8 +292,8 @@ class NotificationService(service_base.BaseService): listeners = [] if getattr(self, 'listeners', None): listeners.extend(self.listeners) - if getattr(self, 'pipeline_listeners', None): - listeners.extend(self.pipeline_listeners) + if getattr(self, 'pipeline_listener', None): + listeners.append(self.pipeline_listener) utils.kill_listeners(listeners) super(NotificationService, self).stop() @@ -319,4 +316,4 @@ class NotificationService(service_base.BaseService): # re-start the pipeline listeners if workload partitioning # is enabled. if cfg.CONF.notification.workload_partitioning: - self._configure_pipeline_listeners() + self._configure_pipeline_listener() diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index e4d3d0f2ac..13bc6c5aea 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -81,6 +81,8 @@ class PipelineException(Exception): class PipelineEndpoint(object): def __init__(self, pipeline): + self.filter_rule = oslo_messaging.NotificationFilter( + publisher_id=pipeline.name) self.publish_context = PublishContext([pipeline]) @abc.abstractmethod diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py index c080ff1425..0b2a715e25 100644 --- a/ceilometer/tests/functional/test_notification.py +++ b/ceilometer/tests/functional/test_notification.py @@ -256,7 +256,6 @@ class BaseRealNotification(tests_base.BaseTestCase): if (len(self.publisher.samples) >= self.expected_samples and len(self.publisher.events) >= self.expected_events): break - self.assertNotEqual(self.srv.listeners, self.srv.pipeline_listeners) self.srv.stop() resources = list(set(s.resource_id for s in self.publisher.samples)) @@ -386,31 +385,56 @@ class TestRealNotificationHA(BaseRealNotification): fake_publisher_cls.return_value = self.publisher self._check_notification_service() - def test_reset_listeners_on_refresh(self): + def test_reset_listener_on_refresh(self): self.srv.start() - listeners = self.srv.pipeline_listeners - self.assertEqual(20, len(listeners)) - self.srv._configure_pipeline_listeners() - self.assertEqual(20, len(self.srv.pipeline_listeners)) - for listener in listeners: - self.assertNotIn(listeners, set(self.srv.pipeline_listeners)) + listener = self.srv.pipeline_listener + self.assertEqual(20, + len(self.srv.pipeline_listener.dispatcher.targets)) + self.srv._configure_pipeline_listener() + self.assertEqual(20, + len(self.srv.pipeline_listener.dispatcher.targets)) + self.assertIsNot(listener, self.srv.pipeline_listener) self.srv.stop() - def test_retain_common_listeners_on_refresh(self): + def test_retain_common_targets_on_refresh(self): with mock.patch('ceilometer.coordination.PartitionCoordinator' '.extract_my_subset', return_value=[1, 2]): self.srv.start() - self.assertEqual(4, len(self.srv.pipeline_listeners)) - listeners = [listener for listener in self.srv.pipeline_listeners] + listened_before = [target.topic for target in + self.srv.pipeline_listener.dispatcher.targets] + self.assertEqual(4, len(listened_before)) with mock.patch('ceilometer.coordination.PartitionCoordinator' '.extract_my_subset', return_value=[1, 3]): self.srv._refresh_agent(None) - self.assertEqual(4, len(self.srv.pipeline_listeners)) - for listener in listeners: - if listener.dispatcher.targets[0].topic.endswith('1'): - self.assertIn(listener, set(self.srv.pipeline_listeners)) - else: - self.assertNotIn(listener, set(self.srv.pipeline_listeners)) + listened_after = [target.topic for target in + self.srv.pipeline_listener.dispatcher.targets] + self.assertEqual(4, len(listened_after)) + common = set(listened_before) & set(listened_after) + for topic in common: + self.assertTrue(topic.endswith('1')) + self.srv.stop() + + def test_notify_to_relevant_endpoint(self): + self.srv.start() + dispatcher = self.srv.pipeline_listener.dispatcher + self.assertIsNotEmpty(dispatcher.targets) + + endpoints = {} + + for endpoint in dispatcher.endpoints: + self.assertEqual(1, len(endpoint.publish_context.pipelines)) + pipe = list(endpoint.publish_context.pipelines)[0] + endpoints[pipe.name] = endpoint + + notifiers = [] + notifiers.extend(self.srv.pipe_manager.transporters[0][2]) + notifiers.extend(self.srv.event_pipe_manager.transporters[0][2]) + for notifier in notifiers: + filter_rule = endpoints[notifier.publisher_id].filter_rule + self.assertEqual(True, filter_rule.match(None, + notifier.publisher_id, + None, None, None)) + self.srv.stop() @mock.patch('oslo_messaging.Notifier.sample')