Implement eLCM functionality for iRMC SCCI

Implemented:
- elcm_request: base eLCM request handling

- elcm_profile_list: list existing profiles in the profile store
- elcm_profile_get: get a profile from the profile store
- elcm_profile_create: create a profile in the profile store
- elcm_profile_set: set value data for a profile
- elcm_profile_delete: delete a profile from the profile store

- elcm_session_list: list all current sessions
- elcm_session_get_status: get status of a session
- elcm_session_get_log: get log data of a session
- elcm_session_terminate: terminate a running session
- elcm_session_delete: delete a session

Related-Bug: #1639688

Change-Id: I03df3780978f35609c7b423c60b06e0e09a6568b
This commit is contained in:
Dao Cong Tien
2016-12-26 19:02:19 +07:00
parent dade813f11
commit 689212e270
3 changed files with 1185 additions and 0 deletions

View File

@@ -6,3 +6,5 @@ pbr>=2.0.0 # Apache-2.0
Babel>=2.3.4 # BSD
requests!=2.12.2,!=2.13.0,>=2.10.0 # Apache-2.0
six>=1.9.0 # MIT
oslo.utils>=3.20.0 # Apache-2.0
oslo.serialization>=1.10.0 # Apache-2.0

497
scciclient/irmc/elcm.py Normal file
View File

