diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index 146859ba..bf6bb1d4 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -248,7 +248,7 @@ class ClusterTests(unittest.TestCase): original_meta = cluster.metadata.keyspaces # full schema refresh, with wait - cluster.refresh_schema() + cluster.refresh_schema_metadata() self.assertIsNot(original_meta, cluster.metadata.keyspaces) self.assertEqual(original_meta, cluster.metadata.keyspaces) @@ -262,7 +262,7 @@ class ClusterTests(unittest.TestCase): original_system_meta = original_meta['system'] # only refresh one keyspace - cluster.refresh_schema(keyspace='system') + cluster.refresh_keyspace_metadata('system') current_meta = cluster.metadata.keyspaces self.assertIs(original_meta, current_meta) current_system_meta = current_meta['system'] @@ -279,7 +279,7 @@ class ClusterTests(unittest.TestCase): original_system_schema_meta = original_system_meta.tables['schema_columnfamilies'] # only refresh one table - cluster.refresh_schema(keyspace='system', table='schema_columnfamilies') + cluster.refresh_table_metadata('system', 'schema_columnfamilies') current_meta = cluster.metadata.keyspaces current_system_meta = current_meta['system'] current_system_schema_meta = current_system_meta.tables['schema_columnfamilies'] @@ -309,7 +309,7 @@ class ClusterTests(unittest.TestCase): original_type_meta = original_test1rf_meta.user_types[type_name] # only refresh one type - cluster.refresh_schema(keyspace='test1rf', usertype=type_name) + cluster.refresh_user_type_metadata('test1rf', type_name) current_meta = cluster.metadata.keyspaces current_test1rf_meta = current_meta[keyspace_name] current_type_meta = current_test1rf_meta.user_types[type_name] @@ -345,7 +345,7 @@ class ClusterTests(unittest.TestCase): # cluster agreement wait used for refresh original_meta = c.metadata.keyspaces start_time = time.time() - self.assertRaisesRegexp(Exception, r"Schema was not refreshed.*", c.refresh_schema) + self.assertRaisesRegexp(Exception, r"Schema metadata was not refreshed.*", c.refresh_schema_metadata) end_time = time.time() self.assertGreaterEqual(end_time - start_time, agreement_timeout) self.assertIs(original_meta, c.metadata.keyspaces) @@ -353,7 +353,7 @@ class ClusterTests(unittest.TestCase): # refresh wait overrides cluster value original_meta = c.metadata.keyspaces start_time = time.time() - c.refresh_schema(max_schema_agreement_wait=0) + c.refresh_schema_metadata(max_schema_agreement_wait=0) end_time = time.time() self.assertLess(end_time - start_time, agreement_timeout) self.assertIsNot(original_meta, c.metadata.keyspaces) @@ -373,7 +373,7 @@ class ClusterTests(unittest.TestCase): # cluster agreement wait used for refresh original_meta = c.metadata.keyspaces start_time = time.time() - c.refresh_schema() + c.refresh_schema_metadata() end_time = time.time() self.assertLess(end_time - start_time, refresh_threshold) self.assertIsNot(original_meta, c.metadata.keyspaces) @@ -382,7 +382,8 @@ class ClusterTests(unittest.TestCase): # refresh wait overrides cluster value original_meta = c.metadata.keyspaces start_time = time.time() - self.assertRaisesRegexp(Exception, r"Schema was not refreshed.*", c.refresh_schema, max_schema_agreement_wait=agreement_timeout) + self.assertRaisesRegexp(Exception, r"Schema metadata was not refreshed.*", c.refresh_schema_metadata, + max_schema_agreement_wait=agreement_timeout) end_time = time.time() self.assertGreaterEqual(end_time - start_time, agreement_timeout) self.assertIs(original_meta, c.metadata.keyspaces) @@ -566,8 +567,8 @@ class ClusterTests(unittest.TestCase): session.execute('USE system_traces') # refresh schema - cluster.refresh_schema() - cluster.refresh_schema(max_schema_agreement_wait=0) + cluster.refresh_schema_metadata() + cluster.refresh_schema_metadata(max_schema_agreement_wait=0) # submit schema refresh future = cluster.submit_schema_refresh() diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index 73ffe134..69d599f5 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -24,7 +24,8 @@ import six import sys import traceback -from cassandra import AlreadyExists, OperationTimedOut, SignatureDescriptor +from cassandra import AlreadyExists, OperationTimedOut, SignatureDescriptor, UserFunctionDescriptor, \ + UserAggregateDescriptor from cassandra.cluster import Cluster from cassandra.cqltypes import DoubleType, Int32Type, ListType, UTF8Type, MapType @@ -115,14 +116,14 @@ class SchemaMetadataTests(unittest.TestCase): self.session.execute(recreate) def get_table_metadata(self): - self.cluster.control_connection.refresh_schema() + self.cluster.refresh_table_metadata(self.ksname, self.cfname) return self.cluster.metadata.keyspaces[self.ksname].tables[self.cfname] def test_basic_table_meta_properties(self): create_statement = self.make_create_statement(["a"], [], ["b", "c"]) self.session.execute(create_statement) - self.cluster.control_connection.refresh_schema() + self.cluster.refresh_schema_metadata() meta = self.cluster.metadata self.assertNotEqual(meta.cluster_name, None) @@ -326,6 +327,260 @@ class SchemaMetadataTests(unittest.TestCase): tablemeta = self.get_table_metadata() self.assertIn("compression = {}", tablemeta.export_as_string()) + def test_refresh_schema_metadata(self): + """ + test for synchronously refreshing all cluster metadata + + test_refresh_schema_metadata tests all cluster metadata is refreshed when calling refresh_schema_metadata(). + It creates a second cluster object with schema_event_refresh_window=-1 such that schema refreshes are disabled + for schema change push events. It then alters the cluster, creating a new keyspace, using the first cluster + object, and verifies that the cluster metadata has not changed in the second cluster object. It then calls + refresh_schema_metadata() and verifies that the cluster metadata is updated in the second cluster object. + Similarly, it then proceeds to altering keyspace, table, UDT, UDF, and UDA metadata and subsequently verfies + that these metadata is updated when refresh_schema_metadata() is called. + + @since 2.6.0 + @jira_ticket PYTHON-291 + @expected_result Cluster, keyspace, table, UDT, UDF, and UDA metadata should be refreshed when refresh_schema_metadata() is called. + + @test_category metadata + """ + + cluster2 = Cluster(protocol_version=PROTOCOL_VERSION, schema_event_refresh_window=-1) + cluster2.connect() + + self.assertNotIn("new_keyspace", cluster2.metadata.keyspaces) + + # Cluster metadata modification + self.session.execute("CREATE KEYSPACE new_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}") + self.assertNotIn("new_keyspace", cluster2.metadata.keyspaces) + + cluster2.refresh_schema_metadata() + self.assertIn("new_keyspace", cluster2.metadata.keyspaces) + + # Keyspace metadata modification + self.session.execute("ALTER KEYSPACE {0} WITH durable_writes = false".format(self.ksname)) + self.assertTrue(cluster2.metadata.keyspaces[self.ksname].durable_writes) + cluster2.refresh_schema_metadata() + self.assertFalse(cluster2.metadata.keyspaces[self.ksname].durable_writes) + + # Table metadata modification + table_name = "test" + self.session.execute("CREATE TABLE {0}.{1} (a int PRIMARY KEY, b text)".format(self.ksname, table_name)) + cluster2.refresh_schema_metadata() + + self.session.execute("ALTER TABLE {0}.{1} ADD c double".format(self.ksname, table_name)) + self.assertNotIn("c", cluster2.metadata.keyspaces[self.ksname].tables[table_name].columns) + cluster2.refresh_schema_metadata() + self.assertIn("c", cluster2.metadata.keyspaces[self.ksname].tables[table_name].columns) + + if PROTOCOL_VERSION >= 3: + # UDT metadata modification + self.session.execute("CREATE TYPE {0}.user (age int, name text)".format(self.ksname)) + self.assertEqual(cluster2.metadata.keyspaces[self.ksname].user_types, {}) + cluster2.refresh_schema_metadata() + self.assertIn("user", cluster2.metadata.keyspaces[self.ksname].user_types) + + if PROTOCOL_VERSION >= 4: + # UDF metadata modification + self.session.execute("""CREATE FUNCTION {0}.sum_int(key int, val int) + RETURNS NULL ON NULL INPUT + RETURNS int + LANGUAGE javascript AS 'key + val';""".format(self.ksname)) + + self.assertEqual(cluster2.metadata.keyspaces[self.ksname].functions, {}) + cluster2.refresh_schema_metadata() + self.assertIn("sum_int(int,int)", cluster2.metadata.keyspaces[self.ksname].functions) + + # UDA metadata modification + self.session.execute("""CREATE AGGREGATE {0}.sum_agg(int) + SFUNC sum_int + STYPE int + INITCOND 0""" + .format(self.ksname)) + + self.assertEqual(cluster2.metadata.keyspaces[self.ksname].aggregates, {}) + cluster2.refresh_schema_metadata() + self.assertIn("sum_agg(int)", cluster2.metadata.keyspaces[self.ksname].aggregates) + + # Cluster metadata modification + self.session.execute("DROP KEYSPACE new_keyspace") + self.assertIn("new_keyspace", cluster2.metadata.keyspaces) + + cluster2.refresh_schema_metadata() + self.assertNotIn("new_keyspace", cluster2.metadata.keyspaces) + + cluster2.shutdown() + + def test_refresh_keyspace_metadata(self): + """ + test for synchronously refreshing keyspace metadata + + test_refresh_keyspace_metadata tests that keyspace metadata is refreshed when calling refresh_keyspace_metadata(). + It creates a second cluster object with schema_event_refresh_window=-1 such that schema refreshes are disabled + for schema change push events. It then alters the keyspace, disabling durable_writes, using the first cluster + object, and verifies that the keyspace metadata has not changed in the second cluster object. Finally, it calls + refresh_keyspace_metadata() and verifies that the keyspace metadata is updated in the second cluster object. + + @since 2.6.0 + @jira_ticket PYTHON-291 + @expected_result Keyspace metadata should be refreshed when refresh_keyspace_metadata() is called. + + @test_category metadata + """ + + cluster2 = Cluster(protocol_version=PROTOCOL_VERSION, schema_event_refresh_window=-1) + cluster2.connect() + + self.assertTrue(cluster2.metadata.keyspaces[self.ksname].durable_writes) + self.session.execute("ALTER KEYSPACE {0} WITH durable_writes = false".format(self.ksname)) + self.assertTrue(cluster2.metadata.keyspaces[self.ksname].durable_writes) + cluster2.refresh_keyspace_metadata(self.ksname) + self.assertFalse(cluster2.metadata.keyspaces[self.ksname].durable_writes) + + cluster2.shutdown() + + def test_refresh_table_metatadata(self): + """ + test for synchronously refreshing table metadata + + test_refresh_table_metatadata tests that table metadata is refreshed when calling test_refresh_table_metatadata(). + It creates a second cluster object with schema_event_refresh_window=-1 such that schema refreshes are disabled + for schema change push events. It then alters the table, adding a new column, using the first cluster + object, and verifies that the table metadata has not changed in the second cluster object. Finally, it calls + test_refresh_table_metatadata() and verifies that the table metadata is updated in the second cluster object. + + @since 2.6.0 + @jira_ticket PYTHON-291 + @expected_result Table metadata should be refreshed when refresh_table_metadata() is called. + + @test_category metadata + """ + + table_name = "test" + self.session.execute("CREATE TABLE {0}.{1} (a int PRIMARY KEY, b text)".format(self.ksname, table_name)) + + cluster2 = Cluster(protocol_version=PROTOCOL_VERSION, schema_event_refresh_window=-1) + cluster2.connect() + + self.assertNotIn("c", cluster2.metadata.keyspaces[self.ksname].tables[table_name].columns) + self.session.execute("ALTER TABLE {0}.{1} ADD c double".format(self.ksname, table_name)) + self.assertNotIn("c", cluster2.metadata.keyspaces[self.ksname].tables[table_name].columns) + + cluster2.refresh_table_metadata(self.ksname, table_name) + self.assertIn("c", cluster2.metadata.keyspaces[self.ksname].tables[table_name].columns) + + cluster2.shutdown() + + def test_refresh_user_type_metadata(self): + """ + test for synchronously refreshing UDT metadata in keyspace + + test_refresh_user_type_metadata tests that UDT metadata in a keyspace is refreshed when calling refresh_user_type_metadata(). + It creates a second cluster object with schema_event_refresh_window=-1 such that schema refreshes are disabled + for schema change push events. It then alters the keyspace, creating a new UDT, using the first cluster + object, and verifies that the UDT metadata has not changed in the second cluster object. Finally, it calls + refresh_user_type_metadata() and verifies that the UDT metadata in the keyspace is updated in the second cluster object. + + @since 2.6.0 + @jira_ticket PYTHON-291 + @expected_result UDT metadata in the keyspace should be refreshed when refresh_user_type_metadata() is called. + + @test_category metadata + """ + + if PROTOCOL_VERSION < 3: + raise unittest.SkipTest("Protocol 3+ is required for UDTs, currently testing against {0}".format(PROTOCOL_VERSION)) + + cluster2 = Cluster(protocol_version=PROTOCOL_VERSION, schema_event_refresh_window=-1) + cluster2.connect() + + self.assertEqual(cluster2.metadata.keyspaces[self.ksname].user_types, {}) + self.session.execute("CREATE TYPE {0}.user (age int, name text)".format(self.ksname)) + self.assertEqual(cluster2.metadata.keyspaces[self.ksname].user_types, {}) + + cluster2.refresh_user_type_metadata(self.ksname, "user") + self.assertIn("user", cluster2.metadata.keyspaces[self.ksname].user_types) + + cluster2.shutdown() + + def test_refresh_user_function_metadata(self): + """ + test for synchronously refreshing UDF metadata in keyspace + + test_refresh_user_function_metadata tests that UDF metadata in a keyspace is refreshed when calling + refresh_user_function_metadata(). It creates a second cluster object with schema_event_refresh_window=-1 such + that schema refreshes are disabled for schema change push events. It then alters the keyspace, creating a new + UDF, using the first cluster object, and verifies that the UDF metadata has not changed in the second cluster + object. Finally, it calls refresh_user_function_metadata() and verifies that the UDF metadata in the keyspace + is updated in the second cluster object. + + @since 2.6.0 + @jira_ticket PYTHON-291 + @expected_result UDF metadata in the keyspace should be refreshed when refresh_user_function_metadata() is called. + + @test_category metadata + """ + + if PROTOCOL_VERSION < 4: + raise unittest.SkipTest("Protocol 4+ is required for UDFs, currently testing against {0}".format(PROTOCOL_VERSION)) + + cluster2 = Cluster(protocol_version=PROTOCOL_VERSION, schema_event_refresh_window=-1) + cluster2.connect() + + self.assertEqual(cluster2.metadata.keyspaces[self.ksname].functions, {}) + self.session.execute("""CREATE FUNCTION {0}.sum_int(key int, val int) + RETURNS NULL ON NULL INPUT + RETURNS int + LANGUAGE javascript AS 'key + val';""".format(self.ksname)) + + self.assertEqual(cluster2.metadata.keyspaces[self.ksname].functions, {}) + cluster2.refresh_user_function_metadata(self.ksname, UserFunctionDescriptor("sum_int", ["int", "int"])) + self.assertIn("sum_int(int,int)", cluster2.metadata.keyspaces[self.ksname].functions) + + cluster2.shutdown() + + def test_refresh_user_aggregate_metadata(self): + """ + test for synchronously refreshing UDA metadata in keyspace + + test_refresh_user_aggregate_metadata tests that UDA metadata in a keyspace is refreshed when calling + refresh_user_aggregate_metadata(). It creates a second cluster object with schema_event_refresh_window=-1 such + that schema refreshes are disabled for schema change push events. It then alters the keyspace, creating a new + UDA, using the first cluster object, and verifies that the UDA metadata has not changed in the second cluster + object. Finally, it calls refresh_user_aggregate_metadata() and verifies that the UDF metadata in the keyspace + is updated in the second cluster object. + + @since 2.6.0 + @jira_ticket PYTHON-291 + @expected_result UDA metadata in the keyspace should be refreshed when refresh_user_aggregate_metadata() is called. + + @test_category metadata + """ + + if PROTOCOL_VERSION < 4: + raise unittest.SkipTest("Protocol 4+ is required for UDAs, currently testing against {0}".format(PROTOCOL_VERSION)) + + cluster2 = Cluster(protocol_version=PROTOCOL_VERSION, schema_event_refresh_window=-1) + cluster2.connect() + + self.assertEqual(cluster2.metadata.keyspaces[self.ksname].aggregates, {}) + self.session.execute("""CREATE FUNCTION {0}.sum_int(key int, val int) + RETURNS NULL ON NULL INPUT + RETURNS int + LANGUAGE javascript AS 'key + val';""".format(self.ksname)) + + self.session.execute("""CREATE AGGREGATE {0}.sum_agg(int) + SFUNC sum_int + STYPE int + INITCOND 0""" + .format(self.ksname)) + + self.assertEqual(cluster2.metadata.keyspaces[self.ksname].aggregates, {}) + cluster2.refresh_user_aggregate_metadata(self.ksname, UserAggregateDescriptor("sum_agg", ["int"])) + self.assertIn("sum_agg(int)", cluster2.metadata.keyspaces[self.ksname].aggregates) + + cluster2.shutdown() class TestCodeCoverage(unittest.TestCase): @@ -982,7 +1237,7 @@ class IndexMapTests(unittest.TestCase): self.session.execute("DROP INDEX a_idx") # temporarily synchronously refresh the schema metadata, until CASSANDRA-9391 is merged in - self.cluster.refresh_schema(self.keyspace_name, self.table_name) + self.cluster.refresh_table_metadata(self.keyspace_name, self.table_name) ks_meta = self.cluster.metadata.keyspaces[self.keyspace_name] table_meta = ks_meta.tables[self.table_name]