Correct message keys for async batching mode
This commit is contained in:
@@ -559,7 +559,7 @@ def create_gzip_message(payloads, key=None):
|
||||
|
||||
"""
|
||||
message_set = KafkaProtocol._encode_message_set(
|
||||
[create_message(payload, key) for payload in payloads])
|
||||
[create_message(payload, pl_key) for payload, pl_key in payloads])
|
||||
|
||||
gzipped = gzip_encode(message_set)
|
||||
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
|
||||
@@ -580,7 +580,7 @@ def create_snappy_message(payloads, key=None):
|
||||
|
||||
"""
|
||||
message_set = KafkaProtocol._encode_message_set(
|
||||
[create_message(payload, key) for payload in payloads])
|
||||
[create_message(payload, pl_key) for payload, pl_key in payloads])
|
||||
|
||||
snapped = snappy_encode(message_set)
|
||||
codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
|
||||
@@ -595,7 +595,7 @@ def create_message_set(messages, codec=CODEC_NONE, key=None):
|
||||
return a list containing a single codec-encoded message.
|
||||
"""
|
||||
if codec == CODEC_NONE:
|
||||
return [create_message(m, key) for m in messages]
|
||||
return [create_message(m, k) for m, k in messages]
|
||||
elif codec == CODEC_GZIP:
|
||||
return [create_gzip_message(messages, key)]
|
||||
elif codec == CODEC_SNAPPY:
|
||||
|
||||
Reference in New Issue
Block a user