Merge "Infer node demand from workers listing"

This commit is contained in:
Jenkins 2016-07-11 17:30:50 +00:00 committed by Gerrit Code Review
commit cb32fd47e2
2 changed files with 120 additions and 27 deletions

@ -266,11 +266,39 @@ class GearmanClient(gear.Client):
super(GearmanClient, self).__init__(client_id='nodepool')
self.__log = logging.getLogger("nodepool.GearmanClient")
def getNeededWorkers(self):
def getNeededWorkers(self, session):
needed_workers = {}
job_worker_map = {}
unspecified_jobs = {}
function_worker_map = {}
for connection in self.active_connections:
try:
req = gear.WorkersAdminRequest()
connection.sendAdminRequest(req, timeout=300)
except Exception:
self.__log.exception("Exception while listing workers")
self._lostConnection(connection)
continue
# Populate function_worker_map here
for line in req.response.split('\n'):
parts = [x.strip() for x in line.split(':', 1)]
# parts[0] - Connection details
# parts[1] - Functions that remote connection can execute
if len(parts) < 2 or not parts[0] or parts[0] == '.':
# Skip if end of response or if no functions list.
continue
(conn_fd, remote_ip, nodename) = parts[0].split(None, 2)
# Not every worker reg has build: functions so filter
# and only handle those entries.
functions = [x[len('build:'):]
for x in parts[1].split()
if x.startswith('build:')]
if functions:
node = session.getNodeByNodename(nodename)
if node:
worker = node.label_name
for function in functions:
workers = function_worker_map.setdefault(function,
set())
workers.add(worker)
try:
req = gear.StatusAdminRequest()
connection.sendAdminRequest(req, timeout=300)
@ -306,29 +334,18 @@ class GearmanClient(gear.Client):
if queued > 0:
self.__log.debug("Function: %s queued: %s" % (function,
queued))
if ':' in function:
fparts = function.split(':')
# fparts[0] - function name
# fparts[1] - target node [type]
job = fparts[-2]
worker = fparts[-1]
workers = job_worker_map.get(job, [])
workers.append(worker)
job_worker_map[job] = workers
if queued > 0:
needed_workers[worker] = (
needed_workers.get(worker, 0) + queued)
elif queued > 0:
job = function
unspecified_jobs[job] = (unspecified_jobs.get(job, 0) +
queued)
for job, queued in unspecified_jobs.items():
workers = job_worker_map.get(job)
if not workers:
continue
worker = workers[0]
needed_workers[worker] = (needed_workers.get(worker, 0) +
queued)
workers = function_worker_map.get(function)
# We assume worker was populated by function_worker_map
# fillout above in the workers status parsing.
if not workers:
continue
# Get worker for use in populating needed_workers.
# Set pop removes entries so we add it back in after popping.
worker = workers.pop()
workers.add(worker)
if queued > 0:
needed_workers[worker] = (
needed_workers.get(worker, 0) + queued)
return needed_workers
def handleWorkComplete(self, packet):
@ -1320,7 +1337,7 @@ class NodePool(threading.Thread):
self.log.debug("Beginning node launch calculation")
# Get the current demand for nodes.
if self.gearman_client:
label_demand = self.gearman_client.getNeededWorkers()
label_demand = self.gearman_client.getNeededWorkers(session)
else:
label_demand = {}

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import gear
import json
import logging
import threading
@ -692,6 +693,81 @@ class TestNodepool(tests.DBTestCase):
node = session.getNode(2)
self.assertEqual(node, None)
def test_no_label_gearman_demand(self):
"""Test that labelless demand is calculated properly"""
configfile = self.setup_config('node.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.start()
self.waitForImage(pool, 'fake-provider', 'fake-image')
self.waitForNodes(pool)
with pool.getDB().getSession() as session:
nodes = session.getNodes(provider_name='fake-provider',
label_name='fake-label',
target_name='fake-target',
state=nodedb.READY)
self.assertEqual(len(nodes), 1)
nodename = nodes[0].nodename
worker = gear.Worker(nodename)
worker.addServer('localhost', self.gearman_server.port)
worker.registerFunction('build:foo')
client = gear.Client()
client.addServer('localhost', self.gearman_server.port)
client.waitForServer()
job1 = gear.Job('build:foo', '1')
job2 = gear.Job('build:foo', '2')
# Create 2 demand for fake-label via job foo registration
client.submitJob(job1)
client.submitJob(job2)
self.waitForNodes(pool)
with pool.getDB().getSession() as session:
nodes = session.getNodes(provider_name='fake-provider',
label_name='fake-label',
target_name='fake-target',
state=nodedb.READY)
# 1 (min ready) + 2 (demand)
self.assertEqual(len(nodes), 3)
client.shutdown()
def test_label_gearman_demand(self):
"""Test that labeled demand is calculated properly"""
configfile = self.setup_config('node.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.start()
self.waitForImage(pool, 'fake-provider', 'fake-image')
self.waitForNodes(pool)
with pool.getDB().getSession() as session:
nodes = session.getNodes(provider_name='fake-provider',
label_name='fake-label',
target_name='fake-target',
state=nodedb.READY)
self.assertEqual(len(nodes), 1)
nodename = nodes[0].nodename
worker = gear.Worker(nodename)
worker.addServer('localhost', self.gearman_server.port)
worker.registerFunction('build:foo')
worker.registerFunction('build:foo:fake-label')
client = gear.Client()
client.addServer('localhost', self.gearman_server.port)
client.waitForServer()
job1 = gear.Job('build:foo:fake-label', '1')
job2 = gear.Job('build:foo:fake-label', '2')
# Create 2 demand for fake-label via job foo registration
client.submitJob(job1)
client.submitJob(job2)
self.waitForNodes(pool)
with pool.getDB().getSession() as session:
nodes = session.getNodes(provider_name='fake-provider',
label_name='fake-label',
target_name='fake-target',
state=nodedb.READY)
# 1 (min ready) + 2 (demand)
self.assertEqual(len(nodes), 3)
client.shutdown()
class TestGearClient(tests.DBTestCase):
def test_wait_for_completion(self):