Short-circuit group coordinator requests when NodeNotReady (#995)
This commit is contained in:
parent
6ef7675ba0
commit
bb709f4c14
@ -286,6 +286,10 @@ class BaseCoordinator(object):
|
|||||||
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
|
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
|
||||||
return Future().failure(e)
|
return Future().failure(e)
|
||||||
|
|
||||||
|
elif not self._client.ready(self.coordinator_id, metadata_priority=False):
|
||||||
|
e = Errors.NodeNotReadyError(self.coordinator_id)
|
||||||
|
return Future().failure(e)
|
||||||
|
|
||||||
# send a join group request to the coordinator
|
# send a join group request to the coordinator
|
||||||
log.info("(Re-)joining group %s", self.group_id)
|
log.info("(Re-)joining group %s", self.group_id)
|
||||||
request = JoinGroupRequest[0](
|
request = JoinGroupRequest[0](
|
||||||
@ -416,6 +420,13 @@ class BaseCoordinator(object):
|
|||||||
if self.coordinator_unknown():
|
if self.coordinator_unknown():
|
||||||
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
|
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
|
||||||
return Future().failure(e)
|
return Future().failure(e)
|
||||||
|
|
||||||
|
# We assume that coordinator is ready if we're sending SyncGroup
|
||||||
|
# as it typically follows a successful JoinGroup
|
||||||
|
# Also note that if client.ready() enforces a metadata priority policy,
|
||||||
|
# we can get into an infinite loop if the leader assignment process
|
||||||
|
# itself requests a metadata update
|
||||||
|
|
||||||
future = Future()
|
future = Future()
|
||||||
_f = self._client.send(self.coordinator_id, request)
|
_f = self._client.send(self.coordinator_id, request)
|
||||||
_f.add_callback(self._handle_sync_group_response, future, time.time())
|
_f.add_callback(self._handle_sync_group_response, future, time.time())
|
||||||
@ -467,6 +478,10 @@ class BaseCoordinator(object):
|
|||||||
if node_id is None:
|
if node_id is None:
|
||||||
return Future().failure(Errors.NoBrokersAvailable())
|
return Future().failure(Errors.NoBrokersAvailable())
|
||||||
|
|
||||||
|
elif not self._client.ready(node_id, metadata_priority=False):
|
||||||
|
e = Errors.NodeNotReadyError(node_id)
|
||||||
|
return Future().failure(e)
|
||||||
|
|
||||||
log.debug("Sending group coordinator request for group %s to broker %s",
|
log.debug("Sending group coordinator request for group %s to broker %s",
|
||||||
self.group_id, node_id)
|
self.group_id, node_id)
|
||||||
request = GroupCoordinatorRequest[0](self.group_id)
|
request = GroupCoordinatorRequest[0](self.group_id)
|
||||||
@ -553,6 +568,14 @@ class BaseCoordinator(object):
|
|||||||
|
|
||||||
def _send_heartbeat_request(self):
|
def _send_heartbeat_request(self):
|
||||||
"""Send a heartbeat request"""
|
"""Send a heartbeat request"""
|
||||||
|
if self.coordinator_unknown():
|
||||||
|
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
|
||||||
|
return Future().failure(e)
|
||||||
|
|
||||||
|
elif not self._client.ready(self.coordinator_id, metadata_priority=False):
|
||||||
|
e = Errors.NodeNotReadyError(self.coordinator_id)
|
||||||
|
return Future().failure(e)
|
||||||
|
|
||||||
request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id)
|
request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id)
|
||||||
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
|
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
|
||||||
future = Future()
|
future = Future()
|
||||||
|
Loading…
Reference in New Issue
Block a user