Merged into master support for testing with remote cluster

This commit is contained in:
bjmb
2017-03-15 14:05:20 -04:00
18 changed files with 143 additions and 66 deletions

View File

@@ -12,9 +12,14 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
import logging import logging
import sys import sys
import socket import socket
import platform
log = logging.getLogger() log = logging.getLogger()
log.setLevel('DEBUG') log.setLevel('DEBUG')
@@ -41,3 +46,5 @@ def is_gevent_monkey_patched():
def is_monkey_patched(): def is_monkey_patched():
return is_gevent_monkey_patched() or is_eventlet_monkey_patched() return is_gevent_monkey_patched() or is_eventlet_monkey_patched()
notwindows = unittest.skipUnless(not "Windows" in platform.system(), "This test is not adequate for windows")

View File

@@ -115,6 +115,7 @@ def _get_cass_version_from_dse(dse_version):
return cass_ver return cass_ver
CASSANDRA_IP = os.getenv('CASSANDRA_IP', '127.0.0.1')
CASSANDRA_DIR = os.getenv('CASSANDRA_DIR', None) CASSANDRA_DIR = os.getenv('CASSANDRA_DIR', None)
DSE_VERSION = os.getenv('DSE_VERSION', None) DSE_VERSION = os.getenv('DSE_VERSION', None)
DSE_CRED = os.getenv('DSE_CREDS', None) DSE_CRED = os.getenv('DSE_CREDS', None)
@@ -141,6 +142,18 @@ if DSE_VERSION:
CCM_KWARGS['dse_credentials_file'] = DSE_CRED CCM_KWARGS['dse_credentials_file'] = DSE_CRED
#This changes the default contact_point parameter in Cluster
def set_default_cass_ip():
if CASSANDRA_IP.startswith("127.0.0."):
return
defaults = list(Cluster.__init__.__defaults__)
defaults = [[CASSANDRA_IP]] + defaults[1:]
try:
Cluster.__init__.__defaults__ = tuple(defaults)
except:
Cluster.__init__.__func__.__defaults__ = tuple(defaults)
def get_default_protocol(): def get_default_protocol():
if Version(CASSANDRA_VERSION) >= Version('2.2'): if Version(CASSANDRA_VERSION) >= Version('2.2'):
@@ -208,6 +221,7 @@ default_protocol_version = get_default_protocol()
PROTOCOL_VERSION = int(os.getenv('PROTOCOL_VERSION', default_protocol_version)) PROTOCOL_VERSION = int(os.getenv('PROTOCOL_VERSION', default_protocol_version))
local = unittest.skipUnless(CASSANDRA_IP.startswith("127.0.0."), 'Tests only runs against local C*')
notprotocolv1 = unittest.skipUnless(PROTOCOL_VERSION > 1, 'Protocol v1 not supported') notprotocolv1 = unittest.skipUnless(PROTOCOL_VERSION > 1, 'Protocol v1 not supported')
lessthenprotocolv4 = unittest.skipUnless(PROTOCOL_VERSION < 4, 'Protocol versions 4 or greater not supported') lessthenprotocolv4 = unittest.skipUnless(PROTOCOL_VERSION < 4, 'Protocol versions 4 or greater not supported')
greaterthanprotocolv3 = unittest.skipUnless(PROTOCOL_VERSION >= 4, 'Protocol versions less than 4 are not supported') greaterthanprotocolv3 = unittest.skipUnless(PROTOCOL_VERSION >= 4, 'Protocol versions less than 4 are not supported')
@@ -299,6 +313,8 @@ def is_current_cluster(cluster_name, node_counts):
def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=[]): def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=[]):
set_default_cass_ip()
global CCM_CLUSTER global CCM_CLUSTER
if USE_CASS_EXTERNAL: if USE_CASS_EXTERNAL:
if CCM_CLUSTER: if CCM_CLUSTER:

View File

@@ -24,7 +24,7 @@ from cassandra.cqlengine import connection
from cassandra.cqlengine.management import create_keyspace_simple, CQLENG_ALLOW_SCHEMA_MANAGEMENT from cassandra.cqlengine.management import create_keyspace_simple, CQLENG_ALLOW_SCHEMA_MANAGEMENT
import cassandra import cassandra
from tests.integration import get_server_versions, use_single_node, PROTOCOL_VERSION from tests.integration import get_server_versions, use_single_node, PROTOCOL_VERSION, CASSANDRA_IP, set_default_cass_ip
DEFAULT_KEYSPACE = 'cqlengine_test' DEFAULT_KEYSPACE = 'cqlengine_test'
@@ -35,6 +35,7 @@ def setup_package():
warnings.simplefilter('always') # for testing warnings, make sure all are let through warnings.simplefilter('always') # for testing warnings, make sure all are let through
os.environ[CQLENG_ALLOW_SCHEMA_MANAGEMENT] = '1' os.environ[CQLENG_ALLOW_SCHEMA_MANAGEMENT] = '1'
set_default_cass_ip()
use_single_node() use_single_node()
setup_connection(DEFAULT_KEYSPACE) setup_connection(DEFAULT_KEYSPACE)
@@ -52,7 +53,7 @@ def is_prepend_reversed():
def setup_connection(keyspace_name): def setup_connection(keyspace_name):
connection.setup(['127.0.0.1'], connection.setup([CASSANDRA_IP],
consistency=ConsistencyLevel.ONE, consistency=ConsistencyLevel.ONE,
protocol_version=PROTOCOL_VERSION, protocol_version=PROTOCOL_VERSION,
default_keyspace=keyspace_name) default_keyspace=keyspace_name)

View File

