diff --git a/cassandra/cluster.py b/cassandra/cluster.py index fc6b22af..372d5433 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1395,8 +1395,10 @@ class Cluster(object): """ if self.is_shutdown: return + with host.lock: was_up = host.is_up + # ignore down signals if we have open pools to the host # this is to avoid closing pools when a control connection host became isolated if self._discount_down_events and self.profile_manager.distance(host) != HostDistance.IGNORED: @@ -3132,7 +3134,7 @@ class ControlConnection(object): c = getattr(self, '_connection', None) return [c] if c else [] - def return_connection(self, connection, mark_host_down=False): # noqa + def return_connection(self, connection): if connection is self._connection and (connection.is_defunct or connection.is_closed): self.reconnect() diff --git a/cassandra/connection.py b/cassandra/connection.py index 03c44fcc..5f39ffed 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -1007,7 +1007,10 @@ class ConnectionHeartbeat(Thread): for connection, owner, exc in failed_connections: self._raise_if_stopped() connection.defunct(exc) - owner.return_connection(connection, mark_host_down=True) + if not connection.is_control_connection: + # Only HostConnection supports shutdown_on_error + owner.shutdown_on_error = True + owner.return_connection(connection) except self.ShutdownException: pass except Exception: diff --git a/cassandra/pool.py b/cassandra/pool.py index 0fab068b..7a198e67 100644 --- a/cassandra/pool.py +++ b/cassandra/pool.py @@ -305,6 +305,7 @@ class HostConnection(object): host = None host_distance = None is_shutdown = False + shutdown_on_error = False _session = None _connection = None @@ -359,14 +360,14 @@ class HostConnection(object): raise NoConnectionsAvailable("All request IDs are currently in use") - def return_connection(self, connection, mark_host_down=False): + def return_connection(self, connection): with connection.lock: connection.in_flight -= 1 with self._stream_available_condition: self._stream_available_condition.notify() if connection.is_defunct or connection.is_closed: - if connection.signaled_error and not mark_host_down: + if connection.signaled_error and not self.shutdown_on_error: return is_down = False @@ -377,8 +378,7 @@ class HostConnection(object): self.host, connection.last_error, is_host_addition=False) connection.signaled_error = True - # Force mark down a host on error, used by the ConnectionHeartbeat - if mark_host_down and not is_down: + if self.shutdown_on_error and not is_down: is_down = True self._session.cluster.on_down(self.host, is_host_addition=False) @@ -395,7 +395,6 @@ class HostConnection(object): def _replace(self, connection): with self._lock: if self.is_shutdown: - self._is_replacing = False return log.debug("Replacing connection (%s) to %s", id(connection), self.host) @@ -642,7 +641,7 @@ class HostConnectionPool(object): raise NoConnectionsAvailable() - def return_connection(self, connection, mark_host_down=False): #noqa + def return_connection(self, connection): with connection.lock: connection.in_flight -= 1 in_flight = connection.in_flight