diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 328dfcbe..4bd805ed 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,10 +1,15 @@ 3.11 ==== +Features +-------- +* Add idle_heartbeat_timeout cluster option to tune how long to wait for heartbeat responses. (PYTHON-762) + Bug Fixes --------- * is_idempotent flag is not propagated from PreparedStatement to BoundStatement (PYTHON-736) * Fix asyncore hang on exit (PYTHON-767) +* Driver takes several minutes to remove a bad host from session (PYTHON-762) Other ----- diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 372d5433..cfe12f9c 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -590,6 +590,12 @@ class Cluster(object): Setting to zero disables heartbeats. """ + idle_heartbeat_timeout = 30 + """ + Timeout, in seconds, that the heartbeat waits for responses on idle connections. + Lowering this value can help to discover bad connections earlier. + """ + schema_event_refresh_window = 2 """ Window, in seconds, within which a schema component will be refreshed after @@ -756,7 +762,8 @@ class Cluster(object): reprepare_on_up=True, execution_profiles=None, allow_beta_protocol_version=False, - timestamp_generator=None): + timestamp_generator=None, + idle_heartbeat_timeout=30): """ ``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as extablishing connection pools or refreshing metadata. 
@@ -847,6 +854,7 @@ class Cluster(object): self.max_schema_agreement_wait = max_schema_agreement_wait self.control_connection_timeout = control_connection_timeout self.idle_heartbeat_interval = idle_heartbeat_interval + self.idle_heartbeat_timeout = idle_heartbeat_timeout self.schema_event_refresh_window = schema_event_refresh_window self.topology_event_refresh_window = topology_event_refresh_window self.status_event_refresh_window = status_event_refresh_window @@ -1187,7 +1195,11 @@ class Cluster(object): self.profile_manager.check_supported() # todo: rename this method if self.idle_heartbeat_interval: - self._idle_heartbeat = ConnectionHeartbeat(self.idle_heartbeat_interval, self.get_connection_holders) + self._idle_heartbeat = ConnectionHeartbeat( + self.idle_heartbeat_interval, + self.get_connection_holders, + timeout=self.idle_heartbeat_timeout + ) self._is_setup = True session = self._new_session(keyspace) diff --git a/cassandra/connection.py b/cassandra/connection.py index 5f39ffed..c1e9d888 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -951,9 +951,10 @@ class HeartbeatFuture(object): class ConnectionHeartbeat(Thread): - def __init__(self, interval_sec, get_connection_holders): + def __init__(self, interval_sec, get_connection_holders, timeout): Thread.__init__(self, name="Connection heartbeat") self._interval = interval_sec + self._timeout = timeout self._get_connection_holders = get_connection_holders self._shutdown_event = Event() self.daemon = True @@ -990,11 +991,14 @@ class ConnectionHeartbeat(Thread): owner.return_connection(connection) self._raise_if_stopped() + # Wait max `self._timeout` seconds for all HeartbeatFutures to complete + timeout = self._timeout + start_time = time.time() for f in futures: self._raise_if_stopped() connection = f.connection try: - f.wait(self._interval) + f.wait(timeout) # TODO: move this, along with connection locks in pool, down into Connection with connection.lock: connection.in_flight -= 1 @@ 
-1004,6 +1008,8 @@ class ConnectionHeartbeat(Thread): id(connection), connection.host) failed_connections.append((f.connection, f.owner, e)) + timeout = self._timeout - (time.time() - start_time) + for connection, owner, exc in failed_connections: self._raise_if_stopped() connection.defunct(exc) diff --git a/docs/api/cassandra/cluster.rst b/docs/api/cassandra/cluster.rst index c37851ad..86e168ad 100644 --- a/docs/api/cassandra/cluster.rst +++ b/docs/api/cassandra/cluster.rst @@ -46,6 +46,8 @@ .. autoattribute:: idle_heartbeat_interval + .. autoattribute:: idle_heartbeat_timeout + .. autoattribute:: schema_event_refresh_window .. autoattribute:: topology_event_refresh_window diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py index ec9e1a3f..9d312b85 100644 --- a/tests/unit/test_connection.py +++ b/tests/unit/test_connection.py @@ -277,8 +277,8 @@ class ConnectionHeartbeatTest(unittest.TestCase): get_holders = Mock(return_value=holders) return get_holders - def run_heartbeat(self, get_holders_fun, count=2, interval=0.05): - ch = ConnectionHeartbeat(interval, get_holders_fun) + def run_heartbeat(self, get_holders_fun, count=2, interval=0.05, timeout=0.05): + ch = ConnectionHeartbeat(interval, get_holders_fun, timeout=timeout) time.sleep(interval * count) ch.stop() self.assertTrue(get_holders_fun.call_count)