Merge pull request #248 from datastax/PYTHON-202

PYTHON-202 - Meta Refresh Controls
This commit is contained in:
Adam Holmberg
2015-01-29 13:22:35 -06:00
5 changed files with 184 additions and 53 deletions

View File

@@ -22,6 +22,7 @@ import atexit
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
import logging
from random import random
import socket
import sys
import time
@@ -391,6 +392,40 @@ class Cluster(object):
Setting to zero disables heartbeats.
"""
schema_event_refresh_window = 2
"""
Window, in seconds, within which a schema component will be refreshed after
receiving a schema_change event.
The driver delays a random amount of time in the range [0.0, window)
before executing the refresh. This serves two purposes:
1.) Spread the refresh for deployments with large fanout from C* to client tier,
preventing a 'thundering herd' problem with many clients refreshing simultaneously.
2.) Remove redundant refreshes. Redundant events arriving within the delay period
are discarded, and only one refresh is executed.
Setting this to zero will execute refreshes immediately.
Setting this negative will disable schema refreshes in response to push events
(refreshes will still occur in response to schema change responses to DDL statements
executed by Sessions of this Cluster).
"""
topology_event_refresh_window = 10
"""
Window, in seconds, within which the node and token list will be refreshed after
receiving a topology_change event.
Setting this to zero will execute refreshes immediately.
Setting this negative will disable node refreshes in response to push events
(refreshes will still occur in response to new nodes observed on "UP" events).
See :attr:`.schema_event_refresh_window` for a discussion of the rationale.
"""
sessions = None
control_connection = None
scheduler = None
@@ -427,7 +462,9 @@ class Cluster(object):
executor_threads=2,
max_schema_agreement_wait=10,
control_connection_timeout=2.0,
idle_heartbeat_interval=30):
idle_heartbeat_interval=30,
schema_event_refresh_window=2,
topology_event_refresh_window=10):
"""
Any of the mutable Cluster attributes may be set as keyword arguments
to the constructor.
@@ -478,6 +515,8 @@ class Cluster(object):
self.max_schema_agreement_wait = max_schema_agreement_wait
self.control_connection_timeout = control_connection_timeout
self.idle_heartbeat_interval = idle_heartbeat_interval
self.schema_event_refresh_window = schema_event_refresh_window
self.topology_event_refresh_window = topology_event_refresh_window
self._listeners = set()
self._listener_lock = Lock()
@@ -522,7 +561,8 @@ class Cluster(object):
self.metrics = Metrics(weakref.proxy(self))
self.control_connection = ControlConnection(
self, self.control_connection_timeout)
self, self.control_connection_timeout,
self.schema_event_refresh_window, self.topology_event_refresh_window)
def register_user_type(self, keyspace, user_type, klass):
"""
@@ -936,7 +976,7 @@ class Cluster(object):
self._start_reconnector(host, is_host_addition)
def on_add(self, host):
def on_add(self, host, refresh_nodes=True):
if self.is_shutdown:
return
@@ -948,7 +988,7 @@ class Cluster(object):
log.debug("Done preparing queries for new host %r", host)
self.load_balancing_policy.on_add(host)
self.control_connection.on_add(host)
self.control_connection.on_add(host, refresh_nodes)
if distance == HostDistance.IGNORED:
log.debug("Not adding connection pool for new host %r because the "
@@ -1024,7 +1064,7 @@ class Cluster(object):
self.on_down(host, is_host_addition, expect_host_to_be_down)
return is_down
def add_host(self, address, datacenter=None, rack=None, signal=True):
def add_host(self, address, datacenter=None, rack=None, signal=True, refresh_nodes=True):
"""
Called when adding initial contact points and when the control
connection subsequently discovers a new node. Intended for internal
@@ -1033,7 +1073,7 @@ class Cluster(object):
new_host = self.metadata.add_host(address, datacenter, rack)
if new_host and signal:
log.info("New Cassandra host %r discovered", new_host)
self.on_add(new_host)
self.on_add(new_host, refresh_nodes)
return new_host
@@ -1080,11 +1120,11 @@ class Cluster(object):
By default, the timeout for this operation is governed by :attr:`~.Cluster.max_schema_agreement_wait`
and :attr:`~.Cluster.control_connection_timeout`.
Passing max_schema_agreement_wait here overrides :attr:`~.Cluster.max_schema_agreement_wait`.
Setting max_schema_agreement_wait <= 0 will bypass schema agreement and refresh schema immediately.
An Exception is raised if schema refresh fails for any reason.
"""
if not self.control_connection.refresh_schema(keyspace, table, usertype, max_schema_agreement_wait):
@@ -1099,6 +1139,27 @@ class Cluster(object):
return self.executor.submit(
self.control_connection.refresh_schema, keyspace, table, usertype)
def refresh_nodes(self):
    """
    Synchronously refresh the node list and token metadata via the
    control connection.

    An Exception is raised if the control connection reports that the
    node list was not refreshed.
    """
    refreshed = self.control_connection.refresh_node_list_and_token_map()
    if refreshed:
        return
    raise Exception("Node list was not refreshed. See log for details.")
def set_meta_refresh_enabled(self, enabled):
    """
    Enable (True) or disable (False) all metadata refresh queries, for
    both schema and node topology.

    Turning refreshes off is useful to minimize refreshes while multiple
    changes are being made. Meta refresh must be enabled for the driver to
    become aware of any cluster topology changes or schema updates.
    """
    # Normalize to a real bool before handing the flag to the control connection.
    flag = True if enabled else False
    self.control_connection.set_meta_refresh_enabled(flag)
def _prepare_all_queries(self, host):
if not self._prepared_statements:
return
@@ -1770,16 +1831,26 @@ class ControlConnection(object):
_timeout = None
_protocol_version = None
_schema_event_refresh_window = None
_topology_event_refresh_window = None
_meta_refresh_enabled = True
# for testing purposes
_time = time
def __init__(self, cluster, timeout):
def __init__(self, cluster, timeout,
schema_event_refresh_window,
topology_event_refresh_window):
# use a weak reference to allow the Cluster instance to be GC'ed (and
# shutdown) since implementing __del__ disables the cycle detector
self._cluster = weakref.proxy(cluster)
self._connection = None
self._timeout = timeout
self._schema_event_refresh_window = schema_event_refresh_window
self._topology_event_refresh_window = topology_event_refresh_window
self._lock = RLock()
self._schema_agreement_lock = Lock()
@@ -1937,6 +2008,10 @@ class ControlConnection(object):
def refresh_schema(self, keyspace=None, table=None, usertype=None,
schema_agreement_wait=None):
if not self._meta_refresh_enabled:
log.debug("[control connection] Skipping schema refresh because meta refresh is disabled")
return False
try:
if self._connection:
return self._refresh_schema(self._connection, keyspace, table, usertype,
@@ -2064,14 +2139,20 @@ class ControlConnection(object):
return True
# Refresh the node list and token map over the current control connection.
# Returns False without querying when meta refresh is disabled, and falls
# through to False when the refresh attempt raises.
# NOTE(review): indentation is not visible in this view; `return True`
# presumably belongs to the `if self._connection:` branch -- confirm.
def refresh_node_list_and_token_map(self, force_token_rebuild=False):
# Honor the flag set through set_meta_refresh_enabled before doing any work.
if not self._meta_refresh_enabled:
log.debug("[control connection] Skipping node list refresh because meta refresh is disabled")
return False
try:
if self._connection:
# force_token_rebuild is passed straight through to the private helper.
self._refresh_node_list_and_token_map(self._connection, force_token_rebuild=force_token_rebuild)
return True
except ReferenceError:
pass # our weak reference to the Cluster is no good
except Exception:
# Any other failure is logged with its traceback and reported via _signal_error().
log.debug("[control connection] Error refreshing node list and token map", exc_info=True)
self._signal_error()
# Reached when the refresh did not complete successfully.
return False
def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
force_token_rebuild=False):
@@ -2132,7 +2213,7 @@ class ControlConnection(object):
rack = row.get("rack")
if host is None:
log.debug("[control connection] Found new host to connect to: %s", addr)
host = self._cluster.add_host(addr, datacenter, rack, signal=True)
host = self._cluster.add_host(addr, datacenter, rack, signal=True, refresh_nodes=False)
should_rebuild_token_map = True
else:
should_rebuild_token_map |= self._update_location_info(host, datacenter, rack)
@@ -2167,25 +2248,25 @@ class ControlConnection(object):
def _handle_topology_change(self, event):
change_type = event["change_type"]
addr, port = event["address"]
if change_type == "NEW_NODE":
self._cluster.scheduler.schedule(10, self.refresh_node_list_and_token_map)
if change_type == "NEW_NODE" or change_type == "MOVED_NODE":
if self._topology_event_refresh_window >= 0:
delay = random() * self._topology_event_refresh_window
self._cluster.scheduler.schedule_unique(delay, self.refresh_node_list_and_token_map)
elif change_type == "REMOVED_NODE":
host = self._cluster.metadata.get_host(addr)
self._cluster.scheduler.schedule(0, self._cluster.remove_host, host)
elif change_type == "MOVED_NODE":
self._cluster.scheduler.schedule(1, self.refresh_node_list_and_token_map)
self._cluster.scheduler.schedule_unique(0, self._cluster.remove_host, host)
def _handle_status_change(self, event):
change_type = event["change_type"]
addr, port = event["address"]
host = self._cluster.metadata.get_host(addr)
if change_type == "UP":
delay = 1 + random() * 0.5 # randomness to avoid thundering herd problem on events
if host is None:
# this is the first time we've seen the node
self._cluster.scheduler.schedule(2, self.refresh_node_list_and_token_map)
self._cluster.scheduler.schedule_unique(delay, self.refresh_node_list_and_token_map)
else:
# this will be run by the scheduler
self._cluster.scheduler.schedule(2, self._cluster.on_up, host)
self._cluster.scheduler.schedule_unique(delay, self._cluster.on_up, host)
elif change_type == "DOWN":
# Note that there is a slight risk we can receive the event late and thus
# mark the host down even though we already had reconnected successfully.
@@ -2196,10 +2277,14 @@ class ControlConnection(object):
self._cluster.on_down(host, is_host_addition=False)
def _handle_schema_change(self, event):
if self._schema_event_refresh_window < 0:
return
keyspace = event.get('keyspace')
table = event.get('table')
usertype = event.get('type')
self._submit(self.refresh_schema, keyspace, table, usertype)
delay = random() * self._schema_event_refresh_window
self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, keyspace, table, usertype)
def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wait_time=None):
@@ -2320,8 +2405,9 @@ class ControlConnection(object):
# this will result in a task being submitted to the executor to reconnect
self.reconnect()
def on_add(self, host):
self.refresh_node_list_and_token_map(force_token_rebuild=True)
def on_add(self, host, refresh_nodes=True):
    """
    React to a host being added: unless ``refresh_nodes`` is falsy,
    refresh the node list and token map with a forced token rebuild.
    """
    if not refresh_nodes:
        return
    self.refresh_node_list_and_token_map(force_token_rebuild=True)
def on_remove(self, host):
self.refresh_node_list_and_token_map(force_token_rebuild=True)
@@ -2334,6 +2420,9 @@ class ControlConnection(object):
if connection is self._connection and (connection.is_defunct or connection.is_closed):
self.reconnect()
def set_meta_refresh_enabled(self, enabled):
    """
    Store ``enabled`` as the flag consulted before schema and node list
    refreshes; the value is kept exactly as given.
    """
    self._meta_refresh_enabled = enabled
def _stop_scheduler(scheduler, thread):
try:
@@ -2347,12 +2436,14 @@ def _stop_scheduler(scheduler, thread):
class _Scheduler(object):
_scheduled = None
_queue = None
_scheduled_tasks = None
_executor = None
is_shutdown = False
def __init__(self, executor):
self._scheduled = Queue.PriorityQueue()
self._queue = Queue.PriorityQueue()
self._scheduled_tasks = set()
self._executor = executor
t = Thread(target=self.run, name="Task Scheduler")
@@ -2370,14 +2461,25 @@ class _Scheduler(object):
# this can happen on interpreter shutdown
pass
self.is_shutdown = True
self._scheduled.put_nowait((0, None))
self._queue.put_nowait((0, None))
def schedule(self, delay, fn, *args, **kwargs):
def schedule(self, delay, fn, *args):
    """
    Hand the task ``(fn, args)`` to ``_insert_task`` with the given delay.
    No check is made for an identical task that is already scheduled.
    """
    task = (fn, args)
    self._insert_task(delay, task)
def schedule_unique(self, delay, fn, *args):
    """
    Schedule ``(fn, args)`` only if an equal task is not already recorded
    in ``_scheduled_tasks``; a duplicate request is logged and dropped.
    """
    task = (fn, args)
    if task in self._scheduled_tasks:
        log.debug("Ignoring schedule_unique for already-scheduled task: %r", task)
        return
    self._insert_task(delay, task)
def _insert_task(self, delay, task):
if not self.is_shutdown:
run_at = time.time() + delay
self._scheduled.put_nowait((run_at, (fn, args, kwargs)))
self._scheduled_tasks.add(task)
self._queue.put_nowait((run_at, task))
else:
log.debug("Ignoring scheduled function after shutdown: %r", fn)
log.debug("Ignoring scheduled task after shutdown: %r", task)
def run(self):
while True:
@@ -2386,16 +2488,17 @@ class _Scheduler(object):
try:
while True:
run_at, task = self._scheduled.get(block=True, timeout=None)
run_at, task = self._queue.get(block=True, timeout=None)
if self.is_shutdown:
log.debug("Not executing scheduled task due to Scheduler shutdown")
return
if run_at <= time.time():
fn, args, kwargs = task
future = self._executor.submit(fn, *args, **kwargs)
self._scheduled_tasks.remove(task)
fn, args = task
future = self._executor.submit(fn, *args)
future.add_done_callback(self._log_if_failed)
else:
self._scheduled.put_nowait((run_at, task))
self._queue.put_nowait((run_at, task))
break
except Queue.Empty:
pass
@@ -2412,9 +2515,13 @@ class _Scheduler(object):
def refresh_schema_and_set_result(keyspace, table, usertype, control_conn, response_future):
try:
log.debug("Refreshing schema in response to schema change. Keyspace: %s; Table: %s, Type: %s",
keyspace, table, usertype)
control_conn._refresh_schema(response_future._connection, keyspace, table, usertype)
if control_conn._meta_refresh_enabled:
log.debug("Refreshing schema in response to schema change. Keyspace: %s; Table: %s, Type: %s",
keyspace, table, usertype)
control_conn._refresh_schema(response_future._connection, keyspace, table, usertype)
else:
log.debug("Skipping schema refresh in response to schema change because meta refresh is disabled; "
"Keyspace: %s; Table: %s, Type: %s", keyspace, table, usertype)
except Exception:
log.exception("Exception refreshing schema in response to schema change:")
response_future.session.submit(

View File

@@ -802,15 +802,19 @@ class ConnectionHeartbeat(Thread):
try:
for connections, owner in [(o.get_connections(), o) for o in self._get_connection_holders()]:
for connection in connections:
if not (connection.is_defunct or connection.is_closed) and connection.is_idle:
try:
futures.append(HeartbeatFuture(connection, owner))
except Exception:
log.warning("Failed sending heartbeat message on connection (%s) to %s",
id(connection), connection.host, exc_info=True)
failed_connections.append((connection, owner))
if not (connection.is_defunct or connection.is_closed):
if connection.is_idle:
try:
futures.append(HeartbeatFuture(connection, owner))
except Exception:
log.warning("Failed sending heartbeat message on connection (%s) to %s",
id(connection), connection.host, exc_info=True)
failed_connections.append((connection, owner))
else:
connection.reset_idle()
else:
connection.reset_idle()
# make sure the owner sees this defunct/closed connection
owner.return_connection(connection)
for f in futures:
connection = f.connection

View File

@@ -41,6 +41,10 @@
.. autoattribute:: idle_heartbeat_interval
.. autoattribute:: schema_event_refresh_window
.. autoattribute:: topology_event_refresh_window
.. automethod:: connect
.. automethod:: shutdown
@@ -61,6 +65,11 @@
.. automethod:: refresh_schema
.. automethod:: refresh_nodes
.. automethod:: set_meta_refresh_enabled
.. autoclass:: Session ()
.. autoattribute:: default_timeout

View File

@@ -13,6 +13,7 @@
# limitations under the License.
import logging
import time
from cassandra.cluster import Cluster, NoHostAvailable
from cassandra.auth import PlainTextAuthProvider, SASLClient, SaslAuthProvider
@@ -36,7 +37,10 @@ def setup_module():
'authorizer': 'CassandraAuthorizer'}
ccm_cluster.set_configuration_options(config_options)
log.debug("Starting ccm test cluster with %s", config_options)
ccm_cluster.start(wait_for_binary_proto=True)
ccm_cluster.start(wait_for_binary_proto=True, wait_other_notice=True)
# There seems to be a race: some versions of C* take longer to set up
# auth (and the default user). Sleep here to give it a chance.
time.sleep(2)
def teardown_module():

View File

@@ -73,7 +73,7 @@ class MockCluster(object):
self.scheduler = Mock(spec=_Scheduler)
self.executor = Mock(spec=ThreadPoolExecutor)
def add_host(self, address, datacenter, rack, signal=False):
def add_host(self, address, datacenter, rack, signal=False, refresh_nodes=True):
host = Host(address, SimpleConvictionPolicy, datacenter, rack)
self.added_hosts.append(host)
return host
@@ -131,7 +131,7 @@ class ControlConnectionTest(unittest.TestCase):
self.connection = MockConnection()
self.time = FakeTime()
self.control_connection = ControlConnection(self.cluster, timeout=1)
self.control_connection = ControlConnection(self.cluster, 1, 0, 0)
self.control_connection._connection = self.connection
self.control_connection._time = self.time
@@ -345,39 +345,44 @@ class ControlConnectionTest(unittest.TestCase):
'change_type': 'NEW_NODE',
'address': ('1.2.3.4', 9000)
}
self.cluster.scheduler.reset_mock()
self.control_connection._handle_topology_change(event)
self.cluster.scheduler.schedule.assert_called_with(ANY, self.control_connection.refresh_node_list_and_token_map)
self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.control_connection.refresh_node_list_and_token_map)
event = {
'change_type': 'REMOVED_NODE',
'address': ('1.2.3.4', 9000)
}
self.cluster.scheduler.reset_mock()
self.control_connection._handle_topology_change(event)
self.cluster.scheduler.schedule.assert_called_with(ANY, self.cluster.remove_host, None)
self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.cluster.remove_host, None)
event = {
'change_type': 'MOVED_NODE',
'address': ('1.2.3.4', 9000)
}
self.cluster.scheduler.reset_mock()
self.control_connection._handle_topology_change(event)
self.cluster.scheduler.schedule.assert_called_with(ANY, self.control_connection.refresh_node_list_and_token_map)
self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.control_connection.refresh_node_list_and_token_map)
def test_handle_status_change(self):
event = {
'change_type': 'UP',
'address': ('1.2.3.4', 9000)
}
self.cluster.scheduler.reset_mock()
self.control_connection._handle_status_change(event)
self.cluster.scheduler.schedule.assert_called_with(ANY, self.control_connection.refresh_node_list_and_token_map)
self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.control_connection.refresh_node_list_and_token_map)
# do the same with a known Host
event = {
'change_type': 'UP',
'address': ('192.168.1.0', 9000)
}
self.cluster.scheduler.reset_mock()
self.control_connection._handle_status_change(event)
host = self.cluster.metadata.hosts['192.168.1.0']
self.cluster.scheduler.schedule.assert_called_with(ANY, self.cluster.on_up, host)
self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.cluster.on_up, host)
self.cluster.scheduler.schedule.reset_mock()
event = {
@@ -404,9 +409,11 @@ class ControlConnectionTest(unittest.TestCase):
'keyspace': 'ks1',
'table': 'table1'
}
self.cluster.scheduler.reset_mock()
self.control_connection._handle_schema_change(event)
self.cluster.executor.submit.assert_called_with(self.control_connection.refresh_schema, 'ks1', 'table1', None)
self.cluster.scheduler.schedule_unique.assert_called_once_with(0.0, self.control_connection.refresh_schema, 'ks1', 'table1', None)
self.cluster.scheduler.reset_mock()
event['table'] = None
self.control_connection._handle_schema_change(event)
self.cluster.executor.submit.assert_called_with(self.control_connection.refresh_schema, 'ks1', None, None)
self.cluster.scheduler.schedule_unique.assert_called_once_with(0.0, self.control_connection.refresh_schema, 'ks1', None, None)