Use standard exceptions in client._get_leader_for_partition()

- drop custom PartitionUnavailableError exception
- raise UnknownTopicOrPartitionError or LeaderNotAvailableError
- add tests covering the new exceptions
Dana Powers
2014-09-01 17:03:46 -07:00
parent b260b356b2
commit 945ecbcee7
5 changed files with 48 additions and 37 deletions
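
For code that calls into KafkaClient, the practical effect of the commit message above is that a missing topic/partition and a missing leader now surface as two distinct standard errors instead of the dropped PartitionUnavailableError. A minimal caller-side sketch, assuming an already-constructed client; the helper name and the handling policy are illustrative, not part of this commit:

from kafka.common import (LeaderNotAvailableError,
                          UnknownTopicOrPartitionError)

def describe_leader(client, topic, partition):
    # Hypothetical helper showing which exceptions callers now see.
    try:
        broker = client._get_leader_for_partition(topic, partition)
    except UnknownTopicOrPartitionError:
        # Raised when the topic/partition is not in the refreshed metadata
        return "unknown topic or partition"
    except LeaderNotAvailableError:
        # Raised when metadata exists but no leader is currently elected
        return "leader not available yet"
    # On success, a BrokerMetadata entry for the leader is returned
    return "leader: %s" % (broker,)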


@@ -8,9 +8,9 @@ import kafka.common
 from kafka.common import (TopicAndPartition, BrokerMetadata,
                           ConnectionError, FailedPayloadsError,
-                          PartitionUnavailableError, LeaderNotAvailableError,
-                          KafkaUnavailableError, KafkaTimeoutError,
-                          UnknownTopicOrPartitionError, NotLeaderForPartitionError)
+                          KafkaTimeoutError, KafkaUnavailableError,
+                          LeaderNotAvailableError, UnknownTopicOrPartitionError,
+                          NotLeaderForPartitionError)
 from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
 from kafka.protocol import KafkaProtocol
@@ -63,20 +63,37 @@ class KafkaClient(object):
         Returns the leader for a partition or None if the partition exists
         but has no leader.
 
-        PartitionUnavailableError will be raised if the topic or partition
+        UnknownTopicOrPartitionError will be raised if the topic or partition
         is not part of the metadata.
+
+        LeaderNotAvailableError is raised if server has metadata, but there is
+        no current leader
         """
 
         key = TopicAndPartition(topic, partition)
 
-        # reload metadata whether the partition is not available
-        # or has no leader (broker is None)
-        if self.topics_to_brokers.get(key) is None:
-            self.load_metadata_for_topics(topic)
-
-        if key not in self.topics_to_brokers:
-            raise PartitionUnavailableError("%s not available" % str(key))
+        # Use cached metadata if it is there
+        if self.topics_to_brokers.get(key) is not None:
+            return self.topics_to_brokers[key]
 
-        return self.topics_to_brokers[key]
+        # Otherwise refresh metadata
+        # If topic does not already exist, this will raise
+        # UnknownTopicOrPartitionError if not auto-creating
+        # LeaderNotAvailableError otherwise until partitions are created
+        self.load_metadata_for_topics(topic)
+
+        # If the partition doesn't actually exist, raise
+        if partition not in self.topic_partitions[topic]:
+            raise UnknownTopicOrPartitionError(key)
+
+        # If there's no leader for the partition, raise
+        meta = self.topic_partitions[topic][partition]
+        if meta.leader == -1:
+            raise LeaderNotAvailableError(meta)
+
+        # Otherwise return the BrokerMetadata
+        return self.brokers[meta.leader]
 
     def _next_id(self):
         """
@@ -136,10 +153,6 @@ class KafkaClient(object):
         for payload in payloads:
             leader = self._get_leader_for_partition(payload.topic,
                                                     payload.partition)
-            if leader is None:
-                raise LeaderNotAvailableError(
-                    "Leader not available for topic %s partition %s" %
-                    (payload.topic, payload.partition))
 
             payloads_by_broker[leader].append(payload)
             original_keys.append((payload.topic, payload.partition))
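
The comments in the new _get_leader_for_partition body distinguish a permanent failure (the partition is not in the metadata at all) from a usually transient one (metadata exists but the leader is -1, e.g. during leader election or topic auto-creation). A retry wrapper along those lines, sketched as an assumption on top of this commit; the retry count, delay, and helper name are arbitrary and not part of the library:

import time

from kafka.common import LeaderNotAvailableError

def wait_for_leader(client, topic, partition, retries=5, delay=0.5):
    # Retry only the transient case; UnknownTopicOrPartitionError is
    # deliberately left uncaught, because retrying will not make a
    # missing partition appear in the metadata.
    for attempt in range(retries):
        try:
            return client._get_leader_for_partition(topic, partition)
        except LeaderNotAvailableError:
            if attempt == retries - 1:
                raise
            time.sleep(delay)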


@@ -158,10 +158,6 @@ class KafkaTimeoutError(KafkaError):
     pass
 
 
-class PartitionUnavailableError(KafkaError):
-    pass
-
-
 class FailedPayloadsError(KafkaError):
     pass
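
With PartitionUnavailableError gone from kafka.common, any handler that caught it specifically must switch to the two standard errors. A quick sanity check, under the assumption that both of them (like the other errors kept here) ultimately subclass KafkaError, so broad `except KafkaError` handlers keep working unchanged:

from kafka.common import (KafkaError, LeaderNotAvailableError,
                          UnknownTopicOrPartitionError)

# Assumption: both replacement exceptions derive from KafkaError, so
# generic error handling written against the base class still catches
# metadata lookup failures after this commit.
assert issubclass(UnknownTopicOrPartitionError, KafkaError)
assert issubclass(LeaderNotAvailableError, KafkaError)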


@@ -7,7 +7,7 @@ from kafka.common import (
     ProduceRequest, MetadataResponse,
     BrokerMetadata, TopicMetadata, PartitionMetadata,
     TopicAndPartition, KafkaUnavailableError,
-    LeaderNotAvailableError, PartitionUnavailableError, NoError,
+    LeaderNotAvailableError, NoError,
     UnknownTopicOrPartitionError
 )
 from kafka.protocol import create_message
@@ -191,7 +191,6 @@ class TestKafkaClient(unittest2.TestCase):
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_get_leader_for_unassigned_partitions(self, protocol, conn):
-        "Get leader raises if no partitions is defined for a topic"
 
         conn.recv.return_value = 'response'  # anything but None
@@ -201,7 +200,8 @@ class TestKafkaClient(unittest2.TestCase):
         ]
         topics = [
-            TopicMetadata('topic_no_partitions', NO_ERROR, [])
+            TopicMetadata('topic_no_partitions', NO_LEADER, []),
+            TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
         ]
 
         protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
@@ -209,13 +209,15 @@ class TestKafkaClient(unittest2.TestCase):
         self.assertDictEqual({}, client.topics_to_brokers)
 
-        with self.assertRaises(PartitionUnavailableError):
+        with self.assertRaises(LeaderNotAvailableError):
             client._get_leader_for_partition('topic_no_partitions', 0)
 
+        with self.assertRaises(UnknownTopicOrPartitionError):
+            client._get_leader_for_partition('topic_unknown', 0)
+
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
-    def test_get_leader_returns_none_when_noleader(self, protocol, conn):
-        "Getting leader for partitions returns None when the partiion has no leader"
-
+    def test_get_leader_exceptions_when_noleader(self, protocol, conn):
         conn.recv.return_value = 'response'  # anything but None
@@ -241,8 +243,16 @@ class TestKafkaClient(unittest2.TestCase):
                 TopicAndPartition('topic_noleader', 1): None
             },
             client.topics_to_brokers)
-        self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
-        self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))
+
+        # No leader partitions -- raise LeaderNotAvailableError
+        with self.assertRaises(LeaderNotAvailableError):
+            self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
+        with self.assertRaises(LeaderNotAvailableError):
+            self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))
+
+        # Unknown partitions -- raise UnknownTopicOrPartitionError
+        with self.assertRaises(UnknownTopicOrPartitionError):
+            self.assertIsNone(client._get_leader_for_partition('topic_noleader', 2))
 
         topics = [
             TopicMetadata('topic_noleader', NO_ERROR, [


@@ -5,15 +5,7 @@ import unittest2
 from mock import MagicMock, patch
 
 from kafka import KafkaClient
 from kafka.consumer import SimpleConsumer
-from kafka.common import (
-    ProduceRequest, BrokerMetadata, PartitionMetadata,
-    TopicAndPartition, PartitionUnavailableError
-)
-from kafka.protocol import (
-    create_message, KafkaProtocol
-)
 
 
 class TestKafkaConsumer(unittest2.TestCase):
     def test_non_integer_partitions(self):


@@ -13,8 +13,8 @@ from kafka.common import (
     ProduceRequest, FetchRequest, Message, ChecksumError,
     ProduceResponse, FetchResponse, OffsetAndMessage,
     BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
-    KafkaUnavailableError, PartitionUnavailableError,
-    UnsupportedCodecError, ConsumerFetchSizeTooSmall, ProtocolError,
+    KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
+    ProtocolError
 )
 from kafka.codec import (
     has_snappy, gzip_encode, gzip_decode,