Merge "queues per manager rather than per pipeline"

This commit is contained in:
Zuul 2017-11-21 03:57:10 +00:00 committed by Gerrit Code Review
commit 6d27648c29
4 changed files with 37 additions and 29 deletions

@ -105,7 +105,7 @@ class NotificationService(cotyledon.Service):
proper active/active HA.
"""
NOTIFICATION_NAMESPACE = 'ceilometer.notification'
NOTIFICATION_NAMESPACE = 'ceilometer.notification.v2'
def __init__(self, worker_id, conf, coordination_id=None):
super(NotificationService, self).__init__(worker_id)
@ -229,13 +229,9 @@ class NotificationService(cotyledon.Service):
endpoints.extend(pipe_mgr.get_interim_endpoints())
targets = []
for mgr in self.managers:
for pipe_set, pipe in itertools.product(partitioned,
mgr.pipelines):
LOG.debug('Pipeline endpoint: %s from set: %s',
pipe.name, pipe_set)
topic = '%s-%s-%s' % (pipe.NOTIFICATION_IPC,
pipe.name, pipe_set)
for mgr, hash_id in itertools.product(self.managers, partitioned):
topic = '-'.join([mgr.NOTIFICATION_IPC, mgr.pm_type, str(hash_id)])
LOG.debug('Listening to queue: %s', topic)
targets.append(oslo_messaging.Target(topic=topic))
if self.pipeline_listener:

@ -48,21 +48,19 @@ class PipelineException(agent.ConfigException):
class InterimPublishContext(object):
"""Publisher to hash/shard data to pipelines"""
def __init__(self, conf, pipelines):
def __init__(self, conf, mgr):
self.conf = conf
self.pipe_notifiers = []
transport = messaging.get_transport(conf)
for pipe in pipelines:
self.pipe_notifiers.append(
(pipe, self._get_notifiers(transport, pipe)))
self.mgr = mgr
self.notifiers = self._get_notifiers(messaging.get_transport(conf))
def _get_notifiers(self, transport, pipe):
def _get_notifiers(self, transport):
notifiers = []
for x in range(self.conf.notification.pipeline_processing_queues):
notifiers.append(oslo_messaging.Notifier(
transport,
driver=self.conf.publisher_notifier.telemetry_driver,
topics=['%s-%s-%s' % (pipe.NOTIFICATION_IPC, pipe.name, x)]))
topics=['-'.join(
[self.mgr.NOTIFICATION_IPC, self.mgr.pm_type, str(x)])]))
return notifiers
@staticmethod
@ -79,14 +77,13 @@ class InterimPublishContext(object):
def p(data):
data = [data] if not isinstance(data, list) else data
for datapoint in data:
for pipe, notifiers in self.pipe_notifiers:
for pipe in self.mgr.pipelines:
if pipe.supported(datapoint):
serialized_data = pipe.serializer(datapoint)
key = (self.hash_grouping(serialized_data,
pipe.get_grouping_key())
% len(notifiers))
notifier = notifiers[key]
notifier.sample({}, event_type=pipe.name,
% len(self.notifiers))
self.notifiers[key].sample({}, event_type=pipe.name,
payload=[serialized_data])
return p
@ -221,8 +218,6 @@ class Sink(object):
class Pipeline(object):
"""Represents a coupling between a sink and a corresponding source."""
NOTIFICATION_IPC = 'ceilometer-pipe'
def __init__(self, conf, source, sink):
self.conf = conf
self.source = source
@ -284,6 +279,8 @@ class PipelineManager(agent.ConfigManagerBase):
Pipeline manager sets up pipelines according to config file
"""
NOTIFICATION_IPC = 'ceilometer_ipc'
def __init__(self, conf, cfg_file, transformer_manager, partition):
"""Setup the pipelines according to config.
@ -408,7 +405,7 @@ class PipelineManager(agent.ConfigManagerBase):
def interim_publisher(self):
"""Build publishing context for IPC."""
return InterimPublishContext(self.conf, self.pipelines)
return InterimPublishContext(self.conf, self)
def get_main_publisher(self):
"""Return the publishing context to use"""

@ -300,10 +300,8 @@ class TestRealNotificationHA(BaseRealNotification):
topics = [target.topic for target in mock_listener.call_args[0][1]]
self.assertEqual(4, len(topics))
self.assertEqual(
{'ceilometer-pipe-test_pipeline:test_sink-4',
'ceilometer-pipe-event:test_event:test_sink-4',
'ceilometer-pipe-event:test_event:test_sink-9',
'ceilometer-pipe-test_pipeline:test_sink-9'},
{'ceilometer_ipc-sample-4', 'ceilometer_ipc-sample-9',
'ceilometer_ipc-event-4', 'ceilometer_ipc-event-9'},
set(topics))
@mock.patch('oslo_messaging.get_batch_notification_listener')

@ -0,0 +1,17 @@
---
features:
- |
Workload partitioning of notification agent is now split into queues
based on pipeline type (sample, event, etc...) rather than per individual
pipeline. This will save some memory usage specifically for pipeline
definitions with many source/sink combinations.
upgrade:
- |
If workload partitioning of the notification agent is enabled, the
notification agent should not run alongside pre-Queens agents. Doing so
may result in missed samples when leveraging transformations. To upgrade
without loss of data, set `notification_control_exchanges` option to
empty so only existing `ceilometer-pipe-*` queues are processed. Once
cleared, reset `notification_control_exchanges` option and launch the new
notification agent(s). If `workload_partitioning` is not enabled, no
special steps are required.