Updates to KafkaConsumer usage docs
@@ -80,32 +80,24 @@ KafkaConsumer
     # To consume messages
     consumer = KafkaConsumer("my-topic",
                              group_id="my_group",
-                             metadata_broker_list=["localhost:9092"])
+                             bootstrap_servers=["localhost:9092"])
     for message in consumer:
-        # message is raw byte string -- decode if necessary!
-        # e.g., for unicode: `message.decode('utf-8')`
-        print(message)
+        # message value is raw byte string -- decode if necessary!
+        # e.g., for unicode: `message.value.decode('utf-8')`
+        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
+                                             message.offset, message.key,
+                                             message.value))
 
-    kafka.close()
-
-.. code:: python
-
-    from kafka import KafkaConsumer
+messages (m) are namedtuples with attributes:
 
-    # A very basic 'tail' consumer, with no stored offset management
-    kafka = KafkaConsumer('topic1',
-                          metadata_broker_list=['localhost:9092'])
-    for m in kafka:
-        print m
-
-    # Alternate interface: next()
-    print kafka.next()
-
-    # Alternate interface: batch iteration
-    while True:
-        for m in kafka.fetch_messages():
-            print m
-        print "Done with batch - let's do another!"
+* `m.topic`: topic name (str)
+* `m.partition`: partition number (int)
+* `m.offset`: message offset on topic-partition log (int)
+* `m.key`: key (bytes - can be None)
+* `m.value`: message (output of deserializer_class - default is raw bytes)
 
 
 .. code:: python
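For context, the updated example reads end-to-end roughly as below. This is a minimal sketch against the API shown in this diff, assuming a broker reachable at localhost:9092 and an existing "my-topic" carrying UTF-8 text payloads; the explicit decode step is what the comment changes above are pointing at.

.. code:: python

    from kafka import KafkaConsumer

    consumer = KafkaConsumer("my-topic",
                             group_id="my_group",
                             bootstrap_servers=["localhost:9092"])

    for message in consumer:
        # message.value is raw bytes; decode only when the payload is
        # known to be UTF-8 text (message.key may be None)
        value = message.value.decode('utf-8')
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             value))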
@@ -114,22 +106,22 @@ KafkaConsumer
 
     # more advanced consumer -- multiple topics w/ auto commit offset
     # management
-    kafka = KafkaConsumer('topic1', 'topic2',
-                          metadata_broker_list=['localhost:9092'],
-                          group_id='my_consumer_group',
-                          auto_commit_enable=True,
-                          auto_commit_interval_ms=30 * 1000,
-                          auto_offset_reset='smallest')
+    consumer = KafkaConsumer('topic1', 'topic2',
+                             bootstrap_servers=['localhost:9092'],
+                             group_id='my_consumer_group',
+                             auto_commit_enable=True,
+                             auto_commit_interval_ms=30 * 1000,
+                             auto_offset_reset='smallest')
 
     # Infinite iteration
-    for m in kafka:
-        process_message(m)
-        kafka.task_done(m)
+    for m in consumer:
+        do_some_work(m)
 
-    # Alternate interface: next()
-    m = kafka.next()
-    process_message(m)
-    kafka.task_done(m)
+        # Mark this message as fully consumed
+        # so it can be included in the next commit
+        #
+        # **messages that are not marked w/ task_done currently do not commit!
+        kafka.task_done(m)
 
     # If auto_commit_enable is False, remember to commit() periodically
     kafka.commit()
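A sketch of the manual-commit path referenced by the last two lines of this hunk: with auto_commit_enable=False, offsets are only written when commit() is called, and only messages marked via task_done() are included in that commit. The do_some_work() helper and the commit-every-100 cadence are illustrative, not part of the library.

.. code:: python

    from kafka import KafkaConsumer

    def do_some_work(m):
        # placeholder for real message processing
        print(m.topic, m.offset)

    consumer = KafkaConsumer('topic1', 'topic2',
                             bootstrap_servers=['localhost:9092'],
                             group_id='my_consumer_group',
                             auto_commit_enable=False)

    for i, m in enumerate(consumer):
        do_some_work(m)
        consumer.task_done(m)   # un-marked messages are never committed
        if i % 100 == 99:
            consumer.commit()   # explicitly commit task_done offsets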
@@ -141,14 +133,6 @@ KafkaConsumer
         kafka.task_done(m)
 
 
-messages (m) are namedtuples with attributes:
-
-* `m.topic`: topic name (str)
-* `m.partition`: partition number (int)
-* `m.offset`: message offset on topic-partition log (int)
-* `m.key`: key (bytes - can be None)
-* `m.value`: message (output of deserializer_class - default is raw bytes)
-
 Configuration settings can be passed to constructor,
 otherwise defaults will be used:
 
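Since m.value is whatever deserializer_class returns, a sketch like the following shows how the namedtuple fields combine with a custom deserializer. The 'json-topic' name and JSON-encoded payloads are assumptions for illustration.

.. code:: python

    import json
    from kafka import KafkaConsumer

    consumer = KafkaConsumer('json-topic',
                             bootstrap_servers=['localhost:9092'],
                             deserializer_class=lambda msg: json.loads(msg.decode('utf-8')))

    for m in consumer:
        # m.value is now a decoded Python object rather than raw bytes
        print('%s:%d:%d -> %r' % (m.topic, m.partition, m.offset, m.value))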
@@ -160,7 +144,7 @@ KafkaConsumer
     fetch_min_bytes=1,
     fetch_wait_max_ms=100,
     refresh_leader_backoff_ms=200,
-    metadata_broker_list=None,
+    bootstrap_servers=[],
     socket_timeout_ms=30*1000,
     auto_offset_reset='largest',
     deserializer_class=lambda msg: msg,
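As a usage note, any of the listed defaults can be overridden as keyword arguments at construction time; a small sketch (the override values are illustrative only):

.. code:: python

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('topic1',
                             bootstrap_servers=['localhost:9092'],
                             fetch_min_bytes=1024,          # batch at least 1KB per fetch
                             fetch_wait_max_ms=500,         # but wait at most 500ms
                             auto_offset_reset='smallest')  # start from earliest valid offset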