Merge branch 'master' of github.com:cqlengine/cqlengine

Conflicts:
	docs/topics/models.rst
This commit is contained in:
Blake Eggleston
2013-10-25 11:59:12 -07:00
9 changed files with 332 additions and 28 deletions

View File

@@ -12,3 +12,13 @@ __version__ = open(__cqlengine_version_path__, 'r').readline().strip()
SizeTieredCompactionStrategy = "SizeTieredCompactionStrategy"
LeveledCompactionStrategy = "LeveledCompactionStrategy"
ANY = "ANY"
ONE = "ONE"
TWO = "TWO"
THREE = "THREE"
QUORUM = "QUORUM"
LOCAL_QUORUM = "LOCAL_QUORUM"
EACH_QUORUM = "EACH_QUORUM"
ALL = "ALL"

View File

@@ -165,10 +165,10 @@ class ConnectionPool(object):
from thrift.transport import TSocket, TTransport
thrift_socket = TSocket.TSocket(host.name, host.port)
if self._timeout is not None:
thrift_socket.setTimeout(self._timeout)
return TTransport.TFramedTransport(thrift_socket)
def _create_connection(self):
@@ -202,14 +202,17 @@ class ConnectionPool(object):
raise CQLConnectionError("Could not connect to any server in cluster")
def execute(self, query, params):
def execute(self, query, params, consistency_level=None):
if not consistency_level:
consistency_level = self._consistency
while True:
try:
con = self.get()
if not con:
raise CQLEngineException("Error calling execute without calling setup.")
cur = con.cursor()
cur.execute(query, params)
cur.execute(query, params, consistency_level=consistency_level)
columns = [i[0] for i in cur.description or []]
results = [RowResult(r) for r in cur.fetchall()]
LOG.debug('{} {}'.format(query, repr(params)))
@@ -226,9 +229,11 @@ class ConnectionPool(object):
raise ex
def execute(query, params=None):
def execute(query, params=None, consistency_level=None):
params = params or {}
return connection_pool.execute(query, params)
if consistency_level is None:
consistency_level = connection_pool._consistency
return connection_pool.execute(query, params, consistency_level)
@contextmanager
def connection_manager():

View File

@@ -7,7 +7,6 @@ from cqlengine.query import ModelQuerySet, DMLQuery, AbstractQueryableColumn
from cqlengine.query import DoesNotExist as _DoesNotExist
from cqlengine.query import MultipleObjectsReturned as _MultipleObjectsReturned
class ModelDefinitionException(ModelException): pass
@@ -70,6 +69,52 @@ class QuerySetDescriptor(object):
"""
raise NotImplementedError
class TTLDescriptor(object):
"""
returns a query set descriptor
"""
def __get__(self, instance, model):
if instance:
# instance method
def ttl_setter(ts):
instance._ttl = ts
return instance
return ttl_setter
qs = model.__queryset__(model)
def ttl_setter(ts):
qs._ttl = ts
return qs
return ttl_setter
def __call__(self, *args, **kwargs):
raise NotImplementedError
class ConsistencyDescriptor(object):
"""
returns a query set descriptor if called on Class, instance if it was an instance call
"""
def __get__(self, instance, model):
if instance:
def consistency_setter(consistency):
instance.__consistency__ = consistency
return instance
return consistency_setter
qs = model.__queryset__(model)
def consistency_setter(consistency):
qs._consistency = consistency
return qs
return consistency_setter
def __call__(self, *args, **kwargs):
raise NotImplementedError
class ColumnQueryEvaluator(AbstractQueryableColumn):
"""
@@ -148,6 +193,8 @@ class BaseModel(object):
class MultipleObjectsReturned(_MultipleObjectsReturned): pass
objects = QuerySetDescriptor()
ttl = TTLDescriptor()
consistency = ConsistencyDescriptor()
#table names will be generated automatically from it's model and package name
#however, you can also define them manually here
@@ -179,10 +226,14 @@ class BaseModel(object):
__queryset__ = ModelQuerySet
__dmlquery__ = DMLQuery
__ttl__ = None
__consistency__ = None # can be set per query
__read_repair_chance__ = 0.1
def __init__(self, **values):
self._values = {}
self._ttl = None
for name, column in self._columns.items():
value = values.get(name, None)
@@ -365,7 +416,10 @@ class BaseModel(object):
is_new = self.pk is None
self.validate()
self.__dmlquery__(self.__class__, self, batch=self._batch).save()
self.__dmlquery__(self.__class__, self,
batch=self._batch,
ttl=self._ttl,
consistency=self.__consistency__).save()
#reset the value managers
for v in self._values.values():
@@ -396,7 +450,10 @@ class BaseModel(object):
setattr(self, self._polymorphic_column_name, self.__polymorphic_key__)
self.validate()
self.__dmlquery__(self.__class__, self, batch=self._batch).update()
self.__dmlquery__(self.__class__, self,
batch=self._batch,
ttl=self._ttl,
consistency=self.__consistency__).update()
#reset the value managers
for v in self._values.values():

