Merge pull request #365 from dpkp/kafka_consumer_fixup
KafkaConsumer topic/partition fixes
@@ -194,10 +194,10 @@ class KafkaConsumer(object):
             elif isinstance(arg, tuple):
                 topic = kafka_bytestring(arg[0])
                 partition = arg[1]
-                self._consume_topic_partition(topic, partition)
                 if len(arg) == 3:
                     offset = arg[2]
                     self._offsets.fetch[(topic, partition)] = offset
+                self._consume_topic_partition(topic, partition)
 
             # { topic: partitions, ... } dict
             elif isinstance(arg, dict):
@@ -224,7 +224,7 @@ class KafkaConsumer(object):
                         topic = kafka_bytestring(key[0])
                         partition = key[1]
                         self._consume_topic_partition(topic, partition)
-                        self._offsets.fetch[key] = value
+                        self._offsets.fetch[(topic, partition)] = value
 
             else:
                 raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg))
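Both hunks above make set_topic_partitions key its internal fetch-offset dict on a kafka_bytestring-normalized (topic, partition) tuple, so str and bytes spellings of the same topic can no longer produce two separate offset entries. A hedged usage sketch of the argument forms handled here (broker address and topic names are hypothetical):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        ('my-topic', 0),             # (topic, partition) tuple
        ('my-topic', 1, 12345),      # (topic, partition, offset) tuple
        {('other-topic', 0): 500},   # { (topic, partition): offset } dict
        bootstrap_servers=['localhost:9092'],
    )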
@@ -312,16 +312,16 @@ class KafkaConsumer(object):
         max_wait_time = self._config['fetch_wait_max_ms']
         min_bytes = self._config['fetch_min_bytes']
 
-        # Get current fetch offsets
-        offsets = self._offsets.fetch
-        if not offsets:
-            if not self._topics:
-                raise KafkaConfigurationError('No topics or partitions configured')
+        if not self._topics:
+            raise KafkaConfigurationError('No topics or partitions configured')
+
+        if not self._offsets.fetch:
             raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages')
 
-        fetches = []
-        for topic_partition, offset in six.iteritems(offsets):
-            fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes))
+        fetches = [FetchRequest(topic, partition,
+                                self._offsets.fetch[(topic, partition)],
+                                max_bytes)
+                   for (topic, partition) in self._topics]
 
         # client.send_fetch_request will collect topic/partition requests by leader
         # and send each group as a single FetchRequest to the correct broker
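The rebuilt request construction pairs each configured (topic, partition) with its tracked fetch offset, rather than iterating whatever keys happen to be in the offsets dict. For readability, the new list comprehension is equivalent to this loop (a sketch using the same names as the patch):

    fetches = []
    for (topic, partition) in self._topics:
        offset = self._offsets.fetch[(topic, partition)]
        fetches.append(FetchRequest(topic, partition, offset, max_bytes))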
@@ -336,49 +336,53 @@ class KafkaConsumer(object):
             return
 
         for resp in responses:
-            topic_partition = (resp.topic, resp.partition)
+            topic = kafka_bytestring(resp.topic)
+            partition = resp.partition
             try:
                 check_error(resp)
             except OffsetOutOfRangeError:
-                logger.warning('OffsetOutOfRange: topic %s, partition %d, offset %d '
-                               '(Highwatermark: %d)',
-                               resp.topic, resp.partition,
-                               offsets[topic_partition], resp.highwaterMark)
+                logger.warning('OffsetOutOfRange: topic %s, partition %d, '
+                               'offset %d (Highwatermark: %d)',
+                               topic, partition,
+                               self._offsets.fetch[(topic, partition)],
+                               resp.highwaterMark)
                 # Reset offset
-                self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
+                self._offsets.fetch[(topic, partition)] = (
+                    self._reset_partition_offset((topic, partition))
+                )
                 continue
 
             except NotLeaderForPartitionError:
                 logger.warning("NotLeaderForPartitionError for %s - %d. "
                                "Metadata may be out of date",
-                               resp.topic, resp.partition)
+                               topic, partition)
                 self._refresh_metadata_on_error()
                 continue
 
             except RequestTimedOutError:
                 logger.warning("RequestTimedOutError for %s - %d",
-                               resp.topic, resp.partition)
+                               topic, partition)
                 continue
 
             # Track server highwater mark
-            self._offsets.highwater[topic_partition] = resp.highwaterMark
+            self._offsets.highwater[(topic, partition)] = resp.highwaterMark
 
             # Yield each message
             # Kafka-python could raise an exception during iteration
             # we are not catching -- user will need to address
             for (offset, message) in resp.messages:
                 # deserializer_class could raise an exception here
-                msg = KafkaMessage(resp.topic,
-                                   resp.partition,
-                                   offset, message.key,
-                                   self._config['deserializer_class'](message.value))
+                val = self._config['deserializer_class'](message.value)
+                msg = KafkaMessage(topic, partition, offset, message.key, val)
 
-                if offset < self._offsets.fetch[topic_partition]:
-                    logger.debug('Skipping message %s because its offset is less than the consumer offset',
-                                 msg)
+                # in some cases the server will return earlier messages
+                # than we requested. skip them per kafka spec
+                if offset < self._offsets.fetch[(topic, partition)]:
+                    logger.debug('message offset less than fetched offset '
+                                 'skipping: %s', msg)
                     continue
                 # Only increment fetch offset if we safely got the message and deserialized
-                self._offsets.fetch[topic_partition] = offset + 1
+                self._offsets.fetch[(topic, partition)] = offset + 1
 
                 # Then yield to user
                 yield msg
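Since fetch_messages is a generator, the per-message bookkeeping above runs lazily as the caller iterates. A hedged usage sketch (topic name and broker address are hypothetical):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(('my-topic', 0),
                             bootstrap_servers=['localhost:9092'])
    for msg in consumer.fetch_messages():
        # msg is a KafkaMessage(topic, partition, offset, key, value)
        print(msg.offset, msg.value)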