Merge pull request #268 from datastax/PYTHON-211

PYTHON-211 - UDF metadata in Cassandra 3.0
This commit is contained in:
Adam Holmberg
2015-04-20 10:19:07 -05:00
8 changed files with 755 additions and 38 deletions

View File

@@ -126,6 +126,58 @@ def consistency_value_to_name(value):
return ConsistencyLevel.value_to_name[value] if value is not None else "Not Set"
class SignatureDescriptor(object):
    """
    Pairs a name with an ordered list of CQL argument type names and
    renders the two as a single signature string.
    """

    def __init__(self, name, type_signature):
        self.name = name
        self.type_signature = type_signature

    @property
    def signature(self):
        """
        function signature string in the form 'name([type0[,type1[...]]])'

        can be used to uniquely identify overloaded function names within a keyspace
        """
        return self.format_signature(self.name, self.type_signature)

    @staticmethod
    def format_signature(name, type_signature):
        """
        Return ``name`` followed by the comma-joined (no spaces) entries of
        ``type_signature`` in parentheses, e.g. ``'fn(int,text)'``.

        ``type_signature`` must be an iterable of strings.
        """
        return "%s(%s)" % (name, ','.join(type_signature))
class UserFunctionDescriptor(SignatureDescriptor):
    """
    Identifies a user-defined function by its name and argument signature
    """

    name = None
    """
    The function's name
    """

    type_signature = None
    """
    Ordered list of the CQL argument type names that make up the type signature
    """
class UserAggregateDescriptor(SignatureDescriptor):
    """
    Identifies a user-defined aggregate function by its name and argument signature
    """

    name = None
    """
    The aggregate's name
    """

    type_signature = None
    """
    Ordered list of the CQL argument type names that make up the type signature
    """
