Merge branch 'feature/gearman'

Change-Id: I25b4b90b160c258094aa1f3f55a9e00ad0d0db79
James E. Blair 2013-07-18 14:29:19 -07:00
commit c225b4dcf4
38 changed files with 2892 additions and 2648 deletions

.gitignore

@ -1,6 +1,8 @@
*.egg
*.egg-info
*.pyc
.test
.testrepository
.tox
AUTHORS
build/*

.testr.conf Normal file

@ -0,0 +1,4 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list


@ -1,11 +1,7 @@
include AUTHORS
include HACKING
include LICENSE
include README.rst
include ChangeLog
include tox.ini
include zuul/versioninfo
recursive-include etc *
recursive-include doc *
recursive-include tests *
recursive-include tools *
exclude .gitignore
exclude .gitreview
global-exclude *.pyc

NEWS.rst Normal file

@ -0,0 +1,13 @@
Since 1.2.0:
* The Jenkins launcher is replaced with a Gearman launcher. An internal
Gearman server is provided, and there is a Gearman plugin for
Jenkins, so migration to the new system should be fairly
straightforward. See the Launchers section of the documentation for
details.
* The custom parameter function signature now takes a QueueItem as the
first argument, rather than the Change. The QueueItem has the full
context for why the change is being run (including the pipeline,
items ahead and behind, etc.). The Change is still available via
the "change" attribute on the QueueItem.

TESTING.rst Normal file

@ -0,0 +1,73 @@
===========================
Testing Your OpenStack Code
===========================
------------
A Quickstart
------------
This is designed to be enough information for you to run your first tests.
Detailed information on testing can be found here: https://wiki.openstack.org/wiki/Testing
*Install pip*::
[apt-get | yum] install python-pip
More information on pip here: http://www.pip-installer.org/en/latest/
*Use pip to install tox*::
pip install tox
Run The Tests
-------------
*Navigate to the project's root directory and execute*::
tox
Note: completing this command may take a long time (depending on system resources).
Also, you might not see any output until tox is complete.
Information about tox can be found here: http://testrun.org/tox/latest/
Run The Tests in One Environment
--------------------------------
Tox will run your entire test suite in the environments specified in the project tox.ini::
[tox]
envlist = <list of available environments>
To run the test suite in just one of the environments in envlist execute::
tox -e <env>
So, for example, to *run the test suite in py26*::
tox -e py26
Run One Test
------------
To run individual tests with tox::
tox -e <env> -- path.to.module.Class.test
For example, to *run the basic Zuul test*::
tox -e py27 -- tests.test_scheduler.TestScheduler.test_jobs_launched
To *run one test in the foreground* (after previously having run tox
to set up the virtualenv)::
.tox/py27/bin/python -m testtools.run tests.test_scheduler.TestScheduler.test_jobs_launched
Need More Info?
---------------
More information about testr: https://wiki.openstack.org/wiki/Testr
More information about nose: https://nose.readthedocs.org/en/latest/
More information about testing OpenStack code can be found here:
https://wiki.openstack.org/wiki/Testing


@ -45,16 +45,6 @@ master_doc = 'index'
project = u'Zuul'
copyright = u'2012, OpenStack'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# Version info
from zuul.version import version_info as zuul_version
release = zuul_version.version_string_with_vcs()
# The short X.Y version.
version = zuul_version.canonical_version_string()
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#language = None


@ -10,8 +10,8 @@ Zuul is a program that is used to gate the source code repository of a
project so that changes are only merged if they pass tests.
The main component of Zuul is the scheduler. It receives events
related to proposed changes (currently from Gerrit), triggers tests
based on those events (currently on Jenkins), and reports back.
related to proposed changes, triggers tests based on those events, and
reports back.
Contents:


@ -1,73 +1,80 @@
:title: Launchers
.. _launchers:
.. _Gearman: http://gearman.org/
.. _`Gearman Plugin`:
https://wiki.jenkins-ci.org/display/JENKINS/Gearman+Plugin
.. _launchers:
Launchers
=========
Zuul has a modular architecture for launching jobs. Currently only
Jenkins is supported, but it should be fairly easy to add a module to
support other systems. Zuul makes very few assumptions about the
interface to a launcher -- if it can trigger jobs, cancel them, and
receive success or failure reports, it should be able to be used with
Zuul. Patches to this effect are welcome.
Zuul has a modular architecture for launching jobs. Currently, the
only supported module interfaces with Gearman_. This design allows
any system to run jobs for Zuul simply by interfacing with a Gearman
server. The recommended way of integrating a new job-runner with Zuul
is via this method.
Jenkins
If Gearman is unsuitable, Zuul may be extended with a new launcher
module. Zuul makes very few assumptions about the interface to a
launcher -- if it can trigger jobs, cancel them, and receive success
or failure reports, it should be able to be used with Zuul. Patches
to this effect are welcome.
Gearman
-------
Zuul works with Jenkins using the Jenkins API and the notification
module. It uses the Jenkins API to trigger jobs, passing in
parameters indicating what should be tested. It receives
notifications on job completion via the notification API (so jobs must
be configured to notify Zuul).
Gearman_ is a general-purpose protocol for distributing jobs to any
number of workers. Zuul works with Gearman by sending specific
information with job requests to Gearman, and expects certain
information to be returned on completion. This protocol is described
in `Zuul-Gearman Protocol`_.
Jenkins Configuration
~~~~~~~~~~~~~~~~~~~~~
The `Gearman Jenkins Plugin`_ makes it easy to use Jenkins with Zuul
by providing an interface between Jenkins and Gearman. In this
configuration, Zuul asks Gearman to run jobs, and Gearman can then
distribute those jobs to any number of Jenkins systems (including
multiple Jenkins masters).
Zuul will need access to a Jenkins user. Create a user in Jenkins,
and then visit the configuration page for the user:
In order for Zuul to run any jobs, you will need a running Gearman
server. Zuul includes a Gearman server, and it is recommended that it
be used as it supports the following features needed by Zuul:
https://jenkins.example.com/user/USERNAME/configure
* Canceling jobs in the queue (admin protocol command "cancel job").
* Strict FIFO queue operation (gearmand's round-robin mode may be
sufficient, but is untested).
And click **Show API Token** to retrieve the API token for that user.
You will need this later when configuring Zuul. Appropriate user
permissions must be set under the Jenkins security matrix: under the
``Global`` group of permissions, check ``Read``, then under the ``Job``
group of permissions, check ``Read`` and ``Build``. Finally, under
``Run`` check ``Update``. If using a per project matrix, make sure the
user permissions are properly set for any jobs that you want Zuul to
trigger.
To enable the built-in server, see the ``gearman_server`` section of
``zuul.conf``. Be sure that the host allows connections from Zuul and
any workers (e.g., Jenkins masters) on TCP port 4730, and nowhere else
(as the Gearman protocol does not include any provision for
authentication).
Make sure the notification plugin is installed. Visit the plugin
manager on your jenkins:
Gearman Jenkins Plugin
----------------------
https://jenkins.example.com/pluginManager/
The `Gearman Plugin`_ can be installed in Jenkins in order to
facilitate Jenkins running jobs for Zuul. Install the plugin and
configure it with the hostname or IP address of your Gearman server
and the port on which it is listening (4730 by default). It will
automatically register all known Jenkins jobs as functions that Zuul
can invoke via Gearman.
And install **Jenkins Notification plugin**. The homepage for the
plugin is at:
Any number of masters can be configured in this way, and Gearman will
distribute jobs to all of them as appropriate.
https://wiki.jenkins-ci.org/display/JENKINS/Notification+Plugin
No special Jenkins job configuration is needed to support triggering
by Zuul.
Jenkins Job Configuration
~~~~~~~~~~~~~~~~~~~~~~~~~
Zuul Parameters
---------------
For each job that you want Zuul to trigger, you will need to add a
notification endpoint for the job on that job's configuration page.
Click **Add Endpoint** and enter the following values:
**Protocol**
``HTTP``
**URL**
``http://127.0.0.1:8001/jenkins_endpoint``
If you are running Zuul on a different server than Jenkins, enter the
appropriate URL. Note that Zuul itself has no access controls, so
ensure that only Jenkins is permitted to access that URL.
Zuul will pass some parameters to Jenkins for every job it launches.
Check **This build is parameterized**, and add the following fields
with the type **String Parameter**:
Zuul will pass some parameters with every job it launches. The
Gearman Plugin will ensure these are supplied as Jenkins build
parameters, so they will be available for use in the job configuration
as well as to the running job as environment variables. They are as
follows:
**ZUUL_UUID**
Zuul provided key to link builds with Gerrit events
@ -75,27 +82,14 @@ with the type **String Parameter**:
Zuul provided ref that includes commit(s) to build
**ZUUL_COMMIT**
The commit SHA1 at the head of ZUUL_REF
Those are the only required parameters. The ZUUL_UUID is needed for Zuul to
keep track of the build, and the ZUUL_REF and ZUUL_COMMIT parameters are for
use in preparing the git repo for the build.
.. note::
The GERRIT_PROJECT and UUID parameters are deprecated respectively in
favor of ZUUL_PROJECT and ZUUL_UUID.
The following parameters will be sent for all builds, but are not required so
you do not need to configure Jenkins to accept them if you do not plan on using
them:
**ZUUL_PROJECT**
The project that triggered this build
**ZUUL_PIPELINE**
The Zuul pipeline that is building this job
The following parameters are optional and will only be provided for
builds associated with changes (i.e., in response to patchset-created
or comment-added events):
The following additional parameters will only be provided for builds
associated with changes (i.e., in response to patchset-created or
comment-added events):
**ZUUL_BRANCH**
The target branch for the change that triggered this build
@ -107,7 +101,7 @@ or comment-added events):
**ZUUL_PATCHSET**
The Gerrit patchset number for the change that triggered this build
The following parameters are optional and will only be provided for
The following additional parameters will only be provided for
post-merge (ref-updated) builds:
**ZUUL_OLDREV**
@ -132,14 +126,125 @@ plugin as follows::
Refspec: ${ZUUL_REF}
Branches to build:
Branch Specifier: ${ZUUL_COMMIT}
Advanced:
Clean after checkout: True
Advanced:
Clean after checkout: True
That should be sufficient for a job that only builds a single project.
If you have multiple interrelated projects (i.e., they share a Zuul
Change Queue) that are built together, you may be able to configure
the Git plugin to prepare them, or you may choose to use a shell script
instead. The OpenStack project uses the following script to prepare
the workspace for its integration testing:
instead. As an example, the OpenStack project uses the following
script to prepare the workspace for its integration testing:
https://github.com/openstack-infra/devstack-gate/blob/master/devstack-vm-gate-wrap.sh
Zuul-Gearman Protocol
---------------------
This section is only relevant if you intend to implement a new kind of
worker that runs jobs for Zuul via Gearman. If you just want to use
Jenkins, see `Gearman Jenkins Plugin`_ instead.
The Zuul protocol as used with Gearman is as follows:
Starting Builds
~~~~~~~~~~~~~~~
To start a build, Zuul invokes a Gearman function with the following
format:
build:FUNCTION_NAME
where **FUNCTION_NAME** is the name of the job that should be run. If
the job should run on a specific node (or class of node), Zuul will
instead invoke:
build:FUNCTION_NAME:NODE_NAME
where **NODE_NAME** is the name or class of node on which the job
should be run. This can be specified by setting the ZUUL_NODE
parameter in a parameter-function (see :ref:`zuulconf`).
Zuul sends the ZUUL_* parameters described in `Zuul Parameters`_
encoded in JSON format as the argument included with the
SUBMIT_JOB_UNIQ request to Gearman. A unique ID (equal to the
ZUUL_UUID parameter) is also supplied to Gearman, and is accessible as
an added Gearman parameter with GRAB_JOB_UNIQ.
When a Gearman worker starts running a job for Zuul, it should
immediately send a WORK_DATA packet with the following information
encoded in JSON format:
**name**
The name of the job.
**number**
The build number (unique to this job).
**manager**
A unique identifier associated with the Gearman worker that can
abort this build. See `Stopping Builds`_ for more information.
**url** (optional)
The URL with the status or results of the build. Will be used in
the status page and the final report.
It should then immediately send a WORK_STATUS packet with a value of 0
percent complete. It may then optionally send subsequent WORK_STATUS
packets with updated completion values.
When the build is complete, it should send a final WORK_DATA packet
with the following in JSON format:
**result**
Either the string 'SUCCESS' if the job succeeded, or any other value
that describes the result if the job failed.
Finally, it should send either a WORK_FAIL or WORK_COMPLETE packet as
appropriate. A WORK_EXCEPTION packet will be interpreted as a
WORK_FAIL, but the exception will be logged in Zuul's error log.
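To make the exchange concrete, here is a minimal worker-side sketch using the
same ``gear`` library that Zuul itself depends on. It is illustrative only:
the job name, build number, manager name, and URL are invented, error handling
is omitted, and the worker calls used (``getJob``, ``sendWorkData``,
``sendWorkStatus``, ``sendWorkComplete``) reflect the author's understanding
of that library's worker interface::

  import json
  import gear

  worker = gear.Worker('example-worker')
  worker.addServer('127.0.0.1', 4730)
  worker.waitForServer()
  # Register the function Zuul will invoke for this job.
  worker.registerFunction('build:example-job')

  job = worker.getJob()               # blocks until Zuul submits a build
  params = json.loads(job.arguments)  # the ZUUL_* parameters described above

  # Report that the build has started.
  job.sendWorkData(json.dumps({
      'name': 'example-job',
      'number': 1,
      'manager': 'example-worker',
      'url': 'http://logs.example.com/example-job/1/',
  }))
  job.sendWorkStatus(0, 100)

  # ... run the actual test job here ...

  # Report the result, then complete the Gearman job.
  job.sendWorkData(json.dumps({'result': 'SUCCESS'}))
  job.sendWorkComplete()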
Stopping Builds
~~~~~~~~~~~~~~~
If Zuul needs to abort a build already in progress, it will invoke the
following function through Gearman:
stop:MANAGER_NAME
Where **MANAGER_NAME** is the name of the manager worker supplied in
the initial WORK_DATA packet when the job started. This is used to
direct the stop: function invocation to the correct Gearman worker
that is capable of stopping that particular job. The argument to the
function should be the following encoded in JSON format:
**name**
The job name of the build to stop.
**number**
The build number of the build to stop.
The original job is expected to complete with a WORK_DATA and
WORK_FAIL packet as described in `Starting Builds`_.
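The invocation on Zuul's side amounts to the following sketch (compare
``cancelRunningBuild`` in ``zuul/launcher/gearman.py`` later in this change);
the manager name and build number are illustrative::

  import json
  from uuid import uuid4
  import gear

  client = gear.Client()
  client.addServer('127.0.0.1', 4730)
  client.waitForServer()

  data = {'name': 'example-job', 'number': 1}
  stop_job = gear.Job('stop:example-worker', json.dumps(data),
                      unique=str(uuid4().hex))
  client.submitJob(stop_job, precedence=gear.PRECEDENCE_HIGH)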
Build Descriptions
~~~~~~~~~~~~~~~~~~
In order to update the job running system with a description of the
current state of all related builds, the job runner may optionally
implement the following Gearman function:
set_description:MANAGER_NAME
Where **MANAGER_NAME** is used as described in `Stopping Builds`_.
The argument to the function is the following encoded in JSON format:
**name**
The job name of the build to describe.
**number**
The build number of the build to describe.
**html_description**
The description of the build in HTML format.
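This mirrors ``setBuildDescription`` in the Gearman launcher below. As an
illustration (the manager name, build number, and description are invented),
the client-side request looks like this::

  import json
  from uuid import uuid4
  import gear

  client = gear.Client()
  client.addServer('127.0.0.1', 4730)
  client.waitForServer()

  data = {'name': 'example-job', 'number': 1,
          'html_description': '<p>Build 1 of example-job</p>'}
  desc_job = gear.Job('set_description:example-worker',
                      json.dumps(data), unique=str(uuid4().hex))
  client.submitJob(desc_job, precedence=gear.PRECEDENCE_LOW)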


@ -9,7 +9,8 @@ Configuration
Zuul has three configuration files:
**zuul.conf**
Credentials for Gerrit and Jenkins, locations of the other config files
Connection information for Gerrit and Gearman, locations of the
other config files
**layout.yaml**
Project and pipeline configuration -- what Zuul does
**logging.conf**
@ -27,30 +28,37 @@ Zuul will look for ``/etc/zuul/zuul.conf`` or ``~/zuul.conf`` to
bootstrap its configuration. Alternately, you may specify ``-c
/path/to/zuul.conf`` on the command line.
Gerrit and Jenkins credentials are each described in a section of
zuul.conf. The location of the other two configuration files (as well
as the location of the PID file when running Zuul as a server) are
specified in a third section.
Gerrit and Gearman connection information are each described in a
section of zuul.conf. The location of the other two configuration
files (as well as the location of the PID file when running Zuul as a
server) are specified in a third section.
The three sections of this config and their options are documented below.
You can also find an example zuul.conf file in the git
`repository
<https://github.com/openstack-infra/zuul/blob/master/etc/zuul.conf-sample>`_
jenkins
gearman
"""""""
**server**
URL for the root of the Jenkins HTTP server.
``server=https://jenkins.example.com``
Hostname or IP address of the Gearman server.
``server=gearman.example.com``
**user**
User to authenticate against Jenkins with.
``user=jenkins``
**port**
Port on which the Gearman server is listening.
``port=4730``
**apikey**
Jenkins API Key credentials for the above user.
``apikey=1234567890abcdef1234567890abcdef``
gearman_server
""""""""""""""
**start**
Whether to start the internal Gearman server (default: False).
``start=true``
**log_config**
Path to log config file for internal Gearman server.
``log_config=/etc/zuul/gearman-logging.yaml``
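Putting these options together, a minimal fragment of ``zuul.conf`` for a
single-host setup might look like the following (hostnames and paths are
illustrative)::

  [gearman]
  server=127.0.0.1
  port=4730

  [gearman_server]
  start=true
  log_config=/etc/zuul/gearman-logging.yaml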
gerrit
""""""
@ -65,11 +73,11 @@ gerrit
**user**
User name to use when logging into above server via ssh.
``user=jenkins``
``user=zuul``
**sshkey**
Path to SSH key to use when logging into above server.
``sshkey=/home/jenkins/.ssh/id_rsa``
``sshkey=/home/zuul/.ssh/id_rsa``
zuul
""""
@ -115,13 +123,14 @@ zuul
**status_url**
URL that will be posted in Zuul comments made to Gerrit changes when
beginning Jenkins jobs for a change.
``status_url=https://jenkins.example.com/zuul/status``
starting jobs for a change.
``status_url=https://zuul.example.com/status``
**url_pattern**
If you are storing build logs external to Jenkins and wish to link to
those logs when Zuul makes comments on Gerrit changes for completed
jobs, this setting configures what the URLs for those links should be.
If you are storing build logs external to the system that originally
ran jobs and wish to link to those logs when Zuul makes comments on
Gerrit changes for completed jobs, this setting configures what the
URLs for those links should be.
``http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}``
layout.yaml
@ -318,6 +327,13 @@ explanation of each of the parameters::
do when a change is added to the pipeline manager. This can be used,
for example, to reset the value of the Verified review category.
**precedence**
Indicates how the build scheduler should prioritize jobs for
different pipelines. Each pipeline may have one precedence; jobs
for pipelines with a higher precedence will be run before ones with
lower. The value should be one of ``high``, ``normal``, or ``low``.
Default: ``normal``.
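For example, a pipeline definition in ``layout.yaml`` could raise its
precedence like this (the pipeline shown is illustrative; compare the sample
layout used by the tests later in this change)::

  pipelines:
    - name: gate
      manager: DependentPipelineManager
      precedence: high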
Some example pipeline configurations are included in the sample layout
file. The first is called a *check* pipeline::
@ -418,13 +434,13 @@ each job as it builds a list from the project specification.
**failure-pattern (optional)**
The URL that should be reported to Gerrit if the job fails.
Defaults to the Jenkins build URL or the url_pattern configured in
Defaults to the build URL or the url_pattern configured in
zuul.conf. May be supplied as a string pattern with substitutions
as described in url_pattern in :ref:`zuulconf`.
**success-pattern (optional)**
The URL that should be reported to Gerrit if the job succeeds.
Defaults to the Jenkins build URL or the url_pattern configured in
Defaults to the build URL or the url_pattern configured in
zuul.conf. May be supplied as a string pattern with substitutions
as described in url_pattern in :ref:`zuulconf`.
@ -461,18 +477,22 @@ each job as it builds a list from the project specification.
included with the :ref:`includes` directive. The function
should have the following signature:
.. function:: parameters(change, parameters)
.. function:: parameters(item, parameters)
Manipulate the parameters passed to a job before a build is
launched. The ``parameters`` dictionary will already contain the
standard Zuul job parameters, and is expected to be modified
in-place.
:param change: the current change
:type change: zuul.model.Change
:param item: the current queue item
:type item: zuul.model.QueueItem
:param parameters: parameters to be passed to the job
:type parameters: dict
If the parameter **ZUUL_NODE** is set by this function, then it will
be used to specify on what node (or class of node) the job should be
run.
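The ``tests/fixtures/custom_functions.py`` file added in this change is a
complete example of such a function; it pins a job to a Debian node::

  def select_debian_node(item, params):
      params['ZUUL_NODE'] = 'debian'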
Here is an example of setting the failure message for jobs that check
whether a change merges cleanly::


@ -1,7 +1,8 @@
[jenkins]
server=https://jenkins.example.com
user=jenkins
apikey=1234567890abcdef1234567890abcdef
[gearman]
server=127.0.0.1
[gearman_server]
start=true
[gerrit]
server=review.example.com


@ -1,7 +0,0 @@
[DEFAULT]
# The list of modules to copy from oslo-incubator
modules=setup,version
# The base module to hold the copy of openstack.common
base=zuul


@ -1,3 +1,6 @@
d2to1>=0.2.10,<0.3
pbr>=0.5,<0.6
PyYAML
python-jenkins
Paste
@ -9,3 +12,4 @@ python-daemon
extras
statsd>=1.0.0,<3.0
voluptuous>=0.6,<0.7
gear>=0.3.1,<0.4.0


@ -1,11 +1,30 @@
[metadata]
name = zuul
summary = Trunk Gating System
description-file =
README.rst
author = OpenStack Infrastructure Team
author-email = openstack-infra@lists.openstack.org
home-page = http://ci.openstack.org/
classifier =
Intended Audience :: Information Technology
Intended Audience :: System Administrators
License :: OSI Approved :: Apache Software License
Operating System :: POSIX :: Linux
Programming Language :: Python
Programming Language :: Python :: 2
Programming Language :: Python :: 2.7
Programming Language :: Python :: 2.6
[global]
setup-hooks =
pbr.hooks.setup_hook
[entry_points]
console_scripts =
zuul-server = zuul.cmd.server:main
[build_sphinx]
source-dir = doc/source
build-dir = doc/build
all_files = 1
[nosetests]
verbosity=2
detailed-errors=1
cover-package = zuul
cover-html = true
cover-erase = true


@ -1,51 +1,21 @@
#!/usr/bin/env python
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import setuptools
from zuul.openstack.common import setup
requires = setup.parse_requirements()
test_requires = setup.parse_requirements(['tools/test-requires'])
depend_links = setup.parse_dependency_links()
project = 'zuul'
setuptools.setup(
name=project,
version=setup.get_version(project),
author='Hewlett-Packard Development Company, L.P.',
author_email='openstack@lists.launchpad.net',
description='Trunk gating system',
license='Apache License, Version 2.0',
url='http://launchpad.net/zuul',
packages=setuptools.find_packages(exclude=['tests', 'tests.*']),
include_package_data=True,
cmdclass=setup.get_cmdclass(),
install_requires=requires,
dependency_links=depend_links,
zip_safe=False,
classifiers=[
'Environment :: Console',
'Intended Audience :: Developers',
'Intended Audience :: Information Technology',
'License :: OSI Approved :: Apache Software License',
'Operating System :: OS Independent',
'Programming Language :: Python'
],
entry_points={
'console_scripts': [
'zuul-server=zuul.cmd.server:main',
],
}
)
setup_requires=['d2to1', 'pbr'],
d2to1=True)

test-requirements.txt Normal file

@ -0,0 +1,10 @@
hacking>=0.5.3,<0.6
coverage
sphinx
docutils==0.9.1
discover
fixtures>=0.3.12
python-subunit
testrepository>=0.0.13
testtools>=0.9.27

tests/fixtures/custom_functions.py Normal file

@ -0,0 +1,2 @@
def select_debian_node(item, params):
params['ZUUL_NODE'] = 'debian'


@ -1,3 +1,6 @@
includes:
- python-file: custom_functions.py
pipelines:
- name: check
manager: IndependentPipelineManager
@ -28,6 +31,7 @@ pipelines:
verified: -2
start:
verified: 0
precedence: high
- name: unused
manager: IndependentPipelineManager
@ -65,6 +69,8 @@ jobs:
- name: project-testfile
files:
- '.*-requires'
- name: node-project-test1
parameter-function: select_debian_node
project-templates:
- name: test-one-and-two
@ -158,3 +164,9 @@ projects:
template:
- name: test-one-and-two
projectname: project
- name: org/node-project
gate:
- node-project-merge:
- node-project-test1
- node-project-test2


@ -1,7 +1,5 @@
[jenkins]
server=https://jenkins.example.com
user=jenkins
apikey=1234
[gearman]
server=127.0.0.1
[gerrit]
server=review.example.com


@ -14,11 +14,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import os
import re
import yaml
import testtools
import voluptuous
import yaml
import zuul.layoutvalidator
@ -27,7 +28,7 @@ FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
LAYOUT_RE = re.compile(r'^(good|bad)_.*\.yaml$')
class testScheduler(unittest.TestCase):
class testScheduler(testtools.TestCase):
def test_layouts(self):
"""Test layout file validation"""
print

File diff suppressed because it is too large.


@ -1,5 +0,0 @@
coverage
nose
nosehtmloutput
sphinx
docutils==0.9.1

tools/trigger-job.py Executable file

@ -0,0 +1,71 @@
#!/usr/bin/env python
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# This script can be used to manually trigger a job in the same way that
# Zuul does. At the moment, it only supports the post set of Zuul
# parameters.
import argparse
import time
import json
from uuid import uuid4
import gear
def main():
c = gear.Client()
parser = argparse.ArgumentParser(description='Trigger a Zuul job.')
parser.add_argument('--job', dest='job', required=True,
help='Job Name')
parser.add_argument('--project', dest='project', required=True,
help='Project name')
parser.add_argument('--pipeline', dest='pipeline', default='release',
help='Zuul pipeline')
parser.add_argument('--refname', dest='refname',
help='Ref name')
parser.add_argument('--oldrev', dest='oldrev',
default='0000000000000000000000000000000000000000',
help='Old revision (SHA)')
parser.add_argument('--newrev', dest='newrev',
help='New revision (SHA)')
args = parser.parse_args()
data = {'ZUUL_PIPELINE': args.pipeline,
'ZUUL_PROJECT': args.project,
'ZUUL_UUID': str(uuid4().hex),
'ZUUL_REF': args.refname,
'ZUUL_REFNAME': args.refname,
'ZUUL_OLDREV': args.oldrev,
'ZUUL_NEWREV': args.newrev,
'ZUUL_SHORT_OLDREV': args.oldrev[:7],
'ZUUL_SHORT_NEWREV': args.newrev[:7],
'ZUUL_COMMIT': args.newrev,
}
c.addServer('127.0.0.1', 4730)
c.waitForServer()
job = gear.Job("build:%s" % args.job,
json.dumps(data),
unique=data['ZUUL_UUID'])
c.submitJob(job)
while not job.complete:
time.sleep(1)
if __name__ == '__main__':
main()
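As a usage sketch (the job, project, and revision values are made up), the
script can be pointed at a locally running Gearman server like so::

  python tools/trigger-job.py --job example-job --project org/project \
      --pipeline release --refname master \
      --newrev 1234567890123456789012345678901234567890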


@ -18,7 +18,6 @@
import urllib2
import json
import sys
import argparse
parser = argparse.ArgumentParser()

tox.ini

@ -5,22 +5,25 @@ envlist = pep8, pyflakes, py27
# Set STATSD env variables so that statsd code paths are tested.
setenv = STATSD_HOST=localhost
STATSD_PORT=8125
deps = -r{toxinidir}/tools/pip-requires
-r{toxinidir}/tools/test-requires
commands = nosetests {posargs}
VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands =
python setup.py testr --slowest --testr-args='{posargs}'
[tox:jenkins]
downloadcache = ~/cache/pip
[testenv:pep8]
deps = pep8==1.3.3
commands = pep8 --ignore=E123,E125,E128 --repeat --show-source --exclude=.venv,.tox,dist,doc,build .
commands = flake8
[testenv:cover]
setenv = NOSE_WITH_COVERAGE=1
commands =
python setup.py testr --coverage
[testenv:pyflakes]
deps = pyflakes
-r{toxinidir}/requirements.txt
commands = pyflakes zuul setup.py
[testenv:venv]
@ -28,3 +31,8 @@ commands = {posargs}
[testenv:validate-layout]
commands = zuul-server -c etc/zuul.conf-sample -t -l {posargs}
[flake8]
ignore = E123,E125,H
show-source = True
exclude = .venv,.tox,dist,doc,build,*.egg


@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
#
@ -27,6 +28,8 @@ import os
import sys
import signal
import gear
# No zuul imports here because they pull in paramiko which must not be
# imported until after the daemonization.
# https://github.com/paramiko/paramiko/issues/59
@ -36,6 +39,7 @@ class Server(object):
def __init__(self):
self.args = None
self.config = None
self.gear_server_pid = None
def parse_arguments(self):
parser = argparse.ArgumentParser(description='Project gating system.')
@ -64,9 +68,9 @@ class Server(object):
return
raise Exception("Unable to locate config file in %s" % locations)
def setup_logging(self):
if self.config.has_option('zuul', 'log_config'):
fp = os.path.expanduser(self.config.get('zuul', 'log_config'))
def setup_logging(self, section, parameter):
if self.config.has_option(section, parameter):
fp = os.path.expanduser(self.config.get(section, parameter))
if not os.path.exists(fp):
raise Exception("Unable to read logging config file at %s" %
fp)
@ -77,43 +81,79 @@ class Server(object):
def reconfigure_handler(self, signum, frame):
signal.signal(signal.SIGHUP, signal.SIG_IGN)
self.read_config()
self.setup_logging()
self.setup_logging('zuul', 'log_config')
self.sched.reconfigure(self.config)
signal.signal(signal.SIGHUP, self.reconfigure_handler)
def exit_handler(self, signum, frame):
signal.signal(signal.SIGUSR1, signal.SIG_IGN)
self.stop_gear_server()
self.sched.exit()
def term_handler(self, signum, frame):
self.stop_gear_server()
os._exit(0)
def test_config(self):
# See comment at top of file about zuul imports
import zuul.scheduler
import zuul.launcher.jenkins
import zuul.launcher.gearman
import zuul.trigger.gerrit
logging.basicConfig(level=logging.DEBUG)
self.sched = zuul.scheduler.Scheduler()
self.sched.testConfig(self.config.get('zuul', 'layout_config'))
def start_gear_server(self):
pipe_read, pipe_write = os.pipe()
child_pid = os.fork()
if child_pid == 0:
os.close(pipe_write)
self.setup_logging('gearman_server', 'log_config')
gear.Server(4730)
# Keep running until the parent dies:
pipe_read = os.fdopen(pipe_read)
pipe_read.read()
os._exit(0)
else:
os.close(pipe_read)
self.gear_server_pid = child_pid
self.gear_pipe_write = pipe_write
def stop_gear_server(self):
if self.gear_server_pid:
os.kill(self.gear_server_pid, signal.SIGKILL)
def main(self):
# See comment at top of file about zuul imports
import zuul.scheduler
import zuul.launcher.jenkins
import zuul.launcher.gearman
import zuul.trigger.gerrit
import zuul.webapp
if (self.config.has_option('gearman_server', 'start') and
self.config.getboolean('gearman_server', 'start')):
self.start_gear_server()
self.setup_logging('zuul', 'log_config')
self.sched = zuul.scheduler.Scheduler()
jenkins = zuul.launcher.jenkins.Jenkins(self.config, self.sched)
gearman = zuul.launcher.gearman.Gearman(self.config, self.sched)
gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
webapp = zuul.webapp.WebApp(self.sched)
self.sched.setLauncher(jenkins)
self.sched.setLauncher(gearman)
self.sched.setTrigger(gerrit)
self.sched.start()
self.sched.reconfigure(self.config)
self.sched.resume()
webapp.start()
signal.signal(signal.SIGHUP, self.reconfigure_handler)
signal.signal(signal.SIGUSR1, self.exit_handler)
signal.signal(signal.SIGTERM, self.term_handler)
while True:
try:
signal.pause()
@ -162,9 +202,12 @@ def main():
pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
if server.args.nodaemon:
server.setup_logging()
server.main()
else:
with daemon.DaemonContext(pidfile=pid):
server.setup_logging()
server.main()
if __name__ == "__main__":
sys.path.insert(0, '.')
main()

zuul/launcher/gearman.py Normal file

@ -0,0 +1,454 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import gear
import json
import logging
import time
import threading
from uuid import uuid4
import zuul.model
from zuul.model import Build
class GearmanCleanup(threading.Thread):
""" A thread that checks to see if outstanding builds have
completed without reporting back. """
log = logging.getLogger("zuul.JenkinsCleanup")
def __init__(self, gearman):
threading.Thread.__init__(self)
self.daemon = True
self.gearman = gearman
self.wake_event = threading.Event()
self._stopped = False
def stop(self):
self._stopped = True
self.wake_event.set()
def run(self):
while True:
self.wake_event.wait(300)
if self._stopped:
return
try:
self.gearman.lookForLostBuilds()
except:
self.log.exception("Exception checking builds:")
def getJobData(job):
if not len(job.data):
return {}
d = job.data[-1]
if not d:
return {}
return json.loads(d)
class ZuulGearmanClient(gear.Client):
def __init__(self, zuul_gearman):
super(ZuulGearmanClient, self).__init__()
self.__zuul_gearman = zuul_gearman
def handleWorkComplete(self, packet):
job = super(ZuulGearmanClient, self).handleWorkComplete(packet)
self.__zuul_gearman.onBuildCompleted(job)
return job
def handleWorkFail(self, packet):
job = super(ZuulGearmanClient, self).handleWorkFail(packet)
self.__zuul_gearman.onBuildCompleted(job)
return job
def handleWorkException(self, packet):
job = super(ZuulGearmanClient, self).handleWorkException(packet)
self.__zuul_gearman.onBuildCompleted(job)
return job
def handleWorkStatus(self, packet):
job = super(ZuulGearmanClient, self).handleWorkStatus(packet)
self.__zuul_gearman.onWorkStatus(job)
return job
def handleWorkData(self, packet):
job = super(ZuulGearmanClient, self).handleWorkData(packet)
self.__zuul_gearman.onWorkStatus(job)
return job
def handleDisconnect(self, job):
job = super(ZuulGearmanClient, self).handleDisconnect(job)
self.__zuul_gearman.onDisconnect(job)
def handleStatusRes(self, packet):
try:
job = super(ZuulGearmanClient, self).handleStatusRes(packet)
except gear.UnknownJobError:
handle = packet.getArgument(0)
for build in self.__zuul_gearman.builds:
if build.__gearman_job.handle == handle:
self.__zuul_gearman.onUnknownJob(job)
def waitForGearmanToSettle(self):
# If we're running the internal gearman server, it's possible
# that after a restart or reload, we may be immediately ready
# to run jobs but all the gearman workers may not have
# registered yet. Give them a sporting chance to show up
# before we start declaring jobs lost because we don't have
# gearman functions registered for them.
# Spend up to 30 seconds after we connect to the gearman
# server waiting for the set of defined jobs to become
# consistent over a sliding 5 second window.
self.log.info("Waiting for connection to internal Gearman server")
self.waitForServer()
self.log.info("Waiting for gearman function set to settle")
start = time.time()
last_change = start
all_functions = set()
while time.time() - start < 30:
now = time.time()
last_functions = set()
for connection in self.active_connections:
try:
req = gear.StatusAdminRequest()
connection.sendAdminRequest(req)
except Exception:
self.log.exception("Exception while checking functions")
continue
for line in req.response.split('\n'):
parts = [x.strip() for x in line.split()]
if not parts or parts[0] == '.':
continue
last_functions.add(parts[0])
if last_functions != all_functions:
last_change = now
all_functions.update(last_functions)
else:
if now - last_change > 5:
self.log.info("Gearman function set has settled")
break
time.sleep(1)
self.log.info("Done waiting for Gearman server")
class Gearman(object):
log = logging.getLogger("zuul.Gearman")
negative_function_cache_ttl = 5
def __init__(self, config, sched):
self.sched = sched
self.builds = {}
self.meta_jobs = {}  # Meta-jobs like stop or describe, keyed by unique id
server = config.get('gearman', 'server')
if config.has_option('gearman', 'port'):
port = config.get('gearman', 'port')
else:
port = 4730
self.gearman = ZuulGearmanClient(self)
self.gearman.addServer(server, port)
if (config.has_option('gearman_server', 'start') and
config.getboolean('gearman_server', 'start')):
self.gearman.waitForGearmanToSettle()
self.cleanup_thread = GearmanCleanup(self)
self.cleanup_thread.start()
self.function_cache = set()
self.function_cache_time = 0
def stop(self):
self.log.debug("Stopping")
self.cleanup_thread.stop()
self.cleanup_thread.join()
self.gearman.shutdown()
self.log.debug("Stopped")
def isJobRegistered(self, name):
if self.function_cache_time:
for connection in self.gearman.active_connections:
if connection.connect_time > self.function_cache_time:
self.function_cache = set()
self.function_cache_time = 0
break
if name in self.function_cache:
self.log.debug("Function %s is registered" % name)
return True
if ((time.time() - self.function_cache_time) <
self.negative_function_cache_ttl):
self.log.debug("Function %s is not registered "
"(negative ttl in effect)" % name)
return False
self.function_cache_time = time.time()
for connection in self.gearman.active_connections:
try:
req = gear.StatusAdminRequest()
connection.sendAdminRequest(req)
except Exception:
self.log.exception("Exception while checking functions")
continue
for line in req.response.split('\n'):
parts = [x.strip() for x in line.split()]
if not parts or parts[0] == '.':
continue
self.function_cache.add(parts[0])
if name in self.function_cache:
self.log.debug("Function %s is registered" % name)
return True
self.log.debug("Function %s is not registered" % name)
return False
def launch(self, job, item, pipeline, dependent_items=[]):
self.log.info("Launch job %s for change %s with dependent changes %s" %
(job, item.change,
[x.change for x in dependent_items]))
dependent_items = dependent_items[:]
dependent_items.reverse()
uuid = str(uuid4().hex)
params = dict(ZUUL_UUID=uuid,
ZUUL_PROJECT=item.change.project.name)
params['ZUUL_PIPELINE'] = pipeline.name
if hasattr(item.change, 'refspec'):
changes_str = '^'.join(
['%s:%s:%s' % (i.change.project.name, i.change.branch,
i.change.refspec)
for i in dependent_items + [item]])
params['ZUUL_BRANCH'] = item.change.branch
params['ZUUL_CHANGES'] = changes_str
params['ZUUL_REF'] = ('refs/zuul/%s/%s' %
(item.change.branch,
item.current_build_set.ref))
params['ZUUL_COMMIT'] = item.current_build_set.commit
zuul_changes = ' '.join(['%s,%s' % (i.change.number,
i.change.patchset)
for i in dependent_items + [item]])
params['ZUUL_CHANGE_IDS'] = zuul_changes
params['ZUUL_CHANGE'] = str(item.change.number)
params['ZUUL_PATCHSET'] = str(item.change.patchset)
if hasattr(item.change, 'ref'):
params['ZUUL_REFNAME'] = item.change.ref
params['ZUUL_OLDREV'] = item.change.oldrev
params['ZUUL_NEWREV'] = item.change.newrev
params['ZUUL_SHORT_OLDREV'] = item.change.oldrev[:7]
params['ZUUL_SHORT_NEWREV'] = item.change.newrev[:7]
params['ZUUL_REF'] = item.change.ref
params['ZUUL_COMMIT'] = item.change.newrev
# This is what we should be heading toward for parameters:
# required:
# ZUUL_UUID
# ZUUL_REF (/refs/zuul/..., /refs/tags/foo, master)
# ZUUL_COMMIT
# optional:
# ZUUL_PROJECT
# ZUUL_PIPELINE
# optional (changes only):
# ZUUL_BRANCH
# ZUUL_CHANGE
# ZUUL_CHANGE_IDS
# ZUUL_PATCHSET
# optional (ref updated only):
# ZUUL_OLDREV
# ZUUL_NEWREV
# ZUUL_SHORT_NEWREV
# ZUUL_SHORT_OLDREV
if callable(job.parameter_function):
job.parameter_function(item, params)
self.log.debug("Custom parameter function used for job %s, "
"change: %s, params: %s" % (job, item.change,
params))
if 'ZUUL_NODE' in params:
name = "build:%s:%s" % (job.name, params['ZUUL_NODE'])
else:
name = "build:%s" % job.name
build = Build(job, uuid)
build.parameters = params
gearman_job = gear.Job(name, json.dumps(params),
unique=uuid)
build.__gearman_job = gearman_job
self.builds[uuid] = build
if not self.isJobRegistered(gearman_job.name):
self.log.error("Job %s is not registered with Gearman" %
gearman_job)
self.onBuildCompleted(gearman_job, 'LOST')
return build
if pipeline.precedence == zuul.model.PRECEDENCE_NORMAL:
precedence = gear.PRECEDENCE_NORMAL
elif pipeline.precedence == zuul.model.PRECEDENCE_HIGH:
precedence = gear.PRECEDENCE_HIGH
elif pipeline.precedence == zuul.model.PRECEDENCE_LOW:
precedence = gear.PRECEDENCE_LOW
try:
self.gearman.submitJob(gearman_job, precedence=precedence)
except Exception:
self.log.exception("Unable to submit job to Gearman")
self.onBuildCompleted(gearman_job, 'LOST')
return build
if not gearman_job.handle:
self.log.error("No job handle was received for %s after 30 seconds"
" marking as lost." %
gearman_job)
self.onBuildCompleted(gearman_job, 'LOST')
return build
def cancel(self, build):
self.log.info("Cancel build %s for job %s" % (build, build.job))
if build.number is not None:
self.log.debug("Build %s has already started" % build)
self.cancelRunningBuild(build)
self.log.debug("Canceled running build %s" % build)
return
else:
self.log.debug("Build %s has not started yet" % build)
self.log.debug("Looking for build %s in queue" % build)
if self.cancelJobInQueue(build):
self.log.debug("Removed build %s from queue" % build)
return
self.log.debug("Still unable to find build %s to cancel" % build)
if build.number:
self.log.debug("Build %s has just started" % build)
self.cancelRunningBuild(build)
self.log.debug("Canceled just running build %s" % build)
else:
self.log.error("Build %s has not started but "
"was not found in queue" % build)
def onBuildCompleted(self, job, result=None):
if job.unique in self.meta_jobs:
del self.meta_jobs[job.unique]
return
build = self.builds.get(job.unique)
if build:
if result is None:
data = getJobData(job)
result = data.get('result')
if result is None:
result = 'LOST'
self.log.info("Build %s complete, result %s" %
(job, result))
build.result = result
self.sched.onBuildCompleted(build)
# The test suite expects the build to be removed from the
# internal dict after it's added to the report queue.
del self.builds[job.unique]
else:
if not job.name.startswith("stop:"):
self.log.error("Unable to find build %s" % job.unique)
def onWorkStatus(self, job):
data = getJobData(job)
self.log.debug("Build %s update %s " % (job, data))
build = self.builds.get(job.unique)
if build:
self.log.debug("Found build %s" % build)
if build.number is None:
self.log.info("Build %s started" % job)
build.url = data.get('url')
build.number = data.get('number')
build.__gearman_manager = data.get('manager')
self.sched.onBuildStarted(build)
build.fraction_complete = job.fraction_complete
else:
self.log.error("Unable to find build %s" % job.unique)
def onDisconnect(self, job):
self.log.info("Gearman job %s lost due to disconnect" % job)
self.onBuildCompleted(job, 'LOST')
def onUnknownJob(self, job):
self.log.info("Gearman job %s lost due to unknown handle" % job)
self.onBuildCompleted(job, 'LOST')
def cancelJobInQueue(self, build):
job = build.__gearman_job
req = gear.CancelJobAdminRequest(job.handle)
job.connection.sendAdminRequest(req)
self.log.debug("Response to cancel build %s request: %s" %
(build, req.response.strip()))
if req.response.startswith("OK"):
try:
del self.builds[job.unique]
except:
pass
return True
return False
def cancelRunningBuild(self, build):
stop_uuid = str(uuid4().hex)
data = dict(name=build.job.name,
number=build.number)
stop_job = gear.Job("stop:%s" % build.__gearman_manager,
json.dumps(data), unique=stop_uuid)
self.meta_jobs[stop_uuid] = stop_job
self.log.debug("Submitting stop job: %s", stop_job)
self.gearman.submitJob(stop_job, precedence=gear.PRECEDENCE_HIGH)
return True
def setBuildDescription(self, build, desc):
try:
name = "set_description:%s" % build.__gearman_manager
except AttributeError:
# We haven't yet received the first data packet that tells
# us where the job is running.
return False
if not self.isJobRegistered(name):
return False
desc_uuid = str(uuid4().hex)
data = dict(name=build.job.name,
number=build.number,
html_description=desc)
desc_job = gear.Job(name, json.dumps(data), unique=desc_uuid)
self.meta_jobs[desc_uuid] = desc_job
self.log.debug("Submitting describe job: %s", desc_job)
self.gearman.submitJob(desc_job, precedence=gear.PRECEDENCE_LOW)
return True
def lookForLostBuilds(self):
self.log.debug("Looking for lost builds")
for build in self.builds.values():
if build.result:
# The build has finished, it will be removed
continue
job = build.__gearman_job
if not job.handle:
# The build hasn't been enqueued yet
continue
p = gear.Packet(gear.constants.REQ, gear.constants.GET_STATUS,
job.handle)
job.connection.sendPacket(p)


@ -1,499 +0,0 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# So we can name this module "jenkins" and still load the "jenkins"
# system module
from __future__ import absolute_import
import json
import logging
import pprint
import threading
import time
import urllib # for extending jenkins lib
import urllib2 # for extending jenkins lib
from uuid import uuid4
import jenkins
from paste import httpserver
from webob import Request
from zuul.model import Build
# The amount of time we tolerate a change in build status without
# receiving a notification
JENKINS_GRACE_TIME = 60
class JenkinsCallback(threading.Thread):
log = logging.getLogger("zuul.JenkinsCallback")
def __init__(self, jenkins):
threading.Thread.__init__(self)
self.jenkins = jenkins
def run(self):
httpserver.serve(self.app, host='0.0.0.0', port='8001')
def app(self, environ, start_response):
request = Request(environ)
if request.path == '/jenkins_endpoint':
self.jenkins_endpoint(request)
start_response('200 OK', [('content-type', 'text/html')])
return ['Zuul good.']
elif request.path == '/status':
try:
ret = self.jenkins.sched.formatStatusHTML()
except:
self.log.exception("Exception formatting status:")
raise
start_response('200 OK', [('content-type', 'text/html')])
return [ret]
elif request.path == '/status.json':
try:
ret = self.jenkins.sched.formatStatusJSON()
except:
self.log.exception("Exception formatting status:")
raise
start_response('200 OK', [('content-type', 'application/json'),
('Access-Control-Allow-Origin', '*')])
return [ret]
else:
start_response('200 OK', [('content-type', 'text/html')])
return ['Zuul good.']
def jenkins_endpoint(self, request):
try:
data = json.loads(request.body)
except:
self.log.exception("Exception handling Jenkins notification:")
raise # let wsgi handler process the issue
if data:
self.log.debug("Received data from Jenkins: \n%s" %
(pprint.pformat(data)))
build = data.get('build')
if build:
phase = build.get('phase')
status = build.get('status')
url = build.get('full_url')
number = build.get('number')
params = build.get('parameters')
if params:
# UUID is deprecated in favor of ZUUL_UUID
uuid = params.get('ZUUL_UUID') or params.get('UUID')
if (status and url and uuid and phase and
phase == 'COMPLETED'):
self.jenkins.onBuildCompleted(uuid,
status,
url,
number)
if (phase and phase == 'STARTED'):
self.jenkins.onBuildStarted(uuid, url, number)
class JenkinsCleanup(threading.Thread):
""" A thread that checks to see if outstanding builds have
completed without reporting back. """
log = logging.getLogger("zuul.JenkinsCleanup")
def __init__(self, jenkins):
threading.Thread.__init__(self)
self.jenkins = jenkins
self.wake_event = threading.Event()
self._stopped = False
def stop(self):
self._stopped = True
self.wake_event.set()
def run(self):
while True:
self.wake_event.wait(180)
if self._stopped:
return
try:
self.jenkins.lookForLostBuilds()
except:
self.log.exception("Exception checking builds:")
STOP_BUILD = 'job/%(name)s/%(number)s/stop'
CANCEL_QUEUE = 'queue/item/%(number)s/cancelQueue'
BUILD_INFO = 'job/%(name)s/%(number)s/api/json?depth=0'
BUILD_DESCRIPTION = 'job/%(name)s/%(number)s/submitDescription'
DEBUG = False
class ExtendedJenkins(jenkins.Jenkins):
def jenkins_open(self, req):
'''
Utility routine for opening an HTTP request to a Jenkins server.
'''
try:
if self.auth:
req.add_header('Authorization', self.auth)
return urllib2.urlopen(req).read()
except urllib2.HTTPError, e:
if DEBUG:
print e.msg
print e.fp.read()
raise
def stop_build(self, name, number):
'''
Stop a running Jenkins build.
@param name: Name of Jenkins job
@type name: str
@param number: Jenkins build number for the job
@type number: int
'''
request = urllib2.Request(self.server + STOP_BUILD % locals())
self.jenkins_open(request)
def cancel_queue(self, number):
'''
Cancel a queued build.
@param number: Jenkins queue number for the build
@type number: int
'''
# Jenkins returns a 302 from this URL, unless Referer is not set,
# then you get a 404.
request = urllib2.Request(self.server + CANCEL_QUEUE % locals(),
urllib.urlencode({}),
headers={'Referer': self.server})
self.jenkins_open(request)
def get_build_info(self, name, number):
'''
Get information for a build.
@param name: Name of Jenkins job
@type name: str
@param number: Jenkins build number for the job
@type number: int
@return: dictionary
'''
request = urllib2.Request(self.server + BUILD_INFO % locals())
return json.loads(self.jenkins_open(request))
def set_build_description(self, name, number, description):
'''
Get information for a build.
@param name: Name of Jenkins job
@type name: str
@param number: Jenkins build number for the job
@type number: int
@param description: Build description to set
@type description: str
'''
params = urllib.urlencode({'description': description})
request = urllib2.Request(self.server + BUILD_DESCRIPTION % locals(),
params)
self.jenkins_open(request)
class Jenkins(object):
log = logging.getLogger("zuul.Jenkins")
launch_retry_timeout = 5
def __init__(self, config, sched):
self.sched = sched
self.builds = {}
server = config.get('jenkins', 'server')
user = config.get('jenkins', 'user')
apikey = config.get('jenkins', 'apikey')
self.jenkins = ExtendedJenkins(server, user, apikey)
self.callback_thread = JenkinsCallback(self)
self.callback_thread.start()
self.cleanup_thread = JenkinsCleanup(self)
self.cleanup_thread.start()
def stop(self):
self.cleanup_thread.stop()
self.cleanup_thread.join()
#TODO: remove dependent_changes
def launch(self, job, change, pipeline, dependent_changes=[]):
self.log.info("Launch job %s for change %s with dependent changes %s" %
(job, change, dependent_changes))
dependent_changes = dependent_changes[:]
dependent_changes.reverse()
uuid = str(uuid4().hex)
params = dict(UUID=uuid, # deprecated
ZUUL_UUID=uuid,
GERRIT_PROJECT=change.project.name, # deprecated
ZUUL_PROJECT=change.project.name)
params['ZUUL_PIPELINE'] = pipeline.name
if hasattr(change, 'refspec'):
changes_str = '^'.join(
['%s:%s:%s' % (c.project.name, c.branch, c.refspec)
for c in dependent_changes + [change]])
params['GERRIT_BRANCH'] = change.branch # deprecated
params['ZUUL_BRANCH'] = change.branch
params['GERRIT_CHANGES'] = changes_str # deprecated
params['ZUUL_CHANGES'] = changes_str
params['ZUUL_REF'] = ('refs/zuul/%s/%s' %
(change.branch,
change.current_build_set.ref))
params['ZUUL_COMMIT'] = change.current_build_set.commit
zuul_changes = ' '.join(['%s,%s' % (c.number, c.patchset)
for c in dependent_changes + [change]])
params['ZUUL_CHANGE_IDS'] = zuul_changes
params['ZUUL_CHANGE'] = str(change.number)
params['ZUUL_PATCHSET'] = str(change.patchset)
if hasattr(change, 'ref'):
params['GERRIT_REFNAME'] = change.ref # deprecated
params['ZUUL_REFNAME'] = change.ref
params['GERRIT_OLDREV'] = change.oldrev # deprecated
params['ZUUL_OLDREV'] = change.oldrev
params['GERRIT_NEWREV'] = change.newrev # deprecated
params['ZUUL_NEWREV'] = change.newrev
params['ZUUL_SHORT_OLDREV'] = change.oldrev[:7]
params['ZUUL_SHORT_NEWREV'] = change.newrev[:7]
params['ZUUL_REF'] = change.ref
params['ZUUL_COMMIT'] = change.newrev
# This is what we should be heading toward for parameters:
# required:
# ZUUL_UUID
# ZUUL_REF (/refs/zuul/..., /refs/tags/foo, master)
# ZUUL_COMMIT
# optional:
# ZUUL_PROJECT
# ZUUL_PIPELINE
# optional (changes only):
# ZUUL_BRANCH
# ZUUL_CHANGE
# ZUUL_CHANGE_IDS
# ZUUL_PATCHSET
# optional (ref updated only):
# ZUUL_OLDREV
# ZUUL_NEWREV
# ZUUL_SHORT_NEWREV
# ZUUL_SHORT_OLDREV
if callable(job.parameter_function):
job.parameter_function(change, params)
self.log.debug("Custom parameter function used for job %s, "
"change: %s, params: %s" % (job, change, params))
build = Build(job, uuid)
# We can get the started notification on another thread before
# this is done so we add the build even before we trigger the
# job on Jenkins.
self.builds[uuid] = build
# Sometimes Jenkins may erroneously respond with a 404. Handle
# that by retrying for 30 seconds.
launched = False
errored = False
for count in range(6):
try:
self.jenkins.build_job(job.name, parameters=params)
launched = True
break
except:
errored = True
self.log.exception("Exception launching build %s for "
"job %s for change %s (will retry):" %
(build, job, change))
time.sleep(self.launch_retry_timeout)
if errored:
if launched:
self.log.error("Finally able to launch %s" % build)
else:
self.log.error("Unable to launch %s, even after retrying, "
"declaring lost" % build)
# To keep the queue moving, declare this as a lost build
# so that the change will get dropped.
self.onBuildCompleted(build.uuid, 'LOST', None, None)
return build
def findBuildInQueue(self, build):
for item in self.jenkins.get_queue_info():
if 'actions' not in item:
continue
for action in item['actions']:
if 'parameters' not in action:
continue
parameters = action['parameters']
for param in parameters:
# UUID is deprecated in favor of ZUUL_UUID
if ((param['name'] in ['ZUUL_UUID', 'UUID'])
and build.uuid == param['value']):
return item
return False
def cancel(self, build):
self.log.info("Cancel build %s for job %s" % (build, build.job))
if build.number:
self.log.debug("Build %s has already started" % build)
self.jenkins.stop_build(build.job.name, build.number)
self.log.debug("Canceled running build %s" % build)
return
else:
self.log.debug("Build %s has not started yet" % build)
self.log.debug("Looking for build %s in queue" % build)
item = self.findBuildInQueue(build)
if item:
self.log.debug("Found queue item %s for build %s" %
(item['id'], build))
try:
self.jenkins.cancel_queue(item['id'])
self.log.debug("Canceled queue item %s for build %s" %
(item['id'], build))
return
except:
self.log.exception("Exception canceling queue item %s "
"for build %s" % (item['id'], build))
self.log.debug("Still unable to find build %s to cancel" % build)
if build.number:
self.log.debug("Build %s has just started" % build)
self.jenkins.stop_build(build.job.name, build.number)
self.log.debug("Canceled just running build %s" % build)
else:
self.log.error("Build %s has not started but "
"was not found in queue" % build)
def setBuildDescription(self, build, description):
if not build.number:
return
try:
self.jenkins.set_build_description(build.job.name,
build.number,
description)
except:
self.log.exception("Exception setting build description for %s" %
build)
def onBuildCompleted(self, uuid, status, url, number):
self.log.info("Build %s #%s complete, status %s" %
(uuid, number, status))
build = self.builds.get(uuid)
if build:
self.log.debug("Found build %s" % build)
del self.builds[uuid]
if url:
build.url = url
build.result = status
build.number = number
self.sched.onBuildCompleted(build)
else:
self.log.error("Unable to find build %s" % uuid)
def onBuildStarted(self, uuid, url, number):
self.log.info("Build %s #%s started, url: %s" % (uuid, number, url))
build = self.builds.get(uuid)
if build:
self.log.debug("Found build %s" % build)
build.url = url
build.number = number
self.sched.onBuildStarted(build)
else:
self.log.error("Unable to find build %s" % uuid)
def lookForLostBuilds(self):
self.log.debug("Looking for lost builds")
lostbuilds = []
for build in self.builds.values():
if build.result:
# The build has finished, it will be removed
continue
if build.number:
# The build has started; see if it has finished
try:
info = self.jenkins.get_build_info(build.job.name,
build.number)
if hasattr(build, '_jenkins_missing_build_info'):
del build._jenkins_missing_build_info
except:
self.log.exception("Exception getting info for %s" % build)
# We can't look it up in jenkins. That could be transient.
# If it keeps up, assume it's permanent.
if hasattr(build, '_jenkins_missing_build_info'):
missing_time = build._jenkins_missing_build_info
if time.time() - missing_time > JENKINS_GRACE_TIME:
self.log.debug("Lost build %s because "
"it has started but "
"the build URL is not working" %
build)
lostbuilds.append(build)
else:
build._jenkins_missing_build_info = time.time()
continue
if not info:
self.log.debug("Lost build %s because "
"it started but "
"info can not be retreived" % build)
lostbuilds.append(build)
continue
if info['building']:
# It has not finished.
continue
if info['duration'] == 0:
# Possible jenkins bug -- not building, but no duration
self.log.debug("Possible jenkins bug with build %s: "
"not building, but no duration is set "
"Build info %s:" % (build,
pprint.pformat(info)))
continue
finish_time = (info['timestamp'] + info['duration']) / 1000
if time.time() - finish_time > JENKINS_GRACE_TIME:
self.log.debug("Lost build %s because "
"it finished more than 5 minutes ago. "
"Build info %s:" % (build,
pprint.pformat(info)))
lostbuilds.append(build)
continue
# Give it more time
else:
# The build has not started
if time.time() - build.launch_time < JENKINS_GRACE_TIME:
# It just started, give it a bit
continue
info = self.findBuildInQueue(build)
if info:
# It's in the queue. All good.
continue
if build.number:
# We just got notified it started
continue
# It may have just started. If we keep ending up here,
# assume the worst.
if hasattr(build, '_jenkins_missing_from_queue'):
missing_time = build._jenkins_missing_from_queue
if time.time() - missing_time > JENKINS_GRACE_TIME:
self.log.debug("Lost build %s because "
"it has not started and "
"is not in the queue" % build)
lostbuilds.append(build)
continue
else:
build._jenkins_missing_from_queue = time.time()
for build in lostbuilds:
self.log.error("Declaring %s lost" % build)
self.onBuildCompleted(build.uuid, 'LOST', None, None)
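
For reference, a minimal sketch of a custom parameter function matching the signature this launcher calls (job.parameter_function(change, params)); the project name and the extra parameter below are made up for illustration:

    def node_label_parameters(change, params):
        # Hypothetical: route one project's jobs to a specific node label.
        # 'ZUUL_NODE_LABEL' is an invented parameter, not part of Zuul itself.
        if change.project.name == 'example/project':
            params['ZUUL_NODE_LABEL'] = 'precise'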

View File

@ -30,6 +30,9 @@ class LayoutSchema(object):
manager = v.Any('IndependentPipelineManager',
'DependentPipelineManager')
precedence = v.Any('normal', 'low', 'high')
variable_dict = v.Schema({}, extra=True)
trigger = {v.Required('event'): toList(v.Any('patchset-created',
@ -47,6 +50,7 @@ class LayoutSchema(object):
pipeline = {v.Required('name'): str,
v.Required('manager'): manager,
'precedence': precedence,
'description': str,
'success-message': str,
'failure-message': str,

View File

@ -123,7 +123,7 @@ class Merger(object):
log = logging.getLogger("zuul.Merger")
def __init__(self, trigger, working_root, push_refs, sshkey, email,
username):
username):
self.trigger = trigger
self.repos = {}
self.working_root = working_root
@ -199,7 +199,7 @@ class Merger(object):
return False
return commit
def mergeChanges(self, changes, target_ref=None, mode=None):
def mergeChanges(self, items, target_ref=None, mode=None):
# Merge shortcuts:
# if this is the only change just merge it against its branch.
# elif there are changes ahead of us that are from the same project and
@ -209,27 +209,27 @@ class Merger(object):
# Shortcuts assume some external entity is checking whether or not
# changes from other projects can merge.
commit = False
change = changes[-1]
sibling_filter = lambda c: (c.project == change.project and
c.branch == change.branch)
sibling_changes = filter(sibling_filter, changes)
item = items[-1]
sibling_filter = lambda i: (i.change.project == item.change.project and
i.change.branch == item.change.branch)
sibling_items = filter(sibling_filter, items)
# Only the current change to merge; merge against the tip of change.branch
if len(sibling_changes) == 1:
repo = self.getRepo(change.project)
if len(sibling_items) == 1:
repo = self.getRepo(item.change.project)
# we need to reset here in order to call getBranchHead
try:
repo.reset()
except:
self.log.exception("Unable to reset repo %s" % repo)
return False
commit = self._mergeChange(change,
repo.getBranchHead(change.branch),
commit = self._mergeChange(item.change,
repo.getBranchHead(item.change.branch),
target_ref=target_ref, mode=mode)
# Sibling changes exist. Merge current change against newest sibling.
elif (len(sibling_changes) >= 2 and
sibling_changes[-2].current_build_set.commit):
last_change = sibling_changes[-2].current_build_set.commit
commit = self._mergeChange(change, last_change,
elif (len(sibling_items) >= 2 and
sibling_items[-2].current_build_set.commit):
last_commit = sibling_items[-2].current_build_set.commit
commit = self._mergeChange(item.change, last_commit,
target_ref=target_ref, mode=mode)
# Either change did not merge or we did not need to merge as there were
# previous merge conflicts.
@ -237,37 +237,39 @@ class Merger(object):
return commit
project_branches = []
for c in reversed(changes):
for i in reversed(items):
# Here we create all of the necessary zuul refs and potentially
# push them back to Gerrit.
if (c.project, c.branch) in project_branches:
if (i.change.project, i.change.branch) in project_branches:
continue
repo = self.getRepo(c.project)
if c.project != change.project or c.branch != change.branch:
repo = self.getRepo(i.change.project)
if (i.change.project != item.change.project or
i.change.branch != item.change.branch):
# Create a zuul ref for each dependent change's project/
# branch combination, as this is the ref that jenkins will
# use to test. The ref for change has already been set so
# we skip it here.
try:
zuul_ref = c.branch + '/' + target_ref
repo.createZuulRef(zuul_ref, c.current_build_set.commit)
zuul_ref = i.change.branch + '/' + target_ref
repo.createZuulRef(zuul_ref, i.current_build_set.commit)
except:
self.log.exception("Unable to set zuul ref %s for "
"change %s" % (zuul_ref, c))
"change %s" % (zuul_ref, i.change))
return False
if self.push_refs:
# Push the results upstream to the zuul ref after
# they are created.
ref = 'refs/zuul/' + c.branch + '/' + target_ref
ref = 'refs/zuul/' + i.change.branch + '/' + target_ref
try:
repo.push(ref, ref)
complete = self.trigger.waitForRefSha(c.project, ref)
complete = self.trigger.waitForRefSha(i.change.project,
ref)
except:
self.log.exception("Unable to push %s" % ref)
return False
if not complete:
self.log.error("Ref %s did not show up in repo" % ref)
return False
project_branches.append((c.project, c.branch))
project_branches.append((i.change.project, i.change.branch))
return commit
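
To make the sibling shortcut above concrete, here is a small, self-contained sketch with made-up project names; real queue items carry full Change objects, but the filtering logic is the same:

    from collections import namedtuple

    Change = namedtuple('Change', ['project', 'branch'])
    Item = namedtuple('Item', ['change'])

    items = [Item(Change('nova', 'master')),     # item ahead, same project/branch
             Item(Change('glance', 'master')),   # unrelated project
             Item(Change('nova', 'master'))]     # the item being merged
    item = items[-1]

    siblings = [i for i in items
                if i.change.project == item.change.project and
                i.change.branch == item.change.branch]
    assert len(siblings) == 2  # so merge against the newest sibling's commit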

View File

@ -22,6 +22,17 @@ MERGE_ALWAYS = 2
MERGE_IF_NECESSARY = 3
CHERRY_PICK = 4
PRECEDENCE_NORMAL = 0
PRECEDENCE_LOW = 1
PRECEDENCE_HIGH = 2
PRECEDENCE_MAP = {
None: PRECEDENCE_NORMAL,
'low': PRECEDENCE_LOW,
'normal': PRECEDENCE_NORMAL,
'high': PRECEDENCE_HIGH,
}
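
As a quick check of the mapping above, a pipeline with no configured precedence falls back to normal:

    assert PRECEDENCE_MAP['high'] == PRECEDENCE_HIGH
    assert PRECEDENCE_MAP[None] == PRECEDENCE_NORMAL  # default when unset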
class Pipeline(object):
"""A top-level pipeline such as check, gate, post, etc."""
@ -34,6 +45,7 @@ class Pipeline(object):
self.job_trees = {} # project -> JobTree
self.manager = None
self.queues = []
self.precedence = PRECEDENCE_NORMAL
def __repr__(self):
return '<Pipeline %s>' % self.name
@ -68,20 +80,20 @@ class Pipeline(object):
return []
return changeish.filterJobs(tree.getJobs())
def _findJobsToRun(self, job_trees, changeish):
def _findJobsToRun(self, job_trees, item):
torun = []
if changeish.change_ahead:
if item.item_ahead:
# Only run jobs if any 'hold' jobs on the change ahead
# have completed successfully.
if self.isHoldingFollowingChanges(changeish.change_ahead):
if self.isHoldingFollowingChanges(item.item_ahead):
return []
for tree in job_trees:
job = tree.job
result = None
if job:
if not job.changeMatches(changeish):
if not job.changeMatches(item.change):
continue
build = changeish.current_build_set.getBuild(job.name)
build = item.current_build_set.getBuild(job.name)
if build:
result = build.result
else:
@ -91,93 +103,93 @@ class Pipeline(object):
# If there is no job, this is a null job tree, and we should
# run all of its jobs.
if result == 'SUCCESS' or not job:
torun.extend(self._findJobsToRun(tree.job_trees, changeish))
torun.extend(self._findJobsToRun(tree.job_trees, item))
return torun
def findJobsToRun(self, changeish):
tree = self.getJobTree(changeish.project)
def findJobsToRun(self, item):
tree = self.getJobTree(item.change.project)
if not tree:
return []
return self._findJobsToRun(tree.job_trees, changeish)
return self._findJobsToRun(tree.job_trees, item)
def areAllJobsComplete(self, changeish):
for job in self.getJobs(changeish):
build = changeish.current_build_set.getBuild(job.name)
def areAllJobsComplete(self, item):
for job in self.getJobs(item.change):
build = item.current_build_set.getBuild(job.name)
if not build or not build.result:
return False
return True
def didAllJobsSucceed(self, changeish):
for job in self.getJobs(changeish):
def didAllJobsSucceed(self, item):
for job in self.getJobs(item.change):
if not job.voting:
continue
build = changeish.current_build_set.getBuild(job.name)
build = item.current_build_set.getBuild(job.name)
if not build:
return False
if build.result != 'SUCCESS':
return False
return True
def didAnyJobFail(self, changeish):
for job in self.getJobs(changeish):
def didAnyJobFail(self, item):
for job in self.getJobs(item.change):
if not job.voting:
continue
build = changeish.current_build_set.getBuild(job.name)
build = item.current_build_set.getBuild(job.name)
if build and build.result and (build.result != 'SUCCESS'):
return True
return False
def isHoldingFollowingChanges(self, changeish):
for job in self.getJobs(changeish):
def isHoldingFollowingChanges(self, item):
for job in self.getJobs(item.change):
if not job.hold_following_changes:
continue
build = changeish.current_build_set.getBuild(job.name)
build = item.current_build_set.getBuild(job.name)
if not build:
return True
if build.result != 'SUCCESS':
return True
if not changeish.change_ahead:
if not item.item_ahead:
return False
return self.isHoldingFollowingChanges(changeish.change_ahead)
return self.isHoldingFollowingChanges(item.item_ahead)
def setResult(self, changeish, build):
def setResult(self, item, build):
if build.result != 'SUCCESS':
# Get a JobTree from a Job so we can find only its dependent jobs
root = self.getJobTree(changeish.project)
root = self.getJobTree(item.change.project)
tree = root.getJobTreeForJob(build.job)
for job in tree.getJobs():
fakebuild = Build(job, None)
fakebuild.result = 'SKIPPED'
changeish.addBuild(fakebuild)
item.addBuild(fakebuild)
def setUnableToMerge(self, changeish):
changeish.current_build_set.unable_to_merge = True
root = self.getJobTree(changeish.project)
def setUnableToMerge(self, item):
item.current_build_set.unable_to_merge = True
root = self.getJobTree(item.change.project)
for job in root.getJobs():
fakebuild = Build(job, None)
fakebuild.result = 'SKIPPED'
changeish.addBuild(fakebuild)
item.addBuild(fakebuild)
def setDequeuedNeedingChange(self, changeish):
changeish.dequeued_needing_change = True
root = self.getJobTree(changeish.project)
def setDequeuedNeedingChange(self, item):
item.dequeued_needing_change = True
root = self.getJobTree(item.change.project)
for job in root.getJobs():
fakebuild = Build(job, None)
fakebuild.result = 'SKIPPED'
changeish.addBuild(fakebuild)
item.addBuild(fakebuild)
def getChangesInQueue(self):
changes = []
for shared_queue in self.queues:
changes.extend(shared_queue.queue)
changes.extend([x.change for x in shared_queue.queue])
return changes
def getAllChanges(self):
changes = []
def getAllItems(self):
items = []
for shared_queue in self.queues:
changes.extend(shared_queue.queue)
changes.extend(shared_queue.severed_heads)
return changes
items.extend(shared_queue.queue)
items.extend(shared_queue.severed_heads)
return items
def formatStatusHTML(self):
ret = ''
@ -201,14 +213,15 @@ class Pipeline(object):
j_queue['heads'] = []
for head in queue.getHeads():
j_changes = []
c = head
while c:
j_changes.append(self.formatChangeJSON(c))
c = c.change_behind
e = head
while e:
j_changes.append(self.formatItemJSON(e))
e = e.item_behind
j_queue['heads'].append(j_changes)
return j_pipeline
def formatStatus(self, changeish, indent=0, html=False):
def formatStatus(self, item, indent=0, html=False):
changeish = item.change
indent_str = ' ' * indent
ret = ''
if html and hasattr(changeish, 'url') and changeish.url is not None:
@ -222,7 +235,7 @@ class Pipeline(object):
changeish.project.name,
changeish._id())
for job in self.getJobs(changeish):
build = changeish.current_build_set.getBuild(job.name)
build = item.current_build_set.getBuild(job.name)
if build:
result = build.result
else:
@ -241,12 +254,13 @@ class Pipeline(object):
job_name = '<a href="%s">%s</a>' % (url, job_name)
ret += '%s %s: %s%s' % (indent_str, job_name, result, voting)
ret += '\n'
if changeish.change_behind:
if item.item_behind:
ret += '%sFollowed by:\n' % (indent_str)
ret += self.formatStatus(changeish.change_behind, indent + 2, html)
ret += self.formatStatus(item.item_behind, indent + 2, html)
return ret
def formatChangeJSON(self, changeish):
def formatItemJSON(self, item):
changeish = item.change
ret = {}
if hasattr(changeish, 'url') and changeish.url is not None:
ret['url'] = changeish.url
@ -254,10 +268,10 @@ class Pipeline(object):
ret['url'] = None
ret['id'] = changeish._id()
ret['project'] = changeish.project.name
ret['enqueue_time'] = int(changeish.enqueue_time * 1000)
ret['enqueue_time'] = int(item.enqueue_time * 1000)
ret['jobs'] = []
for job in self.getJobs(changeish):
build = changeish.current_build_set.getBuild(job.name)
build = item.current_build_set.getBuild(job.name)
if build:
result = build.result
url = build.url
@ -303,27 +317,32 @@ class ChangeQueue(object):
self._jobs |= set(self.pipeline.getJobTree(project).getJobs())
def enqueueChange(self, change):
item = QueueItem(self.pipeline, change)
self.enqueueItem(item)
item.enqueue_time = time.time()
return item
def enqueueItem(self, item):
if self.dependent and self.queue:
change.change_ahead = self.queue[-1]
change.change_ahead.change_behind = change
self.queue.append(change)
change.enqueue_time = time.time()
item.item_ahead = self.queue[-1]
item.item_ahead.item_behind = item
self.queue.append(item)
def dequeueChange(self, change):
if change in self.queue:
self.queue.remove(change)
if change in self.severed_heads:
self.severed_heads.remove(change)
if change.change_ahead:
change.change_ahead.change_behind = change.change_behind
if change.change_behind:
change.change_behind.change_ahead = change.change_ahead
change.change_ahead = None
change.change_behind = None
change.dequeue_time = time.time()
def dequeueItem(self, item):
if item in self.queue:
self.queue.remove(item)
if item in self.severed_heads:
self.severed_heads.remove(item)
if item.item_ahead:
item.item_ahead.item_behind = item.item_behind
if item.item_behind:
item.item_behind.item_ahead = item.item_ahead
item.item_ahead = None
item.item_behind = None
item.dequeue_time = time.time()
def addSeveredHead(self, change):
self.severed_heads.append(change)
def addSeveredHead(self, item):
self.severed_heads.append(item)
def mergeChangeQueue(self, other):
for project in other.projects:
@ -458,14 +477,16 @@ class Build(object):
self.launch_time = time.time()
self.start_time = None
self.end_time = None
self.parameters = {}
self.fraction_complete = None
def __repr__(self):
return '<Build %s of %s>' % (self.uuid, self.job.name)
class BuildSet(object):
def __init__(self, change):
self.change = change
def __init__(self, item):
self.item = item
self.other_changes = []
self.builds = {}
self.result = None
@ -480,10 +501,10 @@ class BuildSet(object):
# so we don't know what the other changes ahead will be
# until jobs start.
if not self.other_changes:
next_change = self.change.change_ahead
while next_change:
self.other_changes.append(next_change)
next_change = next_change.change_ahead
next_item = self.item.item_ahead
while next_item:
self.other_changes.append(next_item.change)
next_item = next_item.item_ahead
if not self.ref:
self.ref = 'Z' + uuid4().hex
@ -500,29 +521,21 @@ class BuildSet(object):
return [self.builds.get(x) for x in keys]
class Changeish(object):
"""Something like a change; either a change or a ref"""
is_reportable = False
class QueueItem(object):
"""A changish inside of a Pipeline queue"""
def __init__(self, project):
self.project = project
def __init__(self, pipeline, change):
self.pipeline = pipeline
self.change = change # a changeish
self.build_sets = []
self.dequeued_needing_change = False
self.current_build_set = BuildSet(self)
self.build_sets.append(self.current_build_set)
self.change_ahead = None
self.change_behind = None
self.item_ahead = None
self.item_behind = None
self.enqueue_time = None
self.dequeue_time = None
def equals(self, other):
raise NotImplementedError()
def isUpdateOf(self, other):
raise NotImplementedError()
def filterJobs(self, jobs):
return filter(lambda job: job.changeMatches(self), jobs)
self.reported = False
def resetAllBuilds(self):
old = self.current_build_set
@ -535,6 +548,29 @@ class Changeish(object):
def addBuild(self, build):
self.current_build_set.addBuild(build)
def setReportedResult(self, result):
self.current_build_set.result = result
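
A hedged sketch of how QueueItem objects chain together in a queue (this mirrors what ChangeQueue.enqueueItem does in a dependent queue; the change values are placeholder strings rather than real Change objects):

    a = QueueItem(pipeline=None, change='change-A')  # placeholders for brevity
    b = QueueItem(pipeline=None, change='change-B')
    b.item_ahead = a       # what enqueueItem wires up in a dependent queue
    a.item_behind = b

    assert b.item_ahead.change == 'change-A'
    assert a.item_behind.change == 'change-B'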
class Changeish(object):
"""Something like a change; either a change or a ref"""
is_reportable = False
def __init__(self, project):
self.project = project
def equals(self, other):
raise NotImplementedError()
def isUpdateOf(self, other):
raise NotImplementedError()
def filterJobs(self, jobs):
return filter(lambda job: job.changeMatches(self), jobs)
def getRelatedChanges(self):
return set()
class Change(Changeish):
is_reportable = True
@ -548,12 +584,12 @@ class Change(Changeish):
self.refspec = None
self.files = []
self.reported = False
self.needs_change = None
self.needed_by_changes = []
self.is_current_patchset = True
self.can_merge = False
self.is_merged = False
self.failed_to_merge = False
def _id(self):
return '%s,%s' % (self.number, self.patchset)
@ -568,12 +604,21 @@ class Change(Changeish):
def isUpdateOf(self, other):
if ((hasattr(other, 'number') and self.number == other.number) and
(hasattr(other, 'patchset') and self.patchset > other.patchset)):
(hasattr(other, 'patchset') and
self.patchset is not None and
other.patchset is not None and
int(self.patchset) > int(other.patchset))):
return True
return False
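
The int() conversion added above matters because Gerrit patchset numbers arrive as strings; compared lexically, '10' sorts before '9':

    assert ('10' > '9') is False   # string comparison gives the wrong answer
    assert int('10') > int('9')    # numeric comparison, as isUpdateOf now does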
def setReportedResult(self, result):
self.current_build_set.result = result
def getRelatedChanges(self):
related = set()
if self.needs_change:
related.add(self.needs_change)
for c in self.needed_by_changes:
related.add(c)
related.update(c.getRelatedChanges())
return related
class Ref(Changeish):
@ -650,10 +695,6 @@ class TriggerEvent(object):
return ret
def getChange(self, project, trigger):
# TODO: make the scheduler deal with events (which may have
# changes) rather than changes so that we don't have to create
# "fake" changes for events that aren't associated with changes.
if self.change_number:
change = trigger.getChange(self.change_number, self.patch_number)
if self.ref:
@ -762,3 +803,27 @@ class EventFilter(object):
if not matches_approval:
return False
return True
class Layout(object):
def __init__(self):
self.projects = {}
self.pipelines = {}
self.jobs = {}
self.metajobs = {}
def getJob(self, name):
if name in self.jobs:
return self.jobs[name]
job = Job(name)
if name.startswith('^'):
# This is a meta-job
regex = re.compile(name)
self.metajobs[regex] = job
else:
# Apply attributes from matching meta-jobs
for regex, metajob in self.metajobs.items():
if regex.match(name):
job.copy(metajob)
self.jobs[name] = job
return job
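
A hedged example of the meta-job lookup above (job names invented): a name starting with '^' registers a regex meta-job, and later concrete jobs whose names match pick up its attributes via Job.copy():

    layout = Layout()
    meta = layout.getJob('^.*-coverage$')      # stored as a meta-job
    concrete = layout.getJob('zuul-coverage')  # matches the regex, copies the meta-job

    assert '^.*-coverage$' not in layout.jobs  # meta-jobs live in layout.metajobs
    assert 'zuul-coverage' in layout.jobs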

View File

@ -1,369 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Utilities with minimum-depends for use in setup.py
"""
from __future__ import print_function
import email
import os
import re
import subprocess
import sys
from setuptools.command import sdist
def parse_mailmap(mailmap='.mailmap'):
mapping = {}
if os.path.exists(mailmap):
with open(mailmap, 'r') as fp:
for l in fp:
try:
canonical_email, alias = re.match(
r'[^#]*?(<.+>).*(<.+>).*', l).groups()
except AttributeError:
continue
mapping[alias] = canonical_email
return mapping
def _parse_git_mailmap(git_dir, mailmap='.mailmap'):
mailmap = os.path.join(os.path.dirname(git_dir), mailmap)
return parse_mailmap(mailmap)
def canonicalize_emails(changelog, mapping):
"""Takes in a string and an email alias mapping and replaces all
instances of the aliases in the string with their real email.
"""
for alias, email_address in mapping.iteritems():
changelog = changelog.replace(alias, email_address)
return changelog
# Get requirements from the first file that exists
def get_reqs_from_files(requirements_files):
for requirements_file in requirements_files:
if os.path.exists(requirements_file):
with open(requirements_file, 'r') as fil:
return fil.read().split('\n')
return []
def parse_requirements(requirements_files=['requirements.txt',
'tools/pip-requires']):
requirements = []
for line in get_reqs_from_files(requirements_files):
# For the requirements list, we need to inject only the portion
# after egg= so that distutils knows the package it's looking for
# such as:
# -e git://github.com/openstack/nova/master#egg=nova
if re.match(r'\s*-e\s+', line):
requirements.append(re.sub(r'\s*-e\s+.*#egg=(.*)$', r'\1',
line))
# such as:
# http://github.com/openstack/nova/zipball/master#egg=nova
elif re.match(r'\s*https?:', line):
requirements.append(re.sub(r'\s*https?:.*#egg=(.*)$', r'\1',
line))
# -f lines are for index locations, and don't get used here
elif re.match(r'\s*-f\s+', line):
pass
# argparse is part of the standard library starting with 2.7
# adding it to the requirements list screws distro installs
elif line == 'argparse' and sys.version_info >= (2, 7):
pass
else:
requirements.append(line)
return requirements
def parse_dependency_links(requirements_files=['requirements.txt',
'tools/pip-requires']):
dependency_links = []
# dependency_links inject alternate locations to find packages listed
# in requirements
for line in get_reqs_from_files(requirements_files):
# skip comments and blank lines
if re.match(r'(\s*#)|(\s*$)', line):
continue
# lines with -e or -f need the whole line, minus the flag
if re.match(r'\s*-[ef]\s+', line):
dependency_links.append(re.sub(r'\s*-[ef]\s+', '', line))
# lines that are only urls can go in unmolested
elif re.match(r'\s*https?:', line):
dependency_links.append(line)
return dependency_links
def _run_shell_command(cmd, throw_on_error=False):
if os.name == 'nt':
output = subprocess.Popen(["cmd.exe", "/C", cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
else:
output = subprocess.Popen(["/bin/sh", "-c", cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out = output.communicate()
if output.returncode and throw_on_error:
raise Exception("%s returned %d" % cmd, output.returncode)
if len(out) == 0:
return None
if len(out[0].strip()) == 0:
return None
return out[0].strip()
def _get_git_directory():
parent_dir = os.path.dirname(__file__)
while True:
git_dir = os.path.join(parent_dir, '.git')
if os.path.exists(git_dir):
return git_dir
parent_dir, child = os.path.split(parent_dir)
if not child:  # reached the root dir
return None
def write_git_changelog():
"""Write a changelog based on the git changelog."""
new_changelog = 'ChangeLog'
git_dir = _get_git_directory()
if not os.getenv('SKIP_WRITE_GIT_CHANGELOG'):
if git_dir:
git_log_cmd = 'git --git-dir=%s log' % git_dir
changelog = _run_shell_command(git_log_cmd)
mailmap = _parse_git_mailmap(git_dir)
with open(new_changelog, "w") as changelog_file:
changelog_file.write(canonicalize_emails(changelog, mailmap))
else:
open(new_changelog, 'w').close()
def generate_authors():
"""Create AUTHORS file using git commits."""
jenkins_email = 'jenkins@review.(openstack|stackforge).org'
old_authors = 'AUTHORS.in'
new_authors = 'AUTHORS'
git_dir = _get_git_directory()
if not os.getenv('SKIP_GENERATE_AUTHORS'):
if git_dir:
# don't include jenkins email address in AUTHORS file
git_log_cmd = ("git --git-dir=" + git_dir +
" log --format='%aN <%aE>' | sort -u | "
"egrep -v '" + jenkins_email + "'")
changelog = _run_shell_command(git_log_cmd)
signed_cmd = ("git --git-dir=" + git_dir +
" log | grep -i Co-authored-by: | sort -u")
signed_entries = _run_shell_command(signed_cmd)
if signed_entries:
new_entries = "\n".join(
[signed.split(":", 1)[1].strip()
for signed in signed_entries.split("\n") if signed])
changelog = "\n".join((changelog, new_entries))
mailmap = _parse_git_mailmap(git_dir)
with open(new_authors, 'w') as new_authors_fh:
new_authors_fh.write(canonicalize_emails(changelog, mailmap))
if os.path.exists(old_authors):
with open(old_authors, "r") as old_authors_fh:
new_authors_fh.write('\n' + old_authors_fh.read())
else:
open(new_authors, 'w').close()
_rst_template = """%(heading)s
%(underline)s
.. automodule:: %(module)s
:members:
:undoc-members:
:show-inheritance:
"""
def get_cmdclass():
"""Return dict of commands to run from setup.py."""
cmdclass = dict()
def _find_modules(arg, dirname, files):
for filename in files:
if filename.endswith('.py') and filename != '__init__.py':
arg["%s.%s" % (dirname.replace('/', '.'),
filename[:-3])] = True
class LocalSDist(sdist.sdist):
"""Builds the ChangeLog and Authors files from VC first."""
def run(self):
write_git_changelog()
generate_authors()
# sdist.sdist is an old style class, can't use super()
sdist.sdist.run(self)
cmdclass['sdist'] = LocalSDist
# If Sphinx is installed on the box running setup.py,
# enable setup.py to build the documentation, otherwise,
# just ignore it
try:
from sphinx.setup_command import BuildDoc
class LocalBuildDoc(BuildDoc):
builders = ['html', 'man']
def generate_autoindex(self):
print("**Autodocumenting from %s" % os.path.abspath(os.curdir))
modules = {}
option_dict = self.distribution.get_option_dict('build_sphinx')
source_dir = os.path.join(option_dict['source_dir'][1], 'api')
if not os.path.exists(source_dir):
os.makedirs(source_dir)
for pkg in self.distribution.packages:
if '.' not in pkg:
os.path.walk(pkg, _find_modules, modules)
module_list = modules.keys()
module_list.sort()
autoindex_filename = os.path.join(source_dir, 'autoindex.rst')
with open(autoindex_filename, 'w') as autoindex:
autoindex.write(""".. toctree::
:maxdepth: 1
""")
for module in module_list:
output_filename = os.path.join(source_dir,
"%s.rst" % module)
heading = "The :mod:`%s` Module" % module
underline = "=" * len(heading)
values = dict(module=module, heading=heading,
underline=underline)
print("Generating %s" % output_filename)
with open(output_filename, 'w') as output_file:
output_file.write(_rst_template % values)
autoindex.write(" %s.rst\n" % module)
def run(self):
if not os.getenv('SPHINX_DEBUG'):
self.generate_autoindex()
for builder in self.builders:
self.builder = builder
self.finalize_options()
self.project = self.distribution.get_name()
self.version = self.distribution.get_version()
self.release = self.distribution.get_version()
BuildDoc.run(self)
class LocalBuildLatex(LocalBuildDoc):
builders = ['latex']
cmdclass['build_sphinx'] = LocalBuildDoc
cmdclass['build_sphinx_latex'] = LocalBuildLatex
except ImportError:
pass
return cmdclass
def _get_revno(git_dir):
"""Return the number of commits since the most recent tag.
We use git-describe to find this out, but if there are no
tags then we fall back to counting commits since the beginning
of time.
"""
describe = _run_shell_command(
"git --git-dir=%s describe --always" % git_dir)
if "-" in describe:
return describe.rsplit("-", 2)[-2]
# no tags found
revlist = _run_shell_command(
"git --git-dir=%s rev-list --abbrev-commit HEAD" % git_dir)
return len(revlist.splitlines())
def _get_version_from_git(pre_version):
"""Return a version which is equal to the tag that's on the current
revision if there is one, or tag plus number of additional revisions
if the current revision has no tag."""
git_dir = _get_git_directory()
if git_dir:
if pre_version:
try:
return _run_shell_command(
"git --git-dir=" + git_dir + " describe --exact-match",
throw_on_error=True).replace('-', '.')
except Exception:
sha = _run_shell_command(
"git --git-dir=" + git_dir + " log -n1 --pretty=format:%h")
return "%s.a%s.g%s" % (pre_version, _get_revno(git_dir), sha)
else:
return _run_shell_command(
"git --git-dir=" + git_dir + " describe --always").replace(
'-', '.')
return None
def _get_version_from_pkg_info(package_name):
"""Get the version from PKG-INFO file if we can."""
try:
pkg_info_file = open('PKG-INFO', 'r')
except (IOError, OSError):
return None
try:
pkg_info = email.message_from_file(pkg_info_file)
except email.MessageError:
return None
# Check to make sure we're in our own dir
if pkg_info.get('Name', None) != package_name:
return None
return pkg_info.get('Version', None)
def get_version(package_name, pre_version=None):
"""Get the version of the project. First, try getting it from PKG-INFO, if
it exists. If it does, that means we're in a distribution tarball or that
install has happened. Otherwise, if there is no PKG-INFO file, pull the
version from git.
We do not support setup.py version sanity in git archive tarballs, nor do
we support packagers directly sucking our git repo into theirs. We expect
that a source tarball be made from our git repo - or that if someone wants
to make a source tarball from a fork of our repo with additional tags in it
that they understand and desire the results of doing that.
"""
version = os.environ.get("OSLO_PACKAGE_VERSION", None)
if version:
return version
version = _get_version_from_pkg_info(package_name)
if version:
return version
version = _get_version_from_git(pre_version)
if version:
return version
raise Exception("Versioning for this project requires either an sdist"
" tarball, or access to an upstream git repository.")

View File

@ -1,94 +0,0 @@
# Copyright 2012 OpenStack Foundation
# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Utilities for consuming the version from pkg_resources.
"""
import pkg_resources
class VersionInfo(object):
def __init__(self, package):
"""Object that understands versioning for a package
:param package: name of the python package, such as glance, or
python-glanceclient
"""
self.package = package
self.release = None
self.version = None
self._cached_version = None
def __str__(self):
"""Make the VersionInfo object behave like a string."""
return self.version_string()
def __repr__(self):
"""Include the name."""
return "VersionInfo(%s:%s)" % (self.package, self.version_string())
def _get_version_from_pkg_resources(self):
"""Get the version of the package from the pkg_resources record
associated with the package."""
try:
requirement = pkg_resources.Requirement.parse(self.package)
provider = pkg_resources.get_provider(requirement)
return provider.version
except pkg_resources.DistributionNotFound:
# The most likely cause for this is running tests in a tree
# produced from a tarball where the package itself has not been
# installed into anything. Revert to setup-time logic.
from zuul.openstack.common import setup
return setup.get_version(self.package)
def release_string(self):
"""Return the full version of the package including suffixes indicating
VCS status.
"""
if self.release is None:
self.release = self._get_version_from_pkg_resources()
return self.release
def version_string(self):
"""Return the short version minus any alpha/beta tags."""
if self.version is None:
parts = []
for part in self.release_string().split('.'):
if part[0].isdigit():
parts.append(part)
else:
break
self.version = ".".join(parts)
return self.version
# Compatibility functions
canonical_version_string = version_string
version_string_with_vcs = release_string
def cached_version_string(self, prefix=""):
"""Generate an object which will expand in a string context to
the results of version_string(). We do this so that we don't
call into pkg_resources every time we start up a program when
passing version information into the CONF constructor, but
rather only do the calculation when and if a version is requested.
"""
if not self._cached_version:
self._cached_version = "%s%s" % (prefix,
self.version_string())
return self._cached_version

File diff suppressed because it is too large

View File

@ -27,6 +27,7 @@ class GerritEventConnector(threading.Thread):
def __init__(self, gerrit, sched):
super(GerritEventConnector, self).__init__()
self.daemon = True
self.gerrit = gerrit
self.sched = sched
self._stopped = False
@ -80,6 +81,14 @@ class GerritEventConnector(threading.Thread):
Can not get account information." % event.type)
event.account = None
if event.change_number:
# Call getChange for the side effect of updating the
# cache. Note that this modifies Change objects outside
# the main thread.
self.sched.trigger.getChange(event.change_number,
event.patch_number,
refresh=True)
self.sched.addEvent(event)
self.gerrit.eventDone()
@ -99,6 +108,7 @@ class Gerrit(object):
replication_retry_interval = 5
def __init__(self, config, sched):
self._change_cache = {}
self.sched = sched
self.config = config
self.server = config.get('gerrit', 'server')
@ -278,29 +288,54 @@ class Gerrit(object):
return False
return True
def getChange(self, number, patchset, changes=None):
self.log.info("Getting information for %s,%s" % (number, patchset))
if changes is None:
changes = {}
data = self.gerrit.query(number)
project = self.sched.projects[data['project']]
change = Change(project)
def maintainCache(self, relevant):
# This lets the user supply a list of change objects that are
# still in use. Anything in our cache that isn't in the supplied
# list should be safe to remove from the cache.
remove = []
for key, change in self._change_cache.items():
if change not in relevant:
remove.append(key)
for key in remove:
del self._change_cache[key]
def getChange(self, number, patchset, refresh=False):
key = '%s,%s' % (number, patchset)
change = None
if key in self._change_cache:
change = self._change_cache.get(key)
if not refresh:
return change
if not change:
change = Change(None)
change.number = number
change.patchset = patchset
key = '%s,%s' % (change.number, change.patchset)
self._change_cache[key] = change
self.updateChange(change)
return change
def updateChange(self, change):
self.log.info("Updating information for %s,%s" %
(change.number, change.patchset))
data = self.gerrit.query(change.number)
change._data = data
change.number = number
change.patchset = patchset
change.project = project
if change.patchset is None:
change.patchset = data['currentPatchSet']['number']
change.project = self.sched.getProject(data['project'])
change.branch = data['branch']
change.url = data['url']
max_ps = 0
for ps in data['patchSets']:
if ps['number'] == patchset:
if ps['number'] == change.patchset:
change.refspec = ps['ref']
for f in ps.get('files', []):
change.files.append(f['file'])
if int(ps['number']) > int(max_ps):
max_ps = ps['number']
if max_ps == patchset:
if max_ps == change.patchset:
change.is_current_patchset = True
else:
change.is_current_patchset = False
@ -311,20 +346,10 @@ class Gerrit(object):
# for dependencies.
return change
key = '%s,%s' % (number, patchset)
changes[key] = change
def cachedGetChange(num, ps):
key = '%s,%s' % (num, ps)
if key in changes:
return changes.get(key)
c = self.getChange(num, ps, changes)
return c
if 'dependsOn' in data:
parts = data['dependsOn'][0]['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
dep = cachedGetChange(dep_num, dep_ps)
dep = self.getChange(dep_num, dep_ps)
if not dep.is_merged:
change.needs_change = dep
@ -332,7 +357,7 @@ class Gerrit(object):
for needed in data['neededBy']:
parts = needed['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
dep = cachedGetChange(dep_num, dep_ps)
dep = self.getChange(dep_num, dep_ps)
if not dep.is_merged and dep.is_current_patchset:
change.needed_by_changes.append(dep)

View File

@ -15,6 +15,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from zuul.openstack.common import version as common_version
import pbr.version
version_info = common_version.VersionInfo('zuul')
version_info = pbr.version.VersionInfo('zuul')

51
zuul/webapp.py Normal file
View File

@ -0,0 +1,51 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
from paste import httpserver
from webob import Request
class WebApp(threading.Thread):
log = logging.getLogger("zuul.WebApp")
def __init__(self, scheduler, port=8001):
threading.Thread.__init__(self)
self.scheduler = scheduler
self.port = port
def run(self):
self.server = httpserver.serve(self.app, host='0.0.0.0',
port=self.port, start_loop=False)
self.server.serve_forever()
def stop(self):
self.server.server_close()
def app(self, environ, start_response):
request = Request(environ)
if request.path == '/status.json':
try:
ret = self.scheduler.formatStatusJSON()
except:
self.log.exception("Exception formatting status:")
raise
start_response('200 OK', [('content-type', 'application/json'),
('Access-Control-Allow-Origin', '*')])
return [ret]
else:
start_response('404 Not Found', [('content-type', 'text/plain')])
return ['Not found.']
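
Once the web app is running, the endpoint above can be exercised like this (default port 8001; host, port, and the exact JSON layout are deployment-specific):

    import json
    import urllib2  # Python 2, matching the era of this code

    status = json.load(urllib2.urlopen('http://localhost:8001/status.json'))
    print(sorted(status.keys()))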