From b2a65033fce6f5128fd53e82b3b63b633e324671 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Thu, 13 Jun 2013 14:11:33 +0530 Subject: [PATCH 1/4] Implement support for Keyed producer Provides support for two partitioners * Round robin * Hashed (default as per kafka clients) --- README.md | 12 ++++++++++ kafka/partitioner.py | 52 ++++++++++++++++++++++++++++++++++++++++++++ kafka/producer.py | 32 +++++++++++++++++++++++++++ 3 files changed, 96 insertions(+) create mode 100644 kafka/partitioner.py diff --git a/README.md b/README.md index 8b57172..dbdb295 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,18 @@ for message in consumer: kafka.close() ``` +## Keyed messages +from kafka.client import KafkaClient +from kafka.producer import KeyedProducer +from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner + +kafka = KafkaClient("localhost", 9092) + +producer = KeyedProducer(kafka, "my-topic", partitioner=HashedPartitioner) +producer.send("key1", "some message") +producer.send("key2", "this methode") + + ## Low level ```python diff --git a/kafka/partitioner.py b/kafka/partitioner.py new file mode 100644 index 0000000..0f49b07 --- /dev/null +++ b/kafka/partitioner.py @@ -0,0 +1,52 @@ +from itertools import cycle + + +class Partitioner(object): + """ + Base class for a partitioner + """ + def __init__(self, partitions): + """ + Initialize the partitioner + + partitions - A list of available partitions (during startup) + """ + self.partitions = partitions + + def partition(self, key, partitions): + """ + Takes a string key and num_partitions as argument and returns + a partition to be used for the message + + partitions - The list of partitions is passed in every call. This + may look like an overhead, but it will be useful + (in future) when we handle cases like rebalancing + """ + raise NotImplemented('partition function has to be implemented') + + +class RoundRobinPartitioner(Partitioner): + """ + Implements a round robin partitioner which sends data to partitions + in a round robin fashion + """ + def __init__(self, partitions): + self.partitions = cycle(partitions) + + def partition(self, key, partitions): + # Refresh the partition list if necessary + if self.partitions != partitions: + self.partitions = cycle(partitions) + + return self.partitions.next() + + +class HashedPartitioner(Partitioner): + """ + Implements a partitioner which selects the target partition based on + the hash of the key + """ + def partition(self, key, partitions): + size = len(partitions) + idx = hash(key) % size + return partitions[idx] diff --git a/kafka/producer.py b/kafka/producer.py index 589eb11..75f90c6 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -3,6 +3,7 @@ import logging from kafka.common import ProduceRequest from kafka.protocol import create_message +from kafka.partitioner import HashedPartitioner log = logging.getLogger("kafka") @@ -23,3 +24,34 @@ class SimpleProducer(object): resp = self.client.send_produce_request([req])[0] assert resp.error == 0 + + +class KeyedProducer(object): + """ + A producer which distributes messages to partitions based on the key + + Args: + client - The kafka client instance + topic - The kafka topic to send messages to + partitioner - A partitioner class that will be used to get the partition + to send the message to. Must be derived from Partitioner + """ + def __init__(self, client, topic, partitioner=None): + self.client = client + self.topic = topic + self.client._load_metadata_for_topics(topic) + + if not partitioner: + partitioner = HashedPartitioner + + self.partitioner = partitioner(self.client.topic_partitions[topic]) + + def send(self, client, key, msg): + partitions = self.client.topic_partitions[topic] + partition = self.partitioner.partition(key, partitions) + + req = ProduceRequest(self.topic, partition, + messages=[create_message(msg)]) + + resp = self.client.send_produce_request([req])[0] + assert resp.error == 0 From ac76520bf880fb91e76d34a8e67cbe7e96017c7d Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Thu, 13 Jun 2013 14:13:32 +0530 Subject: [PATCH 2/4] Fix markup in README --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index dbdb295..40b5585 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,7 @@ kafka.close() ``` ## Keyed messages +```python from kafka.client import KafkaClient from kafka.producer import KeyedProducer from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner @@ -51,7 +52,7 @@ kafka = KafkaClient("localhost", 9092) producer = KeyedProducer(kafka, "my-topic", partitioner=HashedPartitioner) producer.send("key1", "some message") producer.send("key2", "this methode") - +``` ## Low level From d3cf18a9fb961f8c640bd2ff0384b9a66ae179bb Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Thu, 13 Jun 2013 14:40:09 +0530 Subject: [PATCH 3/4] Fix bugs and testing * Ensure that round-robin partitioner works fine * _load_metadata_for_topics() would cause duplicate and stale entries in self.topic_partitions. Fix this --- kafka/client.py | 5 +++++ kafka/partitioner.py | 10 +++++++--- kafka/producer.py | 4 ++-- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 5595d49..1146798 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -73,7 +73,12 @@ class KafkaClient(object): self.brokers.update(brokers) self.topics_to_brokers = {} + for topic, partitions in topics.items(): + # Clear the list once before we add it. This removes stale entries + # and avoids duplicates + self.topic_partitions.pop(topic, None) + if not partitions: log.info("Partition is unassigned, delay for 1s and retry") time.sleep(1) diff --git a/kafka/partitioner.py b/kafka/partitioner.py index 0f49b07..84db4d5 100644 --- a/kafka/partitioner.py +++ b/kafka/partitioner.py @@ -31,14 +31,18 @@ class RoundRobinPartitioner(Partitioner): in a round robin fashion """ def __init__(self, partitions): - self.partitions = cycle(partitions) + self._set_partitions(partitions) + + def _set_partitions(self, partitions): + self.partitions = partitions + self.iterpart = cycle(partitions) def partition(self, key, partitions): # Refresh the partition list if necessary if self.partitions != partitions: - self.partitions = cycle(partitions) + self._set_partitions(partitions) - return self.partitions.next() + return self.iterpart.next() class HashedPartitioner(Partitioner): diff --git a/kafka/producer.py b/kafka/producer.py index 75f90c6..69c3830 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -46,8 +46,8 @@ class KeyedProducer(object): self.partitioner = partitioner(self.client.topic_partitions[topic]) - def send(self, client, key, msg): - partitions = self.client.topic_partitions[topic] + def send(self, key, msg): + partitions = self.client.topic_partitions[self.topic] partition = self.partitioner.partition(key, partitions) req = ProduceRequest(self.topic, partition, From 0723cf8beca7ff6433c79ca04ad1945b00eea6d1 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Thu, 13 Jun 2013 14:43:23 +0530 Subject: [PATCH 4/4] Better examples in README --- README.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 40b5585..60cc745 100644 --- a/README.md +++ b/README.md @@ -49,9 +49,12 @@ from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner kafka = KafkaClient("localhost", 9092) -producer = KeyedProducer(kafka, "my-topic", partitioner=HashedPartitioner) +# HashedPartitioner is default +producer = KeyedProducer(kafka, "my-topic") producer.send("key1", "some message") producer.send("key2", "this methode") + +producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner) ``` ## Low level