Added keys to compressed messages (both gzip and snappy).
This commit is contained in:
@@ -568,7 +568,7 @@ def create_gzip_message(payloads, key=None):
|
||||
key: bytes, a key used for partition routing (optional)
|
||||
"""
|
||||
message_set = KafkaProtocol._encode_message_set(
|
||||
[create_message(payload) for payload in payloads])
|
||||
[create_message(payload, key) for payload in payloads])
|
||||
|
||||
gzipped = gzip_encode(message_set)
|
||||
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
|
||||
@@ -589,7 +589,7 @@ def create_snappy_message(payloads, key=None):
|
||||
key: bytes, a key used for partition routing (optional)
|
||||
"""
|
||||
message_set = KafkaProtocol._encode_message_set(
|
||||
[create_message(payload) for payload in payloads])
|
||||
[create_message(payload, key) for payload in payloads])
|
||||
|
||||
snapped = snappy_encode(message_set)
|
||||
codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
|
||||
|
Reference in New Issue
Block a user