Handle dirty flag in conn.recv()
* If the connection is dirty, reinit * If we get a BufferUnderflowError, the server could have gone away, so mark it dirty
This commit is contained in:
@@ -44,7 +44,8 @@ class KafkaConnection(local):
|
|||||||
bytes_left = num_bytes
|
bytes_left = num_bytes
|
||||||
resp = ''
|
resp = ''
|
||||||
log.debug("About to read %d bytes from Kafka", num_bytes)
|
log.debug("About to read %d bytes from Kafka", num_bytes)
|
||||||
|
if self._dirty:
|
||||||
|
self.reinit()
|
||||||
while bytes_left:
|
while bytes_left:
|
||||||
try:
|
try:
|
||||||
data = self._sock.recv(bytes_left)
|
data = self._sock.recv(bytes_left)
|
||||||
@@ -52,6 +53,7 @@ class KafkaConnection(local):
|
|||||||
log.error('Unable to receive data from Kafka: %s', e)
|
log.error('Unable to receive data from Kafka: %s', e)
|
||||||
self._raise_connection_error()
|
self._raise_connection_error()
|
||||||
if data == '':
|
if data == '':
|
||||||
|
self._dirty = True
|
||||||
raise BufferUnderflowError("Not enough data to read this response")
|
raise BufferUnderflowError("Not enough data to read this response")
|
||||||
bytes_left -= len(data)
|
bytes_left -= len(data)
|
||||||
log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
|
log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
|
||||||
|
|||||||
Reference in New Issue
Block a user