class Unavailable(Exception):
"""
There were not enough live replicas to satisfy the requested consistency

View File

@@ -1116,9 +1116,29 @@ class Cluster(object):
for pool in session._pools.values():
pool.ensure_core_connections()
def refresh_schema(self, keyspace=None, table=None, usertype=None, max_schema_agreement_wait=None):
def _validate_refresh_schema(self, keyspace, table, usertype, function, aggregate):
if any((table, usertype, function, aggregate)):
if not keyspace:
raise ValueError("keyspace is required to refresh specific sub-entity {table, usertype, function, aggregate}")
if sum(1 for e in (table, usertype, function) if e) > 1:
raise ValueError("{table, usertype, function, aggregate} are mutually exclusive")
def refresh_schema(self, keyspace=None, table=None, usertype=None, function=None, aggregate=None, max_schema_agreement_wait=None):
"""
Synchronously refresh the schema metadata.
Synchronously refresh schema metadata.
{keyspace, table, usertype} are string names of the respective entities.
``function`` is a :class:`cassandra.UserFunctionDescriptor`.
``aggregate`` is a :class:`cassandra.UserAggregateDescriptor`.
If none of ``{keyspace, table, usertype, function, aggregate}`` are specified, the entire schema is refreshed.
If any of ``{keyspace, table, usertype, function, aggregate}`` are specified, ``keyspace`` is required.
If only ``keyspace`` is specified, just the top-level keyspace metadata is refreshed (e.g. replication).
The remaining arguments ``{table, usertype, function, aggregate}``
are mutually exclusive -- only one may be specified.
By default, the timeout for this operation is governed by :attr:`~.Cluster.max_schema_agreement_wait`
and :attr:`~.Cluster.control_connection_timeout`.
@@ -1129,17 +1149,19 @@ class Cluster(object):
An Exception is raised if schema refresh fails for any reason.
"""
if not self.control_connection.refresh_schema(keyspace, table, usertype, max_schema_agreement_wait):
self._validate_refresh_schema(keyspace, table, usertype, function, aggregate)
if not self.control_connection.refresh_schema(keyspace, table, usertype, function,
aggregate, max_schema_agreement_wait):
raise Exception("Schema was not refreshed. See log for details.")
def submit_schema_refresh(self, keyspace=None, table=None, usertype=None):
def submit_schema_refresh(self, keyspace=None, table=None, usertype=None, function=None, aggregate=None):
"""
Schedule a refresh of the internal representation of the current
schema for this cluster. If `keyspace` is specified, only that
keyspace will be refreshed, and likewise for `table`.
schema for this cluster. See :meth:`~.refresh_schema` for description of parameters.
"""
self._validate_refresh_schema(keyspace, table, usertype, function, aggregate)
return self.executor.submit(
self.control_connection.refresh_schema, keyspace, table, usertype)
self.control_connection.refresh_schema, keyspace, table, usertype, function, aggregate)
def refresh_nodes(self):
"""
@@ -1820,6 +1842,8 @@ class ControlConnection(object):
_SELECT_COLUMN_FAMILIES = "SELECT * FROM system.schema_columnfamilies"
_SELECT_COLUMNS = "SELECT * FROM system.schema_columns"
_SELECT_USERTYPES = "SELECT * FROM system.schema_usertypes"
_SELECT_FUNCTIONS = "SELECT * FROM system.schema_functions"
_SELECT_AGGREGATES = "SELECT * FROM system.schema_aggregates"
_SELECT_TRIGGERS = "SELECT * FROM system.schema_triggers"
_SELECT_PEERS = "SELECT peer, data_center, rack, tokens, rpc_address, schema_version FROM system.peers"
@@ -2007,16 +2031,16 @@ class ControlConnection(object):
self._connection.close()
del self._connection
def refresh_schema(self, keyspace=None, table=None, usertype=None,
schema_agreement_wait=None):
def refresh_schema(self, keyspace=None, table=None, usertype=None, function=None,
aggregate=None, schema_agreement_wait=None):
if not self._meta_refresh_enabled:
log.debug("[control connection] Skipping schema refresh because meta refresh is disabled")
return False
try:
if self._connection:
return self._refresh_schema(self._connection, keyspace, table, usertype,
schema_agreement_wait=schema_agreement_wait)
return self._refresh_schema(self._connection, keyspace, table, usertype, function,
aggregate, schema_agreement_wait=schema_agreement_wait)
except ReferenceError:
pass # our weak reference to the Cluster is no good
except Exception:
@@ -2024,12 +2048,12 @@ class ControlConnection(object):
self._signal_error()
return False
def _refresh_schema(self, connection, keyspace=None, table=None, usertype=None,
preloaded_results=None, schema_agreement_wait=None):
def _refresh_schema(self, connection, keyspace=None, table=None, usertype=None, function=None,
aggregate=None, preloaded_results=None, schema_agreement_wait=None):
if self._cluster.is_shutdown:
return False
assert table is None or usertype is None
assert sum(1 for arg in (table, usertype, function, aggregate) if arg) <= 1
agreed = self.wait_for_schema_agreement(connection,
preloaded_results=preloaded_results,
@@ -2073,6 +2097,24 @@ class ControlConnection(object):
log.debug("[control connection] Fetched user type info for %s.%s, rebuilding metadata", keyspace, usertype)
types_result = dict_factory(*types_result.results) if types_result.results else {}
self._cluster.metadata.usertype_changed(keyspace, usertype, types_result)
elif function:
# user defined function within this keyspace changed
where_clause = " WHERE keyspace_name = '%s' AND function_name = '%s' AND signature = [%s]" \
% (keyspace, function.name, ','.join("'%s'" % t for t in function.type_signature))
functions_query = QueryMessage(query=self._SELECT_FUNCTIONS + where_clause, consistency_level=cl)
functions_result = connection.wait_for_response(functions_query)
log.debug("[control connection] Fetched user function info for %s.%s, rebuilding metadata", keyspace, function.signature)
functions_result = dict_factory(*functions_result.results) if functions_result.results else {}
self._cluster.metadata.function_changed(keyspace, function, functions_result)
elif aggregate:
# user defined aggregate within this keyspace changed
where_clause = " WHERE keyspace_name = '%s' AND aggregate_name = '%s' AND signature = [%s]" \
% (keyspace, aggregate.name, ','.join("'%s'" % t for t in aggregate.type_signature))
aggregates_query = QueryMessage(query=self._SELECT_AGGREGATES + where_clause, consistency_level=cl)
aggregates_result = connection.wait_for_response(aggregates_query)
log.debug("[control connection] Fetched user aggregate info for %s.%s, rebuilding metadata", keyspace, aggregate.signature)
aggregates_result = dict_factory(*aggregates_result.results) if aggregates_result.results else {}
self._cluster.metadata.aggregate_changed(keyspace, aggregate, aggregates_result)
elif keyspace:
# only the keyspace itself changed (such as replication settings)
where_clause = " WHERE keyspace_name = '%s'" % (keyspace,)
@@ -2088,12 +2130,16 @@ class ControlConnection(object):
QueryMessage(query=self._SELECT_COLUMN_FAMILIES, consistency_level=cl),
QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl),
QueryMessage(query=self._SELECT_USERTYPES, consistency_level=cl),
QueryMessage(query=self._SELECT_FUNCTIONS, consistency_level=cl),
QueryMessage(query=self._SELECT_AGGREGATES, consistency_level=cl),
QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl)
]
responses = connection.wait_for_responses(*queries, timeout=self._timeout, fail_on_error=False)
(ks_success, ks_result), (cf_success, cf_result), \
(col_success, col_result), (types_success, types_result), \
(functions_success, functions_result), \
(aggregates_success, aggregates_result), \
(trigger_success, triggers_result) = responses
if ks_success:
@@ -2135,8 +2181,29 @@ class ControlConnection(object):
else:
raise types_result
# functions were introduced in Cassandra 3.0
if functions_success:
functions_result = dict_factory(*functions_result.results) if functions_result.results else {}
else:
if isinstance(functions_result, InvalidRequest):
log.debug("[control connection] user functions table not found")
functions_result = {}
else:
raise functions_result
# aggregates were introduced in Cassandra 3.0
if aggregates_success:
aggregates_result = dict_factory(*aggregates_result.results) if aggregates_result.results else {}
else:
if isinstance(aggregates_result, InvalidRequest):
log.debug("[control connection] user aggregates table not found")
aggregates_result = {}
else:
raise aggregates_result
log.debug("[control connection] Fetched schema, rebuilding metadata")
self._cluster.metadata.rebuild_schema(ks_result, types_result, cf_result, col_result, triggers_result)
self._cluster.metadata.rebuild_schema(ks_result, types_result, functions_result,
aggregates_result, cf_result, col_result, triggers_result)
return True
def refresh_node_list_and_token_map(self, force_token_rebuild=False):
@@ -2280,12 +2347,13 @@ class ControlConnection(object):
def _handle_schema_change(self, event):
if self._schema_event_refresh_window < 0:
return
keyspace = event.get('keyspace')
table = event.get('table')
usertype = event.get('type')
function = event.get('function')
aggregate = event.get('aggregate')
delay = random() * self._schema_event_refresh_window
self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, keyspace, table, usertype)
self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, keyspace, table, usertype, function, aggregate)
def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wait_time=None):
@@ -2514,19 +2582,20 @@ class _Scheduler(object):
exc_info=exc)
def refresh_schema_and_set_result(keyspace, table, usertype, function, aggregate, control_conn, response_future):
    """
    Refresh schema metadata after a schema-change response, then complete
    ``response_future``.

    When ``control_conn._meta_refresh_enabled`` is true, the refresh runs
    through ``control_conn._refresh_schema`` on the connection held by
    ``response_future``; otherwise it is skipped and only logged.

    If the refresh raises, the exception is logged and
    ``control_conn.refresh_schema`` is submitted via
    ``response_future.session.submit`` with the same entity arguments.

    In every case the future's final result is set to ``None``.
    """
    try:
        if control_conn._meta_refresh_enabled:
            log.debug("Refreshing schema in response to schema change. "
                      "Keyspace: %s; Table: %s, Type: %s, Function: %s, Aggregate: %s",
                      keyspace, table, usertype, function, aggregate)
            control_conn._refresh_schema(response_future._connection, keyspace, table, usertype, function, aggregate)
        else:
            # One placeholder per argument; a mismatch makes logging report a
            # formatting error instead of emitting the message.
            log.debug("Skipping schema refresh in response to schema change because meta refresh is disabled; "
                      "Keyspace: %s; Table: %s, Type: %s, Function: %s, Aggregate: %s",
                      keyspace, table, usertype, function, aggregate)
    except Exception:
        log.exception("Exception refreshing schema in response to schema change:")
        response_future.session.submit(
            control_conn.refresh_schema, keyspace, table, usertype, function, aggregate)
    finally:
        response_future._set_final_result(None)
