Add test cases for hdsdiscovery and poll_switch
Change-Id: I04dec54881feefb8c127705f4ed59e950fc9ba23
This commit is contained in:
parent
2688e27054
commit
f7508f3f33
@ -29,7 +29,7 @@ class BaseVendor(object):
|
||||
"""Basic Vendor object."""
|
||||
__metaclass__ = ABCMeta
|
||||
|
||||
def is_this_vendor(self, host, credential, sys_info, **kwargs):
|
||||
def is_this_vendor(self, sys_info, **kwargs):
|
||||
"""Determine if the host is associated with this vendor.
|
||||
This function must be implemented by vendor itself
|
||||
"""
|
||||
@ -47,9 +47,13 @@ class BaseSnmpVendor(BaseVendor):
|
||||
super(BaseSnmpVendor, self).__init__()
|
||||
self._matched_names = matched_names
|
||||
|
||||
def is_this_vendor(self, host, credential, sys_info, **kwargs):
|
||||
"""Determine if the host is associated with this vendor."""
|
||||
if utils.is_valid_snmp_v2_credential(credential) and sys_info:
|
||||
def is_this_vendor(self, sys_info, **kwargs):
|
||||
"""Determine if the switch belongs to this vendor by matching the
|
||||
system information retrieved from the switch.
|
||||
:param str sys_info: the system information retrieved from a switch
|
||||
Return True
|
||||
"""
|
||||
if sys_info:
|
||||
for name in self._matched_names:
|
||||
if re.search(r"\b" + re.escape(name) + r"\b", sys_info,
|
||||
re.IGNORECASE):
|
||||
|
@ -83,7 +83,7 @@ class HDManager(object):
|
||||
"No such vendor found!")
|
||||
return False
|
||||
|
||||
if instance.is_this_vendor(host, credential, sys_info):
|
||||
if instance.is_this_vendor(sys_info):
|
||||
logging.info("[hdsdiscovery][hdmanager][is_valid_vendor]"
|
||||
"vendor %s is correct!", vendor)
|
||||
return True
|
||||
@ -134,7 +134,7 @@ class HDManager(object):
|
||||
logging.error('no instance %s load from %s', vname, vpath)
|
||||
continue
|
||||
|
||||
if instance.is_this_vendor(host, credential, sys_info):
|
||||
if instance.is_this_vendor(sys_info):
|
||||
logging.info("[get_vendor]****Found vendor '%s'****", vname)
|
||||
vendor = vname
|
||||
break
|
||||
|
@ -42,7 +42,7 @@ def load_module(mod_name, path, host=None, credential=None):
|
||||
|
||||
return instance
|
||||
except ImportError as exc:
|
||||
logging.error('No such plugin : %s', mod_name)
|
||||
logging.error('No such module found: %s', mod_name)
|
||||
logging.exception(exc)
|
||||
return None
|
||||
|
||||
@ -232,10 +232,7 @@ def snmpget_by_cl(host, credential, oid, timeout=8, retries=3):
|
||||
cmd = "snmpget -v %s -c %s -Ob -r %s -t %s %s %s" % (
|
||||
version, community, retries, timeout, host, oid)
|
||||
output = None
|
||||
sub_p = subprocess.Popen(cmd, shell=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
output, err = sub_p.communicate()
|
||||
output, err = exec_command(cmd)
|
||||
|
||||
if err:
|
||||
logging.error("[snmpget_by_cl] %s", err)
|
||||
@ -255,9 +252,8 @@ def snmpwalk_by_cl(host, credential, oid, timeout=5, retries=3):
|
||||
community = credential['community']
|
||||
cmd = "snmpwalk -v %s -c %s -Cc -r %s -t %s -Ob %s %s" % (
|
||||
version, community, retries, timeout, host, oid)
|
||||
output = []
|
||||
sub_p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
|
||||
output, err = sub_p.communicate()
|
||||
|
||||
output, err = exec_command(cmd)
|
||||
|
||||
if err:
|
||||
logging.debug("[snmpwalk_by_cl] %s ", err)
|
||||
@ -278,3 +274,14 @@ def snmpwalk_by_cl(host, credential, oid, timeout=5, retries=3):
|
||||
result.append(temp)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def exec_command(command):
|
||||
"""Execute command.
|
||||
Return a tuple of output and error message(None if no error).
|
||||
"""
|
||||
sub_p = subprocess.Popen(command,
|
||||
shell=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
return sub_p.communicate()
|
||||
|
@ -29,37 +29,38 @@ class OVSwitch(base.BaseVendor):
|
||||
def __init__(self):
|
||||
self.__name = "Open vSwitch"
|
||||
|
||||
def is_this_vendor(self, host, credential, sys_info, **kwargs):
|
||||
def is_this_vendor(self, sys_info, host=None, credential=None, **kwargs):
|
||||
"""Determine if the hostname is accociated witH this vendor.
|
||||
|
||||
:param host: swtich's IP address
|
||||
:param credential: credential to access switch
|
||||
"""
|
||||
if utils.is_valid_ssh_credential(credential):
|
||||
user = credential['username']
|
||||
pwd = credential['password']
|
||||
result = sys_info
|
||||
if host and credential:
|
||||
if utils.is_valid_ssh_credential(credential):
|
||||
user = credential['username']
|
||||
pwd = credential['password']
|
||||
|
||||
else:
|
||||
msg = ("[OVSwitch]The format of credential %r is not for SSH "
|
||||
"or incorrect Keywords! " % credential)
|
||||
logging.info(msg)
|
||||
return False
|
||||
|
||||
cmd = "ovs-vsctl -V"
|
||||
result = None
|
||||
try:
|
||||
result = utils.ssh_remote_execute(host, user, pwd, cmd)
|
||||
logging.debug('%s result for %s is %s', cmd, host, result)
|
||||
if not result:
|
||||
else:
|
||||
msg = ("[OVSwitch]The format of credential %r is not for SSH "
|
||||
"or incorrect Keywords! " % credential)
|
||||
logging.info(msg)
|
||||
return False
|
||||
except Exception as exc:
|
||||
logging.error("vendor incorrect or connection failed to run %s",
|
||||
cmd)
|
||||
logging.exception(exc)
|
||||
return False
|
||||
|
||||
if isinstance(result, str):
|
||||
result = [result]
|
||||
cmd = "ovs-vsctl -V"
|
||||
result = None
|
||||
try:
|
||||
result = utils.ssh_remote_execute(host, user, pwd, cmd)
|
||||
logging.debug('%s result for %s is %s', cmd, host, result)
|
||||
if not result:
|
||||
return False
|
||||
except Exception as exc:
|
||||
logging.error("No vendor or connection failed to run %s", cmd)
|
||||
logging.exception(exc)
|
||||
return False
|
||||
|
||||
if isinstance(result, str):
|
||||
result = [result]
|
||||
|
||||
for line in result:
|
||||
if not line:
|
||||
|
126
compass/tests/actions/test_poll_switch.py
Executable file
126
compass/tests/actions/test_poll_switch.py
Executable file
@ -0,0 +1,126 @@
|
||||
#!/usr/bin/python
|
||||
#
|
||||
# Copyright 2014 Huawei Technologies Co. Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""test poll_switch action module."""
|
||||
from mock import patch
|
||||
import os
|
||||
import unittest2
|
||||
|
||||
|
||||
os.environ['COMPASS_IGNORE_SETTING'] = 'true'
|
||||
|
||||
|
||||
from compass.utils import setting_wrapper as setting
|
||||
reload(setting)
|
||||
|
||||
|
||||
from compass.actions import poll_switch
|
||||
from compass.api import app
|
||||
from compass.db import database
|
||||
from compass.db.model import Machine
|
||||
from compass.db.model import Switch
|
||||
from compass.db.model import SwitchConfig
|
||||
from compass.utils import flags
|
||||
from compass.utils import logsetting
|
||||
|
||||
|
||||
class TestPollSwitch(unittest2.TestCase):
|
||||
"""base api test class."""
|
||||
|
||||
CLUSTER_NAME = "Test1"
|
||||
SWITCH_CREDENTIAL = {'version': '2c',
|
||||
'community': 'public'}
|
||||
DATABASE_URL = 'sqlite://'
|
||||
|
||||
def setUp(self):
|
||||
super(TestPollSwitch, self).setUp()
|
||||
logsetting.init()
|
||||
database.init(self.DATABASE_URL)
|
||||
database.create_db()
|
||||
self.test_client = app.test_client()
|
||||
|
||||
with database.session() as session:
|
||||
# Add one switch to DB
|
||||
switch = Switch(ip="127.0.0.1",
|
||||
credential=self.SWITCH_CREDENTIAL)
|
||||
session.add(switch)
|
||||
# Add filter port to SwitchConfig table
|
||||
filter_list = [
|
||||
SwitchConfig(ip="127.0.0.1", filter_port='6'),
|
||||
SwitchConfig(ip="127.0.0.1", filter_port='7')
|
||||
]
|
||||
session.add_all(filter_list)
|
||||
|
||||
def tearDown(self):
|
||||
database.drop_db()
|
||||
super(TestPollSwitch, self).tearDown()
|
||||
|
||||
@patch("compass.hdsdiscovery.hdmanager.HDManager.learn")
|
||||
@patch("compass.hdsdiscovery.hdmanager.HDManager.get_vendor")
|
||||
def test_poll_switch(self, mock_get_vendor, mock_learn):
|
||||
# Incorrect IP address format
|
||||
poll_switch.poll_switch("xxx")
|
||||
with database.session() as session:
|
||||
machines = session.query(Machine).filter_by(switch_id=1).all()
|
||||
self.assertEqual([], machines)
|
||||
|
||||
# Switch is unreachable
|
||||
mock_get_vendor.return_value = (None, 'unreachable', 'Timeout')
|
||||
poll_switch.poll_switch('127.0.0.1')
|
||||
with database.session() as session:
|
||||
machines = session.query(Machine).filter_by(switch_id=1).all()
|
||||
self.assertEqual([], machines)
|
||||
|
||||
switch = session.query(Switch).filter_by(id=1).first()
|
||||
self.assertEqual(switch.state, 'unreachable')
|
||||
|
||||
# Successfully retrieve machines from the switch
|
||||
mock_get_vendor.return_value = ('xxx', 'Found', "")
|
||||
mock_learn.return_value = [
|
||||
{'mac': '00:01:02:03:04:05', 'vlan': '1', 'port': '1'},
|
||||
{'mac': '00:01:02:03:04:06', 'vlan': '1', 'port': '2'},
|
||||
{'mac': '00:01:02:03:04:07', 'vlan': '2', 'port': '3'},
|
||||
{'mac': '00:01:02:03:04:08', 'vlan': '2', 'port': '4'},
|
||||
{'mac': '00:01:02:03:04:09', 'vlan': '3', 'port': '5'}
|
||||
]
|
||||
poll_switch.poll_switch('127.0.0.1')
|
||||
with database.session() as session:
|
||||
machines = session.query(Machine).filter_by(switch_id=1).all()
|
||||
self.assertEqual(5, len(machines))
|
||||
# The state and err_msg of the switch should be reset.
|
||||
switch = session.query(Switch).filter_by(id=1).first()
|
||||
self.assertEqual(switch.state, "under_monitoring")
|
||||
self.assertEqual(switch.err_msg, "")
|
||||
|
||||
# Successfully retrieve and filter some machines
|
||||
# In the following case, machines with port 6, 7 will be filtered.
|
||||
mock_learn.return_value = [
|
||||
{'mac': '00:01:02:03:04:10', 'vlan': '3', 'port': '6'},
|
||||
{'mac': '00:01:02:03:04:0a', 'vlan': '4', 'port': '7'},
|
||||
{'mac': '00:01:02:03:04:0b', 'vlan': '4', 'port': '8'},
|
||||
{'mac': '00:01:02:03:04:0c', 'vlan': '5', 'port': '9'},
|
||||
{'mac': '00:01:02:03:04:0d', 'vlan': '5', 'port': '10'}
|
||||
]
|
||||
poll_switch.poll_switch('127.0.0.1')
|
||||
with database.session() as session:
|
||||
machines = session.query(Machine).filter_by(switch_id=1).all()
|
||||
self.assertEqual(8, len(machines))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
flags.init()
|
||||
logsetting.init()
|
||||
unittest2.main()
|
@ -30,6 +30,7 @@ reload(setting)
|
||||
|
||||
from compass.hdsdiscovery.base import BaseSnmpMacPlugin
|
||||
from compass.hdsdiscovery.base import BaseSnmpVendor
|
||||
from compass.hdsdiscovery.error import TimeoutError
|
||||
from compass.utils import flags
|
||||
from compass.utils import logsetting
|
||||
|
||||
@ -47,7 +48,7 @@ class TestBaseSnmpMacPlugin(unittest2.TestCase):
|
||||
def setUp(self):
|
||||
super(TestBaseSnmpMacPlugin, self).setUp()
|
||||
logsetting.init()
|
||||
self.test_plugin = BaseSnmpMacPlugin('12.0.0.1',
|
||||
self.test_plugin = BaseSnmpMacPlugin('127.0.0.1',
|
||||
{'version': '2c',
|
||||
'community': 'public'})
|
||||
|
||||
@ -58,10 +59,16 @@ class TestBaseSnmpMacPlugin(unittest2.TestCase):
|
||||
@patch('compass.hdsdiscovery.utils.snmpget_by_cl')
|
||||
def test_get_port(self, mock_snmpget):
|
||||
"""test snmp get port."""
|
||||
# Successfully get port number
|
||||
mock_snmpget.return_value = 'IF-MIB::ifName.4 = STRING: ge-1/1/4'
|
||||
result = self.test_plugin.get_port('4')
|
||||
self.assertEqual('4', result)
|
||||
|
||||
# Failed to get port number, switch is timeout
|
||||
mock_snmpget.side_effect = TimeoutError("Timeout")
|
||||
result = self.test_plugin.get_port('4')
|
||||
self.assertIsNone(result)
|
||||
|
||||
@patch('compass.hdsdiscovery.utils.snmpget_by_cl')
|
||||
def test_get_vlan_id(self, mock_snmpget):
|
||||
"""test snmp get vlan."""
|
||||
@ -73,6 +80,11 @@ class TestBaseSnmpMacPlugin(unittest2.TestCase):
|
||||
result = self.test_plugin.get_vlan_id('4')
|
||||
self.assertEqual('100', result)
|
||||
|
||||
# Faild to query switch due to timeout
|
||||
mock_snmpget.side_effect = TimeoutError("Timeout")
|
||||
result = self.test_plugin.get_vlan_id('4')
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_get_mac_address(self):
|
||||
"""tet snmp get mac address."""
|
||||
# Correct input for mac numbers
|
||||
@ -100,33 +112,15 @@ class BaseTest(unittest2.TestCase):
|
||||
"""test base snmp vendor."""
|
||||
fake = MockSnmpVendor()
|
||||
|
||||
credential = {"version": "2c",
|
||||
"community": "public"}
|
||||
is_vendor = fake.is_this_vendor("12.0.0.1", credential,
|
||||
"FakeVendor 1.1")
|
||||
is_vendor = fake.is_this_vendor("FakeVendor 1.1")
|
||||
|
||||
self.assertTrue(is_vendor)
|
||||
|
||||
# check case-insensitive match
|
||||
|
||||
self.assertFalse(fake.is_this_vendor("12.0.0.1", credential,
|
||||
"fakevendor1.1"))
|
||||
self.assertFalse(fake.is_this_vendor("fakevendor1.1"))
|
||||
|
||||
# breaks word-boudary match
|
||||
self.assertFalse(fake.is_this_vendor("12.0.0.1", credential,
|
||||
"FakeVendor1.1"))
|
||||
|
||||
# Not SNMP credentials
|
||||
self.assertFalse(fake.is_this_vendor("12.0.0.1",
|
||||
{"username": "root",
|
||||
"password": "test123"},
|
||||
"fakevendor1.1"))
|
||||
|
||||
# Not SNMP v2 credentials
|
||||
self.assertFalse(fake.is_this_vendor("12.0.0.1",
|
||||
{"version": "v1",
|
||||
"community": "public"},
|
||||
"fakevendor1.1"))
|
||||
self.assertFalse(fake.is_this_vendor("FakeVendor1.1"))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
@ -18,9 +18,9 @@
|
||||
import os
|
||||
import unittest2
|
||||
|
||||
from mock import Mock
|
||||
from mock import patch
|
||||
|
||||
|
||||
os.environ['COMPASS_IGNORE_SETTING'] = 'true'
|
||||
|
||||
|
||||
@ -31,10 +31,15 @@ reload(setting)
|
||||
from compass.hdsdiscovery.hdmanager import HDManager
|
||||
from compass.hdsdiscovery.vendors.huawei.huawei import Huawei
|
||||
from compass.hdsdiscovery.vendors.huawei.plugins.mac import Mac
|
||||
from compass.hdsdiscovery.vendors.ovswitch.plugins.mac import Mac as OVSMac
|
||||
from compass.utils import flags
|
||||
from compass.utils import logsetting
|
||||
|
||||
|
||||
SNMP_V2_CREDENTIALS = {'version': '2c',
|
||||
'community': 'public'}
|
||||
|
||||
|
||||
class HuaweiTest(unittest2.TestCase):
|
||||
"""test huawei switch snmp get."""
|
||||
|
||||
@ -42,8 +47,6 @@ class HuaweiTest(unittest2.TestCase):
|
||||
super(HuaweiTest, self).setUp()
|
||||
logsetting.init()
|
||||
self.huawei = Huawei()
|
||||
self.correct_host = '12.23.1.1'
|
||||
self.correct_credentials = {'version': '2c', 'community': 'public'}
|
||||
self.sys_info = 'Huawei Technologies'
|
||||
|
||||
def tearDown(self):
|
||||
@ -52,25 +55,14 @@ class HuaweiTest(unittest2.TestCase):
|
||||
|
||||
def test_is_this_vendor(self):
|
||||
"""test device vendor is haiwei."""
|
||||
#Credential's keyword is incorrect
|
||||
#Incorrect system information
|
||||
incorrect_sys_info = "xxx"
|
||||
self.assertFalse(
|
||||
self.huawei.is_this_vendor(self.correct_host,
|
||||
{'username': 'root',
|
||||
'password': 'root'},
|
||||
self.sys_info))
|
||||
|
||||
#Incorrect version
|
||||
self.assertFalse(
|
||||
self.huawei.is_this_vendor(self.correct_host,
|
||||
{'version': 'v1',
|
||||
'community': 'public'},
|
||||
self.sys_info))
|
||||
self.huawei.is_this_vendor(incorrect_sys_info))
|
||||
|
||||
#Correct vendor
|
||||
self.assertTrue(
|
||||
self.huawei.is_this_vendor(self.correct_host,
|
||||
self.correct_credentials,
|
||||
self.sys_info))
|
||||
self.huawei.is_this_vendor(self.sys_info))
|
||||
|
||||
|
||||
class HuaweiMacTest(unittest2.TestCase):
|
||||
@ -79,7 +71,7 @@ class HuaweiMacTest(unittest2.TestCase):
|
||||
def setUp(self):
|
||||
super(HuaweiMacTest, self).setUp()
|
||||
logsetting.init()
|
||||
host = '12.23.1.1'
|
||||
host = '192.168.1.1'
|
||||
credential = {'version': '2c', 'community': 'public'}
|
||||
self.mac_plugin = Mac(host, credential)
|
||||
|
||||
@ -87,13 +79,34 @@ class HuaweiMacTest(unittest2.TestCase):
|
||||
del self.mac_plugin
|
||||
super(HuaweiMacTest, self).tearDown()
|
||||
|
||||
def test_process_data(self):
|
||||
@patch("compass.hdsdiscovery.utils.snmpwalk_by_cl")
|
||||
def test_process_data(self, mock_snmpwalk):
|
||||
"""get progress data function."""
|
||||
# GET operation haven't been implemeneted.
|
||||
self.assertIsNone(self.mac_plugin.process_data('GET'))
|
||||
|
||||
# SNMP Walk Timeout
|
||||
#utils.snmpwalk_by_cl = Mock(return_value=None)
|
||||
mock_snmpwalk.return_value = None
|
||||
self.assertIsNone(self.mac_plugin.process_data())
|
||||
|
||||
from compass.hdsdiscovery.vendors.ovswitch.plugins.mac import Mac as OVSMac
|
||||
# Successfully get MAC addresses from the switch
|
||||
mock_snmp_walk_result = [
|
||||
{"iid": "40.110.212.77.198.190.88.1.48", "value": "10"},
|
||||
{"iid": "40.110.212.100.199.74.88.1.48", "value": "11"},
|
||||
{"iid": "0.12.41.53.220.2.88.1.48", "value": "12"}
|
||||
]
|
||||
expected_mac_info = [
|
||||
{"mac": "28:6e:d4:4d:c6:be", "port": "1", "vlan": "88"},
|
||||
{"mac": "28:6e:d4:64:c7:4a", "port": "2", "vlan": "88"},
|
||||
{"mac": "00:0c:29:35:dc:02", "port": "3", "vlan": "88"}
|
||||
]
|
||||
#utils.snmpwalk_by_cl = Mock(return_value=mock_snmp_walk_result)
|
||||
mock_snmpwalk.return_value = mock_snmp_walk_result
|
||||
self.mac_plugin.get_port = Mock()
|
||||
self.mac_plugin.get_port.side_effect = ["1", "2", "3"]
|
||||
result = self.mac_plugin.process_data()
|
||||
self.assertEqual(expected_mac_info, result)
|
||||
|
||||
|
||||
class OVSMacTest(unittest2.TestCase):
|
||||
@ -126,7 +139,7 @@ class HDManagerTest(unittest2.TestCase):
|
||||
super(HDManagerTest, self).setUp()
|
||||
logsetting.init()
|
||||
self.manager = HDManager()
|
||||
self.correct_host = '12.23.1.1'
|
||||
self.correct_host = '127.0.0.1'
|
||||
self.correct_credential = {'version': '2c', 'community': 'public'}
|
||||
|
||||
def tearDown(self):
|
||||
@ -137,17 +150,20 @@ class HDManagerTest(unittest2.TestCase):
|
||||
def test_get_vendor(self, sys_info_mock):
|
||||
"""test get_vendor."""
|
||||
# Incorrect ip
|
||||
self.assertIsNone(self.manager.get_vendor('1234.1.1.1',
|
||||
self.correct_credential)[0])
|
||||
vendor, state, err = self.manager.get_vendor('1234.1.1.1',
|
||||
self.correct_credential)
|
||||
self.assertIsNone(vendor)
|
||||
self.assertEqual('error', state)
|
||||
|
||||
# Incorrect credential
|
||||
self.assertIsNone(
|
||||
self.manager.get_vendor(self.correct_host,
|
||||
{'version': '1v',
|
||||
'community': 'private'})[0])
|
||||
incorr_cred = {'version': '1v', 'community': 'private'}
|
||||
vendor, state, err = self.manager.get_vendor(self.correct_host,
|
||||
incorr_cred)
|
||||
self.assertIsNone(vendor)
|
||||
self.assertEqual('error', state)
|
||||
|
||||
# SNMP get system description Timeout
|
||||
excepted_err_msg = 'Timeout: No Response from 12.23.1.1.'
|
||||
excepted_err_msg = 'Timeout: No Response from 127.0.0.1.'
|
||||
sys_info_mock.return_value = (None, excepted_err_msg)
|
||||
result, state, err = self.manager.get_vendor(self.correct_host,
|
||||
self.correct_credential)
|
||||
@ -171,34 +187,44 @@ class HDManagerTest(unittest2.TestCase):
|
||||
expected_vendor_names = ['huawei', 'hp', 'pica8']
|
||||
for info, expected_vendor in zip(sys_info, expected_vendor_names):
|
||||
sys_info_mock.return_value = (info, '')
|
||||
result, state, err = self.manager\
|
||||
.get_vendor(self.correct_host,
|
||||
self.correct_credential)
|
||||
self.assertEqual(result, expected_vendor)
|
||||
# the result is a tuple ($vendor, $state, $error_message)
|
||||
result = self.manager.get_vendor(self.correct_host,
|
||||
self.correct_credential)
|
||||
self.assertEqual(result[0], expected_vendor)
|
||||
|
||||
@patch('compass.hdsdiscovery.hdmanager.HDManager.get_sys_info')
|
||||
def test_is_valid_vendor(self, sys_info_mock):
|
||||
"""test is_valid_vendor."""
|
||||
#non-exsiting vendor
|
||||
self.assertFalse(self.manager.is_valid_vendor(self.correct_host,
|
||||
self.correct_credential,
|
||||
'xxxx'))
|
||||
#non-exsiting vendor under vendors directory
|
||||
self.assertFalse(
|
||||
self.manager.is_valid_vendor(self.correct_host,
|
||||
self.correct_credential,
|
||||
'xxxx')
|
||||
)
|
||||
|
||||
#No system description retrieved
|
||||
sys_info_mock.return_value = (None, 'TIMEOUT')
|
||||
self.assertFalse(self.manager.is_valid_vendor(self.correct_host,
|
||||
self.correct_credential,
|
||||
'pica8'))
|
||||
self.assertFalse(
|
||||
self.manager.is_valid_vendor(self.correct_host,
|
||||
self.correct_credential,
|
||||
'pica8')
|
||||
)
|
||||
|
||||
#Incorrect vendor name
|
||||
sys_info = 'Pica8 XorPlus Platform Software'
|
||||
sys_info_mock.return_value = (sys_info, '')
|
||||
self.assertFalse(self.manager.is_valid_vendor(self.correct_host,
|
||||
self.correct_credential,
|
||||
'huawei'))
|
||||
self.assertFalse(
|
||||
self.manager.is_valid_vendor(self.correct_host,
|
||||
self.correct_credential,
|
||||
'huawei')
|
||||
)
|
||||
|
||||
#Correct vendor name
|
||||
self.assertTrue(self.manager.is_valid_vendor(self.correct_host,
|
||||
self.correct_credential,
|
||||
'pica8'))
|
||||
self.assertTrue(
|
||||
self.manager.is_valid_vendor(self.correct_host,
|
||||
self.correct_credential,
|
||||
'pica8')
|
||||
)
|
||||
|
||||
def test_learn(self):
|
||||
"""test learn."""
|
||||
@ -213,21 +239,6 @@ class HDManagerTest(unittest2.TestCase):
|
||||
'xxxx', 'mac'))
|
||||
|
||||
|
||||
from compass.hdsdiscovery import utils
|
||||
|
||||
|
||||
class UtilsTest(unittest2.TestCase):
|
||||
"""hdsdiscovery util test class."""
|
||||
|
||||
def setUp(self):
|
||||
super(UtilsTest, self).setUp()
|
||||
logsetting.init()
|
||||
|
||||
def test_load_module(self):
|
||||
"""test load_module."""
|
||||
self.assertIsNone(utils.load_module('xxx', 'fake/path/to/module'))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
flags.init()
|
||||
logsetting.init()
|
||||
|
105
compass/tests/hdsdiscovery/test_utils.py
Executable file
105
compass/tests/hdsdiscovery/test_utils.py
Executable file
@ -0,0 +1,105 @@
|
||||
#!/usr/bin/python
|
||||
#
|
||||
# Copyright 2014 Huawei Technologies Co. Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""test hdsdiscovery.utils module."""
|
||||
from mock import Mock
|
||||
from mock import patch
|
||||
import os
|
||||
import unittest2
|
||||
|
||||
|
||||
os.environ['COMPASS_IGNORE_SETTING'] = 'true'
|
||||
|
||||
|
||||
from compass.utils import setting_wrapper as setting
|
||||
reload(setting)
|
||||
|
||||
|
||||
from compass.hdsdiscovery.error import TimeoutError
|
||||
from compass.hdsdiscovery import utils
|
||||
from compass.utils import flags
|
||||
from compass.utils import logsetting
|
||||
|
||||
|
||||
SNMP_V2_CREDENTIALS = {'version': '2c',
|
||||
'community': 'public'}
|
||||
|
||||
|
||||
class UtilsTest(unittest2.TestCase):
|
||||
"""test huawei switch snmp get."""
|
||||
|
||||
def setUp(self):
|
||||
super(UtilsTest, self).setUp()
|
||||
logsetting.init()
|
||||
self.host = "127.0.0.1"
|
||||
self.credentials = SNMP_V2_CREDENTIALS
|
||||
|
||||
def tearDown(self):
|
||||
super(UtilsTest, self).tearDown()
|
||||
|
||||
def test_load_module(self):
|
||||
"""get load_module function."""
|
||||
# Successfully load HUAWEI module
|
||||
huawei_vendor_path = "/".join(
|
||||
(os.path.dirname(
|
||||
os.path.dirname(os.path.dirname(os.path.realpath(__file__)))),
|
||||
"hdsdiscovery/vendors/huawei")
|
||||
)
|
||||
|
||||
# No module found
|
||||
self.assertIsNone(utils.load_module("xxx", huawei_vendor_path))
|
||||
|
||||
@patch("compass.hdsdiscovery.utils.exec_command")
|
||||
def test_snmpget_by_cl(self, mock_exec_command):
|
||||
oid = "sysDescr.0"
|
||||
# Incorrect credentials
|
||||
incorr_credentials = {"version": "1v", "community": "public"}
|
||||
self.assertIsNone(utils.snmpget_by_cl(self.host,
|
||||
incorr_credentials,
|
||||
oid))
|
||||
# Switch timeout, failed to execute SNMPGET
|
||||
mock_exec_command.return_value = (None, "Timeout")
|
||||
with self.assertRaises(TimeoutError):
|
||||
utils.snmpget_by_cl(self.host, self.credentials, oid)
|
||||
|
||||
# Successfully get system information
|
||||
mock_exec_command.return_value = ("Huawei Technologies", None)
|
||||
result = utils.snmpget_by_cl(self.host, self.credentials, oid)
|
||||
self.assertEqual("Huawei Technologies", result)
|
||||
|
||||
def test_snmpwalk_by_cl(self):
|
||||
oid = "BRIDGE-MIB::dot1dTpFdbPort"
|
||||
# the result of SNMPWALK is None
|
||||
utils.exec_command = Mock(return_value=(None, None))
|
||||
result = utils.snmpwalk_by_cl(self.host, self.credentials, oid)
|
||||
self.assertEqual([], result)
|
||||
|
||||
# Successfully execute SNMPWALK
|
||||
return_value = ("xxx.0.12.41.112.143.193 = INTEGER: 47\n"
|
||||
"xxx.0.12.41.139.17.124 = INTEGER: 47\n")
|
||||
expected_result = [
|
||||
{"iid": "0.12.41.112.143.193", "value": "47"},
|
||||
{"iid": "0.12.41.139.17.124", "value": "47"}
|
||||
]
|
||||
utils.exec_command = Mock(return_value=(return_value, None))
|
||||
result = utils.snmpwalk_by_cl(self.host, self.credentials, oid)
|
||||
self.assertEqual(expected_result, result)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
flags.init()
|
||||
logsetting.init()
|
||||
unittest2.main()
|
Loading…
Reference in New Issue
Block a user