cqle: Make integration tests run in Python 2.6

PYTHON-288
This commit is contained in:
Adam Holmberg
2015-06-11 16:33:20 -05:00
parent b47cbd6279
commit 2f6d11208e
34 changed files with 281 additions and 257 deletions

View File

@@ -639,7 +639,7 @@ class Decimal(Column):
if val is None:
return
try:
return _Decimal(val)
return _Decimal(repr(val)) if isinstance(val, float) else _Decimal(val)
except InvalidOperation:
raise ValidationError("{0} '{1}' can't be coerced to decimal".format(self.column_name, val))
@@ -808,7 +808,7 @@ class Map(BaseContainerColumn):
if not isinstance(val, dict):
raise ValidationError('{0} {1} is not a dict object'.format(self.column_name, val))
if None in val:
raise ValidationError("{} None is not allowed in a map".format(self.column_name))
raise ValidationError("{0} None is not allowed in a map".format(self.column_name))
return dict((self.key_col.validate(k), self.value_col.validate(v)) for k, v in val.items())
def to_python(self, value):

View File

@@ -62,23 +62,16 @@ class BaseQueryFunction(QueryValue):
pass
class MinTimeUUID(BaseQueryFunction):
"""
return a fake timeuuid corresponding to the smallest possible timeuuid for the given timestamp
http://cassandra.apache.org/doc/cql3/CQL.html#timeuuidFun
"""
format_string = 'MinTimeUUID(%({0})s)'
class TimeUUIDQueryFunction(BaseQueryFunction):
def __init__(self, value):
"""
:param value: the time to create a maximum time uuid from
:param value: the time to create bounding time uuid from
:type value: datetime
"""
if not isinstance(value, datetime):
raise ValidationError('datetime instance is required')
super(MinTimeUUID, self).__init__(value)
super(TimeUUIDQueryFunction, self).__init__(value)
def to_database(self, val):
epoch = datetime(1970, 1, 1, tzinfo=val.tzinfo)
@@ -89,31 +82,23 @@ class MinTimeUUID(BaseQueryFunction):
ctx[str(self.context_id)] = self.to_database(self.value)
class MaxTimeUUID(BaseQueryFunction):
class MinTimeUUID(TimeUUIDQueryFunction):
"""
return a fake timeuuid corresponding to the smallest possible timeuuid for the given timestamp
http://cassandra.apache.org/doc/cql3/CQL.html#timeuuidFun
"""
format_string = 'MinTimeUUID(%({0})s)'
class MaxTimeUUID(TimeUUIDQueryFunction):
"""
return a fake timeuuid corresponding to the largest possible timeuuid for the given timestamp
http://cassandra.apache.org/doc/cql3/CQL.html#timeuuidFun
"""
format_string = 'MaxTimeUUID(%({0})s)'
def __init__(self, value):
"""
:param value: the time to create a minimum time uuid from
:type value: datetime
"""
if not isinstance(value, datetime):
raise ValidationError('datetime instance is required')
super(MaxTimeUUID, self).__init__(value)
def to_database(self, val):
epoch = datetime(1970, 1, 1, tzinfo=val.tzinfo)
offset = epoch.tzinfo.utcoffset(epoch).total_seconds() if epoch.tzinfo else 0
return int(((val - epoch).total_seconds() - offset) * 1000)
def update_context(self, ctx):
ctx[str(self.context_id)] = self.to_database(self.value)
class Token(BaseQueryFunction):

View File

@@ -11,28 +11,30 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
import sys
from unittest import TestCase
from cassandra.cqlengine.connection import get_session
class BaseCassEngTestCase(TestCase):
class BaseCassEngTestCase(unittest.TestCase):
session = None
def setUp(self):
self.session = get_session()
super(BaseCassEngTestCase, self).setUp()
def assertHasAttr(self, obj, attr):
self.assertTrue(hasattr(obj, attr),
"{} doesn't have attribute: {}".format(obj, attr))
"{0} doesn't have attribute: {1}".format(obj, attr))
def assertNotHasAttr(self, obj, attr):
self.assertFalse(hasattr(obj, attr),
"{} shouldn't have the attribute: {}".format(obj, attr))
"{0} shouldn't have the attribute: {1}".format(obj, attr))
if sys.version_info > (3, 0):
def assertItemsEqual(self, first, second, msg=None):

View File

