Optionally skip auto-commit during consumer.close (#1031)
This commit is contained in:
@@ -376,13 +376,13 @@ class KafkaConsumer(six.Iterator):
|
|||||||
"""
|
"""
|
||||||
return self._subscription.assigned_partitions()
|
return self._subscription.assigned_partitions()
|
||||||
|
|
||||||
def close(self):
|
def close(self, autocommit=True):
|
||||||
"""Close the consumer, waiting indefinitely for any needed cleanup."""
|
"""Close the consumer, waiting indefinitely for any needed cleanup."""
|
||||||
if self._closed:
|
if self._closed:
|
||||||
return
|
return
|
||||||
log.debug("Closing the KafkaConsumer.")
|
log.debug("Closing the KafkaConsumer.")
|
||||||
self._closed = True
|
self._closed = True
|
||||||
self._coordinator.close()
|
self._coordinator.close(autocommit=autocommit)
|
||||||
self._metrics.close()
|
self._metrics.close()
|
||||||
self._client.close()
|
self._client.close()
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -325,8 +325,9 @@ class ConsumerCoordinator(BaseCoordinator):
|
|||||||
|
|
||||||
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
|
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
|
||||||
|
|
||||||
def close(self):
|
def close(self, autocommit=True):
|
||||||
try:
|
try:
|
||||||
|
if autocommit:
|
||||||
self._maybe_auto_commit_offsets_sync()
|
self._maybe_auto_commit_offsets_sync()
|
||||||
finally:
|
finally:
|
||||||
super(ConsumerCoordinator, self).close()
|
super(ConsumerCoordinator, self).close()
|
||||||
|
|||||||
Reference in New Issue
Block a user