Usage
SimpleProducer
from kafka import SimpleProducer, KafkaClient
# To send messages synchronously
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)

# Note that the application is responsible for encoding messages to type bytes
producer.send_messages(b'my-topic', b'some message')
producer.send_messages(b'my-topic', b'this method', b'is variadic')

# Send unicode message
producer.send_messages(b'my-topic', u'你怎么样?'.encode('utf-8'))
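Because the producer accepts only bytes (see the note above), structured data has to be serialized and encoded before sending. A minimal sketch, assuming JSON payloads and reusing the producer above; the record dict is a hypothetical example value:

import json

# Serialize to a JSON string, then encode to bytes for the producer
record = {'user_id': 42, 'action': 'click'}  # hypothetical payload
producer.send_messages(b'my-topic', json.dumps(record).encode('utf-8'))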
Asynchronous Mode
import logging

# To send messages asynchronously
producer = SimpleProducer(kafka, async=True)
producer.send_messages(b'my-topic', b'async message')

# To wait for acknowledgements
# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
#                         a local log before sending response
# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
#                            by all in sync replicas before sending a response
producer = SimpleProducer(kafka, async=False,
                          req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                          ack_timeout=2000,
                          sync_fail_on_error=False)

responses = producer.send_messages(b'my-topic', b'another message')
for r in responses:
    logging.info(r.offset)
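The same synchronous pattern works with the stronger guarantee described above; a sketch reusing the kafka client from this section, with an illustrative (not recommended) timeout value:

# Block until all in-sync replicas have committed the message
producer = SimpleProducer(kafka, async=False,
                          req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
                          ack_timeout=5000)

responses = producer.send_messages(b'my-topic', b'durable message')
for r in responses:
    logging.info(r.offset)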
# To send messages in batch. You can use any of the available
# producers for doing this. The following producer will collect
# messages in batch and send them to Kafka after 20 messages are
# collected or every 60 seconds
# Notes:
# * If the producer dies before the messages are sent, there will be losses
# * Call producer.stop() to send the messages and cleanup
producer = SimpleProducer(kafka, async=True,
                          batch_send_every_n=20,
                          batch_send_every_t=60)
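Because the batching producer above holds messages in memory until a flush, the producer.stop() call mentioned in the notes is how to avoid losing the tail of the stream at shutdown; a minimal sketch using that same producer:

producer.send_messages(b'my-topic', b'buffered message')

# Flush any messages still collected in the batch and clean up
producer.stop()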
Keyed messages
from kafka import (
KafkaClient, KeyedProducer,
Murmur2Partitioner, RoundRobinPartitioner)
kafka = KafkaClient('localhost:9092')

# HashedPartitioner is default (currently uses python hash())
producer = KeyedProducer(kafka)
producer.send_messages(b'my-topic', b'key1', b'some message')
producer.send_messages(b'my-topic', b'key2', b'this method')

# Murmur2Partitioner attempts to mirror the java client hashing
producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner)

# Or just produce round-robin (or just use SimpleProducer)
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
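The per-key guarantee behind keyed messages is that one key always hashes to one partition. A rough illustration of the default hashing idea in plain Python; the modulo scheme here approximates HashedPartitioner rather than reproducing it exactly:

partitions = [0, 1, 2, 3]
for key in (b'key1', b'key2', b'key1'):
    # Same key -> same partition within a process, preserving per-key ordering.
    # Note: python hash() is randomized across processes in Python 3
    # unless PYTHONHASHSEED is fixed.
    print(key, '->', hash(key) % len(partitions))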
KafkaConsumer
from kafka import KafkaConsumer
# To consume messages
consumer = KafkaConsumer('my-topic',
                         group_id='my_group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value is raw byte string -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
messages (m) are namedtuples with attributes:
- `m.topic`: topic name (str)
- `m.partition`: partition number (int)
- `m.offset`: message offset on topic-partition log (int)
- `m.key`: key (bytes - can be None)
- `m.value`: message (output of deserializer_class - default is raw bytes)
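Since `m.value` is simply whatever `deserializer_class` returns, decoding can be moved into the constructor instead of being repeated at each use site. A sketch assuming the topic carries JSON-encoded values:

import json

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         group_id='my_group',
                         bootstrap_servers=['localhost:9092'],
                         deserializer_class=lambda msg: json.loads(msg.decode('utf-8')))

for m in consumer:
    print(m.value)  # already deserialized, not raw bytes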
from kafka import KafkaConsumer
# more advanced consumer -- multiple topics w/ auto commit offset
# management
consumer = KafkaConsumer('topic1', 'topic2',
                         bootstrap_servers=['localhost:9092'],
                         group_id='my_consumer_group',
                         auto_commit_enable=True,
                         auto_commit_interval_ms=30 * 1000,
                         auto_offset_reset='smallest')
# Infinite iteration
for m in consumer:
    do_some_work(m)

    # Mark this message as fully consumed
    # so it can be included in the next commit
    #
    # **messages that are not marked w/ task_done currently do not commit!
    consumer.task_done(m)

# If auto_commit_enable is False, remember to commit() periodically
consumer.commit()
# Batch process interface
while True:
    for m in consumer.fetch_messages():
        process_message(m)
        consumer.task_done(m)
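When auto_commit_enable is False, the application decides when offsets are persisted. A sketch that commits after every 100 processed messages; the batch size is arbitrary:

consumer = KafkaConsumer('my-topic',
                         bootstrap_servers=['localhost:9092'],
                         group_id='my_consumer_group',
                         auto_commit_enable=False)

for i, m in enumerate(consumer, 1):
    do_some_work(m)
    consumer.task_done(m)  # mark consumed so the offset is eligible for commit
    if i % 100 == 0:
        consumer.commit()  # persist offsets of all task_done'd messages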
Configuration settings can be passed to constructor, otherwise defaults will be used:
client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
bootstrap_servers=[],
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
consumer_timeout_ms=-1
Configuration parameters are described in more detail at
http://kafka.apache.org/documentation.html#highlevelconsumerapi
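Overriding a default is just a matter of passing the keyword argument to the constructor. For example, the default consumer_timeout_ms=-1 blocks iteration indefinitely; a sketch with illustrative values that bounds both fetch latency and how long the iterator waits for a message:

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         bootstrap_servers=['localhost:9092'],
                         fetch_wait_max_ms=50,      # don't let the broker hold fetches long
                         consumer_timeout_ms=5000)  # stop waiting after 5s without a message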
Multiprocess consumer
from kafka import KafkaClient, MultiProcessConsumer
kafka = KafkaClient('localhost:9092')
# This will split the number of partitions among two processes
consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic', num_procs=2)
# This will spawn processes such that each handles 2 partitions max
consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic',
                                partitions_per_proc=2)
for message in consumer:
print(message)
for message in consumer.get_messages(count=5, block=True, timeout=4):
print(message)
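Because MultiProcessConsumer spawns child processes, it is worth shutting it down explicitly when done; a sketch assuming it exposes the stop() method that this library's consumers provide:

try:
    for message in consumer.get_messages(count=5, block=True, timeout=4):
        print(message)
finally:
    consumer.stop()  # assumed cleanup hook: shuts down the child consumer processes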
Low level
from kafka import KafkaClient, create_message
from kafka.protocol import KafkaProtocol
from kafka.common import ProduceRequest
kafka = KafkaClient('localhost:9092')

req = ProduceRequest(topic=b'my-topic', partition=1,
                     messages=[create_message(b'some message')])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()
resps[0].topic      # b'my-topic'
resps[0].partition  # 1
resps[0].error      # 0 (hopefully)
resps[0].offset     # offset of the first message sent in this request
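send_produce_request accepts a list of payloads, so several topic-partitions can be written in one round trip. A sketch reusing the imports above; the partition numbers are illustrative:

kafka = KafkaClient('localhost:9092')

reqs = [
    ProduceRequest(topic=b'my-topic', partition=0,
                   messages=[create_message(b'for partition 0')]),
    ProduceRequest(topic=b'my-topic', partition=1,
                   messages=[create_message(b'for partition 1')]),
]

resps = kafka.send_produce_request(payloads=reqs, fail_on_error=True)
for resp in resps:
    print(resp.topic, resp.partition, resp.offset)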