Forward-port v2.5 Ansible launcher improvements

This brings forward most of our improvements related to executing
ansible, and also the command socket processor.  It includes
several TODO notes for items which are not straightforward
translations.

Change-Id: Id0c58f7f2e3f78e1edf3d373b65b564568e52f1f
This commit is contained in:
James E. Blair 2016-10-05 13:48:14 -07:00
parent efeeb6df3a
commit 414cb67a79
1 changed files with 196 additions and 12 deletions

View File

@ -17,39 +17,77 @@ import json
import logging
import os
import shutil
import signal
import socket
import subprocess
import tempfile
import threading
import time
import traceback
import gear
import yaml
import zuul.merger
import zuul.ansible.library
import zuul.ansible.plugins.callback_plugins
from zuul.lib import commandsocket
ANSIBLE_WATCHDOG_GRACE = 5 * 60
class Watchdog(object):
def __init__(self, timeout, function, args):
self.timeout = timeout
self.function = function
self.args = args
self.thread = threading.Thread(target=self._run)
self.thread.daemon = True
self.timed_out = None
def _run(self):
while self._running and time.time() < self.end:
time.sleep(10)
if self._running:
self.timed_out = True
self.function(*self.args)
self.timed_out = False
def start(self):
self._running = True
self.end = time.time() + self.timeout
self.thread.start()
def stop(self):
self._running = False
# TODOv3(mordred): put git repos in a hierarchy that includes source
# hostname, eg: git.openstack.org/openstack/nova. Also, configure
# sources to have an alias, so that the review.openstack.org source
# repos end up in git.openstack.org.
class JobDir(object):
def __init__(self):
def __init__(self, keep=False):
self.keep = keep
self.root = tempfile.mkdtemp()
self.git_root = os.path.join(self.root, 'git')
os.makedirs(self.git_root)
self.ansible_root = os.path.join(self.root, 'ansible')
os.makedirs(self.ansible_root)
self.known_hosts = os.path.join(self.ansible_root, 'known_hosts')
self.inventory = os.path.join(self.ansible_root, 'inventory')
self.playbook = os.path.join(self.ansible_root, 'playbook')
self.post_playbook = os.path.join(self.ansible_root, 'post_playbook')
self.config = os.path.join(self.ansible_root, 'ansible.cfg')
self.ansible_log = os.path.join(self.ansible_root, 'ansible_log.txt')
def __enter__(self):
return self
def __exit__(self, etype, value, tb):
shutil.rmtree(self.root)
if not self.keep:
shutil.rmtree(self.root)
class UpdateTask(object):
@ -112,12 +150,21 @@ class DeduplicateQueue(object):
class LaunchServer(object):
log = logging.getLogger("zuul.LaunchServer")
def __init__(self, config, connections={}):
def __init__(self, config, connections={}, keep_jobdir=False):
self.config = config
self.keep_jobdir = keep_jobdir
# TODOv3(mordred): make the launcher name more unique --
# perhaps hostname+pid.
self.hostname = socket.gethostname()
self.zuul_url = config.get('merger', 'zuul_url')
self.command_map = dict(
stop=self.stop,
pause=self.pause,
unpause=self.unpause,
graceful=self.graceful,
verbose=self.verboseOn,
unverbose=self.verboseOff,
)
if self.config.has_option('merger', 'git_dir'):
self.merge_root = self.config.get('merger', 'git_dir')
@ -133,17 +180,48 @@ class LaunchServer(object):
self.merge_name = self.config.get('merger', 'git_user_name')
else:
self.merge_name = None
if self.config.has_option('launcher', 'private_key_file'):
self.private_key_file = config.get('launcher', 'private_key_file')
else:
self.private_key_file = '~/.ssh/id_rsa'
self.connections = connections
self.merger = self._getMerger(self.merge_root)
self.update_queue = DeduplicateQueue()
if self.config.has_option('zuul', 'state_dir'):
state_dir = os.path.expanduser(
self.config.get('zuul', 'state_dir'))
else:
state_dir = '/var/lib/zuul'
path = os.path.join(state_dir, 'launcher.socket')
self.command_socket = commandsocket.CommandSocket(path)
ansible_dir = os.path.join(state_dir, 'ansible')
plugins_dir = os.path.join(ansible_dir, 'plugins')
self.callback_dir = os.path.join(plugins_dir, 'callback_plugins')
if not os.path.exists(self.callback_dir):
os.makedirs(self.callback_dir)
self.library_dir = os.path.join(ansible_dir, 'library')
if not os.path.exists(self.library_dir):
os.makedirs(self.library_dir)
callback_path = os.path.dirname(os.path.abspath(
zuul.ansible.plugins.callback_plugins.__file__))
for fn in os.listdir(callback_path):
shutil.copy(os.path.join(callback_path, fn), self.callback_dir)
library_path = os.path.dirname(os.path.abspath(
zuul.ansible.library.__file__))
for fn in os.listdir(library_path):
shutil.copy(os.path.join(library_path, fn), self.library_dir)
def _getMerger(self, root):
return zuul.merger.merger.Merger(root, self.connections,
self.merge_email, self.merge_name)
def start(self):
self._running = True
self._command_running = True
server = self.config.get('gearman', 'server')
if self.config.has_option('gearman', 'port'):
port = self.config.get('gearman', 'port')
@ -155,6 +233,13 @@ class LaunchServer(object):
self.worker.waitForServer()
self.log.debug("Registering")
self.register()
self.log.debug("Starting command processor")
self.command_socket.start()
self.command_thread = threading.Thread(target=self.runCommand)
self.command_thread.daemon = True
self.command_thread.start()
self.log.debug("Starting worker")
self.update_thread = threading.Thread(target=self._updateLoop)
self.update_thread.daemon = True
@ -173,12 +258,42 @@ class LaunchServer(object):
self.log.debug("Stopping")
self._running = False
self.worker.shutdown()
self._command_running = False
self.command_socket.stop()
self.log.debug("Stopped")
def pause(self):
# TODOv3: implement
pass
def unpause(self):
# TODOv3: implement
pass
def graceful(self):
# TODOv3: implement
pass
def verboseOn(self):
# TODOv3: implement
pass
def verboseOff(self):
# TODOv3: implement
pass
def join(self):
self.update_thread.join()
self.thread.join()
def runCommand(self):
while self._command_running:
try:
command = self.command_socket.get()
self.command_map[command]()
except Exception:
self.log.exception("Exception while processing command")
def _updateLoop(self):
while self._running:
try:
@ -271,7 +386,6 @@ class LaunchServer(object):
job.sendWorkStatus(0, 100)
result = self.runAnsible(jobdir, job)
result = dict(result=result)
job.sendWorkComplete(json.dumps(result))
@ -300,23 +414,93 @@ class LaunchServer(object):
with open(jobdir.config, 'w') as config:
config.write('[defaults]\n')
config.write('hostfile = %s\n' % jobdir.inventory)
config.write('keep_remote_files = True\n')
config.write('local_tmp = %s/.ansible/local_tmp\n' % jobdir.root)
config.write('remote_tmp = %s/.ansible/remote_tmp\n' % jobdir.root)
config.write('private_key_file = %s\n' % self.private_key_file)
config.write('retry_files_enabled = False\n')
config.write('log_path = %s\n' % jobdir.ansible_log)
config.write('gathering = explicit\n')
config.write('callback_plugins = %s\n' % self.callback_dir)
config.write('library = %s\n' % self.library_dir)
# bump the timeout because busy nodes may take more than
# 10s to respond
config.write('timeout = 30\n')
config.write('[ssh_connection]\n')
ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
"-o UserKnownHostsFile=%s" % jobdir.known_hosts
config.write('ssh_args = %s\n' % ssh_args)
def _ansibleTimeout(self, proc, msg):
self.log.warning(msg)
self.abortRunningProc(proc)
def abortRunningProc(self, proc):
aborted = False
self.log.debug("Abort: sending kill signal to job "
"process group")
try:
pgid = os.getpgid(proc.pid)
os.killpg(pgid, signal.SIGKILL)
aborted = True
except Exception:
self.log.exception("Exception while killing "
"ansible process:")
return aborted
def runAnsible(self, jobdir, job):
# Job is included here for the benefit of the test framework.
env_copy = os.environ.copy()
env_copy['LOGNAME'] = 'zuul'
if False: # TODOv3: self.options['verbose']:
verbose = '-vvv'
else:
verbose = '-v'
cmd = ['ansible-playbook', jobdir.playbook, verbose]
self.log.debug("Ansible command: %s" % (cmd,))
# TODOv3: verbose
proc = subprocess.Popen(
['ansible-playbook', jobdir.playbook],
cmd,
cwd=jobdir.ansible_root,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stderr=subprocess.STDOUT,
preexec_fn=os.setsid,
env=env_copy,
)
(out, err) = proc.communicate()
ret = proc.wait()
print(out)
print(err)
ret = None
# TODOv3: get this from the job
timeout = 60
watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
self._ansibleTimeout,
(proc,
"Ansible timeout exceeded"))
watchdog.start()
try:
for line in iter(proc.stdout.readline, b''):
line = line[:1024].rstrip()
self.log.debug("Ansible output: %s" % (line,))
ret = proc.wait()
finally:
watchdog.stop()
self.log.debug("Ansible exit code: %s" % (ret,))
if watchdog.timed_out:
return 'TIMED_OUT'
if ret == 3:
# AnsibleHostUnreachable: We had a network issue connecting to
# our zuul-worker.
return None
elif ret == -9:
# Received abort request.
return None
if ret == 0:
return 'SUCCESS'
else:
return 'FAILURE'
return 'FAILURE'
def cat(self, job):
args = json.loads(job.arguments)