Avoid adding hosts to load balancing policy without DC/rack info
@@ -21,6 +21,8 @@ Bug Fixes
   if the callback was added outside of the event loop thread (github issue #95)
 * Properly escape keyspace name in Session.set_keyspace(). Previously, the
   keyspace name was quoted, but any quotes in the string were not escaped.
+* Avoid adding hosts to the load balancing policy before their datacenter
+  and rack information has been set, if possible.
 
 Other
 -----
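A note on the set_keyspace() entry above: CQL quotes identifiers with double quotes, and a double quote embedded in the name has to be escaped by doubling it. A minimal sketch of that escaping (the helper name is illustrative, not necessarily the driver's API):

    def protect_name(name):
        # Double any embedded quotes, then wrap the whole name in quotes,
        # so 'my"ks' becomes '"my""ks"' rather than the broken '"my"ks"'.
        return '"%s"' % name.replace('"', '""')

    assert protect_name('my"ks') == '"my""ks"'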
@@ -759,13 +759,13 @@ class Cluster(object):
             self.on_down(host, is_host_addition, force_if_down=True)
         return is_down
 
-    def add_host(self, address, signal):
+    def add_host(self, address, datacenter=None, rack=None, signal=True):
         """
         Called when adding initial contact points and when the control
         connection subsequently discovers a new node. Intended for internal
         use only.
         """
-        new_host = self.metadata.add_host(address)
+        new_host = self.metadata.add_host(address, datacenter, rack)
         if new_host and signal:
             log.info("New Cassandra host %s added", address)
             self.on_add(new_host)
@@ -1580,10 +1580,13 @@ class ControlConnection(object):
             found_hosts.add(addr)
 
             host = self._cluster.metadata.get_host(addr)
+            datacenter = row.get("data_center")
+            rack = row.get("rack")
             if host is None:
                 log.debug("[control connection] Found new host to connect to: %s", addr)
-                host = self._cluster.add_host(addr, signal=True)
-                host.set_location_info(row.get("data_center"), row.get("rack"))
+                host = self._cluster.add_host(addr, datacenter, rack, signal=True)
+            else:
+                self._update_location_info(host, datacenter, rack)
 
             tokens = row.get("tokens")
             if partitioner and tokens:
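The datacenter and rack read above come from the rows the control connection fetches for the local node and its peers; data_center and rack are the column names Cassandra uses in its system tables. A rough, hypothetical example of such a row with the dict-style access used by row.get() (the values are invented):

    row = {
        "peer": "192.168.1.3",
        "rpc_address": "10.0.0.3",
        "data_center": "dc1",
        "rack": "rack1",
        "tokens": ["3", "103", "203"],
    }
    datacenter = row.get("data_center")   # "dc1"
    rack = row.get("rack")                # "rack1"; None if the column is absent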
@@ -1600,11 +1603,22 @@ class ControlConnection(object):
         log.debug("[control connection] Fetched ring info, rebuilding metadata")
         self._cluster.metadata.rebuild_token_map(partitioner, token_map)
 
+    def _update_location_info(self, host, datacenter, rack):
+        if host.datacenter == datacenter and host.rack == rack:
+            return
+
+        # If the dc/rack information changes, we need to update the load balancing policy.
+        # For that, we remove and re-add the node against the policy. Not the most elegant, and assumes
+        # that the policy will update correctly, but in practice this should work.
+        self._cluster.load_balancing_policy.on_down(host)
+        host.set_location_info(datacenter, rack)
+        self._cluster.load_balancing_policy.on_up(host)
+
     def _handle_topology_change(self, event):
         change_type = event["change_type"]
         addr, port = event["address"]
         if change_type == "NEW_NODE":
-            self._cluster.scheduler.schedule(10, self._cluster.add_host, addr, signal=True)
+            self._cluster.scheduler.schedule(10, self.refresh_node_list_and_token_map)
         elif change_type == "REMOVED_NODE":
             host = self._cluster.metadata.get_host(addr)
             self._cluster.scheduler.schedule(0, self._cluster.remove_host, host)
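The remove-and-re-add sequence in _update_location_info matters because datacenter-aware policies typically file hosts by host.datacenter at the moment they are added; if the location changes afterwards, the host would stay under its old datacenter. The toy policy below only illustrates that bookkeeping and is not the driver's DCAwareRoundRobinPolicy:

    from collections import defaultdict

    class ToyDCAwarePolicy(object):
        """Illustrative only: keeps live hosts grouped per datacenter."""

        def __init__(self):
            self._hosts_per_dc = defaultdict(set)

        def on_up(self, host):
            # Relies on host.datacenter already being set, which is what
            # this commit arranges before the policy ever sees the host.
            self._hosts_per_dc[host.datacenter].add(host)

        def on_down(self, host):
            for hosts in self._hosts_per_dc.values():
                hosts.discard(host)

Calling on_down(), updating the location with set_location_info(), and then calling on_up() moves the host from its old bucket into the new one, which is exactly the sequence used above.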
@@ -1618,7 +1632,7 @@ class ControlConnection(object):
         if change_type == "UP":
             if host is None:
                 # this is the first time we've seen the node
-                self._cluster.scheduler.schedule(1, self._cluster.add_host, addr, signal=True)
+                self._cluster.scheduler.schedule(1, self.refresh_node_list_and_token_map)
             else:
                 # this will be run by the scheduler
                 self._cluster.scheduler.schedule(1, self._cluster.on_up, host)
@@ -348,11 +348,12 @@ class Metadata(object):
         else:
             return True
 
-    def add_host(self, address):
+    def add_host(self, address, datacenter, rack):
         cluster = self.cluster_ref()
         with self._hosts_lock:
             if address not in self._hosts:
-                new_host = Host(address, cluster.conviction_policy_factory)
+                new_host = Host(
+                    address, cluster.conviction_policy_factory, datacenter, rack)
                 self._hosts[address] = new_host
             else:
                 return None
@@ -134,6 +134,7 @@ class RoundRobinPolicy(LoadBalancingPolicy):
 
     This load balancing policy is used by default.
     """
+    _live_hosts = frozenset(())
 
     def populate(self, cluster, hosts):
         self._live_hosts = frozenset(hosts)
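The new class-level _live_hosts = frozenset(()) gives the policy a safe empty default, presumably because on_up()/on_down() can now be called for individual hosts before populate() has run. A small sketch of copy-on-write updates against such a default (an assumed pattern, not a claim about the driver's exact code):

    class ToyRoundRobin(object):
        _live_hosts = frozenset(())   # shared empty default until populate()/on_up()

        def populate(self, cluster, hosts):
            self._live_hosts = frozenset(hosts)

        def on_up(self, host):
            # Safe even if populate() never ran: starts from the empty default.
            self._live_hosts = self._live_hosts.union((host,))

        def on_down(self, host):
            self._live_hosts = self._live_hosts.difference((host,))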
@@ -57,7 +57,7 @@ class Host(object):
     _currently_handling_node_up = False
     _handle_node_up_condition = None
 
-    def __init__(self, inet_address, conviction_policy_factory):
+    def __init__(self, inet_address, conviction_policy_factory, datacenter=None, rack=None):
         if inet_address is None:
             raise ValueError("inet_address may not be None")
         if conviction_policy_factory is None:
@@ -65,6 +65,7 @@ class Host(object):
 
         self.address = inet_address
         self.conviction_policy = conviction_policy_factory(self)
+        self.set_location_info(datacenter, rack)
         self.lock = RLock()
         self._handle_node_up_condition = Condition()
 
@@ -59,8 +59,8 @@ class MockCluster(object):
         self.scheduler = Mock(spec=_Scheduler)
         self.executor = Mock(spec=ThreadPoolExecutor)
 
-    def add_host(self, address, signal=False):
-        host = Host(address, SimpleConvictionPolicy)
+    def add_host(self, address, datacenter, rack, signal=False):
+        host = Host(address, SimpleConvictionPolicy, datacenter, rack)
         self.added_hosts.append(host)
         return host
 
@@ -212,6 +212,7 @@ class ControlConnectionTest(unittest.TestCase):
         self.connection.peer_results[1].append(
             ["192.168.1.3", "10.0.0.3", "a", "dc1", "rack1", ["3", "103", "203"]]
         )
+        self.cluster.scheduler.schedule = lambda delay, f, *args, **kwargs: f(*args, **kwargs)
         self.control_connection.refresh_node_list_and_token_map()
         self.assertEqual(1, len(self.cluster.added_hosts))
         self.assertEqual(self.cluster.added_hosts[0].address, "192.168.1.3")
@@ -250,7 +251,7 @@ class ControlConnectionTest(unittest.TestCase):
             'address': ('1.2.3.4', 9000)
         }
         self.control_connection._handle_topology_change(event)
-        self.cluster.scheduler.schedule.assert_called_with(ANY, self.cluster.add_host, '1.2.3.4', signal=True)
+        self.cluster.scheduler.schedule.assert_called_with(ANY, self.control_connection.refresh_node_list_and_token_map)
 
         event = {
             'change_type': 'REMOVED_NODE',
@@ -272,7 +273,7 @@ class ControlConnectionTest(unittest.TestCase):
             'address': ('1.2.3.4', 9000)
         }
         self.control_connection._handle_status_change(event)
-        self.cluster.scheduler.schedule.assert_called_with(ANY, self.cluster.add_host, '1.2.3.4', signal=True)
+        self.cluster.scheduler.schedule.assert_called_with(ANY, self.control_connection.refresh_node_list_and_token_map)
 
         # do the same with a known Host
         event = {