Ansible launcher: add zuul_runner module

This runs the commands asynchronously (but waits for their
completion).  This is more robust for long-running commands
because it avoids the built-in ssh timeout.

This adds an ansible module to actually run the remote command
so that we can:
 * process the console log
 * use ansible async (the script module does not support it)
 * control the environment variables of the script being run

It also adds a callback plugin to track the elapsed time so that
we can use the built-in timeout features of async commands.

Note that the module and plugin are GPL licensed.

Change-Id: I19b2b6a5c362bb9d843e7802aefe0eb5df9c5ed7
This commit is contained in:
James E. Blair 2016-04-21 17:28:58 -07:00 committed by James E. Blair
parent 19233fbff5
commit 47ef69f941
7 changed files with 211 additions and 42 deletions

0
zuul/ansible/__init__.py Normal file
View File

View File

View File

@ -0,0 +1,74 @@
#!/usr/bin/python
# Copyright (c) 2016 IBM Corp.
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
import datetime
import subprocess
class Console(object):
    """Context manager that writes timestamped lines to the console log.

    The log is opened in append mode.  A fresh ``Console`` is created for
    every invocation of this module, and a playbook may invoke the module
    several times (once per shell builder), so truncating the file on open
    would discard the output of every earlier invocation.

    :param path: file to append to; defaults to ``/tmp/console.log``.
    """

    def __init__(self, path='/tmp/console.log'):
        self.path = path
        self.logfile = None

    def __enter__(self):
        # 'a' keeps whatever previous invocations already logged.
        self.logfile = open(self.path, 'a')
        return self

    def __exit__(self, etype, value, tb):
        self.logfile.close()

    def addLine(self, ln):
        """Write *ln* to the log, prefixed with the current local time.

        *ln* is written as given; no newline is added, so callers are
        expected to pass lines that already carry their terminator.
        """
        ts = datetime.datetime.now()
        outln = '%s %s' % (str(ts), ln)
        self.logfile.write(outln)
def run(cwd, cmd, args):
    """Execute *cmd* and copy its output, line by line, to the console log.

    :param cwd: working directory for the child process.
    :param cmd: path of the program to execute; it is run directly (no
        shell) and receives no arguments.
    :param args: mapping handed to ``Popen`` as ``env``, i.e. it becomes
        the child's complete environment.
    :returns: the child's exit status.
    """
    child = subprocess.Popen(
        [cmd],
        cwd=cwd,
        env=args,
        # Fold stderr into stdout so a single stream feeds the log.
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )

    with Console() as console:
        # An empty read means the child closed its end of the pipe.
        output_line = child.stdout.readline()
        while output_line:
            console.addLine(output_line)
            output_line = child.stdout.readline()

    return child.wait()
def main():
    """Ansible entry point: run the requested command and report its status.

    Reads the ``command``, ``cwd`` and ``parameters`` module arguments,
    passes them to ``run()`` and reports success (``changed=True``) when
    the exit code is zero, or failure with the exit code otherwise.
    """
    spec = dict(
        command=dict(required=True, default=None),
        cwd=dict(required=True, default=None),
        parameters=dict(default={}, type='dict')
    )
    module = AnsibleModule(argument_spec=spec)

    params = module.params
    rc = run(params['cwd'], params['command'], params['parameters'])

    if rc != 0:
        module.fail_json(msg="Exit code %s" % rc, rc=rc)
    else:
        module.exit_json(changed=True, rc=rc)
# The wildcard import supplies the AnsibleModule name used by main().
# NOTE(review): it presumably sits at the bottom of the file to follow the
# Ansible module convention of the time -- confirm before moving it.
from ansible.module_utils.basic import * # noqa
# Run the module unconditionally when this file is executed.
main()

View File

View File

