More Docstring Improvements

This commit is contained in:
Dana Powers
2015-12-30 12:51:34 -08:00
parent 1dd9e8bb05
commit e093ffefae
7 changed files with 429 additions and 150 deletions

View File

@@ -40,6 +40,33 @@ class KafkaClient(object):
}
def __init__(self, **configs):
"""Initialize an asynchronous kafka client
Keyword Arguments:
bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
strings) that the consumer should contact to bootstrap initial
cluster metadata. This does not have to be the full node list.
It just needs to have at least one broker that will respond to a
Metadata API Request. Default port is 9092. If no servers are
specified, will default to localhost:9092.
client_id (str): a name for this client. This string is passed in
each request to servers and can be used to identify specific
server-side log entries that correspond to this client. Also
submitted to GroupCoordinator for logging with respect to
consumer group administration. Default: 'kafka-python-{version}'
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 40000.
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per
broker connection. Default: 5.
send_buffer_bytes (int): The size of the TCP send buffer
(SO_SNDBUF) to use when sending data. Default: 131072
receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: 32768
"""
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
@@ -128,12 +155,13 @@ class KafkaClient(object):
return state
def ready(self, node_id):
"""
Begin connecting to the given node, return true if we are already
connected and ready to send to that node.
"""Check whether a node is connected and ok to send more requests.
@param node_id The id of the node to check
@return True if we are ready to send to the given node
Arguments:
node_id (int): the id of the node to check
Returns:
bool: True if we are ready to send to the given node
"""
if self.is_ready(node_id):
return True
@@ -151,7 +179,8 @@ class KafkaClient(object):
def close(self, node_id=None):
"""Closes the connection to a particular node (if there is one).
@param node_id The id of the node
Arguments:
node_id (int): the id of the node to close
"""
if node_id is None:
for conn in self._conns.values():
@@ -163,27 +192,34 @@ class KafkaClient(object):
return
def is_disconnected(self, node_id):
"""Check whether the node connection has been disconnected failed.
A disconnected node has either been closed or has failed. Connection
failures are usually transient and can be resumed in the next ready()
call, but there are cases where transient failures need to be caught
and re-acted upon.
"""
Check if the connection of the node has failed, based on the connection
state. Such connection failures are usually transient and can be resumed
in the next ready(node) call, but there are cases where transient
failures need to be caught and re-acted upon.
Arguments:
node_id (int): the id of the node to check
@param node_id the id of the node to check
@return true iff the connection has failed and the node is disconnected
Returns:
bool: True iff the node exists and is disconnected
"""
if node_id not in self._conns:
return False
return self._conns[node_id].state is ConnectionStates.DISCONNECTED
def is_ready(self, node_id):
"""
Check if the node with the given id is ready to send more requests.
"""Check whether a node is ready to send more requests.
@param node_id The id of the node
@return true if the node is ready
In addition to connection-level checks, this method also is used to
block additional requests from being sent during a metadata refresh.
Arguments:
node_id (int): id of the node to check
Returns:
bool: True if the node is ready and metadata is not refreshing
"""
# if we need to update our metadata now declare all requests unready to
# make metadata requests first priority
@@ -199,12 +235,17 @@ class KafkaClient(object):
return conn.connected() and conn.can_send_more()
def send(self, node_id, request):
"""
Send the given request. Requests can only be sent out to ready nodes.
"""Send a request to a specific node.
@param node destination node
@param request The request
@param now The current timestamp
Arguments:
node_id (int): destination node
request (Struct): request object (not-encoded)
Raises:
IllegalStateError: if node_id is not ready
Returns:
Future: resolves to Response struct
"""
if not self._can_send_request(node_id):
raise Errors.IllegalStateError("Attempt to send a request to node %s which is not ready." % node_id)
@@ -217,15 +258,20 @@ class KafkaClient(object):
return self._conns[node_id].send(request, expect_response=expect_response)
def poll(self, timeout_ms=None, future=None):
"""Do actual reads and writes to sockets.
"""Try to read and write to sockets.
@param timeout_ms The maximum amount of time to wait (in ms) for
responses if there are none available immediately.
Must be non-negative. The actual timeout will be the
minimum of timeout, request timeout and metadata
timeout. If unspecified, default to request_timeout_ms
@param future Optionally block until the provided future completes.
@return The list of responses received.
This method will also attempt to complete node connections, refresh
stale metadata, and run previously-scheduled tasks.
Arguments:
timeout_ms (int, optional): maximum amount of time to wait (in ms)
for at least one response. Must be non-negative. The actual
timeout will be the minimum of timeout, request timeout and
metadata timeout. Default: request_timeout_ms
future (Future, optional): if provided, blocks until future.is_done
Returns:
list: responses received (can be empty)
"""
if timeout_ms is None:
timeout_ms = self.config['request_timeout_ms']
@@ -283,7 +329,15 @@ class KafkaClient(object):
return responses
def in_flight_request_count(self, node_id=None):
"""Get the number of in-flight requests"""
"""Get the number of in-flight requests for a node or all nodes.
Arguments:
node_id (int, optional): a specific node to check. If unspecified,
return the total for all nodes
Returns:
int: pending in-flight requests for the node, or all nodes if None
"""
if node_id is not None:
if node_id not in self._conns:
return 0
@@ -292,16 +346,17 @@ class KafkaClient(object):
return sum([len(conn.in_flight_requests) for conn in self._conns.values()])
def least_loaded_node(self):
"""
Choose the node with the fewest outstanding requests which is at least
eligible for connection. This method will prefer a node with an
existing connection, but will potentially choose a node for which we
don't yet have a connection if all existing connections are in use.
This method will never choose a node for which there is no existing
connection and from which we have disconnected within the reconnect
backoff period.
"""Choose the node with fewest outstanding requests, with fallbacks.
@return The node_id with the fewest in-flight requests.
This method will prefer a node with an existing connection, but will
potentially choose a node for which we don't yet have a connection if
all existing connections are in use. This method will never choose a
node that was disconnected within the reconnect backoff period.
If all else fails, the method will attempt to bootstrap again using the
bootstrap_servers list.
Returns:
node_id or None if no suitable node was found
"""
nodes = list(self._conns.keys())
random.shuffle(nodes)
@@ -339,10 +394,13 @@ class KafkaClient(object):
return None
def set_topics(self, topics):
"""
Set specific topics to track for metadata
"""Set specific topics to track for metadata.
Returns a future that will complete after metadata request/response
Arguments:
topics (list of str): topics to check for metadata
Returns:
Future: resolves after metadata request/response
"""
if set(topics).difference(self._topics):
future = self.cluster.request_update()
@@ -353,7 +411,11 @@ class KafkaClient(object):
# request metadata update on disconnect and timedout
def _maybe_refresh_metadata(self):
"""Send a metadata request if needed"""
"""Send a metadata request if needed.
Returns:
int: milliseconds until next refresh
"""
ttl = self.cluster.ttl()
if ttl > 0:
return ttl
@@ -383,26 +445,30 @@ class KafkaClient(object):
return 0
def schedule(self, task, at):
"""
Schedule a new task to be executed at the given time.
"""Schedule a new task to be executed at the given time.
This is "best-effort" scheduling and should only be used for coarse
synchronization. A task cannot be scheduled for multiple times
simultaneously; any previously scheduled instance of the same task
will be cancelled.
@param task The task to be scheduled -- function or implement __call__
@param at Epoch seconds when it should run (see time.time())
@returns Future
Arguments:
task (callable): task to be scheduled
at (float or int): epoch seconds when task should run
Returns:
Future: resolves to result of task call, or exception if raised
"""
return self._delayed_tasks.add(task, at)
def unschedule(self, task):
"""
Unschedule a task. This will remove all instances of the task from the task queue.
"""Unschedule a task.
This will remove all instances of the task from the task queue.
This is a no-op if the task is not scheduled.
@param task The task to be unscheduled.
Arguments:
task (callable): task to be unscheduled
"""
self._delayed_tasks.remove(task)
@@ -415,10 +481,14 @@ class DelayedTaskQueue(object):
self._counter = itertools.count() # unique sequence count
def add(self, task, at):
"""Add a task to run at a later time
"""Add a task to run at a later time.
task: anything
at: seconds from epoch to schedule task (see time.time())
Arguments:
task: can be anything, but generally a callable
at (float or int): epoch seconds to schedule task
Returns:
Future: a future that will be returned with the task when ready
"""
if task in self._task_map:
self.remove(task)
@@ -430,9 +500,10 @@ class DelayedTaskQueue(object):
return future
def remove(self, task):
"""Remove a previously scheduled task
"""Remove a previously scheduled task.
Raises KeyError if task is not found
Raises:
KeyError: if task is not found
"""
entry = self._task_map.pop(task)
task, future = entry[-1]
@@ -456,7 +527,7 @@ class DelayedTaskQueue(object):
return (task, future)
def next_at(self):
"""Number of seconds until next task is ready"""
"""Number of seconds until next task is ready."""
self._drop_removed()
if not self._tasks:
return sys.maxint

View File

@@ -39,6 +39,33 @@ class Fetcher(object):
}
def __init__(self, client, subscriptions, **configs):
"""Initialize a Kafka Message Fetcher.
Keyword Arguments:
key_deserializer (callable): Any callable that takes a
raw message key and returns a deserialized key.
value_deserializer (callable, optional): Any callable that takes a
raw message value and returns a deserialized value.
fetch_min_bytes (int): Minimum amount of data the server should
return for a fetch request, otherwise wait up to
fetch_max_wait_ms for more data to accumulate. Default: 1024.
fetch_max_wait_ms (int): The maximum amount of time in milliseconds
the server will block before answering the fetch request if
there isn't sufficient data to immediately satisfy the
requirement given by fetch_min_bytes. Default: 500.
max_partition_fetch_bytes (int): The maximum amount of data
per-partition the server will return. The maximum total memory
used for a request = #partitions * max_partition_fetch_bytes.
This size must be at least as large as the maximum message size
the server allows or else it is possible for the producer to
send messages larger than the consumer can fetch. If that
happens, the consumer can get stuck trying to fetch a large
message on a certain partition. Default: 1048576.
check_crcs (bool): Automatically check the CRC32 of the records
consumed. This ensures no on-the-wire or on-disk corruption to
the messages occurred. This check adds some overhead, so it may
be disabled in cases seeking extreme performance. Default: True
"""
#metrics=None,
#metric_group_prefix='consumer',
self.config = copy.copy(self.DEFAULT_CONFIG)
@@ -56,7 +83,11 @@ class Fetcher(object):
#self.sensors = FetchManagerMetrics(metrics, metric_group_prefix)
def init_fetches(self):
"""Send FetchRequests asynchronously for all assigned partitions"""
"""Send FetchRequests asynchronously for all assigned partitions.
Returns:
List of Futures: each future resolves to a FetchResponse
"""
futures = []
for node_id, request in six.iteritems(self._create_fetch_requests()):
if self._client.ready(node_id):
@@ -70,8 +101,11 @@ class Fetcher(object):
def update_fetch_positions(self, partitions):
"""Update the fetch positions for the provided partitions.
@param partitions: iterable of TopicPartitions
@raises NoOffsetForPartitionError If no offset is stored for a given
Arguments:
partitions (list of TopicPartitions): partitions to update
Raises:
NoOffsetForPartitionError: if no offset is stored for a given
partition and no reset policy is available
"""
# reset the fetch position to the committed position
@@ -104,8 +138,11 @@ class Fetcher(object):
def _reset_offset(self, partition):
"""Reset offsets for the given partition using the offset reset strategy.
@param partition The given partition that needs reset offset
@raises NoOffsetForPartitionError If no offset reset strategy is defined
Arguments:
partition (TopicPartition): the partition that needs reset offset
Raises:
NoOffsetForPartitionError: if no offset reset strategy is defined
"""
timestamp = self._subscriptions.assignment[partition].reset_strategy
if timestamp is OffsetResetStrategy.EARLIEST:
@@ -129,11 +166,14 @@ class Fetcher(object):
Blocks until offset is obtained, or a non-retriable exception is raised
@param partition The partition that needs fetching offset.
@param timestamp The timestamp for fetching offset.
@raises exceptions
@return The offset of the message that is published before the given
timestamp
Arguments:
partition The partition that needs fetching offset.
timestamp (int): timestamp for fetching offset. -1 for the latest
available, -2 for the earliest available. Otherwise timestamp
is treated as epoch seconds.
Returns:
int: message offset
"""
while True:
future = self._send_offset_request(partition, timestamp)
@@ -150,10 +190,12 @@ class Fetcher(object):
self._client.poll(future=refresh_future)
def _raise_if_offset_out_of_range(self):
"""
If any partition from previous FetchResponse contains
OffsetOutOfRangeError and the default_reset_policy is None,
raise OffsetOutOfRangeError
"""Check FetchResponses for offset out of range.
Raises:
OffsetOutOfRangeError: if any partition from previous FetchResponse
contains OffsetOutOfRangeError and the default_reset_policy is
None
"""
current_out_of_range_partitions = {}
@@ -174,11 +216,10 @@ class Fetcher(object):
raise Errors.OffsetOutOfRangeError(current_out_of_range_partitions)
def _raise_if_unauthorized_topics(self):
"""
If any topic from previous FetchResponse contains an Authorization
error, raise an exception
"""Check FetchResponses for topic authorization failures.
@raise TopicAuthorizationFailedError
Raises:
TopicAuthorizationFailedError
"""
if self._unauthorized_topics:
topics = set(self._unauthorized_topics)
@@ -186,12 +227,10 @@ class Fetcher(object):
raise Errors.TopicAuthorizationFailedError(topics)
def _raise_if_record_too_large(self):
"""
If any partition from previous FetchResponse gets a RecordTooLarge
error, raise RecordTooLargeError
"""Check FetchResponses for messages larger than the max per partition.
@raise RecordTooLargeError If there is a message larger than fetch size
and hence cannot be ever returned
Raises:
RecordTooLargeError: if there is a message larger than fetch size
"""
copied_record_too_large_partitions = dict(self._record_too_large_partitions)
self._record_too_large_partitions.clear()
@@ -207,12 +246,21 @@ class Fetcher(object):
self.config['max_partition_fetch_bytes'])
def fetched_records(self):
"""Returns previously fetched records and updates consumed offsets
"""Returns previously fetched records and updates consumed offsets.
NOTE: returning empty records guarantees the consumed position are NOT updated.
@return {TopicPartition: deque([messages])}
@raises OffsetOutOfRangeError if no subscription offset_reset_strategy
Raises:
OffsetOutOfRangeError: if no subscription offset_reset_strategy
InvalidMessageError: if message crc validation fails (check_crcs
must be set to True)
RecordTooLargeError: if a message is larger than the currently
configured max_partition_fetch_bytes
TopicAuthorizationError: if consumer is not authorized to fetch
messages from the topic
Returns:
dict: {TopicPartition: deque([messages])}
"""
if self._subscriptions.needs_partition_assignment:
return {}
@@ -280,12 +328,14 @@ class Fetcher(object):
return key, value
def _send_offset_request(self, partition, timestamp):
"""
Fetch a single offset before the given timestamp for the partition.
"""Fetch a single offset before the given timestamp for the partition.
@param partition The TopicPartition that needs fetching offset.
@param timestamp The timestamp for fetching offset.
@return A future which can be polled to obtain the corresponding offset.
Arguments:
partition (TopicPartition): partition that needs fetching offset
timestamp (int): timestamp for fetching offset
Returns:
Future: resolves to the corresponding offset
"""
node_id = self._client.cluster.leader_for_partition(partition)
if node_id is None:
@@ -315,11 +365,13 @@ class Fetcher(object):
def _handle_offset_response(self, partition, future, response):
"""Callback for the response of the list offset call above.
@param partition The partition that was fetched
@param future the future to update based on response
@param response The OffsetResponse from the server
Arguments:
partition (TopicPartition): The partition that was fetched
future (Future): the future to update based on response
response (OffsetResponse): response from the server
@raises IllegalStateError if response does not match partition
Raises:
IllegalStateError: if response does not match partition
"""
topic, partition_info = response.topics[0]
if len(response.topics) != 1 or len(partition_info) != 1:
@@ -351,10 +403,13 @@ class Fetcher(object):
future.failure(error_type(partition))
def _create_fetch_requests(self):
"""
Create fetch requests for all assigned partitions, grouped by node
Except where no leader, node has requests in flight, or we have
not returned all previously fetched records to consumer
"""Create fetch requests for all assigned partitions, grouped by node.
FetchRequests skipped if no leader, node has requests in flight, or we
have not returned all previously fetched records to consumer
Returns:
dict: {node_id: [FetchRequest,...]}
"""
# create the fetch info as a dict of lists of partition info tuples
# which can be passed to FetchRequest() via .items()

