queues per manager rather than per pipeline

currently we create queue per pipeline which is not necessary. it
creates more memory usage and doesn't necessarily distribute
work more effectively. this hashes data to queues based on
manager but still internally, the data is destined to specific
pipeline based on event_type. this will minimise queue usage while
keeping internal code path the same.

Change-Id: I0ccd51f13457f208fe2ccedb6e680c91e132f78f
This commit is contained in:
gord chung 2017-11-03 22:18:33 +00:00
parent 60d9b87a80
commit 911b973d70
4 changed files with 37 additions and 29 deletions

@ -105,7 +105,7 @@ class NotificationService(cotyledon.Service):
proper active/active HA.
"""
NOTIFICATION_NAMESPACE = 'ceilometer.notification'
NOTIFICATION_NAMESPACE = 'ceilometer.notification.v2'
def __init__(self, worker_id, conf, coordination_id=None):
super(NotificationService, self).__init__(worker_id)
@ -229,14 +229,10 @@ class NotificationService(cotyledon.Service):
endpoints.extend(pipe_mgr.get_interim_endpoints())
targets = []
for mgr in self.managers:
for pipe_set, pipe in itertools.product(partitioned,
mgr.pipelines):
LOG.debug('Pipeline endpoint: %s from set: %s',
pipe.name, pipe_set)
topic = '%s-%s-%s' % (pipe.NOTIFICATION_IPC,
pipe.name, pipe_set)
targets.append(oslo_messaging.Target(topic=topic))
for mgr, hash_id in itertools.product(self.managers, partitioned):
topic = '-'.join([mgr.NOTIFICATION_IPC, mgr.pm_type, str(hash_id)])
LOG.debug('Listening to queue: %s', topic)
targets.append(oslo_messaging.Target(topic=topic))
if self.pipeline_listener:
self.pipeline_listener.stop()

@ -48,21 +48,19 @@ class PipelineException(agent.ConfigException):
class InterimPublishContext(object):
"""Publisher to hash/shard data to pipelines"""
def __init__(self, conf, pipelines):
def __init__(self, conf, mgr):
self.conf = conf
self.pipe_notifiers = []
transport = messaging.get_transport(conf)
for pipe in pipelines:
self.pipe_notifiers.append(
(pipe, self._get_notifiers(transport, pipe)))
self.mgr = mgr
self.notifiers = self._get_notifiers(messaging.get_transport(conf))
def _get_notifiers(self, transport, pipe):
def _get_notifiers(self, transport):
notifiers = []
for x in range(self.conf.notification.pipeline_processing_queues):
notifiers.append(oslo_messaging.Notifier(
transport,
driver=self.conf.publisher_notifier.telemetry_driver,
topics=['%s-%s-%s' % (pipe.NOTIFICATION_IPC, pipe.name, x)]))
topics=['-'.join(
[self.mgr.NOTIFICATION_IPC, self.mgr.pm_type, str(x)])]))
return notifiers
@staticmethod
@ -79,15 +77,14 @@ class InterimPublishContext(object):
def p(data):
data = [data] if not isinstance(data, list) else data
for datapoint in data:
for pipe, notifiers in self.pipe_notifiers:
for pipe in self.mgr.pipelines:
if pipe.supported(datapoint):
serialized_data = pipe.serializer(datapoint)
key = (self.hash_grouping(serialized_data,
pipe.get_grouping_key())
% len(notifiers))
notifier = notifiers[key]
notifier.sample({}, event_type=pipe.name,
payload=[serialized_data])
% len(self.notifiers))
self.notifiers[key].sample({}, event_type=pipe.name,
payload=[serialized_data])
return p
def __exit__(self, exc_type, exc_value, traceback):
@ -221,8 +218,6 @@ class Sink(object):
class Pipeline(object):
"""Represents a coupling between a sink and a corresponding source."""
NOTIFICATION_IPC = 'ceilometer-pipe'
def __init__(self, conf, source, sink):
self.conf = conf
self.source = source
@ -284,6 +279,8 @@ class PipelineManager(agent.ConfigManagerBase):
Pipeline manager sets up pipelines according to config file
"""
NOTIFICATION_IPC = 'ceilometer_ipc'
def __init__(self, conf, cfg_file, transformer_manager, partition):
"""Setup the pipelines according to config.
@ -408,7 +405,7 @@ class PipelineManager(agent.ConfigManagerBase):
def interim_publisher(self):
"""Build publishing context for IPC."""
return InterimPublishContext(self.conf, self.pipelines)
return InterimPublishContext(self.conf, self)
def get_main_publisher(self):
"""Return the publishing context to use"""

@ -300,10 +300,8 @@ class TestRealNotificationHA(BaseRealNotification):
topics = [target.topic for target in mock_listener.call_args[0][1]]
self.assertEqual(4, len(topics))
self.assertEqual(
{'ceilometer-pipe-test_pipeline:test_sink-4',
'ceilometer-pipe-event:test_event:test_sink-4',
'ceilometer-pipe-event:test_event:test_sink-9',
'ceilometer-pipe-test_pipeline:test_sink-9'},
{'ceilometer_ipc-sample-4', 'ceilometer_ipc-sample-9',
'ceilometer_ipc-event-4', 'ceilometer_ipc-event-9'},
set(topics))
@mock.patch('oslo_messaging.get_batch_notification_listener')

@ -0,0 +1,17 @@
---
features:
- |
Workload partitioning of notification agent is now split into queues
based on pipeline type (sample, event, etc...) rather than per individual
pipeline. This will save some memory usage specifically for pipeline
definitions with many source/sink combinations.
upgrade:
- |
If workload partitioning of the notification agent is enabled, the
notification agent should not run alongside pre-Queens agents. Doing so
may result in missed samples when leveraging transformations. To upgrade
without loss of data, set `notification_control_exchanges` option to
empty so only existing `ceilometer-pipe-*` queues are processed. Once
cleared, reset `notification_control_exchanges` option and launch the new
notification agent(s). If `workload_partitioning` is not enabled, no
special steps are required.