Merge "Support kafka message compression"

This commit is contained in:
Zuul 2019-08-05 15:53:35 +00:00 committed by Gerrit Code Review
commit dfc8fe9a8f
5 changed files with 41 additions and 0 deletions

View File

@ -166,6 +166,13 @@ Notifier Options
- :oslo.config:option:`oslo_messaging_kafka.producer_batch_timeout` - :oslo.config:option:`oslo_messaging_kafka.producer_batch_timeout`
- :oslo.config:option:`oslo_messaging_kafka.producer_batch_size` - :oslo.config:option:`oslo_messaging_kafka.producer_batch_size`
compression_codec
The compression codec for all data generated by the producer, valid values
are: none, gzip, snappy, lz4, zstd. Note that the legal option of this
depends on the kafka version, please refer to `kafka documentation`_.
.. _kafka documentation: https://kafka.apache.org/documentation/
Security Options Security Options
^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^

View File

@ -255,6 +255,7 @@ class ProducerConnection(Connection):
super(ProducerConnection, self).__init__(conf, url) super(ProducerConnection, self).__init__(conf, url)
self.batch_size = self.driver_conf.producer_batch_size self.batch_size = self.driver_conf.producer_batch_size
self.linger_ms = self.driver_conf.producer_batch_timeout * 1000 self.linger_ms = self.driver_conf.producer_batch_timeout * 1000
self.compression_codec = self.driver_conf.compression_codec
self.producer = None self.producer = None
self.producer_lock = threading.Lock() self.producer_lock = threading.Lock()
@ -317,6 +318,7 @@ class ProducerConnection(Connection):
'bootstrap.servers': ",".join(self.hostaddrs), 'bootstrap.servers': ",".join(self.hostaddrs),
'linger.ms': self.linger_ms, 'linger.ms': self.linger_ms,
'batch.num.messages': self.batch_size, 'batch.num.messages': self.batch_size,
'compression.codec': self.compression_codec,
'security.protocol': self.security_protocol, 'security.protocol': self.security_protocol,
'sasl.mechanism': self.sasl_mechanism, 'sasl.mechanism': self.sasl_mechanism,
'sasl.username': self.username, 'sasl.username': self.username,

View File

@ -48,6 +48,13 @@ KAFKA_OPTS = [
cfg.IntOpt('producer_batch_size', default=16384, cfg.IntOpt('producer_batch_size', default=16384,
help='Size of batch for the producer async send'), help='Size of batch for the producer async send'),
cfg.StrOpt('compression_codec', default='none',
choices=['none', 'gzip', 'snappy', 'lz4', 'zstd'],
help='The compression codec for all data generated by the '
'producer. Valid values are: gzip, snappy, lz4, zstd. If '
'not set, compression will not be used. Note that the '
'legal option of this depends on the kafka version'),
cfg.BoolOpt('enable_auto_commit', cfg.BoolOpt('enable_auto_commit',
default=False, default=False,
help='Enable asynchronous consumer commits'), help='Enable asynchronous consumer commits'),

View File

@ -108,6 +108,7 @@ class TestKafkaDriver(test_utils.BaseTestCase):
'bootstrap.servers': '', 'bootstrap.servers': '',
'linger.ms': mock.ANY, 'linger.ms': mock.ANY,
'batch.num.messages': mock.ANY, 'batch.num.messages': mock.ANY,
'compression.codec': 'none',
'security.protocol': 'PLAINTEXT', 'security.protocol': 'PLAINTEXT',
'sasl.mechanism': 'PLAIN', 'sasl.mechanism': 'PLAIN',
'sasl.username': mock.ANY, 'sasl.username': mock.ANY,

View File

@ -539,3 +539,27 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
self.assertEqual(100, len(events[0][1])) self.assertEqual(100, len(events[0][1]))
self.assertEqual(100, len(events[1][1])) self.assertEqual(100, len(events[1][1]))
self.assertEqual(5, len(events[2][1])) self.assertEqual(5, len(events[2][1]))
def test_compression(self):
get_timeout = 1
if self.url.startswith("amqp:"):
self.conf.set_override('kombu_compression', 'gzip',
group='oslo_messaging_rabbit')
if self.url.startswith("kafka://"):
get_timeout = 5
self.conf.set_override('compression_codec', 'gzip',
group='oslo_messaging_kafka')
self.conf.set_override('consumer_group', 'test_compression',
group='oslo_messaging_kafka')
listener = self.useFixture(
utils.NotificationFixture(self.conf, self.url,
['test_compression']))
notifier = listener.notifier('abc')
notifier.info({}, 'test', 'Hello World!')
event = listener.events.get(timeout=get_timeout)
self.assertEqual('info', event[0])
self.assertEqual('test', event[1])
self.assertEqual('Hello World!', event[2])
self.assertEqual('abc', event[3])