diff --git a/cassandra/cluster.py b/cassandra/cluster.py
index b0d7dcc0..b639af69 100644
--- a/cassandra/cluster.py
+++ b/cassandra/cluster.py
@@ -22,6 +22,7 @@ import atexit
 from collections import defaultdict
 from concurrent.futures import ThreadPoolExecutor
 import logging
+from random import random
 import socket
 import sys
 import time
@@ -391,6 +392,40 @@ class Cluster(object):
     Setting to zero disables heartbeats.
     """

+    schema_event_refresh_window = 2
+    """
+    Window, in seconds, within which a schema component will be refreshed after
+    receiving a schema_change event.
+
+    The driver delays a random amount of time in the range [0.0, window)
+    before executing the refresh. This serves two purposes:
+
+    1.) Spreads refreshes for deployments with large fanout from C* to the client tier,
+    preventing a 'thundering herd' problem with many clients refreshing simultaneously.
+
+    2.) Removes redundant refreshes. Redundant events arriving within the delay period
+    are discarded, and only one refresh is executed.
+
+    Setting this to zero will execute refreshes immediately.
+
+    Setting this to a negative value will disable schema refreshes in response to push events
+    (refreshes will still occur in response to schema change responses to DDL statements
+    executed by Sessions of this Cluster).
+    """
+
+    topology_event_refresh_window = 10
+    """
+    Window, in seconds, within which the node and token list will be refreshed after
+    receiving a topology_change event.
+
+    Setting this to zero will execute refreshes immediately.
+
+    Setting this to a negative value will disable node refreshes in response to push events
+    (refreshes will still occur in response to new nodes observed on "UP" events).
+
+    See :attr:`.schema_event_refresh_window` for a discussion of the rationale.
+    """
+
     sessions = None
     control_connection = None
     scheduler = None
@@ -427,7 +462,9 @@ class Cluster(object):
                  executor_threads=2,
                  max_schema_agreement_wait=10,
                  control_connection_timeout=2.0,
-                 idle_heartbeat_interval=30):
+                 idle_heartbeat_interval=30,
+                 schema_event_refresh_window=2,
+                 topology_event_refresh_window=10):
         """
         Any of the mutable Cluster attributes may be set as keyword arguments
         to the constructor.
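
Both windows are ordinary mutable `Cluster` attributes, so they can be tuned per instance through the new constructor keywords. A minimal sketch of client-side configuration, assuming a locally running node; the contact point and window values are illustrative only:

```python
from cassandra.cluster import Cluster

# Spread schema refreshes over up to 5 seconds after a schema_change event,
# and disable push-driven node/token refreshes entirely (negative window).
cluster = Cluster(
    contact_points=['127.0.0.1'],      # illustrative contact point
    schema_event_refresh_window=5,
    topology_event_refresh_window=-1,
)
session = cluster.connect()
```
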
@@ -478,6 +515,8 @@ class Cluster(object):
         self.max_schema_agreement_wait = max_schema_agreement_wait
         self.control_connection_timeout = control_connection_timeout
         self.idle_heartbeat_interval = idle_heartbeat_interval
+        self.schema_event_refresh_window = schema_event_refresh_window
+        self.topology_event_refresh_window = topology_event_refresh_window

         self._listeners = set()
         self._listener_lock = Lock()
@@ -522,7 +561,8 @@ class Cluster(object):
             self.metrics = Metrics(weakref.proxy(self))

         self.control_connection = ControlConnection(
-            self, self.control_connection_timeout)
+            self, self.control_connection_timeout,
+            self.schema_event_refresh_window, self.topology_event_refresh_window)

     def register_user_type(self, keyspace, user_type, klass):
         """
@@ -936,7 +976,7 @@ class Cluster(object):

         self._start_reconnector(host, is_host_addition)

-    def on_add(self, host):
+    def on_add(self, host, refresh_nodes=True):
         if self.is_shutdown:
             return

@@ -948,7 +988,7 @@ class Cluster(object):
         log.debug("Done preparing queries for new host %r", host)

         self.load_balancing_policy.on_add(host)
-        self.control_connection.on_add(host)
+        self.control_connection.on_add(host, refresh_nodes)

         if distance == HostDistance.IGNORED:
             log.debug("Not adding connection pool for new host %r because the "
@@ -1024,7 +1064,7 @@ class Cluster(object):
             self.on_down(host, is_host_addition, expect_host_to_be_down)
         return is_down

-    def add_host(self, address, datacenter=None, rack=None, signal=True):
+    def add_host(self, address, datacenter=None, rack=None, signal=True, refresh_nodes=True):
         """
         Called when adding initial contact points and when the control
         connection subsequently discovers a new node. Intended for internal
@@ -1033,7 +1073,7 @@ class Cluster(object):
         new_host = self.metadata.add_host(address, datacenter, rack)
         if new_host and signal:
             log.info("New Cassandra host %r discovered", new_host)
-            self.on_add(new_host)
+            self.on_add(new_host, refresh_nodes)

         return new_host

@@ -1080,11 +1120,11 @@ class Cluster(object):

         By default, the timeout for this operation is governed by :attr:`~.Cluster.max_schema_agreement_wait`
         and :attr:`~.Cluster.control_connection_timeout`.
-        
+
         Passing max_schema_agreement_wait here overrides :attr:`~.Cluster.max_schema_agreement_wait`.
-        
+
         Setting max_schema_agreement_wait <= 0 will bypass schema agreement and refresh schema immediately.
-        
+
         An Exception is raised if schema refresh fails for any reason.
         """
         if not self.control_connection.refresh_schema(keyspace, table, usertype, max_schema_agreement_wait):
@@ -1099,6 +1139,27 @@ class Cluster(object):
         return self.executor.submit(
             self.control_connection.refresh_schema, keyspace, table, usertype)

+    def refresh_nodes(self):
+        """
+        Synchronously refresh the node list and token metadata.
+
+        An Exception is raised if node refresh fails for any reason.
+        """
+        if not self.control_connection.refresh_node_list_and_token_map():
+            raise Exception("Node list was not refreshed. See log for details.")
+
+    def set_meta_refresh_enabled(self, enabled):
+        """
+        Sets a flag to enable (True) or disable (False) all metadata refresh queries.
+        This applies to both schema and node topology.
+
+        Disabling this is useful to minimize refreshes during multiple changes.
+
+        Meta refresh must be enabled for the driver to become aware of any cluster
+        topology changes or schema updates.
+        """
+        self.control_connection.set_meta_refresh_enabled(bool(enabled))
+
     def _prepare_all_queries(self, host):
         if not self._prepared_statements:
             return
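
The two methods added above pair naturally: a client about to issue a burst of DDL can switch metadata refreshes off, then re-enable them and resynchronize explicitly. A rough sketch, assuming the `cluster`/`session` objects from the earlier example; the keyspace and table names are made up:

```python
# Sketch only: suppress refresh churn while applying many DDL statements,
# then refresh once at the end. "ks" and the table names are hypothetical.
cluster.set_meta_refresh_enabled(False)
try:
    for i in range(10):
        session.execute("CREATE TABLE IF NOT EXISTS ks.t%d (k int PRIMARY KEY)" % i)
finally:
    cluster.set_meta_refresh_enabled(True)
    cluster.refresh_schema()   # raises if the schema could not be refreshed
    cluster.refresh_nodes()    # likewise for the node list and token map
```
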
+ """ + self.control_connection.set_meta_refresh_enabled(bool(enabled)) + def _prepare_all_queries(self, host): if not self._prepared_statements: return @@ -1770,16 +1831,26 @@ class ControlConnection(object): _timeout = None _protocol_version = None + _schema_event_refresh_window = None + _topology_event_refresh_window = None + + _meta_refresh_enabled = True + # for testing purposes _time = time - def __init__(self, cluster, timeout): + def __init__(self, cluster, timeout, + schema_event_refresh_window, + topology_event_refresh_window): # use a weak reference to allow the Cluster instance to be GC'ed (and # shutdown) since implementing __del__ disables the cycle detector self._cluster = weakref.proxy(cluster) self._connection = None self._timeout = timeout + self._schema_event_refresh_window = schema_event_refresh_window + self._topology_event_refresh_window = topology_event_refresh_window + self._lock = RLock() self._schema_agreement_lock = Lock() @@ -1937,6 +2008,10 @@ class ControlConnection(object): def refresh_schema(self, keyspace=None, table=None, usertype=None, schema_agreement_wait=None): + if not self._meta_refresh_enabled: + log.debug("[control connection] Skipping schema refresh because meta refresh is disabled") + return False + try: if self._connection: return self._refresh_schema(self._connection, keyspace, table, usertype, @@ -2064,14 +2139,20 @@ class ControlConnection(object): return True def refresh_node_list_and_token_map(self, force_token_rebuild=False): + if not self._meta_refresh_enabled: + log.debug("[control connection] Skipping node list refresh because meta refresh is disabled") + return False + try: if self._connection: self._refresh_node_list_and_token_map(self._connection, force_token_rebuild=force_token_rebuild) + return True except ReferenceError: pass # our weak reference to the Cluster is no good except Exception: log.debug("[control connection] Error refreshing node list and token map", exc_info=True) self._signal_error() + return False def _refresh_node_list_and_token_map(self, connection, preloaded_results=None, force_token_rebuild=False): @@ -2132,7 +2213,7 @@ class ControlConnection(object): rack = row.get("rack") if host is None: log.debug("[control connection] Found new host to connect to: %s", addr) - host = self._cluster.add_host(addr, datacenter, rack, signal=True) + host = self._cluster.add_host(addr, datacenter, rack, signal=True, refresh_nodes=False) should_rebuild_token_map = True else: should_rebuild_token_map |= self._update_location_info(host, datacenter, rack) @@ -2167,25 +2248,25 @@ class ControlConnection(object): def _handle_topology_change(self, event): change_type = event["change_type"] addr, port = event["address"] - if change_type == "NEW_NODE": - self._cluster.scheduler.schedule(10, self.refresh_node_list_and_token_map) + if change_type == "NEW_NODE" or change_type == "MOVED_NODE": + if self._topology_event_refresh_window >= 0: + delay = random() * self._topology_event_refresh_window + self._cluster.scheduler.schedule_unique(delay, self.refresh_node_list_and_token_map) elif change_type == "REMOVED_NODE": host = self._cluster.metadata.get_host(addr) - self._cluster.scheduler.schedule(0, self._cluster.remove_host, host) - elif change_type == "MOVED_NODE": - self._cluster.scheduler.schedule(1, self.refresh_node_list_and_token_map) + self._cluster.scheduler.schedule_unique(0, self._cluster.remove_host, host) def _handle_status_change(self, event): change_type = event["change_type"] addr, port = event["address"] host = 
@@ -2196,10 +2277,14 @@ class ControlConnection(object):
                 self._cluster.on_down(host, is_host_addition=False)

     def _handle_schema_change(self, event):
+        if self._schema_event_refresh_window < 0:
+            return
+
         keyspace = event.get('keyspace')
         table = event.get('table')
         usertype = event.get('type')
-        self._submit(self.refresh_schema, keyspace, table, usertype)
+        delay = random() * self._schema_event_refresh_window
+        self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, keyspace, table, usertype)

     def wait_for_schema_agreement(self, connection=None, preloaded_results=None,
                                   wait_time=None):
@@ -2320,8 +2405,9 @@ class ControlConnection(object):
             # this will result in a task being submitted to the executor to reconnect
             self.reconnect()

-    def on_add(self, host):
-        self.refresh_node_list_and_token_map(force_token_rebuild=True)
+    def on_add(self, host, refresh_nodes=True):
+        if refresh_nodes:
+            self.refresh_node_list_and_token_map(force_token_rebuild=True)

     def on_remove(self, host):
         self.refresh_node_list_and_token_map(force_token_rebuild=True)
@@ -2334,6 +2420,9 @@ class ControlConnection(object):
         if connection is self._connection and (connection.is_defunct or connection.is_closed):
             self.reconnect()

+    def set_meta_refresh_enabled(self, enabled):
+        self._meta_refresh_enabled = enabled
+

 def _stop_scheduler(scheduler, thread):
     try:
@@ -2347,12 +2436,14 @@ def _stop_scheduler(scheduler, thread):

 class _Scheduler(object):

-    _scheduled = None
+    _queue = None
+    _scheduled_tasks = None
     _executor = None
     is_shutdown = False

     def __init__(self, executor):
-        self._scheduled = Queue.PriorityQueue()
+        self._queue = Queue.PriorityQueue()
+        self._scheduled_tasks = set()
         self._executor = executor

         t = Thread(target=self.run, name="Task Scheduler")
@@ -2370,14 +2461,25 @@ class _Scheduler(object):
             # this can happen on interpreter shutdown
             pass
         self.is_shutdown = True
-        self._scheduled.put_nowait((0, None))
+        self._queue.put_nowait((0, None))

-    def schedule(self, delay, fn, *args, **kwargs):
+    def schedule(self, delay, fn, *args):
+        self._insert_task(delay, (fn, args))
+
+    def schedule_unique(self, delay, fn, *args):
+        task = (fn, args)
+        if task not in self._scheduled_tasks:
+            self._insert_task(delay, task)
+        else:
+            log.debug("Ignoring schedule_unique for already-scheduled task: %r", task)
+
+    def _insert_task(self, delay, task):
         if not self.is_shutdown:
             run_at = time.time() + delay
-            self._scheduled.put_nowait((run_at, (fn, args, kwargs)))
+            self._scheduled_tasks.add(task)
+            self._queue.put_nowait((run_at, task))
         else:
-            log.debug("Ignoring scheduled function after shutdown: %r", fn)
+            log.debug("Ignoring scheduled task after shutdown: %r", task)

     def run(self):
         while True:
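
A short usage sketch of the de-duplication `schedule_unique` adds over plain `schedule`; `_Scheduler` is a private class, used here purely for illustration, and the executor, delays, and callback are arbitrary. Identical `(fn, args)` pairs scheduled while the first is still pending collapse to a single execution; once a task has run it can be scheduled again:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from cassandra.cluster import _Scheduler   # private class; illustration only

def refresh(what):
    print("refreshing", what)

scheduler = _Scheduler(ThreadPoolExecutor(max_workers=1))
scheduler.schedule_unique(2.0, refresh, "schema")   # queued
scheduler.schedule_unique(1.0, refresh, "schema")   # ignored: identical task already pending
scheduler.schedule_unique(1.0, refresh, "nodes")    # different args, so a distinct task
time.sleep(3)                                       # let the pending tasks fire
scheduler.shutdown()
```
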
@@ -2386,16 +2488,17 @@ class _Scheduler(object):

             try:
                 while True:
-                    run_at, task = self._scheduled.get(block=True, timeout=None)
+                    run_at, task = self._queue.get(block=True, timeout=None)
                     if self.is_shutdown:
                         log.debug("Not executing scheduled task due to Scheduler shutdown")
                         return
                     if run_at <= time.time():
-                        fn, args, kwargs = task
-                        future = self._executor.submit(fn, *args, **kwargs)
+                        self._scheduled_tasks.remove(task)
+                        fn, args = task
+                        future = self._executor.submit(fn, *args)
                         future.add_done_callback(self._log_if_failed)
                     else:
-                        self._scheduled.put_nowait((run_at, task))
+                        self._queue.put_nowait((run_at, task))
                         break
             except Queue.Empty:
                 pass
@@ -2412,9 +2515,13 @@ class _Scheduler(object):

 def refresh_schema_and_set_result(keyspace, table, usertype, control_conn, response_future):
     try:
-        log.debug("Refreshing schema in response to schema change. Keyspace: %s; Table: %s, Type: %s",
-                  keyspace, table, usertype)
-        control_conn._refresh_schema(response_future._connection, keyspace, table, usertype)
+        if control_conn._meta_refresh_enabled:
+            log.debug("Refreshing schema in response to schema change. Keyspace: %s; Table: %s, Type: %s",
+                      keyspace, table, usertype)
+            control_conn._refresh_schema(response_future._connection, keyspace, table, usertype)
+        else:
+            log.debug("Skipping schema refresh in response to schema change because meta refresh is disabled; "
+                      "Keyspace: %s; Table: %s, Type: %s", keyspace, table, usertype)
     except Exception:
         log.exception("Exception refreshing schema in response to schema change:")
         response_future.session.submit(
diff --git a/cassandra/connection.py b/cassandra/connection.py
index a7d98ecb..9a41a0d6 100644
--- a/cassandra/connection.py
+++ b/cassandra/connection.py
@@ -802,15 +802,19 @@ class ConnectionHeartbeat(Thread):
         try:
             for connections, owner in [(o.get_connections(), o) for o in self._get_connection_holders()]:
                 for connection in connections:
-                    if not (connection.is_defunct or connection.is_closed) and connection.is_idle:
-                        try:
-                            futures.append(HeartbeatFuture(connection, owner))
-                        except Exception:
-                            log.warning("Failed sending heartbeat message on connection (%s) to %s",
-                                        id(connection), connection.host, exc_info=True)
-                            failed_connections.append((connection, owner))
+                    if not (connection.is_defunct or connection.is_closed):
+                        if connection.is_idle:
+                            try:
+                                futures.append(HeartbeatFuture(connection, owner))
+                            except Exception:
+                                log.warning("Failed sending heartbeat message on connection (%s) to %s",
+                                            id(connection), connection.host, exc_info=True)
+                                failed_connections.append((connection, owner))
+                        else:
+                            connection.reset_idle()
                     else:
-                        connection.reset_idle()
+                        # make sure the owner sees this defunct/closed connection
+                        owner.return_connection(connection)

             for f in futures:
                 connection = f.connection
diff --git a/docs/api/cassandra/cluster.rst b/docs/api/cassandra/cluster.rst
index f0a9a312..b1b1aebb 100644
--- a/docs/api/cassandra/cluster.rst
+++ b/docs/api/cassandra/cluster.rst
@@ -41,6 +41,10 @@

    .. autoattribute:: idle_heartbeat_interval

+   .. autoattribute:: schema_event_refresh_window
+
+   .. autoattribute:: topology_event_refresh_window
+
    .. automethod:: connect

    .. automethod:: shutdown
@@ -61,6 +65,11 @@

    .. automethod:: refresh_schema

+   .. automethod:: refresh_nodes
+
+   .. automethod:: set_meta_refresh_enabled
+
+
 .. autoclass:: Session ()

    .. autoattribute:: default_timeout
diff --git a/tests/integration/standard/test_authentication.py b/tests/integration/standard/test_authentication.py
index f2488888..1534947a 100644
--- a/tests/integration/standard/test_authentication.py
+++ b/tests/integration/standard/test_authentication.py
@@ -13,6 +13,7 @@
 # limitations under the License.

 import logging
+import time

 from cassandra.cluster import Cluster, NoHostAvailable
 from cassandra.auth import PlainTextAuthProvider, SASLClient, SaslAuthProvider
@@ -36,7 +37,10 @@ def setup_module():
                       'authorizer': 'CassandraAuthorizer'}
     ccm_cluster.set_configuration_options(config_options)
     log.debug("Starting ccm test cluster with %s", config_options)
-    ccm_cluster.start(wait_for_binary_proto=True)
+    ccm_cluster.start(wait_for_binary_proto=True, wait_other_notice=True)
+    # there seems to be some race, with some versions of C* taking longer to
+    # get the auth (and default user) set up. Sleep here to give it a chance.
+    time.sleep(2)


 def teardown_module():
diff --git a/tests/unit/test_control_connection.py b/tests/unit/test_control_connection.py
index 12d09781..de95dbf4 100644
--- a/tests/unit/test_control_connection.py
+++ b/tests/unit/test_control_connection.py
@@ -73,7 +73,7 @@ class MockCluster(object):
         self.scheduler = Mock(spec=_Scheduler)
         self.executor = Mock(spec=ThreadPoolExecutor)

-    def add_host(self, address, datacenter, rack, signal=False):
+    def add_host(self, address, datacenter, rack, signal=False, refresh_nodes=True):
         host = Host(address, SimpleConvictionPolicy, datacenter, rack)
         self.added_hosts.append(host)
         return host
@@ -131,7 +131,7 @@ class ControlConnectionTest(unittest.TestCase):
         self.connection = MockConnection()
         self.time = FakeTime()

-        self.control_connection = ControlConnection(self.cluster, timeout=1)
+        self.control_connection = ControlConnection(self.cluster, 1, 0, 0)
         self.control_connection._connection = self.connection
         self.control_connection._time = self.time

@@ -345,39 +345,44 @@ class ControlConnectionTest(unittest.TestCase):
             'change_type': 'NEW_NODE',
             'address': ('1.2.3.4', 9000)
         }
+        self.cluster.scheduler.reset_mock()
         self.control_connection._handle_topology_change(event)
-        self.cluster.scheduler.schedule.assert_called_with(ANY, self.control_connection.refresh_node_list_and_token_map)
+        self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.control_connection.refresh_node_list_and_token_map)

         event = {
             'change_type': 'REMOVED_NODE',
             'address': ('1.2.3.4', 9000)
         }
+        self.cluster.scheduler.reset_mock()
         self.control_connection._handle_topology_change(event)
-        self.cluster.scheduler.schedule.assert_called_with(ANY, self.cluster.remove_host, None)
+        self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.cluster.remove_host, None)

         event = {
             'change_type': 'MOVED_NODE',
             'address': ('1.2.3.4', 9000)
         }
+        self.cluster.scheduler.reset_mock()
         self.control_connection._handle_topology_change(event)
-        self.cluster.scheduler.schedule.assert_called_with(ANY, self.control_connection.refresh_node_list_and_token_map)
+        self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.control_connection.refresh_node_list_and_token_map)

     def test_handle_status_change(self):
         event = {
             'change_type': 'UP',
             'address': ('1.2.3.4', 9000)
         }
+        self.cluster.scheduler.reset_mock()
         self.control_connection._handle_status_change(event)
-        self.cluster.scheduler.schedule.assert_called_with(ANY, self.control_connection.refresh_node_list_and_token_map)
+        self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.control_connection.refresh_node_list_and_token_map)

         # do the same with a known Host
         event = {
             'change_type': 'UP',
             'address': ('192.168.1.0', 9000)
         }
+        self.cluster.scheduler.reset_mock()
         self.control_connection._handle_status_change(event)
         host = self.cluster.metadata.hosts['192.168.1.0']
-        self.cluster.scheduler.schedule.assert_called_with(ANY, self.cluster.on_up, host)
+        self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.cluster.on_up, host)

         self.cluster.scheduler.schedule.reset_mock()
         event = {
@@ -404,9 +409,11 @@ class ControlConnectionTest(unittest.TestCase):
             'keyspace': 'ks1',
             'table': 'table1'
         }
+        self.cluster.scheduler.reset_mock()
         self.control_connection._handle_schema_change(event)
-        self.cluster.executor.submit.assert_called_with(self.control_connection.refresh_schema, 'ks1', 'table1', None)
+        self.cluster.scheduler.schedule_unique.assert_called_once_with(0.0, self.control_connection.refresh_schema, 'ks1', 'table1', None)

+        self.cluster.scheduler.reset_mock()
         event['table'] = None
         self.control_connection._handle_schema_change(event)
-        self.cluster.executor.submit.assert_called_with(self.control_connection.refresh_schema, 'ks1', None, None)
+        self.cluster.scheduler.schedule_unique.assert_called_once_with(0.0, self.control_connection.refresh_schema, 'ks1', None, None)
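
Finally, a possible companion unit test, sketched here rather than being part of the patch: it drives the real `_Scheduler` (instead of the mock used above) to confirm that duplicate pending tasks scheduled with `schedule_unique` collapse to a single execution. The test name and sleep margins are illustrative.

```python
import time
import unittest
from concurrent.futures import ThreadPoolExecutor

from cassandra.cluster import _Scheduler


class SchedulerUniqueTaskTest(unittest.TestCase):

    def test_schedule_unique_coalesces_duplicates(self):
        calls = []
        scheduler = _Scheduler(ThreadPoolExecutor(max_workers=1))
        try:
            # the same (fn, args) pair scheduled twice while still pending
            scheduler.schedule_unique(0.2, calls.append, "refresh")
            scheduler.schedule_unique(0.2, calls.append, "refresh")
            time.sleep(1.0)  # give the scheduler thread time to run the task
            self.assertEqual(calls, ["refresh"])
        finally:
            scheduler.shutdown()
```
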