Merge "Handle double node locking snafu" into feature/zuulv3
This commit is contained in:
commit
278e956105
|
@ -76,7 +76,7 @@ class TestNodepool(BaseTestCase):
|
|||
self.assertEqual(request.state, 'fulfilled')
|
||||
|
||||
# Accept the nodes
|
||||
self.nodepool.acceptNodes(request)
|
||||
self.nodepool.acceptNodes(request, request.id)
|
||||
nodeset = request.nodeset
|
||||
|
||||
for node in nodeset.getNodes():
|
||||
|
@ -125,3 +125,47 @@ class TestNodepool(BaseTestCase):
|
|||
|
||||
self.waitForRequests()
|
||||
self.assertEqual(len(self.provisioned_requests), 0)
|
||||
|
||||
def test_accept_nodes_resubmitted(self):
    # A request that has been resubmitted carries a new ID, so an accept
    # that arrives with the old ID must not lock any nodes.
    wanted = model.NodeSet()
    for label in ('controller', 'compute'):
        wanted.addNode(model.Node(label, 'ubuntu-xenial'))
    testjob = model.Job('testjob')
    testjob.nodeset = wanted

    request = self.nodepool.requestNodes(None, testjob)
    self.waitForRequests()
    self.assertEqual(len(self.provisioned_requests), 1)
    self.assertEqual(request.state, 'fulfilled')

    # Accept with an ID that deliberately differs from request.id.
    self.nodepool.acceptNodes(request, "invalid")

    # Every node must still be unlocked and in the 'ready' state.
    for node in request.nodeset.getNodes():
        self.assertIsNone(node.lock)
        self.assertEqual(node.state, 'ready')
|
||||
|
||||
def test_accept_nodes_lost_request(self):
    # A request that has vanished from ZooKeeper must not have its nodes
    # locked when it is accepted.
    wanted = model.NodeSet()
    for label in ('controller', 'compute'):
        wanted.addNode(model.Node(label, 'ubuntu-xenial'))
    testjob = model.Job('testjob')
    testjob.nodeset = wanted

    request = self.nodepool.requestNodes(None, testjob)
    self.waitForRequests()
    self.assertEqual(len(self.provisioned_requests), 1)
    self.assertEqual(request.state, 'fulfilled')

    # Remove the fulfilled request before it is accepted.
    self.zk.deleteNodeRequest(request)

    # Accept using the matching ID; the request itself is now gone.
    self.nodepool.acceptNodes(request, request.id)

    # Every node must still be unlocked and in the 'ready' state.
    for node in request.nodeset.getNodes():
        self.assertIsNone(node.lock)
        self.assertEqual(node.state, 'ready')
