Ensure token map is rebuilt when nodes are removed
This commit is contained in:
@@ -14,6 +14,8 @@ Bug Fixes
|
||||
recommends
|
||||
* Fix memory leak when libev connections are created and
|
||||
destroyed (github #93)
|
||||
* Ensure token map is rebuilt when hosts are removed from
|
||||
the cluster
|
||||
|
||||
2.0.1
|
||||
=====
|
||||
|
||||
@@ -1691,17 +1691,18 @@ class ControlConnection(object):
|
||||
else:
|
||||
self._cluster.metadata.rebuild_schema(ks_result, cf_result, col_result)
|
||||
|
||||
def refresh_node_list_and_token_map(self):
|
||||
def refresh_node_list_and_token_map(self, force_token_rebuild=False):
|
||||
try:
|
||||
if self._connection:
|
||||
self._refresh_node_list_and_token_map(self._connection)
|
||||
self._refresh_node_list_and_token_map(self._connection, force_token_rebuild=force_token_rebuild)
|
||||
except ReferenceError:
|
||||
pass # our weak reference to the Cluster is no good
|
||||
except Exception:
|
||||
log.debug("[control connection] Error refreshing node list and token map", exc_info=True)
|
||||
self._signal_error()
|
||||
|
||||
def _refresh_node_list_and_token_map(self, connection, preloaded_results=None):
|
||||
def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
|
||||
force_token_rebuild=False):
|
||||
if preloaded_results:
|
||||
log.debug("[control connection] Refreshing node list and token map using preloaded results")
|
||||
peers_result = preloaded_results[0]
|
||||
@@ -1736,7 +1737,7 @@ class ControlConnection(object):
|
||||
if partitioner and tokens:
|
||||
token_map[host] = tokens
|
||||
|
||||
should_rebuild_token_map = False
|
||||
should_rebuild_token_map = force_token_rebuild
|
||||
found_hosts = set()
|
||||
for row in peers_result:
|
||||
addr = row.get("rpc_address")
|
||||
@@ -1762,12 +1763,11 @@ class ControlConnection(object):
|
||||
token_map[host] = tokens
|
||||
|
||||
for old_host in self._cluster.metadata.all_hosts():
|
||||
if old_host.address != connection.host and \
|
||||
old_host.address not in found_hosts and \
|
||||
old_host.address not in self._cluster.contact_points:
|
||||
log.debug("[control connection] Found host that has been removed: %r", old_host)
|
||||
if old_host.address != connection.host and old_host.address not in found_hosts:
|
||||
should_rebuild_token_map = True
|
||||
self._cluster.remove_host(old_host)
|
||||
if old_host.address not in self._cluster.contact_points:
|
||||
log.debug("[control connection] Found host that has been removed: %r", old_host)
|
||||
self._cluster.remove_host(old_host)
|
||||
|
||||
log.debug("[control connection] Finished fetching ring info")
|
||||
if partitioner and should_rebuild_token_map:
|
||||
@@ -1946,10 +1946,10 @@ class ControlConnection(object):
|
||||
self.reconnect()
|
||||
|
||||
def on_add(self, host):
|
||||
self.refresh_node_list_and_token_map()
|
||||
self.refresh_node_list_and_token_map(force_token_rebuild=True)
|
||||
|
||||
def on_remove(self, host):
|
||||
self.refresh_node_list_and_token_map()
|
||||
self.refresh_node_list_and_token_map(force_token_rebuild=True)
|
||||
|
||||
|
||||
def _stop_scheduler(scheduler, thread):
|
||||
|
||||
Reference in New Issue
Block a user