Fix client error handling
This differentiates between errors that occur when sending the request and receiving the response, and adds BufferUnderflowError handling.
This commit is contained in:
@@ -6,8 +6,10 @@ import logging
|
||||
import socket
|
||||
import time
|
||||
|
||||
from kafka.common import ErrorMapping, TopicAndPartition
|
||||
from kafka.common import ConnectionError, FailedPayloadsException
|
||||
from kafka.common import (
|
||||
ErrorMapping, TopicAndPartition, BufferUnderflowError, ConnectionError,
|
||||
FailedPayloadsException
|
||||
)
|
||||
from kafka.conn import KafkaConnection
|
||||
from kafka.protocol import KafkaProtocol
|
||||
|
||||
@@ -165,14 +167,24 @@ class KafkaClient(object):
|
||||
request = encoder_fn(client_id=self.client_id,
|
||||
correlation_id=requestId, payloads=payloads)
|
||||
|
||||
failed = False
|
||||
# Send the request, recv the response
|
||||
try:
|
||||
conn.send(requestId, request)
|
||||
if decoder_fn is None:
|
||||
continue
|
||||
response = conn.recv(requestId)
|
||||
except ConnectionError, e: # ignore BufferUnderflow for now
|
||||
log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
|
||||
try:
|
||||
response = conn.recv(requestId)
|
||||
except (ConnectionError, BufferUnderflowError), e:
|
||||
log.warning("Could not receive response to request [%s] "
|
||||
"from server %s: %s", request, conn, e)
|
||||
failed = True
|
||||
except ConnectionError, e:
|
||||
log.warning("Could not send request [%s] to server %s: %s",
|
||||
request, conn, e)
|
||||
failed = True
|
||||
|
||||
if failed:
|
||||
failed_payloads += payloads
|
||||
self.topics_to_brokers = {} # reset metadata
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user