Merge "Limit command stdout/stderr to 1GiB"

This commit is contained in:
Zuul 2025-01-18 02:45:29 +00:00 committed by Gerrit Code Review
commit 1446202ad1
9 changed files with 273 additions and 117 deletions

View File

@ -762,6 +762,26 @@ The following sections of ``zuul.conf`` are used by the executor:
total real memory multiplied by 100. Buffers and cache are
considered available in the calculation.
.. attr:: output_max_bytes
:default: 1073741824
.. warning:: This option is deprecated. In the future, the
default value of 1GiB is likely to become fixed and
unable to be changed. Set this option only if
needed and only as long as needed to adjust
existing jobs to avoid the limit.
Zuul limits the total number of bytes output via stdout or
stderr from a single Ansible command to this value. If the
command outputs more than this number of bytes, the command
execution will fail. This is to protect the executor from being
required to read an excessively large amount of data from an
Ansible task result.
If a job fails due to this limit, consider adjusting the command
task to redirect output to a file and collecting the file
separately.
.. attr:: hostname
:default: hostname of the server

View File

@ -10,6 +10,7 @@
- name: Run ansible that should succeed against testing console
command: >
/usr/lib/zuul/ansible/{{ zuul_ansible_version }}/bin/ansible-playbook
-vvv
-e "new_console=true"
src/opendev.org/zuul/zuul/playbooks/zuul-stream/fixtures/test-stream.yaml
environment:
@ -19,6 +20,7 @@
ZUUL_JOB_LOG_CONFIG: "{{ ansible_user_dir}}/logging.json"
ZUUL_JOBDIR: "{{ ansible_user_dir}}"
ZUUL_ANSIBLE_SPLIT_STREAMS: False
ZUUL_OUTPUT_MAX_BYTES: 1073741824
PYTHONPATH: "{{ python_path }}"
register: _success_output
@ -61,6 +63,7 @@
ZUUL_JOB_LOG_CONFIG: "{{ ansible_user_dir}}/logging.json"
ZUUL_JOBDIR: "{{ ansible_user_dir}}"
ZUUL_ANSIBLE_SPLIT_STREAMS: False
ZUUL_OUTPUT_MAX_BYTES: 1073741824
PYTHONPATH: "{{ python_path }}"
register: _success_output
@ -104,6 +107,7 @@
ZUUL_JOB_LOG_CONFIG: "{{ ansible_user_dir}}/logging.json"
ZUUL_JOBDIR: "{{ ansible_user_dir}}"
ZUUL_ANSIBLE_SPLIT_STREAMS: False
ZUUL_OUTPUT_MAX_BYTES: 1073741824
PYTHONPATH: "{{ python_path }}"
- name: Save output

View File

@ -0,0 +1,12 @@
---
upgrade:
- |
The maximum combined stdout and stderr output from a single
Ansible task has been limited to 1GiB. In the unlikely event that
an existing job legitimately exceeds this limit, a new executor
configuration option, :attr:`executor.output_max_bytes`, has been
provided to temporarily increase the limit. This option is likely
to be removed in a future version of Zuul. To avoid this issue,
Ansible tasks with large volumes of output should be adjusted to
redirect that output to a file which is separately collected and
processed.

View File

@ -281,6 +281,20 @@ def okay_tracebacks(*args):
return decorator
def zuul_config(section, key, value):
    """Build a decorator that records a zuul.conf override on a test.

    The override is stored on the decorated object in its
    ``__zuul_config__`` attribute, a dict shaped like
    ``{section: {key: value}}``.  Applying the decorator more than once
    to the same test adds entries to that same dict.

    :arg section: Name of the zuul.conf section.
    :arg key: Option name inside the section.
    :arg value: Value to record for the option.
    :returns: A decorator which returns the test it was given.
    """
    def decorator(test):
        overrides = getattr(test, '__zuul_config__', None)
        if overrides is None:
            # First override for this test: attach a fresh dict.
            overrides = {}
            test.__zuul_config__ = overrides
        overrides.setdefault(section, {})[key] = value
        return test
    return decorator
