diff --git a/zuul/executor/server.py b/zuul/executor/server.py index af2e7e9c1e..b37a82e9e2 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -513,6 +513,21 @@ class ExecutorMergeWorker(gear.TextWorker): super(ExecutorMergeWorker, self).handleNoop(packet) +class ExecutorExecuteWorker(gear.TextWorker): + def __init__(self, executor_server, *args, **kw): + self.zuul_executor_server = executor_server + super(ExecutorExecuteWorker, self).__init__(*args, **kw) + + def handleNoop(self, packet): + # Delay our response to running a new job based on the number + # of jobs we're currently running, in an attempt to spread + # load evenly among executors. + workers = len(self.zuul_executor_server.job_workers) + delay = (workers ** 2) / 1000.0 + time.sleep(delay) + return super(ExecutorExecuteWorker, self).handleNoop(packet) + + class ExecutorServer(object): log = logging.getLogger("zuul.ExecutorServer") @@ -612,7 +627,8 @@ class ExecutorServer(object): ssl_ca = get_default(self.config, 'gearman', 'ssl_ca') self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger') self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca) - self.executor_worker = gear.TextWorker('Zuul Executor Server') + self.executor_worker = ExecutorExecuteWorker( + self, 'Zuul Executor Server') self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca) self.log.debug("Waiting for server") self.merger_worker.waitForServer()