Files
deb-python-cassandra-driver/tests/integration/standard/test_metadata.py
2014-07-25 13:29:09 -05:00

561 lines
22 KiB
Python

# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import six
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
from mock import Mock
from cassandra import AlreadyExists
from cassandra.cluster import Cluster
from cassandra.metadata import (Metadata, KeyspaceMetadata, TableMetadata,
Token, MD5Token, TokenMap, murmur3)
from cassandra.policies import SimpleConvictionPolicy
from cassandra.pool import Host
from tests.integration import get_cluster, PROTOCOL_VERSION, get_server_versions
class SchemaMetadataTest(unittest.TestCase):
ksname = "schemametadatatest"
@property
def cfname(self):
return self._testMethodName.lower()
@classmethod
def setup_class(cls):
cluster = Cluster(protocol_version=PROTOCOL_VERSION)
session = cluster.connect()
try:
results = session.execute("SELECT keyspace_name FROM system.schema_keyspaces")
existing_keyspaces = [row[0] for row in results]
if cls.ksname in existing_keyspaces:
session.execute("DROP KEYSPACE %s" % cls.ksname)
session.execute(
"""
CREATE KEYSPACE %s
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
""" % cls.ksname)
finally:
cluster.shutdown()
@classmethod
def teardown_class(cls):
cluster = Cluster(['127.0.0.1'],
protocol_version=PROTOCOL_VERSION)
session = cluster.connect()
try:
session.execute("DROP KEYSPACE %s" % cls.ksname)
finally:
cluster.shutdown()
def setUp(self):
self.cluster = Cluster(['127.0.0.1'],
protocol_version=PROTOCOL_VERSION)
self.session = self.cluster.connect()
def tearDown(self):
try:
self.session.execute(
"""
DROP TABLE {ksname}.{cfname}
""".format(ksname=self.ksname, cfname=self.cfname))
finally:
self.cluster.shutdown()
def make_create_statement(self, partition_cols, clustering_cols=None, other_cols=None, compact=False):
clustering_cols = clustering_cols or []
other_cols = other_cols or []
statement = "CREATE TABLE %s.%s (" % (self.ksname, self.cfname)
if len(partition_cols) == 1 and not clustering_cols:
statement += "%s text PRIMARY KEY, " % partition_cols[0]
else:
statement += ", ".join("%s text" % col for col in partition_cols)
statement += ", "
statement += ", ".join("%s text" % col for col in clustering_cols + other_cols)
if len(partition_cols) != 1 or clustering_cols:
statement += ", PRIMARY KEY ("
if len(partition_cols) > 1:
statement += "(" + ", ".join(partition_cols) + ")"
else:
statement += partition_cols[0]
if clustering_cols:
statement += ", "
statement += ", ".join(clustering_cols)
statement += ")"
statement += ")"
if compact:
statement += " WITH COMPACT STORAGE"
return statement
def check_create_statement(self, tablemeta, original):
recreate = tablemeta.as_cql_query(formatted=False)
self.assertEqual(original, recreate[:len(original)])
self.session.execute("DROP TABLE %s.%s" % (self.ksname, self.cfname))
self.session.execute(recreate)
# create the table again, but with formatting enabled
self.session.execute("DROP TABLE %s.%s" % (self.ksname, self.cfname))
recreate = tablemeta.as_cql_query(formatted=True)
self.session.execute(recreate)
def get_table_metadata(self):
self.cluster.control_connection.refresh_schema()
return self.cluster.metadata.keyspaces[self.ksname].tables[self.cfname]
def test_basic_table_meta_properties(self):
create_statement = self.make_create_statement(["a"], [], ["b", "c"])
self.session.execute(create_statement)
self.cluster.control_connection.refresh_schema()
meta = self.cluster.metadata
self.assertNotEqual(meta.cluster_ref, None)
self.assertNotEqual(meta.cluster_name, None)
self.assertTrue(self.ksname in meta.keyspaces)
ksmeta = meta.keyspaces[self.ksname]
self.assertEqual(ksmeta.name, self.ksname)
self.assertTrue(ksmeta.durable_writes)
self.assertEqual(ksmeta.replication_strategy.name, 'SimpleStrategy')
self.assertEqual(ksmeta.replication_strategy.replication_factor, 1)
self.assertTrue(self.cfname in ksmeta.tables)
tablemeta = ksmeta.tables[self.cfname]
self.assertEqual(tablemeta.keyspace, ksmeta)
self.assertEqual(tablemeta.name, self.cfname)
self.assertEqual([u'a'], [c.name for c in tablemeta.partition_key])
self.assertEqual([], tablemeta.clustering_key)
self.assertEqual([u'a', u'b', u'c'], sorted(tablemeta.columns.keys()))
for option in tablemeta.options:
self.assertIn(option, TableMetadata.recognized_options)
self.check_create_statement(tablemeta, create_statement)
def test_compound_primary_keys(self):
create_statement = self.make_create_statement(["a"], ["b"], ["c"])
create_statement += " WITH CLUSTERING ORDER BY (b ASC)"
self.session.execute(create_statement)
tablemeta = self.get_table_metadata()
self.assertEqual([u'a'], [c.name for c in tablemeta.partition_key])
self.assertEqual([u'b'], [c.name for c in tablemeta.clustering_key])
self.assertEqual([u'a', u'b', u'c'], sorted(tablemeta.columns.keys()))
self.check_create_statement(tablemeta, create_statement)
def test_compound_primary_keys_more_columns(self):
create_statement = self.make_create_statement(["a"], ["b", "c"], ["d", "e", "f"])
create_statement += " WITH CLUSTERING ORDER BY (b ASC, c ASC)"
self.session.execute(create_statement)
tablemeta = self.get_table_metadata()
self.assertEqual([u'a'], [c.name for c in tablemeta.partition_key])
self.assertEqual([u'b', u'c'], [c.name for c in tablemeta.clustering_key])
self.assertEqual(
[u'a', u'b', u'c', u'd', u'e', u'f'],
sorted(tablemeta.columns.keys()))
self.check_create_statement(tablemeta, create_statement)
def test_composite_primary_key(self):
create_statement = self.make_create_statement(["a", "b"], [], ["c"])
self.session.execute(create_statement)
tablemeta = self.get_table_metadata()
self.assertEqual([u'a', u'b'], [c.name for c in tablemeta.partition_key])
self.assertEqual([], tablemeta.clustering_key)
self.assertEqual([u'a', u'b', u'c'], sorted(tablemeta.columns.keys()))
self.check_create_statement(tablemeta, create_statement)
def test_composite_in_compound_primary_key(self):
create_statement = self.make_create_statement(["a", "b"], ["c"], ["d", "e"])
create_statement += " WITH CLUSTERING ORDER BY (c ASC)"
self.session.execute(create_statement)
tablemeta = self.get_table_metadata()
self.assertEqual([u'a', u'b'], [c.name for c in tablemeta.partition_key])
self.assertEqual([u'c'], [c.name for c in tablemeta.clustering_key])
self.assertEqual([u'a', u'b', u'c', u'd', u'e'], sorted(tablemeta.columns.keys()))
self.check_create_statement(tablemeta, create_statement)
def test_compound_primary_keys_compact(self):
create_statement = self.make_create_statement(["a"], ["b"], ["c"], compact=True)
create_statement += " AND CLUSTERING ORDER BY (b ASC)"
self.session.execute(create_statement)
tablemeta = self.get_table_metadata()
self.assertEqual([u'a'], [c.name for c in tablemeta.partition_key])
self.assertEqual([u'b'], [c.name for c in tablemeta.clustering_key])
self.assertEqual([u'a', u'b', u'c'], sorted(tablemeta.columns.keys()))
self.check_create_statement(tablemeta, create_statement)
def test_compound_primary_keys_more_columns_compact(self):
create_statement = self.make_create_statement(["a"], ["b", "c"], ["d"], compact=True)
create_statement += " AND CLUSTERING ORDER BY (b ASC, c ASC)"
self.session.execute(create_statement)
tablemeta = self.get_table_metadata()
self.assertEqual([u'a'], [c.name for c in tablemeta.partition_key])
self.assertEqual([u'b', u'c'], [c.name for c in tablemeta.clustering_key])
self.assertEqual([u'a', u'b', u'c', u'd'], sorted(tablemeta.columns.keys()))
self.check_create_statement(tablemeta, create_statement)
def test_composite_primary_key_compact(self):
create_statement = self.make_create_statement(["a", "b"], [], ["c"], compact=True)
self.session.execute(create_statement)
tablemeta = self.get_table_metadata()
self.assertEqual([u'a', u'b'], [c.name for c in tablemeta.partition_key])
self.assertEqual([], tablemeta.clustering_key)
self.assertEqual([u'a', u'b', u'c'], sorted(tablemeta.columns.keys()))
self.check_create_statement(tablemeta, create_statement)
def test_composite_in_compound_primary_key_compact(self):
create_statement = self.make_create_statement(["a", "b"], ["c"], ["d"], compact=True)
create_statement += " AND CLUSTERING ORDER BY (c ASC)"
self.session.execute(create_statement)
tablemeta = self.get_table_metadata()
self.assertEqual([u'a', u'b'], [c.name for c in tablemeta.partition_key])
self.assertEqual([u'c'], [c.name for c in tablemeta.clustering_key])
self.assertEqual([u'a', u'b', u'c', u'd'], sorted(tablemeta.columns.keys()))
self.check_create_statement(tablemeta, create_statement)
def test_compound_primary_keys_ordering(self):
create_statement = self.make_create_statement(["a"], ["b"], ["c"])
create_statement += " WITH CLUSTERING ORDER BY (b DESC)"
self.session.execute(create_statement)
tablemeta = self.get_table_metadata()
self.check_create_statement(tablemeta, create_statement)
def test_compound_primary_keys_more_columns_ordering(self):
create_statement = self.make_create_statement(["a"], ["b", "c"], ["d", "e", "f"])
create_statement += " WITH CLUSTERING ORDER BY (b DESC, c ASC)"
self.session.execute(create_statement)
tablemeta = self.get_table_metadata()
self.check_create_statement(tablemeta, create_statement)
def test_composite_in_compound_primary_key_ordering(self):
create_statement = self.make_create_statement(["a", "b"], ["c"], ["d", "e"])
create_statement += " WITH CLUSTERING ORDER BY (c DESC)"
self.session.execute(create_statement)
tablemeta = self.get_table_metadata()
self.check_create_statement(tablemeta, create_statement)
def test_indexes(self):
create_statement = self.make_create_statement(["a"], ["b", "c"], ["d", "e", "f"])
create_statement += " WITH CLUSTERING ORDER BY (b ASC, c ASC)"
self.session.execute(create_statement)
d_index = "CREATE INDEX d_index ON %s.%s (d)" % (self.ksname, self.cfname)
e_index = "CREATE INDEX e_index ON %s.%s (e)" % (self.ksname, self.cfname)
self.session.execute(d_index)
self.session.execute(e_index)
tablemeta = self.get_table_metadata()
statements = tablemeta.export_as_string().strip()
statements = [s.strip() for s in statements.split(';')]
statements = list(filter(bool, statements))
self.assertEqual(3, len(statements))
self.assertEqual(d_index, statements[1])
self.assertEqual(e_index, statements[2])
# make sure indexes are included in KeyspaceMetadata.export_as_string()
ksmeta = self.cluster.metadata.keyspaces[self.ksname]
statement = ksmeta.export_as_string()
self.assertIn('CREATE INDEX d_index', statement)
self.assertIn('CREATE INDEX e_index', statement)
class TestCodeCoverage(unittest.TestCase):
    """
    Integration tests covering schema export, identifier quoting,
    ``AlreadyExists`` errors, replica lookup and the token map.

    Every test opens its own cluster connection and shuts it down again in
    a ``finally`` block so that a failing assertion does not leave the
    connection open.
    """

    def test_export_schema(self):
        """
        Test export schema functionality
        """

        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        cluster.connect()
        try:
            self.assertIsInstance(cluster.metadata.export_schema_as_string(), six.string_types)
        finally:
            cluster.shutdown()

    def test_export_keyspace_schema(self):
        """
        Test export keyspace schema functionality
        """

        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        cluster.connect()
        try:
            for keyspace in cluster.metadata.keyspaces:
                keyspace_metadata = cluster.metadata.keyspaces[keyspace]
                self.assertIsInstance(keyspace_metadata.export_as_string(), six.string_types)
                self.assertIsInstance(keyspace_metadata.as_cql_query(), six.string_types)
        finally:
            cluster.shutdown()

    def test_export_keyspace_schema_udts(self):
        """
        Test udt exports
        """

        if get_server_versions()[0] < (2, 1, 0):
            raise unittest.SkipTest('UDTs were introduced in Cassandra 2.1')

        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        session = cluster.connect()
        try:
            session.execute("""
CREATE KEYSPACE export_udts
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}
AND durable_writes = true;
""")
            session.execute("""
CREATE TYPE export_udts.street (
street_number int,
street_name text)
""")
            session.execute("""
CREATE TYPE export_udts.zip (
zipcode int,
zip_plus_4 int)
""")
            session.execute("""
CREATE TYPE export_udts.address (
street_address street,
zip_code zip)
""")
            session.execute("""
CREATE TABLE export_udts.users (
user text PRIMARY KEY,
addresses map<text, address>)
""")

            # The exports are compared verbatim, so every character of the
            # expected text (including whitespace) is significant.
            expected_string = """CREATE KEYSPACE export_udts WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;
CREATE TYPE export_udts.street (
street_number int,
street_name text
);
CREATE TYPE export_udts.zip (
zipcode int,
zip_plus_4 int
);
CREATE TYPE export_udts.address (
street_address street,
zip_code zip
);
CREATE TABLE export_udts.users (
user text PRIMARY KEY,
addresses map<text, address>
) WITH bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
AND comment = ''
AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';"""

            self.assertEqual(cluster.metadata.keyspaces['export_udts'].export_as_string(), expected_string)

            table_meta = cluster.metadata.keyspaces['export_udts'].tables['users']

            expected_string = """CREATE TABLE export_udts.users (
user text PRIMARY KEY,
addresses map<text, address>
) WITH bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
AND comment = ''
AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';"""

            self.assertEqual(table_meta.export_as_string(), expected_string)
        finally:
            cluster.shutdown()

    def test_case_sensitivity(self):
        """
        Test that names that need to be escaped in CREATE statements are
        quoted in the exported schema
        """

        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        session = cluster.connect()
        try:
            ksname = 'AnInterestingKeyspace'
            cfname = 'AnInterestingTable'

            session.execute("""
CREATE KEYSPACE "%s"
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}
""" % (ksname,))
            session.execute("""
CREATE TABLE "%s"."%s" (
k int,
"A" int,
"B" int,
"MyColumn" int,
PRIMARY KEY (k, "A"))
WITH CLUSTERING ORDER BY ("A" DESC)
""" % (ksname, cfname))
            session.execute("""
CREATE INDEX myindex ON "%s"."%s" ("MyColumn")
""" % (ksname, cfname))

            ksmeta = cluster.metadata.keyspaces[ksname]
            schema = ksmeta.export_as_string()
            self.assertIn('CREATE KEYSPACE "AnInterestingKeyspace"', schema)
            self.assertIn('CREATE TABLE "AnInterestingKeyspace"."AnInterestingTable"', schema)
            self.assertIn('"A" int', schema)
            self.assertIn('"B" int', schema)
            self.assertIn('"MyColumn" int', schema)
            self.assertIn('PRIMARY KEY (k, "A")', schema)
            self.assertIn('WITH CLUSTERING ORDER BY ("A" DESC)', schema)
            self.assertIn('CREATE INDEX myindex ON "AnInterestingKeyspace"."AnInterestingTable" ("MyColumn")', schema)
        finally:
            cluster.shutdown()

    def test_already_exists_exceptions(self):
        """
        Ensure AlreadyExists exception is thrown when hit
        """

        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        session = cluster.connect()
        try:
            # NOTE(review): relies on keyspace test3rf and table test3rf.test
            # already existing before this test runs -- presumably created by
            # the integration test setup; confirm.
            ksname = 'test3rf'
            cfname = 'test'

            ddl = '''
CREATE KEYSPACE %s
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}'''
            self.assertRaises(AlreadyExists, session.execute, ddl % ksname)

            ddl = '''
CREATE TABLE %s.%s (
k int PRIMARY KEY,
v int )'''
            self.assertRaises(AlreadyExists, session.execute, ddl % (ksname, cfname))
        finally:
            cluster.shutdown()

    def test_replicas(self):
        """
        Ensure cluster.metadata.get_replicas return correctly when not attached to keyspace
        """
        if murmur3 is None:
            raise unittest.SkipTest('the murmur3 extension is not available')

        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        # Before connecting, no replicas are expected to be reported.
        self.assertEqual(cluster.metadata.get_replicas('test3rf', 'key'), [])

        cluster.connect('test3rf')
        try:
            self.assertNotEqual(list(cluster.metadata.get_replicas('test3rf', 'key')), [])
            host = list(cluster.metadata.get_replicas('test3rf', 'key'))[0]
            self.assertEqual(host.datacenter, 'datacenter1')
            self.assertEqual(host.rack, 'rack1')
        finally:
            cluster.shutdown()

    def test_token_map(self):
        """
        Test token mappings
        """

        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        cluster.connect('test3rf')
        try:
            token_map = cluster.metadata.token_map
            ring = token_map.ring
            owners = [token_map.token_to_host_owner[token] for token in ring]
            get_replicas = token_map.get_replicas

            for ksname in ('test1rf', 'test2rf', 'test3rf'):
                self.assertNotEqual(list(get_replicas(ksname, ring[0])), [])

            # NOTE(review): the "% 3" below hard-codes a ring of three tokens;
            # presumably the test cluster has three single-token nodes.
            for i, token in enumerate(ring):
                self.assertEqual(set(get_replicas('test3rf', token)), set(owners))
                self.assertEqual(set(get_replicas('test2rf', token)), set([owners[(i + 1) % 3], owners[(i + 2) % 3]]))
                self.assertEqual(set(get_replicas('test1rf', token)), set([owners[(i + 1) % 3]]))
        finally:
            cluster.shutdown()
