Dequeue changes when new patchset created.

When a new patchset is created for a change that is in a pipeline,
cancel running builds and dequeue that change (and possibly
dependent changes that can no longer merge).

Make this optional (and document the option).

Fixes bug 1022643.

Change-Id: I8f591956cf86645443e4b6075b8cdfc95a939e4f
Reviewed-on: https://review.openstack.org/20948
Reviewed-by: Jeremy Stanley <fungi@yuggoth.org>
Approved: James E. Blair <corvus@inaugust.com>
Tested-by: Jenkins
changes/59/22159/1
James E. Blair 10 years ago committed by Jenkins
parent ffef9e4e6b
commit 2fa5096064
  1. 7
      doc/source/zuul.rst
  2. 1
      tests/fixtures/layout.yaml
  3. 188
      tests/test_scheduler.py
  4. 1
      zuul/layoutvalidator.py
  5. 12
      zuul/model.py
  6. 67
      zuul/scheduler.py

@ -277,6 +277,13 @@ explanation of each of the parameters::
containing 'retrigger' somewhere in the comment text are added to a
change.
**dequeue-on-new-patchset**
Normally, if a new patchset is uploaded to a change that is in a
pipeline, the existing entry in the pipeline will be removed (with
jobs canceled and any dependent changes that can no longer merge as
well. To suppress this behavior (and allow jobs to continue
running), set this to ``false``. Default: ``true``.
**success**
Describes what Zuul should do if all the jobs complete successfully.
This section is optional; if it is omitted, Zuul will run jobs and

@ -31,6 +31,7 @@ pipelines:
- name: unused
manager: IndependentPipelineManager
dequeue-on-new-patchset: false
trigger:
- event: comment-added
approval:

@ -1332,7 +1332,7 @@ class testScheduler(unittest.TestCase):
self.assertEmptyQueues()
def test_head_is_dequeued_once(self):
"Test that if a change at the head fails it is dequeud only once"
"Test that if a change at the head fails it is dequeued only once"
# If it's dequeued more than once, we should see extra
# aborted jobs.
self.fake_jenkins.hold_jobs_in_build = True
@ -1669,3 +1669,189 @@ class testScheduler(unittest.TestCase):
jobs = self.fake_jenkins.job_history
assert len(jobs) == 0
self.assertEmptyQueues()
def test_new_patchset_dequeues_old(self):
"Test that a new patchset causes the old to be dequeued"
# D -> C (depends on B) -> B (depends on A) -> A -> M
self.fake_jenkins.hold_jobs_in_build = True
M = self.fake_gerrit.addFakeChange('org/project', 'master', 'M')
M.setMerged()
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
A.addApproval('CRVW', 2)
B.addApproval('CRVW', 2)
C.addApproval('CRVW', 2)
D.addApproval('CRVW', 2)
C.setDependsOn(B, 1)
B.setDependsOn(A, 1)
A.setDependsOn(M, 1)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
self.fake_gerrit.addEvent(D.addApproval('APRV', 1))
self.waitUntilSettled()
B.addPatchset()
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2))
self.waitUntilSettled()
self.fake_jenkins.hold_jobs_in_build = False
self.fake_jenkins.fakeRelease()
self.waitUntilSettled()
jobs = self.fake_jenkins.all_jobs
finished_jobs = self.fake_jenkins.job_history
for x in jobs:
print x
for x in finished_jobs:
print x
assert A.data['status'] == 'MERGED'
assert A.reported == 2
assert B.data['status'] == 'NEW'
assert B.reported == 2
assert C.data['status'] == 'NEW'
assert C.reported == 2
assert D.data['status'] == 'MERGED'
assert D.reported == 2
assert len(finished_jobs) == 9 # 3 each for A, B, D.
self.assertEmptyQueues()
def test_new_patchset_dequeues_old_on_head(self):
"Test that a new patchset causes the old to be dequeued (at head)"
# D -> C (depends on B) -> B (depends on A) -> A -> M
self.fake_jenkins.hold_jobs_in_build = True
M = self.fake_gerrit.addFakeChange('org/project', 'master', 'M')
M.setMerged()
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
A.addApproval('CRVW', 2)
B.addApproval('CRVW', 2)
C.addApproval('CRVW', 2)
D.addApproval('CRVW', 2)
C.setDependsOn(B, 1)
B.setDependsOn(A, 1)
A.setDependsOn(M, 1)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
self.fake_gerrit.addEvent(D.addApproval('APRV', 1))
self.waitUntilSettled()
A.addPatchset()
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2))
self.waitUntilSettled()
self.fake_jenkins.hold_jobs_in_build = False
self.fake_jenkins.fakeRelease()
self.waitUntilSettled()
jobs = self.fake_jenkins.all_jobs
finished_jobs = self.fake_jenkins.job_history
for x in jobs:
print x
for x in finished_jobs:
print x
assert A.data['status'] == 'NEW'
assert A.reported == 2
assert B.data['status'] == 'NEW'
assert B.reported == 2
assert C.data['status'] == 'NEW'
assert C.reported == 2
assert D.data['status'] == 'MERGED'
assert D.reported == 2
assert len(finished_jobs) == 7
self.assertEmptyQueues()
def test_new_patchset_dequeues_old_without_dependents(self):
"Test that a new patchset causes only the old to be dequeued"
self.fake_jenkins.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
A.addApproval('CRVW', 2)
B.addApproval('CRVW', 2)
C.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.waitUntilSettled()
B.addPatchset()
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2))
self.waitUntilSettled()
self.fake_jenkins.hold_jobs_in_build = False
self.fake_jenkins.fakeRelease()
self.waitUntilSettled()
jobs = self.fake_jenkins.all_jobs
finished_jobs = self.fake_jenkins.job_history
for x in jobs:
print x
for x in finished_jobs:
print x
assert A.data['status'] == 'MERGED'
assert A.reported == 2
assert B.data['status'] == 'NEW'
assert B.reported == 2
assert C.data['status'] == 'MERGED'
assert C.reported == 2
assert len(finished_jobs) == 9
self.assertEmptyQueues()
def test_new_patchset_dequeues_old_independent_queue(self):
"Test that a new patchset causes the old to be dequeued (independent)"
self.fake_jenkins.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
B.addPatchset()
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2))
self.waitUntilSettled()
self.fake_jenkins.hold_jobs_in_build = False
self.fake_jenkins.fakeRelease()
self.waitUntilSettled()
jobs = self.fake_jenkins.all_jobs
finished_jobs = self.fake_jenkins.job_history
for x in jobs:
print x
for x in finished_jobs:
print x
assert A.data['status'] == 'NEW'
assert A.reported == 1
assert B.data['status'] == 'NEW'
assert B.reported == 1
assert C.data['status'] == 'NEW'
assert C.reported == 1
assert len(finished_jobs) == 10
assert self.countJobResults(finished_jobs, 'ABORTED') == 1
self.assertEmptyQueues()

