Merge pull request #780 from datastax/python-762
PYTHON-762: All Heartbeat futures should have the same timeout
This commit is contained in:
@@ -1,10 +1,15 @@
|
||||
3.11
|
||||
====
|
||||
|
||||
Features
|
||||
--------
|
||||
* Add idle_heartbeat_timeout cluster option to tune how long to wait for heartbeat responses. (PYTHON-762)
|
||||
|
||||
Bug Fixes
|
||||
---------
|
||||
* is_idempotent flag is not propagated from PreparedStatement to BoundStatement (PYTHON-736)
|
||||
* Fix asyncore hang on exit (PYTHON-767)
|
||||
* Driver takes several minutes to remove a bad host from session (PYTHON-762)
|
||||
|
||||
Other
|
||||
-----
|
||||
|
||||
@@ -590,6 +590,12 @@ class Cluster(object):
|
||||
Setting to zero disables heartbeats.
|
||||
"""
|
||||
|
||||
idle_heartbeat_timeout = 30
|
||||
"""
|
||||
Timeout, in seconds, on which the heartbeat wait for idle connection responses.
|
||||
Lowering this value can help to discover bad connections earlier.
|
||||
"""
|
||||
|
||||
schema_event_refresh_window = 2
|
||||
"""
|
||||
Window, in seconds, within which a schema component will be refreshed after
|
||||
@@ -756,7 +762,8 @@ class Cluster(object):
|
||||
reprepare_on_up=True,
|
||||
execution_profiles=None,
|
||||
allow_beta_protocol_version=False,
|
||||
timestamp_generator=None):
|
||||
timestamp_generator=None,
|
||||
idle_heartbeat_timeout=30):
|
||||
"""
|
||||
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
|
||||
extablishing connection pools or refreshing metadata.
|
||||
@@ -847,6 +854,7 @@ class Cluster(object):
|
||||
self.max_schema_agreement_wait = max_schema_agreement_wait
|
||||
self.control_connection_timeout = control_connection_timeout
|
||||
self.idle_heartbeat_interval = idle_heartbeat_interval
|
||||
self.idle_heartbeat_timeout = idle_heartbeat_timeout
|
||||
self.schema_event_refresh_window = schema_event_refresh_window
|
||||
self.topology_event_refresh_window = topology_event_refresh_window
|
||||
self.status_event_refresh_window = status_event_refresh_window
|
||||
@@ -1187,7 +1195,11 @@ class Cluster(object):
|
||||
self.profile_manager.check_supported() # todo: rename this method
|
||||
|
||||
if self.idle_heartbeat_interval:
|
||||
self._idle_heartbeat = ConnectionHeartbeat(self.idle_heartbeat_interval, self.get_connection_holders)
|
||||
self._idle_heartbeat = ConnectionHeartbeat(
|
||||
self.idle_heartbeat_interval,
|
||||
self.get_connection_holders,
|
||||
timeout=self.idle_heartbeat_timeout
|
||||
)
|
||||
self._is_setup = True
|
||||
|
||||
session = self._new_session(keyspace)
|
||||
|
||||
@@ -951,9 +951,10 @@ class HeartbeatFuture(object):
|
||||
|
||||
class ConnectionHeartbeat(Thread):
|
||||
|
||||
def __init__(self, interval_sec, get_connection_holders):
|
||||
def __init__(self, interval_sec, get_connection_holders, timeout):
|
||||
Thread.__init__(self, name="Connection heartbeat")
|
||||
self._interval = interval_sec
|
||||
self._timeout = timeout
|
||||
self._get_connection_holders = get_connection_holders
|
||||
self._shutdown_event = Event()
|
||||
self.daemon = True
|
||||
@@ -990,11 +991,14 @@ class ConnectionHeartbeat(Thread):
|
||||
owner.return_connection(connection)
|
||||
self._raise_if_stopped()
|
||||
|
||||
# Wait max `self._timeout` seconds for all HeartbeatFutures to complete
|
||||
timeout = self._timeout
|
||||
start_time = time.time()
|
||||
for f in futures:
|
||||
self._raise_if_stopped()
|
||||
connection = f.connection
|
||||
try:
|
||||
f.wait(self._interval)
|
||||
f.wait(timeout)
|
||||
# TODO: move this, along with connection locks in pool, down into Connection
|
||||
with connection.lock:
|
||||
connection.in_flight -= 1
|
||||
@@ -1004,6 +1008,8 @@ class ConnectionHeartbeat(Thread):
|
||||
id(connection), connection.host)
|
||||
failed_connections.append((f.connection, f.owner, e))
|
||||
|
||||
timeout = self._timeout - (time.time() - start_time)
|
||||
|
||||
for connection, owner, exc in failed_connections:
|
||||
self._raise_if_stopped()
|
||||
connection.defunct(exc)
|
||||
|
||||
@@ -46,6 +46,8 @@
|
||||
|
||||
.. autoattribute:: idle_heartbeat_interval
|
||||
|
||||
.. autoattribute:: idle_heartbeat_timeout
|
||||
|
||||
.. autoattribute:: schema_event_refresh_window
|
||||
|
||||
.. autoattribute:: topology_event_refresh_window
|
||||
|
||||
@@ -277,8 +277,8 @@ class ConnectionHeartbeatTest(unittest.TestCase):
|
||||
get_holders = Mock(return_value=holders)
|
||||
return get_holders
|
||||
|
||||
def run_heartbeat(self, get_holders_fun, count=2, interval=0.05):
|
||||
ch = ConnectionHeartbeat(interval, get_holders_fun)
|
||||
def run_heartbeat(self, get_holders_fun, count=2, interval=0.05, timeout=0.05):
|
||||
ch = ConnectionHeartbeat(interval, get_holders_fun, timeout=timeout)
|
||||
time.sleep(interval * count)
|
||||
ch.stop()
|
||||
self.assertTrue(get_holders_fun.call_count)
|
||||
|
||||
Reference in New Issue
Block a user