Fix deduplication exceptions in pipeline processing

If a build is to be deduplicated and has not started yet and has
a pending node request, we store a dictionary describing the target
deduplicated build in the node_requests dictionary on the buildset.

There were a few places where we directly accessed that dictionary
and assumed the results would be the node request id.  Notably, this
could cause an error in pipeline processing (as well as potentially
some other edge cases such as reconfiguring).

Most of the time we can just ignore deduplicated node requests since
the "real" buildset will take care of them.  This change enriches
the API to help with that.  In other places, we add a check for the
type.

To test this, we enable relative_priority in the config file which
is used in the deduplication tests, and we also add an assertion
which runs at the end of every test that ensures there were no
pipeline exceptions during the test (almost all the existing dedup
tests fail this assertion before this change).

Change-Id: Ia0c3f000426011b59542d8e56b43767fccc89a22
This commit is contained in:
James E. Blair 2022-11-02 14:19:56 -07:00 committed by Simon Westphahl
parent c8aac6a118
commit 279d7fb5cd
No known key found for this signature in database
6 changed files with 43 additions and 15 deletions

View File

@ -3610,10 +3610,11 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
self.log.debug('No running builds to release')
return
self.log.debug("Releasing build %s (%s)" % (regex, len(builds)))
self.log.debug("Releasing build %s %s (%s)" % (
regex, change, len(builds)))
for build in builds:
if (not regex or re.match(regex, build.name) and
not change or build.change == change):
if ((not regex or re.match(regex, build.name)) and
(not change or build.change == change)):
self.log.debug("Releasing build %s" %
(build.parameters['zuul']['build']))
build.release()
@ -5158,6 +5159,11 @@ class ZuulTestCase(BaseTestCase):
self.assertIsNotNone(build.start_time)
self.assertIsNotNone(build.end_time)
def assertNoPipelineExceptions(self):
    """Fail the test if any pipeline recorded a processing exception."""
    tenants = self.scheds.first.sched.abide.tenants
    for tenant in tenants.values():
        pipelines = tenant.layout.pipelines
        for name in pipelines:
            # _exception_count is bumped by the scheduler whenever
            # pipeline processing raises; it must stay at zero.
            self.assertEqual(0, pipelines[name]._exception_count)
def assertFinalState(self):
self.log.debug("Assert final state")
# Make sure no jobs are running
@ -5184,6 +5190,7 @@ class ZuulTestCase(BaseTestCase):
for pipeline in tenant.layout.pipelines.values():
if isinstance(pipeline.manager, ipm):
self.assertEqual(len(pipeline.queues), 0)
self.assertNoPipelineExceptions()
def shutdown(self):
self.log.debug("Shutting down after tests")

View File

@ -5,6 +5,7 @@ server=127.0.0.1
[scheduler]
tenant_config=main.yaml
relative_priority=true
[merger]
git_dir=/tmp/zuul-test/merger-git

View File

@ -118,6 +118,11 @@ class ExecutorClient(object):
# Store the NodeRequest ID in the job arguments, so we can look it up
# on the executor side to lock the nodes.
req_id = build.build_set.getJobNodeRequestID(job.name)
if isinstance(req_id, dict):
# This should never happen
raise Exception(
"Attempt to start build with deduplicated node request ID "
f"{req_id}")
if req_id:
params["noderequest_id"] = req_id

View File

@ -1660,7 +1660,7 @@ class PipelineManager(metaclass=ABCMeta):
if (item.live and not dequeued
and self.sched.globals.use_relative_priority):
priority = item.getNodePriority()
for request_id in item.current_build_set.node_requests.values():
for _, request_id in item.current_build_set.getNodeRequests():
node_request = self.sched.nodepool.zk_nodepool.getNodeRequest(
request_id, cached=True)
if not node_request:

View File