@@ -20,11 +20,13 @@ import sys
import traceback import traceback
from uuid import uuid4 from uuid import uuid4
from cassandra import WriteTimeout from cassandra import WriteTimeout, OperationTimedOut
import cassandra.cqlengine.columns as columns import cassandra.cqlengine.columns as columns
from cassandra.cqlengine.functions import get_total_seconds from cassandra.cqlengine.functions import get_total_seconds
from cassandra.cqlengine.models import Model, ValidationError from cassandra.cqlengine.models import Model, ValidationError
from cassandra.cqlengine.management import sync_table, drop_table from cassandra.cqlengine.management import sync_table, drop_table
from tests.integration import CASSANDRA_IP
from tests.integration.cqlengine import is_prepend_reversed from tests.integration.cqlengine import is_prepend_reversed
from tests.integration.cqlengine.base import BaseCassEngTestCase from tests.integration.cqlengine.base import BaseCassEngTestCase
from tests.integration import greaterthancass20, CASSANDRA_VERSION from tests.integration import greaterthancass20, CASSANDRA_VERSION
@@ -134,8 +136,11 @@ class TestSetColumn(BaseCassEngTestCase):
break break
except WriteTimeout: except WriteTimeout:
ex_type, ex, tb = sys.exc_info() ex_type, ex, tb = sys.exc_info()
log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb))) log.warning("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
del tb del tb
except OperationTimedOut:
#This will happen if the host is remote
self.assertFalse(CASSANDRA_IP.startswith("127.0.0."))
self.assertRaises(ValidationError, TestSetModel.create, **{'text_set': set(str(uuid4()) for i in range(65536))}) self.assertRaises(ValidationError, TestSetModel.create, **{'text_set': set(str(uuid4()) for i in range(65536))})
def test_partial_updates(self): def test_partial_updates(self):

View File

@@ -24,7 +24,7 @@ from cassandra.cqlengine.management import sync_table
from cassandra.cluster import Cluster from cassandra.cluster import Cluster
from cassandra.query import dict_factory from cassandra.query import dict_factory
from tests.integration import PROTOCOL_VERSION, execute_with_long_wait_retry from tests.integration import PROTOCOL_VERSION, execute_with_long_wait_retry, local
from tests.integration.cqlengine.base import BaseCassEngTestCase from tests.integration.cqlengine.base import BaseCassEngTestCase
from tests.integration.cqlengine import DEFAULT_KEYSPACE, setup_connection from tests.integration.cqlengine import DEFAULT_KEYSPACE, setup_connection
from cassandra.cqlengine import models from cassandra.cqlengine import models
@@ -95,10 +95,12 @@ class ConnectionTest(BaseCassEngTestCase):
self.assertEqual(1, TestConnectModel.objects.count()) self.assertEqual(1, TestConnectModel.objects.count())
self.assertEqual(TestConnectModel.objects.first(), TCM2) self.assertEqual(TestConnectModel.objects.first(), TCM2)
@local
def test_connection_setup_with_setup(self): def test_connection_setup_with_setup(self):
connection.setup(hosts=None, default_keyspace=None) connection.setup(hosts=None, default_keyspace=None)
self.assertIsNotNone(connection.get_connection("default").cluster.metadata.get_host("127.0.0.1")) self.assertIsNotNone(connection.get_connection("default").cluster.metadata.get_host("127.0.0.1"))
@local
def test_connection_setup_with_default(self): def test_connection_setup_with_default(self):
connection.default() connection.default()
self.assertIsNotNone(connection.get_connection("default").cluster.metadata.get_host("127.0.0.1")) self.assertIsNotNone(connection.get_connection("default").cluster.metadata.get_host("127.0.0.1"))

View File

@@ -23,6 +23,7 @@ from cassandra.cqlengine.query import ContextQuery, BatchQuery, ModelQuerySet
from tests.integration.cqlengine import setup_connection, DEFAULT_KEYSPACE from tests.integration.cqlengine import setup_connection, DEFAULT_KEYSPACE
from tests.integration.cqlengine.base import BaseCassEngTestCase from tests.integration.cqlengine.base import BaseCassEngTestCase
from tests.integration.cqlengine.query import test_queryset from tests.integration.cqlengine.query import test_queryset
from tests.integration import local, CASSANDRA_IP
class TestModel(Model): class TestModel(Model):
@@ -44,7 +45,6 @@ class AnotherTestModel(Model):
count = columns.Integer() count = columns.Integer()
text = columns.Text() text = columns.Text()
class ContextQueryConnectionTests(BaseCassEngTestCase): class ContextQueryConnectionTests(BaseCassEngTestCase):
@classmethod @classmethod
@@ -53,8 +53,8 @@ class ContextQueryConnectionTests(BaseCassEngTestCase):
create_keyspace_simple('ks1', 1) create_keyspace_simple('ks1', 1)
conn.unregister_connection('default') conn.unregister_connection('default')
conn.register_connection('fake_cluster', ['127.0.0.100'], lazy_connect=True, retry_connect=True, default=True) conn.register_connection('fake_cluster', ['1.2.3.4'], lazy_connect=True, retry_connect=True, default=True)
conn.register_connection('cluster', ['127.0.0.1']) conn.register_connection('cluster', [CASSANDRA_IP])
with ContextQuery(TestModel, connection='cluster') as tm: with ContextQuery(TestModel, connection='cluster') as tm:
sync_table(tm) sync_table(tm)
@@ -141,7 +141,7 @@ class ManagementConnectionTests(BaseCassEngTestCase):
super(ManagementConnectionTests, cls).setUpClass() super(ManagementConnectionTests, cls).setUpClass()
conn.unregister_connection('default') conn.unregister_connection('default')
conn.register_connection('fake_cluster', ['127.0.0.100'], lazy_connect=True, retry_connect=True, default=True) conn.register_connection('fake_cluster', ['127.0.0.100'], lazy_connect=True, retry_connect=True, default=True)
conn.register_connection('cluster', ['127.0.0.1']) conn.register_connection('cluster', [CASSANDRA_IP])
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
@@ -227,11 +227,11 @@ class ManagementConnectionTests(BaseCassEngTestCase):
@test_category object_mapper @test_category object_mapper
""" """
cluster = Cluster(['127.0.0.1']) cluster = Cluster([CASSANDRA_IP])
session = cluster.connect() session = cluster.connect()
connection_name = 'from_session' connection_name = 'from_session'
conn.register_connection(connection_name, session=session) conn.register_connection(connection_name, session=session)
self.assertIsNotNone(conn.get_connection(connection_name).cluster.metadata.get_host("127.0.0.1")) self.assertIsNotNone(conn.get_connection(connection_name).cluster.metadata.get_host(CASSANDRA_IP))
self.addCleanup(conn.unregister_connection, connection_name) self.addCleanup(conn.unregister_connection, connection_name)
cluster.shutdown() cluster.shutdown()
@@ -245,8 +245,8 @@ class ManagementConnectionTests(BaseCassEngTestCase):
@test_category object_mapper @test_category object_mapper
""" """
connection_name = 'from_hosts' connection_name = 'from_hosts'
conn.register_connection(connection_name, hosts=['127.0.0.1']) conn.register_connection(connection_name, hosts=[CASSANDRA_IP])
self.assertIsNotNone(conn.get_connection(connection_name).cluster.metadata.get_host("127.0.0.1")) self.assertIsNotNone(conn.get_connection(connection_name).cluster.metadata.get_host(CASSANDRA_IP))
self.addCleanup(conn.unregister_connection, connection_name) self.addCleanup(conn.unregister_connection, connection_name)
def test_connection_param_validation(self): def test_connection_param_validation(self):
@@ -258,7 +258,7 @@ class ManagementConnectionTests(BaseCassEngTestCase):
@test_category object_mapper @test_category object_mapper
""" """
cluster = Cluster(['127.0.0.1']) cluster = Cluster([CASSANDRA_IP])
session = cluster.connect() session = cluster.connect()
with self.assertRaises(CQLEngineException): with self.assertRaises(CQLEngineException):
conn.register_connection("bad_coonection1", session=session, consistency="not_null") conn.register_connection("bad_coonection1", session=session, consistency="not_null")
@@ -275,6 +275,8 @@ class ManagementConnectionTests(BaseCassEngTestCase):
cluster.shutdown() cluster.shutdown()
cluster.shutdown()
class BatchQueryConnectionTests(BaseCassEngTestCase): class BatchQueryConnectionTests(BaseCassEngTestCase):
conns = ['cluster'] conns = ['cluster']
@@ -289,7 +291,7 @@ class BatchQueryConnectionTests(BaseCassEngTestCase):
conn.unregister_connection('default') conn.unregister_connection('default')
conn.register_connection('fake_cluster', ['127.0.0.100'], lazy_connect=True, retry_connect=True, default=True) conn.register_connection('fake_cluster', ['127.0.0.100'], lazy_connect=True, retry_connect=True, default=True)
conn.register_connection('cluster', ['127.0.0.1']) conn.register_connection('cluster', [CASSANDRA_IP])
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
@@ -415,7 +417,6 @@ class BatchQueryConnectionTests(BaseCassEngTestCase):
with BatchQuery(connection='cluster') as b: with BatchQuery(connection='cluster') as b:
obj1.batch(b).using(connection='test').save() obj1.batch(b).using(connection='test').save()
class UsingDescriptorTests(BaseCassEngTestCase): class UsingDescriptorTests(BaseCassEngTestCase):
conns = ['cluster'] conns = ['cluster']
@@ -427,7 +428,7 @@ class UsingDescriptorTests(BaseCassEngTestCase):
conn.unregister_connection('default') conn.unregister_connection('default')
conn.register_connection('fake_cluster', ['127.0.0.100'], lazy_connect=True, retry_connect=True, default=True) conn.register_connection('fake_cluster', ['127.0.0.100'], lazy_connect=True, retry_connect=True, default=True)
conn.register_connection('cluster', ['127.0.0.1']) conn.register_connection('cluster', [CASSANDRA_IP])
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
@@ -527,13 +528,12 @@ class ModelQuerySetNew(ModelQuerySet):
super(ModelQuerySetNew, self).__init__(*args, **kwargs) super(ModelQuerySetNew, self).__init__(*args, **kwargs)
self._connection = "cluster" self._connection = "cluster"
class BaseConnectionTestNoDefault(object): class BaseConnectionTestNoDefault(object):
conns = ['cluster'] conns = ['cluster']
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
conn.register_connection('cluster', ['127.0.0.1']) conn.register_connection('cluster', [CASSANDRA_IP])
test_queryset.TestModel.__queryset__ = ModelQuerySetNew test_queryset.TestModel.__queryset__ = ModelQuerySetNew
test_queryset.IndexedTestModel.__queryset__ = ModelQuerySetNew test_queryset.IndexedTestModel.__queryset__ = ModelQuerySetNew
test_queryset.IndexedCollectionsTestModel.__queryset__ = ModelQuerySetNew test_queryset.IndexedCollectionsTestModel.__queryset__ = ModelQuerySetNew

View File

@@ -18,7 +18,8 @@ import time
from cassandra.cluster import Cluster, NoHostAvailable from cassandra.cluster import Cluster, NoHostAvailable
from cassandra.auth import PlainTextAuthProvider, SASLClient, SaslAuthProvider from cassandra.auth import PlainTextAuthProvider, SASLClient, SaslAuthProvider
from tests.integration import use_singledc, get_cluster, remove_cluster, PROTOCOL_VERSION from tests.integration import use_singledc, get_cluster, remove_cluster, PROTOCOL_VERSION, CASSANDRA_IP, \
set_default_cass_ip
from tests.integration.util import assert_quiescent_pool_state from tests.integration.util import assert_quiescent_pool_state
try: try:
@@ -29,18 +30,25 @@ except ImportError:
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
#This can be tested for remote hosts, but the cluster has to be configured accordingly
#@local
def setup_module(): def setup_module():
use_singledc(start=False) if CASSANDRA_IP.startswith("127.0.0."):
ccm_cluster = get_cluster() use_singledc(start=False)
ccm_cluster.stop() ccm_cluster = get_cluster()
config_options = {'authenticator': 'PasswordAuthenticator', ccm_cluster.stop()
'authorizer': 'CassandraAuthorizer'} config_options = {'authenticator': 'PasswordAuthenticator',
ccm_cluster.set_configuration_options(config_options) 'authorizer': 'CassandraAuthorizer'}
log.debug("Starting ccm test cluster with %s", config_options) ccm_cluster.set_configuration_options(config_options)
ccm_cluster.start(wait_for_binary_proto=True, wait_other_notice=True) log.debug("Starting ccm test cluster with %s", config_options)
# there seems to be some race, with some versions of C* taking longer to ccm_cluster.start(wait_for_binary_proto=True, wait_other_notice=True)
# get the auth (and default user) setup. Sleep here to give it a chance # there seems to be some race, with some versions of C* taking longer to
time.sleep(10) # get the auth (and default user) setup. Sleep here to give it a chance
time.sleep(10)
else:
set_default_cass_ip()
def teardown_module(): def teardown_module():

View File

@@ -21,7 +21,7 @@ except ImportError:
from cassandra.query import BatchStatement from cassandra.query import BatchStatement
from cassandra.cluster import Cluster from cassandra.cluster import Cluster
from tests.integration import use_singledc, PROTOCOL_VERSION from tests.integration import use_singledc, PROTOCOL_VERSION, local
def setup_module(): def setup_module():
@@ -93,6 +93,7 @@ class ClientWarningTests(unittest.TestCase):
self.assertRegexpMatches(future.warnings[0], 'Batch.*exceeding.*') self.assertRegexpMatches(future.warnings[0], 'Batch.*exceeding.*')
self.assertIsNotNone(future.get_query_trace()) self.assertIsNotNone(future.get_query_trace())
@local
def test_warning_with_custom_payload(self): def test_warning_with_custom_payload(self):
""" """
Test to validate client warning with custom payload Test to validate client warning with custom payload
@@ -111,6 +112,7 @@ class ClientWarningTests(unittest.TestCase):
self.assertRegexpMatches(future.warnings[0], 'Batch.*exceeding.*') self.assertRegexpMatches(future.warnings[0], 'Batch.*exceeding.*')
self.assertDictEqual(future.custom_payload, payload) self.assertDictEqual(future.custom_payload, payload)
@local
def test_warning_with_trace_and_custom_payload(self): def test_warning_with_trace_and_custom_payload(self):
""" """
Test to validate client warning with tracing and client warning Test to validate client warning with tracing and client warning

View File

@@ -32,8 +32,9 @@ from cassandra.policies import (RoundRobinPolicy, ExponentialReconnectionPolicy,
WhiteListRoundRobinPolicy, AddressTranslator) WhiteListRoundRobinPolicy, AddressTranslator)
from cassandra.query import SimpleStatement, TraceUnavailable, tuple_factory from cassandra.query import SimpleStatement, TraceUnavailable, tuple_factory
from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions, CASSANDRA_VERSION, execute_until_pass, execute_with_long_wait_retry, get_node,\
MockLoggingHandler, get_unsupported_lower_protocol, get_unsupported_upper_protocol, protocolv5 from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions, CASSANDRA_VERSION, DSE_VERSION, execute_until_pass, execute_with_long_wait_retry, get_node,\
MockLoggingHandler, get_unsupported_lower_protocol, get_unsupported_upper_protocol, protocolv5, local, CASSANDRA_IP
from tests.integration.util import assert_quiescent_pool_state from tests.integration.util import assert_quiescent_pool_state
import sys import sys
@@ -56,7 +57,7 @@ class IgnoredHostPolicy(RoundRobinPolicy):
class ClusterTests(unittest.TestCase): class ClusterTests(unittest.TestCase):
@local
def test_ignored_host_up(self): def test_ignored_host_up(self):
""" """
Test to ensure that is_up is not set by default on ignored hosts Test to ensure that is_up is not set by default on ignored hosts
@@ -77,6 +78,7 @@ class ClusterTests(unittest.TestCase):
self.assertIsNone(host.is_up) self.assertIsNone(host.is_up)
cluster.shutdown() cluster.shutdown()
@local
def test_host_resolution(self): def test_host_resolution(self):
""" """
Test to insure A records are resolved appropriately. Test to insure A records are resolved appropriately.
@@ -90,6 +92,7 @@ class ClusterTests(unittest.TestCase):
cluster = Cluster(contact_points=["localhost"], protocol_version=PROTOCOL_VERSION, connect_timeout=1) cluster = Cluster(contact_points=["localhost"], protocol_version=PROTOCOL_VERSION, connect_timeout=1)
self.assertTrue('127.0.0.1' in cluster.contact_points_resolved) self.assertTrue('127.0.0.1' in cluster.contact_points_resolved)
@local
def test_host_duplication(self): def test_host_duplication(self):
""" """
Ensure that duplicate hosts in the contact points are surfaced in the cluster metadata Ensure that duplicate hosts in the contact points are surfaced in the cluster metadata
@@ -109,6 +112,7 @@ class ClusterTests(unittest.TestCase):
self.assertEqual(len(cluster.metadata.all_hosts()), 3) self.assertEqual(len(cluster.metadata.all_hosts()), 3)
cluster.shutdown() cluster.shutdown()
@local
def test_raise_error_on_control_connection_timeout(self): def test_raise_error_on_control_connection_timeout(self):
""" """
Test for initial control connection timeout Test for initial control connection timeout
@@ -436,9 +440,9 @@ class ClusterTests(unittest.TestCase):
self.assertEqual(original_type_meta.as_cql_query(), current_type_meta.as_cql_query()) self.assertEqual(original_type_meta.as_cql_query(), current_type_meta.as_cql_query())
cluster.shutdown() cluster.shutdown()
@local
def test_refresh_schema_no_wait(self): def test_refresh_schema_no_wait(self):
contact_points = [CASSANDRA_IP]
contact_points = ['127.0.0.1']
cluster = Cluster(protocol_version=PROTOCOL_VERSION, max_schema_agreement_wait=10, cluster = Cluster(protocol_version=PROTOCOL_VERSION, max_schema_agreement_wait=10,
contact_points=contact_points, load_balancing_policy=WhiteListRoundRobinPolicy(contact_points)) contact_points=contact_points, load_balancing_policy=WhiteListRoundRobinPolicy(contact_points))
session = cluster.connect() session = cluster.connect()
@@ -447,6 +451,7 @@ class ClusterTests(unittest.TestCase):
new_schema_ver = uuid4() new_schema_ver = uuid4()
session.execute("UPDATE system.local SET schema_version=%s WHERE key='local'", (new_schema_ver,)) session.execute("UPDATE system.local SET schema_version=%s WHERE key='local'", (new_schema_ver,))
try: try:
agreement_timeout = 1 agreement_timeout = 1
@@ -689,6 +694,7 @@ class ClusterTests(unittest.TestCase):
cluster.shutdown() cluster.shutdown()
@local
def test_profile_load_balancing(self): def test_profile_load_balancing(self):
""" """
Tests that profile load balancing policies are honored. Tests that profile load balancing policies are honored.
@@ -700,7 +706,7 @@ class ClusterTests(unittest.TestCase):
@test_category config_profiles @test_category config_profiles
""" """
query = "select release_version from system.local" query = "select release_version from system.local"
node1 = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.1'])) node1 = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy([CASSANDRA_IP]))
with Cluster(execution_profiles={'node1': node1}) as cluster: with Cluster(execution_profiles={'node1': node1}) as cluster:
session = cluster.connect(wait_for_all_pools=True) session = cluster.connect(wait_for_all_pools=True)
@@ -713,7 +719,7 @@ class ClusterTests(unittest.TestCase):
self.assertEqual(queried_hosts, expected_hosts) self.assertEqual(queried_hosts, expected_hosts)
# by name we should only hit the one # by name we should only hit the one
expected_hosts = set(h for h in cluster.metadata.all_hosts() if h.address == '127.0.0.1') expected_hosts = set(h for h in cluster.metadata.all_hosts() if h.address == CASSANDRA_IP)
queried_hosts = set() queried_hosts = set()
for _ in cluster.metadata.all_hosts(): for _ in cluster.metadata.all_hosts():
rs = session.execute(query, execution_profile='node1') rs = session.execute(query, execution_profile='node1')
@@ -837,6 +843,7 @@ class ClusterTests(unittest.TestCase):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
session.execute(query, execution_profile='rr3') session.execute(query, execution_profile='rr3')
@local
def test_profile_pool_management(self): def test_profile_pool_management(self):
""" """
Tests that changes to execution profiles correctly impact our cluster's pooling Tests that changes to execution profiles correctly impact our cluster's pooling
@@ -863,6 +870,7 @@ class ClusterTests(unittest.TestCase):
pools = session.get_pool_state() pools = session.get_pool_state()
self.assertEqual(set(h.address for h in pools), set(('127.0.0.1', '127.0.0.2', '127.0.0.3'))) self.assertEqual(set(h.address for h in pools), set(('127.0.0.1', '127.0.0.2', '127.0.0.3')))
@local
def test_add_profile_timeout(self): def test_add_profile_timeout(self):
""" """
Tests that EP Timeouts are honored. Tests that EP Timeouts are honored.
@@ -889,10 +897,12 @@ class ClusterTests(unittest.TestCase):
try: try:
self.assertRaises(cassandra.OperationTimedOut, cluster.add_execution_profile, 'node2', self.assertRaises(cassandra.OperationTimedOut, cluster.add_execution_profile, 'node2',
node2, pool_wait_timeout=sys.float_info.min) node2, pool_wait_timeout=sys.float_info.min)
break
except Exception: except Exception:
end = time.time() end = time.time()
self.assertAlmostEqual(start, end, 1) self.assertAlmostEqual(start, end, 1)
break else:
raise Exception("add_execution_profile didn't timeout after {0} retries".format(max_retry_count))
class LocalHostAdressTranslator(AddressTranslator): class LocalHostAdressTranslator(AddressTranslator):
@@ -904,7 +914,7 @@ class LocalHostAdressTranslator(AddressTranslator):
new_addr = self.addr_map.get(addr) new_addr = self.addr_map.get(addr)
return new_addr return new_addr
@local
class TestAddressTranslation(unittest.TestCase): class TestAddressTranslation(unittest.TestCase):
def test_address_translator_basic(self): def test_address_translator_basic(self):
@@ -948,11 +958,11 @@ class TestAddressTranslation(unittest.TestCase):
self.assertEqual(adder_map.get(str(host)), host.broadcast_address) self.assertEqual(adder_map.get(str(host)), host.broadcast_address)
c.shutdown() c.shutdown()
@local
class ContextManagementTest(unittest.TestCase): class ContextManagementTest(unittest.TestCase):
load_balancing_policy = WhiteListRoundRobinPolicy([CASSANDRA_IP])
load_balancing_policy = WhiteListRoundRobinPolicy(['127.0.0.1']) cluster_kwargs = {'execution_profiles': {EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=
cluster_kwargs = {'load_balancing_policy': load_balancing_policy, load_balancing_policy)},
'schema_metadata_enabled': False, 'schema_metadata_enabled': False,
'token_metadata_enabled': False} 'token_metadata_enabled': False}
@@ -1068,7 +1078,7 @@ class HostStateTest(unittest.TestCase):
time.sleep(.01) time.sleep(.01)
self.assertTrue(was_marked_down) self.assertTrue(was_marked_down)
@local
class DontPrepareOnIgnoredHostsTest(unittest.TestCase): class DontPrepareOnIgnoredHostsTest(unittest.TestCase):
ignored_addresses = ['127.0.0.3'] ignored_addresses = ['127.0.0.3']
@@ -1106,7 +1116,7 @@ class DontPrepareOnIgnoredHostsTest(unittest.TestCase):
self.assertEqual(call(unignored_address), c) self.assertEqual(call(unignored_address), c)
cluster.shutdown() cluster.shutdown()
@local
class DuplicateRpcTest(unittest.TestCase): class DuplicateRpcTest(unittest.TestCase):
load_balancing_policy = WhiteListRoundRobinPolicy(['127.0.0.1']) load_balancing_policy = WhiteListRoundRobinPolicy(['127.0.0.1'])

