Ansible launcher: delay node assignment under load

Gearman wakes all available workers at the same time when a job
is available.  The first one to respond gets the job.  To attempt
to distribute nodes (which are assigned via a gearman job) more
evenly across multiple zuul launchers, delay between the wake-up
and grab-job packets for a period that grows with the square of
the number of nodes this launcher currently has.

Change-Id: I307938f97b730b229c1622cd2f929fc5b65ccdad
This commit is contained in:
James E. Blair
2016-06-16 14:02:33 -07:00
parent 6870b12932
commit f0291c244a

View File

@@ -51,7 +51,19 @@ def boolify(x):
return bool(x)
class GearWorker(gear.Worker):
class LaunchGearWorker(gear.Worker):
    """Gear worker that pauses before handling a NOOP packet.

    The pause grows with the square of the number of entries in the
    owning launch server's ``node_workers`` collection, so a launcher
    with more node workers waits longer before the base class handles
    the packet.
    """

    def __init__(self, *args, **kw):
        """Create the worker.

        The required ``launch_server`` keyword argument is removed from
        ``kw`` and stored on the instance; all remaining arguments are
        passed through to ``gear.Worker``.  A missing ``launch_server``
        raises ``KeyError``.
        """
        launch_server = kw.pop('launch_server')
        self.__launch_server = launch_server
        super(LaunchGearWorker, self).__init__(*args, **kw)

    def handleNoop(self, packet):
        """Sleep in proportion to current load, then defer to the base class.

        The sleep is ``n * n / 1000.0`` seconds, where ``n`` is the
        current length of the launch server's ``node_workers``; with no
        node workers the sleep is zero.
        """
        node_count = len(self.__launch_server.node_workers)
        pause_seconds = (node_count * node_count) / 1000.0
        time.sleep(pause_seconds)
        return super(LaunchGearWorker, self).handleNoop(packet)
class NodeGearWorker(gear.Worker):
MASS_DO = 101
def sendMassDo(self, functions):
@@ -203,7 +215,8 @@ class LaunchServer(object):
port = self.config.get('gearman', 'port')
else:
port = 4730
self.worker = gear.Worker('Zuul Launch Server')
self.worker = LaunchGearWorker('Zuul Launch Server',
launch_server=self)
self.worker.addServer(server, port)
self.log.debug("Waiting for server")
self.worker.waitForServer()
@@ -533,7 +546,7 @@ class NodeWorker(object):
port = self.config.get('gearman', 'port')
else:
port = 4730
self.worker = GearWorker(self.name)
self.worker = NodeGearWorker(self.name)
self.worker.addServer(server, port)
self.log.debug("Waiting for server")
self.worker.waitForServer()