Merge "Add capabilities discovery ability to scciclient"

This commit is contained in:
Zuul
2017-10-24 05:09:52 +00:00
committed by Gerrit Code Review
7 changed files with 1072 additions and 1 deletions

View File

@@ -1,9 +1,10 @@
# The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
pbr!=2.1.0,>=2.0.0 # Apache-2.0
Babel!=2.4.0,>=2.3.4 # BSD
pyghmi>=1.0.22 # Apache-2.0
pysnmp>=4.2.3 # BSD
requests>=2.14.2 # Apache-2.0
six>=1.9.0 # MIT
oslo.utils>=3.28.0 # Apache-2.0

172
scciclient/irmc/ipmi.py Executable file
View File

@@ -0,0 +1,172 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import itertools
from pyghmi import exceptions as ipmi_exception
from pyghmi.ipmi import command as ipmi_command
# F1 1A - Get the number of GPU devices on PCI and the number of CPUs with FPGA
GET_GPU = '0x2E 0xF1 0x80 0x28 0x00 0x1A %s 0x00'
# F5 81 - GET TPM STATUS
GET_TPM_STATUS = '0x2E 0xF5 0x80 0x28 0x00 0x81 0xC0'
class IPMIFailure(Exception):
"""IPMI Failure
This exception is used when IPMI operation failed.
"""
def __init__(self, message):
super(IPMIFailure, self).__init__(message)
class InvalidParameterValue(IPMIFailure):
"""Invalid Parameter Value Failure
This exception is used when invalid parameter values are passed to
the APIs exposed by this module.
"""
def __init__(self, message):
super(InvalidParameterValue, self).__init__(message)
def _parse_raw_bytes(raw_bytes):
"""Convert a string of hexadecimal values to decimal values parameters
Example: '0x2E 0xF1 0x80 0x28 0x00 0x1A 0x01 0x00' is converted to:
46, 241, [128, 40, 0, 26, 1, 0]
:param raw_bytes: string of hexadecimal values
:returns: 3 decimal values
"""
bytes_list = [int(x, base=16) for x in raw_bytes.split()]
return bytes_list[0], bytes_list[1], bytes_list[2:]
def _send_raw_command(ipmicmd, raw_bytes):
"""Use IPMI command object to send raw ipmi command to BMC
:param ipmicmd: IPMI command object
:param raw_bytes: string of hexadecimal values. This is commonly used
for certain vendor specific commands.
:returns: dict -- The response from IPMI device
"""
netfn, command, data = _parse_raw_bytes(raw_bytes)
response = ipmicmd.raw_command(netfn, command, data=data)
return response
def get_tpm_status(d_info):
"""Get the TPM support status.
Get the TPM support status of the node.
:param d_info: the list of ipmitool parameters for accessing a node.
:returns: TPM support status
"""
# note:
# Get TPM support status : ipmi cmd '0xF5', valid flags '0xC0'
#
# $ ipmitool raw 0x2E 0xF5 0x80 0x28 0x00 0x81 0xC0
#
# Raw response:
# 80 28 00 C0 C0: True
# 80 28 00 -- --: False (other values than 'C0 C0')
ipmicmd = ipmi_command.Command(bmc=d_info['irmc_address'],
userid=d_info['irmc_username'],
password=d_info['irmc_password'])
try:
response = _send_raw_command(ipmicmd, GET_TPM_STATUS)
if response['code'] != 0:
raise IPMIFailure(
"IPMI operation '%(operation)s' failed: %(error)s" %
{'operation': "GET TMP status",
'error': response.get('error')})
out = ' '.join('{:02X}'.format(x) for x in response['data'])
return out is not None and out[-5:] == 'C0 C0'
except ipmi_exception.IpmiException as e:
raise IPMIFailure(
"IPMI operation '%(operation)s' failed: %(error)s" %
{'operation': "GET TMP status", 'error': e})
def _pci_seq(ipmicmd):
"""Get output of ipmiraw command and the ordinal numbers.
:param ipmicmd: IPMI command object.
:returns: List of tuple contain ordinal number and output of ipmiraw
command.
"""
for i in range(1, 0xff + 1):
try:
res = _send_raw_command(ipmicmd, GET_GPU % hex(i))
yield i, res
except ipmi_exception.IpmiException as e:
raise IPMIFailure(
"IPMI operation '%(operation)s' failed: %(error)s" %
{'operation': "GET GPU device quantity", 'error': e})
def get_gpu(d_info, pci_device_ids):
"""Get quantity of GPU devices on PCI and quantity of CPUs with FPGA.
Get quantity of GPU devices on PCI and quantity of CPUs with FPGA of the
node.
:param d_info: the list of ipmitool parameters for accessing a node.
:param pci_device_ids: the list contains pairs of <vendorID>/<deviceID> for
GPU on PCI.
:returns: a tuple of the number of GPU devices on PCI and the number of
CPUs with FPGA.
"""
# note:
# Get quantity of GPU devices on PCI and quantity of CPUs with FPGA:
# ipmi cmd '0xF1'
#
# $ ipmitool raw 0x2E 0xF1 0x80 0x28 0x00 0x1A 0x01 0x00
#
# Raw response:
# 80 28 00 00 00 05 data1 data2 34 17 76 11 00 04
# 01
# data1: 2 octet of VendorID
# data2: 2 octet of DeviceID
ipmicmd = ipmi_command.Command(bmc=d_info['irmc_address'],
userid=d_info['irmc_username'],
password=d_info['irmc_password'])
response = itertools.takewhile(
lambda y: (y[1]['code'] != 0xC9 and y[1].get('error') is None),
_pci_seq(ipmicmd))
def _pci_count(accm, v):
out = v[1]['data']
# if system returns value, record id will be increased.
pci_id = "0x{:02x}{:02x}/0x{:02x}{:02x}".format(
out[7], out[6], out[9], out[8])
return accm + 1 if pci_id in pci_device_ids else accm
gpu_count = functools.reduce(_pci_count, response, 0)
return gpu_count

