Mark nodes as 'in-use' before launching jobs

While we immediately lock a node given to us by nodepool, we delay
setting the node to 'in-use' until we actually request that the job
be launched so that if we end up canceling the job before it is
run, we might return the node unused to nodepool.

Change-Id: I2d2c0f9cdb4c199f2ed309e7b0cfc62e071037fa
This commit is contained in:
James E. Blair 2017-01-04 13:14:37 -08:00
parent a38c28efa3
commit cacdf2b659
5 changed files with 41 additions and 4 deletions

View File

@ -78,6 +78,11 @@ class TestNodepool(BaseTestCase):
self.assertIsNotNone(node.lock)
self.assertEqual(node.state, 'ready')
# Mark the nodes in use
self.nodepool.useNodeset(nodeset)
for node in nodeset.getNodes():
self.assertEqual(node.state, 'in-use')
def test_node_request_disconnect(self):
# Test that node requests are re-submitted after disconnect

View File

@ -364,6 +364,8 @@ class PipelineManager(object):
for job in jobs:
self.log.debug("Found job %s for change %s" % (job, item.change))
try:
nodeset = item.current_build_set.getJobNodeSet(job.name)
self.sched.nodepool.useNodeset(nodeset)
build = self.sched.launcher.launch(job, item,
self.pipeline,
dependent_items)

View File

@ -361,6 +361,7 @@ class Node(object):
self.public_ipv4 = None
self.private_ipv4 = None
self.public_ipv6 = None
self._keys = []
@property
def state(self):
@ -377,12 +378,22 @@ class Node(object):
def __repr__(self):
return '<Node %s %s:%s>' % (self.id, self.name, self.image)
def toDict(self):
d = {}
d['state'] = self.state
for k in self._keys:
d[k] = getattr(self, k)
return d
def updateFromDict(self, data):
self._state = data['state']
self.state_time = data['state_time']
self.public_ipv4 = data.get('public_ipv4')
self.private_ipv4 = data.get('private_ipv4')
self.public_ipv6 = data.get('public_ipv6')
keys = []
for k, v in data.items():
if k == 'state':
continue
keys.append(k)
setattr(self, k, v)
self._keys = keys
class NodeSet(object):

View File

@ -38,6 +38,13 @@ class Nodepool(object):
if request in self.requests:
self.requests.remove(request)
def useNodeset(self, nodeset):
for node in nodeset.getNodes():
if node.lock is None:
raise Exception("Node %s is not locked" % (node,))
node.state = 'in-use'
self.sched.zk.storeNode(node)
def returnNodes(self, nodes, used=True):
pass

View File

@ -333,6 +333,18 @@ class ZooKeeper(object):
except kze.NoNodeError:
pass
def storeNode(self, node):
'''Store the node.
The node is expected to already exist and is updated in its
entirety.
:param Node node: The node to update.
'''
path = '%s/%s' % (self.NODE_ROOT, node.id)
self.client.set(path, self._dictToStr(node.toDict()))
def lockNode(self, node, blocking=True, timeout=None):
'''
Lock a node.