More _maybe_connect refactoring -- preparing for selectors
This commit is contained in:
@@ -163,21 +163,29 @@ class KafkaClient(object):
|
|||||||
host, port, afi = get_ip_port_afi(broker.host)
|
host, port, afi = get_ip_port_afi(broker.host)
|
||||||
self._conns[node_id] = BrokerConnection(host, broker.port, afi,
|
self._conns[node_id] = BrokerConnection(host, broker.port, afi,
|
||||||
**self.config)
|
**self.config)
|
||||||
state = self._conns[node_id].connect()
|
conn = self._conns[node_id]
|
||||||
if self._conns[node_id].connecting():
|
if conn.connected():
|
||||||
self._connecting.add(node_id)
|
return True
|
||||||
|
|
||||||
|
conn.connect()
|
||||||
|
|
||||||
|
if conn.connecting():
|
||||||
|
if node_id not in self._connecting:
|
||||||
|
self._connecting.add(node_id)
|
||||||
|
|
||||||
# Whether CONNECTED or DISCONNECTED, we need to remove from connecting
|
# Whether CONNECTED or DISCONNECTED, we need to remove from connecting
|
||||||
elif node_id in self._connecting:
|
elif node_id in self._connecting:
|
||||||
log.debug("Node %s connection state is %s", node_id, state)
|
|
||||||
self._connecting.remove(node_id)
|
self._connecting.remove(node_id)
|
||||||
|
|
||||||
|
if conn.connected():
|
||||||
|
log.debug("Node %s connected", node_id)
|
||||||
|
|
||||||
# Connection failures imply that our metadata is stale, so let's refresh
|
# Connection failures imply that our metadata is stale, so let's refresh
|
||||||
if state is ConnectionStates.DISCONNECTED:
|
elif conn.disconnected():
|
||||||
log.warning("Node %s connect failed -- refreshing metadata", node_id)
|
log.warning("Node %s connect failed -- refreshing metadata", node_id)
|
||||||
self.cluster.request_update()
|
self.cluster.request_update()
|
||||||
|
|
||||||
return self._conns[node_id].connected()
|
return conn.connected()
|
||||||
|
|
||||||
def ready(self, node_id):
|
def ready(self, node_id):
|
||||||
"""Check whether a node is connected and ok to send more requests.
|
"""Check whether a node is connected and ok to send more requests.
|
||||||
@@ -228,7 +236,7 @@ class KafkaClient(object):
|
|||||||
"""
|
"""
|
||||||
if node_id not in self._conns:
|
if node_id not in self._conns:
|
||||||
return False
|
return False
|
||||||
return self._conns[node_id].state is ConnectionStates.DISCONNECTED
|
return self._conns[node_id].disconnected()
|
||||||
|
|
||||||
def connection_delay(self, node_id):
|
def connection_delay(self, node_id):
|
||||||
"""
|
"""
|
||||||
|
|||||||
Reference in New Issue
Block a user