Handle function schema_change events; basic tests
Possibly still some server-side issues to iron out
This commit is contained in:
@@ -1116,7 +1116,7 @@ class Cluster(object):
|
||||
for pool in session._pools.values():
|
||||
pool.ensure_core_connections()
|
||||
|
||||
def refresh_schema(self, keyspace=None, table=None, usertype=None, max_schema_agreement_wait=None):
|
||||
def refresh_schema(self, keyspace=None, table=None, usertype=None, function=None, max_schema_agreement_wait=None):
|
||||
"""
|
||||
Synchronously refresh the schema metadata.
|
||||
|
||||
@@ -1129,7 +1129,7 @@ class Cluster(object):
|
||||
|
||||
An Exception is raised if schema refresh fails for any reason.
|
||||
"""
|
||||
if not self.control_connection.refresh_schema(keyspace, table, usertype, max_schema_agreement_wait):
|
||||
if not self.control_connection.refresh_schema(keyspace, table, usertype, function, max_schema_agreement_wait):
|
||||
raise Exception("Schema was not refreshed. See log for details.")
|
||||
|
||||
def submit_schema_refresh(self, keyspace=None, table=None, usertype=None):
|
||||
@@ -2008,7 +2008,7 @@ class ControlConnection(object):
|
||||
self._connection.close()
|
||||
del self._connection
|
||||
|
||||
def refresh_schema(self, keyspace=None, table=None, usertype=None,
|
||||
def refresh_schema(self, keyspace=None, table=None, usertype=None, function=None,
|
||||
schema_agreement_wait=None):
|
||||
if not self._meta_refresh_enabled:
|
||||
log.debug("[control connection] Skipping schema refresh because meta refresh is disabled")
|
||||
@@ -2016,7 +2016,7 @@ class ControlConnection(object):
|
||||
|
||||
try:
|
||||
if self._connection:
|
||||
return self._refresh_schema(self._connection, keyspace, table, usertype,
|
||||
return self._refresh_schema(self._connection, keyspace, table, usertype, function,
|
||||
schema_agreement_wait=schema_agreement_wait)
|
||||
except ReferenceError:
|
||||
pass # our weak reference to the Cluster is no good
|
||||
@@ -2025,7 +2025,7 @@ class ControlConnection(object):
|
||||
self._signal_error()
|
||||
return False
|
||||
|
||||
def _refresh_schema(self, connection, keyspace=None, table=None, usertype=None,
|
||||
def _refresh_schema(self, connection, keyspace=None, table=None, usertype=None, function=None,
|
||||
preloaded_results=None, schema_agreement_wait=None):
|
||||
if self._cluster.is_shutdown:
|
||||
return False
|
||||
@@ -2074,6 +2074,14 @@ class ControlConnection(object):
|
||||
log.debug("[control connection] Fetched user type info for %s.%s, rebuilding metadata", keyspace, usertype)
|
||||
types_result = dict_factory(*types_result.results) if types_result.results else {}
|
||||
self._cluster.metadata.usertype_changed(keyspace, usertype, types_result)
|
||||
elif function:
|
||||
# user defined function within this keyspace changed
|
||||
where_clause = " WHERE keyspace_name = '%s' AND function_name = '%s'" % (keyspace, function)
|
||||
functions_query = QueryMessage(query=self._SELECT_FUNCTIONS + where_clause, consistency_level=cl)
|
||||
functions_result = connection.wait_for_response(functions_query)
|
||||
log.debug("[control connection] Fetched user function info for %s.%s, rebuilding metadata", keyspace, function)
|
||||
functions_result = dict_factory(*functions_result.results) if functions_result.results else {}
|
||||
self._cluster.metadata.function_changed(keyspace, function, functions_result)
|
||||
elif keyspace:
|
||||
# only the keyspace itself changed (such as replication settings)
|
||||
where_clause = " WHERE keyspace_name = '%s'" % (keyspace,)
|
||||
@@ -2297,8 +2305,9 @@ class ControlConnection(object):
|
||||
keyspace = event.get('keyspace')
|
||||
table = event.get('table')
|
||||
usertype = event.get('type')
|
||||
function = event.get('function', event.get('aggregate'))
|
||||
delay = random() * self._schema_event_refresh_window
|
||||
self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, keyspace, table, usertype)
|
||||
self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, keyspace, table, usertype, function)
|
||||
|
||||
def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wait_time=None):
|
||||
|
||||
@@ -2527,19 +2536,19 @@ class _Scheduler(object):
|
||||
exc_info=exc)
|
||||
|
||||
|
||||
def refresh_schema_and_set_result(keyspace, table, usertype, control_conn, response_future):
|
||||
def refresh_schema_and_set_result(keyspace, table, usertype, function, control_conn, response_future):
|
||||
try:
|
||||
if control_conn._meta_refresh_enabled:
|
||||
log.debug("Refreshing schema in response to schema change. Keyspace: %s; Table: %s, Type: %s",
|
||||
keyspace, table, usertype)
|
||||
control_conn._refresh_schema(response_future._connection, keyspace, table, usertype)
|
||||
log.debug("Refreshing schema in response to schema change. Keyspace: %s; Table: %s, Type: %s, Function: %s",
|
||||
keyspace, table, usertype, function)
|
||||
control_conn._refresh_schema(response_future._connection, keyspace, table, usertype, function)
|
||||
else:
|
||||
log.debug("Skipping schema refresh in response to schema change because meta refresh is disabled; "
|
||||
"Keyspace: %s; Table: %s, Type: %s", keyspace, table, usertype)
|
||||
"Keyspace: %s; Table: %s, Type: %s, Function: %s", keyspace, table, usertype, function)
|
||||
except Exception:
|
||||
log.exception("Exception refreshing schema in response to schema change:")
|
||||
response_future.session.submit(
|
||||
control_conn.refresh_schema, keyspace, table, usertype)
|
||||
control_conn.refresh_schema, keyspace, table, usertype, function)
|
||||
finally:
|
||||
response_future._set_final_result(None)
|
||||
|
||||
@@ -2724,6 +2733,7 @@ class ResponseFuture(object):
|
||||
response.results['keyspace'],
|
||||
response.results.get('table'),
|
||||
response.results.get('type'),
|
||||
response.results.get('function', response.results.get('aggregate')),
|
||||
self.session.cluster.control_connection,
|
||||
self)
|
||||
else:
|
||||
|
||||
@@ -170,6 +170,7 @@ class Metadata(object):
|
||||
if old_keyspace_meta:
|
||||
keyspace_meta.tables = old_keyspace_meta.tables
|
||||
keyspace_meta.user_types = old_keyspace_meta.user_types
|
||||
keyspace_meta.functions = old_keyspace_meta.functions
|
||||
if (keyspace_meta.replication_strategy != old_keyspace_meta.replication_strategy):
|
||||
self._keyspace_updated(keyspace)
|
||||
else:
|
||||
@@ -183,6 +184,14 @@ class Metadata(object):
|
||||
# the type was deleted
|
||||
self.keyspaces[keyspace].user_types.pop(name, None)
|
||||
|
||||
def function_changed(self, keyspace, name, function_results):
|
||||
if function_results:
|
||||
new_function = self._build_function(keyspace, function_results[0])
|
||||
self.keyspaces[keyspace].functions[name] = new_function
|
||||
else:
|
||||
# the function was deleted
|
||||
self.keyspaces[keyspace].functions.pop(name, None)
|
||||
|
||||
def table_changed(self, keyspace, table, cf_results, col_results, triggers_result):
|
||||
try:
|
||||
keyspace_meta = self.keyspaces[keyspace]
|
||||
|
||||
@@ -851,8 +851,8 @@ class EventMessage(_MessageType):
|
||||
target = read_string(f)
|
||||
keyspace = read_string(f)
|
||||
if target != "KEYSPACE":
|
||||
table_or_type = read_string(f)
|
||||
return {'change_type': change_type, 'keyspace': keyspace, target.lower(): table_or_type}
|
||||
target_name = read_string(f)
|
||||
return {'change_type': change_type, 'keyspace': keyspace, target.lower(): target_name}
|
||||
else:
|
||||
return {'change_type': change_type, 'keyspace': keyspace}
|
||||
else:
|
||||
|
||||
@@ -928,3 +928,83 @@ class KeyspaceAlterMetadata(unittest.TestCase):
|
||||
new_keyspace_meta = self.cluster.metadata.keyspaces[name]
|
||||
self.assertNotEqual(original_keyspace_meta, new_keyspace_meta)
|
||||
self.assertEqual(new_keyspace_meta.durable_writes, False)
|
||||
|
||||
from cassandra.cqltypes import DoubleType
|
||||
from cassandra.metadata import Function
|
||||
|
||||
|
||||
class FunctionMetadata(unittest.TestCase):
|
||||
|
||||
keyspace_name = "functionmetadatatest"
|
||||
|
||||
@property
|
||||
def function_name(self):
|
||||
return self._testMethodName.lower()
|
||||
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
if PROTOCOL_VERSION < 4:
|
||||
raise unittest.SkipTest("Function metadata requires native protocol version 4+")
|
||||
|
||||
cls.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
|
||||
cls.session = cls.cluster.connect()
|
||||
cls.session.execute("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}" % cls.keyspace_name)
|
||||
cls.keyspace_function_meta = cls.cluster.metadata.keyspaces[cls.keyspace_name].functions
|
||||
|
||||
@classmethod
|
||||
def teardown_class(cls):
|
||||
cls.session.execute("DROP KEYSPACE %s" % cls.keyspace_name)
|
||||
cls.cluster.shutdown()
|
||||
|
||||
def make_function_kwargs(self, deterministic=True, called_on_null=True):
|
||||
return {'keyspace': self.keyspace_name,
|
||||
'name': self.function_name,
|
||||
'signature': ['double', 'int'],
|
||||
'argument_names': ['d', 'i'],
|
||||
'return_type': DoubleType,
|
||||
'language': 'java',
|
||||
'body': 'return new Double(0.0);',
|
||||
'is_deterministic': deterministic,
|
||||
'called_on_null_input': called_on_null}
|
||||
|
||||
def test_create_drop_function(self):
|
||||
self.assertNotIn(self.function_name, self.keyspace_function_meta)
|
||||
|
||||
expected_meta = Function(**self.make_function_kwargs())
|
||||
self.session.execute(expected_meta.as_cql_query())
|
||||
self.assertIn(self.function_name, self.keyspace_function_meta)
|
||||
|
||||
generated_meta = self.keyspace_function_meta[self.function_name]
|
||||
self.assertEqual(generated_meta.as_cql_query(), expected_meta.as_cql_query())
|
||||
|
||||
self.session.execute("DROP FUNCTION %s.%s" % (self.keyspace_name, self.function_name))
|
||||
self.assertNotIn(self.function_name, self.keyspace_function_meta)
|
||||
|
||||
# TODO: this presently fails because C* c059a56 requires udt to be frozen to create, but does not store meta indicating frozen
|
||||
@unittest.expectedFailure
|
||||
def test_functions_after_udt(self):
|
||||
self.assertNotIn(self.function_name, self.keyspace_function_meta)
|
||||
|
||||
udt_name = 'udtx'
|
||||
self.session.execute("CREATE TYPE %s.%s (x int)" % (self.keyspace_name, udt_name))
|
||||
|
||||
# make a function that takes a udt type
|
||||
kwargs = self.make_function_kwargs()
|
||||
kwargs['signature'][0] = "frozen<%s>" % udt_name
|
||||
|
||||
expected_meta = Function(**kwargs)
|
||||
self.session.execute(expected_meta.as_cql_query())
|
||||
self.assertIn(self.function_name, self.keyspace_function_meta)
|
||||
|
||||
generated_meta = self.keyspace_function_meta[self.function_name]
|
||||
self.assertEqual(generated_meta.as_cql_query(), expected_meta.as_cql_query())
|
||||
|
||||
# udts must come before functions in keyspace dump
|
||||
keyspace_cql = self.cluster.metadata.keyspaces[self.keyspace_name].export_as_string()
|
||||
type_idx = keyspace_cql.rfind("CREATE TYPE")
|
||||
func_idx = keyspace_cql.find("CREATE FUCNTION")
|
||||
self.assertNotIn(-1, (type_idx, func_idx))
|
||||
self.assertGreater(func_idx, type_idx)
|
||||
|
||||
self.session.execute("DROP FUNCTION %s.%s" % (self.keyspace_name, self.function_name))
|
||||
self.assertNotIn(self.function_name, self.keyspace_function_meta)
|
||||
|
||||
Reference in New Issue
Block a user