Improve socket disconnect handling
This commit is contained in:
@@ -225,7 +225,7 @@ class KafkaClient(object):
|
|||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
if self._refresh_on_disconnects:
|
if self._refresh_on_disconnects:
|
||||||
log.warning("Node %s connect failed -- refreshing metadata", node_id)
|
log.warning("Node %s connection failed -- refreshing metadata", node_id)
|
||||||
self.cluster.request_update()
|
self.cluster.request_update()
|
||||||
|
|
||||||
def _maybe_connect(self, node_id):
|
def _maybe_connect(self, node_id):
|
||||||
|
@@ -381,9 +381,17 @@ class BrokerConnection(object):
|
|||||||
# Not receiving is the state of reading the payload header
|
# Not receiving is the state of reading the payload header
|
||||||
if not self._receiving:
|
if not self._receiving:
|
||||||
try:
|
try:
|
||||||
# An extremely small, but non-zero, probability that there are
|
bytes_to_read = 4 - self._rbuffer.tell()
|
||||||
# more than 0 but not yet 4 bytes available to read
|
data = self._sock.recv(bytes_to_read)
|
||||||
self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell()))
|
# We expect socket.recv to raise an exception if there is not
|
||||||
|
# enough data to read the full bytes_to_read
|
||||||
|
# but if the socket is disconnected, we will get empty data
|
||||||
|
# without an exception raised
|
||||||
|
if not data:
|
||||||
|
log.error('%s: socket disconnected', self)
|
||||||
|
self.close(error=Errors.ConnectionError('socket disconnected'))
|
||||||
|
return None
|
||||||
|
self._rbuffer.write(data)
|
||||||
except ssl.SSLWantReadError:
|
except ssl.SSLWantReadError:
|
||||||
return None
|
return None
|
||||||
except ConnectionError as e:
|
except ConnectionError as e:
|
||||||
@@ -411,7 +419,17 @@ class BrokerConnection(object):
|
|||||||
if self._receiving:
|
if self._receiving:
|
||||||
staged_bytes = self._rbuffer.tell()
|
staged_bytes = self._rbuffer.tell()
|
||||||
try:
|
try:
|
||||||
self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes))
|
bytes_to_read = self._next_payload_bytes - staged_bytes
|
||||||
|
data = self._sock.recv(bytes_to_read)
|
||||||
|
# We expect socket.recv to raise an exception if there is not
|
||||||
|
# enough data to read the full bytes_to_read
|
||||||
|
# but if the socket is disconnected, we will get empty data
|
||||||
|
# without an exception raised
|
||||||
|
if not data:
|
||||||
|
log.error('%s: socket disconnected', self)
|
||||||
|
self.close(error=Errors.ConnectionError('socket disconnected'))
|
||||||
|
return None
|
||||||
|
self._rbuffer.write(data)
|
||||||
except ssl.SSLWantReadError:
|
except ssl.SSLWantReadError:
|
||||||
return None
|
return None
|
||||||
except ConnectionError as e:
|
except ConnectionError as e:
|
||||||
|
Reference in New Issue
Block a user