make pool.HostConnection block on a number of concurrent requests
This allows basic rate limiting for low-level integrations. PYTHON-514
This commit is contained in:
@@ -211,6 +211,12 @@ class Connection(object):
|
|||||||
# the number of request IDs that are currently in use.
|
# the number of request IDs that are currently in use.
|
||||||
in_flight = 0
|
in_flight = 0
|
||||||
|
|
||||||
|
# Max concurrent requests allowed per connection. This is set optimistically high, allowing
|
||||||
|
# all request ids to be used in protocol version 3+. Normally concurrency would be controlled
|
||||||
|
# at a higher level by the application or concurrent.execute_concurrent. This attribute
|
||||||
|
# is for lower-level integrations that want some upper bound without reimplementing.
|
||||||
|
max_in_flight = 2 ** 15
|
||||||
|
|
||||||
# A set of available request IDs. When using the v3 protocol or higher,
|
# A set of available request IDs. When using the v3 protocol or higher,
|
||||||
# this will not initially include all request IDs in order to save memory,
|
# this will not initially include all request IDs in order to save memory,
|
||||||
# but the set will grow if it is exhausted.
|
# but the set will grow if it is exhausted.
|
||||||
@@ -262,13 +268,14 @@ class Connection(object):
|
|||||||
self._iobuf = io.BytesIO()
|
self._iobuf = io.BytesIO()
|
||||||
|
|
||||||
if protocol_version >= 3:
|
if protocol_version >= 3:
|
||||||
self.max_request_id = (2 ** 15) - 1
|
self.max_request_id = min(self.max_in_flight - 1, (2 ** 15) - 1)
|
||||||
# Don't fill the deque with 2**15 items right away. Start with 300 and add
|
# Don't fill the deque with 2**15 items right away. Start with some and add
|
||||||
# more if needed.
|
# more if needed.
|
||||||
self.request_ids = deque(range(300))
|
initial_size = min(300, self.max_in_flight)
|
||||||
self.highest_request_id = 299
|
self.request_ids = deque(range(initial_size))
|
||||||
|
self.highest_request_id = initial_size - 1
|
||||||
else:
|
else:
|
||||||
self.max_request_id = (2 ** 7) - 1
|
self.max_request_id = min(self.max_in_flight, (2 ** 7) - 1)
|
||||||
self.request_ids = deque(range(self.max_request_id + 1))
|
self.request_ids = deque(range(self.max_request_id + 1))
|
||||||
self.highest_request_id = self.max_request_id
|
self.highest_request_id = self.max_request_id
|
||||||
|
|
||||||
@@ -464,7 +471,7 @@ class Connection(object):
|
|||||||
while True:
|
while True:
|
||||||
needed = len(msgs) - messages_sent
|
needed = len(msgs) - messages_sent
|
||||||
with self.lock:
|
with self.lock:
|
||||||
available = min(needed, self.max_request_id - self.in_flight)
|
available = min(needed, self.max_request_id - self.in_flight + 1)
|
||||||
request_ids = [self.get_request_id() for _ in range(available)]
|
request_ids = [self.get_request_id() for _ in range(available)]
|
||||||
self.in_flight += available
|
self.in_flight += available
|
||||||
|
|
||||||
@@ -911,7 +918,7 @@ class HeartbeatFuture(object):
|
|||||||
log.debug("Sending options message heartbeat on idle connection (%s) %s",
|
log.debug("Sending options message heartbeat on idle connection (%s) %s",
|
||||||
id(connection), connection.host)
|
id(connection), connection.host)
|
||||||
with connection.lock:
|
with connection.lock:
|
||||||
if connection.in_flight < connection.max_request_id:
|
if connection.in_flight <= connection.max_request_id:
|
||||||
connection.in_flight += 1
|
connection.in_flight += 1
|
||||||
connection.send_msg(OptionsMessage(), connection.get_request_id(), self._options_callback)
|
connection.send_msg(OptionsMessage(), connection.get_request_id(), self._options_callback)
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -296,6 +296,8 @@ class HostConnection(object):
|
|||||||
self.host_distance = host_distance
|
self.host_distance = host_distance
|
||||||
self._session = weakref.proxy(session)
|
self._session = weakref.proxy(session)
|
||||||
self._lock = Lock()
|
self._lock = Lock()
|
||||||
|
# this is used in conjunction with the connection streams. Not using the connection lock because the connection can be replaced in the lifetime of the pool.
|
||||||
|
self._stream_available_condition = Condition(self._lock)
|
||||||
self._is_replacing = False
|
self._is_replacing = False
|
||||||
|
|
||||||
if host_distance == HostDistance.IGNORED:
|
if host_distance == HostDistance.IGNORED:
|
||||||
@@ -320,16 +322,27 @@ class HostConnection(object):
|
|||||||
if not conn:
|
if not conn:
|
||||||
raise NoConnectionsAvailable()
|
raise NoConnectionsAvailable()
|
||||||
|
|
||||||
with conn.lock:
|
start = time.time()
|
||||||
if conn.in_flight < conn.max_request_id:
|
remaining = timeout
|
||||||
conn.in_flight += 1
|
while True:
|
||||||
return conn, conn.get_request_id()
|
with conn.lock:
|
||||||
|
if conn.in_flight <= conn.max_request_id:
|
||||||
|
conn.in_flight += 1
|
||||||
|
return conn, conn.get_request_id()
|
||||||
|
if timeout is not None:
|
||||||
|
remaining = timeout - time.time() + start
|
||||||
|
if remaining < 0:
|
||||||
|
break
|
||||||
|
with self._stream_available_condition:
|
||||||
|
self._stream_available_condition.wait(remaining)
|
||||||
|
|
||||||
raise NoConnectionsAvailable("All request IDs are currently in use")
|
raise NoConnectionsAvailable("All request IDs are currently in use")
|
||||||
|
|
||||||
def return_connection(self, connection):
|
def return_connection(self, connection):
|
||||||
with connection.lock:
|
with connection.lock:
|
||||||
connection.in_flight -= 1
|
connection.in_flight -= 1
|
||||||
|
with self._stream_available_condition:
|
||||||
|
self._stream_available_condition.notify()
|
||||||
|
|
||||||
if (connection.is_defunct or connection.is_closed) and not connection.signaled_error:
|
if (connection.is_defunct or connection.is_closed) and not connection.signaled_error:
|
||||||
log.debug("Defunct or closed connection (%s) returned to pool, potentially "
|
log.debug("Defunct or closed connection (%s) returned to pool, potentially "
|
||||||
@@ -355,6 +368,7 @@ class HostConnection(object):
|
|||||||
self._connection = conn
|
self._connection = conn
|
||||||
with self._lock:
|
with self._lock:
|
||||||
self._is_replacing = False
|
self._is_replacing = False
|
||||||
|
self._stream_available_condition.notify()
|
||||||
|
|
||||||
def shutdown(self):
|
def shutdown(self):
|
||||||
with self._lock:
|
with self._lock:
|
||||||
@@ -362,6 +376,7 @@ class HostConnection(object):
|
|||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
self.is_shutdown = True
|
self.is_shutdown = True
|
||||||
|
self._stream_available_condition.notify_all()
|
||||||
|
|
||||||
if self._connection:
|
if self._connection:
|
||||||
self._connection.close()
|
self._connection.close()
|
||||||
|
|||||||
@@ -344,7 +344,7 @@ class ConnectionHeartbeatTest(unittest.TestCase):
|
|||||||
get_holders = self.make_get_holders(1)
|
get_holders = self.make_get_holders(1)
|
||||||
max_connection = Mock(spec=Connection, host='localhost',
|
max_connection = Mock(spec=Connection, host='localhost',
|
||||||
lock=Lock(),
|
lock=Lock(),
|
||||||
max_request_id=in_flight, in_flight=in_flight,
|
max_request_id=in_flight - 1, in_flight=in_flight,
|
||||||
is_idle=True, is_defunct=False, is_closed=False)
|
is_idle=True, is_defunct=False, is_closed=False)
|
||||||
holder = get_holders.return_value[0]
|
holder = get_holders.return_value[0]
|
||||||
holder.get_connections.return_value.append(max_connection)
|
holder.get_connections.return_value.append(max_connection)
|
||||||
|
|||||||
Reference in New Issue
Block a user