Files
deb-python-cassandra-driver/cqlengine/tests/test_consistency.py
2013-10-24 15:36:20 -07:00

36 lines
1.1 KiB
Python

from cqlengine.management import sync_table, drop_table
from cqlengine.tests.base import BaseCassEngTestCase
from cqlengine.models import Model
from uuid import uuid4
from cqlengine import columns
import mock
from cqlengine.connection import ConnectionPool
from cqlengine import ALL
class TestConsistencyModel(Model):
    """Small model whose table backs the consistency tests in this module."""

    # Primary-key column; its default is a callable that returns uuid4().
    id = columns.UUID(
        primary_key=True,
        default=lambda: uuid4(),
    )

    # Integer column; the test in this module never assigns it.
    count = columns.Integer()

    # Text column declared with required=False.
    text = columns.Text(required=False)
class BaseConsistencyTest(BaseCassEngTestCase):
    """Test base that syncs the TestConsistencyModel table once per class.

    The table is synced in ``setUpClass`` and dropped in ``tearDownClass``.
    """

    @classmethod
    def setUpClass(cls):
        """Run the parent class-level setup, then sync the model's table."""
        parent = super(BaseConsistencyTest, cls)
        parent.setUpClass()
        sync_table(TestConsistencyModel)

    @classmethod
    def tearDownClass(cls):
        """Run the parent class-level teardown, then drop the model's table."""
        parent = super(BaseConsistencyTest, cls)
        parent.tearDownClass()
        drop_table(TestConsistencyModel)
class TestConsistency(BaseConsistencyTest):
    """Checks that a consistency level chosen on the model reaches the pool."""

    def test_create_uses_consistency(self):
        """create() via consistency(ALL) must pass ALL to ConnectionPool.execute.

        ``ConnectionPool.execute`` is replaced with a mock for the duration of
        the create call, so no query is actually sent by that call.
        """
        with mock.patch.object(ConnectionPool, 'execute') as m:
            TestConsistencyModel.consistency(ALL).create(text="i am not fault tolerant this way")

        # call_args is an (args, kwargs) pair, so indexing it with [2] raises
        # IndexError; pick the positional-argument tuple first.
        positional_args = m.call_args[0]

        # NOTE(review): assumes execute() receives the consistency level as
        # its third positional argument -- confirm against
        # ConnectionPool.execute's signature.
        self.assertEqual(ALL, positional_args[2])