Fix duplicate node-up handling
a22f80b8 introduced a bug where multiple UP notifications
could result in multiple reconnectors being started. It was also
possible for the executor threads to become deadlocked waiting on a
condition that would never be notified, which meant that future node
down and up handling would never be executed.
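
For illustration only (this is not driver code), the sketch below shows the shape of the fix: instead of executor threads blocking in Condition.wait() for a notify() that may never arrive, the handler checks and sets a per-host boolean flag under a plain lock, returns early when another thread is already handling the UP event, and clears the flag in a finally block so later events are never locked out. The Host class and handle_node_up function here are made up for the example.

# Minimal standalone sketch of the lock-guarded flag pattern this commit
# switches to; names are illustrative, not the driver's actual API.
import threading
import time

class Host(object):
    def __init__(self):
        self.lock = threading.RLock()
        self._currently_handling_node_up = False

def handle_node_up(host, log):
    # Check-and-set the flag under the lock; a duplicate UP notification that
    # arrives while another thread is still working returns immediately
    # instead of blocking on a condition.
    with host.lock:
        if host._currently_handling_node_up:
            log.append("duplicate UP ignored")
            return
        host._currently_handling_node_up = True
    try:
        log.append("handling UP")      # e.g. open pools, cancel reconnector
        time.sleep(0.1)                # simulate the work taking a while
    finally:
        # Always clear the flag, even on failure, so later UP/DOWN events
        # are not locked out forever.
        with host.lock:
            host._currently_handling_node_up = False

if __name__ == "__main__":
    host = Host()
    log = []
    threads = [threading.Thread(target=handle_node_up, args=(host, log))
               for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(log)  # typically one "handling UP" and several "duplicate UP ignored"
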
@@ -579,11 +579,8 @@ class Cluster(object):
             for listener in self.listeners:
                 listener.on_up(host)
         finally:
-            host._handle_node_up_condition.acquire()
-            if host._currently_handling_node_up:
+            with host.lock:
                 host._currently_handling_node_up = False
-                host._handle_node_up_condition.notify()
-            host._handle_node_up_condition.release()
 
         # see if there are any pools to add or remove now that the host is marked up
         for session in self.sessions:
@@ -596,15 +593,20 @@ class Cluster(object):
         if self._is_shutdown:
             return
 
-        host._handle_node_up_condition.acquire()
-        while host._currently_handling_node_up:
-            host._handle_node_up_condition.wait()
-        host._currently_handling_node_up = True
-        host._handle_node_up_condition.release()
-
-        if host.is_up:
+        log.debug("Waiting to acquire lock for handling up status of node %s", host)
+        with host.lock:
+            if host._currently_handling_node_up:
+                log.debug("Another thread is already handling up status of node %s", host)
+                return
+
+            if host.is_up:
+                log.debug("Host %s was already marked up", host)
+                return
+
+            host._currently_handling_node_up = True
+        log.debug("Starting to handle up status of node %s", host)
 
         have_future = False
         futures = set()
         try:
             log.info("Host %s may be up; will prepare queries and open connection pool", host)
@@ -615,20 +617,25 @@ class Cluster(object):
                 reconnector.cancel()
 
             self._prepare_all_queries(host)
-            log.debug("Done preparing all queries for host %s", host)
+            log.debug("Done preparing all queries for host %s, ", host)
 
+            for session in self.sessions:
+                session.remove_pool(host)
+
             log.debug("Signalling to load balancing policy that host %s is up", host)
             self.load_balancing_policy.on_up(host)
 
             log.debug("Signalling to control connection that host %s is up", host)
             self.control_connection.on_up(host)
 
             log.debug("Attempting to open new connection pools for host %s", host)
+            futures_lock = Lock()
+            futures_results = []
             callback = partial(self._on_up_future_completed, host, futures, futures_results, futures_lock)
             for session in self.sessions:
                 future = session.add_or_renew_pool(host, is_host_addition=False)
                 if future is not None:
                     have_future = True
                     future.add_done_callback(callback)
                     futures.add(future)
         except Exception:
@@ -638,11 +645,13 @@ class Cluster(object):
 
             self._cleanup_failed_on_up_handling(host)
 
-            host._handle_node_up_condition.acquire()
+            with host.lock:
                 host._currently_handling_node_up = False
-            host._handle_node_up_condition.notify()
-            host._handle_node_up_condition.release()
             raise
+        else:
+            if not have_future:
+                with host.lock:
+                    host._currently_handling_node_up = False
 
         # for testing purposes
         return futures
@@ -669,7 +678,7 @@ class Cluster(object):
         reconnector.start()
 
     @run_in_executor
-    def on_down(self, host, is_host_addition, force_if_down=False):
+    def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
         """
         Intended for internal use only.
         """
@@ -677,7 +686,7 @@ class Cluster(object):
             return
 
         with host.lock:
-            if (not (host.is_up or force_if_down)) or host.is_currently_reconnecting():
+            if (not host.is_up and not expect_host_to_be_down) or host.is_currently_reconnecting():
                 return
 
             host.set_down()
@@ -767,10 +776,10 @@ class Cluster(object):
             listener.on_remove(host)
         self.control_connection.on_remove(host)
 
-    def signal_connection_failure(self, host, connection_exc, is_host_addition):
+    def signal_connection_failure(self, host, connection_exc, is_host_addition, expect_host_to_be_down=False):
        is_down = host.signal_connection_failure(connection_exc)
        if is_down:
-            self.on_down(host, is_host_addition, force_if_down=True)
+            self.on_down(host, is_host_addition, expect_host_to_be_down)
        return is_down
 
    def add_host(self, address, datacenter=None, rack=None, signal=True):
@@ -1201,8 +1210,12 @@ class Session(object):
                 self.cluster.signal_connection_failure(host, conn_exc, is_host_addition)
                 return False
             except Exception as conn_exc:
-                log.warn("Failed to create connection pool for new host %s: %s", host, conn_exc)
-                self.cluster.signal_connection_failure(host, conn_exc, is_host_addition)
+                log.warn("Failed to create connection pool for new host %s: %s",
+                         host, conn_exc)
+                # the host itself will still be marked down, so we need to pass
+                # a special flag to make sure the reconnector is created
+                self.cluster.signal_connection_failure(
+                    host, conn_exc, is_host_addition, expect_host_to_be_down=True)
                 return False
 
             previous = self._pools.get(host)
@@ -69,7 +69,6 @@ class Host(object):
     lock = None
 
     _currently_handling_node_up = False
-    _handle_node_up_condition = None
 
     def __init__(self, inet_address, conviction_policy_factory, datacenter=None, rack=None):
         if inet_address is None:
@@ -81,7 +80,6 @@ class Host(object):
         self.conviction_policy = conviction_policy_factory(self)
         self.set_location_info(datacenter, rack)
         self.lock = RLock()
-        self._handle_node_up_condition = Condition()
 
     @property
     def datacenter(self):