# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import os
import pprint
import Queue
import threading

import yaml

from model import Job, Change, Project, ChangeQueue, EventFilter


class Scheduler(object):
    """Route trigger events and build results to the queue managers.

    Events are placed on two internal queues by ``addEvent`` and
    ``onBuildCompleted``; ``run`` drains them and dispatches each item to
    the queue managers built from the layout configuration.
    """

    log = logging.getLogger("zuul.Scheduler")

    def __init__(self, config):
        """Create the scheduler and load the layout configuration.

        :param config: object whose ``get('zuul', 'layout_config')`` returns
            the path of the YAML layout file.
        """
        self.wake_event = threading.Event()
        self.queue_managers = {}
        self.jobs = {}
        self.projects = {}
        self.launcher = None
        self.trigger = None
        self.trigger_event_queue = Queue.Queue()
        self.result_event_queue = Queue.Queue()
        self._parseConfig(config.get('zuul', 'layout_config'))

    def _parseConfig(self, fp):
        """Populate queue managers, jobs and projects from the layout file.

        :param fp: path of the YAML layout file (``~`` is expanded).
        :raises Exception: if the path does not exist.
        """
        def toList(item):
            # Normalize "absent", "single value" and "list" to a list.
            if not item:
                return []
            if isinstance(item, list):
                return item
            return [item]

        if fp:
            fp = os.path.expanduser(fp)
            if not os.path.exists(fp):
                raise Exception(
                    "Unable to read layout config file at %s" % fp)
        # NOTE(review): yaml.load can construct arbitrary Python objects.
        # If the layout file may ever come from an untrusted source, switch
        # to yaml.safe_load -- confirm no custom YAML tags are in use first.
        with open(fp) as config_file:
            data = yaml.load(config_file)
        self.log.debug("Layout configuration:\n%s" % pprint.pformat(data))

        for config_queue in data['queue']:
            # The manager class is looked up by name among this module's
            # globals (e.g. IndependentQueueManager).
            manager_class = globals()[config_queue['manager']]
            manager = manager_class(self, config_queue['name'])
            self.queue_managers[config_queue['name']] = manager
            manager.success_action = config_queue.get('success')
            manager.failure_action = config_queue.get('failure')
            for trigger in toList(config_queue['trigger']):
                approvals = {}
                for approval_dict in toList(trigger.get('approval')):
                    approvals.update(approval_dict)
                f = EventFilter(types=toList(trigger['event']),
                                branches=toList(trigger.get('branch')),
                                refs=toList(trigger.get('ref')),
                                approvals=approvals)
                manager.event_filters.append(f)

        # The jobs section only carries optional per-job settings, so a
        # missing or empty section is treated as "no overrides".
        for config_job in toList(data.get('jobs')):
            job = self.getJob(config_job['name'])
            job.failure_message = config_job.get('failure-message', None)
            job.success_message = config_job.get('success-message', None)
            if config_job.get('silent', None):
                job.silent = True
            branches = toList(config_job.get('branch'))
            if branches:
                job.event_filters.append(EventFilter(branches=branches))

        def add_jobs(job_tree, config_jobs):
            # toList guards against a scalar child ("parent: child"), which
            # would otherwise be iterated character by character, and
            # against an empty child list (None).
            for job in toList(config_jobs):
                if isinstance(job, list):
                    # Recurse on the nested list itself so that dicts and
                    # strings inside it are handled by the branches below.
                    add_jobs(job_tree, job)
                elif isinstance(job, dict):
                    for parent, children in job.items():
                        parent_tree = job_tree.addJob(self.getJob(parent))
                        add_jobs(parent_tree, children)
                elif isinstance(job, str):
                    job_tree.addJob(self.getJob(job))

        for config_project in data['projects']:
            project = Project(config_project['name'])
            self.projects[config_project['name']] = project
            for qname in self.queue_managers.keys():
                if qname in config_project:
                    self.log.debug("Adding queue %s to project %s" % (
                        qname, project))
                    job_tree = project.addQueue(qname)
                    add_jobs(job_tree, config_project[qname])

        # TODO(jeblair): check that we don't end up with jobs like
        # "foo - bar" because a ':' is missing in the yaml for a dependent
        # job
        for manager in self.queue_managers.values():
            manager._postConfig()

    def getJob(self, name):
        """Return the Job called ``name``, creating it on first use."""
        if name in self.jobs:
            return self.jobs[name]
        job = Job(name)
        self.jobs[name] = job
        return job

    def setLauncher(self, launcher):
        """Set the launcher used to start and cancel builds."""
        self.launcher = launcher

    def setTrigger(self, trigger):
        """Set the trigger used to report results and check merges."""
        self.trigger = trigger

    def addEvent(self, event):
        """Queue a trigger event and wake the run loop."""
        self.log.debug("Adding trigger event: %s" % event)
        self.trigger_event_queue.put(event)
        self.wake_event.set()

    def onBuildCompleted(self, build):
        """Queue a completed build and wake the run loop."""
        self.log.debug("Adding result event for build: %s" % build)
        self.result_event_queue.put(build)
        self.wake_event.set()

    def run(self):
        """Process queued events forever; exceptions are logged, not raised."""
        while True:
            self.log.debug("Run handler sleeping")
            self.wake_event.wait()
            self.wake_event.clear()
            self.log.debug("Run handler awake")
            try:
                if not self.trigger_event_queue.empty():
                    self.process_event_queue()
                if not self.result_event_queue.empty():
                    self.process_result_queue()
            except Exception:
                self.log.exception("Exception in run handler:")
            # Several set() calls made while we were busy collapse into a
            # single wake-up, but each pass handles at most one item per
            # queue.  Re-arm the event so remaining items are not stranded
            # until some unrelated event arrives.
            if not (self.trigger_event_queue.empty() and
                    self.result_event_queue.empty()):
                self.wake_event.set()

    def process_event_queue(self):
        """Take one trigger event and add a Change to each matching manager."""
        self.log.debug("Fetching trigger event")
        event = self.trigger_event_queue.get()
        self.log.debug("Processing trigger event %s" % event)
        project = self.projects.get(event.project_name)
        if not project:
            self.log.warning("Project %s not found" % event.project_name)
            return

        for manager in self.queue_managers.values():
            if not manager.eventMatches(event):
                self.log.debug("Event %s ignored by %s" % (event, manager))
                continue
            if event.change_number:
                change = Change(manager.name, project,
                                event.change_number, event.patch_number)
                self.log.info("Adding %s, %s to %s" % (
                    project, change, manager))
                manager.addChange(change)

    def process_result_queue(self):
        """Take one completed build and offer it to the managers in turn.

        Stops at the first manager whose ``onBuildCompleted`` returns true.
        """
        self.log.debug("Fetching result event")
        build = self.result_event_queue.get()
        self.log.debug("Processing result event %s" % build)
        for manager in self.queue_managers.values():
            if manager.onBuildCompleted(build):
                return


