From 4ca1913666abe619319d03ebed99aa11273254fd Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Tue, 19 Nov 2024 13:39:36 -0800 Subject: [PATCH] Batch fake build zookeeper additions When skipping a job, we create a fake build entry in ZK and also a database record. When skipping a large number of jobs, we create many such builds and after each one, we rewrite the entire QueueItem object in ZooKeeper. To avoid excessive unecessary writes to ZK, create all the fake build objects and then add them to the QueueItem at once, resulting in only a single ZK write operation. A later change will do the same for the database updates. Also, remove an obsolete argument from the addBuild method. Change-Id: I5852f2e260e3f961ac570fa1b90a27d2ac2959dc --- zuul/executor/client.py | 2 +- zuul/model.py | 85 +++++++++++++++++++++++++---------------- zuul/scheduler.py | 2 +- 3 files changed, 54 insertions(+), 35 deletions(-) diff --git a/zuul/executor/client.py b/zuul/executor/client.py index 87c0c40330..b3ae7c5633 100644 --- a/zuul/executor/client.py +++ b/zuul/executor/client.py @@ -88,7 +88,7 @@ class ExecutorClient(object): log.debug("Adding build %s of job %s to item %s", build, job, item) - item.addBuild(job, build) + item.addBuild(build) if job.name == 'noop': data = {"start_time": time.time()} diff --git a/zuul/model.py b/zuul/model.py index 832260dcf4..131eec242d 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -5704,14 +5704,18 @@ class BuildSet(zkobject.ZKObject): return self.states_map.get( state_num, 'UNKNOWN (%s)' % state_num) - def addBuild(self, job, build): - # We require the job temporarily until the circular dependency - # refactor is complete at which point the build and its linked - # job should be 1:1. + def addBuild(self, build): + self.addBuilds([build]) + + def addBuilds(self, builds): with self.activeContext(self.item.pipeline.manager.current_context): - self.builds[job.uuid] = build - if job.uuid not in self.tries: - self.tries[job.uuid] = 1 + for build in builds: + self._addBuild(build) + + def _addBuild(self, build): + self.builds[build.job.uuid] = build + if build.job.uuid not in self.tries: + self.tries[build.job.uuid] = 1 def addRetryBuild(self, build): with self.activeContext(self.item.pipeline.manager.current_context): @@ -6055,8 +6059,11 @@ class QueueItem(zkobject.ZKObject): layout_uuid=None) old_build_set.delete(context) - def addBuild(self, job, build): - self.current_build_set.addBuild(job, build) + def addBuild(self, build): + self.current_build_set.addBuild(build) + + def addBuilds(self, builds): + self.current_build_set.addBuilds(builds) def addRetryBuild(self, build): self.current_build_set.addRetryBuild(build) @@ -6412,7 +6419,7 @@ class QueueItem(zkobject.ZKObject): fakebuild = Build.new(self.pipeline.manager.current_context, job=job, build_set=self.current_build_set, error_detail=str(e), result='FAILURE') - self.addBuild(job, fakebuild) + self.addBuild(fakebuild) self.pipeline.manager.sched.reportBuildEnd( fakebuild, tenant=self.pipeline.tenant.name, @@ -6659,15 +6666,19 @@ class QueueItem(zkobject.ZKObject): build.job) skipped += to_skip + fake_builds = [] for job in skipped: child_build = self.current_build_set.getBuild(job) if not child_build: - fakebuild = Build.new(self.pipeline.manager.current_context, - job=job, - build_set=self.current_build_set, - error_detail=skipped_reason, - result='SKIPPED') - self.addBuild(job, fakebuild) + fake_builds.append( + Build.new(self.pipeline.manager.current_context, + job=job, + build_set=self.current_build_set, + error_detail=skipped_reason, + result='SKIPPED')) + if fake_builds: + self.addBuilds(fake_builds) + for fakebuild in fake_builds: self.pipeline.manager.sched.reportBuildEnd( fakebuild, tenant=self.pipeline.tenant.name, @@ -6683,7 +6694,7 @@ class QueueItem(zkobject.ZKObject): error_detail=error, result='NODE_FAILURE', ) - self.addBuild(job, fakebuild) + self.addBuild(fakebuild) self.pipeline.manager.sched.reportBuildEnd( fakebuild, tenant=self.pipeline.tenant.name, @@ -6734,30 +6745,38 @@ class QueueItem(zkobject.ZKObject): self._setAllJobsSkipped('Buildset configuration error') def _setAllJobsSkipped(self, msg): + fake_builds = [] for job in self.getJobs(): - fakebuild = Build.new(self.pipeline.manager.current_context, - job=job, build_set=self.current_build_set, - error_detail=msg, result='SKIPPED') - self.addBuild(job, fakebuild) - self.pipeline.manager.sched.reportBuildEnd( - fakebuild, - tenant=self.pipeline.tenant.name, - final=True) + fake_builds.append(Build.new( + self.pipeline.manager.current_context, + job=job, build_set=self.current_build_set, + error_detail=msg, result='SKIPPED')) + if fake_builds: + self.addBuilds(fake_builds) + for fakebuild in fake_builds: + self.pipeline.manager.sched.reportBuildEnd( + fakebuild, + tenant=self.pipeline.tenant.name, + final=True) def _setMissingJobsSkipped(self, msg): + fake_builds = [] for job in self.getJobs(): build = self.current_build_set.getBuild(job) if build: # We already have a build for this job continue - fakebuild = Build.new(self.pipeline.manager.current_context, - job=job, build_set=self.current_build_set, - error_detail=msg, result='SKIPPED') - self.addBuild(job, fakebuild) - self.pipeline.manager.sched.reportBuildEnd( - fakebuild, - tenant=self.pipeline.tenant.name, - final=True) + fake_builds.append(Build.new( + self.pipeline.manager.current_context, + job=job, build_set=self.current_build_set, + error_detail=msg, result='SKIPPED')) + if fake_builds: + self.addBuilds(fake_builds) + for fakebuild in fake_builds: + self.pipeline.manager.sched.reportBuildEnd( + fakebuild, + tenant=self.pipeline.tenant.name, + final=True) def formatUrlPattern(self, url_pattern, job=None, build=None): url = None diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 2e5611d83a..73e0c6160e 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -3209,7 +3209,7 @@ class Scheduler(threading.Thread): buildset.item.pipeline.manager.current_context, job=job, build_set=item.current_build_set, result='CANCELED') - buildset.addBuild(job, fakebuild) + buildset.addBuild(fakebuild) finally: # Release the semaphore in any case pipeline = buildset.item.pipeline