From 9c92a5066726e4eccbe4f906b4de022c4c03812c Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Wed, 23 Apr 2014 20:39:10 -0500 Subject: [PATCH 1/4] Adding a few missing code coverage tests --- tests/unit/test_connection.py | 16 +++++++ tests/unit/test_marshalling.py | 6 +++ tests/unit/test_metadata.py | 3 ++ tests/unit/test_policies.py | 7 +++ tests/unit/test_types.py | 78 +++++++++++++++++++++++++++++++++- 5 files changed, 108 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py index 12f1fb39..4671fef5 100644 --- a/tests/unit/test_connection.py +++ b/tests/unit/test_connection.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from cassandra.cluster import Cluster try: import unittest2 as unittest @@ -152,3 +153,18 @@ class ConnectionTest(unittest.TestCase): self.assertRaises(NotImplementedError, c.close) self.assertRaises(NotImplementedError, c.register_watcher, None, None) self.assertRaises(NotImplementedError, c.register_watchers, None) + + def test_set_keyspace_blocking(self): + c = self.make_connection() + + self.assertEqual(c.keyspace, None) + c.set_keyspace_blocking(None) + self.assertEqual(c.keyspace, None) + + c.keyspace = 'ks' + c.set_keyspace_blocking('ks') + self.assertEqual(c.keyspace, 'ks') + + def test_set_connection_class(self): + cluster = Cluster(connection_class='test') + self.assertEqual('test', cluster.connection_class) diff --git a/tests/unit/test_marshalling.py b/tests/unit/test_marshalling.py index c30d1367..6f96ceab 100644 --- a/tests/unit/test_marshalling.py +++ b/tests/unit/test_marshalling.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from cassandra.marshal import bitlength try: import unittest2 as unittest @@ -131,3 +132,8 @@ class TestUnmarshal(unittest.TestCase): self.assertEqual(type(whatwegot), type(serializedval), msg='Marshaller for %s (%s) gave wrong type (%s instead of %s)' % (valtype, marshaller, type(whatwegot), type(serializedval))) + + def test_bitlength(self): + self.assertEqual(bitlength(9), 4) + self.assertEqual(bitlength(-10), 0) + self.assertEqual(bitlength(0), 0) diff --git a/tests/unit/test_metadata.py b/tests/unit/test_metadata.py index 1005e82a..62a2e57d 100644 --- a/tests/unit/test_metadata.py +++ b/tests/unit/test_metadata.py @@ -147,6 +147,9 @@ class TestStrategies(unittest.TestCase): self.assertItemsEqual(rf3_replicas[MD5Token(200)], [host3, host1, host2]) + def test_ss_equals(self): + self.assertNotEqual(SimpleStrategy(1), NetworkTopologyStrategy(2)) + class TestNameEscaping(unittest.TestCase): def test_protect_name(self): diff --git a/tests/unit/test_policies.py b/tests/unit/test_policies.py index f375e1d5..577ebb6b 100644 --- a/tests/unit/test_policies.py +++ b/tests/unit/test_policies.py @@ -55,6 +55,13 @@ class TestLoadBalancingPolicy(unittest.TestCase): self.assertRaises(NotImplementedError, policy.on_add, host) self.assertRaises(NotImplementedError, policy.on_remove, host) + def test_instance_check(self): + try: + Cluster(load_balancing_policy=RoundRobinPolicy) + self.fail() + except TypeError: + pass + class TestRoundRobinPolicy(unittest.TestCase): diff --git a/tests/unit/test_types.py b/tests/unit/test_types.py index b4586a93..d4dc6b74 100644 --- a/tests/unit/test_types.py +++ b/tests/unit/test_types.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import tempfile try: import unittest2 as unittest @@ -22,8 +23,9 @@ import datetime import cassandra from cassandra.cqltypes import (BooleanType, lookup_casstype_simple, lookup_casstype, LongType, DecimalType, SetType, cql_typename, - CassandraType, UTF8Type, parse_casstype_args) -from cassandra.decoder import named_tuple_factory + CassandraType, UTF8Type, parse_casstype_args, + EmptyValue, _CassandraType, DateType) +from cassandra.decoder import named_tuple_factory, write_string, read_longstring, write_stringmap, read_stringmap, read_inet, write_inet, cql_quote class TypeTests(unittest.TestCase): @@ -170,3 +172,75 @@ class TypeTests(unittest.TestCase): self.assertEquals(UTF8Type, ctype.subtypes[2]) self.assertEquals(['city', None, 'zip'], ctype.names) + + def test_empty_value(self): + self.assertEqual(str(EmptyValue()), 'EMPTY') + + def test_CassandraType(self): + cassandra_type = _CassandraType('randomvaluetocheck') + self.assertEqual(cassandra_type.val, 'randomvaluetocheck') + self.assertEqual(cassandra_type.validate('randomvaluetocheck2'), 'randomvaluetocheck2') + self.assertEqual(cassandra_type.val, 'randomvaluetocheck') + + def test_DateType(self): + now = datetime.datetime.now() + date_type = DateType(now) + self.assertEqual(date_type.my_timestamp(), now) + try: + date_type.interpret_datestring('fakestring') + self.fail() + except ValueError: + pass + + def test_write_read_string(self): + # BUG? + # Traceback (most recent call last): + # File "/Users/joaquin/repos/python-driver/tests/unit/test_types.py", line 199, in test_write_read_string + # self.assertEqual(read_longstring(f), u'test') + # AssertionError: u'st' != u'test' + # - st + # + test + # ? ++ + with tempfile.TemporaryFile() as f: + value = u'test' + write_string(f, value) + f.seek(0) + self.assertEqual(read_longstring(f), value) + + def test_write_read_stringmap(self): + with tempfile.TemporaryFile() as f: + value = {'key': 'value'} + write_stringmap(f, value) + f.seek(0) + self.assertEqual(read_stringmap(f), value) + + def test_write_read_inet(self): + # BUG? I didn't think it mattered, but just wanted to confirm + # Traceback (most recent call last): + # File "/Users/joaquin/repos/python-driver/tests/unit/test_types.py", line 228, in test_write_read_inet + # self.assertEqual(read_inet(f), value) + # AssertionError: Tuples differ: ('2001:db8:0:f101::1', 9042) != ('2001:0db8:0:f101::1', 9042) + # + # First differing element 0: + # 2001:db8:0:f101::1 + # 2001:0db8:0:f101::1 + # + # - ('2001:db8:0:f101::1', 9042) + # + ('2001:0db8:0:f101::1', 9042) + # ? + + with tempfile.TemporaryFile() as f: + value = ('192.168.1.1', 9042) + write_inet(f, value) + f.seek(0) + self.assertEqual(read_inet(f), value) + + with tempfile.TemporaryFile() as f: + value = ('2001:0db8:0:f101::1', 9042) + write_inet(f, value) + f.seek(0) + self.assertEqual(read_inet(f), value) + + def test_cql_quote(self): + self.assertEqual(cql_quote(u'test'), "'test'") + self.assertEqual(cql_quote('test'), "'test'") + self.assertEqual(cql_quote(0), '0') From 7db2f3b4129cd3e6642ede0027ff2ba65696055c Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Thu, 24 Apr 2014 16:21:55 -0500 Subject: [PATCH 2/4] test case fixes --- tests/unit/test_policies.py | 6 +----- tests/unit/test_types.py | 38 ++++++++++--------------------------- 2 files changed, 11 insertions(+), 33 deletions(-) diff --git a/tests/unit/test_policies.py b/tests/unit/test_policies.py index 577ebb6b..a3aa27b5 100644 --- a/tests/unit/test_policies.py +++ b/tests/unit/test_policies.py @@ -56,11 +56,7 @@ class TestLoadBalancingPolicy(unittest.TestCase): self.assertRaises(NotImplementedError, policy.on_remove, host) def test_instance_check(self): - try: - Cluster(load_balancing_policy=RoundRobinPolicy) - self.fail() - except TypeError: - pass + self.assertRaises(TypeError, Cluster, load_balancing_policy=RoundRobinPolicy) class TestRoundRobinPolicy(unittest.TestCase): diff --git a/tests/unit/test_types.py b/tests/unit/test_types.py index d4dc6b74..323553b5 100644 --- a/tests/unit/test_types.py +++ b/tests/unit/test_types.py @@ -25,7 +25,7 @@ from cassandra.cqltypes import (BooleanType, lookup_casstype_simple, lookup_cass LongType, DecimalType, SetType, cql_typename, CassandraType, UTF8Type, parse_casstype_args, EmptyValue, _CassandraType, DateType) -from cassandra.decoder import named_tuple_factory, write_string, read_longstring, write_stringmap, read_stringmap, read_inet, write_inet, cql_quote +from cassandra.decoder import named_tuple_factory, write_string, read_longstring, write_stringmap, read_stringmap, read_inet, write_inet, cql_quote, read_string, write_longstring class TypeTests(unittest.TestCase): @@ -186,25 +186,20 @@ class TypeTests(unittest.TestCase): now = datetime.datetime.now() date_type = DateType(now) self.assertEqual(date_type.my_timestamp(), now) - try: - date_type.interpret_datestring('fakestring') - self.fail() - except ValueError: - pass + self.assertRaises(ValueError, date_type.interpret_datestring, 'fakestring') def test_write_read_string(self): - # BUG? - # Traceback (most recent call last): - # File "/Users/joaquin/repos/python-driver/tests/unit/test_types.py", line 199, in test_write_read_string - # self.assertEqual(read_longstring(f), u'test') - # AssertionError: u'st' != u'test' - # - st - # + test - # ? ++ with tempfile.TemporaryFile() as f: value = u'test' write_string(f, value) f.seek(0) + self.assertEqual(read_string(f), value) + + def test_write_read_longstring(self): + with tempfile.TemporaryFile() as f: + value = u'test' + write_longstring(f, value) + f.seek(0) self.assertEqual(read_longstring(f), value) def test_write_read_stringmap(self): @@ -215,19 +210,6 @@ class TypeTests(unittest.TestCase): self.assertEqual(read_stringmap(f), value) def test_write_read_inet(self): - # BUG? I didn't think it mattered, but just wanted to confirm - # Traceback (most recent call last): - # File "/Users/joaquin/repos/python-driver/tests/unit/test_types.py", line 228, in test_write_read_inet - # self.assertEqual(read_inet(f), value) - # AssertionError: Tuples differ: ('2001:db8:0:f101::1', 9042) != ('2001:0db8:0:f101::1', 9042) - # - # First differing element 0: - # 2001:db8:0:f101::1 - # 2001:0db8:0:f101::1 - # - # - ('2001:db8:0:f101::1', 9042) - # + ('2001:0db8:0:f101::1', 9042) - # ? + with tempfile.TemporaryFile() as f: value = ('192.168.1.1', 9042) write_inet(f, value) @@ -235,7 +217,7 @@ class TypeTests(unittest.TestCase): self.assertEqual(read_inet(f), value) with tempfile.TemporaryFile() as f: - value = ('2001:0db8:0:f101::1', 9042) + value = ('2001:db8:0:f101::1', 9042) write_inet(f, value) f.seek(0) self.assertEqual(read_inet(f), value) From 47dcfbe9e35482dc948477b4c63fa61507fd1653 Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Thu, 24 Apr 2014 18:57:04 -0500 Subject: [PATCH 3/4] Added cassandra.util tests --- tests/unit/test_utils.py | 424 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 424 insertions(+) create mode 100644 tests/unit/test_utils.py diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py new file mode 100644 index 00000000..a52a6737 --- /dev/null +++ b/tests/unit/test_utils.py @@ -0,0 +1,424 @@ +# Copyright 2013-2014 DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import unittest +from cassandra.util import OrderedDict, WeakSet + + +class OrderedDictTests(unittest.TestCase): + + def test_init(self): + traditional_dict = {1: 2, 3: 4} + + ordered_dict = OrderedDict() + ordered_dict = OrderedDict(traditional_dict) + self.assertEqual(traditional_dict, ordered_dict) + + self.assertRaises(TypeError, OrderedDict, traditional_dict, 'extra argument') + + def test_(self): + traditional_dict = {1: 2, 3: 4} + ordered_dict = OrderedDict(traditional_dict) + + del traditional_dict[1] + del ordered_dict[1] + self.assertEqual(traditional_dict, ordered_dict) + + def test_(self): + traditional_dict = {1: 2, 3: 4} + ordered_dict = OrderedDict(traditional_dict) + + traditional_dict[1] = 5 + ordered_dict[1] = 5 + self.assertEqual(traditional_dict, ordered_dict) + + def test_(self): + traditional_dict = {1: 2, 3: 4} + ordered_dict = OrderedDict(traditional_dict) + + for value in ordered_dict: + self.assertIn(value, set([1, 3])) + + def test_reversed(self): + traditional_dict = {1: 2, 3: 4} + ordered_dict = OrderedDict(traditional_dict) + + current_order_dict = iter(ordered_dict) + self.assertEqual(current_order_dict.next(), 1) + self.assertEqual(current_order_dict.next(), 3) + reversed_dict = reversed(ordered_dict) + self.assertEqual(reversed_dict.next(), 3) + self.assertEqual(reversed_dict.next(), 1) + + del traditional_dict[1] + del ordered_dict[1] + traditional_dict[1] = 5 + ordered_dict[1] = 5 + + current_order_dict = iter(ordered_dict) + self.assertEqual(current_order_dict.next(), 3) + self.assertEqual(current_order_dict.next(), 1) + reversed_dict = reversed(ordered_dict) + self.assertEqual(reversed_dict.next(), 1) + self.assertEqual(reversed_dict.next(), 3) + + def test_pop(self): + traditional_dict = {1: 2, 3: 4} + ordered_dict = OrderedDict(traditional_dict) + + self.assertEqual(ordered_dict.popitem(), (3, 4)) + self.assertEqual(ordered_dict.popitem(), (1, 2)) + self.assertRaises(KeyError, ordered_dict.popitem) + + + def test_pop_first(self): + traditional_dict = {1: 2, 3: 4} + ordered_dict = OrderedDict(traditional_dict) + + self.assertEqual(ordered_dict.popitem(last=False), (1, 2)) + self.assertEqual(ordered_dict.popitem(last=False), (3, 4)) + self.assertRaises(KeyError, ordered_dict.popitem) + + def test_reduce(self): + traditional_dict = {1: 2, 3: 4} + ordered_dict = OrderedDict(traditional_dict) + + #TODO: Not sure how this is done + + def test_keys(self): + traditional_dict = {1: 2, 3: 4} + ordered_dict = OrderedDict(traditional_dict) + + self.assertEqual(traditional_dict.keys(), ordered_dict.keys()) + + def test_repr(self): + traditional_dict = {1: 2, 3: 4} + ordered_dict = OrderedDict(traditional_dict) + + self.assertEqual(repr(ordered_dict), 'OrderedDict([(1, 2), (3, 4)])') + + ordered_dict = OrderedDict() + self.assertEqual(repr(ordered_dict), 'OrderedDict()') + + def test_copy(self): + traditional_dict = {1: 2, 3: 4} + ordered_dict = OrderedDict(traditional_dict) + + copy_dict = ordered_dict.copy() + self.assertEqual(ordered_dict, copy_dict) + + del copy_dict[1] + self.assertNotEqual(ordered_dict, copy_dict) + + copy_dict[1] = 5 + self.assertNotEqual(ordered_dict, copy_dict) + + def test_fromkeys(self): + traditional_dict = {1: 2, 3: 4} + ordered_dict = OrderedDict(traditional_dict) + + self.assertEqual(ordered_dict.fromkeys([1, 2]), OrderedDict([(1, None), (2, None)])) + + self.assertEqual(ordered_dict.fromkeys([1, 2], True), OrderedDict([(1, True), (2, True)])) + + +class TestWeakSet(unittest.TestCase): + class Foo(object): pass + + def test_init(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set = WeakSet([f1, f2]) + + def test_add(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set = WeakSet([f1, f2]) + + f3 = self.Foo() + weak_set.add(f3) + + iterator = iter(weak_set) + self.assertIn(iterator.next(), set([f1, f2, f3])) + self.assertIn(iterator.next(), set([f1, f2, f3])) + self.assertIn(iterator.next(), set([f1, f2, f3])) + self.assertEqual(len(weak_set), 3) + + def test_del(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set = WeakSet([f1, f2]) + + f3 = self.Foo() + weak_set.add(f3) + + del f2 + self.assertIn(f1, weak_set) + self.assertIn(f3, weak_set) + self.assertEqual(len(weak_set), 2) + + def test_clear(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set = WeakSet([f1, f2]) + + weak_set.clear() + self.assertEqual(weak_set, WeakSet()) + + def test_copy(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set = WeakSet([f1, f2]) + + copy_set = weak_set.copy() + self.assertEqual(weak_set, copy_set) + + del f2 + self.assertEqual(weak_set, copy_set) + + copy_set.discard(f1) + self.assertNotEqual(weak_set, copy_set) + + def test_pop(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set = WeakSet([f1, f2]) + + self.assertIn(weak_set.pop(), set([f1, f2])) + self.assertIn(weak_set.pop(), set([f1, f2])) + + self.assertRaises(KeyError, weak_set.pop) + + def test_remove(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set = WeakSet([f1, f2]) + + weak_set.remove(f2) + self.assertEqual(weak_set.pop(), f1) + + def test_update(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set_1 = WeakSet([f1, f2]) + + f3 = self.Foo() + f4 = self.Foo() + weak_set_2 = WeakSet([f3, f4]) + + weak_set_3 = WeakSet([f1, f2, f3, f4]) + + weak_set_1.update(weak_set_2) + self.assertEqual(weak_set_1, weak_set_3) + + weak_set_4 = weak_set_1.copy() + weak_set_4.update([f3, f4]) + self.assertEqual(weak_set_3, weak_set_4) + + def test_ior(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set_1 = WeakSet([f1, f2]) + + f3 = self.Foo() + f4 = self.Foo() + weak_set_2 = WeakSet([f3, f4]) + + weak_set_3 = WeakSet([f1, f2, f3, f4]) + + weak_set_1 |= (weak_set_2) + self.assertEqual(weak_set_1, weak_set_3) + + weak_set_4 = weak_set_1.copy() + weak_set_4 |= ([f3, f4]) + self.assertEqual(weak_set_3, weak_set_4) + + def test_difference_update(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set_1 = WeakSet([f1, f2]) + copy_set_1 = weak_set_1.copy() + + f3 = self.Foo() + f4 = self.Foo() + weak_set_2 = WeakSet([f3, f4]) + + weak_set_1.difference_update(weak_set_2) + self.assertEqual(weak_set_1, copy_set_1) + + weak_set_1.difference_update(copy_set_1) + self.assertEqual(weak_set_1, WeakSet()) + + def test_isub(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set_1 = WeakSet([f1, f2]) + copy_set_1 = weak_set_1.copy() + + f3 = self.Foo() + f4 = self.Foo() + weak_set_2 = WeakSet([f3, f4]) + + weak_set_1 -= weak_set_2 + self.assertEqual(weak_set_1, copy_set_1) + + weak_set_1 -= copy_set_1 + self.assertEqual(weak_set_1, WeakSet()) + + def test_intersection(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set_1 = WeakSet([f1, f2]) + copy_set_1 = weak_set_1.copy() + + f3 = self.Foo() + f4 = self.Foo() + weak_set_2 = WeakSet([f3, f4]) + + intersection = weak_set_1.intersection(weak_set_2) + self.assertEqual(intersection, WeakSet()) + + intersection = weak_set_1.intersection(copy_set_1) + self.assertEqual(intersection, copy_set_1) + + f5 = self.Foo() + weak_set_2.add(f5) + weak_set_3 = WeakSet([f5]) + + intersection = weak_set_3.intersection(weak_set_2) + self.assertEqual(intersection, WeakSet([f5])) + + intersection = weak_set_1.intersection(copy_set_1) + self.assertEqual(intersection, weak_set_1) + self.assertEqual(intersection, copy_set_1) + + def test_intersection_update(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set_1 = WeakSet([f1, f2]) + copy_set_1 = weak_set_1.copy() + + f3 = self.Foo() + f4 = self.Foo() + weak_set_2 = WeakSet([f3, f4]) + + weak_set_1.intersection_update(weak_set_2) + self.assertEqual(weak_set_1, WeakSet()) + + weak_set_1.intersection_update(copy_set_1) + self.assertEqual(weak_set_1, WeakSet()) + + f5 = self.Foo() + weak_set_2.add(f5) + weak_set_3 = WeakSet([f5]) + + weak_set_1 = copy_set_1.copy() + + weak_set_3.intersection_update(weak_set_2) + self.assertEqual(weak_set_3, WeakSet([f5])) + + weak_set_1.intersection_update(copy_set_1) + self.assertEqual(weak_set_1, copy_set_1) + + def test_issubset(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set_1 = WeakSet([f1, f2]) + + weak_set_2 = WeakSet([f1]) + + self.assertTrue(weak_set_2.issubset(weak_set_1)) + self.assertFalse(weak_set_1.issubset(weak_set_2)) + + def test_issuperset(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set_1 = WeakSet([f1, f2]) + + weak_set_2 = WeakSet([f1]) + + self.assertFalse(weak_set_2.issuperset(weak_set_1)) + self.assertTrue(weak_set_1.issuperset(weak_set_2)) + + def test_symmetric_difference(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set_1 = WeakSet([f1, f2]) + copy_set_1 = weak_set_1.copy() + + weak_set_2 = WeakSet([f1]) + + symmetric_difference = weak_set_1.symmetric_difference(weak_set_2) + self.assertEqual(symmetric_difference, WeakSet([f2])) + + weak_set_1 = copy_set_1.copy() + symmetric_difference = weak_set_2.symmetric_difference(weak_set_1) + self.assertEqual(symmetric_difference, WeakSet([f2])) + + weak_set_1 = copy_set_1.copy() + symmetric_difference = weak_set_1.symmetric_difference(weak_set_1) + self.assertEqual(symmetric_difference, WeakSet()) + + def test_symmetric_difference_update(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set_1 = WeakSet([f1, f2]) + copy_set_1 = weak_set_1.copy() + + weak_set_2 = WeakSet([f1]) + + weak_set_1.symmetric_difference_update(weak_set_2) + self.assertEqual(weak_set_1, WeakSet([f2])) + + weak_set_1 = copy_set_1.copy() + weak_set_2.symmetric_difference_update(weak_set_1) + self.assertEqual(weak_set_2, WeakSet([f2])) + + weak_set_1 = copy_set_1.copy() + weak_set_1.symmetric_difference_update(weak_set_1) + self.assertEqual(weak_set_1, WeakSet()) + + def test_union(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set_1 = WeakSet([f1, f2]) + + f3 = self.Foo() + f4 = self.Foo() + weak_set_2 = WeakSet([f3, f4]) + + weak_set_3 = WeakSet([f1, f2, f3, f4]) + + union = weak_set_1.union(weak_set_2) + self.assertEqual(union, weak_set_3) + + def test_isdisjoint(self): + f1 = self.Foo() + f2 = self.Foo() + weak_set_1 = WeakSet([f1, f2]) + copy_set_1 = weak_set_1.copy() + + f3 = self.Foo() + f4 = self.Foo() + weak_set_2 = WeakSet([f3, f4]) + + weak_set_3 = WeakSet([f1, f2, f3, f4]) + + isdisjoint = weak_set_1.isdisjoint(weak_set_2) + self.assertEqual(isdisjoint, True) + + isdisjoint = weak_set_1.isdisjoint(weak_set_3) + self.assertEqual(isdisjoint, False) + + isdisjoint = weak_set_1.isdisjoint(copy_set_1) + self.assertEqual(isdisjoint, False) From c2b3bac0338ca92f84b6a48285db32b2d9794e90 Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Fri, 25 Apr 2014 19:27:10 -0500 Subject: [PATCH 4/4] Removed tests/unit/test_utils.py --- tests/unit/test_utils.py | 424 --------------------------------------- 1 file changed, 424 deletions(-) delete mode 100644 tests/unit/test_utils.py diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py deleted file mode 100644 index a52a6737..00000000 --- a/tests/unit/test_utils.py +++ /dev/null @@ -1,424 +0,0 @@ -# Copyright 2013-2014 DataStax, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import unittest -from cassandra.util import OrderedDict, WeakSet - - -class OrderedDictTests(unittest.TestCase): - - def test_init(self): - traditional_dict = {1: 2, 3: 4} - - ordered_dict = OrderedDict() - ordered_dict = OrderedDict(traditional_dict) - self.assertEqual(traditional_dict, ordered_dict) - - self.assertRaises(TypeError, OrderedDict, traditional_dict, 'extra argument') - - def test_(self): - traditional_dict = {1: 2, 3: 4} - ordered_dict = OrderedDict(traditional_dict) - - del traditional_dict[1] - del ordered_dict[1] - self.assertEqual(traditional_dict, ordered_dict) - - def test_(self): - traditional_dict = {1: 2, 3: 4} - ordered_dict = OrderedDict(traditional_dict) - - traditional_dict[1] = 5 - ordered_dict[1] = 5 - self.assertEqual(traditional_dict, ordered_dict) - - def test_(self): - traditional_dict = {1: 2, 3: 4} - ordered_dict = OrderedDict(traditional_dict) - - for value in ordered_dict: - self.assertIn(value, set([1, 3])) - - def test_reversed(self): - traditional_dict = {1: 2, 3: 4} - ordered_dict = OrderedDict(traditional_dict) - - current_order_dict = iter(ordered_dict) - self.assertEqual(current_order_dict.next(), 1) - self.assertEqual(current_order_dict.next(), 3) - reversed_dict = reversed(ordered_dict) - self.assertEqual(reversed_dict.next(), 3) - self.assertEqual(reversed_dict.next(), 1) - - del traditional_dict[1] - del ordered_dict[1] - traditional_dict[1] = 5 - ordered_dict[1] = 5 - - current_order_dict = iter(ordered_dict) - self.assertEqual(current_order_dict.next(), 3) - self.assertEqual(current_order_dict.next(), 1) - reversed_dict = reversed(ordered_dict) - self.assertEqual(reversed_dict.next(), 1) - self.assertEqual(reversed_dict.next(), 3) - - def test_pop(self): - traditional_dict = {1: 2, 3: 4} - ordered_dict = OrderedDict(traditional_dict) - - self.assertEqual(ordered_dict.popitem(), (3, 4)) - self.assertEqual(ordered_dict.popitem(), (1, 2)) - self.assertRaises(KeyError, ordered_dict.popitem) - - - def test_pop_first(self): - traditional_dict = {1: 2, 3: 4} - ordered_dict = OrderedDict(traditional_dict) - - self.assertEqual(ordered_dict.popitem(last=False), (1, 2)) - self.assertEqual(ordered_dict.popitem(last=False), (3, 4)) - self.assertRaises(KeyError, ordered_dict.popitem) - - def test_reduce(self): - traditional_dict = {1: 2, 3: 4} - ordered_dict = OrderedDict(traditional_dict) - - #TODO: Not sure how this is done - - def test_keys(self): - traditional_dict = {1: 2, 3: 4} - ordered_dict = OrderedDict(traditional_dict) - - self.assertEqual(traditional_dict.keys(), ordered_dict.keys()) - - def test_repr(self): - traditional_dict = {1: 2, 3: 4} - ordered_dict = OrderedDict(traditional_dict) - - self.assertEqual(repr(ordered_dict), 'OrderedDict([(1, 2), (3, 4)])') - - ordered_dict = OrderedDict() - self.assertEqual(repr(ordered_dict), 'OrderedDict()') - - def test_copy(self): - traditional_dict = {1: 2, 3: 4} - ordered_dict = OrderedDict(traditional_dict) - - copy_dict = ordered_dict.copy() - self.assertEqual(ordered_dict, copy_dict) - - del copy_dict[1] - self.assertNotEqual(ordered_dict, copy_dict) - - copy_dict[1] = 5 - self.assertNotEqual(ordered_dict, copy_dict) - - def test_fromkeys(self): - traditional_dict = {1: 2, 3: 4} - ordered_dict = OrderedDict(traditional_dict) - - self.assertEqual(ordered_dict.fromkeys([1, 2]), OrderedDict([(1, None), (2, None)])) - - self.assertEqual(ordered_dict.fromkeys([1, 2], True), OrderedDict([(1, True), (2, True)])) - - -class TestWeakSet(unittest.TestCase): - class Foo(object): pass - - def test_init(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set = WeakSet([f1, f2]) - - def test_add(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set = WeakSet([f1, f2]) - - f3 = self.Foo() - weak_set.add(f3) - - iterator = iter(weak_set) - self.assertIn(iterator.next(), set([f1, f2, f3])) - self.assertIn(iterator.next(), set([f1, f2, f3])) - self.assertIn(iterator.next(), set([f1, f2, f3])) - self.assertEqual(len(weak_set), 3) - - def test_del(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set = WeakSet([f1, f2]) - - f3 = self.Foo() - weak_set.add(f3) - - del f2 - self.assertIn(f1, weak_set) - self.assertIn(f3, weak_set) - self.assertEqual(len(weak_set), 2) - - def test_clear(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set = WeakSet([f1, f2]) - - weak_set.clear() - self.assertEqual(weak_set, WeakSet()) - - def test_copy(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set = WeakSet([f1, f2]) - - copy_set = weak_set.copy() - self.assertEqual(weak_set, copy_set) - - del f2 - self.assertEqual(weak_set, copy_set) - - copy_set.discard(f1) - self.assertNotEqual(weak_set, copy_set) - - def test_pop(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set = WeakSet([f1, f2]) - - self.assertIn(weak_set.pop(), set([f1, f2])) - self.assertIn(weak_set.pop(), set([f1, f2])) - - self.assertRaises(KeyError, weak_set.pop) - - def test_remove(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set = WeakSet([f1, f2]) - - weak_set.remove(f2) - self.assertEqual(weak_set.pop(), f1) - - def test_update(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set_1 = WeakSet([f1, f2]) - - f3 = self.Foo() - f4 = self.Foo() - weak_set_2 = WeakSet([f3, f4]) - - weak_set_3 = WeakSet([f1, f2, f3, f4]) - - weak_set_1.update(weak_set_2) - self.assertEqual(weak_set_1, weak_set_3) - - weak_set_4 = weak_set_1.copy() - weak_set_4.update([f3, f4]) - self.assertEqual(weak_set_3, weak_set_4) - - def test_ior(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set_1 = WeakSet([f1, f2]) - - f3 = self.Foo() - f4 = self.Foo() - weak_set_2 = WeakSet([f3, f4]) - - weak_set_3 = WeakSet([f1, f2, f3, f4]) - - weak_set_1 |= (weak_set_2) - self.assertEqual(weak_set_1, weak_set_3) - - weak_set_4 = weak_set_1.copy() - weak_set_4 |= ([f3, f4]) - self.assertEqual(weak_set_3, weak_set_4) - - def test_difference_update(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set_1 = WeakSet([f1, f2]) - copy_set_1 = weak_set_1.copy() - - f3 = self.Foo() - f4 = self.Foo() - weak_set_2 = WeakSet([f3, f4]) - - weak_set_1.difference_update(weak_set_2) - self.assertEqual(weak_set_1, copy_set_1) - - weak_set_1.difference_update(copy_set_1) - self.assertEqual(weak_set_1, WeakSet()) - - def test_isub(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set_1 = WeakSet([f1, f2]) - copy_set_1 = weak_set_1.copy() - - f3 = self.Foo() - f4 = self.Foo() - weak_set_2 = WeakSet([f3, f4]) - - weak_set_1 -= weak_set_2 - self.assertEqual(weak_set_1, copy_set_1) - - weak_set_1 -= copy_set_1 - self.assertEqual(weak_set_1, WeakSet()) - - def test_intersection(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set_1 = WeakSet([f1, f2]) - copy_set_1 = weak_set_1.copy() - - f3 = self.Foo() - f4 = self.Foo() - weak_set_2 = WeakSet([f3, f4]) - - intersection = weak_set_1.intersection(weak_set_2) - self.assertEqual(intersection, WeakSet()) - - intersection = weak_set_1.intersection(copy_set_1) - self.assertEqual(intersection, copy_set_1) - - f5 = self.Foo() - weak_set_2.add(f5) - weak_set_3 = WeakSet([f5]) - - intersection = weak_set_3.intersection(weak_set_2) - self.assertEqual(intersection, WeakSet([f5])) - - intersection = weak_set_1.intersection(copy_set_1) - self.assertEqual(intersection, weak_set_1) - self.assertEqual(intersection, copy_set_1) - - def test_intersection_update(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set_1 = WeakSet([f1, f2]) - copy_set_1 = weak_set_1.copy() - - f3 = self.Foo() - f4 = self.Foo() - weak_set_2 = WeakSet([f3, f4]) - - weak_set_1.intersection_update(weak_set_2) - self.assertEqual(weak_set_1, WeakSet()) - - weak_set_1.intersection_update(copy_set_1) - self.assertEqual(weak_set_1, WeakSet()) - - f5 = self.Foo() - weak_set_2.add(f5) - weak_set_3 = WeakSet([f5]) - - weak_set_1 = copy_set_1.copy() - - weak_set_3.intersection_update(weak_set_2) - self.assertEqual(weak_set_3, WeakSet([f5])) - - weak_set_1.intersection_update(copy_set_1) - self.assertEqual(weak_set_1, copy_set_1) - - def test_issubset(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set_1 = WeakSet([f1, f2]) - - weak_set_2 = WeakSet([f1]) - - self.assertTrue(weak_set_2.issubset(weak_set_1)) - self.assertFalse(weak_set_1.issubset(weak_set_2)) - - def test_issuperset(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set_1 = WeakSet([f1, f2]) - - weak_set_2 = WeakSet([f1]) - - self.assertFalse(weak_set_2.issuperset(weak_set_1)) - self.assertTrue(weak_set_1.issuperset(weak_set_2)) - - def test_symmetric_difference(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set_1 = WeakSet([f1, f2]) - copy_set_1 = weak_set_1.copy() - - weak_set_2 = WeakSet([f1]) - - symmetric_difference = weak_set_1.symmetric_difference(weak_set_2) - self.assertEqual(symmetric_difference, WeakSet([f2])) - - weak_set_1 = copy_set_1.copy() - symmetric_difference = weak_set_2.symmetric_difference(weak_set_1) - self.assertEqual(symmetric_difference, WeakSet([f2])) - - weak_set_1 = copy_set_1.copy() - symmetric_difference = weak_set_1.symmetric_difference(weak_set_1) - self.assertEqual(symmetric_difference, WeakSet()) - - def test_symmetric_difference_update(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set_1 = WeakSet([f1, f2]) - copy_set_1 = weak_set_1.copy() - - weak_set_2 = WeakSet([f1]) - - weak_set_1.symmetric_difference_update(weak_set_2) - self.assertEqual(weak_set_1, WeakSet([f2])) - - weak_set_1 = copy_set_1.copy() - weak_set_2.symmetric_difference_update(weak_set_1) - self.assertEqual(weak_set_2, WeakSet([f2])) - - weak_set_1 = copy_set_1.copy() - weak_set_1.symmetric_difference_update(weak_set_1) - self.assertEqual(weak_set_1, WeakSet()) - - def test_union(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set_1 = WeakSet([f1, f2]) - - f3 = self.Foo() - f4 = self.Foo() - weak_set_2 = WeakSet([f3, f4]) - - weak_set_3 = WeakSet([f1, f2, f3, f4]) - - union = weak_set_1.union(weak_set_2) - self.assertEqual(union, weak_set_3) - - def test_isdisjoint(self): - f1 = self.Foo() - f2 = self.Foo() - weak_set_1 = WeakSet([f1, f2]) - copy_set_1 = weak_set_1.copy() - - f3 = self.Foo() - f4 = self.Foo() - weak_set_2 = WeakSet([f3, f4]) - - weak_set_3 = WeakSet([f1, f2, f3, f4]) - - isdisjoint = weak_set_1.isdisjoint(weak_set_2) - self.assertEqual(isdisjoint, True) - - isdisjoint = weak_set_1.isdisjoint(weak_set_3) - self.assertEqual(isdisjoint, False) - - isdisjoint = weak_set_1.isdisjoint(copy_set_1) - self.assertEqual(isdisjoint, False)