KafkaClient should try/except ConnnectionError when calling _get_conn
This commit is contained in:
@@ -169,19 +169,19 @@ class KafkaClient(object):
|
|||||||
responses_by_broker = collections.defaultdict(list)
|
responses_by_broker = collections.defaultdict(list)
|
||||||
broker_failures = []
|
broker_failures = []
|
||||||
for broker, payloads in payloads_by_broker.items():
|
for broker, payloads in payloads_by_broker.items():
|
||||||
conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
|
|
||||||
requestId = self._next_id()
|
requestId = self._next_id()
|
||||||
request = encoder_fn(client_id=self.client_id,
|
request = encoder_fn(client_id=self.client_id,
|
||||||
correlation_id=requestId, payloads=payloads)
|
correlation_id=requestId, payloads=payloads)
|
||||||
|
|
||||||
# Send the request, recv the response
|
# Send the request, recv the response
|
||||||
try:
|
try:
|
||||||
|
conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
|
||||||
conn.send(requestId, request)
|
conn.send(requestId, request)
|
||||||
|
|
||||||
except ConnectionError as e:
|
except ConnectionError as e:
|
||||||
broker_failures.append(broker)
|
broker_failures.append(broker)
|
||||||
log.warning("Could not send request [%s] to server %s: %s",
|
log.warning("Could not send request [%s] to server %s: %s",
|
||||||
binascii.b2a_hex(request), conn, e)
|
binascii.b2a_hex(request), broker, e)
|
||||||
|
|
||||||
for payload in payloads:
|
for payload in payloads:
|
||||||
responses_by_broker[broker].append(FailedPayloadsError(payload))
|
responses_by_broker[broker].append(FailedPayloadsError(payload))
|
||||||
|
Reference in New Issue
Block a user