First Pass at hardening test harness, against leaky session and adding and removal issues

This commit is contained in:
GregBestland
2015-11-05 17:21:47 -06:00
parent 76712df0ce
commit c91615a931
6 changed files with 76 additions and 58 deletions

View File

@@ -22,7 +22,7 @@ from threading import Event
from subprocess import call
from itertools import groupby
from cassandra import OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure
from cassandra import OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure, AlreadyExists
from cassandra.cluster import Cluster
from cassandra.protocol import ConfigurationException
@@ -262,7 +262,8 @@ def execute_until_pass(session, query):
while tries < 100:
try:
return session.execute(query)
except ConfigurationException:
except (ConfigurationException, AlreadyExists):
log.warn("Recieved already exists from query {0} not exiting".format(query))
# keyspace/table was already created/dropped
return
except (OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure):
@@ -274,6 +275,36 @@ def execute_until_pass(session, query):
raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(query))
def execute_with_long_wait_retry(session, query, timeout=30):
tries = 0
while tries < 10:
try:
return session.execute(query, timeout=timeout)
except (ConfigurationException, AlreadyExists):
log.warn("Recieved already exists from query {0} not exiting".format(query))
# keyspace/table was already created/dropped
return
except (OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure):
ex_type, ex, tb = sys.exc_info()
log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
del tb
tries += 1
raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(query))
def drop_keyspace_shutdown_cluster(keyspace_name, session, cluster):
try:
execute_with_long_wait_retry(session, "DROP KEYSPACE {0}".format(keyspace_name))
except:
log.warn("Error encountered when droping keyspace {0}".format(keyspace_name))
ex_type, ex, tb = sys.exc_info()
log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
del tb
log.warn("Shutting down cluster")
cluster.shutdown()
def setup_keyspace(ipformat=None):
# wait for nodes to startup
time.sleep(10)
@@ -292,23 +323,23 @@ def setup_keyspace(ipformat=None):
ddl = '''
CREATE KEYSPACE test3rf
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}'''
execute_until_pass(session, ddl)
execute_with_long_wait_retry(session, ddl)
ddl = '''
CREATE KEYSPACE test2rf
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '2'}'''
execute_until_pass(session, ddl)
execute_with_long_wait_retry(session, ddl)
ddl = '''
CREATE KEYSPACE test1rf
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}'''
execute_until_pass(session, ddl)
execute_with_long_wait_retry(session, ddl)
ddl = '''
CREATE TABLE test3rf.test (
k int PRIMARY KEY,
v int )'''
execute_until_pass(session, ddl)
execute_with_long_wait_retry(session, ddl)
except Exception:
traceback.print_exc()
@@ -356,12 +387,12 @@ class BasicKeyspaceUnitTestCase(unittest.TestCase):
@classmethod
def drop_keyspace(cls):
execute_until_pass(cls.session, "DROP KEYSPACE {0}".format(cls.ks_name))
execute_with_long_wait_retry(cls.session, "DROP KEYSPACE {0}".format(cls.ks_name))
@classmethod
def create_keyspace(cls, rf):
ddl = "CREATE KEYSPACE {0} WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': '{1}'}}".format(cls.ks_name, rf)
execute_until_pass(cls.session, ddl)
execute_with_long_wait_retry(cls.session, ddl)
@classmethod
def common_setup(cls, rf, create_class_table=False, skip_if_cass_version_less_than=None):
@@ -369,6 +400,7 @@ class BasicKeyspaceUnitTestCase(unittest.TestCase):
cls.session = cls.cluster.connect()
cls.ks_name = cls.__name__.lower()
cls.create_keyspace(rf)
cls.cass_version = get_server_versions()
if create_class_table:
@@ -401,8 +433,7 @@ class BasicSharedKeyspaceUnitTestCase(BasicKeyspaceUnitTestCase):
@classmethod
def tearDownClass(cls):
cls.drop_keyspace()
cls.cluster.shutdown()
drop_keyspace_shutdown_cluster(cls.keyspace_name, cls.session, cls.cluster)
class BasicSharedKeyspaceUnitTestCaseWTable(BasicSharedKeyspaceUnitTestCase):
@@ -479,5 +510,4 @@ class BasicSegregatedKeyspaceUnitTestCase(BasicKeyspaceUnitTestCase):
self.common_setup(1)
def tearDown(self):
self.drop_keyspace()
self.cluster.shutdown()
drop_keyspace_shutdown_cluster(self.keyspace_name, self.session, self.cluster)

View File

