Merge pull request #116 from joaquincasares/code_coverage_0423
Adding a few missing code coverage tests
This commit is contained in:
@@ -11,6 +11,7 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from cassandra.cluster import Cluster
|
||||
|
||||
try:
|
||||
import unittest2 as unittest
|
||||
@@ -152,3 +153,18 @@ class ConnectionTest(unittest.TestCase):
|
||||
self.assertRaises(NotImplementedError, c.close)
|
||||
self.assertRaises(NotImplementedError, c.register_watcher, None, None)
|
||||
self.assertRaises(NotImplementedError, c.register_watchers, None)
|
||||
|
||||
def test_set_keyspace_blocking(self):
|
||||
c = self.make_connection()
|
||||
|
||||
self.assertEqual(c.keyspace, None)
|
||||
c.set_keyspace_blocking(None)
|
||||
self.assertEqual(c.keyspace, None)
|
||||
|
||||
c.keyspace = 'ks'
|
||||
c.set_keyspace_blocking('ks')
|
||||
self.assertEqual(c.keyspace, 'ks')
|
||||
|
||||
def test_set_connection_class(self):
|
||||
cluster = Cluster(connection_class='test')
|
||||
self.assertEqual('test', cluster.connection_class)
|
||||
|
@@ -11,6 +11,7 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from cassandra.marshal import bitlength
|
||||
|
||||
try:
|
||||
import unittest2 as unittest
|
||||
@@ -131,3 +132,8 @@ class TestUnmarshal(unittest.TestCase):
|
||||
self.assertEqual(type(whatwegot), type(serializedval),
|
||||
msg='Marshaller for %s (%s) gave wrong type (%s instead of %s)'
|
||||
% (valtype, marshaller, type(whatwegot), type(serializedval)))
|
||||
|
||||
def test_bitlength(self):
|
||||
self.assertEqual(bitlength(9), 4)
|
||||
self.assertEqual(bitlength(-10), 0)
|
||||
self.assertEqual(bitlength(0), 0)
|
||||
|
@@ -147,6 +147,9 @@ class TestStrategies(unittest.TestCase):
|
||||
self.assertItemsEqual(rf3_replicas[MD5Token(200)], [host3, host1, host2])
|
||||
|
||||
|
||||
def test_ss_equals(self):
|
||||
self.assertNotEqual(SimpleStrategy(1), NetworkTopologyStrategy(2))
|
||||
|
||||
class TestNameEscaping(unittest.TestCase):
|
||||
|
||||
def test_protect_name(self):
|
||||
|
@@ -55,6 +55,9 @@ class TestLoadBalancingPolicy(unittest.TestCase):
|
||||
self.assertRaises(NotImplementedError, policy.on_add, host)
|
||||
self.assertRaises(NotImplementedError, policy.on_remove, host)
|
||||
|
||||
def test_instance_check(self):
|
||||
self.assertRaises(TypeError, Cluster, load_balancing_policy=RoundRobinPolicy)
|
||||
|
||||
|
||||
class TestRoundRobinPolicy(unittest.TestCase):
|
||||
|
||||
|
@@ -11,6 +11,7 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import tempfile
|
||||
|
||||
try:
|
||||
import unittest2 as unittest
|
||||
@@ -22,8 +23,9 @@ import datetime
|
||||
import cassandra
|
||||
from cassandra.cqltypes import (BooleanType, lookup_casstype_simple, lookup_casstype,
|
||||
LongType, DecimalType, SetType, cql_typename,
|
||||
CassandraType, UTF8Type, parse_casstype_args)
|
||||
from cassandra.decoder import named_tuple_factory
|
||||
CassandraType, UTF8Type, parse_casstype_args,
|
||||
EmptyValue, _CassandraType, DateType)
|
||||
from cassandra.decoder import named_tuple_factory, write_string, read_longstring, write_stringmap, read_stringmap, read_inet, write_inet, cql_quote, read_string, write_longstring
|
||||
|
||||
|
||||
class TypeTests(unittest.TestCase):
|
||||
@@ -170,3 +172,57 @@ class TypeTests(unittest.TestCase):
|
||||
self.assertEquals(UTF8Type, ctype.subtypes[2])
|
||||
|
||||
self.assertEquals(['city', None, 'zip'], ctype.names)
|
||||
|
||||
def test_empty_value(self):
|
||||
self.assertEqual(str(EmptyValue()), 'EMPTY')
|
||||
|
||||
def test_CassandraType(self):
|
||||
cassandra_type = _CassandraType('randomvaluetocheck')
|
||||
self.assertEqual(cassandra_type.val, 'randomvaluetocheck')
|
||||
self.assertEqual(cassandra_type.validate('randomvaluetocheck2'), 'randomvaluetocheck2')
|
||||
self.assertEqual(cassandra_type.val, 'randomvaluetocheck')
|
||||
|
||||
def test_DateType(self):
|
||||
now = datetime.datetime.now()
|
||||
date_type = DateType(now)
|
||||
self.assertEqual(date_type.my_timestamp(), now)
|
||||
self.assertRaises(ValueError, date_type.interpret_datestring, 'fakestring')
|
||||
|
||||
def test_write_read_string(self):
|
||||
with tempfile.TemporaryFile() as f:
|
||||
value = u'test'
|
||||
write_string(f, value)
|
||||
f.seek(0)
|
||||
self.assertEqual(read_string(f), value)
|
||||
|
||||
def test_write_read_longstring(self):
|
||||
with tempfile.TemporaryFile() as f:
|
||||
value = u'test'
|
||||
write_longstring(f, value)
|
||||
f.seek(0)
|
||||
self.assertEqual(read_longstring(f), value)
|
||||
|
||||
def test_write_read_stringmap(self):
|
||||
with tempfile.TemporaryFile() as f:
|
||||
value = {'key': 'value'}
|
||||
write_stringmap(f, value)
|
||||
f.seek(0)
|
||||
self.assertEqual(read_stringmap(f), value)
|
||||
|
||||
def test_write_read_inet(self):
|
||||
with tempfile.TemporaryFile() as f:
|
||||
value = ('192.168.1.1', 9042)
|
||||
write_inet(f, value)
|
||||
f.seek(0)
|
||||
self.assertEqual(read_inet(f), value)
|
||||
|
||||
with tempfile.TemporaryFile() as f:
|
||||
value = ('2001:db8:0:f101::1', 9042)
|
||||
write_inet(f, value)
|
||||
f.seek(0)
|
||||
self.assertEqual(read_inet(f), value)
|
||||
|
||||
def test_cql_quote(self):
|
||||
self.assertEqual(cql_quote(u'test'), "'test'")
|
||||
self.assertEqual(cql_quote('test'), "'test'")
|
||||
self.assertEqual(cql_quote(0), '0')
|
||||
|
Reference in New Issue
Block a user