Add pipelines to data model.

This is a refactoring of the data model with the following goals:

 * Call top-level queues pipelines -- because too many other things
   are already called queues.  Pipelines convey the idea that there
   are a number of tasks to be performed (jobs), and that those
   tasks can be applied to different changes in parallel.
 * Eliminate references to queue_name from within a Change.
   Instead, methods that need to understand the pipeline that were
   accessed previously via the change are now located in the
   Pipeline class, taking a change as an argument.  Essentially,
   many methods involving changes (and builds, jobs, etc) must now
   be called in the context of a pipeline.
 * Add a changeish object to encompass the things that change and
   ref events have in common.

Change-Id: Iaf8ed0991f3c5b2bf7ded2c340a60725f7f98eaf
Reviewed-on: https://review.openstack.org/10757
Reviewed-by: Clark Boylan <clark.boylan@gmail.com>
Approved: James E. Blair <corvus@inaugust.com>
Tested-by: Jenkins
This commit is contained in:
James E. Blair 2012-07-26 14:23:24 -07:00 committed by Jenkins
parent 170180fab3
commit 4aea70c1de
6 changed files with 461 additions and 416 deletions

View File

@ -1,6 +1,6 @@
queues:
pipelines:
- name: check
manager: IndependentQueueManager
manager: IndependentPipelineManager
trigger:
- event: patchset-uploaded
success:
@ -9,13 +9,13 @@ queues:
verified: -1
- name: post
manager: IndependentQueueManager
manager: IndependentPipelineManager
trigger:
- event: ref-updated
ref: ^(?!refs/).*$
- name: gate
manager: DependentQueueManager
manager: DependentPipelineManager
trigger:
- event: comment-added
approval:

View File

@ -916,15 +916,16 @@ class testScheduler(unittest.TestCase):
"Test that whether a change is ready to merge"
# TODO: move to test_gerrit (this is a unit test!)
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
a = self.sched.trigger.getChange(1, 2, 'gate')
assert not a.can_merge
a = self.sched.trigger.getChange(1, 2)
mgr = self.sched.pipelines['gate'].manager
assert not self.sched.trigger.canMerge(a, mgr.getSubmitAllowNeeds())
A.addApproval('CRVW', 2)
a = self.sched.trigger.getChange(1, 2, 'gate')
assert not a.can_merge
a = self.sched.trigger.getChange(1, 2)
assert not self.sched.trigger.canMerge(a, mgr.getSubmitAllowNeeds())
A.addApproval('APRV', 1)
a = self.sched.trigger.getChange(1, 2, 'gate')
assert a.can_merge
a = self.sched.trigger.getChange(1, 2)
assert self.sched.trigger.canMerge(a, mgr.getSubmitAllowNeeds())
return True

View File

@ -211,13 +211,13 @@ class Jenkins(object):
uuid = str(uuid1())
params = dict(UUID=uuid,
GERRIT_PROJECT=change.project.name)
if change.refspec:
if hasattr(change, 'refspec'):
changes_str = '^'.join(
['%s:%s:%s' % (c.project.name, c.branch, c.refspec)
for c in dependent_changes + [change]])
params['GERRIT_BRANCH'] = change.branch
params['GERRIT_CHANGES'] = changes_str
if change.ref:
if hasattr(change, 'ref'):
params['GERRIT_REFNAME'] = change.ref
params['GERRIT_OLDREV'] = change.oldrev
params['GERRIT_NEWREV'] = change.newrev

View File

