diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index 7cdb7143..8a873813 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -22,7 +22,7 @@ from threading import Event from subprocess import call from itertools import groupby -from cassandra import OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure +from cassandra import OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure, AlreadyExists from cassandra.cluster import Cluster from cassandra.protocol import ConfigurationException @@ -262,7 +262,8 @@ def execute_until_pass(session, query): while tries < 100: try: return session.execute(query) - except ConfigurationException: + except (ConfigurationException, AlreadyExists): + log.warn("Recieved already exists from query {0} not exiting".format(query)) # keyspace/table was already created/dropped return except (OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure): @@ -274,6 +275,36 @@ def execute_until_pass(session, query): raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(query)) +def execute_with_long_wait_retry(session, query, timeout=30): + tries = 0 + while tries < 10: + try: + return session.execute(query, timeout=timeout) + except (ConfigurationException, AlreadyExists): + log.warn("Recieved already exists from query {0} not exiting".format(query)) + # keyspace/table was already created/dropped + return + except (OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure): + ex_type, ex, tb = sys.exc_info() + log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) + del tb + tries += 1 + + raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(query)) + + +def drop_keyspace_shutdown_cluster(keyspace_name, session, cluster): + try: + execute_with_long_wait_retry(session, "DROP KEYSPACE {0}".format(keyspace_name)) + except: + log.warn("Error encountered when droping keyspace {0}".format(keyspace_name)) + ex_type, ex, tb = sys.exc_info() + log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) + del tb + log.warn("Shutting down cluster") + cluster.shutdown() + + def setup_keyspace(ipformat=None): # wait for nodes to startup time.sleep(10) @@ -292,23 +323,23 @@ def setup_keyspace(ipformat=None): ddl = ''' CREATE KEYSPACE test3rf WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}''' - execute_until_pass(session, ddl) + execute_with_long_wait_retry(session, ddl) ddl = ''' CREATE KEYSPACE test2rf WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '2'}''' - execute_until_pass(session, ddl) + execute_with_long_wait_retry(session, ddl) ddl = ''' CREATE KEYSPACE test1rf WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}''' - execute_until_pass(session, ddl) + execute_with_long_wait_retry(session, ddl) ddl = ''' CREATE TABLE test3rf.test ( k int PRIMARY KEY, v int )''' - execute_until_pass(session, ddl) + execute_with_long_wait_retry(session, ddl) except Exception: traceback.print_exc() @@ -356,12 +387,12 @@ class BasicKeyspaceUnitTestCase(unittest.TestCase): @classmethod def drop_keyspace(cls): - execute_until_pass(cls.session, "DROP KEYSPACE {0}".format(cls.ks_name)) + execute_with_long_wait_retry(cls.session, "DROP KEYSPACE {0}".format(cls.ks_name)) @classmethod def create_keyspace(cls, rf): ddl = "CREATE KEYSPACE {0} WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': '{1}'}}".format(cls.ks_name, rf) - execute_until_pass(cls.session, ddl) + execute_with_long_wait_retry(cls.session, ddl) @classmethod def common_setup(cls, rf, create_class_table=False, skip_if_cass_version_less_than=None): @@ -369,6 +400,7 @@ class BasicKeyspaceUnitTestCase(unittest.TestCase): cls.session = cls.cluster.connect() cls.ks_name = cls.__name__.lower() cls.create_keyspace(rf) + cls.cass_version = get_server_versions() if create_class_table: @@ -401,8 +433,7 @@ class BasicSharedKeyspaceUnitTestCase(BasicKeyspaceUnitTestCase): @classmethod def tearDownClass(cls): - cls.drop_keyspace() - cls.cluster.shutdown() + drop_keyspace_shutdown_cluster(cls.keyspace_name, cls.session, cls.cluster) class BasicSharedKeyspaceUnitTestCaseWTable(BasicSharedKeyspaceUnitTestCase): @@ -479,5 +510,4 @@ class BasicSegregatedKeyspaceUnitTestCase(BasicKeyspaceUnitTestCase): self.common_setup(1) def tearDown(self): - self.drop_keyspace() - self.cluster.shutdown() + drop_keyspace_shutdown_cluster(self.keyspace_name, self.session, self.cluster) diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index fb5ccf6f..9baaf9fb 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -31,7 +31,7 @@ from cassandra.policies import (RoundRobinPolicy, ExponentialReconnectionPolicy, from cassandra.protocol import MAX_SUPPORTED_VERSION from cassandra.query import SimpleStatement, TraceUnavailable -from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions, get_node, CASSANDRA_VERSION, execute_until_pass +from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions, get_node, CASSANDRA_VERSION, execute_until_pass, execute_with_long_wait_retry from tests.integration.util import assert_quiescent_pool_state @@ -80,7 +80,7 @@ class ClusterTests(unittest.TestCase): """) self.assertFalse(result) - result = execute_until_pass(session, + result = execute_with_long_wait_retry(session, """ CREATE TABLE clustertests.cf0 ( a text, @@ -100,7 +100,7 @@ class ClusterTests(unittest.TestCase): result = session.execute("SELECT * FROM clustertests.cf0") self.assertEqual([('a', 'b', 'c')], result) - execute_until_pass(session, "DROP KEYSPACE clustertests") + execute_with_long_wait_retry(session, "DROP KEYSPACE clustertests") cluster.shutdown() diff --git a/tests/integration/standard/test_custom_protocol_handler.py b/tests/integration/standard/test_custom_protocol_handler.py index ce16e78c..1427b3b0 100644 --- a/tests/integration/standard/test_custom_protocol_handler.py +++ b/tests/integration/standard/test_custom_protocol_handler.py @@ -20,7 +20,7 @@ except ImportError: from cassandra.protocol import ProtocolHandler, ResultMessage, UUIDType, read_int, EventMessage from cassandra.query import tuple_factory from cassandra.cluster import Cluster -from tests.integration import use_singledc, PROTOCOL_VERSION, execute_until_pass +from tests.integration import use_singledc, PROTOCOL_VERSION, drop_keyspace_shutdown_cluster from tests.integration.datatype_utils import update_datatypes, PRIMITIVE_DATATYPES from tests.integration.standard.utils import create_table_with_all_types, get_all_primitive_params from six import binary_type @@ -44,8 +44,7 @@ class CustomProtocolHandlerTest(unittest.TestCase): @classmethod def tearDownClass(cls): - cls.session.execute("DROP KEYSPACE custserdes") - cls.cluster.shutdown() + drop_keyspace_shutdown_cluster("custserdes", cls.session, cls.cluster) def test_custom_raw_uuid_row_results(self): """ diff --git a/tests/integration/standard/test_cython_protocol_handlers.py b/tests/integration/standard/test_cython_protocol_handlers.py index a61a1fc2..7532426e 100644 --- a/tests/integration/standard/test_cython_protocol_handlers.py +++ b/tests/integration/standard/test_cython_protocol_handlers.py @@ -11,7 +11,7 @@ from cassandra.query import tuple_factory from cassandra.cluster import Cluster from cassandra.protocol import ProtocolHandler, LazyProtocolHandler, NumpyProtocolHandler -from tests.integration import use_singledc, PROTOCOL_VERSION, notprotocolv1 +from tests.integration import use_singledc, PROTOCOL_VERSION, notprotocolv1, drop_keyspace_shutdown_cluster from tests.integration.datatype_utils import update_datatypes from tests.integration.standard.utils import ( create_table_with_all_types, get_all_primitive_params, get_primitive_datatypes) @@ -38,8 +38,7 @@ class CythonProtocolHandlerTest(unittest.TestCase): @classmethod def tearDownClass(cls): - cls.session.execute("DROP KEYSPACE testspace") - cls.cluster.shutdown() + drop_keyspace_shutdown_cluster("testspace", cls.session, cls.session) @cythontest def test_cython_parser(self): diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index b2d8fc1a..052d831e 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -34,7 +34,7 @@ from cassandra.policies import SimpleConvictionPolicy from cassandra.pool import Host from tests.integration import get_cluster, use_singledc, PROTOCOL_VERSION, get_server_versions, execute_until_pass, \ - BasicSegregatedKeyspaceUnitTestCase, BasicSharedKeyspaceUnitTestCase + BasicSegregatedKeyspaceUnitTestCase, BasicSharedKeyspaceUnitTestCase, drop_keyspace_shutdown_cluster def setup_module(): @@ -1863,7 +1863,6 @@ class BadMetaTest(unittest.TestCase): class BadMetaException(Exception): pass - @property def function_name(self): return self._testMethodName.lower() @@ -1880,8 +1879,7 @@ class BadMetaTest(unittest.TestCase): @classmethod def teardown_class(cls): - cls.session.execute("DROP KEYSPACE %s" % cls.keyspace_name) - cls.cluster.shutdown() + drop_keyspace_shutdown_cluster(cls.keyspace_name, cls.session, cls.cluster) def _skip_if_not_version(self, version): if CASS_SERVER_VERSION < version: diff --git a/tests/integration/standard/test_metrics.py b/tests/integration/standard/test_metrics.py index b2a046ba..70da6710 100644 --- a/tests/integration/standard/test_metrics.py +++ b/tests/integration/standard/test_metrics.py @@ -32,18 +32,23 @@ def setup_module(): class MetricsTests(unittest.TestCase): + def setUp(self): + self.cluster = Cluster(metrics_enabled=True, protocol_version=PROTOCOL_VERSION) + self.session = self.cluster.connect("test3rf") + + def tearDown(self): + self.cluster.shutdown() + def test_connection_error(self): """ Trigger and ensure connection_errors are counted Stop all node with the driver knowing about the "DOWN" states. """ - cluster = Cluster(metrics_enabled=True, protocol_version=PROTOCOL_VERSION) - session = cluster.connect("test3rf") # Test writes for i in range(0, 100): - session.execute_async("INSERT INTO test (k, v) VALUES ({0}, {1})".format(i, i)) + self.session.execute_async("INSERT INTO test (k, v) VALUES ({0}, {1})".format(i, i)) # Stop the cluster get_cluster().stop(wait=True, gently=False) @@ -52,14 +57,13 @@ class MetricsTests(unittest.TestCase): # Ensure the nodes are actually down query = SimpleStatement("SELECT * FROM test", consistency_level=ConsistencyLevel.ALL) with self.assertRaises(NoHostAvailable): - session.execute(query) + self.session.execute(query) finally: get_cluster().start(wait_for_binary_proto=True, wait_other_notice=True) # Give some time for the cluster to come back up, for the next test time.sleep(5) - self.assertGreater(cluster.metrics.stats.connection_errors, 0) - cluster.shutdown() + self.assertGreater(self.cluster.metrics.stats.connection_errors, 0) def test_write_timeout(self): """ @@ -68,15 +72,12 @@ class MetricsTests(unittest.TestCase): Attempt a write at cl.ALL and receive a WriteTimeout. """ - cluster = Cluster(metrics_enabled=True, protocol_version=PROTOCOL_VERSION) - session = cluster.connect("test3rf") - # Test write - session.execute("INSERT INTO test (k, v) VALUES (1, 1)") + self.session.execute("INSERT INTO test (k, v) VALUES (1, 1)") # Assert read query = SimpleStatement("SELECT * FROM test WHERE k=1", consistency_level=ConsistencyLevel.ALL) - results = execute_until_pass(session, query) + results = execute_until_pass(self.session, query) self.assertTrue(results) # Pause node so it shows as unreachable to coordinator @@ -86,14 +87,12 @@ class MetricsTests(unittest.TestCase): # Test write query = SimpleStatement("INSERT INTO test (k, v) VALUES (2, 2)", consistency_level=ConsistencyLevel.ALL) with self.assertRaises(WriteTimeout): - session.execute(query, timeout=None) - self.assertEqual(1, cluster.metrics.stats.write_timeouts) + self.session.execute(query, timeout=None) + self.assertEqual(1, self.cluster.metrics.stats.write_timeouts) finally: get_node(1).resume() - cluster.shutdown() - def test_read_timeout(self): """ Trigger and ensure read_timeouts are counted @@ -101,15 +100,13 @@ class MetricsTests(unittest.TestCase): Attempt a read at cl.ALL and receive a ReadTimeout. """ - cluster = Cluster(metrics_enabled=True, protocol_version=PROTOCOL_VERSION) - session = cluster.connect("test3rf") # Test write - session.execute("INSERT INTO test (k, v) VALUES (1, 1)") + self.session.execute("INSERT INTO test (k, v) VALUES (1, 1)") # Assert read query = SimpleStatement("SELECT * FROM test WHERE k=1", consistency_level=ConsistencyLevel.ALL) - results = execute_until_pass(session, query) + results = execute_until_pass(self.session, query) self.assertTrue(results) # Pause node so it shows as unreachable to coordinator @@ -119,14 +116,12 @@ class MetricsTests(unittest.TestCase): # Test read query = SimpleStatement("SELECT * FROM test", consistency_level=ConsistencyLevel.ALL) with self.assertRaises(ReadTimeout): - session.execute(query, timeout=None) - self.assertEqual(1, cluster.metrics.stats.read_timeouts) + self.session.execute(query, timeout=None) + self.assertEqual(1, self.cluster.metrics.stats.read_timeouts) finally: get_node(1).resume() - cluster.shutdown() - def test_unavailable(self): """ Trigger and ensure unavailables are counted @@ -134,15 +129,12 @@ class MetricsTests(unittest.TestCase): Attempt an insert/read at cl.ALL and receive a Unavailable Exception. """ - cluster = Cluster(metrics_enabled=True, protocol_version=PROTOCOL_VERSION) - session = cluster.connect("test3rf") - # Test write - session.execute("INSERT INTO test (k, v) VALUES (1, 1)") + self.session.execute("INSERT INTO test (k, v) VALUES (1, 1)") # Assert read query = SimpleStatement("SELECT * FROM test WHERE k=1", consistency_level=ConsistencyLevel.ALL) - results = execute_until_pass(session, query) + results = execute_until_pass(self.session, query) self.assertTrue(results) # Stop node gracefully @@ -152,20 +144,20 @@ class MetricsTests(unittest.TestCase): # Test write query = SimpleStatement("INSERT INTO test (k, v) VALUES (2, 2)", consistency_level=ConsistencyLevel.ALL) with self.assertRaises(Unavailable): - session.execute(query) - self.assertEqual(1, cluster.metrics.stats.unavailables) + self.session.execute(query) + self.assertEqual(1, self.cluster.metrics.stats.unavailables) # Test write query = SimpleStatement("SELECT * FROM test", consistency_level=ConsistencyLevel.ALL) with self.assertRaises(Unavailable): - session.execute(query, timeout=None) - self.assertEqual(2, cluster.metrics.stats.unavailables) + self.session.execute(query, timeout=None) + self.assertEqual(2, self.cluster.metrics.stats.unavailables) finally: get_node(1).start(wait_other_notice=True, wait_for_binary_proto=True) # Give some time for the cluster to come back up, for the next test time.sleep(5) - cluster.shutdown() + self.cluster.shutdown() # def test_other_error(self): # # TODO: Bootstrapping or Overloaded cases