Pass auto commit parameters from KafkaConsumer to ConsumerCoordinator

This commit is contained in:
Dana Powers
2015-12-29 15:45:48 -08:00
parent ab1e1d014f
commit ac57ada2bd
2 changed files with 3 additions and 1 deletions

View File

@@ -156,6 +156,8 @@ class KafkaConsumer(object):
self._client, self._subscription, **kwargs)
self._coordinator = ConsumerCoordinator(
self._client, self._group_id, self._subscription,
enable_auto_commit=self._enable_auto_commit,
auto_commit_interval_ms=self._auto_commit_interval_ms,
assignors=self._partition_assignment_strategy,
**kwargs)
self._closed = False

View File

@@ -46,7 +46,7 @@ class ConsumerProtocol(object):
class ConsumerCoordinator(AbstractCoordinator):
"""This class manages the coordination process with the consumer coordinator."""
_enable_auto_commit = True
_auto_commit_interval_ms = 60 * 1000
_auto_commit_interval_ms = 5000
_default_offset_commit_callback = lambda offsets, error: True
_assignors = ()
#_heartbeat_interval_ms = 3000