Merge branch '1.x'
This commit is contained in:
@@ -30,6 +30,9 @@ Bug Fixes
|
|||||||
* Avoid submitting tasks to the ThreadPoolExecutor after shutdown. With
|
* Avoid submitting tasks to the ThreadPoolExecutor after shutdown. With
|
||||||
retries enabled, this could cause Cluster.shutdown() to hang under
|
retries enabled, this could cause Cluster.shutdown() to hang under
|
||||||
some circumstances.
|
some circumstances.
|
||||||
|
* Fix unintended rebuild of token replica map when keyspaces are
|
||||||
|
discovered (on startup), added, or updated and TokenAwarePolicy is not
|
||||||
|
in use.
|
||||||
|
|
||||||
Other
|
Other
|
||||||
^^^^^
|
^^^^^
|
||||||
|
|||||||
@@ -130,7 +130,7 @@ def run_in_executor(f):
|
|||||||
@wraps(f)
|
@wraps(f)
|
||||||
def new_f(self, *args, **kwargs):
|
def new_f(self, *args, **kwargs):
|
||||||
|
|
||||||
if self.is_shutdown:
|
if self._is_shutdown:
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
future = self.executor.submit(f, self, *args, **kwargs)
|
future = self.executor.submit(f, self, *args, **kwargs)
|
||||||
@@ -825,7 +825,7 @@ class Cluster(object):
|
|||||||
"""
|
"""
|
||||||
new_host = self.metadata.add_host(address, datacenter, rack)
|
new_host = self.metadata.add_host(address, datacenter, rack)
|
||||||
if new_host and signal:
|
if new_host and signal:
|
||||||
log.info("New Cassandra host %s added", address)
|
log.info("New Cassandra host %r discovered", new_host)
|
||||||
self.on_add(new_host)
|
self.on_add(new_host)
|
||||||
|
|
||||||
return new_host
|
return new_host
|
||||||
|
|||||||
@@ -179,11 +179,11 @@ class Metadata(object):
|
|||||||
|
|
||||||
def _keyspace_added(self, ksname):
|
def _keyspace_added(self, ksname):
|
||||||
if self.token_map:
|
if self.token_map:
|
||||||
self.token_map.rebuild_keyspace(ksname)
|
self.token_map.rebuild_keyspace(ksname, build_if_absent=False)
|
||||||
|
|
||||||
def _keyspace_updated(self, ksname):
|
def _keyspace_updated(self, ksname):
|
||||||
if self.token_map:
|
if self.token_map:
|
||||||
self.token_map.rebuild_keyspace(ksname)
|
self.token_map.rebuild_keyspace(ksname, build_if_absent=False)
|
||||||
|
|
||||||
def _keyspace_removed(self, ksname):
|
def _keyspace_removed(self, ksname):
|
||||||
if self.token_map:
|
if self.token_map:
|
||||||
@@ -953,10 +953,14 @@ class TokenMap(object):
|
|||||||
|
|
||||||
self.tokens_to_hosts_by_ks = {}
|
self.tokens_to_hosts_by_ks = {}
|
||||||
self._metadata = metadata
|
self._metadata = metadata
|
||||||
|
self._rebuild_lock = RLock()
|
||||||
|
|
||||||
def rebuild_keyspace(self, keyspace):
|
def rebuild_keyspace(self, keyspace, build_if_absent=False):
|
||||||
self.tokens_to_hosts_by_ks[keyspace] = \
|
with self._rebuild_lock:
|
||||||
self.replica_map_for_keyspace(self._metadata.keyspaces[keyspace])
|
current = self.tokens_to_hosts_by_ks.get(keyspace, None)
|
||||||
|
if (build_if_absent and current is None) or (not build_if_absent and current is not None):
|
||||||
|
replica_map = self.replica_map_for_keyspace(self._metadata.keyspaces[keyspace])
|
||||||
|
self.tokens_to_hosts_by_ks[keyspace] = replica_map
|
||||||
|
|
||||||
def replica_map_for_keyspace(self, ks_metadata):
|
def replica_map_for_keyspace(self, ks_metadata):
|
||||||
strategy = ks_metadata.replication_strategy
|
strategy = ks_metadata.replication_strategy
|
||||||
@@ -975,7 +979,7 @@ class TokenMap(object):
|
|||||||
"""
|
"""
|
||||||
tokens_to_hosts = self.tokens_to_hosts_by_ks.get(keyspace, None)
|
tokens_to_hosts = self.tokens_to_hosts_by_ks.get(keyspace, None)
|
||||||
if tokens_to_hosts is None:
|
if tokens_to_hosts is None:
|
||||||
self.rebuild_keyspace(keyspace)
|
self.rebuild_keyspace(keyspace, build_if_absent=True)
|
||||||
tokens_to_hosts = self.tokens_to_hosts_by_ks.get(keyspace, None)
|
tokens_to_hosts = self.tokens_to_hosts_by_ks.get(keyspace, None)
|
||||||
if tokens_to_hosts is None:
|
if tokens_to_hosts is None:
|
||||||
return []
|
return []
|
||||||
|
|||||||
Reference in New Issue
Block a user