Fix stuck node requests across ZK reconnection

When a request is fulfilled by nodepool, we add it to the scheduler's
event queue, and later, the scheduler processes the event and accepts
the nodes.  If there is a ZooKeeper disconnection in the interim, then
we will have noticed it and not locked the nodes; however, the scheduler
will still pass on the request to the pipeline manager and we will
attempt to run jobs on the unlocked nodes, which will continually
fail.

This change extends the handling of a lost request so that if it happens,
we retry the request (which is what would happen if the request is lucky
enough to have been lost before fulfillment).

This extends the fix in 94e95886e2.

Change-Id: If81a790ed8b16594f4f9186d9256200b8d5e707e
This commit is contained in:
James E. Blair 2018-02-06 13:43:50 -08:00
parent acb632d51b
commit b4bed1d2c3
3 changed files with 69 additions and 11 deletions

View File

@ -37,6 +37,7 @@ from tests.base import (
ZuulTestCase,
repack_repo,
simple_layout,
iterate_timeout,
)
@ -4397,6 +4398,54 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
def test_zookeeper_disconnect2(self):
    "Test that jobs are executed after a zookeeper disconnect"

    # This tests receiving a ZK disconnect between the arrival of
    # a fulfilled request and when we accept its nodes.
    self.fake_nodepool.paused = True
    A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
    A.addApproval('Code-Review', 2)
    self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
    self.waitUntilSettled()

    # We're waiting on the nodepool request to complete.  Stop the
    # scheduler from processing further events, then fulfill the
    # nodepool request.
    self.sched.run_handler_lock.acquire()
    try:
        # Everything done while the lock is held sits inside this
        # try block so that a failed assertion or an exception from
        # the wait loop (presumably raised by iterate_timeout on
        # timeout -- confirm in tests.base) cannot leave the lock
        # held after the test exits.

        # Fulfill the nodepool request.
        self.fake_nodepool.paused = False
        requests = list(self.sched.nodepool.requests.values())
        self.assertEqual(1, len(requests))
        request = requests[0]
        for x in iterate_timeout(30, 'fulfill request'):
            if request.fulfilled:
                break
        # Remember the id of the original request so that we can
        # verify below that it was resubmitted under a new one.
        id1 = request.id

        # The request is fulfilled, but the scheduler hasn't
        # processed it yet.  Reconnect ZK.
        self.zk.client.stop()
        self.zk.client.start()
    finally:
        # Allow the scheduler to continue and process the (now
        # out-of-date) notification that nodes are ready.
        self.sched.run_handler_lock.release()

    # It should resubmit the request.  Once it's fulfilled, we can
    # wait for it to run jobs and settle.
    # NOTE(review): the same request object is polled again here; if
    # `fulfilled` still reflects the first fulfillment when this loop
    # starts, it exits immediately and waitUntilSettled() below does
    # the actual waiting -- confirm this is acceptable.
    for x in iterate_timeout(30, 'fulfill request'):
        if request.fulfilled:
            break
    self.waitUntilSettled()

    id2 = request.id
    self.assertEqual(A.data['status'], 'MERGED')
    self.assertEqual(A.reported, 2)

    # Make sure it was resubmitted (the id's should be different).
    self.assertNotEqual(id1, id2)
def test_nodepool_failure(self):
"Test that jobs are reported after a nodepool failure"

View File

@ -165,6 +165,7 @@ class Nodepool(object):
self.log.debug("Updating node request %s" % (request,))
if request.uid not in self.requests:
self.log.debug("Request %s is unknown" % (request.uid,))
return False
if request.canceled:
@ -193,14 +194,21 @@ class Nodepool(object):
def acceptNodes(self, request, request_id):
# Called by the scheduler when it wants to accept and lock
# nodes for (potential) use.
# nodes for (potential) use. Return False if there is a
# problem with the request (canceled or retrying), True if it
# is ready to be acted upon (success or failure).
self.log.info("Accepting node request %s" % (request,))
if request_id != request.id:
self.log.info("Skipping node accept for %s (resubmitted as %s)",
request_id, request.id)
return
return False
if request.canceled:
self.log.info("Ignoring canceled node request %s" % (request,))
# The request was already deleted when it was canceled
return False
# Make sure the request still exists. It's possible it could have
# disappeared if we lost the ZK session between when the fulfillment
@ -208,13 +216,13 @@ class Nodepool(object):
# processing it. Nodepool will automatically reallocate the assigned
# nodes in that situation.
if not self.sched.zk.nodeRequestExists(request):
self.log.info("Request %s no longer exists", request.id)
return
if request.canceled:
self.log.info("Ignoring canceled node request %s" % (request,))
# The request was already deleted when it was canceled
return
self.log.info("Request %s no longer exists, resubmitting",
request.id)
request.id = None
request.state = model.STATE_REQUESTED
self.requests[request.uid] = request
self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
return False
locked = False
if request.fulfilled:
@ -239,3 +247,4 @@ class Nodepool(object):
# them.
if locked:
self.unlockNodeSet(request.nodeset)
return True

View File

@ -1035,8 +1035,8 @@ class Scheduler(threading.Thread):
request_id = event.request_id
build_set = request.build_set
self.nodepool.acceptNodes(request, request_id)
if request.canceled:
ready = self.nodepool.acceptNodes(request, request_id)
if not ready:
return
if build_set is not build_set.item.current_build_set: