Dont warn on socket disconnections caused by KafkaClient.close()
This commit is contained in:
@@ -136,6 +136,7 @@ class KafkaClient(object):
|
||||
self._wake_r, self._wake_w = socket.socketpair()
|
||||
self._wake_r.setblocking(False)
|
||||
self._selector.register(self._wake_r, selectors.EVENT_READ)
|
||||
self._closed = False
|
||||
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
|
||||
|
||||
def _bootstrap(self, hosts):
|
||||
@@ -226,7 +227,7 @@ class KafkaClient(object):
|
||||
self._selector.unregister(conn._sock)
|
||||
except KeyError:
|
||||
pass
|
||||
if self._refresh_on_disconnects:
|
||||
if self._refresh_on_disconnects and not self._closed:
|
||||
log.warning("Node %s connection failed -- refreshing metadata", node_id)
|
||||
self.cluster.request_update()
|
||||
|
||||
@@ -274,6 +275,7 @@ class KafkaClient(object):
|
||||
node_id (int, optional): the id of the node to close
|
||||
"""
|
||||
if node_id is None:
|
||||
self._closed = True
|
||||
for conn in self._conns.values():
|
||||
conn.close()
|
||||
self._wake_r.close()
|
||||
|
||||
Reference in New Issue
Block a user