Treat KafkaUnavailableError like other errors.
This commit is contained in:
@@ -162,11 +162,16 @@ class KafkaClient(object):
|
||||
payloads_by_broker = collections.defaultdict(list)
|
||||
|
||||
for payload in payloads:
|
||||
leader = self._get_leader_for_partition(payload.topic,
|
||||
payload.partition)
|
||||
|
||||
payloads_by_broker[leader].append(payload)
|
||||
brokers_for_payloads.append(leader)
|
||||
try:
|
||||
leader = self._get_leader_for_partition(payload.topic,
|
||||
payload.partition)
|
||||
payloads_by_broker[leader].append(payload)
|
||||
brokers_for_payloads.append(leader)
|
||||
except KafkaUnavailableError as e:
|
||||
log.warning('KafkaUnavailableError attempting to send request '
|
||||
'on topic %s partition %d', payload.topic, payload.partition)
|
||||
topic_partition = (payload.topic, payload.partition)
|
||||
responses[topic_partition] = FailedPayloadsErrors(payload)
|
||||
|
||||
# For each broker, send the list of request payloads
|
||||
# and collect the responses and errors
|
||||
|
||||
Reference in New Issue
Block a user