Split the merger into a separate process

Connect it to Zuul via Gearman.  Any number of mergers may be
deployed.

Directly find the pipeline for a build when processing a result,
so that the procedure is roughly the same for build and merge
results.

The timer trigger currently requires that the gerrit trigger also be
configured.  Make that explicit inside the timer trigger so that the
scheduler's API interaction with triggers is cleaner.

Change-Id: I69498813764753c97c426e42d17596c2ef1d87cf
James E. Blair 2014-01-28 12:42:20 -08:00
parent a84f0e4179
commit 4076e2b432
19 changed files with 651 additions and 170 deletions


@ -3,6 +3,15 @@ Since 2.0.0:
* The push_change_refs option which specified that Zuul refs should be
pushed to Gerrit has been removed.
* Git merge operations are now performed in a separate process. Run
at least one instance of the ``zuul-merger`` program which is now
included. Any number of Zuul-Mergers may be run in order to
distribute the work of speculatively merging changes into git and
serving the results to test workers. This system is designed to
scale out to many servers, but one instance may be co-located with
the Zuul server in smaller deployments. Several configuration
options have moved from the ``zuul`` section to ``merger``.
Since 1.3.0:
* The Jenkins launcher is replaced with Gearman launcher. An internal


@ -20,6 +20,7 @@ Contents:
gating
triggers
merger
launchers
reporters
zuul

doc/source/merger.rst (new file, 63 lines)

@ -0,0 +1,63 @@
:title: Merger
Merger
======
The Zuul Merger is a separate component which communicates with the
main Zuul server via Gearman. Its purpose is to speculatively merge
the changes for Zuul in preparation for testing. The resulting git
commits also must be served to the test workers, and the server(s)
running the Zuul Merger are expected to do this as well. Because both
of these tasks are resource intensive, any number of Zuul Mergers can
be run in parallel on distinct hosts.
Configuration
~~~~~~~~~~~~~
The Zuul Merger can read the same zuul.conf file as the main Zuul
server and requires the ``gearman``, ``gerrit``, ``merger``, and
``zuul`` sections (only the fields indicated for each are used). Be
sure that ``zuul_url`` is set appropriately on each host that runs a
zuul-merger.
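For example, a minimal check of a merger host's configuration might
look like the following sketch, which reads only the options that
zuul-merger itself consults (the paths and hostnames are examples)::

  import ConfigParser

  config = ConfigParser.ConfigParser()
  config.read('/etc/zuul/zuul.conf')

  # Required on every merger host:
  gearman_server = config.get('gearman', 'server')
  zuul_url = config.get('merger', 'zuul_url')

  # Optional, shown with the defaults the merger falls back to:
  git_dir = (config.get('merger', 'git_dir')
             if config.has_option('merger', 'git_dir')
             else '/var/lib/zuul/git')
  sshkey = (config.get('gerrit', 'sshkey')
            if config.has_option('gerrit', 'sshkey')
            else None)

  print "this merger serves %s from %s" % (zuul_url, git_dir)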
Zuul References
~~~~~~~~~~~~~~~
As the DependentPipelineManager may combine several changes together
for testing when performing speculative execution, determining exactly
how the workspace should be set up when running a Job can be complex.
To alleviate this problem, Zuul performs merges itself, merging or
cherry-picking changes as required and identifies the result with a
Git reference of the form ``refs/zuul/<branch>/Z<random sha1>``.
Preparing the workspace is then a simple matter of fetching that ref
and checking it out. The parameters that provide this information are
described in :ref:`launchers`.
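On the worker side, assuming the launcher has exported ``ZUUL_URL``,
``ZUUL_PROJECT`` and ``ZUUL_REF``, the preparation amounts to roughly
the following sketch::

  import os
  import subprocess

  url = os.environ['ZUUL_URL'] + '/' + os.environ['ZUUL_PROJECT']
  subprocess.check_call(['git', 'fetch', url, os.environ['ZUUL_REF']])
  subprocess.check_call(['git', 'checkout', 'FETCH_HEAD'])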
These references need to be made available via a Git repository that
is available to Jenkins. This is accomplished by serving Zuul's Git
repositories directly.
Serving Zuul Git Repos
~~~~~~~~~~~~~~~~~~~~~~
Zuul maintains its own copies of any needed Git repositories in the
directory specified by ``git_dir`` in the ``merger`` section of
zuul.conf (by default, /var/lib/zuul/git). To directly serve Zuul's
Git repositories in order to provide Zuul refs for Jenkins, you can
configure Apache to do so using the following directives::
SetEnv GIT_PROJECT_ROOT /var/lib/zuul/git
SetEnv GIT_HTTP_EXPORT_ALL
AliasMatch ^/p/(.*/objects/[0-9a-f]{2}/[0-9a-f]{38})$ /var/lib/zuul/git/$1
AliasMatch ^/p/(.*/objects/pack/pack-[0-9a-f]{40}.(pack|idx))$ /var/lib/zuul/git/$1
ScriptAlias /p/ /usr/lib/git-core/git-http-backend/
And set ``push_change_refs`` to ``false`` (the default) in the
``zuul`` section of zuul.conf.
Note that Zuul's Git repositories are not bare, which means they have
a working tree, and are not suitable for public consumption (for
instance, a clone will produce a repository in an unpredictable state,
depending on the state of Zuul's repository when the clone happens).
They are, however, suitable for automated systems that respond to Zuul
triggers.
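As a quick check that a merger host is serving its repositories as
configured above, the Zuul refs can be listed over the same URL (the
hostname and project below are examples)::

  import subprocess

  out = subprocess.check_output(
      ['git', 'ls-remote', 'http://zuul.example.com/p/org/project',
       'refs/zuul/*'])
  print out or 'no Zuul refs have been prepared for this project yet'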


@ -35,49 +35,6 @@ want Zuul to gate. For instance, you may want to grant ``Verified
be added to Gerrit. Zuul is very flexible and can take advantage of
those.
Zuul References
~~~~~~~~~~~~~~~
As the DependentPipelineManager may combine several changes together
for testing when performing speculative execution, determining exactly
how the workspace should be set up when running a Job can be complex.
To alleviate this problem, Zuul performs merges itself, merging or
cherry-picking changes as required and identifies the result with a
Git reference of the form ``refs/zuul/<branch>/Z<random sha1>``.
Preparing the workspace is then a simple matter of fetching that ref
and checking it out. The parameters that provide this information are
described in :ref:`launchers`.
These references need to be made available via a Git repository that
is available to Jenkins. This is accomplished by serving Zuul's Git
repositories directly.
Serving Zuul Git Repos
""""""""""""""""""""""
Zuul maintains its own copies of any needed Git repositories in the
directory specified by ``git_dir`` in the ``zuul`` section of
zuul.conf (by default, /var/lib/zuul/git). To directly serve Zuul's
Git repositories in order to provide Zuul refs for Jenkins, you can
configure Apache to do so using the following directives::
SetEnv GIT_PROJECT_ROOT /var/lib/zuul/git
SetEnv GIT_HTTP_EXPORT_ALL
AliasMatch ^/p/(.*/objects/[0-9a-f]{2}/[0-9a-f]{38})$ /var/lib/zuul/git/$1
AliasMatch ^/p/(.*/objects/pack/pack-[0-9a-f]{40}.(pack|idx))$ /var/lib/zuul/git/$1
ScriptAlias /p/ /usr/lib/git-core/git-http-backend/
And set ``push_change_refs`` to ``false`` (the default) in the
``zuul`` section of zuul.conf.
Note that Zuul's Git repositories are not bare, which means they have
a working tree, and are not suitable for public consumption (for
instance, a clone will produce a repository in an unpredictable state
depending on what the state of Zuul's repository is when the clone
happens). They are, however, suitable for automated systems that
respond to Zuul triggers.
Timer
-----


@ -83,21 +83,49 @@ zuul
""""
**layout_config**
Path to layout config file.
Path to layout config file. Used by zuul-server only.
``layout_config=/etc/zuul/layout.yaml``
**log_config**
Path to log config file.
Path to log config file. Used by all Zuul commands.
``log_config=/etc/zuul/logging.yaml``
**pidfile**
Path to PID lock file.
Path to PID lock file. Used by all Zuul commands.
``pidfile=/var/run/zuul/zuul.pid``
**state_dir**
Path to directory that Zuul should save state to.
Path to directory that Zuul should save state to. Used by all Zuul
commands.
``state_dir=/var/lib/zuul``
**report_times**
Boolean value (``true`` or ``false``) that determines if Zuul should
include elapsed times for each job in the textual report. Used by
zuul-server only.
``report_times=true``
**status_url**
URL that will be posted in Zuul comments made to Gerrit changes when
starting jobs for a change. Used by zuul-server only.
``status_url=https://zuul.example.com/status``
**url_pattern**
If you are storing build logs external to the system that originally
ran jobs, and wish to link to those logs when Zuul makes comments on
Gerrit changes for completed jobs, this setting configures what the
URLs for those links should be. Used by zuul-server only.
``http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}``
**job_name_in_report**
Boolean value (``true`` or ``false``) that indicates whether the
job name should be included in the report (normally only the URL
is included). Defaults to ``false``. Used by zuul-server only.
``job_name_in_report=true``
merger
""""""
**git_dir**
Directory that Zuul should clone local git repositories to.
``git_dir=/var/lib/zuul/git``
@ -110,32 +138,10 @@ zuul
Optional: Value to pass to `git config user.name`.
``git_user_name=zuul``
**report_times**
Boolean value (``true`` or ``false``) that determines if Zuul should
include elapsed times for each job in the textual report.
``report_times=true``
**status_url**
URL that will be posted in Zuul comments made to Gerrit changes when
starting jobs for a change.
``status_url=https://zuul.example.com/status``
**url_pattern**
If you are storing build logs external to the system that originally
ran jobs and wish to link to those logs when Zuul makes comments on
Gerrit changes for completed jobs this setting configures what the
URLs for those links should be.
``http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}``
**job_name_in_report**
Boolean value (``true`` or ``false``) that indicates whether the
job name should be included in the report (normally only the URL
is included). Defaults to ``false``.
``job_name_in_report=true``
**zuul_url**
URL of Zuul's git repos, accessible to test workers.
Usually "http://zuul.example.com/p".
URL of this merger's git repos, accessible to test workers. Usually
"http://zuul.example.com/p" or "http://zuul-merger01.example.com/p"
depending on whether the merger is co-located with the Zuul server.
smtp
""""


@ -15,10 +15,12 @@ layout_config=/etc/zuul/layout.yaml
log_config=/etc/zuul/logging.conf
pidfile=/var/run/zuul/zuul.pid
state_dir=/var/lib/zuul
status_url=https://jenkins.example.com/zuul/status
[merger]
git_dir=/var/lib/zuul/git
;git_user_email=zuul@example.com
;git_user_name=zuul
status_url=https://jenkins.example.com/zuul/status
zuul_url=http://zuul.example.com/p
[smtp]


@ -22,6 +22,7 @@ warnerrors = True
[entry_points]
console_scripts =
zuul-server = zuul.cmd.server:main
zuul-merger = zuul.cmd.merger:main
zuul = zuul.cmd.client:main
[build_sphinx]


@ -8,11 +8,13 @@ sshkey=none
[zuul]
layout_config=layout.yaml
url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}
job_name_in_report=true
[merger]
git_dir=/tmp/zuul-test/git
git_user_email=zuul@example.com
git_user_name=zuul
url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}
job_name_in_report=true
zuul_url=http://zuul.example.com/p
[smtp]


@ -47,6 +47,8 @@ import zuul.webapp
import zuul.rpclistener
import zuul.rpcclient
import zuul.launcher.gearman
import zuul.merger.server
import zuul.merger.client
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.trigger.gerrit
@ -764,7 +766,7 @@ class TestScheduler(testtools.TestCase):
self.upstream_root = os.path.join(self.test_root, "upstream")
self.git_root = os.path.join(self.test_root, "git")
CONFIG.set('zuul', 'git_dir', self.git_root)
CONFIG.set('merger', 'git_dir', self.git_root)
if os.path.exists(self.test_root):
shutil.rmtree(self.test_root)
os.makedirs(self.test_root)
@ -804,6 +806,9 @@ class TestScheduler(testtools.TestCase):
self.worker.addServer('127.0.0.1', self.gearman_server.port)
self.gearman_server.worker = self.worker
self.merge_server = zuul.merger.server.MergeServer(self.config)
self.merge_server.start()
self.sched = zuul.scheduler.Scheduler()
def URLOpenerFactory(*args, **kw):
@ -812,6 +817,8 @@ class TestScheduler(testtools.TestCase):
urllib2.urlopen = URLOpenerFactory
self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched)
self.merge_client = zuul.merger.client.MergeClient(
self.config, self.sched)
self.smtp_messages = []
@ -833,6 +840,7 @@ class TestScheduler(testtools.TestCase):
self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
self.sched.setLauncher(self.launcher)
self.sched.setMerger(self.merge_client)
self.sched.registerTrigger(self.gerrit)
self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
self.sched.registerTrigger(self.timer)
@ -873,6 +881,9 @@ class TestScheduler(testtools.TestCase):
def shutdown(self):
self.log.debug("Shutting down after tests")
self.launcher.stop()
self.merge_server.stop()
self.merge_server.join()
self.merge_client.stop()
self.worker.shutdown()
self.gearman_server.shutdown()
self.gerrit.stop()
@ -991,13 +1002,15 @@ class TestScheduler(testtools.TestCase):
done = True
for connection in self.gearman_server.active_connections:
if (connection.functions and
connection.client_id != 'Zuul RPC Listener'):
connection.client_id not in ['Zuul RPC Listener',
'Zuul Merger']):
done = False
if done:
break
time.sleep(0)
self.gearman_server.functions = set()
self.rpc.register()
self.merge_server.register()
def haveAllBuildsReported(self):
# See if Zuul is waiting on a meta job to complete
@ -1087,6 +1100,7 @@ class TestScheduler(testtools.TestCase):
if (self.sched.trigger_event_queue.empty() and
self.sched.result_event_queue.empty() and
self.fake_gerrit.event_queue.empty() and
not self.merge_client.build_sets and
self.areAllBuildsWaiting()):
self.sched.run_handler_lock.release()
self.worker.lock.release()
@ -2357,7 +2371,7 @@ class TestScheduler(testtools.TestCase):
# This test assumes the repo is already cloned; make sure it is
url = self.sched.triggers['gerrit'].getGitUrl(
self.sched.layout.projects['org/project1'])
self.sched.merger.addProject('org/project1', url)
self.merge_server.merger.addProject('org/project1', url)
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
A.addPatchset(large=True)
path = os.path.join(self.upstream_root, "org/project1")
@ -2479,7 +2493,7 @@ class TestScheduler(testtools.TestCase):
def test_zuul_url_return(self):
"Test if ZUUL_URL is returning when zuul_url is set in zuul.conf"
self.assertTrue(self.sched.config.has_option('zuul', 'zuul_url'))
self.assertTrue(self.sched.config.has_option('merger', 'zuul_url'))
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')

zuul/cmd/merger.py (new file, 153 lines)

@ -0,0 +1,153 @@
#!/usr/bin/env python
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013-2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import ConfigParser
import daemon
import extras
# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore
# instead it depends on lockfile-0.9.1 which uses pidfile.
pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
import logging
import logging.config
import os
import sys
import signal
import traceback
# No zuul imports here because they pull in paramiko which must not be
# imported until after the daemonization.
# https://github.com/paramiko/paramiko/issues/59
# Similar situation with gear and statsd.
def stack_dump_handler(signum, frame):
signal.signal(signal.SIGUSR2, signal.SIG_IGN)
log_str = ""
for thread_id, stack_frame in sys._current_frames().items():
log_str += "Thread: %s\n" % thread_id
log_str += "".join(traceback.format_stack(stack_frame))
log = logging.getLogger("zuul.stack_dump")
log.debug(log_str)
signal.signal(signal.SIGUSR2, stack_dump_handler)
class Merger(object):
def __init__(self):
self.args = None
self.config = None
def parse_arguments(self):
parser = argparse.ArgumentParser(description='Zuul merge worker.')
parser.add_argument('-c', dest='config',
help='specify the config file')
parser.add_argument('-d', dest='nodaemon', action='store_true',
help='do not run as a daemon')
parser.add_argument('--version', dest='version', action='store_true',
help='show zuul version')
self.args = parser.parse_args()
def read_config(self):
self.config = ConfigParser.ConfigParser()
if self.args.config:
locations = [self.args.config]
else:
locations = ['/etc/zuul/zuul.conf',
'~/zuul.conf']
for fp in locations:
if os.path.exists(os.path.expanduser(fp)):
self.config.read(os.path.expanduser(fp))
return
raise Exception("Unable to locate config file in %s" % locations)
def setup_logging(self, section, parameter):
if self.config.has_option(section, parameter):
fp = os.path.expanduser(self.config.get(section, parameter))
if not os.path.exists(fp):
raise Exception("Unable to read logging config file at %s" %
fp)
logging.config.fileConfig(fp)
else:
logging.basicConfig(level=logging.DEBUG)
def exit_handler(self, signum, frame):
signal.signal(signal.SIGUSR1, signal.SIG_IGN)
self.merger.stop()
self.merger.join()
def main(self):
# See comment at top of file about zuul imports
import zuul.merger.server
self.setup_logging('zuul', 'log_config')
self.merger = zuul.merger.server.MergeServer(self.config)
self.merger.start()
signal.signal(signal.SIGUSR1, self.exit_handler)
signal.signal(signal.SIGUSR2, stack_dump_handler)
while True:
try:
signal.pause()
except KeyboardInterrupt:
print "Ctrl + C: asking merger to exit nicely...\n"
self.exit_handler(signal.SIGINT, None)
def main():
server = Merger()
server.parse_arguments()
if server.args.version:
from zuul.version import version_info as zuul_version_info
print "Zuul version: %s" % zuul_version_info.version_string()
sys.exit(0)
server.read_config()
if server.config.has_option('zuul', 'state_dir'):
state_dir = os.path.expanduser(server.config.get('zuul', 'state_dir'))
else:
state_dir = '/var/lib/zuul'
test_fn = os.path.join(state_dir, 'test')
try:
f = open(test_fn, 'w')
f.close()
os.unlink(test_fn)
except Exception:
print
print "Unable to write to state directory: %s" % state_dir
print
raise
if server.config.has_option('zuul', 'pidfile'):
pid_fn = os.path.expanduser(server.config.get('zuul', 'pidfile'))
else:
pid_fn = '/var/run/zuul/merger.pid'
pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
if server.args.nodaemon:
server.main()
else:
with daemon.DaemonContext(pidfile=pid):
server.main()
if __name__ == "__main__":
sys.path.insert(0, '.')
main()
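A sketch, not part of this change, of how an init script might ask a
running zuul-merger to exit cleanly; it relies on the SIGUSR1 handler
installed above and on the default pidfile path chosen in main():

import os
import signal

def stop_merger(pidfile='/var/run/zuul/merger.pid'):
    with open(pidfile) as f:
        pid = int(f.read().strip())
    os.kill(pid, signal.SIGUSR1)   # the merger stops its worker and joins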


@ -172,6 +172,7 @@ class Server(object):
# See comment at top of file about zuul imports
import zuul.scheduler
import zuul.launcher.gearman
import zuul.merger.client
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.trigger.gerrit
@ -188,6 +189,7 @@ class Server(object):
self.sched = zuul.scheduler.Scheduler()
gearman = zuul.launcher.gearman.Gearman(self.config, self.sched)
merger = zuul.merger.client.MergeClient(self.config, self.sched)
gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
timer = zuul.trigger.timer.Timer(self.config, self.sched)
webapp = zuul.webapp.WebApp(self.sched)
@ -205,6 +207,7 @@ class Server(object):
)
self.sched.setLauncher(gearman)
self.sched.setMerger(merger)
self.sched.registerTrigger(gerrit)
self.sched.registerTrigger(timer)
self.sched.registerReporter(gerrit_reporter)
@ -243,21 +246,6 @@ def main():
path = None
sys.exit(server.test_config(path))
if server.config.has_option('zuul', 'state_dir'):
state_dir = os.path.expanduser(server.config.get('zuul', 'state_dir'))
else:
state_dir = '/var/lib/zuul'
test_fn = os.path.join(state_dir, 'test')
try:
f = open(test_fn, 'w')
f.close()
os.unlink(test_fn)
except:
print
print "Unable to write to state directory: %s" % state_dir
print
raise
if server.config.has_option('zuul', 'pidfile'):
pid_fn = os.path.expanduser(server.config.get('zuul', 'pidfile'))
else:


@ -155,7 +155,6 @@ class Gearman(object):
self.sched = sched
self.builds = {}
self.meta_jobs = {} # A list of meta-jobs like stop or describe
self.zuul_server = config.get('zuul', 'zuul_url')
server = config.get('gearman', 'server')
if config.has_option('gearman', 'port'):
@ -226,7 +225,7 @@ class Gearman(object):
params = dict(ZUUL_UUID=uuid,
ZUUL_PROJECT=item.change.project.name)
params['ZUUL_PIPELINE'] = pipeline.name
params['ZUUL_URL'] = self.zuul_server
params['ZUUL_URL'] = item.current_build_set.zuul_url
if hasattr(item.change, 'refspec'):
changes_str = '^'.join(
['%s:%s:%s' % (i.change.project.name, i.change.branch,

zuul/merger/__init__.py (new empty file)

zuul/merger/client.py (new file, 117 lines)

@ -0,0 +1,117 @@
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
from uuid import uuid4
import gear
def getJobData(job):
if not len(job.data):
return {}
d = job.data[-1]
if not d:
return {}
return json.loads(d)
class MergeGearmanClient(gear.Client):
def __init__(self, merge_client):
super(MergeGearmanClient, self).__init__()
self.__merge_client = merge_client
def handleWorkComplete(self, packet):
job = super(MergeGearmanClient, self).handleWorkComplete(packet)
self.__merge_client.onBuildCompleted(job)
return job
def handleWorkFail(self, packet):
job = super(MergeGearmanClient, self).handleWorkFail(packet)
self.__merge_client.onBuildCompleted(job)
return job
def handleWorkException(self, packet):
job = super(MergeGearmanClient, self).handleWorkException(packet)
self.__merge_client.onBuildCompleted(job)
return job
def handleDisconnect(self, job):
job = super(MergeGearmanClient, self).handleDisconnect(job)
self.__merge_client.onBuildCompleted(job)
class MergeClient(object):
log = logging.getLogger("zuul.MergeClient")
def __init__(self, config, sched):
self.config = config
self.sched = sched
server = self.config.get('gearman', 'server')
if self.config.has_option('gearman', 'port'):
port = self.config.get('gearman', 'port')
else:
port = 4730
self.log.debug("Connecting to gearman at %s:%s" % (server, port))
self.gearman = MergeGearmanClient(self)
self.gearman.addServer(server, port)
self.log.debug("Waiting for gearman")
self.gearman.waitForServer()
self.build_sets = {}
def stop(self):
self.gearman.shutdown()
def areMergesOutstanding(self):
if self.build_sets:
return True
return False
def submitJob(self, name, data, build_set):
uuid = str(uuid4().hex)
self.log.debug("Submitting job %s with data %s" % (name, data))
job = gear.Job(name,
json.dumps(data),
unique=uuid)
self.build_sets[uuid] = build_set
self.gearman.submitJob(job)
def mergeChanges(self, items, build_set):
data = dict(items=items)
self.submitJob('merger:merge', data, build_set)
def updateRepo(self, project, url, build_set):
data = dict(project=project,
url=url)
self.submitJob('merger:update', data, build_set)
def onBuildCompleted(self, job):
build_set = self.build_sets.get(job.unique)
if build_set:
data = getJobData(job)
zuul_url = data.get('zuul_url')
merged = data.get('merged', False)
updated = data.get('updated', False)
commit = data.get('commit')
self.log.info("Merge %s complete, merged: %s, updated: %s, "
"commit: %s" %
(job, merged, updated, commit))
self.sched.onMergeCompleted(build_set, zuul_url,
merged, updated, commit)
# The test suite expects the build_set to be removed from
# the internal dict after the wake flag is set.
del self.build_sets[job.unique]
else:
self.log.error("Unable to find build set for uuid %s" % job.unique)


@ -1,4 +1,5 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013-2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -201,9 +202,9 @@ class Merger(object):
repo = self.getRepo(item['project'], item['url'])
try:
repo.checkout(ref)
except:
except Exception:
self.log.exception("Unable to checkout %s" % ref)
return False
return None
try:
mode = item['merge_mode']
@ -219,7 +220,7 @@ class Merger(object):
# Log exceptions at debug level because they are
# usually benign merge conflicts
self.log.debug("Unable to merge %s" % item, exc_info=True)
return False
return None
return commit
@ -256,6 +257,8 @@ class Merger(object):
self.log.debug("Found base commit %s for %s" % (base, key,))
# Merge the change
commit = self._mergeChange(item, base)
if not commit:
return None
# Store this commit as the most recent for this project-branch
recent[key] = commit
# Set the Zuul ref for this item to point to the most recent

zuul/merger/server.py (new file, 115 lines)

@ -0,0 +1,115 @@
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import threading
import traceback
import gear
import merger
class MergeServer(object):
log = logging.getLogger("zuul.MergeServer")
def __init__(self, config):
self.config = config
self.zuul_url = config.get('merger', 'zuul_url')
if self.config.has_option('merger', 'git_dir'):
merge_root = self.config.get('merger', 'git_dir')
else:
merge_root = '/var/lib/zuul/git'
if self.config.has_option('merger', 'git_user_email'):
merge_email = self.config.get('merger', 'git_user_email')
else:
merge_email = None
if self.config.has_option('merger', 'git_user_name'):
merge_name = self.config.get('merger', 'git_user_name')
else:
merge_name = None
if self.config.has_option('gerrit', 'sshkey'):
sshkey = self.config.get('gerrit', 'sshkey')
else:
sshkey = None
self.merger = merger.Merger(merge_root, sshkey,
merge_email, merge_name)
def start(self):
self._running = True
server = self.config.get('gearman', 'server')
if self.config.has_option('gearman', 'port'):
port = self.config.get('gearman', 'port')
else:
port = 4730
self.worker = gear.Worker('Zuul Merger')
self.worker.addServer(server, port)
self.thread = threading.Thread(target=self.run)
self.thread.daemon = True
self.thread.start()
self.worker.waitForServer()
self.register()
def register(self):
self.worker.registerFunction("merger:merge")
self.worker.registerFunction("merger:update")
def stop(self):
self.log.debug("Stopping")
self._running = False
self.worker.shutdown()
self.log.debug("Stopped")
def join(self):
self.thread.join()
def run(self):
self.log.debug("Starting merge listener")
while self._running:
try:
job = self.worker.getJob()
try:
if job.name == 'merger:merge':
self.merge(job)
elif job.name == 'merger:update':
self.update(job)
else:
self.log.error("Unable to handle job %s" % job.name)
job.sendWorkFail()
except Exception:
self.log.exception("Exception while running job")
job.sendWorkException(traceback.format_exc())
except Exception:
self.log.exception("Exception while getting job")
def merge(self, job):
args = json.loads(job.arguments)
commit = self.merger.mergeChanges(args['items'])
result = dict(merged=(commit is not None),
commit=commit,
zuul_url=self.zuul_url)
job.sendWorkComplete(json.dumps(result))
def update(self, job):
args = json.loads(job.arguments)
self.merger.updateRepo(args['project'], args['url'])
result = dict(updated=True,
zuul_url=self.zuul_url)
job.sendWorkComplete(json.dumps(result))
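What this looks like on the wire, as a sketch that is not part of this
change: the JSON payload ``merger:update`` expects and the result it
returns, submitted through a plain gear client. It assumes a gearman
server on 127.0.0.1:4730 with a zuul-merger registered; the project
and URL are examples:

import json
import time

import gear

client = gear.Client()
client.addServer('127.0.0.1', 4730)
client.waitForServer()

job = gear.Job('merger:update',
               json.dumps(dict(project='org/project',
                               url='ssh://review.example.com:29418/org/project')))
client.submitJob(job)
while not job.complete:          # the merger answers with WORK_COMPLETE
    time.sleep(0.1)
result = json.loads(job.data[-1])
print result['updated'], result['zuul_url']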


@ -633,6 +633,11 @@ class Build(object):
class BuildSet(object):
# Merge states:
NEW = 1
PENDING = 2
COMPLETE = 3
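# NEW: no merge has been requested for this build set yet.
# PENDING: a merge/update job has been submitted to a zuul-merger
# (see BasePipelineManager.prepareRef).
# COMPLETE: the result has come back via onMergeCompleted.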
def __init__(self, item):
self.item = item
self.other_changes = []
@ -642,9 +647,11 @@ class BuildSet(object):
self.previous_build_set = None
self.ref = None
self.commit = None
self.zuul_url = None
self.unable_to_merge = False
self.unable_to_merge_message = None
self.failing_reasons = []
self.merge_state = self.NEW
def setConfiguration(self):
# The change isn't enqueued until after it's created


@ -30,7 +30,6 @@ import yaml
import layoutvalidator
import model
from model import ActionReporter, Pipeline, Project, ChangeQueue, EventFilter
import merger
from zuul import version as zuul_version
statsd = extras.try_import('statsd.statsd')
@ -136,6 +135,24 @@ class BuildCompletedEvent(ResultEvent):
self.build = build
class MergeCompletedEvent(ResultEvent):
"""A remote merge operation has completed
:arg BuildSet build_set: The build_set which is ready.
:arg str zuul_url: The URL of the Zuul Merger.
:arg bool merged: Whether the merge succeeded (changes with refs).
:arg bool updated: Whether the repo was updated (changes without refs).
:arg str commit: The SHA of the merged commit (changes with refs).
"""
def __init__(self, build_set, zuul_url, merged, updated, commit):
self.build_set = build_set
self.zuul_url = zuul_url
self.merged = merged
self.updated = updated
self.commit = commit
class Scheduler(threading.Thread):
log = logging.getLogger("zuul.Scheduler")
@ -149,6 +166,7 @@ class Scheduler(threading.Thread):
self._exit = False
self._stopped = False
self.launcher = None
self.merger = None
self.triggers = dict()
self.reporters = dict()
self.config = None
@ -379,36 +397,12 @@ class Scheduler(threading.Thread):
return layout
def _setupMerger(self):
if self.config.has_option('zuul', 'git_dir'):
merge_root = self.config.get('zuul', 'git_dir')
else:
merge_root = '/var/lib/zuul/git'
if self.config.has_option('zuul', 'git_user_email'):
merge_email = self.config.get('zuul', 'git_user_email')
else:
merge_email = None
if self.config.has_option('zuul', 'git_user_name'):
merge_name = self.config.get('zuul', 'git_user_name')
else:
merge_name = None
if self.config.has_option('gerrit', 'sshkey'):
sshkey = self.config.get('gerrit', 'sshkey')
else:
sshkey = None
# TODO: The merger should have an upstream repo independent of
# triggers, and then each trigger should provide a fetch
# location.
self.merger = merger.Merger(merge_root, sshkey,
merge_email, merge_name)
def setLauncher(self, launcher):
self.launcher = launcher
def setMerger(self, merger):
self.merger = merger
def registerTrigger(self, trigger, name=None):
if name is None:
name = trigger.name
@ -468,6 +462,14 @@ class Scheduler(threading.Thread):
self.wake_event.set()
self.log.debug("Done adding complete event for build: %s" % build)
def onMergeCompleted(self, build_set, zuul_url, merged, updated, commit):
self.log.debug("Adding merge complete event for build set: %s" %
build_set)
event = MergeCompletedEvent(build_set, zuul_url,
merged, updated, commit)
self.result_event_queue.put(event)
self.wake_event.set()
def reconfigure(self, config):
self.log.debug("Prepare to reconfigure")
event = ReconfigureEvent(config)
@ -594,7 +596,6 @@ class Scheduler(threading.Thread):
new_pipeline.manager.building_jobs = \
old_pipeline.manager.building_jobs
self.layout = layout
self._setupMerger()
for trigger in self.triggers.values():
trigger.postConfig()
if statsd:
@ -651,6 +652,8 @@ class Scheduler(threading.Thread):
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
waiting = False
if self.merger.areMergesOutstanding():
waiting = True
for pipeline in self.layout.pipelines.values():
for build in pipeline.manager.building_jobs.keys():
self.log.debug("%s waiting on %s" % (pipeline.manager, build))
@ -672,6 +675,7 @@ class Scheduler(threading.Thread):
self.wake_event.wait()
self.wake_event.clear()
if self._stopped:
self.log.debug("Run handler stopping")
return
self.log.debug("Run handler awake")
self.run_handler_lock.acquire()
@ -728,17 +732,6 @@ class Scheduler(threading.Thread):
self.log.warning("Project %s not found" % event.project_name)
return
# Preprocessing for ref-update events
if event.ref:
# Make sure the local git repo is up-to-date with the
# remote one. We better have the new ref before
# enqueuing the changes. This is done before
# enqueuing the changes to avoid calling an update per
# pipeline accepting the change.
self.log.info("Fetching references for %s" % project)
url = self.triggers['gerrit'].getGitUrl(project)
self.merger.updateRepo(project.name, url)
for pipeline in self.layout.pipelines.values():
change = event.getChange(project,
self.triggers.get(event.trigger_name))
@ -776,24 +769,50 @@ class Scheduler(threading.Thread):
self._doBuildStartedEvent(event)
elif isinstance(event, BuildCompletedEvent):
self._doBuildCompletedEvent(event)
elif isinstance(event, MergeCompletedEvent):
self._doMergeCompletedEvent(event)
else:
self.log.error("Unable to handle event %s" % event)
finally:
self.result_event_queue.task_done()
def _doBuildStartedEvent(self, event):
for pipeline in self.layout.pipelines.values():
if pipeline.manager.onBuildStarted(event.build):
build = event.build
if build.build_set is not build.build_set.item.current_build_set:
self.log.warning("Build %s is not in the current build set" %
(build,))
return
self.log.warning("Build %s not found by any queue manager" %
(event.build))
pipeline = build.build_set.item.pipeline
if not pipeline:
self.log.warning("Build %s is not associated with a pipeline" %
(build,))
return
pipeline.manager.onBuildStarted(event.build)
def _doBuildCompletedEvent(self, event):
for pipeline in self.layout.pipelines.values():
if pipeline.manager.onBuildCompleted(event.build):
build = event.build
if build.build_set is not build.build_set.item.current_build_set:
self.log.warning("Build %s is not in the current build set" %
(build,))
return
self.log.warning("Build %s not found by any queue manager" %
(event.build))
pipeline = build.build_set.item.pipeline
if not pipeline:
self.log.warning("Build %s is not associated with a pipeline" %
(build,))
return
pipeline.manager.onBuildCompleted(event.build)
def _doMergeCompletedEvent(self, event):
build_set = event.build_set
if build_set is not build_set.item.current_build_set:
self.log.warning("Build set %s is not current" % (build_set,))
return
pipeline = build_set.item.pipeline
if not pipeline:
self.log.warning("Build set %s is not associated with a pipeline" %
(build_set,))
return
pipeline.manager.onMergeCompleted(event)
def formatStatusHTML(self):
ret = '<html><pre>'
@ -1083,7 +1102,7 @@ class BasePipelineManager(object):
# Create a dictionary with all info about the item needed by
# the merger.
return dict(project=item.change.project.name,
url=self.sched.triggers['gerrit'].getGitUrl(
url=self.pipeline.trigger.getGitUrl(
item.change.project),
merge_mode=item.change.project.merge_mode,
refspec=item.change.refspec,
@ -1092,26 +1111,28 @@ class BasePipelineManager(object):
)
def prepareRef(self, item):
# Returns False on success.
# Returns True if we were unable to prepare the ref.
ref = item.current_build_set.ref
# Returns True if the ref is ready, false otherwise
build_set = item.current_build_set
if build_set.merge_state == build_set.COMPLETE:
return True
if build_set.merge_state == build_set.PENDING:
return False
build_set.merge_state = build_set.PENDING
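# From here the merge is asynchronous: a job is submitted to a
# zuul-merger and the result arrives later as a MergeCompletedEvent,
# which onMergeCompleted() below uses to record the commit (or a merge
# failure) and mark the build set COMPLETE.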
ref = build_set.ref
if hasattr(item.change, 'refspec') and not ref:
self.log.debug("Preparing ref for: %s" % item.change)
item.current_build_set.setConfiguration()
ref = item.current_build_set.ref
dependent_items = self.getDependentItems(item)
dependent_items.reverse()
all_items = dependent_items + [item]
merger_items = map(self._makeMergerItem, all_items)
commit = self.sched.merger.mergeChanges(merger_items)
item.current_build_set.commit = commit
if not commit:
self.log.info("Unable to merge change %s" % item.change)
msg = ("This change was unable to be automatically merged "
"with the current state of the repository. Please "
"rebase your change and upload a new patchset.")
self.pipeline.setUnableToMerge(item, msg)
return True
self.sched.merger.mergeChanges(merger_items,
item.current_build_set)
else:
self.log.debug("Preparing update repo for: %s" % item.change)
url = self.pipeline.trigger.getGitUrl(item.change.project)
self.sched.merger.updateRepo(item.change.project.name,
url, build_set)
return False
def _launchJobs(self, item, jobs):
@ -1164,7 +1185,7 @@ class BasePipelineManager(object):
canceled = True
return canceled
def _processOneItem(self, item, nnfi):
def _processOneItem(self, item, nnfi, ready_ahead):
changed = False
item_ahead = item.item_ahead
change_queue = self.pipeline.getQueue(item.change.project)
@ -1181,10 +1202,11 @@ class BasePipelineManager(object):
self.reportItem(item)
except MergeFailure:
pass
return (True, nnfi)
return (True, nnfi, ready_ahead)
dep_item = self.getFailingDependentItem(item)
actionable = change_queue.isActionable(item)
item.active = actionable
ready = False
if dep_item:
failing_reasons.append('a needed change is failing')
self.cancelJobs(item, prime=False)
@ -1204,10 +1226,13 @@ class BasePipelineManager(object):
changed = True
self.cancelJobs(item)
if actionable:
self.prepareRef(item)
ready = self.prepareRef(item)
if item.current_build_set.unable_to_merge:
failing_reasons.append("it has a merge conflict")
if actionable and self.launchJobs(item):
ready = False
if not ready:
ready_ahead = False
if actionable and ready_ahead and self.launchJobs(item):
changed = True
if self.pipeline.didAnyJobFail(item):
failing_reasons.append("at least one job failed")
@ -1229,7 +1254,7 @@ class BasePipelineManager(object):
if failing_reasons:
self.log.debug("%s is a failing item because %s" %
(item, failing_reasons))
return (changed, nnfi)
return (changed, nnfi, ready_ahead)
def processQueue(self):
# Do whatever needs to be done for each change in the queue
@ -1238,8 +1263,10 @@ class BasePipelineManager(object):
for queue in self.pipeline.queues:
queue_changed = False
nnfi = None # Nearest non-failing item
ready_ahead = True # All build sets ahead are ready
for item in queue.queue[:]:
item_changed, nnfi = self._processOneItem(item, nnfi)
item_changed, nnfi, ready_ahead = self._processOneItem(
item, nnfi, ready_ahead)
if item_changed:
queue_changed = True
self.reportStats(item)
@ -1294,6 +1321,22 @@ class BasePipelineManager(object):
self.updateBuildDescriptions(build.build_set)
return True
def onMergeCompleted(self, event):
build_set = event.build_set
item = build_set.item
build_set.merge_state = build_set.COMPLETE
build_set.zuul_url = event.zuul_url
if event.merged:
build_set.commit = event.commit
elif event.updated:
build_set.commit = item.change.newrev
if not build_set.commit:
self.log.info("Unable to merge change %s" % item.change)
msg = ("This change was unable to be automatically merged "
"with the current state of the repository. Please "
"rebase your change and upload a new patchset.")
self.pipeline.setUnableToMerge(item, msg)
def reportItem(self, item):
if item.reported:
raise Exception("Already reported change %s" % item.change)


@ -86,7 +86,8 @@ class Timer(object):
raise Exception("Timer trigger does not support changes.")
def getGitUrl(self, project):
pass
# For the moment, the timer trigger requires gerrit.
return self.sched.triggers['gerrit'].getGitUrl(project)
def getGitwebUrl(self, project, sha=None):
url = '%s/gitweb?p=%s.git' % (self.baseurl, project)