updating batch query to work with 1.2
This commit is contained in:
@@ -126,22 +126,20 @@ class LessThanOrEqualOperator(QueryOperator):
|
||||
symbol = "LTE"
|
||||
cql_symbol = '<='
|
||||
|
||||
class Consistency(object):
|
||||
ANY = 'ANY'
|
||||
ONE = 'ONE'
|
||||
QUORUM = 'QUORUM'
|
||||
LOCAL_QUORUM = 'LOCAL_QUORUM'
|
||||
EACH_QUORUM = 'EACH_QUORUM'
|
||||
ALL = 'ALL'
|
||||
class BatchType(object):
|
||||
Unlogged = 'UNLOGGED'
|
||||
Counter = 'COUNTER'
|
||||
|
||||
class BatchQuery(object):
|
||||
"""
|
||||
Handles the batching of queries
|
||||
|
||||
http://www.datastax.com/docs/1.2/cql_cli/cql/BATCH
|
||||
"""
|
||||
|
||||
def __init__(self, consistency=None, timestamp=None):
|
||||
def __init__(self, batch_type=None, timestamp=None):
|
||||
self.queries = []
|
||||
self.consistency = consistency
|
||||
self.batch_type = batch_type
|
||||
if timestamp is not None and not isinstance(timestamp, datetime):
|
||||
raise CQLEngineException('timestamp object must be an instance of datetime')
|
||||
self.timestamp = timestamp
|
||||
@@ -150,13 +148,11 @@ class BatchQuery(object):
|
||||
self.queries.append((query, params))
|
||||
|
||||
def execute(self):
|
||||
opener = 'BEGIN BATCH'
|
||||
if self.consistency:
|
||||
opener += ' USING CONSISTENCY {}'.format(self.consistency)
|
||||
opener = 'BEGIN ' + (self.batch_type + ' ' if self.batch_type else '') + ' BATCH'
|
||||
if self.timestamp:
|
||||
epoch = datetime(1970, 1, 1)
|
||||
ts = long((self.timestamp - epoch).total_seconds() * 1000)
|
||||
opener += ' TIMESTAMP {}'.format(ts)
|
||||
opener += ' USING TIMESTAMP {}'.format(ts)
|
||||
|
||||
query_list = [opener]
|
||||
parameters = {}
|
||||
|
||||
106
cqlengine/tests/query/test_batch_query.py
Normal file
106
cqlengine/tests/query/test_batch_query.py
Normal file
@@ -0,0 +1,106 @@
|
||||
from datetime import datetime
|
||||
from unittest import skip
|
||||
from uuid import uuid4
|
||||
import random
|
||||
from cqlengine import Model, columns
|
||||
from cqlengine.management import delete_table, create_table
|
||||
from cqlengine.query import BatchQuery
|
||||
from cqlengine.tests.base import BaseCassEngTestCase
|
||||
|
||||
class TestMultiKeyModel(Model):
|
||||
partition = columns.Integer(primary_key=True)
|
||||
cluster = columns.Integer(primary_key=True)
|
||||
count = columns.Integer(required=False)
|
||||
text = columns.Text(required=False)
|
||||
|
||||
class BatchQueryTests(BaseCassEngTestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(BatchQueryTests, cls).setUpClass()
|
||||
delete_table(TestMultiKeyModel)
|
||||
create_table(TestMultiKeyModel)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
super(BatchQueryTests, cls).tearDownClass()
|
||||
delete_table(TestMultiKeyModel)
|
||||
|
||||
def setUp(self):
|
||||
super(BatchQueryTests, self).setUp()
|
||||
self.pkey = 1
|
||||
for obj in TestMultiKeyModel.filter(partition=self.pkey):
|
||||
obj.delete()
|
||||
|
||||
def test_insert_success_case(self):
|
||||
|
||||
b = BatchQuery()
|
||||
inst = TestMultiKeyModel.batch(b).create(partition=self.pkey, cluster=2, count=3, text='4')
|
||||
|
||||
with self.assertRaises(TestMultiKeyModel.DoesNotExist):
|
||||
TestMultiKeyModel.get(partition=self.pkey, cluster=2)
|
||||
|
||||
b.execute()
|
||||
|
||||
TestMultiKeyModel.get(partition=self.pkey, cluster=2)
|
||||
|
||||
def test_update_success_case(self):
|
||||
|
||||
inst = TestMultiKeyModel.create(partition=self.pkey, cluster=2, count=3, text='4')
|
||||
|
||||
b = BatchQuery()
|
||||
|
||||
inst.count = 4
|
||||
inst.batch(b).save()
|
||||
|
||||
inst2 = TestMultiKeyModel.get(partition=self.pkey, cluster=2)
|
||||
assert inst2.count == 3
|
||||
|
||||
b.execute()
|
||||
|
||||
inst3 = TestMultiKeyModel.get(partition=self.pkey, cluster=2)
|
||||
assert inst3.count == 4
|
||||
|
||||
def test_delete_success_case(self):
|
||||
|
||||
inst = TestMultiKeyModel.create(partition=self.pkey, cluster=2, count=3, text='4')
|
||||
|
||||
b = BatchQuery()
|
||||
|
||||
inst.batch(b).delete()
|
||||
|
||||
TestMultiKeyModel.get(partition=self.pkey, cluster=2)
|
||||
|
||||
b.execute()
|
||||
|
||||
with self.assertRaises(TestMultiKeyModel.DoesNotExist):
|
||||
TestMultiKeyModel.get(partition=self.pkey, cluster=2)
|
||||
|
||||
def test_context_manager(self):
|
||||
|
||||
with BatchQuery() as b:
|
||||
for i in range(5):
|
||||
TestMultiKeyModel.batch(b).create(partition=self.pkey, cluster=i, count=3, text='4')
|
||||
|
||||
for i in range(5):
|
||||
with self.assertRaises(TestMultiKeyModel.DoesNotExist):
|
||||
TestMultiKeyModel.get(partition=self.pkey, cluster=i)
|
||||
|
||||
for i in range(5):
|
||||
TestMultiKeyModel.get(partition=self.pkey, cluster=i)
|
||||
|
||||
def test_bulk_delete_success_case(self):
|
||||
|
||||
for i in range(1):
|
||||
for j in range(5):
|
||||
TestMultiKeyModel.create(partition=i, cluster=j, count=i*j, text='{}:{}'.format(i,j))
|
||||
|
||||
with BatchQuery() as b:
|
||||
TestMultiKeyModel.objects.batch(b).filter(partition=0).delete()
|
||||
assert TestMultiKeyModel.filter(partition=0).count() == 5
|
||||
|
||||
assert TestMultiKeyModel.filter(partition=0).count() == 0
|
||||
#cleanup
|
||||
for m in TestMultiKeyModel.all():
|
||||
m.delete()
|
||||
|
||||
@@ -3,7 +3,7 @@ from uuid import uuid4
|
||||
import random
|
||||
from cqlengine import Model, columns
|
||||
from cqlengine.management import delete_table, create_table
|
||||
from cqlengine.query import BatchQuery, Consistency
|
||||
from cqlengine.query import BatchQuery
|
||||
from cqlengine.tests.base import BaseCassEngTestCase
|
||||
|
||||
class TestMultiKeyModel(Model):
|
||||
|
||||
Reference in New Issue
Block a user