From 2fa509606410c96da90fc57e6d6daaf547ced1b7 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Wed, 30 Jan 2013 21:50:41 -0800 Subject: [PATCH] Dequeue changes when new patchset created. When a new patchset is created for a change that is in a pipeline, cancel running builds and dequeue that change (and possibly dependent changes that can no longer merge). Make this optional (and document the option). Fixes bug 1022643. Change-Id: I8f591956cf86645443e4b6075b8cdfc95a939e4f Reviewed-on: https://review.openstack.org/20948 Reviewed-by: Jeremy Stanley Approved: James E. Blair Tested-by: Jenkins --- doc/source/zuul.rst | 7 ++ tests/fixtures/layout.yaml | 1 + tests/test_scheduler.py | 188 ++++++++++++++++++++++++++++++++++++- zuul/layoutvalidator.py | 1 + zuul/model.py | 12 +++ zuul/scheduler.py | 67 ++++++++++++- 6 files changed, 272 insertions(+), 4 deletions(-) diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst index f1578aca99..9fdf04d5a9 100644 --- a/doc/source/zuul.rst +++ b/doc/source/zuul.rst @@ -277,6 +277,13 @@ explanation of each of the parameters:: containing 'retrigger' somewhere in the comment text are added to a change. +**dequeue-on-new-patchset** + Normally, if a new patchset is uploaded to a change that is in a + pipeline, the existing entry in the pipeline will be removed (with + jobs canceled and any dependent changes that can no longer merge as + well. To suppress this behavior (and allow jobs to continue + running), set this to ``false``. Default: ``true``. + **success** Describes what Zuul should do if all the jobs complete successfully. This section is optional; if it is omitted, Zuul will run jobs and diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml index 586e2a5841..cab97b9068 100644 --- a/tests/fixtures/layout.yaml +++ b/tests/fixtures/layout.yaml @@ -31,6 +31,7 @@ pipelines: - name: unused manager: IndependentPipelineManager + dequeue-on-new-patchset: false trigger: - event: comment-added approval: diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 20ad4ec3b0..aa7001e014 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -1332,7 +1332,7 @@ class testScheduler(unittest.TestCase): self.assertEmptyQueues() def test_head_is_dequeued_once(self): - "Test that if a change at the head fails it is dequeud only once" + "Test that if a change at the head fails it is dequeued only once" # If it's dequeued more than once, we should see extra # aborted jobs. self.fake_jenkins.hold_jobs_in_build = True @@ -1669,3 +1669,189 @@ class testScheduler(unittest.TestCase): jobs = self.fake_jenkins.job_history assert len(jobs) == 0 self.assertEmptyQueues() + + def test_new_patchset_dequeues_old(self): + "Test that a new patchset causes the old to be dequeued" + # D -> C (depends on B) -> B (depends on A) -> A -> M + self.fake_jenkins.hold_jobs_in_build = True + + M = self.fake_gerrit.addFakeChange('org/project', 'master', 'M') + M.setMerged() + + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') + D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D') + A.addApproval('CRVW', 2) + B.addApproval('CRVW', 2) + C.addApproval('CRVW', 2) + D.addApproval('CRVW', 2) + + C.setDependsOn(B, 1) + B.setDependsOn(A, 1) + A.setDependsOn(M, 1) + + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(D.addApproval('APRV', 1)) + self.waitUntilSettled() + + B.addPatchset() + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2)) + self.waitUntilSettled() + + self.fake_jenkins.hold_jobs_in_build = False + self.fake_jenkins.fakeRelease() + self.waitUntilSettled() + + jobs = self.fake_jenkins.all_jobs + finished_jobs = self.fake_jenkins.job_history + + for x in jobs: + print x + for x in finished_jobs: + print x + + assert A.data['status'] == 'MERGED' + assert A.reported == 2 + assert B.data['status'] == 'NEW' + assert B.reported == 2 + assert C.data['status'] == 'NEW' + assert C.reported == 2 + assert D.data['status'] == 'MERGED' + assert D.reported == 2 + assert len(finished_jobs) == 9 # 3 each for A, B, D. + self.assertEmptyQueues() + + def test_new_patchset_dequeues_old_on_head(self): + "Test that a new patchset causes the old to be dequeued (at head)" + # D -> C (depends on B) -> B (depends on A) -> A -> M + self.fake_jenkins.hold_jobs_in_build = True + + M = self.fake_gerrit.addFakeChange('org/project', 'master', 'M') + M.setMerged() + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') + D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D') + A.addApproval('CRVW', 2) + B.addApproval('CRVW', 2) + C.addApproval('CRVW', 2) + D.addApproval('CRVW', 2) + + C.setDependsOn(B, 1) + B.setDependsOn(A, 1) + A.setDependsOn(M, 1) + + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(D.addApproval('APRV', 1)) + self.waitUntilSettled() + + A.addPatchset() + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2)) + self.waitUntilSettled() + + self.fake_jenkins.hold_jobs_in_build = False + self.fake_jenkins.fakeRelease() + self.waitUntilSettled() + + jobs = self.fake_jenkins.all_jobs + finished_jobs = self.fake_jenkins.job_history + + for x in jobs: + print x + for x in finished_jobs: + print x + + assert A.data['status'] == 'NEW' + assert A.reported == 2 + assert B.data['status'] == 'NEW' + assert B.reported == 2 + assert C.data['status'] == 'NEW' + assert C.reported == 2 + assert D.data['status'] == 'MERGED' + assert D.reported == 2 + assert len(finished_jobs) == 7 + self.assertEmptyQueues() + + def test_new_patchset_dequeues_old_without_dependents(self): + "Test that a new patchset causes only the old to be dequeued" + self.fake_jenkins.hold_jobs_in_build = True + + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') + A.addApproval('CRVW', 2) + B.addApproval('CRVW', 2) + C.addApproval('CRVW', 2) + + self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.waitUntilSettled() + + B.addPatchset() + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2)) + self.waitUntilSettled() + + self.fake_jenkins.hold_jobs_in_build = False + self.fake_jenkins.fakeRelease() + self.waitUntilSettled() + + jobs = self.fake_jenkins.all_jobs + finished_jobs = self.fake_jenkins.job_history + + for x in jobs: + print x + for x in finished_jobs: + print x + + assert A.data['status'] == 'MERGED' + assert A.reported == 2 + assert B.data['status'] == 'NEW' + assert B.reported == 2 + assert C.data['status'] == 'MERGED' + assert C.reported == 2 + assert len(finished_jobs) == 9 + self.assertEmptyQueues() + + def test_new_patchset_dequeues_old_independent_queue(self): + "Test that a new patchset causes the old to be dequeued (independent)" + self.fake_jenkins.hold_jobs_in_build = True + + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + B.addPatchset() + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2)) + self.waitUntilSettled() + + self.fake_jenkins.hold_jobs_in_build = False + self.fake_jenkins.fakeRelease() + self.waitUntilSettled() + + jobs = self.fake_jenkins.all_jobs + finished_jobs = self.fake_jenkins.job_history + + for x in jobs: + print x + for x in finished_jobs: + print x + + assert A.data['status'] == 'NEW' + assert A.reported == 1 + assert B.data['status'] == 'NEW' + assert B.reported == 1 + assert C.data['status'] == 'NEW' + assert C.reported == 1 + assert len(finished_jobs) == 10 + assert self.countJobResults(finished_jobs, 'ABORTED') == 1 + self.assertEmptyQueues() diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py index 8d2177343d..e57cabb775 100644 --- a/zuul/layoutvalidator.py +++ b/zuul/layoutvalidator.py @@ -47,6 +47,7 @@ class LayoutSchema(object): 'description': str, 'success-message': str, 'failure-message': str, + 'dequeue-on-new-patchset': bool, 'trigger': toList(trigger), 'success': variable_dict, 'failure': variable_dict, diff --git a/zuul/model.py b/zuul/model.py index 9a60049e01..eb4700786c 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -30,6 +30,7 @@ class Pipeline(object): self.description = None self.failure_message = None self.success_message = None + self.dequeue_on_new_patchset = True self.job_trees = {} # project -> JobTree self.manager = None self.queues = [] @@ -503,6 +504,9 @@ class Changeish(object): def equals(self, other): raise NotImplementedError() + def isUpdateOf(self, other): + raise NotImplementedError() + def filterJobs(self, jobs): return filter(lambda job: job.changeMatches(self), jobs) @@ -547,6 +551,11 @@ class Change(Changeish): return True return False + def isUpdateOf(self, other): + if self.number == other.number and self.patchset > other.patchset: + return True + return False + def setReportedResult(self, result): self.current_build_set.result = result @@ -585,6 +594,9 @@ class Ref(Changeish): return True return False + def isUpdateOf(self, other): + return False + class TriggerEvent(object): def __init__(self): diff --git a/zuul/scheduler.py b/zuul/scheduler.py index f8d607f6c6..97937aa3c6 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -100,6 +100,9 @@ class Scheduler(threading.Thread): "Build failed.") pipeline.success_message = conf_pipeline.get('success-message', "Build succeeded.") + pipeline.dequeue_on_new_patchset = conf_pipeline.get( + 'dequeue-on-new-patchset', + True) manager = globals()[conf_pipeline['manager']](self, pipeline) pipeline.setManager(manager) @@ -401,10 +404,12 @@ class Scheduler(threading.Thread): return for pipeline in self.pipelines.values(): + change = event.getChange(project, self.trigger) + if event.type == 'patchset-created': + pipeline.manager.removeOldVersionsOfChange(change) if not pipeline.manager.eventMatches(event): self.log.debug("Event %s ignored by %s" % (event, pipeline)) continue - change = event.getChange(project, self.trigger) self.log.info("Adding %s, %s to %s" % (project, change, pipeline)) pipeline.manager.addChange(change) @@ -572,6 +577,22 @@ class BasePipelineManager(object): def enqueueChangesBehind(self, change): return True + def findOldVersionOfChangeAlreadyInQueue(self, change): + for c in self.pipeline.getChangesInQueue(): + if change.isUpdateOf(c): + return c + return None + + def removeOldVersionsOfChange(self, change): + if not self.pipeline.dequeue_on_new_patchset: + return + old_change = self.findOldVersionOfChangeAlreadyInQueue(change) + if old_change: + self.log.debug("Change %s is a new version of %s, removing %s" % + (change, old_change, old_change)) + self.removeChange(old_change) + self.launchJobs() + def addChange(self, change): self.log.debug("Considering adding change %s" % change) if self.isChangeAlreadyInQueue(change): @@ -606,6 +627,37 @@ class BasePipelineManager(object): return False self.launchJobs() + def cancelJobs(self, change, prime=True): + self.log.debug("Cancel jobs for change %s" % change) + to_remove = [] + for build, build_change in self.building_jobs.items(): + if build_change == change: + self.log.debug("Found build %s for change %s to cancel" % + (build, change)) + try: + self.sched.launcher.cancel(build) + except: + self.log.exception("Exception while canceling build %s " + "for change %s" % (build, change)) + to_remove.append(build) + for build in to_remove: + self.log.debug("Removing build %s from running builds" % build) + build.result = 'CANCELED' + del self.building_jobs[build] + + def dequeueChange(self, change): + self.log.debug("Removing change %s from queue" % change) + change_queue = self.pipeline.getQueue(change.project) + change_queue.dequeueChange(change) + + def removeChange(self, change): + # Remove a change from the queue, probably because it has been + # superceded by another change. + self.log.debug("Canceling builds behind change: %s because it is " + "being removed." % change) + self.cancelJobs(change) + self.dequeueChange(change) + def _launchJobs(self, change, jobs): self.log.debug("Launching jobs for change %s" % change) ref = change.current_build_set.ref @@ -1126,6 +1178,15 @@ class DependentPipelineManager(BasePipelineManager): (change.change_behind, change)) self.cancelJobs(change.change_behind, prime=prime) + def removeChange(self, change): + # Remove a change from the queue (even the middle), probably + # because it has been superceded by another change (or + # otherwise will not merge). + self.log.debug("Canceling builds behind change: %s because it is " + "being removed." % change) + self.cancelJobs(change, prime=False) + self.dequeueChange(change, keep_severed_heads=False) + def handleFailedChange(self, change): # A build failed. All changes behind this change will need to # be retested. To free up resources cancel the builds behind @@ -1145,13 +1206,13 @@ class DependentPipelineManager(BasePipelineManager): "failure." % change) self.cancelJobs(change_behind, prime=False) - def dequeueChange(self, change): + def dequeueChange(self, change, keep_severed_heads=True): self.log.debug("Removing change %s from queue" % change) change_ahead = change.change_ahead change_behind = change.change_behind change_queue = self.pipeline.getQueue(change.project) change_queue.dequeueChange(change) - if not change_ahead and not change.reported: + if keep_severed_heads and not change_ahead and not change.reported: self.log.debug("Adding %s as a severed head" % change) change_queue.addSeveredHead(change) self.dequeueDependentChanges(change_behind)