Cleanup wakeup socketpair on close to avoid leak in KafkaClient
This commit is contained in:
@@ -138,12 +138,6 @@ class KafkaClient(object):
|
||||
self._wake_r.setblocking(False)
|
||||
self._selector.register(self._wake_r, selectors.EVENT_READ)
|
||||
|
||||
def __del__(self):
|
||||
if hasattr(self, '_wake_r'):
|
||||
self._wake_r.close()
|
||||
if hasattr(self, '_wake_w'):
|
||||
self._wake_w.close()
|
||||
|
||||
def _bootstrap(self, hosts):
|
||||
# Exponential backoff if bootstrap fails
|
||||
backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails
|
||||
@@ -272,14 +266,16 @@ class KafkaClient(object):
|
||||
return self._conns[node_id].connected()
|
||||
|
||||
def close(self, node_id=None):
|
||||
"""Closes the connection to a particular node (if there is one).
|
||||
"""Closes one or all broker connections.
|
||||
|
||||
Arguments:
|
||||
node_id (int): the id of the node to close
|
||||
node_id (int, optional): the id of the node to close
|
||||
"""
|
||||
if node_id is None:
|
||||
for conn in self._conns.values():
|
||||
conn.close()
|
||||
self._wake_r.close()
|
||||
self._wake_w.close()
|
||||
elif node_id in self._conns:
|
||||
self._conns[node_id].close()
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user