diff --git a/requirements.txt b/requirements.txt index d767a91..52601ed 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,5 @@ pbr>=2.0.0 # Apache-2.0 Babel>=2.3.4 # BSD requests!=2.12.2,!=2.13.0,>=2.10.0 # Apache-2.0 six>=1.9.0 # MIT +oslo.utils>=3.20.0 # Apache-2.0 +oslo.serialization>=1.10.0 # Apache-2.0 diff --git a/scciclient/irmc/elcm.py b/scciclient/irmc/elcm.py new file mode 100644 index 0000000..5a9d773 --- /dev/null +++ b/scciclient/irmc/elcm.py @@ -0,0 +1,497 @@ +# Copyright 2016 FUJITSU LIMITED +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +eLCM functionality. +""" + +from oslo_serialization import jsonutils +import requests + +from scciclient.irmc import scci + + +""" +List of profile names +""" +PROFILE_BIOS_CONFIG = 'BiosConfig' + + +""" +List of URL paths for profiles +""" +URL_PATH_PROFILE_MGMT = '/rest/v1/Oem/eLCM/ProfileManagement/' + + +""" +List of request params for profiles +""" +PARAM_PATH_SYSTEM_CONFIG = 'Server/SystemConfig/' +PARAM_PATH_BIOS_CONFIG = PARAM_PATH_SYSTEM_CONFIG + PROFILE_BIOS_CONFIG + + +""" +Timeout values +""" +PROFILE_CREATE_TIMEOUT = 300 # 300 secs +PROFILE_SET_TIMEOUT = 300 # 300 secs + + +class ELCMInvalidResponse(scci.SCCIError): + """ELCMInvalidResponse""" + def __init__(self, message): + super(ELCMInvalidResponse, self).__init__(message) + + +class ELCMProfileNotFound(scci.SCCIError): + """ELCMProfileNotFound""" + def __init__(self, message): + super(ELCMProfileNotFound, self).__init__(message) + + +class ELCMSessionNotFound(scci.SCCIError): + """ELCMSessionNotFound""" + def __init__(self, message): + super(ELCMSessionNotFound, self).__init__(message) + + +def _parse_elcm_response_body_as_json(response): + """parse eLCM response body as json data + + eLCM response should be in form of: + _ + Key1: value1 <-- optional --> + Key2: value2 <-- optional --> + KeyN: valueN <-- optional --> + + - CRLF - + + JSON string + - + + :param response: eLCM response + :return: json object if success + :raise ELCMInvalidResponse: if the response does not contain valid + json data. + """ + try: + body = response.text + body_parts = body.split('\r\n') + if len(body_parts) > 0: + return jsonutils.loads(body_parts[-1]) + else: + return None + except (TypeError, ValueError): + raise ELCMInvalidResponse('eLCM response does not contain valid json ' + 'data. Response is "%s".' % body) + + +def elcm_request(irmc_info, method, path, **kwargs): + """send an eLCM request to the server + + :param irmc_info: dict of iRMC params to access the server node + { + 'irmc_address': host, + 'irmc_username': user_id, + 'irmc_password': password, + 'irmc_port': 80 or 443, default is 443, + 'irmc_auth_method': 'basic' or 'digest', default is 'basic', + 'irmc_client_timeout': timeout, default is 60, + ... + } + :param method: request method such as 'GET', 'POST' + :param path: url path for eLCM request + :returns: requests.Response from SCCI server + :raises SCCIInvalidInputError: if port and/or auth_method params + are invalid + :raises SCCIClientError: if SCCI failed + """ + host = irmc_info['irmc_address'] + port = irmc_info.get('irmc_port', 443) + auth_method = irmc_info.get('irmc_auth_method', 'basic') + userid = irmc_info['irmc_username'] + password = irmc_info['irmc_password'] + client_timeout = irmc_info.get('irmc_client_timeout', 60) + + # Request headers, params, and data + headers = kwargs.get('headers', {'Accept': 'application/json'}) + params = kwargs.get('params') + data = kwargs.get('data') + + auth_obj = None + try: + protocol = {80: 'http', 443: 'https'}[port] + auth_obj = { + 'basic': requests.auth.HTTPBasicAuth(userid, password), + 'digest': requests.auth.HTTPDigestAuth(userid, password) + }[auth_method.lower()] + + except KeyError: + raise scci.SCCIInvalidInputError( + ("Invalid port %(port)d or " + + "auth_method for method %(auth_method)s") % + {'port': port, 'auth_method': auth_method}) + + try: + r = requests.request(method, + protocol + '://' + host + path, + headers=headers, + params=params, + data=data, + verify=False, + timeout=client_timeout, + allow_redirects=False, + auth=auth_obj) + except requests.exceptions.RequestException as requests_exception: + raise scci.SCCIClientError(requests_exception) + + # Process status_code 401 + if r.status_code == 401: + raise scci.SCCIClientError('UNAUTHORIZED') + + return r + + +def elcm_profile_list(irmc_info): + """send an eLCM request to list all profiles + + :param irmc_info: node info + :returns: dict object of profiles if succeed + { + 'Links': + { + 'profileStore': + [ + { '@odata.id': id1 }, + { '@odata.id': id2 }, + { '@odata.id': idN }, + ] + } + } + :raises: SCCIClientError if SCCI failed + """ + # Send GET request to the server + resp = elcm_request(irmc_info, + method='GET', + path=URL_PATH_PROFILE_MGMT) + + if resp.status_code == 200: + return _parse_elcm_response_body_as_json(resp) + else: + raise scci.SCCIClientError(('Failed to list profiles with ' + 'error code %s' % resp.status_code)) + + +def elcm_profile_get(irmc_info, profile_name): + """send an eLCM request to get profile data + + :param irmc_info: node info + :param profile_name: name of profile + :returns: dict object of profile data if succeed + :raises: ELCMProfileNotFound if profile does not exist + :raises: SCCIClientError if SCCI failed + """ + # Send GET request to the server + resp = elcm_request(irmc_info, + method='GET', + path=URL_PATH_PROFILE_MGMT + profile_name) + + if resp.status_code == 200: + return _parse_elcm_response_body_as_json(resp) + elif resp.status_code == 404: + raise ELCMProfileNotFound('Profile "%s" not found ' + 'in the profile store.' % profile_name) + else: + raise scci.SCCIClientError(('Failed to get profile "%(profile)s" with ' + 'error code %(error)s' % + {'profile': profile_name, + 'error': resp.status_code})) + + +def elcm_profile_create(irmc_info, param_path): + """send an eLCM request to create profile + + To create a profile, a new session is spawned with status 'running'. + When profile is created completely, the session ends. + + :param irmc_info: node info + :param param_path: path of profile + :returns: dict object of session info if succeed + { + 'Session': + { + 'Id': id + 'Status': 'activated' + ... + } + } + :raises: SCCIClientError if SCCI failed + """ + # Send POST request to the server + # NOTE: This task may take time, so set a timeout + _irmc_info = dict(irmc_info) + _irmc_info['irmc_client_timeout'] = PROFILE_CREATE_TIMEOUT + + resp = elcm_request(_irmc_info, + method='POST', + path=URL_PATH_PROFILE_MGMT + 'get', + params={'PARAM_PATH': param_path}) + + if resp.status_code == 202: + return _parse_elcm_response_body_as_json(resp) + else: + raise scci.SCCIClientError(('Failed to create profile for path ' + '"%(param_path)s" with error code ' + '%(error)s' % + {'param_path': param_path, + 'error': resp.status_code})) + + +def elcm_profile_set(irmc_info, input_data): + """send an eLCM request to set param values + + To apply param values, a new session is spawned with status 'running'. + When values are applied or error, the session ends. + + :param irmc_info: node info + :param input_data: param values to apply, eg. + { + 'Server': + { + 'SystemConfig': + { + 'BiosConfig': + { + '@Processing': 'execute', + -- config data -- + } + } + } + } + :returns: dict object of session info if succeed + { + 'Session': + { + 'Id': id + 'Status': 'activated' + ... + } + } + :raises: SCCIClientError if SCCI failed + """ + # Prepare the data to apply + if isinstance(input_data, dict): + data = jsonutils.dumps(input_data) + else: + data = input_data + + # Send POST request to the server + # NOTE: This task may take time, so set a timeout + _irmc_info = dict(irmc_info) + _irmc_info['irmc_client_timeout'] = PROFILE_SET_TIMEOUT + + resp = elcm_request(_irmc_info, + method='POST', + path=URL_PATH_PROFILE_MGMT + 'set', + headers={'Content-type': + 'application/x-www-form-urlencoded'}, + data=data) + + if resp.status_code == 202: + return _parse_elcm_response_body_as_json(resp) + else: + raise scci.SCCIClientError(('Failed to apply param values with ' + 'error code %(error)s' % + {'error': resp.status_code})) + + +def elcm_profile_delete(irmc_info, profile_name): + """send an eLCM request to delete a profile + + :param irmc_info: node info + :param profile_name: name of profile + :raises: ELCMProfileNotFound if the profile does not exist + :raises: SCCIClientError if SCCI failed + """ + # Send DELETE request to the server + resp = elcm_request(irmc_info, + method='DELETE', + path=URL_PATH_PROFILE_MGMT + profile_name) + + if resp.status_code == 200: + # Profile deleted + return + elif resp.status_code == 404: + # Profile not found + raise ELCMProfileNotFound('Profile "%s" not found ' + 'in the profile store.' % profile_name) + else: + raise scci.SCCIClientError(('Failed to delete profile "%(profile)s" ' + 'with error code %(error)s' % + {'profile': profile_name, + 'error': resp.status_code})) + + +def elcm_session_list(irmc_info): + """send an eLCM request to list all sessions + + :param irmc_info: node info + :returns: dict object of sessions if succeed + { + 'SessionList': + { + 'Contains': + [ + { 'Id': id1, 'Name': name1 }, + { 'Id': id2, 'Name': name2 }, + { 'Id': idN, 'Name': nameN }, + ] + } + } + :raises: SCCIClientError if SCCI failed + """ + # Send GET request to the server + resp = elcm_request(irmc_info, + method='GET', + path='/sessionInformation/') + + if resp.status_code == 200: + return _parse_elcm_response_body_as_json(resp) + else: + raise scci.SCCIClientError(('Failed to list sessions with ' + 'error code %s' % resp.status_code)) + + +def elcm_session_get_status(irmc_info, session_id): + """send an eLCM request to get session status + + :param irmc_info: node info + :param session_id: session id + :returns: dict object of session info if succeed + { + 'Session': + { + 'Id': id + 'Status': status + ... + } + } + :raises: ELCMSessionNotFound if the session does not exist + :raises: SCCIClientError if SCCI failed + """ + # Send GET request to the server + resp = elcm_request(irmc_info, + method='GET', + path='/sessionInformation/%s/status' % session_id) + + if resp.status_code == 200: + return _parse_elcm_response_body_as_json(resp) + elif resp.status_code == 404: + raise ELCMSessionNotFound('Session "%s" does not exist' % session_id) + else: + raise scci.SCCIClientError(('Failed to get status of session ' + '"%(session)s" with error code %(error)s' % + {'session': session_id, + 'error': resp.status_code})) + + +def elcm_session_get_log(irmc_info, session_id): + """send an eLCM request to get session log + + :param irmc_info: node info + :param session_id: session id + :returns: dict object of session log if succeed + { + 'Session': + { + 'Id': id + ... + } + } + :raises: ELCMSessionNotFound if the session does not exist + :raises: SCCIClientError if SCCI failed + """ + # Send GET request to the server + resp = elcm_request(irmc_info, + method='GET', + path='/sessionInformation/%s/log' % session_id) + + if resp.status_code == 200: + return _parse_elcm_response_body_as_json(resp) + elif resp.status_code == 404: + raise ELCMSessionNotFound('Session "%s" does not exist' % session_id) + else: + raise scci.SCCIClientError(('Failed to get log of session ' + '"%(session)s" with error code %(error)s' % + {'session': session_id, + 'error': resp.status_code})) + + +def elcm_session_terminate(irmc_info, session_id): + """send an eLCM request to terminate a session + + :param irmc_info: node info + :param session_id: session id + :raises: ELCMSessionNotFound if the session does not exist + :raises: SCCIClientError if SCCI failed + """ + # Send DELETE request to the server + resp = elcm_request(irmc_info, + method='DELETE', + path='/sessionInformation/%s/terminate' % session_id) + + if resp.status_code == 200: + return + elif resp.status_code == 404: + raise ELCMSessionNotFound('Session "%s" does not exist' % session_id) + else: + raise scci.SCCIClientError(('Failed to terminate session ' + '"%(session)s" with error code %(error)s' % + {'session': session_id, + 'error': resp.status_code})) + + +def elcm_session_delete(irmc_info, session_id, terminate=False): + """send an eLCM request to remove a session from the session list + + :param irmc_info: node info + :param session_id: session id + :param terminate: a running session must be terminated before removing + :raises: ELCMSessionNotFound if the session does not exist + :raises: SCCIClientError if SCCI failed + """ + # Terminate the session first if needs to + if terminate: + # Get session status to check + session = elcm_session_get_status(irmc_info, session_id) + status = session['Session']['Status'] + + # Terminate session if it is activated or running + if status == 'running' or status == 'activated': + elcm_session_terminate(irmc_info, session_id) + + # Send DELETE request to the server + resp = elcm_request(irmc_info, + method='DELETE', + path='/sessionInformation/%s/remove' % session_id) + + if resp.status_code == 200: + return + elif resp.status_code == 404: + raise ELCMSessionNotFound('Session "%s" does not exist' % session_id) + else: + raise scci.SCCIClientError(('Failed to remove session ' + '"%(session)s" with error code %(error)s' % + {'session': session_id, + 'error': resp.status_code})) diff --git a/scciclient/tests/irmc/test_elcm.py b/scciclient/tests/irmc/test_elcm.py new file mode 100644 index 0000000..401155c --- /dev/null +++ b/scciclient/tests/irmc/test_elcm.py @@ -0,0 +1,686 @@ +# Copyright 2016 FUJITSU LIMITED +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Test class for iRMC eLCM functionality. +""" + +from oslo_utils import encodeutils +import requests +from requests_mock.contrib import fixture as rm_fixture +import testtools + +from scciclient.irmc import elcm +from scciclient.irmc import scci + + +class ELCMTestCase(testtools.TestCase): + """Tests for eLCM""" + + RESPONSE_TEMPLATE = ('Date: Mon, 07 Dec 2015 17:10:55 GMT\n' + 'Server: iRMC S4 Webserver\n' + 'Content-Length: 123\n' + 'Content-Type: application/json; charset=UTF-8\n' + '\r\n' + '{\n' + '%(json_text)s\n' + '}') + + def setUp(self): + super(ELCMTestCase, self).setUp() + + self.requests_mock = self.useFixture(rm_fixture.Fixture()) + + self.irmc_info = { + 'irmc_address': '10.124.196.159', + 'irmc_username': 'admin', + 'irmc_password': 'admin0', + 'irmc_port': 80, + 'irmc_auth_method': 'basic', + 'irmc_client_timeout': 60, + } + + def _create_server_url(self, path, port=None): + scheme = 'unknown' + + if port is None: + port = self.irmc_info['irmc_port'] + if port == 80: + scheme = 'http' + elif port == 443: + scheme = 'https' + + return ('%(scheme)s://%(addr)s%(path)s' % + {'scheme': scheme, + 'addr': self.irmc_info['irmc_address'], + 'path': path}) + + def _create_server_response(self, content): + response = requests.Response() + response._content = encodeutils.safe_encode(content) + response.encoding = 'utf-8' + return response + + def test__parse_elcm_response_body_as_json_empty(self): + response = self._create_server_response('') + self.assertRaises(elcm.ELCMInvalidResponse, + elcm._parse_elcm_response_body_as_json, + response=response) + + def test__parse_elcm_response_body_as_json_invalid(self): + content = 'abc123' + response = self._create_server_response(content) + self.assertRaises(elcm.ELCMInvalidResponse, + elcm._parse_elcm_response_body_as_json, + response=response) + + def test__parse_elcm_response_body_as_json_missing_empty_line(self): + content = ('key1:val1\nkey2:val2\n' + '{"1":1,"2":[123, "abc"],"3":3}') + response = self._create_server_response(content) + self.assertRaises(elcm.ELCMInvalidResponse, + elcm._parse_elcm_response_body_as_json, + response=response) + + def test__parse_elcm_response_body_as_json_ok(self): + content = ('key1:val1\nkey2:val2\n' + '\r\n' + '{"1":1,"2":[123, "abc"],"3":3}') + response = self._create_server_response(content) + result = elcm._parse_elcm_response_body_as_json(response) + + expected = { + "1": 1, + "2": [123, "abc"], + "3": 3 + } + self.assertEqual(expected, result) + + def test_elcm_request_protocol_http_ok(self): + irmc_info = dict(self.irmc_info) + irmc_info['irmc_port'] = 80 + + self.requests_mock.register_uri( + 'GET', + self._create_server_url(elcm.URL_PATH_PROFILE_MGMT, + port=80), + text=self.RESPONSE_TEMPLATE % {'json_text': 'abc123'}) + + response = elcm.elcm_request( + irmc_info, + method='GET', + path=elcm.URL_PATH_PROFILE_MGMT) + + self.assertEqual(200, response.status_code) + + expected = self.RESPONSE_TEMPLATE % {'json_text': 'abc123'} + self.assertEqual(expected, response.text) + + def test_elcm_request_protocol_https_ok(self): + irmc_info = dict(self.irmc_info) + irmc_info['irmc_port'] = 443 + + self.requests_mock.register_uri( + 'GET', + self._create_server_url(elcm.URL_PATH_PROFILE_MGMT, + port=443), + text=self.RESPONSE_TEMPLATE % {'json_text': 'abc123'}) + + response = elcm.elcm_request( + irmc_info, + method='GET', + path=elcm.URL_PATH_PROFILE_MGMT) + + self.assertEqual(200, response.status_code) + + expected = self.RESPONSE_TEMPLATE % {'json_text': 'abc123'} + self.assertEqual(expected, response.text) + + def test_elcm_request_unsupported_port(self): + irmc_info = dict(self.irmc_info) + irmc_info['irmc_port'] = 22 + + e = self.assertRaises(scci.SCCIInvalidInputError, + elcm.elcm_request, + irmc_info, + method='GET', + path=elcm.URL_PATH_PROFILE_MGMT) + + auth_method = self.irmc_info['irmc_auth_method'] + self.assertEqual((("Invalid port %(port)d or " + "auth_method for method %(auth_method)s") % + {'port': 22, + 'auth_method': auth_method}), str(e)) + + def test_elcm_request_auth_method_basic(self): + irmc_info = dict(self.irmc_info) + irmc_info['irmc_auth_method'] = 'basic' + + self.requests_mock.register_uri( + 'GET', + self._create_server_url(elcm.URL_PATH_PROFILE_MGMT), + status_code=200, + text='ok') + + response = elcm.elcm_request( + irmc_info, + method='GET', + path=elcm.URL_PATH_PROFILE_MGMT) + + self.assertEqual(200, response.status_code) + self.assertEqual('ok', response.text) + + def test_elcm_request_auth_method_digest(self): + irmc_info = dict(self.irmc_info) + irmc_info['irmc_auth_method'] = 'digest' + + self.requests_mock.register_uri( + 'GET', + self._create_server_url(elcm.URL_PATH_PROFILE_MGMT), + status_code=200, + text='ok') + + response = elcm.elcm_request( + irmc_info, + method='GET', + path=elcm.URL_PATH_PROFILE_MGMT) + + self.assertEqual(200, response.status_code) + self.assertEqual('ok', response.text) + + def test_elcm_request_unsupported_auth_method(self): + irmc_info = dict(self.irmc_info) + irmc_info['irmc_auth_method'] = 'unknown' + + e = self.assertRaises(scci.SCCIInvalidInputError, + elcm.elcm_request, + irmc_info, + method='GET', + path=elcm.URL_PATH_PROFILE_MGMT) + + port = self.irmc_info['irmc_port'] + self.assertEqual((("Invalid port %(port)d or " + "auth_method for method %(auth_method)s") % + {'port': port, + 'auth_method': 'unknown'}), str(e)) + + def test_elcm_request_auth_failed(self): + self.requests_mock.register_uri( + 'GET', + self._create_server_url(elcm.URL_PATH_PROFILE_MGMT), + status_code=401, + text='401 Unauthorized') + + e = self.assertRaises(scci.SCCIClientError, + elcm.elcm_request, + self.irmc_info, + method='GET', + path=elcm.URL_PATH_PROFILE_MGMT) + + self.assertEqual('UNAUTHORIZED', str(e)) + + def test_elcm_profile_get_not_found(self): + profile_name = elcm.PROFILE_BIOS_CONFIG + self.requests_mock.register_uri( + 'GET', + self._create_server_url((elcm.URL_PATH_PROFILE_MGMT + + profile_name)), + status_code=404) + + self.assertRaises(elcm.ELCMProfileNotFound, + elcm.elcm_profile_get, + self.irmc_info, + profile_name=profile_name) + + def test_elcm_profile_list_failed(self): + self.requests_mock.register_uri( + 'GET', + self._create_server_url(elcm.URL_PATH_PROFILE_MGMT), + status_code=503) + + self.assertRaises(scci.SCCIClientError, + elcm.elcm_profile_list, + self.irmc_info) + + def test_elcm_profile_list_ok(self): + self.requests_mock.register_uri( + 'GET', + self._create_server_url(elcm.URL_PATH_PROFILE_MGMT), + text=(self.RESPONSE_TEMPLATE % { + 'json_text': + '"Links":{' + ' "profileStore":[' + ' {"@odata.id": "id1"},' + ' {"@odata.id": "id2"}' + ' ]' + '}'})) + + result = elcm.elcm_profile_list(self.irmc_info) + + expected = { + "Links": { + "profileStore": [ + {"@odata.id": "id1"}, + {"@odata.id": "id2"} + ] + } + } + self.assertEqual(expected, result) + + def test_elcm_profile_get_failed(self): + profile_name = elcm.PROFILE_BIOS_CONFIG + self.requests_mock.register_uri( + 'GET', + self._create_server_url((elcm.URL_PATH_PROFILE_MGMT + + profile_name)), + status_code=500) + + self.assertRaises(scci.SCCIClientError, + elcm.elcm_profile_get, + self.irmc_info, + profile_name=profile_name) + + def test_elcm_profile_get_ok(self): + profile_name = elcm.PROFILE_BIOS_CONFIG + self.requests_mock.register_uri( + 'GET', + self._create_server_url((elcm.URL_PATH_PROFILE_MGMT + + profile_name)), + text=(self.RESPONSE_TEMPLATE % { + 'json_text': + '"Server":{' + ' "SystemConfig":{' + ' "BiosConfig":{' + ' "key":"val"' + ' }' + ' }' + '}'})) + + result = elcm.elcm_profile_get( + self.irmc_info, + profile_name=profile_name) + + expected = { + "Server": { + "SystemConfig": { + "BiosConfig": { + "key": "val" + } + } + } + } + self.assertEqual(expected, result) + + def test_elcm_profile_create_failed(self): + param_path = elcm.PARAM_PATH_BIOS_CONFIG + self.requests_mock.register_uri( + 'POST', + self._create_server_url(elcm.URL_PATH_PROFILE_MGMT + 'get'), + status_code=200) # Success code is 202 + + self.assertRaises(scci.SCCIClientError, + elcm.elcm_profile_create, + self.irmc_info, + param_path=param_path) + + def test_elcm_profile_create_ok(self): + param_path = elcm.PARAM_PATH_BIOS_CONFIG + self.requests_mock.register_uri( + 'POST', + self._create_server_url(elcm.URL_PATH_PROFILE_MGMT + 'get'), + status_code=202, # Success code is 202 + text=(self.RESPONSE_TEMPLATE % { + 'json_text': + '"Session":{' + ' "Id": 123,' + ' "Status": "activated"' + '}'})) + + result = elcm.elcm_profile_create(self.irmc_info, + param_path=param_path) + + expected = { + "Session": { + "Id": 123, + "Status": "activated", + } + } + self.assertEqual(expected, result) + + def test_elcm_profile_set_failed(self): + input_data = { + "Server": { + "SystemConfig": { + "BiosConfig": { + "key": "val" + } + } + } + } + self.requests_mock.register_uri( + 'POST', + self._create_server_url(elcm.URL_PATH_PROFILE_MGMT + 'set'), + status_code=200) # Success code is 202 + + self.assertRaises(scci.SCCIClientError, + elcm.elcm_profile_set, + self.irmc_info, + input_data=input_data) + + def test_elcm_profile_set_ok(self): + input_data = { + "Server": { + "SystemConfig": { + "BiosConfig": { + "key": "val" + } + } + } + } + self.requests_mock.register_uri( + 'POST', + self._create_server_url(elcm.URL_PATH_PROFILE_MGMT + 'set'), + status_code=202, # Success code is 202 + text=(self.RESPONSE_TEMPLATE % { + 'json_text': + '"Session":{' + ' "Id": 123,' + ' "Status": "activated"' + '}'})) + + result = elcm.elcm_profile_set(self.irmc_info, + input_data=input_data) + + expected = { + "Session": { + "Id": 123, + "Status": "activated", + } + } + self.assertEqual(expected, result) + + def test_elcm_profile_delete_not_found(self): + profile_name = elcm.PROFILE_BIOS_CONFIG + self.requests_mock.register_uri( + 'DELETE', + self._create_server_url((elcm.URL_PATH_PROFILE_MGMT + + profile_name)), + status_code=404) + + self.assertRaises(elcm.ELCMProfileNotFound, + elcm.elcm_profile_delete, + self.irmc_info, + profile_name=profile_name) + + def test_elcm_profile_delete_failed(self): + profile_name = elcm.PROFILE_BIOS_CONFIG + self.requests_mock.register_uri( + 'DELETE', + self._create_server_url((elcm.URL_PATH_PROFILE_MGMT + + profile_name)), + status_code=500) + + self.assertRaises(scci.SCCIClientError, + elcm.elcm_profile_delete, + self.irmc_info, + profile_name=profile_name) + + def test_elcm_profile_delete_ok(self): + profile_name = elcm.PROFILE_BIOS_CONFIG + self.requests_mock.register_uri( + 'DELETE', + self._create_server_url((elcm.URL_PATH_PROFILE_MGMT + + profile_name)), + text='ok') + + result = elcm.elcm_profile_delete(self.irmc_info, + profile_name=profile_name) + + self.assertIsNone(result) + + def test_elcm_session_list_failed(self): + self.requests_mock.register_uri( + 'GET', + self._create_server_url('/sessionInformation/'), + status_code=500) + + self.assertRaises(scci.SCCIClientError, + elcm.elcm_session_list, + self.irmc_info) + + def test_elcm_session_list_empty(self): + self.requests_mock.register_uri( + 'GET', + self._create_server_url('/sessionInformation/'), + text=(self.RESPONSE_TEMPLATE % { + 'json_text': + '"SessionList":{' + ' "Contains":[' + ' ]' + '}'})) + + result = elcm.elcm_session_list(self.irmc_info) + + expected = { + "SessionList": { + "Contains": [ + ] + } + } + self.assertEqual(expected, result) + + def test_elcm_session_list_ok(self): + self.requests_mock.register_uri( + 'GET', + self._create_server_url('/sessionInformation/'), + text=(self.RESPONSE_TEMPLATE % { + 'json_text': + '"SessionList":{' + ' "Contains":[' + ' { "Id": 1, "Name": "name1" },' + ' { "Id": 2, "Name": "name2" },' + ' { "Id": 3, "Name": "name3" }' + ' ]' + '}'})) + + result = elcm.elcm_session_list(self.irmc_info) + + expected = { + "SessionList": { + "Contains": [ + {"Id": 1, "Name": "name1"}, + {"Id": 2, "Name": "name2"}, + {"Id": 3, "Name": "name3"} + ] + } + } + self.assertEqual(expected, result) + + def test_elcm_session_get_status_not_found(self): + session_id = 123 + self.requests_mock.register_uri( + 'GET', + self._create_server_url(('/sessionInformation/%s/status' % + session_id)), + status_code=404) + + self.assertRaises(elcm.ELCMSessionNotFound, + elcm.elcm_session_get_status, + self.irmc_info, + session_id=session_id) + + def test_elcm_session_get_status_failed(self): + session_id = 123 + self.requests_mock.register_uri( + 'GET', + self._create_server_url(('/sessionInformation/%s/status' % + session_id)), + status_code=500) + + self.assertRaises(scci.SCCIClientError, + elcm.elcm_session_get_status, + self.irmc_info, + session_id=session_id) + + def test_elcm_session_get_status_ok(self): + session_id = 123 + self.requests_mock.register_uri( + 'GET', + self._create_server_url(('/sessionInformation/%s/status' % + session_id)), + text=(self.RESPONSE_TEMPLATE % { + 'json_text': + '"Session":{' + ' "Id": 123,' + ' "Status": "abc123"' + '}'})) + + result = elcm.elcm_session_get_status(self.irmc_info, + session_id=session_id) + + expected = { + "Session": { + "Id": 123, + "Status": "abc123", + } + } + self.assertEqual(expected, result) + + def test_elcm_session_get_log_not_found(self): + session_id = 123 + self.requests_mock.register_uri( + 'GET', + self._create_server_url(('/sessionInformation/%s/log' % + session_id)), + status_code=404) + + self.assertRaises(elcm.ELCMSessionNotFound, + elcm.elcm_session_get_log, + self.irmc_info, + session_id=session_id) + + def test_elcm_session_get_log_failed(self): + session_id = 123 + self.requests_mock.register_uri( + 'GET', + self._create_server_url(('/sessionInformation/%s/log' % + session_id)), + status_code=500) + + self.assertRaises(scci.SCCIClientError, + elcm.elcm_session_get_log, + self.irmc_info, + session_id=session_id) + + def test_elcm_session_get_log_ok(self): + session_id = 123 + self.requests_mock.register_uri( + 'GET', + self._create_server_url(('/sessionInformation/%s/log' % + session_id)), + text=(self.RESPONSE_TEMPLATE % { + 'json_text': + '"Session":{' + ' "Id": 123,' + ' "A_Param": "abc123"' + '}'})) + + result = elcm.elcm_session_get_log(self.irmc_info, + session_id=session_id) + + expected = { + "Session": { + "Id": 123, + "A_Param": "abc123", + } + } + self.assertEqual(expected, result) + + def test_elcm_session_terminate_not_found(self): + session_id = 123 + self.requests_mock.register_uri( + 'DELETE', + self._create_server_url(('/sessionInformation/%s/terminate' % + session_id)), + status_code=404) + + self.assertRaises(elcm.ELCMSessionNotFound, + elcm.elcm_session_terminate, + self.irmc_info, + session_id=session_id) + + def test_elcm_session_terminate_failed(self): + session_id = 123 + self.requests_mock.register_uri( + 'DELETE', + self._create_server_url(('/sessionInformation/%s/terminate' % + session_id)), + status_code=500) + + self.assertRaises(scci.SCCIClientError, + elcm.elcm_session_terminate, + self.irmc_info, + session_id=session_id) + + def test_elcm_session_terminate_ok(self): + session_id = 123 + self.requests_mock.register_uri( + 'DELETE', + self._create_server_url(('/sessionInformation/%s/terminate' % + session_id)), + text='ok') + + result = elcm.elcm_session_terminate(self.irmc_info, + session_id=session_id) + + self.assertIsNone(result) + + def test_elcm_session_delete_not_found(self): + session_id = 123 + self.requests_mock.register_uri( + 'DELETE', + self._create_server_url(('/sessionInformation/%s/remove' % + session_id)), + status_code=404) + + self.assertRaises(elcm.ELCMSessionNotFound, + elcm.elcm_session_delete, + self.irmc_info, + session_id=session_id) + + def test_elcm_session_delete_failed(self): + session_id = 123 + self.requests_mock.register_uri( + 'DELETE', + self._create_server_url(('/sessionInformation/%s/remove' % + session_id)), + status_code=500) + + self.assertRaises(scci.SCCIClientError, + elcm.elcm_session_delete, + self.irmc_info, + session_id=session_id) + + def test_elcm_session_delete_ok(self): + session_id = 123 + self.requests_mock.register_uri( + 'DELETE', + self._create_server_url(('/sessionInformation/%s/remove' % + session_id)), + text='ok') + + result = elcm.elcm_session_delete(self.irmc_info, + session_id=session_id) + + self.assertIsNone(result)