From ed842e9440826d38a51e25c4fe66d3a699fee69d Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Fri, 27 Dec 2013 17:51:53 -0600 Subject: [PATCH] Consistency test cleanup --- tests/integration/long/test_consistency.py | 730 +++++---------------- tests/integration/long/utils.py | 66 +- 2 files changed, 179 insertions(+), 617 deletions(-) diff --git a/tests/integration/long/test_consistency.py b/tests/integration/long/test_consistency.py index 0f7306d6..bf8646a8 100644 --- a/tests/integration/long/test_consistency.py +++ b/tests/integration/long/test_consistency.py @@ -1,10 +1,13 @@ +import struct import traceback -import cassandra +import cassandra from cassandra import ConsistencyLevel from cassandra.cluster import Cluster from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy, \ DowngradingConsistencyRetryPolicy +from cassandra.query import SimpleStatement + from tests.integration.long.utils import force_stop, create_schema, \ wait_for_down, wait_for_up, start, CoordinatorStats @@ -13,295 +16,133 @@ try: except ImportError: import unittest # noqa +ALL_CONSISTENCY_LEVELS = set([ + ConsistencyLevel.ANY, ConsistencyLevel.ONE, ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, ConsistencyLevel.THREE, + ConsistencyLevel.ALL, ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM]) + +MULTI_DC_CONSISTENCY_LEVELS = set([ + ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.EACH_QUORUM]) + +SINGLE_DC_CONSISTENCY_LEVELS = ALL_CONSISTENCY_LEVELS - MULTI_DC_CONSISTENCY_LEVELS + class ConsistencyTests(unittest.TestCase): def setUp(self): - self.cs = CoordinatorStats() + self.coordinator_stats = CoordinatorStats() def _cl_failure(self, consistency_level, e): - self.fail('%s seen for CL.%s:\n\n%s' % ( - type(e), ConsistencyLevel.value_to_name[consistency_level], + self.fail('Instead of success, saw %s for CL.%s:\n\n%s' % ( + e, ConsistencyLevel.value_to_name[consistency_level], traceback.format_exc())) def _cl_expected_failure(self, cl): self.fail('Test passed at ConsistencyLevel.%s:\n\n%s' % ( ConsistencyLevel.value_to_name[cl], traceback.format_exc())) + def _insert(self, session, keyspace, count, consistency_level=ConsistencyLevel.ONE): + session.execute('USE %s' % keyspace) + for i in range(count): + ss = SimpleStatement('INSERT INTO cf(k, i) VALUES (0, 0)', + consistency_level=consistency_level) + session.execute(ss) - def test_rfone_tokenaware(self): - keyspace = 'test_rfone_tokenaware' + def _query(self, session, keyspace, count, consistency_level=ConsistencyLevel.ONE): + routing_key = struct.pack('>i', 0) + for i in range(count): + ss = SimpleStatement('SELECT * FROM cf WHERE k = 0', + consistency_level=consistency_level, + routing_key=routing_key) + self.coordinator_stats.add_coordinator(session.execute_async(ss)) + + def _assert_writes_succeed(self, session, keyspace, consistency_levels): + for cl in consistency_levels: + self.coordinator_stats.reset_counts() + try: + self._insert(session, keyspace, 1, cl) + except Exception as e: + self._cl_failure(cl, e) + + def _assert_reads_succeed(self, session, keyspace, consistency_levels, expected_reader=3): + for cl in consistency_levels: + self.coordinator_stats.reset_counts() + try: + self._query(session, keyspace, 1, cl) + for i in range(3): + if i == expected_reader: + self.coordinator_stats.assert_query_count_equals(self, i, 1) + else: + self.coordinator_stats.assert_query_count_equals(self, i, 0) + except Exception as e: + self._cl_failure(cl, e) + + def _assert_writes_fail(self, session, keyspace, consistency_levels): + for cl in consistency_levels: + self.coordinator_stats.reset_counts() + try: + self._insert(session, keyspace, 1, cl) + self._cl_expected_failure(cl) + except (cassandra.Unavailable, cassandra.WriteTimeout): + pass + + def _assert_reads_fail(self, session, keyspace, consistency_levels): + for cl in consistency_levels: + self.coordinator_stats.reset_counts() + try: + self._query(session, keyspace, 1, cl) + self._cl_expected_failure(cl) + except (cassandra.Unavailable, cassandra.ReadTimeout): + pass + + def _test_tokenaware_one_node_down(self, keyspace, rf, accepted): cluster = Cluster( load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) session = cluster.connect() wait_for_up(cluster, 1, wait=False) wait_for_up(cluster, 2) - create_schema(session, keyspace, replication_factor=1) - self.cs.init(session, keyspace, 12) - self.cs.query(session, keyspace, 12) - - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 12) - self.cs.assert_queried(self, 3, 0) + create_schema(session, keyspace, replication_factor=rf) + self._insert(session, keyspace, count=1) + self._query(session, keyspace, count=1) + self.coordinator_stats.assert_query_count_equals(self, 1, 0) + self.coordinator_stats.assert_query_count_equals(self, 2, 1) + self.coordinator_stats.assert_query_count_equals(self, 3, 0) try: - self.cs.reset_coordinators() force_stop(2) wait_for_down(cluster, 2) - accepted_list = [ConsistencyLevel.ANY] - - fail_list = [ - ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL, - ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM - ] - - # Test writes that expected to complete successfully - for cl in accepted_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - except Exception as e: - self._cl_failure(cl, e) - - # Test reads that expected to complete successfully - for cl in accepted_list: - try: - self.cs.query(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.ANY]: - self._cl_failure(cl, e) - except Exception as e: - self._cl_failure(cl, e) - - # Test writes that expected to fail - for cl in fail_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except (cassandra.Unavailable, cassandra.WriteTimeout) as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - - # Test reads that expected to fail - for cl in fail_list: - try: - self.cs.query(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) + self._assert_writes_succeed(session, keyspace, accepted) + self._assert_reads_succeed(session, keyspace, + accepted - set([ConsistencyLevel.ANY])) + self._assert_writes_fail(session, keyspace, + SINGLE_DC_CONSISTENCY_LEVELS - accepted) + self._assert_reads_fail(session, keyspace, + SINGLE_DC_CONSISTENCY_LEVELS - accepted) finally: start(2) wait_for_up(cluster, 2) + def test_rfone_tokenaware_one_node_down(self): + self._test_tokenaware_one_node_down( + keyspace='test_rfone_tokenaware', + rf=1, + accepted=set([ConsistencyLevel.ANY])) - def test_rftwo_tokenaware(self): - keyspace = 'test_rftwo_tokenaware' - cluster = Cluster( - load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) - session = cluster.connect() - wait_for_up(cluster, 1, wait=False) - wait_for_up(cluster, 2) - - create_schema(session, keyspace, replication_factor=2) - self.cs.init(session, keyspace, 12) - self.cs.query(session, keyspace, 12) - - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 12) - self.cs.assert_queried(self, 3, 0) - - try: - self.cs.reset_coordinators() - force_stop(2) - wait_for_down(cluster, 2) - - accepted_list = [ - ConsistencyLevel.ANY, - ConsistencyLevel.ONE - ] - - fail_list = [ - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL, - ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM - ] - - # Test writes that expected to complete successfully - for cl in accepted_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - except Exception as e: - self._cl_failure(cl, e) - - # Test reads that expected to complete successfully - for cl in accepted_list: - try: - self.cs.reset_coordinators() - self.cs.query(session, keyspace, 12, consistency_level=cl) - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 0) - self.cs.assert_queried(self, 3, 12) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.ANY]: - self._cl_failure(cl, e) - except Exception as e: - self._cl_failure(cl, e) - - # Test writes that expected to fail - for cl in fail_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - - # Test reads that expected to fail - for cl in fail_list: - try: - self.cs.query(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - finally: - start(2) - wait_for_up(cluster, 2) - - def test_rfthree_tokenaware(self): - keyspace = 'test_rfthree_tokenaware' - cluster = Cluster( - load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) - session = cluster.connect() - - create_schema(session, keyspace, replication_factor=3) - self.cs.init(session, keyspace, 12) - self.cs.query(session, keyspace, 12) - - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 12) - self.cs.assert_queried(self, 3, 0) - - try: - self.cs.reset_coordinators() - force_stop(2) - wait_for_down(cluster, 2) - - accepted_list = [ - ConsistencyLevel.ANY, - ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM - ] - - fail_list = [ - ConsistencyLevel.THREE, - ConsistencyLevel.ALL, - ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM - ] - - # Test writes that expected to complete successfully - for cl in accepted_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - except Exception as e: - self._cl_failure(cl, e) - - # Test reads that expected to complete successfully - for cl in accepted_list: - try: - self.cs.reset_coordinators() - self.cs.query(session, keyspace, 12, consistency_level=cl) - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 0) - self.cs.assert_queried(self, 3, 12) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.ANY]: - self._cl_failure(cl, e) - except Exception as e: - self._cl_failure(cl, e) - - # Test writes that expected to fail - for cl in fail_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - - # Test reads that expected to fail - for cl in fail_list: - try: - self.cs.query(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - finally: - start(2) - wait_for_up(cluster, 2) + def test_rftwo_tokenaware_one_node_down(self): + self._test_tokenaware_one_node_down( + keyspace='test_rftwo_tokenaware', + rf=2, + accepted=set([ConsistencyLevel.ANY, ConsistencyLevel.ONE])) + def test_rfthree_tokenaware_one_node_down(self): + self._test_tokenaware_one_node_down( + keyspace='test_rfthree_tokenaware', + rf=3, + accepted=set([ConsistencyLevel.ANY, ConsistencyLevel.ONE, + ConsistencyLevel.TWO, ConsistencyLevel.QUORUM])) def test_rfthree_tokenaware_none_down(self): keyspace = 'test_rfthree_tokenaware_none_down' @@ -312,256 +153,58 @@ class ConsistencyTests(unittest.TestCase): wait_for_up(cluster, 2) create_schema(session, keyspace, replication_factor=3) - self.cs.init(session, keyspace, 12) - self.cs.query(session, keyspace, 12) + self._insert(session, keyspace, count=1) + self._query(session, keyspace, count=1) + self.coordinator_stats.assert_query_count_equals(self, 1, 0) + self.coordinator_stats.assert_query_count_equals(self, 2, 1) + self.coordinator_stats.assert_query_count_equals(self, 3, 0) - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 12) - self.cs.assert_queried(self, 3, 0) + self.coordinator_stats.reset_counts() - self.cs.reset_coordinators() + self._assert_writes_succeed(session, keyspace, SINGLE_DC_CONSISTENCY_LEVELS) + self._assert_reads_succeed(session, keyspace, + SINGLE_DC_CONSISTENCY_LEVELS - set([ConsistencyLevel.ANY]), + expected_reader=2) - accepted_list = [ - ConsistencyLevel.ANY, - ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL, - ] + def _test_downgrading_cl(self, keyspace, rf, accepted): + cluster = Cluster( + load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), + default_retry_policy=DowngradingConsistencyRetryPolicy()) + session = cluster.connect() - fail_list = [ - ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM - ] + create_schema(session, keyspace, replication_factor=rf) + self._insert(session, keyspace, 1) + self._query(session, keyspace, 1) + self.coordinator_stats.assert_query_count_equals(self, 1, 0) + self.coordinator_stats.assert_query_count_equals(self, 2, 1) + self.coordinator_stats.assert_query_count_equals(self, 3, 0) - # Test writes that expected to complete successfully - for cl in accepted_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - except Exception as e: - self._cl_failure(cl, e) - - # Test reads that expected to complete successfully - for cl in accepted_list: - try: - self.cs.reset_coordinators() - self.cs.query(session, keyspace, 12, consistency_level=cl) - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 12) - self.cs.assert_queried(self, 3, 0) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.ANY]: - self._cl_failure(cl, e) - except Exception as e: - self._cl_failure(cl, e) - - # Test writes that expected to fail - for cl in fail_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - - # Test reads that expected to fail - for cl in fail_list: - try: - self.cs.query(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) + try: + force_stop(2) + wait_for_down(cluster, 2) + self._assert_writes_succeed(session, keyspace, accepted) + self._assert_reads_succeed(session, keyspace, + accepted - set([ConsistencyLevel.ANY])) + self._assert_writes_fail(session, keyspace, + SINGLE_DC_CONSISTENCY_LEVELS - accepted) + self._assert_reads_fail(session, keyspace, + SINGLE_DC_CONSISTENCY_LEVELS - accepted) + finally: + start(2) + wait_for_up(cluster, 2) def test_rfone_downgradingcl(self): - keyspace = 'test_rfone_downgradingcl' - cluster = Cluster( - load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), - default_retry_policy=DowngradingConsistencyRetryPolicy()) - session = cluster.connect() - - create_schema(session, keyspace, replication_factor=1) - self.cs.init(session, keyspace, 12) - self.cs.query(session, keyspace, 12) - - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 12) - self.cs.assert_queried(self, 3, 0) - - try: - self.cs.reset_coordinators() - force_stop(2) - wait_for_down(cluster, 2) - - accepted_list = [ - ConsistencyLevel.ANY - ] - - fail_list = [ - ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL, - ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM - ] - - # Test writes that expected to complete successfully - for cl in accepted_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - except Exception as e: - self._cl_failure(cl, e) - - # Test reads that expected to complete successfully - for cl in accepted_list: - try: - self.cs.reset_coordinators() - self.cs.query(session, keyspace, 12, consistency_level=cl) - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 0) - self.cs.assert_queried(self, 3, 12) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.ANY]: - self._cl_failure(cl, e) - except Exception as e: - self._cl_failure(cl, e) - - # Test writes that expected to fail - for cl in fail_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - - # Test reads that expected to fail - for cl in fail_list: - try: - self.cs.query(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - finally: - start(2) - wait_for_up(cluster, 2) - + self._test_downgrading_cl( + keyspace='test_rfone_downgradingcl', + rf=1, + accepted=set([ConsistencyLevel.ANY])) def test_rftwo_downgradingcl(self): - keyspace = 'test_rftwo_downgradingcl' - cluster = Cluster( - load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), - default_retry_policy=DowngradingConsistencyRetryPolicy()) - session = cluster.connect() - - create_schema(session, keyspace, replication_factor=2) - self.cs.init(session, keyspace, 12) - self.cs.query(session, keyspace, 12) - - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 12) - self.cs.assert_queried(self, 3, 0) - - try: - self.cs.reset_coordinators() - force_stop(2) - wait_for_down(cluster, 2) - - accepted_list = [ - ConsistencyLevel.ANY, - ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL - ] - - fail_list = [ - ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM - ] - - # Test writes that expected to complete successfully - for cl in accepted_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - except Exception as e: - self._cl_failure(cl, e) - - # Test reads that expected to complete successfully - for cl in accepted_list: - try: - self.cs.reset_coordinators() - self.cs.query(session, keyspace, 12, consistency_level=cl) - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 0) - self.cs.assert_queried(self, 3, 12) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.ANY]: - self._cl_failure(cl, e) - except Exception as e: - self._cl_failure(cl, e) - - # Test writes that expected to fail - for cl in fail_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - - # Test reads that expected to fail - for cl in fail_list: - try: - self.cs.query(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - finally: - start(2) - wait_for_up(cluster, 2) - + self._test_downgrading_cl( + keyspace='test_rftwo_downgradingcl', + rf=2, + accepted=SINGLE_DC_CONSISTENCY_LEVELS) def test_rfthree_roundrobin_downgradingcl(self): keyspace = 'test_rfthree_roundrobin_downgradingcl' @@ -581,82 +224,37 @@ class ConsistencyTests(unittest.TestCase): session = cluster.connect() create_schema(session, keyspace, replication_factor=2) - self.cs.init(session, keyspace, 12) - self.cs.query(session, keyspace, 12) + self._insert(session, keyspace, count=12) + self._query(session, keyspace, count=12) if roundrobin: - self.cs.assert_queried(self, 1, 4) - self.cs.assert_queried(self, 2, 4) - self.cs.assert_queried(self, 3, 4) + self.coordinator_stats.assert_query_count_equals(self, 1, 4) + self.coordinator_stats.assert_query_count_equals(self, 2, 4) + self.coordinator_stats.assert_query_count_equals(self, 3, 4) else: - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 12) - self.cs.assert_queried(self, 3, 0) + self.coordinator_stats.assert_query_count_equals(self, 1, 0) + self.coordinator_stats.assert_query_count_equals(self, 2, 12) + self.coordinator_stats.assert_query_count_equals(self, 3, 0) try: - self.cs.reset_coordinators() + self.coordinator_stats.reset_counts() force_stop(2) wait_for_down(cluster, 2) - accepted_list = [ - ConsistencyLevel.ANY, - ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL - ] - - fail_list = [ - ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM - ] - - # Test writes that expected to complete successfully - for cl in accepted_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - except Exception as e: - self._cl_failure(cl, e) + self._assert_writes_succeed(session, keyspace, SINGLE_DC_CONSISTENCY_LEVELS) # Test reads that expected to complete successfully - for cl in accepted_list: - try: - self.cs.reset_coordinators() - self.cs.query(session, keyspace, 12, consistency_level=cl) - if roundrobin: - self.cs.assert_queried(self, 1, 6) - self.cs.assert_queried(self, 2, 0) - self.cs.assert_queried(self, 3, 6) - else: - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 0) - self.cs.assert_queried(self, 3, 12) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.ANY]: - self._cl_failure(cl, e) - except Exception as e: - self._cl_failure(cl, e) - - # Test writes that expected to fail - for cl in fail_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - - # Test reads that expected to fail - for cl in fail_list: - try: - self.cs.query(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) + for cl in SINGLE_DC_CONSISTENCY_LEVELS - set([ConsistencyLevel.ANY]): + self.coordinator_stats.reset_counts() + self._query(session, keyspace, 12, consistency_level=cl) + if roundrobin: + self.coordinator_stats.assert_query_count_equals(self, 1, 6) + self.coordinator_stats.assert_query_count_equals(self, 2, 0) + self.coordinator_stats.assert_query_count_equals(self, 3, 6) + else: + self.coordinator_stats.assert_query_count_equals(self, 1, 0) + self.coordinator_stats.assert_query_count_equals(self, 2, 0) + self.coordinator_stats.assert_query_count_equals(self, 3, 12) finally: start(2) wait_for_up(cluster, 2) diff --git a/tests/integration/long/utils.py b/tests/integration/long/utils.py index f132531a..e8e4ca44 100644 --- a/tests/integration/long/utils.py +++ b/tests/integration/long/utils.py @@ -1,11 +1,8 @@ import logging -import struct import time from collections import defaultdict -from cassandra.query import SimpleStatement -from cassandra import ConsistencyLevel from tests.integration import get_node @@ -13,59 +10,26 @@ log = logging.getLogger(__name__) class CoordinatorStats(): + def __init__(self): - self.coordinators = defaultdict(int) + self.coordinator_counts = defaultdict(int) def add_coordinator(self, future): coordinator = future._current_host.address - self.coordinators[coordinator] += 1 + self.coordinator_counts[coordinator] += 1 if future._errors: - log.error('future._errors: %s' % future._errors) + log.error('future._errors: %s', future._errors) future.result() + def reset_counts(self): + self.coordinator_counts = defaultdict(int) - def reset_coordinators(self): - self.coordinators = defaultdict(int) - - - def get_queried(self, node): - ip = '127.0.0.%s' % node - if not ip in self.coordinators: - return 0 - return self.coordinators[ip] - - - def assert_queried(self, testcase, node, n): - ip = '127.0.0.%s' % node - if ip in self.coordinators: - if self.coordinators[ip] == n: - return - testcase.fail('IP: %s. Expected: %s. Received: %s. Full detail: %s.' % ( - ip, n, self.coordinators[ip], self.coordinators)) - else: - if n == 0: - return - testcase.fail('IP: %s. Expected: %s. Received: %s. Full detail: %s.' % ( - ip, n, 0, self.coordinators)) - - - def init(self, session, keyspace, n, consistency_level=ConsistencyLevel.ONE): - self.reset_coordinators() - session.execute('USE %s' % keyspace) - for i in range(n): - ss = SimpleStatement('INSERT INTO %s(k, i) VALUES (0, 0)' % 'cf', - consistency_level=consistency_level) - session.execute(ss) - - - def query(self, session, keyspace, count, consistency_level=ConsistencyLevel.ONE): - routing_key = struct.pack('>i', 0) - for i in range(count): - ss = SimpleStatement('SELECT * FROM %s WHERE k = 0' % 'cf', - consistency_level=consistency_level, - routing_key=routing_key) - self.add_coordinator(session.execute_async(ss)) + def assert_query_count_equals(self, testcase, node, expected): + ip = '127.0.0.%d' % node + if self.coordinator_counts[ip] != expected: + testcase.fail('Expected %d queries to %s, but got %d. Query counts: %s' % ( + expected, ip, self.coordinator_counts[ip], dict(self.coordinator_counts))) def create_schema(session, keyspace, simple_strategy=True, @@ -75,22 +39,22 @@ def create_schema(session, keyspace, simple_strategy=True, 'SELECT keyspace_name FROM system.schema_keyspaces') existing_keyspaces = [row[0] for row in results] if keyspace in existing_keyspaces: - session.execute('DROP KEYSPACE %s' % keyspace) + session.execute('DROP KEYSPACE %s' % keyspace, timeout=10) if simple_strategy: ddl = "CREATE KEYSPACE %s WITH replication" \ " = {'class': 'SimpleStrategy', 'replication_factor': '%s'}" - session.execute(ddl % (keyspace, replication_factor)) + session.execute(ddl % (keyspace, replication_factor), timeout=10) else: if not replication_strategy: raise Exception('replication_strategy is not set') ddl = "CREATE KEYSPACE %s" \ " WITH replication = { 'class' : 'NetworkTopologyStrategy', %s }" - session.execute(ddl % (keyspace, str(replication_strategy)[1:-1])) + session.execute(ddl % (keyspace, str(replication_strategy)[1:-1]), timeout=10) ddl = 'CREATE TABLE %s.cf (k int PRIMARY KEY, i int)' - session.execute(ddl % keyspace) + session.execute(ddl % keyspace, timeout=10) session.execute('USE %s' % keyspace)