diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst index b3a4c3f43e..5e7e0e1a22 100644 --- a/doc/source/admin/components.rst +++ b/doc/source/admin/components.rst @@ -468,6 +468,23 @@ The following sections of ``zuul.conf`` are used by the executor: `bubblewrap` has become integral to securely operating Zuul. If you have a valid use case for it, we encourage you to let us know. + .. attr:: load_multiplier + :default: 2.5 + + When an executor host gets too busy, the system may suffer + timeouts and other ill effects. The executor will stop accepting + more than 1 job at a time until load has lowered below a safe + level. This level is determined by multiplying the number of + CPUs by `load_multiplier`. + + So for example, if the system has 2 CPUs, and `load_multiplier` + is 2.5, the safe load for the system is 5.00. Any time the + system load average is over 5.00, the executor will quit + accepting multiple jobs at one time. + + The executor will observe system load and determine whether + to accept more jobs every 30 seconds. + .. attr:: merger .. 
attr:: git_user_email diff --git a/zuul/executor/server.py b/zuul/executor/server.py index b37a82e9e2..32c0523477 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -16,6 +16,7 @@ import collections import datetime import json import logging +import multiprocessing import os import shutil import signal @@ -563,6 +564,10 @@ class ExecutorServer(object): self.merge_name = get_default(self.config, 'merger', 'git_user_name') execution_wrapper_name = get_default(self.config, 'executor', 'execution_wrapper', 'bubblewrap') + load_multiplier = float(get_default(self.config, 'executor', + 'load_multiplier', '2.5')) + self.max_load_avg = multiprocessing.cpu_count() * load_multiplier + self.accepting_work = False self.execution_wrapper = connections.drivers[execution_wrapper_name] self.connections = connections @@ -652,19 +657,32 @@ class ExecutorServer(object): self.executor_thread = threading.Thread(target=self.run_executor) self.executor_thread.daemon = True self.executor_thread.start() + self.governor_stop_event = threading.Event() + self.governor_thread = threading.Thread(target=self.run_governor) + self.governor_thread.daemon = True + self.governor_thread.start() self.disk_accountant.start() def register(self): - self.executor_worker.registerFunction("executor:execute") + self.register_work() self.executor_worker.registerFunction("executor:stop:%s" % self.hostname) self.merger_worker.registerFunction("merger:merge") self.merger_worker.registerFunction("merger:cat") self.merger_worker.registerFunction("merger:refstate") + def register_work(self): + self.accepting_work = True + self.executor_worker.registerFunction("executor:execute") + + def unregister_work(self): + self.accepting_work = False + self.executor_worker.unregisterFunction("executor:execute") + def stop(self): self.log.debug("Stopping") self.disk_accountant.stop() + self.governor_stop_event.set() self._running = False self._command_running = False self.command_socket.stop() @@ -708,6 +726,7 
@@ class ExecutorServer(object): self.update_thread.join() self.merger_thread.join() self.executor_thread.join() + self.governor_thread.join() def runCommand(self): while self._command_running: @@ -796,10 +815,31 @@ class ExecutorServer(object): except Exception: self.log.exception("Exception while getting job") + def run_governor(self): + while not self.governor_stop_event.wait(30): + self.manageLoad() + def executeJob(self, job): self.job_workers[job.unique] = AnsibleJob(self, job) self.job_workers[job.unique].run() + def manageLoad(self): + ''' Apply some heuristics to decide whether or not we should + be asking for more jobs ''' + load_avg = os.getloadavg()[0] + if self.accepting_work: + # Don't unregister if we don't have any active jobs. + if load_avg > self.max_load_avg and self.job_workers: + self.log.info( + "Unregistering due to high system load {} > {}".format( + load_avg, self.max_load_avg)) + self.unregister_work() + elif load_avg <= self.max_load_avg: + self.log.info( + "Re-registering as load is within limits {} <= {}".format( + load_avg, self.max_load_avg)) + self.register_work() + def finishJob(self, unique): del(self.job_workers[unique])