Change Consumer commit() to return True/False and log error; dont raise client exceptions
This commit is contained in:
@@ -8,7 +8,7 @@ from threading import Lock
|
|||||||
import kafka.common
|
import kafka.common
|
||||||
from kafka.common import (
|
from kafka.common import (
|
||||||
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
|
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
|
||||||
UnknownTopicOrPartitionError, check_error
|
UnknownTopicOrPartitionError, check_error, KafkaError
|
||||||
)
|
)
|
||||||
|
|
||||||
from kafka.util import kafka_bytestring, ReentrantTimer
|
from kafka.util import kafka_bytestring, ReentrantTimer
|
||||||
@@ -114,12 +114,13 @@ class Consumer(object):
|
|||||||
self.offsets[resp.partition] = resp.offset
|
self.offsets[resp.partition] = resp.offset
|
||||||
|
|
||||||
def commit(self, partitions=None):
|
def commit(self, partitions=None):
|
||||||
"""
|
"""Commit stored offsets to Kafka via OffsetCommitRequest (v0)
|
||||||
Commit offsets for this consumer
|
|
||||||
|
|
||||||
Keyword Arguments:
|
Keyword Arguments:
|
||||||
partitions (list): list of partitions to commit, default is to commit
|
partitions (list): list of partitions to commit, default is to commit
|
||||||
all of them
|
all of them
|
||||||
|
|
||||||
|
Returns: True on success, False on failure
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# short circuit if nothing happened. This check is kept outside
|
# short circuit if nothing happened. This check is kept outside
|
||||||
@@ -135,22 +136,27 @@ class Consumer(object):
|
|||||||
|
|
||||||
reqs = []
|
reqs = []
|
||||||
if partitions is None: # commit all partitions
|
if partitions is None: # commit all partitions
|
||||||
partitions = self.offsets.keys()
|
partitions = list(self.offsets.keys())
|
||||||
|
|
||||||
|
log.info('Committing new offsets for %s, partitions %s',
|
||||||
|
self.topic, partitions)
|
||||||
for partition in partitions:
|
for partition in partitions:
|
||||||
offset = self.offsets[partition]
|
offset = self.offsets[partition]
|
||||||
log.debug("Commit offset %d in SimpleConsumer: "
|
log.debug('Commit offset %d in SimpleConsumer: '
|
||||||
"group=%s, topic=%s, partition=%s" %
|
'group=%s, topic=%s, partition=%s',
|
||||||
(offset, self.group, self.topic, partition))
|
offset, self.group, self.topic, partition)
|
||||||
|
|
||||||
reqs.append(OffsetCommitRequest(self.topic, partition,
|
reqs.append(OffsetCommitRequest(self.topic, partition,
|
||||||
offset, None))
|
offset, None))
|
||||||
|
|
||||||
resps = self.client.send_offset_commit_request(self.group, reqs)
|
try:
|
||||||
for resp in resps:
|
self.client.send_offset_commit_request(self.group, reqs)
|
||||||
kafka.common.check_error(resp)
|
except KafkaError as e:
|
||||||
|
log.error('%s saving offsets: %s', e.__class__.__name__, e)
|
||||||
|
return False
|
||||||
|
else:
|
||||||
self.count_since_commit = 0
|
self.count_since_commit = 0
|
||||||
|
return True
|
||||||
|
|
||||||
def _auto_commit(self):
|
def _auto_commit(self):
|
||||||
"""
|
"""
|
||||||
|
|||||||
Reference in New Issue
Block a user