Refactor SimpleClient connect logic to support multiple connecting states
This commit is contained in:
@@ -67,8 +67,12 @@ class SimpleClient(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
conn = self._conns[host_key]
|
conn = self._conns[host_key]
|
||||||
while conn.connect() == ConnectionStates.CONNECTING:
|
timeout = time.time() + self.timeout
|
||||||
pass
|
while time.time() < timeout:
|
||||||
|
if conn.connect() is ConnectionStates.CONNECTED:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise ConnectionError("%s:%s (%s)" % (host, port, afi))
|
||||||
return conn
|
return conn
|
||||||
|
|
||||||
def _get_leader_for_partition(self, topic, partition):
|
def _get_leader_for_partition(self, topic, partition):
|
||||||
@@ -149,9 +153,11 @@ class SimpleClient(object):
|
|||||||
random.shuffle(hosts)
|
random.shuffle(hosts)
|
||||||
|
|
||||||
for (host, port, afi) in hosts:
|
for (host, port, afi) in hosts:
|
||||||
conn = self._get_conn(host, port, afi)
|
try:
|
||||||
if not conn.connected():
|
conn = self._get_conn(host, port, afi)
|
||||||
log.warning("Skipping unconnected connection: %s", conn)
|
except ConnectionError:
|
||||||
|
log.warning("Skipping unconnected connection: %s:%s (AFI %s)",
|
||||||
|
host, port, afi)
|
||||||
continue
|
continue
|
||||||
request = encoder_fn(payloads=payloads)
|
request = encoder_fn(payloads=payloads)
|
||||||
future = conn.send(request)
|
future = conn.send(request)
|
||||||
@@ -233,9 +239,9 @@ class SimpleClient(object):
|
|||||||
|
|
||||||
|
|
||||||
host, port, afi = get_ip_port_afi(broker.host)
|
host, port, afi = get_ip_port_afi(broker.host)
|
||||||
conn = self._get_conn(host, broker.port, afi)
|
try:
|
||||||
conn.connect()
|
conn = self._get_conn(host, broker.port, afi)
|
||||||
if not conn.connected():
|
except ConnectionError:
|
||||||
refresh_metadata = True
|
refresh_metadata = True
|
||||||
failed_payloads(broker_payloads)
|
failed_payloads(broker_payloads)
|
||||||
continue
|
continue
|
||||||
@@ -419,10 +425,19 @@ class SimpleClient(object):
|
|||||||
return c
|
return c
|
||||||
|
|
||||||
def reinit(self):
|
def reinit(self):
|
||||||
for conn in self._conns.values():
|
timeout = time.time() + self.timeout
|
||||||
|
conns = set(self._conns.values())
|
||||||
|
for conn in conns:
|
||||||
conn.close()
|
conn.close()
|
||||||
while conn.connect() == ConnectionStates.CONNECTING:
|
conn.connect()
|
||||||
pass
|
|
||||||
|
while time.time() < timeout:
|
||||||
|
for conn in list(conns):
|
||||||
|
conn.connect()
|
||||||
|
if conn.connected():
|
||||||
|
conns.remove(conn)
|
||||||
|
if not conns:
|
||||||
|
break
|
||||||
|
|
||||||
def reset_topic_metadata(self, *topics):
|
def reset_topic_metadata(self, *topics):
|
||||||
for topic in topics:
|
for topic in topics:
|
||||||
|
|||||||
Reference in New Issue
Block a user