Merge pull request #237 from datastax/PYTHON-205

PYTHON-205 - Build Schema on Disagreement
This commit is contained in:
Adam Holmberg
2015-01-08 14:09:28 -06:00
5 changed files with 59 additions and 28 deletions

View File

@@ -336,6 +336,7 @@ class Cluster(object):
"""
The maximum duration (in seconds) that the driver will wait for schema
agreement across the cluster. Defaults to ten seconds.
If set <= 0, the driver will bypass schema agreement waits altogether.
"""
metadata = None
@@ -1044,14 +1045,26 @@ class Cluster(object):
for pool in session._pools.values():
pool.ensure_core_connections()
def submit_schema_refresh(self, keyspace=None, table=None):
def refresh_schema(self, keyspace=None, table=None, usertype=None, schema_agreement_wait=None):
"""
Synchronously refresh the schema metadata.
By default timeout for this operation is governed by :attr:`~.Cluster.max_schema_agreement_wait`
and :attr:`~.Cluster.control_connection_timeout`.
Passing schema_agreement_wait here overrides :attr:`~.Cluster.max_schema_agreement_wait`.
Setting schema_agreement_wait <= 0 will bypass schema agreement and refresh schema immediately.
An Exception is raised if schema refresh fails for any reason.
"""
if not self.control_connection.refresh_schema(keyspace, table, usertype, schema_agreement_wait):
raise Exception("Schema was not refreshed. See log for details.")
def submit_schema_refresh(self, keyspace=None, table=None, usertype=None):
"""
Schedule a refresh of the internal representation of the current
schema for this cluster. If `keyspace` is specified, only that
keyspace will be refreshed, and likewise for `table`.
"""
return self.executor.submit(
self.control_connection.refresh_schema, keyspace, table)
self.control_connection.refresh_schema, keyspace, table, usertype)
def _prepare_all_queries(self, host):
if not self._prepared_statements:
@@ -1810,6 +1823,9 @@ class ControlConnection(object):
self._refresh_node_list_and_token_map(connection, preloaded_results=shared_results)
self._refresh_schema(connection, preloaded_results=shared_results)
if not self._cluster.metadata.keyspaces:
log.warning("[control connection] No schema built on connect; retrying without wait for schema agreement")
self._refresh_schema(connection, preloaded_results=shared_results, schema_agreement_wait=0)
except Exception:
connection.close()
raise
@@ -1883,26 +1899,32 @@ class ControlConnection(object):
self._connection.close()
del self._connection
def refresh_schema(self, keyspace=None, table=None, usertype=None):
def refresh_schema(self, keyspace=None, table=None, usertype=None,
schema_agreement_wait=None):
try:
if self._connection:
self._refresh_schema(self._connection, keyspace, table, usertype)
return self._refresh_schema(self._connection, keyspace, table, usertype,
schema_agreement_wait=schema_agreement_wait)
except ReferenceError:
pass # our weak reference to the Cluster is no good
except Exception:
log.debug("[control connection] Error refreshing schema", exc_info=True)
self._signal_error()
return False
def _refresh_schema(self, connection, keyspace=None, table=None, usertype=None, preloaded_results=None):
def _refresh_schema(self, connection, keyspace=None, table=None, usertype=None,
preloaded_results=None, schema_agreement_wait=None):
if self._cluster.is_shutdown:
return
return False
assert table is None or usertype is None
agreed = self.wait_for_schema_agreement(connection, preloaded_results=preloaded_results)
agreed = self.wait_for_schema_agreement(connection,
preloaded_results=preloaded_results,
wait_time=schema_agreement_wait)
if not agreed:
log.debug("Skipping schema refresh due to lack of schema agreement")
return
return False
cl = ConsistencyLevel.ONE
if table:
@@ -1918,7 +1940,7 @@ class ControlConnection(object):
col_query = QueryMessage(query=self._SELECT_COLUMNS + where_clause, consistency_level=cl)
triggers_query = QueryMessage(query=self._SELECT_TRIGGERS + where_clause, consistency_level=cl)
(cf_success, cf_result), (col_success, col_result), (triggers_success, triggers_result) \
= connection.wait_for_responses(cf_query, col_query, triggers_query, fail_on_error=False)
= connection.wait_for_responses(cf_query, col_query, triggers_query, timeout=self._timeout, fail_on_error=False)
log.debug("[control connection] Fetched table info for %s.%s, rebuilding metadata", keyspace, table)
cf_result = _handle_results(cf_success, cf_result)
@@ -1957,7 +1979,7 @@ class ControlConnection(object):
QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl)
]
responses = connection.wait_for_responses(*queries, fail_on_error=False)
responses = connection.wait_for_responses(*queries, timeout=self._timeout, fail_on_error=False)
(ks_success, ks_result), (cf_success, cf_result), \
(col_success, col_result), (types_success, types_result), \
(trigger_success, triggers_result) = responses
@@ -1985,8 +2007,8 @@ class ControlConnection(object):
log.debug("[control connection] triggers table not found")
triggers_result = {}
elif isinstance(triggers_result, Unauthorized):
log.warn("[control connection] this version of Cassandra does not allow access to schema_triggers metadata with authorization enabled (CASSANDRA-7967); "
"The driver will operate normally, but will not reflect triggers in the local metadata model, or schema strings.")
log.warning("[control connection] this version of Cassandra does not allow access to schema_triggers metadata with authorization enabled (CASSANDRA-7967); "
"The driver will operate normally, but will not reflect triggers in the local metadata model, or schema strings.")
triggers_result = {}
else:
raise triggers_result
@@ -2003,6 +2025,7 @@ class ControlConnection(object):
log.debug("[control connection] Fetched schema, rebuilding metadata")
self._cluster.metadata.rebuild_schema(ks_result, types_result, cf_result, col_result, triggers_result)
return True
def refresh_node_list_and_token_map(self, force_token_rebuild=False):
try:
@@ -2063,7 +2086,7 @@ class ControlConnection(object):
tokens = row.get("tokens")
if not tokens:
log.warn("Excluding host (%s) with no tokens in system.peers table of %s." % (addr, connection.host))
log.warning("Excluding host (%s) with no tokens in system.peers table of %s." % (addr, connection.host))
continue
found_hosts.add(addr)
@@ -2137,12 +2160,17 @@ class ControlConnection(object):
self._cluster.on_down(host, is_host_addition=False)
def _handle_schema_change(self, event):
keyspace = event['keyspace'] or None
table = event.get('table') or None
keyspace = event.get('keyspace')
table = event.get('table')
usertype = event.get('type')
self._submit(self.refresh_schema, keyspace, table, usertype)
def wait_for_schema_agreement(self, connection=None, preloaded_results=None):
def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wait_time=None):
total_timeout = wait_time if wait_time is not None else self._cluster.max_schema_agreement_wait
if total_timeout <= 0:
return True
# Each schema change typically generates two schema refreshes, one
# from the response type and one from the pushed notification. Holding
# a lock is just a simple way to cut down on the number of schema queries
@@ -2167,7 +2195,6 @@ class ControlConnection(object):
start = self._time.time()
elapsed = 0
cl = ConsistencyLevel.ONE
total_timeout = self._cluster.max_schema_agreement_wait
schema_mismatches = None
while elapsed < total_timeout:
peers_query = QueryMessage(query=self._SELECT_SCHEMA_PEERS, consistency_level=cl)
@@ -2196,8 +2223,8 @@ class ControlConnection(object):
self._time.sleep(0.2)
elapsed = self._time.time() - start
log.warn("Node %s is reporting a schema disagreement: %s",
connection.host, schema_mismatches)
log.warning("Node %s is reporting a schema disagreement: %s",
connection.host, schema_mismatches)
return False
def _get_schema_mismatches(self, peers_result, local_result, local_address):

