Merge pull request #240 from datastax/PYTHON-197
PYTHON-197 - Idle connection heartbeat
This commit is contained in:
@@ -39,12 +39,13 @@ except ImportError:
|
||||
from cassandra.util import WeakSet # NOQA
|
||||
|
||||
from functools import partial, wraps
|
||||
from itertools import groupby
|
||||
from itertools import groupby, chain
|
||||
|
||||
from cassandra import (ConsistencyLevel, AuthenticationFailed,
|
||||
InvalidRequest, OperationTimedOut,
|
||||
UnsupportedOperation, Unauthorized)
|
||||
from cassandra.connection import ConnectionException, ConnectionShutdown
|
||||
from cassandra.connection import (ConnectionException, ConnectionShutdown,
|
||||
ConnectionHeartbeat)
|
||||
from cassandra.encoder import Encoder
|
||||
from cassandra.protocol import (QueryMessage, ResultMessage,
|
||||
ErrorMessage, ReadTimeoutErrorMessage,
|
||||
@@ -372,6 +373,14 @@ class Cluster(object):
|
||||
If set to :const:`None`, there will be no timeout for these queries.
|
||||
"""
|
||||
|
||||
idle_heartbeat_interval = 30
|
||||
"""
|
||||
Interval, in seconds, on which to heartbeat idle connections. This helps
|
||||
keep connections open through network devices that expire idle connections.
|
||||
It also helps discover bad connections early in low-traffic scenarios.
|
||||
Setting to zero disables heartbeats.
|
||||
"""
|
||||
|
||||
sessions = None
|
||||
control_connection = None
|
||||
scheduler = None
|
||||
@@ -380,6 +389,7 @@ class Cluster(object):
|
||||
_is_setup = False
|
||||
_prepared_statements = None
|
||||
_prepared_statement_lock = None
|
||||
_idle_heartbeat = None
|
||||
|
||||
_user_types = None
|
||||
"""
|
||||
@@ -406,7 +416,8 @@ class Cluster(object):
|
||||
protocol_version=2,
|
||||
executor_threads=2,
|
||||
max_schema_agreement_wait=10,
|
||||
control_connection_timeout=2.0):
|
||||
control_connection_timeout=2.0,
|
||||
idle_heartbeat_interval=30):
|
||||
"""
|
||||
Any of the mutable Cluster attributes may be set as keyword arguments
|
||||
to the constructor.
|
||||
@@ -456,6 +467,7 @@ class Cluster(object):
|
||||
self.cql_version = cql_version
|
||||
self.max_schema_agreement_wait = max_schema_agreement_wait
|
||||
self.control_connection_timeout = control_connection_timeout
|
||||
self.idle_heartbeat_interval = idle_heartbeat_interval
|
||||
|
||||
self._listeners = set()
|
||||
self._listener_lock = Lock()
|
||||
@@ -688,18 +700,19 @@ class Cluster(object):
|
||||
self.load_balancing_policy.populate(
|
||||
weakref.proxy(self), self.metadata.all_hosts())
|
||||
|
||||
if self.control_connection:
|
||||
try:
|
||||
self.control_connection.connect()
|
||||
log.debug("Control connection created")
|
||||
except Exception:
|
||||
log.exception("Control connection failed to connect, "
|
||||
"shutting down Cluster:")
|
||||
self.shutdown()
|
||||
raise
|
||||
try:
|
||||
self.control_connection.connect()
|
||||
log.debug("Control connection created")
|
||||
except Exception:
|
||||
log.exception("Control connection failed to connect, "
|
||||
"shutting down Cluster:")
|
||||
self.shutdown()
|
||||
raise
|
||||
|
||||
self.load_balancing_policy.check_supported()
|
||||
|
||||
if self.idle_heartbeat_interval:
|
||||
self._idle_heartbeat = ConnectionHeartbeat(self.idle_heartbeat_interval, self.get_connection_holders)
|
||||
self._is_setup = True
|
||||
|
||||
session = self._new_session()
|
||||
@@ -707,6 +720,13 @@ class Cluster(object):
|
||||
session.set_keyspace(keyspace)
|
||||
return session
|
||||
|
||||
def get_connection_holders(self):
|
||||
holders = []
|
||||
for s in self.sessions:
|
||||
holders.extend(s.get_pools())
|
||||
holders.append(self.control_connection)
|
||||
return holders
|
||||
|
||||
def shutdown(self):
|
||||
"""
|
||||
Closes all sessions and connection associated with this Cluster.
|
||||
@@ -721,18 +741,17 @@ class Cluster(object):
|
||||
else:
|
||||
self.is_shutdown = True
|
||||
|
||||
if self.scheduler:
|
||||
self.scheduler.shutdown()
|
||||
if self._idle_heartbeat:
|
||||
self._idle_heartbeat.stop()
|
||||
|
||||
if self.control_connection:
|
||||
self.control_connection.shutdown()
|
||||
self.scheduler.shutdown()
|
||||
|
||||
if self.sessions:
|
||||
for session in self.sessions:
|
||||
session.shutdown()
|
||||
self.control_connection.shutdown()
|
||||
|
||||
if self.executor:
|
||||
self.executor.shutdown()
|
||||
for session in self.sessions:
|
||||
session.shutdown()
|
||||
|
||||
self.executor.shutdown()
|
||||
|
||||
def _new_session(self):
|
||||
session = Session(self, self.metadata.all_hosts())
|
||||
@@ -1656,6 +1675,9 @@ class Session(object):
|
||||
def get_pool_state(self):
|
||||
return dict((host, pool.get_state()) for host, pool in self._pools.items())
|
||||
|
||||
def get_pools(self):
|
||||
return self._pools.values()
|
||||
|
||||
|
||||
class UserTypeDoesNotExist(Exception):
|
||||
"""
|
||||
@@ -2271,11 +2293,6 @@ class ControlConnection(object):
|
||||
# manually
|
||||
self.reconnect()
|
||||
|
||||
@property
|
||||
def is_open(self):
|
||||
conn = self._connection
|
||||
return bool(conn and conn.is_open)
|
||||
|
||||
def on_up(self, host):
|
||||
pass
|
||||
|
||||
@@ -2295,6 +2312,14 @@ class ControlConnection(object):
|
||||
def on_remove(self, host):
|
||||
self.refresh_node_list_and_token_map(force_token_rebuild=True)
|
||||
|
||||
def get_connections(self):
|
||||
c = getattr(self, '_connection', None)
|
||||
return [c] if c else []
|
||||
|
||||
def return_connection(self, connection):
|
||||
if connection is self._connection and (connection.is_defunct or connection.is_closed):
|
||||
self.reconnect()
|
||||
|
||||
|
||||
def _stop_scheduler(scheduler, thread):
|
||||
try:
|
||||
|
||||
@@ -20,7 +20,7 @@ import io
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from threading import Event, RLock
|
||||
from threading import Thread, Event, RLock
|
||||
import time
|
||||
|
||||
if 'gevent.monkey' in sys.modules:
|
||||
@@ -159,7 +159,7 @@ class Connection(object):
|
||||
in_flight = 0
|
||||
|
||||
# A set of available request IDs. When using the v3 protocol or higher,
|
||||
# this will no initially include all request IDs in order to save memory,
|
||||
# this will not initially include all request IDs in order to save memory,
|
||||
# but the set will grow if it is exhausted.
|
||||
request_ids = None
|
||||
|
||||
@@ -172,6 +172,8 @@ class Connection(object):
|
||||
lock = None
|
||||
user_type_map = None
|
||||
|
||||
msg_received = False
|
||||
|
||||
is_control_connection = False
|
||||
_iobuf = None
|
||||
|
||||
@@ -401,6 +403,8 @@ class Connection(object):
|
||||
with self.lock:
|
||||
self.request_ids.append(stream_id)
|
||||
|
||||
self.msg_received = True
|
||||
|
||||
body = None
|
||||
try:
|
||||
# check that the protocol version is supported
|
||||
@@ -673,6 +677,13 @@ class Connection(object):
|
||||
|
||||
self.send_msg(query, request_id, process_result)
|
||||
|
||||
@property
|
||||
def is_idle(self):
|
||||
return not self.msg_received
|
||||
|
||||
def reset_idle(self):
|
||||
self.msg_received = False
|
||||
|
||||
def __str__(self):
|
||||
status = ""
|
||||
if self.is_defunct:
|
||||
@@ -732,3 +743,96 @@ class ResponseWaiter(object):
|
||||
raise OperationTimedOut()
|
||||
else:
|
||||
return self.responses
|
||||
|
||||
|
||||
class HeartbeatFuture(object):
|
||||
def __init__(self, connection, owner):
|
||||
self._exception = None
|
||||
self._event = Event()
|
||||
self.connection = connection
|
||||
self.owner = owner
|
||||
log.debug("Sending options message heartbeat on idle connection %s %s",
|
||||
id(connection), connection.host)
|
||||
with connection.lock:
|
||||
if connection.in_flight < connection.max_request_id:
|
||||
connection.in_flight += 1
|
||||
connection.send_msg(OptionsMessage(), connection.get_request_id(), self._options_callback)
|
||||
else:
|
||||
self._exception = Exception("Failed to send heartbeat because connection 'in_flight' exceeds threshold")
|
||||
self._event.set()
|
||||
|
||||
def wait(self, timeout):
|
||||
self._event.wait(timeout)
|
||||
if self._event.is_set():
|
||||
if self._exception:
|
||||
raise self._exception
|
||||
else:
|
||||
raise OperationTimedOut()
|
||||
|
||||
def _options_callback(self, response):
|
||||
if not isinstance(response, SupportedMessage):
|
||||
if isinstance(response, ConnectionException):
|
||||
self._exception = response
|
||||
else:
|
||||
self._exception = ConnectionException("Received unexpected response to OptionsMessage: %s"
|
||||
% (response,))
|
||||
|
||||
log.debug("Received options response on connection (%s) from %s",
|
||||
id(self.connection), self.connection.host)
|
||||
self._event.set()
|
||||
|
||||
|
||||
class ConnectionHeartbeat(Thread):
|
||||
|
||||
def __init__(self, interval_sec, get_connection_holders):
|
||||
Thread.__init__(self, name="Connection heartbeat")
|
||||
self._interval = interval_sec
|
||||
self._get_connection_holders = get_connection_holders
|
||||
self._shutdown_event = Event()
|
||||
self.daemon = True
|
||||
self.start()
|
||||
|
||||
def run(self):
|
||||
self._shutdown_event.wait(self._interval)
|
||||
while not self._shutdown_event.is_set():
|
||||
start_time = time.time()
|
||||
|
||||
futures = []
|
||||
failed_connections = []
|
||||
try:
|
||||
for connections, owner in [(o.get_connections(), o) for o in self._get_connection_holders()]:
|
||||
for connection in connections:
|
||||
if not (connection.is_defunct or connection.is_closed) and connection.is_idle:
|
||||
try:
|
||||
futures.append(HeartbeatFuture(connection, owner))
|
||||
except Exception:
|
||||
log.warning("Failed sending heartbeat message on connection (%s) to %s",
|
||||
id(connection), connection.host, exc_info=True)
|
||||
failed_connections.append((connection, owner))
|
||||
else:
|
||||
connection.reset_idle()
|
||||
|
||||
for f in futures:
|
||||
connection = f.connection
|
||||
try:
|
||||
f.wait(self._interval)
|
||||
# TODO: move this, along with connection locks in pool, down into Connection
|
||||
with connection.lock:
|
||||
connection.in_flight -= 1
|
||||
connection.reset_idle()
|
||||
except Exception:
|
||||
log.warning("Heartbeat failed for connection (%s) to %s",
|
||||
id(connection), connection.host, exc_info=True)
|
||||
failed_connections.append((f.connection, f.owner))
|
||||
|
||||
for connection, owner in failed_connections:
|
||||
connection.defunct(Exception('Connection heartbeat failure'))
|
||||
owner.return_connection(connection)
|
||||
except Exception:
|
||||
log.error("Failed connection heartbeat", exc_info=True)
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
self._shutdown_event.wait(max(self._interval - elapsed, 0.01))
|
||||
|
||||
def stop(self):
|
||||
self._shutdown_event.set()
|
||||
|
||||
@@ -361,6 +361,10 @@ class HostConnection(object):
|
||||
|
||||
self._connection.set_keyspace_async(keyspace, connection_finished_setting_keyspace)
|
||||
|
||||
def get_connections(self):
|
||||
c = self._connection
|
||||
return [c] if c else []
|
||||
|
||||
def get_state(self):
|
||||
have_conn = self._connection is not None
|
||||
in_flight = self._connection.in_flight if have_conn else 0
|
||||
@@ -695,6 +699,9 @@ class HostConnectionPool(object):
|
||||
for conn in self._connections:
|
||||
conn.set_keyspace_async(keyspace, connection_finished_setting_keyspace)
|
||||
|
||||
def get_connections(self):
|
||||
return self._connections
|
||||
|
||||
def get_state(self):
|
||||
in_flights = ", ".join([str(c.in_flight) for c in self._connections])
|
||||
return "shutdown: %s, open_count: %d, in_flights: %s" % (self.is_shutdown, self.open_count, in_flights)
|
||||
|
||||
Reference in New Issue
Block a user