diff --git a/tests/integration/long/test_consistency.py b/tests/integration/long/test_consistency.py index 43734c7d..538d924a 100644 --- a/tests/integration/long/test_consistency.py +++ b/tests/integration/long/test_consistency.py @@ -131,7 +131,7 @@ class ConsistencyTests(unittest.TestCase): load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), protocol_version=PROTOCOL_VERSION) session = cluster.connect() - wait_for_up(cluster, 1, wait=False) + wait_for_up(cluster, 1) wait_for_up(cluster, 2) create_schema(cluster, session, keyspace, replication_factor=rf) @@ -183,7 +183,7 @@ class ConsistencyTests(unittest.TestCase): load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), protocol_version=PROTOCOL_VERSION) session = cluster.connect() - wait_for_up(cluster, 1, wait=False) + wait_for_up(cluster, 1) wait_for_up(cluster, 2) create_schema(cluster, session, keyspace, replication_factor=3) diff --git a/tests/integration/long/test_loadbalancingpolicies.py b/tests/integration/long/test_loadbalancingpolicies.py index f0525317..54ed0d0a 100644 --- a/tests/integration/long/test_loadbalancingpolicies.py +++ b/tests/integration/long/test_loadbalancingpolicies.py @@ -43,11 +43,40 @@ class LoadBalancingPolicyTests(unittest.TestCase): remove_cluster() # clear ahead of test so it doesn't use one left in unknown state self.coordinator_stats = CoordinatorStats() self.prepared = None + self.probe_cluster = None + + def tearDown(self): + if self.probe_cluster: + self.probe_cluster.shutdown() @classmethod def teardown_class(cls): remove_cluster() + def _connect_probe_cluster(self): + if not self.probe_cluster: + # distinct cluster so we can see the status of nodes ignored by the LBP being tested + self.probe_cluster = Cluster(load_balancing_policy=RoundRobinPolicy(), + schema_metadata_enabled=False, token_metadata_enabled=False) + self.probe_session = self.probe_cluster.connect() + + def _wait_for_nodes_up(self, nodes): + self._connect_probe_cluster() + for n in nodes: + wait_for_up(self.probe_cluster, n) + + def _wait_for_nodes_down(self, nodes): + self._connect_probe_cluster() + for n in nodes: + wait_for_down(self.probe_cluster, n) + + def _cluster_session_with_lbp(self, lbp): + # create a cluster with no delay on events + cluster = Cluster(load_balancing_policy=lbp, protocol_version=PROTOCOL_VERSION, + topology_event_refresh_window=0, status_event_refresh_window=0) + session = cluster.connect() + return cluster, session + def _insert(self, session, keyspace, count=12, consistency_level=ConsistencyLevel.ONE): session.execute('USE %s' % keyspace) @@ -132,13 +161,8 @@ class LoadBalancingPolicyTests(unittest.TestCase): def test_roundrobin(self): use_singledc() keyspace = 'test_roundrobin' - cluster = Cluster( - load_balancing_policy=RoundRobinPolicy(), - protocol_version=PROTOCOL_VERSION) - session = cluster.connect() - wait_for_up(cluster, 1, wait=False) - wait_for_up(cluster, 2, wait=False) - wait_for_up(cluster, 3) + cluster, session = self._cluster_session_with_lbp(RoundRobinPolicy()) + self._wait_for_nodes_up(range(1, 4)) create_schema(cluster, session, keyspace, replication_factor=3) self._insert(session, keyspace) @@ -149,7 +173,7 @@ class LoadBalancingPolicyTests(unittest.TestCase): self.coordinator_stats.assert_query_count_equals(self, 3, 4) force_stop(3) - wait_for_down(cluster, 3) + self._wait_for_nodes_down([3]) self.coordinator_stats.reset_counts() self._query(session, keyspace) @@ -160,8 +184,8 @@ class LoadBalancingPolicyTests(unittest.TestCase): decommission(1) start(3) - wait_for_down(cluster, 1) - wait_for_up(cluster, 3) + self._wait_for_nodes_down([1]) + self._wait_for_nodes_up([3]) self.coordinator_stats.reset_counts() self._query(session, keyspace) @@ -169,18 +193,13 @@ class LoadBalancingPolicyTests(unittest.TestCase): self.coordinator_stats.assert_query_count_equals(self, 1, 0) self.coordinator_stats.assert_query_count_equals(self, 2, 6) self.coordinator_stats.assert_query_count_equals(self, 3, 6) + cluster.shutdown() def test_roundrobin_two_dcs(self): use_multidc([2, 2]) keyspace = 'test_roundrobin_two_dcs' - cluster = Cluster( - load_balancing_policy=RoundRobinPolicy(), - protocol_version=PROTOCOL_VERSION) - session = cluster.connect() - wait_for_up(cluster, 1, wait=False) - wait_for_up(cluster, 2, wait=False) - wait_for_up(cluster, 3, wait=False) - wait_for_up(cluster, 4) + cluster, session = self._cluster_session_with_lbp(RoundRobinPolicy()) + self._wait_for_nodes_up(range(1, 5)) create_schema(cluster, session, keyspace, replication_strategy=[2, 2]) self._insert(session, keyspace) @@ -197,7 +216,7 @@ class LoadBalancingPolicyTests(unittest.TestCase): # reset control connection self._insert(session, keyspace, count=1000) - wait_for_up(cluster, 5) + self._wait_for_nodes_up([5]) self.coordinator_stats.reset_counts() self._query(session, keyspace) @@ -213,14 +232,8 @@ class LoadBalancingPolicyTests(unittest.TestCase): def test_roundrobin_two_dcs_2(self): use_multidc([2, 2]) keyspace = 'test_roundrobin_two_dcs_2' - cluster = Cluster( - load_balancing_policy=RoundRobinPolicy(), - protocol_version=PROTOCOL_VERSION) - session = cluster.connect() - wait_for_up(cluster, 1, wait=False) - wait_for_up(cluster, 2, wait=False) - wait_for_up(cluster, 3, wait=False) - wait_for_up(cluster, 4) + cluster, session = self._cluster_session_with_lbp(RoundRobinPolicy()) + self._wait_for_nodes_up(range(1, 5)) create_schema(cluster, session, keyspace, replication_strategy=[2, 2]) self._insert(session, keyspace) @@ -237,7 +250,7 @@ class LoadBalancingPolicyTests(unittest.TestCase): # reset control connection self._insert(session, keyspace, count=1000) - wait_for_up(cluster, 5) + self._wait_for_nodes_up([5]) self.coordinator_stats.reset_counts() self._query(session, keyspace) @@ -253,15 +266,8 @@ class LoadBalancingPolicyTests(unittest.TestCase): def test_dc_aware_roundrobin_two_dcs(self): use_multidc([3, 2]) keyspace = 'test_dc_aware_roundrobin_two_dcs' - cluster = Cluster( - load_balancing_policy=DCAwareRoundRobinPolicy('dc1'), - protocol_version=PROTOCOL_VERSION) - session = cluster.connect() - wait_for_up(cluster, 1, wait=False) - wait_for_up(cluster, 2, wait=False) - wait_for_up(cluster, 3, wait=False) - wait_for_up(cluster, 4, wait=False) - wait_for_up(cluster, 5) + cluster, session = self._cluster_session_with_lbp(DCAwareRoundRobinPolicy('dc1')) + self._wait_for_nodes_up(range(1, 6)) create_schema(cluster, session, keyspace, replication_strategy=[2, 2]) self._insert(session, keyspace) @@ -278,15 +284,8 @@ class LoadBalancingPolicyTests(unittest.TestCase): def test_dc_aware_roundrobin_two_dcs_2(self): use_multidc([3, 2]) keyspace = 'test_dc_aware_roundrobin_two_dcs_2' - cluster = Cluster( - load_balancing_policy=DCAwareRoundRobinPolicy('dc2'), - protocol_version=PROTOCOL_VERSION) - session = cluster.connect() - wait_for_up(cluster, 1, wait=False) - wait_for_up(cluster, 2, wait=False) - wait_for_up(cluster, 3, wait=False) - wait_for_up(cluster, 4, wait=False) - wait_for_up(cluster, 5) + cluster, session = self._cluster_session_with_lbp(DCAwareRoundRobinPolicy('dc2')) + self._wait_for_nodes_up(range(1, 6)) create_schema(cluster, session, keyspace, replication_strategy=[2, 2]) self._insert(session, keyspace) @@ -303,14 +302,8 @@ class LoadBalancingPolicyTests(unittest.TestCase): def test_dc_aware_roundrobin_one_remote_host(self): use_multidc([2, 2]) keyspace = 'test_dc_aware_roundrobin_one_remote_host' - cluster = Cluster( - load_balancing_policy=DCAwareRoundRobinPolicy('dc2', used_hosts_per_remote_dc=1), - protocol_version=PROTOCOL_VERSION) - session = cluster.connect() - wait_for_up(cluster, 1, wait=False) - wait_for_up(cluster, 2, wait=False) - wait_for_up(cluster, 3, wait=False) - wait_for_up(cluster, 4) + cluster, session = self._cluster_session_with_lbp(DCAwareRoundRobinPolicy('dc2', used_hosts_per_remote_dc=1)) + self._wait_for_nodes_up(range(1, 5)) create_schema(cluster, session, keyspace, replication_strategy=[2, 2]) self._insert(session, keyspace) @@ -323,7 +316,7 @@ class LoadBalancingPolicyTests(unittest.TestCase): self.coordinator_stats.reset_counts() bootstrap(5, 'dc1') - wait_for_up(cluster, 5) + self._wait_for_nodes_up([5]) self._query(session, keyspace) @@ -336,8 +329,7 @@ class LoadBalancingPolicyTests(unittest.TestCase): self.coordinator_stats.reset_counts() decommission(3) decommission(4) - wait_for_down(cluster, 3, wait=True) - wait_for_down(cluster, 4, wait=True) + self._wait_for_nodes_down([3, 4]) self._query(session, keyspace) @@ -350,7 +342,7 @@ class LoadBalancingPolicyTests(unittest.TestCase): self.coordinator_stats.reset_counts() decommission(5) - wait_for_down(cluster, 5, wait=True) + self._wait_for_nodes_down([5]) self._query(session, keyspace) @@ -364,7 +356,7 @@ class LoadBalancingPolicyTests(unittest.TestCase): self.coordinator_stats.reset_counts() decommission(1) - wait_for_down(cluster, 1, wait=True) + self._wait_for_nodes_down([1]) self._query(session, keyspace) @@ -395,13 +387,8 @@ class LoadBalancingPolicyTests(unittest.TestCase): def token_aware(self, keyspace, use_prepared=False): use_singledc() - cluster = Cluster( - load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), - protocol_version=PROTOCOL_VERSION) - session = cluster.connect() - wait_for_up(cluster, 1, wait=False) - wait_for_up(cluster, 2, wait=False) - wait_for_up(cluster, 3) + cluster, session = self._cluster_session_with_lbp(TokenAwarePolicy(RoundRobinPolicy())) + self._wait_for_nodes_up([1, 2, 3]) create_schema(cluster, session, keyspace, replication_factor=1) self._insert(session, keyspace) @@ -420,7 +407,7 @@ class LoadBalancingPolicyTests(unittest.TestCase): self.coordinator_stats.reset_counts() force_stop(2) - wait_for_down(cluster, 2, wait=True) + self._wait_for_nodes_down([2]) try: self._query(session, keyspace, use_prepared=use_prepared) @@ -432,7 +419,7 @@ class LoadBalancingPolicyTests(unittest.TestCase): self.coordinator_stats.reset_counts() start(2) - wait_for_up(cluster, 2, wait=True) + self._wait_for_nodes_up([2]) self._query(session, keyspace, use_prepared=use_prepared) @@ -442,7 +429,7 @@ class LoadBalancingPolicyTests(unittest.TestCase): self.coordinator_stats.reset_counts() stop(2) - wait_for_down(cluster, 2, wait=True) + self._wait_for_nodes_down([2]) try: self._query(session, keyspace, use_prepared=use_prepared) @@ -452,9 +439,9 @@ class LoadBalancingPolicyTests(unittest.TestCase): self.coordinator_stats.reset_counts() start(2) - wait_for_up(cluster, 2, wait=True) + self._wait_for_nodes_up([2]) decommission(2) - wait_for_down(cluster, 2, wait=True) + self._wait_for_nodes_down([2]) self._query(session, keyspace, use_prepared=use_prepared) @@ -471,13 +458,8 @@ class LoadBalancingPolicyTests(unittest.TestCase): use_singledc() keyspace = 'test_token_aware_composite_key' table = 'composite' - cluster = Cluster( - load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), - protocol_version=PROTOCOL_VERSION) - session = cluster.connect() - wait_for_up(cluster, 1, wait=False) - wait_for_up(cluster, 2, wait=False) - wait_for_up(cluster, 3) + cluster, session = self._cluster_session_with_lbp(TokenAwarePolicy(RoundRobinPolicy())) + self._wait_for_nodes_up([1, 2, 3]) create_schema(cluster, session, keyspace, replication_factor=2) session.execute('CREATE TABLE %s (' @@ -500,13 +482,8 @@ class LoadBalancingPolicyTests(unittest.TestCase): def test_token_aware_with_rf_2(self, use_prepared=False): use_singledc() keyspace = 'test_token_aware_with_rf_2' - cluster = Cluster( - load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), - protocol_version=PROTOCOL_VERSION) - session = cluster.connect() - wait_for_up(cluster, 1, wait=False) - wait_for_up(cluster, 2, wait=False) - wait_for_up(cluster, 3) + cluster, session = self._cluster_session_with_lbp(TokenAwarePolicy(RoundRobinPolicy())) + self._wait_for_nodes_up([1, 2, 3]) create_schema(cluster, session, keyspace, replication_factor=2) self._insert(session, keyspace) @@ -518,7 +495,7 @@ class LoadBalancingPolicyTests(unittest.TestCase): self.coordinator_stats.reset_counts() stop(2) - wait_for_down(cluster, 2, wait=True) + self._wait_for_nodes_down([2]) self._query(session, keyspace) @@ -530,10 +507,8 @@ class LoadBalancingPolicyTests(unittest.TestCase): def test_token_aware_with_local_table(self): use_singledc() - cluster = Cluster( - load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), - protocol_version=PROTOCOL_VERSION) - session = cluster.connect() + cluster, session = self._cluster_session_with_lbp(TokenAwarePolicy(RoundRobinPolicy())) + self._wait_for_nodes_up([1, 2, 3]) p = session.prepare("SELECT * FROM system.local WHERE key=?") # this would blow up prior to 61b4fad @@ -546,13 +521,11 @@ class LoadBalancingPolicyTests(unittest.TestCase): use_singledc() keyspace = 'test_white_list' - cluster = Cluster(('127.0.0.2',), - load_balancing_policy=WhiteListRoundRobinPolicy((IP_FORMAT % 2,)), - protocol_version=PROTOCOL_VERSION) + cluster = Cluster(('127.0.0.2',), load_balancing_policy=WhiteListRoundRobinPolicy((IP_FORMAT % 2,)), + protocol_version=PROTOCOL_VERSION, topology_event_refresh_window=0, + status_event_refresh_window=0) session = cluster.connect() - wait_for_up(cluster, 1, wait=False) - wait_for_up(cluster, 2, wait=False) - wait_for_up(cluster, 3) + self._wait_for_nodes_up([1, 2, 3]) create_schema(cluster, session, keyspace) self._insert(session, keyspace) @@ -564,12 +537,12 @@ class LoadBalancingPolicyTests(unittest.TestCase): # white list policy should not allow reconnecting to ignored hosts force_stop(3) - wait_for_down(cluster, 3) + self._wait_for_nodes_down([3]) self.assertFalse(cluster.metadata._hosts[IP_FORMAT % 3].is_currently_reconnecting()) self.coordinator_stats.reset_counts() force_stop(2) - time.sleep(10) + self._wait_for_nodes_down([2]) try: self._query(session, keyspace) diff --git a/tests/integration/long/utils.py b/tests/integration/long/utils.py index f850d6a1..bd48e96f 100644 --- a/tests/integration/long/utils.py +++ b/tests/integration/long/utils.py @@ -123,14 +123,14 @@ def bootstrap(node, data_center=None, token=None): def ring(node): - print('From node%s:' % node) get_node(node).nodetool('ring') -def wait_for_up(cluster, node, wait=True): +def wait_for_up(cluster, node): tries = 0 + addr = IP_FORMAT % node while tries < 100: - host = cluster.metadata.get_host(IP_FORMAT % node) + host = cluster.metadata.get_host(addr) if host and host.is_up: log.debug("Done waiting for node %s to be up", node) return @@ -139,10 +139,11 @@ def wait_for_up(cluster, node, wait=True): tries += 1 time.sleep(1) - raise RuntimeError("Host {0} is not up after 100 attempts".format(IP_FORMAT.format(node))) + # todo: don't mix string interpolation methods in the same package + raise RuntimeError("Host {0} is not up after {1} attempts".format(addr, tries)) -def wait_for_down(cluster, node, wait=True): +def wait_for_down(cluster, node): log.debug("Waiting for node %s to be down", node) tries = 0 while tries < 100: @@ -155,4 +156,4 @@ def wait_for_down(cluster, node, wait=True): tries += 1 time.sleep(1) - raise RuntimeError("Host {0} is not down after 100 attempts".format(IP_FORMAT.format(node))) + raise RuntimeError("Host {0} is not down after {1} attempts".format(addr, tries))