This commit is contained in:
Adam Holmberg
2015-05-22 09:51:31 -05:00
2 changed files with 270 additions and 14 deletions

View File

@@ -248,7 +248,7 @@ class ClusterTests(unittest.TestCase):
original_meta = cluster.metadata.keyspaces original_meta = cluster.metadata.keyspaces
# full schema refresh, with wait # full schema refresh, with wait
cluster.refresh_schema() cluster.refresh_schema_metadata()
self.assertIsNot(original_meta, cluster.metadata.keyspaces) self.assertIsNot(original_meta, cluster.metadata.keyspaces)
self.assertEqual(original_meta, cluster.metadata.keyspaces) self.assertEqual(original_meta, cluster.metadata.keyspaces)
@@ -262,7 +262,7 @@ class ClusterTests(unittest.TestCase):
original_system_meta = original_meta['system'] original_system_meta = original_meta['system']
# only refresh one keyspace # only refresh one keyspace
cluster.refresh_schema(keyspace='system') cluster.refresh_keyspace_metadata('system')
current_meta = cluster.metadata.keyspaces current_meta = cluster.metadata.keyspaces
self.assertIs(original_meta, current_meta) self.assertIs(original_meta, current_meta)
current_system_meta = current_meta['system'] current_system_meta = current_meta['system']
@@ -279,7 +279,7 @@ class ClusterTests(unittest.TestCase):
original_system_schema_meta = original_system_meta.tables['schema_columnfamilies'] original_system_schema_meta = original_system_meta.tables['schema_columnfamilies']
# only refresh one table # only refresh one table
cluster.refresh_schema(keyspace='system', table='schema_columnfamilies') cluster.refresh_table_metadata('system', 'schema_columnfamilies')
current_meta = cluster.metadata.keyspaces current_meta = cluster.metadata.keyspaces
current_system_meta = current_meta['system'] current_system_meta = current_meta['system']
current_system_schema_meta = current_system_meta.tables['schema_columnfamilies'] current_system_schema_meta = current_system_meta.tables['schema_columnfamilies']
@@ -309,7 +309,7 @@ class ClusterTests(unittest.TestCase):
original_type_meta = original_test1rf_meta.user_types[type_name] original_type_meta = original_test1rf_meta.user_types[type_name]
# only refresh one type # only refresh one type
cluster.refresh_schema(keyspace='test1rf', usertype=type_name) cluster.refresh_user_type_metadata('test1rf', type_name)
current_meta = cluster.metadata.keyspaces current_meta = cluster.metadata.keyspaces
current_test1rf_meta = current_meta[keyspace_name] current_test1rf_meta = current_meta[keyspace_name]
current_type_meta = current_test1rf_meta.user_types[type_name] current_type_meta = current_test1rf_meta.user_types[type_name]
@@ -345,7 +345,7 @@ class ClusterTests(unittest.TestCase):
# cluster agreement wait used for refresh # cluster agreement wait used for refresh
original_meta = c.metadata.keyspaces original_meta = c.metadata.keyspaces
start_time = time.time() start_time = time.time()
self.assertRaisesRegexp(Exception, r"Schema was not refreshed.*", c.refresh_schema) self.assertRaisesRegexp(Exception, r"Schema metadata was not refreshed.*", c.refresh_schema_metadata)
end_time = time.time() end_time = time.time()
self.assertGreaterEqual(end_time - start_time, agreement_timeout) self.assertGreaterEqual(end_time - start_time, agreement_timeout)
self.assertIs(original_meta, c.metadata.keyspaces) self.assertIs(original_meta, c.metadata.keyspaces)
@@ -353,7 +353,7 @@ class ClusterTests(unittest.TestCase):
# refresh wait overrides cluster value # refresh wait overrides cluster value
original_meta = c.metadata.keyspaces original_meta = c.metadata.keyspaces
start_time = time.time() start_time = time.time()
c.refresh_schema(max_schema_agreement_wait=0) c.refresh_schema_metadata(max_schema_agreement_wait=0)
end_time = time.time() end_time = time.time()
self.assertLess(end_time - start_time, agreement_timeout) self.assertLess(end_time - start_time, agreement_timeout)
self.assertIsNot(original_meta, c.metadata.keyspaces) self.assertIsNot(original_meta, c.metadata.keyspaces)
@@ -373,7 +373,7 @@ class ClusterTests(unittest.TestCase):
# cluster agreement wait used for refresh # cluster agreement wait used for refresh
original_meta = c.metadata.keyspaces original_meta = c.metadata.keyspaces
start_time = time.time() start_time = time.time()
c.refresh_schema() c.refresh_schema_metadata()
end_time = time.time() end_time = time.time()
self.assertLess(end_time - start_time, refresh_threshold) self.assertLess(end_time - start_time, refresh_threshold)
self.assertIsNot(original_meta, c.metadata.keyspaces) self.assertIsNot(original_meta, c.metadata.keyspaces)
@@ -382,7 +382,8 @@ class ClusterTests(unittest.TestCase):
# refresh wait overrides cluster value # refresh wait overrides cluster value
original_meta = c.metadata.keyspaces original_meta = c.metadata.keyspaces
start_time = time.time() start_time = time.time()
self.assertRaisesRegexp(Exception, r"Schema was not refreshed.*", c.refresh_schema, max_schema_agreement_wait=agreement_timeout) self.assertRaisesRegexp(Exception, r"Schema metadata was not refreshed.*", c.refresh_schema_metadata,
max_schema_agreement_wait=agreement_timeout)
end_time = time.time() end_time = time.time()
self.assertGreaterEqual(end_time - start_time, agreement_timeout) self.assertGreaterEqual(end_time - start_time, agreement_timeout)
self.assertIs(original_meta, c.metadata.keyspaces) self.assertIs(original_meta, c.metadata.keyspaces)
@@ -566,8 +567,8 @@ class ClusterTests(unittest.TestCase):
session.execute('USE system_traces') session.execute('USE system_traces')
# refresh schema # refresh schema
cluster.refresh_schema() cluster.refresh_schema_metadata()
cluster.refresh_schema(max_schema_agreement_wait=0) cluster.refresh_schema_metadata(max_schema_agreement_wait=0)
# submit schema refresh # submit schema refresh
future = cluster.submit_schema_refresh() future = cluster.submit_schema_refresh()