@@ -2711,6 +2780,8 @@ class ResponseFuture(object):
response.results['keyspace'],
response.results.get('table'),
response.results.get('type'),
response.results.get('function'),
response.results.get('aggregate'),
self.session.cluster.control_connection,
self)
else:

View File

@@ -28,7 +28,8 @@ import types
from uuid import UUID
import six
from cassandra.util import OrderedDict, OrderedMap, sortedset, Time
from cassandra.util import (OrderedDict, OrderedMap, OrderedMapSerializedKey,
sortedset, Time)
if six.PY3:
long = int
@@ -79,6 +80,7 @@ class Encoder(object):
dict: self.cql_encode_map_collection,
OrderedDict: self.cql_encode_map_collection,
OrderedMap: self.cql_encode_map_collection,
OrderedMapSerializedKey: self.cql_encode_map_collection,
list: self.cql_encode_list_collection,
tuple: self.cql_encode_list_collection,
set: self.cql_encode_set_collection,

View File

@@ -19,9 +19,10 @@ from itertools import islice, cycle
import json
import logging
import re
import six
from six.moves import zip
from threading import RLock
import weakref
import six
murmur3 = None
try:
@@ -29,7 +30,9 @@ try:
except ImportError as e:
pass
from cassandra import SignatureDescriptor
import cassandra.cqltypes as types
from cassandra.encoder import Encoder
from cassandra.marshal import varint_unpack
from cassandra.pool import Host
from cassandra.util import OrderedDict
@@ -88,7 +91,8 @@ class Metadata(object):
"""
return "\n".join(ks.export_as_string() for ks in self.keyspaces.values())
def rebuild_schema(self, ks_results, type_results, cf_results, col_results, triggers_result):
def rebuild_schema(self, ks_results, type_results, function_results,
aggregate_results, cf_results, col_results, triggers_result):
"""
Rebuild the view of the current schema from a fresh set of rows from
the system schema tables.
@@ -98,6 +102,8 @@ class Metadata(object):
cf_def_rows = defaultdict(list)
col_def_rows = defaultdict(lambda: defaultdict(list))
usertype_rows = defaultdict(list)
fn_rows = defaultdict(list)
agg_rows = defaultdict(list)
trigger_rows = defaultdict(lambda: defaultdict(list))
for row in cf_results:
@@ -111,6 +117,12 @@ class Metadata(object):
for row in type_results:
usertype_rows[row["keyspace_name"]].append(row)
for row in function_results:
fn_rows[row["keyspace_name"]].append(row)
for row in aggregate_results:
agg_rows[row["keyspace_name"]].append(row)
for row in triggers_result:
ksname = row["keyspace_name"]
cfname = row["columnfamily_name"]
@@ -131,6 +143,14 @@ class Metadata(object):
usertype = self._build_usertype(keyspace_meta.name, usertype_row)
keyspace_meta.user_types[usertype.name] = usertype
for fn_row in fn_rows.get(keyspace_meta.name, []):
fn = self._build_function(keyspace_meta.name, fn_row)
keyspace_meta.functions[fn.signature] = fn
for agg_row in agg_rows.get(keyspace_meta.name, []):
agg = self._build_aggregate(keyspace_meta.name, agg_row)
keyspace_meta.aggregates[agg.signature] = agg
current_keyspaces.add(keyspace_meta.name)
old_keyspace_meta = self.keyspaces.get(keyspace_meta.name, None)
self.keyspaces[keyspace_meta.name] = keyspace_meta
@@ -140,8 +160,8 @@ class Metadata(object):
self._keyspace_added(keyspace_meta.name)
# remove not-just-added keyspaces
removed_keyspaces = [ksname for ksname in self.keyspaces.keys()
if ksname not in current_keyspaces]
removed_keyspaces = [name for name in self.keyspaces.keys()
if name not in current_keyspaces]
self.keyspaces = dict((name, meta) for name, meta in self.keyspaces.items()
if name in current_keyspaces)
for ksname in removed_keyspaces:
@@ -160,6 +180,8 @@ class Metadata(object):
if old_keyspace_meta:
keyspace_meta.tables = old_keyspace_meta.tables
keyspace_meta.user_types = old_keyspace_meta.user_types
keyspace_meta.functions = old_keyspace_meta.functions
keyspace_meta.aggregates = old_keyspace_meta.aggregates
if (keyspace_meta.replication_strategy != old_keyspace_meta.replication_strategy):
self._keyspace_updated(keyspace)
else:
@@ -173,6 +195,22 @@ class Metadata(object):
# the type was deleted
self.keyspaces[keyspace].user_types.pop(name, None)
def function_changed(self, keyspace, function, function_results):
    """
    Apply a change to a single user-defined function in ``keyspace``.

    ``function`` supplies the ``signature`` used as the key into the
    keyspace's ``functions`` map. A non-empty ``function_results`` replaces
    the entry with metadata built from its first row; an empty one means
    the function was deleted, so the entry is removed if present.
    """
    if not function_results:
        # the function was deleted
        self.keyspaces[keyspace].functions.pop(function.signature, None)
        return

    rebuilt = self._build_function(keyspace, function_results[0])
    self.keyspaces[keyspace].functions[function.signature] = rebuilt
def aggregate_changed(self, keyspace, aggregate, aggregate_results):
    """
    Apply a change to a single user-defined aggregate in ``keyspace``.

    ``aggregate`` supplies the ``signature`` used as the key into the
    keyspace's ``aggregates`` map. A non-empty ``aggregate_results``
    replaces the entry with metadata built from its first row; an empty one
    means the aggregate was deleted, so the entry is removed if present.
    """
    if not aggregate_results:
        # the aggregate was deleted
        self.keyspaces[keyspace].aggregates.pop(aggregate.signature, None)
        return

    rebuilt = self._build_aggregate(keyspace, aggregate_results[0])
    self.keyspaces[keyspace].aggregates[aggregate.signature] = rebuilt
