Ensure that completed handlers are removed frequently

On a busy system, assignHandlers can take quite some time (we saw
occurrences of more than 10 minutes). During that time no node request
is marked as fulfilled, even if its nodes are already available. A
possible solution is to return from assignHandlers periodically during
the iteration so we can remove completed handlers and then continue
assigning handlers.

Change-Id: I10f40504c81d532e6953d7af63c5c58fd5283573
Tobias Henkel 2018-10-12 15:04:19 +02:00
parent 7111fcb407
commit 9296de9bf5
1 changed file with 24 additions and 6 deletions
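
For illustration, the loop structure this change introduces can be sketched roughly as below. This is a simplified, self-contained sketch, not the actual PoolWorker code: the Worker and FakeHandler classes and the run_once method are stand-ins, and only the method names _assignHandlers and _removeCompletedHandlers mirror the diff that follows.

import time


class FakeHandler:
    """Stand-in for a NodeRequestHandler; the real one launches threads."""

    def __init__(self, request_id):
        self.request_id = request_id

    def poll(self):
        # The real handler reports whether its node request has been
        # fulfilled; here every handler completes immediately to keep the
        # sketch self-contained.
        return True


class Worker:
    """Illustrative stand-in for PoolWorker's assign/remove interleaving."""

    def __init__(self, requests):
        self.pending_requests = list(requests)
        self.request_handlers = []

    def _assignHandlers(self, timeout=15):
        # Return False once the time budget is spent so the caller can run
        # _removeCompletedHandlers before assignment continues.
        start = time.monotonic()
        while self.pending_requests:
            req = self.pending_requests.pop(0)
            self.request_handlers.append(FakeHandler(req))
            if time.monotonic() - start > timeout:
                return False
        return True

    def _removeCompletedHandlers(self):
        # Completed requests are cleaned up as soon as their handler is
        # done, instead of waiting for a full assignment pass to finish.
        self.request_handlers = [h for h in self.request_handlers
                                 if not h.poll()]

    def run_once(self):
        # Interleave the two steps so a long assignment pass cannot delay
        # requests whose nodes are already available.
        while not self._assignHandlers():
            self._removeCompletedHandlers()
        self._removeCompletedHandlers()


if __name__ == '__main__':
    Worker(range(20)).run_once()

The key design choice, as described in the commit message, is that returning False on timeout only pauses a single assignment pass so cleanup can run in between; it does not abort any handler that has already been started.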

@@ -143,21 +143,26 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
     # Private methods
     # ---------------------------------------------------------------
-    def _assignHandlers(self):
+    def _assignHandlers(self, timeout=15):
         '''
         For each request we can grab, create a NodeRequestHandler for it.
         The NodeRequestHandler object will kick off any threads needed to
         satisfy the request, then return. We will need to periodically poll
         the handler for completion.
+
+        If exceeds the timeout it stops further iteration and returns False
+        in order to give us time to call _removeCompletedHandlers. Otherwise
+        it returns True to signal that it is finished for now.
         '''
+        start = time.monotonic()
         provider = self.getProviderConfig()
         if not provider:
             self.log.info("Missing config. Deleted provider?")
-            return
+            return True

         if provider.max_concurrency == 0:
-            return
+            return True

         # Sort requests by queue priority, then, for all requests at
         # the same priority, use the relative_priority field to
@@ -168,8 +173,11 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
                                                    r.id.split('-')[1]))

         for req in requests:
+            if not self.running:
+                return True
+
             if self.paused_handler:
-                return
+                return True

             # Get active threads for all pools for this provider
             active_threads = sum([
@@ -183,7 +191,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
                 self.log.debug("Request handling limited: %s active threads ",
                                "with max concurrency of %s",
                                active_threads, provider.max_concurrency)
-                return
+                return True

             req = self.zk.getNodeRequest(req.id)
             if not req:
@@ -217,6 +225,11 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
                 self.paused_handler = rh
             self.request_handlers.append(rh)

+            # if we exceeded the timeout stop iterating here
+            if time.monotonic() - start > timeout:
+                return False
+
+        return True

     def _removeCompletedHandlers(self):
         '''
         Poll handlers to see which have completed.
@@ -305,7 +318,12 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
             try:
                 if not self.paused_handler:
-                    self._assignHandlers()
+                    while not self._assignHandlers():
+                        # _assignHandlers can take quite some time on a busy
+                        # system so sprinkle _removeCompletedHandlers in
+                        # between such that we have a chance to fulfill
+                        # requests that already have all nodes.
+                        self._removeCompletedHandlers()
                 else:
                     # If we are paused, one request handler could not
                     # satisfy its assigned request, so give it