View File

@@ -24,7 +24,8 @@ import six
import sys import sys
import traceback import traceback
from cassandra import AlreadyExists, OperationTimedOut, SignatureDescriptor from cassandra import AlreadyExists, OperationTimedOut, SignatureDescriptor, UserFunctionDescriptor, \
UserAggregateDescriptor
from cassandra.cluster import Cluster from cassandra.cluster import Cluster
from cassandra.cqltypes import DoubleType, Int32Type, ListType, UTF8Type, MapType from cassandra.cqltypes import DoubleType, Int32Type, ListType, UTF8Type, MapType
@@ -115,14 +116,14 @@ class SchemaMetadataTests(unittest.TestCase):
self.session.execute(recreate) self.session.execute(recreate)
def get_table_metadata(self): def get_table_metadata(self):
self.cluster.control_connection.refresh_schema() self.cluster.refresh_table_metadata(self.ksname, self.cfname)
return self.cluster.metadata.keyspaces[self.ksname].tables[self.cfname] return self.cluster.metadata.keyspaces[self.ksname].tables[self.cfname]
def test_basic_table_meta_properties(self): def test_basic_table_meta_properties(self):
create_statement = self.make_create_statement(["a"], [], ["b", "c"]) create_statement = self.make_create_statement(["a"], [], ["b", "c"])
self.session.execute(create_statement) self.session.execute(create_statement)
self.cluster.control_connection.refresh_schema() self.cluster.refresh_schema_metadata()
meta = self.cluster.metadata meta = self.cluster.metadata
self.assertNotEqual(meta.cluster_name, None) self.assertNotEqual(meta.cluster_name, None)
@@ -326,6 +327,260 @@ class SchemaMetadataTests(unittest.TestCase):
tablemeta = self.get_table_metadata() tablemeta = self.get_table_metadata()
self.assertIn("compression = {}", tablemeta.export_as_string()) self.assertIn("compression = {}", tablemeta.export_as_string())
def test_refresh_schema_metadata(self):
"""
test for synchronously refreshing all cluster metadata
test_refresh_schema_metadata tests all cluster metadata is refreshed when calling refresh_schema_metadata().
It creates a second cluster object with schema_event_refresh_window=-1 such that schema refreshes are disabled
for schema change push events. It then alters the cluster, creating a new keyspace, using the first cluster
object, and verifies that the cluster metadata has not changed in the second cluster object. It then calls
refresh_schema_metadata() and verifies that the cluster metadata is updated in the second cluster object.
Similarly, it then proceeds to altering keyspace, table, UDT, UDF, and UDA metadata and subsequently verfies
that these metadata is updated when refresh_schema_metadata() is called.
@since 2.6.0
@jira_ticket PYTHON-291
@expected_result Cluster, keyspace, table, UDT, UDF, and UDA metadata should be refreshed when refresh_schema_metadata() is called.
@test_category metadata
"""
cluster2 = Cluster(protocol_version=PROTOCOL_VERSION, schema_event_refresh_window=-1)
cluster2.connect()
self.assertNotIn("new_keyspace", cluster2.metadata.keyspaces)
# Cluster metadata modification
self.session.execute("CREATE KEYSPACE new_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}")
self.assertNotIn("new_keyspace", cluster2.metadata.keyspaces)
cluster2.refresh_schema_metadata()
self.assertIn("new_keyspace", cluster2.metadata.keyspaces)
# Keyspace metadata modification
self.session.execute("ALTER KEYSPACE {0} WITH durable_writes = false".format(self.ksname))
self.assertTrue(cluster2.metadata.keyspaces[self.ksname].durable_writes)
cluster2.refresh_schema_metadata()
self.assertFalse(cluster2.metadata.keyspaces[self.ksname].durable_writes)
# Table metadata modification
table_name = "test"
self.session.execute("CREATE TABLE {0}.{1} (a int PRIMARY KEY, b text)".format(self.ksname, table_name))
cluster2.refresh_schema_metadata()
self.session.execute("ALTER TABLE {0}.{1} ADD c double".format(self.ksname, table_name))
self.assertNotIn("c", cluster2.metadata.keyspaces[self.ksname].tables[table_name].columns)
cluster2.refresh_schema_metadata()
self.assertIn("c", cluster2.metadata.keyspaces[self.ksname].tables[table_name].columns)
if PROTOCOL_VERSION >= 3:
# UDT metadata modification
self.session.execute("CREATE TYPE {0}.user (age int, name text)".format(self.ksname))
self.assertEqual(cluster2.metadata.keyspaces[self.ksname].user_types, {})
cluster2.refresh_schema_metadata()
self.assertIn("user", cluster2.metadata.keyspaces[self.ksname].user_types)
if PROTOCOL_VERSION >= 4:
# UDF metadata modification
self.session.execute("""CREATE FUNCTION {0}.sum_int(key int, val int)
RETURNS NULL ON NULL INPUT
RETURNS int
LANGUAGE javascript AS 'key + val';""".format(self.ksname))
self.assertEqual(cluster2.metadata.keyspaces[self.ksname].functions, {})
cluster2.refresh_schema_metadata()
self.assertIn("sum_int(int,int)", cluster2.metadata.keyspaces[self.ksname].functions)
# UDA metadata modification
self.session.execute("""CREATE AGGREGATE {0}.sum_agg(int)
SFUNC sum_int
STYPE int
INITCOND 0"""
.format(self.ksname))
self.assertEqual(cluster2.metadata.keyspaces[self.ksname].aggregates, {})
cluster2.refresh_schema_metadata()
self.assertIn("sum_agg(int)", cluster2.metadata.keyspaces[self.ksname].aggregates)
# Cluster metadata modification
self.session.execute("DROP KEYSPACE new_keyspace")
self.assertIn("new_keyspace", cluster2.metadata.keyspaces)
cluster2.refresh_schema_metadata()
self.assertNotIn("new_keyspace", cluster2.metadata.keyspaces)
cluster2.shutdown()
def test_refresh_keyspace_metadata(self):
"""
test for synchronously refreshing keyspace metadata
test_refresh_keyspace_metadata tests that keyspace metadata is refreshed when calling refresh_keyspace_metadata().
It creates a second cluster object with schema_event_refresh_window=-1 such that schema refreshes are disabled
for schema change push events. It then alters the keyspace, disabling durable_writes, using the first cluster
object, and verifies that the keyspace metadata has not changed in the second cluster object. Finally, it calls
refresh_keyspace_metadata() and verifies that the keyspace metadata is updated in the second cluster object.
@since 2.6.0
@jira_ticket PYTHON-291
@expected_result Keyspace metadata should be refreshed when refresh_keyspace_metadata() is called.
@test_category metadata
"""
cluster2 = Cluster(protocol_version=PROTOCOL_VERSION, schema_event_refresh_window=-1)
cluster2.connect()
self.assertTrue(cluster2.metadata.keyspaces[self.ksname].durable_writes)
self.session.execute("ALTER KEYSPACE {0} WITH durable_writes = false".format(self.ksname))
self.assertTrue(cluster2.metadata.keyspaces[self.ksname].durable_writes)
cluster2.refresh_keyspace_metadata(self.ksname)
self.assertFalse(cluster2.metadata.keyspaces[self.ksname].durable_writes)
cluster2.shutdown()
def test_refresh_table_metatadata(self):
"""
test for synchronously refreshing table metadata
test_refresh_table_metatadata tests that table metadata is refreshed when calling test_refresh_table_metatadata().
It creates a second cluster object with schema_event_refresh_window=-1 such that schema refreshes are disabled
for schema change push events. It then alters the table, adding a new column, using the first cluster
object, and verifies that the table metadata has not changed in the second cluster object. Finally, it calls
test_refresh_table_metatadata() and verifies that the table metadata is updated in the second cluster object.
@since 2.6.0
@jira_ticket PYTHON-291
@expected_result Table metadata should be refreshed when refresh_table_metadata() is called.
@test_category metadata
"""
table_name = "test"
self.session.execute("CREATE TABLE {0}.{1} (a int PRIMARY KEY, b text)".format(self.ksname, table_name))
cluster2 = Cluster(protocol_version=PROTOCOL_VERSION, schema_event_refresh_window=-1)
cluster2.connect()
self.assertNotIn("c", cluster2.metadata.keyspaces[self.ksname].tables[table_name].columns)
self.session.execute("ALTER TABLE {0}.{1} ADD c double".format(self.ksname, table_name))
self.assertNotIn("c", cluster2.metadata.keyspaces[self.ksname].tables[table_name].columns)
cluster2.refresh_table_metadata(self.ksname, table_name)
self.assertIn("c", cluster2.metadata.keyspaces[self.ksname].tables[table_name].columns)
cluster2.shutdown()
def test_refresh_user_type_metadata(self):
"""
test for synchronously refreshing UDT metadata in keyspace
test_refresh_user_type_metadata tests that UDT metadata in a keyspace is refreshed when calling refresh_user_type_metadata().
It creates a second cluster object with schema_event_refresh_window=-1 such that schema refreshes are disabled
for schema change push events. It then alters the keyspace, creating a new UDT, using the first cluster
object, and verifies that the UDT metadata has not changed in the second cluster object. Finally, it calls
refresh_user_type_metadata() and verifies that the UDT metadata in the keyspace is updated in the second cluster object.
@since 2.6.0
@jira_ticket PYTHON-291
@expected_result UDT metadata in the keyspace should be refreshed when refresh_user_type_metadata() is called.
@test_category metadata
"""
if PROTOCOL_VERSION < 3:
raise unittest.SkipTest("Protocol 3+ is required for UDTs, currently testing against {0}".format(PROTOCOL_VERSION))
cluster2 = Cluster(protocol_version=PROTOCOL_VERSION, schema_event_refresh_window=-1)
cluster2.connect()
self.assertEqual(cluster2.metadata.keyspaces[self.ksname].user_types, {})
self.session.execute("CREATE TYPE {0}.user (age int, name text)".format(self.ksname))
self.assertEqual(cluster2.metadata.keyspaces[self.ksname].user_types, {})
cluster2.refresh_user_type_metadata(self.ksname, "user")
self.assertIn("user", cluster2.metadata.keyspaces[self.ksname].user_types)
cluster2.shutdown()
def test_refresh_user_function_metadata(self):
"""
test for synchronously refreshing UDF metadata in keyspace
test_refresh_user_function_metadata tests that UDF metadata in a keyspace is refreshed when calling
refresh_user_function_metadata(). It creates a second cluster object with schema_event_refresh_window=-1 such
that schema refreshes are disabled for schema change push events. It then alters the keyspace, creating a new
UDF, using the first cluster object, and verifies that the UDF metadata has not changed in the second cluster
object. Finally, it calls refresh_user_function_metadata() and verifies that the UDF metadata in the keyspace
is updated in the second cluster object.
@since 2.6.0
@jira_ticket PYTHON-291
@expected_result UDF metadata in the keyspace should be refreshed when refresh_user_function_metadata() is called.
@test_category metadata
"""
if PROTOCOL_VERSION < 4:
raise unittest.SkipTest("Protocol 4+ is required for UDFs, currently testing against {0}".format(PROTOCOL_VERSION))
cluster2 = Cluster(protocol_version=PROTOCOL_VERSION, schema_event_refresh_window=-1)
cluster2.connect()
self.assertEqual(cluster2.metadata.keyspaces[self.ksname].functions, {})
self.session.execute("""CREATE FUNCTION {0}.sum_int(key int, val int)
RETURNS NULL ON NULL INPUT
RETURNS int
LANGUAGE javascript AS 'key + val';""".format(self.ksname))
self.assertEqual(cluster2.metadata.keyspaces[self.ksname].functions, {})
cluster2.refresh_user_function_metadata(self.ksname, UserFunctionDescriptor("sum_int", ["int", "int"]))
self.assertIn("sum_int(int,int)", cluster2.metadata.keyspaces[self.ksname].functions)
cluster2.shutdown()
def test_refresh_user_aggregate_metadata(self):
"""
test for synchronously refreshing UDA metadata in keyspace
test_refresh_user_aggregate_metadata tests that UDA metadata in a keyspace is refreshed when calling
refresh_user_aggregate_metadata(). It creates a second cluster object with schema_event_refresh_window=-1 such
that schema refreshes are disabled for schema change push events. It then alters the keyspace, creating a new
UDA, using the first cluster object, and verifies that the UDA metadata has not changed in the second cluster
object. Finally, it calls refresh_user_aggregate_metadata() and verifies that the UDF metadata in the keyspace
is updated in the second cluster object.
@since 2.6.0
@jira_ticket PYTHON-291
@expected_result UDA metadata in the keyspace should be refreshed when refresh_user_aggregate_metadata() is called.
@test_category metadata
"""
if PROTOCOL_VERSION < 4:
raise unittest.SkipTest("Protocol 4+ is required for UDAs, currently testing against {0}".format(PROTOCOL_VERSION))
cluster2 = Cluster(protocol_version=PROTOCOL_VERSION, schema_event_refresh_window=-1)
cluster2.connect()
self.assertEqual(cluster2.metadata.keyspaces[self.ksname].aggregates, {})
self.session.execute("""CREATE FUNCTION {0}.sum_int(key int, val int)
RETURNS NULL ON NULL INPUT
RETURNS int
LANGUAGE javascript AS 'key + val';""".format(self.ksname))
self.session.execute("""CREATE AGGREGATE {0}.sum_agg(int)
SFUNC sum_int
STYPE int
INITCOND 0"""
.format(self.ksname))
self.assertEqual(cluster2.metadata.keyspaces[self.ksname].aggregates, {})
cluster2.refresh_user_aggregate_metadata(self.ksname, UserAggregateDescriptor("sum_agg", ["int"]))
self.assertIn("sum_agg(int)", cluster2.metadata.keyspaces[self.ksname].aggregates)
cluster2.shutdown()
class TestCodeCoverage(unittest.TestCase): class TestCodeCoverage(unittest.TestCase):
@@ -982,7 +1237,7 @@ class IndexMapTests(unittest.TestCase):
self.session.execute("DROP INDEX a_idx") self.session.execute("DROP INDEX a_idx")
# temporarily synchronously refresh the schema metadata, until CASSANDRA-9391 is merged in # temporarily synchronously refresh the schema metadata, until CASSANDRA-9391 is merged in
self.cluster.refresh_schema(self.keyspace_name, self.table_name) self.cluster.refresh_table_metadata(self.keyspace_name, self.table_name)
ks_meta = self.cluster.metadata.keyspaces[self.keyspace_name] ks_meta = self.cluster.metadata.keyspaces[self.keyspace_name]
table_meta = ks_meta.tables[self.table_name] table_meta = ks_meta.tables[self.table_name]