Merge pull request #328 from wkiser/validate_offset

Fixes consumer/kafka and consumer/simple to only yield messages if the m...
This commit is contained in:
Dana Powers
2015-03-02 17:36:48 -08:00
2 changed files with 8 additions and 0 deletions

View File

@@ -430,6 +430,10 @@ class KafkaConsumer(object):
offset, message.key,
self._config['deserializer_class'](message.value))
if offset < self._offsets.fetch[topic_partition]:
logger.debug('Skipping message %s because its offset is less than the consumer offset',
msg)
continue
# Only increment fetch offset if we safely got the message and deserialized
self._offsets.fetch[topic_partition] = offset + 1

View File

@@ -356,6 +356,10 @@ class SimpleConsumer(Consumer):
buffer_size = partitions[partition]
try:
for message in resp.messages:
if message.offset < self.fetch_offsets[partition]:
log.debug('Skipping message %s because its offset is less than the consumer offset',
message)
continue
# Put the message in our queue
self.queue.put((partition, message))
self.fetch_offsets[partition] = message.offset + 1