Merge pull request #675 from datastax/table-extensions
Add interfaces for defining and registering table extensions
This commit is contained in:
		| @@ -1076,6 +1076,20 @@ class TableMetadata(object): | |||||||
|             return not incompatible |             return not incompatible | ||||||
|         return True |         return True | ||||||
|  |  | ||||||
|  |     extensions = None | ||||||
|  |     """ | ||||||
|  |     Metadata describing configuration for table extensions | ||||||
|  |     """ | ||||||
|  |  | ||||||
|  |     _extension_registry = {} | ||||||
|  |  | ||||||
|  |     class _RegisteredExtensionType(type): | ||||||
|  |         def __new__(mcs, name, bases, dct): | ||||||
|  |             cls = super(TableMetadata._RegisteredExtensionType, mcs).__new__(mcs, name, bases, dct) | ||||||
|  |             if name != 'RegisteredTableExtension': | ||||||
|  |                 TableMetadata._extension_registry[cls.name] = cls | ||||||
|  |             return cls | ||||||
|  |  | ||||||
|     def __init__(self, keyspace_name, name, partition_key=None, clustering_key=None, columns=None, triggers=None, options=None): |     def __init__(self, keyspace_name, name, partition_key=None, clustering_key=None, columns=None, triggers=None, options=None): | ||||||
|         self.keyspace_name = keyspace_name |         self.keyspace_name = keyspace_name | ||||||
|         self.name = name |         self.name = name | ||||||
| @@ -1124,6 +1138,13 @@ class TableMetadata(object): | |||||||
|         for view_meta in self.views.values(): |         for view_meta in self.views.values(): | ||||||
|             ret += "\n\n%s;" % (view_meta.as_cql_query(formatted=True),) |             ret += "\n\n%s;" % (view_meta.as_cql_query(formatted=True),) | ||||||
|  |  | ||||||
|  |         if self.extensions:  # None | ||||||
|  |             for k in six.viewkeys(self._extension_registry) & self.extensions:  # no viewkeys on OrderedMapSerializeKey | ||||||
|  |                 ext = self._extension_registry[k] | ||||||
|  |                 cql = ext.after_table_cql(self, k, self.extensions[k]) | ||||||
|  |                 if cql: | ||||||
|  |                     ret += "\n\n%s" % (cql,) | ||||||
|  |  | ||||||
|         return ret |         return ret | ||||||
|  |  | ||||||
|     def as_cql_query(self, formatted=False): |     def as_cql_query(self, formatted=False): | ||||||
| @@ -1205,7 +1226,6 @@ class TableMetadata(object): | |||||||
|         actual_options.setdefault("class", value) |         actual_options.setdefault("class", value) | ||||||
|  |  | ||||||
|         compaction_option_strings = ["'%s': '%s'" % (k, v) for k, v in actual_options.items()] |         compaction_option_strings = ["'%s': '%s'" % (k, v) for k, v in actual_options.items()] | ||||||
|         ret.append('compaction = {%s}' % ', '.join(compaction_option_strings)) |  | ||||||
|  |  | ||||||
|         for system_table_name in cls.compaction_options.keys(): |         for system_table_name in cls.compaction_options.keys(): | ||||||
|             options_copy.pop(system_table_name, None)  # delete if present |             options_copy.pop(system_table_name, None)  # delete if present | ||||||
| @@ -1225,6 +1245,31 @@ class TableMetadata(object): | |||||||
|         return list(sorted(ret)) |         return list(sorted(ret)) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class TableExtensionInterface(object): | ||||||
|  |     """ | ||||||
|  |     Defines CQL/DDL for Cassandra table extensions. | ||||||
|  |     """ | ||||||
|  |     # limited API for now. Could be expanded as new extension types materialize -- "extend_option_strings", for example | ||||||
|  |     @classmethod | ||||||
|  |     def after_table_cql(cls, ext_key, ext_blob): | ||||||
|  |         """ | ||||||
|  |         Called to produce CQL/DDL to follow the table definition. | ||||||
|  |         Should contain requisite terminating semicolon(s). | ||||||
|  |         """ | ||||||
|  |         pass | ||||||
|  |  | ||||||
|  |  | ||||||
|  | @six.add_metaclass(TableMetadata._RegisteredExtensionType) | ||||||
|  | class RegisteredTableExtension(TableExtensionInterface): | ||||||
|  |     """ | ||||||
|  |     Extending this class registers it by name (associated by key in the `system_schema.tables.extensions` map). | ||||||
|  |     """ | ||||||
|  |     name = None | ||||||
|  |     """ | ||||||
|  |     Name of the extension (key in the map) | ||||||
|  |     """ | ||||||
|  |  | ||||||
|  |  | ||||||
| def protect_name(name): | def protect_name(name): | ||||||
|     return maybe_escape_name(name) |     return maybe_escape_name(name) | ||||||
|  |  | ||||||
| @@ -2222,6 +2267,8 @@ class SchemaParserV3(SchemaParserV22): | |||||||
|                 index_meta = self._build_index_metadata(table_meta, index_row) |                 index_meta = self._build_index_metadata(table_meta, index_row) | ||||||
|                 if index_meta: |                 if index_meta: | ||||||
|                     table_meta.indexes[index_meta.name] = index_meta |                     table_meta.indexes[index_meta.name] = index_meta | ||||||
|  |  | ||||||
|  |             table_meta.extensions = row.get('extensions', {}) | ||||||
|         except Exception: |         except Exception: | ||||||
|             table_meta._exc_info = sys.exc_info() |             table_meta._exc_info = sys.exc_info() | ||||||
|             log.exception("Error while parsing metadata for table %s.%s row(%s) columns(%s)", keyspace_name, table_name, row, col_rows) |             log.exception("Error while parsing metadata for table %s.%s row(%s) columns(%s)", keyspace_name, table_name, row, col_rows) | ||||||
|   | |||||||
| @@ -29,13 +29,13 @@ from cassandra.cluster import Cluster | |||||||
| from cassandra.encoder import Encoder | from cassandra.encoder import Encoder | ||||||
| from cassandra.metadata import (Metadata, KeyspaceMetadata, IndexMetadata, | from cassandra.metadata import (Metadata, KeyspaceMetadata, IndexMetadata, | ||||||
|                                 Token, MD5Token, TokenMap, murmur3, Function, Aggregate, protect_name, protect_names, |                                 Token, MD5Token, TokenMap, murmur3, Function, Aggregate, protect_name, protect_names, | ||||||
|                                 get_schema_parser) |                                 get_schema_parser, RegisteredTableExtension) | ||||||
| from cassandra.policies import SimpleConvictionPolicy | from cassandra.policies import SimpleConvictionPolicy | ||||||
| from cassandra.pool import Host | from cassandra.pool import Host | ||||||
|  |  | ||||||
| from tests.integration import get_cluster, use_singledc, PROTOCOL_VERSION, get_server_versions, execute_until_pass, \ | from tests.integration import get_cluster, use_singledc, PROTOCOL_VERSION, get_server_versions, execute_until_pass, \ | ||||||
|     BasicSegregatedKeyspaceUnitTestCase, BasicSharedKeyspaceUnitTestCase, BasicExistingKeyspaceUnitTestCase, drop_keyspace_shutdown_cluster, CASSANDRA_VERSION, \ |     BasicSegregatedKeyspaceUnitTestCase, BasicSharedKeyspaceUnitTestCase, BasicExistingKeyspaceUnitTestCase, drop_keyspace_shutdown_cluster, CASSANDRA_VERSION, \ | ||||||
|     BasicExistingSegregatedKeyspaceUnitTestCase, dseonly, DSE_VERSION, get_supported_protocol_versions |     BasicExistingSegregatedKeyspaceUnitTestCase, dseonly, DSE_VERSION, get_supported_protocol_versions, greaterthanorequalcass30 | ||||||
|  |  | ||||||
|  |  | ||||||
| def setup_module(): | def setup_module(): | ||||||
| @@ -858,6 +858,73 @@ class SchemaMetadataTests(BasicSegregatedKeyspaceUnitTestCase): | |||||||
|         self.assertEqual(index_2.index_options["target"], "keys(b)") |         self.assertEqual(index_2.index_options["target"], "keys(b)") | ||||||
|         self.assertEqual(index_2.keyspace_name, "schemametadatatests") |         self.assertEqual(index_2.keyspace_name, "schemametadatatests") | ||||||
|  |  | ||||||
|  |     @greaterthanorequalcass30 | ||||||
|  |     def test_table_extensions(self): | ||||||
|  |         s = self.session | ||||||
|  |         ks = self.keyspace_name | ||||||
|  |         ks_meta = s.cluster.metadata.keyspaces[ks] | ||||||
|  |         t = self.function_table_name | ||||||
|  |  | ||||||
|  |         s.execute("CREATE TABLE %s.%s (k text PRIMARY KEY, v int)" % (ks, t)) | ||||||
|  |  | ||||||
|  |         table_meta = ks_meta.tables[t] | ||||||
|  |  | ||||||
|  |         self.assertFalse(table_meta.extensions) | ||||||
|  |         self.assertNotIn(t, table_meta._extension_registry) | ||||||
|  |  | ||||||
|  |         original_cql = table_meta.export_as_string() | ||||||
|  |  | ||||||
|  |         # extensions registered, not present | ||||||
|  |         # -------------------------------------- | ||||||
|  |         class Ext0(RegisteredTableExtension): | ||||||
|  |             name = t | ||||||
|  |  | ||||||
|  |             @classmethod | ||||||
|  |             def after_table_cql(cls, table_meta, ext_key, ext_blob): | ||||||
|  |                 return "%s %s %s %s" % (cls.name, table_meta.name, ext_key, ext_blob) | ||||||
|  |  | ||||||
|  |         class Ext1(Ext0): | ||||||
|  |             name = t + '##' | ||||||
|  |  | ||||||
|  |         self.assertFalse(table_meta.extensions) | ||||||
|  |         self.assertIn(Ext0.name, table_meta._extension_registry) | ||||||
|  |         self.assertIn(Ext1.name, table_meta._extension_registry) | ||||||
|  |         self.assertEqual(len(table_meta._extension_registry), 2) | ||||||
|  |  | ||||||
|  |         self.cluster.refresh_table_metadata(ks, t) | ||||||
|  |         table_meta = ks_meta.tables[t] | ||||||
|  |  | ||||||
|  |         self.assertEqual(table_meta.export_as_string(), original_cql) | ||||||
|  |  | ||||||
|  |         p = s.prepare('UPDATE system_schema.tables SET extensions=? WHERE keyspace_name=? AND table_name=?')  # for blob type coercing | ||||||
|  |         # extensions registered, one present | ||||||
|  |         # -------------------------------------- | ||||||
|  |         ext_map = {Ext0.name: six.b("THA VALUE")} | ||||||
|  |         s.execute(p, (ext_map, ks, t)) | ||||||
|  |         self.cluster.refresh_table_metadata(ks, t) | ||||||
|  |         table_meta = ks_meta.tables[t] | ||||||
|  |  | ||||||
|  |         self.assertIn(Ext0.name, table_meta.extensions) | ||||||
|  |         new_cql = table_meta.export_as_string() | ||||||
|  |         self.assertNotEqual(new_cql, original_cql) | ||||||
|  |         self.assertIn(Ext0.after_table_cql(table_meta, Ext0.name, ext_map[Ext0.name]), new_cql) | ||||||
|  |         self.assertNotIn(Ext1.name, new_cql) | ||||||
|  |  | ||||||
|  |         # extensions registered, one present | ||||||
|  |         # -------------------------------------- | ||||||
|  |         ext_map = {Ext0.name: six.b("THA VALUE"), | ||||||
|  |                    Ext1.name: six.b("OTHA VALUE")} | ||||||
|  |         s.execute(p, (ext_map, ks, t)) | ||||||
|  |         self.cluster.refresh_table_metadata(ks, t) | ||||||
|  |         table_meta = ks_meta.tables[t] | ||||||
|  |  | ||||||
|  |         self.assertIn(Ext0.name, table_meta.extensions) | ||||||
|  |         self.assertIn(Ext1.name, table_meta.extensions) | ||||||
|  |         new_cql = table_meta.export_as_string() | ||||||
|  |         self.assertNotEqual(new_cql, original_cql) | ||||||
|  |         self.assertIn(Ext0.after_table_cql(table_meta, Ext0.name, ext_map[Ext0.name]), new_cql) | ||||||
|  |         self.assertIn(Ext1.after_table_cql(table_meta, Ext1.name, ext_map[Ext1.name]), new_cql) | ||||||
|  |  | ||||||
|  |  | ||||||
| class TestCodeCoverage(unittest.TestCase): | class TestCodeCoverage(unittest.TestCase): | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Adam Holmberg
					Adam Holmberg