Make node requests persistent

The original Nodepool protocol specified that node requests should
be ephemeral, so that if the requestor crashed before accepting
the nodes, the request would automatically be cleaned up and the
nodes returned.  This doesn't comport with multiple schedulers:
we will soon expect schedulers to stop and start routinely, while
the node requests they spawn should persist and be handled by
other schedulers.

Fortunately, Nodepool doesn't really care if the request is
ephemeral or not.  So we'll drop the "ephemeral" flag.
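
For a concrete picture of what dropping the flag means, here is a
minimal kazoo sketch (illustration only, not code from this change;
the connection string and the request root path are assumptions,
and the real change is in the zuul/zk hunk further down):

    import json
    from kazoo.client import KazooClient

    # Assumed values for the sketch; Zuul and Nodepool take these
    # from their configuration.
    REQUEST_ROOT = '/nodepool/requests'
    client = KazooClient(hosts='127.0.0.1:2181')
    client.start()

    data = json.dumps({'state': 'requested'}).encode('utf8')

    # Before: the znode is tied to this client's ZooKeeper session
    # and is deleted automatically when the session ends (e.g. the
    # requestor crashes or disconnects).
    client.create(REQUEST_ROOT + '/100-', data,
                  makepath=True, sequence=True, ephemeral=True)

    # After: a plain sequential znode that outlives the requestor's
    # session, so another scheduler can handle the request later.
    client.create(REQUEST_ROOT + '/100-', data,
                  makepath=True, sequence=True)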

But in the short term, we will be stopping the scheduler, and
that will leave orphaned node requests.  Even in the long term,
a complete Zuul system shutdown or a bug may leak node requests,
so we still need a way of deleting node requests which don't
belong.  To handle that, we add a cleanup routine, run
immediately on startup and then once an hour, which looks for
node requests that were created by this Zuul system but do not
correspond to any queue entries.  We create a new UUID to
identify the Zuul system and store it in ZooKeeper (so that if
Nodepool has any other users we don't delete their requests).
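
In code terms, the cleanup is a set difference between the
requests we own in ZooKeeper and the requests still referenced by
queue items.  A sketch that mirrors the scheduler hunk below
(attribute and method names are taken from the diff; the 'sched'
handle is a placeholder):

    def cleanup_leaked_node_requests(sched):
        zk_nodepool = sched.nodepool.zk_nodepool

        # Requests in ZooKeeper created by this Zuul system.
        zk_requests = set()
        for req_id in zk_nodepool.getNodeRequests():
            req = zk_nodepool.getNodeRequest(req_id)
            if req.requestor == sched.system.system_id:
                zk_requests.add(req_id)

        # Requests still referenced by a queue item somewhere.
        outstanding = set()
        for tenant in sched.abide.tenants.values():
            for pipeline in tenant.layout.pipelines.values():
                for item in pipeline.getAllItems():
                    for req in item.current_build_set.node_requests.values():
                        outstanding.add(req.id)

        # Anything we own but no longer track is leaked; delete it.
        for req_id in zk_requests - outstanding:
            req = zk_nodepool.getNodeRequest(req_id)
            zk_nodepool.deleteNodeRequest(req)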

We no longer need to resubmit requests on connection loss, so tests
addressing that behavior are removed.

Change-Id: Ie22e99ef71cbe6b31d40c25a21498c1e867ca777
Author: James E. Blair  2021-08-26 14:33:31 -07:00
commit e225a28fa5  (parent dbe13ce076)
8 changed files with 157 additions and 180 deletions


@@ -12,14 +12,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
import time
from zuul import model
import zuul.nodepool
from tests.base import BaseTestCase, FakeNodepool, iterate_timeout
from tests.base import BaseTestCase, FakeNodepool
from zuul.zk import ZooKeeperClient
from zuul.zk.nodepool import ZooKeeperNodepool
@@ -103,24 +101,6 @@ class TestNodepool(TestNodepoolBase):
self.assertIsNone(node.lock)
self.assertEqual(node.state, 'used')
def test_node_request_disconnect(self):
# Test that node requests are re-submitted after disconnect
nodeset = model.NodeSet()
nodeset.addNode(model.Node(['controller'], 'ubuntu-xenial'))
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job.nodeset = nodeset
self.fake_nodepool.pause()
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.zk_client.client.stop()
self.zk_client.client.start()
self.fake_nodepool.unpause()
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
def test_node_request_canceled(self):
# Test that node requests can be canceled
@@ -137,60 +117,6 @@ class TestNodepool(TestNodepoolBase):
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 0)
def test_accept_nodes_resubmitted(self):
# Test that a resubmitted request would not lock nodes
nodeset = model.NodeSet()
nodeset.addNode(model.Node(['controller'], 'ubuntu-xenial'))
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
# Accept the nodes, passing a different ID
new_nodeset = self.nodepool.checkNodeRequest(
request, "invalid", nodeset)
self.assertIsNone(new_nodeset)
# Don't call acceptNodes here as the node request wasn't accepted.
# Nothing we have done has returned an updated nodeset with
# real node records, so we need to do that ourselves to verify
# they are still unused.
for node_id, node in zip(request.nodes, nodeset.getNodes()):
self.nodepool.zk_nodepool.updateNode(node, node_id)
self.assertIsNone(node.lock)
self.assertEqual(node.state, 'ready')
def test_accept_nodes_lost_request(self):
# Test that a lost request would not lock nodes
nodeset = model.NodeSet()
nodeset.addNode(model.Node(['controller'], 'ubuntu-xenial'))
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
self.zk_nodepool.deleteNodeRequest(request)
# Accept the nodes
new_nodeset = self.nodepool.checkNodeRequest(
request, request.id, nodeset)
self.assertIsNone(new_nodeset)
# Don't call acceptNodes here as the node request wasn't accepted.
for node in nodeset.getNodes():
self.assertIsNone(node.lock)
self.assertEqual(node.state, 'unknown')
def test_node_request_priority(self):
# Test that requests are satisfied in priority order
@@ -210,57 +136,3 @@ class TestNodepool(TestNodepoolBase):
self.assertEqual(request1.state, 'fulfilled')
self.assertEqual(request2.state, 'fulfilled')
self.assertTrue(request2.state_time < request1.state_time)
class TestNodepoolResubmit(TestNodepoolBase):
def setUp(self):
super().setUp()
self.run_once = False
self.disconnect_event = threading.Event()
def onNodesProvisioned(self, request):
# This is a scheduler method that the nodepool class calls
# back when a request is provisioned.
d = request.toDict()
d['_oid'] = request.id
self.provisioned_requests.append(d)
if not self.run_once:
self.run_once = True
self.disconnect_event.set()
def _disconnect_thread(self):
self.disconnect_event.wait()
self.zk_client.client.stop()
self.zk_client.client.start()
self.nodepool.checkNodeRequest(
self.request, self.request.id, self.nodeset)
def test_node_request_disconnect_late(self):
# Test that node requests are re-submitted after a disconnect
# which happens right before we accept the node request.
disconnect_thread = threading.Thread(target=self._disconnect_thread)
disconnect_thread.daemon = True
disconnect_thread.start()
nodeset = model.NodeSet()
nodeset.addNode(model.Node(['controller'], 'ubuntu-xenial'))
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
self.nodeset = nodeset
job = model.Job('testjob')
job.nodeset = nodeset
self.request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
for x in iterate_timeout(30, 'fulfill request'):
if len(self.provisioned_requests) == 2:
break
# Both requests should be fulfilled and have nodes. The
# important thing here is that they both have the same number
# of nodes (and the second request did not append extra nodes
# to the first).
self.assertEqual(self.provisioned_requests[0]['state'], 'fulfilled')
self.assertEqual(self.provisioned_requests[1]['state'], 'fulfilled')
self.assertNotEqual(self.provisioned_requests[0]['_oid'],
self.provisioned_requests[1]['_oid'])
self.assertEqual(len(self.provisioned_requests[0]['nodes']), 2)
self.assertEqual(len(self.provisioned_requests[1]['nodes']), 2)


@@ -6096,7 +6096,6 @@ For CI problems and help debugging, contact ci@example.org"""
for x in iterate_timeout(30, 'fulfill request'):
if request.fulfilled:
break
id1 = request.id
# The request is fulfilled, but the scheduler hasn't processed
# it yet. Reconnect ZK.
@@ -6107,18 +6106,35 @@
# out-of-date) notification that nodes are ready.
self.scheds.first.sched.run_handler_lock.release()
# It should resubmit the request, once it's fulfilled, we can
# The request should persist; once it's fulfilled, we can
# wait for it to run jobs and settle.
for x in iterate_timeout(30, 'fulfill request'):
if request.fulfilled:
break
self.waitUntilSettled()
id2 = request.id
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
# Make sure it was resubmitted (the id's should be different).
self.assertNotEqual(id1, id2)
def test_nodepool_cleanup(self):
"Test that we cleanup leaked node requests"
self.fake_nodepool.pause()
system_id = self.scheds.first.sched.system.system_id
zk_nodepool = self.scheds.first.sched.nodepool.zk_nodepool
req1 = zuul.model.NodeRequest(system_id, "uuid1", "tenant",
"pipeline", "job", ['label'], None,
0, None)
zk_nodepool.submitNodeRequest(req1, 100, lambda x, y: False)
req2 = zuul.model.NodeRequest("someone else", "uuid1", "tenant",
"pipeline", "job", ['label'], None,
0, None)
zk_nodepool.submitNodeRequest(req2, 100, lambda x, y: False)
self.assertEqual(zk_nodepool.getNodeRequests(),
['100-0000000000', '100-0000000001'])
self.scheds.first.sched._runNodeRequestCleanup()
self.assertEqual(zk_nodepool.getNodeRequests(),
['100-0000000001'])
zk_nodepool.deleteNodeRequest(req2)
def test_nodepool_failure(self):
"Test that jobs are reported after a nodepool failure"


@@ -69,6 +69,7 @@ from zuul.zk.components import ExecutorComponent
from zuul.zk.exceptions import JobRequestNotFound
from zuul.zk.executor import ExecutorApi
from zuul.zk.job_request_queue import JobRequestEvent
from zuul.zk.system import ZuulSystem
BUFFER_LINES_FOR_SYNTAX = 200
COMMANDS = ['stop', 'pause', 'unpause', 'graceful', 'verbose',
@@ -3180,7 +3181,9 @@ class ExecutorServer(BaseMergeServer):
'merge_jobs', True)
self.component_info.process_merge_jobs = self.process_merge_jobs
self.nodepool = Nodepool(self.zk_client, self.hostname, self.statsd)
self.system = ZuulSystem(self.zk_client)
self.nodepool = Nodepool(self.zk_client, self.system.system_id,
self.statsd)
self.result_events = PipelineResultEventQueue.createRegistry(
self.zk_client)


@@ -33,8 +33,8 @@ def subtract_resources(target, source):
class Nodepool(object):
log = logging.getLogger('zuul.nodepool')
def __init__(self, zk_client, hostname, statsd, scheduler=None):
self.hostname = hostname
def __init__(self, zk_client, system_id, statsd, scheduler=None):
self.system_id = system_id
self.statsd = statsd
# TODO (felix): Remove the scheduler parameter once the nodes are
# locked on the executor side.
@@ -117,7 +117,7 @@ class Nodepool(object):
event_id = event.zuul_event_id
else:
event_id = None
req = model.NodeRequest(self.hostname, build_set_uuid, tenant_name,
req = model.NodeRequest(self.system_id, build_set_uuid, tenant_name,
pipeline_name, job.name, labels, provider,
relative_priority, event_id)
self.requests[req.uid] = req
@@ -378,7 +378,7 @@ class Nodepool(object):
self._unlockNodes(locked_nodes)
raise
def _updateNodeRequest(self, request, priority, deleted):
def _updateNodeRequest(self, request, priority):
log = get_annotated_logger(self.log, request.event_id)
# Return False to indicate that we should stop watching the
# node.
@@ -395,14 +395,7 @@
return False
# TODOv3(jeblair): handle allocation failure
if deleted:
log.debug("Resubmitting lost node request %s", request)
request.reset()
self.zk_nodepool.submitNodeRequest(
request, priority, self._updateNodeRequest)
# Stop watching this request node
return False
elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
if request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
log.info("Node request %s %s", request, request.state)
# Give our results to the scheduler.
@@ -433,11 +426,6 @@
# A copy of the nodeset with information about the real nodes
nodeset = job_nodeset.copy()
if request_id != request.id:
log.info("Skipping node accept for %s (resubmitted as %s)",
request_id, request.id)
return None
if request.canceled:
log.info("Ignoring canceled node request %s", request)
# The request was already deleted when it was canceled
@@ -449,31 +437,15 @@
if not request.labels and request.fulfilled:
return nodeset
# Make sure the request still exists. It's possible it could have
# disappeared if we lost the ZK session between when the fulfillment
# response was added to our queue, and when we actually get around to
# processing it. Nodepool will automatically reallocate the assigned
# nodes in that situation.
# Load the node info from ZK.
try:
if not self.zk_nodepool.nodeRequestExists(request):
log.info("Request %s no longer exists, resubmitting",
request.id)
# Look up the priority from the old request id.
priority = request.id.partition("-")[0]
self.requests[request.uid] = request
request.reset()
self.zk_nodepool.submitNodeRequest(
request, priority, self._updateNodeRequest)
return None
else:
for node_id, node in zip(request.nodes, nodeset.getNodes()):
self.zk_nodepool.updateNode(node, node_id)
for node_id, node in zip(request.nodes, nodeset.getNodes()):
self.zk_nodepool.updateNode(node, node_id)
except Exception:
# If we cannot retrieve the node request from ZK we probably lost
# the connection and thus the ZK session. Resubmitting the node
# request probably doesn't make sense at this point in time as it
# is likely to directly fail again. So just log the problem
# with zookeeper and fail here.
# If we cannot retrieve the node request from ZK we
# probably lost the connection and thus the ZK
# session. Just log the problem with zookeeper and fail
# here.
log.exception("Error getting node request %s:", request_id)
request.failed = True
return nodeset


@@ -76,6 +76,7 @@ from zuul.zk.cleanup import (
BuildRequestCleanupLock,
GeneralCleanupLock,
MergeRequestCleanupLock,
NodeRequestCleanupLock,
)
from zuul.zk.components import (
BaseComponent, ComponentRegistry, SchedulerComponent
@@ -100,6 +101,7 @@ from zuul.zk.locks import (
trigger_queue_lock,
)
from zuul.zk.nodepool import ZooKeeperNodepool
from zuul.zk.system import ZuulSystem
COMMANDS = ['full-reconfigure', 'smart-reconfigure', 'stop', 'repl', 'norepl']
@@ -172,6 +174,7 @@ class Scheduler(threading.Thread):
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
self.system = ZuulSystem(self.zk_client)
self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
self.component_info = SchedulerComponent(self.zk_client, self.hostname)
self.component_info.register()
@@ -211,6 +214,7 @@
self.zk_client)
self.merge_request_cleanup_lock = MergeRequestCleanupLock(
self.zk_client)
self.node_request_cleanup_lock = NodeRequestCleanupLock(self.zk_client)
self.abide = Abide()
self.unparsed_abide = UnparsedAbideConfig()
@@ -242,7 +246,7 @@
self.executor = self._executor_client_class(self.config, self)
self.merger = self._merger_client_class(self.config, self)
self.nodepool = nodepool.Nodepool(
self.zk_client, self.hostname, self.statsd, self)
self.zk_client, self.system.system_id, self.statsd, self)
def start(self):
super(Scheduler, self).start()
@@ -487,6 +491,10 @@
self._runBuildRequestCleanup()
except Exception:
self.log.exception("Error in build request cleanup:")
try:
self._runNodeRequestCleanup()
except Exception:
self.log.exception("Error in node request cleanup:")
self.apsched.add_job(self._runSemaphoreCleanup,
trigger=self._semaphore_cleanup_interval)
@@ -513,11 +521,51 @@
finally:
self.semaphore_cleanup_lock.release()
def _runNodeRequestCleanup(self):
# Get the layout lock to make sure the abide doesn't change
# under us.
with self.layout_lock:
if self.node_request_cleanup_lock.acquire(blocking=False):
try:
self.log.debug("Starting node request cleanup")
try:
self._cleanupNodeRequests()
except Exception:
self.log.exception("Error in node request cleanup:")
finally:
self.semaphore_cleanup_lock.release()
def _cleanupNodeRequests(self):
# Get all the requests in ZK that belong to us
zk_requests = set()
for req_id in self.nodepool.zk_nodepool.getNodeRequests():
req = self.nodepool.zk_nodepool.getNodeRequest(req_id)
if req.requestor == self.system.system_id:
zk_requests.add(req_id)
# Get all the current node requests in the queues
outstanding_requests = set()
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
for item in pipeline.getAllItems():
for req in item.current_build_set.node_requests.values():
outstanding_requests.add(req.id)
leaked_requests = zk_requests - outstanding_requests
for req_id in leaked_requests:
try:
self.log.warning("Deleting leaked node request: %s", req_id)
req = self.nodepool.zk_nodepool.getNodeRequest(req_id)
self.nodepool.zk_nodepool.deleteNodeRequest(req)
except Exception:
self.log.exception("Error deleting leaked node request: %s",
req_id)
def _runGeneralCleanup(self):
if self.general_cleanup_lock.acquire(blocking=False):
self._runConfigCacheCleanup()
self._runExecutorApiCleanup()
self._runMergerApiCleanup()
# This has its own locking
self._runNodeRequestCleanup()
def _runConfigCacheCleanup(self):
with self.layout_lock:


@@ -41,3 +41,10 @@ class GeneralCleanupLock(kazoo.recipe.lock.Lock):
def __init__(self, client):
super().__init__(client.client, self._path)
class NodeRequestCleanupLock(kazoo.recipe.lock.Lock):
_path = '/zuul/cleanup/node_request'
def __init__(self, client):
super().__init__(client.client, self._path)


@@ -330,19 +330,30 @@ class ZooKeeperNodepool(ZooKeeperBase):
path = '{}/{:0>3}-'.format(self.REQUEST_ROOT, priority)
path = self.kazoo_client.create(path, json.dumps(data).encode('utf8'),
makepath=True, sequence=True,
ephemeral=True)
makepath=True, sequence=True)
reqid = path.split("/")[-1]
node_request.id = reqid
def callback(value, _):
if value:
self.updateNodeRequest(node_request, value)
deleted = (value is None) # data *are* none
return watcher(node_request, priority, deleted)
return watcher(node_request, priority)
self.kazoo_client.DataWatch(path, callback)
def getNodeRequests(self):
'''
Get the current list of all node requests in priority sorted order.
:returns: A list of request ids.
'''
try:
requests = self.kazoo_client.get_children(self.REQUEST_ROOT)
except NoNodeError:
return []
return sorted(requests)
def getNodeRequest(self, node_request_id):
"""
Retrieve a NodeRequest from a given path in ZooKeeper.

zuul/zk/system.py (new file, 48 lines)

@@ -0,0 +1,48 @@
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import uuid
from kazoo.exceptions import NoNodeError, NodeExistsError
from zuul.zk import ZooKeeperBase
# This is outside of the /zuul root, so this holds data we expect to
# be persistent (it will not be deleted by 'zuul delete-state').
SYSTEM_ROOT = "/zuul-system"
class ZuulSystem(ZooKeeperBase):
"""Information about the complete Zuul system
Currently includes only a system UUID for use with Nodepool.
"""
log = logging.getLogger("zuul.System")
def __init__(self, client):
super().__init__(client)
try:
data, stat = self.kazoo_client.get(SYSTEM_ROOT)
except NoNodeError:
system_id = uuid.uuid4().hex
data = json.dumps({'system_id': system_id}).encode('utf8')
try:
self.kazoo_client.create(SYSTEM_ROOT, data)
except NodeExistsError:
data, stat = self.kazoo_client.get(SYSTEM_ROOT)
self.system_id = json.loads(data.decode('utf8'))['system_id']
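
A minimal wiring sketch of how the new pieces fit together (names
follow the diff above; the config object, the node label, and the
requestNodes argument values are placeholders, not a definitive
setup):

    from zuul import model
    import zuul.nodepool
    from zuul.zk import ZooKeeperClient
    from zuul.zk.system import ZuulSystem

    zk_client = ZooKeeperClient.fromConfig(config)  # 'config' assumed in scope
    zk_client.connect()

    system = ZuulSystem(zk_client)      # reads or creates /zuul-system
    nodepool = zuul.nodepool.Nodepool(zk_client, system.system_id,
                                      statsd=None)

    nodeset = model.NodeSet()
    nodeset.addNode(model.Node(['controller'], 'ubuntu-xenial'))
    job = model.Job('testjob')
    job.nodeset = nodeset

    # The request carries the shared system id as its requestor,
    # which is what the scheduler's hourly cleanup matches against
    # when deciding whether a request found in ZooKeeper is ours.
    request = nodepool.requestNodes("buildset-uuid", job, "tenant",
                                    "pipeline", "provider", 0, 0)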