Executor: stop jobs in parallel on shutdown

When shutting down, tell all of the jobs to stop, then wait for
all of them to complete.

Also, add a lock around starting jobs so that the list of running
jobs is guaranteed to be consistent.

Also fix a shutdown sequencing bug.  It was possible for us to shut down
the internal repo update thread while jobs were still waiting on it.  This
would cause those jobs never to stop.  Fix this by ensuring that we get
no new jobs, then stop all the jobs, then stop the update thread.

Because we want to wait for the update thread to complete, we join it in
the stop() method.  That seems to make the join() method redundant, so
move all of the joins to stop().

Change-Id: I12422d3b445892be3e50963c9edafe3904698bff
This commit is contained in:
James E. Blair 2017-10-16 11:44:43 -07:00
parent a86aaf1158
commit 0c45d8899f
1 changed files with 86 additions and 39 deletions

View File

@ -47,6 +47,11 @@ COMMANDS = ['stop', 'pause', 'unpause', 'graceful', 'verbose',
DEFAULT_FINGER_PORT = 79 DEFAULT_FINGER_PORT = 79
class StopException(Exception):
"""An exception raised when an inner loop is asked to stop."""
pass
class ExecutorError(Exception): class ExecutorError(Exception):
"""A non-transient run-time executor error """A non-transient run-time executor error
@ -558,6 +563,8 @@ class AnsibleJob(object):
self.aborted = True self.aborted = True
self.aborted_reason = reason self.aborted_reason = reason
self.abortRunningProc() self.abortRunningProc()
def wait(self):
if self.thread: if self.thread:
self.thread.join() self.thread.join()
@ -1530,6 +1537,7 @@ class ExecutorServer(object):
self.hostname = socket.gethostname() self.hostname = socket.gethostname()
self.log_streaming_port = log_streaming_port self.log_streaming_port = log_streaming_port
self.merger_lock = threading.Lock() self.merger_lock = threading.Lock()
self.run_lock = threading.Lock()
self.verbose = False self.verbose = False
self.command_map = dict( self.command_map = dict(
stop=self.stop, stop=self.stop,
@ -1665,8 +1673,9 @@ class ExecutorServer(object):
self.merger_worker.registerFunction("merger:refstate") self.merger_worker.registerFunction("merger:refstate")
def register_work(self): def register_work(self):
self.accepting_work = True if self._running:
self.executor_worker.registerFunction("executor:execute") self.accepting_work = True
self.executor_worker.registerFunction("executor:execute")
def unregister_work(self): def unregister_work(self):
self.accepting_work = False self.accepting_work = False
@ -1675,26 +1684,57 @@ class ExecutorServer(object):
def stop(self): def stop(self):
self.log.debug("Stopping") self.log.debug("Stopping")
self.disk_accountant.stop() self.disk_accountant.stop()
# The governor can change function registration, so make sure
# it has stopped.
self.governor_stop_event.set() self.governor_stop_event.set()
self._running = False self.governor_thread.join()
self._command_running = False # Stop accepting new jobs
self.merger_worker.setFunctions([])
self.executor_worker.setFunctions([])
# Tell the executor worker to abort any jobs it just accepted,
# and grab the list of currently running job workers.
with self.run_lock:
self._running = False
self._command_running = False
workers = list(self.job_workers.values())
self.command_socket.stop() self.command_socket.stop()
self.update_queue.put(None)
for job_worker in list(self.job_workers.values()): for job_worker in workers:
try: try:
job_worker.stop() job_worker.stop()
except Exception: except Exception:
self.log.exception("Exception sending stop command " self.log.exception("Exception sending stop command "
"to worker:") "to worker:")
for job_worker in workers:
try:
job_worker.wait()
except Exception:
self.log.exception("Exception waiting for worker "
"to stop:")
# Now that we aren't accepting any new jobs, and all of the
# running jobs have stopped, tell the update processor to
# stop.
self.update_queue.put(None)
# All job results should have been sent by now, shutdown the
# gearman workers.
self.merger_worker.shutdown() self.merger_worker.shutdown()
self.executor_worker.shutdown() self.executor_worker.shutdown()
if self.statsd: if self.statsd:
base_key = 'zuul.executor.%s' % self.hostname base_key = 'zuul.executor.%s' % self.hostname
self.statsd.gauge(base_key + '.load_average', 0) self.statsd.gauge(base_key + '.load_average', 0)
self.statsd.gauge(base_key + '.running_builds', 0) self.statsd.gauge(base_key + '.running_builds', 0)
self.log.debug("Stopped") self.log.debug("Stopped")
def join(self):
self.governor_thread.join()
self.update_thread.join()
self.merger_thread.join()
self.executor_thread.join()
def pause(self): def pause(self):
# TODOv3: implement # TODOv3: implement
pass pass
@ -1719,12 +1759,6 @@ class ExecutorServer(object):
def nokeep(self): def nokeep(self):
self.keep_jobdir = False self.keep_jobdir = False
def join(self):
self.update_thread.join()
self.merger_thread.join()
self.executor_thread.join()
self.governor_thread.join()
def runCommand(self): def runCommand(self):
while self._command_running: while self._command_running:
try: try:
@ -1735,10 +1769,12 @@ class ExecutorServer(object):
self.log.exception("Exception while processing command") self.log.exception("Exception while processing command")
def _updateLoop(self): def _updateLoop(self):
while self._running: while True:
try: try:
self._innerUpdateLoop() self._innerUpdateLoop()
except: except StopException:
return
except Exception:
self.log.exception("Exception in update thread:") self.log.exception("Exception in update thread:")
def _innerUpdateLoop(self): def _innerUpdateLoop(self):
@ -1746,7 +1782,7 @@ class ExecutorServer(object):
task = self.update_queue.get() task = self.update_queue.get()
if task is None: if task is None:
# We are asked to stop # We are asked to stop
return raise StopException()
with self.merger_lock: with self.merger_lock:
self.log.info("Updating repo %s/%s" % ( self.log.info("Updating repo %s/%s" % (
task.connection_name, task.project_name)) task.connection_name, task.project_name))
@ -1767,18 +1803,7 @@ class ExecutorServer(object):
try: try:
job = self.merger_worker.getJob() job = self.merger_worker.getJob()
try: try:
if job.name == 'merger:cat': self.mergerJobDispatch(job)
self.log.debug("Got cat job: %s" % job.unique)
self.cat(job)
elif job.name == 'merger:merge':
self.log.debug("Got merge job: %s" % job.unique)
self.merge(job)
elif job.name == 'merger:refstate':
self.log.debug("Got refstate job: %s" % job.unique)
self.refstate(job)
else:
self.log.error("Unable to handle job %s" % job.name)
job.sendWorkFail()
except Exception: except Exception:
self.log.exception("Exception while running job") self.log.exception("Exception while running job")
job.sendWorkException( job.sendWorkException(
@ -1788,21 +1813,28 @@ class ExecutorServer(object):
except Exception: except Exception:
self.log.exception("Exception while getting job") self.log.exception("Exception while getting job")
def mergerJobDispatch(self, job):
with self.run_lock:
if job.name == 'merger:cat':
self.log.debug("Got cat job: %s" % job.unique)
self.cat(job)
elif job.name == 'merger:merge':
self.log.debug("Got merge job: %s" % job.unique)
self.merge(job)
elif job.name == 'merger:refstate':
self.log.debug("Got refstate job: %s" % job.unique)
self.refstate(job)
else:
self.log.error("Unable to handle job %s" % job.name)
job.sendWorkFail()
def run_executor(self): def run_executor(self):
self.log.debug("Starting executor listener") self.log.debug("Starting executor listener")
while self._running: while self._running:
try: try:
job = self.executor_worker.getJob() job = self.executor_worker.getJob()
try: try:
if job.name == 'executor:execute': self.executorJobDispatch(job)
self.log.debug("Got execute job: %s" % job.unique)
self.executeJob(job)
elif job.name.startswith('executor:stop'):
self.log.debug("Got stop job: %s" % job.unique)
self.stopJob(job)
else:
self.log.error("Unable to handle job %s" % job.name)
job.sendWorkFail()
except Exception: except Exception:
self.log.exception("Exception while running job") self.log.exception("Exception while running job")
job.sendWorkException( job.sendWorkException(
@ -1812,9 +1844,20 @@ class ExecutorServer(object):
except Exception: except Exception:
self.log.exception("Exception while getting job") self.log.exception("Exception while getting job")
def run_governor(self): def executorJobDispatch(self, job):
while not self.governor_stop_event.wait(30): with self.run_lock:
self.manageLoad() if not self._running:
job.sendWorkFail()
return
if job.name == 'executor:execute':
self.log.debug("Got execute job: %s" % job.unique)
self.executeJob(job)
elif job.name.startswith('executor:stop'):
self.log.debug("Got stop job: %s" % job.unique)
self.stopJob(job)
else:
self.log.error("Unable to handle job %s" % job.name)
job.sendWorkFail()
def executeJob(self, job): def executeJob(self, job):
if self.statsd: if self.statsd:
@ -1823,6 +1866,10 @@ class ExecutorServer(object):
self.job_workers[job.unique] = self._job_class(self, job) self.job_workers[job.unique] = self._job_class(self, job)
self.job_workers[job.unique].run() self.job_workers[job.unique].run()
def run_governor(self):
while not self.governor_stop_event.wait(30):
self.manageLoad()
def manageLoad(self): def manageLoad(self):
''' Apply some heuristics to decide whether or not we should ''' Apply some heuristics to decide whether or not we should
be askign for more jobs ''' be askign for more jobs '''