247 lines
		
	
	
		
			9.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			247 lines
		
	
	
		
			9.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from cqlengine import operators
 | |
| from cqlengine.named import NamedKeyspace
 | |
| from cqlengine.operators import EqualsOperator, GreaterThanOrEqualOperator
 | |
| from cqlengine.query import ResultObject
 | |
| from cqlengine.tests.query.test_queryset import BaseQuerySetUsage
 | |
| from cqlengine.tests.base import BaseCassEngTestCase
 | |
| 
 | |
| 
 | |
| class TestQuerySetOperation(BaseCassEngTestCase):
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
|         super(TestQuerySetOperation, cls).setUpClass()
 | |
|         cls.keyspace = NamedKeyspace('cqlengine_test')
 | |
|         cls.table = cls.keyspace.table('test_model')
 | |
| 
 | |
|     def test_query_filter_parsing(self):
 | |
|         """
 | |
|         Tests the queryset filter method parses it's kwargs properly
 | |
|         """
 | |
|         query1 = self.table.objects(test_id=5)
 | |
|         assert len(query1._where) == 1
 | |
| 
 | |
|         op = query1._where[0]
 | |
|         assert isinstance(op.operator, operators.EqualsOperator)
 | |
|         assert op.value == 5
 | |
| 
 | |
|         query2 = query1.filter(expected_result__gte=1)
 | |
|         assert len(query2._where) == 2
 | |
| 
 | |
|         op = query2._where[1]
 | |
|         assert isinstance(op.operator, operators.GreaterThanOrEqualOperator)
 | |
|         assert op.value == 1
 | |
| 
 | |
|     def test_query_expression_parsing(self):
 | |
|         """ Tests that query experessions are evaluated properly """
 | |
|         query1 = self.table.filter(self.table.column('test_id') == 5)
 | |
|         assert len(query1._where) == 1
 | |
| 
 | |
|         op = query1._where[0]
 | |
|         assert isinstance(op.operator, operators.EqualsOperator)
 | |
|         assert op.value == 5
 | |
| 
 | |
|         query2 = query1.filter(self.table.column('expected_result') >= 1)
 | |
|         assert len(query2._where) == 2
 | |
| 
 | |
|         op = query2._where[1]
 | |
|         assert isinstance(op.operator, operators.GreaterThanOrEqualOperator)
 | |
|         assert op.value == 1
 | |
| 
 | |
|     def test_filter_method_where_clause_generation(self):
 | |
|         """
 | |
|         Tests the where clause creation
 | |
|         """
 | |
|         query1 = self.table.objects(test_id=5)
 | |
|         self.assertEqual(len(query1._where), 1)
 | |
|         where = query1._where[0]
 | |
|         self.assertEqual(where.field, 'test_id')
 | |
|         self.assertEqual(where.value, 5)
 | |
| 
 | |
|         query2 = query1.filter(expected_result__gte=1)
 | |
|         self.assertEqual(len(query2._where), 2)
 | |
| 
 | |
|         where = query2._where[0]
 | |
|         self.assertEqual(where.field, 'test_id')
 | |
|         self.assertIsInstance(where.operator, EqualsOperator)
 | |
|         self.assertEqual(where.value, 5)
 | |
| 
 | |
|         where = query2._where[1]
 | |
|         self.assertEqual(where.field, 'expected_result')
 | |
|         self.assertIsInstance(where.operator, GreaterThanOrEqualOperator)
 | |
|         self.assertEqual(where.value, 1)
 | |
| 
 | |
|     def test_query_expression_where_clause_generation(self):
 | |
|         """
 | |
|         Tests the where clause creation
 | |
|         """
 | |
|         query1 = self.table.objects(self.table.column('test_id') == 5)
 | |
|         self.assertEqual(len(query1._where), 1)
 | |
|         where = query1._where[0]
 | |
|         self.assertEqual(where.field, 'test_id')
 | |
|         self.assertEqual(where.value, 5)
 | |
