From fa1cb6809377f9f41643d4e44fb986ef94f2c692 Mon Sep 17 00:00:00 2001 From: Kishan Karunaratne Date: Fri, 12 Jun 2015 16:16:19 -0700 Subject: [PATCH] Windows testing support and stabilizations --- tests/integration/__init__.py | 117 +++++++++++------- tests/integration/cqlengine/__init__.py | 2 +- .../cqlengine/columns/test_validation.py | 2 +- .../integration/cqlengine/model/test_udts.py | 2 +- .../cqlengine/query/test_queryset.py | 7 +- tests/integration/long/test_consistency.py | 22 ++-- tests/integration/long/test_failure_types.py | 41 +++--- tests/integration/long/test_ipv6.py | 68 ++++++---- .../long/test_loadbalancingpolicies.py | 32 ++++- tests/integration/long/test_schema.py | 77 ++++-------- tests/integration/long/utils.py | 12 +- tests/integration/standard/test_cluster.py | 14 +-- tests/integration/standard/test_concurrent.py | 56 ++++++--- tests/integration/standard/test_metadata.py | 36 ++---- tests/integration/standard/test_metrics.py | 10 +- tests/integration/standard/test_query.py | 6 + tests/integration/standard/test_types.py | 9 +- tests/integration/standard/test_udts.py | 10 +- .../cqlengine => stress_tests}/test_load.py | 42 ++++--- .../test_multi_inserts.py | 20 ++- tests/unit/io/test_asyncorereactor.py | 8 +- tests/unit/io/test_libevreactor.py | 2 +- tests/unit/test_time_util.py | 4 +- 23 files changed, 346 insertions(+), 253 deletions(-) rename tests/{integration/cqlengine => stress_tests}/test_load.py (52%) rename tests/{integration/long => stress_tests}/test_multi_inserts.py (76%) mode change 100755 => 100644 diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index 1f43d6d6..3568b1ff 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -12,25 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -import time -import traceback - try: import unittest2 as unittest except ImportError: import unittest # noqa -import logging -log = logging.getLogger(__name__) - -import os +import os, six, time, sys, logging, traceback from threading import Event -import six from subprocess import call - from itertools import groupby +from cassandra import OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure from cassandra.cluster import Cluster +from cassandra.protocol import ConfigurationException try: from ccmlib.cluster import Cluster as CCMCluster @@ -39,6 +33,8 @@ try: except ImportError as e: CCMClusterFactory = None +log = logging.getLogger(__name__) + CLUSTER_NAME = 'test_cluster' SINGLE_NODE_CLUSTER_NAME = 'single_node' MULTIDC_CLUSTER_NAME = 'multidc_test_cluster' @@ -84,7 +80,7 @@ def _tuple_version(version_string): USE_CASS_EXTERNAL = bool(os.getenv('USE_CASS_EXTERNAL', False)) -default_cassandra_version = '2.1.3' +default_cassandra_version = '2.1.5' if USE_CASS_EXTERNAL: if CCMClusterFactory: @@ -157,10 +153,21 @@ def remove_cluster(): global CCM_CLUSTER if CCM_CLUSTER: - log.debug("removing cluster %s", CCM_CLUSTER.name) - CCM_CLUSTER.remove() - CCM_CLUSTER = None + log.debug("Removing cluster {0}".format(CCM_CLUSTER.name)) + tries = 0 + while tries < 100: + try: + CCM_CLUSTER.remove() + CCM_CLUSTER = None + return + except WindowsError: + ex_type, ex, tb = sys.exc_info() + log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) + del tb + tries += 1 + time.sleep(1) + raise RuntimeError("Failed to remove cluster after 100 attempts") def is_current_cluster(cluster_name, node_counts): global CCM_CLUSTER @@ -175,49 +182,55 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True): global CCM_CLUSTER if USE_CASS_EXTERNAL: if CCM_CLUSTER: - log.debug("Using external ccm cluster %s", CCM_CLUSTER.name) + log.debug("Using external CCM cluster {0}".format(CCM_CLUSTER.name)) else: log.debug("Using unnamed external cluster") return if is_current_cluster(cluster_name, nodes): - log.debug("Using existing cluster %s", cluster_name) - return + log.debug("Using existing cluster, matching topology: {0}".format(cluster_name)) + else: + if CCM_CLUSTER: + log.debug("Stopping existing cluster, topology mismatch: {0}".format(CCM_CLUSTER.name)) + CCM_CLUSTER.stop() - if CCM_CLUSTER: - log.debug("Stopping cluster %s", CCM_CLUSTER.name) - CCM_CLUSTER.stop() - - try: try: - cluster = CCMClusterFactory.load(path, cluster_name) - log.debug("Found existing ccm %s cluster; clearing", cluster_name) - cluster.clear() - cluster.set_install_dir(**CCM_KWARGS) + CCM_CLUSTER = CCMClusterFactory.load(path, cluster_name) + log.debug("Found existing CCM cluster, {0}; clearing.".format(cluster_name)) + CCM_CLUSTER.clear() + CCM_CLUSTER.set_install_dir(**CCM_KWARGS) except Exception: - log.debug("Creating new ccm %s cluster with %s", cluster_name, CCM_KWARGS) - cluster = CCMCluster(path, cluster_name, **CCM_KWARGS) - cluster.set_configuration_options({'start_native_transport': True}) - if CASSANDRA_VERSION >= '2.2': - cluster.set_configuration_options({'enable_user_defined_functions': True}) - common.switch_cluster(path, cluster_name) - cluster.populate(nodes, ipformat=ipformat) + ex_type, ex, tb = sys.exc_info() + log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) + del tb + log.debug("Creating new CCM cluster, {0}, with args {1}".format(cluster_name, CCM_KWARGS)) + CCM_CLUSTER = CCMCluster(path, cluster_name, **CCM_KWARGS) + CCM_CLUSTER.set_configuration_options({'start_native_transport': True}) + if CASSANDRA_VERSION >= '2.2': + CCM_CLUSTER.set_configuration_options({'enable_user_defined_functions': True}) + common.switch_cluster(path, cluster_name) + CCM_CLUSTER.populate(nodes, ipformat=ipformat) + try: jvm_args = [] # This will enable the Mirroring query handler which will echo our custom payload k,v pairs back if PROTOCOL_VERSION >= 4: jvm_args = [" -Dcassandra.custom_query_handler_class=org.apache.cassandra.cql3.CustomPayloadMirroringQueryHandler"] if start: - log.debug("Starting ccm %s cluster", cluster_name) - cluster.start(wait_for_binary_proto=True, wait_other_notice=True, jvm_args=jvm_args) + log.debug("Starting CCM cluster: {0}".format(cluster_name)) + CCM_CLUSTER.start(wait_for_binary_proto=True, wait_other_notice=True, jvm_args=jvm_args) setup_keyspace(ipformat=ipformat) - - CCM_CLUSTER = cluster except Exception: - log.exception("Failed to start ccm cluster. Removing cluster.") + log.exception("Failed to start CCM cluster; removing cluster.") + + if os.name == "nt": + if CCM_CLUSTER: + for node in CCM_CLUSTER.nodes.itervalues(): + os.system("taskkill /F /PID " + str(node.pid)) + else: + call(["pkill", "-9", "-f", ".ccm"]) remove_cluster() - call(["pkill", "-9", "-f", ".ccm"]) raise @@ -240,6 +253,22 @@ def teardown_package(): log.warning('Did not find cluster: %s' % cluster_name) +def execute_until_pass(session, query): + tries = 0 + while tries < 100: + try: + return session.execute(query) + except ConfigurationException: + # keyspace/table was already created/dropped + return + except (OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure): + ex_type, ex, tb = sys.exc_info() + log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) + del tb + tries += 1 + + raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(query)) + def setup_keyspace(ipformat=None): # wait for nodes to startup time.sleep(10) @@ -251,32 +280,32 @@ def setup_keyspace(ipformat=None): session = cluster.connect() try: - results = session.execute("SELECT keyspace_name FROM system.schema_keyspaces") + results = execute_until_pass(session, "SELECT keyspace_name FROM system.schema_keyspaces") existing_keyspaces = [row[0] for row in results] for ksname in ('test1rf', 'test2rf', 'test3rf'): if ksname in existing_keyspaces: - session.execute("DROP KEYSPACE %s" % ksname) + execute_until_pass(session, "DROP KEYSPACE %s" % ksname) ddl = ''' CREATE KEYSPACE test3rf WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}''' - session.execute(ddl) + execute_until_pass(session, ddl) ddl = ''' CREATE KEYSPACE test2rf WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '2'}''' - session.execute(ddl) + execute_until_pass(session, ddl) ddl = ''' CREATE KEYSPACE test1rf WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}''' - session.execute(ddl) + execute_until_pass(session, ddl) ddl = ''' CREATE TABLE test3rf.test ( k int PRIMARY KEY, v int )''' - session.execute(ddl) + execute_until_pass(session, ddl) except Exception: traceback.print_exc() diff --git a/tests/integration/cqlengine/__init__.py b/tests/integration/cqlengine/__init__.py index f0322c6b..c53884fc 100644 --- a/tests/integration/cqlengine/__init__.py +++ b/tests/integration/cqlengine/__init__.py @@ -28,7 +28,7 @@ def setup_package(): use_single_node() keyspace = 'cqlengine_test' - connection.setup(['localhost'], + connection.setup(['127.0.0.1'], protocol_version=PROTOCOL_VERSION, default_keyspace=keyspace) diff --git a/tests/integration/cqlengine/columns/test_validation.py b/tests/integration/cqlengine/columns/test_validation.py index 5426548e..b6457555 100644 --- a/tests/integration/cqlengine/columns/test_validation.py +++ b/tests/integration/cqlengine/columns/test_validation.py @@ -358,7 +358,7 @@ class TestTimeUUIDFromDatetime(BaseCassEngTestCase): def test_conversion_specific_date(self): dt = datetime(1981, 7, 11, microsecond=555000) - uuid = TimeUUID.from_datetime(dt) + uuid = util.uuid_from_time(dt) from uuid import UUID assert isinstance(uuid, UUID) diff --git a/tests/integration/cqlengine/model/test_udts.py b/tests/integration/cqlengine/model/test_udts.py index 25c3ccee..86cc4596 100644 --- a/tests/integration/cqlengine/model/test_udts.py +++ b/tests/integration/cqlengine/model/test_udts.py @@ -26,7 +26,7 @@ from cassandra.cqlengine import columns from cassandra.cqlengine.management import sync_table, sync_type, create_keyspace_simple, drop_keyspace from cassandra.util import Date, Time -from tests.integration import get_server_versions, PROTOCOL_VERSION +from tests.integration import PROTOCOL_VERSION from tests.integration.cqlengine.base import BaseCassEngTestCase diff --git a/tests/integration/cqlengine/query/test_queryset.py b/tests/integration/cqlengine/query/test_queryset.py index 6d5fc926..cbaea21f 100644 --- a/tests/integration/cqlengine/query/test_queryset.py +++ b/tests/integration/cqlengine/query/test_queryset.py @@ -37,6 +37,7 @@ from datetime import tzinfo from cassandra.cqlengine import statements from cassandra.cqlengine import operators +from cassandra.util import uuid_from_time from cassandra.cqlengine.connection import get_session from tests.integration import PROTOCOL_VERSION @@ -582,17 +583,17 @@ class TestMinMaxTimeUUIDFunctions(BaseCassEngTestCase): TimeUUIDQueryModel.create( partition=pk, - time=columns.TimeUUID.from_datetime(midpoint_utc - timedelta(minutes=1)), + time=uuid_from_time(midpoint_utc - timedelta(minutes=1)), data='1') TimeUUIDQueryModel.create( partition=pk, - time=columns.TimeUUID.from_datetime(midpoint_utc), + time=uuid_from_time(midpoint_utc), data='2') TimeUUIDQueryModel.create( partition=pk, - time=columns.TimeUUID.from_datetime(midpoint_utc + timedelta(minutes=1)), + time=uuid_from_time(midpoint_utc + timedelta(minutes=1)), data='3') assert ['1', '2'] == [o.data for o in TimeUUIDQueryModel.filter( diff --git a/tests/integration/long/test_consistency.py b/tests/integration/long/test_consistency.py index db752cc4..1c860e99 100644 --- a/tests/integration/long/test_consistency.py +++ b/tests/integration/long/test_consistency.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import struct, logging, sys, traceback, time +import struct, time, traceback, sys, logging from cassandra import ConsistencyLevel, OperationTimedOut, ReadTimeout, WriteTimeout, Unavailable from cassandra.cluster import Cluster from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy, DowngradingConsistencyRetryPolicy from cassandra.query import SimpleStatement -from tests.integration import use_singledc, PROTOCOL_VERSION +from tests.integration import use_singledc, PROTOCOL_VERSION, execute_until_pass from tests.integration.long.utils import (force_stop, create_schema, wait_for_down, wait_for_up, start, CoordinatorStats) @@ -28,8 +28,6 @@ try: except ImportError: import unittest # noqa -log = logging.getLogger(__name__) - ALL_CONSISTENCY_LEVELS = set([ ConsistencyLevel.ANY, ConsistencyLevel.ONE, ConsistencyLevel.TWO, ConsistencyLevel.QUORUM, ConsistencyLevel.THREE, @@ -41,6 +39,8 @@ MULTI_DC_CONSISTENCY_LEVELS = set([ SINGLE_DC_CONSISTENCY_LEVELS = ALL_CONSISTENCY_LEVELS - MULTI_DC_CONSISTENCY_LEVELS +log = logging.getLogger(__name__) + def setup_module(): use_singledc() @@ -65,15 +65,7 @@ class ConsistencyTests(unittest.TestCase): for i in range(count): ss = SimpleStatement('INSERT INTO cf(k, i) VALUES (0, 0)', consistency_level=consistency_level) - while True: - try: - session.execute(ss) - break - except (OperationTimedOut, WriteTimeout): - ex_type, ex, tb = sys.exc_info() - log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) - del tb - time.sleep(1) + execute_until_pass(session, ss) def _query(self, session, keyspace, count, consistency_level=ConsistencyLevel.ONE): routing_key = struct.pack('>i', 0) @@ -81,7 +73,10 @@ class ConsistencyTests(unittest.TestCase): ss = SimpleStatement('SELECT * FROM cf WHERE k = 0', consistency_level=consistency_level, routing_key=routing_key) + tries = 0 while True: + if tries > 100: + raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(ss)) try: self.coordinator_stats.add_coordinator(session.execute_async(ss)) break @@ -89,6 +84,7 @@ class ConsistencyTests(unittest.TestCase): ex_type, ex, tb = sys.exc_info() log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) del tb + tries += 1 time.sleep(1) def _assert_writes_succeed(self, session, keyspace, consistency_levels): diff --git a/tests/integration/long/test_failure_types.py b/tests/integration/long/test_failure_types.py index a927dc95..64d06f46 100644 --- a/tests/integration/long/test_failure_types.py +++ b/tests/integration/long/test_failure_types.py @@ -12,19 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. +import sys, logging, traceback + +from cassandra import ConsistencyLevel, OperationTimedOut, ReadTimeout, WriteTimeout, ReadFailure, WriteFailure,\ + FunctionFailure +from cassandra.cluster import Cluster +from cassandra.concurrent import execute_concurrent_with_args +from cassandra.query import SimpleStatement +from tests.integration import use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace, remove_cluster try: import unittest2 as unittest except ImportError: import unittest - -from cassandra.cluster import Cluster -from cassandra import ConsistencyLevel -from cassandra import WriteFailure, ReadFailure, FunctionFailure -from cassandra.concurrent import execute_concurrent_with_args -from cassandra.query import SimpleStatement -from tests.integration import use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace +log = logging.getLogger(__name__) def setup_module(): @@ -48,14 +50,10 @@ def setup_module(): def teardown_module(): """ The rest of the tests don't need custom tombstones - reset the config options so as to not mess with other tests. + remove the cluster so as to not interfere with other tests. """ if PROTOCOL_VERSION >= 4: - ccm_cluster = get_cluster() - config_options = {} - ccm_cluster.set_configuration_options(config_options) - if ccm_cluster is not None: - ccm_cluster.stop() + remove_cluster() class ClientExceptionTests(unittest.TestCase): @@ -83,6 +81,19 @@ class ClientExceptionTests(unittest.TestCase): # Restart the nodes to fully functional again self.setFailingNodes(failing_nodes, "testksfail") + def execute_concurrent_args_helper(self, session, query, params): + tries = 0 + while tries < 100: + try: + return execute_concurrent_with_args(session, query, params, concurrency=50) + except (ReadTimeout, WriteTimeout, OperationTimedOut, ReadFailure, WriteFailure): + ex_type, ex, tb = sys.exc_info() + log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) + del tb + tries += 1 + + raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(query)) + def setFailingNodes(self, failing_nodes, keyspace): """ This method will take in a set of failing nodes, and toggle all of the nodes in the provided list to fail @@ -210,11 +221,11 @@ class ClientExceptionTests(unittest.TestCase): statement = self.session.prepare("INSERT INTO test3rf.test2 (k, v0,v1) VALUES (1,?,1)") parameters = [(x,) for x in range(3000)] - execute_concurrent_with_args(self.session, statement, parameters, concurrency=50) + self.execute_concurrent_args_helper(self.session, statement, parameters) statement = self.session.prepare("DELETE v1 FROM test3rf.test2 WHERE k = 1 AND v0 =?") parameters = [(x,) for x in range(2001)] - execute_concurrent_with_args(self.session, statement, parameters, concurrency=50) + self.execute_concurrent_args_helper(self.session, statement, parameters) self._perform_cql_statement( """ diff --git a/tests/integration/long/test_ipv6.py b/tests/integration/long/test_ipv6.py index 3f416561..e17f22aa 100644 --- a/tests/integration/long/test_ipv6.py +++ b/tests/integration/long/test_ipv6.py @@ -12,27 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging -import os -import socket +import os, socket +from ccmlib import common from cassandra.cluster import Cluster, NoHostAvailable -from ccmlib import common -from tests.integration import use_cluster, remove_cluster, PROTOCOL_VERSION from cassandra.io.asyncorereactor import AsyncoreConnection -try: - from cassandra.io.libevreactor import LibevConnection -except ImportError: - LibevConnection = None +from tests import is_monkey_patched +from tests.integration import use_cluster, remove_cluster, PROTOCOL_VERSION +if is_monkey_patched(): + LibevConnection = -1 + AsyncoreConnection = -1 +else: + try: + from cassandra.io.libevreactor import LibevConnection + except ImportError: + LibevConnection = None try: import unittest2 as unittest except ImportError: import unittest # noqa -log = logging.getLogger(__name__) # If more modules do IPV6 testing, this can be moved down to integration.__init__. # For now, just keeping the clutter here @@ -40,12 +42,12 @@ IPV6_CLUSTER_NAME = 'ipv6_test_cluster' def setup_module(module): - validate_ccm_viable() - validate_host_viable() - # We use a dedicated cluster (instead of common singledc, as in other tests) because - # it's most likely that the test host will only have one local ipv6 address (::1) - # singledc has three - use_cluster(IPV6_CLUSTER_NAME, [1], ipformat='::%d') + if os.name != "nt": + validate_host_viable() + # We use a dedicated cluster (instead of common singledc, as in other tests) because + # it's most likely that the test host will only have one local ipv6 address (::1) + # singledc has three + use_cluster(IPV6_CLUSTER_NAME, [1], ipformat='::%d') def teardown_module(): @@ -73,7 +75,8 @@ class IPV6ConnectionTest(object): connection_class = None def test_connect(self): - cluster = Cluster(connection_class=self.connection_class, contact_points=['::1'], protocol_version=PROTOCOL_VERSION) + cluster = Cluster(connection_class=self.connection_class, contact_points=['::1'], connect_timeout=10, + protocol_version=PROTOCOL_VERSION) session = cluster.connect() future = session.execute_async("SELECT * FROM system.local") future.result() @@ -81,26 +84,41 @@ class IPV6ConnectionTest(object): cluster.shutdown() def test_error(self): - cluster = Cluster(connection_class=self.connection_class, contact_points=['::1'], port=9043, protocol_version=PROTOCOL_VERSION) - self.assertRaisesRegexp(NoHostAvailable, '\(\'Unable to connect.*%s.*::1\', 9043.*Connection refused.*' % os.errno.ECONNREFUSED, cluster.connect) + cluster = Cluster(connection_class=self.connection_class, contact_points=['::1'], port=9043, + connect_timeout=10, protocol_version=PROTOCOL_VERSION) + self.assertRaisesRegexp(NoHostAvailable, '\(\'Unable to connect.*%s.*::1\', 9043.*Connection refused.*' + % os.errno.ECONNREFUSED, cluster.connect) def test_error_multiple(self): if len(socket.getaddrinfo('localhost', 9043, socket.AF_UNSPEC, socket.SOCK_STREAM)) < 2: raise unittest.SkipTest('localhost only resolves one address') - cluster = Cluster(connection_class=self.connection_class, contact_points=['localhost'], port=9043, protocol_version=PROTOCOL_VERSION) - self.assertRaisesRegexp(NoHostAvailable, '\(\'Unable to connect.*Tried connecting to \[\(.*\(.*\].*Last error', cluster.connect) + cluster = Cluster(connection_class=self.connection_class, contact_points=['localhost'], port=9043, + connect_timeout=10, protocol_version=PROTOCOL_VERSION) + self.assertRaisesRegexp(NoHostAvailable, '\(\'Unable to connect.*Tried connecting to \[\(.*\(.*\].*Last error', + cluster.connect) class LibevConnectionTests(IPV6ConnectionTest, unittest.TestCase): connection_class = LibevConnection - @classmethod - def setup_class(cls): - if LibevConnection is None: - raise unittest.SkipTest('libev does not appear to be installed properly') + def setUp(self): + if os.name == "nt": + raise unittest.SkipTest("IPv6 is currently not supported under Windows") + + if LibevConnection == -1: + raise unittest.SkipTest("Can't test libev with monkey patching") + elif LibevConnection is None: + raise unittest.SkipTest("Libev does not appear to be installed properly") class AsyncoreConnectionTests(IPV6ConnectionTest, unittest.TestCase): connection_class = AsyncoreConnection + + def setUp(self): + if os.name == "nt": + raise unittest.SkipTest("IPv6 is currently not supported under Windows") + + if AsyncoreConnection == -1: + raise unittest.SkipTest("Can't test asyncore with monkey patching") \ No newline at end of file diff --git a/tests/integration/long/test_loadbalancingpolicies.py b/tests/integration/long/test_loadbalancingpolicies.py index 7b2b8795..465f9d76 100644 --- a/tests/integration/long/test_loadbalancingpolicies.py +++ b/tests/integration/long/test_loadbalancingpolicies.py @@ -14,7 +14,8 @@ import struct, time, logging, sys, traceback -from cassandra import ConsistencyLevel, Unavailable, OperationTimedOut, ReadTimeout +from cassandra import ConsistencyLevel, Unavailable, OperationTimedOut, ReadTimeout, ReadFailure, \ + WriteTimeout, WriteFailure from cassandra.cluster import Cluster, NoHostAvailable from cassandra.concurrent import execute_concurrent_with_args from cassandra.metadata import murmur3 @@ -50,9 +51,20 @@ class LoadBalancingPolicyTests(unittest.TestCase): def _insert(self, session, keyspace, count=12, consistency_level=ConsistencyLevel.ONE): session.execute('USE %s' % keyspace) - ss = SimpleStatement('INSERT INTO cf(k, i) VALUES (0, 0)', - consistency_level=consistency_level) - execute_concurrent_with_args(session, ss, [None] * count) + ss = SimpleStatement('INSERT INTO cf(k, i) VALUES (0, 0)', consistency_level=consistency_level) + + tries = 0 + while tries < 100: + try: + execute_concurrent_with_args(session, ss, [None] * count) + return + except (OperationTimedOut, WriteTimeout, WriteFailure): + ex_type, ex, tb = sys.exc_info() + log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) + del tb + tries += 1 + + raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(ss)) def _query(self, session, keyspace, count=12, consistency_level=ConsistencyLevel.ONE, use_prepared=False): @@ -62,28 +74,36 @@ class LoadBalancingPolicyTests(unittest.TestCase): self.prepared = session.prepare(query_string) for i in range(count): + tries = 0 while True: + if tries > 100: + raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(self.prepared)) try: self.coordinator_stats.add_coordinator(session.execute_async(self.prepared.bind((0,)))) break - except (OperationTimedOut, ReadTimeout): + except (OperationTimedOut, ReadTimeout, ReadFailure): ex_type, ex, tb = sys.exc_info() log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) del tb + tries += 1 else: routing_key = struct.pack('>i', 0) for i in range(count): ss = SimpleStatement('SELECT * FROM %s.cf WHERE k = 0' % keyspace, consistency_level=consistency_level, routing_key=routing_key) + tries = 0 while True: + if tries > 100: + raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(ss)) try: self.coordinator_stats.add_coordinator(session.execute_async(ss)) break - except (OperationTimedOut, ReadTimeout): + except (OperationTimedOut, ReadTimeout, ReadFailure): ex_type, ex, tb = sys.exc_info() log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) del tb + tries += 1 def test_token_aware_is_used_by_default(self): """ diff --git a/tests/integration/long/test_schema.py b/tests/integration/long/test_schema.py index 806726cf..7da5203f 100644 --- a/tests/integration/long/test_schema.py +++ b/tests/integration/long/test_schema.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, sys, traceback +import logging -from cassandra import ConsistencyLevel, OperationTimedOut +from cassandra import ConsistencyLevel, AlreadyExists from cassandra.cluster import Cluster -from cassandra.protocol import ConfigurationException from cassandra.query import SimpleStatement -from tests.integration import use_singledc, PROTOCOL_VERSION + +from tests.integration import use_singledc, PROTOCOL_VERSION, execute_until_pass try: import unittest2 as unittest @@ -54,29 +54,29 @@ class SchemaTests(unittest.TestCase): for keyspace_number in range(5): keyspace = "ks_{0}".format(keyspace_number) - results = session.execute("SELECT keyspace_name FROM system.schema_keyspaces") + results = execute_until_pass(session, "SELECT keyspace_name FROM system.schema_keyspaces") existing_keyspaces = [row[0] for row in results] if keyspace in existing_keyspaces: drop = "DROP KEYSPACE {0}".format(keyspace) log.debug(drop) - session.execute(drop) + execute_until_pass(session, drop) create = "CREATE KEYSPACE {0} WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 3}}".format(keyspace) log.debug(create) - session.execute(create) + execute_until_pass(session, create) create = "CREATE TABLE {0}.cf (k int PRIMARY KEY, i int)".format(keyspace) log.debug(create) - session.execute(create) + execute_until_pass(session, create) use = "USE {0}".format(keyspace) log.debug(use) - session.execute(use) + execute_until_pass(session, use) insert = "INSERT INTO cf (k, i) VALUES (0, 0)" log.debug(insert) ss = SimpleStatement(insert, consistency_level=ConsistencyLevel.QUORUM) - session.execute(ss) + execute_until_pass(session, ss) def test_for_schema_disagreements_different_keyspaces(self): """ @@ -86,28 +86,13 @@ class SchemaTests(unittest.TestCase): session = self.session for i in xrange(30): - try: - session.execute("CREATE KEYSPACE test_{0} WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}".format(i)) - session.execute("CREATE TABLE test_{0}.cf (key int PRIMARY KEY, value int)".format(i)) + execute_until_pass(session, "CREATE KEYSPACE test_{0} WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}".format(i)) + execute_until_pass(session, "CREATE TABLE test_{0}.cf (key int PRIMARY KEY, value int)".format(i)) - for j in xrange(100): - session.execute("INSERT INTO test_{0}.cf (key, value) VALUES ({1}, {1})".format(i, j)) - except OperationTimedOut: - ex_type, ex, tb = sys.exc_info() - log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) - del tb - finally: - while True: - try: - session.execute("DROP KEYSPACE test_{0}".format(i)) - break - except OperationTimedOut: - ex_type, ex, tb = sys.exc_info() - log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) - del tb - except ConfigurationException: - # We're good, the keyspace was never created due to OperationTimedOut - break + for j in xrange(100): + execute_until_pass(session, "INSERT INTO test_{0}.cf (key, value) VALUES ({1}, {1})".format(i, j)) + + execute_until_pass(session, "DROP KEYSPACE test_{0}".format(i)) def test_for_schema_disagreements_same_keyspace(self): """ @@ -119,24 +104,14 @@ class SchemaTests(unittest.TestCase): for i in xrange(30): try: - session.execute("CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}") - session.execute("CREATE TABLE test.cf (key int PRIMARY KEY, value int)") + execute_until_pass(session, "CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}") + except AlreadyExists: + execute_until_pass(session, "DROP KEYSPACE test") + execute_until_pass(session, "CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}") - for j in xrange(100): - session.execute("INSERT INTO test.cf (key, value) VALUES ({0}, {0})".format(j)) - except OperationTimedOut: - ex_type, ex, tb = sys.exc_info() - log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) - del tb - finally: - while True: - try: - session.execute("DROP KEYSPACE test") - break - except OperationTimedOut: - ex_type, ex, tb = sys.exc_info() - log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) - del tb - except ConfigurationException: - # We're good, the keyspace was never created due to OperationTimedOut - break + execute_until_pass(session, "CREATE TABLE test.cf (key int PRIMARY KEY, value int)") + + for j in xrange(100): + execute_until_pass(session, "INSERT INTO test.cf (key, value) VALUES ({0}, {0})".format(j)) + + execute_until_pass(session, "DROP KEYSPACE test") diff --git a/tests/integration/long/utils.py b/tests/integration/long/utils.py index e0e1e530..a3fb098c 100644 --- a/tests/integration/long/utils.py +++ b/tests/integration/long/utils.py @@ -131,23 +131,31 @@ def ring(node): def wait_for_up(cluster, node, wait=True): - while True: + tries = 0 + while tries < 100: host = cluster.metadata.get_host(IP_FORMAT % node) if host and host.is_up: log.debug("Done waiting for node %s to be up", node) return else: log.debug("Host is still marked down, waiting") + tries += 1 time.sleep(1) + raise RuntimeError("Host {0} is not up after 100 attempts".format(IP_FORMAT.format(node))) + def wait_for_down(cluster, node, wait=True): log.debug("Waiting for node %s to be down", node) - while True: + tries = 0 + while tries < 100: host = cluster.metadata.get_host(IP_FORMAT % node) if not host or not host.is_up: log.debug("Done waiting for node %s to be down", node) return else: log.debug("Host is still marked up, waiting") + tries += 1 time.sleep(1) + + raise RuntimeError("Host {0} is not down after 100 attempts".format(IP_FORMAT.format(node))) \ No newline at end of file diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index bf6bb1d4..045809a6 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -30,7 +30,7 @@ from cassandra.policies import (RoundRobinPolicy, ExponentialReconnectionPolicy, WhiteListRoundRobinPolicy) from cassandra.query import SimpleStatement, TraceUnavailable -from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions, get_node +from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions, get_node, execute_until_pass from tests.integration.util import assert_quiescent_pool_state @@ -72,14 +72,14 @@ class ClusterTests(unittest.TestCase): cluster = Cluster(protocol_version=PROTOCOL_VERSION) session = cluster.connect() - result = session.execute( + result = execute_until_pass(session, """ CREATE KEYSPACE clustertests WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} """) self.assertEqual(None, result) - result = session.execute( + result = execute_until_pass(session, """ CREATE TABLE clustertests.cf0 ( a text, @@ -99,7 +99,7 @@ class ClusterTests(unittest.TestCase): result = session.execute("SELECT * FROM clustertests.cf0") self.assertEqual([('a', 'b', 'c')], result) - session.execute("DROP KEYSPACE clustertests") + execute_until_pass(session, "DROP KEYSPACE clustertests") cluster.shutdown() @@ -227,7 +227,7 @@ class ClusterTests(unittest.TestCase): other_cluster = Cluster(protocol_version=PROTOCOL_VERSION) session = other_cluster.connect() - session.execute( + execute_until_pass(session, """ CREATE KEYSPACE newkeyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} @@ -238,7 +238,7 @@ class ClusterTests(unittest.TestCase): self.assertIn("newkeyspace", cluster.metadata.keyspaces) - session.execute("DROP KEYSPACE newkeyspace") + execute_until_pass(session, "DROP KEYSPACE newkeyspace") cluster.shutdown() other_cluster.shutdown() @@ -303,7 +303,7 @@ class ClusterTests(unittest.TestCase): keyspace_name = 'test1rf' type_name = self._testMethodName - session.execute('CREATE TYPE IF NOT EXISTS %s.%s (one int, two text)' % (keyspace_name, type_name)) + execute_until_pass(session, 'CREATE TYPE IF NOT EXISTS %s.%s (one int, two text)' % (keyspace_name, type_name)) original_meta = cluster.metadata.keyspaces original_test1rf_meta = original_meta[keyspace_name] original_type_meta = original_test1rf_meta.user_types[type_name] diff --git a/tests/integration/standard/test_concurrent.py b/tests/integration/standard/test_concurrent.py index 8b2bd8e6..9a823383 100644 --- a/tests/integration/standard/test_concurrent.py +++ b/tests/integration/standard/test_concurrent.py @@ -12,6 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +from itertools import cycle +import sys, logging, traceback + +from cassandra import InvalidRequest, ConsistencyLevel, ReadTimeout, WriteTimeout, OperationTimedOut, \ + ReadFailure, WriteFailure +from cassandra.cluster import Cluster, PagedResult +from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args +from cassandra.policies import HostDistance +from cassandra.query import tuple_factory, SimpleStatement + from tests.integration import use_singledc, PROTOCOL_VERSION try: @@ -19,14 +29,7 @@ try: except ImportError: import unittest # noqa -from itertools import cycle - -from cassandra import InvalidRequest, ConsistencyLevel -from cassandra.cluster import Cluster, PagedResult -from cassandra.concurrent import (execute_concurrent, - execute_concurrent_with_args) -from cassandra.policies import HostDistance -from cassandra.query import tuple_factory, SimpleStatement +log = logging.getLogger(__name__) def setup_module(): @@ -47,6 +50,31 @@ class ClusterTests(unittest.TestCase): def tearDownClass(cls): cls.cluster.shutdown() + def execute_concurrent_helper(self, session, query): + count = 0 + while count < 100: + try: + return execute_concurrent(session, query) + except (ReadTimeout, WriteTimeout, OperationTimedOut, ReadFailure, WriteFailure): + ex_type, ex, tb = sys.exc_info() + log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) + del tb + count += 1 + + raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(query)) + + def execute_concurrent_args_helper(self, session, query, params): + count = 0 + while count < 100: + try: + return execute_concurrent_with_args(session, query, params) + except (ReadTimeout, WriteTimeout, OperationTimedOut, ReadFailure, WriteFailure): + ex_type, ex, tb = sys.exc_info() + log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) + del tb + + raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(query)) + def test_execute_concurrent(self): for num_statements in (0, 1, 2, 7, 10, 99, 100, 101, 199, 200, 201): # write @@ -56,7 +84,7 @@ class ClusterTests(unittest.TestCase): statements = cycle((statement, )) parameters = [(i, i) for i in range(num_statements)] - results = execute_concurrent(self.session, list(zip(statements, parameters))) + results = self.execute_concurrent_helper(self.session, list(zip(statements, parameters))) self.assertEqual(num_statements, len(results)) self.assertEqual([(True, None)] * num_statements, results) @@ -67,7 +95,7 @@ class ClusterTests(unittest.TestCase): statements = cycle((statement, )) parameters = [(i, ) for i in range(num_statements)] - results = execute_concurrent(self.session, list(zip(statements, parameters))) + results = self.execute_concurrent_helper(self.session, list(zip(statements, parameters))) self.assertEqual(num_statements, len(results)) self.assertEqual([(True, [(i,)]) for i in range(num_statements)], results) @@ -78,7 +106,7 @@ class ClusterTests(unittest.TestCase): consistency_level=ConsistencyLevel.QUORUM) parameters = [(i, i) for i in range(num_statements)] - results = execute_concurrent_with_args(self.session, statement, parameters) + results = self.execute_concurrent_args_helper(self.session, statement, parameters) self.assertEqual(num_statements, len(results)) self.assertEqual([(True, None)] * num_statements, results) @@ -88,7 +116,7 @@ class ClusterTests(unittest.TestCase): consistency_level=ConsistencyLevel.QUORUM) parameters = [(i, ) for i in range(num_statements)] - results = execute_concurrent_with_args(self.session, statement, parameters) + results = self.execute_concurrent_args_helper(self.session, statement, parameters) self.assertEqual(num_statements, len(results)) self.assertEqual([(True, [(i,)]) for i in range(num_statements)], results) @@ -104,7 +132,7 @@ class ClusterTests(unittest.TestCase): consistency_level=ConsistencyLevel.QUORUM) parameters = [(i, i) for i in range(num_statements)] - results = execute_concurrent_with_args(self.session, statement, parameters) + results = self.execute_concurrent_args_helper(self.session, statement, parameters) self.assertEqual(num_statements, len(results)) self.assertEqual([(True, None)] * num_statements, results) @@ -115,7 +143,7 @@ class ClusterTests(unittest.TestCase): fetch_size=int(num_statements / 2)) parameters = [(i, ) for i in range(num_statements)] - results = execute_concurrent_with_args(self.session, statement, [(num_statements,)]) + results = self.execute_concurrent_args_helper(self.session, statement, [(num_statements,)]) self.assertEqual(1, len(results)) self.assertTrue(results[0][0]) result = results[0][1] diff --git a/tests/integration/standard/test_metadata.py b/tests/integration/standard/test_metadata.py index 2bb88b50..ae1a37a0 100644 --- a/tests/integration/standard/test_metadata.py +++ b/tests/integration/standard/test_metadata.py @@ -17,15 +17,10 @@ try: except ImportError: import unittest # noqa -import difflib +import difflib, six, sys from mock import Mock -import logging -import six -import sys -import traceback -from cassandra import AlreadyExists, OperationTimedOut, SignatureDescriptor, UserFunctionDescriptor, \ - UserAggregateDescriptor +from cassandra import AlreadyExists, SignatureDescriptor, UserFunctionDescriptor, UserAggregateDescriptor from cassandra.cluster import Cluster from cassandra.cqltypes import DoubleType, Int32Type, ListType, UTF8Type, MapType @@ -35,10 +30,7 @@ from cassandra.metadata import (Metadata, KeyspaceMetadata, TableMetadata, Index from cassandra.policies import SimpleConvictionPolicy from cassandra.pool import Host -from tests.integration import (get_cluster, use_singledc, PROTOCOL_VERSION, - get_server_versions) - -log = logging.getLogger(__name__) +from tests.integration import get_cluster, use_singledc, PROTOCOL_VERSION, get_server_versions, execute_until_pass def setup_module(): @@ -58,18 +50,12 @@ class SchemaMetadataTests(unittest.TestCase): self.cluster = Cluster(protocol_version=PROTOCOL_VERSION) self.session = self.cluster.connect() - self.session.execute("CREATE KEYSPACE schemametadatatest WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}") + execute_until_pass(self.session, + "CREATE KEYSPACE schemametadatatest WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}") def tearDown(self): - while True: - try: - self.session.execute("DROP KEYSPACE schemametadatatest") - self.cluster.shutdown() - break - except OperationTimedOut: - ex_type, ex, tb = sys.exc_info() - log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) - del tb + execute_until_pass(self.session, "DROP KEYSPACE schemametadatatest") + self.cluster.shutdown() def make_create_statement(self, partition_cols, clustering_cols=None, other_cols=None, compact=False): clustering_cols = clustering_cols or [] @@ -107,13 +93,13 @@ class SchemaMetadataTests(unittest.TestCase): def check_create_statement(self, tablemeta, original): recreate = tablemeta.as_cql_query(formatted=False) self.assertEqual(original, recreate[:len(original)]) - self.session.execute("DROP TABLE %s.%s" % (self.ksname, self.cfname)) - self.session.execute(recreate) + execute_until_pass(self.session, "DROP TABLE {0}.{1}".format(self.ksname, self.cfname)) + execute_until_pass(self.session, recreate) # create the table again, but with formatting enabled - self.session.execute("DROP TABLE %s.%s" % (self.ksname, self.cfname)) + execute_until_pass(self.session, "DROP TABLE {0}.{1}".format(self.ksname, self.cfname)) recreate = tablemeta.as_cql_query(formatted=True) - self.session.execute(recreate) + execute_until_pass(self.session, recreate) def get_table_metadata(self): self.cluster.refresh_table_metadata(self.ksname, self.cfname) diff --git a/tests/integration/standard/test_metrics.py b/tests/integration/standard/test_metrics.py index 7b194049..6731e907 100644 --- a/tests/integration/standard/test_metrics.py +++ b/tests/integration/standard/test_metrics.py @@ -23,7 +23,7 @@ from cassandra.query import SimpleStatement from cassandra import ConsistencyLevel, WriteTimeout, Unavailable, ReadTimeout from cassandra.cluster import Cluster, NoHostAvailable -from tests.integration import get_cluster, get_node, use_singledc, PROTOCOL_VERSION +from tests.integration import get_cluster, get_node, use_singledc, PROTOCOL_VERSION, execute_until_pass def setup_module(): @@ -76,7 +76,7 @@ class MetricsTests(unittest.TestCase): # Assert read query = SimpleStatement("SELECT * FROM test WHERE k=1", consistency_level=ConsistencyLevel.ALL) - results = session.execute(query) + results = execute_until_pass(session, query) self.assertEqual(1, len(results)) # Pause node so it shows as unreachable to coordinator @@ -109,7 +109,7 @@ class MetricsTests(unittest.TestCase): # Assert read query = SimpleStatement("SELECT * FROM test WHERE k=1", consistency_level=ConsistencyLevel.ALL) - results = session.execute(query) + results = execute_until_pass(session, query) self.assertEqual(1, len(results)) # Pause node so it shows as unreachable to coordinator @@ -142,11 +142,11 @@ class MetricsTests(unittest.TestCase): # Assert read query = SimpleStatement("SELECT * FROM test WHERE k=1", consistency_level=ConsistencyLevel.ALL) - results = session.execute(query) + results = execute_until_pass(session, query) self.assertEqual(1, len(results)) # Stop node gracefully - get_node(1).stop(wait=True, gently=True) + get_node(1).stop(wait=True, wait_other_notice=True) try: # Test write diff --git a/tests/integration/standard/test_query.py b/tests/integration/standard/test_query.py index dc98505b..80a0d8e2 100644 --- a/tests/integration/standard/test_query.py +++ b/tests/integration/standard/test_query.py @@ -512,8 +512,14 @@ class LightweightTransactionTests(unittest.TestCase): if type(result).__name__ == "WriteTimeout": received_timeout = True continue + if type(result).__name__ == "WriteFailure": + received_timeout = True + continue if type(result).__name__ == "ReadTimeout": continue + if type(result).__name__ == "ReadFailure": + continue + self.fail("Unexpected exception %s: %s" % (type(result).__name__, result.message)) # Make sure test passed diff --git a/tests/integration/standard/test_types.py b/tests/integration/standard/test_types.py index c6c8114b..3dd87e82 100644 --- a/tests/integration/standard/test_types.py +++ b/tests/integration/standard/test_types.py @@ -17,9 +17,6 @@ try: except ImportError: import unittest # noqa -import logging -log = logging.getLogger(__name__) - from datetime import datetime import six @@ -29,7 +26,7 @@ from cassandra.cqltypes import Int32Type, EMPTY from cassandra.query import dict_factory, ordered_dict_factory from cassandra.util import sortedset -from tests.integration import get_server_versions, use_singledc, PROTOCOL_VERSION +from tests.integration import get_server_versions, use_singledc, PROTOCOL_VERSION, execute_until_pass from tests.integration.datatype_utils import update_datatypes, PRIMITIVE_DATATYPES, COLLECTION_TYPES, \ get_sample, get_collection_sample @@ -51,9 +48,9 @@ class TypeTests(unittest.TestCase): @classmethod def tearDownClass(cls): - cls.session.execute("DROP KEYSPACE typetests") + execute_until_pass(cls.session, "DROP KEYSPACE typetests") cls.cluster.shutdown() - + def test_can_insert_blob_type_as_string(self): """ Tests that byte strings in Python maps to blob type in Cassandra diff --git a/tests/integration/standard/test_udts.py b/tests/integration/standard/test_udts.py index 10b9dd39..7d9eb991 100644 --- a/tests/integration/standard/test_udts.py +++ b/tests/integration/standard/test_udts.py @@ -17,9 +17,6 @@ try: except ImportError: import unittest # noqa -import logging -log = logging.getLogger(__name__) - from collections import namedtuple from functools import partial @@ -28,7 +25,7 @@ from cassandra.cluster import Cluster, UserTypeDoesNotExist from cassandra.query import dict_factory from cassandra.util import OrderedMap -from tests.integration import get_server_versions, use_singledc, PROTOCOL_VERSION +from tests.integration import get_server_versions, use_singledc, PROTOCOL_VERSION, execute_until_pass from tests.integration.datatype_utils import update_datatypes, PRIMITIVE_DATATYPES, COLLECTION_TYPES, \ get_sample, get_collection_sample @@ -51,13 +48,14 @@ class UDTTests(unittest.TestCase): self.cluster = Cluster(protocol_version=PROTOCOL_VERSION) self.session = self.cluster.connect() - self.session.execute("CREATE KEYSPACE udttests WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'}") + execute_until_pass(self.session, + "CREATE KEYSPACE udttests WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'}") self.cluster.shutdown() def tearDown(self): self.cluster = Cluster(protocol_version=PROTOCOL_VERSION) self.session = self.cluster.connect() - self.session.execute("DROP KEYSPACE udttests") + execute_until_pass(self.session, "DROP KEYSPACE udttests") self.cluster.shutdown() def test_can_insert_unprepared_registered_udts(self): diff --git a/tests/integration/cqlengine/test_load.py b/tests/stress_tests/test_load.py similarity index 52% rename from tests/integration/cqlengine/test_load.py rename to tests/stress_tests/test_load.py index 4c62a525..6283d2e4 100644 --- a/tests/integration/cqlengine/test_load.py +++ b/tests/stress_tests/test_load.py @@ -17,35 +17,37 @@ except ImportError: import unittest # noqa import gc -import os -import resource from cassandra.cqlengine import columns from cassandra.cqlengine.models import Model from cassandra.cqlengine.management import sync_table - -class LoadTest(Model): - k = columns.Integer(primary_key=True) - v = columns.Integer() +from tests.integration.cqlengine.base import BaseCassEngTestCase -@unittest.skipUnless("LOADTEST" in os.environ, "LOADTEST not on") -def test_lots_of_queries(): - sync_table(LoadTest) - import objgraph - gc.collect() - objgraph.show_most_common_types() +class LoadTests(BaseCassEngTestCase): - print("Starting...") + def test_lots_of_queries(self): + import resource + import objgraph - for i in range(1000000): - if i % 25000 == 0: - # print memory statistic - print("Memory usage: %s" % (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)) + class LoadTest(Model): + k = columns.Integer(primary_key=True) + v = columns.Integer() - LoadTest.create(k=i, v=i) + sync_table(LoadTest) + gc.collect() + objgraph.show_most_common_types() - objgraph.show_most_common_types() + print("Starting...") - raise Exception("you shouldn't be here") + for i in range(1000000): + if i % 25000 == 0: + # print memory statistic + print("Memory usage: %s" % (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)) + + LoadTest.create(k=i, v=i) + + objgraph.show_most_common_types() + + raise Exception("you shouldn't be here") diff --git a/tests/integration/long/test_multi_inserts.py b/tests/stress_tests/test_multi_inserts.py old mode 100755 new mode 100644 similarity index 76% rename from tests/integration/long/test_multi_inserts.py rename to tests/stress_tests/test_multi_inserts.py index 10422c97..b23a29dd --- a/tests/integration/long/test_multi_inserts.py +++ b/tests/stress_tests/test_multi_inserts.py @@ -1,3 +1,16 @@ +# Copyright 2013-2015 DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. try: import unittest2 as unittest @@ -14,11 +27,10 @@ def setup_module(): class StressInsertsTests(unittest.TestCase): - - ''' + """ Test case for PYTHON-124: Repeated inserts may exhaust all connections causing NoConnectionsAvailable, in_flight never decreased - ''' + """ def setUp(self): """ @@ -61,7 +73,7 @@ class StressInsertsTests(unittest.TestCase): for pool in self.session._pools.values(): if leaking_connections: break - for conn in pool._connections: + for conn in pool.get_connections(): if conn.in_flight > 1: print self.session.get_pool_state() leaking_connections = True diff --git a/tests/unit/io/test_asyncorereactor.py b/tests/unit/io/test_asyncorereactor.py index 17e42a02..988f6488 100644 --- a/tests/unit/io/test_asyncorereactor.py +++ b/tests/unit/io/test_asyncorereactor.py @@ -44,7 +44,7 @@ class AsyncoreConnectionTest(unittest.TestCase): @classmethod def setUpClass(cls): if is_monkey_patched(): - raise unittest.SkipTest("monkey-patching detected") + return AsyncoreConnection.initialize_reactor() cls.socket_patcher = patch('socket.socket', spec=socket.socket) cls.mock_socket = cls.socket_patcher.start() @@ -56,8 +56,14 @@ class AsyncoreConnectionTest(unittest.TestCase): @classmethod def tearDownClass(cls): + if is_monkey_patched(): + return cls.socket_patcher.stop() + def setUp(self): + if is_monkey_patched(): + raise unittest.SkipTest("Can't test asyncore with monkey patching") + def make_connection(self): c = AsyncoreConnection('1.2.3.4', cql_version='3.0.1') c.socket = Mock() diff --git a/tests/unit/io/test_libevreactor.py b/tests/unit/io/test_libevreactor.py index ddd2dc04..39d66b68 100644 --- a/tests/unit/io/test_libevreactor.py +++ b/tests/unit/io/test_libevreactor.py @@ -49,7 +49,7 @@ class LibevConnectionTest(unittest.TestCase): def setUp(self): if 'gevent.monkey' in sys.modules: - raise unittest.SkipTest("gevent monkey-patching detected") + raise unittest.SkipTest("Can't test libev with monkey patching") if LibevConnection is None: raise unittest.SkipTest('libev does not appear to be installed correctly') LibevConnection.initialize_reactor() diff --git a/tests/unit/test_time_util.py b/tests/unit/test_time_util.py index 0a3209a5..1c3dc1b4 100644 --- a/tests/unit/test_time_util.py +++ b/tests/unit/test_time_util.py @@ -42,11 +42,11 @@ class TimeUtilTest(unittest.TestCase): u = uuid.uuid1(node, 0) t = util.unix_time_from_uuid1(u) - self.assertAlmostEqual(now, t, 3) + self.assertAlmostEqual(now, t, 2) dt = util.datetime_from_uuid1(u) t = calendar.timegm(dt.timetuple()) + dt.microsecond / 1e6 - self.assertAlmostEqual(now, t, 3) + self.assertAlmostEqual(now, t, 2) def test_uuid_from_time(self): t = time.time()