Merge "allow configurable pipeline partitioning"

This commit is contained in:
Jenkins 2015-09-03 20:20:30 +00:00 committed by Gerrit Code Review
commit 429ebe3432
2 changed files with 30 additions and 18 deletions
ceilometer

@ -34,6 +34,12 @@ LOG = log.getLogger(__name__)
OPTS = [
cfg.IntOpt('pipeline_processing_queues',
default=10,
min=1,
help='Number of queues to parallelize workload across. This '
'value should be larger than the number of active '
'notification agents for optimal results.'),
cfg.BoolOpt('ack_on_event_error',
default=True,
deprecated_group='collector',
@ -96,12 +102,12 @@ class NotificationService(service_base.BaseService):
def _get_notifiers(self, transport, pipe):
notifiers = []
for agent in self.partition_coordinator._get_members(self.group_id):
for x in range(cfg.CONF.notification.pipeline_processing_queues):
notifiers.append(oslo_messaging.Notifier(
transport,
driver=cfg.CONF.publisher_notifier.telemetry_driver,
publisher_id='ceilometer.notification',
topic='%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, agent)))
topic='%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, x)))
return notifiers
def _get_pipe_manager(self, transport, pipeline_manager):
@ -238,20 +244,24 @@ class NotificationService(service_base.BaseService):
ev_pipes = self.event_pipeline_manager.pipelines
pipelines = self.pipeline_manager.pipelines + ev_pipes
transport = messaging.get_transport()
for pipe in pipelines:
LOG.debug('Pipeline endpoint: %s', pipe.name)
pipe_endpoint = (pipeline.EventPipelineEndpoint
if isinstance(pipe, pipeline.EventPipeline) else
pipeline.SamplePipelineEndpoint)
listener = messaging.get_notification_listener(
transport,
[oslo_messaging.Target(
topic='%s-%s-%s' % (self.NOTIFICATION_IPC,
pipe.name,
self.partition_coordinator._my_id))],
[pipe_endpoint(self.ctxt, pipe)])
listener.start()
self.pipeline_listeners.append(listener)
partitioned = self.partition_coordinator.extract_my_subset(
self.group_id,
range(cfg.CONF.notification.pipeline_processing_queues))
for pipe_set in partitioned:
for pipe in pipelines:
LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name,
pipe_set)
pipe_endpoint = (pipeline.EventPipelineEndpoint
if isinstance(pipe, pipeline.EventPipeline)
else pipeline.SamplePipelineEndpoint)
listener = messaging.get_notification_listener(
transport,
[oslo_messaging.Target(
topic='%s-%s-%s' % (self.NOTIFICATION_IPC,
pipe.name, pipe_set))],
[pipe_endpoint(self.ctxt, pipe)])
listener.start()
self.pipeline_listeners.append(listener)
def stop(self):
if self.partition_coordinator:

@ -390,9 +390,9 @@ class TestRealNotificationHA(BaseRealNotification):
def test_reset_listeners_on_refresh(self):
self.srv.start()
self.assertEqual(2, len(self.srv.pipeline_listeners))
self.assertEqual(20, len(self.srv.pipeline_listeners))
self.srv._refresh_listeners()
self.assertEqual(2, len(self.srv.pipeline_listeners))
self.assertEqual(20, len(self.srv.pipeline_listeners))
self.srv.stop()
@mock.patch('oslo_messaging.Notifier.sample')
@ -463,6 +463,8 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
group="notification")
self.CONF.set_override('workload_partitioning', True,
group='notification')
self.CONF.set_override('pipeline_processing_queues', 2,
group='notification')
self.publisher = test_publisher.TestPublisher("")
self.publisher2 = test_publisher.TestPublisher("")