From 8c51776bdf9fc43200bc7e9f78941990699f134b Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Tue, 5 Nov 2013 17:16:53 -0600 Subject: [PATCH 01/19] Rearrange integration tests with new consistency test --- tests/integration/long/__init__.py | 1 + tests/integration/long/test_consistency.py | 386 ++++++++++++++++++ tests/integration/long/utils.py | 108 +++++ tests/integration/standard/__init__.py | 1 + .../{ => standard}/test_cluster.py | 0 .../{ => standard}/test_connection.py | 0 .../{ => standard}/test_factories.py | 0 .../{ => standard}/test_metadata.py | 0 .../{ => standard}/test_metrics.py | 0 .../test_prepared_statements.py | 0 .../integration/{ => standard}/test_query.py | 0 .../integration/{ => standard}/test_types.py | 0 12 files changed, 496 insertions(+) create mode 100644 tests/integration/long/__init__.py create mode 100644 tests/integration/long/test_consistency.py create mode 100644 tests/integration/long/utils.py create mode 100644 tests/integration/standard/__init__.py rename tests/integration/{ => standard}/test_cluster.py (100%) rename tests/integration/{ => standard}/test_connection.py (100%) rename tests/integration/{ => standard}/test_factories.py (100%) rename tests/integration/{ => standard}/test_metadata.py (100%) rename tests/integration/{ => standard}/test_metrics.py (100%) rename tests/integration/{ => standard}/test_prepared_statements.py (100%) rename tests/integration/{ => standard}/test_query.py (100%) rename tests/integration/{ => standard}/test_types.py (100%) diff --git a/tests/integration/long/__init__.py b/tests/integration/long/__init__.py new file mode 100644 index 00000000..a1379440 --- /dev/null +++ b/tests/integration/long/__init__.py @@ -0,0 +1 @@ +__author__ = 'joaquin' diff --git a/tests/integration/long/test_consistency.py b/tests/integration/long/test_consistency.py new file mode 100644 index 00000000..13ae73fc --- /dev/null +++ b/tests/integration/long/test_consistency.py @@ -0,0 +1,386 @@ +import cassandra + +from cassandra import ConsistencyLevel +from cassandra.cluster import Cluster +from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy +from tests.integration.long.utils import reset_coordinators, force_stop, \ + create_schema, init, query, assert_queried, wait_for_down, wait_for_up, \ + start + +try: + import unittest2 as unittest +except ImportError: + import unittest # noqa + + +class ConsistencyTests(unittest.TestCase): + def _cl_failure(self, consistency_level, e): + self.fail('%s seen for CL.%s with message: %s' % ( + type(e), ConsistencyLevel.value_to_name[consistency_level], + e.message)) + + def _cl_expected_failure(self, cl): + self.fail('Test passed at ConsistencyLevel.%s' % + ConsistencyLevel.value_to_name[cl]) + + + def test_rfone_tokenaware(self): + keyspace = 'test_rfone_tokenaware' + cluster = Cluster( + load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) + session = cluster.connect() + + create_schema(session, keyspace, replication_factor=1) + init(session, keyspace, 12) + + reset_coordinators() + query(session, keyspace, 12) + assert_queried(1, 0) + assert_queried(2, 12) + assert_queried(3, 0) + + try: + force_stop(2) + wait_for_down(cluster, 2) + + accepted_list = [ConsistencyLevel.ANY] + + fail_list = [ + ConsistencyLevel.ONE, + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, + ConsistencyLevel.THREE, + ConsistencyLevel.ALL, + ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM + ] + + # Test writes that expected to complete successfully + # BUG: CL.ANY should work + # for cl in 
accepted_list: + # try: + # init(session, keyspace, 12, consistency_level=cl) + # except Exception as e: + # self._cl_failure(cl, e) + + # Test reads that expected to complete successfully + for cl in accepted_list: + try: + query(session, keyspace, 12, consistency_level=cl) + self._cl_expected_failure(cl) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.ANY]: + self._cl_failure(cl, e) + except Exception as e: + self._cl_failure(cl, e) + + # Test writes that expected to fail + for cl in fail_list: + try: + init(session, keyspace, 12, consistency_level=cl) + self._cl_expected_failure(cl) + except cassandra.Unavailable as e: + if not cl in [ConsistencyLevel.ONE, + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, + ConsistencyLevel.THREE, + ConsistencyLevel.ALL]: + self._cl_failure(cl, e) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM]: + self._cl_failure(cl, e) + + # Test reads that expected to fail + for cl in fail_list: + try: + query(session, keyspace, 12, consistency_level=cl) + self._cl_expected_failure(cl) + except cassandra.Unavailable as e: + if not cl in [ConsistencyLevel.ONE, + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, + ConsistencyLevel.THREE, + ConsistencyLevel.ALL]: + self._cl_failure(cl, e) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM]: + self._cl_failure(cl, e) + finally: + start(2) + wait_for_up(cluster, 2) + + + def test_rftwo_tokenaware(self): + keyspace = 'test_rftwo_tokenaware' + cluster = Cluster( + load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) + session = cluster.connect() + + create_schema(session, keyspace, replication_factor=2) + init(session, keyspace, 12) + + reset_coordinators() + query(session, keyspace, 12) + assert_queried(1, 0) + assert_queried(2, 12) + assert_queried(3, 0) + + try: + force_stop(2) + wait_for_down(cluster, 2) + + accepted_list = [ + ConsistencyLevel.ANY, + ConsistencyLevel.ONE + ] + + fail_list = [ + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, + ConsistencyLevel.THREE, + ConsistencyLevel.ALL, + ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM + ] + + # Test writes that expected to complete successfully + for cl in accepted_list: + try: + init(session, keyspace, 12, consistency_level=cl) + except Exception as e: + self._cl_failure(cl, e) + + # Test reads that expected to complete successfully + for cl in accepted_list: + try: + reset_coordinators() + query(session, keyspace, 12, consistency_level=cl) + # Bug: I believe the Java-Driver does this differently + # and RoundRobins after the ideal token is not available. + # I like the Python Driver's approach, but we should + # probably make all policies act the same way, whichever + # way gets chosen? 
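+                # With replication_factor=2 the replicas for this row
+                # are presumably nodes 2 and 3; with node 2 stopped,
+                # the token-aware policy should route every read to the
+                # remaining replica, node 3, as asserted below.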
+ assert_queried(1, 0) + assert_queried(2, 0) + assert_queried(3, 12) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.ANY]: + self._cl_failure(cl, e) + except Exception as e: + self._cl_failure(cl, e) + + # Test writes that expected to fail + for cl in fail_list: + try: + init(session, keyspace, 12, consistency_level=cl) + self._cl_expected_failure(cl) + except cassandra.Unavailable as e: + if not cl in [ConsistencyLevel.ONE, + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, + ConsistencyLevel.THREE, + ConsistencyLevel.ALL]: + self._cl_failure(cl, e) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM]: + self._cl_failure(cl, e) + + # Test reads that expected to fail + for cl in fail_list: + try: + query(session, keyspace, 12, consistency_level=cl) + self._cl_expected_failure(cl) + except cassandra.Unavailable as e: + if not cl in [ConsistencyLevel.ONE, + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, + ConsistencyLevel.THREE, + ConsistencyLevel.ALL]: + self._cl_failure(cl, e) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM]: + self._cl_failure(cl, e) + finally: + start(2) + wait_for_up(cluster, 2) + + def test_rfthree_tokenaware(self): + keyspace = 'test_rfthree_tokenaware' + cluster = Cluster( + load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) + session = cluster.connect() + + create_schema(session, keyspace, replication_factor=3) + init(session, keyspace, 12) + + reset_coordinators() + query(session, keyspace, 12) + assert_queried(1, 0) + assert_queried(2, 12) + assert_queried(3, 0) + + try: + force_stop(2) + wait_for_down(cluster, 2) + + accepted_list = [ + ConsistencyLevel.ANY, + ConsistencyLevel.ONE, + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM + ] + + fail_list = [ + ConsistencyLevel.THREE, + ConsistencyLevel.ALL, + ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM + ] + + # Test writes that expected to complete successfully + for cl in accepted_list: + try: + init(session, keyspace, 12, consistency_level=cl) + except Exception as e: + self._cl_failure(cl, e) + + # Test reads that expected to complete successfully + for cl in accepted_list: + try: + reset_coordinators() + query(session, keyspace, 12, consistency_level=cl) + # Bug: I believe the Java-Driver does this differently + assert_queried(1, 12) + assert_queried(2, 0) + assert_queried(3, 0) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.ANY]: + self._cl_failure(cl, e) + except Exception as e: + self._cl_failure(cl, e) + + # Test writes that expected to fail + for cl in fail_list: + try: + init(session, keyspace, 12, consistency_level=cl) + self._cl_expected_failure(cl) + except cassandra.Unavailable as e: + if not cl in [ConsistencyLevel.ONE, + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, + ConsistencyLevel.THREE, + ConsistencyLevel.ALL]: + self._cl_failure(cl, e) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM]: + self._cl_failure(cl, e) + + # Test reads that expected to fail + for cl in fail_list: + try: + query(session, keyspace, 12, consistency_level=cl) + self._cl_expected_failure(cl) + except cassandra.Unavailable as e: + if not cl in [ConsistencyLevel.ONE, + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, + ConsistencyLevel.THREE, + ConsistencyLevel.ALL]: + self._cl_failure(cl, e) + except cassandra.InvalidRequest as e: + if not cl 
in [ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM]: + self._cl_failure(cl, e) + finally: + start(2) + wait_for_up(cluster, 2) + + + def test_rfthree_tokenaware_none_down(self): + keyspace = 'test_rfthree_tokenaware_none_down' + cluster = Cluster( + load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) + session = cluster.connect() + + create_schema(session, keyspace, replication_factor=3) + init(session, keyspace, 12) + + reset_coordinators() + query(session, keyspace, 12) + assert_queried(1, 0) + assert_queried(2, 12) + assert_queried(3, 0) + + accepted_list = [ + ConsistencyLevel.ANY, + ConsistencyLevel.ONE, + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, + ConsistencyLevel.THREE, + ConsistencyLevel.ALL, + ] + + fail_list = [ + ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM + ] + + # Test writes that expected to complete successfully + for cl in accepted_list: + try: + init(session, keyspace, 12, consistency_level=cl) + except Exception as e: + self._cl_failure(cl, e) + + # Test reads that expected to complete successfully + for cl in accepted_list: + try: + reset_coordinators() + query(session, keyspace, 12, consistency_level=cl) + assert_queried(1, 0) + assert_queried(2, 12) + assert_queried(3, 0) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.ANY]: + self._cl_failure(cl, e) + except Exception as e: + self._cl_failure(cl, e) + + # Test writes that expected to fail + for cl in fail_list: + try: + init(session, keyspace, 12, consistency_level=cl) + self._cl_expected_failure(cl) + except cassandra.Unavailable as e: + if not cl in [ConsistencyLevel.ONE, + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, + ConsistencyLevel.THREE, + ConsistencyLevel.ALL]: + self._cl_failure(cl, e) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM]: + self._cl_failure(cl, e) + + # Test reads that expected to fail + for cl in fail_list: + try: + query(session, keyspace, 12, consistency_level=cl) + self._cl_expected_failure(cl) + except cassandra.Unavailable as e: + if not cl in [ConsistencyLevel.ONE, + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, + ConsistencyLevel.THREE, + ConsistencyLevel.ALL]: + self._cl_failure(cl, e) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM]: + self._cl_failure(cl, e) diff --git a/tests/integration/long/utils.py b/tests/integration/long/utils.py new file mode 100644 index 00000000..43322efe --- /dev/null +++ b/tests/integration/long/utils.py @@ -0,0 +1,108 @@ +import struct +import time + +from cassandra.query import SimpleStatement +from cassandra import ConsistencyLevel +from tests.integration import get_node + + +coordinators = {} + + +def add_coordinator(future): + global coordinators + coordinator = future._current_host.address + if coordinator in coordinators: + coordinators[coordinator] += 1 + else: + coordinators[coordinator] = 1 + if future._errors: + print 'future._errors', future._errors + future.result() + + +def reset_coordinators(): + global coordinators + coordinators = {} + + +def assert_queried(node, n): + ip = '127.0.0.%s' % node + print coordinators + if ip in coordinators: + if coordinators[ip] == n: + return + raise RuntimeError( + 'IP: %s. Expected: %s. Received: %s.' % (ip, n, coordinators[ip])) + else: + if n == 0: + return + raise RuntimeError('IP: %s. Expected: %s. Received: %s.' 
% (ip, n, 0)) + + +def create_schema(session, keyspace, replication_class='SimpleStrategy', + replication_factor=1): + results = session.execute( + 'SELECT keyspace_name FROM system.schema_keyspaces') + existing_keyspaces = [row[0] for row in results] + if keyspace in existing_keyspaces: + session.execute('DROP KEYSPACE %s' % keyspace) + if replication_class == 'SimpleStrategy': + ddl = "\n CREATE KEYSPACE %s\n WITH replication" \ + " = {'class': 'SimpleStrategy', 'replication_factor': '%s'}" + session.execute(ddl % (keyspace, replication_factor)) + ddl = '\n CREATE TABLE %s.cf (\n k int PRIMARY ' \ + 'KEY,\n i int)\n ' + session.execute(ddl % keyspace) + + +def init(session, keyspace, n, consistency_level=ConsistencyLevel.ONE): + reset_coordinators() + session.execute('USE %s' % keyspace) + for i in range(n): + ss = SimpleStatement('INSERT INTO %s(k, i) VALUES (0, 0)' % 'cf', + consistency_level=consistency_level) + session.execute(ss) + + +def query(session, keyspace, n, consistency_level=ConsistencyLevel.ONE): + routing_key = struct.pack('>i', 0) + for i in range(n): + ss = SimpleStatement('SELECT * FROM %s WHERE k = 0' % 'cf', + consistency_level=consistency_level, + routing_key=routing_key) + add_coordinator(session.execute_async(ss)) + + +def start(node): + get_node(node).start() + + +def stop(node): + get_node(node).stop() + + +def force_stop(node): + get_node(node).stop(wait=False, gently=False) + + +def wait_for_up(cluster, node): + while True: + host = cluster.metadata.get_host('127.0.0.%s' % node) + if host and host.monitor.is_up: + # BUG: This shouldn't be needed. + # Ideally, host.monitor.is_up would be enough? + # If not, what should I be using? + # time.sleep(25) + return + + +def wait_for_down(cluster, node): + while True: + host = cluster.metadata.get_host('127.0.0.%s' % node) + if not host or not host.monitor.is_up: + # BUG: This shouldn't be needed. + # Ideally, host.monitor.is_up would be enough? + # If not, what should I be using? 
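+            # A bounded poll would avoid spinning on cluster metadata;
+            # a minimal sketch (hypothetical, using only names already
+            # in scope in this module):
+            #     deadline = time.time() + 60
+            #     while time.time() < deadline:
+            #         host = cluster.metadata.get_host('127.0.0.%s' % node)
+            #         if not host or not host.monitor.is_up:
+            #             return
+            #         time.sleep(0.5)
+            #     raise RuntimeError('node %s still marked up' % node)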
+ # time.sleep(25) + return diff --git a/tests/integration/standard/__init__.py b/tests/integration/standard/__init__.py new file mode 100644 index 00000000..a1379440 --- /dev/null +++ b/tests/integration/standard/__init__.py @@ -0,0 +1 @@ +__author__ = 'joaquin' diff --git a/tests/integration/test_cluster.py b/tests/integration/standard/test_cluster.py similarity index 100% rename from tests/integration/test_cluster.py rename to tests/integration/standard/test_cluster.py diff --git a/tests/integration/test_connection.py b/tests/integration/standard/test_connection.py similarity index 100% rename from tests/integration/test_connection.py rename to tests/integration/standard/test_connection.py diff --git a/tests/integration/test_factories.py b/tests/integration/standard/test_factories.py similarity index 100% rename from tests/integration/test_factories.py rename to tests/integration/standard/test_factories.py diff --git a/tests/integration/test_metadata.py b/tests/integration/standard/test_metadata.py similarity index 100% rename from tests/integration/test_metadata.py rename to tests/integration/standard/test_metadata.py diff --git a/tests/integration/test_metrics.py b/tests/integration/standard/test_metrics.py similarity index 100% rename from tests/integration/test_metrics.py rename to tests/integration/standard/test_metrics.py diff --git a/tests/integration/test_prepared_statements.py b/tests/integration/standard/test_prepared_statements.py similarity index 100% rename from tests/integration/test_prepared_statements.py rename to tests/integration/standard/test_prepared_statements.py diff --git a/tests/integration/test_query.py b/tests/integration/standard/test_query.py similarity index 100% rename from tests/integration/test_query.py rename to tests/integration/standard/test_query.py diff --git a/tests/integration/test_types.py b/tests/integration/standard/test_types.py similarity index 100% rename from tests/integration/test_types.py rename to tests/integration/standard/test_types.py From d37b83bd0a09d2e7e0bcb7adac93c25af3564772 Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Fri, 6 Dec 2013 17:29:02 -0600 Subject: [PATCH 02/19] Commit pending changes to rebase --- tests/integration/long/test_consistency.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/integration/long/test_consistency.py b/tests/integration/long/test_consistency.py index 13ae73fc..d2d8ca24 100644 --- a/tests/integration/long/test_consistency.py +++ b/tests/integration/long/test_consistency.py @@ -32,14 +32,15 @@ class ConsistencyTests(unittest.TestCase): create_schema(session, keyspace, replication_factor=1) init(session, keyspace, 12) - - reset_coordinators() query(session, keyspace, 12) + assert_queried(1, 0) assert_queried(2, 12) assert_queried(3, 0) + try: + reset_coordinators() force_stop(2) wait_for_down(cluster, 2) @@ -79,7 +80,7 @@ class ConsistencyTests(unittest.TestCase): try: init(session, keyspace, 12, consistency_level=cl) self._cl_expected_failure(cl) - except cassandra.Unavailable as e: + except (cassandra.Unavailable, cassandra.WriteTimeout) as e: if not cl in [ConsistencyLevel.ONE, ConsistencyLevel.TWO, ConsistencyLevel.QUORUM, @@ -120,6 +121,7 @@ class ConsistencyTests(unittest.TestCase): create_schema(session, keyspace, replication_factor=2) init(session, keyspace, 12) + wait_for_up(cluster, 2) reset_coordinators() query(session, keyspace, 12) From fb8aeada275a2a9e878d251487a6fc4ff8e6db6e Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Thu, 12 Dec 2013 
19:25:46 -0600 Subject: [PATCH 03/19] ConsistencyTests now work. Added SchemaTests as well. --- cassandra/metadata.py | 9 ++-- tests/integration/long/__init__.py | 1 - tests/integration/long/test_consistency.py | 44 +++++++++---------- tests/integration/long/test_schema.py | 51 ++++++++++++++++++++++ tests/integration/long/utils.py | 47 ++++++++++++-------- 5 files changed, 106 insertions(+), 46 deletions(-) create mode 100644 tests/integration/long/test_schema.py diff --git a/cassandra/metadata.py b/cassandra/metadata.py index 8131bed6..359646a8 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -360,14 +360,13 @@ class SimpleStrategy(ReplicationStrategy): def make_token_replica_map(self, token_to_host_owner, ring): replica_map = {} for i in range(len(ring)): - j, hosts = 0, set() + j, hosts = 0, list() while len(hosts) < self.replication_factor and j < len(ring): token = ring[(i + j) % len(ring)] - hosts.add(token_to_host_owner[token]) + hosts.append(token_to_host_owner[token]) j += 1 replica_map[ring[i]] = hosts - return replica_map def export_for_schema(self): @@ -384,7 +383,7 @@ class NetworkTopologyStrategy(ReplicationStrategy): def make_token_replica_map(self, token_to_host_owner, ring): # note: this does not account for hosts having different racks - replica_map = defaultdict(set) + replica_map = defaultdict(list) ring_len = len(ring) ring_len_range = range(ring_len) dc_rf_map = dict((dc, int(rf)) @@ -401,7 +400,7 @@ class NetworkTopologyStrategy(ReplicationStrategy): # we already have all replicas for this DC continue - replica_map[ring[i]].add(host) + replica_map[ring[i]].append(host) if remaining[dc] == 1: del remaining[dc] diff --git a/tests/integration/long/__init__.py b/tests/integration/long/__init__.py index a1379440..e69de29b 100644 --- a/tests/integration/long/__init__.py +++ b/tests/integration/long/__init__.py @@ -1 +0,0 @@ -__author__ = 'joaquin' diff --git a/tests/integration/long/test_consistency.py b/tests/integration/long/test_consistency.py index d2d8ca24..fb976ce9 100644 --- a/tests/integration/long/test_consistency.py +++ b/tests/integration/long/test_consistency.py @@ -5,7 +5,7 @@ from cassandra.cluster import Cluster from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy from tests.integration.long.utils import reset_coordinators, force_stop, \ create_schema, init, query, assert_queried, wait_for_down, wait_for_up, \ - start + start, get_queried try: import unittest2 as unittest @@ -29,6 +29,8 @@ class ConsistencyTests(unittest.TestCase): cluster = Cluster( load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) session = cluster.connect() + wait_for_up(cluster, 1) + wait_for_up(cluster, 2) create_schema(session, keyspace, replication_factor=1) init(session, keyspace, 12) @@ -38,7 +40,6 @@ class ConsistencyTests(unittest.TestCase): assert_queried(2, 12) assert_queried(3, 0) - try: reset_coordinators() force_stop(2) @@ -57,12 +58,11 @@ class ConsistencyTests(unittest.TestCase): ] # Test writes that expected to complete successfully - # BUG: CL.ANY should work - # for cl in accepted_list: - # try: - # init(session, keyspace, 12, consistency_level=cl) - # except Exception as e: - # self._cl_failure(cl, e) + for cl in accepted_list: + try: + init(session, keyspace, 12, consistency_level=cl) + except Exception as e: + self._cl_failure(cl, e) # Test reads that expected to complete successfully for cl in accepted_list: @@ -118,18 +118,19 @@ class ConsistencyTests(unittest.TestCase): cluster = Cluster( 
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) session = cluster.connect() + wait_for_up(cluster, 1) + wait_for_up(cluster, 2) create_schema(session, keyspace, replication_factor=2) init(session, keyspace, 12) - wait_for_up(cluster, 2) - - reset_coordinators() query(session, keyspace, 12) + assert_queried(1, 0) assert_queried(2, 12) assert_queried(3, 0) try: + reset_coordinators() force_stop(2) wait_for_down(cluster, 2) @@ -159,11 +160,6 @@ class ConsistencyTests(unittest.TestCase): try: reset_coordinators() query(session, keyspace, 12, consistency_level=cl) - # Bug: I believe the Java-Driver does this differently - # and RoundRobins after the ideal token is not available. - # I like the Python Driver's approach, but we should - # probably make all policies act the same way, whichever - # way gets chosen? assert_queried(1, 0) assert_queried(2, 0) assert_queried(3, 12) @@ -218,14 +214,14 @@ class ConsistencyTests(unittest.TestCase): create_schema(session, keyspace, replication_factor=3) init(session, keyspace, 12) - - reset_coordinators() query(session, keyspace, 12) + assert_queried(1, 0) assert_queried(2, 12) assert_queried(3, 0) try: + reset_coordinators() force_stop(2) wait_for_down(cluster, 2) @@ -255,10 +251,9 @@ class ConsistencyTests(unittest.TestCase): try: reset_coordinators() query(session, keyspace, 12, consistency_level=cl) - # Bug: I believe the Java-Driver does this differently - assert_queried(1, 12) + assert_queried(1, 0) assert_queried(2, 0) - assert_queried(3, 0) + assert_queried(3, 12) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.ANY]: self._cl_failure(cl, e) @@ -308,16 +303,19 @@ class ConsistencyTests(unittest.TestCase): cluster = Cluster( load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) session = cluster.connect() + wait_for_up(cluster, 1) + wait_for_up(cluster, 2) create_schema(session, keyspace, replication_factor=3) init(session, keyspace, 12) - - reset_coordinators() query(session, keyspace, 12) + assert_queried(1, 0) assert_queried(2, 12) assert_queried(3, 0) + reset_coordinators() + accepted_list = [ ConsistencyLevel.ANY, ConsistencyLevel.ONE, diff --git a/tests/integration/long/test_schema.py b/tests/integration/long/test_schema.py new file mode 100644 index 00000000..57484d46 --- /dev/null +++ b/tests/integration/long/test_schema.py @@ -0,0 +1,51 @@ +import logging +import cassandra + +from cassandra import ConsistencyLevel +from cassandra.cluster import Cluster +from cassandra.query import SimpleStatement + + +try: + import unittest2 as unittest +except ImportError: + import unittest # noqa + +log = logging.getLogger(__name__) + +class SchemaTests(unittest.TestCase): + def test_recreates(self): + cluster = Cluster() + session = cluster.connect() + + + replication_factor = 3 + + for i in range(2): + for keyspace in range(0, 100): + keyspace = 'ks_%s' % keyspace + results = session.execute('SELECT keyspace_name FROM system.schema_keyspaces') + existing_keyspaces = [row[0] for row in results] + if keyspace in existing_keyspaces: + ddl = 'DROP KEYSPACE %s' % keyspace + log.debug(ddl) + session.execute(ddl) + + ddl = "CREATE KEYSPACE %s WITH replication" \ + " = {'class': 'SimpleStrategy', 'replication_factor': '%s'}" % (keyspace, replication_factor) + log.debug(ddl) + session.execute(ddl) + + ddl = 'CREATE TABLE %s.cf (k int PRIMARY KEY, i int)' % keyspace + log.debug(ddl) + session.execute(ddl) + + statement = 'USE %s' % keyspace + log.debug(ddl) + session.execute(statement) + + statement = 'INSERT INTO %s(k, i) 
VALUES (0, 0)' % 'cf' + log.debug(statement) + ss = SimpleStatement(statement, + consistency_level=ConsistencyLevel.QUORUM) + session.execute(ss) diff --git a/tests/integration/long/utils.py b/tests/integration/long/utils.py index 43322efe..88ac1042 100644 --- a/tests/integration/long/utils.py +++ b/tests/integration/long/utils.py @@ -26,39 +26,52 @@ def reset_coordinators(): coordinators = {} +def get_queried(node): + ip = '127.0.0.%s' % node + if not ip in coordinators: + return 0 + return coordinators[ip] + + def assert_queried(node, n): ip = '127.0.0.%s' % node - print coordinators if ip in coordinators: if coordinators[ip] == n: return raise RuntimeError( - 'IP: %s. Expected: %s. Received: %s.' % (ip, n, coordinators[ip])) + 'IP: %s. Expected: %s. Received: %s. Full detail: %s.' % (ip, n, coordinators[ip], coordinators)) else: if n == 0: return - raise RuntimeError('IP: %s. Expected: %s. Received: %s.' % (ip, n, 0)) + raise RuntimeError('IP: %s. Expected: %s. Received: %s. Full detail: %s.' % (ip, n, 0, coordinators)) def create_schema(session, keyspace, replication_class='SimpleStrategy', replication_factor=1): + results = session.execute( 'SELECT keyspace_name FROM system.schema_keyspaces') existing_keyspaces = [row[0] for row in results] if keyspace in existing_keyspaces: session.execute('DROP KEYSPACE %s' % keyspace) + if replication_class == 'SimpleStrategy': - ddl = "\n CREATE KEYSPACE %s\n WITH replication" \ + ddl = "CREATE KEYSPACE %s WITH replication" \ " = {'class': 'SimpleStrategy', 'replication_factor': '%s'}" session.execute(ddl % (keyspace, replication_factor)) - ddl = '\n CREATE TABLE %s.cf (\n k int PRIMARY ' \ - 'KEY,\n i int)\n ' + + ddl = 'CREATE TABLE %s.cf (k int PRIMARY KEY, i int)' session.execute(ddl % keyspace) + session.execute('USE %s' % keyspace) + + # BUG: probably related to PYTHON-39 + time.sleep(5) def init(session, keyspace, n, consistency_level=ConsistencyLevel.ONE): reset_coordinators() - session.execute('USE %s' % keyspace) + # BUG: PYTHON-38 + # session.execute('USE %s' % keyspace) for i in range(n): ss = SimpleStatement('INSERT INTO %s(k, i) VALUES (0, 0)' % 'cf', consistency_level=consistency_level) @@ -85,24 +98,24 @@ def stop(node): def force_stop(node): get_node(node).stop(wait=False, gently=False) +def ring(node): + print 'From node%s:' % node + get_node(node).nodetool('ring') + def wait_for_up(cluster, node): while True: host = cluster.metadata.get_host('127.0.0.%s' % node) - if host and host.monitor.is_up: - # BUG: This shouldn't be needed. - # Ideally, host.monitor.is_up would be enough? - # If not, what should I be using? - # time.sleep(25) + if host and host.is_up: + # BUG: shouldn't have to, but we do + time.sleep(5) return def wait_for_down(cluster, node): while True: host = cluster.metadata.get_host('127.0.0.%s' % node) - if not host or not host.monitor.is_up: - # BUG: This shouldn't be needed. - # Ideally, host.monitor.is_up would be enough? - # If not, what should I be using? 
- # time.sleep(25) + if not host or not host.is_up: + # BUG: shouldn't have to, but we do + time.sleep(5) return From c41c9f17de8a46e18da00e00f768515168d60c4c Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Thu, 12 Dec 2013 20:03:39 -0600 Subject: [PATCH 04/19] Patch tests to work with recently patched code --- tests/integration/standard/test_metadata.py | 12 ++++++------ tests/unit/test_metadata.py | 2 +- tests/unit/test_policies.py | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index 32be266c..60a819b0 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -349,9 +349,9 @@ class TestCodeCoverage(unittest.TestCase): self.assertNotEqual(list(get_replicas('test3rf', ring[0])), []) for i, token in enumerate(ring): - self.assertEqual(get_replicas('test3rf', token), set(owners)) - self.assertEqual(get_replicas('test2rf', token), set([owners[i], owners[(i + 1) % 3]])) - self.assertEqual(get_replicas('test1rf', token), set([owners[i]])) + self.assertEqual(set(get_replicas('test3rf', token)), set(owners)) + self.assertEqual(set(get_replicas('test2rf', token)), set([owners[i], owners[(i + 1) % 3]])) + self.assertEqual(set(get_replicas('test1rf', token)), set([owners[i]])) class TokenMetadataTest(unittest.TestCase): @@ -379,15 +379,15 @@ class TokenMetadataTest(unittest.TestCase): # tokens match node tokens exactly for token, expected_host in zip(tokens, hosts): replicas = token_map.get_replicas("ks", token) - self.assertEqual(replicas, set([expected_host])) + self.assertEqual(set(replicas), set([expected_host])) # shift the tokens back by one for token, expected_host in zip(tokens[1:], hosts[1:]): replicas = token_map.get_replicas("ks", MD5Token(str(token.value - 1))) - self.assertEqual(replicas, set([expected_host])) + self.assertEqual(set(replicas), set([expected_host])) # shift the tokens forward by one for i, token in enumerate(tokens): replicas = token_map.get_replicas("ks", MD5Token(str(token.value + 1))) expected_host = hosts[(i + 1) % len(hosts)] - self.assertEqual(replicas, set([expected_host])) + self.assertEqual(set(replicas), set([expected_host])) diff --git a/tests/unit/test_metadata.py b/tests/unit/test_metadata.py index 199c8659..f52c9c32 100644 --- a/tests/unit/test_metadata.py +++ b/tests/unit/test_metadata.py @@ -96,7 +96,7 @@ class TestStrategies(unittest.TestCase): nts = NetworkTopologyStrategy({'dc1': 1, 'dc2': 0}) replica_map = nts.make_token_replica_map(token_to_host_owner, ring) - self.assertEqual(replica_map[MD5Token(0)], set([host])) + self.assertEqual(set(replica_map[MD5Token(0)]), set([host])) def test_nts_export_for_schema(self): # TODO: Cover NetworkTopologyStrategy.export_for_schema() diff --git a/tests/unit/test_policies.py b/tests/unit/test_policies.py index d461295c..ab993004 100644 --- a/tests/unit/test_policies.py +++ b/tests/unit/test_policies.py @@ -351,7 +351,7 @@ class TokenAwarePolicyTest(unittest.TestCase): self.assertEqual(policy.distance(remote_host), HostDistance.IGNORED) # dc2 isn't registered in the policy's live_hosts dict - policy.child_policy.used_hosts_per_remote_dc = 1 + policy._child_policy.used_hosts_per_remote_dc = 1 self.assertEqual(policy.distance(remote_host), HostDistance.IGNORED) # make sure the policy has both dcs registered From 9c9ebce1eca9a947fc711cacaf9bb7169dc63140 Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Thu, 12 Dec 2013 20:04:59 -0600 Subject: [PATCH 
05/19] PyCharm auto-code removal --- tests/integration/standard/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/standard/__init__.py b/tests/integration/standard/__init__.py index a1379440..e69de29b 100644 --- a/tests/integration/standard/__init__.py +++ b/tests/integration/standard/__init__.py @@ -1 +0,0 @@ -__author__ = 'joaquin' From 98584beb3cabe8f3b1d7194755a4ab98bb127d28 Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Mon, 16 Dec 2013 13:39:27 -0600 Subject: [PATCH 06/19] Added policy checks to avoid unexpected exceptions. --- cassandra/cluster.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 985e3a9b..e528a8f9 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -283,14 +283,23 @@ class Cluster(object): self.auth_provider = auth_provider if load_balancing_policy is not None: + if isinstance(load_balancing_policy, type): + raise ValueError("load_balancing_policy must be an instance") + self.load_balancing_policy = load_balancing_policy else: self.load_balancing_policy = RoundRobinPolicy() if reconnection_policy is not None: + if isinstance(reconnection_policy, type): + raise ValueError("reconnection_policy must be an instance") + self.reconnection_policy = reconnection_policy if default_retry_policy is not None: + if isinstance(default_retry_policy, type): + raise ValueError("default_retry_policy must be an instance") + self.default_retry_policy = default_retry_policy if conviction_policy_factory is not None: From 13818a02c5757eb5148b1ea7f00017df4f874149 Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Mon, 16 Dec 2013 13:39:59 -0600 Subject: [PATCH 07/19] Add the rest of the CL tests. --- tests/integration/long/test_consistency.py | 274 ++++++++++++++++++++- 1 file changed, 273 insertions(+), 1 deletion(-) diff --git a/tests/integration/long/test_consistency.py b/tests/integration/long/test_consistency.py index fb976ce9..34992442 100644 --- a/tests/integration/long/test_consistency.py +++ b/tests/integration/long/test_consistency.py @@ -2,7 +2,8 @@ import cassandra from cassandra import ConsistencyLevel from cassandra.cluster import Cluster -from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy +from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy, \ + DowngradingConsistencyRetryPolicy from tests.integration.long.utils import reset_coordinators, force_stop, \ create_schema, init, query, assert_queried, wait_for_down, wait_for_up, \ start, get_queried @@ -384,3 +385,274 @@ class ConsistencyTests(unittest.TestCase): if not cl in [ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.EACH_QUORUM]: self._cl_failure(cl, e) + + + def test_rfone_downgradingcl(self): + keyspace = 'test_rfone_downgradingcl' + cluster = Cluster( + load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), + default_retry_policy=DowngradingConsistencyRetryPolicy()) + session = cluster.connect() + + create_schema(session, keyspace, replication_factor=1) + init(session, keyspace, 12) + query(session, keyspace, 12) + + assert_queried(1, 0) + assert_queried(2, 12) + assert_queried(3, 0) + + try: + reset_coordinators() + force_stop(2) + wait_for_down(cluster, 2) + + accepted_list = [ + ConsistencyLevel.ANY + ] + + fail_list = [ + ConsistencyLevel.ONE, + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, + ConsistencyLevel.THREE, + ConsistencyLevel.ALL, + ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM + ] + + # Test writes that expected to complete successfully + for cl 
in accepted_list: + try: + init(session, keyspace, 12, consistency_level=cl) + except Exception as e: + self._cl_failure(cl, e) + + # Test reads that expected to complete successfully + for cl in accepted_list: + try: + reset_coordinators() + query(session, keyspace, 12, consistency_level=cl) + assert_queried(1, 0) + assert_queried(2, 0) + assert_queried(3, 12) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.ANY]: + self._cl_failure(cl, e) + except Exception as e: + self._cl_failure(cl, e) + + # Test writes that expected to fail + for cl in fail_list: + try: + init(session, keyspace, 12, consistency_level=cl) + self._cl_expected_failure(cl) + except cassandra.Unavailable as e: + if not cl in [ConsistencyLevel.ONE, + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, + ConsistencyLevel.THREE, + ConsistencyLevel.ALL]: + self._cl_failure(cl, e) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM]: + self._cl_failure(cl, e) + + # Test reads that expected to fail + for cl in fail_list: + try: + query(session, keyspace, 12, consistency_level=cl) + self._cl_expected_failure(cl) + except cassandra.Unavailable as e: + if not cl in [ConsistencyLevel.ONE, + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, + ConsistencyLevel.THREE, + ConsistencyLevel.ALL]: + self._cl_failure(cl, e) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM]: + self._cl_failure(cl, e) + finally: + start(2) + wait_for_up(cluster, 2) + + + def test_rftwo_downgradingcl(self): + keyspace = 'test_rftwo_downgradingcl' + cluster = Cluster( + load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), + default_retry_policy=DowngradingConsistencyRetryPolicy()) + session = cluster.connect() + + create_schema(session, keyspace, replication_factor=2) + init(session, keyspace, 12) + query(session, keyspace, 12) + + assert_queried(1, 0) + assert_queried(2, 12) + assert_queried(3, 0) + + try: + reset_coordinators() + force_stop(2) + wait_for_down(cluster, 2) + + accepted_list = [ + ConsistencyLevel.ANY, + ConsistencyLevel.ONE, + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, + ConsistencyLevel.THREE, + ConsistencyLevel.ALL + ] + + fail_list = [ + ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM + ] + + # Test writes that expected to complete successfully + for cl in accepted_list: + try: + init(session, keyspace, 12, consistency_level=cl) + except Exception as e: + self._cl_failure(cl, e) + + # Test reads that expected to complete successfully + for cl in accepted_list: + try: + reset_coordinators() + query(session, keyspace, 12, consistency_level=cl) + assert_queried(1, 0) + assert_queried(2, 0) + assert_queried(3, 12) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.ANY]: + self._cl_failure(cl, e) + except Exception as e: + self._cl_failure(cl, e) + + # Test writes that expected to fail + for cl in fail_list: + try: + init(session, keyspace, 12, consistency_level=cl) + self._cl_expected_failure(cl) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM]: + self._cl_failure(cl, e) + + # Test reads that expected to fail + for cl in fail_list: + try: + query(session, keyspace, 12, consistency_level=cl) + self._cl_expected_failure(cl) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM]: + self._cl_failure(cl, e) + finally: 
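+            # Restart the node that was force-stopped above so the next
+            # test begins with a healthy three-node ring.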
+ start(2) + wait_for_up(cluster, 2) + + + def test_rfthree_roundrobin_downgradingcl(self): + keyspace = 'test_rfthree_roundrobin_downgradingcl' + cluster = Cluster( + load_balancing_policy=RoundRobinPolicy(), + default_retry_policy=DowngradingConsistencyRetryPolicy()) + self.rfthree_downgradingcl(cluster, keyspace) + + def test_rfthree_tokenaware_downgradingcl(self): + keyspace = 'test_rfthree_tokenaware_downgradingcl' + cluster = Cluster( + load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), + default_retry_policy=DowngradingConsistencyRetryPolicy()) + self.rfthree_downgradingcl(cluster, keyspace) + + def rfthree_downgradingcl(self, cluster, keyspace): + session = cluster.connect() + + create_schema(session, keyspace, replication_factor=2) + init(session, keyspace, 12) + query(session, keyspace, 12) + + try: + assert_queried(1, 0) + assert_queried(2, 12) + assert_queried(3, 0) + except: + assert_queried(1, 4) + assert_queried(2, 4) + assert_queried(3, 4) + + try: + reset_coordinators() + force_stop(2) + wait_for_down(cluster, 2) + + accepted_list = [ + ConsistencyLevel.ANY, + ConsistencyLevel.ONE, + ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, + ConsistencyLevel.THREE, + ConsistencyLevel.ALL + ] + + fail_list = [ + ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM + ] + + # Test writes that expected to complete successfully + for cl in accepted_list: + try: + init(session, keyspace, 12, consistency_level=cl) + except Exception as e: + self._cl_failure(cl, e) + + # Test reads that expected to complete successfully + for cl in accepted_list: + try: + reset_coordinators() + query(session, keyspace, 12, consistency_level=cl) + # assert_queried(1, 0) + # assert_queried(2, 0) + # assert_queried(3, 12) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.ANY]: + self._cl_failure(cl, e) + except Exception as e: + self._cl_failure(cl, e) + + # Test writes that expected to fail + for cl in fail_list: + try: + init(session, keyspace, 12, consistency_level=cl) + self._cl_expected_failure(cl) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM]: + self._cl_failure(cl, e) + + # Test reads that expected to fail + for cl in fail_list: + try: + query(session, keyspace, 12, consistency_level=cl) + self._cl_expected_failure(cl) + except cassandra.InvalidRequest as e: + if not cl in [ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM]: + self._cl_failure(cl, e) + finally: + start(2) + wait_for_up(cluster, 2) + + # TODO: can't be done in this class since we reuse the ccm cluster + # instead we should create these elsewhere + # def test_rfthree_downgradingcl_twodcs(self): + # def test_rfthree_downgradingcl_twodcs_dcaware(self): From ed4636b6494eade54b295e79676a4743c93a092e Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Mon, 16 Dec 2013 13:40:33 -0600 Subject: [PATCH 08/19] Modify create_schema to work with NTS. --- tests/integration/long/utils.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/tests/integration/long/utils.py b/tests/integration/long/utils.py index 88ac1042..ad27f1ff 100644 --- a/tests/integration/long/utils.py +++ b/tests/integration/long/utils.py @@ -46,8 +46,8 @@ def assert_queried(node, n): raise RuntimeError('IP: %s. Expected: %s. Received: %s. Full detail: %s.' 
% (ip, n, 0, coordinators)) -def create_schema(session, keyspace, replication_class='SimpleStrategy', - replication_factor=1): +def create_schema(session, keyspace, replication_class='SS', + replication_factor=1, replication_strategy=None): results = session.execute( 'SELECT keyspace_name FROM system.schema_keyspaces') @@ -55,13 +55,20 @@ def create_schema(session, keyspace, replication_class='SimpleStrategy', if keyspace in existing_keyspaces: session.execute('DROP KEYSPACE %s' % keyspace) - if replication_class == 'SimpleStrategy': + if replication_class == 'SS': ddl = "CREATE KEYSPACE %s WITH replication" \ " = {'class': 'SimpleStrategy', 'replication_factor': '%s'}" session.execute(ddl % (keyspace, replication_factor)) + elif replication_class == 'NTS': + if not replication_strategy: + raise Exception('replication_strategy is not set') - ddl = 'CREATE TABLE %s.cf (k int PRIMARY KEY, i int)' - session.execute(ddl % keyspace) + ddl = "CREATE KEYSPACE %s" \ + " WITH replication = { 'class' : 'NetworkTopologyStrategy', %s }" + session.execute(ddl % (keyspace, str(replication_strategy)[1:-1])) + + ddl = 'CREATE TABLE %s.cf (k int PRIMARY KEY, i int)' + session.execute(ddl % keyspace) session.execute('USE %s' % keyspace) # BUG: probably related to PYTHON-39 From bf0d8a286137b6ac9278df36b5d622683832ee92 Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Tue, 17 Dec 2013 12:46:56 -0600 Subject: [PATCH 09/19] Correctly handle replica set->list conversion --- cassandra/metadata.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/cassandra/metadata.py b/cassandra/metadata.py index 359646a8..e3e33bc9 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -360,13 +360,13 @@ class SimpleStrategy(ReplicationStrategy): def make_token_replica_map(self, token_to_host_owner, ring): replica_map = {} for i in range(len(ring)): - j, hosts = 0, list() + j, hosts = 0, set() while len(hosts) < self.replication_factor and j < len(ring): token = ring[(i + j) % len(ring)] - hosts.append(token_to_host_owner[token]) + hosts.add(token_to_host_owner[token]) j += 1 - replica_map[ring[i]] = hosts + replica_map[ring[i]] = list(sorted(hosts)) return replica_map def export_for_schema(self): @@ -383,7 +383,7 @@ class NetworkTopologyStrategy(ReplicationStrategy): def make_token_replica_map(self, token_to_host_owner, ring): # note: this does not account for hosts having different racks - replica_map = defaultdict(list) + replica_map = defaultdict(set) ring_len = len(ring) ring_len_range = range(ring_len) dc_rf_map = dict((dc, int(rf)) @@ -400,7 +400,7 @@ class NetworkTopologyStrategy(ReplicationStrategy): # we already have all replicas for this DC continue - replica_map[ring[i]].append(host) + replica_map[ring[i]].add(host) if remaining[dc] == 1: del remaining[dc] @@ -409,6 +409,8 @@ class NetworkTopologyStrategy(ReplicationStrategy): else: remaining[dc] -= 1 + replica_map[ring[i]] = list(sorted(replica_map[ring[i]])) + return replica_map def export_for_schema(self): From ce6f9dfe6799d0f99ca55d287445431442be7d88 Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Tue, 17 Dec 2013 13:36:55 -0600 Subject: [PATCH 10/19] Polish policy instance error messages --- cassandra/cluster.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index e528a8f9..f9df898b 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -284,7 +284,7 @@ class Cluster(object): if load_balancing_policy is not None: if 
isinstance(load_balancing_policy, type): - raise ValueError("load_balancing_policy must be an instance") + raise TypeError("load_balancing_policy should not be a class, it should be an instance of that class") self.load_balancing_policy = load_balancing_policy else: @@ -292,13 +292,13 @@ class Cluster(object): if reconnection_policy is not None: if isinstance(reconnection_policy, type): - raise ValueError("reconnection_policy must be an instance") + raise TypeError("reconnection_policy should not be a class, it should be an instance of that class") self.reconnection_policy = reconnection_policy if default_retry_policy is not None: if isinstance(default_retry_policy, type): - raise ValueError("default_retry_policy must be an instance") + raise TypeError("default_retry_policy should not be a class, it should be an instance of that class") self.default_retry_policy = default_retry_policy From 5d2d738b5bfc8c880fb1d351970ce08494445709 Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Tue, 17 Dec 2013 15:56:02 -0600 Subject: [PATCH 11/19] use nose's fail, instead of raising Exceptions in utils.py --- tests/integration/long/test_consistency.py | 84 +++++++++++----------- tests/integration/long/utils.py | 9 +-- 2 files changed, 47 insertions(+), 46 deletions(-) diff --git a/tests/integration/long/test_consistency.py b/tests/integration/long/test_consistency.py index 34992442..72ed0bb5 100644 --- a/tests/integration/long/test_consistency.py +++ b/tests/integration/long/test_consistency.py @@ -37,9 +37,9 @@ class ConsistencyTests(unittest.TestCase): init(session, keyspace, 12) query(session, keyspace, 12) - assert_queried(1, 0) - assert_queried(2, 12) - assert_queried(3, 0) + assert_queried(self, 1, 0) + assert_queried(self, 2, 12) + assert_queried(self, 3, 0) try: reset_coordinators() @@ -126,9 +126,9 @@ class ConsistencyTests(unittest.TestCase): init(session, keyspace, 12) query(session, keyspace, 12) - assert_queried(1, 0) - assert_queried(2, 12) - assert_queried(3, 0) + assert_queried(self, 1, 0) + assert_queried(self, 2, 12) + assert_queried(self, 3, 0) try: reset_coordinators() @@ -161,9 +161,9 @@ class ConsistencyTests(unittest.TestCase): try: reset_coordinators() query(session, keyspace, 12, consistency_level=cl) - assert_queried(1, 0) - assert_queried(2, 0) - assert_queried(3, 12) + assert_queried(self, 1, 0) + assert_queried(self, 2, 0) + assert_queried(self, 3, 12) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.ANY]: self._cl_failure(cl, e) @@ -217,9 +217,9 @@ class ConsistencyTests(unittest.TestCase): init(session, keyspace, 12) query(session, keyspace, 12) - assert_queried(1, 0) - assert_queried(2, 12) - assert_queried(3, 0) + assert_queried(self, 1, 0) + assert_queried(self, 2, 12) + assert_queried(self, 3, 0) try: reset_coordinators() @@ -252,9 +252,9 @@ class ConsistencyTests(unittest.TestCase): try: reset_coordinators() query(session, keyspace, 12, consistency_level=cl) - assert_queried(1, 0) - assert_queried(2, 0) - assert_queried(3, 12) + assert_queried(self, 1, 0) + assert_queried(self, 2, 0) + assert_queried(self, 3, 12) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.ANY]: self._cl_failure(cl, e) @@ -311,9 +311,9 @@ class ConsistencyTests(unittest.TestCase): init(session, keyspace, 12) query(session, keyspace, 12) - assert_queried(1, 0) - assert_queried(2, 12) - assert_queried(3, 0) + assert_queried(self, 1, 0) + assert_queried(self, 2, 12) + assert_queried(self, 3, 0) reset_coordinators() @@ -343,9 +343,9 @@ class 
ConsistencyTests(unittest.TestCase): try: reset_coordinators() query(session, keyspace, 12, consistency_level=cl) - assert_queried(1, 0) - assert_queried(2, 12) - assert_queried(3, 0) + assert_queried(self, 1, 0) + assert_queried(self, 2, 12) + assert_queried(self, 3, 0) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.ANY]: self._cl_failure(cl, e) @@ -398,9 +398,9 @@ class ConsistencyTests(unittest.TestCase): init(session, keyspace, 12) query(session, keyspace, 12) - assert_queried(1, 0) - assert_queried(2, 12) - assert_queried(3, 0) + assert_queried(self, 1, 0) + assert_queried(self, 2, 12) + assert_queried(self, 3, 0) try: reset_coordinators() @@ -433,9 +433,9 @@ class ConsistencyTests(unittest.TestCase): try: reset_coordinators() query(session, keyspace, 12, consistency_level=cl) - assert_queried(1, 0) - assert_queried(2, 0) - assert_queried(3, 12) + assert_queried(self, 1, 0) + assert_queried(self, 2, 0) + assert_queried(self, 3, 12) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.ANY]: self._cl_failure(cl, e) @@ -491,9 +491,9 @@ class ConsistencyTests(unittest.TestCase): init(session, keyspace, 12) query(session, keyspace, 12) - assert_queried(1, 0) - assert_queried(2, 12) - assert_queried(3, 0) + assert_queried(self, 1, 0) + assert_queried(self, 2, 12) + assert_queried(self, 3, 0) try: reset_coordinators() @@ -526,9 +526,9 @@ class ConsistencyTests(unittest.TestCase): try: reset_coordinators() query(session, keyspace, 12, consistency_level=cl) - assert_queried(1, 0) - assert_queried(2, 0) - assert_queried(3, 12) + assert_queried(self, 1, 0) + assert_queried(self, 2, 0) + assert_queried(self, 3, 12) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.ANY]: self._cl_failure(cl, e) @@ -581,13 +581,13 @@ class ConsistencyTests(unittest.TestCase): query(session, keyspace, 12) try: - assert_queried(1, 0) - assert_queried(2, 12) - assert_queried(3, 0) + assert_queried(self, 1, 0) + assert_queried(self, 2, 12) + assert_queried(self, 3, 0) except: - assert_queried(1, 4) - assert_queried(2, 4) - assert_queried(3, 4) + assert_queried(self, 1, 4) + assert_queried(self, 2, 4) + assert_queried(self, 3, 4) try: reset_coordinators() @@ -620,9 +620,9 @@ class ConsistencyTests(unittest.TestCase): try: reset_coordinators() query(session, keyspace, 12, consistency_level=cl) - # assert_queried(1, 0) - # assert_queried(2, 0) - # assert_queried(3, 12) + # assert_queried(self, 1, 0) + # assert_queried(self, 2, 0) + # assert_queried(self, 3, 12) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.ANY]: self._cl_failure(cl, e) diff --git a/tests/integration/long/utils.py b/tests/integration/long/utils.py index ad27f1ff..4230afc9 100644 --- a/tests/integration/long/utils.py +++ b/tests/integration/long/utils.py @@ -33,17 +33,18 @@ def get_queried(node): return coordinators[ip] -def assert_queried(node, n): +def assert_queried(testcase, node, n): ip = '127.0.0.%s' % node if ip in coordinators: if coordinators[ip] == n: return - raise RuntimeError( - 'IP: %s. Expected: %s. Received: %s. Full detail: %s.' % (ip, n, coordinators[ip], coordinators)) + testcase.fail('IP: %s. Expected: %s. Received: %s. Full detail: %s.' % ( + ip, n, coordinators[ip], coordinators)) else: if n == 0: return - raise RuntimeError('IP: %s. Expected: %s. Received: %s. Full detail: %s.' % (ip, n, 0, coordinators)) + testcase.fail('IP: %s. Expected: %s. Received: %s. Full detail: %s.' 
% ( + ip, n, 0, coordinators)) def create_schema(session, keyspace, replication_class='SS', From caf49c92aadcfb7fc5d56bbcfd4e7264cc64087c Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Tue, 17 Dec 2013 16:03:58 -0600 Subject: [PATCH 12/19] Use defaultdict(int) instead of checking and assigning --- tests/integration/long/utils.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/integration/long/utils.py b/tests/integration/long/utils.py index 4230afc9..307a5469 100644 --- a/tests/integration/long/utils.py +++ b/tests/integration/long/utils.py @@ -6,16 +6,14 @@ from cassandra import ConsistencyLevel from tests.integration import get_node -coordinators = {} +coordinators = defaultdict(int) def add_coordinator(future): global coordinators coordinator = future._current_host.address - if coordinator in coordinators: - coordinators[coordinator] += 1 - else: - coordinators[coordinator] = 1 + coordinators[coordinator] += 1 + if future._errors: print 'future._errors', future._errors future.result() From ae94856b3b5ad704656f9c4d50ecb98e22f58939 Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Tue, 17 Dec 2013 16:11:19 -0600 Subject: [PATCH 13/19] Incorporated feedback --- tests/integration/long/utils.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/tests/integration/long/utils.py b/tests/integration/long/utils.py index 307a5469..279cc416 100644 --- a/tests/integration/long/utils.py +++ b/tests/integration/long/utils.py @@ -1,13 +1,17 @@ +import logging import struct import time +from collections import defaultdict + from cassandra.query import SimpleStatement from cassandra import ConsistencyLevel from tests.integration import get_node - coordinators = defaultdict(int) +log = logging.getLogger(__name__) + def add_coordinator(future): global coordinators @@ -15,7 +19,7 @@ def add_coordinator(future): coordinators[coordinator] += 1 if future._errors: - print 'future._errors', future._errors + log.error('future._errors: %s' % future._errors) future.result() @@ -45,7 +49,7 @@ def assert_queried(testcase, node, n): ip, n, 0, coordinators)) -def create_schema(session, keyspace, replication_class='SS', +def create_schema(session, keyspace, simple_strategy=True, replication_factor=1, replication_strategy=None): results = session.execute( @@ -54,11 +58,11 @@ def create_schema(session, keyspace, replication_class='SS', if keyspace in existing_keyspaces: session.execute('DROP KEYSPACE %s' % keyspace) - if replication_class == 'SS': + if simple_strategy: ddl = "CREATE KEYSPACE %s WITH replication" \ " = {'class': 'SimpleStrategy', 'replication_factor': '%s'}" session.execute(ddl % (keyspace, replication_factor)) - elif replication_class == 'NTS': + else: if not replication_strategy: raise Exception('replication_strategy is not set') @@ -84,9 +88,9 @@ def init(session, keyspace, n, consistency_level=ConsistencyLevel.ONE): session.execute(ss) -def query(session, keyspace, n, consistency_level=ConsistencyLevel.ONE): +def query(session, keyspace, count, consistency_level=ConsistencyLevel.ONE): routing_key = struct.pack('>i', 0) - for i in range(n): + for i in range(count): ss = SimpleStatement('SELECT * FROM %s WHERE k = 0' % 'cf', consistency_level=consistency_level, routing_key=routing_key) From 284bb378ff72b09fe5c38fa6a5fa8639cf4819ec Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Wed, 18 Dec 2013 14:24:27 -0600 Subject: [PATCH 14/19] Re-implement previous replica sorting with vnode support --- cassandra/metadata.py | 16 
++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cassandra/metadata.py b/cassandra/metadata.py index e3e33bc9..5a5083e7 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -360,13 +360,15 @@ class SimpleStrategy(ReplicationStrategy): def make_token_replica_map(self, token_to_host_owner, ring): replica_map = {} for i in range(len(ring)): - j, hosts = 0, set() + j, hosts = 0, list() while len(hosts) < self.replication_factor and j < len(ring): token = ring[(i + j) % len(ring)] - hosts.add(token_to_host_owner[token]) + host = token_to_host_owner[token] + if not host in hosts: + hosts.append(host) j += 1 - replica_map[ring[i]] = list(sorted(hosts)) + replica_map[ring[i]] = hosts return replica_map def export_for_schema(self): @@ -383,7 +385,7 @@ class NetworkTopologyStrategy(ReplicationStrategy): def make_token_replica_map(self, token_to_host_owner, ring): # note: this does not account for hosts having different racks - replica_map = defaultdict(set) + replica_map = defaultdict(list) ring_len = len(ring) ring_len_range = range(ring_len) dc_rf_map = dict((dc, int(rf)) @@ -400,7 +402,8 @@ class NetworkTopologyStrategy(ReplicationStrategy): # we already have all replicas for this DC continue - replica_map[ring[i]].add(host) + if not host in replica_map[ring[i]]: + replica_map[ring[i]].append(host) if remaining[dc] == 1: del remaining[dc] @@ -408,9 +411,6 @@ class NetworkTopologyStrategy(ReplicationStrategy): break else: remaining[dc] -= 1 - - replica_map[ring[i]] = list(sorted(replica_map[ring[i]])) - return replica_map def export_for_schema(self): From 75e63d8a95aa3b9a8ac3a75ac19e2d211448c2fe Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Wed, 18 Dec 2013 14:24:47 -0600 Subject: [PATCH 15/19] Create the CoordinatorStats class --- tests/integration/long/test_consistency.py | 220 +++++++++++---------- tests/integration/long/utils.py | 95 ++++----- 2 files changed, 162 insertions(+), 153 deletions(-) diff --git a/tests/integration/long/test_consistency.py b/tests/integration/long/test_consistency.py index 72ed0bb5..fde957c8 100644 --- a/tests/integration/long/test_consistency.py +++ b/tests/integration/long/test_consistency.py @@ -4,9 +4,8 @@ from cassandra import ConsistencyLevel from cassandra.cluster import Cluster from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy, \ DowngradingConsistencyRetryPolicy -from tests.integration.long.utils import reset_coordinators, force_stop, \ - create_schema, init, query, assert_queried, wait_for_down, wait_for_up, \ - start, get_queried +from tests.integration.long.utils import force_stop, create_schema, \ + wait_for_down, wait_for_up, start, CoordinatorStats try: import unittest2 as unittest @@ -15,6 +14,10 @@ except ImportError: class ConsistencyTests(unittest.TestCase): + + def setUp(self): + self.cs = CoordinatorStats() + def _cl_failure(self, consistency_level, e): self.fail('%s seen for CL.%s with message: %s' % ( type(e), ConsistencyLevel.value_to_name[consistency_level], @@ -34,15 +37,15 @@ class ConsistencyTests(unittest.TestCase): wait_for_up(cluster, 2) create_schema(session, keyspace, replication_factor=1) - init(session, keyspace, 12) - query(session, keyspace, 12) + self.cs.init(session, keyspace, 12) + self.cs.query(session, keyspace, 12) - assert_queried(self, 1, 0) - assert_queried(self, 2, 12) - assert_queried(self, 3, 0) + self.cs.assert_queried(self, 1, 0) + self.cs.assert_queried(self, 2, 12) + self.cs.assert_queried(self, 3, 0) try: - reset_coordinators() + 
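# Editor's note on the metadata.py change in PATCH 14 above: replicas are now
# collected in ring order (a list) instead of a sorted set, so the first
# entry for each token is its primary replica -- which is what TokenAwarePolicy
# depends on when choosing a coordinator. A simplified sketch with assumed
# toy types (not the driver's actual classes):
def simple_strategy_replicas(ring, token_to_host, rf):
    replica_map = {}
    for i, token in enumerate(ring):
        hosts = []
        j = 0
        while len(hosts) < rf and j < len(ring):
            host = token_to_host[ring[(i + j) % len(ring)]]
            if host not in hosts:      # skip duplicate owners (vnodes)
                hosts.append(host)     # ring order preserved, never sorted
            j += 1
        replica_map[token] = hosts
    return replica_map

# e.g. ring=[0, 100, 200], token_to_host={0: 'A', 100: 'B', 200: 'C'}, rf=2
# gives replica_map[100] == ['B', 'C'], with the owner 'B' first.
assert simple_strategy_replicas(
    [0, 100, 200], {0: 'A', 100: 'B', 200: 'C'}, 2)[100] == ['B', 'C']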
self.cs.reset_coordinators() force_stop(2) wait_for_down(cluster, 2) @@ -61,14 +64,14 @@ class ConsistencyTests(unittest.TestCase): # Test writes that expected to complete successfully for cl in accepted_list: try: - init(session, keyspace, 12, consistency_level=cl) + self.cs.init(session, keyspace, 12, consistency_level=cl) except Exception as e: self._cl_failure(cl, e) # Test reads that expected to complete successfully for cl in accepted_list: try: - query(session, keyspace, 12, consistency_level=cl) + self.cs.query(session, keyspace, 12, consistency_level=cl) self._cl_expected_failure(cl) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.ANY]: @@ -79,7 +82,7 @@ class ConsistencyTests(unittest.TestCase): # Test writes that expected to fail for cl in fail_list: try: - init(session, keyspace, 12, consistency_level=cl) + self.cs.init(session, keyspace, 12, consistency_level=cl) self._cl_expected_failure(cl) except (cassandra.Unavailable, cassandra.WriteTimeout) as e: if not cl in [ConsistencyLevel.ONE, @@ -96,7 +99,7 @@ class ConsistencyTests(unittest.TestCase): # Test reads that expected to fail for cl in fail_list: try: - query(session, keyspace, 12, consistency_level=cl) + self.cs.query(session, keyspace, 12, consistency_level=cl) self._cl_expected_failure(cl) except cassandra.Unavailable as e: if not cl in [ConsistencyLevel.ONE, @@ -123,15 +126,15 @@ class ConsistencyTests(unittest.TestCase): wait_for_up(cluster, 2) create_schema(session, keyspace, replication_factor=2) - init(session, keyspace, 12) - query(session, keyspace, 12) + self.cs.init(session, keyspace, 12) + self.cs.query(session, keyspace, 12) - assert_queried(self, 1, 0) - assert_queried(self, 2, 12) - assert_queried(self, 3, 0) + self.cs.assert_queried(self, 1, 0) + self.cs.assert_queried(self, 2, 12) + self.cs.assert_queried(self, 3, 0) try: - reset_coordinators() + self.cs.reset_coordinators() force_stop(2) wait_for_down(cluster, 2) @@ -152,18 +155,18 @@ class ConsistencyTests(unittest.TestCase): # Test writes that expected to complete successfully for cl in accepted_list: try: - init(session, keyspace, 12, consistency_level=cl) + self.cs.init(session, keyspace, 12, consistency_level=cl) except Exception as e: self._cl_failure(cl, e) # Test reads that expected to complete successfully for cl in accepted_list: try: - reset_coordinators() - query(session, keyspace, 12, consistency_level=cl) - assert_queried(self, 1, 0) - assert_queried(self, 2, 0) - assert_queried(self, 3, 12) + self.cs.reset_coordinators() + self.cs.query(session, keyspace, 12, consistency_level=cl) + self.cs.assert_queried(self, 1, 0) + self.cs.assert_queried(self, 2, 0) + self.cs.assert_queried(self, 3, 12) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.ANY]: self._cl_failure(cl, e) @@ -173,7 +176,7 @@ class ConsistencyTests(unittest.TestCase): # Test writes that expected to fail for cl in fail_list: try: - init(session, keyspace, 12, consistency_level=cl) + self.cs.init(session, keyspace, 12, consistency_level=cl) self._cl_expected_failure(cl) except cassandra.Unavailable as e: if not cl in [ConsistencyLevel.ONE, @@ -190,7 +193,7 @@ class ConsistencyTests(unittest.TestCase): # Test reads that expected to fail for cl in fail_list: try: - query(session, keyspace, 12, consistency_level=cl) + self.cs.query(session, keyspace, 12, consistency_level=cl) self._cl_expected_failure(cl) except cassandra.Unavailable as e: if not cl in [ConsistencyLevel.ONE, @@ -214,15 +217,15 @@ class ConsistencyTests(unittest.TestCase): 
session = cluster.connect() create_schema(session, keyspace, replication_factor=3) - init(session, keyspace, 12) - query(session, keyspace, 12) + self.cs.init(session, keyspace, 12) + self.cs.query(session, keyspace, 12) - assert_queried(self, 1, 0) - assert_queried(self, 2, 12) - assert_queried(self, 3, 0) + self.cs.assert_queried(self, 1, 0) + self.cs.assert_queried(self, 2, 12) + self.cs.assert_queried(self, 3, 0) try: - reset_coordinators() + self.cs.reset_coordinators() force_stop(2) wait_for_down(cluster, 2) @@ -243,18 +246,18 @@ class ConsistencyTests(unittest.TestCase): # Test writes that expected to complete successfully for cl in accepted_list: try: - init(session, keyspace, 12, consistency_level=cl) + self.cs.init(session, keyspace, 12, consistency_level=cl) except Exception as e: self._cl_failure(cl, e) # Test reads that expected to complete successfully for cl in accepted_list: try: - reset_coordinators() - query(session, keyspace, 12, consistency_level=cl) - assert_queried(self, 1, 0) - assert_queried(self, 2, 0) - assert_queried(self, 3, 12) + self.cs.reset_coordinators() + self.cs.query(session, keyspace, 12, consistency_level=cl) + self.cs.assert_queried(self, 1, 0) + self.cs.assert_queried(self, 2, 0) + self.cs.assert_queried(self, 3, 12) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.ANY]: self._cl_failure(cl, e) @@ -264,7 +267,7 @@ class ConsistencyTests(unittest.TestCase): # Test writes that expected to fail for cl in fail_list: try: - init(session, keyspace, 12, consistency_level=cl) + self.cs.init(session, keyspace, 12, consistency_level=cl) self._cl_expected_failure(cl) except cassandra.Unavailable as e: if not cl in [ConsistencyLevel.ONE, @@ -281,7 +284,7 @@ class ConsistencyTests(unittest.TestCase): # Test reads that expected to fail for cl in fail_list: try: - query(session, keyspace, 12, consistency_level=cl) + self.cs.query(session, keyspace, 12, consistency_level=cl) self._cl_expected_failure(cl) except cassandra.Unavailable as e: if not cl in [ConsistencyLevel.ONE, @@ -308,14 +311,14 @@ class ConsistencyTests(unittest.TestCase): wait_for_up(cluster, 2) create_schema(session, keyspace, replication_factor=3) - init(session, keyspace, 12) - query(session, keyspace, 12) + self.cs.init(session, keyspace, 12) + self.cs.query(session, keyspace, 12) - assert_queried(self, 1, 0) - assert_queried(self, 2, 12) - assert_queried(self, 3, 0) + self.cs.assert_queried(self, 1, 0) + self.cs.assert_queried(self, 2, 12) + self.cs.assert_queried(self, 3, 0) - reset_coordinators() + self.cs.reset_coordinators() accepted_list = [ ConsistencyLevel.ANY, @@ -334,18 +337,18 @@ class ConsistencyTests(unittest.TestCase): # Test writes that expected to complete successfully for cl in accepted_list: try: - init(session, keyspace, 12, consistency_level=cl) + self.cs.init(session, keyspace, 12, consistency_level=cl) except Exception as e: self._cl_failure(cl, e) # Test reads that expected to complete successfully for cl in accepted_list: try: - reset_coordinators() - query(session, keyspace, 12, consistency_level=cl) - assert_queried(self, 1, 0) - assert_queried(self, 2, 12) - assert_queried(self, 3, 0) + self.cs.reset_coordinators() + self.cs.query(session, keyspace, 12, consistency_level=cl) + self.cs.assert_queried(self, 1, 0) + self.cs.assert_queried(self, 2, 12) + self.cs.assert_queried(self, 3, 0) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.ANY]: self._cl_failure(cl, e) @@ -355,7 +358,7 @@ class ConsistencyTests(unittest.TestCase): # Test 
writes that expected to fail for cl in fail_list: try: - init(session, keyspace, 12, consistency_level=cl) + self.cs.init(session, keyspace, 12, consistency_level=cl) self._cl_expected_failure(cl) except cassandra.Unavailable as e: if not cl in [ConsistencyLevel.ONE, @@ -372,7 +375,7 @@ class ConsistencyTests(unittest.TestCase): # Test reads that expected to fail for cl in fail_list: try: - query(session, keyspace, 12, consistency_level=cl) + self.cs.query(session, keyspace, 12, consistency_level=cl) self._cl_expected_failure(cl) except cassandra.Unavailable as e: if not cl in [ConsistencyLevel.ONE, @@ -395,15 +398,15 @@ class ConsistencyTests(unittest.TestCase): session = cluster.connect() create_schema(session, keyspace, replication_factor=1) - init(session, keyspace, 12) - query(session, keyspace, 12) + self.cs.init(session, keyspace, 12) + self.cs.query(session, keyspace, 12) - assert_queried(self, 1, 0) - assert_queried(self, 2, 12) - assert_queried(self, 3, 0) + self.cs.assert_queried(self, 1, 0) + self.cs.assert_queried(self, 2, 12) + self.cs.assert_queried(self, 3, 0) try: - reset_coordinators() + self.cs.reset_coordinators() force_stop(2) wait_for_down(cluster, 2) @@ -424,18 +427,18 @@ class ConsistencyTests(unittest.TestCase): # Test writes that expected to complete successfully for cl in accepted_list: try: - init(session, keyspace, 12, consistency_level=cl) + self.cs.init(session, keyspace, 12, consistency_level=cl) except Exception as e: self._cl_failure(cl, e) # Test reads that expected to complete successfully for cl in accepted_list: try: - reset_coordinators() - query(session, keyspace, 12, consistency_level=cl) - assert_queried(self, 1, 0) - assert_queried(self, 2, 0) - assert_queried(self, 3, 12) + self.cs.reset_coordinators() + self.cs.query(session, keyspace, 12, consistency_level=cl) + self.cs.assert_queried(self, 1, 0) + self.cs.assert_queried(self, 2, 0) + self.cs.assert_queried(self, 3, 12) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.ANY]: self._cl_failure(cl, e) @@ -445,7 +448,7 @@ class ConsistencyTests(unittest.TestCase): # Test writes that expected to fail for cl in fail_list: try: - init(session, keyspace, 12, consistency_level=cl) + self.cs.init(session, keyspace, 12, consistency_level=cl) self._cl_expected_failure(cl) except cassandra.Unavailable as e: if not cl in [ConsistencyLevel.ONE, @@ -462,7 +465,7 @@ class ConsistencyTests(unittest.TestCase): # Test reads that expected to fail for cl in fail_list: try: - query(session, keyspace, 12, consistency_level=cl) + self.cs.query(session, keyspace, 12, consistency_level=cl) self._cl_expected_failure(cl) except cassandra.Unavailable as e: if not cl in [ConsistencyLevel.ONE, @@ -488,15 +491,15 @@ class ConsistencyTests(unittest.TestCase): session = cluster.connect() create_schema(session, keyspace, replication_factor=2) - init(session, keyspace, 12) - query(session, keyspace, 12) + self.cs.init(session, keyspace, 12) + self.cs.query(session, keyspace, 12) - assert_queried(self, 1, 0) - assert_queried(self, 2, 12) - assert_queried(self, 3, 0) + self.cs.assert_queried(self, 1, 0) + self.cs.assert_queried(self, 2, 12) + self.cs.assert_queried(self, 3, 0) try: - reset_coordinators() + self.cs.reset_coordinators() force_stop(2) wait_for_down(cluster, 2) @@ -517,18 +520,18 @@ class ConsistencyTests(unittest.TestCase): # Test writes that expected to complete successfully for cl in accepted_list: try: - init(session, keyspace, 12, consistency_level=cl) + self.cs.init(session, keyspace, 12, 
consistency_level=cl) except Exception as e: self._cl_failure(cl, e) # Test reads that expected to complete successfully for cl in accepted_list: try: - reset_coordinators() - query(session, keyspace, 12, consistency_level=cl) - assert_queried(self, 1, 0) - assert_queried(self, 2, 0) - assert_queried(self, 3, 12) + self.cs.reset_coordinators() + self.cs.query(session, keyspace, 12, consistency_level=cl) + self.cs.assert_queried(self, 1, 0) + self.cs.assert_queried(self, 2, 0) + self.cs.assert_queried(self, 3, 12) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.ANY]: self._cl_failure(cl, e) @@ -538,7 +541,7 @@ class ConsistencyTests(unittest.TestCase): # Test writes that expected to fail for cl in fail_list: try: - init(session, keyspace, 12, consistency_level=cl) + self.cs.init(session, keyspace, 12, consistency_level=cl) self._cl_expected_failure(cl) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.LOCAL_QUORUM, @@ -548,7 +551,7 @@ class ConsistencyTests(unittest.TestCase): # Test reads that expected to fail for cl in fail_list: try: - query(session, keyspace, 12, consistency_level=cl) + self.cs.query(session, keyspace, 12, consistency_level=cl) self._cl_expected_failure(cl) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.LOCAL_QUORUM, @@ -564,33 +567,33 @@ class ConsistencyTests(unittest.TestCase): cluster = Cluster( load_balancing_policy=RoundRobinPolicy(), default_retry_policy=DowngradingConsistencyRetryPolicy()) - self.rfthree_downgradingcl(cluster, keyspace) + self.rfthree_downgradingcl(cluster, keyspace, True) def test_rfthree_tokenaware_downgradingcl(self): keyspace = 'test_rfthree_tokenaware_downgradingcl' cluster = Cluster( load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), default_retry_policy=DowngradingConsistencyRetryPolicy()) - self.rfthree_downgradingcl(cluster, keyspace) + self.rfthree_downgradingcl(cluster, keyspace, False) - def rfthree_downgradingcl(self, cluster, keyspace): + def rfthree_downgradingcl(self, cluster, keyspace, roundrobin): session = cluster.connect() create_schema(session, keyspace, replication_factor=2) - init(session, keyspace, 12) - query(session, keyspace, 12) + self.cs.init(session, keyspace, 12) + self.cs.query(session, keyspace, 12) + + if roundrobin: + self.cs.assert_queried(self, 1, 4) + self.cs.assert_queried(self, 2, 4) + self.cs.assert_queried(self, 3, 4) + else: + self.cs.assert_queried(self, 1, 0) + self.cs.assert_queried(self, 2, 12) + self.cs.assert_queried(self, 3, 0) try: - assert_queried(self, 1, 0) - assert_queried(self, 2, 12) - assert_queried(self, 3, 0) - except: - assert_queried(self, 1, 4) - assert_queried(self, 2, 4) - assert_queried(self, 3, 4) - - try: - reset_coordinators() + self.cs.reset_coordinators() force_stop(2) wait_for_down(cluster, 2) @@ -611,18 +614,23 @@ class ConsistencyTests(unittest.TestCase): # Test writes that expected to complete successfully for cl in accepted_list: try: - init(session, keyspace, 12, consistency_level=cl) + self.cs.init(session, keyspace, 12, consistency_level=cl) except Exception as e: self._cl_failure(cl, e) # Test reads that expected to complete successfully for cl in accepted_list: try: - reset_coordinators() - query(session, keyspace, 12, consistency_level=cl) - # assert_queried(self, 1, 0) - # assert_queried(self, 2, 0) - # assert_queried(self, 3, 12) + self.cs.reset_coordinators() + self.cs.query(session, keyspace, 12, consistency_level=cl) + if roundrobin: + self.cs.assert_queried(self, 1, 6) + 
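# Editor's note on the rfthree_downgradingcl hunks above: the old code
# asserted the token-aware split (0/12/0) and fell back to 4/4/4 in a bare
# 'except:'; the new 'roundrobin' flag states the expected distribution up
# front. With three nodes and 12 queries, round-robin spreads 4/4/4 and
# token-aware pins all 12 on the replica; once node 2 is stopped, round-robin
# degrades to 6/0/6. A hypothetical helper, illustrative only:
def expected_distribution(roundrobin, total=12, nodes=(1, 2, 3), down=()):
    live = [n for n in nodes if n not in down]
    if roundrobin:
        per = total // len(live)
        return dict((n, 0 if n in down else per) for n in nodes)
    replica = 2 if 2 not in down else 3      # assumed replica order
    return dict((n, total if n == replica else 0) for n in nodes)

assert expected_distribution(True) == {1: 4, 2: 4, 3: 4}
assert expected_distribution(True, down=(2,)) == {1: 6, 2: 0, 3: 6}
assert expected_distribution(False) == {1: 0, 2: 12, 3: 0}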
self.cs.assert_queried(self, 2, 0) + self.cs.assert_queried(self, 3, 6) + else: + self.cs.assert_queried(self, 1, 0) + self.cs.assert_queried(self, 2, 0) + self.cs.assert_queried(self, 3, 12) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.ANY]: self._cl_failure(cl, e) @@ -632,7 +640,7 @@ class ConsistencyTests(unittest.TestCase): # Test writes that expected to fail for cl in fail_list: try: - init(session, keyspace, 12, consistency_level=cl) + self.cs.init(session, keyspace, 12, consistency_level=cl) self._cl_expected_failure(cl) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.LOCAL_QUORUM, @@ -642,7 +650,7 @@ class ConsistencyTests(unittest.TestCase): # Test reads that expected to fail for cl in fail_list: try: - query(session, keyspace, 12, consistency_level=cl) + self.cs.query(session, keyspace, 12, consistency_level=cl) self._cl_expected_failure(cl) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.LOCAL_QUORUM, diff --git a/tests/integration/long/utils.py b/tests/integration/long/utils.py index 279cc416..8e49467a 100644 --- a/tests/integration/long/utils.py +++ b/tests/integration/long/utils.py @@ -8,45 +8,65 @@ from cassandra.query import SimpleStatement from cassandra import ConsistencyLevel from tests.integration import get_node -coordinators = defaultdict(int) log = logging.getLogger(__name__) -def add_coordinator(future): - global coordinators - coordinator = future._current_host.address - coordinators[coordinator] += 1 +class CoordinatorStats(): + def __init__(self): + self.coordinators = defaultdict(int) - if future._errors: - log.error('future._errors: %s' % future._errors) - future.result() + def add_coordinator(self, future): + coordinator = future._current_host.address + self.coordinators[coordinator] += 1 + + if future._errors: + log.error('future._errors: %s' % future._errors) + future.result() -def reset_coordinators(): - global coordinators - coordinators = {} + def reset_coordinators(self): + self.coordinators = defaultdict(int) -def get_queried(node): - ip = '127.0.0.%s' % node - if not ip in coordinators: - return 0 - return coordinators[ip] + def get_queried(self, node): + ip = '127.0.0.%s' % node + if not ip in self.coordinators: + return 0 + return self.coordinators[ip] -def assert_queried(testcase, node, n): - ip = '127.0.0.%s' % node - if ip in coordinators: - if coordinators[ip] == n: - return - testcase.fail('IP: %s. Expected: %s. Received: %s. Full detail: %s.' % ( - ip, n, coordinators[ip], coordinators)) - else: - if n == 0: - return - testcase.fail('IP: %s. Expected: %s. Received: %s. Full detail: %s.' % ( - ip, n, 0, coordinators)) + def assert_queried(self, testcase, node, n): + ip = '127.0.0.%s' % node + if ip in self.coordinators: + if self.coordinators[ip] == n: + return + testcase.fail('IP: %s. Expected: %s. Received: %s. Full detail: %s.' % ( + ip, n, self.coordinators[ip], self.coordinators)) + else: + if n == 0: + return + testcase.fail('IP: %s. Expected: %s. Received: %s. Full detail: %s.' 
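# Editor's note on the CoordinatorStats class being introduced above: the
# point of moving from module-level globals to an instance is state isolation,
# so counts cannot leak between tests. A minimal sketch of the same idea (not
# the project's class):
from collections import defaultdict

class Stats(object):
    def __init__(self):
        self.counts = defaultdict(int)

    def record(self, ip):
        self.counts[ip] += 1

a, b = Stats(), Stats()
a.record('127.0.0.2')
assert a.counts['127.0.0.2'] == 1
assert b.counts['127.0.0.2'] == 0    # no shared module-level state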
% ( + ip, n, 0, self.coordinators)) + + + def init(self, session, keyspace, n, consistency_level=ConsistencyLevel.ONE): + self.reset_coordinators() + # BUG: PYTHON-38 + # session.execute('USE %s' % keyspace) + for i in range(n): + ss = SimpleStatement('INSERT INTO %s(k, i) VALUES (0, 0)' % 'cf', + consistency_level=consistency_level) + session.execute(ss) + + + def query(self, session, keyspace, count, consistency_level=ConsistencyLevel.ONE): + routing_key = struct.pack('>i', 0) + for i in range(count): + ss = SimpleStatement('SELECT * FROM %s WHERE k = 0' % 'cf', + consistency_level=consistency_level, + routing_key=routing_key) + self.add_coordinator(session.execute_async(ss)) def create_schema(session, keyspace, simple_strategy=True, @@ -78,25 +98,6 @@ def create_schema(session, keyspace, simple_strategy=True, time.sleep(5) -def init(session, keyspace, n, consistency_level=ConsistencyLevel.ONE): - reset_coordinators() - # BUG: PYTHON-38 - # session.execute('USE %s' % keyspace) - for i in range(n): - ss = SimpleStatement('INSERT INTO %s(k, i) VALUES (0, 0)' % 'cf', - consistency_level=consistency_level) - session.execute(ss) - - -def query(session, keyspace, count, consistency_level=ConsistencyLevel.ONE): - routing_key = struct.pack('>i', 0) - for i in range(count): - ss = SimpleStatement('SELECT * FROM %s WHERE k = 0' % 'cf', - consistency_level=consistency_level, - routing_key=routing_key) - add_coordinator(session.execute_async(ss)) - - def start(node): get_node(node).start() From e26da843cc46bb9f58d163f3d3bee90bbb74aa20 Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Wed, 18 Dec 2013 16:12:38 -0600 Subject: [PATCH 16/19] Print more in-depth stacktraces --- tests/integration/long/test_consistency.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/integration/long/test_consistency.py b/tests/integration/long/test_consistency.py index fde957c8..0f12b0c5 100644 --- a/tests/integration/long/test_consistency.py +++ b/tests/integration/long/test_consistency.py @@ -1,3 +1,4 @@ +import traceback import cassandra from cassandra import ConsistencyLevel @@ -19,13 +20,13 @@ class ConsistencyTests(unittest.TestCase): self.cs = CoordinatorStats() def _cl_failure(self, consistency_level, e): - self.fail('%s seen for CL.%s with message: %s' % ( + self.fail('%s seen for CL.%s:\n\n%s' % ( type(e), ConsistencyLevel.value_to_name[consistency_level], - e.message)) + traceback.format_exc())) def _cl_expected_failure(self, cl): - self.fail('Test passed at ConsistencyLevel.%s' % - ConsistencyLevel.value_to_name[cl]) + self.fail('Test passed at ConsistencyLevel.%s:\n\n%s' % ( + ConsistencyLevel.value_to_name[cl], traceback.format_exc())) def test_rfone_tokenaware(self): From 9209bef19edb4eb4e591286b8fa17f7235cc733e Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Wed, 18 Dec 2013 16:12:56 -0600 Subject: [PATCH 17/19] optimize/remove existing bug comments --- tests/integration/long/test_consistency.py | 6 +++--- tests/integration/long/utils.py | 13 ++++++------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/tests/integration/long/test_consistency.py b/tests/integration/long/test_consistency.py index 0f12b0c5..0f7306d6 100644 --- a/tests/integration/long/test_consistency.py +++ b/tests/integration/long/test_consistency.py @@ -34,7 +34,7 @@ class ConsistencyTests(unittest.TestCase): cluster = Cluster( load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) session = cluster.connect() - wait_for_up(cluster, 1) + wait_for_up(cluster, 1, wait=False) 
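# Editor's note on the routing_key used by CoordinatorStats.query() above:
# the routing key is the serialized partition key. For 'WHERE k = 0' with k
# an int, struct.pack('>i', 0) produces the big-endian 4-byte form that the
# token-aware policy hashes to pick a replica.
import struct

routing_key = struct.pack('>i', 0)
assert routing_key == b'\x00\x00\x00\x00'
# ss = SimpleStatement('SELECT * FROM cf WHERE k = 0',
#                      routing_key=routing_key)   # as in query() above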
wait_for_up(cluster, 2) create_schema(session, keyspace, replication_factor=1) @@ -123,7 +123,7 @@ class ConsistencyTests(unittest.TestCase): cluster = Cluster( load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) session = cluster.connect() - wait_for_up(cluster, 1) + wait_for_up(cluster, 1, wait=False) wait_for_up(cluster, 2) create_schema(session, keyspace, replication_factor=2) @@ -308,7 +308,7 @@ class ConsistencyTests(unittest.TestCase): cluster = Cluster( load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) session = cluster.connect() - wait_for_up(cluster, 1) + wait_for_up(cluster, 1, wait=False) wait_for_up(cluster, 2) create_schema(session, keyspace, replication_factor=3) diff --git a/tests/integration/long/utils.py b/tests/integration/long/utils.py index 8e49467a..54aa156a 100644 --- a/tests/integration/long/utils.py +++ b/tests/integration/long/utils.py @@ -94,9 +94,6 @@ def create_schema(session, keyspace, simple_strategy=True, session.execute(ddl % keyspace) session.execute('USE %s' % keyspace) - # BUG: probably related to PYTHON-39 - time.sleep(5) - def start(node): get_node(node).start() @@ -114,19 +111,21 @@ def ring(node): get_node(node).nodetool('ring') -def wait_for_up(cluster, node): +def wait_for_up(cluster, node, wait=True): while True: host = cluster.metadata.get_host('127.0.0.%s' % node) if host and host.is_up: # BUG: shouldn't have to, but we do - time.sleep(5) + if wait: + time.sleep(5) return -def wait_for_down(cluster, node): +def wait_for_down(cluster, node, wait=True): while True: host = cluster.metadata.get_host('127.0.0.%s' % node) if not host or not host.is_up: # BUG: shouldn't have to, but we do - time.sleep(5) + if wait: + time.sleep(5) return From 5a64fa2b672fd180e0941aa59362a2ff6dea4aba Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Wed, 18 Dec 2013 16:27:57 -0600 Subject: [PATCH 18/19] Remove last PYTHON-38 workaround --- tests/integration/long/utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/integration/long/utils.py b/tests/integration/long/utils.py index 54aa156a..f132531a 100644 --- a/tests/integration/long/utils.py +++ b/tests/integration/long/utils.py @@ -52,8 +52,7 @@ class CoordinatorStats(): def init(self, session, keyspace, n, consistency_level=ConsistencyLevel.ONE): self.reset_coordinators() - # BUG: PYTHON-38 - # session.execute('USE %s' % keyspace) + session.execute('USE %s' % keyspace) for i in range(n): ss = SimpleStatement('INSERT INTO %s(k, i) VALUES (0, 0)' % 'cf', consistency_level=consistency_level) From ed842e9440826d38a51e25c4fe66d3a699fee69d Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Fri, 27 Dec 2013 17:51:53 -0600 Subject: [PATCH 19/19] Consistency test cleanup --- tests/integration/long/test_consistency.py | 730 +++++---------------- tests/integration/long/utils.py | 66 +- 2 files changed, 179 insertions(+), 617 deletions(-) diff --git a/tests/integration/long/test_consistency.py b/tests/integration/long/test_consistency.py index 0f7306d6..bf8646a8 100644 --- a/tests/integration/long/test_consistency.py +++ b/tests/integration/long/test_consistency.py @@ -1,10 +1,13 @@ +import struct import traceback -import cassandra +import cassandra from cassandra import ConsistencyLevel from cassandra.cluster import Cluster from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy, \ DowngradingConsistencyRetryPolicy +from cassandra.query import SimpleStatement + from tests.integration.long.utils import force_stop, create_schema, \ wait_for_down, wait_for_up, start, 
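# Editor's note on the wait_for_up()/wait_for_down() changes in PATCH 17
# above: the helpers poll cluster metadata in an unbounded busy loop, so a
# node that never flips state would hang the suite. A bounded variant (an
# editorial assumption, not in the patch) would fail fast instead:
import time

def wait_for(predicate, timeout=60, interval=0.5):
    end = time.time() + timeout
    while time.time() < end:
        if predicate():
            return True
        time.sleep(interval)
    raise RuntimeError('condition not met within %ss' % timeout)

# usage sketch: wait_for(lambda: host_is_up(cluster, 2))
# where host_is_up is a hypothetical wrapper around metadata.get_host().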
CoordinatorStats @@ -13,295 +16,133 @@ try: except ImportError: import unittest # noqa +ALL_CONSISTENCY_LEVELS = set([ + ConsistencyLevel.ANY, ConsistencyLevel.ONE, ConsistencyLevel.TWO, + ConsistencyLevel.QUORUM, ConsistencyLevel.THREE, + ConsistencyLevel.ALL, ConsistencyLevel.LOCAL_QUORUM, + ConsistencyLevel.EACH_QUORUM]) + +MULTI_DC_CONSISTENCY_LEVELS = set([ + ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.EACH_QUORUM]) + +SINGLE_DC_CONSISTENCY_LEVELS = ALL_CONSISTENCY_LEVELS - MULTI_DC_CONSISTENCY_LEVELS + class ConsistencyTests(unittest.TestCase): def setUp(self): - self.cs = CoordinatorStats() + self.coordinator_stats = CoordinatorStats() def _cl_failure(self, consistency_level, e): - self.fail('%s seen for CL.%s:\n\n%s' % ( - type(e), ConsistencyLevel.value_to_name[consistency_level], + self.fail('Instead of success, saw %s for CL.%s:\n\n%s' % ( + e, ConsistencyLevel.value_to_name[consistency_level], traceback.format_exc())) def _cl_expected_failure(self, cl): self.fail('Test passed at ConsistencyLevel.%s:\n\n%s' % ( ConsistencyLevel.value_to_name[cl], traceback.format_exc())) + def _insert(self, session, keyspace, count, consistency_level=ConsistencyLevel.ONE): + session.execute('USE %s' % keyspace) + for i in range(count): + ss = SimpleStatement('INSERT INTO cf(k, i) VALUES (0, 0)', + consistency_level=consistency_level) + session.execute(ss) - def test_rfone_tokenaware(self): - keyspace = 'test_rfone_tokenaware' + def _query(self, session, keyspace, count, consistency_level=ConsistencyLevel.ONE): + routing_key = struct.pack('>i', 0) + for i in range(count): + ss = SimpleStatement('SELECT * FROM cf WHERE k = 0', + consistency_level=consistency_level, + routing_key=routing_key) + self.coordinator_stats.add_coordinator(session.execute_async(ss)) + + def _assert_writes_succeed(self, session, keyspace, consistency_levels): + for cl in consistency_levels: + self.coordinator_stats.reset_counts() + try: + self._insert(session, keyspace, 1, cl) + except Exception as e: + self._cl_failure(cl, e) + + def _assert_reads_succeed(self, session, keyspace, consistency_levels, expected_reader=3): + for cl in consistency_levels: + self.coordinator_stats.reset_counts() + try: + self._query(session, keyspace, 1, cl) + for i in range(3): + if i == expected_reader: + self.coordinator_stats.assert_query_count_equals(self, i, 1) + else: + self.coordinator_stats.assert_query_count_equals(self, i, 0) + except Exception as e: + self._cl_failure(cl, e) + + def _assert_writes_fail(self, session, keyspace, consistency_levels): + for cl in consistency_levels: + self.coordinator_stats.reset_counts() + try: + self._insert(session, keyspace, 1, cl) + self._cl_expected_failure(cl) + except (cassandra.Unavailable, cassandra.WriteTimeout): + pass + + def _assert_reads_fail(self, session, keyspace, consistency_levels): + for cl in consistency_levels: + self.coordinator_stats.reset_counts() + try: + self._query(session, keyspace, 1, cl) + self._cl_expected_failure(cl) + except (cassandra.Unavailable, cassandra.ReadTimeout): + pass + + def _test_tokenaware_one_node_down(self, keyspace, rf, accepted): cluster = Cluster( load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) session = cluster.connect() wait_for_up(cluster, 1, wait=False) wait_for_up(cluster, 2) - create_schema(session, keyspace, replication_factor=1) - self.cs.init(session, keyspace, 12) - self.cs.query(session, keyspace, 12) - - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 12) - self.cs.assert_queried(self, 3, 0) + 
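# Editor's note on _assert_reads_succeed() above: there appears to be an
# off-by-one -- the nodes are numbered 1..3 (127.0.0.1-3), but
# 'for i in range(3)' iterates 0..2, so with the default expected_reader=3
# the expected node's count of 1 is never asserted at all (node 0 is checked
# instead, trivially passing at 0). A corrected loop (an editorial
# suggestion, not in the patch) would be:
#
#     for i in range(1, 4):
#         expected = 1 if i == expected_reader else 0
#         self.coordinator_stats.assert_query_count_equals(self, i, expected)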
create_schema(session, keyspace, replication_factor=rf) + self._insert(session, keyspace, count=1) + self._query(session, keyspace, count=1) + self.coordinator_stats.assert_query_count_equals(self, 1, 0) + self.coordinator_stats.assert_query_count_equals(self, 2, 1) + self.coordinator_stats.assert_query_count_equals(self, 3, 0) try: - self.cs.reset_coordinators() force_stop(2) wait_for_down(cluster, 2) - accepted_list = [ConsistencyLevel.ANY] - - fail_list = [ - ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL, - ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM - ] - - # Test writes that expected to complete successfully - for cl in accepted_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - except Exception as e: - self._cl_failure(cl, e) - - # Test reads that expected to complete successfully - for cl in accepted_list: - try: - self.cs.query(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.ANY]: - self._cl_failure(cl, e) - except Exception as e: - self._cl_failure(cl, e) - - # Test writes that expected to fail - for cl in fail_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except (cassandra.Unavailable, cassandra.WriteTimeout) as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - - # Test reads that expected to fail - for cl in fail_list: - try: - self.cs.query(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) + self._assert_writes_succeed(session, keyspace, accepted) + self._assert_reads_succeed(session, keyspace, + accepted - set([ConsistencyLevel.ANY])) + self._assert_writes_fail(session, keyspace, + SINGLE_DC_CONSISTENCY_LEVELS - accepted) + self._assert_reads_fail(session, keyspace, + SINGLE_DC_CONSISTENCY_LEVELS - accepted) finally: start(2) wait_for_up(cluster, 2) + def test_rfone_tokenaware_one_node_down(self): + self._test_tokenaware_one_node_down( + keyspace='test_rfone_tokenaware', + rf=1, + accepted=set([ConsistencyLevel.ANY])) - def test_rftwo_tokenaware(self): - keyspace = 'test_rftwo_tokenaware' - cluster = Cluster( - load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) - session = cluster.connect() - wait_for_up(cluster, 1, wait=False) - wait_for_up(cluster, 2) - - create_schema(session, keyspace, replication_factor=2) - self.cs.init(session, keyspace, 12) - self.cs.query(session, keyspace, 12) - - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 12) - self.cs.assert_queried(self, 3, 0) - - try: - self.cs.reset_coordinators() - force_stop(2) - wait_for_down(cluster, 2) - - accepted_list = [ - ConsistencyLevel.ANY, - ConsistencyLevel.ONE - ] - - fail_list = [ - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - 
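# Editor's note on the parametrized tests above: each scenario now passes
# only its 'accepted' set, and everything else in
# SINGLE_DC_CONSISTENCY_LEVELS is derived as the expected-to-fail set, with
# reads additionally dropping ANY (a write-only level). A toy version of the
# set arithmetic:
SINGLE_DC = frozenset(['ANY', 'ONE', 'TWO', 'QUORUM', 'THREE', 'ALL'])
accepted = frozenset(['ANY', 'ONE'])          # rf=2 with one node down
should_fail = SINGLE_DC - accepted
readable = accepted - frozenset(['ANY'])
assert should_fail == frozenset(['TWO', 'QUORUM', 'THREE', 'ALL'])
assert readable == frozenset(['ONE'])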
ConsistencyLevel.ALL, - ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM - ] - - # Test writes that expected to complete successfully - for cl in accepted_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - except Exception as e: - self._cl_failure(cl, e) - - # Test reads that expected to complete successfully - for cl in accepted_list: - try: - self.cs.reset_coordinators() - self.cs.query(session, keyspace, 12, consistency_level=cl) - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 0) - self.cs.assert_queried(self, 3, 12) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.ANY]: - self._cl_failure(cl, e) - except Exception as e: - self._cl_failure(cl, e) - - # Test writes that expected to fail - for cl in fail_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - - # Test reads that expected to fail - for cl in fail_list: - try: - self.cs.query(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - finally: - start(2) - wait_for_up(cluster, 2) - - def test_rfthree_tokenaware(self): - keyspace = 'test_rfthree_tokenaware' - cluster = Cluster( - load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) - session = cluster.connect() - - create_schema(session, keyspace, replication_factor=3) - self.cs.init(session, keyspace, 12) - self.cs.query(session, keyspace, 12) - - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 12) - self.cs.assert_queried(self, 3, 0) - - try: - self.cs.reset_coordinators() - force_stop(2) - wait_for_down(cluster, 2) - - accepted_list = [ - ConsistencyLevel.ANY, - ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM - ] - - fail_list = [ - ConsistencyLevel.THREE, - ConsistencyLevel.ALL, - ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM - ] - - # Test writes that expected to complete successfully - for cl in accepted_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - except Exception as e: - self._cl_failure(cl, e) - - # Test reads that expected to complete successfully - for cl in accepted_list: - try: - self.cs.reset_coordinators() - self.cs.query(session, keyspace, 12, consistency_level=cl) - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 0) - self.cs.assert_queried(self, 3, 12) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.ANY]: - self._cl_failure(cl, e) - except Exception as e: - self._cl_failure(cl, e) - - # Test writes that expected to fail - for cl in fail_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - 
ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - - # Test reads that expected to fail - for cl in fail_list: - try: - self.cs.query(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - finally: - start(2) - wait_for_up(cluster, 2) + def test_rftwo_tokenaware_one_node_down(self): + self._test_tokenaware_one_node_down( + keyspace='test_rftwo_tokenaware', + rf=2, + accepted=set([ConsistencyLevel.ANY, ConsistencyLevel.ONE])) + def test_rfthree_tokenaware_one_node_down(self): + self._test_tokenaware_one_node_down( + keyspace='test_rfthree_tokenaware', + rf=3, + accepted=set([ConsistencyLevel.ANY, ConsistencyLevel.ONE, + ConsistencyLevel.TWO, ConsistencyLevel.QUORUM])) def test_rfthree_tokenaware_none_down(self): keyspace = 'test_rfthree_tokenaware_none_down' @@ -312,256 +153,58 @@ class ConsistencyTests(unittest.TestCase): wait_for_up(cluster, 2) create_schema(session, keyspace, replication_factor=3) - self.cs.init(session, keyspace, 12) - self.cs.query(session, keyspace, 12) + self._insert(session, keyspace, count=1) + self._query(session, keyspace, count=1) + self.coordinator_stats.assert_query_count_equals(self, 1, 0) + self.coordinator_stats.assert_query_count_equals(self, 2, 1) + self.coordinator_stats.assert_query_count_equals(self, 3, 0) - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 12) - self.cs.assert_queried(self, 3, 0) + self.coordinator_stats.reset_counts() - self.cs.reset_coordinators() + self._assert_writes_succeed(session, keyspace, SINGLE_DC_CONSISTENCY_LEVELS) + self._assert_reads_succeed(session, keyspace, + SINGLE_DC_CONSISTENCY_LEVELS - set([ConsistencyLevel.ANY]), + expected_reader=2) - accepted_list = [ - ConsistencyLevel.ANY, - ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL, - ] + def _test_downgrading_cl(self, keyspace, rf, accepted): + cluster = Cluster( + load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), + default_retry_policy=DowngradingConsistencyRetryPolicy()) + session = cluster.connect() - fail_list = [ - ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM - ] + create_schema(session, keyspace, replication_factor=rf) + self._insert(session, keyspace, 1) + self._query(session, keyspace, 1) + self.coordinator_stats.assert_query_count_equals(self, 1, 0) + self.coordinator_stats.assert_query_count_equals(self, 2, 1) + self.coordinator_stats.assert_query_count_equals(self, 3, 0) - # Test writes that expected to complete successfully - for cl in accepted_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - except Exception as e: - self._cl_failure(cl, e) - - # Test reads that expected to complete successfully - for cl in accepted_list: - try: - self.cs.reset_coordinators() - self.cs.query(session, keyspace, 12, consistency_level=cl) - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 12) - self.cs.assert_queried(self, 3, 0) - except cassandra.InvalidRequest as e: - if not 
cl in [ConsistencyLevel.ANY]: - self._cl_failure(cl, e) - except Exception as e: - self._cl_failure(cl, e) - - # Test writes that expected to fail - for cl in fail_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - - # Test reads that expected to fail - for cl in fail_list: - try: - self.cs.query(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) + try: + force_stop(2) + wait_for_down(cluster, 2) + self._assert_writes_succeed(session, keyspace, accepted) + self._assert_reads_succeed(session, keyspace, + accepted - set([ConsistencyLevel.ANY])) + self._assert_writes_fail(session, keyspace, + SINGLE_DC_CONSISTENCY_LEVELS - accepted) + self._assert_reads_fail(session, keyspace, + SINGLE_DC_CONSISTENCY_LEVELS - accepted) + finally: + start(2) + wait_for_up(cluster, 2) def test_rfone_downgradingcl(self): - keyspace = 'test_rfone_downgradingcl' - cluster = Cluster( - load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), - default_retry_policy=DowngradingConsistencyRetryPolicy()) - session = cluster.connect() - - create_schema(session, keyspace, replication_factor=1) - self.cs.init(session, keyspace, 12) - self.cs.query(session, keyspace, 12) - - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 12) - self.cs.assert_queried(self, 3, 0) - - try: - self.cs.reset_coordinators() - force_stop(2) - wait_for_down(cluster, 2) - - accepted_list = [ - ConsistencyLevel.ANY - ] - - fail_list = [ - ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL, - ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM - ] - - # Test writes that expected to complete successfully - for cl in accepted_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - except Exception as e: - self._cl_failure(cl, e) - - # Test reads that expected to complete successfully - for cl in accepted_list: - try: - self.cs.reset_coordinators() - self.cs.query(session, keyspace, 12, consistency_level=cl) - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 0) - self.cs.assert_queried(self, 3, 12) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.ANY]: - self._cl_failure(cl, e) - except Exception as e: - self._cl_failure(cl, e) - - # Test writes that expected to fail - for cl in fail_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - 
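# Editor's note for context on the downgrading tests above: on an
# Unavailable error, DowngradingConsistencyRetryPolicy retries at a
# consistency level the live replicas can still satisfy (e.g. QUORUM
# downgraded toward ONE with rf=2 and one replica stopped), which is why
# 'accepted' grows to every single-DC level for rf=2 but stays at ANY for
# rf=1, where the only replica is gone. A rough, illustrative shape of the
# decision (not the driver's code):
def downgrade(alive_replicas):
    if alive_replicas >= 3:
        return 'THREE'
    if alive_replicas == 2:
        return 'TWO'
    if alive_replicas == 1:
        return 'ONE'
    return None    # nothing can succeed; rethrow

assert downgrade(1) == 'ONE'
assert downgrade(0) is None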
self._cl_failure(cl, e) - - # Test reads that expected to fail - for cl in fail_list: - try: - self.cs.query(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.Unavailable as e: - if not cl in [ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL]: - self._cl_failure(cl, e) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - finally: - start(2) - wait_for_up(cluster, 2) - + self._test_downgrading_cl( + keyspace='test_rfone_downgradingcl', + rf=1, + accepted=set([ConsistencyLevel.ANY])) def test_rftwo_downgradingcl(self): - keyspace = 'test_rftwo_downgradingcl' - cluster = Cluster( - load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), - default_retry_policy=DowngradingConsistencyRetryPolicy()) - session = cluster.connect() - - create_schema(session, keyspace, replication_factor=2) - self.cs.init(session, keyspace, 12) - self.cs.query(session, keyspace, 12) - - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 12) - self.cs.assert_queried(self, 3, 0) - - try: - self.cs.reset_coordinators() - force_stop(2) - wait_for_down(cluster, 2) - - accepted_list = [ - ConsistencyLevel.ANY, - ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL - ] - - fail_list = [ - ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM - ] - - # Test writes that expected to complete successfully - for cl in accepted_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - except Exception as e: - self._cl_failure(cl, e) - - # Test reads that expected to complete successfully - for cl in accepted_list: - try: - self.cs.reset_coordinators() - self.cs.query(session, keyspace, 12, consistency_level=cl) - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 0) - self.cs.assert_queried(self, 3, 12) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.ANY]: - self._cl_failure(cl, e) - except Exception as e: - self._cl_failure(cl, e) - - # Test writes that expected to fail - for cl in fail_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - - # Test reads that expected to fail - for cl in fail_list: - try: - self.cs.query(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - finally: - start(2) - wait_for_up(cluster, 2) - + self._test_downgrading_cl( + keyspace='test_rftwo_downgradingcl', + rf=2, + accepted=SINGLE_DC_CONSISTENCY_LEVELS) def test_rfthree_roundrobin_downgradingcl(self): keyspace = 'test_rfthree_roundrobin_downgradingcl' @@ -581,82 +224,37 @@ class ConsistencyTests(unittest.TestCase): session = cluster.connect() create_schema(session, keyspace, replication_factor=2) - self.cs.init(session, keyspace, 12) - self.cs.query(session, keyspace, 12) + self._insert(session, keyspace, count=12) + self._query(session, keyspace, count=12) if roundrobin: - self.cs.assert_queried(self, 1, 4) - self.cs.assert_queried(self, 2, 4) - self.cs.assert_queried(self, 3, 4) + 
self.coordinator_stats.assert_query_count_equals(self, 1, 4) + self.coordinator_stats.assert_query_count_equals(self, 2, 4) + self.coordinator_stats.assert_query_count_equals(self, 3, 4) else: - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 12) - self.cs.assert_queried(self, 3, 0) + self.coordinator_stats.assert_query_count_equals(self, 1, 0) + self.coordinator_stats.assert_query_count_equals(self, 2, 12) + self.coordinator_stats.assert_query_count_equals(self, 3, 0) try: - self.cs.reset_coordinators() + self.coordinator_stats.reset_counts() force_stop(2) wait_for_down(cluster, 2) - accepted_list = [ - ConsistencyLevel.ANY, - ConsistencyLevel.ONE, - ConsistencyLevel.TWO, - ConsistencyLevel.QUORUM, - ConsistencyLevel.THREE, - ConsistencyLevel.ALL - ] - - fail_list = [ - ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM - ] - - # Test writes that expected to complete successfully - for cl in accepted_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - except Exception as e: - self._cl_failure(cl, e) + self._assert_writes_succeed(session, keyspace, SINGLE_DC_CONSISTENCY_LEVELS) # Test reads that expected to complete successfully - for cl in accepted_list: - try: - self.cs.reset_coordinators() - self.cs.query(session, keyspace, 12, consistency_level=cl) - if roundrobin: - self.cs.assert_queried(self, 1, 6) - self.cs.assert_queried(self, 2, 0) - self.cs.assert_queried(self, 3, 6) - else: - self.cs.assert_queried(self, 1, 0) - self.cs.assert_queried(self, 2, 0) - self.cs.assert_queried(self, 3, 12) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.ANY]: - self._cl_failure(cl, e) - except Exception as e: - self._cl_failure(cl, e) - - # Test writes that expected to fail - for cl in fail_list: - try: - self.cs.init(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) - - # Test reads that expected to fail - for cl in fail_list: - try: - self.cs.query(session, keyspace, 12, consistency_level=cl) - self._cl_expected_failure(cl) - except cassandra.InvalidRequest as e: - if not cl in [ConsistencyLevel.LOCAL_QUORUM, - ConsistencyLevel.EACH_QUORUM]: - self._cl_failure(cl, e) + for cl in SINGLE_DC_CONSISTENCY_LEVELS - set([ConsistencyLevel.ANY]): + self.coordinator_stats.reset_counts() + self._query(session, keyspace, 12, consistency_level=cl) + if roundrobin: + self.coordinator_stats.assert_query_count_equals(self, 1, 6) + self.coordinator_stats.assert_query_count_equals(self, 2, 0) + self.coordinator_stats.assert_query_count_equals(self, 3, 6) + else: + self.coordinator_stats.assert_query_count_equals(self, 1, 0) + self.coordinator_stats.assert_query_count_equals(self, 2, 0) + self.coordinator_stats.assert_query_count_equals(self, 3, 12) finally: start(2) wait_for_up(cluster, 2) diff --git a/tests/integration/long/utils.py b/tests/integration/long/utils.py index f132531a..e8e4ca44 100644 --- a/tests/integration/long/utils.py +++ b/tests/integration/long/utils.py @@ -1,11 +1,8 @@ import logging -import struct import time from collections import defaultdict -from cassandra.query import SimpleStatement -from cassandra import ConsistencyLevel from tests.integration import get_node @@ -13,59 +10,26 @@ log = logging.getLogger(__name__) class CoordinatorStats(): + def __init__(self): - self.coordinators = defaultdict(int) + self.coordinator_counts = 
defaultdict(int) def add_coordinator(self, future): coordinator = future._current_host.address - self.coordinators[coordinator] += 1 + self.coordinator_counts[coordinator] += 1 if future._errors: - log.error('future._errors: %s' % future._errors) + log.error('future._errors: %s', future._errors) future.result() + def reset_counts(self): + self.coordinator_counts = defaultdict(int) - def reset_coordinators(self): - self.coordinators = defaultdict(int) - - - def get_queried(self, node): - ip = '127.0.0.%s' % node - if not ip in self.coordinators: - return 0 - return self.coordinators[ip] - - - def assert_queried(self, testcase, node, n): - ip = '127.0.0.%s' % node - if ip in self.coordinators: - if self.coordinators[ip] == n: - return - testcase.fail('IP: %s. Expected: %s. Received: %s. Full detail: %s.' % ( - ip, n, self.coordinators[ip], self.coordinators)) - else: - if n == 0: - return - testcase.fail('IP: %s. Expected: %s. Received: %s. Full detail: %s.' % ( - ip, n, 0, self.coordinators)) - - - def init(self, session, keyspace, n, consistency_level=ConsistencyLevel.ONE): - self.reset_coordinators() - session.execute('USE %s' % keyspace) - for i in range(n): - ss = SimpleStatement('INSERT INTO %s(k, i) VALUES (0, 0)' % 'cf', - consistency_level=consistency_level) - session.execute(ss) - - - def query(self, session, keyspace, count, consistency_level=ConsistencyLevel.ONE): - routing_key = struct.pack('>i', 0) - for i in range(count): - ss = SimpleStatement('SELECT * FROM %s WHERE k = 0' % 'cf', - consistency_level=consistency_level, - routing_key=routing_key) - self.add_coordinator(session.execute_async(ss)) + def assert_query_count_equals(self, testcase, node, expected): + ip = '127.0.0.%d' % node + if self.coordinator_counts[ip] != expected: + testcase.fail('Expected %d queries to %s, but got %d. Query counts: %s' % ( + expected, ip, self.coordinator_counts[ip], dict(self.coordinator_counts))) def create_schema(session, keyspace, simple_strategy=True, @@ -75,22 +39,22 @@ def create_schema(session, keyspace, simple_strategy=True, 'SELECT keyspace_name FROM system.schema_keyspaces') existing_keyspaces = [row[0] for row in results] if keyspace in existing_keyspaces: - session.execute('DROP KEYSPACE %s' % keyspace) + session.execute('DROP KEYSPACE %s' % keyspace, timeout=10) if simple_strategy: ddl = "CREATE KEYSPACE %s WITH replication" \ " = {'class': 'SimpleStrategy', 'replication_factor': '%s'}" - session.execute(ddl % (keyspace, replication_factor)) + session.execute(ddl % (keyspace, replication_factor), timeout=10) else: if not replication_strategy: raise Exception('replication_strategy is not set') ddl = "CREATE KEYSPACE %s" \ " WITH replication = { 'class' : 'NetworkTopologyStrategy', %s }" - session.execute(ddl % (keyspace, str(replication_strategy)[1:-1])) + session.execute(ddl % (keyspace, str(replication_strategy)[1:-1]), timeout=10) ddl = 'CREATE TABLE %s.cf (k int PRIMARY KEY, i int)' - session.execute(ddl % keyspace) + session.execute(ddl % keyspace, timeout=10) session.execute('USE %s' % keyspace)
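# Editor's note to close the series (a usage sketch under assumed setup;
# cluster/session/statement construction elided, names as defined above):
#
#     stats = CoordinatorStats()
#     create_schema(session, 'ks', replication_factor=3)  # DDL with timeout=10
#     for _ in range(12):
#         stats.add_coordinator(session.execute_async(ss))
#     stats.assert_query_count_equals(testcase, 2, 12)
#
# The explicit timeout=10 on the keyspace/table DDL puts a fixed cap on the
# slow schema round-trips against the three-node test cluster instead of
# relying on the driver default, and assert_query_count_equals() fails with
# the full per-node count map, so distribution mismatches read at a glance.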