Limit command stdout/stderr to 1GiB
If an Ansible command task produces a very large amount of data, that will be sent back to the ansible-playbook process on the executor and deserialized from JSON. If it is sufficiently large, it may cause an OOM. While we have adjusted settings to encourage the oom- killer to kill ansible-playbook rather than zuul-executor, it's still not a great situation to invoke the oom-killer in the first place. To avoid this in what we presume is an obviously avoidable situation, we will limit the output sent back to the executor to 1GiB. This should be much larger than necessary, in fact, the limit may be too high, but it seem unlikely to be too low. Other methods were considered: limiting by 50% of the total ram on the executor (likely to produce a value even higher than 1GiB), or 50% of the available ram on the executor (may be too variable depending on executor load). In the end, 1GiB seems like a good starting point. Because this affects the structured data returned by ansible and that may be used by later tasks in the same playbook to check the returned values, if we hit this limit, we should consider the task a failure so that users do not inadvertently use invalid data (consider a task thatk checks for the presence of some token in stdout). To that end, if we hit the limit, we will kill the command process and raise an exception which will cause Ansible to fail the task (incidentally, it will not include the oversized stdout/ stderr). The cause of the error will be visible in the json and text output of the job. This is not a setting that users or operators should be adjusting, and normally we would not expose something like this through a configuration option. But because we will fail the tasks, we provide an escape valve for users who upgrade to this version and suddenly find they are relying on 1GiB+ stdout values. A deprecated configuration option is added to adjust the value used. We can remove it in a later major version of Zuul. While we're working on the command module, make it more memory-efficient for large values by using a BytesIO class instead of concatenating strings. This reduces by 1 the number of complete copies of the stdout/stderr values on the remote node (but does nothing for the ansible-playbook process on the executor). Finally, add a "-vvv" argument to the test invocation; this was useful in debugging this change and will likely be so for future changes. Change-Id: I3442b09946ecd0ad18817339b090e49f00d51e93
This commit is contained in:
parent
a6df9edc16
commit
93f102d546
@ -762,6 +762,26 @@ The following sections of ``zuul.conf`` are used by the executor:
|
||||
total real memory multiplied by 100. Buffers and cache are
|
||||
considered available in the calculation.
|
||||
|
||||
.. attr:: output_max_bytes
|
||||
:default: 1073741824
|
||||
|
||||
.. warning:: This option is deprecated. In the future, the
|
||||
default value of 1GiB is likely to become fixed and
|
||||
unable to be changed. Set this option only if
|
||||
needed and only as long as needed to adjust
|
||||
existing jobs to avoid the limit.
|
||||
|
||||
Zuul limits the total number of bytes output via stdout or
|
||||
stderr from a single Ansible command to this value. If the
|
||||
command outputs more than this number of bytes, the command
|
||||
execution will fail. This is to protect the executor from being
|
||||
required to read an excessively large amount of data from an
|
||||
ansible task result.
|
||||
|
||||
If a job fails due to this limit, consider adjusting the command
|
||||
task to redirect output to a file and collecting the file
|
||||
separately.
|
||||
|
||||
.. attr:: hostname
|
||||
:default: hostname of the server
|
||||
|
||||
|
@ -10,6 +10,7 @@
|
||||
- name: Run ansible that should succeed against testing console
|
||||
command: >
|
||||
/usr/lib/zuul/ansible/{{ zuul_ansible_version }}/bin/ansible-playbook
|
||||
-vvv
|
||||
-e "new_console=true"
|
||||
src/opendev.org/zuul/zuul/playbooks/zuul-stream/fixtures/test-stream.yaml
|
||||
environment:
|
||||
@ -19,6 +20,7 @@
|
||||
ZUUL_JOB_LOG_CONFIG: "{{ ansible_user_dir}}/logging.json"
|
||||
ZUUL_JOBDIR: "{{ ansible_user_dir}}"
|
||||
ZUUL_ANSIBLE_SPLIT_STREAMS: False
|
||||
ZUUL_OUTPUT_MAX_BYTES: 1073741824
|
||||
PYTHONPATH: "{{ python_path }}"
|
||||
register: _success_output
|
||||
|
||||
@ -61,6 +63,7 @@
|
||||
ZUUL_JOB_LOG_CONFIG: "{{ ansible_user_dir}}/logging.json"
|
||||
ZUUL_JOBDIR: "{{ ansible_user_dir}}"
|
||||
ZUUL_ANSIBLE_SPLIT_STREAMS: False
|
||||
ZUUL_OUTPUT_MAX_BYTES: 1073741824
|
||||
PYTHONPATH: "{{ python_path }}"
|
||||
register: _success_output
|
||||
|
||||
@ -104,6 +107,7 @@
|
||||
ZUUL_JOB_LOG_CONFIG: "{{ ansible_user_dir}}/logging.json"
|
||||
ZUUL_JOBDIR: "{{ ansible_user_dir}}"
|
||||
ZUUL_ANSIBLE_SPLIT_STREAMS: False
|
||||
ZUUL_OUTPUT_MAX_BYTES: 1073741824
|
||||
PYTHONPATH: "{{ python_path }}"
|
||||
|
||||
- name: Save output
|
||||
|
12
releasenotes/notes/output-max-bytes-2521b32286875cdb.yaml
Normal file
12
releasenotes/notes/output-max-bytes-2521b32286875cdb.yaml
Normal file
@ -0,0 +1,12 @@
|
||||
---
|
||||
upgrade:
|
||||
- |
|
||||
The maximum combined stdout and stderr output from a single
|
||||
Ansible task has been limited to 1GiB. In the unlikely event that
|
||||
an existing job legitimately exceeds this limit, a new executor
|
||||
configuration option, :attr:`executor.output_max_bytes`, has been
|
||||
provided to temporarily increase the limit. This option is likely
|
||||
to be removed in a future version of Zuul. To avoid this issue,
|
||||
Ansible tasks with large volumes of output should be adjusted to
|
||||
redirect that output to a file which is separately collected and
|
||||
processed.
|
@ -281,6 +281,20 @@ def okay_tracebacks(*args):
|
||||
return decorator
|
||||
|
||||
|
||||
def zuul_config(section, key, value):
|
||||
"""Set a zuul.conf value."""
|
||||
|
||||
def decorator(test):
|
||||
config_dict = getattr(test, '__zuul_config__', None)
|
||||
if config_dict is None:
|
||||
config_dict = {}
|
||||
test.__zuul_config__ = config_dict
|
||||
section_dict = config_dict.setdefault(section, {})
|
||||
section_dict[key] = value
|
||||
return test
|
||||
return decorator
|
||||
|
||||
|
||||
def registerProjects(source_name, client, config):
|
||||
path = config.get('scheduler', 'tenant_config')
|
||||
with open(os.path.join(FIXTURE_DIR, path)) as f:
|
||||
@ -2228,6 +2242,7 @@ class TestConfig:
|
||||
self.enable_nodepool = getattr(test, '__enable_nodepool__', False)
|
||||
self.return_data = getattr(test, '__return_data__', [])
|
||||
self.driver_config = getattr(test, '__driver_config__', {})
|
||||
self.zuul_config = getattr(test, '__zuul_config__', {})
|
||||
self.driver = DriverTestConfig(self)
|
||||
self.changes = FakeChangeDB()
|
||||
|
||||
@ -2632,7 +2647,9 @@ class ZuulTestCase(BaseTestCase):
|
||||
os.path.join(git_path, reponame))
|
||||
# Make test_root persist after ansible run for .flag test
|
||||
config.set('executor', 'trusted_rw_paths', self.test_root)
|
||||
|
||||
for section, section_dict in self.test_config.zuul_config.items():
|
||||
for k, v in section_dict.items():
|
||||
config.set(section, k, v)
|
||||
return config
|
||||
|
||||
def setupSimpleLayout(self, config):
|
||||
|
@ -22,7 +22,7 @@ import yaml
|
||||
from unittest import skip
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from tests.base import AnsibleZuulTestCase
|
||||
from tests.base import AnsibleZuulTestCase, zuul_config
|
||||
|
||||
|
||||
class FunctionalZuulStreamMixIn:
|
||||
@ -386,6 +386,18 @@ class FunctionalZuulStreamMixIn:
|
||||
self.assertLogLineStartsWith(
|
||||
r"""compute1 \| ok: \{'string': '\d.""", text)
|
||||
|
||||
@zuul_config('executor', 'output_max_bytes', '2')
|
||||
def test_command_output_max_bytes(self):
|
||||
job = self._run_job('command')
|
||||
with self.jobLog(job):
|
||||
build = self.history[-1]
|
||||
self.assertEqual(build.result, 'FAILURE')
|
||||
|
||||
console_output = self.console_output.getvalue()
|
||||
self.assertNotIn('[WARNING]: Failure using method', console_output)
|
||||
text = self._get_job_output(build)
|
||||
self.assertIn('[Zuul] Log output exceeded max of 2', text)
|
||||
|
||||
def test_module_exception(self):
|
||||
job = self._run_job('module_failure_exception')
|
||||
with self.jobLog(job):
|
||||
|
@ -240,6 +240,7 @@ from ansible.module_utils._text import to_native, to_bytes, to_text
|
||||
from ansible.module_utils.common.collections import is_iterable
|
||||
|
||||
# Imports needed for Zuul things
|
||||
import io
|
||||
import re
|
||||
import subprocess
|
||||
import traceback
|
||||
@ -258,12 +259,9 @@ from ansible.module_utils.six.moves import shlex_quote
|
||||
|
||||
LOG_STREAM_FILE = '/tmp/console-{log_uuid}.log'
|
||||
PASSWD_ARG_RE = re.compile(r'^[-]{0,2}pass[-]?(word|wd)?')
|
||||
# Lists to save stdout/stderr log lines in as we collect them
|
||||
_log_lines = []
|
||||
_stderr_log_lines = []
|
||||
|
||||
|
||||
class Console(object):
|
||||
class Console:
|
||||
def __init__(self, log_uuid):
|
||||
# The streamer currently will not ask us for output from
|
||||
# loops. This flag uuid was set in the action plugin if this
|
||||
@ -277,13 +275,19 @@ class Console(object):
|
||||
else:
|
||||
self.logfile_name = LOG_STREAM_FILE.format(log_uuid=log_uuid)
|
||||
|
||||
def __enter__(self):
|
||||
def open(self):
|
||||
self.logfile = open(self.logfile_name, 'ab', buffering=0)
|
||||
|
||||
def __enter__(self):
|
||||
self.open()
|
||||
return self
|
||||
|
||||
def __exit__(self, etype, value, tb):
|
||||
def close(self):
|
||||
self.logfile.close()
|
||||
|
||||
def __exit__(self, etype, value, tb):
|
||||
self.close()
|
||||
|
||||
def addLine(self, ln):
|
||||
# Note this format with deliminator is "inspired" by the old
|
||||
# Jenkins format but with microsecond resolution instead of
|
||||
@ -299,47 +303,89 @@ class Console(object):
|
||||
self.logfile.write(outln)
|
||||
|
||||
|
||||
def _follow(fd, log_lines, console):
|
||||
newline_warning = False
|
||||
while True:
|
||||
line = fd.readline()
|
||||
if not line:
|
||||
break
|
||||
log_lines.append(line)
|
||||
if not line[-1] != b'\n':
|
||||
line += b'\n'
|
||||
newline_warning = True
|
||||
console.addLine(line)
|
||||
if newline_warning:
|
||||
console.addLine('[Zuul] No trailing newline\n')
|
||||
class StreamFollower:
|
||||
def __init__(self, cmd, log_uuid, output_max_bytes):
|
||||
self.cmd = cmd
|
||||
self.log_uuid = log_uuid
|
||||
self.exception = None
|
||||
self.output_max_bytes = output_max_bytes
|
||||
# Lists to save stdout/stderr log lines in as we collect them
|
||||
self.log_bytes = io.BytesIO()
|
||||
self.stderr_log_bytes = io.BytesIO()
|
||||
self.stdout_thread = None
|
||||
self.stderr_thread = None
|
||||
# Total size in bytes of all log and stderr_log lines
|
||||
self.log_size = 0
|
||||
|
||||
def join(self):
|
||||
if self.exception:
|
||||
try:
|
||||
self.console.close()
|
||||
except Exception:
|
||||
pass
|
||||
raise self.exception
|
||||
for t in (self.stdout_thread, self.stderr_thread):
|
||||
if t is None:
|
||||
continue
|
||||
t.join(10)
|
||||
if t.is_alive():
|
||||
with Console(self.zuul_log_id) as console:
|
||||
console.addLine("[Zuul] standard output/error still open "
|
||||
"after child exited")
|
||||
self.console.close()
|
||||
|
||||
def follow(stdout, stderr, log_uuid):
|
||||
threads = []
|
||||
with Console(log_uuid) as console:
|
||||
if stdout:
|
||||
t = threading.Thread(
|
||||
target=_follow,
|
||||
args=(stdout, _log_lines, console)
|
||||
)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
threads.append(t)
|
||||
if stderr:
|
||||
t = threading.Thread(
|
||||
target=_follow,
|
||||
args=(stderr, _stderr_log_lines, console)
|
||||
)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
threads.append(t)
|
||||
for t in threads:
|
||||
t.join()
|
||||
def follow(self):
|
||||
self.console = Console(self.log_uuid)
|
||||
self.console.open()
|
||||
if self.cmd.stdout:
|
||||
self.stdout_thread = threading.Thread(
|
||||
target=self.follow_main,
|
||||
args=(self.cmd.stdout, self.log_bytes))
|
||||
self.stdout_thread.daemon = True
|
||||
self.stdout_thread.start()
|
||||
if self.cmd.stderr:
|
||||
self.stderr_thread = threading.Thread(
|
||||
target=self.follow_main,
|
||||
args=(self.cmd.stderr, self.stderr_log_bytes))
|
||||
self.stderr_thread.daemon = True
|
||||
self.stderr_thread.start()
|
||||
|
||||
def follow_main(self, fd, log_bytes):
|
||||
try:
|
||||
self.follow_inner(fd, log_bytes)
|
||||
except Exception as e:
|
||||
self.exception = e
|
||||
|
||||
def follow_inner(self, fd, log_bytes):
|
||||
newline_warning = False
|
||||
while True:
|
||||
line = fd.readline()
|
||||
if not line:
|
||||
break
|
||||
self.log_size += len(line)
|
||||
if self.log_size > self.output_max_bytes:
|
||||
msg = (
|
||||
'[Zuul] Log output exceeded max of %s, '
|
||||
'terminating\n' % (self.output_max_bytes,))
|
||||
self.console.addLine(msg)
|
||||
try:
|
||||
pgid = os.getpgid(self.cmd.pid)
|
||||
os.killpg(pgid, signal.SIGKILL)
|
||||
except Exception:
|
||||
pass
|
||||
raise Exception(msg)
|
||||
log_bytes.write(line)
|
||||
if not line[-1] != b'\n':
|
||||
line += b'\n'
|
||||
newline_warning = True
|
||||
self.console.addLine(line)
|
||||
if newline_warning:
|
||||
self.console.addLine('[Zuul] No trailing newline\n')
|
||||
|
||||
|
||||
# Taken from ansible/module_utils/basic.py ... forking the method for now
|
||||
# so that we can dive in and figure out how to make appropriate hook points
|
||||
def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, check_rc=False, close_fds=True, executable=None, data=None, binary_data=False, path_prefix=None, cwd=None,
|
||||
def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, zuul_output_max_bytes, check_rc=False, close_fds=True, executable=None, data=None, binary_data=False, path_prefix=None, cwd=None,
|
||||
use_unsafe_shell=False, prompt_regex=None, environ_update=None, umask=None, encoding='utf-8', errors='surrogate_or_strict',
|
||||
expand_user_and_vars=True, pass_fds=None, before_communicate_callback=None, ignore_invalid_cwd=True):
|
||||
'''
|
||||
@ -531,11 +577,10 @@ def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, check_
|
||||
before_communicate_callback(cmd)
|
||||
|
||||
if self.no_log:
|
||||
t = None
|
||||
follower = None
|
||||
else:
|
||||
t = threading.Thread(target=follow, args=(cmd.stdout, cmd.stderr, zuul_log_id))
|
||||
t.daemon = True
|
||||
t.start()
|
||||
follower = StreamFollower(cmd, zuul_log_id, zuul_output_max_bytes)
|
||||
follower.follow()
|
||||
|
||||
# ZUUL: Our log thread will catch the output so don't do that here.
|
||||
|
||||
@ -559,16 +604,12 @@ def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, check_
|
||||
# 10 seconds to catch up and exit. If it hasn't done so by
|
||||
# then, it is very likely stuck in readline() because it
|
||||
# spawed a child that is holding stdout or stderr open.
|
||||
if t:
|
||||
t.join(10)
|
||||
with Console(zuul_log_id) as console:
|
||||
if t.is_alive():
|
||||
console.addLine("[Zuul] standard output/error still open "
|
||||
"after child exited")
|
||||
if follower:
|
||||
follower.join()
|
||||
# ZUUL: stdout and stderr are in the console log file
|
||||
# ZUUL: return the saved log lines so we can ship them back
|
||||
stdout = b('').join(_log_lines)
|
||||
stderr = b('').join(_stderr_log_lines)
|
||||
stdout = follower.log_bytes.getvalue()
|
||||
stderr = follower.stderr_log_bytes.getvalue()
|
||||
else:
|
||||
stdout = b('')
|
||||
stderr = b('')
|
||||
@ -583,9 +624,6 @@ def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, check_
|
||||
fail_json_kwargs = dict(rc=257, stdout=b'', stderr=b'', msg=to_native(e), exception=traceback.format_exc(), cmd=self._clean_args(args))
|
||||
finally:
|
||||
with Console(zuul_log_id) as console:
|
||||
if t and t.is_alive():
|
||||
console.addLine("[Zuul] standard output/error still open "
|
||||
"after child exited")
|
||||
if fail_json_kwargs:
|
||||
# we hit an exception and need to use the rc from
|
||||
# fail_json_kwargs
|
||||
@ -664,6 +702,7 @@ def main():
|
||||
strip_empty_ends=dict(type='bool', default=True),
|
||||
zuul_log_id=dict(type='str'),
|
||||
zuul_ansible_split_streams=dict(type='bool'),
|
||||
zuul_output_max_bytes=dict(type='int'),
|
||||
),
|
||||
supports_check_mode=True,
|
||||
)
|
||||
@ -680,6 +719,7 @@ def main():
|
||||
strip = module.params['strip_empty_ends']
|
||||
zuul_log_id = module.params['zuul_log_id']
|
||||
zuul_ansible_split_streams = module.params["zuul_ansible_split_streams"]
|
||||
zuul_output_max_bytes = module.params['zuul_output_max_bytes']
|
||||
|
||||
# we promissed these in 'always' ( _lines get autoaded on action plugin)
|
||||
r = {'changed': False, 'stdout': '', 'stderr': '', 'rc': None, 'cmd': None, 'start': None, 'end': None, 'delta': None, 'msg': ''}
|
||||
@ -750,7 +790,7 @@ def main():
|
||||
# actually executes command (or not ...)
|
||||
if not module.check_mode:
|
||||
r['start'] = datetime.datetime.now()
|
||||
r['rc'], r['stdout'], r['stderr'] = zuul_run_command(module, args, zuul_log_id, zuul_ansible_split_streams, executable=executable, use_unsafe_shell=shell, encoding=None,
|
||||
r['rc'], r['stdout'], r['stderr'] = zuul_run_command(module, args, zuul_log_id, zuul_ansible_split_streams, zuul_output_max_bytes, executable=executable, use_unsafe_shell=shell, encoding=None,
|
||||
data=stdin, binary_data=(not stdin_add_newline))
|
||||
r['end'] = datetime.datetime.now()
|
||||
else:
|
||||
|
@ -55,4 +55,6 @@ class ActionModule(command.ActionModule):
|
||||
self._task._uuid, count, log_host)
|
||||
self._task.args["zuul_ansible_split_streams"] = (
|
||||
os.environ["ZUUL_ANSIBLE_SPLIT_STREAMS"] == "True")
|
||||
self._task.args["zuul_output_max_bytes"] = int(
|
||||
os.environ["ZUUL_OUTPUT_MAX_BYTES"])
|
||||
return super(ActionModule, self).run(tmp, task_vars)
|
||||
|
@ -253,6 +253,7 @@ from ansible.module_utils.common.text.converters import to_native, to_bytes, to_
|
||||
from ansible.module_utils.common.collections import is_iterable
|
||||
|
||||
# Imports needed for Zuul things
|
||||
import io
|
||||
import re
|
||||
import subprocess
|
||||
import traceback
|
||||
@ -271,12 +272,9 @@ from ansible.module_utils.six.moves import shlex_quote
|
||||
|
||||
LOG_STREAM_FILE = '/tmp/console-{log_uuid}.log'
|
||||
PASSWD_ARG_RE = re.compile(r'^[-]{0,2}pass[-]?(word|wd)?')
|
||||
# Lists to save stdout/stderr log lines in as we collect them
|
||||
_log_lines = []
|
||||
_stderr_log_lines = []
|
||||
|
||||
|
||||
class Console(object):
|
||||
class Console:
|
||||
def __init__(self, log_uuid):
|
||||
# The streamer currently will not ask us for output from
|
||||
# loops. This flag uuid was set in the action plugin if this
|
||||
@ -290,13 +288,19 @@ class Console(object):
|
||||
else:
|
||||
self.logfile_name = LOG_STREAM_FILE.format(log_uuid=log_uuid)
|
||||
|
||||
def __enter__(self):
|
||||
def open(self):
|
||||
self.logfile = open(self.logfile_name, 'ab', buffering=0)
|
||||
|
||||
def __enter__(self):
|
||||
self.open()
|
||||
return self
|
||||
|
||||
def __exit__(self, etype, value, tb):
|
||||
def close(self):
|
||||
self.logfile.close()
|
||||
|
||||
def __exit__(self, etype, value, tb):
|
||||
self.close()
|
||||
|
||||
def addLine(self, ln):
|
||||
# Note this format with deliminator is "inspired" by the old
|
||||
# Jenkins format but with microsecond resolution instead of
|
||||
@ -312,47 +316,89 @@ class Console(object):
|
||||
self.logfile.write(outln)
|
||||
|
||||
|
||||
def _follow(fd, log_lines, console):
|
||||
newline_warning = False
|
||||
while True:
|
||||
line = fd.readline()
|
||||
if not line:
|
||||
break
|
||||
log_lines.append(line)
|
||||
if not line[-1] != b'\n':
|
||||
line += b'\n'
|
||||
newline_warning = True
|
||||
console.addLine(line)
|
||||
if newline_warning:
|
||||
console.addLine('[Zuul] No trailing newline\n')
|
||||
class StreamFollower:
|
||||
def __init__(self, cmd, log_uuid, output_max_bytes):
|
||||
self.cmd = cmd
|
||||
self.log_uuid = log_uuid
|
||||
self.exception = None
|
||||
self.output_max_bytes = output_max_bytes
|
||||
# Lists to save stdout/stderr log lines in as we collect them
|
||||
self.log_bytes = io.BytesIO()
|
||||
self.stderr_log_bytes = io.BytesIO()
|
||||
self.stdout_thread = None
|
||||
self.stderr_thread = None
|
||||
# Total size in bytes of all log and stderr_log lines
|
||||
self.log_size = 0
|
||||
|
||||
def join(self):
|
||||
if self.exception:
|
||||
try:
|
||||
self.console.close()
|
||||
except Exception:
|
||||
pass
|
||||
raise self.exception
|
||||
for t in (self.stdout_thread, self.stderr_thread):
|
||||
if t is None:
|
||||
continue
|
||||
t.join(10)
|
||||
if t.is_alive():
|
||||
with Console(self.zuul_log_id) as console:
|
||||
console.addLine("[Zuul] standard output/error still open "
|
||||
"after child exited")
|
||||
self.console.close()
|
||||
|
||||
def follow(stdout, stderr, log_uuid):
|
||||
threads = []
|
||||
with Console(log_uuid) as console:
|
||||
if stdout:
|
||||
t = threading.Thread(
|
||||
target=_follow,
|
||||
args=(stdout, _log_lines, console)
|
||||
)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
threads.append(t)
|
||||
if stderr:
|
||||
t = threading.Thread(
|
||||
target=_follow,
|
||||
args=(stderr, _stderr_log_lines, console)
|
||||
)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
threads.append(t)
|
||||
for t in threads:
|
||||
t.join()
|
||||
def follow(self):
|
||||
self.console = Console(self.log_uuid)
|
||||
self.console.open()
|
||||
if self.cmd.stdout:
|
||||
self.stdout_thread = threading.Thread(
|
||||
target=self.follow_main,
|
||||
args=(self.cmd.stdout, self.log_bytes))
|
||||
self.stdout_thread.daemon = True
|
||||
self.stdout_thread.start()
|
||||
if self.cmd.stderr:
|
||||
self.stderr_thread = threading.Thread(
|
||||
target=self.follow_main,
|
||||
args=(self.cmd.stderr, self.stderr_log_bytes))
|
||||
self.stderr_thread.daemon = True
|
||||
self.stderr_thread.start()
|
||||
|
||||
def follow_main(self, fd, log_bytes):
|
||||
try:
|
||||
self.follow_inner(fd, log_bytes)
|
||||
except Exception as e:
|
||||
self.exception = e
|
||||
|
||||
def follow_inner(self, fd, log_bytes):
|
||||
newline_warning = False
|
||||
while True:
|
||||
line = fd.readline()
|
||||
if not line:
|
||||
break
|
||||
self.log_size += len(line)
|
||||
if self.log_size > self.output_max_bytes:
|
||||
msg = (
|
||||
'[Zuul] Log output exceeded max of %s, '
|
||||
'terminating\n' % (self.output_max_bytes,))
|
||||
self.console.addLine(msg)
|
||||
try:
|
||||
pgid = os.getpgid(self.cmd.pid)
|
||||
os.killpg(pgid, signal.SIGKILL)
|
||||
except Exception:
|
||||
pass
|
||||
raise Exception(msg)
|
||||
log_bytes.write(line)
|
||||
if not line[-1] != b'\n':
|
||||
line += b'\n'
|
||||
newline_warning = True
|
||||
self.console.addLine(line)
|
||||
if newline_warning:
|
||||
self.console.addLine('[Zuul] No trailing newline\n')
|
||||
|
||||
|
||||
# Taken from ansible/module_utils/basic.py ... forking the method for now
|
||||
# so that we can dive in and figure out how to make appropriate hook points
|
||||
def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, check_rc=False, close_fds=True, executable=None, data=None, binary_data=False, path_prefix=None, cwd=None,
|
||||
def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, zuul_output_max_bytes, check_rc=False, close_fds=True, executable=None, data=None, binary_data=False, path_prefix=None, cwd=None,
|
||||
use_unsafe_shell=False, prompt_regex=None, environ_update=None, umask=None, encoding='utf-8', errors='surrogate_or_strict',
|
||||
expand_user_and_vars=True, pass_fds=None, before_communicate_callback=None, ignore_invalid_cwd=True):
|
||||
'''
|
||||
@ -544,11 +590,10 @@ def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, check_
|
||||
before_communicate_callback(cmd)
|
||||
|
||||
if self.no_log:
|
||||
t = None
|
||||
follower = None
|
||||
else:
|
||||
t = threading.Thread(target=follow, args=(cmd.stdout, cmd.stderr, zuul_log_id))
|
||||
t.daemon = True
|
||||
t.start()
|
||||
follower = StreamFollower(cmd, zuul_log_id, zuul_output_max_bytes)
|
||||
follower.follow()
|
||||
|
||||
# ZUUL: Our log thread will catch the output so don't do that here.
|
||||
|
||||
@ -572,16 +617,12 @@ def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, check_
|
||||
# 10 seconds to catch up and exit. If it hasn't done so by
|
||||
# then, it is very likely stuck in readline() because it
|
||||
# spawed a child that is holding stdout or stderr open.
|
||||
if t:
|
||||
t.join(10)
|
||||
with Console(zuul_log_id) as console:
|
||||
if t.is_alive():
|
||||
console.addLine("[Zuul] standard output/error still open "
|
||||
"after child exited")
|
||||
if follower:
|
||||
follower.join()
|
||||
# ZUUL: stdout and stderr are in the console log file
|
||||
# ZUUL: return the saved log lines so we can ship them back
|
||||
stdout = b('').join(_log_lines)
|
||||
stderr = b('').join(_stderr_log_lines)
|
||||
stdout = follower.log_bytes.getvalue()
|
||||
stderr = follower.stderr_log_bytes.getvalue()
|
||||
else:
|
||||
stdout = b('')
|
||||
stderr = b('')
|
||||
@ -596,9 +637,6 @@ def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, check_
|
||||
fail_json_kwargs = dict(rc=257, stdout=b'', stderr=b'', msg=to_native(e), exception=traceback.format_exc(), cmd=self._clean_args(args))
|
||||
finally:
|
||||
with Console(zuul_log_id) as console:
|
||||
if t and t.is_alive():
|
||||
console.addLine("[Zuul] standard output/error still open "
|
||||
"after child exited")
|
||||
if fail_json_kwargs:
|
||||
# we hit an exception and need to use the rc from
|
||||
# fail_json_kwargs
|
||||
@ -641,6 +679,7 @@ def main():
|
||||
strip_empty_ends=dict(type='bool', default=True),
|
||||
zuul_log_id=dict(type='str'),
|
||||
zuul_ansible_split_streams=dict(type='bool'),
|
||||
zuul_output_max_bytes=dict(type='int'),
|
||||
),
|
||||
supports_check_mode=True,
|
||||
)
|
||||
@ -657,6 +696,7 @@ def main():
|
||||
expand_argument_vars = module.params['expand_argument_vars']
|
||||
zuul_log_id = module.params['zuul_log_id']
|
||||
zuul_ansible_split_streams = module.params["zuul_ansible_split_streams"]
|
||||
zuul_output_max_bytes = module.params['zuul_output_max_bytes']
|
||||
|
||||
# we promised these in 'always' ( _lines get auto-added on action plugin)
|
||||
r = {'changed': False, 'stdout': '', 'stderr': '', 'rc': None, 'cmd': None, 'start': None, 'end': None, 'delta': None, 'msg': ''}
|
||||
@ -724,7 +764,7 @@ def main():
|
||||
# actually executes command (or not ...)
|
||||
if not module.check_mode:
|
||||
r['start'] = datetime.datetime.now()
|
||||
r['rc'], r['stdout'], r['stderr'] = zuul_run_command(module, args, zuul_log_id, zuul_ansible_split_streams, executable=executable, use_unsafe_shell=shell, encoding=None,
|
||||
r['rc'], r['stdout'], r['stderr'] = zuul_run_command(module, args, zuul_log_id, zuul_ansible_split_streams, zuul_output_max_bytes, executable=executable, use_unsafe_shell=shell, encoding=None,
|
||||
data=stdin, binary_data=(not stdin_add_newline),
|
||||
expand_user_and_vars=expand_argument_vars)
|
||||
r['end'] = datetime.datetime.now()
|
||||
|
@ -93,7 +93,8 @@ from zuul.zk.semaphore import SemaphoreHandler
|
||||
|
||||
|
||||
BUFFER_LINES_FOR_SYNTAX = 200
|
||||
OUTPUT_MAX_LINE_BYTES = 51200 # 50 KiB
|
||||
OUTPUT_MAX_LINE_BYTES = 51200 # 50 MiB
|
||||
OUTPUT_MAX_BYTES = 1024 * 1024 * 1024 # 1GiB
|
||||
DEFAULT_FINGER_PORT = 7900
|
||||
DEFAULT_STREAM_PORT = 19885
|
||||
BLACKLISTED_ANSIBLE_CONNECTION_TYPES = [
|
||||
@ -1095,6 +1096,13 @@ class AnsibleJob(object):
|
||||
self.executor_variables_file = self.executor_server.config.get(
|
||||
'executor', 'variables')
|
||||
|
||||
if self.executor_server.config.has_option('executor',
|
||||
'output_max_bytes'):
|
||||
self.output_max_bytes = self.executor_server.config.get(
|
||||
'executor', 'output_max_bytes')
|
||||
else:
|
||||
self.output_max_bytes = OUTPUT_MAX_BYTES
|
||||
|
||||
plugin_dir = self.executor_server.ansible_manager.getAnsiblePluginDir(
|
||||
self.ansible_version)
|
||||
self.library_dir = os.path.join(plugin_dir, 'library')
|
||||
@ -3040,6 +3048,7 @@ class AnsibleJob(object):
|
||||
for key, value in os.environ.copy().items()
|
||||
if not key.startswith("ZUUL_")}
|
||||
env_copy.update(self.ssh_agent.env)
|
||||
env_copy['ZUUL_OUTPUT_MAX_BYTES'] = str(self.output_max_bytes)
|
||||
env_copy['ZUUL_JOB_LOG_CONFIG'] = self.jobdir.logging_json
|
||||
env_copy['ZUUL_JOB_FAILURE_OUTPUT'] = self.failure_output
|
||||
env_copy['ZUUL_JOBDIR'] = self.jobdir.root
|
||||
|
Loading…
x
Reference in New Issue
Block a user