Make SimpleConsumer auto_offset_reset more like KafkaConsumer

This commit is contained in:
Enrico Canzonieri
2015-01-26 14:40:49 -08:00
committed by Enrico Canzonieri
parent 6bc2c7aada
commit f517ddf283

View File

@@ -8,6 +8,7 @@ import logging
import time
import six
import sys
try:
from Queue import Empty, Queue
@@ -87,6 +88,9 @@ class SimpleConsumer(Consumer):
iter_timeout: default None. How much time (in seconds) to wait for a
message in the iterator before exiting. None means no
timeout, so it will wait forever.
auto_offset_reset: default 'largest'. Reset partition offsets upon
OffsetOutOfRangeError. Valid values are 'largest' and 'smallest'.
If None, do not reset the offsets; raise OffsetOutOfRangeError instead.
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
@@ -101,7 +105,7 @@ class SimpleConsumer(Consumer):
buffer_size=FETCH_BUFFER_SIZE_BYTES,
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
iter_timeout=None,
use_latest_offsets=True):
auto_offset_reset='largest'):
super(SimpleConsumer, self).__init__(
client, group, topic,
partitions=partitions,
@@ -120,7 +124,7 @@ class SimpleConsumer(Consumer):
self.fetch_min_bytes = fetch_size_bytes
self.fetch_offsets = self.offsets.copy()
self.iter_timeout = iter_timeout
self.use_latest_offsets = use_latest_offsets
self.auto_offset_reset = auto_offset_reset
self.queue = Queue()
def __repr__(self):
@@ -130,10 +134,21 @@ class SimpleConsumer(Consumer):
def reset_partition_offset(self, partition):
LATEST = -1
EARLIEST = -2
if self.use_latest_offsets:
if self.auto_offset_reset == 'largest':
reqs = [OffsetRequest(self.topic, partition, LATEST, 1)]
else:
elif self.auto_offset_reset == 'smallest':
reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)]
else:
# Raise a reasonable exception type if the user calls this
# outside of an exception context
if sys.exc_info() == (None, None, None):
raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
'valid auto_offset_reset setting '
'(largest|smallest)')
# Otherwise re-raise the upstream exception, because it typically
# includes additional data about the request that triggered it,
# and we do not want to drop that
raise
# send_offset_request
(resp, ) = self.client.send_offset_request(reqs)