@@ -31,7 +31,7 @@ from cassandra.policies import (RoundRobinPolicy, ExponentialReconnectionPolicy,
from cassandra.protocol import MAX_SUPPORTED_VERSION
from cassandra.query import SimpleStatement, TraceUnavailable
from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions, get_node, CASSANDRA_VERSION, execute_until_pass
from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions, get_node, CASSANDRA_VERSION, execute_until_pass, execute_with_long_wait_retry
from tests.integration.util import assert_quiescent_pool_state
@@ -80,7 +80,7 @@ class ClusterTests(unittest.TestCase):
""")
self.assertFalse(result)
result = execute_until_pass(session,
result = execute_with_long_wait_retry(session,
"""
CREATE TABLE clustertests.cf0 (
a text,
@@ -100,7 +100,7 @@ class ClusterTests(unittest.TestCase):
result = session.execute("SELECT * FROM clustertests.cf0")
self.assertEqual([('a', 'b', 'c')], result)
execute_until_pass(session, "DROP KEYSPACE clustertests")
execute_with_long_wait_retry(session, "DROP KEYSPACE clustertests")
cluster.shutdown()

View File

@@ -20,7 +20,7 @@ except ImportError:
from cassandra.protocol import ProtocolHandler, ResultMessage, UUIDType, read_int, EventMessage
from cassandra.query import tuple_factory
from cassandra.cluster import Cluster
from tests.integration import use_singledc, PROTOCOL_VERSION, execute_until_pass
from tests.integration import use_singledc, PROTOCOL_VERSION, drop_keyspace_shutdown_cluster
from tests.integration.datatype_utils import update_datatypes, PRIMITIVE_DATATYPES
from tests.integration.standard.utils import create_table_with_all_types, get_all_primitive_params
from six import binary_type
@@ -44,8 +44,7 @@ class CustomProtocolHandlerTest(unittest.TestCase):
@classmethod
def tearDownClass(cls):
cls.session.execute("DROP KEYSPACE custserdes")
cls.cluster.shutdown()
drop_keyspace_shutdown_cluster("custserdes", cls.session, cls.cluster)
def test_custom_raw_uuid_row_results(self):
"""

View File

@@ -11,7 +11,7 @@ from cassandra.query import tuple_factory
from cassandra.cluster import Cluster
from cassandra.protocol import ProtocolHandler, LazyProtocolHandler, NumpyProtocolHandler
from tests.integration import use_singledc, PROTOCOL_VERSION, notprotocolv1
from tests.integration import use_singledc, PROTOCOL_VERSION, notprotocolv1, drop_keyspace_shutdown_cluster
from tests.integration.datatype_utils import update_datatypes
from tests.integration.standard.utils import (
create_table_with_all_types, get_all_primitive_params, get_primitive_datatypes)
@@ -38,8 +38,7 @@ class CythonProtocolHandlerTest(unittest.TestCase):
@classmethod
def tearDownClass(cls):
cls.session.execute("DROP KEYSPACE testspace")
cls.cluster.shutdown()
drop_keyspace_shutdown_cluster("testspace", cls.session, cls.session)
@cythontest
def test_cython_parser(self):

View File

@@ -34,7 +34,7 @@ from cassandra.policies import SimpleConvictionPolicy
from cassandra.pool import Host
from tests.integration import get_cluster, use_singledc, PROTOCOL_VERSION, get_server_versions, execute_until_pass, \
BasicSegregatedKeyspaceUnitTestCase, BasicSharedKeyspaceUnitTestCase
BasicSegregatedKeyspaceUnitTestCase, BasicSharedKeyspaceUnitTestCase, drop_keyspace_shutdown_cluster
def setup_module():
@@ -1863,7 +1863,6 @@ class BadMetaTest(unittest.TestCase):
class BadMetaException(Exception):
pass
@property
def function_name(self):
return self._testMethodName.lower()
@@ -1880,8 +1879,7 @@ class BadMetaTest(unittest.TestCase):
@classmethod
def teardown_class(cls):
cls.session.execute("DROP KEYSPACE %s" % cls.keyspace_name)
cls.cluster.shutdown()
drop_keyspace_shutdown_cluster(cls.keyspace_name, cls.session, cls.cluster)
def _skip_if_not_version(self, version):
if CASS_SERVER_VERSION < version:

View File

@@ -32,18 +32,23 @@ def setup_module():
class MetricsTests(unittest.TestCase):
def setUp(self):
self.cluster = Cluster(metrics_enabled=True, protocol_version=PROTOCOL_VERSION)
self.session = self.cluster.connect("test3rf")
def tearDown(self):
self.cluster.shutdown()
def test_connection_error(self):
"""
Trigger and ensure connection_errors are counted
Stop all node with the driver knowing about the "DOWN" states.
"""
cluster = Cluster(metrics_enabled=True, protocol_version=PROTOCOL_VERSION)
session = cluster.connect("test3rf")
# Test writes
for i in range(0, 100):
session.execute_async("INSERT INTO test (k, v) VALUES ({0}, {1})".format(i, i))
self.session.execute_async("INSERT INTO test (k, v) VALUES ({0}, {1})".format(i, i))
# Stop the cluster
get_cluster().stop(wait=True, gently=False)
@@ -52,14 +57,13 @@ class MetricsTests(unittest.TestCase):
# Ensure the nodes are actually down
query = SimpleStatement("SELECT * FROM test", consistency_level=ConsistencyLevel.ALL)
with self.assertRaises(NoHostAvailable):
session.execute(query)
self.session.execute(query)
finally:
get_cluster().start(wait_for_binary_proto=True, wait_other_notice=True)
# Give some time for the cluster to come back up, for the next test
time.sleep(5)
self.assertGreater(cluster.metrics.stats.connection_errors, 0)
cluster.shutdown()
self.assertGreater(self.cluster.metrics.stats.connection_errors, 0)
def test_write_timeout(self):
"""
@@ -68,15 +72,12 @@ class MetricsTests(unittest.TestCase):
Attempt a write at cl.ALL and receive a WriteTimeout.
"""
cluster = Cluster(metrics_enabled=True, protocol_version=PROTOCOL_VERSION)
session = cluster.connect("test3rf")
# Test write
session.execute("INSERT INTO test (k, v) VALUES (1, 1)")
self.session.execute("INSERT INTO test (k, v) VALUES (1, 1)")
# Assert read
query = SimpleStatement("SELECT * FROM test WHERE k=1", consistency_level=ConsistencyLevel.ALL)
results = execute_until_pass(session, query)
results = execute_until_pass(self.session, query)
self.assertTrue(results)
# Pause node so it shows as unreachable to coordinator
@@ -86,14 +87,12 @@ class MetricsTests(unittest.TestCase):
# Test write
query = SimpleStatement("INSERT INTO test (k, v) VALUES (2, 2)", consistency_level=ConsistencyLevel.ALL)
with self.assertRaises(WriteTimeout):
session.execute(query, timeout=None)
self.assertEqual(1, cluster.metrics.stats.write_timeouts)
self.session.execute(query, timeout=None)
self.assertEqual(1, self.cluster.metrics.stats.write_timeouts)
finally:
get_node(1).resume()
cluster.shutdown()
def test_read_timeout(self):
"""
Trigger and ensure read_timeouts are counted
@@ -101,15 +100,13 @@ class MetricsTests(unittest.TestCase):
Attempt a read at cl.ALL and receive a ReadTimeout.
"""
cluster = Cluster(metrics_enabled=True, protocol_version=PROTOCOL_VERSION)
session = cluster.connect("test3rf")
# Test write
session.execute("INSERT INTO test (k, v) VALUES (1, 1)")
self.session.execute("INSERT INTO test (k, v) VALUES (1, 1)")
# Assert read
query = SimpleStatement("SELECT * FROM test WHERE k=1", consistency_level=ConsistencyLevel.ALL)
results = execute_until_pass(session, query)
results = execute_until_pass(self.session, query)
self.assertTrue(results)
# Pause node so it shows as unreachable to coordinator
@@ -119,14 +116,12 @@ class MetricsTests(unittest.TestCase):
# Test read
query = SimpleStatement("SELECT * FROM test", consistency_level=ConsistencyLevel.ALL)
with self.assertRaises(ReadTimeout):
session.execute(query, timeout=None)
self.assertEqual(1, cluster.metrics.stats.read_timeouts)
self.session.execute(query, timeout=None)
self.assertEqual(1, self.cluster.metrics.stats.read_timeouts)
finally:
get_node(1).resume()
cluster.shutdown()
def test_unavailable(self):
"""
Trigger and ensure unavailables are counted
@@ -134,15 +129,12 @@ class MetricsTests(unittest.TestCase):
Attempt an insert/read at cl.ALL and receive a Unavailable Exception.
"""
cluster = Cluster(metrics_enabled=True, protocol_version=PROTOCOL_VERSION)
session = cluster.connect("test3rf")
# Test write
session.execute("INSERT INTO test (k, v) VALUES (1, 1)")
self.session.execute("INSERT INTO test (k, v) VALUES (1, 1)")
# Assert read
query = SimpleStatement("SELECT * FROM test WHERE k=1", consistency_level=ConsistencyLevel.ALL)
results = execute_until_pass(session, query)
results = execute_until_pass(self.session, query)
self.assertTrue(results)
# Stop node gracefully
@@ -152,20 +144,20 @@ class MetricsTests(unittest.TestCase):
# Test write
query = SimpleStatement("INSERT INTO test (k, v) VALUES (2, 2)", consistency_level=ConsistencyLevel.ALL)
with self.assertRaises(Unavailable):
session.execute(query)
self.assertEqual(1, cluster.metrics.stats.unavailables)
self.session.execute(query)
self.assertEqual(1, self.cluster.metrics.stats.unavailables)
# Test write
query = SimpleStatement("SELECT * FROM test", consistency_level=ConsistencyLevel.ALL)
with self.assertRaises(Unavailable):
session.execute(query, timeout=None)
self.assertEqual(2, cluster.metrics.stats.unavailables)
self.session.execute(query, timeout=None)
self.assertEqual(2, self.cluster.metrics.stats.unavailables)
finally:
get_node(1).start(wait_other_notice=True, wait_for_binary_proto=True)
# Give some time for the cluster to come back up, for the next test
time.sleep(5)
cluster.shutdown()
self.cluster.shutdown()
# def test_other_error(self):
# # TODO: Bootstrapping or Overloaded cases