Merge "Automaticaly set cipher suite"

This commit is contained in:
Zuul 2021-03-29 10:51:18 +00:00 committed by Gerrit Code Review
commit 356734aaca
5 changed files with 400 additions and 0 deletions

View File

@ -190,6 +190,28 @@ negotiation. In both cases you can specify the required suite yourself, e.g.::
baremetal node set <UUID or name> --driver-info ipmi_cipher_suite=3
In scenarios where the operator can't specify the `ipmi_cipher_suite` for
each node, the configuration `[ipmi]/cipher_suite_versions` can be set to
a list of cipher suites that will be used, Ironic will attempt to find a value
that can be used from the list provided (from last to first).::
[ipmi]
cipher_suite_versions = ['1','2','3','6','7','8','11','12']
To find the suitable values for this configuration, you can check the field
`RMCP+ Cipher Suites` after running an `ipmitool` command, e.g::
$ ipmitool -I lanplus -H $HOST -U $USER -v -R 12 -N 5 lan print
# output
Set in Progress : Set Complete
Auth Type Support : NONE MD2 MD5 PASSWORD OEM
Auth Type Enable : Callback : NONE MD2 MD5 PASSWORD OEM
IP Address Source : Static Address
IP Address : <IP>
Subnet Mask : <Subnet>
MAC Address : <MAC>
RMCP+ Cipher Suites : 0,1,2,3,6,7,8,11,12
Static boot order configuration
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@ -73,6 +73,11 @@ opts = [
'additional debugging output. This is a separate '
'option as ipmitool can log a substantial amount '
'of misleading text when in this mode.')),
cfg.ListOpt('cipher_suite_versions',
default=[],
help=_('List of possible cipher suites versions that can '
'be supported by the hardware in case the field '
'`cipher_suite` is not set for the node.')),
]

View File