View File

@@ -208,17 +208,22 @@ class BatchQuery(object):
http://www.datastax.com/docs/1.2/cql_cli/cql/BATCH
"""
_consistency = None
def __init__(self, batch_type=None, timestamp=None):
def __init__(self, batch_type=None, timestamp=None, consistency=None):
self.queries = []
self.batch_type = batch_type
if timestamp is not None and not isinstance(timestamp, datetime):
raise CQLEngineException('timestamp object must be an instance of datetime')
self.timestamp = timestamp
self._consistency = consistency
def add_query(self, query, params):
self.queries.append((query, params))
def consistency(self, consistency):
self._consistency = consistency
def execute(self):
if len(self.queries) == 0:
# Empty batch is a no-op
@@ -238,7 +243,7 @@ class BatchQuery(object):
query_list.append('APPLY BATCH;')
execute('\n'.join(query_list), parameters)
execute('\n'.join(query_list), parameters, self._consistency)
self.queries = []
@@ -283,6 +288,8 @@ class AbstractQuerySet(object):
self._result_idx = None
self._batch = None
self._ttl = None
self._consistency = None
@property
def column_family_name(self):
@@ -300,7 +307,7 @@ class AbstractQuerySet(object):
def __deepcopy__(self, memo):
clone = self.__class__(self.model)
for k,v in self.__dict__.items():
if k in ['_con', '_cur', '_result_cache', '_result_idx']:
if k in ['_con', '_cur', '_result_cache', '_result_idx']: # don't clone these
clone.__dict__[k] = None
elif k == '_batch':
# we need to keep the same batch instance across
@@ -361,7 +368,7 @@ class AbstractQuerySet(object):
if self._batch:
raise CQLEngineException("Only inserts, updates, and deletes are available in batch mode")
if self._result_cache is None:
columns, self._result_cache = execute(self._select_query(), self._where_values())
columns, self._result_cache = execute(self._select_query(), self._where_values(), self._consistency)
self._construct_result = self._get_result_constructor(columns)
def _fill_result_cache_to_idx(self, idx):
@@ -614,7 +621,7 @@ class AbstractQuerySet(object):
return self._only_or_defer('defer', fields)
def create(self, **kwargs):
return self.model(**kwargs).batch(self._batch).save()
return self.model(**kwargs).batch(self._batch).ttl(self._ttl).consistency(self._consistency).save()
#----delete---
def delete(self, columns=[]):
@@ -740,6 +747,11 @@ class ModelQuerySet(AbstractQuerySet):
return column.db_field_name, order_type
def _get_ttl_statement(self):
if not self._ttl:
return ""
return "USING TTL {}".format(self._ttl)
def values_list(self, *fields, **kwargs):
""" Instructs the query set to return tuples, not model instance """
flat = kwargs.pop('flat', False)
@@ -753,6 +765,16 @@ class ModelQuerySet(AbstractQuerySet):
clone._flat_values_list = flat
return clone
def consistency(self, consistency):
clone = copy.deepcopy(self)
clone._consistency = consistency
return clone
def ttl(self, ttl):
clone = copy.deepcopy(self)
clone._ttl = ttl
return clone
def update(self, **values):
""" Updates the rows in this queryset """
if not values:
@@ -785,13 +807,15 @@ class ModelQuerySet(AbstractQuerySet):
ctx[field_id] = val
if set_statements:
qs = "UPDATE {} SET {} WHERE {}".format(
ttl_stmt = "USING TTL {}".format(self._ttl) if self._ttl else ""
qs = "UPDATE {} SET {} WHERE {} {}".format(
self.column_family_name,
', '.join(set_statements),
self._where_clause()
self._where_clause(),
ttl_stmt
)
ctx.update(self._where_values())
execute(qs, ctx)
execute(qs, ctx, self._consistency)
if nulled_columns:
qs = "DELETE {} FROM {} WHERE {}".format(
@@ -799,7 +823,7 @@ class ModelQuerySet(AbstractQuerySet):
self.column_family_name,
self._where_clause()
)
execute(qs, self._where_values())
execute(qs, self._where_values(), self._consistency)
class DMLQuery(object):
@@ -810,13 +834,16 @@ class DMLQuery(object):
unlike the read query object, this is mutable
"""
_ttl = None
_consistency = None
def __init__(self, model, instance=None, batch=None):
def __init__(self, model, instance=None, batch=None, ttl=None, consistency=None):
self.model = model
self.column_family_name = self.model.column_family_name()
self.instance = instance
self._batch = batch
pass
self._ttl = ttl
self._consistency = consistency
def batch(self, batch_obj):
if batch_obj is not None and not isinstance(batch_obj, BatchQuery):
@@ -913,6 +940,9 @@ class DMLQuery(object):
qs += [' AND '.join(where_statements)]
if self._ttl:
qs += ["USING TTL {}".format(self._ttl)]
# clear the qs if there are no set statements and this is not a counter model
if not set_statements and not self.instance._has_counter:
qs = []
@@ -924,7 +954,7 @@ class DMLQuery(object):
if self._batch:
self._batch.add_query(qs, query_values)
else:
execute(qs, query_values)
execute(qs, query_values, consistency_level=self._consistency)
self._delete_null_columns()
@@ -971,15 +1001,20 @@ class DMLQuery(object):
qs += ['VALUES']
qs += ["({})".format(', '.join([':'+field_ids[f] for f in field_names]))]
if self._ttl:
qs += ["USING TTL {}".format(self._ttl)]
qs += []
qs = ' '.join(qs)
# skip query execution if it's empty
# caused by pointless update queries
if qs:
if self._batch:
self._batch.add_query(qs, query_values)
else:
execute(qs, query_values)
execute(qs, query_values, self._consistency)
# delete any nulled columns
self._delete_null_columns()
@@ -1003,6 +1038,6 @@ class DMLQuery(object):
if self._batch:
self._batch.add_query(qs, field_values)
else:
execute(qs, field_values)
execute(qs, field_values, self._consistency)