@ -0,0 +1,57 @@
# Copyright 2016 IBM Corp.
#
# This file is part of Zuul
#
# This file is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This file is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this file. If not, see <http://www.gnu.org/licenses/>.
import time
from ansible.executor.task_result import TaskResult
from ansible.plugins.callback import CallbackBase
class CallbackModule(CallbackBase):
    """Callback plugin that tracks elapsed task time and publishes it.

    After every task result it sets two non-persistent facts on the host
    that produced the result:

    * ``elapsed_time`` -- accumulated wall-clock seconds spent in tasks;
    * ``timeout`` -- the ``timeout`` extra var minus the elapsed time,
      set only when that extra var holds an integer value.
    """

    def __init__(self, *args, **kw):
        super(CallbackModule, self).__init__(*args, **kw)
        self._elapsed_time = 0.0
        self._task_start_time = None
        self._play = None

    def v2_playbook_on_play_start(self, play):
        """Remember the current play so its variable manager can be used."""
        self._play = play

    def playbook_on_task_start(self, name, is_conditional):
        """Record the wall-clock time at which the current task started."""
        self._task_start_time = time.time()

    def v2_on_any(self, *args, **kw):
        """Update the timing facts whenever a task result is reported.

        Events whose first positional argument is not a ``TaskResult``
        are ignored.
        """
        result = None
        if args and isinstance(args[0], TaskResult):
            result = args[0]
        if not result:
            return

        if self._task_start_time is not None:
            task_time = time.time() - self._task_start_time
            self._elapsed_time += task_time

        if self._play and result._host:
            manager = self._play.get_variable_manager()
            facts = dict(elapsed_time=self._elapsed_time)

            remaining = self._remaining_timeout(
                manager.extra_vars.get('timeout'))
            if remaining is not None:
                facts['timeout'] = remaining

            manager.set_nonpersistent_facts(result._host, facts)

        # Cleared so a later result without a new task start is not
        # charged for the same interval twice.
        self._task_start_time = None

    def _remaining_timeout(self, overall_timeout):
        """Return the seconds left of *overall_timeout*, or None.

        The extra var may be absent or a non-numeric string (for example
        the literal ``'None'`` when the caller formats an unset timeout
        into ``timeout=%s``).  Such values mean "no overall timeout" and
        must not raise, otherwise the elapsed-time fact would not be
        published and the task start time would not be reset.
        """
        if overall_timeout is None:
            return None
        try:
            overall = int(overall_timeout)
        except (TypeError, ValueError):
            return None
        return overall - int(self._elapsed_time)

View File

