Update docs for release w/ new async classes
parent 2a2e77aa1e
commit d4e85ecd1d

docs/apidoc/BrokerConnection.rst (new file)
@ -0,0 +1,5 @@
BrokerConnection
================

.. autoclass:: kafka.BrokerConnection
    :members:

docs/apidoc/KafkaClient.rst (new file)
@ -0,0 +1,5 @@
KafkaClient
===========

.. autoclass:: kafka.KafkaClient
    :members:

docs/apidoc/KafkaConsumer.rst (new file)
@ -0,0 +1,5 @@
KafkaConsumer
=============

.. autoclass:: kafka.KafkaConsumer
    :members:

docs/apidoc/KafkaProducer.rst (new file)
@ -0,0 +1,4 @@
KafkaProducer
=============

<unreleased> See :class:`kafka.producer.SimpleProducer`

docs/apidoc/SimpleProducer.rst (new file)
@ -0,0 +1,14 @@
SimpleProducer
==============

.. autoclass:: kafka.producer.SimpleProducer
    :members:
    :show-inheritance:

.. autoclass:: kafka.producer.KeyedProducer
    :members:
    :show-inheritance:

.. automodule:: kafka.producer.base
    :members:
    :show-inheritance:

docs/apidoc/modules.rst
@ -1,7 +1,10 @@
kafka
=====
kafka-python API
****************

.. toctree::
   :maxdepth: 4

   kafka
   KafkaConsumer
   KafkaProducer
   KafkaClient
   BrokerConnection
   SimpleProducer

docs/compatibility.rst (new file)
@ -0,0 +1,14 @@
Compatibility
-------------

.. image:: https://img.shields.io/badge/kafka-0.9%2C%200.8.2%2C%200.8.1%2C%200.8-brightgreen.svg
    :target: https://kafka-python.readthedocs.org/compatibility.html
.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
    :target: https://pypi.python.org/pypi/kafka-python

kafka-python is compatible with (and tested against) broker versions 0.9.0.0
through 0.8.0. kafka-python is not compatible with the 0.8.2-beta release.

kafka-python is tested on python 2.6, 2.7, 3.3, 3.4, 3.5, and pypy.

Builds and tests run via Travis-CI. See https://travis-ci.org/dpkp/kafka-python

docs/conf.py
@ -49,7 +49,7 @@ master_doc = 'index'

# General information about the project.
project = u'kafka-python'
copyright = u'2015 - David Arthur, Dana Powers, and Contributors'
copyright = u'2016 -- Dana Powers, David Arthur, and Contributors'

# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the

docs/index.rst
@ -1,66 +1,86 @@
kafka-python
============
############

This module provides low-level protocol support for Apache Kafka as well as
high-level consumer and producer classes. Request batching is supported by the
protocol as well as broker-aware request routing. Gzip and Snappy compression
is also supported for message sets.
.. image:: https://img.shields.io/badge/kafka-0.9%2C%200.8.2%2C%200.8.1%2C%200.8-brightgreen.svg
    :target: https://kafka-python.readthedocs.org/compatibility.html
.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
    :target: https://pypi.python.org/pypi/kafka-python
.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github
    :target: https://coveralls.io/github/dpkp/kafka-python?branch=master
.. image:: https://travis-ci.org/dpkp/kafka-python.svg?branch=master
    :target: https://travis-ci.org/dpkp/kafka-python
.. image:: https://img.shields.io/badge/license-Apache%202-blue.svg
    :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE

Coordinated Consumer Group support is under development - see Issue #38.
>>> pip install kafka-python

On Freenode IRC at #kafka-python, as well as #apache-kafka

For general discussion of kafka-client design and implementation (not python specific),
see https://groups.google.com/forum/m/#!forum/kafka-clients

For information about Apache Kafka generally, see https://kafka.apache.org/

Status
------

The current stable version of this package is `0.9.5 <https://github.com/dpkp/kafka-python/releases/tag/v0.9.5>`_ and is compatible with:

Kafka broker versions

* 0.9.0.0
* 0.8.2.2
* 0.8.2.1
* 0.8.1.1
* 0.8.1
* 0.8.0

Python versions

* 3.5 (tested on 3.5.0)
* 3.4 (tested on 3.4.2)
* 3.3 (tested on 3.3.5)
* 2.7 (tested on 2.7.9)
* 2.6 (tested on 2.6.9)
* pypy (tested on pypy 2.5.0 / python 2.7.8)

License
-------

Apache License, v2.0. See `LICENSE <https://github.com/dpkp/kafka-python/blob/master/LICENSE>`_.

Copyright 2015, David Arthur, Dana Powers, and Contributors
(See `AUTHORS <https://github.com/dpkp/kafka-python/blob/master/AUTHORS.md>`_).
kafka-python is a client for the Apache Kafka distributed stream processing
system. It is designed to function much like the official java client, with a
sprinkling of pythonic interfaces (e.g., iterators).


Contents
--------
KafkaConsumer
*************

>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic')
>>> for msg in consumer:
...     print (msg)

:class:`~kafka.consumer.KafkaConsumer` is a full-featured,
high-level message consumer class that is similar in design and function to the
new 0.9 java consumer. Most configuration parameters defined by the official
java client are supported as optional kwargs, with generally similar behavior.
Gzip and Snappy compressed messages are supported transparently.

In addition to the standard
:meth:`~kafka.consumer.KafkaConsumer.poll` interface (which returns
micro-batches of messages, grouped by topic-partition), kafka-python supports
single-message iteration, yielding :class:`~kafka.consumer.ConsumerRecord`
namedtuples, which include the topic, partition, offset, key, and value of each
message.

By default, :class:`~kafka.consumer.KafkaConsumer` will attempt to auto-commit
message offsets every 5 seconds. When used with 0.9 kafka brokers,
:class:`~kafka.consumer.KafkaConsumer` will dynamically assign partitions using
the kafka GroupCoordinator APIs and a
:class:`~kafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor`
partitioning strategy, enabling relatively straightforward parallel consumption
patterns. See :doc:`usage` for examples.
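
A minimal sketch of the :meth:`~kafka.consumer.KafkaConsumer.poll` interface
described above (not part of this commit; it assumes a broker at
localhost:9092 and placeholder topic and group names):

.. code:: python

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('my_favorite_topic',
                             group_id='my-group',
                             bootstrap_servers=['localhost:9092'])

    # poll() returns a dict mapping TopicPartition to a list of ConsumerRecords,
    # i.e. one micro-batch per topic-partition
    batch = consumer.poll(timeout_ms=500)
    for tp, records in batch.items():
        for record in records:
            print(record.topic, record.partition, record.offset,
                  record.key, record.value)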


KafkaProducer
*************

TBD


Protocol
********

A secondary goal of kafka-python is to provide an easy-to-use protocol layer
for interacting with kafka brokers via the python repl. This is useful for
testing, probing, and general experimentation. The protocol support is
leveraged to enable a :meth:`~kafka.KafkaClient.check_version()`
method that probes a kafka broker and
attempts to identify which version it is running (0.8.0 to 0.9).
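
A minimal sketch of version probing with ``check_version()`` (not part of this
commit; it assumes a broker reachable at localhost:9092):

.. code:: python

    from kafka import KafkaClient

    client = KafkaClient(bootstrap_servers='localhost:9092')
    # probe the connected broker and report the version it appears to be running
    print(client.check_version())
    client.close()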


Low-level
*********

Legacy support is maintained for low-level consumer and producer classes,
SimpleConsumer and SimpleProducer.
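
For example, a minimal SimpleConsumer sketch (not part of this commit; it
assumes a broker at localhost:9092 and an existing topic named ``my-topic``):

.. code:: python

    from kafka import SimpleClient, SimpleConsumer

    client = SimpleClient('localhost:9092')
    consumer = SimpleConsumer(client, b'my-group', b'my-topic')

    # iterates over raw OffsetAndMessage tuples fetched from the topic
    for message in consumer:
        print(message)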


.. toctree::
   :hidden:
   :maxdepth: 2

   usage
   Usage Overview <usage>
   API </apidoc/modules>
   install
   tests
   API reference </apidoc/modules>

Indices and tables
==================

* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
   compatibility
   support
   license

docs/install.rst
@ -1,10 +1,10 @@
Install
=======
#######

Install with your favorite package manager

Latest Release
--------------
**************
Pip:

.. code:: bash

@ -15,7 +15,7 @@ Releases are also listed at https://github.com/dpkp/kafka-python/releases


Bleeding-Edge
-------------
*************

.. code:: bash

@ -39,10 +39,10 @@ Using `setup.py` directly:


Optional Snappy install
-----------------------
***********************

Install Development Libraries
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
=============================

Download and build Snappy from http://code.google.com/p/snappy/downloads/list

@ -70,7 +70,7 @@ From Source:

    sudo make install

Install Python Module
^^^^^^^^^^^^^^^^^^^^^
=====================

Install the `python-snappy` module

docs/license.rst (new file)
@ -0,0 +1,10 @@
License
-------

.. image:: https://img.shields.io/badge/license-Apache%202-blue.svg
    :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE

Apache License, v2.0. See `LICENSE <https://github.com/dpkp/kafka-python/blob/master/LICENSE>`_.

Copyright 2016, David Arthur, Dana Powers, and Contributors
(See `AUTHORS <https://github.com/dpkp/kafka-python/blob/master/AUTHORS.md>`_).

docs/support.rst (new file)
@ -0,0 +1,11 @@
Support
-------

For support, see github issues at https://github.com/dpkp/kafka-python

Limited IRC chat at #kafka-python on freenode (general chat is #apache-kafka).

For information about Apache Kafka generally, see https://kafka.apache.org/

For general discussion of kafka-client design and implementation (not python
specific), see https://groups.google.com/forum/m/#!forum/kafka-clients

docs/tests.rst
@ -1,59 +1,83 @@
Tests
=====

Run the unit tests
.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github
    :target: https://coveralls.io/github/dpkp/kafka-python?branch=master
.. image:: https://travis-ci.org/dpkp/kafka-python.svg?branch=master
    :target: https://travis-ci.org/dpkp/kafka-python

Test environments are managed via tox. The test suite is run via pytest.
Individual tests are written using unittest, pytest, and in some cases,
doctest.

Linting is run via pylint, but is generally skipped on python2.6 and pypy
due to pylint compatibility / performance issues.

For test coverage details, see https://coveralls.io/github/dpkp/kafka-python

The test suite includes unit tests that mock network interfaces, as well as
integration tests that setup and teardown kafka broker (and zookeeper)
fixtures for client / consumer / producer testing.


Unit tests
------------------

.. code:: bash
To run the tests locally, install tox -- `pip install tox`
See http://tox.readthedocs.org/en/latest/install.html

    tox


Run a subset of unit tests
--------------------------
Then simply run tox, optionally setting the python environment.
If unset, tox will loop through all environments.

.. code:: bash

    tox -e py27
    tox -e py35

    # run protocol tests only
    tox -- -v test.test_protocol

    # test with pypy only
    tox -e pypy
    # re-run the last failing test, dropping into pdb
    tox -e py27 -- --lf --pdb

    # Run only 1 test, and use python 2.7
    tox -e py27 -- -v --with-id --collect-only

    # pick a test number from the list like #102
    tox -e py27 -- -v --with-id 102
    # see available (pytest) options
    tox -e py27 -- --help


Run the integration tests
-------------------------
Integration tests
-----------------

The integration tests will actually start up a real local Zookeeper
instance and Kafka brokers, and send messages in using the client.
.. code:: bash

First, get the kafka binaries for integration testing:
    KAFKA_VERSION=0.9.0.0 tox -e py27
    KAFKA_VERSION=0.8.2.2 tox -e py35


Integration tests start Kafka and Zookeeper fixtures. This requires downloading
kafka server binaries:

.. code:: bash

    ./build_integration.sh

By default, the build_integration.sh script will download binary
distributions for all supported kafka versions.
To test against the latest source build, set KAFKA_VERSION=trunk
and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended).
By default, this will install 0.8.1.1, 0.8.2.2, and 0.9.0.0 brokers into the
servers/ directory. To install a specific version, set `KAFKA_VERSION=1.2.3`:

.. code:: bash

    SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh
    KAFKA_VERSION=0.8.0 ./build_integration.sh

Then run the tests against supported Kafka versions by setting the
`KAFKA_VERSION` env variable to the server build you want to use for testing:

.. code:: bash

    KAFKA_VERSION=0.8.0 tox
    KAFKA_VERSION=0.8.1 tox
    KAFKA_VERSION=0.8.1.1 tox
    KAFKA_VERSION=trunk tox
    KAFKA_VERSION=0.9.0.0 tox -e py27

To test against the kafka source tree, set KAFKA_VERSION=trunk
[optionally set SCALA_VERSION (defaults to 2.10)]

.. code:: bash

    SCALA_VERSION=2.11 KAFKA_VERSION=trunk ./build_integration.sh
    KAFKA_VERSION=trunk tox -e py35

docs/usage.rst
@ -1,46 +1,69 @@
Usage
=====
*****

SimpleProducer
--------------

KafkaConsumer
=============

.. code:: python

    from kafka import SimpleProducer, KafkaClient
    from kafka import KafkaConsumer

    # To send messages synchronously
    kafka = KafkaClient('localhost:9092')
    producer = SimpleProducer(kafka)
    # To consume latest messages and auto-commit offsets
    consumer = KafkaConsumer('my-topic',
                             group_id='my-group',
                             bootstrap_servers=['localhost:9092'])
    for message in consumer:
        # message value and key are raw bytes -- decode if necessary!
        # e.g., for unicode: `message.value.decode('utf-8')`
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                              message.offset, message.key,
                                              message.value))

    # Note that the application is responsible for encoding messages to type bytes
    producer.send_messages(b'my-topic', b'some message')
    producer.send_messages(b'my-topic', b'this method', b'is variadic')
    # consume earliest available messages, don't commit offsets
    KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)

    # Send unicode message
    producer.send_messages(b'my-topic', u'你怎么样?'.encode('utf-8'))
    # consume json messages
    KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))

    # consume msgpack
    KafkaConsumer(value_deserializer=msgpack.unpackb)

    # StopIteration if no message after 1sec
    KafkaConsumer(consumer_timeout_ms=1000)

    # Subscribe to a regex topic pattern
    consumer = KafkaConsumer()
    consumer.subscribe(pattern='^awesome.*')

    # Use multiple consumers in parallel w/ 0.9 kafka brokers
    # typically you would run each on a different server / process / CPU
    consumer1 = KafkaConsumer('my-topic',
                              group_id='my-group',
                              bootstrap_servers='my.server.com')
    consumer2 = KafkaConsumer('my-topic',
                              group_id='my-group',
                              bootstrap_servers='my.server.com')


There are many configuration options for the consumer class. See
:class:`~kafka.KafkaConsumer` API documentation for more details.


SimpleProducer
==============

Asynchronous Mode
-----------------

.. code:: python

    from kafka import SimpleProducer, SimpleClient

    # To send messages asynchronously
    producer = SimpleProducer(kafka, async=True)
    producer.send_messages(b'my-topic', b'async message')

    # To wait for acknowledgements
    # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
    #                         a local log before sending response
    # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
    #                            by all in sync replicas before sending a response
    producer = SimpleProducer(kafka, async=False,
                              req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                              ack_timeout=2000,
                              sync_fail_on_error=False)

    responses = producer.send_messages(b'my-topic', b'another message')
    for r in responses:
        logging.info(r.offset)
    client = SimpleClient('localhost:9092')
    producer = SimpleProducer(client, async=True)
    producer.send_messages('my-topic', b'async message')

    # To send messages in batch. You can use any of the available
    # producers for doing this. The following producer will collect
@ -49,20 +72,55 @@ Asynchronous Mode
    # Notes:
    # * If the producer dies before the messages are sent, there will be losses
    # * Call producer.stop() to send the messages and cleanup
    producer = SimpleProducer(kafka, async=True,
    producer = SimpleProducer(client,
                              async=True,
                              batch_send_every_n=20,
                              batch_send_every_t=60)

Keyed messages
--------------
Synchronous Mode
----------------

.. code:: python

    from kafka import SimpleProducer, SimpleClient

    # To send messages synchronously
    client = SimpleClient('localhost:9092')
    producer = SimpleProducer(client, async=False)

    # Note that the application is responsible for encoding messages to type bytes
    producer.send_messages('my-topic', b'some message')
    producer.send_messages('my-topic', b'this method', b'is variadic')

    # Send unicode message
    producer.send_messages('my-topic', u'你怎么样?'.encode('utf-8'))

    # To wait for acknowledgements
    # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
    #                         a local log before sending response
    # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
    #                            by all in sync replicas before sending a response
    producer = SimpleProducer(client,
                              async=False,
                              req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                              ack_timeout=2000,
                              sync_fail_on_error=False)

    responses = producer.send_messages('my-topic', b'another message')
    for r in responses:
        logging.info(r.offset)


KeyedProducer
=============

.. code:: python

    from kafka import (
        KafkaClient, KeyedProducer,
        SimpleClient, KeyedProducer,
        Murmur2Partitioner, RoundRobinPartitioner)

    kafka = KafkaClient('localhost:9092')
    kafka = SimpleClient('localhost:9092')

    # HashedPartitioner is default (currently uses python hash())
    producer = KeyedProducer(kafka)
@ -74,131 +132,3 @@ Keyed messages
    # Or just produce round-robin (or just use SimpleProducer)
    producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)



KafkaConsumer
-------------

.. code:: python

    from kafka import KafkaConsumer

    # To consume messages
    consumer = KafkaConsumer('my-topic',
                             group_id='my_group',
                             bootstrap_servers=['localhost:9092'])
    for message in consumer:
        # message value is raw byte string -- decode if necessary!
        # e.g., for unicode: `message.value.decode('utf-8')`
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))


messages (m) are namedtuples with attributes:

* `m.topic`: topic name (str)
* `m.partition`: partition number (int)
* `m.offset`: message offset on topic-partition log (int)
* `m.key`: key (bytes - can be None)
* `m.value`: message (output of deserializer_class - default is raw bytes)


.. code:: python

    from kafka import KafkaConsumer

    # more advanced consumer -- multiple topics w/ auto commit offset
    # management
    consumer = KafkaConsumer('topic1', 'topic2',
                             bootstrap_servers=['localhost:9092'],
                             group_id='my_consumer_group',
                             auto_commit_enable=True,
                             auto_commit_interval_ms=30 * 1000,
                             auto_offset_reset='smallest')

    # Infinite iteration
    for m in consumer:
        do_some_work(m)

        # Mark this message as fully consumed
        # so it can be included in the next commit
        #
        # **messages that are not marked w/ task_done currently do not commit!
        consumer.task_done(m)

    # If auto_commit_enable is False, remember to commit() periodically
    consumer.commit()

    # Batch process interface
    while True:
        for m in kafka.fetch_messages():
            process_message(m)
            consumer.task_done(m)


Configuration settings can be passed to constructor,
otherwise defaults will be used:

.. code:: python

    client_id='kafka.consumer.kafka',
    group_id=None,
    fetch_message_max_bytes=1024*1024,
    fetch_min_bytes=1,
    fetch_wait_max_ms=100,
    refresh_leader_backoff_ms=200,
    bootstrap_servers=[],
    socket_timeout_ms=30*1000,
    auto_offset_reset='largest',
    deserializer_class=lambda msg: msg,
    auto_commit_enable=False,
    auto_commit_interval_ms=60 * 1000,
    consumer_timeout_ms=-1

Configuration parameters are described in more detail at
http://kafka.apache.org/documentation.html#highlevelconsumerapi

Multiprocess consumer
---------------------

.. code:: python

    from kafka import KafkaClient, MultiProcessConsumer

    kafka = KafkaClient('localhost:9092')

    # This will split the number of partitions among two processes
    consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic', num_procs=2)

    # This will spawn processes such that each handles 2 partitions max
    consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic',
                                    partitions_per_proc=2)

    for message in consumer:
        print(message)

    for message in consumer.get_messages(count=5, block=True, timeout=4):
        print(message)

Low level
---------

.. code:: python

    from kafka import KafkaClient, create_message
    from kafka.protocol import KafkaProtocol
    from kafka.common import ProduceRequest

    kafka = KafkaClient('localhost:9092')

    req = ProduceRequest(topic=b'my-topic', partition=1,
                         messages=[create_message(b'some message')])
    resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
    kafka.close()

    resps[0].topic      # b'my-topic'
    resps[0].partition  # 1
    resps[0].error      # 0 (hopefully)
    resps[0].offset     # offset of the first message sent in this request