Add idle_heartbeat_timeout cluster option to tune how long to wait for heartbeat responses
This commit is contained in:
@@ -1,6 +1,10 @@
|
||||
3.11
|
||||
====
|
||||
|
||||
Features
|
||||
--------
|
||||
* Add idle_heartbeat_timeout cluster option to tune how long to wait for heartbeat responses. (PYTHON-762)
|
||||
|
||||
Bug Fixes
|
||||
---------
|
||||
* is_idempotent flag is not propagated from PreparedStatement to BoundStatement (PYTHON-736)
|
||||
|
||||
@@ -590,6 +590,12 @@ class Cluster(object):
|
||||
Setting to zero disables heartbeats.
|
||||
"""
|
||||
|
||||
idle_heartbeat_timeout = 30
|
||||
"""
|
||||
Timeout, in seconds, on which the heartbeat wait for idle connection responses.
|
||||
Lowering this value can help to discover bad connections earlier.
|
||||
"""
|
||||
|
||||
schema_event_refresh_window = 2
|
||||
"""
|
||||
Window, in seconds, within which a schema component will be refreshed after
|
||||
@@ -756,7 +762,8 @@ class Cluster(object):
|
||||
reprepare_on_up=True,
|
||||
execution_profiles=None,
|
||||
allow_beta_protocol_version=False,
|
||||
timestamp_generator=None):
|
||||
timestamp_generator=None,
|
||||
idle_heartbeat_timeout=30):
|
||||
"""
|
||||
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
|
||||
extablishing connection pools or refreshing metadata.
|
||||
@@ -847,6 +854,7 @@ class Cluster(object):
|
||||
self.max_schema_agreement_wait = max_schema_agreement_wait
|
||||
self.control_connection_timeout = control_connection_timeout
|
||||
self.idle_heartbeat_interval = idle_heartbeat_interval
|
||||
self.idle_heartbeat_timeout = idle_heartbeat_timeout
|
||||
self.schema_event_refresh_window = schema_event_refresh_window
|
||||
self.topology_event_refresh_window = topology_event_refresh_window
|
||||
self.status_event_refresh_window = status_event_refresh_window
|
||||
@@ -1187,7 +1195,11 @@ class Cluster(object):
|
||||
self.profile_manager.check_supported() # todo: rename this method
|
||||
|
||||
if self.idle_heartbeat_interval:
|
||||
self._idle_heartbeat = ConnectionHeartbeat(self.idle_heartbeat_interval, self.get_connection_holders)
|
||||
self._idle_heartbeat = ConnectionHeartbeat(
|
||||
self.idle_heartbeat_interval,
|
||||
self.get_connection_holders,
|
||||
timeout=self.idle_heartbeat_timeout
|
||||
)
|
||||
self._is_setup = True
|
||||
|
||||
session = self._new_session(keyspace)
|
||||
|
||||
@@ -951,9 +951,10 @@ class HeartbeatFuture(object):
|
||||
|
||||
class ConnectionHeartbeat(Thread):
|
||||
|
||||
def __init__(self, interval_sec, get_connection_holders):
|
||||
def __init__(self, interval_sec, get_connection_holders, timeout):
|
||||
Thread.__init__(self, name="Connection heartbeat")
|
||||
self._interval = interval_sec
|
||||
self._timeout = timeout
|
||||
self._get_connection_holders = get_connection_holders
|
||||
self._shutdown_event = Event()
|
||||
self.daemon = True
|
||||
@@ -990,8 +991,8 @@ class ConnectionHeartbeat(Thread):
|
||||
owner.return_connection(connection)
|
||||
self._raise_if_stopped()
|
||||
|
||||
# Wait max `self._interval` seconds for all HeartbeatFutures to complete
|
||||
timeout = self._interval
|
||||
# Wait max `self._timeout` seconds for all HeartbeatFutures to complete
|
||||
timeout = self._timeout
|
||||
start_time = time.time()
|
||||
for f in futures:
|
||||
self._raise_if_stopped()
|
||||
@@ -1007,7 +1008,7 @@ class ConnectionHeartbeat(Thread):
|
||||
id(connection), connection.host)
|
||||
failed_connections.append((f.connection, f.owner, e))
|
||||
|
||||
timeout = self._interval - (time.time() - start_time)
|
||||
timeout = self._timeout - (time.time() - start_time)
|
||||
|
||||
for connection, owner, exc in failed_connections:
|
||||
self._raise_if_stopped()
|
||||
|
||||
@@ -46,6 +46,8 @@
|
||||
|
||||
.. autoattribute:: idle_heartbeat_interval
|
||||
|
||||
.. autoattribute:: idle_heartbeat_timeout
|
||||
|
||||
.. autoattribute:: schema_event_refresh_window
|
||||
|
||||
.. autoattribute:: topology_event_refresh_window
|
||||
|
||||
@@ -277,8 +277,8 @@ class ConnectionHeartbeatTest(unittest.TestCase):
|
||||
get_holders = Mock(return_value=holders)
|
||||
return get_holders
|
||||
|
||||
def run_heartbeat(self, get_holders_fun, count=2, interval=0.05):
|
||||
ch = ConnectionHeartbeat(interval, get_holders_fun)
|
||||
def run_heartbeat(self, get_holders_fun, count=2, interval=0.05, timeout=0.05):
|
||||
ch = ConnectionHeartbeat(interval, get_holders_fun, timeout=timeout)
|
||||
time.sleep(interval * count)
|
||||
ch.stop()
|
||||
self.assertTrue(get_holders_fun.call_count)
|
||||
|
||||
Reference in New Issue
Block a user