From 46315504e567b8e8f24dffade22e3867fb5b6cf3 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Mon, 12 Jan 2015 17:46:52 -0600 Subject: [PATCH 1/4] Add idle connection heartbeats. PYTHON-197 --- cassandra/cluster.py | 46 ++++++++++++++---- cassandra/connection.py | 102 +++++++++++++++++++++++++++++++++++++++- cassandra/pool.py | 7 +++ 3 files changed, 145 insertions(+), 10 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 0822e2a2..e8a99ebd 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -39,12 +39,13 @@ except ImportError: from cassandra.util import WeakSet # NOQA from functools import partial, wraps -from itertools import groupby +from itertools import groupby, chain from cassandra import (ConsistencyLevel, AuthenticationFailed, InvalidRequest, OperationTimedOut, UnsupportedOperation, Unauthorized) -from cassandra.connection import ConnectionException, ConnectionShutdown +from cassandra.connection import (ConnectionException, ConnectionShutdown, + ConnectionHeartbeat) from cassandra.encoder import Encoder from cassandra.protocol import (QueryMessage, ResultMessage, ErrorMessage, ReadTimeoutErrorMessage, @@ -372,6 +373,14 @@ class Cluster(object): If set to :const:`None`, there will be no timeout for these queries. """ + idle_heartbeat_interval = 30 + """ + Interval, in seconds, on which to heartbeat idle connections. This helps + keep connections open through network devices that expire idle connections. + It also helps discover bad connections early in low-traffic scenarios. + Setting to zero disables heartbeats. + """ + sessions = None control_connection = None scheduler = None @@ -380,6 +389,7 @@ class Cluster(object): _is_setup = False _prepared_statements = None _prepared_statement_lock = None + _idle_heartbeat = None _user_types = None """ @@ -406,7 +416,8 @@ class Cluster(object): protocol_version=2, executor_threads=2, max_schema_agreement_wait=10, - control_connection_timeout=2.0): + control_connection_timeout=2.0, + idle_heartbeat_interval=30): """ Any of the mutable Cluster attributes may be set as keyword arguments to the constructor. @@ -456,6 +467,7 @@ class Cluster(object): self.cql_version = cql_version self.max_schema_agreement_wait = max_schema_agreement_wait self.control_connection_timeout = control_connection_timeout + self.idle_heartbeat_interval = idle_heartbeat_interval self._listeners = set() self._listener_lock = Lock() @@ -700,6 +712,8 @@ class Cluster(object): self.load_balancing_policy.check_supported() + if self.idle_heartbeat_interval: + self._idle_heartbeat = ConnectionHeartbeat(self.idle_heartbeat_interval, self.get_connection_holders) self._is_setup = True session = self._new_session() @@ -707,6 +721,13 @@ class Cluster(object): session.set_keyspace(keyspace) return session + def get_connection_holders(self): + holders = [] + for s in self.sessions: + holders.extend(s.get_pools()) + holders.append(self.control_connection) + return holders + def shutdown(self): """ Closes all sessions and connection associated with this Cluster. @@ -734,6 +755,9 @@ class Cluster(object): if self.executor: self.executor.shutdown() + if self._idle_heartbeat: + self._idle_heartbeat.stop() + def _new_session(self): session = Session(self, self.metadata.all_hosts()) for keyspace, type_map in six.iteritems(self._user_types): @@ -1656,6 +1680,9 @@ class Session(object): def get_pool_state(self): return dict((host, pool.get_state()) for host, pool in self._pools.items()) + def get_pools(self): + return self._pools.values() + class UserTypeDoesNotExist(Exception): """ @@ -2271,11 +2298,6 @@ class ControlConnection(object): # manually self.reconnect() - @property - def is_open(self): - conn = self._connection - return bool(conn and conn.is_open) - def on_up(self, host): pass @@ -2295,6 +2317,14 @@ class ControlConnection(object): def on_remove(self, host): self.refresh_node_list_and_token_map(force_token_rebuild=True) + def get_connections(self): + c = getattr(self, '_connection', None) + return [c] if c else [] + + def return_connection(self, connection): + if connection is self._connection and (connection.is_defunct or connection.is_closed): + self.reconnect() + def _stop_scheduler(scheduler, thread): try: diff --git a/cassandra/connection.py b/cassandra/connection.py index 5a58793c..fa7e7274 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -20,7 +20,7 @@ import io import logging import os import sys -from threading import Event, RLock +from threading import Thread, Event, RLock import time if 'gevent.monkey' in sys.modules: @@ -159,7 +159,7 @@ class Connection(object): in_flight = 0 # A set of available request IDs. When using the v3 protocol or higher, - # this will no initially include all request IDs in order to save memory, + # this will not initially include all request IDs in order to save memory, # but the set will grow if it is exhausted. request_ids = None @@ -172,6 +172,8 @@ class Connection(object): lock = None user_type_map = None + msg_received = False + is_control_connection = False _iobuf = None @@ -401,6 +403,8 @@ class Connection(object): with self.lock: self.request_ids.append(stream_id) + self.msg_received = True + body = None try: # check that the protocol version is supported @@ -673,6 +677,13 @@ class Connection(object): self.send_msg(query, request_id, process_result) + @property + def is_idle(self): + return self.in_flight == 0 and not self.msg_received + + def reset_idle(self): + self.msg_received = False + def __str__(self): status = "" if self.is_defunct: @@ -732,3 +743,90 @@ class ResponseWaiter(object): raise OperationTimedOut() else: return self.responses + + +class HeartbeatFuture(object): + def __init__(self, connection, owner): + self._exception = None + self._event = Event() + self.connection = connection + self.owner = owner + log.debug("Sending options message heartbeat on idle connection %s %s", + id(connection), connection.host) + with connection.lock: + connection.send_msg(OptionsMessage(), connection.get_request_id(), self._options_callback) + connection.in_flight += 1 + + def wait(self, timeout): + if self._event.wait(timeout): + if self._exception: + raise self._exception + else: + raise OperationTimedOut() + + def _options_callback(self, response): + if not isinstance(response, SupportedMessage): + if isinstance(response, ConnectionException): + self._exception = response + else: + self._exception = ConnectionException("Received unexpected response to OptionsMessage: %s" + % (response,)) + + log.debug("Received options response on connection (%s) from %s", + id(self.connection), self.connection.host) + self._event.set() + + +class ConnectionHeartbeat(Thread): + + def __init__(self, interval_sec, get_connection_holders): + Thread.__init__(self, name="Connection heartbeat") + self._interval = interval_sec + self._get_connection_holders = get_connection_holders + self._shutdown_event = Event() + self.daemon = True + self.start() + + def run(self): + elapsed = 0 + while not self._shutdown_event.wait(self._interval - elapsed): + start_time = time.time() + + futures = [] + failed_connections = [] + try: + for connections, owner in [(o.get_connections(), o) for o in self._get_connection_holders()]: + for connection in connections: + if not (connection.is_defunct or connection.is_closed) and connection.is_idle: + try: + futures.append(HeartbeatFuture(connection, owner)) + except Exception: + log.warning("Failed sending heartbeat message on connection (%s) to %s", + id(connection), connection.host, exc_info=True) + failed_connections.append((connection, owner)) + else: + connection.reset_idle() + + for f in futures: + connection = f.connection + try: + f.wait(self._interval) + # TODO: move this, along with connection locks in pool, down into Connection + with connection.lock: + connection.in_flight -= 1 + connection.reset_idle() + except Exception: + log.warning("Heartbeat failed for connection (%s) to %s", + id(connection), connection.host, exc_info=True) + failed_connections.append((f.connection, f.owner)) + + for connection, owner in failed_connections: + connection.defunct(Exception('Connection heartbeat failure')) + owner.return_connection(connection) + except Exception: + log.warning("Failed connection heartbeat", exc_info=True) + + elapsed = time.time() - start_time + + def stop(self): + self._shutdown_event.set() diff --git a/cassandra/pool.py b/cassandra/pool.py index 587fa277..4331be43 100644 --- a/cassandra/pool.py +++ b/cassandra/pool.py @@ -360,6 +360,10 @@ class HostConnection(object): self._connection.set_keyspace_async(keyspace, connection_finished_setting_keyspace) + def get_connections(self): + c = self._connection + return [c] if c else [] + def get_state(self): have_conn = self._connection is not None in_flight = self._connection.in_flight if have_conn else 0 @@ -693,6 +697,9 @@ class HostConnectionPool(object): for conn in self._connections: conn.set_keyspace_async(keyspace, connection_finished_setting_keyspace) + def get_connections(self): + return self._connections + def get_state(self): in_flights = ", ".join([str(c.in_flight) for c in self._connections]) return "shutdown: %s, open_count: %d, in_flights: %s" % (self.is_shutdown, self.open_count, in_flights) From d2f3875599ccb702070c2fdfcf602b3e1b795cc9 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Mon, 12 Jan 2015 17:59:16 -0600 Subject: [PATCH 2/4] cleanup; Remove unneeded member checks in Cluster --- cassandra/cluster.py | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index e8a99ebd..e4b51ab6 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -700,15 +700,14 @@ class Cluster(object): self.load_balancing_policy.populate( weakref.proxy(self), self.metadata.all_hosts()) - if self.control_connection: - try: - self.control_connection.connect() - log.debug("Control connection created") - except Exception: - log.exception("Control connection failed to connect, " - "shutting down Cluster:") - self.shutdown() - raise + try: + self.control_connection.connect() + log.debug("Control connection created") + except Exception: + log.exception("Control connection failed to connect, " + "shutting down Cluster:") + self.shutdown() + raise self.load_balancing_policy.check_supported() @@ -742,18 +741,14 @@ class Cluster(object): else: self.is_shutdown = True - if self.scheduler: - self.scheduler.shutdown() + self.scheduler.shutdown() - if self.control_connection: - self.control_connection.shutdown() + self.control_connection.shutdown() - if self.sessions: - for session in self.sessions: - session.shutdown() + for session in self.sessions: + session.shutdown() - if self.executor: - self.executor.shutdown() + self.executor.shutdown() if self._idle_heartbeat: self._idle_heartbeat.stop() From 9af1cec1621d3568dd8d10329ca5bf70d05854e5 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Fri, 16 Jan 2015 08:54:59 -0600 Subject: [PATCH 3/4] Events do not return flag state in python 2.6 review updates --- cassandra/connection.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cassandra/connection.py b/cassandra/connection.py index fa7e7274..3df5bf9a 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -758,7 +758,8 @@ class HeartbeatFuture(object): connection.in_flight += 1 def wait(self, timeout): - if self._event.wait(timeout): + self._event.wait(timeout): + if self._event.is_set(): if self._exception: raise self._exception else: @@ -788,8 +789,8 @@ class ConnectionHeartbeat(Thread): self.start() def run(self): - elapsed = 0 - while not self._shutdown_event.wait(self._interval - elapsed): + self._shutdown_event.wait(self._interval) + while not self._shutdown_event.is_set(): start_time = time.time() futures = [] @@ -827,6 +828,7 @@ class ConnectionHeartbeat(Thread): log.warning("Failed connection heartbeat", exc_info=True) elapsed = time.time() - start_time + self._shutdown_event.wait(max(self._interval - elapsed, 0.01)) def stop(self): self._shutdown_event.set() From 8433378c9e5c6a75d670a2740f95e9fb42394060 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Fri, 16 Jan 2015 09:27:52 -0600 Subject: [PATCH 4/4] Stop hearbeat before conn close, Heartbeat idle in_flight check. --- cassandra/cluster.py | 6 +++--- cassandra/connection.py | 14 +++++++++----- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index e4b51ab6..1bad5fb4 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -741,6 +741,9 @@ class Cluster(object): else: self.is_shutdown = True + if self._idle_heartbeat: + self._idle_heartbeat.stop() + self.scheduler.shutdown() self.control_connection.shutdown() @@ -750,9 +753,6 @@ class Cluster(object): self.executor.shutdown() - if self._idle_heartbeat: - self._idle_heartbeat.stop() - def _new_session(self): session = Session(self, self.metadata.all_hosts()) for keyspace, type_map in six.iteritems(self._user_types): diff --git a/cassandra/connection.py b/cassandra/connection.py index 3df5bf9a..b603f56b 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -679,7 +679,7 @@ class Connection(object): @property def is_idle(self): - return self.in_flight == 0 and not self.msg_received + return not self.msg_received def reset_idle(self): self.msg_received = False @@ -754,11 +754,15 @@ class HeartbeatFuture(object): log.debug("Sending options message heartbeat on idle connection %s %s", id(connection), connection.host) with connection.lock: - connection.send_msg(OptionsMessage(), connection.get_request_id(), self._options_callback) - connection.in_flight += 1 + if connection.in_flight < connection.max_request_id: + connection.in_flight += 1 + connection.send_msg(OptionsMessage(), connection.get_request_id(), self._options_callback) + else: + self._exception = Exception("Failed to send heartbeat because connection 'in_flight' exceeds threshold") + self._event.set() def wait(self, timeout): - self._event.wait(timeout): + self._event.wait(timeout) if self._event.is_set(): if self._exception: raise self._exception @@ -825,7 +829,7 @@ class ConnectionHeartbeat(Thread): connection.defunct(Exception('Connection heartbeat failure')) owner.return_connection(connection) except Exception: - log.warning("Failed connection heartbeat", exc_info=True) + log.error("Failed connection heartbeat", exc_info=True) elapsed = time.time() - start_time self._shutdown_event.wait(max(self._interval - elapsed, 0.01))