def registerProjects(source_name, client, config):
path = config.get('scheduler', 'tenant_config')
with open(os.path.join(FIXTURE_DIR, path)) as f:
@ -2229,6 +2243,7 @@ class TestConfig:
self.enable_nodepool = getattr(test, '__enable_nodepool__', False)
self.return_data = getattr(test, '__return_data__', [])
self.driver_config = getattr(test, '__driver_config__', {})
self.zuul_config = getattr(test, '__zuul_config__', {})
self.driver = DriverTestConfig(self)
self.changes = FakeChangeDB()
@ -2633,7 +2648,9 @@ class ZuulTestCase(BaseTestCase):
os.path.join(git_path, reponame))
# Make test_root persist after ansible run for .flag test
config.set('executor', 'trusted_rw_paths', self.test_root)
for section, section_dict in self.test_config.zuul_config.items():
for k, v in section_dict.items():
config.set(section, k, v)
return config
def setupSimpleLayout(self, config):

View File

@ -22,7 +22,7 @@ import yaml
from unittest import skip
from datetime import datetime, timedelta
from tests.base import AnsibleZuulTestCase
from tests.base import AnsibleZuulTestCase, zuul_config
class FunctionalZuulStreamMixIn:
@ -390,6 +390,18 @@ class FunctionalZuulStreamMixIn:
self.assertLogLine(
r'fake \| skipping: Conditional result was False', text)
@zuul_config('executor', 'output_max_bytes', '2')
def test_command_output_max_bytes(self):
    """Run the 'command' job with a 2 byte output limit and expect failure.

    The build must end in FAILURE, the console output must not contain
    the Ansible "Failure using method" warning, and the job output must
    contain the Zuul limit message.
    """
    job = self._run_job('command')
    with self.jobLog(job):
        last_build = self.history[-1]
        self.assertEqual(last_build.result, 'FAILURE')
        self.assertNotIn('[WARNING]: Failure using method',
                         self.console_output.getvalue())
        job_output = self._get_job_output(last_build)
        self.assertIn('[Zuul] Log output exceeded max of 2', job_output)
def test_module_exception(self):
job = self._run_job('module_failure_exception')
with self.jobLog(job):

View File

@ -240,6 +240,7 @@ from ansible.module_utils._text import to_native, to_bytes, to_text
from ansible.module_utils.common.collections import is_iterable
# Imports needed for Zuul things
import io
import re
import subprocess
import traceback
@ -258,12 +259,9 @@ from ansible.module_utils.six.moves import shlex_quote
LOG_STREAM_FILE = '/tmp/console-{log_uuid}.log'
PASSWD_ARG_RE = re.compile(r'^[-]{0,2}pass[-]?(word|wd)?')
# Lists to save stdout/stderr log lines in as we collect them
_log_lines = []
_stderr_log_lines = []
class Console(object):
class Console:
def __init__(self, log_uuid):
# The streamer currently will not ask us for output from
# loops. This flag uuid was set in the action plugin if this
@ -277,13 +275,19 @@ class Console(object):
else:
self.logfile_name = LOG_STREAM_FILE.format(log_uuid=log_uuid)
def __enter__(self):
def open(self):
self.logfile = open(self.logfile_name, 'ab', buffering=0)
def __enter__(self):
self.open()
return self
def __exit__(self, etype, value, tb):
def close(self):
self.logfile.close()
def __exit__(self, etype, value, tb):
self.close()
def addLine(self, ln):
# Note this format with delimiter is "inspired" by the old
# Jenkins format but with microsecond resolution instead of
@ -299,47 +303,89 @@ class Console(object):
self.logfile.write(outln)
def _follow(fd, log_lines, console):
newline_warning = False
while True:
line = fd.readline()
if not line:
break
log_lines.append(line)
if not line[-1] != b'\n':
line += b'\n'
newline_warning = True
console.addLine(line)
if newline_warning:
console.addLine('[Zuul] No trailing newline\n')
class StreamFollower:
def __init__(self, cmd, log_uuid, output_max_bytes):
    """Remember what to follow and start with empty capture state.

    :arg cmd: Object providing the ``stdout``/``stderr`` streams and
        the ``pid`` that the other methods of this class read.
    :arg log_uuid: Identifier handed to ``Console`` to pick the log file.
    :arg output_max_bytes: Upper bound on ``log_size`` before the
        command is terminated.
    """
    self.cmd = cmd
    self.log_uuid = log_uuid
    self.output_max_bytes = output_max_bytes
    # Exception raised while following a stream, if any
    self.exception = None
    # Reader threads; these stay None until follow() starts them
    self.stdout_thread = None
    self.stderr_thread = None
    # Running byte count across both stdout and stderr
    self.log_size = 0
    # In-memory buffers holding the raw stdout/stderr bytes collected
    self.log_bytes = io.BytesIO()
    self.stderr_log_bytes = io.BytesIO()
