From 5e5deb17419e4eb3fdefe490c765e536c66f69a8 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Thu, 16 Apr 2015 09:46:54 -0500 Subject: [PATCH] Aggregate metadata integration tests --- cassandra/cluster.py | 2 +- tests/integration/standard/test_metadata.py | 224 +++++++++++++++++--- 2 files changed, 199 insertions(+), 27 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 8832bb8f..e8697120 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1154,7 +1154,7 @@ class Cluster(object): aggregate, max_schema_agreement_wait): raise Exception("Schema was not refreshed. See log for details.") - def submit_schema_refresh(self, keyspace=None, table=None, usertype=None, function=None): + def submit_schema_refresh(self, keyspace=None, table=None, usertype=None, function=None, aggregate=None): """ Schedule a refresh of the internal representation of the current schema for this cluster. See :meth:`~.refresh_schema` for description of parameters. diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index 05c8ba56..9c80a2ee 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -22,11 +22,13 @@ from mock import Mock import six import sys -from cassandra import AlreadyExists, UserFunctionDescriptor +from cassandra import AlreadyExists, SignatureDescriptor from cassandra.cluster import Cluster +from cassandra.cqltypes import DoubleType, Int32Type, ListType, UTF8Type, MapType +from cassandra.encoder import Encoder from cassandra.metadata import (Metadata, KeyspaceMetadata, TableMetadata, - Token, MD5Token, TokenMap, murmur3) + Token, MD5Token, TokenMap, murmur3, Function, Aggregate) from cassandra.policies import SimpleConvictionPolicy from cassandra.pool import Host @@ -929,14 +931,11 @@ class KeyspaceAlterMetadata(unittest.TestCase): self.assertNotEqual(original_keyspace_meta, new_keyspace_meta) self.assertEqual(new_keyspace_meta.durable_writes, False) -from cassandra.cqltypes import DoubleType -from cassandra.metadata import Function - - -class FunctionMetadata(unittest.TestCase): - - keyspace_name = "functionmetadatatest" +class FunctionTest(unittest.TestCase): + """ + Base functionality for Function and Aggregate metadata test classes + """ @property def function_name(self): return self._testMethodName.lower() @@ -947,40 +946,57 @@ class FunctionMetadata(unittest.TestCase): raise unittest.SkipTest("Function metadata requires native protocol version 4+") cls.cluster = Cluster(protocol_version=PROTOCOL_VERSION) + cls.keyspace_name = cls.__name__.lower() cls.session = cls.cluster.connect() - cls.session.execute("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}" % cls.keyspace_name) + cls.session.execute("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}" % cls.keyspace_name) + cls.session.set_keyspace(cls.keyspace_name) cls.keyspace_function_meta = cls.cluster.metadata.keyspaces[cls.keyspace_name].functions + cls.keyspace_aggregate_meta = cls.cluster.metadata.keyspaces[cls.keyspace_name].aggregates @classmethod def teardown_class(cls): - cls.session.execute("DROP KEYSPACE %s" % cls.keyspace_name) + cls.session.execute("DROP KEYSPACE IF EXISTS %s" % cls.keyspace_name) cls.cluster.shutdown() - class VerifiedFunction(object): - def __init__(self, test_case, **function_kwargs): + class Verified(object): + + def __init__(self, test_case, meta_class, element_meta, **function_kwargs): self.test_case = test_case self.function_kwargs = dict(function_kwargs) + self.meta_class = meta_class + self.element_meta = element_meta def __enter__(self): tc = self.test_case - expected_meta = Function(**self.function_kwargs) - tc.assertNotIn(expected_meta.signature, tc.keyspace_function_meta) + expected_meta = self.meta_class(**self.function_kwargs) + tc.assertNotIn(expected_meta.signature, self.element_meta) tc.session.execute(expected_meta.as_cql_query()) - tc.assertIn(expected_meta.signature, tc.keyspace_function_meta) + tc.assertIn(expected_meta.signature, self.element_meta) - generated_meta = tc.keyspace_function_meta[expected_meta.signature] + generated_meta = self.element_meta[expected_meta.signature] self.test_case.assertEqual(generated_meta.as_cql_query(), expected_meta.as_cql_query()) return self def __exit__(self, exc_type, exc_val, exc_tb): tc = self.test_case - tc.session.execute("DROP FUNCTION %s.%s" % (tc.keyspace_name, self.signature)) - tc.assertNotIn(self.signature, tc.keyspace_function_meta) + tc.session.execute("DROP %s %s.%s" % (self.meta_class.__name__, tc.keyspace_name, self.signature)) + tc.assertNotIn(self.signature, self.element_meta) @property def signature(self): - return UserFunctionDescriptor.format_signature(self.function_kwargs['name'], - self.function_kwargs['type_signature']) + return SignatureDescriptor.format_signature(self.function_kwargs['name'], + self.function_kwargs['type_signature']) + + class VerifiedFunction(Verified): + def __init__(self, test_case, **kwargs): + super(FunctionTest.VerifiedFunction, self).__init__(test_case, Function, test_case.keyspace_function_meta, **kwargs) + + class VerifiedAggregate(Verified): + def __init__(self, test_case, **kwargs): + super(FunctionTest.VerifiedAggregate, self).__init__(test_case, Aggregate, test_case.keyspace_aggregate_meta, **kwargs) + + +class FunctionMetadata(FunctionTest): def make_function_kwargs(self, deterministic=True, called_on_null=True): return {'keyspace': self.keyspace_name, @@ -993,18 +1009,15 @@ class FunctionMetadata(unittest.TestCase): 'is_deterministic': deterministic, 'called_on_null_input': called_on_null} - def test_create_drop_function(self): - with self.VerifiedFunction(self, **self.make_function_kwargs()): - pass - def test_functions_after_udt(self): self.assertNotIn(self.function_name, self.keyspace_function_meta) udt_name = 'udtx' - self.session.execute("CREATE TYPE %s.%s (x int)" % (self.keyspace_name, udt_name)) + self.session.execute("CREATE TYPE %s (x int)" % udt_name) # Ideally we would make a function that takes a udt type, but # this presently fails because C* c059a56 requires udt to be frozen to create, but does not store meta indicating frozen + # https://issues.apache.org/jira/browse/CASSANDRA-9186 # Maybe update this after release #kwargs = self.make_function_kwargs() #kwargs['type_signature'][0] = "frozen<%s>" % udt_name @@ -1066,3 +1079,162 @@ class FunctionMetadata(unittest.TestCase): with self.VerifiedFunction(self, **kwargs) as vf: fn_meta = self.keyspace_function_meta[vf.signature] self.assertRegexpMatches(fn_meta.as_cql_query(), "CREATE FUNCTION.*\) RETURNS NULL ON NULL INPUT RETURNS .*") + + +class AggregateMetadata(FunctionTest): + + @classmethod + def setup_class(cls): + super(AggregateMetadata, cls).setup_class() + + cls.session.execute("""CREATE OR REPLACE FUNCTION sum_int(s int, i int) + RETURNS NULL ON NULL INPUT + RETURNS int + LANGUAGE javascript AS 's + i';""") + cls.session.execute("""CREATE OR REPLACE FUNCTION sum_int_two(s int, i int, j int) + RETURNS NULL ON NULL INPUT + RETURNS int + LANGUAGE javascript AS 's + i + j';""") + cls.session.execute("""CREATE OR REPLACE FUNCTION "List_As_String"(l list) + RETURNS NULL ON NULL INPUT + RETURNS int + LANGUAGE javascript AS ''''' + l';""") + cls.session.execute("""CREATE OR REPLACE FUNCTION extend_list(s list, i int) + CALLED ON NULL INPUT + RETURNS list + LANGUAGE java AS 'if (i != null) s.add(i.toString()); return s;';""") + cls.session.execute("""CREATE OR REPLACE FUNCTION update_map(s map, i int) + RETURNS NULL ON NULL INPUT + RETURNS map + LANGUAGE java AS 's.put(new Integer(i), new Integer(i)); return s;';""") + cls.session.execute("""CREATE TABLE IF NOT EXISTS t + (k int PRIMARY KEY, v int)""") + for x in range(4): + cls.session.execute("INSERT INTO t (k,v) VALUES (%s, %s)", (x, x)) + cls.session.execute("INSERT INTO t (k) VALUES (%s)", (4,)) + + def make_aggregate_kwargs(self, state_func, state_type, final_func=None, init_cond=None): + return {'keyspace': self.keyspace_name, + 'name': self.function_name + '_aggregate', + 'type_signature': ['int'], + 'state_func': state_func, + 'state_type': state_type, + 'final_func': final_func, + 'initial_condition': init_cond, + 'return_type': "does not matter for creation"} + + def test_return_type_meta(self): + with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('sum_int', Int32Type, init_cond=1)) as va: + self.assertIs(self.keyspace_aggregate_meta[va.signature].return_type, Int32Type) + + def test_init_cond(self): + # This is required until the java driver bundled with C* is updated to support v4 + c = Cluster(protocol_version=3) + s = c.connect(self.keyspace_name) + + expected_values = range(4) + + # int32 + for init_cond in (-1, 0, 1): + with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('sum_int', Int32Type, init_cond=init_cond)) as va: + sum_res = s.execute("SELECT %s(v) AS sum FROM t" % va.function_kwargs['name'])[0].sum + self.assertEqual(sum_res, init_cond + sum(expected_values)) + + # list + for init_cond in ([], ['1', '2']): + with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('extend_list', ListType.apply_parameters([UTF8Type]), init_cond=init_cond)) as va: + list_res = s.execute("SELECT %s(v) AS list_res FROM t" % va.function_kwargs['name'])[0].list_res + self.assertListEqual(list_res[:len(init_cond)], init_cond) + self.assertEqual(set(i for i in list_res[len(init_cond):]), + set(str(i) for i in expected_values)) + + # map + expected_map_values = dict((i, i) for i in expected_values) + expected_key_set = set(expected_values) + for init_cond in ({}, {1: 2, 3: 4}, {5: 5}): + with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('update_map', MapType.apply_parameters([Int32Type, Int32Type]), init_cond=init_cond)) as va: + map_res = s.execute("SELECT %s(v) AS map_res FROM t" % va.function_kwargs['name'])[0].map_res + self.assertDictContainsSubset(expected_map_values, map_res) + init_not_updated = dict((k, init_cond[k]) for k in set(init_cond) - expected_key_set) + self.assertDictContainsSubset(init_not_updated, map_res) + c.shutdown() + + def test_aggregates_after_functions(self): + # functions must come before functions in keyspace dump + with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('extend_list', ListType.apply_parameters([UTF8Type]))): + keyspace_cql = self.cluster.metadata.keyspaces[self.keyspace_name].export_as_string() + func_idx = keyspace_cql.find("CREATE FUNCTION") + aggregate_idx = keyspace_cql.rfind("CREATE AGGREGATE") + self.assertNotIn(-1, (aggregate_idx, func_idx), "AGGREGATE or FUNCTION not found in keyspace_cql: " + keyspace_cql) + self.assertGreater(aggregate_idx, func_idx) + + def test_same_name_diff_types(self): + kwargs = self.make_aggregate_kwargs('sum_int', Int32Type, init_cond=0) + with self.VerifiedAggregate(self, **kwargs): + kwargs['state_func'] = 'sum_int_two' + kwargs['type_signature'] = ['int', 'int'] + with self.VerifiedAggregate(self, **kwargs): + aggregates = [a for a in self.keyspace_aggregate_meta.values() if a.name == kwargs['name']] + self.assertEqual(len(aggregates), 2) + self.assertNotEqual(aggregates[0].type_signature, aggregates[1].type_signature) + + def test_aggregates_follow_keyspace_alter(self): + with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('sum_int', Int32Type, init_cond=0)): + original_keyspace_meta = self.cluster.metadata.keyspaces[self.keyspace_name] + self.session.execute('ALTER KEYSPACE %s WITH durable_writes = false' % self.keyspace_name) + try: + new_keyspace_meta = self.cluster.metadata.keyspaces[self.keyspace_name] + self.assertNotEqual(original_keyspace_meta, new_keyspace_meta) + self.assertIs(original_keyspace_meta.aggregates, new_keyspace_meta.aggregates) + finally: + self.session.execute('ALTER KEYSPACE %s WITH durable_writes = true' % self.keyspace_name) + + def test_cql_optional_params(self): + kwargs = self.make_aggregate_kwargs('extend_list', ListType.apply_parameters([UTF8Type])) + + # no initial condition, final func + self.assertIsNone(kwargs['initial_condition']) + self.assertIsNone(kwargs['final_func']) + with self.VerifiedAggregate(self, **kwargs) as va: + meta = self.keyspace_aggregate_meta[va.signature] + self.assertIsNone(meta.initial_condition) + self.assertIsNone(meta.final_func) + cql = meta.as_cql_query() + self.assertEqual(cql.find('INITCOND'), -1) + self.assertEqual(cql.find('FINALFUNC'), -1) + + # initial condition, no final func + kwargs['initial_condition'] = ['init', 'cond'] + with self.VerifiedAggregate(self, **kwargs) as va: + meta = self.keyspace_aggregate_meta[va.signature] + self.assertListEqual(meta.initial_condition, kwargs['initial_condition']) + self.assertIsNone(meta.final_func) + cql = meta.as_cql_query() + search_string = "INITCOND %s" % Encoder().cql_encode_all_types(kwargs['initial_condition']) + self.assertGreater(cql.find(search_string), 0, '"%s" search string not found in cql:\n%s' % (search_string, cql)) + self.assertEqual(cql.find('FINALFUNC'), -1) + + # no initial condition, final func + kwargs['initial_condition'] = None + kwargs['final_func'] = 'List_As_String' + with self.VerifiedAggregate(self, **kwargs) as va: + meta = self.keyspace_aggregate_meta[va.signature] + self.assertIsNone(meta.initial_condition) + self.assertEqual(meta.final_func, kwargs['final_func']) + cql = meta.as_cql_query() + self.assertEqual(cql.find('INITCOND'), -1) + search_string = 'FINALFUNC "%s"' % kwargs['final_func'] + self.assertGreater(cql.find(search_string), 0, '"%s" search string not found in cql:\n%s' % (search_string, cql)) + + # both + kwargs['initial_condition'] = ['init', 'cond'] + kwargs['final_func'] = 'List_As_String' + with self.VerifiedAggregate(self, **kwargs) as va: + meta = self.keyspace_aggregate_meta[va.signature] + self.assertListEqual(meta.initial_condition, kwargs['initial_condition']) + self.assertEqual(meta.final_func, kwargs['final_func']) + cql = meta.as_cql_query() + init_cond_idx = cql.find("INITCOND %s" % Encoder().cql_encode_all_types(kwargs['initial_condition'])) + final_func_idx = cql.find('FINALFUNC "%s"' % kwargs['final_func']) + self.assertNotIn(-1, (init_cond_idx, final_func_idx)) + self.assertGreater(init_cond_idx, final_func_idx)