@ -515,6 +515,75 @@ def _ipmitool_timing_args():
]
def choose_cipher_suite(actual_ciper_suite):
"""Gives the possible next avaible cipher suite version.
Based on CONF.ipmi.cipher_suite_versions and the last cipher suite version
used that failed. This function is only called if the node doesn't have
cipher_suite set. Starts using the last element of the list and decreasing
the index.
:param actual_ciper_suite: latest cipher suite used in the
ipmi call.
:returns: the next possible cipher suite or None in case of empty
configuration.
"""
available_cs_versions = CONF.ipmi.cipher_suite_versions
if not available_cs_versions:
return None
if actual_ciper_suite is None:
return available_cs_versions[-1]
else:
try:
cs_index = available_cs_versions.index(actual_ciper_suite)
except ValueError:
return available_cs_versions[-1]
return available_cs_versions[max(cs_index - 1, 0)]
def check_cipher_suite_errors(cmd_stderr):
"""Checks if the command stderr contains cipher suite errors.
:param cmd_stderr: The command stderr.
:returns: True if the cmd_stderr contains a cipher suite error,
False otherwise.
"""
cs_errors = ["Unsupported cipher suite ID",
"Error in open session response message :"
" no matching cipher suite"]
for cs_err in cs_errors:
if cmd_stderr is not None and cs_err in cmd_stderr:
return True
return False
def update_cipher_suite_cmd(actual_cs, args):
"""Updates variables and the cipher suite cmd.
This function updates the values for all parameters so they
can be used in the next retry of _exec_ipmitool.
:param actual_cs: a string that represents the cipher suite that was
used in the command.
:param args: a list that contains the ipmitool command that was executed.
:returns: a tuple with the new values (actual_cs, args)
"""
actual_cs = choose_cipher_suite(actual_cs)
if '-C' in args:
cs_index = args.index('-C') + 1
args[cs_index] = actual_cs
else:
args.append('-C')
args.append(actual_cs)
return (actual_cs, args)
def _exec_ipmitool(driver_info, command, check_exit_code=None,
kill_on_timeout=False):
"""Execute the ipmitool command.
@ -533,6 +602,10 @@ def _exec_ipmitool(driver_info, command, check_exit_code=None,
"""
args = _get_ipmitool_args(driver_info)
change_cs = (CONF.ipmi.cipher_suite_versions != []
and driver_info.get('cipher_suite') is None)
actual_cs = None
timeout = CONF.ipmi.command_retry_timeout
args.extend(_ipmitool_timing_args())
@ -570,6 +643,11 @@ def _exec_ipmitool(driver_info, command, check_exit_code=None,
out, err = utils.execute(*cmd_args, **extra_args)
return out, err
except processutils.ProcessExecutionError as e:
if change_cs and check_cipher_suite_errors(e.stderr):
actual_cs, args = update_cipher_suite_cmd(
actual_cs, args)
else:
change_cs = False
with excutils.save_and_reraise_exception() as ctxt:
err_list = [
x for x in (

View File

@ -1458,6 +1458,295 @@ class IPMIToolPrivateMethodTestCase(
mock_exec.assert_called_once_with(*args)
self.assertFalse(self.mock_sleep.called)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(ipmi, '_make_password_file', _make_password_file_stub)
@mock.patch.object(ipmi, 'check_cipher_suite_errors', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_cipher_suite_error_noconfig(
self, mock_exec, mock_check_cs, mock_support):
no_matching_error = 'Error in open session response message : '\
'no matching cipher suite\n\nError: '\
'Unable to establish IPMI v2 / RMCP+ session\n'
self.config(min_command_interval=1, group='ipmi')
self.config(command_retry_timeout=2, group='ipmi')
self.config(use_ipmitool_retries=False, group='ipmi')
self.config(cipher_suite_versions=[], group='ipmi')
ipmi.LAST_CMD_TIME = {}
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-f', awesome_password_filename,
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.side_effect = [
processutils.ProcessExecutionError(
stdout='',
stderr=no_matching_error),
processutils.ProcessExecutionError(
stdout='',
stderr=no_matching_error),
]
mock_check_cs.return_value = False
self.assertRaises(processutils.ProcessExecutionError,
ipmi._exec_ipmitool,
self.info, 'A B C')
mock_support.assert_called_once_with('timing')
calls = [mock.call(*args), mock.call(*args)]
mock_exec.assert_has_calls(calls)
self.assertEqual(2, mock_exec.call_count)
self.assertEqual(0, mock_check_cs.call_count)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(ipmi, '_make_password_file', _make_password_file_stub)
@mock.patch.object(ipmi, 'check_cipher_suite_errors', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_cipher_suite_set_with_error_noconfig(
self, mock_exec, mock_check_cs, mock_support):
no_matching_error = 'Error in open session response message : '\
'no matching cipher suite\n\nError: '\
'Unable to establish IPMI v2 / RMCP+ session\n'
self.config(min_command_interval=1, group='ipmi')
self.config(command_retry_timeout=2, group='ipmi')
self.config(use_ipmitool_retries=False, group='ipmi')
self.config(cipher_suite_versions=[], group='ipmi')
ipmi.LAST_CMD_TIME = {}
self.info['cipher_suite'] = '17'
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-C', '17',
'-v',
'-f', awesome_password_filename,
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.side_effect = [
processutils.ProcessExecutionError(
stdout='',
stderr=no_matching_error),
processutils.ProcessExecutionError(
stdout='',
stderr=no_matching_error),
]
mock_check_cs.return_value = False
self.assertRaises(processutils.ProcessExecutionError,
ipmi._exec_ipmitool,
self.info, 'A B C')
mock_support.assert_called_once_with('timing')
calls = [mock.call(*args), mock.call(*args)]
mock_exec.assert_has_calls(calls)
self.assertEqual(2, mock_exec.call_count)
self.assertEqual(0, mock_check_cs.call_count)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(ipmi, '_make_password_file', _make_password_file_stub)
@mock.patch.object(ipmi, 'check_cipher_suite_errors', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_cipher_suite_set_with_error_config(
self, mock_exec, mock_check_cs, mock_support):
no_matching_error = 'Error in open session response message : '\
'no matching cipher suite\n\nError: '\
'Unable to establish IPMI v2 / RMCP+ session\n'
self.config(min_command_interval=1, group='ipmi')
self.config(command_retry_timeout=2, group='ipmi')
self.config(use_ipmitool_retries=False, group='ipmi')
self.config(cipher_suite_versions=[0, 1, 2, 3], group='ipmi')
ipmi.LAST_CMD_TIME = {}
self.info['cipher_suite'] = '17'
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-C', '17',
'-v',
'-f', awesome_password_filename,
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.side_effect = [
processutils.ProcessExecutionError(
stdout='',
stderr=no_matching_error),
processutils.ProcessExecutionError(
stdout='',
stderr=no_matching_error),
]
mock_check_cs.return_value = False
self.assertRaises(processutils.ProcessExecutionError,
ipmi._exec_ipmitool,
self.info, 'A B C')
mock_support.assert_called_once_with('timing')
calls = [mock.call(*args), mock.call(*args)]
mock_exec.assert_has_calls(calls)
self.assertEqual(2, mock_exec.call_count)
self.assertEqual(0, mock_check_cs.call_count)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(ipmi, '_make_password_file', _make_password_file_stub)
@mock.patch.object(ipmi, 'check_cipher_suite_errors', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_try_different_cipher_suite(
self, mock_exec, mock_check_cs, mock_support):
self.config(min_command_interval=1, group='ipmi')
self.config(command_retry_timeout=4, group='ipmi')
self.config(use_ipmitool_retries=False, group='ipmi')
cs_list = ['0', '1', '2', '3', '17']
self.config(cipher_suite_versions=cs_list, group='ipmi')
no_matching_error = 'Error in open session response message : '\
'no matching cipher suite\n\nError: '\
'Unable to establish IPMI v2 / RMCP+ session\n'
unsupported_error = 'Unsupported cipher suite ID : 17\n\n'\
'Error: Unable to establish IPMI v2 / RMCP+ session\n'
ipmi.LAST_CMD_TIME = {}
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-f', awesome_password_filename,
'A', 'B', 'C',
]
args_cs17 = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-C', '17',
'-f', awesome_password_filename,
'A', 'B', 'C',
]
args_cs3 = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-C', '3',
'-f', awesome_password_filename,
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.side_effect = [
processutils.ProcessExecutionError(
stdout='',
stderr=no_matching_error),
processutils.ProcessExecutionError(
stdout='',
stderr=unsupported_error),
processutils.ProcessExecutionError(stderr="Unknown"),
('', ''),
]
mock_check_cs.side_effect = [True, True, False]
ipmi._exec_ipmitool(self.info, 'A B C')
mock_support.assert_called_once_with('timing')
execute_calls = [mock.call(*args), mock.call(*args_cs17),
mock.call(*args_cs3), mock.call(*args_cs3)]
mock_exec.assert_has_calls(execute_calls)
self.assertEqual(4, mock_exec.call_count)
check_calls = [mock.call(no_matching_error),
mock.call(unsupported_error), mock.call('Unknown')]
mock_check_cs.assert_has_calls(check_calls)
self.assertEqual(3, mock_check_cs.call_count)
def test__choose_cipher_suite_empty_list(self):
self.config(cipher_suite_versions=[], group='ipmi')
value = ipmi.choose_cipher_suite(None)
self.assertIsNone(value)
nextvalue = ipmi.choose_cipher_suite('3')
self.assertIsNone(nextvalue)
def test__choose_cipher_suite_one_element(self):
cs_list = ['3']
self.config(cipher_suite_versions=cs_list, group='ipmi')
actual_cs = ipmi.choose_cipher_suite(None)
self.assertEqual(actual_cs, cs_list[-1])
self.assertEqual(actual_cs, cs_list[0])
self.assertEqual(actual_cs, '3')
# Call again and ensure it will be the same.
new_cs = ipmi.choose_cipher_suite(actual_cs)
self.assertEqual(new_cs, cs_list[-1])
self.assertEqual(new_cs, cs_list[0])
self.assertEqual(new_cs, '3')
self.assertEqual(new_cs, actual_cs)
def test__choose_cipher_suite_returns_last_to_first(self):
cs_list = ['0', '1', '2', '3', '17']
self.config(cipher_suite_versions=cs_list, group='ipmi')
element_position = len(cs_list) - 1
actual_cs = ipmi.choose_cipher_suite(None)
self.assertEqual(actual_cs, cs_list[-1])
self.assertEqual(actual_cs, cs_list[element_position])
self.assertEqual(actual_cs, '17')
# iterate call choose_cipher_suite 5 more times
# this ensures the last two call needs to return the first
# element.
for i in range(len(cs_list)):
actual_cs = ipmi.choose_cipher_suite(actual_cs)
if element_position != 0:
element_position -= 1
self.assertEqual(actual_cs, cs_list[element_position])
self.assertEqual(actual_cs, '0')
self.assertEqual(actual_cs, cs_list[0])
def test__check_cipher_suite_errors(self):
invalid_errors_stderr = [
'Unknow', 'x', '', 'test',
'Problem\n\nError: Unable to establish IPMI v2 / RMCP+ session\n',
'UnsupportedciphersuiteID:17\n\n'
]
no_matching_error = 'Error in open session response message : '\
'no matching cipher suite\n\nError: '\
'Unable to establish IPMI v2 / RMCP+ session\n'
unsupported_error = 'Unsupported cipher suite ID : 17\n\n'\
'Error: Unable to establish IPMI v2 / RMCP+ session\n'
valid_errors_stderr = [no_matching_error, unsupported_error]
for invalid_err in invalid_errors_stderr:
self.assertFalse(ipmi.check_cipher_suite_errors(invalid_err))
for valid_err in valid_errors_stderr:
self.assertTrue(ipmi.check_cipher_suite_errors(valid_err))
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(ipmi, '_make_password_file', _make_password_file_stub)
@mock.patch.object(utils, 'execute', autospec=True)

View File

@ -0,0 +1,6 @@
---
features:
- |
Allows providing a list of IPMI cipher suite versions via the new
configuration option ``[ipmi]/cipher_suite_versions``. The configuration
is only used when ``ipmi_cipher_suite`` is not set in ``driver_info``.