Changes based on comments by @rdiomar, plus added LeaderUnavailableError for clarity
This commit is contained in:
@@ -8,6 +8,7 @@ from itertools import count
|
||||
from kafka.common import (ErrorMapping, TopicAndPartition,
|
||||
ConnectionError, FailedPayloadsError,
|
||||
BrokerResponseError, PartitionUnavailableError,
|
||||
LeaderUnavailableError,
|
||||
KafkaUnavailableError)
|
||||
|
||||
from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
|
||||
@@ -124,8 +125,10 @@ class KafkaClient(object):
|
||||
leader = self._get_leader_for_partition(payload.topic,
|
||||
payload.partition)
|
||||
if leader is None:
|
||||
raise PartitionUnavailableError(
|
||||
"No leader for topic %s partition %s" % (payload.topic, payload.partition))
|
||||
raise LeaderUnavailableError(
|
||||
"Leader not available for topic %s partition %s" %
|
||||
(payload.topic, payload.partition))
|
||||
|
||||
payloads_by_broker[leader].append(payload)
|
||||
original_keys.append((payload.topic, payload.partition))
|
||||
|
||||
@@ -250,7 +253,7 @@ class KafkaClient(object):
|
||||
self.reset_topic_metadata(topic)
|
||||
|
||||
if not partitions:
|
||||
log.info('No partitions for %s', topic)
|
||||
log.warning('No partitions for %s', topic)
|
||||
continue
|
||||
|
||||
self.topic_partitions[topic] = []
|
||||
@@ -258,7 +261,7 @@ class KafkaClient(object):
|
||||
self.topic_partitions[topic].append(partition)
|
||||
topic_part = TopicAndPartition(topic, partition)
|
||||
if meta.leader == -1:
|
||||
log.info('No leader for topic %s partition %s', topic, partition)
|
||||
log.warning('No leader for topic %s partition %s', topic, partition)
|
||||
self.topics_to_brokers[topic_part] = None
|
||||
else:
|
||||
self.topics_to_brokers[topic_part] = brokers[meta.leader]
|
||||
|
||||
@@ -82,6 +82,10 @@ class BrokerResponseError(KafkaError):
|
||||
pass
|
||||
|
||||
|
||||
class LeaderUnavailableError(KafkaError):
|
||||
pass
|
||||
|
||||
|
||||
class PartitionUnavailableError(KafkaError):
|
||||
pass
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ from kafka.common import (
|
||||
ProduceRequest, FetchRequest, Message, ChecksumError,
|
||||
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
|
||||
OffsetAndMessage, BrokerMetadata, PartitionMetadata,
|
||||
TopicAndPartition, PartitionUnavailableError
|
||||
TopicAndPartition, LeaderUnavailableError, PartitionUnavailableError
|
||||
)
|
||||
from kafka.codec import (
|
||||
has_gzip, has_snappy, gzip_encode, gzip_decode,
|
||||
@@ -386,7 +386,7 @@ class TestProtocol(unittest.TestCase):
|
||||
pass
|
||||
|
||||
|
||||
class TestClient(unittest.TestCase):
|
||||
class TestKafkaClient(unittest.TestCase):
|
||||
|
||||
@patch('kafka.client.KafkaConnection')
|
||||
@patch('kafka.client.KafkaProtocol')
|
||||
@@ -520,7 +520,7 @@ class TestClient(unittest.TestCase):
|
||||
@patch('kafka.client.KafkaConnection')
|
||||
@patch('kafka.client.KafkaProtocol')
|
||||
def test_send_produce_request_raises_when_noleader(self, protocol, conn):
|
||||
"Getting leader for partitions returns None when the partiion has no leader"
|
||||
"Send producer request raises LeaderUnavailableError if leader is not available"
|
||||
|
||||
conn.recv.return_value = 'response' # anything but None
|
||||
|
||||
@@ -542,7 +542,7 @@ class TestClient(unittest.TestCase):
|
||||
[create_message("a"), create_message("b")])]
|
||||
|
||||
self.assertRaises(
|
||||
PartitionUnavailableError,
|
||||
LeaderUnavailableError,
|
||||
client.send_produce_request, requests)
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
Reference in New Issue
Block a user