Clear nodeset when re-submitting node requests

We encountered an issue where Zuul:

* submitted a node request
* Nodepool fulfilled it
* Zuul received the ZK watch and refreshed the NodeRequest object
* Zuul submitted the node-provisioned event to its event queue
* ZK was disconnected and reconnected
* Zuul processed the node-provisioned event
* Zuul found the node request no longer existed (node request znodes
  are ephemeral, so the session loss deleted it; see the sketch below)
* Zuul resubmitted the node request
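The disappearance in the second-to-last step is inherent to ephemeral
znodes: they live only as long as the client session, so closing or
losing the session removes them server-side.  A minimal kazoo sketch
of that behavior (the path and payload are illustrative, not Zuul's
actual layout):

    # Ephemeral znodes are deleted by ZooKeeper when their session ends.
    from kazoo.client import KazooClient

    zk = KazooClient(hosts='localhost:2181')
    zk.start()
    zk.create('/nodepool/requests/100-0000000001', b'{}',
              ephemeral=True, makepath=True)

    zk.stop()   # close the session; the server drops the ephemeral node
    zk.start()  # reconnect with a fresh session
    assert zk.exists('/nodepool/requests/100-0000000001') is None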

Because the NodeRequest object still had the provisioned node
information attached to it, the re-submitted request was created with
an existing 'nodes' list.  Nodepool appended to that list when
fulfilling the new request, so a request for 1 node received 2.  That
raised an exception in Zuul's node request watch callback, which
caused Zuul to ignore that update and all future updates to the node
request.
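The failure mode is easy to reproduce in miniature.  The sketch below
uses plain dicts rather than Zuul's real classes; fulfill() mimics the
appending behavior that the FakeNodepool change in this commit adds to
emulate real Nodepool:

    # Hypothetical, self-contained model of the over-fulfillment bug.
    def fulfill(request):
        # Like nodepool: start from any nodes already on the request.
        nodes = request.get('nodes', [])
        for node_type in request['node_types']:
            nodes.append('node-%04d' % len(nodes))
        request['nodes'] = nodes
        request['state'] = 'fulfilled'

    request = {'node_types': ['ubuntu-xenial']}
    fulfill(request)   # first fulfillment: 1 node, as requested
    fulfill(request)   # re-submission with stale node info attached
    assert len(request['nodes']) == 2   # requested 1, received 2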

To address this, we make a new copy of the nodeset without any allocated
node info when re-submitting a request.
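This works because NodeSet.copy() rebuilds the set from each node's
name and label only, so fields that nodepool filled in on the
allocated nodes do not survive the copy.  A pared-down sketch of that
idea (not the real model classes, which also track groups and many
more node attributes):

    # Hypothetical, simplified versions of the model classes.
    class Node:
        def __init__(self, name, label):
            self.name = name
            self.label = label
            self.id = None          # filled in by nodepool on allocation
            self.state = 'unknown'

    class NodeSet:
        def __init__(self):
            self.nodes = []

        def addNode(self, node):
            self.nodes.append(node)

        def copy(self):
            n = NodeSet()
            for node in self.nodes:
                # Fresh Node objects: allocation state is dropped.
                n.addNode(Node(node.name, node.label))
            return n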

This also contains an unrelated change to the event id handling from
an earlier revision; it is kept because it will simplify future
changes that eliminate the node request cache altogether.

Change-Id: I72f5ed7ad53e44d77b37870546daf61b8a4e7e09
Author: James E. Blair
Date:   2021-08-03 17:15:00 -07:00
Commit: d87a9a8b8f (parent: c30eca959a)

4 changed files with 88 additions and 21 deletions

--- a/tests/base.py
+++ b/tests/base.py

@@ -3669,7 +3669,7 @@ class FakeNodepool(object):
             request['state'] = 'failed'
         else:
             request['state'] = 'fulfilled'
-        nodes = []
+        nodes = request.get('nodes', [])
         for node in request['node_types']:
             nodeid = self.makeNode(oid, node)
             nodes.append(nodeid)

--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py

@@ -13,22 +13,23 @@
 # under the License.
 
+import threading
 import time
 
 from zuul import model
 import zuul.nodepool
-from tests.base import BaseTestCase, FakeNodepool
+from tests.base import BaseTestCase, FakeNodepool, iterate_timeout
 from zuul.zk import ZooKeeperClient
 from zuul.zk.nodepool import ZooKeeperNodepool
 
 
-class TestNodepool(BaseTestCase):
+class TestNodepoolBase(BaseTestCase):
     # Tests the Nodepool interface class using a fake nodepool and
     # scheduler.
 
     def setUp(self):
-        super(TestNodepool, self).setUp()
+        super().setUp()
         self.statsd = None
         self.setupZK()
@@ -62,6 +63,8 @@ class TestNodepool(BaseTestCase):
         # back when a request is provisioned.
         self.provisioned_requests.append(request)
 
+
+class TestNodepool(TestNodepoolBase):
     def test_node_request(self):
         # Test a simple node request
@@ -181,7 +184,7 @@ class TestNodepool(BaseTestCase):
 
         for node in nodeset.getNodes():
             self.assertIsNone(node.lock)
-            self.assertEqual(node.state, 'ready')
+            self.assertEqual(node.state, 'unknown')
 
     def test_node_request_priority(self):
         # Test that requests are satisfied in priority order
@@ -266,3 +269,55 @@ class TestNodepool(BaseTestCase):
         # original ones from the config
         self.assertEqual(restored_nodes[0].name, "ubuntu-xenial-0")
         self.assertEqual(restored_nodes[1].name, "ubuntu-xenial-1")
+
+
+class TestNodepoolResubmit(TestNodepoolBase):
+    def setUp(self):
+        super().setUp()
+        self.run_once = False
+        self.disconnect_event = threading.Event()
+
+    def onNodesProvisioned(self, request):
+        # This is a scheduler method that the nodepool class calls
+        # back when a request is provisioned.
+        d = request.toDict()
+        d['_oid'] = request.id
+        self.provisioned_requests.append(d)
+        if not self.run_once:
+            self.run_once = True
+            self.disconnect_event.set()
+
+    def _disconnect_thread(self):
+        self.disconnect_event.wait()
+        self.zk_client.client.stop()
+        self.zk_client.client.start()
+        self.nodepool.checkNodeRequest(self.request, self.request.id)
+
+    def test_node_request_disconnect_late(self):
+        # Test that node requests are re-submitted after a disconnect
+        # which happens right before we accept the node request.
+        disconnect_thread = threading.Thread(target=self._disconnect_thread)
+        disconnect_thread.daemon = True
+        disconnect_thread.start()
+
+        nodeset = model.NodeSet()
+        nodeset.addNode(model.Node(['controller'], 'ubuntu-xenial'))
+        nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
+        job = model.Job('testjob')
+        job.nodeset = nodeset
+        self.request = self.nodepool.requestNodes(
+            "test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
+        for x in iterate_timeout(30, 'fulfill request'):
+            if len(self.provisioned_requests) == 2:
+                break
+
+        # Both requests should be fulfilled and have nodes.  The
+        # important thing here is that they both have the same number
+        # of nodes (and the second request did not append extra nodes
+        # to the first).
+        self.assertEqual(self.provisioned_requests[0]['state'], 'fulfilled')
+        self.assertEqual(self.provisioned_requests[1]['state'], 'fulfilled')
+        self.assertNotEqual(self.provisioned_requests[0]['_oid'],
+                            self.provisioned_requests[1]['_oid'])
+        self.assertEqual(len(self.provisioned_requests[0]['nodes']), 2)
+        self.assertEqual(len(self.provisioned_requests[1]['nodes']), 2)

--- a/zuul/model.py
+++ b/zuul/model.py

@@ -793,7 +793,8 @@ class NodeRequest(object):
     """A request for a set of nodes."""
 
     def __init__(self, requestor, build_set_uuid, tenant_name, pipeline_name,
-                 job_name, nodeset, provider, relative_priority, event=None):
+                 job_name, nodeset, provider, relative_priority,
+                 event_id=None):
         self.requestor = requestor
         self.build_set_uuid = build_set_uuid
         self.tenant_name = tenant_name
@@ -810,15 +811,23 @@ class NodeRequest(object):
         self.provider = provider
         self.id = None
         self._zk_data = {}  # Data that we read back from ZK
-        if event is not None:
-            self.event_id = event.zuul_event_id
-        else:
-            self.event_id = None
+        self.event_id = event_id
         # Zuul internal flags (not stored in ZK so they are not
         # overwritten).
         self.failed = False
         self.canceled = False
 
+    def reset(self):
+        # Reset the node request for re-submission
+        self._zk_data = {}
+        # Remove any real node information
+        self.nodeset = self.nodeset.copy()
+        self.id = None
+        self.state = STATE_REQUESTED
+        self.stat = None
+        self.failed = False
+        self.canceled = False
+
     @property
     def fulfilled(self):
         return (self._state == STATE_FULFILLED) and not self.failed

--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py

@@ -117,9 +117,13 @@ class Nodepool(object):
         # Create a copy of the nodeset to represent the actual nodes
         # returned by nodepool.
         nodeset = job.nodeset.copy()
+        if event:
+            event_id = event.zuul_event_id
+        else:
+            event_id = None
         req = model.NodeRequest(self.hostname, build_set_uuid, tenant_name,
                                 pipeline_name, job.name, nodeset, provider,
-                                relative_priority, event=event)
+                                relative_priority, event_id)
         self.requests[req.uid] = req
 
         if nodeset.nodes:
@@ -361,8 +365,8 @@ class Nodepool(object):
         try:
             for node in nodeset.getNodes():
                 if node.allocated_to != request_id:
-                    raise Exception("Node %s allocated to %s, not %s",
-                                    node.id, node.allocated_to, request_id)
+                    raise Exception("Node %s allocated to %s, not %s" %
+                                    (node.id, node.allocated_to, request_id))
                 self.log.debug("Locking node %s" % (node,))
                 self.zk_nodepool.lockNode(node, timeout=30)
                 # Check the allocated_to again to ensure that nodepool didn't
@@ -370,8 +374,8 @@ class Nodepool(object):
                 # were locking them.
                 if node.allocated_to != request_id:
                     raise Exception(
-                        "Node %s was reallocated during locking %s, not %s",
-                        node.id, node.allocated_to, request_id)
+                        "Node %s was reallocated during locking %s, not %s" %
+                        (node.id, node.allocated_to, request_id))
                 locked_nodes.append(node)
         except Exception:
             self.log.exception("Error locking nodes:")
@@ -385,10 +389,11 @@ class Nodepool(object):
         log.debug("Updating node request %s", request)
 
         if request.uid not in self.requests:
-            log.debug("Request %s is unknown", request.uid)
+            log.debug("Request %s is unknown", request)
             return False
 
         if request.canceled:
+            log.debug("Node request %s was canceled", request)
             del self.requests[request.uid]
             self.emitStats(request)
             return False
@@ -396,7 +401,7 @@ class Nodepool(object):
             # TODOv3(jeblair): handle allocation failure
             if deleted:
                 log.debug("Resubmitting lost node request %s", request)
-                request.id = None
+                request.reset()
                 self.zk_nodepool.submitNodeRequest(
                     request, priority, self._updateNodeRequest)
             # Stop watching this request node
@@ -456,12 +461,10 @@ class Nodepool(object):
         if not self.zk_nodepool.nodeRequestExists(request):
             log.info("Request %s no longer exists, resubmitting",
                      request.id)
-            # Look up the priority from the old request id before resetting
-            # it.
+            # Look up the priority from the old request id.
             priority = request.id.partition("-")[0]
-            request.id = None
-            request.state = model.STATE_REQUESTED
+            request.reset()
             self.requests[request.uid] = request
             self.zk_nodepool.submitNodeRequest(
                 request, priority, self._updateNodeRequest)
             return False