Batch fake build zookeeper additions
When skipping a job, we create a fake build entry in ZK and also a database record. When skipping a large number of jobs, we create many such builds and after each one, we rewrite the entire QueueItem object in ZooKeeper. To avoid excessive unecessary writes to ZK, create all the fake build objects and then add them to the QueueItem at once, resulting in only a single ZK write operation. A later change will do the same for the database updates. Also, remove an obsolete argument from the addBuild method. Change-Id: I5852f2e260e3f961ac570fa1b90a27d2ac2959dc
This commit is contained in:
parent
82df4a0822
commit
4ca1913666
@ -88,7 +88,7 @@ class ExecutorClient(object):
|
||||
|
||||
log.debug("Adding build %s of job %s to item %s",
|
||||
build, job, item)
|
||||
item.addBuild(job, build)
|
||||
item.addBuild(build)
|
||||
|
||||
if job.name == 'noop':
|
||||
data = {"start_time": time.time()}
|
||||
|
@ -5704,14 +5704,18 @@ class BuildSet(zkobject.ZKObject):
|
||||
return self.states_map.get(
|
||||
state_num, 'UNKNOWN (%s)' % state_num)
|
||||
|
||||
def addBuild(self, job, build):
|
||||
# We require the job temporarily until the circular dependency
|
||||
# refactor is complete at which point the build and its linked
|
||||
# job should be 1:1.
|
||||
def addBuild(self, build):
|
||||
self.addBuilds([build])
|
||||
|
||||
def addBuilds(self, builds):
|
||||
with self.activeContext(self.item.pipeline.manager.current_context):
|
||||
self.builds[job.uuid] = build
|
||||
if job.uuid not in self.tries:
|
||||
self.tries[job.uuid] = 1
|
||||
for build in builds:
|
||||
self._addBuild(build)
|
||||
|
||||
def _addBuild(self, build):
|
||||
self.builds[build.job.uuid] = build
|
||||
if build.job.uuid not in self.tries:
|
||||
self.tries[build.job.uuid] = 1
|
||||
|
||||
def addRetryBuild(self, build):
|
||||
with self.activeContext(self.item.pipeline.manager.current_context):
|
||||
@ -6055,8 +6059,11 @@ class QueueItem(zkobject.ZKObject):
|
||||
layout_uuid=None)
|
||||
old_build_set.delete(context)
|
||||
|
||||
def addBuild(self, job, build):
|
||||
self.current_build_set.addBuild(job, build)
|
||||
def addBuild(self, build):
|
||||
self.current_build_set.addBuild(build)
|
||||
|
||||
def addBuilds(self, builds):
|
||||
self.current_build_set.addBuilds(builds)
|
||||
|
||||
def addRetryBuild(self, build):
|
||||
self.current_build_set.addRetryBuild(build)
|
||||
@ -6412,7 +6419,7 @@ class QueueItem(zkobject.ZKObject):
|
||||
fakebuild = Build.new(self.pipeline.manager.current_context,
|
||||
job=job, build_set=self.current_build_set,
|
||||
error_detail=str(e), result='FAILURE')
|
||||
self.addBuild(job, fakebuild)
|
||||
self.addBuild(fakebuild)
|
||||
self.pipeline.manager.sched.reportBuildEnd(
|
||||
fakebuild,
|
||||
tenant=self.pipeline.tenant.name,
|
||||
@ -6659,15 +6666,19 @@ class QueueItem(zkobject.ZKObject):
|
||||
build.job)
|
||||
skipped += to_skip
|
||||
|
||||
fake_builds = []
|
||||
for job in skipped:
|
||||
child_build = self.current_build_set.getBuild(job)
|
||||
if not child_build:
|
||||
fakebuild = Build.new(self.pipeline.manager.current_context,
|
||||
job=job,
|
||||
build_set=self.current_build_set,
|
||||
error_detail=skipped_reason,
|
||||
result='SKIPPED')
|
||||
self.addBuild(job, fakebuild)
|
||||
fake_builds.append(
|
||||
Build.new(self.pipeline.manager.current_context,
|
||||
job=job,
|
||||
build_set=self.current_build_set,
|
||||
error_detail=skipped_reason,
|
||||
result='SKIPPED'))
|
||||
if fake_builds:
|
||||
self.addBuilds(fake_builds)
|
||||
for fakebuild in fake_builds:
|
||||
self.pipeline.manager.sched.reportBuildEnd(
|
||||
fakebuild,
|
||||
tenant=self.pipeline.tenant.name,
|
||||
@ -6683,7 +6694,7 @@ class QueueItem(zkobject.ZKObject):
|
||||
error_detail=error,
|
||||
result='NODE_FAILURE',
|
||||
)
|
||||
self.addBuild(job, fakebuild)
|
||||
self.addBuild(fakebuild)
|
||||
self.pipeline.manager.sched.reportBuildEnd(
|
||||
fakebuild,
|
||||
tenant=self.pipeline.tenant.name,
|
||||
@ -6734,30 +6745,38 @@ class QueueItem(zkobject.ZKObject):
|
||||
self._setAllJobsSkipped('Buildset configuration error')
|
||||
|
||||
def _setAllJobsSkipped(self, msg):
|
||||
fake_builds = []
|
||||
for job in self.getJobs():
|
||||
fakebuild = Build.new(self.pipeline.manager.current_context,
|
||||
job=job, build_set=self.current_build_set,
|
||||
error_detail=msg, result='SKIPPED')
|
||||
self.addBuild(job, fakebuild)
|
||||
self.pipeline.manager.sched.reportBuildEnd(
|
||||
fakebuild,
|
||||
tenant=self.pipeline.tenant.name,
|
||||
final=True)
|
||||
fake_builds.append(Build.new(
|
||||
self.pipeline.manager.current_context,
|
||||
job=job, build_set=self.current_build_set,
|
||||
error_detail=msg, result='SKIPPED'))
|
||||
if fake_builds:
|
||||
self.addBuilds(fake_builds)
|
||||
for fakebuild in fake_builds:
|
||||
self.pipeline.manager.sched.reportBuildEnd(
|
||||
fakebuild,
|
||||
tenant=self.pipeline.tenant.name,
|
||||
final=True)
|
||||
|
||||
def _setMissingJobsSkipped(self, msg):
|
||||
fake_builds = []
|
||||
for job in self.getJobs():
|
||||
build = self.current_build_set.getBuild(job)
|
||||
if build:
|
||||
# We already have a build for this job
|
||||
continue
|
||||
fakebuild = Build.new(self.pipeline.manager.current_context,
|
||||
job=job, build_set=self.current_build_set,
|
||||
error_detail=msg, result='SKIPPED')
|
||||
self.addBuild(job, fakebuild)
|
||||
self.pipeline.manager.sched.reportBuildEnd(
|
||||
fakebuild,
|
||||
tenant=self.pipeline.tenant.name,
|
||||
final=True)
|
||||
fake_builds.append(Build.new(
|
||||
self.pipeline.manager.current_context,
|
||||
job=job, build_set=self.current_build_set,
|
||||
error_detail=msg, result='SKIPPED'))
|
||||
if fake_builds:
|
||||
self.addBuilds(fake_builds)
|
||||
for fakebuild in fake_builds:
|
||||
self.pipeline.manager.sched.reportBuildEnd(
|
||||
fakebuild,
|
||||
tenant=self.pipeline.tenant.name,
|
||||
final=True)
|
||||
|
||||
def formatUrlPattern(self, url_pattern, job=None, build=None):
|
||||
url = None
|
||||
|
@ -3209,7 +3209,7 @@ class Scheduler(threading.Thread):
|
||||
buildset.item.pipeline.manager.current_context,
|
||||
job=job, build_set=item.current_build_set,
|
||||
result='CANCELED')
|
||||
buildset.addBuild(job, fakebuild)
|
||||
buildset.addBuild(fakebuild)
|
||||
finally:
|
||||
# Release the semaphore in any case
|
||||
pipeline = buildset.item.pipeline
|
||||
|
Loading…
x
Reference in New Issue
Block a user