Add parameter to specify certification file

This patch adds functions & methods which have been used to connect
to iRMC via HTTPS to accept additional parameter.
With additional parameter, user is able to specify certification file.

Co-authored-by: Kobayashi Daisuke <kobayashi.da-06@fujitsu.com>
Change-Id: I51203e16207f8d3b1448b581942111bff60d0c86
This commit is contained in:
Vanou Ishii
2022-06-01 17:40:12 +09:00
parent 6308fb304f
commit 38792bc898
3 changed files with 56 additions and 25 deletions

View File

@@ -188,6 +188,10 @@ def elcm_request(irmc_info, method, path, **kwargs):
'irmc_port': 80 or 443, default is 443, 'irmc_port': 80 or 443, default is 443,
'irmc_auth_method': 'basic' or 'digest', default is 'basic', 'irmc_auth_method': 'basic' or 'digest', default is 'basic',
'irmc_client_timeout': timeout, default is 60, 'irmc_client_timeout': timeout, default is 60,
'irmc_verify_ca': Either a boolean, in which case it controls
whether we verify the server's TLS certificate,
or a string, in which case it must be a path to
a CA bundle to use. Defaults to ``True``.
... ...
} }
:param method: request method such as 'GET', 'POST' :param method: request method such as 'GET', 'POST'
@@ -203,6 +207,7 @@ def elcm_request(irmc_info, method, path, **kwargs):
userid = irmc_info['irmc_username'] userid = irmc_info['irmc_username']
password = irmc_info['irmc_password'] password = irmc_info['irmc_password']
client_timeout = irmc_info.get('irmc_client_timeout', 60) client_timeout = irmc_info.get('irmc_client_timeout', 60)
verify = irmc_info.get('irmc_verify_ca', True)
# Request headers, params, and data # Request headers, params, and data
headers = kwargs.get('headers', {'Accept': 'application/json'}) headers = kwargs.get('headers', {'Accept': 'application/json'})
@@ -229,7 +234,7 @@ def elcm_request(irmc_info, method, path, **kwargs):
headers=headers, headers=headers,
params=params, params=params,
data=data, data=data,
verify=False, verify=verify,
timeout=client_timeout, timeout=client_timeout,
allow_redirects=False, allow_redirects=False,
auth=auth_obj) auth=auth_obj)

View File

