Merge "Fix stuck node requests across ZK reconnection"
This commit is contained in:
commit
2ee1cf40d0
|
@ -37,6 +37,7 @@ from tests.base import (
|
|||
ZuulTestCase,
|
||||
repack_repo,
|
||||
simple_layout,
|
||||
iterate_timeout,
|
||||
)
|
||||
|
||||
|
||||
|
@ -4397,6 +4398,54 @@ For CI problems and help debugging, contact ci@example.org"""
|
|||
self.assertEqual(A.data['status'], 'MERGED')
|
||||
self.assertEqual(A.reported, 2)
|
||||
|
||||
def test_zookeeper_disconnect2(self):
|
||||
"Test that jobs are executed after a zookeeper disconnect"
|
||||
|
||||
# This tests receiving a ZK disconnect between the arrival of
|
||||
# a fulfilled request and when we accept its nodes.
|
||||
self.fake_nodepool.paused = True
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
A.addApproval('Code-Review', 2)
|
||||
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
# We're waiting on the nodepool request to complete. Stop the
|
||||
# scheduler from processing further events, then fulfill the
|
||||
# nodepool request.
|
||||
self.sched.run_handler_lock.acquire()
|
||||
|
||||
# Fulfill the nodepool request.
|
||||
self.fake_nodepool.paused = False
|
||||
requests = list(self.sched.nodepool.requests.values())
|
||||
self.assertEqual(1, len(requests))
|
||||
request = requests[0]
|
||||
for x in iterate_timeout(30, 'fulfill request'):
|
||||
if request.fulfilled:
|
||||
break
|
||||
id1 = request.id
|
||||
|
||||
# The request is fulfilled, but the scheduler hasn't processed
|
||||
# it yet. Reconnect ZK.
|
||||
self.zk.client.stop()
|
||||
self.zk.client.start()
|
||||
|
||||
# Allow the scheduler to continue and process the (now
|
||||
# out-of-date) notification that nodes are ready.
|
||||
self.sched.run_handler_lock.release()
|
||||
|
||||
# It should resubmit the request, once it's fulfilled, we can
|
||||
# wait for it to run jobs and settle.
|
||||
for x in iterate_timeout(30, 'fulfill request'):
|
||||
if request.fulfilled:
|
||||
break
|
||||
self.waitUntilSettled()
|
||||
|
||||
id2 = request.id
|
||||
self.assertEqual(A.data['status'], 'MERGED')
|
||||
self.assertEqual(A.reported, 2)
|
||||
# Make sure it was resubmitted (the id's should be different).
|
||||
self.assertNotEqual(id1, id2)
|
||||
|
||||
def test_nodepool_failure(self):
|
||||
"Test that jobs are reported after a nodepool failure"
|
||||
|
||||
|
|
|
@ -165,6 +165,7 @@ class Nodepool(object):
|
|||
self.log.debug("Updating node request %s" % (request,))
|
||||
|
||||
if request.uid not in self.requests:
|
||||
self.log.debug("Request %s is unknown" % (request.uid,))
|
||||
return False
|
||||
|
||||
if request.canceled:
|
||||
|
@ -193,14 +194,21 @@ class Nodepool(object):
|
|||
|
||||
def acceptNodes(self, request, request_id):
|
||||
# Called by the scheduler when it wants to accept and lock
|
||||
# nodes for (potential) use.
|
||||
# nodes for (potential) use. Return False if there is a
|
||||
# problem with the request (canceled or retrying), True if it
|
||||
# is ready to be acted upon (success or failure).
|
||||
|
||||
self.log.info("Accepting node request %s" % (request,))
|
||||
|
||||
if request_id != request.id:
|
||||
self.log.info("Skipping node accept for %s (resubmitted as %s)",
|
||||
request_id, request.id)
|
||||
return
|
||||
return False
|
||||
|
||||
if request.canceled:
|
||||
self.log.info("Ignoring canceled node request %s" % (request,))
|
||||
# The request was already deleted when it was canceled
|
||||
return False
|
||||
|
||||
# Make sure the request still exists. It's possible it could have
|
||||
# disappeared if we lost the ZK session between when the fulfillment
|
||||
|
@ -208,13 +216,13 @@ class Nodepool(object):
|
|||
# processing it. Nodepool will automatically reallocate the assigned
|
||||
# nodes in that situation.
|
||||
if not self.sched.zk.nodeRequestExists(request):
|
||||
self.log.info("Request %s no longer exists", request.id)
|
||||
return
|
||||
|
||||
if request.canceled:
|
||||
self.log.info("Ignoring canceled node request %s" % (request,))
|
||||
# The request was already deleted when it was canceled
|
||||
return
|
||||
self.log.info("Request %s no longer exists, resubmitting",
|
||||
request.id)
|
||||
request.id = None
|
||||
request.state = model.STATE_REQUESTED
|
||||
self.requests[request.uid] = request
|
||||
self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
|
||||
return False
|
||||
|
||||
locked = False
|
||||
if request.fulfilled:
|
||||
|
@ -239,3 +247,4 @@ class Nodepool(object):
|
|||
# them.
|
||||
if locked:
|
||||
self.unlockNodeSet(request.nodeset)
|
||||
return True
|
||||
|
|
|
@ -1035,8 +1035,8 @@ class Scheduler(threading.Thread):
|
|||
request_id = event.request_id
|
||||
build_set = request.build_set
|
||||
|
||||
self.nodepool.acceptNodes(request, request_id)
|
||||
if request.canceled:
|
||||
ready = self.nodepool.acceptNodes(request, request_id)
|
||||
if not ready:
|
||||
return
|
||||
|
||||
if build_set is not build_set.item.current_build_set:
|
||||
|
|
Loading…
Reference in New Issue