Optionally error callbacks in thread when defuncting connections
Prevents the event loop thread from being tied up too long when there are many concurrent requests on a broken connection.
This commit is contained in:
@@ -121,7 +121,6 @@ class _Frame(object):
|
||||
|
||||
NONBLOCKING = (errno.EAGAIN, errno.EWOULDBLOCK)
|
||||
|
||||
|
||||
class ConnectionException(Exception):
|
||||
"""
|
||||
An unrecoverable error was hit when attempting to use a connection,
|
||||
@@ -187,6 +186,8 @@ else:
|
||||
|
||||
class Connection(object):
|
||||
|
||||
CALLBACK_ERR_THREAD_THRESHOLD = 100
|
||||
|
||||
in_buffer_size = 4096
|
||||
out_buffer_size = 4096
|
||||
|
||||
@@ -354,8 +355,12 @@ class Connection(object):
|
||||
with self.lock:
|
||||
requests = self._requests
|
||||
self._requests = {}
|
||||
|
||||
if not requests:
|
||||
return
|
||||
|
||||
new_exc = ConnectionShutdown(str(exc))
|
||||
for cb, _ in requests.values():
|
||||
def try_callback(cb):
|
||||
try:
|
||||
cb(new_exc)
|
||||
except Exception:
|
||||
@@ -363,6 +368,28 @@ class Connection(object):
|
||||
"failed connection (%s) to host %s:",
|
||||
id(self), self.host, exc_info=True)
|
||||
|
||||
# run first callback from this thread to ensure pool state before leaving
|
||||
cb, _ = requests.popitem()[1]
|
||||
try_callback(cb)
|
||||
|
||||
if not requests:
|
||||
return
|
||||
|
||||
# additional requests are optionally errored from a separate thread
|
||||
# The default callback and retry logic is fairly expensive -- we don't
|
||||
# want to tie up the event thread when there are many requests
|
||||
def err_all_callbacks():
|
||||
for cb, _ in requests.values():
|
||||
try_callback(cb)
|
||||
if len(requests) < Connection.CALLBACK_ERR_THREAD_THRESHOLD:
|
||||
err_all_callbacks()
|
||||
else:
|
||||
# daemon thread here because we want to stay decoupled from the cluster TPE
|
||||
# TODO: would it make sense to just have a driver-global TPE?
|
||||
t = Thread(target=err_all_callbacks)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
|
||||
def get_request_id(self):
|
||||
"""
|
||||
This must be called while self.lock is held.
|
||||
|
||||
@@ -184,7 +184,7 @@ class HostConnectionPoolTests(unittest.TestCase):
|
||||
def test_return_defunct_connection_on_down_host(self):
|
||||
host = Mock(spec=Host, address='ip1')
|
||||
session = self.make_session()
|
||||
conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False, max_request_id=100)
|
||||
conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False, max_request_id=100, signaled_error=False)
|
||||
session.cluster.connection_factory.return_value = conn
|
||||
|
||||
pool = HostConnectionPool(host, HostDistance.LOCAL, session)
|
||||
|
||||
Reference in New Issue
Block a user