diff --git a/cqlengine/connection.py b/cqlengine/connection.py index 444400d5..34f8da3a 100644 --- a/cqlengine/connection.py +++ b/cqlengine/connection.py @@ -1,26 +1,21 @@ #http://pypi.python.org/pypi/cql/1.0.4 #http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2 / #http://cassandra.apache.org/doc/cql/CQL.html - +from __future__ import absolute_import from collections import namedtuple -from cassandra.cluster import Cluster, NoHostAvailable -from cassandra.query import SimpleStatement, Statement -import six - -try: - import Queue as queue -except ImportError: - # python 3 - import queue - -import logging - +from cassandra.cluster import Cluster, _NOT_SET, NoHostAvailable +from cassandra.query import SimpleStatement, Statement, dict_factory +from cqlengine.statements import BaseCQLStatement from cqlengine.exceptions import CQLEngineException, UndefinedKeyspaceException from cassandra import ConsistencyLevel -from cqlengine.statements import BaseCQLStatement -from cassandra.query import dict_factory + +import six +import logging + LOG = logging.getLogger('cqlengine.cql') +NOT_SET = _NOT_SET # required for passing timeout to Session.execute + class CQLConnectionError(CQLEngineException): pass @@ -85,7 +80,8 @@ def setup( raise session.row_factory = dict_factory -def execute(query, params=None, consistency_level=None): + +def execute(query, params=None, consistency_level=None, timeout=NOT_SET): handle_lazy_connect() @@ -109,7 +105,7 @@ def execute(query, params=None, consistency_level=None): LOG.info(query.query_string) params = params or {} - result = session.execute(query, params) + result = session.execute(query, params, timeout=timeout) return result diff --git a/cqlengine/models.py b/cqlengine/models.py index 5003b3c0..d88836b1 100644 --- a/cqlengine/models.py +++ b/cqlengine/models.py @@ -4,7 +4,7 @@ import warnings from cqlengine import columns from cqlengine.exceptions import ModelException, CQLEngineException, ValidationError -from cqlengine.query import ModelQuerySet, DMLQuery, AbstractQueryableColumn +from cqlengine.query import ModelQuerySet, DMLQuery, AbstractQueryableColumn, NOT_SET from cqlengine.query import DoesNotExist as _DoesNotExist from cqlengine.query import MultipleObjectsReturned as _MultipleObjectsReturned @@ -342,6 +342,7 @@ class BaseModel(object): # that update should be used when persisting changes self._is_persisted = False self._batch = None + self._timeout = NOT_SET def __repr__(self): @@ -564,6 +565,11 @@ class BaseModel(object): def get(cls, *args, **kwargs): return cls.objects.get(*args, **kwargs) + def timeout(self, timeout): + assert self._batch is None, 'Setting both timeout and batch is not supported' + self._timeout = timeout + return self + def save(self): # handle polymorphic models if self._is_polymorphic: @@ -580,7 +586,8 @@ class BaseModel(object): timestamp=self._timestamp, consistency=self.__consistency__, if_not_exists=self._if_not_exists, - transaction=self._transaction).save() + transaction=self._transaction, + timeout=self._timeout).save() #reset the value managers for v in self._values.values(): @@ -619,7 +626,8 @@ class BaseModel(object): ttl=self._ttl, timestamp=self._timestamp, consistency=self.__consistency__, - transaction=self._transaction).update() + transaction=self._transaction, + timeout=self._timeout).update() #reset the value managers for v in self._values.values(): @@ -633,7 +641,11 @@ class BaseModel(object): def delete(self): """ Deletes this instance """ - self.__dmlquery__(self.__class__, self, batch=self._batch, timestamp=self._timestamp, consistency=self.__consistency__).delete() + self.__dmlquery__(self.__class__, self, + batch=self._batch, + timestamp=self._timestamp, + consistency=self.__consistency__, + timeout=self._timeout).delete() def get_changed_columns(self): """ returns a list of the columns that have been updated since instantiation or save """ @@ -644,6 +656,7 @@ class BaseModel(object): return cls.objects.batch(batch) def _inst_batch(self, batch): + assert self._timeout is NOT_SET, 'Setting both timeout and batch is not supported' self._batch = batch return self diff --git a/cqlengine/query.py b/cqlengine/query.py index b8c563e3..5f4c2cb5 100644 --- a/cqlengine/query.py +++ b/cqlengine/query.py @@ -1,10 +1,12 @@ +from __future__ import absolute_import import copy import time from datetime import datetime, timedelta from cqlengine import BaseContainerColumn, Map, columns from cqlengine.columns import Counter, List, Set -from cqlengine.connection import execute +from .connection import execute, NOT_SET + from cqlengine.exceptions import CQLEngineException, ValidationError, LWTException from cqlengine.functions import Token, BaseQueryFunction, QueryValue, UnicodeMixin @@ -87,7 +89,8 @@ class BatchQuery(object): """ _consistency = None - def __init__(self, batch_type=None, timestamp=None, consistency=None, execute_on_exception=False): + def __init__(self, batch_type=None, timestamp=None, consistency=None, execute_on_exception=False, + timeout=NOT_SET): """ :param batch_type: (optional) One of batch type values available through BatchType enum :type batch_type: str or None @@ -101,10 +104,9 @@ class BatchQuery(object): encountering an error within the context. By default, any exception raised from within the context scope will cause the batched queries not to be executed. :type execute_on_exception: bool - :param callbacks: A list of functions to be executed after the batch executes. Note, that if the batch - does not execute, the callbacks are not executed. This, thus, effectively is a list of "on success" - callback handlers. If defined, must be a collection of callables. - :type callbacks: list or set or tuple + :param timeout: (optional) Timeout for the entire batch (in seconds), if not specified fallback + to default session timeout + :type timeout: float or None """ self.queries = [] self.batch_type = batch_type @@ -113,6 +115,7 @@ class BatchQuery(object): self.timestamp = timestamp self._consistency = consistency self._execute_on_exception = execute_on_exception + self._timeout = timeout self._callbacks = [] def add_query(self, query): @@ -181,7 +184,7 @@ class BatchQuery(object): query_list.append('APPLY BATCH;') - tmp = execute('\n'.join(query_list), parameters, self._consistency) + tmp = execute('\n'.join(query_list), parameters, self._consistency, self._timeout) check_applied(tmp) self.queries = [] @@ -235,6 +238,7 @@ class AbstractQuerySet(object): self._consistency = None self._timestamp = None self._if_not_exists = False + self._timeout = NOT_SET @property def column_family_name(self): @@ -244,7 +248,7 @@ class AbstractQuerySet(object): if self._batch: return self._batch.add_query(q) else: - result = execute(q, consistency_level=self._consistency) + result = execute(q, consistency_level=self._consistency, timeout=self._timeout) if self._transaction: check_applied(result) return result @@ -269,6 +273,8 @@ class AbstractQuerySet(object): # fly off into other batch instances which are never # executed, thx @dokai clone.__dict__[k] = self._batch + elif k == '_timeout': + clone.__dict__[k] = self._timeout else: clone.__dict__[k] = copy.deepcopy(v, memo) @@ -656,6 +662,15 @@ class AbstractQuerySet(object): def __ne__(self, q): return not (self != q) + def timeout(self, timeout): + """ + :param timeout: Timeout for the query (in seconds) + :type timeout: float or None + """ + clone = copy.deepcopy(self) + clone._timeout = timeout + return clone + class ResultObject(dict): """ @@ -835,7 +850,8 @@ class DMLQuery(object): _timestamp = None _if_not_exists = False - def __init__(self, model, instance=None, batch=None, ttl=None, consistency=None, timestamp=None, if_not_exists=False, transaction=None): + def __init__(self, model, instance=None, batch=None, ttl=None, consistency=None, timestamp=None, + if_not_exists=False, transaction=None, timeout=NOT_SET): self.model = model self.column_family_name = self.model.column_family_name() self.instance = instance @@ -845,12 +861,13 @@ class DMLQuery(object): self._timestamp = timestamp self._if_not_exists = if_not_exists self._transaction = transaction + self._timeout = timeout def _execute(self, q): if self._batch: return self._batch.add_query(q) else: - tmp = execute(q, consistency_level=self._consistency) + tmp = execute(q, consistency_level=self._consistency, timeout=self._timeout) if self._if_not_exists or self._transaction: check_applied(tmp) return tmp @@ -994,5 +1011,3 @@ class DMLQuery(object): col.to_database(getattr(self.instance, name)) )) self._execute(ds) - - diff --git a/cqlengine/tests/query/test_batch_query.py b/cqlengine/tests/query/test_batch_query.py index 9f3ecb5d..74d2724a 100644 --- a/cqlengine/tests/query/test_batch_query.py +++ b/cqlengine/tests/query/test_batch_query.py @@ -3,9 +3,13 @@ from unittest import skip from uuid import uuid4 import random from cqlengine import Model, columns +from cqlengine.connection import NOT_SET from cqlengine.management import drop_table, sync_table from cqlengine.query import BatchQuery, DMLQuery from cqlengine.tests.base import BaseCassEngTestCase +from cassandra.cluster import Session +import mock + class TestMultiKeyModel(Model): __keyspace__ = 'test' @@ -169,3 +173,15 @@ class BatchQueryTests(BaseCassEngTestCase): # should be 0 because the batch should not execute self.assertEqual(0, len(obj)) + + def test_batch_execute_timeout(self): + with mock.patch.object(Session, 'execute', autospec=True) as mock_execute: + with BatchQuery(timeout=1) as b: + BatchQueryLogModel.batch(b).create(k=2, v=2) + mock_execute.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY, timeout=1) + + def test_batch_execute_no_timeout(self): + with mock.patch.object(Session, 'execute', autospec=True) as mock_execute: + with BatchQuery() as b: + BatchQueryLogModel.batch(b).create(k=2, v=2) + mock_execute.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY, timeout=NOT_SET) diff --git a/cqlengine/tests/query/test_queryset.py b/cqlengine/tests/query/test_queryset.py index 6482cc7f..00b1be49 100644 --- a/cqlengine/tests/query/test_queryset.py +++ b/cqlengine/tests/query/test_queryset.py @@ -1,15 +1,16 @@ +from __future__ import absolute_import from datetime import datetime import time from unittest import TestCase, skipUnless from uuid import uuid1, uuid4 import uuid +from cassandra.cluster import Session from cqlengine.tests.base import BaseCassEngTestCase +from cqlengine.connection import NOT_SET import mock -from cqlengine.exceptions import ModelException from cqlengine import functions -from cqlengine.management import sync_table, drop_table, sync_table -from cqlengine.management import drop_table +from cqlengine.management import sync_table, drop_table from cqlengine.models import Model from cqlengine import columns from cqlengine import query @@ -707,3 +708,51 @@ def test_paged_result_handling(): assert len(results) == 2 +class ModelQuerySetTimeoutTestCase(BaseQuerySetUsage): + def test_default_timeout(self): + with mock.patch.object(Session, 'execute', autospec=True) as mock_execute: + list(TestModel.objects()) + mock_execute.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY, timeout=NOT_SET) + + def test_float_timeout(self): + with mock.patch.object(Session, 'execute', autospec=True) as mock_execute: + list(TestModel.objects().timeout(0.5)) + mock_execute.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY, timeout=0.5) + + def test_none_timeout(self): + with mock.patch.object(Session, 'execute', autospec=True) as mock_execute: + list(TestModel.objects().timeout(None)) + mock_execute.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY, timeout=None) + + +class DMLQueryTimeoutTestCase(BaseQuerySetUsage): + def setUp(self): + self.model = TestModel(test_id=1, attempt_id=1, description='timeout test') + super(DMLQueryTimeoutTestCase, self).setUp() + + def test_default_timeout(self): + with mock.patch.object(Session, 'execute', autospec=True) as mock_execute: + self.model.save() + mock_execute.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY, timeout=NOT_SET) + + def test_float_timeout(self): + with mock.patch.object(Session, 'execute', autospec=True) as mock_execute: + self.model.timeout(0.5).save() + mock_execute.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY, timeout=0.5) + + def test_none_timeout(self): + with mock.patch.object(Session, 'execute', autospec=True) as mock_execute: + self.model.timeout(None).save() + mock_execute.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY, timeout=None) + + def test_timeout_then_batch(self): + b = query.BatchQuery() + m = self.model.timeout(None) + with self.assertRaises(AssertionError): + m.batch(b) + + def test_batch_then_timeout(self): + b = query.BatchQuery() + m = self.model.batch(b) + with self.assertRaises(AssertionError): + m.timeout(0.5) diff --git a/docs/topics/queryset.rst b/docs/topics/queryset.rst index 2466dece..650f7d29 100644 --- a/docs/topics/queryset.rst +++ b/docs/topics/queryset.rst @@ -592,6 +592,53 @@ QuerySet method reference Row.objects(row_id=5).update(map_column__update={1: 2, 3: 4}) +Per Query Timeouts +=================== + +By default all queries are executed with the timeout defined in `~cqlengine.connection.setup()` +The examples below show how to specify a per-query timeout. +A timeout is specified in seconds and can be an int, float or None. +None means no timeout. + + + .. code-block:: python + + class Row(Model): + id = columns.Integer(primary_key=True) + name = columns.Text() + + + Fetch all objects with a timeout of 5 seconds + .. code-block:: python + + Row.objects().timeout(5).all() + + Create a single row with a 50ms timeout + .. code-block:: python + + Row(id=1, name='Jon').timeout(0.05).create() + + Delete a single row with no timeout + .. code-block:: python + + Row(id=1).timeout(None).delete() + + Update a single row with no timeout + .. code-block:: python + + Row(id=1).timeout(None).update(name='Blake') + + Batch query timeouts + .. code-block:: python + + with BatchQuery(timeout=10) as b: + Row(id=1, name='Jon').create() + + + NOTE: You cannot set both timeout and batch at the same time, batch will use the timeout defined in it's constructor. + Setting the timeout on the model is meaningless and will raise an AssertionError. + + Named Tables ===================