class BaseQueueManager(object):
    """Launch jobs for changes and report once all of them have finished."""

    log = logging.getLogger("zuul.BaseQueueManager")

    def __init__(self, sched, name):
        """
        :param sched: the owning Scheduler.
        :param name: the queue name used in the layout configuration.
        """
        self.sched = sched
        self.name = name
        self.building_jobs = {}   # build -> change that it was launched for
        self.event_filters = []
        self.success_action = {}
        self.failure_action = {}

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.name)

    def _postConfig(self):
        """Log the resulting configuration once the layout has been parsed."""
        self.log.info("Configured Queue Manager %s" % self.name)
        self.log.info(" Events:")
        for e in self.event_filters:
            self.log.info(" %s" % e)
        self.log.info(" Projects:")

        def log_jobs(tree, indent=0):
            istr = ' ' + ' ' * indent
            if tree.job:
                efilters = ''.join(str(e) for e in tree.job.event_filters)
                if efilters:
                    efilters = ' ' + efilters
                self.log.info("%s%s%s" % (istr, repr(tree.job), efilters))
            for x in tree.job_trees:
                log_jobs(x, indent + 2)

        for p in self.sched.projects.values():
            if p.hasQueue(self.name):
                self.log.info(" %s" % p)
                log_jobs(p.getJobTreeForQueue(self.name))
        if self.success_action:
            self.log.info(" On success:")
            self.log.info(" %s" % self.success_action)
        if self.failure_action:
            self.log.info(" On failure:")
            self.log.info(" %s" % self.failure_action)

    def eventMatches(self, event):
        """Return True if any of this manager's event filters match."""
        for ef in self.event_filters:
            if ef.matches(event):
                return True
        return False

    def addChange(self, change):
        """Start tracking ``change`` by launching its jobs."""
        self.log.debug("Adding change %s" % change)
        self.launchJobs(change)

    def launchJobs(self, change):
        """Launch every job that ``change.findJobsToRun()`` returns.

        A failure to launch one job is logged and does not prevent the
        remaining jobs from being launched.
        """
        self.log.debug("Launching jobs for change %s" % change)
        for job in change.findJobsToRun():
            self.log.debug("Found job %s for change %s" % (job, change))
            try:
                build = self.sched.launcher.launch(job, change)
                self.building_jobs[build] = change
                self.log.debug("Adding build %s of job %s to change %s" % (
                    build, job, change))
                change.addBuild(build)
            except Exception:
                self.log.exception(
                    "Exception while launching job %s for change %s:" % (
                        job, change))

    def onBuildCompleted(self, build):
        """Record a finished build.

        :returns: True if the build belonged to this manager, else False.
        """
        self.log.debug("Build %s completed" % build)
        if build not in self.building_jobs:
            self.log.warning(
                "Build %s not found (may have been canceled)" % (build))
            # Or triggered externally, or triggered before zuul started,
            # or restarted
            return False
        change = self.building_jobs[build]
        self.log.debug(
            "Found change %s which triggered completed build %s" % (
                change, build))

        del self.building_jobs[build]

        change.setResult(build)
        self.log.info("Change %s status is now:\n %s" % (
            change, change.formatStatus()))

        if change.areAllJobsComplete():
            self.log.debug("All jobs for change %s are complete" % change)
            self.possiblyReportChange(change)
        else:
            # There may be jobs that depended on jobs that are now complete
            self.log.debug("All jobs for change %s are not yet complete" % (
                change))
            self.launchJobs(change)
        return True

    def possiblyReportChange(self, change):
        """Report ``change``; subclasses may defer or chain the report."""
        self.log.debug("Possibly reporting change %s" % change)
        self.reportChange(change)

    def reportChange(self, change):
        """Report the result of ``change`` through the scheduler's trigger.

        The success or failure action is chosen from
        ``change.didAllJobsSucceed()``.

        :returns: whatever ``trigger.report`` returned (a truthy value is
            logged as an error), or None if reporting raised.
        """
        self.log.debug("Reporting change %s" % change)
        ret = None
        if change.didAllJobsSucceed():
            action = self.success_action
        else:
            action = self.failure_action
        try:
            self.log.info("Reporting change %s, action: %s" % (
                change, action))
            ret = self.sched.trigger.report(change,
                                            change.formatReport(),
                                            action)
            if ret:
                self.log.error("Reporting change %s received: %s" % (
                    change, ret))
        except Exception:
            self.log.exception("Exception while reporting:")
        return ret


