From f92d6e4e8d46454ea51f59a20484f59ad0d54a4d Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Mon, 17 Aug 2015 15:17:07 -0500 Subject: [PATCH] Contain and log schema metadata parsing exceptions. PYTHON-370 Allows the driver to connect even if there is unexpected/inconsistent schema metadata. --- cassandra/metadata.py | 337 +++++++++++--------- tests/integration/standard/test_metadata.py | 92 ++++++ 2 files changed, 279 insertions(+), 150 deletions(-) diff --git a/cassandra/metadata.py b/cassandra/metadata.py index c893857b..1968b1a4 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -21,6 +21,7 @@ import logging import re import six from six.moves import zip +import sys from threading import RLock murmur3 = None @@ -150,23 +151,27 @@ class Metadata(object): current_keyspaces = set() for row in ks_results: keyspace_meta = self._build_keyspace_metadata(row) - keyspace_col_rows = col_def_rows.get(keyspace_meta.name, {}) - keyspace_trigger_rows = trigger_rows.get(keyspace_meta.name, {}) - for table_row in cf_def_rows.get(keyspace_meta.name, []): - table_meta = self._build_table_metadata(keyspace_meta, table_row, keyspace_col_rows, keyspace_trigger_rows) - keyspace_meta._add_table_metadata(table_meta) + try: + keyspace_col_rows = col_def_rows.get(keyspace_meta.name, {}) + keyspace_trigger_rows = trigger_rows.get(keyspace_meta.name, {}) + for table_row in cf_def_rows.get(keyspace_meta.name, []): + table_meta = self._build_table_metadata(keyspace_meta, table_row, keyspace_col_rows, keyspace_trigger_rows) + keyspace_meta._add_table_metadata(table_meta) - for usertype_row in usertype_rows.get(keyspace_meta.name, []): - usertype = self._build_usertype(keyspace_meta.name, usertype_row) - keyspace_meta.user_types[usertype.name] = usertype + for usertype_row in usertype_rows.get(keyspace_meta.name, []): + usertype = self._build_usertype(keyspace_meta.name, usertype_row) + keyspace_meta.user_types[usertype.name] = usertype - for fn_row in fn_rows.get(keyspace_meta.name, []): - fn = self._build_function(keyspace_meta.name, fn_row) - keyspace_meta.functions[fn.signature] = fn + for fn_row in fn_rows.get(keyspace_meta.name, []): + fn = self._build_function(keyspace_meta.name, fn_row) + keyspace_meta.functions[fn.signature] = fn - for agg_row in agg_rows.get(keyspace_meta.name, []): - agg = self._build_aggregate(keyspace_meta.name, agg_row) - keyspace_meta.aggregates[agg.signature] = agg + for agg_row in agg_rows.get(keyspace_meta.name, []): + agg = self._build_aggregate(keyspace_meta.name, agg_row) + keyspace_meta.aggregates[agg.signature] = agg + except Exception: + log.exception("Error while parsing metadata for keyspace %s. Metadata model will be incomplete.", keyspace_meta.name) + keyspace_meta._exc_info = sys.exc_info() current_keyspaces.add(keyspace_meta.name) old_keyspace_meta = self.keyspaces.get(keyspace_meta.name, None) @@ -261,10 +266,16 @@ class Metadata(object): def _build_keyspace_metadata(self, row): name = row["keyspace_name"] - durable_writes = row["durable_writes"] - strategy_class = row["strategy_class"] - strategy_options = json.loads(row["strategy_options"]) - return KeyspaceMetadata(name, durable_writes, strategy_class, strategy_options) + try: + durable_writes = row["durable_writes"] + strategy_class = row["strategy_class"] + strategy_options = json.loads(row["strategy_options"]) + ksm = KeyspaceMetadata(name, durable_writes, strategy_class, strategy_options) + except Exception: + log.exception("Error while parsing metadata for keyspace %s row(%s)", name, row) + ksm = KeyspaceMetadata(name, False, 'UNKNOWN', {}) + ksm._exc_info = sys.exc_info() + return ksm def _build_usertype(self, keyspace, usertype_row): type_classes = list(map(types.lookup_casstype, usertype_row['field_types'])) @@ -296,143 +307,147 @@ class Metadata(object): log.warning("Building table metadata with no column meta for %s.%s", keyspace_metadata.name, cfname) - comparator = types.lookup_casstype(row["comparator"]) - - if issubclass(comparator, types.CompositeType): - column_name_types = comparator.subtypes - is_composite_comparator = True - else: - column_name_types = (comparator,) - is_composite_comparator = False - - num_column_name_components = len(column_name_types) - last_col = column_name_types[-1] - - column_aliases = row.get("column_aliases", None) - - clustering_rows = [r for r in cf_col_rows - if r.get('type', None) == "clustering_key"] - if len(clustering_rows) > 1: - clustering_rows = sorted(clustering_rows, key=lambda row: row.get('component_index')) - - if column_aliases is not None: - column_aliases = json.loads(column_aliases) - else: - column_aliases = [r.get('column_name') for r in clustering_rows] - - if is_composite_comparator: - if issubclass(last_col, types.ColumnToCollectionType): - # collections - is_compact = False - has_value = False - clustering_size = num_column_name_components - 2 - elif (len(column_aliases) == num_column_name_components - 1 - and issubclass(last_col, types.UTF8Type)): - # aliases? - is_compact = False - has_value = False - clustering_size = num_column_name_components - 1 - else: - # compact table - is_compact = True - has_value = column_aliases or not cf_col_rows - clustering_size = num_column_name_components - - # Some thrift tables define names in composite types (see PYTHON-192) - if not column_aliases and hasattr(comparator, 'fieldnames'): - column_aliases = comparator.fieldnames - else: - is_compact = True - if column_aliases or not cf_col_rows: - has_value = True - clustering_size = num_column_name_components - else: - has_value = False - clustering_size = 0 - table_meta = TableMetadata(keyspace_metadata, cfname) - table_meta.comparator = comparator - # partition key - partition_rows = [r for r in cf_col_rows - if r.get('type', None) == "partition_key"] + try: + comparator = types.lookup_casstype(row["comparator"]) + table_meta.comparator = comparator - if len(partition_rows) > 1: - partition_rows = sorted(partition_rows, key=lambda row: row.get('component_index')) - - key_aliases = row.get("key_aliases") - if key_aliases is not None: - key_aliases = json.loads(key_aliases) if key_aliases else [] - else: - # In 2.0+, we can use the 'type' column. In 3.0+, we have to use it. - key_aliases = [r.get('column_name') for r in partition_rows] - - key_validator = row.get("key_validator") - if key_validator is not None: - key_type = types.lookup_casstype(key_validator) - key_types = key_type.subtypes if issubclass(key_type, types.CompositeType) else [key_type] - else: - key_types = [types.lookup_casstype(r.get('validator')) for r in partition_rows] - - for i, col_type in enumerate(key_types): - if len(key_aliases) > i: - column_name = key_aliases[i] - elif i == 0: - column_name = "key" + if issubclass(comparator, types.CompositeType): + column_name_types = comparator.subtypes + is_composite_comparator = True else: - column_name = "key%d" % i + column_name_types = (comparator,) + is_composite_comparator = False - col = ColumnMetadata(table_meta, column_name, col_type) - table_meta.columns[column_name] = col - table_meta.partition_key.append(col) + num_column_name_components = len(column_name_types) + last_col = column_name_types[-1] - # clustering key - for i in range(clustering_size): - if len(column_aliases) > i: - column_name = column_aliases[i] + column_aliases = row.get("column_aliases", None) + + clustering_rows = [r for r in cf_col_rows + if r.get('type', None) == "clustering_key"] + if len(clustering_rows) > 1: + clustering_rows = sorted(clustering_rows, key=lambda row: row.get('component_index')) + + if column_aliases is not None: + column_aliases = json.loads(column_aliases) else: - column_name = "column%d" % i + column_aliases = [r.get('column_name') for r in clustering_rows] - col = ColumnMetadata(table_meta, column_name, column_name_types[i]) - table_meta.columns[column_name] = col - table_meta.clustering_key.append(col) + if is_composite_comparator: + if issubclass(last_col, types.ColumnToCollectionType): + # collections + is_compact = False + has_value = False + clustering_size = num_column_name_components - 2 + elif (len(column_aliases) == num_column_name_components - 1 + and issubclass(last_col, types.UTF8Type)): + # aliases? + is_compact = False + has_value = False + clustering_size = num_column_name_components - 1 + else: + # compact table + is_compact = True + has_value = column_aliases or not cf_col_rows + clustering_size = num_column_name_components - # value alias (if present) - if has_value: - value_alias_rows = [r for r in cf_col_rows - if r.get('type', None) == "compact_value"] - - if not key_aliases: # TODO are we checking the right thing here? - value_alias = "value" + # Some thrift tables define names in composite types (see PYTHON-192) + if not column_aliases and hasattr(comparator, 'fieldnames'): + column_aliases = comparator.fieldnames else: - value_alias = row.get("value_alias", None) - if value_alias is None and value_alias_rows: # CASSANDRA-8487 - # In 2.0+, we can use the 'type' column. In 3.0+, we have to use it. - value_alias = value_alias_rows[0].get('column_name') + is_compact = True + if column_aliases or not cf_col_rows: + has_value = True + clustering_size = num_column_name_components + else: + has_value = False + clustering_size = 0 - default_validator = row.get("default_validator") - if default_validator: - validator = types.lookup_casstype(default_validator) + # partition key + partition_rows = [r for r in cf_col_rows + if r.get('type', None) == "partition_key"] + + if len(partition_rows) > 1: + partition_rows = sorted(partition_rows, key=lambda row: row.get('component_index')) + + key_aliases = row.get("key_aliases") + if key_aliases is not None: + key_aliases = json.loads(key_aliases) if key_aliases else [] else: - if value_alias_rows: # CASSANDRA-8487 - validator = types.lookup_casstype(value_alias_rows[0].get('validator')) + # In 2.0+, we can use the 'type' column. In 3.0+, we have to use it. + key_aliases = [r.get('column_name') for r in partition_rows] - col = ColumnMetadata(table_meta, value_alias, validator) - if value_alias: # CASSANDRA-8487 - table_meta.columns[value_alias] = col + key_validator = row.get("key_validator") + if key_validator is not None: + key_type = types.lookup_casstype(key_validator) + key_types = key_type.subtypes if issubclass(key_type, types.CompositeType) else [key_type] + else: + key_types = [types.lookup_casstype(r.get('validator')) for r in partition_rows] - # other normal columns - for col_row in cf_col_rows: - column_meta = self._build_column_metadata(table_meta, col_row) - table_meta.columns[column_meta.name] = column_meta + for i, col_type in enumerate(key_types): + if len(key_aliases) > i: + column_name = key_aliases[i] + elif i == 0: + column_name = "key" + else: + column_name = "key%d" % i - if trigger_rows: - for trigger_row in trigger_rows[cfname]: - trigger_meta = self._build_trigger_metadata(table_meta, trigger_row) - table_meta.triggers[trigger_meta.name] = trigger_meta + col = ColumnMetadata(table_meta, column_name, col_type) + table_meta.columns[column_name] = col + table_meta.partition_key.append(col) - table_meta.options = self._build_table_options(row) - table_meta.is_compact_storage = is_compact + # clustering key + for i in range(clustering_size): + if len(column_aliases) > i: + column_name = column_aliases[i] + else: + column_name = "column%d" % i + + col = ColumnMetadata(table_meta, column_name, column_name_types[i]) + table_meta.columns[column_name] = col + table_meta.clustering_key.append(col) + + # value alias (if present) + if has_value: + value_alias_rows = [r for r in cf_col_rows + if r.get('type', None) == "compact_value"] + + if not key_aliases: # TODO are we checking the right thing here? + value_alias = "value" + else: + value_alias = row.get("value_alias", None) + if value_alias is None and value_alias_rows: # CASSANDRA-8487 + # In 2.0+, we can use the 'type' column. In 3.0+, we have to use it. + value_alias = value_alias_rows[0].get('column_name') + + default_validator = row.get("default_validator") + if default_validator: + validator = types.lookup_casstype(default_validator) + else: + if value_alias_rows: # CASSANDRA-8487 + validator = types.lookup_casstype(value_alias_rows[0].get('validator')) + + col = ColumnMetadata(table_meta, value_alias, validator) + if value_alias: # CASSANDRA-8487 + table_meta.columns[value_alias] = col + + # other normal columns + for col_row in cf_col_rows: + column_meta = self._build_column_metadata(table_meta, col_row) + table_meta.columns[column_meta.name] = column_meta + + if trigger_rows: + for trigger_row in trigger_rows[cfname]: + trigger_meta = self._build_trigger_metadata(table_meta, trigger_row) + table_meta.triggers[trigger_meta.name] = trigger_meta + + table_meta.options = self._build_table_options(row) + table_meta.is_compact_storage = is_compact + except Exception: + table_meta._exc_info = sys.exc_info() + log.exception("Error while parsing metadata for table %s.%s row(%s) columns(%s)", keyspace_metadata.name, cfname, row, cf_col_rows) return table_meta @@ -834,6 +849,10 @@ class KeyspaceMetadata(object): .. versionadded:: 2.6.0 """ + + _exc_info = None + """ set if metadata parsing failed """ + def __init__(self, name, durable_writes, strategy_class, strategy_options): self.name = name self.durable_writes = durable_writes @@ -849,11 +868,20 @@ class KeyspaceMetadata(object): Returns a CQL query string that can be used to recreate the entire keyspace, including user-defined types and tables. """ - return "\n\n".join([self.as_cql_query()] - + self.user_type_strings() - + [f.as_cql_query(True) for f in self.functions.values()] - + [a.as_cql_query(True) for a in self.aggregates.values()] - + [t.export_as_string() for t in self.tables.values()]) + cql = "\n\n".join([self.as_cql_query()] + + self.user_type_strings() + + [f.as_cql_query(True) for f in self.functions.values()] + + [a.as_cql_query(True) for a in self.aggregates.values()] + + [t.export_as_string() for t in self.tables.values()]) + if self._exc_info: + import traceback + ret = "/*\nWarning: Keyspace %s is incomplete because of an error processing metadata.\n" % \ + (self.name) + for line in traceback.format_exception(*self._exc_info): + ret += line + ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % cql + return ret + return cql def as_cql_query(self): """ @@ -1221,6 +1249,9 @@ class TableMetadata(object): A dict mapping trigger names to :class:`.TriggerMetadata` instances. """ + _exc_info = None + """ set if metadata parsing failed """ + @property def is_cql_compatible(self): """ @@ -1244,7 +1275,7 @@ class TableMetadata(object): self.clustering_key = [] if clustering_key is None else clustering_key self.columns = OrderedDict() if columns is None else columns self.indexes = {} - self.options = options + self.options = {} if options is None else options self.comparator = None self.triggers = OrderedDict() if triggers is None else triggers @@ -1254,14 +1285,20 @@ class TableMetadata(object): along with all indexes on it. The returned string is formatted to be human readable. """ - if self.is_cql_compatible: - ret = self.all_as_cql() - else: + if self._exc_info: + import traceback + ret = "/*\nWarning: Table %s.%s is incomplete because of an error processing metadata.\n" % \ + (self.keyspace.name, self.name) + for line in traceback.format_exception(*self._exc_info): + ret += line + ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % self.all_as_cql() + elif not self.is_cql_compatible: # If we can't produce this table with CQL, comment inline ret = "/*\nWarning: Table %s.%s omitted because it has constructs not compatible with CQL (was created via legacy API).\n" % \ (self.keyspace.name, self.name) - ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s" % self.all_as_cql() - ret += "\n*/" + ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % self.all_as_cql() + else: + ret = self.all_as_cql() return ret diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index 5dc3181e..bc54f137 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -1754,3 +1754,95 @@ class AggregateMetadata(FunctionTest): final_func_idx = cql.find('FINALFUNC "%s"' % kwargs['final_func']) self.assertNotIn(-1, (init_cond_idx, final_func_idx)) self.assertGreater(init_cond_idx, final_func_idx) + + +class BadMetaTest(unittest.TestCase): + """ + Test behavior when metadata has unexpected form + Verify that new cluster/session can still connect, and the CQL output indicates the exception with a warning. + PYTHON-370 + """ + + @property + def function_name(self): + return self._testMethodName.lower() + + @classmethod + def setup_class(cls): + cls.cluster = Cluster(protocol_version=PROTOCOL_VERSION) + cls.keyspace_name = cls.__name__.lower() + cls.session = cls.cluster.connect() + cls.session.execute("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}" % cls.keyspace_name) + cls.session.set_keyspace(cls.keyspace_name) + + @classmethod + def teardown_class(cls): + cls.session.execute("DROP KEYSPACE IF EXISTS %s" % cls.keyspace_name) + cls.cluster.shutdown() + + def _run_on_all_nodes(self, query, params): + # used to update schema data on all nodes in the cluster + for _ in self.cluster.metadata.all_hosts(): + self.session.execute(query, params) + + def test_keyspace_bad_options(self): + strategy_options = self.session.execute('SELECT strategy_options FROM system.schema_keyspaces WHERE keyspace_name=%s', (self.keyspace_name,))[0].strategy_options + where_cls = " WHERE keyspace_name='%s'" % (self.keyspace_name,) + try: + self._run_on_all_nodes('UPDATE system.schema_keyspaces SET strategy_options=%s' + where_cls, ('some bad json',)) + c = Cluster(protocol_version=PROTOCOL_VERSION) + c.connect() + meta = c.metadata.keyspaces[self.keyspace_name] + self.assertIsNotNone(meta._exc_info) + self.assertIn("/*\nWarning:", meta.export_as_string()) + c.shutdown() + finally: + self._run_on_all_nodes('UPDATE system.schema_keyspaces SET strategy_options=%s' + where_cls, (strategy_options,)) + + def test_keyspace_bad_index(self): + self.session.execute('CREATE TABLE %s (k int PRIMARY KEY, v int)' % self.function_name) + self.session.execute('CREATE INDEX ON %s(v)' % self.function_name) + where_cls = " WHERE keyspace_name='%s' AND columnfamily_name='%s' AND column_name='v'" \ + % (self.keyspace_name, self.function_name) + index_options = self.session.execute('SELECT index_options FROM system.schema_columns' + where_cls)[0].index_options + try: + self._run_on_all_nodes('UPDATE system.schema_columns SET index_options=%s' + where_cls, ('some bad json',)) + c = Cluster(protocol_version=PROTOCOL_VERSION) + c.connect() + meta = c.metadata.keyspaces[self.keyspace_name].tables[self.function_name] + self.assertIsNotNone(meta._exc_info) + self.assertIn("/*\nWarning:", meta.export_as_string()) + c.shutdown() + finally: + self._run_on_all_nodes('UPDATE system.schema_columns SET index_options=%s' + where_cls, (index_options,)) + + def test_table_bad_comparator(self): + self.session.execute('CREATE TABLE %s (k int PRIMARY KEY, v int)' % self.function_name) + where_cls = " WHERE keyspace_name='%s' AND columnfamily_name='%s'" % (self.keyspace_name, self.function_name) + comparator = self.session.execute('SELECT comparator FROM system.schema_columnfamilies' + where_cls)[0].comparator + try: + self._run_on_all_nodes('UPDATE system.schema_columnfamilies SET comparator=%s' + where_cls, ('DynamicCompositeType()',)) + c = Cluster(protocol_version=PROTOCOL_VERSION) + c.connect() + meta = c.metadata.keyspaces[self.keyspace_name].tables[self.function_name] + self.assertIsNotNone(meta._exc_info) + self.assertIn("/*\nWarning:", meta.export_as_string()) + c.shutdown() + finally: + self._run_on_all_nodes('UPDATE system.schema_columnfamilies SET comparator=%s' + where_cls, (comparator,)) + + @unittest.skipUnless(PROTOCOL_VERSION >= 3, "Requires protocol version 3+") + def test_user_type_bad_typename(self): + self.session.execute('CREATE TYPE %s (i int, d double)' % self.function_name) + where_cls = " WHERE keyspace_name='%s' AND type_name='%s'" % (self.keyspace_name, self.function_name) + field_types = self.session.execute('SELECT field_types FROM system.schema_usertypes' + where_cls)[0].field_types + try: + self._run_on_all_nodes('UPDATE system.schema_usertypes SET field_types[0]=%s' + where_cls, ('Tr@inWr3ck##)))',)) + c = Cluster(protocol_version=PROTOCOL_VERSION) + c.connect() + meta = c.metadata.keyspaces[self.keyspace_name] + self.assertIsNotNone(meta._exc_info) + self.assertIn("/*\nWarning:", meta.export_as_string()) + c.shutdown() + finally: + self._run_on_all_nodes('UPDATE system.schema_usertypes SET field_types=%s' + where_cls, (field_types,))