View File

@@ -32,7 +32,7 @@ from cassandra.policies import WhiteListRoundRobinPolicy, HostStateListener
from cassandra.pool import HostConnectionPool from cassandra.pool import HostConnectionPool
from tests import is_monkey_patched from tests import is_monkey_patched
from tests.integration import use_singledc, PROTOCOL_VERSION, get_node from tests.integration import use_singledc, PROTOCOL_VERSION, get_node, CASSANDRA_IP, local
try: try:
from cassandra.io.libevreactor import LibevConnection from cassandra.io.libevreactor import LibevConnection
@@ -49,7 +49,8 @@ class ConnectionTimeoutTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.defaultInFlight = Connection.max_in_flight self.defaultInFlight = Connection.max_in_flight
Connection.max_in_flight = 2 Connection.max_in_flight = 2
self.cluster = Cluster(protocol_version=PROTOCOL_VERSION, load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.1'])) self.cluster = Cluster(protocol_version=PROTOCOL_VERSION, load_balancing_policy=
WhiteListRoundRobinPolicy([CASSANDRA_IP]))
self.session = self.cluster.connect() self.session = self.cluster.connect()
def tearDown(self): def tearDown(self):
@@ -104,6 +105,7 @@ class HeartbeatTest(unittest.TestCase):
def tearDown(self): def tearDown(self):
self.cluster.shutdown() self.cluster.shutdown()
@local
def test_heart_beat_timeout(self): def test_heart_beat_timeout(self):
# Setup a host listener to ensure the nodes don't go down # Setup a host listener to ensure the nodes don't go down
test_listener = TestHostListener() test_listener = TestHostListener()
@@ -190,7 +192,8 @@ class ConnectionTests(object):
e = None e = None
for i in range(5): for i in range(5):
try: try:
conn = self.klass.factory(host='127.0.0.1', timeout=timeout, protocol_version=PROTOCOL_VERSION) contact_point = CASSANDRA_IP
conn = self.klass.factory(host=contact_point, timeout=timeout, protocol_version=PROTOCOL_VERSION)
break break
except (OperationTimedOut, NoHostAvailable, ConnectionShutdown) as e: except (OperationTimedOut, NoHostAvailable, ConnectionShutdown) as e:
continue continue