| 
 | |
|         query2 = query1.filter(self.table.column('expected_result') >= 1)
 | |
|         self.assertEqual(len(query2._where), 2)
 | |
| 
 | |
|         where = query2._where[0]
 | |
|         self.assertEqual(where.field, 'test_id')
 | |
|         self.assertIsInstance(where.operator, EqualsOperator)
 | |
|         self.assertEqual(where.value, 5)
 | |
| 
 | |
|         where = query2._where[1]
 | |
|         self.assertEqual(where.field, 'expected_result')
 | |
|         self.assertIsInstance(where.operator, GreaterThanOrEqualOperator)
 | |
|         self.assertEqual(where.value, 1)
 | |
| 
 | |
| 
 | |
| class TestQuerySetCountSelectionAndIteration(BaseQuerySetUsage):
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
|         super(TestQuerySetCountSelectionAndIteration, cls).setUpClass()
 | |
| 
 | |
|         from cqlengine.tests.query.test_queryset import TestModel
 | |
| 
 | |
|         ks,tn = TestModel.column_family_name().split('.')
 | |
|         cls.keyspace = NamedKeyspace(ks)
 | |
|         cls.table = cls.keyspace.table(tn)
 | |
| 
 | |
|     def test_count(self):
 | |
|         """ Tests that adding filtering statements affects the count query as expected """
 | |
|         assert self.table.objects.count() == 12
 | |
| 
 | |
|         q = self.table.objects(test_id=0)
 | |
|         assert q.count() == 4
 | |
| 
 | |
|     def test_query_expression_count(self):
 | |
|         """ Tests that adding query statements affects the count query as expected """
 | |
|         assert self.table.objects.count() == 12
 | |
| 
 | |
|         q = self.table.objects(self.table.column('test_id') == 0)
 | |
|         assert q.count() == 4
 | |
| 
 | |
|     def test_iteration(self):
 | |
|         """ Tests that iterating over a query set pulls back all of the expected results """
 | |
|         q = self.table.objects(test_id=0)
 | |
|         #tuple of expected attempt_id, expected_result values
 | |
|         compare_set = set([(0,5), (1,10), (2,15), (3,20)])
 | |
|         for t in q:
 | |
|             val = t.attempt_id, t.expected_result
 | |
|             assert val in compare_set
 | |
|             compare_set.remove(val)
 | |
|         assert len(compare_set) == 0
 | |
| 
 | |
|         # test with regular filtering
 | |
|         q = self.table.objects(attempt_id=3).allow_filtering()
 | |
|         assert len(q) == 3
 | |
|         #tuple of expected test_id, expected_result values
 | |
|         compare_set = set([(0,20), (1,20), (2,75)])
 | |
|         for t in q:
 | |
|             val = t.test_id, t.expected_result
 | |
|             assert val in compare_set
 | |
|             compare_set.remove(val)
 | |
|         assert len(compare_set) == 0
 | |
| 
 | |
|         # test with query method
 | |
|         q = self.table.objects(self.table.column('attempt_id') == 3).allow_filtering()
 | |
|         assert len(q) == 3
 | |
|         #tuple of expected test_id, expected_result values
 | |
|         compare_set = set([(0,20), (1,20), (2,75)])
 | |
|         for t in q:
 | |
|             val = t.test_id, t.expected_result
 | |
|             assert val in compare_set
 | |
|             compare_set.remove(val)
 | |
|         assert len(compare_set) == 0
 | |
| 
 | |
|     def test_multiple_iterations_work_properly(self):
 | |
|         """ Tests that iterating over a query set more than once works """
 | |
|         # test with both the filtering method and the query method
 | |
|         for q in (self.table.objects(test_id=0), self.table.objects(self.table.column('test_id') == 0)):
 | |
|             #tuple of expected attempt_id, expected_result values
 | |
|             compare_set = set([(0,5), (1,10), (2,15), (3,20)])
 | |
|             for t in q:
 | |
