Initialized _dirty in KafkaConnection __init__() and set _sock to None in close()
This commit is contained in:
@@ -55,6 +55,7 @@ class KafkaConnection(local):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.timeout = timeout
|
||||
self._dirty = None
|
||||
self._sock = None
|
||||
|
||||
self.reinit()
|
||||
@@ -75,7 +76,7 @@ class KafkaConnection(local):
|
||||
responses = []
|
||||
|
||||
log.debug("About to read %d bytes from Kafka", num_bytes)
|
||||
if self._dirty:
|
||||
if self._dirty or not self._sock:
|
||||
self.reinit()
|
||||
|
||||
while bytes_left:
|
||||
@@ -105,7 +106,7 @@ class KafkaConnection(local):
|
||||
"Send a request to Kafka"
|
||||
log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
|
||||
try:
|
||||
if self._dirty:
|
||||
if self._dirty or not self._sock:
|
||||
self.reinit()
|
||||
sent = self._sock.sendall(payload)
|
||||
if sent is not None:
|
||||
@@ -143,6 +144,7 @@ class KafkaConnection(local):
|
||||
"""
|
||||
if self._sock:
|
||||
self._sock.close()
|
||||
self._sock = None
|
||||
|
||||
def reinit(self):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user