Merged branch PYTHON-759 tests into master

This commit is contained in:
bjmb
2017-07-19 12:32:22 -04:00
5 changed files with 63 additions and 32 deletions

View File

@@ -504,19 +504,15 @@ class HostFilterPolicy(LoadBalancingPolicy):
self._predicate = predicate self._predicate = predicate
def on_up(self, host, *args, **kwargs): def on_up(self, host, *args, **kwargs):
if self.predicate(host):
return self._child_policy.on_up(host, *args, **kwargs) return self._child_policy.on_up(host, *args, **kwargs)
def on_down(self, host, *args, **kwargs): def on_down(self, host, *args, **kwargs):
if self.predicate(host):
return self._child_policy.on_down(host, *args, **kwargs) return self._child_policy.on_down(host, *args, **kwargs)
def on_add(self, host, *args, **kwargs): def on_add(self, host, *args, **kwargs):
if self.predicate(host):
return self._child_policy.on_add(host, *args, **kwargs) return self._child_policy.on_add(host, *args, **kwargs)
def on_remove(self, host, *args, **kwargs): def on_remove(self, host, *args, **kwargs):
if self.predicate(host):
return self._child_policy.on_remove(host, *args, **kwargs) return self._child_policy.on_remove(host, *args, **kwargs)
@property @property
@@ -545,10 +541,7 @@ class HostFilterPolicy(LoadBalancingPolicy):
return HostDistance.IGNORED return HostDistance.IGNORED
def populate(self, cluster, hosts): def populate(self, cluster, hosts):
self._child_policy.populate( self._child_policy.populate(cluster=cluster, hosts=hosts)
cluster=cluster,
hosts=[h for h in hosts if self.predicate(h)]
)
def make_query_plan(self, working_keyspace=None, query=None): def make_query_plan(self, working_keyspace=None, query=None):
""" """

View File

@@ -17,7 +17,7 @@ import sys,logging, traceback, time, re
from cassandra import (ConsistencyLevel, OperationTimedOut, ReadTimeout, WriteTimeout, ReadFailure, WriteFailure, from cassandra import (ConsistencyLevel, OperationTimedOut, ReadTimeout, WriteTimeout, ReadFailure, WriteFailure,
FunctionFailure, ProtocolVersion) FunctionFailure, ProtocolVersion)
from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import WhiteListRoundRobinPolicy from cassandra.policies import HostFilterPolicy, RoundRobinPolicy
from cassandra.concurrent import execute_concurrent_with_args from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import SimpleStatement from cassandra.query import SimpleStatement
from tests.integration import use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace, remove_cluster, get_node from tests.integration import use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace, remove_cluster, get_node
@@ -327,7 +327,9 @@ class TimeoutTimerTest(unittest.TestCase):
# self.node1, self.node2, self.node3 = get_cluster().nodes.values() # self.node1, self.node2, self.node3 = get_cluster().nodes.values()
node1 = ExecutionProfile( node1 = ExecutionProfile(
load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.1']) load_balancing_policy=HostFilterPolicy(
RoundRobinPolicy(), lambda host: host.address == "127.0.0.1"
)
) )
self.cluster = Cluster(protocol_version=PROTOCOL_VERSION, execution_profiles={EXEC_PROFILE_DEFAULT: node1}) self.cluster = Cluster(protocol_version=PROTOCOL_VERSION, execution_profiles={EXEC_PROFILE_DEFAULT: node1})
self.session = self.cluster.connect(wait_for_all_pools=True) self.session = self.cluster.connect(wait_for_all_pools=True)

View File

