Merge pull request #32 from mahendra/keyed
Implement support for keyed messages
This commit is contained in:
16
README.md
16
README.md
@@ -41,6 +41,22 @@ for message in consumer:
|
||||
kafka.close()
|
||||
```
|
||||
|
||||
## Keyed messages
|
||||
```python
|
||||
from kafka.client import KafkaClient
|
||||
from kafka.producer import KeyedProducer
|
||||
from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
|
||||
|
||||
kafka = KafkaClient("localhost", 9092)
|
||||
|
||||
# HashedPartitioner is default
|
||||
producer = KeyedProducer(kafka, "my-topic")
|
||||
producer.send("key1", "some message")
|
||||
producer.send("key2", "this methode")
|
||||
|
||||
producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
|
||||
```
|
||||
|
||||
## Low level
|
||||
|
||||
```python
|
||||
|
||||
@@ -73,7 +73,12 @@ class KafkaClient(object):
|
||||
|
||||
self.brokers.update(brokers)
|
||||
self.topics_to_brokers = {}
|
||||
|
||||
for topic, partitions in topics.items():
|
||||
# Clear the list once before we add it. This removes stale entries
|
||||
# and avoids duplicates
|
||||
self.topic_partitions.pop(topic, None)
|
||||
|
||||
if not partitions:
|
||||
log.info("Partition is unassigned, delay for 1s and retry")
|
||||
time.sleep(1)
|
||||
|
||||
56
kafka/partitioner.py
Normal file
56
kafka/partitioner.py
Normal file
@@ -0,0 +1,56 @@
|
||||
from itertools import cycle
|
||||
|
||||
|
||||
class Partitioner(object):
|
||||
"""
|
||||
Base class for a partitioner
|
||||
"""
|
||||
def __init__(self, partitions):
|
||||
"""
|
||||
Initialize the partitioner
|
||||
|
||||
partitions - A list of available partitions (during startup)
|
||||
"""
|
||||
self.partitions = partitions
|
||||
|
||||
def partition(self, key, partitions):
|
||||
"""
|
||||
Takes a string key and num_partitions as argument and returns
|
||||
a partition to be used for the message
|
||||
|
||||
partitions - The list of partitions is passed in every call. This
|
||||
may look like an overhead, but it will be useful
|
||||
(in future) when we handle cases like rebalancing
|
||||
"""
|
||||
raise NotImplemented('partition function has to be implemented')
|
||||
|
||||
|
||||
class RoundRobinPartitioner(Partitioner):
|
||||
"""
|
||||
Implements a round robin partitioner which sends data to partitions
|
||||
in a round robin fashion
|
||||
"""
|
||||
def __init__(self, partitions):
|
||||
self._set_partitions(partitions)
|
||||
|
||||
def _set_partitions(self, partitions):
|
||||
self.partitions = partitions
|
||||
self.iterpart = cycle(partitions)
|
||||
|
||||
def partition(self, key, partitions):
|
||||
# Refresh the partition list if necessary
|
||||
if self.partitions != partitions:
|
||||
self._set_partitions(partitions)
|
||||
|
||||
return self.iterpart.next()
|
||||
|
||||
|
||||
class HashedPartitioner(Partitioner):
|
||||
"""
|
||||
Implements a partitioner which selects the target partition based on
|
||||
the hash of the key
|
||||
"""
|
||||
def partition(self, key, partitions):
|
||||
size = len(partitions)
|
||||
idx = hash(key) % size
|
||||
return partitions[idx]
|
||||
@@ -3,6 +3,7 @@ import logging
|
||||
|
||||
from kafka.common import ProduceRequest
|
||||
from kafka.protocol import create_message
|
||||
from kafka.partitioner import HashedPartitioner
|
||||
|
||||
log = logging.getLogger("kafka")
|
||||
|
||||
@@ -23,3 +24,34 @@ class SimpleProducer(object):
|
||||
|
||||
resp = self.client.send_produce_request([req])[0]
|
||||
assert resp.error == 0
|
||||
|
||||
|
||||
class KeyedProducer(object):
|
||||
"""
|
||||
A producer which distributes messages to partitions based on the key
|
||||
|
||||
Args:
|
||||
client - The kafka client instance
|
||||
topic - The kafka topic to send messages to
|
||||
partitioner - A partitioner class that will be used to get the partition
|
||||
to send the message to. Must be derived from Partitioner
|
||||
"""
|
||||
def __init__(self, client, topic, partitioner=None):
|
||||
self.client = client
|
||||
self.topic = topic
|
||||
self.client._load_metadata_for_topics(topic)
|
||||
|
||||
if not partitioner:
|
||||
partitioner = HashedPartitioner
|
||||
|
||||
self.partitioner = partitioner(self.client.topic_partitions[topic])
|
||||
|
||||
def send(self, key, msg):
|
||||
partitions = self.client.topic_partitions[self.topic]
|
||||
partition = self.partitioner.partition(key, partitions)
|
||||
|
||||
req = ProduceRequest(self.topic, partition,
|
||||
messages=[create_message(msg)])
|
||||
|
||||
resp = self.client.send_produce_request([req])[0]
|
||||
assert resp.error == 0
|
||||
|
||||
Reference in New Issue
Block a user