small refactor of python-734 to be less invasive in the api
This commit is contained in:
@@ -1395,8 +1395,10 @@ class Cluster(object):
|
|||||||
"""
|
"""
|
||||||
if self.is_shutdown:
|
if self.is_shutdown:
|
||||||
return
|
return
|
||||||
|
|
||||||
with host.lock:
|
with host.lock:
|
||||||
was_up = host.is_up
|
was_up = host.is_up
|
||||||
|
|
||||||
# ignore down signals if we have open pools to the host
|
# ignore down signals if we have open pools to the host
|
||||||
# this is to avoid closing pools when a control connection host became isolated
|
# this is to avoid closing pools when a control connection host became isolated
|
||||||
if self._discount_down_events and self.profile_manager.distance(host) != HostDistance.IGNORED:
|
if self._discount_down_events and self.profile_manager.distance(host) != HostDistance.IGNORED:
|
||||||
@@ -3132,7 +3134,7 @@ class ControlConnection(object):
|
|||||||
c = getattr(self, '_connection', None)
|
c = getattr(self, '_connection', None)
|
||||||
return [c] if c else []
|
return [c] if c else []
|
||||||
|
|
||||||
def return_connection(self, connection, mark_host_down=False): # noqa
|
def return_connection(self, connection):
|
||||||
if connection is self._connection and (connection.is_defunct or connection.is_closed):
|
if connection is self._connection and (connection.is_defunct or connection.is_closed):
|
||||||
self.reconnect()
|
self.reconnect()
|
||||||
|
|
||||||
|
|||||||
@@ -1007,7 +1007,10 @@ class ConnectionHeartbeat(Thread):
|
|||||||
for connection, owner, exc in failed_connections:
|
for connection, owner, exc in failed_connections:
|
||||||
self._raise_if_stopped()
|
self._raise_if_stopped()
|
||||||
connection.defunct(exc)
|
connection.defunct(exc)
|
||||||
owner.return_connection(connection, mark_host_down=True)
|
if not connection.is_control_connection:
|
||||||
|
# Only HostConnection supports shutdown_on_error
|
||||||
|
owner.shutdown_on_error = True
|
||||||
|
owner.return_connection(connection)
|
||||||
except self.ShutdownException:
|
except self.ShutdownException:
|
||||||
pass
|
pass
|
||||||
except Exception:
|
except Exception:
|
||||||
|
|||||||
@@ -305,6 +305,7 @@ class HostConnection(object):
|
|||||||
host = None
|
host = None
|
||||||
host_distance = None
|
host_distance = None
|
||||||
is_shutdown = False
|
is_shutdown = False
|
||||||
|
shutdown_on_error = False
|
||||||
|
|
||||||
_session = None
|
_session = None
|
||||||
_connection = None
|
_connection = None
|
||||||
@@ -359,14 +360,14 @@ class HostConnection(object):
|
|||||||
|
|
||||||
raise NoConnectionsAvailable("All request IDs are currently in use")
|
raise NoConnectionsAvailable("All request IDs are currently in use")
|
||||||
|
|
||||||
def return_connection(self, connection, mark_host_down=False):
|
def return_connection(self, connection):
|
||||||
with connection.lock:
|
with connection.lock:
|
||||||
connection.in_flight -= 1
|
connection.in_flight -= 1
|
||||||
with self._stream_available_condition:
|
with self._stream_available_condition:
|
||||||
self._stream_available_condition.notify()
|
self._stream_available_condition.notify()
|
||||||
|
|
||||||
if connection.is_defunct or connection.is_closed:
|
if connection.is_defunct or connection.is_closed:
|
||||||
if connection.signaled_error and not mark_host_down:
|
if connection.signaled_error and not self.shutdown_on_error:
|
||||||
return
|
return
|
||||||
|
|
||||||
is_down = False
|
is_down = False
|
||||||
@@ -377,8 +378,7 @@ class HostConnection(object):
|
|||||||
self.host, connection.last_error, is_host_addition=False)
|
self.host, connection.last_error, is_host_addition=False)
|
||||||
connection.signaled_error = True
|
connection.signaled_error = True
|
||||||
|
|
||||||
# Force mark down a host on error, used by the ConnectionHeartbeat
|
if self.shutdown_on_error and not is_down:
|
||||||
if mark_host_down and not is_down:
|
|
||||||
is_down = True
|
is_down = True
|
||||||
self._session.cluster.on_down(self.host, is_host_addition=False)
|
self._session.cluster.on_down(self.host, is_host_addition=False)
|
||||||
|
|
||||||
@@ -395,7 +395,6 @@ class HostConnection(object):
|
|||||||
def _replace(self, connection):
|
def _replace(self, connection):
|
||||||
with self._lock:
|
with self._lock:
|
||||||
if self.is_shutdown:
|
if self.is_shutdown:
|
||||||
self._is_replacing = False
|
|
||||||
return
|
return
|
||||||
|
|
||||||
log.debug("Replacing connection (%s) to %s", id(connection), self.host)
|
log.debug("Replacing connection (%s) to %s", id(connection), self.host)
|
||||||
@@ -642,7 +641,7 @@ class HostConnectionPool(object):
|
|||||||
|
|
||||||
raise NoConnectionsAvailable()
|
raise NoConnectionsAvailable()
|
||||||
|
|
||||||
def return_connection(self, connection, mark_host_down=False): #noqa
|
def return_connection(self, connection):
|
||||||
with connection.lock:
|
with connection.lock:
|
||||||
connection.in_flight -= 1
|
connection.in_flight -= 1
|
||||||
in_flight = connection.in_flight
|
in_flight = connection.in_flight
|
||||||
|
|||||||
Reference in New Issue
Block a user