more formatting

This commit is contained in:
Blake Eggleston
2014-03-11 13:27:12 -07:00
parent 1c6660199b
commit cc93d45f3c
2 changed files with 43 additions and 42 deletions

View File

@@ -483,7 +483,6 @@ class BaseCQLStatement(object):
return long(((tmp - datetime.fromtimestamp(0)).total_seconds()) * 1000000)
def __unicode__(self):
raise NotImplementedError

View File

@@ -22,6 +22,7 @@ class TzOffset(tzinfo):
"""Minimal implementation of a timezone offset to help testing with timezone
aware datetimes.
"""
def __init__(self, offset):
self._offset = timedelta(hours=offset)
@@ -34,6 +35,7 @@ class TzOffset(tzinfo):
def dst(self, dt):
return timedelta(0)
class TestModel(Model):
test_id = columns.Integer(primary_key=True)
attempt_id = columns.Integer(primary_key=True)
@@ -41,6 +43,7 @@ class TestModel(Model):
expected_result = columns.Integer()
test_result = columns.Integer()
class IndexedTestModel(Model):
test_id = columns.Integer(primary_key=True)
attempt_id = columns.Integer(index=True)
@@ -48,6 +51,7 @@ class IndexedTestModel(Model):
expected_result = columns.Integer()
test_result = columns.Integer(index=True)
class TestMultiClusteringModel(Model):
one = columns.Integer(primary_key=True)
two = columns.Integer(primary_key=True)
@@ -55,7 +59,6 @@ class TestMultiClusteringModel(Model):
class TestQuerySetOperation(BaseCassEngTestCase):
def test_query_filter_parsing(self):
"""
Tests the queryset filter method parses it's kwargs properly
@@ -164,8 +167,8 @@ class TestQuerySetOperation(BaseCassEngTestCase):
Tests that setting only or defer fields that don't exist raises an exception
"""
class BaseQuerySetUsage(BaseCassEngTestCase):
class BaseQuerySetUsage(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
super(BaseQuerySetUsage, cls).setUpClass()
@@ -201,9 +204,12 @@ class BaseQuerySetUsage(BaseCassEngTestCase):
IndexedTestModel.objects.create(test_id=7, attempt_id=3, description='try8', expected_result=20, test_result=20)
IndexedTestModel.objects.create(test_id=8, attempt_id=0, description='try9', expected_result=50, test_result=40)
IndexedTestModel.objects.create(test_id=9, attempt_id=1, description='try10', expected_result=60, test_result=40)
IndexedTestModel.objects.create(test_id=10, attempt_id=2, description='try11', expected_result=70, test_result=45)
IndexedTestModel.objects.create(test_id=11, attempt_id=3, description='try12', expected_result=75, test_result=45)
IndexedTestModel.objects.create(test_id=9, attempt_id=1, description='try10', expected_result=60,
test_result=40)
IndexedTestModel.objects.create(test_id=10, attempt_id=2, description='try11', expected_result=70,
test_result=45)
IndexedTestModel.objects.create(test_id=11, attempt_id=3, description='try12', expected_result=75,
test_result=45)
@classmethod
def tearDownClass(cls):
@@ -212,8 +218,8 @@ class BaseQuerySetUsage(BaseCassEngTestCase):
drop_table(IndexedTestModel)
drop_table(TestMultiClusteringModel)
class TestQuerySetCountSelectionAndIteration(BaseQuerySetUsage):
class TestQuerySetCountSelectionAndIteration(BaseQuerySetUsage):
def test_count(self):
""" Tests that adding filtering statements affects the count query as expected """
assert TestModel.objects.count() == 12
@@ -232,7 +238,7 @@ class TestQuerySetCountSelectionAndIteration(BaseQuerySetUsage):
""" Tests that iterating over a query set pulls back all of the expected results """
q = TestModel.objects(test_id=0)
#tuple of expected attempt_id, expected_result values
compare_set = set([(0,5), (1,10), (2,15), (3,20)])
compare_set = set([(0, 5), (1, 10), (2, 15), (3, 20)])
for t in q:
val = t.attempt_id, t.expected_result
assert val in compare_set
@@ -243,7 +249,7 @@ class TestQuerySetCountSelectionAndIteration(BaseQuerySetUsage):
q = TestModel.objects(attempt_id=3).allow_filtering()
assert len(q) == 3
#tuple of expected test_id, expected_result values
compare_set = set([(0,20), (1,20), (2,75)])
compare_set = set([(0, 20), (1, 20), (2, 75)])
for t in q:
val = t.test_id, t.expected_result
assert val in compare_set
@@ -254,7 +260,7 @@ class TestQuerySetCountSelectionAndIteration(BaseQuerySetUsage):
q = TestModel.objects(TestModel.attempt_id == 3).allow_filtering()
assert len(q) == 3
#tuple of expected test_id, expected_result values
compare_set = set([(0,20), (1,20), (2,75)])
compare_set = set([(0, 20), (1, 20), (2, 75)])
for t in q:
val = t.test_id, t.expected_result
assert val in compare_set
@@ -266,7 +272,7 @@ class TestQuerySetCountSelectionAndIteration(BaseQuerySetUsage):
# test with both the filtering method and the query method
for q in (TestModel.objects(test_id=0), TestModel.objects(TestModel.test_id == 0)):
#tuple of expected attempt_id, expected_result values
compare_set = set([(0,5), (1,10), (2,15), (3,20)])
compare_set = set([(0, 5), (1, 10), (2, 15), (3, 20)])
for t in q:
val = t.attempt_id, t.expected_result
assert val in compare_set
@@ -274,7 +280,7 @@ class TestQuerySetCountSelectionAndIteration(BaseQuerySetUsage):
assert len(compare_set) == 0
#try it again
compare_set = set([(0,5), (1,10), (2,15), (3,20)])
compare_set = set([(0, 5), (1, 10), (2, 15), (3, 20)])
for t in q:
val = t.attempt_id, t.expected_result
assert val in compare_set
@@ -287,7 +293,7 @@ class TestQuerySetCountSelectionAndIteration(BaseQuerySetUsage):
"""
for q in (TestModel.objects(test_id=0), TestModel.objects(TestModel.test_id == 0)):
q = q.order_by('attempt_id')
expected_order = [0,1,2,3]
expected_order = [0, 1, 2, 3]
iter1 = iter(q)
iter2 = iter(q)
for attempt_id in expected_order:
@@ -354,18 +360,18 @@ class TestQuerySetCountSelectionAndIteration(BaseQuerySetUsage):
"""
"""
class TestQuerySetOrdering(BaseQuerySetUsage):
def test_order_by_success_case(self):
q = TestModel.objects(test_id=0).order_by('attempt_id')
expected_order = [0,1,2,3]
for model, expect in zip(q,expected_order):
expected_order = [0, 1, 2, 3]
for model, expect in zip(q, expected_order):
assert model.attempt_id == expect
q = q.order_by('-attempt_id')
expected_order.reverse()
for model, expect in zip(q,expected_order):
for model, expect in zip(q, expected_order):
assert model.attempt_id == expect
def test_ordering_by_non_second_primary_keys_fail(self):
@@ -403,7 +409,6 @@ class TestQuerySetOrdering(BaseQuerySetUsage):
class TestQuerySetSlicing(BaseQuerySetUsage):
def test_out_of_range_index_raises_error(self):
q = TestModel.objects(test_id=0).order_by('attempt_id')
with self.assertRaises(IndexError):
@@ -411,32 +416,32 @@ class TestQuerySetSlicing(BaseQuerySetUsage):
def test_array_indexing_works_properly(self):
q = TestModel.objects(test_id=0).order_by('attempt_id')
expected_order = [0,1,2,3]
expected_order = [0, 1, 2, 3]
for i in range(len(q)):
assert q[i].attempt_id == expected_order[i]
def test_negative_indexing_works_properly(self):
q = TestModel.objects(test_id=0).order_by('attempt_id')
expected_order = [0,1,2,3]
expected_order = [0, 1, 2, 3]
assert q[-1].attempt_id == expected_order[-1]
assert q[-2].attempt_id == expected_order[-2]
def test_slicing_works_properly(self):
q = TestModel.objects(test_id=0).order_by('attempt_id')
expected_order = [0,1,2,3]
expected_order = [0, 1, 2, 3]
for model, expect in zip(q[1:3], expected_order[1:3]):
assert model.attempt_id == expect
def test_negative_slicing(self):
q = TestModel.objects(test_id=0).order_by('attempt_id')
expected_order = [0,1,2,3]
expected_order = [0, 1, 2, 3]
for model, expect in zip(q[-3:], expected_order[-3:]):
assert model.attempt_id == expect
for model, expect in zip(q[:-1], expected_order[:-1]):
assert model.attempt_id == expect
class TestQuerySetValidation(BaseQuerySetUsage):
class TestQuerySetValidation(BaseQuerySetUsage):
def test_primary_key_or_index_must_be_specified(self):
"""
Tests that queries that don't have an equals relation to a primary key or indexed field fail
@@ -453,7 +458,6 @@ class TestQuerySetValidation(BaseQuerySetUsage):
q = TestModel.objects(test_id__gt=0)
list([i for i in q])
def test_indexed_field_can_be_queried(self):
"""
Tests that queries on an indexed field will work without any primary key relations specified
@@ -461,8 +465,8 @@ class TestQuerySetValidation(BaseQuerySetUsage):
q = IndexedTestModel.objects(test_result=25)
assert q.count() == 4
class TestQuerySetDelete(BaseQuerySetUsage):
class TestQuerySetDelete(BaseQuerySetUsage):
def test_delete(self):
TestModel.objects.create(test_id=3, attempt_id=0, description='try9', expected_result=50, test_result=40)
TestModel.objects.create(test_id=3, attempt_id=1, description='try10', expected_result=60, test_result=40)
@@ -487,15 +491,15 @@ class TestQuerySetDelete(BaseQuerySetUsage):
with self.assertRaises(query.QueryException):
TestModel.objects(attempt_id=0).delete()
class TestQuerySetConnectionHandling(BaseQuerySetUsage):
class TestQuerySetConnectionHandling(BaseQuerySetUsage):
def test_conn_is_returned_after_filling_cache(self):
"""
Tests that the queryset returns it's connection after it's fetched all of it's results
"""
q = TestModel.objects(test_id=0)
#tuple of expected attempt_id, expected_result values
compare_set = set([(0,5), (1,10), (2,15), (3,20)])
compare_set = set([(0, 5), (1, 10), (2, 15), (3, 20)])
for t in q:
val = t.attempt_id, t.expected_result
assert val in compare_set
@@ -506,13 +510,12 @@ class TestQuerySetConnectionHandling(BaseQuerySetUsage):
class TimeUUIDQueryModel(Model):
partition = columns.UUID(primary_key=True)
time = columns.TimeUUID(primary_key=True)
data = columns.Text(required=False)
partition = columns.UUID(primary_key=True)
time = columns.TimeUUID(primary_key=True)
data = columns.Text(required=False)
class TestMinMaxTimeUUIDFunctions(BaseCassEngTestCase):
@classmethod
def setUpClass(cls):
super(TestMinMaxTimeUUIDFunctions, cls).setUpClass()
@@ -585,14 +588,14 @@ class TestMinMaxTimeUUIDFunctions(BaseCassEngTestCase):
q = [d for d in q]
assert len(q) == 2
datas = [d.data for d in q]
assert '1' in datas
assert '2' in datas
assert '1' in datas
assert '2' in datas
q = TimeUUIDQueryModel.filter(partition=pk, time__gte=functions.MinTimeUUID(midpoint))
assert len(q) == 2
datas = [d.data for d in q]
assert '3' in datas
assert '4' in datas
assert '3' in datas
assert '4' in datas
# test query expression filtering
q = TimeUUIDQueryModel.filter(
@@ -602,8 +605,8 @@ class TestMinMaxTimeUUIDFunctions(BaseCassEngTestCase):
q = [d for d in q]
assert len(q) == 2
datas = [d.data for d in q]
assert '1' in datas
assert '2' in datas
assert '1' in datas
assert '2' in datas
q = TimeUUIDQueryModel.filter(
TimeUUIDQueryModel.partition == pk,
@@ -611,15 +614,14 @@ class TestMinMaxTimeUUIDFunctions(BaseCassEngTestCase):
)
assert len(q) == 2
datas = [d.data for d in q]
assert '3' in datas
assert '4' in datas
assert '3' in datas
assert '4' in datas
class TestInOperator(BaseQuerySetUsage):
def test_kwarg_success_case(self):
""" Tests the in operator works with the kwarg query method """
q = TestModel.filter(test_id__in=[0,1])
q = TestModel.filter(test_id__in=[0, 1])
assert q.count() == 8
def test_query_expression_success_case(self):
@@ -637,8 +639,8 @@ class TestValuesList(BaseQuerySetUsage):
item = q.values_list('expected_result', flat=True).first()
assert item == 10
class TestObjectsProperty(BaseQuerySetUsage):
class TestObjectsProperty(BaseQuerySetUsage):
def test_objects_property_returns_fresh_queryset(self):
assert TestModel.objects._result_cache is None
len(TestModel.objects) # evaluate queryset