Merge "Merge branch 'master' into feature/zuulv3" into feature/zuulv3

Jenkins 2016-10-06 20:40:08 +00:00 committed by Gerrit Code Review
commit 31285ef6c1
5 changed files with 255 additions and 48 deletions

View File

@@ -4473,6 +4473,26 @@ For CI problems and help debugging, contact ci@example.org"""
self.waitUntilSettled()
self.assertEqual(self.history[-1].changes, '3,2 2,1 1,2')
@skip("Disabled for early v3 development")
def test_crd_check_unknown(self):
"Test unknown projects in independent pipeline"
self.init_repo("org/unknown")
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/unknown', 'master', 'D')
# A Depends-On: B
A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
A.subject, B.data['id'])
# Make sure zuul has seen an event on B.
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'NEW')
self.assertEqual(A.reported, 1)
self.assertEqual(B.data['status'], 'NEW')
self.assertEqual(B.reported, 0)
@skip("Disabled for early v3 development")
def test_crd_cycle_join(self):
"Test an updated change creates a cycle"

View File

@@ -60,28 +60,13 @@ class Console(object):
class Server(object):
def __init__(self, path, port):
self.path = path
s = None
for res in socket.getaddrinfo(None, port, socket.AF_UNSPEC,
socket.SOCK_STREAM, 0,
socket.AI_PASSIVE):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
s.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
except socket.error:
s = None
continue
try:
s.bind(sa)
s.listen(1)
except socket.error:
s.close()
s = None
continue
break
if s is None:
sys.exit(1)
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
s.bind(('::', port))
s.listen(1)
self.socket = s
def accept(self):
@@ -170,7 +155,7 @@ class Server(object):
def test():
s = Server('/tmp/console.html', 8088)
s = Server('/tmp/console.html', 19885)
s.run()
@@ -178,7 +163,7 @@ def main():
module = AnsibleModule(
argument_spec=dict(
path=dict(default='/tmp/console.html'),
port=dict(default=8088, type='int'),
port=dict(default=19885, type='int'),
)
)
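The simplification above works because a single AF_INET6 socket bound to the unspecified address '::' also accepts IPv4 connections on hosts where the dual-stack default is in effect (IPV6_V6ONLY off, the Linux default), so the getaddrinfo() loop is no longer needed. A minimal standalone sketch of the same listener pattern, using the new console port:

    # Minimal dual-stack listener sketch. Assumes the platform default
    # of IPV6_V6ONLY=0 (typical on Linux); IPv4 peers then appear as
    # IPv4-mapped addresses such as ::ffff:192.0.2.10.
    import socket

    s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(('::', 19885))    # '::' = all interfaces, both address families
    s.listen(1)
    conn, addr = s.accept()  # blocks until a client connects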

View File

@@ -68,11 +68,16 @@ class NodeGearWorker(gear.Worker):
MASS_DO = 101
def sendMassDo(self, functions):
data = b'\x00'.join([gear.convert_to_bytes(x) for x in functions])
names = [gear.convert_to_bytes(x) for x in functions]
data = b'\x00'.join(names)
new_function_dict = {}
for name in names:
new_function_dict[name] = gear.FunctionRecord(name)
self.broadcast_lock.acquire()
try:
p = gear.Packet(gear.constants.REQ, self.MASS_DO, data)
self.broadcast(p)
self.functions = new_function_dict
finally:
self.broadcast_lock.release()
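MASS_DO (packet type 101) is a gear/Zuul extension rather than standard Gearman: instead of one CAN_DO packet per function, the worker broadcasts a single packet whose payload is the NUL-joined function names, then swaps in the new function table under the broadcast lock. A minimal sketch of the payload round trip, assuming NUL-joined UTF-8 names as in sendMassDo() above:

    # Encoding as in sendMassDo(); a receiving server can split the
    # payload back into individual function names.
    functions = ['build:example-job', 'stop:example-worker']  # made-up names
    payload = b'\x00'.join(f.encode('utf-8') for f in functions)

    decoded = [name.decode('utf-8') for name in payload.split(b'\x00')]
    assert decoded == functions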
@@ -189,21 +194,21 @@ class LaunchServer(object):
for fn in os.listdir(library_path):
shutil.copy(os.path.join(library_path, fn), self.library_dir)
def get_config_default(section, option, default):
if config.has_option(section, option):
return config.get(section, option)
return default
for section in config.sections():
m = self.site_section_re.match(section)
if m:
sitename = m.group(1)
d = {}
d['host'] = config.get(section, 'host')
d['user'] = config.get(section, 'user')
if config.has_option(section, 'pass'):
d['pass'] = config.get(section, 'pass')
else:
d['pass'] = ''
if config.has_option(section, 'root'):
d['root'] = config.get(section, 'root')
else:
d['root'] = '/'
d['host'] = get_config_default(section, 'host', None)
d['user'] = get_config_default(section, 'user', '')
d['pass'] = get_config_default(section, 'pass', '')
d['root'] = get_config_default(section, 'root', '/')
d['keytab'] = get_config_default(section, 'keytab', None)
self.sites[sitename] = d
continue
m = self.node_section_re.match(section)
@@ -212,10 +217,8 @@ class LaunchServer(object):
d = {}
d['name'] = nodename
d['host'] = config.get(section, 'host')
if config.has_option(section, 'description'):
d['description'] = config.get(section, 'description')
else:
d['description'] = ''
d['description'] = get_config_default(section,
'description', '')
if config.has_option(section, 'labels'):
d['labels'] = config.get(section, 'labels').split(',')
else:
@@ -547,6 +550,11 @@ class LaunchServer(object):
class NodeWorker(object):
retry_args = dict(register='task_result',
until='task_result.rc == 0',
retries=3,
delay=30)
def __init__(self, config, jobs, builds, sites, name, host,
description, labels, manager_name, zmq_send_queue,
termination_queue, keep_jobdir, callback_dir,
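The retry_args mapping defined above is merged into individual file-transfer tasks later in this diff via task.update(self.retry_args); it uses Ansible's register/until retry loop so a flaky network copy is retried up to three times, thirty seconds apart, instead of failing the whole publisher run. A schematic of the merge, with placeholder paths:

    # Schematic only: how retry_args folds into a synchronize task.
    retry_args = dict(register='task_result',
                      until='task_result.rc == 0',
                      retries=3,
                      delay=30)

    task = dict(synchronize=dict(src='/placeholder/src/',    # made-up paths
                                 dest='/placeholder/dest/'),
                when='success')
    task.update(retry_args)
    # Rendered to a playbook, the task now retries until rsync's exit
    # code (task_result.rc) is zero, at most 3 times with a 30s delay.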
@@ -583,6 +591,7 @@ class NodeWorker(object):
self._job_complete_event = threading.Event()
self._running_job = False
self._aborted_job = False
self._watchdog_timeout = False
self._sent_complete_event = False
self.ansible_job_proc = None
self.ansible_post_proc = None
@@ -796,6 +805,7 @@ class NodeWorker(object):
result = None
self._sent_complete_event = False
self._aborted_job = False
self._watchdog_timeout = False
try:
self.sendStartEvent(job_name, args)
@@ -881,8 +891,12 @@ class NodeWorker(object):
data = {
'manager': self.manager_name,
'number': job.unique,
'url': 'telnet://%s:8088' % self.host,
}
if ':' in self.host:
data['url'] = 'telnet://[%s]:19885' % self.host
else:
data['url'] = 'telnet://%s:19885' % self.host
job.sendWorkData(json.dumps(data))
job.sendWorkStatus(0, 100)
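The branch above exists because a bare IPv6 literal must be bracketed inside a URL authority, while an IPv4 address or hostname must not be. A small hypothetical helper showing the two renderings (addresses below are documentation examples):

    # Illustrative helper, not part of the commit.
    def console_url(host, port=19885):
        if ':' in host:  # bare IPv6 literal needs brackets in a URL
            return 'telnet://[%s]:%d' % (host, port)
        return 'telnet://%s:%d' % (host, port)

    # console_url('198.51.100.7') -> 'telnet://198.51.100.7:19885'
    # console_url('2001:db8::7')  -> 'telnet://[2001:db8::7]:19885'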
@@ -900,7 +914,7 @@ class NodeWorker(object):
else:
result = 'FAILURE'
if self._aborted_job:
if self._aborted_job and not self._watchdog_timeout:
# A Null result will cause zuul to relaunch the job if
# it needs to.
result = None
@@ -953,6 +967,8 @@ class NodeWorker(object):
dest=os.path.join(scproot, '_zuul_ansible'))
task = dict(copy=copyargs,
delegate_to='127.0.0.1')
# This is a local copy and should not fail, so does
# not need a retry stanza.
tasks.append(task)
# Fetch the console log from the remote host.
@@ -974,10 +990,12 @@ class NodeWorker(object):
task = dict(synchronize=syncargs)
if not scpfile.get('copy-after-failure'):
task['when'] = 'success'
task.update(self.retry_args)
tasks.append(task)
task = self._makeSCPTaskLocalAction(
site, scpfile, scproot, parameters)
task.update(self.retry_args)
tasks.append(task)
return tasks
@@ -1045,6 +1063,7 @@ class NodeWorker(object):
syncargs['rsync_opts'] = rsync_opts
task = dict(synchronize=syncargs,
when='success')
task.update(self.retry_args)
tasks.append(task)
task = dict(shell='lftp -f %s' % ftpscript,
when='success',
@@ -1067,9 +1086,170 @@ class NodeWorker(object):
script.write('open %s\n' % site['host'])
script.write('user %s %s\n' % (site['user'], site['pass']))
script.write('mirror -R %s %s\n' % (ftpsource, ftptarget))
task.update(self.retry_args)
tasks.append(task)
return tasks
def _makeAFSTask(self, jobdir, publisher, parameters):
tasks = []
afs = publisher['afs']
site = afs['site']
if site not in self.sites:
raise Exception("Undefined AFS site: %s" % site)
site = self.sites[site]
# It is possible that this could be done in one rsync step;
# however, the current rsync from the host is complicated (so
# that we can match the behavior of ant), and then rsync to
# afs is complicated and involves a pre-processing step in
# both locations (so that we can exclude directories). Each
# is well understood individually so it is easier to compose
# them in series than combine them together. A better,
# longer-lived solution (with better testing) would do just
# that.
afsroot = tempfile.mkdtemp(dir=jobdir.staging_root)
afscontent = os.path.join(afsroot, 'content')
src = parameters['WORKSPACE']
if not src.endswith('/'):
src = src + '/'
rsync_opts = self._getRsyncOptions(afs['source'],
parameters)
syncargs = dict(src=src,
dest=afscontent,
copy_links='yes',
mode='pull')
if rsync_opts:
syncargs['rsync_opts'] = rsync_opts
task = dict(synchronize=syncargs,
when='success')
task.update(self.retry_args)
tasks.append(task)
afstarget = afs['target']
afstarget = self._substituteVariables(afstarget, parameters)
afstarget = os.path.join(site['root'], afstarget)
afstarget = os.path.normpath(afstarget)
if not afstarget.startswith(site['root']):
raise Exception("Target path %s is not below site root" %
(afstarget,))
src_markers_file = os.path.join(afsroot, 'src-markers')
dst_markers_file = os.path.join(afsroot, 'dst-markers')
exclude_file = os.path.join(afsroot, 'exclude')
filter_file = os.path.join(afsroot, 'filter')
find_pipe = [
"/usr/bin/find {path} -name .root-marker -printf '%P\n'",
"/usr/bin/xargs -I{{}} dirname {{}}",
"/usr/bin/sort > {file}"]
find_pipe = ' | '.join(find_pipe)
# Find the list of root markers in the just-completed build
# (usually there will only be one, but some builds produce
# content at the root *and* at a tag location).
task = dict(shell=find_pipe.format(path=afscontent,
file=src_markers_file),
when='success',
delegate_to='127.0.0.1')
tasks.append(task)
# Find the list of root markers that already exist in the
# published site.
task = dict(shell=find_pipe.format(path=afstarget,
file=dst_markers_file),
when='success',
delegate_to='127.0.0.1')
tasks.append(task)
# Create a file that contains the set of directories with root
# markers in the published site that do not have root markers
# in the built site.
exclude_command = "/usr/bin/comm -23 {dst} {src} > {exclude}".format(
src=src_markers_file,
dst=dst_markers_file,
exclude=exclude_file)
task = dict(shell=exclude_command,
when='success',
delegate_to='127.0.0.1')
tasks.append(task)
# Create a filter list for rsync so that we copy exactly the
# directories we want to without deleting any existing
# directories in the published site that were placed there by
# previous builds.
# The first group of items in the filter list are the
# directories in the current build with root markers, except
# for the root of the build. This is so that if, later, the
# build root ends up as an exclude, we still copy the
# directories in this build underneath it (since these
# includes will have matched first). We can't include the
# build root itself here, even if we do want to synchronize
# it, since that would defeat later excludes. In other words,
# if the build produces a root marker in "/subdir" but not in
# "/", this section is needed so that "/subdir" is copied at
# all, since "/" will be excluded later.
command = ("/bin/grep -v '^/$' {src} | "
"/bin/sed -e 's/^+ /' > {filter}".format(
src=src_markers_file,
filter=filter_file))
task = dict(shell=command,
when='success',
delegate_to='127.0.0.1')
tasks.append(task)
# The second group is the set of directories that are in the
# published site but not in the built site. This is so that
# if the built site does contain a marker at root (meaning
# that there is content that should be copied into the root)
# that we don't delete everything else previously built
# underneath the root.
command = ("/bin/grep -v '^/$' {exclude} | "
"/bin/sed -e 's/^- /' >> {filter}".format(
exclude=exclude_file,
filter=filter_file))
task = dict(shell=command,
when='success',
delegate_to='127.0.0.1')
tasks.append(task)
# The last entry in the filter file is for the build root. If
# there is no marker in the build root, then we need to
# exclude it from the rsync, so we add it here. It needs to
# be in the form of '/*' so that it matches all of the files
# in the build root. If there is a marker at the build root,
# then we should omit the '/*' exclusion so that it is
# implicitly included.
command = "grep '^/$' {exclude} && echo '- /*' >> {filter}".format(
exclude=exclude_file,
filter=filter_file)
task = dict(shell=command,
when='success',
delegate_to='127.0.0.1')
tasks.append(task)
# Perform the rsync with the filter list.
rsync_cmd = [
'/usr/bin/k5start', '-t', '-k', '{keytab}', '--',
'/usr/bin/rsync', '-rtp', '--safe-links', '--delete-after',
"--filter='merge {filter}'", '{src}/', '{dst}/',
]
shellargs = ' '.join(rsync_cmd).format(
src=afscontent,
dst=afstarget,
filter=filter_file,
keytab=site['keytab'])
task = dict(shell=shellargs,
when='success',
delegate_to='127.0.0.1')
tasks.append(task)
return tasks
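Since the filter is built in four shell steps, a concrete end-to-end illustration may help. The values below are invented: suppose the marker listings come out as '/subdir' for the new build, and '/' plus '/other' for the already-published site. This sketch mimics the comm/grep/sed pipeline in plain Python:

    # Schematic re-implementation of the filter construction above,
    # on made-up marker lists ('/' denotes the root).
    src_markers = ['/subdir']        # markers found in the new build
    dst_markers = ['/', '/other']    # markers already on the AFS site

    # comm -23 dst src: site markers with no counterpart in the build.
    exclude = sorted(set(dst_markers) - set(src_markers))

    filter_lines = []
    # First group: include each built marker dir except the root itself.
    filter_lines += ['+ ' + m for m in src_markers if m != '/']
    # Second group: protect site dirs this build does not produce.
    filter_lines += ['- ' + m for m in exclude if m != '/']
    # Last entry: no root marker in the build, so mask the root contents.
    if '/' in exclude:
        filter_lines.append('- /*')

    # Resulting merge filter:
    #   + /subdir
    #   - /other
    #   - /*
    # rsync copies /subdir, leaves /other in place, and skips everything
    # else at the root, even with --delete-after.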
def _makeBuilderTask(self, jobdir, builder, parameters):
tasks = []
script_fn = '%s.sh' % str(uuid.uuid4().hex)
@@ -1169,8 +1349,8 @@ class NodeWorker(object):
error_block = []
variables = []
shellargs = "ssh-keyscan %s > %s" % (
self.host, jobdir.known_hosts)
shellargs = "ssh-keyscan {{ ansible_host }} > %s" % (
jobdir.known_hosts)
pre_tasks.append(dict(shell=shellargs,
delegate_to='127.0.0.1'))
@@ -1180,7 +1360,8 @@ class NodeWorker(object):
task = dict(file=dict(path='/tmp/console.html', state='absent'))
main_block.append(task)
task = dict(zuul_console=dict(path='/tmp/console.html', port=8088))
task = dict(zuul_console=dict(path='/tmp/console.html',
port=19885))
main_block.append(task)
task = dict(file=dict(path=parameters['WORKSPACE'],
@@ -1223,6 +1404,9 @@ class NodeWorker(object):
if 'ftp' in publisher:
block.extend(self._makeFTPTask(jobdir, publisher,
parameters))
if 'afs' in publisher:
block.extend(self._makeAFSTask(jobdir, publisher,
parameters))
blocks.append(block)
# The 'always' section contains the log publishing tasks,
@@ -1241,13 +1425,17 @@ class NodeWorker(object):
config.write('[defaults]\n')
config.write('hostfile = %s\n' % jobdir.inventory)
config.write('keep_remote_files = True\n')
config.write('local_tmp = %s/.ansible/tmp\n' % jobdir.root)
config.write('local_tmp = %s/.ansible/local_tmp\n' % jobdir.root)
config.write('remote_tmp = %s/.ansible/remote_tmp\n' % jobdir.root)
config.write('private_key_file = %s\n' % self.private_key_file)
config.write('retry_files_enabled = False\n')
config.write('log_path = %s\n' % jobdir.ansible_log)
config.write('gathering = explicit\n')
config.write('callback_plugins = %s\n' % self.callback_dir)
config.write('library = %s\n' % self.library_dir)
# bump the timeout because busy nodes may take more than
# 10s to respond
config.write('timeout = 30\n')
config.write('[ssh_connection]\n')
ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
@@ -1257,6 +1445,7 @@ class NodeWorker(object):
return timeout
def _ansibleTimeout(self, proc, msg):
self._watchdog_timeout = True
self.log.warning(msg)
self.abortRunningProc(proc)
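_ansibleTimeout() is the handler the launcher's watchdog invokes when an Ansible subprocess outlives its allotted time; setting _watchdog_timeout before aborting is what lets the result handling earlier in this diff report FAILURE rather than the None result that would make Zuul relaunch the job. The Watchdog class itself is defined elsewhere in this file; as an assumption, its minimal shape is a cancellable timer along these lines:

    # Assumed shape of the Watchdog used below (illustrative, not the
    # commit's actual definition): call function(*args) unless stop()
    # arrives within `timeout` seconds.
    import threading

    class Watchdog(object):
        def __init__(self, timeout, function, args):
            self.timer = threading.Timer(timeout, function, args)

        def start(self):
            self.timer.start()

        def stop(self):
            self.timer.cancel()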
@@ -1297,6 +1486,8 @@ class NodeWorker(object):
watchdog.stop()
self.log.debug("Ansible exit code: %s" % (ret,))
self.ansible_job_proc = None
if self._watchdog_timeout:
return False
if ret == 3:
# AnsibleHostUnreachable: We had a network issue connecting to
# our zuul-worker.

View File

@@ -91,8 +91,12 @@ class Repo(object):
continue
repo.create_head(ref.remote_head, ref, force=True)
# Reset to remote HEAD (usually origin/master)
repo.head.reference = origin.refs['HEAD']
# try reset to remote HEAD (usually origin/master)
# If it fails, pick the first reference
try:
repo.head.reference = origin.refs['HEAD']
except IndexError:
repo.head.reference = origin.refs[0]
reset_repo_to_head(repo)
repo.git.clean('-x', '-f', '-d')
@@ -178,7 +182,14 @@ class Repo(object):
repo = self.createRepoObject()
self.log.debug("Updating repository %s" % self.local_path)
origin = repo.remotes.origin
origin.update()
if repo.git.version_info[:2] < (1, 9):
# Before 1.9, 'git fetch --tags' did not include the
# behavior covered by 'git fetch', so we run both
# commands in that case. Starting with 1.9, 'git fetch
# --tags' is all that is necessary. See
# https://github.com/git/git/blob/master/Documentation/RelNotes/1.9.0.txt#L18-L20
origin.fetch()
origin.fetch(tags=True)
def getFiles(self, files, branch=None, commit=None):
ret = {}
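One thing the flattened hunk above obscures is the control flow: per the comment, only the plain origin.fetch() sits under the version check, while the tags fetch always runs. A standalone sketch of the same gate, assuming a GitPython repo object for an existing clone:

    # Sketch of the fetch logic above with the indentation made explicit.
    def fetch_with_tags(repo):
        origin = repo.remotes.origin
        if repo.git.version_info[:2] < (1, 9):
            # Older git: 'git fetch --tags' fetched *only* tags, so
            # fetch branches/refs separately first.
            origin.fetch()
        # git 1.9+: '--tags' fetches branches and tags in one pass.
        origin.fetch(tags=True)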