diff --git a/tests/integration/datatype_utils.py b/tests/integration/datatype_utils.py index 66e6f9de..ce55efcd 100644 --- a/tests/integration/datatype_utils.py +++ b/tests/integration/datatype_utils.py @@ -39,10 +39,15 @@ DATA_TYPE_NON_PRIMITIVE_NAMES = [ 'list', 'set', 'map', + 'tuple' ] def get_sample_data(): + """ + Create a standard set of sample inputs for testing. + """ + sample_data = {} for datatype in DATA_TYPE_PRIMITIVES: @@ -103,4 +108,8 @@ def get_sample_data(): SAMPLE_DATA = get_sample_data() def get_sample(datatype): + """ + Helper method to access created sample data + """ + return SAMPLE_DATA[datatype] diff --git a/tests/integration/standard/test_types.py b/tests/integration/standard/test_types.py index 14c00a46..78d2c1b5 100644 --- a/tests/integration/standard/test_types.py +++ b/tests/integration/standard/test_types.py @@ -405,11 +405,17 @@ class TypeTests(unittest.TestCase): self.assertEqual(dt.utctimetuple(), result.utctimetuple()) def test_tuple_type(self): + """ + Basic test of tuple functionality + """ + if self._cass_version < (2, 1, 0): raise unittest.SkipTest("The tuple type was introduced in Cassandra 2.1") c = Cluster(protocol_version=PROTOCOL_VERSION) s = c.connect() + + # use this encoder in order to insert tuples s.encoders[tuple] = cql_encode_tuple s.execute("""CREATE KEYSPACE test_tuple_type @@ -429,6 +435,7 @@ class TypeTests(unittest.TestCase): result = s.execute("SELECT b FROM mytable WHERE a=1")[0] self.assertEqual(partial_result, result.b) + # test single value tuples subpartial = ('zoo',) subpartial_result = subpartial + (None, None) s.execute("INSERT INTO mytable (a, b) VALUES (2, %s)", parameters=(subpartial,)) @@ -447,11 +454,19 @@ class TypeTests(unittest.TestCase): self.assertEqual(subpartial_result, s.execute(prepared, (5,))[0].b) def test_tuple_type_varying_lengths(self): + """ + Test tuple types of lengths of 1, 2, 3, and 384 to ensure edge cases work + as expected. + """ + if self._cass_version < (2, 1, 0): raise unittest.SkipTest("The tuple type was introduced in Cassandra 2.1") c = Cluster(protocol_version=PROTOCOL_VERSION) s = c.connect() + + # set the row_factory to dict_factory for programmatic access + # set the encoder for tuples for the ability to write tuples s.row_factory = dict_factory s.encoders[tuple] = cql_encode_tuple @@ -459,12 +474,15 @@ class TypeTests(unittest.TestCase): WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'}""") s.set_keyspace("test_tuple_type_varying_lengths") + # programmatically create the table with tuples of said sizes lengths = (1, 2, 3, 384) value_schema = [] for i in lengths: value_schema += [' v_%s tuple<%s>' % (i, ', '.join(['int'] * i))] s.execute("CREATE TABLE mytable (k int PRIMARY KEY, %s)" % (', '.join(value_schema),)) + # insert tuples into same key using different columns + # and verify the results for i in lengths: created_tuple = tuple(range(0, i)) @@ -474,6 +492,10 @@ class TypeTests(unittest.TestCase): self.assertEqual(tuple(created_tuple), result['v_%s' % i]) def test_tuple_subtypes(self): + """ + Ensure tuple subtypes are appropriately handled. + """ + if self._cass_version < (2, 1, 0): raise unittest.SkipTest("The tuple type was introduced in Cassandra 2.1") @@ -490,6 +512,8 @@ class TypeTests(unittest.TestCase): "v tuple<%s>)" % ','.join(DATA_TYPE_PRIMITIVES)) for i in range(len(DATA_TYPE_PRIMITIVES)): + # create tuples to be written and ensure they match with the expected response + # responses have trailing None values for every element that has not been written created_tuple = [get_sample(DATA_TYPE_PRIMITIVES[j]) for j in range(i + 1)] response_tuple = tuple(created_tuple + [None for j in range(len(DATA_TYPE_PRIMITIVES) - i - 1)]) written_tuple = tuple(created_tuple)