Use kafka_bytestring when decoding message topics in KafkaConsumer.fetch_messages

This commit is contained in:
Dana Powers
2015-04-04 21:34:24 -07:00
parent 811fd4cbb9
commit c0fc334612

View File

@@ -336,49 +336,53 @@ class KafkaConsumer(object):
return
for resp in responses:
topic_partition = (resp.topic, resp.partition)
topic = kafka_bytestring(resp.topic)
partition = resp.partition
try:
check_error(resp)
except OffsetOutOfRangeError:
logger.warning('OffsetOutOfRange: topic %s, partition %d, offset %d '
'(Highwatermark: %d)',
resp.topic, resp.partition,
offsets[topic_partition], resp.highwaterMark)
logger.warning('OffsetOutOfRange: topic %s, partition %d, '
'offset %d (Highwatermark: %d)',
topic, partition,
self.offsets._fetch[(topic, partition)],
resp.highwaterMark)
# Reset offset
self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
self._offsets.fetch[(topic, partition)] = (
self._reset_partition_offset((topic, partition))
)
continue
except NotLeaderForPartitionError:
logger.warning("NotLeaderForPartitionError for %s - %d. "
"Metadata may be out of date",
resp.topic, resp.partition)
topic, partition)
self._refresh_metadata_on_error()
continue
except RequestTimedOutError:
logger.warning("RequestTimedOutError for %s - %d",
resp.topic, resp.partition)
topic, partition)
continue
# Track server highwater mark
self._offsets.highwater[topic_partition] = resp.highwaterMark
self._offsets.highwater[(topic, partition)] = resp.highwaterMark
# Yield each message
# Kafka-python could raise an exception during iteration
# we are not catching -- user will need to address
for (offset, message) in resp.messages:
# deserializer_class could raise an exception here
msg = KafkaMessage(resp.topic,
resp.partition,
offset, message.key,
self._config['deserializer_class'](message.value))
val = self._config['deserializer_class'](message.value)
msg = KafkaMessage(topic, partition, offset, message.key, val)
if offset < self._offsets.fetch[topic_partition]:
logger.debug('Skipping message %s because its offset is less than the consumer offset',
msg)
# in some cases the server will return earlier messages
# than we requested. skip them per kafka spec
if offset < self._offsets.fetch[(topic, partition)]:
logger.debug('message offset less than fetched offset '
'skipping: %s', msg)
continue
# Only increment fetch offset if we safely got the message and deserialized
self._offsets.fetch[topic_partition] = offset + 1
self._offsets.fetch[(topic, partition)] = offset + 1
# Then yield to user
yield msg