@ -47,6 +47,7 @@ class LayoutSchema(object):
'description': str,
'success-message': str,
'failure-message': str,
'dequeue-on-new-patchset': bool,
'trigger': toList(trigger),
'success': variable_dict,
'failure': variable_dict,

@ -30,6 +30,7 @@ class Pipeline(object):
self.description = None
self.failure_message = None
self.success_message = None
self.dequeue_on_new_patchset = True
self.job_trees = {} # project -> JobTree
self.manager = None
self.queues = []
@ -503,6 +504,9 @@ class Changeish(object):
def equals(self, other):
raise NotImplementedError()
def isUpdateOf(self, other):
raise NotImplementedError()
def filterJobs(self, jobs):
return filter(lambda job: job.changeMatches(self), jobs)
@ -547,6 +551,11 @@ class Change(Changeish):
return True
return False
def isUpdateOf(self, other):
if self.number == other.number and self.patchset > other.patchset:
return True
return False
def setReportedResult(self, result):
self.current_build_set.result = result
@ -585,6 +594,9 @@ class Ref(Changeish):
return True
return False
def isUpdateOf(self, other):
return False
class TriggerEvent(object):
def __init__(self):

@ -100,6 +100,9 @@ class Scheduler(threading.Thread):
"Build failed.")
pipeline.success_message = conf_pipeline.get('success-message',
"Build succeeded.")
pipeline.dequeue_on_new_patchset = conf_pipeline.get(
'dequeue-on-new-patchset',
True)
manager = globals()[conf_pipeline['manager']](self, pipeline)
pipeline.setManager(manager)
@ -401,10 +404,12 @@ class Scheduler(threading.Thread):
return
for pipeline in self.pipelines.values():
change = event.getChange(project, self.trigger)
if event.type == 'patchset-created':
pipeline.manager.removeOldVersionsOfChange(change)
if not pipeline.manager.eventMatches(event):
self.log.debug("Event %s ignored by %s" % (event, pipeline))
continue
change = event.getChange(project, self.trigger)
self.log.info("Adding %s, %s to %s" %
(project, change, pipeline))
pipeline.manager.addChange(change)
@ -572,6 +577,22 @@ class BasePipelineManager(object):
def enqueueChangesBehind(self, change):
return True
def findOldVersionOfChangeAlreadyInQueue(self, change):
for c in self.pipeline.getChangesInQueue():
if change.isUpdateOf(c):
return c
return None
def removeOldVersionsOfChange(self, change):
if not self.pipeline.dequeue_on_new_patchset:
return
old_change = self.findOldVersionOfChangeAlreadyInQueue(change)
if old_change:
self.log.debug("Change %s is a new version of %s, removing %s" %
(change, old_change, old_change))
self.removeChange(old_change)
self.launchJobs()
def addChange(self, change):
self.log.debug("Considering adding change %s" % change)
if self.isChangeAlreadyInQueue(change):
@ -606,6 +627,37 @@ class BasePipelineManager(object):
return False
self.launchJobs()
def cancelJobs(self, change, prime=True):
self.log.debug("Cancel jobs for change %s" % change)
to_remove = []
for build, build_change in self.building_jobs.items():
if build_change == change:
self.log.debug("Found build %s for change %s to cancel" %
(build, change))
try:
self.sched.launcher.cancel(build)
except:
self.log.exception("Exception while canceling build %s "
"for change %s" % (build, change))
to_remove.append(build)
for build in to_remove:
self.log.debug("Removing build %s from running builds" % build)
build.result = 'CANCELED'
del self.building_jobs[build]
def dequeueChange(self, change):
self.log.debug("Removing change %s from queue" % change)
change_queue = self.pipeline.getQueue(change.project)
change_queue.dequeueChange(change)
def removeChange(self, change):
# Remove a change from the queue, probably because it has been
# superceded by another change.
self.log.debug("Canceling builds behind change: %s because it is "
"being removed." % change)
self.cancelJobs(change)
self.dequeueChange(change)
def _launchJobs(self, change, jobs):
self.log.debug("Launching jobs for change %s" % change)
ref = change.current_build_set.ref
@ -1126,6 +1178,15 @@ class DependentPipelineManager(BasePipelineManager):
(change.change_behind, change))
self.cancelJobs(change.change_behind, prime=prime)
def removeChange(self, change):
# Remove a change from the queue (even the middle), probably
# because it has been superceded by another change (or
# otherwise will not merge).
self.log.debug("Canceling builds behind change: %s because it is "
"being removed." % change)
self.cancelJobs(change, prime=False)
self.dequeueChange(change, keep_severed_heads=False)
def handleFailedChange(self, change):
# A build failed. All changes behind this change will need to
# be retested. To free up resources cancel the builds behind
@ -1145,13 +1206,13 @@ class DependentPipelineManager(BasePipelineManager):
"failure." % change)
self.cancelJobs(change_behind, prime=False)
def dequeueChange(self, change):
def dequeueChange(self, change, keep_severed_heads=True):
self.log.debug("Removing change %s from queue" % change)
change_ahead = change.change_ahead
change_behind = change.change_behind
change_queue = self.pipeline.getQueue(change.project)
change_queue.dequeueChange(change)
if not change_ahead and not change.reported:
if keep_severed_heads and not change_ahead and not change.reported:
self.log.debug("Adding %s as a severed head" % change)
change_queue.addSeveredHead(change)
self.dequeueDependentChanges(change_behind)

Loading…
Cancel
Save