diff --git a/cassandra/metadata.py b/cassandra/metadata.py index ba9cbf03..ab9573dd 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -1076,6 +1076,20 @@ class TableMetadata(object): return not incompatible return True + extensions = None + """ + Metadata describing configuration for table extensions + """ + + _extension_registry = {} + + class _RegisteredExtensionType(type): + def __new__(mcs, name, bases, dct): + cls = super(TableMetadata._RegisteredExtensionType, mcs).__new__(mcs, name, bases, dct) + if name != 'RegisteredTableExtension': + TableMetadata._extension_registry[cls.name] = cls + return cls + def __init__(self, keyspace_name, name, partition_key=None, clustering_key=None, columns=None, triggers=None, options=None): self.keyspace_name = keyspace_name self.name = name @@ -1124,6 +1138,13 @@ class TableMetadata(object): for view_meta in self.views.values(): ret += "\n\n%s;" % (view_meta.as_cql_query(formatted=True),) + if self.extensions: # None + for k in six.viewkeys(self._extension_registry) & self.extensions: # no viewkeys on OrderedMapSerializeKey + ext = self._extension_registry[k] + cql = ext.after_table_cql(self, k, self.extensions[k]) + if cql: + ret += "\n\n%s" % (cql,) + return ret def as_cql_query(self, formatted=False): @@ -1205,7 +1226,6 @@ class TableMetadata(object): actual_options.setdefault("class", value) compaction_option_strings = ["'%s': '%s'" % (k, v) for k, v in actual_options.items()] - ret.append('compaction = {%s}' % ', '.join(compaction_option_strings)) for system_table_name in cls.compaction_options.keys(): options_copy.pop(system_table_name, None) # delete if present @@ -1225,6 +1245,31 @@ class TableMetadata(object): return list(sorted(ret)) +class TableExtensionInterface(object): + """ + Defines CQL/DDL for Cassandra table extensions. + """ + # limited API for now. Could be expanded as new extension types materialize -- "extend_option_strings", for example + @classmethod + def after_table_cql(cls, ext_key, ext_blob): + """ + Called to produce CQL/DDL to follow the table definition. + Should contain requisite terminating semicolon(s). + """ + pass + + +@six.add_metaclass(TableMetadata._RegisteredExtensionType) +class RegisteredTableExtension(TableExtensionInterface): + """ + Extending this class registers it by name (associated by key in the `system_schema.tables.extensions` map). + """ + name = None + """ + Name of the extension (key in the map) + """ + + def protect_name(name): return maybe_escape_name(name) @@ -2222,6 +2267,8 @@ class SchemaParserV3(SchemaParserV22): index_meta = self._build_index_metadata(table_meta, index_row) if index_meta: table_meta.indexes[index_meta.name] = index_meta + + table_meta.extensions = row.get('extensions', {}) except Exception: table_meta._exc_info = sys.exc_info() log.exception("Error while parsing metadata for table %s.%s row(%s) columns(%s)", keyspace_name, table_name, row, col_rows) diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index 40db8ac8..f5798593 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -29,13 +29,13 @@ from cassandra.cluster import Cluster from cassandra.encoder import Encoder from cassandra.metadata import (Metadata, KeyspaceMetadata, IndexMetadata, Token, MD5Token, TokenMap, murmur3, Function, Aggregate, protect_name, protect_names, - get_schema_parser) + get_schema_parser, RegisteredTableExtension) from cassandra.policies import SimpleConvictionPolicy from cassandra.pool import Host from tests.integration import get_cluster, use_singledc, PROTOCOL_VERSION, get_server_versions, execute_until_pass, \ BasicSegregatedKeyspaceUnitTestCase, BasicSharedKeyspaceUnitTestCase, BasicExistingKeyspaceUnitTestCase, drop_keyspace_shutdown_cluster, CASSANDRA_VERSION, \ - BasicExistingSegregatedKeyspaceUnitTestCase, dseonly, DSE_VERSION, get_supported_protocol_versions + BasicExistingSegregatedKeyspaceUnitTestCase, dseonly, DSE_VERSION, get_supported_protocol_versions, greaterthanorequalcass30 def setup_module(): @@ -858,6 +858,73 @@ class SchemaMetadataTests(BasicSegregatedKeyspaceUnitTestCase): self.assertEqual(index_2.index_options["target"], "keys(b)") self.assertEqual(index_2.keyspace_name, "schemametadatatests") + @greaterthanorequalcass30 + def test_table_extensions(self): + s = self.session + ks = self.keyspace_name + ks_meta = s.cluster.metadata.keyspaces[ks] + t = self.function_table_name + + s.execute("CREATE TABLE %s.%s (k text PRIMARY KEY, v int)" % (ks, t)) + + table_meta = ks_meta.tables[t] + + self.assertFalse(table_meta.extensions) + self.assertNotIn(t, table_meta._extension_registry) + + original_cql = table_meta.export_as_string() + + # extensions registered, not present + # -------------------------------------- + class Ext0(RegisteredTableExtension): + name = t + + @classmethod + def after_table_cql(cls, table_meta, ext_key, ext_blob): + return "%s %s %s %s" % (cls.name, table_meta.name, ext_key, ext_blob) + + class Ext1(Ext0): + name = t + '##' + + self.assertFalse(table_meta.extensions) + self.assertIn(Ext0.name, table_meta._extension_registry) + self.assertIn(Ext1.name, table_meta._extension_registry) + self.assertEqual(len(table_meta._extension_registry), 2) + + self.cluster.refresh_table_metadata(ks, t) + table_meta = ks_meta.tables[t] + + self.assertEqual(table_meta.export_as_string(), original_cql) + + p = s.prepare('UPDATE system_schema.tables SET extensions=? WHERE keyspace_name=? AND table_name=?') # for blob type coercing + # extensions registered, one present + # -------------------------------------- + ext_map = {Ext0.name: six.b("THA VALUE")} + s.execute(p, (ext_map, ks, t)) + self.cluster.refresh_table_metadata(ks, t) + table_meta = ks_meta.tables[t] + + self.assertIn(Ext0.name, table_meta.extensions) + new_cql = table_meta.export_as_string() + self.assertNotEqual(new_cql, original_cql) + self.assertIn(Ext0.after_table_cql(table_meta, Ext0.name, ext_map[Ext0.name]), new_cql) + self.assertNotIn(Ext1.name, new_cql) + + # extensions registered, one present + # -------------------------------------- + ext_map = {Ext0.name: six.b("THA VALUE"), + Ext1.name: six.b("OTHA VALUE")} + s.execute(p, (ext_map, ks, t)) + self.cluster.refresh_table_metadata(ks, t) + table_meta = ks_meta.tables[t] + + self.assertIn(Ext0.name, table_meta.extensions) + self.assertIn(Ext1.name, table_meta.extensions) + new_cql = table_meta.export_as_string() + self.assertNotEqual(new_cql, original_cql) + self.assertIn(Ext0.after_table_cql(table_meta, Ext0.name, ext_map[Ext0.name]), new_cql) + self.assertIn(Ext1.after_table_cql(table_meta, Ext1.name, ext_map[Ext1.name]), new_cql) + class TestCodeCoverage(unittest.TestCase):