diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 1207bd58b6..7383907e8a 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -73,10 +73,12 @@ OPTS = [ "notifications go to rabbit-nova:5672, while all " "cinder notifications go to rabbit-cinder:5672."), cfg.IntOpt('batch_size', - default=1, + default=100, min=1, help='Number of notification messages to wait before ' - 'publishing them'), + 'publishing them. Batching is advised when transformations are' + 'applied in pipeline.'), cfg.IntOpt('batch_timeout', + default=5, help='Number of seconds to wait before publishing samples' 'when batch_size is not reached (None means indefinitely)'), ] @@ -258,10 +260,10 @@ class NotificationService(service_base.PipelineBasedService): urls = cfg.CONF.notification.messaging_urls or [None] for url in urls: transport = messaging.get_transport(url) + # NOTE(gordc): ignore batching as we want pull + # to maintain sequencing as much as possible. listener = messaging.get_batch_notification_listener( - transport, targets, endpoints, - batch_size=cfg.CONF.notification.batch_size, - batch_timeout=cfg.CONF.notification.batch_timeout) + transport, targets, endpoints) listener.start() self.listeners.append(listener) @@ -309,7 +311,10 @@ class NotificationService(service_base.PipelineBasedService): endpoints, batch_size=cfg.CONF.notification.batch_size, batch_timeout=cfg.CONF.notification.batch_timeout) - self.pipeline_listener.start() + # NOTE(gordc): set single thread to process data sequentially + # if batching enabled. + batch = (1 if cfg.CONF.notification.batch_size > 1 else None) + self.pipeline_listener.start(override_pool_size=batch) def terminate(self): self.shutdown = True diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index e174608ab7..5067c4656c 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -17,6 +17,7 @@ import abc import hashlib from itertools import chain +from operator import methodcaller import os from oslo_config import cfg @@ -105,7 +106,7 @@ class SamplePipelineEndpoint(PipelineEndpoint): s, cfg.CONF.publisher.telemetry_secret) ] with self.publish_context as p: - p(samples) + p(sorted(samples, key=methodcaller('get_iso_timestamp'))) class EventPipelineEndpoint(PipelineEndpoint): diff --git a/ceilometer/sample.py b/ceilometer/sample.py index 933bc94880..0f8e54e37f 100644 --- a/ceilometer/sample.py +++ b/ceilometer/sample.py @@ -24,7 +24,7 @@ import copy import uuid from oslo_config import cfg - +from oslo_utils import timeutils OPTS = [ cfg.StrOpt('sample_source', @@ -99,6 +99,10 @@ class Sample(object): def set_timestamp(self, timestamp): self.timestamp = timestamp + def get_iso_timestamp(self): + return timeutils.parse_isotime(self.timestamp) + + TYPE_GAUGE = 'gauge' TYPE_DELTA = 'delta' TYPE_CUMULATIVE = 'cumulative' diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py index 02452d1a90..8b482ba625 100644 --- a/ceilometer/tests/functional/test_notification.py +++ b/ceilometer/tests/functional/test_notification.py @@ -379,6 +379,16 @@ class TestRealNotificationHA(BaseRealNotification): fake_publisher_cls.return_value = self.publisher self._check_notification_service() + @mock.patch.object(oslo_messaging.MessageHandlingServer, 'start') + def test_notification_threads(self, m_listener): + self.CONF.set_override('batch_size', 1, group='notification') + self.srv.run() + m_listener.assert_called_with(override_pool_size=None) + m_listener.reset_mock() + self.CONF.set_override('batch_size', 2, group='notification') + self.srv.run() + m_listener.assert_called_with(override_pool_size=1) + @mock.patch('oslo_messaging.get_batch_notification_listener') def test_reset_listener_on_refresh(self, mock_listener): mock_listener.side_effect = [ diff --git a/releasenotes/notes/single-thread-pipelines-f9e6ac4b062747fe.yaml b/releasenotes/notes/single-thread-pipelines-f9e6ac4b062747fe.yaml new file mode 100644 index 0000000000..c041e90101 --- /dev/null +++ b/releasenotes/notes/single-thread-pipelines-f9e6ac4b062747fe.yaml @@ -0,0 +1,11 @@ +--- +upgrade: + - Batching is enabled by default now when coordinated workers are enabled. + Depending on load, it is recommended to scale out the number of + `pipeline_processing_queues` to improve distribution. `batch_size` should + also be configured accordingly. +fixes: + - Fix to improve handling messages in environments heavily backed up. + Previously, notification handlers greedily grabbed messages from queues + which could cause ordering issues. A fix was applied to sequentially + process messages in a single thread to prevent ordering issues.