|
||||
|
|
|
@ -111,16 +111,19 @@ class Nodepool(object):
|
|||
except Exception:
|
||||
self.log.exception("Error unlocking node:")
|
||||
|
||||
def lockNodeSet(self, nodeset):
|
||||
self._lockNodes(nodeset.getNodes())
|
||||
def lockNodeSet(self, nodeset, request_id):
|
||||
self._lockNodes(nodeset.getNodes(), request_id)
|
||||
|
||||
def _lockNodes(self, nodes):
|
||||
def _lockNodes(self, nodes, request_id):
|
||||
# Try to lock all of the supplied nodes. If any lock fails,
|
||||
# try to unlock any which have already been locked before
|
||||
# re-raising the error.
|
||||
locked_nodes = []
|
||||
try:
|
||||
for node in nodes:
|
||||
if node.allocated_to != request_id:
|
||||
raise Exception("Node %s allocated to %s, not %s" %
|
||||
(node.id, node.allocated_to, request_id))
|
||||
self.log.debug("Locking node %s" % (node,))
|
||||
self.sched.zk.lockNode(node, timeout=30)
|
||||
locked_nodes.append(node)
|
||||
|
@ -141,7 +144,12 @@ class Nodepool(object):
|
|||
del self.requests[request.uid]
|
||||
return False
|
||||
|
||||
if request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
|
||||
# TODOv3(jeblair): handle allocation failure
|
||||
if deleted:
|
||||
self.log.debug("Resubmitting lost node request %s" % (request,))
|
||||
request.id = None
|
||||
self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
|
||||
elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
|
||||
self.log.info("Node request %s %s" % (request, request.state))
|
||||
|
||||
# Give our results to the scheduler.
|
||||
|
@ -150,18 +158,29 @@ class Nodepool(object):
|
|||
|
||||
# Stop watching this request node.
|
||||
return False
|
||||
# TODOv3(jeblair): handle allocation failure
|
||||
elif deleted:
|
||||
self.log.debug("Resubmitting lost node request %s" % (request,))
|
||||
self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
|
||||
|
||||
return True
|
||||
|
||||
def acceptNodes(self, request):
|
||||
def acceptNodes(self, request, request_id):
|
||||
# Called by the scheduler when it wants to accept and lock
|
||||
# nodes for (potential) use.
|
||||
|
||||
self.log.info("Accepting node request %s" % (request,))
|
||||
|
||||
if request_id != request.id:
|
||||
self.log.info("Skipping node accept for %s (resubmitted as %s)",
|
||||
request_id, request.id)
|
||||
return
|
||||
|
||||
# Make sure the request still exists. It's possible it could have
|
||||
# disappeared if we lost the ZK session between when the fulfillment
|
||||
# response was added to our queue, and when we actually get around to
|
||||
# processing it. Nodepool will automatically reallocate the assigned
|
||||
# nodes in that situation.
|
||||
if not self.sched.zk.nodeRequestExists(request):
|
||||
self.log.info("Request %s no longer exists", request.id)
|
||||
return
|
||||
|
||||
if request.canceled:
|
||||
self.log.info("Ignoring canceled node request %s" % (request,))
|
||||
# The request was already deleted when it was canceled
|
||||
|
@ -171,7 +190,7 @@ class Nodepool(object):
|
|||
if request.fulfilled:
|
||||
# If the request succeeded, try to lock the nodes.
|
||||
try:
|
||||
self.lockNodeSet(request.nodeset)
|
||||
self.lockNodeSet(request.nodeset, request.id)
|
||||
locked = True
|
||||
except Exception:
|
||||
self.log.exception("Error locking nodes:")
|
||||
|
|
|
@ -164,6 +164,7 @@ class NodesProvisionedEvent(ResultEvent):
|
|||
|
||||
def __init__(self, request):
    # Record the ID the request has right now, separately from the
    # request object, so the value at event-creation time is preserved
    # even if request.id is reassigned later.
    self.request_id = request.id
    self.request = request
|
||||
|
||||
|
||||
def toList(item):
|
||||
|
@ -889,9 +890,10 @@ class Scheduler(threading.Thread):
|
|||
|
||||
def _doNodesProvisionedEvent(self, event):
|
||||
request = event.request
|
||||
request_id = event.request_id
|
||||
build_set = request.build_set
|
||||
|
||||
self.nodepool.acceptNodes(request)
|
||||
self.nodepool.acceptNodes(request, request_id)
|
||||
if request.canceled:
|
||||
return
|
||||
|
||||
|
|
13
zuul/zk.py
13
zuul/zk.py
|
@ -187,6 +187,19 @@ class ZooKeeper(object):
|
|||
except kze.NoNodeError:
|
||||
pass
|
||||
|
||||
def nodeRequestExists(self, node_request):
    '''
    See if a NodeRequest exists in ZooKeeper.

    A request whose ``id`` is ``None`` has no path of its own under
    ``REQUEST_ROOT``, so it is reported as missing without asking
    ZooKeeper about the meaningless ``<root>/None`` path.

    :param NodeRequest node_request: A NodeRequest to verify.

    :returns: True if the request exists, False otherwise.
    '''
    if node_request.id is None:
        return False

    path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
    # Collapse whatever the client returns into a strict True/False.
    return bool(self.client.exists(path))
|
||||
|
||||
def storeNode(self, node):
|
||||
'''Store the node.
|
||||
|
||||
|
|
Loading…
Reference in New Issue