Make ~/.tripleo/history owned by the correct user

When a tripleo subcommand command is executed with sudo, we identify
the original user and create the history file in that user's home
directory.

When creating the file, we should make that user owns it, otherwise
the user no longer able to run tripleo subcommends without sudo once
the history file is created and owned by root.

Change-Id: Ie6195b629fa65bcf9e5350f73d332e5034e8039b
(cherry picked from commit bd70bcaa2c)
This commit is contained in:
Takashi Kajinami 2022-10-20 13:23:47 +09:00
parent 06b3e54023
commit d4b5576c39
2 changed files with 69 additions and 23 deletions

View File

@ -15,8 +15,8 @@
import ansible_runner import ansible_runner
import argparse
import datetime import datetime
import errno
import fixtures import fixtures
import logging import logging
import openstack import openstack
@ -1450,34 +1450,65 @@ class TestGetSingleIp(TestCase):
class TestStoreCliParam(TestCase): class TestStoreCliParam(TestCase):
def setUp(self): def setUp(self):
self.args = argparse.ArgumentParser() class ArgsFake(object):
def __init__(self):
self.a = 1
self.args = ArgsFake()
@mock.patch('os.path.isdir') @mock.patch('os.path.isdir')
@mock.patch('os.path.exists') @mock.patch('os.chown')
def test_exists_but_not_dir(self, mock_exists, mock_isdir): @mock.patch('os.mkdir')
mock_exists.return_value = True def test_non_directory_exists(self, mock_mkdir, mock_chown, mock_isdir):
mock_isdir.return_value = False mock_isdir.return_value = False
self.assertRaises(exceptions.InvalidConfiguration, self.assertRaises(exceptions.InvalidConfiguration,
utils.store_cli_param, utils.store_cli_param,
"overcloud deploy", self.args) "overcloud deploy", self.args)
@mock.patch('tripleoclient.utils.datetime')
@mock.patch('os.path.isdir') @mock.patch('os.path.isdir')
@mock.patch('os.path.exists') @mock.patch('os.chown')
def test_write_cli_param(self, mock_exists, mock_isdir): @mock.patch('os.mkdir')
def test_directory_exists(self, mock_mkdir, mock_chown, mock_isdir,
mock_date):
history_path = os.path.join(os.path.expanduser("~"), '.tripleo') history_path = os.path.join(os.path.expanduser("~"), '.tripleo')
mock_exists.return_value = True mock_mkdir.side_effect = OSError(errno.EEXIST, 'error')
mock_isdir.return_value = True mock_isdir.return_value = True
mock_file = mock.mock_open() mock_file = mock.mock_open()
mock_date.datetime.now.return_value = datetime.datetime(2017, 11, 22)
class ArgsFake(object):
def __init__(self):
self.a = 1
dt = datetime.datetime(2017, 11, 22)
with mock.patch("builtins.open", mock_file): with mock.patch("builtins.open", mock_file):
with mock.patch('tripleoclient.utils.datetime') as mock_date: utils.store_cli_param("overcloud plan list", self.args)
mock_date.datetime.now.return_value = dt
utils.store_cli_param("overcloud plan list", ArgsFake()) expected_call = [
mock.call("%s/history" % history_path, 'a'),
mock.call().write('2017-11-22 00:00:00 overcloud-plan-list a=1 \n')
]
mock_file.assert_has_calls(expected_call, any_order=True)
@mock.patch('os.path.isdir')
@mock.patch('os.chown')
@mock.patch('os.mkdir')
def test_directory_fail(self, mock_mkdir, mock_chown, mock_isdir):
mock_mkdir.side_effect = OSError()
with self.assertRaises(IOError):
utils.store_cli_param("overcloud plan list", self.args)
mock_chown.assert_not_called()
mock_isdir.assert_not_called()
@mock.patch('tripleoclient.utils.datetime')
@mock.patch('os.path.isdir')
@mock.patch('os.chown')
@mock.patch('os.mkdir')
def test_write_cli_param(self, mock_mkdir, mock_chown, mock_isdir,
mock_date):
history_path = os.path.join(os.path.expanduser("~"), '.tripleo')
mock_isdir.return_value = True
mock_file = mock.mock_open()
mock_date.datetime.now.return_value = datetime.datetime(2017, 11, 22)
with mock.patch("builtins.open", mock_file):
utils.store_cli_param("overcloud plan list", self.args)
expected_call = [ expected_call = [
mock.call("%s/history" % history_path, 'a'), mock.call("%s/history" % history_path, 'a'),
@ -1487,12 +1518,14 @@ class TestStoreCliParam(TestCase):
@mock.patch('builtins.open') @mock.patch('builtins.open')
@mock.patch('os.path.isdir') @mock.patch('os.path.isdir')
@mock.patch('os.path.exists') @mock.patch('os.chown')
def test_fail_to_write_data(self, mock_exists, mock_isdir, mock_open): @mock.patch('os.mkdir')
mock_exists.return_value = True def test_fail_to_write_data(self, mock_mkdir, mock_chown, mock_isdir,
mock_open):
mock_isdir.return_value = True mock_isdir.return_value = True
mock_open.side_effect = IOError() mock_open.side_effect = IOError()
self.assertRaises(IOError, utils.store_cli_param, "command", self.args) with self.assertRaises(IOError):
utils.store_cli_param("command", self.args)
class ProcessMultipleEnvironments(TestCase): class ProcessMultipleEnvironments(TestCase):

View File

@ -828,16 +828,29 @@ def store_cli_param(command_name, parsed_args):
command_name = command_name.replace(" ", "-") command_name = command_name.replace(" ", "-")
history_path = os.path.join(constants.CLOUD_HOME_DIR, '.tripleo') history_path = os.path.join(constants.CLOUD_HOME_DIR, '.tripleo')
makedirs(history_path) try:
os.mkdir(history_path, 0o700)
os.chown(history_path,
int(os.environ.get('SUDO_UID', -1)),
int(os.environ.get('SUDO_GID', -1)))
except OSError as e:
if e.errno != errno.EEXIST:
messages = _("Unable to create the .tripleo directory: "
"{0}, {1}").format(history_path, e)
raise IOError(messages)
if os.path.isdir(history_path): if os.path.isdir(history_path):
try: try:
with open(os.path.join(history_path, history_file_path = os.path.join(history_path, 'history')
'history'), 'a') as history: with open(history_file_path, 'a') as history:
args = parsed_args.__dict__.copy() args = parsed_args.__dict__.copy()
used_args = ', '.join('%s=%s' % (key, value) used_args = ', '.join('%s=%s' % (key, value)
for key, value in args.items()) for key, value in args.items())
history.write(' '.join([str(datetime.datetime.now()), history.write(' '.join([str(datetime.datetime.now()),
str(command_name), used_args, "\n"])) str(command_name), used_args, "\n"]))
os.chown(history_file_path,
int(os.environ.get('SUDO_UID', -1)),
int(os.environ.get('SUDO_GID', -1)))
except IOError as e: except IOError as e:
messages = _("Unable to write into TripleO history file: " messages = _("Unable to write into TripleO history file: "
"{0}, {1}").format(history_path, e) "{0}, {1}").format(history_path, e)