@@ -13,13 +13,18 @@
# limitations under the License.
from datetime import datetime, timedelta
import json, six, sys, traceback, logging
import json
import logging
import six
import sys
import traceback
from uuid import uuid4
from cassandra import WriteTimeout
from cassandra.cqlengine.models import Model, ValidationError
import cassandra.cqlengine.columns as columns
from cassandra.cqlengine.functions import get_total_seconds
from cassandra.cqlengine.models import Model, ValidationError
from cassandra.cqlengine.management import sync_table, drop_table
from tests.integration.cqlengine import is_prepend_reversed
from tests.integration.cqlengine.base import BaseCassEngTestCase
@@ -38,14 +43,16 @@ class JsonTestColumn(columns.Column):
db_type = 'text'
def to_python(self, value):
if value is None: return
if value is None:
return
if isinstance(value, six.string_types):
return json.loads(value)
else:
return value
def to_database(self, value):
if value is None: return
if value is None:
return
return json.dumps(value)
@@ -53,18 +60,15 @@ class TestSetColumn(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
super(TestSetColumn, cls).setUpClass()
drop_table(TestSetModel)
sync_table(TestSetModel)
@classmethod
def tearDownClass(cls):
super(TestSetColumn, cls).tearDownClass()
drop_table(TestSetModel)
def test_add_none_fails(self):
with self.assertRaises(ValidationError):
m = TestSetModel.create(int_set=set([None]))
self.assertRaises(ValidationError, TestSetModel.create, **{'int_set': set([None])})
def test_empty_set_initial(self):
"""
@@ -83,7 +87,7 @@ class TestSetColumn(BaseCassEngTestCase):
m.save()
m = TestSetModel.get(partition=m.partition)
self.assertNotIn(5, m.int_set)
self.assertTrue(5 not in m.int_set)
def test_blind_deleting_last_item_should_succeed(self):
m = TestSetModel.create()
@@ -93,7 +97,7 @@ class TestSetColumn(BaseCassEngTestCase):
TestSetModel.objects(partition=m.partition).update(int_set=set())
m = TestSetModel.get(partition=m.partition)
self.assertNotIn(5, m.int_set)
self.assertTrue(5 not in m.int_set)
def test_empty_set_retrieval(self):
m = TestSetModel.create()
@@ -102,7 +106,7 @@ class TestSetColumn(BaseCassEngTestCase):
def test_io_success(self):
""" Tests that a basic usage works as expected """
m1 = TestSetModel.create(int_set={1, 2}, text_set={'kai', 'andreas'})
m1 = TestSetModel.create(int_set=set((1, 2)), text_set=set(('kai', 'andreas')))
m2 = TestSetModel.get(partition=m1.partition)
assert isinstance(m2.int_set, set)
@@ -118,8 +122,7 @@ class TestSetColumn(BaseCassEngTestCase):
"""
Tests that attempting to use the wrong types will raise an exception
"""
with self.assertRaises(ValidationError):
TestSetModel.create(int_set={'string', True}, text_set={1, 3.0})
self.assertRaises(ValidationError, TestSetModel.create, **{'int_set': set(('string', True)), 'text_set': set((1, 3.0))})
def test_element_count_validation(self):
"""
@@ -127,27 +130,26 @@ class TestSetColumn(BaseCassEngTestCase):
"""
while True:
try:
TestSetModel.create(text_set={str(uuid4()) for i in range(65535)})
TestSetModel.create(text_set=set(str(uuid4()) for i in range(65535)))
break
except WriteTimeout:
ex_type, ex, tb = sys.exc_info()
log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
del tb
with self.assertRaises(ValidationError):
TestSetModel.create(text_set={str(uuid4()) for i in range(65536)})
self.assertRaises(ValidationError, TestSetModel.create, **{'text_set': set(str(uuid4()) for i in range(65536))})
def test_partial_updates(self):
""" Tests that partial udpates work as expected """
m1 = TestSetModel.create(int_set={1, 2, 3, 4})
m1 = TestSetModel.create(int_set=set((1, 2, 3, 4)))
m1.int_set.add(5)
m1.int_set.remove(1)
assert m1.int_set == {2, 3, 4, 5}
assert m1.int_set == set((2, 3, 4, 5))
m1.save()
m2 = TestSetModel.get(partition=m1.partition)
assert m2.int_set == {2, 3, 4, 5}
assert m2.int_set == set((2, 3, 4, 5))
def test_instantiation_with_column_class(self):
"""
@@ -167,9 +169,9 @@ class TestSetColumn(BaseCassEngTestCase):
def test_to_python(self):
""" Tests that to_python of value column is called """
column = columns.Set(JsonTestColumn)
val = {1, 2, 3}
val = set((1, 2, 3))
db_val = column.to_database(val)
assert db_val == {json.dumps(v) for v in val}
assert db_val == set(json.dumps(v) for v in val)
py_val = column.to_python(db_val)
assert py_val == val
@@ -177,17 +179,16 @@ class TestSetColumn(BaseCassEngTestCase):
""" tests that the default empty container is not saved if it hasn't been updated """
pkey = uuid4()
# create a row with set data
TestSetModel.create(partition=pkey, int_set={3, 4})
TestSetModel.create(partition=pkey, int_set=set((3, 4)))
# create another with no set data
TestSetModel.create(partition=pkey)
m = TestSetModel.get(partition=pkey)
self.assertEqual(m.int_set, {3, 4})
self.assertEqual(m.int_set, set((3, 4)))
class TestListModel(Model):
partition = columns.UUID(primary_key=True, default=uuid4)
int_list = columns.List(columns.Integer, required=False)
text_list = columns.List(columns.Text, required=False)
@@ -197,20 +198,18 @@ class TestListColumn(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
super(TestListColumn, cls).setUpClass()
drop_table(TestListModel)
sync_table(TestListModel)
@classmethod
def tearDownClass(cls):
super(TestListColumn, cls).tearDownClass()
drop_table(TestListModel)
def test_initial(self):
tmp = TestListModel.create()
tmp.int_list.append(1)
def test_initial(self):
def test_initial_retrieve(self):
tmp = TestListModel.create()
tmp2 = TestListModel.get(partition=tmp.partition)
tmp2.int_list.append(1)
@@ -236,8 +235,7 @@ class TestListColumn(BaseCassEngTestCase):
"""
Tests that attempting to use the wrong types will raise an exception
"""
with self.assertRaises(ValidationError):
TestListModel.create(int_list=['string', True], text_list=[1, 3.0])
self.assertRaises(ValidationError, TestListModel.create, **{'int_list': ['string', True], 'text_list': [1, 3.0]})
def test_element_count_validation(self):
"""
@@ -251,8 +249,7 @@ class TestListColumn(BaseCassEngTestCase):
ex_type, ex, tb = sys.exc_info()
log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
del tb
with self.assertRaises(ValidationError):
TestListModel.create(text_list=[str(uuid4()) for i in range(65536)])
self.assertRaises(ValidationError, TestListModel.create, **{'text_list': [str(uuid4()) for _ in range(65536)]})
def test_partial_updates(self):
""" Tests that partial udpates work as expected """
@@ -300,16 +297,16 @@ class TestListColumn(BaseCassEngTestCase):
""" tests that the default empty container is not saved if it hasn't been updated """
pkey = uuid4()
# create a row with list data
TestListModel.create(partition=pkey, int_list=[1,2,3,4])
TestListModel.create(partition=pkey, int_list=[1, 2, 3, 4])
# create another with no list data
TestListModel.create(partition=pkey)
m = TestListModel.get(partition=pkey)
self.assertEqual(m.int_list, [1,2,3,4])
self.assertEqual(m.int_list, [1, 2, 3, 4])
def test_remove_entry_works(self):
pkey = uuid4()
tmp = TestListModel.create(partition=pkey, int_list=[1,2])
tmp = TestListModel.create(partition=pkey, int_list=[1, 2])
tmp.int_list.pop()
tmp.update()
tmp = TestListModel.get(partition=pkey)
@@ -317,7 +314,7 @@ class TestListColumn(BaseCassEngTestCase):
def test_update_from_non_empty_to_empty(self):
pkey = uuid4()
tmp = TestListModel.create(partition=pkey, int_list=[1,2])
tmp = TestListModel.create(partition=pkey, int_list=[1, 2])
tmp.int_list = []
tmp.update()
@@ -326,8 +323,7 @@ class TestListColumn(BaseCassEngTestCase):
def test_insert_none(self):
pkey = uuid4()
with self.assertRaises(ValidationError):
TestListModel.create(partition=pkey, int_list=[None])
self.assertRaises(ValidationError, TestListModel.create, **{'partition': pkey, 'int_list': [None]})
def test_blind_list_updates_from_none(self):
""" Tests that updates from None work as expected """
@@ -344,6 +340,7 @@ class TestListColumn(BaseCassEngTestCase):
m3 = TestListModel.get(partition=m.partition)
assert m3.int_list == []
class TestMapModel(Model):
partition = columns.UUID(primary_key=True, default=uuid4)
int_map = columns.Map(columns.Integer, columns.UUID, required=False)
@@ -354,13 +351,11 @@ class TestMapColumn(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
super(TestMapColumn, cls).setUpClass()
drop_table(TestMapModel)
sync_table(TestMapModel)
@classmethod
def tearDownClass(cls):
super(TestMapColumn, cls).tearDownClass()
drop_table(TestMapModel)
def test_empty_default(self):
@@ -368,8 +363,7 @@ class TestMapColumn(BaseCassEngTestCase):
tmp.int_map['blah'] = 1
def test_add_none_as_map_key(self):
with self.assertRaises(ValidationError):
TestMapModel.create(int_map={None: uuid4()})
self.assertRaises(ValidationError, TestMapModel.create, **{'int_map': {None: uuid4()}})
def test_empty_retrieve(self):
tmp = TestMapModel.create()
@@ -384,7 +378,7 @@ class TestMapColumn(BaseCassEngTestCase):
tmp.save()
tmp = TestMapModel.get(partition=tmp.partition)
self.assertNotIn("blah", tmp.int_map)
self.assertTrue("blah" not in tmp.int_map)
def test_io_success(self):
""" Tests that a basic usage works as expected """
@@ -395,25 +389,24 @@ class TestMapColumn(BaseCassEngTestCase):
m1 = TestMapModel.create(int_map={1: k1, 2: k2}, text_map={'now': now, 'then': then})
m2 = TestMapModel.get(partition=m1.partition)
assert isinstance(m2.int_map, dict)
assert isinstance(m2.text_map, dict)
self.assertTrue(isinstance(m2.int_map, dict))
self.assertTrue(isinstance(m2.text_map, dict))
assert 1 in m2.int_map
assert 2 in m2.int_map
assert m2.int_map[1] == k1
assert m2.int_map[2] == k2
self.assertTrue(1 in m2.int_map)
self.assertTrue(2 in m2.int_map)
self.assertEqual(m2.int_map[1], k1)
self.assertEqual(m2.int_map[2], k2)
assert 'now' in m2.text_map
assert 'then' in m2.text_map
assert (now - m2.text_map['now']).total_seconds() < 0.001
assert (then - m2.text_map['then']).total_seconds() < 0.001
self.assertTrue('now' in m2.text_map)
self.assertTrue('then' in m2.text_map)
self.assertAlmostEqual(get_total_seconds(now - m2.text_map['now']), 0, 2)
self.assertAlmostEqual(get_total_seconds(then - m2.text_map['then']), 0, 2)
def test_type_validation(self):
"""
Tests that attempting to use the wrong types will raise an exception
"""
with self.assertRaises(ValidationError):
TestMapModel.create(int_map={'key': 2, uuid4(): 'val'}, text_map={2: 5})
self.assertRaises(ValidationError, TestMapModel.create, **{'int_map': {'key': 2, uuid4(): 'val'}, 'text_map': {2: 5}})
def test_element_count_validation(self):
"""
@@ -421,19 +414,18 @@ class TestMapColumn(BaseCassEngTestCase):
"""
while True:
try:
TestMapModel.create(text_map={str(uuid4()): i for i in range(65535)})
TestMapModel.create(text_map=dict((str(uuid4()), i) for i in range(65535)))
break
except WriteTimeout:
ex_type, ex, tb = sys.exc_info()
log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
del tb
with self.assertRaises(ValidationError):
TestMapModel.create(text_map={str(uuid4()): i for i in range(65536)})
self.assertRaises(ValidationError, TestMapModel.create, **{'text_map': dict((str(uuid4()), i) for i in range(65536))})
def test_partial_updates(self):
""" Tests that partial udpates work as expected """
now = datetime.now()
#derez it a bit
# derez it a bit
now = datetime(*now.timetuple()[:-3])
early = now - timedelta(minutes=30)
earlier = early - timedelta(minutes=30)
@@ -511,7 +503,7 @@ class TestMapColumn(BaseCassEngTestCase):
column = columns.Map(JsonTestColumn, JsonTestColumn)
val = {1: 2, 3: 4, 5: 6}
db_val = column.to_database(val)
assert db_val == {json.dumps(k):json.dumps(v) for k,v in val.items()}
assert db_val == dict((json.dumps(k), json.dumps(v)) for k, v in val.items())
py_val = column.to_python(db_val)
assert py_val == val
@@ -530,7 +522,6 @@ class TestMapColumn(BaseCassEngTestCase):
class TestCamelMapModel(Model):
partition = columns.UUID(primary_key=True, default=uuid4)
camelMap = columns.Map(columns.Text, columns.Integer, required=False)
@@ -539,13 +530,11 @@ class TestCamelMapColumn(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
super(TestCamelMapColumn, cls).setUpClass()
drop_table(TestCamelMapModel)
sync_table(TestCamelMapModel)
@classmethod
def tearDownClass(cls):
super(TestCamelMapColumn, cls).tearDownClass()
drop_table(TestCamelMapModel)
def test_camelcase_column(self):

View File

@@ -31,41 +31,48 @@ class TestClassConstruction(BaseCassEngTestCase):
def test_defining_a_non_counter_column_fails(self):
""" Tests that defining a non counter column field in a model with a counter column fails """
with self.assertRaises(ModelDefinitionException):
try:
class model(Model):
partition = columns.UUID(primary_key=True, default=uuid4)
counter = columns.Counter()
text = columns.Text()
self.fail("did not raise expected ModelDefinitionException")
except ModelDefinitionException:
pass
def test_defining_a_primary_key_counter_column_fails(self):
""" Tests that defining primary keys on counter columns fails """
with self.assertRaises(TypeError):
try:
class model(Model):
partition = columns.UUID(primary_key=True, default=uuid4)
cluster = columns.Counter(primary_ley=True)
counter = columns.Counter()
self.fail("did not raise expected TypeError")
except TypeError:
pass
# force it
with self.assertRaises(ModelDefinitionException):
try:
class model(Model):
partition = columns.UUID(primary_key=True, default=uuid4)
cluster = columns.Counter()
cluster.primary_key = True
counter = columns.Counter()
self.fail("did not raise expected ModelDefinitionException")
except ModelDefinitionException:
pass
class TestCounterColumn(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
super(TestCounterColumn, cls).setUpClass()
drop_table(TestCounterModel)
sync_table(TestCounterModel)
@classmethod
def tearDownClass(cls):
super(TestCounterColumn, cls).tearDownClass()
drop_table(TestCounterModel)
def test_updates(self):

View File

@@ -12,7 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import skipUnless
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
from uuid import uuid4
from cassandra.cqlengine import columns
@@ -33,19 +37,21 @@ class TestStaticModel(Model):
text = columns.Text()
@skipUnless(STATIC_SUPPORTED, "only runs against the cql3 protocol v2.0")
class TestStaticColumn(BaseCassEngTestCase):
def setUp(cls):
if not STATIC_SUPPORTED:
raise unittest.SkipTest("only runs against the cql3 protocol v2.0")
super(TestStaticColumn, cls).setUp()
@classmethod
def setUpClass(cls):
super(TestStaticColumn, cls).setUpClass()
drop_table(TestStaticModel)
if STATIC_SUPPORTED: # setup and teardown run regardless of skip
sync_table(TestStaticModel)
@classmethod
def tearDownClass(cls):
super(TestStaticColumn, cls).tearDownClass()
drop_table(TestStaticModel)
def test_mixed_updates(self):

View File

@@ -12,9 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
from datetime import datetime, timedelta, date, tzinfo
from decimal import Decimal as D
from unittest import TestCase, SkipTest
from uuid import uuid4, uuid1
from cassandra import InvalidRequest
@@ -46,12 +50,10 @@ class TestDatetime(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
super(TestDatetime, cls).setUpClass()
sync_table(cls.DatetimeTest)
@classmethod
def tearDownClass(cls):
super(TestDatetime, cls).tearDownClass()
drop_table(cls.DatetimeTest)
def test_datetime_io(self):
@@ -95,7 +97,6 @@ class TestBoolDefault(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
super(TestBoolDefault, cls).setUpClass()
sync_table(cls.BoolDefaultValueTest)
def test_default_is_set(self):
@@ -112,7 +113,6 @@ class TestBoolValidation(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
super(TestBoolValidation, cls).setUpClass()
sync_table(cls.BoolValidationTest)
def test_validation_preserves_none(self):
@@ -129,12 +129,10 @@ class TestVarInt(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
super(TestVarInt, cls).setUpClass()
sync_table(cls.VarIntTest)
@classmethod
def tearDownClass(cls):
super(TestVarInt, cls).tearDownClass()
sync_table(cls.VarIntTest)
def test_varint_io(self):
@@ -155,14 +153,12 @@ class TestDate(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
if PROTOCOL_VERSION < 4:
raise SkipTest("Protocol v4 datatypes require native protocol 4+, currently using: {0}".format(PROTOCOL_VERSION))
raise unittest.SkipTest("Protocol v4 datatypes require native protocol 4+, currently using: {0}".format(PROTOCOL_VERSION))
super(TestDate, cls).setUpClass()
sync_table(cls.DateTest)
@classmethod
def tearDownClass(cls):
super(TestDate, cls).tearDownClass()
drop_table(cls.DateTest)
def test_date_io(self):
@@ -195,12 +191,10 @@ class TestDecimal(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
super(TestDecimal, cls).setUpClass()
sync_table(cls.DecimalTest)
@classmethod
def tearDownClass(cls):
super(TestDecimal, cls).tearDownClass()
drop_table(cls.DecimalTest)
def test_decimal_io(self):
@@ -220,12 +214,10 @@ class TestUUID(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
super(TestUUID, cls).setUpClass()
sync_table(cls.UUIDTest)
@classmethod
def tearDownClass(cls):
super(TestUUID, cls).tearDownClass()
drop_table(cls.UUIDTest)
def test_uuid_str_with_dashes(self):
@@ -255,12 +247,10 @@ class TestTimeUUID(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
super(TestTimeUUID, cls).setUpClass()
sync_table(cls.TimeUUIDTest)
@classmethod
def tearDownClass(cls):
super(TestTimeUUID, cls).tearDownClass()
drop_table(cls.TimeUUIDTest)
def test_timeuuid_io(self):
@@ -361,10 +351,10 @@ class TestPythonDoesntDieWhenExtraFieldIsInCassandra(BaseCassEngTestCase):
drop_table(self.TestModel)
sync_table(self.TestModel)
self.TestModel.create()
execute("ALTER TABLE {} add blah int".format(self.TestModel.column_family_name(include_keyspace=True)))
execute("ALTER TABLE {0} add blah int".format(self.TestModel.column_family_name(include_keyspace=True)))
self.TestModel.objects().all()
class TestTimeUUIDFromDatetime(TestCase):
class TestTimeUUIDFromDatetime(BaseCassEngTestCase):
def test_conversion_specific_date(self):
dt = datetime(1981, 7, 11, microsecond=555000)

View File

@@ -11,12 +11,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
from datetime import datetime, timedelta, time
from decimal import Decimal
from uuid import uuid1, uuid4, UUID
import six
import unittest
from cassandra.cqlengine import columns
from cassandra.cqlengine.management import sync_table
@@ -58,7 +61,7 @@ class BaseColumnIOTest(BaseCassEngTestCase):
# create a table with the given column
class IOTestModel(Model):
table_name = cls.column.db_type + "_io_test_model_{}".format(uuid4().hex[:8])
table_name = cls.column.db_type + "_io_test_model_{0}".format(uuid4().hex[:8])
pkey = cls.column(primary_key=True)
data = cls.column()
@@ -150,22 +153,6 @@ class TestDateTime(BaseColumnIOTest):
data_val = now + timedelta(days=1)
class TestDate(BaseColumnIOTest):
@classmethod
def setUpClass(cls):
if PROTOCOL_VERSION < 4:
raise unittest.SkipTest("Protocol v4 datatypes require native protocol 4+, currently using: {0}".format(PROTOCOL_VERSION))
super(TestDate, cls).setUpClass()
column = columns.Date
now = Date(datetime.now().date())
pkey_val = now
data_val = Date(now.days_from_epoch + 1)
class TestUUID(BaseColumnIOTest):
column = columns.UUID
@@ -218,16 +205,43 @@ class TestDecimalIO(BaseColumnIOTest):
data_val = Decimal('0.005'), 3.5, '8'
def comparator_converter(self, val):
return Decimal(val)
return Decimal(repr(val) if isinstance(val, float) else val)
class TestTime(BaseColumnIOTest):
class ProtocolV4Test(BaseColumnIOTest):
@classmethod
def setUpClass(cls):
if PROTOCOL_VERSION >= 4:
super(ProtocolV4Test, cls).setUpClass()
@classmethod
def tearDownClass(cls):
if PROTOCOL_VERSION >= 4:
super(ProtocolV4Test, cls).tearDownClass()
class TestDate(ProtocolV4Test):
def setUp(self):
if PROTOCOL_VERSION < 4:
raise unittest.SkipTest("Protocol v4 datatypes require native protocol 4+, currently using: {0}".format(PROTOCOL_VERSION))
super(TestTime, cls).setUpClass()
super(TestDate, self).setUp()
column = columns.Date
now = Date(datetime.now().date())
pkey_val = now
data_val = Date(now.days_from_epoch + 1)
class TestTime(ProtocolV4Test):
def setUp(self):
if PROTOCOL_VERSION < 4:
raise unittest.SkipTest("Protocol v4 datatypes require native protocol 4+, currently using: {0}".format(PROTOCOL_VERSION))
super(TestTime, self).setUp()
column = columns.Time
@@ -235,14 +249,13 @@ class TestTime(BaseColumnIOTest):
data_val = Time(time(16, 47, 25, 7))
class TestSmallInt(BaseColumnIOTest):
class TestSmallInt(ProtocolV4Test):
@classmethod
def setUpClass(cls):
def setUp(self):
if PROTOCOL_VERSION < 4:
raise unittest.SkipTest("Protocol v4 datatypes require native protocol 4+, currently using: {0}".format(PROTOCOL_VERSION))
super(TestSmallInt, cls).setUpClass()
super(TestSmallInt, self).setUp()
column = columns.SmallInt
@@ -250,14 +263,13 @@ class TestSmallInt(BaseColumnIOTest):
data_val = 32523
class TestTinyInt(BaseColumnIOTest):
class TestTinyInt(ProtocolV4Test):
@classmethod
def setUpClass(cls):
def setUp(self):
if PROTOCOL_VERSION < 4:
raise unittest.SkipTest("Protocol v4 datatypes require native protocol 4+, currently using: {0}".format(PROTOCOL_VERSION))
super(TestTinyInt, cls).setUpClass()
super(TestTinyInt, self).setUp()
column = columns.TinyInt

View File

@@ -1,14 +0,0 @@
# Copyright 2015 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -36,10 +36,11 @@ class BaseCompactionTest(BaseCassEngTestCase):
# key is a normal_key, converted to
# __compaction_key__
key = "__compaction_{}__".format(key)
key = "__compaction_{0}__".format(key)
with patch.object(self.model, key, 10), self.assertRaises(CQLEngineException):
get_compaction_options(self.model)
with patch.object(self.model, key, 10):
with self.assertRaises(CQLEngineException):
get_compaction_options(self.model)
class SizeTieredCompactionTest(BaseCompactionTest):

View File

@@ -11,10 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
import mock
from unittest import skipUnless
import warnings
from cassandra.cqlengine import CACHING_ALL, CACHING_NONE
@@ -310,7 +312,7 @@ class NonModelFailureTest(BaseCassEngTestCase):
sync_table(self.FakeModel)
@skipUnless(PROTOCOL_VERSION >= 2, "only runs against the cql3 protocol v2.0")
@unittest.skipUnless(PROTOCOL_VERSION >= 2, "only runs against the cql3 protocol v2.0")
def test_static_columns():
class StaticModel(Model):
id = columns.Integer(primary_key=True)

View File

@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import skip
from uuid import uuid4
from tests.integration.cqlengine.base import BaseCassEngTestCase

View File

@@ -11,15 +11,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import TestCase
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
from cassandra.cqlengine import columns
from cassandra.cqlengine.management import sync_table, drop_table, create_keyspace_simple, drop_keyspace
from cassandra.cqlengine.models import Model, ModelDefinitionException
class TestModel(TestCase):
class TestModel(unittest.TestCase):
""" Tests the non-io functionality of models """
def test_instance_equality(self):
@@ -104,7 +106,7 @@ class TestModel(TestCase):
drop_keyspace('keyspace')
class BuiltInAttributeConflictTest(TestCase):
class BuiltInAttributeConflictTest(unittest.TestCase):
"""tests Model definitions that conflict with built-in attributes/methods"""
def test_model_with_attribute_name_conflict(self):

View File

@@ -11,10 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
from uuid import uuid4, UUID
import random
import unittest
from datetime import datetime, date, time
from decimal import Decimal
from operator import itemgetter

View File

@@ -11,10 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
from datetime import datetime, date, time
from decimal import Decimal
import unittest
from uuid import UUID, uuid4
from cassandra.cqlengine.models import Model

View File

@@ -12,11 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import TestCase
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
from cassandra.cqlengine.operators import BaseQueryOperator, QueryOperatorException
class BaseOperatorTest(TestCase):
class BaseOperatorTest(unittest.TestCase):
def test_get_operator_cannot_be_called_from_base_class(self):
with self.assertRaises(QueryOperatorException):

View File

@@ -116,7 +116,7 @@ class BatchQueryTests(BaseCassEngTestCase):
for i in range(1):
for j in range(5):
TestMultiKeyModel.create(partition=i, cluster=j, count=i*j, text='{}:{}'.format(i,j))
TestMultiKeyModel.create(partition=i, cluster=j, count=i*j, text='{0}:{1}'.format(i,j))
with BatchQuery() as b:
TestMultiKeyModel.objects.batch(b).filter(partition=0).delete()

View File

@@ -14,6 +14,7 @@
from datetime import datetime, timedelta
from uuid import uuid4
from cassandra.cqlengine.functions import get_total_seconds
from tests.integration.cqlengine.base import BaseCassEngTestCase
@@ -66,6 +67,6 @@ class TestDateTimeQueries(BaseCassEngTestCase):
obj = DateTimeQueryTestModel.create(user=pk, day=now, data='energy cheese')
load = DateTimeQueryTestModel.get(user=pk)
assert abs(now - load.day).total_seconds() < 0.001
self.assertAlmostEqual(get_total_seconds(now - load.day), 0, 2)
obj.delete()

View File

@@ -99,7 +99,7 @@ class TestTokenFunction(BaseCassEngTestCase):
q = TestModel.objects.filter(pk__token__gt=func)
where = q._where[0]
where.set_context_id(1)
self.assertEqual(str(where), 'token("p1", "p2") > token(%({})s, %({})s)'.format(1, 2))
self.assertEqual(str(where), 'token("p1", "p2") > token(%({0})s, %({1})s)'.format(1, 2))
# Verify that a SELECT query can be successfully generated
str(q._select_query())
@@ -111,7 +111,7 @@ class TestTokenFunction(BaseCassEngTestCase):
q = TestModel.objects.filter(pk__token__gt=func)
where = q._where[0]
where.set_context_id(1)
self.assertEqual(str(where), 'token("p1", "p2") > token(%({})s, %({})s)'.format(1, 2))
self.assertEqual(str(where), 'token("p1", "p2") > token(%({0})s, %({1})s)'.format(1, 2))
str(q._select_query())
# The 'pk__token' virtual column may only be compared to a Token

View File

@@ -11,11 +11,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
from datetime import datetime
import time
from unittest import TestCase, skipUnless
from uuid import uuid1, uuid4
import uuid
@@ -685,7 +689,7 @@ class TestObjectsProperty(BaseQuerySetUsage):
assert TestModel.objects._result_cache is None
@skipUnless(PROTOCOL_VERSION >= 2, "only runs against the cql3 protocol v2.0")
@unittest.skipUnless(PROTOCOL_VERSION >= 2, "only runs against the cql3 protocol v2.0")
def test_paged_result_handling():
# addresses #225
class PagingTest(Model):

View File

@@ -137,11 +137,11 @@ class QueryUpdateTests(BaseCassEngTestCase):
partition = uuid4()
cluster = 1
TestQueryUpdateModel.objects.create(
partition=partition, cluster=cluster, text_set={"foo"})
partition=partition, cluster=cluster, text_set=set(("foo",)))
TestQueryUpdateModel.objects(
partition=partition, cluster=cluster).update(text_set__add={'bar'})
partition=partition, cluster=cluster).update(text_set__add=set(('bar',)))
obj = TestQueryUpdateModel.objects.get(partition=partition, cluster=cluster)
self.assertEqual(obj.text_set, {"foo", "bar"})
self.assertEqual(obj.text_set, set(("foo", "bar")))
def test_set_add_updates_new_record(self):
""" If the key doesn't exist yet, an update creates the record
@@ -149,20 +149,20 @@ class QueryUpdateTests(BaseCassEngTestCase):
partition = uuid4()
cluster = 1
TestQueryUpdateModel.objects(
partition=partition, cluster=cluster).update(text_set__add={'bar'})
partition=partition, cluster=cluster).update(text_set__add=set(('bar',)))
obj = TestQueryUpdateModel.objects.get(partition=partition, cluster=cluster)
self.assertEqual(obj.text_set, {"bar"})
self.assertEqual(obj.text_set, set(("bar",)))
def test_set_remove_updates(self):
partition = uuid4()
cluster = 1
TestQueryUpdateModel.objects.create(
partition=partition, cluster=cluster, text_set={"foo", "baz"})
partition=partition, cluster=cluster, text_set=set(("foo", "baz")))
TestQueryUpdateModel.objects(
partition=partition, cluster=cluster).update(
text_set__remove={'foo'})
text_set__remove=set(('foo',)))
obj = TestQueryUpdateModel.objects.get(partition=partition, cluster=cluster)
self.assertEqual(obj.text_set, {"baz"})
self.assertEqual(obj.text_set, set(("baz",)))
def test_set_remove_new_record(self):
""" Removing something not in the set should silently do nothing
@@ -170,12 +170,12 @@ class QueryUpdateTests(BaseCassEngTestCase):
partition = uuid4()
cluster = 1
TestQueryUpdateModel.objects.create(
partition=partition, cluster=cluster, text_set={"foo"})
partition=partition, cluster=cluster, text_set=set(("foo",)))
TestQueryUpdateModel.objects(
partition=partition, cluster=cluster).update(
text_set__remove={'afsd'})
text_set__remove=set(('afsd',)))
obj = TestQueryUpdateModel.objects.get(partition=partition, cluster=cluster)
self.assertEqual(obj.text_set, {"foo"})
self.assertEqual(obj.text_set, set(("foo",)))
def test_list_append_updates(self):
partition = uuid4()

View File

@@ -11,12 +11,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
from unittest import TestCase
from cassandra.cqlengine.statements import AssignmentClause, SetUpdateClause, ListUpdateClause, MapUpdateClause, MapDeleteClause, FieldDeleteClause, CounterUpdateClause
class AssignmentClauseTests(TestCase):
class AssignmentClauseTests(unittest.TestCase):
def test_rendering(self):
pass
@@ -27,14 +30,14 @@ class AssignmentClauseTests(TestCase):
self.assertEqual(ac.insert_tuple(), ('a', 10))
class SetUpdateClauseTests(TestCase):
class SetUpdateClauseTests(unittest.TestCase):
def test_update_from_none(self):
c = SetUpdateClause('s', {1, 2}, previous=None)
c = SetUpdateClause('s', set((1, 2)), previous=None)
c._analyze()
c.set_context_id(0)
self.assertEqual(c._assignments, {1, 2})
self.assertEqual(c._assignments, set((1, 2)))
self.assertIsNone(c._additions)
self.assertIsNone(c._removals)
@@ -43,11 +46,11 @@ class SetUpdateClauseTests(TestCase):
ctx = {}
c.update_context(ctx)
self.assertEqual(ctx, {'0': {1, 2}})
self.assertEqual(ctx, {'0': set((1, 2))})
def test_null_update(self):
""" tests setting a set to None creates an empty update statement """
c = SetUpdateClause('s', None, previous={1, 2})
c = SetUpdateClause('s', None, previous=set((1, 2)))
c._analyze()
c.set_context_id(0)
@@ -64,7 +67,7 @@ class SetUpdateClauseTests(TestCase):
def test_no_update(self):
""" tests an unchanged value creates an empty update statement """
c = SetUpdateClause('s', {1, 2}, previous={1, 2})
c = SetUpdateClause('s', set((1, 2)), previous=set((1, 2)))
c._analyze()
c.set_context_id(0)
@@ -95,15 +98,15 @@ class SetUpdateClauseTests(TestCase):
ctx = {}
c.update_context(ctx)
self.assertEqual(ctx, {'0' : set()})
self.assertEqual(ctx, {'0': set()})
def test_additions(self):
c = SetUpdateClause('s', {1, 2, 3}, previous={1, 2})
c = SetUpdateClause('s', set((1, 2, 3)), previous=set((1, 2)))
c._analyze()
c.set_context_id(0)
self.assertIsNone(c._assignments)
self.assertEqual(c._additions, {3})
self.assertEqual(c._additions, set((3,)))
self.assertIsNone(c._removals)
self.assertEqual(c.get_context_size(), 1)
@@ -111,42 +114,42 @@ class SetUpdateClauseTests(TestCase):
ctx = {}
c.update_context(ctx)
self.assertEqual(ctx, {'0': {3}})
self.assertEqual(ctx, {'0': set((3,))})
def test_removals(self):
c = SetUpdateClause('s', {1, 2}, previous={1, 2, 3})
c = SetUpdateClause('s', set((1, 2)), previous=set((1, 2, 3)))
c._analyze()
c.set_context_id(0)
self.assertIsNone(c._assignments)
self.assertIsNone(c._additions)
self.assertEqual(c._removals, {3})
self.assertEqual(c._removals, set((3,)))
self.assertEqual(c.get_context_size(), 1)
self.assertEqual(str(c), '"s" = "s" - %(0)s')
ctx = {}
c.update_context(ctx)
self.assertEqual(ctx, {'0': {3}})
self.assertEqual(ctx, {'0': set((3,))})
def test_additions_and_removals(self):
c = SetUpdateClause('s', {2, 3}, previous={1, 2})
c = SetUpdateClause('s', set((2, 3)), previous=set((1, 2)))
c._analyze()
c.set_context_id(0)
self.assertIsNone(c._assignments)
self.assertEqual(c._additions, {3})
self.assertEqual(c._removals, {1})
self.assertEqual(c._additions, set((3,)))
self.assertEqual(c._removals, set((1,)))
self.assertEqual(c.get_context_size(), 2)
self.assertEqual(str(c), '"s" = "s" + %(0)s, "s" = "s" - %(1)s')
ctx = {}
c.update_context(ctx)
self.assertEqual(ctx, {'0': {3}, '1': {1}})
self.assertEqual(ctx, {'0': set((3,)), '1': set((1,))})
class ListUpdateClauseTests(TestCase):
class ListUpdateClauseTests(unittest.TestCase):
def test_update_from_none(self):
c = ListUpdateClause('s', [1, 2, 3])
@@ -262,7 +265,7 @@ class ListUpdateClauseTests(TestCase):
self.assertEqual(ctx, {'0': [1, 2, 3]})
class MapUpdateTests(TestCase):
class MapUpdateTests(unittest.TestCase):
def test_update(self):
c = MapUpdateClause('s', {3: 0, 5: 6}, previous={5: 0, 3: 4})
@@ -298,7 +301,7 @@ class MapUpdateTests(TestCase):
self.assertNotIn(1, c._updates)
class CounterUpdateTests(TestCase):
class CounterUpdateTests(unittest.TestCase):
def test_positive_update(self):
c = CounterUpdateClause('a', 5, 3)
@@ -334,7 +337,7 @@ class CounterUpdateTests(TestCase):
self.assertEqual(ctx, {'5': 0})
class MapDeleteTests(TestCase):
class MapDeleteTests(unittest.TestCase):
def test_update(self):
c = MapDeleteClause('s', {3: 0}, {1: 2, 3: 4, 5: 6})
@@ -350,7 +353,7 @@ class MapDeleteTests(TestCase):
self.assertEqual(ctx, {'0': 1, '1': 5})
class FieldDeleteTests(TestCase):
class FieldDeleteTests(unittest.TestCase):
def test_str(self):
f = FieldDeleteClause("blake")

View File

@@ -11,12 +11,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
from unittest import TestCase
from cassandra.cqlengine.statements import AssignmentStatement, StatementException
class AssignmentStatementTest(TestCase):
class AssignmentStatementTest(unittest.TestCase):
def test_add_assignment_type_checking(self):
""" tests that only assignment clauses can be added to queries """

View File

@@ -11,12 +11,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
from unittest import TestCase
from cassandra.cqlengine.statements import BaseCQLStatement, StatementException
class BaseStatementTest(TestCase):
class BaseStatementTest(unittest.TestCase):
def test_where_clause_type_checking(self):
""" tests that only assignment clauses can be added to queries """

View File

@@ -11,13 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
from unittest import TestCase
from cassandra.cqlengine.statements import InsertStatement, StatementException, AssignmentClause
import six
class InsertStatementTests(TestCase):
class InsertStatementTests(unittest.TestCase):
def test_where_clause_failure(self):
""" tests that where clauses cannot be added to Insert statements """

View File

@@ -11,13 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
from unittest import TestCase
from cassandra.cqlengine.statements import SelectStatement, WhereClause
from cassandra.cqlengine.operators import *
import six
class SelectStatementTests(TestCase):
class SelectStatementTests(unittest.TestCase):
def test_single_field_is_listified(self):
""" tests that passing a string field into the constructor puts it into a list """

View File

@@ -11,9 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
from unittest import TestCase
from cassandra.cqlengine.columns import Set, List
from cassandra.cqlengine.operators import *
from cassandra.cqlengine.statements import (UpdateStatement, WhereClause,
AssignmentClause, SetUpdateClause,
@@ -21,7 +23,7 @@ from cassandra.cqlengine.statements import (UpdateStatement, WhereClause,
import six
class UpdateStatementTests(TestCase):
class UpdateStatementTests(unittest.TestCase):
def test_table_rendering(self):
""" tests that fields are properly added to the select statement """
@@ -60,7 +62,7 @@ class UpdateStatementTests(TestCase):
def test_update_set_add(self):
us = UpdateStatement('table')
us.add_assignment_clause(SetUpdateClause('a', {1}, operation='add'))
us.add_assignment_clause(SetUpdateClause('a', set((1,)), operation='add'))
self.assertEqual(six.text_type(us), 'UPDATE table SET "a" = "a" + %(0)s')
def test_update_empty_set_add_does_not_assign(self):

View File

@@ -11,14 +11,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
from unittest import TestCase
import six
from cassandra.cqlengine.operators import EqualsOperator
from cassandra.cqlengine.statements import StatementException, WhereClause
class TestWhereClause(TestCase):
class TestWhereClause(unittest.TestCase):
def test_operator_check(self):
""" tests that creating a where statement with a non BaseWhereOperator object fails """

View File

@@ -108,7 +108,7 @@ class BatchQueryTests(BaseCassEngTestCase):
for i in range(1):
for j in range(5):
TestMultiKeyModel.create(partition=i, cluster=j, count=i*j, text='{}:{}'.format(i,j))
TestMultiKeyModel.create(partition=i, cluster=j, count=i*j, text='{0}:{1}'.format(i,j))
with BatchQuery() as b:
TestMultiKeyModel.objects.batch(b).filter(partition=0).delete()

View File

@@ -11,9 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
import mock
from unittest import skipUnless
from uuid import uuid4
from cassandra.cqlengine import columns
@@ -71,7 +74,7 @@ class BaseIfNotExistsWithCounterTest(BaseCassEngTestCase):
class IfNotExistsInsertTests(BaseIfNotExistsTest):
@skipUnless(PROTOCOL_VERSION >= 2, "only runs against the cql3 protocol v2.0")
@unittest.skipUnless(PROTOCOL_VERSION >= 2, "only runs against the cql3 protocol v2.0")
def test_insert_if_not_exists_success(self):
""" tests that insertion with if_not_exists work as expected """
@@ -103,7 +106,7 @@ class IfNotExistsInsertTests(BaseIfNotExistsTest):
self.assertEqual(tm.count, 9)
self.assertEqual(tm.text, '111111111111')
@skipUnless(PROTOCOL_VERSION >= 2, "only runs against the cql3 protocol v2.0")
@unittest.skipUnless(PROTOCOL_VERSION >= 2, "only runs against the cql3 protocol v2.0")
def test_batch_insert_if_not_exists_success(self):
""" tests that batch insertion with if_not_exists work as expected """

View File

@@ -11,22 +11,26 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
import gc
import os
import resource
from unittest import skipUnless
from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
from cassandra.cqlengine.management import sync_table
class LoadTest(Model):
k = columns.Integer(primary_key=True)
v = columns.Integer()
@skipUnless("LOADTEST" in os.environ, "LOADTEST not on")
@unittest.skipUnless("LOADTEST" in os.environ, "LOADTEST not on")
def test_lots_of_queries():
sync_table(LoadTest)
import objgraph

View File

@@ -40,8 +40,9 @@ class BaseTimestampTest(BaseCassEngTestCase):
class BatchTest(BaseTimestampTest):
def test_batch_is_included(self):
with mock.patch.object(self.session, "execute") as m, BatchQuery(timestamp=timedelta(seconds=30)) as b:
TestTimestampModel.batch(b).create(count=1)
with mock.patch.object(self.session, "execute") as m:
with BatchQuery(timestamp=timedelta(seconds=30)) as b:
TestTimestampModel.batch(b).create(count=1)
"USING TIMESTAMP".should.be.within(m.call_args[0][0].query_string)
@@ -49,8 +50,9 @@ class BatchTest(BaseTimestampTest):
class CreateWithTimestampTest(BaseTimestampTest):
def test_batch(self):
with mock.patch.object(self.session, "execute") as m, BatchQuery() as b:
TestTimestampModel.timestamp(timedelta(seconds=10)).batch(b).create(count=1)
with mock.patch.object(self.session, "execute") as m:
with BatchQuery() as b:
TestTimestampModel.timestamp(timedelta(seconds=10)).batch(b).create(count=1)
query = m.call_args[0][0].query_string
@@ -96,8 +98,9 @@ class UpdateWithTimestampTest(BaseTimestampTest):
"USING TIMESTAMP".should.be.within(m.call_args[0][0].query_string)
def test_instance_update_in_batch(self):
with mock.patch.object(self.session, "execute") as m, BatchQuery() as b:
self.instance.batch(b).timestamp(timedelta(seconds=30)).update(count=2)
with mock.patch.object(self.session, "execute") as m:
with BatchQuery() as b:
self.instance.batch(b).timestamp(timedelta(seconds=30)).update(count=2)
query = m.call_args[0][0].query_string
"USING TIMESTAMP".should.be.within(query)

View File

@@ -11,10 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
import mock
import six
from unittest import skipUnless
from uuid import uuid4
from cassandra.cqlengine import columns
@@ -32,7 +35,7 @@ class TestTransactionModel(Model):
text = columns.Text(required=False)
@skipUnless(CASSANDRA_VERSION >= '2.0.0', "transactions only supported on cassandra 2.0 or higher")
@unittest.skipUnless(CASSANDRA_VERSION >= '2.0.0', "transactions only supported on cassandra 2.0 or higher")
class TestTransaction(BaseCassEngTestCase):
@classmethod

View File

@@ -17,11 +17,6 @@ commands = {envpython} setup.py build_ext --inplace
nosetests --verbosity=2 tests/unit/
nosetests --verbosity=2 tests/integration/cqlengine
[testenv:py26]
commands = {envpython} setup.py build_ext --inplace
nosetests --verbosity=2 tests/unit/
# no cqlengine support for 2.6 right now
[testenv:pypy]
deps = {[base]deps}
commands = {envpython} setup.py build_ext --inplace