Consistency test cleanup

This commit is contained in:
Tyler Hobbs
2013-12-27 17:51:53 -06:00
parent 5a64fa2b67
commit ed842e9440
2 changed files with 179 additions and 617 deletions

View File

@@ -1,10 +1,13 @@
import struct
import traceback
import cassandra
import cassandra
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy, \
DowngradingConsistencyRetryPolicy
from cassandra.query import SimpleStatement
from tests.integration.long.utils import force_stop, create_schema, \
wait_for_down, wait_for_up, start, CoordinatorStats
@@ -13,295 +16,133 @@ try:
except ImportError:
import unittest # noqa
ALL_CONSISTENCY_LEVELS = set([
ConsistencyLevel.ANY, ConsistencyLevel.ONE, ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM, ConsistencyLevel.THREE,
ConsistencyLevel.ALL, ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM])
MULTI_DC_CONSISTENCY_LEVELS = set([
ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.EACH_QUORUM])
SINGLE_DC_CONSISTENCY_LEVELS = ALL_CONSISTENCY_LEVELS - MULTI_DC_CONSISTENCY_LEVELS
class ConsistencyTests(unittest.TestCase):
def setUp(self):
self.cs = CoordinatorStats()
self.coordinator_stats = CoordinatorStats()
def _cl_failure(self, consistency_level, e):
self.fail('%s seen for CL.%s:\n\n%s' % (
type(e), ConsistencyLevel.value_to_name[consistency_level],
self.fail('Instead of success, saw %s for CL.%s:\n\n%s' % (
e, ConsistencyLevel.value_to_name[consistency_level],
traceback.format_exc()))
def _cl_expected_failure(self, cl):
self.fail('Test passed at ConsistencyLevel.%s:\n\n%s' % (
ConsistencyLevel.value_to_name[cl], traceback.format_exc()))
def _insert(self, session, keyspace, count, consistency_level=ConsistencyLevel.ONE):
session.execute('USE %s' % keyspace)
for i in range(count):
ss = SimpleStatement('INSERT INTO cf(k, i) VALUES (0, 0)',
consistency_level=consistency_level)
session.execute(ss)
def test_rfone_tokenaware(self):
keyspace = 'test_rfone_tokenaware'
def _query(self, session, keyspace, count, consistency_level=ConsistencyLevel.ONE):
routing_key = struct.pack('>i', 0)
for i in range(count):
ss = SimpleStatement('SELECT * FROM cf WHERE k = 0',
consistency_level=consistency_level,
routing_key=routing_key)
self.coordinator_stats.add_coordinator(session.execute_async(ss))
def _assert_writes_succeed(self, session, keyspace, consistency_levels):
for cl in consistency_levels:
self.coordinator_stats.reset_counts()
try:
self._insert(session, keyspace, 1, cl)
except Exception as e:
self._cl_failure(cl, e)
def _assert_reads_succeed(self, session, keyspace, consistency_levels, expected_reader=3):
for cl in consistency_levels:
self.coordinator_stats.reset_counts()
try:
self._query(session, keyspace, 1, cl)
for i in range(3):
if i == expected_reader:
self.coordinator_stats.assert_query_count_equals(self, i, 1)
else:
self.coordinator_stats.assert_query_count_equals(self, i, 0)
except Exception as e:
self._cl_failure(cl, e)
def _assert_writes_fail(self, session, keyspace, consistency_levels):
for cl in consistency_levels:
self.coordinator_stats.reset_counts()
try:
self._insert(session, keyspace, 1, cl)
self._cl_expected_failure(cl)
except (cassandra.Unavailable, cassandra.WriteTimeout):
pass
def _assert_reads_fail(self, session, keyspace, consistency_levels):
for cl in consistency_levels:
self.coordinator_stats.reset_counts()
try:
self._query(session, keyspace, 1, cl)
self._cl_expected_failure(cl)
except (cassandra.Unavailable, cassandra.ReadTimeout):
pass
def _test_tokenaware_one_node_down(self, keyspace, rf, accepted):
cluster = Cluster(
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()))
session = cluster.connect()
wait_for_up(cluster, 1, wait=False)
wait_for_up(cluster, 2)
create_schema(session, keyspace, replication_factor=1)
self.cs.init(session, keyspace, 12)
self.cs.query(session, keyspace, 12)
self.cs.assert_queried(self, 1, 0)
self.cs.assert_queried(self, 2, 12)
self.cs.assert_queried(self, 3, 0)
create_schema(session, keyspace, replication_factor=rf)
self._insert(session, keyspace, count=1)
self._query(session, keyspace, count=1)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 1)
self.coordinator_stats.assert_query_count_equals(self, 3, 0)
try:
self.cs.reset_coordinators()
force_stop(2)
wait_for_down(cluster, 2)
accepted_list = [ConsistencyLevel.ANY]
fail_list = [
ConsistencyLevel.ONE,
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM,
ConsistencyLevel.THREE,
ConsistencyLevel.ALL,
ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM
]
# Test writes that expected to complete successfully
for cl in accepted_list:
try:
self.cs.init(session, keyspace, 12, consistency_level=cl)
except Exception as e:
self._cl_failure(cl, e)
# Test reads that expected to complete successfully
for cl in accepted_list:
try:
self.cs.query(session, keyspace, 12, consistency_level=cl)
self._cl_expected_failure(cl)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.ANY]:
self._cl_failure(cl, e)
except Exception as e:
self._cl_failure(cl, e)
# Test writes that expected to fail
for cl in fail_list:
try:
self.cs.init(session, keyspace, 12, consistency_level=cl)
self._cl_expected_failure(cl)
except (cassandra.Unavailable, cassandra.WriteTimeout) as e:
if not cl in [ConsistencyLevel.ONE,
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM,
ConsistencyLevel.THREE,
ConsistencyLevel.ALL]:
self._cl_failure(cl, e)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM]:
self._cl_failure(cl, e)
# Test reads that expected to fail
for cl in fail_list:
try:
self.cs.query(session, keyspace, 12, consistency_level=cl)
self._cl_expected_failure(cl)
except cassandra.Unavailable as e:
if not cl in [ConsistencyLevel.ONE,
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM,
ConsistencyLevel.THREE,
ConsistencyLevel.ALL]:
self._cl_failure(cl, e)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM]:
self._cl_failure(cl, e)
self._assert_writes_succeed(session, keyspace, accepted)
self._assert_reads_succeed(session, keyspace,
accepted - set([ConsistencyLevel.ANY]))
self._assert_writes_fail(session, keyspace,
SINGLE_DC_CONSISTENCY_LEVELS - accepted)
self._assert_reads_fail(session, keyspace,
SINGLE_DC_CONSISTENCY_LEVELS - accepted)
finally:
start(2)
wait_for_up(cluster, 2)
def test_rfone_tokenaware_one_node_down(self):
self._test_tokenaware_one_node_down(
keyspace='test_rfone_tokenaware',
rf=1,
accepted=set([ConsistencyLevel.ANY]))
def test_rftwo_tokenaware(self):
keyspace = 'test_rftwo_tokenaware'
cluster = Cluster(
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()))
session = cluster.connect()
wait_for_up(cluster, 1, wait=False)
wait_for_up(cluster, 2)
create_schema(session, keyspace, replication_factor=2)
self.cs.init(session, keyspace, 12)
self.cs.query(session, keyspace, 12)
self.cs.assert_queried(self, 1, 0)
self.cs.assert_queried(self, 2, 12)
self.cs.assert_queried(self, 3, 0)
try:
self.cs.reset_coordinators()
force_stop(2)
wait_for_down(cluster, 2)
accepted_list = [
ConsistencyLevel.ANY,
ConsistencyLevel.ONE
]
fail_list = [
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM,
ConsistencyLevel.THREE,
ConsistencyLevel.ALL,
ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM
]
# Test writes that expected to complete successfully
for cl in accepted_list:
try:
self.cs.init(session, keyspace, 12, consistency_level=cl)
except Exception as e:
self._cl_failure(cl, e)
# Test reads that expected to complete successfully
for cl in accepted_list:
try:
self.cs.reset_coordinators()
self.cs.query(session, keyspace, 12, consistency_level=cl)
self.cs.assert_queried(self, 1, 0)
self.cs.assert_queried(self, 2, 0)
self.cs.assert_queried(self, 3, 12)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.ANY]:
self._cl_failure(cl, e)
except Exception as e:
self._cl_failure(cl, e)
# Test writes that expected to fail
for cl in fail_list:
try:
self.cs.init(session, keyspace, 12, consistency_level=cl)
self._cl_expected_failure(cl)
except cassandra.Unavailable as e:
if not cl in [ConsistencyLevel.ONE,
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM,
ConsistencyLevel.THREE,
ConsistencyLevel.ALL]:
self._cl_failure(cl, e)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM]:
self._cl_failure(cl, e)
# Test reads that expected to fail
for cl in fail_list:
try:
self.cs.query(session, keyspace, 12, consistency_level=cl)
self._cl_expected_failure(cl)
except cassandra.Unavailable as e:
if not cl in [ConsistencyLevel.ONE,
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM,
ConsistencyLevel.THREE,
ConsistencyLevel.ALL]:
self._cl_failure(cl, e)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM]:
self._cl_failure(cl, e)
finally:
start(2)
wait_for_up(cluster, 2)
def test_rfthree_tokenaware(self):
keyspace = 'test_rfthree_tokenaware'
cluster = Cluster(
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()))
session = cluster.connect()
create_schema(session, keyspace, replication_factor=3)
self.cs.init(session, keyspace, 12)
self.cs.query(session, keyspace, 12)
self.cs.assert_queried(self, 1, 0)
self.cs.assert_queried(self, 2, 12)
self.cs.assert_queried(self, 3, 0)
try:
self.cs.reset_coordinators()
force_stop(2)
wait_for_down(cluster, 2)
accepted_list = [
ConsistencyLevel.ANY,
ConsistencyLevel.ONE,
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM
]
fail_list = [
ConsistencyLevel.THREE,
ConsistencyLevel.ALL,
ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM
]
# Test writes that expected to complete successfully
for cl in accepted_list:
try:
self.cs.init(session, keyspace, 12, consistency_level=cl)
except Exception as e:
self._cl_failure(cl, e)
# Test reads that expected to complete successfully
for cl in accepted_list:
try:
self.cs.reset_coordinators()
self.cs.query(session, keyspace, 12, consistency_level=cl)
self.cs.assert_queried(self, 1, 0)
self.cs.assert_queried(self, 2, 0)
self.cs.assert_queried(self, 3, 12)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.ANY]:
self._cl_failure(cl, e)
except Exception as e:
self._cl_failure(cl, e)
# Test writes that expected to fail
for cl in fail_list:
try:
self.cs.init(session, keyspace, 12, consistency_level=cl)
self._cl_expected_failure(cl)
except cassandra.Unavailable as e:
if not cl in [ConsistencyLevel.ONE,
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM,
ConsistencyLevel.THREE,
ConsistencyLevel.ALL]:
self._cl_failure(cl, e)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM]:
self._cl_failure(cl, e)
# Test reads that expected to fail
for cl in fail_list:
try:
self.cs.query(session, keyspace, 12, consistency_level=cl)
self._cl_expected_failure(cl)
except cassandra.Unavailable as e:
if not cl in [ConsistencyLevel.ONE,
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM,
ConsistencyLevel.THREE,
ConsistencyLevel.ALL]:
self._cl_failure(cl, e)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM]:
self._cl_failure(cl, e)
finally:
start(2)
wait_for_up(cluster, 2)
def test_rftwo_tokenaware_one_node_down(self):
self._test_tokenaware_one_node_down(
keyspace='test_rftwo_tokenaware',
rf=2,
accepted=set([ConsistencyLevel.ANY, ConsistencyLevel.ONE]))
def test_rfthree_tokenaware_one_node_down(self):
self._test_tokenaware_one_node_down(
keyspace='test_rfthree_tokenaware',
rf=3,
accepted=set([ConsistencyLevel.ANY, ConsistencyLevel.ONE,
ConsistencyLevel.TWO, ConsistencyLevel.QUORUM]))
def test_rfthree_tokenaware_none_down(self):
keyspace = 'test_rfthree_tokenaware_none_down'
@@ -312,256 +153,58 @@ class ConsistencyTests(unittest.TestCase):
wait_for_up(cluster, 2)
create_schema(session, keyspace, replication_factor=3)
self.cs.init(session, keyspace, 12)
self.cs.query(session, keyspace, 12)
self._insert(session, keyspace, count=1)
self._query(session, keyspace, count=1)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 1)
self.coordinator_stats.assert_query_count_equals(self, 3, 0)
self.cs.assert_queried(self, 1, 0)
self.cs.assert_queried(self, 2, 12)
self.cs.assert_queried(self, 3, 0)
self.coordinator_stats.reset_counts()
self.cs.reset_coordinators()
self._assert_writes_succeed(session, keyspace, SINGLE_DC_CONSISTENCY_LEVELS)
self._assert_reads_succeed(session, keyspace,
SINGLE_DC_CONSISTENCY_LEVELS - set([ConsistencyLevel.ANY]),
expected_reader=2)
accepted_list = [
ConsistencyLevel.ANY,
ConsistencyLevel.ONE,
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM,
ConsistencyLevel.THREE,
ConsistencyLevel.ALL,
]
def _test_downgrading_cl(self, keyspace, rf, accepted):
cluster = Cluster(
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
default_retry_policy=DowngradingConsistencyRetryPolicy())
session = cluster.connect()
fail_list = [
ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM
]
create_schema(session, keyspace, replication_factor=rf)
self._insert(session, keyspace, 1)
self._query(session, keyspace, 1)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 1)
self.coordinator_stats.assert_query_count_equals(self, 3, 0)
# Test writes that expected to complete successfully
for cl in accepted_list:
try:
self.cs.init(session, keyspace, 12, consistency_level=cl)
except Exception as e:
self._cl_failure(cl, e)
# Test reads that expected to complete successfully
for cl in accepted_list:
try:
self.cs.reset_coordinators()
self.cs.query(session, keyspace, 12, consistency_level=cl)
self.cs.assert_queried(self, 1, 0)
self.cs.assert_queried(self, 2, 12)
self.cs.assert_queried(self, 3, 0)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.ANY]:
self._cl_failure(cl, e)
except Exception as e:
self._cl_failure(cl, e)
# Test writes that expected to fail
for cl in fail_list:
try:
self.cs.init(session, keyspace, 12, consistency_level=cl)
self._cl_expected_failure(cl)
except cassandra.Unavailable as e:
if not cl in [ConsistencyLevel.ONE,
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM,
ConsistencyLevel.THREE,
ConsistencyLevel.ALL]:
self._cl_failure(cl, e)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM]:
self._cl_failure(cl, e)
# Test reads that expected to fail
for cl in fail_list:
try:
self.cs.query(session, keyspace, 12, consistency_level=cl)
self._cl_expected_failure(cl)
except cassandra.Unavailable as e:
if not cl in [ConsistencyLevel.ONE,
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM,
ConsistencyLevel.THREE,
ConsistencyLevel.ALL]:
self._cl_failure(cl, e)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM]:
self._cl_failure(cl, e)
try:
force_stop(2)
wait_for_down(cluster, 2)
self._assert_writes_succeed(session, keyspace, accepted)
self._assert_reads_succeed(session, keyspace,
accepted - set([ConsistencyLevel.ANY]))
self._assert_writes_fail(session, keyspace,
SINGLE_DC_CONSISTENCY_LEVELS - accepted)
self._assert_reads_fail(session, keyspace,
SINGLE_DC_CONSISTENCY_LEVELS - accepted)
finally:
start(2)
wait_for_up(cluster, 2)
def test_rfone_downgradingcl(self):
keyspace = 'test_rfone_downgradingcl'
cluster = Cluster(
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
default_retry_policy=DowngradingConsistencyRetryPolicy())
session = cluster.connect()
create_schema(session, keyspace, replication_factor=1)
self.cs.init(session, keyspace, 12)
self.cs.query(session, keyspace, 12)
self.cs.assert_queried(self, 1, 0)
self.cs.assert_queried(self, 2, 12)
self.cs.assert_queried(self, 3, 0)
try:
self.cs.reset_coordinators()
force_stop(2)
wait_for_down(cluster, 2)
accepted_list = [
ConsistencyLevel.ANY
]
fail_list = [
ConsistencyLevel.ONE,
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM,
ConsistencyLevel.THREE,
ConsistencyLevel.ALL,
ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM
]
# Test writes that expected to complete successfully
for cl in accepted_list:
try:
self.cs.init(session, keyspace, 12, consistency_level=cl)
except Exception as e:
self._cl_failure(cl, e)
# Test reads that expected to complete successfully
for cl in accepted_list:
try:
self.cs.reset_coordinators()
self.cs.query(session, keyspace, 12, consistency_level=cl)
self.cs.assert_queried(self, 1, 0)
self.cs.assert_queried(self, 2, 0)
self.cs.assert_queried(self, 3, 12)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.ANY]:
self._cl_failure(cl, e)
except Exception as e:
self._cl_failure(cl, e)
# Test writes that expected to fail
for cl in fail_list:
try:
self.cs.init(session, keyspace, 12, consistency_level=cl)
self._cl_expected_failure(cl)
except cassandra.Unavailable as e:
if not cl in [ConsistencyLevel.ONE,
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM,
ConsistencyLevel.THREE,
ConsistencyLevel.ALL]:
self._cl_failure(cl, e)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM]:
self._cl_failure(cl, e)
# Test reads that expected to fail
for cl in fail_list:
try:
self.cs.query(session, keyspace, 12, consistency_level=cl)
self._cl_expected_failure(cl)
except cassandra.Unavailable as e:
if not cl in [ConsistencyLevel.ONE,
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM,
ConsistencyLevel.THREE,
ConsistencyLevel.ALL]:
self._cl_failure(cl, e)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM]:
self._cl_failure(cl, e)
finally:
start(2)
wait_for_up(cluster, 2)
self._test_downgrading_cl(
keyspace='test_rfone_downgradingcl',
rf=1,
accepted=set([ConsistencyLevel.ANY]))
def test_rftwo_downgradingcl(self):
keyspace = 'test_rftwo_downgradingcl'
cluster = Cluster(
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
default_retry_policy=DowngradingConsistencyRetryPolicy())
session = cluster.connect()
create_schema(session, keyspace, replication_factor=2)
self.cs.init(session, keyspace, 12)
self.cs.query(session, keyspace, 12)
self.cs.assert_queried(self, 1, 0)
self.cs.assert_queried(self, 2, 12)
self.cs.assert_queried(self, 3, 0)
try:
self.cs.reset_coordinators()
force_stop(2)
wait_for_down(cluster, 2)
accepted_list = [
ConsistencyLevel.ANY,
ConsistencyLevel.ONE,
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM,
ConsistencyLevel.THREE,
ConsistencyLevel.ALL
]
fail_list = [
ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM
]
# Test writes that expected to complete successfully
for cl in accepted_list:
try:
self.cs.init(session, keyspace, 12, consistency_level=cl)
except Exception as e:
self._cl_failure(cl, e)
# Test reads that expected to complete successfully
for cl in accepted_list:
try:
self.cs.reset_coordinators()
self.cs.query(session, keyspace, 12, consistency_level=cl)
self.cs.assert_queried(self, 1, 0)
self.cs.assert_queried(self, 2, 0)
self.cs.assert_queried(self, 3, 12)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.ANY]:
self._cl_failure(cl, e)
except Exception as e:
self._cl_failure(cl, e)
# Test writes that expected to fail
for cl in fail_list:
try:
self.cs.init(session, keyspace, 12, consistency_level=cl)
self._cl_expected_failure(cl)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM]:
self._cl_failure(cl, e)
# Test reads that expected to fail
for cl in fail_list:
try:
self.cs.query(session, keyspace, 12, consistency_level=cl)
self._cl_expected_failure(cl)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM]:
self._cl_failure(cl, e)
finally:
start(2)
wait_for_up(cluster, 2)
self._test_downgrading_cl(
keyspace='test_rftwo_downgradingcl',
rf=2,
accepted=SINGLE_DC_CONSISTENCY_LEVELS)
def test_rfthree_roundrobin_downgradingcl(self):
keyspace = 'test_rfthree_roundrobin_downgradingcl'
@@ -581,82 +224,37 @@ class ConsistencyTests(unittest.TestCase):
session = cluster.connect()
create_schema(session, keyspace, replication_factor=2)
self.cs.init(session, keyspace, 12)
self.cs.query(session, keyspace, 12)
self._insert(session, keyspace, count=12)
self._query(session, keyspace, count=12)
if roundrobin:
self.cs.assert_queried(self, 1, 4)
self.cs.assert_queried(self, 2, 4)
self.cs.assert_queried(self, 3, 4)
self.coordinator_stats.assert_query_count_equals(self, 1, 4)
self.coordinator_stats.assert_query_count_equals(self, 2, 4)
self.coordinator_stats.assert_query_count_equals(self, 3, 4)
else:
self.cs.assert_queried(self, 1, 0)
self.cs.assert_queried(self, 2, 12)
self.cs.assert_queried(self, 3, 0)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 12)
self.coordinator_stats.assert_query_count_equals(self, 3, 0)
try:
self.cs.reset_coordinators()
self.coordinator_stats.reset_counts()
force_stop(2)
wait_for_down(cluster, 2)
accepted_list = [
ConsistencyLevel.ANY,
ConsistencyLevel.ONE,
ConsistencyLevel.TWO,
ConsistencyLevel.QUORUM,
ConsistencyLevel.THREE,
ConsistencyLevel.ALL
]
fail_list = [
ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM
]
# Test writes that expected to complete successfully
for cl in accepted_list:
try:
self.cs.init(session, keyspace, 12, consistency_level=cl)
except Exception as e:
self._cl_failure(cl, e)
self._assert_writes_succeed(session, keyspace, SINGLE_DC_CONSISTENCY_LEVELS)
# Test reads that expected to complete successfully
for cl in accepted_list:
try:
self.cs.reset_coordinators()
self.cs.query(session, keyspace, 12, consistency_level=cl)
if roundrobin:
self.cs.assert_queried(self, 1, 6)
self.cs.assert_queried(self, 2, 0)
self.cs.assert_queried(self, 3, 6)
else:
self.cs.assert_queried(self, 1, 0)
self.cs.assert_queried(self, 2, 0)
self.cs.assert_queried(self, 3, 12)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.ANY]:
self._cl_failure(cl, e)
except Exception as e:
self._cl_failure(cl, e)
# Test writes that expected to fail
for cl in fail_list:
try:
self.cs.init(session, keyspace, 12, consistency_level=cl)
self._cl_expected_failure(cl)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM]:
self._cl_failure(cl, e)
# Test reads that expected to fail
for cl in fail_list:
try:
self.cs.query(session, keyspace, 12, consistency_level=cl)
self._cl_expected_failure(cl)
except cassandra.InvalidRequest as e:
if not cl in [ConsistencyLevel.LOCAL_QUORUM,
ConsistencyLevel.EACH_QUORUM]:
self._cl_failure(cl, e)
for cl in SINGLE_DC_CONSISTENCY_LEVELS - set([ConsistencyLevel.ANY]):
self.coordinator_stats.reset_counts()
self._query(session, keyspace, 12, consistency_level=cl)
if roundrobin:
self.coordinator_stats.assert_query_count_equals(self, 1, 6)
self.coordinator_stats.assert_query_count_equals(self, 2, 0)
self.coordinator_stats.assert_query_count_equals(self, 3, 6)
else:
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 0)
self.coordinator_stats.assert_query_count_equals(self, 3, 12)
finally:
start(2)
wait_for_up(cluster, 2)

