Merge pull request #253 from bergundy/master

add query timeout feature
This commit is contained in:
Jon Haddad
2014-11-19 08:02:04 -08:00
6 changed files with 172 additions and 36 deletions

View File

@@ -1,26 +1,21 @@
#http://pypi.python.org/pypi/cql/1.0.4
#http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2 /
#http://cassandra.apache.org/doc/cql/CQL.html
from __future__ import absolute_import
from collections import namedtuple
from cassandra.cluster import Cluster, NoHostAvailable
from cassandra.query import SimpleStatement, Statement
import six
try:
import Queue as queue
except ImportError:
# python 3
import queue
import logging
from cassandra.cluster import Cluster, _NOT_SET, NoHostAvailable
from cassandra.query import SimpleStatement, Statement, dict_factory
from cqlengine.statements import BaseCQLStatement
from cqlengine.exceptions import CQLEngineException, UndefinedKeyspaceException
from cassandra import ConsistencyLevel
from cqlengine.statements import BaseCQLStatement
from cassandra.query import dict_factory
import six
import logging
LOG = logging.getLogger('cqlengine.cql')
NOT_SET = _NOT_SET # required for passing timeout to Session.execute
class CQLConnectionError(CQLEngineException): pass
@@ -85,7 +80,8 @@ def setup(
raise
session.row_factory = dict_factory
def execute(query, params=None, consistency_level=None):
def execute(query, params=None, consistency_level=None, timeout=NOT_SET):
handle_lazy_connect()
@@ -109,7 +105,7 @@ def execute(query, params=None, consistency_level=None):
LOG.info(query.query_string)
params = params or {}
result = session.execute(query, params)
result = session.execute(query, params, timeout=timeout)
return result

View File

@@ -4,7 +4,7 @@ import warnings
from cqlengine import columns
from cqlengine.exceptions import ModelException, CQLEngineException, ValidationError
from cqlengine.query import ModelQuerySet, DMLQuery, AbstractQueryableColumn
from cqlengine.query import ModelQuerySet, DMLQuery, AbstractQueryableColumn, NOT_SET
from cqlengine.query import DoesNotExist as _DoesNotExist
from cqlengine.query import MultipleObjectsReturned as _MultipleObjectsReturned
@@ -342,6 +342,7 @@ class BaseModel(object):
# that update should be used when persisting changes
self._is_persisted = False
self._batch = None
self._timeout = NOT_SET
def __repr__(self):
@@ -564,6 +565,11 @@ class BaseModel(object):
def get(cls, *args, **kwargs):
return cls.objects.get(*args, **kwargs)
def timeout(self, timeout):
assert self._batch is None, 'Setting both timeout and batch is not supported'
self._timeout = timeout
return self
def save(self):
# handle polymorphic models
if self._is_polymorphic:
@@ -580,7 +586,8 @@ class BaseModel(object):
timestamp=self._timestamp,
consistency=self.__consistency__,
if_not_exists=self._if_not_exists,
transaction=self._transaction).save()
transaction=self._transaction,
timeout=self._timeout).save()
#reset the value managers
for v in self._values.values():
@@ -619,7 +626,8 @@ class BaseModel(object):
ttl=self._ttl,
timestamp=self._timestamp,
consistency=self.__consistency__,
transaction=self._transaction).update()
transaction=self._transaction,
timeout=self._timeout).update()
#reset the value managers
for v in self._values.values():
@@ -633,7 +641,11 @@ class BaseModel(object):
def delete(self):
""" Deletes this instance """
self.__dmlquery__(self.__class__, self, batch=self._batch, timestamp=self._timestamp, consistency=self.__consistency__).delete()
self.__dmlquery__(self.__class__, self,
batch=self._batch,
timestamp=self._timestamp,
consistency=self.__consistency__,
timeout=self._timeout).delete()
def get_changed_columns(self):
""" returns a list of the columns that have been updated since instantiation or save """
@@ -644,6 +656,7 @@ class BaseModel(object):
return cls.objects.batch(batch)
def _inst_batch(self, batch):
assert self._timeout is NOT_SET, 'Setting both timeout and batch is not supported'
self._batch = batch
return self

View File

