Improve auto-commit task handling when group_id is None
This commit is contained in:
@@ -91,8 +91,10 @@ class ConsumerCoordinator(BaseCoordinator):
|
|||||||
log.warning('Broker version (%s) does not support offset'
|
log.warning('Broker version (%s) does not support offset'
|
||||||
' commits; disabling auto-commit.',
|
' commits; disabling auto-commit.',
|
||||||
self.config['api_version'])
|
self.config['api_version'])
|
||||||
|
self.config['enable_auto_commit'] = False
|
||||||
elif self.config['group_id'] is None:
|
elif self.config['group_id'] is None:
|
||||||
log.warning('group_id is None: disabling auto-commit.')
|
log.warning('group_id is None: disabling auto-commit.')
|
||||||
|
self.config['enable_auto_commit'] = False
|
||||||
else:
|
else:
|
||||||
interval = self.config['auto_commit_interval_ms'] / 1000.0
|
interval = self.config['auto_commit_interval_ms'] / 1000.0
|
||||||
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
|
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
|
||||||
@@ -192,7 +194,7 @@ class ConsumerCoordinator(BaseCoordinator):
|
|||||||
assignor.on_assignment(assignment)
|
assignor.on_assignment(assignment)
|
||||||
|
|
||||||
# restart the autocommit task if needed
|
# restart the autocommit task if needed
|
||||||
if self.config['enable_auto_commit']:
|
if self._auto_commit_task:
|
||||||
self._auto_commit_task.enable()
|
self._auto_commit_task.enable()
|
||||||
|
|
||||||
assigned = set(self._subscription.assigned_partitions())
|
assigned = set(self._subscription.assigned_partitions())
|
||||||
@@ -364,27 +366,27 @@ class ConsumerCoordinator(BaseCoordinator):
|
|||||||
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
|
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
|
||||||
|
|
||||||
def _maybe_auto_commit_offsets_sync(self):
|
def _maybe_auto_commit_offsets_sync(self):
|
||||||
if self.config['api_version'] < (0, 8, 1):
|
if self._auto_commit_task is None:
|
||||||
return
|
return
|
||||||
if self.config['enable_auto_commit']:
|
|
||||||
# disable periodic commits prior to committing synchronously. note that they will
|
|
||||||
# be re-enabled after a rebalance completes
|
|
||||||
self._auto_commit_task.disable()
|
|
||||||
|
|
||||||
try:
|
# disable periodic commits prior to committing synchronously. note that they will
|
||||||
self.commit_offsets_sync(self._subscription.all_consumed_offsets())
|
# be re-enabled after a rebalance completes
|
||||||
|
self._auto_commit_task.disable()
|
||||||
|
|
||||||
# The three main group membership errors are known and should not
|
try:
|
||||||
# require a stacktrace -- just a warning
|
self.commit_offsets_sync(self._subscription.all_consumed_offsets())
|
||||||
except (Errors.UnknownMemberIdError,
|
|
||||||
Errors.IllegalGenerationError,
|
# The three main group membership errors are known and should not
|
||||||
Errors.RebalanceInProgressError):
|
# require a stacktrace -- just a warning
|
||||||
log.warning("Offset commit failed: group membership out of date"
|
except (Errors.UnknownMemberIdError,
|
||||||
" This is likely to cause duplicate message"
|
Errors.IllegalGenerationError,
|
||||||
" delivery.")
|
Errors.RebalanceInProgressError):
|
||||||
except Exception:
|
log.warning("Offset commit failed: group membership out of date"
|
||||||
log.exception("Offset commit failed: This is likely to cause"
|
" This is likely to cause duplicate message"
|
||||||
" duplicate message delivery")
|
" delivery.")
|
||||||
|
except Exception:
|
||||||
|
log.exception("Offset commit failed: This is likely to cause"
|
||||||
|
" duplicate message delivery")
|
||||||
|
|
||||||
def _send_offset_commit_request(self, offsets):
|
def _send_offset_commit_request(self, offsets):
|
||||||
"""Commit offsets for the specified list of topics and partitions.
|
"""Commit offsets for the specified list of topics and partitions.
|
||||||
|
|||||||
Reference in New Issue
Block a user