diff --git a/requirements.txt b/requirements.txt index aba4263..8143422 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,10 @@ # The order of packages is significant, because pip processes them in the order # of appearance. Changing the order has an impact on the overall integration # process, which may cause wedges in the gate later. - pbr!=2.1.0,>=2.0.0 # Apache-2.0 Babel!=2.4.0,>=2.3.4 # BSD +pyghmi>=1.0.22 # Apache-2.0 +pysnmp>=4.2.3 # BSD requests>=2.14.2 # Apache-2.0 six>=1.9.0 # MIT oslo.utils>=3.28.0 # Apache-2.0 diff --git a/scciclient/irmc/ipmi.py b/scciclient/irmc/ipmi.py new file mode 100755 index 0000000..f5cc686 --- /dev/null +++ b/scciclient/irmc/ipmi.py @@ -0,0 +1,172 @@ +# Copyright 2017 FUJITSU LIMITED +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import functools +import itertools + +from pyghmi import exceptions as ipmi_exception +from pyghmi.ipmi import command as ipmi_command + +# F1 1A - Get the number of GPU devices on PCI and the number of CPUs with FPGA +GET_GPU = '0x2E 0xF1 0x80 0x28 0x00 0x1A %s 0x00' + +# F5 81 - GET TPM STATUS +GET_TPM_STATUS = '0x2E 0xF5 0x80 0x28 0x00 0x81 0xC0' + + +class IPMIFailure(Exception): + """IPMI Failure + + This exception is used when IPMI operation failed. + """ + def __init__(self, message): + super(IPMIFailure, self).__init__(message) + + +class InvalidParameterValue(IPMIFailure): + """Invalid Parameter Value Failure + + This exception is used when invalid parameter values are passed to + the APIs exposed by this module. + """ + def __init__(self, message): + super(InvalidParameterValue, self).__init__(message) + + +def _parse_raw_bytes(raw_bytes): + """Convert a string of hexadecimal values to decimal values parameters + + Example: '0x2E 0xF1 0x80 0x28 0x00 0x1A 0x01 0x00' is converted to: + 46, 241, [128, 40, 0, 26, 1, 0] + + :param raw_bytes: string of hexadecimal values + :returns: 3 decimal values + """ + bytes_list = [int(x, base=16) for x in raw_bytes.split()] + return bytes_list[0], bytes_list[1], bytes_list[2:] + + +def _send_raw_command(ipmicmd, raw_bytes): + """Use IPMI command object to send raw ipmi command to BMC + + :param ipmicmd: IPMI command object + :param raw_bytes: string of hexadecimal values. This is commonly used + for certain vendor specific commands. + :returns: dict -- The response from IPMI device + """ + + netfn, command, data = _parse_raw_bytes(raw_bytes) + response = ipmicmd.raw_command(netfn, command, data=data) + + return response + + +def get_tpm_status(d_info): + """Get the TPM support status. + + Get the TPM support status of the node. + + :param d_info: the list of ipmitool parameters for accessing a node. + :returns: TPM support status + """ + + # note: + # Get TPM support status : ipmi cmd '0xF5', valid flags '0xC0' + # + # $ ipmitool raw 0x2E 0xF5 0x80 0x28 0x00 0x81 0xC0 + # + # Raw response: + # 80 28 00 C0 C0: True + # 80 28 00 -- --: False (other values than 'C0 C0') + + ipmicmd = ipmi_command.Command(bmc=d_info['irmc_address'], + userid=d_info['irmc_username'], + password=d_info['irmc_password']) + try: + response = _send_raw_command(ipmicmd, GET_TPM_STATUS) + if response['code'] != 0: + raise IPMIFailure( + "IPMI operation '%(operation)s' failed: %(error)s" % + {'operation': "GET TMP status", + 'error': response.get('error')}) + out = ' '.join('{:02X}'.format(x) for x in response['data']) + return out is not None and out[-5:] == 'C0 C0' + + except ipmi_exception.IpmiException as e: + raise IPMIFailure( + "IPMI operation '%(operation)s' failed: %(error)s" % + {'operation': "GET TMP status", 'error': e}) + + +def _pci_seq(ipmicmd): + """Get output of ipmiraw command and the ordinal numbers. + + :param ipmicmd: IPMI command object. + :returns: List of tuple contain ordinal number and output of ipmiraw + command. + """ + for i in range(1, 0xff + 1): + try: + res = _send_raw_command(ipmicmd, GET_GPU % hex(i)) + yield i, res + except ipmi_exception.IpmiException as e: + raise IPMIFailure( + "IPMI operation '%(operation)s' failed: %(error)s" % + {'operation': "GET GPU device quantity", 'error': e}) + + +def get_gpu(d_info, pci_device_ids): + """Get quantity of GPU devices on PCI and quantity of CPUs with FPGA. + + Get quantity of GPU devices on PCI and quantity of CPUs with FPGA of the + node. + + :param d_info: the list of ipmitool parameters for accessing a node. + :param pci_device_ids: the list contains pairs of / for + GPU on PCI. + :returns: a tuple of the number of GPU devices on PCI and the number of + CPUs with FPGA. + """ + + # note: + # Get quantity of GPU devices on PCI and quantity of CPUs with FPGA: + # ipmi cmd '0xF1' + # + # $ ipmitool raw 0x2E 0xF1 0x80 0x28 0x00 0x1A 0x01 0x00 + # + # Raw response: + # 80 28 00 00 00 05 data1 data2 34 17 76 11 00 04 + # 01 + + # data1: 2 octet of VendorID + # data2: 2 octet of DeviceID + + ipmicmd = ipmi_command.Command(bmc=d_info['irmc_address'], + userid=d_info['irmc_username'], + password=d_info['irmc_password']) + + response = itertools.takewhile( + lambda y: (y[1]['code'] != 0xC9 and y[1].get('error') is None), + _pci_seq(ipmicmd)) + + def _pci_count(accm, v): + out = v[1]['data'] + # if system returns value, record id will be increased. + pci_id = "0x{:02x}{:02x}/0x{:02x}{:02x}".format( + out[7], out[6], out[9], out[8]) + return accm + 1 if pci_id in pci_device_ids else accm + + gpu_count = functools.reduce(_pci_count, response, 0) + + return gpu_count diff --git a/scciclient/irmc/scci.py b/scciclient/irmc/scci.py old mode 100644 new mode 100755 index c443b99..eec2144 --- a/scciclient/irmc/scci.py +++ b/scciclient/irmc/scci.py @@ -23,6 +23,8 @@ import xml.etree.ElementTree as ET import requests import six +from scciclient.irmc import ipmi +from scciclient.irmc import snmp DEBUG = False @@ -486,3 +488,56 @@ def get_essential_properties(report, prop_keys): v['cpu_arch'] = 'x86_64' return {k: v[k] for k in prop_keys} + + +def get_capabilities_properties(d_info, + capa_keys, + pci_device_ids, + **kwargs): + """get capabilities properties + + This function returns a dictionary which contains keys + and their values from the report. + + + :param d_info: the dictionary of ipmitool parameters for accessing a node. + :param capa_keys: a list of keys for additional capabilities properties. + :param pci_device_ids: the list of string contains / + for GPU. + :param kwargs: additional arguments passed to scciclient. + :returns: a dictionary which contains keys and their values. + """ + + snmp_client = snmp.SNMPClient(d_info['irmc_address'], + d_info['irmc_snmp_port'], + d_info['irmc_snmp_version'], + d_info['irmc_snmp_community'], + d_info['irmc_snmp_security']) + try: + v = {} + if 'rom_firmware_version' in capa_keys: + v['rom_firmware_version'] = \ + snmp.get_bios_firmware_version(snmp_client) + + if 'irmc_firmware_version' in capa_keys: + v['irmc_firmware_version'] = \ + snmp.get_irmc_firmware_version(snmp_client) + + if 'server_model' in capa_keys: + v['server_model'] = snmp.get_server_model(snmp_client) + + # Sometime the server started but PCI device list building is + # still in progress so system will response error. We have to wait + # for some more seconds. + if kwargs.get('sleep_flag', False) and 'pci_gpu_devices' in capa_keys: + time.sleep(5) + + if 'pci_gpu_devices' in capa_keys: + v['pci_gpu_devices'] = ipmi.get_gpu(d_info, pci_device_ids) + + if 'trusted_boot' in capa_keys: + v['trusted_boot'] = ipmi.get_tpm_status(d_info) + + return v + except (snmp.SNMPFailure, ipmi.IPMIFailure) as err: + raise SCCIClientError('Capabilities inspection failed: %s' % err) diff --git a/scciclient/irmc/snmp.py b/scciclient/irmc/snmp.py new file mode 100644 index 0000000..9f0f564 --- /dev/null +++ b/scciclient/irmc/snmp.py @@ -0,0 +1,260 @@ +# Copyright 2017 FUJITSU LIMITED +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six + +from pysnmp.entity.rfc3413.oneliner import cmdgen +from pysnmp import error as snmp_error + +BMC_NAME_OID = '1.3.6.1.4.1.231.2.10.2.2.10.3.4.1.3.1.1' +IRMC_FW_VERSION_OID = '1.3.6.1.4.1.231.2.10.2.2.10.3.4.1.4.1.1' +BIOS_FW_VERSION_OID = '1.3.6.1.4.1.231.2.10.2.2.10.4.1.1.11.1' +SERVER_MODEL_OID = '1.3.6.1.4.1.231.2.10.2.2.10.2.3.1.4.1' + +SNMP_V1 = '1' +SNMP_V2C = '2c' +SNMP_V3 = '3' + +SNMP_FAILURE_MSG = "SNMP operation '%s' failed: %s" + + +class SNMPFailure(Exception): + """SNMP Failure + + This exception is used when invalid inputs are passed to + the APIs exposed by this module. + """ + def __init__(self, message): + super(SNMPFailure, self).__init__(message) + + +class SNMPIRMCFirmwareFailure(SNMPFailure): + """SNMP iRMC Firmware Failure + + This exception is used when error occurs when collecting iRMC firmware. + """ + def __init__(self, message): + super(SNMPIRMCFirmwareFailure, self).__init__(message) + + +class SNMPBIOSFirmwareFailure(SNMPFailure): + """SNMP BIOS Firmware Failure + + This exception is used when error occurs when collecting BIOS firmware. + """ + def __init__(self, message): + super(SNMPBIOSFirmwareFailure, self).__init__(message) + + +class SNMPServerModelFailure(SNMPFailure): + """SNMP Server Model Failure + + This exception is used when error occurs when collecting server model. + """ + def __init__(self, message): + super(SNMPServerModelFailure, self).__init__(message) + + +def get_irmc_firmware_version(snmp_client): + """Get irmc firmware version of the node. + + :param snmp_client: an SNMP client object. + :raises: SNMPFailure if SNMP operation failed. + :returns: a string of bmc name and irmc firmware version. + """ + + try: + bmc_name = snmp_client.get(BMC_NAME_OID) + irmc_firm_ver = snmp_client.get(IRMC_FW_VERSION_OID) + return ('%(bmc)s%(sep)s%(firm_ver)s' % + {'bmc': bmc_name if bmc_name else '', + 'firm_ver': irmc_firm_ver if irmc_firm_ver else '', + 'sep': '-' if bmc_name and irmc_firm_ver else ''}) + except SNMPFailure as e: + raise SNMPIRMCFirmwareFailure( + SNMP_FAILURE_MSG % ("GET IRMC FIRMWARE VERSION", e)) + + +def get_bios_firmware_version(snmp_client): + """Get bios firmware version of the node. + + :param snmp_client: an SNMP client object. + :raises: SNMPFailure if SNMP operation failed. + :returns: a string of bios firmware version. + """ + + try: + bios_firmware_version = snmp_client.get(BIOS_FW_VERSION_OID) + return six.text_type(bios_firmware_version) + except SNMPFailure as e: + raise SNMPBIOSFirmwareFailure( + SNMP_FAILURE_MSG % ("GET BIOS FIRMWARE VERSION", e)) + + +def get_server_model(snmp_client): + """Get server model of the node. + + :param snmp_client: an SNMP client object. + :raises: SNMPFailure if SNMP operation failed. + :returns: a string of server model. + """ + + try: + server_model = snmp_client.get(SERVER_MODEL_OID) + return six.text_type(server_model) + except SNMPFailure as e: + raise SNMPServerModelFailure( + SNMP_FAILURE_MSG % ("GET SERVER MODEL", e)) + + +class SNMPClient(object): + """SNMP client object. + + Performs low level SNMP get and set operations. Encapsulates all + interaction with PySNMP to simplify dynamic importing and unit testing. + """ + + def __init__(self, address, port, version, community=None, security=None): + self.address = address + self.port = port + self.version = version + if self.version == SNMP_V3: + self.security = security + else: + self.community = community + self.cmd_gen = cmdgen.CommandGenerator() + + def _get_auth(self): + """Return the authorization data for an SNMP request. + + :returns: A + :class:`pysnmp.entity.rfc3413.oneliner.cmdgen.CommunityData` + object. + """ + if self.version == SNMP_V3: + # Handling auth/encryption credentials is not (yet) supported. + # This version supports a security name analogous to community. + return cmdgen.UsmUserData(self.security) + else: + mp_model = 1 if self.version == SNMP_V2C else 0 + return cmdgen.CommunityData(self.community, mpModel=mp_model) + + def _get_transport(self): + """Return the transport target for an SNMP request. + + :returns: A :class: + `pysnmp.entity.rfc3413.oneliner.cmdgen.UdpTransportTarget` object. + :raises: snmp_error.PySnmpError if the transport address is bad. + """ + # The transport target accepts timeout and retries parameters, which + # default to 1 (second) and 5 respectively. These are deemed sensible + # enough to allow for an unreliable network or slow device. + return cmdgen.UdpTransportTarget((self.address, self.port)) + + def get(self, oid): + """Use PySNMP to perform an SNMP GET operation on a single object. + + :param oid: The OID of the object to get. + :raises: SNMPFailure if an SNMP request fails. + :returns: The value of the requested object. + """ + try: + results = self.cmd_gen.getCmd(self._get_auth(), + self._get_transport(), + oid) + except snmp_error.PySnmpError as e: + raise SNMPFailure(SNMP_FAILURE_MSG % ("GET", e)) + + error_indication, error_status, error_index, var_binds = results + + if error_indication: + # SNMP engine-level error. + raise SNMPFailure(SNMP_FAILURE_MSG % ("GET", error_indication)) + + if error_status: + # SNMP PDU error. + raise SNMPFailure( + "SNMP operation '%(operation)s' failed: %(error)s at" + " %(index)s" % + {'operation': "GET", 'error': error_status.prettyPrint(), + 'index': + error_index and var_binds[int(error_index) - 1] + or '?'}) + + # We only expect a single value back + name, val = var_binds[0] + return val + + def get_next(self, oid): + """Use PySNMP to perform an SNMP GET NEXT operation on a table object. + + :param oid: The OID of the object to get. + :raises: SNMPFailure if an SNMP request fails. + :returns: A list of values of the requested table object. + """ + try: + results = self.cmd_gen.nextCmd(self._get_auth(), + self._get_transport(), + oid) + except snmp_error.PySnmpError as e: + raise SNMPFailure(SNMP_FAILURE_MSG % ("GET_NEXT", e)) + + error_indication, error_status, error_index, var_binds = results + + if error_indication: + # SNMP engine-level error. + raise SNMPFailure( + SNMP_FAILURE_MSG % ("GET_NEXT", error_indication)) + + if error_status: + # SNMP PDU error. + raise SNMPFailure( + "SNMP operation '%(operation)s' failed: %(error)s at" + " %(index)s" % + {'operation': "GET_NEXT", 'error': error_status.prettyPrint(), + 'index': + error_index and var_binds[int(error_index) - 1] + or '?'}) + + return [val for row in var_binds for name, val in row] + + def set(self, oid, value): + """Use PySNMP to perform an SNMP SET operation on a single object. + + :param oid: The OID of the object to set. + :param value: The value of the object to set. + :raises: SNMPFailure if an SNMP request fails. + """ + try: + results = self.cmd_gen.setCmd(self._get_auth(), + self._get_transport(), + (oid, value)) + except snmp_error.PySnmpError as e: + raise SNMPFailure(SNMP_FAILURE_MSG % ("SET", e)) + + error_indication, error_status, error_index, var_binds = results + + if error_indication: + # SNMP engine-level error. + raise SNMPFailure(SNMP_FAILURE_MSG % ("SET", error_indication)) + + if error_status: + # SNMP PDU error. + raise SNMPFailure( + "SNMP operation '%(operation)s' failed: %(error)s at" + " %(index)s" % + {'operation': "SET", 'error': error_status.prettyPrint(), + 'index': + error_index and var_binds[int(error_index) - 1] + or '?'}) diff --git a/scciclient/tests/irmc/test_ipmi.py b/scciclient/tests/irmc/test_ipmi.py new file mode 100644 index 0000000..556d029 --- /dev/null +++ b/scciclient/tests/irmc/test_ipmi.py @@ -0,0 +1,139 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Test class for IPMI Module. +""" + +import mock +import testtools + +from pyghmi import exceptions as ipmi_exception +from pyghmi.ipmi import command as ipmi_command + +from scciclient.irmc import ipmi + + +@mock.patch.object(ipmi_command, 'Command', new=mock.Mock()) +class IpmiTestCase(testtools.TestCase): + """Tests for IPMI + + Unit Test Cases for getting information via ipmi raw command + """ + + def setUp(self): + super(IpmiTestCase, self).setUp() + + self.info = {'irmc_address': "10.0.0.10", + 'irmc_username': "admin", + 'irmc_password': "admin", + } + + @mock.patch.object(ipmi, '_send_raw_command') + def test_get_tpm_status_true(self, exec_mock): + exec_mock.return_value = {'command': 0xF5, 'code': 0x00, 'netfn': 0x2F, + 'data': [0x80, 0x28, 0x00, 0xC0, 0xC0]} + + cmd = "0x2E 0xF5 0x80 0x28 0x00 0x81 0xC0" + actual_out = ipmi.get_tpm_status(self.info) + self.assertEqual(True, actual_out) + exec_mock.assert_called_once_with(mock.ANY, cmd) + + @mock.patch.object(ipmi, '_send_raw_command') + def test_get_tpm_status_false(self, exec_mock): + exec_mock.return_value = {'command': 0xF5, 'code': 0x00, 'netfn': 0x2F, + 'data': [0x80, 0x28, 0x00, 0x80, 0x01]} + cmd = "0x2E 0xF5 0x80 0x28 0x00 0x81 0xC0" + + actual_out = ipmi.get_tpm_status(self.info) + self.assertEqual(False, actual_out) + exec_mock.assert_called_once_with(mock.ANY, cmd) + + @mock.patch.object(ipmi, '_send_raw_command') + def test_get_tpm_status_error_code(self, exec_mock): + exec_mock.return_value = {'command': 0xF5, 'code': 0x01, 'netfn': 0x2F, + 'data': [0x80, 0x28, 0x00, 0x80, 0x01]} + cmd = "0x2E 0xF5 0x80 0x28 0x00 0x81 0xC0" + + self.assertRaises(ipmi.IPMIFailure, ipmi.get_tpm_status, self.info) + exec_mock.assert_called_once_with(mock.ANY, cmd) + + @mock.patch.object(ipmi, '_send_raw_command') + def test_get_tpm_status_exception(self, exec_mock): + exec_mock.side_effect = ipmi_exception.IpmiException + + cmd = "0x2E 0xF5 0x80 0x28 0x00 0x81 0xC0" + + self.assertRaises(ipmi.IPMIFailure, ipmi.get_tpm_status, self.info) + exec_mock.assert_called_once_with(mock.ANY, cmd) + + @mock.patch.object(ipmi, '_send_raw_command') + def test_get_gpu(self, exec_mock): + gpu_ids = ['0x1000/0x0079', '0x2100/0x0080'] + + exec_mock.side_effect = ({'command': 0xF1, 'code': 0x00, 'netfn': 0x2F, + 'data': [0x80, 0x28, 0x00, 0x00, 0x00, 0x05, + 0x00, 0x10, 0x79, 0x00, 0x34, 0x17, + 0x76, 0x11, 0x00, 0x04, 0x01]}, + {'command': 0xF1, 'code': 0xC9, 'netfn': 0x2F, + 'error': 'Parameter out of range', + 'data': [0x80, 0x28, 0x00]}) + + cmd1 = "0x2E 0xF1 0x80 0x28 0x00 0x1A 0x1 0x00" + cmd2 = "0x2E 0xF1 0x80 0x28 0x00 0x1A 0x2 0x00" + actual_out = ipmi.get_gpu(self.info, gpu_ids) + self.assertEqual(1, actual_out) + exec_mock.assert_has_calls([mock.call(mock.ANY, cmd1), + mock.call(mock.ANY, cmd2)]) + + @mock.patch.object(ipmi, '_send_raw_command') + def test_get_gpu_blank(self, exec_mock): + gpu_ids = [] + + actual_out = ipmi.get_gpu(self.info, gpu_ids) + self.assertEqual(0, actual_out) + self.assertTrue(exec_mock.called) + + @mock.patch.object(ipmi, '_send_raw_command') + def test_get_gpu_not_found(self, exec_mock): + gpu_ids = ['0x1111/0x1179', '0x2100/0x0080'] + + exec_mock.side_effect = ({'command': 0xF1, 'code': 0x00, 'netfn': 0x2F, + 'data': [0x80, 0x28, 0x00, 0x00, 0x00, 0x05, + 0x00, 0x10, 0x79, 0x00, 0x34, 0x17, + 0x76, 0x11, 0x00, 0x04, 0x01]}, + {'command': 0xF1, 'code': 0xC9, 'netfn': 0x2F, + 'error': 'Parameter out of range', + 'data': [0x80, 0x28, 0x00]}) + cmd1 = "0x2E 0xF1 0x80 0x28 0x00 0x1A 0x1 0x00" + cmd2 = "0x2E 0xF1 0x80 0x28 0x00 0x1A 0x2 0x00" + actual_out = ipmi.get_gpu(self.info, gpu_ids) + self.assertEqual(0, actual_out) + exec_mock.assert_has_calls([mock.call(mock.ANY, cmd1), + mock.call(mock.ANY, cmd2)]) + + @mock.patch.object(ipmi, '_send_raw_command') + def test_get_gpu_exception(self, exec_mock): + gpu_ids = ['0x1111/0x1179', '0x2100/0x0080'] + + exec_mock.side_effect = ipmi_exception.IpmiException('Error') + + cmd = "0x2E 0xF1 0x80 0x28 0x00 0x1A 0x1 0x00" + + e = self.assertRaises(ipmi.IPMIFailure, + ipmi.get_gpu, + self.info, + gpu_ids) + exec_mock.assert_called_once_with(mock.ANY, cmd) + self.assertEqual('IPMI operation \'GET GPU device quantity\' ' + 'failed: Error', str(e)) diff --git a/scciclient/tests/irmc/test_scci.py b/scciclient/tests/irmc/test_scci.py index 3b7746b..a26624f 100644 --- a/scciclient/tests/irmc/test_scci.py +++ b/scciclient/tests/irmc/test_scci.py @@ -23,7 +23,9 @@ import mock from requests_mock.contrib import fixture as rm_fixture import testtools +from scciclient.irmc import ipmi from scciclient.irmc import scci +from scciclient.irmc import snmp class SCCITestCase(testtools.TestCase): @@ -55,6 +57,19 @@ class SCCITestCase(testtools.TestCase): self.irmc_port = 80 self.irmc_auth_method = 'basic' self.irmc_client_timeout = 60 + self.irmc_info = {'irmc_address': self.irmc_address, + 'irmc_username': self.irmc_username, + 'irmc_password': self.irmc_password, + 'irmc_snmp_port': 161, + 'irmc_snmp_version': 'v2c', + 'irmc_snmp_community': 'public', + 'irmc_snmp_security': None, + 'irmc_client_timeout': self.irmc_client_timeout, + 'irmc_sensor_method': 'ipmitool', + 'irmc_auth_method': self.irmc_auth_method, + 'irmc_port': 443, + 'irmc_tempdir': "/tmp" + } self.irmc_remote_image_server = '10.33.110.49' self.irmc_remote_image_user_domain = 'example.local' @@ -688,3 +703,127 @@ class SCCITestCase(testtools.TestCase): self.report_ng_xml, ESSENTIAL_PROPERTIES_KEYS) self.assertEqual(expected, result) + + @mock.patch.object(ipmi, 'get_gpu') + @mock.patch.object(snmp, 'get_server_model') + @mock.patch.object(snmp, 'get_irmc_firmware_version') + @mock.patch.object(snmp, 'get_bios_firmware_version') + @mock.patch.object(ipmi, 'get_tpm_status') + def test_get_capabilities_properties(self, + tpm_mock, + bios_mock, + irmc_mock, + server_mock, + gpu_mock): + capabilities_properties = {'trusted_boot', 'irmc_firmware_version', + 'rom_firmware_version', 'server_model', + 'pci_gpu_devices'} + gpu_ids = ['0x1000/0x0079', '0x2100/0x0080'] + kwargs = {} + kwargs['sleep_flag'] = True + + tpm_mock.return_value = False + bios_mock.return_value = 'V4.6.5.4 R1.15.0 for D3099-B1x' + irmc_mock.return_value = 'iRMC S4-7.82F' + server_mock.return_value = 'TX2540M1F5' + gpu_mock.return_value = 1 + + expected = {'irmc_firmware_version': 'iRMC S4-7.82F', + 'pci_gpu_devices': 1, + 'rom_firmware_version': 'V4.6.5.4 R1.15.0 for D3099-B1x', + 'server_model': 'TX2540M1F5', + 'trusted_boot': False} + + result = scci.get_capabilities_properties( + self.irmc_info, + capabilities_properties, + gpu_ids, + **kwargs) + + self.assertEqual(expected, result) + tpm_mock.assert_called_once_with(self.irmc_info) + bios_mock.assert_called_once_with(mock.ANY) + irmc_mock.assert_called_once_with(mock.ANY) + server_mock.assert_called_once_with(mock.ANY) + gpu_mock.assert_called_once_with(self.irmc_info, + gpu_ids) + + @mock.patch.object(ipmi, 'get_gpu') + @mock.patch.object(snmp, 'get_server_model') + @mock.patch.object(snmp, 'get_irmc_firmware_version') + @mock.patch.object(snmp, 'get_bios_firmware_version') + @mock.patch.object(ipmi, 'get_tpm_status') + def test_get_capabilities_properties_blank(self, + tpm_mock, + bios_mock, + irmc_mock, + server_mock, + gpu_mock): + + capabilities_properties = {} + gpu_ids = ['0x1000/0x0079', '0x2100/0x0080'] + kwargs = {} + kwargs['sleep_flag'] = True + + tpm_mock.return_value = False + bios_mock.return_value = 'V4.6.5.4 R1.15.0 for D3099-B1x' + irmc_mock.return_value = 'iRMC S4-7.82F' + server_mock.return_value = 'TX2540M1F5' + gpu_mock.return_value = 1 + + expected = {} + + result = scci.get_capabilities_properties( + self.irmc_info, + capabilities_properties, + gpu_ids, + **kwargs) + + self.assertEqual(expected, result) + + @mock.patch.object(ipmi, '_send_raw_command') + @mock.patch.object(snmp.SNMPClient, 'get') + def test_get_capabilities_properties_scci_client_error(self, + snmp_mock, + ipmiraw_mock): + capabilities_properties = {'trusted_boot', 'irmc_firmware_version', + 'rom_firmware_version', 'server_model', + 'pci_gpu_devices'} + gpu_ids = ['0x1000/0x0079', '0x2100/0x0080'] + kwargs = {} + kwargs['sleep_flag'] = True + + ipmiraw_mock.return_value = None + snmp_mock.side_effect = snmp.SNMPFailure("error") + + e = self.assertRaises(scci.SCCIClientError, + scci.get_capabilities_properties, + self.irmc_info, + capabilities_properties, + gpu_ids, + **kwargs) + self.assertEqual('Capabilities inspection failed: SNMP operation \'' + 'GET BIOS FIRMWARE VERSION\' failed: error', str(e)) + + @mock.patch.object(ipmi, 'get_gpu') + @mock.patch.object(snmp.SNMPClient, 'get') + def test_get_capabilities_properties_scci_client_error_ipmi(self, + snmp_mock, + ipmi_mock): + capabilities_properties = {'trusted_boot', 'irmc_firmware_version', + 'rom_firmware_version', 'server_model', + 'pci_gpu_devices'} + gpu_ids = ['0x1000/0x0079', '0x2100/0x0080'] + kwargs = {} + kwargs['sleep_flag'] = True + + ipmi_mock.side_effect = ipmi.IPMIFailure("IPMI error") + snmp_mock.return_value = None + + e = self.assertRaises(scci.SCCIClientError, + scci.get_capabilities_properties, + self.irmc_info, + capabilities_properties, + gpu_ids, + **kwargs) + self.assertEqual('Capabilities inspection failed: IPMI error', str(e)) diff --git a/scciclient/tests/irmc/test_snmp.py b/scciclient/tests/irmc/test_snmp.py new file mode 100644 index 0000000..b837292 --- /dev/null +++ b/scciclient/tests/irmc/test_snmp.py @@ -0,0 +1,305 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Test class for snmp module. +""" + + +import mock +from pysnmp.entity.rfc3413.oneliner import cmdgen +from pysnmp import error as snmp_error +import testtools + +from scciclient.irmc import snmp + + +class IRMCSnmpTestCase(testtools.TestCase): + """Tests for SNMP module + + Unit Test Cases for getting information via snmp module + """ + + def setUp(self): + super(IRMCSnmpTestCase, self).setUp() + + def test_get_irmc_firmware_version(self): + snmp_client = mock.Mock() + snmp_client.get.side_effect = ['iRMC S4', '7.82F'] + cmd1 = snmp.BMC_NAME_OID + cmd2 = snmp.IRMC_FW_VERSION_OID + actual_out = snmp.get_irmc_firmware_version(snmp_client) + self.assertEqual('iRMC S4-7.82F', actual_out) + snmp_client.get.assert_has_calls([mock.call(cmd1), + mock.call(cmd2)]) + + def test_get_irmc_firmware_version_BMC_only(self): + snmp_client = mock.Mock() + snmp_client.get.side_effect = ['iRMC S4', ''] + cmd1 = snmp.BMC_NAME_OID + cmd2 = snmp.IRMC_FW_VERSION_OID + actual_out = snmp.get_irmc_firmware_version(snmp_client) + self.assertEqual('iRMC S4', actual_out) + snmp_client.get.assert_has_calls([mock.call(cmd1), + mock.call(cmd2)]) + + def test_get_irmc_firmware_version_FW_only(self): + snmp_client = mock.Mock() + snmp_client.get.side_effect = ['', '7.82F'] + cmd1 = snmp.BMC_NAME_OID + cmd2 = snmp.IRMC_FW_VERSION_OID + actual_out = snmp.get_irmc_firmware_version(snmp_client) + self.assertEqual('7.82F', actual_out) + snmp_client.get.assert_has_calls([mock.call(cmd1), + mock.call(cmd2)]) + + def test_get_irmc_firmware_version_blank(self): + snmp_client = mock.Mock() + snmp_client.get.side_effect = ['', ''] + cmd1 = snmp.BMC_NAME_OID + cmd2 = snmp.IRMC_FW_VERSION_OID + actual_out = snmp.get_irmc_firmware_version(snmp_client) + self.assertEqual('', actual_out) + snmp_client.get.assert_has_calls([mock.call(cmd1), + mock.call(cmd2)]) + + def test_get_irmc_firmware_version_exception(self): + snmp_client = mock.Mock() + snmp_client.get.side_effect = snmp.SNMPFailure('Error') + cmd1 = snmp.BMC_NAME_OID + e = self.assertRaises(snmp.SNMPIRMCFirmwareFailure, + snmp.get_irmc_firmware_version, + snmp_client) + snmp_client.get.assert_has_calls([mock.call(cmd1)]) + self.assertEqual('SNMP operation \'GET IRMC FIRMWARE VERSION\'' + ' failed: Error', str(e)) + + def test_get_bios_firmware_version(self): + snmp_client = mock.Mock() + snmp_client.return_value = 'V4.6.5.4 R1.15.0 for D3099-B1x' + snmp_client.get.return_value = 'V4.6.5.4 R1.15.0 for D3099-B1x' + cmd = snmp.BIOS_FW_VERSION_OID + actual_out = snmp.get_bios_firmware_version(snmp_client) + self.assertEqual('V4.6.5.4 R1.15.0 for D3099-B1x', actual_out) + snmp_client.get.assert_called_once_with(cmd) + + def test_get_bios_firmware_version_exception(self): + snmp_client = mock.Mock() + snmp_client.get.side_effect = snmp.SNMPFailure('Error') + cmd = snmp.BIOS_FW_VERSION_OID + e = self.assertRaises(snmp.SNMPBIOSFirmwareFailure, + snmp.get_bios_firmware_version, + snmp_client) + snmp_client.get.assert_called_once_with(cmd) + self.assertEqual('SNMP operation \'GET BIOS FIRMWARE VERSION\'' + ' failed: Error', str(e)) + + def test_get_server_model(self): + snmp_client = mock.Mock() + snmp_client.return_value = 'TX2540M1F5' + snmp_client.get.return_value = 'TX2540M1F5' + cmd = snmp.SERVER_MODEL_OID + actual_out = snmp.get_server_model(snmp_client) + self.assertEqual('TX2540M1F5', actual_out) + snmp_client.get.assert_called_once_with(cmd) + + def test_get_server_model_exception(self): + snmp_client = mock.Mock() + snmp_client.get.side_effect = snmp.SNMPFailure('Error') + cmd = snmp.SERVER_MODEL_OID + e = self.assertRaises(snmp.SNMPServerModelFailure, + snmp.get_server_model, + snmp_client) + snmp_client.get.assert_called_once_with(cmd) + self.assertEqual('SNMP operation \'GET SERVER MODEL\'' + ' failed: Error', str(e)) + + +@mock.patch.object(cmdgen, 'CommandGenerator', autospec=True) +class SNMPClientTestCase(testtools.TestCase): + def setUp(self): + super(SNMPClientTestCase, self).setUp() + self.address = '1.2.3.4' + self.port = '6700' + self.oid = 'oid' + self.value = 'value' + + def test___init__(self, mock_cmdgen): + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1) + mock_cmdgen.assert_called_once_with() + self.assertEqual(self.address, client.address) + self.assertEqual(self.port, client.port) + self.assertEqual(snmp.SNMP_V1, client.version) + self.assertIsNone(client.community) + self.assertNotIn('security', client.__dict__) + self.assertEqual(mock_cmdgen.return_value, client.cmd_gen) + + def test_get(self, mock_cmdgen): + var_bind = (self.oid, self.value) + mock_cmdgenerator = mock_cmdgen.return_value + mock_cmdgenerator.getCmd.return_value = ("", None, 0, [var_bind]) + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) + val = client.get(self.oid) + self.assertEqual(var_bind[1], val) + mock_cmdgenerator.getCmd.assert_called_once_with(mock.ANY, mock.ANY, + self.oid) + + @mock.patch.object(cmdgen, 'CommunityData', autospec=True) + def test__get_auth_v1(self, mock_community, mock_cmdgen): + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1) + client._get_auth() + mock_cmdgen.assert_called_once_with() + mock_community.assert_called_once_with(client.community, mpModel=0) + + @mock.patch.object(cmdgen, 'CommunityData', autospec=True) + def test__get_auth_v2c(self, mock_community, mock_cmdgen): + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V2C) + client._get_auth() + mock_cmdgen.assert_called_once_with() + mock_community.assert_called_once_with(client.community, mpModel=1) + + @mock.patch.object(cmdgen, 'UsmUserData', autospec=True) + def test__get_auth_v3(self, mock_user, mock_cmdgen): + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) + client._get_auth() + mock_cmdgen.assert_called_once_with() + mock_user.assert_called_once_with(client.security) + + @mock.patch.object(cmdgen, 'UdpTransportTarget', autospec=True) + def test__get_transport(self, mock_transport, mock_cmdgen): + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) + client._get_transport() + mock_cmdgen.assert_called_once_with() + mock_transport.assert_called_once_with((client.address, client.port)) + + @mock.patch.object(cmdgen, 'UdpTransportTarget', autospec=True) + def test__get_transport_err(self, mock_transport, mock_cmdgen): + mock_transport.side_effect = snmp_error.PySnmpError() + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) + self.assertRaises(snmp_error.PySnmpError, client._get_transport) + mock_cmdgen.assert_called_once_with() + mock_transport.assert_called_once_with((client.address, client.port)) + + def test_get_pdu_err(self, mock_cmdgen): + var_bind = (self.oid, self.value) + error_status = mock.Mock() + error_status.prettyPrint = lambda: "pdu error" + mock_cmdgenerator = mock_cmdgen.return_value + mock_cmdgenerator.getCmd.return_value = (None, error_status, 1, + [var_bind]) + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) + self.assertRaises(snmp.SNMPFailure, client.get, self.oid) + mock_cmdgenerator.getCmd.assert_called_once_with(mock.ANY, mock.ANY, + self.oid) + + def test_get_next(self, mock_cmdgen): + var_bind = (self.oid, self.value) + mock_cmdgenerator = mock_cmdgen.return_value + mock_cmdgenerator.nextCmd.return_value = ( + "", None, 0, [[var_bind, var_bind]]) + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) + val = client.get_next(self.oid) + self.assertEqual([self.value, self.value], val) + mock_cmdgenerator.nextCmd.assert_called_once_with(mock.ANY, mock.ANY, + self.oid) + + @mock.patch.object(snmp.SNMPClient, '_get_transport', autospec=True) + def test_get_err_transport(self, mock_transport, mock_cmdgen): + mock_transport.side_effect = snmp_error.PySnmpError + mock_cmdgenerator = mock_cmdgen.return_value + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) + self.assertRaises(snmp.SNMPFailure, client.get, self.oid) + self.assertFalse(mock_cmdgenerator.getCmd.called) + + @mock.patch.object(snmp.SNMPClient, '_get_transport', autospec=True) + def test_get_next_err_transport(self, mock_transport, + mock_cmdgen): + mock_transport.side_effect = snmp_error.PySnmpError + mock_cmdgenerator = mock_cmdgen.return_value + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) + self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid) + self.assertFalse(mock_cmdgenerator.nextCmd.called) + + def test_get_err_engine(self, mock_cmdgen): + var_bind = (self.oid, self.value) + mock_cmdgenerator = mock_cmdgen.return_value + mock_cmdgenerator.getCmd.return_value = ("engine error", None, 0, + [var_bind]) + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) + self.assertRaises(snmp.SNMPFailure, client.get, self.oid) + mock_cmdgenerator.getCmd.assert_called_once_with(mock.ANY, mock.ANY, + self.oid) + + def test_get_next_err_engine(self, mock_cmdgen): + var_bind = (self.oid, self.value) + mock_cmdgenerator = mock_cmdgen.return_value + mock_cmdgenerator.nextCmd.return_value = ("engine error", None, 0, + [[var_bind, var_bind]]) + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) + self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid) + mock_cmdgenerator.nextCmd.assert_called_once_with(mock.ANY, mock.ANY, + self.oid) + + def test_get_next_pdu_err(self, mock_cmdgen): + var_bind = (self.oid, self.value) + error_status = mock.Mock() + error_status.prettyPrint = lambda: "pdu error" + mock_cmdgenerator = mock_cmdgen.return_value + mock_cmdgenerator.nextCmd.return_value = (None, error_status, 1, + [var_bind]) + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) + self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid) + mock_cmdgenerator.nextCmd.assert_called_once_with(mock.ANY, mock.ANY, + self.oid) + + def test_set(self, mock_cmdgen): + var_bind = (self.oid, self.value) + mock_cmdgenerator = mock_cmdgen.return_value + mock_cmdgenerator.setCmd.return_value = ("", None, 0, [var_bind]) + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) + client.set(self.oid, self.value) + mock_cmdgenerator.setCmd.assert_called_once_with(mock.ANY, mock.ANY, + var_bind) + + @mock.patch.object(snmp.SNMPClient, '_get_transport', autospec=True) + def test_set_err_transport(self, mock_transport, mock_cmdgen): + mock_transport.side_effect = snmp_error.PySnmpError + mock_cmdgenerator = mock_cmdgen.return_value + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) + self.assertRaises(snmp.SNMPFailure, + client.set, self.oid, self.value) + self.assertFalse(mock_cmdgenerator.setCmd.called) + + def test_set_err_engine(self, mock_cmdgen): + var_bind = (self.oid, self.value) + mock_cmdgenerator = mock_cmdgen.return_value + mock_cmdgenerator.setCmd.return_value = ("engine error", None, 0, + [var_bind]) + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) + self.assertRaises(snmp.SNMPFailure, + client.set, self.oid, self.value) + mock_cmdgenerator.setCmd.assert_called_once_with(mock.ANY, mock.ANY, + var_bind) + + def test_set_pdu_err(self, mock_cmdgen): + var_bind = (self.oid, self.value) + error_status = mock.Mock() + error_status.prettyPrint = lambda: "pdu error" + mock_cmdgenerator = mock_cmdgen.return_value + mock_cmdgenerator.setCmd.return_value = (None, error_status, 1, + [var_bind]) + client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) + self.assertRaises(snmp.SNMPFailure, + client.set, self.oid, self.value) + mock_cmdgenerator.setCmd.assert_called_once_with(mock.ANY, mock.ANY, + var_bind)