@@ -0,0 +1,497 @@
# Copyright 2016 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
eLCM functionality.
"""
from oslo_serialization import jsonutils
import requests
from scciclient.irmc import scci
"""
List of profile names
"""
PROFILE_BIOS_CONFIG = 'BiosConfig'
"""
List of URL paths for profiles
"""
URL_PATH_PROFILE_MGMT = '/rest/v1/Oem/eLCM/ProfileManagement/'
"""
List of request params for profiles
"""
PARAM_PATH_SYSTEM_CONFIG = 'Server/SystemConfig/'
PARAM_PATH_BIOS_CONFIG = PARAM_PATH_SYSTEM_CONFIG + PROFILE_BIOS_CONFIG
"""
Timeout values
"""
PROFILE_CREATE_TIMEOUT = 300 # 300 secs
PROFILE_SET_TIMEOUT = 300 # 300 secs
class ELCMInvalidResponse(scci.SCCIError):
"""ELCMInvalidResponse"""
def __init__(self, message):
super(ELCMInvalidResponse, self).__init__(message)
class ELCMProfileNotFound(scci.SCCIError):
"""ELCMProfileNotFound"""
def __init__(self, message):
super(ELCMProfileNotFound, self).__init__(message)
class ELCMSessionNotFound(scci.SCCIError):
"""ELCMSessionNotFound"""
def __init__(self, message):
super(ELCMSessionNotFound, self).__init__(message)
def _parse_elcm_response_body_as_json(response):
"""parse eLCM response body as json data
eLCM response should be in form of:
_
Key1: value1 <-- optional -->
Key2: value2 <-- optional -->
KeyN: valueN <-- optional -->
- CRLF -
JSON string
-
:param response: eLCM response
:return: json object if success
:raise ELCMInvalidResponse: if the response does not contain valid
json data.
"""
try:
body = response.text
body_parts = body.split('\r\n')
if len(body_parts) > 0:
return jsonutils.loads(body_parts[-1])
else:
return None
except (TypeError, ValueError):
raise ELCMInvalidResponse('eLCM response does not contain valid json '
'data. Response is "%s".' % body)
def elcm_request(irmc_info, method, path, **kwargs):
"""send an eLCM request to the server
:param irmc_info: dict of iRMC params to access the server node
{
'irmc_address': host,
'irmc_username': user_id,
'irmc_password': password,
'irmc_port': 80 or 443, default is 443,
'irmc_auth_method': 'basic' or 'digest', default is 'basic',
'irmc_client_timeout': timeout, default is 60,
...
}
:param method: request method such as 'GET', 'POST'
:param path: url path for eLCM request
:returns: requests.Response from SCCI server
:raises SCCIInvalidInputError: if port and/or auth_method params
are invalid
:raises SCCIClientError: if SCCI failed
"""
host = irmc_info['irmc_address']
port = irmc_info.get('irmc_port', 443)
auth_method = irmc_info.get('irmc_auth_method', 'basic')
userid = irmc_info['irmc_username']
password = irmc_info['irmc_password']
client_timeout = irmc_info.get('irmc_client_timeout', 60)
# Request headers, params, and data
headers = kwargs.get('headers', {'Accept': 'application/json'})
params = kwargs.get('params')
data = kwargs.get('data')
auth_obj = None
try:
protocol = {80: 'http', 443: 'https'}[port]
auth_obj = {
'basic': requests.auth.HTTPBasicAuth(userid, password),
'digest': requests.auth.HTTPDigestAuth(userid, password)
}[auth_method.lower()]
except KeyError:
raise scci.SCCIInvalidInputError(
("Invalid port %(port)d or " +
"auth_method for method %(auth_method)s") %
{'port': port, 'auth_method': auth_method})
try:
r = requests.request(method,
protocol + '://' + host + path,
headers=headers,
params=params,
data=data,
verify=False,
timeout=client_timeout,
allow_redirects=False,
auth=auth_obj)
except requests.exceptions.RequestException as requests_exception:
raise scci.SCCIClientError(requests_exception)
# Process status_code 401
if r.status_code == 401:
raise scci.SCCIClientError('UNAUTHORIZED')
return r
def elcm_profile_list(irmc_info):
"""send an eLCM request to list all profiles
:param irmc_info: node info
:returns: dict object of profiles if succeed
{
'Links':
{
'profileStore':
[
{ '@odata.id': id1 },
{ '@odata.id': id2 },
{ '@odata.id': idN },
]
}
}
:raises: SCCIClientError if SCCI failed
"""
# Send GET request to the server
resp = elcm_request(irmc_info,
method='GET',
path=URL_PATH_PROFILE_MGMT)
if resp.status_code == 200:
return _parse_elcm_response_body_as_json(resp)
else:
raise scci.SCCIClientError(('Failed to list profiles with '
'error code %s' % resp.status_code))
def elcm_profile_get(irmc_info, profile_name):
"""send an eLCM request to get profile data
:param irmc_info: node info
:param profile_name: name of profile
:returns: dict object of profile data if succeed
:raises: ELCMProfileNotFound if profile does not exist
:raises: SCCIClientError if SCCI failed
"""
# Send GET request to the server
resp = elcm_request(irmc_info,
method='GET',
path=URL_PATH_PROFILE_MGMT + profile_name)
if resp.status_code == 200:
return _parse_elcm_response_body_as_json(resp)
elif resp.status_code == 404:
raise ELCMProfileNotFound('Profile "%s" not found '
'in the profile store.' % profile_name)
else:
raise scci.SCCIClientError(('Failed to get profile "%(profile)s" with '
'error code %(error)s' %
{'profile': profile_name,
'error': resp.status_code}))
def elcm_profile_create(irmc_info, param_path):
"""send an eLCM request to create profile
To create a profile, a new session is spawned with status 'running'.
When profile is created completely, the session ends.
:param irmc_info: node info
:param param_path: path of profile
:returns: dict object of session info if succeed
{
'Session':
{
'Id': id
'Status': 'activated'
...
}
}
:raises: SCCIClientError if SCCI failed
"""
# Send POST request to the server
# NOTE: This task may take time, so set a timeout
_irmc_info = dict(irmc_info)
_irmc_info['irmc_client_timeout'] = PROFILE_CREATE_TIMEOUT
resp = elcm_request(_irmc_info,
method='POST',
path=URL_PATH_PROFILE_MGMT + 'get',
params={'PARAM_PATH': param_path})
if resp.status_code == 202:
return _parse_elcm_response_body_as_json(resp)
else:
raise scci.SCCIClientError(('Failed to create profile for path '
'"%(param_path)s" with error code '
'%(error)s' %
{'param_path': param_path,
'error': resp.status_code}))
def elcm_profile_set(irmc_info, input_data):
"""send an eLCM request to set param values
To apply param values, a new session is spawned with status 'running'.
When values are applied or error, the session ends.
:param irmc_info: node info
:param input_data: param values to apply, eg.
{
'Server':
{
'SystemConfig':
{
'BiosConfig':
{
'@Processing': 'execute',
-- config data --
}
}
}
}
:returns: dict object of session info if succeed
{
'Session':
{
'Id': id
'Status': 'activated'
...
}
}
:raises: SCCIClientError if SCCI failed
"""
# Prepare the data to apply
if isinstance(input_data, dict):
data = jsonutils.dumps(input_data)
else:
data = input_data
# Send POST request to the server
# NOTE: This task may take time, so set a timeout
_irmc_info = dict(irmc_info)
_irmc_info['irmc_client_timeout'] = PROFILE_SET_TIMEOUT
resp = elcm_request(_irmc_info,
method='POST',
path=URL_PATH_PROFILE_MGMT + 'set',
headers={'Content-type':
'application/x-www-form-urlencoded'},
data=data)
if resp.status_code == 202:
return _parse_elcm_response_body_as_json(resp)
else:
raise scci.SCCIClientError(('Failed to apply param values with '
'error code %(error)s' %
{'error': resp.status_code}))
def elcm_profile_delete(irmc_info, profile_name):
"""send an eLCM request to delete a profile
:param irmc_info: node info
:param profile_name: name of profile
:raises: ELCMProfileNotFound if the profile does not exist
:raises: SCCIClientError if SCCI failed
"""
# Send DELETE request to the server
resp = elcm_request(irmc_info,
method='DELETE',
path=URL_PATH_PROFILE_MGMT + profile_name)
if resp.status_code == 200:
# Profile deleted
return
elif resp.status_code == 404:
# Profile not found
raise ELCMProfileNotFound('Profile "%s" not found '
'in the profile store.' % profile_name)
else:
raise scci.SCCIClientError(('Failed to delete profile "%(profile)s" '
'with error code %(error)s' %
{'profile': profile_name,
'error': resp.status_code}))
def elcm_session_list(irmc_info):
"""send an eLCM request to list all sessions
:param irmc_info: node info
:returns: dict object of sessions if succeed
{
'SessionList':
{
'Contains':
[
{ 'Id': id1, 'Name': name1 },
{ 'Id': id2, 'Name': name2 },
{ 'Id': idN, 'Name': nameN },
]
}
}
:raises: SCCIClientError if SCCI failed
"""
# Send GET request to the server
resp = elcm_request(irmc_info,
method='GET',
path='/sessionInformation/')
if resp.status_code == 200:
return _parse_elcm_response_body_as_json(resp)
else:
raise scci.SCCIClientError(('Failed to list sessions with '
'error code %s' % resp.status_code))
def elcm_session_get_status(irmc_info, session_id):
"""send an eLCM request to get session status
:param irmc_info: node info
:param session_id: session id
:returns: dict object of session info if succeed
{
'Session':
{
'Id': id
'Status': status
...
}
}
:raises: ELCMSessionNotFound if the session does not exist
:raises: SCCIClientError if SCCI failed
"""
# Send GET request to the server
resp = elcm_request(irmc_info,
method='GET',
path='/sessionInformation/%s/status' % session_id)
if resp.status_code == 200:
return _parse_elcm_response_body_as_json(resp)
elif resp.status_code == 404:
raise ELCMSessionNotFound('Session "%s" does not exist' % session_id)
else:
raise scci.SCCIClientError(('Failed to get status of session '
'"%(session)s" with error code %(error)s' %
{'session': session_id,
'error': resp.status_code}))
def elcm_session_get_log(irmc_info, session_id):
"""send an eLCM request to get session log
:param irmc_info: node info
:param session_id: session id
:returns: dict object of session log if succeed
{
'Session':
{
'Id': id
...
}
}
:raises: ELCMSessionNotFound if the session does not exist
:raises: SCCIClientError if SCCI failed
"""
# Send GET request to the server
resp = elcm_request(irmc_info,
method='GET',
path='/sessionInformation/%s/log' % session_id)
if resp.status_code == 200:
return _parse_elcm_response_body_as_json(resp)
elif resp.status_code == 404:
raise ELCMSessionNotFound('Session "%s" does not exist' % session_id)
else:
raise scci.SCCIClientError(('Failed to get log of session '
'"%(session)s" with error code %(error)s' %
{'session': session_id,
'error': resp.status_code}))
def elcm_session_terminate(irmc_info, session_id):
"""send an eLCM request to terminate a session
:param irmc_info: node info
:param session_id: session id
:raises: ELCMSessionNotFound if the session does not exist
:raises: SCCIClientError if SCCI failed
"""
# Send DELETE request to the server
resp = elcm_request(irmc_info,
method='DELETE',
path='/sessionInformation/%s/terminate' % session_id)
if resp.status_code == 200:
return
elif resp.status_code == 404:
raise ELCMSessionNotFound('Session "%s" does not exist' % session_id)
else:
raise scci.SCCIClientError(('Failed to terminate session '
'"%(session)s" with error code %(error)s' %
{'session': session_id,
'error': resp.status_code}))
def elcm_session_delete(irmc_info, session_id, terminate=False):
"""send an eLCM request to remove a session from the session list
:param irmc_info: node info
:param session_id: session id
:param terminate: a running session must be terminated before removing
:raises: ELCMSessionNotFound if the session does not exist
:raises: SCCIClientError if SCCI failed
"""
# Terminate the session first if needs to
if terminate:
# Get session status to check
session = elcm_session_get_status(irmc_info, session_id)
status = session['Session']['Status']
# Terminate session if it is activated or running
if status == 'running' or status == 'activated':
elcm_session_terminate(irmc_info, session_id)
# Send DELETE request to the server
resp = elcm_request(irmc_info,
method='DELETE',
path='/sessionInformation/%s/remove' % session_id)
if resp.status_code == 200:
return
elif resp.status_code == 404:
raise ELCMSessionNotFound('Session "%s" does not exist' % session_id)
else:
raise scci.SCCIClientError(('Failed to remove session '
'"%(session)s" with error code %(error)s' %
{'session': session_id,
'error': resp.status_code}))

View File

@@ -0,0 +1,686 @@
# Copyright 2016 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test class for iRMC eLCM functionality.
"""
from oslo_utils import encodeutils
import requests
from requests_mock.contrib import fixture as rm_fixture
import testtools
from scciclient.irmc import elcm
from scciclient.irmc import scci
class ELCMTestCase(testtools.TestCase):
"""Tests for eLCM"""
RESPONSE_TEMPLATE = ('Date: Mon, 07 Dec 2015 17:10:55 GMT\n'
'Server: iRMC S4 Webserver\n'
'Content-Length: 123\n'
'Content-Type: application/json; charset=UTF-8\n'
'\r\n'
'{\n'
'%(json_text)s\n'
'}')
def setUp(self):
super(ELCMTestCase, self).setUp()
self.requests_mock = self.useFixture(rm_fixture.Fixture())
self.irmc_info = {
'irmc_address': '10.124.196.159',
'irmc_username': 'admin',
'irmc_password': 'admin0',
'irmc_port': 80,
'irmc_auth_method': 'basic',
'irmc_client_timeout': 60,
}
def _create_server_url(self, path, port=None):
scheme = 'unknown'
if port is None:
port = self.irmc_info['irmc_port']
if port == 80:
scheme = 'http'
elif port == 443:
scheme = 'https'
return ('%(scheme)s://%(addr)s%(path)s' %
{'scheme': scheme,
'addr': self.irmc_info['irmc_address'],
'path': path})
def _create_server_response(self, content):
response = requests.Response()
response._content = encodeutils.safe_encode(content)
response.encoding = 'utf-8'
return response
def test__parse_elcm_response_body_as_json_empty(self):
response = self._create_server_response('')
self.assertRaises(elcm.ELCMInvalidResponse,
elcm._parse_elcm_response_body_as_json,
response=response)
def test__parse_elcm_response_body_as_json_invalid(self):
content = 'abc123'
response = self._create_server_response(content)
self.assertRaises(elcm.ELCMInvalidResponse,
elcm._parse_elcm_response_body_as_json,
response=response)
def test__parse_elcm_response_body_as_json_missing_empty_line(self):
content = ('key1:val1\nkey2:val2\n'
'{"1":1,"2":[123, "abc"],"3":3}')
response = self._create_server_response(content)
self.assertRaises(elcm.ELCMInvalidResponse,
elcm._parse_elcm_response_body_as_json,
response=response)
def test__parse_elcm_response_body_as_json_ok(self):
content = ('key1:val1\nkey2:val2\n'
'\r\n'
'{"1":1,"2":[123, "abc"],"3":3}')
response = self._create_server_response(content)
result = elcm._parse_elcm_response_body_as_json(response)
expected = {
"1": 1,
"2": [123, "abc"],
"3": 3
}
self.assertEqual(expected, result)
def test_elcm_request_protocol_http_ok(self):
irmc_info = dict(self.irmc_info)
irmc_info['irmc_port'] = 80
self.requests_mock.register_uri(
'GET',
self._create_server_url(elcm.URL_PATH_PROFILE_MGMT,
port=80),
text=self.RESPONSE_TEMPLATE % {'json_text': 'abc123'})
response = elcm.elcm_request(
irmc_info,
method='GET',
path=elcm.URL_PATH_PROFILE_MGMT)
self.assertEqual(200, response.status_code)
expected = self.RESPONSE_TEMPLATE % {'json_text': 'abc123'}
self.assertEqual(expected, response.text)
def test_elcm_request_protocol_https_ok(self):
irmc_info = dict(self.irmc_info)
irmc_info['irmc_port'] = 443
self.requests_mock.register_uri(
'GET',
self._create_server_url(elcm.URL_PATH_PROFILE_MGMT,
port=443),
text=self.RESPONSE_TEMPLATE % {'json_text': 'abc123'})
response = elcm.elcm_request(
irmc_info,
method='GET',
path=elcm.URL_PATH_PROFILE_MGMT)
self.assertEqual(200, response.status_code)
expected = self.RESPONSE_TEMPLATE % {'json_text': 'abc123'}
self.assertEqual(expected, response.text)
def test_elcm_request_unsupported_port(self):
irmc_info = dict(self.irmc_info)
irmc_info['irmc_port'] = 22
e = self.assertRaises(scci.SCCIInvalidInputError,
elcm.elcm_request,
irmc_info,
method='GET',
path=elcm.URL_PATH_PROFILE_MGMT)
auth_method = self.irmc_info['irmc_auth_method']
self.assertEqual((("Invalid port %(port)d or "
"auth_method for method %(auth_method)s") %
{'port': 22,
'auth_method': auth_method}), str(e))
def test_elcm_request_auth_method_basic(self):
irmc_info = dict(self.irmc_info)
irmc_info['irmc_auth_method'] = 'basic'
self.requests_mock.register_uri(
'GET',
self._create_server_url(elcm.URL_PATH_PROFILE_MGMT),
status_code=200,
text='ok')
response = elcm.elcm_request(
irmc_info,
method='GET',
path=elcm.URL_PATH_PROFILE_MGMT)
self.assertEqual(200, response.status_code)
self.assertEqual('ok', response.text)
def test_elcm_request_auth_method_digest(self):
irmc_info = dict(self.irmc_info)
irmc_info['irmc_auth_method'] = 'digest'
self.requests_mock.register_uri(
'GET',
self._create_server_url(elcm.URL_PATH_PROFILE_MGMT),
status_code=200,
text='ok')
response = elcm.elcm_request(
irmc_info,
method='GET',
path=elcm.URL_PATH_PROFILE_MGMT)
self.assertEqual(200, response.status_code)
self.assertEqual('ok', response.text)
def test_elcm_request_unsupported_auth_method(self):
irmc_info = dict(self.irmc_info)
irmc_info['irmc_auth_method'] = 'unknown'
e = self.assertRaises(scci.SCCIInvalidInputError,
elcm.elcm_request,
irmc_info,
method='GET',
path=elcm.URL_PATH_PROFILE_MGMT)
port = self.irmc_info['irmc_port']
self.assertEqual((("Invalid port %(port)d or "
"auth_method for method %(auth_method)s") %
{'port': port,
'auth_method': 'unknown'}), str(e))
def test_elcm_request_auth_failed(self):
self.requests_mock.register_uri(
'GET',
self._create_server_url(elcm.URL_PATH_PROFILE_MGMT),
status_code=401,
text='401 Unauthorized')
e = self.assertRaises(scci.SCCIClientError,
elcm.elcm_request,
self.irmc_info,
method='GET',
path=elcm.URL_PATH_PROFILE_MGMT)
self.assertEqual('UNAUTHORIZED', str(e))
def test_elcm_profile_get_not_found(self):
profile_name = elcm.PROFILE_BIOS_CONFIG
self.requests_mock.register_uri(
'GET',
self._create_server_url((elcm.URL_PATH_PROFILE_MGMT +
profile_name)),
status_code=404)
self.assertRaises(elcm.ELCMProfileNotFound,
elcm.elcm_profile_get,
self.irmc_info,
profile_name=profile_name)
def test_elcm_profile_list_failed(self):
self.requests_mock.register_uri(
'GET',
self._create_server_url(elcm.URL_PATH_PROFILE_MGMT),
status_code=503)
self.assertRaises(scci.SCCIClientError,
elcm.elcm_profile_list,
self.irmc_info)
def test_elcm_profile_list_ok(self):
self.requests_mock.register_uri(
'GET',
self._create_server_url(elcm.URL_PATH_PROFILE_MGMT),
text=(self.RESPONSE_TEMPLATE % {
'json_text':
'"Links":{'
' "profileStore":['
' {"@odata.id": "id1"},'
' {"@odata.id": "id2"}'
' ]'
'}'}))
result = elcm.elcm_profile_list(self.irmc_info)
expected = {
"Links": {
"profileStore": [
{"@odata.id": "id1"},
{"@odata.id": "id2"}
]
}
}
self.assertEqual(expected, result)
def test_elcm_profile_get_failed(self):
profile_name = elcm.PROFILE_BIOS_CONFIG
self.requests_mock.register_uri(
'GET',
self._create_server_url((elcm.URL_PATH_PROFILE_MGMT +
profile_name)),
status_code=500)
self.assertRaises(scci.SCCIClientError,
elcm.elcm_profile_get,
self.irmc_info,
profile_name=profile_name)
def test_elcm_profile_get_ok(self):
profile_name = elcm.PROFILE_BIOS_CONFIG
self.requests_mock.register_uri(
'GET',
self._create_server_url((elcm.URL_PATH_PROFILE_MGMT +
profile_name)),
text=(self.RESPONSE_TEMPLATE % {
'json_text':
'"Server":{'
' "SystemConfig":{'
' "BiosConfig":{'
' "key":"val"'
' }'
' }'
'}'}))
result = elcm.elcm_profile_get(
self.irmc_info,
profile_name=profile_name)
expected = {
"Server": {
"SystemConfig": {
"BiosConfig": {
"key": "val"
}
}
}
}
self.assertEqual(expected, result)
def test_elcm_profile_create_failed(self):
param_path = elcm.PARAM_PATH_BIOS_CONFIG
self.requests_mock.register_uri(
'POST',
self._create_server_url(elcm.URL_PATH_PROFILE_MGMT + 'get'),
status_code=200) # Success code is 202
self.assertRaises(scci.SCCIClientError,
elcm.elcm_profile_create,
self.irmc_info,
param_path=param_path)
def test_elcm_profile_create_ok(self):
param_path = elcm.PARAM_PATH_BIOS_CONFIG
self.requests_mock.register_uri(
'POST',
self._create_server_url(elcm.URL_PATH_PROFILE_MGMT + 'get'),
status_code=202, # Success code is 202
text=(self.RESPONSE_TEMPLATE % {
'json_text':
'"Session":{'
' "Id": 123,'
' "Status": "activated"'
'}'}))
result = elcm.elcm_profile_create(self.irmc_info,
param_path=param_path)
expected = {
"Session": {
"Id": 123,
"Status": "activated",
}
}
self.assertEqual(expected, result)
def test_elcm_profile_set_failed(self):
input_data = {
"Server": {
"SystemConfig": {
"BiosConfig": {
"key": "val"
}
}
}
}
self.requests_mock.register_uri(
'POST',
self._create_server_url(elcm.URL_PATH_PROFILE_MGMT + 'set'),
status_code=200) # Success code is 202
self.assertRaises(scci.SCCIClientError,
elcm.elcm_profile_set,
self.irmc_info,
input_data=input_data)
def test_elcm_profile_set_ok(self):
input_data = {
"Server": {
"SystemConfig": {
"BiosConfig": {
"key": "val"
}
}
}
}
self.requests_mock.register_uri(
'POST',
self._create_server_url(elcm.URL_PATH_PROFILE_MGMT + 'set'),
status_code=202, # Success code is 202
text=(self.RESPONSE_TEMPLATE % {
'json_text':
'"Session":{'
' "Id": 123,'
' "Status": "activated"'
'}'}))
result = elcm.elcm_profile_set(self.irmc_info,
input_data=input_data)
expected = {
"Session": {
"Id": 123,
"Status": "activated",
}
}
self.assertEqual(expected, result)
def test_elcm_profile_delete_not_found(self):
profile_name = elcm.PROFILE_BIOS_CONFIG
self.requests_mock.register_uri(
'DELETE',
self._create_server_url((elcm.URL_PATH_PROFILE_MGMT +
profile_name)),
status_code=404)
self.assertRaises(elcm.ELCMProfileNotFound,
elcm.elcm_profile_delete,
self.irmc_info,
profile_name=profile_name)
def test_elcm_profile_delete_failed(self):
profile_name = elcm.PROFILE_BIOS_CONFIG
self.requests_mock.register_uri(
'DELETE',
self._create_server_url((elcm.URL_PATH_PROFILE_MGMT +
profile_name)),
status_code=500)
self.assertRaises(scci.SCCIClientError,
elcm.elcm_profile_delete,
self.irmc_info,
profile_name=profile_name)
def test_elcm_profile_delete_ok(self):
profile_name = elcm.PROFILE_BIOS_CONFIG
self.requests_mock.register_uri(
'DELETE',
self._create_server_url((elcm.URL_PATH_PROFILE_MGMT +
profile_name)),
text='ok')
result = elcm.elcm_profile_delete(self.irmc_info,
profile_name=profile_name)
self.assertIsNone(result)
def test_elcm_session_list_failed(self):
self.requests_mock.register_uri(
'GET',
self._create_server_url('/sessionInformation/'),
status_code=500)
self.assertRaises(scci.SCCIClientError,
elcm.elcm_session_list,
self.irmc_info)
def test_elcm_session_list_empty(self):
self.requests_mock.register_uri(
'GET',
self._create_server_url('/sessionInformation/'),
text=(self.RESPONSE_TEMPLATE % {
'json_text':
'"SessionList":{'
' "Contains":['
' ]'
'}'}))
result = elcm.elcm_session_list(self.irmc_info)
expected = {
"SessionList": {
"Contains": [
]
}
}
self.assertEqual(expected, result)
def test_elcm_session_list_ok(self):
self.requests_mock.register_uri(
'GET',
self._create_server_url('/sessionInformation/'),
text=(self.RESPONSE_TEMPLATE % {
'json_text':
'"SessionList":{'
' "Contains":['
' { "Id": 1, "Name": "name1" },'
' { "Id": 2, "Name": "name2" },'
' { "Id": 3, "Name": "name3" }'
' ]'
'}'}))
result = elcm.elcm_session_list(self.irmc_info)
expected = {
"SessionList": {
"Contains": [
{"Id": 1, "Name": "name1"},
{"Id": 2, "Name": "name2"},
{"Id": 3, "Name": "name3"}
]
}
}
self.assertEqual(expected, result)
def test_elcm_session_get_status_not_found(self):
session_id = 123
self.requests_mock.register_uri(
'GET',
self._create_server_url(('/sessionInformation/%s/status' %
session_id)),
status_code=404)
self.assertRaises(elcm.ELCMSessionNotFound,
elcm.elcm_session_get_status,
self.irmc_info,
session_id=session_id)
def test_elcm_session_get_status_failed(self):
session_id = 123
self.requests_mock.register_uri(
'GET',
self._create_server_url(('/sessionInformation/%s/status' %
session_id)),
status_code=500)
self.assertRaises(scci.SCCIClientError,
elcm.elcm_session_get_status,
self.irmc_info,
session_id=session_id)
def test_elcm_session_get_status_ok(self):
session_id = 123
self.requests_mock.register_uri(
'GET',
self._create_server_url(('/sessionInformation/%s/status' %
session_id)),
text=(self.RESPONSE_TEMPLATE % {
'json_text':
'"Session":{'
' "Id": 123,'
' "Status": "abc123"'
'}'}))
result = elcm.elcm_session_get_status(self.irmc_info,
session_id=session_id)
expected = {
"Session": {
"Id": 123,
"Status": "abc123",
}
}
self.assertEqual(expected, result)
def test_elcm_session_get_log_not_found(self):
session_id = 123
self.requests_mock.register_uri(
'GET',
self._create_server_url(('/sessionInformation/%s/log' %
session_id)),
status_code=404)
self.assertRaises(elcm.ELCMSessionNotFound,
elcm.elcm_session_get_log,
self.irmc_info,
session_id=session_id)
def test_elcm_session_get_log_failed(self):
session_id = 123
self.requests_mock.register_uri(
'GET',
self._create_server_url(('/sessionInformation/%s/log' %
session_id)),
status_code=500)
self.assertRaises(scci.SCCIClientError,
elcm.elcm_session_get_log,
self.irmc_info,
session_id=session_id)
def test_elcm_session_get_log_ok(self):
session_id = 123
self.requests_mock.register_uri(
'GET',
self._create_server_url(('/sessionInformation/%s/log' %
session_id)),
text=(self.RESPONSE_TEMPLATE % {
'json_text':
'"Session":{'
' "Id": 123,'
' "A_Param": "abc123"'
'}'}))
result = elcm.elcm_session_get_log(self.irmc_info,
session_id=session_id)
expected = {
"Session": {
"Id": 123,
"A_Param": "abc123",
}
}
self.assertEqual(expected, result)
def test_elcm_session_terminate_not_found(self):
session_id = 123
self.requests_mock.register_uri(
'DELETE',
self._create_server_url(('/sessionInformation/%s/terminate' %
session_id)),
status_code=404)
self.assertRaises(elcm.ELCMSessionNotFound,
elcm.elcm_session_terminate,
self.irmc_info,
session_id=session_id)
def test_elcm_session_terminate_failed(self):
session_id = 123
self.requests_mock.register_uri(
'DELETE',
self._create_server_url(('/sessionInformation/%s/terminate' %
session_id)),
status_code=500)
self.assertRaises(scci.SCCIClientError,
elcm.elcm_session_terminate,
self.irmc_info,
session_id=session_id)
def test_elcm_session_terminate_ok(self):
session_id = 123
self.requests_mock.register_uri(
'DELETE',
self._create_server_url(('/sessionInformation/%s/terminate' %
session_id)),
text='ok')
result = elcm.elcm_session_terminate(self.irmc_info,
session_id=session_id)
self.assertIsNone(result)
def test_elcm_session_delete_not_found(self):
session_id = 123
self.requests_mock.register_uri(
'DELETE',
self._create_server_url(('/sessionInformation/%s/remove' %
session_id)),
status_code=404)
self.assertRaises(elcm.ELCMSessionNotFound,
elcm.elcm_session_delete,
self.irmc_info,
session_id=session_id)
def test_elcm_session_delete_failed(self):
session_id = 123
self.requests_mock.register_uri(
'DELETE',
self._create_server_url(('/sessionInformation/%s/remove' %
session_id)),
status_code=500)
self.assertRaises(scci.SCCIClientError,
elcm.elcm_session_delete,
self.irmc_info,
session_id=session_id)
def test_elcm_session_delete_ok(self):
session_id = 123
self.requests_mock.register_uri(
'DELETE',
self._create_server_url(('/sessionInformation/%s/remove' %
session_id)),
text='ok')
result = elcm.elcm_session_delete(self.irmc_info,
session_id=session_id)
self.assertIsNone(result)