From fb8aeada275a2a9e878d251487a6fc4ff8e6db6e Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Thu, 12 Dec 2013 19:25:46 -0600 Subject: [PATCH] ConsistencyTests now work. Added SchemaTests as well. --- cassandra/metadata.py | 9 ++-- tests/integration/long/__init__.py | 1 - tests/integration/long/test_consistency.py | 44 +++++++++---------- tests/integration/long/test_schema.py | 51 ++++++++++++++++++++++ tests/integration/long/utils.py | 47 ++++++++++++-------- 5 files changed, 106 insertions(+), 46 deletions(-) create mode 100644 tests/integration/long/test_schema.py diff --git a/cassandra/metadata.py b/cassandra/metadata.py index 8131bed6..359646a8 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -360,14 +360,13 @@ class SimpleStrategy(ReplicationStrategy): def make_token_replica_map(self, token_to_host_owner, ring): replica_map = {} for i in range(len(ring)): - j, hosts = 0, set() + j, hosts = 0, list() while len(hosts) < self.replication_factor and j < len(ring): token = ring[(i + j) % len(ring)] - hosts.add(token_to_host_owner[token]) + hosts.append(token_to_host_owner[token]) j += 1 replica_map[ring[i]] = hosts - return replica_map def export_for_schema(self): @@ -384,7 +383,7 @@ class NetworkTopologyStrategy(ReplicationStrategy): def make_token_replica_map(self, token_to_host_owner, ring): # note: this does not account for hosts having different racks - replica_map = defaultdict(set) + replica_map = defaultdict(list) ring_len = len(ring) ring_len_range = range(ring_len) dc_rf_map = dict((dc, int(rf)) @@ -401,7 +400,7 @@ class NetworkTopologyStrategy(ReplicationStrategy): # we already have all replicas for this DC continue - replica_map[ring[i]].add(host) + replica_map[ring[i]].append(host) if remaining[dc] == 1: del remaining[dc] diff --git a/tests/integration/long/__init__.py b/tests/integration/long/__init__.py index a1379440..e69de29b 100644 --- a/tests/integration/long/__init__.py +++ b/tests/integration/long/__init__.py @@ -1 +0,0 @@ -__author__ = 'joaquin' diff --git a/tests/integration/long/test_consistency.py b/tests/integration/long/test_consistency.py index d2d8ca24..fb976ce9 100644 --- a/tests/integration/long/test_consistency.py +++ b/tests/integration/long/test_consistency.py @@ -5,7 +5,7 @@ from cassandra.cluster import Cluster from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy from tests.integration.long.utils import reset_coordinators, force_stop, \ create_schema, init, query, assert_queried, wait_for_down, wait_for_up, \ - start + start, get_queried try: import unittest2 as unittest @@ -29,6 +29,8 @@ class ConsistencyTests(unittest.TestCase): cluster = Cluster( load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) session = cluster.connect() + wait_for_up(cluster, 1) + wait_for_up(cluster, 2) create_schema(session, keyspace, replication_factor=1) init(session, keyspace, 12) @@ -38,7 +40,6 @@ class ConsistencyTests(unittest.TestCase): assert_queried(2, 12) assert_queried(3, 0) - try: reset_coordinators() force_stop(2) @@ -57,12 +58,11 @@ class ConsistencyTests(unittest.TestCase): ] # Test writes that expected to complete successfully - # BUG: CL.ANY should work - # for cl in accepted_list: - # try: - # init(session, keyspace, 12, consistency_level=cl) - # except Exception as e: - # self._cl_failure(cl, e) + for cl in accepted_list: + try: + init(session, keyspace, 12, consistency_level=cl) + except Exception as e: + self._cl_failure(cl, e) # Test reads that expected to complete successfully for cl in accepted_list: @@ -118,18 +118,19 @@ class ConsistencyTests(unittest.TestCase): cluster = Cluster( load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) session = cluster.connect() + wait_for_up(cluster, 1) + wait_for_up(cluster, 2) create_schema(session, keyspace, replication_factor=2) init(session, keyspace, 12) - wait_for_up(cluster, 2) - - reset_coordinators() query(session, keyspace, 12) + assert_queried(1, 0) assert_queried(2, 12) assert_queried(3, 0) try: + reset_coordinators() force_stop(2) wait_for_down(cluster, 2) @@ -159,11 +160,6 @@ class ConsistencyTests(unittest.TestCase): try: reset_coordinators() query(session, keyspace, 12, consistency_level=cl) - # Bug: I believe the Java-Driver does this differently - # and RoundRobins after the ideal token is not available. - # I like the Python Driver's approach, but we should - # probably make all policies act the same way, whichever - # way gets chosen? assert_queried(1, 0) assert_queried(2, 0) assert_queried(3, 12) @@ -218,14 +214,14 @@ class ConsistencyTests(unittest.TestCase): create_schema(session, keyspace, replication_factor=3) init(session, keyspace, 12) - - reset_coordinators() query(session, keyspace, 12) + assert_queried(1, 0) assert_queried(2, 12) assert_queried(3, 0) try: + reset_coordinators() force_stop(2) wait_for_down(cluster, 2) @@ -255,10 +251,9 @@ class ConsistencyTests(unittest.TestCase): try: reset_coordinators() query(session, keyspace, 12, consistency_level=cl) - # Bug: I believe the Java-Driver does this differently - assert_queried(1, 12) + assert_queried(1, 0) assert_queried(2, 0) - assert_queried(3, 0) + assert_queried(3, 12) except cassandra.InvalidRequest as e: if not cl in [ConsistencyLevel.ANY]: self._cl_failure(cl, e) @@ -308,16 +303,19 @@ class ConsistencyTests(unittest.TestCase): cluster = Cluster( load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) session = cluster.connect() + wait_for_up(cluster, 1) + wait_for_up(cluster, 2) create_schema(session, keyspace, replication_factor=3) init(session, keyspace, 12) - - reset_coordinators() query(session, keyspace, 12) + assert_queried(1, 0) assert_queried(2, 12) assert_queried(3, 0) + reset_coordinators() + accepted_list = [ ConsistencyLevel.ANY, ConsistencyLevel.ONE, diff --git a/tests/integration/long/test_schema.py b/tests/integration/long/test_schema.py new file mode 100644 index 00000000..57484d46 --- /dev/null +++ b/tests/integration/long/test_schema.py @@ -0,0 +1,51 @@ +import logging +import cassandra + +from cassandra import ConsistencyLevel +from cassandra.cluster import Cluster +from cassandra.query import SimpleStatement + + +try: + import unittest2 as unittest +except ImportError: + import unittest # noqa + +log = logging.getLogger(__name__) + +class SchemaTests(unittest.TestCase): + def test_recreates(self): + cluster = Cluster() + session = cluster.connect() + + + replication_factor = 3 + + for i in range(2): + for keyspace in range(0, 100): + keyspace = 'ks_%s' % keyspace + results = session.execute('SELECT keyspace_name FROM system.schema_keyspaces') + existing_keyspaces = [row[0] for row in results] + if keyspace in existing_keyspaces: + ddl = 'DROP KEYSPACE %s' % keyspace + log.debug(ddl) + session.execute(ddl) + + ddl = "CREATE KEYSPACE %s WITH replication" \ + " = {'class': 'SimpleStrategy', 'replication_factor': '%s'}" % (keyspace, replication_factor) + log.debug(ddl) + session.execute(ddl) + + ddl = 'CREATE TABLE %s.cf (k int PRIMARY KEY, i int)' % keyspace + log.debug(ddl) + session.execute(ddl) + + statement = 'USE %s' % keyspace + log.debug(ddl) + session.execute(statement) + + statement = 'INSERT INTO %s(k, i) VALUES (0, 0)' % 'cf' + log.debug(statement) + ss = SimpleStatement(statement, + consistency_level=ConsistencyLevel.QUORUM) + session.execute(ss) diff --git a/tests/integration/long/utils.py b/tests/integration/long/utils.py index 43322efe..88ac1042 100644 --- a/tests/integration/long/utils.py +++ b/tests/integration/long/utils.py @@ -26,39 +26,52 @@ def reset_coordinators(): coordinators = {} +def get_queried(node): + ip = '127.0.0.%s' % node + if not ip in coordinators: + return 0 + return coordinators[ip] + + def assert_queried(node, n): ip = '127.0.0.%s' % node - print coordinators if ip in coordinators: if coordinators[ip] == n: return raise RuntimeError( - 'IP: %s. Expected: %s. Received: %s.' % (ip, n, coordinators[ip])) + 'IP: %s. Expected: %s. Received: %s. Full detail: %s.' % (ip, n, coordinators[ip], coordinators)) else: if n == 0: return - raise RuntimeError('IP: %s. Expected: %s. Received: %s.' % (ip, n, 0)) + raise RuntimeError('IP: %s. Expected: %s. Received: %s. Full detail: %s.' % (ip, n, 0, coordinators)) def create_schema(session, keyspace, replication_class='SimpleStrategy', replication_factor=1): + results = session.execute( 'SELECT keyspace_name FROM system.schema_keyspaces') existing_keyspaces = [row[0] for row in results] if keyspace in existing_keyspaces: session.execute('DROP KEYSPACE %s' % keyspace) + if replication_class == 'SimpleStrategy': - ddl = "\n CREATE KEYSPACE %s\n WITH replication" \ + ddl = "CREATE KEYSPACE %s WITH replication" \ " = {'class': 'SimpleStrategy', 'replication_factor': '%s'}" session.execute(ddl % (keyspace, replication_factor)) - ddl = '\n CREATE TABLE %s.cf (\n k int PRIMARY ' \ - 'KEY,\n i int)\n ' + + ddl = 'CREATE TABLE %s.cf (k int PRIMARY KEY, i int)' session.execute(ddl % keyspace) + session.execute('USE %s' % keyspace) + + # BUG: probably related to PYTHON-39 + time.sleep(5) def init(session, keyspace, n, consistency_level=ConsistencyLevel.ONE): reset_coordinators() - session.execute('USE %s' % keyspace) + # BUG: PYTHON-38 + # session.execute('USE %s' % keyspace) for i in range(n): ss = SimpleStatement('INSERT INTO %s(k, i) VALUES (0, 0)' % 'cf', consistency_level=consistency_level) @@ -85,24 +98,24 @@ def stop(node): def force_stop(node): get_node(node).stop(wait=False, gently=False) +def ring(node): + print 'From node%s:' % node + get_node(node).nodetool('ring') + def wait_for_up(cluster, node): while True: host = cluster.metadata.get_host('127.0.0.%s' % node) - if host and host.monitor.is_up: - # BUG: This shouldn't be needed. - # Ideally, host.monitor.is_up would be enough? - # If not, what should I be using? - # time.sleep(25) + if host and host.is_up: + # BUG: shouldn't have to, but we do + time.sleep(5) return def wait_for_down(cluster, node): while True: host = cluster.metadata.get_host('127.0.0.%s' % node) - if not host or not host.monitor.is_up: - # BUG: This shouldn't be needed. - # Ideally, host.monitor.is_up would be enough? - # If not, what should I be using? - # time.sleep(25) + if not host or not host.is_up: + # BUG: shouldn't have to, but we do + time.sleep(5) return