From 4d0722aef2f7420f99b25f28e79fecc47b077d2d Mon Sep 17 00:00:00 2001 From: Alex Couture-Beil Date: Tue, 1 Apr 2014 18:07:24 -0700 Subject: [PATCH] Changed randomization to simply randomize the initial starting partition of the sorted list of partition rather than completely randomizing the initial ordering before round-robin cycling the partitions --- kafka/producer.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/kafka/producer.py b/kafka/producer.py index 6359aa2..7a7c48f 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -198,9 +198,13 @@ class SimpleProducer(Producer): if topic not in self.partition_cycles: if topic not in self.client.topic_partitions: self.client.load_metadata_for_topics(topic) - randomly_ordered_partitions = self.client.topic_partitions[topic][:] - random.shuffle(randomly_ordered_partitions) - self.partition_cycles[topic] = cycle(randomly_ordered_partitions) + self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic]) + + # Randomize the initial partition that is returned + num_partitions = len(self.client.topic_partitions[topic]) + for _ in xrange(random.randint(0, num_partitions-1)): + self.partition_cycles[topic].next() + return self.partition_cycles[topic].next() def send_messages(self, topic, *msg):