Make assignHandlers a generator

Our intent with assignHandlers is to run it almost continuously,
but in case it is slow, to occasionally remove completed handlers
and run paused handlers in order to make room for new requests
which may otherwise be starved.

The way that currently works is that we stop processing requests
after 15 seconds, allow the other methods to run, then start over.
If this happens continuously, we may never see requests near the
bottom of the list (which we might be able to satisfy).  To avoid
that, this change turns assignHandlers into a generator which
picks up where it left off each time we yield to the other
processors.

Change-Id: I32096ae7342cc8aafd2a14de79acc4267293349a
This commit is contained in:
James E. Blair 2023-03-09 13:24:54 -08:00
parent fada5d9edf
commit 70f143690d
1 changed files with 16 additions and 13 deletions

View File

@ -97,26 +97,28 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
# ---------------------------------------------------------------
def _assignHandlers(self, timeout=15):
'''
For each request we can grab, create a NodeRequestHandler for it.
'''For each request we can grab, create a NodeRequestHandler for it.
The NodeRequestHandler object will kick off any threads needed to
satisfy the request, then return. We will need to periodically poll
the handler for completion.
If exceeds the timeout it stops further iteration and returns False
in order to give us time to call _removeCompletedHandlers. Otherwise
it returns True to signal that it is finished for now.
This is implemented as a generator so if it exceeds the
timeout it yields in order to give us time to call
_removeCompletedHandlers, then will resume in the next
iteration.
'''
self.log.debug("Starting handler assignment")
start = time.monotonic()
overall_start = start
provider = self.getProviderConfig()
if not provider:
self.log.info("Missing config. Deleted provider?")
return True
return
if provider.max_concurrency == 0:
return True
return
# Get the launchers which are currently online. This may
# become out of date as the loop progresses, but it should be
@ -146,15 +148,17 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
requests = sorted(self.zk.nodeRequestIterator(), key=_sort_key)
for req_count, req in enumerate(requests):
if not self.running:
return True
return
# if we exceeded the timeout stop iterating here
elapsed = time.monotonic() - start
if elapsed > timeout:
self.log.debug("Early exit from handler assignment on timeout "
self.log.debug("Yield from handler assignment on timeout "
"after %s/%s requests in %s",
req_count + 1, len(requests), elapsed)
return False
yield
start = time.monotonic()
self.log.debug("Resuming handler assignment")
# Only interested in unhandled requests
if req.state != zk.REQUESTED:
@ -288,10 +292,9 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
self.paused_handlers.add(rh)
self.request_handlers.append(rh)
elapsed = time.monotonic() - start
elapsed = time.monotonic() - overall_start
self.log.debug("Finished handler assignment %s requests in %s",
len(requests), elapsed)
return True
def _removeCompletedHandlers(self):
'''
@ -470,7 +473,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
# assignHandlers. It will only accept requests if we
# are unpaused, otherwise it will only touch requests
# we intend to decline.
while not self._assignHandlers():
for chunk in self._assignHandlers():
# _assignHandlers can take quite some time on a busy
# system so sprinkle _removeCompletedHandlers in
# between such that we have a chance to fulfill