View File

@@ -6,7 +6,7 @@ from cqlengine.tests.base import BaseCassEngTestCase
from cqlengine.exceptions import ModelException
from cqlengine import functions
from cqlengine.management import create_table
from cqlengine.management import create_table, drop_table
from cqlengine.management import delete_table
from cqlengine.models import Model
from cqlengine import columns
@@ -219,9 +219,9 @@ class BaseQuerySetUsage(BaseCassEngTestCase):
@classmethod
def tearDownClass(cls):
super(BaseQuerySetUsage, cls).tearDownClass()
delete_table(TestModel)
delete_table(IndexedTestModel)
delete_table(TestMultiClusteringModel)
drop_table(TestModel)
drop_table(IndexedTestModel)
drop_table(TestMultiClusteringModel)
class TestQuerySetCountSelectionAndIteration(BaseQuerySetUsage):

View File

@@ -0,0 +1,78 @@
from cqlengine.management import sync_table, drop_table
from cqlengine.tests.base import BaseCassEngTestCase
from cqlengine.models import Model
from uuid import uuid4
from cqlengine import columns
import mock
from cqlengine.connection import ConnectionPool
from cqlengine import ALL, BatchQuery
class TestConsistencyModel(Model):
id = columns.UUID(primary_key=True, default=lambda:uuid4())
count = columns.Integer()
text = columns.Text(required=False)
class BaseConsistencyTest(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
super(BaseConsistencyTest, cls).setUpClass()
sync_table(TestConsistencyModel)
@classmethod
def tearDownClass(cls):
super(BaseConsistencyTest, cls).tearDownClass()
drop_table(TestConsistencyModel)
class TestConsistency(BaseConsistencyTest):
def test_create_uses_consistency(self):
qs = TestConsistencyModel.consistency(ALL)
with mock.patch.object(ConnectionPool, 'execute') as m:
qs.create(text="i am not fault tolerant this way")
args = m.call_args
self.assertEqual(ALL, args[0][2])
def test_queryset_is_returned_on_create(self):
qs = TestConsistencyModel.consistency(ALL)
self.assertTrue(isinstance(qs, TestConsistencyModel.__queryset__), type(qs))
def test_update_uses_consistency(self):
t = TestConsistencyModel.create(text="bacon and eggs")
t.text = "ham sandwich"
with mock.patch.object(ConnectionPool, 'execute') as m:
t.consistency(ALL).save()
args = m.call_args
self.assertEqual(ALL, args[0][2])
def test_batch_consistency(self):
with mock.patch.object(ConnectionPool, 'execute') as m:
with BatchQuery(consistency=ALL) as b:
TestConsistencyModel.batch(b).create(text="monkey")
args = m.call_args
self.assertEqual(ALL, args[0][2])
with mock.patch.object(ConnectionPool, 'execute') as m:
with BatchQuery() as b:
TestConsistencyModel.batch(b).create(text="monkey")
args = m.call_args
self.assertNotEqual(ALL, args[0][2])
def test_blind_update(self):
t = TestConsistencyModel.create(text="bacon and eggs")
t.text = "ham sandwich"
uid = t.id
with mock.patch.object(ConnectionPool, 'execute') as m:
TestConsistencyModel.objects(id=uid).consistency(ALL).update(text="grilled cheese")
args = m.call_args
self.assertEqual(ALL, args[0][2])

104
cqlengine/tests/test_ttl.py Normal file
View File

@@ -0,0 +1,104 @@
from cqlengine.management import sync_table, drop_table
from cqlengine.tests.base import BaseCassEngTestCase
from cqlengine.models import Model
from uuid import uuid4
from cqlengine import columns
import mock
from cqlengine.connection import ConnectionPool
class TestTTLModel(Model):
id = columns.UUID(primary_key=True, default=lambda:uuid4())
count = columns.Integer()
text = columns.Text(required=False)
class BaseTTLTest(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
super(BaseTTLTest, cls).setUpClass()
sync_table(TestTTLModel)
@classmethod
def tearDownClass(cls):
super(BaseTTLTest, cls).tearDownClass()
drop_table(TestTTLModel)
class TTLQueryTests(BaseTTLTest):
def test_update_queryset_ttl_success_case(self):
""" tests that ttls on querysets work as expected """
def test_select_ttl_failure(self):
""" tests that ttls on select queries raise an exception """
class TTLModelTests(BaseTTLTest):
def test_ttl_included_on_create(self):
""" tests that ttls on models work as expected """
with mock.patch.object(ConnectionPool, 'execute') as m:
TestTTLModel.ttl(60).create(text="hello blake")
query = m.call_args[0][0]
self.assertIn("USING TTL", query)
def test_queryset_is_returned_on_class(self):
"""
ensures we get a queryset descriptor back
"""
qs = TestTTLModel.ttl(60)
self.assertTrue(isinstance(qs, TestTTLModel.__queryset__), type(qs))
class TTLInstanceUpdateTest(BaseTTLTest):
def test_update_includes_ttl(self):
model = TestTTLModel.create(text="goodbye blake")
with mock.patch.object(ConnectionPool, 'execute') as m:
model.ttl(60).update(text="goodbye forever")
query = m.call_args[0][0]
self.assertIn("USING TTL", query)
class TTLInstanceTest(BaseTTLTest):
def test_instance_is_returned(self):
"""
ensures that we properly handle the instance.ttl(60).save() scenario
:return:
"""
o = TestTTLModel.create(text="whatever")
o.text = "new stuff"
o.ttl(60)
self.assertEqual(60, o._ttl)
def test_ttl_is_include_with_query_on_update(self):
o = TestTTLModel.create(text="whatever")
o.text = "new stuff"
o.ttl(60)
with mock.patch.object(ConnectionPool, 'execute') as m:
o.save()
query = m.call_args[0][0]
self.assertIn("USING TTL", query)
class TTLBlindUpdateTest(BaseTTLTest):
def test_ttl_included_with_blind_update(self):
o = TestTTLModel.create(text="whatever")
tid = o.id
with mock.patch.object(ConnectionPool, 'execute') as m:
TestTTLModel.objects(id=tid).ttl(60).update(text="bacon")
query = m.call_args[0][0]
self.assertIn("USING TTL", query)

View File

@@ -133,11 +133,18 @@ Model Methods
#saves it to Cassandra
person.save()
.. method:: delete()
Deletes the object from the database.
.. method:: batch(batch_object)
Sets the batch object to run instance updates and inserts queries with.
.. method:: ttl(ttl_in_sec)
Sets the ttl values to run instance updates and inserts queries with.
-- method:: update(**values)

View File

@@ -354,6 +354,14 @@ QuerySet method reference
Enables the (usually) unwise practive of querying on a clustering key without also defining a partition key
.. method:: batch(batch_object)
Sets the batch object to run the query on. Note that running a select query with a batch object will raise an exception
.. method:: ttl(ttl_in_seconds)
Sets the ttl to run the query query with. Note that running a select query with a ttl value will raise an exception
-- method:: update(**values)
Performs an update on the row selected by the queryset. Include values to update in the