HeartbeatTask should reschedule heartbeat on coordinator_unknown()
This commit is contained in:
@@ -594,14 +594,18 @@ class HeartbeatTask(object):
|
|||||||
|
|
||||||
def __call__(self):
|
def __call__(self):
|
||||||
if (self._coordinator.generation < 0 or
|
if (self._coordinator.generation < 0 or
|
||||||
self._coordinator.need_rejoin() or
|
self._coordinator.need_rejoin()):
|
||||||
self._coordinator.coordinator_unknown()):
|
|
||||||
# no need to send the heartbeat we're not using auto-assignment
|
# no need to send the heartbeat we're not using auto-assignment
|
||||||
# or if we are awaiting a rebalance
|
# or if we are awaiting a rebalance
|
||||||
log.debug("Skipping heartbeat: no auto-assignment"
|
log.debug("Skipping heartbeat: no auto-assignment"
|
||||||
" or waiting on rebalance")
|
" or waiting on rebalance")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if self._coordinator.coordinator_unknown():
|
||||||
|
log.warning("Coordinator unknown during heartbeat -- will retry")
|
||||||
|
self._handle_heartbeat_failure(Errors.GroupCoordinatorNotAvailableError())
|
||||||
|
return
|
||||||
|
|
||||||
if self._heartbeat.session_expired():
|
if self._heartbeat.session_expired():
|
||||||
# we haven't received a successful heartbeat in one session interval
|
# we haven't received a successful heartbeat in one session interval
|
||||||
# so mark the coordinator dead
|
# so mark the coordinator dead
|
||||||
|
|||||||
Reference in New Issue
Block a user