Merge pull request #570 from datastax/531

PYTHON-531 - don't stall schema agreement on ignored hosts
This commit is contained in:
Adam Holmberg
2016-05-02 16:21:40 -05:00

View File

@@ -1161,7 +1161,7 @@ class Cluster(object):
if distance == HostDistance.IGNORED: if distance == HostDistance.IGNORED:
log.debug("Not adding connection pool for new host %r because the " log.debug("Not adding connection pool for new host %r because the "
"load balancing policy has marked it as IGNORED", host) "load balancing policy has marked it as IGNORED", host)
self._finalize_add(host) self._finalize_add(host, set_up=False)
return return
futures_lock = Lock() futures_lock = Lock()
@@ -1203,9 +1203,10 @@ class Cluster(object):
if not have_future: if not have_future:
self._finalize_add(host) self._finalize_add(host)
def _finalize_add(self, host): def _finalize_add(self, host, set_up=True):
# mark the host as up and notify all listeners if set_up:
host.set_up() host.set_up()
for listener in self.listeners: for listener in self.listeners:
listener.on_add(host) listener.on_add(host)
@@ -2568,13 +2569,14 @@ class ControlConnection(object):
if local_row.get("schema_version"): if local_row.get("schema_version"):
versions[local_row.get("schema_version")].add(local_address) versions[local_row.get("schema_version")].add(local_address)
lbp = self._cluster.load_balancing_policy
for row in peers_result: for row in peers_result:
schema_ver = row.get('schema_version') schema_ver = row.get('schema_version')
if not schema_ver: if not schema_ver:
continue continue
addr = self._rpc_from_peer_row(row) addr = self._rpc_from_peer_row(row)
peer = self._cluster.metadata.get_host(addr) peer = self._cluster.metadata.get_host(addr)
if peer and peer.is_up: if peer and peer.is_up and lbp.distance(peer) != HostDistance.IGNORED:
versions[schema_ver].add(addr) versions[schema_ver].add(addr)
if len(versions) == 1: if len(versions) == 1: