Change SimpleConsumer.reset_partition_offset to return offset / None on failure (dont raise exception)

This commit is contained in:
Dana Powers
2015-06-10 09:50:06 -07:00
parent 680a8dc337
commit ed42d78991

View File

@@ -27,7 +27,7 @@ from .base import (
NO_MESSAGES_WAIT_TIME_SECONDS
)
from ..common import (
FetchRequest, OffsetRequest,
FetchRequest, KafkaError, OffsetRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
@@ -144,6 +144,13 @@ class SimpleConsumer(Consumer):
(self.group, self.topic, str(self.offsets.keys()))
def reset_partition_offset(self, partition):
"""Update offsets using auto_offset_reset policy (smallest|largest)
Arguments:
partition (int): the partition for which offsets should be updated
Returns: Updated offset on success, None on failure
"""
LATEST = -1
EARLIEST = -2
if self.auto_offset_reset == 'largest':
@@ -163,10 +170,17 @@ class SimpleConsumer(Consumer):
raise
# send_offset_request
(resp, ) = self.client.send_offset_request(reqs)
check_error(resp)
self.offsets[partition] = resp.offsets[0]
self.fetch_offsets[partition] = resp.offsets[0]
log.info('Resetting topic-partition offset to %s for %s:%d',
self.auto_offset_reset, self.topic, partition)
try:
(resp, ) = self.client.send_offset_request(reqs)
except KafkaError as e:
log.error('%s sending offset request for %s:%d',
e.__class__.__name__, self.topic, partition)
else:
self.offsets[partition] = resp.offsets[0]
self.fetch_offsets[partition] = resp.offsets[0]
return resp.offsets[0]
def provide_partition_info(self):
"""