Add parameter to specify certification file

This patch adds functions & methods which have been used to connect
to iRMC via HTTPS to accept additional parameter.
With additional parameter, user is able to specify certification file.

Co-authored-by: Kobayashi Daisuke <kobayashi.da-06@fujitsu.com>
Change-Id: I51203e16207f8d3b1448b581942111bff60d0c86
This commit is contained in:
Vanou Ishii 2022-06-01 17:40:12 +09:00
parent a28eb28775
commit 274dca0344
3 changed files with 56 additions and 25 deletions

View File

@ -188,6 +188,10 @@ def elcm_request(irmc_info, method, path, **kwargs):
'irmc_port': 80 or 443, default is 443,
'irmc_auth_method': 'basic' or 'digest', default is 'basic',
'irmc_client_timeout': timeout, default is 60,
'irmc_verify_ca': Either a boolean, in which case it controls
whether we verify the server's TLS certificate,
or a string, in which case it must be a path to
a CA bundle to use. Defaults to ``True``.
...
}
:param method: request method such as 'GET', 'POST'
@ -203,6 +207,7 @@ def elcm_request(irmc_info, method, path, **kwargs):
userid = irmc_info['irmc_username']
password = irmc_info['irmc_password']
client_timeout = irmc_info.get('irmc_client_timeout', 60)
verify = irmc_info.get('irmc_verify_ca', True)
# Request headers, params, and data
headers = kwargs.get('headers', {'Accept': 'application/json'})
@ -229,7 +234,7 @@ def elcm_request(irmc_info, method, path, **kwargs):
headers=headers,
params=params,
data=data,
verify=False,
verify=verify,
timeout=client_timeout,
allow_redirects=False,
auth=auth_obj)

View File

