Contain and log schema metadata parsing exceptions.

PYTHON-370
Allows the driver to connect even if there is unexpected/inconsistent
schema metadata.
Adam Holmberg
2015-08-17 15:17:07 -05:00
parent a9fb027864
commit f92d6e4e8d
2 changed files with 279 additions and 150 deletions
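
In practice, this change means a Cluster can finish connecting even when one keyspace's schema rows fail to parse; the error is logged and retained on the metadata object instead of propagating. A minimal sketch of how the contained error surfaces to a driver user (the contact point and keyspace name are hypothetical):

    from cassandra.cluster import Cluster

    # Connecting no longer raises on malformed schema metadata.
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()

    # The damaged keyspace is still modeled, best-effort.
    ks_meta = cluster.metadata.keyspaces['my_keyspace']  # hypothetical keyspace name
    if ks_meta._exc_info:
        # The original traceback is kept for diagnosis; the CQL export is
        # wrapped in a /* Warning: ... */ comment instead of failing.
        print(ks_meta.export_as_string())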

View File

@@ -21,6 +21,7 @@ import logging
import re
import six
from six.moves import zip
import sys
from threading import RLock

murmur3 = None
@@ -150,23 +151,27 @@ class Metadata(object):
current_keyspaces = set()
for row in ks_results:
    keyspace_meta = self._build_keyspace_metadata(row)

    try:
        keyspace_col_rows = col_def_rows.get(keyspace_meta.name, {})
        keyspace_trigger_rows = trigger_rows.get(keyspace_meta.name, {})
        for table_row in cf_def_rows.get(keyspace_meta.name, []):
            table_meta = self._build_table_metadata(keyspace_meta, table_row, keyspace_col_rows, keyspace_trigger_rows)
            keyspace_meta._add_table_metadata(table_meta)

        for usertype_row in usertype_rows.get(keyspace_meta.name, []):
            usertype = self._build_usertype(keyspace_meta.name, usertype_row)
            keyspace_meta.user_types[usertype.name] = usertype

        for fn_row in fn_rows.get(keyspace_meta.name, []):
            fn = self._build_function(keyspace_meta.name, fn_row)
            keyspace_meta.functions[fn.signature] = fn

        for agg_row in agg_rows.get(keyspace_meta.name, []):
            agg = self._build_aggregate(keyspace_meta.name, agg_row)
            keyspace_meta.aggregates[agg.signature] = agg
    except Exception:
        log.exception("Error while parsing metadata for keyspace %s. Metadata model will be incomplete.", keyspace_meta.name)
        keyspace_meta._exc_info = sys.exc_info()

    current_keyspaces.add(keyspace_meta.name)
    old_keyspace_meta = self.keyspaces.get(keyspace_meta.name, None)
@@ -261,10 +266,16 @@ class Metadata(object):
def _build_keyspace_metadata(self, row):
    name = row["keyspace_name"]
    try:
        durable_writes = row["durable_writes"]
        strategy_class = row["strategy_class"]
        strategy_options = json.loads(row["strategy_options"])
        ksm = KeyspaceMetadata(name, durable_writes, strategy_class, strategy_options)
    except Exception:
        log.exception("Error while parsing metadata for keyspace %s row(%s)", name, row)
        ksm = KeyspaceMetadata(name, False, 'UNKNOWN', {})
        ksm._exc_info = sys.exc_info()
    return ksm

def _build_usertype(self, keyspace, usertype_row):
    type_classes = list(map(types.lookup_casstype, usertype_row['field_types']))
@@ -296,143 +307,147 @@ class Metadata(object):
log.warning("Building table metadata with no column meta for %s.%s",
            keyspace_metadata.name, cfname)

table_meta = TableMetadata(keyspace_metadata, cfname)
try:
    comparator = types.lookup_casstype(row["comparator"])
    table_meta.comparator = comparator

    if issubclass(comparator, types.CompositeType):
        column_name_types = comparator.subtypes
        is_composite_comparator = True
    else:
        column_name_types = (comparator,)
        is_composite_comparator = False

    num_column_name_components = len(column_name_types)
    last_col = column_name_types[-1]

    column_aliases = row.get("column_aliases", None)

    clustering_rows = [r for r in cf_col_rows
                       if r.get('type', None) == "clustering_key"]
    if len(clustering_rows) > 1:
        clustering_rows = sorted(clustering_rows, key=lambda row: row.get('component_index'))

    if column_aliases is not None:
        column_aliases = json.loads(column_aliases)
    else:
        column_aliases = [r.get('column_name') for r in clustering_rows]

    if is_composite_comparator:
        if issubclass(last_col, types.ColumnToCollectionType):
            # collections
            is_compact = False
            has_value = False
            clustering_size = num_column_name_components - 2
        elif (len(column_aliases) == num_column_name_components - 1
              and issubclass(last_col, types.UTF8Type)):
            # aliases?
            is_compact = False
            has_value = False
            clustering_size = num_column_name_components - 1
        else:
            # compact table
            is_compact = True
            has_value = column_aliases or not cf_col_rows
            clustering_size = num_column_name_components

            # Some thrift tables define names in composite types (see PYTHON-192)
            if not column_aliases and hasattr(comparator, 'fieldnames'):
                column_aliases = comparator.fieldnames
    else:
        is_compact = True
        if column_aliases or not cf_col_rows:
            has_value = True
            clustering_size = num_column_name_components
        else:
            has_value = False
            clustering_size = 0

    # partition key
    partition_rows = [r for r in cf_col_rows
                      if r.get('type', None) == "partition_key"]

    if len(partition_rows) > 1:
        partition_rows = sorted(partition_rows, key=lambda row: row.get('component_index'))

    key_aliases = row.get("key_aliases")
    if key_aliases is not None:
        key_aliases = json.loads(key_aliases) if key_aliases else []
    else:
        # In 2.0+, we can use the 'type' column. In 3.0+, we have to use it.
        key_aliases = [r.get('column_name') for r in partition_rows]

    key_validator = row.get("key_validator")
    if key_validator is not None:
        key_type = types.lookup_casstype(key_validator)
        key_types = key_type.subtypes if issubclass(key_type, types.CompositeType) else [key_type]
    else:
        key_types = [types.lookup_casstype(r.get('validator')) for r in partition_rows]

    for i, col_type in enumerate(key_types):
        if len(key_aliases) > i:
            column_name = key_aliases[i]
        elif i == 0:
            column_name = "key"
        else:
            column_name = "key%d" % i

        col = ColumnMetadata(table_meta, column_name, col_type)
        table_meta.columns[column_name] = col
        table_meta.partition_key.append(col)

    # clustering key
    for i in range(clustering_size):
        if len(column_aliases) > i:
            column_name = column_aliases[i]
        else:
            column_name = "column%d" % i

        col = ColumnMetadata(table_meta, column_name, column_name_types[i])
        table_meta.columns[column_name] = col
        table_meta.clustering_key.append(col)

    # value alias (if present)
    if has_value:
        value_alias_rows = [r for r in cf_col_rows
                            if r.get('type', None) == "compact_value"]

        if not key_aliases:  # TODO are we checking the right thing here?
            value_alias = "value"
        else:
            value_alias = row.get("value_alias", None)
            if value_alias is None and value_alias_rows:  # CASSANDRA-8487
                # In 2.0+, we can use the 'type' column. In 3.0+, we have to use it.
                value_alias = value_alias_rows[0].get('column_name')

        default_validator = row.get("default_validator")
        if default_validator:
            validator = types.lookup_casstype(default_validator)
        else:
            if value_alias_rows:  # CASSANDRA-8487
                validator = types.lookup_casstype(value_alias_rows[0].get('validator'))

        col = ColumnMetadata(table_meta, value_alias, validator)
        if value_alias:  # CASSANDRA-8487
            table_meta.columns[value_alias] = col

    # other normal columns
    for col_row in cf_col_rows:
        column_meta = self._build_column_metadata(table_meta, col_row)
        table_meta.columns[column_meta.name] = column_meta

    if trigger_rows:
        for trigger_row in trigger_rows[cfname]:
            trigger_meta = self._build_trigger_metadata(table_meta, trigger_row)
            table_meta.triggers[trigger_meta.name] = trigger_meta

    table_meta.options = self._build_table_options(row)
    table_meta.is_compact_storage = is_compact
except Exception:
    table_meta._exc_info = sys.exc_info()
    log.exception("Error while parsing metadata for table %s.%s row(%s) columns(%s)", keyspace_metadata.name, cfname, row, cf_col_rows)

return table_meta
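
The containment above follows one stdlib pattern throughout: capture the (type, value, traceback) triple with sys.exc_info() at the failure site, then render it later with traceback.format_exception. A standalone sketch of that pattern (parse_or_record and describe are hypothetical helpers, not driver API):

    import sys
    import traceback

    def parse_or_record(meta, parse):
        # hypothetical helper mirroring the pattern in the diff
        try:
            parse(meta)
        except Exception:
            meta._exc_info = sys.exc_info()  # (type, value, traceback)

    def describe(meta):
        if meta._exc_info:
            tb = "".join(traceback.format_exception(*meta._exc_info))
            return "/*\nWarning: incomplete metadata.\n%s*/" % tb
        return "ok"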
@@ -834,6 +849,10 @@ class KeyspaceMetadata(object):
.. versionadded:: 2.6.0
"""

_exc_info = None
""" set if metadata parsing failed """

def __init__(self, name, durable_writes, strategy_class, strategy_options):
    self.name = name
    self.durable_writes = durable_writes
@@ -849,11 +868,20 @@ class KeyspaceMetadata(object):
Returns a CQL query string that can be used to recreate the entire keyspace,
including user-defined types and tables.
"""
cql = "\n\n".join([self.as_cql_query()]
                  + self.user_type_strings()
                  + [f.as_cql_query(True) for f in self.functions.values()]
                  + [a.as_cql_query(True) for a in self.aggregates.values()]
                  + [t.export_as_string() for t in self.tables.values()])
if self._exc_info:
    import traceback
    ret = "/*\nWarning: Keyspace %s is incomplete because of an error processing metadata.\n" % \
          (self.name)
    for line in traceback.format_exception(*self._exc_info):
        ret += line
    ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % cql
    return ret
return cql

def as_cql_query(self):
    """
@@ -1221,6 +1249,9 @@ class TableMetadata(object):
A dict mapping trigger names to :class:`.TriggerMetadata` instances.
"""

_exc_info = None
""" set if metadata parsing failed """

@property
def is_cql_compatible(self):
    """
@@ -1244,7 +1275,7 @@ class TableMetadata(object):
self.clustering_key = [] if clustering_key is None else clustering_key
self.columns = OrderedDict() if columns is None else columns
self.indexes = {}
self.options = {} if options is None else options

self.comparator = None
self.triggers = OrderedDict() if triggers is None else triggers
@@ -1254,14 +1285,20 @@ class TableMetadata(object):
along with all indexes on it. The returned string is formatted to
be human readable.
"""
if self._exc_info:
    import traceback
    ret = "/*\nWarning: Table %s.%s is incomplete because of an error processing metadata.\n" % \
          (self.keyspace.name, self.name)
    for line in traceback.format_exception(*self._exc_info):
        ret += line
    ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % self.all_as_cql()
elif not self.is_cql_compatible:
    # If we can't produce this table with CQL, comment inline
    ret = "/*\nWarning: Table %s.%s omitted because it has constructs not compatible with CQL (was created via legacy API).\n" % \
          (self.keyspace.name, self.name)
    ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % self.all_as_cql()
else:
    ret = self.all_as_cql()
return ret
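
With both branches in place, the export for a table whose metadata failed to parse reads roughly as follows (keyspace/table names and the traceback are illustrative, not actual output):

    /*
    Warning: Table ks.tbl is incomplete because of an error processing metadata.
    Traceback (most recent call last):
      ...
    ValueError: ...

    Approximate structure, for reference:
    (this should not be used to reproduce this schema)

    CREATE TABLE ks.tbl (...) ...
    */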

View File

@@ -1754,3 +1754,95 @@ class AggregateMetadata(FunctionTest):
        final_func_idx = cql.find('FINALFUNC "%s"' % kwargs['final_func'])
        self.assertNotIn(-1, (init_cond_idx, final_func_idx))
        self.assertGreater(init_cond_idx, final_func_idx)


class BadMetaTest(unittest.TestCase):
    """
    Test behavior when metadata has an unexpected form.

    Verify that a new cluster/session can still connect, and that the CQL
    output flags the exception with a warning.

    PYTHON-370
    """

    @property
    def function_name(self):
        return self._testMethodName.lower()

    @classmethod
    def setup_class(cls):
        cls.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        cls.keyspace_name = cls.__name__.lower()
        cls.session = cls.cluster.connect()
        cls.session.execute("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}" % cls.keyspace_name)
        cls.session.set_keyspace(cls.keyspace_name)

    @classmethod
    def teardown_class(cls):
        cls.session.execute("DROP KEYSPACE IF EXISTS %s" % cls.keyspace_name)
        cls.cluster.shutdown()

    def _run_on_all_nodes(self, query, params):
        # used to update schema data on all nodes in the cluster
        for _ in self.cluster.metadata.all_hosts():
            self.session.execute(query, params)

    def test_keyspace_bad_options(self):
        strategy_options = self.session.execute('SELECT strategy_options FROM system.schema_keyspaces WHERE keyspace_name=%s',
                                                (self.keyspace_name,))[0].strategy_options
        where_cls = " WHERE keyspace_name='%s'" % (self.keyspace_name,)
        try:
            self._run_on_all_nodes('UPDATE system.schema_keyspaces SET strategy_options=%s' + where_cls, ('some bad json',))

            c = Cluster(protocol_version=PROTOCOL_VERSION)
            c.connect()
            meta = c.metadata.keyspaces[self.keyspace_name]
            self.assertIsNotNone(meta._exc_info)
            self.assertIn("/*\nWarning:", meta.export_as_string())
            c.shutdown()
        finally:
            self._run_on_all_nodes('UPDATE system.schema_keyspaces SET strategy_options=%s' + where_cls, (strategy_options,))

    def test_keyspace_bad_index(self):
        self.session.execute('CREATE TABLE %s (k int PRIMARY KEY, v int)' % self.function_name)
        self.session.execute('CREATE INDEX ON %s(v)' % self.function_name)

        where_cls = " WHERE keyspace_name='%s' AND columnfamily_name='%s' AND column_name='v'" \
                    % (self.keyspace_name, self.function_name)
        index_options = self.session.execute('SELECT index_options FROM system.schema_columns' + where_cls)[0].index_options
        try:
            self._run_on_all_nodes('UPDATE system.schema_columns SET index_options=%s' + where_cls, ('some bad json',))

            c = Cluster(protocol_version=PROTOCOL_VERSION)
            c.connect()
            meta = c.metadata.keyspaces[self.keyspace_name].tables[self.function_name]
            self.assertIsNotNone(meta._exc_info)
            self.assertIn("/*\nWarning:", meta.export_as_string())
            c.shutdown()
        finally:
            self._run_on_all_nodes('UPDATE system.schema_columns SET index_options=%s' + where_cls, (index_options,))

    def test_table_bad_comparator(self):
        self.session.execute('CREATE TABLE %s (k int PRIMARY KEY, v int)' % self.function_name)

        where_cls = " WHERE keyspace_name='%s' AND columnfamily_name='%s'" % (self.keyspace_name, self.function_name)
        comparator = self.session.execute('SELECT comparator FROM system.schema_columnfamilies' + where_cls)[0].comparator
        try:
            self._run_on_all_nodes('UPDATE system.schema_columnfamilies SET comparator=%s' + where_cls, ('DynamicCompositeType()',))

            c = Cluster(protocol_version=PROTOCOL_VERSION)
            c.connect()
            meta = c.metadata.keyspaces[self.keyspace_name].tables[self.function_name]
            self.assertIsNotNone(meta._exc_info)
            self.assertIn("/*\nWarning:", meta.export_as_string())
            c.shutdown()
        finally:
            self._run_on_all_nodes('UPDATE system.schema_columnfamilies SET comparator=%s' + where_cls, (comparator,))

    @unittest.skipUnless(PROTOCOL_VERSION >= 3, "Requires protocol version 3+")
    def test_user_type_bad_typename(self):
        self.session.execute('CREATE TYPE %s (i int, d double)' % self.function_name)

        where_cls = " WHERE keyspace_name='%s' AND type_name='%s'" % (self.keyspace_name, self.function_name)
        field_types = self.session.execute('SELECT field_types FROM system.schema_usertypes' + where_cls)[0].field_types
        try:
            self._run_on_all_nodes('UPDATE system.schema_usertypes SET field_types[0]=%s' + where_cls, ('Tr@inWr3ck##)))',))

            c = Cluster(protocol_version=PROTOCOL_VERSION)
            c.connect()
            meta = c.metadata.keyspaces[self.keyspace_name]
            self.assertIsNotNone(meta._exc_info)
            self.assertIn("/*\nWarning:", meta.export_as_string())
            c.shutdown()
        finally:
            self._run_on_all_nodes('UPDATE system.schema_usertypes SET field_types=%s' + where_cls, (field_types,))