diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 55e9228f..dee208c6 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1395,10 +1395,8 @@ class Cluster(object): """ if self.is_shutdown: return - with host.lock: was_up = host.is_up - # ignore down signals if we have open pools to the host # this is to avoid closing pools when a control connection host became isolated if self._discount_down_events and self.profile_manager.distance(host) != HostDistance.IGNORED: @@ -3134,7 +3132,7 @@ class ControlConnection(object): c = getattr(self, '_connection', None) return [c] if c else [] - def return_connection(self, connection): + def return_connection(self, connection, mark_host_down=False): # noqa if connection is self._connection and (connection.is_defunct or connection.is_closed): self.reconnect() diff --git a/cassandra/connection.py b/cassandra/connection.py index 216b7905..cad889ee 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -1007,7 +1007,7 @@ class ConnectionHeartbeat(Thread): for connection, owner, exc in failed_connections: self._raise_if_stopped() connection.defunct(exc) - owner.return_connection(connection) + owner.return_connection(connection, mark_host_down=True) except self.ShutdownException: pass except Exception: diff --git a/cassandra/pool.py b/cassandra/pool.py index 4784be57..0fab068b 100644 --- a/cassandra/pool.py +++ b/cassandra/pool.py @@ -359,18 +359,29 @@ class HostConnection(object): raise NoConnectionsAvailable("All request IDs are currently in use") - def return_connection(self, connection): + def return_connection(self, connection, mark_host_down=False): with connection.lock: connection.in_flight -= 1 with self._stream_available_condition: self._stream_available_condition.notify() - if (connection.is_defunct or connection.is_closed) and not connection.signaled_error: - log.debug("Defunct or closed connection (%s) returned to pool, potentially " - "marking host %s as down", id(connection), self.host) - is_down = self._session.cluster.signal_connection_failure( - self.host, connection.last_error, is_host_addition=False) - connection.signaled_error = True + if connection.is_defunct or connection.is_closed: + if connection.signaled_error and not mark_host_down: + return + + is_down = False + if not connection.signaled_error: + log.debug("Defunct or closed connection (%s) returned to pool, potentially " + "marking host %s as down", id(connection), self.host) + is_down = self._session.cluster.signal_connection_failure( + self.host, connection.last_error, is_host_addition=False) + connection.signaled_error = True + + # Force mark down a host on error, used by the ConnectionHeartbeat + if mark_host_down and not is_down: + is_down = True + self._session.cluster.on_down(self.host, is_host_addition=False) + if is_down: self.shutdown() else: @@ -382,6 +393,11 @@ class HostConnection(object): self._session.submit(self._replace, connection) def _replace(self, connection): + with self._lock: + if self.is_shutdown: + self._is_replacing = False + return + log.debug("Replacing connection (%s) to %s", id(connection), self.host) try: conn = self._session.cluster.connection_factory(self.host.address) @@ -626,7 +642,7 @@ class HostConnectionPool(object): raise NoConnectionsAvailable() - def return_connection(self, connection): + def return_connection(self, connection, mark_host_down=False): #noqa with connection.lock: connection.in_flight -= 1 in_flight = connection.in_flight