diff --git a/zuul/executor/server.py b/zuul/executor/server.py index fa9477d9c5..38640038dc 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -47,6 +47,11 @@ COMMANDS = ['stop', 'pause', 'unpause', 'graceful', 'verbose', DEFAULT_FINGER_PORT = 79 +class StopException(Exception): + """An exception raised when an inner loop is asked to stop.""" + pass + + class ExecutorError(Exception): """A non-transient run-time executor error @@ -558,6 +563,8 @@ class AnsibleJob(object): self.aborted = True self.aborted_reason = reason self.abortRunningProc() + + def wait(self): if self.thread: self.thread.join() @@ -1530,6 +1537,7 @@ class ExecutorServer(object): self.hostname = socket.gethostname() self.log_streaming_port = log_streaming_port self.merger_lock = threading.Lock() + self.run_lock = threading.Lock() self.verbose = False self.command_map = dict( stop=self.stop, @@ -1665,8 +1673,9 @@ class ExecutorServer(object): self.merger_worker.registerFunction("merger:refstate") def register_work(self): - self.accepting_work = True - self.executor_worker.registerFunction("executor:execute") + if self._running: + self.accepting_work = True + self.executor_worker.registerFunction("executor:execute") def unregister_work(self): self.accepting_work = False @@ -1675,26 +1684,57 @@ class ExecutorServer(object): def stop(self): self.log.debug("Stopping") self.disk_accountant.stop() + # The governor can change function registration, so make sure + # it has stopped. self.governor_stop_event.set() - self._running = False - self._command_running = False + self.governor_thread.join() + # Stop accepting new jobs + self.merger_worker.setFunctions([]) + self.executor_worker.setFunctions([]) + # Tell the executor worker to abort any jobs it just accepted, + # and grab the list of currently running job workers. + with self.run_lock: + self._running = False + self._command_running = False + workers = list(self.job_workers.values()) self.command_socket.stop() - self.update_queue.put(None) - for job_worker in list(self.job_workers.values()): + for job_worker in workers: try: job_worker.stop() except Exception: self.log.exception("Exception sending stop command " "to worker:") + for job_worker in workers: + try: + job_worker.wait() + except Exception: + self.log.exception("Exception waiting for worker " + "to stop:") + + # Now that we aren't accepting any new jobs, and all of the + # running jobs have stopped, tell the update processor to + # stop. + self.update_queue.put(None) + + # All job results should have been sent by now, shutdown the + # gearman workers. self.merger_worker.shutdown() self.executor_worker.shutdown() + if self.statsd: base_key = 'zuul.executor.%s' % self.hostname self.statsd.gauge(base_key + '.load_average', 0) self.statsd.gauge(base_key + '.running_builds', 0) + self.log.debug("Stopped") + def join(self): + self.governor_thread.join() + self.update_thread.join() + self.merger_thread.join() + self.executor_thread.join() + def pause(self): # TODOv3: implement pass @@ -1719,12 +1759,6 @@ class ExecutorServer(object): def nokeep(self): self.keep_jobdir = False - def join(self): - self.update_thread.join() - self.merger_thread.join() - self.executor_thread.join() - self.governor_thread.join() - def runCommand(self): while self._command_running: try: @@ -1735,10 +1769,12 @@ class ExecutorServer(object): self.log.exception("Exception while processing command") def _updateLoop(self): - while self._running: + while True: try: self._innerUpdateLoop() - except: + except StopException: + return + except Exception: self.log.exception("Exception in update thread:") def _innerUpdateLoop(self): @@ -1746,7 +1782,7 @@ class ExecutorServer(object): task = self.update_queue.get() if task is None: # We are asked to stop - return + raise StopException() with self.merger_lock: self.log.info("Updating repo %s/%s" % ( task.connection_name, task.project_name)) @@ -1767,18 +1803,7 @@ class ExecutorServer(object): try: job = self.merger_worker.getJob() try: - if job.name == 'merger:cat': - self.log.debug("Got cat job: %s" % job.unique) - self.cat(job) - elif job.name == 'merger:merge': - self.log.debug("Got merge job: %s" % job.unique) - self.merge(job) - elif job.name == 'merger:refstate': - self.log.debug("Got refstate job: %s" % job.unique) - self.refstate(job) - else: - self.log.error("Unable to handle job %s" % job.name) - job.sendWorkFail() + self.mergerJobDispatch(job) except Exception: self.log.exception("Exception while running job") job.sendWorkException( @@ -1788,21 +1813,28 @@ class ExecutorServer(object): except Exception: self.log.exception("Exception while getting job") + def mergerJobDispatch(self, job): + with self.run_lock: + if job.name == 'merger:cat': + self.log.debug("Got cat job: %s" % job.unique) + self.cat(job) + elif job.name == 'merger:merge': + self.log.debug("Got merge job: %s" % job.unique) + self.merge(job) + elif job.name == 'merger:refstate': + self.log.debug("Got refstate job: %s" % job.unique) + self.refstate(job) + else: + self.log.error("Unable to handle job %s" % job.name) + job.sendWorkFail() + def run_executor(self): self.log.debug("Starting executor listener") while self._running: try: job = self.executor_worker.getJob() try: - if job.name == 'executor:execute': - self.log.debug("Got execute job: %s" % job.unique) - self.executeJob(job) - elif job.name.startswith('executor:stop'): - self.log.debug("Got stop job: %s" % job.unique) - self.stopJob(job) - else: - self.log.error("Unable to handle job %s" % job.name) - job.sendWorkFail() + self.executorJobDispatch(job) except Exception: self.log.exception("Exception while running job") job.sendWorkException( @@ -1812,9 +1844,20 @@ class ExecutorServer(object): except Exception: self.log.exception("Exception while getting job") - def run_governor(self): - while not self.governor_stop_event.wait(30): - self.manageLoad() + def executorJobDispatch(self, job): + with self.run_lock: + if not self._running: + job.sendWorkFail() + return + if job.name == 'executor:execute': + self.log.debug("Got execute job: %s" % job.unique) + self.executeJob(job) + elif job.name.startswith('executor:stop'): + self.log.debug("Got stop job: %s" % job.unique) + self.stopJob(job) + else: + self.log.error("Unable to handle job %s" % job.name) + job.sendWorkFail() def executeJob(self, job): if self.statsd: @@ -1823,6 +1866,10 @@ class ExecutorServer(object): self.job_workers[job.unique] = self._job_class(self, job) self.job_workers[job.unique].run() + def run_governor(self): + while not self.governor_stop_event.wait(30): + self.manageLoad() + def manageLoad(self): ''' Apply some heuristics to decide whether or not we should be askign for more jobs '''