# Copyright 2013-2017 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
from datetime import datetime, timedelta
from functools import partial
import time
import six
from warnings import warn

from cassandra.query import SimpleStatement
from cassandra.cqlengine import columns, CQLEngineException, ValidationError, UnicodeMixin
from cassandra.cqlengine import connection as conn
from cassandra.cqlengine.functions import Token, BaseQueryFunction, QueryValue
from cassandra.cqlengine.operators import (InOperator, EqualsOperator, GreaterThanOperator,
                                           GreaterThanOrEqualOperator, LessThanOperator,
                                           LessThanOrEqualOperator, ContainsOperator, BaseWhereOperator)
from cassandra.cqlengine.statements import (WhereClause, SelectStatement, DeleteStatement,
                                            UpdateStatement, InsertStatement,
                                            BaseCQLStatement, MapDeleteClause, ConditionalClause)


class QueryException(CQLEngineException):
    pass


class IfNotExistsWithCounterColumn(CQLEngineException):
    pass


class IfExistsWithCounterColumn(CQLEngineException):
    pass


class LWTException(CQLEngineException):
    """Lightweight conditional exception.

    This exception will be raised when a write using an `IF` clause could not be
    applied due to existing data violating the condition. The existing data is
    available through the ``existing`` attribute.

    :param existing: The current state of the data which prevented the write.
    """
    def __init__(self, existing):
        super(LWTException, self).__init__("LWT Query was not applied")
        self.existing = existing


class DoesNotExist(QueryException):
    pass


class MultipleObjectsReturned(QueryException):
    pass


def check_applied(result):
    """
    Raises LWTException if it looks like a failed LWT request.
    """
    try:
        applied = result.was_applied
    except Exception:
        applied = True  # result was not LWT form
    if not applied:
        raise LWTException(result[0])


class AbstractQueryableColumn(UnicodeMixin):
    """
    exposes cql query operators through pythons
    builtin comparator symbols
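
    For example (an illustrative sketch, assuming an ``Automobile`` model whose
    columns are exposed through this interface; each comparison below builds a
    ``WhereClause`` that can be passed positionally to ``filter()``):

    .. code-block:: python

        Automobile.objects.filter(Automobile.manufacturer == 'Tesla')
        Automobile.objects.filter(Automobile.year >= 2012)
        Automobile.objects.filter(Automobile.manufacturer.in_(['Tesla', 'Honda']))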
    """

    def _get_column(self):
        raise NotImplementedError

    def __unicode__(self):
        raise NotImplementedError

    def _to_database(self, val):
        if isinstance(val, QueryValue):
            return val
        else:
            return self._get_column().to_database(val)

    def in_(self, item):
        """
        Returns an in operator

        used where you'd typically want to use python's `in` operator
        """
        return WhereClause(six.text_type(self), InOperator(), item)

    def contains_(self, item):
        """
        Returns a CONTAINS operator
        """
        return WhereClause(six.text_type(self), ContainsOperator(), item)

    def __eq__(self, other):
        return WhereClause(six.text_type(self), EqualsOperator(), self._to_database(other))

    def __gt__(self, other):
        return WhereClause(six.text_type(self), GreaterThanOperator(), self._to_database(other))

    def __ge__(self, other):
        return WhereClause(six.text_type(self), GreaterThanOrEqualOperator(), self._to_database(other))

    def __lt__(self, other):
        return WhereClause(six.text_type(self), LessThanOperator(), self._to_database(other))

    def __le__(self, other):
        return WhereClause(six.text_type(self), LessThanOrEqualOperator(), self._to_database(other))


class BatchType(object):
    Unlogged = 'UNLOGGED'
    Counter = 'COUNTER'


class BatchQuery(object):
    """
    Handles the batching of queries

    http://docs.datastax.com/en/cql/3.0/cql/cql_reference/batch_r.html

    See :doc:`/cqlengine/batches` for more details.
    """
    warn_multiple_exec = True

    _consistency = None

    _connection = None
    _connection_explicit = False

    def __init__(self, batch_type=None, timestamp=None, consistency=None, execute_on_exception=False,
                 timeout=conn.NOT_SET, connection=None):
        """
        :param batch_type: (optional) One of batch type values available through BatchType enum
        :type batch_type: str or None
        :param timestamp: (optional) A datetime or timedelta object with desired timestamp to be applied
            to the batch conditional.
        :type timestamp: datetime or timedelta or None
        :param consistency: (optional) One of consistency values ("ANY", "ONE", "QUORUM" etc)
        :type consistency: The :class:`.ConsistencyLevel` to be used for the batch query, or None.
        :param execute_on_exception: (Defaults to False) Indicates that when the BatchQuery instance is used
            as a context manager the queries accumulated within the context must be executed despite
            encountering an error within the context. By default, any exception raised from within
            the context scope will cause the batched queries not to be executed.
        :type execute_on_exception: bool
        :param timeout: (optional) Timeout for the entire batch (in seconds), if not specified fallback
            to default session timeout
        :type timeout: float or None
        :param str connection: Connection name to use for the batch execution
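
        A minimal usage sketch (assuming a ``User`` model is defined elsewhere;
        the batch is sent when the ``with`` block exits):

        .. code-block:: python

            with BatchQuery(batch_type=BatchType.Unlogged) as b:
                User.batch(b).create(id=1, name='jon')
                User.batch(b).create(id=2, name='jane')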
        """
        self.queries = []
        self.batch_type = batch_type
        if timestamp is not None and not isinstance(timestamp, (datetime, timedelta)):
            raise CQLEngineException('timestamp object must be an instance of datetime or timedelta')
        self.timestamp = timestamp
        self._consistency = consistency
        self._execute_on_exception = execute_on_exception
        self._timeout = timeout
        self._callbacks = []
        self._executed = False
        self._context_entered = False
        self._connection = connection
        if connection:
            self._connection_explicit = True

    def add_query(self, query):
        if not isinstance(query, BaseCQLStatement):
            raise CQLEngineException('only BaseCQLStatements can be added to a batch query')
        self.queries.append(query)

    def consistency(self, consistency):
        self._consistency = consistency

    def _execute_callbacks(self):
        for callback, args, kwargs in self._callbacks:
            callback(*args, **kwargs)

    def add_callback(self, fn, *args, **kwargs):
        """Add a function and arguments to be passed to it to be executed after the batch executes.

        A batch can support multiple callbacks.

        Note, that if the batch does not execute, the callbacks are not executed.
        A callback, thus, is an "on batch success" handler.

        :param fn: Callable object
        :type fn: callable
        :param \*args: Positional arguments to be passed to the callback at the time of execution
        :param \*\*kwargs: Named arguments to be passed to the callback at the time of execution
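
        For example (a sketch with a hypothetical ``notify`` function; the
        callback only fires after a successful execution):

        .. code-block:: python

            def notify(msg):
                print(msg)

            with BatchQuery() as b:
                b.add_callback(notify, "batch applied")
                User.batch(b).create(id=3, name='ben')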
        """
        if not callable(fn):
            raise ValueError("Value for argument 'fn' is {0} and is not a callable object.".format(type(fn)))
        self._callbacks.append((fn, args, kwargs))

    def execute(self):
        if self._executed and self.warn_multiple_exec:
            msg = "Batch executed multiple times."
            if self._context_entered:
                msg += " If using the batch as a context manager, there is no need to call execute directly."
            warn(msg)
        self._executed = True

        if len(self.queries) == 0:
            # Empty batch is a no-op
            # except for callbacks
            self._execute_callbacks()
            return

        opener = 'BEGIN ' + (self.batch_type + ' ' if self.batch_type else '') + 'BATCH'
        if self.timestamp:

            if isinstance(self.timestamp, six.integer_types):
                ts = self.timestamp
            elif isinstance(self.timestamp, (datetime, timedelta)):
                ts = self.timestamp
                if isinstance(self.timestamp, timedelta):
                    ts += datetime.now()  # Apply timedelta
                ts = int(time.mktime(ts.timetuple()) * 1e+6 + ts.microsecond)
            else:
                raise ValueError("Batch expects a long, a timedelta, or a datetime")

            opener += ' USING TIMESTAMP {0}'.format(ts)

        query_list = [opener]
        parameters = {}
        ctx_counter = 0
        for query in self.queries:
            query.update_context_id(ctx_counter)
            ctx = query.get_context()
            ctx_counter += len(ctx)
            query_list.append(' ' + str(query))
            parameters.update(ctx)

        query_list.append('APPLY BATCH;')

        tmp = conn.execute('\n'.join(query_list), parameters, self._consistency, self._timeout, connection=self._connection)
        check_applied(tmp)

        self.queries = []
        self._execute_callbacks()

    def __enter__(self):
        self._context_entered = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # don't execute if there was an exception by default
        if exc_type is not None and not self._execute_on_exception:
            return
        self.execute()


class ContextQuery(object):
    """
    A Context manager to allow a Model to switch context easily. Presently, the context only
    specifies a keyspace for model IO.

    :param \*args: One or more models. A model should be a class type, not an instance.
    :param \*\*kwargs: (optional) Context parameters: can be *keyspace* or *connection*

    For example:

    .. code-block:: python

        with ContextQuery(Automobile, keyspace='test2') as A:
            A.objects.create(manufacturer='honda', year=2008, model='civic')
            print(len(A.objects.all()))  # 1 result

        with ContextQuery(Automobile, keyspace='test4') as A:
            print(len(A.objects.all()))  # 0 results

        # Multiple models
        with ContextQuery(Automobile, Automobile2, connection='cluster2') as (A, A2):
            print(len(A.objects.all()))
            print(len(A2.objects.all()))

    """

    def __init__(self, *args, **kwargs):
        from cassandra.cqlengine import models

        self.models = []

        if len(args) < 1:
            raise ValueError("No model provided.")

        keyspace = kwargs.pop('keyspace', None)
        connection = kwargs.pop('connection', None)

        if kwargs:
            raise ValueError("Unknown keyword argument(s): {0}".format(
                ','.join(kwargs.keys())))

        for model in args:
            try:
                is_model = issubclass(model, models.Model)
            except TypeError:
                is_model = False
            if not is_model:
                raise ValueError("Models must be derived from base Model.")

            m = models._clone_model_class(model, {})

            if keyspace:
                m.__keyspace__ = keyspace
            if connection:
                m.__connection__ = connection

            self.models.append(m)

    def __enter__(self):
        if len(self.models) > 1:
            return tuple(self.models)
        return self.models[0]

    def __exit__(self, exc_type, exc_val, exc_tb):
        return


class AbstractQuerySet(object):

    def __init__(self, model):
        super(AbstractQuerySet, self).__init__()
        self.model = model

        # Where clause filters
        self._where = []

        # Conditional clause filters
        self._conditional = []

        # ordering arguments
        self._order = []

        self._allow_filtering = False

        # CQL has a default limit of 10000, it's defined here
        # because explicit is better than implicit
        self._limit = 10000

        # We store the fields for which we use the Equal operator
        # in a query, so we don't select it from the DB. _defer_fields
        # will contain the names of the fields in the DB, not the names
        # of the variables used by the mapper
        self._defer_fields = set()
        self._deferred_values = {}

        # This variable will hold the names in the database of the fields
        # for which we want to query
        self._only_fields = []

        self._values_list = False
        self._flat_values_list = False

        # results cache
        self._result_cache = None
        self._result_idx = None
        self._result_generator = None
        self._materialize_results = True

        self._distinct_fields = None

        self._count = None

        self._batch = None
        self._ttl = None
        self._consistency = None
        self._timestamp = None
        self._if_not_exists = False
        self._timeout = conn.NOT_SET
        self._if_exists = False
        self._fetch_size = None
        self._connection = None

    @property
    def column_family_name(self):
        return self.model.column_family_name()

    def _execute(self, statement):
        if self._batch:
            return self._batch.add_query(statement)
        else:
            connection = self._connection or self.model._get_connection()
            result = _execute_statement(self.model, statement, self._consistency, self._timeout, connection=connection)
            if self._if_not_exists or self._if_exists or self._conditional:
                check_applied(result)
            return result

    def __unicode__(self):
        return six.text_type(self._select_query())

    def __str__(self):
        return str(self.__unicode__())

    def __call__(self, *args, **kwargs):
        return self.filter(*args, **kwargs)

    def __deepcopy__(self, memo):
        clone = self.__class__(self.model)
        for k, v in self.__dict__.items():
            if k in ['_con', '_cur', '_result_cache', '_result_idx', '_result_generator', '_construct_result']:  # don't clone these, which are per-request-execution
                clone.__dict__[k] = None
            elif k == '_batch':
                # we need to keep the same batch instance across
                # all queryset clones, otherwise the batched queries
                # fly off into other batch instances which are never
                # executed, thx @dokai
                clone.__dict__[k] = self._batch
            elif k == '_timeout':
                clone.__dict__[k] = self._timeout
            else:
                clone.__dict__[k] = copy.deepcopy(v, memo)

        return clone

    def __len__(self):
        self._execute_query()
        return self.count()

    # ----query generation / execution----

    def _select_fields(self):
        """ returns the fields to select """
        return []

    def _validate_select_where(self):
        """ put select query validation here """

    def _select_query(self):
        """
        Returns a select clause based on the given filter args
        """
        if self._where:
            self._validate_select_where()
        return SelectStatement(
            self.column_family_name,
            fields=self._select_fields(),
            where=self._where,
            order_by=self._order,
            limit=self._limit,
            allow_filtering=self._allow_filtering,
            distinct_fields=self._distinct_fields,
            fetch_size=self._fetch_size
        )

    # ----Reads------

    def _execute_query(self):
        if self._batch:
            raise CQLEngineException("Only inserts, updates, and deletes are available in batch mode")
        if self._result_cache is None:
            self._result_generator = (i for i in self._execute(self._select_query()))
            self._result_cache = []
            self._construct_result = self._maybe_inject_deferred(self._get_result_constructor())

            # "DISTINCT COUNT()" is not supported in C* < 2.2, so we need to materialize all results to get
            # len() and count() working with DISTINCT queries
            if self._materialize_results or self._distinct_fields:
                self._fill_result_cache()

    def _fill_result_cache(self):
        """
        Fill the result cache with all results.
        """

        idx = 0
        try:
            while True:
                idx += 1000
                self._fill_result_cache_to_idx(idx)
        except StopIteration:
            pass

        self._count = len(self._result_cache)

    def _fill_result_cache_to_idx(self, idx):
        self._execute_query()
        if self._result_idx is None:
            self._result_idx = -1

        qty = idx - self._result_idx
        if qty < 1:
            return
        else:
            for idx in range(qty):
                self._result_idx += 1
                while True:
                    try:
                        self._result_cache[self._result_idx] = self._construct_result(self._result_cache[self._result_idx])
                        break
                    except IndexError:
                        self._result_cache.append(next(self._result_generator))

    def __iter__(self):
        self._execute_query()

        idx = 0
        while True:
            if len(self._result_cache) <= idx:
                try:
                    self._result_cache.append(next(self._result_generator))
                except StopIteration:
                    break

            instance = self._result_cache[idx]
            if isinstance(instance, dict):
                self._fill_result_cache_to_idx(idx)
            yield self._result_cache[idx]

            idx += 1

    def __getitem__(self, s):
        self._execute_query()

        if isinstance(s, slice):
            start = s.start if s.start else 0

            # calculate the amount of results that need to be loaded
            end = s.stop
            if start < 0 or s.stop is None or s.stop < 0:
                end = self.count()

            try:
                self._fill_result_cache_to_idx(end)
            except StopIteration:
                pass

            return self._result_cache[start:s.stop:s.step]
        else:
            try:
                s = int(s)
            except (ValueError, TypeError):
                raise TypeError('QuerySet indices must be integers')

            # Using negative indexing is costly since we have to execute a count()
            if s < 0:
                num_results = self.count()
                s += num_results

            try:
                self._fill_result_cache_to_idx(s)
            except StopIteration:
                raise IndexError

            return self._result_cache[s]

    def _get_result_constructor(self):
        """
        Returns a function that will be used to instantiate query results
        """
        raise NotImplementedError

    @staticmethod
    def _construct_with_deferred(f, deferred, row):
        row.update(deferred)
        return f(row)

    def _maybe_inject_deferred(self, constructor):
        return partial(self._construct_with_deferred, constructor, self._deferred_values)\
            if self._deferred_values else constructor

    def batch(self, batch_obj):
        """
        Set a batch object to run the query on.

        Note: running a select query with a batch object will raise an exception
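
        A short sketch (assuming a ``User`` model; the update is buffered in the
        batch and sent when the ``with`` block exits):

        .. code-block:: python

            with BatchQuery() as b:
                User.objects(id=1).batch(b).update(name='steve')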
        """
        if self._connection:
            raise CQLEngineException("Cannot specify the connection on model in batch mode.")

        if batch_obj is not None and not isinstance(batch_obj, BatchQuery):
            raise CQLEngineException('batch_obj must be a BatchQuery instance or None')
        clone = copy.deepcopy(self)
        clone._batch = batch_obj
        return clone

    def first(self):
        try:
            return six.next(iter(self))
        except StopIteration:
            return None

    def all(self):
        """
        Returns a queryset matching all rows

        .. code-block:: python

            for user in User.objects().all():
                print(user)
        """
        return copy.deepcopy(self)

    def consistency(self, consistency):
        """
        Sets the consistency level for the operation. See :class:`.ConsistencyLevel`.

        .. code-block:: python

            for user in User.objects(id=3).consistency(CL.ONE):
                print(user)
        """
        clone = copy.deepcopy(self)
        clone._consistency = consistency
        return clone

    def _parse_filter_arg(self, arg):
        """
        Parses a filter arg in the format:
        <colname>__<op>
        :returns: colname, op tuple
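
        e.g. (illustrative): 'name' -> ('name', None), 'age__gte' -> ('age', 'gte'),
        and the virtual column 'pk__token' -> ('pk__token', None)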
        """
        statement = arg.rsplit('__', 1)
        if len(statement) == 1:
            return arg, None
        elif len(statement) == 2:
            return (statement[0], statement[1]) if arg != 'pk__token' else (arg, None)
        else:
            raise QueryException("Can't parse '{0}'".format(arg))

    def iff(self, *args, **kwargs):
        """Adds IF statements to queryset"""
        if len([x for x in kwargs.values() if x is None]):
            raise CQLEngineException("None values on iff are not allowed")

        clone = copy.deepcopy(self)
        for operator in args:
            if not isinstance(operator, ConditionalClause):
                raise QueryException('{0} is not a valid query operator'.format(operator))
            clone._conditional.append(operator)

        for arg, val in kwargs.items():
            if isinstance(val, Token):
                raise QueryException("Token() values are not valid in conditionals")

            col_name, col_op = self._parse_filter_arg(arg)
            try:
                column = self.model._get_column(col_name)
            except KeyError:
                raise QueryException("Can't resolve column name: '{0}'".format(col_name))

            if isinstance(val, BaseQueryFunction):
                query_val = val
            else:
                query_val = column.to_database(val)

            operator_class = BaseWhereOperator.get_operator(col_op or 'EQ')
            operator = operator_class()
            clone._conditional.append(WhereClause(column.db_field_name, operator, query_val))

        return clone
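
    # A usage sketch for iff() (illustrative, assuming a User model): the update
    # below is only applied if the stored row still has name='jon'; otherwise
    # check_applied() raises LWTException with the blocking row state.
    #
    #     try:
    #         User.objects(id=1).iff(name='jon').update(name='steve')
    #     except LWTException as e:
    #         print(e.existing)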

    def filter(self, *args, **kwargs):
        """
        Adds WHERE arguments to the queryset, returning a new queryset

        See :ref:`retrieving-objects-with-filters`

        Returns a QuerySet filtered on the keyword arguments
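
        For example (a sketch; ``__<op>`` suffixes such as ``gte`` are resolved
        to comparison operators by ``_parse_filter_arg``):

        .. code-block:: python

            q = Automobile.objects.filter(manufacturer='Tesla')
            q = q.filter(year__gte=2012)  # WHERE "year" >= 2012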
        """
        # add arguments to the where clause filters
        if len([x for x in kwargs.values() if x is None]):
            raise CQLEngineException("None values on filter are not allowed")

        clone = copy.deepcopy(self)
        for operator in args:
            if not isinstance(operator, WhereClause):
                raise QueryException('{0} is not a valid query operator'.format(operator))
            clone._where.append(operator)

        for arg, val in kwargs.items():
            col_name, col_op = self._parse_filter_arg(arg)
            quote_field = True

            if not isinstance(val, Token):
                try:
                    column = self.model._get_column(col_name)
                except KeyError:
                    raise QueryException("Can't resolve column name: '{0}'".format(col_name))
            else:
                if col_name != 'pk__token':
                    raise QueryException("Token() values may only be compared to the 'pk__token' virtual column")

                column = columns._PartitionKeysToken(self.model)
                quote_field = False

                partition_columns = column.partition_columns
                if len(partition_columns) != len(val.value):
                    raise QueryException(
                        'Token() received {0} arguments but model has {1} partition keys'.format(
                            len(val.value), len(partition_columns)))
                val.set_columns(partition_columns)

            # get query operator, or use equals if not supplied
            operator_class = BaseWhereOperator.get_operator(col_op or 'EQ')
            operator = operator_class()

            if isinstance(operator, InOperator):
                if not isinstance(val, (list, tuple)):
                    raise QueryException('IN queries must use a list/tuple value')
                query_val = [column.to_database(v) for v in val]
            elif isinstance(val, BaseQueryFunction):
                query_val = val
            elif (isinstance(operator, ContainsOperator) and
                  isinstance(column, (columns.List, columns.Set, columns.Map))):
                # For ContainsOperator and collections, we query using the value, not the container
                query_val = val
            else:
                query_val = column.to_database(val)
                if not col_op:  # only equal values should be deferred
                    clone._defer_fields.add(column.db_field_name)
                    clone._deferred_values[column.db_field_name] = val  # map by db field name for substitution in results

            clone._where.append(WhereClause(column.db_field_name, operator, query_val, quote_field=quote_field))

        return clone

    def get(self, *args, **kwargs):
        """
        Returns a single instance matching this query, optionally with additional filter kwargs.

        See :ref:`retrieving-objects-with-filters`

        Returns a single object matching the QuerySet.

        .. code-block:: python

            user = User.get(id=1)

        If no objects are matched, a :class:`~.DoesNotExist` exception is raised.

        If more than one object is found, a :class:`~.MultipleObjectsReturned` exception is raised.
        """
        if args or kwargs:
            return self.filter(*args, **kwargs).get()

        self._execute_query()

        # Check that the resultset only contains one element, avoiding sending a COUNT query
        try:
            self[1]
            raise self.model.MultipleObjectsReturned('Multiple objects found')
        except IndexError:
            pass

        try:
            obj = self[0]
        except IndexError:
            raise self.model.DoesNotExist

        return obj

    def _get_ordering_condition(self, colname):
        order_type = 'DESC' if colname.startswith('-') else 'ASC'
        colname = colname.replace('-', '')

        return colname, order_type

    def order_by(self, *colnames):
        """
        Sets the column(s) to be used for ordering

        Default order is ascending, prepend a '-' to any column name for descending

        *Note: column names must be a clustering key*

        .. code-block:: python

            from uuid import uuid1, uuid4

            class Comment(Model):
                photo_id = UUID(primary_key=True)
                comment_id = TimeUUID(primary_key=True, default=uuid1)  # second primary key component is a clustering key
                comment = Text()

            sync_table(Comment)

            u = uuid4()
            for x in range(5):
                Comment.create(photo_id=u, comment="test %d" % x)

            print("Normal")
            for comment in Comment.objects(photo_id=u):
                print(comment.comment_id)

            print("Reversed")
            for comment in Comment.objects(photo_id=u).order_by("-comment_id"):
                print(comment.comment_id)
        """
        if len(colnames) == 0:
            clone = copy.deepcopy(self)
            clone._order = []
            return clone

        conditions = []
        for colname in colnames:
            conditions.append('"{0}" {1}'.format(*self._get_ordering_condition(colname)))

        clone = copy.deepcopy(self)
        clone._order.extend(conditions)
        return clone

    def count(self):
        """
        Returns the number of rows matched by this query.

        *Note: This function executes a SELECT COUNT() and has a performance cost on large datasets*
        """
        if self._batch:
            raise CQLEngineException("Only inserts, updates, and deletes are available in batch mode")

        if self._count is None:
            query = self._select_query()
            query.count = True
            result = self._execute(query)
            count_row = result[0].popitem()
            self._count = count_row[1]
        return self._count

    def distinct(self, distinct_fields=None):
        """
        Returns the DISTINCT rows matched by this query.

        distinct_fields default to the partition key fields if not specified.

        *Note: distinct_fields must be a partition key or a static column*

        .. code-block:: python

            class Automobile(Model):
                manufacturer = columns.Text(partition_key=True)
                year = columns.Integer(primary_key=True)
                model = columns.Text(primary_key=True)
                price = columns.Decimal()

            sync_table(Automobile)

            # create rows

            Automobile.objects.distinct()

            # or

            Automobile.objects.distinct(['manufacturer'])

        """

        clone = copy.deepcopy(self)
        if distinct_fields:
            clone._distinct_fields = distinct_fields
        else:
            clone._distinct_fields = [x.column_name for x in self.model._partition_keys.values()]

        return clone

    def limit(self, v):
        """
        Limits the number of results returned by Cassandra. Use *0* or *None* to disable.

        *Note that CQL's default limit is 10,000, so all queries without a limit set explicitly will have an implicit limit of 10,000*

        .. code-block:: python

            # Fetch 100 users
            for user in User.objects().limit(100):
                print(user)

            # Fetch all users
            for user in User.objects().limit(None):
                print(user)
        """

        if v is None:
            v = 0

        if not isinstance(v, six.integer_types):
            raise TypeError
        if v == self._limit:
            return self

        if v < 0:
            raise QueryException("Negative limit is not allowed")

        clone = copy.deepcopy(self)
        clone._limit = v
        return clone

    def fetch_size(self, v):
        """
        Sets the number of rows that are fetched at a time.

        *Note that the driver's default fetch size is 5000.*

        .. code-block:: python

            for user in User.objects().fetch_size(500):
                print(user)
        """

        if not isinstance(v, six.integer_types):
            raise TypeError
        if v == self._fetch_size:
            return self

        if v < 1:
            raise QueryException("fetch size less than 1 is not allowed")

        clone = copy.deepcopy(self)
        clone._fetch_size = v
        return clone

    def allow_filtering(self):
        """
        Enables the (usually) unwise practice of querying on a clustering key without also defining a partition key
        """
        clone = copy.deepcopy(self)
        clone._allow_filtering = True
        return clone

    def _only_or_defer(self, action, fields):
        if action == 'only' and self._only_fields:
            raise QueryException("QuerySet already has 'only' fields defined")

        clone = copy.deepcopy(self)

        # check for strange fields
        missing_fields = [f for f in fields if f not in self.model._columns.keys()]
        if missing_fields:
            raise QueryException(
                "Can't resolve fields {0} in {1}".format(
                    ', '.join(missing_fields), self.model.__name__))

        fields = [self.model._columns[field].db_field_name for field in fields]

        if action == 'defer':
            clone._defer_fields.update(fields)
        elif action == 'only':
            clone._only_fields = fields
        else:
            raise ValueError

        return clone

    def only(self, fields):
        """ Load only these fields for the returned query """
        return self._only_or_defer('only', fields)

    def defer(self, fields):
        """ Don't load these fields for the returned query """
        return self._only_or_defer('defer', fields)
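
    # A sketch of only()/defer() (illustrative, assuming a User model): both
    # take a list of column names and return a new queryset.
    #
    #     User.objects.only(['name'])    # SELECT only the name column
    #     User.objects.defer(['bio'])    # SELECT everything except bio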

    def create(self, **kwargs):
        return self.model(**kwargs) \
            .batch(self._batch) \
            .ttl(self._ttl) \
            .consistency(self._consistency) \
            .if_not_exists(self._if_not_exists) \
            .timestamp(self._timestamp) \
            .if_exists(self._if_exists) \
            .using(connection=self._connection) \
            .save()

    def delete(self):
        """
        Deletes the contents of a query
        """
        # validate where clause
        partition_keys = set(x.db_field_name for x in self.model._partition_keys.values())
        if partition_keys - set(c.field for c in self._where):
            raise QueryException("The partition key must be defined on delete queries")

        dq = DeleteStatement(
            self.column_family_name,
            where=self._where,
            timestamp=self._timestamp,
            conditionals=self._conditional,
            if_exists=self._if_exists
        )
        self._execute(dq)

    def __eq__(self, q):
        if len(self._where) == len(q._where):
            return all([w in q._where for w in self._where])
        return False

    def __ne__(self, q):
        return not (self == q)

    def timeout(self, timeout):
        """
        :param timeout: Timeout for the query (in seconds)
        :type timeout: float or None
        """
        clone = copy.deepcopy(self)
        clone._timeout = timeout
        return clone

    def using(self, keyspace=None, connection=None):
        """
        Change the context on-the-fly of the Model class (keyspace, connection)
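
        For example (a sketch, assuming a keyspace ``ks2`` and a registered
        connection named ``cluster2``):

        .. code-block:: python

            Automobile.objects.using(keyspace='ks2').all()
            Automobile.objects.using(connection='cluster2').all()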
        """

        if connection and self._batch:
            raise CQLEngineException("Cannot specify a connection on model in batch mode.")

        clone = copy.deepcopy(self)
        if keyspace:
            from cassandra.cqlengine.models import _clone_model_class
            clone.model = _clone_model_class(self.model, {'__keyspace__': keyspace})

        if connection:
            clone._connection = connection

        return clone


class ResultObject(dict):
    """
    adds attribute access to a dictionary
    """

    def __getattr__(self, item):
        try:
            return self[item]
        except KeyError:
            raise AttributeError


class SimpleQuerySet(AbstractQuerySet):
    """
    Overrides _get_result_constructor for querysets that do not define a model (e.g. NamedTable queries)
    """

    def _get_result_constructor(self):
        """
        Returns a function that will be used to instantiate query results
        """
        return ResultObject


class ModelQuerySet(AbstractQuerySet):

    def _validate_select_where(self):
        """ Checks that a filterset will not create invalid select statement """
        # check that there's either a =, a IN or a CONTAINS (collection) relationship with a primary key or indexed field
        equal_ops = [self.model._get_column_by_db_name(w.field)
                     for w in self._where if isinstance(w.operator, EqualsOperator) and not isinstance(w.value, Token)]
        token_comparison = any([w for w in self._where if isinstance(w.value, Token)])
        if not any(w.primary_key or w.index for w in equal_ops) and not token_comparison and not self._allow_filtering:
            raise QueryException(('Where clauses require either =, a IN or a CONTAINS (collection) '
                                  'comparison with either a primary key or indexed field'))

        if not self._allow_filtering:
            # if the query is not on an indexed field
            if not any(w.index for w in equal_ops):
                if not any([w.partition_key for w in equal_ops]) and not token_comparison:
                    raise QueryException('Filtering on a clustering key without a partition key is not allowed unless allow_filtering() is called on the queryset')

    def _select_fields(self):
        if self._defer_fields or self._only_fields:
            fields = [col.db_field_name for col in self.model._columns.values()]
            if self._defer_fields:
                fields = [f for f in fields if f not in self._defer_fields]
                # select the partition keys if all model fields are set defer
                if not fields:
                    fields = [col.db_field_name for col in self.model._partition_keys.values()]
            if self._only_fields:
                fields = [f for f in fields if f in self._only_fields]
            if not fields:
                raise QueryException('No fields in select query. Only fields: "{0}", defer fields: "{1}"'.format(
                    ','.join(self._only_fields), ','.join(self._defer_fields)))
            return fields
        return super(ModelQuerySet, self)._select_fields()

    def _get_result_constructor(self):
        """ Returns a function that will be used to instantiate query results """
        if not self._values_list:  # we want models
            return self.model._construct_instance
        elif self._flat_values_list:  # the user has requested a flattened list (1 value per row)
            key = self._only_fields[0]
            return lambda row: row[key]
        else:
            return lambda row: [row[f] for f in self._only_fields]

    def _get_ordering_condition(self, colname):
        colname, order_type = super(ModelQuerySet, self)._get_ordering_condition(colname)

        column = self.model._columns.get(colname)
        if column is None:
            raise QueryException("Can't resolve the column name: '{0}'".format(colname))

        # validate the column selection
        if not column.primary_key:
            raise QueryException(
                "Can't order on '{0}', can only order on (clustered) primary keys".format(colname))

        pks = [v for k, v in self.model._columns.items() if v.primary_key]
        if column == pks[0]:
            raise QueryException(
                "Can't order by the first primary key (partition key), clustering (secondary) keys only")

        return column.db_field_name, order_type

    def values_list(self, *fields, **kwargs):
        """ Instructs the query set to return tuples, not model instances """
        flat = kwargs.pop('flat', False)
        if kwargs:
            raise TypeError('Unexpected keyword arguments to values_list: %s'
                            % (kwargs.keys(),))
        if flat and len(fields) > 1:
            raise TypeError("'flat' is not valid when values_list is called with more than one field.")
        clone = self.only(fields)
        clone._values_list = True
        clone._flat_values_list = flat
        return clone
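
    # values_list() in a sketch (illustrative, assuming a User model):
    #
    #     User.objects.values_list('id', 'name')       # [[1, 'jon'], ...]
    #     User.objects.values_list('name', flat=True)  # ['jon', ...]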

    def ttl(self, ttl):
        """
        Sets the ttl (in seconds) for modified data.

        *Note that running a select query with a ttl value will raise an exception*
        """
        clone = copy.deepcopy(self)
        clone._ttl = ttl
        return clone

    def timestamp(self, timestamp):
        """
        Allows for custom timestamps to be saved with the record.
        """
        clone = copy.deepcopy(self)
        clone._timestamp = timestamp
        return clone

    def if_not_exists(self):
        """
        Check the existence of an object before insertion.

        If the insertion isn't applied, a LWTException is raised.
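
        For example (a sketch, assuming a ``User`` model):

        .. code-block:: python

            try:
                User.if_not_exists().create(id=1, name='jon')
            except LWTException as e:
                print(e.existing)  # the existing row that blocked the insert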
        """
        if self.model._has_counter:
            raise IfNotExistsWithCounterColumn('if_not_exists cannot be used with tables containing counter columns')
        clone = copy.deepcopy(self)
        clone._if_not_exists = True
        return clone

    def if_exists(self):
        """
        Check the existence of an object before an update or delete.

        If the update or delete isn't applied, a LWTException is raised.
        """
        if self.model._has_counter:
            raise IfExistsWithCounterColumn('if_exists cannot be used with tables containing counter columns')
        clone = copy.deepcopy(self)
        clone._if_exists = True
        return clone

    def update(self, **values):
        """
        Performs an update on the row selected by the queryset. Include values to update in the
        update like so:

        .. code-block:: python

            Model.objects(key=n).update(value='x')

        Passing in updates for columns which are not part of the model will raise a ValidationError.

        Per column validation will be performed, but instance level validation will not
        (i.e., `Model.validate` is not called). This is sometimes referred to as a blind update.

        For example:

        .. code-block:: python

            class User(Model):
                id = Integer(primary_key=True)
                name = Text()

            setup(["localhost"], "test")
            sync_table(User)

            u = User.create(id=1, name="jon")

            User.objects(id=1).update(name="Steve")

            # sets name to null
            User.objects(id=1).update(name=None)


        Also supported is blindly adding and removing elements from container columns,
        without loading a model instance from Cassandra.

        Using the syntax `.update(column_name={x, y, z})` will overwrite the contents of the container, like updating a
        non container column. However, adding `__<operation>` to the end of the keyword arg makes the update call add
        or remove items from the collection, without overwriting the entire column.

        Given the model below, here are the operations that can be performed on the different container columns:

        .. code-block:: python

            class Row(Model):
                row_id = columns.Integer(primary_key=True)
                set_column = columns.Set(Integer)
                list_column = columns.List(Integer)
                map_column = columns.Map(Integer, Integer)

        :class:`~cqlengine.columns.Set`

        - `add`: adds the elements of the given set to the column
        - `remove`: removes the elements of the given set from the column

        .. code-block:: python

            # add elements to a set
            Row.objects(row_id=5).update(set_column__add={6})

            # remove elements from a set
            Row.objects(row_id=5).update(set_column__remove={4})

        :class:`~cqlengine.columns.List`

        - `append`: appends the elements of the given list to the end of the column
        - `prepend`: prepends the elements of the given list to the beginning of the column

        .. code-block:: python

            # append items to a list
            Row.objects(row_id=5).update(list_column__append=[6, 7])

            # prepend items to a list
            Row.objects(row_id=5).update(list_column__prepend=[1, 2])

        :class:`~cqlengine.columns.Map`

        - `update`: adds the given keys/values to the columns, creating new entries if they didn't exist, and overwriting old ones if they did
        - `remove`: removes the given keys (passed as a set) from the column

        .. code-block:: python

            # add items to a map
            Row.objects(row_id=5).update(map_column__update={1: 2, 3: 4})

            # remove items from a map
            Row.objects(row_id=5).update(map_column__remove={1, 2})
        """
        if not values:
            return

        nulled_columns = set()
        updated_columns = set()
        us = UpdateStatement(self.column_family_name, where=self._where, ttl=self._ttl,
                             timestamp=self._timestamp, conditionals=self._conditional, if_exists=self._if_exists)
        for name, val in values.items():
            col_name, col_op = self._parse_filter_arg(name)
            col = self.model._columns.get(col_name)
            # check for nonexistent columns
            if col is None:
                raise ValidationError("{0}.{1} has no column named: {2}".format(self.__module__, self.model.__name__, col_name))
            # check for primary key update attempts
            if col.is_primary_key:
                raise ValidationError("Cannot apply update to primary key '{0}' for {1}.{2}".format(col_name, self.__module__, self.model.__name__))

            if col_op == 'remove' and isinstance(col, columns.Map):
                if not isinstance(val, set):
                    raise ValidationError(
                        "Cannot apply update operation '{0}' on column '{1}' with value '{2}'. A set is required.".format(col_op, col_name, val))
                val = {v: None for v in val}
            else:
                # we should not provide default values in this use case.
                val = col.validate(val)

            if val is None:
                nulled_columns.add(col_name)
                continue

            us.add_update(col, val, operation=col_op)
            updated_columns.add(col_name)

        if us.assignments:
            self._execute(us)

        if nulled_columns:
            delete_conditional = [condition for condition in self._conditional
                                  if condition.field not in updated_columns] if self._conditional else None
            ds = DeleteStatement(self.column_family_name, fields=nulled_columns,
                                 where=self._where, conditionals=delete_conditional, if_exists=self._if_exists)
            self._execute(ds)


class DMLQuery(object):
    """
    A query object used for queries performing inserts, updates, or deletes

    this is usually instantiated by the model instance to be modified

    unlike the read query object, this is mutable
    """
    _ttl = None
    _consistency = None
    _timestamp = None
    _if_not_exists = False
    _if_exists = False

    def __init__(self, model, instance=None, batch=None, ttl=None, consistency=None, timestamp=None,
                 if_not_exists=False, conditional=None, timeout=conn.NOT_SET, if_exists=False):
        self.model = model
        self.column_family_name = self.model.column_family_name()
        self.instance = instance
        self._batch = batch
        self._ttl = ttl
        self._consistency = consistency
        self._timestamp = timestamp
        self._if_not_exists = if_not_exists
        self._if_exists = if_exists
        self._conditional = conditional
        self._timeout = timeout

    def _execute(self, statement):
        connection = self.instance._get_connection() if self.instance else self.model._get_connection()
        if self._batch:
            if self._batch._connection:
                if not self._batch._connection_explicit and connection and \
                        connection != self._batch._connection:
                    raise CQLEngineException('BatchQuery queries must be executed on the same connection')
            else:
                # set the BatchQuery connection from the model
                self._batch._connection = connection
            return self._batch.add_query(statement)
        else:
            results = _execute_statement(self.model, statement, self._consistency, self._timeout, connection=connection)
            if self._if_not_exists or self._if_exists or self._conditional:
                check_applied(results)
            return results

    def batch(self, batch_obj):
        if batch_obj is not None and not isinstance(batch_obj, BatchQuery):
            raise CQLEngineException('batch_obj must be a BatchQuery instance or None')
        self._batch = batch_obj
        return self

    def _delete_null_columns(self, conditionals=None):
        """
        executes a delete query to remove columns that have changed to null
        """
        ds = DeleteStatement(self.column_family_name, conditionals=conditionals, if_exists=self._if_exists)
        deleted_fields = False
        static_only = True
        for _, v in self.instance._values.items():
            col = v.column
            if v.deleted:
                ds.add_field(col.db_field_name)
                deleted_fields = True
                static_only &= col.static
            elif isinstance(col, columns.Map):
                uc = MapDeleteClause(col.db_field_name, v.value, v.previous_value)
                if uc.get_context_size() > 0:
                    ds.add_field(uc)
                    deleted_fields = True
                    static_only |= col.static

        if deleted_fields:
            keys = self.model._partition_keys if static_only else self.model._primary_keys
            for name, col in keys.items():
                ds.add_where(col, EqualsOperator(), getattr(self.instance, name))
            self._execute(ds)

    def update(self):
        """
        updates a row.
        This is a blind update call.
        All validation and cleaning needs to happen
        prior to calling this.
        """
        if self.instance is None:
            raise CQLEngineException("DML Query instance attribute is None")
        assert type(self.instance) == self.model
        null_clustering_key = False if len(self.instance._clustering_keys) == 0 else True
        static_changed_only = True
        statement = UpdateStatement(self.column_family_name, ttl=self._ttl, timestamp=self._timestamp,
                                    conditionals=self._conditional, if_exists=self._if_exists)
        for name, col in self.instance._clustering_keys.items():
            null_clustering_key = null_clustering_key and col._val_is_null(getattr(self.instance, name, None))

        updated_columns = set()
        # get defined fields and their column names
        for name, col in self.model._columns.items():
            # if clustering key is null, don't include non static columns
            if null_clustering_key and not col.static and not col.partition_key:
                continue
            if not col.is_primary_key:
                val = getattr(self.instance, name, None)
                val_mgr = self.instance._values[name]

                if val is None:
                    continue

                if not val_mgr.changed and not isinstance(col, columns.Counter):
                    continue

                static_changed_only = static_changed_only and col.static
                statement.add_update(col, val, previous=val_mgr.previous_value)
                updated_columns.add(col.db_field_name)

        if statement.assignments:
            for name, col in self.model._primary_keys.items():
                # only include clustering key if clustering key is not null, and non static columns are changed to avoid cql error
                if (null_clustering_key or static_changed_only) and (not col.partition_key):
                    continue
                statement.add_where(col, EqualsOperator(), getattr(self.instance, name))
            self._execute(statement)

        if not null_clustering_key:
            # remove conditions on fields that have been updated
            delete_conditionals = [condition for condition in self._conditional
                                   if condition.field not in updated_columns] if self._conditional else None
            self._delete_null_columns(delete_conditionals)

    def save(self):
        """
        Creates / updates a row.
        This is a blind insert call.
        All validation and cleaning needs to happen
        prior to calling this.
        """
        if self.instance is None:
            raise CQLEngineException("DML Query instance attribute is None")
        assert type(self.instance) == self.model

        nulled_fields = set()
        if self.instance._has_counter or self.instance._can_update():
            if self.instance._has_counter:
                warn("'create' and 'save' actions on Counters are deprecated. A future version will disallow this. Use the 'update' mechanism instead.")
            return self.update()
        else:
            insert = InsertStatement(self.column_family_name, ttl=self._ttl, timestamp=self._timestamp, if_not_exists=self._if_not_exists)
            static_save_only = False if len(self.instance._clustering_keys) == 0 else True
            for name, col in self.instance._clustering_keys.items():
                static_save_only = static_save_only and col._val_is_null(getattr(self.instance, name, None))
            for name, col in self.instance._columns.items():
                if static_save_only and not col.static and not col.partition_key:
                    continue
                val = getattr(self.instance, name, None)
                if col._val_is_null(val):
                    if self.instance._values[name].changed:
                        nulled_fields.add(col.db_field_name)
                    continue
                if col.has_default and not self.instance._values[name].changed:
                    # Ensure default columns included in a save() are marked as explicit, to get them *persisted* properly
                    self.instance._values[name].explicit = True

                insert.add_assignment(col, getattr(self.instance, name, None))

        # skip query execution if it's empty
        # caused by pointless update queries
        if not insert.is_empty:
            self._execute(insert)
        # delete any nulled columns
        if not static_save_only:
            self._delete_null_columns()

    def delete(self):
        """ Deletes one instance """
        if self.instance is None:
            raise CQLEngineException("DML Query instance attribute is None")

        ds = DeleteStatement(self.column_family_name, timestamp=self._timestamp, conditionals=self._conditional, if_exists=self._if_exists)
        for name, col in self.model._primary_keys.items():
            val = getattr(self.instance, name)
            if val is None and not col.partition_key:
                continue
            ds.add_where(col, EqualsOperator(), val)
        self._execute(ds)


def _execute_statement(model, statement, consistency_level, timeout, connection=None):
    params = statement.get_context()
    s = SimpleStatement(str(statement), consistency_level=consistency_level, fetch_size=statement.fetch_size)
    if model._partition_key_index:
        key_values = statement.partition_key_values(model._partition_key_index)
        if not any(v is None for v in key_values):
            parts = model._routing_key_from_values(key_values, conn.get_cluster(connection).protocol_version)
            s.routing_key = parts
            s.keyspace = model._get_keyspace()
    connection = connection or model._get_connection()
    return conn.execute(s, params, timeout=timeout, connection=connection)