catch all errors thrown by _get_leader_for_partition in SimpleClient
This commit is contained in:
@@ -169,7 +169,8 @@ class SimpleClient(object):
|
||||
for payload in payloads:
|
||||
try:
|
||||
leader = self._get_leader_for_partition(payload.topic, payload.partition)
|
||||
except KafkaUnavailableError:
|
||||
except (KafkaUnavailableError, LeaderNotAvailableError,
|
||||
UnknownTopicOrPartitionError):
|
||||
leader = None
|
||||
payloads_by_broker[leader].append(payload)
|
||||
return dict(payloads_by_broker)
|
||||
|
||||
@@ -11,7 +11,7 @@ from kafka.common import (
|
||||
BrokerMetadata,
|
||||
TopicPartition, KafkaUnavailableError,
|
||||
LeaderNotAvailableError, UnknownTopicOrPartitionError,
|
||||
KafkaTimeoutError, ConnectionError
|
||||
KafkaTimeoutError, ConnectionError, FailedPayloadsError
|
||||
)
|
||||
from kafka.conn import KafkaConnection
|
||||
from kafka.future import Future
|
||||
@@ -361,7 +361,7 @@ class TestSimpleClient(unittest.TestCase):
|
||||
"topic_noleader", 0,
|
||||
[create_message("a"), create_message("b")])]
|
||||
|
||||
with self.assertRaises(LeaderNotAvailableError):
|
||||
with self.assertRaises(FailedPayloadsError):
|
||||
client.send_produce_request(requests)
|
||||
|
||||
@patch('kafka.SimpleClient._get_conn')
|
||||
@@ -386,7 +386,7 @@ class TestSimpleClient(unittest.TestCase):
|
||||
"topic_doesnt_exist", 0,
|
||||
[create_message("a"), create_message("b")])]
|
||||
|
||||
with self.assertRaises(UnknownTopicOrPartitionError):
|
||||
with self.assertRaises(FailedPayloadsError):
|
||||
client.send_produce_request(requests)
|
||||
|
||||
def test_timeout(self):
|
||||
|
||||
Reference in New Issue
Block a user