From ed42d7899117e4bba8ef47afe825c13185cbdfc7 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 10 Jun 2015 09:50:06 -0700 Subject: [PATCH] Change SimpleConsumer.reset_partition_offset to return offset / None on failure (dont raise exception) --- kafka/consumer/simple.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index e4233ff..c75e78b 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -27,7 +27,7 @@ from .base import ( NO_MESSAGES_WAIT_TIME_SECONDS ) from ..common import ( - FetchRequest, OffsetRequest, + FetchRequest, KafkaError, OffsetRequest, ConsumerFetchSizeTooSmall, ConsumerNoMoreData, UnknownTopicOrPartitionError, NotLeaderForPartitionError, OffsetOutOfRangeError, FailedPayloadsError, check_error @@ -144,6 +144,13 @@ class SimpleConsumer(Consumer): (self.group, self.topic, str(self.offsets.keys())) def reset_partition_offset(self, partition): + """Update offsets using auto_offset_reset policy (smallest|largest) + + Arguments: + partition (int): the partition for which offsets should be updated + + Returns: Updated offset on success, None on failure + """ LATEST = -1 EARLIEST = -2 if self.auto_offset_reset == 'largest': @@ -163,10 +170,17 @@ class SimpleConsumer(Consumer): raise # send_offset_request - (resp, ) = self.client.send_offset_request(reqs) - check_error(resp) - self.offsets[partition] = resp.offsets[0] - self.fetch_offsets[partition] = resp.offsets[0] + log.info('Resetting topic-partition offset to %s for %s:%d', + self.auto_offset_reset, self.topic, partition) + try: + (resp, ) = self.client.send_offset_request(reqs) + except KafkaError as e: + log.error('%s sending offset request for %s:%d', + e.__class__.__name__, self.topic, partition) + else: + self.offsets[partition] = resp.offsets[0] + self.fetch_offsets[partition] = resp.offsets[0] + return resp.offsets[0] def provide_partition_info(self): """