class IndependentQueueManager(BaseQueueManager):
    """Queue manager that adds nothing to the base behaviour."""

    log = logging.getLogger("zuul.IndependentQueueManager")


class DependentQueueManager(BaseQueueManager):
    """Queue manager that keeps changes in ordered, shared change queues.

    Builds for a change are launched together with the changes ahead of it,
    and a change is only reported when it is at the front of its queue.
    """

    log = logging.getLogger("zuul.DependentQueueManager")

    def __init__(self, *args, **kwargs):
        super(DependentQueueManager, self).__init__(*args, **kwargs)
        self.change_queues = []

    def _postConfig(self):
        super(DependentQueueManager, self)._postConfig()
        self.buildChangeQueues()

    def buildChangeQueues(self):
        """Create one queue per project, then merge queues that share jobs."""
        self.log.debug("Building shared change queues")
        change_queues = []

        for project in self.sched.projects.values():
            if project.hasQueue(self.name):
                change_queue = ChangeQueue(self.name)
                change_queue.addProject(project)
                change_queues.append(change_queue)
                self.log.debug("Created queue: %s" % change_queue)

        self.log.debug("Combining shared queues")
        new_change_queues = []
        for a in change_queues:
            merged_a = False
            a_jobs = a.getJobs()
            for b in new_change_queues:
                if not a_jobs.isdisjoint(b.getJobs()):
                    self.log.debug("Merging queue %s into %s" % (a, b))
                    b.mergeChangeQueue(a)
                    merged_a = True
                    break  # this breaks out of 'for b' and continues 'for a'
            if not merged_a:
                self.log.debug("Keeping queue %s" % (a))
                new_change_queues.append(a)

        self.change_queues = new_change_queues
        self.log.info(" Shared change queues:")
        for x in self.change_queues:
            self.log.info(" %s" % x)

    def getQueue(self, project):
        """Return the change queue containing ``project``, or None."""
        for queue in self.change_queues:
            if project in queue.projects:
                return queue
        self.log.error("Unable to find change queue for project %s" % project)
        return None

    def addChange(self, change):
        """Enqueue ``change`` in its project's queue and launch its jobs."""
        self.log.debug("Adding change %s" % change)
        change_queue = self.getQueue(change.project)
        if change_queue is None:
            # getQueue has already logged the error; without a queue there
            # is nothing to enqueue the change on.
            return
        self.log.debug("Adding change %s to queue %s" % (
            change, change_queue))
        change_queue.enqueueChange(change)
        super(DependentQueueManager, self).addChange(change)

    def _getDependentChanges(self, change):
        """Return the changes ahead of ``change``, nearest first."""
        changes = []
        # Walk with a separate cursor so the log line below still names the
        # change we were asked about rather than the head of the queue.
        cursor = change
        while cursor.change_ahead:
            changes.append(cursor.change_ahead)
            cursor = cursor.change_ahead
        self.log.info("Change %s depends on changes %s" % (change, changes))
        return changes

    def launchJobs(self, change):
        """Launch jobs for ``change`` and then for every change behind it.

        Each build is launched with the list of changes ahead of it.
        """
        self.log.debug("Launching jobs for change %s" % change)
        dependent_changes = self._getDependentChanges(change)
        for job in change.findJobsToRun():
            self.log.debug("Found job %s for change %s" % (job, change))
            try:
                build = self.sched.launcher.launch(job,
                                                   change,
                                                   dependent_changes)
                self.building_jobs[build] = change
                self.log.debug("Adding build %s of job %s to change %s" % (
                    build, job, change))
                change.addBuild(build)
            except Exception:
                self.log.exception(
                    "Exception while launching job %s for change %s:" % (
                        job, change))
        if change.change_behind:
            self.log.debug(
                "Launching jobs for change %s, behind change %s" % (
                    change.change_behind, change))
            self.launchJobs(change.change_behind)

    def cancelJobs(self, change):
        """Cancel running builds of ``change`` and of every change behind it.

        A failure to cancel one build is logged; the build is still dropped
        from ``building_jobs``.
        """
        self.log.debug("Cancel jobs for change %s" % change)
        to_remove = []
        change.resetAllBuilds()
        for build, build_change in self.building_jobs.items():
            if build_change == change:
                self.log.debug(
                    "Found build %s for change %s to cancel" % (
                        build, change))
                try:
                    self.sched.launcher.cancel(build)
                except Exception:
                    self.log.exception(
                        "Exception while canceling build %s for change %s" % (
                            build, change))
                to_remove.append(build)
        # Deleted in a second pass so the dict is not modified while it is
        # being walked.
        for build in to_remove:
            self.log.debug("Removing build %s from running builds" % build)
            del self.building_jobs[build]
        if change.change_behind:
            self.log.debug(
                "Canceling jobs for change %s, behind change %s" % (
                    change.change_behind, change))
            self.cancelJobs(change.change_behind)

    def possiblyReportChange(self, change):
        """Report ``change`` if it is at the front of its queue.

        If the reported change failed its jobs or was not merged, the jobs
        of the change behind it are canceled and relaunched.  Whether or
        not ``change`` was reported, a completed change directly behind it
        is then considered for reporting in turn.
        """
        self.log.debug("Possibly reporting change %s" % change)
        if not change.change_ahead:
            self.log.debug(
                "Change %s is at the front of the queue, reporting" % (
                    change))
            ret = self.reportChange(change)
            self.log.debug("Removing reported change %s from queue" % change)
            change.delete()
            change.queue.dequeueChange(change)
            # A falsy report result is only a candidate for "merged"; the
            # trigger is asked to confirm.
            merged = (not ret)
            if merged:
                merged = self.sched.trigger.isMerged(change)
            succeeded = change.didAllJobsSucceed()
            self.log.info(
                "Reported change %s status: all-succeeded: %s, merged: %s" % (
                    change, succeeded, merged))

            if not (succeeded and merged):
                self.log.debug(
                    "Reported change %s failed tests or failed to merge" % (
                        change))
                # The merge or test failed, re-run all jobs behind this one
                if change.change_behind:
                    self.log.info(
                        "Canceling/relaunching jobs for change %s "
                        "behind failed change %s" % (
                            change.change_behind, change))
                    self.cancelJobs(change.change_behind)
                    self.launchJobs(change.change_behind)
        # If the change behind this is ready, notify
        if (change.change_behind and
                change.change_behind.areAllJobsComplete()):
            self.log.info(
                "Change %s behind change %s is ready, possibly reporting" % (
                    change.change_behind, change))
            self.possiblyReportChange(change.change_behind)