Add skip_double_compressed_messages option to KafkaConsumer

Dana Powers
2016-07-14 23:57:04 -07:00
parent ed6098c272
commit ad13500cd1
2 changed files with 20 additions and 0 deletions

@@ -39,6 +39,7 @@ class Fetcher(six.Iterator):
         'fetch_max_wait_ms': 500,
         'max_partition_fetch_bytes': 1048576,
         'check_crcs': True,
+        'skip_double_compressed_messages': False,
         'iterator_refetch_records': 1, # undocumented -- interface may change
         'api_version': (0, 8, 0),
     }
@@ -71,6 +72,13 @@ class Fetcher(six.Iterator):
             consumed. This ensures no on-the-wire or on-disk corruption to
             the messages occurred. This check adds some overhead, so it may
             be disabled in cases seeking extreme performance. Default: True
+        skip_double_compressed_messages (bool): A bug in KafkaProducer <= 1.2.4
+            caused some messages to be corrupted via double-compression.
+            By default, the fetcher will return these messages as a compressed
+            blob of bytes with a single offset, i.e. how the message was
+            actually published to the cluster. If you prefer to have the
+            fetcher automatically detect corrupt messages and skip them,
+            set this option to True. Default: False.
         """
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
@@ -368,6 +376,10 @@ class Fetcher(six.Iterator):
                                 ' double-compressed. This should not'
                                 ' happen -- check your producers!',
                                 tp, offset)
+                    if self.config['skip_double_compressed_messages']:
+                        log.warning('Skipping double-compressed message at'
+                                    ' %s %d', tp, offset)
+                        continue
                 if msg.magic > 0:
                     last_offset, _, _ = inner_mset[-1]
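
To make the new flag's behavior concrete, here is a minimal, self-contained sketch of the skip logic this hunk adds. The Message class and unpack_message_set helper below are simplified stand-ins for kafka-python's internal message types, not the library's public API; the real inner message set carries (offset, size, message) tuples, while this sketch drops the size field.

    import logging

    log = logging.getLogger(__name__)

    class Message(object):
        # Hypothetical stand-in for kafka-python's internal message type.
        def __init__(self, value, codec=None, inner=None):
            self.value = value
            self.codec = codec          # e.g. 'gzip' when the payload is compressed
            self._inner = inner or []   # decompressed inner (offset, message) pairs

        def is_compressed(self):
            return self.codec is not None

        def decompress(self):
            return self._inner

    def unpack_message_set(tp, offset, msg, skip_double_compressed=False):
        # Yield (offset, message) pairs from a possibly-compressed wrapper message.
        if msg.is_compressed():
            inner_mset = msg.decompress()
            # There should only ever be a single layer of compression: an inner
            # message that is itself compressed means the producer double-compressed.
            if inner_mset and inner_mset[0][-1].is_compressed():
                log.warning('MessageSet at %s offset %d appears double-compressed.'
                            ' This should not happen -- check your producers!',
                            tp, offset)
                if skip_double_compressed:
                    # New behavior: drop the corrupt blob instead of returning it.
                    log.warning('Skipping double-compressed message at %s %d',
                                tp, offset)
                    return
            for inner_offset, inner_msg in inner_mset:
                yield (inner_offset, inner_msg)
        else:
            yield (offset, msg)

Left at its default (False), the still-compressed inner blobs are yielded as-is with their single offsets, matching the documented fallback behavior.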

@@ -123,6 +123,13 @@ class KafkaConsumer(six.Iterator):
         consumer_timeout_ms (int): number of milliseconds to block during
             message iteration before raising StopIteration (i.e., ending the
             iterator). Default -1 (block forever).
+        skip_double_compressed_messages (bool): A bug in KafkaProducer <= 1.2.4
+            caused some messages to be corrupted via double-compression.
+            By default, the fetcher will return these messages as a compressed
+            blob of bytes with a single offset, i.e. how the message was
+            actually published to the cluster. If you prefer to have the
+            fetcher automatically detect corrupt messages and skip them,
+            set this option to True. Default: False.
         security_protocol (str): Protocol used to communicate with brokers.
             Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
         ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
@@ -189,6 +196,7 @@ class KafkaConsumer(six.Iterator):
         'send_buffer_bytes': None,
         'receive_buffer_bytes': None,
         'consumer_timeout_ms': -1,
+        'skip_double_compressed_messages': False,
         'security_protocol': 'PLAINTEXT',
         'ssl_context': None,
         'ssl_check_hostname': True,
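
And a short usage sketch from the consumer side, opting in to the new behavior; the topic name and broker address are placeholders:

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'my-topic',                          # placeholder topic
        bootstrap_servers='localhost:9092',  # placeholder broker
        # Default is False: double-compressed blobs are returned as-is, with a
        # single offset, exactly as they were published to the cluster.
        skip_double_compressed_messages=True,
    )

    for message in consumer:
        # With the flag set, corrupt double-compressed messages are skipped
        # (with a warning logged) rather than surfacing as opaque blobs.
        print(message.topic, message.offset, message.value)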