Switch the launcher to Gearman.
Remove the Jenkins launcher and add a new Gearman launcher (designed to be compatible with Jenkins) in its place. See the documentation for how to set up the Gearman Plugin for Jenkins. Change-Id: Ie7224396271d7375f4ea42eebb57f883bc291738changes/96/28696/5
parent
bdafe495db
commit
1f4c2bb104
@ -0,0 +1,2 @@
|
||||
def select_debian_node(change, params):
|
||||
params['ZUUL_NODE'] = 'debian'
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,387 @@
|
||||
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import gear
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import threading
|
||||
from uuid import uuid4
|
||||
|
||||
from zuul.model import Build
|
||||
|
||||
|
||||
class GearmanCleanup(threading.Thread):
|
||||
""" A thread that checks to see if outstanding builds have
|
||||
completed without reporting back. """
|
||||
log = logging.getLogger("zuul.JenkinsCleanup")
|
||||
|
||||
def __init__(self, gearman):
|
||||
threading.Thread.__init__(self)
|
||||
self.gearman = gearman
|
||||
self.wake_event = threading.Event()
|
||||
self._stopped = False
|
||||
|
||||
def stop(self):
|
||||
self._stopped = True
|
||||
self.wake_event.set()
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
self.wake_event.wait(300)
|
||||
if self._stopped:
|
||||
return
|
||||
try:
|
||||
self.gearman.lookForLostBuilds()
|
||||
except:
|
||||
self.log.exception("Exception checking builds:")
|
||||
|
||||
|
||||
def getJobData(job):
|
||||
if not len(job.data):
|
||||
return {}
|
||||
d = job.data[-1]
|
||||
if not d:
|
||||
return {}
|
||||
return json.loads(d)
|
||||
|
||||
|
||||
class ZuulGearmanClient(gear.Client):
|
||||
def __init__(self, zuul_gearman):
|
||||
super(ZuulGearmanClient, self).__init__()
|
||||
self.__zuul_gearman = zuul_gearman
|
||||
|
||||
def handleWorkComplete(self, packet):
|
||||
job = super(ZuulGearmanClient, self).handleWorkComplete(packet)
|
||||
self.__zuul_gearman.onBuildCompleted(job)
|
||||
return job
|
||||
|
||||
def handleWorkFail(self, packet):
|
||||
job = super(ZuulGearmanClient, self).handleWorkFail(packet)
|
||||
self.__zuul_gearman.onBuildCompleted(job)
|
||||
return job
|
||||
|
||||
def handleWorkStatus(self, packet):
|
||||
job = super(ZuulGearmanClient, self).handleWorkStatus(packet)
|
||||
self.__zuul_gearman.onWorkStatus(job)
|
||||
return job
|
||||
|
||||
def handleWorkData(self, packet):
|
||||
job = super(ZuulGearmanClient, self).handleWorkData(packet)
|
||||
self.__zuul_gearman.onWorkStatus(job)
|
||||
return job
|
||||
|
||||
def handleDisconnect(self, job):
|
||||
job = super(ZuulGearmanClient, self).handleDisconnect(job)
|
||||
self.__zuul_gearman.onDisconnect(job)
|
||||
|
||||
def handleStatusRes(self, packet):
|
||||
try:
|
||||
job = super(ZuulGearmanClient, self).handleStatusRes(packet)
|
||||
except gear.UnknownJobError:
|
||||
handle = packet.getArgument(0)
|
||||
for build in self.__zuul_gearman.builds:
|
||||
if build.__gearman_job.handle == handle:
|
||||
self.__zuul_gearman.onUnknownJob(job)
|
||||
|
||||
|
||||
class Gearman(object):
|
||||
log = logging.getLogger("zuul.Gearman")
|
||||
negative_function_cache_ttl = 5
|
||||
|
||||
def __init__(self, config, sched):
|
||||
self.sched = sched
|
||||
self.builds = {}
|
||||
self.meta_jobs = {} # A list of meta-jobs like stop or describe
|
||||
server = config.get('gearman', 'server')
|
||||
if config.has_option('gearman', 'port'):
|
||||
port = config.get('gearman', 'port')
|
||||
else:
|
||||
port = 4730
|
||||
|
||||
self.gearman = ZuulGearmanClient(self)
|
||||
self.gearman.addServer(server, port)
|
||||
|
||||
self.cleanup_thread = GearmanCleanup(self)
|
||||
self.cleanup_thread.start()
|
||||
self.function_cache = set()
|
||||
self.function_cache_time = 0
|
||||
|
||||
def stop(self):
|
||||
self.log.debug("Stopping")
|
||||
self.cleanup_thread.stop()
|
||||
self.cleanup_thread.join()
|
||||
self.gearman.shutdown()
|
||||
self.log.debug("Stopped")
|
||||
|
||||
def isJobRegistered(self, name):
|
||||
if self.function_cache_time:
|
||||
for connection in self.gearman.active_connections:
|
||||
if connection.connect_time > self.function_cache_time:
|
||||
self.function_cache = set()
|
||||
self.function_cache_time = 0
|
||||
break
|
||||
if name in self.function_cache:
|
||||
self.log.debug("Function %s is registered" % name)
|
||||
return True
|
||||
if ((time.time() - self.function_cache_time) <
|
||||
self.negative_function_cache_ttl):
|
||||
self.log.debug("Function %s is not registered "
|
||||
"(negative ttl in effect)" % name)
|
||||
return False
|
||||
self.function_cache_time = time.time()
|
||||
for connection in self.gearman.active_connections:
|
||||
try:
|
||||
req = gear.StatusAdminRequest()
|
||||
connection.sendAdminRequest(req)
|
||||
req.waitForResponse()
|
||||
except Exception:
|
||||
self.log.exception("Exception while checking functions")
|
||||
continue
|
||||
for line in req.response.split('\n'):
|
||||
parts = [x.strip() for x in line.split()]
|
||||
if not parts or parts[0] == '.':
|
||||
continue
|
||||
self.function_cache.add(parts[0])
|
||||
if name in self.function_cache:
|
||||
self.log.debug("Function %s is registered" % name)
|
||||
return True
|
||||
self.log.debug("Function %s is not registered" % name)
|
||||
return False
|
||||
|
||||
def launch(self, job, change, pipeline, dependent_changes=[]):
|
||||
self.log.info("Launch job %s for change %s with dependent changes %s" %
|
||||
(job, change, dependent_changes))
|
||||
dependent_changes = dependent_changes[:]
|
||||
dependent_changes.reverse()
|
||||
uuid = str(uuid4().hex)
|
||||
params = dict(ZUUL_UUID=uuid,
|
||||
ZUUL_PROJECT=change.project.name)
|
||||
params['ZUUL_PIPELINE'] = pipeline.name
|
||||
if hasattr(change, 'refspec'):
|
||||
changes_str = '^'.join(
|
||||
['%s:%s:%s' % (c.project.name, c.branch, c.refspec)
|
||||
for c in dependent_changes + [change]])
|
||||
params['ZUUL_BRANCH'] = change.branch
|
||||
params['ZUUL_CHANGES'] = changes_str
|
||||
params['ZUUL_REF'] = ('refs/zuul/%s/%s' %
|
||||
(change.branch,
|
||||
change.current_build_set.ref))
|
||||
params['ZUUL_COMMIT'] = change.current_build_set.commit
|
||||
|
||||
zuul_changes = ' '.join(['%s,%s' % (c.number, c.patchset)
|
||||
for c in dependent_changes + [change]])
|
||||
params['ZUUL_CHANGE_IDS'] = zuul_changes
|
||||
params['ZUUL_CHANGE'] = str(change.number)
|
||||
params['ZUUL_PATCHSET'] = str(change.patchset)
|
||||
if hasattr(change, 'ref'):
|
||||
params['ZUUL_REFNAME'] = change.ref
|
||||
params['ZUUL_OLDREV'] = change.oldrev
|
||||
params['ZUUL_NEWREV'] = change.newrev
|
||||
params['ZUUL_SHORT_OLDREV'] = change.oldrev[:7]
|
||||
params['ZUUL_SHORT_NEWREV'] = change.newrev[:7]
|
||||
|
||||
params['ZUUL_REF'] = change.ref
|
||||
params['ZUUL_COMMIT'] = change.newrev
|
||||
|
||||
# This is what we should be heading toward for parameters:
|
||||
|
||||
# required:
|
||||
# ZUUL_UUID
|
||||
# ZUUL_REF (/refs/zuul/..., /refs/tags/foo, master)
|
||||
# ZUUL_COMMIT
|
||||
|
||||
# optional:
|
||||
# ZUUL_PROJECT
|
||||
# ZUUL_PIPELINE
|
||||
|
||||
# optional (changes only):
|
||||
# ZUUL_BRANCH
|
||||
# ZUUL_CHANGE
|
||||
# ZUUL_CHANGE_IDS
|
||||
# ZUUL_PATCHSET
|
||||
|
||||
# optional (ref updated only):
|
||||
# ZUUL_OLDREV
|
||||
# ZUUL_NEWREV
|
||||
# ZUUL_SHORT_NEWREV
|
||||
# ZUUL_SHORT_OLDREV
|
||||
|
||||
if callable(job.parameter_function):
|
||||
job.parameter_function(change, params)
|
||||
self.log.debug("Custom parameter function used for job %s, "
|
||||
"change: %s, params: %s" % (job, change, params))
|
||||
|
||||
if 'ZUUL_NODE' in params:
|
||||
name = "build:%s:%s" % (job.name, params['ZUUL_NODE'])
|
||||
else:
|
||||
name = "build:%s" % job.name
|
||||
build = Build(job, uuid)
|
||||
|
||||
gearman_job = gear.Job(name, json.dumps(params),
|
||||
unique=uuid)
|
||||
build.__gearman_job = gearman_job
|
||||
self.builds[uuid] = build
|
||||
|
||||
if not self.isJobRegistered(gearman_job.name):
|
||||
self.log.error("Job %s is not registered with Gearman" %
|
||||
gearman_job)
|
||||
self.onBuildCompleted(gearman_job, 'LOST')
|
||||
return build
|
||||
|
||||
try:
|
||||
self.gearman.submitJob(gearman_job)
|
||||
except Exception:
|
||||
self.log.exception("Unable to submit job to Gearman")
|
||||
self.onBuildCompleted(gearman_job, 'LOST')
|
||||
return build
|
||||
|
||||
gearman_job.waitForHandle(30)
|
||||
if not gearman_job.handle:
|
||||
self.log.error("No job handle was received for %s after 30 seconds"
|
||||
" marking as lost." %
|
||||
gearman_job)
|
||||
self.onBuildCompleted(gearman_job, 'LOST')
|
||||
|
||||
return build
|
||||
|
||||
def cancel(self, build):
|
||||
self.log.info("Cancel build %s for job %s" % (build, build.job))
|
||||
|
||||
if build.number:
|
||||
self.log.debug("Build %s has already started" % build)
|
||||
self.cancelRunningBuild(build)
|
||||
self.log.debug("Canceled running build %s" % build)
|
||||
return
|
||||
else:
|
||||
self.log.debug("Build %s has not started yet" % build)
|
||||
|
||||
self.log.debug("Looking for build %s in queue" % build)
|
||||
if self.cancelJobInQueue(build):
|
||||
self.log.debug("Removed build %s from queue" % build)
|
||||
return
|
||||
|
||||
self.log.debug("Still unable to find build %s to cancel" % build)
|
||||
if build.number:
|
||||
self.log.debug("Build %s has just started" % build)
|
||||
self.cancelRunningBuild(build)
|
||||
self.log.debug("Canceled just running build %s" % build)
|
||||
else:
|
||||
self.log.error("Build %s has not started but "
|
||||
"was not found in queue" % build)
|
||||
|
||||
def onBuildCompleted(self, job, result=None):
|
||||
if job.unique in self.meta_jobs:
|
||||
del self.meta_jobs[job.unique]
|
||||
return
|
||||
|
||||
build = self.builds.get(job.unique)
|
||||
if build:
|
||||
if result is None:
|
||||
data = getJobData(job)
|
||||
result = data.get('result')
|
||||
self.log.info("Build %s complete, result %s" %
|
||||
(job, result))
|
||||
build.result = result
|
||||
self.sched.onBuildCompleted(build)
|
||||
# The test suite expects the build to be removed from the
|
||||
# internal dict after it's added to the report queue.
|
||||
del self.builds[job.unique]
|
||||
else:
|
||||
if not job.name.startswith("stop:"):
|
||||
self.log.error("Unable to find build %s" % job.unique)
|
||||
|
||||
def onWorkStatus(self, job):
|
||||
data = getJobData(job)
|
||||
self.log.info("Build %s update" % job)
|
||||
build = self.builds.get(job.unique)
|
||||
if build:
|
||||
self.log.debug("Found build %s" % build)
|
||||
if not build.number:
|
||||
self.log.info("Build %s started" % job)
|
||||
build.url = data.get('full_url')
|
||||
build.number = data.get('number')
|
||||
build.__gearman_master = data.get('master')
|
||||
self.sched.onBuildStarted(build)
|
||||
build.fraction_complete = job.fraction_complete
|
||||
else:
|
||||
self.log.error("Unable to find build %s" % job.unique)
|
||||
|
||||
def onDisconnect(self, job):
|
||||
self.log.info("Gearman job %s lost due to disconnect" % job)
|
||||
self.onBuildCompleted(job, 'LOST')
|
||||
|
||||
def onUnknownJob(self, job):
|
||||
self.log.info("Gearman job %s lost due to unknown handle" % job)
|
||||
self.onBuildCompleted(job, 'LOST')
|
||||
|
||||
def cancelJobInQueue(self, build):
|
||||
job = build.__gearman_job
|
||||
|
||||
req = gear.CancelJobAdminRequest(job.handle)
|
||||
job.connection.sendAdminRequest(req)
|
||||
req.waitForResponse()
|
||||
self.log.debug("Response to cancel build %s request: %s" %
|
||||
(build, req.response.strip()))
|
||||
if req.response.startswith("OK"):
|
||||
try:
|
||||
del self.builds[job.unique]
|
||||
except:
|
||||
pass
|
||||
return True
|
||||
return False
|
||||
|
||||
def cancelRunningBuild(self, build):
|
||||
stop_uuid = str(uuid4().hex)
|
||||
stop_job = gear.Job("stop:%s" % build.__gearman_master,
|
||||
build.uuid,
|
||||
unique=stop_uuid)
|
||||
self.meta_jobs[stop_uuid] = stop_job
|
||||
self.log.debug("Submitting stop job: %s", stop_job)
|
||||
self.gearman.submitJob(stop_job)
|
||||
return True
|
||||
|
||||
def setBuildDescription(self, build, desc):
|
||||
try:
|
||||
name = "set_description:%s" % build.__gearman_master
|
||||
except AttributeError:
|
||||
# We haven't yet received the first data packet that tells
|
||||
# us where the job is running.
|
||||
return False
|
||||
|
||||
if not self.isJobRegistered(name):
|
||||
return False
|
||||
|
||||
desc_uuid = str(uuid4().hex)
|
||||
data = dict(unique_id=build.uuid,
|
||||
html_description=desc)
|
||||
desc_job = gear.Job(name, json.dumps(data), unique=desc_uuid)
|
||||
self.meta_jobs[desc_uuid] = desc_job
|
||||
self.log.debug("Submitting describe job: %s", desc_job)
|
||||
self.gearman.submitJob(desc_job)
|
||||
return True
|
||||
|
||||
def lookForLostBuilds(self):
|
||||
self.log.debug("Looking for lost builds")
|
||||
for build in self.builds.values():
|
||||
if build.result:
|
||||
# The build has finished, it will be removed
|
||||
continue
|
||||
job = build.__gearman_job
|
||||
if not job.handle:
|
||||
# The build hasn't been enqueued yet
|
||||
continue
|
||||
p = gear.Packet(gear.constants.REQ, gear.constants.GET_STATUS,
|
||||
job.handle)
|
||||
job.connection.sendPacket(p)
|
@ -1,499 +0,0 @@
|
||||
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# So we can name this module "jenkins" and still load the "jenkins"
|
||||
# system module
|
||||
from __future__ import absolute_import
|
||||
|
||||
import json
|
||||
import logging
|
||||
import pprint
|
||||
import threading
|
||||
import time
|
||||
import urllib # for extending jenkins lib
|
||||
import urllib2 # for extending jenkins lib
|
||||
from uuid import uuid4
|
||||
|
||||
import jenkins
|
||||
from paste import httpserver
|
||||
from webob import Request
|
||||
|
||||
from zuul.model import Build
|
||||
|
||||
# The amount of time we tolerate a change in build status without
|
||||
# receiving a notification
|
||||
JENKINS_GRACE_TIME = 60
|
||||
|
||||
|
||||
class JenkinsCallback(threading.Thread):
|
||||
log = logging.getLogger("zuul.JenkinsCallback")
|
||||
|
||||
def __init__(self, jenkins):
|
||||
threading.Thread.__init__(self)
|
||||
self.jenkins = jenkins
|
||||
|
||||
def run(self):
|
||||
httpserver.serve(self.app, host='0.0.0.0', port='8001')
|
||||
|
||||
def app(self, environ, start_response):
|
||||
request = Request(environ)
|
||||
if request.path == '/jenkins_endpoint':
|
||||
self.jenkins_endpoint(request)
|
||||
start_response('200 OK', [('content-type', 'text/html')])
|
||||
return ['Zuul good.']
|
||||
elif request.path == '/status':
|
||||
try:
|
||||
ret = self.jenkins.sched.formatStatusHTML()
|
||||
except:
|
||||
self.log.exception("Exception formatting status:")
|
||||
raise
|
||||
start_response('200 OK', [('content-type', 'text/html')])
|
||||
return [ret]
|
||||
elif request.path == '/status.json':
|
||||
try:
|
||||
ret = self.jenkins.sched.formatStatusJSON()
|
||||
except:
|
||||
self.log.exception("Exception formatting status:")
|
||||
raise
|
||||
start_response('200 OK', [('content-type', 'application/json'),
|
||||
('Access-Control-Allow-Origin', '*')])
|
||||
return [ret]
|
||||
else:
|
||||
start_response('200 OK', [('content-type', 'text/html')])
|
||||
return ['Zuul good.']
|
||||
|
||||
def jenkins_endpoint(self, request):
|
||||
try:
|
||||
data = json.loads(request.body)
|
||||
except:
|
||||
self.log.exception("Exception handling Jenkins notification:")
|
||||
raise # let wsgi handler process the issue
|
||||
if data:
|
||||
self.log.debug("Received data from Jenkins: \n%s" %
|
||||
(pprint.pformat(data)))
|
||||
build = data.get('build')
|
||||
if build:
|
||||
phase = build.get('phase')
|
||||
status = build.get('status')
|
||||
url = build.get('full_url')
|
||||
number = build.get('number')
|
||||
params = build.get('parameters')
|
||||
if params:
|
||||
# UUID is deprecated in favor of ZUUL_UUID
|
||||
uuid = params.get('ZUUL_UUID') or params.get('UUID')
|
||||
if (status and url and uuid and phase and
|
||||
phase == 'COMPLETED'):
|
||||
self.jenkins.onBuildCompleted(uuid,
|
||||
status,
|
||||
url,
|
||||
number)
|
||||
if (phase and phase == 'STARTED'):
|
||||
self.jenkins.onBuildStarted(uuid, url, number)
|
||||
|
||||
|
||||
class JenkinsCleanup(threading.Thread):
|
||||
""" A thread that checks to see if outstanding builds have
|
||||
completed without reporting back. """
|
||||
log = logging.getLogger("zuul.JenkinsCleanup")
|
||||
|
||||
def __init__(self, jenkins):
|
||||
threading.Thread.__init__(self)
|
||||
self.jenkins = jenkins
|
||||
self.wake_event = threading.Event()
|
||||
self._stopped = False
|
||||
|
||||
def stop(self):
|
||||
self._stopped = True
|
||||
self.wake_event.set()
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
self.wake_event.wait(180)
|
||||
if self._stopped:
|
||||
return
|
||||
try:
|
||||
self.jenkins.lookForLostBuilds()
|
||||
except:
|
||||
self.log.exception("Exception checking builds:")
|
||||
|
||||
|
||||
STOP_BUILD = 'job/%(name)s/%(number)s/stop'
|
||||
CANCEL_QUEUE = 'queue/item/%(number)s/cancelQueue'
|
||||
BUILD_INFO = 'job/%(name)s/%(number)s/api/json?depth=0'
|
||||
BUILD_DESCRIPTION = 'job/%(name)s/%(number)s/submitDescription'
|
||||
DEBUG = False
|
||||
|
||||
|
||||
class ExtendedJenkins(jenkins.Jenkins):
|
||||
def jenkins_open(self, req):
|
||||
'''
|
||||
Utility routine for opening an HTTP request to a Jenkins server.
|
||||
'''
|
||||
try:
|
||||
if self.auth:
|
||||
req.add_header('Authorization', self.auth)
|
||||
return urllib2.urlopen(req).read()
|
||||
except urllib2.HTTPError, e:
|
||||
if DEBUG:
|
||||
print e.msg
|
||||
print e.fp.read()
|
||||
raise
|
||||
|
||||
def stop_build(self, name, number):
|
||||
'''
|
||||
Stop a running Jenkins build.
|
||||
|
||||
@param name: Name of Jenkins job
|
||||
@type name: str
|
||||
@param number: Jenkins build number for the job
|
||||
@type number: int
|
||||
'''
|
||||
request = urllib2.Request(self.server + STOP_BUILD % locals())
|
||||
self.jenkins_open(request)
|
||||
|
||||
def cancel_queue(self, number):
|
||||
'''
|
||||
Cancel a queued build.
|
||||
|
||||
@param number: Jenkins queue number for the build
|
||||
@type number: int
|
||||
'''
|
||||
# Jenkins returns a 302 from this URL, unless Referer is not set,
|
||||
# then you get a 404.
|
||||
request = urllib2.Request(self.server + CANCEL_QUEUE % locals(),
|
||||
urllib.urlencode({}),
|
||||
headers={'Referer': self.server})
|
||||
self.jenkins_open(request)
|
||||
|
||||
def get_build_info(self, name, number):
|
||||
'''
|
||||
Get information for a build.
|
||||
|
||||
@param name: Name of Jenkins job
|
||||
@type name: str
|
||||
@param number: Jenkins build number for the job
|
||||
@type number: int
|
||||
@return: dictionary
|
||||
'''
|
||||
request = urllib2.Request(self.server + BUILD_INFO % locals())
|
||||
return json.loads(self.jenkins_open(request))
|
||||
|
||||
def set_build_description(self, name, number, description):
|
||||
'''
|
||||
Get information for a build.
|
||||
|
||||
@param name: Name of Jenkins job
|
||||
@type name: str
|
||||
@param number: Jenkins build number for the job
|
||||
@type number: int
|
||||
@param description: Bulid description to set
|
||||
@type description: str
|
||||
'''
|
||||
params = urllib.urlencode({'description': description})
|
||||
request = urllib2.Request(self.server + BUILD_DESCRIPTION % locals(),
|
||||
params)
|
||||
self.jenkins_open(request)
|
||||
|
||||
|
||||
class Jenkins(object):
|
||||
log = logging.getLogger("zuul.Jenkins")
|
||||
launch_retry_timeout = 5
|
||||
|
||||
def __init__(self, config, sched):
|
||||
self.sched = sched
|
||||
self.builds = {}
|
||||
server = config.get('jenkins', 'server')
|
||||
user = config.get('jenkins', 'user')
|
||||
apikey = config.get('jenkins', 'apikey')
|
||||
self.jenkins = ExtendedJenkins(server, user, apikey)
|
||||
self.callback_thread = JenkinsCallback(self)
|
||||
self.callback_thread.start()
|
||||
self.cleanup_thread = JenkinsCleanup(self)
|
||||
self.cleanup_thread.start()
|
||||
|
||||
def stop(self):
|
||||
self.cleanup_thread.stop()
|
||||
self.cleanup_thread.join()
|
||||
|
||||
#TODO: remove dependent_changes
|
||||
def launch(self, job, change, pipeline, dependent_changes=[]):
|
||||
self.log.info("Launch job %s for change %s with dependent changes %s" %
|
||||
(job, change, dependent_changes))
|
||||
dependent_changes = dependent_changes[:]
|
||||
dependent_changes.reverse()
|
||||
uuid = str(uuid4().hex)
|
||||
params = dict(UUID=uuid, # deprecated
|
||||
ZUUL_UUID=uuid,
|
||||
GERRIT_PROJECT=change.project.name, # deprecated
|
||||
ZUUL_PROJECT=change.project.name)
|
||||
params['ZUUL_PIPELINE'] = pipeline.name
|
||||
if hasattr(change, 'refspec'):
|
||||
changes_str = '^'.join(
|
||||
['%s:%s:%s' % (c.project.name, c.branch, c.refspec)
|
||||
for c in dependent_changes + [change]])
|
||||
params['GERRIT_BRANCH'] = change.branch # deprecated
|
||||
params['ZUUL_BRANCH'] = change.branch
|
||||
params['GERRIT_CHANGES'] = changes_str # deprecated
|
||||
params['ZUUL_CHANGES'] = changes_str
|
||||
params['ZUUL_REF'] = ('refs/zuul/%s/%s' %
|
||||
(change.branch,
|
||||
change.current_build_set.ref))
|
||||
params['ZUUL_COMMIT'] = change.current_build_set.commit
|
||||
|
||||
zuul_changes = ' '.join(['%s,%s' % (c.number, c.patchset)
|
||||
for c in dependent_changes + [change]])
|
||||
params['ZUUL_CHANGE_IDS'] = zuul_changes
|
||||
params['ZUUL_CHANGE'] = str(change.number)
|
||||
params['ZUUL_PATCHSET'] = str(change.patchset)
|
||||
if hasattr(change, 'ref'):
|
||||
params['GERRIT_REFNAME'] = change.ref # deprecated
|
||||
params['ZUUL_REFNAME'] = change.ref
|
||||
params['GERRIT_OLDREV'] = change.oldrev # deprecated
|
||||
params['ZUUL_OLDREV'] = change.oldrev
|
||||
params['GERRIT_NEWREV'] = change.newrev # deprecated
|
||||
params['ZUUL_NEWREV'] = change.newrev
|
||||
params['ZUUL_SHORT_OLDREV'] = change.oldrev[:7]
|
||||
params['ZUUL_SHORT_NEWREV'] = change.newrev[:7]
|
||||
|
||||
params['ZUUL_REF'] = change.ref
|
||||
params['ZUUL_COMMIT'] = change.newrev
|
||||
|
||||
# This is what we should be heading toward for parameters:
|
||||
|
||||
# required:
|
||||
# ZUUL_UUID
|
||||
# ZUUL_REF (/refs/zuul/..., /refs/tags/foo, master)
|
||||
# ZUUL_COMMIT
|
||||
|
||||
# optional:
|
||||
# ZUUL_PROJECT
|
||||
# ZUUL_PIPELINE
|
||||
|
||||
# optional (changes only):
|
||||
# ZUUL_BRANCH
|
||||
# ZUUL_CHANGE
|
||||
# ZUUL_CHANGE_IDS
|
||||
# ZUUL_PATCHSET
|
||||
|
||||
# optional (ref updated only):
|
||||
# ZUUL_OLDREV
|
||||
# ZUUL_NEWREV
|
||||
# ZUUL_SHORT_NEWREV
|
||||
# ZUUL_SHORT_OLDREV
|
||||
|
||||
if callable(job.parameter_function):
|
||||
job.parameter_function(change, params)
|
||||
self.log.debug("Custom parameter function used for job %s, "
|
||||
"change: %s, params: %s" % (job, change, params))
|
||||
|
||||
build = Build(job, uuid)
|
||||
# We can get the started notification on another thread before
|
||||
# this is done so we add the build even before we trigger the
|
||||
# job on Jenkins.
|
||||
self.builds[uuid] = build
|
||||
# Sometimes Jenkins may erroneously respond with a 404. Handle
|
||||
# that by retrying for 30 seconds.
|
||||
launched = False
|
||||
errored = False
|
||||
for count in range(6):
|
||||
try:
|
||||
self.jenkins.build_job(job.name, parameters=params)
|
||||
launched = True
|
||||
break
|
||||
except:
|
||||
errored = True
|
||||
self.log.exception("Exception launching build %s for "
|
||||
"job %s for change %s (will retry):" %
|
||||
(build, job, change))
|
||||
time.sleep(self.launch_retry_timeout)
|
||||
|
||||
if errored:
|
||||
if launched:
|
||||
self.log.error("Finally able to launch %s" % build)
|
||||
else:
|
||||
self.log.error("Unable to launch %s, even after retrying, "
|
||||
"declaring lost" % build)
|
||||
# To keep the queue moving, declare this as a lost build
|
||||
# so that the change will get dropped.
|
||||
self.onBuildCompleted(build.uuid, 'LOST', None, None)
|
||||
return build
|
||||
|
||||
def findBuildInQueue(self, build):
|
||||
for item in self.jenkins.get_queue_info():
|
||||
if 'actions' not in item:
|
||||
continue
|
||||
for action in item['actions']:
|
||||
if 'parameters' not in action:
|
||||
continue
|
||||
parameters = action['parameters']
|
||||
for param in parameters:
|
||||
# UUID is deprecated in favor of ZUUL_UUID
|
||||
if ((param['name'] in ['ZUUL_UUID', 'UUID'])
|
||||
and build.uuid == param['value']):
|
||||
return item
|
||||
return False
|
||||
|
||||
def cancel(self, build):
|
||||
self.log.info("Cancel build %s for job %s" % (build, build.job))
|
||||
if build.number:
|
||||
self.log.debug("Build %s has already started" % build)
|
||||
self.jenkins.stop_build(build.job.name, build.number)
|
||||
self.log.debug("Canceled running build %s" % build)
|
||||
return
|
||||
else:
|
||||
self.log.debug("Build %s has not started yet" % build)
|
||||
|
||||
self.log.debug("Looking for build %s in queue" % build)
|
||||
item = self.findBuildInQueue(build)
|
||||
if item:
|
||||
self.log.debug("Found queue item %s for build %s" %
|
||||
(item['id'], build))
|
||||
try:
|
||||
self.jenkins.cancel_queue(item['id'])
|
||||
self.log.debug("Canceled queue item %s for build %s" %
|
||||
(item['id'], build))
|
||||
return
|
||||
except:
|
||||
self.log.exception("Exception canceling queue item %s "
|
||||
"for build %s" % (item['id'], build))
|
||||
|
||||
self.log.debug("Still unable to find build %s to cancel" % build)
|
||||
if build.number:
|
||||
self.log.debug("Build %s has just started" % build)
|
||||
self.jenkins.stop_build(build.job.name, build.number)
|
||||
self.log.debug("Canceled just running build %s" % build)
|
||||
else:
|
||||
self.log.error("Build %s has not started but "
|
||||
"was not found in queue" % build)
|
||||
|
||||
def setBuildDescription(self, build, description):
|
||||
if not build.number:
|
||||
return
|
||||
try:
|
||||
self.jenkins.set_build_description(build.job.name,
|
||||
build.number,
|
||||
description)
|
||||
except:
|
||||
self.log.exception("Exception setting build description for %s" %
|
||||
build)
|
||||
|
||||
def onBuildCompleted(self, uuid, status, url, number):
|
||||
self.log.info("Build %s #%s complete, status %s" %
|
||||
(uuid, number, status))
|
||||
build = self.builds.get(uuid)
|
||||
if build:
|
||||
self.log.debug("Found build %s" % build)
|
||||
del self.builds[uuid]
|
||||
if url:
|
||||
build.url = url
|
||||
build.result = status
|
||||
build.number = number
|
||||
self.sched.onBuildCompleted(build)
|
||||
else:
|
||||
self.log.error("Unable to find build %s" % uuid)
|
||||
|
||||
def onBuildStarted(self, uuid, url, number):
|
||||
self.log.info("Build %s #%s started, url: %s" % (uuid, number, url))
|
||||
build = self.builds.get(uuid)
|
||||
if build:
|
||||
self.log.debug("Found build %s" % build)
|
||||
build.url = url
|
||||
build.number = number
|
||||
self.sched.onBuildStarted(build)
|
||||
else:
|
||||
self.log.error("Unable to find build %s" % uuid)
|
||||
|
||||
def lookForLostBuilds(self):
|
||||
self.log.debug("Looking for lost builds")
|
||||
lostbuilds = []
|
||||
for build in self.builds.values():
|
||||
if build.result:
|
||||
# The build has finished, it will be removed
|
||||
continue
|
||||
if build.number:
|
||||
# The build has started; see if it has finished
|
||||
try:
|
||||
info = self.jenkins.get_build_info(build.job.name,
|
||||
build.number)
|
||||
if hasattr(build, '_jenkins_missing_build_info'):
|
||||
del build._jenkins_missing_build_info
|
||||
except:
|
||||
self.log.exception("Exception getting info for %s" % build)
|
||||
# We can't look it up in jenkins. That could be transient.
|
||||
# If it keeps up, assume it's permanent.
|
||||
if hasattr(build, '_jenkins_missing_build_info'):
|
||||
missing_time = build._jenkins_missing_build_info
|
||||
if time.time() - missing_time > JENKINS_GRACE_TIME:
|
||||
self.log.debug("Lost build %s because "
|
||||
"it has started but "
|
||||
"the build URL is not working" %
|
||||
build)
|
||||
lostbuilds.append(build)
|
||||
else:
|
||||
build._jenkins_missing_build_info = time.time()
|
||||
continue
|
||||
|
||||
if not info:
|
||||
self.log.debug("Lost build %s because "
|
||||
"it started but "
|
||||
"info can not be retreived" % build)
|
||||
lostbuilds.append(build)
|
||||
continue
|
||||
if info['building']:
|
||||
# It has not finished.
|
||||
continue
|
||||
if info['duration'] == 0:
|
||||
# Possible jenkins bug -- not building, but no duration
|
||||
self.log.debug("Possible jenkins bug with build %s: "
|
||||
"not building, but no duration is set "
|
||||
"Build info %s:" % (build,
|
||||
pprint.pformat(info)))
|
||||
continue
|
||||
finish_time = (info['timestamp'] + info['duration']) / 1000
|
||||
if time.time() - finish_time > JENKINS_GRACE_TIME:
|
||||
self.log.debug("Lost build %s because "
|
||||
"it finished more than 5 minutes ago. "
|
||||
"Build info %s:" % (build,
|
||||
pprint.pformat(info)))
|
||||
lostbuilds.append(build)
|
||||
continue
|
||||
# Give it more time
|
||||
else:
|
||||
# The build has not started
|
||||
if time.time() - build.launch_time < JENKINS_GRACE_TIME:
|
||||
# It just started, give it a bit
|
||||
continue
|
||||
info = self.findBuildInQueue(build)
|
||||
if info:
|
||||
# It's in the queue. All good.
|
||||
continue
|
||||
if build.number:
|
||||
# We just got notified it started
|
||||
continue
|
||||
# It may have just started. If we keep ending up here,
|
||||
# assume the worst.
|
||||
if hasattr(build, '_jenkins_missing_from_queue'):
|
||||
missing_time = build._jenkins_missing_from_queue
|
||||
if time.time() - missing_time > JENKINS_GRACE_TIME:
|
||||
self.log.debug("Lost build %s because "
|
||||
"it has not started and "
|
||||
"is not in the queue" % build)
|
||||
lostbuilds.append(build)
|
||||
continue
|
||||
else:
|
||||
build._jenkins_missing_from_queue = time.time()
|
||||
|
||||
for build in lostbuilds:
|
||||
self.log.error("Declaring %s lost" % build)
|
||||
self.onBuildCompleted(build.uuid, 'LOST', None, None)
|
@ -0,0 +1,50 @@
|
||||
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import threading
|
||||
from paste import httpserver
|
||||
from webob import Request
|
||||
|
||||
|
||||
class WebApp(threading.Thread):
|
||||
log = logging.getLogger("zuul.WebApp")
|
||||
|
||||
def __init__(self, scheduler):
|
||||
threading.Thread.__init__(self)
|
||||
self.scheduler = scheduler
|
||||
|
||||
def run(self):
|
||||
self.server = httpserver.serve(self.app, host='0.0.0.0', port='8001',
|
||||
start_loop=False)
|
||||
self.server.serve_forever()
|
||||
|
||||
def stop(self):
|
||||
self.server.server_close()
|
||||
|
||||
def app(self, environ, start_response):
|
||||
request = Request(environ)
|
||||
if request.path == '/status.json':
|
||||
try:
|
||||
ret = self.scheduler.formatStatusJSON()
|
||||
except:
|
||||
self.log.exception("Exception formatting status:")
|
||||
raise
|
||||
start_response('200 OK', [('content-type', 'application/json'),
|
||||
('Access-Control-Allow-Origin', '*')])
|
||||
return [ret]
|
||||
else:
|
||||
start_response('404 Not Found', [('content-type', 'text/plain')])
|
||||
return ['Not found.']
|
Loading…
Reference in New Issue