Merge "Centralize job canceling"
This commit is contained in:
commit
dac1ad5d73
|
@ -420,46 +420,14 @@ class PipelineManager(object):
|
|||
def cancelJobs(self, item, prime=True):
|
||||
self.log.debug("Cancel jobs for change %s" % item.change)
|
||||
canceled = False
|
||||
jobs_to_release = []
|
||||
|
||||
old_build_set = item.current_build_set
|
||||
old_jobs = {job.name: job for job in item.getJobs()}
|
||||
jobs_to_cancel = item.getJobs()
|
||||
|
||||
if prime and item.current_build_set.ref:
|
||||
item.resetAllBuilds()
|
||||
for req in old_build_set.node_requests.values():
|
||||
self.sched.nodepool.cancelRequest(req)
|
||||
jobs_to_release.append(req.job)
|
||||
old_build_set.node_requests = {}
|
||||
canceled_jobs = set()
|
||||
for build in old_build_set.getBuilds():
|
||||
if build.result:
|
||||
canceled_jobs.add(build.job.name)
|
||||
continue
|
||||
was_running = False
|
||||
try:
|
||||
was_running = self.sched.executor.cancel(build)
|
||||
except Exception:
|
||||
self.log.exception("Exception while canceling build %s "
|
||||
"for change %s" % (build, item.change))
|
||||
jobs_to_release.append(build.job)
|
||||
|
||||
if not was_running:
|
||||
nodeset = build.build_set.getJobNodeSet(build.job.name)
|
||||
self.sched.nodepool.returnNodeSet(nodeset, build)
|
||||
build.result = 'CANCELED'
|
||||
canceled = True
|
||||
canceled_jobs.add(build.job.name)
|
||||
for jobname, nodeset in list(old_build_set.nodesets.items()):
|
||||
if jobname in canceled_jobs:
|
||||
continue
|
||||
self.sched.nodepool.returnNodeSet(nodeset)
|
||||
jobs_to_release.append(old_jobs[jobname])
|
||||
|
||||
for job in jobs_to_release:
|
||||
tenant = old_build_set.item.pipeline.tenant
|
||||
tenant.semaphore_handler.release(
|
||||
old_build_set.item, job)
|
||||
for job in jobs_to_cancel:
|
||||
self.sched.cancelJob(old_build_set, job)
|
||||
|
||||
for item_behind in item.items_behind:
|
||||
self.log.debug("Canceling jobs for change %s, behind change %s" %
|
||||
|
|
|
@ -847,35 +847,15 @@ class Scheduler(threading.Thread):
|
|||
requests_to_cancel.append(
|
||||
(item.current_build_set, request))
|
||||
|
||||
semaphores_to_release = []
|
||||
for build in builds_to_cancel:
|
||||
self.log.info(
|
||||
"Canceling build %s during reconfiguration" % (build,))
|
||||
try:
|
||||
self.executor.cancel(build)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Exception while canceling build %s "
|
||||
"for change %s" % (build, build.build_set.item.change))
|
||||
# In the unlikely case that a build is removed and
|
||||
# later added back, make sure we clear out the nodeset
|
||||
# so it gets requested again.
|
||||
try:
|
||||
build.build_set.removeJobNodeSet(build.job.name)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Exception while removing nodeset from build %s "
|
||||
"for change %s" % (build, build.build_set.item.change))
|
||||
semaphores_to_release.append((build.build_set.item, build.job))
|
||||
self.cancelJob(build.build_set, build.job, build=build)
|
||||
for build_set, request in requests_to_cancel:
|
||||
self.log.info(
|
||||
"Canceling node request %s during reconfiguration",
|
||||
request)
|
||||
self.nodepool.cancelRequest(request)
|
||||
build_set.removeJobNodeRequest(request.job.name)
|
||||
semaphores_to_release.append((build_set.item, request.job))
|
||||
for item, job in semaphores_to_release:
|
||||
tenant.semaphore_handler.release(item, job)
|
||||
self.cancelJob(build_set, request.job)
|
||||
|
||||
def _reconfigureTenant(self, tenant):
|
||||
# This is called from _doReconfigureEvent while holding the
|
||||
|
@ -1436,3 +1416,48 @@ class Scheduler(threading.Thread):
|
|||
if change.isUpdateOf(dep):
|
||||
other_change.refresh_deps = True
|
||||
change.refresh_deps = True
|
||||
|
||||
def cancelJob(self, buildset, job, build=None):
|
||||
item = buildset.item
|
||||
try:
|
||||
# Cancel node request if needed
|
||||
req = buildset.node_requests.get(job)
|
||||
if req:
|
||||
self.nodepool.cancelRequest(req)
|
||||
buildset.removeJobNodeRequest(job.name)
|
||||
|
||||
# Cancel build if needed
|
||||
job_name = job.name
|
||||
build = build or buildset.getBuild(job_name)
|
||||
if build:
|
||||
was_running = False
|
||||
try:
|
||||
was_running = self.executor.cancel(build)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Exception while canceling build %s for change %s" % (
|
||||
build, item.change))
|
||||
|
||||
# In the unlikely case that a build is removed and
|
||||
# later added back, make sure we clear out the nodeset
|
||||
# so it gets requested again.
|
||||
try:
|
||||
buildset.removeJobNodeSet(job.name)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Exception while removing nodeset from build %s "
|
||||
"for change %s" % (build, build.build_set.item.change))
|
||||
|
||||
if not was_running:
|
||||
nodeset = buildset.getJobNodeSet(job.name)
|
||||
if nodeset:
|
||||
self.nodepool.returnNodeSet(nodeset, build)
|
||||
build.result = 'CANCELED'
|
||||
else:
|
||||
nodeset = buildset.getJobNodeSet(job.name)
|
||||
if nodeset:
|
||||
self.nodepool.returnNodeSet(nodeset)
|
||||
finally:
|
||||
# Release the semaphore in any case
|
||||
tenant = buildset.item.pipeline.tenant
|
||||
tenant.semaphore_handler.release(item, job)
|
||||
|
|
Loading…
Reference in New Issue