def table_changed(self, keyspace, table, cf_results, col_results, triggers_result):
try:
keyspace_meta = self.keyspaces[keyspace]
@@ -215,6 +253,23 @@ class Metadata(object):
return UserType(usertype_row['keyspace_name'], usertype_row['type_name'],
usertype_row['field_names'], type_classes)
def _build_function(self, keyspace, function_row):
    """
    Create a :class:`Function` from one row of function schema results.

    The ``return_type`` column is resolved through ``types.lookup_casstype``;
    all other columns are passed through unchanged. ``keyspace`` is accepted
    but not read here -- the keyspace name is taken from the row itself.
    """
    row = function_row
    # Resolve the return type first, before any other column is read.
    resolved_return = types.lookup_casstype(row['return_type'])

    ctor_args = [row[col] for col in ('keyspace_name', 'function_name',
                                      'signature', 'argument_names')]
    ctor_args.append(resolved_return)
    ctor_args.extend([row[col] for col in ('language', 'body',
                                           'is_deterministic', 'called_on_null_input')])
    return Function(*ctor_args)
def _build_aggregate(self, keyspace, aggregate_row):
    """
    Create an :class:`Aggregate` from one row of aggregate schema results.

    ``state_type`` and ``return_type`` are resolved through
    ``types.lookup_casstype``. A non-null ``initcond`` is deserialized with
    the resolved state type. ``keyspace`` is accepted but not read here --
    the keyspace name is taken from the row itself.
    """
    row = aggregate_row
    state_type = types.lookup_casstype(row['state_type'])

    raw_initcond = row['initcond']
    if raw_initcond is None:
        initial_condition = None
    else:
        # NOTE(review): the literal 3 is deserialize's second argument --
        # presumably a protocol version; confirm against cqltypes.
        initial_condition = state_type.deserialize(raw_initcond, 3)

    return_type = types.lookup_casstype(row['return_type'])
    return Aggregate(keyspace=row['keyspace_name'],
                     name=row['aggregate_name'],
                     type_signature=row['signature'],
                     state_func=row['state_func'],
                     state_type=state_type,
                     final_func=row['final_func'],
                     initial_condition=initial_condition,
                     return_type=return_type)
def _build_table_metadata(self, keyspace_metadata, row, col_rows, trigger_rows):
cfname = row["columnfamily_name"]
cf_col_rows = col_rows.get(cfname, [])
@@ -735,24 +790,43 @@ class KeyspaceMetadata(object):
user_types = None
"""
A map from user-defined type names to instances of :class:`~cassandra.metadata..UserType`.
A map from user-defined type names to instances of :class:`~cassandra.metadata.UserType`.
.. versionadded:: 2.1.0
"""
functions = None
"""
A map from user-defined function signatures to instances of :class:`~cassandra.metadata.Function`.
.. versionadded:: 3.0.0
"""
aggregates = None
"""
A map from user-defined aggregate signatures to instances of :class:`~cassandra.metadata.Aggregate`.
.. versionadded:: 3.0.0
"""
def __init__(self, name, durable_writes, strategy_class, strategy_options):
self.name = name
self.durable_writes = durable_writes
self.replication_strategy = ReplicationStrategy.create(strategy_class, strategy_options)
self.tables = {}
self.user_types = {}
self.functions = {}
self.aggregates = {}
def export_as_string(self):
"""
Returns a CQL query string that can be used to recreate the entire keyspace,
including user-defined types and tables.
"""
return "\n\n".join([self.as_cql_query()] + self.user_type_strings() + [t.export_as_string() for t in self.tables.values()])
return "\n\n".join([self.as_cql_query()]
+ self.user_type_strings()
+ [f.as_cql_query(True) for f in self.functions.values()]
+ [a.as_cql_query(True) for a in self.aggregates.values()]
+ [t.export_as_string() for t in self.tables.values()])
def as_cql_query(self):
"""
@@ -843,6 +917,191 @@ class UserType(object):
return ret
class Aggregate(object):
    """
    A user defined aggregate function, as created by ``CREATE AGGREGATE`` statements.

    Aggregate functions were introduced in Cassandra 3.0

    .. versionadded:: 3.0.0
    """

    keyspace = None
    """
    The string name of the keyspace in which this aggregate is defined
    """

    name = None
    """
    The name of this aggregate
    """

    type_signature = None
    """
    An ordered list of the types for each argument to the aggregate
    """

    final_func = None
    """
    Name of a final function
    """

    initial_condition = None
    """
    Initial condition of the aggregate
    """

    return_type = None
    """
    Return type of the aggregate
    """

    state_func = None
    """
    Name of a state function
    """

    state_type = None
    """
    Type of the aggregate state
    """

    def __init__(self, keyspace, name, type_signature, state_func,
                 state_type, final_func, initial_condition, return_type):
        self.keyspace = keyspace
        self.name = name
        self.type_signature = type_signature
        self.state_func = state_func
        self.state_type = state_type
        self.final_func = final_func
        self.initial_condition = initial_condition
        self.return_type = return_type

    def as_cql_query(self, formatted=False):
        """
        Returns a CQL query that can be used to recreate this aggregate.
        If `formatted` is set to :const:`True`, extra whitespace will
        be added to make the query more readable.
        """
        sep = '\n' if formatted else ' '
        keyspace = protect_name(self.keyspace)
        name = protect_name(self.name)
        type_list = ', '.join(self.type_signature)
        state_func = protect_name(self.state_func)
        state_type = self.state_type.cql_parameterized_type()

        ret = "CREATE AGGREGATE %(keyspace)s.%(name)s(%(type_list)s)%(sep)s" \
              "SFUNC %(state_func)s%(sep)s" \
              "STYPE %(state_type)s" % locals()

        # FINALFUNC and INITCOND are optional clauses, emitted only when set.
        if self.final_func:
            ret += ''.join((sep, 'FINALFUNC ', protect_name(self.final_func)))
        # Compare against None explicitly so falsy initial values (0, '', [])
        # are still emitted.
        if self.initial_condition is not None:
            ret += ''.join((sep, 'INITCOND ', Encoder().cql_encode_all_types(self.initial_condition)))

        return ret

    @property
    def signature(self):
        """
        Signature string built from this aggregate's name and type signature
        by ``SignatureDescriptor.format_signature``.
        """
        return SignatureDescriptor.format_signature(self.name, self.type_signature)
class Function(object):
    """
    A user defined function, as created by ``CREATE FUNCTION`` statements.

    User-defined functions were introduced in Cassandra 3.0

    .. versionadded:: 3.0.0
    """

    keyspace = None
    """
    The string name of the keyspace in which this function is defined
    """

    name = None
    """
    The name of this function
    """

    type_signature = None
    """
    An ordered list of the types for each argument to the function
    """

    argument_names = None
    """
    An ordered list of the names of each argument to the function
    """

    return_type = None
    """
    Return type of the function
    """

    language = None
    """
    Language of the function body
    """

    body = None
    """
    Function body string
    """

    is_deterministic = None
    """
    Flag indicating whether this function is deterministic
    (required for functional indexes)
    """

    called_on_null_input = None
    """
    Flag indicating whether this function should be called for rows with null values
    (convenience function to avoid handling nulls explicitly if the result will just be null)
    """

    def __init__(self, keyspace, name, type_signature, argument_names,
                 return_type, language, body, is_deterministic, called_on_null_input):
        self.keyspace = keyspace
        self.name = name
        self.type_signature = type_signature
        self.argument_names = argument_names
        self.return_type = return_type
        self.language = language
        self.body = body
        self.is_deterministic = is_deterministic
        self.called_on_null_input = called_on_null_input

    def as_cql_query(self, formatted=False):
        """
        Returns a CQL query that can be used to recreate this function.
        If `formatted` is set to :const:`True`, extra whitespace will
        be added to make the query more readable.
        """
        sep = '\n' if formatted else ' '
        keyspace = protect_name(self.keyspace)
        name = protect_name(self.name)
        # Pair each argument name with its type, in declaration order.
        arg_list = ', '.join(["%s %s" % (protect_name(n), t)
                             for n, t in zip(self.argument_names, self.type_signature)])
        determ = '' if self.is_deterministic else 'NON DETERMINISTIC '
        typ = self.return_type.cql_parameterized_type()
        lang = self.language
        body = protect_value(self.body)
        on_null = "CALLED" if self.called_on_null_input else "RETURNS NULL"

        return "CREATE %(determ)sFUNCTION %(keyspace)s.%(name)s(%(arg_list)s)%(sep)s" \
               "%(on_null)s ON NULL INPUT%(sep)s" \
               "RETURNS %(typ)s%(sep)s" \
               "LANGUAGE %(lang)s%(sep)s" \
               "AS %(body)s;" % locals()

    @property
    def signature(self):
        """
        Signature string built from this function's name and type signature
        by ``SignatureDescriptor.format_signature``.
        """
        return SignatureDescriptor.format_signature(self.name, self.type_signature)
class TableMetadata(object):
"""
A representation of the schema for a single table.

