Flexible IPMI credential persistence method configuration
Instead of only file-based persistence which is reported to potentially expose IPMI credential by persisting the temp file(s) for a little too long. User can now pass ``True`` to the ``ipmi_store_cred_in_env`` parameter which instead stores IPMI password as an environment variable, limiting exposure to the user session of Ironic. Defaults to ``False``. Closes-Bug: #2058749 Change-Id: Icd91e969e5c58bf42fc50958c3cd1acabd36ccdf
This commit is contained in:
parent
c1f3daf7b0
commit
483bfbcdb6
@ -82,6 +82,34 @@ with an IPMItool-based driver. For example::
|
||||
--driver-info ipmi_username=<username> \
|
||||
--driver-info ipmi_password=<password>
|
||||
|
||||
|
||||
Changing The Default IPMI Credential Persistence Method
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
- ``ipmi_store_cred_in_env``: IPMI password persistence method;
|
||||
defaults to *False*.
|
||||
|
||||
The `ipmi_store_cred_in_env` configuration option allow users to switch
|
||||
between file-based and environment variable persistence methods for
|
||||
IPMI password.
|
||||
|
||||
For example, to switch to environment variable:
|
||||
|
||||
baremetal node create --driver ipmi \
|
||||
--driver-info ipmi_address=<address> \
|
||||
--driver-info ipmi_username=<username> \
|
||||
--driver-info ipmi_password=<password> \
|
||||
--driver-info ipmi_store_cred_in_env=True
|
||||
|
||||
And to swtich back to file-based temporary persistence:
|
||||
|
||||
baremetal node create --driver ipmi \
|
||||
--driver-info ipmi_address=<address> \
|
||||
--driver-info ipmi_username=<username> \
|
||||
--driver-info ipmi_password=<password> \
|
||||
--driver-info ipmi_store_cred_in_env=False
|
||||
|
||||
|
||||
Advanced configuration
|
||||
======================
|
||||
|
||||
|
@ -71,9 +71,13 @@ REQUIRED_PROPERTIES = {
|
||||
}
|
||||
OPTIONAL_PROPERTIES = {
|
||||
'ipmi_password': _("password. Optional."),
|
||||
'ipmi_hex_kg_key': _('Kg key for IPMIv2 authentication. '
|
||||
'The key is expected in hexadecimal format. '
|
||||
'Optional.'),
|
||||
'ipmi_store_cred_in_env': _("Determines how IPMI credentials are stored "
|
||||
"during execution; either as a temporary file "
|
||||
"(the default) or as an environment variable. "
|
||||
"Default is False. Optional."),
|
||||
'ipmi_hex_kg_key': _("Kg key for IPMIv2 authentication. "
|
||||
"The key is expected in hexadecimal format. "
|
||||
"Optional."),
|
||||
'ipmi_port': _("remote IPMI RMCP port. Optional."),
|
||||
'ipmi_priv_level': _("privilege level; default is ADMINISTRATOR. One of "
|
||||
"%s. Optional.") % ', '.join(VALID_PRIV_LEVELS),
|
||||
@ -342,6 +346,7 @@ def _parse_driver_info(node):
|
||||
address = info.get('ipmi_address')
|
||||
username = info.get('ipmi_username')
|
||||
password = str(info.get('ipmi_password', ''))
|
||||
store_cred_in_env = info.get('ipmi_store_cred_in_env', False)
|
||||
hex_kg_key = info.get('ipmi_hex_kg_key')
|
||||
dest_port = info.get('ipmi_port')
|
||||
port = (info.get('ipmi_terminal_port')
|
||||
@ -443,6 +448,7 @@ def _parse_driver_info(node):
|
||||
'dest_port': dest_port,
|
||||
'username': username,
|
||||
'password': password,
|
||||
'store_cred_in_env': store_cred_in_env,
|
||||
'hex_kg_key': hex_kg_key,
|
||||
'port': port,
|
||||
'uuid': node.uuid,
|
||||
@ -495,6 +501,25 @@ def _get_ipmitool_args(driver_info, pw_file=None):
|
||||
return args
|
||||
|
||||
|
||||
def _persist_ipmi_password(driver_info):
|
||||
"""Persists IPMI password for passing to the ipmitool
|
||||
|
||||
:param driver_info: driver info with the ipmitool parameters
|
||||
"""
|
||||
store_cred_in_env = driver_info['store_cred_in_env']
|
||||
if store_cred_in_env:
|
||||
password = driver_info.get('password')
|
||||
if password:
|
||||
os.environ['PASSWORD'] = password
|
||||
return '-E', None
|
||||
|
||||
path = _console_pwfile_path(driver_info['uuid'])
|
||||
pw_file = console_utils.make_persistent_password_file(
|
||||
path, driver_info['password'] or '\0')
|
||||
|
||||
return '-f', pw_file
|
||||
|
||||
|
||||
def _ipmitool_timing_args():
|
||||
if not _is_option_supported('timing'):
|
||||
return []
|
||||
@ -644,10 +669,19 @@ def _exec_ipmitool(driver_info, command, check_exit_code=None,
|
||||
# 'ipmitool' command will prompt password if there is no '-f'
|
||||
# option, we set it to '\0' to write a password file to support
|
||||
# empty password
|
||||
with _make_password_file(driver_info['password'] or '\0') as pw_file:
|
||||
|
||||
flag, _ = _persist_ipmi_password(driver_info)
|
||||
if flag == '-E':
|
||||
cmd_args.append('-E')
|
||||
cmd_args.append('PASSWORD')
|
||||
cmd_args.extend(command.split(" "))
|
||||
else:
|
||||
with _make_password_file(driver_info['password']
|
||||
or '\0') as pw_file:
|
||||
cmd_args.append('-f')
|
||||
cmd_args.append(pw_file)
|
||||
cmd_args.extend(command.split(" "))
|
||||
|
||||
try:
|
||||
out, err = utils.execute(*cmd_args, **extra_args)
|
||||
return out, err
|
||||
@ -1543,16 +1577,15 @@ class IPMIConsole(base.ConsoleInterface):
|
||||
created
|
||||
:raises: ConsoleSubprocessFailed when invoking the subprocess failed
|
||||
"""
|
||||
path = _console_pwfile_path(driver_info['uuid'])
|
||||
pw_file = console_utils.make_persistent_password_file(
|
||||
path, driver_info['password'] or '\0')
|
||||
ipmi_cmd = self._get_ipmi_cmd(driver_info, pw_file)
|
||||
_, path = _persist_ipmi_password(driver_info)
|
||||
ipmi_cmd = self._get_ipmi_cmd(driver_info, pw_file=path)
|
||||
ipmi_cmd += ' sol activate'
|
||||
|
||||
try:
|
||||
start_method(driver_info['uuid'], driver_info['port'], ipmi_cmd)
|
||||
except (exception.ConsoleError, exception.ConsoleSubprocessFailed):
|
||||
with excutils.save_and_reraise_exception():
|
||||
if path:
|
||||
ironic_utils.unlink_without_raise(path)
|
||||
|
||||
|
||||
|
@ -6307,7 +6307,7 @@ class ManagerTestProperties(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
|
||||
'ipmi_target_address', 'ipmi_local_address',
|
||||
'ipmi_protocol_version', 'ipmi_force_boot_device',
|
||||
'ipmi_disable_boot_timeout', 'ipmi_hex_kg_key',
|
||||
'ipmi_cipher_suite']
|
||||
'ipmi_cipher_suite', 'ipmi_store_cred_in_env']
|
||||
self._check_driver_properties("ipmi", expected)
|
||||
|
||||
def test_driver_properties_snmp(self):
|
||||
|
@ -1922,6 +1922,812 @@ class IPMIToolPrivateMethodTestCase(
|
||||
self.assertEqual(['-R', '7', '-N', '1'], ipmi._ipmitool_timing_args())
|
||||
|
||||
|
||||
class IPMIToolPrivateMethodOnEnvironmentPersistenceTestCase(
|
||||
Base,
|
||||
metaclass=IPMIToolPrivateMethodTestCaseMeta):
|
||||
|
||||
def setUp(self):
|
||||
|
||||
INFO_DICT['ipmi_store_cred_in_env'] = True
|
||||
BRIDGE_INFO_DICT['ipmi_store_cred_in_env'] = True
|
||||
|
||||
super(IPMIToolPrivateMethodOnEnvironmentPersistenceTestCase,
|
||||
self).setUp()
|
||||
|
||||
self._mock_system_random_distribution()
|
||||
|
||||
mock_sleep_fixture = self.useFixture(
|
||||
fixtures.MockPatchObject(time, 'sleep', autospec=True))
|
||||
self.mock_sleep = mock_sleep_fixture.mock
|
||||
|
||||
def _mock_system_random_distribution(self):
|
||||
m = mock.patch.object(random.SystemRandom, 'gauss', return_value=0.5,
|
||||
autospec=True)
|
||||
m.start()
|
||||
self.addCleanup(m.stop)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_first_call_to_address(self, mock_exec,
|
||||
mock_support):
|
||||
print(self.info)
|
||||
ipmi.LAST_CMD_TIME = {}
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
mock_support.return_value = False
|
||||
mock_exec.return_value = (None, None)
|
||||
|
||||
ipmi._exec_ipmitool(self.info, 'A B C')
|
||||
|
||||
mock_support.assert_called_once_with('timing')
|
||||
mock_exec.assert_called_once_with(*args)
|
||||
self.assertFalse(self.mock_sleep.called)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_second_call_to_address_sleep(
|
||||
self, mock_exec, mock_support):
|
||||
ipmi.LAST_CMD_TIME = {}
|
||||
args = [[
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
], [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'D', 'E', 'F',
|
||||
]]
|
||||
|
||||
expected = [mock.call('timing'),
|
||||
mock.call('timing')]
|
||||
mock_support.return_value = False
|
||||
mock_exec.side_effect = [(None, None), (None, None)]
|
||||
|
||||
ipmi._exec_ipmitool(self.info, 'A B C')
|
||||
mock_exec.assert_called_with(*args[0])
|
||||
|
||||
ipmi._exec_ipmitool(self.info, 'D E F')
|
||||
self.assertTrue(self.mock_sleep.called)
|
||||
self.assertEqual(expected, mock_support.call_args_list)
|
||||
mock_exec.assert_called_with(*args[1])
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_second_call_to_address_no_sleep(
|
||||
self, mock_exec, mock_support):
|
||||
ipmi.LAST_CMD_TIME = {}
|
||||
args = [[
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
], [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'D', 'E', 'F',
|
||||
]]
|
||||
|
||||
expected = [mock.call('timing'),
|
||||
mock.call('timing')]
|
||||
mock_support.return_value = False
|
||||
mock_exec.side_effect = [(None, None), (None, None)]
|
||||
|
||||
ipmi._exec_ipmitool(self.info, 'A B C')
|
||||
mock_exec.assert_called_with(*args[0])
|
||||
# act like enough time has passed
|
||||
ipmi.LAST_CMD_TIME[self.info['address']] = (
|
||||
time.time() - CONF.ipmi.min_command_interval)
|
||||
ipmi._exec_ipmitool(self.info, 'D E F')
|
||||
self.assertFalse(self.mock_sleep.called)
|
||||
self.assertEqual(expected, mock_support.call_args_list)
|
||||
mock_exec.assert_called_with(*args[1])
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_two_calls_to_diff_address(
|
||||
self, mock_exec, mock_support):
|
||||
ipmi.LAST_CMD_TIME = {}
|
||||
args = [[
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
], [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', '127.127.127.127',
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'D', 'E', 'F',
|
||||
]]
|
||||
|
||||
expected = [mock.call('timing'),
|
||||
mock.call('timing')]
|
||||
mock_support.return_value = False
|
||||
mock_exec.side_effect = [(None, None), (None, None)]
|
||||
|
||||
ipmi._exec_ipmitool(self.info, 'A B C')
|
||||
mock_exec.assert_called_with(*args[0])
|
||||
self.info['address'] = '127.127.127.127'
|
||||
ipmi._exec_ipmitool(self.info, 'D E F')
|
||||
self.assertFalse(self.mock_sleep.called)
|
||||
self.assertEqual(expected, mock_support.call_args_list)
|
||||
mock_exec.assert_called_with(*args[1])
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_without_timing(
|
||||
self, mock_exec, mock_support):
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
mock_support.return_value = False
|
||||
mock_exec.return_value = (None, None)
|
||||
|
||||
ipmi._exec_ipmitool(self.info, 'A B C')
|
||||
|
||||
mock_support.assert_called_once_with('timing')
|
||||
mock_exec.assert_called_once_with(*args)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_with_timing(
|
||||
self, mock_exec, mock_support):
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-R', '7',
|
||||
'-N', '5',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
mock_support.return_value = True
|
||||
mock_exec.return_value = (None, None)
|
||||
|
||||
self.config(use_ipmitool_retries=True, group='ipmi')
|
||||
|
||||
ipmi._exec_ipmitool(self.info, 'A B C')
|
||||
|
||||
mock_support.assert_called_once_with('timing')
|
||||
mock_exec.assert_called_once_with(*args)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_with_ironic_retries(
|
||||
self, mock_exec, mock_support):
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-R', '1',
|
||||
'-N', '5',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
mock_support.return_value = True
|
||||
mock_exec.return_value = (None, None)
|
||||
|
||||
self.config(use_ipmitool_retries=False, group='ipmi')
|
||||
|
||||
ipmi._exec_ipmitool(self.info, 'A B C')
|
||||
|
||||
mock_support.assert_called_once_with('timing')
|
||||
mock_exec.assert_called_once_with(*args)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_with_timeout(
|
||||
self, mock_exec, mock_support):
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-R', '7',
|
||||
'-N', '5',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
mock_support.return_value = True
|
||||
mock_exec.return_value = (None, None)
|
||||
|
||||
self.config(use_ipmitool_retries=True, group='ipmi')
|
||||
ipmi._exec_ipmitool(self.info, 'A B C', kill_on_timeout=True)
|
||||
|
||||
mock_support.assert_called_once_with('timing')
|
||||
mock_exec.assert_called_once_with(*args, timeout=60)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_with_ironic_retries_multiple(
|
||||
self, mock_exec, mock_support):
|
||||
|
||||
mock_exec.side_effect = [
|
||||
processutils.ProcessExecutionError(
|
||||
stderr="Unknown"
|
||||
),
|
||||
processutils.ProcessExecutionError(
|
||||
stderr="Unknown"
|
||||
),
|
||||
processutils.ProcessExecutionError(
|
||||
stderr="Unknown"
|
||||
),
|
||||
]
|
||||
|
||||
self.config(min_command_interval=1, group='ipmi')
|
||||
self.config(command_retry_timeout=3, group='ipmi')
|
||||
self.config(use_ipmitool_retries=False, group='ipmi')
|
||||
|
||||
self.assertRaises(processutils.ProcessExecutionError,
|
||||
ipmi._exec_ipmitool,
|
||||
self.info, 'A B C')
|
||||
|
||||
mock_support.assert_called_once_with('timing')
|
||||
self.assertEqual(3, mock_exec.call_count)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_without_username(
|
||||
self, mock_exec, mock_support):
|
||||
# An undefined username is treated the same as an empty username and
|
||||
# will cause no user (-U) to be specified.
|
||||
self.info['username'] = None
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
mock_support.return_value = False
|
||||
mock_exec.return_value = (None, None)
|
||||
ipmi._exec_ipmitool(self.info, 'A B C')
|
||||
mock_support.assert_called_once_with('timing')
|
||||
mock_exec.assert_called_once_with(*args)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_with_empty_username(
|
||||
self, mock_exec, mock_support):
|
||||
# An empty username is treated the same as an undefined username and
|
||||
# will cause no user (-U) to be specified.
|
||||
self.info['username'] = ""
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
mock_support.return_value = False
|
||||
mock_exec.return_value = (None, None)
|
||||
ipmi._exec_ipmitool(self.info, 'A B C')
|
||||
mock_support.assert_called_once_with('timing')
|
||||
mock_exec.assert_called_once_with(*args)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_without_password(self, mock_exec,
|
||||
mock_support):
|
||||
# An undefined password is treated the same as an empty password and
|
||||
# will cause a NULL (\0) password to be used"""
|
||||
self.info['password'] = None
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
mock_support.return_value = False
|
||||
mock_exec.return_value = (None, None)
|
||||
ipmi._exec_ipmitool(self.info, 'A B C')
|
||||
mock_support.assert_called_once_with('timing')
|
||||
mock_exec.assert_called_once_with(*args)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_with_empty_password(self, mock_exec,
|
||||
mock_support):
|
||||
# An empty password is treated the same as an undefined password and
|
||||
# will cause a NULL (\0) password to be used"""
|
||||
self.info['password'] = ""
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
mock_support.return_value = False
|
||||
mock_exec.return_value = (None, None)
|
||||
ipmi._exec_ipmitool(self.info, 'A B C')
|
||||
mock_support.assert_called_once_with('timing')
|
||||
mock_exec.assert_called_once_with(*args)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_with_dual_bridging(self,
|
||||
mock_exec,
|
||||
mock_support):
|
||||
|
||||
node = obj_utils.get_test_node(self.context,
|
||||
driver_info=BRIDGE_INFO_DICT)
|
||||
# when support for dual bridge command is called returns True
|
||||
mock_support.return_value = True
|
||||
info = ipmi._parse_driver_info(node)
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', info['address'],
|
||||
'-L', info['priv_level'],
|
||||
'-U', info['username'],
|
||||
'-m', info['local_address'],
|
||||
'-B', info['transit_channel'],
|
||||
'-T', info['transit_address'],
|
||||
'-b', info['target_channel'],
|
||||
'-t', info['target_address'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
expected = [mock.call('dual_bridge'),
|
||||
mock.call('timing')]
|
||||
# When support for timing command is called returns False
|
||||
mock_support.return_value = False
|
||||
mock_exec.return_value = (None, None)
|
||||
ipmi._exec_ipmitool(info, 'A B C')
|
||||
self.assertEqual(expected, mock_support.call_args_list)
|
||||
mock_exec.assert_called_once_with(*args)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_with_single_bridging(self,
|
||||
mock_exec,
|
||||
mock_support):
|
||||
single_bridge_info = dict(BRIDGE_INFO_DICT)
|
||||
single_bridge_info['ipmi_bridging'] = 'single'
|
||||
print(BRIDGE_INFO_DICT)
|
||||
node = obj_utils.get_test_node(self.context,
|
||||
driver_info=single_bridge_info)
|
||||
# when support for single bridge command is called returns True
|
||||
mock_support.return_value = True
|
||||
info = ipmi._parse_driver_info(node)
|
||||
info['transit_channel'] = info['transit_address'] = None
|
||||
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', info['address'],
|
||||
'-L', info['priv_level'],
|
||||
'-U', info['username'],
|
||||
'-m', info['local_address'],
|
||||
'-b', info['target_channel'],
|
||||
'-t', info['target_address'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
expected = [mock.call('single_bridge'),
|
||||
mock.call('timing')]
|
||||
# When support for timing command is called returns False
|
||||
mock_support.return_value = False
|
||||
mock_exec.return_value = (None, None)
|
||||
ipmi._exec_ipmitool(info, 'A B C')
|
||||
self.assertEqual(expected, mock_support.call_args_list)
|
||||
mock_exec.assert_called_once_with(*args)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_exception(self, mock_exec, mock_support):
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
self.config(use_ipmitool_retries=True, group='ipmi')
|
||||
|
||||
mock_support.return_value = False
|
||||
mock_exec.side_effect = processutils.ProcessExecutionError("x")
|
||||
self.assertRaises(processutils.ProcessExecutionError,
|
||||
ipmi._exec_ipmitool,
|
||||
self.info, 'A B C')
|
||||
mock_support.assert_called_once_with('timing')
|
||||
mock_exec.assert_called_once_with(*args)
|
||||
self.assertEqual(1, mock_exec.call_count)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_IPMI_version_1_5(
|
||||
self, mock_exec, mock_support):
|
||||
self.info['protocol_version'] = '1.5'
|
||||
# Assert it uses "-I lan" (1.5) instead of "-I lanplus" (2.0)
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lan',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
mock_support.return_value = False
|
||||
mock_exec.return_value = (None, None)
|
||||
ipmi._exec_ipmitool(self.info, 'A B C')
|
||||
mock_support.assert_called_once_with('timing')
|
||||
mock_exec.assert_called_once_with(*args)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_with_port(self, mock_exec, mock_support):
|
||||
self.info['dest_port'] = '1623'
|
||||
ipmi.LAST_CMD_TIME = {}
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-p', '1623',
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
mock_support.return_value = False
|
||||
mock_exec.return_value = (None, None)
|
||||
|
||||
ipmi._exec_ipmitool(self.info, 'A B C')
|
||||
|
||||
mock_support.assert_called_once_with('timing')
|
||||
mock_exec.assert_called_once_with(*args)
|
||||
self.assertFalse(self.mock_sleep.called)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_cipher_suite(self, mock_exec, mock_support):
|
||||
self.info['cipher_suite'] = '3'
|
||||
ipmi.LAST_CMD_TIME = {}
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-C', '3',
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
mock_support.return_value = False
|
||||
mock_exec.return_value = (None, None)
|
||||
|
||||
ipmi._exec_ipmitool(self.info, 'A B C')
|
||||
|
||||
mock_support.assert_called_once_with('timing')
|
||||
mock_exec.assert_called_once_with(*args)
|
||||
self.assertFalse(self.mock_sleep.called)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(ipmi, 'check_cipher_suite_errors', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_cipher_suite_error_noconfig(
|
||||
self, mock_exec, mock_check_cs, mock_support):
|
||||
no_matching_error = 'Error in open session response message : ' \
|
||||
'no matching cipher suite\n\nError: ' \
|
||||
'Unable to establish IPMI v2 / RMCP+ session\n'
|
||||
self.config(min_command_interval=1, group='ipmi')
|
||||
self.config(command_retry_timeout=2, group='ipmi')
|
||||
self.config(use_ipmitool_retries=False, group='ipmi')
|
||||
self.config(cipher_suite_versions=[], group='ipmi')
|
||||
ipmi.LAST_CMD_TIME = {}
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
mock_support.return_value = False
|
||||
mock_exec.side_effect = [
|
||||
processutils.ProcessExecutionError(
|
||||
stdout='',
|
||||
stderr=no_matching_error),
|
||||
processutils.ProcessExecutionError(
|
||||
stdout='',
|
||||
stderr=no_matching_error),
|
||||
]
|
||||
mock_check_cs.return_value = False
|
||||
|
||||
self.assertRaises(processutils.ProcessExecutionError,
|
||||
ipmi._exec_ipmitool,
|
||||
self.info, 'A B C')
|
||||
|
||||
mock_support.assert_called_once_with('timing')
|
||||
|
||||
calls = [mock.call(*args), mock.call(*args)]
|
||||
mock_exec.assert_has_calls(calls)
|
||||
self.assertEqual(2, mock_exec.call_count)
|
||||
self.assertEqual(0, mock_check_cs.call_count)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(ipmi, 'check_cipher_suite_errors', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_cipher_suite_set_with_error_noconfig(
|
||||
self, mock_exec, mock_check_cs, mock_support):
|
||||
no_matching_error = 'Error in open session response message : ' \
|
||||
'no matching cipher suite\n\nError: ' \
|
||||
'Unable to establish IPMI v2 / RMCP+ session\n'
|
||||
self.config(min_command_interval=1, group='ipmi')
|
||||
self.config(command_retry_timeout=2, group='ipmi')
|
||||
self.config(use_ipmitool_retries=False, group='ipmi')
|
||||
self.config(cipher_suite_versions=[], group='ipmi')
|
||||
ipmi.LAST_CMD_TIME = {}
|
||||
self.info['cipher_suite'] = '17'
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-C', '17',
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
mock_support.return_value = False
|
||||
mock_exec.side_effect = [
|
||||
processutils.ProcessExecutionError(
|
||||
stdout='',
|
||||
stderr=no_matching_error),
|
||||
processutils.ProcessExecutionError(
|
||||
stdout='',
|
||||
stderr=no_matching_error),
|
||||
]
|
||||
mock_check_cs.return_value = False
|
||||
|
||||
self.assertRaises(processutils.ProcessExecutionError,
|
||||
ipmi._exec_ipmitool,
|
||||
self.info, 'A B C')
|
||||
|
||||
mock_support.assert_called_once_with('timing')
|
||||
|
||||
calls = [mock.call(*args), mock.call(*args)]
|
||||
mock_exec.assert_has_calls(calls)
|
||||
self.assertEqual(2, mock_exec.call_count)
|
||||
self.assertEqual(0, mock_check_cs.call_count)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(ipmi, 'check_cipher_suite_errors', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_cipher_suite_set_with_error_config(
|
||||
self, mock_exec, mock_check_cs, mock_support):
|
||||
no_matching_error = 'Error in open session response message : ' \
|
||||
'no matching cipher suite\n\nError: ' \
|
||||
'Unable to establish IPMI v2 / RMCP+ session\n'
|
||||
self.config(min_command_interval=1, group='ipmi')
|
||||
self.config(command_retry_timeout=2, group='ipmi')
|
||||
self.config(use_ipmitool_retries=False, group='ipmi')
|
||||
self.config(cipher_suite_versions=[0, 1, 2, 3], group='ipmi')
|
||||
ipmi.LAST_CMD_TIME = {}
|
||||
self.info['cipher_suite'] = '17'
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-C', '17',
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
mock_support.return_value = False
|
||||
mock_exec.side_effect = [
|
||||
processutils.ProcessExecutionError(
|
||||
stdout='',
|
||||
stderr=no_matching_error),
|
||||
processutils.ProcessExecutionError(
|
||||
stdout='',
|
||||
stderr=no_matching_error),
|
||||
]
|
||||
mock_check_cs.return_value = False
|
||||
|
||||
self.assertRaises(processutils.ProcessExecutionError,
|
||||
ipmi._exec_ipmitool,
|
||||
self.info, 'A B C')
|
||||
|
||||
mock_support.assert_called_once_with('timing')
|
||||
|
||||
calls = [mock.call(*args), mock.call(*args)]
|
||||
mock_exec.assert_has_calls(calls)
|
||||
self.assertEqual(2, mock_exec.call_count)
|
||||
self.assertEqual(0, mock_check_cs.call_count)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(ipmi, 'check_cipher_suite_errors', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_try_different_cipher_suite(
|
||||
self, mock_exec, mock_check_cs, mock_support):
|
||||
|
||||
self.config(min_command_interval=1, group='ipmi')
|
||||
self.config(command_retry_timeout=4, group='ipmi')
|
||||
self.config(use_ipmitool_retries=False, group='ipmi')
|
||||
cs_list = ['0', '1', '2', '3', '17']
|
||||
self.config(cipher_suite_versions=cs_list, group='ipmi')
|
||||
no_matching_error = 'Error in open session response message : ' \
|
||||
'no matching cipher suite\n\nError: ' \
|
||||
'Unable to establish IPMI v2 / RMCP+ session\n'
|
||||
unsupported_error = 'Unsupported cipher suite ID : 17\n\n' \
|
||||
'Error: Unable to establish IPMI v2 / RMCP+ session\n'
|
||||
ipmi.LAST_CMD_TIME = {}
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
args_cs17 = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-C', '17',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
args_cs3 = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-C', '3',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
|
||||
mock_support.return_value = False
|
||||
mock_exec.side_effect = [
|
||||
processutils.ProcessExecutionError(
|
||||
stdout='',
|
||||
stderr=no_matching_error),
|
||||
processutils.ProcessExecutionError(
|
||||
stdout='',
|
||||
stderr=unsupported_error),
|
||||
processutils.ProcessExecutionError(stderr="Unknown"),
|
||||
('', ''),
|
||||
]
|
||||
mock_check_cs.side_effect = [True, True, False]
|
||||
|
||||
ipmi._exec_ipmitool(self.info, 'A B C')
|
||||
|
||||
mock_support.assert_called_once_with('timing')
|
||||
|
||||
execute_calls = [mock.call(*args), mock.call(*args_cs17),
|
||||
mock.call(*args_cs3), mock.call(*args_cs3)]
|
||||
mock_exec.assert_has_calls(execute_calls)
|
||||
self.assertEqual(4, mock_exec.call_count)
|
||||
check_calls = [mock.call(no_matching_error),
|
||||
mock.call(unsupported_error), mock.call('Unknown')]
|
||||
mock_check_cs.assert_has_calls(check_calls)
|
||||
self.assertEqual(3, mock_check_cs.call_count)
|
||||
|
||||
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test__exec_ipmitool_with_check_exit_code(self, mock_exec,
|
||||
mock_support):
|
||||
args = [
|
||||
'ipmitool',
|
||||
'-I', 'lanplus',
|
||||
'-H', self.info['address'],
|
||||
'-L', self.info['priv_level'],
|
||||
'-U', self.info['username'],
|
||||
'-v',
|
||||
'-E', 'PASSWORD',
|
||||
'A', 'B', 'C',
|
||||
]
|
||||
mock_support.return_value = False
|
||||
mock_exec.return_value = (None, None)
|
||||
ipmi._exec_ipmitool(self.info, 'A B C', check_exit_code=[0, 1])
|
||||
mock_support.assert_called_once_with('timing')
|
||||
mock_exec.assert_called_once_with(*args, check_exit_code=[0, 1])
|
||||
|
||||
|
||||
class IPMIToolDriverTestCase(Base):
|
||||
|
||||
@mock.patch.object(ipmi, "_parse_driver_info", autospec=True)
|
||||
|
@ -0,0 +1,6 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Adds a new configuration option ``ipmi_store_cred_in_env`` to allow
|
||||
switching between file-based and environment variable persistence for
|
||||
IPMI credentials. Defaults to ``False``.
|
Loading…
Reference in New Issue
Block a user