improve notification processing
this patches does multiple things: - it ignores batching when pulling from main queue to maintain as much ordering as possible. this avoids related messages split across multiple batches and one batch starting much farther along than another - it sets pipeline processing listeners to single thread. at this stage, the pipeline queue contains less messages and therefore very likely, thread1 and thread2 will grab related messages and race to set cache - adds sorting to pipeline queue so if batching is enabled, we can further ensure that messages are in order before processing. - enables batching by default. (one thread per listener grabbing one message at a time will be slow.) - offers better batching for direct to db dispatcher as resources are grouped Change-Id: Iac5b552bae1e73f93cbfc830b1e83510b1aceb9e
This commit is contained in:
@@ -73,10 +73,12 @@ OPTS = [
|
||||
"notifications go to rabbit-nova:5672, while all "
|
||||
"cinder notifications go to rabbit-cinder:5672."),
|
||||
cfg.IntOpt('batch_size',
|
||||
default=1,
|
||||
default=100, min=1,
|
||||
help='Number of notification messages to wait before '
|
||||
'publishing them'),
|
||||
'publishing them. Batching is advised when transformations are'
|
||||
'applied in pipeline.'),
|
||||
cfg.IntOpt('batch_timeout',
|
||||
default=5,
|
||||
help='Number of seconds to wait before publishing samples'
|
||||
'when batch_size is not reached (None means indefinitely)'),
|
||||
]
|
||||
@@ -258,10 +260,10 @@ class NotificationService(service_base.PipelineBasedService):
|
||||
urls = cfg.CONF.notification.messaging_urls or [None]
|
||||
for url in urls:
|
||||
transport = messaging.get_transport(url)
|
||||
# NOTE(gordc): ignore batching as we want pull
|
||||
# to maintain sequencing as much as possible.
|
||||
listener = messaging.get_batch_notification_listener(
|
||||
transport, targets, endpoints,
|
||||
batch_size=cfg.CONF.notification.batch_size,
|
||||
batch_timeout=cfg.CONF.notification.batch_timeout)
|
||||
transport, targets, endpoints)
|
||||
listener.start()
|
||||
self.listeners.append(listener)
|
||||
|
||||
@@ -309,7 +311,10 @@ class NotificationService(service_base.PipelineBasedService):
|
||||
endpoints,
|
||||
batch_size=cfg.CONF.notification.batch_size,
|
||||
batch_timeout=cfg.CONF.notification.batch_timeout)
|
||||
self.pipeline_listener.start()
|
||||
# NOTE(gordc): set single thread to process data sequentially
|
||||
# if batching enabled.
|
||||
batch = (1 if cfg.CONF.notification.batch_size > 1 else None)
|
||||
self.pipeline_listener.start(override_pool_size=batch)
|
||||
|
||||
def terminate(self):
|
||||
self.shutdown = True
|
||||
|
@@ -17,6 +17,7 @@
|
||||
import abc
|
||||
import hashlib
|
||||
from itertools import chain
|
||||
from operator import methodcaller
|
||||
import os
|
||||
|
||||
from oslo_config import cfg
|
||||
@@ -105,7 +106,7 @@ class SamplePipelineEndpoint(PipelineEndpoint):
|
||||
s, cfg.CONF.publisher.telemetry_secret)
|
||||
]
|
||||
with self.publish_context as p:
|
||||
p(samples)
|
||||
p(sorted(samples, key=methodcaller('get_iso_timestamp')))
|
||||
|
||||
|
||||
class EventPipelineEndpoint(PipelineEndpoint):
|
||||
|
@@ -24,7 +24,7 @@ import copy
|
||||
import uuid
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from oslo_utils import timeutils
|
||||
|
||||
OPTS = [
|
||||
cfg.StrOpt('sample_source',
|
||||
@@ -99,6 +99,10 @@ class Sample(object):
|
||||
def set_timestamp(self, timestamp):
|
||||
self.timestamp = timestamp
|
||||
|
||||
def get_iso_timestamp(self):
|
||||
return timeutils.parse_isotime(self.timestamp)
|
||||
|
||||
|
||||
TYPE_GAUGE = 'gauge'
|
||||
TYPE_DELTA = 'delta'
|
||||
TYPE_CUMULATIVE = 'cumulative'
|
||||
|
@@ -379,6 +379,16 @@ class TestRealNotificationHA(BaseRealNotification):
|
||||
fake_publisher_cls.return_value = self.publisher
|
||||
self._check_notification_service()
|
||||
|
||||
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start')
|
||||
def test_notification_threads(self, m_listener):
|
||||
self.CONF.set_override('batch_size', 1, group='notification')
|
||||
self.srv.run()
|
||||
m_listener.assert_called_with(override_pool_size=None)
|
||||
m_listener.reset_mock()
|
||||
self.CONF.set_override('batch_size', 2, group='notification')
|
||||
self.srv.run()
|
||||
m_listener.assert_called_with(override_pool_size=1)
|
||||
|
||||
@mock.patch('oslo_messaging.get_batch_notification_listener')
|
||||
def test_reset_listener_on_refresh(self, mock_listener):
|
||||
mock_listener.side_effect = [
|
||||
|
@@ -0,0 +1,11 @@
|
||||
---
|
||||
upgrade:
|
||||
- Batching is enabled by default now when coordinated workers are enabled.
|
||||
Depending on load, it is recommended to scale out the number of
|
||||
`pipeline_processing_queues` to improve distribution. `batch_size` should
|
||||
also be configured accordingly.
|
||||
fixes:
|
||||
- Fix to improve handling messages in environments heavily backed up.
|
||||
Previously, notification handlers greedily grabbed messages from queues
|
||||
which could cause ordering issues. A fix was applied to sequentially
|
||||
process messages in a single thread to prevent ordering issues.
|
Reference in New Issue
Block a user