Make more methods private(ish) in AbstractCoordinator
This commit is contained in:
@@ -176,7 +176,7 @@ class AbstractCoordinator(object):
|
||||
self._client.poll()
|
||||
continue
|
||||
|
||||
future = self.send_group_metadata_request()
|
||||
future = self._send_group_metadata_request()
|
||||
self._client.poll(future=future)
|
||||
|
||||
if future.failed():
|
||||
@@ -205,7 +205,7 @@ class AbstractCoordinator(object):
|
||||
while self.need_rejoin():
|
||||
self.ensure_coordinator_known()
|
||||
|
||||
future = self.perform_group_join()
|
||||
future = self._perform_group_join()
|
||||
self._client.poll(future=future)
|
||||
|
||||
if future.succeeded():
|
||||
@@ -224,7 +224,7 @@ class AbstractCoordinator(object):
|
||||
raise exception # pylint: disable-msg=raising-bad-type
|
||||
time.sleep(self._retry_backoff_ms / 1000.0)
|
||||
|
||||
def perform_group_join(self):
|
||||
def _perform_group_join(self):
|
||||
"""Join the group and return the assignment for the next generation.
|
||||
|
||||
This function handles both JoinGroup and SyncGroup, delegating to
|
||||
@@ -269,9 +269,9 @@ class AbstractCoordinator(object):
|
||||
self.protocol = response.group_protocol
|
||||
#self.sensors.join_latency.record(response.requestLatencyMs())
|
||||
if response.leader_id == response.member_id:
|
||||
self.on_join_leader(response).chain(future)
|
||||
self._on_join_leader(response).chain(future)
|
||||
else:
|
||||
self.on_join_follower().chain(future)
|
||||
self._on_join_follower().chain(future)
|
||||
|
||||
elif error_type is Errors.GroupLoadInProgressError:
|
||||
log.debug("Attempt to join group %s rejected since coordinator is"
|
||||
@@ -308,7 +308,7 @@ class AbstractCoordinator(object):
|
||||
log.error("Unexpected error in join group response: %s", error)
|
||||
future.failure(error)
|
||||
|
||||
def on_join_follower(self):
|
||||
def _on_join_follower(self):
|
||||
# send follower's sync group with an empty assignment
|
||||
request = SyncGroupRequest(
|
||||
self.group_id,
|
||||
@@ -317,9 +317,9 @@ class AbstractCoordinator(object):
|
||||
{})
|
||||
log.debug("Issuing follower SyncGroup (%s) to coordinator %s",
|
||||
request, self.coordinator_id)
|
||||
return self.send_sync_group_request(request)
|
||||
return self._send_sync_group_request(request)
|
||||
|
||||
def on_join_leader(self, response):
|
||||
def _on_join_leader(self, response):
|
||||
"""
|
||||
Perform leader synchronization and send back the assignment
|
||||
for the group via SyncGroupRequest
|
||||
@@ -342,9 +342,9 @@ class AbstractCoordinator(object):
|
||||
|
||||
log.debug("Issuing leader SyncGroup (%s) to coordinator %s",
|
||||
request, self.coordinator_id)
|
||||
return self.send_sync_group_request(request)
|
||||
return self._send_sync_group_request(request)
|
||||
|
||||
def send_sync_group_request(self, request):
|
||||
def _send_sync_group_request(self, request):
|
||||
if self.coordinator_unknown():
|
||||
return Future().failure(Errors.GroupCoordinatorNotAvailableError())
|
||||
future = Future()
|
||||
@@ -389,7 +389,7 @@ class AbstractCoordinator(object):
|
||||
log.error("Unexpected error from SyncGroup: %s", error)
|
||||
future.failure(error)
|
||||
|
||||
def send_group_metadata_request(self):
|
||||
def _send_group_metadata_request(self):
|
||||
"""Discover the current coordinator for the group.
|
||||
|
||||
Sends a GroupMetadata request to one of the brokers. The returned future
|
||||
@@ -477,7 +477,7 @@ class AbstractCoordinator(object):
|
||||
else:
|
||||
log.error("LeaveGroup request failed: %s", error_type())
|
||||
|
||||
def send_heartbeat_request(self):
|
||||
def _send_heartbeat_request(self):
|
||||
"""Send a heartbeat request now (visible only for testing)."""
|
||||
request = HeartbeatRequest(self.group_id, self.generation, self.member_id)
|
||||
future = Future()
|
||||
@@ -568,7 +568,7 @@ class HeartbeatTask(object):
|
||||
log.debug("Sending HeartbeatRequest")
|
||||
self._heartbeat.sent_heartbeat()
|
||||
self._request_in_flight = True
|
||||
future = self._coordinator.send_heartbeat_request()
|
||||
future = self._coordinator._send_heartbeat_request()
|
||||
future.add_callback(self._handle_heartbeat_success)
|
||||
future.add_errback(self._handle_heartbeat_failure)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user