Log warning if message set appears double-compressed in KafkaConsumer
This commit is contained in:
@@ -361,6 +361,14 @@ class Fetcher(six.Iterator):
|
|||||||
# If relative offset is used, we need to decompress the entire message first to compute
|
# If relative offset is used, we need to decompress the entire message first to compute
|
||||||
# the absolute offset.
|
# the absolute offset.
|
||||||
inner_mset = msg.decompress()
|
inner_mset = msg.decompress()
|
||||||
|
|
||||||
|
# There should only ever be a single layer of compression
|
||||||
|
if inner_mset[0][-1].is_compressed():
|
||||||
|
log.warning('MessageSet at %s offset %d appears '
|
||||||
|
' double-compressed. This should not'
|
||||||
|
' happen -- check your producers!',
|
||||||
|
tp, offset)
|
||||||
|
|
||||||
if msg.magic > 0:
|
if msg.magic > 0:
|
||||||
last_offset, _, _ = inner_mset[-1]
|
last_offset, _, _ = inner_mset[-1]
|
||||||
absolute_base_offset = offset - last_offset
|
absolute_base_offset = offset - last_offset
|
||||||
|
Reference in New Issue
Block a user