@@ -1,10 +1,12 @@
from __future__ import absolute_import
import copy
import time
from datetime import datetime, timedelta
from cqlengine import BaseContainerColumn, Map, columns
from cqlengine.columns import Counter, List, Set
from cqlengine.connection import execute
from .connection import execute, NOT_SET
from cqlengine.exceptions import CQLEngineException, ValidationError, LWTException
from cqlengine.functions import Token, BaseQueryFunction, QueryValue, UnicodeMixin
@@ -87,7 +89,8 @@ class BatchQuery(object):
"""
_consistency = None
def __init__(self, batch_type=None, timestamp=None, consistency=None, execute_on_exception=False):
def __init__(self, batch_type=None, timestamp=None, consistency=None, execute_on_exception=False,
timeout=NOT_SET):
"""
:param batch_type: (optional) One of batch type values available through BatchType enum
:type batch_type: str or None
@@ -101,10 +104,9 @@ class BatchQuery(object):
encountering an error within the context. By default, any exception raised from within
the context scope will cause the batched queries not to be executed.
:type execute_on_exception: bool
:param callbacks: A list of functions to be executed after the batch executes. Note, that if the batch
does not execute, the callbacks are not executed. This, thus, effectively is a list of "on success"
callback handlers. If defined, must be a collection of callables.
:type callbacks: list or set or tuple
:param timeout: (optional) Timeout for the entire batch (in seconds), if not specified fallback
to default session timeout
:type timeout: float or None
"""
self.queries = []
self.batch_type = batch_type
@@ -113,6 +115,7 @@ class BatchQuery(object):
self.timestamp = timestamp
self._consistency = consistency
self._execute_on_exception = execute_on_exception
self._timeout = timeout
self._callbacks = []
def add_query(self, query):
@@ -181,7 +184,7 @@ class BatchQuery(object):
query_list.append('APPLY BATCH;')
tmp = execute('\n'.join(query_list), parameters, self._consistency)
tmp = execute('\n'.join(query_list), parameters, self._consistency, self._timeout)
check_applied(tmp)
self.queries = []
@@ -235,6 +238,7 @@ class AbstractQuerySet(object):
self._consistency = None
self._timestamp = None
self._if_not_exists = False
self._timeout = NOT_SET
@property
def column_family_name(self):
@@ -244,7 +248,7 @@ class AbstractQuerySet(object):
if self._batch:
return self._batch.add_query(q)
else:
result = execute(q, consistency_level=self._consistency)
result = execute(q, consistency_level=self._consistency, timeout=self._timeout)
if self._transaction:
check_applied(result)
return result
@@ -269,6 +273,8 @@ class AbstractQuerySet(object):
# fly off into other batch instances which are never
# executed, thx @dokai
clone.__dict__[k] = self._batch
elif k == '_timeout':
clone.__dict__[k] = self._timeout
else:
clone.__dict__[k] = copy.deepcopy(v, memo)
@@ -656,6 +662,15 @@ class AbstractQuerySet(object):
def __ne__(self, q):
return not (self != q)
def timeout(self, timeout):
"""
:param timeout: Timeout for the query (in seconds)
:type timeout: float or None
"""
clone = copy.deepcopy(self)
clone._timeout = timeout
return clone
class ResultObject(dict):
"""
@@ -835,7 +850,8 @@ class DMLQuery(object):
_timestamp = None
_if_not_exists = False
def __init__(self, model, instance=None, batch=None, ttl=None, consistency=None, timestamp=None, if_not_exists=False, transaction=None):
def __init__(self, model, instance=None, batch=None, ttl=None, consistency=None, timestamp=None,
if_not_exists=False, transaction=None, timeout=NOT_SET):
self.model = model
self.column_family_name = self.model.column_family_name()
self.instance = instance
@@ -845,12 +861,13 @@ class DMLQuery(object):
self._timestamp = timestamp
self._if_not_exists = if_not_exists
self._transaction = transaction
self._timeout = timeout
def _execute(self, q):
if self._batch:
return self._batch.add_query(q)
else:
tmp = execute(q, consistency_level=self._consistency)
tmp = execute(q, consistency_level=self._consistency, timeout=self._timeout)
if self._if_not_exists or self._transaction:
check_applied(tmp)
return tmp
@@ -994,5 +1011,3 @@ class DMLQuery(object):
col.to_database(getattr(self.instance, name))
))
self._execute(ds)

View File

