Mask passwords only when command execution fails

At many places, processutils.ssh_execute() is being invoked to run
a command over ssh and output returned is parsed to get appropriate
information. In this flow, unsanitized output is being expected
where processutils.ssh_execute() was invoked but found that
output like volume details(containing "password" string in its name)
is being masked away with strutils.mask_password(stdout) even though
no error occured during command execution.

This is regression issue from patch[0]. In this fix, stdout and stderr
in processutils.ssh_execute() will be masked only when
ProcessExecutionError exception is thrown i.e. command execution failed
due to some reasons.

[0] https://github.com/openstack/oslo.concurrency/commit/
ae9e05bfc3

Change-Id: I2ce344330905eef437ef3f89a2a01169a30df8ab
Closes-Bug: #1482382
This commit is contained in:
prashkre 2018-01-25 13:41:42 +05:30
parent 61e2923856
commit 21ae27e66d
2 changed files with 47 additions and 3 deletions

View File

@ -503,9 +503,21 @@ def trycmd(*args, **kwargs):
def ssh_execute(ssh, cmd, process_input=None,
addl_env=None, check_exit_code=True,
binary=False, timeout=None):
binary=False, timeout=None,
sanitize_stdout=True):
"""Run a command through SSH.
:param ssh: An SSH Connection object.
:param cmd: The command string to run.
:param check_exit_code: If an exception should be raised for non-zero
exit.
:param timeout: Max time in secs to wait for command execution.
:param sanitize_stdout: Defaults to True. If set to True, stdout is
sanitized i.e. any sensitive information like
password in command output will be masked.
:returns: (stdout, stderr) from command execution through
SSH.
.. versionchanged:: 1.9
Added *binary* optional parameter.
"""
@ -537,13 +549,21 @@ def ssh_execute(ssh, cmd, process_input=None,
# mask_password() requires Unicode on Python 3
stdout = os.fsdecode(stdout)
stderr = os.fsdecode(stderr)
stdout = strutils.mask_password(stdout)
if sanitize_stdout:
stdout = strutils.mask_password(stdout)
stderr = strutils.mask_password(stderr)
# exit_status == -1 if no exit code was returned
if exit_status != -1:
LOG.debug('Result was %s' % exit_status)
if check_exit_code and exit_status != 0:
# In case of errors in command run, due to poor implementation of
# command executable program, there might be chance that it leaks
# sensitive information like password to stdout. In such cases
# stdout needs to be sanitized even though sanitize_stdout=False.
stdout = strutils.mask_password(stdout)
raise ProcessExecutionError(exit_code=exit_status,
stdout=stdout,
stderr=stderr,

View File

@ -758,7 +758,8 @@ class SshExecuteTestCase(test_base.BaseTestCase):
fake_stdout.channel.recv_exit_status.return_value = rc
fake_stdout.read.return_value = b'password="secret"'
fake_stderr = six.BytesIO(b'password="foobar"')
fake_stderr = mock.Mock()
fake_stderr.read.return_value = b'password="foobar"'
command = 'ls --password="bar"'
@ -778,6 +779,20 @@ class SshExecuteTestCase(test_base.BaseTestCase):
self.assertEqual('ls --password="***"', err.cmd)
self.assertNotIn('secret', str(err))
self.assertNotIn('foobar', str(err))
# test ssh_execute with sanitize_stdout=False
err = self.assertRaises(processutils.ProcessExecutionError,
processutils.ssh_execute,
connection, command,
check_exit_code=check,
sanitize_stdout=False)
self.assertEqual(rc, err.exit_code)
self.assertEqual('password="***"', err.stdout)
self.assertEqual('password="***"', err.stderr)
self.assertEqual('ls --password="***"', err.cmd)
self.assertNotIn('secret', str(err))
self.assertNotIn('foobar', str(err))
else:
o, e = processutils.ssh_execute(connection, command,
check_exit_code=check)
@ -786,6 +801,15 @@ class SshExecuteTestCase(test_base.BaseTestCase):
self.assertIn('password="***"', fixture.output)
self.assertNotIn('bar', fixture.output)
# test ssh_execute with sanitize_stdout=False
o, e = processutils.ssh_execute(connection, command,
check_exit_code=check,
sanitize_stdout=False)
self.assertEqual('password="secret"', o)
self.assertEqual('password="***"', e)
self.assertIn('password="***"', fixture.output)
self.assertNotIn('bar', fixture.output)
def test_compromising_ssh1(self):
self._test_compromising_ssh(rc=-1, check=True)