Add interfaces for defining and registering table extensions

Cassandra 3.0+
This commit is contained in:
Adam Holmberg
2016-11-02 13:34:33 -05:00
parent 19c1603493
commit 5d7d880f7c
2 changed files with 117 additions and 3 deletions

View File

@@ -1076,6 +1076,20 @@ class TableMetadata(object):
return not incompatible return not incompatible
return True return True
extensions = None
"""
Metadata describing configuration for table extensions
"""
_extension_registry = {}
class _RegisteredExtensionType(type):
def __new__(mcs, name, bases, dct):
cls = super(TableMetadata._RegisteredExtensionType, mcs).__new__(mcs, name, bases, dct)
if name != 'RegisteredTableExtension':
TableMetadata._extension_registry[cls.name] = cls
return cls
def __init__(self, keyspace_name, name, partition_key=None, clustering_key=None, columns=None, triggers=None, options=None): def __init__(self, keyspace_name, name, partition_key=None, clustering_key=None, columns=None, triggers=None, options=None):
self.keyspace_name = keyspace_name self.keyspace_name = keyspace_name
self.name = name self.name = name
@@ -1124,6 +1138,13 @@ class TableMetadata(object):
for view_meta in self.views.values(): for view_meta in self.views.values():
ret += "\n\n%s;" % (view_meta.as_cql_query(formatted=True),) ret += "\n\n%s;" % (view_meta.as_cql_query(formatted=True),)
if self.extensions: # None
for k in six.viewkeys(self._extension_registry) & self.extensions: # no viewkeys on OrderedMapSerializeKey
ext = self._extension_registry[k]
cql = ext.after_table_cql(self, k, self.extensions[k])
if cql:
ret += "\n\n%s" % (cql,)
return ret return ret
def as_cql_query(self, formatted=False): def as_cql_query(self, formatted=False):
@@ -1205,7 +1226,6 @@ class TableMetadata(object):
actual_options.setdefault("class", value) actual_options.setdefault("class", value)
compaction_option_strings = ["'%s': '%s'" % (k, v) for k, v in actual_options.items()] compaction_option_strings = ["'%s': '%s'" % (k, v) for k, v in actual_options.items()]
ret.append('compaction = {%s}' % ', '.join(compaction_option_strings))
for system_table_name in cls.compaction_options.keys(): for system_table_name in cls.compaction_options.keys():
options_copy.pop(system_table_name, None) # delete if present options_copy.pop(system_table_name, None) # delete if present
@@ -1225,6 +1245,31 @@ class TableMetadata(object):
return list(sorted(ret)) return list(sorted(ret))
class TableExtensionInterface(object):
"""
Defines CQL/DDL for Cassandra table extensions.
"""
# limited API for now. Could be expanded as new extension types materialize -- "extend_option_strings", for example
@classmethod
def after_table_cql(cls, ext_key, ext_blob):
"""
Called to produce CQL/DDL to follow the table definition.
Should contain requisite terminating semicolon(s).
"""
pass
@six.add_metaclass(TableMetadata._RegisteredExtensionType)
class RegisteredTableExtension(TableExtensionInterface):
"""
Extending this class registers it by name (associated by key in the `system_schema.tables.extensions` map).
"""
name = None
"""
Name of the extension (key in the map)
"""
def protect_name(name): def protect_name(name):
return maybe_escape_name(name) return maybe_escape_name(name)
@@ -2222,6 +2267,8 @@ class SchemaParserV3(SchemaParserV22):
index_meta = self._build_index_metadata(table_meta, index_row) index_meta = self._build_index_metadata(table_meta, index_row)
if index_meta: if index_meta:
table_meta.indexes[index_meta.name] = index_meta table_meta.indexes[index_meta.name] = index_meta
table_meta.extensions = row.get('extensions', {})
except Exception: except Exception:
table_meta._exc_info = sys.exc_info() table_meta._exc_info = sys.exc_info()
log.exception("Error while parsing metadata for table %s.%s row(%s) columns(%s)", keyspace_name, table_name, row, col_rows) log.exception("Error while parsing metadata for table %s.%s row(%s) columns(%s)", keyspace_name, table_name, row, col_rows)

View File

