Adds a custom attribute, `existing`, to the LWTException instance which allows the application layer to inspect the values read from Cassandra which conflicted with the attempted conditional write. The documentation already referred to the `existing` attribute but it was not implemented.
137 lines
4.6 KiB
Python
137 lines
4.6 KiB
Python
# Copyright 2013-2016 DataStax, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
try:
|
|
import unittest2 as unittest
|
|
except ImportError:
|
|
import unittest # noqa
|
|
|
|
import mock
|
|
import six
|
|
from uuid import uuid4
|
|
|
|
from cassandra.cqlengine import columns
|
|
from cassandra.cqlengine.management import sync_table, drop_table
|
|
from cassandra.cqlengine.models import Model
|
|
from cassandra.cqlengine.query import BatchQuery, LWTException
|
|
from cassandra.cqlengine.statements import TransactionClause
|
|
|
|
from tests.integration.cqlengine.base import BaseCassEngTestCase
|
|
from tests.integration import CASSANDRA_VERSION
|
|
|
|
|
|
class TestTransactionModel(Model):
|
|
id = columns.UUID(primary_key=True, default=uuid4)
|
|
count = columns.Integer()
|
|
text = columns.Text(required=False)
|
|
|
|
|
|
@unittest.skipUnless(CASSANDRA_VERSION >= '2.0.0', "transactions only supported on cassandra 2.0 or higher")
|
|
class TestTransaction(BaseCassEngTestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestTransaction, cls).setUpClass()
|
|
sync_table(TestTransactionModel)
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
super(TestTransaction, cls).tearDownClass()
|
|
drop_table(TestTransactionModel)
|
|
|
|
def test_update_using_transaction(self):
|
|
t = TestTransactionModel.create(text='blah blah')
|
|
t.text = 'new blah'
|
|
with mock.patch.object(self.session, 'execute') as m:
|
|
t.iff(text='blah blah').save()
|
|
|
|
args = m.call_args
|
|
self.assertIn('IF "text" = %(0)s', args[0][0].query_string)
|
|
|
|
def test_update_transaction_success(self):
|
|
t = TestTransactionModel.create(text='blah blah', count=5)
|
|
id = t.id
|
|
t.text = 'new blah'
|
|
t.iff(text='blah blah').save()
|
|
|
|
updated = TestTransactionModel.objects(id=id).first()
|
|
self.assertEqual(updated.count, 5)
|
|
self.assertEqual(updated.text, 'new blah')
|
|
|
|
def test_update_failure(self):
|
|
t = TestTransactionModel.create(text='blah blah')
|
|
t.text = 'new blah'
|
|
t = t.iff(text='something wrong')
|
|
|
|
with self.assertRaises(LWTException) as assertion:
|
|
t.save()
|
|
|
|
self.assertEqual(assertion.exception.existing, {
|
|
'text': 'blah blah',
|
|
'[applied]': False,
|
|
})
|
|
|
|
def test_blind_update(self):
|
|
t = TestTransactionModel.create(text='blah blah')
|
|
t.text = 'something else'
|
|
uid = t.id
|
|
|
|
with mock.patch.object(self.session, 'execute') as m:
|
|
TestTransactionModel.objects(id=uid).iff(text='blah blah').update(text='oh hey der')
|
|
|
|
args = m.call_args
|
|
self.assertIn('IF "text" = %(1)s', args[0][0].query_string)
|
|
|
|
def test_blind_update_fail(self):
|
|
t = TestTransactionModel.create(text='blah blah')
|
|
t.text = 'something else'
|
|
uid = t.id
|
|
qs = TestTransactionModel.objects(id=uid).iff(text='Not dis!')
|
|
with self.assertRaises(LWTException) as assertion:
|
|
qs.update(text='this will never work')
|
|
|
|
self.assertEqual(assertion.exception.existing, {
|
|
'text': 'blah blah',
|
|
'[applied]': False,
|
|
})
|
|
|
|
def test_transaction_clause(self):
|
|
tc = TransactionClause('some_value', 23)
|
|
tc.set_context_id(3)
|
|
|
|
self.assertEqual('"some_value" = %(3)s', six.text_type(tc))
|
|
self.assertEqual('"some_value" = %(3)s', str(tc))
|
|
|
|
def test_batch_update_transaction(self):
|
|
t = TestTransactionModel.create(text='something', count=5)
|
|
id = t.id
|
|
with BatchQuery() as b:
|
|
t.batch(b).iff(count=5).update(text='something else')
|
|
|
|
updated = TestTransactionModel.objects(id=id).first()
|
|
self.assertEqual(updated.text, 'something else')
|
|
|
|
b = BatchQuery()
|
|
updated.batch(b).iff(count=6).update(text='and another thing')
|
|
with self.assertRaises(LWTException) as assertion:
|
|
b.execute()
|
|
|
|
self.assertEqual(assertion.exception.existing, {
|
|
'id': id,
|
|
'count': 5,
|
|
'[applied]': False,
|
|
})
|
|
|
|
updated = TestTransactionModel.objects(id=id).first()
|
|
self.assertEqual(updated.text, 'something else')
|