Process paused requests first

We observed a starvation problem in the following scenario, leading to
gaps of several minutes in request processing.

Scenario:
- there are many pending node requests
- a request handler gets paused by running into quota

In this case nodepool loops over all node requests, deferring each of
them because of the paused handler, and only retries the paused handler
after the full loop. In our case this took 10 minutes before the
handler unpaused and normal processing resumed. As long as the request
queue is long, this starts over as soon as the pool reaches the cloud
quota again.

Currently, the paused handlers are only processed before looping
through the node requests. Similar to the earlier fix for the
starvation problem with removing completed handlers, we now process
them within the loop as well.
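The scheduling change described above can be sketched as a toy model (all class and attribute names here are hypothetical, not the actual nodepool code): retrying paused handlers inside the request loop lets them unpause after a bounded number of requests, rather than only after a full pass over a possibly long queue.

```python
class Handler:
    """Stand-in for a request handler that may pause on quota (hypothetical)."""

    def __init__(self, name, retries_until_unpause=1):
        self.name = name
        self.paused = True
        self._retries = retries_until_unpause

    def run(self):
        # Each retry brings the handler closer to fitting under quota.
        self._retries -= 1
        if self._retries <= 0:
            self.paused = False


class Worker:
    """Stand-in for the pool worker: processes requests, retries paused handlers."""

    def __init__(self, requests):
        self.requests = list(requests)
        self.paused_handlers = set()
        self.processed = []

    def _process_paused_handlers(self):
        # Give every paused handler another shot; drop it once unpaused.
        # sorted() copies into a list, so removing from the set is safe.
        for rh in sorted(self.paused_handlers, key=lambda h: h.name):
            rh.run()
            if not rh.paused:
                self.paused_handlers.remove(rh)

    def run_once(self):
        self._process_paused_handlers()      # before the loop (old behavior)
        for req in self.requests:
            self.processed.append(req)
            self._process_paused_handlers()  # within the loop (the fix)


w = Worker(["req-1", "req-2", "req-3"])
w.paused_handlers.add(Handler("h1", retries_until_unpause=2))
w.run_once()
print(sorted(h.name for h in w.paused_handlers))  # → [] (unpaused mid-loop)
```

Without the in-loop retry, the handler would stay paused until the entire request queue had been walked once; with it, the handler unpauses after the first request.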

Change-Id: Iadacd4969c883574d8947e8ab2313e42820cb298
Tobias Henkel 2023-03-08 17:39:11 +01:00 committed by James E. Blair
parent 06e5d2f843
commit 1ed2b855c8
1 changed file with 20 additions and 13 deletions


@@ -325,6 +325,21 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
                        len(self.request_handlers))
         self.log.debug("Active requests: %s", active_reqs)
 
+    def _process_paused_handlers(self):
+        if self.paused_handlers:
+            self.component_info.paused = True
+            # If we are paused, some request handlers could not
+            # satisfy its assigned request, so give it
+            # another shot. Unpause ourselves if all are completed.
+            for rh in sorted(self.paused_handlers,
+                             key=lambda h: h.request.priority):
+                rh.run()
+                if not rh.paused:
+                    self.paused_handlers.remove(rh)
+
+            if not self.paused_handlers:
+                self.component_info.paused = False
+
     def _hasTenantQuota(self, request, provider_manager):
         '''
         Checks if a tenant has enough quota to handle a list of nodes.
@@ -447,19 +462,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
                     self.updateTenantLimits(
                         self.nodepool.config.tenant_resource_limits)
 
-                if self.paused_handlers:
-                    self.component_info.paused = True
-                    # If we are paused, some request handlers could not
-                    # satisfy its assigned request, so give it
-                    # another shot. Unpause ourselves if all are completed.
-                    for rh in sorted(self.paused_handlers,
-                                     key=lambda h: h.request.priority):
-                        rh.run()
-                        if not rh.paused:
-                            self.paused_handlers.remove(rh)
-
-                    if not self.paused_handlers:
-                        self.component_info.paused = False
+                self._process_paused_handlers()
 
                 # Regardless of whether we are paused, run
                 # assignHandlers. It will only accept requests if we
@@ -471,6 +474,10 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
                         # between such that we have a chance to fulfill
                         # requests that already have all nodes.
                         self._removeCompletedHandlers()
+
+                # To avoid pausing the handlers for a long time process
+                # them here as well.
+                self._process_paused_handlers()
                 self._removeCompletedHandlers()
 
             except Exception:
                 self.log.exception("Error in PoolWorker:")