@@ -3,9 +3,13 @@ from unittest import skip
from uuid import uuid4
import random
from cqlengine import Model, columns
from cqlengine.connection import NOT_SET
from cqlengine.management import drop_table, sync_table
from cqlengine.query import BatchQuery, DMLQuery
from cqlengine.tests.base import BaseCassEngTestCase
from cassandra.cluster import Session
import mock
class TestMultiKeyModel(Model):
__keyspace__ = 'test'
@@ -169,3 +173,15 @@ class BatchQueryTests(BaseCassEngTestCase):
# should be 0 because the batch should not execute
self.assertEqual(0, len(obj))
def test_batch_execute_timeout(self):
with mock.patch.object(Session, 'execute', autospec=True) as mock_execute:
with BatchQuery(timeout=1) as b:
BatchQueryLogModel.batch(b).create(k=2, v=2)
mock_execute.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY, timeout=1)
def test_batch_execute_no_timeout(self):
with mock.patch.object(Session, 'execute', autospec=True) as mock_execute:
with BatchQuery() as b:
BatchQueryLogModel.batch(b).create(k=2, v=2)
mock_execute.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY, timeout=NOT_SET)

View File

@@ -1,15 +1,16 @@
from __future__ import absolute_import
from datetime import datetime
import time
from unittest import TestCase, skipUnless
from uuid import uuid1, uuid4
import uuid
from cassandra.cluster import Session
from cqlengine.tests.base import BaseCassEngTestCase
from cqlengine.connection import NOT_SET
import mock
from cqlengine.exceptions import ModelException
from cqlengine import functions
from cqlengine.management import sync_table, drop_table, sync_table
from cqlengine.management import drop_table
from cqlengine.management import sync_table, drop_table
from cqlengine.models import Model
from cqlengine import columns
from cqlengine import query
@@ -707,3 +708,51 @@ def test_paged_result_handling():
assert len(results) == 2
class ModelQuerySetTimeoutTestCase(BaseQuerySetUsage):
def test_default_timeout(self):
with mock.patch.object(Session, 'execute', autospec=True) as mock_execute:
list(TestModel.objects())
mock_execute.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY, timeout=NOT_SET)
def test_float_timeout(self):
with mock.patch.object(Session, 'execute', autospec=True) as mock_execute:
list(TestModel.objects().timeout(0.5))
mock_execute.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY, timeout=0.5)
def test_none_timeout(self):
with mock.patch.object(Session, 'execute', autospec=True) as mock_execute:
list(TestModel.objects().timeout(None))
mock_execute.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY, timeout=None)
class DMLQueryTimeoutTestCase(BaseQuerySetUsage):
def setUp(self):
self.model = TestModel(test_id=1, attempt_id=1, description='timeout test')
super(DMLQueryTimeoutTestCase, self).setUp()
def test_default_timeout(self):
with mock.patch.object(Session, 'execute', autospec=True) as mock_execute:
self.model.save()
mock_execute.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY, timeout=NOT_SET)
def test_float_timeout(self):
with mock.patch.object(Session, 'execute', autospec=True) as mock_execute:
self.model.timeout(0.5).save()
mock_execute.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY, timeout=0.5)
def test_none_timeout(self):
with mock.patch.object(Session, 'execute', autospec=True) as mock_execute:
self.model.timeout(None).save()
mock_execute.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY, timeout=None)
def test_timeout_then_batch(self):
b = query.BatchQuery()
m = self.model.timeout(None)
with self.assertRaises(AssertionError):
m.batch(b)
def test_batch_then_timeout(self):
b = query.BatchQuery()
m = self.model.batch(b)
with self.assertRaises(AssertionError):
m.timeout(0.5)

View File

@@ -592,6 +592,53 @@ QuerySet method reference
Row.objects(row_id=5).update(map_column__update={1: 2, 3: 4})
Per Query Timeouts
===================
By default all queries are executed with the timeout defined in `~cqlengine.connection.setup()`
The examples below show how to specify a per-query timeout.
A timeout is specified in seconds and can be an int, float or None.
None means no timeout.
.. code-block:: python
class Row(Model):
id = columns.Integer(primary_key=True)
name = columns.Text()
Fetch all objects with a timeout of 5 seconds
.. code-block:: python
Row.objects().timeout(5).all()
Create a single row with a 50ms timeout
.. code-block:: python
Row(id=1, name='Jon').timeout(0.05).create()
Delete a single row with no timeout
.. code-block:: python
Row(id=1).timeout(None).delete()
Update a single row with no timeout
.. code-block:: python
Row(id=1).timeout(None).update(name='Blake')
Batch query timeouts
.. code-block:: python
with BatchQuery(timeout=10) as b:
Row(id=1, name='Jon').create()
NOTE: You cannot set both timeout and batch at the same time, batch will use the timeout defined in it's constructor.
Setting the timeout on the model is meaningless and will raise an AssertionError.
Named Tables
===================