zuul/zuul/launcher/ansiblelaunchserver.py

1398 lines
52 KiB
Python

# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import os
import re
import shutil
import signal
import socket
import subprocess
import tempfile
import threading
import time
import traceback
import Queue
import uuid
import gear
import yaml
import jenkins_jobs.builder
import jenkins_jobs.formatter
import zmq
import zuul.ansible.library
import zuul.ansible.plugins.callback_plugins
from zuul.lib import commandsocket
ANSIBLE_WATCHDOG_GRACE = 5 * 60
ANSIBLE_DEFAULT_TIMEOUT = 2 * 60 * 60
ANSIBLE_DEFAULT_POST_TIMEOUT = 10 * 60
COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release', 'graceful',
'verbose', 'unverbose']
def boolify(x):
if isinstance(x, str):
return bool(int(x))
return bool(x)
class LaunchGearWorker(gear.Worker):
def __init__(self, *args, **kw):
self.__launch_server = kw.pop('launch_server')
super(LaunchGearWorker, self).__init__(*args, **kw)
def handleNoop(self, packet):
workers = len(self.__launch_server.node_workers)
delay = (workers ** 2) / 1000.0
time.sleep(delay)
return super(LaunchGearWorker, self).handleNoop(packet)
class NodeGearWorker(gear.Worker):
MASS_DO = 101
def sendMassDo(self, functions):
data = b'\x00'.join([gear.convert_to_bytes(x) for x in functions])
self.broadcast_lock.acquire()
try:
p = gear.Packet(gear.constants.REQ, self.MASS_DO, data)
self.broadcast(p)
finally:
self.broadcast_lock.release()
class Watchdog(object):
def __init__(self, timeout, function, args):
self.timeout = timeout
self.function = function
self.args = args
self.thread = threading.Thread(target=self._run)
self.thread.daemon = True
def _run(self):
while self._running and time.time() < self.end:
time.sleep(10)
if self._running:
self.function(*self.args)
def start(self):
self._running = True
self.end = time.time() + self.timeout
self.thread.start()
def stop(self):
self._running = False
class JobDir(object):
def __init__(self, keep=False):
self.keep = keep
self.root = tempfile.mkdtemp()
self.ansible_root = os.path.join(self.root, 'ansible')
os.makedirs(self.ansible_root)
self.known_hosts = os.path.join(self.ansible_root, 'known_hosts')
self.inventory = os.path.join(self.ansible_root, 'inventory')
self.playbook = os.path.join(self.ansible_root, 'playbook')
self.post_playbook = os.path.join(self.ansible_root, 'post_playbook')
self.config = os.path.join(self.ansible_root, 'ansible.cfg')
self.script_root = os.path.join(self.ansible_root, 'scripts')
self.ansible_log = os.path.join(self.ansible_root, 'ansible_log.txt')
os.makedirs(self.script_root)
self.staging_root = os.path.join(self.root, 'staging')
os.makedirs(self.staging_root)
def __enter__(self):
return self
def __exit__(self, etype, value, tb):
if not self.keep:
shutil.rmtree(self.root)
class LaunchServer(object):
log = logging.getLogger("zuul.LaunchServer")
site_section_re = re.compile('site "(.*?)"')
node_section_re = re.compile('node "(.*?)"')
def __init__(self, config, keep_jobdir=False):
self.config = config
self.options = dict(
verbose=False
)
self.keep_jobdir = keep_jobdir
self.hostname = socket.gethostname()
self.registered_functions = set()
self.node_workers = {}
self.jobs = {}
self.builds = {}
self.zmq_send_queue = Queue.Queue()
self.termination_queue = Queue.Queue()
self.sites = {}
self.static_nodes = {}
self.command_map = dict(
reconfigure=self.reconfigure,
stop=self.stop,
pause=self.pause,
unpause=self.unpause,
release=self.release,
graceful=self.graceful,
verbose=self.verboseOn,
unverbose=self.verboseOff,
)
if config.has_option('launcher', 'accept_nodes'):
self.accept_nodes = config.getboolean('launcher',
'accept_nodes')
else:
self.accept_nodes = True
self.config_accept_nodes = self.accept_nodes
if self.config.has_option('zuul', 'state_dir'):
state_dir = os.path.expanduser(
self.config.get('zuul', 'state_dir'))
else:
state_dir = '/var/lib/zuul'
path = os.path.join(state_dir, 'launcher.socket')
self.command_socket = commandsocket.CommandSocket(path)
ansible_dir = os.path.join(state_dir, 'ansible')
plugins_dir = os.path.join(ansible_dir, 'plugins')
self.callback_dir = os.path.join(plugins_dir, 'callback_plugins')
if not os.path.exists(self.callback_dir):
os.makedirs(self.callback_dir)
self.library_dir = os.path.join(ansible_dir, 'library')
if not os.path.exists(self.library_dir):
os.makedirs(self.library_dir)
callback_path = os.path.dirname(os.path.abspath(
zuul.ansible.plugins.callback_plugins.__file__))
for fn in os.listdir(callback_path):
shutil.copy(os.path.join(callback_path, fn), self.callback_dir)
library_path = os.path.dirname(os.path.abspath(
zuul.ansible.library.__file__))
for fn in os.listdir(library_path):
shutil.copy(os.path.join(library_path, fn), self.library_dir)
for section in config.sections():
m = self.site_section_re.match(section)
if m:
sitename = m.group(1)
d = {}
d['host'] = config.get(section, 'host')
d['user'] = config.get(section, 'user')
if config.has_option(section, 'pass'):
d['pass'] = config.get(section, 'pass')
else:
d['pass'] = ''
if config.has_option(section, 'root'):
d['root'] = config.get(section, 'root')
else:
d['root'] = '/'
self.sites[sitename] = d
continue
m = self.node_section_re.match(section)
if m:
nodename = m.group(1)
d = {}
d['name'] = nodename
d['host'] = config.get(section, 'host')
if config.has_option(section, 'description'):
d['description'] = config.get(section, 'description')
else:
d['description'] = ''
if config.has_option(section, 'labels'):
d['labels'] = config.get(section, 'labels').split(',')
else:
d['labels'] = []
self.static_nodes[nodename] = d
continue
def start(self):
self._gearman_running = True
self._zmq_running = True
self._reaper_running = True
self._command_running = True
# Setup ZMQ
self.zcontext = zmq.Context()
self.zsocket = self.zcontext.socket(zmq.PUB)
self.zsocket.bind("tcp://*:8888")
# Setup Gearman
server = self.config.get('gearman', 'server')
if self.config.has_option('gearman', 'port'):
port = self.config.get('gearman', 'port')
else:
port = 4730
self.worker = LaunchGearWorker('Zuul Launch Server',
launch_server=self)
self.worker.addServer(server, port)
self.log.debug("Waiting for server")
self.worker.waitForServer()
self.log.debug("Registering")
self.register()
# Start command socket
self.log.debug("Starting command processor")
self.command_socket.start()
self.command_thread = threading.Thread(target=self.runCommand)
self.command_thread.daemon = True
self.command_thread.start()
# Load JJB config
self.loadJobs()
# Start ZMQ worker thread
self.log.debug("Starting ZMQ processor")
self.zmq_thread = threading.Thread(target=self.runZMQ)
self.zmq_thread.daemon = True
self.zmq_thread.start()
# Start node worker reaper thread
self.log.debug("Starting reaper")
self.reaper_thread = threading.Thread(target=self.runReaper)
self.reaper_thread.daemon = True
self.reaper_thread.start()
# Start Gearman worker thread
self.log.debug("Starting worker")
self.gearman_thread = threading.Thread(target=self.run)
self.gearman_thread.daemon = True
self.gearman_thread.start()
# Start static workers
for node in self.static_nodes.values():
self.log.debug("Creating static node with arguments: %s" % (node,))
self._launchWorker(node)
def loadJobs(self):
self.log.debug("Loading jobs")
builder = JJB()
path = self.config.get('launcher', 'jenkins_jobs')
builder.load_files([path])
builder.parser.expandYaml()
unseen = set(self.jobs.keys())
for job in builder.parser.jobs:
builder.expandMacros(job)
self.jobs[job['name']] = job
unseen.discard(job['name'])
for name in unseen:
del self.jobs[name]
def register(self):
new_functions = set()
if self.accept_nodes:
new_functions.add("node_assign:zuul")
new_functions.add("stop:%s" % self.hostname)
new_functions.add("set_description:%s" % self.hostname)
new_functions.add("node_revoke:%s" % self.hostname)
for function in new_functions - self.registered_functions:
self.worker.registerFunction(function)
for function in self.registered_functions - new_functions:
self.worker.unRegisterFunction(function)
self.registered_functions = new_functions
def reconfigure(self):
self.log.debug("Reconfiguring")
self.loadJobs()
for node in self.node_workers.values():
try:
if node.isAlive():
node.queue.put(dict(action='reconfigure'))
except Exception:
self.log.exception("Exception sending reconfigure command "
"to worker:")
self.log.debug("Reconfiguration complete")
def pause(self):
self.log.debug("Pausing")
self.accept_nodes = False
self.register()
for node in self.node_workers.values():
try:
if node.isAlive():
node.queue.put(dict(action='pause'))
except Exception:
self.log.exception("Exception sending pause command "
"to worker:")
self.log.debug("Paused")
def unpause(self):
self.log.debug("Unpausing")
self.accept_nodes = self.config_accept_nodes
self.register()
for node in self.node_workers.values():
try:
if node.isAlive():
node.queue.put(dict(action='unpause'))
except Exception:
self.log.exception("Exception sending unpause command "
"to worker:")
self.log.debug("Unpaused")
def release(self):
self.log.debug("Releasing idle nodes")
for node in self.node_workers.values():
if node.name in self.static_nodes:
continue
try:
if node.isAlive():
node.queue.put(dict(action='release'))
except Exception:
self.log.exception("Exception sending release command "
"to worker:")
self.log.debug("Finished releasing idle nodes")
def graceful(self):
# Note: this is run in the command processing thread; no more
# external commands will be processed after this.
self.log.debug("Gracefully stopping")
self.pause()
self.release()
self.log.debug("Waiting for all builds to finish")
while self.builds:
time.sleep(5)
self.log.debug("All builds are finished")
self.stop()
def stop(self):
self.log.debug("Stopping")
# First, stop accepting new jobs
self._gearman_running = False
self._reaper_running = False
self.worker.shutdown()
# Then stop all of the workers
for node in self.node_workers.values():
try:
if node.isAlive():
node.stop()
except Exception:
self.log.exception("Exception sending stop command to worker:")
# Stop ZMQ afterwords so that the send queue is flushed
self._zmq_running = False
self.zmq_send_queue.put(None)
self.zmq_send_queue.join()
# Stop command processing
self._command_running = False
self.command_socket.stop()
# Join the gearman thread which was stopped earlier.
self.gearman_thread.join()
# The command thread is joined in the join() method of this
# class, which is called by the command shell.
self.log.debug("Stopped")
def verboseOn(self):
self.log.debug("Enabling verbose mode")
self.options['verbose'] = True
def verboseOff(self):
self.log.debug("Disabling verbose mode")
self.options['verbose'] = False
def join(self):
self.command_thread.join()
def runCommand(self):
while self._command_running:
try:
command = self.command_socket.get()
self.command_map[command]()
except Exception:
self.log.exception("Exception while processing command")
def runZMQ(self):
while self._zmq_running or not self.zmq_send_queue.empty():
try:
item = self.zmq_send_queue.get()
self.log.debug("Got ZMQ event %s" % (item,))
if item is None:
continue
self.zsocket.send(item)
except Exception:
self.log.exception("Exception while processing ZMQ events")
finally:
self.zmq_send_queue.task_done()
def run(self):
while self._gearman_running:
try:
job = self.worker.getJob()
try:
if job.name.startswith('node_assign:'):
self.log.debug("Got node_assign job: %s" % job.unique)
self.assignNode(job)
elif job.name.startswith('stop:'):
self.log.debug("Got stop job: %s" % job.unique)
self.stopJob(job)
elif job.name.startswith('set_description:'):
self.log.debug("Got set_description job: %s" %
job.unique)
job.sendWorkComplete()
elif job.name.startswith('node_revoke:'):
self.log.debug("Got node_revoke job: %s" % job.unique)
self.revokeNode(job)
else:
self.log.error("Unable to handle job %s" % job.name)
job.sendWorkFail()
except Exception:
self.log.exception("Exception while running job")
job.sendWorkException(traceback.format_exc())
except gear.InterruptedError:
return
except Exception:
self.log.exception("Exception while getting job")
def assignNode(self, job):
args = json.loads(job.arguments)
self.log.debug("Assigned node with arguments: %s" % (args,))
self._launchWorker(args)
data = dict(manager=self.hostname)
job.sendWorkData(json.dumps(data))
job.sendWorkComplete()
def _launchWorker(self, args):
worker = NodeWorker(self.config, self.jobs, self.builds,
self.sites, args['name'], args['host'],
args['description'], args['labels'],
self.hostname, self.zmq_send_queue,
self.termination_queue, self.keep_jobdir,
self.callback_dir, self.library_dir,
self.options)
self.node_workers[worker.name] = worker
worker.thread = threading.Thread(target=worker.run)
worker.thread.start()
def revokeNode(self, job):
try:
args = json.loads(job.arguments)
self.log.debug("Revoke job with arguments: %s" % (args,))
name = args['name']
node = self.node_workers.get(name)
if not node:
self.log.debug("Unable to find worker %s" % (name,))
return
try:
if node.isAlive():
node.queue.put(dict(action='stop'))
else:
self.log.debug("Node %s is not alive while revoking node" %
(node.name,))
except Exception:
self.log.exception("Exception sending stop command "
"to worker:")
finally:
job.sendWorkComplete()
def stopJob(self, job):
try:
args = json.loads(job.arguments)
self.log.debug("Stop job with arguments: %s" % (args,))
unique = args['number']
build_worker_name = self.builds.get(unique)
if not build_worker_name:
self.log.debug("Unable to find build for job %s" % (unique,))
return
node = self.node_workers.get(build_worker_name)
if not node:
self.log.debug("Unable to find worker for job %s" % (unique,))
return
try:
if node.isAlive():
node.queue.put(dict(action='abort'))
else:
self.log.debug("Node %s is not alive while aborting job" %
(node.name,))
except Exception:
self.log.exception("Exception sending abort command "
"to worker:")
finally:
job.sendWorkComplete()
def runReaper(self):
# We don't actually care if all the events are processed
while self._reaper_running:
try:
item = self.termination_queue.get()
self.log.debug("Got termination event %s" % (item,))
if item is None:
continue
worker = self.node_workers[item]
self.log.debug("Joining %s" % (item,))
worker.thread.join()
self.log.debug("Joined %s" % (item,))
del self.node_workers[item]
except Exception:
self.log.exception("Exception while processing "
"termination events:")
finally:
self.termination_queue.task_done()
class NodeWorker(object):
def __init__(self, config, jobs, builds, sites, name, host,
description, labels, manager_name, zmq_send_queue,
termination_queue, keep_jobdir, callback_dir,
library_dir, options):
self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,))
self.log.debug("Creating node worker %s" % (name,))
self.config = config
self.jobs = jobs
self.builds = builds
self.sites = sites
self.name = name
self.host = host
self.description = description
if not isinstance(labels, list):
labels = [labels]
self.labels = labels
self.thread = None
self.registered_functions = set()
# If the unpaused Event is set, that means we should run jobs.
# If it is clear, then we are paused and should not run jobs.
self.unpaused = threading.Event()
self.unpaused.set()
self._running = True
self.queue = Queue.Queue()
self.manager_name = manager_name
self.zmq_send_queue = zmq_send_queue
self.termination_queue = termination_queue
self.keep_jobdir = keep_jobdir
self.running_job_lock = threading.Lock()
self.pending_registration = False
self.registration_lock = threading.Lock()
self._get_job_lock = threading.Lock()
self._got_job = False
self._job_complete_event = threading.Event()
self._running_job = False
self._aborted_job = False
self._watchdog_timeout = False
self._sent_complete_event = False
self.ansible_job_proc = None
self.ansible_post_proc = None
self.workspace_root = config.get('launcher', 'workspace_root')
if self.config.has_option('launcher', 'private_key_file'):
self.private_key_file = config.get('launcher', 'private_key_file')
else:
self.private_key_file = '~/.ssh/id_rsa'
if self.config.has_option('launcher', 'username'):
self.username = config.get('launcher', 'username')
else:
self.username = 'zuul'
if self.config.has_option('launcher', 'register_labels'):
self.register_labels = config.getboolean('launcher',
'register_labels')
else:
self.register_labels = True
self.callback_dir = callback_dir
self.library_dir = library_dir
self.options = options
def isAlive(self):
# Meant to be called from the manager
if self.thread and self.thread.is_alive():
return True
return False
def run(self):
self.log.debug("Node worker %s starting" % (self.name,))
server = self.config.get('gearman', 'server')
if self.config.has_option('gearman', 'port'):
port = self.config.get('gearman', 'port')
else:
port = 4730
self.worker = NodeGearWorker(self.name)
self.worker.addServer(server, port)
self.log.debug("Waiting for server")
self.worker.waitForServer()
self.log.debug("Registering")
self.register()
self.gearman_thread = threading.Thread(target=self.runGearman)
self.gearman_thread.daemon = True
self.gearman_thread.start()
self.log.debug("Started")
while self._running or not self.queue.empty():
try:
self._runQueue()
except Exception:
self.log.exception("Exception in queue manager:")
def stop(self):
# If this is called locally, setting _running will be
# effictive, if it's called remotely, it will not be, but it
# will be set by the queue thread.
self.log.debug("Submitting stop request")
self._running = False
self.unpaused.set()
self.queue.put(dict(action='stop'))
self.queue.join()
def pause(self):
self.unpaused.clear()
self.worker.stopWaitingForJobs()
def unpause(self):
self.unpaused.set()
def release(self):
# If this node is idle, stop it.
old_unpaused = self.unpaused.is_set()
if old_unpaused:
self.pause()
with self._get_job_lock:
if self._got_job:
self.log.debug("This worker is not idle")
if old_unpaused:
self.unpause()
return
self.log.debug("Stopping due to release command")
self.queue.put(dict(action='stop'))
def _runQueue(self):
item = self.queue.get()
try:
if item['action'] == 'stop':
self.log.debug("Received stop request")
self._running = False
self.termination_queue.put(self.name)
if not self.abortRunningJob():
self.sendFakeCompleteEvent()
else:
self._job_complete_event.wait()
self.worker.shutdown()
if item['action'] == 'pause':
self.log.debug("Received pause request")
self.pause()
if item['action'] == 'unpause':
self.log.debug("Received unpause request")
self.unpause()
if item['action'] == 'release':
self.log.debug("Received release request")
self.release()
elif item['action'] == 'reconfigure':
self.log.debug("Received reconfigure request")
self.register()
elif item['action'] == 'abort':
self.log.debug("Received abort request")
self.abortRunningJob()
finally:
self.queue.task_done()
def runGearman(self):
while self._running:
try:
self.unpaused.wait()
if self._running:
self._runGearman()
except Exception:
self.log.exception("Exception in gearman manager:")
with self._get_job_lock:
self._got_job = False
def _runGearman(self):
if self.pending_registration:
self.register()
with self._get_job_lock:
try:
job = self.worker.getJob()
self._got_job = True
except gear.InterruptedError:
return
self.log.debug("Node worker %s got job %s" % (self.name, job.name))
try:
if job.name not in self.registered_functions:
self.log.error("Unable to handle job %s" % job.name)
job.sendWorkFail()
return
self.launch(job)
except Exception:
self.log.exception("Exception while running job")
job.sendWorkException(traceback.format_exc())
def generateFunctionNames(self, job):
# This only supports "node: foo" and "node: foo || bar"
ret = set()
job_labels = job.get('node')
matching_labels = set()
if job_labels:
job_labels = [x.strip() for x in job_labels.split('||')]
matching_labels = set(self.labels) & set(job_labels)
if not matching_labels:
return ret
ret.add('build:%s' % (job['name'],))
if self.register_labels:
for label in matching_labels:
ret.add('build:%s:%s' % (job['name'], label))
return ret
def register(self):
if not self.registration_lock.acquire(False):
self.log.debug("Registration already in progress")
return
try:
if self._running_job:
self.pending_registration = True
self.log.debug("Ignoring registration due to running job")
return
self.log.debug("Updating registration")
self.pending_registration = False
new_functions = set()
for job in self.jobs.values():
new_functions |= self.generateFunctionNames(job)
self.worker.sendMassDo(new_functions)
self.registered_functions = new_functions
finally:
self.registration_lock.release()
def abortRunningJob(self):
self._aborted_job = True
return self.abortRunningProc(self.ansible_job_proc)
def abortRunningProc(self, proc):
aborted = False
self.log.debug("Abort: acquiring job lock")
with self.running_job_lock:
if self._running_job:
self.log.debug("Abort: a job is running")
if proc:
self.log.debug("Abort: sending kill signal to job "
"process group")
try:
pgid = os.getpgid(proc.pid)
os.killpg(pgid, signal.SIGKILL)
aborted = True
except Exception:
self.log.exception("Exception while killing "
"ansible process:")
else:
self.log.debug("Abort: no job is running")
return aborted
def launch(self, job):
self.log.info("Node worker %s launching job %s" %
(self.name, job.name))
# Make sure we can parse what we need from the job first
args = json.loads(job.arguments)
offline = boolify(args.get('OFFLINE_NODE_WHEN_COMPLETE', False))
job_name = job.name.split(':')[1]
# Initialize the result so we have something regardless of
# whether the job actually runs
result = None
self._sent_complete_event = False
self._aborted_job = False
self._watchog_timeout = False
try:
self.sendStartEvent(job_name, args)
except Exception:
self.log.exception("Exception while sending job start event")
try:
result = self.runJob(job, args)
except Exception:
self.log.exception("Exception while launching job thread")
self._running_job = False
try:
data = json.dumps(dict(result=result))
job.sendWorkComplete(data)
except Exception:
self.log.exception("Exception while sending job completion packet")
try:
self.sendCompleteEvent(job_name, result, args)
except Exception:
self.log.exception("Exception while sending job completion event")
try:
del self.builds[job.unique]
except Exception:
self.log.exception("Exception while clearing build record")
self._job_complete_event.set()
if offline and self._running:
self.stop()
def sendStartEvent(self, name, parameters):
build = dict(node_name=self.name,
host_name=self.manager_name,
parameters=parameters)
event = dict(name=name,
build=build)
item = "onStarted %s" % json.dumps(event)
self.log.debug("Sending over ZMQ: %s" % (item,))
self.zmq_send_queue.put(item)
def sendCompleteEvent(self, name, status, parameters):
build = dict(status=status,
node_name=self.name,
host_name=self.manager_name,
parameters=parameters)
event = dict(name=name,
build=build)
item = "onFinalized %s" % json.dumps(event)
self.log.debug("Sending over ZMQ: %s" % (item,))
self.zmq_send_queue.put(item)
self._sent_complete_event = True
def sendFakeCompleteEvent(self):
if self._sent_complete_event:
return
self.sendCompleteEvent('zuul:launcher-shutdown',
'SUCCESS', {})
def runJob(self, job, args):
self.ansible_job_proc = None
self.ansible_post_proc = None
result = None
with self.running_job_lock:
if not self._running:
return result
self._running_job = True
self._job_complete_event.clear()
self.log.debug("Job %s: beginning" % (job.unique,))
self.builds[job.unique] = self.name
with JobDir(self.keep_jobdir) as jobdir:
self.log.debug("Job %s: job root at %s" %
(job.unique, jobdir.root))
timeout = self.prepareAnsibleFiles(jobdir, job, args)
data = {
'manager': self.manager_name,
'number': job.unique,
'url': 'telnet://%s:19885' % self.host,
}
job.sendWorkData(json.dumps(data))
job.sendWorkStatus(0, 100)
job_status = self.runAnsiblePlaybook(jobdir, timeout)
if job_status is None:
# The result of the job is indeterminate. Zuul will
# run it again.
return result
post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
if not post_status:
result = 'POST_FAILURE'
elif job_status:
result = 'SUCCESS'
else:
result = 'FAILURE'
if self._aborted_job and not self._watchdog_timeout:
# A Null result will cause zuul to relaunch the job if
# it needs to.
result = None
return result
def getHostList(self):
return [('node', dict(
ansible_host=self.host, ansible_user=self.username))]
def _substituteVariables(self, text, variables):
def lookup(match):
return variables.get(match.group(1), '')
return re.sub('\$([A-Za-z0-9_]+)', lookup, text)
def _getRsyncOptions(self, source, parameters):
# Treat the publisher source as a filter; ant and rsync behave
# fairly close in this manner, except for leading directories.
source = self._substituteVariables(source, parameters)
# If the source starts with ** then we want to match any
# number of directories, so don't anchor the include filter.
# If it does not start with **, then the intent is likely to
# at least start by matching an immediate file or subdirectory
# (even if later we have a ** in the middle), so in this case,
# anchor it to the root of the transfer (the workspace).
if not source.startswith('**'):
source = os.path.join('/', source)
# These options mean: include the thing we want, include any
# directories (so that we continue to search for the thing we
# want no matter how deep it is), exclude anything that
# doesn't match the thing we want or is a directory, then get
# rid of empty directories left over at the end.
rsync_opts = ['--include="%s"' % source,
'--include="*/"',
'--exclude="*"',
'--prune-empty-dirs']
return rsync_opts
def _makeSCPTask(self, jobdir, publisher, parameters):
tasks = []
for scpfile in publisher['scp']['files']:
scproot = tempfile.mkdtemp(dir=jobdir.staging_root)
os.chmod(scproot, 0o755)
site = publisher['scp']['site']
if scpfile.get('copy-console'):
# Include the local ansible directory in the console
# upload. This uploads the playbook and ansible logs.
copyargs = dict(src=jobdir.ansible_root + '/',
dest=os.path.join(scproot, '_zuul_ansible'))
task = dict(copy=copyargs,
delegate_to='127.0.0.1')
tasks.append(task)
# Fetch the console log from the remote host.
src = '/tmp/console.html'
rsync_opts = []
else:
src = parameters['WORKSPACE']
if not src.endswith('/'):
src = src + '/'
rsync_opts = self._getRsyncOptions(scpfile['source'],
parameters)
syncargs = dict(src=src,
dest=scproot,
copy_links='yes',
mode='pull')
if rsync_opts:
syncargs['rsync_opts'] = rsync_opts
task = dict(synchronize=syncargs)
if not scpfile.get('copy-after-failure'):
task['when'] = 'success'
tasks.append(task)
task = self._makeSCPTaskLocalAction(
site, scpfile, scproot, parameters)
tasks.append(task)
return tasks
def _makeSCPTaskLocalAction(self, site, scpfile, scproot, parameters):
if site not in self.sites:
raise Exception("Undefined SCP site: %s" % (site,))
site = self.sites[site]
dest = scpfile['target'].lstrip('/')
dest = self._substituteVariables(dest, parameters)
dest = os.path.join(site['root'], dest)
dest = os.path.normpath(dest)
if not dest.startswith(site['root']):
raise Exception("Target path %s is not below site root" %
(dest,))
rsync_cmd = [
'/usr/bin/rsync', '--delay-updates', '-F',
'--compress', '-rt', '--safe-links',
'--rsync-path="mkdir -p {dest} && rsync"',
'--rsh="/usr/bin/ssh -i {private_key_file} -S none '
'-o StrictHostKeyChecking=no -q"',
'--out-format="<<CHANGED>>%i %n%L"',
'{source}', '"{user}@{host}:{dest}"'
]
if scpfile.get('keep-hierarchy'):
source = '"%s/"' % scproot
else:
source = '`/usr/bin/find "%s" -type f`' % scproot
shellargs = ' '.join(rsync_cmd).format(
source=source,
dest=dest,
private_key_file=self.private_key_file,
host=site['host'],
user=site['user'])
task = dict(shell=shellargs,
delegate_to='127.0.0.1')
if not scpfile.get('copy-after-failure'):
task['when'] = 'success'
return task
def _makeFTPTask(self, jobdir, publisher, parameters):
tasks = []
ftp = publisher['ftp']
site = ftp['site']
if site not in self.sites:
raise Exception("Undefined FTP site: %s" % site)
site = self.sites[site]
ftproot = tempfile.mkdtemp(dir=jobdir.staging_root)
ftpcontent = os.path.join(ftproot, 'content')
os.makedirs(ftpcontent)
ftpscript = os.path.join(ftproot, 'script')
src = parameters['WORKSPACE']
if not src.endswith('/'):
src = src + '/'
rsync_opts = self._getRsyncOptions(ftp['source'],
parameters)
syncargs = dict(src=src,
dest=ftpcontent,
copy_links='yes',
mode='pull')
if rsync_opts:
syncargs['rsync_opts'] = rsync_opts
task = dict(synchronize=syncargs,
when='success')
tasks.append(task)
task = dict(shell='lftp -f %s' % ftpscript,
when='success',
delegate_to='127.0.0.1')
ftpsource = ftpcontent
if ftp.get('remove-prefix'):
ftpsource = os.path.join(ftpcontent, ftp['remove-prefix'])
while ftpsource[-1] == '/':
ftpsource = ftpsource[:-1]
ftptarget = ftp['target'].lstrip('/')
ftptarget = self._substituteVariables(ftptarget, parameters)
ftptarget = os.path.join(site['root'], ftptarget)
ftptarget = os.path.normpath(ftptarget)
if not ftptarget.startswith(site['root']):
raise Exception("Target path %s is not below site root" %
(ftptarget,))
while ftptarget[-1] == '/':
ftptarget = ftptarget[:-1]
with open(ftpscript, 'w') as script:
script.write('open %s\n' % site['host'])
script.write('user %s %s\n' % (site['user'], site['pass']))
script.write('mirror -R %s %s\n' % (ftpsource, ftptarget))
tasks.append(task)
return tasks
def _makeBuilderTask(self, jobdir, builder, parameters):
tasks = []
script_fn = '%s.sh' % str(uuid.uuid4().hex)
script_path = os.path.join(jobdir.script_root, script_fn)
with open(script_path, 'w') as script:
data = builder['shell']
if not data.startswith('#!'):
data = '#!/bin/bash -x\n %s' % (data,)
script.write(data)
remote_path = os.path.join('/tmp', script_fn)
copy = dict(src=script_path,
dest=remote_path,
mode=0o555)
task = dict(copy=copy)
tasks.append(task)
runner = dict(command=remote_path,
cwd=parameters['WORKSPACE'],
parameters=parameters)
task = dict(zuul_runner=runner)
task['name'] = ('zuul_runner with {{ timeout | int - elapsed_time }} '
'second timeout')
task['when'] = '{{ elapsed_time < timeout | int }}'
task['async'] = '{{ timeout | int - elapsed_time }}'
task['poll'] = 5
tasks.append(task)
filetask = dict(path=remote_path,
state='absent')
task = dict(file=filetask)
tasks.append(task)
return tasks
def _transformPublishers(self, jjb_job):
early_publishers = []
late_publishers = []
old_publishers = jjb_job.get('publishers', [])
for publisher in old_publishers:
early_scpfiles = []
late_scpfiles = []
if 'scp' not in publisher:
early_publishers.append(publisher)
continue
copy_console = False
for scpfile in publisher['scp']['files']:
if scpfile.get('copy-console'):
scpfile['keep-hierarchy'] = True
late_scpfiles.append(scpfile)
copy_console = True
else:
early_scpfiles.append(scpfile)
publisher['scp']['files'] = early_scpfiles + late_scpfiles
if copy_console:
late_publishers.append(publisher)
else:
early_publishers.append(publisher)
publishers = early_publishers + late_publishers
if old_publishers != publishers:
self.log.debug("Transformed job publishers")
return early_publishers, late_publishers
def prepareAnsibleFiles(self, jobdir, gearman_job, args):
job_name = gearman_job.name.split(':')[1]
jjb_job = self.jobs[job_name]
parameters = args.copy()
parameters['WORKSPACE'] = os.path.join(self.workspace_root, job_name)
with open(jobdir.inventory, 'w') as inventory:
for host_name, host_vars in self.getHostList():
inventory.write(host_name)
for k, v in host_vars.items():
inventory.write(' %s=%s' % (k, v))
inventory.write('\n')
timeout = None
timeout_var = None
for wrapper in jjb_job.get('wrappers', []):
if isinstance(wrapper, dict):
build_timeout = wrapper.get('timeout')
if isinstance(build_timeout, dict):
timeout_var = build_timeout.get('timeout-var')
timeout = build_timeout.get('timeout')
if timeout is not None:
timeout = int(timeout) * 60
if not timeout:
timeout = ANSIBLE_DEFAULT_TIMEOUT
if timeout_var:
parameters[timeout_var] = str(timeout * 1000)
with open(jobdir.playbook, 'w') as playbook:
pre_tasks = []
tasks = []
main_block = []
error_block = []
variables = []
shellargs = "ssh-keyscan %s > %s" % (
self.host, jobdir.known_hosts)
pre_tasks.append(dict(shell=shellargs,
delegate_to='127.0.0.1'))
tasks.append(dict(block=main_block,
rescue=error_block))
task = dict(file=dict(path='/tmp/console.html', state='absent'))
main_block.append(task)
task = dict(zuul_console=dict(path='/tmp/console.html',
port=19885))
main_block.append(task)
task = dict(file=dict(path=parameters['WORKSPACE'],
state='directory'))
main_block.append(task)
msg = [
"Launched by %s" % self.manager_name,
"Building remotely on %s in workspace %s" % (
self.name, parameters['WORKSPACE'])]
task = dict(zuul_log=dict(msg=msg))
main_block.append(task)
for builder in jjb_job.get('builders', []):
if 'shell' in builder:
main_block.extend(
self._makeBuilderTask(jobdir, builder, parameters))
task = dict(zuul_log=dict(msg="Job complete, result: SUCCESS"))
main_block.append(task)
task = dict(zuul_log=dict(msg="Job complete, result: FAILURE"))
error_block.append(task)
error_block.append(dict(fail=dict(msg='FAILURE')))
variables.append(dict(timeout=timeout))
play = dict(hosts='node', name='Job body', vars=variables,
pre_tasks=pre_tasks, tasks=tasks)
playbook.write(yaml.safe_dump([play], default_flow_style=False))
early_publishers, late_publishers = self._transformPublishers(jjb_job)
with open(jobdir.post_playbook, 'w') as playbook:
blocks = []
for publishers in [early_publishers, late_publishers]:
block = []
for publisher in publishers:
if 'scp' in publisher:
block.extend(self._makeSCPTask(jobdir, publisher,
parameters))
if 'ftp' in publisher:
block.extend(self._makeFTPTask(jobdir, publisher,
parameters))
blocks.append(block)
# The 'always' section contains the log publishing tasks,
# the 'block' contains all the other publishers. This way
# we run the log publisher regardless of whether the rest
# of the publishers succeed.
tasks = []
tasks.append(dict(block=blocks[0],
always=blocks[1]))
play = dict(hosts='node', name='Publishers',
tasks=tasks)
playbook.write(yaml.safe_dump([play], default_flow_style=False))
with open(jobdir.config, 'w') as config:
config.write('[defaults]\n')
config.write('hostfile = %s\n' % jobdir.inventory)
config.write('keep_remote_files = True\n')
config.write('local_tmp = %s/.ansible/local_tmp\n' % jobdir.root)
config.write('remote_tmp = %s/.ansible/remote_tmp\n' % jobdir.root)
config.write('private_key_file = %s\n' % self.private_key_file)
config.write('retry_files_enabled = False\n')
config.write('log_path = %s\n' % jobdir.ansible_log)
config.write('gathering = explicit\n')
config.write('callback_plugins = %s\n' % self.callback_dir)
config.write('library = %s\n' % self.library_dir)
config.write('[ssh_connection]\n')
ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
"-o UserKnownHostsFile=%s" % jobdir.known_hosts
config.write('ssh_args = %s\n' % ssh_args)
return timeout
def _ansibleTimeout(self, proc, msg):
self._watchdog_timeout = True
self.log.warning(msg)
self.abortRunningProc(proc)
def runAnsiblePlaybook(self, jobdir, timeout):
# Set LOGNAME env variable so Ansible log_path log reports
# the correct user.
env_copy = os.environ.copy()
env_copy['LOGNAME'] = 'zuul'
if self.options['verbose']:
verbose = '-vvv'
else:
verbose = '-v'
cmd = ['ansible-playbook', jobdir.playbook, verbose]
self.log.debug("Ansible command: %s" % (cmd,))
self.ansible_job_proc = subprocess.Popen(
cmd,
cwd=jobdir.ansible_root,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
preexec_fn=os.setsid,
env=env_copy,
)
ret = None
watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
self._ansibleTimeout,
(self.ansible_job_proc,
"Ansible timeout exceeded"))
watchdog.start()
try:
for line in iter(self.ansible_job_proc.stdout.readline, b''):
line = line[:1024].rstrip()
self.log.debug("Ansible output: %s" % (line,))
ret = self.ansible_job_proc.wait()
finally:
watchdog.stop()
self.log.debug("Ansible exit code: %s" % (ret,))
self.ansible_job_proc = None
if self._watchdog_timeout:
return False
if ret == 3:
# AnsibleHostUnreachable: We had a network issue connecting to
# our zuul-worker.
return None
elif ret == -9:
# Received abort request.
return None
return ret == 0
def runAnsiblePostPlaybook(self, jobdir, success):
# Set LOGNAME env variable so Ansible log_path log reports
# the correct user.
env_copy = os.environ.copy()
env_copy['LOGNAME'] = 'zuul'
if self.options['verbose']:
verbose = '-vvv'
else:
verbose = '-v'
cmd = ['ansible-playbook', jobdir.post_playbook,
'-e', 'success=%s' % success, verbose]
self.log.debug("Ansible post command: %s" % (cmd,))
self.ansible_post_proc = subprocess.Popen(
cmd,
cwd=jobdir.ansible_root,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
preexec_fn=os.setsid,
env=env_copy,
)
ret = None
watchdog = Watchdog(ANSIBLE_DEFAULT_POST_TIMEOUT,
self._ansibleTimeout,
(self.ansible_post_proc,
"Ansible post timeout exceeded"))
watchdog.start()
try:
for line in iter(self.ansible_post_proc.stdout.readline, b''):
line = line[:1024].rstrip()
self.log.debug("Ansible post output: %s" % (line,))
ret = self.ansible_post_proc.wait()
finally:
watchdog.stop()
self.log.debug("Ansible post exit code: %s" % (ret,))
self.ansible_post_proc = None
return ret == 0
class JJB(jenkins_jobs.builder.Builder):
def __init__(self):
self.global_config = None
self._plugins_list = []
def expandComponent(self, component_type, component, template_data):
component_list_type = component_type + 's'
new_components = []
if isinstance(component, dict):
name, component_data = next(iter(component.items()))
if template_data:
component_data = jenkins_jobs.formatter.deep_format(
component_data, template_data, True)
else:
name = component
component_data = {}
new_component = self.parser.data.get(component_type, {}).get(name)
if new_component:
for new_sub_component in new_component[component_list_type]:
new_components.extend(
self.expandComponent(component_type,
new_sub_component, component_data))
else:
new_components.append({name: component_data})
return new_components
def expandMacros(self, job):
for component_type in ['builder', 'publisher', 'wrapper']:
component_list_type = component_type + 's'
new_components = []
for new_component in job.get(component_list_type, []):
new_components.extend(self.expandComponent(component_type,
new_component, {}))
job[component_list_type] = new_components