Aggregate metadata integration tests
This commit is contained in:
@@ -1154,7 +1154,7 @@ class Cluster(object):
|
||||
aggregate, max_schema_agreement_wait):
|
||||
raise Exception("Schema was not refreshed. See log for details.")
|
||||
|
||||
def submit_schema_refresh(self, keyspace=None, table=None, usertype=None, function=None):
|
||||
def submit_schema_refresh(self, keyspace=None, table=None, usertype=None, function=None, aggregate=None):
|
||||
"""
|
||||
Schedule a refresh of the internal representation of the current
|
||||
schema for this cluster. See :meth:`~.refresh_schema` for description of parameters.
|
||||
|
||||
@@ -22,11 +22,13 @@ from mock import Mock
|
||||
import six
|
||||
import sys
|
||||
|
||||
from cassandra import AlreadyExists, UserFunctionDescriptor
|
||||
from cassandra import AlreadyExists, SignatureDescriptor
|
||||
|
||||
from cassandra.cluster import Cluster
|
||||
from cassandra.cqltypes import DoubleType, Int32Type, ListType, UTF8Type, MapType
|
||||
from cassandra.encoder import Encoder
|
||||
from cassandra.metadata import (Metadata, KeyspaceMetadata, TableMetadata,
|
||||
Token, MD5Token, TokenMap, murmur3)
|
||||
Token, MD5Token, TokenMap, murmur3, Function, Aggregate)
|
||||
from cassandra.policies import SimpleConvictionPolicy
|
||||
from cassandra.pool import Host
|
||||
|
||||
@@ -929,14 +931,11 @@ class KeyspaceAlterMetadata(unittest.TestCase):
|
||||
self.assertNotEqual(original_keyspace_meta, new_keyspace_meta)
|
||||
self.assertEqual(new_keyspace_meta.durable_writes, False)
|
||||
|
||||
from cassandra.cqltypes import DoubleType
|
||||
from cassandra.metadata import Function
|
||||
|
||||
|
||||
class FunctionMetadata(unittest.TestCase):
|
||||
|
||||
keyspace_name = "functionmetadatatest"
|
||||
|
||||
class FunctionTest(unittest.TestCase):
|
||||
"""
|
||||
Base functionality for Function and Aggregate metadata test classes
|
||||
"""
|
||||
@property
|
||||
def function_name(self):
|
||||
return self._testMethodName.lower()
|
||||
@@ -947,40 +946,57 @@ class FunctionMetadata(unittest.TestCase):
|
||||
raise unittest.SkipTest("Function metadata requires native protocol version 4+")
|
||||
|
||||
cls.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
|
||||
cls.keyspace_name = cls.__name__.lower()
|
||||
cls.session = cls.cluster.connect()
|
||||
cls.session.execute("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}" % cls.keyspace_name)
|
||||
cls.session.execute("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}" % cls.keyspace_name)
|
||||
cls.session.set_keyspace(cls.keyspace_name)
|
||||
cls.keyspace_function_meta = cls.cluster.metadata.keyspaces[cls.keyspace_name].functions
|
||||
cls.keyspace_aggregate_meta = cls.cluster.metadata.keyspaces[cls.keyspace_name].aggregates
|
||||
|
||||
@classmethod
|
||||
def teardown_class(cls):
|
||||
cls.session.execute("DROP KEYSPACE %s" % cls.keyspace_name)
|
||||
cls.session.execute("DROP KEYSPACE IF EXISTS %s" % cls.keyspace_name)
|
||||
cls.cluster.shutdown()
|
||||
|
||||
class VerifiedFunction(object):
|
||||
def __init__(self, test_case, **function_kwargs):
|
||||
class Verified(object):
|
||||
|
||||
def __init__(self, test_case, meta_class, element_meta, **function_kwargs):
|
||||
self.test_case = test_case
|
||||
self.function_kwargs = dict(function_kwargs)
|
||||
self.meta_class = meta_class
|
||||
self.element_meta = element_meta
|
||||
|
||||
def __enter__(self):
|
||||
tc = self.test_case
|
||||
expected_meta = Function(**self.function_kwargs)
|
||||
tc.assertNotIn(expected_meta.signature, tc.keyspace_function_meta)
|
||||
expected_meta = self.meta_class(**self.function_kwargs)
|
||||
tc.assertNotIn(expected_meta.signature, self.element_meta)
|
||||
tc.session.execute(expected_meta.as_cql_query())
|
||||
tc.assertIn(expected_meta.signature, tc.keyspace_function_meta)
|
||||
tc.assertIn(expected_meta.signature, self.element_meta)
|
||||
|
||||
generated_meta = tc.keyspace_function_meta[expected_meta.signature]
|
||||
generated_meta = self.element_meta[expected_meta.signature]
|
||||
self.test_case.assertEqual(generated_meta.as_cql_query(), expected_meta.as_cql_query())
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
tc = self.test_case
|
||||
tc.session.execute("DROP FUNCTION %s.%s" % (tc.keyspace_name, self.signature))
|
||||
tc.assertNotIn(self.signature, tc.keyspace_function_meta)
|
||||
tc.session.execute("DROP %s %s.%s" % (self.meta_class.__name__, tc.keyspace_name, self.signature))
|
||||
tc.assertNotIn(self.signature, self.element_meta)
|
||||
|
||||
@property
|
||||
def signature(self):
|
||||
return UserFunctionDescriptor.format_signature(self.function_kwargs['name'],
|
||||
self.function_kwargs['type_signature'])
|
||||
return SignatureDescriptor.format_signature(self.function_kwargs['name'],
|
||||
self.function_kwargs['type_signature'])
|
||||
|
||||
class VerifiedFunction(Verified):
|
||||
def __init__(self, test_case, **kwargs):
|
||||
super(FunctionTest.VerifiedFunction, self).__init__(test_case, Function, test_case.keyspace_function_meta, **kwargs)
|
||||
|
||||
class VerifiedAggregate(Verified):
|
||||
def __init__(self, test_case, **kwargs):
|
||||
super(FunctionTest.VerifiedAggregate, self).__init__(test_case, Aggregate, test_case.keyspace_aggregate_meta, **kwargs)
|
||||
|
||||
|
||||
class FunctionMetadata(FunctionTest):
|
||||
|
||||
def make_function_kwargs(self, deterministic=True, called_on_null=True):
|
||||
return {'keyspace': self.keyspace_name,
|
||||
@@ -993,18 +1009,15 @@ class FunctionMetadata(unittest.TestCase):
|
||||
'is_deterministic': deterministic,
|
||||
'called_on_null_input': called_on_null}
|
||||
|
||||
def test_create_drop_function(self):
|
||||
with self.VerifiedFunction(self, **self.make_function_kwargs()):
|
||||
pass
|
||||
|
||||
def test_functions_after_udt(self):
|
||||
self.assertNotIn(self.function_name, self.keyspace_function_meta)
|
||||
|
||||
udt_name = 'udtx'
|
||||
self.session.execute("CREATE TYPE %s.%s (x int)" % (self.keyspace_name, udt_name))
|
||||
self.session.execute("CREATE TYPE %s (x int)" % udt_name)
|
||||
|
||||
# Ideally we would make a function that takes a udt type, but
|
||||
# this presently fails because C* c059a56 requires udt to be frozen to create, but does not store meta indicating frozen
|
||||
# https://issues.apache.org/jira/browse/CASSANDRA-9186
|
||||
# Maybe update this after release
|
||||
#kwargs = self.make_function_kwargs()
|
||||
#kwargs['type_signature'][0] = "frozen<%s>" % udt_name
|
||||
@@ -1066,3 +1079,162 @@ class FunctionMetadata(unittest.TestCase):
|
||||
with self.VerifiedFunction(self, **kwargs) as vf:
|
||||
fn_meta = self.keyspace_function_meta[vf.signature]
|
||||
self.assertRegexpMatches(fn_meta.as_cql_query(), "CREATE FUNCTION.*\) RETURNS NULL ON NULL INPUT RETURNS .*")
|
||||
|
||||
|
||||
class AggregateMetadata(FunctionTest):
|
||||
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
super(AggregateMetadata, cls).setup_class()
|
||||
|
||||
cls.session.execute("""CREATE OR REPLACE FUNCTION sum_int(s int, i int)
|
||||
RETURNS NULL ON NULL INPUT
|
||||
RETURNS int
|
||||
LANGUAGE javascript AS 's + i';""")
|
||||
cls.session.execute("""CREATE OR REPLACE FUNCTION sum_int_two(s int, i int, j int)
|
||||
RETURNS NULL ON NULL INPUT
|
||||
RETURNS int
|
||||
LANGUAGE javascript AS 's + i + j';""")
|
||||
cls.session.execute("""CREATE OR REPLACE FUNCTION "List_As_String"(l list<text>)
|
||||
RETURNS NULL ON NULL INPUT
|
||||
RETURNS int
|
||||
LANGUAGE javascript AS ''''' + l';""")
|
||||
cls.session.execute("""CREATE OR REPLACE FUNCTION extend_list(s list<text>, i int)
|
||||
CALLED ON NULL INPUT
|
||||
RETURNS list<text>
|
||||
LANGUAGE java AS 'if (i != null) s.add(i.toString()); return s;';""")
|
||||
cls.session.execute("""CREATE OR REPLACE FUNCTION update_map(s map<int, int>, i int)
|
||||
RETURNS NULL ON NULL INPUT
|
||||
RETURNS map<int, int>
|
||||
LANGUAGE java AS 's.put(new Integer(i), new Integer(i)); return s;';""")
|
||||
cls.session.execute("""CREATE TABLE IF NOT EXISTS t
|
||||
(k int PRIMARY KEY, v int)""")
|
||||
for x in range(4):
|
||||
cls.session.execute("INSERT INTO t (k,v) VALUES (%s, %s)", (x, x))
|
||||
cls.session.execute("INSERT INTO t (k) VALUES (%s)", (4,))
|
||||
|
||||
def make_aggregate_kwargs(self, state_func, state_type, final_func=None, init_cond=None):
|
||||
return {'keyspace': self.keyspace_name,
|
||||
'name': self.function_name + '_aggregate',
|
||||
'type_signature': ['int'],
|
||||
'state_func': state_func,
|
||||
'state_type': state_type,
|
||||
'final_func': final_func,
|
||||
'initial_condition': init_cond,
|
||||
'return_type': "does not matter for creation"}
|
||||
|
||||
def test_return_type_meta(self):
|
||||
with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('sum_int', Int32Type, init_cond=1)) as va:
|
||||
self.assertIs(self.keyspace_aggregate_meta[va.signature].return_type, Int32Type)
|
||||
|
||||
def test_init_cond(self):
|
||||
# This is required until the java driver bundled with C* is updated to support v4
|
||||
c = Cluster(protocol_version=3)
|
||||
s = c.connect(self.keyspace_name)
|
||||
|
||||
expected_values = range(4)
|
||||
|
||||
# int32
|
||||
for init_cond in (-1, 0, 1):
|
||||
with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('sum_int', Int32Type, init_cond=init_cond)) as va:
|
||||
sum_res = s.execute("SELECT %s(v) AS sum FROM t" % va.function_kwargs['name'])[0].sum
|
||||
self.assertEqual(sum_res, init_cond + sum(expected_values))
|
||||
|
||||
# list<text>
|
||||
for init_cond in ([], ['1', '2']):
|
||||
with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('extend_list', ListType.apply_parameters([UTF8Type]), init_cond=init_cond)) as va:
|
||||
list_res = s.execute("SELECT %s(v) AS list_res FROM t" % va.function_kwargs['name'])[0].list_res
|
||||
self.assertListEqual(list_res[:len(init_cond)], init_cond)
|
||||
self.assertEqual(set(i for i in list_res[len(init_cond):]),
|
||||
set(str(i) for i in expected_values))
|
||||
|
||||
# map<int,int>
|
||||
expected_map_values = dict((i, i) for i in expected_values)
|
||||
expected_key_set = set(expected_values)
|
||||
for init_cond in ({}, {1: 2, 3: 4}, {5: 5}):
|
||||
with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('update_map', MapType.apply_parameters([Int32Type, Int32Type]), init_cond=init_cond)) as va:
|
||||
map_res = s.execute("SELECT %s(v) AS map_res FROM t" % va.function_kwargs['name'])[0].map_res
|
||||
self.assertDictContainsSubset(expected_map_values, map_res)
|
||||
init_not_updated = dict((k, init_cond[k]) for k in set(init_cond) - expected_key_set)
|
||||
self.assertDictContainsSubset(init_not_updated, map_res)
|
||||
c.shutdown()
|
||||
|
||||
def test_aggregates_after_functions(self):
|
||||
# functions must come before functions in keyspace dump
|
||||
with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('extend_list', ListType.apply_parameters([UTF8Type]))):
|
||||
keyspace_cql = self.cluster.metadata.keyspaces[self.keyspace_name].export_as_string()
|
||||
func_idx = keyspace_cql.find("CREATE FUNCTION")
|
||||
aggregate_idx = keyspace_cql.rfind("CREATE AGGREGATE")
|
||||
self.assertNotIn(-1, (aggregate_idx, func_idx), "AGGREGATE or FUNCTION not found in keyspace_cql: " + keyspace_cql)
|
||||
self.assertGreater(aggregate_idx, func_idx)
|
||||
|
||||
def test_same_name_diff_types(self):
|
||||
kwargs = self.make_aggregate_kwargs('sum_int', Int32Type, init_cond=0)
|
||||
with self.VerifiedAggregate(self, **kwargs):
|
||||
kwargs['state_func'] = 'sum_int_two'
|
||||
kwargs['type_signature'] = ['int', 'int']
|
||||
with self.VerifiedAggregate(self, **kwargs):
|
||||
aggregates = [a for a in self.keyspace_aggregate_meta.values() if a.name == kwargs['name']]
|
||||
self.assertEqual(len(aggregates), 2)
|
||||
self.assertNotEqual(aggregates[0].type_signature, aggregates[1].type_signature)
|
||||
|
||||
def test_aggregates_follow_keyspace_alter(self):
|
||||
with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('sum_int', Int32Type, init_cond=0)):
|
||||
original_keyspace_meta = self.cluster.metadata.keyspaces[self.keyspace_name]
|
||||
self.session.execute('ALTER KEYSPACE %s WITH durable_writes = false' % self.keyspace_name)
|
||||
try:
|
||||
new_keyspace_meta = self.cluster.metadata.keyspaces[self.keyspace_name]
|
||||
self.assertNotEqual(original_keyspace_meta, new_keyspace_meta)
|
||||
self.assertIs(original_keyspace_meta.aggregates, new_keyspace_meta.aggregates)
|
||||
finally:
|
||||
self.session.execute('ALTER KEYSPACE %s WITH durable_writes = true' % self.keyspace_name)
|
||||
|
||||
def test_cql_optional_params(self):
|
||||
kwargs = self.make_aggregate_kwargs('extend_list', ListType.apply_parameters([UTF8Type]))
|
||||
|
||||
# no initial condition, final func
|
||||
self.assertIsNone(kwargs['initial_condition'])
|
||||
self.assertIsNone(kwargs['final_func'])
|
||||
with self.VerifiedAggregate(self, **kwargs) as va:
|
||||
meta = self.keyspace_aggregate_meta[va.signature]
|
||||
self.assertIsNone(meta.initial_condition)
|
||||
self.assertIsNone(meta.final_func)
|
||||
cql = meta.as_cql_query()
|
||||
self.assertEqual(cql.find('INITCOND'), -1)
|
||||
self.assertEqual(cql.find('FINALFUNC'), -1)
|
||||
|
||||
# initial condition, no final func
|
||||
kwargs['initial_condition'] = ['init', 'cond']
|
||||
with self.VerifiedAggregate(self, **kwargs) as va:
|
||||
meta = self.keyspace_aggregate_meta[va.signature]
|
||||
self.assertListEqual(meta.initial_condition, kwargs['initial_condition'])
|
||||
self.assertIsNone(meta.final_func)
|
||||
cql = meta.as_cql_query()
|
||||
search_string = "INITCOND %s" % Encoder().cql_encode_all_types(kwargs['initial_condition'])
|
||||
self.assertGreater(cql.find(search_string), 0, '"%s" search string not found in cql:\n%s' % (search_string, cql))
|
||||
self.assertEqual(cql.find('FINALFUNC'), -1)
|
||||
|
||||
# no initial condition, final func
|
||||
kwargs['initial_condition'] = None
|
||||
kwargs['final_func'] = 'List_As_String'
|
||||
with self.VerifiedAggregate(self, **kwargs) as va:
|
||||
meta = self.keyspace_aggregate_meta[va.signature]
|
||||
self.assertIsNone(meta.initial_condition)
|
||||
self.assertEqual(meta.final_func, kwargs['final_func'])
|
||||
cql = meta.as_cql_query()
|
||||
self.assertEqual(cql.find('INITCOND'), -1)
|
||||
search_string = 'FINALFUNC "%s"' % kwargs['final_func']
|
||||
self.assertGreater(cql.find(search_string), 0, '"%s" search string not found in cql:\n%s' % (search_string, cql))
|
||||
|
||||
# both
|
||||
kwargs['initial_condition'] = ['init', 'cond']
|
||||
kwargs['final_func'] = 'List_As_String'
|
||||
with self.VerifiedAggregate(self, **kwargs) as va:
|
||||
meta = self.keyspace_aggregate_meta[va.signature]
|
||||
self.assertListEqual(meta.initial_condition, kwargs['initial_condition'])
|
||||
self.assertEqual(meta.final_func, kwargs['final_func'])
|
||||
cql = meta.as_cql_query()
|
||||
init_cond_idx = cql.find("INITCOND %s" % Encoder().cql_encode_all_types(kwargs['initial_condition']))
|
||||
final_func_idx = cql.find('FINALFUNC "%s"' % kwargs['final_func'])
|
||||
self.assertNotIn(-1, (init_cond_idx, final_func_idx))
|
||||
self.assertGreater(init_cond_idx, final_func_idx)
|
||||
|
||||
Reference in New Issue
Block a user