Merge "Handle nodepool allocation failure" into feature/zuulv3
This commit is contained in:
commit
fa8e36a8b4
|
@ -887,6 +887,7 @@ class FakeNodepool(object):
|
||||||
self.thread = threading.Thread(target=self.run)
|
self.thread = threading.Thread(target=self.run)
|
||||||
self.thread.daemon = True
|
self.thread.daemon = True
|
||||||
self.thread.start()
|
self.thread.start()
|
||||||
|
self.fail_requests = set()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._running = False
|
self._running = False
|
||||||
|
@ -965,21 +966,27 @@ class FakeNodepool(object):
|
||||||
nodeid = path.split("/")[-1]
|
nodeid = path.split("/")[-1]
|
||||||
return nodeid
|
return nodeid
|
||||||
|
|
||||||
|
def addFailRequest(self, request):
|
||||||
|
self.fail_requests.add(request['_oid'])
|
||||||
|
|
||||||
def fulfillRequest(self, request):
|
def fulfillRequest(self, request):
|
||||||
if request['state'] == 'fulfilled':
|
if request['state'] != 'requested':
|
||||||
return
|
return
|
||||||
request = request.copy()
|
request = request.copy()
|
||||||
oid = request['_oid']
|
oid = request['_oid']
|
||||||
del request['_oid']
|
del request['_oid']
|
||||||
|
|
||||||
nodes = []
|
if oid in self.fail_requests:
|
||||||
for node in request['node_types']:
|
request['state'] = 'failed'
|
||||||
nodeid = self.makeNode(oid, node)
|
else:
|
||||||
nodes.append(nodeid)
|
request['state'] = 'fulfilled'
|
||||||
|
nodes = []
|
||||||
|
for node in request['node_types']:
|
||||||
|
nodeid = self.makeNode(oid, node)
|
||||||
|
nodes.append(nodeid)
|
||||||
|
request['nodes'] = nodes
|
||||||
|
|
||||||
request['state'] = 'fulfilled'
|
|
||||||
request['state_time'] = time.time()
|
request['state_time'] = time.time()
|
||||||
request['nodes'] = nodes
|
|
||||||
path = self.REQUEST_ROOT + '/' + oid
|
path = self.REQUEST_ROOT + '/' + oid
|
||||||
data = json.dumps(request)
|
data = json.dumps(request)
|
||||||
self.log.debug("Fulfilling node request: %s %s" % (oid, data))
|
self.log.debug("Fulfilling node request: %s %s" % (oid, data))
|
||||||
|
|
|
@ -4546,6 +4546,27 @@ For CI problems and help debugging, contact ci@example.org"""
|
||||||
self.assertEqual(A.data['status'], 'MERGED')
|
self.assertEqual(A.data['status'], 'MERGED')
|
||||||
self.assertEqual(A.reported, 2)
|
self.assertEqual(A.reported, 2)
|
||||||
|
|
||||||
|
def test_nodepool_failure(self):
|
||||||
|
"Test that jobs are reported after a nodepool failure"
|
||||||
|
|
||||||
|
self.fake_nodepool.paused = True
|
||||||
|
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||||
|
A.addApproval('code-review', 2)
|
||||||
|
self.fake_gerrit.addEvent(A.addApproval('approved', 1))
|
||||||
|
self.waitUntilSettled()
|
||||||
|
|
||||||
|
req = self.fake_nodepool.getNodeRequests()[0]
|
||||||
|
self.fake_nodepool.addFailRequest(req)
|
||||||
|
|
||||||
|
self.fake_nodepool.paused = False
|
||||||
|
self.waitUntilSettled()
|
||||||
|
|
||||||
|
self.assertEqual(A.data['status'], 'NEW')
|
||||||
|
self.assertEqual(A.reported, 2)
|
||||||
|
self.assertIn('project-merge : NODE_FAILURE', A.messages[1])
|
||||||
|
self.assertIn('project-test1 : SKIPPED', A.messages[1])
|
||||||
|
self.assertIn('project-test2 : SKIPPED', A.messages[1])
|
||||||
|
|
||||||
|
|
||||||
class TestDuplicatePipeline(ZuulTestCase):
|
class TestDuplicatePipeline(ZuulTestCase):
|
||||||
tenant_config_file = 'config/duplicate-pipeline/main.yaml'
|
tenant_config_file = 'config/duplicate-pipeline/main.yaml'
|
||||||
|
|
|
@ -648,6 +648,10 @@ class PipelineManager(object):
|
||||||
build_set = request.build_set
|
build_set = request.build_set
|
||||||
build_set.jobNodeRequestComplete(request.job.name, request,
|
build_set.jobNodeRequestComplete(request.job.name, request,
|
||||||
request.nodeset)
|
request.nodeset)
|
||||||
|
if request.failed or not request.fulfilled:
|
||||||
|
self.log.info("Node request failure for %s" %
|
||||||
|
(request.job.name,))
|
||||||
|
build_set.item.setNodeRequestFailure(request.job)
|
||||||
self.log.info("Completed node request %s for job %s of item %s "
|
self.log.info("Completed node request %s for job %s of item %s "
|
||||||
"with nodes %s" %
|
"with nodes %s" %
|
||||||
(request, request.job, build_set.item,
|
(request, request.job, build_set.item,
|
||||||
|
|
|
@ -473,6 +473,10 @@ class NodeRequest(object):
|
||||||
# overwritten).
|
# overwritten).
|
||||||
self.failed = False
|
self.failed = False
|
||||||
|
|
||||||
|
@property
|
||||||
|
def fulfilled(self):
|
||||||
|
return (self._state == STATE_FULFILLED) and not self.failed
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def state(self):
|
def state(self):
|
||||||
return self._state
|
return self._state
|
||||||
|
@ -989,18 +993,28 @@ class QueueItem(object):
|
||||||
return self._findJobsToRun(tree.job_trees, mutex)
|
return self._findJobsToRun(tree.job_trees, mutex)
|
||||||
|
|
||||||
def _findJobsToRequest(self, job_trees):
|
def _findJobsToRequest(self, job_trees):
|
||||||
|
build_set = self.current_build_set
|
||||||
toreq = []
|
toreq = []
|
||||||
|
if self.item_ahead:
|
||||||
|
if self.item_ahead.isHoldingFollowingChanges():
|
||||||
|
return []
|
||||||
for tree in job_trees:
|
for tree in job_trees:
|
||||||
job = tree.job
|
job = tree.job
|
||||||
|
result = None
|
||||||
if job:
|
if job:
|
||||||
if not job.changeMatches(self.change):
|
if not job.changeMatches(self.change):
|
||||||
continue
|
continue
|
||||||
nodeset = self.current_build_set.getJobNodeSet(job.name)
|
build = build_set.getBuild(job.name)
|
||||||
if nodeset is None:
|
if build:
|
||||||
req = self.current_build_set.getJobNodeRequest(job.name)
|
result = build.result
|
||||||
if req is None:
|
else:
|
||||||
toreq.append(job)
|
nodeset = build_set.getJobNodeSet(job.name)
|
||||||
toreq.extend(self._findJobsToRequest(tree.job_trees))
|
if nodeset is None:
|
||||||
|
req = build_set.getJobNodeRequest(job.name)
|
||||||
|
if req is None:
|
||||||
|
toreq.append(job)
|
||||||
|
if result == 'SUCCESS' or not job:
|
||||||
|
toreq.extend(self._findJobsToRequest(tree.job_trees))
|
||||||
return toreq
|
return toreq
|
||||||
|
|
||||||
def findJobsToRequest(self):
|
def findJobsToRequest(self):
|
||||||
|
@ -1022,6 +1036,12 @@ class QueueItem(object):
|
||||||
fakebuild.result = 'SKIPPED'
|
fakebuild.result = 'SKIPPED'
|
||||||
self.addBuild(fakebuild)
|
self.addBuild(fakebuild)
|
||||||
|
|
||||||
|
def setNodeRequestFailure(self, job):
|
||||||
|
fakebuild = Build(job, None)
|
||||||
|
self.addBuild(fakebuild)
|
||||||
|
fakebuild.result = 'NODE_FAILURE'
|
||||||
|
self.setResult(fakebuild)
|
||||||
|
|
||||||
def setDequeuedNeedingChange(self):
|
def setDequeuedNeedingChange(self):
|
||||||
self.dequeued_needing_change = True
|
self.dequeued_needing_change = True
|
||||||
self._setAllJobsSkipped()
|
self._setAllJobsSkipped()
|
||||||
|
|
|
@ -98,8 +98,8 @@ class Nodepool(object):
|
||||||
if request.uid not in self.requests:
|
if request.uid not in self.requests:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if request.state == model.STATE_FULFILLED:
|
if request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
|
||||||
self.log.info("Node request %s fulfilled" % (request,))
|
self.log.info("Node request %s %s" % (request, request.state))
|
||||||
|
|
||||||
# Give our results to the scheduler.
|
# Give our results to the scheduler.
|
||||||
self.sched.onNodesProvisioned(request)
|
self.sched.onNodesProvisioned(request)
|
||||||
|
@ -119,17 +119,18 @@ class Nodepool(object):
|
||||||
|
|
||||||
self.log.info("Accepting node request %s" % (request,))
|
self.log.info("Accepting node request %s" % (request,))
|
||||||
|
|
||||||
# First, try to lock the nodes.
|
|
||||||
locked = False
|
locked = False
|
||||||
try:
|
if request.fulfilled:
|
||||||
self.lockNodeset(request.nodeset)
|
# If the request succeeded, try to lock the nodes.
|
||||||
locked = True
|
try:
|
||||||
except Exception:
|
self.lockNodeset(request.nodeset)
|
||||||
self.log.exception("Error locking nodes:")
|
locked = True
|
||||||
request.failed = True
|
except Exception:
|
||||||
|
self.log.exception("Error locking nodes:")
|
||||||
|
request.failed = True
|
||||||
|
|
||||||
# Regardless of whether locking succeeded, delete the
|
# Regardless of whether locking (or even the request)
|
||||||
# request.
|
# succeeded, delete the request.
|
||||||
self.log.debug("Deleting node request %s" % (request,))
|
self.log.debug("Deleting node request %s" % (request,))
|
||||||
try:
|
try:
|
||||||
self.sched.zk.deleteNodeRequest(request)
|
self.sched.zk.deleteNodeRequest(request)
|
||||||
|
|
|
@ -811,22 +811,19 @@ class Scheduler(threading.Thread):
|
||||||
request = event.request
|
request = event.request
|
||||||
build_set = request.build_set
|
build_set = request.build_set
|
||||||
|
|
||||||
try:
|
self.nodepool.acceptNodes(request)
|
||||||
self.nodepool.acceptNodes(request)
|
|
||||||
except Exception:
|
|
||||||
self.log.exception("Unable to accept nodes from request %s:"
|
|
||||||
% (request,))
|
|
||||||
return
|
|
||||||
|
|
||||||
if build_set is not build_set.item.current_build_set:
|
if build_set is not build_set.item.current_build_set:
|
||||||
self.log.warning("Build set %s is not current" % (build_set,))
|
self.log.warning("Build set %s is not current" % (build_set,))
|
||||||
self.nodepool.returnNodeset(request.nodeset)
|
if request.fulfilled:
|
||||||
|
self.nodepool.returnNodeset(request.nodeset)
|
||||||
return
|
return
|
||||||
pipeline = build_set.item.pipeline
|
pipeline = build_set.item.pipeline
|
||||||
if not pipeline:
|
if not pipeline:
|
||||||
self.log.warning("Build set %s is not associated with a pipeline" %
|
self.log.warning("Build set %s is not associated with a pipeline" %
|
||||||
(build_set,))
|
(build_set,))
|
||||||
self.nodepool.returnNodeset(request.nodeset)
|
if request.fulfilled:
|
||||||
|
self.nodepool.returnNodeset(request.nodeset)
|
||||||
return
|
return
|
||||||
pipeline.manager.onNodesProvisioned(event)
|
pipeline.manager.onNodesProvisioned(event)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue