diff --git a/cassandra/cluster.py b/cassandra/cluster.py
index 60d00bd4..60114176 100644
--- a/cassandra/cluster.py
+++ b/cassandra/cluster.py
@@ -579,11 +579,8 @@ class Cluster(object):
             for listener in self.listeners:
                 listener.on_up(host)
         finally:
-            host._handle_node_up_condition.acquire()
-            if host._currently_handling_node_up:
+            with host.lock:
                 host._currently_handling_node_up = False
-                host._handle_node_up_condition.notify()
-            host._handle_node_up_condition.release()

         # see if there are any pools to add or remove now that the host is marked up
         for session in self.sessions:
@@ -596,15 +593,20 @@
         if self._is_shutdown:
             return

-        host._handle_node_up_condition.acquire()
-        while host._currently_handling_node_up:
-            host._handle_node_up_condition.wait()
-        host._currently_handling_node_up = True
-        host._handle_node_up_condition.release()
+        log.debug("Waiting to acquire lock for handling up status of node %s", host)
+        with host.lock:
+            if host._currently_handling_node_up:
+                log.debug("Another thread is already handling up status of node %s", host)
+                return

-        if host.is_up:
-            return
+            if host.is_up:
+                log.debug("Host %s was already marked up", host)
+                return

+            host._currently_handling_node_up = True
+        log.debug("Starting to handle up status of node %s", host)
+
+        have_future = False
         futures = set()
         try:
             log.info("Host %s may be up; will prepare queries and open connection pool", host)
@@ -615,20 +617,25 @@
                 reconnector.cancel()

             self._prepare_all_queries(host)
-            log.debug("Done preparing all queries for host %s", host)
+            log.debug("Done preparing all queries for host %s, ", host)

             for session in self.sessions:
                 session.remove_pool(host)

+            log.debug("Signalling to load balancing policy that host %s is up", host)
             self.load_balancing_policy.on_up(host)
+
+            log.debug("Signalling to control connection that host %s is up", host)
             self.control_connection.on_up(host)

+            log.debug("Attempting to open new connection pools for host %s", host)
             futures_lock = Lock()
             futures_results = []
             callback = partial(self._on_up_future_completed, host, futures, futures_results, futures_lock)
             for session in self.sessions:
                 future = session.add_or_renew_pool(host, is_host_addition=False)
                 if future is not None:
+                    have_future = True
                     future.add_done_callback(callback)
                     futures.add(future)
         except Exception:
@@ -638,11 +645,13 @@

             self._cleanup_failed_on_up_handling(host)

-            host._handle_node_up_condition.acquire()
-            host._currently_handling_node_up = False
-            host._handle_node_up_condition.notify()
-            host._handle_node_up_condition.release()
+            with host.lock:
+                host._currently_handling_node_up = False
             raise
+        else:
+            if not have_future:
+                with host.lock:
+                    host._currently_handling_node_up = False

         # for testing purposes
         return futures
@@ -669,7 +678,7 @@
         reconnector.start()

     @run_in_executor
-    def on_down(self, host, is_host_addition, force_if_down=False):
+    def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
         """
         Intended for internal use only.
         """
@@ -677,7 +686,7 @@
             return

         with host.lock:
-            if (not (host.is_up or force_if_down)) or host.is_currently_reconnecting():
+            if (not host.is_up and not expect_host_to_be_down) or host.is_currently_reconnecting():
                 return

             host.set_down()
@@ -767,10 +776,10 @@
             listener.on_remove(host)
         self.control_connection.on_remove(host)

-    def signal_connection_failure(self, host, connection_exc, is_host_addition):
+    def signal_connection_failure(self, host, connection_exc, is_host_addition, expect_host_to_be_down=False):
         is_down = host.signal_connection_failure(connection_exc)
         if is_down:
-            self.on_down(host, is_host_addition, force_if_down=True)
+            self.on_down(host, is_host_addition, expect_host_to_be_down)
         return is_down

     def add_host(self, address, datacenter=None, rack=None, signal=True):
@@ -1201,8 +1210,12 @@
                 self.cluster.signal_connection_failure(host, conn_exc, is_host_addition)
                 return False
             except Exception as conn_exc:
-                log.warn("Failed to create connection pool for new host %s: %s", host, conn_exc)
-                self.cluster.signal_connection_failure(host, conn_exc, is_host_addition)
+                log.warn("Failed to create connection pool for new host %s: %s",
+                         host, conn_exc)
+                # the host itself will still be marked down, so we need to pass
+                # a special flag to make sure the reconnector is created
+                self.cluster.signal_connection_failure(
+                    host, conn_exc, is_host_addition, expect_host_to_be_down=True)
                 return False

             previous = self._pools.get(host)
diff --git a/cassandra/pool.py b/cassandra/pool.py
index 5f4e804c..7fd8e355 100644
--- a/cassandra/pool.py
+++ b/cassandra/pool.py
@@ -69,7 +69,6 @@ class Host(object):
     lock = None

     _currently_handling_node_up = False
-    _handle_node_up_condition = None

     def __init__(self, inet_address, conviction_policy_factory, datacenter=None, rack=None):
         if inet_address is None:
@@ -81,7 +80,6 @@
         self.conviction_policy = conviction_policy_factory(self)
         self.set_location_info(datacenter, rack)
         self.lock = RLock()
-        self._handle_node_up_condition = Condition()

     @property
     def datacenter(self):