def join(self):
    """Wait for the reader threads and close the console log.

    If a reader recorded an exception, the console is closed (best
    effort) and that exception is re-raised without waiting for the
    threads.  Otherwise every started thread is given up to 10 seconds
    to finish; for each thread still alive after that, a warning line
    is appended to the console log.

    :raises Exception: Whatever was stored in ``self.exception``.
    """
    if self.exception:
        try:
            self.console.close()
        except Exception:
            # Best effort: the stored exception is the one the caller
            # needs to see, not a secondary failure while closing.
            pass
        raise self.exception
    for t in (self.stdout_thread, self.stderr_thread):
        if t is None:
            continue
        t.join(10)
        if t.is_alive():
            # __init__ stores the log id as ``log_uuid``; there is no
            # ``zuul_log_id`` attribute on this class.
            with Console(self.log_uuid) as console:
                console.addLine("[Zuul] standard output/error still open "
                                "after child exited")
    self.console.close()
def follow(stdout, stderr, log_uuid):
threads = []
with Console(log_uuid) as console:
if stdout:
t = threading.Thread(
target=_follow,
args=(stdout, _log_lines, console)
)
t.daemon = True
t.start()
threads.append(t)
if stderr:
t = threading.Thread(
target=_follow,
args=(stderr, _stderr_log_lines, console)
)
t.daemon = True
t.start()
threads.append(t)
for t in threads:
t.join()
def follow(self):
    """Open the console log and start one reader thread per stream.

    A daemon thread running ``follow_main`` is started for
    ``cmd.stdout`` and for ``cmd.stderr`` when the respective stream is
    truthy; stdout is captured into ``log_bytes`` and stderr into
    ``stderr_log_bytes``.
    """
    self.console = Console(self.log_uuid)
    self.console.open()
    if self.cmd.stdout:
        reader = threading.Thread(
            target=self.follow_main,
            args=(self.cmd.stdout, self.log_bytes),
            daemon=True)
        self.stdout_thread = reader
        reader.start()
    if self.cmd.stderr:
        reader = threading.Thread(
            target=self.follow_main,
            args=(self.cmd.stderr, self.stderr_log_bytes),
            daemon=True)
        self.stderr_thread = reader
        reader.start()
def follow_main(self, fd, log_bytes):
    """Thread target: run ``follow_inner`` and keep any exception.

    Anything ``follow_inner`` raises is stored in ``self.exception``
    instead of propagating, so that ``join`` can re-raise it.

    :arg fd: Stream to read lines from.
    :arg log_bytes: Buffer that receives the raw bytes read.
    """
    try:
        self.follow_inner(fd, log_bytes)
    except Exception as err:
        self.exception = err
def follow_inner(self, fd, log_bytes):
    """Copy lines from ``fd`` into ``log_bytes`` and the console log.

    Every line read is counted in ``self.log_size``.  Once that total
    exceeds ``self.output_max_bytes`` a message is written to the
    console, the process group of ``self.cmd.pid`` is sent SIGKILL
    (best effort) and an exception carrying the same message is raised.

    Lines are stored in ``log_bytes`` exactly as read.  The copy sent
    to the console gets a newline appended when it lacks one, and a
    single "[Zuul] No trailing newline" notice is written at the end
    when that happened.

    :arg fd: Binary stream providing ``readline()``.
    :arg log_bytes: Buffer that receives the raw bytes read.
    :raises Exception: When the output limit is exceeded.
    """
    newline_warning = False
    while True:
        line = fd.readline()
        if not line:
            break
        self.log_size += len(line)
        if self.log_size > self.output_max_bytes:
            msg = (
                '[Zuul] Log output exceeded max of %s, '
                'terminating\n' % (self.output_max_bytes,))
            self.console.addLine(msg)
            try:
                pgid = os.getpgid(self.cmd.pid)
                os.killpg(pgid, signal.SIGKILL)
            except Exception:
                # Best effort: the limit error below is raised whether
                # or not the kill succeeded.
                pass
            raise Exception(msg)
        log_bytes.write(line)
        # Indexing bytes yields an int, so comparing line[-1] with
        # b'\n' can never match; test the suffix instead.
        if not line.endswith(b'\n'):
            line += b'\n'
            newline_warning = True
        self.console.addLine(line)
    if newline_warning:
        self.console.addLine('[Zuul] No trailing newline\n')
