From 0c45d8899fafb98d4dee2d38c117d53c2c97b675 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Mon, 16 Oct 2017 11:44:43 -0700 Subject: [PATCH] Executor: stop jobs in parallel on shutdown When shutting down, tell all of the jobs to stop, then wait for all of them to complete. Also, add a lock around starting jobs so that the list of running jobs is guaranteed to be consistent. Also fix a shutdown sequencing bug. It was possible for us to shut down the internal repo update thread while jobs were still waiting on it. This would cause those jobs never to stop. Fix this by ensuring that we get no new jobs, then stop all the jobs, then stop the update thread. Because we want to wait for the update thread to complete, we join it in the stop() method. That seems to make the join() method redundant, so move all of the joins to stop(). Change-Id: I12422d3b445892be3e50963c9edafe3904698bff --- zuul/executor/server.py | 125 +++++++++++++++++++++++++++------------- 1 file changed, 86 insertions(+), 39 deletions(-) diff --git a/zuul/executor/server.py b/zuul/executor/server.py index fa9477d9c5..38640038dc 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -47,6 +47,11 @@ COMMANDS = ['stop', 'pause', 'unpause', 'graceful', 'verbose', DEFAULT_FINGER_PORT = 79 +class StopException(Exception): + """An exception raised when an inner loop is asked to stop.""" + pass + + class ExecutorError(Exception): """A non-transient run-time executor error @@ -558,6 +563,8 @@ class AnsibleJob(object): self.aborted = True self.aborted_reason = reason self.abortRunningProc() + + def wait(self): if self.thread: self.thread.join() @@ -1530,6 +1537,7 @@ class ExecutorServer(object): self.hostname = socket.gethostname() self.log_streaming_port = log_streaming_port self.merger_lock = threading.Lock() + self.run_lock = threading.Lock() self.verbose = False self.command_map = dict( stop=self.stop, @@ -1665,8 +1673,9 @@ class ExecutorServer(object): self.merger_worker.registerFunction("merger:refstate") def register_work(self): - self.accepting_work = True - self.executor_worker.registerFunction("executor:execute") + if self._running: + self.accepting_work = True + self.executor_worker.registerFunction("executor:execute") def unregister_work(self): self.accepting_work = False @@ -1675,26 +1684,57 @@ class ExecutorServer(object): def stop(self): self.log.debug("Stopping") self.disk_accountant.stop() + # The governor can change function registration, so make sure + # it has stopped. self.governor_stop_event.set() - self._running = False - self._command_running = False + self.governor_thread.join() + # Stop accepting new jobs + self.merger_worker.setFunctions([]) + self.executor_worker.setFunctions([]) + # Tell the executor worker to abort any jobs it just accepted, + # and grab the list of currently running job workers. + with self.run_lock: + self._running = False + self._command_running = False + workers = list(self.job_workers.values()) self.command_socket.stop() - self.update_queue.put(None) - for job_worker in list(self.job_workers.values()): + for job_worker in workers: try: job_worker.stop() except Exception: self.log.exception("Exception sending stop command " "to worker:") + for job_worker in workers: + try: + job_worker.wait() + except Exception: + self.log.exception("Exception waiting for worker " + "to stop:") + + # Now that we aren't accepting any new jobs, and all of the + # running jobs have stopped, tell the update processor to + # stop. + self.update_queue.put(None) + + # All job results should have been sent by now, shutdown the + # gearman workers. self.merger_worker.shutdown() self.executor_worker.shutdown() + if self.statsd: base_key = 'zuul.executor.%s' % self.hostname self.statsd.gauge(base_key + '.load_average', 0) self.statsd.gauge(base_key + '.running_builds', 0) + self.log.debug("Stopped") + def join(self): + self.governor_thread.join() + self.update_thread.join() + self.merger_thread.join() + self.executor_thread.join() + def pause(self): # TODOv3: implement pass @@ -1719,12 +1759,6 @@ class ExecutorServer(object): def nokeep(self): self.keep_jobdir = False - def join(self): - self.update_thread.join() - self.merger_thread.join() - self.executor_thread.join() - self.governor_thread.join() - def runCommand(self): while self._command_running: try: @@ -1735,10 +1769,12 @@ class ExecutorServer(object): self.log.exception("Exception while processing command") def _updateLoop(self): - while self._running: + while True: try: self._innerUpdateLoop() - except: + except StopException: + return + except Exception: self.log.exception("Exception in update thread:") def _innerUpdateLoop(self): @@ -1746,7 +1782,7 @@ class ExecutorServer(object): task = self.update_queue.get() if task is None: # We are asked to stop - return + raise StopException() with self.merger_lock: self.log.info("Updating repo %s/%s" % ( task.connection_name, task.project_name)) @@ -1767,18 +1803,7 @@ class ExecutorServer(object): try: job = self.merger_worker.getJob() try: - if job.name == 'merger:cat': - self.log.debug("Got cat job: %s" % job.unique) - self.cat(job) - elif job.name == 'merger:merge': - self.log.debug("Got merge job: %s" % job.unique) - self.merge(job) - elif job.name == 'merger:refstate': - self.log.debug("Got refstate job: %s" % job.unique) - self.refstate(job) - else: - self.log.error("Unable to handle job %s" % job.name) - job.sendWorkFail() + self.mergerJobDispatch(job) except Exception: self.log.exception("Exception while running job") job.sendWorkException( @@ -1788,21 +1813,28 @@ class ExecutorServer(object): except Exception: self.log.exception("Exception while getting job") + def mergerJobDispatch(self, job): + with self.run_lock: + if job.name == 'merger:cat': + self.log.debug("Got cat job: %s" % job.unique) + self.cat(job) + elif job.name == 'merger:merge': + self.log.debug("Got merge job: %s" % job.unique) + self.merge(job) + elif job.name == 'merger:refstate': + self.log.debug("Got refstate job: %s" % job.unique) + self.refstate(job) + else: + self.log.error("Unable to handle job %s" % job.name) + job.sendWorkFail() + def run_executor(self): self.log.debug("Starting executor listener") while self._running: try: job = self.executor_worker.getJob() try: - if job.name == 'executor:execute': - self.log.debug("Got execute job: %s" % job.unique) - self.executeJob(job) - elif job.name.startswith('executor:stop'): - self.log.debug("Got stop job: %s" % job.unique) - self.stopJob(job) - else: - self.log.error("Unable to handle job %s" % job.name) - job.sendWorkFail() + self.executorJobDispatch(job) except Exception: self.log.exception("Exception while running job") job.sendWorkException( @@ -1812,9 +1844,20 @@ class ExecutorServer(object): except Exception: self.log.exception("Exception while getting job") - def run_governor(self): - while not self.governor_stop_event.wait(30): - self.manageLoad() + def executorJobDispatch(self, job): + with self.run_lock: + if not self._running: + job.sendWorkFail() + return + if job.name == 'executor:execute': + self.log.debug("Got execute job: %s" % job.unique) + self.executeJob(job) + elif job.name.startswith('executor:stop'): + self.log.debug("Got stop job: %s" % job.unique) + self.stopJob(job) + else: + self.log.error("Unable to handle job %s" % job.name) + job.sendWorkFail() def executeJob(self, job): if self.statsd: @@ -1823,6 +1866,10 @@ class ExecutorServer(object): self.job_workers[job.unique] = self._job_class(self, job) self.job_workers[job.unique].run() + def run_governor(self): + while not self.governor_stop_event.wait(30): + self.manageLoad() + def manageLoad(self): ''' Apply some heuristics to decide whether or not we should be askign for more jobs '''