Add layout config object to model

Store the results of the configuration (pipelines, jobs, and related objects)
in a new Layout object.  Return such an object from the parseConfig
method in the scheduler.  This is a first step to reloading the
configuration on the fly -- it supports holding multiple
configurations in memory at once.

Change-Id: Ide56cddecbdbecdc4ed77b917d0b9bb24b1753d5
Reviewed-on: https://review.openstack.org/35323
Reviewed-by: Jeremy Stanley <fungi@yuggoth.org>
Reviewed-by: Clark Boylan <clark.boylan@gmail.com>
Approved: James E. Blair <corvus@inaugust.com>
Tested-by: Jenkins
This commit is contained in:
James E. Blair 2013-07-01 12:44:14 -04:00 committed by Jenkins
parent fee8d6588a
commit eff881637f
4 changed files with 67 additions and 58 deletions

View File

@ -860,7 +860,7 @@ class TestScheduler(testtools.TestCase):
def registerJobs(self):
count = 0
for job in self.sched.jobs.keys():
for job in self.sched.layout.jobs.keys():
self.worker.registerFunction('build:' + job)
count += 1
self.worker.registerFunction('stop:' + self.worker.worker_id)
@ -1003,7 +1003,7 @@ class TestScheduler(testtools.TestCase):
def assertEmptyQueues(self):
# Make sure there are no orphaned jobs
for pipeline in self.sched.pipelines.values():
for pipeline in self.sched.layout.pipelines.values():
for queue in pipeline.queues:
if len(queue.queue) != 0:
print 'pipeline %s queue %s contents %s' % (
@ -1396,7 +1396,7 @@ class TestScheduler(testtools.TestCase):
# TODO: move to test_gerrit (this is a unit test!)
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
a = self.sched.trigger.getChange(1, 2)
mgr = self.sched.pipelines['gate'].manager
mgr = self.sched.layout.pipelines['gate'].manager
assert not self.sched.trigger.canMerge(a, mgr.getSubmitAllowNeeds())
A.addApproval('CRVW', 2)

View File

@ -784,3 +784,27 @@ class EventFilter(object):
if not matches_approval:
return False
return True
class Layout(object):
    """Container for the results of parsing a layout configuration.

    Holds the projects, pipelines, and jobs produced by a single parse,
    so multiple configurations can be held in memory at once.
    """

    def __init__(self):
        # All keyed by name, except metajobs which is keyed by the
        # compiled regex that defines the meta-job.
        self.projects = {}
        self.pipelines = {}
        self.jobs = {}
        self.metajobs = {}

    def getJob(self, name):
        """Return the Job registered under *name*, creating it on demand.

        A name beginning with '^' defines a meta-job: it is recorded only
        in ``metajobs`` (keyed by its compiled regex) and is never entered
        into ``jobs``.  A regular job inherits attributes from every
        meta-job whose pattern matches its name before being cached.
        """
        existing = self.jobs.get(name)
        if existing is not None:
            return existing
        job = Job(name)
        if name.startswith('^'):
            # Meta-job: keep it out of the regular job registry.
            self.metajobs[re.compile(name)] = job
            return job
        # Copy attributes from all matching meta-jobs, then cache.
        for pattern, metajob in self.metajobs.items():
            if pattern.match(name):
                job.copy(metajob)
        self.jobs[name] = job
        return job

View File

@ -28,7 +28,7 @@ import yaml
import layoutvalidator
import model
from model import Pipeline, Job, Project, ChangeQueue, EventFilter
from model import Pipeline, Project, ChangeQueue, EventFilter
import merger
statsd = extras.try_import('statsd.statsd')
@ -81,11 +81,7 @@ class Scheduler(threading.Thread):
self._init()
def _init(self):
self.pipelines = {}
self.jobs = {}
self.projects = {}
self.project_templates = {}
self.metajobs = {}
self.layout = model.Layout()
def stop(self):
self._stopped = True
@ -96,6 +92,9 @@ class Scheduler(threading.Thread):
self._parseConfig(config_path)
def _parseConfig(self, config_path):
layout = model.Layout()
project_templates = {}
def toList(item):
if not item:
return []
@ -114,7 +113,7 @@ class Scheduler(threading.Thread):
validator = layoutvalidator.LayoutValidator()
validator.validate(data)
self._config_env = {}
config_env = {}
for include in data.get('includes', []):
if 'python-file' in include:
fn = include['python-file']
@ -122,7 +121,7 @@ class Scheduler(threading.Thread):
base = os.path.dirname(config_path)
fn = os.path.join(base, fn)
fn = os.path.expanduser(fn)
execfile(fn, self._config_env)
execfile(fn, config_env)
for conf_pipeline in data.get('pipelines', []):
pipeline = Pipeline(conf_pipeline['name'])
@ -137,7 +136,7 @@ class Scheduler(threading.Thread):
manager = globals()[conf_pipeline['manager']](self, pipeline)
pipeline.setManager(manager)
self.pipelines[conf_pipeline['name']] = pipeline
layout.pipelines[conf_pipeline['name']] = pipeline
manager.success_action = conf_pipeline.get('success')
manager.failure_action = conf_pipeline.get('failure')
manager.start_action = conf_pipeline.get('start')
@ -160,14 +159,13 @@ class Scheduler(threading.Thread):
# Make sure the template only contains valid pipelines
tpl = dict(
(pipe_name, project_template.get(pipe_name))
for pipe_name in self.pipelines.keys()
for pipe_name in layout.pipelines.keys()
if pipe_name in project_template
)
self.project_templates[project_template.get('name')] \
= tpl
project_templates[project_template.get('name')] = tpl
for config_job in data.get('jobs', []):
job = self.getJob(config_job['name'])
job = layout.getJob(config_job['name'])
# Be careful to only set attributes explicitly present on
# this job, to avoid squashing attributes set by a meta-job.
m = config_job.get('failure-message', None)
@ -190,7 +188,7 @@ class Scheduler(threading.Thread):
job.voting = m
fname = config_job.get('parameter-function', None)
if fname:
func = self._config_env.get(fname, None)
func = config_env.get(fname, None)
if not func:
raise Exception("Unable to find function %s" % fname)
job.parameter_function = func
@ -210,17 +208,17 @@ class Scheduler(threading.Thread):
add_jobs(job_tree, x)
if isinstance(job, dict):
for parent, children in job.items():
parent_tree = job_tree.addJob(self.getJob(parent))
parent_tree = job_tree.addJob(layout.getJob(parent))
add_jobs(parent_tree, children)
if isinstance(job, str):
job_tree.addJob(self.getJob(job))
job_tree.addJob(layout.getJob(job))
for config_project in data.get('projects', []):
project = Project(config_project['name'])
for requested_template in config_project.get('template', []):
# Fetch the template from 'project-templates'
tpl = self.project_templates.get(
tpl = project_templates.get(
requested_template.get('name'))
# Expand it with the project context
expanded = deep_format(tpl, requested_template)
@ -228,11 +226,11 @@ class Scheduler(threading.Thread):
# defined for this project
config_project.update(expanded)
self.projects[config_project['name']] = project
layout.projects[config_project['name']] = project
mode = config_project.get('merge-mode')
if mode and mode == 'cherry-pick':
project.merge_mode = model.CHERRY_PICK
for pipeline in self.pipelines.values():
for pipeline in layout.pipelines.values():
if pipeline.name in config_project:
job_tree = pipeline.addProject(project)
config_jobs = config_project[pipeline.name]
@ -240,10 +238,12 @@ class Scheduler(threading.Thread):
# All jobs should be defined at this point, get rid of
# metajobs so that getJob isn't doing anything weird.
self.metajobs = {}
layout.metajobs = {}
for pipeline in self.pipelines.values():
pipeline.manager._postConfig()
for pipeline in layout.pipelines.values():
pipeline.manager._postConfig(layout)
return layout
def _setupMerger(self):
if self.config.has_option('zuul', 'git_dir'):
@ -273,26 +273,10 @@ class Scheduler(threading.Thread):
self.merger = merger.Merger(self.trigger, merge_root, push_refs,
sshkey, merge_email, merge_name)
for project in self.projects.values():
for project in self.layout.projects.values():
url = self.trigger.getGitUrl(project)
self.merger.addProject(project, url)
def getJob(self, name):
if name in self.jobs:
return self.jobs[name]
job = Job(name)
if name.startswith('^'):
# This is a meta-job
regex = re.compile(name)
self.metajobs[regex] = job
else:
# Apply attributes from matching meta-jobs
for regex, metajob in self.metajobs.items():
if regex.match(name):
job.copy(metajob)
self.jobs[name] = job
return job
def setLauncher(self, launcher):
self.launcher = launcher
@ -406,7 +390,8 @@ class Scheduler(threading.Thread):
if self._reconfigure:
self.log.debug("Performing reconfiguration")
self._init()
self._parseConfig(self.config.get('zuul', 'layout_config'))
self.layout = self._parseConfig(
self.config.get('zuul', 'layout_config'))
self._setupMerger()
self._pause = False
self._reconfigure = False
@ -415,7 +400,7 @@ class Scheduler(threading.Thread):
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
waiting = False
for pipeline in self.pipelines.values():
for pipeline in self.layout.pipelines.values():
for build in pipeline.manager.building_jobs.keys():
self.log.debug("%s waiting on %s" % (pipeline.manager, build))
waiting = True
@ -464,7 +449,7 @@ class Scheduler(threading.Thread):
self.log.debug("Fetching trigger event")
event = self.trigger_event_queue.get()
self.log.debug("Processing trigger event %s" % event)
project = self.projects.get(event.project_name)
project = self.layout.projects.get(event.project_name)
if not project:
self.log.warning("Project %s not found" % event.project_name)
self.trigger_event_queue.task_done()
@ -479,7 +464,7 @@ class Scheduler(threading.Thread):
self.log.info("Fetching references for %s" % project)
self.merger.updateRepo(project)
for pipeline in self.pipelines.values():
for pipeline in self.layout.pipelines.values():
change = event.getChange(project, self.trigger)
if event.type == 'patchset-created':
pipeline.manager.removeOldVersionsOfChange(change)
@ -496,7 +481,7 @@ class Scheduler(threading.Thread):
self.log.debug("Fetching result event")
event_type, build = self.result_event_queue.get()
self.log.debug("Processing result event %s" % build)
for pipeline in self.pipelines.values():
for pipeline in self.layout.pipelines.values():
if event_type == 'started':
if pipeline.manager.onBuildStarted(build):
self.result_event_queue.task_done()
@ -519,10 +504,10 @@ class Scheduler(threading.Thread):
ret += ', queue length: %s' % self.trigger_event_queue.qsize()
ret += '</p>'
keys = self.pipelines.keys()
keys = self.layout.pipelines.keys()
keys.sort()
for key in keys:
pipeline = self.pipelines[key]
pipeline = self.layout.pipelines[key]
s = 'Pipeline: %s' % pipeline.name
ret += s + '\n'
ret += '-' * len(s) + '\n'
@ -552,10 +537,10 @@ class Scheduler(threading.Thread):
pipelines = []
data['pipelines'] = pipelines
keys = self.pipelines.keys()
keys = self.layout.pipelines.keys()
keys.sort()
for key in keys:
pipeline = self.pipelines[key]
pipeline = self.layout.pipelines[key]
pipelines.append(pipeline.formatStatusJSON())
return json.dumps(data)
@ -581,7 +566,7 @@ class BasePipelineManager(object):
def __str__(self):
return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
def _postConfig(self):
def _postConfig(self, layout):
self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
self.log.info(" Events:")
for e in self.event_filters:
@ -609,7 +594,7 @@ class BasePipelineManager(object):
for x in tree.job_trees:
log_jobs(x, indent + 2)
for p in self.sched.projects.values():
for p in layout.projects.values():
tree = self.pipeline.getJobTree(p)
if tree:
self.log.info(" %s" % p)
@ -1170,8 +1155,8 @@ class IndependentPipelineManager(BasePipelineManager):
log = logging.getLogger("zuul.IndependentPipelineManager")
changes_merge = False
def _postConfig(self):
super(IndependentPipelineManager, self)._postConfig()
def _postConfig(self, layout):
super(IndependentPipelineManager, self)._postConfig(layout)
change_queue = ChangeQueue(self.pipeline, dependent=False)
for project in self.pipeline.getProjects():
@ -1187,8 +1172,8 @@ class DependentPipelineManager(BasePipelineManager):
def __init__(self, *args, **kwargs):
super(DependentPipelineManager, self).__init__(*args, **kwargs)
def _postConfig(self):
super(DependentPipelineManager, self)._postConfig()
def _postConfig(self, layout):
super(DependentPipelineManager, self)._postConfig(layout)
self.buildChangeQueues()
def buildChangeQueues(self):

View File

@ -324,7 +324,7 @@ class Gerrit(object):
if change.patchset is None:
change.patchset = data['currentPatchSet']['number']
change.project = self.sched.projects[data['project']]
change.project = self.sched.layout.projects[data['project']]
change.branch = data['branch']
change.url = data['url']
max_ps = 0