Adding consumer_fetch_size to queue interface

Also more docs
David Arthur
2012-11-20 10:16:15 -05:00
parent 40e49dce48
commit 0bc2afe910
2 changed files with 57 additions and 3 deletions

View File

@@ -172,3 +172,26 @@ req = ProduceRequest(topic, 1, messages)
kafka.send_message_set(req)
kafka.close()
```
## Use Kafka like a FIFO queue
Simple API: `get`, `put`, `close`.
```python
kafka = KafkaClient("localhost", 9092)
q = KafkaQueue(kafka, "my-topic", [0,1])
q.put("first")
q.put("second")
q.get() # first
q.get() # second
q.close()
kafka.close()
```
Since the producer and consumers are backed by actual `multiprocessing.Queue` instances, you can
do blocking or non-blocking puts and gets.
```python
q.put("first", block=False)
q.get(block=True, timeout=10)
```
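The consumers can also be tuned, for example how many bytes to fetch per request. Below is a minimal sketch; it assumes `KafkaQueue` accepts a `consumer_config` dict of keyword arguments (such as `consumer_fetch_size` and `consumer_sleep`) that is passed through to the consumer processes, which may not match the exact constructor signature.
```python
# Hypothetical configuration sketch: the consumer_config dict is an assumption,
# not necessarily the actual KafkaQueue constructor signature.
kafka = KafkaClient("localhost", 9092)
q = KafkaQueue(kafka, "my-topic", [0, 1],
               consumer_config={"consumer_fetch_size": 4096,  # bytes per fetch request
                                "consumer_sleep": 200})       # ms to sleep at end of a partition
q.put("hello")
q.get()  # hello
q.close()
kafka.close()
```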

View File

@@ -9,12 +9,13 @@ from .client import KafkaClient, FetchRequest, ProduceRequest
log = logging.getLogger("kafka")
class KafkaConsumerProcess(Process):
def __init__(self, client, topic, partition, out_queue, barrier, consumer_sleep=200):
def __init__(self, client, topic, partition, out_queue, barrier, consumer_fetch_size=1024, consumer_sleep=200):
self.client = copy(client)
self.topic = topic
self.partition = partition
self.out_queue = out_queue
self.barrier = barrier
self.consumer_fetch_size = consumer_fetch_size
self.consumer_sleep = consumer_sleep / 1000.
log.info("Initializing %s" % self)
Process.__init__(self)
@@ -26,7 +27,7 @@ class KafkaConsumerProcess(Process):
def run(self):
self.barrier.wait()
log.info("Starting %s" % self)
fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.client.bufsize)
fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.consumer_fetch_size)
while True:
if self.barrier.is_set() == False:
log.info("Shutdown %s" % self)
@@ -91,6 +92,10 @@ class KafkaQueue(object):
"""
KafkaQueue is a Queue-like object backed by a Kafka producer and some number of consumers.
Messages are eagerly loaded by the consumer in batches of size consumer_fetch_size.
Messages are buffered in the producer thread until producer_flush_timeout or
producer_flush_buffer is reached.
Params
======
client: KafkaClient object
@@ -101,6 +106,8 @@ class KafkaQueue(object):
Consumer Config
===============
consumer_fetch_size: int, number of bytes to fetch in one call to Kafka. Default
is 1024
consumer_sleep: int, time in milliseconds a consumer should sleep when it reaches
the end of a partition. Default is 200
@@ -133,12 +140,36 @@ class KafkaQueue(object):
self.barrier.set()
def get(self, block=True, timeout=None):
"""
Consume a message from Kafka
Params
======
block: boolean, default True
timeout: int, number of seconds to wait when blocking, default None
Returns
=======
msg: str, the payload from Kafka
"""
return self.in_queue.get(block, timeout).payload
def put(self, msg, block=True, timeout=None):
return self.out_queue.put(msg, block, timeout)
"""
Send a message to Kafka
Params
======
msg: str, the message to send
block: boolean, default True
timeout: int, number of seconds to wait when blocking, default None
"""
self.out_queue.put(msg, block, timeout)
def close(self):
"""
Close the internal queues and Kafka consumers/producer
"""
self.in_queue.close()
self.out_queue.close()
self.barrier.clear()
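Since `get()` delegates to `multiprocessing.Queue.get()`, a blocking call with a timeout raises `Queue.Empty` when no message arrives within that time rather than returning `None`. Below is a caller-side sketch; the `kafka.client` and `kafka.queue` import paths are assumptions based on the module layout shown in this diff.
```python
# Caller-side sketch; import paths for KafkaClient and KafkaQueue are assumed.
from Queue import Empty                 # Python 2 ("from queue import Empty" on Python 3)
from kafka.client import KafkaClient
from kafka.queue import KafkaQueue

kafka = KafkaClient("localhost", 9092)
q = KafkaQueue(kafka, "my-topic", [0, 1])
try:
    msg = q.get(block=True, timeout=5)  # wait up to 5 seconds for a message
except Empty:
    msg = None                          # queue stayed empty for the full timeout
finally:
    q.close()
    kafka.close()
```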