Merge pull request #296 from ecanzonieri/validate_consumer_offset

Validate consumer offset in SimpleConsumer
This commit is contained in:
Dana Powers
2015-03-02 17:23:03 -08:00
2 changed files with 97 additions and 4 deletions

View File

@@ -8,6 +8,7 @@ import logging
import time
import six
import sys
try:
from Queue import Empty, Queue
@@ -16,7 +17,9 @@ except ImportError: # python 2
from kafka.common import (
FetchRequest, OffsetRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, check_error
)
from .base import (
Consumer,
@@ -94,6 +97,10 @@ class SimpleConsumer(Consumer):
message in the iterator before exiting. None means no
timeout, so it will wait forever.
auto_offset_reset: default largest. Reset partition offsets upon
OffsetOutOfRangeError. Valid values are largest and smallest.
Otherwise, do not reset the offsets and raise OffsetOutOfRangeError.
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
reset one another when one is triggered. These triggers simply call the
@@ -106,7 +113,8 @@ class SimpleConsumer(Consumer):
fetch_size_bytes=FETCH_MIN_BYTES,
buffer_size=FETCH_BUFFER_SIZE_BYTES,
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
iter_timeout=None):
iter_timeout=None,
auto_offset_reset='largest'):
super(SimpleConsumer, self).__init__(
client, group, topic,
partitions=partitions,
@@ -125,12 +133,38 @@ class SimpleConsumer(Consumer):
self.fetch_min_bytes = fetch_size_bytes
self.fetch_offsets = self.offsets.copy()
self.iter_timeout = iter_timeout
self.auto_offset_reset = auto_offset_reset
self.queue = Queue()
def __repr__(self):
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
(self.group, self.topic, str(self.offsets.keys()))
def reset_partition_offset(self, partition):
LATEST = -1
EARLIEST = -2
if self.auto_offset_reset == 'largest':
reqs = [OffsetRequest(self.topic, partition, LATEST, 1)]
elif self.auto_offset_reset == 'smallest':
reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)]
else:
# Let's raise an reasonable exception type if user calls
# outside of an exception context
if sys.exc_info() == (None, None, None):
raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
'valid auto_offset_reset setting '
'(largest|smallest)')
# Otherwise we should re-raise the upstream exception
# b/c it typically includes additional data about
# the request that triggered it, and we do not want to drop that
raise
# send_offset_request
(resp, ) = self.client.send_offset_request(reqs)
check_error(resp)
self.offsets[partition] = resp.offsets[0]
self.fetch_offsets[partition] = resp.offsets[0]
def provide_partition_info(self):
"""
Indicates that partition info must be returned by the consumer
@@ -297,10 +331,27 @@ class SimpleConsumer(Consumer):
responses = self.client.send_fetch_request(
requests,
max_wait_time=int(self.fetch_max_wait_time),
min_bytes=self.fetch_min_bytes)
min_bytes=self.fetch_min_bytes,
fail_on_error=False
)
retry_partitions = {}
for resp in responses:
try:
check_error(resp)
except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
self.client.reset_topic_metadata(resp.topic)
raise
except OffsetOutOfRangeError:
log.warning("OffsetOutOfRangeError for %s - %d. "
"Resetting partition offset...",
resp.topic, resp.partition)
self.reset_partition_offset(resp.partition)
# Retry this partition
retry_partitions[resp.partition] = partitions[resp.partition]
continue
partition = resp.partition
buffer_size = partitions[partition]
try:

View File

@@ -5,7 +5,7 @@ from six.moves import xrange
from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message
from kafka.common import (
ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout
ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout, OffsetOutOfRangeError
)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
@@ -85,6 +85,48 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
@kafka_versions('all')
def test_simple_consumer_smallest_offset_reset(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
consumer = self.consumer(auto_offset_reset='smallest')
# Move fetch offset ahead of 300 message (out of range)
consumer.seek(300, 2)
# Since auto_offset_reset is set to smallest we should read all 200
# messages from beginning.
self.assert_message_count([message for message in consumer], 200)
@kafka_versions('all')
def test_simple_consumer_largest_offset_reset(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
# Default largest
consumer = self.consumer()
# Move fetch offset ahead of 300 message (out of range)
consumer.seek(300, 2)
# Since auto_offset_reset is set to largest we should not read any
# messages.
self.assert_message_count([message for message in consumer], 0)
# Send 200 new messages to the queue
self.send_messages(0, range(200, 300))
self.send_messages(1, range(300, 400))
# Since the offset is set to largest we should read all the new messages.
self.assert_message_count([message for message in consumer], 200)
@kafka_versions('all')
def test_simple_consumer_no_reset(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
# Default largest
consumer = self.consumer(auto_offset_reset=None)
# Move fetch offset ahead of 300 message (out of range)
consumer.seek(300, 2)
with self.assertRaises(OffsetOutOfRangeError):
consumer.get_message()
@kafka_versions("all")
def test_simple_consumer__seek(self):
self.send_messages(0, range(0, 100))