438 lines
17 KiB
Python
438 lines
17 KiB
Python
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import os
|
|
import Queue
|
|
import threading
|
|
import logging
|
|
import yaml
|
|
|
|
|
|
from model import Job, Change, Project, ChangeQueue, EventFilter
|
|
|
|
class Scheduler(object):
    """Dispatch trigger events and build results to queue managers.

    Events arrive through :meth:`addEvent` and :meth:`onBuildCompleted`,
    which put them on two internal queues and set ``wake_event``.
    :meth:`run` is the loop that waits on that event and drains the queues.
    """
    log = logging.getLogger("zuul.Scheduler")

    def __init__(self, config):
        """Initialise empty state and load the layout file.

        :param config: object whose ``get('zuul', 'layout_config')``
            returns the path of the YAML layout file.
        """
        self.wake_event = threading.Event()
        self.queue_managers = {}
        self.jobs = {}
        self.projects = {}
        self.launcher = None
        self.trigger = None

        self.trigger_event_queue = Queue.Queue()
        self.result_event_queue = Queue.Queue()

        self._parseConfig(config.get('zuul', 'layout_config'))

    def _parseConfig(self, fp):
        """Populate queue managers, jobs and projects from a YAML layout.

        :param fp: path to the layout file (``~`` is expanded).
        :raises Exception: if no path is given or the file does not exist.
        """
        def toList(item):
            # Normalise "nothing / scalar / list" into a list.
            if not item:
                return []
            if isinstance(item, list):
                return item
            return [item]

        if not fp:
            # Previously this fell through to an unbound ``data`` variable.
            raise Exception("No layout config file specified")
        fp = os.path.expanduser(fp)
        if not os.path.exists(fp):
            raise Exception("Unable to read layout config file at %s" % fp)
        # NOTE(review): yaml.load can construct arbitrary Python objects;
        # consider yaml.safe_load if the layout may come from an untrusted
        # source.
        with open(fp) as layout_file:
            data = yaml.load(layout_file)
        self.log.debug("Loaded layout config: %s" % data)

        for config_queue in data['queue']:
            # NOTE(review): the manager class is looked up by name among
            # this module's globals, so the layout can name any global.
            manager_class = globals()[config_queue['manager']]
            manager = manager_class(self, config_queue['name'])
            self.queue_managers[config_queue['name']] = manager
            manager.success_action = config_queue.get('success')
            manager.failure_action = config_queue.get('failure')
            for trigger in toList(config_queue['trigger']):
                approvals = {}
                for approval_dict in toList(trigger.get('approval')):
                    approvals.update(approval_dict)
                f = EventFilter(types=toList(trigger['event']),
                                branches=toList(trigger.get('branch')),
                                refs=toList(trigger.get('ref')),
                                approvals=approvals)
                manager.event_filters.append(f)

        for config_job in data['jobs']:
            job = self.getJob(config_job['name'])
            job.failure_message = config_job.get('failure-message', None)
            job.success_message = config_job.get('success-message', None)
            if config_job.get('silent', None):
                job.silent = True
            branches = toList(config_job.get('branch'))
            if branches:
                job.event_filters.append(EventFilter(branches=branches))

        def add_jobs(job_tree, config_jobs):
            # ``config_jobs`` is normalised first so that a bare string or
            # dict is treated as one entry rather than being iterated
            # character by character / key by key.
            for job in toList(config_jobs):
                if isinstance(job, list):
                    add_jobs(job_tree, job)
                elif isinstance(job, dict):
                    for parent, children in job.items():
                        parent_tree = job_tree.addJob(self.getJob(parent))
                        add_jobs(parent_tree, children)
                elif isinstance(job, str):
                    job_tree.addJob(self.getJob(job))

        for config_project in data['projects']:
            project = Project(config_project['name'])
            self.projects[config_project['name']] = project
            for qname in self.queue_managers.keys():
                if qname in config_project:
                    self.log.debug("Adding queue %s to project %s" % (
                        qname, project))
                    job_tree = project.addQueue(qname)
                    add_jobs(job_tree, config_project[qname])

        # TODO(jeblair): check that we don't end up with jobs like
        # "foo - bar" because a ':' is missing in the yaml for a dependent job
        for manager in self.queue_managers.values():
            manager._postConfig()

    def getJob(self, name):
        """Return the job registered under ``name``, creating it if new."""
        if name in self.jobs:
            return self.jobs[name]
        job = Job(name)
        self.jobs[name] = job
        return job

    def setLauncher(self, launcher):
        """Store the launcher object used by queue managers."""
        self.launcher = launcher

    def setTrigger(self, trigger):
        """Store the trigger object used by queue managers."""
        self.trigger = trigger

    def addEvent(self, event):
        """Queue a trigger event and wake the run loop."""
        self.log.debug("Adding trigger event: %s" % event)
        self.trigger_event_queue.put(event)
        self.wake_event.set()

    def onBuildCompleted(self, build):
        """Queue a completed build and wake the run loop."""
        self.log.debug("Adding result event for build: %s" % build)
        self.result_event_queue.put(build)
        self.wake_event.set()

    def run(self):
        """Loop forever, handling queued events each time we are woken."""
        while True:
            self.log.debug("Run handler sleeping")
            self.wake_event.wait()
            self.wake_event.clear()
            self.log.debug("Run handler awake")
            try:
                if not self.trigger_event_queue.empty():
                    self.process_event_queue()
                if not self.result_event_queue.empty():
                    self.process_result_queue()
            except Exception:
                # Keep the loop alive, but let SystemExit and
                # KeyboardInterrupt propagate.
                self.log.exception("Exception in run handler:")
            # Each pass handles at most one item per queue, and several
            # producers may have set the flag before we cleared it; re-arm
            # so that remaining items are not stranded until the next event.
            if not (self.trigger_event_queue.empty() and
                    self.result_event_queue.empty()):
                self.wake_event.set()

    def process_event_queue(self):
        """Take one trigger event and offer it to every queue manager."""
        self.log.debug("Fetching trigger event")
        event = self.trigger_event_queue.get()
        self.log.debug("Processing trigger event %s" % event)
        project = self.projects.get(event.project_name)
        if not project:
            self.log.warning("Project %s not found" % event.project_name)
            return

        for manager in self.queue_managers.values():
            if not manager.eventMatches(event):
                self.log.debug("Event %s ignored by %s" % (event, manager))
                continue
            if event.change_number:
                change = Change(manager.name, project, event.branch,
                                event.change_number, event.patch_number,
                                event.refspec)
                self.log.info("Adding %s, %s to %s" % (
                    project, change, manager))
                manager.addChange(change)

    def process_result_queue(self):
        """Take one completed build and hand it to the manager owning it.

        Managers are tried in turn; the first whose ``onBuildCompleted``
        returns a true value ends the search.
        """
        self.log.debug("Fetching result event")
        build = self.result_event_queue.get()
        self.log.debug("Processing result event %s" % build)
        for manager in self.queue_managers.values():
            if manager.onBuildCompleted(build):
                return
