1019 lines
37 KiB
Python
1019 lines
37 KiB
Python
# Copyright 2013-2017 DataStax, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
This module holds classes for working with prepared statements and
|
|
specifying consistency levels and retry policies for individual
|
|
queries.
|
|
"""
|
|
|
|
from collections import namedtuple
|
|
from datetime import datetime, timedelta
|
|
import re
|
|
import struct
|
|
import time
|
|
import six
|
|
from six.moves import range, zip
|
|
|
|
from cassandra import ConsistencyLevel, OperationTimedOut
|
|
from cassandra.util import unix_time_from_uuid1
|
|
from cassandra.encoder import Encoder
|
|
import cassandra.encoder
|
|
from cassandra.protocol import _UNSET_VALUE
|
|
from cassandra.util import OrderedDict, _sanitize_identifiers
|
|
|
|
import logging
|
|
log = logging.getLogger(__name__)
|
|
|
|
UNSET_VALUE = _UNSET_VALUE
|
|
"""
|
|
Specifies an unset value when binding a prepared statement.
|
|
|
|
Unset values are ignored, allowing prepared statements to be used without specify
|
|
|
|
See https://issues.apache.org/jira/browse/CASSANDRA-7304 for further details on semantics.
|
|
|
|
.. versionadded:: 2.6.0
|
|
|
|
Only valid when using native protocol v4+
|
|
"""
|
|
|
|
NON_ALPHA_REGEX = re.compile('[^a-zA-Z0-9]')
|
|
START_BADCHAR_REGEX = re.compile('^[^a-zA-Z0-9]*')
|
|
END_BADCHAR_REGEX = re.compile('[^a-zA-Z0-9_]*$')
|
|
|
|
_clean_name_cache = {}
|
|
|
|
|
|
def _clean_column_name(name):
|
|
try:
|
|
return _clean_name_cache[name]
|
|
except KeyError:
|
|
clean = NON_ALPHA_REGEX.sub("_", START_BADCHAR_REGEX.sub("", END_BADCHAR_REGEX.sub("", name)))
|
|
_clean_name_cache[name] = clean
|
|
return clean
|
|
|
|
|
|
def tuple_factory(colnames, rows):
|
|
"""
|
|
Returns each row as a tuple
|
|
|
|
Example::
|
|
|
|
>>> from cassandra.query import tuple_factory
|
|
>>> session = cluster.connect('mykeyspace')
|
|
>>> session.row_factory = tuple_factory
|
|
>>> rows = session.execute("SELECT name, age FROM users LIMIT 1")
|
|
>>> print rows[0]
|
|
('Bob', 42)
|
|
|
|
.. versionchanged:: 2.0.0
|
|
moved from ``cassandra.decoder`` to ``cassandra.query``
|
|
"""
|
|
return rows
|
|
|
|
|
|
def named_tuple_factory(colnames, rows):
|
|
"""
|
|
Returns each row as a `namedtuple <https://docs.python.org/2/library/collections.html#collections.namedtuple>`_.
|
|
This is the default row factory.
|
|
|
|
Example::
|
|
|
|
>>> from cassandra.query import named_tuple_factory
|
|
>>> session = cluster.connect('mykeyspace')
|
|
>>> session.row_factory = named_tuple_factory
|
|
>>> rows = session.execute("SELECT name, age FROM users LIMIT 1")
|
|
>>> user = rows[0]
|
|
|
|
>>> # you can access field by their name:
|
|
>>> print "name: %s, age: %d" % (user.name, user.age)
|
|
name: Bob, age: 42
|
|
|
|
>>> # or you can access fields by their position (like a tuple)
|
|
>>> name, age = user
|
|
>>> print "name: %s, age: %d" % (name, age)
|
|
name: Bob, age: 42
|
|
>>> name = user[0]
|
|
>>> age = user[1]
|
|
>>> print "name: %s, age: %d" % (name, age)
|
|
name: Bob, age: 42
|
|
|
|
.. versionchanged:: 2.0.0
|
|
moved from ``cassandra.decoder`` to ``cassandra.query``
|
|
"""
|
|
clean_column_names = map(_clean_column_name, colnames)
|
|
try:
|
|
Row = namedtuple('Row', clean_column_names)
|
|
except Exception:
|
|
clean_column_names = list(map(_clean_column_name, colnames)) # create list because py3 map object will be consumed by first attempt
|
|
log.warning("Failed creating named tuple for results with column names %s (cleaned: %s) "
|
|
"(see Python 'namedtuple' documentation for details on name rules). "
|
|
"Results will be returned with positional names. "
|
|
"Avoid this by choosing different names, using SELECT \"<col name>\" AS aliases, "
|
|
"or specifying a different row_factory on your Session" %
|
|
(colnames, clean_column_names))
|
|
Row = namedtuple('Row', _sanitize_identifiers(clean_column_names))
|
|
|
|
return [Row(*row) for row in rows]
|
|
|
|
|
|
def dict_factory(colnames, rows):
|
|
"""
|
|
Returns each row as a dict.
|
|
|
|
Example::
|
|
|
|
>>> from cassandra.query import dict_factory
|
|
>>> session = cluster.connect('mykeyspace')
|
|
>>> session.row_factory = dict_factory
|
|
>>> rows = session.execute("SELECT name, age FROM users LIMIT 1")
|
|
>>> print rows[0]
|
|
{u'age': 42, u'name': u'Bob'}
|
|
|
|
.. versionchanged:: 2.0.0
|
|
moved from ``cassandra.decoder`` to ``cassandra.query``
|
|
"""
|
|
return [dict(zip(colnames, row)) for row in rows]
|
|
|
|
|
|
def ordered_dict_factory(colnames, rows):
|
|
"""
|
|
Like :meth:`~cassandra.query.dict_factory`, but returns each row as an OrderedDict,
|
|
so the order of the columns is preserved.
|
|
|
|
.. versionchanged:: 2.0.0
|
|
moved from ``cassandra.decoder`` to ``cassandra.query``
|
|
"""
|
|
return [OrderedDict(zip(colnames, row)) for row in rows]
|
|
|
|
|
|
FETCH_SIZE_UNSET = object()
|
|
|
|
|
|
class Statement(object):
|
|
"""
|
|
An abstract class representing a single query. There are three subclasses:
|
|
:class:`.SimpleStatement`, :class:`.BoundStatement`, and :class:`.BatchStatement`.
|
|
These can be passed to :meth:`.Session.execute()`.
|
|
"""
|
|
|
|
retry_policy = None
|
|
"""
|
|
An instance of a :class:`cassandra.policies.RetryPolicy` or one of its
|
|
subclasses. This controls when a query will be retried and how it
|
|
will be retried.
|
|
"""
|
|
|
|
consistency_level = None
|
|
"""
|
|
The :class:`.ConsistencyLevel` to be used for this operation. Defaults
|
|
to :const:`None`, which means that the default consistency level for
|
|
the Session this is executed in will be used.
|
|
"""
|
|
|
|
fetch_size = FETCH_SIZE_UNSET
|
|
"""
|
|
How many rows will be fetched at a time. This overrides the default
|
|
of :attr:`.Session.default_fetch_size`
|
|
|
|
This only takes effect when protocol version 2 or higher is used.
|
|
See :attr:`.Cluster.protocol_version` for details.
|
|
|
|
.. versionadded:: 2.0.0
|
|
"""
|
|
|
|
keyspace = None
|
|
"""
|
|
The string name of the keyspace this query acts on. This is used when
|
|
:class:`~.TokenAwarePolicy` is configured for
|
|
:attr:`.Cluster.load_balancing_policy`
|
|
|
|
It is set implicitly on :class:`.BoundStatement`, and :class:`.BatchStatement`,
|
|
but must be set explicitly on :class:`.SimpleStatement`.
|
|
|
|
.. versionadded:: 2.1.3
|
|
"""
|
|
|
|
custom_payload = None
|
|
"""
|
|
:ref:`custom_payload` to be passed to the server.
|
|
|
|
These are only allowed when using protocol version 4 or higher.
|
|
|
|
.. versionadded:: 2.6.0
|
|
"""
|
|
|
|
is_idempotent = False
|
|
"""
|
|
Flag indicating whether this statement is safe to run multiple times in speculative execution.
|
|
"""
|
|
|
|
_serial_consistency_level = None
|
|
_routing_key = None
|
|
|
|
def __init__(self, retry_policy=None, consistency_level=None, routing_key=None,
|
|
serial_consistency_level=None, fetch_size=FETCH_SIZE_UNSET, keyspace=None, custom_payload=None,
|
|
is_idempotent=False):
|
|
if retry_policy and not hasattr(retry_policy, 'on_read_timeout'): # just checking one method to detect positional parameter errors
|
|
raise ValueError('retry_policy should implement cassandra.policies.RetryPolicy')
|
|
self.retry_policy = retry_policy
|
|
if consistency_level is not None:
|
|
self.consistency_level = consistency_level
|
|
self._routing_key = routing_key
|
|
if serial_consistency_level is not None:
|
|
self.serial_consistency_level = serial_consistency_level
|
|
if fetch_size is not FETCH_SIZE_UNSET:
|
|
self.fetch_size = fetch_size
|
|
if keyspace is not None:
|
|
self.keyspace = keyspace
|
|
if custom_payload is not None:
|
|
self.custom_payload = custom_payload
|
|
self.is_idempotent = is_idempotent
|
|
|
|
def _key_parts_packed(self, parts):
|
|
for p in parts:
|
|
l = len(p)
|
|
yield struct.pack(">H%dsB" % l, l, p, 0)
|
|
|
|
def _get_routing_key(self):
|
|
return self._routing_key
|
|
|
|
def _set_routing_key(self, key):
|
|
if isinstance(key, (list, tuple)):
|
|
if len(key) == 1:
|
|
self._routing_key = key[0]
|
|
else:
|
|
self._routing_key = b"".join(self._key_parts_packed(key))
|
|
else:
|
|
self._routing_key = key
|
|
|
|
def _del_routing_key(self):
|
|
self._routing_key = None
|
|
|
|
routing_key = property(
|
|
_get_routing_key,
|
|
_set_routing_key,
|
|
_del_routing_key,
|
|
"""
|
|
The :attr:`~.TableMetadata.partition_key` portion of the primary key,
|
|
which can be used to determine which nodes are replicas for the query.
|
|
|
|
If the partition key is a composite, a list or tuple must be passed in.
|
|
Each key component should be in its packed (binary) format, so all
|
|
components should be strings.
|
|
""")
|
|
|
|
def _get_serial_consistency_level(self):
|
|
return self._serial_consistency_level
|
|
|
|
def _set_serial_consistency_level(self, serial_consistency_level):
|
|
acceptable = (None, ConsistencyLevel.SERIAL, ConsistencyLevel.LOCAL_SERIAL)
|
|
if serial_consistency_level not in acceptable:
|
|
raise ValueError(
|
|
"serial_consistency_level must be either ConsistencyLevel.SERIAL "
|
|
"or ConsistencyLevel.LOCAL_SERIAL")
|
|
self._serial_consistency_level = serial_consistency_level
|
|
|
|
def _del_serial_consistency_level(self):
|
|
self._serial_consistency_level = None
|
|
|
|
serial_consistency_level = property(
|
|
_get_serial_consistency_level,
|
|
_set_serial_consistency_level,
|
|
_del_serial_consistency_level,
|
|
"""
|
|
The serial consistency level is only used by conditional updates
|
|
(``INSERT``, ``UPDATE`` and ``DELETE`` with an ``IF`` condition). For
|
|
those, the ``serial_consistency_level`` defines the consistency level of
|
|
the serial phase (or "paxos" phase) while the normal
|
|
:attr:`~.consistency_level` defines the consistency for the "learn" phase,
|
|
i.e. what type of reads will be guaranteed to see the update right away.
|
|
For example, if a conditional write has a :attr:`~.consistency_level` of
|
|
:attr:`~.ConsistencyLevel.QUORUM` (and is successful), then a
|
|
:attr:`~.ConsistencyLevel.QUORUM` read is guaranteed to see that write.
|
|
But if the regular :attr:`~.consistency_level` of that write is
|
|
:attr:`~.ConsistencyLevel.ANY`, then only a read with a
|
|
:attr:`~.consistency_level` of :attr:`~.ConsistencyLevel.SERIAL` is
|
|
guaranteed to see it (even a read with consistency
|
|
:attr:`~.ConsistencyLevel.ALL` is not guaranteed to be enough).
|
|
|
|
The serial consistency can only be one of :attr:`~.ConsistencyLevel.SERIAL`
|
|
or :attr:`~.ConsistencyLevel.LOCAL_SERIAL`. While ``SERIAL`` guarantees full
|
|
linearizability (with other ``SERIAL`` updates), ``LOCAL_SERIAL`` only
|
|
guarantees it in the local data center.
|
|
|
|
The serial consistency level is ignored for any query that is not a
|
|
conditional update. Serial reads should use the regular
|
|
:attr:`consistency_level`.
|
|
|
|
Serial consistency levels may only be used against Cassandra 2.0+
|
|
and the :attr:`~.Cluster.protocol_version` must be set to 2 or higher.
|
|
|
|
See :doc:`/lwt` for a discussion on how to work with results returned from
|
|
conditional statements.
|
|
|
|
.. versionadded:: 2.0.0
|
|
""")
|
|
|
|
|
|
class SimpleStatement(Statement):
|
|
"""
|
|
A simple, un-prepared query.
|
|
"""
|
|
|
|
def __init__(self, query_string, retry_policy=None, consistency_level=None, routing_key=None,
|
|
serial_consistency_level=None, fetch_size=FETCH_SIZE_UNSET, keyspace=None,
|
|
custom_payload=None, is_idempotent=False):
|
|
"""
|
|
`query_string` should be a literal CQL statement with the exception
|
|
of parameter placeholders that will be filled through the
|
|
`parameters` argument of :meth:`.Session.execute()`.
|
|
|
|
See :class:`Statement` attributes for a description of the other parameters.
|
|
"""
|
|
Statement.__init__(self, retry_policy, consistency_level, routing_key,
|
|
serial_consistency_level, fetch_size, keyspace, custom_payload, is_idempotent)
|
|
self._query_string = query_string
|
|
|
|
@property
|
|
def query_string(self):
|
|
return self._query_string
|
|
|
|
def __str__(self):
|
|
consistency = ConsistencyLevel.value_to_name.get(self.consistency_level, 'Not Set')
|
|
return (u'<SimpleStatement query="%s", consistency=%s>' %
|
|
(self.query_string, consistency))
|
|
__repr__ = __str__
|
|
|
|
|
|
class PreparedStatement(object):
|
|
"""
|
|
A statement that has been prepared against at least one Cassandra node.
|
|
Instances of this class should not be created directly, but through
|
|
:meth:`.Session.prepare()`.
|
|
|
|
A :class:`.PreparedStatement` should be prepared only once. Re-preparing a statement
|
|
may affect performance (as the operation requires a network roundtrip).
|
|
"""
|
|
|
|
column_metadata = None #TODO: make this bind_metadata in next major
|
|
consistency_level = None
|
|
custom_payload = None
|
|
fetch_size = FETCH_SIZE_UNSET
|
|
keyspace = None # change to prepared_keyspace in major release
|
|
protocol_version = None
|
|
query_id = None
|
|
query_string = None
|
|
result_metadata = None
|
|
routing_key_indexes = None
|
|
_routing_key_index_set = None
|
|
serial_consistency_level = None
|
|
|
|
def __init__(self, column_metadata, query_id, routing_key_indexes, query,
|
|
keyspace, protocol_version, result_metadata):
|
|
self.column_metadata = column_metadata
|
|
self.query_id = query_id
|
|
self.routing_key_indexes = routing_key_indexes
|
|
self.query_string = query
|
|
self.keyspace = keyspace
|
|
self.protocol_version = protocol_version
|
|
self.result_metadata = result_metadata
|
|
self.is_idempotent = False
|
|
|
|
@classmethod
|
|
def from_message(cls, query_id, column_metadata, pk_indexes, cluster_metadata,
|
|
query, prepared_keyspace, protocol_version, result_metadata):
|
|
if not column_metadata:
|
|
return PreparedStatement(column_metadata, query_id, None, query, prepared_keyspace, protocol_version, result_metadata)
|
|
|
|
if pk_indexes:
|
|
routing_key_indexes = pk_indexes
|
|
else:
|
|
routing_key_indexes = None
|
|
|
|
first_col = column_metadata[0]
|
|
ks_meta = cluster_metadata.keyspaces.get(first_col.keyspace_name)
|
|
if ks_meta:
|
|
table_meta = ks_meta.tables.get(first_col.table_name)
|
|
if table_meta:
|
|
partition_key_columns = table_meta.partition_key
|
|
|
|
# make a map of {column_name: index} for each column in the statement
|
|
statement_indexes = dict((c.name, i) for i, c in enumerate(column_metadata))
|
|
|
|
# a list of which indexes in the statement correspond to partition key items
|
|
try:
|
|
routing_key_indexes = [statement_indexes[c.name]
|
|
for c in partition_key_columns]
|
|
except KeyError: # we're missing a partition key component in the prepared
|
|
pass # statement; just leave routing_key_indexes as None
|
|
|
|
return PreparedStatement(column_metadata, query_id, routing_key_indexes,
|
|
query, prepared_keyspace, protocol_version, result_metadata)
|
|
|
|
def bind(self, values):
|
|
"""
|
|
Creates and returns a :class:`BoundStatement` instance using `values`.
|
|
|
|
See :meth:`BoundStatement.bind` for rules on input ``values``.
|
|
"""
|
|
return BoundStatement(self).bind(values)
|
|
|
|
def is_routing_key_index(self, i):
|
|
if self._routing_key_index_set is None:
|
|
self._routing_key_index_set = set(self.routing_key_indexes) if self.routing_key_indexes else set()
|
|
return i in self._routing_key_index_set
|
|
|
|
def __str__(self):
|
|
consistency = ConsistencyLevel.value_to_name.get(self.consistency_level, 'Not Set')
|
|
return (u'<PreparedStatement query="%s", consistency=%s>' %
|
|
(self.query_string, consistency))
|
|
__repr__ = __str__
|
|
|
|
|
|
class BoundStatement(Statement):
|
|
"""
|
|
A prepared statement that has been bound to a particular set of values.
|
|
These may be created directly or through :meth:`.PreparedStatement.bind()`.
|
|
"""
|
|
|
|
prepared_statement = None
|
|
"""
|
|
The :class:`PreparedStatement` instance that this was created from.
|
|
"""
|
|
|
|
values = None
|
|
"""
|
|
The sequence of values that were bound to the prepared statement.
|
|
"""
|
|
|
|
def __init__(self, prepared_statement, retry_policy=None, consistency_level=None, routing_key=None,
|
|
serial_consistency_level=None, fetch_size=FETCH_SIZE_UNSET, keyspace=None,
|
|
custom_payload=None):
|
|
"""
|
|
`prepared_statement` should be an instance of :class:`PreparedStatement`.
|
|
|
|
See :class:`Statement` attributes for a description of the other parameters.
|
|
"""
|
|
self.prepared_statement = prepared_statement
|
|
|
|
self.consistency_level = prepared_statement.consistency_level
|
|
self.serial_consistency_level = prepared_statement.serial_consistency_level
|
|
self.fetch_size = prepared_statement.fetch_size
|
|
self.custom_payload = prepared_statement.custom_payload
|
|
self.is_idempotent = prepared_statement.is_idempotent
|
|
self.values = []
|
|
|
|
meta = prepared_statement.column_metadata
|
|
if meta:
|
|
self.keyspace = meta[0].keyspace_name
|
|
|
|
Statement.__init__(self, retry_policy, consistency_level, routing_key,
|
|
serial_consistency_level, fetch_size, keyspace, custom_payload)
|
|
|
|
def bind(self, values):
|
|
"""
|
|
Binds a sequence of values for the prepared statement parameters
|
|
and returns this instance. Note that `values` *must* be:
|
|
|
|
* a sequence, even if you are only binding one value, or
|
|
* a dict that relates 1-to-1 between dict keys and columns
|
|
|
|
.. versionchanged:: 2.6.0
|
|
|
|
:data:`~.UNSET_VALUE` was introduced. These can be bound as positional parameters
|
|
in a sequence, or by name in a dict. Additionally, when using protocol v4+:
|
|
|
|
* short sequences will be extended to match bind parameters with UNSET_VALUE
|
|
* names may be omitted from a dict with UNSET_VALUE implied.
|
|
|
|
.. versionchanged:: 3.0.0
|
|
|
|
method will not throw if extra keys are present in bound dict (PYTHON-178)
|
|
"""
|
|
if values is None:
|
|
values = ()
|
|
proto_version = self.prepared_statement.protocol_version
|
|
col_meta = self.prepared_statement.column_metadata
|
|
|
|
# special case for binding dicts
|
|
if isinstance(values, dict):
|
|
values_dict = values
|
|
values = []
|
|
|
|
# sort values accordingly
|
|
for col in col_meta:
|
|
try:
|
|
values.append(values_dict[col.name])
|
|
except KeyError:
|
|
if proto_version >= 4:
|
|
values.append(UNSET_VALUE)
|
|
else:
|
|
raise KeyError(
|
|
'Column name `%s` not found in bound dict.' %
|
|
(col.name))
|
|
|
|
value_len = len(values)
|
|
col_meta_len = len(col_meta)
|
|
|
|
if value_len > col_meta_len:
|
|
raise ValueError(
|
|
"Too many arguments provided to bind() (got %d, expected %d)" %
|
|
(len(values), len(col_meta)))
|
|
|
|
# this is fail-fast for clarity pre-v4. When v4 can be assumed,
|
|
# the error will be better reported when UNSET_VALUE is implicitly added.
|
|
if proto_version < 4 and self.prepared_statement.routing_key_indexes and \
|
|
value_len < len(self.prepared_statement.routing_key_indexes):
|
|
raise ValueError(
|
|
"Too few arguments provided to bind() (got %d, required %d for routing key)" %
|
|
(value_len, len(self.prepared_statement.routing_key_indexes)))
|
|
|
|
self.raw_values = values
|
|
self.values = []
|
|
for value, col_spec in zip(values, col_meta):
|
|
if value is None:
|
|
self.values.append(None)
|
|
elif value is UNSET_VALUE:
|
|
if proto_version >= 4:
|
|
self._append_unset_value()
|
|
else:
|
|
raise ValueError("Attempt to bind UNSET_VALUE while using unsuitable protocol version (%d < 4)" % proto_version)
|
|
else:
|
|
try:
|
|
self.values.append(col_spec.type.serialize(value, proto_version))
|
|
except (TypeError, struct.error) as exc:
|
|
actual_type = type(value)
|
|
message = ('Received an argument of invalid type for column "%s". '
|
|
'Expected: %s, Got: %s; (%s)' % (col_spec.name, col_spec.type, actual_type, exc))
|
|
raise TypeError(message)
|
|
|
|
if proto_version >= 4:
|
|
diff = col_meta_len - len(self.values)
|
|
if diff:
|
|
for _ in range(diff):
|
|
self._append_unset_value()
|
|
|
|
return self
|
|
|
|
def _append_unset_value(self):
|
|
next_index = len(self.values)
|
|
if self.prepared_statement.is_routing_key_index(next_index):
|
|
col_meta = self.prepared_statement.column_metadata[next_index]
|
|
raise ValueError("Cannot bind UNSET_VALUE as a part of the routing key '%s'" % col_meta.name)
|
|
self.values.append(UNSET_VALUE)
|
|
|
|
@property
|
|
def routing_key(self):
|
|
if not self.prepared_statement.routing_key_indexes:
|
|
return None
|
|
|
|
if self._routing_key is not None:
|
|
return self._routing_key
|
|
|
|
routing_indexes = self.prepared_statement.routing_key_indexes
|
|
if len(routing_indexes) == 1:
|
|
self._routing_key = self.values[routing_indexes[0]]
|
|
else:
|
|
self._routing_key = b"".join(self._key_parts_packed(self.values[i] for i in routing_indexes))
|
|
|
|
return self._routing_key
|
|
|
|
def __str__(self):
|
|
consistency = ConsistencyLevel.value_to_name.get(self.consistency_level, 'Not Set')
|
|
return (u'<BoundStatement query="%s", values=%s, consistency=%s>' %
|
|
(self.prepared_statement.query_string, self.raw_values, consistency))
|
|
__repr__ = __str__
|
|
|
|
|
|
class BatchType(object):
|
|
"""
|
|
A BatchType is used with :class:`.BatchStatement` instances to control
|
|
the atomicity of the batch operation.
|
|
|
|
.. versionadded:: 2.0.0
|
|
"""
|
|
|
|
LOGGED = None
|
|
"""
|
|
Atomic batch operation.
|
|
"""
|
|
|
|
UNLOGGED = None
|
|
"""
|
|
Non-atomic batch operation.
|
|
"""
|
|
|
|
COUNTER = None
|
|
"""
|
|
Batches of counter operations.
|
|
"""
|
|
|
|
def __init__(self, name, value):
|
|
self.name = name
|
|
self.value = value
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
def __repr__(self):
|
|
return "BatchType.%s" % (self.name, )
|
|
|
|
|
|
BatchType.LOGGED = BatchType("LOGGED", 0)
|
|
BatchType.UNLOGGED = BatchType("UNLOGGED", 1)
|
|
BatchType.COUNTER = BatchType("COUNTER", 2)
|
|
|
|
|
|
class BatchStatement(Statement):
|
|
"""
|
|
A protocol-level batch of operations which are applied atomically
|
|
by default.
|
|
|
|
.. versionadded:: 2.0.0
|
|
"""
|
|
|
|
batch_type = None
|
|
"""
|
|
The :class:`.BatchType` for the batch operation. Defaults to
|
|
:attr:`.BatchType.LOGGED`.
|
|
"""
|
|
|
|
serial_consistency_level = None
|
|
"""
|
|
The same as :attr:`.Statement.serial_consistency_level`, but is only
|
|
supported when using protocol version 3 or higher.
|
|
"""
|
|
|
|
_statements_and_parameters = None
|
|
_session = None
|
|
|
|
def __init__(self, batch_type=BatchType.LOGGED, retry_policy=None,
|
|
consistency_level=None, serial_consistency_level=None,
|
|
session=None, custom_payload=None):
|
|
"""
|
|
`batch_type` specifies The :class:`.BatchType` for the batch operation.
|
|
Defaults to :attr:`.BatchType.LOGGED`.
|
|
|
|
`retry_policy` should be a :class:`~.RetryPolicy` instance for
|
|
controlling retries on the operation.
|
|
|
|
`consistency_level` should be a :class:`~.ConsistencyLevel` value
|
|
to be used for all operations in the batch.
|
|
|
|
`custom_payload` is a :ref:`custom_payload` passed to the server.
|
|
Note: as Statement objects are added to the batch, this map is
|
|
updated with any values found in their custom payloads. These are
|
|
only allowed when using protocol version 4 or higher.
|
|
|
|
Example usage:
|
|
|
|
.. code-block:: python
|
|
|
|
insert_user = session.prepare("INSERT INTO users (name, age) VALUES (?, ?)")
|
|
batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
|
|
|
|
for (name, age) in users_to_insert:
|
|
batch.add(insert_user, (name, age))
|
|
|
|
session.execute(batch)
|
|
|
|
You can also mix different types of operations within a batch:
|
|
|
|
.. code-block:: python
|
|
|
|
batch = BatchStatement()
|
|
batch.add(SimpleStatement("INSERT INTO users (name, age) VALUES (%s, %s)"), (name, age))
|
|
batch.add(SimpleStatement("DELETE FROM pending_users WHERE name=%s"), (name,))
|
|
session.execute(batch)
|
|
|
|
.. versionadded:: 2.0.0
|
|
|
|
.. versionchanged:: 2.1.0
|
|
Added `serial_consistency_level` as a parameter
|
|
|
|
.. versionchanged:: 2.6.0
|
|
Added `custom_payload` as a parameter
|
|
"""
|
|
self.batch_type = batch_type
|
|
self._statements_and_parameters = []
|
|
self._session = session
|
|
Statement.__init__(self, retry_policy=retry_policy, consistency_level=consistency_level,
|
|
serial_consistency_level=serial_consistency_level, custom_payload=custom_payload)
|
|
|
|
def clear(self):
|
|
"""
|
|
This is a convenience method to clear a batch statement for reuse.
|
|
|
|
*Note:* it should not be used concurrently with uncompleted execution futures executing the same
|
|
``BatchStatement``.
|
|
"""
|
|
del self._statements_and_parameters[:]
|
|
self.keyspace = None
|
|
self.routing_key = None
|
|
if self.custom_payload:
|
|
self.custom_payload.clear()
|
|
|
|
def add(self, statement, parameters=None):
|
|
"""
|
|
Adds a :class:`.Statement` and optional sequence of parameters
|
|
to be used with the statement to the batch.
|
|
|
|
Like with other statements, parameters must be a sequence, even
|
|
if there is only one item.
|
|
"""
|
|
if isinstance(statement, six.string_types):
|
|
if parameters:
|
|
encoder = Encoder() if self._session is None else self._session.encoder
|
|
statement = bind_params(statement, parameters, encoder)
|
|
self._add_statement_and_params(False, statement, ())
|
|
elif isinstance(statement, PreparedStatement):
|
|
query_id = statement.query_id
|
|
bound_statement = statement.bind(() if parameters is None else parameters)
|
|
self._update_state(bound_statement)
|
|
self._add_statement_and_params(True, query_id, bound_statement.values)
|
|
elif isinstance(statement, BoundStatement):
|
|
if parameters:
|
|
raise ValueError(
|
|
"Parameters cannot be passed with a BoundStatement "
|
|
"to BatchStatement.add()")
|
|
self._update_state(statement)
|
|
self._add_statement_and_params(True, statement.prepared_statement.query_id, statement.values)
|
|
else:
|
|
# it must be a SimpleStatement
|
|
query_string = statement.query_string
|
|
if parameters:
|
|
encoder = Encoder() if self._session is None else self._session.encoder
|
|
query_string = bind_params(query_string, parameters, encoder)
|
|
self._update_state(statement)
|
|
self._add_statement_and_params(False, query_string, ())
|
|
return self
|
|
|
|
def add_all(self, statements, parameters):
|
|
"""
|
|
Adds a sequence of :class:`.Statement` objects and a matching sequence
|
|
of parameters to the batch. Statement and parameter sequences must be of equal length or
|
|
one will be truncated. :const:`None` can be used in the parameters position where are needed.
|
|
"""
|
|
for statement, value in zip(statements, parameters):
|
|
self.add(statement, value)
|
|
|
|
def _add_statement_and_params(self, is_prepared, statement, parameters):
|
|
if len(self._statements_and_parameters) >= 0xFFFF:
|
|
raise ValueError("Batch statement cannot contain more than %d statements." % 0xFFFF)
|
|
self._statements_and_parameters.append((is_prepared, statement, parameters))
|
|
|
|
def _maybe_set_routing_attributes(self, statement):
|
|
if self.routing_key is None:
|
|
if statement.keyspace and statement.routing_key:
|
|
self.routing_key = statement.routing_key
|
|
self.keyspace = statement.keyspace
|
|
|
|
def _update_custom_payload(self, statement):
|
|
if statement.custom_payload:
|
|
if self.custom_payload is None:
|
|
self.custom_payload = {}
|
|
self.custom_payload.update(statement.custom_payload)
|
|
|
|
def _update_state(self, statement):
|
|
self._maybe_set_routing_attributes(statement)
|
|
self._update_custom_payload(statement)
|
|
|
|
def __len__(self):
|
|
return len(self._statements_and_parameters)
|
|
|
|
def __str__(self):
|
|
consistency = ConsistencyLevel.value_to_name.get(self.consistency_level, 'Not Set')
|
|
return (u'<BatchStatement type=%s, statements=%d, consistency=%s>' %
|
|
(self.batch_type, len(self), consistency))
|
|
__repr__ = __str__
|
|
|
|
|
|
ValueSequence = cassandra.encoder.ValueSequence
|
|
"""
|
|
A wrapper class that is used to specify that a sequence of values should
|
|
be treated as a CQL list of values instead of a single column collection when used
|
|
as part of the `parameters` argument for :meth:`.Session.execute()`.
|
|
|
|
This is typically needed when supplying a list of keys to select.
|
|
For example::
|
|
|
|
>>> my_user_ids = ('alice', 'bob', 'charles')
|
|
>>> query = "SELECT * FROM users WHERE user_id IN %s"
|
|
>>> session.execute(query, parameters=[ValueSequence(my_user_ids)])
|
|
|
|
"""
|
|
|
|
|
|
def bind_params(query, params, encoder):
|
|
if six.PY2 and isinstance(query, six.text_type):
|
|
query = query.encode('utf-8')
|
|
if isinstance(params, dict):
|
|
return query % dict((k, encoder.cql_encode_all_types(v)) for k, v in six.iteritems(params))
|
|
else:
|
|
return query % tuple(encoder.cql_encode_all_types(v) for v in params)
|
|
|
|
|
|
class TraceUnavailable(Exception):
|
|
"""
|
|
Raised when complete trace details cannot be fetched from Cassandra.
|
|
"""
|
|
pass
|
|
|
|
|
|
class QueryTrace(object):
|
|
"""
|
|
A trace of the duration and events that occurred when executing
|
|
an operation.
|
|
"""
|
|
|
|
trace_id = None
|
|
"""
|
|
:class:`uuid.UUID` unique identifier for this tracing session. Matches
|
|
the ``session_id`` column in ``system_traces.sessions`` and
|
|
``system_traces.events``.
|
|
"""
|
|
|
|
request_type = None
|
|
"""
|
|
A string that very generally describes the traced operation.
|
|
"""
|
|
|
|
duration = None
|
|
"""
|
|
A :class:`datetime.timedelta` measure of the duration of the query.
|
|
"""
|
|
|
|
client = None
|
|
"""
|
|
The IP address of the client that issued this request
|
|
|
|
This is only available when using Cassandra 2.2+
|
|
"""
|
|
|
|
coordinator = None
|
|
"""
|
|
The IP address of the host that acted as coordinator for this request.
|
|
"""
|
|
|
|
parameters = None
|
|
"""
|
|
A :class:`dict` of parameters for the traced operation, such as the
|
|
specific query string.
|
|
"""
|
|
|
|
started_at = None
|
|
"""
|
|
A UTC :class:`datetime.datetime` object describing when the operation
|
|
was started.
|
|
"""
|
|
|
|
events = None
|
|
"""
|
|
A chronologically sorted list of :class:`.TraceEvent` instances
|
|
representing the steps the traced operation went through. This
|
|
corresponds to the rows in ``system_traces.events`` for this tracing
|
|
session.
|
|
"""
|
|
|
|
_session = None
|
|
|
|
_SELECT_SESSIONS_FORMAT = "SELECT * FROM system_traces.sessions WHERE session_id = %s"
|
|
_SELECT_EVENTS_FORMAT = "SELECT * FROM system_traces.events WHERE session_id = %s"
|
|
_BASE_RETRY_SLEEP = 0.003
|
|
|
|
def __init__(self, trace_id, session):
|
|
self.trace_id = trace_id
|
|
self._session = session
|
|
|
|
def populate(self, max_wait=2.0, wait_for_complete=True, query_cl=None):
|
|
"""
|
|
Retrieves the actual tracing details from Cassandra and populates the
|
|
attributes of this instance. Because tracing details are stored
|
|
asynchronously by Cassandra, this may need to retry the session
|
|
detail fetch. If the trace is still not available after `max_wait`
|
|
seconds, :exc:`.TraceUnavailable` will be raised; if `max_wait` is
|
|
:const:`None`, this will retry forever.
|
|
|
|
`wait_for_complete=False` bypasses the wait for duration to be populated.
|
|
This can be used to query events from partial sessions.
|
|
|
|
`query_cl` specifies a consistency level to use for polling the trace tables,
|
|
if it should be different than the session default.
|
|
"""
|
|
attempt = 0
|
|
start = time.time()
|
|
while True:
|
|
time_spent = time.time() - start
|
|
if max_wait is not None and time_spent >= max_wait:
|
|
raise TraceUnavailable(
|
|
"Trace information was not available within %f seconds. Consider raising Session.max_trace_wait." % (max_wait,))
|
|
|
|
log.debug("Attempting to fetch trace info for trace ID: %s", self.trace_id)
|
|
session_results = self._execute(
|
|
SimpleStatement(self._SELECT_SESSIONS_FORMAT, consistency_level=query_cl), (self.trace_id,), time_spent, max_wait)
|
|
|
|
# PYTHON-730: There is race condition that the duration mutation is written before started_at the for fast queries
|
|
is_complete = session_results and session_results[0].duration is not None and session_results[0].started_at is not None
|
|
if not session_results or (wait_for_complete and not is_complete):
|
|
time.sleep(self._BASE_RETRY_SLEEP * (2 ** attempt))
|
|
attempt += 1
|
|
continue
|
|
if is_complete:
|
|
log.debug("Fetched trace info for trace ID: %s", self.trace_id)
|
|
else:
|
|
log.debug("Fetching parital trace info for trace ID: %s", self.trace_id)
|
|
|
|
session_row = session_results[0]
|
|
self.request_type = session_row.request
|
|
self.duration = timedelta(microseconds=session_row.duration) if is_complete else None
|
|
self.started_at = session_row.started_at
|
|
self.coordinator = session_row.coordinator
|
|
self.parameters = session_row.parameters
|
|
# since C* 2.2
|
|
self.client = getattr(session_row, 'client', None)
|
|
|
|
log.debug("Attempting to fetch trace events for trace ID: %s", self.trace_id)
|
|
time_spent = time.time() - start
|
|
event_results = self._execute(
|
|
SimpleStatement(self._SELECT_EVENTS_FORMAT, consistency_level=query_cl), (self.trace_id,), time_spent, max_wait)
|
|
log.debug("Fetched trace events for trace ID: %s", self.trace_id)
|
|
self.events = tuple(TraceEvent(r.activity, r.event_id, r.source, r.source_elapsed, r.thread)
|
|
for r in event_results)
|
|
break
|
|
|
|
def _execute(self, query, parameters, time_spent, max_wait):
|
|
timeout = (max_wait - time_spent) if max_wait is not None else None
|
|
future = self._session._create_response_future(query, parameters, trace=False, custom_payload=None, timeout=timeout)
|
|
# in case the user switched the row factory, set it to namedtuple for this query
|
|
future.row_factory = named_tuple_factory
|
|
future.send_request()
|
|
|
|
try:
|
|
return future.result()
|
|
except OperationTimedOut:
|
|
raise TraceUnavailable("Trace information was not available within %f seconds" % (max_wait,))
|
|
|
|
def __str__(self):
|
|
return "%s [%s] coordinator: %s, started at: %s, duration: %s, parameters: %s" \
|
|
% (self.request_type, self.trace_id, self.coordinator, self.started_at,
|
|
self.duration, self.parameters)
|
|
|
|
|
|
class TraceEvent(object):
|
|
"""
|
|
Representation of a single event within a query trace.
|
|
"""
|
|
|
|
description = None
|
|
"""
|
|
A brief description of the event.
|
|
"""
|
|
|
|
datetime = None
|
|
"""
|
|
A UTC :class:`datetime.datetime` marking when the event occurred.
|
|
"""
|
|
|
|
source = None
|
|
"""
|
|
The IP address of the node this event occurred on.
|
|
"""
|
|
|
|
source_elapsed = None
|
|
"""
|
|
A :class:`datetime.timedelta` measuring the amount of time until
|
|
this event occurred starting from when :attr:`.source` first
|
|
received the query.
|
|
"""
|
|
|
|
thread_name = None
|
|
"""
|
|
The name of the thread that this event occurred on.
|
|
"""
|
|
|
|
def __init__(self, description, timeuuid, source, source_elapsed, thread_name):
|
|
self.description = description
|
|
self.datetime = datetime.utcfromtimestamp(unix_time_from_uuid1(timeuuid))
|
|
self.source = source
|
|
if source_elapsed is not None:
|
|
self.source_elapsed = timedelta(microseconds=source_elapsed)
|
|
else:
|
|
self.source_elapsed = None
|
|
self.thread_name = thread_name
|
|
|
|
def __str__(self):
|
|
return "%s on %s[%s] at %s" % (self.description, self.source, self.thread_name, self.datetime)
|