Update README with new APIs
This commit is contained in:
131
README.md
131
README.md
@@ -4,17 +4,17 @@ This module provides low-level protocol support Apache Kafka. It implements the
|
|||||||
(and their responses): Produce, Fetch, MultiFetch, MultiProduce, and Offsets. Gzip and Snappy compression
|
(and their responses): Produce, Fetch, MultiFetch, MultiProduce, and Offsets. Gzip and Snappy compression
|
||||||
is also supported.
|
is also supported.
|
||||||
|
|
||||||
Compatible with Apache Kafka 0.7x. Tested against 0.7.0, 0.7.1, and 0.7.2
|
Compatible with Apache Kafka 0.7x. Tested against 0.8
|
||||||
|
|
||||||
http://incubator.apache.org/kafka/
|
http://incubator.apache.org/kafka/
|
||||||
|
|
||||||
# License
|
# License
|
||||||
|
|
||||||
Copyright 2012, David Arthur under Apache License, v2.0. See `LICENSE`
|
Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE`
|
||||||
|
|
||||||
# Status
|
# Status
|
||||||
|
|
||||||
Current version is 0.1-alpha. The current API should be pretty stable.
|
Current version is 0.2-alpha. This version is under development, APIs are subject to change
|
||||||
|
|
||||||
# Install
|
# Install
|
||||||
|
|
||||||
@@ -89,109 +89,34 @@ python -m test.integration
|
|||||||
|
|
||||||
# Usage
|
# Usage
|
||||||
|
|
||||||
## Send a message to a topic
|
## High level
|
||||||
|
|
||||||
|
```python
|
||||||
|
from kafka.client import KafkaClient
|
||||||
|
|
||||||
|
producer = SimpleProducer(kafka, "my-topic")
|
||||||
|
producer.send_messages("some message")
|
||||||
|
producer.send_messages("this method", "is variadic")
|
||||||
|
|
||||||
|
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
|
||||||
|
for message in consumer:
|
||||||
|
print(message)
|
||||||
|
|
||||||
|
kafka.close()
|
||||||
|
```
|
||||||
|
|
||||||
|
## Low level
|
||||||
|
|
||||||
```python
|
```python
|
||||||
from kafka.client import KafkaClient
|
from kafka.client import KafkaClient
|
||||||
kafka = KafkaClient("localhost", 9092)
|
kafka = KafkaClient("localhost", 9092)
|
||||||
kafka.send_messages_simple("my-topic", "some message")
|
req = ProduceRequest(topic="my-topic", partition=1,
|
||||||
|
messages=[KafkaProdocol.encode_message("some message")])
|
||||||
|
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
|
||||||
kafka.close()
|
kafka.close()
|
||||||
```
|
|
||||||
|
resps[0].topic # "my-topic"
|
||||||
## Send several messages to a topic
|
resps[0].partition # 1
|
||||||
|
resps[0].error # 0 (hopefully)
|
||||||
Same as before, just add more arguments to `send_simple`
|
resps[0].offset # offset of the first message sent in this request
|
||||||
|
|
||||||
```python
|
|
||||||
kafka = KafkaClient("localhost", 9092)
|
|
||||||
kafka.send_messages_simple("my-topic", "some message", "another message", "and another")
|
|
||||||
kafka.close()
|
|
||||||
```
|
|
||||||
|
|
||||||
## Recieve some messages from a topic
|
|
||||||
|
|
||||||
Supply `get_message_set` with a `FetchRequest`, get back the messages and new `FetchRequest`
|
|
||||||
|
|
||||||
```python
|
|
||||||
kafka = KafkaClient("localhost", 9092)
|
|
||||||
req = FetchRequest("my-topic", 0, 0, 1024*1024)
|
|
||||||
(messages, req1) = kafka.get_message_set(req)
|
|
||||||
kafka.close()
|
|
||||||
```
|
|
||||||
|
|
||||||
The returned `FetchRequest` includes the offset of the next message. This makes
|
|
||||||
paging through the queue very simple.
|
|
||||||
|
|
||||||
## Send multiple messages to multiple topics
|
|
||||||
|
|
||||||
For this we use the `send_multi_message_set` method along with `ProduceRequest` objects.
|
|
||||||
|
|
||||||
```python
|
|
||||||
kafka = KafkaClient("localhost", 9092)
|
|
||||||
req1 = ProduceRequest("my-topic-1", 0, [
|
|
||||||
create_message_from_string("message one"),
|
|
||||||
create_message_from_string("message two")
|
|
||||||
])
|
|
||||||
req2 = ProduceRequest("my-topic-2", 0, [
|
|
||||||
create_message_from_string("nachricht ein"),
|
|
||||||
create_message_from_string("nachricht zwei")
|
|
||||||
])
|
|
||||||
kafka.send_multi_message_set([req1, req1])
|
|
||||||
kafka.close()
|
|
||||||
```
|
|
||||||
|
|
||||||
## Iterate through all messages from an offset
|
|
||||||
|
|
||||||
The `iter_messages` method will make the underlying calls to `get_message_set`
|
|
||||||
to provide a generator that returns every message available.
|
|
||||||
|
|
||||||
```python
|
|
||||||
kafka = KafkaClient("localhost", 9092)
|
|
||||||
for msg in kafka.iter_messages("my-topic", 0, 0, 1024*1024):
|
|
||||||
print(msg.payload)
|
|
||||||
kafka.close()
|
|
||||||
```
|
|
||||||
|
|
||||||
An optional `auto` argument will control auto-paging through results
|
|
||||||
|
|
||||||
```python
|
|
||||||
kafka = KafkaClient("localhost", 9092)
|
|
||||||
for msg in kafka.iter_messages("my-topic", 0, 0, 1024*1024, False):
|
|
||||||
print(msg.payload)
|
|
||||||
kafka.close()
|
|
||||||
```
|
|
||||||
This will only iterate through messages in the byte range of (0, 1024\*1024)
|
|
||||||
|
|
||||||
## Create some compressed messages
|
|
||||||
|
|
||||||
```python
|
|
||||||
kafka = KafkaClient("localhost", 9092)
|
|
||||||
messages = [kafka.create_snappy_message("testing 1"),
|
|
||||||
kafka.create_snappy_message("testing 2")]
|
|
||||||
req = ProduceRequest(topic, 1, messages)
|
|
||||||
kafka.send_message_set(req)
|
|
||||||
kafka.close()
|
|
||||||
```
|
|
||||||
|
|
||||||
## Use Kafka like a FIFO queue
|
|
||||||
|
|
||||||
Simple API: `get`, `put`, `close`.
|
|
||||||
|
|
||||||
```python
|
|
||||||
kafka = KafkaClient("localhost", 9092)
|
|
||||||
q = KafkaQueue(kafka, "my-topic", [0,1])
|
|
||||||
q.put("first")
|
|
||||||
q.put("second")
|
|
||||||
q.get() # first
|
|
||||||
q.get() # second
|
|
||||||
q.close()
|
|
||||||
kafka.close()
|
|
||||||
```
|
|
||||||
|
|
||||||
Since the producer and consumers are backed by actual `multiprocessing.Queue`, you can
|
|
||||||
do blocking or non-blocking puts and gets.
|
|
||||||
|
|
||||||
```python
|
|
||||||
q.put("first", block=False)
|
|
||||||
q.get(block=True, timeout=10)
|
|
||||||
```
|
```
|
||||||
|
|||||||
Reference in New Issue
Block a user