@@ -29,7 +29,8 @@ from cassandra.cluster import Cluster, Session, NoHostAvailable, ExecutionProfil
from cassandra.concurrent import execute_concurrent from cassandra.concurrent import execute_concurrent
from cassandra.policies import (RoundRobinPolicy, ExponentialReconnectionPolicy, from cassandra.policies import (RoundRobinPolicy, ExponentialReconnectionPolicy,
RetryPolicy, SimpleConvictionPolicy, HostDistance, RetryPolicy, SimpleConvictionPolicy, HostDistance,
WhiteListRoundRobinPolicy, AddressTranslator, TokenAwarePolicy, HostFilterPolicy) AddressTranslator, TokenAwarePolicy, HostFilterPolicy)
from cassandra.pool import Host from cassandra.pool import Host
from cassandra.query import SimpleStatement, TraceUnavailable, tuple_factory from cassandra.query import SimpleStatement, TraceUnavailable, tuple_factory
@@ -478,7 +479,10 @@ class ClusterTests(unittest.TestCase):
def test_refresh_schema_no_wait(self): def test_refresh_schema_no_wait(self):
contact_points = [CASSANDRA_IP] contact_points = [CASSANDRA_IP]
cluster = Cluster(protocol_version=PROTOCOL_VERSION, max_schema_agreement_wait=10, cluster = Cluster(protocol_version=PROTOCOL_VERSION, max_schema_agreement_wait=10,
contact_points=contact_points, load_balancing_policy=WhiteListRoundRobinPolicy(contact_points)) contact_points=contact_points,
load_balancing_policy=HostFilterPolicy(
RoundRobinPolicy(), lambda host: host.address == CASSANDRA_IP
))
session = cluster.connect() session = cluster.connect()
schema_ver = session.execute("SELECT schema_version FROM system.local WHERE key='local'")[0][0] schema_ver = session.execute("SELECT schema_version FROM system.local WHERE key='local'")[0][0]
@@ -619,7 +623,7 @@ class ClusterTests(unittest.TestCase):
try: try:
result = future.get_query_trace(-1.0) result = future.get_query_trace(-1.0)
# In case the result has time to come back before this timeout due to a race condition # In case the result has time to come back before this timeout due to a race condition
check_trace(result) self.check_trace(result)
except TraceUnavailable: except TraceUnavailable:
break break
else: else:
@@ -631,7 +635,7 @@ class ClusterTests(unittest.TestCase):
try: try:
result = future.get_query_trace(max_wait=120) result = future.get_query_trace(max_wait=120)
# In case the result has been set check the trace # In case the result has been set check the trace
check_trace(result) self.check_trace(result)
except TraceUnavailable: except TraceUnavailable:
break break
else: else:
@@ -775,7 +779,11 @@ class ClusterTests(unittest.TestCase):
@test_category config_profiles @test_category config_profiles
""" """
query = "select release_version from system.local" query = "select release_version from system.local"
node1 = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy([CASSANDRA_IP])) node1 = ExecutionProfile(
load_balancing_policy=HostFilterPolicy(
RoundRobinPolicy(), lambda host: host.address == CASSANDRA_IP
)
)
with Cluster(execution_profiles={'node1': node1}) as cluster: with Cluster(execution_profiles={'node1': node1}) as cluster:
session = cluster.connect(wait_for_all_pools=True) session = cluster.connect(wait_for_all_pools=True)
@@ -926,8 +934,16 @@ class ClusterTests(unittest.TestCase):
@test_category config_profiles @test_category config_profiles
""" """
node1 = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.1'])) node1 = ExecutionProfile(
node2 = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.2'])) load_balancing_policy=HostFilterPolicy(
RoundRobinPolicy(), lambda host: host.address == "127.0.0.1"
)
)
node2 = ExecutionProfile(
load_balancing_policy=HostFilterPolicy(
RoundRobinPolicy(), lambda host: host.address == "127.0.0.2"
)
)
with Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: node1, 'node2': node2}) as cluster: with Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: node1, 'node2': node2}) as cluster:
session = cluster.connect(wait_for_all_pools=True) session = cluster.connect(wait_for_all_pools=True)
pools = session.get_pool_state() pools = session.get_pool_state()
@@ -936,7 +952,11 @@ class ClusterTests(unittest.TestCase):
self.assertEqual(set(h.address for h in pools), set(('127.0.0.1', '127.0.0.2'))) self.assertEqual(set(h.address for h in pools), set(('127.0.0.1', '127.0.0.2')))
# dynamically update pools on add # dynamically update pools on add
node3 = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.3'])) node3 = ExecutionProfile(
load_balancing_policy=HostFilterPolicy(
RoundRobinPolicy(), lambda host: host.address == "127.0.0.3"
)
)
cluster.add_execution_profile('node3', node3) cluster.add_execution_profile('node3', node3)
pools = session.get_pool_state() pools = session.get_pool_state()
self.assertEqual(set(h.address for h in pools), set(('127.0.0.1', '127.0.0.2', '127.0.0.3'))) self.assertEqual(set(h.address for h in pools), set(('127.0.0.1', '127.0.0.2', '127.0.0.3')))
@@ -954,14 +974,22 @@ class ClusterTests(unittest.TestCase):
""" """
max_retry_count = 10 max_retry_count = 10
for i in range(max_retry_count): for i in range(max_retry_count):
node1 = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.1'])) node1 = ExecutionProfile(
load_balancing_policy=HostFilterPolicy(
RoundRobinPolicy(), lambda host: host.address == "127.0.0.1"
)
)
with Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: node1}) as cluster: with Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: node1}) as cluster:
session = cluster.connect(wait_for_all_pools=True) session = cluster.connect(wait_for_all_pools=True)
pools = session.get_pool_state() pools = session.get_pool_state()
self.assertGreater(len(cluster.metadata.all_hosts()), 2) self.assertGreater(len(cluster.metadata.all_hosts()), 2)
self.assertEqual(set(h.address for h in pools), set(('127.0.0.1',))) self.assertEqual(set(h.address for h in pools), set(('127.0.0.1',)))
node2 = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.2', '127.0.0.3'])) node2 = ExecutionProfile(
load_balancing_policy=HostFilterPolicy(
RoundRobinPolicy(), lambda host: host.address in ["127.0.0.2", "127.0.0.3"]
)
)
start = time.time() start = time.time()
try: try:
@@ -1089,7 +1117,9 @@ class TestAddressTranslation(unittest.TestCase):
@local @local
class ContextManagementTest(unittest.TestCase): class ContextManagementTest(unittest.TestCase):
load_balancing_policy = WhiteListRoundRobinPolicy([CASSANDRA_IP]) load_balancing_policy = HostFilterPolicy(
RoundRobinPolicy(), lambda host: host.address == CASSANDRA_IP
)
cluster_kwargs = {'execution_profiles': {EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy= cluster_kwargs = {'execution_profiles': {EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=
load_balancing_policy)}, load_balancing_policy)},
'schema_metadata_enabled': False, 'schema_metadata_enabled': False,
@@ -1209,7 +1239,6 @@ class HostStateTest(unittest.TestCase):
@local @local
class DontPrepareOnIgnoredHostsTest(unittest.TestCase): class DontPrepareOnIgnoredHostsTest(unittest.TestCase):
ignored_addresses = ['127.0.0.3'] ignored_addresses = ['127.0.0.3']
ignore_node_3_policy = IgnoredHostPolicy(ignored_addresses) ignore_node_3_policy = IgnoredHostPolicy(ignored_addresses)
@@ -1248,7 +1277,8 @@ class DontPrepareOnIgnoredHostsTest(unittest.TestCase):
@local @local
class DuplicateRpcTest(unittest.TestCase): class DuplicateRpcTest(unittest.TestCase):
load_balancing_policy = WhiteListRoundRobinPolicy(['127.0.0.1']) load_balancing_policy = HostFilterPolicy(RoundRobinPolicy(),
lambda host: host.address == "127.0.0.1")
def setUp(self): def setUp(self):
self.cluster = Cluster(protocol_version=PROTOCOL_VERSION, load_balancing_policy=self.load_balancing_policy) self.cluster = Cluster(protocol_version=PROTOCOL_VERSION, load_balancing_policy=self.load_balancing_policy)

