fix lbp tests following host state management change 01a3825
also fix logging in util and remove unused parameter
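The pattern behind most of these changes: a load-balancing policy under test may ignore some hosts outright, so the cluster it drives can no longer be trusted to report those hosts' up/down state. The tests therefore poll node state through a separate probe cluster that uses a plain RoundRobinPolicy. A condensed, self-contained sketch of that pattern (names taken from the diff below; assumes a running local multi-node cluster on 127.0.0.x, as these tests use):

```python
# Sketch of the probe-cluster pattern introduced in this diff (assumes a
# running local Cassandra cluster reachable at 127.0.0.1, 127.0.0.2, ...).
import time

from cassandra.cluster import Cluster
from cassandra.policies import RoundRobinPolicy

IP_FORMAT = '127.0.0.%s'


def wait_for_up(cluster, node, max_tries=100):
    # Poll driver metadata until the host is marked up (mirrors the utils
    # helper at the bottom of this diff, with the unused `wait` flag dropped).
    addr = IP_FORMAT % node
    for _ in range(max_tries):
        host = cluster.metadata.get_host(addr)
        if host and host.is_up:
            return
        time.sleep(1)
    raise RuntimeError("Host {0} is not up after {1} attempts".format(addr, max_tries))


# The probe cluster sees every node because its policy ignores none of them;
# schema and token metadata are disabled since only host state is needed.
probe_cluster = Cluster(load_balancing_policy=RoundRobinPolicy(),
                        schema_metadata_enabled=False,
                        token_metadata_enabled=False)
probe_cluster.connect()
wait_for_up(probe_cluster, 1)  # blocks until node 1 is up in probe metadata
```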
@@ -131,7 +131,7 @@ class ConsistencyTests(unittest.TestCase):
             load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
             protocol_version=PROTOCOL_VERSION)
         session = cluster.connect()
-        wait_for_up(cluster, 1, wait=False)
+        wait_for_up(cluster, 1)
         wait_for_up(cluster, 2)

         create_schema(cluster, session, keyspace, replication_factor=rf)
@@ -183,7 +183,7 @@ class ConsistencyTests(unittest.TestCase):
             load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
             protocol_version=PROTOCOL_VERSION)
         session = cluster.connect()
-        wait_for_up(cluster, 1, wait=False)
+        wait_for_up(cluster, 1)
         wait_for_up(cluster, 2)

         create_schema(cluster, session, keyspace, replication_factor=3)
@@ -43,11 +43,40 @@ class LoadBalancingPolicyTests(unittest.TestCase):
         remove_cluster()  # clear ahead of test so it doesn't use one left in unknown state
         self.coordinator_stats = CoordinatorStats()
         self.prepared = None
+        self.probe_cluster = None
+
+    def tearDown(self):
+        if self.probe_cluster:
+            self.probe_cluster.shutdown()

     @classmethod
     def teardown_class(cls):
         remove_cluster()

+    def _connect_probe_cluster(self):
+        if not self.probe_cluster:
+            # distinct cluster so we can see the status of nodes ignored by the LBP being tested
+            self.probe_cluster = Cluster(load_balancing_policy=RoundRobinPolicy(),
+                                         schema_metadata_enabled=False, token_metadata_enabled=False)
+            self.probe_session = self.probe_cluster.connect()
+
+    def _wait_for_nodes_up(self, nodes):
+        self._connect_probe_cluster()
+        for n in nodes:
+            wait_for_up(self.probe_cluster, n)
+
+    def _wait_for_nodes_down(self, nodes):
+        self._connect_probe_cluster()
+        for n in nodes:
+            wait_for_down(self.probe_cluster, n)
+
+    def _cluster_session_with_lbp(self, lbp):
+        # create a cluster with no delay on events
+        cluster = Cluster(load_balancing_policy=lbp, protocol_version=PROTOCOL_VERSION,
+                          topology_event_refresh_window=0, status_event_refresh_window=0)
+        session = cluster.connect()
+        return cluster, session
+
     def _insert(self, session, keyspace, count=12,
                 consistency_level=ConsistencyLevel.ONE):
         session.execute('USE %s' % keyspace)
@@ -132,13 +161,8 @@ class LoadBalancingPolicyTests(unittest.TestCase):
     def test_roundrobin(self):
         use_singledc()
         keyspace = 'test_roundrobin'
-        cluster = Cluster(
-            load_balancing_policy=RoundRobinPolicy(),
-            protocol_version=PROTOCOL_VERSION)
-        session = cluster.connect()
-        wait_for_up(cluster, 1, wait=False)
-        wait_for_up(cluster, 2, wait=False)
-        wait_for_up(cluster, 3)
+        cluster, session = self._cluster_session_with_lbp(RoundRobinPolicy())
+        self._wait_for_nodes_up(range(1, 4))

         create_schema(cluster, session, keyspace, replication_factor=3)
         self._insert(session, keyspace)
@@ -149,7 +173,7 @@ class LoadBalancingPolicyTests(unittest.TestCase):
         self.coordinator_stats.assert_query_count_equals(self, 3, 4)

         force_stop(3)
-        wait_for_down(cluster, 3)
+        self._wait_for_nodes_down([3])

         self.coordinator_stats.reset_counts()
         self._query(session, keyspace)
@@ -160,8 +184,8 @@ class LoadBalancingPolicyTests(unittest.TestCase):

         decommission(1)
         start(3)
-        wait_for_down(cluster, 1)
-        wait_for_up(cluster, 3)
+        self._wait_for_nodes_down([1])
+        self._wait_for_nodes_up([3])

         self.coordinator_stats.reset_counts()
         self._query(session, keyspace)
@@ -169,18 +193,13 @@ class LoadBalancingPolicyTests(unittest.TestCase):
         self.coordinator_stats.assert_query_count_equals(self, 1, 0)
         self.coordinator_stats.assert_query_count_equals(self, 2, 6)
         self.coordinator_stats.assert_query_count_equals(self, 3, 6)
+        cluster.shutdown()

     def test_roundrobin_two_dcs(self):
         use_multidc([2, 2])
         keyspace = 'test_roundrobin_two_dcs'
-        cluster = Cluster(
-            load_balancing_policy=RoundRobinPolicy(),
-            protocol_version=PROTOCOL_VERSION)
-        session = cluster.connect()
-        wait_for_up(cluster, 1, wait=False)
-        wait_for_up(cluster, 2, wait=False)
-        wait_for_up(cluster, 3, wait=False)
-        wait_for_up(cluster, 4)
+        cluster, session = self._cluster_session_with_lbp(RoundRobinPolicy())
+        self._wait_for_nodes_up(range(1, 5))

         create_schema(cluster, session, keyspace, replication_strategy=[2, 2])
         self._insert(session, keyspace)
@@ -197,7 +216,7 @@ class LoadBalancingPolicyTests(unittest.TestCase):
         # reset control connection
         self._insert(session, keyspace, count=1000)

-        wait_for_up(cluster, 5)
+        self._wait_for_nodes_up([5])

         self.coordinator_stats.reset_counts()
         self._query(session, keyspace)
@@ -213,14 +232,8 @@ class LoadBalancingPolicyTests(unittest.TestCase):
     def test_roundrobin_two_dcs_2(self):
        use_multidc([2, 2])
        keyspace = 'test_roundrobin_two_dcs_2'
-        cluster = Cluster(
-            load_balancing_policy=RoundRobinPolicy(),
-            protocol_version=PROTOCOL_VERSION)
-        session = cluster.connect()
-        wait_for_up(cluster, 1, wait=False)
-        wait_for_up(cluster, 2, wait=False)
-        wait_for_up(cluster, 3, wait=False)
-        wait_for_up(cluster, 4)
+        cluster, session = self._cluster_session_with_lbp(RoundRobinPolicy())
+        self._wait_for_nodes_up(range(1, 5))

         create_schema(cluster, session, keyspace, replication_strategy=[2, 2])
         self._insert(session, keyspace)
@@ -237,7 +250,7 @@ class LoadBalancingPolicyTests(unittest.TestCase):
         # reset control connection
         self._insert(session, keyspace, count=1000)

-        wait_for_up(cluster, 5)
+        self._wait_for_nodes_up([5])

         self.coordinator_stats.reset_counts()
         self._query(session, keyspace)
@@ -253,15 +266,8 @@ class LoadBalancingPolicyTests(unittest.TestCase):
     def test_dc_aware_roundrobin_two_dcs(self):
         use_multidc([3, 2])
         keyspace = 'test_dc_aware_roundrobin_two_dcs'
-        cluster = Cluster(
-            load_balancing_policy=DCAwareRoundRobinPolicy('dc1'),
-            protocol_version=PROTOCOL_VERSION)
-        session = cluster.connect()
-        wait_for_up(cluster, 1, wait=False)
-        wait_for_up(cluster, 2, wait=False)
-        wait_for_up(cluster, 3, wait=False)
-        wait_for_up(cluster, 4, wait=False)
-        wait_for_up(cluster, 5)
+        cluster, session = self._cluster_session_with_lbp(DCAwareRoundRobinPolicy('dc1'))
+        self._wait_for_nodes_up(range(1, 6))

         create_schema(cluster, session, keyspace, replication_strategy=[2, 2])
         self._insert(session, keyspace)
@@ -278,15 +284,8 @@ class LoadBalancingPolicyTests(unittest.TestCase):
     def test_dc_aware_roundrobin_two_dcs_2(self):
         use_multidc([3, 2])
         keyspace = 'test_dc_aware_roundrobin_two_dcs_2'
-        cluster = Cluster(
-            load_balancing_policy=DCAwareRoundRobinPolicy('dc2'),
-            protocol_version=PROTOCOL_VERSION)
-        session = cluster.connect()
-        wait_for_up(cluster, 1, wait=False)
-        wait_for_up(cluster, 2, wait=False)
-        wait_for_up(cluster, 3, wait=False)
-        wait_for_up(cluster, 4, wait=False)
-        wait_for_up(cluster, 5)
+        cluster, session = self._cluster_session_with_lbp(DCAwareRoundRobinPolicy('dc2'))
+        self._wait_for_nodes_up(range(1, 6))

         create_schema(cluster, session, keyspace, replication_strategy=[2, 2])
         self._insert(session, keyspace)
@@ -303,14 +302,8 @@ class LoadBalancingPolicyTests(unittest.TestCase):
     def test_dc_aware_roundrobin_one_remote_host(self):
         use_multidc([2, 2])
         keyspace = 'test_dc_aware_roundrobin_one_remote_host'
-        cluster = Cluster(
-            load_balancing_policy=DCAwareRoundRobinPolicy('dc2', used_hosts_per_remote_dc=1),
-            protocol_version=PROTOCOL_VERSION)
-        session = cluster.connect()
-        wait_for_up(cluster, 1, wait=False)
-        wait_for_up(cluster, 2, wait=False)
-        wait_for_up(cluster, 3, wait=False)
-        wait_for_up(cluster, 4)
+        cluster, session = self._cluster_session_with_lbp(DCAwareRoundRobinPolicy('dc2', used_hosts_per_remote_dc=1))
+        self._wait_for_nodes_up(range(1, 5))

         create_schema(cluster, session, keyspace, replication_strategy=[2, 2])
         self._insert(session, keyspace)
@@ -323,7 +316,7 @@ class LoadBalancingPolicyTests(unittest.TestCase):

         self.coordinator_stats.reset_counts()
         bootstrap(5, 'dc1')
-        wait_for_up(cluster, 5)
+        self._wait_for_nodes_up([5])

         self._query(session, keyspace)

@@ -336,8 +329,7 @@ class LoadBalancingPolicyTests(unittest.TestCase):
         self.coordinator_stats.reset_counts()
         decommission(3)
         decommission(4)
-        wait_for_down(cluster, 3, wait=True)
-        wait_for_down(cluster, 4, wait=True)
+        self._wait_for_nodes_down([3, 4])

         self._query(session, keyspace)

@@ -350,7 +342,7 @@ class LoadBalancingPolicyTests(unittest.TestCase):

         self.coordinator_stats.reset_counts()
         decommission(5)
-        wait_for_down(cluster, 5, wait=True)
+        self._wait_for_nodes_down([5])

         self._query(session, keyspace)

@@ -364,7 +356,7 @@ class LoadBalancingPolicyTests(unittest.TestCase):

         self.coordinator_stats.reset_counts()
         decommission(1)
-        wait_for_down(cluster, 1, wait=True)
+        self._wait_for_nodes_down([1])

         self._query(session, keyspace)

@@ -395,13 +387,8 @@ class LoadBalancingPolicyTests(unittest.TestCase):

     def token_aware(self, keyspace, use_prepared=False):
         use_singledc()
-        cluster = Cluster(
-            load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
-            protocol_version=PROTOCOL_VERSION)
-        session = cluster.connect()
-        wait_for_up(cluster, 1, wait=False)
-        wait_for_up(cluster, 2, wait=False)
-        wait_for_up(cluster, 3)
+        cluster, session = self._cluster_session_with_lbp(TokenAwarePolicy(RoundRobinPolicy()))
+        self._wait_for_nodes_up([1, 2, 3])

         create_schema(cluster, session, keyspace, replication_factor=1)
         self._insert(session, keyspace)
@@ -420,7 +407,7 @@ class LoadBalancingPolicyTests(unittest.TestCase):

         self.coordinator_stats.reset_counts()
         force_stop(2)
-        wait_for_down(cluster, 2, wait=True)
+        self._wait_for_nodes_down([2])

         try:
             self._query(session, keyspace, use_prepared=use_prepared)
@@ -432,7 +419,7 @@ class LoadBalancingPolicyTests(unittest.TestCase):

         self.coordinator_stats.reset_counts()
         start(2)
-        wait_for_up(cluster, 2, wait=True)
+        self._wait_for_nodes_up([2])

         self._query(session, keyspace, use_prepared=use_prepared)

@@ -442,7 +429,7 @@ class LoadBalancingPolicyTests(unittest.TestCase):

         self.coordinator_stats.reset_counts()
         stop(2)
-        wait_for_down(cluster, 2, wait=True)
+        self._wait_for_nodes_down([2])

         try:
             self._query(session, keyspace, use_prepared=use_prepared)
@@ -452,9 +439,9 @@ class LoadBalancingPolicyTests(unittest.TestCase):

         self.coordinator_stats.reset_counts()
         start(2)
-        wait_for_up(cluster, 2, wait=True)
+        self._wait_for_nodes_up([2])
         decommission(2)
-        wait_for_down(cluster, 2, wait=True)
+        self._wait_for_nodes_down([2])

         self._query(session, keyspace, use_prepared=use_prepared)

@@ -471,13 +458,8 @@ class LoadBalancingPolicyTests(unittest.TestCase):
         use_singledc()
         keyspace = 'test_token_aware_composite_key'
         table = 'composite'
-        cluster = Cluster(
-            load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
-            protocol_version=PROTOCOL_VERSION)
-        session = cluster.connect()
-        wait_for_up(cluster, 1, wait=False)
-        wait_for_up(cluster, 2, wait=False)
-        wait_for_up(cluster, 3)
+        cluster, session = self._cluster_session_with_lbp(TokenAwarePolicy(RoundRobinPolicy()))
+        self._wait_for_nodes_up([1, 2, 3])

         create_schema(cluster, session, keyspace, replication_factor=2)
         session.execute('CREATE TABLE %s ('
@@ -500,13 +482,8 @@ class LoadBalancingPolicyTests(unittest.TestCase):
     def test_token_aware_with_rf_2(self, use_prepared=False):
         use_singledc()
         keyspace = 'test_token_aware_with_rf_2'
-        cluster = Cluster(
-            load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
-            protocol_version=PROTOCOL_VERSION)
-        session = cluster.connect()
-        wait_for_up(cluster, 1, wait=False)
-        wait_for_up(cluster, 2, wait=False)
-        wait_for_up(cluster, 3)
+        cluster, session = self._cluster_session_with_lbp(TokenAwarePolicy(RoundRobinPolicy()))
+        self._wait_for_nodes_up([1, 2, 3])

         create_schema(cluster, session, keyspace, replication_factor=2)
         self._insert(session, keyspace)
@@ -518,7 +495,7 @@ class LoadBalancingPolicyTests(unittest.TestCase):

         self.coordinator_stats.reset_counts()
         stop(2)
-        wait_for_down(cluster, 2, wait=True)
+        self._wait_for_nodes_down([2])

         self._query(session, keyspace)

@@ -530,10 +507,8 @@ class LoadBalancingPolicyTests(unittest.TestCase):

     def test_token_aware_with_local_table(self):
         use_singledc()
-        cluster = Cluster(
-            load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
-            protocol_version=PROTOCOL_VERSION)
-        session = cluster.connect()
+        cluster, session = self._cluster_session_with_lbp(TokenAwarePolicy(RoundRobinPolicy()))
+        self._wait_for_nodes_up([1, 2, 3])

         p = session.prepare("SELECT * FROM system.local WHERE key=?")
         # this would blow up prior to 61b4fad
@@ -546,13 +521,11 @@ class LoadBalancingPolicyTests(unittest.TestCase):
         use_singledc()
         keyspace = 'test_white_list'

-        cluster = Cluster(('127.0.0.2',),
-            load_balancing_policy=WhiteListRoundRobinPolicy((IP_FORMAT % 2,)),
-            protocol_version=PROTOCOL_VERSION)
+        cluster = Cluster(('127.0.0.2',), load_balancing_policy=WhiteListRoundRobinPolicy((IP_FORMAT % 2,)),
+                          protocol_version=PROTOCOL_VERSION, topology_event_refresh_window=0,
+                          status_event_refresh_window=0)
         session = cluster.connect()
-        wait_for_up(cluster, 1, wait=False)
-        wait_for_up(cluster, 2, wait=False)
-        wait_for_up(cluster, 3)
+        self._wait_for_nodes_up([1, 2, 3])

         create_schema(cluster, session, keyspace)
         self._insert(session, keyspace)
@@ -564,12 +537,12 @@ class LoadBalancingPolicyTests(unittest.TestCase):

         # white list policy should not allow reconnecting to ignored hosts
         force_stop(3)
-        wait_for_down(cluster, 3)
+        self._wait_for_nodes_down([3])
         self.assertFalse(cluster.metadata._hosts[IP_FORMAT % 3].is_currently_reconnecting())

         self.coordinator_stats.reset_counts()
         force_stop(2)
-        time.sleep(10)
+        self._wait_for_nodes_down([2])

         try:
             self._query(session, keyspace)
@@ -123,14 +123,14 @@ def bootstrap(node, data_center=None, token=None):


 def ring(node):
     print('From node%s:' % node)
     get_node(node).nodetool('ring')


-def wait_for_up(cluster, node, wait=True):
+def wait_for_up(cluster, node):
     tries = 0
+    addr = IP_FORMAT % node
     while tries < 100:
-        host = cluster.metadata.get_host(IP_FORMAT % node)
+        host = cluster.metadata.get_host(addr)
         if host and host.is_up:
             log.debug("Done waiting for node %s to be up", node)
             return
@@ -139,10 +139,11 @@ def wait_for_up(cluster, node, wait=True):
         tries += 1
         time.sleep(1)

-    raise RuntimeError("Host {0} is not up after 100 attempts".format(IP_FORMAT.format(node)))
+    # todo: don't mix string interpolation methods in the same package
+    raise RuntimeError("Host {0} is not up after {1} attempts".format(addr, tries))


-def wait_for_down(cluster, node, wait=True):
+def wait_for_down(cluster, node):
     log.debug("Waiting for node %s to be down", node)
     tries = 0
     while tries < 100:
@@ -155,4 +156,4 @@ def wait_for_down(cluster, node, wait=True):
         tries += 1
         time.sleep(1)

-    raise RuntimeError("Host {0} is not down after 100 attempts".format(IP_FORMAT.format(node)))
+    raise RuntimeError("Host {0} is not down after {1} attempts".format(addr, tries))