Merge branch 'master' into asyncproducer
Conflicts: kafka/producer.py
README.md (16 lines changed)
@@ -47,6 +47,22 @@ for message in consumer:
kafka.close()
```

## Keyed messages
```python
from kafka.client import KafkaClient
from kafka.producer import KeyedProducer
from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner

kafka = KafkaClient("localhost", 9092)

# HashedPartitioner is default
producer = KeyedProducer(kafka, "my-topic")
producer.send("key1", "some message")
producer.send("key2", "this methode")
|
||||
|
||||
producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
```

## Low level

```python

kafka/client.py

@@ -73,7 +73,12 @@ class KafkaClient(object):
        self.brokers.update(brokers)
        self.topics_to_brokers = {}

        for topic, partitions in topics.items():
            # Clear the list once before we add it. This removes stale entries
            # and avoids duplicates
            self.topic_partitions.pop(topic, None)

            if not partitions:
                log.info("Partition is unassigned, delay for 1s and retry")
                time.sleep(1)
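
To make the hunk above easier to follow in isolation, here is a small self-contained sketch of the same clear-then-repopulate pattern. The function name `refresh_topic_partitions` and its arguments are illustrative only, not part of the kafka-python API.

```python
import logging
import time

log = logging.getLogger("kafka")


def refresh_topic_partitions(topic_partitions, fresh_metadata):
    """Illustrative helper (not the library's API): rebuild a
    topic -> [partition ids] map from freshly fetched metadata."""
    for topic, partitions in fresh_metadata.items():
        # Drop the old entry first so stale partitions and duplicates
        # cannot survive a reassignment on the broker side
        topic_partitions.pop(topic, None)

        if not partitions:
            # Partition assignment can lag topic auto-creation; pause briefly
            # and leave the entry unset so the caller can retry the fetch
            log.info("Partitions for %s are unassigned, delaying 1s", topic)
            time.sleep(1)
            continue

        topic_partitions[topic] = list(partitions)
    return topic_partitions
```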
kafka/partitioner.py (56 lines, new file)
@@ -0,0 +1,56 @@
from itertools import cycle


class Partitioner(object):
    """
    Base class for a partitioner
    """
    def __init__(self, partitions):
        """
        Initialize the partitioner

        partitions - A list of available partitions (during startup)
        """
        self.partitions = partitions

    def partition(self, key, partitions):
        """
        Takes a string key and the list of partitions and returns
        a partition to be used for the message

        partitions - The list of partitions is passed in every call. This
                     may look like an overhead, but it will be useful
                     (in future) when we handle cases like rebalancing
        """
        raise NotImplementedError('partition function has to be implemented')

class RoundRobinPartitioner(Partitioner):
    """
    Implements a round robin partitioner which sends data to partitions
    in a round robin fashion
    """
    def __init__(self, partitions):
        self._set_partitions(partitions)

    def _set_partitions(self, partitions):
        self.partitions = partitions
        self.iterpart = cycle(partitions)

    def partition(self, key, partitions):
        # Refresh the partition list if necessary
        if self.partitions != partitions:
            self._set_partitions(partitions)

        return self.iterpart.next()


class HashedPartitioner(Partitioner):
    """
    Implements a partitioner which selects the target partition based on
    the hash of the key
    """
    def partition(self, key, partitions):
        size = len(partitions)
        idx = hash(key) % size
        return partitions[idx]
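
As a quick illustration of the interface defined above (not part of this commit), the snippet below exercises both built-in partitioners and a toy custom subclass. The partition ids are made up, and it assumes the Python 2 era module as written (note `iterpart.next()` above).

```python
# Illustration only; assumes the kafka.partitioner module from this diff.
from kafka.partitioner import Partitioner, HashedPartitioner, RoundRobinPartitioner

partitions = [0, 1, 2, 3]

# Hash-based: the same key maps to the same partition on every call
hashed = HashedPartitioner(partitions)
assert hashed.partition("key1", partitions) == hashed.partition("key1", partitions)

# Round robin: successive calls cycle through the partition list
rr = RoundRobinPartitioner(partitions)
print([rr.partition(None, partitions) for _ in range(6)])  # [0, 1, 2, 3, 0, 1]


# A custom partitioner only has to implement partition()
class FirstPartitioner(Partitioner):
    """Toy example: always pick the first available partition."""
    def partition(self, key, partitions):
        return partitions[0]

print(FirstPartitioner(partitions).partition("key1", partitions))  # 0
```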
kafka/producer.py

@@ -4,6 +4,7 @@ import logging
from kafka.common import ProduceRequest
from kafka.protocol import create_message
from kafka.partitioner import HashedPartitioner

log = logging.getLogger("kafka")

@@ -72,3 +73,36 @@ class SimpleProducer(Producer):
        req = ProduceRequest(self.topic, self.next_partition.next(),
                             messages=[create_message(m) for m in msg])
        self.send_request(req)

class KeyedProducer(Producer):
    """
    A producer which distributes messages to partitions based on the key

    Args:
    client - The kafka client instance
    topic - The kafka topic to send messages to
    partitioner - A partitioner class that will be used to get the partition
                  to send the message to. Must be derived from Partitioner
    async - If True, the messages are sent asynchronously via another
            thread (process). We will not wait for a response to these messages.
    """
    def __init__(self, client, topic, partitioner=None, async=False):
        self.topic = topic
        client._load_metadata_for_topics(topic)

        if not partitioner:
            partitioner = HashedPartitioner

        self.partitioner = partitioner(client.topic_partitions[topic])

        super(KeyedProducer, self).__init__(client, async)

    def send(self, key, msg):
        partitions = self.client.topic_partitions[self.topic]
        partition = self.partitioner.partition(key, partitions)

        req = ProduceRequest(self.topic, partition,
                             messages=[create_message(msg)])

        self.send_request(req)
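
To round out the producer changes, here is a hedged usage sketch (not taken from the diff) showing KeyedProducer with the default HashedPartitioner and with the new async flag; the broker address and topic are placeholders.

```python
# Usage sketch only; assumes a broker on localhost:9092 and a topic "my-topic".
from kafka.client import KafkaClient
from kafka.producer import KeyedProducer
from kafka.partitioner import RoundRobinPartitioner

kafka = KafkaClient("localhost", 9092)

# Default HashedPartitioner: messages with the same key land on the same
# partition, and send() waits for the broker response
producer = KeyedProducer(kafka, "my-topic")
producer.send("user-1", "profile updated")

# Per the docstring above, async=True hands the request to another
# thread/process and send() does not wait for a response
async_producer = KeyedProducer(kafka, "my-topic",
                               partitioner=RoundRobinPartitioner,
                               async=True)
async_producer.send("user-2", "click event")

kafka.close()
```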