From 790374ede42634111d1c7a0fbf79f82195312145 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Fri, 13 Feb 2015 15:49:59 -0600 Subject: [PATCH 01/33] Add support for CASSANDRA-7660 This experimentally adds support for CASSANDRA-7660, which returns the indexes of bind markers that correspond to partition key columns in prepared statement response metadata. --- cassandra/cluster.py | 4 ++-- cassandra/protocol.py | 35 +++++++++++++++++++++++++++++++---- cassandra/query.py | 37 ++++++++++++++++++++----------------- 3 files changed, 53 insertions(+), 23 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index b639af69..40d414c7 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1534,13 +1534,13 @@ class Session(object): future = ResponseFuture(self, message, query=None) try: future.send_request() - query_id, column_metadata = future.result(self.default_timeout) + query_id, column_metadata, pk_indexes = future.result(self.default_timeout) except Exception: log.exception("Error preparing query:") raise prepared_statement = PreparedStatement.from_message( - query_id, column_metadata, self.cluster.metadata, query, self.keyspace, + query_id, column_metadata, pk_indexes, self.cluster.metadata, query, self.keyspace, self._protocol_version) host = future._current_host diff --git a/cassandra/protocol.py b/cassandra/protocol.py index af279ef9..f570f487 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -559,7 +559,7 @@ class ResultMessage(_MessageType): ksname = read_string(f) results = ksname elif kind == RESULT_KIND_PREPARED: - results = cls.recv_results_prepared(f, user_type_map) + results = cls.recv_results_prepared(f, protocol_version, user_type_map) elif kind == RESULT_KIND_SCHEMA_CHANGE: results = cls.recv_results_schema_change(f, protocol_version) return cls(kind, results, paging_state) @@ -578,16 +578,17 @@ class ResultMessage(_MessageType): return (paging_state, (colnames, parsed_rows)) @classmethod - def 
recv_results_prepared(cls, f, user_type_map): + def recv_results_prepared(cls, f, protocol_version, user_type_map): query_id = read_binary_string(f) - _, column_metadata = cls.recv_results_metadata(f, user_type_map) - return (query_id, column_metadata) + column_metadata, pk_indexes = cls.recv_prepared_metadata(f, protocol_version, user_type_map) + return (query_id, column_metadata, pk_indexes) @classmethod def recv_results_metadata(cls, f, user_type_map): flags = read_int(f) glob_tblspec = bool(flags & cls._FLAGS_GLOBAL_TABLES_SPEC) colcount = read_int(f) + if flags & cls._HAS_MORE_PAGES_FLAG: paging_state = read_binary_longstring(f) else: @@ -608,6 +609,32 @@ class ResultMessage(_MessageType): column_metadata.append((colksname, colcfname, colname, coltype)) return paging_state, column_metadata + @classmethod + def recv_prepared_metadata(cls, f, protocol_version, user_type_map): + flags = read_int(f) + glob_tblspec = bool(flags & cls._FLAGS_GLOBAL_TABLES_SPEC) + colcount = read_int(f) + pk_indexes = None + if protocol_version >= 4: + num_pk_indexes = read_int(f) + pk_indexes = [read_short(f) for _ in range(num_pk_indexes)] + + if glob_tblspec: + ksname = read_string(f) + cfname = read_string(f) + column_metadata = [] + for _ in range(colcount): + if glob_tblspec: + colksname = ksname + colcfname = cfname + else: + colksname = read_string(f) + colcfname = read_string(f) + colname = read_string(f) + coltype = cls.read_type(f, user_type_map) + column_metadata.append((colksname, colcfname, colname, coltype)) + return column_metadata, pk_indexes + @classmethod def recv_results_schema_change(cls, f, protocol_version): return EventMessage.recv_schema_change(f, protocol_version) diff --git a/cassandra/query.py b/cassandra/query.py index c159e787..0f7db27e 100644 --- a/cassandra/query.py +++ b/cassandra/query.py @@ -353,29 +353,32 @@ class PreparedStatement(object): self.fetch_size = fetch_size @classmethod - def from_message(cls, query_id, column_metadata, 
cluster_metadata, query, prepared_keyspace, protocol_version): + def from_message(cls, query_id, column_metadata, pk_indexes, cluster_metadata, query, prepared_keyspace, protocol_version): if not column_metadata: return PreparedStatement(column_metadata, query_id, None, query, prepared_keyspace, protocol_version) - partition_key_columns = None - routing_key_indexes = None + if pk_indexes: + routing_key_indexes = pk_indexes + else: + partition_key_columns = None + routing_key_indexes = None - ks_name, table_name, _, _ = column_metadata[0] - ks_meta = cluster_metadata.keyspaces.get(ks_name) - if ks_meta: - table_meta = ks_meta.tables.get(table_name) - if table_meta: - partition_key_columns = table_meta.partition_key + ks_name, table_name, _, _ = column_metadata[0] + ks_meta = cluster_metadata.keyspaces.get(ks_name) + if ks_meta: + table_meta = ks_meta.tables.get(table_name) + if table_meta: + partition_key_columns = table_meta.partition_key - # make a map of {column_name: index} for each column in the statement - statement_indexes = dict((c[2], i) for i, c in enumerate(column_metadata)) + # make a map of {column_name: index} for each column in the statement + statement_indexes = dict((c[2], i) for i, c in enumerate(column_metadata)) - # a list of which indexes in the statement correspond to partition key items - try: - routing_key_indexes = [statement_indexes[c.name] - for c in partition_key_columns] - except KeyError: # we're missing a partition key component in the prepared - pass # statement; just leave routing_key_indexes as None + # a list of which indexes in the statement correspond to partition key items + try: + routing_key_indexes = [statement_indexes[c.name] + for c in partition_key_columns] + except KeyError: # we're missing a partition key component in the prepared + pass # statement; just leave routing_key_indexes as None return PreparedStatement(column_metadata, query_id, routing_key_indexes, query, prepared_keyspace, protocol_version) From 
2d5466f8a2cb384ec96abc1720c8418faa542cd3 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Mon, 9 Mar 2015 16:58:58 -0500 Subject: [PATCH 02/33] Add Read and WriteFailure error classes --- cassandra/protocol.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index ca628a43..8b22f75d 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -263,6 +263,35 @@ class ReadTimeoutErrorMessage(RequestExecutionException): return ReadTimeout(self.summary_msg(), **self.info) +class ReadFailureMessage(RequestExecutionException): + summary = "Replica(s) failed to execute read" + error_code = 0x1300 + + @staticmethod + def recv_error_info(f): + return { + 'consistency': read_consistency_level(f), + 'received_responses': read_int(f), + 'required_responses': read_int(f), + 'failures': read_int(f), + 'data_retrieved': bool(read_byte(f)), + } + + +class WriteFailureMessage(RequestExecutionException): + summary = "Replica(s) failed to execute write" + error_code = 0x1500 + + @staticmethod + def recv_error_info(f): + return { + 'consistency': read_consistency_level(f), + 'received_responses': read_int(f), + 'required_responses': read_int(f), + 'failures': read_int(f), + 'write_type': WriteType.name_to_value[read_string(f)], + } + class SyntaxException(RequestValidationException): summary = 'Syntax error in CQL query' error_code = 0x2000 From 56b8a1463c61e8f0859941b7ae62a37a55470a2d Mon Sep 17 00:00:00 2001 From: Stefania Alborghetti Date: Tue, 10 Mar 2015 11:56:13 +0800 Subject: [PATCH 03/33] Added WriteFailure and ReadFailure exceptions --- cassandra/__init__.py | 70 +++++++++++++++++++++++++++++++++++++++++++ cassandra/protocol.py | 7 +++++ 2 files changed, 77 insertions(+) diff --git a/cassandra/__init__.py b/cassandra/__init__.py index 96f08576..909eeb13 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -221,6 +221,76 @@ class WriteTimeout(Timeout): self.write_type = write_type 
+class Failure(Exception): + """ + Replicas sent a failure to the coordinator. + """ + + consistency = None + """ The requested :class:`ConsistencyLevel` """ + + required_responses = None + """ The number of required replica responses """ + + received_responses = None + """ + The number of replicas that responded before the coordinator timed out + the operation + """ + + failures = None + """ + The number of replicas that sent a failure message + """ + + def __init__(self, summary_message, consistency=None, required_responses=None, received_responses=None, failures=None): + self.consistency = consistency + self.required_responses = required_responses + self.received_responses = received_responses + self.failures = failures + Exception.__init__(self, summary_message + ' info=' + + repr({'consistency': consistency_value_to_name(consistency), + 'required_responses': required_responses, + 'received_responses': received_responses, + 'failures' : failures})) + + +class ReadFailure(Failure): + """ + A subclass of :exc:`Failure` for read operations. + + This indicates that the replicas sent a failure message to the coordinator. + """ + + data_retrieved = None + """ + A boolean indicating whether the requested data was retrieved + by the coordinator from any replicas before it timed out the + operation + """ + + def __init__(self, message, data_retrieved=None, **kwargs): + Failure.__init__(self, message, **kwargs) + self.data_retrieved = data_retrieved + + +class WriteFailure(Failure): + """ + A subclass of :exc:`Failure` for write operations. + + This indicates that the replicas sent a failure message to the coordinator. 
+ """ + + write_type = None + """ + The type of write operation, enum on :class:`~cassandra.policies.WriteType` + """ + + def __init__(self, message, write_type=None, **kwargs): + Failure.__init__(self, message, **kwargs) + self.write_type = write_type + + class AlreadyExists(Exception): """ An attempt was made to create a keyspace or table that already exists. diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 8b22f75d..7f5c2cea 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -22,6 +22,7 @@ from six.moves import range import io from cassandra import (Unavailable, WriteTimeout, ReadTimeout, + WriteFailure, ReadFailure, AlreadyExists, InvalidRequest, Unauthorized, UnsupportedOperation) from cassandra.marshal import (int32_pack, int32_unpack, uint16_pack, uint16_unpack, @@ -277,6 +278,9 @@ class ReadFailureMessage(RequestExecutionException): 'data_retrieved': bool(read_byte(f)), } + def to_exception(self): + return ReadFailure(self.summary_msg(), **self.info) + class WriteFailureMessage(RequestExecutionException): summary = "Replica(s) failed to execute write" @@ -292,6 +296,9 @@ class WriteFailureMessage(RequestExecutionException): 'write_type': WriteType.name_to_value[read_string(f)], } + def to_exception(self): + return WriteFailure(self.summary_msg(), **self.info) + class SyntaxException(RequestValidationException): summary = 'Syntax error in CQL query' error_code = 0x2000 From 79c89ed67c60aed3c6e2ff67fd8469eb93873fc6 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Wed, 8 Apr 2015 11:26:33 -0500 Subject: [PATCH 04/33] Add C* 3.0/v4 to integration test default protocol --- tests/integration/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index ce3f042b..3ea397f0 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -118,7 +118,9 @@ else: log.info('Using Cassandra version: %s', CASSANDRA_VERSION) 
CCM_KWARGS['version'] = CASSANDRA_VERSION -if CASSANDRA_VERSION > '2.1': +if CASSANDRA_VERSION > '3.0': + default_protocol_version = 4 +elif CASSANDRA_VERSION > '2.1': default_protocol_version = 3 elif CASSANDRA_VERSION > '2.0': default_protocol_version = 2 From 37677993faee7ec4c31da651cf67b13a7b34db03 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Wed, 8 Apr 2015 12:42:51 -0500 Subject: [PATCH 05/33] Add a test for prepared statements with no column meta --- .../standard/test_prepared_statements.py | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/integration/standard/test_prepared_statements.py b/tests/integration/standard/test_prepared_statements.py index a75e6bc3..9240996c 100644 --- a/tests/integration/standard/test_prepared_statements.py +++ b/tests/integration/standard/test_prepared_statements.py @@ -213,6 +213,31 @@ class PreparedStatementTests(unittest.TestCase): cluster.shutdown() + def test_no_meta(self): + cluster = Cluster(protocol_version=PROTOCOL_VERSION) + session = cluster.connect() + + prepared = session.prepare( + """ + INSERT INTO test3rf.test (k, v) VALUES (0, 0) + """) + + self.assertIsInstance(prepared, PreparedStatement) + bound = prepared.bind(None) + session.execute(bound) + + prepared = session.prepare( + """ + SELECT * FROM test3rf.test WHERE k=0 + """) + self.assertIsInstance(prepared, PreparedStatement) + + bound = prepared.bind(None) + results = session.execute(bound) + self.assertEqual(results[0].v, 0) + + cluster.shutdown() + def test_none_values_dicts(self): """ Ensure binding None is handled correctly with dict bindings From 5dfc8b688d22d6fd8c309aee8e253947f3f667db Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Wed, 8 Apr 2015 15:30:49 -0500 Subject: [PATCH 06/33] Change R/W failure exception base to CoordinationFailure --- cassandra/__init__.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cassandra/__init__.py b/cassandra/__init__.py index c1e0bdd8..eacc29e0 
100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -221,7 +221,7 @@ class WriteTimeout(Timeout): self.write_type = write_type -class Failure(Exception): +class CoordinationFailure(Exception): """ Replicas sent a failure to the coordinator. """ @@ -252,12 +252,12 @@ class Failure(Exception): repr({'consistency': consistency_value_to_name(consistency), 'required_responses': required_responses, 'received_responses': received_responses, - 'failures' : failures})) + 'failures': failures})) -class ReadFailure(Failure): +class ReadFailure(CoordinationFailure): """ - A subclass of :exc:`Failure` for read operations. + A subclass of :exc:`CoordinationFailure` for read operations. This indicates that the replicas sent a failure message to the coordinator. """ @@ -270,13 +270,13 @@ class ReadFailure(Failure): """ def __init__(self, message, data_retrieved=None, **kwargs): - Failure.__init__(self, message, **kwargs) + CoordinationFailure.__init__(self, message, **kwargs) self.data_retrieved = data_retrieved -class WriteFailure(Failure): +class WriteFailure(CoordinationFailure): """ - A subclass of :exc:`Failure` for write operations. + A subclass of :exc:`CoordinationFailure` for write operations. This indicates that the replicas sent a failure message to the coordinator. 
""" @@ -287,7 +287,7 @@ class WriteFailure(Failure): """ def __init__(self, message, write_type=None, **kwargs): - Failure.__init__(self, message, **kwargs) + CoordinationFailure.__init__(self, message, **kwargs) self.write_type = write_type From 275a2663aa9e466b517729324d32765d17eae1bd Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Wed, 8 Apr 2015 15:31:40 -0500 Subject: [PATCH 07/33] Add FunctionFailure error message and exception --- cassandra/__init__.py | 27 +++++++++++++++++++++++++++ cassandra/protocol.py | 19 ++++++++++++++++++- 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/cassandra/__init__.py b/cassandra/__init__.py index eacc29e0..a9c7e21a 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -291,6 +291,33 @@ class WriteFailure(CoordinationFailure): self.write_type = write_type +class FunctionFailure(Exception): + """ + User Defined Function failed during execution + """ + + keyspace = None + """ + Keyspace of the function + """ + + function = None + """ + Name of the function + """ + + arg_types = None + """ + Argument types of the function + """ + + def __init__(self, summary_message, keyspace, function, arg_types): + self.keyspace = keyspace + self.function = function + self.arg_types = arg_types + Exception.__init__(self, summary_message) + + class AlreadyExists(Exception): """ An attempt was made to create a keyspace or table that already exists. 
diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 527aa262..bdf7ae2b 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -22,7 +22,7 @@ from six.moves import range import io from cassandra import (Unavailable, WriteTimeout, ReadTimeout, - WriteFailure, ReadFailure, + WriteFailure, ReadFailure, FunctionFailure, AlreadyExists, InvalidRequest, Unauthorized, UnsupportedOperation) from cassandra.marshal import (int32_pack, int32_unpack, uint16_pack, uint16_unpack, @@ -282,6 +282,22 @@ class ReadFailureMessage(RequestExecutionException): return ReadFailure(self.summary_msg(), **self.info) +class FunctionFailureMessage(RequestExecutionException): + summary = "User Defined Function failure" + error_code = 0x1400 + + @staticmethod + def recv_error_info(f): + return { + 'keyspace': read_string(f), + 'function': read_string(f), + 'arg_types': [read_string(f) for _ in range(read_short(f))], + } + + def to_exception(self): + return FunctionFailure(self.summary_msg(), **self.info) + + class WriteFailureMessage(RequestExecutionException): summary = "Replica(s) failed to execute write" error_code = 0x1500 @@ -299,6 +315,7 @@ class WriteFailureMessage(RequestExecutionException): def to_exception(self): return WriteFailure(self.summary_msg(), **self.info) + class SyntaxException(RequestValidationException): summary = 'Syntax error in CQL query' error_code = 0x2000 From d5626750b184047a9f2f50465fd6edbb3a1b77cc Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Wed, 8 Apr 2015 16:47:56 -0500 Subject: [PATCH 08/33] Add new request exceptions to docs --- cassandra/__init__.py | 2 +- docs/api/cassandra.rst | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/cassandra/__init__.py b/cassandra/__init__.py index a9c7e21a..fe57b2f9 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -308,7 +308,7 @@ class FunctionFailure(Exception): arg_types = None """ - Argument types of the function + List of argument type names of the 
function """ def __init__(self, summary_message, keyspace, function, arg_types): diff --git a/docs/api/cassandra.rst b/docs/api/cassandra.rst index 90d23d10..fd0de0be 100644 --- a/docs/api/cassandra.rst +++ b/docs/api/cassandra.rst @@ -26,6 +26,15 @@ .. autoexception:: WriteTimeout() :members: +.. autoexception:: ReadFailure() + :members: + +.. autoexception:: WriteFailure() + :members: + +.. autoexception:: FunctionFailure() + :members: + .. autoexception:: AlreadyExists() :members: From 4136a42601ce453de51862ac9368b84e05b95c98 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Thu, 9 Apr 2015 11:58:52 -0500 Subject: [PATCH 09/33] Populate QueryTrace.client for C* 3.0+ --- cassandra/query.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cassandra/query.py b/cassandra/query.py index fc71b8f6..cbe021fe 100644 --- a/cassandra/query.py +++ b/cassandra/query.py @@ -761,6 +761,13 @@ class QueryTrace(object): A :class:`datetime.timedelta` measure of the duration of the query. """ + client = None + """ + The IP address of the client that issued this request + + This is only available when using Cassandra 3.0+ + """ + coordinator = None """ The IP address of the host that acted as coordinator for this request. 
@@ -829,6 +836,8 @@ self.started_at = session_row.started_at self.coordinator = session_row.coordinator self.parameters = session_row.parameters + # since C* 3.0 + self.client = getattr(session_row, 'client', None) log.debug("Attempting to fetch trace events for trace ID: %s", self.trace_id) time_spent = time.time() - start From 96d4a485ada62247796857467cbdf7db6735d9a0 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Thu, 9 Apr 2015 16:40:53 -0500 Subject: [PATCH 10/33] Retrieve UDF metadata --- cassandra/cluster.py | 15 +++++- cassandra/metadata.py | 121 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 132 insertions(+), 4 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index cb04152a..e14c882c 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1820,6 +1820,7 @@ class ControlConnection(object): _SELECT_COLUMN_FAMILIES = "SELECT * FROM system.schema_columnfamilies" _SELECT_COLUMNS = "SELECT * FROM system.schema_columns" _SELECT_USERTYPES = "SELECT * FROM system.schema_usertypes" + _SELECT_FUNCTIONS = "SELECT * FROM system.schema_functions" _SELECT_TRIGGERS = "SELECT * FROM system.schema_triggers" _SELECT_PEERS = "SELECT peer, data_center, rack, tokens, rpc_address, schema_version FROM system.peers" @@ -2088,12 +2089,14 @@ class ControlConnection(object): QueryMessage(query=self._SELECT_COLUMN_FAMILIES, consistency_level=cl), QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl), QueryMessage(query=self._SELECT_USERTYPES, consistency_level=cl), + QueryMessage(query=self._SELECT_FUNCTIONS, consistency_level=cl), QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl) ] responses = connection.wait_for_responses(*queries, timeout=self._timeout, fail_on_error=False) (ks_success, ks_result), (cf_success, cf_result), \ (col_success, col_result), (types_success, types_result), \ (functions_success, functions_result), \ (trigger_success, triggers_result) = responses if ks_success: 
@@ -2135,8 +2138,18 @@ class ControlConnection(object): else: raise types_result + # functions were introduced in Cassandra 3.0 + if functions_success: + functions_result = dict_factory(*functions_result.results) if functions_result.results else {} + else: + if isinstance(functions_result, InvalidRequest): + log.debug("[control connection] user functions table not found") + functions_result = {} + else: + raise functions_result + log.debug("[control connection] Fetched schema, rebuilding metadata") - self._cluster.metadata.rebuild_schema(ks_result, types_result, cf_result, col_result, triggers_result) + self._cluster.metadata.rebuild_schema(ks_result, types_result, functions_result, cf_result, col_result, triggers_result) return True def refresh_node_list_and_token_map(self, force_token_rebuild=False): diff --git a/cassandra/metadata.py b/cassandra/metadata.py index 1f88d8f9..896b848b 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -19,9 +19,10 @@ from itertools import islice, cycle import json import logging import re +import six +from six.moves import zip from threading import RLock import weakref -import six murmur3 = None try: @@ -88,7 +89,8 @@ class Metadata(object): """ return "\n".join(ks.export_as_string() for ks in self.keyspaces.values()) - def rebuild_schema(self, ks_results, type_results, cf_results, col_results, triggers_result): + def rebuild_schema(self, ks_results, type_results, function_results, + cf_results, col_results, triggers_result): """ Rebuild the view of the current schema from a fresh set of rows from the system schema tables. 
@@ -98,6 +100,7 @@ class Metadata(object): cf_def_rows = defaultdict(list) col_def_rows = defaultdict(lambda: defaultdict(list)) usertype_rows = defaultdict(list) + fn_rows = defaultdict(list) trigger_rows = defaultdict(lambda: defaultdict(list)) for row in cf_results: @@ -111,6 +114,9 @@ for row in type_results: usertype_rows[row["keyspace_name"]].append(row) + for row in function_results: + fn_rows[row["keyspace_name"]].append(row) + for row in triggers_result: ksname = row["keyspace_name"] cfname = row["columnfamily_name"] @@ -131,6 +137,10 @@ usertype = self._build_usertype(keyspace_meta.name, usertype_row) keyspace_meta.user_types[usertype.name] = usertype + for fn_row in fn_rows.get(keyspace_meta.name, []): + fn = self._build_function(keyspace_meta.name, fn_row) + keyspace_meta.functions[fn.name] = fn + current_keyspaces.add(keyspace_meta.name) old_keyspace_meta = self.keyspaces.get(keyspace_meta.name, None) self.keyspaces[keyspace_meta.name] = keyspace_meta @@ -140,7 +150,7 @@ self._keyspace_added(keyspace_meta.name) # remove not-just-added keyspaces - removed_keyspaces = [ksname for ksname in self.keyspaces.keys() + removed_keyspaces = [ksname for ksname in self.keyspaces.keys() if ksname not in current_keyspaces] self.keyspaces = dict((name, meta) for name, meta in self.keyspaces.items() if name in current_keyspaces) @@ -215,6 +225,14 @@ return UserType(usertype_row['keyspace_name'], usertype_row['type_name'], usertype_row['field_names'], type_classes) + def _build_function(self, keyspace, function_row): + return_type = types.lookup_casstype(function_row['return_type']) + return Function(function_row['keyspace_name'], function_row['function_name'], + function_row['signature'], function_row['argument_names'], + return_type, function_row['language'], function_row['body'], + function_row['is_deterministic'], function_row.get('called_on_null_input')) + # 
called_on_null_input is not yet merged + + def _build_table_metadata(self, keyspace_metadata, row, col_rows, trigger_rows): cfname = row["columnfamily_name"] cf_col_rows = col_rows.get(cfname, []) @@ -740,12 +758,19 @@ class KeyspaceMetadata(object): .. versionadded:: 2.1.0 """ + functions = None + """ + A map from user-defined function names to instances of :class:`~cassandra.metadata.Function`. + + .. versionadded:: 3.0.0 + """ def __init__(self, name, durable_writes, strategy_class, strategy_options): self.name = name self.durable_writes = durable_writes self.replication_strategy = ReplicationStrategy.create(strategy_class, strategy_options) self.tables = {} self.user_types = {} + self.functions = {} def export_as_string(self): """ @@ -843,6 +868,96 @@ class UserType(object): return ret +class Function(object): + """ + A user defined function, as created by ``CREATE FUNCTION`` statements. + + User-defined functions were introduced in Cassandra 3.0 + + .. versionadded:: 3.0.0 + """ + + keyspace = None + """ + The string name of the keyspace in which this function is defined + """ + + name = None + """ + The name of this function + """ + + signature = None + """ + An ordered list of the types for each argument to the function + """ + + argument_names = None + """ + An ordered list of the names of each argument to the function + """ + + return_type = None + """ + Return type of the function + """ + + language = None + """ + Language of the function body + """ + + body = None + """ + Function body string + """ + + is_deterministic = None + """ + Flag indicating whether this function is deterministic + (required for functional indexes) + """ + + called_on_null_input = None + """ + Flag indicating whether this function should be called for rows with null values + (convenience function to avoid handling nulls explicitly if the result will just be null) + """ + + def __init__(self, keyspace, name, signature, argument_names, + return_type, language, body, 
is_deterministic, called_on_null_input): + self.keyspace = keyspace + self.name = name + self.signature = signature + self.argument_names = argument_names + self.return_type = return_type + self.language = language + self.body = body + self.is_deterministic = is_deterministic + self.called_on_null_input = called_on_null_input + + def as_cql_query(self, formatted=False): + """ + Returns a CQL query that can be used to recreate this function. + If `formatted` is set to :const:`True`, extra whitespace will + be added to make the query more readable. + """ + sep = '\n' if formatted else ' ' + keyspace = protect_name(self.keyspace) + name = protect_name(self.name) + arg_list = ', '.join(["%s %s" % (protect_name(n), t) + for n, t in zip(self.argument_names, self.signature)]) + determ = '' if self.is_deterministic else 'NON DETERMINISTIC ' + typ = self.return_type.cql_parameterized_type() + lang = self.language + body = protect_value(self.body) + + return "CREATE %(determ)sFUNCTION %(keyspace)s.%(name)s(%(arg_list)s)%(sep)s" \ + "RETURNS %(typ)s%(sep)s" \ + "LANGUAGE %(lang)s%(sep)s" \ + "AS %(body)s;" % locals() + + class TableMetadata(object): """ A representation of the schema for a single table. 
From 6b509718039635be9b8d3f4b3da76b4d540bda15 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Fri, 10 Apr 2015 14:20:54 -0500 Subject: [PATCH 11/33] Handle function schema_change events; basic tests Possibly still some server-side issues to iron out --- cassandra/cluster.py | 34 +++++---- cassandra/metadata.py | 9 +++ cassandra/protocol.py | 4 +- tests/integration/standard/test_metadata.py | 80 +++++++++++++++++++++ 4 files changed, 113 insertions(+), 14 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index e14c882c..96eb4044 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1116,7 +1116,7 @@ class Cluster(object): for pool in session._pools.values(): pool.ensure_core_connections() - def refresh_schema(self, keyspace=None, table=None, usertype=None, max_schema_agreement_wait=None): + def refresh_schema(self, keyspace=None, table=None, usertype=None, function=None, max_schema_agreement_wait=None): """ Synchronously refresh the schema metadata. @@ -1129,7 +1129,7 @@ class Cluster(object): An Exception is raised if schema refresh fails for any reason. """ - if not self.control_connection.refresh_schema(keyspace, table, usertype, max_schema_agreement_wait): + if not self.control_connection.refresh_schema(keyspace, table, usertype, function, max_schema_agreement_wait): raise Exception("Schema was not refreshed. 
See log for details.") def submit_schema_refresh(self, keyspace=None, table=None, usertype=None): @@ -2008,7 +2008,7 @@ class ControlConnection(object): self._connection.close() del self._connection - def refresh_schema(self, keyspace=None, table=None, usertype=None, + def refresh_schema(self, keyspace=None, table=None, usertype=None, function=None, schema_agreement_wait=None): if not self._meta_refresh_enabled: log.debug("[control connection] Skipping schema refresh because meta refresh is disabled") @@ -2016,7 +2016,7 @@ class ControlConnection(object): try: if self._connection: - return self._refresh_schema(self._connection, keyspace, table, usertype, + return self._refresh_schema(self._connection, keyspace, table, usertype, function, schema_agreement_wait=schema_agreement_wait) except ReferenceError: pass # our weak reference to the Cluster is no good @@ -2025,7 +2025,7 @@ class ControlConnection(object): self._signal_error() return False - def _refresh_schema(self, connection, keyspace=None, table=None, usertype=None, + def _refresh_schema(self, connection, keyspace=None, table=None, usertype=None, function=None, preloaded_results=None, schema_agreement_wait=None): if self._cluster.is_shutdown: return False @@ -2074,6 +2074,14 @@ class ControlConnection(object): log.debug("[control connection] Fetched user type info for %s.%s, rebuilding metadata", keyspace, usertype) types_result = dict_factory(*types_result.results) if types_result.results else {} self._cluster.metadata.usertype_changed(keyspace, usertype, types_result) + elif function: + # user defined function within this keyspace changed + where_clause = " WHERE keyspace_name = '%s' AND function_name = '%s'" % (keyspace, function) + functions_query = QueryMessage(query=self._SELECT_FUNCTIONS + where_clause, consistency_level=cl) + functions_result = connection.wait_for_response(functions_query) + log.debug("[control connection] Fetched user function info for %s.%s, rebuilding metadata", keyspace, 
function) + functions_result = dict_factory(*functions_result.results) if functions_result.results else {} + self._cluster.metadata.function_changed(keyspace, function, functions_result) elif keyspace: # only the keyspace itself changed (such as replication settings) where_clause = " WHERE keyspace_name = '%s'" % (keyspace,) @@ -2297,8 +2305,9 @@ class ControlConnection(object): keyspace = event.get('keyspace') table = event.get('table') usertype = event.get('type') + function = event.get('function', event.get('aggregate')) delay = random() * self._schema_event_refresh_window - self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, keyspace, table, usertype) + self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, keyspace, table, usertype, function) def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wait_time=None): @@ -2527,19 +2536,19 @@ class _Scheduler(object): exc_info=exc) -def refresh_schema_and_set_result(keyspace, table, usertype, control_conn, response_future): +def refresh_schema_and_set_result(keyspace, table, usertype, function, control_conn, response_future): try: if control_conn._meta_refresh_enabled: - log.debug("Refreshing schema in response to schema change. Keyspace: %s; Table: %s, Type: %s", - keyspace, table, usertype) - control_conn._refresh_schema(response_future._connection, keyspace, table, usertype) + log.debug("Refreshing schema in response to schema change. 
Keyspace: %s; Table: %s, Type: %s, Function: %s", + keyspace, table, usertype, function) + control_conn._refresh_schema(response_future._connection, keyspace, table, usertype, function) else: log.debug("Skipping schema refresh in response to schema change because meta refresh is disabled; " - "Keyspace: %s; Table: %s, Type: %s", keyspace, table, usertype) + "Keyspace: %s; Table: %s, Type: %s, Function: %s", keyspace, table, usertype, function) except Exception: log.exception("Exception refreshing schema in response to schema change:") response_future.session.submit( - control_conn.refresh_schema, keyspace, table, usertype) + control_conn.refresh_schema, keyspace, table, usertype, function) finally: response_future._set_final_result(None) @@ -2724,6 +2733,7 @@ class ResponseFuture(object): response.results['keyspace'], response.results.get('table'), response.results.get('type'), + response.results.get('function', response.results.get('aggregate')), self.session.cluster.control_connection, self) else: diff --git a/cassandra/metadata.py b/cassandra/metadata.py index 896b848b..d6cd8e58 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -170,6 +170,7 @@ class Metadata(object): if old_keyspace_meta: keyspace_meta.tables = old_keyspace_meta.tables keyspace_meta.user_types = old_keyspace_meta.user_types + keyspace_meta.functions = old_keyspace_meta.functions if (keyspace_meta.replication_strategy != old_keyspace_meta.replication_strategy): self._keyspace_updated(keyspace) else: @@ -183,6 +184,14 @@ class Metadata(object): # the type was deleted self.keyspaces[keyspace].user_types.pop(name, None) + def function_changed(self, keyspace, name, function_results): + if function_results: + new_function = self._build_function(keyspace, function_results[0]) + self.keyspaces[keyspace].functions[name] = new_function + else: + # the function was deleted + self.keyspaces[keyspace].functions.pop(name, None) + def table_changed(self, keyspace, table, cf_results, 
col_results, triggers_result): try: keyspace_meta = self.keyspaces[keyspace] diff --git a/cassandra/protocol.py b/cassandra/protocol.py index d81a6507..922925a5 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -851,8 +851,8 @@ class EventMessage(_MessageType): target = read_string(f) keyspace = read_string(f) if target != "KEYSPACE": - table_or_type = read_string(f) - return {'change_type': change_type, 'keyspace': keyspace, target.lower(): table_or_type} + target_name = read_string(f) + return {'change_type': change_type, 'keyspace': keyspace, target.lower(): target_name} else: return {'change_type': change_type, 'keyspace': keyspace} else: diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index e5b2eab3..367b9ee7 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -928,3 +928,83 @@ class KeyspaceAlterMetadata(unittest.TestCase): new_keyspace_meta = self.cluster.metadata.keyspaces[name] self.assertNotEqual(original_keyspace_meta, new_keyspace_meta) self.assertEqual(new_keyspace_meta.durable_writes, False) + +from cassandra.cqltypes import DoubleType +from cassandra.metadata import Function + + +class FunctionMetadata(unittest.TestCase): + + keyspace_name = "functionmetadatatest" + + @property + def function_name(self): + return self._testMethodName.lower() + + @classmethod + def setup_class(cls): + if PROTOCOL_VERSION < 4: + raise unittest.SkipTest("Function metadata requires native protocol version 4+") + + cls.cluster = Cluster(protocol_version=PROTOCOL_VERSION) + cls.session = cls.cluster.connect() + cls.session.execute("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}" % cls.keyspace_name) + cls.keyspace_function_meta = cls.cluster.metadata.keyspaces[cls.keyspace_name].functions + + @classmethod + def teardown_class(cls): + cls.session.execute("DROP KEYSPACE %s" % cls.keyspace_name) + 
cls.cluster.shutdown() + + def make_function_kwargs(self, deterministic=True, called_on_null=True): + return {'keyspace': self.keyspace_name, + 'name': self.function_name, + 'signature': ['double', 'int'], + 'argument_names': ['d', 'i'], + 'return_type': DoubleType, + 'language': 'java', + 'body': 'return new Double(0.0);', + 'is_deterministic': deterministic, + 'called_on_null_input': called_on_null} + + def test_create_drop_function(self): + self.assertNotIn(self.function_name, self.keyspace_function_meta) + + expected_meta = Function(**self.make_function_kwargs()) + self.session.execute(expected_meta.as_cql_query()) + self.assertIn(self.function_name, self.keyspace_function_meta) + + generated_meta = self.keyspace_function_meta[self.function_name] + self.assertEqual(generated_meta.as_cql_query(), expected_meta.as_cql_query()) + + self.session.execute("DROP FUNCTION %s.%s" % (self.keyspace_name, self.function_name)) + self.assertNotIn(self.function_name, self.keyspace_function_meta) + + # TODO: this presently fails because C* c059a56 requires udt to be frozen to create, but does not store meta indicating frozen + @unittest.expectedFailure + def test_functions_after_udt(self): + self.assertNotIn(self.function_name, self.keyspace_function_meta) + + udt_name = 'udtx' + self.session.execute("CREATE TYPE %s.%s (x int)" % (self.keyspace_name, udt_name)) + + # make a function that takes a udt type + kwargs = self.make_function_kwargs() + kwargs['signature'][0] = "frozen<%s>" % udt_name + + expected_meta = Function(**kwargs) + self.session.execute(expected_meta.as_cql_query()) + self.assertIn(self.function_name, self.keyspace_function_meta) + + generated_meta = self.keyspace_function_meta[self.function_name] + self.assertEqual(generated_meta.as_cql_query(), expected_meta.as_cql_query()) + + # udts must come before functions in keyspace dump + keyspace_cql = self.cluster.metadata.keyspaces[self.keyspace_name].export_as_string() + type_idx = keyspace_cql.rfind("CREATE 
TYPE") + func_idx = keyspace_cql.find("CREATE FUCNTION") + self.assertNotIn(-1, (type_idx, func_idx)) + self.assertGreater(func_idx, type_idx) + + self.session.execute("DROP FUNCTION %s.%s" % (self.keyspace_name, self.function_name)) + self.assertNotIn(self.function_name, self.keyspace_function_meta) From 1e420158a9dd6124f449205a3fe9424b86076288 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Fri, 10 Apr 2015 16:19:24 -0500 Subject: [PATCH 12/33] improve and expond function metadata tests --- tests/integration/standard/test_metadata.py | 98 +++++++++++++++------ 1 file changed, 69 insertions(+), 29 deletions(-) diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index 367b9ee7..b8df0738 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -956,6 +956,27 @@ class FunctionMetadata(unittest.TestCase): cls.session.execute("DROP KEYSPACE %s" % cls.keyspace_name) cls.cluster.shutdown() + class VerifiedFunction(object): + def __init__(self, test_case, **function_kwargs): + self.test_case = test_case + self.function_kwargs = function_kwargs + + def __enter__(self): + tc = self.test_case + expected_meta = Function(**self.function_kwargs) + tc.assertNotIn(expected_meta.name, tc.keyspace_function_meta) + tc.session.execute(expected_meta.as_cql_query()) + tc.assertIn(expected_meta.name, tc.keyspace_function_meta) + + generated_meta = tc.keyspace_function_meta[expected_meta.name] + self.test_case.assertEqual(generated_meta.as_cql_query(), expected_meta.as_cql_query()) + + def __exit__(self, exc_type, exc_val, exc_tb): + tc = self.test_case + function_name = self.function_kwargs['name'] + tc.session.execute("DROP FUNCTION %s.%s" % (tc.keyspace_name, function_name)) + tc.assertNotIn(function_name, tc.keyspace_function_meta) + def make_function_kwargs(self, deterministic=True, called_on_null=True): return {'keyspace': self.keyspace_name, 'name': self.function_name, @@ 
-968,43 +989,62 @@ class FunctionMetadata(unittest.TestCase): 'called_on_null_input': called_on_null} def test_create_drop_function(self): - self.assertNotIn(self.function_name, self.keyspace_function_meta) + with self.VerifiedFunction(self, **self.make_function_kwargs()): + pass - expected_meta = Function(**self.make_function_kwargs()) - self.session.execute(expected_meta.as_cql_query()) - self.assertIn(self.function_name, self.keyspace_function_meta) - - generated_meta = self.keyspace_function_meta[self.function_name] - self.assertEqual(generated_meta.as_cql_query(), expected_meta.as_cql_query()) - - self.session.execute("DROP FUNCTION %s.%s" % (self.keyspace_name, self.function_name)) - self.assertNotIn(self.function_name, self.keyspace_function_meta) - - # TODO: this presently fails because C* c059a56 requires udt to be frozen to create, but does not store meta indicating frozen - @unittest.expectedFailure def test_functions_after_udt(self): self.assertNotIn(self.function_name, self.keyspace_function_meta) udt_name = 'udtx' self.session.execute("CREATE TYPE %s.%s (x int)" % (self.keyspace_name, udt_name)) - # make a function that takes a udt type + # Ideally we would make a function that takes a udt type, but + # this presently fails because C* c059a56 requires udt to be frozen to create, but does not store meta indicating frozen + # Maybe update this after release + #kwargs = self.make_function_kwargs() + #kwargs['signature'][0] = "frozen<%s>" % udt_name + + #expected_meta = Function(**kwargs) + #with self.VerifiedFunction(self, **kwargs): + with self.VerifiedFunction(self, **self.make_function_kwargs()): + # udts must come before functions in keyspace dump + keyspace_cql = self.cluster.metadata.keyspaces[self.keyspace_name].export_as_string() + type_idx = keyspace_cql.rfind("CREATE TYPE") + func_idx = keyspace_cql.find("CREATE FUNCTION") + self.assertNotIn(-1, (type_idx, func_idx), "TYPE or FUNCTION not found in keyspace_cql: " + keyspace_cql) + 
self.assertGreater(func_idx, type_idx) + + def test_functions_follow_keyspace_alter(self): + with self.VerifiedFunction(self, **self.make_function_kwargs()): + original_keyspace_meta = self.cluster.metadata.keyspaces[self.keyspace_name] + self.session.execute('ALTER KEYSPACE %s WITH durable_writes = false' % self.keyspace_name) + try: + new_keyspace_meta = self.cluster.metadata.keyspaces[self.keyspace_name] + self.assertNotEqual(original_keyspace_meta, new_keyspace_meta) + self.assertIs(original_keyspace_meta.functions, new_keyspace_meta.functions) + finally: + self.session.execute('ALTER KEYSPACE %s WITH durable_writes = true' % self.keyspace_name) + + def test_function_cql_determinism(self): kwargs = self.make_function_kwargs() - kwargs['signature'][0] = "frozen<%s>" % udt_name + kwargs['is_deterministic'] = True + with self.VerifiedFunction(self, **kwargs): + fn_meta = self.keyspace_function_meta[self.function_name] + self.assertRegexpMatches(fn_meta.as_cql_query(), "CREATE FUNCTION.*") - expected_meta = Function(**kwargs) - self.session.execute(expected_meta.as_cql_query()) - self.assertIn(self.function_name, self.keyspace_function_meta) + kwargs['is_deterministic'] = False + with self.VerifiedFunction(self, **kwargs): + fn_meta = self.keyspace_function_meta[self.function_name] + self.assertRegexpMatches(fn_meta.as_cql_query(), "CREATE NON DETERMINISTIC FUNCTION.*") - generated_meta = self.keyspace_function_meta[self.function_name] - self.assertEqual(generated_meta.as_cql_query(), expected_meta.as_cql_query()) + def test_function_cql_called_on_null(self): + kwargs = self.make_function_kwargs() + kwargs['called_on_null_input'] = True + with self.VerifiedFunction(self, **kwargs): + fn_meta = self.keyspace_function_meta[self.function_name] + self.assertRegexpMatches(fn_meta.as_cql_query(), "CREATE FUNCTION.*\) CALLED ON NULL INPUT RETURNS .*") - # udts must come before functions in keyspace dump - keyspace_cql = 
self.cluster.metadata.keyspaces[self.keyspace_name].export_as_string() - type_idx = keyspace_cql.rfind("CREATE TYPE") - func_idx = keyspace_cql.find("CREATE FUCNTION") - self.assertNotIn(-1, (type_idx, func_idx)) - self.assertGreater(func_idx, type_idx) - - self.session.execute("DROP FUNCTION %s.%s" % (self.keyspace_name, self.function_name)) - self.assertNotIn(self.function_name, self.keyspace_function_meta) + kwargs['called_on_null_input'] = False + with self.VerifiedFunction(self, **kwargs): + fn_meta = self.keyspace_function_meta[self.function_name] + self.assertRegexpMatches(fn_meta.as_cql_query(), "CREATE FUNCTION.*\) NOT CALLED ON NULL INPUT RETURNS .*") From fd9df7bd58f68b6f05dd85d1d0fcad090890af75 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Fri, 10 Apr 2015 16:20:59 -0500 Subject: [PATCH 13/33] commit to called_on_null_input schema for CASSANDRA-8374 --- cassandra/metadata.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/cassandra/metadata.py b/cassandra/metadata.py index d6cd8e58..abb2d60a 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -239,8 +239,7 @@ class Metadata(object): return Function(function_row['keyspace_name'], function_row['function_name'], function_row['signature'], function_row['argument_names'], return_type, function_row['language'], function_row['body'], - function_row['is_deterministic'], function_row.get('called_on_null_input')) - # called_on_null_input is not yet merged + function_row['is_deterministic'], function_row['called_on_null_input']) def _build_table_metadata(self, keyspace_metadata, row, col_rows, trigger_rows): cfname = row["columnfamily_name"] @@ -786,7 +785,10 @@ class KeyspaceMetadata(object): Returns a CQL query string that can be used to recreate the entire keyspace, including user-defined types and tables. 
""" - return "\n\n".join([self.as_cql_query()] + self.user_type_strings() + [t.export_as_string() for t in self.tables.values()]) + return "\n\n".join([self.as_cql_query()] + + self.user_type_strings() + + [f.as_cql_query(True) for f in self.functions.values()] + + [t.export_as_string() for t in self.tables.values()]) def as_cql_query(self): """ @@ -960,8 +962,10 @@ class Function(object): typ = self.return_type.cql_parameterized_type() lang = self.language body = protect_value(self.body) + on_null = "CALLED" if self.called_on_null_input else "RETURNS NULL" return "CREATE %(determ)sFUNCTION %(keyspace)s.%(name)s(%(arg_list)s)%(sep)s" \ + "%(on_null)s ON NULL INPUT%(sep)s" \ "RETURNS %(typ)s%(sep)s" \ "LANGUAGE %(lang)s%(sep)s" \ "AS %(body)s;" % locals() From 527c6c05bca436c6f84651a333872e5cf5ea5c8d Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Mon, 13 Apr 2015 14:19:37 -0500 Subject: [PATCH 14/33] Handle overloaded user functions with differing type signatures --- cassandra/__init__.py | 25 +++++++++++++++++++++++++ cassandra/cluster.py | 9 +++++---- cassandra/metadata.py | 21 +++++++++++++-------- cassandra/protocol.py | 13 ++++++++----- 4 files changed, 51 insertions(+), 17 deletions(-) diff --git a/cassandra/__init__.py b/cassandra/__init__.py index 296601cb..d1ad0afe 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -302,3 +302,28 @@ class UnsupportedOperation(Exception): for more details. 
""" pass + + +class UserFunctionDescriptor(object): + """ + Describes a User function or aggregate by name and argument signature + """ + + name = None + + type_signature = None + """ + Ordered list of CQL argument types + """ + + def __init__(self, name, type_signature): + self.name = name + self.type_signature = type_signature + + @property + def signature(self): + return self.format_signature(self.name, self.type_signature) + + @staticmethod + def format_signature(name, type_signature): + return "%s(%s)" % (name, ','.join(t for t in type_signature)) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 96eb4044..e1f5ec6f 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2076,10 +2076,11 @@ class ControlConnection(object): self._cluster.metadata.usertype_changed(keyspace, usertype, types_result) elif function: # user defined function within this keyspace changed - where_clause = " WHERE keyspace_name = '%s' AND function_name = '%s'" % (keyspace, function) + where_clause = " WHERE keyspace_name = '%s' AND function_name = '%s' AND signature = [%s]" \ + % (keyspace, function.name, ','.join("'%s'" % t for t in function.type_signature)) functions_query = QueryMessage(query=self._SELECT_FUNCTIONS + where_clause, consistency_level=cl) functions_result = connection.wait_for_response(functions_query) - log.debug("[control connection] Fetched user function info for %s.%s, rebuilding metadata", keyspace, function) + log.debug("[control connection] Fetched user function info for %s.%s, rebuilding metadata", keyspace, function.signature) functions_result = dict_factory(*functions_result.results) if functions_result.results else {} self._cluster.metadata.function_changed(keyspace, function, functions_result) elif keyspace: @@ -2305,7 +2306,7 @@ class ControlConnection(object): keyspace = event.get('keyspace') table = event.get('table') usertype = event.get('type') - function = event.get('function', event.get('aggregate')) + function = 
event.get('function') delay = random() * self._schema_event_refresh_window self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, keyspace, table, usertype, function) @@ -2733,7 +2734,7 @@ class ResponseFuture(object): response.results['keyspace'], response.results.get('table'), response.results.get('type'), - response.results.get('function', response.results.get('aggregate')), + response.results.get('function'), self.session.cluster.control_connection, self) else: diff --git a/cassandra/metadata.py b/cassandra/metadata.py index abb2d60a..32edd01f 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -30,6 +30,7 @@ try: except ImportError as e: pass +from cassandra import UserFunctionDescriptor import cassandra.cqltypes as types from cassandra.marshal import varint_unpack from cassandra.pool import Host @@ -139,7 +140,7 @@ class Metadata(object): for fn_row in fn_rows.get(keyspace_meta.name, []): fn = self._build_function(keyspace_meta.name, fn_row) - keyspace_meta.functions[fn.name] = fn + keyspace_meta.functions[fn.signature] = fn current_keyspaces.add(keyspace_meta.name) old_keyspace_meta = self.keyspaces.get(keyspace_meta.name, None) @@ -184,13 +185,13 @@ class Metadata(object): # the type was deleted self.keyspaces[keyspace].user_types.pop(name, None) - def function_changed(self, keyspace, name, function_results): + def function_changed(self, keyspace, function, function_results): if function_results: new_function = self._build_function(keyspace, function_results[0]) - self.keyspaces[keyspace].functions[name] = new_function + self.keyspaces[keyspace].functions[function.signature] = new_function else: # the function was deleted - self.keyspaces[keyspace].functions.pop(name, None) + self.keyspaces[keyspace].functions.pop(function.signature, None) def table_changed(self, keyspace, table, cf_results, col_results, triggers_result): try: @@ -898,7 +899,7 @@ class Function(object): The name of this function """ - signature = None + 
type_signature = None """ An ordered list of the types for each argument to the function """ @@ -935,11 +936,11 @@ class Function(object): (convenience function to avoid handling nulls explicitly if the result will just be null) """ - def __init__(self, keyspace, name, signature, argument_names, + def __init__(self, keyspace, name, type_signature, argument_names, return_type, language, body, is_deterministic, called_on_null_input): self.keyspace = keyspace self.name = name - self.signature = signature + self.type_signature = type_signature self.argument_names = argument_names self.return_type = return_type self.language = language @@ -957,7 +958,7 @@ class Function(object): keyspace = protect_name(self.keyspace) name = protect_name(self.name) arg_list = ', '.join(["%s %s" % (protect_name(n), t) - for n, t in zip(self.argument_names, self.signature)]) + for n, t in zip(self.argument_names, self.type_signature)]) determ = '' if self.is_deterministic else 'NON DETERMINISTIC ' typ = self.return_type.cql_parameterized_type() lang = self.language @@ -970,6 +971,10 @@ class Function(object): "LANGUAGE %(lang)s%(sep)s" \ "AS %(body)s;" % locals() + @property + def signature(self): + return UserFunctionDescriptor.format_signature(self.name, self.type_signature) + class TableMetadata(object): """ diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 922925a5..9ec114e7 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -23,7 +23,7 @@ import io from cassandra import (Unavailable, WriteTimeout, ReadTimeout, AlreadyExists, InvalidRequest, Unauthorized, - UnsupportedOperation) + UnsupportedOperation, UserFunctionDescriptor) from cassandra.marshal import (int32_pack, int32_unpack, uint16_pack, uint16_unpack, int8_pack, int8_unpack, uint64_pack, header_pack, v3_header_pack) @@ -850,15 +850,18 @@ class EventMessage(_MessageType): if protocol_version >= 3: target = read_string(f) keyspace = read_string(f) + event = {'change_type': change_type, 'keyspace': 
keyspace} if target != "KEYSPACE": target_name = read_string(f) - return {'change_type': change_type, 'keyspace': keyspace, target.lower(): target_name} - else: - return {'change_type': change_type, 'keyspace': keyspace} + if target in ('FUNCTION', 'AGGREGATE'): + event['function'] = UserFunctionDescriptor(target_name, [read_string(f) for _ in range(read_short(f))]) + else: + event[target.lower()] = target_name else: keyspace = read_string(f) table = read_string(f) - return {'change_type': change_type, 'keyspace': keyspace, 'table': table} + event = {'change_type': change_type, 'keyspace': keyspace, 'table': table} + return event def write_header(f, version, flags, stream_id, opcode, length): From d2a0931c7db658d590030361f1887e61d510d3fb Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Mon, 13 Apr 2015 14:22:05 -0500 Subject: [PATCH 15/33] Finalize user function meta integration tests - handle signatures - new signature overload test - proper called_on_null regex --- tests/integration/standard/test_metadata.py | 56 ++++++++++++++------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index b8df0738..05c8ba56 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -22,7 +22,7 @@ from mock import Mock import six import sys -from cassandra import AlreadyExists +from cassandra import AlreadyExists, UserFunctionDescriptor from cassandra.cluster import Cluster from cassandra.metadata import (Metadata, KeyspaceMetadata, TableMetadata, @@ -959,28 +959,33 @@ class FunctionMetadata(unittest.TestCase): class VerifiedFunction(object): def __init__(self, test_case, **function_kwargs): self.test_case = test_case - self.function_kwargs = function_kwargs + self.function_kwargs = dict(function_kwargs) def __enter__(self): tc = self.test_case expected_meta = Function(**self.function_kwargs) - 
tc.assertNotIn(expected_meta.name, tc.keyspace_function_meta) + tc.assertNotIn(expected_meta.signature, tc.keyspace_function_meta) tc.session.execute(expected_meta.as_cql_query()) - tc.assertIn(expected_meta.name, tc.keyspace_function_meta) + tc.assertIn(expected_meta.signature, tc.keyspace_function_meta) - generated_meta = tc.keyspace_function_meta[expected_meta.name] + generated_meta = tc.keyspace_function_meta[expected_meta.signature] self.test_case.assertEqual(generated_meta.as_cql_query(), expected_meta.as_cql_query()) + return self def __exit__(self, exc_type, exc_val, exc_tb): tc = self.test_case - function_name = self.function_kwargs['name'] - tc.session.execute("DROP FUNCTION %s.%s" % (tc.keyspace_name, function_name)) - tc.assertNotIn(function_name, tc.keyspace_function_meta) + tc.session.execute("DROP FUNCTION %s.%s" % (tc.keyspace_name, self.signature)) + tc.assertNotIn(self.signature, tc.keyspace_function_meta) + + @property + def signature(self): + return UserFunctionDescriptor.format_signature(self.function_kwargs['name'], + self.function_kwargs['type_signature']) def make_function_kwargs(self, deterministic=True, called_on_null=True): return {'keyspace': self.keyspace_name, 'name': self.function_name, - 'signature': ['double', 'int'], + 'type_signature': ['double', 'int'], 'argument_names': ['d', 'i'], 'return_type': DoubleType, 'language': 'java', @@ -1002,7 +1007,7 @@ class FunctionMetadata(unittest.TestCase): # this presently fails because C* c059a56 requires udt to be frozen to create, but does not store meta indicating frozen # Maybe update this after release #kwargs = self.make_function_kwargs() - #kwargs['signature'][0] = "frozen<%s>" % udt_name + #kwargs['type_signature'][0] = "frozen<%s>" % udt_name #expected_meta = Function(**kwargs) #with self.VerifiedFunction(self, **kwargs): @@ -1014,6 +1019,19 @@ class FunctionMetadata(unittest.TestCase): self.assertNotIn(-1, (type_idx, func_idx), "TYPE or FUNCTION not found in keyspace_cql: " + 
keyspace_cql) self.assertGreater(func_idx, type_idx) + def test_function_same_name_diff_types(self): + kwargs = self.make_function_kwargs() + with self.VerifiedFunction(self, **kwargs): + # another function: same name, different type sig. + self.assertGreater(len(kwargs['type_signature']), 1) + self.assertGreater(len(kwargs['argument_names']), 1) + kwargs['type_signature'] = kwargs['type_signature'][:1] + kwargs['argument_names'] = kwargs['argument_names'][:1] + with self.VerifiedFunction(self, **kwargs): + functions = [f for f in self.keyspace_function_meta.values() if f.name == self.function_name] + self.assertEqual(len(functions), 2) + self.assertNotEqual(functions[0].type_signature, functions[1].type_signature) + def test_functions_follow_keyspace_alter(self): with self.VerifiedFunction(self, **self.make_function_kwargs()): original_keyspace_meta = self.cluster.metadata.keyspaces[self.keyspace_name] @@ -1028,23 +1046,23 @@ class FunctionMetadata(unittest.TestCase): def test_function_cql_determinism(self): kwargs = self.make_function_kwargs() kwargs['is_deterministic'] = True - with self.VerifiedFunction(self, **kwargs): - fn_meta = self.keyspace_function_meta[self.function_name] + with self.VerifiedFunction(self, **kwargs) as vf: + fn_meta = self.keyspace_function_meta[vf.signature] self.assertRegexpMatches(fn_meta.as_cql_query(), "CREATE FUNCTION.*") kwargs['is_deterministic'] = False - with self.VerifiedFunction(self, **kwargs): - fn_meta = self.keyspace_function_meta[self.function_name] + with self.VerifiedFunction(self, **kwargs) as vf: + fn_meta = self.keyspace_function_meta[vf.signature] self.assertRegexpMatches(fn_meta.as_cql_query(), "CREATE NON DETERMINISTIC FUNCTION.*") def test_function_cql_called_on_null(self): kwargs = self.make_function_kwargs() kwargs['called_on_null_input'] = True - with self.VerifiedFunction(self, **kwargs): - fn_meta = self.keyspace_function_meta[self.function_name] + with self.VerifiedFunction(self, **kwargs) as vf: + fn_meta 
= self.keyspace_function_meta[vf.signature] self.assertRegexpMatches(fn_meta.as_cql_query(), "CREATE FUNCTION.*\) CALLED ON NULL INPUT RETURNS .*") kwargs['called_on_null_input'] = False - with self.VerifiedFunction(self, **kwargs): - fn_meta = self.keyspace_function_meta[self.function_name] - self.assertRegexpMatches(fn_meta.as_cql_query(), "CREATE FUNCTION.*\) NOT CALLED ON NULL INPUT RETURNS .*") + with self.VerifiedFunction(self, **kwargs) as vf: + fn_meta = self.keyspace_function_meta[vf.signature] + self.assertRegexpMatches(fn_meta.as_cql_query(), "CREATE FUNCTION.*\) RETURNS NULL ON NULL INPUT RETURNS .*") From 446a10687d0ef5b2b30482e3c37d57cf76b3a752 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Mon, 13 Apr 2015 15:12:48 -0500 Subject: [PATCH 16/33] User function API docs --- cassandra/__init__.py | 58 ++++++++++++++++++++++++------------------ cassandra/cluster.py | 31 +++++++++++++++++----- docs/api/cassandra.rst | 3 +++ 3 files changed, 61 insertions(+), 31 deletions(-) diff --git a/cassandra/__init__.py b/cassandra/__init__.py index d1ad0afe..2d41a8ea 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -126,6 +126,39 @@ def consistency_value_to_name(value): return ConsistencyLevel.value_to_name[value] if value is not None else "Not Set" +class UserFunctionDescriptor(object): + """ + Describes a User function or aggregate by name and argument signature + """ + + name = None + """ + name of the function + """ + + type_signature = None + """ + Ordered list of CQL argument type name comprising the type signature + """ + + def __init__(self, name, type_signature): + self.name = name + self.type_signature = type_signature + + @property + def signature(self): + """ + function signatue string in the form 'name([type0[,type1[...]]])' + + can be used to uniquely identify overloaded function names within a keyspace + """ + return self.format_signature(self.name, self.type_signature) + + @staticmethod + def format_signature(name, type_signature): 
+ return "%s(%s)" % (name, ','.join(t for t in type_signature)) + + class Unavailable(Exception): """ There were not enough live replicas to satisfy the requested consistency @@ -302,28 +335,3 @@ class UnsupportedOperation(Exception): for more details. """ pass - - -class UserFunctionDescriptor(object): - """ - Describes a User function or aggregate by name and argument signature - """ - - name = None - - type_signature = None - """ - Ordered list of CQL argument types - """ - - def __init__(self, name, type_signature): - self.name = name - self.type_signature = type_signature - - @property - def signature(self): - return self.format_signature(self.name, self.type_signature) - - @staticmethod - def format_signature(name, type_signature): - return "%s(%s)" % (name, ','.join(t for t in type_signature)) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index e1f5ec6f..b3172e78 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1116,9 +1116,27 @@ class Cluster(object): for pool in session._pools.values(): pool.ensure_core_connections() + def _validate_refresh_schema(self, keyspace, table, usertype, function): + if any((table, usertype, function)): + if not keyspace: + raise ValueError("keyspace is required to refresh specific sub-entity {table, usertype, function}") + if sum(1 for e in (table, usertype, function) if e) > 1: + raise ValueError("{table, usertype, function} are mutually exclusive") + def refresh_schema(self, keyspace=None, table=None, usertype=None, function=None, max_schema_agreement_wait=None): """ - Synchronously refresh the schema metadata. + Synchronously refresh all schema metadata. + + {keyspace, table, usertype} are string names of the respective entities. + ``function`` is a :class:`cassandra.UserFunctionDescriptor`. + + If none of ``{keyspace, table, usertype, function}`` are specified, the entire schema is refreshed. + + If any of ``{keyspace, table, usertype, function}`` are specified, ``keyspace`` is required. 
+ + If only ``keyspace`` is specified, just the top-level keyspace metadata is refreshed (e.g. replication). + + The remaining arguments ``{table, usertype, function}`` are mutually exclusive -- only one may be specified. By default, the timeout for this operation is governed by :attr:`~.Cluster.max_schema_agreement_wait` and :attr:`~.Cluster.control_connection_timeout`. @@ -1129,17 +1147,18 @@ class Cluster(object): An Exception is raised if schema refresh fails for any reason. """ + self._validate_refresh_schema(keyspace, table, usertype, function) if not self.control_connection.refresh_schema(keyspace, table, usertype, function, max_schema_agreement_wait): raise Exception("Schema was not refreshed. See log for details.") - def submit_schema_refresh(self, keyspace=None, table=None, usertype=None): + def submit_schema_refresh(self, keyspace=None, table=None, usertype=None, function=None): """ Schedule a refresh of the internal representation of the current - schema for this cluster. If `keyspace` is specified, only that - keyspace will be refreshed, and likewise for `table`. + schema for this cluster. See :meth:`~.refresh_schema` for description of parameters. """ + self._validate_refresh_schema(keyspace, table, usertype, function) return self.executor.submit( - self.control_connection.refresh_schema, keyspace, table, usertype) + self.control_connection.refresh_schema, keyspace, table, usertype, function) def refresh_nodes(self): """ @@ -2030,7 +2049,7 @@ class ControlConnection(object): if self._cluster.is_shutdown: return False - assert table is None or usertype is None + assert sum(1 for arg in (table, usertype, function) if arg) <= 1 agreed = self.wait_for_schema_agreement(connection, preloaded_results=preloaded_results, diff --git a/docs/api/cassandra.rst b/docs/api/cassandra.rst index 90d23d10..91500d0f 100644 --- a/docs/api/cassandra.rst +++ b/docs/api/cassandra.rst @@ -14,6 +14,9 @@ .. autoclass:: ConsistencyLevel :members: +.. 
autoclass:: UserFunctionDescriptor + :members: + .. autoexception:: Unavailable() :members: From bd51fb978663cee3ec77295dad9ca3b233a69564 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Tue, 14 Apr 2015 16:51:38 -0500 Subject: [PATCH 17/33] Add meta handling for AGGREGATE --- cassandra/__init__.py | 47 ++++++++++----- cassandra/cluster.py | 80 +++++++++++++++++-------- cassandra/metadata.py | 136 ++++++++++++++++++++++++++++++++++++++++-- cassandra/protocol.py | 7 ++- 4 files changed, 223 insertions(+), 47 deletions(-) diff --git a/cassandra/__init__.py b/cassandra/__init__.py index 2d41a8ea..10fd6158 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -126,20 +126,7 @@ def consistency_value_to_name(value): return ConsistencyLevel.value_to_name[value] if value is not None else "Not Set" -class UserFunctionDescriptor(object): - """ - Describes a User function or aggregate by name and argument signature - """ - - name = None - """ - name of the function - """ - - type_signature = None - """ - Ordered list of CQL argument type name comprising the type signature - """ +class SignatureDescriptor(object): def __init__(self, name, type_signature): self.name = name @@ -159,6 +146,38 @@ class UserFunctionDescriptor(object): return "%s(%s)" % (name, ','.join(t for t in type_signature)) +class UserFunctionDescriptor(SignatureDescriptor): + """ + Describes a User function by name and argument signature + """ + + name = None + """ + name of the function + """ + + type_signature = None + """ + Ordered list of CQL argument type name comprising the type signature + """ + + +class UserAggregateDescriptor(SignatureDescriptor): + """ + Describes a User aggregate function by name and argument signature + """ + + name = None + """ + name of the aggregate + """ + + type_signature = None + """ + Ordered list of CQL argument type name comprising the type signature + """ + + class Unavailable(Exception): """ There were not enough live replicas to satisfy the requested 
consistency diff --git a/cassandra/cluster.py b/cassandra/cluster.py index b3172e78..8832bb8f 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1116,27 +1116,29 @@ class Cluster(object): for pool in session._pools.values(): pool.ensure_core_connections() - def _validate_refresh_schema(self, keyspace, table, usertype, function): - if any((table, usertype, function)): + def _validate_refresh_schema(self, keyspace, table, usertype, function, aggregate): + if any((table, usertype, function, aggregate)): if not keyspace: - raise ValueError("keyspace is required to refresh specific sub-entity {table, usertype, function}") + raise ValueError("keyspace is required to refresh specific sub-entity {table, usertype, function, aggregate}") if sum(1 for e in (table, usertype, function) if e) > 1: - raise ValueError("{table, usertype, function} are mutually exclusive") + raise ValueError("{table, usertype, function, aggregate} are mutually exclusive") - def refresh_schema(self, keyspace=None, table=None, usertype=None, function=None, max_schema_agreement_wait=None): + def refresh_schema(self, keyspace=None, table=None, usertype=None, function=None, aggregate=None, max_schema_agreement_wait=None): """ - Synchronously refresh all schema metadata. + Synchronously refresh schema metadata. {keyspace, table, usertype} are string names of the respective entities. ``function`` is a :class:`cassandra.UserFunctionDescriptor`. + ``aggregate`` is a :class:`cassandra.UserAggregateDescriptor`. - If none of ``{keyspace, table, usertype, function}`` are specified, the entire schema is refreshed. + If none of ``{keyspace, table, usertype, function, aggregate}`` are specified, the entire schema is refreshed. - If any of ``{keyspace, table, usertype, function}`` are specified, ``keyspace`` is required. + If any of ``{keyspace, table, usertype, function, aggregate}`` are specified, ``keyspace`` is required. 
If only ``keyspace`` is specified, just the top-level keyspace metadata is refreshed (e.g. replication). - The remaining arguments ``{table, usertype, function}`` are mutually exclusive -- only one may be specified. + The remaining arguments ``{table, usertype, function, aggregate}`` + are mutually exclusive -- only one may be specified. By default, the timeout for this operation is governed by :attr:`~.Cluster.max_schema_agreement_wait` and :attr:`~.Cluster.control_connection_timeout`. @@ -1147,8 +1149,9 @@ class Cluster(object): An Exception is raised if schema refresh fails for any reason. """ - self._validate_refresh_schema(keyspace, table, usertype, function) - if not self.control_connection.refresh_schema(keyspace, table, usertype, function, max_schema_agreement_wait): + self._validate_refresh_schema(keyspace, table, usertype, function, aggregate) + if not self.control_connection.refresh_schema(keyspace, table, usertype, function, + aggregate, max_schema_agreement_wait): raise Exception("Schema was not refreshed. See log for details.") def submit_schema_refresh(self, keyspace=None, table=None, usertype=None, function=None): @@ -1156,9 +1159,9 @@ class Cluster(object): Schedule a refresh of the internal representation of the current schema for this cluster. See :meth:`~.refresh_schema` for description of parameters. 
""" - self._validate_refresh_schema(keyspace, table, usertype, function) + self._validate_refresh_schema(keyspace, table, usertype, function, aggregate) return self.executor.submit( - self.control_connection.refresh_schema, keyspace, table, usertype, function) + self.control_connection.refresh_schema, keyspace, table, usertype, function, aggregate) def refresh_nodes(self): """ @@ -1840,6 +1843,7 @@ class ControlConnection(object): _SELECT_COLUMNS = "SELECT * FROM system.schema_columns" _SELECT_USERTYPES = "SELECT * FROM system.schema_usertypes" _SELECT_FUNCTIONS = "SELECT * FROM system.schema_functions" + _SELECT_AGGREGATES = "SELECT * FROM system.schema_aggregates" _SELECT_TRIGGERS = "SELECT * FROM system.schema_triggers" _SELECT_PEERS = "SELECT peer, data_center, rack, tokens, rpc_address, schema_version FROM system.peers" @@ -2028,7 +2032,7 @@ class ControlConnection(object): del self._connection def refresh_schema(self, keyspace=None, table=None, usertype=None, function=None, - schema_agreement_wait=None): + aggregate=None, schema_agreement_wait=None): if not self._meta_refresh_enabled: log.debug("[control connection] Skipping schema refresh because meta refresh is disabled") return False @@ -2036,7 +2040,7 @@ class ControlConnection(object): try: if self._connection: return self._refresh_schema(self._connection, keyspace, table, usertype, function, - schema_agreement_wait=schema_agreement_wait) + aggregate, schema_agreement_wait=schema_agreement_wait) except ReferenceError: pass # our weak reference to the Cluster is no good except Exception: @@ -2045,11 +2049,11 @@ class ControlConnection(object): return False def _refresh_schema(self, connection, keyspace=None, table=None, usertype=None, function=None, - preloaded_results=None, schema_agreement_wait=None): + aggregate=None, preloaded_results=None, schema_agreement_wait=None): if self._cluster.is_shutdown: return False - assert sum(1 for arg in (table, usertype, function) if arg) <= 1 + assert sum(1 for arg 
in (table, usertype, function, aggregate) if arg) <= 1 agreed = self.wait_for_schema_agreement(connection, preloaded_results=preloaded_results, @@ -2102,6 +2106,15 @@ class ControlConnection(object): log.debug("[control connection] Fetched user function info for %s.%s, rebuilding metadata", keyspace, function.signature) functions_result = dict_factory(*functions_result.results) if functions_result.results else {} self._cluster.metadata.function_changed(keyspace, function, functions_result) + elif aggregate: + # user defined aggregate within this keyspace changed + where_clause = " WHERE keyspace_name = '%s' AND aggregate_name = '%s' AND signature = [%s]" \ + % (keyspace, aggregate.name, ','.join("'%s'" % t for t in aggregate.type_signature)) + aggregates_query = QueryMessage(query=self._SELECT_AGGREGATES + where_clause, consistency_level=cl) + aggregates_result = connection.wait_for_response(aggregates_query) + log.debug("[control connection] Fetched user aggregate info for %s.%s, rebuilding metadata", keyspace, aggregate.signature) + aggregates_result = dict_factory(*aggregates_result.results) if aggregates_result.results else {} + self._cluster.metadata.aggregate_changed(keyspace, aggregate, aggregates_result) elif keyspace: # only the keyspace itself changed (such as replication settings) where_clause = " WHERE keyspace_name = '%s'" % (keyspace,) @@ -2118,6 +2131,7 @@ class ControlConnection(object): QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl), QueryMessage(query=self._SELECT_USERTYPES, consistency_level=cl), QueryMessage(query=self._SELECT_FUNCTIONS, consistency_level=cl), + QueryMessage(query=self._SELECT_AGGREGATES, consistency_level=cl), QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl) ] @@ -2125,6 +2139,7 @@ class ControlConnection(object): (ks_success, ks_result), (cf_success, cf_result), \ (col_success, col_result), (types_success, types_result), \ (functions_success, functions_result), \ + (aggregates_success, 
aggregates_result), \ (trigger_success, triggers_result) = responses if ks_success: @@ -2176,8 +2191,19 @@ class ControlConnection(object): else: raise functions_result + # aggregates were introduced in Cassandra 3.0 + if aggregates_success: + aggregates_result = dict_factory(*aggregates_result.results) if aggregates_result.results else {} + else: + if isinstance(aggregates_result, InvalidRequest): + log.debug("[control connection] user aggregates table not found") + aggregates_result = {} + else: + raise aggregates_result + log.debug("[control connection] Fetched schema, rebuilding metadata") - self._cluster.metadata.rebuild_schema(ks_result, types_result, functions_result, cf_result, col_result, triggers_result) + self._cluster.metadata.rebuild_schema(ks_result, types_result, functions_result, + aggregates_result, cf_result, col_result, triggers_result) return True def refresh_node_list_and_token_map(self, force_token_rebuild=False): @@ -2321,13 +2347,13 @@ class ControlConnection(object): def _handle_schema_change(self, event): if self._schema_event_refresh_window < 0: return - keyspace = event.get('keyspace') table = event.get('table') usertype = event.get('type') function = event.get('function') + aggregate = event.get('aggregate') delay = random() * self._schema_event_refresh_window - self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, keyspace, table, usertype, function) + self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, keyspace, table, usertype, function, aggregate) def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wait_time=None): @@ -2556,19 +2582,20 @@ class _Scheduler(object): exc_info=exc) -def refresh_schema_and_set_result(keyspace, table, usertype, function, control_conn, response_future): +def refresh_schema_and_set_result(keyspace, table, usertype, function, aggregate, control_conn, response_future): try: if control_conn._meta_refresh_enabled: - log.debug("Refreshing schema in response 
to schema change. Keyspace: %s; Table: %s, Type: %s, Function: %s", - keyspace, table, usertype, function) - control_conn._refresh_schema(response_future._connection, keyspace, table, usertype, function) + log.debug("Refreshing schema in response to schema change. " + "Keyspace: %s; Table: %s, Type: %s, Function: %s, Aggregate: %s", + keyspace, table, usertype, function, aggregate) + control_conn._refresh_schema(response_future._connection, keyspace, table, usertype, function, aggregate) else: log.debug("Skipping schema refresh in response to schema change because meta refresh is disabled; " - "Keyspace: %s; Table: %s, Type: %s, Function: %s", keyspace, table, usertype, function) + "Keyspace: %s; Table: %s, Type: %s, Function: %s", keyspace, table, usertype, function, aggregate) except Exception: log.exception("Exception refreshing schema in response to schema change:") response_future.session.submit( - control_conn.refresh_schema, keyspace, table, usertype, function) + control_conn.refresh_schema, keyspace, table, usertype, function, aggregate) finally: response_future._set_final_result(None) @@ -2754,6 +2781,7 @@ class ResponseFuture(object): response.results.get('table'), response.results.get('type'), response.results.get('function'), + response.results.get('aggregate'), self.session.cluster.control_connection, self) else: diff --git a/cassandra/metadata.py b/cassandra/metadata.py index 32edd01f..2b92e514 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -30,8 +30,9 @@ try: except ImportError as e: pass -from cassandra import UserFunctionDescriptor +from cassandra import SignatureDescriptor import cassandra.cqltypes as types +from cassandra.encoder import Encoder from cassandra.marshal import varint_unpack from cassandra.pool import Host from cassandra.util import OrderedDict @@ -91,7 +92,7 @@ class Metadata(object): return "\n".join(ks.export_as_string() for ks in self.keyspaces.values()) def rebuild_schema(self, ks_results, type_results, 
function_results, - cf_results, col_results, triggers_result): + aggregate_results, cf_results, col_results, triggers_result): """ Rebuild the view of the current schema from a fresh set of rows from the system schema tables. @@ -102,6 +103,7 @@ class Metadata(object): col_def_rows = defaultdict(lambda: defaultdict(list)) usertype_rows = defaultdict(list) fn_rows = defaultdict(list) + agg_rows = defaultdict(list) trigger_rows = defaultdict(lambda: defaultdict(list)) for row in cf_results: @@ -118,6 +120,9 @@ class Metadata(object): for row in function_results: fn_rows[row["keyspace_name"]].append(row) + for row in aggregate_results: + agg_rows[row["keyspace_name"]].append(row) + for row in triggers_result: ksname = row["keyspace_name"] cfname = row["columnfamily_name"] @@ -142,6 +147,10 @@ class Metadata(object): fn = self._build_function(keyspace_meta.name, fn_row) keyspace_meta.functions[fn.signature] = fn + for agg_row in agg_rows.get(keyspace_meta.name, []): + agg = self._build_aggregate(keyspace_meta.name, agg_row) + keyspace_meta.aggregates[agg.signature] = agg + current_keyspaces.add(keyspace_meta.name) old_keyspace_meta = self.keyspaces.get(keyspace_meta.name, None) self.keyspaces[keyspace_meta.name] = keyspace_meta @@ -172,6 +181,7 @@ class Metadata(object): keyspace_meta.tables = old_keyspace_meta.tables keyspace_meta.user_types = old_keyspace_meta.user_types keyspace_meta.functions = old_keyspace_meta.functions + keyspace_meta.aggregates = old_keyspace_meta.aggregates if (keyspace_meta.replication_strategy != old_keyspace_meta.replication_strategy): self._keyspace_updated(keyspace) else: @@ -193,6 +203,14 @@ class Metadata(object): # the function was deleted self.keyspaces[keyspace].functions.pop(function.signature, None) + def aggregate_changed(self, keyspace, aggregate, aggregate_results): + if aggregate_results: + new_aggregate = self._build_aggregate(keyspace, aggregate_results[0]) + self.keyspaces[keyspace].aggregates[aggregate.signature] = 
new_aggregate + else: + # the aggregate was deleted + self.keyspaces[keyspace].aggregates.pop(aggregate.signature, None) + def table_changed(self, keyspace, table, cf_results, col_results, triggers_result): try: keyspace_meta = self.keyspaces[keyspace] @@ -242,6 +260,16 @@ class Metadata(object): return_type, function_row['language'], function_row['body'], function_row['is_deterministic'], function_row['called_on_null_input']) + def _build_aggregate(self, keyspace, aggregate_row): + state_type = types.lookup_casstype(aggregate_row['state_type']) + initial_condition = aggregate_row['initcond'] + if initial_condition is not None: + initial_condition = state_type.deserialize(initial_condition, 3) + return_type = types.lookup_casstype(aggregate_row['return_type']) + return Aggregate(aggregate_row['keyspace_name'], aggregate_row['aggregate_name'], + aggregate_row['signature'], aggregate_row['final_func'], initial_condition, + return_type, aggregate_row['state_func'], state_type) + def _build_table_metadata(self, keyspace_metadata, row, col_rows, trigger_rows): cfname = row["columnfamily_name"] cf_col_rows = col_rows.get(cfname, []) @@ -762,14 +790,21 @@ class KeyspaceMetadata(object): user_types = None """ - A map from user-defined type names to instances of :class:`~cassandra.metadata..UserType`. + A map from user-defined type names to instances of :class:`~cassandra.metadata.UserType`. .. versionadded:: 2.1.0 """ functions = None """ - A map from user-defined function names to instances of :class:`~cassandra.metadata..Function`. + A map from user-defined function signatures to instances of :class:`~cassandra.metadata.Function`. + + .. versionadded:: 3.0.0 + """ + + aggregates = None + """ + A map from user-defined aggregate signatures to instances of :class:`~cassandra.metadata.Aggregate`. .. 
versionadded:: 3.0.0 """ @@ -780,6 +815,7 @@ class KeyspaceMetadata(object): self.tables = {} self.user_types = {} self.functions = {} + self.aggregates = {} def export_as_string(self): """ @@ -789,6 +825,7 @@ class KeyspaceMetadata(object): return "\n\n".join([self.as_cql_query()] + self.user_type_strings() + [f.as_cql_query(True) for f in self.functions.values()] + + [a.as_cql_query(True) for a in self.aggregates.values()] + [t.export_as_string() for t in self.tables.values()]) def as_cql_query(self): @@ -880,6 +917,95 @@ class UserType(object): return ret +class Aggregate(object): + """ + A user defined aggregate function, as created by ``CREATE AGGREGATE`` statements. + + Aggregate functions were introduced in Cassandra 3.0 + + .. versionadded:: 3.0.0 + """ + + keyspace = None + """ + The string name of the keyspace in which this aggregate is defined + """ + + name = None + """ + The name of this aggregate + """ + + type_signature = None + """ + An ordered list of the types for each argument to the aggregate + """ + + final_func = None + """ + Name of a final function + """ + + initial_condition = None + """ + Initial condition of the aggregate + """ + + return_type = None + """ + Return type of the aggregate + """ + + state_func = None + """ + Name of a state function + """ + + state_type = None + """ + Flag indicating whether this function is deterministic + (required for functional indexes) + """ + + def __init__(self, keyspace, name, type_signature, final_func, + initial_condition, return_type, state_func, state_type): + self.keyspace = keyspace + self.name = name + self.type_signature = type_signature + self.final_func = final_func + self.initial_condition = initial_condition + self.return_type = return_type + self.state_func = state_func + self.state_type = state_type + + def as_cql_query(self, formatted=False): + """ + Returns a CQL query that can be used to recreate this aggregate. 
+ If `formatted` is set to :const:`True`, extra whitespace will + be added to make the query more readable. + """ + sep = '\n' if formatted else ' ' + keyspace = protect_name(self.keyspace) + name = protect_name(self.name) + arg_list = ', '.join(self.type_signature) + state_func = protect_name(self.state_func) + state_type = self.state_type.cql_parameterized_type() + + ret = "CREATE AGGREGATE %(keyspace)s.%(name)s(%(arg_list)s)%(sep)s" \ + "SFUNC %(state_func)s%(sep)s" \ + "STYPE %(state_type)s" % locals() + + ret += ''.join((sep, 'FINALFUNC ', protect_name(self.final_func))) if self.final_func else '' + ret += ''.join((sep, 'INITCOND ', Encoder().cql_encode_all_types(self.initial_condition)))\ + if self.initial_condition is not None else '' + + return ret + + @property + def signature(self): + return SignatureDescriptor.format_signature(self.name, self.type_signature) + + class Function(object): """ A user defined function, as created by ``CREATE FUNCTION`` statements. @@ -973,7 +1099,7 @@ class Function(object): @property def signature(self): - return UserFunctionDescriptor.format_signature(self.name, self.type_signature) + return SignatureDescriptor.format_signature(self.name, self.type_signature) class TableMetadata(object): diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 9ec114e7..c6c34615 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -23,7 +23,8 @@ import io from cassandra import (Unavailable, WriteTimeout, ReadTimeout, AlreadyExists, InvalidRequest, Unauthorized, - UnsupportedOperation, UserFunctionDescriptor) + UnsupportedOperation, UserFunctionDescriptor, + UserAggregateDescriptor) from cassandra.marshal import (int32_pack, int32_unpack, uint16_pack, uint16_unpack, int8_pack, int8_unpack, uint64_pack, header_pack, v3_header_pack) @@ -853,8 +854,10 @@ class EventMessage(_MessageType): event = {'change_type': change_type, 'keyspace': keyspace} if target != "KEYSPACE": target_name = read_string(f) - if target in 
('FUNCTION', 'AGGREGATE'): + if target == 'FUNCTION': event['function'] = UserFunctionDescriptor(target_name, [read_string(f) for _ in range(read_short(f))]) + elif target == 'AGGREGATE': + event['aggregate'] = UserAggregateDescriptor(target_name, [read_string(f) for _ in range(read_short(f))]) else: event[target.lower()] = target_name else: From a0482216e8df887d3aa1276703cd3d87239f8148 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Tue, 14 Apr 2015 16:52:21 -0500 Subject: [PATCH 18/33] Doc update for function and aggregate metadata --- docs/api/cassandra.rst | 5 +++++ docs/api/cassandra/metadata.rst | 9 +++++++++ 2 files changed, 14 insertions(+) diff --git a/docs/api/cassandra.rst b/docs/api/cassandra.rst index 91500d0f..87ef7fe2 100644 --- a/docs/api/cassandra.rst +++ b/docs/api/cassandra.rst @@ -16,6 +16,11 @@ .. autoclass:: UserFunctionDescriptor :members: + :inherited-members: + +.. autoclass:: UserAggregateDescriptor + :members: + :inherited-members: .. autoexception:: Unavailable() :members: diff --git a/docs/api/cassandra/metadata.rst b/docs/api/cassandra/metadata.rst index 2d3b09f7..a89fd03d 100644 --- a/docs/api/cassandra/metadata.rst +++ b/docs/api/cassandra/metadata.rst @@ -13,6 +13,15 @@ Schemas .. autoclass:: KeyspaceMetadata () :members: +.. autoclass:: UserType () + :members: + +.. autoclass:: Function () + :members: + +.. autoclass:: Aggregate () + :members: + .. autoclass:: TableMetadata () :members: From 3ab95f0adf09ff2ebcbb37271ca28ac418217aa2 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Thu, 16 Apr 2015 09:23:45 -0500 Subject: [PATCH 19/33] Add OrderedMapSerializedKey to cql encoder mapping This is to correctly encode values that are originally returned from a query. 
--- cassandra/encoder.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cassandra/encoder.py b/cassandra/encoder.py index 02eed2aa..d9c3e852 100644 --- a/cassandra/encoder.py +++ b/cassandra/encoder.py @@ -28,7 +28,8 @@ import types from uuid import UUID import six -from cassandra.util import OrderedDict, OrderedMap, sortedset, Time +from cassandra.util import (OrderedDict, OrderedMap, OrderedMapSerializedKey, + sortedset, Time) if six.PY3: long = int @@ -79,6 +80,7 @@ class Encoder(object): dict: self.cql_encode_map_collection, OrderedDict: self.cql_encode_map_collection, OrderedMap: self.cql_encode_map_collection, + OrderedMapSerializedKey: self.cql_encode_map_collection, list: self.cql_encode_list_collection, tuple: self.cql_encode_list_collection, set: self.cql_encode_set_collection, From e17c0fe2ee09f300141bb5d7d323e61e88c89c59 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Thu, 16 Apr 2015 09:25:55 -0500 Subject: [PATCH 20/33] Reorder aggregate args to match grammar order, not table column order. 
--- cassandra/metadata.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cassandra/metadata.py b/cassandra/metadata.py index 2b92e514..b6e0aee7 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -267,8 +267,8 @@ class Metadata(object): initial_condition = state_type.deserialize(initial_condition, 3) return_type = types.lookup_casstype(aggregate_row['return_type']) return Aggregate(aggregate_row['keyspace_name'], aggregate_row['aggregate_name'], - aggregate_row['signature'], aggregate_row['final_func'], initial_condition, - return_type, aggregate_row['state_func'], state_type) + aggregate_row['signature'], aggregate_row['state_func'], state_type, + aggregate_row['final_func'], initial_condition, return_type) def _build_table_metadata(self, keyspace_metadata, row, col_rows, trigger_rows): cfname = row["columnfamily_name"] @@ -967,16 +967,16 @@ class Aggregate(object): (required for functional indexes) """ - def __init__(self, keyspace, name, type_signature, final_func, - initial_condition, return_type, state_func, state_type): + def __init__(self, keyspace, name, type_signature, state_func, + state_type, final_func, initial_condition, return_type): self.keyspace = keyspace self.name = name self.type_signature = type_signature + self.state_func = state_func + self.state_type = state_type self.final_func = final_func self.initial_condition = initial_condition self.return_type = return_type - self.state_func = state_func - self.state_type = state_type def as_cql_query(self, formatted=False): """ @@ -987,11 +987,11 @@ class Aggregate(object): sep = '\n' if formatted else ' ' keyspace = protect_name(self.keyspace) name = protect_name(self.name) - arg_list = ', '.join(self.type_signature) + type_list = ', '.join(self.type_signature) state_func = protect_name(self.state_func) state_type = self.state_type.cql_parameterized_type() - ret = "CREATE AGGREGATE %(keyspace)s.%(name)s(%(arg_list)s)%(sep)s" \ + ret = "CREATE 
AGGREGATE %(keyspace)s.%(name)s(%(type_list)s)%(sep)s" \ "SFUNC %(state_func)s%(sep)s" \ "STYPE %(state_type)s" % locals() From 5e5deb17419e4eb3fdefe490c765e536c66f69a8 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Thu, 16 Apr 2015 09:46:54 -0500 Subject: [PATCH 21/33] Aggregate metadata integration tests --- cassandra/cluster.py | 2 +- tests/integration/standard/test_metadata.py | 224 +++++++++++++++++--- 2 files changed, 199 insertions(+), 27 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 8832bb8f..e8697120 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1154,7 +1154,7 @@ class Cluster(object): aggregate, max_schema_agreement_wait): raise Exception("Schema was not refreshed. See log for details.") - def submit_schema_refresh(self, keyspace=None, table=None, usertype=None, function=None): + def submit_schema_refresh(self, keyspace=None, table=None, usertype=None, function=None, aggregate=None): """ Schedule a refresh of the internal representation of the current schema for this cluster. See :meth:`~.refresh_schema` for description of parameters. 
diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index 05c8ba56..9c80a2ee 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -22,11 +22,13 @@ from mock import Mock import six import sys -from cassandra import AlreadyExists, UserFunctionDescriptor +from cassandra import AlreadyExists, SignatureDescriptor from cassandra.cluster import Cluster +from cassandra.cqltypes import DoubleType, Int32Type, ListType, UTF8Type, MapType +from cassandra.encoder import Encoder from cassandra.metadata import (Metadata, KeyspaceMetadata, TableMetadata, - Token, MD5Token, TokenMap, murmur3) + Token, MD5Token, TokenMap, murmur3, Function, Aggregate) from cassandra.policies import SimpleConvictionPolicy from cassandra.pool import Host @@ -929,14 +931,11 @@ class KeyspaceAlterMetadata(unittest.TestCase): self.assertNotEqual(original_keyspace_meta, new_keyspace_meta) self.assertEqual(new_keyspace_meta.durable_writes, False) -from cassandra.cqltypes import DoubleType -from cassandra.metadata import Function - - -class FunctionMetadata(unittest.TestCase): - - keyspace_name = "functionmetadatatest" +class FunctionTest(unittest.TestCase): + """ + Base functionality for Function and Aggregate metadata test classes + """ @property def function_name(self): return self._testMethodName.lower() @@ -947,40 +946,57 @@ class FunctionMetadata(unittest.TestCase): raise unittest.SkipTest("Function metadata requires native protocol version 4+") cls.cluster = Cluster(protocol_version=PROTOCOL_VERSION) + cls.keyspace_name = cls.__name__.lower() cls.session = cls.cluster.connect() - cls.session.execute("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}" % cls.keyspace_name) + cls.session.execute("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}" % cls.keyspace_name) + 
cls.session.set_keyspace(cls.keyspace_name) cls.keyspace_function_meta = cls.cluster.metadata.keyspaces[cls.keyspace_name].functions + cls.keyspace_aggregate_meta = cls.cluster.metadata.keyspaces[cls.keyspace_name].aggregates @classmethod def teardown_class(cls): - cls.session.execute("DROP KEYSPACE %s" % cls.keyspace_name) + cls.session.execute("DROP KEYSPACE IF EXISTS %s" % cls.keyspace_name) cls.cluster.shutdown() - class VerifiedFunction(object): - def __init__(self, test_case, **function_kwargs): + class Verified(object): + + def __init__(self, test_case, meta_class, element_meta, **function_kwargs): self.test_case = test_case self.function_kwargs = dict(function_kwargs) + self.meta_class = meta_class + self.element_meta = element_meta def __enter__(self): tc = self.test_case - expected_meta = Function(**self.function_kwargs) - tc.assertNotIn(expected_meta.signature, tc.keyspace_function_meta) + expected_meta = self.meta_class(**self.function_kwargs) + tc.assertNotIn(expected_meta.signature, self.element_meta) tc.session.execute(expected_meta.as_cql_query()) - tc.assertIn(expected_meta.signature, tc.keyspace_function_meta) + tc.assertIn(expected_meta.signature, self.element_meta) - generated_meta = tc.keyspace_function_meta[expected_meta.signature] + generated_meta = self.element_meta[expected_meta.signature] self.test_case.assertEqual(generated_meta.as_cql_query(), expected_meta.as_cql_query()) return self def __exit__(self, exc_type, exc_val, exc_tb): tc = self.test_case - tc.session.execute("DROP FUNCTION %s.%s" % (tc.keyspace_name, self.signature)) - tc.assertNotIn(self.signature, tc.keyspace_function_meta) + tc.session.execute("DROP %s %s.%s" % (self.meta_class.__name__, tc.keyspace_name, self.signature)) + tc.assertNotIn(self.signature, self.element_meta) @property def signature(self): - return UserFunctionDescriptor.format_signature(self.function_kwargs['name'], - self.function_kwargs['type_signature']) + return 
SignatureDescriptor.format_signature(self.function_kwargs['name'], + self.function_kwargs['type_signature']) + + class VerifiedFunction(Verified): + def __init__(self, test_case, **kwargs): + super(FunctionTest.VerifiedFunction, self).__init__(test_case, Function, test_case.keyspace_function_meta, **kwargs) + + class VerifiedAggregate(Verified): + def __init__(self, test_case, **kwargs): + super(FunctionTest.VerifiedAggregate, self).__init__(test_case, Aggregate, test_case.keyspace_aggregate_meta, **kwargs) + + +class FunctionMetadata(FunctionTest): def make_function_kwargs(self, deterministic=True, called_on_null=True): return {'keyspace': self.keyspace_name, @@ -993,18 +1009,15 @@ class FunctionMetadata(unittest.TestCase): 'is_deterministic': deterministic, 'called_on_null_input': called_on_null} - def test_create_drop_function(self): - with self.VerifiedFunction(self, **self.make_function_kwargs()): - pass - def test_functions_after_udt(self): self.assertNotIn(self.function_name, self.keyspace_function_meta) udt_name = 'udtx' - self.session.execute("CREATE TYPE %s.%s (x int)" % (self.keyspace_name, udt_name)) + self.session.execute("CREATE TYPE %s (x int)" % udt_name) # Ideally we would make a function that takes a udt type, but # this presently fails because C* c059a56 requires udt to be frozen to create, but does not store meta indicating frozen + # https://issues.apache.org/jira/browse/CASSANDRA-9186 # Maybe update this after release #kwargs = self.make_function_kwargs() #kwargs['type_signature'][0] = "frozen<%s>" % udt_name @@ -1066,3 +1079,162 @@ class FunctionMetadata(unittest.TestCase): with self.VerifiedFunction(self, **kwargs) as vf: fn_meta = self.keyspace_function_meta[vf.signature] self.assertRegexpMatches(fn_meta.as_cql_query(), "CREATE FUNCTION.*\) RETURNS NULL ON NULL INPUT RETURNS .*") + + +class AggregateMetadata(FunctionTest): + + @classmethod + def setup_class(cls): + super(AggregateMetadata, cls).setup_class() + + 
cls.session.execute("""CREATE OR REPLACE FUNCTION sum_int(s int, i int) + RETURNS NULL ON NULL INPUT + RETURNS int + LANGUAGE javascript AS 's + i';""") + cls.session.execute("""CREATE OR REPLACE FUNCTION sum_int_two(s int, i int, j int) + RETURNS NULL ON NULL INPUT + RETURNS int + LANGUAGE javascript AS 's + i + j';""") + cls.session.execute("""CREATE OR REPLACE FUNCTION "List_As_String"(l list) + RETURNS NULL ON NULL INPUT + RETURNS int + LANGUAGE javascript AS ''''' + l';""") + cls.session.execute("""CREATE OR REPLACE FUNCTION extend_list(s list, i int) + CALLED ON NULL INPUT + RETURNS list + LANGUAGE java AS 'if (i != null) s.add(i.toString()); return s;';""") + cls.session.execute("""CREATE OR REPLACE FUNCTION update_map(s map, i int) + RETURNS NULL ON NULL INPUT + RETURNS map + LANGUAGE java AS 's.put(new Integer(i), new Integer(i)); return s;';""") + cls.session.execute("""CREATE TABLE IF NOT EXISTS t + (k int PRIMARY KEY, v int)""") + for x in range(4): + cls.session.execute("INSERT INTO t (k,v) VALUES (%s, %s)", (x, x)) + cls.session.execute("INSERT INTO t (k) VALUES (%s)", (4,)) + + def make_aggregate_kwargs(self, state_func, state_type, final_func=None, init_cond=None): + return {'keyspace': self.keyspace_name, + 'name': self.function_name + '_aggregate', + 'type_signature': ['int'], + 'state_func': state_func, + 'state_type': state_type, + 'final_func': final_func, + 'initial_condition': init_cond, + 'return_type': "does not matter for creation"} + + def test_return_type_meta(self): + with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('sum_int', Int32Type, init_cond=1)) as va: + self.assertIs(self.keyspace_aggregate_meta[va.signature].return_type, Int32Type) + + def test_init_cond(self): + # This is required until the java driver bundled with C* is updated to support v4 + c = Cluster(protocol_version=3) + s = c.connect(self.keyspace_name) + + expected_values = range(4) + + # int32 + for init_cond in (-1, 0, 1): + with 
self.VerifiedAggregate(self, **self.make_aggregate_kwargs('sum_int', Int32Type, init_cond=init_cond)) as va: + sum_res = s.execute("SELECT %s(v) AS sum FROM t" % va.function_kwargs['name'])[0].sum + self.assertEqual(sum_res, init_cond + sum(expected_values)) + + # list + for init_cond in ([], ['1', '2']): + with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('extend_list', ListType.apply_parameters([UTF8Type]), init_cond=init_cond)) as va: + list_res = s.execute("SELECT %s(v) AS list_res FROM t" % va.function_kwargs['name'])[0].list_res + self.assertListEqual(list_res[:len(init_cond)], init_cond) + self.assertEqual(set(i for i in list_res[len(init_cond):]), + set(str(i) for i in expected_values)) + + # map + expected_map_values = dict((i, i) for i in expected_values) + expected_key_set = set(expected_values) + for init_cond in ({}, {1: 2, 3: 4}, {5: 5}): + with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('update_map', MapType.apply_parameters([Int32Type, Int32Type]), init_cond=init_cond)) as va: + map_res = s.execute("SELECT %s(v) AS map_res FROM t" % va.function_kwargs['name'])[0].map_res + self.assertDictContainsSubset(expected_map_values, map_res) + init_not_updated = dict((k, init_cond[k]) for k in set(init_cond) - expected_key_set) + self.assertDictContainsSubset(init_not_updated, map_res) + c.shutdown() + + def test_aggregates_after_functions(self): + # functions must come before functions in keyspace dump + with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('extend_list', ListType.apply_parameters([UTF8Type]))): + keyspace_cql = self.cluster.metadata.keyspaces[self.keyspace_name].export_as_string() + func_idx = keyspace_cql.find("CREATE FUNCTION") + aggregate_idx = keyspace_cql.rfind("CREATE AGGREGATE") + self.assertNotIn(-1, (aggregate_idx, func_idx), "AGGREGATE or FUNCTION not found in keyspace_cql: " + keyspace_cql) + self.assertGreater(aggregate_idx, func_idx) + + def test_same_name_diff_types(self): + kwargs = 
self.make_aggregate_kwargs('sum_int', Int32Type, init_cond=0) + with self.VerifiedAggregate(self, **kwargs): + kwargs['state_func'] = 'sum_int_two' + kwargs['type_signature'] = ['int', 'int'] + with self.VerifiedAggregate(self, **kwargs): + aggregates = [a for a in self.keyspace_aggregate_meta.values() if a.name == kwargs['name']] + self.assertEqual(len(aggregates), 2) + self.assertNotEqual(aggregates[0].type_signature, aggregates[1].type_signature) + + def test_aggregates_follow_keyspace_alter(self): + with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('sum_int', Int32Type, init_cond=0)): + original_keyspace_meta = self.cluster.metadata.keyspaces[self.keyspace_name] + self.session.execute('ALTER KEYSPACE %s WITH durable_writes = false' % self.keyspace_name) + try: + new_keyspace_meta = self.cluster.metadata.keyspaces[self.keyspace_name] + self.assertNotEqual(original_keyspace_meta, new_keyspace_meta) + self.assertIs(original_keyspace_meta.aggregates, new_keyspace_meta.aggregates) + finally: + self.session.execute('ALTER KEYSPACE %s WITH durable_writes = true' % self.keyspace_name) + + def test_cql_optional_params(self): + kwargs = self.make_aggregate_kwargs('extend_list', ListType.apply_parameters([UTF8Type])) + + # no initial condition, final func + self.assertIsNone(kwargs['initial_condition']) + self.assertIsNone(kwargs['final_func']) + with self.VerifiedAggregate(self, **kwargs) as va: + meta = self.keyspace_aggregate_meta[va.signature] + self.assertIsNone(meta.initial_condition) + self.assertIsNone(meta.final_func) + cql = meta.as_cql_query() + self.assertEqual(cql.find('INITCOND'), -1) + self.assertEqual(cql.find('FINALFUNC'), -1) + + # initial condition, no final func + kwargs['initial_condition'] = ['init', 'cond'] + with self.VerifiedAggregate(self, **kwargs) as va: + meta = self.keyspace_aggregate_meta[va.signature] + self.assertListEqual(meta.initial_condition, kwargs['initial_condition']) + self.assertIsNone(meta.final_func) + cql = 
meta.as_cql_query() + search_string = "INITCOND %s" % Encoder().cql_encode_all_types(kwargs['initial_condition']) + self.assertGreater(cql.find(search_string), 0, '"%s" search string not found in cql:\n%s' % (search_string, cql)) + self.assertEqual(cql.find('FINALFUNC'), -1) + + # no initial condition, final func + kwargs['initial_condition'] = None + kwargs['final_func'] = 'List_As_String' + with self.VerifiedAggregate(self, **kwargs) as va: + meta = self.keyspace_aggregate_meta[va.signature] + self.assertIsNone(meta.initial_condition) + self.assertEqual(meta.final_func, kwargs['final_func']) + cql = meta.as_cql_query() + self.assertEqual(cql.find('INITCOND'), -1) + search_string = 'FINALFUNC "%s"' % kwargs['final_func'] + self.assertGreater(cql.find(search_string), 0, '"%s" search string not found in cql:\n%s' % (search_string, cql)) + + # both + kwargs['initial_condition'] = ['init', 'cond'] + kwargs['final_func'] = 'List_As_String' + with self.VerifiedAggregate(self, **kwargs) as va: + meta = self.keyspace_aggregate_meta[va.signature] + self.assertListEqual(meta.initial_condition, kwargs['initial_condition']) + self.assertEqual(meta.final_func, kwargs['final_func']) + cql = meta.as_cql_query() + init_cond_idx = cql.find("INITCOND %s" % Encoder().cql_encode_all_types(kwargs['initial_condition'])) + final_func_idx = cql.find('FINALFUNC "%s"' % kwargs['final_func']) + self.assertNotIn(-1, (init_cond_idx, final_func_idx)) + self.assertGreater(init_cond_idx, final_func_idx) From f5d5f905e7cb7db97b4c06cbd314728c2acf410b Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Thu, 16 Apr 2015 15:34:42 -0500 Subject: [PATCH 22/33] Removed unused parameters from PreparedStatement init --- cassandra/query.py | 9 ++------- tests/unit/test_parameter_binding.py | 9 ++++----- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/cassandra/query.py b/cassandra/query.py index fc71b8f6..031a1684 100644 --- a/cassandra/query.py +++ b/cassandra/query.py @@ -338,19 +338,14 @@ 
class PreparedStatement(object): fetch_size = FETCH_SIZE_UNSET - def __init__(self, column_metadata, query_id, routing_key_indexes, query, keyspace, - protocol_version, consistency_level=None, serial_consistency_level=None, - fetch_size=FETCH_SIZE_UNSET): + def __init__(self, column_metadata, query_id, routing_key_indexes, query, + keyspace, protocol_version): self.column_metadata = column_metadata self.query_id = query_id self.routing_key_indexes = routing_key_indexes self.query_string = query self.keyspace = keyspace self.protocol_version = protocol_version - self.consistency_level = consistency_level - self.serial_consistency_level = serial_consistency_level - if fetch_size is not FETCH_SIZE_UNSET: - self.fetch_size = fetch_size @classmethod def from_message(cls, query_id, column_metadata, pk_indexes, cluster_metadata, query, prepared_keyspace, protocol_version): diff --git a/tests/unit/test_parameter_binding.py b/tests/unit/test_parameter_binding.py index f649c520..3fce7b9e 100644 --- a/tests/unit/test_parameter_binding.py +++ b/tests/unit/test_parameter_binding.py @@ -128,8 +128,8 @@ class BoundStatementTestCase(unittest.TestCase): routing_key_indexes=[], query=None, keyspace=keyspace, - protocol_version=2, - fetch_size=1234) + protocol_version=2) + prepared_statement.fetch_size = 1234 bound_statement = BoundStatement(prepared_statement=prepared_statement) self.assertEqual(1234, bound_statement.fetch_size) @@ -147,10 +147,9 @@ class BoundStatementTestCase(unittest.TestCase): routing_key_indexes=[0, 1], query=None, keyspace=keyspace, - protocol_version=2, - fetch_size=1234) + protocol_version=2) self.assertRaises(ValueError, prepared_statement.bind, (1,)) - bound = prepared_statement.bind((1,2)) + bound = prepared_statement.bind((1, 2)) self.assertEqual(bound.keyspace, keyspace) From 80e952b298cc26ff3199e320b7d97626eff1f5e9 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Fri, 17 Apr 2015 10:22:37 -0500 Subject: [PATCH 23/33] Custom payloads for protocol v4 
PYTHON-280 --- CHANGELOG.rst | 1 - cassandra/auth.py | 2 ++ cassandra/cluster.py | 42 ++++++++++++++++++++++++----- cassandra/protocol.py | 43 +++++++++++++++++++++++++++++- cassandra/query.py | 61 ++++++++++++++++++++++++++++++++++--------- 5 files changed, 128 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index af2d6113..8f914168 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -78,7 +78,6 @@ Bug Fixes --------- * Make execute_concurrent compatible with Python 2.6 (PYTHON-159) * Handle Unauthorized message on schema_triggers query (PYTHON-155) -* Make execute_concurrent compatible with Python 2.6 (github-197) * Pure Python sorted set in support of UDTs nested in collections (PYTON-167) * Support CUSTOM index metadata and string export (PYTHON-165) diff --git a/cassandra/auth.py b/cassandra/auth.py index 67d302a9..508bd150 100644 --- a/cassandra/auth.py +++ b/cassandra/auth.py @@ -15,6 +15,7 @@ try: except ImportError: SASLClient = None + class AuthProvider(object): """ An abstract class that defines the interface that will be used for @@ -157,6 +158,7 @@ class SaslAuthProvider(AuthProvider): def new_authenticator(self, host): return SaslAuthenticator(**self.sasl_kwargs) + class SaslAuthenticator(Authenticator): """ A pass-through :class:`~.Authenticator` using the third party package diff --git a/cassandra/cluster.py b/cassandra/cluster.py index cb04152a..e84afd2e 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1361,7 +1361,7 @@ class Session(object): for future in futures: future.result() - def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False): + def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False, custom_payload=None): """ Execute the given query and synchronously wait for the response. @@ -1389,6 +1389,10 @@ class Session(object): instance and not just a string. 
If there is an error fetching the trace details, the :attr:`~.Statement.trace` attribute will be left as :const:`None`. + + `custom_payload` is a dict as described in TODO section. If `query` is a Statement + with its own custom_payload. the message will be a union of the two, + with the values specified here taking precedence. """ if timeout is _NOT_SET: timeout = self.default_timeout @@ -1398,7 +1402,7 @@ class Session(object): "The query argument must be an instance of a subclass of " "cassandra.query.Statement when trace=True") - future = self.execute_async(query, parameters, trace) + future = self.execute_async(query, parameters, trace, custom_payload) try: result = future.result(timeout) finally: @@ -1410,7 +1414,7 @@ class Session(object): return result - def execute_async(self, query, parameters=None, trace=False): + def execute_async(self, query, parameters=None, trace=False, custom_payload=None): """ Execute the given query and return a :class:`~.ResponseFuture` object which callbacks may be attached to for asynchronous response @@ -1422,6 +1426,13 @@ class Session(object): :meth:`.ResponseFuture.get_query_trace()` after the request completes to retrieve a :class:`.QueryTrace` instance. + `custom_payload` is a dict as described in TODO section. If `query` is + a Statement with a custom_payload specified. the message will be a + union of the two, with the values specified here taking precedence. + + If the server sends a custom payload in the response message, + the dict can be obtained via :attr:`.ResponseFuture.custom_payload` + Example usage:: >>> session = cluster.connect() @@ -1447,11 +1458,11 @@ class Session(object): ... 
log.exception("Operation failed:") """ - future = self._create_response_future(query, parameters, trace) + future = self._create_response_future(query, parameters, trace, custom_payload) future.send_request() return future - def _create_response_future(self, query, parameters, trace): + def _create_response_future(self, query, parameters, trace, custom_payload): """ Returns the ResponseFuture before calling send_request() on it """ prepared_statement = None @@ -1501,13 +1512,16 @@ class Session(object): if trace: message.tracing = True + message.update_custom_payload(query.custom_payload) + message.update_custom_payload(custom_payload) + return ResponseFuture( self, message, query, self.default_timeout, metrics=self._metrics, prepared_statement=prepared_statement) - def prepare(self, query): + def prepare(self, query, custom_payload=None): """ - Prepares a query string, returing a :class:`~cassandra.query.PreparedStatement` + Prepares a query string, returning a :class:`~cassandra.query.PreparedStatement` instance which can be used as follows:: >>> session = cluster.connect("mykeyspace") @@ -1530,8 +1544,12 @@ class Session(object): **Important**: PreparedStatements should be prepared only once. Preparing the same query more than once will likely affect performance. + + `custom_payload` is a key value map to be passed along with the prepare + message. 
See TODO: refer to doc section """ message = PrepareMessage(query=query) + message.custom_payload = custom_payload future = ResponseFuture(self, message, query=None) try: future.send_request() @@ -1543,6 +1561,7 @@ class Session(object): prepared_statement = PreparedStatement.from_message( query_id, column_metadata, pk_indexes, self.cluster.metadata, query, self.keyspace, self._protocol_version) + prepared_statement.custom_payload = future.custom_payload host = future._current_host try: @@ -2567,6 +2586,7 @@ class ResponseFuture(object): _start_time = None _metrics = None _paging_state = None + _custom_payload = None def __init__(self, session, message, query, default_timeout=None, metrics=None, prepared_statement=None): self.session = session @@ -2654,6 +2674,12 @@ class ResponseFuture(object): """ return self._paging_state is not None + @property + def custom_payload(self): + if not self._event.is_set(): + raise Exception("custom_payload cannot be retrieved before ResponseFuture is finalized") + return self._custom_payload + def start_fetching_next_page(self): """ If there are more pages left in the query result, this asynchronously @@ -2690,6 +2716,8 @@ class ResponseFuture(object): if trace_id: self._query_trace = QueryTrace(trace_id, self.session) + self._custom_payload = getattr(response, 'custom_payload', None) + if isinstance(response, ResultMessage): if response.kind == RESULT_KIND_SET_KEYSPACE: session = getattr(self, 'session', None) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index d81a6507..a48af348 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -54,6 +54,7 @@ HEADER_DIRECTION_MASK = 0x80 COMPRESSED_FLAG = 0x01 TRACING_FLAG = 0x02 +CUSTOM_PAYLOAD_FLAG = 0x04 _message_types_by_name = {} _message_types_by_opcode = {} @@ -70,13 +71,19 @@ class _RegisterMessageType(type): class _MessageType(object): tracing = False + custom_payload = None def to_binary(self, stream_id, protocol_version, compression=None): + flags = 0 body 
= io.BytesIO() + if self.custom_payload: + if protocol_version < 4: + raise UnsupportedOperation("Custom key/value payloads can only be used with protocol version 4 or higher") + flags |= CUSTOM_PAYLOAD_FLAG + write_bytesmap(body, self.custom_payload) self.send_body(body, protocol_version) body = body.getvalue() - flags = 0 if compression and len(body) > 0: body = compression(body) flags |= COMPRESSED_FLAG @@ -89,6 +96,12 @@ class _MessageType(object): return msg.getvalue() + def update_custom_payload(self, other): + if other: + if not self.custom_payload: + self.custom_payload = {} + self.custom_payload.update(other) + def __repr__(self): return '<%s(%s)>' % (self.__class__.__name__, ', '.join('%s=%r' % i for i in _get_params(self))) @@ -116,6 +129,12 @@ def decode_response(protocol_version, user_type_map, stream_id, flags, opcode, b else: trace_id = None + if flags & CUSTOM_PAYLOAD_FLAG: + custom_payload = read_bytesmap(body) + flags ^= CUSTOM_PAYLOAD_FLAG + else: + custom_payload = None + if flags: log.warning("Unknown protocol flags set: %02x. 
May cause problems.", flags) @@ -123,6 +142,7 @@ def decode_response(protocol_version, user_type_map, stream_id, flags, opcode, b msg = msg_class.recv_body(body, protocol_version, user_type_map) msg.stream_id = stream_id msg.trace_id = trace_id + msg.custom_payload = custom_payload return msg @@ -918,6 +938,11 @@ def read_binary_string(f): return contents +def write_binary_string(f, s): + write_short(f, len(s)) + f.write(s) + + def write_string(f, s): if isinstance(s, six.text_type): s = s.encode('utf8') @@ -969,6 +994,22 @@ def write_stringmap(f, strmap): write_string(f, v) +def read_bytesmap(f): + numpairs = read_short(f) + bytesmap = {} + for _ in range(numpairs): + k = read_string(f) + bytesmap[k] = read_binary_string(f) + return bytesmap + + +def write_bytesmap(f, bytesmap): + write_short(f, len(bytesmap)) + for k, v in bytesmap.items(): + write_string(f, k) + write_binary_string(f, v) + + def read_stringmultimap(f): numkeys = read_short(f) strmmap = {} diff --git a/cassandra/query.py b/cassandra/query.py index 031a1684..70d16e9e 100644 --- a/cassandra/query.py +++ b/cassandra/query.py @@ -197,11 +197,25 @@ class Statement(object): .. versionadded:: 2.1.3 """ + custom_payload = None + """ + TODO: refer to custom proto doc section + A string:binary_type dict holding custom key/value pairs to be passed + in the frame to a custom QueryHandler on the server side. + + By default these values are ignored by the server. + + These are only allowed when using protocol version 4 or higher. + + .. 
versionadded:: 3.0.0 + """ + _serial_consistency_level = None _routing_key = None def __init__(self, retry_policy=None, consistency_level=None, routing_key=None, - serial_consistency_level=None, fetch_size=FETCH_SIZE_UNSET, keyspace=None): + serial_consistency_level=None, fetch_size=FETCH_SIZE_UNSET, keyspace=None, + custom_payload=None): self.retry_policy = retry_policy if consistency_level is not None: self.consistency_level = consistency_level @@ -212,6 +226,8 @@ class Statement(object): self.fetch_size = fetch_size if keyspace is not None: self.keyspace = keyspace + if custom_payload is not None: + self.custom_payload = custom_payload def _get_routing_key(self): return self._routing_key @@ -290,8 +306,7 @@ class Statement(object): class SimpleStatement(Statement): """ - A simple, un-prepared query. All attributes of :class:`Statement` apply - to this class as well. + A simple, un-prepared query. """ def __init__(self, query_string, *args, **kwargs): @@ -299,6 +314,8 @@ class SimpleStatement(Statement): `query_string` should be a literal CQL statement with the exception of parameter placeholders that will be filled through the `parameters` argument of :meth:`.Session.execute()`. + + All arguments to :class:`Statement` apply to this class as well """ Statement.__init__(self, *args, **kwargs) self._query_string = query_string @@ -338,6 +355,8 @@ class PreparedStatement(object): fetch_size = FETCH_SIZE_UNSET + custom_payload = None + def __init__(self, column_metadata, query_id, routing_key_indexes, query, keyspace, protocol_version): self.column_metadata = column_metadata @@ -397,8 +416,6 @@ class BoundStatement(Statement): """ A prepared statement that has been bound to a particular set of values. These may be created directly or through :meth:`.PreparedStatement.bind()`. - - All attributes of :class:`Statement` apply to this class as well. 
""" prepared_statement = None @@ -414,13 +431,15 @@ class BoundStatement(Statement): def __init__(self, prepared_statement, *args, **kwargs): """ `prepared_statement` should be an instance of :class:`PreparedStatement`. - All other ``*args`` and ``**kwargs`` will be passed to :class:`.Statement`. + + All arguments to :class:`Statement` apply to this class as well """ self.prepared_statement = prepared_statement self.consistency_level = prepared_statement.consistency_level self.serial_consistency_level = prepared_statement.serial_consistency_level self.fetch_size = prepared_statement.fetch_size + self.custom_payload = prepared_statement.custom_payload self.values = [] meta = prepared_statement.column_metadata @@ -601,7 +620,8 @@ class BatchStatement(Statement): _session = None def __init__(self, batch_type=BatchType.LOGGED, retry_policy=None, - consistency_level=None, serial_consistency_level=None, session=None): + consistency_level=None, serial_consistency_level=None, + session=None, custom_payload=None): """ `batch_type` specifies The :class:`.BatchType` for the batch operation. Defaults to :attr:`.BatchType.LOGGED`. @@ -612,6 +632,10 @@ class BatchStatement(Statement): `consistency_level` should be a :class:`~.ConsistencyLevel` value to be used for all operations in the batch. + `custom_payload` is a key-value map TODO: refer to doc section + Note: as Statement objects are added to the batch, this map is + updated with values from their custom payloads. + Example usage: .. code-block:: python @@ -637,12 +661,15 @@ class BatchStatement(Statement): .. versionchanged:: 2.1.0 Added `serial_consistency_level` as a parameter + + .. 
versionchanged:: 3.0.0 + Added `custom_payload` as a parameter """ self.batch_type = batch_type self._statements_and_parameters = [] self._session = session Statement.__init__(self, retry_policy=retry_policy, consistency_level=consistency_level, - serial_consistency_level=serial_consistency_level) + serial_consistency_level=serial_consistency_level, custom_payload=custom_payload) def add(self, statement, parameters=None): """ @@ -660,7 +687,7 @@ class BatchStatement(Statement): elif isinstance(statement, PreparedStatement): query_id = statement.query_id bound_statement = statement.bind(() if parameters is None else parameters) - self._maybe_set_routing_attributes(bound_statement) + self._update_state(bound_statement) self._statements_and_parameters.append( (True, query_id, bound_statement.values)) elif isinstance(statement, BoundStatement): @@ -668,7 +695,7 @@ class BatchStatement(Statement): raise ValueError( "Parameters cannot be passed with a BoundStatement " "to BatchStatement.add()") - self._maybe_set_routing_attributes(statement) + self._update_state(statement) self._statements_and_parameters.append( (True, statement.prepared_statement.query_id, statement.values)) else: @@ -677,7 +704,7 @@ class BatchStatement(Statement): if parameters: encoder = Encoder() if self._session is None else self._session.encoder query_string = bind_params(query_string, parameters, encoder) - self._maybe_set_routing_attributes(statement) + self._update_state(statement) self._statements_and_parameters.append((False, query_string, ())) return self @@ -696,6 +723,16 @@ class BatchStatement(Statement): self.routing_key = statement.routing_key self.keyspace = statement.keyspace + def _update_custom_payload(self, statement): + if statement.custom_payload: + if self.custom_payload is None: + self.custom_payload = {} + self.custom_payload.update(statement.custom_payload) + + def _update_state(self, statement): + self._maybe_set_routing_attributes(statement) + 
self._update_custom_payload(statement) + def __str__(self): consistency = ConsistencyLevel.value_to_name.get(self.consistency_level, 'Not Set') return (u'' % @@ -836,7 +873,7 @@ class QueryTrace(object): def _execute(self, query, parameters, time_spent, max_wait): # in case the user switched the row factory, set it to namedtuple for this query - future = self._session._create_response_future(query, parameters, trace=False) + future = self._session._create_response_future(query, parameters, trace=False, custom_payload=None) future.row_factory = named_tuple_factory future.send_request() From f4191903b7e85e82849c20401d5e776bf45d2a9e Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Fri, 17 Apr 2015 16:51:52 -0500 Subject: [PATCH 24/33] Docs for custom_payload --- cassandra/cluster.py | 17 +++++++++-------- cassandra/query.py | 11 ++++------- docs/api/index.rst | 1 + 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index e84afd2e..28885ae9 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1390,9 +1390,9 @@ class Session(object): trace details, the :attr:`~.Statement.trace` attribute will be left as :const:`None`. - `custom_payload` is a dict as described in TODO section. If `query` is a Statement - with its own custom_payload. the message will be a union of the two, - with the values specified here taking precedence. + `custom_payload` is a :ref:`custom_payload` dict to be passed to the server. + If `query` is a Statement with its own custom_payload. The message payload + will be a union of the two, with the values specified here taking precedence. """ if timeout is _NOT_SET: timeout = self.default_timeout @@ -1426,12 +1426,13 @@ class Session(object): :meth:`.ResponseFuture.get_query_trace()` after the request completes to retrieve a :class:`.QueryTrace` instance. - `custom_payload` is a dict as described in TODO section. If `query` is - a Statement with a custom_payload specified. 
the message will be a - union of the two, with the values specified here taking precedence. + `custom_payload` is a :ref:`custom_payload` dict to be passed to the server. + If `query` is a Statement with its own custom_payload. The message payload + will be a union of the two, with the values specified here taking precedence. If the server sends a custom payload in the response message, - the dict can be obtained via :attr:`.ResponseFuture.custom_payload` + the dict can be obtained following :meth:`.ResponseFuture.result` via + :attr:`.ResponseFuture.custom_payload` Example usage:: @@ -1546,7 +1547,7 @@ class Session(object): Preparing the same query more than once will likely affect performance. `custom_payload` is a key value map to be passed along with the prepare - message. See TODO: refer to doc section + message. See :ref:`custom_payload`. """ message = PrepareMessage(query=query) message.custom_payload = custom_payload diff --git a/cassandra/query.py b/cassandra/query.py index 70d16e9e..722e0d59 100644 --- a/cassandra/query.py +++ b/cassandra/query.py @@ -199,11 +199,7 @@ class Statement(object): custom_payload = None """ - TODO: refer to custom proto doc section - A string:binary_type dict holding custom key/value pairs to be passed - in the frame to a custom QueryHandler on the server side. - - By default these values are ignored by the server. + :ref:`custom_payload` to be passed to the server. These are only allowed when using protocol version 4 or higher. @@ -632,9 +628,10 @@ class BatchStatement(Statement): `consistency_level` should be a :class:`~.ConsistencyLevel` value to be used for all operations in the batch. - `custom_payload` is a key-value map TODO: refer to doc section + `custom_payload` is a :ref:`custom_payload` passed to the server. Note: as Statement objects are added to the batch, this map is - updated with values from their custom payloads. + updated with any values found in their custom payloads. 
These are + only allowed when using protocol version 4 or higher. Example usage: diff --git a/docs/api/index.rst b/docs/api/index.rst index e0fe9810..340a5e02 100644 --- a/docs/api/index.rst +++ b/docs/api/index.rst @@ -14,6 +14,7 @@ Core Driver cassandra/metrics cassandra/query cassandra/pool + cassandra/protocol cassandra/encoder cassandra/decoder cassandra/concurrent From 567f9b0216f51c9e80ea7d8ec5af96ba6acf1850 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Mon, 20 Apr 2015 10:08:20 -0500 Subject: [PATCH 25/33] Fix variable rename bug Peer reveiw input --- cassandra/metadata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cassandra/metadata.py b/cassandra/metadata.py index b6e0aee7..a8e3ba92 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -161,7 +161,7 @@ class Metadata(object): # remove not-just-added keyspaces removed_keyspaces = [name for name in self.keyspaces.keys() - if ksname not in current_keyspaces] + if name not in current_keyspaces] self.keyspaces = dict((name, meta) for name, meta in self.keyspaces.items() if name in current_keyspaces) for ksname in removed_keyspaces: From 2c062213fde00a0c30b3dcbddd2e4dea6d7c210e Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Mon, 20 Apr 2015 14:22:09 -0500 Subject: [PATCH 26/33] Add the missing protocol doc file --- docs/api/cassandra/protocol.rst | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 docs/api/cassandra/protocol.rst diff --git a/docs/api/cassandra/protocol.rst b/docs/api/cassandra/protocol.rst new file mode 100644 index 00000000..a6b4d1c4 --- /dev/null +++ b/docs/api/cassandra/protocol.rst @@ -0,0 +1,11 @@ +.. _custom_payload: + +Custom Payload +============== +Native protocol version 4+ allows for a custom payload to be sent between clients +and custom query handlers. The payload is specified as a string:binary_type dict +holding custom key/value pairs. + +By default these are ignored by the server. 
They can be useful for servers implementing +a custom QueryHandler. + From 9f0ca79a700ea06881ae67bd6a401adb31cef4df Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Fri, 24 Apr 2015 10:24:37 -0500 Subject: [PATCH 27/33] Add docs for ResponseFuture.custom_paylaod --- cassandra/cluster.py | 11 +++++++++++ docs/api/cassandra/cluster.rst | 2 ++ 2 files changed, 13 insertions(+) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 28885ae9..7d982e15 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2677,6 +2677,17 @@ class ResponseFuture(object): @property def custom_payload(self): + """ + The custom payload returned from the server, if any. This will only be + set by Cassandra servers implementing a custom QueryHandler, and only + for protocol_version 4+. + + Ensure the future is complete before trying to access this property + (call :meth:`.result()`, or after callback is invoked). + Otherwise it may throw if the response has not been received. + + :return: :ref:`custom_payload`. + """ if not self._event.is_set(): raise Exception("custom_payload cannot be retrieved before ResponseFuture is finalized") return self._custom_payload diff --git a/docs/api/cassandra/cluster.rst b/docs/api/cassandra/cluster.rst index 6532c9a0..e6e3cf76 100644 --- a/docs/api/cassandra/cluster.rst +++ b/docs/api/cassandra/cluster.rst @@ -102,6 +102,8 @@ .. automethod:: get_query_trace() + .. autoattribute:: custom_payload() + .. autoattribute:: has_more_pages .. 
automethod:: start_fetching_next_page() From 7cb178ff0190b785369747d42604eee64ad6bbaf Mon Sep 17 00:00:00 2001 From: GregBestland Date: Tue, 5 May 2015 17:24:40 -0500 Subject: [PATCH 28/33] Adding tests for PYTHON-235 --- tests/integration/standard/test_query.py | 40 +++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/tests/integration/standard/test_query.py b/tests/integration/standard/test_query.py index 780f3078..45cdcaf8 100644 --- a/tests/integration/standard/test_query.py +++ b/tests/integration/standard/test_query.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import os - +import socket from cassandra.concurrent import execute_concurrent @@ -89,6 +89,44 @@ class QueryTests(unittest.TestCase): cluster.shutdown() + def test_trace_client_ip(self): + """ ++ Test to validate that client trace contains client ip information. ++ ++ creates a simple query and ensures that the client trace information is present. This will + only be the case if the CQL version is 4 or greater./ + ++ ++ @since 3.0 ++ @jira_ticket PYTHON-235 ++ @expected_result client address should be present in CQL > 4, otherwise should be none. ++ ++ @test_category trace ++ """ + if PROTOCOL_VERSION < 4: + raise unittest.SkipTest( + "Protocol 4+ is required for client ip tracing, currently testing against %r" + % (PROTOCOL_VERSION,)) + cluster = Cluster(protocol_version=PROTOCOL_VERSION) + session = cluster.connect() + + query = "SELECT * FROM system.local" + statement = SimpleStatement(query) + session.execute(statement, trace=True) + + # Fetch the client_ip from the trace. + client_ip=statement.trace.client + # Ensure that ip is set for CQL >4 + self.assertIsNotNone(client_ip,"Client IP was not set in trace with CQL >=4.0") + # TODO we might want validate that client_ip actually matches our local ip rather than just validate that it + # is a valid ip. 
+ try: + socket.inet_aton(client_ip) + except socket.error: + self.fail("Client IP retrieved from trace was not valid :{0}".format(client_ip)) + + cluster.shutdown() + class PreparedStatementTests(unittest.TestCase): From bca2a69392175a1b15f2a8c32bf1fc6c67f3d758 Mon Sep 17 00:00:00 2001 From: GregBestland Date: Wed, 6 May 2015 13:07:06 -0500 Subject: [PATCH 29/33] Fixing test for PYTHON-235 --- tests/integration/standard/test_query.py | 57 +++++++++++++----------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/tests/integration/standard/test_query.py b/tests/integration/standard/test_query.py index 45cdcaf8..bb1cafa7 100644 --- a/tests/integration/standard/test_query.py +++ b/tests/integration/standard/test_query.py @@ -27,7 +27,7 @@ from cassandra.query import (PreparedStatement, BoundStatement, SimpleStatement, from cassandra.cluster import Cluster from cassandra.policies import HostDistance -from tests.integration import use_singledc, PROTOCOL_VERSION +from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions def setup_module(): @@ -89,42 +89,45 @@ class QueryTests(unittest.TestCase): cluster.shutdown() - def test_trace_client_ip(self): + def test_client_ip_in_trace(self): """ -+ Test to validate that client trace contains client ip information. -+ -+ creates a simple query and ensures that the client trace information is present. This will - only be the case if the CQL version is 4 or greater./ + Test to validate that client trace contains client ip information. -+ -+ @since 3.0 -+ @jira_ticket PYTHON-235 -+ @expected_result client address should be present in CQL > 4, otherwise should be none. -+ -+ @test_category trace + creates a simple query and ensures that the client trace information is present. This will + only be the case if the c* version is 3.0 or greater + + + @since 3.0 + @jira_ticket PYTHON-235 + @expected_result client address should be present in C* > 3, otherwise should be none. 
+ + @test_category tracing + """ + #The current version on the trunk doesn't have the version set to 3.0 yet. + #For now we will use the protocol version. Once they update the version on C* trunk + #we can use the C*. See below + #self._cass_version, self._cql_version = get_server_versions() + #if self._cass_version < (3, 0): + # raise unittest.SkipTest("Client IP was not present in trace until C* 3.0") if PROTOCOL_VERSION < 4: - raise unittest.SkipTest( - "Protocol 4+ is required for client ip tracing, currently testing against %r" - % (PROTOCOL_VERSION,)) + raise unittest.SkipTest( + "Protocol 4+ is required for client ip tracing, currently testing against %r" + % (PROTOCOL_VERSION,)) + cluster = Cluster(protocol_version=PROTOCOL_VERSION) session = cluster.connect() query = "SELECT * FROM system.local" statement = SimpleStatement(query) - session.execute(statement, trace=True) - + response_future = session.execute_async(statement, trace=True) + response_future.result(10.0) + current_host = response_future._current_host.address # Fetch the client_ip from the trace. - client_ip=statement.trace.client - # Ensure that ip is set for CQL >4 - self.assertIsNotNone(client_ip,"Client IP was not set in trace with CQL >=4.0") - # TODO we might want validate that client_ip actually matches our local ip rather than just validate that it - # is a valid ip. 
- try: - socket.inet_aton(client_ip) - except socket.error: - self.fail("Client IP retrieved from trace was not valid :{0}".format(client_ip)) - + trace = response_future.get_query_trace(2.0) + client_ip = trace.client + # Ensure that ip is set for c* >3 + self.assertIsNotNone(client_ip,"Client IP was not set in trace with C* > 3.0") + self.assertEqual(client_ip,current_host,"Client IP from trace did not match the expected value") cluster.shutdown() From 7f2e992d768fbcfee57e493ded850a9299f9408c Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Wed, 6 May 2015 16:16:40 -0500 Subject: [PATCH 30/33] Minor tweaks on trace client integration test --- tests/integration/standard/test_query.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/integration/standard/test_query.py b/tests/integration/standard/test_query.py index bb1cafa7..7f84f9ab 100644 --- a/tests/integration/standard/test_query.py +++ b/tests/integration/standard/test_query.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import os -import socket from cassandra.concurrent import execute_concurrent @@ -99,7 +98,7 @@ class QueryTests(unittest.TestCase): @since 3.0 @jira_ticket PYTHON-235 - @expected_result client address should be present in C* > 3, otherwise should be none. + @expected_result client address should be present in C* >= 3, otherwise should be none. @test_category tracing + """ From 087293050b322b736a7468ae70c2044d1510c47a Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Wed, 6 May 2015 16:29:23 -0500 Subject: [PATCH 31/33] Update unit tests for new refresh_schema signature. 
--- tests/unit/test_control_connection.py | 6 +++--- tests/unit/test_response_future.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_control_connection.py b/tests/unit/test_control_connection.py index 1f1ccc1b..9c93f5af 100644 --- a/tests/unit/test_control_connection.py +++ b/tests/unit/test_control_connection.py @@ -410,12 +410,12 @@ class ControlConnectionTest(unittest.TestCase): } self.cluster.scheduler.reset_mock() self.control_connection._handle_schema_change(event) - self.cluster.scheduler.schedule_unique.assert_called_once_with(0.0, self.control_connection.refresh_schema, 'ks1', 'table1', None) + self.cluster.scheduler.schedule_unique.assert_called_once_with(0.0, self.control_connection.refresh_schema, 'ks1', 'table1', None, None, None) self.cluster.scheduler.reset_mock() event['table'] = None self.control_connection._handle_schema_change(event) - self.cluster.scheduler.schedule_unique.assert_called_once_with(0.0, self.control_connection.refresh_schema, 'ks1', None, None) + self.cluster.scheduler.schedule_unique.assert_called_once_with(0.0, self.control_connection.refresh_schema, 'ks1', None, None, None, None) def test_refresh_disabled(self): cluster = MockCluster() @@ -463,4 +463,4 @@ class ControlConnectionTest(unittest.TestCase): cc_no_topo_refresh._handle_schema_change(schema_event) cluster.scheduler.schedule_unique.assert_has_calls([call(ANY, cc_no_topo_refresh.refresh_node_list_and_token_map), call(0.0, cc_no_topo_refresh.refresh_schema, - schema_event['keyspace'], schema_event['table'], None)]) + schema_event['keyspace'], schema_event['table'], None, None, None)]) diff --git a/tests/unit/test_response_future.py b/tests/unit/test_response_future.py index 027fe732..92351a9d 100644 --- a/tests/unit/test_response_future.py +++ b/tests/unit/test_response_future.py @@ -105,7 +105,7 @@ class ResponseFutureTests(unittest.TestCase): kind=RESULT_KIND_SCHEMA_CHANGE, results={'keyspace': "keyspace1", "table": "table1"}) 
rf._set_result(result) - session.submit.assert_called_once_with(ANY, 'keyspace1', 'table1', None, ANY, rf) + session.submit.assert_called_once_with(ANY, 'keyspace1', 'table1', None, None, None, ANY, rf) def test_other_result_message_kind(self): session = self.make_session() From 3825c3a9120040050c20e7e80b905ebc65bb03d6 Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Tue, 12 May 2015 09:06:22 -0500 Subject: [PATCH 32/33] length validation for custom payload map --- cassandra/protocol.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 1a553102..b0f9e6e2 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -103,6 +103,8 @@ class _MessageType(object): if not self.custom_payload: self.custom_payload = {} self.custom_payload.update(other) + if len(self.custom_payload) > 65535: + raise ValueError("Custom payload map exceeds max count allowed by protocol (65535)") def __repr__(self): return '<%s(%s)>' % (self.__class__.__name__, ', '.join('%s=%r' % i for i in _get_params(self))) From cef495e8563a36152cf23e3b71646d2b7f15b121 Mon Sep 17 00:00:00 2001 From: Kishan Karunaratne Date: Wed, 13 May 2015 16:53:44 -0700 Subject: [PATCH 33/33] fixed typo in test harness detecting C* 3.0 version --- tests/integration/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index 3ea397f0..8933eefd 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -118,11 +118,11 @@ else: log.info('Using Cassandra version: %s', CASSANDRA_VERSION) CCM_KWARGS['version'] = CASSANDRA_VERSION -if CASSANDRA_VERSION > '3.0': +if CASSANDRA_VERSION >= '3.0': default_protocol_version = 4 -elif CASSANDRA_VERSION > '2.1': +elif CASSANDRA_VERSION >= '2.1': default_protocol_version = 3 -elif CASSANDRA_VERSION > '2.0': +elif CASSANDRA_VERSION >= '2.0': default_protocol_version = 2 else: default_protocol_version = 1