Usage
KafkaConsumer
from kafka import KafkaConsumer
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])

for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))
# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)
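# A minimal sketch (not from the original text): with auto-commit disabled,
# you can poll in batches and commit offsets manually once processing succeeds.
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         enable_auto_commit=False,
                         bootstrap_servers=['localhost:9092'])
batch = consumer.poll(timeout_ms=1000)   # {TopicPartition: [ConsumerRecord, ...]}
for tp, records in batch.items():
    for record in records:
        print(record.offset, record.value)
consumer.commit()   # commit positions advanced by the last poll()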
# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))
# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)
# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)
# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')
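# A sketch of manual assignment (an alternative not shown in the original text):
# instead of subscribing, assign specific partitions and seek to an offset.
from kafka import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
tp = TopicPartition('my-topic', 0)
consumer.assign([tp])
consumer.seek(tp, 5)   # start reading partition 0 at offset 5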
# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')
There are many configuration options for the consumer class. See the
`kafka.KafkaConsumer` API documentation for more details.
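For example, a minimal sketch (not from the original text; these are standard
KafkaConsumer keyword arguments) combining a few common options:

consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',      # where to start with no committed offset
                         enable_auto_commit=True,
                         auto_commit_interval_ms=5000,
                         max_poll_records=200,
                         consumer_timeout_ms=1000)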
KafkaProducer
import logging

from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)
producer = KafkaProducer(bootstrap_servers=['broker1:1234'])
# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')
# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception("produce request failed")
    pass
# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)
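# A sketch of the non-blocking alternative (not part of the original example):
# attach callbacks to the returned future instead of calling future.get().
def on_send_success(record_metadata):
    print(record_metadata.topic, record_metadata.partition, record_metadata.offset)

def on_send_error(excp):
    log.error('produce request failed', exc_info=excp)

producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)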
# produce keyed messages to enable hashed partitioning
producer.send('my-topic', key=b'foo', value=b'bar')
# encode objects via msgpack
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('json-topic', {'key': 'value'})
# produce asynchronously
for _ in range(100):
    producer.send('my-topic', b'msg')
# block until all async messages are sent
producer.flush()
# configure multiple retries
producer = KafkaProducer(retries=5)
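# A sketch of a few other common producer options (not from the original text):
# wait for all in-sync replicas, compress batches, and linger briefly so that
# multiple sends can be batched into one request.
producer = KafkaProducer(bootstrap_servers=['broker1:1234'],
                         acks='all',
                         compression_type='gzip',
                         linger_ms=5,
                         retries=5)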