View File

@@ -24,7 +24,8 @@ import io
from cassandra import (Unavailable, WriteTimeout, ReadTimeout,
WriteFailure, ReadFailure, FunctionFailure,
AlreadyExists, InvalidRequest, Unauthorized,
UnsupportedOperation)
UnsupportedOperation, UserFunctionDescriptor,
UserAggregateDescriptor)
from cassandra.marshal import (int32_pack, int32_unpack, uint16_pack, uint16_unpack,
int8_pack, int8_unpack, uint64_pack, header_pack,
v3_header_pack)
@@ -903,15 +904,20 @@ class EventMessage(_MessageType):
if protocol_version >= 3:
target = read_string(f)
keyspace = read_string(f)
event = {'change_type': change_type, 'keyspace': keyspace}
if target != "KEYSPACE":
table_or_type = read_string(f)
return {'change_type': change_type, 'keyspace': keyspace, target.lower(): table_or_type}
else:
return {'change_type': change_type, 'keyspace': keyspace}
target_name = read_string(f)
if target == 'FUNCTION':
event['function'] = UserFunctionDescriptor(target_name, [read_string(f) for _ in range(read_short(f))])
elif target == 'AGGREGATE':
event['aggregate'] = UserAggregateDescriptor(target_name, [read_string(f) for _ in range(read_short(f))])
else:
event[target.lower()] = target_name
else:
keyspace = read_string(f)
table = read_string(f)
return {'change_type': change_type, 'keyspace': keyspace, 'table': table}
event = {'change_type': change_type, 'keyspace': keyspace, 'table': table}
return event
def write_header(f, version, flags, stream_id, opcode, length):

View File

@@ -14,6 +14,14 @@
.. autoclass:: ConsistencyLevel
:members:
.. autoclass:: UserFunctionDescriptor
:members:
:inherited-members:
.. autoclass:: UserAggregateDescriptor
:members:
:inherited-members:
.. autoexception:: Unavailable()
:members:

View File

@@ -13,6 +13,15 @@ Schemas
.. autoclass:: KeyspaceMetadata ()
:members:
.. autoclass:: UserType ()
:members:
.. autoclass:: Function ()
:members:
.. autoclass:: Aggregate ()
:members:
.. autoclass:: TableMetadata ()
:members:

View File

