From 4076e2b432992b71873b659649e35f0e665ab3ba Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Tue, 28 Jan 2014 12:42:20 -0800 Subject: [PATCH] Split the merger into a separate process Connect it to Zuul via Gearman. Any number of mergers may be deployed. Directly find the pipeline for a build when processing a result, so that the procedure is roughly the same for build and merge results. The timer trigger currently requires the gerrit trigger also be configured. Make that explicit inside of the timer trigger so that the scheduler API interaction with triggers is cleaner. Change-Id: I69498813764753c97c426e42d17596c2ef1d87cf --- NEWS.rst | 9 ++ doc/source/index.rst | 1 + doc/source/merger.rst | 63 +++++++++++++ doc/source/triggers.rst | 43 --------- doc/source/zuul.rst | 64 +++++++------ etc/zuul.conf-sample | 4 +- setup.cfg | 1 + tests/fixtures/zuul.conf | 6 +- tests/test_scheduler.py | 22 ++++- zuul/cmd/merger.py | 153 ++++++++++++++++++++++++++++++ zuul/cmd/server.py | 18 +--- zuul/launcher/gearman.py | 3 +- zuul/merger/__init__.py | 0 zuul/merger/client.py | 117 +++++++++++++++++++++++ zuul/{ => merger}/merger.py | 9 +- zuul/merger/server.py | 115 ++++++++++++++++++++++ zuul/model.py | 7 ++ zuul/scheduler.py | 183 ++++++++++++++++++++++-------------- zuul/trigger/timer.py | 3 +- 19 files changed, 651 insertions(+), 170 deletions(-) create mode 100644 doc/source/merger.rst create mode 100644 zuul/cmd/merger.py create mode 100644 zuul/merger/__init__.py create mode 100644 zuul/merger/client.py rename zuul/{ => merger}/merger.py (98%) create mode 100644 zuul/merger/server.py diff --git a/NEWS.rst b/NEWS.rst index ba6c9860e0..bd09bfefa2 100644 --- a/NEWS.rst +++ b/NEWS.rst @@ -3,6 +3,15 @@ Since 2.0.0: * The push_change_refs option which specified that Zuul refs should be pushed to Gerrit has been removed. +* Git merge operations are now performed in a separate process. Run + at least one instance of the ``zuul-merger`` program which is now + included. Any number of Zuul-Mergers may be run in order to + distribute the work of speculatively merging changes into git and + serving the results to test workers. This system is designed to + scale out to many servers, but one instance may be co-located with + the Zuul server in smaller deployments. Several configuration + options have moved from the ``zuul`` section to ``merger``. + Since 1.3.0: * The Jenkins launcher is replaced with Gearman launcher. An internal diff --git a/doc/source/index.rst b/doc/source/index.rst index 4b7b4b0281..c5beda007a 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -20,6 +20,7 @@ Contents: gating triggers + merger launchers reporters zuul diff --git a/doc/source/merger.rst b/doc/source/merger.rst new file mode 100644 index 0000000000..4c445c685c --- /dev/null +++ b/doc/source/merger.rst @@ -0,0 +1,63 @@ +:title: Merger + +Merger +====== + +The Zuul Merger is a separate component which communicates with the +main Zuul server via Gearman. Its purpose is to speculatively merge +the changes for Zuul in preparation for testing. The resulting git +commits also must be served to the test workers, and the server(s) +running the Zuul Merger are expected to do this as well. Because both +of these tasks are resource intensive, any number of Zuul Mergers can +be run in parallel on distinct hosts. + +Configuration +~~~~~~~~~~~~~ + +The Zuul Merger can read the same zuul.conf file as the main Zuul +server and requires the ``gearman``, ``gerrit``, ``merger``, and +``zuul`` sections (indicated fields only). Be sure the zuul_url is +set appropriately on each host that runs a zuul-merger. + +Zuul References +~~~~~~~~~~~~~~~ + +As the DependentPipelineManager may combine several changes together +for testing when performing speculative execution, determining exactly +how the workspace should be set up when running a Job can be complex. +To alleviate this problem, Zuul performs merges itself, merging or +cherry-picking changes as required and identifies the result with a +Git reference of the form ``refs/zuul//Z``. +Preparing the workspace is then a simple matter of fetching that ref +and checking it out. The parameters that provide this information are +described in :ref:`launchers`. + +These references need to be made available via a Git repository that +is available to Jenkins. This is accomplished by serving Zuul's Git +repositories directly. + +Serving Zuul Git Repos +~~~~~~~~~~~~~~~~~~~~~~ + +Zuul maintains its own copies of any needed Git repositories in the +directory specified by ``git_dir`` in the ``merger`` section of +zuul.conf (by default, /var/lib/zuul/git). To directly serve Zuul's +Git repositories in order to provide Zuul refs for Jenkins, you can +configure Apache to do so using the following directives:: + + SetEnv GIT_PROJECT_ROOT /var/lib/zuul/git + SetEnv GIT_HTTP_EXPORT_ALL + + AliasMatch ^/p/(.*/objects/[0-9a-f]{2}/[0-9a-f]{38})$ /var/lib/zuul/git/$1 + AliasMatch ^/p/(.*/objects/pack/pack-[0-9a-f]{40}.(pack|idx))$ /var/lib/zuul/git/$1 + ScriptAlias /p/ /usr/lib/git-core/git-http-backend/ + +And set ``push_change_refs`` to ``false`` (the default) in the +``zuul`` section of zuul.conf. + +Note that Zuul's Git repositories are not bare, which means they have +a working tree, and are not suitable for public consumption (for +instance, a clone will produce a repository in an unpredictable state +depending on what the state of Zuul's repository is when the clone +happens). They are, however, suitable for automated systems that +respond to Zuul triggers. diff --git a/doc/source/triggers.rst b/doc/source/triggers.rst index 287246cc78..c4485bf5d6 100644 --- a/doc/source/triggers.rst +++ b/doc/source/triggers.rst @@ -35,49 +35,6 @@ want Zuul to gate. For instance, you may want to grant ``Verified be added to Gerrit. Zuul is very flexible and can take advantage of those. -Zuul References -~~~~~~~~~~~~~~~ - -As the DependentPipelineManager may combine several changes together -for testing when performing speculative execution, determining exactly -how the workspace should be set up when running a Job can be complex. -To alleviate this problem, Zuul performs merges itself, merging or -cherry-picking changes as required and identifies the result with a -Git reference of the form ``refs/zuul//Z``. -Preparing the workspace is then a simple matter of fetching that ref -and checking it out. The parameters that provide this information are -described in :ref:`launchers`. - -These references need to be made available via a Git repository that -is available to Jenkins. This is accomplished by serving Zuul's Git -repositories directly. - -Serving Zuul Git Repos -"""""""""""""""""""""" - -Zuul maintains its own copies of any needed Git repositories in the -directory specified by ``git_dir`` in the ``zuul`` section of -zuul.conf (by default, /var/lib/zuul/git). To directly serve Zuul's -Git repositories in order to provide Zuul refs for Jenkins, you can -configure Apache to do so using the following directives:: - - SetEnv GIT_PROJECT_ROOT /var/lib/zuul/git - SetEnv GIT_HTTP_EXPORT_ALL - - AliasMatch ^/p/(.*/objects/[0-9a-f]{2}/[0-9a-f]{38})$ /var/lib/zuul/git/$1 - AliasMatch ^/p/(.*/objects/pack/pack-[0-9a-f]{40}.(pack|idx))$ /var/lib/zuul/git/$1 - ScriptAlias /p/ /usr/lib/git-core/git-http-backend/ - -And set ``push_change_refs`` to ``false`` (the default) in the -``zuul`` section of zuul.conf. - -Note that Zuul's Git repositories are not bare, which means they have -a working tree, and are not suitable for public consumption (for -instance, a clone will produce a repository in an unpredictable state -depending on what the state of Zuul's repository is when the clone -happens). They are, however, suitable for automated systems that -respond to Zuul triggers. - Timer ----- diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst index d71912c498..ee70523a2d 100644 --- a/doc/source/zuul.rst +++ b/doc/source/zuul.rst @@ -83,21 +83,49 @@ zuul """" **layout_config** - Path to layout config file. + Path to layout config file. Used by zuul-server only. ``layout_config=/etc/zuul/layout.yaml`` **log_config** - Path to log config file. + Path to log config file. Used by all Zuul commands. ``log_config=/etc/zuul/logging.yaml`` **pidfile** - Path to PID lock file. + Path to PID lock file. Used by all Zuul commands. ``pidfile=/var/run/zuul/zuul.pid`` **state_dir** - Path to directory that Zuul should save state to. + Path to directory that Zuul should save state to. Used by all Zuul + commands. ``state_dir=/var/lib/zuul`` +**report_times** + Boolean value (``true`` or ``false``) that determines if Zuul should + include elapsed times for each job in the textual report. Used by + zuul-server only. + ``report_times=true`` + +**status_url** + URL that will be posted in Zuul comments made to Gerrit changes when + starting jobs for a change. Used by zuul-server only. + ``status_url=https://zuul.example.com/status`` + +**url_pattern** + If you are storing build logs external to the system that originally + ran jobs and wish to link to those logs when Zuul makes comments on + Gerrit changes for completed jobs this setting configures what the + URLs for those links should be. Used by zuul-server only. + ``http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}`` + +**job_name_in_report** + Boolean value (``true`` or ``false``) that indicates whether the + job name should be included in the report (normally only the URL + is included). Defaults to ``false``. Used by zuul-server only. + ``job_name_in_report=true`` + +merger +"""""" + **git_dir** Directory that Zuul should clone local git repositories to. ``git_dir=/var/lib/zuul/git`` @@ -110,32 +138,10 @@ zuul Optional: Value to pass to `git config user.name`. ``git_user_name=zuul`` -**report_times** - Boolean value (``true`` or ``false``) that determines if Zuul should - include elapsed times for each job in the textual report. - ``report_times=true`` - -**status_url** - URL that will be posted in Zuul comments made to Gerrit changes when - starting jobs for a change. - ``status_url=https://zuul.example.com/status`` - -**url_pattern** - If you are storing build logs external to the system that originally - ran jobs and wish to link to those logs when Zuul makes comments on - Gerrit changes for completed jobs this setting configures what the - URLs for those links should be. - ``http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}`` - -**job_name_in_report** - Boolean value (``true`` or ``false``) that indicates whether the - job name should be included in the report (normally only the URL - is included). Defaults to ``false``. - ``job_name_in_report=true`` - **zuul_url** - URL of Zuul's git repos, accessible to test workers. - Usually "http://zuul.example.com/p". + URL of this merger's git repos, accessible to test workers. Usually + "http://zuul.example.com/p" or "http://zuul-merger01.example.com/p" + depending on whether the merger is co-located with the Zuul server. smtp """" diff --git a/etc/zuul.conf-sample b/etc/zuul.conf-sample index a4d1390cea..75c84e4c4e 100644 --- a/etc/zuul.conf-sample +++ b/etc/zuul.conf-sample @@ -15,10 +15,12 @@ layout_config=/etc/zuul/layout.yaml log_config=/etc/zuul/logging.conf pidfile=/var/run/zuul/zuul.pid state_dir=/var/lib/zuul +status_url=https://jenkins.example.com/zuul/status + +[merger] git_dir=/var/lib/zuul/git ;git_user_email=zuul@example.com ;git_user_name=zuul -status_url=https://jenkins.example.com/zuul/status zuul_url=http://zuul.example.com/p [smtp] diff --git a/setup.cfg b/setup.cfg index 9ff62d67c9..21b1199b10 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,6 +22,7 @@ warnerrors = True [entry_points] console_scripts = zuul-server = zuul.cmd.server:main + zuul-merger = zuul.cmd.merger:main zuul = zuul.cmd.client:main [build_sphinx] diff --git a/tests/fixtures/zuul.conf b/tests/fixtures/zuul.conf index f77e07e5e1..bee06e4bc2 100644 --- a/tests/fixtures/zuul.conf +++ b/tests/fixtures/zuul.conf @@ -8,11 +8,13 @@ sshkey=none [zuul] layout_config=layout.yaml +url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number} +job_name_in_report=true + +[merger] git_dir=/tmp/zuul-test/git git_user_email=zuul@example.com git_user_name=zuul -url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number} -job_name_in_report=true zuul_url=http://zuul.example.com/p [smtp] diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 67c4709c67..b2106f849b 100755 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -47,6 +47,8 @@ import zuul.webapp import zuul.rpclistener import zuul.rpcclient import zuul.launcher.gearman +import zuul.merger.server +import zuul.merger.client import zuul.reporter.gerrit import zuul.reporter.smtp import zuul.trigger.gerrit @@ -764,7 +766,7 @@ class TestScheduler(testtools.TestCase): self.upstream_root = os.path.join(self.test_root, "upstream") self.git_root = os.path.join(self.test_root, "git") - CONFIG.set('zuul', 'git_dir', self.git_root) + CONFIG.set('merger', 'git_dir', self.git_root) if os.path.exists(self.test_root): shutil.rmtree(self.test_root) os.makedirs(self.test_root) @@ -804,6 +806,9 @@ class TestScheduler(testtools.TestCase): self.worker.addServer('127.0.0.1', self.gearman_server.port) self.gearman_server.worker = self.worker + self.merge_server = zuul.merger.server.MergeServer(self.config) + self.merge_server.start() + self.sched = zuul.scheduler.Scheduler() def URLOpenerFactory(*args, **kw): @@ -812,6 +817,8 @@ class TestScheduler(testtools.TestCase): urllib2.urlopen = URLOpenerFactory self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched) + self.merge_client = zuul.merger.client.MergeClient( + self.config, self.sched) self.smtp_messages = [] @@ -833,6 +840,7 @@ class TestScheduler(testtools.TestCase): self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched) self.sched.setLauncher(self.launcher) + self.sched.setMerger(self.merge_client) self.sched.registerTrigger(self.gerrit) self.timer = zuul.trigger.timer.Timer(self.config, self.sched) self.sched.registerTrigger(self.timer) @@ -873,6 +881,9 @@ class TestScheduler(testtools.TestCase): def shutdown(self): self.log.debug("Shutting down after tests") self.launcher.stop() + self.merge_server.stop() + self.merge_server.join() + self.merge_client.stop() self.worker.shutdown() self.gearman_server.shutdown() self.gerrit.stop() @@ -991,13 +1002,15 @@ class TestScheduler(testtools.TestCase): done = True for connection in self.gearman_server.active_connections: if (connection.functions and - connection.client_id != 'Zuul RPC Listener'): + connection.client_id not in ['Zuul RPC Listener', + 'Zuul Merger']): done = False if done: break time.sleep(0) self.gearman_server.functions = set() self.rpc.register() + self.merge_server.register() def haveAllBuildsReported(self): # See if Zuul is waiting on a meta job to complete @@ -1087,6 +1100,7 @@ class TestScheduler(testtools.TestCase): if (self.sched.trigger_event_queue.empty() and self.sched.result_event_queue.empty() and self.fake_gerrit.event_queue.empty() and + not self.merge_client.build_sets and self.areAllBuildsWaiting()): self.sched.run_handler_lock.release() self.worker.lock.release() @@ -2357,7 +2371,7 @@ class TestScheduler(testtools.TestCase): # This test assumes the repo is already cloned; make sure it is url = self.sched.triggers['gerrit'].getGitUrl( self.sched.layout.projects['org/project1']) - self.sched.merger.addProject('org/project1', url) + self.merge_server.merger.addProject('org/project1', url) A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') A.addPatchset(large=True) path = os.path.join(self.upstream_root, "org/project1") @@ -2479,7 +2493,7 @@ class TestScheduler(testtools.TestCase): def test_zuul_url_return(self): "Test if ZUUL_URL is returning when zuul_url is set in zuul.conf" - self.assertTrue(self.sched.config.has_option('zuul', 'zuul_url')) + self.assertTrue(self.sched.config.has_option('merger', 'zuul_url')) self.worker.hold_jobs_in_build = True A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') diff --git a/zuul/cmd/merger.py b/zuul/cmd/merger.py new file mode 100644 index 0000000000..e9722cffcd --- /dev/null +++ b/zuul/cmd/merger.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# Copyright 2013-2014 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import argparse +import ConfigParser +import daemon +import extras + +# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore +# instead it depends on lockfile-0.9.1 which uses pidfile. +pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile']) + +import logging +import logging.config +import os +import sys +import signal +import traceback + +# No zuul imports here because they pull in paramiko which must not be +# imported until after the daemonization. +# https://github.com/paramiko/paramiko/issues/59 +# Similar situation with gear and statsd. + + +def stack_dump_handler(signum, frame): + signal.signal(signal.SIGUSR2, signal.SIG_IGN) + log_str = "" + for thread_id, stack_frame in sys._current_frames().items(): + log_str += "Thread: %s\n" % thread_id + log_str += "".join(traceback.format_stack(stack_frame)) + log = logging.getLogger("zuul.stack_dump") + log.debug(log_str) + signal.signal(signal.SIGUSR2, stack_dump_handler) + + +class Merger(object): + def __init__(self): + self.args = None + self.config = None + + def parse_arguments(self): + parser = argparse.ArgumentParser(description='Zuul merge worker.') + parser.add_argument('-c', dest='config', + help='specify the config file') + parser.add_argument('-d', dest='nodaemon', action='store_true', + help='do not run as a daemon') + parser.add_argument('--version', dest='version', action='store_true', + help='show zuul version') + self.args = parser.parse_args() + + def read_config(self): + self.config = ConfigParser.ConfigParser() + if self.args.config: + locations = [self.args.config] + else: + locations = ['/etc/zuul/zuul.conf', + '~/zuul.conf'] + for fp in locations: + if os.path.exists(os.path.expanduser(fp)): + self.config.read(os.path.expanduser(fp)) + return + raise Exception("Unable to locate config file in %s" % locations) + + def setup_logging(self, section, parameter): + if self.config.has_option(section, parameter): + fp = os.path.expanduser(self.config.get(section, parameter)) + if not os.path.exists(fp): + raise Exception("Unable to read logging config file at %s" % + fp) + logging.config.fileConfig(fp) + else: + logging.basicConfig(level=logging.DEBUG) + + def exit_handler(self, signum, frame): + signal.signal(signal.SIGUSR1, signal.SIG_IGN) + self.merger.stop() + self.merger.join() + + def main(self): + # See comment at top of file about zuul imports + import zuul.merger.server + + self.setup_logging('zuul', 'log_config') + + self.merger = zuul.merger.server.MergeServer(self.config) + self.merger.start() + + signal.signal(signal.SIGUSR1, self.exit_handler) + signal.signal(signal.SIGUSR2, stack_dump_handler) + while True: + try: + signal.pause() + except KeyboardInterrupt: + print "Ctrl + C: asking merger to exit nicely...\n" + self.exit_handler(signal.SIGINT, None) + + +def main(): + server = Merger() + server.parse_arguments() + + if server.args.version: + from zuul.version import version_info as zuul_version_info + print "Zuul version: %s" % zuul_version_info.version_string() + sys.exit(0) + + server.read_config() + + if server.config.has_option('zuul', 'state_dir'): + state_dir = os.path.expanduser(server.config.get('zuul', 'state_dir')) + else: + state_dir = '/var/lib/zuul' + test_fn = os.path.join(state_dir, 'test') + try: + f = open(test_fn, 'w') + f.close() + os.unlink(test_fn) + except Exception: + print + print "Unable to write to state directory: %s" % state_dir + print + raise + + if server.config.has_option('zuul', 'pidfile'): + pid_fn = os.path.expanduser(server.config.get('zuul', 'pidfile')) + else: + pid_fn = '/var/run/zuul/merger.pid' + pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10) + + if server.args.nodaemon: + server.main() + else: + with daemon.DaemonContext(pidfile=pid): + server.main() + + +if __name__ == "__main__": + sys.path.insert(0, '.') + main() diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py index 79015359f0..5d83959d00 100755 --- a/zuul/cmd/server.py +++ b/zuul/cmd/server.py @@ -172,6 +172,7 @@ class Server(object): # See comment at top of file about zuul imports import zuul.scheduler import zuul.launcher.gearman + import zuul.merger.client import zuul.reporter.gerrit import zuul.reporter.smtp import zuul.trigger.gerrit @@ -188,6 +189,7 @@ class Server(object): self.sched = zuul.scheduler.Scheduler() gearman = zuul.launcher.gearman.Gearman(self.config, self.sched) + merger = zuul.merger.client.MergeClient(self.config, self.sched) gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched) timer = zuul.trigger.timer.Timer(self.config, self.sched) webapp = zuul.webapp.WebApp(self.sched) @@ -205,6 +207,7 @@ class Server(object): ) self.sched.setLauncher(gearman) + self.sched.setMerger(merger) self.sched.registerTrigger(gerrit) self.sched.registerTrigger(timer) self.sched.registerReporter(gerrit_reporter) @@ -243,21 +246,6 @@ def main(): path = None sys.exit(server.test_config(path)) - if server.config.has_option('zuul', 'state_dir'): - state_dir = os.path.expanduser(server.config.get('zuul', 'state_dir')) - else: - state_dir = '/var/lib/zuul' - test_fn = os.path.join(state_dir, 'test') - try: - f = open(test_fn, 'w') - f.close() - os.unlink(test_fn) - except: - print - print "Unable to write to state directory: %s" % state_dir - print - raise - if server.config.has_option('zuul', 'pidfile'): pid_fn = os.path.expanduser(server.config.get('zuul', 'pidfile')) else: diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py index 350044596d..37fc74362f 100644 --- a/zuul/launcher/gearman.py +++ b/zuul/launcher/gearman.py @@ -155,7 +155,6 @@ class Gearman(object): self.sched = sched self.builds = {} self.meta_jobs = {} # A list of meta-jobs like stop or describe - self.zuul_server = config.get('zuul', 'zuul_url') server = config.get('gearman', 'server') if config.has_option('gearman', 'port'): @@ -226,7 +225,7 @@ class Gearman(object): params = dict(ZUUL_UUID=uuid, ZUUL_PROJECT=item.change.project.name) params['ZUUL_PIPELINE'] = pipeline.name - params['ZUUL_URL'] = self.zuul_server + params['ZUUL_URL'] = item.current_build_set.zuul_url if hasattr(item.change, 'refspec'): changes_str = '^'.join( ['%s:%s:%s' % (i.change.project.name, i.change.branch, diff --git a/zuul/merger/__init__.py b/zuul/merger/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/zuul/merger/client.py b/zuul/merger/client.py new file mode 100644 index 0000000000..72fd4c520e --- /dev/null +++ b/zuul/merger/client.py @@ -0,0 +1,117 @@ +# Copyright 2014 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import logging +from uuid import uuid4 + +import gear + + +def getJobData(job): + if not len(job.data): + return {} + d = job.data[-1] + if not d: + return {} + return json.loads(d) + + +class MergeGearmanClient(gear.Client): + def __init__(self, merge_client): + super(MergeGearmanClient, self).__init__() + self.__merge_client = merge_client + + def handleWorkComplete(self, packet): + job = super(MergeGearmanClient, self).handleWorkComplete(packet) + self.__merge_client.onBuildCompleted(job) + return job + + def handleWorkFail(self, packet): + job = super(MergeGearmanClient, self).handleWorkFail(packet) + self.__merge_client.onBuildCompleted(job) + return job + + def handleWorkException(self, packet): + job = super(MergeGearmanClient, self).handleWorkException(packet) + self.__merge_client.onBuildCompleted(job) + return job + + def handleDisconnect(self, job): + job = super(MergeGearmanClient, self).handleDisconnect(job) + self.__merge_client.onBuildCompleted(job) + + +class MergeClient(object): + log = logging.getLogger("zuul.MergeClient") + + def __init__(self, config, sched): + self.config = config + self.sched = sched + server = self.config.get('gearman', 'server') + if self.config.has_option('gearman', 'port'): + port = self.config.get('gearman', 'port') + else: + port = 4730 + self.log.debug("Connecting to gearman at %s:%s" % (server, port)) + self.gearman = MergeGearmanClient(self) + self.gearman.addServer(server, port) + self.log.debug("Waiting for gearman") + self.gearman.waitForServer() + self.build_sets = {} + + def stop(self): + self.gearman.shutdown() + + def areMergesOutstanding(self): + if self.build_sets: + return True + return False + + def submitJob(self, name, data, build_set): + uuid = str(uuid4().hex) + self.log.debug("Submitting job %s with data %s" % (name, data)) + job = gear.Job(name, + json.dumps(data), + unique=uuid) + self.build_sets[uuid] = build_set + self.gearman.submitJob(job) + + def mergeChanges(self, items, build_set): + data = dict(items=items) + self.submitJob('merger:merge', data, build_set) + + def updateRepo(self, project, url, build_set): + data = dict(project=project, + url=url) + self.submitJob('merger:update', data, build_set) + + def onBuildCompleted(self, job): + build_set = self.build_sets.get(job.unique) + if build_set: + data = getJobData(job) + zuul_url = data.get('zuul_url') + merged = data.get('merged', False) + updated = data.get('updated', False) + commit = data.get('commit') + self.log.info("Merge %s complete, merged: %s, updated: %s, " + "commit: %s" % + (job, merged, updated, build_set.commit)) + self.sched.onMergeCompleted(build_set, zuul_url, + merged, updated, commit) + # The test suite expects the build_set to be removed from + # the internal dict after the wake flag is set. + del self.build_sets[job.unique] + else: + self.log.error("Unable to find build set for uuid %s" % job.unique) diff --git a/zuul/merger.py b/zuul/merger/merger.py similarity index 98% rename from zuul/merger.py rename to zuul/merger/merger.py index 6efffdb010..13dd1225c6 100644 --- a/zuul/merger.py +++ b/zuul/merger/merger.py @@ -1,4 +1,5 @@ # Copyright 2012 Hewlett-Packard Development Company, L.P. +# Copyright 2013-2014 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -201,9 +202,9 @@ class Merger(object): repo = self.getRepo(item['project'], item['url']) try: repo.checkout(ref) - except: + except Exception: self.log.exception("Unable to checkout %s" % ref) - return False + return None try: mode = item['merge_mode'] @@ -219,7 +220,7 @@ class Merger(object): # Log exceptions at debug level because they are # usually benign merge conflicts self.log.debug("Unable to merge %s" % item, exc_info=True) - return False + return None return commit @@ -256,6 +257,8 @@ class Merger(object): self.log.debug("Found base commit %s for %s" % (base, key,)) # Merge the change commit = self._mergeChange(item, base) + if not commit: + return None # Store this commit as the most recent for this project-branch recent[key] = commit # Set the Zuul ref for this item to point to the most recent diff --git a/zuul/merger/server.py b/zuul/merger/server.py new file mode 100644 index 0000000000..5d520413ac --- /dev/null +++ b/zuul/merger/server.py @@ -0,0 +1,115 @@ +# Copyright 2014 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import logging +import threading +import traceback + +import gear + +import merger + + +class MergeServer(object): + log = logging.getLogger("zuul.MergeServer") + + def __init__(self, config): + self.config = config + self.zuul_url = config.get('merger', 'zuul_url') + + if self.config.has_option('merger', 'git_dir'): + merge_root = self.config.get('merger', 'git_dir') + else: + merge_root = '/var/lib/zuul/git' + + if self.config.has_option('merger', 'git_user_email'): + merge_email = self.config.get('merger', 'git_user_email') + else: + merge_email = None + + if self.config.has_option('merger', 'git_user_name'): + merge_name = self.config.get('merger', 'git_user_name') + else: + merge_name = None + + if self.config.has_option('gerrit', 'sshkey'): + sshkey = self.config.get('gerrit', 'sshkey') + else: + sshkey = None + + self.merger = merger.Merger(merge_root, sshkey, + merge_email, merge_name) + + def start(self): + self._running = True + server = self.config.get('gearman', 'server') + if self.config.has_option('gearman', 'port'): + port = self.config.get('gearman', 'port') + else: + port = 4730 + self.worker = gear.Worker('Zuul Merger') + self.worker.addServer(server, port) + self.thread = threading.Thread(target=self.run) + self.thread.daemon = True + self.thread.start() + self.worker.waitForServer() + self.register() + + def register(self): + self.worker.registerFunction("merger:merge") + self.worker.registerFunction("merger:update") + + def stop(self): + self.log.debug("Stopping") + self._running = False + self.worker.shutdown() + self.log.debug("Stopped") + + def join(self): + self.thread.join() + + def run(self): + self.log.debug("Starting merge listener") + while self._running: + try: + job = self.worker.getJob() + try: + if job.name == 'merger:merge': + self.merge(job) + elif job.name == 'merger:update': + self.update(job) + else: + self.log.error("Unable to handle job %s" % job.name) + job.sendWorkFail() + except Exception: + self.log.exception("Exception while running job") + job.sendWorkException(traceback.format_exc()) + except Exception: + self.log.exception("Exception while getting job") + + def merge(self, job): + args = json.loads(job.arguments) + commit = self.merger.mergeChanges(args['items']) + result = dict(merged=(commit is not None), + commit=commit, + zuul_url=self.zuul_url) + job.sendWorkComplete(json.dumps(result)) + + def update(self, job): + args = json.loads(job.arguments) + self.merger.updateRepo(args['project'], args['url']) + result = dict(updated=True, + zuul_url=self.zuul_url) + job.sendWorkComplete(json.dumps(result)) diff --git a/zuul/model.py b/zuul/model.py index 5da9cef640..2a52306f8c 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -633,6 +633,11 @@ class Build(object): class BuildSet(object): + # Merge states: + NEW = 1 + PENDING = 2 + COMPLETE = 3 + def __init__(self, item): self.item = item self.other_changes = [] @@ -642,9 +647,11 @@ class BuildSet(object): self.previous_build_set = None self.ref = None self.commit = None + self.zuul_url = None self.unable_to_merge = False self.unable_to_merge_message = None self.failing_reasons = [] + self.merge_state = self.NEW def setConfiguration(self): # The change isn't enqueued until after it's created diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 805d334afd..eaa5eae0b3 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -30,7 +30,6 @@ import yaml import layoutvalidator import model from model import ActionReporter, Pipeline, Project, ChangeQueue, EventFilter -import merger from zuul import version as zuul_version statsd = extras.try_import('statsd.statsd') @@ -136,6 +135,24 @@ class BuildCompletedEvent(ResultEvent): self.build = build +class MergeCompletedEvent(ResultEvent): + """A remote merge operation has completed + + :arg BuildSet build_set: The build_set which is ready. + :arg str zuul_url: The URL of the Zuul Merger. + :arg bool merged: Whether the merge succeeded (changes with refs). + :arg bool updated: Whether the repo was updated (changes without refs). + :arg str commit: The SHA of the merged commit (changes with refs). + """ + + def __init__(self, build_set, zuul_url, merged, updated, commit): + self.build_set = build_set + self.zuul_url = zuul_url + self.merged = merged + self.updated = updated + self.commit = commit + + class Scheduler(threading.Thread): log = logging.getLogger("zuul.Scheduler") @@ -149,6 +166,7 @@ class Scheduler(threading.Thread): self._exit = False self._stopped = False self.launcher = None + self.merger = None self.triggers = dict() self.reporters = dict() self.config = None @@ -379,36 +397,12 @@ class Scheduler(threading.Thread): return layout - def _setupMerger(self): - if self.config.has_option('zuul', 'git_dir'): - merge_root = self.config.get('zuul', 'git_dir') - else: - merge_root = '/var/lib/zuul/git' - - if self.config.has_option('zuul', 'git_user_email'): - merge_email = self.config.get('zuul', 'git_user_email') - else: - merge_email = None - - if self.config.has_option('zuul', 'git_user_name'): - merge_name = self.config.get('zuul', 'git_user_name') - else: - merge_name = None - - if self.config.has_option('gerrit', 'sshkey'): - sshkey = self.config.get('gerrit', 'sshkey') - else: - sshkey = None - - # TODO: The merger should have an upstream repo independent of - # triggers, and then each trigger should provide a fetch - # location. - self.merger = merger.Merger(merge_root, sshkey, - merge_email, merge_name) - def setLauncher(self, launcher): self.launcher = launcher + def setMerger(self, merger): + self.merger = merger + def registerTrigger(self, trigger, name=None): if name is None: name = trigger.name @@ -468,6 +462,14 @@ class Scheduler(threading.Thread): self.wake_event.set() self.log.debug("Done adding complete event for build: %s" % build) + def onMergeCompleted(self, build_set, zuul_url, merged, updated, commit): + self.log.debug("Adding merge complete event for build set: %s" % + build_set) + event = MergeCompletedEvent(build_set, zuul_url, + merged, updated, commit) + self.result_event_queue.put(event) + self.wake_event.set() + def reconfigure(self, config): self.log.debug("Prepare to reconfigure") event = ReconfigureEvent(config) @@ -594,7 +596,6 @@ class Scheduler(threading.Thread): new_pipeline.manager.building_jobs = \ old_pipeline.manager.building_jobs self.layout = layout - self._setupMerger() for trigger in self.triggers.values(): trigger.postConfig() if statsd: @@ -651,6 +652,8 @@ class Scheduler(threading.Thread): def _areAllBuildsComplete(self): self.log.debug("Checking if all builds are complete") waiting = False + if self.merger.areMergesOutstanding(): + waiting = True for pipeline in self.layout.pipelines.values(): for build in pipeline.manager.building_jobs.keys(): self.log.debug("%s waiting on %s" % (pipeline.manager, build)) @@ -672,6 +675,7 @@ class Scheduler(threading.Thread): self.wake_event.wait() self.wake_event.clear() if self._stopped: + self.log.debug("Run handler stopping") return self.log.debug("Run handler awake") self.run_handler_lock.acquire() @@ -728,17 +732,6 @@ class Scheduler(threading.Thread): self.log.warning("Project %s not found" % event.project_name) return - # Preprocessing for ref-update events - if event.ref: - # Make sure the local git repo is up-to-date with the - # remote one. We better have the new ref before - # enqueuing the changes. This is done before - # enqueuing the changes to avoid calling an update per - # pipeline accepting the change. - self.log.info("Fetching references for %s" % project) - url = self.triggers['gerrit'].getGitUrl(project) - self.merger.updateRepo(project.name, url) - for pipeline in self.layout.pipelines.values(): change = event.getChange(project, self.triggers.get(event.trigger_name)) @@ -776,24 +769,50 @@ class Scheduler(threading.Thread): self._doBuildStartedEvent(event) elif isinstance(event, BuildCompletedEvent): self._doBuildCompletedEvent(event) + elif isinstance(event, MergeCompletedEvent): + self._doMergeCompletedEvent(event) else: self.log.error("Unable to handle event %s" % event) finally: self.result_event_queue.task_done() def _doBuildStartedEvent(self, event): - for pipeline in self.layout.pipelines.values(): - if pipeline.manager.onBuildStarted(event.build): - return - self.log.warning("Build %s not found by any queue manager" % - (event.build)) + build = event.build + if build.build_set is not build.build_set.item.current_build_set: + self.log.warning("Build %s is not in the current build set" % + (build,)) + return + pipeline = build.build_set.item.pipeline + if not pipeline: + self.log.warning("Build %s is not associated with a pipeline" % + (build,)) + return + pipeline.manager.onBuildStarted(event.build) def _doBuildCompletedEvent(self, event): - for pipeline in self.layout.pipelines.values(): - if pipeline.manager.onBuildCompleted(event.build): - return - self.log.warning("Build %s not found by any queue manager" % - (event.build)) + build = event.build + if build.build_set is not build.build_set.item.current_build_set: + self.log.warning("Build %s is not in the current build set" % + (build,)) + return + pipeline = build.build_set.item.pipeline + if not pipeline: + self.log.warning("Build %s is not associated with a pipeline" % + (build,)) + return + pipeline.manager.onBuildCompleted(event.build) + + def _doMergeCompletedEvent(self, event): + build_set = event.build_set + if build_set is not build_set.item.current_build_set: + self.log.warning("Build set %s is not current" % (build_set,)) + return + pipeline = build_set.item.pipeline + if not pipeline: + self.log.warning("Build set %s is not associated with a pipeline" % + (build_set,)) + return + pipeline.manager.onMergeCompleted(event) def formatStatusHTML(self): ret = '
'
@@ -1083,7 +1102,7 @@ class BasePipelineManager(object):
         # Create a dictionary with all info about the item needed by
         # the merger.
         return dict(project=item.change.project.name,
-                    url=self.sched.triggers['gerrit'].getGitUrl(
+                    url=self.pipeline.trigger.getGitUrl(
                         item.change.project),
                     merge_mode=item.change.project.merge_mode,
                     refspec=item.change.refspec,
@@ -1092,26 +1111,28 @@ class BasePipelineManager(object):
                     )
 
     def prepareRef(self, item):
-        # Returns False on success.
-        # Returns True if we were unable to prepare the ref.
-        ref = item.current_build_set.ref
+        # Returns True if the ref is ready, false otherwise
+        build_set = item.current_build_set
+        if build_set.merge_state == build_set.COMPLETE:
+            return True
+        if build_set.merge_state == build_set.PENDING:
+            return False
+        build_set.merge_state = build_set.PENDING
+        ref = build_set.ref
         if hasattr(item.change, 'refspec') and not ref:
             self.log.debug("Preparing ref for: %s" % item.change)
             item.current_build_set.setConfiguration()
-            ref = item.current_build_set.ref
             dependent_items = self.getDependentItems(item)
             dependent_items.reverse()
             all_items = dependent_items + [item]
             merger_items = map(self._makeMergerItem, all_items)
-            commit = self.sched.merger.mergeChanges(merger_items)
-            item.current_build_set.commit = commit
-            if not commit:
-                self.log.info("Unable to merge change %s" % item.change)
-                msg = ("This change was unable to be automatically merged "
-                       "with the current state of the repository. Please "
-                       "rebase your change and upload a new patchset.")
-                self.pipeline.setUnableToMerge(item, msg)
-                return True
+            self.sched.merger.mergeChanges(merger_items,
+                                           item.current_build_set)
+        else:
+            self.log.debug("Preparing update repo for: %s" % item.change)
+            url = self.pipeline.trigger.getGitUrl(item.change.project)
+            self.sched.merger.updateRepo(item.change.project.name,
+                                         url, build_set)
         return False
 
     def _launchJobs(self, item, jobs):
@@ -1164,7 +1185,7 @@ class BasePipelineManager(object):
                 canceled = True
         return canceled
 
-    def _processOneItem(self, item, nnfi):
+    def _processOneItem(self, item, nnfi, ready_ahead):
         changed = False
         item_ahead = item.item_ahead
         change_queue = self.pipeline.getQueue(item.change.project)
@@ -1181,10 +1202,11 @@ class BasePipelineManager(object):
                 self.reportItem(item)
             except MergeFailure:
                 pass
-            return (True, nnfi)
+            return (True, nnfi, ready_ahead)
         dep_item = self.getFailingDependentItem(item)
         actionable = change_queue.isActionable(item)
         item.active = actionable
+        ready = False
         if dep_item:
             failing_reasons.append('a needed change is failing')
             self.cancelJobs(item, prime=False)
@@ -1204,10 +1226,13 @@ class BasePipelineManager(object):
                 changed = True
                 self.cancelJobs(item)
             if actionable:
-                self.prepareRef(item)
+                ready = self.prepareRef(item)
                 if item.current_build_set.unable_to_merge:
                     failing_reasons.append("it has a merge conflict")
-        if actionable and self.launchJobs(item):
+                    ready = False
+        if not ready:
+            ready_ahead = False
+        if actionable and ready_ahead and self.launchJobs(item):
             changed = True
         if self.pipeline.didAnyJobFail(item):
             failing_reasons.append("at least one job failed")
@@ -1229,7 +1254,7 @@ class BasePipelineManager(object):
         if failing_reasons:
             self.log.debug("%s is a failing item because %s" %
                            (item, failing_reasons))
-        return (changed, nnfi)
+        return (changed, nnfi, ready_ahead)
 
     def processQueue(self):
         # Do whatever needs to be done for each change in the queue
@@ -1238,8 +1263,10 @@ class BasePipelineManager(object):
         for queue in self.pipeline.queues:
             queue_changed = False
             nnfi = None  # Nearest non-failing item
+            ready_ahead = True  # All build sets ahead are ready
             for item in queue.queue[:]:
-                item_changed, nnfi = self._processOneItem(item, nnfi)
+                item_changed, nnfi, ready_ahhead = self._processOneItem(
+                    item, nnfi, ready_ahead)
                 if item_changed:
                     queue_changed = True
                 self.reportStats(item)
@@ -1294,6 +1321,22 @@ class BasePipelineManager(object):
         self.updateBuildDescriptions(build.build_set)
         return True
 
+    def onMergeCompleted(self, event):
+        build_set = event.build_set
+        item = build_set.item
+        build_set.merge_state = build_set.COMPLETE
+        build_set.zuul_url = event.zuul_url
+        if event.merged:
+            build_set.commit = event.commit
+        elif event.updated:
+            build_set.commit = item.change.newrev
+        if not build_set.commit:
+            self.log.info("Unable to merge change %s" % item.change)
+            msg = ("This change was unable to be automatically merged "
+                   "with the current state of the repository. Please "
+                   "rebase your change and upload a new patchset.")
+            self.pipeline.setUnableToMerge(item, msg)
+
     def reportItem(self, item):
         if item.reported:
             raise Exception("Already reported change %s" % item.change)
diff --git a/zuul/trigger/timer.py b/zuul/trigger/timer.py
index f055a5032f..904fa7a087 100644
--- a/zuul/trigger/timer.py
+++ b/zuul/trigger/timer.py
@@ -86,7 +86,8 @@ class Timer(object):
         raise Exception("Timer trigger does not support changes.")
 
     def getGitUrl(self, project):
-        pass
+        # For the moment, the timer trigger requires gerrit.
+        return self.sched.triggers['gerrit'].getGitUrl(project)
 
     def getGitwebUrl(self, project, sha=None):
         url = '%s/gitweb?p=%s.git' % (self.baseurl, project)