Move parent provider determination to pipeline manager

Moving the parent provider determination into the pipeline manager
allows us to remove the buildset and job objects from the NodeRequest
constructor. This way we can fully serialize the NodeRequest to
ZooKeeper and restore it without losing important information.

This also has an impact on the NodeRequest's priority property: since
the priority is only needed to determine the znode path when the
NodeRequest is submitted, we can provide it directly as a parameter to
the submitNodeRequest call (and the related update callbacks).
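
For illustration, a minimal sketch of how that priority ends up in the
znode path (mirroring the submitNodeRequest change below; the
REQUEST_ROOT value and the concrete priority are assumptions):

    # The priority only forms a zero-padded prefix of the sequential
    # request znode; it is not part of the serialized request data.
    REQUEST_ROOT = "/nodepool/requests"  # assumed request root
    priority = 200
    path = "{}/{:0>3}-".format(REQUEST_ROOT, priority)
    # -> "/nodepool/requests/200-"; ZooKeeper appends the sequence number.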

To ensure that Nodepool doesn't strip this additional information when
it fulfills the NodeRequest, we use the new "requestor_data" field,
which is implemented in [1].
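
Roughly, the request data stored in ZooKeeper then looks like this (a
sketch derived from the toDict() change below; all concrete values are
made up):

    request_data = {
        "requestor": "zuul.example.org",  # hypothetical hostname
        "node_types": ["fake-label"],
        "provider": None,
        "relative_priority": 0,
        # Opaque to Nodepool and returned unchanged when the request is
        # fulfilled, so Zuul can restore this context on deserialization.
        "requestor_data": {
            "build_set_uuid": "7c8e...",  # made-up UUID
            "tenant_name": "tenant",
            "pipeline_name": "pipeline",
            "job_name": "testjob",
        },
    }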

To make this work, we also have to look up the buildset by its UUID
from the active tenants and pipelines when the NodesProvisioned event
is handled in the scheduler. Something similar is already done when
handling the other result events.
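
In essence, the scheduler now resolves the event back to a live
buildset like this (a condensed sketch of the _doNodesProvisionedEvent
change below; error handling omitted):

    tenant = self.abide.tenants.get(request.tenant_name)
    pipeline = None
    for pl in tenant.layout.pipelines.values():
        if pl.name == request.pipeline_name:
            pipeline = pl
            break
    # Matches the buildset by the UUID carried in the event, as is
    # already done for the other result events.
    build_set = self._getBuildSetFromPipeline(event, pipeline)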

[1]: https://review.opendev.org/c/zuul/nodepool/+/798746/

Depends-On: https://review.opendev.org/c/zuul/nodepool/+/798746/
Change-Id: Id794643dcf26b0565499d20adba99d3b0518fdf1
Felix Edel 2021-06-14 09:20:31 +02:00 committed by Clark Boylan
parent 10966948d7
commit 040c5c8032
8 changed files with 153 additions and 102 deletions

@@ -0,0 +1,5 @@
---
upgrade:
- |
Zuul now requires at least Nodepool version 4.2.0 due to an
internal API change.

@@ -59,7 +59,8 @@ class TestNodepoolIntegration(BaseTestCase):
nodeset.addNode(model.Node(['controller'], 'fake-label'))
job = model.Job('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(None, job, 0)
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, model.STATE_FULFILLED)
@@ -89,7 +90,8 @@ class TestNodepoolIntegration(BaseTestCase):
nodeset.addNode(model.Node(['controller'], 'invalid-label'))
job = model.Job('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(None, job, 0)
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, model.STATE_FAILED)
@@ -104,7 +106,8 @@
job = model.Job('testjob')
job.nodeset = nodeset
self.fake_nodepool.paused = True
request = self.nodepool.requestNodes(None, job, 0)
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.zk_client.client.stop()
self.zk_client.client.start()
self.fake_nodepool.paused = False
@@ -122,7 +125,8 @@
job = model.Job('testjob')
job.nodeset = nodeset
self.fake_nodepool.paused = True
request = self.nodepool.requestNodes(None, job, 0)
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.nodepool.cancelRequest(request)
self.waitForRequests()

@@ -70,7 +70,8 @@ class TestNodepool(BaseTestCase):
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(None, job, 0)
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
@@ -107,7 +108,8 @@
job = model.Job('testjob')
job.nodeset = nodeset
self.fake_nodepool.pause()
request = self.nodepool.requestNodes(None, job, 0)
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.zk_client.client.stop()
self.zk_client.client.start()
self.fake_nodepool.unpause()
@@ -124,7 +126,8 @@
job = model.Job('testjob')
job.nodeset = nodeset
self.fake_nodepool.pause()
request = self.nodepool.requestNodes(None, job, 0)
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.nodepool.cancelRequest(request)
self.waitForRequests()
@@ -138,7 +141,8 @@
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(None, job, 0)
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
@@ -161,7 +165,8 @@
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(None, job, 0)
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
@@ -187,8 +192,10 @@
job = model.Job('testjob')
job.nodeset = nodeset
self.fake_nodepool.pause()
request1 = self.nodepool.requestNodes(None, job, 1)
request2 = self.nodepool.requestNodes(None, job, 0)
request1 = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 1)
request2 = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
self.fake_nodepool.unpause()
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 2)

@@ -622,17 +622,47 @@ class PipelineManager(metaclass=ABCMeta):
build_set = item.current_build_set
log.debug("Requesting nodes for change %s", item.change)
if self.sched.use_relative_priority:
priority = item.getNodePriority()
relative_priority = item.getNodePriority()
else:
priority = 0
relative_priority = 0
for job in jobs:
provider = self._getPausedParentProvider(build_set, job)
priority = self._calculateNodeRequestPriority(build_set, job)
tenant_name = build_set.item.pipeline.tenant.name
pipeline_name = build_set.item.pipeline.name
req = self.sched.nodepool.requestNodes(
build_set, job, priority, event=item.event)
build_set.uuid, job, tenant_name, pipeline_name, provider,
priority, relative_priority, event=item.event)
log.debug("Adding node request %s for job %s to item %s",
req, job, item)
build_set.setJobNodeRequest(job.name, req)
return True
def _getPausedParent(self, build_set, job):
job_graph = build_set.item.job_graph
if job_graph:
for parent in job_graph.getParentJobsRecursively(job.name):
build = build_set.getBuild(parent.name)
if build.paused:
return build
return None
def _getPausedParentProvider(self, build_set, job):
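# If a parent job is paused (e.g. while providing artifacts to its
# children), request the child's nodes from the provider that fulfilled
# the parent's nodes.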
build = self._getPausedParent(build_set, job)
if build:
nodeset = build_set.getJobNodeSet(build.job.name)
if nodeset and nodeset.nodes:
return list(nodeset.nodes.values())[0].provider
return None
def _calculateNodeRequestPriority(self, build_set, job):
precedence_adjustment = 0
precedence = build_set.item.pipeline.precedence
if self._getPausedParent(build_set, job):
precedence_adjustment = -1
initial_precedence = model.PRIORITY_MAP[precedence]
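# For example, assuming Zuul's usual PRIORITY_MAP (high=100, normal=200,
# low=300): an item in a normal-precedence pipeline yields priority 200,
# or 199 with a paused parent, so the child's request sorts (and is
# served) slightly earlier.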
return max(0, initial_precedence + precedence_adjustment)
def _executeJobs(self, item, jobs):
log = get_annotated_logger(self.log, item.event)
log.debug("Executing jobs for change %s", item.change)
@@ -1452,25 +1482,24 @@ class PipelineManager(metaclass=ABCMeta):
repo_state[connection] = event.repo_state[connection]
build_set.repo_state_state = build_set.COMPLETE
def onNodesProvisioned(self, event):
def onNodesProvisioned(self, event, build_set):
# TODOv3(jeblair): handle provisioning failure here
request = event.request
log = get_annotated_logger(self.log, request.event_id)
build_set = request.build_set
build_set.jobNodeRequestComplete(request.job.name,
request.nodeset)
build_set.jobNodeRequestComplete(request.job_name, request.nodeset)
if request.failed or not request.fulfilled:
log.info("Node request %s: failure for %s",
request, request.job.name)
build_set.item.setNodeRequestFailure(request.job)
self._resumeBuilds(request.build_set)
request, request.job_name)
job = build_set.item.getJob(request.job_name)
build_set.item.setNodeRequestFailure(job)
self._resumeBuilds(build_set)
tenant = build_set.item.pipeline.tenant
tenant.semaphore_handler.release(build_set.item, request.job)
tenant.semaphore_handler.release(build_set.item, job)
log.info("Completed node request %s for job %s of item %s "
"with nodes %s",
request, request.job, build_set.item, request.nodeset)
request, request.job_name, build_set.item, request.nodeset)
def reportItem(self, item):
log = get_annotated_logger(self.log, item.event)

@@ -791,11 +791,13 @@ class NodeSet(ConfigObject):
class NodeRequest(object):
"""A request for a set of nodes."""
def __init__(self, requestor, build_set, job, nodeset, relative_priority,
event=None):
def __init__(self, requestor, build_set_uuid, tenant_name, pipeline_name,
job_name, nodeset, provider, relative_priority, event=None):
self.requestor = requestor
self.build_set = build_set
self.job = job
self.build_set_uuid = build_set_uuid
self.tenant_name = tenant_name
self.pipeline_name = pipeline_name
self.job_name = job_name
self.nodeset = nodeset
self._state = STATE_REQUESTED
self.requested_time = time.time()
@@ -804,7 +806,7 @@
self.stat = None
self.uid = uuid4().hex
self.relative_priority = relative_priority
self.provider = self._getPausedParentProvider()
self.provider = provider
self.id = None
self._zk_data = {} # Data that we read back from ZK
if event is not None:
@@ -816,37 +818,6 @@
self.failed = False
self.canceled = False
def _getPausedParent(self):
if self.build_set:
job_graph = self.build_set.item.job_graph
if job_graph:
for parent in job_graph.getParentJobsRecursively(
self.job.name):
build = self.build_set.getBuild(parent.name)
if build.paused:
return build
return None
def _getPausedParentProvider(self):
build = self._getPausedParent()
if build:
nodeset = self.build_set.getJobNodeSet(build.job.name)
if nodeset and nodeset.nodes:
return list(nodeset.nodes.values())[0].provider
return None
@property
def priority(self):
precedence_adjustment = 0
if self.build_set:
precedence = self.build_set.item.pipeline.precedence
if self._getPausedParent():
precedence_adjustment = -1
else:
precedence = PRECEDENCE_NORMAL
initial_precedence = PRIORITY_MAP[precedence]
return max(0, initial_precedence + precedence_adjustment)
@property
def fulfilled(self):
return (self._state == STATE_FULFILLED) and not self.failed
@@ -866,8 +837,22 @@
return '<NodeRequest %s %s>' % (self.id, self.nodeset)
def toDict(self):
"""
Serialize a NodeRequest so it can be stored in ZooKeeper.
Any additional information must be stored in the requestor_data field,
so Nodepool doesn't strip the information when it fulfills the request.
"""
# Start with any previously read data
d = self._zk_data.copy()
# The requestor_data is opaque to nodepool and won't be touched by
# nodepool when it fulfills the request.
d["requestor_data"] = {
"build_set_uuid": self.build_set_uuid,
"tenant_name": self.tenant_name,
"pipeline_name": self.pipeline_name,
"job_name": self.job_name,
}
nodes = [n.label for n in self.nodeset.getNodes()]
# These are immutable once set
d.setdefault('node_types', nodes)
@@ -891,21 +876,20 @@
def fromDict(cls, data):
"""Deserialize a NodeRequest from the data in ZooKeeper.
When nodepool answers/updates the NodeRequest in ZooKeeper, it strips
any additional attributes. Thus, we can only deserialize a NodeRequest
with a subset of attributes.
Any additional information must be stored in the requestor_data field,
so Nodepool doesn't strip the information when it fulfills the request.
"""
# The requestor_data contains zuul-specific information which is opaque
# to nodepool and returned as-is when the NodeRequest is fulfilled.
requestor_data = data["requestor_data"]
request = cls(
requestor=data["requestor"],
# TODO (felix): Check if the build_set and job parameters are still
# required on the NodeRequest. In the current implementation they
# aren't available when the node request is deserialized on the
# executor because those values couldn't be serialized to ZK in the
# first place. So there might be a good chance that they aren't
# needed on the scheduler as well.
build_set=None,
job=None,
build_set_uuid=requestor_data["build_set_uuid"],
tenant_name=requestor_data["tenant_name"],
pipeline_name=requestor_data["pipeline_name"],
job_name=requestor_data["job_name"],
nodeset=None,
provider=data["provider"],
relative_priority=data.get("relative_priority", 0),
)
@@ -4119,6 +4103,7 @@ class NodesProvisionedEvent(ResultEvent):
def __init__(self, request):
self.request = request
self.build_set_uuid = request.build_set_uuid
self.request_id = request.id
def toDict(self):

@@ -111,17 +111,20 @@ class Nodepool(object):
self.statsd.incr(
key, value * duration, project=project, resource=resource)
def requestNodes(self, build_set, job, relative_priority, event=None):
def requestNodes(self, build_set_uuid, job, tenant_name, pipeline_name,
provider, priority, relative_priority, event=None):
log = get_annotated_logger(self.log, event)
# Create a copy of the nodeset to represent the actual nodes
# returned by nodepool.
nodeset = job.nodeset.copy()
req = model.NodeRequest(self.hostname, build_set, job,
nodeset, relative_priority, event=event)
req = model.NodeRequest(self.hostname, build_set_uuid, tenant_name,
pipeline_name, job.name, nodeset, provider,
relative_priority, event=event)
self.requests[req.uid] = req
if nodeset.nodes:
self.zk_nodepool.submitNodeRequest(req, self._updateNodeRequest)
self.zk_nodepool.submitNodeRequest(
req, priority, self._updateNodeRequest)
# Logged after submission so that we have the request id
log.info("Submitted node request %s", req)
self.emitStats(req)
@@ -375,7 +378,7 @@
self._unlockNodes(locked_nodes)
raise
def _updateNodeRequest(self, request, deleted):
def _updateNodeRequest(self, request, priority, deleted):
log = get_annotated_logger(self.log, request.event_id)
# Return False to indicate that we should stop watching the
# node.
@@ -395,7 +398,7 @@
log.debug("Resubmitting lost node request %s", request)
request.id = None
self.zk_nodepool.submitNodeRequest(
request, self._updateNodeRequest)
request, priority, self._updateNodeRequest)
# Stop watching this request node
return False
elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
@@ -453,11 +456,14 @@
if not self.zk_nodepool.nodeRequestExists(request):
log.info("Request %s no longer exists, resubmitting",
request.id)
# Look up the priority from the old request id before resetting
# it.
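# Request znodes are named "<priority>-<sequence>" (e.g.
# "200-0000000042"), so the prefix before the first "-" is the priority
# the request was originally submitted with.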
priority = request.id.partition("-")[0]
request.id = None
request.state = model.STATE_REQUESTED
self.requests[request.uid] = request
self.zk_nodepool.submitNodeRequest(
request, self._updateNodeRequest)
request, priority, self._updateNodeRequest)
return False
except Exception:
# If we cannot retrieve the node request from ZK we probably lost

@@ -1126,17 +1126,22 @@ class Scheduler(threading.Thread):
for request_job, request in \
item.current_build_set.node_requests.items():
requests_to_cancel.append(
(item.current_build_set, request))
(
item.current_build_set,
request,
item.getJob(request_job),
)
)
for build in builds_to_cancel:
self.log.info(
"Canceling build %s during reconfiguration" % (build,))
self.cancelJob(build.build_set, build.job, build=build)
for build_set, request in requests_to_cancel:
for build_set, request, request_job in requests_to_cancel:
self.log.info(
"Canceling node request %s during reconfiguration",
request)
self.cancelJob(build_set, request.job)
self.cancelJob(build_set, request_job)
for name, old_pipeline in old_tenant.layout.pipelines.items():
new_pipeline = tenant.layout.pipelines.get(name)
if not new_pipeline:
@@ -1206,18 +1211,23 @@
for request_job, request in \
item.current_build_set.node_requests.items():
requests_to_cancel.append(
(item.current_build_set, request))
(
item.current_build_set,
request,
item.getJob(request_job),
)
)
for build in builds_to_cancel:
self.log.info(
"Canceling build %s during reconfiguration" % (build,))
self.cancelJob(build.build_set, build.job,
build=build, force=True)
for build_set, request in requests_to_cancel:
for build_set, request, request_job in requests_to_cancel:
self.log.info(
"Canceling node request %s during reconfiguration",
request)
self.cancelJob(build_set, request.job, force=True)
self.cancelJob(build_set, request_job, force=True)
def _doPromoteEvent(self, event):
tenant = self.abide.tenants.get(event.tenant_name)
@@ -1852,7 +1862,9 @@
def _doNodesProvisionedEvent(self, event):
request = event.request
request_id = event.request_id
build_set = request.build_set
tenant_name = request.tenant_name
pipeline_name = request.pipeline_name
log = get_annotated_logger(self.log, request.event_id)
ready = self.nodepool.checkNodeRequest(request, request_id)
@@ -1864,31 +1876,34 @@
if request.state == STATE_FAILED:
self.nodepool.deleteNodeRequest(request)
if build_set is not build_set.item.current_build_set:
log.warning("Build set %s is not current "
"for node request %s", build_set, request)
# Look up the pipeline by its name
# TODO (felix): The pipeline lookup can be removed once the
# NodesProvisionedEvents are in ZooKeeper.
pipeline = None
tenant = self.abide.tenants.get(tenant_name)
for pl in tenant.layout.pipelines.values():
if pl.name == pipeline_name:
pipeline = pl
break
build_set = self._getBuildSetFromPipeline(event, pipeline)
if not build_set:
if request.fulfilled:
self.nodepool.returnNodeSet(request.nodeset,
zuul_event_id=request.event_id)
return
if request.job.name not in [x.name for x in build_set.item.getJobs()]:
if request.job_name not in [x.name for x in build_set.item.getJobs()]:
log.warning("Item %s does not contain job %s "
"for node request %s",
build_set.item, request.job.name, request)
build_set.removeJobNodeRequest(request.job.name)
build_set.item, request.job_name, request)
build_set.removeJobNodeRequest(request.job_name)
if request.fulfilled:
self.nodepool.returnNodeSet(request.nodeset,
zuul_event_id=request.event_id)
return
pipeline = build_set.item.pipeline
if not pipeline:
log.warning("Build set %s is not associated with a pipeline "
"for node request %s", build_set, request)
if request.fulfilled:
self.nodepool.returnNodeSet(request.nodeset,
zuul_event_id=request.event_id)
return
pipeline.manager.onNodesProvisioned(event)
pipeline.manager.onNodesProvisioned(event, build_set)
def formatStatusJSON(self, tenant_name):
# TODOv3(jeblair): use tenants

@@ -366,7 +366,7 @@ class ZooKeeperNodepool(ZooKeeperBase):
self.log.exception(
"Exception in hold request cache update for event: %s", event)
def submitNodeRequest(self, node_request, watcher):
def submitNodeRequest(self, node_request, priority, watcher):
"""
Submit a request for nodes to Nodepool.
@@ -385,7 +385,7 @@
node_request.created_time = time.time()
data = node_request.toDict()
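# The priority is only encoded in the znode path (zero-padded so that
# requests sort numerically); it is not part of the serialized data.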
path = '{}/{:0>3}-'.format(self.REQUEST_ROOT, node_request.priority)
path = '{}/{:0>3}-'.format(self.REQUEST_ROOT, priority)
path = self.kazoo_client.create(path, json.dumps(data).encode('utf8'),
makepath=True, sequence=True,
ephemeral=True)
@@ -396,7 +396,7 @@
if value:
self.updateNodeRequest(node_request, value)
deleted = (value is None) # data *are* none
return watcher(node_request, deleted)
return watcher(node_request, priority, deleted)
self.kazoo_client.DataWatch(path, callback)