Merge "Delay executor noop packets" into feature/zuulv3
This commit is contained in:
commit
2a0942273d
|
@ -513,6 +513,21 @@ class ExecutorMergeWorker(gear.TextWorker):
|
||||||
super(ExecutorMergeWorker, self).handleNoop(packet)
|
super(ExecutorMergeWorker, self).handleNoop(packet)
|
||||||
|
|
||||||
|
|
||||||
|
class ExecutorExecuteWorker(gear.TextWorker):
|
||||||
|
def __init__(self, executor_server, *args, **kw):
|
||||||
|
self.zuul_executor_server = executor_server
|
||||||
|
super(ExecutorExecuteWorker, self).__init__(*args, **kw)
|
||||||
|
|
||||||
|
def handleNoop(self, packet):
|
||||||
|
# Delay our response to running a new job based on the number
|
||||||
|
# of jobs we're currently running, in an attempt to spread
|
||||||
|
# load evenly among executors.
|
||||||
|
workers = len(self.zuul_executor_server.job_workers)
|
||||||
|
delay = (workers ** 2) / 1000.0
|
||||||
|
time.sleep(delay)
|
||||||
|
return super(ExecutorExecuteWorker, self).handleNoop(packet)
|
||||||
|
|
||||||
|
|
||||||
class ExecutorServer(object):
|
class ExecutorServer(object):
|
||||||
log = logging.getLogger("zuul.ExecutorServer")
|
log = logging.getLogger("zuul.ExecutorServer")
|
||||||
|
|
||||||
|
@ -612,7 +627,8 @@ class ExecutorServer(object):
|
||||||
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
||||||
self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
|
self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
|
||||||
self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
|
self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
|
||||||
self.executor_worker = gear.TextWorker('Zuul Executor Server')
|
self.executor_worker = ExecutorExecuteWorker(
|
||||||
|
self, 'Zuul Executor Server')
|
||||||
self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
|
self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
|
||||||
self.log.debug("Waiting for server")
|
self.log.debug("Waiting for server")
|
||||||
self.merger_worker.waitForServer()
|
self.merger_worker.waitForServer()
|
||||||
|
|
Loading…
Reference in New Issue