@ -16,16 +16,283 @@ import re
import time
class Pipeline(object):
"""A top-level pipeline such as check, gate, post, etc."""
def __init__(self, name):
self.name = name
self.job_trees = {} # project -> JobTree
self.manager = None
def setManager(self, manager):
self.manager = manager
def addProject(self, project):
job_tree = JobTree(None) # Null job == job tree root
self.job_trees[project] = job_tree
return job_tree
def getProjects(self):
return self.job_trees.keys()
def getJobTree(self, project):
tree = self.job_trees.get(project)
return tree
def getJobs(self, changeish):
tree = self.job_trees[changeish.project]
if not tree:
return []
return changeish.filterJobs(tree.getJobs())
def _findJobsToRun(self, job_trees, changeish):
torun = []
if changeish.change_ahead:
# Only run jobs if any 'hold' jobs on the change ahead
# have completed successfully.
if self.isHoldingFollowingChanges(changeish.change_ahead):
return []
for tree in job_trees:
job = tree.job
result = None
if job:
if not job.changeMatches(changeish):
continue
build = changeish.current_build_set.getBuild(job.name)
if build:
result = build.result
else:
# There is no build for the root of this job tree,
# so we should run it.
torun.append(job)
# If there is no job, this is a null job tree, and we should
# run all of its jobs.
if result == 'SUCCESS' or not job:
torun.extend(self._findJobsToRun(tree.job_trees, changeish))
return torun
def findJobsToRun(self, changeish):
tree = self.job_trees[changeish.project]
if not tree:
return []
return self._findJobsToRun(tree.job_trees, changeish)
def areAllJobsComplete(self, changeish):
for job in self.getJobs(changeish):
build = changeish.current_build_set.getBuild(job.name)
if not build or not build.result:
return False
return True
def didAllJobsSucceed(self, changeish):
for job in self.getJobs(changeish):
build = changeish.current_build_set.getBuild(job.name)
if not build:
return False
if build.result != 'SUCCESS':
return False
return True
def didAnyJobFail(self, changeish):
for job in self.getJobs(changeish):
build = changeish.current_build_set.getBuild(job.name)
if build and build.result == 'FAILURE':
return True
return False
def isHoldingFollowingChanges(self, changeish):
for job in self.getJobs(changeish):
if not job.hold_following_changes:
continue
build = changeish.current_build_set.getBuild(job.name)
if not build:
return True
if build.result != 'SUCCESS':
return True
if not changeish.change_ahead:
return False
return self.isHoldingFollowingChanges(changeish.change_ahead)
def formatStatus(self, changeish, indent=0, html=False):
indent_str = ' ' * indent
ret = ''
if html and changeish.url is not None:
ret += '%sProject %s change <a href="%s">%s</a>\n' % (
indent_str,
changeish.project.name,
changeish.url,
changeish._id())
else:
ret += '%sProject %s change %s\n' % (indent_str,
changeish.project.name,
changeish._id())
for job in self.getJobs(changeish):
build = changeish.current_build_set.getBuild(job.name)
if build:
result = build.result
else:
result = None
job_name = job.name
if html:
if build:
url = build.url
else:
url = None
if url is not None:
job_name = '<a href="%s">%s</a>' % (url, job_name)
ret += '%s %s: %s' % (indent_str, job_name, result)
ret += '\n'
if changeish.change_ahead:
ret += '%sWaiting on:\n' % (indent_str)
ret += self.formatStatus(changeish.change_ahead,
indent + 2, html)
return ret
def formatReport(self, changeish):
ret = ''
if self.didAllJobsSucceed(changeish):
ret += 'Build successful\n\n'
else:
ret += 'Build failed\n\n'
for job in self.getJobs(changeish):
build = changeish.current_build_set.getBuild(job.name)
result = build.result
if result == 'SUCCESS' and job.success_message:
result = job.success_message
elif result == 'FAILURE' and job.failure_message:
result = job.failure_message
url = build.url
if not url:
url = job.name
ret += '- %s : %s\n' % (url, result)
return ret
def formatDescription(self, build):
concurrent_changes = ''
concurrent_builds = ''
other_builds = ''
for change in build.build_set.other_changes:
concurrent_changes += '<li><a href="{change.url}">\
{change.number},{change.patchset}</a></li>'.format(
change=change)
change = build.build_set.change
for build in build.build_set.getBuilds():
if build.base_url:
concurrent_builds += """\
<li>
<a href="{build.base_url}">
{build.job.name} #{build.number}</a>: {build.result}
</li>
""".format(build=build)
else:
concurrent_builds += """\
<li>
{build.job.name}: {build.result}
</li>""".format(build=build)
if build.build_set.previous_build_set:
other_build = build.build_set.previous_build_set.getBuild(
build.job.name)
if other_build:
other_builds += """\
<li>
Preceded by: <a href="{build.base_url}">
{build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)
if build.build_set.next_build_set:
other_build = build.build_set.next_build_set.getBuild(
build.job.name)
if other_build:
other_builds += """\
<li>
Succeeded by: <a href="{build.base_url}">
{build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)
result = build.build_set.result
if change.number:
ret = """\
<p>
Triggered by change:
<a href="{change.url}">{change.number},{change.patchset}</a><br/>
Branch: <b>{change.branch}</b><br/>
Pipeline: <b>{self.name}</b>
</p>"""
else:
ret = """\
<p>
Triggered by reference:
{change.ref}</a><br/>
Old revision: <b>{change.oldrev}</b><br/>
New revision: <b>{change.newrev}</b><br/>
Pipeline: <b>{self.name}</b>
</p>"""
if concurrent_changes:
ret += """\
<p>
Other changes tested concurrently with this change:
<ul>{concurrent_changes}</ul>
</p>
"""
if concurrent_builds:
ret += """\
<p>
All builds for this change set:
<ul>{concurrent_builds}</ul>
</p>
"""
if other_builds:
ret += """\
<p>
Other build sets for this change:
<ul>{other_builds}</ul>
</p>
"""
if result:
ret += """\
<p>
Reported result: <b>{result}</b>
</p>
"""
ret = ret.format(**locals())
return ret
def setResult(self, changeish, build):
if build.result != 'SUCCESS':
# Get a JobTree from a Job so we can find only its dependent jobs
root = self.getJobTree(changeish.project)
tree = root.getJobTreeForJob(build.job)
for job in tree.getJobs():
fakebuild = Build(job, None)
fakebuild.result = 'SKIPPED'
changeish.addBuild(fakebuild)
class ChangeQueue(object):
def __init__(self, queue_name):
"""DependentPipelines have multiple parallel queues shared by
different projects; this is one of them. For instance, there may
a queue shared by interrelated projects foo and bar, and a second
queue for independent project baz. Pipelines have one or more
PipelineQueues."""
def __init__(self, pipeline):
self.pipeline = pipeline
self.name = ''
self.queue_name = queue_name
self.projects = []
self._jobs = set()
self.queue = []
def __repr__(self):
return '<ChangeQueue %s: %s>' % (self.queue_name, self.name)
return '<ChangeQueue %s: %s>' % (self.pipeline.name, self.name)
def getJobs(self):
return self._jobs
@ -36,7 +303,7 @@ class ChangeQueue(object):
names = [x.name for x in self.projects]
names.sort()
self.name = ', '.join(names)
self._jobs |= set(project.getJobs(self.queue_name))
self._jobs |= set(self.pipeline.getJobTree(project).getJobs())
def enqueueChange(self, change):
if self.queue:
@ -54,6 +321,17 @@ class ChangeQueue(object):
self.addProject(project)
class Project(object):
def __init__(self, name):
self.name = name
def __str__(self):
return self.name
def __repr__(self):
return '<Project %s>' % (self.name)
class Job(object):
def __init__(self, name):
# If you add attributes here, be sure to add them to the copy method.
@ -88,119 +366,6 @@ class Job(object):
return False
class Build(object):
def __init__(self, job, uuid):
self.job = job
self.uuid = uuid
self.base_url = None
self.url = None
self.number = None
self.result = None
self.build_set = None
self.launch_time = time.time()
def __repr__(self):
return '<Build %s of %s>' % (self.uuid, self.job.name)
def formatDescription(self):
concurrent_changes = ''
concurrent_builds = ''
other_builds = ''
for change in self.build_set.other_changes:
concurrent_changes += '<li><a href="{change.url}">\
{change.number},{change.patchset}</a></li>'.format(
change=change)
change = self.build_set.change
for build in self.build_set.getBuilds():
if build.base_url:
concurrent_builds += """\
<li>
<a href="{build.base_url}">
{build.job.name} #{build.number}</a>: {build.result}
</li>
""".format(build=build)
else:
concurrent_builds += """\
<li>
{build.job.name}: {build.result}
</li>""".format(build=build)
if self.build_set.previous_build_set:
build = self.build_set.previous_build_set.getBuild(self.job.name)
if build:
other_builds += """\
<li>
Preceded by: <a href="{build.base_url}">
{build.job.name} #{build.number}</a>
</li>
""".format(build=build)
if self.build_set.next_build_set:
build = self.build_set.next_build_set.getBuild(self.job.name)
if build:
other_builds += """\
<li>
Succeeded by: <a href="{build.base_url}">
{build.job.name} #{build.number}</a>
</li>
""".format(build=build)
result = self.build_set.result
if change.number:
ret = """\
<p>
Triggered by change:
<a href="{change.url}">{change.number},{change.patchset}</a><br/>
Branch: <b>{change.branch}</b><br/>
Pipeline: <b>{change.queue_name}</b>
</p>"""
else:
ret = """\
<p>
Triggered by reference:
{change.ref}</a><br/>
Old revision: <b>{change.oldrev}</b><br/>
New revision: <b>{change.newrev}</b><br/>
Pipeline: <b>{change.queue_name}</b>
</p>"""
if concurrent_changes:
ret += """\
<p>
Other changes tested concurrently with this change:
<ul>{concurrent_changes}</ul>
</p>
"""
if concurrent_builds:
ret += """\
<p>
All builds for this change set:
<ul>{concurrent_builds}</ul>
</p>
"""
if other_builds:
ret += """\
<p>
Other build sets for this change:
<ul>{other_builds}</ul>
</p>
"""
if result:
ret += """\
<p>
Reported result: <b>{result}</b>
</p>
"""
ret = ret.format(**locals())
return ret
class JobTree(object):
""" A JobTree represents an instance of one Job, and holds JobTrees
whose jobs should be run if that Job succeeds. A root node of a
@ -233,34 +398,19 @@ class JobTree(object):
return None
class Project(object):
def __init__(self, name):
self.name = name
self.job_trees = {} # Queue -> JobTree
def __str__(self):
return self.name
class Build(object):
def __init__(self, job, uuid):
self.job = job
self.uuid = uuid
self.base_url = None
self.url = None
self.number = None
self.result = None
self.build_set = None
self.launch_time = time.time()
def __repr__(self):
return '<Project %s>' % (self.name)
def addQueue(self, name):
self.job_trees[name] = JobTree(None)
return self.job_trees[name]
def hasQueue(self, name):
if name in self.job_trees:
return True
return False
def getJobTreeForQueue(self, name):
return self.job_trees.get(name, None)
def getJobs(self, queue_name):
tree = self.getJobTreeForQueue(queue_name)
if not tree:
return []
return tree.getJobs()
return '<Build %s of %s>' % (self.uuid, self.job.name)
class BuildSet(object):
@ -294,111 +444,24 @@ class BuildSet(object):
return [self.builds.get(x) for x in keys]
class Change(object):
def __init__(self, queue_name, project):
self.queue_name = queue_name
self.project = project
self.branch = None
self.number = None
self.url = None
self.patchset = None
self.refspec = None
self.ref = None
self.oldrev = None
self.newrev = None
self.reported = False
self.needs_change = None
self.needed_by_changes = []
self.is_current_patchset = True
self.can_merge = False
self.is_merged = False
class Changeish(object):
"""Something like a change; either a change or a ref"""
is_reportable = False
def __init__(self, project):
self.project = project
self.build_sets = []
self.change_ahead = None
self.change_behind = None
self.current_build_set = BuildSet(self)
self.build_sets.append(self.current_build_set)
def _id(self):
if self.number:
return '%s,%s' % (self.number, self.patchset)
return self.newrev
def __repr__(self):
return '<Change 0x%x %s>' % (id(self), self._id())
def equals(self, other):
if self.number:
if (self.number == other.number and
self.patchset == other.patchset):
return True
return False
if self.ref:
if (self.ref == other.ref and
self.newrev == other.newrev):
return True
return False
return False
raise NotImplementedError()
def _filterJobs(self, jobs):
def filterJobs(self, jobs):
return filter(lambda job: job.changeMatches(self), jobs)
def formatStatus(self, indent=0, html=False):
indent_str = ' ' * indent
ret = ''
if html and self.url is not None:
ret += '%sProject %s change <a href="%s">%s</a>\n' % (indent_str,
self.project.name,
self.url,
self._id())
else:
ret += '%sProject %s change %s\n' % (indent_str,
self.project.name,
self._id())
for job in self._filterJobs(self.project.getJobs(self.queue_name)):
build = self.current_build_set.getBuild(job.name)
if build:
result = build.result
else:
result = None
job_name = job.name
if html:
if build:
url = build.url
else:
url = None
if url is not None:
job_name = '<a href="%s">%s</a>' % (url, job_name)
ret += '%s %s: %s' % (indent_str, job_name, result)
ret += '\n'
if self.change_ahead:
ret += '%sWaiting on:\n' % (indent_str)
ret += self.change_ahead.formatStatus(indent + 2, html)
return ret
def formatReport(self):
ret = ''
if self.didAllJobsSucceed():
ret += 'Build successful\n\n'
else:
ret += 'Build failed\n\n'
for job in self._filterJobs(self.project.getJobs(self.queue_name)):
build = self.current_build_set.getBuild(job.name)
result = build.result
if result == 'SUCCESS' and job.success_message:
result = job.success_message
elif result == 'FAILURE' and job.failure_message:
result = job.failure_message
url = build.url
if not url:
url = job.name
ret += '- %s : %s\n' % (url, result)
return ret
def setReportedResult(self, result):
self.current_build_set.result = result
def resetAllBuilds(self):
old = self.current_build_set
self.current_build_set.result = 'CANCELED'
@ -410,88 +473,6 @@ class Change(object):
def addBuild(self, build):
self.current_build_set.addBuild(build)
def setResult(self, build):
if build.result != 'SUCCESS':
# Get a JobTree from a Job so we can find only its dependent jobs
root = self.project.getJobTreeForQueue(self.queue_name)
tree = root.getJobTreeForJob(build.job)
for job in tree.getJobs():
fakebuild = Build(job, None)
fakebuild.result = 'SKIPPED'
self.addBuild(fakebuild)
def isHoldingFollowingChanges(self):
tree = self.project.getJobTreeForQueue(self.queue_name)
for job in self._filterJobs(tree.getJobs()):
if not job.hold_following_changes:
continue
build = self.current_build_set.getBuild(job.name)
if not build:
return True
if build.result != 'SUCCESS':
return True
if not self.change_ahead:
return False
return self.change_ahead.isHoldingFollowingChanges()
def _findJobsToRun(self, job_trees):
torun = []
if self.change_ahead:
# Only run our jobs if any 'hold' jobs on the change ahead
# have completed successfully.
if self.change_ahead.isHoldingFollowingChanges():
return []
for tree in job_trees:
job = tree.job
if not job.changeMatches(self):
continue
result = None
if job:
build = self.current_build_set.getBuild(job.name)
if build:
result = build.result
else:
# There is no build for the root of this job tree,
# so we should run it.
torun.append(job)
# If there is no job, this is a null job tree, and we should
# run all of its jobs.
if result == 'SUCCESS' or not job:
torun.extend(self._findJobsToRun(tree.job_trees))
return torun
def findJobsToRun(self):
tree = self.project.getJobTreeForQueue(self.queue_name)
if not tree:
return []
return self._findJobsToRun(tree.job_trees)
def areAllJobsComplete(self):
tree = self.project.getJobTreeForQueue(self.queue_name)
for job in self._filterJobs(tree.getJobs()):
build = self.current_build_set.getBuild(job.name)
if not build or not build.result:
return False
return True
def didAllJobsSucceed(self):
tree = self.project.getJobTreeForQueue(self.queue_name)
for job in self._filterJobs(tree.getJobs()):
build = self.current_build_set.getBuild(job.name)
if not build:
return False
if build.result != 'SUCCESS':
return False
return True
def didAnyJobFail(self):
tree = self.project.getJobTreeForQueue(self.queue_name)
for job in self._filterJobs(tree.getJobs()):
build = self.current_build_set.getBuild(job.name)
if build and build.result == 'FAILURE':
return True
return False
def delete(self):
if self.change_behind:
self.change_behind.change_ahead = None
@ -499,6 +480,58 @@ class Change(object):
self.queue.dequeueChange(self)
class Change(Changeish):
is_reportable = True
def __init__(self, project):
super(Change, self).__init__(project)
self.branch = None
self.number = None
self.url = None
self.patchset = None
self.refspec = None
self.reported = False
self.needs_change = None
self.needed_by_changes = []
self.is_current_patchset = True
self.can_merge = False
self.is_merged = False
def _id(self):
if self.number:
return '%s,%s' % (self.number, self.patchset)
return self.newrev
def __repr__(self):
return '<Change 0x%x %s>' % (id(self), self._id())
def equals(self, other):
if (self.number == other.number and
self.patchset == other.patchset):
return True
return False
def setReportedResult(self, result):
self.current_build_set.result = result
class Ref(Changeish):
is_reportable = False
def __init__(self, project):
super(Change, self).__init__(project)
self.ref = None
self.oldrev = None
self.newrev = None
def equals(self, other):
if (self.ref == other.ref and
self.newrev == other.newrev):
return True
return False
class TriggerEvent(object):
def __init__(self):
self.data = None
@ -532,16 +565,15 @@ class TriggerEvent(object):
return ret
def getChange(self, manager_name, project, trigger):
def getChange(self, project, trigger):
# TODO: make the scheduler deal with events (which may have
# changes) rather than changes so that we don't have to create
# "fake" changes for events that aren't associated with changes.
if self.change_number:
change = trigger.getChange(self.change_number, self.patch_number,
manager_name)
change = trigger.getChange(self.change_number, self.patch_number)
if self.ref:
change = Change(manager_name, project)
change = Ref(project)
change.ref = self.ref
change.oldrev = self.oldrev
change.newrev = self.newrev

View File

@ -20,7 +20,7 @@ import re
import threading
import yaml
from model import Job, Project, ChangeQueue, EventFilter
from model import Pipeline, Job, Project, ChangeQueue, EventFilter
class Scheduler(threading.Thread):
@ -43,7 +43,7 @@ class Scheduler(threading.Thread):
self._init()
def _init(self):
self.queue_managers = {}
self.pipelines = {}
self.jobs = {}
self.projects = {}
self.metajobs = {}
@ -78,14 +78,16 @@ class Scheduler(threading.Thread):
fn = os.path.expanduser(fn)
execfile(fn, self._config_env)
for config_queue in data['queues']:
manager = globals()[config_queue['manager']](self,
config_queue['name'])
self.queue_managers[config_queue['name']] = manager
manager.success_action = config_queue.get('success')
manager.failure_action = config_queue.get('failure')
manager.start_action = config_queue.get('start')
for trigger in toList(config_queue['trigger']):
for conf_pipeline in data.get('pipelines', []):
pipeline = Pipeline(conf_pipeline['name'])
manager = globals()[conf_pipeline['manager']](self, pipeline)
pipeline.setManager(manager)
self.pipelines[conf_pipeline['name']] = pipeline
manager.success_action = conf_pipeline.get('success')
manager.failure_action = conf_pipeline.get('failure')
manager.start_action = conf_pipeline.get('start')
for trigger in toList(conf_pipeline['trigger']):
approvals = {}
for approval_dict in toList(trigger.get('approval')):
for k, v in approval_dict.items():
@ -95,7 +97,7 @@ class Scheduler(threading.Thread):
refs=toList(trigger.get('ref')),
approvals=approvals,
comment_filters=toList(
trigger.get('comment_filter')))
trigger.get('comment_filter')))
manager.event_filters.append(f)
for config_job in data['jobs']:
@ -137,10 +139,10 @@ class Scheduler(threading.Thread):
for config_project in data['projects']:
project = Project(config_project['name'])
self.projects[config_project['name']] = project
for qname in self.queue_managers.keys():
if qname in config_project:
job_tree = project.addQueue(qname)
config_jobs = config_project[qname]
for pipeline in self.pipelines.values():
if pipeline.name in config_project:
job_tree = pipeline.addProject(project)
config_jobs = config_project[pipeline.name]
add_jobs(job_tree, config_jobs)
# All jobs should be defined at this point, get rid of
@ -149,8 +151,8 @@ class Scheduler(threading.Thread):
# TODO(jeblair): check that we don't end up with jobs like
# "foo - bar" because a ':' is missing in the yaml for a dependent job
for manager in self.queue_managers.values():
manager._postConfig()
for pipeline in self.pipelines.values():
pipeline.manager._postConfig()
def getJob(self, name):
if name in self.jobs:
@ -275,9 +277,9 @@ class Scheduler(threading.Thread):
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
waiting = False
for manager in self.queue_managers.values():
for build in manager.building_jobs.keys():
self.log.debug("%s waiting on %s" % (manager, build))
for pipeline in self.pipelines.values():
for build in pipeline.manager.building_jobs.keys():
self.log.debug("%s waiting on %s" % (pipeline.manager, build))
waiting = True
if not waiting:
self.log.debug("All builds are complete")
@ -325,49 +327,49 @@ class Scheduler(threading.Thread):
self.log.warning("Project %s not found" % event.project_name)
return
for manager in self.queue_managers.values():
if not manager.eventMatches(event):
self.log.debug("Event %s ignored by %s" % (event, manager))
for pipeline in self.pipelines.values():
if not pipeline.manager.eventMatches(event):
self.log.debug("Event %s ignored by %s" % (event, pipeline))
continue
change = event.getChange(manager.name, project, self.trigger)
change = event.getChange(project, self.trigger)
self.log.info("Adding %s, %s to %s" %
(project, change, manager))
manager.addChange(change)
(project, change, pipeline))
pipeline.manager.addChange(change)
def process_result_queue(self):
self.log.debug("Fetching result event")
event_type, build = self.result_event_queue.get()
self.log.debug("Processing result event %s" % build)
for manager in self.queue_managers.values():
for pipeline in self.pipelines.values():
if event_type == 'started':
if manager.onBuildStarted(build):
if pipeline.manager.onBuildStarted(build):
return
elif event_type == 'completed':
if manager.onBuildCompleted(build):
if pipeline.manager.onBuildCompleted(build):
return
self.log.warning("Build %s not found by any queue manager" % (build))
def formatStatusHTML(self):
ret = '<html><pre>'
keys = self.queue_managers.keys()
keys = self.pipelines.keys()
keys.sort()
for key in keys:
manager = self.queue_managers[key]
s = 'Queue: %s' % manager.name
pipeline = self.pipelines[key]
s = 'Pipeline: %s' % pipeline.name
ret += s + '\n'
ret += '-' * len(s) + '\n'
ret += manager.formatStatusHTML()
ret += pipeline.formatStatusHTML()
ret += '\n'
ret += '</pre></html>'
return ret
class BaseQueueManager(object):
log = logging.getLogger("zuul.BaseQueueManager")
class BasePipelineManager(object):
log = logging.getLogger("zuul.BasePipelineManager")
def __init__(self, sched, name):
def __init__(self, sched, pipeline):
self.sched = sched
self.name = name
self.pipeline = pipeline
self.building_jobs = {}
self.event_filters = []
self.success_action = {}
@ -378,7 +380,7 @@ class BaseQueueManager(object):
return "<%s %s>" % (self.__class__.__name__, self.name)
def _postConfig(self):
self.log.info("Configured Queue Manager %s" % self.name)
self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
self.log.info(" Events:")
for e in self.event_filters:
self.log.info(" %s" % e)
@ -403,9 +405,10 @@ class BaseQueueManager(object):
log_jobs(x, indent + 2)
for p in self.sched.projects.values():
if p.hasQueue(self.name):
tree = self.pipeline.getJobTree(p)
if tree:
self.log.info(" %s" % p)
log_jobs(p.getJobTreeForQueue(self.name))
log_jobs(tree)
if self.start_action:
self.log.info(" On start:")
self.log.info(" %s" % self.start_action)
@ -421,7 +424,10 @@ class BaseQueueManager(object):
# "needed" in the submit records for a change, with respect
# to this queue. In other words, the list of review labels
# this queue itself is likely to set before submitting.
return self.success_action.keys()
if self.success_action:
return self.success_action.keys()
else:
return {}
def eventMatches(self, event):
for ef in self.event_filters:
@ -447,7 +453,7 @@ class BaseQueueManager(object):
try:
self.log.info("Reporting start, action %s change %s" %
(self.start_action, change))
msg = "Starting %s jobs." % self.name
msg = "Starting %s jobs." % self.pipeline.name
ret = self.sched.trigger.report(change, msg, self.start_action)
if ret:
self.log.error("Reporting change start %s received: %s" %
@ -458,7 +464,7 @@ class BaseQueueManager(object):
def launchJobs(self, change):
self.log.debug("Launching jobs for change %s" % change)
for job in change.findJobsToRun():
for job in self.pipeline.findJobsToRun(change):
self.log.debug("Found job %s for change %s" % (job, change))
try:
build = self.sched.launcher.launch(job, change)
@ -472,12 +478,12 @@ class BaseQueueManager(object):
def updateBuildDescriptions(self, build_set):
for build in build_set.getBuilds():
desc = build.formatDescription()
desc = self.pipeline.formatDescription(build)
self.sched.launcher.setBuildDescription(build, desc)
if build_set.previous_build_set:
for build in build_set.previous_build_set.getBuilds():
desc = build.formatDescription()
desc = self.pipeline.formatDescription(build)
self.sched.launcher.setBuildDescription(build, desc)
def onBuildStarted(self, build):
@ -504,11 +510,11 @@ class BaseQueueManager(object):
del self.building_jobs[build]
change.setResult(build)
self.pipeline.setResult(change, build)
self.log.info("Change %s status is now:\n %s" %
(change, change.formatStatus()))
(change, self.pipeline.formatStatus(change)))
if change.areAllJobsComplete():
if self.pipeline.areAllJobsComplete(change):
self.log.debug("All jobs for change %s are complete" % change)
self.possiblyReportChange(change)
else:
@ -525,11 +531,13 @@ class BaseQueueManager(object):
self.reportChange(change)
def reportChange(self, change):
self.log.debug("Reporting change %s" % change)
if not change.is_reportable:
return False
if change.reported:
return 0
self.log.debug("Reporting change %s" % change)
ret = None
if change.didAllJobsSucceed():
if self.pipeline.didAllJobsSucceed(change):
action = self.success_action
change.setReportedResult('SUCCESS')
else:
@ -539,7 +547,7 @@ class BaseQueueManager(object):
self.log.info("Reporting change %s, action: %s" %
(change, action))
ret = self.sched.trigger.report(change,
change.formatReport(),
self.pipeline.formatReport(change),
action)
if ret:
self.log.error("Reporting change %s received: %s" %
@ -563,35 +571,34 @@ class BaseQueueManager(object):
changes = self.getChangesInQueue()
ret = ''
for change in changes:
ret += change.formatStatus(html=True)
ret += self.pipeline.formatStatus(change, html=True)
return ret
class IndependentQueueManager(BaseQueueManager):
log = logging.getLogger("zuul.IndependentQueueManager")
class IndependentPipelineManager(BasePipelineManager):
log = logging.getLogger("zuul.IndependentPipelineManager")
class DependentQueueManager(BaseQueueManager):
log = logging.getLogger("zuul.DependentQueueManager")
class DependentPipelineManager(BasePipelineManager):
log = logging.getLogger("zuul.DependentPipelineManager")
def __init__(self, *args, **kwargs):
super(DependentQueueManager, self).__init__(*args, **kwargs)
super(DependentPipelineManager, self).__init__(*args, **kwargs)
self.change_queues = []
def _postConfig(self):
super(DependentQueueManager, self)._postConfig()
super(DependentPipelineManager, self)._postConfig()
self.buildChangeQueues()
def buildChangeQueues(self):
self.log.debug("Building shared change queues")
change_queues = []
for project in self.sched.projects.values():
if project.hasQueue(self.name):
change_queue = ChangeQueue(self.name)
change_queue.addProject(project)
change_queues.append(change_queue)
self.log.debug("Created queue: %s" % change_queue)
for project in self.pipeline.getProjects():
change_queue = ChangeQueue(self.pipeline)
change_queue.addProject(project)
change_queues.append(change_queue)
self.log.debug("Created queue: %s" % change_queue)
self.log.debug("Combining shared queues")
new_change_queues = []
@ -622,6 +629,9 @@ class DependentQueueManager(BaseQueueManager):
self.log.debug("Checking for changes needed by %s:" % change)
# Return true if okay to proceed enqueing this change,
# false if the change should not be enqueued.
if not hasattr(change, 'needs_change'):
self.log.debug(" Changeish does not support dependencies")
return True
if not change.needs_change:
self.log.debug(" No changes needed")
return True
@ -635,7 +645,8 @@ class DependentQueueManager(BaseQueueManager):
if change.needs_change in change_queue.queue:
self.log.debug(" Needed change is already ahead in the queue")
return True
if change.needs_change.can_merge:
if self.sched.trigger.canMerge(change.needs_change,
self.getSubmitAllowNeeds()):
# It can merge, so attempt to enqueue it _ahead_ of this change.
# If that works we can enqueue this change, otherwise, we can't.
self.log.debug(" Change %s must be merged ahead of %s" %
@ -649,8 +660,12 @@ class DependentQueueManager(BaseQueueManager):
def _checkForChangesNeeding(self, change):
to_enqueue = []
self.log.debug("Checking for changes needing %s:" % change)
if not hasattr(change, 'needed_by_changes'):
self.log.debug(" Changeish does not support dependencies")
return to_enqueue
for needs in change.needed_by_changes:
if needs.can_merge:
if self.sched.trigger.canMerge(needs,
self.getSubmitAllowNeeds()):
self.log.debug(" Change %s needs %s and is ready to merge" %
(needs, change))
to_enqueue.append(needs)
@ -664,7 +679,8 @@ class DependentQueueManager(BaseQueueManager):
self.log.debug("Change %s is already in queue, ignoring" % change)
return True
if not change.can_merge:
if not self.sched.trigger.canMerge(change,
self.getSubmitAllowNeeds()):
self.log.debug("Change %s can not merge, ignoring" % change)
return False
@ -705,7 +721,7 @@ class DependentQueueManager(BaseQueueManager):
def launchJobs(self, change):
self.log.debug("Launching jobs for change %s" % change)
dependent_changes = self._getDependentChanges(change)
for job in change.findJobsToRun():
for job in self.pipeline.findJobsToRun(change):
self.log.debug("Found job %s for change %s" % (job, change))
try:
build = self.sched.launcher.launch(job,
@ -749,9 +765,10 @@ class DependentQueueManager(BaseQueueManager):
def onBuildCompleted(self, build):
change = self.building_jobs.get(build)
if not super(DependentQueueManager, self).onBuildCompleted(build):
if not super(DependentPipelineManager, self).onBuildCompleted(build):
return False
if change and change.change_behind and change.didAnyJobFail():
if (change and change.change_behind and
self.pipeline.didAnyJobFail(change)):
# This or some other build failed. All changes behind this change
# will need to be retested. To free up resources cancel the builds
# behind this one as they will be rerun anyways.
@ -775,7 +792,7 @@ class DependentQueueManager(BaseQueueManager):
merged = (not ret)
if merged:
merged = self.sched.trigger.isMerged(change, change.branch)
succeeded = change.didAllJobsSucceed()
succeeded = self.pipeline.didAllJobsSucceed(change)
self.log.info("Reported change %s status: all-succeeded: %s, "
"merged: %s" % (change, succeeded, merged))
@ -790,7 +807,7 @@ class DependentQueueManager(BaseQueueManager):
self.cancelJobs(change_behind)
self.launchJobs(change_behind)
# If the change behind this is ready, notify
if change_behind and change_behind.areAllJobsComplete():
if change_behind and self.pipeline.areAllJobsComplete(change_behind):
self.log.info("Change %s behind change %s is ready, "
"possibly reporting" % (change_behind, change))
self.possiblyReportChange(change_behind)

View File

@ -170,7 +170,7 @@ class Gerrit(object):
if status == 'MERGED' or status == 'SUBMITTED':
return True
def _canMerge(self, change, allow_needs):
def canMerge(self, change, allow_needs):
if not change.number:
self.log.debug("Change has no number; considering it merged")
# Good question. It's probably ref-updated, which, ah,
@ -206,15 +206,13 @@ class Gerrit(object):
return False
return True
def getChange(self, number, patchset, queue_name, changes=None):
# TODO: queue_name is screwing up the data model, refactor
# the queue context so it isn't necessary.
def getChange(self, number, patchset, changes=None):
self.log.info("Getting information for %s,%s" % (number, patchset))
if changes is None:
changes = {}
data = self.gerrit.query(number)
project = self.sched.projects[data['project']]
change = Change(queue_name, project)
change = Change(project)
change._data = data
change.number = number
@ -233,9 +231,6 @@ class Gerrit(object):
else:
change.is_current_patchset = False
manager = self.sched.queue_managers[queue_name]
change.can_merge = self._canMerge(change,
manager.getSubmitAllowNeeds())
change.is_merged = self._isMerged(change)
if change.is_merged:
# This change is merged, so we don't need to look any further
@ -249,7 +244,7 @@ class Gerrit(object):
key = '%s,%s' % (num, ps)
if key in changes:
return changes.get(key)
c = self.getChange(num, ps, queue_name, changes)
c = self.getChange(num, ps, changes)
return c
if 'dependsOn' in data: