diff --git a/cqlengine/query.py b/cqlengine/query.py index c148f8e4..2355edea 100644 --- a/cqlengine/query.py +++ b/cqlengine/query.py @@ -126,22 +126,20 @@ class LessThanOrEqualOperator(QueryOperator): symbol = "LTE" cql_symbol = '<=' -class Consistency(object): - ANY = 'ANY' - ONE = 'ONE' - QUORUM = 'QUORUM' - LOCAL_QUORUM = 'LOCAL_QUORUM' - EACH_QUORUM = 'EACH_QUORUM' - ALL = 'ALL' +class BatchType(object): + Unlogged = 'UNLOGGED' + Counter = 'COUNTER' class BatchQuery(object): """ Handles the batching of queries + + http://www.datastax.com/docs/1.2/cql_cli/cql/BATCH """ - def __init__(self, consistency=None, timestamp=None): + def __init__(self, batch_type=None, timestamp=None): self.queries = [] - self.consistency = consistency + self.batch_type = batch_type if timestamp is not None and not isinstance(timestamp, datetime): raise CQLEngineException('timestamp object must be an instance of datetime') self.timestamp = timestamp @@ -150,13 +148,11 @@ class BatchQuery(object): self.queries.append((query, params)) def execute(self): - opener = 'BEGIN BATCH' - if self.consistency: - opener += ' USING CONSISTENCY {}'.format(self.consistency) + opener = 'BEGIN ' + (self.batch_type + ' ' if self.batch_type else '') + ' BATCH' if self.timestamp: epoch = datetime(1970, 1, 1) ts = long((self.timestamp - epoch).total_seconds() * 1000) - opener += ' TIMESTAMP {}'.format(ts) + opener += ' USING TIMESTAMP {}'.format(ts) query_list = [opener] parameters = {} diff --git a/cqlengine/tests/query/test_batch_query.py b/cqlengine/tests/query/test_batch_query.py new file mode 100644 index 00000000..5b31826a --- /dev/null +++ b/cqlengine/tests/query/test_batch_query.py @@ -0,0 +1,106 @@ +from datetime import datetime +from unittest import skip +from uuid import uuid4 +import random +from cqlengine import Model, columns +from cqlengine.management import delete_table, create_table +from cqlengine.query import BatchQuery +from cqlengine.tests.base import BaseCassEngTestCase + +class TestMultiKeyModel(Model): + partition = columns.Integer(primary_key=True) + cluster = columns.Integer(primary_key=True) + count = columns.Integer(required=False) + text = columns.Text(required=False) + +class BatchQueryTests(BaseCassEngTestCase): + + @classmethod + def setUpClass(cls): + super(BatchQueryTests, cls).setUpClass() + delete_table(TestMultiKeyModel) + create_table(TestMultiKeyModel) + + @classmethod + def tearDownClass(cls): + super(BatchQueryTests, cls).tearDownClass() + delete_table(TestMultiKeyModel) + + def setUp(self): + super(BatchQueryTests, self).setUp() + self.pkey = 1 + for obj in TestMultiKeyModel.filter(partition=self.pkey): + obj.delete() + + def test_insert_success_case(self): + + b = BatchQuery() + inst = TestMultiKeyModel.batch(b).create(partition=self.pkey, cluster=2, count=3, text='4') + + with self.assertRaises(TestMultiKeyModel.DoesNotExist): + TestMultiKeyModel.get(partition=self.pkey, cluster=2) + + b.execute() + + TestMultiKeyModel.get(partition=self.pkey, cluster=2) + + def test_update_success_case(self): + + inst = TestMultiKeyModel.create(partition=self.pkey, cluster=2, count=3, text='4') + + b = BatchQuery() + + inst.count = 4 + inst.batch(b).save() + + inst2 = TestMultiKeyModel.get(partition=self.pkey, cluster=2) + assert inst2.count == 3 + + b.execute() + + inst3 = TestMultiKeyModel.get(partition=self.pkey, cluster=2) + assert inst3.count == 4 + + def test_delete_success_case(self): + + inst = TestMultiKeyModel.create(partition=self.pkey, cluster=2, count=3, text='4') + + b = BatchQuery() + + inst.batch(b).delete() + + TestMultiKeyModel.get(partition=self.pkey, cluster=2) + + b.execute() + + with self.assertRaises(TestMultiKeyModel.DoesNotExist): + TestMultiKeyModel.get(partition=self.pkey, cluster=2) + + def test_context_manager(self): + + with BatchQuery() as b: + for i in range(5): + TestMultiKeyModel.batch(b).create(partition=self.pkey, cluster=i, count=3, text='4') + + for i in range(5): + with self.assertRaises(TestMultiKeyModel.DoesNotExist): + TestMultiKeyModel.get(partition=self.pkey, cluster=i) + + for i in range(5): + TestMultiKeyModel.get(partition=self.pkey, cluster=i) + + def test_bulk_delete_success_case(self): + + for i in range(1): + for j in range(5): + TestMultiKeyModel.create(partition=i, cluster=j, count=i*j, text='{}:{}'.format(i,j)) + + with BatchQuery() as b: + TestMultiKeyModel.objects.batch(b).filter(partition=0).delete() + assert TestMultiKeyModel.filter(partition=0).count() == 5 + + assert TestMultiKeyModel.filter(partition=0).count() == 0 + #cleanup + for m in TestMultiKeyModel.all(): + m.delete() + diff --git a/cqlengine/tests/test_batch_query.py b/cqlengine/tests/test_batch_query.py index fb4aab4a..6ba60877 100644 --- a/cqlengine/tests/test_batch_query.py +++ b/cqlengine/tests/test_batch_query.py @@ -3,7 +3,7 @@ from uuid import uuid4 import random from cqlengine import Model, columns from cqlengine.management import delete_table, create_table -from cqlengine.query import BatchQuery, Consistency +from cqlengine.query import BatchQuery from cqlengine.tests.base import BaseCassEngTestCase class TestMultiKeyModel(Model):