View File

@@ -23,11 +23,14 @@ import six
from cassandra.query import (SimpleStatement, BatchStatement, BatchType) from cassandra.query import (SimpleStatement, BatchStatement, BatchType)
from cassandra.cluster import Cluster from cassandra.cluster import Cluster
from tests.integration import use_singledc, PROTOCOL_VERSION from tests.integration import use_singledc, PROTOCOL_VERSION, local
def setup_module(): def setup_module():
use_singledc() use_singledc()
#These test rely on the custom payload being returned but by default C*
#ignores all the payloads.
@local
class CustomPayloadTests(unittest.TestCase): class CustomPayloadTests(unittest.TestCase):
def setUp(self): def setUp(self):

View File

@@ -69,6 +69,7 @@ class CustomProtocolHandlerTest(unittest.TestCase):
cluster = Cluster(protocol_version=PROTOCOL_VERSION) cluster = Cluster(protocol_version=PROTOCOL_VERSION)
session = cluster.connect(keyspace="custserdes") session = cluster.connect(keyspace="custserdes")
session.row_factory = tuple_factory session.row_factory = tuple_factory
result = session.execute("SELECT schema_version FROM system.local") result = session.execute("SELECT schema_version FROM system.local")
uuid_type = result[0][0] uuid_type = result[0][0]
self.assertEqual(type(uuid_type), uuid.UUID) self.assertEqual(type(uuid_type), uuid.UUID)

