diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py index ba7523c8c7..d51898b733 100644 --- a/tests/unit/test_nodepool.py +++ b/tests/unit/test_nodepool.py @@ -76,7 +76,7 @@ class TestNodepool(BaseTestCase): self.assertEqual(request.state, 'fulfilled') # Accept the nodes - self.nodepool.acceptNodes(request) + self.nodepool.acceptNodes(request, request.id) nodeset = request.nodeset for node in nodeset.getNodes(): @@ -125,3 +125,47 @@ class TestNodepool(BaseTestCase): self.waitForRequests() self.assertEqual(len(self.provisioned_requests), 0) + + def test_accept_nodes_resubmitted(self): + # Test that a resubmitted request would not lock nodes + + nodeset = model.NodeSet() + nodeset.addNode(model.Node('controller', 'ubuntu-xenial')) + nodeset.addNode(model.Node('compute', 'ubuntu-xenial')) + job = model.Job('testjob') + job.nodeset = nodeset + request = self.nodepool.requestNodes(None, job) + self.waitForRequests() + self.assertEqual(len(self.provisioned_requests), 1) + self.assertEqual(request.state, 'fulfilled') + + # Accept the nodes, passing a different ID + self.nodepool.acceptNodes(request, "invalid") + nodeset = request.nodeset + + for node in nodeset.getNodes(): + self.assertIsNone(node.lock) + self.assertEqual(node.state, 'ready') + + def test_accept_nodes_lost_request(self): + # Test that a lost request would not lock nodes + + nodeset = model.NodeSet() + nodeset.addNode(model.Node('controller', 'ubuntu-xenial')) + nodeset.addNode(model.Node('compute', 'ubuntu-xenial')) + job = model.Job('testjob') + job.nodeset = nodeset + request = self.nodepool.requestNodes(None, job) + self.waitForRequests() + self.assertEqual(len(self.provisioned_requests), 1) + self.assertEqual(request.state, 'fulfilled') + + self.zk.deleteNodeRequest(request) + + # Accept the nodes + self.nodepool.acceptNodes(request, request.id) + nodeset = request.nodeset + + for node in nodeset.getNodes(): + self.assertIsNone(node.lock) + self.assertEqual(node.state, 'ready') diff --git a/zuul/nodepool.py b/zuul/nodepool.py index 9a125cebfb..f4c850d106 100644 --- a/zuul/nodepool.py +++ b/zuul/nodepool.py @@ -111,16 +111,19 @@ class Nodepool(object): except Exception: self.log.exception("Error unlocking node:") - def lockNodeSet(self, nodeset): - self._lockNodes(nodeset.getNodes()) + def lockNodeSet(self, nodeset, request_id): + self._lockNodes(nodeset.getNodes(), request_id) - def _lockNodes(self, nodes): + def _lockNodes(self, nodes, request_id): # Try to lock all of the supplied nodes. If any lock fails, # try to unlock any which have already been locked before # re-raising the error. locked_nodes = [] try: for node in nodes: + if node.allocated_to != request_id: + raise Exception("Node %s allocated to %s, not %s" % + (node.id, node.allocated_to, request_id)) self.log.debug("Locking node %s" % (node,)) self.sched.zk.lockNode(node, timeout=30) locked_nodes.append(node) @@ -141,7 +144,12 @@ class Nodepool(object): del self.requests[request.uid] return False - if request.state in (model.STATE_FULFILLED, model.STATE_FAILED): + # TODOv3(jeblair): handle allocation failure + if deleted: + self.log.debug("Resubmitting lost node request %s" % (request,)) + request.id = None + self.sched.zk.submitNodeRequest(request, self._updateNodeRequest) + elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED): self.log.info("Node request %s %s" % (request, request.state)) # Give our results to the scheduler. @@ -150,18 +158,29 @@ class Nodepool(object): # Stop watching this request node. return False - # TODOv3(jeblair): handle allocation failure - elif deleted: - self.log.debug("Resubmitting lost node request %s" % (request,)) - self.sched.zk.submitNodeRequest(request, self._updateNodeRequest) + return True - def acceptNodes(self, request): + def acceptNodes(self, request, request_id): # Called by the scheduler when it wants to accept and lock # nodes for (potential) use. self.log.info("Accepting node request %s" % (request,)) + if request_id != request.id: + self.log.info("Skipping node accept for %s (resubmitted as %s)", + request_id, request.id) + return + + # Make sure the request still exists. It's possible it could have + # disappeared if we lost the ZK session between when the fulfillment + # response was added to our queue, and when we actually get around to + # processing it. Nodepool will automatically reallocate the assigned + # nodes in that situation. + if not self.sched.zk.nodeRequestExists(request): + self.log.info("Request %s no longer exists", request.id) + return + if request.canceled: self.log.info("Ignoring canceled node request %s" % (request,)) # The request was already deleted when it was canceled @@ -171,7 +190,7 @@ class Nodepool(object): if request.fulfilled: # If the request suceeded, try to lock the nodes. try: - self.lockNodeSet(request.nodeset) + self.lockNodeSet(request.nodeset, request.id) locked = True except Exception: self.log.exception("Error locking nodes:") diff --git a/zuul/scheduler.py b/zuul/scheduler.py index a926f6e305..ab147bae0b 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -164,6 +164,7 @@ class NodesProvisionedEvent(ResultEvent): def __init__(self, request): self.request = request + self.request_id = request.id def toList(item): @@ -889,9 +890,10 @@ class Scheduler(threading.Thread): def _doNodesProvisionedEvent(self, event): request = event.request + request_id = event.request_id build_set = request.build_set - self.nodepool.acceptNodes(request) + self.nodepool.acceptNodes(request, request_id) if request.canceled: return diff --git a/zuul/zk.py b/zuul/zk.py index a3efef2090..41d04296a8 100644 --- a/zuul/zk.py +++ b/zuul/zk.py @@ -187,6 +187,19 @@ class ZooKeeper(object): except kze.NoNodeError: pass + def nodeRequestExists(self, node_request): + ''' + See if a NodeRequest exists in ZooKeeper. + + :param NodeRequest node_request: A NodeRequest to verify. + + :returns: True if the request exists, False otherwise. + ''' + path = '%s/%s' % (self.REQUEST_ROOT, node_request.id) + if self.client.exists(path): + return True + return False + def storeNode(self, node): '''Store the node.