@@ -22,11 +22,13 @@ from mock import Mock
import six
import sys
from cassandra import AlreadyExists
from cassandra import AlreadyExists, SignatureDescriptor
from cassandra.cluster import Cluster
from cassandra.cqltypes import DoubleType, Int32Type, ListType, UTF8Type, MapType
from cassandra.encoder import Encoder
from cassandra.metadata import (Metadata, KeyspaceMetadata, TableMetadata,
Token, MD5Token, TokenMap, murmur3)
Token, MD5Token, TokenMap, murmur3, Function, Aggregate)
from cassandra.policies import SimpleConvictionPolicy
from cassandra.pool import Host
@@ -928,3 +930,311 @@ class KeyspaceAlterMetadata(unittest.TestCase):
new_keyspace_meta = self.cluster.metadata.keyspaces[name]
self.assertNotEqual(original_keyspace_meta, new_keyspace_meta)
self.assertEqual(new_keyspace_meta.durable_writes, False)
class FunctionTest(unittest.TestCase):
    """
    Shared fixture and helpers for the Function and Aggregate metadata tests.

    A keyspace named after the concrete test class is created once per class
    and dropped on teardown.
    """

    @property
    def function_name(self):
        # Each test method gets its own function/aggregate name.
        return self._testMethodName.lower()

    @classmethod
    def setup_class(cls):
        if PROTOCOL_VERSION < 4:
            raise unittest.SkipTest("Function metadata requires native protocol version 4+")

        cls.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        cls.keyspace_name = cls.__name__.lower()
        cls.session = cls.cluster.connect()

        create_keyspace = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}" % cls.keyspace_name
        cls.session.execute(create_keyspace)
        cls.session.set_keyspace(cls.keyspace_name)

        # Hold references to the live metadata maps so tests can watch them change.
        cls.keyspace_function_meta = cls.cluster.metadata.keyspaces[cls.keyspace_name].functions
        cls.keyspace_aggregate_meta = cls.cluster.metadata.keyspaces[cls.keyspace_name].aggregates

    @classmethod
    def teardown_class(cls):
        cls.session.execute("DROP KEYSPACE IF EXISTS %s" % cls.keyspace_name)
        cls.cluster.shutdown()

    class Verified(object):
        """
        Context manager that creates a schema element on entry and drops it
        on exit, asserting at each step that the metadata map follows.
        """

        def __init__(self, test_case, meta_class, element_meta, **function_kwargs):
            self.test_case = test_case
            self.function_kwargs = dict(function_kwargs)
            self.meta_class = meta_class
            self.element_meta = element_meta

        def __enter__(self):
            case = self.test_case
            expected = self.meta_class(**self.function_kwargs)
            key = expected.signature

            # Absent before creation, present afterwards.
            case.assertNotIn(key, self.element_meta)
            case.session.execute(expected.as_cql_query())
            case.assertIn(key, self.element_meta)

            # Metadata built from the server must regenerate the same CQL.
            generated = self.element_meta[key]
            case.assertEqual(generated.as_cql_query(), expected.as_cql_query())
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            case = self.test_case
            drop_statement = "DROP %s %s.%s" % (self.meta_class.__name__, case.keyspace_name, self.signature)
            case.session.execute(drop_statement)
            case.assertNotIn(self.signature, self.element_meta)

        @property
        def signature(self):
            kwargs = self.function_kwargs
            return SignatureDescriptor.format_signature(kwargs['name'], kwargs['type_signature'])

    class VerifiedFunction(Verified):
        """``Verified`` bound to :class:`Function` and the keyspace's function map."""

        def __init__(self, test_case, **kwargs):
            super(FunctionTest.VerifiedFunction, self).__init__(test_case, Function, test_case.keyspace_function_meta, **kwargs)

    class VerifiedAggregate(Verified):
        """``Verified`` bound to :class:`Aggregate` and the keyspace's aggregate map."""

        def __init__(self, test_case, **kwargs):
            super(FunctionTest.VerifiedAggregate, self).__init__(test_case, Aggregate, test_case.keyspace_aggregate_meta, **kwargs)
class FunctionMetadata(FunctionTest):
def make_function_kwargs(self, deterministic=True, called_on_null=True):
return {'keyspace': self.keyspace_name,
'name': self.function_name,
'type_signature': ['double', 'int'],
'argument_names': ['d', 'i'],
'return_type': DoubleType,
'language': 'java',
'body': 'return new Double(0.0);',
'is_deterministic': deterministic,
'called_on_null_input': called_on_null}
def test_functions_after_udt(self):
self.assertNotIn(self.function_name, self.keyspace_function_meta)
udt_name = 'udtx'
self.session.execute("CREATE TYPE %s (x int)" % udt_name)
# Ideally we would make a function that takes a udt type, but
# this presently fails because C* c059a56 requires udt to be frozen to create, but does not store meta indicating frozen
# https://issues.apache.org/jira/browse/CASSANDRA-9186
# Maybe update this after release
#kwargs = self.make_function_kwargs()
#kwargs['type_signature'][0] = "frozen<%s>" % udt_name
#expected_meta = Function(**kwargs)
#with self.VerifiedFunction(self, **kwargs):
with self.VerifiedFunction(self, **self.make_function_kwargs()):
# udts must come before functions in keyspace dump
keyspace_cql = self.cluster.metadata.keyspaces[self.keyspace_name].export_as_string()
type_idx = keyspace_cql.rfind("CREATE TYPE")
func_idx = keyspace_cql.find("CREATE FUNCTION")
self.assertNotIn(-1, (type_idx, func_idx), "TYPE or FUNCTION not found in keyspace_cql: " + keyspace_cql)
self.assertGreater(func_idx, type_idx)
def test_function_same_name_diff_types(self):
kwargs = self.make_function_kwargs()
with self.VerifiedFunction(self, **kwargs):
# another function: same name, different type sig.
self.assertGreater(len(kwargs['type_signature']), 1)
self.assertGreater(len(kwargs['argument_names']), 1)
kwargs['type_signature'] = kwargs['type_signature'][:1]
kwargs['argument_names'] = kwargs['argument_names'][:1]
with self.VerifiedFunction(self, **kwargs):
functions = [f for f in self.keyspace_function_meta.values() if f.name == self.function_name]
self.assertEqual(len(functions), 2)
self.assertNotEqual(functions[0].type_signature, functions[1].type_signature)
def test_functions_follow_keyspace_alter(self):
with self.VerifiedFunction(self, **self.make_function_kwargs()):
original_keyspace_meta = self.cluster.metadata.keyspaces[self.keyspace_name]
self.session.execute('ALTER KEYSPACE %s WITH durable_writes = false' % self.keyspace_name)
try:
new_keyspace_meta = self.cluster.metadata.keyspaces[self.keyspace_name]
self.assertNotEqual(original_keyspace_meta, new_keyspace_meta)
self.assertIs(original_keyspace_meta.functions, new_keyspace_meta.functions)
finally:
self.session.execute('ALTER KEYSPACE %s WITH durable_writes = true' % self.keyspace_name)
def test_function_cql_determinism(self):
    """
    The generated CQL must reflect the ``is_deterministic`` flag of the
    function it describes.
    """
    kwargs = self.make_function_kwargs()
    # (is_deterministic value, pattern expected in the generated CQL)
    expectations = (
        (True, "CREATE FUNCTION.*"),
        (False, "CREATE NON DETERMINISTIC FUNCTION.*"),
    )
    for deterministic, cql_pattern in expectations:
        kwargs['is_deterministic'] = deterministic
        with self.VerifiedFunction(self, **kwargs) as verified:
            function_meta = self.keyspace_function_meta[verified.signature]
            self.assertRegexpMatches(function_meta.as_cql_query(), cql_pattern)
def test_function_cql_called_on_null(self):
    """
    The generated CQL must contain the null-handling clause matching the
    ``called_on_null_input`` flag of the function it describes.
    """
    kwargs = self.make_function_kwargs()
    # (called_on_null_input value, pattern expected in the generated CQL).
    # Raw strings keep the backslash in ``\)`` (a literal closing paren in
    # the regex) without relying on Python passing through an invalid
    # string escape sequence.
    expectations = (
        (True, r"CREATE FUNCTION.*\) CALLED ON NULL INPUT RETURNS .*"),
        (False, r"CREATE FUNCTION.*\) RETURNS NULL ON NULL INPUT RETURNS .*"),
    )
    for called_on_null, cql_pattern in expectations:
        kwargs['called_on_null_input'] = called_on_null
        with self.VerifiedFunction(self, **kwargs) as vf:
            fn_meta = self.keyspace_function_meta[vf.signature]
            self.assertRegexpMatches(fn_meta.as_cql_query(), cql_pattern)
