Docstring updates
@@ -455,31 +455,28 @@ class KafkaClient(object):
             time.sleep(.5)

     def load_metadata_for_topics(self, *topics):
-        """
-        Fetch broker and topic-partition metadata from the server,
-        and update internal data:
-        broker list, topic/partition list, and topic/parition -> broker map
+        """Fetch broker and topic-partition metadata from the server.

-        This method should be called after receiving any error
+        Updates internal data: broker list, topic/partition list, and
+        topic/parition -> broker map. This method should be called after
+        receiving any error.
+
+        Note: Exceptions *will not* be raised in a full refresh (i.e. no topic
+        list). In this case, error codes will be logged as errors.
+        Partition-level errors will also not be raised here (a single partition
+        w/o a leader, for example).

         Arguments:
             *topics (optional): If a list of topics is provided,
-                the metadata refresh will be limited to the specified topics only.
+                the metadata refresh will be limited to the specified topics
+                only.

-        Exceptions:
-        ----------
-        If the broker is configured to not auto-create topics,
-        expect UnknownTopicOrPartitionError for topics that don't exist
-
-        If the broker is configured to auto-create topics,
-        expect LeaderNotAvailableError for new topics
-        until partitions have been initialized.
-
-        Exceptions *will not* be raised in a full refresh (i.e. no topic list)
-        In this case, error codes will be logged as errors
-
-        Partition-level errors will also not be raised here
-        (a single partition w/o a leader, for example)
+        Raises:
+            UnknownTopicOrPartitionError: Raised for topics that do not exist,
+                unless the broker is configured to auto-create topics.
+            LeaderNotAvailableError: Raised for topics that do not exist yet,
+                when the broker is configured to auto-create topics. Retry
+                after a short backoff (topics/partitions are initializing).
         """
         if topics:
             self.reset_topic_metadata(*topics)
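As a usage sketch of the retry pattern the new Raises section suggests (the client instance, topic name, and retry count below are assumptions for illustration, not part of the commit):

    import time
    from kafka.common import LeaderNotAvailableError

    for _ in range(3):  # bounded retries while a new topic's partitions initialize
        try:
            client.load_metadata_for_topics('my-topic')
            break
        except LeaderNotAvailableError:
            time.sleep(.5)  # short backoff, mirroring the sleep in the context above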
@@ -56,23 +56,29 @@ def gzip_decode(payload):


 def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32*1024):
-    """Encodes the given data with snappy if xerial_compatible is set then the
-       stream is encoded in a fashion compatible with the xerial snappy library
+    """Encodes the given data with snappy compression.
+
+    If xerial_compatible is set then the stream is encoded in a fashion
+    compatible with the xerial snappy library.
+
-       The block size (xerial_blocksize) controls how frequent the blocking occurs
-       32k is the default in the xerial library.
-
-       The format winds up being:
+    The block size (xerial_blocksize) controls how frequent the blocking
+    occurs 32k is the default in the xerial library.
+
+    The format winds up being
         +-------------+------------+--------------+------------+--------------+
         |   Header    | Block1 len | Block1 data  | Blockn len | Blockn data  |
-        |-------------+------------+--------------+------------+--------------|
+        +-------------+------------+--------------+------------+--------------+
         |  16 bytes   |  BE int32  | snappy bytes |  BE int32  | snappy bytes |
         +-------------+------------+--------------+------------+--------------+

-        It is important to not that the blocksize is the amount of uncompressed
-        data presented to snappy at each block, whereas the blocklen is the
-        number of bytes that will be present in the stream, that is the
-        length will always be <= blocksize.
+        It is important to note that the blocksize is the amount of uncompressed
+        data presented to snappy at each block, whereas the blocklen is the number
+        of bytes that will be present in the stream; so the length will always be
+        <= blocksize.

     """

     if not has_snappy():
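A rough sketch of the xerial framing this docstring describes, assuming the python-snappy package; the 16-byte header constant and the version/compat values of 1 are read off the _detect_xerial_stream table below and are assumptions, not the library's actual implementation:

    import struct
    import snappy  # python-snappy, assumed installed

    def xerial_frame(payload, blocksize=32 * 1024):
        # marker (-126), 'SNAPPY', null pad, then BE int32 version and compat
        chunks = [b'\x82SNAPPY\x00' + struct.pack('!ii', 1, 1)]
        for i in range(0, len(payload), blocksize):
            block = snappy.compress(payload[i:i + blocksize])  # blocksize counts uncompressed bytes
            chunks.append(struct.pack('!i', len(block)))  # BE int32 block length
            chunks.append(block)
        return b''.join(chunks)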
@@ -109,9 +115,9 @@ def _detect_xerial_stream(payload):

    This mode writes a magic header of the format:
        +--------+--------------+------------+---------+--------+
        | Marker | Magic String | Null / Pad | Version | Compat |
-        |--------+--------------+------------+---------+--------|
+        +--------+--------------+------------+---------+--------+
        |  byte  |   c-string   |    byte    |  int32  | int32  |
-        |--------+--------------+------------+---------+--------|
+        +--------+--------------+------------+---------+--------+
        |  -126  |   'SNAPPY'   |     \0     |         |        |
        +--------+--------------+------------+---------+--------+

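A minimal check that mirrors the header layout in the table above (a sketch; the function name is hypothetical and the constants are taken from the table, not from kafka.codec itself):

    def looks_like_xerial(payload):
        # -126 as an unsigned byte is 0x82; the magic string and null pad follow
        return (len(payload) >= 16
                and payload[0:1] == b'\x82'
                and payload[1:7] == b'SNAPPY'
                and payload[7:8] == b'\x00')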
@@ -126,9 +126,17 @@ class BrokerConnection(object):
         return False

     def connected(self):
+        """Return True iff socket is connected."""
         return self.state is ConnectionStates.CONNECTED

     def close(self, error=None):
+        """Close socket and fail all in-flight-requests.
+
+        Arguments:
+            error (Exception, optional): pending in-flight-requests
+                will be failed with this exception.
+                Default: kafka.common.ConnectionError.
+        """
         if self._sock:
             self._sock.close()
             self._sock = None
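Illustrative use of the two methods documented above (the conn instance and the exception passed to close() are assumptions for the example):

    import kafka.common

    if conn.connected():
        conn.close(error=kafka.common.ConnectionError('shutting down'))
        # pending in-flight requests are failed with the exception given above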
@@ -189,11 +197,12 @@ class BrokerConnection(object):
         return future

     def can_send_more(self):
+        """Return True unless there are max_in_flight_requests."""
         max_ifrs = self.config['max_in_flight_requests_per_connection']
         return len(self.in_flight_requests) < max_ifrs

     def recv(self, timeout=0):
-        """Non-blocking network receive
+        """Non-blocking network receive.

         Return response if available
         """
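A hypothetical pacing loop built on the documented methods; conn, pending, handle(), and the send() call are placeholders assumed from the surrounding class, not part of the commit:

    while pending:
        if conn.can_send_more():   # stay under max_in_flight_requests_per_connection
            conn.send(pending.pop(0))
        response = conn.recv()     # non-blocking; returns None if nothing is ready
        if response is not None:
            handle(response)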
@@ -18,51 +18,17 @@ log = logging.getLogger(__name__)


 class KafkaConsumer(six.Iterator):
-    """Consumer for Kafka 0.9"""
-    DEFAULT_CONFIG = {
-        'bootstrap_servers': 'localhost',
-        'client_id': 'kafka-python-' + __version__,
-        'group_id': 'kafka-python-default-group',
-        'key_deserializer': None,
-        'value_deserializer': None,
-        'fetch_max_wait_ms': 500,
-        'fetch_min_bytes': 1024,
-        'max_partition_fetch_bytes': 1 * 1024 * 1024,
-        'request_timeout_ms': 40 * 1000,
-        'retry_backoff_ms': 100,
-        'reconnect_backoff_ms': 50,
-        'max_in_flight_requests_per_connection': 5,
-        'auto_offset_reset': 'latest',
-        'enable_auto_commit': True,
-        'auto_commit_interval_ms': 5000,
-        'check_crcs': True,
-        'metadata_max_age_ms': 5 * 60 * 1000,
-        'partition_assignment_strategy': (RoundRobinPartitionAssignor,),
-        'heartbeat_interval_ms': 3000,
-        'session_timeout_ms': 30000,
-        'send_buffer_bytes': 128 * 1024,
-        'receive_buffer_bytes': 32 * 1024,
-        'consumer_timeout_ms': -1,
-        'api_version': 'auto',
-        'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
-        #'metric_reporters': None,
-        #'metrics_num_samples': 2,
-        #'metrics_sample_window_ms': 30000,
-    }
+    """Consume records from a Kafka cluster.

-    def __init__(self, *topics, **configs):
-        """A Kafka client that consumes records from a Kafka cluster.
-        The consumer will transparently handle the failure of servers in the Kafka
-        cluster, and adapt as topic-partitions are created or migrate between
-        brokers. It also interacts with the assigned kafka Group Coordinator node
-        to allow multiple consumers to load balance consumption of topics (requires
-        kafka >= 0.9.0.0).
-
+    The consumer will transparently handle the failure of servers in the
+    Kafka cluster, and transparently adapt as partitions of data it fetches
+    migrate within the cluster. This client also interacts with the server
+    to allow groups of consumers to load balance consumption using consumer
+    groups.
+
+    Requires Kafka Server >= 0.9.0.0
+
-        Configuration settings can be passed to constructor as kwargs,
-        otherwise defaults will be used:
-
+    Arguments:
+        *topics (str): optional list of topics to subscribe to. If not set,
+            call subscribe() or assign() before consuming records.

     Keyword Arguments:
         bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
@@ -156,9 +122,42 @@ class KafkaConsumer(six.Iterator):
             attempt to infer the broker version by probing various APIs.
             Default: auto

+    Note:
+        Configuration parameters are described in more detail at
+        https://kafka.apache.org/090/configuration.html#newconsumerconfigs
     """
+    DEFAULT_CONFIG = {
+        'bootstrap_servers': 'localhost',
+        'client_id': 'kafka-python-' + __version__,
+        'group_id': 'kafka-python-default-group',
+        'key_deserializer': None,
+        'value_deserializer': None,
+        'fetch_max_wait_ms': 500,
+        'fetch_min_bytes': 1024,
+        'max_partition_fetch_bytes': 1 * 1024 * 1024,
+        'request_timeout_ms': 40 * 1000,
+        'retry_backoff_ms': 100,
+        'reconnect_backoff_ms': 50,
+        'max_in_flight_requests_per_connection': 5,
+        'auto_offset_reset': 'latest',
+        'enable_auto_commit': True,
+        'auto_commit_interval_ms': 5000,
+        'check_crcs': True,
+        'metadata_max_age_ms': 5 * 60 * 1000,
+        'partition_assignment_strategy': (RoundRobinPartitionAssignor,),
+        'heartbeat_interval_ms': 3000,
+        'session_timeout_ms': 30000,
+        'send_buffer_bytes': 128 * 1024,
+        'receive_buffer_bytes': 32 * 1024,
+        'consumer_timeout_ms': -1,
+        'api_version': 'auto',
+        'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
+        #'metric_reporters': None,
+        #'metrics_num_samples': 2,
+        #'metrics_sample_window_ms': 30000,
+    }
+
+    def __init__(self, *topics, **configs):
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
             if key in configs:
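For orientation, a consumer constructed with a few of the defaults above overridden as kwargs (broker address, group, and topic are placeholders):

    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers='localhost:9092',
                             group_id='my-group',
                             auto_offset_reset='earliest',
                             enable_auto_commit=False)
    for message in consumer:
        print(message.topic, message.partition, message.offset, message.value)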
@@ -204,20 +203,25 @@ class KafkaConsumer(six.Iterator):
     def assign(self, partitions):
         """Manually assign a list of TopicPartitions to this consumer.

-        This interface does not allow for incremental assignment and will
-        replace the previous assignment (if there was one).
-
-        Manual topic assignment through this method does not use the consumer's
-        group management functionality. As such, there will be no rebalance
-        operation triggered when group membership or cluster and topic metadata
-        change. Note that it is not possible to use both manual partition
-        assignment with assign() and group assignment with subscribe().
-
         Arguments:
             partitions (list of TopicPartition): assignment for this instance.

         Raises:
             IllegalStateError: if consumer has already called subscribe()
+
+        Warning:
+            It is not possible to use both manual partition assignment with
+            assign() and group assignment with subscribe().
+
+        Note:
+            This interface does not support incremental assignment and will
+            replace the previous assignment (if there was one).
+
+        Note:
+            Manual topic assignment through this method does not use the
+            consumer's group management functionality. As such, there will be
+            no rebalance operation triggered when group membership or cluster
+            and topic metadata change.
         """
         self._subscription.assign_from_user(partitions)
         self._client.set_topics([tp.topic for tp in partitions])
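Manual assignment per the rewritten docstring might look like this (the TopicPartition import path, topic name, and partition numbers are assumptions):

    from kafka.common import TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    consumer.assign([TopicPartition('my-topic', 0), TopicPartition('my-topic', 1)])
    assert consumer.assignment() == {TopicPartition('my-topic', 0),
                                     TopicPartition('my-topic', 1)}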
@@ -225,12 +229,12 @@ class KafkaConsumer(six.Iterator):
     def assignment(self):
         """Get the TopicPartitions currently assigned to this consumer.

-        If partitions were directly assigning using assign(), then this will
-        simply return the same partitions that were assigned.
-        If topics were subscribed to using subscribe(), then this will give the
+        If partitions were directly assigned using assign(), then this will
+        simply return the same partitions that were previously assigned.
+        If topics were subscribed using subscribe(), then this will give the
         set of topic partitions currently assigned to the consumer (which may
-        be none if the assignment hasn't happened yet, or the partitions are in
-        the process of getting reassigned).
+        be none if the assignment hasn't happened yet, or if the partitions are
+        in the process of being reassigned).

         Returns:
             set: {TopicPartition, ...}
@@ -654,31 +658,25 @@ class KafkaConsumer(six.Iterator):

     # old KafkaConsumer methods are deprecated
     def configure(self, **configs):
-        """DEPRECATED -- initialize a new consumer"""
         raise NotImplementedError(
             'deprecated -- initialize a new consumer')

     def set_topic_partitions(self, *topics):
-        """DEPRECATED -- use subscribe() or assign()"""
         raise NotImplementedError(
             'deprecated -- use subscribe() or assign()')

     def fetch_messages(self):
-        """DEPRECATED -- use poll() or iterator interface"""
         raise NotImplementedError(
             'deprecated -- use poll() or iterator interface')

     def get_partition_offsets(self, topic, partition,
                               request_time_ms, max_num_offsets):
-        """DEPRECATED -- send OffsetRequest with KafkaClient"""
         raise NotImplementedError(
             'deprecated -- send an OffsetRequest with KafkaClient')

     def offsets(self, group=None):
-        """DEPRECATED -- use committed(partition)"""
         raise NotImplementedError('deprecated -- use committed(partition)')

     def task_done(self, message):
-        """DEPRECATED -- commit manually if needed"""
         raise NotImplementedError(
             'deprecated -- commit offsets manually if needed')
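The replacements that the NotImplementedError messages point to, in sketch form (topic, group, and partition values are placeholders; the TopicPartition import path is an assumption):

    from kafka.common import TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092', group_id='my-group')
    consumer.subscribe(['my-topic'])             # instead of set_topic_partitions()
    records = consumer.poll(timeout_ms=500)      # instead of fetch_messages()
    last = consumer.committed(TopicPartition('my-topic', 0))  # instead of offsets()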
@@ -61,7 +61,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,

    Arguments:
        queue (threading.Queue): the queue from which to get messages
-        client (KafkaClient): instance to use for communicating with brokers
+        client (kafka.SimpleClient): instance to use for communicating
+            with brokers
        codec (kafka.protocol.ALL_CODECS): compression codec to use
        batch_time (int): interval in seconds to send message batches
        batch_size (int): count of messages that will trigger an immediate send
@@ -225,9 +226,9 @@ class Producer(object):
    Base class to be used by producers

    Arguments:
-        client (KafkaClient): instance to use for broker communications.
-            If async=True, the background thread will use client.copy(),
-            which is expected to return a thread-safe object.
+        client (kafka.SimpleClient): instance to use for broker
+            communications. If async=True, the background thread will use
+            client.copy(), which is expected to return a thread-safe object.
        codec (kafka.protocol.ALL_CODECS): compression codec to use.
        req_acks (int, optional): A value indicating the acknowledgements that
            the server must receive before responding to the request,
@@ -345,20 +346,36 @@ class Producer(object):
         self.sync_fail_on_error = sync_fail_on_error

     def send_messages(self, topic, partition, *msg):
-        """
-        Helper method to send produce requests
-        @param: topic, name of topic for produce request -- type str
-        @param: partition, partition number for produce request -- type int
-        @param: *msg, one or more message payloads -- type bytes
-        @returns: ResponseRequest returned by server
-        raises on error
+        """Helper method to send produce requests.

-        Note that msg type *must* be encoded to bytes by user.
-        Passing unicode message will not work, for example
-        you should encode before calling send_messages via
-        something like `unicode_message.encode('utf-8')`
+        Note that msg type *must* be encoded to bytes by user. Passing unicode
+        message will not work, for example you should encode before calling
+        send_messages via something like `unicode_message.encode('utf-8')`
+        All messages will set the message 'key' to None.

-        All messages produced via this method will set the message 'key' to Null
+        Arguments:
+            topic (str): name of topic for produce request
+            partition (int): partition number for produce request
+            *msg (bytes): one or more message payloads
+
+        Returns:
+            ResponseRequest returned by server
+
+        Raises:
+            FailedPayloadsError: low-level connection error, can be caused by
+                networking failures, or a malformed request.
+            ConnectionError:
+            KafkaUnavailableError: all known brokers are down when attempting
+                to refresh metadata.
+            LeaderNotAvailableError: topic or partition is initializing or
+                a broker failed and leadership election is in progress.
+            NotLeaderForPartitionError: metadata is out of sync; the broker
+                that the request was sent to is not the leader for the topic
+                or partition.
+            UnknownTopicOrPartitionError: the topic or partition has not
+                been created yet and auto-creation is not available.
+            AsyncProducerQueueFull: in async mode, if too many messages are
+                unsent and remain in the internal queue.
         """
         return self._send_messages(topic, partition, *msg)
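A hedged end-to-end example of the documented contract: encode to bytes before calling send_messages and be ready for the listed errors. SimpleClient/SimpleProducer usage and the broker address are assumptions for the example, and SimpleProducer picks the partition itself rather than taking one explicitly:

    from kafka import SimpleClient, SimpleProducer
    from kafka.common import LeaderNotAvailableError

    client = SimpleClient('localhost:9092')
    producer = SimpleProducer(client)
    try:
        producer.send_messages('my-topic', u'hello kafka'.encode('utf-8'))
    except LeaderNotAvailableError:
        pass  # topic/partition still initializing; retry after a short backoff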