class TokenMetadataTest(unittest.TestCase):
    """
    Test of TokenMap creation and other behavior.
    """

    def test_token(self):
        """Check the token class and that the ring has one entry per cluster node."""
        expected_node_count = len(get_cluster().nodes)

        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        cluster.connect()
        try:
            tmap = cluster.metadata.token_map
            self.assertTrue(issubclass(tmap.token_class, Token))
            self.assertEqual(expected_node_count, len(tmap.ring))
        finally:
            # Shut down even when an assertion fails so the connection
            # is not left open.
            cluster.shutdown()

    def test_getting_replicas(self):
        """Check replica lookup on a hand-built TokenMap with replication factor 1."""
        # Eight evenly spaced tokens, each paired with its own host.
        tokens = [MD5Token(str(i)) for i in range(0, (2 ** 127 - 1), 2 ** 125)]
        hosts = [Host("ip%d" % i, SimpleConvictionPolicy) for i in range(len(tokens))]
        token_to_primary_replica = dict(zip(tokens, hosts))
        keyspace = KeyspaceMetadata("ks", True, "SimpleStrategy", {"replication_factor": "1"})
        # Only the ``keyspaces`` attribute of the metadata is supplied.
        metadata = Mock(spec=Metadata, keyspaces={'ks': keyspace})
        token_map = TokenMap(MD5Token, token_to_primary_replica, tokens, metadata)

        # tokens match node tokens exactly
        for i, token in enumerate(tokens):
            expected_host = hosts[(i + 1) % len(hosts)]
            replicas = token_map.get_replicas("ks", token)
            self.assertEqual(set(replicas), set([expected_host]))

        # shift the tokens back by one
        for token, expected_host in zip(tokens, hosts):
            replicas = token_map.get_replicas("ks", MD5Token(str(token.value - 1)))
            self.assertEqual(set(replicas), set([expected_host]))

        # shift the tokens forward by one
        for i, token in enumerate(tokens):
            replicas = token_map.get_replicas("ks", MD5Token(str(token.value + 1)))
            expected_host = hosts[(i + 1) % len(hosts)]
            self.assertEqual(set(replicas), set([expected_host]))