diff --git a/zuul/ansible/__init__.py b/zuul/ansible/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/zuul/ansible/library/__init__.py b/zuul/ansible/library/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/zuul/ansible/library/zuul_runner.py b/zuul/ansible/library/zuul_runner.py new file mode 100644 index 0000000000..75542445ee --- /dev/null +++ b/zuul/ansible/library/zuul_runner.py @@ -0,0 +1,74 @@ +#!/usr/bin/python + +# Copyright (c) 2016 IBM Corp. +# +# This module is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software. If not, see . + +import datetime +import subprocess + + +class Console(object): + def __enter__(self): + self.logfile = open('/tmp/console.log', 'w+') + return self + + def __exit__(self, etype, value, tb): + self.logfile.close() + + def addLine(self, ln): + ts = datetime.datetime.now() + outln = '%s %s' % (str(ts), ln) + self.logfile.write(outln) + + +def run(cwd, cmd, args): + proc = subprocess.Popen( + [cmd], + cwd=cwd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=args, + ) + + with Console() as console: + while True: + line = proc.stdout.readline() + if not line: + break + console.addLine(line) + + ret = proc.wait() + return ret + + +def main(): + module = AnsibleModule( + argument_spec=dict( + command=dict(required=True, default=None), + cwd=dict(required=True, default=None), + parameters=dict(default={}, type='dict') + ) + ) + + p = module.params + ret = run(p['cwd'], p['command'], p['parameters']) + if ret == 0: + module.exit_json(changed=True, rc=ret) + else: + module.fail_json(msg="Exit code %s" % ret, rc=ret) + +from ansible.module_utils.basic import * # noqa + +main() diff --git a/zuul/ansible/plugins/__init__.py b/zuul/ansible/plugins/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/zuul/ansible/plugins/callback_plugins/__init__.py b/zuul/ansible/plugins/callback_plugins/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/zuul/ansible/plugins/callback_plugins/timeout.py b/zuul/ansible/plugins/callback_plugins/timeout.py new file mode 100644 index 0000000000..245e9884ec --- /dev/null +++ b/zuul/ansible/plugins/callback_plugins/timeout.py @@ -0,0 +1,57 @@ +# Copyright 2016 IBM Corp. +# +# This file is part of Zuul +# +# This file is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This file is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this file. If not, see . + +import time + +from ansible.executor.task_result import TaskResult +from ansible.plugins.callback import CallbackBase + + +class CallbackModule(CallbackBase): + def __init__(self, *args, **kw): + super(CallbackModule, self).__init__(*args, **kw) + self._elapsed_time = 0.0 + self._task_start_time = None + self._play = None + + def v2_playbook_on_play_start(self, play): + self._play = play + + def playbook_on_task_start(self, name, is_conditional): + self._task_start_time = time.time() + + def v2_on_any(self, *args, **kw): + result = None + if args and isinstance(args[0], TaskResult): + result = args[0] + if not result: + return + + if self._task_start_time is not None: + task_time = time.time() - self._task_start_time + self._elapsed_time += task_time + if self._play and result._host: + manager = self._play.get_variable_manager() + facts = dict(elapsed_time=self._elapsed_time) + + overall_timeout = manager.extra_vars.get('timeout') + if overall_timeout is not None: + timeout = int(overall_timeout) - int(self._elapsed_time) + facts['timeout'] = timeout + + manager.set_nonpersistent_facts(result._host, facts) + self._task_start_time = None diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py index 4e652d26f8..8eb0374627 100644 --- a/zuul/launcher/ansiblelaunchserver.py +++ b/zuul/launcher/ansiblelaunchserver.py @@ -16,7 +16,6 @@ import json import logging import multiprocessing import os -import Queue import re import shutil import signal @@ -24,7 +23,6 @@ import socket import subprocess import tempfile import threading -import time import traceback import uuid @@ -33,6 +31,9 @@ import yaml import jenkins_jobs.builder import zmq +import zuul.ansible.library +import zuul.ansible.plugins.callback_plugins + class JobDir(object): def __init__(self): @@ -41,6 +42,8 @@ class JobDir(object): os.makedirs(self.git_root) self.ansible_root = os.path.join(self.root, 'ansible') os.makedirs(self.ansible_root) + self.plugins_root = os.path.join(self.ansible_root, 'plugins') + os.makedirs(self.plugins_root) self.inventory = os.path.join(self.ansible_root, 'inventory') self.playbook = os.path.join(self.ansible_root, 'playbook') self.post_playbook = os.path.join(self.ansible_root, 'post_playbook') @@ -293,8 +296,7 @@ class NodeWorker(object): self._job_complete_event = threading.Event() self._running_job = False self._sent_complete_event = False - self._job_timeout = None - self._job_start_time = None + self.workspace_root = config.get('launcher', 'workspace_root') def isAlive(self): # Meant to be called from the manager @@ -336,23 +338,7 @@ class NodeWorker(object): self.queue.join() def _runQueue(self): - # This also runs the timeout function if needed - try: - item = self.queue.get(True, 10) # 10 second resolution on timeout - except Queue.Empty: - # We don't need these in a critical section, but we do - # need them not to change while we evaluate them, so make - # local copies. - running = self._running_job - start = self._job_start_time - timeout = self._job_timeout - now = time.time() - if (running and timeout and start - and now - start >= timeout): - self.log.info("Job timed out after %s seconds" % - (now - start,)) - self.abortRunningJob() - return + item = self.queue.get() try: if item['action'] == 'stop': self.log.debug("Received stop request") @@ -466,13 +452,11 @@ class NodeWorker(object): self.log.exception("Exception while sending job start event") try: - result = self.runJob(job) + result = self.runJob(job, args) except Exception: self.log.exception("Exception while launching job thread") self._running_job = False - self._job_timeout = None - self._job_start_time = None if not result: result = b'' @@ -527,7 +511,7 @@ class NodeWorker(object): self.sendCompleteEvent('zuul:launcher-shutdown', 'SUCCESS', {}) - def runJob(self, job): + def runJob(self, job, args): self.ansible_proc = None result = None with self.running_job_lock: @@ -541,10 +525,7 @@ class NodeWorker(object): with JobDir() as jobdir: self.log.debug("Job %s: job root at %s" % (job.unique, jobdir.root)) - - self.prepareAnsibleFiles(jobdir, job) - - self._job_start_time = time.time() + timeout = self.prepareAnsibleFiles(jobdir, job, args) data = { 'manager': self.manager_name, @@ -554,7 +535,7 @@ class NodeWorker(object): job.sendWorkData(json.dumps(data)) job.sendWorkStatus(0, 100) - job_status = self.runAnsiblePlaybook(jobdir) + job_status = self.runAnsiblePlaybook(jobdir, timeout) post_status = self.runAnsiblePostPlaybook(jobdir, job_status) if job_status and post_status: status = 'SUCCESS' @@ -621,7 +602,46 @@ class NodeWorker(object): tasks.append(task) return tasks - def prepareAnsibleFiles(self, jobdir, gearman_job): + def _makeBuilderTask(self, jobdir, builder, parameters, timeout): + tasks = [] + script_fn = '%s.sh' % str(uuid.uuid4().hex) + script_path = os.path.join(jobdir.script_root, script_fn) + with open(script_path, 'w') as script: + script.write(builder['shell']) + + remote_path = os.path.join('/tmp', script_fn) + copy = dict(src=script_path, + dest=remote_path, + mode=0555) + task = dict(copy=copy) + tasks.append(task) + + runner = dict(command=remote_path, + cwd=parameters['WORKSPACE'], + parameters=parameters) + task = dict(zuul_runner=runner) + if timeout: + task['when'] = '{{ timeout | int > 0 }}' + task['async'] = '{{ timeout }}' + else: + task['async'] = 2 * 60 * 60 # 2 hour default timeout + task['poll'] = 5 + tasks.append(task) + + filetask = dict(path=remote_path, + state='absent') + task = dict(file=filetask) + tasks.append(task) + + return tasks + + def prepareAnsibleFiles(self, jobdir, gearman_job, args): + job_name = gearman_job.name.split(':')[1] + jjb_job = self.jobs[job_name] + + parameters = args.copy() + parameters['WORKSPACE'] = os.path.join(self.workspace_root, job_name) + with open(jobdir.inventory, 'w') as inventory: for host_name, host_vars in self.getHostList(): inventory.write(host_name) @@ -629,27 +649,30 @@ class NodeWorker(object): for k, v in host_vars.items(): inventory.write('%s=%s' % (k, v)) inventory.write('\n') - job_name = gearman_job.name.split(':')[1] - jjb_job = self.jobs[job_name] + timeout = None for wrapper in jjb_job.get('wrappers', []): if isinstance(wrapper, dict): timeout = wrapper.get('build-timeout', {}) if isinstance(timeout, dict): timeout = timeout.get('timeout') if timeout: - self._job_timeout = timeout * 60 + timeout = timeout * 60 with open(jobdir.playbook, 'w') as playbook: tasks = [] + + task = dict(file=dict(path='/tmp/console.log', state='absent')) + tasks.append(task) + + task = dict(file=dict(path=parameters['WORKSPACE'], + state='directory')) + tasks.append(task) + for builder in jjb_job.get('builders', []): if 'shell' in builder: - script_fn = '%s.sh' % str(uuid.uuid4().hex) - script_fn = os.path.join(jobdir.script_root, script_fn) - with open(script_fn, 'w') as script: - script.write(builder['shell']) - tasks.append(dict(script='%s >> /tmp/console.log 2>&1' % - script_fn)) + tasks.extend(self._makeBuilderTask(jobdir, builder, + parameters, timeout)) play = dict(hosts='node', name='Job body', tasks=tasks) playbook.write(yaml.dump([play])) @@ -670,15 +693,30 @@ class NodeWorker(object): config.write('hostfile = %s\n' % jobdir.inventory) config.write('host_key_checking = False\n') - def runAnsiblePlaybook(self, jobdir): + callback_path = zuul.ansible.plugins.callback_plugins.__file__ + callback_path = os.path.abspath(callback_path) + callback_path = os.path.dirname(callback_path) + config.write('callback_plugins = %s\n' % callback_path) + + library_path = zuul.ansible.library.__file__ + library_path = os.path.abspath(library_path) + library_path = os.path.dirname(library_path) + config.write('library = %s\n' % library_path) + + return timeout + + def runAnsiblePlaybook(self, jobdir, timeout): self.ansible_proc = subprocess.Popen( - ['ansible-playbook', jobdir.playbook], + ['ansible-playbook', jobdir.playbook, + '-e', 'timeout=%s' % timeout, '-v'], cwd=jobdir.ansible_root, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid, ) (out, err) = self.ansible_proc.communicate() + self.log.debug("Ansible stdout:\n%s" % out) + self.log.debug("Ansible stderr:\n%s" % err) ret = self.ansible_proc.wait() self.ansible_proc = None return ret == 0