Fix bugs and testing
* Ensure that round-robin partitioner works fine * _load_metadata_for_topics() would cause duplicate and stale entries in self.topic_partitions. Fix this
This commit is contained in:
@@ -73,7 +73,12 @@ class KafkaClient(object):
|
||||
|
||||
self.brokers.update(brokers)
|
||||
self.topics_to_brokers = {}
|
||||
|
||||
for topic, partitions in topics.items():
|
||||
# Clear the list once before we add it. This removes stale entries
|
||||
# and avoids duplicates
|
||||
self.topic_partitions.pop(topic, None)
|
||||
|
||||
if not partitions:
|
||||
log.info("Partition is unassigned, delay for 1s and retry")
|
||||
time.sleep(1)
|
||||
|
@@ -31,14 +31,18 @@ class RoundRobinPartitioner(Partitioner):
|
||||
in a round robin fashion
|
||||
"""
|
||||
def __init__(self, partitions):
|
||||
self.partitions = cycle(partitions)
|
||||
self._set_partitions(partitions)
|
||||
|
||||
def _set_partitions(self, partitions):
|
||||
self.partitions = partitions
|
||||
self.iterpart = cycle(partitions)
|
||||
|
||||
def partition(self, key, partitions):
|
||||
# Refresh the partition list if necessary
|
||||
if self.partitions != partitions:
|
||||
self.partitions = cycle(partitions)
|
||||
self._set_partitions(partitions)
|
||||
|
||||
return self.partitions.next()
|
||||
return self.iterpart.next()
|
||||
|
||||
|
||||
class HashedPartitioner(Partitioner):
|
||||
|
@@ -46,8 +46,8 @@ class KeyedProducer(object):
|
||||
|
||||
self.partitioner = partitioner(self.client.topic_partitions[topic])
|
||||
|
||||
def send(self, client, key, msg):
|
||||
partitions = self.client.topic_partitions[topic]
|
||||
def send(self, key, msg):
|
||||
partitions = self.client.topic_partitions[self.topic]
|
||||
partition = self.partitioner.partition(key, partitions)
|
||||
|
||||
req = ProduceRequest(self.topic, partition,
|
||||
|
Reference in New Issue
Block a user