Add v3 authentication functionality to snmp module
Currently the snmp module implemented by python-scciclient does not do any v3 authentication. This will cause failure when using SNMPv3 to communicate with servers. This commit fixes the above issue. Change-Id: I95537918702e37d98b57b07bca88ed638e879dd8
This commit is contained in:
		@@ -757,11 +757,17 @@ def get_capabilities_properties(d_info,
 | 
			
		||||
    :returns: a dictionary which contains keys and their values.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    snmp_client = snmp.SNMPClient(d_info['irmc_address'],
 | 
			
		||||
                                  d_info['irmc_snmp_port'],
 | 
			
		||||
                                  d_info['irmc_snmp_version'],
 | 
			
		||||
                                  d_info['irmc_snmp_community'],
 | 
			
		||||
                                  d_info['irmc_snmp_security'])
 | 
			
		||||
    snmp_client = snmp.SNMPClient(
 | 
			
		||||
        address=d_info['irmc_address'],
 | 
			
		||||
        port=d_info['irmc_snmp_port'],
 | 
			
		||||
        version=d_info['irmc_snmp_version'],
 | 
			
		||||
        read_community=d_info['irmc_snmp_community'],
 | 
			
		||||
        user=d_info.get('irmc_snmp_user'),
 | 
			
		||||
        auth_proto=d_info.get('irmc_snmp_auth_proto'),
 | 
			
		||||
        auth_key=d_info.get('irmc_snmp_auth_key'),
 | 
			
		||||
        priv_proto=d_info.get('irmc_snmp_priv_proto'),
 | 
			
		||||
        priv_key=d_info.get('irmc_snmp_priv_key'))
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        v = {}
 | 
			
		||||
        if 'rom_firmware_version' in capa_keys:
 | 
			
		||||
 
 | 
			
		||||
@@ -12,8 +12,8 @@
 | 
			
		||||
# License for the specific language governing permissions and limitations
 | 
			
		||||
# under the License.
 | 
			
		||||
 | 
			
		||||
from pysnmp.entity.rfc3413.oneliner import cmdgen
 | 
			
		||||
from pysnmp import error as snmp_error
 | 
			
		||||
from pysnmp import hlapi as snmp
 | 
			
		||||
import six
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -125,42 +125,76 @@ class SNMPClient(object):
 | 
			
		||||
    interaction with PySNMP to simplify dynamic importing and unit testing.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __init__(self, address, port, version, community=None, security=None):
 | 
			
		||||
    def __init__(self, address, port, version,
 | 
			
		||||
                 read_community=None, write_community=None,
 | 
			
		||||
                 user=None, auth_proto=None, auth_key=None,
 | 
			
		||||
                 priv_proto=None, priv_key=None,
 | 
			
		||||
                 context_engine_id=None, context_name=None):
 | 
			
		||||
        self.address = address
 | 
			
		||||
        self.port = port
 | 
			
		||||
        self.version = version
 | 
			
		||||
        if self.version == SNMP_V3:
 | 
			
		||||
            self.security = security
 | 
			
		||||
            self.user = user
 | 
			
		||||
            self.auth_proto = auth_proto
 | 
			
		||||
            self.auth_key = auth_key
 | 
			
		||||
            self.priv_proto = priv_proto
 | 
			
		||||
            self.priv_key = priv_key
 | 
			
		||||
        else:
 | 
			
		||||
            self.community = community
 | 
			
		||||
        self.cmd_gen = cmdgen.CommandGenerator()
 | 
			
		||||
            self.read_community = read_community
 | 
			
		||||
            self.write_community = write_community
 | 
			
		||||
 | 
			
		||||
    def _get_auth(self):
 | 
			
		||||
        self.context_engine_id = context_engine_id
 | 
			
		||||
        self.context_name = context_name or ''
 | 
			
		||||
        self.snmp_engine = snmp.SnmpEngine()
 | 
			
		||||
 | 
			
		||||
    def _get_auth(self, write_mode=False):
 | 
			
		||||
        """Return the authorization data for an SNMP request.
 | 
			
		||||
 | 
			
		||||
        :returns: A
 | 
			
		||||
            :class:`pysnmp.entity.rfc3413.oneliner.cmdgen.CommunityData`
 | 
			
		||||
            object.
 | 
			
		||||
        :param write_mode: `True` if write class SNMP command is
 | 
			
		||||
            executed. Default is `False`.
 | 
			
		||||
        :returns: Either
 | 
			
		||||
            :class:`pysnmp.hlapi.CommunityData`
 | 
			
		||||
            or :class:`pysnmp.hlapi.UsmUserData`
 | 
			
		||||
            object depending on SNMP version being used.
 | 
			
		||||
        """
 | 
			
		||||
        if self.version == SNMP_V3:
 | 
			
		||||
            # Handling auth/encryption credentials is not (yet) supported.
 | 
			
		||||
            # This version supports a security name analogous to community.
 | 
			
		||||
            return cmdgen.UsmUserData(self.security)
 | 
			
		||||
            return snmp.UsmUserData(self.user,
 | 
			
		||||
                                    authKey=self.auth_key,
 | 
			
		||||
                                    authProtocol=self.auth_proto,
 | 
			
		||||
                                    privKey=self.priv_key,
 | 
			
		||||
                                    privProtocol=self.priv_proto)
 | 
			
		||||
        else:
 | 
			
		||||
            mp_model = 1 if self.version == SNMP_V2C else 0
 | 
			
		||||
            return cmdgen.CommunityData(self.community, mpModel=mp_model)
 | 
			
		||||
            return snmp.CommunityData(
 | 
			
		||||
                self.write_community if write_mode else self.read_community,
 | 
			
		||||
                mpModel=mp_model
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    def _get_transport(self):
 | 
			
		||||
        """Return the transport target for an SNMP request.
 | 
			
		||||
 | 
			
		||||
        :returns: A :class:
 | 
			
		||||
            `pysnmp.entity.rfc3413.oneliner.cmdgen.UdpTransportTarget` object.
 | 
			
		||||
        :raises: snmp_error.PySnmpError if the transport address is bad.
 | 
			
		||||
            `pysnmp.hlapi.UdpTransportTarget` object.
 | 
			
		||||
        :raises: :class:`pysnmp.error.PySnmpError` if the transport address
 | 
			
		||||
            is bad.
 | 
			
		||||
        """
 | 
			
		||||
        # The transport target accepts timeout and retries parameters, which
 | 
			
		||||
        # default to 1 (second) and 5 respectively. These are deemed sensible
 | 
			
		||||
        # enough to allow for an unreliable network or slow device.
 | 
			
		||||
        return cmdgen.UdpTransportTarget((self.address, self.port))
 | 
			
		||||
        return snmp.UdpTransportTarget((self.address, self.port))
 | 
			
		||||
 | 
			
		||||
    def _get_context(self):
 | 
			
		||||
        """Return the SNMP context for an SNMP request.
 | 
			
		||||
 | 
			
		||||
        :returns: A :class:
 | 
			
		||||
            `pysnmp.hlapi.ContextData` object.
 | 
			
		||||
        :raises: :class:`pysnmp.error.PySnmpError` if SNMP context data
 | 
			
		||||
            is bad.
 | 
			
		||||
        """
 | 
			
		||||
        return snmp.ContextData(contextEngineId=self.context_engine_id,
 | 
			
		||||
                                contextName=self.context_name)
 | 
			
		||||
 | 
			
		||||
    def get(self, oid):
 | 
			
		||||
        """Use PySNMP to perform an SNMP GET operation on a single object.
 | 
			
		||||
@@ -170,13 +204,15 @@ class SNMPClient(object):
 | 
			
		||||
        :returns: The value of the requested object.
 | 
			
		||||
        """
 | 
			
		||||
        try:
 | 
			
		||||
            results = self.cmd_gen.getCmd(self._get_auth(),
 | 
			
		||||
            snmp_gen = snmp.getCmd(self.snmp_engine,
 | 
			
		||||
                                   self._get_auth(),
 | 
			
		||||
                                   self._get_transport(),
 | 
			
		||||
                                          oid)
 | 
			
		||||
                                   self._get_context(),
 | 
			
		||||
                                   snmp.ObjectType(snmp.ObjectIdentity(oid)))
 | 
			
		||||
        except snmp_error.PySnmpError as e:
 | 
			
		||||
            raise SNMPFailure(SNMP_FAILURE_MSG % ("GET", e))
 | 
			
		||||
 | 
			
		||||
        error_indication, error_status, error_index, var_binds = results
 | 
			
		||||
        error_indication, error_status, error_index, var_binds = next(snmp_gen)
 | 
			
		||||
 | 
			
		||||
        if error_indication:
 | 
			
		||||
            # SNMP engine-level error.
 | 
			
		||||
@@ -203,29 +239,38 @@ class SNMPClient(object):
 | 
			
		||||
        :returns: A list of values of the requested table object.
 | 
			
		||||
        """
 | 
			
		||||
        try:
 | 
			
		||||
            results = self.cmd_gen.nextCmd(self._get_auth(),
 | 
			
		||||
            snmp_gen = snmp.nextCmd(self.snmp_engine,
 | 
			
		||||
                                    self._get_auth(),
 | 
			
		||||
                                    self._get_transport(),
 | 
			
		||||
                                           oid)
 | 
			
		||||
                                    self._get_context(),
 | 
			
		||||
                                    snmp.ObjectType(snmp.ObjectIdentity(oid)),
 | 
			
		||||
                                    lexicographicMode=False)
 | 
			
		||||
        except snmp_error.PySnmpError as e:
 | 
			
		||||
            raise SNMPFailure(SNMP_FAILURE_MSG % ("GET_NEXT", e))
 | 
			
		||||
 | 
			
		||||
        error_indication, error_status, error_index, var_binds = results
 | 
			
		||||
 | 
			
		||||
        vals = []
 | 
			
		||||
        for (error_indication, error_status, error_index,
 | 
			
		||||
                var_binds) in snmp_gen:
 | 
			
		||||
            if error_indication:
 | 
			
		||||
                # SNMP engine-level error.
 | 
			
		||||
            raise SNMPFailure(
 | 
			
		||||
                SNMP_FAILURE_MSG % ("GET_NEXT", error_indication))
 | 
			
		||||
                raise SNMPFailure(SNMP_FAILURE_MSG % ("GET_NEXT",
 | 
			
		||||
                                                      error_indication))
 | 
			
		||||
 | 
			
		||||
            if error_status:
 | 
			
		||||
                # SNMP PDU error.
 | 
			
		||||
                raise SNMPFailure(
 | 
			
		||||
                    "SNMP operation '%(operation)s' failed: %(error)s at"
 | 
			
		||||
                    " %(index)s" %
 | 
			
		||||
                {'operation': "GET_NEXT", 'error': error_status.prettyPrint(),
 | 
			
		||||
                    {'operation': "GET_NEXT",
 | 
			
		||||
                     'error': error_status.prettyPrint(),
 | 
			
		||||
                     'index':
 | 
			
		||||
                     error_index and var_binds[int(error_index) - 1] or '?'})
 | 
			
		||||
                        error_index and var_binds[int(error_index) - 1]
 | 
			
		||||
                        or '?'})
 | 
			
		||||
 | 
			
		||||
        return [val for row in var_binds for name, val in row]
 | 
			
		||||
            name, value = var_binds[0]
 | 
			
		||||
            vals.append(value)
 | 
			
		||||
 | 
			
		||||
        return vals
 | 
			
		||||
 | 
			
		||||
    def set(self, oid, value):
 | 
			
		||||
        """Use PySNMP to perform an SNMP SET operation on a single object.
 | 
			
		||||
@@ -235,13 +280,16 @@ class SNMPClient(object):
 | 
			
		||||
        :raises: SNMPFailure if an SNMP request fails.
 | 
			
		||||
        """
 | 
			
		||||
        try:
 | 
			
		||||
            results = self.cmd_gen.setCmd(self._get_auth(),
 | 
			
		||||
            snmp_gen = snmp.setCmd(self.snmp_engine,
 | 
			
		||||
                                   self._get_auth(write_mode=True),
 | 
			
		||||
                                   self._get_transport(),
 | 
			
		||||
                                          (oid, value))
 | 
			
		||||
                                   self._get_context(),
 | 
			
		||||
                                   snmp.ObjectType(snmp.ObjectIdentity(oid),
 | 
			
		||||
                                                   value))
 | 
			
		||||
        except snmp_error.PySnmpError as e:
 | 
			
		||||
            raise SNMPFailure(SNMP_FAILURE_MSG % ("SET", e))
 | 
			
		||||
 | 
			
		||||
        error_indication, error_status, error_index, var_binds = results
 | 
			
		||||
        error_indication, error_status, error_index, var_binds = next(snmp_gen)
 | 
			
		||||
 | 
			
		||||
        if error_indication:
 | 
			
		||||
            # SNMP engine-level error.
 | 
			
		||||
 
 | 
			
		||||
@@ -17,8 +17,8 @@ Test class for snmp module.
 | 
			
		||||
 | 
			
		||||
from unittest import mock
 | 
			
		||||
 | 
			
		||||
from pysnmp.entity.rfc3413.oneliner import cmdgen
 | 
			
		||||
from pysnmp import error as snmp_error
 | 
			
		||||
from pysnmp import hlapi as pysnmp
 | 
			
		||||
import testtools
 | 
			
		||||
 | 
			
		||||
from scciclient.irmc import snmp
 | 
			
		||||
@@ -125,7 +125,6 @@ class IRMCSnmpTestCase(testtools.TestCase):
 | 
			
		||||
                         ' failed: Error', str(e))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@mock.patch.object(cmdgen, 'CommandGenerator', autospec=True)
 | 
			
		||||
class SNMPClientTestCase(testtools.TestCase):
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        super(SNMPClientTestCase, self).setUp()
 | 
			
		||||
@@ -134,172 +133,188 @@ class SNMPClientTestCase(testtools.TestCase):
 | 
			
		||||
        self.oid = 'oid'
 | 
			
		||||
        self.value = 'value'
 | 
			
		||||
 | 
			
		||||
    def test___init__(self, mock_cmdgen):
 | 
			
		||||
    @mock.patch.object(pysnmp, 'SnmpEngine', authspec=True)
 | 
			
		||||
    def test___init__(self, mock_snmpengine):
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1)
 | 
			
		||||
        mock_cmdgen.assert_called_once_with()
 | 
			
		||||
        mock_snmpengine.assert_called_once_with()
 | 
			
		||||
        self.assertEqual(self.address, client.address)
 | 
			
		||||
        self.assertEqual(self.port, client.port)
 | 
			
		||||
        self.assertEqual(snmp.SNMP_V1, client.version)
 | 
			
		||||
        self.assertIsNone(client.community)
 | 
			
		||||
        self.assertNotIn('security', client.__dict__)
 | 
			
		||||
        self.assertEqual(mock_cmdgen.return_value, client.cmd_gen)
 | 
			
		||||
        self.assertIsNone(client.read_community)
 | 
			
		||||
        self.assertIsNone(client.write_community)
 | 
			
		||||
        self.assertNotIn('user', client.__dict__)
 | 
			
		||||
        self.assertEqual(mock_snmpengine.return_value, client.snmp_engine)
 | 
			
		||||
 | 
			
		||||
    def test_get(self, mock_cmdgen):
 | 
			
		||||
    @mock.patch.object(pysnmp, 'CommunityData', autospec=True)
 | 
			
		||||
    def test__get_auth_v1_read(self, mock_community):
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1,
 | 
			
		||||
                                 read_community='public',
 | 
			
		||||
                                 write_community='private')
 | 
			
		||||
        client._get_auth()
 | 
			
		||||
        mock_community.assert_called_once_with(client.read_community,
 | 
			
		||||
                                               mpModel=0)
 | 
			
		||||
 | 
			
		||||
    @mock.patch.object(pysnmp, 'CommunityData', autospec=True)
 | 
			
		||||
    def test__get_auth_v1_write(self, mock_community):
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1,
 | 
			
		||||
                                 read_community='public',
 | 
			
		||||
                                 write_community='private')
 | 
			
		||||
        client._get_auth(write_mode=True)
 | 
			
		||||
        mock_community.assert_called_once_with(client.write_community,
 | 
			
		||||
                                               mpModel=0)
 | 
			
		||||
 | 
			
		||||
    @mock.patch.object(pysnmp, 'CommunityData', autospec=True)
 | 
			
		||||
    def test__get_auth_v2c(self, mock_community):
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V2C)
 | 
			
		||||
        client._get_auth()
 | 
			
		||||
        mock_community.assert_called_once_with(client.read_community,
 | 
			
		||||
                                               mpModel=1)
 | 
			
		||||
 | 
			
		||||
    @mock.patch.object(pysnmp, 'UsmUserData', autospec=True)
 | 
			
		||||
    def test__get_auth_v3(self, mock_user):
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        client._get_auth()
 | 
			
		||||
        mock_user.assert_called_once_with(client.user,
 | 
			
		||||
                                          authKey=client.auth_key,
 | 
			
		||||
                                          authProtocol=client.auth_proto,
 | 
			
		||||
                                          privKey=client.priv_key,
 | 
			
		||||
                                          privProtocol=client.priv_proto)
 | 
			
		||||
 | 
			
		||||
    @mock.patch.object(pysnmp, 'UdpTransportTarget', autospec=True)
 | 
			
		||||
    def test__get_transport(self, mock_transport):
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        client._get_transport()
 | 
			
		||||
        mock_transport.assert_called_once_with((client.address, client.port))
 | 
			
		||||
 | 
			
		||||
    @mock.patch.object(pysnmp, 'UdpTransportTarget', autospec=True)
 | 
			
		||||
    def test__get_transport_err(self, mock_transport):
 | 
			
		||||
        mock_transport.side_effect = snmp_error.PySnmpError
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        self.assertRaises(snmp_error.PySnmpError, client._get_transport)
 | 
			
		||||
        mock_transport.assert_called_once_with((client.address, client.port))
 | 
			
		||||
 | 
			
		||||
    @mock.patch.object(pysnmp, 'ContextData', authspec=True)
 | 
			
		||||
    def test__get_context(self, mock_context):
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1)
 | 
			
		||||
        client._get_context()
 | 
			
		||||
        mock_context.assert_called_once_with(contextEngineId=None,
 | 
			
		||||
                                             contextName='')
 | 
			
		||||
 | 
			
		||||
    @mock.patch.object(pysnmp, 'getCmd', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
 | 
			
		||||
    def test_get(self, mock_auth, mock_context, mock_transport,
 | 
			
		||||
                 mock_getcmd):
 | 
			
		||||
        var_bind = (self.oid, self.value)
 | 
			
		||||
        mock_cmdgenerator = mock_cmdgen.return_value
 | 
			
		||||
        mock_cmdgenerator.getCmd.return_value = ("", None, 0, [var_bind])
 | 
			
		||||
        mock_getcmd.return_value = iter([("", None, 0, [var_bind])])
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        val = client.get(self.oid)
 | 
			
		||||
        self.assertEqual(var_bind[1], val)
 | 
			
		||||
        mock_cmdgenerator.getCmd.assert_called_once_with(mock.ANY, mock.ANY,
 | 
			
		||||
                                                         self.oid)
 | 
			
		||||
        self.assertEqual(1, mock_getcmd.call_count)
 | 
			
		||||
 | 
			
		||||
    @mock.patch.object(cmdgen, 'CommunityData', autospec=True)
 | 
			
		||||
    def test__get_auth_v1(self, mock_community, mock_cmdgen):
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1)
 | 
			
		||||
        client._get_auth()
 | 
			
		||||
        mock_cmdgen.assert_called_once_with()
 | 
			
		||||
        mock_community.assert_called_once_with(client.community, mpModel=0)
 | 
			
		||||
 | 
			
		||||
    @mock.patch.object(cmdgen, 'CommunityData', autospec=True)
 | 
			
		||||
    def test__get_auth_v2c(self, mock_community, mock_cmdgen):
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V2C)
 | 
			
		||||
        client._get_auth()
 | 
			
		||||
        mock_cmdgen.assert_called_once_with()
 | 
			
		||||
        mock_community.assert_called_once_with(client.community, mpModel=1)
 | 
			
		||||
 | 
			
		||||
    @mock.patch.object(cmdgen, 'UsmUserData', autospec=True)
 | 
			
		||||
    def test__get_auth_v3(self, mock_user, mock_cmdgen):
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        client._get_auth()
 | 
			
		||||
        mock_cmdgen.assert_called_once_with()
 | 
			
		||||
        mock_user.assert_called_once_with(client.security)
 | 
			
		||||
 | 
			
		||||
    @mock.patch.object(cmdgen, 'UdpTransportTarget', autospec=True)
 | 
			
		||||
    def test__get_transport(self, mock_transport, mock_cmdgen):
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        client._get_transport()
 | 
			
		||||
        mock_cmdgen.assert_called_once_with()
 | 
			
		||||
        mock_transport.assert_called_once_with((client.address, client.port))
 | 
			
		||||
 | 
			
		||||
    @mock.patch.object(cmdgen, 'UdpTransportTarget', autospec=True)
 | 
			
		||||
    def test__get_transport_err(self, mock_transport, mock_cmdgen):
 | 
			
		||||
        mock_transport.side_effect = snmp_error.PySnmpError()
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        self.assertRaises(snmp_error.PySnmpError, client._get_transport)
 | 
			
		||||
        mock_cmdgen.assert_called_once_with()
 | 
			
		||||
        mock_transport.assert_called_once_with((client.address, client.port))
 | 
			
		||||
 | 
			
		||||
    def test_get_pdu_err(self, mock_cmdgen):
 | 
			
		||||
    @mock.patch.object(pysnmp, 'nextCmd', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
 | 
			
		||||
    def test_get_next(self, mock_auth, mock_context, mock_transport,
 | 
			
		||||
                      mock_nextcmd):
 | 
			
		||||
        var_bind = (self.oid, self.value)
 | 
			
		||||
        error_status = mock.Mock()
 | 
			
		||||
        error_status.prettyPrint = lambda: "pdu error"
 | 
			
		||||
        mock_cmdgenerator = mock_cmdgen.return_value
 | 
			
		||||
        mock_cmdgenerator.getCmd.return_value = (None, error_status, 1,
 | 
			
		||||
                                                 [var_bind])
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        self.assertRaises(snmp.SNMPFailure, client.get, self.oid)
 | 
			
		||||
        mock_cmdgenerator.getCmd.assert_called_once_with(mock.ANY, mock.ANY,
 | 
			
		||||
                                                         self.oid)
 | 
			
		||||
 | 
			
		||||
    def test_get_next(self, mock_cmdgen):
 | 
			
		||||
        var_bind = (self.oid, self.value)
 | 
			
		||||
        mock_cmdgenerator = mock_cmdgen.return_value
 | 
			
		||||
        mock_cmdgenerator.nextCmd.return_value = (
 | 
			
		||||
            "", None, 0, [[var_bind, var_bind]])
 | 
			
		||||
        mock_nextcmd.return_value = iter([("", None, 0, [var_bind]),
 | 
			
		||||
                                          ("", None, 0, [var_bind])])
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        val = client.get_next(self.oid)
 | 
			
		||||
        self.assertEqual([self.value, self.value], val)
 | 
			
		||||
        mock_cmdgenerator.nextCmd.assert_called_once_with(mock.ANY, mock.ANY,
 | 
			
		||||
                                                          self.oid)
 | 
			
		||||
        self.assertEqual(1, mock_nextcmd.call_count)
 | 
			
		||||
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_transport', autospec=True)
 | 
			
		||||
    def test_get_err_transport(self, mock_transport, mock_cmdgen):
 | 
			
		||||
    @mock.patch.object(pysnmp, 'getCmd', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
 | 
			
		||||
    def test_get_err_transport(self, mock_auth, mock_context, mock_transport,
 | 
			
		||||
                               mock_getcmd):
 | 
			
		||||
        mock_transport.side_effect = snmp_error.PySnmpError
 | 
			
		||||
        mock_cmdgenerator = mock_cmdgen.return_value
 | 
			
		||||
        var_bind = (self.oid, self.value)
 | 
			
		||||
        mock_getcmd.return_value = iter([("engine error", None, 0,
 | 
			
		||||
                                         [var_bind])])
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        self.assertRaises(snmp.SNMPFailure, client.get, self.oid)
 | 
			
		||||
        self.assertFalse(mock_cmdgenerator.getCmd.called)
 | 
			
		||||
        self.assertFalse(mock_getcmd.called)
 | 
			
		||||
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_transport', autospec=True)
 | 
			
		||||
    def test_get_next_err_transport(self, mock_transport,
 | 
			
		||||
                                    mock_cmdgen):
 | 
			
		||||
    @mock.patch.object(pysnmp, 'nextCmd', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
 | 
			
		||||
    def test_get_next_err_transport(self, mock_auth, mock_context,
 | 
			
		||||
                                    mock_transport, mock_nextcmd):
 | 
			
		||||
        mock_transport.side_effect = snmp_error.PySnmpError
 | 
			
		||||
        mock_cmdgenerator = mock_cmdgen.return_value
 | 
			
		||||
        var_bind = (self.oid, self.value)
 | 
			
		||||
        mock_nextcmd.return_value = iter([("engine error", None, 0,
 | 
			
		||||
                                         [var_bind])])
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid)
 | 
			
		||||
        self.assertFalse(mock_cmdgenerator.nextCmd.called)
 | 
			
		||||
        self.assertFalse(mock_nextcmd.called)
 | 
			
		||||
 | 
			
		||||
    def test_get_err_engine(self, mock_cmdgen):
 | 
			
		||||
    @mock.patch.object(pysnmp, 'getCmd', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
 | 
			
		||||
    def test_get_err_engine(self, mock_auth, mock_context, mock_transport,
 | 
			
		||||
                            mock_getcmd):
 | 
			
		||||
        var_bind = (self.oid, self.value)
 | 
			
		||||
        mock_cmdgenerator = mock_cmdgen.return_value
 | 
			
		||||
        mock_cmdgenerator.getCmd.return_value = ("engine error", None, 0,
 | 
			
		||||
                                                 [var_bind])
 | 
			
		||||
        mock_getcmd.return_value = iter([("engine error", None, 0,
 | 
			
		||||
                                         [var_bind])])
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        self.assertRaises(snmp.SNMPFailure, client.get, self.oid)
 | 
			
		||||
        mock_cmdgenerator.getCmd.assert_called_once_with(mock.ANY, mock.ANY,
 | 
			
		||||
                                                         self.oid)
 | 
			
		||||
        self.assertEqual(1, mock_getcmd.call_count)
 | 
			
		||||
 | 
			
		||||
    def test_get_next_err_engine(self, mock_cmdgen):
 | 
			
		||||
    @mock.patch.object(pysnmp, 'nextCmd', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
 | 
			
		||||
    def test_get_next_err_engine(self, mock_auth, mock_context, mock_transport,
 | 
			
		||||
                                 mock_nextcmd):
 | 
			
		||||
        var_bind = (self.oid, self.value)
 | 
			
		||||
        mock_cmdgenerator = mock_cmdgen.return_value
 | 
			
		||||
        mock_cmdgenerator.nextCmd.return_value = ("engine error", None, 0,
 | 
			
		||||
                                                  [[var_bind, var_bind]])
 | 
			
		||||
        mock_nextcmd.return_value = iter([("engine error", None, 0,
 | 
			
		||||
                                         [var_bind])])
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid)
 | 
			
		||||
        mock_cmdgenerator.nextCmd.assert_called_once_with(mock.ANY, mock.ANY,
 | 
			
		||||
                                                          self.oid)
 | 
			
		||||
        self.assertEqual(1, mock_nextcmd.call_count)
 | 
			
		||||
 | 
			
		||||
    def test_get_next_pdu_err(self, mock_cmdgen):
 | 
			
		||||
    @mock.patch.object(pysnmp, 'setCmd', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
 | 
			
		||||
    def test_set(self, mock_auth, mock_context, mock_transport,
 | 
			
		||||
                 mock_setcmd):
 | 
			
		||||
        var_bind = (self.oid, self.value)
 | 
			
		||||
        error_status = mock.Mock()
 | 
			
		||||
        error_status.prettyPrint = lambda: "pdu error"
 | 
			
		||||
        mock_cmdgenerator = mock_cmdgen.return_value
 | 
			
		||||
        mock_cmdgenerator.nextCmd.return_value = (None, error_status, 1,
 | 
			
		||||
                                                  [var_bind])
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid)
 | 
			
		||||
        mock_cmdgenerator.nextCmd.assert_called_once_with(mock.ANY, mock.ANY,
 | 
			
		||||
                                                          self.oid)
 | 
			
		||||
 | 
			
		||||
    def test_set(self, mock_cmdgen):
 | 
			
		||||
        var_bind = (self.oid, self.value)
 | 
			
		||||
        mock_cmdgenerator = mock_cmdgen.return_value
 | 
			
		||||
        mock_cmdgenerator.setCmd.return_value = ("", None, 0, [var_bind])
 | 
			
		||||
        mock_setcmd.return_value = iter([("", None, 0, [var_bind])])
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        client.set(self.oid, self.value)
 | 
			
		||||
        mock_cmdgenerator.setCmd.assert_called_once_with(mock.ANY, mock.ANY,
 | 
			
		||||
                                                         var_bind)
 | 
			
		||||
        self.assertEqual(1, mock_setcmd.call_count)
 | 
			
		||||
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_transport', autospec=True)
 | 
			
		||||
    def test_set_err_transport(self, mock_transport, mock_cmdgen):
 | 
			
		||||
    @mock.patch.object(pysnmp, 'setCmd', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
 | 
			
		||||
    def test_set_err_transport(self, mock_auth, mock_context, mock_transport,
 | 
			
		||||
                               mock_setcmd):
 | 
			
		||||
        mock_transport.side_effect = snmp_error.PySnmpError
 | 
			
		||||
        mock_cmdgenerator = mock_cmdgen.return_value
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        self.assertRaises(snmp.SNMPFailure,
 | 
			
		||||
                          client.set, self.oid, self.value)
 | 
			
		||||
        self.assertFalse(mock_cmdgenerator.setCmd.called)
 | 
			
		||||
        self.assertFalse(mock_setcmd.called)
 | 
			
		||||
 | 
			
		||||
    def test_set_err_engine(self, mock_cmdgen):
 | 
			
		||||
    @mock.patch.object(pysnmp, 'setCmd', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
 | 
			
		||||
    @mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
 | 
			
		||||
    def test_set_err_engine(self, mock_auth, mock_context, mock_transport,
 | 
			
		||||
                            mock_setcmd):
 | 
			
		||||
        var_bind = (self.oid, self.value)
 | 
			
		||||
        mock_cmdgenerator = mock_cmdgen.return_value
 | 
			
		||||
        mock_cmdgenerator.setCmd.return_value = ("engine error", None, 0,
 | 
			
		||||
                                                 [var_bind])
 | 
			
		||||
        mock_setcmd.return_value = iter([("engine error", None, 0,
 | 
			
		||||
                                         [var_bind])])
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        self.assertRaises(snmp.SNMPFailure,
 | 
			
		||||
                          client.set, self.oid, self.value)
 | 
			
		||||
        mock_cmdgenerator.setCmd.assert_called_once_with(mock.ANY, mock.ANY,
 | 
			
		||||
                                                         var_bind)
 | 
			
		||||
 | 
			
		||||
    def test_set_pdu_err(self, mock_cmdgen):
 | 
			
		||||
        var_bind = (self.oid, self.value)
 | 
			
		||||
        error_status = mock.Mock()
 | 
			
		||||
        error_status.prettyPrint = lambda: "pdu error"
 | 
			
		||||
        mock_cmdgenerator = mock_cmdgen.return_value
 | 
			
		||||
        mock_cmdgenerator.setCmd.return_value = (None, error_status, 1,
 | 
			
		||||
                                                 [var_bind])
 | 
			
		||||
        client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
 | 
			
		||||
        self.assertRaises(snmp.SNMPFailure,
 | 
			
		||||
                          client.set, self.oid, self.value)
 | 
			
		||||
        mock_cmdgenerator.setCmd.assert_called_once_with(mock.ANY, mock.ANY,
 | 
			
		||||
                                                         var_bind)
 | 
			
		||||
        self.assertEqual(1, mock_setcmd.call_count)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user