From 0d2cbcbb20c7299043b25b48c93ca62fdf32a3c1 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Wed, 27 Sep 2017 12:25:19 -0700 Subject: [PATCH] Delay executor noop packets This is something we had in Zuul v2.5 but slipped through in v3. Executors delay getting new jobs from gearman in proportion to the number of jobs they're already running, so as to avoid a single executor ending up running all the jobs. Change-Id: I75b87c09d507ee7c4785acf52e75e918bb0ab16f --- zuul/executor/server.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/zuul/executor/server.py b/zuul/executor/server.py index af2e7e9c1e..b37a82e9e2 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -513,6 +513,21 @@ class ExecutorMergeWorker(gear.TextWorker): super(ExecutorMergeWorker, self).handleNoop(packet) +class ExecutorExecuteWorker(gear.TextWorker): + def __init__(self, executor_server, *args, **kw): + self.zuul_executor_server = executor_server + super(ExecutorExecuteWorker, self).__init__(*args, **kw) + + def handleNoop(self, packet): + # Delay our response to running a new job based on the number + # of jobs we're currently running, in an attempt to spread + # load evenly among executors. + workers = len(self.zuul_executor_server.job_workers) + delay = (workers ** 2) / 1000.0 + time.sleep(delay) + return super(ExecutorExecuteWorker, self).handleNoop(packet) + + class ExecutorServer(object): log = logging.getLogger("zuul.ExecutorServer") @@ -612,7 +627,8 @@ class ExecutorServer(object): ssl_ca = get_default(self.config, 'gearman', 'ssl_ca') self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger') self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca) - self.executor_worker = gear.TextWorker('Zuul Executor Server') + self.executor_worker = ExecutorExecuteWorker( + self, 'Zuul Executor Server') self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca) self.log.debug("Waiting for server") self.merger_worker.waitForServer()