|
|
|
|
class BaseQueueManager(object):
    """Common behaviour for queue managers.

    Tracks which change each running build belongs to, launches a change's
    jobs through ``sched.launcher`` and reports finished changes through
    ``sched.trigger``.
    """
    log = logging.getLogger("zuul.BaseQueueManager")

    def __init__(self, sched, name):
        """
        :param sched: the owning scheduler; its ``launcher``, ``trigger``
            and ``projects`` attributes are used by this class.
        :param name: the queue name.
        """
        self.sched = sched
        self.name = name
        self.building_jobs = {}   # build -> change that launched it
        self.event_filters = []
        self.success_action = {}
        self.failure_action = {}

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.name)

    def _postConfig(self):
        """Log a summary of this manager's configuration."""
        self.log.info("Configured Queue Manager %s" % self.name)
        self.log.info(" Events:")
        for e in self.event_filters:
            self.log.info(" %s" % e)
        self.log.info(" Projects:")

        def log_jobs(tree, indent=0):
            # Walk the job tree depth-first, indenting two columns per level.
            istr = ' '+' '*indent
            if tree.job:
                efilters = ''.join(str(e) for e in tree.job.event_filters)
                if efilters:
                    efilters = ' '+efilters
                self.log.info("%s%s%s" % (istr, repr(tree.job), efilters))
            for x in tree.job_trees:
                log_jobs(x, indent+2)

        for p in self.sched.projects.values():
            if p.hasQueue(self.name):
                self.log.info(" %s" % p)
                log_jobs(p.getJobTreeForQueue(self.name))
        if self.success_action:
            self.log.info(" On success:")
            self.log.info(" %s" % self.success_action)
        if self.failure_action:
            self.log.info(" On failure:")
            self.log.info(" %s" % self.failure_action)

    def eventMatches(self, event):
        """Return True if any configured event filter matches ``event``."""
        return any(ef.matches(event) for ef in self.event_filters)

    def addChange(self, change):
        """Accept a change and launch its jobs."""
        self.log.debug("Adding change %s" % change)
        self.launchJobs(change)

    def launchJobs(self, change):
        """Launch every job ``change.findJobsToRun()`` returns.

        A failure to launch one job is logged and does not stop the
        remaining jobs from being launched.
        """
        self.log.debug("Launching jobs for change %s" % change)
        for job in change.findJobsToRun():
            self.log.debug("Found job %s for change %s" % (job, change))
            try:
                build = self.sched.launcher.launch(job, change)
                self.building_jobs[build] = change
                self.log.debug("Adding build %s of job %s to change %s" % (
                    build, job, change))
                change.addBuild(build)
            except Exception:
                self.log.exception(
                    "Exception while launching job %s for change %s:" % (
                        job, change))

    def onBuildCompleted(self, build):
        """Record a finished build.

        :returns: False if this manager is not tracking ``build``,
            True once the build has been handled.
        """
        self.log.debug("Build %s completed" % build)
        if build not in self.building_jobs:
            self.log.warning("Build %s not found (may have been canceled)" % (
                build))
            # Or triggered externally, or triggered before zuul started,
            # or restarted
            return False
        change = self.building_jobs[build]
        self.log.debug("Found change %s which triggered completed build %s" % (
            change, build))

        del self.building_jobs[build]

        change.setResult(build)
        self.log.info("Change %s status is now:\n %s" % (
            change, change.formatStatus()))

        if change.areAllJobsComplete():
            self.log.debug("All jobs for change %s are complete" % change)
            self.possiblyReportChange(change)
        else:
            # There may be jobs that depended on jobs that are now complete
            self.log.debug("All jobs for change %s are not yet complete" % (
                change))
            self.launchJobs(change)
        return True

    def possiblyReportChange(self, change):
        """Report ``change``; subclasses may add conditions."""
        self.log.debug("Possibly reporting change %s" % change)
        self.reportChange(change)

    def reportChange(self, change):
        """Report the change's outcome through the scheduler's trigger.

        The success or failure action is chosen from
        ``change.didAllJobsSucceed()``.

        :returns: whatever ``trigger.report`` returned (a true value is
            logged as an error), or None if reporting raised.
        """
        self.log.debug("Reporting change %s" % change)
        ret = None
        if change.didAllJobsSucceed():
            action = self.success_action
        else:
            action = self.failure_action
        try:
            self.log.info("Reporting change %s, action: %s" % (
                change, action))
            ret = self.sched.trigger.report(change, change.formatReport(),
                                            action)
            if ret:
                self.log.error("Reporting change %s received: %s" % (
                    change, ret))
        except Exception:
            self.log.exception("Exception while reporting:")
        return ret