@ -473,6 +473,8 @@ class Pipeline(object):
self.window_decrease_factor = None
self.state = None
self.change_list = None
# Only used by the unit tests for assertions
self._exception_count = 0
@property
def queues(self):
@ -4433,8 +4435,18 @@ class BuildSet(zkobject.ZKObject):
with self.activeContext(self.item.pipeline.manager.current_context):
self.node_requests[job_name] = request_id
def getJobNodeRequestID(self, job_name):
    # Pre-change version (removed by this commit): returns whatever is
    # stored for job_name — which may be a dict describing a deduplicated
    # build, not a request ID; callers had to know the difference.
    return self.node_requests.get(job_name)
def getJobNodeRequestID(self, job_name, ignore_deduplicate=False):
    """Return the node request entry stored for *job_name*.

    The stored value is normally a request ID string, but for a
    deduplicated build it is a dict describing the target build.
    When *ignore_deduplicate* is true, such dict entries are reported
    as None so callers that only handle real request IDs can skip them.
    """
    entry = self.node_requests.get(job_name)
    if isinstance(entry, dict) and ignore_deduplicate:
        return None
    return entry
def getNodeRequests(self):
    """Iterate over (job_name, request_id) pairs for real node requests.

    Entries stored as dicts are deduplicated-build markers rather than
    actual request IDs; they are skipped here because the "real"
    buildset is responsible for those requests.
    """
    return ((name, req)
            for name, req in self.node_requests.items()
            if not isinstance(req, dict))
def removeJobNodeRequestID(self, job_name):
if job_name in self.node_requests:

View File

@ -688,8 +688,8 @@ class Scheduler(threading.Thread):
# In case we're in the middle of a reconfig,
# include the old queue items.
for item in pipeline.getAllItems(include_old=True):
nrs = item.current_build_set.node_requests
for req_id in (nrs.values()):
nrs = item.current_build_set.getNodeRequests()
for _, req_id in nrs:
outstanding_requests.add(req_id)
leaked_requests = zk_requests - outstanding_requests
for req_id in leaked_requests:
@ -1632,7 +1632,7 @@ class Scheduler(threading.Thread):
item.removeBuild(build)
builds_to_cancel.append(build)
for request_job, request in \
item.current_build_set.node_requests.items():
item.current_build_set.getNodeRequests():
new_job = item.getJob(request_job)
if not new_job:
requests_to_cancel.append(
@ -1654,7 +1654,7 @@ class Scheduler(threading.Thread):
for build in item.current_build_set.getBuilds():
builds_to_cancel.append(build)
for request_job, request in \
item.current_build_set.node_requests.items():
item.current_build_set.getNodeRequests():
requests_to_cancel.append(
(
item.current_build_set,
@ -1776,7 +1776,7 @@ class Scheduler(threading.Thread):
for build in item.current_build_set.getBuilds():
builds_to_cancel.append(build)
for request_job, request in \
item.current_build_set.node_requests.items():
item.current_build_set.getNodeRequests():
requests_to_cancel.append(
(
item.current_build_set,
@ -2225,6 +2225,7 @@ class Scheduler(threading.Thread):
pass
except Exception:
self.log.exception("Exception in pipeline processing:")
pipeline._exception_count += 1
pipeline.state.updateAttributes(
ctx, state=pipeline.STATE_ERROR)
# Continue processing other pipelines+tenants
@ -2820,7 +2821,8 @@ class Scheduler(threading.Thread):
# In case the build didn't show up on any executor, the node
# request does still exist, so we have to make sure it is
# removed from ZK.
request_id = build.build_set.getJobNodeRequestID(build.job.name)
request_id = build.build_set.getJobNodeRequestID(
build.job.name, ignore_deduplicate=True)
if request_id:
self.nodepool.deleteNodeRequest(
request_id, event_id=build.zuul_event_id)
@ -2921,9 +2923,10 @@ class Scheduler(threading.Thread):
# Cancel node request if needed
req_id = buildset.getJobNodeRequestID(job_name)
if req_id:
req = self.nodepool.zk_nodepool.getNodeRequest(req_id)
if req:
self.nodepool.cancelRequest(req)
if not isinstance(req_id, dict):
req = self.nodepool.zk_nodepool.getNodeRequest(req_id)
if req:
self.nodepool.cancelRequest(req)
buildset.removeJobNodeRequestID(job_name)
# Cancel build if needed