From 2ac8503647003535c872ed82c311c4b335862ec5 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 May 2016 15:18:21 -0700 Subject: [PATCH] Cleanup wakeup socketpair on close to avoid leak in KafkaClient --- kafka/client_async.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 7079f01..e0db51a 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -138,12 +138,6 @@ class KafkaClient(object): self._wake_r.setblocking(False) self._selector.register(self._wake_r, selectors.EVENT_READ) - def __del__(self): - if hasattr(self, '_wake_r'): - self._wake_r.close() - if hasattr(self, '_wake_w'): - self._wake_w.close() - def _bootstrap(self, hosts): # Exponential backoff if bootstrap fails backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails @@ -272,14 +266,16 @@ class KafkaClient(object): return self._conns[node_id].connected() def close(self, node_id=None): - """Closes the connection to a particular node (if there is one). + """Closes one or all broker connections. Arguments: - node_id (int): the id of the node to close + node_id (int, optional): the id of the node to close """ if node_id is None: for conn in self._conns.values(): conn.close() + self._wake_r.close() + self._wake_w.close() elif node_id in self._conns: self._conns[node_id].close() else: