First round of test fixes
This commit is contained in:
@@ -38,7 +38,7 @@ def setup_package():
|
||||
cluster.clear()
|
||||
except Exception:
|
||||
log.debug("Creating new ccm test cluster")
|
||||
cluster = CCMCluster(path, CLUSTER_NAME, cassandra_version='1.2.6')
|
||||
cluster = CCMCluster(path, CLUSTER_NAME, cassandra_version='1.2.9')
|
||||
cluster.set_configuration_options({'start_native_transport': True})
|
||||
common.switch_cluster(path, CLUSTER_NAME)
|
||||
cluster.populate(3)
|
||||
|
||||
@@ -87,13 +87,14 @@ class ClusterTests(unittest.TestCase):
|
||||
Ensure that a cluster can be shutdown twice, without error
|
||||
"""
|
||||
|
||||
# DISCUSS: Should we allow this?
|
||||
|
||||
cluster = Cluster()
|
||||
cluster.shutdown()
|
||||
|
||||
# Shouldn't throw an error
|
||||
cluster.shutdown()
|
||||
try:
|
||||
cluster.shutdown()
|
||||
self.fail('A double cluster.shutdown() should throw an error.')
|
||||
except Exception as e:
|
||||
self.assertEqual(e.message, 'The Cluster was already shutdown')
|
||||
|
||||
def test_connect_to_already_shutdown_cluster(self):
|
||||
"""
|
||||
|
||||
@@ -118,7 +118,7 @@ class SchemaMetadataTest(unittest.TestCase):
|
||||
|
||||
meta = self.cluster.metadata
|
||||
self.assertNotEqual(meta.cluster_ref, None)
|
||||
# self.assertNotEqual(meta.cluster_name, None) # TODO needs to be fixed
|
||||
self.assertNotEqual(meta.cluster_name, None)
|
||||
self.assertTrue(self.ksname in meta.keyspaces)
|
||||
ksmeta = meta.keyspaces[self.ksname]
|
||||
|
||||
@@ -285,18 +285,21 @@ class TestCodeCoverage(unittest.TestCase):
|
||||
cluster = Cluster()
|
||||
cluster.connect()
|
||||
|
||||
# BUG: Does not work
|
||||
# print cluster.metadata.export_schema_as_string()
|
||||
# BUG: cassandra.metadata:KeyspaceMetadata.as_cql_query() fails when self.name == 'system'
|
||||
# because self.replication_strategy == None
|
||||
# self.assertTrue(isinstance(cluster.metadata.export_schema_as_string(), str))
|
||||
# Traceback (most recent call last):
|
||||
# File "/Users/joaquin/repos/python-driver/tests/integration/test_metadata.py", line 280, in test_export_schema
|
||||
# print cluster.metadata.export_schema_as_string()
|
||||
# File "/Users/joaquin/repos/python-driver/tests/integration/test_metadata.py", line 288, in test_export_schema
|
||||
# self.assertTrue(isinstance(cluster.metadata.export_schema_as_string(), str))
|
||||
# File "/Users/joaquin/repos/python-driver/cassandra/metadata.py", line 71, in export_schema_as_string
|
||||
# return "\n".join(ks.export_as_string() for ks in self.keyspaces.values())
|
||||
# File "/Users/joaquin/repos/python-driver/cassandra/metadata.py", line 71, in <genexpr>
|
||||
# return "\n".join(ks.export_as_string() for ks in self.keyspaces.values())
|
||||
# File "/Users/joaquin/repos/python-driver/cassandra/metadata.py", line 351, in export_as_string
|
||||
# File "/Users/joaquin/repos/python-driver/cassandra/metadata.py", line 438, in export_as_string
|
||||
# return "\n".join([self.as_cql_query()] + [t.as_cql_query() for t in self.tables.values()])
|
||||
# TypeError: sequence item 0: expected string, NoneType found
|
||||
# File "/Users/joaquin/repos/python-driver/cassandra/metadata.py", line 444, in as_cql_query
|
||||
# (self.name, self.replication_strategy.export_for_schema())
|
||||
# AttributeError: 'NoneType' object has no attribute 'export_for_schema'
|
||||
|
||||
def test_export_keyspace_schema(self):
|
||||
"""
|
||||
@@ -306,19 +309,11 @@ class TestCodeCoverage(unittest.TestCase):
|
||||
cluster = Cluster()
|
||||
cluster.connect()
|
||||
|
||||
# BUG: Attempting to check cassandra.metadata:350
|
||||
# print cluster.metadata.keyspaces.export_as_string()
|
||||
# Traceback (most recent call last):
|
||||
# File "/Users/joaquin/repos/python-driver/tests/integration/test_metadata.py", line 296, in test_export_keyspace_schema
|
||||
# print cluster.metadata.keyspaces.export_as_string()
|
||||
# AttributeError: 'dict' object has no attribute 'export_as_string'
|
||||
|
||||
# BUG: Attempting to check cassandra.metadata:353
|
||||
# print cluster.metadata.keyspaces.as_cql_query()
|
||||
# Traceback (most recent call last):
|
||||
# File "/Users/joaquin/repos/python-driver/tests/integration/test_metadata.py", line 305, in test_export_keyspace_schema
|
||||
# print cluster.metadata.keyspaces.as_cql_query()
|
||||
# AttributeError: 'dict' object has no attribute 'as_cql_query'
|
||||
for keyspace in cluster.metadata.keyspaces:
|
||||
if keyspace != 'system':
|
||||
keyspace_metadata = cluster.metadata.keyspaces[keyspace]
|
||||
self.assertTrue(isinstance(keyspace_metadata.export_as_string(), unicode))
|
||||
self.assertTrue(isinstance(keyspace_metadata.as_cql_query(), unicode))
|
||||
|
||||
def test_already_exists_exceptions(self):
|
||||
"""
|
||||
@@ -347,10 +342,13 @@ class TestCodeCoverage(unittest.TestCase):
|
||||
Ensure cluster.metadata.get_replicas return correctly when not attached to keyspace
|
||||
"""
|
||||
cluster = Cluster()
|
||||
self.assertEqual(cluster.metadata.get_replicas('key'), [])
|
||||
self.assertEqual(cluster.metadata.get_replicas('test3rf', 'key'), [])
|
||||
|
||||
cluster.connect('test3rf')
|
||||
host = list(cluster.metadata.get_replicas('key'))[0]
|
||||
|
||||
# BUG: The next line fails
|
||||
self.assertNotEqual(list(cluster.metadata.get_replicas('test3rf', 'key')), [])
|
||||
host = list(cluster.metadata.get_replicas('test3rf', 'key'))[0]
|
||||
self.assertEqual(host.datacenter, 'datacenter1')
|
||||
self.assertEqual(host.rack, 'rack1')
|
||||
self.assertEqual(host.address, '127.0.0.2')
|
||||
@@ -364,14 +362,15 @@ class TestCodeCoverage(unittest.TestCase):
|
||||
cluster.connect('test3rf')
|
||||
ring = cluster.metadata.token_map.ring
|
||||
|
||||
self.assertEqual(list(cluster.metadata.token_map.get_replicas(ring[0]))[0].address, '127.0.0.1')
|
||||
self.assertEqual(list(cluster.metadata.token_map.get_replicas(ring[1]))[0].address, '127.0.0.2')
|
||||
self.assertEqual(list(cluster.metadata.token_map.get_replicas(ring[2]))[0].address, '127.0.0.3')
|
||||
# BUG: The next line fails
|
||||
self.assertNotEqual(list(cluster.metadata.token_map.get_replicas('test3rf', ring[0])), [])
|
||||
self.assertEqual(list(cluster.metadata.token_map.get_replicas('test3rf', ring[0]))[0].address, '127.0.0.1')
|
||||
self.assertEqual(list(cluster.metadata.token_map.get_replicas('test3rf', ring[1]))[0].address, '127.0.0.2')
|
||||
self.assertEqual(list(cluster.metadata.token_map.get_replicas('test3rf', ring[2]))[0].address, '127.0.0.3')
|
||||
|
||||
# BUG: I was specifically trying to ensure that tokens wrap around
|
||||
self.assertEqual(list(cluster.metadata.token_map.get_replicas(Murmur3Token(ring[0].value - 1)))[0].address, '127.0.0.3')
|
||||
# self.assertEqual(list(cluster.metadata.token_map.get_replicas(Murmur3Token(ring[1].value - 1)))[0].address, '127.0.0.1')
|
||||
# self.assertEqual(list(cluster.metadata.token_map.get_replicas(Murmur3Token(ring[2].value - 1)))[0].address, '127.0.0.2')
|
||||
self.assertEqual(list(cluster.metadata.token_map.get_replicas('test3rf', Murmur3Token(ring[0].value - 1)))[0].address, '127.0.0.3')
|
||||
self.assertEqual(list(cluster.metadata.token_map.get_replicas('test3rf', Murmur3Token(ring[1].value - 1)))[0].address, '127.0.0.1')
|
||||
self.assertEqual(list(cluster.metadata.token_map.get_replicas('test3rf', Murmur3Token(ring[2].value - 1)))[0].address, '127.0.0.2')
|
||||
|
||||
|
||||
class TokenMetadataTest(unittest.TestCase):
|
||||
|
||||
@@ -14,20 +14,27 @@ class MetricsTests(unittest.TestCase):
|
||||
Trigger and ensure connection_errors are counted
|
||||
"""
|
||||
|
||||
# DISCUSS: Doesn't trigger code coverage on cassandra.metrics.on_connection_error(). Find new example.
|
||||
cluster = Cluster(metrics_enabled=True)
|
||||
session = cluster.connect()
|
||||
|
||||
# Test write
|
||||
session.execute("USE test3rf")
|
||||
|
||||
# Test writes
|
||||
for i in range(0, 100):
|
||||
session.execute_async(
|
||||
"""
|
||||
INSERT INTO test3rf.test (k, v) VALUES (%s, %s)
|
||||
""" % (i, i))
|
||||
|
||||
# Force kill cluster
|
||||
get_cluster().stop(wait=True, gently=False)
|
||||
try:
|
||||
# Ensure the nodes are actually down
|
||||
self.assertRaises(NoHostAvailable, session.execute, "USE test3rf")
|
||||
finally:
|
||||
get_cluster().start(wait_for_binary_proto=True)
|
||||
|
||||
self.assertGreater(cluster.metrics.stats.connection_errors, 0)
|
||||
|
||||
def test_write_timeout(self):
|
||||
"""
|
||||
Trigger and ensure write_timeouts are counted
|
||||
|
||||
@@ -4,22 +4,21 @@ from cassandra.cluster import Cluster
|
||||
|
||||
|
||||
class QueryTest(unittest.TestCase):
|
||||
# TODO: Cover routing keys
|
||||
# def test_query(self):
|
||||
# cluster = Cluster()
|
||||
# session = cluster.connect()
|
||||
#
|
||||
# prepared = session.prepare(
|
||||
# """
|
||||
# INSERT INTO test3rf.test (k, v) VALUES (?, ?)
|
||||
# """)
|
||||
#
|
||||
# self.assertIsInstance(prepared, PreparedStatement)
|
||||
# bound = prepared.bind((1, None))
|
||||
# self.assertIsInstance(bound, BoundStatement)
|
||||
# session.execute(bound)
|
||||
#
|
||||
# print bound.routing_key
|
||||
def test_query(self):
|
||||
cluster = Cluster()
|
||||
session = cluster.connect()
|
||||
|
||||
prepared = session.prepare(
|
||||
"""
|
||||
INSERT INTO test3rf.test (k, v) VALUES (?, ?)
|
||||
""")
|
||||
|
||||
self.assertIsInstance(prepared, PreparedStatement)
|
||||
bound = prepared.bind((1, None))
|
||||
self.assertIsInstance(bound, BoundStatement)
|
||||
session.execute(bound)
|
||||
self.assertEqual(bound.routing_key, '\x00\x00\x00\x01')
|
||||
|
||||
|
||||
def test_value_sequence(self):
|
||||
"""
|
||||
@@ -43,3 +42,5 @@ class QueryTest(unittest.TestCase):
|
||||
|
||||
# Ensure this does not throw an exception
|
||||
str(statement.trace)
|
||||
for event in statement.trace.events:
|
||||
str(event)
|
||||
|
||||
@@ -17,9 +17,8 @@ class TestMetadata(unittest.TestCase):
|
||||
self.assertEqual(table_metadata.protect_name('test\'s'), '"test\'s"')
|
||||
self.assertEqual(table_metadata.protect_name('test\'s'), "\"test's\"")
|
||||
self.assertEqual(table_metadata.protect_name('tests ?!@#$%^&*()'), '"tests ?!@#$%^&*()"')
|
||||
|
||||
# BUG: Or is this fine?
|
||||
self.assertEqual(table_metadata.protect_name('1'), '"1"')
|
||||
self.assertEqual(table_metadata.protect_name('1test'), '"1test"')
|
||||
|
||||
def test_protect_names(self):
|
||||
"""
|
||||
@@ -50,14 +49,12 @@ class TestMetadata(unittest.TestCase):
|
||||
|
||||
table_metadata = TableMetadata('ks_name', 'table_name')
|
||||
|
||||
self.assertEqual(table_metadata.protect_value(True), "'true'")
|
||||
self.assertEqual(table_metadata.protect_value(False), "'false'")
|
||||
self.assertEqual(table_metadata.protect_value(3.14), '3.140000')
|
||||
self.assertEqual(table_metadata.protect_value(True), "True")
|
||||
self.assertEqual(table_metadata.protect_value(False), "False")
|
||||
self.assertEqual(table_metadata.protect_value(3.14), '3.14')
|
||||
self.assertEqual(table_metadata.protect_value(3), '3')
|
||||
self.assertEqual(table_metadata.protect_value('test'), "'test'")
|
||||
self.assertEqual(table_metadata.protect_value('test\'s'), "'test''s'")
|
||||
|
||||
# BUG: Do we remove this altogether now?
|
||||
self.assertEqual(table_metadata.protect_value(None), 'NULL')
|
||||
|
||||
def test_is_valid_name(self):
|
||||
@@ -89,13 +86,16 @@ class TestMetadata(unittest.TestCase):
|
||||
self.assertEqual(murmur3_token.hash_fn(str(cassandra.metadata.MAX_LONG)), 7162290910810015547)
|
||||
|
||||
md5_token = MD5Token(cassandra.metadata.MIN_LONG - 1)
|
||||
# BUG: MD5Token always returns the same token
|
||||
# self.assertNotEqual(md5_token.hash_fn('123'), 110673303387115207421586718101067225896)
|
||||
# self.assertNotEqual(md5_token.hash_fn(str(cassandra.metadata.MAX_LONG)), 110673303387115207421586718101067225896)
|
||||
self.assertEqual(md5_token.hash_fn('123'), 42767516990368493138776584305024125808L)
|
||||
self.assertEqual(md5_token.hash_fn(str(cassandra.metadata.MAX_LONG)), 28528976619278518853815276204542453639L)
|
||||
|
||||
bytes_token = BytesToken(cassandra.metadata.MIN_LONG - 1)
|
||||
bytes_token = BytesToken(str(cassandra.metadata.MIN_LONG - 1))
|
||||
self.assertEqual(bytes_token.hash_fn('123'), '123')
|
||||
self.assertEqual(bytes_token.hash_fn(123), 123)
|
||||
self.assertEqual(bytes_token.hash_fn(str(cassandra.metadata.MAX_LONG)), str(cassandra.metadata.MAX_LONG))
|
||||
|
||||
# BUG? Should only accept strings?
|
||||
self.assertEqual(bytes_token.hash_fn(123), '123')
|
||||
try:
|
||||
bytes_token = BytesToken(cassandra.metadata.MIN_LONG - 1)
|
||||
self.fail('Tokens for ByteOrderedPartitioner should be only strings')
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
@@ -276,13 +276,6 @@ class TokenAwarePolicyTest(unittest.TestCase):
|
||||
self.assertEquals(replicas, qplan[:2])
|
||||
self.assertEquals(other, set(qplan[2:]))
|
||||
|
||||
# Should use the secondary policy
|
||||
for i in range(4):
|
||||
query = Query()
|
||||
qplan = list(policy.make_query_plan(query))
|
||||
|
||||
self.assertEquals(set(qplan), set(hosts))
|
||||
|
||||
# Should use the secondary policy
|
||||
for i in range(4):
|
||||
qplan = list(policy.make_query_plan())
|
||||
@@ -347,10 +340,7 @@ class SimpleConvictionPolicyTest(unittest.TestCase):
|
||||
"""
|
||||
|
||||
conviction_policy = SimpleConvictionPolicy(1)
|
||||
|
||||
# DISCUSS: Always return True?
|
||||
self.assertEqual(conviction_policy.add_failure(1), True)
|
||||
|
||||
self.assertEqual(conviction_policy.reset(), None)
|
||||
|
||||
|
||||
@@ -393,9 +383,12 @@ class ConstantReconnectionPolicyTest(unittest.TestCase):
|
||||
|
||||
delay = 2
|
||||
max_attempts = -100
|
||||
policy = ConstantReconnectionPolicy(delay=delay, max_attempts=max_attempts)
|
||||
schedule = list(policy.new_schedule())
|
||||
self.assertEqual(len(schedule), 0)
|
||||
|
||||
try:
|
||||
policy = ConstantReconnectionPolicy(delay=delay, max_attempts=max_attempts)
|
||||
self.fail('max_attempts should throw ValueError when negative')
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
class ExponentialReconnectionPolicyTest(unittest.TestCase):
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import unittest
|
||||
import datetime
|
||||
import cassandra
|
||||
from cassandra.cqltypes import lookup_cqltype, CassandraType, BooleanType, lookup_casstype_simple, lookup_casstype, \
|
||||
from cassandra.cqltypes import CassandraType, BooleanType, lookup_casstype_simple, lookup_casstype, \
|
||||
AsciiType, LongType, DecimalType, DoubleType, FloatType, Int32Type, UTF8Type, IntegerType, SetType, cql_typename
|
||||
|
||||
from cassandra.cluster import Cluster
|
||||
@@ -28,10 +28,6 @@ class TypeTests(unittest.TestCase):
|
||||
self.assertEqual(lookup_casstype_simple('DateType'), cassandra.cqltypes.DateType)
|
||||
self.assertEqual(lookup_casstype_simple('TimeUUIDType'), cassandra.cqltypes.TimeUUIDType)
|
||||
self.assertEqual(lookup_casstype_simple('UUIDType'), cassandra.cqltypes.UUIDType)
|
||||
|
||||
# DISCUSS: varchar is not valid?
|
||||
# self.assertEqual(lookup_casstype_simple('varchar'), cassandra.cqltypes.UTF8Type)
|
||||
|
||||
self.assertEqual(lookup_casstype_simple('IntegerType'), cassandra.cqltypes.IntegerType)
|
||||
self.assertEqual(lookup_casstype_simple('MapType'), cassandra.cqltypes.MapType)
|
||||
self.assertEqual(lookup_casstype_simple('ListType'), cassandra.cqltypes.ListType)
|
||||
@@ -61,10 +57,6 @@ class TypeTests(unittest.TestCase):
|
||||
self.assertEqual(lookup_casstype('DateType'), cassandra.cqltypes.DateType)
|
||||
self.assertEqual(lookup_casstype('TimeUUIDType'), cassandra.cqltypes.TimeUUIDType)
|
||||
self.assertEqual(lookup_casstype('UUIDType'), cassandra.cqltypes.UUIDType)
|
||||
|
||||
# DISCUSS: varchar is not valid?
|
||||
# self.assertEqual(lookup_casstype('varchar'), cassandra.cqltypes.UTF8Type)
|
||||
|
||||
self.assertEqual(lookup_casstype('IntegerType'), cassandra.cqltypes.IntegerType)
|
||||
self.assertEqual(lookup_casstype('MapType'), cassandra.cqltypes.MapType)
|
||||
self.assertEqual(lookup_casstype('ListType'), cassandra.cqltypes.ListType)
|
||||
@@ -77,184 +69,11 @@ class TypeTests(unittest.TestCase):
|
||||
|
||||
self.assertRaises(ValueError, lookup_casstype, 'AsciiType~')
|
||||
|
||||
# DISCUSS: Figure out if other tests are needed
|
||||
# TODO: Do a few more tests
|
||||
# "I would say some parameterized and nested types would be good to test,
|
||||
# like "MapType(AsciiType, IntegerType)" and "ReversedType(AsciiType)"
|
||||
self.assertEqual(str(lookup_casstype(BooleanType(True))), str(BooleanType(True)))
|
||||
|
||||
|
||||
def test_lookup_cqltype(self):
|
||||
"""
|
||||
Ensure lookup_cqltype returns the correct class
|
||||
"""
|
||||
|
||||
self.assertEqual(lookup_cqltype('ascii'), cassandra.cqltypes.AsciiType)
|
||||
self.assertEqual(lookup_cqltype('bigint'), cassandra.cqltypes.LongType)
|
||||
self.assertEqual(lookup_cqltype('blob'), cassandra.cqltypes.BytesType)
|
||||
self.assertEqual(lookup_cqltype('boolean'), cassandra.cqltypes.BooleanType)
|
||||
self.assertEqual(lookup_cqltype('counter'), cassandra.cqltypes.CounterColumnType)
|
||||
self.assertEqual(lookup_cqltype('decimal'), cassandra.cqltypes.DecimalType)
|
||||
self.assertEqual(lookup_cqltype('double'), cassandra.cqltypes.DoubleType)
|
||||
self.assertEqual(lookup_cqltype('float'), cassandra.cqltypes.FloatType)
|
||||
self.assertEqual(lookup_cqltype('inet'), cassandra.cqltypes.InetAddressType)
|
||||
self.assertEqual(lookup_cqltype('int'), cassandra.cqltypes.Int32Type)
|
||||
self.assertEqual(lookup_cqltype('text'), cassandra.cqltypes.UTF8Type)
|
||||
self.assertEqual(lookup_cqltype('timestamp'), cassandra.cqltypes.DateType)
|
||||
self.assertEqual(lookup_cqltype('timeuuid'), cassandra.cqltypes.TimeUUIDType)
|
||||
self.assertEqual(lookup_cqltype('uuid'), cassandra.cqltypes.UUIDType)
|
||||
self.assertEqual(lookup_cqltype('varchar'), cassandra.cqltypes.UTF8Type)
|
||||
self.assertEqual(lookup_cqltype('varint'), cassandra.cqltypes.IntegerType)
|
||||
|
||||
|
||||
self.assertEqual(str(lookup_cqltype('list<ascii>')),
|
||||
str(cassandra.cqltypes.ListType.apply_parameters(cassandra.cqltypes.AsciiType)))
|
||||
self.assertEqual(str(lookup_cqltype('list<bigint>')),
|
||||
str(cassandra.cqltypes.ListType.apply_parameters(cassandra.cqltypes.LongType)))
|
||||
self.assertEqual(str(lookup_cqltype('list<blob>')),
|
||||
str(cassandra.cqltypes.ListType.apply_parameters(cassandra.cqltypes.BytesType)))
|
||||
self.assertEqual(str(lookup_cqltype('list<boolean>')),
|
||||
str(cassandra.cqltypes.ListType.apply_parameters(cassandra.cqltypes.BooleanType)))
|
||||
self.assertEqual(str(lookup_cqltype('list<counter>')),
|
||||
str(cassandra.cqltypes.ListType.apply_parameters(cassandra.cqltypes.CounterColumnType)))
|
||||
self.assertEqual(str(lookup_cqltype('list<decimal>')),
|
||||
str(cassandra.cqltypes.ListType.apply_parameters(cassandra.cqltypes.DecimalType)))
|
||||
self.assertEqual(str(lookup_cqltype('list<double>')),
|
||||
str(cassandra.cqltypes.ListType.apply_parameters(cassandra.cqltypes.DoubleType)))
|
||||
self.assertEqual(str(lookup_cqltype('list<float>')),
|
||||
str(cassandra.cqltypes.ListType.apply_parameters(cassandra.cqltypes.FloatType)))
|
||||
self.assertEqual(str(lookup_cqltype('list<inet>')),
|
||||
str(cassandra.cqltypes.ListType.apply_parameters(cassandra.cqltypes.InetAddressType)))
|
||||
self.assertEqual(str(lookup_cqltype('list<int>')),
|
||||
str(cassandra.cqltypes.ListType.apply_parameters(cassandra.cqltypes.Int32Type)))
|
||||
self.assertEqual(str(lookup_cqltype('list<text>')),
|
||||
str(cassandra.cqltypes.ListType.apply_parameters(cassandra.cqltypes.UTF8Type)))
|
||||
self.assertEqual(str(lookup_cqltype('list<timestamp>')),
|
||||
str(cassandra.cqltypes.ListType.apply_parameters(cassandra.cqltypes.DateType)))
|
||||
self.assertEqual(str(lookup_cqltype('list<timeuuid>')),
|
||||
str(cassandra.cqltypes.ListType.apply_parameters(cassandra.cqltypes.TimeUUIDType)))
|
||||
self.assertEqual(str(lookup_cqltype('list<uuid>')),
|
||||
str(cassandra.cqltypes.ListType.apply_parameters(cassandra.cqltypes.UUIDType)))
|
||||
self.assertEqual(str(lookup_cqltype('list<varchar>')),
|
||||
str(cassandra.cqltypes.ListType.apply_parameters(cassandra.cqltypes.UTF8Type)))
|
||||
self.assertEqual(str(lookup_cqltype('list<varint>')),
|
||||
str(cassandra.cqltypes.ListType.apply_parameters(cassandra.cqltypes.IntegerType)))
|
||||
|
||||
|
||||
self.assertEqual(str(lookup_cqltype('set<ascii>')),
|
||||
str(cassandra.cqltypes.SetType.apply_parameters(cassandra.cqltypes.AsciiType)))
|
||||
self.assertEqual(str(lookup_cqltype('set<bigint>')),
|
||||
str(cassandra.cqltypes.SetType.apply_parameters(cassandra.cqltypes.LongType)))
|
||||
self.assertEqual(str(lookup_cqltype('set<blob>')),
|
||||
str(cassandra.cqltypes.SetType.apply_parameters(cassandra.cqltypes.BytesType)))
|
||||
self.assertEqual(str(lookup_cqltype('set<boolean>')),
|
||||
str(cassandra.cqltypes.SetType.apply_parameters(cassandra.cqltypes.BooleanType)))
|
||||
self.assertEqual(str(lookup_cqltype('set<counter>')),
|
||||
str(cassandra.cqltypes.SetType.apply_parameters(cassandra.cqltypes.CounterColumnType)))
|
||||
self.assertEqual(str(lookup_cqltype('set<decimal>')),
|
||||
str(cassandra.cqltypes.SetType.apply_parameters(cassandra.cqltypes.DecimalType)))
|
||||
self.assertEqual(str(lookup_cqltype('set<double>')),
|
||||
str(cassandra.cqltypes.SetType.apply_parameters(cassandra.cqltypes.DoubleType)))
|
||||
self.assertEqual(str(lookup_cqltype('set<float>')),
|
||||
str(cassandra.cqltypes.SetType.apply_parameters(cassandra.cqltypes.FloatType)))
|
||||
self.assertEqual(str(lookup_cqltype('set<inet>')),
|
||||
str(cassandra.cqltypes.SetType.apply_parameters(cassandra.cqltypes.InetAddressType)))
|
||||
self.assertEqual(str(lookup_cqltype('set<int>')),
|
||||
str(cassandra.cqltypes.SetType.apply_parameters(cassandra.cqltypes.Int32Type)))
|
||||
self.assertEqual(str(lookup_cqltype('set<text>')),
|
||||
str(cassandra.cqltypes.SetType.apply_parameters(cassandra.cqltypes.UTF8Type)))
|
||||
self.assertEqual(str(lookup_cqltype('set<timestamp>')),
|
||||
str(cassandra.cqltypes.SetType.apply_parameters(cassandra.cqltypes.DateType)))
|
||||
self.assertEqual(str(lookup_cqltype('set<timeuuid>')),
|
||||
str(cassandra.cqltypes.SetType.apply_parameters(cassandra.cqltypes.TimeUUIDType)))
|
||||
self.assertEqual(str(lookup_cqltype('set<uuid>')),
|
||||
str(cassandra.cqltypes.SetType.apply_parameters(cassandra.cqltypes.UUIDType)))
|
||||
self.assertEqual(str(lookup_cqltype('set<varchar>')),
|
||||
str(cassandra.cqltypes.SetType.apply_parameters(cassandra.cqltypes.UTF8Type)))
|
||||
self.assertEqual(str(lookup_cqltype('set<varint>')),
|
||||
str(cassandra.cqltypes.SetType.apply_parameters(cassandra.cqltypes.IntegerType)))
|
||||
|
||||
|
||||
self.assertEqual(str(lookup_cqltype('map<ascii, ascii>')),
|
||||
str(cassandra.cqltypes.MapType.apply_parameters(cassandra.cqltypes.AsciiType,
|
||||
cassandra.cqltypes.AsciiType)))
|
||||
self.assertEqual(str(lookup_cqltype('map<bigint, bigint>')),
|
||||
str(cassandra.cqltypes.MapType.apply_parameters(cassandra.cqltypes.LongType,
|
||||
cassandra.cqltypes.LongType)))
|
||||
self.assertEqual(str(lookup_cqltype('map<blob, blob>')),
|
||||
str(cassandra.cqltypes.MapType.apply_parameters(cassandra.cqltypes.BytesType,
|
||||
cassandra.cqltypes.BytesType)))
|
||||
self.assertEqual(str(lookup_cqltype('map<boolean, boolean>')),
|
||||
str(cassandra.cqltypes.MapType.apply_parameters(cassandra.cqltypes.BooleanType,
|
||||
cassandra.cqltypes.BooleanType)))
|
||||
self.assertEqual(str(lookup_cqltype('map<counter, counter>')),
|
||||
str(cassandra.cqltypes.MapType.apply_parameters(cassandra.cqltypes.CounterColumnType,
|
||||
cassandra.cqltypes.CounterColumnType)))
|
||||
self.assertEqual(str(lookup_cqltype('map<decimal, decimal>')),
|
||||
str(cassandra.cqltypes.MapType.apply_parameters(cassandra.cqltypes.DecimalType,
|
||||
cassandra.cqltypes.DecimalType)))
|
||||
self.assertEqual(str(lookup_cqltype('map<double, double>')),
|
||||
str(cassandra.cqltypes.MapType.apply_parameters(cassandra.cqltypes.DoubleType,
|
||||
cassandra.cqltypes.DoubleType)))
|
||||
self.assertEqual(str(lookup_cqltype('map<float, float>')),
|
||||
str(cassandra.cqltypes.MapType.apply_parameters(cassandra.cqltypes.FloatType,
|
||||
cassandra.cqltypes.FloatType)))
|
||||
self.assertEqual(str(lookup_cqltype('map<inet, inet>')),
|
||||
str(cassandra.cqltypes.MapType.apply_parameters(cassandra.cqltypes.InetAddressType,
|
||||
cassandra.cqltypes.InetAddressType)))
|
||||
self.assertEqual(str(lookup_cqltype('map<int, int>')),
|
||||
str(cassandra.cqltypes.MapType.apply_parameters(cassandra.cqltypes.Int32Type,
|
||||
cassandra.cqltypes.Int32Type)))
|
||||
self.assertEqual(str(lookup_cqltype('map<text, text>')),
|
||||
str(cassandra.cqltypes.MapType.apply_parameters(cassandra.cqltypes.UTF8Type,
|
||||
cassandra.cqltypes.UTF8Type)))
|
||||
self.assertEqual(str(lookup_cqltype('map<timestamp, timestamp>')),
|
||||
str(cassandra.cqltypes.MapType.apply_parameters(cassandra.cqltypes.DateType,
|
||||
cassandra.cqltypes.DateType)))
|
||||
self.assertEqual(str(lookup_cqltype('map<timeuuid, timeuuid>')),
|
||||
str(cassandra.cqltypes.MapType.apply_parameters(cassandra.cqltypes.TimeUUIDType,
|
||||
cassandra.cqltypes.TimeUUIDType)))
|
||||
self.assertEqual(str(lookup_cqltype('map<uuid, uuid>')),
|
||||
str(cassandra.cqltypes.MapType.apply_parameters(cassandra.cqltypes.UUIDType,
|
||||
cassandra.cqltypes.UUIDType)))
|
||||
self.assertEqual(str(lookup_cqltype('map<varchar, varchar>')),
|
||||
str(cassandra.cqltypes.MapType.apply_parameters(cassandra.cqltypes.UTF8Type,
|
||||
cassandra.cqltypes.UTF8Type)))
|
||||
self.assertEqual(str(lookup_cqltype('map<varint, varint>')),
|
||||
str(cassandra.cqltypes.MapType.apply_parameters(cassandra.cqltypes.IntegerType,
|
||||
cassandra.cqltypes.IntegerType)))
|
||||
|
||||
# DISCUSS: Figure out if other tests are needed, and how to test them
|
||||
# self.assertEqual(str(lookup_cqltype(AsciiType(CassandraType('asdf')))), str(AsciiType(CassandraType('asdf'))))
|
||||
# self.assertEqual(str(lookup_cqltype(LongType(CassandraType(1234)))), str(LongType(CassandraType(1234))))
|
||||
# self.assertEqual(str(lookup_cqltype(BytesType(CassandraType(True)))), str(BytesType(CassandraType(True))))
|
||||
self.assertEqual(str(lookup_cqltype(BooleanType(CassandraType(True)))), str(BooleanType(CassandraType(True))))
|
||||
# self.assertEqual(str(lookup_cqltype(CounterColumnType(CassandraType(True)))), str(CounterColumnType(CassandraType(True))))
|
||||
# self.assertEqual(str(lookup_cqltype(DecimalType(CassandraType(1234.1234)))), str(DecimalType(CassandraType(1234.1234))))
|
||||
# self.assertEqual(str(lookup_cqltype(DoubleType(CassandraType(1234.1234)))), str(DoubleType(CassandraType(1234.1234))))
|
||||
# self.assertEqual(str(lookup_cqltype(FloatType(CassandraType(1234.1234)))), str(FloatType(CassandraType(1234.1234))))
|
||||
# self.assertEqual(str(lookup_cqltype(InetAddressType(CassandraType(True)))), str(InetAddressType(CassandraType(True))))
|
||||
# self.assertEqual(str(lookup_cqltype(Int32Type(CassandraType(1234)))), str(Int32Type(CassandraType(1234))))
|
||||
# self.assertEqual(str(lookup_cqltype(UTF8Type(CassandraType('asdf')))), str(UTF8Type(CassandraType('asdf'))))
|
||||
# self.assertEqual(str(lookup_cqltype(DateType(CassandraType(True)))), str(DateType(CassandraType(True))))
|
||||
# self.assertEqual(str(lookup_cqltype(TimeUUIDType(CassandraType(True)))), str(TimeUUIDType(CassandraType(True))))
|
||||
# self.assertEqual(str(lookup_cqltype(UUIDType(CassandraType(True)))), str(UUIDType(CassandraType(True))))
|
||||
# self.assertEqual(str(lookup_cqltype(IntegerType(CassandraType(1234)))), str(IntegerType(CassandraType(1234))))
|
||||
|
||||
# DISCUSS: Check if typo in code, or misunderstanding
|
||||
# self.assertEqual(lookup_cqltype("'ascii'"), cassandra.cqltypes.AsciiType)
|
||||
# self.assertEqual(lookup_cqltype("'bigint'"), cassandra.cqltypes.LongType)
|
||||
# self.assertEqual(lookup_cqltype("'blob'"), cassandra.cqltypes.BytesType)
|
||||
# self.assertEqual(lookup_cqltype("'boolean'"), cassandra.cqltypes.BooleanType)
|
||||
# self.assertEqual(lookup_cqltype("'counter'"), cassandra.cqltypes.CounterColumnType)
|
||||
# self.assertEqual(lookup_cqltype("'decimal'"), cassandra.cqltypes.DecimalType)
|
||||
# self.assertEqual(lookup_cqltype("'float'"), cassandra.cqltypes.FloatType)
|
||||
# self.assertEqual(lookup_cqltype("'inet'"), cassandra.cqltypes.InetAddressType)
|
||||
# self.assertEqual(lookup_cqltype("'int'"), cassandra.cqltypes.Int32Type)
|
||||
# self.assertEqual(lookup_cqltype("'text'"), cassandra.cqltypes.UTF8Type)
|
||||
# self.assertEqual(lookup_cqltype("'timestamp'"), cassandra.cqltypes.DateType)
|
||||
# self.assertEqual(lookup_cqltype("'timeuuid'"), cassandra.cqltypes.TimeUUIDType)
|
||||
# self.assertEqual(lookup_cqltype("'uuid'"), cassandra.cqltypes.UUIDType)
|
||||
# self.assertEqual(lookup_cqltype("'varchar'"), cassandra.cqltypes.UTF8Type)
|
||||
# self.assertEqual(lookup_cqltype("'varint'"), cassandra.cqltypes.IntegerType)
|
||||
|
||||
def test_cassandratype(self):
|
||||
"""
|
||||
Smoke test cass_parameterized_type_with
|
||||
|
||||
Reference in New Issue
Block a user