Delay executor noop packets

This is something we had in Zuul v2.5 but slipped through in v3.
Executors delay getting new jobs from gearman in proportion to
the number of jobs they're already running, so as to avoid a single
executor ending up running all the jobs.

Change-Id: I75b87c09d507ee7c4785acf52e75e918bb0ab16f
This commit is contained in:
James E. Blair 2017-09-27 12:25:19 -07:00
parent b3d3f2b9d9
commit 0d2cbcbb20
1 changed files with 17 additions and 1 deletions

View File

@ -513,6 +513,21 @@ class ExecutorMergeWorker(gear.TextWorker):
super(ExecutorMergeWorker, self).handleNoop(packet)
class ExecutorExecuteWorker(gear.TextWorker):
def __init__(self, executor_server, *args, **kw):
self.zuul_executor_server = executor_server
super(ExecutorExecuteWorker, self).__init__(*args, **kw)
def handleNoop(self, packet):
# Delay our response to running a new job based on the number
# of jobs we're currently running, in an attempt to spread
# load evenly among executors.
workers = len(self.zuul_executor_server.job_workers)
delay = (workers ** 2) / 1000.0
time.sleep(delay)
return super(ExecutorExecuteWorker, self).handleNoop(packet)
class ExecutorServer(object):
log = logging.getLogger("zuul.ExecutorServer")
@ -612,7 +627,8 @@ class ExecutorServer(object):
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
self.executor_worker = gear.TextWorker('Zuul Executor Server')
self.executor_worker = ExecutorExecuteWorker(
self, 'Zuul Executor Server')
self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
self.log.debug("Waiting for server")
self.merger_worker.waitForServer()