@ -16,7 +16,6 @@ import json
import logging
import multiprocessing
import os
import Queue
import re
import shutil
import signal
@ -24,7 +23,6 @@ import socket
import subprocess
import tempfile
import threading
import time
import traceback
import uuid
@ -33,6 +31,9 @@ import yaml
import jenkins_jobs.builder
import zmq
import zuul.ansible.library
import zuul.ansible.plugins.callback_plugins
class JobDir(object):
def __init__(self):
@ -41,6 +42,8 @@ class JobDir(object):
os.makedirs(self.git_root)
self.ansible_root = os.path.join(self.root, 'ansible')
os.makedirs(self.ansible_root)
self.plugins_root = os.path.join(self.ansible_root, 'plugins')
os.makedirs(self.plugins_root)
self.inventory = os.path.join(self.ansible_root, 'inventory')
self.playbook = os.path.join(self.ansible_root, 'playbook')
self.post_playbook = os.path.join(self.ansible_root, 'post_playbook')
@ -293,8 +296,7 @@ class NodeWorker(object):
self._job_complete_event = threading.Event()
self._running_job = False
self._sent_complete_event = False
self._job_timeout = None
self._job_start_time = None
self.workspace_root = config.get('launcher', 'workspace_root')
def isAlive(self):
# Meant to be called from the manager
@ -336,23 +338,7 @@ class NodeWorker(object):
self.queue.join()
def _runQueue(self):
# This also runs the timeout function if needed
try:
item = self.queue.get(True, 10) # 10 second resolution on timeout
except Queue.Empty:
# We don't need these in a critical section, but we do
# need them not to change while we evaluate them, so make
# local copies.
running = self._running_job
start = self._job_start_time
timeout = self._job_timeout
now = time.time()
if (running and timeout and start
and now - start >= timeout):
self.log.info("Job timed out after %s seconds" %
(now - start,))
self.abortRunningJob()
return
item = self.queue.get()
try:
if item['action'] == 'stop':
self.log.debug("Received stop request")
@ -466,13 +452,11 @@ class NodeWorker(object):
self.log.exception("Exception while sending job start event")
try:
result = self.runJob(job)
result = self.runJob(job, args)
except Exception:
self.log.exception("Exception while launching job thread")
self._running_job = False
self._job_timeout = None
self._job_start_time = None
if not result:
result = b''
@ -527,7 +511,7 @@ class NodeWorker(object):
self.sendCompleteEvent('zuul:launcher-shutdown',
'SUCCESS', {})
def runJob(self, job):
def runJob(self, job, args):
self.ansible_proc = None
result = None
with self.running_job_lock:
@ -541,10 +525,7 @@ class NodeWorker(object):
with JobDir() as jobdir:
self.log.debug("Job %s: job root at %s" %
(job.unique, jobdir.root))
self.prepareAnsibleFiles(jobdir, job)
self._job_start_time = time.time()
timeout = self.prepareAnsibleFiles(jobdir, job, args)
data = {
'manager': self.manager_name,
@ -554,7 +535,7 @@ class NodeWorker(object):
job.sendWorkData(json.dumps(data))
job.sendWorkStatus(0, 100)
job_status = self.runAnsiblePlaybook(jobdir)
job_status = self.runAnsiblePlaybook(jobdir, timeout)
post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
if job_status and post_status:
status = 'SUCCESS'
@ -621,7 +602,46 @@ class NodeWorker(object):
tasks.append(task)
return tasks
def prepareAnsibleFiles(self, jobdir, gearman_job):
def _makeBuilderTask(self, jobdir, builder, parameters, timeout):
    """Build the playbook tasks that run one shell builder on the node.

    The builder's script is written into ``jobdir.script_root`` and three
    tasks are returned, in order: copy the script to ``/tmp`` on the
    remote host, run it with the ``zuul_runner`` module, and remove the
    remote copy.

    :param jobdir: object whose ``script_root`` attribute names the local
        directory that receives the generated script.
    :param builder: mapping with a ``shell`` key holding the script text.
    :param parameters: mapping passed to the script; must contain
        ``WORKSPACE``, which is used as the working directory.
    :param timeout: if truthy, the run task is made conditional on the
        playbook's ``timeout`` variable being positive and uses that
        variable as its async limit; otherwise a two hour limit is used.
    :returns: list of task dictionaries.
    """
    tasks = []
    script_fn = '%s.sh' % str(uuid.uuid4().hex)
    script_path = os.path.join(jobdir.script_root, script_fn)
    with open(script_path, 'w') as script:
        script.write(builder['shell'])

    remote_path = os.path.join('/tmp', script_fn)
    # 0o555 is the same value as the old-style literal 0555 but is also
    # valid syntax on Python 3.
    copy = dict(src=script_path,
                dest=remote_path,
                mode=0o555)
    task = dict(copy=copy)
    tasks.append(task)

    runner = dict(command=remote_path,
                  cwd=parameters['WORKSPACE'],
                  parameters=parameters)
    task = dict(zuul_runner=runner)
    if timeout:
        # Both values are templates evaluated against the playbook's
        # ``timeout`` variable when the task runs.
        task['when'] = '{{ timeout | int > 0 }}'
        task['async'] = '{{ timeout }}'
    else:
        task['async'] = 2 * 60 * 60  # 2 hour default timeout
    task['poll'] = 5
    tasks.append(task)

    filetask = dict(path=remote_path,
                    state='absent')
    task = dict(file=filetask)
    tasks.append(task)

    return tasks
def prepareAnsibleFiles(self, jobdir, gearman_job, args):
job_name = gearman_job.name.split(':')[1]
jjb_job = self.jobs[job_name]
parameters = args.copy()
parameters['WORKSPACE'] = os.path.join(self.workspace_root, job_name)
with open(jobdir.inventory, 'w') as inventory:
for host_name, host_vars in self.getHostList():
inventory.write(host_name)
@ -629,27 +649,30 @@ class NodeWorker(object):
for k, v in host_vars.items():
inventory.write('%s=%s' % (k, v))
inventory.write('\n')
job_name = gearman_job.name.split(':')[1]
jjb_job = self.jobs[job_name]
timeout = None
for wrapper in jjb_job.get('wrappers', []):
if isinstance(wrapper, dict):
timeout = wrapper.get('build-timeout', {})
if isinstance(timeout, dict):
timeout = timeout.get('timeout')
if timeout:
self._job_timeout = timeout * 60
timeout = timeout * 60
with open(jobdir.playbook, 'w') as playbook:
tasks = []
task = dict(file=dict(path='/tmp/console.log', state='absent'))
tasks.append(task)
task = dict(file=dict(path=parameters['WORKSPACE'],
state='directory'))
tasks.append(task)
for builder in jjb_job.get('builders', []):
if 'shell' in builder:
script_fn = '%s.sh' % str(uuid.uuid4().hex)
script_fn = os.path.join(jobdir.script_root, script_fn)
with open(script_fn, 'w') as script:
script.write(builder['shell'])
tasks.append(dict(script='%s >> /tmp/console.log 2>&1' %
script_fn))
tasks.extend(self._makeBuilderTask(jobdir, builder,
parameters, timeout))
play = dict(hosts='node', name='Job body',
tasks=tasks)
playbook.write(yaml.dump([play]))
@ -670,15 +693,30 @@ class NodeWorker(object):
config.write('hostfile = %s\n' % jobdir.inventory)
config.write('host_key_checking = False\n')
def runAnsiblePlaybook(self, jobdir):
callback_path = zuul.ansible.plugins.callback_plugins.__file__
callback_path = os.path.abspath(callback_path)
callback_path = os.path.dirname(callback_path)
config.write('callback_plugins = %s\n' % callback_path)
library_path = zuul.ansible.library.__file__
library_path = os.path.abspath(library_path)
library_path = os.path.dirname(library_path)
config.write('library = %s\n' % library_path)
return timeout
def runAnsiblePlaybook(self, jobdir, timeout):
self.ansible_proc = subprocess.Popen(
['ansible-playbook', jobdir.playbook],
['ansible-playbook', jobdir.playbook,
'-e', 'timeout=%s' % timeout, '-v'],
cwd=jobdir.ansible_root,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setsid,
)
(out, err) = self.ansible_proc.communicate()
self.log.debug("Ansible stdout:\n%s" % out)
self.log.debug("Ansible stderr:\n%s" % err)
ret = self.ansible_proc.wait()
self.ansible_proc = None
return ret == 0