Test UDTs with different namedtuple variables

This commit is contained in:
Joaquin Casares
2014-07-25 16:00:05 -05:00
parent 0fc4dd3c7d
commit ef81fa602c

View File

@@ -292,7 +292,7 @@ class TypeTests(unittest.TestCase):
if self._cass_version < (2, 1, 0): if self._cass_version < (2, 1, 0):
raise unittest.SkipTest("The tuple type was introduced in Cassandra 2.1") raise unittest.SkipTest("The tuple type was introduced in Cassandra 2.1")
MAX_NESTING_DEPTH = 4 # TODO: Move to 128, or similar MAX_NESTING_DEPTH = 128
c = Cluster(protocol_version=PROTOCOL_VERSION) c = Cluster(protocol_version=PROTOCOL_VERSION)
s = c.connect() s = c.connect()
@@ -300,9 +300,9 @@ class TypeTests(unittest.TestCase):
# set the row_factory to dict_factory for programmatically accessing values # set the row_factory to dict_factory for programmatically accessing values
s.row_factory = dict_factory s.row_factory = dict_factory
s.execute("""CREATE KEYSPACE test_nested_unregistered_udts s.execute("""CREATE KEYSPACE test_nested_registered_udts
WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'}""") WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'}""")
s.set_keyspace("test_nested_unregistered_udts") s.set_keyspace("test_nested_registered_udts")
# create the seed udt # create the seed udt
s.execute("CREATE TYPE depth_0 (age int, name text)") s.execute("CREATE TYPE depth_0 (age int, name text)")
@@ -327,13 +327,13 @@ class TypeTests(unittest.TestCase):
# create and register the seed udt type # create and register the seed udt type
udt = namedtuple('depth_0', ('age', 'name')) udt = namedtuple('depth_0', ('age', 'name'))
udts.append(udt) udts.append(udt)
c.register_user_type("test_nested_unregistered_udts", "depth_0", udts[0]) c.register_user_type("test_nested_registered_udts", "depth_0", udts[0])
# create and register the nested udt types # create and register the nested udt types
for i in range(MAX_NESTING_DEPTH): for i in range(MAX_NESTING_DEPTH):
udt = namedtuple('depth_{}'.format(i + 1), ('value')) udt = namedtuple('depth_{}'.format(i + 1), ('value'))
udts.append(udt) udts.append(udt)
c.register_user_type("test_nested_unregistered_udts", "depth_{}".format(i + 1), udts[i + 1]) c.register_user_type("test_nested_registered_udts", "depth_{}".format(i + 1), udts[i + 1])
# verify inserts and reads # verify inserts and reads
for i in (0, 1, 2, 3, MAX_NESTING_DEPTH): for i in (0, 1, 2, 3, MAX_NESTING_DEPTH):
@@ -359,10 +359,69 @@ class TypeTests(unittest.TestCase):
""" """
Test for ensuring nested udts are handled correctly when the Test for ensuring nested udts are handled correctly when the
created namedtuples are use names that are different the cql type. created namedtuples are use names that are different the cql type.
Future improvement: optimize these three related tests using a single
helper method to cut down on code reuse.
""" """
# close copy to test_nested_registered_udts if self._cass_version < (2, 1, 0):
pass raise unittest.SkipTest("The tuple type was introduced in Cassandra 2.1")
MAX_NESTING_DEPTH = 128
c = Cluster(protocol_version=PROTOCOL_VERSION)
s = c.connect()
# set the row_factory to dict_factory for programmatically accessing values
s.row_factory = dict_factory
s.execute("""CREATE KEYSPACE different_namedtuples
WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'}""")
s.set_keyspace("different_namedtuples")
# create the seed udt
s.execute("CREATE TYPE depth_0 (age int, name text)")
# create the nested udts
for i in range(MAX_NESTING_DEPTH):
s.execute("CREATE TYPE depth_{} (value depth_{})".format(i + 1, i))
# create a table with multiple sizes of nested udts
# no need for all nested types, only a spot checked few and the largest one
s.execute("CREATE TABLE mytable ("
"k int PRIMARY KEY, "
"v_0 depth_0, "
"v_1 depth_1, "
"v_2 depth_2, "
"v_3 depth_3, "
"v_{0} depth_{0})".format(MAX_NESTING_DEPTH))
# create the udt container
udts = []
# create and register the seed udt type
udt = namedtuple('level_0', ('age', 'name'))
udts.append(udt)
c.register_user_type("different_namedtuples", "depth_0", udts[0])
# create and register the nested udt types
for i in range(MAX_NESTING_DEPTH):
udt = namedtuple('level_{}'.format(i + 1), ('value'))
udts.append(udt)
c.register_user_type("different_namedtuples", "depth_{}".format(i + 1), udts[i + 1])
# verify inserts and reads
for i in (0, 1, 2, 3, MAX_NESTING_DEPTH):
# create udt
udt = self.nested_udt_helper(udts, i)
print udt
# write udt
s.execute("INSERT INTO mytable (k, v_%s) VALUES (0, %s)", (i, udt))
# verify udt was written and read correctly
result = s.execute("SELECT v_%s FROM mytable WHERE k=0", (i,))[0]
self.assertEqual(udt, result['v_%s' % i])
def test_non_existing_types(self): def test_non_existing_types(self):
c = Cluster(protocol_version=PROTOCOL_VERSION) c = Cluster(protocol_version=PROTOCOL_VERSION)