From 634d24fa5bf1c258acac7956f301b0176c325f89 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Mon, 6 Mar 2017 16:08:33 -0800
Subject: [PATCH] Tweak README docs to show use of consumer group (no longer default); clarify producer.flush

---
 README.rst     | 17 +++++++++++++----
 docs/index.rst | 17 +++++++++++++----
 2 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/README.rst b/README.rst
index d8367a4..e5e37ca 100644
--- a/README.rst
+++ b/README.rst
@@ -52,6 +52,12 @@ that expose basic message attributes: topic, partition, offset, key, and value:
 >>> for msg in consumer:
 ...     print (msg)
 
+>>> # join a consumer group for dynamic partition assignment and offset commits
+>>> from kafka import KafkaConsumer
+>>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
+>>> for msg in consumer:
+...     print (msg)
+
 >>> # manually assign the partition list for the consumer
 >>> from kafka import TopicPartition
 >>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
@@ -78,11 +84,14 @@ for more details.
 >>> for _ in range(100):
 ...     producer.send('foobar', b'some_message_bytes')
 
->>> # Block until all pending messages are sent
->>> producer.flush()
-
 >>> # Block until a single message is sent (or timeout)
->>> producer.send('foobar', b'another_message').get(timeout=60)
+>>> future = producer.send('foobar', b'another_message')
+>>> result = future.get(timeout=60)
+
+>>> # Block until all pending messages are at least put on the network
+>>> # NOTE: This does not guarantee delivery or success! It is really
+>>> # only useful if you configure internal batching using linger_ms
+>>> producer.flush()
 
 >>> # Use a key for hashed-partitioning
 >>> producer.send('foobar', key=b'foo', value=b'bar')
diff --git a/docs/index.rst b/docs/index.rst
index 5e74d02..2cef7fe 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -51,6 +51,12 @@ that expose basic message attributes: topic, partition, offset, key, and value:
 >>> for msg in consumer:
 ...     print (msg)
 
+>>> # join a consumer group for dynamic partition assignment and offset commits
+>>> from kafka import KafkaConsumer
+>>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
+>>> for msg in consumer:
+...     print (msg)
+
 >>> # manually assign the partition list for the consumer
 >>> from kafka import TopicPartition
 >>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
@@ -76,11 +82,14 @@ client. See `KafkaProducer `_ for more details.
 >>> for _ in range(100):
 ...     producer.send('foobar', b'some_message_bytes')
 
->>> # Block until all pending messages are sent
->>> producer.flush()
-
 >>> # Block until a single message is sent (or timeout)
->>> producer.send('foobar', b'another_message').get(timeout=60)
+>>> future = producer.send('foobar', b'another_message')
+>>> result = future.get(timeout=60)
+
+>>> # Block until all pending messages are at least put on the network
+>>> # NOTE: This does not guarantee delivery or success! It is really
+>>> # only useful if you configure internal batching using linger_ms
+>>> producer.flush()
 
 >>> # Use a key for hashed-partitioning
 >>> producer.send('foobar', key=b'foo', value=b'bar')
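
A minimal sketch of the delivery check that the new flush() note points toward (not part of the patch; it assumes a broker at localhost:1234 and the 'foobar' topic used elsewhere in these examples): rather than relying on flush(), inspect the future returned by send() to confirm, or handle the failure of, an individual message.

>>> # sketch only: confirm delivery of one message via the future returned by send()
>>> from kafka import KafkaProducer
>>> from kafka.errors import KafkaError
>>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
>>> future = producer.send('foobar', b'another_message')
>>> try:
...     record_metadata = future.get(timeout=60)
...     # RecordMetadata reports where the message landed
...     print (record_metadata.topic, record_metadata.partition, record_metadata.offset)
... except KafkaError:
...     # the send failed or timed out; log or retry as appropriate
...     pass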