55
scciclient/irmc/scci.py Normal file → Executable file
View File

@@ -23,6 +23,8 @@ import xml.etree.ElementTree as ET
import requests
import six
from scciclient.irmc import ipmi
from scciclient.irmc import snmp
DEBUG = False
@@ -486,3 +488,56 @@ def get_essential_properties(report, prop_keys):
v['cpu_arch'] = 'x86_64'
return {k: v[k] for k in prop_keys}
def get_capabilities_properties(d_info,
capa_keys,
pci_device_ids,
**kwargs):
"""get capabilities properties
This function returns a dictionary which contains keys
and their values from the report.
:param d_info: the dictionary of ipmitool parameters for accessing a node.
:param capa_keys: a list of keys for additional capabilities properties.
:param pci_device_ids: the list of string contains <vendorID>/<deviceID>
for GPU.
:param kwargs: additional arguments passed to scciclient.
:returns: a dictionary which contains keys and their values.
"""
snmp_client = snmp.SNMPClient(d_info['irmc_address'],
d_info['irmc_snmp_port'],
d_info['irmc_snmp_version'],
d_info['irmc_snmp_community'],
d_info['irmc_snmp_security'])
try:
v = {}
if 'rom_firmware_version' in capa_keys:
v['rom_firmware_version'] = \
snmp.get_bios_firmware_version(snmp_client)
if 'irmc_firmware_version' in capa_keys:
v['irmc_firmware_version'] = \
snmp.get_irmc_firmware_version(snmp_client)
if 'server_model' in capa_keys:
v['server_model'] = snmp.get_server_model(snmp_client)
# Sometime the server started but PCI device list building is
# still in progress so system will response error. We have to wait
# for some more seconds.
if kwargs.get('sleep_flag', False) and 'pci_gpu_devices' in capa_keys:
time.sleep(5)
if 'pci_gpu_devices' in capa_keys:
v['pci_gpu_devices'] = ipmi.get_gpu(d_info, pci_device_ids)
if 'trusted_boot' in capa_keys:
v['trusted_boot'] = ipmi.get_tpm_status(d_info)
return v
except (snmp.SNMPFailure, ipmi.IPMIFailure) as err:
raise SCCIClientError('Capabilities inspection failed: %s' % err)

260
scciclient/irmc/snmp.py Normal file
View File