class AggregateMetadata(FunctionTest):
@classmethod
def setup_class(cls):
    """
    Run the parent class setup, then create the functions, the table
    ``t`` and the rows that the aggregate tests operate on.
    """
    super(AggregateMetadata, cls).setup_class()

    # Executed in order, one statement at a time.
    schema_statements = (
        """CREATE OR REPLACE FUNCTION sum_int(s int, i int)
RETURNS NULL ON NULL INPUT
RETURNS int
LANGUAGE javascript AS 's + i';""",
        """CREATE OR REPLACE FUNCTION sum_int_two(s int, i int, j int)
RETURNS NULL ON NULL INPUT
RETURNS int
LANGUAGE javascript AS 's + i + j';""",
        """CREATE OR REPLACE FUNCTION "List_As_String"(l list<text>)
RETURNS NULL ON NULL INPUT
RETURNS int
LANGUAGE javascript AS ''''' + l';""",
        """CREATE OR REPLACE FUNCTION extend_list(s list<text>, i int)
CALLED ON NULL INPUT
RETURNS list<text>
LANGUAGE java AS 'if (i != null) s.add(i.toString()); return s;';""",
        """CREATE OR REPLACE FUNCTION update_map(s map<int, int>, i int)
RETURNS NULL ON NULL INPUT
RETURNS map<int, int>
LANGUAGE java AS 's.put(new Integer(i), new Integer(i)); return s;';""",
        """CREATE TABLE IF NOT EXISTS t
(k int PRIMARY KEY, v int)""",
    )
    for statement in schema_statements:
        cls.session.execute(statement)

    # rows 0..3 carry v == k; row 4 is written without a value for v
    for key in range(4):
        cls.session.execute("INSERT INTO t (k,v) VALUES (%s, %s)", (key, key))
    cls.session.execute("INSERT INTO t (k) VALUES (%s)", (4,))
def make_aggregate_kwargs(self, state_func, state_type, final_func=None, init_cond=None):
    """
    Build the keyword arguments describing a test aggregate.

    The aggregate lives in ``self.keyspace_name``, is named after
    ``self.function_name`` with an ``_aggregate`` suffix and always takes
    a single ``int`` argument.

    :param state_func: value stored under ``'state_func'``
    :param state_type: value stored under ``'state_type'``
    :param final_func: value stored under ``'final_func'`` (default None)
    :param init_cond: value stored under ``'initial_condition'`` (default None)
    :return: a new dict of aggregate keyword arguments
    """
    aggregate_kwargs = {}
    aggregate_kwargs['keyspace'] = self.keyspace_name
    aggregate_kwargs['name'] = self.function_name + '_aggregate'
    aggregate_kwargs['type_signature'] = ['int']
    aggregate_kwargs['state_func'] = state_func
    aggregate_kwargs['state_type'] = state_type
    aggregate_kwargs['final_func'] = final_func
    aggregate_kwargs['initial_condition'] = init_cond
    # placeholder value, as its own text says
    aggregate_kwargs['return_type'] = "does not matter for creation"
    return aggregate_kwargs
def test_return_type_meta(self):
    """
    An aggregate whose state type is ``Int32Type`` must report
    ``Int32Type`` as its return type in the keyspace metadata.
    """
    aggregate_kwargs = self.make_aggregate_kwargs('sum_int', Int32Type, init_cond=1)
    with self.VerifiedAggregate(self, **aggregate_kwargs) as verified:
        aggregate_meta = self.keyspace_aggregate_meta[verified.signature]
        self.assertIs(aggregate_meta.return_type, Int32Type)
def test_init_cond(self):
    """
    Aggregates created with int, list<text> and map<int,int> initial
    conditions must produce results that include the initial condition
    combined with the ``v`` values 0..3 stored in table ``t``.

    A dedicated protocol v3 cluster/session is used for the queries; it
    is always shut down, even when an assertion fails.
    """
    # This is required until the java driver bundled with C* is updated to support v4
    c = Cluster(protocol_version=3)
    try:
        s = c.connect(self.keyspace_name)

        expected_values = range(4)

        # int32: the sum starts from the initial condition
        for init_cond in (-1, 0, 1):
            with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('sum_int', Int32Type, init_cond=init_cond)) as va:
                sum_res = s.execute("SELECT %s(v) AS sum FROM t" % va.function_kwargs['name'])[0].sum
                self.assertEqual(sum_res, init_cond + sum(expected_values))

        # list<text>: the initial condition is the prefix, followed by the
        # stringified values (compared as a set, ignoring their order)
        for init_cond in ([], ['1', '2']):
            with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('extend_list', ListType.apply_parameters([UTF8Type]), init_cond=init_cond)) as va:
                list_res = s.execute("SELECT %s(v) AS list_res FROM t" % va.function_kwargs['name'])[0].list_res
                self.assertListEqual(list_res[:len(init_cond)], init_cond)
                self.assertEqual(set(list_res[len(init_cond):]),
                                 set(str(i) for i in expected_values))

        # map<int,int>: every value maps to itself; initial entries whose
        # keys are not among the values must remain unchanged
        expected_map_values = dict((i, i) for i in expected_values)
        expected_key_set = set(expected_values)
        for init_cond in ({}, {1: 2, 3: 4}, {5: 5}):
            with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('update_map', MapType.apply_parameters([Int32Type, Int32Type]), init_cond=init_cond)) as va:
                map_res = s.execute("SELECT %s(v) AS map_res FROM t" % va.function_kwargs['name'])[0].map_res
                self.assertDictContainsSubset(expected_map_values, map_res)
                init_not_updated = dict((k, init_cond[k]) for k in set(init_cond) - expected_key_set)
                self.assertDictContainsSubset(init_not_updated, map_res)
    finally:
        # release the extra cluster even if connect() or an assertion failed
        c.shutdown()