View File

@@ -37,7 +37,7 @@ from tests.integration import (get_cluster, use_singledc, PROTOCOL_VERSION, get_
BasicSegregatedKeyspaceUnitTestCase, BasicSharedKeyspaceUnitTestCase, BasicSegregatedKeyspaceUnitTestCase, BasicSharedKeyspaceUnitTestCase,
BasicExistingKeyspaceUnitTestCase, drop_keyspace_shutdown_cluster, CASSANDRA_VERSION, BasicExistingKeyspaceUnitTestCase, drop_keyspace_shutdown_cluster, CASSANDRA_VERSION,
BasicExistingSegregatedKeyspaceUnitTestCase, dseonly, DSE_VERSION, BasicExistingSegregatedKeyspaceUnitTestCase, dseonly, DSE_VERSION,
get_supported_protocol_versions, greaterthanorequalcass30, lessthancass30) get_supported_protocol_versions, greaterthanorequalcass30, lessthancass30, local)
from tests.integration import greaterthancass21 from tests.integration import greaterthancass21
@@ -48,13 +48,14 @@ def setup_module():
class HostMetatDataTests(BasicExistingKeyspaceUnitTestCase): class HostMetatDataTests(BasicExistingKeyspaceUnitTestCase):
@local
def test_broadcast_listen_address(self): def test_broadcast_listen_address(self):
""" """
Check to ensure that the broadcast and listen adresss is populated correctly Check to ensure that the broadcast and listen adresss is populated correctly
@since 3.3 @since 3.3
@jira_ticket PYTHON-332 @jira_ticket PYTHON-332
@expected_result They are populated for C*> 2.0.16, 2.1.6, 2.2.0 @expected_result They are populated for C*> 2.1.6, 2.2.0
@test_category metadata @test_category metadata
""" """
@@ -81,7 +82,7 @@ class HostMetatDataTests(BasicExistingKeyspaceUnitTestCase):
for host in self.cluster.metadata.all_hosts(): for host in self.cluster.metadata.all_hosts():
self.assertTrue(host.release_version.startswith(CASSANDRA_VERSION)) self.assertTrue(host.release_version.startswith(CASSANDRA_VERSION))
@local
class MetaDataRemovalTest(unittest.TestCase): class MetaDataRemovalTest(unittest.TestCase):
def setUp(self): def setUp(self):
@@ -1090,6 +1091,7 @@ CREATE TABLE export_udts.users (
ksname = 'AnInterestingKeyspace' ksname = 'AnInterestingKeyspace'
cfname = 'AnInterestingTable' cfname = 'AnInterestingTable'
session.execute("DROP KEYSPACE IF EXISTS {0}".format(ksname))
session.execute(""" session.execute("""
CREATE KEYSPACE "%s" CREATE KEYSPACE "%s"
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}
@@ -1146,6 +1148,7 @@ CREATE TABLE export_udts.users (
self.assertRaises(AlreadyExists, session.execute, ddl % (ksname, cfname)) self.assertRaises(AlreadyExists, session.execute, ddl % (ksname, cfname))
cluster.shutdown() cluster.shutdown()
@local
def test_replicas(self): def test_replicas(self):
""" """
Ensure cluster.metadata.get_replicas return correctly when not attached to keyspace Ensure cluster.metadata.get_replicas return correctly when not attached to keyspace
@@ -1184,6 +1187,7 @@ CREATE TABLE export_udts.users (
self.assertEqual(set(get_replicas('test1rf', token)), set([owners[(i + 1) % 3]])) self.assertEqual(set(get_replicas('test1rf', token)), set([owners[(i + 1) % 3]]))
cluster.shutdown() cluster.shutdown()
@local
def test_legacy_tables(self): def test_legacy_tables(self):
if CASS_SERVER_VERSION < (2, 1, 0): if CASS_SERVER_VERSION < (2, 1, 0):
@@ -1450,7 +1454,7 @@ class TokenMetadataTest(unittest.TestCase):
""" """
Test of TokenMap creation and other behavior. Test of TokenMap creation and other behavior.
""" """
@local
def test_token(self): def test_token(self):
expected_node_count = len(get_cluster().nodes) expected_node_count = len(get_cluster().nodes)
@@ -2117,8 +2121,6 @@ class BadMetaTest(unittest.TestCase):
cls.parser_class = get_schema_parser(connection, str(CASS_SERVER_VERSION[0]), timeout=20).__class__ cls.parser_class = get_schema_parser(connection, str(CASS_SERVER_VERSION[0]), timeout=20).__class__
cls.cluster.control_connection.reconnect = Mock() cls.cluster.control_connection.reconnect = Mock()
@classmethod @classmethod
def teardown_class(cls): def teardown_class(cls):
drop_keyspace_shutdown_cluster(cls.keyspace_name, cls.session, cls.cluster) drop_keyspace_shutdown_cluster(cls.keyspace_name, cls.session, cls.cluster)

View File

@@ -28,12 +28,12 @@ from cassandra.protocol import SyntaxException
from cassandra.cluster import Cluster, NoHostAvailable from cassandra.cluster import Cluster, NoHostAvailable
from tests.integration import get_cluster, get_node, use_singledc, PROTOCOL_VERSION, execute_until_pass from tests.integration import get_cluster, get_node, use_singledc, PROTOCOL_VERSION, execute_until_pass
from greplin import scales from greplin import scales
from tests.integration import BasicSharedKeyspaceUnitTestCaseWTable, BasicExistingKeyspaceUnitTestCase from tests.integration import BasicSharedKeyspaceUnitTestCaseWTable, BasicExistingKeyspaceUnitTestCase, local
def setup_module(): def setup_module():
use_singledc() use_singledc()
@local
class MetricsTests(unittest.TestCase): class MetricsTests(unittest.TestCase):
def setUp(self): def setUp(self):
@@ -178,7 +178,7 @@ class MetricsTests(unittest.TestCase):
class MetricsNamespaceTest(BasicSharedKeyspaceUnitTestCaseWTable): class MetricsNamespaceTest(BasicSharedKeyspaceUnitTestCaseWTable):
@local
def test_metrics_per_cluster(self): def test_metrics_per_cluster(self):
""" """
Test to validate that metrics can be scopped to invdividual clusters Test to validate that metrics can be scopped to invdividual clusters

View File

@@ -18,11 +18,13 @@ try:
import unittest2 as unittest import unittest2 as unittest
except ImportError: except ImportError:
import unittest # noqa import unittest # noqa
from cassandra import OperationTimedOut from cassandra import OperationTimedOut
from cassandra.cluster import ExecutionProfile from cassandra.cluster import ExecutionProfile
from cassandra.query import SimpleStatement from cassandra.query import SimpleStatement
from cassandra.policies import ConstantSpeculativeExecutionPolicy, RoundRobinPolicy from cassandra.policies import ConstantSpeculativeExecutionPolicy, RoundRobinPolicy
from tests.integration import BasicSharedKeyspaceUnitTestCase, greaterthancass21 from tests.integration import BasicSharedKeyspaceUnitTestCase, greaterthancass21
from tests import notwindows
def setup_module(): def setup_module():
@@ -52,6 +54,8 @@ class SpecExecTest(BasicSharedKeyspaceUnitTestCase):
self.cluster.add_execution_profile("spec_ep_rr", spec_ep_rr) self.cluster.add_execution_profile("spec_ep_rr", spec_ep_rr)
self.cluster.add_execution_profile("spec_ep_rr_lim", spec_ep_rr_lim) self.cluster.add_execution_profile("spec_ep_rr_lim", spec_ep_rr_lim)
#This doesn't work well with Windows clock granularity
@notwindows
@greaterthancass21 @greaterthancass21
def test_speculative_execution(self): def test_speculative_execution(self):
""" """

View File

@@ -48,7 +48,11 @@ class PreparedStatementTests(unittest.TestCase):
""" """
Test basic PreparedStatement usage Test basic PreparedStatement usage
""" """
self.session.execute(
"""
DROP KEYSPACE IF EXISTS preparedtests
"""
)
self.session.execute( self.session.execute(
""" """
CREATE KEYSPACE preparedtests CREATE KEYSPACE preparedtests

View File

@@ -25,7 +25,9 @@ from cassandra.query import (PreparedStatement, BoundStatement, SimpleStatement,
BatchStatement, BatchType, dict_factory, TraceUnavailable) BatchStatement, BatchType, dict_factory, TraceUnavailable)
from cassandra.cluster import Cluster, NoHostAvailable from cassandra.cluster import Cluster, NoHostAvailable
from cassandra.policies import HostDistance, RoundRobinPolicy from cassandra.policies import HostDistance, RoundRobinPolicy
from tests.integration import use_singledc, PROTOCOL_VERSION, BasicSharedKeyspaceUnitTestCase, get_server_versions, greaterthanprotocolv3, MockLoggingHandler, get_supported_protocol_versions from tests.integration import use_singledc, PROTOCOL_VERSION, BasicSharedKeyspaceUnitTestCase, get_server_versions, \
greaterthanprotocolv3, MockLoggingHandler, get_supported_protocol_versions, local
from tests import notwindows
import time import time
import re import re
@@ -115,6 +117,7 @@ class QueryTests(BasicSharedKeyspaceUnitTestCase):
for event in trace.events: for event in trace.events:
str(event) str(event)
@local
@greaterthanprotocolv3 @greaterthanprotocolv3
def test_client_ip_in_trace(self): def test_client_ip_in_trace(self):
""" """
@@ -179,6 +182,7 @@ class QueryTests(BasicSharedKeyspaceUnitTestCase):
self.assertIsNotNone(response_future.get_query_trace(max_wait=2.0, query_cl=ConsistencyLevel.ANY).trace_id) self.assertIsNotNone(response_future.get_query_trace(max_wait=2.0, query_cl=ConsistencyLevel.ANY).trace_id)
self.assertIsNotNone(response_future.get_query_trace(max_wait=2.0, query_cl=ConsistencyLevel.QUORUM).trace_id) self.assertIsNotNone(response_future.get_query_trace(max_wait=2.0, query_cl=ConsistencyLevel.QUORUM).trace_id)
@notwindows
def test_incomplete_query_trace(self): def test_incomplete_query_trace(self):
""" """
Tests to ensure that partial tracing works. Tests to ensure that partial tracing works.
@@ -430,9 +434,9 @@ class PreparedStatementMetdataTest(unittest.TestCase):
future = session.execute_async(select_statement) future = session.execute_async(select_statement)
results = future.result() results = future.result()
if base_line is None: if base_line is None:
base_line = results[0].__dict__.keys() base_line = results[0]._asdict().keys()
else: else:
self.assertEqual(base_line, results[0].__dict__.keys()) self.assertEqual(base_line, results[0]._asdict().keys())
cluster.shutdown() cluster.shutdown()

View File

@@ -24,6 +24,7 @@ import time
import threading import threading
from six.moves.queue import PriorityQueue from six.moves.queue import PriorityQueue
import sys import sys
import platform
from cassandra.cluster import Cluster, Session from cassandra.cluster import Cluster, Session
from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args
@@ -115,7 +116,6 @@ class TimedCallableInvoker(threading.Thread):
self._stopper.wait(.001) self._stopper.wait(.001)
return return
class ConcurrencyTest((unittest.TestCase)): class ConcurrencyTest((unittest.TestCase)):
def test_results_ordering_forward(self): def test_results_ordering_forward(self):
@@ -231,7 +231,12 @@ class ConcurrencyTest((unittest.TestCase)):
for success, result in results: for success, result in results:
self.assertTrue(success) self.assertTrue(success)
current_time_added = list(result)[0] current_time_added = list(result)[0]
self.assertLess(last_time_added, current_time_added)
#Windows clock granularity makes this equal most of the times
if "Windows" in platform.system():
self.assertLessEqual(last_time_added, current_time_added)
else:
self.assertLess(last_time_added, current_time_added)
last_time_added = current_time_added last_time_added = current_time_added
def test_recursion_limited(self): def test_recursion_limited(self):