View File

@@ -29,7 +29,7 @@ from cassandra.cluster import NoHostAvailable, ConnectionShutdown, Cluster
from cassandra.io.asyncorereactor import AsyncoreConnection from cassandra.io.asyncorereactor import AsyncoreConnection
from cassandra.protocol import QueryMessage from cassandra.protocol import QueryMessage
from cassandra.connection import Connection from cassandra.connection import Connection
from cassandra.policies import WhiteListRoundRobinPolicy, HostStateListener from cassandra.policies import HostFilterPolicy, RoundRobinPolicy, HostStateListener
from cassandra.pool import HostConnectionPool from cassandra.pool import HostConnectionPool
from tests import is_monkey_patched, notwindows from tests import is_monkey_patched, notwindows
@@ -50,8 +50,12 @@ class ConnectionTimeoutTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.defaultInFlight = Connection.max_in_flight self.defaultInFlight = Connection.max_in_flight
Connection.max_in_flight = 2 Connection.max_in_flight = 2
self.cluster = Cluster(protocol_version=PROTOCOL_VERSION, load_balancing_policy= self.cluster = Cluster(
WhiteListRoundRobinPolicy([CASSANDRA_IP])) protocol_version=PROTOCOL_VERSION,
load_balancing_policy=HostFilterPolicy(
RoundRobinPolicy(), predicate=lambda host: host.address == CASSANDRA_IP
)
)
self.session = self.cluster.connect() self.session = self.cluster.connect()
def tearDown(self): def tearDown(self):

View File

@@ -14,7 +14,7 @@
import time import time
from cassandra.policies import WhiteListRoundRobinPolicy, FallthroughRetryPolicy from cassandra.policies import HostFilterPolicy, RoundRobinPolicy, FallthroughRetryPolicy
try: try:
import unittest2 as unittest import unittest2 as unittest
@@ -39,7 +39,9 @@ class MetricsTests(unittest.TestCase):
def setUp(self): def setUp(self):
contact_point = ['127.0.0.2'] contact_point = ['127.0.0.2']
self.cluster = Cluster(contact_points=contact_point, metrics_enabled=True, protocol_version=PROTOCOL_VERSION, self.cluster = Cluster(contact_points=contact_point, metrics_enabled=True, protocol_version=PROTOCOL_VERSION,
load_balancing_policy=WhiteListRoundRobinPolicy(contact_point), load_balancing_policy=HostFilterPolicy(
RoundRobinPolicy(), lambda host: host.address in contact_point
),
default_retry_policy=FallthroughRetryPolicy()) default_retry_policy=FallthroughRetryPolicy())
self.session = self.cluster.connect("test3rf", wait_for_all_pools=True) self.session = self.cluster.connect("test3rf", wait_for_all_pools=True)