compass-core/compass/hdsdiscovery/base.py
grace.yu f7508f3f33 Add test cases for hdsdiscovery and poll_switch
Change-Id: I04dec54881feefb8c127705f4ed59e950fc9ba23
2014-03-18 10:28:37 -07:00

182 lines
5.7 KiB
Python

# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base class extended by specific vendor in vendors directory.
A vendor needs to implement abstract methods of base class.
"""
import logging
import re
from abc import ABCMeta
from compass.hdsdiscovery.error import TimeoutError
from compass.hdsdiscovery import utils
class BaseVendor(object):
"""Basic Vendor object."""
__metaclass__ = ABCMeta
def is_this_vendor(self, sys_info, **kwargs):
"""Determine if the host is associated with this vendor.
This function must be implemented by vendor itself
"""
raise NotImplementedError
class BaseSnmpVendor(BaseVendor):
"""Base SNMP-based vendor plugin.
.. note::
It uses MIB-II sysDescr value to determine the vendor of the switch.
"""
def __init__(self, matched_names):
super(BaseSnmpVendor, self).__init__()
self._matched_names = matched_names
def is_this_vendor(self, sys_info, **kwargs):
"""Determine if the switch belongs to this vendor by matching the
system information retrieved from the switch.
:param str sys_info: the system information retrieved from a switch
Return True
"""
if sys_info:
for name in self._matched_names:
if re.search(r"\b" + re.escape(name) + r"\b", sys_info,
re.IGNORECASE):
return True
return False
class BasePlugin(object):
"""Extended by vendor's plugin, which processes request and
retrieve info directly from the switch.
"""
__metaclass__ = ABCMeta
def process_data(self, oper='SCAN', **kwargs):
"""Each vendors will have some plugins to do some operations.
Plugin will process request data and return expected result.
:param oper: operation function name.
:param kwargs: key-value pairs of arguments
"""
raise NotImplementedError
# At least one of these three functions below must be implemented.
def scan(self, **kwargs):
"""Get multiple records at once."""
pass
def set(self, **kwargs):
"""Set value to desired variable."""
pass
def get(self, **kwargs):
"""Get one record from a host."""
pass
class BaseSnmpMacPlugin(BasePlugin):
"""Base snmp plugin."""
def __init__(self, host, credential, oid='BRIDGE-MIB::dot1dTpFdbPort',
vlan_oid='Q-BRIDGE-MIB::dot1qPvid'):
super(BaseSnmpMacPlugin, self).__init__()
self.host = host
self.credential = credential
self.oid = oid
self.port_oid = 'ifName'
self.vlan_oid = vlan_oid
def process_data(self, oper='SCAN', **kwargs):
"""progress data."""
func_name = oper.lower()
return getattr(self, func_name)(**kwargs)
def scan(self, **kwargs):
"""scan."""
results = None
try:
results = utils.snmpwalk_by_cl(self.host, self.credential,
self.oid)
except TimeoutError as error:
logging.debug("PluginMac:scan snmpwalk_by_cl failed: %s",
error.message)
return None
mac_list = []
for entity in results:
if_index = entity['value']
if entity and int(if_index):
tmp = {}
mac_numbers = entity['iid'].split('.')
tmp['mac'] = self.get_mac_address(mac_numbers)
tmp['port'] = self.get_port(if_index)
tmp['vlan'] = self.get_vlan_id(if_index)
mac_list.append(tmp)
return mac_list
def get_vlan_id(self, port):
"""Get vlan Id."""
if not port:
return None
oid = '.'.join((self.vlan_oid, port))
vlan_id = None
result = None
try:
result = utils.snmpget_by_cl(self.host, self.credential, oid)
except TimeoutError as error:
logging.debug("[PluginMac:get_vlan_id snmpget_by_cl failed: %s]",
error.message)
return None
vlan_id = result.split()[-1]
return vlan_id
def get_port(self, if_index):
"""Get port number."""
if_name = '.'.join((self.port_oid, if_index))
result = None
try:
result = utils.snmpget_by_cl(self.host, self.credential, if_name)
except TimeoutError as error:
logging.debug("[PluginMac:get_port snmpget_by_cl failed: %s]",
error.message)
return None
# A result may be like "Value: FasterEthernet1/2/34
port = result.split()[-1].split('/')[-1]
return port
def convert_to_hex(self, value):
"""Convert the integer from decimal to hex."""
return "%0.2x" % int(value)
def get_mac_address(self, mac_numbers):
"""Assemble mac address from the list."""
if len(mac_numbers) != 6:
logging.error("[PluginMac:get_mac_address] MAC address must be "
"6 digitals")
return None
mac_in_hex = [self.convert_to_hex(num) for num in mac_numbers]
return ":".join(mac_in_hex)