|
|
|
|
class IndependentQueueManager(BaseQueueManager):
    """Queue manager that uses BaseQueueManager's behaviour unchanged.

    Only the logger name is overridden.
    """
    log = logging.getLogger("zuul.IndependentQueueManager")
|
|
|
|
class DependentQueueManager(BaseQueueManager):
    """Queue manager whose changes are ordered in shared change queues.

    Projects that have at least one job in common share a change queue.
    Each change is launched together with the list of changes ahead of it,
    and only the change at the front of its queue is reported.
    """
    log = logging.getLogger("zuul.DependentQueueManager")

    def __init__(self, *args, **kwargs):
        super(DependentQueueManager, self).__init__(*args, **kwargs)
        self.change_queues = []

    def _postConfig(self):
        """Log the configuration, then build the shared change queues."""
        super(DependentQueueManager, self)._postConfig()
        self.buildChangeQueues()

    def buildChangeQueues(self):
        """Create one change queue per project, then merge overlapping ones.

        Two queues are merged when their job sets are not disjoint.
        """
        self.log.debug("Building shared change queues")
        change_queues = []

        for project in self.sched.projects.values():
            if project.hasQueue(self.name):
                change_queue = ChangeQueue(self.name)
                change_queue.addProject(project)
                change_queues.append(change_queue)
                self.log.debug("Created queue: %s" % change_queue)

        self.log.debug("Combining shared queues")
        new_change_queues = []
        for a in change_queues:
            merged_a = False
            # NOTE(review): ``a`` is merged into the first overlapping
            # queue only; if it also overlaps a later queue, those two
            # stay separate. Confirm whether that is intended.
            for b in new_change_queues:
                if not a.getJobs().isdisjoint(b.getJobs()):
                    self.log.debug("Merging queue %s into %s" % (a, b))
                    b.mergeChangeQueue(a)
                    merged_a = True
                    break  # this breaks out of 'for b' and continues 'for a'
            if not merged_a:
                self.log.debug("Keeping queue %s" % (a))
                new_change_queues.append(a)

        self.change_queues = new_change_queues
        self.log.info(" Shared change queues:")
        for x in self.change_queues:
            self.log.info(" %s" % x)

    def getQueue(self, project):
        """Return the change queue containing ``project``.

        :returns: the queue, or None (after logging an error) if no queue
            contains the project.
        """
        for queue in self.change_queues:
            if project in queue.projects:
                return queue
        self.log.error("Unable to find change queue for project %s" % project)
        return None

    def addChange(self, change):
        """Enqueue ``change`` on its project's queue and launch its jobs."""
        self.log.debug("Adding change %s" % change)
        change_queue = self.getQueue(change.project)
        if change_queue is None:
            # getQueue() has already logged the problem; without a queue
            # there is nothing to enqueue on (this used to raise
            # AttributeError on None).
            return
        self.log.debug("Adding change %s to queue %s" % (change, change_queue))
        change_queue.enqueueChange(change)
        super(DependentQueueManager, self).addChange(change)

    def _getDependentChanges(self, change):
        """Return the changes ahead of ``change``, nearest first."""
        changes = []
        # Walk with a separate cursor so the log line below still names
        # the change we were asked about, not the head of the queue.
        ahead = change.change_ahead
        while ahead:
            changes.append(ahead)
            ahead = ahead.change_ahead
        self.log.info("Change %s depends on changes %s" % (change, changes))
        return changes

    def launchJobs(self, change):
        """Launch jobs for ``change`` and then for each change behind it.

        The changes ahead of ``change`` are passed to the launcher as a
        third argument.
        """
        self.log.debug("Launching jobs for change %s" % change)
        dependent_changes = self._getDependentChanges(change)
        for job in change.findJobsToRun():
            self.log.debug("Found job %s for change %s" % (job, change))
            try:
                build = self.sched.launcher.launch(job, change,
                                                   dependent_changes)
                self.building_jobs[build] = change
                self.log.debug("Adding build %s of job %s to change %s" % (
                    build, job, change))
                change.addBuild(build)
            except Exception:
                self.log.exception(
                    "Exception while launching job %s for change %s:" % (
                        job, change))
        if change.change_behind:
            self.log.debug("Launching jobs for change %s, behind change %s" % (
                change.change_behind, change))
            self.launchJobs(change.change_behind)

    def cancelJobs(self, change):
        """Cancel builds of ``change`` and of every change behind it."""
        self.log.debug("Cancel jobs for change %s" % change)
        to_remove = []
        change.resetAllBuilds()
        for build, build_change in self.building_jobs.items():
            if build_change == change:
                self.log.debug("Found build %s for change %s to cancel" % (
                    build, change))
                try:
                    self.sched.launcher.cancel(build)
                except Exception:
                    self.log.exception(
                        "Exception while canceling build %s for change %s" % (
                            build, change))
                # Stop tracking the build even if the cancel call failed.
                to_remove.append(build)
        # Deletion is deferred so the dict is not mutated while iterating.
        for build in to_remove:
            self.log.debug("Removing build %s from running builds" % build)
            del self.building_jobs[build]
        if change.change_behind:
            self.log.debug("Canceling jobs for change %s, behind change %s" % (
                change.change_behind, change))
            self.cancelJobs(change.change_behind)

    def possiblyReportChange(self, change):
        """Report ``change`` only if it is at the front of its queue.

        After reporting, the change is removed from its queue. If it did
        not both pass all jobs and merge, the jobs of the changes behind it
        are cancelled and relaunched.
        """
        self.log.debug("Possibly reporting change %s" % change)
        if not change.change_ahead:
            self.log.debug(
                "Change %s is at the front of the queue, reporting" % (
                    change))
            ret = self.reportChange(change)
            self.log.debug("Removing reported change %s from queue" % change)
            change.delete()
            change.queue.dequeueChange(change)
            # A true return from reportChange() means reporting failed, so
            # the change cannot have merged.
            merged = (not ret)
            if merged:
                merged = self.sched.trigger.isMerged(change)
            succeeded = change.didAllJobsSucceed()
            self.log.info(
                "Reported change %s status: all-succeeded: %s, merged: %s" % (
                    change, succeeded, merged))

            if not (succeeded and merged):
                self.log.debug(
                    "Reported change %s failed tests or failed to merge" % (
                        change))
                # The merge or test failed, re-run all jobs behind this one
                if change.change_behind:
                    self.log.info(
                        "Canceling/relaunching jobs for change %s behind failed change %s" % (
                            change.change_behind, change))
                    self.cancelJobs(change.change_behind)
                    self.launchJobs(change.change_behind)
        # If the change behind this is ready, notify
        # NOTE(review): nesting reconstructed from de-indented source;
        # this check is assumed to sit at method level - confirm.
        if (change.change_behind and
                change.change_behind.areAllJobsComplete()):
            self.log.info(
                "Change %s behind change %s is ready, possibly reporting" % (
                    change.change_behind, change))
            self.possiblyReportChange(change.change_behind)
|