Stop running commands with async

The async module is complex, and we're only using it to handle the
running cumulative timeout. However, we still fallback on the watchdog
timeout from time to time. Make things simpler by just having that be
how we time things out.

Change-Id: Ie51de4a135d953c4ad9dcb773d27b3c54ca8829b
This commit is contained in:
Monty Taylor
2016-09-14 11:39:12 -05:00
parent a192814194
commit f166784a28
4 changed files with 3 additions and 78 deletions

View File

@@ -1,52 +0,0 @@
# Copyright 2016 IBM Corp.
#
# This file is part of Zuul
#
# This file is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This file is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this file. If not, see <http://www.gnu.org/licenses/>.
import time
from ansible.executor.task_result import TaskResult
from ansible.plugins.callback import CallbackBase
class CallbackModule(CallbackBase):
def __init__(self, *args, **kw):
super(CallbackModule, self).__init__(*args, **kw)
self._elapsed_time = 0.0
self._task_start_time = None
self._play = None
def v2_playbook_on_play_start(self, play):
self._play = play
def playbook_on_task_start(self, name, is_conditional):
self._task_start_time = time.time()
def v2_on_any(self, *args, **kw):
result = None
if args and isinstance(args[0], TaskResult):
result = args[0]
if not result:
return
if self._task_start_time is not None:
task_time = time.time() - self._task_start_time
self._elapsed_time += task_time
if self._play and result._host:
manager = self._play.get_variable_manager()
facts = dict(elapsed_time=int(self._elapsed_time))
manager.set_nonpersistent_facts(result._host, facts)
self._task_start_time = None

View File

@@ -34,7 +34,6 @@ import jenkins_jobs.formatter
import zmq
import zuul.ansible.library
import zuul.ansible.plugins.callback_plugins
from zuul.lib import commandsocket
ANSIBLE_WATCHDOG_GRACE = 5 * 60
@@ -213,19 +212,10 @@ class LaunchServer(object):
path = os.path.join(state_dir, 'launcher.socket')
self.command_socket = commandsocket.CommandSocket(path)
ansible_dir = os.path.join(state_dir, 'ansible')
plugins_dir = os.path.join(ansible_dir, 'plugins')
self.callback_dir = os.path.join(plugins_dir, 'callback_plugins')
if not os.path.exists(self.callback_dir):
os.makedirs(self.callback_dir)
self.library_dir = os.path.join(ansible_dir, 'library')
if not os.path.exists(self.library_dir):
os.makedirs(self.library_dir)
callback_path = os.path.dirname(os.path.abspath(
zuul.ansible.plugins.callback_plugins.__file__))
for fn in os.listdir(callback_path):
shutil.copy(os.path.join(callback_path, fn), self.callback_dir)
library_path = os.path.dirname(os.path.abspath(
zuul.ansible.library.__file__))
for fn in os.listdir(library_path):
@@ -513,8 +503,7 @@ class LaunchServer(object):
args['description'], args['labels'],
self.hostname, self.zmq_send_queue,
self.termination_queue, self.keep_jobdir,
self.callback_dir, self.library_dir,
self.options)
self.library_dir, self.options)
self.node_workers[worker.name] = worker
worker.thread = threading.Thread(target=worker.run)
@@ -594,8 +583,7 @@ class NodeWorker(object):
def __init__(self, config, jobs, builds, sites, name, host,
description, labels, manager_name, zmq_send_queue,
termination_queue, keep_jobdir, callback_dir,
library_dir, options):
termination_queue, keep_jobdir, library_dir, options):
self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,))
self.log.debug("Creating node worker %s" % (name,))
self.config = config
@@ -641,7 +629,6 @@ class NodeWorker(object):
self.username = config.get('launcher', 'username')
else:
self.username = 'zuul'
self.callback_dir = callback_dir
self.library_dir = library_dir
self.options = options
@@ -1313,11 +1300,7 @@ class NodeWorker(object):
(executable, shell) = deal_with_shebang(builder['shell'])
task = dict(shell=shell)
task['name'] = ('command with {{ timeout | int - elapsed_time }} '
'second timeout')
task['when'] = '{{ elapsed_time < timeout | int }}'
task['async'] = '{{ timeout | int - elapsed_time }}'
task['poll'] = 5
task['name'] = 'command generated from JJB'
task['environment'] = parameters
task['args'] = dict(chdir=parameters['WORKSPACE'])
if executable:
@@ -1370,19 +1353,15 @@ class NodeWorker(object):
inventory.write('\n')
timeout = None
timeout_var = None
for wrapper in jjb_job.get('wrappers', []):
if isinstance(wrapper, dict):
build_timeout = wrapper.get('timeout')
if isinstance(build_timeout, dict):
timeout_var = build_timeout.get('timeout-var')
timeout = build_timeout.get('timeout')
if timeout is not None:
timeout = int(timeout) * 60
if not timeout:
timeout = ANSIBLE_DEFAULT_TIMEOUT
if timeout_var:
parameters[timeout_var] = str(timeout * 1000)
with open(jobdir.playbook, 'w') as playbook:
pre_tasks = []
@@ -1428,7 +1407,6 @@ class NodeWorker(object):
error_block.append(task)
error_block.append(dict(fail=dict(msg='FAILURE')))
variables.append(dict(timeout=timeout))
play = dict(hosts='node', name='Job body', vars=variables,
pre_tasks=pre_tasks, tasks=tasks)
playbook.write(yaml.safe_dump([play], default_flow_style=False))
@@ -1473,7 +1451,6 @@ class NodeWorker(object):
config.write('retry_files_enabled = False\n')
config.write('log_path = %s\n' % jobdir.ansible_log)
config.write('gathering = explicit\n')
config.write('callback_plugins = %s\n' % self.callback_dir)
config.write('library = %s\n' % self.library_dir)
# TODO(mordred) This can be removed once we're using ansible 2.2
config.write('module_set_locale = False\n')