Raise a ConnectionError when a socket.error is raised when receiving data
Also, log.exception() is unhelpfully noisy. Use log.error() with some error details in the message instead.
This commit is contained in:
@@ -35,13 +35,21 @@ class KafkaConnection(local):
|
||||
# Private API #
|
||||
###################
|
||||
|
||||
def _raise_connection_error(self):
|
||||
self._dirty = True
|
||||
raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
|
||||
|
||||
def _read_bytes(self, num_bytes):
|
||||
bytes_left = num_bytes
|
||||
resp = ''
|
||||
log.debug("About to read %d bytes from Kafka", num_bytes)
|
||||
|
||||
while bytes_left:
|
||||
try:
|
||||
data = self._sock.recv(bytes_left)
|
||||
except socket.error, e:
|
||||
log.error('Unable to receive data from Kafka: %s', e)
|
||||
self._raise_connection_error()
|
||||
if data == '':
|
||||
raise BufferUnderflowError("Not enough data to read this response")
|
||||
bytes_left -= len(data)
|
||||
@@ -65,10 +73,6 @@ class KafkaConnection(local):
|
||||
resp = self._read_bytes(size)
|
||||
return str(resp)
|
||||
|
||||
def _raise_connection_error(self):
|
||||
self._dirty = True
|
||||
raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
|
||||
|
||||
##################
|
||||
# Public API #
|
||||
##################
|
||||
@@ -84,8 +88,8 @@ class KafkaConnection(local):
|
||||
sent = self._sock.sendall(payload)
|
||||
if sent is not None:
|
||||
self._raise_connection_error()
|
||||
except socket.error:
|
||||
log.exception('Unable to send payload to Kafka')
|
||||
except socket.error, e:
|
||||
log.error('Unable to send payload to Kafka: %s', e)
|
||||
self._raise_connection_error()
|
||||
|
||||
def recv(self, request_id):
|
||||
|
@@ -67,8 +67,8 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
|
||||
client.send_produce_request(reqs,
|
||||
acks=req_acks,
|
||||
timeout=ack_timeout)
|
||||
except Exception as exp:
|
||||
log.exception("Unable to send message")
|
||||
except Exception as e:
|
||||
log.error("Unable to send message: %s", e)
|
||||
|
||||
|
||||
class Producer(object):
|
||||
@@ -145,7 +145,7 @@ class Producer(object):
|
||||
resp = self.client.send_produce_request([req], acks=self.req_acks,
|
||||
timeout=self.ack_timeout)
|
||||
except Exception as e:
|
||||
log.exception("Unable to send messages")
|
||||
log.error("Unable to send messages: %s", e)
|
||||
raise e
|
||||
return resp
|
||||
|
||||
|
Reference in New Issue
Block a user