Add Jenkins cleanup thread.
The cleanup thread searches for outstanding builds that haven't reported back or otherwise can't be found by Jenkins, and marks them complete with a result of "LOST". This should help to avoid deadlocks where Zuul waits forever to hear back from Jenkins after missing a notification.

Add pyflakes to tox.ini.

Change-Id: I26d3fbf375e82e224448ec3337f9cc97deeccd56
This commit is contained in:
parent
1e8dd893ed
commit
ff986a134b
|
@ -3,5 +3,4 @@ dist
|
||||||
zuul.egg-info
|
zuul.egg-info
|
||||||
MANIFEST
|
MANIFEST
|
||||||
.tox
|
.tox
|
||||||
|
|
||||||
*.pyc
|
*.pyc
|
||||||
|
|
6
tox.ini
6
tox.ini
|
@ -1,6 +1,10 @@
|
||||||
[tox]
|
[tox]
|
||||||
envlist = pep8
|
envlist = pep8, pyflakes
|
||||||
|
|
||||||
[testenv:pep8]
|
[testenv:pep8]
|
||||||
deps = pep8
|
deps = pep8
|
||||||
commands = pep8 --repeat --show-source zuul zuul-server setup.py
|
commands = pep8 --repeat --show-source zuul zuul-server setup.py
|
||||||
|
|
||||||
|
[testenv:pyflakes]
|
||||||
|
deps = pyflakes
|
||||||
|
commands = pyflakes zuul zuul-server setup.py
|
||||||
|
|
|
@ -25,9 +25,14 @@ import json
|
||||||
import urllib2 # for extending jenkins lib
|
import urllib2 # for extending jenkins lib
|
||||||
import logging
|
import logging
|
||||||
import pprint
|
import pprint
|
||||||
|
import time
|
||||||
|
|
||||||
from zuul.model import Build
|
from zuul.model import Build
|
||||||
|
|
||||||
|
# The amount of time we tolerate a change in build status without
|
||||||
|
# receiving a notification
|
||||||
|
JENKINS_GRACE_TIME = 60
|
||||||
|
|
||||||
|
|
||||||
class JenkinsCallback(threading.Thread):
|
class JenkinsCallback(threading.Thread):
|
||||||
log = logging.getLogger("zuul.JenkinsCallback")
|
log = logging.getLogger("zuul.JenkinsCallback")
|
||||||
|
@ -68,6 +73,24 @@ class JenkinsCallback(threading.Thread):
|
||||||
self.jenkins.onBuildStarted(uuid, url, number)
|
self.jenkins.onBuildStarted(uuid, url, number)
|
||||||
|
|
||||||
|
|
||||||
|
class JenkinsCleanup(threading.Thread):
|
||||||
|
""" A thread that checks to see if outstanding builds have
|
||||||
|
completed without reporting back. """
|
||||||
|
log = logging.getLogger("zuul.JenkinsCleanup")
|
||||||
|
|
||||||
|
def __init__(self, jenkins):
|
||||||
|
threading.Thread.__init__(self)
|
||||||
|
self.jenkins = jenkins
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while True:
|
||||||
|
time.sleep(180)
|
||||||
|
try:
|
||||||
|
self.jenkins.lookForLostBuilds()
|
||||||
|
except:
|
||||||
|
self.log.exception("Exception checking builds:")
|
||||||
|
|
||||||
|
|
||||||
STOP_BUILD = 'job/%(name)s/%(number)s/stop'
|
STOP_BUILD = 'job/%(name)s/%(number)s/stop'
|
||||||
CANCEL_QUEUE = 'queue/item/%(number)s/cancelQueue'
|
CANCEL_QUEUE = 'queue/item/%(number)s/cancelQueue'
|
||||||
BUILD_INFO = 'job/%(name)s/%(number)s/api/json?depth=0'
|
BUILD_INFO = 'job/%(name)s/%(number)s/api/json?depth=0'
|
||||||
|
@ -137,6 +160,8 @@ class Jenkins(object):
|
||||||
self.jenkins = ExtendedJenkins(server, user, apikey)
|
self.jenkins = ExtendedJenkins(server, user, apikey)
|
||||||
self.callback_thread = JenkinsCallback(self)
|
self.callback_thread = JenkinsCallback(self)
|
||||||
self.callback_thread.start()
|
self.callback_thread.start()
|
||||||
|
self.cleanup_thread = JenkinsCleanup(self)
|
||||||
|
self.cleanup_thread.start()
|
||||||
|
|
||||||
def launch(self, job, change, dependent_changes=[]):
|
def launch(self, job, change, dependent_changes=[]):
|
||||||
self.log.info("Launch job %s for change %s with dependent changes %s" %
|
self.log.info("Launch job %s for change %s with dependent changes %s" %
|
||||||
|
@ -166,6 +191,20 @@ class Jenkins(object):
|
||||||
raise
|
raise
|
||||||
return build
|
return build
|
||||||
|
|
||||||
|
def findBuildInQueue(self, build):
|
||||||
|
for item in self.jenkins.get_queue_info():
|
||||||
|
if 'actions' not in item:
|
||||||
|
continue
|
||||||
|
for action in item['actions']:
|
||||||
|
if 'parameters' not in action:
|
||||||
|
continue
|
||||||
|
parameters = action['parameters']
|
||||||
|
for param in parameters:
|
||||||
|
if (param['name'] == 'UUID' and
|
||||||
|
build.uuid == param['value']):
|
||||||
|
return item
|
||||||
|
return False
|
||||||
|
|
||||||
def cancel(self, build):
|
def cancel(self, build):
|
||||||
self.log.info("Cancel build %s for job %s" % (build, build.job))
|
self.log.info("Cancel build %s for job %s" % (build, build.job))
|
||||||
if build.number:
|
if build.number:
|
||||||
|
@ -177,27 +216,20 @@ class Jenkins(object):
|
||||||
self.log.debug("Build %s has not started yet" % build)
|
self.log.debug("Build %s has not started yet" % build)
|
||||||
|
|
||||||
self.log.debug("Looking for build %s in queue" % build)
|
self.log.debug("Looking for build %s in queue" % build)
|
||||||
for item in self.jenkins.get_queue_info():
|
item = self.findBuildInQueue(build)
|
||||||
if 'actions' not in item:
|
if item:
|
||||||
continue
|
self.log.debug("Found queue item %s for build %s" % (
|
||||||
for action in item['actions']:
|
item['id'], build))
|
||||||
if 'parameters' not in action:
|
try:
|
||||||
continue
|
self.jenkins.cancel_queue(item['id'])
|
||||||
parameters = action['parameters']
|
self.log.debug(
|
||||||
for param in parameters:
|
"Canceled queue item %s for build %s" % (
|
||||||
if (param['name'] == 'UUID' and
|
item['id'], build))
|
||||||
build.uuid == param['value']):
|
return
|
||||||
self.log.debug("Found queue item %s for build %s" % (
|
except:
|
||||||
item['id'], build))
|
self.log.exception("Exception canceling queue item %s \
|
||||||
try:
|
for build %s" % (item['id'], build))
|
||||||
self.jenkins.cancel_queue(item['id'])
|
|
||||||
self.log.debug(
|
|
||||||
"Canceled queue item %s for build %s" % (
|
|
||||||
item['id'], build))
|
|
||||||
return
|
|
||||||
except:
|
|
||||||
self.log.exception("Exception canceling queue \
|
|
||||||
item %s for build %s" % (item['id'], build))
|
|
||||||
self.log.debug("Still unable to find build %s to cancel" % build)
|
self.log.debug("Still unable to find build %s to cancel" % build)
|
||||||
if build.number:
|
if build.number:
|
||||||
self.log.debug("Build %s has just started" % build)
|
self.log.debug("Build %s has just started" % build)
|
||||||
|
@ -231,3 +263,57 @@ item %s for build %s" % (item['id'], build))
|
||||||
build.number = number
|
build.number = number
|
||||||
else:
|
else:
|
||||||
self.log.error("Unable to find build %s" % uuid)
|
self.log.error("Unable to find build %s" % uuid)
|
||||||
|
|
||||||
|
def lookForLostBuilds(self):
|
||||||
|
self.log.debug("Looking for lost builds")
|
||||||
|
lostbuilds = []
|
||||||
|
for build in self.builds.values():
|
||||||
|
if build.result:
|
||||||
|
# The build has finished, it will be removed
|
||||||
|
continue
|
||||||
|
if build.number:
|
||||||
|
# The build has started; see if it has finished
|
||||||
|
info = self.jenkins.get_build_info(build.job.name,
|
||||||
|
build.number)
|
||||||
|
if not info:
|
||||||
|
self.log.debug("Lost build %s because it started but \
|
||||||
|
info can not be retreived" % build)
|
||||||
|
lostbuilds.append(build)
|
||||||
|
continue
|
||||||
|
if not info['result']:
|
||||||
|
# It hasn't finished, continue
|
||||||
|
continue
|
||||||
|
finish_time = (info['timestamp'] + info['duration']) / 1000
|
||||||
|
if time.time() - finish_time > JENKINS_GRACE_TIME:
|
||||||
|
self.log.debug("Lost build %s because it finished \
|
||||||
|
more than 5 minutes ago" % build)
|
||||||
|
lostbuilds.append(build)
|
||||||
|
continue
|
||||||
|
# Give it more time
|
||||||
|
else:
|
||||||
|
# The build has not started
|
||||||
|
if time.time() - build.launch_time < JENKINS_GRACE_TIME:
|
||||||
|
# It just started, give it a bit
|
||||||
|
continue
|
||||||
|
info = self.findBuildInQueue(build)
|
||||||
|
if info:
|
||||||
|
# It's in the queue. All good.
|
||||||
|
continue
|
||||||
|
if build.number:
|
||||||
|
# We just got notified it started
|
||||||
|
continue
|
||||||
|
# It may have just started. If we keep ending up here,
|
||||||
|
# assume the worst.
|
||||||
|
if hasattr(build, '_jenkins_missing_from_queue'):
|
||||||
|
missing_time = build._jenkins_missing_from_queue
|
||||||
|
if time.time() - missing_time > JENKINS_GRACE_TIME:
|
||||||
|
self.log.debug("Lost build %s because it has not \
|
||||||
|
started and is not in the queue" % build)
|
||||||
|
lostbuilds.append(build)
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
build._jenkins_missing_from_queue = time.time()
|
||||||
|
|
||||||
|
for build in lostbuilds:
|
||||||
|
self.log.error("Declaring %s lost" % build)
|
||||||
|
self.onBuildCompleted(build.uuid, 'LOST', None, None)
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import re
|
import re
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
class ChangeQueue(object):
|
class ChangeQueue(object):
|
||||||
|
@ -74,6 +75,8 @@ class Build(object):
|
||||||
self.status = None
|
self.status = None
|
||||||
self.url = None
|
self.url = None
|
||||||
self.number = None
|
self.number = None
|
||||||
|
self.result = None
|
||||||
|
self.launch_time = time.time()
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '<Build %s of %s>' % (self.uuid, self.job.name)
|
return '<Build %s of %s>' % (self.uuid, self.job.name)
|
||||||
|
@ -197,7 +200,8 @@ class Change(object):
|
||||||
def setResult(self, build):
|
def setResult(self, build):
|
||||||
self.running_builds.remove(build)
|
self.running_builds.remove(build)
|
||||||
self.jobs[build.job.name] = build.result
|
self.jobs[build.job.name] = build.result
|
||||||
self.job_urls[build.job.name] = build.url
|
if build.url:
|
||||||
|
self.job_urls[build.job.name] = build.url
|
||||||
if build.result != 'SUCCESS':
|
if build.result != 'SUCCESS':
|
||||||
# Get a JobTree from a Job so we can find only its dependent jobs
|
# Get a JobTree from a Job so we can find only its dependent jobs
|
||||||
root = self.project.getJobTreeForQueue(self.queue_name)
|
root = self.project.getJobTreeForQueue(self.queue_name)
|
||||||
|
|
Loading…
Reference in New Issue