Add available_partitions_for_topic() and partitions_for_broker()

This commit is contained in:
Dana Powers
2016-01-23 15:07:23 -08:00
parent d2012e067c
commit 318b10744c

View File

@@ -1,12 +1,15 @@
from __future__ import absolute_import
import collections
import copy
import logging
import random
import time
import six
import kafka.common as Errors
from kafka.common import BrokerMetadata
from kafka.common import BrokerMetadata, TopicPartition
from .future import Future
log = logging.getLogger(__name__)
@@ -21,6 +24,7 @@ class ClusterMetadata(object):
def __init__(self, **configs):
self._brokers = {}
self._partitions = {}
self._broker_partitions = collections.defaultdict(set)
self._groups = {}
self._version = 0
self._last_refresh_ms = 0
@@ -41,15 +45,29 @@ class ClusterMetadata(object):
return self._brokers.get(broker_id)
def partitions_for_topic(self, topic):
"""Return set of all partitions for topic (whether available or not)"""
if topic not in self._partitions:
return None
return set(self._partitions[topic].keys())
def available_partitions_for_topic(self, topic):
"""Return set of partitions with known leaders"""
if topic not in self._partitions:
return None
return set([partition for partition, leader
in six.iteritems(self._partitions[topic])
if leader != -1])
def leader_for_partition(self, partition):
"""Return node_id of leader, -1 unavailable, None if unknown."""
if partition.topic not in self._partitions:
return None
return self._partitions[partition.topic].get(partition.partition)
def partitions_for_broker(self, broker_id):
"""Return TopicPartitions for which the broker is a leader"""
return self._broker_partitions.get(broker_id)
def coordinator_for_group(self, group):
return self._groups.get(group)
@@ -106,7 +124,8 @@ class ClusterMetadata(object):
# Drop any UnknownTopic, InvalidTopic, and TopicAuthorizationFailed
# but retain LeaderNotAvailable because it means topic is initializing
self._partitions = {}
self._partitions.clear()
self._broker_partitions.clear()
for error_code, topic, partitions in metadata.topics:
error_type = Errors.for_code(error_code)
@@ -114,6 +133,8 @@ class ClusterMetadata(object):
self._partitions[topic] = {}
for _, partition, leader, _, _ in partitions:
self._partitions[topic][partition] = leader
if leader != -1:
self._broker_partitions[leader].add(TopicPartition(topic, partition))
elif error_type is Errors.LeaderNotAvailableError:
log.error("Topic %s is not available during auto-create"
" initialization", topic)