diff --git a/.testr.conf b/.testr.conf index 5433c070e3..222ce97160 100644 --- a/.testr.conf +++ b/.testr.conf @@ -1,4 +1,4 @@ [DEFAULT] -test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION +test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} OS_LOG_DEFAULTS=${OS_LOG_DEFAULTS:-""} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION test_id_option=--load-list $IDFILE test_list_option=--list diff --git a/doc/source/index.rst b/doc/source/index.rst index 61f9e4f579..3c793dac04 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -13,6 +13,7 @@ Contents: .. toctree:: :maxdepth: 2 + quick-start gating connections triggers diff --git a/doc/source/launchers.rst b/doc/source/launchers.rst index c61cea8724..f368cb9420 100644 --- a/doc/source/launchers.rst +++ b/doc/source/launchers.rst @@ -6,7 +6,7 @@ https://wiki.jenkins-ci.org/display/JENKINS/Gearman+Plugin .. _`Turbo-Hipster`: - http://git.openstack.org/cgit/stackforge/turbo-hipster/ + https://git.openstack.org/cgit/openstack/turbo-hipster/ .. _`Turbo-Hipster Documentation`: http://turbo-hipster.rtfd.org/ diff --git a/doc/source/merger.rst b/doc/source/merger.rst index e01bc8c7df..82e204b2cc 100644 --- a/doc/source/merger.rst +++ b/doc/source/merger.rst @@ -58,3 +58,17 @@ instance, a clone will produce a repository in an unpredictable state depending on what the state of Zuul's repository is when the clone happens). They are, however, suitable for automated systems that respond to Zuul triggers. + +Clearing old references +~~~~~~~~~~~~~~~~~~~~~~~ + +The references created under refs/zuul are not garbage collected. Since +git fetch sends them all to Gerrit to sync the repositories, the time +spent on merge will slightly grow over time and start being noticeable. 
+ +To clean them you can use the ``tools/zuul-clear-refs.py`` script on +each repository. It will delete Zuul references that point to commits +for which the commit date is older than a given number of days (default +360):: + + ./tools/zuul-clear-refs.py /path/to/zuul/git/repo diff --git a/doc/source/quick-start.rst b/doc/source/quick-start.rst new file mode 100644 index 0000000000..82779c6703 --- /dev/null +++ b/doc/source/quick-start.rst @@ -0,0 +1,162 @@ +Quick Start Guide +================= + +System Requirements +------------------- + +For most deployments zuul only needs 1-2GB. OpenStack uses a 30GB setup. + +Install Zuul +------------ + +You can get zuul from pypi via:: + + pip install zuul + +Zuul Components +--------------- + +Zuul provides the following components: + + - **zuul-server**: scheduler daemon which communicates with Gerrit and + Gearman. Handles receiving events, launching jobs, collecting results + and posting reports. + - **zuul-merger**: speculative-merger which communicates with Gearman. + Prepares Git repositories for jobs to test against. This additionally + requires a web server hosting the Git repositories which can be cloned + by the jobs. + - **zuul-cloner**: client side script used to set up the job workspace. It is + used to clone the repositories prepared by the zuul-merger described + previously. + - **gearmand**: optional builtin gearman daemon provided by zuul-server + +External components: + + - Jenkins Gearman plugin: Used by Jenkins to connect to Gearman + +Zuul Communication +------------------ + +All the Zuul components communicate with each other using Gearman. 
As well as +the following communication channels: + +zuul-server: + + - Gerrit + - Gearman Daemon + +zuul-merger: + + - Gerrit + - Gearman Daemon + +zuul-cloner: + + - http hosted zuul-merger git repos + +Jenkins: + + - Gearman Daemon via Jenkins Gearman Plugin + +Zuul Setup +---------- + +At minimum we need to provide **zuul.conf** and **layout.yaml** and placed +in /etc/zuul/ directory. You will also need a zuul user and ssh key for the +zuul user in Gerrit. The following example uses the builtin gearmand service +in zuul. + +**zuul.conf**:: + + [zuul] + layout_config=/etc/zuul/layout.yaml + + [merger] + git_dir=/git + zuul_url=http://zuul.example.com/p + + [gearman_server] + start=true + + [gearman] + server=127.0.0.1 + + [connection gerrit] + driver=gerrit + server=git.example.com + port=29418 + baseurl=https://git.example.com/gerrit/ + user=zuul + sshkey=/home/zuul/.ssh/id_rsa + +See :doc:`zuul` for more details. + +The following sets up a basic timer triggered job using zuul. + +**layout.yaml**:: + + pipelines: + - name: periodic + source: gerrit + manager: IndependentPipelineManager + trigger: + timer: + - time: '0 * * * *' + + projects: + - name: aproject + periodic: + - aproject-periodic-build + +Starting Zuul +------------- + +You can run zuul-server with the **-d** option to make it not daemonize. It's +a good idea at first to confirm there's no issues with your configuration. + +Simply run:: + + zuul-server + +Once run you should have 2 zuul-server processes:: + + zuul 12102 1 0 Jan21 ? 00:15:45 /home/zuul/zuulvenv/bin/python /home/zuul/zuulvenv/bin/zuul-server -d + zuul 12107 12102 0 Jan21 ? 00:00:01 /home/zuul/zuulvenv/bin/python /home/zuul/zuulvenv/bin/zuul-server -d + +Note: In this example zuul was installed in a virtualenv. + +The 2nd zuul-server process is gearmand running if you are using the builtin +gearmand server, otherwise there will only be 1 process. 
+ +Zuul won't actually process your Job queue however unless you also have a +zuul-merger process running. + +Simply run:: + + zuul-merger + +Zuul should now be able to process your periodic job as configured above once +the Jenkins side of things is configured. + +Jenkins Setup +------------- + +Install the Jenkins Gearman Plugin via the Jenkins Plugin management interface. +Then navigate to **Manage > Configuration > Gearman** and set up the Jenkins +server hostname/ip and port to connect to gearman. + +At this point gearman should be running your Jenkins jobs. + +Troubleshooting +--------------- + +Checking Gearman function registration (jobs). You can use telnet to connect +to gearman to check that Jenkins is registering your configured jobs in +gearman:: + + telnet <gearman_ip> 4730 + +Useful commands are **workers** and **status** which you can run by just +typing those commands once connected to gearman. Every job in your Jenkins +master must appear when you run **workers** for Zuul to be able to run jobs +against your Jenkins instance. diff --git a/doc/source/statsd.rst b/doc/source/statsd.rst index f789d612a1..b3bf99f329 100644 --- a/doc/source/statsd.rst +++ b/doc/source/statsd.rst @@ -31,7 +31,7 @@ Metrics The metrics are emitted by the Zuul scheduler (`zuul/scheduler.py`): -**gerrit.events. (counters)** +**gerrit.event. (counters)** Gerrit emits different kind of message over its `stream-events` interface. As a convenience, Zuul emits metrics to statsd which save you from having to use a different daemon to measure Gerrit events. @@ -52,6 +52,18 @@ The metrics are emitted by the Zuul scheduler (`zuul/scheduler.py`): Refer to your Gerrit installation documentation for an exhaustive list of Gerrit event types. +**zuul.node_type.** + Holds metrics specific to build nodes per label. The hierarchy is: + + #. **** each of the labels associated to a build in + Jenkins. It contains: + + #. **job.** subtree detailing per job statistics: + + #. 
**wait_time** counter and timer of the wait time, with the + difference of the job start time and the launch time, in + milliseconds. + **zuul.pipeline.** Holds metrics specific to jobs. The hierarchy is: @@ -75,10 +87,13 @@ The metrics are emitted by the Zuul scheduler (`zuul/scheduler.py`): known by Zuul (which includes build time and Zuul overhead). #. **total_changes** counter of the number of change proceeding since Zuul started. + #. **wait_time** counter and timer of the wait time, with the difference + of the job start time and the launch time, in milliseconds. Additionally, the `zuul.pipeline.` hierarchy contains - `current_changes` and `resident_time` metrics for each projects. The slash - separator used in Gerrit name being replaced by dots. + `current_changes` (gauge), `resident_time` (timing) and `total_changes` + (counter) metrics for each project. The slash separator used in Gerrit name + being replaced by dots. As an example, given a job named `myjob` triggered by the `gate` pipeline which took 40 seconds to build, the Zuul scheduler will emit the following diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst index d8d72e69ce..2285ecbb66 100644 --- a/doc/source/zuul.rst +++ b/doc/source/zuul.rst @@ -10,11 +10,11 @@ Zuul has three configuration files: **zuul.conf** Connection information for Gerrit and Gearman, locations of the - other config files. + other config files. (required) **layout.yaml** - Project and pipeline configuration -- what Zuul does. + Project and pipeline configuration -- what Zuul does. (required) **logging.conf** - Python logging config. + Python logging config. (optional) Examples of each of the three files can be found in the etc/ directory of the source distribution. @@ -41,17 +41,28 @@ You can also find an example zuul.conf file in the git gearman """"""" +Client connection information for gearman. If using Zuul's builtin gearmand +server just set **server** to 127.0.0.1. 
+ +**server** Hostname or IP address of the Gearman server. - ``server=gearman.example.com`` + ``server=gearman.example.com`` (required) **port** Port on which the Gearman server is listening. - ``port=4730`` + ``port=4730`` (optional) + +**check_job_registration** + Check to see if job is registered with Gearman or not. When True + a build result of NOT_REGISTERED will be returned if job is not found. + ``check_job_registration=True`` gearman_server """""""""""""" +The builtin gearman server. Zuul can fork a gearman process from itself rather +than connecting to an external one. + **start** Whether to start the internal Gearman server (default: False). ``start=true`` @@ -64,9 +75,25 @@ gearman_server Path to log config file for internal Gearman server. ``log_config=/etc/zuul/gearman-logging.yaml`` +webapp +"""""" + +**listen_address** + IP address or domain name on which to listen (default: 0.0.0.0). + ``listen_address=127.0.0.1`` + +**port** + Port on which the webapp is listening (default: 8001). + ``port=8008`` + zuul """" +Zuul's main configuration section. At minimum zuul must be able to find +layout.yaml to be useful. + +.. note:: Must be provided when running zuul-server + .. _layout_config: **layout_config** @@ -118,6 +145,13 @@ zuul merger """""" +The zuul-merger process configuration. Detailed documentation on this process +can be found on the :doc:`merger` page. + +.. note:: Must be provided when running zuul-merger. Both services may share the + same configuration (and even host) or otherwise have an individual + zuul.conf. + **git_dir** Directory that Zuul should clone local git repositories to. ``git_dir=/var/lib/zuul/git`` @@ -394,11 +428,12 @@ explanation of each of the parameters:: approval matching all specified requirements. *username* - If present, an approval from this username is required. + If present, an approval from this username is required. It is + treated as a regular expression. 
*email* If present, an approval with this email address is required. It - is treated as a regular expression as above. + is treated as a regular expression. *email-filter* (deprecated) A deprecated alternate spelling of *email*. Only one of *email* or @@ -759,7 +794,10 @@ each job as it builds a list from the project specification. expressions. The pattern for '/COMMIT_MSG' is always matched on and does not - have to be included. + have to be included. Exception is merge commits (without modified + files), in this case '/COMMIT_MSG' is not matched, and job is not + skipped. In case of merge commits it's assumed that the list of modified + files isn't predictable and CI should be run. **voting (optional)** Boolean value (``true`` or ``false``) that indicates whatever @@ -997,9 +1035,8 @@ normal operation, omit ``-d`` and let Zuul run as a daemon. If you send signal 1 (SIGHUP) to the zuul-server process, Zuul will stop executing new jobs, wait until all executing jobs are finished, -reload its configuration, and resume. Any values in any of the -configuration files may be changed, except the location of Zuul's PID -file (a change to that will be ignored until Zuul is restarted). +reload its layout.yaml, and resume. Changes to any connections or +the PID file will be ignored until Zuul is restarted. If you send a SIGUSR1 to the zuul-server process, Zuul will stop executing new jobs, wait until all executing jobs are finished, diff --git a/etc/status/public_html/jquery.zuul.js b/etc/status/public_html/jquery.zuul.js index c63700a5fe..9df44cee7a 100644 --- a/etc/status/public_html/jquery.zuul.js +++ b/etc/status/public_html/jquery.zuul.js @@ -490,10 +490,12 @@ $header_div.append($heading); if (typeof pipeline.description === 'string') { + var descr = $('') + $.each( pipeline.description.split(/\r?\n\r?\n/), function(index, descr_part){ + descr.append($('

').text(descr_part)); + }); $header_div.append( - $('

').append( - $('').text(pipeline.description) - ) + $('

').append(descr) ); } return $header_div; diff --git a/etc/zuul.conf-sample b/etc/zuul.conf-sample index 21c1317d6d..d7b8eaeb55 100644 --- a/etc/zuul.conf-sample +++ b/etc/zuul.conf-sample @@ -26,6 +26,10 @@ default_container=logs region_name=EXP logserver_prefix=http://logs.example.org/server.app/ +[webapp] +listen_address=0.0.0.0 +port=8001 + [connection gerrit] driver=gerrit server=review.example.com diff --git a/other-requirements.txt b/other-requirements.txt new file mode 100644 index 0000000000..1ade6557cd --- /dev/null +++ b/other-requirements.txt @@ -0,0 +1,4 @@ +mysql-client [test] +mysql-server [test] +postgresql [test] +postgresql-client [test] diff --git a/requirements.txt b/requirements.txt index 84b9008d20..ec0b76ac48 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ pbr>=1.1.0 PyYAML>=3.1.0 Paste WebOb>=1.2.3 -paramiko>=1.8.0 +paramiko>=1.8.0,<2.0.0 GitPython>=0.3.3 ordereddict python-daemon>=2.0.4,<2.1.0 diff --git a/setup.cfg b/setup.cfg index 620e1ac5c8..7ddeb84be2 100644 --- a/setup.cfg +++ b/setup.cfg @@ -25,6 +25,7 @@ console_scripts = zuul-merger = zuul.cmd.merger:main zuul = zuul.cmd.client:main zuul-cloner = zuul.cmd.cloner:main + zuul-launcher = zuul.cmd.launcher:main [build_sphinx] source-dir = doc/source diff --git a/tests/base.py b/tests/base.py index d888904511..a3c9242fec 100755 --- a/tests/base.py +++ b/tests/base.py @@ -22,10 +22,12 @@ import logging import os import pprint from six.moves import queue as Queue +from six.moves import urllib import random import re import select import shutil +from six.moves import reload_module import socket import string import subprocess @@ -33,12 +35,10 @@ import swiftclient import tempfile import threading import time -import urllib2 import git import gear import fixtures -import six.moves.urllib.parse as urlparse import statsd import testtools from git import GitCommandError @@ -482,7 +482,7 @@ class FakeURLOpener(object): self.url = url def read(self): - res = 
urlparse.urlparse(self.url) + res = urllib.parse.urlparse(self.url) path = res.path project = '/'.join(path.split('/')[2:-2]) ret = '001e# service=git-upload-pack\n' @@ -882,6 +882,28 @@ class BaseTestCase(testtools.TestCase): format='%(asctime)s %(name)-32s ' '%(levelname)-8s %(message)s')) + # NOTE(notmorgan): Extract logging overrides for specific libraries + # from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for + # each. This is used to limit the output during test runs from + # libraries that zuul depends on such as gear. + log_defaults_from_env = os.environ.get('OS_LOG_DEFAULTS') + + if log_defaults_from_env: + for default in log_defaults_from_env.split(','): + try: + name, level_str = default.split('=', 1) + level = getattr(logging, level_str, logging.DEBUG) + self.useFixture(fixtures.FakeLogger( + name=name, + level=level, + format='%(asctime)s %(name)-32s ' + '%(levelname)-8s %(message)s')) + except ValueError: + # NOTE(notmorgan): Invalid format of the log default, + # skip and don't try and apply a logger for the + # specified module + pass + class ZuulTestCase(BaseTestCase): config_file = 'zuul.conf' @@ -897,11 +919,13 @@ class ZuulTestCase(BaseTestCase): self.test_root = os.path.join(tmp_root, "zuul-test") self.upstream_root = os.path.join(self.test_root, "upstream") self.git_root = os.path.join(self.test_root, "git") + self.state_root = os.path.join(self.test_root, "lib") if os.path.exists(self.test_root): shutil.rmtree(self.test_root) os.makedirs(self.test_root) os.makedirs(self.upstream_root) + os.makedirs(self.state_root) # Make per test copy of Configuration. 
self.setup_config() @@ -909,6 +933,7 @@ class ZuulTestCase(BaseTestCase): os.path.join(FIXTURE_DIR, self.config.get('zuul', 'tenant_config'))) self.config.set('merger', 'git_dir', self.git_root) + self.config.set('zuul', 'state_dir', self.state_root) # For each project in config: self.init_repo("org/project") @@ -937,8 +962,8 @@ class ZuulTestCase(BaseTestCase): os.environ['STATSD_PORT'] = str(self.statsd.port) self.statsd.start() # the statsd client object is configured in the statsd module import - reload(statsd) - reload(zuul.scheduler) + reload_module(statsd) + reload_module(zuul.scheduler) self.gearman_server = FakeGearmanServer() @@ -967,12 +992,12 @@ class ZuulTestCase(BaseTestCase): self.ansible_server.start() def URLOpenerFactory(*args, **kw): - if isinstance(args[0], urllib2.Request): + if isinstance(args[0], urllib.request.Request): return old_urlopen(*args, **kw) return FakeURLOpener(self.upstream_root, *args, **kw) - old_urlopen = urllib2.urlopen - urllib2.urlopen = URLOpenerFactory + old_urlopen = urllib.request.urlopen + urllib.request.urlopen = URLOpenerFactory self.launcher = zuul.launcher.client.LaunchClient( self.config, self.sched, self.swift) @@ -982,7 +1007,8 @@ class ZuulTestCase(BaseTestCase): self.sched.setLauncher(self.launcher) self.sched.setMerger(self.merge_client) - self.webapp = zuul.webapp.WebApp(self.sched, port=0) + self.webapp = zuul.webapp.WebApp( + self.sched, port=0, listen_address='127.0.0.1') self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched) self.sched.start() @@ -1179,6 +1205,17 @@ class ZuulTestCase(BaseTestCase): zuul.merger.merger.reset_repo_to_head(repo) repo.git.clean('-x', '-f', '-d') + def create_commit(self, project): + path = os.path.join(self.upstream_root, project) + repo = git.Repo(path) + repo.head.reference = repo.heads['master'] + file_name = os.path.join(path, 'README') + with open(file_name, 'a') as f: + f.write('creating fake commit\n') + repo.index.add([file_name]) + commit = 
repo.index.commit('Creating a fake commit') + return commit.hexsha + def ref_has_change(self, ref, change): path = os.path.join(self.git_root, change.project) repo = git.Repo(path) @@ -1325,9 +1362,11 @@ class ZuulTestCase(BaseTestCase): start = time.time() while True: if time.time() - start > 10: - print 'queue status:', - print ' '.join(self.eventQueuesEmpty()) - print self.areAllBuildsWaiting() + self.log.debug("Queue status:") + for queue in self.event_queues: + self.log.debug(" %s: %s" % (queue, queue.empty())) + self.log.debug("All builds waiting: %s" % + (self.areAllBuildsWaiting(),)) raise Exception("Timeout waiting for Zuul to settle") # Make sure no new events show up while we're checking # have all build states propogated to zuul? @@ -1369,8 +1408,8 @@ class ZuulTestCase(BaseTestCase): for pipeline in tenant.layout.pipelines.values(): for queue in pipeline.queues: if len(queue.queue) != 0: - print 'pipeline %s queue %s contents %s' % ( - pipeline.name, queue.name, queue.queue) + print('pipeline %s queue %s contents %s' % ( + pipeline.name, queue.name, queue.queue)) self.assertEqual(len(queue.queue), 0, "Pipelines queues should be empty") diff --git a/tests/fixtures/layout-requirement-username.yaml b/tests/fixtures/layout-requirement-username.yaml index 7a549f04b6..f9e647752d 100644 --- a/tests/fixtures/layout-requirement-username.yaml +++ b/tests/fixtures/layout-requirement-username.yaml @@ -3,7 +3,7 @@ pipelines: manager: IndependentPipelineManager require: approval: - - username: jenkins + - username: ^(jenkins|zuul)$ trigger: gerrit: - event: comment-added diff --git a/tests/fixtures/layout-success-pattern.yaml b/tests/fixtures/layout-success-pattern.yaml new file mode 100644 index 0000000000..cea15f123a --- /dev/null +++ b/tests/fixtures/layout-success-pattern.yaml @@ -0,0 +1,21 @@ +pipelines: + - name: check + manager: IndependentPipelineManager + trigger: + gerrit: + - event: patchset-created + success: + smtp: + to: me@example.org + +jobs: + - 
name: docs-draft-test + success-pattern: http://docs-draft.example.org/{build.parameters[LOG_PATH]}/publish-docs/ + - name: docs-draft-test2 + success-pattern: http://docs-draft.example.org/{NOPE}/{build.parameters[BAD]}/publish-docs/ + +projects: + - name: org/docs + check: + - docs-draft-test: + - docs-draft-test2 diff --git a/tests/test_change_matcher.py b/tests/test_change_matcher.py index 1f4ab93d61..05853223a5 100644 --- a/tests/test_change_matcher.py +++ b/tests/test_change_matcher.py @@ -123,13 +123,13 @@ class TestMatchAllFiles(BaseTestMatcher): self._test_matches(False) def test_matches_returns_false_when_not_all_files_match(self): - self._test_matches(False, files=['docs/foo', 'foo/bar']) + self._test_matches(False, files=['/COMMIT_MSG', 'docs/foo', 'foo/bar']) - def test_matches_returns_true_when_commit_message_matches(self): - self._test_matches(True, files=['/COMMIT_MSG']) + def test_matches_returns_false_when_commit_message_matches(self): + self._test_matches(False, files=['/COMMIT_MSG']) def test_matches_returns_true_when_all_files_match(self): - self._test_matches(True, files=['docs/foo']) + self._test_matches(True, files=['/COMMIT_MSG', 'docs/foo']) class TestMatchAll(BaseTestMatcher): diff --git a/tests/test_cloner.py b/tests/test_cloner.py index 82d1812ea2..67b5303a38 100644 --- a/tests/test_cloner.py +++ b/tests/test_cloner.py @@ -568,3 +568,57 @@ class TestCloner(ZuulTestCase): self.worker.hold_jobs_in_build = False self.worker.release() self.waitUntilSettled() + + def test_post_checkout(self): + project = "org/project" + path = os.path.join(self.upstream_root, project) + repo = git.Repo(path) + repo.head.reference = repo.heads['master'] + commits = [] + for i in range(0, 3): + commits.append(self.create_commit(project)) + newRev = commits[1] + + cloner = zuul.lib.cloner.Cloner( + git_base_url=self.upstream_root, + projects=[project], + workspace=self.workspace_root, + zuul_branch=None, + zuul_ref='master', + zuul_url=self.git_root, + 
zuul_project=project, + zuul_newrev=newRev, + ) + cloner.execute() + repos = self.getWorkspaceRepos([project]) + cloned_sha = repos[project].rev_parse('HEAD').hexsha + self.assertEqual(newRev, cloned_sha) + + def test_post_and_master_checkout(self): + project = "org/project1" + master_project = "org/project2" + path = os.path.join(self.upstream_root, project) + repo = git.Repo(path) + repo.head.reference = repo.heads['master'] + commits = [] + for i in range(0, 3): + commits.append(self.create_commit(project)) + newRev = commits[1] + + cloner = zuul.lib.cloner.Cloner( + git_base_url=self.upstream_root, + projects=[project, master_project], + workspace=self.workspace_root, + zuul_branch=None, + zuul_ref='master', + zuul_url=self.git_root, + zuul_project=project, + zuul_newrev=newRev + ) + cloner.execute() + repos = self.getWorkspaceRepos([project, master_project]) + cloned_sha = repos[project].rev_parse('HEAD').hexsha + self.assertEqual(newRev, cloned_sha) + self.assertEqual( + repos[master_project].rev_parse('HEAD').hexsha, + repos[master_project].rev_parse('master').hexsha) diff --git a/tests/test_layoutvalidator.py b/tests/test_layoutvalidator.py index bd507d19a0..38c8e29a6a 100644 --- a/tests/test_layoutvalidator.py +++ b/tests/test_layoutvalidator.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -import ConfigParser +from six.moves import configparser as ConfigParser import os import re @@ -36,13 +36,13 @@ class TestLayoutValidator(testtools.TestCase): def test_layouts(self): """Test layout file validation""" - print + print() errors = [] for fn in os.listdir(os.path.join(FIXTURE_DIR, 'layouts')): m = LAYOUT_RE.match(fn) if not m: continue - print fn + print(fn) # Load any .conf file by the same name but .conf extension. 
config_file = ("%s.conf" % @@ -72,7 +72,7 @@ class TestLayoutValidator(testtools.TestCase): fn) except voluptuous.Invalid as e: error = str(e) - print ' ', error + print(' ', error) if error in errors: raise Exception("Error has already been tested: %s" % error) diff --git a/tests/test_model.py b/tests/test_model.py index f8f74dc793..145c119185 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -12,6 +12,12 @@ # License for the specific language governing permissions and limitations # under the License. + +import os +import random + +import fixtures + from zuul import model from zuul import configloader @@ -32,12 +38,12 @@ class TestJob(BaseTestCase): def test_change_matches_returns_false_for_matched_skip_if(self): change = model.Change('project') - change.files = ['docs/foo'] + change.files = ['/COMMIT_MSG', 'docs/foo'] self.assertFalse(self.job.changeMatches(change)) def test_change_matches_returns_true_for_unmatched_skip_if(self): change = model.Change('project') - change.files = ['foo'] + change.files = ['/COMMIT_MSG', 'foo'] self.assertTrue(self.job.changeMatches(change)) def test_job_sets_defaults_for_boolean_attributes(self): @@ -98,3 +104,76 @@ class TestJob(BaseTestCase): job = item.getJobs()[0] self.assertEqual(job.name, 'python27') self.assertEqual(job.timeout, 50) + + +class TestJobTimeData(BaseTestCase): + def setUp(self): + super(TestJobTimeData, self).setUp() + self.tmp_root = self.useFixture(fixtures.TempDir( + rootdir=os.environ.get("ZUUL_TEST_ROOT")) + ).path + + def test_empty_timedata(self): + path = os.path.join(self.tmp_root, 'job-name') + self.assertFalse(os.path.exists(path)) + self.assertFalse(os.path.exists(path + '.tmp')) + td = model.JobTimeData(path) + self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + + def test_save_reload(self): + path = os.path.join(self.tmp_root, 
'job-name') + self.assertFalse(os.path.exists(path)) + self.assertFalse(os.path.exists(path + '.tmp')) + td = model.JobTimeData(path) + self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + success_times = [] + failure_times = [] + results = [] + for x in range(10): + success_times.append(int(random.random() * 1000)) + failure_times.append(int(random.random() * 1000)) + results.append(0) + results.append(1) + random.shuffle(results) + s = f = 0 + for result in results: + if result: + td.add(failure_times[f], 'FAILURE') + f += 1 + else: + td.add(success_times[s], 'SUCCESS') + s += 1 + self.assertEqual(td.success_times, success_times) + self.assertEqual(td.failure_times, failure_times) + self.assertEqual(td.results, results[10:]) + td.save() + self.assertTrue(os.path.exists(path)) + self.assertFalse(os.path.exists(path + '.tmp')) + td = model.JobTimeData(path) + td.load() + self.assertEqual(td.success_times, success_times) + self.assertEqual(td.failure_times, failure_times) + self.assertEqual(td.results, results[10:]) + + +class TestTimeDataBase(BaseTestCase): + def setUp(self): + super(TestTimeDataBase, self).setUp() + self.tmp_root = self.useFixture(fixtures.TempDir( + rootdir=os.environ.get("ZUUL_TEST_ROOT")) + ).path + self.db = model.TimeDataBase(self.tmp_root) + + def test_timedatabase(self): + self.assertEqual(self.db.getEstimatedTime('job-name'), 0) + self.db.update('job-name', 50, 'SUCCESS') + self.assertEqual(self.db.getEstimatedTime('job-name'), 50) + self.db.update('job-name', 100, 'SUCCESS') + self.assertEqual(self.db.getEstimatedTime('job-name'), 75) + for x in range(10): + self.db.update('job-name', 100, 'SUCCESS') + self.assertEqual(self.db.getEstimatedTime('job-name'), 100) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 7ae7de5175..df6bc1bcf4 100755 --- a/tests/test_scheduler.py +++ 
b/tests/test_scheduler.py @@ -20,11 +20,10 @@ import os import re import shutil import time -import urllib -import urllib2 import yaml import git +from six.moves import urllib import testtools import zuul.change_matcher @@ -501,6 +500,46 @@ class TestScheduler(ZuulTestCase): self.assertEqual(B.reported, 2) self.assertEqual(C.reported, 2) + def _test_time_database(self, iteration): + self.worker.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + A.addApproval('CRVW', 2) + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.waitUntilSettled() + time.sleep(2) + + data = json.loads(self.sched.formatStatusJSON()) + found_job = None + for pipeline in data['pipelines']: + if pipeline['name'] != 'gate': + continue + for queue in pipeline['change_queues']: + for head in queue['heads']: + for item in head: + for job in item['jobs']: + if job['name'] == 'project-merge': + found_job = job + break + + self.assertIsNotNone(found_job) + if iteration == 1: + self.assertIsNotNone(found_job['estimated_time']) + self.assertIsNone(found_job['remaining_time']) + else: + self.assertIsNotNone(found_job['estimated_time']) + self.assertTrue(found_job['estimated_time'] >= 2) + self.assertIsNotNone(found_job['remaining_time']) + + self.worker.hold_jobs_in_build = False + self.worker.release() + self.waitUntilSettled() + + def test_time_database(self): + "Test the time database" + + self._test_time_database(1) + self._test_time_database(2) + def test_two_failed_changes_at_head(self): "Test that changes are reparented correctly if 2 fail at head" @@ -606,6 +645,36 @@ class TestScheduler(ZuulTestCase): self.assertEqual(B.reported, 2) self.assertEqual(C.reported, 2) + def test_parse_skip_if(self): + job_yaml = """ +jobs: + - name: job_name + skip-if: + - project: ^project_name$ + branch: ^stable/icehouse$ + all-files-match-any: + - ^filename$ + - project: ^project2_name$ + all-files-match-any: + - ^filename2$ + """.strip() + data = 
yaml.load(job_yaml) + config_job = data.get('jobs')[0] + cm = zuul.change_matcher + expected = cm.MatchAny([ + cm.MatchAll([ + cm.ProjectMatcher('^project_name$'), + cm.BranchMatcher('^stable/icehouse$'), + cm.MatchAllFiles([cm.FileMatcher('^filename$')]), + ]), + cm.MatchAll([ + cm.ProjectMatcher('^project2_name$'), + cm.MatchAllFiles([cm.FileMatcher('^filename2$')]), + ]), + ]) + matcher = self.sched._parseSkipIf(config_job) + self.assertEqual(expected, matcher) + def test_patch_order(self): "Test that dependent patches are tested in the right order" A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') @@ -1455,7 +1524,7 @@ class TestScheduler(ZuulTestCase): self.worker.build_history = [] path = os.path.join(self.git_root, "org/project") - print repack_repo(path) + print(repack_repo(path)) A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') A.addApproval('CRVW', 2) @@ -1480,9 +1549,9 @@ class TestScheduler(ZuulTestCase): A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') A.addPatchset(large=True) path = os.path.join(self.upstream_root, "org/project1") - print repack_repo(path) + print(repack_repo(path)) path = os.path.join(self.git_root, "org/project1") - print repack_repo(path) + print(repack_repo(path)) A.addApproval('CRVW', 2) self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) @@ -2241,15 +2310,18 @@ class TestScheduler(ZuulTestCase): self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) self.waitUntilSettled() + self.worker.release('project-merge') + self.waitUntilSettled() + port = self.webapp.server.socket.getsockname()[1] - req = urllib2.Request("http://localhost:%s/status.json" % port) - f = urllib2.urlopen(req) + req = urllib.request.Request("http://localhost:%s/status.json" % port) + f = urllib.request.urlopen(req) headers = f.info() self.assertIn('Content-Length', headers) self.assertIn('Content-Type', headers) - self.assertEqual(headers['Content-Type'], - 'application/json; charset=UTF-8') + 
self.assertIsNotNone(re.match('^application/json(; charset=UTF-8)?$', + headers['Content-Type'])) self.assertIn('Access-Control-Allow-Origin', headers) self.assertIn('Cache-Control', headers) self.assertIn('Last-Modified', headers) @@ -2261,7 +2333,7 @@ class TestScheduler(ZuulTestCase): self.waitUntilSettled() data = json.loads(data) - status_jobs = set() + status_jobs = [] for p in data['pipelines']: for q in p['change_queues']: if p['name'] in ['gate', 'conflict']: @@ -2273,10 +2345,24 @@ class TestScheduler(ZuulTestCase): self.assertTrue(change['active']) self.assertEqual(change['id'], '1,1') for job in change['jobs']: - status_jobs.add(job['name']) - self.assertIn('project-merge', status_jobs) - self.assertIn('project-test1', status_jobs) - self.assertIn('project-test2', status_jobs) + status_jobs.append(job) + self.assertEqual('project-merge', status_jobs[0]['name']) + self.assertEqual('https://server/job/project-merge/0/', + status_jobs[0]['url']) + self.assertEqual('http://logs.example.com/1/1/gate/project-merge/0', + status_jobs[0]['report_url']) + + self.assertEqual('project-test1', status_jobs[1]['name']) + self.assertEqual('https://server/job/project-test1/1/', + status_jobs[1]['url']) + self.assertEqual('http://logs.example.com/1/1/gate/project-test1/1', + status_jobs[1]['report_url']) + + self.assertEqual('project-test2', status_jobs[2]['name']) + self.assertEqual('https://server/job/project-test2/2/', + status_jobs[2]['url']) + self.assertEqual('http://logs.example.com/1/1/gate/project-test2/2', + status_jobs[2]['report_url']) def test_merging_queues(self): "Test that transitively-connected change queues are merged" @@ -2829,7 +2915,8 @@ class TestScheduler(ZuulTestCase): port = self.webapp.server.socket.getsockname()[1] - f = urllib.urlopen("http://localhost:%s/status.json" % port) + req = urllib.request.Request("http://localhost:%s/status.json" % port) + f = urllib.request.urlopen(req) data = f.read() self.worker.hold_jobs_in_build = False @@ 
-4215,6 +4302,45 @@ For CI problems and help debugging, contact ci@example.org""" self.waitUntilSettled() self.assertEqual(self.history[-1].changes, '3,2 2,1 1,2') + def test_crd_cycle_join(self): + "Test an updated change creates a cycle" + A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A') + + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + # Create B->A + B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B') + B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % ( + B.subject, A.data['id']) + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + # Update A to add A->B (a cycle). + A.addPatchset() + A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % ( + A.subject, B.data['id']) + # Normally we would submit the patchset-created event for + # processing here, however, we have no way of noting whether + # the dependency cycle detection correctly raised an + # exception, so instead, we reach into the source driver and + # call the method that would ultimately be called by the event + # processing. + + source = self.sched.layout.pipelines['gate'].source + with testtools.ExpectedException( + Exception, "Dependency cycle detected"): + source._getChange(u'1', u'2', True) + self.log.debug("Got expected dependency cycle exception") + + # Now if we update B to remove the depends-on, everything + # should be okay. 
B; A->B + + B.addPatchset() + B.data['commitMessage'] = '%s\n' % (B.subject,) + source._getChange(u'1', u'2', True) + source._getChange(u'2', u'2', True) + def test_disable_at(self): "Test a pipeline will only report to the disabled trigger when failing" @@ -4336,3 +4462,38 @@ For CI problems and help debugging, contact ci@example.org""" self.assertIn('Build failed.', K.messages[0]) # No more messages reported via smtp self.assertEqual(3, len(self.smtp_messages)) + + def test_success_pattern(self): + "Ensure bad build params are ignored" + + # Use SMTP reporter to grab the result message easier + self.init_repo("org/docs") + self.config.set('zuul', 'layout_config', + 'tests/fixtures/layout-success-pattern.yaml') + self.sched.reconfigure(self.config) + self.worker.hold_jobs_in_build = True + self.registerJobs() + + A = self.fake_gerrit.addFakeChange('org/docs', 'master', 'A') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + # Grab build id + self.assertEqual(len(self.builds), 1) + uuid = self.builds[0].unique[:7] + + self.worker.hold_jobs_in_build = False + self.worker.release() + self.waitUntilSettled() + + self.assertEqual(len(self.smtp_messages), 1) + body = self.smtp_messages[0]['body'].splitlines() + self.assertEqual('Build succeeded.', body[0]) + + self.assertIn( + '- docs-draft-test http://docs-draft.example.org/1/1/1/check/' + 'docs-draft-test/%s/publish-docs/' % uuid, + body[2]) + self.assertIn( + '- docs-draft-test2 https://server/job/docs-draft-test2/1/', + body[3]) diff --git a/tests/test_webapp.py b/tests/test_webapp.py index bc5961fed6..555c08f331 100644 --- a/tests/test_webapp.py +++ b/tests/test_webapp.py @@ -16,7 +16,8 @@ # under the License. import json -import urllib2 + +from six.moves import urllib from tests.base import ZuulTestCase @@ -46,41 +47,41 @@ class TestWebapp(ZuulTestCase): def test_webapp_status(self): "Test that we can filter to only certain changes in the webapp." 
- req = urllib2.Request( + req = urllib.request.Request( "http://localhost:%s/status" % self.port) - f = urllib2.urlopen(req) + f = urllib.request.urlopen(req) data = json.loads(f.read()) self.assertIn('pipelines', data) def test_webapp_status_compat(self): # testing compat with status.json - req = urllib2.Request( + req = urllib.request.Request( "http://localhost:%s/status.json" % self.port) - f = urllib2.urlopen(req) + f = urllib.request.urlopen(req) data = json.loads(f.read()) self.assertIn('pipelines', data) def test_webapp_bad_url(self): # do we 404 correctly - req = urllib2.Request( + req = urllib.request.Request( "http://localhost:%s/status/foo" % self.port) - self.assertRaises(urllib2.HTTPError, urllib2.urlopen, req) + self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, req) def test_webapp_find_change(self): # can we filter by change id - req = urllib2.Request( + req = urllib.request.Request( "http://localhost:%s/status/change/1,1" % self.port) - f = urllib2.urlopen(req) + f = urllib.request.urlopen(req) data = json.loads(f.read()) self.assertEqual(1, len(data), data) self.assertEqual("org/project", data[0]['project']) - req = urllib2.Request( + req = urllib.request.Request( "http://localhost:%s/status/change/2,1" % self.port) - f = urllib2.urlopen(req) + f = urllib.request.urlopen(req) data = json.loads(f.read()) self.assertEqual(1, len(data), data) diff --git a/tools/trigger-job.py b/tools/trigger-job.py index dff4e3fd1d..7123afce86 100755 --- a/tools/trigger-job.py +++ b/tools/trigger-job.py @@ -68,7 +68,7 @@ def main(): job = gear.Job("build:%s" % args.job, json.dumps(data), unique=data['ZUUL_UUID']) - c.submitJob(job) + c.submitJob(job, precedence=gear.PRECEDENCE_HIGH) while not job.complete: time.sleep(1) diff --git a/tools/zuul-changes.py b/tools/zuul-changes.py index 9dbf504e74..8b854c79de 100755 --- a/tools/zuul-changes.py +++ b/tools/zuul-changes.py @@ -35,7 +35,7 @@ for pipeline in data['pipelines']: if not change['live']: continue 
cid, cps = change['id'].split(',') - print ( + print( "zuul enqueue --trigger gerrit --pipeline %s " "--project %s --change %s,%s" % ( options.pipeline_name, diff --git a/tools/zuul-clear-refs.py b/tools/zuul-clear-refs.py new file mode 100755 index 0000000000..60ce74422f --- /dev/null +++ b/tools/zuul-clear-refs.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python +# Copyright 2014-2015 Antoine "hashar" Musso +# Copyright 2014-2015 Wikimedia Foundation Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# pylint: disable=locally-disabled, invalid-name + +""" +Zuul references cleaner. + +Clear up references under /refs/zuul/ by inspecting the age of the commit the +reference points to. If the commit date is older than a number of days +specified by --until, the reference is deleted from the git repository. + +Use --dry-run --verbose to finely inspect the script behavior. +""" + +import argparse +import git +import logging +import time +import sys + +NOW = int(time.time()) +DEFAULT_DAYS = 360 +ZUUL_REF_PREFIX = 'refs/zuul/' + +parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, +) +parser.add_argument('--until', dest='days_ago', default=DEFAULT_DAYS, type=int, + help='references older than this number of days will ' 'be deleted. 
Default: %s' % DEFAULT_DAYS) +parser.add_argument('-n', '--dry-run', dest='dryrun', action='store_true', + help='do not delete references') +parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', + help='set log level from info to debug') +parser.add_argument('gitrepo', help='path to a Zuul git repository') +args = parser.parse_args() + +logging.basicConfig() +log = logging.getLogger('zuul-clear-refs') +if args.verbose: + log.setLevel(logging.DEBUG) +else: + log.setLevel(logging.INFO) + +try: + repo = git.Repo(args.gitrepo) +except git.exc.InvalidGitRepositoryError: + log.error("Invalid git repo: %s" % args.gitrepo) + sys.exit(1) + +for ref in repo.references: + + if not ref.path.startswith(ZUUL_REF_PREFIX): + continue + if type(ref) is not git.refs.reference.Reference: + # Paranoia: ignore heads/tags/remotes .. + continue + + try: + commit_ts = ref.commit.committed_date + except LookupError: + # GitPython does not properly handle PGP signed tags + log.exception("Error in commit: %s, ref: %s. 
Type: %s", + ref.commit, ref.path, type(ref)) + continue + + commit_age = int((NOW - commit_ts) / 86400) # days + log.debug( + "%s at %s is %3s days old", + ref.commit, + ref.path, + commit_age, + ) + if commit_age > args.days_ago: + if args.dryrun: + log.info("Would delete old ref: %s (%s)", ref.path, ref.commit) + else: + log.info("Deleting old ref: %s (%s)", ref.path, ref.commit) + ref.delete(repo, ref.path) diff --git a/tox.ini b/tox.ini index ad0aa3150a..1f6b39eff5 100644 --- a/tox.ini +++ b/tox.ini @@ -9,6 +9,7 @@ setenv = STATSD_HOST=127.0.0.1 STATSD_PORT=8125 VIRTUAL_ENV={envdir} OS_TEST_TIMEOUT=30 + OS_LOG_DEFAULTS={env:OS_LOG_DEFAULTS:gear.Server=INFO,gear.Client=INFO} passenv = ZUUL_TEST_ROOT OS_STDOUT_CAPTURE OS_STDERR_CAPTURE OS_LOG_CAPTURE usedevelop = True install_command = pip install {opts} {packages} @@ -17,7 +18,17 @@ deps = -r{toxinidir}/requirements.txt commands = python setup.py testr --slowest --testr-args='{posargs}' +[testenv:bindep] +# Do not install any requirements. We want this to be fast and work even if +# system dependencies are missing, since it's used to tell you what system +# dependencies are missing! This also means that bindep must be installed +# separately, outside of the requirements files. +deps = bindep +commands = bindep test + [testenv:pep8] +# streamer is python3 only, so we need to run flake8 in python3 +basepython = python3 commands = flake8 {posargs} [testenv:cover] diff --git a/zuul/ansible/__init__.py b/zuul/ansible/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/zuul/ansible/library/__init__.py b/zuul/ansible/library/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/zuul/ansible/library/zuul_console.py b/zuul/ansible/library/zuul_console.py new file mode 100644 index 0000000000..78f3249c88 --- /dev/null +++ b/zuul/ansible/library/zuul_console.py @@ -0,0 +1,199 @@ +#!/usr/bin/python + +# Copyright (c) 2016 IBM Corp. 
+# +# This module is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software. If not, see . + +import os +import sys +import socket +import threading + + +def daemonize(): + # A really basic daemonize method that should work well enough for + # now in this circumstance. Based on the public domain code at: + # http://web.archive.org/web/20131017130434/http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/ + + pid = os.fork() + if pid > 0: + return True + + os.chdir('/') + os.setsid() + os.umask(0) + + pid = os.fork() + if pid > 0: + sys.exit(0) + + sys.stdout.flush() + sys.stderr.flush() + i = open('/dev/null', 'r') + o = open('/dev/null', 'a+') + e = open('/dev/null', 'a+', 0) + os.dup2(i.fileno(), sys.stdin.fileno()) + os.dup2(o.fileno(), sys.stdout.fileno()) + os.dup2(e.fileno(), sys.stderr.fileno()) + return False + + +class Console(object): + def __init__(self, path): + self.path = path + self.file = open(path) + self.stat = os.stat(path) + self.size = self.stat.st_size + + +class Server(object): + def __init__(self, path, port): + self.path = path + s = None + for res in socket.getaddrinfo(None, port, socket.AF_UNSPEC, + socket.SOCK_STREAM, 0, + socket.AI_PASSIVE): + af, socktype, proto, canonname, sa = res + try: + s = socket.socket(af, socktype, proto) + s.setsockopt(socket.SOL_SOCKET, + socket.SO_REUSEADDR, 1) + except socket.error: + s = None + continue + try: + s.bind(sa) + s.listen(1) + except socket.error: + 
s.close() + s = None + continue + break + if s is None: + sys.exit(1) + self.socket = s + + def accept(self): + conn, addr = self.socket.accept() + return conn + + def run(self): + while True: + conn = self.accept() + t = threading.Thread(target=self.handleOneConnection, args=(conn,)) + t.daemon = True + t.start() + + def chunkConsole(self, conn): + try: + console = Console(self.path) + except Exception: + return + while True: + chunk = console.file.read(4096) + if not chunk: + break + conn.send(chunk) + return console + + def followConsole(self, console, conn): + while True: + # As long as we have unread data, keep reading/sending + while True: + chunk = console.file.read(4096) + if chunk: + conn.send(chunk) + else: + break + + # At this point, we are waiting for more data to be written + time.sleep(0.5) + + # Check to see if the remote end has sent any data, if so, + # discard + r, w, e = select.select([conn], [], [conn], 0) + if conn in e: + return False + if conn in r: + ret = conn.recv(1024) + # Discard anything read, if input is eof, it has + # disconnected. 
+ if not ret: + return False + + # See if the file has been truncated + try: + st = os.stat(console.path) + if (st.st_ino != console.stat.st_ino or + st.st_size < console.size): + return True + except Exception: + return True + console.size = st.st_size + + def handleOneConnection(self, conn): + # FIXME: this won't notice disconnects until it tries to send + console = None + try: + while True: + if console is not None: + try: + console.file.close() + except: + pass + while True: + console = self.chunkConsole(conn) + if console: + break + time.sleep(0.5) + while True: + if self.followConsole(console, conn): + break + else: + return + finally: + try: + conn.close() + except Exception: + pass + + +def test(): + s = Server('/tmp/console.html', 8088) + s.run() + + +def main(): + module = AnsibleModule( + argument_spec=dict( + path=dict(default='/tmp/console.html'), + port=dict(default=8088, type='int'), + ) + ) + + p = module.params + path = p['path'] + port = p['port'] + + if daemonize(): + module.exit_json() + + s = Server(path, port) + s.run() + +from ansible.module_utils.basic import * # noqa + +if __name__ == '__main__': + main() +# test() diff --git a/zuul/ansible/library/zuul_log.py b/zuul/ansible/library/zuul_log.py new file mode 100644 index 0000000000..4b377d9079 --- /dev/null +++ b/zuul/ansible/library/zuul_log.py @@ -0,0 +1,58 @@ +#!/usr/bin/python + +# Copyright (c) 2016 IBM Corp. +# Copyright (c) 2016 Red Hat +# +# This module is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this software. If not, see . + +import datetime + + +class Console(object): + def __enter__(self): + self.logfile = open('/tmp/console.html', 'a', 0) + return self + + def __exit__(self, etype, value, tb): + self.logfile.close() + + def addLine(self, ln): + ts = datetime.datetime.now() + outln = '%s | %s' % (str(ts), ln) + self.logfile.write(outln) + + +def log(msg): + if not isinstance(msg, list): + msg = [msg] + with Console() as console: + for line in msg: + console.addLine("[Zuul] %s\n" % line) + + +def main(): + module = AnsibleModule( + argument_spec=dict( + msg=dict(required=True, type='raw'), + ) + ) + + p = module.params + log(p['msg']) + module.exit_json(changed=True) + +from ansible.module_utils.basic import * # noqa + +if __name__ == '__main__': + main() diff --git a/zuul/ansible/library/zuul_runner.py b/zuul/ansible/library/zuul_runner.py new file mode 100644 index 0000000000..7689fb3c8e --- /dev/null +++ b/zuul/ansible/library/zuul_runner.py @@ -0,0 +1,131 @@ +#!/usr/bin/python + +# Copyright (c) 2016 IBM Corp. +# +# This module is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software. If not, see . 
+ +import datetime +import getpass +import os +import subprocess +import threading + + +class Console(object): + def __enter__(self): + self.logfile = open('/tmp/console.html', 'a', 0) + return self + + def __exit__(self, etype, value, tb): + self.logfile.close() + + def addLine(self, ln): + # Note this format with deliminator is "inspired" by the old + # Jenkins format but with microsecond resolution instead of + # millisecond. It is kept so log parsing/formatting remains + # consistent. + ts = datetime.datetime.now() + outln = '%s | %s' % (ts, ln) + self.logfile.write(outln) + + +def get_env(): + env = {} + env['HOME'] = os.path.expanduser('~') + env['USER'] = getpass.getuser() + + # Known locations for PAM mod_env sources + for fn in ['/etc/environment', '/etc/default/locale']: + if os.path.exists(fn): + with open(fn) as f: + for line in f: + if not line: + continue + if line[0] == '#': + continue + if '=' not in line: + continue + k, v = line.strip().split('=') + for q in ["'", '"']: + if v[0] == q: + v = v.strip(q) + env[k] = v + return env + + +def follow(fd): + newline_warning = False + with Console() as console: + while True: + line = fd.readline() + if not line: + break + if not line.endswith('\n'): + line += '\n' + newline_warning = True + console.addLine(line) + if newline_warning: + console.addLine('[Zuul] No trailing newline\n') + + +def run(cwd, cmd, args): + env = get_env() + env.update(args) + proc = subprocess.Popen( + ['/bin/bash', '-l', '-c', cmd], + cwd=cwd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env, + ) + + t = threading.Thread(target=follow, args=(proc.stdout,)) + t.daemon = True + t.start() + + ret = proc.wait() + # Give the thread that is writing the console log up to 10 seconds + # to catch up and exit. If it hasn't done so by then, it is very + # likely stuck in readline() because it spawed a child that is + # holding stdout or stderr open. 
+ t.join(10) + with Console() as console: + if t.isAlive(): + console.addLine("[Zuul] standard output/error still open " + "after child exited") + console.addLine("[Zuul] Task exit code: %s\n" % ret) + return ret + + +def main(): + module = AnsibleModule( + argument_spec=dict( + command=dict(required=True, default=None), + cwd=dict(required=True, default=None), + parameters=dict(default={}, type='dict') + ) + ) + + p = module.params + env = p['parameters'].copy() + ret = run(p['cwd'], p['command'], env) + if ret == 0: + module.exit_json(changed=True, rc=ret) + else: + module.fail_json(msg="Exit code %s" % ret, rc=ret) + +from ansible.module_utils.basic import * # noqa + +if __name__ == '__main__': + main() diff --git a/zuul/ansible/plugins/__init__.py b/zuul/ansible/plugins/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/zuul/ansible/plugins/callback_plugins/__init__.py b/zuul/ansible/plugins/callback_plugins/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/zuul/ansible/plugins/callback_plugins/timeout.py b/zuul/ansible/plugins/callback_plugins/timeout.py new file mode 100644 index 0000000000..1cfd10df09 --- /dev/null +++ b/zuul/ansible/plugins/callback_plugins/timeout.py @@ -0,0 +1,52 @@ +# Copyright 2016 IBM Corp. +# +# This file is part of Zuul +# +# This file is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This file is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this file. If not, see . 
+ +import time + +from ansible.executor.task_result import TaskResult +from ansible.plugins.callback import CallbackBase + + +class CallbackModule(CallbackBase): + def __init__(self, *args, **kw): + super(CallbackModule, self).__init__(*args, **kw) + self._elapsed_time = 0.0 + self._task_start_time = None + self._play = None + + def v2_playbook_on_play_start(self, play): + self._play = play + + def playbook_on_task_start(self, name, is_conditional): + self._task_start_time = time.time() + + def v2_on_any(self, *args, **kw): + result = None + if args and isinstance(args[0], TaskResult): + result = args[0] + if not result: + return + + if self._task_start_time is not None: + task_time = time.time() - self._task_start_time + self._elapsed_time += task_time + if self._play and result._host: + manager = self._play.get_variable_manager() + facts = dict(elapsed_time=int(self._elapsed_time)) + + manager.set_nonpersistent_facts(result._host, facts) + self._task_start_time = None diff --git a/zuul/change_matcher.py b/zuul/change_matcher.py index ed380f0ae5..ca2d93f375 100644 --- a/zuul/change_matcher.py +++ b/zuul/change_matcher.py @@ -101,7 +101,7 @@ class MatchAllFiles(AbstractMatcherCollection): yield self.commit_regex def matches(self, change): - if not (hasattr(change, 'files') and change.files): + if not (hasattr(change, 'files') and len(change.files) > 1): return False for file_ in change.files: matched_file = False diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py index 3e290fd9a7..3102f3b0ac 100644 --- a/zuul/cmd/__init__.py +++ b/zuul/cmd/__init__.py @@ -14,8 +14,8 @@ # License for the specific language governing permissions and limitations # under the License. 
+import six from six.moves import configparser as ConfigParser -import cStringIO import extras import logging import logging.config @@ -47,7 +47,7 @@ def stack_dump_handler(signum, frame): yappi.start() else: yappi.stop() - yappi_out = cStringIO.StringIO() + yappi_out = six.BytesIO() yappi.get_func_stats().print_all(out=yappi_out) yappi.get_thread_stats().print_all(out=yappi_out) log.debug(yappi_out.getvalue()) diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py index 59ac419f15..1ce2828f0b 100644 --- a/zuul/cmd/client.py +++ b/zuul/cmd/client.py @@ -154,7 +154,7 @@ class Client(zuul.cmd.ZuulApp): running_items = client.get_running_jobs() if len(running_items) == 0: - print "No jobs currently running" + print("No jobs currently running") return True all_fields = self._show_running_jobs_columns() @@ -181,7 +181,7 @@ class Client(zuul.cmd.ZuulApp): v += all_fields[f]['append'] values.append(v) table.add_row(values) - print table + print(table) return True def _epoch_to_relative_time(self, epoch): diff --git a/zuul/cmd/cloner.py b/zuul/cmd/cloner.py index c616aa145f..4f8b9f474a 100755 --- a/zuul/cmd/cloner.py +++ b/zuul/cmd/cloner.py @@ -27,6 +27,8 @@ ZUUL_ENV_SUFFIXES = ( 'branch', 'ref', 'url', + 'project', + 'newrev', ) @@ -98,6 +100,10 @@ class Cloner(zuul.cmd.ZuulApp): parser.error("Specifying a Zuul ref requires a Zuul url. " "Define Zuul arguments either via environment " "variables or using options above.") + if 'zuul_newrev' in zuul_args and 'zuul_project' not in zuul_args: + parser.error("ZUUL_NEWREV has been specified without " + "ZUUL_PROJECT. 
Please define a ZUUL_PROJECT or do " + "not set ZUUL_NEWREV.") self.args = args @@ -145,6 +151,8 @@ class Cloner(zuul.cmd.ZuulApp): clone_map_file=self.args.clone_map_file, project_branches=project_branches, cache_dir=self.args.cache_dir, + zuul_newrev=self.args.zuul_newrev, + zuul_project=self.args.zuul_project, ) cloner.execute() diff --git a/zuul/cmd/launcher.py b/zuul/cmd/launcher.py new file mode 100644 index 0000000000..49643aee05 --- /dev/null +++ b/zuul/cmd/launcher.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# Copyright 2013-2014 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import argparse +import daemon +import extras + +# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore +# instead it depends on lockfile-0.9.1 which uses pidfile. +pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile']) + +import logging +import os +import socket +import sys +import signal + +import zuul.cmd +import zuul.launcher.ansiblelaunchserver + +# No zuul imports that pull in paramiko here; it must not be +# imported until after the daemonization. +# https://github.com/paramiko/paramiko/issues/59 +# Similar situation with gear and statsd. 
+ + +class Launcher(zuul.cmd.ZuulApp): + + def parse_arguments(self): + parser = argparse.ArgumentParser(description='Zuul launch worker.') + parser.add_argument('-c', dest='config', + help='specify the config file') + parser.add_argument('-d', dest='nodaemon', action='store_true', + help='do not run as a daemon') + parser.add_argument('--version', dest='version', action='version', + version=self._get_version(), + help='show zuul version') + parser.add_argument('--keep-jobdir', dest='keep_jobdir', + action='store_true', + help='keep local jobdirs after run completes') + parser.add_argument('command', + choices=zuul.launcher.ansiblelaunchserver.COMMANDS, + nargs='?') + + self.args = parser.parse_args() + + def send_command(self, cmd): + if self.config.has_option('zuul', 'state_dir'): + state_dir = os.path.expanduser( + self.config.get('zuul', 'state_dir')) + else: + state_dir = '/var/lib/zuul' + path = os.path.join(state_dir, 'launcher.socket') + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.connect(path) + s.sendall('%s\n' % cmd) + + def exit_handler(self): + self.launcher.stop() + self.launcher.join() + + def main(self, daemon=True): + # See comment at top of file about zuul imports + + self.setup_logging('launcher', 'log_config') + + self.log = logging.getLogger("zuul.Launcher") + + LaunchServer = zuul.launcher.ansiblelaunchserver.LaunchServer + self.launcher = LaunchServer(self.config, + keep_jobdir=self.args.keep_jobdir) + self.launcher.start() + + signal.signal(signal.SIGUSR2, zuul.cmd.stack_dump_handler) + if daemon: + self.launcher.join() + else: + while True: + try: + signal.pause() + except KeyboardInterrupt: + print("Ctrl + C: asking launcher to exit nicely...\n") + self.exit_handler() + sys.exit(0) + + +def main(): + server = Launcher() + server.parse_arguments() + server.read_config() + + if server.args.command in zuul.launcher.ansiblelaunchserver.COMMANDS: + server.send_command(server.args.command) + sys.exit(0) + + 
server.configure_connections() + + if server.config.has_option('launcher', 'pidfile'): + pid_fn = os.path.expanduser(server.config.get('launcher', 'pidfile')) + else: + pid_fn = '/var/run/zuul-launcher/zuul-launcher.pid' + pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10) + + if server.args.nodaemon: + server.main(False) + else: + with daemon.DaemonContext(pidfile=pid): + server.main(True) + + +if __name__ == "__main__": + sys.path.insert(0, '.') + main() diff --git a/zuul/cmd/merger.py b/zuul/cmd/merger.py index df215fd80b..797a990b01 100644 --- a/zuul/cmd/merger.py +++ b/zuul/cmd/merger.py @@ -68,7 +68,7 @@ class Merger(zuul.cmd.ZuulApp): try: signal.pause() except KeyboardInterrupt: - print "Ctrl + C: asking merger to exit nicely...\n" + print("Ctrl + C: asking merger to exit nicely...\n") self.exit_handler(signal.SIGINT, None) @@ -89,9 +89,7 @@ def main(): f.close() os.unlink(test_fn) except Exception: - print - print "Unable to write to state directory: %s" % state_dir - print + print("\nUnable to write to state directory: %s\n" % state_dir) raise if server.config.has_option('merger', 'pidfile'): diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py index aef9500412..1747ec8503 100755 --- a/zuul/cmd/server.py +++ b/zuul/cmd/server.py @@ -61,12 +61,9 @@ class Server(zuul.cmd.ZuulApp): def reconfigure_handler(self, signum, frame): signal.signal(signal.SIGHUP, signal.SIG_IGN) self.log.debug("Reconfiguration triggered") - self.sched.stopConnections() self.read_config() self.setup_logging('zuul', 'log_config') try: - self.configure_connections() - self.sched.registerConnections(self.connections) self.sched.reconfigure(self.config) except Exception: self.log.exception("Reconfiguration failed:") @@ -89,8 +86,10 @@ class Server(zuul.cmd.ZuulApp): import zuul.trigger.gerrit logging.basicConfig(level=logging.DEBUG) - self.sched = zuul.scheduler.Scheduler(self.config) + self.sched = zuul.scheduler.Scheduler(self.config, + testonly=True) self.configure_connections() + 
self.sched.registerConnections(self.connections, load=False) layout = self.sched.testConfig(self.config.get('zuul', 'layout_config'), self.connections) @@ -109,7 +108,7 @@ class Server(zuul.cmd.ZuulApp): jobs.add(v) for job in sorted(layout.jobs): if job not in jobs: - print "Job %s not defined" % job + print("FAILURE: Job %s not defined" % job) failure = True return failure @@ -119,18 +118,18 @@ class Server(zuul.cmd.ZuulApp): if child_pid == 0: os.close(pipe_write) self.setup_logging('gearman_server', 'log_config') - import gear + import zuul.lib.gearserver statsd_host = os.environ.get('STATSD_HOST') statsd_port = int(os.environ.get('STATSD_PORT', 8125)) if self.config.has_option('gearman_server', 'listen_address'): host = self.config.get('gearman_server', 'listen_address') else: host = None - gear.Server(4730, - host=host, - statsd_host=statsd_host, - statsd_port=statsd_port, - statsd_prefix='zuul.geard') + zuul.lib.gearserver.GearServer(4730, + host=host, + statsd_host=statsd_host, + statsd_port=statsd_port, + statsd_prefix='zuul.geard') # Keep running until the parent dies: pipe_read = os.fdopen(pipe_read) @@ -174,7 +173,20 @@ class Server(zuul.cmd.ZuulApp): cache_expiry = self.config.getint('zuul', 'status_expiry') else: cache_expiry = 1 - webapp = zuul.webapp.WebApp(self.sched, cache_expiry=cache_expiry) + + if self.config.has_option('webapp', 'listen_address'): + listen_address = self.config.get('webapp', 'listen_address') + else: + listen_address = '0.0.0.0' + + if self.config.has_option('webapp', 'port'): + port = self.config.getint('webapp', 'port') + else: + port = 8001 + + webapp = zuul.webapp.WebApp( + self.sched, port=port, cache_expiry=cache_expiry, + listen_address=listen_address) rpc = zuul.rpclistener.RPCListener(self.config, self.sched) self.configure_connections() @@ -198,7 +210,7 @@ class Server(zuul.cmd.ZuulApp): try: signal.pause() except KeyboardInterrupt: - print "Ctrl + C: asking scheduler to exit nicely...\n" + print("Ctrl + C: asking 
scheduler to exit nicely...\n") self.exit_handler(signal.SIGINT, None) diff --git a/zuul/configloader.py b/zuul/configloader.py index 8d92e2b4e6..7800c46b95 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -12,6 +12,7 @@ import os import logging +import six import yaml import voluptuous as vs @@ -154,7 +155,7 @@ class ProjectTemplateParser(object): if not tree: tree = model.JobTree(None) for conf_job in conf: - if isinstance(conf_job, basestring): + if isinstance(conf_job, six.string_types): tree.addJob(layout.getJob(conf_job)) elif isinstance(conf_job, dict): # A dictionary in a job tree may override params, or diff --git a/zuul/connection/gerrit.py b/zuul/connection/gerrit.py index a3870b5894..a8d874a4b0 100644 --- a/zuul/connection/gerrit.py +++ b/zuul/connection/gerrit.py @@ -19,22 +19,36 @@ import select import threading import time from six.moves import queue as Queue +from six.moves import urllib import paramiko import logging import pprint import voluptuous as v -import urllib2 from zuul.connection import BaseConnection from zuul.model import TriggerEvent, Project, Change, Ref, NullChange from zuul import exceptions +# Walk the change dependency tree to find a cycle +def detect_cycle(change, history=None): + if history is None: + history = [] + else: + history = history[:] + history.append(change.number) + for dep in change.needs_changes: + if dep.number in history: + raise Exception("Dependency cycle detected: %s in %s" % ( + dep.number, history)) + detect_cycle(dep, history) + + class GerritEventConnector(threading.Thread): """Move events from Gerrit to the scheduler.""" log = logging.getLogger("zuul.GerritEventConnector") - delay = 5.0 + delay = 10.0 def __init__(self, connection): super(GerritEventConnector, self).__init__() @@ -96,7 +110,7 @@ class GerritEventConnector(threading.Thread): try: event.account = data.get(accountfield_from_type[event.type]) except KeyError: - self.log.error("Received unrecognized event type '%s' from Gerrit.\ 
+ self.log.warning("Received unrecognized event type '%s' from Gerrit.\ Can not get account information." % event.type) event.account = None @@ -132,6 +146,7 @@ class GerritEventConnector(threading.Thread): class GerritWatcher(threading.Thread): log = logging.getLogger("gerrit.GerritWatcher") + poll_timeout = 500 def __init__(self, gerrit_connection, username, hostname, port=29418, keyfile=None): @@ -154,7 +169,7 @@ class GerritWatcher(threading.Thread): poll = select.poll() poll.register(stdout.channel) while not self._stopped: - ret = poll.poll() + ret = poll.poll(self.poll_timeout) for (fd, event) in ret: if fd == stdout.channel.fileno(): if event == select.POLLIN: @@ -290,7 +305,7 @@ class GerritConnection(BaseConnection): raise return change - def _getDependsOnFromCommit(self, message): + def _getDependsOnFromCommit(self, message, change): records = [] seen = set() for match in self.depends_on_re.findall(message): @@ -300,17 +315,19 @@ class GerritConnection(BaseConnection): continue seen.add(match) query = "change:%s" % (match,) - self.log.debug("Running query %s to find needed changes" % - (query,)) + self.log.debug("Updating %s: Running query %s " + "to find needed changes" % + (change, query,)) records.extend(self.simpleQuery(query)) return records - def _getNeededByFromCommit(self, change_id): + def _getNeededByFromCommit(self, change_id, change): records = [] seen = set() query = 'message:%s' % change_id - self.log.debug("Running query %s to find changes needed-by" % - (query,)) + self.log.debug("Updating %s: Running query %s " + "to find changes needed-by" % + (change, query,)) results = self.simpleQuery(query) for result in results: for match in self.depends_on_re.findall( @@ -320,15 +337,15 @@ class GerritConnection(BaseConnection): key = (result['number'], result['currentPatchSet']['number']) if key in seen: continue - self.log.debug("Found change %s,%s needs %s from commit" % - (key[0], key[1], change_id)) + self.log.debug("Updating %s: Found change 
%s,%s " + "needs %s from commit" % + (change, key[0], key[1], change_id)) seen.add(key) records.append(result) return records def _updateChange(self, change, history=None): - self.log.info("Updating information for %s,%s" % - (change.number, change.patchset)) + self.log.info("Updating %s" % (change,)) data = self.query(change.number) change._data = data @@ -364,6 +381,7 @@ class GerritConnection(BaseConnection): if change.is_merged: # This change is merged, so we don't need to look any further # for dependencies. + self.log.debug("Updating %s: change is merged" % (change,)) return change if history is None: @@ -379,21 +397,35 @@ class GerritConnection(BaseConnection): if dep_num in history: raise Exception("Dependency cycle detected: %s in %s" % ( dep_num, history)) - self.log.debug("Getting git-dependent change %s,%s" % - (dep_num, dep_ps)) + self.log.debug("Updating %s: Getting git-dependent change %s,%s" % + (change, dep_num, dep_ps)) dep = self._getChange(dep_num, dep_ps, history=history) + # Because we are not forcing a refresh in _getChange, it + # may return without executing this code, so if we are + # updating our change to add ourselves to a dependency + # cycle, we won't detect it. By explicitly performing a + # walk of the dependency tree, we will. 
+ detect_cycle(dep, history) if (not dep.is_merged) and dep not in needs_changes: needs_changes.append(dep) - for record in self._getDependsOnFromCommit(data['commitMessage']): + for record in self._getDependsOnFromCommit(data['commitMessage'], + change): dep_num = record['number'] dep_ps = record['currentPatchSet']['number'] if dep_num in history: raise Exception("Dependency cycle detected: %s in %s" % ( dep_num, history)) - self.log.debug("Getting commit-dependent change %s,%s" % - (dep_num, dep_ps)) + self.log.debug("Updating %s: Getting commit-dependent " + "change %s,%s" % + (change, dep_num, dep_ps)) dep = self._getChange(dep_num, dep_ps, history=history) + # Because we are not forcing a refresh in _getChange, it + # may return without executing this code, so if we are + # updating our change to add ourselves to a dependency + # cycle, we won't detect it. By explicitly performing a + # walk of the dependency tree, we will. + detect_cycle(dep, history) if (not dep.is_merged) and dep not in needs_changes: needs_changes.append(dep) change.needs_changes = needs_changes @@ -403,15 +435,17 @@ class GerritConnection(BaseConnection): for needed in data['neededBy']: parts = needed['ref'].split('/') dep_num, dep_ps = parts[3], parts[4] + self.log.debug("Updating %s: Getting git-needed change %s,%s" % + (change, dep_num, dep_ps)) dep = self._getChange(dep_num, dep_ps) if (not dep.is_merged) and dep.is_current_patchset: needed_by_changes.append(dep) - for record in self._getNeededByFromCommit(data['id']): + for record in self._getNeededByFromCommit(data['id'], change): dep_num = record['number'] dep_ps = record['currentPatchSet']['number'] - self.log.debug("Getting commit-needed change %s,%s" % - (dep_num, dep_ps)) + self.log.debug("Updating %s: Getting commit-needed change %s,%s" % + (change, dep_num, dep_ps)) # Because a commit needed-by may be a cross-repo # dependency, cause that change to refresh so that it will # reference the latest patchset of its Depends-On 
(this @@ -434,6 +468,10 @@ class GerritConnection(BaseConnection): data = self.query(change.number) change._data = data change.is_merged = self._isMerged(change) + if change.is_merged: + self.log.debug("Change %s is merged" % (change,)) + else: + self.log.debug("Change %s is not merged" % (change,)) if not head: return change.is_merged if not change.is_merged: @@ -456,7 +494,6 @@ class GerritConnection(BaseConnection): status = data.get('status') if not status: return False - self.log.debug("Change %s status: %s" % (change, status)) if status == 'MERGED': return True return False @@ -666,10 +703,10 @@ class GerritConnection(BaseConnection): url = "%s/p/%s/info/refs?service=git-upload-pack" % ( self.baseurl, project) try: - data = urllib2.urlopen(url).read() + data = urllib.request.urlopen(url).read() except: self.log.error("Cannot get references from %s" % url) - raise # keeps urllib2 error informations + raise # keeps urllib error informations ret = {} read_headers = False read_advertisement = False diff --git a/zuul/exceptions.py b/zuul/exceptions.py index 2bd2c6b4b6..40a1e40f52 100644 --- a/zuul/exceptions.py +++ b/zuul/exceptions.py @@ -22,5 +22,14 @@ class ChangeNotFound(Exception): super(ChangeNotFound, self).__init__(message) +class RevNotFound(Exception): + def __init__(self, project, rev): + self.project = project + self.revision = rev + message = ("Failed to checkout project '%s' at revision '%s'" + % (self.project, self.revision)) + super(RevNotFound, self).__init__(message) + + class MergeFailure(Exception): pass diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py new file mode 100644 index 0000000000..95fc2fa46a --- /dev/null +++ b/zuul/launcher/ansiblelaunchserver.py @@ -0,0 +1,1384 @@ +# Copyright 2014 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import logging +import os +import re +import shutil +import signal +import socket +import subprocess +import tempfile +import threading +import time +import traceback +import Queue +import uuid + +import gear +import yaml +import jenkins_jobs.builder +import jenkins_jobs.formatter +import zmq + +import zuul.ansible.library +import zuul.ansible.plugins.callback_plugins +from zuul.lib import commandsocket + +ANSIBLE_WATCHDOG_GRACE = 5 * 60 +ANSIBLE_DEFAULT_TIMEOUT = 2 * 60 * 60 +ANSIBLE_DEFAULT_POST_TIMEOUT = 10 * 60 + + +COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release', 'graceful', + 'verbose', 'unverbose'] + + +def boolify(x): + if isinstance(x, str): + return bool(int(x)) + return bool(x) + + +class LaunchGearWorker(gear.Worker): + def __init__(self, *args, **kw): + self.__launch_server = kw.pop('launch_server') + super(LaunchGearWorker, self).__init__(*args, **kw) + + def handleNoop(self, packet): + workers = len(self.__launch_server.node_workers) + delay = (workers ** 2) / 1000.0 + time.sleep(delay) + return super(LaunchGearWorker, self).handleNoop(packet) + + +class NodeGearWorker(gear.Worker): + MASS_DO = 101 + + def sendMassDo(self, functions): + data = b'\x00'.join([gear.convert_to_bytes(x) for x in functions]) + self.broadcast_lock.acquire() + try: + p = gear.Packet(gear.constants.REQ, self.MASS_DO, data) + self.broadcast(p) + finally: + self.broadcast_lock.release() + + +class Watchdog(object): + def __init__(self, timeout, function, args): + self.timeout = timeout + self.function = function + self.args = args + 
self.thread = threading.Thread(target=self._run) + self.thread.daemon = True + + def _run(self): + while self._running and time.time() < self.end: + time.sleep(10) + if self._running: + self.function(*self.args) + + def start(self): + self._running = True + self.end = time.time() + self.timeout + self.thread.start() + + def stop(self): + self._running = False + + +class JobDir(object): + def __init__(self, keep=False): + self.keep = keep + self.root = tempfile.mkdtemp() + self.ansible_root = os.path.join(self.root, 'ansible') + os.makedirs(self.ansible_root) + self.known_hosts = os.path.join(self.ansible_root, 'known_hosts') + self.inventory = os.path.join(self.ansible_root, 'inventory') + self.playbook = os.path.join(self.ansible_root, 'playbook') + self.post_playbook = os.path.join(self.ansible_root, 'post_playbook') + self.config = os.path.join(self.ansible_root, 'ansible.cfg') + self.script_root = os.path.join(self.ansible_root, 'scripts') + self.ansible_log = os.path.join(self.ansible_root, 'ansible_log.txt') + os.makedirs(self.script_root) + self.staging_root = os.path.join(self.root, 'staging') + os.makedirs(self.staging_root) + + def __enter__(self): + return self + + def __exit__(self, etype, value, tb): + if not self.keep: + shutil.rmtree(self.root) + + +class LaunchServer(object): + log = logging.getLogger("zuul.LaunchServer") + site_section_re = re.compile('site "(.*?)"') + node_section_re = re.compile('node "(.*?)"') + + def __init__(self, config, keep_jobdir=False): + self.config = config + self.options = dict( + verbose=False + ) + self.keep_jobdir = keep_jobdir + self.hostname = socket.gethostname() + self.registered_functions = set() + self.node_workers = {} + self.jobs = {} + self.builds = {} + self.zmq_send_queue = Queue.Queue() + self.termination_queue = Queue.Queue() + self.sites = {} + self.static_nodes = {} + self.command_map = dict( + reconfigure=self.reconfigure, + stop=self.stop, + pause=self.pause, + unpause=self.unpause, + 
release=self.release, + graceful=self.graceful, + verbose=self.verboseOn, + unverbose=self.verboseOff, + ) + + if config.has_option('launcher', 'accept_nodes'): + self.accept_nodes = config.getboolean('launcher', + 'accept_nodes') + else: + self.accept_nodes = True + self.config_accept_nodes = self.accept_nodes + + if self.config.has_option('zuul', 'state_dir'): + state_dir = os.path.expanduser( + self.config.get('zuul', 'state_dir')) + else: + state_dir = '/var/lib/zuul' + path = os.path.join(state_dir, 'launcher.socket') + self.command_socket = commandsocket.CommandSocket(path) + ansible_dir = os.path.join(state_dir, 'ansible') + plugins_dir = os.path.join(ansible_dir, 'plugins') + self.callback_dir = os.path.join(plugins_dir, 'callback_plugins') + if not os.path.exists(self.callback_dir): + os.makedirs(self.callback_dir) + self.library_dir = os.path.join(ansible_dir, 'library') + if not os.path.exists(self.library_dir): + os.makedirs(self.library_dir) + + callback_path = os.path.dirname(os.path.abspath( + zuul.ansible.plugins.callback_plugins.__file__)) + for fn in os.listdir(callback_path): + shutil.copy(os.path.join(callback_path, fn), self.callback_dir) + + library_path = os.path.dirname(os.path.abspath( + zuul.ansible.library.__file__)) + for fn in os.listdir(library_path): + shutil.copy(os.path.join(library_path, fn), self.library_dir) + + for section in config.sections(): + m = self.site_section_re.match(section) + if m: + sitename = m.group(1) + d = {} + d['host'] = config.get(section, 'host') + d['user'] = config.get(section, 'user') + if config.has_option(section, 'pass'): + d['pass'] = config.get(section, 'pass') + else: + d['pass'] = '' + if config.has_option(section, 'root'): + d['root'] = config.get(section, 'root') + else: + d['root'] = '/' + self.sites[sitename] = d + continue + m = self.node_section_re.match(section) + if m: + nodename = m.group(1) + d = {} + d['name'] = nodename + d['host'] = config.get(section, 'host') + if 
config.has_option(section, 'description'): + d['description'] = config.get(section, 'description') + else: + d['description'] = '' + if config.has_option(section, 'labels'): + d['labels'] = config.get(section, 'labels').split(',') + else: + d['labels'] = [] + self.static_nodes[nodename] = d + continue + + def start(self): + self._gearman_running = True + self._zmq_running = True + self._reaper_running = True + self._command_running = True + + # Setup ZMQ + self.zcontext = zmq.Context() + self.zsocket = self.zcontext.socket(zmq.PUB) + self.zsocket.bind("tcp://*:8888") + + # Setup Gearman + server = self.config.get('gearman', 'server') + if self.config.has_option('gearman', 'port'): + port = self.config.get('gearman', 'port') + else: + port = 4730 + self.worker = LaunchGearWorker('Zuul Launch Server', + launch_server=self) + self.worker.addServer(server, port) + self.log.debug("Waiting for server") + self.worker.waitForServer() + self.log.debug("Registering") + self.register() + + # Start command socket + self.log.debug("Starting command processor") + self.command_socket.start() + self.command_thread = threading.Thread(target=self.runCommand) + self.command_thread.daemon = True + self.command_thread.start() + + # Load JJB config + self.loadJobs() + + # Start ZMQ worker thread + self.log.debug("Starting ZMQ processor") + self.zmq_thread = threading.Thread(target=self.runZMQ) + self.zmq_thread.daemon = True + self.zmq_thread.start() + + # Start node worker reaper thread + self.log.debug("Starting reaper") + self.reaper_thread = threading.Thread(target=self.runReaper) + self.reaper_thread.daemon = True + self.reaper_thread.start() + + # Start Gearman worker thread + self.log.debug("Starting worker") + self.gearman_thread = threading.Thread(target=self.run) + self.gearman_thread.daemon = True + self.gearman_thread.start() + + # Start static workers + for node in self.static_nodes.values(): + self.log.debug("Creating static node with arguments: %s" % (node,)) + 
self._launchWorker(node) + + def loadJobs(self): + self.log.debug("Loading jobs") + builder = JJB() + path = self.config.get('launcher', 'jenkins_jobs') + builder.load_files([path]) + builder.parser.expandYaml() + unseen = set(self.jobs.keys()) + for job in builder.parser.jobs: + builder.expandMacros(job) + self.jobs[job['name']] = job + unseen.discard(job['name']) + for name in unseen: + del self.jobs[name] + + def register(self): + new_functions = set() + if self.accept_nodes: + new_functions.add("node_assign:zuul") + new_functions.add("stop:%s" % self.hostname) + new_functions.add("set_description:%s" % self.hostname) + new_functions.add("node_revoke:%s" % self.hostname) + + for function in new_functions - self.registered_functions: + self.worker.registerFunction(function) + for function in self.registered_functions - new_functions: + self.worker.unRegisterFunction(function) + self.registered_functions = new_functions + + def reconfigure(self): + self.log.debug("Reconfiguring") + self.loadJobs() + for node in self.node_workers.values(): + try: + if node.isAlive(): + node.queue.put(dict(action='reconfigure')) + except Exception: + self.log.exception("Exception sending reconfigure command " + "to worker:") + self.log.debug("Reconfiguration complete") + + def pause(self): + self.log.debug("Pausing") + self.accept_nodes = False + self.register() + for node in self.node_workers.values(): + try: + if node.isAlive(): + node.queue.put(dict(action='pause')) + except Exception: + self.log.exception("Exception sending pause command " + "to worker:") + self.log.debug("Paused") + + def unpause(self): + self.log.debug("Unpausing") + self.accept_nodes = self.config_accept_nodes + self.register() + for node in self.node_workers.values(): + try: + if node.isAlive(): + node.queue.put(dict(action='unpause')) + except Exception: + self.log.exception("Exception sending unpause command " + "to worker:") + self.log.debug("Unpaused") + + def release(self): + self.log.debug("Releasing 
idle nodes") + for node in self.node_workers.values(): + if node.name in self.static_nodes: + continue + try: + if node.isAlive(): + node.queue.put(dict(action='release')) + except Exception: + self.log.exception("Exception sending release command " + "to worker:") + self.log.debug("Finished releasing idle nodes") + + def graceful(self): + # Note: this is run in the command processing thread; no more + # external commands will be processed after this. + self.log.debug("Gracefully stopping") + self.pause() + self.release() + self.log.debug("Waiting for all builds to finish") + while self.builds: + time.sleep(5) + self.log.debug("All builds are finished") + self.stop() + + def stop(self): + self.log.debug("Stopping") + # First, stop accepting new jobs + self._gearman_running = False + self._reaper_running = False + self.worker.shutdown() + # Then stop all of the workers + for node in self.node_workers.values(): + try: + if node.isAlive(): + node.stop() + except Exception: + self.log.exception("Exception sending stop command to worker:") + # Stop ZMQ afterwords so that the send queue is flushed + self._zmq_running = False + self.zmq_send_queue.put(None) + self.zmq_send_queue.join() + # Stop command processing + self._command_running = False + self.command_socket.stop() + # Join the gearman thread which was stopped earlier. + self.gearman_thread.join() + # The command thread is joined in the join() method of this + # class, which is called by the command shell. 
+ self.log.debug("Stopped") + + def verboseOn(self): + self.log.debug("Enabling verbose mode") + self.options['verbose'] = True + + def verboseOff(self): + self.log.debug("Disabling verbose mode") + self.options['verbose'] = False + + def join(self): + self.command_thread.join() + + def runCommand(self): + while self._command_running: + try: + command = self.command_socket.get() + self.command_map[command]() + except Exception: + self.log.exception("Exception while processing command") + + def runZMQ(self): + while self._zmq_running or not self.zmq_send_queue.empty(): + try: + item = self.zmq_send_queue.get() + self.log.debug("Got ZMQ event %s" % (item,)) + if item is None: + continue + self.zsocket.send(item) + except Exception: + self.log.exception("Exception while processing ZMQ events") + finally: + self.zmq_send_queue.task_done() + + def run(self): + while self._gearman_running: + try: + job = self.worker.getJob() + try: + if job.name.startswith('node_assign:'): + self.log.debug("Got node_assign job: %s" % job.unique) + self.assignNode(job) + elif job.name.startswith('stop:'): + self.log.debug("Got stop job: %s" % job.unique) + self.stopJob(job) + elif job.name.startswith('set_description:'): + self.log.debug("Got set_description job: %s" % + job.unique) + job.sendWorkComplete() + elif job.name.startswith('node_revoke:'): + self.log.debug("Got node_revoke job: %s" % job.unique) + self.revokeNode(job) + else: + self.log.error("Unable to handle job %s" % job.name) + job.sendWorkFail() + except Exception: + self.log.exception("Exception while running job") + job.sendWorkException(traceback.format_exc()) + except gear.InterruptedError: + return + except Exception: + self.log.exception("Exception while getting job") + + def assignNode(self, job): + args = json.loads(job.arguments) + self.log.debug("Assigned node with arguments: %s" % (args,)) + self._launchWorker(args) + data = dict(manager=self.hostname) + job.sendWorkData(json.dumps(data)) + 
job.sendWorkComplete() + + def _launchWorker(self, args): + worker = NodeWorker(self.config, self.jobs, self.builds, + self.sites, args['name'], args['host'], + args['description'], args['labels'], + self.hostname, self.zmq_send_queue, + self.termination_queue, self.keep_jobdir, + self.callback_dir, self.library_dir, + self.options) + self.node_workers[worker.name] = worker + + worker.thread = threading.Thread(target=worker.run) + worker.thread.start() + + def revokeNode(self, job): + try: + args = json.loads(job.arguments) + self.log.debug("Revoke job with arguments: %s" % (args,)) + name = args['name'] + node = self.node_workers.get(name) + if not node: + self.log.debug("Unable to find worker %s" % (name,)) + return + try: + if node.isAlive(): + node.queue.put(dict(action='stop')) + else: + self.log.debug("Node %s is not alive while revoking node" % + (node.name,)) + except Exception: + self.log.exception("Exception sending stop command " + "to worker:") + finally: + job.sendWorkComplete() + + def stopJob(self, job): + try: + args = json.loads(job.arguments) + self.log.debug("Stop job with arguments: %s" % (args,)) + unique = args['number'] + build_worker_name = self.builds.get(unique) + if not build_worker_name: + self.log.debug("Unable to find build for job %s" % (unique,)) + return + node = self.node_workers.get(build_worker_name) + if not node: + self.log.debug("Unable to find worker for job %s" % (unique,)) + return + try: + if node.isAlive(): + node.queue.put(dict(action='abort')) + else: + self.log.debug("Node %s is not alive while aborting job" % + (node.name,)) + except Exception: + self.log.exception("Exception sending abort command " + "to worker:") + finally: + job.sendWorkComplete() + + def runReaper(self): + # We don't actually care if all the events are processed + while self._reaper_running: + try: + item = self.termination_queue.get() + self.log.debug("Got termination event %s" % (item,)) + if item is None: + continue + worker = 
self.node_workers[item] + self.log.debug("Joining %s" % (item,)) + worker.thread.join() + self.log.debug("Joined %s" % (item,)) + del self.node_workers[item] + except Exception: + self.log.exception("Exception while processing " + "termination events:") + finally: + self.termination_queue.task_done() + + +class NodeWorker(object): + def __init__(self, config, jobs, builds, sites, name, host, + description, labels, manager_name, zmq_send_queue, + termination_queue, keep_jobdir, callback_dir, + library_dir, options): + self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,)) + self.log.debug("Creating node worker %s" % (name,)) + self.config = config + self.jobs = jobs + self.builds = builds + self.sites = sites + self.name = name + self.host = host + self.description = description + if not isinstance(labels, list): + labels = [labels] + self.labels = labels + self.thread = None + self.registered_functions = set() + # If the unpaused Event is set, that means we should run jobs. + # If it is clear, then we are paused and should not run jobs. 
+ self.unpaused = threading.Event() + self.unpaused.set() + self._running = True + self.queue = Queue.Queue() + self.manager_name = manager_name + self.zmq_send_queue = zmq_send_queue + self.termination_queue = termination_queue + self.keep_jobdir = keep_jobdir + self.running_job_lock = threading.Lock() + self.pending_registration = False + self.registration_lock = threading.Lock() + self._get_job_lock = threading.Lock() + self._got_job = False + self._job_complete_event = threading.Event() + self._running_job = False + self._aborted_job = False + self._sent_complete_event = False + self.ansible_job_proc = None + self.ansible_post_proc = None + self.workspace_root = config.get('launcher', 'workspace_root') + if self.config.has_option('launcher', 'private_key_file'): + self.private_key_file = config.get('launcher', 'private_key_file') + else: + self.private_key_file = '~/.ssh/id_rsa' + if self.config.has_option('launcher', 'username'): + self.username = config.get('launcher', 'username') + else: + self.username = 'zuul' + self.callback_dir = callback_dir + self.library_dir = library_dir + self.options = options + + def isAlive(self): + # Meant to be called from the manager + if self.thread and self.thread.is_alive(): + return True + return False + + def run(self): + self.log.debug("Node worker %s starting" % (self.name,)) + server = self.config.get('gearman', 'server') + if self.config.has_option('gearman', 'port'): + port = self.config.get('gearman', 'port') + else: + port = 4730 + self.worker = NodeGearWorker(self.name) + self.worker.addServer(server, port) + self.log.debug("Waiting for server") + self.worker.waitForServer() + self.log.debug("Registering") + self.register() + + self.gearman_thread = threading.Thread(target=self.runGearman) + self.gearman_thread.daemon = True + self.gearman_thread.start() + + self.log.debug("Started") + + while self._running or not self.queue.empty(): + try: + self._runQueue() + except Exception: + self.log.exception("Exception in 
queue manager:") + + def stop(self): + # If this is called locally, setting _running will be + # effictive, if it's called remotely, it will not be, but it + # will be set by the queue thread. + self.log.debug("Submitting stop request") + self._running = False + self.unpaused.set() + self.queue.put(dict(action='stop')) + self.queue.join() + + def pause(self): + self.unpaused.clear() + self.worker.stopWaitingForJobs() + + def unpause(self): + self.unpaused.set() + + def release(self): + # If this node is idle, stop it. + old_unpaused = self.unpaused.is_set() + if old_unpaused: + self.pause() + with self._get_job_lock: + if self._got_job: + self.log.debug("This worker is not idle") + if old_unpaused: + self.unpause() + return + self.log.debug("Stopping due to release command") + self.queue.put(dict(action='stop')) + + def _runQueue(self): + item = self.queue.get() + try: + if item['action'] == 'stop': + self.log.debug("Received stop request") + self._running = False + self.termination_queue.put(self.name) + if not self.abortRunningJob(): + self.sendFakeCompleteEvent() + else: + self._job_complete_event.wait() + self.worker.shutdown() + if item['action'] == 'pause': + self.log.debug("Received pause request") + self.pause() + if item['action'] == 'unpause': + self.log.debug("Received unpause request") + self.unpause() + if item['action'] == 'release': + self.log.debug("Received release request") + self.release() + elif item['action'] == 'reconfigure': + self.log.debug("Received reconfigure request") + self.register() + elif item['action'] == 'abort': + self.log.debug("Received abort request") + self.abortRunningJob() + finally: + self.queue.task_done() + + def runGearman(self): + while self._running: + try: + self.unpaused.wait() + if self._running: + self._runGearman() + except Exception: + self.log.exception("Exception in gearman manager:") + with self._get_job_lock: + self._got_job = False + + def _runGearman(self): + if self.pending_registration: + self.register() 
+ with self._get_job_lock: + try: + job = self.worker.getJob() + self._got_job = True + except gear.InterruptedError: + return + self.log.debug("Node worker %s got job %s" % (self.name, job.name)) + try: + if job.name not in self.registered_functions: + self.log.error("Unable to handle job %s" % job.name) + job.sendWorkFail() + return + self.launch(job) + except Exception: + self.log.exception("Exception while running job") + job.sendWorkException(traceback.format_exc()) + + def generateFunctionNames(self, job): + # This only supports "node: foo" and "node: foo || bar" + ret = set() + job_labels = job.get('node') + matching_labels = set() + if job_labels: + job_labels = [x.strip() for x in job_labels.split('||')] + matching_labels = set(self.labels) & set(job_labels) + if not matching_labels: + return ret + ret.add('build:%s' % (job['name'],)) + for label in matching_labels: + ret.add('build:%s:%s' % (job['name'], label)) + return ret + + def register(self): + if not self.registration_lock.acquire(False): + self.log.debug("Registration already in progress") + return + try: + if self._running_job: + self.pending_registration = True + self.log.debug("Ignoring registration due to running job") + return + self.log.debug("Updating registration") + self.pending_registration = False + new_functions = set() + for job in self.jobs.values(): + new_functions |= self.generateFunctionNames(job) + self.worker.sendMassDo(new_functions) + self.registered_functions = new_functions + finally: + self.registration_lock.release() + + def abortRunningJob(self): + self._aborted_job = True + return self.abortRunningProc(self.ansible_job_proc) + + def abortRunningProc(self, proc): + aborted = False + self.log.debug("Abort: acquiring job lock") + with self.running_job_lock: + if self._running_job: + self.log.debug("Abort: a job is running") + if proc: + self.log.debug("Abort: sending kill signal to job " + "process group") + try: + pgid = os.getpgid(proc.pid) + os.killpg(pgid, 
signal.SIGKILL) + aborted = True + except Exception: + self.log.exception("Exception while killing " + "ansible process:") + else: + self.log.debug("Abort: no job is running") + + return aborted + + def launch(self, job): + self.log.info("Node worker %s launching job %s" % + (self.name, job.name)) + + # Make sure we can parse what we need from the job first + args = json.loads(job.arguments) + offline = boolify(args.get('OFFLINE_NODE_WHEN_COMPLETE', False)) + job_name = job.name.split(':')[1] + + # Initialize the result so we have something regardless of + # whether the job actually runs + result = None + self._sent_complete_event = False + self._aborted_job = False + + try: + self.sendStartEvent(job_name, args) + except Exception: + self.log.exception("Exception while sending job start event") + + try: + result = self.runJob(job, args) + except Exception: + self.log.exception("Exception while launching job thread") + + self._running_job = False + + try: + data = json.dumps(dict(result=result)) + job.sendWorkComplete(data) + except Exception: + self.log.exception("Exception while sending job completion packet") + + try: + self.sendCompleteEvent(job_name, result, args) + except Exception: + self.log.exception("Exception while sending job completion event") + + try: + del self.builds[job.unique] + except Exception: + self.log.exception("Exception while clearing build record") + + self._job_complete_event.set() + if offline and self._running: + self.stop() + + def sendStartEvent(self, name, parameters): + build = dict(node_name=self.name, + host_name=self.manager_name, + parameters=parameters) + + event = dict(name=name, + build=build) + + item = "onStarted %s" % json.dumps(event) + self.log.debug("Sending over ZMQ: %s" % (item,)) + self.zmq_send_queue.put(item) + + def sendCompleteEvent(self, name, status, parameters): + build = dict(status=status, + node_name=self.name, + host_name=self.manager_name, + parameters=parameters) + + event = dict(name=name, + 
build=build) + + item = "onFinalized %s" % json.dumps(event) + self.log.debug("Sending over ZMQ: %s" % (item,)) + self.zmq_send_queue.put(item) + self._sent_complete_event = True + + def sendFakeCompleteEvent(self): + if self._sent_complete_event: + return + self.sendCompleteEvent('zuul:launcher-shutdown', + 'SUCCESS', {}) + + def runJob(self, job, args): + self.ansible_job_proc = None + self.ansible_post_proc = None + result = None + with self.running_job_lock: + if not self._running: + return result + self._running_job = True + self._job_complete_event.clear() + + self.log.debug("Job %s: beginning" % (job.unique,)) + self.builds[job.unique] = self.name + with JobDir(self.keep_jobdir) as jobdir: + self.log.debug("Job %s: job root at %s" % + (job.unique, jobdir.root)) + timeout = self.prepareAnsibleFiles(jobdir, job, args) + + data = { + 'manager': self.manager_name, + 'number': job.unique, + 'url': 'telnet://%s:8088' % self.host, + } + job.sendWorkData(json.dumps(data)) + job.sendWorkStatus(0, 100) + + job_status = self.runAnsiblePlaybook(jobdir, timeout) + if job_status is None: + # The result of the job is indeterminate. Zuul will + # run it again. + return result + + post_status = self.runAnsiblePostPlaybook(jobdir, job_status) + if not post_status: + result = 'POST_FAILURE' + elif job_status: + result = 'SUCCESS' + else: + result = 'FAILURE' + + if self._aborted_job: + # A Null result will cause zuul to relaunch the job if + # it needs to. + result = None + + return result + + def getHostList(self): + return [('node', dict( + ansible_host=self.host, ansible_user=self.username))] + + def _substituteVariables(self, text, variables): + def lookup(match): + return variables.get(match.group(1), '') + return re.sub('\$([A-Za-z0-9_]+)', lookup, text) + + def _getRsyncOptions(self, source, parameters): + # Treat the publisher source as a filter; ant and rsync behave + # fairly close in this manner, except for leading directories. 
+ source = self._substituteVariables(source, parameters) + # If the source starts with ** then we want to match any + # number of directories, so don't anchor the include filter. + # If it does not start with **, then the intent is likely to + # at least start by matching an immediate file or subdirectory + # (even if later we have a ** in the middle), so in this case, + # anchor it to the root of the transfer (the workspace). + if not source.startswith('**'): + source = os.path.join('/', source) + # These options mean: include the thing we want, include any + # directories (so that we continue to search for the thing we + # want no matter how deep it is), exclude anything that + # doesn't match the thing we want or is a directory, then get + # rid of empty directories left over at the end. + rsync_opts = ['--include="%s"' % source, + '--include="*/"', + '--exclude="*"', + '--prune-empty-dirs'] + return rsync_opts + + def _makeSCPTask(self, jobdir, publisher, parameters): + tasks = [] + for scpfile in publisher['scp']['files']: + scproot = tempfile.mkdtemp(dir=jobdir.staging_root) + os.chmod(scproot, 0o755) + + site = publisher['scp']['site'] + if scpfile.get('copy-console'): + # Include the local ansible directory in the console + # upload. This uploads the playbook and ansible logs. + copyargs = dict(src=jobdir.ansible_root + '/', + dest=os.path.join(scproot, '_zuul_ansible')) + task = dict(copy=copyargs, + delegate_to='127.0.0.1') + tasks.append(task) + + # Fetch the console log from the remote host. 
+ src = '/tmp/console.html' + rsync_opts = [] + else: + src = parameters['WORKSPACE'] + if not src.endswith('/'): + src = src + '/' + rsync_opts = self._getRsyncOptions(scpfile['source'], + parameters) + + syncargs = dict(src=src, + dest=scproot, + copy_links='yes', + mode='pull') + if rsync_opts: + syncargs['rsync_opts'] = rsync_opts + task = dict(synchronize=syncargs) + if not scpfile.get('copy-after-failure'): + task['when'] = 'success' + tasks.append(task) + + task = self._makeSCPTaskLocalAction( + site, scpfile, scproot, parameters) + tasks.append(task) + return tasks + + def _makeSCPTaskLocalAction(self, site, scpfile, scproot, parameters): + if site not in self.sites: + raise Exception("Undefined SCP site: %s" % (site,)) + site = self.sites[site] + dest = scpfile['target'].lstrip('/') + dest = self._substituteVariables(dest, parameters) + dest = os.path.join(site['root'], dest) + dest = os.path.normpath(dest) + if not dest.startswith(site['root']): + raise Exception("Target path %s is not below site root" % + (dest,)) + + rsync_cmd = [ + '/usr/bin/rsync', '--delay-updates', '-F', + '--compress', '-rt', '--safe-links', + '--rsync-path="mkdir -p {dest} && rsync"', + '--rsh="/usr/bin/ssh -i {private_key_file} -S none ' + '-o StrictHostKeyChecking=no -q"', + '--out-format="<>%i %n%L"', + '{source}', '"{user}@{host}:{dest}"' + ] + if scpfile.get('keep-hierarchy'): + source = '"%s/"' % scproot + else: + source = '`/usr/bin/find "%s" -type f`' % scproot + shellargs = ' '.join(rsync_cmd).format( + source=source, + dest=dest, + private_key_file=self.private_key_file, + host=site['host'], + user=site['user']) + task = dict(shell=shellargs, + delegate_to='127.0.0.1') + if not scpfile.get('copy-after-failure'): + task['when'] = 'success' + + return task + + def _makeFTPTask(self, jobdir, publisher, parameters): + tasks = [] + ftp = publisher['ftp'] + site = ftp['site'] + if site not in self.sites: + raise Exception("Undefined FTP site: %s" % site) + site = 
self.sites[site] + + ftproot = tempfile.mkdtemp(dir=jobdir.staging_root) + ftpcontent = os.path.join(ftproot, 'content') + os.makedirs(ftpcontent) + ftpscript = os.path.join(ftproot, 'script') + + src = parameters['WORKSPACE'] + if not src.endswith('/'): + src = src + '/' + rsync_opts = self._getRsyncOptions(ftp['source'], + parameters) + syncargs = dict(src=src, + dest=ftpcontent, + copy_links='yes', + mode='pull') + if rsync_opts: + syncargs['rsync_opts'] = rsync_opts + task = dict(synchronize=syncargs, + when='success') + tasks.append(task) + task = dict(shell='lftp -f %s' % ftpscript, + when='success', + delegate_to='127.0.0.1') + ftpsource = ftpcontent + if ftp.get('remove-prefix'): + ftpsource = os.path.join(ftpcontent, ftp['remove-prefix']) + while ftpsource[-1] == '/': + ftpsource = ftpsource[:-1] + ftptarget = ftp['target'].lstrip('/') + ftptarget = self._substituteVariables(ftptarget, parameters) + ftptarget = os.path.join(site['root'], ftptarget) + ftptarget = os.path.normpath(ftptarget) + if not ftptarget.startswith(site['root']): + raise Exception("Target path %s is not below site root" % + (ftptarget,)) + while ftptarget[-1] == '/': + ftptarget = ftptarget[:-1] + with open(ftpscript, 'w') as script: + script.write('open %s\n' % site['host']) + script.write('user %s %s\n' % (site['user'], site['pass'])) + script.write('mirror -R %s %s\n' % (ftpsource, ftptarget)) + tasks.append(task) + return tasks + + def _makeBuilderTask(self, jobdir, builder, parameters): + tasks = [] + script_fn = '%s.sh' % str(uuid.uuid4().hex) + script_path = os.path.join(jobdir.script_root, script_fn) + with open(script_path, 'w') as script: + data = builder['shell'] + if not data.startswith('#!'): + data = '#!/bin/bash -x\n %s' % (data,) + script.write(data) + + remote_path = os.path.join('/tmp', script_fn) + copy = dict(src=script_path, + dest=remote_path, + mode=0o555) + task = dict(copy=copy) + tasks.append(task) + + runner = dict(command=remote_path, + 
cwd=parameters['WORKSPACE'], + parameters=parameters) + task = dict(zuul_runner=runner) + task['name'] = ('zuul_runner with {{ timeout | int - elapsed_time }} ' + 'second timeout') + task['when'] = '{{ elapsed_time < timeout | int }}' + task['async'] = '{{ timeout | int - elapsed_time }}' + task['poll'] = 5 + tasks.append(task) + + filetask = dict(path=remote_path, + state='absent') + task = dict(file=filetask) + tasks.append(task) + + return tasks + + def _transformPublishers(self, jjb_job): + early_publishers = [] + late_publishers = [] + old_publishers = jjb_job.get('publishers', []) + for publisher in old_publishers: + early_scpfiles = [] + late_scpfiles = [] + if 'scp' not in publisher: + early_publishers.append(publisher) + continue + copy_console = False + for scpfile in publisher['scp']['files']: + if scpfile.get('copy-console'): + scpfile['keep-hierarchy'] = True + late_scpfiles.append(scpfile) + copy_console = True + else: + early_scpfiles.append(scpfile) + publisher['scp']['files'] = early_scpfiles + late_scpfiles + if copy_console: + late_publishers.append(publisher) + else: + early_publishers.append(publisher) + publishers = early_publishers + late_publishers + if old_publishers != publishers: + self.log.debug("Transformed job publishers") + return early_publishers, late_publishers + + def prepareAnsibleFiles(self, jobdir, gearman_job, args): + job_name = gearman_job.name.split(':')[1] + jjb_job = self.jobs[job_name] + + parameters = args.copy() + parameters['WORKSPACE'] = os.path.join(self.workspace_root, job_name) + + with open(jobdir.inventory, 'w') as inventory: + for host_name, host_vars in self.getHostList(): + inventory.write(host_name) + for k, v in host_vars.items(): + inventory.write(' %s=%s' % (k, v)) + inventory.write('\n') + + timeout = None + timeout_var = None + for wrapper in jjb_job.get('wrappers', []): + if isinstance(wrapper, dict): + build_timeout = wrapper.get('timeout') + if isinstance(build_timeout, dict): + timeout_var = 
build_timeout.get('timeout-var') + timeout = build_timeout.get('timeout') + if timeout is not None: + timeout = int(timeout) * 60 + if not timeout: + timeout = ANSIBLE_DEFAULT_TIMEOUT + if timeout_var: + parameters[timeout_var] = str(timeout * 1000) + + with open(jobdir.playbook, 'w') as playbook: + pre_tasks = [] + tasks = [] + main_block = [] + error_block = [] + variables = [] + + shellargs = "ssh-keyscan %s > %s" % ( + self.host, jobdir.known_hosts) + pre_tasks.append(dict(shell=shellargs, + delegate_to='127.0.0.1')) + + tasks.append(dict(block=main_block, + rescue=error_block)) + + task = dict(file=dict(path='/tmp/console.html', state='absent')) + main_block.append(task) + + task = dict(zuul_console=dict(path='/tmp/console.html', port=8088)) + main_block.append(task) + + task = dict(file=dict(path=parameters['WORKSPACE'], + state='directory')) + main_block.append(task) + + msg = [ + "Launched by %s" % self.manager_name, + "Building remotely on %s in workspace %s" % ( + self.name, parameters['WORKSPACE'])] + task = dict(zuul_log=dict(msg=msg)) + main_block.append(task) + + for builder in jjb_job.get('builders', []): + if 'shell' in builder: + main_block.extend( + self._makeBuilderTask(jobdir, builder, parameters)) + task = dict(zuul_log=dict(msg="Job complete, result: SUCCESS")) + main_block.append(task) + + task = dict(zuul_log=dict(msg="Job complete, result: FAILURE")) + error_block.append(task) + error_block.append(dict(fail=dict(msg='FAILURE'))) + + variables.append(dict(timeout=timeout)) + play = dict(hosts='node', name='Job body', vars=variables, + pre_tasks=pre_tasks, tasks=tasks) + playbook.write(yaml.safe_dump([play], default_flow_style=False)) + + early_publishers, late_publishers = self._transformPublishers(jjb_job) + + with open(jobdir.post_playbook, 'w') as playbook: + blocks = [] + for publishers in [early_publishers, late_publishers]: + block = [] + for publisher in publishers: + if 'scp' in publisher: + block.extend(self._makeSCPTask(jobdir, 
publisher, + parameters)) + if 'ftp' in publisher: + block.extend(self._makeFTPTask(jobdir, publisher, + parameters)) + blocks.append(block) + + # The 'always' section contains the log publishing tasks, + # the 'block' contains all the other publishers. This way + # we run the log publisher regardless of whether the rest + # of the publishers succeed. + tasks = [] + tasks.append(dict(block=blocks[0], + always=blocks[1])) + + play = dict(hosts='node', name='Publishers', + tasks=tasks) + playbook.write(yaml.safe_dump([play], default_flow_style=False)) + + with open(jobdir.config, 'w') as config: + config.write('[defaults]\n') + config.write('hostfile = %s\n' % jobdir.inventory) + config.write('keep_remote_files = True\n') + config.write('local_tmp = %s/.ansible/tmp\n' % jobdir.root) + config.write('private_key_file = %s\n' % self.private_key_file) + config.write('retry_files_enabled = False\n') + config.write('log_path = %s\n' % jobdir.ansible_log) + config.write('gathering = explicit\n') + config.write('callback_plugins = %s\n' % self.callback_dir) + config.write('library = %s\n' % self.library_dir) + + config.write('[ssh_connection]\n') + ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \ + "-o UserKnownHostsFile=%s" % jobdir.known_hosts + config.write('ssh_args = %s\n' % ssh_args) + + return timeout + + def _ansibleTimeout(self, proc, msg): + self.log.warning(msg) + self.abortRunningProc(proc) + + def runAnsiblePlaybook(self, jobdir, timeout): + # Set LOGNAME env variable so Ansible log_path log reports + # the correct user. 
+ env_copy = os.environ.copy() + env_copy['LOGNAME'] = 'zuul' + + if self.options['verbose']: + verbose = '-vvv' + else: + verbose = '-v' + + cmd = ['ansible-playbook', jobdir.playbook, verbose] + self.log.debug("Ansible command: %s" % (cmd,)) + + self.ansible_job_proc = subprocess.Popen( + cmd, + cwd=jobdir.ansible_root, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + preexec_fn=os.setsid, + env=env_copy, + ) + ret = None + watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE, + self._ansibleTimeout, + (self.ansible_job_proc, + "Ansible timeout exceeded")) + watchdog.start() + try: + for line in iter(self.ansible_job_proc.stdout.readline, b''): + line = line[:1024].rstrip() + self.log.debug("Ansible output: %s" % (line,)) + ret = self.ansible_job_proc.wait() + finally: + watchdog.stop() + self.log.debug("Ansible exit code: %s" % (ret,)) + self.ansible_job_proc = None + if ret == 3: + # AnsibleHostUnreachable: We had a network issue connecting to + # our zuul-worker. + return None + elif ret == -9: + # Received abort request. + return None + return ret == 0 + + def runAnsiblePostPlaybook(self, jobdir, success): + # Set LOGNAME env variable so Ansible log_path log reports + # the correct user. 
+ env_copy = os.environ.copy() + env_copy['LOGNAME'] = 'zuul' + + if self.options['verbose']: + verbose = '-vvv' + else: + verbose = '-v' + + cmd = ['ansible-playbook', jobdir.post_playbook, + '-e', 'success=%s' % success, verbose] + self.log.debug("Ansible post command: %s" % (cmd,)) + + self.ansible_post_proc = subprocess.Popen( + cmd, + cwd=jobdir.ansible_root, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + preexec_fn=os.setsid, + env=env_copy, + ) + ret = None + watchdog = Watchdog(ANSIBLE_DEFAULT_POST_TIMEOUT, + self._ansibleTimeout, + (self.ansible_post_proc, + "Ansible post timeout exceeded")) + watchdog.start() + try: + for line in iter(self.ansible_post_proc.stdout.readline, b''): + line = line[:1024].rstrip() + self.log.debug("Ansible post output: %s" % (line,)) + ret = self.ansible_post_proc.wait() + finally: + watchdog.stop() + self.log.debug("Ansible post exit code: %s" % (ret,)) + self.ansible_post_proc = None + return ret == 0 + + +class JJB(jenkins_jobs.builder.Builder): + def __init__(self): + self.global_config = None + self._plugins_list = [] + + def expandComponent(self, component_type, component, template_data): + component_list_type = component_type + 's' + new_components = [] + if isinstance(component, dict): + name, component_data = next(iter(component.items())) + if template_data: + component_data = jenkins_jobs.formatter.deep_format( + component_data, template_data, True) + else: + name = component + component_data = {} + + new_component = self.parser.data.get(component_type, {}).get(name) + if new_component: + for new_sub_component in new_component[component_list_type]: + new_components.extend( + self.expandComponent(component_type, + new_sub_component, component_data)) + else: + new_components.append({name: component_data}) + return new_components + + def expandMacros(self, job): + for component_type in ['builder', 'publisher', 'wrapper']: + component_list_type = component_type + 's' + new_components = [] + for new_component in 
job.get(component_list_type, []): + new_components.extend(self.expandComponent(component_type, + new_component, {})) + job[component_list_type] = new_components diff --git a/zuul/launcher/client.py b/zuul/launcher/client.py index cc39797369..8448422355 100644 --- a/zuul/launcher/client.py +++ b/zuul/launcher/client.py @@ -17,6 +17,7 @@ import inspect import json import logging import os +import six import time import threading from uuid import uuid4 @@ -193,6 +194,11 @@ class LaunchClient(object): port = config.get('gearman', 'port') else: port = 4730 + if config.has_option('gearman', 'check_job_registration'): + self.job_registration = config.getboolean( + 'gearman', 'check_job_registration') + else: + self.job_registration = True self.gearman = ZuulGearmanClient(self) self.gearman.addServer(server, port) @@ -260,7 +266,7 @@ class LaunchClient(object): s_config = {} s_config.update((k, v.format(item=item, job=job, change=item.change)) - if isinstance(v, basestring) + if isinstance(v, six.string_types) else (k, v) for k, v in s.items()) @@ -391,7 +397,8 @@ class LaunchClient(object): build.__gearman_job = gearman_job self.builds[uuid] = build - if not self.isJobRegistered(gearman_job.name): + if self.job_registration and not self.isJobRegistered( + gearman_job.name): self.log.error("Job %s is not registered with Gearman" % gearman_job) self.onBuildCompleted(gearman_job, 'NOT_REGISTERED') @@ -496,9 +503,6 @@ class LaunchClient(object): build.number = data.get('number') build.__gearman_manager = data.get('manager') self.sched.onBuildStarted(build) - - if job.denominator: - build.estimated_time = float(job.denominator) / 1000 else: self.log.error("Unable to find build %s" % job.unique) @@ -545,7 +549,7 @@ class LaunchClient(object): # us where the job is running. 
return False - if not self.isJobRegistered(name): + if self.job_registration and not self.isJobRegistered(name): return False desc_uuid = str(uuid4().hex) diff --git a/zuul/launcher/server.py b/zuul/launcher/server.py index 704d620a2f..c6a1b23dee 100644 --- a/zuul/launcher/server.py +++ b/zuul/launcher/server.py @@ -277,8 +277,8 @@ class LaunchServer(object): ) (out, err) = proc.communicate() ret = proc.wait() - print out - print err + print(out) + print(err) if ret == 0: return 'SUCCESS' else: diff --git a/zuul/lib/clonemapper.py b/zuul/lib/clonemapper.py index ae558cd8d0..57ac177100 100644 --- a/zuul/lib/clonemapper.py +++ b/zuul/lib/clonemapper.py @@ -19,6 +19,9 @@ import logging import os import re +import six + + OrderedDict = extras.try_imports(['collections.OrderedDict', 'ordereddict.OrderedDict']) @@ -59,17 +62,17 @@ class CloneMapper(object): raise Exception("Expansion error. Check error messages above") self.log.info("Mapping projects to workspace...") - for project, dest in ret.iteritems(): + for project, dest in six.iteritems(ret): dest = os.path.normpath(os.path.join(workspace, dest[0])) ret[project] = dest self.log.info(" %s -> %s", project, dest) self.log.debug("Checking overlap in destination directories...") check = defaultdict(list) - for project, dest in ret.iteritems(): + for project, dest in six.iteritems(ret): check[dest].append(project) - dupes = dict((d, p) for (d, p) in check.iteritems() if len(p) > 1) + dupes = dict((d, p) for (d, p) in six.iteritems(check) if len(p) > 1) if dupes: raise Exception("Some projects share the same destination: %s", dupes) diff --git a/zuul/lib/cloner.py b/zuul/lib/cloner.py index 257b95ded7..197c4260d5 100644 --- a/zuul/lib/cloner.py +++ b/zuul/lib/cloner.py @@ -19,7 +19,10 @@ import os import re import yaml +import six + from git import GitCommandError +from zuul import exceptions from zuul.lib.clonemapper import CloneMapper from zuul.merger.merger import Repo @@ -29,7 +32,8 @@ class Cloner(object): def 
__init__(self, git_base_url, projects, workspace, zuul_branch, zuul_ref, zuul_url, branch=None, clone_map_file=None, - project_branches=None, cache_dir=None): + project_branches=None, cache_dir=None, zuul_newrev=None, + zuul_project=None): self.clone_map = [] self.dests = None @@ -43,6 +47,10 @@ class Cloner(object): self.zuul_ref = zuul_ref or '' self.zuul_url = zuul_url self.project_branches = project_branches or {} + self.project_revisions = {} + + if zuul_newrev and zuul_project: + self.project_revisions[zuul_project] = zuul_newrev if clone_map_file: self.readCloneMap(clone_map_file) @@ -62,7 +70,7 @@ class Cloner(object): dests = mapper.expand(workspace=self.workspace) self.log.info("Preparing %s repositories", len(dests)) - for project, dest in dests.iteritems(): + for project, dest in six.iteritems(dests): self.prepareRepo(project, dest) self.log.info("Prepared all repositories") @@ -103,7 +111,14 @@ class Cloner(object): repo.fetchFrom(zuul_remote, ref) self.log.debug("Fetched ref %s from %s", ref, project) return True - except (ValueError, GitCommandError): + except ValueError: + self.log.debug("Project %s in Zuul does not have ref %s", + project, ref) + return False + except GitCommandError as error: + # Bail out if fetch fails due to infrastructure reasons + if error.stderr.startswith('fatal: unable to access'): + raise self.log.debug("Project %s in Zuul does not have ref %s", project, ref) return False @@ -112,10 +127,15 @@ class Cloner(object): """Clone a repository for project at dest and apply a reference suitable for testing. 
The reference lookup is attempted in this order: - 1) Zuul reference for the indicated branch - 2) Zuul reference for the master branch - 3) The tip of the indicated branch - 4) The tip of the master branch + 1) The indicated revision for specific project + 2) Zuul reference for the indicated branch + 3) Zuul reference for the master branch + 4) The tip of the indicated branch + 5) The tip of the master branch + + If an "indicated revision" is specified for this project, and we are + unable to meet this requirement, we stop attempting to check this + repo out and raise a zuul.exceptions.RevNotFound exception. The "indicated branch" is one of the following: @@ -135,6 +155,10 @@ class Cloner(object): # `git branch` is happy with. repo.reset() + indicated_revision = None + if project in self.project_revisions: + indicated_revision = self.project_revisions[project] + indicated_branch = self.branch or self.zuul_branch if project in self.project_branches: indicated_branch = self.project_branches[project] @@ -149,8 +173,9 @@ class Cloner(object): self.log.info("upstream repo has branch %s", indicated_branch) fallback_branch = indicated_branch else: - self.log.info("upstream repo is missing branch %s", - self.branch) + if indicated_branch: + self.log.info("upstream repo is missing branch %s", + indicated_branch) # FIXME should be origin HEAD branch which might not be 'master' fallback_branch = 'master' @@ -160,13 +185,26 @@ class Cloner(object): else: fallback_zuul_ref = None + # If the user has requested an explicit revision to be checked out, + # we use it above all else, and if we cannot satisfy this requirement + # we raise an error and do not attempt to continue. 
+ if indicated_revision: + self.log.info("Attempting to check out revision %s for " + "project %s", indicated_revision, project) + try: + self.fetchFromZuul(repo, project, self.zuul_ref) + commit = repo.checkout(indicated_revision) + except (ValueError, GitCommandError): + raise exceptions.RevNotFound(project, indicated_revision) + self.log.info("Prepared '%s' repo at revision '%s'", project, + indicated_revision) # If we have a non empty zuul_ref to use, use it. Otherwise we fall # back to checking out the branch. - if ((override_zuul_ref and - self.fetchFromZuul(repo, project, override_zuul_ref)) or - (fallback_zuul_ref and - fallback_zuul_ref != override_zuul_ref and - self.fetchFromZuul(repo, project, fallback_zuul_ref))): + elif ((override_zuul_ref and + self.fetchFromZuul(repo, project, override_zuul_ref)) or + (fallback_zuul_ref and + fallback_zuul_ref != override_zuul_ref and + self.fetchFromZuul(repo, project, fallback_zuul_ref))): # Work around a bug in GitPython which can not parse FETCH_HEAD gitcmd = git.Git(dest) fetch_head = gitcmd.rev_parse('FETCH_HEAD') diff --git a/zuul/lib/commandsocket.py b/zuul/lib/commandsocket.py new file mode 100644 index 0000000000..1b7fed915b --- /dev/null +++ b/zuul/lib/commandsocket.py @@ -0,0 +1,83 @@ +# Copyright 2014 OpenStack Foundation +# Copyright 2014 Hewlett-Packard Development Company, L.P. +# Copyright 2016 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import logging +import os +import socket +import threading +import Queue + + +class CommandSocket(object): + log = logging.getLogger("zuul.CommandSocket") + + def __init__(self, path): + self.running = False + self.path = path + self.queue = Queue.Queue() + + def start(self): + self.running = True + if os.path.exists(self.path): + os.unlink(self.path) + self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.socket.bind(self.path) + self.socket.listen(1) + self.socket_thread = threading.Thread(target=self._socketListener) + self.socket_thread.daemon = True + self.socket_thread.start() + + def stop(self): + # First, wake up our listener thread with a connection and + # tell it to stop running. + self.running = False + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.connect(self.path) + s.sendall('_stop\n') + # The command '_stop' will be ignored by our listener, so + # directly inject it into the queue so that consumers of this + # class which are waiting in .get() are awakened. They can + # either handle '_stop' or just ignore the unknown command and + # then check to see if they should continue to run before + # re-entering their loop. + self.queue.put('_stop') + self.socket_thread.join() + + def _socketListener(self): + while self.running: + try: + s, addr = self.socket.accept() + self.log.debug("Accepted socket connection %s" % (s,)) + buf = '' + while True: + buf += s.recv(1) + if buf[-1] == '\n': + break + buf = buf.strip() + self.log.debug("Received %s from socket" % (buf,)) + s.close() + # Because we use '_stop' internally to wake up a + # waiting thread, don't allow it to actually be + # injected externally. 
+ if buf != '_stop': + self.queue.put(buf) + except Exception: + self.log.exception("Exception in socket handler") + + def get(self): + if not self.running: + raise Exception("CommandSocket.get called while stopped") + return self.queue.get() diff --git a/zuul/lib/connections.py b/zuul/lib/connections.py index bc6a789cb6..a37907a16a 100644 --- a/zuul/lib/connections.py +++ b/zuul/lib/connections.py @@ -24,10 +24,11 @@ class ConnectionRegistry(object): def __init__(self): self.connections = {} - def registerScheduler(self, sched): + def registerScheduler(self, sched, load=True): for connection_name, connection in self.connections.items(): connection.registerScheduler(sched) - connection.onLoad() + if load: + connection.onLoad() def stop(self): for connection_name, connection in self.connections.items(): diff --git a/zuul/lib/gearserver.py b/zuul/lib/gearserver.py new file mode 100644 index 0000000000..9cddca346b --- /dev/null +++ b/zuul/lib/gearserver.py @@ -0,0 +1,35 @@ +# Copyright 2016 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import gear + +MASS_DO = 101 + + +class GearServer(gear.Server): + def handlePacket(self, packet): + if packet.ptype == MASS_DO: + self.log.info("Received packet from %s: %s" % (packet.connection, + packet)) + self.handleMassDo(packet) + else: + return super(GearServer, self).handlePacket(packet) + + def handleMassDo(self, packet): + packet.connection.functions = set() + for name in packet.data.split(b'\x00'): + self.log.debug("Adding function %s to %s" % ( + name, packet.connection)) + packet.connection.functions.add(name) + self.functions.add(name) diff --git a/zuul/lib/swift.py b/zuul/lib/swift.py index 3c411d3ff1..b5d3bc7164 100644 --- a/zuul/lib/swift.py +++ b/zuul/lib/swift.py @@ -19,8 +19,8 @@ from time import time import os import random import six +from six.moves import urllib import string -import urlparse class Swift(object): @@ -156,7 +156,7 @@ class Swift(object): url = os.path.join(self.storage_url, settings['container'], settings['file_path_prefix'], destination_prefix) - u = urlparse.urlparse(url) + u = urllib.parse.urlparse(url) hmac_body = '%s\n%s\n%s\n%s\n%s' % (u.path, redirect, settings['max_file_size'], diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py index 7c9f1f9886..46fc468223 100644 --- a/zuul/merger/merger.py +++ b/zuul/merger/merger.py @@ -217,7 +217,7 @@ class Merger(object): fd.write('#!/bin/bash\n') fd.write('ssh -i %s $@\n' % key) fd.close() - os.chmod(name, 0755) + os.chmod(name, 0o755) def addProject(self, project, url): repo = None diff --git a/zuul/merger/server.py b/zuul/merger/server.py index 813c60241c..44606e7af5 100644 --- a/zuul/merger/server.py +++ b/zuul/merger/server.py @@ -19,7 +19,7 @@ import traceback import gear -import merger +from zuul.merger import merger class MergeServer(object): diff --git a/zuul/model.py b/zuul/model.py index 8d1c0bf776..2f16f68f68 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -13,7 +13,9 @@ # under the License. 
import copy +import os import re +import struct import time from uuid import uuid4 import extras @@ -121,7 +123,11 @@ class Pipeline(object): return job_tree def getProjects(self): - return sorted(self.job_trees.keys(), lambda a, b: cmp(a.name, b.name)) + # cmp is not in python3, applied idiom from + # http://python-future.org/compatible_idioms.html#cmp + return sorted( + self.job_trees.keys(), + key=lambda p: p.name) def addQueue(self, queue): self.queues.append(queue) @@ -273,7 +279,7 @@ class Pipeline(object): items.extend(shared_queue.queue) return items - def formatStatusJSON(self): + def formatStatusJSON(self, url_pattern=None): j_pipeline = dict(name=self.name, description=self.description) j_queues = [] @@ -290,7 +296,7 @@ class Pipeline(object): if j_changes: j_queue['heads'].append(j_changes) j_changes = [] - j_changes.append(e.formatJSON()) + j_changes.append(e.formatJSON(url_pattern)) if (len(j_changes) > 1 and (j_changes[-2]['remaining_time'] is not None) and (j_changes[-1]['remaining_time'] is not None)): @@ -413,7 +419,7 @@ class ChangeQueue(object): elif self.window_decrease_type == 'exponential': self.window = max( self.window_floor, - self.window / self.window_decrease_factor) + int(self.window / self.window_decrease_factor)) class Project(object): @@ -766,7 +772,34 @@ class QueueItem(object): return [] return self.job_tree.getJobs() - def formatJSON(self): + def formatJobResult(self, job, url_pattern=None): + build = self.current_build_set.getBuild(job.name) + result = build.result + pattern = url_pattern + if result == 'SUCCESS': + if job.success_message: + result = job.success_message + if job.success_pattern: + pattern = job.success_pattern + elif result == 'FAILURE': + if job.failure_message: + result = job.failure_message + if job.failure_pattern: + pattern = job.failure_pattern + url = None + if pattern: + try: + url = pattern.format(change=self.change, + pipeline=self.pipeline, + job=job, + build=build) + except Exception: + pass # FIXME: 
log this or something? + if not url: + url = build.url or job.name + return (result, url) + + def formatJSON(self, url_pattern=None): changeish = self.change ret = {} ret['active'] = self.active @@ -803,11 +836,13 @@ elapsed = None remaining = None result = None - url = None + build_url = None + report_url = None worker = None if build: result = build.result - url = build.url + build_url = build.url + (unused, report_url) = self.formatJobResult(job, url_pattern) if build.start_time: if build.end_time: elapsed = int((build.end_time - @@ -835,7 +870,8 @@ 'name': job.name, 'elapsed_time': elapsed, 'remaining_time': remaining, - 'url': url, + 'url': build_url, + 'report_url': report_url, 'result': result, 'voting': job.voting, 'uuid': build.uuid if build else None, @@ -1085,7 +1121,7 @@ class BaseFilter(object): for a in approvals: for k, v in a.items(): if k == 'username': - pass + a['username'] = re.compile(v) elif k in ['email', 'email-filter']: a['email'] = re.compile(v) elif k == 'newer-than': @@ -1104,7 +1140,7 @@ by = approval.get('by', {}) for k, v in rapproval.items(): if k == 'username': - if (by.get('username', '') != v): + if (not v.search(by.get('username', ''))): return False elif k == 'email': if (not v.search(by.get('email', ''))): @@ -1517,3 +1553,78 @@ class Tenant(object): class Abide(object): def __init__(self): self.tenants = OrderedDict() + + +class JobTimeData(object): + format = 'B10H10H10B' + version = 0 + + def __init__(self, path): + self.path = path + self.success_times = [0 for x in range(10)] + self.failure_times = [0 for x in range(10)] + self.results = [0 for x in range(10)] + + def load(self): + if not os.path.exists(self.path): + return + with open(self.path) as f: + data = struct.unpack(self.format, f.read()) + version = data[0] + if version != self.version: + raise Exception("Unknown data version") + self.success_times = list(data[1:11]) + self.failure_times =
list(data[11:21]) + self.results = list(data[21:32]) + + def save(self): + tmpfile = self.path + '.tmp' + data = [self.version] + data.extend(self.success_times) + data.extend(self.failure_times) + data.extend(self.results) + data = struct.pack(self.format, *data) + with open(tmpfile, 'w') as f: + f.write(data) + os.rename(tmpfile, self.path) + + def add(self, elapsed, result): + elapsed = int(elapsed) + if result == 'SUCCESS': + self.success_times.append(elapsed) + self.success_times.pop(0) + result = 0 + else: + self.failure_times.append(elapsed) + self.failure_times.pop(0) + result = 1 + self.results.append(result) + self.results.pop(0) + + def getEstimatedTime(self): + times = [x for x in self.success_times if x] + if times: + return float(sum(times)) / len(times) + return 0.0 + + +class TimeDataBase(object): + def __init__(self, root): + self.root = root + self.jobs = {} + + def _getTD(self, name): + td = self.jobs.get(name) + if not td: + td = JobTimeData(os.path.join(self.root, name)) + self.jobs[name] = td + td.load() + return td + + def getEstimatedTime(self, name): + return self._getTD(name).getEstimatedTime() + + def update(self, name, elapsed, result): + td = self._getTD(name) + td.add(elapsed, result) + td.save() diff --git a/zuul/reporter/__init__.py b/zuul/reporter/__init__.py index 8622fe4ac1..6352fbb59c 100644 --- a/zuul/reporter/__init__.py +++ b/zuul/reporter/__init__.py @@ -13,6 +13,7 @@ # under the License. import abc +import logging import six @@ -24,6 +25,8 @@ class BaseReporter(object): Defines the exact public methods that must be supplied. 
""" + log = logging.getLogger("zuul.reporter.BaseReporter") + def __init__(self, reporter_config={}, connection=None): self.reporter_config = reporter_config self.connection = connection @@ -107,25 +110,7 @@ class BaseReporter(object): for job in pipeline.getJobs(item): build = item.current_build_set.getBuild(job.name) - result = build.result - pattern = url_pattern - if result == 'SUCCESS': - if job.success_message: - result = job.success_message - if job.success_pattern: - pattern = job.success_pattern - elif result == 'FAILURE': - if job.failure_message: - result = job.failure_message - if job.failure_pattern: - pattern = job.failure_pattern - if pattern: - url = pattern.format(change=item.change, - pipeline=pipeline, - job=job, - build=build) - else: - url = build.url or job.name + (result, url) = item.formatJobResult(job, url_pattern) if not job.voting: voting = ' (non-voting)' else: diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py index d54da9f38e..716dcfb534 100644 --- a/zuul/rpclistener.py +++ b/zuul/rpclistener.py @@ -21,7 +21,7 @@ import traceback import gear import six -import model +from zuul import model class RPCListener(object): @@ -40,11 +40,11 @@ class RPCListener(object): port = 4730 self.worker = gear.Worker('Zuul RPC Listener') self.worker.addServer(server, port) + self.worker.waitForServer() + self.register() self.thread = threading.Thread(target=self.run) self.thread.daemon = True self.thread.start() - self.worker.waitForServer() - self.register() def register(self): self.worker.registerFunction("zuul:enqueue") @@ -66,8 +66,8 @@ class RPCListener(object): while self._running: try: job = self.worker.getJob() - z, jobname = job.name.split(':') self.log.debug("Received job %s" % job.name) + z, jobname = job.name.split(':') attrname = 'handle_' + jobname if hasattr(self, attrname): f = getattr(self, attrname) diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 1bd746cb96..7f7875e16f 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py 
@@ -20,14 +20,15 @@ import json import logging import os import pickle +import six from six.moves import queue as Queue import sys import threading import time -import configloader -import model -from model import Project +from zuul import configloader +from zuul import model +from zuul.model import Project from zuul import exceptions from zuul import version as zuul_version @@ -100,12 +101,10 @@ class ManagementEvent(object): """An event that should be processed within the main queue run loop""" def __init__(self): self._wait_event = threading.Event() - self._exception = None - self._traceback = None + self._exc_info = None - def exception(self, e, tb): - self._exception = e - self._traceback = tb + def exception(self, exc_info): + self._exc_info = exc_info self._wait_event.set() def done(self): @@ -113,8 +112,8 @@ class ManagementEvent(object): def wait(self, timeout=None): self._wait_event.wait(timeout) - if self._exception: - raise self._exception, None, self._traceback + if self._exc_info: + six.reraise(*self._exc_info) return self._wait_event.is_set() @@ -211,7 +210,7 @@ def toList(item): class Scheduler(threading.Thread): log = logging.getLogger("zuul.Scheduler") - def __init__(self, config): + def __init__(self, config, testonly=False): threading.Thread.__init__(self) self.daemon = True self.wake_event = threading.Event() @@ -238,6 +237,10 @@ class Scheduler(threading.Thread): self.management_event_queue = Queue.Queue() self.abide = model.Abide() + if not testonly: + time_dir = self._get_time_database_dir() + self.time_database = model.TimeDataBase(time_dir) + self.zuul_version = zuul_version.version_info.release_string() self.last_reconfigured = None @@ -251,9 +254,11 @@ class Scheduler(threading.Thread): # registerConnections as we don't want to do the onLoad event yet. 
return self._parseConfig(config_path, connections) - def registerConnections(self, connections): + def registerConnections(self, connections, load=True): + # load: whether or not to trigger the onLoad for the connection. This + # is useful for not doing a full load during layout validation. self.connections = connections - self.connections.registerScheduler(self) + self.connections.registerScheduler(self, load) def stopConnections(self): self.connections.stop() @@ -388,6 +393,17 @@ class Scheduler(threading.Thread): state_dir = '/var/lib/zuul' return os.path.join(state_dir, 'queue.pickle') + def _get_time_database_dir(self): + if self.config.has_option('zuul', 'state_dir'): + state_dir = os.path.expanduser(self.config.get('zuul', + 'state_dir')) + else: + state_dir = '/var/lib/zuul' + d = os.path.join(state_dir, 'times') + if not os.path.exists(d): + os.mkdir(d) + return d + def _save_queue(self): pickle_file = self._get_queue_pickle_file() events = [] @@ -687,8 +703,8 @@ class Scheduler(threading.Thread): else: self.log.error("Unable to handle event %s" % event) event.done() - except Exception as e: - event.exception(e, sys.exc_info()[2]) + except Exception: + event.exception(sys.exc_info()) self.management_event_queue.task_done() def process_result_queue(self): @@ -718,6 +734,11 @@ class Scheduler(threading.Thread): self.log.warning("Build %s is not associated with a pipeline" % (build,)) return + try: + build.estimated_time = float(self.time_database.getEstimatedTime( + build.job.name)) + except Exception: + self.log.exception("Exception estimating build time:") pipeline.manager.onBuildStarted(event.build) def _doBuildCompletedEvent(self, event): @@ -731,6 +752,13 @@ class Scheduler(threading.Thread): self.log.warning("Build %s is not associated with a pipeline" % (build,)) return + if build.end_time and build.start_time and build.result: + duration = build.end_time - build.start_time + try: + self.time_database.update( + build.job.name, duration, build.result) 
+ except Exception: + self.log.exception("Exception recording build time:") pipeline.manager.onBuildCompleted(event.build) def _doMergeCompletedEvent(self, event): @@ -747,6 +775,11 @@ class Scheduler(threading.Thread): def formatStatusJSON(self): # TODOv3(jeblair): use tenants + if self.config.has_option('zuul', 'url_pattern'): + url_pattern = self.config.get('zuul', 'url_pattern') + else: + url_pattern = None + data = {} data['zuul_version'] = self.zuul_version @@ -772,5 +805,5 @@ class Scheduler(threading.Thread): pipelines = [] data['pipelines'] = pipelines for pipeline in self.layout.pipelines.values(): - pipelines.append(pipeline.formatStatusJSON()) + pipelines.append(pipeline.formatStatusJSON(url_pattern)) return json.dumps(data) diff --git a/zuul/trigger/timer.py b/zuul/trigger/timer.py index db3894c01b..94a406b6a1 100644 --- a/zuul/trigger/timer.py +++ b/zuul/trigger/timer.py @@ -40,8 +40,8 @@ class TimerTrigger(BaseTrigger): self.log.debug("Adding event %s" % event) self.connection.sched.addEvent(event) - def _shutdown(self): - self.apsched.stop() + def stop(self): + self.apsched.shutdown() def getEventFilters(self, trigger_conf): def toList(item): diff --git a/zuul/webapp.py b/zuul/webapp.py index 44c333bf95..c1c848b211 100644 --- a/zuul/webapp.py +++ b/zuul/webapp.py @@ -43,16 +43,19 @@ array of changes, they will not include the queue structure. 
class WebApp(threading.Thread): log = logging.getLogger("zuul.WebApp") - def __init__(self, scheduler, port=8001, cache_expiry=1): + def __init__(self, scheduler, port=8001, cache_expiry=1, + listen_address='0.0.0.0'): threading.Thread.__init__(self) self.scheduler = scheduler + self.listen_address = listen_address self.port = port self.cache_expiry = cache_expiry self.cache_time = 0 self.cache = None self.daemon = True - self.server = httpserver.serve(dec.wsgify(self.app), host='0.0.0.0', - port=self.port, start_loop=False) + self.server = httpserver.serve( + dec.wsgify(self.app), host=self.listen_address, port=self.port, + start_loop=False) def run(self): self.server.serve_forever()