Cleanup unneeded bootstrap connection to avoid leak in KafkaClient
This commit is contained in:
@@ -133,10 +133,10 @@ class KafkaClient(object):
|
||||
self._delayed_tasks = DelayedTaskQueue()
|
||||
self._last_bootstrap = 0
|
||||
self._bootstrap_fails = 0
|
||||
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
|
||||
self._wake_r, self._wake_w = socket.socketpair()
|
||||
self._wake_r.setblocking(False)
|
||||
self._selector.register(self._wake_r, selectors.EVENT_READ)
|
||||
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
|
||||
|
||||
def _bootstrap(self, hosts):
|
||||
# Exponential backoff if bootstrap fails
|
||||
@@ -174,6 +174,8 @@ class KafkaClient(object):
|
||||
# in that case, we should keep the bootstrap connection
|
||||
if not len(self.cluster.brokers()):
|
||||
self._conns['bootstrap'] = bootstrap
|
||||
else:
|
||||
bootstrap.close()
|
||||
self._bootstrap_fails = 0
|
||||
break
|
||||
# No bootstrap found...
|
||||
|
||||
@@ -183,19 +183,22 @@ def test_close(mocker, conn):
|
||||
cli = KafkaClient()
|
||||
mocker.patch.object(cli, '_selector')
|
||||
|
||||
# bootstrap connection should have been closed
|
||||
assert conn.close.call_count == 1
|
||||
|
||||
# Unknown node - silent
|
||||
cli.close(2)
|
||||
|
||||
# Single node close
|
||||
cli._maybe_connect(0)
|
||||
assert not conn.close.call_count
|
||||
cli.close(0)
|
||||
assert conn.close.call_count == 1
|
||||
cli.close(0)
|
||||
assert conn.close.call_count == 2
|
||||
|
||||
# All node close
|
||||
cli._maybe_connect(1)
|
||||
cli.close()
|
||||
assert conn.close.call_count == 3
|
||||
assert conn.close.call_count == 4
|
||||
|
||||
|
||||
def test_is_disconnected(conn):
|
||||
|
||||
Reference in New Issue
Block a user