Fix parsing gearman status

The current code has some parse errors with more complex gearman
function names (which can show up due to the way Jenkins constructs
maven jobs).

Also, switch the calculation to examine only queued jobs (total -
running) instead of trying to calculate a worker shortage (total -
workers).  The latter doesn't deal well with multiple jobs that
require workers of the same image (it incorrectly behaves as if
they are independent).  By only examining queued workers, the actual
relationship between multiple jobs that require workers from the
same image is manifested by the fact that if, together, all such
jobs exceed the demand, we will see jobs sitting in the queue.

In other words, the overall picture is now that nodepool should
have at least enough ready+building nodes to accomodate the number
of jobs for a given worker/image that gearman is waiting to run.

Change-Id: Ibc2990ed2c7aea37bd4c94e5387c80ef840afa83
This commit is contained in:
James E. Blair 2013-10-07 15:43:58 -07:00
parent 1d4c1483aa
commit 49238174ff
1 changed files with 23 additions and 11 deletions

View File

@ -163,6 +163,10 @@ class NodeUpdateListener(threading.Thread):
class GearmanClient(gear.Client):
def __init__(self):
super(GearmanClient, self).__init__()
self.__log = logging.getLogger("nodepool.GearmanClient")
def getNeededWorkers(self):
needed_workers = {}
job_worker_map = {}
@ -172,32 +176,40 @@ class GearmanClient(gear.Client):
req = gear.StatusAdminRequest()
connection.sendAdminRequest(req)
except Exception:
self.log.exception("Exception while listing functions")
self.__log.exception("Exception while listing functions")
continue
for line in req.response.split('\n'):
parts = [x.strip() for x in line.split()]
if not parts or parts[0] == '.':
continue
deficit = int(parts[1]) - int(parts[3])
if ':' in parts[0]:
job, worker = parts[0].split(':')
if not parts[0].startswith('build:'):
continue
function = parts[0][len('build:'):]
# total jobs in queue - running
queued = int(parts[1]) - int(parts[2])
if queued > 0:
self.__log.debug("Function: %s queued: %s" % (function, queued))
if ':' in function:
fparts = function.split(':')
job = fparts[-2]
worker = fparts[-1]
workers = job_worker_map.get(job, [])
workers.append(worker)
job_worker_map[job] = workers
if deficit:
if queued > 0:
needed_workers[worker] = (
needed_workers.get(worker, 0) + deficit)
elif deficit:
job = parts[0]
needed_workers.get(worker, 0) + queued)
elif queued > 0:
job = function
unspecified_jobs[job] = (unspecified_jobs.get(job, 0) +
deficit)
for job, deficit in unspecified_jobs.items():
queued)
for job, queued in unspecified_jobs.items():
workers = job_worker_map.get(job)
if not workers:
continue
worker = workers[0]
needed_workers[worker] = (needed_workers.get(worker, 0) +
deficit)
queued)
return needed_workers