View File

@@ -114,6 +114,10 @@ class KafkaConsumer(object):
periodically committed in the background. Default: True.
auto_commit_interval_ms (int): milliseconds between automatic
offset commits, if enable_auto_commit is True. Default: 5000.
default_offset_commit_callback (callable): called as
callback(offsets, response) response will be either an Exception
or a OffsetCommitResponse struct. This callback can be used to
trigger custom actions when a commit request completes.
check_crcs (bool): Automatically check the CRC32 of the records
consumed. This ensures no on-the-wire or on-disk corruption to
the messages occurred. This check adds some overhead, so it may
@@ -438,13 +442,17 @@ class KafkaConsumer(object):
self._subscription.resume(partition)
def seek(self, partition, offset):
"""Manually specify the fetch offset for a TopicPartition
"""Manually specify the fetch offset for a TopicPartition.
Overrides the fetch offsets that the consumer will use on the next
poll(). If this API is invoked for the same partition more than once,
the latest offset will be used on the next poll(). Note that you may
lose data if this API is arbitrarily used in the middle of consumption,
to reset the fetch offsets.
Arguments:
partition (TopicPartition): partition for seek operation
offset (int): message offset in partition
"""
if offset < 0:
raise Errors.IllegalStateError("seek offset must not be a negative number")

View File

@@ -42,10 +42,10 @@ class SubscriptionState(object):
def __init__(self, offset_reset_strategy='earliest'):
"""Initialize a SubscriptionState instance
offset_reset_strategy: 'earliest' or 'latest', otherwise
exception will be raised when fetching an offset
that is no longer available.
Defaults to earliest.
Keyword Arguments:
offset_reset_strategy: 'earliest' or 'latest', otherwise
exception will be raised when fetching an offset that is no
longer available. Default: 'earliest'
"""
try:
offset_reset_strategy = getattr(OffsetResetStrategy,
@@ -67,14 +67,39 @@ class SubscriptionState(object):
self.needs_fetch_committed_offsets = True
def subscribe(self, topics=(), pattern=None, listener=None):
"""Subscribe to a list of topics, or a topic regex pattern
"""Subscribe to a list of topics, or a topic regex pattern.
Partitions will be assigned via a group coordinator
(incompatible with assign_from_user)
Partitions will be dynamically assigned via a group coordinator.
Topic subscriptions are not incremental: this list will replace the
current assignment (if there is one).
Optionally include listener callback, which must be a
ConsumerRebalanceListener and will be called before and
after each rebalance operation.
This method is incompatible with assign_from_user()
Arguments:
topics (list): List of topics for subscription.
pattern (str): Pattern to match available topics. You must provide
either topics or pattern, but not both.
listener (ConsumerRebalanceListener): Optionally include listener
callback, which will be called before and after each rebalance
operation.
As part of group management, the consumer will keep track of the
list of consumers that belong to a particular group and will
trigger a rebalance operation if one of the following events
trigger:
* Number of partitions change for any of the subscribed topics
* Topic is created or deleted
* An existing member of the consumer group dies
* A new member is added to the consumer group
When any of these events are triggered, the provided listener
will be invoked first to indicate that the consumer's assignment
has been revoked, and then again when the new assignment has
been received. Note that this listener will immediately override
any listener set in a previous call to subscribe. It is
guaranteed, however, that the partitions revoked/assigned
through this interface are from topics subscribed in this call.
"""
if self._user_assignment or (topics and pattern):
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
@@ -93,6 +118,14 @@ class SubscriptionState(object):
self.listener = listener
def change_subscription(self, topics):
"""Change the topic subscription.
Arguments:
topics (list of str): topics for subscription
Raises:
IllegalStateErrror: if assign_from_user has been used already
"""
if self._user_assignment:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
@@ -117,7 +150,8 @@ class SubscriptionState(object):
This is used by the group leader to ensure that it receives metadata
updates for all topics that any member of the group is subscribed to.
@param topics list of topics to add to the group subscription
Arguments:
topics (list of str): topics to add to the group subscription
"""
if self._user_assignment:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
@@ -128,12 +162,22 @@ class SubscriptionState(object):
self.needs_partition_assignment = True
def assign_from_user(self, partitions):
"""
Change the assignment to the specified partitions provided by the user,
note this is different from assign_from_subscribed()
whose input partitions are provided from the subscribed topics.
"""Manually assign a list of TopicPartitions to this consumer.
@param partitions: list (or iterable) of TopicPartition()
This interface does not allow for incremental assignment and will
replace the previous assignment (if there was one).
Manual topic assignment through this method does not use the consumer's
group management functionality. As such, there will be no rebalance
operation triggered when group membership or cluster and topic metadata
change. Note that it is not possible to use both manual partition
assignment with assign() and group assignment with subscribe().
Arguments:
partitions (list of TopicPartition): assignment for this instance.
Raises:
IllegalStateError: if consumer has already called subscribe()
"""
if self.subscription is not None:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
@@ -175,6 +219,7 @@ class SubscriptionState(object):
log.info("Updated partition assignment: %s", assignments)
def unsubscribe(self):
"""Clear all topic subscriptions and partition assignments"""
self.subscription = None
self._user_assignment.clear()
self.assignment.clear()
@@ -191,17 +236,32 @@ class SubscriptionState(object):
that would require rebalancing (the leader fetches metadata for all
topics in the group so that it can do partition assignment).
@return set of topics
Returns:
set: topics
"""
return self._group_subscription
def seek(self, partition, offset):
"""Manually specify the fetch offset for a TopicPartition.
Overrides the fetch offsets that the consumer will use on the next
poll(). If this API is invoked for the same partition more than once,
the latest offset will be used on the next poll(). Note that you may
lose data if this API is arbitrarily used in the middle of consumption,
to reset the fetch offsets.
Arguments:
partition (TopicPartition): partition for seek operation
offset (int): message offset in partition
"""
self.assignment[partition].seek(offset)
def assigned_partitions(self):
"""Return set of TopicPartitions in current assignment."""
return set(self.assignment.keys())
def fetchable_partitions(self):
"""Return set of TopicPartitions that should be Fetched."""
fetchable = set()
for partition, state in six.iteritems(self.assignment):
if state.is_fetchable():
@@ -209,6 +269,7 @@ class SubscriptionState(object):
return fetchable
def partitions_auto_assigned(self):
"""Return True unless user supplied partitions manually."""
return self.subscription is not None
def all_consumed_offsets(self):
@@ -220,11 +281,18 @@ class SubscriptionState(object):
return all_consumed
def need_offset_reset(self, partition, offset_reset_strategy=None):
"""Mark partition for offset reset using specified or default strategy.
Arguments:
partition (TopicPartition): partition to mark
offset_reset_strategy (OffsetResetStrategy, optional)
"""
if offset_reset_strategy is None:
offset_reset_strategy = self._default_offset_reset_strategy
self.assignment[partition].await_reset(offset_reset_strategy)
def has_default_offset_reset_policy(self):
"""Return True if default offset reset policy is Earliest or Latest"""
return self._default_offset_reset_strategy != OffsetResetStrategy.NONE
def is_offset_reset_needed(self, partition):
@@ -372,8 +440,9 @@ class ConsumerRebalanceListener(object):
NOTE: This method is only called before rebalances. It is not called
prior to KafkaConsumer.close()
@param partitions The list of partitions that were assigned to the
consumer on the last rebalance
Arguments:
revoked (list of TopicPartition): the partitions that were assigned
to the consumer on the last rebalance
"""
pass
@@ -389,8 +458,8 @@ class ConsumerRebalanceListener(object):
their on_partitions_revoked() callback before any instance executes its
on_partitions_assigned() callback.
@param partitions The list of partitions that are now assigned to the
consumer (may include partitions previously assigned
to the consumer)
Arguments:
assigned (list of TopicPartition): the partitions assigned to the
consumer (may include partitions that were previously assigned)
"""
pass

View File

@@ -53,6 +53,25 @@ class AbstractCoordinator(object):
}
def __init__(self, client, **configs):
"""
Keyword Arguments:
group_id (str): name of the consumer group to join for dynamic
partition assignment (if enabled), and to use for fetching and
committing offsets. Default: 'kafka-python-default-group'
session_timeout_ms (int): The timeout used to detect failures when
using Kafka's group managementment facilities. Default: 30000
heartbeat_interval_ms (int): The expected time in milliseconds
between heartbeats to the consumer coordinator when using
Kafka's group management feature. Heartbeats are used to ensure
that the consumer's session stays active and to facilitate
rebalancing when new consumers join or leave the group. The
value must be set lower than session_timeout_ms, but typically
should be set no higher than 1/3 of that value. It can be
adjusted even lower to control the expected time for normal
rebalances. Default: 3000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
"""
if not client:
raise Errors.IllegalStateError('a client is required to use'
' Group Coordinator')
@@ -79,7 +98,8 @@ class AbstractCoordinator(object):
Unique identifier for the class of protocols implements
(e.g. "consumer" or "connect").
@return str protocol type name
Returns:
str: protocol type name
"""
pass
@@ -96,7 +116,8 @@ class AbstractCoordinator(object):
Note: metadata must be type bytes or support an encode() method
@return [(protocol, metadata), ...]
Returns:
list: [(protocol, metadata), ...]
"""
pass
@@ -107,9 +128,10 @@ class AbstractCoordinator(object):
This is typically used to perform any cleanup from the previous
generation (such as committing offsets for the consumer)
@param generation The previous generation or -1 if there was none
@param member_id The identifier of this member in the previous group
or '' if there was none
Arguments:
generation (int): The previous generation or -1 if there was none
member_id (str): The identifier of this member in the previous group
or '' if there was none
"""
pass
@@ -120,14 +142,16 @@ class AbstractCoordinator(object):
This is used by the leader to push state to all the members of the group
(e.g. to push partition assignments in the case of the new consumer)
@param leader_id: The id of the leader (which is this member)
@param protocol: the chosen group protocol (assignment strategy)
@param members: [(member_id, metadata_bytes)] from JoinGroupResponse.
metadata_bytes are associated with the chosen group
protocol, and the Coordinator subclass is responsible
for decoding metadata_bytes based on that protocol.
Arguments:
leader_id (str): The id of the leader (which is this member)
protocol (str): the chosen group protocol (assignment strategy)
members (list): [(member_id, metadata_bytes)] from
JoinGroupResponse. metadata_bytes are associated with the chosen
group protocol, and the Coordinator subclass is responsible for
decoding metadata_bytes based on that protocol.
@return dict of {member_id: assignment}; assignment must either be bytes
Returns:
dict: {member_id: assignment}; assignment must either be bytes
or have an encode() method to convert to bytes
"""
pass
@@ -137,22 +161,23 @@ class AbstractCoordinator(object):
member_assignment_bytes):
"""Invoked when a group member has successfully joined a group.
@param generation The generation that was joined
@param member_id The identifier for the local member in the group
@param protocol The protocol selected by the coordinator
@param member_assignment_bytes The protocol-encoded assignment
propagated from the group leader. The Coordinator instance is
responsible for decoding based on the chosen protocol.
Arguments:
generation (int): the generation that was joined
member_id (str): the identifier for the local member in the group
protocol (str): the protocol selected by the coordinator
member_assignment_bytes (bytes): the protocol-encoded assignment
propagated from the group leader. The Coordinator instance is
responsible for decoding based on the chosen protocol.
"""
pass
def coordinator_unknown(self):
"""
Check if we know who the coordinator is and we have an active connection
"""Check if we know who the coordinator is and have an active connection
Side-effect: reset coordinator_id to None if connection failed
@return True if the coordinator is unknown
Returns:
bool: True if the coordinator is unknown
"""
if self.coordinator_id is None:
return True
@@ -186,9 +211,10 @@ class AbstractCoordinator(object):
raise future.exception # pylint: disable-msg=raising-bad-type
def need_rejoin(self):
"""
Check whether the group should be rejoined (e.g. if metadata changes)
@return True if it should, False otherwise
"""Check whether the group should be rejoined (e.g. if metadata changes)
Returns:
bool: True if it should, False otherwise
"""
return self.rejoin_needed

View File

@@ -19,17 +19,36 @@ class AbstractPartitionAssignor(object):
def assign(self, cluster, members):
"""Perform group assignment given cluster metadata and member subscriptions
@param cluster: cluster metadata
@param members: {member_id: subscription}
@return {member_id: MemberAssignment}
Arguments:
cluster (ClusterMetadata): metadata for use in assignment
members (dict of {member_id: MemberMetadata}): decoded metadata for
each member in the group.
Returns:
dict: {member_id: MemberAssignment}
"""
pass
@abc.abstractmethod
def metadata(self, topics):
"""return ProtocolMetadata to be submitted via JoinGroupRequest"""
"""Generate ProtocolMetadata to be submitted via JoinGroupRequest.
Arguments:
topics (set): a member's subscribed topics
Returns:
MemberMetadata struct
"""
pass
@abc.abstractmethod
def on_assignment(self, assignment):
"""Callback that runs on each assignment.
This method can be used to update internal state, if any, of the
partition assignor.
Arguments:
assignment (MemberAssignment): the member's assignment
"""
pass

View File

@@ -50,14 +50,45 @@ class ConsumerCoordinator(AbstractCoordinator):
'group_id': 'kafka-python-default-group',
'enable_auto_commit': True,
'auto_commit_interval_ms': 5000,
'default_offset_commit_callback': lambda offsets, error: True,
'default_offset_commit_callback': lambda offsets, response: True,
'assignors': (),
'session_timeout_ms': 30000,
'heartbeat_interval_ms': 3000,
'retry_backoff_ms': 100,
}
"""Initialize the coordination manager."""
def __init__(self, client, subscription, **configs):
"""Initialize the coordination manager.
Keyword Arguments:
group_id (str): name of the consumer group to join for dynamic
partition assignment (if enabled), and to use for fetching and
committing offsets. Default: 'kafka-python-default-group'
enable_auto_commit (bool): If true the consumer's offset will be
periodically committed in the background. Default: True.
auto_commit_interval_ms (int): milliseconds between automatic
offset commits, if enable_auto_commit is True. Default: 5000.
default_offset_commit_callback (callable): called as
callback(offsets, response) response will be either an Exception
or a OffsetCommitResponse struct. This callback can be used to
trigger custom actions when a commit request completes.
assignors (list): List of objects to use to distribute partition
ownership amongst consumer instances when group management is
used. Default: [RoundRobinPartitionAssignor]
heartbeat_interval_ms (int): The expected time in milliseconds
between heartbeats to the consumer coordinator when using
Kafka's group management feature. Heartbeats are used to ensure
that the consumer's session stays active and to facilitate
rebalancing when new consumers join or leave the group. The
value must be set lower than session_timeout_ms, but typically
should be set no higher than 1/3 of that value. It can be
adjusted even lower to control the expected time for normal
rebalances. Default: 3000
session_timeout_ms (int): The timeout used to detect failures when
using Kafka's group managementment facilities. Default: 30000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
"""
super(ConsumerCoordinator, self).__init__(client, **configs)
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config: