diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index b40eb32b..79da6560 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -25,11 +25,12 @@ from uuid import uuid4 import logging import cassandra -from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT +from cassandra.cluster import Cluster, Session, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT from cassandra.concurrent import execute_concurrent from cassandra.policies import (RoundRobinPolicy, ExponentialReconnectionPolicy, RetryPolicy, SimpleConvictionPolicy, HostDistance, WhiteListRoundRobinPolicy, AddressTranslator) +from cassandra.pool import Host from cassandra.query import SimpleStatement, TraceUnavailable, tuple_factory @@ -177,6 +178,28 @@ class ClusterTests(unittest.TestCase): cluster.shutdown() + def test_session_host_parameter(self): + """ + Test for protocol negotiation + + Very that NoHostAvailable is risen in Session.__init__ when there are no valid connections and that + no error is arisen otherwise, despite maybe being some invalid hosts + + @since 3.9 + @jira_ticket PYTHON-665 + @expected_result NoHostAvailable when the driver is unable to connect to a valid host, + no exception otherwise + + @test_category connection + """ + with self.assertRaises(NoHostAvailable): + Session(Cluster(), []) + with self.assertRaises(NoHostAvailable): + Session(Cluster(), [Host("1.2.3.4", SimpleConvictionPolicy)]) + session = Session(Cluster(), [Host(x, SimpleConvictionPolicy) for x in + ("127.0.0.1", "127.0.0.2", "1.2.3.4")]) + session.shutdown() + def test_protocol_negotiation(self): """ Test for protocol negotiation diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index b3337f6c..2532d8c7 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -21,10 +21,12 @@ from mock import patch from cassandra import ConsistencyLevel, DriverException, Timeout, Unavailable, RequestExecutionException, ReadTimeout, WriteTimeout, CoordinationFailure, ReadFailure, WriteFailure, FunctionFailure, AlreadyExists,\ InvalidRequest, Unauthorized, AuthenticationFailed, OperationTimedOut, UnsupportedOperation, RequestValidationException, ConfigurationException from cassandra.cluster import _Scheduler, Session, Cluster, _NOT_SET, default_lbp_factory, \ - ExecutionProfile, _ConfigMode, EXEC_PROFILE_DEFAULT -from cassandra.policies import HostDistance, RetryPolicy, RoundRobinPolicy, DowngradingConsistencyRetryPolicy + ExecutionProfile, _ConfigMode, EXEC_PROFILE_DEFAULT, NoHostAvailable +from cassandra.policies import HostDistance, RetryPolicy, RoundRobinPolicy, \ + DowngradingConsistencyRetryPolicy, SimpleConvictionPolicy from cassandra.query import SimpleStatement, named_tuple_factory, tuple_factory - +from cassandra.pool import Host +from tests.unit.utils import mock_session_pools class ExceptionTypeTest(unittest.TestCase): @@ -121,7 +123,7 @@ class SchedulerTest(unittest.TestCase): class SessionTest(unittest.TestCase): # TODO: this suite could be expanded; for now just adding a test covering a PR - + @mock_session_pools def test_default_serial_consistency_level(self, *_): """ Make sure default_serial_consistency_level passes through to a query message. @@ -129,7 +131,7 @@ class SessionTest(unittest.TestCase): PR #510 """ - s = Session(Cluster(protocol_version=4), []) + s = Session(Cluster(protocol_version=4), [Host("127.0.0.1", SimpleConvictionPolicy)]) # default is None self.assertIsNone(s.default_serial_consistency_level) @@ -159,21 +161,23 @@ class ExecutionProfileTest(unittest.TestCase): self.assertEqual(rf.timeout, prof.request_timeout) self.assertEqual(rf.row_factory, prof.row_factory) + @mock_session_pools def test_default_exec_parameters(self): cluster = Cluster() self.assertEqual(cluster._config_mode, _ConfigMode.UNCOMMITTED) self.assertEqual(cluster.load_balancing_policy.__class__, default_lbp_factory().__class__) self.assertEqual(cluster.default_retry_policy.__class__, RetryPolicy) - session = Session(cluster, hosts=[]) + session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)]) self.assertEqual(session.default_timeout, 10.0) self.assertEqual(session.default_consistency_level, ConsistencyLevel.LOCAL_ONE) self.assertEqual(session.default_serial_consistency_level, None) self.assertEqual(session.row_factory, named_tuple_factory) + @mock_session_pools def test_default_legacy(self): cluster = Cluster(load_balancing_policy=RoundRobinPolicy(), default_retry_policy=DowngradingConsistencyRetryPolicy()) self.assertEqual(cluster._config_mode, _ConfigMode.LEGACY) - session = Session(cluster, hosts=[]) + session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)]) session.default_timeout = 3.7 session.default_consistency_level = ConsistencyLevel.ALL session.default_serial_consistency_level = ConsistencyLevel.SERIAL @@ -183,10 +187,11 @@ class ExecutionProfileTest(unittest.TestCase): session.default_timeout, session.row_factory) self._verify_response_future_profile(rf, expected_profile) + @mock_session_pools def test_default_profile(self): non_default_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(3)]) cluster = Cluster(execution_profiles={'non-default': non_default_profile}) - session = Session(cluster, hosts=[]) + session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)]) self.assertEqual(cluster._config_mode, _ConfigMode.PROFILES) @@ -197,10 +202,11 @@ class ExecutionProfileTest(unittest.TestCase): rf = session.execute_async("query", execution_profile='non-default') self._verify_response_future_profile(rf, non_default_profile) + @mock_session_pools def test_statement_params_override_legacy(self): cluster = Cluster(load_balancing_policy=RoundRobinPolicy(), default_retry_policy=DowngradingConsistencyRetryPolicy()) self.assertEqual(cluster._config_mode, _ConfigMode.LEGACY) - session = Session(cluster, hosts=[]) + session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)]) ss = SimpleStatement("query", retry_policy=DowngradingConsistencyRetryPolicy(), consistency_level=ConsistencyLevel.ALL, serial_consistency_level=ConsistencyLevel.SERIAL) @@ -217,10 +223,11 @@ class ExecutionProfileTest(unittest.TestCase): serial_consistency_level=ss._serial_consistency_level) self._verify_response_future_profile(rf, expected_profile) + @mock_session_pools def test_statement_params_override_profile(self): non_default_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(3)]) cluster = Cluster(execution_profiles={'non-default': non_default_profile}) - session = Session(cluster, hosts=[]) + session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)]) self.assertEqual(cluster._config_mode, _ConfigMode.PROFILES) @@ -240,6 +247,7 @@ class ExecutionProfileTest(unittest.TestCase): ss.consistency_level, ss._serial_consistency_level, my_timeout, non_default_profile.row_factory) self._verify_response_future_profile(rf, expected_profile) + @mock_session_pools def test_no_profile_with_legacy(self): # don't construct with both self.assertRaises(ValueError, Cluster, load_balancing_policy=RoundRobinPolicy(), execution_profiles={'a': ExecutionProfile()}) @@ -253,7 +261,7 @@ class ExecutionProfileTest(unittest.TestCase): # session settings lock out profiles cluster = Cluster() - session = Session(cluster, hosts=[]) + session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)]) for attr, value in (('default_timeout', 1), ('default_consistency_level', ConsistencyLevel.ANY), ('default_serial_consistency_level', ConsistencyLevel.SERIAL), @@ -265,6 +273,7 @@ class ExecutionProfileTest(unittest.TestCase): # don't accept profile self.assertRaises(ValueError, session.execute_async, "query", execution_profile='some name here') + @mock_session_pools def test_no_legacy_with_profile(self): cluster_init = Cluster(execution_profiles={'name': ExecutionProfile()}) cluster_add = Cluster() @@ -275,18 +284,19 @@ class ExecutionProfileTest(unittest.TestCase): for attr, value in (('default_retry_policy', RetryPolicy()), ('load_balancing_policy', default_lbp_factory())): self.assertRaises(ValueError, setattr, cluster, attr, value) - session = Session(cluster, hosts=[]) + session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)]) for attr, value in (('default_timeout', 1), ('default_consistency_level', ConsistencyLevel.ANY), ('default_serial_consistency_level', ConsistencyLevel.SERIAL), ('row_factory', tuple_factory)): self.assertRaises(ValueError, setattr, session, attr, value) + @mock_session_pools def test_profile_name_value(self): internalized_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(3)]) cluster = Cluster(execution_profiles={'by-name': internalized_profile}) - session = Session(cluster, hosts=[]) + session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)]) self.assertEqual(cluster._config_mode, _ConfigMode.PROFILES) rf = session.execute_async("query", execution_profile='by-name') @@ -296,10 +306,11 @@ class ExecutionProfileTest(unittest.TestCase): rf = session.execute_async("query", execution_profile=by_value) self._verify_response_future_profile(rf, by_value) + @mock_session_pools def test_exec_profile_clone(self): cluster = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(), 'one': ExecutionProfile()}) - session = Session(cluster, hosts=[]) + session = Session(cluster, hosts=[Host("127.0.0.1", SimpleConvictionPolicy)]) profile_attrs = {'request_timeout': 1, 'consistency_level': ConsistencyLevel.ANY, diff --git a/tests/unit/test_concurrent.py b/tests/unit/test_concurrent.py index b0b0f05a..ca7bd096 100644 --- a/tests/unit/test_concurrent.py +++ b/tests/unit/test_concurrent.py @@ -28,6 +28,9 @@ import platform from cassandra.cluster import Cluster, Session from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args +from cassandra.pool import Host +from cassandra.policies import SimpleConvictionPolicy +from tests.unit.utils import mock_session_pools class MockResponseResponseFuture(): @@ -239,6 +242,7 @@ class ConcurrencyTest((unittest.TestCase)): self.assertLess(last_time_added, current_time_added) last_time_added = current_time_added + @mock_session_pools def test_recursion_limited(self): """ Verify that recursion is controlled when raise_on_first_error=False and something is wrong with the query. @@ -246,7 +250,7 @@ class ConcurrencyTest((unittest.TestCase)): PYTHON-585 """ max_recursion = sys.getrecursionlimit() - s = Session(Cluster(), []) + s = Session(Cluster(), [Host("127.0.0.1", SimpleConvictionPolicy)]) self.assertRaises(TypeError, execute_concurrent_with_args, s, "doesn't matter", [('param',)] * max_recursion, raise_on_first_error=True) results = execute_concurrent_with_args(s, "doesn't matter", [('param',)] * max_recursion, raise_on_first_error=False) # previously diff --git a/tests/unit/utils.py b/tests/unit/utils.py new file mode 100644 index 00000000..b3ac1131 --- /dev/null +++ b/tests/unit/utils.py @@ -0,0 +1,18 @@ +from concurrent.futures import Future +from functools import wraps +from cassandra.cluster import Session +from mock import patch + +def mock_session_pools(f): + """ + Helper decorator that allows tests to initialize :class:.`Session` objects + without actually connecting to a Cassandra cluster. + """ + @wraps(f) + def wrapper(*args, **kwargs): + with patch.object(Session, "add_or_renew_pool") as mocked_add_or_renew_pool: + future = Future() + future.set_result(object()) + mocked_add_or_renew_pool.return_value = future + f(*args, **kwargs) + return wrapper