Merge branch 'master' into 0.8

Conflicts:
	README.md
	kafka-src
	kafka/client.py
	kafka/consumer.py
	kafka/protocol.py
	setup.py
	test/integration.py
This commit is contained in:
David Arthur
2013-04-02 20:38:55 -04:00
5 changed files with 62 additions and 53 deletions

105
README.md
View File

@@ -1,12 +1,13 @@
# Kakfa Python client
# Kafka Python client
This module provides low-level protocol support Apache Kafka. It implements the five basic request types
(and their responses): Produce, Fetch, MultiFetch, MultiProduce, and Offsets. Gzip and Snappy compression
is also supported.
This module provides low-level protocol support for Apache Kafka as well as
high-level consumer and producer classes. Request batching is supported by the
protocol as well as broker-aware request routing. Gzip and Snappy compression
is also supported for message sets.
Compatible with Apache Kafka 0.7x. Tested against 0.8
Compatible with Apache Kafka 0.8.0
http://incubator.apache.org/kafka/
http://kafka.apache.org/
# License
@@ -14,7 +15,47 @@ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE`
# Status
Current version is 0.2-alpha. This version is under development, APIs are subject to change
I'm following the version numbers of Kafka, plus one number to indicate the
version of this project. The current version is 0.8.0-1. This version is under
development, APIs are subject to change.
# Usage
## High level
```python
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer
kafka = KafkaClient("localhost", 9092)
producer = SimpleProducer(kafka, "my-topic")
producer.send_messages("some message")
producer.send_messages("this method", "is variadic")
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
for message in consumer:
print(message)
kafka.close()
```
## Low level
```python
from kafka.client import KafkaClient
kafka = KafkaClient("localhost", 9092)
req = ProduceRequest(topic="my-topic", partition=1,
messages=[KafkaProtocol.encode_message("some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()
resps[0].topic # "my-topic"
resps[0].partition # 1
resps[0].error # 0 (hopefully)
resps[0].offset # offset of the first message sent in this request
```
# Install
@@ -60,11 +101,14 @@ pip install python-snappy
# Tests
Some of the tests will fail if Snappy is not installed. These tests will throw NotImplementedError. If you see other failures,
they might be bugs - so please report them!
Some of the tests will fail if Snappy is not installed. These tests will throw
NotImplementedError. If you see other failures, they might be bugs - so please
report them!
## Run the unit tests
_These are broken at the moment_
```shell
python -m test.unit
```
@@ -81,46 +125,11 @@ cd kafka-src
./sbt package
```
Then from the root directory, run the integration tests
Next start up a ZooKeeper server on localhost:2181
```shell
python -m test.integration
/opt/zookeeper/bin/zkServer.sh start
```
# Usage
## High level
```python
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer
kafka = KafkaClient("localhost", 9092)
producer = SimpleProducer(kafka, "my-topic")
producer.send_messages("some message")
producer.send_messages("this method", "is variadic")
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
for message in consumer:
print(message)
kafka.close()
```
## Low level
```python
from kafka.client import KafkaClient
kafka = KafkaClient("localhost", 9092)
req = ProduceRequest(topic="my-topic", partition=1,
messages=[KafkaProtocol.encode_message("some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()
resps[0].topic # "my-topic"
resps[0].partition # 1
resps[0].error # 0 (hopefully)
resps[0].offset # offset of the first message sent in this request
```
This will actually start up real Kafka brokers and send messages in using the
client.

View File

@@ -233,7 +233,7 @@ class KafkaClient(object):
def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None):
raise NotImplementedError("Broker-managed offsets not supported in 0.8")
resps = self._send_broker_aware_request(payloads,
partial(KafkaProtocol.encode_offset_commit_fetch, group=group),
partial(KafkaProtocol.encode_offset_fetch_request, group=group),
KafkaProtocol.decode_offset_fetch_response)
out = []
for resp in resps:

View File

@@ -354,7 +354,6 @@ class KafkaProtocol(object):
======
data: bytes to decode
"""
data = data[2:] # TODO remove me when versionId is removed
((correlation_id,), cur) = relative_unpack('>i', data, 0)
(client_id, cur) = read_short_string(data, cur)
((num_topics,), cur) = relative_unpack('>i', data, cur)
@@ -398,7 +397,6 @@ class KafkaProtocol(object):
data: bytes to decode
"""
data = data[2:] # TODO remove me when versionId is removed
((correlation_id,), cur) = relative_unpack('>i', data, 0)
(client_id, cur) = read_short_string(data, cur)
((num_topics,), cur) = relative_unpack('>i', data, cur)

View File

@@ -2,7 +2,7 @@ from distutils.core import setup
setup(
name="kafka-python",
version="0.2-alpha",
version="0.8.0-1",
author="David Arthur",
author_email="mumrah@gmail.com",
url="https://github.com/mumrah/kafka-python",

View File

@@ -6,6 +6,7 @@ import shlex
import shutil
import socket
import subprocess
import sys
import tempfile
from threading import Thread, Event
import time
@@ -73,7 +74,8 @@ class KafkaFixture(Thread):
args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, self.zk_chroot))
proc = subprocess.Popen(args)
ret = proc.wait()
assert ret == 0
if ret != 0:
sys.exit(1)
# Start Kafka