scheduler.py cleanup

Fixes bug #1026429

Change-Id: Ifc72e63df9280533bd1b511d4b08bed79176a73e
This commit is contained in:
Zhongyue Luo 2012-07-19 11:03:56 +08:00
parent f950b4a4a4
commit 1c860d7f13
2 changed files with 62 additions and 59 deletions

View File

@ -1,2 +1,3 @@
James E. Blair <jeblair@hp.com>
Clark Boylan <clark.boylan@gmail.com>
Zhongyue Luo <lzyeval@gmail.com>

View File

@ -12,13 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import Queue
import threading
import logging
import re
import yaml
import os
import pickle
import Queue
import re
import threading
import yaml
from model import Job, Change, Project, ChangeQueue, EventFilter
@ -314,8 +314,8 @@ class Scheduler(threading.Thread):
self.log.debug("Event %s ignored by %s" % (event, manager))
continue
change = Change(manager.name, project, event)
self.log.info("Adding %s, %s to to %s" % (
project, change, manager))
self.log.info("Adding %s, %s to to %s" %
(project, change, manager))
manager.addChange(change)
def process_result_queue(self):
@ -379,8 +379,10 @@ class BaseQueueManager(object):
hold = ''
if tree.job.hold_following_changes:
hold = ' [hold]'
self.log.info("%s%s%s%s" % (istr, repr(tree.job),
efilters, hold))
self.log.info("%s%s%s%s" % (istr,
repr(tree.job),
efilters,
hold))
for x in tree.job_trees:
log_jobs(x, indent + 2)
@ -420,13 +422,13 @@ class BaseQueueManager(object):
def _addChange(self, change):
if self.start_action:
try:
self.log.info("Reporting start, action %s change %s" % (
self.start_action, change))
self.log.info("Reporting start, action %s change %s" %
(self.start_action, change))
msg = "Starting %s jobs." % self.name
ret = self.sched.trigger.report(change, msg, self.start_action)
if ret:
self.log.error("Reporting change start %s received: %s" % (
change, ret))
self.log.error("Reporting change start %s received: %s" %
(change, ret))
except:
self.log.exception("Exception while reporting start:")
self.launchJobs(change)
@ -438,12 +440,12 @@ class BaseQueueManager(object):
try:
build = self.sched.launcher.launch(job, change)
self.building_jobs[build] = change
self.log.debug("Adding build %s of job %s to change %s" % (
build, job, change))
self.log.debug("Adding build %s of job %s to change %s" %
(build, job, change))
change.addBuild(build)
except:
self.log.exception("Exception while launching job %s \
for change %s:" % (job, change))
self.log.exception("Exception while launching job %s "
"for change %s:" % (job, change))
def updateBuildDescriptions(self, build_set):
for build in build_set.getBuilds():
@ -474,22 +476,22 @@ for change %s:" % (job, change))
# or restarted
return False
change = self.building_jobs[build]
self.log.debug("Found change %s which triggered completed build %s" % (
change, build))
self.log.debug("Found change %s which triggered completed build %s" %
(change, build))
del self.building_jobs[build]
change.setResult(build)
self.log.info("Change %s status is now:\n %s" % (
change, change.formatStatus()))
self.log.info("Change %s status is now:\n %s" %
(change, change.formatStatus()))
if change.areAllJobsComplete():
self.log.debug("All jobs for change %s are complete" % change)
self.possiblyReportChange(change)
else:
# There may be jobs that depended on jobs that are now complete
self.log.debug("All jobs for change %s are not yet complete" % (
change))
self.log.debug("All jobs for change %s are not yet complete" %
(change))
self.launchJobs(change)
self.updateBuildDescriptions(build.build_set)
@ -511,13 +513,14 @@ for change %s:" % (job, change))
action = self.failure_action
change.setReportedResult('FAILURE')
try:
self.log.info("Reporting change %s, action: %s" % (
change, action))
ret = self.sched.trigger.report(change, change.formatReport(),
self.log.info("Reporting change %s, action: %s" %
(change, action))
ret = self.sched.trigger.report(change,
change.formatReport(),
action)
if ret:
self.log.error("Reporting change %s received: %s" % (
change, ret))
self.log.error("Reporting change %s received: %s" %
(change, ret))
else:
change.reported = True
except:
@ -543,7 +546,6 @@ for change %s:" % (job, change))
class IndependentQueueManager(BaseQueueManager):
log = logging.getLogger("zuul.IndependentQueueManager")
pass
class DependentQueueManager(BaseQueueManager):
@ -600,8 +602,8 @@ class DependentQueueManager(BaseQueueManager):
self.log.debug("Adding change %s" % change)
change_queue = self.getQueue(change.project)
if change_queue:
self.log.debug("Adding change %s to queue %s" % (
change, change_queue))
self.log.debug("Adding change %s to queue %s" %
(change, change_queue))
change_queue.enqueueChange(change)
self._addChange(change)
@ -621,18 +623,19 @@ class DependentQueueManager(BaseQueueManager):
for job in change.findJobsToRun():
self.log.debug("Found job %s for change %s" % (job, change))
try:
build = self.sched.launcher.launch(job, change,
build = self.sched.launcher.launch(job,
change,
dependent_changes)
self.building_jobs[build] = change
self.log.debug("Adding build %s of job %s to change %s" % (
build, job, change))
self.log.debug("Adding build %s of job %s to change %s" %
(build, job, change))
change.addBuild(build)
except:
self.log.exception("Exception while launching job %s \
for change %s:" % (job, change))
self.log.exception("Exception while launching job %s "
"for change %s:" % (job, change))
if change.change_behind:
self.log.debug("Launching jobs for change %s, behind change %s" % (
change.change_behind, change))
self.log.debug("Launching jobs for change %s, behind change %s" %
(change.change_behind, change))
self.launchJobs(change.change_behind)
def cancelJobs(self, change, prime=True):
@ -642,21 +645,21 @@ for change %s:" % (job, change))
change.resetAllBuilds()
for build, build_change in self.building_jobs.items():
if build_change == change:
self.log.debug("Found build %s for change %s to cancel" % (
build, change))
self.log.debug("Found build %s for change %s to cancel" %
(build, change))
try:
self.sched.launcher.cancel(build)
except:
self.log.exception("Exception while canceling build %s \
for change %s" % (build, change))
self.log.exception("Exception while canceling build %s "
"for change %s" % (build, change))
to_remove.append(build)
for build in to_remove:
self.log.debug("Removing build %s from running builds" % build)
build.result = 'CANCELED'
del self.building_jobs[build]
if change.change_behind:
self.log.debug("Canceling jobs for change %s, \
behind change %s" % (change.change_behind, change))
self.log.debug("Canceling jobs for change %s, behind change %s" %
(change.change_behind, change))
self.cancelJobs(change.change_behind, prime=prime)
def onBuildCompleted(self, build):
@ -668,16 +671,16 @@ behind change %s" % (change.change_behind, change))
# will need to be retested. To free up resources cancel the builds
# behind this one as they will be rerun anyways.
self.cancelJobs(change.change_behind, prime=False)
self.log.debug("Canceling builds behind change: %s due to"
" failure." % change)
self.log.debug("Canceling builds behind change: %s due to "
"failure." % change)
return True
def possiblyReportChange(self, change):
self.log.debug("Possibly reporting change %s" % change)
change_behind = change.change_behind
if not change.change_ahead:
self.log.debug("Change %s is at the front of the queue, \
reporting" % (change))
self.log.debug("Change %s is at the front of the queue, "
"reporting" % (change))
ret = self.reportChange(change)
self.log.debug("Removing reported change %s from queue" % change)
change.delete()
@ -685,24 +688,23 @@ reporting" % (change))
if merged:
merged = self.sched.trigger.isMerged(change)
succeeded = change.didAllJobsSucceed()
self.log.info("Reported change %s status: all-succeeded: %s, \
merged: %s" % (change, succeeded, merged))
self.log.info("Reported change %s status: all-succeeded: %s, "
"merged: %s" % (change, succeeded, merged))
if not (succeeded and merged):
self.log.debug("Reported change %s failed tests or failed \
to merge" % (change))
self.log.debug("Reported change %s failed tests or failed "
"to merge" % (change))
# The merge or test failed, re-run all jobs behind this one
if change_behind:
self.log.info("Canceling/relaunching jobs for change %s \
behind failed change %s" % (
change_behind, change))
self.log.info("Canceling/relaunching jobs for change %s "
"behind failed change %s" %
(change_behind, change))
self.cancelJobs(change_behind)
self.launchJobs(change_behind)
# If the change behind this is ready, notify
if (change_behind and
change_behind.areAllJobsComplete()):
self.log.info("Change %s behind change %s is ready, \
possibly reporting" % (change_behind, change))
if change_behind and change_behind.areAllJobsComplete():
self.log.info("Change %s behind change %s is ready, "
"possibly reporting" % (change_behind, change))
self.possiblyReportChange(change_behind)
def getChangesInQueue(self):