diff --git a/doc/source/admin/kafka.rst b/doc/source/admin/kafka.rst index 3fc3864c9..c581fb0ad 100644 --- a/doc/source/admin/kafka.rst +++ b/doc/source/admin/kafka.rst @@ -166,6 +166,13 @@ Notifier Options - :oslo.config:option:`oslo_messaging_kafka.producer_batch_timeout` - :oslo.config:option:`oslo_messaging_kafka.producer_batch_size` +compression_codec + The compression codec for all data generated by the producer, valid values + are: none, gzip, snappy, lz4, zstd. Note that the legal option of this + depends on the kafka version, please refer to `kafka documentation`_. + +.. _kafka documentation: https://kafka.apache.org/documentation/ + Security Options ^^^^^^^^^^^^^^^^ diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index dc4fe0960..07b473d0a 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -255,6 +255,7 @@ class ProducerConnection(Connection): super(ProducerConnection, self).__init__(conf, url) self.batch_size = self.driver_conf.producer_batch_size self.linger_ms = self.driver_conf.producer_batch_timeout * 1000 + self.compression_codec = self.driver_conf.compression_codec self.producer = None self.producer_lock = threading.Lock() @@ -317,6 +318,7 @@ class ProducerConnection(Connection): 'bootstrap.servers': ",".join(self.hostaddrs), 'linger.ms': self.linger_ms, 'batch.num.messages': self.batch_size, + 'compression.codec': self.compression_codec, 'security.protocol': self.security_protocol, 'sasl.mechanism': self.sasl_mechanism, 'sasl.username': self.username, diff --git a/oslo_messaging/_drivers/kafka_driver/kafka_options.py b/oslo_messaging/_drivers/kafka_driver/kafka_options.py index 5fbe7b220..42c990cd4 100644 --- a/oslo_messaging/_drivers/kafka_driver/kafka_options.py +++ b/oslo_messaging/_drivers/kafka_driver/kafka_options.py @@ -48,6 +48,13 @@ KAFKA_OPTS = [ cfg.IntOpt('producer_batch_size', default=16384, help='Size of batch for the producer async send'), + cfg.StrOpt('compression_codec', default='none', + choices=['none', 'gzip', 'snappy', 'lz4', 'zstd'], + help='The compression codec for all data generated by the ' + 'producer. Valid values are: gzip, snappy, lz4, zstd. If ' + 'not set, compression will not be used. Note that the ' + 'legal option of this depends on the kafka version'), + cfg.BoolOpt('enable_auto_commit', default=False, help='Enable asynchronous consumer commits'), diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index 80af57651..0af8c053e 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -108,6 +108,7 @@ class TestKafkaDriver(test_utils.BaseTestCase): 'bootstrap.servers': '', 'linger.ms': mock.ANY, 'batch.num.messages': mock.ANY, + 'compression.codec': 'none', 'security.protocol': 'PLAINTEXT', 'sasl.mechanism': 'PLAIN', 'sasl.username': mock.ANY, diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index 6800f596f..384b37265 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -539,3 +539,27 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): self.assertEqual(100, len(events[0][1])) self.assertEqual(100, len(events[1][1])) self.assertEqual(5, len(events[2][1])) + + def test_compression(self): + get_timeout = 1 + if self.url.startswith("amqp:"): + self.conf.set_override('kombu_compression', 'gzip', + group='oslo_messaging_rabbit') + if self.url.startswith("kafka://"): + get_timeout = 5 + self.conf.set_override('compression_codec', 'gzip', + group='oslo_messaging_kafka') + self.conf.set_override('consumer_group', 'test_compression', + group='oslo_messaging_kafka') + + listener = self.useFixture( + utils.NotificationFixture(self.conf, self.url, + ['test_compression'])) + notifier = listener.notifier('abc') + + notifier.info({}, 'test', 'Hello World!') + event = listener.events.get(timeout=get_timeout) + self.assertEqual('info', event[0]) + self.assertEqual('test', event[1]) + self.assertEqual('Hello World!', event[2]) + self.assertEqual('abc', event[3])