@ -264,7 +264,7 @@ def get_share_type(share_type):
def scci_cmd(host, userid, password, cmd, port=443, auth_method='basic',
client_timeout=60, do_async=True, **kwargs):
client_timeout=60, do_async=True, verify=True, **kwargs):
"""execute SCCI command
This function calls SCCI server modules
@ -276,6 +276,10 @@ def scci_cmd(host, userid, password, cmd, port=443, auth_method='basic',
:param auth_method: irmc_username
:param client_timeout: timeout for SCCI operations
:param do_async: async call if True, sync call otherwise
:param verify: (optional) Either a boolean, in which case it
controls whether we verify the server's TLS certificate,
or a string, in which case it must be a path to
a CA bundle to use. Defaults to ``True``.
:returns: requests.Response from SCCI server
:raises: SCCIInvalidInputError if port and/or auth_method params
are invalid
@ -310,12 +314,14 @@ def scci_cmd(host, userid, password, cmd, port=443, auth_method='basic',
if check_eject_cd_cmd(cmd):
if not validate_params_cd_fd("cmd_cd", protocol,
host, auth_obj,
do_async, client_timeout):
do_async, client_timeout,
verify):
return
if check_eject_fd_cmd(cmd):
if not validate_params_cd_fd("cmd_fd", protocol,
host, auth_obj,
do_async, client_timeout):
do_async, client_timeout,
verify):
return
data = cmd
config_type = '/config'
@ -323,7 +329,7 @@ def scci_cmd(host, userid, password, cmd, port=443, auth_method='basic',
r = requests.post(protocol + '://' + host + config_type,
data=data,
headers=header,
verify=False,
verify=verify,
timeout=client_timeout,
allow_redirects=False,
auth=auth_obj)
@ -373,7 +379,7 @@ def scci_cmd(host, userid, password, cmd, port=443, auth_method='basic',
def validate_params_cd_fd(cmd_type, protocol, host, auth_obj,
do_async, client_timeout):
do_async, client_timeout, verify):
"""Validate parameters of CD/DVD or FD Image Virtual Media in iRMC
If one of parameters (ImageServer, ImageShareName or ImageName) set in
@ -386,6 +392,10 @@ def validate_params_cd_fd(cmd_type, protocol, host, auth_obj,
:param auth_obj: irmc userid/password
:param do_async: async call if True, sync call otherwise
:param client_timeout: timeout for SCCI operations
:param verify: Either a boolean, in which case it
controls whether we verify the server's TLS certificate,
or a string, in which case it must be a path to
a CA bundle to use.
:return: False if one of param is null. Otherwise, returns True.
"""
@ -404,7 +414,7 @@ def validate_params_cd_fd(cmd_type, protocol, host, auth_obj,
r = requests.get(protocol + '://' + host + '/iRMC_Settings.pre',
params=param,
headers=header,
verify=False,
verify=verify,
timeout=client_timeout,
allow_redirects=False,
auth=auth_obj)
@ -486,7 +496,7 @@ def check_eject_fd_cmd(xml_cmd):
def get_client(host, userid, password, port=443, auth_method='basic',
client_timeout=60, **kwargs):
client_timeout=60, verify=True, **kwargs):
"""get SCCI command partial function
This function returns SCCI command partial function
@ -496,12 +506,17 @@ def get_client(host, userid, password, port=443, auth_method='basic',
:param port: port number of iRMC
:param auth_method: irmc_username
:param client_timeout: timeout for SCCI operations
:param verify: (optional) Either a boolean, in which case it
controls whether we verify the server's TLS certificate,
or a string, in which case it must be a path to
a CA bundle to use. Defaults to ``True``.
:returns: scci_cmd partial function which takes a SCCI command param
"""
return functools.partial(scci_cmd, host, userid, password,
port=port, auth_method=auth_method,
client_timeout=client_timeout, **kwargs)
client_timeout=client_timeout,
verify=verify, **kwargs)
def get_virtual_cd_set_params_cmd(remote_image_server,
@ -568,7 +583,7 @@ def get_virtual_fd_set_params_cmd(remote_image_server,
def get_report(host, userid, password,
port=443, auth_method='basic', client_timeout=60):
port=443, auth_method='basic', client_timeout=60, verify=True):
"""get iRMC report
This function returns iRMC report in XML format
@ -578,6 +593,10 @@ def get_report(host, userid, password,
:param port: port number of iRMC
:param auth_method: irmc_username
:param client_timeout: timeout for SCCI operations
:param verify: (optional) Either a boolean, in which case it
controls whether we verify the server's TLS certificate,
or a string, in which case it must be a path to
a CA bundle to use. Defaults to ``True``.
:returns: root element of SCCI report
:raises: ISCCIInvalidInputError if port and/or auth_method params
are invalid
@ -600,7 +619,7 @@ def get_report(host, userid, password,
try:
r = requests.get(protocol + '://' + host + '/report.xml',
verify=False,
verify=verify,
timeout=(10, client_timeout),
allow_redirects=False,
auth=auth_obj)
@ -868,6 +887,10 @@ def get_firmware_upgrade_status(irmc_info, upgrade_type):
'irmc_port': 80 or 443, default is 443,
'irmc_auth_method': 'basic' or 'digest', default is 'digest',
'irmc_client_timeout': timeout, default is 60,
'irmc_verify_ca': (optional) Either a boolean, in which case it
controls whether we verify the server's TLS
certificate, or a string, in which case it must be
a path to a CA bundle to use. Defaults to ``True``.
...
}
:param upgrade_type: flag to check upgrade with bios or irmc
@ -882,6 +905,7 @@ def get_firmware_upgrade_status(irmc_info, upgrade_type):
port = irmc_info.get('irmc_port', 443)
auth_method = irmc_info.get('irmc_auth_method', 'digest')
client_timeout = irmc_info.get('irmc_client_timeout', 60)
verify = irmc_info.get('irmc_verify_ca', True)
auth_obj = None
try:
@ -901,7 +925,7 @@ def get_firmware_upgrade_status(irmc_info, upgrade_type):
elif upgrade_type == 'irmc':
upgrade_type = '/irmcprogress'
r = requests.get(protocol + '://' + host + upgrade_type,
verify=False,
verify=verify,
timeout=(10, client_timeout),
allow_redirects=False,
auth=auth_obj)
@ -915,3 +939,5 @@ def get_firmware_upgrade_status(irmc_info, upgrade_type):
return upgrade_status_xml
except ET.ParseError as parse_error:
raise SCCIClientError(parse_error)
except requests.RequestException as requests_exception:
raise SCCIClientError(requests_exception)

View File

@ -223,7 +223,7 @@ class SCCITestCase(testtools.TestCase):
'https://' + self.irmc_address + '/config',
data=scci.POWER_ON,
headers={'Content-type': 'application/x-www-form-urlencoded'},
verify=False,
verify=True,
timeout=self.irmc_client_timeout,
allow_redirects=False,
auth=mock_requests.auth.HTTPBasicAuth(self.irmc_username,
@ -1026,9 +1026,9 @@ class SCCITestCase(testtools.TestCase):
result = scci.get_raid_fgi_status(report_fake)
self.assertEqual(result, fgi_status_expect)
@mock.patch('scciclient.irmc.scci.requests')
def test_fail_get_bios_firmware_status(self, mock_requests):
mock_requests.get.return_value = mock.Mock(
@mock.patch('scciclient.irmc.scci.requests.get')
def test_fail_get_bios_firmware_status(self, mock_requests_get):
mock_requests_get.return_value = mock.Mock(
status_code=404,
text="""</head>
<body>
@ -1049,9 +1049,9 @@ class SCCITestCase(testtools.TestCase):
scci.get_firmware_upgrade_status, self.irmc_info,
upgrade_type=upgrade_type)
@mock.patch('scciclient.irmc.scci.requests')
def test_success_get_bios_firmware_status(self, mock_requests):
mock_requests.get.return_value = mock.Mock(
@mock.patch('scciclient.irmc.scci.requests.get')
def test_success_get_bios_firmware_status(self, mock_requests_get):
mock_requests_get.return_value = mock.Mock(
return_value='ok',
status_code=200,
text="""<?xml version="1.0" encoding="UTF-8"?>
@ -1069,9 +1069,9 @@ class SCCITestCase(testtools.TestCase):
self.assertEqual(expected_severity, result.find("./Severity").text)
self.assertEqual(expected_message, result.find("./Message").text)
@mock.patch('scciclient.irmc.scci.requests')
def test_fail_get_irmc_firmware_status(self, mock_requests):
mock_requests.get.return_value = mock.Mock(
@mock.patch('scciclient.irmc.scci.requests.get')
def test_fail_get_irmc_firmware_status(self, mock_requests_get):
mock_requests_get.return_value = mock.Mock(
status_code=404,
text="""</head>
<body>
@ -1092,9 +1092,9 @@ class SCCITestCase(testtools.TestCase):
scci.get_firmware_upgrade_status, self.irmc_info,
upgrade_type=upgrade_type)
@mock.patch('scciclient.irmc.scci.requests')
def test_success_get_irmc_firmware_status(self, mock_requests):
mock_requests.get.return_value = mock.Mock(
@mock.patch('scciclient.irmc.scci.requests.get')
def test_success_get_irmc_firmware_status(self, mock_requests_get):
mock_requests_get.return_value = mock.Mock(
return_value='ok',
status_code=200,
text="""<?xml version="1.0" encoding="UTF-8"?>