Merge "Process paused requests first"

This commit is contained in:
Zuul 2023-03-15 13:37:04 +00:00 committed by Gerrit Code Review
commit d9af3f00ec
1 changed files with 20 additions and 13 deletions

View File

@ -325,6 +325,21 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
len(self.request_handlers))
self.log.debug("Active requests: %s", active_reqs)
def _process_paused_handlers(self):
if self.paused_handlers:
self.component_info.paused = True
# If we are paused, some request handlers could not
# satisfy its assigned request, so give it
# another shot. Unpause ourselves if all are completed.
for rh in sorted(self.paused_handlers,
key=lambda h: h.request.priority):
rh.run()
if not rh.paused:
self.paused_handlers.remove(rh)
if not self.paused_handlers:
self.component_info.paused = False
def _hasTenantQuota(self, request, provider_manager):
'''
Checks if a tenant has enough quota to handle a list of nodes.
@ -447,19 +462,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
self.updateTenantLimits(
self.nodepool.config.tenant_resource_limits)
if self.paused_handlers:
self.component_info.paused = True
# If we are paused, some request handlers could not
# satisfy its assigned request, so give it
# another shot. Unpause ourselves if all are completed.
for rh in sorted(self.paused_handlers,
key=lambda h: h.request.priority):
rh.run()
if not rh.paused:
self.paused_handlers.remove(rh)
if not self.paused_handlers:
self.component_info.paused = False
self._process_paused_handlers()
# Regardless of whether we are paused, run
# assignHandlers. It will only accept requests if we
@ -471,6 +474,10 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
# between such that we have a chance to fulfill
# requests that already have all nodes.
self._removeCompletedHandlers()
# To avoid pausing the handlers for a long time process
# them here as well.
self._process_paused_handlers()
self._removeCompletedHandlers()
except Exception:
self.log.exception("Error in PoolWorker:")