Fix notification listeners usage
Closes-Bug: #1570468 Create the single notification pipeline listener and to aggregate targets and endpoints in it. In this implementation the incoming notification msg will be dispatched in a relevant endpoint by its own filter rule based on publisher id of a notifier, which sent this message. Each pipeline notifier has a conforming publisher id based on a pipeline name in which it to produce messages. Change-Id: If8c1c6838fd291ef8dbdd4d27c78fe5a36522178
This commit is contained in:
parent
9bbb9f4589
commit
726b2d4d67
|
@ -109,7 +109,7 @@ class NotificationService(service_base.BaseService):
|
|||
notifiers.append(oslo_messaging.Notifier(
|
||||
transport,
|
||||
driver=cfg.CONF.publisher_notifier.telemetry_driver,
|
||||
publisher_id='ceilometer.notification',
|
||||
publisher_id=pipe.name,
|
||||
topic='%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, x)))
|
||||
return notifiers
|
||||
|
||||
|
@ -145,7 +145,12 @@ class NotificationService(service_base.BaseService):
|
|||
super(NotificationService, self).start()
|
||||
self.partition_coordinator = None
|
||||
self.coord_lock = threading.Lock()
|
||||
self.listeners, self.pipeline_listeners = [], []
|
||||
|
||||
self.listeners = []
|
||||
|
||||
# NOTE(kbespalov): for the pipeline queues used a single amqp host
|
||||
# hence only one listener is required
|
||||
self.pipeline_listener = None
|
||||
|
||||
self.pipeline_manager = pipeline.setup_pipeline()
|
||||
|
||||
|
@ -172,7 +177,6 @@ class NotificationService(service_base.BaseService):
|
|||
self.event_pipe_manager = self._get_event_pipeline_manager(
|
||||
self.transport)
|
||||
|
||||
self.listeners, self.pipeline_listeners = [], []
|
||||
self._configure_main_queue_listeners(self.pipe_manager,
|
||||
self.event_pipe_manager)
|
||||
|
||||
|
@ -186,7 +190,7 @@ class NotificationService(service_base.BaseService):
|
|||
self.tg.add_timer(cfg.CONF.coordination.check_watchers,
|
||||
self.partition_coordinator.run_watchers)
|
||||
# configure pipelines after all coordination is configured.
|
||||
self._configure_pipeline_listeners()
|
||||
self._configure_pipeline_listener()
|
||||
|
||||
if not cfg.CONF.notification.disable_non_metric_meters:
|
||||
LOG.warning(_LW('Non-metric meters may be collected. It is highly '
|
||||
|
@ -241,9 +245,9 @@ class NotificationService(service_base.BaseService):
|
|||
self.listeners.append(listener)
|
||||
|
||||
def _refresh_agent(self, event):
|
||||
self._configure_pipeline_listeners(True)
|
||||
self._configure_pipeline_listener()
|
||||
|
||||
def _configure_pipeline_listeners(self, reuse_listeners=False):
|
||||
def _configure_pipeline_listener(self):
|
||||
with self.coord_lock:
|
||||
ev_pipes = []
|
||||
if cfg.CONF.notification.store_events:
|
||||
|
@ -254,40 +258,33 @@ class NotificationService(service_base.BaseService):
|
|||
self.group_id,
|
||||
range(cfg.CONF.notification.pipeline_processing_queues))
|
||||
|
||||
queue_set = {}
|
||||
endpoints = []
|
||||
targets = []
|
||||
|
||||
for pipe in pipelines:
|
||||
if isinstance(pipe, pipeline.EventPipeline):
|
||||
endpoints.append(pipeline.EventPipelineEndpoint(pipe))
|
||||
else:
|
||||
endpoints.append(pipeline.SamplePipelineEndpoint(pipe))
|
||||
|
||||
for pipe_set, pipe in itertools.product(partitioned, pipelines):
|
||||
queue_set['%s-%s-%s' %
|
||||
(self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe
|
||||
LOG.debug('Pipeline endpoint: %s from set: %s',
|
||||
pipe.name, pipe_set)
|
||||
topic = '%s-%s-%s' % (self.NOTIFICATION_IPC,
|
||||
pipe.name, pipe_set)
|
||||
targets.append(oslo_messaging.Target(topic=topic))
|
||||
|
||||
if reuse_listeners:
|
||||
topics = queue_set.keys()
|
||||
kill_list = []
|
||||
for listener in self.pipeline_listeners:
|
||||
if listener.dispatcher.targets[0].topic in topics:
|
||||
queue_set.pop(listener.dispatcher.targets[0].topic)
|
||||
else:
|
||||
kill_list.append(listener)
|
||||
for listener in kill_list:
|
||||
utils.kill_listeners([listener])
|
||||
self.pipeline_listeners.remove(listener)
|
||||
else:
|
||||
utils.kill_listeners(self.pipeline_listeners)
|
||||
self.pipeline_listeners = []
|
||||
if self.pipeline_listener:
|
||||
self.pipeline_listener.stop()
|
||||
self.pipeline_listener.wait()
|
||||
|
||||
for topic, pipe in queue_set.items():
|
||||
LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name,
|
||||
pipe_set)
|
||||
pipe_endpoint = (pipeline.EventPipelineEndpoint
|
||||
if isinstance(pipe, pipeline.EventPipeline)
|
||||
else pipeline.SamplePipelineEndpoint)
|
||||
listener = messaging.get_batch_notification_listener(
|
||||
transport,
|
||||
[oslo_messaging.Target(topic=topic)],
|
||||
[pipe_endpoint(pipe)],
|
||||
batch_size=cfg.CONF.notification.batch_size,
|
||||
batch_timeout=cfg.CONF.notification.batch_timeout)
|
||||
listener.start()
|
||||
self.pipeline_listeners.append(listener)
|
||||
self.pipeline_listener = messaging.get_batch_notification_listener(
|
||||
transport,
|
||||
targets,
|
||||
endpoints,
|
||||
batch_size=cfg.CONF.notification.batch_size,
|
||||
batch_timeout=cfg.CONF.notification.batch_timeout)
|
||||
self.pipeline_listener.start()
|
||||
|
||||
def stop(self):
|
||||
if getattr(self, 'partition_coordinator', None):
|
||||
|
@ -295,8 +292,8 @@ class NotificationService(service_base.BaseService):
|
|||
listeners = []
|
||||
if getattr(self, 'listeners', None):
|
||||
listeners.extend(self.listeners)
|
||||
if getattr(self, 'pipeline_listeners', None):
|
||||
listeners.extend(self.pipeline_listeners)
|
||||
if getattr(self, 'pipeline_listener', None):
|
||||
listeners.append(self.pipeline_listener)
|
||||
utils.kill_listeners(listeners)
|
||||
super(NotificationService, self).stop()
|
||||
|
||||
|
@ -319,4 +316,4 @@ class NotificationService(service_base.BaseService):
|
|||
# re-start the pipeline listeners if workload partitioning
|
||||
# is enabled.
|
||||
if cfg.CONF.notification.workload_partitioning:
|
||||
self._configure_pipeline_listeners()
|
||||
self._configure_pipeline_listener()
|
||||
|
|
|
@ -81,6 +81,8 @@ class PipelineException(Exception):
|
|||
class PipelineEndpoint(object):
|
||||
|
||||
def __init__(self, pipeline):
|
||||
self.filter_rule = oslo_messaging.NotificationFilter(
|
||||
publisher_id=pipeline.name)
|
||||
self.publish_context = PublishContext([pipeline])
|
||||
|
||||
@abc.abstractmethod
|
||||
|
|
|
@ -256,7 +256,6 @@ class BaseRealNotification(tests_base.BaseTestCase):
|
|||
if (len(self.publisher.samples) >= self.expected_samples and
|
||||
len(self.publisher.events) >= self.expected_events):
|
||||
break
|
||||
self.assertNotEqual(self.srv.listeners, self.srv.pipeline_listeners)
|
||||
self.srv.stop()
|
||||
|
||||
resources = list(set(s.resource_id for s in self.publisher.samples))
|
||||
|
@ -386,31 +385,56 @@ class TestRealNotificationHA(BaseRealNotification):
|
|||
fake_publisher_cls.return_value = self.publisher
|
||||
self._check_notification_service()
|
||||
|
||||
def test_reset_listeners_on_refresh(self):
|
||||
def test_reset_listener_on_refresh(self):
|
||||
self.srv.start()
|
||||
listeners = self.srv.pipeline_listeners
|
||||
self.assertEqual(20, len(listeners))
|
||||
self.srv._configure_pipeline_listeners()
|
||||
self.assertEqual(20, len(self.srv.pipeline_listeners))
|
||||
for listener in listeners:
|
||||
self.assertNotIn(listeners, set(self.srv.pipeline_listeners))
|
||||
listener = self.srv.pipeline_listener
|
||||
self.assertEqual(20,
|
||||
len(self.srv.pipeline_listener.dispatcher.targets))
|
||||
self.srv._configure_pipeline_listener()
|
||||
self.assertEqual(20,
|
||||
len(self.srv.pipeline_listener.dispatcher.targets))
|
||||
self.assertIsNot(listener, self.srv.pipeline_listener)
|
||||
self.srv.stop()
|
||||
|
||||
def test_retain_common_listeners_on_refresh(self):
|
||||
def test_retain_common_targets_on_refresh(self):
|
||||
with mock.patch('ceilometer.coordination.PartitionCoordinator'
|
||||
'.extract_my_subset', return_value=[1, 2]):
|
||||
self.srv.start()
|
||||
self.assertEqual(4, len(self.srv.pipeline_listeners))
|
||||
listeners = [listener for listener in self.srv.pipeline_listeners]
|
||||
listened_before = [target.topic for target in
|
||||
self.srv.pipeline_listener.dispatcher.targets]
|
||||
self.assertEqual(4, len(listened_before))
|
||||
with mock.patch('ceilometer.coordination.PartitionCoordinator'
|
||||
'.extract_my_subset', return_value=[1, 3]):
|
||||
self.srv._refresh_agent(None)
|
||||
self.assertEqual(4, len(self.srv.pipeline_listeners))
|
||||
for listener in listeners:
|
||||
if listener.dispatcher.targets[0].topic.endswith('1'):
|
||||
self.assertIn(listener, set(self.srv.pipeline_listeners))
|
||||
else:
|
||||
self.assertNotIn(listener, set(self.srv.pipeline_listeners))
|
||||
listened_after = [target.topic for target in
|
||||
self.srv.pipeline_listener.dispatcher.targets]
|
||||
self.assertEqual(4, len(listened_after))
|
||||
common = set(listened_before) & set(listened_after)
|
||||
for topic in common:
|
||||
self.assertTrue(topic.endswith('1'))
|
||||
self.srv.stop()
|
||||
|
||||
def test_notify_to_relevant_endpoint(self):
|
||||
self.srv.start()
|
||||
dispatcher = self.srv.pipeline_listener.dispatcher
|
||||
self.assertIsNotEmpty(dispatcher.targets)
|
||||
|
||||
endpoints = {}
|
||||
|
||||
for endpoint in dispatcher.endpoints:
|
||||
self.assertEqual(1, len(endpoint.publish_context.pipelines))
|
||||
pipe = list(endpoint.publish_context.pipelines)[0]
|
||||
endpoints[pipe.name] = endpoint
|
||||
|
||||
notifiers = []
|
||||
notifiers.extend(self.srv.pipe_manager.transporters[0][2])
|
||||
notifiers.extend(self.srv.event_pipe_manager.transporters[0][2])
|
||||
for notifier in notifiers:
|
||||
filter_rule = endpoints[notifier.publisher_id].filter_rule
|
||||
self.assertEqual(True, filter_rule.match(None,
|
||||
notifier.publisher_id,
|
||||
None, None, None))
|
||||
|
||||
self.srv.stop()
|
||||
|
||||
@mock.patch('oslo_messaging.Notifier.sample')
|
||||
|
|
Loading…
Reference in New Issue