Merge pull request #606 from zackdever/partition-leader-errors

Catch all errors thrown by _get_leader_for_partition in SimpleClient
This commit is contained in:
Dana Powers
2016-03-17 16:32:30 -07:00
2 changed files with 5 additions and 4 deletions

View File

@@ -169,7 +169,8 @@ class SimpleClient(object):
for payload in payloads:
try:
leader = self._get_leader_for_partition(payload.topic, payload.partition)
except KafkaUnavailableError:
except (KafkaUnavailableError, LeaderNotAvailableError,
UnknownTopicOrPartitionError):
leader = None
payloads_by_broker[leader].append(payload)
return dict(payloads_by_broker)

View File

@@ -11,7 +11,7 @@ from kafka.common import (
BrokerMetadata,
TopicPartition, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
KafkaTimeoutError, ConnectionError
KafkaTimeoutError, ConnectionError, FailedPayloadsError
)
from kafka.conn import KafkaConnection
from kafka.future import Future
@@ -361,7 +361,7 @@ class TestSimpleClient(unittest.TestCase):
"topic_noleader", 0,
[create_message("a"), create_message("b")])]
with self.assertRaises(LeaderNotAvailableError):
with self.assertRaises(FailedPayloadsError):
client.send_produce_request(requests)
@patch('kafka.SimpleClient._get_conn')
@@ -386,7 +386,7 @@ class TestSimpleClient(unittest.TestCase):
"topic_doesnt_exist", 0,
[create_message("a"), create_message("b")])]
with self.assertRaises(UnknownTopicOrPartitionError):
with self.assertRaises(FailedPayloadsError):
client.send_produce_request(requests)
def test_timeout(self):