|                 val = t.attempt_id, t.expected_result
 | |
|                 assert val in compare_set
 | |
|                 compare_set.remove(val)
 | |
|             assert len(compare_set) == 0
 | |
| 
 | |
|             #try it again
 | |
|             compare_set = set([(0,5), (1,10), (2,15), (3,20)])
 | |
|             for t in q:
 | |
|                 val = t.attempt_id, t.expected_result
 | |
|                 assert val in compare_set
 | |
|                 compare_set.remove(val)
 | |
|             assert len(compare_set) == 0
 | |
| 
 | |
|     def test_multiple_iterators_are_isolated(self):
 | |
|         """
 | |
|         tests that the use of one iterator does not affect the behavior of another
 | |
|         """
 | |
|         for q in (self.table.objects(test_id=0), self.table.objects(self.table.column('test_id') == 0)):
 | |
|             q = q.order_by('attempt_id')
 | |
|             expected_order = [0,1,2,3]
 | |
|             iter1 = iter(q)
 | |
|             iter2 = iter(q)
 | |
|             for attempt_id in expected_order:
 | |
|                 assert iter1.next().attempt_id == attempt_id
 | |
|                 assert iter2.next().attempt_id == attempt_id
 | |
| 
 | |
|     def test_get_success_case(self):
 | |
|         """
 | |
|         Tests that the .get() method works on new and existing querysets
 | |
|         """
 | |
|         m = self.table.objects.get(test_id=0, attempt_id=0)
 | |
|         assert isinstance(m, ResultObject)
 | |
|         assert m.test_id == 0
 | |
|         assert m.attempt_id == 0
 | |
| 
 | |
|         q = self.table.objects(test_id=0, attempt_id=0)
 | |
|         m = q.get()
 | |
|         assert isinstance(m, ResultObject)
 | |
|         assert m.test_id == 0
 | |
|         assert m.attempt_id == 0
 | |
| 
 | |
|         q = self.table.objects(test_id=0)
 | |
|         m = q.get(attempt_id=0)
 | |
|         assert isinstance(m, ResultObject)
 | |
|         assert m.test_id == 0
 | |
|         assert m.attempt_id == 0
 | |
| 
 | |
|     def test_query_expression_get_success_case(self):
 | |
|         """
 | |
|         Tests that the .get() method works on new and existing querysets
 | |
|         """
 | |
|         m = self.table.get(self.table.column('test_id') == 0, self.table.column('attempt_id') == 0)
 | |
|         assert isinstance(m, ResultObject)
 | |
|         assert m.test_id == 0
 | |
|         assert m.attempt_id == 0
 | |
| 
 | |
|         q = self.table.objects(self.table.column('test_id') == 0, self.table.column('attempt_id') == 0)
 | |
|         m = q.get()
 | |
|         assert isinstance(m, ResultObject)
 | |
|         assert m.test_id == 0
 | |
|         assert m.attempt_id == 0
 | |
| 
 | |
|         q = self.table.objects(self.table.column('test_id') == 0)
 | |
|         m = q.get(self.table.column('attempt_id') == 0)
 | |
|         assert isinstance(m, ResultObject)
 | |
|         assert m.test_id == 0
 | |
|         assert m.attempt_id == 0
 | |
| 
 | |
|     def test_get_doesnotexist_exception(self):
 | |
|         """
 | |
|         Tests that get calls that don't return a result raises a DoesNotExist error
 | |
|         """
 | |
|         with self.assertRaises(self.table.DoesNotExist):
 | |
|             self.table.objects.get(test_id=100)
 | |
| 
 | |
|     def test_get_multipleobjects_exception(self):
 | |
|         """
 | |
|         Tests that get calls that return multiple results raise a MultipleObjectsReturned error
 | |
|         """
 | |
|         with self.assertRaises(self.table.MultipleObjectsReturned):
 | |
|             self.table.objects.get(test_id=1)
 | |
| 
 | |
| 
 | 