View File

@@ -513,7 +513,7 @@ class _ReplicationStrategy(object):
try:
rs_instance = rs_class(options_map)
except Exception as exc:
log.warn("Failed creating %s with options %s: %s", strategy_name, options_map, exc)
log.warning("Failed creating %s with options %s: %s", strategy_name, options_map, exc)
return None
return rs_instance

View File

@@ -183,7 +183,7 @@ class _ReconnectionHandler(object):
# call on_exception for logging purposes even if next_delay is None
if self.on_exception(exc, next_delay):
if next_delay is None:
log.warn(
log.warning(
"Will not continue to retry reconnection attempts "
"due to an exhausted retry schedule")
else:

View File

@@ -103,11 +103,12 @@ def named_tuple_factory(colnames, rows):
try:
Row = namedtuple('Row', clean_column_names)
except Exception:
log.warn("Failed creating named tuple for results with column names %s (cleaned: %s) (see Python 'namedtuple' documentation for details on name rules). "
"Results will be returned with positional names. "
"Avoid this by choosing different names, using SELECT \"<col name>\" AS aliases, "
"or specifying a different row_factory on your Session" %
(colnames, clean_column_names))
log.warning("Failed creating named tuple for results with column names %s (cleaned: %s) "
"(see Python 'namedtuple' documentation for details on name rules). "
"Results will be returned with positional names. "
"Avoid this by choosing different names, using SELECT \"<col name>\" AS aliases, "
"or specifying a different row_factory on your Session" %
(colnames, clean_column_names))
Row = namedtuple('Row', clean_column_names, rename=True)
return [Row(*row) for row in rows]
@@ -190,7 +191,7 @@ class Statement(object):
:class:`~.TokenAwarePolicy` is configured for
:attr:`.Cluster.load_balancing_policy`
It is set implicitly on :class:`.BoundStatement`, and :class:`.BatchStatement`,
It is set implicitly on :class:`.BoundStatement`, and :class:`.BatchStatement`,
but must be set explicitly on :class:`.SimpleStatement`.
.. versionadded:: 2.1.3
@@ -326,7 +327,7 @@ class PreparedStatement(object):
column_metadata = None
query_id = None
query_string = None
keyspace = None # change to prepared_keyspace in major release
keyspace = None # change to prepared_keyspace in major release
routing_key_indexes = None

View File

@@ -546,6 +546,9 @@ CREATE TABLE export_udts.users (
def test_legacy_tables(self):
if get_server_versions()[0] < (2, 1, 0):
raise unittest.SkipTest('Test schema output assumes 2.1.0+ options')
cli_script = """CREATE KEYSPACE legacy
WITH placement_strategy = 'SimpleStrategy'
AND strategy_options = {replication_factor:1};