Refactor change queue.

Move most of the change queue logic into the model.  This should
be a little cleaner, and it should make it easier to follow what
happens when a build completes.
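
For orientation, a condensed sketch of the model-side bookkeeping
this introduces (names follow the diff below; the real classes
carry more state and methods than shown):

    class ChangeQueue(object):
        """Holds the changes for one or more projects in a pipeline."""
        def __init__(self, pipeline, dependent=True):
            self.pipeline = pipeline
            self.projects = []
            self.queue = []           # changes currently being tested
            self.severed_heads = []   # failed heads kept until reported
            self.dependent = dependent

    class Pipeline(object):
        """Owns its change queues; managers only orchestrate them."""
        def __init__(self, name):
            self.name = name
            self.queues = []

        def addQueue(self, queue):
            self.queues.append(queue)

        def getQueue(self, project):
            # Find the (possibly shared) queue responsible for a project.
            for queue in self.queues:
                if project in queue.projects:
                    return queue
            return None

        def getAllChanges(self):
            # Every change the pipeline currently knows about.
            changes = []
            for queue in self.queues:
                changes.extend(queue.queue)
                changes.extend(queue.severed_heads)
            return changes

With the queues on the pipeline, managers and the status page can
ask the model directly instead of reconstructing state from the
manager's building_jobs map.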

Run launchJobs over the whole queue.
Collapse addChange into the base manager.

This makes the logic around adding a change simpler to read.
Several hooks are provided so that the dependent manager subclass
can extend the method to handle dependent changes.
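
Roughly, the collapsed addChange() now follows the skeleton below
(simplified from the diff: logging, reportStart and error handling
are omitted, and the hooks are the ones DependentPipelineManager
overrides):

    class BasePipelineManager(object):
        def __init__(self, pipeline):
            self.pipeline = pipeline

        # Hooks: the independent manager keeps these defaults; the
        # dependent manager overrides them for dependent changes.
        def isChangeReadyToBeEnqueued(self, change):
            return True

        def enqueueChangesAhead(self, change):
            return True

        def enqueueChangesBehind(self, change):
            return True

        def isChangeAlreadyInQueue(self, change):
            return any(change.equals(c)
                       for c in self.pipeline.getChangesInQueue())

        def addChange(self, change):
            if not self.isChangeReadyToBeEnqueued(change):
                return False      # e.g. the change can not merge
            if not self.enqueueChangesAhead(change):
                return False      # a needed change could not be enqueued
            if self.isChangeAlreadyInQueue(change):
                return True       # enqueueChangesAhead may have added it
            change_queue = self.pipeline.getQueue(change.project)
            if not change_queue:
                return False
            change_queue.enqueueChange(change)
            self.enqueueChangesBehind(change)
            self.launchJobs()
            return True

        def launchJobs(self, change=None):
            pass  # runs over the whole queue; see the diff below

DependentPipelineManager implements these hooks with canMerge
checks and the needs-change / needed-by logic that used to live in
its own addChange().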

Move onBuildCompleted and the reporting methods to the base class.

This simplifies the code path when a build completes.  Move to a
more generalized model: apply updates to changes as they are
received, then search for changes that are ready to be reported,
then search for jobs that need to be launched.
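
A function-style paraphrase of that flow, using the method names
from the diff (the real code lives on BasePipelineManager and also
handles logging and build-description updates):

    def on_build_completed(manager, build):
        pipeline = manager.pipeline
        change = manager.building_jobs.pop(build)

        # 1. Apply the update to the change as it is received.
        pipeline.setResult(change, build)
        if pipeline.didAnyJobFail(change):
            # The dependent manager cancels and requeues changes behind
            # a failed head; the independent manager does nothing here.
            manager.handleFailedChange(change)

        # 2. Report any change whose jobs are all complete.  Reporting
        #    a head dequeues it, which may make the change behind it
        #    reportable, so repeat until nothing more is reported.
        reported = True
        while reported:
            reported = False
            for c in pipeline.getAllChanges():
                if pipeline.areAllJobsComplete(c):
                    if manager.possiblyReportChange(c):
                        reported = True

        # 3. Launch any jobs that have become runnable anywhere in the
        #    queue, not just behind the change that just finished.
        for c in pipeline.getAllChanges():
            jobs = pipeline.findJobsToRun(c)
            if jobs:
                manager._launchJobs(c, jobs)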

Also,
 * test check queue functionality
 * assert all queues are empty at ends of tests
 * move formatting methods to pipeline/queue classes
 * add queue-only mode notification to status screen

Change-Id: I336ac289af6ebd23567ac54b359c9f38af7f2ac7
Reviewed-on: https://review.openstack.org/12277
Reviewed-by: Clark Boylan <clark.boylan@gmail.com>
Approved: James E. Blair <corvus@inaugust.com>
Tested-by: Jenkins
Author: James E. Blair, 2012-08-29 17:38:31 -07:00 (committed by Jenkins)
Parent: 127bc18b87
Commit: e048707033
4 changed files with 377 additions and 267 deletions

@ -2,7 +2,7 @@ pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
- event: patchset-uploaded
- event: patchset-created
success:
verified: 1
failure:

@ -203,6 +203,19 @@ class FakeChange(object):
self.patchsets.append(d)
self.data['submitRecords'] = self.getSubmitRecords()
def getPatchsetCreatedEvent(self, patchset):
event = {"type": "patchset-created",
"change": {"project": self.project,
"branch": self.branch,
"id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
"number": str(self.number),
"subject": self.subject,
"owner": {"name": "User Name"},
"url": "https://hostname/3"},
"patchSet": self.patchsets[patchset - 1],
"uploader": {"name": "User Name"}}
return event
def addApproval(self, category, value):
approval = {'description': self.categories[category][0],
'type': category,
@ -662,6 +675,17 @@ class testScheduler(unittest.TestCase):
jobs = filter(lambda x: x['result'] == result, jobs)
return len(jobs)
def assertEmptyQueues(self):
# Make sure there are no orphaned jobs
for pipeline in self.sched.pipelines.values():
for queue in pipeline.queues:
if len(queue.queue) != 0:
print 'queue', queue.queue
assert len(queue.queue) == 0
if len(queue.severed_heads) != 0:
print 'heads', queue.severed_heads
assert len(queue.severed_heads) == 0
def test_jobs_launched(self):
"Test that jobs are launched and a change is merged"
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@ -678,6 +702,7 @@ class testScheduler(unittest.TestCase):
assert jobs[2]['result'] == 'SUCCESS'
assert A.data['status'] == 'MERGED'
assert A.reported == 2
self.assertEmptyQueues()
def test_parallel_changes(self):
"Test that changes are tested in parallel and merged in series"
@ -756,6 +781,7 @@ class testScheduler(unittest.TestCase):
assert A.reported == 2
assert B.reported == 2
assert C.reported == 2
self.assertEmptyQueues()
def test_failed_changes(self):
"Test that a change behind a failed change is retested"
@ -776,6 +802,7 @@ class testScheduler(unittest.TestCase):
assert B.data['status'] == 'MERGED'
assert A.reported == 2
assert B.reported == 2
self.assertEmptyQueues()
def test_independent_queues(self):
"Test that changes end up in the right queues"
@ -824,6 +851,7 @@ class testScheduler(unittest.TestCase):
assert A.reported == 2
assert B.reported == 2
assert C.reported == 2
self.assertEmptyQueues()
def test_failed_change_at_head(self):
"Test that if a change at the head fails, jobs behind it are canceled"
@ -883,6 +911,7 @@ class testScheduler(unittest.TestCase):
assert A.reported == 2
assert B.reported == 2
assert C.reported == 2
self.assertEmptyQueues()
def test_failed_change_at_head_with_queue(self):
"Test that if a change at the head fails, queued jobs are canceled"
@ -946,6 +975,7 @@ class testScheduler(unittest.TestCase):
assert A.reported == 2
assert B.reported == 2
assert C.reported == 2
self.assertEmptyQueues()
def test_patch_order(self):
"Test that dependent patches are tested in the right order"
@ -990,6 +1020,7 @@ class testScheduler(unittest.TestCase):
assert A.reported == 2
assert B.reported == 2
assert C.reported == 2
self.assertEmptyQueues()
def test_can_merge(self):
"Test whether a change is ready to merge"
@ -1006,8 +1037,7 @@ class testScheduler(unittest.TestCase):
A.addApproval('APRV', 1)
a = self.sched.trigger.getChange(1, 2)
assert self.sched.trigger.canMerge(a, mgr.getSubmitAllowNeeds())
return True
self.assertEmptyQueues()
def test_build_configuration(self):
"Test that zuul merges the right commits for testing"
@ -1043,6 +1073,7 @@ class testScheduler(unittest.TestCase):
repo_messages.reverse()
correct_messages = ['initial commit', 'A-1', 'B-1', 'C-1']
assert repo_messages == correct_messages
self.assertEmptyQueues()
def test_build_configuration_conflict(self):
"Test that merge conflicts are handled"
@ -1079,6 +1110,7 @@ class testScheduler(unittest.TestCase):
assert A.reported == 2
assert B.reported == 2
assert C.reported == 2
self.assertEmptyQueues()
def test_post(self):
"Test that post jobs run"
@ -1096,6 +1128,7 @@ class testScheduler(unittest.TestCase):
job_names = [x['name'] for x in jobs]
assert len(jobs) == 1
assert 'project-post' in job_names
self.assertEmptyQueues()
def test_build_configuration_branch(self):
"Test that the right commits are on alternate branches"
@ -1130,6 +1163,7 @@ class testScheduler(unittest.TestCase):
repo_messages.reverse()
correct_messages = ['initial commit', 'mp commit', 'A-1', 'B-1', 'C-1']
assert repo_messages == correct_messages
self.assertEmptyQueues()
def test_build_configuration_branch_interaction(self):
"Test that switching between branches works"
@ -1140,6 +1174,7 @@ class testScheduler(unittest.TestCase):
repo = git.Repo(path)
repo.heads.master.commit = repo.commit('init')
self.test_build_configuration()
self.assertEmptyQueues()
def test_build_configuration_multi_branch(self):
"Test that dependent changes on multiple branches are merged"
@ -1183,6 +1218,7 @@ class testScheduler(unittest.TestCase):
repo_messages.reverse()
correct_messages = ['initial commit', 'mp commit', 'B-1']
assert repo_messages == correct_messages
self.assertEmptyQueues()
def test_one_job_project(self):
"Test that queueing works with one job"
@ -1203,6 +1239,7 @@ class testScheduler(unittest.TestCase):
assert A.reported == 2
assert B.data['status'] == 'MERGED'
assert B.reported == 2
self.assertEmptyQueues()
def test_dependent_changes_dequeue(self):
"Test that dependent patches are not needlessly tested"
@ -1245,6 +1282,7 @@ class testScheduler(unittest.TestCase):
assert C.data['status'] == 'NEW'
assert C.reported == 2
assert len(finished_jobs) == 1
self.assertEmptyQueues()
def test_head_is_dequeued_once(self):
"Test that if a change at the head fails it is dequeud only once"
@ -1312,6 +1350,7 @@ class testScheduler(unittest.TestCase):
assert A.reported == 2
assert B.reported == 2
assert C.reported == 2
self.assertEmptyQueues()
def test_nonvoting_job(self):
"Test that non-voting jobs don't vote."
@ -1330,6 +1369,40 @@ class testScheduler(unittest.TestCase):
assert finished_jobs[0]['result'] == 'SUCCESS'
assert finished_jobs[1]['result'] == 'SUCCESS'
assert finished_jobs[2]['result'] == 'FAILURE'
self.assertEmptyQueues()
def test_check_queue_success(self):
"Test successful check queue jobs."
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
jobs = self.fake_jenkins.all_jobs
finished_jobs = self.fake_jenkins.job_history
assert A.data['status'] == 'NEW'
assert A.reported == 1
assert finished_jobs[0]['result'] == 'SUCCESS'
assert finished_jobs[1]['result'] == 'SUCCESS'
assert finished_jobs[2]['result'] == 'SUCCESS'
self.assertEmptyQueues()
def test_check_queue_failure(self):
"Test failed check queue jobs."
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_jenkins.fakeAddFailTest('project-test2', A)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
jobs = self.fake_jenkins.all_jobs
finished_jobs = self.fake_jenkins.job_history
assert A.data['status'] == 'NEW'
assert A.reported == 1
assert finished_jobs[0]['result'] == 'SUCCESS'
assert finished_jobs[1]['result'] == 'SUCCESS'
assert finished_jobs[2]['result'] == 'FAILURE'
self.assertEmptyQueues()
def test_dependent_behind_dequeue(self):
"test that dependent changes behind dequeued changes work"
@ -1417,9 +1490,6 @@ class testScheduler(unittest.TestCase):
assert E.reported == 2
assert F.reported == 2
# Make sure there are no orphaned jobs
for queue in self.sched.pipelines['gate'].manager.change_queues:
assert len(queue.queue) == 0
assert self.countJobResults(finished_jobs, 'ABORTED') == 15
assert len(finished_jobs) == 44
self.assertEmptyQueues()

@ -29,6 +29,7 @@ class Pipeline(object):
self.name = name
self.job_trees = {} # project -> JobTree
self.manager = None
self.queues = []
def __repr__(self):
return '<Pipeline %s>' % self.name
@ -44,6 +45,15 @@ class Pipeline(object):
def getProjects(self):
return self.job_trees.keys()
def addQueue(self, queue):
self.queues.append(queue)
def getQueue(self, project):
for queue in self.queues:
if project in queue.projects:
return queue
return None
def getJobTree(self, project):
tree = self.job_trees.get(project)
return tree
@ -152,6 +162,68 @@ class Pipeline(object):
fakebuild.result = 'SKIPPED'
changeish.addBuild(fakebuild)
def getChangesInQueue(self):
changes = []
for shared_queue in self.queues:
changes.extend(shared_queue.queue)
return changes
def getAllChanges(self):
changes = []
for shared_queue in self.queues:
changes.extend(shared_queue.queue)
changes.extend(shared_queue.severed_heads)
return changes
def formatStatusHTML(self):
ret = ''
for queue in self.queues:
if len(self.queues) > 1:
s = 'Change queue: %s' % queue.name
ret += s + '\n'
ret += '-' * len(s) + '\n'
for head in queue.getHeads():
ret += self.formatStatus(head, html=True)
return ret
def formatStatus(self, changeish, indent=0, html=False):
indent_str = ' ' * indent
ret = ''
if html and hasattr(changeish, 'url') and changeish.url is not None:
ret += '%sProject %s change <a href="%s">%s</a>\n' % (
indent_str,
changeish.project.name,
changeish.url,
changeish._id())
else:
ret += '%sProject %s change %s\n' % (indent_str,
changeish.project.name,
changeish._id())
for job in self.getJobs(changeish):
build = changeish.current_build_set.getBuild(job.name)
if build:
result = build.result
else:
result = None
job_name = job.name
if not job.voting:
voting = ' (non-voting)'
else:
voting = ''
if html:
if build:
url = build.url
else:
url = None
if url is not None:
job_name = '<a href="%s">%s</a>' % (url, job_name)
ret += '%s %s: %s%s' % (indent_str, job_name, result, voting)
ret += '\n'
if changeish.change_behind:
ret += '%sFollowed by:\n' % (indent_str)
ret += self.formatStatus(changeish.change_behind, indent + 2, html)
return ret
class ChangeQueue(object):
"""DependentPipelines have multiple parallel queues shared by
@ -159,12 +231,14 @@ class ChangeQueue(object):
a queue shared by interrelated projects foo and bar, and a second
queue for independent project baz. Pipelines have one or more
PipelineQueues."""
def __init__(self, pipeline):
def __init__(self, pipeline, dependent=True):
self.pipeline = pipeline
self.name = ''
self.projects = []
self._jobs = set()
self.queue = []
self.severed_heads = []
self.dependent = dependent
def __repr__(self):
return '<ChangeQueue %s: %s>' % (self.pipeline.name, self.name)
@ -182,19 +256,45 @@ class ChangeQueue(object):
def enqueueChange(self, change):
if self.queue:
self.queue[-1].change_behind = change
change.change_ahead = self.queue[-1]
change.change_ahead.change_behind = change
self.queue.append(change)
change.queue = self
def dequeueChange(self, change):
if change in self.queue:
self.queue.remove(change)
if change in self.severed_heads:
self.severed_heads.remove(change)
if change.change_ahead:
change.change_ahead.change_behind = change.change_behind
if change.change_behind:
change.change_behind.change_ahead = change.change_ahead
change.change_ahead = None
change.change_behind = None
def addSeveredHead(self, change):
self.severed_heads.append(change)
def mergeChangeQueue(self, other):
for project in other.projects:
self.addProject(project)
def getHead(self):
if not self.queue:
return None
return self.queue[0]
def getHeads(self):
heads = []
if self.dependent:
h = self.getHead()
if h:
heads.append(h)
else:
heads.extend(self.queue)
heads.extend(self.severed_heads)
return heads
class Project(object):
def __init__(self, name):
@ -337,11 +437,11 @@ class Changeish(object):
def __init__(self, project):
self.project = project
self.build_sets = []
self.change_ahead = None
self.change_behind = None
self.dequeued_needing_change = False
self.current_build_set = BuildSet(self)
self.build_sets.append(self.current_build_set)
self.change_ahead = None
self.change_behind = None
def equals(self, other):
raise NotImplementedError()
@ -360,12 +460,6 @@ class Changeish(object):
def addBuild(self, build):
self.current_build_set.addBuild(build)
def delete(self):
if self.change_behind:
self.change_behind.change_ahead = None
self.change_behind = None
self.queue.dequeueChange(self)
class Change(Changeish):
is_reportable = True

@ -293,6 +293,7 @@ class Scheduler(threading.Thread):
self._init()
self._parseConfig(self.config.get('zuul', 'layout_config'))
self._pause = False
self._reconfigure = False
self.reconfigure_complete_event.set()
def _areAllBuildsComplete(self):
@ -372,6 +373,15 @@ class Scheduler(threading.Thread):
def formatStatusHTML(self):
ret = '<html><pre>'
if self._pause:
ret += '<p><b>Queue only mode:</b> preparing to '
if self._reconfigure:
ret += 'reconfigure'
if self._exit:
ret += 'exit'
ret += ', queue length: %s' % self.trigger_event_queue.qsize()
ret += '</p>'
keys = self.pipelines.keys()
keys.sort()
for key in keys:
@ -379,7 +389,7 @@ class Scheduler(threading.Thread):
s = 'Pipeline: %s' % pipeline.name
ret += s + '\n'
ret += '-' * len(s) + '\n'
ret += pipeline.manager.formatStatusHTML()
ret += pipeline.formatStatusHTML()
ret += '\n'
ret += '</pre></html>'
return ret
@ -458,31 +468,64 @@ class BasePipelineManager(object):
return False
def isChangeAlreadyInQueue(self, change):
for c in self.getChangesInQueue():
for c in self.pipeline.getChangesInQueue():
if change.equals(c):
return True
return False
def reportStart(self, change):
try:
self.log.info("Reporting start, action %s change %s" %
(self.start_action, change))
msg = "Starting %s jobs." % self.pipeline.name
ret = self.sched.trigger.report(change, msg, self.start_action)
if ret:
self.log.error("Reporting change start %s received: %s" %
(change, ret))
except:
self.log.exception("Exception while reporting start:")
def isChangeReadyToBeEnqueued(self, change):
return True
def enqueueChangesAhead(self, change):
return True
def enqueueChangesBehind(self, change):
return True
def addChange(self, change):
self.log.debug("Considering adding change %s" % change)
if self.isChangeAlreadyInQueue(change):
self.log.debug("Change %s is already in queue, ignoring" % change)
return
self.log.debug("Adding change %s" % change)
self._addChange(change)
return True
def _addChange(self, change):
if self.start_action:
try:
self.log.info("Reporting start, action %s change %s" %
(self.start_action, change))
msg = "Starting %s jobs." % self.pipeline.name
ret = self.sched.trigger.report(change, msg, self.start_action)
if ret:
self.log.error("Reporting change start %s received: %s" %
(change, ret))
except:
self.log.exception("Exception while reporting start:")
self.launchJobs(change)
if not self.isChangeReadyToBeEnqueued(change):
self.log.debug("Change %s is not ready to be enqueued, ignoring" %
change)
return False
if not self.enqueueChangesAhead(change):
self.log.debug("Failed to enqueue changes ahead of" % change)
return False
if self.isChangeAlreadyInQueue(change):
self.log.debug("Change %s is already in queue, ignoring" % change)
return True
change_queue = self.pipeline.getQueue(change.project)
if change_queue:
self.log.debug("Adding change %s to queue %s" %
(change, change_queue))
if self.start_action:
self.reportStart(change)
change_queue.enqueueChange(change)
self.enqueueChangesBehind(change)
else:
self.log.error("Unable to find change queue for project %s" %
change.project)
return False
self.launchJobs()
def _launchJobs(self, change, jobs):
self.log.debug("Launching jobs for change %s" % change)
@ -510,7 +553,11 @@ class BasePipelineManager(object):
self.log.exception("Exception while launching job %s "
"for change %s:" % (job, change))
def launchJobs(self, change):
def launchJobs(self, change=None):
if not change:
for change in self.pipeline.getAllChanges():
self.launchJobs(change)
return
jobs = self.pipeline.findJobsToRun(change)
if jobs:
self._launchJobs(change, jobs)
@ -536,6 +583,9 @@ class BasePipelineManager(object):
self.updateBuildDescriptions(build.build_set)
return True
def handleFailedChange(self, change):
pass
def onBuildCompleted(self, build):
self.log.debug("Build %s completed" % build)
if build not in self.building_jobs:
@ -551,23 +601,58 @@ class BasePipelineManager(object):
self.pipeline.setResult(change, build)
self.log.info("Change %s status is now:\n %s" %
(change, self.formatStatus(change)))
(change, self.pipeline.formatStatus(change)))
if self.pipeline.areAllJobsComplete(change):
self.log.debug("All jobs for change %s are complete" % change)
self.possiblyReportChange(change)
else:
# There may be jobs that depended on jobs that are now complete
self.log.debug("All jobs for change %s are not yet complete" %
(change))
self.launchJobs(change)
if self.pipeline.didAnyJobFail(change):
self.handleFailedChange(change)
self.reportChanges()
self.launchJobs()
self.updateBuildDescriptions(build.build_set)
return True
def reportChanges(self):
self.log.debug("Searching for changes to report")
reported = False
for change in self.pipeline.getAllChanges():
self.log.debug(" checking %s" % change)
if self.pipeline.areAllJobsComplete(change):
self.log.debug(" possibly reporting %s" % change)
if self.possiblyReportChange(change):
reported = True
if reported:
self.reportChanges()
self.log.debug("Done searching for changes to report")
def possiblyReportChange(self, change):
self.log.debug("Possibly reporting change %s" % change)
self.reportChange(change)
# Even if a change isn't reportable, keep going so that it
# gets dequeued in the normal manner.
if change.is_reportable and change.reported:
self.log.debug("Change %s already reported" % change)
return False
change_ahead = change.change_ahead
if not change_ahead:
self.log.debug("Change %s is at the front of the queue, "
"reporting" % (change))
ret = self.reportChange(change)
if self.changes_merge:
succeeded = self.pipeline.didAllJobsSucceed(change)
merged = (not ret)
if merged:
merged = self.sched.trigger.isMerged(change, change.branch)
self.log.info("Reported change %s status: all-succeeded: %s, "
"merged: %s" % (change, succeeded, merged))
if not (succeeded and merged):
self.log.debug("Reported change %s failed tests or failed "
"to merge" % (change))
self.handleFailedChange(change)
return True
self.log.debug("Removing reported change %s from queue" %
change)
change_queue = self.pipeline.getQueue(change.project)
change_queue.dequeueChange(change)
return True
def reportChange(self, change):
if not change.is_reportable:
@ -583,6 +668,7 @@ class BasePipelineManager(object):
action = self.failure_action
change.setReportedResult('FAILURE')
report = self.formatReport(change)
change.reported = True
try:
self.log.info("Reporting change %s, action: %s" %
(change, action))
@ -590,67 +676,12 @@ class BasePipelineManager(object):
if ret:
self.log.error("Reporting change %s received: %s" %
(change, ret))
else:
change.reported = True
except:
self.log.exception("Exception while reporting:")
change.setReportedResult('ERROR')
self.updateBuildDescriptions(change.current_build_set)
return ret
def getChangesInQueue(self):
changes = []
for build, change in self.building_jobs.items():
if change not in changes:
changes.append(change)
return changes
def formatStatusHTML(self):
changes = self.getChangesInQueue()
ret = ''
for change in changes:
ret += self.formatStatus(change, html=True)
return ret
def formatStatus(self, changeish, indent=0, html=False):
indent_str = ' ' * indent
ret = ''
if html and hasattr(changeish, 'url') and changeish.url is not None:
ret += '%sProject %s change <a href="%s">%s</a>\n' % (
indent_str,
changeish.project.name,
changeish.url,
changeish._id())
else:
ret += '%sProject %s change %s\n' % (indent_str,
changeish.project.name,
changeish._id())
for job in self.pipeline.getJobs(changeish):
build = changeish.current_build_set.getBuild(job.name)
if build:
result = build.result
else:
result = None
job_name = job.name
if not job.voting:
voting = ' (non-voting)'
else:
voting = ''
if html:
if build:
url = build.url
else:
url = None
if url is not None:
job_name = '<a href="%s">%s</a>' % (url, job_name)
ret += '%s %s: %s%s' % (indent_str, job_name, result, voting)
ret += '\n'
if changeish.change_ahead:
ret += '%sWaiting on:\n' % (indent_str)
ret += self.formatStatus(changeish.change_ahead,
indent + 2, html)
return ret
def formatReport(self, changeish):
ret = ''
if self.pipeline.didAllJobsSucceed(changeish):
@ -796,14 +827,24 @@ class BasePipelineManager(object):
class IndependentPipelineManager(BasePipelineManager):
log = logging.getLogger("zuul.IndependentPipelineManager")
changes_merge = False
def _postConfig(self):
super(IndependentPipelineManager, self)._postConfig()
change_queue = ChangeQueue(self.pipeline, dependent=False)
for project in self.pipeline.getProjects():
change_queue.addProject(project)
self.pipeline.addQueue(change_queue)
class DependentPipelineManager(BasePipelineManager):
log = logging.getLogger("zuul.DependentPipelineManager")
changes_merge = True
def __init__(self, *args, **kwargs):
super(DependentPipelineManager, self).__init__(*args, **kwargs)
self.change_queues = []
def _postConfig(self):
super(DependentPipelineManager, self)._postConfig()
@ -833,18 +874,45 @@ class DependentPipelineManager(BasePipelineManager):
self.log.debug("Keeping queue %s" % (a))
new_change_queues.append(a)
self.change_queues = new_change_queues
self.log.info(" Shared change queues:")
for x in self.change_queues:
self.log.info(" %s" % x)
for queue in new_change_queues:
self.pipeline.addQueue(queue)
self.log.info(" %s" % queue)
def getQueue(self, project):
for queue in self.change_queues:
if project in queue.projects:
return queue
self.log.error("Unable to find change queue for project %s" % project)
def isChangeReadyToBeEnqueued(self, change):
if not self.sched.trigger.canMerge(change,
self.getSubmitAllowNeeds()):
self.log.debug("Change %s can not merge, ignoring" % change)
return False
return True
def _checkForChangesNeededBy(self, change, enqueue=True):
def enqueueChangesBehind(self, change):
to_enqueue = []
self.log.debug("Checking for changes needing %s:" % change)
if not hasattr(change, 'needed_by_changes'):
self.log.debug(" Changeish does not support dependencies")
return
for needs in change.needed_by_changes:
if self.sched.trigger.canMerge(needs,
self.getSubmitAllowNeeds()):
self.log.debug(" Change %s needs %s and is ready to merge" %
(needs, change))
to_enqueue.append(needs)
if not to_enqueue:
self.log.debug(" No changes need %s" % change)
for other_change in to_enqueue:
self.addChange(other_change)
def enqueueChangesAhead(self, change):
ret = self.checkForChangesNeededBy(change)
if ret in [True, False]:
return ret
self.log.debug(" Change %s must be merged ahead of %s" %
(ret, change))
return self.addChange(ret)
def checkForChangesNeededBy(self, change):
self.log.debug("Checking for changes needed by %s:" % change)
# Return true if okay to proceed enqueing this change,
# false if the change should not be enqueued.
@ -863,69 +931,16 @@ class DependentPipelineManager(BasePipelineManager):
if self.isChangeAlreadyInQueue(change.needs_change):
self.log.debug(" Needed change is already ahead in the queue")
return True
if enqueue and self.sched.trigger.canMerge(change.needs_change,
self.getSubmitAllowNeeds()):
# It can merge, so attempt to enqueue it _ahead_ of this change.
# If that works we can enqueue this change, otherwise, we can't.
self.log.debug(" Change %s must be merged ahead of %s" %
(change.needs_change, change))
return self.addChange(change.needs_change)
if self.sched.trigger.canMerge(change.needs_change,
self.getSubmitAllowNeeds()):
self.log.debug(" Change %s is needed" %
change.needs_change)
return change.needs_change
# The needed change can't be merged.
self.log.debug(" Change %s is needed but can not be merged" %
change.needs_change)
return False
def _checkForChangesNeeding(self, change):
to_enqueue = []
self.log.debug("Checking for changes needing %s:" % change)
if not hasattr(change, 'needed_by_changes'):
self.log.debug(" Changeish does not support dependencies")
return to_enqueue
for needs in change.needed_by_changes:
if self.sched.trigger.canMerge(needs,
self.getSubmitAllowNeeds()):
self.log.debug(" Change %s needs %s and is ready to merge" %
(needs, change))
to_enqueue.append(needs)
if not to_enqueue:
self.log.debug(" No changes need %s" % change)
return to_enqueue
def addChange(self, change):
# Returns true if added (or not needed), false if failed to add
if self.isChangeAlreadyInQueue(change):
self.log.debug("Change %s is already in queue, ignoring" % change)
return True
if not self.sched.trigger.canMerge(change,
self.getSubmitAllowNeeds()):
self.log.debug("Change %s can not merge, ignoring" % change)
return False
if not self._checkForChangesNeededBy(change):
return False
to_enqueue = self._checkForChangesNeeding(change)
# TODO(jeblair): Consider re-ordering this so that the dependent
# changes aren't checked until closer when they are needed.
if self.isChangeAlreadyInQueue(change):
self.log.debug("Change %s has been added to queue, ignoring" %
change)
return True
self.log.debug("Adding change %s" % change)
change_queue = self.getQueue(change.project)
if change_queue:
self.log.debug("Adding change %s to queue %s" %
(change, change_queue))
change_queue.enqueueChange(change)
self._addChange(change)
for needs in to_enqueue:
self.addChange(needs)
return True
return False
def _getDependentChanges(self, change):
orig_change = change
changes = []
@ -951,7 +966,6 @@ class DependentPipelineManager(BasePipelineManager):
self.pipeline.setUnableToMerge(change)
self.possiblyReportChange(change)
return
#TODO: remove this line after GERRIT_CHANGES is gone
dependent_changes = self._getDependentChanges(change)
for job in jobs:
@ -968,15 +982,6 @@ class DependentPipelineManager(BasePipelineManager):
self.log.exception("Exception while launching job %s "
"for change %s:" % (job, change))
def launchJobs(self, change):
jobs = self.pipeline.findJobsToRun(change)
if jobs:
self._launchJobs(change, jobs)
if change.change_behind:
self.log.debug("Launching jobs for change %s, behind change %s" %
(change.change_behind, change))
self.launchJobs(change.change_behind)
def cancelJobs(self, change, prime=True):
self.log.debug("Cancel jobs for change %s" % change)
to_remove = []
@ -1001,109 +1006,50 @@ class DependentPipelineManager(BasePipelineManager):
(change.change_behind, change))
self.cancelJobs(change.change_behind, prime=prime)
def onBuildCompleted(self, build):
change = self.building_jobs.get(build)
if not super(DependentPipelineManager, self).onBuildCompleted(build):
return False
if (change and change.change_behind and
self.pipeline.didAnyJobFail(change)):
# This or some other build failed. All changes behind this change
# will need to be retested. To free up resources cancel the builds
# behind this one as they will be rerun anyways.
if not change.change_ahead:
change_behind = change.change_behind
self.log.debug("Removing failed change %s from queue" % change)
change.delete()
def handleFailedChange(self, change):
# A build failed. All changes behind this change will need to
# be retested. To free up resources cancel the builds behind
# this one as they will be rerun anyways.
change_ahead = change.change_ahead
change_behind = change.change_behind
if not change_ahead:
# If we're at the head of the queue, allow changes to relaunch
if change_behind:
self.log.info("Canceling/relaunching jobs for change %s "
"behind failed change %s" %
(change_behind, change))
self.cancelJobs(change_behind)
change_behind = self._dequeueDependentChanges(change_behind)
if change_behind:
self.launchJobs(change_behind)
else:
self.cancelJobs(change.change_behind, prime=False)
self.log.debug("Canceling builds behind change: %s due to "
"failure." % change)
return True
def possiblyReportChange(self, change):
self.log.debug("Possibly reporting change %s" % change)
if change.reported:
self.log.debug("Change %s already reported" % change)
return
change_behind = change.change_behind
if not change.change_ahead:
self.log.debug("Change %s is at the front of the queue, "
"reporting" % (change))
ret = self.reportChange(change)
self.log.debug("Removing reported change %s from queue" % change)
change.delete()
merged = (not ret)
if merged:
merged = self.sched.trigger.isMerged(change, change.branch)
succeeded = self.pipeline.didAllJobsSucceed(change)
self.log.info("Reported change %s status: all-succeeded: %s, "
"merged: %s" % (change, succeeded, merged))
if not (succeeded and merged):
self.log.debug("Reported change %s failed tests or failed "
"to merge" % (change))
# The merge or test failed, re-run all jobs behind this one
if change_behind:
self.log.info("Canceling/relaunching jobs for change %s "
"behind failed change %s" %
(change_behind, change))
self.cancelJobs(change_behind)
change_behind = self._dequeueDependentChanges(
change_behind)
if change_behind:
self.launchJobs(change_behind)
# If the change behind this is ready, notify
if change_behind and self.pipeline.areAllJobsComplete(change_behind):
self.log.info("Change %s behind change %s is ready, "
"possibly reporting" % (change_behind, change))
self.possiblyReportChange(change_behind)
# There may be jobs behind that were dependent on all of these builds
# completing.
self.dequeueChange(change)
elif change_behind:
self.launchJobs(change_behind)
self.log.debug("Canceling builds behind change: %s due to "
"failure." % change)
self.cancelJobs(change_behind, prime=False)
def _dequeueDependentChanges(self, change, failed=[]):
def dequeueChange(self, change):
self.log.debug("Removing change %s from queue" % change)
change_ahead = change.change_ahead
change_behind = change.change_behind
change_queue = self.pipeline.getQueue(change.project)
change_queue.dequeueChange(change)
if not change_ahead and not change.reported:
self.log.debug("Adding %s as a severed head" % change)
change_queue.addSeveredHead(change)
self.dequeueDependentChanges(change_behind)
def dequeueDependentChanges(self, change):
# When a change is dequeued after failing, dequeue any changes that
# depend on it.
# Keep track of the most recent non-deleted change
head = change
while change:
change_behind = change.change_behind
if not self._checkForChangesNeededBy(change, enqueue=False):
if self.checkForChangesNeededBy(change) is not True:
# It's not okay to enqueue this change, we should remove it.
if change == head:
head = change_behind
self.log.info("Dequeuing change %s because "
"it can no longer merge" % change)
change.delete()
change_queue = self.pipeline.getQueue(change.project)
change_queue.dequeueChange(change)
self.pipeline.setDequeuedNeedingChange(change)
self.reportChange(change)
# We don't need to recurse, because any changes that might
# be affected by the removal of this change are behind us
# in the queue, so we can continue walking backwards.
change = change_behind
return head
def getChangesInQueue(self):
changes = []
for shared_queue in self.change_queues:
changes.extend(shared_queue.queue)
return changes
def formatStatusHTML(self):
ret = ''
ret += '\n'
for queue in self.change_queues:
s = 'Shared queue: %s' % queue.name
ret += s + '\n'
ret += '-' * len(s) + '\n'
if queue.queue:
ret += self.formatStatus(queue.queue[-1], html=True)
return ret