diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 1b9f04bf..d6a7d66e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -21,6 +21,8 @@ Bug Fixes if the callback was added outside of the event loop thread (github issue #95) * Properly escape keyspace name in Session.set_keyspace(). Previously, the keyspace name was quoted, but any quotes in the string were not escaped. +* Avoid adding hosts to the load balancing policy before their datacenter + and rack information has been set, if possible. Other ----- diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 6ccf4264..cb9f4459 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -759,13 +759,13 @@ class Cluster(object): self.on_down(host, is_host_addition, force_if_down=True) return is_down - def add_host(self, address, signal): + def add_host(self, address, datacenter=None, rack=None, signal=True): """ Called when adding initial contact points and when the control connection subsequently discovers a new node. Intended for internal use only. """ - new_host = self.metadata.add_host(address) + new_host = self.metadata.add_host(address, datacenter, rack) if new_host and signal: log.info("New Cassandra host %s added", address) self.on_add(new_host) @@ -1580,10 +1580,13 @@ class ControlConnection(object): found_hosts.add(addr) host = self._cluster.metadata.get_host(addr) + datacenter = row.get("data_center") + rack = row.get("rack") if host is None: log.debug("[control connection] Found new host to connect to: %s", addr) - host = self._cluster.add_host(addr, signal=True) - host.set_location_info(row.get("data_center"), row.get("rack")) + host = self._cluster.add_host(addr, datacenter, rack, signal=True) + else: + self._update_location_info(host, datacenter, rack) tokens = row.get("tokens") if partitioner and tokens: @@ -1600,11 +1603,22 @@ class ControlConnection(object): log.debug("[control connection] Fetched ring info, rebuilding metadata") self._cluster.metadata.rebuild_token_map(partitioner, token_map) + def _update_location_info(self, host, datacenter, rack): + if host.datacenter == datacenter and host.rack == rack: + return + + # If the dc/rack information changes, we need to update the load balancing policy. + # For that, we remove and re-add the node against the policy. Not the most elegant, and assumes + # that the policy will update correctly, but in practice this should work. + self._cluster.load_balancing_policy.on_down(host) + host.set_location_info(datacenter, rack) + self._cluster.load_balancing_policy.on_up(host) + def _handle_topology_change(self, event): change_type = event["change_type"] addr, port = event["address"] if change_type == "NEW_NODE": - self._cluster.scheduler.schedule(10, self._cluster.add_host, addr, signal=True) + self._cluster.scheduler.schedule(10, self.refresh_node_list_and_token_map) elif change_type == "REMOVED_NODE": host = self._cluster.metadata.get_host(addr) self._cluster.scheduler.schedule(0, self._cluster.remove_host, host) @@ -1618,7 +1632,7 @@ class ControlConnection(object): if change_type == "UP": if host is None: # this is the first time we've seen the node - self._cluster.scheduler.schedule(1, self._cluster.add_host, addr, signal=True) + self._cluster.scheduler.schedule(1, self.refresh_node_list_and_token_map) else: # this will be run by the scheduler self._cluster.scheduler.schedule(1, self._cluster.on_up, host) diff --git a/cassandra/metadata.py b/cassandra/metadata.py index 172c0056..3d1814d4 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -348,11 +348,12 @@ class Metadata(object): else: return True - def add_host(self, address): + def add_host(self, address, datacenter, rack): cluster = self.cluster_ref() with self._hosts_lock: if address not in self._hosts: - new_host = Host(address, cluster.conviction_policy_factory) + new_host = Host( + address, cluster.conviction_policy_factory, datacenter, rack) self._hosts[address] = new_host else: return None diff --git a/cassandra/policies.py b/cassandra/policies.py index 44fdae23..a563d7f2 100644 --- a/cassandra/policies.py +++ b/cassandra/policies.py @@ -134,6 +134,7 @@ class RoundRobinPolicy(LoadBalancingPolicy): This load balancing policy is used by default. """ + _live_hosts = frozenset(()) def populate(self, cluster, hosts): self._live_hosts = frozenset(hosts) diff --git a/cassandra/pool.py b/cassandra/pool.py index 6b12caa9..2f218af5 100644 --- a/cassandra/pool.py +++ b/cassandra/pool.py @@ -57,7 +57,7 @@ class Host(object): _currently_handling_node_up = False _handle_node_up_condition = None - def __init__(self, inet_address, conviction_policy_factory): + def __init__(self, inet_address, conviction_policy_factory, datacenter=None, rack=None): if inet_address is None: raise ValueError("inet_address may not be None") if conviction_policy_factory is None: @@ -65,6 +65,7 @@ class Host(object): self.address = inet_address self.conviction_policy = conviction_policy_factory(self) + self.set_location_info(datacenter, rack) self.lock = RLock() self._handle_node_up_condition = Condition() diff --git a/tests/unit/test_control_connection.py b/tests/unit/test_control_connection.py index 18aadfee..604034cc 100644 --- a/tests/unit/test_control_connection.py +++ b/tests/unit/test_control_connection.py @@ -59,8 +59,8 @@ class MockCluster(object): self.scheduler = Mock(spec=_Scheduler) self.executor = Mock(spec=ThreadPoolExecutor) - def add_host(self, address, signal=False): - host = Host(address, SimpleConvictionPolicy) + def add_host(self, address, datacenter, rack, signal=False): + host = Host(address, SimpleConvictionPolicy, datacenter, rack) self.added_hosts.append(host) return host @@ -212,6 +212,7 @@ class ControlConnectionTest(unittest.TestCase): self.connection.peer_results[1].append( ["192.168.1.3", "10.0.0.3", "a", "dc1", "rack1", ["3", "103", "203"]] ) + self.cluster.scheduler.schedule = lambda delay, f, *args, **kwargs: f(*args, **kwargs) self.control_connection.refresh_node_list_and_token_map() self.assertEqual(1, len(self.cluster.added_hosts)) self.assertEqual(self.cluster.added_hosts[0].address, "192.168.1.3") @@ -250,7 +251,7 @@ class ControlConnectionTest(unittest.TestCase): 'address': ('1.2.3.4', 9000) } self.control_connection._handle_topology_change(event) - self.cluster.scheduler.schedule.assert_called_with(ANY, self.cluster.add_host, '1.2.3.4', signal=True) + self.cluster.scheduler.schedule.assert_called_with(ANY, self.control_connection.refresh_node_list_and_token_map) event = { 'change_type': 'REMOVED_NODE', @@ -272,7 +273,7 @@ class ControlConnectionTest(unittest.TestCase): 'address': ('1.2.3.4', 9000) } self.control_connection._handle_status_change(event) - self.cluster.scheduler.schedule.assert_called_with(ANY, self.cluster.add_host, '1.2.3.4', signal=True) + self.cluster.scheduler.schedule.assert_called_with(ANY, self.control_connection.refresh_node_list_and_token_map) # do the same with a known Host event = {