Merge pull request #282 from wedaly/sphinx-api-docs
Add Sphinx API docs
.gitignore (vendored) | 1
@@ -8,3 +8,4 @@ env
 servers/*/kafka-bin
 .coverage
 .noseids
+docs/_build
README.md | 232
@@ -2,6 +2,8 @@
 [](https://travis-ci.org/mumrah/kafka-python)

+[Full documentation available on ReadTheDocs](http://kafka-python.readthedocs.org/en/latest/)
+
 This module provides low-level protocol support for Apache Kafka as well as
 high-level consumer and producer classes. Request batching is supported by the
 protocol as well as broker-aware request routing. Gzip and Snappy compression
@@ -32,233 +34,3 @@ Python versions
 - 2.7 (tested on 2.7.8)
 - pypy (tested on pypy 2.3.1 / python 2.7.6)
 - (Python 3.3 and 3.4 support has been added to trunk and will be available the next release)
-
-# Usage
-
-## High level
-
-```python
-from kafka import KafkaClient, SimpleProducer, SimpleConsumer
-
-# To send messages synchronously
-kafka = KafkaClient("localhost:9092")
-producer = SimpleProducer(kafka)
-
-# Note that the application is responsible for encoding messages to type str
-producer.send_messages("my-topic", "some message")
-producer.send_messages("my-topic", "this method", "is variadic")
-
-# Send unicode message
-producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))
-
-# To send messages asynchronously
-# WARNING: current implementation does not guarantee message delivery on failure!
-# messages can get dropped! Use at your own risk! Or help us improve with a PR!
-producer = SimpleProducer(kafka, async=True)
-producer.send_messages("my-topic", "async message")
-
-# To wait for acknowledgements
-# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
-#                         a local log before sending response
-# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
-#                            by all in sync replicas before sending a response
-producer = SimpleProducer(kafka, async=False,
-                          req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
-                          ack_timeout=2000)
-
-response = producer.send_messages("my-topic", "another message")
-
-if response:
-    print(response[0].error)
-    print(response[0].offset)
-
-# To send messages in batch. You can use any of the available
-# producers for doing this. The following producer will collect
-# messages in batch and send them to Kafka after 20 messages are
-# collected or every 60 seconds
-# Notes:
-# * If the producer dies before the messages are sent, there will be losses
-# * Call producer.stop() to send the messages and cleanup
-producer = SimpleProducer(kafka, batch_send=True,
-                          batch_send_every_n=20,
-                          batch_send_every_t=60)
-
-# To consume messages
-consumer = SimpleConsumer(kafka, "my-group", "my-topic")
-for message in consumer:
-    # message is raw byte string -- decode if necessary!
-    # e.g., for unicode: `message.decode('utf-8')`
-    print(message)
-
-kafka.close()
-```
-
-## Keyed messages
-```python
-from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner
-
-kafka = KafkaClient("localhost:9092")
-
-# HashedPartitioner is default
-producer = KeyedProducer(kafka)
-producer.send("my-topic", "key1", "some message")
-producer.send("my-topic", "key2", "this methode")
-
-producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
-```
-
-## Multiprocess consumer
-```python
-from kafka import KafkaClient, MultiProcessConsumer
-
-kafka = KafkaClient("localhost:9092")
-
-# This will split the number of partitions among two processes
-consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
-
-# This will spawn processes such that each handles 2 partitions max
-consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
-                                partitions_per_proc=2)
-
-for message in consumer:
-    print(message)
-
-for message in consumer.get_messages(count=5, block=True, timeout=4):
-    print(message)
-```
-
-## Low level
-
-```python
-from kafka import KafkaClient, create_message
-from kafka.protocol import KafkaProtocol
-from kafka.common import ProduceRequest
-
-kafka = KafkaClient("localhost:9092")
-
-req = ProduceRequest(topic="my-topic", partition=1,
-    messages=[create_message("some message")])
-resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
-kafka.close()
-
-resps[0].topic      # "my-topic"
-resps[0].partition  # 1
-resps[0].error      # 0 (hopefully)
-resps[0].offset     # offset of the first message sent in this request
-```
-
-# Install
-
-Install with your favorite package manager
-
-## Latest Release
-Pip:
-
-```shell
-pip install kafka-python
-```
-
-Releases are also listed at https://github.com/mumrah/kafka-python/releases
-
-
-## Bleeding-Edge
-```shell
-git clone https://github.com/mumrah/kafka-python
-pip install ./kafka-python
-```
-
-Setuptools:
-```shell
-git clone https://github.com/mumrah/kafka-python
-easy_install ./kafka-python
-```
-
-Using `setup.py` directly:
-```shell
-git clone https://github.com/mumrah/kafka-python
-cd kafka-python
-python setup.py install
-```
-
-## Optional Snappy install
-
-### Install Development Libraries
-Download and build Snappy from http://code.google.com/p/snappy/downloads/list
-
-Ubuntu:
-```shell
-apt-get install libsnappy-dev
-```
-
-OSX:
-```shell
-brew install snappy
-```
-
-From Source:
-```shell
-wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz
-tar xzvf snappy-1.0.5.tar.gz
-cd snappy-1.0.5
-./configure
-make
-sudo make install
-```
-
-### Install Python Module
-Install the `python-snappy` module
-```shell
-pip install python-snappy
-```
-
-# Tests
-
-## Run the unit tests
-
-```shell
-tox
-```
-
-## Run a subset of unit tests
-```shell
-# run protocol tests only
-tox -- -v test.test_protocol
-```
-
-```shell
-# test with pypy only
-tox -e pypy
-```
-
-```shell
-# Run only 1 test, and use python 2.7
-tox -e py27 -- -v --with-id --collect-only
-# pick a test number from the list like #102
-tox -e py27 -- -v --with-id 102
-```
-
-## Run the integration tests
-
-The integration tests will actually start up real local Zookeeper
-instance and Kafka brokers, and send messages in using the client.
-
-First, get the kafka binaries for integration testing:
-```shell
-./build_integration.sh
-```
-By default, the build_integration.sh script will download binary
-distributions for all supported kafka versions.
-To test against the latest source build, set KAFKA_VERSION=trunk
-and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended)
-```shell
-SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh
-```
-
-Then run the tests against supported Kafka versions, simply set the `KAFKA_VERSION`
-env variable to the server build you want to use for testing:
-```shell
-KAFKA_VERSION=0.8.0 tox
-KAFKA_VERSION=0.8.1 tox
-KAFKA_VERSION=0.8.1.1 tox
-KAFKA_VERSION=trunk tox
-```
docs/Makefile (new file) | 177
@@ -0,0 +1,177 @@
# Makefile for Sphinx documentation
#

# You can set these variables from the command line.
SPHINXOPTS    =
SPHINXBUILD   = sphinx-build
PAPER         =
BUILDDIR      = _build

# User-friendly check for sphinx-build
ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)
$(error The '$(SPHINXBUILD)' command was not found. Make sure you have Sphinx installed, then set the SPHINXBUILD environment variable to point to the full path of the '$(SPHINXBUILD)' executable. Alternatively you can add the directory with the executable to your PATH. If you don't have Sphinx installed, grab it from http://sphinx-doc.org/)
endif

# Internal variables.
PAPEROPT_a4     = -D latex_paper_size=a4
PAPEROPT_letter = -D latex_paper_size=letter
ALLSPHINXOPTS   = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
# the i18n builder cannot share the environment and doctrees with the others
I18NSPHINXOPTS  = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .

.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest gettext

help:
	@echo "Please use \`make <target>' where <target> is one of"
	@echo "  html       to make standalone HTML files"
	@echo "  dirhtml    to make HTML files named index.html in directories"
	@echo "  singlehtml to make a single large HTML file"
	@echo "  pickle     to make pickle files"
	@echo "  json       to make JSON files"
	@echo "  htmlhelp   to make HTML files and a HTML help project"
	@echo "  qthelp     to make HTML files and a qthelp project"
	@echo "  devhelp    to make HTML files and a Devhelp project"
	@echo "  epub       to make an epub"
	@echo "  latex      to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
	@echo "  latexpdf   to make LaTeX files and run them through pdflatex"
	@echo "  latexpdfja to make LaTeX files and run them through platex/dvipdfmx"
	@echo "  text       to make text files"
	@echo "  man        to make manual pages"
	@echo "  texinfo    to make Texinfo files"
	@echo "  info       to make Texinfo files and run them through makeinfo"
	@echo "  gettext    to make PO message catalogs"
	@echo "  changes    to make an overview of all changed/added/deprecated items"
	@echo "  xml        to make Docutils-native XML files"
	@echo "  pseudoxml  to make pseudoxml-XML files for display purposes"
	@echo "  linkcheck  to check all external links for integrity"
	@echo "  doctest    to run all doctests embedded in the documentation (if enabled)"

clean:
	rm -rf $(BUILDDIR)/*

html:
	$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
	@echo
	@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."

dirhtml:
	$(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
	@echo
	@echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."

singlehtml:
	$(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml
	@echo
	@echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml."

pickle:
	$(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
	@echo
	@echo "Build finished; now you can process the pickle files."

json:
	$(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
	@echo
	@echo "Build finished; now you can process the JSON files."

htmlhelp:
	$(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
	@echo
	@echo "Build finished; now you can run HTML Help Workshop with the" \
	      ".hhp project file in $(BUILDDIR)/htmlhelp."

qthelp:
	$(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
	@echo
	@echo "Build finished; now you can run "qcollectiongenerator" with the" \
	      ".qhcp project file in $(BUILDDIR)/qthelp, like this:"
	@echo "# qcollectiongenerator $(BUILDDIR)/qthelp/kafka-python.qhcp"
	@echo "To view the help file:"
	@echo "# assistant -collectionFile $(BUILDDIR)/qthelp/kafka-python.qhc"

devhelp:
	$(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp
	@echo
	@echo "Build finished."
	@echo "To view the help file:"
	@echo "# mkdir -p $$HOME/.local/share/devhelp/kafka-python"
	@echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/kafka-python"
	@echo "# devhelp"

epub:
	$(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub
	@echo
	@echo "Build finished. The epub file is in $(BUILDDIR)/epub."

latex:
	$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
	@echo
	@echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
	@echo "Run \`make' in that directory to run these through (pdf)latex" \
	      "(use \`make latexpdf' here to do that automatically)."

latexpdf:
	$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
	@echo "Running LaTeX files through pdflatex..."
	$(MAKE) -C $(BUILDDIR)/latex all-pdf
	@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."

latexpdfja:
	$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
	@echo "Running LaTeX files through platex and dvipdfmx..."
	$(MAKE) -C $(BUILDDIR)/latex all-pdf-ja
	@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."

text:
	$(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text
	@echo
	@echo "Build finished. The text files are in $(BUILDDIR)/text."

man:
	$(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man
	@echo
	@echo "Build finished. The manual pages are in $(BUILDDIR)/man."

texinfo:
	$(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
	@echo
	@echo "Build finished. The Texinfo files are in $(BUILDDIR)/texinfo."
	@echo "Run \`make' in that directory to run these through makeinfo" \
	      "(use \`make info' here to do that automatically)."

info:
	$(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
	@echo "Running Texinfo files through makeinfo..."
	make -C $(BUILDDIR)/texinfo info
	@echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo."

gettext:
	$(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale
	@echo
	@echo "Build finished. The message catalogs are in $(BUILDDIR)/locale."

changes:
	$(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
	@echo
	@echo "The overview file is in $(BUILDDIR)/changes."

linkcheck:
	$(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
	@echo
	@echo "Link check complete; look for any errors in the above output " \
	      "or in $(BUILDDIR)/linkcheck/output.txt."

doctest:
	$(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
	@echo "Testing of doctests in the sources finished, look at the " \
	      "results in $(BUILDDIR)/doctest/output.txt."

xml:
	$(SPHINXBUILD) -b xml $(ALLSPHINXOPTS) $(BUILDDIR)/xml
	@echo
	@echo "Build finished. The XML files are in $(BUILDDIR)/xml."

pseudoxml:
	$(SPHINXBUILD) -b pseudoxml $(ALLSPHINXOPTS) $(BUILDDIR)/pseudoxml
	@echo
	@echo "Build finished. The pseudo-XML files are in $(BUILDDIR)/pseudoxml."
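The Makefile above is the stock sphinx-quickstart wrapper around plain `sphinx-build` invocations, so the same HTML build can be driven from Python when `make` is unavailable. The following is a minimal illustrative sketch, not part of this commit; it assumes Sphinx and the docs requirements are already installed and mirrors the Makefile's `html` target:

```python
# Build the Sphinx HTML docs the same way the Makefile's `html` target does.
import subprocess
import sys

def build_html_docs(docs_dir="docs", build_dir="_build"):
    # Mirrors: sphinx-build -b html -d _build/doctrees . _build/html
    cmd = [
        "sphinx-build",
        "-b", "html",
        "-d", "{0}/doctrees".format(build_dir),
        ".",
        "{0}/html".format(build_dir),
    ]
    return subprocess.call(cmd, cwd=docs_dir)

if __name__ == "__main__":
    sys.exit(build_html_docs())
```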
docs/api_reference.rst (new file) | 67
@@ -0,0 +1,67 @@
API Reference
=============

kafka
-----
.. automodule:: kafka.client
    :members:

.. automodule:: kafka.codec
    :members:

.. automodule:: kafka.common
    :members:

.. automodule:: kafka.conn
    :members:

.. automodule:: kafka.context
    :members:

.. automodule:: kafka.protocol
    :members:

.. automodule:: kafka.queue
    :members:

.. automodule:: kafka.util
    :members:


kafka.consumer
--------------
.. automodule:: kafka.consumer.base
    :members:

.. automodule:: kafka.consumer.kafka
    :members:

.. automodule:: kafka.consumer.multiprocess
    :members:

.. automodule:: kafka.consumer.simple
    :members:


kafka.partitioner
-----------------
.. automodule:: kafka.partitioner.base
    :members:

.. automodule:: kafka.partitioner.hashed
    :members:

.. automodule:: kafka.partitioner.roundrobin
    :members:


kafka.producer
--------------
.. automodule:: kafka.producer.base
    :members:

.. automodule:: kafka.producer.keyed
    :members:

.. automodule:: kafka.producer.simple
    :members:
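Each `automodule` directive here relies on `sphinx.ext.autodoc`: it imports the named module and renders the docstrings of its members. A quick way to preview what autodoc will pick up for any of these modules is to inspect the docstrings directly; the snippet below is an illustrative sketch, not part of this commit:

```python
# Preview the docstrings that `.. automodule:: kafka.client` with `:members:` will render.
import inspect
import kafka.client

for name, member in inspect.getmembers(kafka.client, inspect.isclass):
    doc = inspect.getdoc(member)
    if doc:
        print("=== {0} ===".format(name))
        print(doc.splitlines()[0])  # first line of the class docstring
```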
docs/conf.py (new file) | 264
@@ -0,0 +1,264 @@
# -*- coding: utf-8 -*-
#
# kafka-python documentation build configuration file, created by
# sphinx-quickstart on Sun Jan 4 12:21:50 2015.
#
# This file is execfile()d with the current directory set to its
# containing dir.
#
# Note that not all possible configuration values are present in this
# autogenerated file.
#
# All configuration values have a default; values that are commented out
# serve to show the default.

import sys
import os

# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#sys.path.insert(0, os.path.abspath('.'))

# -- General configuration ------------------------------------------------

# If your documentation needs a minimal Sphinx version, state it here.
#needs_sphinx = '1.0'

# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
    'sphinx.ext.autodoc',
    'sphinx.ext.viewcode',
    'sphinxcontrib.napoleon',
]

# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']

# The suffix of source filenames.
source_suffix = '.rst'

# The encoding of source files.
#source_encoding = 'utf-8-sig'

# The master toctree document.
master_doc = 'index'

# General information about the project.
project = u'kafka-python'
copyright = u'2015, David Arthur'

# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
with open('../VERSION') as version_file:
    version = version_file.read()

# The full version, including alpha/beta/rc tags.
release = version

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#language = None

# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#today = ''
# Else, today_fmt is used as the format for a strftime call.
#today_fmt = '%B %d, %Y'

# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
exclude_patterns = ['_build']

# The reST default role (used for this markup: `text`) to use for all
# documents.
#default_role = None

# If true, '()' will be appended to :func: etc. cross-reference text.
#add_function_parentheses = True

# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#add_module_names = True

# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
#show_authors = False

# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'

# A list of ignored prefixes for module index sorting.
#modindex_common_prefix = []

# If true, keep warnings as "system message" paragraphs in the built documents.
#keep_warnings = False


# -- Options for HTML output ----------------------------------------------

# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
html_theme = 'default'

# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#html_theme_options = {}

# Add any paths that contain custom themes here, relative to this directory.
#html_theme_path = []

# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
#html_title = None

# A shorter title for the navigation bar. Default is the same as html_title.
#html_short_title = None

# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#html_logo = None

# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
#html_favicon = None

# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']

# Add any extra paths that contain custom files (such as robots.txt or
# .htaccess) here, relative to this directory. These files are copied
# directly to the root of the documentation.
#html_extra_path = []

# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
#html_last_updated_fmt = '%b %d, %Y'

# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#html_use_smartypants = True

# Custom sidebar templates, maps document names to template names.
#html_sidebars = {}

# Additional templates that should be rendered to pages, maps page names to
# template names.
#html_additional_pages = {}

# If false, no module index is generated.
#html_domain_indices = True

# If false, no index is generated.
#html_use_index = True

# If true, the index is split into individual pages for each letter.
#html_split_index = False

# If true, links to the reST sources are added to the pages.
#html_show_sourcelink = True

# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
#html_show_sphinx = True

# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
#html_show_copyright = True

# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
#html_use_opensearch = ''

# This is the file name suffix for HTML files (e.g. ".xhtml").
#html_file_suffix = None

# Output file base name for HTML help builder.
htmlhelp_basename = 'kafka-pythondoc'


# -- Options for LaTeX output ---------------------------------------------

latex_elements = {
    # The paper size ('letterpaper' or 'a4paper').
    #'papersize': 'letterpaper',

    # The font size ('10pt', '11pt' or '12pt').
    #'pointsize': '10pt',

    # Additional stuff for the LaTeX preamble.
    #'preamble': '',
}

# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
#  author, documentclass [howto, manual, or own class]).
latex_documents = [
    ('index', 'kafka-python.tex', u'kafka-python Documentation',
     u'David Arthur', 'manual'),
]

# The name of an image file (relative to this directory) to place at the top of
# the title page.
#latex_logo = None

# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
#latex_use_parts = False

# If true, show page references after internal links.
#latex_show_pagerefs = False

# If true, show URL addresses after external links.
#latex_show_urls = False

# Documents to append as an appendix to all manuals.
#latex_appendices = []

# If false, no module index is generated.
#latex_domain_indices = True


# -- Options for manual page output ---------------------------------------

# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
    ('index', 'kafka-python', u'kafka-python Documentation',
     [u'David Arthur'], 1)
]

# If true, show URL addresses after external links.
#man_show_urls = False


# -- Options for Texinfo output -------------------------------------------

# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
#  dir menu entry, description, category)
texinfo_documents = [
    ('index', 'kafka-python', u'kafka-python Documentation',
     u'David Arthur', 'kafka-python', 'One line description of project.',
     'Miscellaneous'),
]

# Documents to append as an appendix to all manuals.
#texinfo_appendices = []

# If false, no module index is generated.
#texinfo_domain_indices = True

# How to display URL addresses: 'footnote', 'no', or 'inline'.
#texinfo_show_urls = 'footnote'

# If true, do not generate a @detailmenu in the "Top" node's menu.
#texinfo_no_detailmenu = False
docs/index.rst (new file) | 59
@@ -0,0 +1,59 @@
kafka-python
============

This module provides low-level protocol support for Apache Kafka as well as
high-level consumer and producer classes. Request batching is supported by the
protocol as well as broker-aware request routing. Gzip and Snappy compression
is also supported for message sets.

http://kafka.apache.org/

On Freenode IRC at #kafka-python, as well as #apache-kafka

For general discussion of kafka-client design and implementation (not python specific),
see https://groups.google.com/forum/m/#!forum/kafka-clients

Status
------

The current stable version of this package is `0.9.2 <https://github.com/mumrah/kafka-python/releases/tag/v0.9.2>`_ and is compatible with:

Kafka broker versions

* 0.8.0
* 0.8.1
* 0.8.1.1

Python versions

* 2.6 (tested on 2.6.9)
* 2.7 (tested on 2.7.8)
* pypy (tested on pypy 2.3.1 / python 2.7.6)
* (Python 3.3 and 3.4 support has been added to trunk and will be available in the next release)

License
-------

Copyright 2014, David Arthur under Apache License, v2.0. See `LICENSE <https://github.com/mumrah/kafka-python/blob/master/LICENSE>`_.


Contents
--------

.. toctree::
   :maxdepth: 2

   install
   tests
   usage
   api_reference


Indices and tables
==================

* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
docs/install.rst (new file) | 79
@@ -0,0 +1,79 @@
Install
=======

Install with your favorite package manager

Latest Release
--------------
Pip:

.. code:: bash

    pip install kafka-python

Releases are also listed at https://github.com/mumrah/kafka-python/releases


Bleeding-Edge
-------------

.. code:: bash

    git clone https://github.com/mumrah/kafka-python
    pip install ./kafka-python

Setuptools:

.. code:: bash

    git clone https://github.com/mumrah/kafka-python
    easy_install ./kafka-python

Using `setup.py` directly:

.. code:: bash

    git clone https://github.com/mumrah/kafka-python
    cd kafka-python
    python setup.py install


Optional Snappy install
-----------------------

Install Development Libraries
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Download and build Snappy from http://code.google.com/p/snappy/downloads/list

Ubuntu:

.. code:: bash

    apt-get install libsnappy-dev

OSX:

.. code:: bash

    brew install snappy

From Source:

.. code:: bash

    wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz
    tar xzvf snappy-1.0.5.tar.gz
    cd snappy-1.0.5
    ./configure
    make
    sudo make install

Install Python Module
^^^^^^^^^^^^^^^^^^^^^

Install the `python-snappy` module

.. code:: bash

    pip install python-snappy
docs/make.bat (new file) | 242
@@ -0,0 +1,242 @@
@ECHO OFF

REM Command file for Sphinx documentation

if "%SPHINXBUILD%" == "" (
	set SPHINXBUILD=sphinx-build
)
set BUILDDIR=_build
set ALLSPHINXOPTS=-d %BUILDDIR%/doctrees %SPHINXOPTS% .
set I18NSPHINXOPTS=%SPHINXOPTS% .
if NOT "%PAPER%" == "" (
	set ALLSPHINXOPTS=-D latex_paper_size=%PAPER% %ALLSPHINXOPTS%
	set I18NSPHINXOPTS=-D latex_paper_size=%PAPER% %I18NSPHINXOPTS%
)

if "%1" == "" goto help

if "%1" == "help" (
	:help
	echo.Please use `make ^<target^>` where ^<target^> is one of
	echo.  html       to make standalone HTML files
	echo.  dirhtml    to make HTML files named index.html in directories
	echo.  singlehtml to make a single large HTML file
	echo.  pickle     to make pickle files
	echo.  json       to make JSON files
	echo.  htmlhelp   to make HTML files and a HTML help project
	echo.  qthelp     to make HTML files and a qthelp project
	echo.  devhelp    to make HTML files and a Devhelp project
	echo.  epub       to make an epub
	echo.  latex      to make LaTeX files, you can set PAPER=a4 or PAPER=letter
	echo.  text       to make text files
	echo.  man        to make manual pages
	echo.  texinfo    to make Texinfo files
	echo.  gettext    to make PO message catalogs
	echo.  changes    to make an overview over all changed/added/deprecated items
	echo.  xml        to make Docutils-native XML files
	echo.  pseudoxml  to make pseudoxml-XML files for display purposes
	echo.  linkcheck  to check all external links for integrity
	echo.  doctest    to run all doctests embedded in the documentation if enabled
	goto end
)

if "%1" == "clean" (
	for /d %%i in (%BUILDDIR%\*) do rmdir /q /s %%i
	del /q /s %BUILDDIR%\*
	goto end
)


%SPHINXBUILD% 2> nul
if errorlevel 9009 (
	echo.
	echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
	echo.installed, then set the SPHINXBUILD environment variable to point
	echo.to the full path of the 'sphinx-build' executable. Alternatively you
	echo.may add the Sphinx directory to PATH.
	echo.
	echo.If you don't have Sphinx installed, grab it from
	echo.http://sphinx-doc.org/
	exit /b 1
)

if "%1" == "html" (
	%SPHINXBUILD% -b html %ALLSPHINXOPTS% %BUILDDIR%/html
	if errorlevel 1 exit /b 1
	echo.
	echo.Build finished. The HTML pages are in %BUILDDIR%/html.
	goto end
)

if "%1" == "dirhtml" (
	%SPHINXBUILD% -b dirhtml %ALLSPHINXOPTS% %BUILDDIR%/dirhtml
	if errorlevel 1 exit /b 1
	echo.
	echo.Build finished. The HTML pages are in %BUILDDIR%/dirhtml.
	goto end
)

if "%1" == "singlehtml" (
	%SPHINXBUILD% -b singlehtml %ALLSPHINXOPTS% %BUILDDIR%/singlehtml
	if errorlevel 1 exit /b 1
	echo.
	echo.Build finished. The HTML pages are in %BUILDDIR%/singlehtml.
	goto end
)

if "%1" == "pickle" (
	%SPHINXBUILD% -b pickle %ALLSPHINXOPTS% %BUILDDIR%/pickle
	if errorlevel 1 exit /b 1
	echo.
	echo.Build finished; now you can process the pickle files.
	goto end
)

if "%1" == "json" (
	%SPHINXBUILD% -b json %ALLSPHINXOPTS% %BUILDDIR%/json
	if errorlevel 1 exit /b 1
	echo.
	echo.Build finished; now you can process the JSON files.
	goto end
)

if "%1" == "htmlhelp" (
	%SPHINXBUILD% -b htmlhelp %ALLSPHINXOPTS% %BUILDDIR%/htmlhelp
	if errorlevel 1 exit /b 1
	echo.
	echo.Build finished; now you can run HTML Help Workshop with the ^
.hhp project file in %BUILDDIR%/htmlhelp.
	goto end
)

if "%1" == "qthelp" (
	%SPHINXBUILD% -b qthelp %ALLSPHINXOPTS% %BUILDDIR%/qthelp
	if errorlevel 1 exit /b 1
	echo.
	echo.Build finished; now you can run "qcollectiongenerator" with the ^
.qhcp project file in %BUILDDIR%/qthelp, like this:
	echo.^> qcollectiongenerator %BUILDDIR%\qthelp\kafka-python.qhcp
	echo.To view the help file:
	echo.^> assistant -collectionFile %BUILDDIR%\qthelp\kafka-python.ghc
	goto end
)

if "%1" == "devhelp" (
	%SPHINXBUILD% -b devhelp %ALLSPHINXOPTS% %BUILDDIR%/devhelp
	if errorlevel 1 exit /b 1
	echo.
	echo.Build finished.
	goto end
)

if "%1" == "epub" (
	%SPHINXBUILD% -b epub %ALLSPHINXOPTS% %BUILDDIR%/epub
	if errorlevel 1 exit /b 1
	echo.
	echo.Build finished. The epub file is in %BUILDDIR%/epub.
	goto end
)

if "%1" == "latex" (
	%SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
	if errorlevel 1 exit /b 1
	echo.
	echo.Build finished; the LaTeX files are in %BUILDDIR%/latex.
	goto end
)

if "%1" == "latexpdf" (
	%SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
	cd %BUILDDIR%/latex
	make all-pdf
	cd %BUILDDIR%/..
	echo.
	echo.Build finished; the PDF files are in %BUILDDIR%/latex.
	goto end
)

if "%1" == "latexpdfja" (
	%SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
	cd %BUILDDIR%/latex
	make all-pdf-ja
	cd %BUILDDIR%/..
	echo.
	echo.Build finished; the PDF files are in %BUILDDIR%/latex.
	goto end
)

if "%1" == "text" (
	%SPHINXBUILD% -b text %ALLSPHINXOPTS% %BUILDDIR%/text
	if errorlevel 1 exit /b 1
	echo.
	echo.Build finished. The text files are in %BUILDDIR%/text.
	goto end
)

if "%1" == "man" (
	%SPHINXBUILD% -b man %ALLSPHINXOPTS% %BUILDDIR%/man
	if errorlevel 1 exit /b 1
	echo.
	echo.Build finished. The manual pages are in %BUILDDIR%/man.
	goto end
)

if "%1" == "texinfo" (
	%SPHINXBUILD% -b texinfo %ALLSPHINXOPTS% %BUILDDIR%/texinfo
	if errorlevel 1 exit /b 1
	echo.
	echo.Build finished. The Texinfo files are in %BUILDDIR%/texinfo.
	goto end
)

if "%1" == "gettext" (
	%SPHINXBUILD% -b gettext %I18NSPHINXOPTS% %BUILDDIR%/locale
	if errorlevel 1 exit /b 1
	echo.
	echo.Build finished. The message catalogs are in %BUILDDIR%/locale.
	goto end
)

if "%1" == "changes" (
	%SPHINXBUILD% -b changes %ALLSPHINXOPTS% %BUILDDIR%/changes
	if errorlevel 1 exit /b 1
	echo.
	echo.The overview file is in %BUILDDIR%/changes.
	goto end
)

if "%1" == "linkcheck" (
	%SPHINXBUILD% -b linkcheck %ALLSPHINXOPTS% %BUILDDIR%/linkcheck
	if errorlevel 1 exit /b 1
	echo.
	echo.Link check complete; look for any errors in the above output ^
or in %BUILDDIR%/linkcheck/output.txt.
	goto end
)

if "%1" == "doctest" (
	%SPHINXBUILD% -b doctest %ALLSPHINXOPTS% %BUILDDIR%/doctest
	if errorlevel 1 exit /b 1
	echo.
	echo.Testing of doctests in the sources finished, look at the ^
results in %BUILDDIR%/doctest/output.txt.
	goto end
)

if "%1" == "xml" (
	%SPHINXBUILD% -b xml %ALLSPHINXOPTS% %BUILDDIR%/xml
	if errorlevel 1 exit /b 1
	echo.
	echo.Build finished. The XML files are in %BUILDDIR%/xml.
	goto end
)

if "%1" == "pseudoxml" (
	%SPHINXBUILD% -b pseudoxml %ALLSPHINXOPTS% %BUILDDIR%/pseudoxml
	if errorlevel 1 exit /b 1
	echo.
	echo.Build finished. The pseudo-XML files are in %BUILDDIR%/pseudoxml.
	goto end
)

:end
docs/requirements.txt (new file) | 7
@@ -0,0 +1,7 @@
sphinx
sphinxcontrib-napoleon

# Install kafka-python in editable mode
# This allows the sphinx autodoc module
# to load the Python modules and extract docstrings.
-e ..
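The editable install (`-e ..`) is what lets `sphinx.ext.autodoc` import the `kafka` package from the working tree while the docs build. A quick pre-build sanity check might look like the following sketch (illustrative only, not part of this commit); the module list comes from docs/api_reference.rst above:

```python
# Verify that every module referenced by docs/api_reference.rst is importable,
# which is exactly what the editable install of kafka-python makes possible.
import importlib

MODULES = [
    "kafka.client", "kafka.codec", "kafka.common", "kafka.conn",
    "kafka.context", "kafka.protocol", "kafka.queue", "kafka.util",
    "kafka.consumer.base", "kafka.consumer.kafka",
    "kafka.consumer.multiprocess", "kafka.consumer.simple",
    "kafka.partitioner.base", "kafka.partitioner.hashed",
    "kafka.partitioner.roundrobin",
    "kafka.producer.base", "kafka.producer.keyed", "kafka.producer.simple",
]

for name in MODULES:
    importlib.import_module(name)  # raises ImportError if autodoc would fail here
print("all autodoc targets import cleanly")
```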
docs/tests.rst (new file) | 59
@@ -0,0 +1,59 @@
Tests
=====

Run the unit tests
------------------

.. code:: bash

    tox


Run a subset of unit tests
--------------------------

.. code:: bash

    # run protocol tests only
    tox -- -v test.test_protocol

    # test with pypy only
    tox -e pypy

    # Run only 1 test, and use python 2.7
    tox -e py27 -- -v --with-id --collect-only

    # pick a test number from the list like #102
    tox -e py27 -- -v --with-id 102


Run the integration tests
-------------------------

The integration tests will actually start up a real local Zookeeper
instance and real Kafka brokers, and send messages to them using the client.

First, get the kafka binaries for integration testing:

.. code:: bash

    ./build_integration.sh

By default, the build_integration.sh script will download binary
distributions for all supported kafka versions.
To test against the latest source build, set KAFKA_VERSION=trunk
and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended).

.. code:: bash

    SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh

Then, to run the tests against a supported Kafka version, simply set the `KAFKA_VERSION`
env variable to the server build you want to use for testing:

.. code:: bash

    KAFKA_VERSION=0.8.0 tox
    KAFKA_VERSION=0.8.1 tox
    KAFKA_VERSION=0.8.1.1 tox
    KAFKA_VERSION=trunk tox
docs/usage.rst (new file) | 122
@@ -0,0 +1,122 @@
Usage
=====

High level
----------

.. code:: python

    from kafka import KafkaClient, SimpleProducer, SimpleConsumer

    # To send messages synchronously
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)

    # Note that the application is responsible for encoding messages to type str
    producer.send_messages("my-topic", "some message")
    producer.send_messages("my-topic", "this method", "is variadic")

    # Send unicode message
    producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))

    # To send messages asynchronously
    # WARNING: current implementation does not guarantee message delivery on failure!
    # messages can get dropped! Use at your own risk! Or help us improve with a PR!
    producer = SimpleProducer(kafka, async=True)
    producer.send_messages("my-topic", "async message")

    # To wait for acknowledgements
    # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
    #                         a local log before sending response
    # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
    #                            by all in sync replicas before sending a response
    producer = SimpleProducer(kafka, async=False,
                              req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                              ack_timeout=2000)

    response = producer.send_messages("my-topic", "another message")

    if response:
        print(response[0].error)
        print(response[0].offset)

    # To send messages in batch. You can use any of the available
    # producers for doing this. The following producer will collect
    # messages in batch and send them to Kafka after 20 messages are
    # collected or every 60 seconds.
    # Notes:
    # * If the producer dies before the messages are sent, there will be losses
    # * Call producer.stop() to send the messages and cleanup
    producer = SimpleProducer(kafka, batch_send=True,
                              batch_send_every_n=20,
                              batch_send_every_t=60)

    # To consume messages
    consumer = SimpleConsumer(kafka, "my-group", "my-topic")
    for message in consumer:
        # message is raw byte string -- decode if necessary!
        # e.g., for unicode: `message.decode('utf-8')`
        print(message)

    kafka.close()


Keyed messages
--------------

.. code:: python

    from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner

    kafka = KafkaClient("localhost:9092")

    # HashedPartitioner is default
    producer = KeyedProducer(kafka)
    producer.send("my-topic", "key1", "some message")
    producer.send("my-topic", "key2", "this method")

    producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)


Multiprocess consumer
---------------------

.. code:: python

    from kafka import KafkaClient, MultiProcessConsumer

    kafka = KafkaClient("localhost:9092")

    # This will split the number of partitions among two processes
    consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)

    # This will spawn processes such that each handles 2 partitions max
    consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
                                    partitions_per_proc=2)

    for message in consumer:
        print(message)

    for message in consumer.get_messages(count=5, block=True, timeout=4):
        print(message)


Low level
---------

.. code:: python

    from kafka import KafkaClient, create_message
    from kafka.protocol import KafkaProtocol
    from kafka.common import ProduceRequest

    kafka = KafkaClient("localhost:9092")

    req = ProduceRequest(topic="my-topic", partition=1,
                         messages=[create_message("some message")])
    resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
    kafka.close()

    resps[0].topic      # "my-topic"
    resps[0].partition  # 1
    resps[0].error      # 0 (hopefully)
    resps[0].offset     # offset of the first message sent in this request
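The batching producer above only flushes on its count/time triggers, and the comments note that `producer.stop()` must be called to send any buffered messages and clean up. A small shutdown wrapper makes that explicit; this is an illustrative sketch built only on the calls documented above (`SimpleProducer`, `send_messages`, `stop`, `KafkaClient.close`) and assumes a broker on localhost:9092 with topic "my-topic", as in the usage examples:

```python
from kafka import KafkaClient, SimpleProducer

kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka, batch_send=True,
                          batch_send_every_n=20,
                          batch_send_every_t=60)
try:
    for i in range(50):
        # messages are buffered until 20 accumulate or 60 seconds pass
        producer.send_messages("my-topic", "buffered message %d" % i)
finally:
    # flush anything still buffered and stop the background sender thread
    producer.stop()
    kafka.close()
```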
@@ -131,19 +131,21 @@ class KafkaClient(object):
         the leader broker for that partition using the supplied encode/decode
         functions

-        Params
-        ======
+        Arguments:
+
         payloads: list of object-like entities with a topic (str) and
                   partition (int) attribute
+
         encode_fn: a method to encode the list of payloads to a request body,
                    must accept client_id, correlation_id, and payloads as
                    keyword arguments
+
         decode_fn: a method to decode a response body into response objects.
                    The response objects must be object-like and have topic
                    and partition attributes

-        Return
-        ======
+        Returns:
+
         List of response objects in the same order as the supplied payloads
         """
@@ -285,9 +287,9 @@ class KafkaClient(object):

         This method should be called after receiving any error

-        @param: *topics (optional)
-        If a list of topics is provided, the metadata refresh will be limited
-        to the specified topics only.
+        Arguments:
+            *topics (optional): If a list of topics is provided,
+                the metadata refresh will be limited to the specified topics only.

         Exceptions:
         ----------
@@ -384,18 +386,16 @@ class KafkaClient(object):
         sent to a specific broker. Output is a list of responses in the
         same order as the list of payloads specified

-        Params
-        ======
+        Arguments:
         payloads: list of ProduceRequest
         fail_on_error: boolean, should we raise an Exception if we
                        encounter an API error?
         callback: function, instead of returning the ProduceResponse,
                   first pass it through this function

-        Return
-        ======
+        Returns:
         list of ProduceResponse or callback(ProduceResponse), in the
         order of input payloads
         """

         encoder = functools.partial(
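The docstring rewrites in these hunks replace ad-hoc `Params`/`Return` headings with the Google-style `Arguments:`/`Returns:` sections that `sphinxcontrib.napoleon` (enabled in docs/conf.py above) knows how to render. For reference, a docstring in that style looks roughly like this; the signature below is trimmed for illustration and is not code from this commit:

```python
def send_produce_request(payloads, fail_on_error=True, callback=None):
    """Send a ProduceRequest to the broker that leads each partition.

    Arguments:
        payloads: list of ProduceRequest
        fail_on_error: boolean, should we raise an Exception if we
            encounter an API error?
        callback: function, instead of returning the ProduceResponse,
            first pass it through this function

    Returns:
        list of ProduceResponse or callback(ProduceResponse), in the
        order of input payloads
    """
    # napoleon turns the Arguments/Returns sections above into proper
    # parameter and return-value field lists when autodoc renders them
```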
@@ -47,10 +47,11 @@ class KafkaConnection(local):
         we can do something in here to facilitate multiplexed requests/responses
         since the Kafka API includes a correlation id.

-    host: the host name or IP address of a kafka broker
-    port: the port number the kafka broker is listening on
-    timeout: default 120. The socket timeout for sending and receiving data
-        in seconds. None means no timeout, so a request can block forever.
+    Arguments:
+        host: the host name or IP address of a kafka broker
+        port: the port number the kafka broker is listening on
+        timeout: default 120. The socket timeout for sending and receiving data
+            in seconds. None means no timeout, so a request can block forever.
     """
     def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
         super(KafkaConnection, self).__init__()
@@ -116,8 +117,10 @@ class KafkaConnection(local):
     def send(self, request_id, payload):
         """
         Send a request to Kafka
-        param: request_id -- can be any int (used only for debug logging...)
-        param: payload -- an encoded kafka packet (see KafkaProtocol)
+
+        Arguments:
+            request_id (int): can be any int (used only for debug logging...)
+            payload: an encoded kafka packet (see KafkaProtocol)
         """

         log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
@@ -135,8 +138,12 @@ class KafkaConnection(local):
 def recv(self, request_id):
 """
 Get a response packet from Kafka
-param: request_id -- can be any int (only used for debug logging...)
-returns encoded kafka packet response from server as type str
+Arguments:
+request_id: can be any int (only used for debug logging...)
+
+Returns:
+str: Encoded kafka packet response from server
 """
 log.debug("Reading response %d from Kafka" % request_id)

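As a rough illustration of the send/recv pairing documented above, a hypothetical round trip might look like the following; the payload must already be a fully encoded Kafka packet, and the correlation id is what ties a response back to its request.

```python
from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol

conn = KafkaConnection("localhost", 9092, timeout=120)

# Encode a metadata request for one topic
request = KafkaProtocol.encode_metadata_request(
    client_id="demo-client", correlation_id=1, topics=["my-topic"])

conn.send(1, request)   # request_id is only used for debug logging
data = conn.recv(1)     # encoded response packet from the broker
response = KafkaProtocol.decode_metadata_response(data)
```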
@@ -32,9 +32,11 @@ class Consumer(object):
 Base class to be used by other consumers. Not to be used directly
+
 This base class provides logic for
+
 * initialization and fetching metadata of partitions
 * Auto-commit logic
 * APIs for fetching pending message count

 """
 def __init__(self, client, group, topic, partitions=None, auto_commit=True,
 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
@@ -93,8 +95,9 @@ class Consumer(object):
 """
 Commit offsets for this consumer

-partitions: list of partitions to commit, default is to commit
-all of them
+Keyword Arguments:
+partitions (list): list of partitions to commit, default is to commit
+all of them
 """

 # short circuit if nothing happened. This check is kept outside
@@ -148,7 +151,8 @@ class Consumer(object):
 """
 Gets the pending message count

-partitions: list of partitions to check for, default is to check all
+Keyword Arguments:
+partitions (list): list of partitions to check for, default is to check all
 """
 if not partitions:
 partitions = self.offsets.keys()
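Since `commit()` and `pending()` are inherited by the concrete consumers, a short sketch of how these keyword arguments are typically used (group, topic, and partition numbers are placeholders):

```python
from kafka import KafkaClient, SimpleConsumer

client = KafkaClient("localhost:9092")
consumer = SimpleConsumer(client, "my-group", "my-topic", auto_commit=False)

# How many messages are still unconsumed, for two specific partitions
print(consumer.pending(partitions=[0, 1]))

# Commit the offsets for those partitions only (the default commits all)
consumer.commit(partitions=[0, 1])
```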
@@ -54,72 +54,78 @@ class KafkaConsumer(object):
 """
 A simpler kafka consumer

-```
-# A very basic 'tail' consumer, with no stored offset management
-kafka = KafkaConsumer('topic1')
-for m in kafka:
-print m
+.. code:: python

-# Alternate interface: next()
-print kafka.next()
+# A very basic 'tail' consumer, with no stored offset management
+kafka = KafkaConsumer('topic1')
+for m in kafka:
+print m

-# Alternate interface: batch iteration
-while True:
-for m in kafka.fetch_messages():
-print m
-print "Done with batch - let's do another!"
-```
+# Alternate interface: next()
+print kafka.next()

-```
-# more advanced consumer -- multiple topics w/ auto commit offset management
-kafka = KafkaConsumer('topic1', 'topic2',
-group_id='my_consumer_group',
-auto_commit_enable=True,
-auto_commit_interval_ms=30 * 1000,
-auto_offset_reset='smallest')
+# Alternate interface: batch iteration
+while True:
+for m in kafka.fetch_messages():
+print m
+print "Done with batch - let's do another!"

-# Infinite iteration
-for m in kafka:
-process_message(m)
-kafka.task_done(m)

-# Alternate interface: next()
-m = kafka.next()
-process_message(m)
-kafka.task_done(m)
+.. code:: python

-# If auto_commit_enable is False, remember to commit() periodically
-kafka.commit()
+# more advanced consumer -- multiple topics w/ auto commit offset management
+kafka = KafkaConsumer('topic1', 'topic2',
+group_id='my_consumer_group',
+auto_commit_enable=True,
+auto_commit_interval_ms=30 * 1000,
+auto_offset_reset='smallest')

-# Batch process interface
-while True:
-for m in kafka.fetch_messages():
+# Infinite iteration
+for m in kafka:
+process_message(m)
+kafka.task_done(m)
+
+# Alternate interface: next()
+m = kafka.next()
 process_message(m)
 kafka.task_done(m)
-```
+
+# If auto_commit_enable is False, remember to commit() periodically
+kafka.commit()
+
+# Batch process interface
+while True:
+for m in kafka.fetch_messages():
+process_message(m)
+kafka.task_done(m)


 messages (m) are namedtuples with attributes:
-m.topic: topic name (str)
-m.partition: partition number (int)
-m.offset: message offset on topic-partition log (int)
-m.key: key (bytes - can be None)
-m.value: message (output of deserializer_class - default is raw bytes)
+
+* `m.topic`: topic name (str)
+* `m.partition`: partition number (int)
+* `m.offset`: message offset on topic-partition log (int)
+* `m.key`: key (bytes - can be None)
+* `m.value`: message (output of deserializer_class - default is raw bytes)

 Configuration settings can be passed to constructor,
 otherwise defaults will be used:
-client_id='kafka.consumer.kafka',
-group_id=None,
-fetch_message_max_bytes=1024*1024,
-fetch_min_bytes=1,
-fetch_wait_max_ms=100,
-refresh_leader_backoff_ms=200,
-metadata_broker_list=None,
-socket_timeout_ms=30*1000,
-auto_offset_reset='largest',
-deserializer_class=lambda msg: msg,
-auto_commit_enable=False,
-auto_commit_interval_ms=60 * 1000,
-consumer_timeout_ms=-1
+
+.. code:: python
+
+client_id='kafka.consumer.kafka',
+group_id=None,
+fetch_message_max_bytes=1024*1024,
+fetch_min_bytes=1,
+fetch_wait_max_ms=100,
+refresh_leader_backoff_ms=200,
+metadata_broker_list=None,
+socket_timeout_ms=30*1000,
+auto_offset_reset='largest',
+deserializer_class=lambda msg: msg,
+auto_commit_enable=False,
+auto_commit_interval_ms=60 * 1000,
+consumer_timeout_ms=-1

 Configuration parameters are described in more detail at
 http://kafka.apache.org/documentation.html#highlevelconsumerapi
@@ -133,6 +139,9 @@ class KafkaConsumer(object):
 """
 Configuration settings can be passed to constructor,
 otherwise defaults will be used:

+.. code:: python
+
 client_id='kafka.consumer.kafka',
 group_id=None,
 fetch_message_max_bytes=1024*1024,
@@ -189,28 +198,35 @@ class KafkaConsumer(object):
 Optionally specify offsets to start from

 Accepts types:
-str (utf-8): topic name (will consume all available partitions)
-tuple: (topic, partition)
-dict: { topic: partition }
-{ topic: [partition list] }
-{ topic: (partition tuple,) }
+
+* str (utf-8): topic name (will consume all available partitions)
+* tuple: (topic, partition)
+* dict:
+- { topic: partition }
+- { topic: [partition list] }
+- { topic: (partition tuple,) }

 Optionally, offsets can be specified directly:
-tuple: (topic, partition, offset)
-dict: { (topic, partition): offset, ... }

-Ex:
-kafka = KafkaConsumer()
+* tuple: (topic, partition, offset)
+* dict: { (topic, partition): offset, ... }

-# Consume topic1-all; topic2-partition2; topic3-partition0
-kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
+Example:

-# Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
-# using tuples --
-kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))
+.. code:: python

-# using dict --
-kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
+kafka = KafkaConsumer()
+
+# Consume topic1-all; topic2-partition2; topic3-partition0
+kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
+
+# Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
+# using tuples --
+kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))
+
+# using dict --
+kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
 """
 self._topics = []
 self._client.load_metadata_for_topics()
@@ -309,10 +325,12 @@ class KafkaConsumer(object):
 Otherwise blocks indefinitely

 Note that this is also the method called internally during iteration:
-```
-for m in consumer:
-pass
-```
+
+.. code:: python
+
+for m in consumer:
+pass

 """
 self._set_consumer_timeout_start()
 while True:
@@ -336,11 +354,12 @@ class KafkaConsumer(object):
 OffsetOutOfRange, per the configured `auto_offset_reset` policy

 Key configuration parameters:
-`fetch_message_max_bytes`
-`fetch_max_wait_ms`
-`fetch_min_bytes`
-`deserializer_class`
-`auto_offset_reset`
+
+* `fetch_message_max_bytes`
+* `fetch_max_wait_ms`
+* `fetch_min_bytes`
+* `deserializer_class`
+* `auto_offset_reset`
 """

 max_bytes = self._config['fetch_message_max_bytes']
@@ -418,20 +437,18 @@ class KafkaConsumer(object):
 """
 Request available fetch offsets for a single topic/partition

-@param topic (str)
-@param partition (int)
-@param request_time_ms (int) -- Used to ask for all messages before a
-certain time (ms). There are two special
-values. Specify -1 to receive the latest
-offset (i.e. the offset of the next coming
-message) and -2 to receive the earliest
-available offset. Note that because offsets
-are pulled in descending order, asking for
-the earliest offset will always return you
-a single element.
-@param max_num_offsets (int)
+Arguments:
+topic (str)
+partition (int)
+request_time_ms (int): Used to ask for all messages before a
+certain time (ms). There are two special values. Specify -1 to receive the latest
+offset (i.e. the offset of the next coming message) and -2 to receive the earliest
+available offset. Note that because offsets are pulled in descending order, asking for
+the earliest offset will always return you a single element.
+max_num_offsets (int)

-@return offsets (list)
+Returns:
+offsets (list)
 """
 reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]

@@ -448,9 +465,12 @@ class KafkaConsumer(object):

 def offsets(self, group=None):
 """
-Returns a copy of internal offsets struct
-optional param: group [fetch|commit|task_done|highwater]
-if no group specified, returns all groups
+Keyword Arguments:
+group: Either "fetch", "commit", "task_done", or "highwater".
+If no group specified, returns all groups.
+
+Returns:
+A copy of internal offsets struct
 """
 if not group:
 return {
@@ -498,8 +518,8 @@ class KafkaConsumer(object):
 Store consumed message offsets (marked via task_done())
 to kafka cluster for this consumer_group.

-Note -- this functionality requires server version >=0.8.1.1
-see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+**Note**: this functionality requires server version >=0.8.1.1
+See `this wiki page <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI>`_.
 """
 if not self._config['group_id']:
 logger.warning('Cannot commit without a group_id!')
@@ -80,19 +80,21 @@ class MultiProcessConsumer(Consumer):
 A consumer implementation that consumes partitions for a topic in
 parallel using multiple processes

-client: a connected KafkaClient
-group: a name for this consumer, used for offset storage and must be unique
-topic: the topic to consume
+Arguments:
+client: a connected KafkaClient
+group: a name for this consumer, used for offset storage and must be unique
+topic: the topic to consume

-auto_commit: default True. Whether or not to auto commit the offsets
-auto_commit_every_n: default 100. How many messages to consume
-before a commit
-auto_commit_every_t: default 5000. How much time (in milliseconds) to
-wait before commit
-num_procs: Number of processes to start for consuming messages.
-The available partitions will be divided among these processes
-partitions_per_proc: Number of partitions to be allocated per process
-(overrides num_procs)
+Keyword Arguments:
+auto_commit: default True. Whether or not to auto commit the offsets
+auto_commit_every_n: default 100. How many messages to consume
+before a commit
+auto_commit_every_t: default 5000. How much time (in milliseconds) to
+wait before commit
+num_procs: Number of processes to start for consuming messages.
+The available partitions will be divided among these processes
+partitions_per_proc: Number of partitions to be allocated per process
+(overrides num_procs)

 Auto commit details:
 If both auto_commit_every_n and auto_commit_every_t are set, they will
@@ -198,11 +200,12 @@ class MultiProcessConsumer(Consumer):
 """
 Fetch the specified number of messages

-count: Indicates the maximum number of messages to be fetched
-block: If True, the API will block till some messages are fetched.
-timeout: If block is True, the function will block for the specified
-time (in seconds) until count messages is fetched. If None,
-it will block forever.
+Keyword Arguments:
+count: Indicates the maximum number of messages to be fetched
+block: If True, the API will block till some messages are fetched.
+timeout: If block is True, the function will block for the specified
+time (in seconds) until count messages is fetched. If None,
+it will block forever.
 """
 messages = []

@@ -67,24 +67,32 @@ class SimpleConsumer(Consumer):
 A simple consumer implementation that consumes all/specified partitions
 for a topic

-client: a connected KafkaClient
-group: a name for this consumer, used for offset storage and must be unique
-topic: the topic to consume
-partitions: An optional list of partitions to consume the data from
-
-auto_commit: default True. Whether or not to auto commit the offsets
-auto_commit_every_n: default 100. How many messages to consume
-before a commit
-auto_commit_every_t: default 5000. How much time (in milliseconds) to
-wait before commit
-fetch_size_bytes: number of bytes to request in a FetchRequest
-buffer_size: default 4K. Initial number of bytes to tell kafka we
-have available. This will double as needed.
-max_buffer_size: default 16K. Max number of bytes to tell kafka we have
-available. None means no limit.
-iter_timeout: default None. How much time (in seconds) to wait for a
-message in the iterator before exiting. None means no
-timeout, so it will wait forever.
+Arguments:
+client: a connected KafkaClient
+group: a name for this consumer, used for offset storage and must be unique
+topic: the topic to consume
+
+Keyword Arguments:
+partitions: An optional list of partitions to consume the data from
+
+auto_commit: default True. Whether or not to auto commit the offsets
+
+auto_commit_every_n: default 100. How many messages to consume
+before a commit
+
+auto_commit_every_t: default 5000. How much time (in milliseconds) to
+wait before commit
+
+fetch_size_bytes: number of bytes to request in a FetchRequest
+
+buffer_size: default 4K. Initial number of bytes to tell kafka we
+have available. This will double as needed.
+
+max_buffer_size: default 16K. Max number of bytes to tell kafka we have
+available. None means no limit.
+
+iter_timeout: default None. How much time (in seconds) to wait for a
+message in the iterator before exiting. None means no
+timeout, so it will wait forever.

 Auto commit details:
 If both auto_commit_every_n and auto_commit_every_t are set, they will
@@ -133,11 +141,13 @@ class SimpleConsumer(Consumer):
 """
 Alter the current offset in the consumer, similar to fseek

-offset: how much to modify the offset
-whence: where to modify it from
-0 is relative to the earliest available offset (head)
-1 is relative to the current offset
-2 is relative to the latest known offset (tail)
+Arguments:
+offset: how much to modify the offset
+whence: where to modify it from
+
+* 0 is relative to the earliest available offset (head)
+* 1 is relative to the current offset
+* 2 is relative to the latest known offset (tail)
 """

 if whence == 1: # relative to current position
@@ -180,11 +190,12 @@ class SimpleConsumer(Consumer):
 """
 Fetch the specified number of messages

-count: Indicates the maximum number of messages to be fetched
-block: If True, the API will block till some messages are fetched.
-timeout: If block is True, the function will block for the specified
-time (in seconds) until count messages is fetched. If None,
-it will block forever.
+Keyword Arguments:
+count: Indicates the maximum number of messages to be fetched
+block: If True, the API will block till some messages are fetched.
+timeout: If block is True, the function will block for the specified
+time (in seconds) until count messages is fetched. If None,
+it will block forever.
 """
 messages = []
 if timeout is not None:
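A short sketch of the batch interface described above, again using the `SimpleConsumer` from the earlier example; the attribute layout of the returned messages is an assumption based on this library's `OffsetAndMessage` tuples.

```python
# Block for up to five seconds while trying to gather ten messages
messages = consumer.get_messages(count=10, block=True, timeout=5)
for message in messages:
    # message.offset and message.message.value on OffsetAndMessage tuples
    print(message.offset, message.message.value)
```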
@@ -18,6 +18,8 @@ class OffsetCommitContext(object):

 Example:

+.. code:: python
+
 consumer = SimpleConsumer(client, group, topic, auto_commit=False)
 consumer.provide_partition_info()
 consumer.fetch_last_known_offsets()
@@ -57,7 +59,10 @@ class OffsetCommitContext(object):
 In order to know the current partition, it is helpful to initialize
 the consumer to provide partition info via:

+.. code:: python
+
 consumer.provide_partition_info()

 """
 max_offset = max(offset + 1, self.high_water_mark.get(partition, 0))

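Putting the two snippets above together, a hedged sketch of the commit-context flow; the `kafka.context` import path, the `mark()` call, and the `process()` handler are assumptions about this class's API rather than something shown in the diff itself.

```python
from kafka.context import OffsetCommitContext  # assumed module path

consumer = SimpleConsumer(client, group, topic, auto_commit=False)
consumer.provide_partition_info()
consumer.fetch_last_known_offsets()

with OffsetCommitContext(consumer) as context:
    for partition, message in consumer.get_messages(count=10, block=False):
        process(message)                         # hypothetical handler
        context.mark(partition, message.offset)  # assumed commit-marking call
```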
@@ -7,7 +7,8 @@ class Partitioner(object):
 """
 Initialize the partitioner

-partitions - A list of available partitions (during startup)
+Arguments:
+partitions: A list of available partitions (during startup)
 """
 self.partitions = partitions

@@ -16,8 +17,9 @@ class Partitioner(object):
 Takes a string key and num_partitions as argument and returns
 a partition to be used for the message

-partitions - The list of partitions is passed in every call. This
-may look like an overhead, but it will be useful
-(in future) when we handle cases like rebalancing
+Arguments:
+partitions: The list of partitions is passed in every call. This
+may look like an overhead, but it will be useful
+(in future) when we handle cases like rebalancing
 """
 raise NotImplementedError('partition function has to be implemented')
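Because `partition()` must be overridden, a concrete subclass can be as small as the sketch below; the import path (`kafka.partitioner` vs `kafka.partitioner.base`) depends on the version, and the modulo scheme is just an illustration.

```python
from kafka.partitioner import Partitioner  # or kafka.partitioner.base

class ModuloPartitioner(Partitioner):
    """Route each key to a partition by hashing the key."""

    def partition(self, key, partitions):
        # partitions is passed on every call, so a future rebalance can
        # change the available set between messages
        return partitions[hash(key) % len(partitions)]
```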
@@ -85,20 +85,20 @@ class Producer(object):
 """
 Base class to be used by producers

-Params:
-client - The Kafka client instance to use
-async - If set to true, the messages are sent asynchronously via another
+Arguments:
+client: The Kafka client instance to use
+async: If set to true, the messages are sent asynchronously via another
 thread (process). We will not wait for a response to these
 WARNING!!! current implementation of async producer does not
 guarantee message delivery. Use at your own risk! Or help us
 improve with a PR!
-req_acks - A value indicating the acknowledgements that the server must
+req_acks: A value indicating the acknowledgements that the server must
 receive before responding to the request
-ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+ack_timeout: Value (in milliseconds) indicating a timeout for waiting
 for an acknowledgement
-batch_send - If True, messages are send in batches
-batch_send_every_n - If set, messages are send in batches of this size
-batch_send_every_t - If set, messages are send after this timeout
+batch_send: If True, messages are send in batches
+batch_send_every_n: If set, messages are send in batches of this size
+batch_send_every_t: If set, messages are send after this timeout
 """

 ACK_NOT_REQUIRED = 0 # No ack is required
@@ -15,17 +15,19 @@ class KeyedProducer(Producer):
 """
 A producer which distributes messages to partitions based on the key

-Args:
-client - The kafka client instance
-partitioner - A partitioner class that will be used to get the partition
-to send the message to. Must be derived from Partitioner
-async - If True, the messages are sent asynchronously via another
+Arguments:
+client: The kafka client instance
+
+Keyword Arguments:
+partitioner: A partitioner class that will be used to get the partition
+to send the message to. Must be derived from Partitioner
+async: If True, the messages are sent asynchronously via another
 thread (process). We will not wait for a response to these
-ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+ack_timeout: Value (in milliseconds) indicating a timeout for waiting
 for an acknowledgement
-batch_send - If True, messages are send in batches
-batch_send_every_n - If set, messages are send in batches of this size
-batch_send_every_t - If set, messages are send after this timeout
+batch_send: If True, messages are send in batches
+batch_send_every_n: If set, messages are send in batches of this size
+batch_send_every_t: If set, messages are send after this timeout
 """
 def __init__(self, client, partitioner=None, async=False,
 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
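A brief usage sketch for the keyed producer documented above, assuming the `HashedPartitioner` shipped with the library; the topic and keys are placeholders.

```python
from kafka import KafkaClient, KeyedProducer
from kafka.partitioner import HashedPartitioner

client = KafkaClient("localhost:9092")
producer = KeyedProducer(client, partitioner=HashedPartitioner)

# All messages with the same key land on the same partition
producer.send("my-topic", "user-42", "profile updated")
producer.send("my-topic", "user-42", "logged out")
```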
@@ -19,21 +19,23 @@ class SimpleProducer(Producer):
 """
 A simple, round-robin producer. Each message goes to exactly one partition

-Params:
-client - The Kafka client instance to use
-async - If True, the messages are sent asynchronously via another
+Arguments:
+client: The Kafka client instance to use
+
+Keyword Arguments:
+async: If True, the messages are sent asynchronously via another
 thread (process). We will not wait for a response to these
-req_acks - A value indicating the acknowledgements that the server must
+req_acks: A value indicating the acknowledgements that the server must
 receive before responding to the request
-ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+ack_timeout: Value (in milliseconds) indicating a timeout for waiting
 for an acknowledgement
-batch_send - If True, messages are send in batches
-batch_send_every_n - If set, messages are send in batches of this size
-batch_send_every_t - If set, messages are send after this timeout
-random_start - If true, randomize the initial partition which the
+batch_send: If True, messages are send in batches
+batch_send_every_n: If set, messages are send in batches of this size
+batch_send_every_t: If set, messages are send after this timeout
+random_start: If true, randomize the initial partition which the
 the first message block will be published to, otherwise
 if false, the first message block will always publish
 to partition 0 before cycling through each partition
 """
 def __init__(self, client, async=False,
 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
@@ -185,18 +185,18 @@ class KafkaProtocol(object):
 """
 Encode some ProduceRequest structs

-Params
-======
-client_id: string
-correlation_id: int
-payloads: list of ProduceRequest
-acks: How "acky" you want the request to be
-0: immediate response
-1: written to disk by the leader
-2+: waits for this many number of replicas to sync
--1: waits for all replicas to be in sync
-timeout: Maximum time the server will wait for acks from replicas.
-This is _not_ a socket timeout
+Arguments:
+client_id: string
+correlation_id: int
+payloads: list of ProduceRequest
+acks: How "acky" you want the request to be
+0: immediate response
+1: written to disk by the leader
+2+: waits for this many number of replicas to sync
+-1: waits for all replicas to be in sync
+timeout: Maximum time the server will wait for acks from replicas.
+This is _not_ a socket timeout
 """
 payloads = [] if payloads is None else payloads
 grouped_payloads = group_by_topic_and_partition(payloads)
@@ -225,9 +225,9 @@ class KafkaProtocol(object):
 """
 Decode bytes to a ProduceResponse

-Params
-======
-data: bytes to decode
+Arguments:
+data: bytes to decode
 """
 ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)

@@ -248,14 +248,13 @@ class KafkaProtocol(object):
 """
 Encodes some FetchRequest structs

-Params
-======
-client_id: string
-correlation_id: int
-payloads: list of FetchRequest
-max_wait_time: int, how long to block waiting on min_bytes of data
-min_bytes: int, the minimum number of bytes to accumulate before
-returning the response
+Arguments:
+client_id: string
+correlation_id: int
+payloads: list of FetchRequest
+max_wait_time: int, how long to block waiting on min_bytes of data
+min_bytes: int, the minimum number of bytes to accumulate before
+returning the response
 """

 payloads = [] if payloads is None else payloads
@@ -284,9 +283,8 @@ class KafkaProtocol(object):
 """
 Decode bytes to a FetchResponse

-Params
-======
-data: bytes to decode
+Arguments:
+data: bytes to decode
 """
 ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)

@@ -333,9 +331,8 @@ class KafkaProtocol(object):
 """
 Decode bytes to an OffsetResponse

-Params
-======
-data: bytes to decode
+Arguments:
+data: bytes to decode
 """
 ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)

@@ -360,11 +357,10 @@ class KafkaProtocol(object):
 """
 Encode a MetadataRequest

-Params
-======
-client_id: string
-correlation_id: int
-topics: list of strings
+Arguments:
+client_id: string
+correlation_id: int
+topics: list of strings
 """
 if payloads is None:
 topics = [] if topics is None else topics
@@ -388,9 +384,8 @@ class KafkaProtocol(object):
 """
 Decode bytes to a MetadataResponse

-Params
-======
-data: bytes to decode
+Arguments:
+data: bytes to decode
 """
 ((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0)

@@ -439,12 +434,11 @@ class KafkaProtocol(object):
 """
 Encode some OffsetCommitRequest structs

-Params
-======
-client_id: string
-correlation_id: int
-group: string, the consumer group you are committing offsets for
-payloads: list of OffsetCommitRequest
+Arguments:
+client_id: string
+correlation_id: int
+group: string, the consumer group you are committing offsets for
+payloads: list of OffsetCommitRequest
 """
 grouped_payloads = group_by_topic_and_partition(payloads)

@@ -470,9 +464,8 @@ class KafkaProtocol(object):
 """
 Decode bytes to an OffsetCommitResponse

-Params
-======
-data: bytes to decode
+Arguments:
+data: bytes to decode
 """
 ((correlation_id,), cur) = relative_unpack('>i', data, 0)
 ((num_topics,), cur) = relative_unpack('>i', data, cur)
@@ -491,12 +484,11 @@ class KafkaProtocol(object):
 """
 Encode some OffsetFetchRequest structs

-Params
-======
-client_id: string
-correlation_id: int
-group: string, the consumer group you are fetching offsets for
-payloads: list of OffsetFetchRequest
+Arguments:
+client_id: string
+correlation_id: int
+group: string, the consumer group you are fetching offsets for
+payloads: list of OffsetFetchRequest
 """
 grouped_payloads = group_by_topic_and_partition(payloads)

@@ -522,9 +514,8 @@ class KafkaProtocol(object):
 """
 Decode bytes to an OffsetFetchResponse

-Params
-======
-data: bytes to decode
+Arguments:
+data: bytes to decode
 """

 ((correlation_id,), cur) = relative_unpack('>i', data, 0)
@@ -547,10 +538,10 @@ def create_message(payload, key=None):
 """
 Construct a Message

-Params
-======
-payload: bytes, the payload to send to Kafka
-key: bytes, a key used for partition routing (optional)
+Arguments:
+payload: bytes, the payload to send to Kafka
+key: bytes, a key used for partition routing (optional)
 """
 return Message(0, 0, key, payload)

@@ -562,10 +553,10 @@ def create_gzip_message(payloads, key=None):
 The given payloads will be encoded, compressed, and sent as a single atomic
 message to Kafka.

-Params
-======
-payloads: list(bytes), a list of payload to send be sent to Kafka
-key: bytes, a key used for partition routing (optional)
+Arguments:
+payloads: list(bytes), a list of payload to send be sent to Kafka
+key: bytes, a key used for partition routing (optional)
 """
 message_set = KafkaProtocol._encode_message_set(
 [create_message(payload, key) for payload in payloads])
@@ -583,10 +574,10 @@ def create_snappy_message(payloads, key=None):
 The given payloads will be encoded, compressed, and sent as a single atomic
 message to Kafka.

-Params
-======
-payloads: list(bytes), a list of payload to send be sent to Kafka
-key: bytes, a key used for partition routing (optional)
+Arguments:
+payloads: list(bytes), a list of payload to send be sent to Kafka
+key: bytes, a key used for partition routing (optional)
 """
 message_set = KafkaProtocol._encode_message_set(
 [create_message(payload, key) for payload in payloads])
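The helpers documented above can be combined as follows; the payloads and key are placeholders.

```python
from kafka.protocol import create_message, create_gzip_message

# A single uncompressed message with a routing key
msg = create_message(b"payload-bytes", key=b"routing-key")

# Several payloads compressed into one atomic gzip message set
batch = create_gzip_message([b"m1", b"m2", b"m3"], key=b"routing-key")
```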
@@ -129,13 +129,12 @@ class KafkaQueue(object):
 Messages are buffered in the producer thread until
 producer_flush_timeout or producer_flush_buffer is reached.

-Params
-======
-client: KafkaClient object
-topic: str, the topic name
-partitions: list of ints, the partions to consume from
-producer_config: dict, see below
-consumer_config: dict, see below
+Arguments:
+client: KafkaClient object
+topic: str, the topic name
+partitions: list of ints, the partions to consume from
+producer_config: dict, see below
+consumer_config: dict, see below

 Consumer Config
 ===============
@@ -184,14 +183,12 @@ class KafkaQueue(object):
 """
 Consume a message from Kafka

-Params
-======
-block: boolean, default True
-timeout: int, number of seconds to wait when blocking, default None
+Arguments:
+block: boolean, default True
+timeout: int, number of seconds to wait when blocking, default None

-Returns
-=======
-msg: str, the payload from Kafka
+Returns:
+msg: str, the payload from Kafka
 """
 return self.in_queue.get(block, timeout).payload

@@ -199,11 +196,10 @@ class KafkaQueue(object):
 """
 Send a message to Kafka

-Params
-======
-msg: std, the message to send
-block: boolean, default True
-timeout: int, number of seconds to wait when blocking, default None
+Arguments:
+msg: std, the message to send
+block: boolean, default True
+timeout: int, number of seconds to wait when blocking, default None
 """
 self.out_queue.put(msg, block, timeout)

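Tying the three KafkaQueue docstrings together, a minimal sketch of its put/get loop; the `kafka.queue` import path is an assumption, and `client` is a connected `KafkaClient` as in the earlier examples.

```python
from kafka.queue import KafkaQueue  # assumed module path

q = KafkaQueue(client, "my-topic", partitions=[0, 1])

q.put("a message")                      # buffered, flushed by the producer thread
msg = q.get(block=True, timeout=10)     # payload string from the consumer side
print(msg)
```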
@@ -103,10 +103,12 @@ class ReentrantTimer(object):
 A timer that can be restarted, unlike threading.Timer
 (although this uses threading.Timer)

-t: timer interval in milliseconds
-fn: a callable to invoke
-args: tuple of args to be passed to function
-kwargs: keyword arguments to be passed to function
+Arguments:
+
+t: timer interval in milliseconds
+fn: a callable to invoke
+args: tuple of args to be passed to function
+kwargs: keyword arguments to be passed to function
 """
 def __init__(self, t, fn, *args, **kwargs):

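A small sketch of the constructor arguments documented above; the `kafka.util` import path and the `start()`/`stop()` methods are assumptions about this helper rather than something shown in the diff.

```python
from kafka.util import ReentrantTimer  # assumed module path

def flush_offsets():
    print("committing offsets")  # hypothetical periodic task

# Fire every 5000 ms; unlike threading.Timer this one can be restarted
timer = ReentrantTimer(5000, flush_offsets)
timer.start()   # assumed API
timer.stop()    # assumed API
```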