Apply new _get_conn connect logic in KafkaClient.check_version
This commit is contained in:
@@ -598,18 +598,26 @@ class KafkaClient(object):
|
||||
raise Errors.NoBrokersAvailable()
|
||||
|
||||
def connect(node_id):
|
||||
timeout_at = time.time() + timeout
|
||||
# brokers < 0.9 do not return any broker metadata if there are no topics
|
||||
# so we're left with a single bootstrap connection
|
||||
while not self.ready(node_id):
|
||||
if time.time() >= timeout_at:
|
||||
raise Errors.NodeNotReadyError(node_id)
|
||||
time.sleep(0.025)
|
||||
|
||||
self._maybe_connect(node_id)
|
||||
conn = self._conns[node_id]
|
||||
# Monkeypatch the connection request timeout
|
||||
# Generally this timeout should not get triggered
|
||||
# but in case it does, we want it to be reasonably short
|
||||
self._conns[node_id].config['request_timeout_ms'] = timeout * 1000
|
||||
conn.config['request_timeout_ms'] = timeout * 1000
|
||||
if conn.connected():
|
||||
return
|
||||
|
||||
timeout_at = time.time() + timeout
|
||||
# brokers < 0.9 do not return any broker metadata if there are no topics
|
||||
# so we're left with a single bootstrap connection
|
||||
while time.time() < timeout_at and conn.connecting():
|
||||
if conn.connect() is ConnectionStates.CONNECTED:
|
||||
break
|
||||
else:
|
||||
time.sleep(0.05)
|
||||
else:
|
||||
conn.close()
|
||||
raise Errors.NodeNotReadyError(node_id)
|
||||
|
||||
# kafka kills the connection when it doesnt recognize an API request
|
||||
# so we can send a test request and then follow immediately with a
|
||||
|
||||
Reference in New Issue
Block a user