@@ -0,0 +1,260 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp import error as snmp_error
BMC_NAME_OID = '1.3.6.1.4.1.231.2.10.2.2.10.3.4.1.3.1.1'
IRMC_FW_VERSION_OID = '1.3.6.1.4.1.231.2.10.2.2.10.3.4.1.4.1.1'
BIOS_FW_VERSION_OID = '1.3.6.1.4.1.231.2.10.2.2.10.4.1.1.11.1'
SERVER_MODEL_OID = '1.3.6.1.4.1.231.2.10.2.2.10.2.3.1.4.1'
SNMP_V1 = '1'
SNMP_V2C = '2c'
SNMP_V3 = '3'
SNMP_FAILURE_MSG = "SNMP operation '%s' failed: %s"
class SNMPFailure(Exception):
"""SNMP Failure
This exception is used when invalid inputs are passed to
the APIs exposed by this module.
"""
def __init__(self, message):
super(SNMPFailure, self).__init__(message)
class SNMPIRMCFirmwareFailure(SNMPFailure):
"""SNMP iRMC Firmware Failure
This exception is used when error occurs when collecting iRMC firmware.
"""
def __init__(self, message):
super(SNMPIRMCFirmwareFailure, self).__init__(message)
class SNMPBIOSFirmwareFailure(SNMPFailure):
"""SNMP BIOS Firmware Failure
This exception is used when error occurs when collecting BIOS firmware.
"""
def __init__(self, message):
super(SNMPBIOSFirmwareFailure, self).__init__(message)
class SNMPServerModelFailure(SNMPFailure):
"""SNMP Server Model Failure
This exception is used when error occurs when collecting server model.
"""
def __init__(self, message):
super(SNMPServerModelFailure, self).__init__(message)
def get_irmc_firmware_version(snmp_client):
"""Get irmc firmware version of the node.
:param snmp_client: an SNMP client object.
:raises: SNMPFailure if SNMP operation failed.
:returns: a string of bmc name and irmc firmware version.
"""
try:
bmc_name = snmp_client.get(BMC_NAME_OID)
irmc_firm_ver = snmp_client.get(IRMC_FW_VERSION_OID)
return ('%(bmc)s%(sep)s%(firm_ver)s' %
{'bmc': bmc_name if bmc_name else '',
'firm_ver': irmc_firm_ver if irmc_firm_ver else '',
'sep': '-' if bmc_name and irmc_firm_ver else ''})
except SNMPFailure as e:
raise SNMPIRMCFirmwareFailure(
SNMP_FAILURE_MSG % ("GET IRMC FIRMWARE VERSION", e))
def get_bios_firmware_version(snmp_client):
"""Get bios firmware version of the node.
:param snmp_client: an SNMP client object.
:raises: SNMPFailure if SNMP operation failed.
:returns: a string of bios firmware version.
"""
try:
bios_firmware_version = snmp_client.get(BIOS_FW_VERSION_OID)
return six.text_type(bios_firmware_version)
except SNMPFailure as e:
raise SNMPBIOSFirmwareFailure(
SNMP_FAILURE_MSG % ("GET BIOS FIRMWARE VERSION", e))
def get_server_model(snmp_client):
"""Get server model of the node.
:param snmp_client: an SNMP client object.
:raises: SNMPFailure if SNMP operation failed.
:returns: a string of server model.
"""
try:
server_model = snmp_client.get(SERVER_MODEL_OID)
return six.text_type(server_model)
except SNMPFailure as e:
raise SNMPServerModelFailure(
SNMP_FAILURE_MSG % ("GET SERVER MODEL", e))
class SNMPClient(object):
"""SNMP client object.
Performs low level SNMP get and set operations. Encapsulates all
interaction with PySNMP to simplify dynamic importing and unit testing.
"""
def __init__(self, address, port, version, community=None, security=None):
self.address = address
self.port = port
self.version = version
if self.version == SNMP_V3:
self.security = security
else:
self.community = community
self.cmd_gen = cmdgen.CommandGenerator()
def _get_auth(self):
"""Return the authorization data for an SNMP request.
:returns: A
:class:`pysnmp.entity.rfc3413.oneliner.cmdgen.CommunityData`
object.
"""
if self.version == SNMP_V3:
# Handling auth/encryption credentials is not (yet) supported.
# This version supports a security name analogous to community.
return cmdgen.UsmUserData(self.security)
else:
mp_model = 1 if self.version == SNMP_V2C else 0
return cmdgen.CommunityData(self.community, mpModel=mp_model)
def _get_transport(self):
"""Return the transport target for an SNMP request.
:returns: A :class:
`pysnmp.entity.rfc3413.oneliner.cmdgen.UdpTransportTarget` object.
:raises: snmp_error.PySnmpError if the transport address is bad.
"""
# The transport target accepts timeout and retries parameters, which
# default to 1 (second) and 5 respectively. These are deemed sensible
# enough to allow for an unreliable network or slow device.
return cmdgen.UdpTransportTarget((self.address, self.port))
def get(self, oid):
"""Use PySNMP to perform an SNMP GET operation on a single object.
:param oid: The OID of the object to get.
:raises: SNMPFailure if an SNMP request fails.
:returns: The value of the requested object.
"""
try:
results = self.cmd_gen.getCmd(self._get_auth(),
self._get_transport(),
oid)
except snmp_error.PySnmpError as e:
raise SNMPFailure(SNMP_FAILURE_MSG % ("GET", e))
error_indication, error_status, error_index, var_binds = results
if error_indication:
# SNMP engine-level error.
raise SNMPFailure(SNMP_FAILURE_MSG % ("GET", error_indication))
if error_status:
# SNMP PDU error.
raise SNMPFailure(
"SNMP operation '%(operation)s' failed: %(error)s at"
" %(index)s" %
{'operation': "GET", 'error': error_status.prettyPrint(),
'index':
error_index and var_binds[int(error_index) - 1]
or '?'})
# We only expect a single value back
name, val = var_binds[0]
return val
def get_next(self, oid):
"""Use PySNMP to perform an SNMP GET NEXT operation on a table object.
:param oid: The OID of the object to get.
:raises: SNMPFailure if an SNMP request fails.
:returns: A list of values of the requested table object.
"""
try:
results = self.cmd_gen.nextCmd(self._get_auth(),
self._get_transport(),
oid)
except snmp_error.PySnmpError as e:
raise SNMPFailure(SNMP_FAILURE_MSG % ("GET_NEXT", e))
error_indication, error_status, error_index, var_binds = results
if error_indication:
# SNMP engine-level error.
raise SNMPFailure(
SNMP_FAILURE_MSG % ("GET_NEXT", error_indication))
if error_status:
# SNMP PDU error.
raise SNMPFailure(
"SNMP operation '%(operation)s' failed: %(error)s at"
" %(index)s" %
{'operation': "GET_NEXT", 'error': error_status.prettyPrint(),
'index':
error_index and var_binds[int(error_index) - 1]
or '?'})
return [val for row in var_binds for name, val in row]
def set(self, oid, value):
"""Use PySNMP to perform an SNMP SET operation on a single object.
:param oid: The OID of the object to set.
:param value: The value of the object to set.
:raises: SNMPFailure if an SNMP request fails.
"""
try:
results = self.cmd_gen.setCmd(self._get_auth(),
self._get_transport(),
(oid, value))
except snmp_error.PySnmpError as e:
raise SNMPFailure(SNMP_FAILURE_MSG % ("SET", e))
error_indication, error_status, error_index, var_binds = results
if error_indication:
# SNMP engine-level error.
raise SNMPFailure(SNMP_FAILURE_MSG % ("SET", error_indication))
if error_status:
# SNMP PDU error.
raise SNMPFailure(
"SNMP operation '%(operation)s' failed: %(error)s at"
" %(index)s" %
{'operation': "SET", 'error': error_status.prettyPrint(),
'index':
error_index and var_binds[int(error_index) - 1]
or '?'})