@@ -29,13 +29,13 @@ from cassandra.cluster import Cluster
from cassandra.encoder import Encoder from cassandra.encoder import Encoder
from cassandra.metadata import (Metadata, KeyspaceMetadata, IndexMetadata, from cassandra.metadata import (Metadata, KeyspaceMetadata, IndexMetadata,
Token, MD5Token, TokenMap, murmur3, Function, Aggregate, protect_name, protect_names, Token, MD5Token, TokenMap, murmur3, Function, Aggregate, protect_name, protect_names,
get_schema_parser) get_schema_parser, RegisteredTableExtension)
from cassandra.policies import SimpleConvictionPolicy from cassandra.policies import SimpleConvictionPolicy
from cassandra.pool import Host from cassandra.pool import Host
from tests.integration import get_cluster, use_singledc, PROTOCOL_VERSION, get_server_versions, execute_until_pass, \ from tests.integration import get_cluster, use_singledc, PROTOCOL_VERSION, get_server_versions, execute_until_pass, \
BasicSegregatedKeyspaceUnitTestCase, BasicSharedKeyspaceUnitTestCase, BasicExistingKeyspaceUnitTestCase, drop_keyspace_shutdown_cluster, CASSANDRA_VERSION, \ BasicSegregatedKeyspaceUnitTestCase, BasicSharedKeyspaceUnitTestCase, BasicExistingKeyspaceUnitTestCase, drop_keyspace_shutdown_cluster, CASSANDRA_VERSION, \
BasicExistingSegregatedKeyspaceUnitTestCase, dseonly, DSE_VERSION, get_supported_protocol_versions BasicExistingSegregatedKeyspaceUnitTestCase, dseonly, DSE_VERSION, get_supported_protocol_versions, greaterthanorequalcass30
def setup_module(): def setup_module():
@@ -858,6 +858,73 @@ class SchemaMetadataTests(BasicSegregatedKeyspaceUnitTestCase):
self.assertEqual(index_2.index_options["target"], "keys(b)") self.assertEqual(index_2.index_options["target"], "keys(b)")
self.assertEqual(index_2.keyspace_name, "schemametadatatests") self.assertEqual(index_2.keyspace_name, "schemametadatatests")
@greaterthanorequalcass30
def test_table_extensions(self):
s = self.session
ks = self.keyspace_name
ks_meta = s.cluster.metadata.keyspaces[ks]
t = self.function_table_name
s.execute("CREATE TABLE %s.%s (k text PRIMARY KEY, v int)" % (ks, t))
table_meta = ks_meta.tables[t]
self.assertFalse(table_meta.extensions)
self.assertNotIn(t, table_meta._extension_registry)
original_cql = table_meta.export_as_string()
# extensions registered, not present
# --------------------------------------
class Ext0(RegisteredTableExtension):
name = t
@classmethod
def after_table_cql(cls, table_meta, ext_key, ext_blob):
return "%s %s %s %s" % (cls.name, table_meta.name, ext_key, ext_blob)
class Ext1(Ext0):
name = t + '##'
self.assertFalse(table_meta.extensions)
self.assertIn(Ext0.name, table_meta._extension_registry)
self.assertIn(Ext1.name, table_meta._extension_registry)
self.assertEqual(len(table_meta._extension_registry), 2)
self.cluster.refresh_table_metadata(ks, t)
table_meta = ks_meta.tables[t]
self.assertEqual(table_meta.export_as_string(), original_cql)
p = s.prepare('UPDATE system_schema.tables SET extensions=? WHERE keyspace_name=? AND table_name=?') # for blob type coercing
# extensions registered, one present
# --------------------------------------
ext_map = {Ext0.name: six.b("THA VALUE")}
s.execute(p, (ext_map, ks, t))
self.cluster.refresh_table_metadata(ks, t)
table_meta = ks_meta.tables[t]
self.assertIn(Ext0.name, table_meta.extensions)
new_cql = table_meta.export_as_string()
self.assertNotEqual(new_cql, original_cql)
self.assertIn(Ext0.after_table_cql(table_meta, Ext0.name, ext_map[Ext0.name]), new_cql)
self.assertNotIn(Ext1.name, new_cql)
# extensions registered, one present
# --------------------------------------
ext_map = {Ext0.name: six.b("THA VALUE"),
Ext1.name: six.b("OTHA VALUE")}
s.execute(p, (ext_map, ks, t))
self.cluster.refresh_table_metadata(ks, t)
table_meta = ks_meta.tables[t]
self.assertIn(Ext0.name, table_meta.extensions)
self.assertIn(Ext1.name, table_meta.extensions)
new_cql = table_meta.export_as_string()
self.assertNotEqual(new_cql, original_cql)
self.assertIn(Ext0.after_table_cql(table_meta, Ext0.name, ext_map[Ext0.name]), new_cql)
self.assertIn(Ext1.after_table_cql(table_meta, Ext1.name, ext_map[Ext1.name]), new_cql)
class TestCodeCoverage(unittest.TestCase): class TestCodeCoverage(unittest.TestCase):