diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 35e7445a..6bb40b7c 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2814,6 +2814,7 @@ class ResponseFuture(object): """ Internal """ # query_plan is an iterator, so this will resume where we last left # off if send_request() is called multiple times + start = time.time() for host in self.query_plan: req_id = self._query(host) if req_id is not None: @@ -2825,6 +2826,9 @@ class ResponseFuture(object): if self._timer is None: self._start_timer() return + if self.timeout is not None and time.time() - start > self.timeout: + self._on_timeout() + return self._set_final_exception(NoHostAvailable( "Unable to complete the operation against any hosts", self._errors)) diff --git a/cassandra/connection.py b/cassandra/connection.py index 78a8bae2..5db48d01 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -211,6 +211,12 @@ class Connection(object): # the number of request IDs that are currently in use. in_flight = 0 + # Max concurrent requests allowed per connection. This is set optimistically high, allowing + # all request ids to be used in protocol version 3+. Normally concurrency would be controlled + # at a higher level by the application or concurrent.execute_concurrent. This attribute + # is for lower-level integrations that want some upper bound without reimplementing. + max_in_flight = 2 ** 15 + # A set of available request IDs. When using the v3 protocol or higher, # this will not initially include all request IDs in order to save memory, # but the set will grow if it is exhausted. @@ -260,13 +266,14 @@ class Connection(object): self._iobuf = io.BytesIO() if protocol_version >= 3: - self.max_request_id = (2 ** 15) - 1 - # Don't fill the deque with 2**15 items right away. Start with 300 and add + self.max_request_id = min(self.max_in_flight - 1, (2 ** 15) - 1) + # Don't fill the deque with 2**15 items right away. Start with some and add # more if needed. - self.request_ids = deque(range(300)) - self.highest_request_id = 299 + initial_size = min(300, self.max_in_flight) + self.request_ids = deque(range(initial_size)) + self.highest_request_id = initial_size - 1 else: - self.max_request_id = (2 ** 7) - 1 + self.max_request_id = min(self.max_in_flight, (2 ** 7) - 1) self.request_ids = deque(range(self.max_request_id + 1)) self.highest_request_id = self.max_request_id @@ -462,7 +469,7 @@ class Connection(object): while True: needed = len(msgs) - messages_sent with self.lock: - available = min(needed, self.max_request_id - self.in_flight) + available = min(needed, self.max_request_id - self.in_flight + 1) request_ids = [self.get_request_id() for _ in range(available)] self.in_flight += available @@ -897,7 +904,7 @@ class HeartbeatFuture(object): log.debug("Sending options message heartbeat on idle connection (%s) %s", id(connection), connection.host) with connection.lock: - if connection.in_flight < connection.max_request_id: + if connection.in_flight <= connection.max_request_id: connection.in_flight += 1 connection.send_msg(OptionsMessage(), connection.get_request_id(), self._options_callback) else: diff --git a/cassandra/pool.py b/cassandra/pool.py index 528c0a0c..134eb254 100644 --- a/cassandra/pool.py +++ b/cassandra/pool.py @@ -301,6 +301,8 @@ class HostConnection(object): self.host_distance = host_distance self._session = weakref.proxy(session) self._lock = Lock() + # this is used in conjunction with the connection streams. Not using the connection lock because the connection can be replaced in the lifetime of the pool. + self._stream_available_condition = Condition(self._lock) self._is_replacing = False if host_distance == HostDistance.IGNORED: @@ -325,16 +327,27 @@ class HostConnection(object): if not conn: raise NoConnectionsAvailable() - with conn.lock: - if conn.in_flight < conn.max_request_id: - conn.in_flight += 1 - return conn, conn.get_request_id() + start = time.time() + remaining = timeout + while True: + with conn.lock: + if conn.in_flight <= conn.max_request_id: + conn.in_flight += 1 + return conn, conn.get_request_id() + if timeout is not None: + remaining = timeout - time.time() + start + if remaining < 0: + break + with self._stream_available_condition: + self._stream_available_condition.wait(remaining) raise NoConnectionsAvailable("All request IDs are currently in use") def return_connection(self, connection): with connection.lock: connection.in_flight -= 1 + with self._stream_available_condition: + self._stream_available_condition.notify() if (connection.is_defunct or connection.is_closed) and not connection.signaled_error: log.debug("Defunct or closed connection (%s) returned to pool, potentially " @@ -365,6 +378,7 @@ class HostConnection(object): else: with self._lock: self._is_replacing = False + self._stream_available_condition.notify() def shutdown(self): with self._lock: @@ -372,6 +386,7 @@ class HostConnection(object): return else: self.is_shutdown = True + self._stream_available_condition.notify_all() if self._connection: self._connection.close() diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py index dcec9cd7..2ac10a59 100644 --- a/tests/unit/test_connection.py +++ b/tests/unit/test_connection.py @@ -346,7 +346,7 @@ class ConnectionHeartbeatTest(unittest.TestCase): get_holders = self.make_get_holders(1) max_connection = Mock(spec=Connection, host='localhost', lock=Lock(), - max_request_id=in_flight, in_flight=in_flight, + max_request_id=in_flight - 1, in_flight=in_flight, is_idle=True, is_defunct=False, is_closed=False) holder = get_holders.return_value[0] holder.get_connections.return_value.append(max_connection)