def test_aggregates_after_functions(self):
    """
    In the keyspace CQL dump every ``CREATE FUNCTION`` statement must
    appear before the first ``CREATE AGGREGATE`` statement.
    """
    # functions must come before aggregates in keyspace dump
    with self.VerifiedAggregate(self, **self.make_aggregate_kwargs('extend_list', ListType.apply_parameters([UTF8Type]))):
        keyspace_cql = self.cluster.metadata.keyspaces[self.keyspace_name].export_as_string()
        # Compare the LAST function with the FIRST aggregate: this fails
        # if any function is emitted after any aggregate, whereas
        # first-function vs. last-aggregate would still pass in that case.
        func_idx = keyspace_cql.rfind("CREATE FUNCTION")
        aggregate_idx = keyspace_cql.find("CREATE AGGREGATE")
        self.assertNotIn(-1, (aggregate_idx, func_idx), "AGGREGATE or FUNCTION not found in keyspace_cql: " + keyspace_cql)
        self.assertGreater(aggregate_idx, func_idx)
def test_same_name_diff_types(self):
    """
    Two aggregates that share a name but differ in argument types must
    both be present in the keyspace aggregate metadata.
    """
    base_kwargs = self.make_aggregate_kwargs('sum_int', Int32Type, init_cond=0)
    with self.VerifiedAggregate(self, **base_kwargs):
        # same aggregate name, but a two-argument signature backed by the
        # three-parameter state function
        overload_kwargs = dict(base_kwargs)
        overload_kwargs['state_func'] = 'sum_int_two'
        overload_kwargs['type_signature'] = ['int', 'int']
        with self.VerifiedAggregate(self, **overload_kwargs):
            aggregate_name = overload_kwargs['name']
            same_name = [meta
                         for meta in self.keyspace_aggregate_meta.values()
                         if meta.name == aggregate_name]
            self.assertEqual(len(same_name), 2)
            self.assertNotEqual(same_name[0].type_signature,
                                same_name[1].type_signature)
def test_aggregates_follow_keyspace_alter(self):
    """
    Altering a keyspace yields new keyspace metadata that still
    references the very same aggregates mapping.
    """
    aggregate_kwargs = self.make_aggregate_kwargs('sum_int', Int32Type, init_cond=0)
    with self.VerifiedAggregate(self, **aggregate_kwargs):
        meta_before = self.cluster.metadata.keyspaces[self.keyspace_name]
        self.session.execute('ALTER KEYSPACE %s WITH durable_writes = false' % self.keyspace_name)
        try:
            meta_after = self.cluster.metadata.keyspaces[self.keyspace_name]
            # the keyspace metadata changed ...
            self.assertNotEqual(meta_before, meta_after)
            # ... but the aggregates mapping is carried over by identity
            self.assertIs(meta_before.aggregates, meta_after.aggregates)
        finally:
            # put durable_writes back regardless of the assertions above
            self.session.execute('ALTER KEYSPACE %s WITH durable_writes = true' % self.keyspace_name)
def test_cql_optional_params(self):
    """
    Check the optional INITCOND and FINALFUNC clauses of the generated
    aggregate CQL for all four combinations of initial condition and
    final function being present or absent.
    """
    kwargs = self.make_aggregate_kwargs('extend_list', ListType.apply_parameters([UTF8Type]))

    # case 1: neither initial condition nor final func
    self.assertIsNone(kwargs['initial_condition'])
    self.assertIsNone(kwargs['final_func'])
    with self.VerifiedAggregate(self, **kwargs) as verified:
        aggregate_meta = self.keyspace_aggregate_meta[verified.signature]
        self.assertIsNone(aggregate_meta.initial_condition)
        self.assertIsNone(aggregate_meta.final_func)
        aggregate_cql = aggregate_meta.as_cql_query()
        self.assertEqual(aggregate_cql.find('INITCOND'), -1)
        self.assertEqual(aggregate_cql.find('FINALFUNC'), -1)

    # case 2: initial condition only
    kwargs['initial_condition'] = ['init', 'cond']
    with self.VerifiedAggregate(self, **kwargs) as verified:
        aggregate_meta = self.keyspace_aggregate_meta[verified.signature]
        self.assertListEqual(aggregate_meta.initial_condition, kwargs['initial_condition'])
        self.assertIsNone(aggregate_meta.final_func)
        aggregate_cql = aggregate_meta.as_cql_query()
        encoded_init_cond = Encoder().cql_encode_all_types(kwargs['initial_condition'])
        init_cond_clause = "INITCOND %s" % encoded_init_cond
        self.assertGreater(aggregate_cql.find(init_cond_clause), 0,
                           '"%s" search string not found in cql:\n%s' % (init_cond_clause, aggregate_cql))
        self.assertEqual(aggregate_cql.find('FINALFUNC'), -1)

    # case 3: final func only
    kwargs['initial_condition'] = None
    kwargs['final_func'] = 'List_As_String'
    with self.VerifiedAggregate(self, **kwargs) as verified:
        aggregate_meta = self.keyspace_aggregate_meta[verified.signature]
        self.assertIsNone(aggregate_meta.initial_condition)
        self.assertEqual(aggregate_meta.final_func, kwargs['final_func'])
        aggregate_cql = aggregate_meta.as_cql_query()
        self.assertEqual(aggregate_cql.find('INITCOND'), -1)
        final_func_clause = 'FINALFUNC "%s"' % kwargs['final_func']
        self.assertGreater(aggregate_cql.find(final_func_clause), 0,
                           '"%s" search string not found in cql:\n%s' % (final_func_clause, aggregate_cql))

    # case 4: both; the INITCOND clause must come after the FINALFUNC clause
    kwargs['initial_condition'] = ['init', 'cond']
    kwargs['final_func'] = 'List_As_String'
    with self.VerifiedAggregate(self, **kwargs) as verified:
        aggregate_meta = self.keyspace_aggregate_meta[verified.signature]
        self.assertListEqual(aggregate_meta.initial_condition, kwargs['initial_condition'])
        self.assertEqual(aggregate_meta.final_func, kwargs['final_func'])
        aggregate_cql = aggregate_meta.as_cql_query()
        encoded_init_cond = Encoder().cql_encode_all_types(kwargs['initial_condition'])
        init_cond_pos = aggregate_cql.find("INITCOND %s" % encoded_init_cond)
        final_func_pos = aggregate_cql.find('FINALFUNC "%s"' % kwargs['final_func'])
        self.assertNotIn(-1, (init_cond_pos, final_func_pos))
        self.assertGreater(init_cond_pos, final_func_pos)