Merge pull request #341 from dpkp/kafka_consumer_docs

KafkaConsumer documentation
Dana Powers
2015-03-29 18:09:03 -07:00
4 changed files with 185 additions and 149 deletions

View File

@@ -1,5 +1,6 @@
sphinx
sphinxcontrib-napoleon
sphinx_rtd_theme
# Install kafka-python in editable mode
# This allows the sphinx autodoc module

View File

@@ -1,12 +1,12 @@
Usage
=====
High level
----------
SimpleProducer
--------------
.. code:: python
from kafka import SimpleProducer, KafkaClient, KafkaConsumer
from kafka import SimpleProducer, KafkaClient
# To send messages synchronously
kafka = KafkaClient("localhost:9092")
@@ -51,17 +51,6 @@ High level
batch_send_every_n=20,
batch_send_every_t=60)
# To consume messages
consumer = KafkaConsumer("my-topic", group_id="my_group",
metadata_broker_list=["localhost:9092"])
for message in consumer:
# message is raw byte string -- decode if necessary!
# e.g., for unicode: `message.decode('utf-8')`
print(message)
kafka.close()
Keyed messages
--------------
@@ -80,6 +69,92 @@ Keyed messages
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
KafkaConsumer
-------------
.. code:: python
from kafka import KafkaConsumer
# To consume messages
consumer = KafkaConsumer("my-topic",
group_id="my_group",
bootstrap_servers=["localhost:9092"])
for message in consumer:
# message value is raw byte string -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
messages (m) are namedtuples with attributes:
* `m.topic`: topic name (str)
* `m.partition`: partition number (int)
* `m.offset`: message offset on topic-partition log (int)
* `m.key`: key (bytes - can be None)
* `m.value`: message (output of deserializer_class - default is raw bytes)
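Since `m.value` is whatever `deserializer_class` returns, a decoder can be plugged in at construction time. A minimal sketch (the topic name is a placeholder and the messages are assumed to be UTF-8 encoded JSON):

.. code:: python

    import json

    from kafka import KafkaConsumer

    # deserializer_class is applied to each raw message value before it is
    # yielded by the iterator -- here it decodes UTF-8 JSON into a dict
    consumer = KafkaConsumer("my-topic",
                             bootstrap_servers=["localhost:9092"],
                             deserializer_class=lambda v: json.loads(v.decode('utf-8')))

    for m in consumer:
        # m.value is now a dict rather than raw bytes
        print(m.topic, m.partition, m.offset, m.value)
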
.. code:: python
from kafka import KafkaConsumer
# more advanced consumer -- multiple topics w/ auto commit offset
# management
consumer = KafkaConsumer('topic1', 'topic2',
bootstrap_servers=['localhost:9092'],
group_id='my_consumer_group',
auto_commit_enable=True,
auto_commit_interval_ms=30 * 1000,
auto_offset_reset='smallest')
# Infinite iteration
for m in consumer:
do_some_work(m)
# Mark this message as fully consumed
# so it can be included in the next commit
#
# messages that are not marked w/ task_done() currently do not commit!
consumer.task_done(m)
# If auto_commit_enable is False, remember to commit() periodically
consumer.commit()
# Batch process interface
while True:
for m in consumer.fetch_messages():
process_message(m)
consumer.task_done(m)
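When auto_commit_enable is False, a common pattern is to mark messages with task_done() and call commit() by hand every so often. A sketch (topic, group, and process_message are placeholders):

.. code:: python

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('topic1',
                             bootstrap_servers=['localhost:9092'],
                             group_id='my_consumer_group',
                             auto_commit_enable=False)

    for i, m in enumerate(consumer):
        process_message(m)
        consumer.task_done(m)
        # Manually store the task_done offsets every 1000 messages
        if (i + 1) % 1000 == 0:
            consumer.commit()
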
Configuration settings can be passed to constructor,
otherwise defaults will be used:
.. code:: python
client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
bootstrap_servers=[],
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
consumer_timeout_ms=-1
Configuration parameters are described in more detail at
http://kafka.apache.org/documentation.html#highlevelconsumerapi
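As one illustration of consumer_timeout_ms (assuming ConsumerTimeout is importable from kafka.common in this version), a consumer can be made to stop iterating when the topic goes quiet instead of blocking forever:

.. code:: python

    from kafka import KafkaConsumer
    from kafka.common import ConsumerTimeout

    # Raise ConsumerTimeout if no message arrives within 5 seconds
    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers=['localhost:9092'],
                             consumer_timeout_ms=5000)

    try:
        for m in consumer:
            print(m.value)
    except ConsumerTimeout:
        # No messages for 5s -- treat the stream as drained for now
        pass
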
Multiprocess consumer
---------------------

View File

@@ -52,114 +52,59 @@ DEPRECATED_CONFIG_KEYS = {
}
class KafkaConsumer(object):
""" """A simpler kafka consumer"""
A simpler kafka consumer
.. code:: python
# A very basic 'tail' consumer, with no stored offset management
kafka = KafkaConsumer('topic1',
bootstrap_servers=['localhost:9092'])
for m in kafka:
print m
# Alternate interface: next()
print kafka.next()
# Alternate interface: batch iteration
while True:
for m in kafka.fetch_messages():
print m
print "Done with batch - let's do another!"
.. code:: python
# more advanced consumer -- multiple topics w/ auto commit offset
# management
kafka = KafkaConsumer('topic1', 'topic2',
bootstrap_servers=['localhost:9092'],
group_id='my_consumer_group',
auto_commit_enable=True,
auto_commit_interval_ms=30 * 1000,
auto_offset_reset='smallest')
# Infinite iteration
for m in kafka:
process_message(m)
kafka.task_done(m)
# Alternate interface: next()
m = kafka.next()
process_message(m)
kafka.task_done(m)
# If auto_commit_enable is False, remember to commit() periodically
kafka.commit()
# Batch process interface
while True:
for m in kafka.fetch_messages():
process_message(m)
kafka.task_done(m)
messages (m) are namedtuples with attributes:
* `m.topic`: topic name (str)
* `m.partition`: partition number (int)
* `m.offset`: message offset on topic-partition log (int)
* `m.key`: key (bytes - can be None)
* `m.value`: message (output of deserializer_class - default is raw bytes)
Configuration settings can be passed to constructor,
otherwise defaults will be used:
.. code:: python
client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
bootstrap_servers=[],
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
consumer_timeout_ms=-1
Configuration parameters are described in more detail at
http://kafka.apache.org/documentation.html#highlevelconsumerapi
"""
def __init__(self, *topics, **configs):
    self.configure(**configs)
    self.set_topic_partitions(*topics)
def configure(self, **configs):
    """
    """Configure the consumer instance

    Configuration settings can be passed to constructor,
    otherwise defaults will be used:

    .. code:: python

        client_id='kafka.consumer.kafka',
        group_id=None,
        fetch_message_max_bytes=1024*1024,
        fetch_min_bytes=1,
        fetch_wait_max_ms=100,
        refresh_leader_backoff_ms=200,
        bootstrap_servers=[],
        socket_timeout_ms=30*1000,
        auto_offset_reset='largest',
        deserializer_class=lambda msg: msg,
        auto_commit_enable=False,
        auto_commit_interval_ms=60 * 1000,
        auto_commit_interval_messages=None,
        consumer_timeout_ms=-1

    Keyword Arguments:
        bootstrap_servers (list): List of initial broker nodes the consumer
            should contact to bootstrap initial cluster metadata. This does
            not have to be the full node list. It just needs to have at
            least one broker that will respond to a Metadata API Request.
        client_id (str): a unique name for this client. Defaults to
            'kafka.consumer.kafka'.
        group_id (str): the name of the consumer group to join.
            Offsets are fetched / committed to this group name.
        fetch_message_max_bytes (int, optional): Maximum bytes for each
            topic/partition fetch request. Defaults to 1024*1024.
        fetch_min_bytes (int, optional): Minimum amount of data the server
            should return for a fetch request, otherwise wait up to
            fetch_wait_max_ms for more data to accumulate. Defaults to 1.
        fetch_wait_max_ms (int, optional): Maximum time for the server to
            block waiting for fetch_min_bytes messages to accumulate.
            Defaults to 100.
        refresh_leader_backoff_ms (int, optional): Milliseconds to backoff
            when refreshing metadata on errors (subject to random jitter).
            Defaults to 200.
        socket_timeout_ms (int, optional): TCP socket timeout in
            milliseconds. Defaults to 30*1000.
        auto_offset_reset (str, optional): A policy for resetting offsets on
            OffsetOutOfRange errors. 'smallest' will move to the oldest
            available message, 'largest' will move to the most recent. Any
            other value will raise the exception. Defaults to 'largest'.
        deserializer_class (callable, optional): Any callable that takes a
            raw message value and returns a deserialized value. Defaults to
            lambda msg: msg.
        auto_commit_enable (bool, optional): Enabling auto-commit will cause
            the KafkaConsumer to periodically commit offsets without an
            explicit call to commit(). Defaults to False.
        auto_commit_interval_ms (int, optional): If auto_commit_enabled,
            the milliseconds between automatic offset commits. Defaults to
            60 * 1000.
        auto_commit_interval_messages (int, optional): If
            auto_commit_enabled, a number of messages consumed between
            automatic offset commits. Defaults to None (disabled).
        consumer_timeout_ms (int, optional): number of milliseconds to wait
            for a message before raising a timeout exception
            (ConsumerTimeout). Defaults to -1 (don't raise an exception).

    Configuration parameters are described in more detail at
    http://kafka.apache.org/documentation.html#highlevelconsumerapi
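These keyword arguments are normally supplied straight to the KafkaConsumer constructor, which forwards **configs to configure(). A brief sketch with placeholder topic and broker names:

.. code:: python

    from kafka import KafkaConsumer

    # __init__ calls configure() with these keyword arguments
    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers=['localhost:9092'],
                             group_id='my_consumer_group',
                             fetch_message_max_bytes=1024 * 1024,
                             socket_timeout_ms=30 * 1000,
                             auto_offset_reset='smallest')
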
@@ -316,18 +261,18 @@ class KafkaConsumer(object):
    self._reset_message_iterator()

def next(self):
    """
    Return a single message from the message iterator
    If consumer_timeout_ms is set, will raise ConsumerTimeout
    if no message is available
    Otherwise blocks indefinitely

    Note that this is also the method called internally during iteration:

    .. code:: python

        for m in consumer:
            pass

    """Return the next available message

    Blocks indefinitely unless consumer_timeout_ms > 0

    Returns:
        a single KafkaMessage from the message iterator

    Raises:
        ConsumerTimeout after consumer_timeout_ms and no message

    Note:
        This is also the method called internally during iteration
    """
    self._set_consumer_timeout_start()
@@ -343,21 +288,24 @@ class KafkaConsumer(object):
    self._check_consumer_timeout()

def fetch_messages(self):
    """
    Sends FetchRequests for all topic/partitions set for consumption
    Returns a generator that yields KafkaMessage structs
    after deserializing with the configured `deserializer_class`

    Refreshes metadata on errors, and resets fetch offset on
    OffsetOutOfRange, per the configured `auto_offset_reset` policy

    Key configuration parameters:

    * `fetch_message_max_bytes`
    * `fetch_max_wait_ms`
    * `fetch_min_bytes`
    * `deserializer_class`
    * `auto_offset_reset`

    """Sends FetchRequests for all topic/partitions set for consumption

    Returns:
        Generator that yields KafkaMessage structs
        after deserializing with the configured `deserializer_class`

    Note:
        Refreshes metadata on errors, and resets fetch offset on
        OffsetOutOfRange, per the configured `auto_offset_reset` policy

    See Also:
        Key KafkaConsumer configuration parameters:
        * `fetch_message_max_bytes`
        * `fetch_max_wait_ms`
        * `fetch_min_bytes`
        * `deserializer_class`
        * `auto_offset_reset`

    """
    max_bytes = self._config['fetch_message_max_bytes']
@@ -436,21 +384,22 @@ class KafkaConsumer(object):
    yield msg

def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
    """
    Request available fetch offsets for a single topic/partition
    """Request available fetch offsets for a single topic/partition

    Arguments:
        topic (str)
        partition (int)
    Keyword Arguments:
        topic (str): topic for offset request
        partition (int): partition for offset request
        request_time_ms (int): Used to ask for all messages before a
            certain time (ms). There are two special values. Specify -1 to receive the latest
            offset (i.e. the offset of the next coming message) and -2 to receive the earliest
            available offset. Note that because offsets are pulled in descending order, asking for
            the earliest offset will always return you a single element.
        max_num_offsets (int)
        max_num_offsets (int): Maximum offsets to include in the OffsetResponse

    Returns:
        offsets (list)
        a list of offsets in the OffsetResponse submitted for the provided
        topic / partition. See:
        https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI

    """
    reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
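As a sketch of the special request_time_ms values described above (the topic name and partition number are placeholders, and a `consumer` instance from the earlier usage examples is assumed):

.. code:: python

    # -1 asks for the latest offset (the offset of the next message to be written)
    (head,) = consumer.get_partition_offsets('my-topic', 0, -1, 1)

    # -2 asks for the earliest available offset; this always returns a single element
    (tail,) = consumer.get_partition_offsets('my-topic', 0, -2, 1)

    print('partition 0: earliest=%d, next=%d' % (tail, head))
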
@@ -466,7 +415,8 @@ class KafkaConsumer(object):
    return resp.offsets

def offsets(self, group=None):
    """
    """Get internal consumer offset values

    Keyword Arguments:
        group: Either "fetch", "commit", "task_done", or "highwater".
            If no group specified, returns all groups.
@@ -485,10 +435,17 @@ class KafkaConsumer(object):
    return dict(deepcopy(getattr(self._offsets, group)))
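A short sketch of inspecting these groups (a `consumer` created with a group_id is assumed; each group is taken here to be a dict keyed by (topic, partition) tuples):

.. code:: python

    # All four groups at once: fetch / commit / task_done / highwater
    print(consumer.offsets())

    # Or a single group
    for (topic, partition), offset in consumer.offsets('fetch').items():
        print('%s:%d next fetch offset: %d' % (topic, partition, offset))
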
def task_done(self, message):
    """
    Mark a fetched message as consumed.
    """Mark a fetched message as consumed.

    Offsets for messages marked as "task_done" will be stored back
    to the kafka cluster for this consumer group on commit()

    Arguments:
        message (KafkaMessage): the message to mark as complete

    Returns:
        Nothing
    """
    topic_partition = (message.topic, message.partition)
    offset = message.offset
@@ -516,12 +473,15 @@ class KafkaConsumer(object):
    self.commit()

def commit(self):
    """
    Store consumed message offsets (marked via task_done())
    """Store consumed message offsets (marked via task_done())
    to kafka cluster for this consumer_group.

    **Note**: this functionality requires server version >=0.8.1.1
    See `this wiki page <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI>`_.

    Returns:
        True on success, or False if no offsets were found for commit

    Note:
        this functionality requires server version >=0.8.1.1
        https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
    """
    if not self._config['group_id']:
        logger.warning('Cannot commit without a group_id!')
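A sketch of the task_done() / commit() pairing using the documented return value (True on success, False if nothing was found to commit); a `consumer` created with a group_id and a placeholder process_message are assumed:

.. code:: python

    # Consume one fetched batch, marking each message as done
    for m in consumer.fetch_messages():
        process_message(m)
        consumer.task_done(m)

    # Store everything marked via task_done(); commit() returns False
    # if no offsets were found to commit
    if not consumer.commit():
        print('nothing to commit yet')
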

View File

@@ -69,7 +69,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
def kafka_consumer(self, **configs):
    brokers = '%s:%d' % (self.server.host, self.server.port)
    consumer = KafkaConsumer(self.topic,
                             metadata_broker_list=brokers,
                             bootstrap_servers=brokers,
                             **configs)
    return consumer