# Taken from ansible/module_utils/basic.py ... forking the method for now
# so that we can dive in and figure out how to make appropriate hook points
def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, check_rc=False, close_fds=True, executable=None, data=None, binary_data=False, path_prefix=None, cwd=None,
def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, zuul_output_max_bytes, check_rc=False, close_fds=True, executable=None, data=None, binary_data=False, path_prefix=None, cwd=None,
use_unsafe_shell=False, prompt_regex=None, environ_update=None, umask=None, encoding='utf-8', errors='surrogate_or_strict',
expand_user_and_vars=True, pass_fds=None, before_communicate_callback=None, ignore_invalid_cwd=True):
'''
@ -531,11 +577,10 @@ def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, check_
before_communicate_callback(cmd)
if self.no_log:
t = None
follower = None
else:
t = threading.Thread(target=follow, args=(cmd.stdout, cmd.stderr, zuul_log_id))
t.daemon = True
t.start()
follower = StreamFollower(cmd, zuul_log_id, zuul_output_max_bytes)
follower.follow()
# ZUUL: Our log thread will catch the output so don't do that here.
@ -559,16 +604,12 @@ def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, check_
# 10 seconds to catch up and exit. If it hasn't done so by
# then, it is very likely stuck in readline() because it
# spawned a child that is holding stdout or stderr open.
if t:
t.join(10)
with Console(zuul_log_id) as console:
if t.is_alive():
console.addLine("[Zuul] standard output/error still open "
"after child exited")
if follower:
follower.join()
# ZUUL: stdout and stderr are in the console log file
# ZUUL: return the saved log lines so we can ship them back
stdout = b('').join(_log_lines)
stderr = b('').join(_stderr_log_lines)
stdout = follower.log_bytes.getvalue()
stderr = follower.stderr_log_bytes.getvalue()
else:
stdout = b('')
stderr = b('')
@ -583,9 +624,6 @@ def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, check_
fail_json_kwargs = dict(rc=257, stdout=b'', stderr=b'', msg=to_native(e), exception=traceback.format_exc(), cmd=self._clean_args(args))
finally:
with Console(zuul_log_id) as console:
if t and t.is_alive():
console.addLine("[Zuul] standard output/error still open "
"after child exited")
if fail_json_kwargs:
# we hit an exception and need to use the rc from
# fail_json_kwargs
@ -664,6 +702,7 @@ def main():
strip_empty_ends=dict(type='bool', default=True),
zuul_log_id=dict(type='str'),
zuul_ansible_split_streams=dict(type='bool'),
zuul_output_max_bytes=dict(type='int'),
),
supports_check_mode=True,
)
@ -680,6 +719,7 @@ def main():
strip = module.params['strip_empty_ends']
zuul_log_id = module.params['zuul_log_id']
zuul_ansible_split_streams = module.params["zuul_ansible_split_streams"]
zuul_output_max_bytes = module.params['zuul_output_max_bytes']
# we promised these in 'always' ( _lines get auto-added on action plugin)
r = {'changed': False, 'stdout': '', 'stderr': '', 'rc': None, 'cmd': None, 'start': None, 'end': None, 'delta': None, 'msg': ''}
@ -750,7 +790,7 @@ def main():
# actually executes command (or not ...)
if not module.check_mode:
r['start'] = datetime.datetime.now()
r['rc'], r['stdout'], r['stderr'] = zuul_run_command(module, args, zuul_log_id, zuul_ansible_split_streams, executable=executable, use_unsafe_shell=shell, encoding=None,
r['rc'], r['stdout'], r['stderr'] = zuul_run_command(module, args, zuul_log_id, zuul_ansible_split_streams, zuul_output_max_bytes, executable=executable, use_unsafe_shell=shell, encoding=None,
data=stdin, binary_data=(not stdin_add_newline))
r['end'] = datetime.datetime.now()
else:

View File

@ -55,4 +55,6 @@ class ActionModule(command.ActionModule):
self._task._uuid, count, log_host)
self._task.args["zuul_ansible_split_streams"] = (
os.environ["ZUUL_ANSIBLE_SPLIT_STREAMS"] == "True")
self._task.args["zuul_output_max_bytes"] = int(
os.environ["ZUUL_OUTPUT_MAX_BYTES"])
return super(ActionModule, self).run(tmp, task_vars)

View File

@ -253,6 +253,7 @@ from ansible.module_utils.common.text.converters import to_native, to_bytes, to_
from ansible.module_utils.common.collections import is_iterable
# Imports needed for Zuul things
import io
import re
import subprocess
import traceback
@ -271,12 +272,9 @@ from ansible.module_utils.six.moves import shlex_quote
LOG_STREAM_FILE = '/tmp/console-{log_uuid}.log'
PASSWD_ARG_RE = re.compile(r'^[-]{0,2}pass[-]?(word|wd)?')
# Lists to save stdout/stderr log lines in as we collect them
_log_lines = []
_stderr_log_lines = []
class Console(object):
class Console:
def __init__(self, log_uuid):
# The streamer currently will not ask us for output from
# loops. This flag uuid was set in the action plugin if this
@ -290,13 +288,19 @@ class Console(object):
else:
self.logfile_name = LOG_STREAM_FILE.format(log_uuid=log_uuid)
def __enter__(self):
def open(self):
self.logfile = open(self.logfile_name, 'ab', buffering=0)
def __enter__(self):
self.open()
return self
def __exit__(self, etype, value, tb):
def close(self):
self.logfile.close()
def __exit__(self, etype, value, tb):
self.close()
def addLine(self, ln):
# Note this format with delimiter is "inspired" by the old
# Jenkins format but with microsecond resolution instead of
@ -312,47 +316,89 @@ class Console(object):
self.logfile.write(outln)
def _follow(fd, log_lines, console):
newline_warning = False
while True:
line = fd.readline()
if not line:
break
log_lines.append(line)
if not line[-1] != b'\n':
line += b'\n'
newline_warning = True
console.addLine(line)
if newline_warning:
console.addLine('[Zuul] No trailing newline\n')
class StreamFollower:
def __init__(self, cmd, log_uuid, output_max_bytes):
    """Remember what to follow and start with empty capture state.

    :arg cmd: Object providing the ``stdout``/``stderr`` streams and
        the ``pid`` that the other methods of this class read.
    :arg log_uuid: Identifier handed to ``Console`` to pick the log file.
    :arg output_max_bytes: Upper bound on ``log_size`` before the
        command is terminated.
    """
    self.cmd = cmd
    self.log_uuid = log_uuid
    self.output_max_bytes = output_max_bytes
    # Exception raised while following a stream, if any
    self.exception = None
    # Reader threads; these stay None until follow() starts them
    self.stdout_thread = None
    self.stderr_thread = None
    # Running byte count across both stdout and stderr
    self.log_size = 0
    # In-memory buffers holding the raw stdout/stderr bytes collected
    self.log_bytes = io.BytesIO()
    self.stderr_log_bytes = io.BytesIO()
def join(self):
    """Wait for the reader threads and close the console log.

    If a reader recorded an exception, the console is closed (best
    effort) and that exception is re-raised without waiting for the
    threads.  Otherwise every started thread is given up to 10 seconds
    to finish; for each thread still alive after that, a warning line
    is appended to the console log.

    :raises Exception: Whatever was stored in ``self.exception``.
    """
    if self.exception:
        try:
            self.console.close()
        except Exception:
            # Best effort: the stored exception is the one the caller
            # needs to see, not a secondary failure while closing.
            pass
        raise self.exception
    for t in (self.stdout_thread, self.stderr_thread):
        if t is None:
            continue
        t.join(10)
        if t.is_alive():
            # __init__ stores the log id as ``log_uuid``; there is no
            # ``zuul_log_id`` attribute on this class.
            with Console(self.log_uuid) as console:
                console.addLine("[Zuul] standard output/error still open "
                                "after child exited")
    self.console.close()
def follow(stdout, stderr, log_uuid):
threads = []
with Console(log_uuid) as console:
if stdout:
t = threading.Thread(
target=_follow,
args=(stdout, _log_lines, console)
)
t.daemon = True
t.start()
threads.append(t)
if stderr:
t = threading.Thread(
target=_follow,
args=(stderr, _stderr_log_lines, console)
)
t.daemon = True
t.start()
threads.append(t)
for t in threads:
t.join()
def follow(self):
    """Open the console log and start one reader thread per stream.

    A daemon thread running ``follow_main`` is started for
    ``cmd.stdout`` and for ``cmd.stderr`` when the respective stream is
    truthy; stdout is captured into ``log_bytes`` and stderr into
    ``stderr_log_bytes``.
    """
    self.console = Console(self.log_uuid)
    self.console.open()
    if self.cmd.stdout:
        reader = threading.Thread(
            target=self.follow_main,
            args=(self.cmd.stdout, self.log_bytes),
            daemon=True)
        self.stdout_thread = reader
        reader.start()
    if self.cmd.stderr:
        reader = threading.Thread(
            target=self.follow_main,
            args=(self.cmd.stderr, self.stderr_log_bytes),
            daemon=True)
        self.stderr_thread = reader
        reader.start()
def follow_main(self, fd, log_bytes):
    """Thread target: run ``follow_inner`` and keep any exception.

    Anything ``follow_inner`` raises is stored in ``self.exception``
    instead of propagating, so that ``join`` can re-raise it.

    :arg fd: Stream to read lines from.
    :arg log_bytes: Buffer that receives the raw bytes read.
    """
    try:
        self.follow_inner(fd, log_bytes)
    except Exception as err:
        self.exception = err
def follow_inner(self, fd, log_bytes):
    """Copy lines from ``fd`` into ``log_bytes`` and the console log.

    Every line read is counted in ``self.log_size``.  Once that total
    exceeds ``self.output_max_bytes`` a message is written to the
    console, the process group of ``self.cmd.pid`` is sent SIGKILL
    (best effort) and an exception carrying the same message is raised.

    Lines are stored in ``log_bytes`` exactly as read.  The copy sent
    to the console gets a newline appended when it lacks one, and a
    single "[Zuul] No trailing newline" notice is written at the end
    when that happened.

    :arg fd: Binary stream providing ``readline()``.
    :arg log_bytes: Buffer that receives the raw bytes read.
    :raises Exception: When the output limit is exceeded.
    """
    newline_warning = False
    while True:
        line = fd.readline()
        if not line:
            break
        self.log_size += len(line)
        if self.log_size > self.output_max_bytes:
            msg = (
                '[Zuul] Log output exceeded max of %s, '
                'terminating\n' % (self.output_max_bytes,))
            self.console.addLine(msg)
            try:
                pgid = os.getpgid(self.cmd.pid)
                os.killpg(pgid, signal.SIGKILL)
            except Exception:
                # Best effort: the limit error below is raised whether
                # or not the kill succeeded.
                pass
            raise Exception(msg)
        log_bytes.write(line)
        # Indexing bytes yields an int, so comparing line[-1] with
        # b'\n' can never match; test the suffix instead.
        if not line.endswith(b'\n'):
            line += b'\n'
            newline_warning = True
        self.console.addLine(line)
    if newline_warning:
        self.console.addLine('[Zuul] No trailing newline\n')
# Taken from ansible/module_utils/basic.py ... forking the method for now
# so that we can dive in and figure out how to make appropriate hook points
def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, check_rc=False, close_fds=True, executable=None, data=None, binary_data=False, path_prefix=None, cwd=None,
def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, zuul_output_max_bytes, check_rc=False, close_fds=True, executable=None, data=None, binary_data=False, path_prefix=None, cwd=None,
use_unsafe_shell=False, prompt_regex=None, environ_update=None, umask=None, encoding='utf-8', errors='surrogate_or_strict',
expand_user_and_vars=True, pass_fds=None, before_communicate_callback=None, ignore_invalid_cwd=True):
'''
@ -544,11 +590,10 @@ def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, check_
before_communicate_callback(cmd)
if self.no_log:
t = None
follower = None
else:
t = threading.Thread(target=follow, args=(cmd.stdout, cmd.stderr, zuul_log_id))
t.daemon = True
t.start()
follower = StreamFollower(cmd, zuul_log_id, zuul_output_max_bytes)
follower.follow()
# ZUUL: Our log thread will catch the output so don't do that here.
@ -572,16 +617,12 @@ def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, check_
# 10 seconds to catch up and exit. If it hasn't done so by
# then, it is very likely stuck in readline() because it
# spawned a child that is holding stdout or stderr open.
if t:
t.join(10)
with Console(zuul_log_id) as console:
if t.is_alive():
console.addLine("[Zuul] standard output/error still open "
"after child exited")
if follower:
follower.join()
# ZUUL: stdout and stderr are in the console log file
# ZUUL: return the saved log lines so we can ship them back
stdout = b('').join(_log_lines)
stderr = b('').join(_stderr_log_lines)
stdout = follower.log_bytes.getvalue()
stderr = follower.stderr_log_bytes.getvalue()
else:
stdout = b('')
stderr = b('')
@ -596,9 +637,6 @@ def zuul_run_command(self, args, zuul_log_id, zuul_ansible_split_streams, check_
fail_json_kwargs = dict(rc=257, stdout=b'', stderr=b'', msg=to_native(e), exception=traceback.format_exc(), cmd=self._clean_args(args))
finally:
with Console(zuul_log_id) as console:
if t and t.is_alive():
console.addLine("[Zuul] standard output/error still open "
"after child exited")
if fail_json_kwargs:
# we hit an exception and need to use the rc from
# fail_json_kwargs
@ -641,6 +679,7 @@ def main():
strip_empty_ends=dict(type='bool', default=True),
zuul_log_id=dict(type='str'),
zuul_ansible_split_streams=dict(type='bool'),
zuul_output_max_bytes=dict(type='int'),
),
supports_check_mode=True,
)
@ -657,6 +696,7 @@ def main():
expand_argument_vars = module.params['expand_argument_vars']
zuul_log_id = module.params['zuul_log_id']
zuul_ansible_split_streams = module.params["zuul_ansible_split_streams"]
zuul_output_max_bytes = module.params['zuul_output_max_bytes']
# we promised these in 'always' ( _lines get auto-added on action plugin)
r = {'changed': False, 'stdout': '', 'stderr': '', 'rc': None, 'cmd': None, 'start': None, 'end': None, 'delta': None, 'msg': ''}
@ -724,7 +764,7 @@ def main():
# actually executes command (or not ...)
if not module.check_mode:
r['start'] = datetime.datetime.now()
r['rc'], r['stdout'], r['stderr'] = zuul_run_command(module, args, zuul_log_id, zuul_ansible_split_streams, executable=executable, use_unsafe_shell=shell, encoding=None,
r['rc'], r['stdout'], r['stderr'] = zuul_run_command(module, args, zuul_log_id, zuul_ansible_split_streams, zuul_output_max_bytes, executable=executable, use_unsafe_shell=shell, encoding=None,
data=stdin, binary_data=(not stdin_add_newline),
expand_user_and_vars=expand_argument_vars)
r['end'] = datetime.datetime.now()

View File

@ -93,7 +93,8 @@ from zuul.zk.semaphore import SemaphoreHandler
BUFFER_LINES_FOR_SYNTAX = 200
OUTPUT_MAX_LINE_BYTES = 51200 # 50 KiB
OUTPUT_MAX_LINE_BYTES = 51200 # 50 KiB
OUTPUT_MAX_BYTES = 1024 * 1024 * 1024 # 1GiB
DEFAULT_FINGER_PORT = 7900
DEFAULT_STREAM_PORT = 19885
BLACKLISTED_ANSIBLE_CONNECTION_TYPES = [
@ -1095,6 +1096,13 @@ class AnsibleJob(object):
self.executor_variables_file = self.executor_server.config.get(
'executor', 'variables')
if self.executor_server.config.has_option('executor',
'output_max_bytes'):
self.output_max_bytes = self.executor_server.config.get(
'executor', 'output_max_bytes')
else:
self.output_max_bytes = OUTPUT_MAX_BYTES
plugin_dir = self.executor_server.ansible_manager.getAnsiblePluginDir(
self.ansible_version)
self.library_dir = os.path.join(plugin_dir, 'library')
@ -3129,6 +3137,7 @@ class AnsibleJob(object):
for key, value in os.environ.copy().items()
if not key.startswith("ZUUL_")}
env_copy.update(self.ssh_agent.env)
env_copy['ZUUL_OUTPUT_MAX_BYTES'] = str(self.output_max_bytes)
env_copy['ZUUL_JOB_LOG_CONFIG'] = self.jobdir.logging_json
env_copy['ZUUL_JOB_FAILURE_OUTPUT'] = self.failure_output
env_copy['ZUUL_JOBDIR'] = self.jobdir.root