diff --git a/cassandra/cluster.py b/cassandra/cluster.py index e14c882c..96eb4044 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1116,7 +1116,7 @@ class Cluster(object): for pool in session._pools.values(): pool.ensure_core_connections() - def refresh_schema(self, keyspace=None, table=None, usertype=None, max_schema_agreement_wait=None): + def refresh_schema(self, keyspace=None, table=None, usertype=None, function=None, max_schema_agreement_wait=None): """ Synchronously refresh the schema metadata. @@ -1129,7 +1129,7 @@ class Cluster(object): An Exception is raised if schema refresh fails for any reason. """ - if not self.control_connection.refresh_schema(keyspace, table, usertype, max_schema_agreement_wait): + if not self.control_connection.refresh_schema(keyspace, table, usertype, function, max_schema_agreement_wait): raise Exception("Schema was not refreshed. See log for details.") def submit_schema_refresh(self, keyspace=None, table=None, usertype=None): @@ -2008,7 +2008,7 @@ class ControlConnection(object): self._connection.close() del self._connection - def refresh_schema(self, keyspace=None, table=None, usertype=None, + def refresh_schema(self, keyspace=None, table=None, usertype=None, function=None, schema_agreement_wait=None): if not self._meta_refresh_enabled: log.debug("[control connection] Skipping schema refresh because meta refresh is disabled") @@ -2016,7 +2016,7 @@ class ControlConnection(object): try: if self._connection: - return self._refresh_schema(self._connection, keyspace, table, usertype, + return self._refresh_schema(self._connection, keyspace, table, usertype, function, schema_agreement_wait=schema_agreement_wait) except ReferenceError: pass # our weak reference to the Cluster is no good @@ -2025,7 +2025,7 @@ class ControlConnection(object): self._signal_error() return False - def _refresh_schema(self, connection, keyspace=None, table=None, usertype=None, + def _refresh_schema(self, connection, keyspace=None, table=None, usertype=None, function=None, preloaded_results=None, schema_agreement_wait=None): if self._cluster.is_shutdown: return False @@ -2074,6 +2074,14 @@ class ControlConnection(object): log.debug("[control connection] Fetched user type info for %s.%s, rebuilding metadata", keyspace, usertype) types_result = dict_factory(*types_result.results) if types_result.results else {} self._cluster.metadata.usertype_changed(keyspace, usertype, types_result) + elif function: + # user defined function within this keyspace changed + where_clause = " WHERE keyspace_name = '%s' AND function_name = '%s'" % (keyspace, function) + functions_query = QueryMessage(query=self._SELECT_FUNCTIONS + where_clause, consistency_level=cl) + functions_result = connection.wait_for_response(functions_query) + log.debug("[control connection] Fetched user function info for %s.%s, rebuilding metadata", keyspace, function) + functions_result = dict_factory(*functions_result.results) if functions_result.results else {} + self._cluster.metadata.function_changed(keyspace, function, functions_result) elif keyspace: # only the keyspace itself changed (such as replication settings) where_clause = " WHERE keyspace_name = '%s'" % (keyspace,) @@ -2297,8 +2305,9 @@ class ControlConnection(object): keyspace = event.get('keyspace') table = event.get('table') usertype = event.get('type') + function = event.get('function', event.get('aggregate')) delay = random() * self._schema_event_refresh_window - self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, keyspace, table, usertype) + self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, keyspace, table, usertype, function) def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wait_time=None): @@ -2527,19 +2536,19 @@ class _Scheduler(object): exc_info=exc) -def refresh_schema_and_set_result(keyspace, table, usertype, control_conn, response_future): +def refresh_schema_and_set_result(keyspace, table, usertype, function, control_conn, response_future): try: if control_conn._meta_refresh_enabled: - log.debug("Refreshing schema in response to schema change. Keyspace: %s; Table: %s, Type: %s", - keyspace, table, usertype) - control_conn._refresh_schema(response_future._connection, keyspace, table, usertype) + log.debug("Refreshing schema in response to schema change. Keyspace: %s; Table: %s, Type: %s, Function: %s", + keyspace, table, usertype, function) + control_conn._refresh_schema(response_future._connection, keyspace, table, usertype, function) else: log.debug("Skipping schema refresh in response to schema change because meta refresh is disabled; " - "Keyspace: %s; Table: %s, Type: %s", keyspace, table, usertype) + "Keyspace: %s; Table: %s, Type: %s, Function: %s", keyspace, table, usertype, function) except Exception: log.exception("Exception refreshing schema in response to schema change:") response_future.session.submit( - control_conn.refresh_schema, keyspace, table, usertype) + control_conn.refresh_schema, keyspace, table, usertype, function) finally: response_future._set_final_result(None) @@ -2724,6 +2733,7 @@ class ResponseFuture(object): response.results['keyspace'], response.results.get('table'), response.results.get('type'), + response.results.get('function', response.results.get('aggregate')), self.session.cluster.control_connection, self) else: diff --git a/cassandra/metadata.py b/cassandra/metadata.py index 896b848b..d6cd8e58 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -170,6 +170,7 @@ class Metadata(object): if old_keyspace_meta: keyspace_meta.tables = old_keyspace_meta.tables keyspace_meta.user_types = old_keyspace_meta.user_types + keyspace_meta.functions = old_keyspace_meta.functions if (keyspace_meta.replication_strategy != old_keyspace_meta.replication_strategy): self._keyspace_updated(keyspace) else: @@ -183,6 +184,14 @@ class Metadata(object): # the type was deleted self.keyspaces[keyspace].user_types.pop(name, None) + def function_changed(self, keyspace, name, function_results): + if function_results: + new_function = self._build_function(keyspace, function_results[0]) + self.keyspaces[keyspace].functions[name] = new_function + else: + # the function was deleted + self.keyspaces[keyspace].functions.pop(name, None) + def table_changed(self, keyspace, table, cf_results, col_results, triggers_result): try: keyspace_meta = self.keyspaces[keyspace] diff --git a/cassandra/protocol.py b/cassandra/protocol.py index d81a6507..922925a5 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -851,8 +851,8 @@ class EventMessage(_MessageType): target = read_string(f) keyspace = read_string(f) if target != "KEYSPACE": - table_or_type = read_string(f) - return {'change_type': change_type, 'keyspace': keyspace, target.lower(): table_or_type} + target_name = read_string(f) + return {'change_type': change_type, 'keyspace': keyspace, target.lower(): target_name} else: return {'change_type': change_type, 'keyspace': keyspace} else: diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index e5b2eab3..367b9ee7 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -928,3 +928,83 @@ class KeyspaceAlterMetadata(unittest.TestCase): new_keyspace_meta = self.cluster.metadata.keyspaces[name] self.assertNotEqual(original_keyspace_meta, new_keyspace_meta) self.assertEqual(new_keyspace_meta.durable_writes, False) + +from cassandra.cqltypes import DoubleType +from cassandra.metadata import Function + + +class FunctionMetadata(unittest.TestCase): + + keyspace_name = "functionmetadatatest" + + @property + def function_name(self): + return self._testMethodName.lower() + + @classmethod + def setup_class(cls): + if PROTOCOL_VERSION < 4: + raise unittest.SkipTest("Function metadata requires native protocol version 4+") + + cls.cluster = Cluster(protocol_version=PROTOCOL_VERSION) + cls.session = cls.cluster.connect() + cls.session.execute("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}" % cls.keyspace_name) + cls.keyspace_function_meta = cls.cluster.metadata.keyspaces[cls.keyspace_name].functions + + @classmethod + def teardown_class(cls): + cls.session.execute("DROP KEYSPACE %s" % cls.keyspace_name) + cls.cluster.shutdown() + + def make_function_kwargs(self, deterministic=True, called_on_null=True): + return {'keyspace': self.keyspace_name, + 'name': self.function_name, + 'signature': ['double', 'int'], + 'argument_names': ['d', 'i'], + 'return_type': DoubleType, + 'language': 'java', + 'body': 'return new Double(0.0);', + 'is_deterministic': deterministic, + 'called_on_null_input': called_on_null} + + def test_create_drop_function(self): + self.assertNotIn(self.function_name, self.keyspace_function_meta) + + expected_meta = Function(**self.make_function_kwargs()) + self.session.execute(expected_meta.as_cql_query()) + self.assertIn(self.function_name, self.keyspace_function_meta) + + generated_meta = self.keyspace_function_meta[self.function_name] + self.assertEqual(generated_meta.as_cql_query(), expected_meta.as_cql_query()) + + self.session.execute("DROP FUNCTION %s.%s" % (self.keyspace_name, self.function_name)) + self.assertNotIn(self.function_name, self.keyspace_function_meta) + + # TODO: this presently fails because C* c059a56 requires udt to be frozen to create, but does not store meta indicating frozen + @unittest.expectedFailure + def test_functions_after_udt(self): + self.assertNotIn(self.function_name, self.keyspace_function_meta) + + udt_name = 'udtx' + self.session.execute("CREATE TYPE %s.%s (x int)" % (self.keyspace_name, udt_name)) + + # make a function that takes a udt type + kwargs = self.make_function_kwargs() + kwargs['signature'][0] = "frozen<%s>" % udt_name + + expected_meta = Function(**kwargs) + self.session.execute(expected_meta.as_cql_query()) + self.assertIn(self.function_name, self.keyspace_function_meta) + + generated_meta = self.keyspace_function_meta[self.function_name] + self.assertEqual(generated_meta.as_cql_query(), expected_meta.as_cql_query()) + + # udts must come before functions in keyspace dump + keyspace_cql = self.cluster.metadata.keyspaces[self.keyspace_name].export_as_string() + type_idx = keyspace_cql.rfind("CREATE TYPE") + func_idx = keyspace_cql.find("CREATE FUCNTION") + self.assertNotIn(-1, (type_idx, func_idx)) + self.assertGreater(func_idx, type_idx) + + self.session.execute("DROP FUNCTION %s.%s" % (self.keyspace_name, self.function_name)) + self.assertNotIn(self.function_name, self.keyspace_function_meta)