Deduplicate circular dep jobs in check

If a cycle of dependencies in a dependent pipeline meets certain
conditions and its changes share some jobs, we deduplicate those
jobs (i.e., we run only one build of each shared job and use it for
multiple queue items simultaneously).

This change makes the same happen in independent pipelines.  This is
accomplished by searching the pipeline for identical bundles and
deduplicating jobs across those bundles.  The deduplication itself is
no different than in dependent pipelines -- we simply store a reference
to the actual build in ZK and use the same Build object in Python.
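
As a rough sketch of what that sharing means (simplified stand-in
classes, not Zuul's real model code; names and paths are invented),
two build sets hold the very same Build object in memory, and each
persists only the ZK path of the one real build:

    class Build:
        """A single real build, stored once in ZK."""
        def __init__(self, job_name, zk_path):
            self.job_name = job_name
            self.zk_path = zk_path

    class BuildSet:
        """One Build per job; deduplicated jobs share the object."""
        def __init__(self):
            self.builds = {}

        def addBuild(self, build):
            self.builds[build.job_name] = build

        def serialize(self):
            # Only the ZK path is written out; loading either item
            # resolves the path back to the same build record.
            return {name: b.zk_path for name, b in self.builds.items()}

    common = Build("common-job", "/zuul/builds/abc123")
    item_a, item_b = BuildSet(), BuildSet()
    item_a.addBuild(common)
    item_b.addBuild(common)  # deduplicated: same object, no second build
    assert item_a.builds["common-job"] is item_b.builds["common-job"]
    assert item_a.serialize() == item_b.serialize()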

In dependent pipelines, the queue items of a bundle are removed
simultaneously (they all report at the same time).  In independent
pipelines, however, they may report at different times.  If a queue
item is dequeued while one of its builds is still referenced by a
different queue item, the second queue item will begin to see errors
when loading information from ZK.

To deal with this, we utilize a property of how we store items in
ZK: the QueueItem itself is stored underneath the pipeline object
with no reference to its place in the queue.  Separately, there are
queue lists which reference the QueueItem zobjects by path.
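
Schematically (the item path matches what the new test checks; the
queue path is only illustrative):

    <pipeline-path>/item/<item-uuid>   # the QueueItem zobject itself
    <pipeline-path>/queue/...          # queue lists holding item paths

Because the queue lists hold only paths, an item can be dropped from
its queue without deleting the item's own data node.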

We will continue to remove the QueueItem from its queue, but we will
leave it in place in ZK if it may still be referenced by other queue
items.  We then rely more heavily on a garbage collection system to
remove QueueItems once they, and their builds, are no longer
referenced by any item in the pipeline.
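
In the terms of the PipelineState.cleanup hunk below, the collection
condition reduces to a set difference:

    stale_items = all_items - known_items - items_referenced_by_builds

That is, an item's ZK node is deleted only once it is neither in any
queue nor referenced by any known item's builds.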

The existing garbage collection happens after loading the pipeline
state from ZK; this change moves it to the end of pipeline processing
so that, once all items have been removed, we are more likely to
write out an empty pipeline state.
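
Sketched as an outline (the scheduler diff below shows the actual
change; the steps are abbreviated):

    # Before: pipeline.state.cleanup(ctx) ran right after the state
    # was (re)loaded from ZK.
    # After:
    #   1. refresh state / re-enqueue old queues
    #   2. process the management and trigger event queues
    #   3. run processQueue() until it makes no more progress
    #   4. pipeline.state.cleanup(ctx)   # GC happens last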

Change-Id: I7fc897088d720f483e320431fefaa06425a33c86
Author: James E. Blair
Date:   2023-03-10 12:27:33 -08:00
Parent: c015c1aa84
Commit: b913a67889
4 changed files with 198 additions and 27 deletions

tests/fixtures/layouts/job-dedup-auto-shared-check.yaml (new file)

@@ -0,0 +1,78 @@
+- queue:
+    name: integrated
+    allow-circular-dependencies: true
+
+- pipeline:
+    name: check
+    manager: independent
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        Verified: 1
+    failure:
+      gerrit:
+        Verified: -1
+
+- pipeline:
+    name: gate
+    manager: dependent
+    success-message: Build succeeded (gate).
+    require:
+      gerrit:
+        approval:
+          - Approved: 1
+    trigger:
+      gerrit:
+        - event: comment-added
+          approval:
+            - Approved: 1
+    success:
+      gerrit:
+        Verified: 2
+        submit: true
+    failure:
+      gerrit:
+        Verified: -2
+    start:
+      gerrit:
+        Verified: 0
+    precedence: high
+
+- job:
+    name: base
+    parent: null
+    run: playbooks/run.yaml
+    nodeset:
+      nodes:
+        - label: debian
+          name: controller
+
+- job:
+    name: common-job
+    required-projects:
+      - org/project1
+      - org/project2
+
+- job:
+    name: project1-job
+
+- job:
+    name: project2-job
+
+- project:
+    name: org/project1
+    queue: integrated
+    check:
+      jobs:
+        - common-job
+        - project1-job
+
+- project:
+    name: org/project2
+    queue: integrated
+    check:
+      jobs:
+        - common-job
+        - project2-job

tests/unit/test_circular_dependencies.py

@@ -2065,6 +2065,55 @@ class TestGerritCircularDependencies(ZuulTestCase):
     def test_job_deduplication_semaphore_resources_first(self):
         self._test_job_deduplication_semaphore()
 
+    @simple_layout('layouts/job-dedup-auto-shared-check.yaml')
+    def test_job_deduplication_check(self):
+        self.executor_server.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+
+        # A <-> B
+        A.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format(
+            A.subject, B.data["url"]
+        )
+        B.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format(
+            B.subject, A.data["url"]
+        )
+
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        self.executor_server.release('common-job')
+        self.executor_server.release('project1-job')
+        self.waitUntilSettled()
+
+        # We do this even though it results in no changes to force an
+        # extra pipeline processing run to make sure we don't garbage
+        # collect the item early.
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        self.executor_server.release('project2-job')
+        self.waitUntilSettled()
+
+        self.assertHistory([
+            dict(name="project1-job", result="SUCCESS", changes="2,1 1,1"),
+            dict(name="common-job", result="SUCCESS", changes="2,1 1,1"),
+            dict(name="project2-job", result="SUCCESS", changes="1,1 2,1"),
+            # This is deduplicated
+            # dict(name="common-job", result="SUCCESS", changes="2,1 1,1"),
+        ], ordered=False)
+        self.assertEqual(len(self.fake_nodepool.history), 3)
+
+        # Make sure there are no leaked queue items
+        tenant = self.scheds.first.sched.abide.tenants.get("tenant-one")
+        pipeline = tenant.layout.pipelines["check"]
+        pipeline_path = pipeline.state.getPath()
+        all_items = set(self.zk_client.client.get_children(
+            f"{pipeline_path}/item"))
+        self.assertEqual(len(all_items), 0)
+
     def test_submitted_together(self):
         self.fake_gerrit._fake_submit_whole_topic = True
         A = self.fake_gerrit.addFakeChange('org/project1', "master", "A",

zuul/model.py

@@ -907,8 +907,19 @@ class PipelineState(zkobject.ZKObject):
         except NoNodeError:
             all_items = set()
 
-        known_items = {i.uuid for i in self._getKnownItems()}
-        stale_items = all_items - known_items
+        known_item_objs = self._getKnownItems()
+        known_items = {i.uuid for i in known_item_objs}
+        items_referenced_by_builds = set()
+        for i in known_item_objs:
+            build_set = i.current_build_set
+            job_graph = build_set.job_graph
+            if not job_graph:
+                continue
+            for job in job_graph.getJobs():
+                build = build_set.getBuild(job.name)
+                if build:
+                    items_referenced_by_builds.add(build.build_set.item.uuid)
+        stale_items = all_items - known_items - items_referenced_by_builds
         for item_uuid in stale_items:
             self.pipeline.manager.log.debug("Cleaning up stale item %s",
                                             item_uuid)
@@ -1249,16 +1260,14 @@
                                item_ahead=item.item_ahead)
 
         if item.bundle:
-            items_in_pipeline = self.pipeline.getAllItems(include_old=True)
-            if any([i in items_in_pipeline for i in item.bundle.items]):
-                item.updateAttributes(
-                    self.zk_context, item_ahead=None, items_behind=[],
-                    dequeue_time=time.time())
-            else:
-                # We no longer need the bundle items
-                for bundle_item in item.bundle.items:
-                    bundle_item.delete(self.zk_context)
-                item._set(dequeue_time=time.time())
+            # This item may have builds referenced by other items in
+            # the bundle, or even other bundles (in the case of
+            # independent pipelines).  Rather than trying to figure
+            # that out here, we will just let PipelineState.cleanup
+            # handle garbage collecting these items when done.
+            item.updateAttributes(
+                self.zk_context, item_ahead=None, items_behind=[],
+                dequeue_time=time.time())
         else:
             item.delete(self.zk_context)
 
         # We use the dequeue time for stats reporting, but the queue
@@ -5304,7 +5313,33 @@
                 ret = False
         return ret
 
-    def findDuplicateJob(self, job):
+    def findDuplicateBundles(self):
+        """
+        Find other bundles in the pipeline that are equivalent to ours.
+        """
+        if not self.bundle:
+            return None
+        ret = []
+        for item in self.queue.pipeline.getAllItems():
+            if not item.live:
+                continue
+            if item is self:
+                continue
+            if not item.bundle:
+                continue
+            other_bundle_changes = {i.change for i in item.bundle.items}
+            this_bundle_changes = {i.change for i in self.bundle.items}
+            if other_bundle_changes != this_bundle_changes:
+                continue
+            other_item_queue = {i.change for i in item.queue.queue}
+            this_item_queue = {i.change for i in self.queue.queue}
+            if other_item_queue != this_item_queue:
+                continue
+            if item.bundle not in ret:
+                ret.append(item.bundle)
+        return ret
+
+    def findDuplicateJob(self, job, other_bundles):
         """
         If another item in the bundle has a duplicate job,
         return the other item
@@ -5313,18 +5348,20 @@
             return None
         if job.deduplicate is False:
             return None
-        for other_item in self.bundle.items:
-            if other_item is self:
-                continue
-            for other_job in other_item.getJobs():
-                if other_job.isEqual(job):
-                    if job.deduplicate == 'auto':
-                        # Deduplicate if there are required projects
-                        # or the item project is the same.
-                        if (not job.required_projects and
-                            self.change.project != other_item.change.project):
-                            continue
-                    return other_item
+        for other_bundle in other_bundles:
+            for other_item in other_bundle.items:
+                if other_item is self:
+                    continue
+                for other_job in other_item.getJobs():
+                    if other_job.isEqual(job):
+                        if job.deduplicate == 'auto':
+                            # Deduplicate if there are required projects
+                            # or the item project is the same.
+                            if (not job.required_projects and
+                                self.change.project !=
+                                    other_item.change.project):
+                                continue
+                        return other_item
 
     def updateJobParentData(self):
         job_graph = self.current_build_set.job_graph
@@ -5420,6 +5457,13 @@
         build_set = self.current_build_set
         job_graph = build_set.job_graph
 
+        if self.bundle and len([i for i in self.bundle.items if i.live]) > 1:
+            # We are in a queue that has multiple live items, so we
+            # will only check our own bundle.
+            other_bundles = [self.bundle]
+        else:
+            # Look for identical bundles elsewhere in the pipeline
+            other_bundles = self.findDuplicateBundles()
         for job in job_graph.getJobs():
             this_request = build_set.getJobNodeRequestID(job.name)
             this_nodeset = build_set.getJobNodeSetInfo(job.name)
@@ -5429,7 +5473,7 @@
                 # Nothing more possible for this job
                 continue
 
-            other_item = self.findDuplicateJob(job)
+            other_item = self.findDuplicateJob(job, other_bundles)
             if not other_item:
                 continue
             other_build_set = other_item.current_build_set

zuul/scheduler.py

@@ -2327,7 +2327,6 @@ class Scheduler(threading.Thread):
                     pipeline.state.setDirty(self.zk_client.client)
                     if pipeline.state.old_queues:
                         self._reenqueuePipeline(tenant, pipeline, ctx)
-                    pipeline.state.cleanup(ctx)
 
                 with self.statsd_timer(f'{stats_key}.event_process'):
                     self.process_pipeline_management_queue(tenant, pipeline)
@@ -2339,6 +2338,7 @@
                 with self.statsd_timer(f'{stats_key}.process'):
                     while not self._stopped and pipeline.manager.processQueue():
                         pass
+                pipeline.state.cleanup(ctx)
             except Exception:
                 self.log.exception("Exception in pipeline processing:")
                 pipeline._exception_count += 1