Add v3 authentication functionality to snmp module

Currently the snmp module implemented by python-scciclient does not do
any v3 authentication. This will cause failure when using SNMPv3 to
communicate with servers.

This commit fixes the above issue.

Change-Id: I95537918702e37d98b57b07bca88ed638e879dd8
This commit is contained in:
Shukun Song 2022-06-10 21:18:00 +09:00 committed by Vanou Ishii
parent 274dca0344
commit 59d426f1aa
3 changed files with 240 additions and 171 deletions

View File

@ -757,11 +757,17 @@ def get_capabilities_properties(d_info,
:returns: a dictionary which contains keys and their values. :returns: a dictionary which contains keys and their values.
""" """
snmp_client = snmp.SNMPClient(d_info['irmc_address'], snmp_client = snmp.SNMPClient(
d_info['irmc_snmp_port'], address=d_info['irmc_address'],
d_info['irmc_snmp_version'], port=d_info['irmc_snmp_port'],
d_info['irmc_snmp_community'], version=d_info['irmc_snmp_version'],
d_info['irmc_snmp_security']) read_community=d_info['irmc_snmp_community'],
user=d_info.get('irmc_snmp_user'),
auth_proto=d_info.get('irmc_snmp_auth_proto'),
auth_key=d_info.get('irmc_snmp_auth_key'),
priv_proto=d_info.get('irmc_snmp_priv_proto'),
priv_key=d_info.get('irmc_snmp_priv_key'))
try: try:
v = {} v = {}
if 'rom_firmware_version' in capa_keys: if 'rom_firmware_version' in capa_keys:

View File

@ -12,8 +12,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp import error as snmp_error from pysnmp import error as snmp_error
from pysnmp import hlapi as snmp
import six import six
@ -125,42 +125,76 @@ class SNMPClient(object):
interaction with PySNMP to simplify dynamic importing and unit testing. interaction with PySNMP to simplify dynamic importing and unit testing.
""" """
def __init__(self, address, port, version, community=None, security=None): def __init__(self, address, port, version,
read_community=None, write_community=None,
user=None, auth_proto=None, auth_key=None,
priv_proto=None, priv_key=None,
context_engine_id=None, context_name=None):
self.address = address self.address = address
self.port = port self.port = port
self.version = version self.version = version
if self.version == SNMP_V3: if self.version == SNMP_V3:
self.security = security self.user = user
self.auth_proto = auth_proto
self.auth_key = auth_key
self.priv_proto = priv_proto
self.priv_key = priv_key
else: else:
self.community = community self.read_community = read_community
self.cmd_gen = cmdgen.CommandGenerator() self.write_community = write_community
def _get_auth(self): self.context_engine_id = context_engine_id
self.context_name = context_name or ''
self.snmp_engine = snmp.SnmpEngine()
def _get_auth(self, write_mode=False):
"""Return the authorization data for an SNMP request. """Return the authorization data for an SNMP request.
:returns: A :param write_mode: `True` if write class SNMP command is
:class:`pysnmp.entity.rfc3413.oneliner.cmdgen.CommunityData` executed. Default is `False`.
object. :returns: Either
:class:`pysnmp.hlapi.CommunityData`
or :class:`pysnmp.hlapi.UsmUserData`
object depending on SNMP version being used.
""" """
if self.version == SNMP_V3: if self.version == SNMP_V3:
# Handling auth/encryption credentials is not (yet) supported. # Handling auth/encryption credentials is not (yet) supported.
# This version supports a security name analogous to community. # This version supports a security name analogous to community.
return cmdgen.UsmUserData(self.security) return snmp.UsmUserData(self.user,
authKey=self.auth_key,
authProtocol=self.auth_proto,
privKey=self.priv_key,
privProtocol=self.priv_proto)
else: else:
mp_model = 1 if self.version == SNMP_V2C else 0 mp_model = 1 if self.version == SNMP_V2C else 0
return cmdgen.CommunityData(self.community, mpModel=mp_model) return snmp.CommunityData(
self.write_community if write_mode else self.read_community,
mpModel=mp_model
)
def _get_transport(self): def _get_transport(self):
"""Return the transport target for an SNMP request. """Return the transport target for an SNMP request.
:returns: A :class: :returns: A :class:
`pysnmp.entity.rfc3413.oneliner.cmdgen.UdpTransportTarget` object. `pysnmp.hlapi.UdpTransportTarget` object.
:raises: snmp_error.PySnmpError if the transport address is bad. :raises: :class:`pysnmp.error.PySnmpError` if the transport address
is bad.
""" """
# The transport target accepts timeout and retries parameters, which # The transport target accepts timeout and retries parameters, which
# default to 1 (second) and 5 respectively. These are deemed sensible # default to 1 (second) and 5 respectively. These are deemed sensible
# enough to allow for an unreliable network or slow device. # enough to allow for an unreliable network or slow device.
return cmdgen.UdpTransportTarget((self.address, self.port)) return snmp.UdpTransportTarget((self.address, self.port))
def _get_context(self):
"""Return the SNMP context for an SNMP request.
:returns: A :class:
`pysnmp.hlapi.ContextData` object.
:raises: :class:`pysnmp.error.PySnmpError` if SNMP context data
is bad.
"""
return snmp.ContextData(contextEngineId=self.context_engine_id,
contextName=self.context_name)
def get(self, oid): def get(self, oid):
"""Use PySNMP to perform an SNMP GET operation on a single object. """Use PySNMP to perform an SNMP GET operation on a single object.
@ -170,13 +204,15 @@ class SNMPClient(object):
:returns: The value of the requested object. :returns: The value of the requested object.
""" """
try: try:
results = self.cmd_gen.getCmd(self._get_auth(), snmp_gen = snmp.getCmd(self.snmp_engine,
self._get_auth(),
self._get_transport(), self._get_transport(),
oid) self._get_context(),
snmp.ObjectType(snmp.ObjectIdentity(oid)))
except snmp_error.PySnmpError as e: except snmp_error.PySnmpError as e:
raise SNMPFailure(SNMP_FAILURE_MSG % ("GET", e)) raise SNMPFailure(SNMP_FAILURE_MSG % ("GET", e))
error_indication, error_status, error_index, var_binds = results error_indication, error_status, error_index, var_binds = next(snmp_gen)
if error_indication: if error_indication:
# SNMP engine-level error. # SNMP engine-level error.
@ -203,29 +239,38 @@ class SNMPClient(object):
:returns: A list of values of the requested table object. :returns: A list of values of the requested table object.
""" """
try: try:
results = self.cmd_gen.nextCmd(self._get_auth(), snmp_gen = snmp.nextCmd(self.snmp_engine,
self._get_auth(),
self._get_transport(), self._get_transport(),
oid) self._get_context(),
snmp.ObjectType(snmp.ObjectIdentity(oid)),
lexicographicMode=False)
except snmp_error.PySnmpError as e: except snmp_error.PySnmpError as e:
raise SNMPFailure(SNMP_FAILURE_MSG % ("GET_NEXT", e)) raise SNMPFailure(SNMP_FAILURE_MSG % ("GET_NEXT", e))
error_indication, error_status, error_index, var_binds = results vals = []
for (error_indication, error_status, error_index,
var_binds) in snmp_gen:
if error_indication: if error_indication:
# SNMP engine-level error. # SNMP engine-level error.
raise SNMPFailure( raise SNMPFailure(SNMP_FAILURE_MSG % ("GET_NEXT",
SNMP_FAILURE_MSG % ("GET_NEXT", error_indication)) error_indication))
if error_status: if error_status:
# SNMP PDU error. # SNMP PDU error.
raise SNMPFailure( raise SNMPFailure(
"SNMP operation '%(operation)s' failed: %(error)s at" "SNMP operation '%(operation)s' failed: %(error)s at"
" %(index)s" % " %(index)s" %
{'operation': "GET_NEXT", 'error': error_status.prettyPrint(), {'operation': "GET_NEXT",
'error': error_status.prettyPrint(),
'index': 'index':
error_index and var_binds[int(error_index) - 1] or '?'}) error_index and var_binds[int(error_index) - 1]
or '?'})
return [val for row in var_binds for name, val in row] name, value = var_binds[0]
vals.append(value)
return vals
def set(self, oid, value): def set(self, oid, value):
"""Use PySNMP to perform an SNMP SET operation on a single object. """Use PySNMP to perform an SNMP SET operation on a single object.
@ -235,13 +280,16 @@ class SNMPClient(object):
:raises: SNMPFailure if an SNMP request fails. :raises: SNMPFailure if an SNMP request fails.
""" """
try: try:
results = self.cmd_gen.setCmd(self._get_auth(), snmp_gen = snmp.setCmd(self.snmp_engine,
self._get_auth(write_mode=True),
self._get_transport(), self._get_transport(),
(oid, value)) self._get_context(),
snmp.ObjectType(snmp.ObjectIdentity(oid),
value))
except snmp_error.PySnmpError as e: except snmp_error.PySnmpError as e:
raise SNMPFailure(SNMP_FAILURE_MSG % ("SET", e)) raise SNMPFailure(SNMP_FAILURE_MSG % ("SET", e))
error_indication, error_status, error_index, var_binds = results error_indication, error_status, error_index, var_binds = next(snmp_gen)
if error_indication: if error_indication:
# SNMP engine-level error. # SNMP engine-level error.

View File

@ -17,8 +17,8 @@ Test class for snmp module.
from unittest import mock from unittest import mock
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp import error as snmp_error from pysnmp import error as snmp_error
from pysnmp import hlapi as pysnmp
import testtools import testtools
from scciclient.irmc import snmp from scciclient.irmc import snmp
@ -125,7 +125,6 @@ class IRMCSnmpTestCase(testtools.TestCase):
' failed: Error', str(e)) ' failed: Error', str(e))
@mock.patch.object(cmdgen, 'CommandGenerator', autospec=True)
class SNMPClientTestCase(testtools.TestCase): class SNMPClientTestCase(testtools.TestCase):
def setUp(self): def setUp(self):
super(SNMPClientTestCase, self).setUp() super(SNMPClientTestCase, self).setUp()
@ -134,172 +133,188 @@ class SNMPClientTestCase(testtools.TestCase):
self.oid = 'oid' self.oid = 'oid'
self.value = 'value' self.value = 'value'
def test___init__(self, mock_cmdgen): @mock.patch.object(pysnmp, 'SnmpEngine', authspec=True)
def test___init__(self, mock_snmpengine):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1) client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1)
mock_cmdgen.assert_called_once_with() mock_snmpengine.assert_called_once_with()
self.assertEqual(self.address, client.address) self.assertEqual(self.address, client.address)
self.assertEqual(self.port, client.port) self.assertEqual(self.port, client.port)
self.assertEqual(snmp.SNMP_V1, client.version) self.assertEqual(snmp.SNMP_V1, client.version)
self.assertIsNone(client.community) self.assertIsNone(client.read_community)
self.assertNotIn('security', client.__dict__) self.assertIsNone(client.write_community)
self.assertEqual(mock_cmdgen.return_value, client.cmd_gen) self.assertNotIn('user', client.__dict__)
self.assertEqual(mock_snmpengine.return_value, client.snmp_engine)
def test_get(self, mock_cmdgen): @mock.patch.object(pysnmp, 'CommunityData', autospec=True)
def test__get_auth_v1_read(self, mock_community):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1,
read_community='public',
write_community='private')
client._get_auth()
mock_community.assert_called_once_with(client.read_community,
mpModel=0)
@mock.patch.object(pysnmp, 'CommunityData', autospec=True)
def test__get_auth_v1_write(self, mock_community):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1,
read_community='public',
write_community='private')
client._get_auth(write_mode=True)
mock_community.assert_called_once_with(client.write_community,
mpModel=0)
@mock.patch.object(pysnmp, 'CommunityData', autospec=True)
def test__get_auth_v2c(self, mock_community):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V2C)
client._get_auth()
mock_community.assert_called_once_with(client.read_community,
mpModel=1)
@mock.patch.object(pysnmp, 'UsmUserData', autospec=True)
def test__get_auth_v3(self, mock_user):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
client._get_auth()
mock_user.assert_called_once_with(client.user,
authKey=client.auth_key,
authProtocol=client.auth_proto,
privKey=client.priv_key,
privProtocol=client.priv_proto)
@mock.patch.object(pysnmp, 'UdpTransportTarget', autospec=True)
def test__get_transport(self, mock_transport):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
client._get_transport()
mock_transport.assert_called_once_with((client.address, client.port))
@mock.patch.object(pysnmp, 'UdpTransportTarget', autospec=True)
def test__get_transport_err(self, mock_transport):
mock_transport.side_effect = snmp_error.PySnmpError
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp_error.PySnmpError, client._get_transport)
mock_transport.assert_called_once_with((client.address, client.port))
@mock.patch.object(pysnmp, 'ContextData', authspec=True)
def test__get_context(self, mock_context):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1)
client._get_context()
mock_context.assert_called_once_with(contextEngineId=None,
contextName='')
@mock.patch.object(pysnmp, 'getCmd', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_get(self, mock_auth, mock_context, mock_transport,
mock_getcmd):
var_bind = (self.oid, self.value) var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value mock_getcmd.return_value = iter([("", None, 0, [var_bind])])
mock_cmdgenerator.getCmd.return_value = ("", None, 0, [var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
val = client.get(self.oid) val = client.get(self.oid)
self.assertEqual(var_bind[1], val) self.assertEqual(var_bind[1], val)
mock_cmdgenerator.getCmd.assert_called_once_with(mock.ANY, mock.ANY, self.assertEqual(1, mock_getcmd.call_count)
self.oid)
@mock.patch.object(cmdgen, 'CommunityData', autospec=True) @mock.patch.object(pysnmp, 'nextCmd', authspec=True)
def test__get_auth_v1(self, mock_community, mock_cmdgen): @mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1) @mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
client._get_auth() @mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
mock_cmdgen.assert_called_once_with() def test_get_next(self, mock_auth, mock_context, mock_transport,
mock_community.assert_called_once_with(client.community, mpModel=0) mock_nextcmd):
@mock.patch.object(cmdgen, 'CommunityData', autospec=True)
def test__get_auth_v2c(self, mock_community, mock_cmdgen):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V2C)
client._get_auth()
mock_cmdgen.assert_called_once_with()
mock_community.assert_called_once_with(client.community, mpModel=1)
@mock.patch.object(cmdgen, 'UsmUserData', autospec=True)
def test__get_auth_v3(self, mock_user, mock_cmdgen):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
client._get_auth()
mock_cmdgen.assert_called_once_with()
mock_user.assert_called_once_with(client.security)
@mock.patch.object(cmdgen, 'UdpTransportTarget', autospec=True)
def test__get_transport(self, mock_transport, mock_cmdgen):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
client._get_transport()
mock_cmdgen.assert_called_once_with()
mock_transport.assert_called_once_with((client.address, client.port))
@mock.patch.object(cmdgen, 'UdpTransportTarget', autospec=True)
def test__get_transport_err(self, mock_transport, mock_cmdgen):
mock_transport.side_effect = snmp_error.PySnmpError()
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp_error.PySnmpError, client._get_transport)
mock_cmdgen.assert_called_once_with()
mock_transport.assert_called_once_with((client.address, client.port))
def test_get_pdu_err(self, mock_cmdgen):
var_bind = (self.oid, self.value) var_bind = (self.oid, self.value)
error_status = mock.Mock() mock_nextcmd.return_value = iter([("", None, 0, [var_bind]),
error_status.prettyPrint = lambda: "pdu error" ("", None, 0, [var_bind])])
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.getCmd.return_value = (None, error_status, 1,
[var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get, self.oid)
mock_cmdgenerator.getCmd.assert_called_once_with(mock.ANY, mock.ANY,
self.oid)
def test_get_next(self, mock_cmdgen):
var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.nextCmd.return_value = (
"", None, 0, [[var_bind, var_bind]])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
val = client.get_next(self.oid) val = client.get_next(self.oid)
self.assertEqual([self.value, self.value], val) self.assertEqual([self.value, self.value], val)
mock_cmdgenerator.nextCmd.assert_called_once_with(mock.ANY, mock.ANY, self.assertEqual(1, mock_nextcmd.call_count)
self.oid)
@mock.patch.object(snmp.SNMPClient, '_get_transport', autospec=True) @mock.patch.object(pysnmp, 'getCmd', authspec=True)
def test_get_err_transport(self, mock_transport, mock_cmdgen): @mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_get_err_transport(self, mock_auth, mock_context, mock_transport,
mock_getcmd):
mock_transport.side_effect = snmp_error.PySnmpError mock_transport.side_effect = snmp_error.PySnmpError
mock_cmdgenerator = mock_cmdgen.return_value var_bind = (self.oid, self.value)
mock_getcmd.return_value = iter([("engine error", None, 0,
[var_bind])])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get, self.oid) self.assertRaises(snmp.SNMPFailure, client.get, self.oid)
self.assertFalse(mock_cmdgenerator.getCmd.called) self.assertFalse(mock_getcmd.called)
@mock.patch.object(snmp.SNMPClient, '_get_transport', autospec=True) @mock.patch.object(pysnmp, 'nextCmd', authspec=True)
def test_get_next_err_transport(self, mock_transport, @mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
mock_cmdgen): @mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_get_next_err_transport(self, mock_auth, mock_context,
mock_transport, mock_nextcmd):
mock_transport.side_effect = snmp_error.PySnmpError mock_transport.side_effect = snmp_error.PySnmpError
mock_cmdgenerator = mock_cmdgen.return_value var_bind = (self.oid, self.value)
mock_nextcmd.return_value = iter([("engine error", None, 0,
[var_bind])])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid) self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid)
self.assertFalse(mock_cmdgenerator.nextCmd.called) self.assertFalse(mock_nextcmd.called)
def test_get_err_engine(self, mock_cmdgen): @mock.patch.object(pysnmp, 'getCmd', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_get_err_engine(self, mock_auth, mock_context, mock_transport,
mock_getcmd):
var_bind = (self.oid, self.value) var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value mock_getcmd.return_value = iter([("engine error", None, 0,
mock_cmdgenerator.getCmd.return_value = ("engine error", None, 0, [var_bind])])
[var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get, self.oid) self.assertRaises(snmp.SNMPFailure, client.get, self.oid)
mock_cmdgenerator.getCmd.assert_called_once_with(mock.ANY, mock.ANY, self.assertEqual(1, mock_getcmd.call_count)
self.oid)
def test_get_next_err_engine(self, mock_cmdgen): @mock.patch.object(pysnmp, 'nextCmd', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_get_next_err_engine(self, mock_auth, mock_context, mock_transport,
mock_nextcmd):
var_bind = (self.oid, self.value) var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value mock_nextcmd.return_value = iter([("engine error", None, 0,
mock_cmdgenerator.nextCmd.return_value = ("engine error", None, 0, [var_bind])])
[[var_bind, var_bind]])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid) self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid)
mock_cmdgenerator.nextCmd.assert_called_once_with(mock.ANY, mock.ANY, self.assertEqual(1, mock_nextcmd.call_count)
self.oid)
def test_get_next_pdu_err(self, mock_cmdgen): @mock.patch.object(pysnmp, 'setCmd', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_set(self, mock_auth, mock_context, mock_transport,
mock_setcmd):
var_bind = (self.oid, self.value) var_bind = (self.oid, self.value)
error_status = mock.Mock() mock_setcmd.return_value = iter([("", None, 0, [var_bind])])
error_status.prettyPrint = lambda: "pdu error"
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.nextCmd.return_value = (None, error_status, 1,
[var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid)
mock_cmdgenerator.nextCmd.assert_called_once_with(mock.ANY, mock.ANY,
self.oid)
def test_set(self, mock_cmdgen):
var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.setCmd.return_value = ("", None, 0, [var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
client.set(self.oid, self.value) client.set(self.oid, self.value)
mock_cmdgenerator.setCmd.assert_called_once_with(mock.ANY, mock.ANY, self.assertEqual(1, mock_setcmd.call_count)
var_bind)
@mock.patch.object(snmp.SNMPClient, '_get_transport', autospec=True) @mock.patch.object(pysnmp, 'setCmd', authspec=True)
def test_set_err_transport(self, mock_transport, mock_cmdgen): @mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_set_err_transport(self, mock_auth, mock_context, mock_transport,
mock_setcmd):
mock_transport.side_effect = snmp_error.PySnmpError mock_transport.side_effect = snmp_error.PySnmpError
mock_cmdgenerator = mock_cmdgen.return_value
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, self.assertRaises(snmp.SNMPFailure,
client.set, self.oid, self.value) client.set, self.oid, self.value)
self.assertFalse(mock_cmdgenerator.setCmd.called) self.assertFalse(mock_setcmd.called)
def test_set_err_engine(self, mock_cmdgen): @mock.patch.object(pysnmp, 'setCmd', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_set_err_engine(self, mock_auth, mock_context, mock_transport,
mock_setcmd):
var_bind = (self.oid, self.value) var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value mock_setcmd.return_value = iter([("engine error", None, 0,
mock_cmdgenerator.setCmd.return_value = ("engine error", None, 0, [var_bind])])
[var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3) client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, self.assertRaises(snmp.SNMPFailure,
client.set, self.oid, self.value) client.set, self.oid, self.value)
mock_cmdgenerator.setCmd.assert_called_once_with(mock.ANY, mock.ANY, self.assertEqual(1, mock_setcmd.call_count)
var_bind)
def test_set_pdu_err(self, mock_cmdgen):
var_bind = (self.oid, self.value)
error_status = mock.Mock()
error_status.prettyPrint = lambda: "pdu error"
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.setCmd.return_value = (None, error_status, 1,
[var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure,
client.set, self.oid, self.value)
mock_cmdgenerator.setCmd.assert_called_once_with(mock.ANY, mock.ANY,
var_bind)