diff --git a/cassandra/connection.py b/cassandra/connection.py index 4ca458bf..6f60025b 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -121,7 +121,6 @@ class _Frame(object): NONBLOCKING = (errno.EAGAIN, errno.EWOULDBLOCK) - class ConnectionException(Exception): """ An unrecoverable error was hit when attempting to use a connection, @@ -187,6 +186,8 @@ else: class Connection(object): + CALLBACK_ERR_THREAD_THRESHOLD = 100 + in_buffer_size = 4096 out_buffer_size = 4096 @@ -354,8 +355,12 @@ class Connection(object): with self.lock: requests = self._requests self._requests = {} + + if not requests: + return + new_exc = ConnectionShutdown(str(exc)) - for cb, _ in requests.values(): + def try_callback(cb): try: cb(new_exc) except Exception: @@ -363,6 +368,28 @@ class Connection(object): "failed connection (%s) to host %s:", id(self), self.host, exc_info=True) + # run first callback from this thread to ensure pool state before leaving + cb, _ = requests.popitem()[1] + try_callback(cb) + + if not requests: + return + + # additional requests are optionally errored from a separate thread + # The default callback and retry logic is fairly expensive -- we don't + # want to tie up the event thread when there are many requests + def err_all_callbacks(): + for cb, _ in requests.values(): + try_callback(cb) + if len(requests) < Connection.CALLBACK_ERR_THREAD_THRESHOLD: + err_all_callbacks() + else: + # daemon thread here because we want to stay decoupled from the cluster TPE + # TODO: would it make sense to just have a driver-global TPE? + t = Thread(target=err_all_callbacks) + t.daemon = True + t.start() + def get_request_id(self): """ This must be called while self.lock is held. diff --git a/tests/unit/test_host_connection_pool.py b/tests/unit/test_host_connection_pool.py index fec889e9..79391a2a 100644 --- a/tests/unit/test_host_connection_pool.py +++ b/tests/unit/test_host_connection_pool.py @@ -184,7 +184,7 @@ class HostConnectionPoolTests(unittest.TestCase): def test_return_defunct_connection_on_down_host(self): host = Mock(spec=Host, address='ip1') session = self.make_session() - conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False, max_request_id=100) + conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False, max_request_id=100, signaled_error=False) session.cluster.connection_factory.return_value = conn pool = HostConnectionPool(host, HostDistance.LOCAL, session)