Recover from broken process pool

The process pool used by the executor can become permanently broken if
a child process crashes or is killed by the OOM killer. When this
happens, the process pool rejects all further jobs and raises
BrokenProcessPool whenever a job is submitted. Catch this exception and
recreate the process pool to recover from this condition.

Change-Id: I2a2064f6a8c259deb8321892cc08d309ff535306
This commit is contained in:
Tobias Henkel 2020-02-22 18:47:56 +01:00
parent 1324a3a2a8
commit 9d184174f7
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2

View File

@ -28,7 +28,7 @@ import tempfile
import threading
import time
import traceback
from concurrent.futures.process import ProcessPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor, BrokenProcessPool
import git
from urllib.parse import urlsplit
@ -847,6 +847,12 @@ class AnsibleJob(object):
self.executor_server.keep_jobdir,
str(self.job.unique))
self._execute()
except BrokenProcessPool:
# The process pool got broken, re-initialize it and send
# ABORTED so we re-try the job.
self.log.exception('Process pool got broken')
self.executor_server.resetProcessPool()
self._send_aborted()
except ExecutorError as e:
result_data = json.dumps(dict(result='ERROR',
error_detail=e.args[0]))
@ -2735,6 +2741,18 @@ class ExecutorServer(object):
except Exception:
self.log.exception("Exception in update thread:")
def resetProcessPool(self):
    """
    Replace the process pool with a freshly created one.

    This is called to re-initialize a process pool that got broken,
    e.g. by an OOM-killed child process. An existing pool is shut down
    first; a failure during that shutdown is logged and otherwise
    ignored so that a new pool is always installed.
    """
    stale_pool = self.process_worker
    if stale_pool:
        try:
            stale_pool.shutdown()
        except Exception:
            # Best effort only: the old pool is already unusable, so a
            # failed shutdown must not prevent creating the new pool.
            self.log.exception('Failed to shutdown broken process worker')

    self.process_worker = ProcessPoolExecutor()
def _innerUpdateLoop(self):
# Inside of a loop that keeps the main repositories up to date
task = self.update_queue.get()
@ -2764,6 +2782,11 @@ class ExecutorServer(object):
log.debug("Finished updating repo %s/%s",
task.connection_name, task.project_name)
task.success = True
except BrokenProcessPool:
# The process pool got broken. Reset it to unbreak it for further
# requests.
log.exception('Process pool got broken')
self.resetProcessPool()
except Exception:
log.exception('Got exception while updating repo %s/%s',
task.connection_name, task.project_name)