View File

@@ -1,11 +1,8 @@
import logging
import struct
import time
from collections import defaultdict
from cassandra.query import SimpleStatement
from cassandra import ConsistencyLevel
from tests.integration import get_node
@@ -13,59 +10,26 @@ log = logging.getLogger(__name__)
class CoordinatorStats():
def __init__(self):
self.coordinators = defaultdict(int)
self.coordinator_counts = defaultdict(int)
def add_coordinator(self, future):
coordinator = future._current_host.address
self.coordinators[coordinator] += 1
self.coordinator_counts[coordinator] += 1
if future._errors:
log.error('future._errors: %s' % future._errors)
log.error('future._errors: %s', future._errors)
future.result()
def reset_counts(self):
self.coordinator_counts = defaultdict(int)
def reset_coordinators(self):
self.coordinators = defaultdict(int)
def get_queried(self, node):
ip = '127.0.0.%s' % node
if not ip in self.coordinators:
return 0
return self.coordinators[ip]
def assert_queried(self, testcase, node, n):
ip = '127.0.0.%s' % node
if ip in self.coordinators:
if self.coordinators[ip] == n:
return
testcase.fail('IP: %s. Expected: %s. Received: %s. Full detail: %s.' % (
ip, n, self.coordinators[ip], self.coordinators))
else:
if n == 0:
return
testcase.fail('IP: %s. Expected: %s. Received: %s. Full detail: %s.' % (
ip, n, 0, self.coordinators))
def init(self, session, keyspace, n, consistency_level=ConsistencyLevel.ONE):
self.reset_coordinators()
session.execute('USE %s' % keyspace)
for i in range(n):
ss = SimpleStatement('INSERT INTO %s(k, i) VALUES (0, 0)' % 'cf',
consistency_level=consistency_level)
session.execute(ss)
def query(self, session, keyspace, count, consistency_level=ConsistencyLevel.ONE):
routing_key = struct.pack('>i', 0)
for i in range(count):
ss = SimpleStatement('SELECT * FROM %s WHERE k = 0' % 'cf',
consistency_level=consistency_level,
routing_key=routing_key)
self.add_coordinator(session.execute_async(ss))
def assert_query_count_equals(self, testcase, node, expected):
ip = '127.0.0.%d' % node
if self.coordinator_counts[ip] != expected:
testcase.fail('Expected %d queries to %s, but got %d. Query counts: %s' % (
expected, ip, self.coordinator_counts[ip], dict(self.coordinator_counts)))
def create_schema(session, keyspace, simple_strategy=True,
@@ -75,22 +39,22 @@ def create_schema(session, keyspace, simple_strategy=True,
'SELECT keyspace_name FROM system.schema_keyspaces')
existing_keyspaces = [row[0] for row in results]
if keyspace in existing_keyspaces:
session.execute('DROP KEYSPACE %s' % keyspace)
session.execute('DROP KEYSPACE %s' % keyspace, timeout=10)
if simple_strategy:
ddl = "CREATE KEYSPACE %s WITH replication" \
" = {'class': 'SimpleStrategy', 'replication_factor': '%s'}"
session.execute(ddl % (keyspace, replication_factor))
session.execute(ddl % (keyspace, replication_factor), timeout=10)
else:
if not replication_strategy:
raise Exception('replication_strategy is not set')
ddl = "CREATE KEYSPACE %s" \
" WITH replication = { 'class' : 'NetworkTopologyStrategy', %s }"
session.execute(ddl % (keyspace, str(replication_strategy)[1:-1]))
session.execute(ddl % (keyspace, str(replication_strategy)[1:-1]), timeout=10)
ddl = 'CREATE TABLE %s.cf (k int PRIMARY KEY, i int)'
session.execute(ddl % keyspace)
session.execute(ddl % keyspace, timeout=10)
session.execute('USE %s' % keyspace)