Add v3 authentication functionality to snmp module

Currently the snmp module implemented by python-scciclient does not do
any v3 authentication. This will cause failure when using SNMPv3 to
communicate with servers.

This commit fixes the above issue.

Change-Id: I95537918702e37d98b57b07bca88ed638e879dd8
This commit is contained in:
Shukun Song 2022-06-10 21:18:00 +09:00
parent 274dca0344
commit e0590f59fb
3 changed files with 240 additions and 171 deletions

View File

@ -757,11 +757,17 @@ def get_capabilities_properties(d_info,
:returns: a dictionary which contains keys and their values.
"""
snmp_client = snmp.SNMPClient(d_info['irmc_address'],
d_info['irmc_snmp_port'],
d_info['irmc_snmp_version'],
d_info['irmc_snmp_community'],
d_info['irmc_snmp_security'])
snmp_client = snmp.SNMPClient(
address=d_info['irmc_address'],
port=d_info['irmc_snmp_port'],
version=d_info['irmc_snmp_version'],
read_community=d_info['irmc_snmp_community'],
user=d_info.get('irmc_snmp_user'),
auth_proto=d_info.get('irmc_snmp_auth_proto'),
auth_key=d_info.get('irmc_snmp_auth_key'),
priv_proto=d_info.get('irmc_snmp_priv_proto'),
priv_key=d_info.get('irmc_snmp_priv_key'))
try:
v = {}
if 'rom_firmware_version' in capa_keys:

View File

@ -12,8 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp import error as snmp_error
from pysnmp import hlapi as snmp
import six
@ -125,42 +125,76 @@ class SNMPClient(object):
interaction with PySNMP to simplify dynamic importing and unit testing.
"""
def __init__(self, address, port, version, community=None, security=None):
def __init__(self, address, port, version,
read_community=None, write_community=None,
user=None, auth_proto=None, auth_key=None,
priv_proto=None, priv_key=None,
context_engine_id=None, context_name=None):
self.address = address
self.port = port
self.version = version
if self.version == SNMP_V3:
self.security = security
self.user = user
self.auth_proto = auth_proto
self.auth_key = auth_key
self.priv_proto = priv_proto
self.priv_key = priv_key
else:
self.community = community
self.cmd_gen = cmdgen.CommandGenerator()
self.read_community = read_community
self.write_community = write_community
def _get_auth(self):
self.context_engine_id = context_engine_id
self.context_name = context_name or ''
self.snmp_engine = snmp.SnmpEngine()
def _get_auth(self, write_mode=False):
"""Return the authorization data for an SNMP request.
:returns: A
:class:`pysnmp.entity.rfc3413.oneliner.cmdgen.CommunityData`
object.
:param write_mode: `True` if write class SNMP command is
executed. Default is `False`.
:returns: Either
:class:`pysnmp.hlapi.CommunityData`
or :class:`pysnmp.hlapi.UsmUserData`
object depending on SNMP version being used.
"""
if self.version == SNMP_V3:
# Handling auth/encryption credentials is not (yet) supported.
# This version supports a security name analogous to community.
return cmdgen.UsmUserData(self.security)
return snmp.UsmUserData(self.user,
authKey=self.auth_key,
authProtocol=self.auth_proto,
privKey=self.priv_key,
privProtocol=self.priv_proto)
else:
mp_model = 1 if self.version == SNMP_V2C else 0
return cmdgen.CommunityData(self.community, mpModel=mp_model)
return snmp.CommunityData(
self.write_community if write_mode else self.read_community,
mpModel=mp_model
)
def _get_transport(self):
"""Return the transport target for an SNMP request.
:returns: A :class:
`pysnmp.entity.rfc3413.oneliner.cmdgen.UdpTransportTarget` object.
:raises: snmp_error.PySnmpError if the transport address is bad.
`pysnmp.hlapi.UdpTransportTarget` object.
:raises: :class:`pysnmp.error.PySnmpError` if the transport address
is bad.
"""
# The transport target accepts timeout and retries parameters, which
# default to 1 (second) and 5 respectively. These are deemed sensible
# enough to allow for an unreliable network or slow device.
return cmdgen.UdpTransportTarget((self.address, self.port))
return snmp.UdpTransportTarget((self.address, self.port))
def _get_context(self):
"""Return the SNMP context for an SNMP request.
:returns: A :class:
`pysnmp.hlapi.ContextData` object.
:raises: :class:`pysnmp.error.PySnmpError` if SNMP context data
is bad.
"""
return snmp.ContextData(contextEngineId=self.context_engine_id,
contextName=self.context_name)
def get(self, oid):
"""Use PySNMP to perform an SNMP GET operation on a single object.
@ -170,13 +204,15 @@ class SNMPClient(object):
:returns: The value of the requested object.
"""
try:
results = self.cmd_gen.getCmd(self._get_auth(),
snmp_gen = snmp.getCmd(self.snmp_engine,
self._get_auth(),
self._get_transport(),
oid)
self._get_context(),
snmp.ObjectType(snmp.ObjectIdentity(oid)))
except snmp_error.PySnmpError as e:
raise SNMPFailure(SNMP_FAILURE_MSG % ("GET", e))
error_indication, error_status, error_index, var_binds = results
error_indication, error_status, error_index, var_binds = next(snmp_gen)
if error_indication:
# SNMP engine-level error.
@ -203,29 +239,38 @@ class SNMPClient(object):
:returns: A list of values of the requested table object.
"""
try:
results = self.cmd_gen.nextCmd(self._get_auth(),
snmp_gen = snmp.nextCmd(self.snmp_engine,
self._get_auth(),
self._get_transport(),
oid)
self._get_context(),
snmp.ObjectType(snmp.ObjectIdentity(oid)),
lexicographicMode=False)
except snmp_error.PySnmpError as e:
raise SNMPFailure(SNMP_FAILURE_MSG % ("GET_NEXT", e))
error_indication, error_status, error_index, var_binds = results
vals = []
for (error_indication, error_status, error_index,
var_binds) in snmp_gen:
if error_indication:
# SNMP engine-level error.
raise SNMPFailure(
SNMP_FAILURE_MSG % ("GET_NEXT", error_indication))
raise SNMPFailure(SNMP_FAILURE_MSG % ("GET_NEXT",
error_indication))
if error_status:
# SNMP PDU error.
raise SNMPFailure(
"SNMP operation '%(operation)s' failed: %(error)s at"
" %(index)s" %
{'operation': "GET_NEXT", 'error': error_status.prettyPrint(),
{'operation': "GET_NEXT",
'error': error_status.prettyPrint(),
'index':
error_index and var_binds[int(error_index) - 1] or '?'})
error_index and var_binds[int(error_index) - 1]
or '?'})
return [val for row in var_binds for name, val in row]
name, value = var_binds[0]
vals.append(value)
return vals
def set(self, oid, value):
"""Use PySNMP to perform an SNMP SET operation on a single object.
@ -235,13 +280,16 @@ class SNMPClient(object):
:raises: SNMPFailure if an SNMP request fails.
"""
try:
results = self.cmd_gen.setCmd(self._get_auth(),
snmp_gen = snmp.setCmd(self.snmp_engine,
self._get_auth(write_mode=True),
self._get_transport(),
(oid, value))
self._get_context(),
snmp.ObjectType(snmp.ObjectIdentity(oid),
value))
except snmp_error.PySnmpError as e:
raise SNMPFailure(SNMP_FAILURE_MSG % ("SET", e))
error_indication, error_status, error_index, var_binds = results
error_indication, error_status, error_index, var_binds = next(snmp_gen)
if error_indication:
# SNMP engine-level error.

View File

@ -17,8 +17,8 @@ Test class for snmp module.
from unittest import mock
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp import error as snmp_error
from pysnmp import hlapi as pysnmp
import testtools
from scciclient.irmc import snmp
@ -125,7 +125,6 @@ class IRMCSnmpTestCase(testtools.TestCase):
' failed: Error', str(e))
@mock.patch.object(cmdgen, 'CommandGenerator', autospec=True)
class SNMPClientTestCase(testtools.TestCase):
def setUp(self):
super(SNMPClientTestCase, self).setUp()
@ -134,172 +133,188 @@ class SNMPClientTestCase(testtools.TestCase):
self.oid = 'oid'
self.value = 'value'
def test___init__(self, mock_cmdgen):
@mock.patch.object(pysnmp, 'SnmpEngine', authspec=True)
def test___init__(self, mock_snmpengine):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1)
mock_cmdgen.assert_called_once_with()
mock_snmpengine.assert_called_once_with()
self.assertEqual(self.address, client.address)
self.assertEqual(self.port, client.port)
self.assertEqual(snmp.SNMP_V1, client.version)
self.assertIsNone(client.community)
self.assertNotIn('security', client.__dict__)
self.assertEqual(mock_cmdgen.return_value, client.cmd_gen)
self.assertIsNone(client.read_community)
self.assertIsNone(client.write_community)
self.assertNotIn('user', client.__dict__)
self.assertEqual(mock_snmpengine.return_value, client.snmp_engine)
def test_get(self, mock_cmdgen):
@mock.patch.object(pysnmp, 'CommunityData', autospec=True)
def test__get_auth_v1_read(self, mock_community):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1,
read_community='public',
write_community='private')
client._get_auth()
mock_community.assert_called_once_with(client.read_community,
mpModel=0)
@mock.patch.object(pysnmp, 'CommunityData', autospec=True)
def test__get_auth_v1_write(self, mock_community):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1,
read_community='public',
write_community='private')
client._get_auth(write_mode=True)
mock_community.assert_called_once_with(client.write_community,
mpModel=0)
@mock.patch.object(pysnmp, 'CommunityData', autospec=True)
def test__get_auth_v2c(self, mock_community):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V2C)
client._get_auth()
mock_community.assert_called_once_with(client.read_community,
mpModel=1)
@mock.patch.object(pysnmp, 'UsmUserData', autospec=True)
def test__get_auth_v3(self, mock_user):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
client._get_auth()
mock_user.assert_called_once_with(client.user,
authKey=client.auth_key,
authProtocol=client.auth_proto,
privKey=client.priv_key,
privProtocol=client.priv_proto)
@mock.patch.object(pysnmp, 'UdpTransportTarget', autospec=True)
def test__get_transport(self, mock_transport):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
client._get_transport()
mock_transport.assert_called_once_with((client.address, client.port))
@mock.patch.object(pysnmp, 'UdpTransportTarget', autospec=True)
def test__get_transport_err(self, mock_transport):
mock_transport.side_effect = snmp_error.PySnmpError
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp_error.PySnmpError, client._get_transport)
mock_transport.assert_called_once_with((client.address, client.port))
@mock.patch.object(pysnmp, 'ContextData', authspec=True)
def test__get_context(self, mock_context):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1)
client._get_context()
mock_context.assert_called_once_with(contextEngineId=None,
contextName='')
@mock.patch.object(pysnmp, 'getCmd', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_get(self, mock_auth, mock_context, mock_transport,
mock_getcmd):
var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.getCmd.return_value = ("", None, 0, [var_bind])
mock_getcmd.return_value = iter([("", None, 0, [var_bind])])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
val = client.get(self.oid)
self.assertEqual(var_bind[1], val)
mock_cmdgenerator.getCmd.assert_called_once_with(mock.ANY, mock.ANY,
self.oid)
self.assertEqual(1, mock_getcmd.call_count)
@mock.patch.object(cmdgen, 'CommunityData', autospec=True)
def test__get_auth_v1(self, mock_community, mock_cmdgen):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V1)
client._get_auth()
mock_cmdgen.assert_called_once_with()
mock_community.assert_called_once_with(client.community, mpModel=0)
@mock.patch.object(cmdgen, 'CommunityData', autospec=True)
def test__get_auth_v2c(self, mock_community, mock_cmdgen):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V2C)
client._get_auth()
mock_cmdgen.assert_called_once_with()
mock_community.assert_called_once_with(client.community, mpModel=1)
@mock.patch.object(cmdgen, 'UsmUserData', autospec=True)
def test__get_auth_v3(self, mock_user, mock_cmdgen):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
client._get_auth()
mock_cmdgen.assert_called_once_with()
mock_user.assert_called_once_with(client.security)
@mock.patch.object(cmdgen, 'UdpTransportTarget', autospec=True)
def test__get_transport(self, mock_transport, mock_cmdgen):
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
client._get_transport()
mock_cmdgen.assert_called_once_with()
mock_transport.assert_called_once_with((client.address, client.port))
@mock.patch.object(cmdgen, 'UdpTransportTarget', autospec=True)
def test__get_transport_err(self, mock_transport, mock_cmdgen):
mock_transport.side_effect = snmp_error.PySnmpError()
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp_error.PySnmpError, client._get_transport)
mock_cmdgen.assert_called_once_with()
mock_transport.assert_called_once_with((client.address, client.port))
def test_get_pdu_err(self, mock_cmdgen):
@mock.patch.object(pysnmp, 'nextCmd', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_get_next(self, mock_auth, mock_context, mock_transport,
mock_nextcmd):
var_bind = (self.oid, self.value)
error_status = mock.Mock()
error_status.prettyPrint = lambda: "pdu error"
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.getCmd.return_value = (None, error_status, 1,
[var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get, self.oid)
mock_cmdgenerator.getCmd.assert_called_once_with(mock.ANY, mock.ANY,
self.oid)
def test_get_next(self, mock_cmdgen):
var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.nextCmd.return_value = (
"", None, 0, [[var_bind, var_bind]])
mock_nextcmd.return_value = iter([("", None, 0, [var_bind]),
("", None, 0, [var_bind])])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
val = client.get_next(self.oid)
self.assertEqual([self.value, self.value], val)
mock_cmdgenerator.nextCmd.assert_called_once_with(mock.ANY, mock.ANY,
self.oid)
self.assertEqual(1, mock_nextcmd.call_count)
@mock.patch.object(snmp.SNMPClient, '_get_transport', autospec=True)
def test_get_err_transport(self, mock_transport, mock_cmdgen):
@mock.patch.object(pysnmp, 'getCmd', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_get_err_transport(self, mock_auth, mock_context, mock_transport,
mock_getcmd):
mock_transport.side_effect = snmp_error.PySnmpError
mock_cmdgenerator = mock_cmdgen.return_value
var_bind = (self.oid, self.value)
mock_getcmd.return_value = iter([("engine error", None, 0,
[var_bind])])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get, self.oid)
self.assertFalse(mock_cmdgenerator.getCmd.called)
self.assertFalse(mock_getcmd.called)
@mock.patch.object(snmp.SNMPClient, '_get_transport', autospec=True)
def test_get_next_err_transport(self, mock_transport,
mock_cmdgen):
@mock.patch.object(pysnmp, 'nextCmd', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_get_next_err_transport(self, mock_auth, mock_context,
mock_transport, mock_nextcmd):
mock_transport.side_effect = snmp_error.PySnmpError
mock_cmdgenerator = mock_cmdgen.return_value
var_bind = (self.oid, self.value)
mock_nextcmd.return_value = iter([("engine error", None, 0,
[var_bind])])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid)
self.assertFalse(mock_cmdgenerator.nextCmd.called)
self.assertFalse(mock_nextcmd.called)
def test_get_err_engine(self, mock_cmdgen):
@mock.patch.object(pysnmp, 'getCmd', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_get_err_engine(self, mock_auth, mock_context, mock_transport,
mock_getcmd):
var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.getCmd.return_value = ("engine error", None, 0,
[var_bind])
mock_getcmd.return_value = iter([("engine error", None, 0,
[var_bind])])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get, self.oid)
mock_cmdgenerator.getCmd.assert_called_once_with(mock.ANY, mock.ANY,
self.oid)
self.assertEqual(1, mock_getcmd.call_count)
def test_get_next_err_engine(self, mock_cmdgen):
@mock.patch.object(pysnmp, 'nextCmd', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_get_next_err_engine(self, mock_auth, mock_context, mock_transport,
mock_nextcmd):
var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.nextCmd.return_value = ("engine error", None, 0,
[[var_bind, var_bind]])
mock_nextcmd.return_value = iter([("engine error", None, 0,
[var_bind])])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid)
mock_cmdgenerator.nextCmd.assert_called_once_with(mock.ANY, mock.ANY,
self.oid)
self.assertEqual(1, mock_nextcmd.call_count)
def test_get_next_pdu_err(self, mock_cmdgen):
@mock.patch.object(pysnmp, 'setCmd', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_set(self, mock_auth, mock_context, mock_transport,
mock_setcmd):
var_bind = (self.oid, self.value)
error_status = mock.Mock()
error_status.prettyPrint = lambda: "pdu error"
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.nextCmd.return_value = (None, error_status, 1,
[var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure, client.get_next, self.oid)
mock_cmdgenerator.nextCmd.assert_called_once_with(mock.ANY, mock.ANY,
self.oid)
def test_set(self, mock_cmdgen):
var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.setCmd.return_value = ("", None, 0, [var_bind])
mock_setcmd.return_value = iter([("", None, 0, [var_bind])])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
client.set(self.oid, self.value)
mock_cmdgenerator.setCmd.assert_called_once_with(mock.ANY, mock.ANY,
var_bind)
self.assertEqual(1, mock_setcmd.call_count)
@mock.patch.object(snmp.SNMPClient, '_get_transport', autospec=True)
def test_set_err_transport(self, mock_transport, mock_cmdgen):
@mock.patch.object(pysnmp, 'setCmd', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_set_err_transport(self, mock_auth, mock_context, mock_transport,
mock_setcmd):
mock_transport.side_effect = snmp_error.PySnmpError
mock_cmdgenerator = mock_cmdgen.return_value
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure,
client.set, self.oid, self.value)
self.assertFalse(mock_cmdgenerator.setCmd.called)
self.assertFalse(mock_setcmd.called)
def test_set_err_engine(self, mock_cmdgen):
@mock.patch.object(pysnmp, 'setCmd', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_transport', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_context', authspec=True)
@mock.patch.object(snmp.SNMPClient, '_get_auth', authspec=True)
def test_set_err_engine(self, mock_auth, mock_context, mock_transport,
mock_setcmd):
var_bind = (self.oid, self.value)
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.setCmd.return_value = ("engine error", None, 0,
[var_bind])
mock_setcmd.return_value = iter([("engine error", None, 0,
[var_bind])])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure,
client.set, self.oid, self.value)
mock_cmdgenerator.setCmd.assert_called_once_with(mock.ANY, mock.ANY,
var_bind)
def test_set_pdu_err(self, mock_cmdgen):
var_bind = (self.oid, self.value)
error_status = mock.Mock()
error_status.prettyPrint = lambda: "pdu error"
mock_cmdgenerator = mock_cmdgen.return_value
mock_cmdgenerator.setCmd.return_value = (None, error_status, 1,
[var_bind])
client = snmp.SNMPClient(self.address, self.port, snmp.SNMP_V3)
self.assertRaises(snmp.SNMPFailure,
client.set, self.oid, self.value)
mock_cmdgenerator.setCmd.assert_called_once_with(mock.ANY, mock.ANY,
var_bind)
self.assertEqual(1, mock_setcmd.call_count)