Merge pull request #454 from trbs/gzip_compressionlevel
allow to specify compression level for codecs which support this
This commit is contained in:
@@ -22,12 +22,15 @@ def has_snappy():
|
||||
return _HAS_SNAPPY
|
||||
|
||||
|
||||
def gzip_encode(payload):
|
||||
def gzip_encode(payload, compresslevel=None):
|
||||
if not compresslevel:
|
||||
compresslevel = 9
|
||||
|
||||
with BytesIO() as buf:
|
||||
|
||||
# Gzip context manager introduced in python 2.6
|
||||
# so old-fashioned way until we decide to not support 2.6
|
||||
gzipper = gzip.GzipFile(fileobj=buf, mode="w")
|
||||
gzipper = gzip.GzipFile(fileobj=buf, mode="w", compresslevel=compresslevel)
|
||||
try:
|
||||
gzipper.write(payload)
|
||||
finally:
|
||||
|
||||
@@ -47,7 +47,8 @@ SYNC_FAIL_ON_ERROR_DEFAULT = True
|
||||
def _send_upstream(queue, client, codec, batch_time, batch_size,
|
||||
req_acks, ack_timeout, retry_options, stop_event,
|
||||
log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
|
||||
stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
|
||||
stop_timeout=ASYNC_STOP_TIMEOUT_SECS,
|
||||
codec_compresslevel=None):
|
||||
"""Private method to manage producing messages asynchronously
|
||||
|
||||
Listens on the queue for a specified number of messages or until
|
||||
@@ -123,7 +124,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
|
||||
|
||||
# Send collected requests upstream
|
||||
for topic_partition, msg in msgset.items():
|
||||
messages = create_message_set(msg, codec, key)
|
||||
messages = create_message_set(msg, codec, key, codec_compresslevel)
|
||||
req = ProduceRequest(topic_partition.topic,
|
||||
topic_partition.partition,
|
||||
tuple(messages))
|
||||
@@ -267,6 +268,7 @@ class Producer(object):
|
||||
req_acks=ACK_AFTER_LOCAL_WRITE,
|
||||
ack_timeout=DEFAULT_ACK_TIMEOUT,
|
||||
codec=None,
|
||||
codec_compresslevel=None,
|
||||
sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
|
||||
async=False,
|
||||
batch_send=False, # deprecated, use async
|
||||
@@ -297,6 +299,7 @@ class Producer(object):
|
||||
raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
|
||||
|
||||
self.codec = codec
|
||||
self.codec_compresslevel = codec_compresslevel
|
||||
|
||||
if self.async:
|
||||
# Messages are sent through this queue
|
||||
@@ -314,7 +317,8 @@ class Producer(object):
|
||||
self.req_acks, self.ack_timeout,
|
||||
async_retry_options, self.thread_stop_event),
|
||||
kwargs={'log_messages_on_error': async_log_messages_on_error,
|
||||
'stop_timeout': async_stop_timeout}
|
||||
'stop_timeout': async_stop_timeout,
|
||||
'codec_compresslevel': self.codec_compresslevel}
|
||||
)
|
||||
|
||||
# Thread will die if main thread exits
|
||||
@@ -388,7 +392,7 @@ class Producer(object):
|
||||
'Current queue size %d.' % self.queue.qsize())
|
||||
resp = []
|
||||
else:
|
||||
messages = create_message_set([(m, key) for m in msg], self.codec, key)
|
||||
messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel)
|
||||
req = ProduceRequest(topic, partition, messages)
|
||||
try:
|
||||
resp = self.client.send_produce_request(
|
||||
|
||||
@@ -547,7 +547,7 @@ def create_message(payload, key=None):
|
||||
return Message(0, 0, key, payload)
|
||||
|
||||
|
||||
def create_gzip_message(payloads, key=None):
|
||||
def create_gzip_message(payloads, key=None, compresslevel=None):
|
||||
"""
|
||||
Construct a Gzipped Message containing multiple Messages
|
||||
|
||||
@@ -562,7 +562,7 @@ def create_gzip_message(payloads, key=None):
|
||||
message_set = KafkaProtocol._encode_message_set(
|
||||
[create_message(payload, pl_key) for payload, pl_key in payloads])
|
||||
|
||||
gzipped = gzip_encode(message_set)
|
||||
gzipped = gzip_encode(message_set, compresslevel=compresslevel)
|
||||
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
|
||||
|
||||
return Message(0, 0x00 | codec, key, gzipped)
|
||||
@@ -589,7 +589,7 @@ def create_snappy_message(payloads, key=None):
|
||||
return Message(0, 0x00 | codec, key, snapped)
|
||||
|
||||
|
||||
def create_message_set(messages, codec=CODEC_NONE, key=None):
|
||||
def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):
|
||||
"""Create a message set using the given codec.
|
||||
|
||||
If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise,
|
||||
@@ -598,7 +598,7 @@ def create_message_set(messages, codec=CODEC_NONE, key=None):
|
||||
if codec == CODEC_NONE:
|
||||
return [create_message(m, k) for m, k in messages]
|
||||
elif codec == CODEC_GZIP:
|
||||
return [create_gzip_message(messages, key)]
|
||||
return [create_gzip_message(messages, key, compresslevel)]
|
||||
elif codec == CODEC_SNAPPY:
|
||||
return [create_snappy_message(messages, key)]
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user