Merge pull request #398 from dpkp/kafka_consumer_failed_payloads
Kafka consumer failed payloads
This commit is contained in:
@@ -120,7 +120,10 @@ class KafkaConsumer(object):
|
||||
|
||||
if self._config['auto_commit_enable']:
|
||||
if not self._config['group_id']:
|
||||
raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)')
|
||||
raise KafkaConfigurationError(
|
||||
'KafkaConsumer configured to auto-commit '
|
||||
'without required consumer group (group_id)'
|
||||
)
|
||||
|
||||
# Check auto-commit configuration
|
||||
if self._config['auto_commit_enable']:
|
||||
@@ -128,12 +131,15 @@ class KafkaConsumer(object):
|
||||
self._reset_auto_commit()
|
||||
|
||||
if not self._config['bootstrap_servers']:
|
||||
raise KafkaConfigurationError('bootstrap_servers required to '
|
||||
'configure KafkaConsumer')
|
||||
raise KafkaConfigurationError(
|
||||
'bootstrap_servers required to configure KafkaConsumer'
|
||||
)
|
||||
|
||||
self._client = KafkaClient(self._config['bootstrap_servers'],
|
||||
client_id=self._config['client_id'],
|
||||
timeout=(self._config['socket_timeout_ms'] / 1000.0))
|
||||
self._client = KafkaClient(
|
||||
self._config['bootstrap_servers'],
|
||||
client_id=self._config['client_id'],
|
||||
timeout=(self._config['socket_timeout_ms'] / 1000.0)
|
||||
)
|
||||
|
||||
def set_topic_partitions(self, *topics):
|
||||
"""
|
||||
@@ -163,12 +169,12 @@ class KafkaConsumer(object):
|
||||
# Consume topic1-all; topic2-partition2; topic3-partition0
|
||||
kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
|
||||
|
||||
# Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
|
||||
# Consume topic1-0 starting at offset 12, and topic2-1 at offset 45
|
||||
# using tuples --
|
||||
kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))
|
||||
kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45))
|
||||
|
||||
# using dict --
|
||||
kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
|
||||
kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 })
|
||||
|
||||
"""
|
||||
self._topics = []
|
||||
@@ -216,8 +222,10 @@ class KafkaConsumer(object):
|
||||
for partition in value:
|
||||
self._consume_topic_partition(topic, partition)
|
||||
else:
|
||||
raise KafkaConfigurationError('Unknown topic type (dict key must be '
|
||||
'int or list/tuple of ints)')
|
||||
raise KafkaConfigurationError(
|
||||
'Unknown topic type '
|
||||
'(dict key must be int or list/tuple of ints)'
|
||||
)
|
||||
|
||||
# (topic, partition): offset
|
||||
elif isinstance(key, tuple):
|
||||
@@ -316,26 +324,30 @@ class KafkaConsumer(object):
|
||||
raise KafkaConfigurationError('No topics or partitions configured')
|
||||
|
||||
if not self._offsets.fetch:
|
||||
raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages')
|
||||
raise KafkaConfigurationError(
|
||||
'No fetch offsets found when calling fetch_messages'
|
||||
)
|
||||
|
||||
fetches = [FetchRequest(topic, partition,
|
||||
self._offsets.fetch[(topic, partition)],
|
||||
max_bytes)
|
||||
for (topic, partition) in self._topics]
|
||||
|
||||
# client.send_fetch_request will collect topic/partition requests by leader
|
||||
# and send each group as a single FetchRequest to the correct broker
|
||||
try:
|
||||
responses = self._client.send_fetch_request(fetches,
|
||||
max_wait_time=max_wait_time,
|
||||
min_bytes=min_bytes,
|
||||
fail_on_error=False)
|
||||
except FailedPayloadsError:
|
||||
logger.warning('FailedPayloadsError attempting to fetch data from kafka')
|
||||
self._refresh_metadata_on_error()
|
||||
return
|
||||
# send_fetch_request will batch topic/partition requests by leader
|
||||
responses = self._client.send_fetch_request(
|
||||
fetches,
|
||||
max_wait_time=max_wait_time,
|
||||
min_bytes=min_bytes,
|
||||
fail_on_error=False
|
||||
)
|
||||
|
||||
for resp in responses:
|
||||
|
||||
if isinstance(resp, FailedPayloadsError):
|
||||
logger.warning('FailedPayloadsError attempting to fetch data')
|
||||
self._refresh_metadata_on_error()
|
||||
continue
|
||||
|
||||
topic = kafka_bytestring(resp.topic)
|
||||
partition = resp.partition
|
||||
try:
|
||||
@@ -381,7 +393,8 @@ class KafkaConsumer(object):
|
||||
logger.debug('message offset less than fetched offset '
|
||||
'skipping: %s', msg)
|
||||
continue
|
||||
# Only increment fetch offset if we safely got the message and deserialized
|
||||
# Only increment fetch offset
|
||||
# if we safely got the message and deserialized
|
||||
self._offsets.fetch[(topic, partition)] = offset + 1
|
||||
|
||||
# Then yield to user
|
||||
@@ -394,10 +407,12 @@ class KafkaConsumer(object):
|
||||
topic (str): topic for offset request
|
||||
partition (int): partition for offset request
|
||||
request_time_ms (int): Used to ask for all messages before a
|
||||
certain time (ms). There are two special values. Specify -1 to receive the latest
|
||||
offset (i.e. the offset of the next coming message) and -2 to receive the earliest
|
||||
available offset. Note that because offsets are pulled in descending order, asking for
|
||||
the earliest offset will always return you a single element.
|
||||
certain time (ms). There are two special values.
|
||||
Specify -1 to receive the latest offset (i.e. the offset of the
|
||||
next coming message) and -2 to receive the earliest available
|
||||
offset. Note that because offsets are pulled in descending
|
||||
order, asking for the earliest offset will always return you a
|
||||
single element.
|
||||
max_num_offsets (int): Maximum offsets to include in the OffsetResponse
|
||||
|
||||
Returns:
|
||||
@@ -497,7 +512,10 @@ class KafkaConsumer(object):
|
||||
"""
|
||||
if not self._config['group_id']:
|
||||
logger.warning('Cannot commit without a group_id!')
|
||||
raise KafkaConfigurationError('Attempted to commit offsets without a configured consumer group (group_id)')
|
||||
raise KafkaConfigurationError(
|
||||
'Attempted to commit offsets '
|
||||
'without a configured consumer group (group_id)'
|
||||
)
|
||||
|
||||
# API supports storing metadata with each commit
|
||||
# but for now it is unused
|
||||
@@ -521,13 +539,17 @@ class KafkaConsumer(object):
|
||||
if commit_offset == self._offsets.commit[topic_partition]:
|
||||
continue
|
||||
|
||||
commits.append(OffsetCommitRequest(topic_partition[0], topic_partition[1], commit_offset, metadata))
|
||||
commits.append(
|
||||
OffsetCommitRequest(topic_partition[0], topic_partition[1],
|
||||
commit_offset, metadata)
|
||||
)
|
||||
|
||||
if commits:
|
||||
logger.info('committing consumer offsets to group %s', self._config['group_id'])
|
||||
resps = self._client.send_offset_commit_request(kafka_bytestring(self._config['group_id']),
|
||||
commits,
|
||||
fail_on_error=False)
|
||||
resps = self._client.send_offset_commit_request(
|
||||
kafka_bytestring(self._config['group_id']), commits,
|
||||
fail_on_error=False
|
||||
)
|
||||
|
||||
for r in resps:
|
||||
check_error(r)
|
||||
@@ -724,9 +746,11 @@ class KafkaConsumer(object):
|
||||
#
|
||||
|
||||
def __repr__(self):
|
||||
return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
|
||||
for topic_partition in
|
||||
self._topics])
|
||||
return '<{0} topics=({1})>'.format(
|
||||
self.__class__.__name__,
|
||||
'|'.join(["%s-%d" % topic_partition
|
||||
for topic_partition in self._topics])
|
||||
)
|
||||
|
||||
#
|
||||
# other private methods
|
||||
|
Reference in New Issue
Block a user