This commit is contained in:
@@ -105,19 +105,12 @@ class ConsumerCoordinator(BaseCoordinator):
|
|||||||
else:
|
else:
|
||||||
interval = self.config['auto_commit_interval_ms'] / 1000.0
|
interval = self.config['auto_commit_interval_ms'] / 1000.0
|
||||||
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
|
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
|
||||||
|
self._auto_commit_task.reschedule()
|
||||||
# When using broker-coordinated consumer groups, auto-commit will
|
|
||||||
# be automatically enabled on group join (see _on_join_complete)
|
|
||||||
# Otherwise, we should enable now b/c there will be no group join
|
|
||||||
if self.config['api_version'] < (0, 9):
|
|
||||||
self._auto_commit_task.enable()
|
|
||||||
|
|
||||||
self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix,
|
self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix,
|
||||||
self._subscription)
|
self._subscription)
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
if hasattr(self, '_auto_commit_task') and self._auto_commit_task:
|
|
||||||
self._auto_commit_task.disable()
|
|
||||||
if hasattr(self, '_cluster') and self._cluster:
|
if hasattr(self, '_cluster') and self._cluster:
|
||||||
self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
|
self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
|
||||||
|
|
||||||
@@ -211,9 +204,9 @@ class ConsumerCoordinator(BaseCoordinator):
|
|||||||
# based on the received assignment
|
# based on the received assignment
|
||||||
assignor.on_assignment(assignment)
|
assignor.on_assignment(assignment)
|
||||||
|
|
||||||
# restart the autocommit task if needed
|
# reschedule the auto commit starting from now
|
||||||
if self._auto_commit_task:
|
if self._auto_commit_task:
|
||||||
self._auto_commit_task.enable()
|
self._auto_commit_task.reschedule()
|
||||||
|
|
||||||
assigned = set(self._subscription.assigned_partitions())
|
assigned = set(self._subscription.assigned_partitions())
|
||||||
log.info("Setting newly assigned partitions %s for group %s",
|
log.info("Setting newly assigned partitions %s for group %s",
|
||||||
@@ -396,10 +389,6 @@ class ConsumerCoordinator(BaseCoordinator):
|
|||||||
if self._auto_commit_task is None:
|
if self._auto_commit_task is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
# disable periodic commits prior to committing synchronously. note that they will
|
|
||||||
# be re-enabled after a rebalance completes
|
|
||||||
self._auto_commit_task.disable()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.commit_offsets_sync(self._subscription.all_consumed_offsets())
|
self.commit_offsets_sync(self._subscription.all_consumed_offsets())
|
||||||
|
|
||||||
@@ -672,47 +661,25 @@ class AutoCommitTask(object):
|
|||||||
self._coordinator = coordinator
|
self._coordinator = coordinator
|
||||||
self._client = coordinator._client
|
self._client = coordinator._client
|
||||||
self._interval = interval
|
self._interval = interval
|
||||||
self._enabled = False
|
|
||||||
self._request_in_flight = False
|
|
||||||
|
|
||||||
def enable(self):
|
def reschedule(self, at=None):
|
||||||
if self._enabled:
|
if at is None:
|
||||||
log.warning("AutoCommitTask is already enabled")
|
at = time.time() + self._interval
|
||||||
return
|
|
||||||
|
|
||||||
self._enabled = True
|
|
||||||
if not self._request_in_flight:
|
|
||||||
self._client.schedule(self, time.time() + self._interval)
|
|
||||||
|
|
||||||
def disable(self):
|
|
||||||
self._enabled = False
|
|
||||||
try:
|
|
||||||
self._client.unschedule(self)
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def _reschedule(self, at):
|
|
||||||
assert self._enabled, 'AutoCommitTask not enabled'
|
|
||||||
self._client.schedule(self, at)
|
self._client.schedule(self, at)
|
||||||
|
|
||||||
def __call__(self):
|
def __call__(self):
|
||||||
if not self._enabled:
|
|
||||||
return
|
|
||||||
|
|
||||||
if self._coordinator.coordinator_unknown():
|
if self._coordinator.coordinator_unknown():
|
||||||
log.debug("Cannot auto-commit offsets for group %s because the"
|
log.debug("Cannot auto-commit offsets for group %s because the"
|
||||||
" coordinator is unknown", self._coordinator.group_id)
|
" coordinator is unknown", self._coordinator.group_id)
|
||||||
backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0
|
backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0
|
||||||
self._client.schedule(self, time.time() + backoff)
|
self.reschedule(time.time() + backoff)
|
||||||
return
|
return
|
||||||
|
|
||||||
self._request_in_flight = True
|
|
||||||
self._coordinator.commit_offsets_async(
|
self._coordinator.commit_offsets_async(
|
||||||
self._coordinator._subscription.all_consumed_offsets(),
|
self._coordinator._subscription.all_consumed_offsets(),
|
||||||
self._handle_commit_response)
|
self._handle_commit_response)
|
||||||
|
|
||||||
def _handle_commit_response(self, offsets, result):
|
def _handle_commit_response(self, offsets, result):
|
||||||
self._request_in_flight = False
|
|
||||||
if result is True:
|
if result is True:
|
||||||
log.debug("Successfully auto-committed offsets for group %s",
|
log.debug("Successfully auto-committed offsets for group %s",
|
||||||
self._coordinator.group_id)
|
self._coordinator.group_id)
|
||||||
@@ -731,10 +698,7 @@ class AutoCommitTask(object):
|
|||||||
self._coordinator.group_id, result)
|
self._coordinator.group_id, result)
|
||||||
next_at = time.time() + self._interval
|
next_at = time.time() + self._interval
|
||||||
|
|
||||||
if not self._enabled:
|
self.reschedule(next_at)
|
||||||
log.warning("Skipping auto-commit reschedule -- it is disabled")
|
|
||||||
return
|
|
||||||
self._reschedule(next_at)
|
|
||||||
|
|
||||||
|
|
||||||
class ConsumerCoordinatorMetrics(object):
|
class ConsumerCoordinatorMetrics(object):
|
||||||
|
@@ -370,10 +370,6 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
|
|||||||
side_effect=error)
|
side_effect=error)
|
||||||
if has_auto_commit:
|
if has_auto_commit:
|
||||||
assert coordinator._auto_commit_task is not None
|
assert coordinator._auto_commit_task is not None
|
||||||
# auto-commit enable is defered until after group join in 0.9+
|
|
||||||
if api_version >= (0, 9):
|
|
||||||
coordinator._auto_commit_task.enable()
|
|
||||||
assert coordinator._auto_commit_task._enabled is True
|
|
||||||
else:
|
else:
|
||||||
assert coordinator._auto_commit_task is None
|
assert coordinator._auto_commit_task is None
|
||||||
|
|
||||||
@@ -381,7 +377,6 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
|
|||||||
|
|
||||||
if has_auto_commit:
|
if has_auto_commit:
|
||||||
assert coordinator._auto_commit_task is not None
|
assert coordinator._auto_commit_task is not None
|
||||||
assert coordinator._auto_commit_task._enabled is False
|
|
||||||
|
|
||||||
assert commit_sync.call_count == (1 if commit_offsets else 0)
|
assert commit_sync.call_count == (1 if commit_offsets else 0)
|
||||||
assert mock_warn.call_count == (1 if warn else 0)
|
assert mock_warn.call_count == (1 if warn else 0)
|
||||||
|
Reference in New Issue
Block a user