From 2a507e7799ea15f93f26d25334f5bd8620bf91ab Mon Sep 17 00:00:00 2001 From: Adam Holmberg Date: Thu, 14 Apr 2016 11:57:11 -0500 Subject: [PATCH] make pool.HostConnection block on a number of concurrent requests This allows basic rate limiting for low-level integrations. PYTHON-514 --- cassandra/connection.py | 21 ++++++++++++++------- cassandra/pool.py | 23 +++++++++++++++++++---- tests/unit/test_connection.py | 2 +- 3 files changed, 34 insertions(+), 12 deletions(-) diff --git a/cassandra/connection.py b/cassandra/connection.py index f292f4fa..5843622f 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -211,6 +211,12 @@ class Connection(object): # the number of request IDs that are currently in use. in_flight = 0 + # Max concurrent requests allowed per connection. This is set optimistically high, allowing + # all request ids to be used in protocol version 3+. Normally concurrency would be controlled + # at a higher level by the application or concurrent.execute_concurrent. This attribute + # is for lower-level integrations that want some upper bound without reimplementing. + max_in_flight = 2 ** 15 + # A set of available request IDs. When using the v3 protocol or higher, # this will not initially include all request IDs in order to save memory, # but the set will grow if it is exhausted. @@ -262,13 +268,14 @@ class Connection(object): self._iobuf = io.BytesIO() if protocol_version >= 3: - self.max_request_id = (2 ** 15) - 1 - # Don't fill the deque with 2**15 items right away. Start with 300 and add + self.max_request_id = min(self.max_in_flight - 1, (2 ** 15) - 1) + # Don't fill the deque with 2**15 items right away. Start with some and add # more if needed. - self.request_ids = deque(range(300)) - self.highest_request_id = 299 + initial_size = min(300, self.max_in_flight) + self.request_ids = deque(range(initial_size)) + self.highest_request_id = initial_size - 1 else: - self.max_request_id = (2 ** 7) - 1 + self.max_request_id = min(self.max_in_flight, (2 ** 7) - 1) self.request_ids = deque(range(self.max_request_id + 1)) self.highest_request_id = self.max_request_id @@ -464,7 +471,7 @@ class Connection(object): while True: needed = len(msgs) - messages_sent with self.lock: - available = min(needed, self.max_request_id - self.in_flight) + available = min(needed, self.max_request_id - self.in_flight + 1) request_ids = [self.get_request_id() for _ in range(available)] self.in_flight += available @@ -911,7 +918,7 @@ class HeartbeatFuture(object): log.debug("Sending options message heartbeat on idle connection (%s) %s", id(connection), connection.host) with connection.lock: - if connection.in_flight < connection.max_request_id: + if connection.in_flight <= connection.max_request_id: connection.in_flight += 1 connection.send_msg(OptionsMessage(), connection.get_request_id(), self._options_callback) else: diff --git a/cassandra/pool.py b/cassandra/pool.py index 3cb14e91..73c59211 100644 --- a/cassandra/pool.py +++ b/cassandra/pool.py @@ -296,6 +296,8 @@ class HostConnection(object): self.host_distance = host_distance self._session = weakref.proxy(session) self._lock = Lock() + # this is used in conjunction with the connection streams. Not using the connection lock because the connection can be replaced in the lifetime of the pool. + self._stream_available_condition = Condition(self._lock) self._is_replacing = False if host_distance == HostDistance.IGNORED: @@ -320,16 +322,27 @@ class HostConnection(object): if not conn: raise NoConnectionsAvailable() - with conn.lock: - if conn.in_flight < conn.max_request_id: - conn.in_flight += 1 - return conn, conn.get_request_id() + start = time.time() + remaining = timeout + while True: + with conn.lock: + if conn.in_flight <= conn.max_request_id: + conn.in_flight += 1 + return conn, conn.get_request_id() + if timeout is not None: + remaining = timeout - time.time() + start + if remaining < 0: + break + with self._stream_available_condition: + self._stream_available_condition.wait(remaining) raise NoConnectionsAvailable("All request IDs are currently in use") def return_connection(self, connection): with connection.lock: connection.in_flight -= 1 + with self._stream_available_condition: + self._stream_available_condition.notify() if (connection.is_defunct or connection.is_closed) and not connection.signaled_error: log.debug("Defunct or closed connection (%s) returned to pool, potentially " @@ -355,6 +368,7 @@ class HostConnection(object): self._connection = conn with self._lock: self._is_replacing = False + self._stream_available_condition.notify() def shutdown(self): with self._lock: @@ -362,6 +376,7 @@ class HostConnection(object): return else: self.is_shutdown = True + self._stream_available_condition.notify_all() if self._connection: self._connection.close() diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py index 15fa6e72..843beda5 100644 --- a/tests/unit/test_connection.py +++ b/tests/unit/test_connection.py @@ -344,7 +344,7 @@ class ConnectionHeartbeatTest(unittest.TestCase): get_holders = self.make_get_holders(1) max_connection = Mock(spec=Connection, host='localhost', lock=Lock(), - max_request_id=in_flight, in_flight=in_flight, + max_request_id=in_flight - 1, in_flight=in_flight, is_idle=True, is_defunct=False, is_closed=False) holder = get_holders.return_value[0] holder.get_connections.return_value.append(max_connection)