allow configurable pipeline partitioning

Existing code creates queues from the Cartesian product of the
set of pipelines and the set of agents. These queues are identified
by pipe_type-pipe_name-agent_id. The problem is that if an agent
dies, any messages left on its queue will be lost, as agent_ids tend
to be unique.

This patch changes the logic so that, rather than using the set of agents
to build the Cartesian product, we use a set of numbers
of a configurable size. By default, it will be a set of 10, so that queues
are created from the product of the set of pipelines and the set
{0, 1, 2, 3, ... 9}. Similarly, the pipeline queues will be identified
by pipe_type-pipe_name-x, where x is a value from the numerical set. This
way, if an agent dies, the queues are still known and can be
redistributed and processed among the active agents.

Change-Id: I35e3afdd2c9f97ab5bc6bb89531bd20308c5f8c0
Closes-Bug: #1486754
This commit is contained in:
gordon chung 2015-08-19 17:33:41 -04:00
parent fcf5b4e7e2
commit 2e4b807131
2 changed files with 30 additions and 18 deletions

View File

@ -34,6 +34,12 @@ LOG = log.getLogger(__name__)
OPTS = [ OPTS = [
cfg.IntOpt('pipeline_processing_queues',
default=10,
min=1,
help='Number of queues to parallelize workload across. This '
'value should be larger than the number of active '
'notification agents for optimal results.'),
cfg.BoolOpt('ack_on_event_error', cfg.BoolOpt('ack_on_event_error',
default=True, default=True,
deprecated_group='collector', deprecated_group='collector',
@ -96,12 +102,12 @@ class NotificationService(service_base.BaseService):
def _get_notifiers(self, transport, pipe): def _get_notifiers(self, transport, pipe):
notifiers = [] notifiers = []
for agent in self.partition_coordinator._get_members(self.group_id): for x in range(cfg.CONF.notification.pipeline_processing_queues):
notifiers.append(oslo_messaging.Notifier( notifiers.append(oslo_messaging.Notifier(
transport, transport,
driver=cfg.CONF.publisher_notifier.telemetry_driver, driver=cfg.CONF.publisher_notifier.telemetry_driver,
publisher_id='ceilometer.notification', publisher_id='ceilometer.notification',
topic='%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, agent))) topic='%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, x)))
return notifiers return notifiers
def _get_pipe_manager(self, transport, pipeline_manager): def _get_pipe_manager(self, transport, pipeline_manager):
@ -238,20 +244,24 @@ class NotificationService(service_base.BaseService):
ev_pipes = self.event_pipeline_manager.pipelines ev_pipes = self.event_pipeline_manager.pipelines
pipelines = self.pipeline_manager.pipelines + ev_pipes pipelines = self.pipeline_manager.pipelines + ev_pipes
transport = messaging.get_transport() transport = messaging.get_transport()
for pipe in pipelines: partitioned = self.partition_coordinator.extract_my_subset(
LOG.debug('Pipeline endpoint: %s', pipe.name) self.group_id,
pipe_endpoint = (pipeline.EventPipelineEndpoint range(cfg.CONF.notification.pipeline_processing_queues))
if isinstance(pipe, pipeline.EventPipeline) else for pipe_set in partitioned:
pipeline.SamplePipelineEndpoint) for pipe in pipelines:
listener = messaging.get_notification_listener( LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name,
transport, pipe_set)
[oslo_messaging.Target( pipe_endpoint = (pipeline.EventPipelineEndpoint
topic='%s-%s-%s' % (self.NOTIFICATION_IPC, if isinstance(pipe, pipeline.EventPipeline)
pipe.name, else pipeline.SamplePipelineEndpoint)
self.partition_coordinator._my_id))], listener = messaging.get_notification_listener(
[pipe_endpoint(self.ctxt, pipe)]) transport,
listener.start() [oslo_messaging.Target(
self.pipeline_listeners.append(listener) topic='%s-%s-%s' % (self.NOTIFICATION_IPC,
pipe.name, pipe_set))],
[pipe_endpoint(self.ctxt, pipe)])
listener.start()
self.pipeline_listeners.append(listener)
def stop(self): def stop(self):
if self.partition_coordinator: if self.partition_coordinator:

View File

@ -390,9 +390,9 @@ class TestRealNotificationHA(BaseRealNotification):
def test_reset_listeners_on_refresh(self): def test_reset_listeners_on_refresh(self):
self.srv.start() self.srv.start()
self.assertEqual(2, len(self.srv.pipeline_listeners)) self.assertEqual(20, len(self.srv.pipeline_listeners))
self.srv._refresh_listeners() self.srv._refresh_listeners()
self.assertEqual(2, len(self.srv.pipeline_listeners)) self.assertEqual(20, len(self.srv.pipeline_listeners))
self.srv.stop() self.srv.stop()
@mock.patch('oslo_messaging.Notifier.sample') @mock.patch('oslo_messaging.Notifier.sample')
@ -463,6 +463,8 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
group="notification") group="notification")
self.CONF.set_override('workload_partitioning', True, self.CONF.set_override('workload_partitioning', True,
group='notification') group='notification')
self.CONF.set_override('pipeline_processing_queues', 2,
group='notification')
self.publisher = test_publisher.TestPublisher("") self.publisher = test_publisher.TestPublisher("")
self.publisher2 = test_publisher.TestPublisher("") self.publisher2 = test_publisher.TestPublisher("")