@@ -264,7 +264,7 @@ def get_share_type(share_type):
def scci_cmd(host, userid, password, cmd, port=443, auth_method='basic', def scci_cmd(host, userid, password, cmd, port=443, auth_method='basic',
client_timeout=60, do_async=True, **kwargs): client_timeout=60, do_async=True, verify=True, **kwargs):
"""execute SCCI command """execute SCCI command
This function calls SCCI server modules This function calls SCCI server modules
@@ -276,6 +276,10 @@ def scci_cmd(host, userid, password, cmd, port=443, auth_method='basic',
:param auth_method: irmc_username :param auth_method: irmc_username
:param client_timeout: timeout for SCCI operations :param client_timeout: timeout for SCCI operations
:param do_async: async call if True, sync call otherwise :param do_async: async call if True, sync call otherwise
:param verify: (optional) Either a boolean, in which case it
controls whether we verify the server's TLS certificate,
or a string, in which case it must be a path to
a CA bundle to use. Defaults to ``True``.
:returns: requests.Response from SCCI server :returns: requests.Response from SCCI server
:raises: SCCIInvalidInputError if port and/or auth_method params :raises: SCCIInvalidInputError if port and/or auth_method params
are invalid are invalid
@@ -310,12 +314,14 @@ def scci_cmd(host, userid, password, cmd, port=443, auth_method='basic',
if check_eject_cd_cmd(cmd): if check_eject_cd_cmd(cmd):
if not validate_params_cd_fd("cmd_cd", protocol, if not validate_params_cd_fd("cmd_cd", protocol,
host, auth_obj, host, auth_obj,
do_async, client_timeout): do_async, client_timeout,
verify):
return return
if check_eject_fd_cmd(cmd): if check_eject_fd_cmd(cmd):
if not validate_params_cd_fd("cmd_fd", protocol, if not validate_params_cd_fd("cmd_fd", protocol,
host, auth_obj, host, auth_obj,
do_async, client_timeout): do_async, client_timeout,
verify):
return return
data = cmd data = cmd
config_type = '/config' config_type = '/config'
@@ -323,7 +329,7 @@ def scci_cmd(host, userid, password, cmd, port=443, auth_method='basic',
r = requests.post(protocol + '://' + host + config_type, r = requests.post(protocol + '://' + host + config_type,
data=data, data=data,
headers=header, headers=header,
verify=False, verify=verify,
timeout=client_timeout, timeout=client_timeout,
allow_redirects=False, allow_redirects=False,
auth=auth_obj) auth=auth_obj)
@@ -373,7 +379,7 @@ def scci_cmd(host, userid, password, cmd, port=443, auth_method='basic',
def validate_params_cd_fd(cmd_type, protocol, host, auth_obj, def validate_params_cd_fd(cmd_type, protocol, host, auth_obj,
do_async, client_timeout): do_async, client_timeout, verify):
"""Validate parameters of CD/DVD or FD Image Virtual Media in iRMC """Validate parameters of CD/DVD or FD Image Virtual Media in iRMC
If one of parameters (ImageServer, ImageShareName or ImageName) set in If one of parameters (ImageServer, ImageShareName or ImageName) set in
@@ -386,6 +392,10 @@ def validate_params_cd_fd(cmd_type, protocol, host, auth_obj,
:param auth_obj: irmc userid/password :param auth_obj: irmc userid/password
:param do_async: async call if True, sync call otherwise :param do_async: async call if True, sync call otherwise
:param client_timeout: timeout for SCCI operations :param client_timeout: timeout for SCCI operations
:param verify: Either a boolean, in which case it
controls whether we verify the server's TLS certificate,
or a string, in which case it must be a path to
a CA bundle to use.
:return: False if one of param is null. Otherwise, returns True. :return: False if one of param is null. Otherwise, returns True.
""" """
@@ -404,7 +414,7 @@ def validate_params_cd_fd(cmd_type, protocol, host, auth_obj,
r = requests.get(protocol + '://' + host + '/iRMC_Settings.pre', r = requests.get(protocol + '://' + host + '/iRMC_Settings.pre',
params=param, params=param,
headers=header, headers=header,
verify=False, verify=verify,
timeout=client_timeout, timeout=client_timeout,
allow_redirects=False, allow_redirects=False,
auth=auth_obj) auth=auth_obj)
@@ -486,7 +496,7 @@ def check_eject_fd_cmd(xml_cmd):
def get_client(host, userid, password, port=443, auth_method='basic', def get_client(host, userid, password, port=443, auth_method='basic',
client_timeout=60, **kwargs): client_timeout=60, verify=True, **kwargs):
"""get SCCI command partial function """get SCCI command partial function
This function returns SCCI command partial function This function returns SCCI command partial function
@@ -496,12 +506,17 @@ def get_client(host, userid, password, port=443, auth_method='basic',
:param port: port number of iRMC :param port: port number of iRMC
:param auth_method: irmc_username :param auth_method: irmc_username
:param client_timeout: timeout for SCCI operations :param client_timeout: timeout for SCCI operations
:param verify: (optional) Either a boolean, in which case it
controls whether we verify the server's TLS certificate,
or a string, in which case it must be a path to
a CA bundle to use. Defaults to ``True``.
:returns: scci_cmd partial function which takes a SCCI command param :returns: scci_cmd partial function which takes a SCCI command param
""" """
return functools.partial(scci_cmd, host, userid, password, return functools.partial(scci_cmd, host, userid, password,
port=port, auth_method=auth_method, port=port, auth_method=auth_method,
client_timeout=client_timeout, **kwargs) client_timeout=client_timeout,
verify=verify, **kwargs)
def get_virtual_cd_set_params_cmd(remote_image_server, def get_virtual_cd_set_params_cmd(remote_image_server,
@@ -568,7 +583,7 @@ def get_virtual_fd_set_params_cmd(remote_image_server,
def get_report(host, userid, password, def get_report(host, userid, password,
port=443, auth_method='basic', client_timeout=60): port=443, auth_method='basic', client_timeout=60, verify=True):
"""get iRMC report """get iRMC report
This function returns iRMC report in XML format This function returns iRMC report in XML format
@@ -578,6 +593,10 @@ def get_report(host, userid, password,
:param port: port number of iRMC :param port: port number of iRMC
:param auth_method: irmc_username :param auth_method: irmc_username
:param client_timeout: timeout for SCCI operations :param client_timeout: timeout for SCCI operations
:param verify: (optional) Either a boolean, in which case it
controls whether we verify the server's TLS certificate,
or a string, in which case it must be a path to
a CA bundle to use. Defaults to ``True``.
:returns: root element of SCCI report :returns: root element of SCCI report
:raises: ISCCIInvalidInputError if port and/or auth_method params :raises: ISCCIInvalidInputError if port and/or auth_method params
are invalid are invalid
@@ -600,7 +619,7 @@ def get_report(host, userid, password,
try: try:
r = requests.get(protocol + '://' + host + '/report.xml', r = requests.get(protocol + '://' + host + '/report.xml',
verify=False, verify=verify,
timeout=(10, client_timeout), timeout=(10, client_timeout),
allow_redirects=False, allow_redirects=False,
auth=auth_obj) auth=auth_obj)
@@ -868,6 +887,10 @@ def get_firmware_upgrade_status(irmc_info, upgrade_type):
'irmc_port': 80 or 443, default is 443, 'irmc_port': 80 or 443, default is 443,
'irmc_auth_method': 'basic' or 'digest', default is 'digest', 'irmc_auth_method': 'basic' or 'digest', default is 'digest',
'irmc_client_timeout': timeout, default is 60, 'irmc_client_timeout': timeout, default is 60,
'irmc_verify_ca': (optional) Either a boolean, in which case it
controls whether we verify the server's TLS
certificate, or a string, in which case it must be
a path to a CA bundle to use. Defaults to ``True``.
... ...
} }
:param upgrade_type: flag to check upgrade with bios or irmc :param upgrade_type: flag to check upgrade with bios or irmc
@@ -882,6 +905,7 @@ def get_firmware_upgrade_status(irmc_info, upgrade_type):
port = irmc_info.get('irmc_port', 443) port = irmc_info.get('irmc_port', 443)
auth_method = irmc_info.get('irmc_auth_method', 'digest') auth_method = irmc_info.get('irmc_auth_method', 'digest')
client_timeout = irmc_info.get('irmc_client_timeout', 60) client_timeout = irmc_info.get('irmc_client_timeout', 60)
verify = irmc_info.get('irmc_verify_ca', True)
auth_obj = None auth_obj = None
try: try:
@@ -901,7 +925,7 @@ def get_firmware_upgrade_status(irmc_info, upgrade_type):
elif upgrade_type == 'irmc': elif upgrade_type == 'irmc':
upgrade_type = '/irmcprogress' upgrade_type = '/irmcprogress'
r = requests.get(protocol + '://' + host + upgrade_type, r = requests.get(protocol + '://' + host + upgrade_type,
verify=False, verify=verify,
timeout=(10, client_timeout), timeout=(10, client_timeout),
allow_redirects=False, allow_redirects=False,
auth=auth_obj) auth=auth_obj)
@@ -915,3 +939,5 @@ def get_firmware_upgrade_status(irmc_info, upgrade_type):
return upgrade_status_xml return upgrade_status_xml
except ET.ParseError as parse_error: except ET.ParseError as parse_error:
raise SCCIClientError(parse_error) raise SCCIClientError(parse_error)
except requests.RequestException as requests_exception:
raise SCCIClientError(requests_exception)

View File

@@ -223,7 +223,7 @@ class SCCITestCase(testtools.TestCase):
'https://' + self.irmc_address + '/config', 'https://' + self.irmc_address + '/config',
data=scci.POWER_ON, data=scci.POWER_ON,
headers={'Content-type': 'application/x-www-form-urlencoded'}, headers={'Content-type': 'application/x-www-form-urlencoded'},
verify=False, verify=True,
timeout=self.irmc_client_timeout, timeout=self.irmc_client_timeout,
allow_redirects=False, allow_redirects=False,
auth=mock_requests.auth.HTTPBasicAuth(self.irmc_username, auth=mock_requests.auth.HTTPBasicAuth(self.irmc_username,
@@ -1026,9 +1026,9 @@ class SCCITestCase(testtools.TestCase):
result = scci.get_raid_fgi_status(report_fake) result = scci.get_raid_fgi_status(report_fake)
self.assertEqual(result, fgi_status_expect) self.assertEqual(result, fgi_status_expect)
@mock.patch('scciclient.irmc.scci.requests') @mock.patch('scciclient.irmc.scci.requests.get')
def test_fail_get_bios_firmware_status(self, mock_requests): def test_fail_get_bios_firmware_status(self, mock_requests_get):
mock_requests.get.return_value = mock.Mock( mock_requests_get.return_value = mock.Mock(
status_code=404, status_code=404,
text="""</head> text="""</head>
<body> <body>
@@ -1049,9 +1049,9 @@ class SCCITestCase(testtools.TestCase):
scci.get_firmware_upgrade_status, self.irmc_info, scci.get_firmware_upgrade_status, self.irmc_info,
upgrade_type=upgrade_type) upgrade_type=upgrade_type)
@mock.patch('scciclient.irmc.scci.requests') @mock.patch('scciclient.irmc.scci.requests.get')
def test_success_get_bios_firmware_status(self, mock_requests): def test_success_get_bios_firmware_status(self, mock_requests_get):
mock_requests.get.return_value = mock.Mock( mock_requests_get.return_value = mock.Mock(
return_value='ok', return_value='ok',
status_code=200, status_code=200,
text="""<?xml version="1.0" encoding="UTF-8"?> text="""<?xml version="1.0" encoding="UTF-8"?>
@@ -1069,9 +1069,9 @@ class SCCITestCase(testtools.TestCase):
self.assertEqual(expected_severity, result.find("./Severity").text) self.assertEqual(expected_severity, result.find("./Severity").text)
self.assertEqual(expected_message, result.find("./Message").text) self.assertEqual(expected_message, result.find("./Message").text)
@mock.patch('scciclient.irmc.scci.requests') @mock.patch('scciclient.irmc.scci.requests.get')
def test_fail_get_irmc_firmware_status(self, mock_requests): def test_fail_get_irmc_firmware_status(self, mock_requests_get):
mock_requests.get.return_value = mock.Mock( mock_requests_get.return_value = mock.Mock(
status_code=404, status_code=404,
text="""</head> text="""</head>
<body> <body>
@@ -1092,9 +1092,9 @@ class SCCITestCase(testtools.TestCase):
scci.get_firmware_upgrade_status, self.irmc_info, scci.get_firmware_upgrade_status, self.irmc_info,
upgrade_type=upgrade_type) upgrade_type=upgrade_type)
@mock.patch('scciclient.irmc.scci.requests') @mock.patch('scciclient.irmc.scci.requests.get')
def test_success_get_irmc_firmware_status(self, mock_requests): def test_success_get_irmc_firmware_status(self, mock_requests_get):
mock_requests.get.return_value = mock.Mock( mock_requests_get.return_value = mock.Mock(
return_value='ok', return_value='ok',
status_code=200, status_code=200,
text="""<?xml version="1.0" encoding="UTF-8"?> text="""<?xml version="1.0" encoding="UTF-8"?>