Keep full PartitionMetadata from MetadataResponses
This commit is contained in:
@@ -9,7 +9,7 @@ import time
|
|||||||
import six
|
import six
|
||||||
|
|
||||||
import kafka.common as Errors
|
import kafka.common as Errors
|
||||||
from kafka.common import BrokerMetadata, TopicPartition
|
from kafka.common import BrokerMetadata, PartitionMetadata, TopicPartition
|
||||||
from .future import Future
|
from .future import Future
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
@@ -55,15 +55,17 @@ class ClusterMetadata(object):
|
|||||||
"""Return set of partitions with known leaders"""
|
"""Return set of partitions with known leaders"""
|
||||||
if topic not in self._partitions:
|
if topic not in self._partitions:
|
||||||
return None
|
return None
|
||||||
return set([partition for partition, leader
|
return set([partition for partition, metadata
|
||||||
in six.iteritems(self._partitions[topic])
|
in six.iteritems(self._partitions[topic])
|
||||||
if leader != -1])
|
if metadata.leader != -1])
|
||||||
|
|
||||||
def leader_for_partition(self, partition):
|
def leader_for_partition(self, partition):
|
||||||
"""Return node_id of leader, -1 unavailable, None if unknown."""
|
"""Return node_id of leader, -1 unavailable, None if unknown."""
|
||||||
if partition.topic not in self._partitions:
|
if partition.topic not in self._partitions:
|
||||||
return None
|
return None
|
||||||
return self._partitions[partition.topic].get(partition.partition)
|
elif partition.partition not in self._partitions[partition.topic]:
|
||||||
|
return None
|
||||||
|
return self._partitions[partition.topic][partition.partition].leader
|
||||||
|
|
||||||
def partitions_for_broker(self, broker_id):
|
def partitions_for_broker(self, broker_id):
|
||||||
"""Return TopicPartitions for which the broker is a leader"""
|
"""Return TopicPartitions for which the broker is a leader"""
|
||||||
@@ -133,8 +135,10 @@ class ClusterMetadata(object):
|
|||||||
error_type = Errors.for_code(error_code)
|
error_type = Errors.for_code(error_code)
|
||||||
if error_type is Errors.NoError:
|
if error_type is Errors.NoError:
|
||||||
self._partitions[topic] = {}
|
self._partitions[topic] = {}
|
||||||
for _, partition, leader, _, _ in partitions:
|
for p_error, partition, leader, replicas, isr in partitions:
|
||||||
self._partitions[topic][partition] = leader
|
self._partitions[topic][partition] = PartitionMetadata(
|
||||||
|
topic=topic, partition=partition, leader=leader,
|
||||||
|
replicas=replicas, isr=isr, error=p_error)
|
||||||
if leader != -1:
|
if leader != -1:
|
||||||
self._broker_partitions[leader].add(TopicPartition(topic, partition))
|
self._broker_partitions[leader].add(TopicPartition(topic, partition))
|
||||||
elif error_type is Errors.LeaderNotAvailableError:
|
elif error_type is Errors.LeaderNotAvailableError:
|
||||||
|
|||||||
Reference in New Issue
Block a user