Improve various docstrings

This commit is contained in:
Dana Powers
2015-12-29 17:01:41 -08:00
parent b7d1ed3fb4
commit 2a2733d4fc
3 changed files with 43 additions and 22 deletions

View File

@@ -151,9 +151,16 @@ class SubscriptionState(object):
self.needs_partition_assignment = False
def assign_from_subscribed(self, assignments):
"""
Change the assignment to the specified partitions returned from the coordinator,
note this is different from {@link #assignFromUser(Collection)} which directly set the assignment from user inputs
"""Update the assignment to the specified partitions
This method is called by the coordinator to dynamically assign
partitions based on the consumer's topic subscription. This is different
from assign_from_user() which directly sets the assignment from a
user-supplied TopicPartition list.
Arguments:
assignments (list of TopicPartition): partitions to assign to this
consumer instance.
"""
if self.subscription is None:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)

View File

@@ -230,7 +230,9 @@ class AbstractCoordinator(object):
This function handles both JoinGroup and SyncGroup, delegating to
_perform_assignment() if elected leader by the coordinator.
@return Future() of the assignment returned from the group leader
Returns:
Future: resolves to the encoded-bytes assignment returned from the
group leader
"""
if self.coordinator_unknown():
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
@@ -323,6 +325,12 @@ class AbstractCoordinator(object):
"""
Perform leader synchronization and send back the assignment
for the group via SyncGroupRequest
Arguments:
response (JoinResponse): broker response to parse
Returns:
Future: resolves to member assignment encoded-bytes
"""
try:
group_assignment = self._perform_assignment(response.leader_id,
@@ -391,10 +399,8 @@ class AbstractCoordinator(object):
def _send_group_metadata_request(self):
"""Discover the current coordinator for the group.
Sends a GroupMetadata request to one of the brokers. The returned future
should be polled to get the result of the request.
@return future indicating the completion of the metadata request
Returns:
Future: resolves to the node id of the coordinator
"""
node_id = self._client.least_loaded_node()
if node_id is None or not self._client.ready(node_id):
@@ -477,7 +483,7 @@ class AbstractCoordinator(object):
log.error("LeaveGroup request failed: %s", error_type())
def _send_heartbeat_request(self):
"""Send a heartbeat request now (visible only for testing)."""
"""Send a heartbeat request"""
request = HeartbeatRequest(self.group_id, self.generation, self.member_id)
future = Future()
_f = self._client.send(self.coordinator_id, request)

View File

@@ -217,9 +217,10 @@ class ConsumerCoordinator(AbstractCoordinator):
self._subscription.mark_for_reassignment()
def need_rejoin(self):
"""
Check whether the group should be rejoined (e.g. if metadata changes)
@return True if it should, False otherwise
"""Check whether the group should be rejoined
Returns:
bool: True if consumer should rejoin group, False otherwise
"""
return (self._subscription.partitions_auto_assigned() and
(super(ConsumerCoordinator, self).need_rejoin() or
@@ -236,12 +237,13 @@ class ConsumerCoordinator(AbstractCoordinator):
self._subscription.needs_fetch_committed_offsets = False
def fetch_committed_offsets(self, partitions):
"""
Fetch the current committed offsets from the coordinator for a set of
partitions.
"""Fetch the current committed offsets for specified partitions
@param partitions The partitions to fetch offsets for
@return dict of {TopicPartition: OffsetMetadata}
Arguments:
partitions (list of TopicPartition): partitions to fetch
Returns:
dict: {TopicPartition: OffsetAndMetadata}
"""
while True:
self.ensure_coordinator_known()
@@ -330,9 +332,12 @@ class ConsumerCoordinator(AbstractCoordinator):
polled in the case of a synchronous commit or ignored in the
asynchronous case.
@param offsets dict of {TopicPartition: OffsetAndMetadata} that should
be committed
@return Future indicating whether the commit was successful or not
Arguments:
offsets (dict of {TopicPartition: OffsetAndMetadata}): what should
be committed
Returns:
Future: indicating whether the commit was successful or not
"""
if self.coordinator_unknown():
return Future().failure(Errors.GroupCoordinatorNotAvailableError)
@@ -443,8 +448,11 @@ class ConsumerCoordinator(AbstractCoordinator):
This is a non-blocking call. The returned future can be polled to get
the actual offsets returned from the broker.
@param partitions list of TopicPartitions
@return Future of committed offsets dict: {TopicPartition: offset}
Arguments:
partitions (list of TopicPartition): the partitions to fetch
Returns:
Future: resolves to dict of offsets: {TopicPartition: int}
"""
if self.coordinator_unknown():
return Future().failure(Errors.GroupCoordinatorNotAvailableError)