Merge branch 'master' into workingv3

This includes forward-porting changes to launcher/server.py with the
exception of the pre/post playbooks changes which will be done in a
follow up commit as they have deviated.

Change-Id: I13aa229c1460b748745babe178c0a745e52f841c
This commit is contained in:
Joshua Hesketh 2016-11-21 17:36:44 +11:00
commit 3f7def3424
19 changed files with 895 additions and 394 deletions

View File

@ -1,4 +1,4 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} OS_LOG_DEFAULTS=${OS_LOG_DEFAULTS:-""} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
test_command=OS_LOG_LEVEL=${OS_LOG_LEVEL:-INFO} OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} OS_LOG_DEFAULTS=${OS_LOG_DEFAULTS:-""} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

View File

@ -10,6 +10,11 @@ preparation for the third major version of Zuul. We call this effort
Contributing
------------
We are currently engaged in a significant development effort in
preparation for the third major version of Zuul. We call this effort
`Zuul v3`_ and it is described in this file in the `feature/zuulv3`
branch of this repo.
To browse the latest code, see: https://git.openstack.org/cgit/openstack-infra/zuul/tree/
To clone the latest code, use `git clone git://git.openstack.org/openstack-infra/zuul`

View File

@ -798,6 +798,11 @@ each job as it builds a list from the project specification.
Boolean value (``true`` or ``false``) that indicates whether
a job is voting or not.  Default: ``true``.
**attempts (optional)**
Number of attempts zuul will make to launch a job.  Once the limit is
reached, zuul will report RETRY_LIMIT as the job result.
Defaults to 3.
**tags (optional)**
A list of arbitrary strings which will be associated with the job.
Can be used by the parameter-function to alter behavior based on

View File

@ -3,7 +3,6 @@ hacking>=0.9.2,<0.10
coverage>=3.6
sphinx>=1.1.2,!=1.2.0,!=1.3b1,<1.3
sphinxcontrib-blockdiag>=1.1.0
discover
fixtures>=0.3.14
python-keystoneclient>=0.4.2
python-subunit

View File

