From 5750fddf288c749cacfc825753928f66e755758d Mon Sep 17 00:00:00 2001 From: gordon chung Date: Wed, 3 Feb 2016 09:59:55 -0500 Subject: [PATCH] improve notification processing this patch does multiple things: - it ignores batching when pulling from main queue to maintain as much ordering as possible. this avoids related messages being split across multiple batches and one batch starting much farther along than another - it sets pipeline processing listeners to single thread. at this stage, the pipeline queue contains fewer messages and therefore, very likely, thread1 and thread2 will grab related messages and race to set cache - adds sorting to pipeline queue so if batching is enabled, we can further ensure that messages are in order before processing. - enables batching by default. (one thread per listener grabbing one message at a time will be slow.) - offers better batching for direct to db dispatcher as resources are grouped Change-Id: Iac5b552bae1e73f93cbfc830b1e83510b1aceb9e --- ceilometer/notification.py | 17 +++++++++++------ ceilometer/pipeline.py | 3 ++- ceilometer/sample.py | 6 +++++- .../tests/functional/test_notification.py | 10 ++++++++++ ...ingle-thread-pipelines-f9e6ac4b062747fe.yaml | 11 +++++++++++ 5 files changed, 39 insertions(+), 8 deletions(-) create mode 100644 releasenotes/notes/single-thread-pipelines-f9e6ac4b062747fe.yaml diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 1207bd58b6..7383907e8a 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -73,10 +73,12 @@ OPTS = [ "notifications go to rabbit-nova:5672, while all " "cinder notifications go to rabbit-cinder:5672."), cfg.IntOpt('batch_size', - default=1, + default=100, min=1, help='Number of notification messages to wait before ' - 'publishing them'), + 'publishing them. 
Batching is advised when transformations are' + 'applied in pipeline.'), cfg.IntOpt('batch_timeout', + default=5, help='Number of seconds to wait before publishing samples' 'when batch_size is not reached (None means indefinitely)'), ] @@ -258,10 +260,10 @@ class NotificationService(service_base.PipelineBasedService): urls = cfg.CONF.notification.messaging_urls or [None] for url in urls: transport = messaging.get_transport(url) + # NOTE(gordc): ignore batching as we want pull + # to maintain sequencing as much as possible. listener = messaging.get_batch_notification_listener( - transport, targets, endpoints, - batch_size=cfg.CONF.notification.batch_size, - batch_timeout=cfg.CONF.notification.batch_timeout) + transport, targets, endpoints) listener.start() self.listeners.append(listener) @@ -309,7 +311,10 @@ class NotificationService(service_base.PipelineBasedService): endpoints, batch_size=cfg.CONF.notification.batch_size, batch_timeout=cfg.CONF.notification.batch_timeout) - self.pipeline_listener.start() + # NOTE(gordc): set single thread to process data sequentially + # if batching enabled. 
+ batch = (1 if cfg.CONF.notification.batch_size > 1 else None) + self.pipeline_listener.start(override_pool_size=batch) def terminate(self): self.shutdown = True diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index e174608ab7..5067c4656c 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -17,6 +17,7 @@ import abc import hashlib from itertools import chain +from operator import methodcaller import os from oslo_config import cfg @@ -105,7 +106,7 @@ class SamplePipelineEndpoint(PipelineEndpoint): s, cfg.CONF.publisher.telemetry_secret) ] with self.publish_context as p: - p(samples) + p(sorted(samples, key=methodcaller('get_iso_timestamp'))) class EventPipelineEndpoint(PipelineEndpoint): diff --git a/ceilometer/sample.py b/ceilometer/sample.py index 933bc94880..0f8e54e37f 100644 --- a/ceilometer/sample.py +++ b/ceilometer/sample.py @@ -24,7 +24,7 @@ import copy import uuid from oslo_config import cfg - +from oslo_utils import timeutils OPTS = [ cfg.StrOpt('sample_source', @@ -99,6 +99,10 @@ class Sample(object): def set_timestamp(self, timestamp): self.timestamp = timestamp + def get_iso_timestamp(self): + return timeutils.parse_isotime(self.timestamp) + + TYPE_GAUGE = 'gauge' TYPE_DELTA = 'delta' TYPE_CUMULATIVE = 'cumulative' diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py index 02452d1a90..8b482ba625 100644 --- a/ceilometer/tests/functional/test_notification.py +++ b/ceilometer/tests/functional/test_notification.py @@ -379,6 +379,16 @@ class TestRealNotificationHA(BaseRealNotification): fake_publisher_cls.return_value = self.publisher self._check_notification_service() + @mock.patch.object(oslo_messaging.MessageHandlingServer, 'start') + def test_notification_threads(self, m_listener): + self.CONF.set_override('batch_size', 1, group='notification') + self.srv.run() + m_listener.assert_called_with(override_pool_size=None) + m_listener.reset_mock() + 
self.CONF.set_override('batch_size', 2, group='notification') + self.srv.run() + m_listener.assert_called_with(override_pool_size=1) + @mock.patch('oslo_messaging.get_batch_notification_listener') def test_reset_listener_on_refresh(self, mock_listener): mock_listener.side_effect = [ diff --git a/releasenotes/notes/single-thread-pipelines-f9e6ac4b062747fe.yaml b/releasenotes/notes/single-thread-pipelines-f9e6ac4b062747fe.yaml new file mode 100644 index 0000000000..c041e90101 --- /dev/null +++ b/releasenotes/notes/single-thread-pipelines-f9e6ac4b062747fe.yaml @@ -0,0 +1,11 @@ +--- +upgrade: + - Batching is enabled by default now when coordinated workers are enabled. + Depending on load, it is recommended to scale out the number of + `pipeline_processing_queues` to improve distribution. `batch_size` should + also be configured accordingly. +fixes: + - Fix to improve handling of messages in heavily backed-up environments. + Previously, notification handlers greedily grabbed messages from queues + which could cause ordering issues. A fix was applied to sequentially + process messages in a single thread to prevent ordering issues.