PYTHON-614 PYTHON-619 Adding tests. Enabling protocol v5 detection and support to test harness.
This commit is contained in:
@@ -12,3 +12,4 @@ twisted
|
|||||||
gevent>=1.0
|
gevent>=1.0
|
||||||
eventlet
|
eventlet
|
||||||
cython>=0.21
|
cython>=0.21
|
||||||
|
packaging
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ try:
|
|||||||
import unittest2 as unittest
|
import unittest2 as unittest
|
||||||
except ImportError:
|
except ImportError:
|
||||||
import unittest # noqa
|
import unittest # noqa
|
||||||
|
from packaging.version import Version
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import socket
|
import socket
|
||||||
@@ -140,11 +140,11 @@ if DSE_VERSION:
|
|||||||
|
|
||||||
def get_default_protocol():
|
def get_default_protocol():
|
||||||
|
|
||||||
if CASSANDRA_VERSION >= '2.2':
|
if Version(CASSANDRA_VERSION) >= Version('2.2'):
|
||||||
return 4
|
return 4
|
||||||
elif CASSANDRA_VERSION >= '2.1':
|
elif Version(CASSANDRA_VERSION) >= Version('2.1'):
|
||||||
return 3
|
return 3
|
||||||
elif CASSANDRA_VERSION >= '2.0':
|
elif Version(CASSANDRA_VERSION) >= Version('2.0'):
|
||||||
return 2
|
return 2
|
||||||
else:
|
else:
|
||||||
return 1
|
return 1
|
||||||
@@ -157,14 +157,17 @@ def get_supported_protocol_versions():
|
|||||||
2.1 -> 3, 2, 1
|
2.1 -> 3, 2, 1
|
||||||
2.2 -> 4, 3, 2, 1
|
2.2 -> 4, 3, 2, 1
|
||||||
3.X -> 4, 3
|
3.X -> 4, 3
|
||||||
|
3.10 -> 5(beta),4,3
|
||||||
` """
|
` """
|
||||||
if CASSANDRA_VERSION >= '3.0':
|
if Version(CASSANDRA_VERSION) >= Version('3.10'):
|
||||||
|
return (3, 4, 5)
|
||||||
|
elif Version(CASSANDRA_VERSION) >= Version('3.0'):
|
||||||
return (3, 4)
|
return (3, 4)
|
||||||
elif CASSANDRA_VERSION >= '2.2':
|
elif Version(CASSANDRA_VERSION) >= Version('2.2'):
|
||||||
return (1, 2, 3, 4)
|
return (1, 2, 3, 4)
|
||||||
elif CASSANDRA_VERSION >= '2.1':
|
elif Version(CASSANDRA_VERSION) >= Version('2.1'):
|
||||||
return (1, 2, 3)
|
return (1, 2, 3)
|
||||||
elif CASSANDRA_VERSION >= '2.0':
|
elif Version(CASSANDRA_VERSION) >= Version('2.0'):
|
||||||
return (1, 2)
|
return (1, 2)
|
||||||
else:
|
else:
|
||||||
return (1)
|
return (1)
|
||||||
@@ -176,7 +179,7 @@ def get_unsupported_lower_protocol():
|
|||||||
supported by the version of C* running
|
supported by the version of C* running
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if CASSANDRA_VERSION >= '3.0':
|
if Version(CASSANDRA_VERSION) >= Version('3.0'):
|
||||||
return 2
|
return 2
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
@@ -188,11 +191,11 @@ def get_unsupported_upper_protocol():
|
|||||||
supported by the version of C* running
|
supported by the version of C* running
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if CASSANDRA_VERSION >= '2.2':
|
if Version(CASSANDRA_VERSION) >= Version('2.2'):
|
||||||
return None
|
return None
|
||||||
if CASSANDRA_VERSION >= '2.1':
|
if Version(CASSANDRA_VERSION) >= Version('2.1'):
|
||||||
return 4
|
return 4
|
||||||
elif CASSANDRA_VERSION >= '2.0':
|
elif Version(CASSANDRA_VERSION) >= Version('2.0'):
|
||||||
return 3
|
return 3
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
@@ -205,6 +208,7 @@ PROTOCOL_VERSION = int(os.getenv('PROTOCOL_VERSION', default_protocol_version))
|
|||||||
notprotocolv1 = unittest.skipUnless(PROTOCOL_VERSION > 1, 'Protocol v1 not supported')
|
notprotocolv1 = unittest.skipUnless(PROTOCOL_VERSION > 1, 'Protocol v1 not supported')
|
||||||
lessthenprotocolv4 = unittest.skipUnless(PROTOCOL_VERSION < 4, 'Protocol versions 4 or greater not supported')
|
lessthenprotocolv4 = unittest.skipUnless(PROTOCOL_VERSION < 4, 'Protocol versions 4 or greater not supported')
|
||||||
greaterthanprotocolv3 = unittest.skipUnless(PROTOCOL_VERSION >= 4, 'Protocol versions less than 4 are not supported')
|
greaterthanprotocolv3 = unittest.skipUnless(PROTOCOL_VERSION >= 4, 'Protocol versions less than 4 are not supported')
|
||||||
|
protocolv5 = unittest.skipUnless(5 in get_supported_protocol_versions(), 'Protocol versions less than 5 are not supported')
|
||||||
|
|
||||||
greaterthancass20 = unittest.skipUnless(CASSANDRA_VERSION >= '2.1', 'Cassandra version 2.1 or greater required')
|
greaterthancass20 = unittest.skipUnless(CASSANDRA_VERSION >= '2.1', 'Cassandra version 2.1 or greater required')
|
||||||
greaterthancass21 = unittest.skipUnless(CASSANDRA_VERSION >= '2.2', 'Cassandra version 2.2 or greater required')
|
greaterthancass21 = unittest.skipUnless(CASSANDRA_VERSION >= '2.2', 'Cassandra version 2.2 or greater required')
|
||||||
|
|||||||
@@ -16,7 +16,8 @@ import sys,logging, traceback, time
|
|||||||
|
|
||||||
from cassandra import ConsistencyLevel, OperationTimedOut, ReadTimeout, WriteTimeout, ReadFailure, WriteFailure,\
|
from cassandra import ConsistencyLevel, OperationTimedOut, ReadTimeout, WriteTimeout, ReadFailure, WriteFailure,\
|
||||||
FunctionFailure
|
FunctionFailure
|
||||||
from cassandra.cluster import Cluster
|
from cassandra.protocol import MAX_SUPPORTED_VERSION
|
||||||
|
from cassandra.cluster import Cluster, NoHostAvailable
|
||||||
from cassandra.concurrent import execute_concurrent_with_args
|
from cassandra.concurrent import execute_concurrent_with_args
|
||||||
from cassandra.query import SimpleStatement
|
from cassandra.query import SimpleStatement
|
||||||
from tests.integration import use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace, remove_cluster, get_node
|
from tests.integration import use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace, remove_cluster, get_node
|
||||||
@@ -63,14 +64,20 @@ class ClientExceptionTests(unittest.TestCase):
|
|||||||
"""
|
"""
|
||||||
Test is skipped if run with native protocol version <4
|
Test is skipped if run with native protocol version <4
|
||||||
"""
|
"""
|
||||||
|
self.support_v5 = True
|
||||||
if PROTOCOL_VERSION < 4:
|
if PROTOCOL_VERSION < 4:
|
||||||
raise unittest.SkipTest(
|
raise unittest.SkipTest(
|
||||||
"Native protocol 4,0+ is required for custom payloads, currently using %r"
|
"Native protocol 4,0+ is required for custom payloads, currently using %r"
|
||||||
% (PROTOCOL_VERSION,))
|
% (PROTOCOL_VERSION,))
|
||||||
|
try:
|
||||||
|
self.cluster = Cluster(protocol_version=MAX_SUPPORTED_VERSION, allow_beta_protocol_version=True)
|
||||||
|
self.session = self.cluster.connect()
|
||||||
|
except NoHostAvailable:
|
||||||
|
log.info("Protocol Version 5 not supported,")
|
||||||
|
self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
|
||||||
|
self.session = self.cluster.connect()
|
||||||
|
self.support_v5 = False
|
||||||
|
|
||||||
self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
|
|
||||||
self.session = self.cluster.connect()
|
|
||||||
self.nodes_currently_failing = []
|
self.nodes_currently_failing = []
|
||||||
self.node1, self.node2, self.node3 = get_cluster().nodes.values()
|
self.node1, self.node2, self.node3 = get_cluster().nodes.values()
|
||||||
|
|
||||||
@@ -132,21 +139,28 @@ class ClientExceptionTests(unittest.TestCase):
|
|||||||
node.start(wait_for_binary_proto=True, wait_other_notice=True)
|
node.start(wait_for_binary_proto=True, wait_other_notice=True)
|
||||||
self.nodes_currently_failing.remove(node)
|
self.nodes_currently_failing.remove(node)
|
||||||
|
|
||||||
def _perform_cql_statement(self, text, consistency_level, expected_exception):
|
def _perform_cql_statement(self, text, consistency_level, expected_exception, session=None):
|
||||||
"""
|
"""
|
||||||
Simple helper method to preform cql statements and check for expected exception
|
Simple helper method to preform cql statements and check for expected exception
|
||||||
@param text CQl statement to execute
|
@param text CQl statement to execute
|
||||||
@param consistency_level Consistency level at which it is to be executed
|
@param consistency_level Consistency level at which it is to be executed
|
||||||
@param expected_exception Exception expected to be throw or none
|
@param expected_exception Exception expected to be throw or none
|
||||||
"""
|
"""
|
||||||
|
if session is None:
|
||||||
|
session = self.session
|
||||||
statement = SimpleStatement(text)
|
statement = SimpleStatement(text)
|
||||||
statement.consistency_level = consistency_level
|
statement.consistency_level = consistency_level
|
||||||
|
|
||||||
if expected_exception is None:
|
if expected_exception is None:
|
||||||
self.execute_helper(self.session, statement)
|
self.execute_helper(session, statement)
|
||||||
else:
|
else:
|
||||||
with self.assertRaises(expected_exception):
|
with self.assertRaises(expected_exception) as cm:
|
||||||
self.execute_helper(self.session, statement)
|
self.execute_helper(session, statement)
|
||||||
|
if self.support_v5 and (isinstance(cm.exception, WriteFailure) or isinstance(cm.exception, ReadFailure)):
|
||||||
|
if isinstance(cm.exception, ReadFailure):
|
||||||
|
self.assertEqual(cm.exception.error_code_map.values()[0], 1)
|
||||||
|
else:
|
||||||
|
self.assertEqual(cm.exception.error_code_map.values()[0], 0)
|
||||||
|
|
||||||
def test_write_failures_from_coordinator(self):
|
def test_write_failures_from_coordinator(self):
|
||||||
"""
|
"""
|
||||||
@@ -157,8 +171,8 @@ class ClientExceptionTests(unittest.TestCase):
|
|||||||
factor of the keyspace, and the consistency level, we will expect the coordinator to send WriteFailure, or not.
|
factor of the keyspace, and the consistency level, we will expect the coordinator to send WriteFailure, or not.
|
||||||
|
|
||||||
|
|
||||||
@since 2.6.0
|
@since 2.6.0, 3.7.0
|
||||||
@jira_ticket PYTHON-238
|
@jira_ticket PYTHON-238, PYTHON-619
|
||||||
@expected_result Appropriate write failures from the coordinator
|
@expected_result Appropriate write failures from the coordinator
|
||||||
|
|
||||||
@test_category queries:basic
|
@test_category queries:basic
|
||||||
@@ -217,8 +231,8 @@ class ClientExceptionTests(unittest.TestCase):
|
|||||||
from the coordinator.
|
from the coordinator.
|
||||||
|
|
||||||
|
|
||||||
@since 2.6.0
|
@since 2.6.0, 3.7.0
|
||||||
@jira_ticket PYTHON-238
|
@jira_ticket PYTHON-238, PYTHON-619
|
||||||
@expected_result Appropriate write failures from the coordinator
|
@expected_result Appropriate write failures from the coordinator
|
||||||
|
|
||||||
@test_category queries:basic
|
@test_category queries:basic
|
||||||
@@ -379,11 +393,3 @@ class TimeoutTimerTest(unittest.TestCase):
|
|||||||
self.assertAlmostEqual(expected_time, total_time, delta=.05)
|
self.assertAlmostEqual(expected_time, total_time, delta=.05)
|
||||||
self.assertTrue(mock_errorback.called)
|
self.assertTrue(mock_errorback.called)
|
||||||
self.assertFalse(mock_callback.called)
|
self.assertFalse(mock_callback.called)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -33,8 +33,8 @@ from cassandra.policies import (RoundRobinPolicy, ExponentialReconnectionPolicy,
|
|||||||
from cassandra.protocol import MAX_SUPPORTED_VERSION
|
from cassandra.protocol import MAX_SUPPORTED_VERSION
|
||||||
from cassandra.query import SimpleStatement, TraceUnavailable, tuple_factory
|
from cassandra.query import SimpleStatement, TraceUnavailable, tuple_factory
|
||||||
|
|
||||||
from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions, get_node, CASSANDRA_VERSION, execute_until_pass, execute_with_long_wait_retry, get_node,\
|
from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions, CASSANDRA_VERSION, execute_until_pass, execute_with_long_wait_retry, get_node,\
|
||||||
MockLoggingHandler, get_unsupported_lower_protocol, get_unsupported_upper_protocol
|
MockLoggingHandler, get_unsupported_lower_protocol, get_unsupported_upper_protocol, protocolv5
|
||||||
from tests.integration.util import assert_quiescent_pool_state
|
from tests.integration.util import assert_quiescent_pool_state
|
||||||
|
|
||||||
|
|
||||||
@@ -460,7 +460,7 @@ class ClusterTests(unittest.TestCase):
|
|||||||
end_time = time.time()
|
end_time = time.time()
|
||||||
self.assertGreaterEqual(end_time - start_time, agreement_timeout)
|
self.assertGreaterEqual(end_time - start_time, agreement_timeout)
|
||||||
self.assertIs(original_meta, c.metadata.keyspaces)
|
self.assertIs(original_meta, c.metadata.keyspaces)
|
||||||
|
|
||||||
# refresh wait overrides cluster value
|
# refresh wait overrides cluster value
|
||||||
original_meta = c.metadata.keyspaces
|
original_meta = c.metadata.keyspaces
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
@@ -489,7 +489,7 @@ class ClusterTests(unittest.TestCase):
|
|||||||
self.assertLess(end_time - start_time, refresh_threshold)
|
self.assertLess(end_time - start_time, refresh_threshold)
|
||||||
self.assertIsNot(original_meta, c.metadata.keyspaces)
|
self.assertIsNot(original_meta, c.metadata.keyspaces)
|
||||||
self.assertEqual(original_meta, c.metadata.keyspaces)
|
self.assertEqual(original_meta, c.metadata.keyspaces)
|
||||||
|
|
||||||
# refresh wait overrides cluster value
|
# refresh wait overrides cluster value
|
||||||
original_meta = c.metadata.keyspaces
|
original_meta = c.metadata.keyspaces
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
@@ -587,7 +587,7 @@ class ClusterTests(unittest.TestCase):
|
|||||||
cluster.set_core_connections_per_host(HostDistance.LOCAL, 1)
|
cluster.set_core_connections_per_host(HostDistance.LOCAL, 1)
|
||||||
session = cluster.connect(wait_for_all_pools=True)
|
session = cluster.connect(wait_for_all_pools=True)
|
||||||
|
|
||||||
# This test relies on impl details of connection req id management to see if heartbeats
|
# This test relies on impl details of connection req id management to see if heartbeats
|
||||||
# are being sent. May need update if impl is changed
|
# are being sent. May need update if impl is changed
|
||||||
connection_request_ids = {}
|
connection_request_ids = {}
|
||||||
for h in cluster.get_connection_holders():
|
for h in cluster.get_connection_holders():
|
||||||
@@ -763,7 +763,7 @@ class ClusterTests(unittest.TestCase):
|
|||||||
expected_hosts = set(cluster.metadata.all_hosts())
|
expected_hosts = set(cluster.metadata.all_hosts())
|
||||||
rr1_queried_hosts = set()
|
rr1_queried_hosts = set()
|
||||||
rr2_queried_hosts = set()
|
rr2_queried_hosts = set()
|
||||||
|
|
||||||
rs = session.execute(query, execution_profile='rr1')
|
rs = session.execute(query, execution_profile='rr1')
|
||||||
rr1_queried_hosts.add(rs.response_future._current_host)
|
rr1_queried_hosts.add(rs.response_future._current_host)
|
||||||
rs = session.execute(query, execution_profile='rr2')
|
rs = session.execute(query, execution_profile='rr2')
|
||||||
@@ -1054,3 +1054,48 @@ class DuplicateRpcTest(unittest.TestCase):
|
|||||||
self.assertEqual(len(warnings), 1)
|
self.assertEqual(len(warnings), 1)
|
||||||
self.assertTrue('multiple' in warnings[0])
|
self.assertTrue('multiple' in warnings[0])
|
||||||
logger.removeHandler(mock_handler)
|
logger.removeHandler(mock_handler)
|
||||||
|
|
||||||
|
|
||||||
|
@protocolv5
|
||||||
|
class BetaProtocolTest(unittest.TestCase):
|
||||||
|
|
||||||
|
@protocolv5
|
||||||
|
def test_invalid_protocol_version_beta_option(self):
|
||||||
|
"""
|
||||||
|
Test cluster connection with protocol v5 and beta flag not set
|
||||||
|
|
||||||
|
@since 3.7.0
|
||||||
|
@jira_ticket PYTHON-614
|
||||||
|
@expected_result client shouldn't connect with V5 and no beta flag set
|
||||||
|
|
||||||
|
@test_category connection
|
||||||
|
"""
|
||||||
|
|
||||||
|
cluster = Cluster(protocol_version=MAX_SUPPORTED_VERSION, allow_beta_protocol_version=False)
|
||||||
|
try:
|
||||||
|
with self.assertRaises(NoHostAvailable):
|
||||||
|
cluster.connect()
|
||||||
|
except Exception as e:
|
||||||
|
self.fail("Unexpected error encountered {0}".format(e.message))
|
||||||
|
cluster.shutdown()
|
||||||
|
|
||||||
|
@protocolv5
|
||||||
|
def test_valid_protocol_version_beta_options_connect(self):
|
||||||
|
"""
|
||||||
|
Test cluster connection with protocol version 5 and beta flag set
|
||||||
|
|
||||||
|
@since 3.7.0
|
||||||
|
@jira_ticket PYTHON-614
|
||||||
|
@expected_result client should connect with protocol v5 and beta flag set.
|
||||||
|
|
||||||
|
@test_category connection
|
||||||
|
"""
|
||||||
|
cluster = Cluster(protocol_version=MAX_SUPPORTED_VERSION, allow_beta_protocol_version=True)
|
||||||
|
session = cluster.connect()
|
||||||
|
self.assertEqual(cluster.protocol_version, MAX_SUPPORTED_VERSION)
|
||||||
|
self.assertTrue(session.execute("select release_version from system.local")[0])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user