Merge pull request #764 from datastax/python-734

PYTHON-734: ConnectionHeartbeat should mark down HostConnection on OTO
This commit is contained in:
Jaume Marhuenda
2017-05-18 15:42:18 -04:00
committed by GitHub
3 changed files with 26 additions and 12 deletions

View File

@@ -1395,10 +1395,8 @@ class Cluster(object):
""" """
if self.is_shutdown: if self.is_shutdown:
return return
with host.lock: with host.lock:
was_up = host.is_up was_up = host.is_up
# ignore down signals if we have open pools to the host # ignore down signals if we have open pools to the host
# this is to avoid closing pools when a control connection host became isolated # this is to avoid closing pools when a control connection host became isolated
if self._discount_down_events and self.profile_manager.distance(host) != HostDistance.IGNORED: if self._discount_down_events and self.profile_manager.distance(host) != HostDistance.IGNORED:
@@ -3134,7 +3132,7 @@ class ControlConnection(object):
c = getattr(self, '_connection', None) c = getattr(self, '_connection', None)
return [c] if c else [] return [c] if c else []
def return_connection(self, connection): def return_connection(self, connection, mark_host_down=False): # noqa
if connection is self._connection and (connection.is_defunct or connection.is_closed): if connection is self._connection and (connection.is_defunct or connection.is_closed):
self.reconnect() self.reconnect()

View File

@@ -1007,7 +1007,7 @@ class ConnectionHeartbeat(Thread):
for connection, owner, exc in failed_connections: for connection, owner, exc in failed_connections:
self._raise_if_stopped() self._raise_if_stopped()
connection.defunct(exc) connection.defunct(exc)
owner.return_connection(connection) owner.return_connection(connection, mark_host_down=True)
except self.ShutdownException: except self.ShutdownException:
pass pass
except Exception: except Exception:

View File

@@ -359,18 +359,29 @@ class HostConnection(object):
raise NoConnectionsAvailable("All request IDs are currently in use") raise NoConnectionsAvailable("All request IDs are currently in use")
def return_connection(self, connection): def return_connection(self, connection, mark_host_down=False):
with connection.lock: with connection.lock:
connection.in_flight -= 1 connection.in_flight -= 1
with self._stream_available_condition: with self._stream_available_condition:
self._stream_available_condition.notify() self._stream_available_condition.notify()
if (connection.is_defunct or connection.is_closed) and not connection.signaled_error: if connection.is_defunct or connection.is_closed:
log.debug("Defunct or closed connection (%s) returned to pool, potentially " if connection.signaled_error and not mark_host_down:
"marking host %s as down", id(connection), self.host) return
is_down = self._session.cluster.signal_connection_failure(
self.host, connection.last_error, is_host_addition=False) is_down = False
connection.signaled_error = True if not connection.signaled_error:
log.debug("Defunct or closed connection (%s) returned to pool, potentially "
"marking host %s as down", id(connection), self.host)
is_down = self._session.cluster.signal_connection_failure(
self.host, connection.last_error, is_host_addition=False)
connection.signaled_error = True
# Force mark down a host on error, used by the ConnectionHeartbeat
if mark_host_down and not is_down:
is_down = True
self._session.cluster.on_down(self.host, is_host_addition=False)
if is_down: if is_down:
self.shutdown() self.shutdown()
else: else:
@@ -382,6 +393,11 @@ class HostConnection(object):
self._session.submit(self._replace, connection) self._session.submit(self._replace, connection)
def _replace(self, connection): def _replace(self, connection):
with self._lock:
if self.is_shutdown:
self._is_replacing = False
return
log.debug("Replacing connection (%s) to %s", id(connection), self.host) log.debug("Replacing connection (%s) to %s", id(connection), self.host)
try: try:
conn = self._session.cluster.connection_factory(self.host.address) conn = self._session.cluster.connection_factory(self.host.address)
@@ -626,7 +642,7 @@ class HostConnectionPool(object):
raise NoConnectionsAvailable() raise NoConnectionsAvailable()
def return_connection(self, connection): def return_connection(self, connection, mark_host_down=False): #noqa
with connection.lock: with connection.lock:
connection.in_flight -= 1 connection.in_flight -= 1
in_flight = connection.in_flight in_flight = connection.in_flight