@ -561,6 +561,7 @@ class FakeBuild(object):
self.wait_condition = threading.Condition()
self.waiting = False
self.aborted = False
self.requeue = False
self.created = time.time()
self.run_error = False
self.changes = None
@ -616,6 +617,8 @@ class FakeBuild(object):
result = 'FAILURE'
if self.aborted:
result = 'ABORTED'
if self.requeue:
result = None
if self.run_error:
result = 'RUN_ERROR'
@ -885,8 +888,19 @@ class BaseTestCase(testtools.TestCase):
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
os.environ.get('OS_LOG_CAPTURE') == '1'):
log_level = logging.DEBUG
if os.environ.get('OS_LOG_LEVEL') == 'DEBUG':
log_level = logging.DEBUG
elif os.environ.get('OS_LOG_LEVEL') == 'INFO':
log_level = logging.INFO
elif os.environ.get('OS_LOG_LEVEL') == 'WARNING':
log_level = logging.WARNING
elif os.environ.get('OS_LOG_LEVEL') == 'ERROR':
log_level = logging.ERROR
elif os.environ.get('OS_LOG_LEVEL') == 'CRITICAL':
log_level = logging.CRITICAL
self.useFixture(fixtures.FakeLogger(
level=logging.DEBUG,
level=log_level,
format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s'))

View File

@ -0,0 +1,30 @@
pipelines:
  - name: check
    manager: IndependentPipelineManager
    trigger:
      gerrit:
        - event: patchset-created
    success:
      gerrit:
        verified: 1
    failure:
      gerrit:
        verified: -1

  - name: post
    manager: IndependentPipelineManager
    trigger:
      gerrit:
        - event: ref-updated
          ref: ^(?!refs/).*$

jobs:
  - name: project-test1
    attempts: 4

projects:
  - name: org/project
    check:
      - project-merge:
        - project-test1
        - project-test2

View File

@ -4607,6 +4607,40 @@ For CI problems and help debugging, contact ci@example.org"""
'- docs-draft-test2 https://server/job/docs-draft-test2/1/',
body[3])
@skip("Disabled for early v3 development")
def test_rerun_on_abort(self):
    "Test that if a worker fails to run a job, it is run again"
    # Use a layout where project-test1 is configured with attempts: 4
    # so the scheduler retries it before giving up.
    self.config.set('zuul', 'layout_config',
                    'tests/fixtures/layout-abort-attempts.yaml')
    self.sched.reconfigure(self.config)
    self.worker.hold_jobs_in_build = True
    A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
    self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
    self.waitUntilSettled()

    self.worker.release('.*-merge')
    self.waitUntilSettled()
    self.assertEqual(len(self.builds), 2)

    # Requeue (result None) the first test build; zuul should launch
    # a replacement rather than reporting a result.
    self.builds[0].requeue = True
    self.worker.release('.*-test*')
    self.waitUntilSettled()

    # Requeue the relaunched build three more times, exhausting the
    # configured attempt limit of 4.
    for x in range(3):
        self.assertEqual(len(self.builds), 1)
        self.builds[0].requeue = True
        self.worker.release('.*-test1')
        self.waitUntilSettled()

    self.worker.hold_jobs_in_build = False
    self.worker.release()
    self.waitUntilSettled()

    self.assertEqual(len(self.history), 6)
    self.assertEqual(self.countJobResults(self.history, 'SUCCESS'), 2)
    self.assertEqual(A.reported, 1)
    # With the attempt limit exhausted, the reported job result must
    # be RETRY_LIMIT.
    self.assertIn('RETRY_LIMIT', A.messages[0])
class TestDuplicatePipeline(ZuulTestCase):
tenant_config_file = 'config/duplicate-pipeline/main.yaml'

View File

@ -0,0 +1,469 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2016 Red Hat, Inc.
# Copyright (c) 2016 IBM Corp.
# (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>, and others
# (c) 2016, Toshio Kuratomi <tkuratomi@ansible.com>
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
# flake8: noqa
# This file shares a significant chunk of code with an upstream ansible
# function, run_command. The goal is to not have to fork quite so much
# of that function, and discussing that design with upstream means we
# should keep the changes to substantive ones only. For that reason, this
# file is purposely not enforcing pep8, as making the function pep8 clean
# would remove our ability to easily have a discussion with our friends
# upstream
DOCUMENTATION = '''
---
module: command
short_description: Executes a command on a remote node
version_added: historical
description:
- The M(command) module takes the command name followed by a list of space-delimited arguments.
- The given command will be executed on all selected nodes. It will not be
processed through the shell, so variables like C($HOME) and operations
like C("<"), C(">"), C("|"), C(";") and C("&") will not work (use the M(shell)
module if you need these features).
options:
free_form:
description:
- the command module takes a free form command to run. There is no parameter actually named 'free form'.
See the examples!
required: true
default: null
creates:
description:
- a filename or (since 2.0) glob pattern, when it already exists, this step will B(not) be run.
required: no
default: null
removes:
description:
- a filename or (since 2.0) glob pattern, when it does not exist, this step will B(not) be run.
version_added: "0.8"
required: no
default: null
chdir:
description:
- cd into this directory before running the command
version_added: "0.6"
required: false
default: null
executable:
description:
- change the shell used to execute the command. Should be an absolute path to the executable.
required: false
default: null
version_added: "0.9"
warn:
version_added: "1.8"
default: yes
description:
- if command warnings are on in ansible.cfg, do not warn about this particular line if set to no/false.
required: false
notes:
- If you want to run a command through the shell (say you are using C(<),
C(>), C(|), etc), you actually want the M(shell) module instead. The
M(command) module is much more secure as it's not affected by the user's
environment.
- " C(creates), C(removes), and C(chdir) can be specified after the command. For instance, if you only want to run a command if a certain file does not exist, use this."
author:
- Ansible Core Team
- Michael DeHaan
'''
EXAMPLES = '''
# Example from Ansible Playbooks.
- command: /sbin/shutdown -t now
# Run the command if the specified file does not exist.
- command: /usr/bin/make_database.sh arg1 arg2 creates=/path/to/database
# You can also use the 'args' form to provide the options. This command
# will change the working directory to somedir/ and will only run when
# /path/to/database doesn't exist.
- command: /usr/bin/make_database.sh arg1 arg2
args:
chdir: somedir/
creates: /path/to/database
'''
import datetime
import glob
import pipes
import re
import shlex
import os
import getpass
import select
import subprocess
import traceback
import threading
from ansible.module_utils.basic import AnsibleModule, heuristic_log_sanitize
from ansible.module_utils.basic import get_exception
# ZUUL: Hardcode python2 until we're on ansible 2.2
from ast import literal_eval
PASSWD_ARG_RE = re.compile(r'^[-]{0,2}pass[-]?(word|wd)?')
class Console(object):
    """Context manager that appends timestamped lines to the console log.

    The log lives at /tmp/console.html; it is opened unbuffered so each
    line is visible on disk while the job is still running.
    """

    def __enter__(self):
        # Buffering of 0 flushes every write immediately (Python 2 API).
        self.logfile = open('/tmp/console.html', 'a', 0)
        return self

    def __exit__(self, etype, value, tb):
        self.logfile.close()

    def addLine(self, ln):
        # The "timestamp | text" layout with this delimiter is "inspired"
        # by the old Jenkins format, but with microsecond rather than
        # millisecond resolution.  It is kept so log parsing/formatting
        # remains consistent.
        stamp = datetime.datetime.now()
        self.logfile.write('%s | %s' % (stamp, ln))
def follow(fd):
    """Copy every line read from *fd* into the console log.

    A line lacking a trailing newline gets one appended, and a single
    warning line is written at the end if that ever happened.
    """
    warn_newline = False
    with Console() as console:
        line = fd.readline()
        while line:
            if not line.endswith('\n'):
                line += '\n'
                warn_newline = True
            console.addLine(line)
            line = fd.readline()
        if warn_newline:
            console.addLine('[Zuul] No trailing newline\n')
# Taken from ansible/module_utils/basic.py ... forking the method for now
# so that we can dive in and figure out how to make appropriate hook points
def zuul_run_command(self, args, check_rc=False, close_fds=True, executable=None, data=None, binary_data=False, path_prefix=None, cwd=None, use_unsafe_shell=False, prompt_regex=None, environ_update=None):
    '''
    Execute a command, returns rc, stdout, and stderr.

    Note: *self* is an AnsibleModule instance; this is a forked copy of
    AnsibleModule.run_command with the output-reading loop replaced by a
    thread that streams into the Zuul console log.

    :arg args: is the command to run
        * If args is a list, the command will be run with shell=False.
        * If args is a string and use_unsafe_shell=False it will split args to a list and run with shell=False
        * If args is a string and use_unsafe_shell=True it runs with shell=True.
    :kw check_rc: Whether to call fail_json in case of non zero RC.
        Default False
    :kw close_fds: See documentation for subprocess.Popen(). Default True
    :kw executable: See documentation for subprocess.Popen(). Default None
    :kw data: If given, information to write to the stdin of the command
    :kw binary_data: If False, append a newline to the data. Default False
    :kw path_prefix: If given, additional path to find the command in.
        This adds to the PATH environment variable so helper commands in
        the same directory can also be found
    :kw cwd: If given, working directory to run the command inside
    :kw use_unsafe_shell: See `args` parameter. Default False
    :kw prompt_regex: Regex string (not a compiled regex) which can be
        used to detect prompts in the stdout which would otherwise cause
        the execution to hang (especially if no input data is specified)
    :kwarg environ_update: dictionary to *update* os.environ with
    '''

    shell = False
    if isinstance(args, list):
        if use_unsafe_shell:
            args = " ".join([pipes.quote(x) for x in args])
            shell = True
    elif isinstance(args, (str, unicode)) and use_unsafe_shell:
        shell = True
    elif isinstance(args, (str, unicode)):
        # On python2.6 and below, shlex has problems with text type
        # ZUUL: Hardcode python2 until we're on ansible 2.2
        if isinstance(args, unicode):
            args = args.encode('utf-8')
        args = shlex.split(args)
    else:
        msg = "Argument 'args' to run_command must be list or string"
        self.fail_json(rc=257, cmd=args, msg=msg)

    prompt_re = None
    if prompt_regex:
        try:
            prompt_re = re.compile(prompt_regex, re.MULTILINE)
        except re.error:
            self.fail_json(msg="invalid prompt regular expression given to run_command")

    # expand things like $HOME and ~
    if not shell:
        args = [ os.path.expanduser(os.path.expandvars(x)) for x in args if x is not None ]

    rc = 0
    msg = None
    st_in = None

    # Manipulate the environ we'll send to the new process.  Old values
    # are remembered in old_env_vals so they can be restored afterwards.
    old_env_vals = {}
    # We can set this from both an attribute and per call
    for key, val in self.run_command_environ_update.items():
        old_env_vals[key] = os.environ.get(key, None)
        os.environ[key] = val
    if environ_update:
        for key, val in environ_update.items():
            old_env_vals[key] = os.environ.get(key, None)
            os.environ[key] = val
    if path_prefix:
        old_env_vals['PATH'] = os.environ['PATH']
        os.environ['PATH'] = "%s:%s" % (path_prefix, os.environ['PATH'])

    # If using test-module and explode, the remote lib path will resemble ...
    #   /tmp/test_module_scratch/debug_dir/ansible/module_utils/basic.py
    # If using ansible or ansible-playbook with a remote system ...
    #   /tmp/ansible_vmweLQ/ansible_modlib.zip/ansible/module_utils/basic.py

    # Clean out python paths set by ansiballz
    if 'PYTHONPATH' in os.environ:
        pypaths = os.environ['PYTHONPATH'].split(':')
        pypaths = [x for x in pypaths \
            if not x.endswith('/ansible_modlib.zip') \
            and not x.endswith('/debug_dir')]
        os.environ['PYTHONPATH'] = ':'.join(pypaths)
        if not os.environ['PYTHONPATH']:
            del os.environ['PYTHONPATH']

    # create a printable version of the command for use
    # in reporting later, which strips out things like
    # passwords from the args list
    to_clean_args = args
    # ZUUL: Hardcode python2 until we're on ansible 2.2
    if isinstance(args, (unicode, str)):
        to_clean_args = shlex.split(to_clean_args)

    clean_args = []
    is_passwd = False
    for arg in to_clean_args:
        if is_passwd:
            is_passwd = False
            clean_args.append('********')
            continue
        if PASSWD_ARG_RE.match(arg):
            sep_idx = arg.find('=')
            if sep_idx > -1:
                clean_args.append('%s=********' % arg[:sep_idx])
                continue
            else:
                is_passwd = True
        arg = heuristic_log_sanitize(arg, self.no_log_values)
        clean_args.append(arg)
    clean_args = ' '.join(pipes.quote(arg) for arg in clean_args)

    if data:
        st_in = subprocess.PIPE

    # ZUUL: changed stderr to follow stdout
    kwargs = dict(
        executable=executable,
        shell=shell,
        close_fds=close_fds,
        stdin=st_in,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )

    if cwd and os.path.isdir(cwd):
        kwargs['cwd'] = cwd

    # store the pwd
    prev_dir = os.getcwd()

    # make sure we're in the right working directory
    if cwd and os.path.isdir(cwd):
        try:
            os.chdir(cwd)
        except (OSError, IOError):
            e = get_exception()
            self.fail_json(rc=e.errno, msg="Could not open %s, %s" % (cwd, str(e)))

    try:
        if self._debug:
            if isinstance(args, list):
                running = ' '.join(args)
            else:
                running = args
            self.log('Executing: ' + running)

        # ZUUL: Replaced the execution loop with the zuul_runner run function
        cmd = subprocess.Popen(args, **kwargs)
        t = threading.Thread(target=follow, args=(cmd.stdout,))
        t.daemon = True
        t.start()
        ret = cmd.wait()
        # Give the thread that is writing the console log up to 10 seconds
        # to catch up and exit.  If it hasn't done so by then, it is very
        # likely stuck in readline() because it spawned a child that is
        # holding stdout or stderr open.
        t.join(10)
        with Console() as console:
            if t.isAlive():
                console.addLine("[Zuul] standard output/error still open "
                                "after child exited")
            console.addLine("[Zuul] Task exit code: %s\n" % ret)

        # ZUUL: If the console log follow thread *is* stuck in readline,
        # we can't close stdout (attempting to do so raises an
        # exception) , so this is disabled.
        # cmd.stdout.close()

        # ZUUL: stdout and stderr are in the console log file
        stdout = ''
        stderr = ''

        rc = cmd.returncode
    except (OSError, IOError):
        e = get_exception()
        self.fail_json(rc=e.errno, msg=str(e), cmd=clean_args)
    except Exception:
        e = get_exception()
        self.fail_json(rc=257, msg=str(e), exception=traceback.format_exc(), cmd=clean_args)

    # Restore env settings
    for key, val in old_env_vals.items():
        if val is None:
            del os.environ[key]
        else:
            os.environ[key] = val

    if rc != 0 and check_rc:
        msg = heuristic_log_sanitize(stderr.rstrip(), self.no_log_values)
        self.fail_json(cmd=clean_args, rc=rc, stdout=stdout, stderr=stderr, msg=msg)

    # reset the pwd
    os.chdir(prev_dir)

    return (rc, stdout, stderr)
def check_command(commandline):
    """Return warnings suggesting dedicated Ansible modules.

    Only the basename of the first word of *commandline* is inspected;
    an empty list means no better module is known for that command.
    """
    file_module_args = {
        'chown': 'owner', 'chmod': 'mode', 'chgrp': 'group',
        'ln': 'state=link', 'mkdir': 'state=directory',
        'rmdir': 'state=absent', 'rm': 'state=absent', 'touch': 'state=touch',
    }
    module_for_command = {
        'hg': 'hg', 'curl': 'get_url or uri', 'wget': 'get_url or uri',
        'svn': 'subversion', 'service': 'service',
        'mount': 'mount', 'rpm': 'yum, dnf or zypper', 'yum': 'yum', 'apt-get': 'apt',
        'tar': 'unarchive', 'unzip': 'unarchive', 'sed': 'template or lineinfile',
        'dnf': 'dnf', 'zypper': 'zypper',
    }
    become_commands = ('sudo', 'su', 'pbrun', 'pfexec', 'runas')

    warnings = []
    executable = os.path.basename(commandline.split()[0])
    if executable in file_module_args:
        warnings.append("Consider using file module with %s rather than running %s"
                        % (file_module_args[executable], executable))
    if executable in module_for_command:
        warnings.append("Consider using %s module rather than running %s"
                        % (module_for_command[executable], executable))
    if executable in become_commands:
        warnings.append("Consider using 'become', 'become_method', and 'become_user' rather than running %s"
                        % (executable,))
    return warnings
def main():
    """Ansible module entry point for Zuul's forked command module.

    Parses the free-form command plus its optional modifiers, honors
    the creates/removes idempotence guards, and runs the command via
    zuul_run_command so output streams into the console log.
    """
    # the command module is the one ansible module that does not take key=value args
    # hence don't copy this one if you are looking to build others!
    module = AnsibleModule(
        argument_spec=dict(
            _raw_params = dict(),
            _uses_shell = dict(type='bool', default=False),
            chdir = dict(type='path'),
            executable = dict(),
            creates = dict(type='path'),
            removes = dict(type='path'),
            warn = dict(type='bool', default=True),
            environ = dict(type='dict', default=None),
        )
    )

    shell = module.params['_uses_shell']
    chdir = module.params['chdir']
    executable = module.params['executable']
    args = module.params['_raw_params']
    creates = module.params['creates']
    removes = module.params['removes']
    warn = module.params['warn']
    environ = module.params['environ']

    if args.strip() == '':
        module.fail_json(rc=256, msg="no command given")

    if chdir:
        chdir = os.path.abspath(chdir)
        os.chdir(chdir)

    if creates:
        # do not run the command if the line contains creates=filename
        # and the filename already exists.  This allows idempotence
        # of command executions.
        if glob.glob(creates):
            module.exit_json(
                cmd=args,
                stdout="skipped, since %s exists" % creates,
                changed=False,
                rc=0
            )

    if removes:
        # do not run the command if the line contains removes=filename
        # and the filename does not exist.  This allows idempotence
        # of command executions.
        if not glob.glob(removes):
            module.exit_json(
                cmd=args,
                stdout="skipped, since %s does not exist" % removes,
                changed=False,
                rc=0
            )

    warnings = list()
    if warn:
        warnings = check_command(args)

    # With _uses_shell the raw string is passed through; otherwise it
    # is tokenized so the command runs without a shell.
    if not shell:
        args = shlex.split(args)
    startd = datetime.datetime.now()

    rc, out, err = zuul_run_command(module, args, executable=executable, use_unsafe_shell=shell, environ_update=environ)

    endd = datetime.datetime.now()
    delta = endd - startd

    # zuul_run_command reports '' for stdout/stderr (they go to the
    # console log), but guard against None anyway.
    if out is None:
        out = ''
    if err is None:
        err = ''

    module.exit_json(
        cmd = args,
        stdout = out.rstrip("\r\n"),
        stderr = err.rstrip("\r\n"),
        rc = rc,
        start = str(startd),
        end = str(endd),
        delta = str(delta),
        changed = True,
        warnings = warnings
    )


if __name__ == '__main__':
    main()

View File

@ -0,0 +1,121 @@
#!/usr/bin/python
# Copyright (c) 2016 Red Hat
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
import os
import subprocess
def afs_sync(afsuser, afskeytab, afsroot, afssource, afstarget):
    """Publish a completed build from its staging area into AFS.

    :param afsuser: principal name passed to k5start
    :param afskeytab: path to the keytab file for k5start
    :param afsroot: scratch directory used to hold rsync filter files
    :param afssource: staging directory holding the build output
    :param afstarget: destination directory in AFS
    :returns: a list of dicts (one per root marker found under
        *afssource*) with debugging details of each rsync performed.
    """
    # Find the list of root markers in the just-completed build
    # (usually there will only be one, but some builds produce content
    # at the root *and* at a tag location, or possibly at multiple
    # translation roots).
    src_root_markers = []
    for root, dirnames, filenames in os.walk(afssource):
        if '.root-marker' in filenames:
            src_root_markers.append(root)

    output_blocks = []

    # Synchronize the content at each root marker.
    for root_count, src_root in enumerate(src_root_markers):
        # The component of the path between the source root and the
        # current source root marker.  May be '.' if there is a marker
        # at the root.
        subpath = os.path.relpath(src_root, afssource)

        # Add to our debugging output
        output = dict(subpath=subpath)
        output_blocks.append(output)

        # The absolute path to the source (in staging) and destination
        # (in afs) of the build root for the current root marker.
        subsource = os.path.abspath(os.path.join(afssource, subpath))
        subtarget = os.path.abspath(os.path.join(afstarget, subpath))

        # Create a filter list for rsync so that we copy exactly the
        # directories we want to without deleting any existing
        # directories in the published site that were placed there by
        # previous builds.

        # Exclude any directories under this subpath which have root
        # markers (they belong to other builds).
        excludes = []
        for root, dirnames, filenames in os.walk(subtarget):
            if '.root-marker' in filenames:
                exclude_subpath = os.path.relpath(root, subtarget)
                if exclude_subpath == '.':
                    continue
                excludes.append(os.path.join('/', exclude_subpath))
        output['excludes'] = excludes

        filter_file = os.path.join(afsroot, 'filter_%i' % root_count)

        with open(filter_file, 'w') as f:
            for exclude in excludes:
                f.write('- %s\n' % exclude)

        # Perform the rsync with the filter list.
        rsync_cmd = ' '.join([
            '/usr/bin/rsync', '-rtp', '--safe-links', '--delete-after',
            "--out-format='<<CHANGED>>%i %n%L'",
            "--filter='merge {filter}'", '{src}/', '{dst}/',
        ])
        mkdir_cmd = ' '.join(['mkdir', '-p', '{dst}/'])
        bash_cmd = ' '.join([
            '/bin/bash', '-c', '"{mkdir_cmd} && {rsync_cmd}"'
        ]).format(
            mkdir_cmd=mkdir_cmd,
            rsync_cmd=rsync_cmd)
        k5start_cmd = ' '.join([
            '/usr/bin/k5start', '-t', '-f', '{keytab}', '{user}', '--',
            bash_cmd,
        ])
        # BUG FIX: a stray trailing comma here previously made shell_cmd
        # a one-element tuple rather than the formatted command string,
        # which is not a valid argument for check_output(shell=True).
        shell_cmd = k5start_cmd.format(
            src=subsource,
            dst=subtarget,
            filter=filter_file,
            user=afsuser,
            keytab=afskeytab)

        output['source'] = subsource
        output['destination'] = subtarget
        output['output'] = subprocess.check_output(shell_cmd, shell=True)
    return output_blocks
def main():
    """Ansible entry point: run the AFS sync with the supplied params."""
    module = AnsibleModule(
        argument_spec=dict(
            user=dict(required=True, type='raw'),
            keytab=dict(required=True, type='raw'),
            root=dict(required=True, type='raw'),
            source=dict(required=True, type='raw'),
            target=dict(required=True, type='raw'),
        )
    )

    params = module.params
    build_roots = afs_sync(params['user'], params['keytab'], params['root'],
                           params['source'], params['target'])
    module.exit_json(changed=True, build_roots=build_roots)

from ansible.module_utils.basic import *  # noqa

if __name__ == '__main__':
    main()

View File

@ -1,131 +0,0 @@
#!/usr/bin/python
# Copyright (c) 2016 IBM Corp.
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
import datetime
import getpass
import os
import subprocess
import threading
class Console(object):
    """Context manager that appends lines to the streaming console log."""

    def __enter__(self):
        # Opened in append mode with buffering 0 (Python 2 unbuffered)
        # so each line reaches disk immediately.
        self.logfile = open('/tmp/console.html', 'a', 0)
        return self

    def __exit__(self, etype, value, tb):
        self.logfile.close()

    def addLine(self, ln):
        # Note this format with delimiter is "inspired" by the old
        # Jenkins format but with microsecond resolution instead of
        # millisecond.  It is kept so log parsing/formatting remains
        # consistent.
        ts = datetime.datetime.now()
        outln = '%s | %s' % (ts, ln)
        self.logfile.write(outln)
def get_env():
    """Build a login-like environment for running a job command.

    Starts from HOME and USER, then folds in variables from known PAM
    mod_env source files when they exist.  Returns a dict suitable for
    passing to subprocess as ``env``.
    """
    env = {}
    env['HOME'] = os.path.expanduser('~')
    env['USER'] = getpass.getuser()

    # Known locations for PAM mod_env sources
    for fn in ['/etc/environment', '/etc/default/locale']:
        if os.path.exists(fn):
            with open(fn) as f:
                for line in f:
                    if not line:
                        continue
                    if line[0] == '#':
                        continue
                    if '=' not in line:
                        continue
                    # BUG FIX: split only on the first '=' so values
                    # that themselves contain '=' (e.g. PATH-like
                    # entries) no longer raise ValueError.
                    k, v = line.strip().split('=', 1)
                    # Strip one layer of quoting; guard against empty
                    # values such as "FOO=" (previously IndexError).
                    for q in ["'", '"']:
                        if v and v[0] == q:
                            v = v.strip(q)
                    env[k] = v
    return env
def follow(fd):
    """Stream lines from *fd* into the console log.

    Appends a newline to any line missing one, and notes at the end of
    the stream if that correction was ever needed.
    """
    missing_newline_seen = False
    with Console() as console:
        line = fd.readline()
        while line:
            if not line.endswith('\n'):
                line += '\n'
                missing_newline_seen = True
            console.addLine(line)
            line = fd.readline()
        if missing_newline_seen:
            console.addLine('[Zuul] No trailing newline\n')
def run(cwd, cmd, args):
    """Run *cmd* through a login bash shell in *cwd*, streaming output.

    *args* supplies extra environment variables layered over the login
    environment from get_env().  Returns the command's exit code.
    """
    cmd_env = get_env()
    cmd_env.update(args)

    proc = subprocess.Popen(
        ['/bin/bash', '-l', '-c', cmd],
        cwd=cwd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        env=cmd_env,
    )
    console_thread = threading.Thread(target=follow, args=(proc.stdout,))
    console_thread.daemon = True
    console_thread.start()

    ret = proc.wait()
    # Give the thread that is writing the console log up to 10 seconds
    # to catch up and exit.  If it hasn't done so by then, it is very
    # likely stuck in readline() because it spawned a child that is
    # holding stdout or stderr open.
    console_thread.join(10)

    with Console() as console:
        if console_thread.isAlive():
            console.addLine("[Zuul] standard output/error still open "
                            "after child exited")
        console.addLine("[Zuul] Task exit code: %s\n" % ret)
    return ret
def main():
    """Ansible entry point: run the requested command and report rc."""
    module = AnsibleModule(
        argument_spec=dict(
            command=dict(required=True, default=None),
            cwd=dict(required=True, default=None),
            parameters=dict(default={}, type='dict')
        )
    )

    p = module.params
    extra_env = p['parameters'].copy()
    ret = run(p['cwd'], p['command'], extra_env)
    # fail_json/exit_json both terminate the module process.
    if ret != 0:
        module.fail_json(msg="Exit code %s" % ret, rc=ret)
    module.exit_json(changed=True, rc=ret)

from ansible.module_utils.basic import *  # noqa

if __name__ == '__main__':
    main()

View File

@ -1,52 +0,0 @@
# Copyright 2016 IBM Corp.
#
# This file is part of Zuul
#
# This file is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This file is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this file. If not, see <http://www.gnu.org/licenses/>.
import time
from ansible.executor.task_result import TaskResult
from ansible.plugins.callback import CallbackBase
class CallbackModule(CallbackBase):
    """Ansible callback plugin that accumulates elapsed task time.

    After each task result arrives, the running total of task wall-clock
    time is published to the host's facts as ``elapsed_time`` (whole
    seconds), so playbooks can consult it.
    """

    def __init__(self, *args, **kw):
        super(CallbackModule, self).__init__(*args, **kw)
        # Total seconds spent in completed tasks so far.
        self._elapsed_time = 0.0
        # Wall-clock start of the currently running task (None when idle).
        self._task_start_time = None
        # Current play; needed to reach its variable manager.
        self._play = None

    def v2_playbook_on_play_start(self, play):
        self._play = play

    def playbook_on_task_start(self, name, is_conditional):
        self._task_start_time = time.time()

    def v2_on_any(self, *args, **kw):
        # Only TaskResult events are interesting; ignore everything else.
        result = None
        if args and isinstance(args[0], TaskResult):
            result = args[0]
        if not result:
            return

        if self._task_start_time is not None:
            task_time = time.time() - self._task_start_time
            self._elapsed_time += task_time
        if self._play and result._host:
            manager = self._play.get_variable_manager()
            facts = dict(elapsed_time=int(self._elapsed_time))

            manager.set_nonpersistent_facts(result._host, facts)
        self._task_start_time = None

View File

@ -124,6 +124,7 @@ class JobParser(object):
job.voting = conf.get('voting', True)
job.hold_following_changes = conf.get('hold-following-changes', False)
job.mutex = conf.get('mutex', None)
job.attempts = conf.get('attempts', 3)
if 'nodes' in conf:
conf_nodes = conf['nodes']
if isinstance(conf_nodes, six.string_types):

View File

@ -24,8 +24,8 @@ import tempfile
import threading
import time
import traceback
import Queue
import uuid
import Queue
import gear
import yaml
@ -34,11 +34,11 @@ import jenkins_jobs.formatter
import zmq
import zuul.ansible.library
import zuul.ansible.plugins.callback_plugins
from zuul.lib import commandsocket
ANSIBLE_WATCHDOG_GRACE = 5 * 60
ANSIBLE_DEFAULT_TIMEOUT = 2 * 60 * 60
ANSIBLE_DEFAULT_PRE_TIMEOUT = 10 * 60
ANSIBLE_DEFAULT_POST_TIMEOUT = 10 * 60
@ -113,9 +113,13 @@ class JobDir(object):
os.makedirs(self.ansible_root)
self.known_hosts = os.path.join(self.ansible_root, 'known_hosts')
self.inventory = os.path.join(self.ansible_root, 'inventory')
self.vars = os.path.join(self.ansible_root, 'vars.yaml')
self.pre_playbook = os.path.join(self.ansible_root, 'pre_playbook')
self.playbook = os.path.join(self.ansible_root, 'playbook')
self.post_playbook = os.path.join(self.ansible_root, 'post_playbook')
self.config = os.path.join(self.ansible_root, 'ansible.cfg')
self.pre_post_config = os.path.join(self.ansible_root,
'ansible_pre_post.cfg')
self.script_root = os.path.join(self.ansible_root, 'scripts')
self.ansible_log = os.path.join(self.ansible_root, 'ansible_log.txt')
os.makedirs(self.script_root)
@ -176,22 +180,27 @@ class LaunchServer(object):
path = os.path.join(state_dir, 'launcher.socket')
self.command_socket = commandsocket.CommandSocket(path)
ansible_dir = os.path.join(state_dir, 'ansible')
plugins_dir = os.path.join(ansible_dir, 'plugins')
self.callback_dir = os.path.join(plugins_dir, 'callback_plugins')
if not os.path.exists(self.callback_dir):
os.makedirs(self.callback_dir)
self.library_dir = os.path.join(ansible_dir, 'library')
if not os.path.exists(self.library_dir):
os.makedirs(self.library_dir)
callback_path = os.path.dirname(os.path.abspath(
zuul.ansible.plugins.callback_plugins.__file__))
for fn in os.listdir(callback_path):
shutil.copy(os.path.join(callback_path, fn), self.callback_dir)
self.pre_post_library_dir = os.path.join(ansible_dir,
'pre_post_library')
if not os.path.exists(self.pre_post_library_dir):
os.makedirs(self.pre_post_library_dir)
library_path = os.path.dirname(os.path.abspath(
zuul.ansible.library.__file__))
for fn in os.listdir(library_path):
# Ansible library modules that should be available to all
# playbooks:
all_libs = ['zuul_log.py', 'zuul_console.py', 'zuul_afs.py']
# Modules that should only be used by job playbooks:
job_libs = ['command.py']
for fn in all_libs:
shutil.copy(os.path.join(library_path, fn), self.library_dir)
shutil.copy(os.path.join(library_path, fn),
self.pre_post_library_dir)
for fn in job_libs:
shutil.copy(os.path.join(library_path, fn), self.library_dir)
def get_config_default(section, option, default):
@ -476,7 +485,7 @@ class LaunchServer(object):
args['description'], args['labels'],
self.hostname, self.zmq_send_queue,
self.termination_queue, self.keep_jobdir,
self.callback_dir, self.library_dir,
self.library_dir, self.pre_post_library_dir,
self.options)
self.node_workers[worker.name] = worker
@ -557,8 +566,8 @@ class NodeWorker(object):
def __init__(self, config, jobs, builds, sites, name, host,
description, labels, manager_name, zmq_send_queue,
termination_queue, keep_jobdir, callback_dir,
library_dir, options):
termination_queue, keep_jobdir, library_dir,
pre_post_library_dir, options):
self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,))
self.log.debug("Creating node worker %s" % (name,))
self.config = config
@ -593,6 +602,7 @@ class NodeWorker(object):
self._aborted_job = False
self._watchdog_timeout = False
self._sent_complete_event = False
self.ansible_pre_proc = None
self.ansible_job_proc = None
self.ansible_post_proc = None
self.workspace_root = config.get('launcher', 'workspace_root')
@ -604,8 +614,8 @@ class NodeWorker(object):
self.username = config.get('launcher', 'username')
else:
self.username = 'zuul'
self.callback_dir = callback_dir
self.library_dir = library_dir
self.pre_post_library_dir = pre_post_library_dir
self.options = options
def isAlive(self):
@ -872,6 +882,7 @@ class NodeWorker(object):
'SUCCESS', {})
def runJob(self, job, args):
self.ansible_pre_proc = None
self.ansible_job_proc = None
self.ansible_post_proc = None
result = None
@ -900,6 +911,12 @@ class NodeWorker(object):
job.sendWorkData(json.dumps(data))
job.sendWorkStatus(0, 100)
pre_status = self.runAnsiblePrePlaybook(jobdir)
if pre_status is None:
# These should really never fail, so return None and have
# zuul try again
return result
job_status = self.runAnsiblePlaybook(jobdir, timeout)
if job_status is None:
# The result of the job is indeterminate. Zuul will
@ -965,7 +982,8 @@ class NodeWorker(object):
# upload. This uploads the playbook and ansible logs.
copyargs = dict(src=jobdir.ansible_root + '/',
dest=os.path.join(scproot, '_zuul_ansible'))
task = dict(copy=copyargs,
task = dict(name='copy console log',
copy=copyargs,
delegate_to='127.0.0.1')
# This is a local copy and should not fail, so does
# not need a retry stanza.
@ -987,10 +1005,15 @@ class NodeWorker(object):
mode='pull')
if rsync_opts:
syncargs['rsync_opts'] = rsync_opts
task = dict(synchronize=syncargs)
task = dict(name='copy files from node',
synchronize=syncargs)
if not scpfile.get('copy-after-failure'):
task['when'] = 'success'
task.update(self.retry_args)
task['when'] = 'success|bool'
# We don't use retry_args here because there is a bug in
# the synchronize module that breaks subsequent attempts at
# retrying. Better to try once and get an accurate error
# message if it fails.
# https://github.com/ansible/ansible/issues/18281
tasks.append(task)
task = self._makeSCPTaskLocalAction(
@ -1030,10 +1053,11 @@ class NodeWorker(object):
private_key_file=self.private_key_file,
host=site['host'],
user=site['user'])
task = dict(shell=shellargs,
task = dict(name='rsync logs to server',
shell=shellargs,
delegate_to='127.0.0.1')
if not scpfile.get('copy-after-failure'):
task['when'] = 'success'
task['when'] = 'success|bool'
return task
@ -1061,12 +1085,17 @@ class NodeWorker(object):
mode='pull')
if rsync_opts:
syncargs['rsync_opts'] = rsync_opts
task = dict(synchronize=syncargs,
when='success')
task.update(self.retry_args)
task = dict(name='copy files from node',
synchronize=syncargs,
when='success|bool')
# We don't use retry_args here because there is a bug in the
# synchronize module that breaks subsequent attempts at retrying.
# Better to try once and get an accurate error message if it fails.
# https://github.com/ansible/ansible/issues/18281
tasks.append(task)
task = dict(shell='lftp -f %s' % ftpscript,
when='success',
task = dict(name='FTP files to server',
shell='lftp -f %s' % ftpscript,
when='success|bool',
delegate_to='127.0.0.1')
ftpsource = ftpcontent
if ftp.get('remove-prefix'):
@ -1098,17 +1127,13 @@ class NodeWorker(object):
raise Exception("Undefined AFS site: %s" % site)
site = self.sites[site]
# It is possible that this could be done in one rsync step,
# however, the current rysnc from the host is complicated (so
# that we can match the behavior of ant), and then rsync to
# afs is complicated and involves a pre-processing step in
# both locations (so that we can exclude directories). Each
# is well understood individually so it is easier to compose
# them in series than combine them together. A better,
# longer-lived solution (with better testing) would do just
# that.
afsroot = tempfile.mkdtemp(dir=jobdir.staging_root)
afscontent = os.path.join(afsroot, 'content')
afssource = afscontent
if afs.get('remove-prefix'):
afssource = os.path.join(afscontent, afs['remove-prefix'])
while afssource[-1] == '/':
afssource = afssource[:-1]
src = parameters['WORKSPACE']
if not src.endswith('/'):
@ -1121,12 +1146,16 @@ class NodeWorker(object):
mode='pull')
if rsync_opts:
syncargs['rsync_opts'] = rsync_opts
task = dict(synchronize=syncargs,
when='success')
task.update(self.retry_args)
task = dict(name='copy files from node',
synchronize=syncargs,
when='success|bool')
# We don't use retry_args here because there is a bug in the
# synchronize module that breaks subsequent attempts at retrying.
# Better to try once and get an accurate error message if it fails.
# https://github.com/ansible/ansible/issues/18281
tasks.append(task)
afstarget = afs['target']
afstarget = afs['target'].lstrip('/')
afstarget = self._substituteVariables(afstarget, parameters)
afstarget = os.path.join(site['root'], afstarget)
afstarget = os.path.normpath(afstarget)
@ -1134,125 +1163,23 @@ class NodeWorker(object):
raise Exception("Target path %s is not below site root" %
(afstarget,))
src_markers_file = os.path.join(afsroot, 'src-markers')
dst_markers_file = os.path.join(afsroot, 'dst-markers')
exclude_file = os.path.join(afsroot, 'exclude')
filter_file = os.path.join(afsroot, 'filter')
afsargs = dict(user=site['user'],
keytab=site['keytab'],
root=afsroot,
source=afssource,
target=afstarget)
find_pipe = [
"/usr/bin/find {path} -name .root-marker -printf '%P\n'",
"/usr/bin/xargs -I{{}} dirname {{}}",
"/usr/bin/sort > {file}"]
find_pipe = ' | '.join(find_pipe)
# Find the list of root markers in the just-completed build
# (usually there will only be one, but some builds produce
# content at the root *and* at a tag location).
task = dict(shell=find_pipe.format(path=afscontent,
file=src_markers_file),
when='success',
delegate_to='127.0.0.1')
tasks.append(task)
# Find the list of root markers that already exist in the
# published site.
task = dict(shell=find_pipe.format(path=afstarget,
file=dst_markers_file),
when='success',
delegate_to='127.0.0.1')
tasks.append(task)
# Create a file that contains the set of directories with root
# markers in the published site that do not have root markers
# in the built site.
exclude_command = "/usr/bin/comm -23 {dst} {src} > {exclude}".format(
src=src_markers_file,
dst=dst_markers_file,
exclude=exclude_file)
task = dict(shell=exclude_command,
when='success',
delegate_to='127.0.0.1')
tasks.append(task)
# Create a filter list for rsync so that we copy exactly the
# directories we want to without deleting any existing
# directories in the published site that were placed there by
# previous builds.
# The first group of items in the filter list are the
# directories in the current build with root markers, except
# for the root of the build. This is so that if, later, the
# build root ends up as an exclude, we still copy the
# directories in this build underneath it (since these
# includes will have matched first). We can't include the
# build root itself here, even if we do want to synchronize
# it, since that would defeat later excludes. In other words,
# if the build produces a root marker in "/subdir" but not in
# "/", this section is needed so that "/subdir" is copied at
# all, since "/" will be excluded later.
command = ("/bin/grep -v '^/$' {src} | "
"/bin/sed -e 's/^+ /' > {filter}".format(
src=src_markers_file,
filter=filter_file))
task = dict(shell=command,
when='success',
delegate_to='127.0.0.1')
tasks.append(task)
# The second group is the set of directories that are in the
# published site but not in the built site. This is so that
# if the built site does contain a marker at root (meaning
# that there is content that should be copied into the root)
# that we don't delete everything else previously built
# underneath the root.
command = ("/bin/grep -v '^/$' {exclude} | "
"/bin/sed -e 's/^- /' >> {filter}".format(
exclude=exclude_file,
filter=filter_file))
task = dict(shell=command,
when='success',
delegate_to='127.0.0.1')
tasks.append(task)
# The last entry in the filter file is for the build root. If
# there is no marker in the build root, then we need to
# exclude it from the rsync, so we add it here. It needs to
# be in the form of '/*' so that it matches all of the files
# in the build root. If there is no marker at the build root,
# then we should omit the '/*' exclusion so that it is
# implicitly included.
command = "grep '^/$' {exclude} && echo '- /*' >> {filter}".format(
exclude=exclude_file,
filter=filter_file)
task = dict(shell=command,
when='success',
delegate_to='127.0.0.1')
tasks.append(task)
# Perform the rsync with the filter list.
rsync_cmd = [
'/usr/bin/k5start', '-t', '-k', '{keytab}', '--',
'/usr/bin/rsync', '-rtp', '--safe-links', '--delete-after',
"--filter='merge {filter}'", '{src}/', '{dst}/',
]
shellargs = ' '.join(rsync_cmd).format(
src=afscontent,
dst=afstarget,
filter=filter_file,
keytab=site['keytab'])
task = dict(shell=shellargs,
when='success',
task = dict(name='Synchronize files to AFS',
zuul_afs=afsargs,
when='success|bool',
delegate_to='127.0.0.1')
tasks.append(task)
return tasks
def _makeBuilderTask(self, jobdir, builder, parameters):
def _makeBuilderTask(self, jobdir, builder, parameters, sequence):
tasks = []
script_fn = '%s.sh' % str(uuid.uuid4().hex)
script_fn = '%02d-%s.sh' % (sequence, str(uuid.uuid4().hex))
script_path = os.path.join(jobdir.script_root, script_fn)
with open(script_path, 'w') as script:
data = builder['shell']
@ -1267,15 +1194,10 @@ class NodeWorker(object):
task = dict(copy=copy)
tasks.append(task)
runner = dict(command=remote_path,
cwd=parameters['WORKSPACE'],
parameters=parameters)
task = dict(zuul_runner=runner)
task['name'] = ('zuul_runner with {{ timeout | int - elapsed_time }} '
'second timeout')
task['when'] = '{{ elapsed_time < timeout | int }}'
task['async'] = '{{ timeout | int - elapsed_time }}'
task['poll'] = 5
task = dict(command=remote_path)
task['name'] = 'command generated from JJB'
task['environment'] = "{{ zuul.environment }}"
task['args'] = dict(chdir=parameters['WORKSPACE'])
tasks.append(task)
filetask = dict(path=remote_path,
@ -1342,53 +1264,56 @@ class NodeWorker(object):
if timeout_var:
parameters[timeout_var] = str(timeout * 1000)
with open(jobdir.playbook, 'w') as playbook:
pre_tasks = []
tasks = []
main_block = []
error_block = []
variables = []
with open(jobdir.vars, 'w') as vars_yaml:
variables = dict(
timeout=timeout,
environment=parameters,
)
zuul_vars = dict(zuul=variables)
vars_yaml.write(
yaml.safe_dump(zuul_vars, default_flow_style=False))
with open(jobdir.pre_playbook, 'w') as pre_playbook:
shellargs = "ssh-keyscan {{ ansible_host }} > %s" % (
jobdir.known_hosts)
pre_tasks.append(dict(shell=shellargs,
delegate_to='127.0.0.1'))
tasks.append(dict(block=main_block,
rescue=error_block))
tasks = []
tasks.append(dict(shell=shellargs, delegate_to='127.0.0.1'))
task = dict(file=dict(path='/tmp/console.html', state='absent'))
main_block.append(task)
tasks.append(task)
task = dict(zuul_console=dict(path='/tmp/console.html',
port=19885))
main_block.append(task)
tasks.append(task)
task = dict(file=dict(path=parameters['WORKSPACE'],
state='directory'))
main_block.append(task)
tasks.append(task)
msg = [
"Launched by %s" % self.manager_name,
"Building remotely on %s in workspace %s" % (
self.name, parameters['WORKSPACE'])]
task = dict(zuul_log=dict(msg=msg))
main_block.append(task)
tasks.append(task)
play = dict(hosts='node', name='Job setup', tasks=tasks)
pre_playbook.write(
yaml.safe_dump([play], default_flow_style=False))
with open(jobdir.playbook, 'w') as playbook:
tasks = []
sequence = 0
for builder in jjb_job.get('builders', []):
if 'shell' in builder:
main_block.extend(
self._makeBuilderTask(jobdir, builder, parameters))
task = dict(zuul_log=dict(msg="Job complete, result: SUCCESS"))
main_block.append(task)
sequence += 1
tasks.extend(
self._makeBuilderTask(jobdir, builder, parameters,
sequence))
task = dict(zuul_log=dict(msg="Job complete, result: FAILURE"))
error_block.append(task)
error_block.append(dict(fail=dict(msg='FAILURE')))
variables.append(dict(timeout=timeout))
play = dict(hosts='node', name='Job body', vars=variables,
pre_tasks=pre_tasks, tasks=tasks)
play = dict(hosts='node', name='Job body', tasks=tasks)
playbook.write(yaml.safe_dump([play], default_flow_style=False))
early_publishers, late_publishers = self._transformPublishers(jjb_job)
@ -1414,6 +1339,14 @@ class NodeWorker(object):
# we run the log publisher regardless of whether the rest
# of the publishers succeed.
tasks = []
task = dict(zuul_log=dict(msg="Job complete, result: SUCCESS"),
when='success|bool')
blocks[0].insert(0, task)
task = dict(zuul_log=dict(msg="Job complete, result: FAILURE"),
when='not success|bool')
blocks[0].insert(0, task)
tasks.append(dict(block=blocks[0],
always=blocks[1]))
@ -1421,46 +1354,104 @@ class NodeWorker(object):
tasks=tasks)
playbook.write(yaml.safe_dump([play], default_flow_style=False))
with open(jobdir.config, 'w') as config:
self._writeAnsibleConfig(jobdir, jobdir.config,
library=self.library_dir)
self._writeAnsibleConfig(jobdir, jobdir.pre_post_config,
library=self.pre_post_library_dir)
return timeout
def _writeAnsibleConfig(self, jobdir, fn, library):
with open(fn, 'w') as config:
config.write('[defaults]\n')
config.write('hostfile = %s\n' % jobdir.inventory)
config.write('keep_remote_files = True\n')
config.write('local_tmp = %s/.ansible/local_tmp\n' % jobdir.root)
config.write('remote_tmp = %s/.ansible/remote_tmp\n' % jobdir.root)
config.write('private_key_file = %s\n' % self.private_key_file)
config.write('retry_files_enabled = False\n')
config.write('log_path = %s\n' % jobdir.ansible_log)
config.write('gathering = explicit\n')
config.write('callback_plugins = %s\n' % self.callback_dir)
config.write('library = %s\n' % self.library_dir)
config.write('library = %s\n' % library)
# TODO(mordred) This can be removed once we're using ansible 2.2
config.write('module_set_locale = False\n')
# bump the timeout because busy nodes may take more than
# 10s to respond
config.write('timeout = 30\n')
config.write('[ssh_connection]\n')
# NB: when setting pipelining = True, keep_remote_files
# must be False (the default). Otherwise it apparently
# will override the pipelining option and effectively
# disable it. Pipelining has a side effect of running the
# command without a tty (ie, without the -tt argument to
# ssh). We require this behavior so that if a job runs a
# command which expects interactive input on a tty (such
# as sudo) it does not hang.
config.write('pipelining = True\n')
ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
"-o UserKnownHostsFile=%s" % jobdir.known_hosts
config.write('ssh_args = %s\n' % ssh_args)
return timeout
def _ansibleTimeout(self, proc, msg):
self._watchdog_timeout = True
self.log.warning(msg)
self.abortRunningProc(proc)
def runAnsiblePlaybook(self, jobdir, timeout):
def runAnsiblePrePlaybook(self, jobdir):
# Set LOGNAME env variable so Ansible log_path log reports
# the correct user.
env_copy = os.environ.copy()
env_copy['LOGNAME'] = 'zuul'
env_copy['ANSIBLE_CONFIG'] = jobdir.pre_post_config
if self.options['verbose']:
verbose = '-vvv'
else:
verbose = '-v'
cmd = ['ansible-playbook', jobdir.playbook, verbose]
cmd = ['ansible-playbook', jobdir.pre_playbook,
'-e@%s' % jobdir.vars, verbose]
self.log.debug("Ansible pre command: %s" % (cmd,))
self.ansible_pre_proc = subprocess.Popen(
cmd,
cwd=jobdir.ansible_root,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
preexec_fn=os.setsid,
env=env_copy,
)
ret = None
watchdog = Watchdog(ANSIBLE_DEFAULT_PRE_TIMEOUT,
self._ansibleTimeout,
(self.ansible_pre_proc,
"Ansible pre timeout exceeded"))
watchdog.start()
try:
for line in iter(self.ansible_pre_proc.stdout.readline, b''):
line = line[:1024].rstrip()
self.log.debug("Ansible pre output: %s" % (line,))
ret = self.ansible_pre_proc.wait()
finally:
watchdog.stop()
self.log.debug("Ansible pre exit code: %s" % (ret,))
self.ansible_pre_proc = None
return ret == 0
def runAnsiblePlaybook(self, jobdir, timeout):
# Set LOGNAME env variable so Ansible log_path log reports
# the correct user.
env_copy = os.environ.copy()
env_copy['LOGNAME'] = 'zuul'
env_copy['ANSIBLE_CONFIG'] = jobdir.config
if self.options['verbose']:
verbose = '-vvv'
else:
verbose = '-v'
cmd = ['ansible-playbook', jobdir.playbook, verbose,
'-e@%s' % jobdir.vars]
self.log.debug("Ansible command: %s" % (cmd,))
self.ansible_job_proc = subprocess.Popen(
@ -1502,6 +1493,7 @@ class NodeWorker(object):
# the correct user.
env_copy = os.environ.copy()
env_copy['LOGNAME'] = 'zuul'
env_copy['ANSIBLE_CONFIG'] = jobdir.pre_post_config
if self.options['verbose']:
verbose = '-vvv'
@ -1509,7 +1501,9 @@ class NodeWorker(object):
verbose = '-v'
cmd = ['ansible-playbook', jobdir.post_playbook,
'-e', 'success=%s' % success, verbose]
'-e', 'success=%s' % success,
'-e@%s' % jobdir.vars,
verbose]
self.log.debug("Ansible post command: %s" % (cmd,))
self.ansible_post_proc = subprocess.Popen(

View File

@ -399,6 +399,12 @@ class LaunchClient(object):
build.__gearman_manager = None
self.builds[uuid] = build
# NOTE(pabelanger): Rather then looping forever, check to see if job
# has passed attempts limit.
if item.current_build_set.getTries(job.name) > job.attempts:
self.onBuildCompleted(gearman_job, 'RETRY_LIMIT')
return build
if pipeline.precedence == zuul.model.PRECEDENCE_NORMAL:
precedence = gear.PRECEDENCE_NORMAL
elif pipeline.precedence == zuul.model.PRECEDENCE_HIGH:

View File

@ -30,7 +30,6 @@ import yaml
import zuul.merger
import zuul.ansible.library
import zuul.ansible.plugins.callback_plugins
from zuul.lib import commandsocket
ANSIBLE_WATCHDOG_GRACE = 5 * 60
@ -197,19 +196,10 @@ class LaunchServer(object):
path = os.path.join(state_dir, 'launcher.socket')
self.command_socket = commandsocket.CommandSocket(path)
ansible_dir = os.path.join(state_dir, 'ansible')
plugins_dir = os.path.join(ansible_dir, 'plugins')
self.callback_dir = os.path.join(plugins_dir, 'callback_plugins')
if not os.path.exists(self.callback_dir):
os.makedirs(self.callback_dir)
self.library_dir = os.path.join(ansible_dir, 'library')
if not os.path.exists(self.library_dir):
os.makedirs(self.library_dir)
callback_path = os.path.dirname(os.path.abspath(
zuul.ansible.plugins.callback_plugins.__file__))
for fn in os.listdir(callback_path):
shutil.copy(os.path.join(callback_path, fn), self.callback_dir)
library_path = os.path.dirname(os.path.abspath(
zuul.ansible.library.__file__))
for fn in os.listdir(library_path):
@ -419,20 +409,27 @@ class LaunchServer(object):
with open(jobdir.config, 'w') as config:
config.write('[defaults]\n')
config.write('hostfile = %s\n' % jobdir.inventory)
config.write('keep_remote_files = True\n')
config.write('local_tmp = %s/.ansible/local_tmp\n' % jobdir.root)
config.write('remote_tmp = %s/.ansible/remote_tmp\n' % jobdir.root)
config.write('private_key_file = %s\n' % self.private_key_file)
config.write('retry_files_enabled = False\n')
config.write('log_path = %s\n' % jobdir.ansible_log)
config.write('gathering = explicit\n')
config.write('callback_plugins = %s\n' % self.callback_dir)
config.write('library = %s\n' % self.library_dir)
# bump the timeout because busy nodes may take more than
# 10s to respond
config.write('timeout = 30\n')
config.write('[ssh_connection]\n')
# NB: when setting pipelining = True, keep_remote_files
# must be False (the default). Otherwise it apparently
# will override the pipelining option and effectively
# disable it. Pipelining has a side effect of running the
# command without a tty (ie, without the -tt argument to
# ssh). We require this behavior so that if a job runs a
# command which expects interactive input on a tty (such
# as sudo) it does not hang.
config.write('pipelining = True\n')
ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
"-o UserKnownHostsFile=%s" % jobdir.known_hosts
config.write('ssh_args = %s\n' % ssh_args)

View File

@ -127,6 +127,7 @@ class LayoutSchema(object):
'success-pattern': str,
'hold-following-changes': bool,
'voting': bool,
'attempts': int,
'mutex': str,
'tags': toList(str),
'parameter-function': str,

View File

@ -427,6 +427,7 @@ class Job(object):
parameter_function=None, # TODOv3(jeblair): remove
tags=set(),
mutex=None,
attempts=3,
)
def __init__(self, name):
@ -640,6 +641,7 @@ class BuildSet(object):
self.node_requests = {} # job -> reqs
self.files = RepoFiles()
self.layout = None
self.tries = {}
def __repr__(self):
return '<BuildSet item: %s #builds: %s merge state: %s>' % (
@ -665,9 +667,12 @@ class BuildSet(object):
def addBuild(self, build):
self.builds[build.job.name] = build
if build.job.name not in self.tries:
self.tries[build.job.name] = 1
build.build_set = self
def removeBuild(self, build):
self.tries[build.job.name] += 1
del self.builds[build.job.name]
def getBuild(self, job_name):
@ -697,6 +702,9 @@ class BuildSet(object):
self.nodesets[job_name] = nodeset
del self.node_requests[job_name]
def getTries(self, job_name):
return self.tries.get(job_name)
class QueueItem(object):
"""Represents the position of a Change in a ChangeQueue.