View File

@@ -0,0 +1,139 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test class for IPMI Module.
"""
import mock
import testtools
from pyghmi import exceptions as ipmi_exception
from pyghmi.ipmi import command as ipmi_command
from scciclient.irmc import ipmi
@mock.patch.object(ipmi_command, 'Command', new=mock.Mock())
class IpmiTestCase(testtools.TestCase):
"""Tests for IPMI
Unit Test Cases for getting information via ipmi raw command
"""
def setUp(self):
super(IpmiTestCase, self).setUp()
self.info = {'irmc_address': "10.0.0.10",
'irmc_username': "admin",
'irmc_password': "admin",
}
@mock.patch.object(ipmi, '_send_raw_command')
def test_get_tpm_status_true(self, exec_mock):
exec_mock.return_value = {'command': 0xF5, 'code': 0x00, 'netfn': 0x2F,
'data': [0x80, 0x28, 0x00, 0xC0, 0xC0]}
cmd = "0x2E 0xF5 0x80 0x28 0x00 0x81 0xC0"
actual_out = ipmi.get_tpm_status(self.info)
self.assertEqual(True, actual_out)
exec_mock.assert_called_once_with(mock.ANY, cmd)
@mock.patch.object(ipmi, '_send_raw_command')
def test_get_tpm_status_false(self, exec_mock):
exec_mock.return_value = {'command': 0xF5, 'code': 0x00, 'netfn': 0x2F,
'data': [0x80, 0x28, 0x00, 0x80, 0x01]}
cmd = "0x2E 0xF5 0x80 0x28 0x00 0x81 0xC0"
actual_out = ipmi.get_tpm_status(self.info)
self.assertEqual(False, actual_out)
exec_mock.assert_called_once_with(mock.ANY, cmd)
@mock.patch.object(ipmi, '_send_raw_command')
def test_get_tpm_status_error_code(self, exec_mock):
exec_mock.return_value = {'command': 0xF5, 'code': 0x01, 'netfn': 0x2F,
'data': [0x80, 0x28, 0x00, 0x80, 0x01]}
cmd = "0x2E 0xF5 0x80 0x28 0x00 0x81 0xC0"
self.assertRaises(ipmi.IPMIFailure, ipmi.get_tpm_status, self.info)
exec_mock.assert_called_once_with(mock.ANY, cmd)
@mock.patch.object(ipmi, '_send_raw_command')
def test_get_tpm_status_exception(self, exec_mock):
exec_mock.side_effect = ipmi_exception.IpmiException
cmd = "0x2E 0xF5 0x80 0x28 0x00 0x81 0xC0"
self.assertRaises(ipmi.IPMIFailure, ipmi.get_tpm_status, self.info)
exec_mock.assert_called_once_with(mock.ANY, cmd)
@mock.patch.object(ipmi, '_send_raw_command')
def test_get_gpu(self, exec_mock):
gpu_ids = ['0x1000/0x0079', '0x2100/0x0080']
exec_mock.side_effect = ({'command': 0xF1, 'code': 0x00, 'netfn': 0x2F,
'data': [0x80, 0x28, 0x00, 0x00, 0x00, 0x05,
0x00, 0x10, 0x79, 0x00, 0x34, 0x17,
0x76, 0x11, 0x00, 0x04, 0x01]},
{'command': 0xF1, 'code': 0xC9, 'netfn': 0x2F,
'error': 'Parameter out of range',
'data': [0x80, 0x28, 0x00]})
cmd1 = "0x2E 0xF1 0x80 0x28 0x00 0x1A 0x1 0x00"
cmd2 = "0x2E 0xF1 0x80 0x28 0x00 0x1A 0x2 0x00"
actual_out = ipmi.get_gpu(self.info, gpu_ids)
self.assertEqual(1, actual_out)
exec_mock.assert_has_calls([mock.call(mock.ANY, cmd1),
mock.call(mock.ANY, cmd2)])
@mock.patch.object(ipmi, '_send_raw_command')
def test_get_gpu_blank(self, exec_mock):
gpu_ids = []
actual_out = ipmi.get_gpu(self.info, gpu_ids)
self.assertEqual(0, actual_out)
self.assertTrue(exec_mock.called)
@mock.patch.object(ipmi, '_send_raw_command')
def test_get_gpu_not_found(self, exec_mock):
gpu_ids = ['0x1111/0x1179', '0x2100/0x0080']
exec_mock.side_effect = ({'command': 0xF1, 'code': 0x00, 'netfn': 0x2F,
'data': [0x80, 0x28, 0x00, 0x00, 0x00, 0x05,
0x00, 0x10, 0x79, 0x00, 0x34, 0x17,
0x76, 0x11, 0x00, 0x04, 0x01]},
{'command': 0xF1, 'code': 0xC9, 'netfn': 0x2F,
'error': 'Parameter out of range',
'data': [0x80, 0x28, 0x00]})
cmd1 = "0x2E 0xF1 0x80 0x28 0x00 0x1A 0x1 0x00"
cmd2 = "0x2E 0xF1 0x80 0x28 0x00 0x1A 0x2 0x00"
actual_out = ipmi.get_gpu(self.info, gpu_ids)
self.assertEqual(0, actual_out)
exec_mock.assert_has_calls([mock.call(mock.ANY, cmd1),
mock.call(mock.ANY, cmd2)])
@mock.patch.object(ipmi, '_send_raw_command')
def test_get_gpu_exception(self, exec_mock):
gpu_ids = ['0x1111/0x1179', '0x2100/0x0080']
exec_mock.side_effect = ipmi_exception.IpmiException('Error')
cmd = "0x2E 0xF1 0x80 0x28 0x00 0x1A 0x1 0x00"
e = self.assertRaises(ipmi.IPMIFailure,
ipmi.get_gpu,
self.info,
gpu_ids)
exec_mock.assert_called_once_with(mock.ANY, cmd)
self.assertEqual('IPMI operation \'GET GPU device quantity\' '
'failed: Error', str(e))

View File

@@ -23,7 +23,9 @@ import mock
from requests_mock.contrib import fixture as rm_fixture
import testtools
from scciclient.irmc import ipmi
from scciclient.irmc import scci
from scciclient.irmc import snmp
class SCCITestCase(testtools.TestCase):
@@ -55,6 +57,19 @@ class SCCITestCase(testtools.TestCase):
self.irmc_port = 80
self.irmc_auth_method = 'basic'
self.irmc_client_timeout = 60
self.irmc_info = {'irmc_address': self.irmc_address,
'irmc_username': self.irmc_username,
'irmc_password': self.irmc_password,
'irmc_snmp_port': 161,
'irmc_snmp_version': 'v2c',
'irmc_snmp_community': 'public',
'irmc_snmp_security': None,
'irmc_client_timeout': self.irmc_client_timeout,
'irmc_sensor_method': 'ipmitool',
'irmc_auth_method': self.irmc_auth_method,
'irmc_port': 443,
'irmc_tempdir': "/tmp"
}
self.irmc_remote_image_server = '10.33.110.49'
self.irmc_remote_image_user_domain = 'example.local'
@@ -688,3 +703,127 @@ class SCCITestCase(testtools.TestCase):
self.report_ng_xml, ESSENTIAL_PROPERTIES_KEYS)
self.assertEqual(expected, result)
@mock.patch.object(ipmi, 'get_gpu')
@mock.patch.object(snmp, 'get_server_model')
@mock.patch.object(snmp, 'get_irmc_firmware_version')
@mock.patch.object(snmp, 'get_bios_firmware_version')
@mock.patch.object(ipmi, 'get_tpm_status')
def test_get_capabilities_properties(self,
tpm_mock,
bios_mock,
irmc_mock,
server_mock,
gpu_mock):
capabilities_properties = {'trusted_boot', 'irmc_firmware_version',
'rom_firmware_version', 'server_model',
'pci_gpu_devices'}
gpu_ids = ['0x1000/0x0079', '0x2100/0x0080']
kwargs = {}
kwargs['sleep_flag'] = True
tpm_mock.return_value = False
bios_mock.return_value = 'V4.6.5.4 R1.15.0 for D3099-B1x'
irmc_mock.return_value = 'iRMC S4-7.82F'
server_mock.return_value = 'TX2540M1F5'
gpu_mock.return_value = 1
expected = {'irmc_firmware_version': 'iRMC S4-7.82F',
'pci_gpu_devices': 1,
'rom_firmware_version': 'V4.6.5.4 R1.15.0 for D3099-B1x',
'server_model': 'TX2540M1F5',
'trusted_boot': False}
result = scci.get_capabilities_properties(
self.irmc_info,
capabilities_properties,
gpu_ids,
**kwargs)
self.assertEqual(expected, result)
tpm_mock.assert_called_once_with(self.irmc_info)
bios_mock.assert_called_once_with(mock.ANY)
irmc_mock.assert_called_once_with(mock.ANY)
server_mock.assert_called_once_with(mock.ANY)
gpu_mock.assert_called_once_with(self.irmc_info,
gpu_ids)
@mock.patch.object(ipmi, 'get_gpu')
@mock.patch.object(snmp, 'get_server_model')
@mock.patch.object(snmp, 'get_irmc_firmware_version')
@mock.patch.object(snmp, 'get_bios_firmware_version')
@mock.patch.object(ipmi, 'get_tpm_status')
def test_get_capabilities_properties_blank(self,
tpm_mock,
bios_mock,
irmc_mock,
server_mock,
gpu_mock):
capabilities_properties = {}
gpu_ids = ['0x1000/0x0079', '0x2100/0x0080']
kwargs = {}
kwargs['sleep_flag'] = True
tpm_mock.return_value = False
bios_mock.return_value = 'V4.6.5.4 R1.15.0 for D3099-B1x'
irmc_mock.return_value = 'iRMC S4-7.82F'
server_mock.return_value = 'TX2540M1F5'
gpu_mock.return_value = 1
expected = {}
result = scci.get_capabilities_properties(
self.irmc_info,
capabilities_properties,
gpu_ids,
**kwargs)
self.assertEqual(expected, result)
@mock.patch.object(ipmi, '_send_raw_command')
@mock.patch.object(snmp.SNMPClient, 'get')
def test_get_capabilities_properties_scci_client_error(self,
snmp_mock,
ipmiraw_mock):
capabilities_properties = {'trusted_boot', 'irmc_firmware_version',
'rom_firmware_version', 'server_model',
'pci_gpu_devices'}
gpu_ids = ['0x1000/0x0079', '0x2100/0x0080']
kwargs = {}
kwargs['sleep_flag'] = True
ipmiraw_mock.return_value = None
snmp_mock.side_effect = snmp.SNMPFailure("error")
e = self.assertRaises(scci.SCCIClientError,
scci.get_capabilities_properties,
self.irmc_info,
capabilities_properties,
gpu_ids,
**kwargs)
self.assertEqual('Capabilities inspection failed: SNMP operation \''
'GET BIOS FIRMWARE VERSION\' failed: error', str(e))
@mock.patch.object(ipmi, 'get_gpu')
@mock.patch.object(snmp.SNMPClient, 'get')
def test_get_capabilities_properties_scci_client_error_ipmi(self,
snmp_mock,
ipmi_mock):
capabilities_properties = {'trusted_boot', 'irmc_firmware_version',
'rom_firmware_version', 'server_model',
'pci_gpu_devices'}
gpu_ids = ['0x1000/0x0079', '0x2100/0x0080']
kwargs = {}
kwargs['sleep_flag'] = True
ipmi_mock.side_effect = ipmi.IPMIFailure("IPMI error")
snmp_mock.return_value = None
e = self.assertRaises(scci.SCCIClientError,
scci.get_capabilities_properties,
self.irmc_info,
capabilities_properties,
gpu_ids,
**kwargs)
self.assertEqual('Capabilities inspection failed: IPMI error', str(e))

View File

@@ -0,0 +1,305 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test class for snmp module.
"""
import mock
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp import error as snmp_error
import testtools
from scciclient.irmc import snmp
class IRMCSnmpTestCase(testtools.TestCase):
"""Tests for SNMP module
Unit Test Cases for getting information via snmp module
"""
def setUp(self):
super(IRMCSnmpTestCase, self).setUp()
def test_get_irmc_firmware_version(self):
snmp_client = mock.Mock()
snmp_client.get.side_effect = ['iRMC S4', '7.82F']
cmd1 = snmp.BMC_NAME_OID
cmd2 = snmp.IRMC_FW_VERSION_OID
actual_out = snmp.get_irmc_firmware_version(snmp_client)
self.assertEqual('iRMC S4-7.82F', actual_out)
snmp_client.get.assert_has_calls([mock.call(cmd1),
mock.call(cmd2)])
def test_get_irmc_firmware_version_BMC_only(self):
snmp_client = mock.Mock()
snmp_client.get.side_effect = ['iRMC S4', '']
cmd1 = snmp.BMC_NAME_OID
cmd2 = snmp.IRMC_FW_VERSION_OID
actual_out = snmp.get_irmc_firmware_version(snmp_client)
self.assertEqual('iRMC S4', actual_out)
snmp_client.get.assert_has_calls([mock.call(cmd1),
mock.call(cmd2)])
def test_get_irmc_firmware_version_FW_only(self):
snmp_client = mock.Mock()
snmp_client.get.side_effect = ['', '7.82F']
cmd1 = snmp.BMC_NAME_OID
cmd2 = snmp.IRMC_FW_VERSION_OID
actual_out = snmp.get_irmc_firmware_version(snmp_client)
self.assertEqual('7.82F', actual_out)
snmp_client.get.assert_has_calls([mock.call(cmd1),
mock.call(cmd2)])
def test_get_irmc_firmware_version_blank(self):
snmp_client = mock.Mock()
snmp_client.get.side_effect = ['', '']
cmd1 = snmp.BMC_NAME_OID
cmd2 = snmp.IRMC_FW_VERSION_OID
actual_out = snmp.get_irmc_firmware_version(snmp_client)
self.assertEqual('', actual_out)
snmp_client.get.assert_has_calls([mock.call(cmd1),
mock.call(cmd2)])
def test_get_irmc_firmware_version_exception(self):
snmp_client = mock.Mock()
snmp_client.get.side_effect = snmp.SNMPFailure('Error')
cmd1 = snmp.BMC_NAME_OID
e = self.assertRaises(snmp.SNMPIRMCFirmwareFailure,
snmp.get_irmc_firmware_version,
snmp_client)
snmp_client.get.assert_has_calls([mock.call(cmd1)])
self.assertEqual('SNMP operation \'GET IRMC FIRMWARE VERSION\''
' failed: Error', str(e))
def test_get_bios_firmware_version(self):
snmp_client = mock.Mock()
snmp_client.return_value = 'V4.6.5.4 R1.15.0 for D3099-B1x'
snmp_client.get.return_value = 'V4.6.5.4 R1.15.0 for D3099-B1x'
cmd = snmp.BIOS_FW_VERSION_OID
actual_out = snmp.get_bios_firmware_version(snmp_client)
self.assertEqual('V4.6.5.4 R1.15.0 for D3099-B1x', actual_out)
snmp_client.get.assert_called_once_with(cmd)
def test_get_bios_firmware_version_exception(self):
snmp_client = mock.Mock()
snmp_client.get.side_effect = snmp.SNMPFailure('Error')
cmd = snmp.BIOS_FW_VERSION_OID
e = self.assertRaises(snmp.SNMPBIOSFirmwareFailure,
snmp.get_bios_firmware_version,
snmp_client)
snmp_client.get.assert_called_once_with(cmd)
self.assertEqual('SNMP operation \'GET BIOS FIRMWARE VERSION\''
' failed: Error', str(e))
def test_get_server_model(self):
snmp_client = mock.Mock()
snmp_client.return_value = 'TX2540M1F5'
snmp_client.get.return_value = 'TX2540M1F5'
cmd = snmp.SERVER_MODEL_OID
actual_out = snmp.get_server_model(snmp_client)
self.assertEqual('TX2540M1F5', actual_out)
snmp_client.get.assert_called_once_with(cmd)
def test_get_server_model_exception(self):
snmp_client = mock.Mock()
snmp_client.get.side_effect = snmp.SNMPFailure('Error')
cmd = snmp.SERVER_MODEL_OID
e = self.assertRaises(snmp.SNMPServerModelFailure,
snmp.get_server_model,
snmp_client)
snmp_client.get.assert_called_once_with(cmd)
self.assertEqual('SNMP operation \'GET SERVER MODEL\''
' failed: Error', str(e))
@mock.patch.object(cmdgen, 'CommandGenerator', autospec=True)
class SNMPClientTestCase(testtools.TestCase):
def setUp(self):
super(SNMPClientTestCase, self).setUp()
self.address = '1.2.3.4'
self.port = '6700'
self.oid = 'oid'
self.value = 'value'
def test___init__(self, mock_cmdgen):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1)
mock_cmdgen.assert_called_once_with()
self.assertEqual(self.address, client.address)
self.assertEqual(self.port, client.port)
self.assertEqual(snmp.SNMP_V1, client.version)
self.assertIsNone(client.community)
self.assertNotIn('security', client.__dict__)
self.assertEqual(mock_cmdgen.return_value, client.cmd_gen)
def test_get(self, mock_cmdgen):
var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.getCmd.return_value = ("", None, 0, [var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
val = client.get(self.oid)
self.assertEqual(var_bind[1], val)
mock_cmdgenerator.getCmd.assert_called_once_with(mock.ANY, mock.ANY,
self.oid)
@mock.patch.object(cmdgen, 'CommunityData', autospec=True)
def test__get_auth_v1(self, mock_community, mock_cmdgen):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1)
client._get_auth()
mock_cmdgen.assert_called_once_with()
mock_community.assert_called_once_with(client.community, mpModel=0)
@mock.patch.object(cmdgen, 'CommunityData', autospec=True)
def test__get_auth_v2c(self, mock_community, mock_cmdgen):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V2C)
client._get_auth()
mock_cmdgen.assert_called_once_with()
mock_community.assert_called_once_with(client.community, mpModel=1)
@mock.patch.object(cmdgen, 'UsmUserData', autospec=True)
def test__get_auth_v3(self, mock_user, mock_cmdgen):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
client._get_auth()
mock_cmdgen.assert_called_once_with()
mock_user.assert_called_once_with(client.security)
@mock.patch.object(cmdgen, 'UdpTransportTarget', autospec=True)
def test__get_transport(self, mock_transport, mock_cmdgen):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
client._get_transport()
mock_cmdgen.assert_called_once_with()
mock_transport.assert_called_once_with((client.address, client.port))
@mock.patch.object(cmdgen, 'UdpTransportTarget', autospec=True)
def test__get_transport_err(self, mock_transport, mock_cmdgen):
mock_transport.side_effect = snmp_error.PySnmpError()
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp_error.PySnmpError, client._get_transport)
mock_cmdgen.assert_called_once_with()
mock_transport.assert_called_once_with((client.address, client.port))
def test_get_pdu_err(self, mock_cmdgen):
var_bind = (self.oid, self.value)
error_status = mock.Mock()
error_status.prettyPrint = lambda: "pdu error"
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.getCmd.return_value = (None, error_status, 1,
[var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get, self.oid)
mock_cmdgenerator.getCmd.assert_called_once_with(mock.ANY, mock.ANY,
self.oid)
def test_get_next(self, mock_cmdgen):
var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.nextCmd.return_value = (
"", None, 0, [[var_bind, var_bind]])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
val = client.get_next(self.oid)
self.assertEqual([self.value, self.value], val)
mock_cmdgenerator.nextCmd.assert_called_once_with(mock.ANY, mock.ANY,
self.oid)
@mock.patch.object(snmp.SNMPClient, '_get_transport', autospec=True)
def test_get_err_transport(self, mock_transport, mock_cmdgen):
mock_transport.side_effect = snmp_error.PySnmpError
mock_cmdgenerator = mock_cmdgen.return_value
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get, self.oid)
self.assertFalse(mock_cmdgenerator.getCmd.called)
@mock.patch.object(snmp.SNMPClient, '_get_transport', autospec=True)
def test_get_next_err_transport(self, mock_transport,
mock_cmdgen):
mock_transport.side_effect = snmp_error.PySnmpError
mock_cmdgenerator = mock_cmdgen.return_value
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid)
self.assertFalse(mock_cmdgenerator.nextCmd.called)
def test_get_err_engine(self, mock_cmdgen):
var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.getCmd.return_value = ("engine error", None, 0,
[var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get, self.oid)
mock_cmdgenerator.getCmd.assert_called_once_with(mock.ANY, mock.ANY,
self.oid)
def test_get_next_err_engine(self, mock_cmdgen):
var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.nextCmd.return_value = ("engine error", None, 0,
[[var_bind, var_bind]])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid)
mock_cmdgenerator.nextCmd.assert_called_once_with(mock.ANY, mock.ANY,
self.oid)
def test_get_next_pdu_err(self, mock_cmdgen):
var_bind = (self.oid, self.value)
error_status = mock.Mock()
error_status.prettyPrint = lambda: "pdu error"
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.nextCmd.return_value = (None, error_status, 1,
[var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid)
mock_cmdgenerator.nextCmd.assert_called_once_with(mock.ANY, mock.ANY,
self.oid)
def test_set(self, mock_cmdgen):
var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.setCmd.return_value = ("", None, 0, [var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
client.set(self.oid, self.value)
mock_cmdgenerator.setCmd.assert_called_once_with(mock.ANY, mock.ANY,
var_bind)
@mock.patch.object(snmp.SNMPClient, '_get_transport', autospec=True)
def test_set_err_transport(self, mock_transport, mock_cmdgen):
mock_transport.side_effect = snmp_error.PySnmpError
mock_cmdgenerator = mock_cmdgen.return_value
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure,
client.set, self.oid, self.value)
self.assertFalse(mock_cmdgenerator.setCmd.called)
def test_set_err_engine(self, mock_cmdgen):
var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.setCmd.return_value = ("engine error", None, 0,
[var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure,
client.set, self.oid, self.value)
mock_cmdgenerator.setCmd.assert_called_once_with(mock.ANY, mock.ANY,
var_bind)
def test_set_pdu_err(self, mock_cmdgen):
var_bind = (self.oid, self.value)
error_status = mock.Mock()
error_status.prettyPrint = lambda: "pdu error"
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.setCmd.return_value = (None, error_status, 1,
[var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure,
client.set, self.oid, self.value)
mock_cmdgenerator.setCmd.assert_called_once_with(mock.ANY, mock.ANY,
var_bind)