spyglass-plugin-xls/spyglass_plugin_xls/excel.py
Ian H. Pittwood 269ce7154f Standardize spyglass-plugin-xls code with YAPF
Implements documentation updates from [0] to standardize formatting for
all Airship projects.

[0] https://review.opendev.org/#/c/671291/

Change-Id: I723af0d91e24a306f05c06a3ddf456757489f7f0
2019-07-29 16:47:56 -05:00

319 lines
12 KiB
Python

# Copyright 2019 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
import pprint
import re
from spyglass.data_extractor.base import BaseDataSourcePlugin
from spyglass.data_extractor import models
from spyglass_plugin_xls.excel_parser import ExcelParser
from spyglass_plugin_xls.exceptions import ExcelFileNotSpecified
from spyglass_plugin_xls.exceptions import ExcelSpecNotSpecified
# Module-level logger named after this module; used by ExcelPlugin below.
LOG = logging.getLogger(__name__)
class ExcelPlugin(BaseDataSourcePlugin):
    """Data source plugin that extracts site data from an Excel workbook.

    The workbook named by ``excel_file`` is parsed with ``ExcelParser``
    according to ``excel_spec`` when the plugin is constructed and the
    result is kept in ``self.raw_data``. The ``parse_*`` methods reshape
    slices of that raw data; most optional values that are missing are
    reported with the ``#CHANGE_ME`` placeholder.
    """

    def __init__(self, region, **kwargs):
        """Validate the configuration and load the raw excel data

        :param region: Region name, forwarded to the base plugin
        :param kwargs: Plugin configuration; ``excel_file`` and
            ``excel_spec`` are required and are handed to ExcelParser
        :raises ExcelFileNotSpecified: if ``excel_file`` is not given
        :raises ExcelSpecNotSpecified: if ``excel_spec`` is not given
        """
        super().__init__(region)
        LOG.info("Excel Plugin Initializing")
        self.SOURCE_TYPE = "excel"
        self.PLUGIN_NAME = "spyglass-plugin-xls"

        # Configuration parameters
        if 'excel_file' not in kwargs:
            raise ExcelFileNotSpecified()
        self.excel_path = kwargs["excel_file"]
        if 'excel_spec' not in kwargs:
            raise ExcelSpecNotSpecified()
        self.excel_spec = kwargs["excel_spec"]

        # Raw data from excel
        self.load_raw_data()
        # NOTE(review): source_name is not set in this class; presumably it
        # is provided by BaseDataSourcePlugin - confirm.
        LOG.info("Initiated data extractor plugin:{}".format(self.source_name))

    # Implement abstract methods
    def load_raw_data(self):
        """Parse the configured workbook and store it in ``self.raw_data``"""
        excel_obj = ExcelParser(self.excel_path, self.excel_spec)
        self.raw_data = excel_obj.get_data()

    def parse_racks(self):
        """Return list of racks in the region

        :returns: list of Rack objects
        :rtype: list
        :raises KeyError: if a host has no ipmi entry or no ``host_profile``
        """
        LOG.info("Get Host Information")
        ipmi_data = self.raw_data["ipmi_data"][0]
        baremetal_racks = []
        for rack, hostnames in self._get_rackwise_hosts().items():
            host_list = [
                models.Host(
                    hostname,
                    rack_name=rack,
                    host_profile=ipmi_data[hostname]["host_profile"])
                for hostname in hostnames
            ]
            baremetal_racks.append(models.Rack(rack, host_list))
        return baremetal_racks

    def parse_hosts(self, rack=None):
        """Return list of hosts in the region

        :param string rack: Rack name; when omitted the hosts of every rack
            are returned
        :returns: list of Host objects containing a rack's host data, empty
            if the requested rack does not exist
        :rtype: list of models.Host
        """
        racks = self.parse_racks()
        if rack:
            for _rack in racks:
                if rack == _rack.name:
                    return _rack.hosts
            # Unknown rack: keep the documented list return type instead of
            # implicitly returning None.
            LOG.warning("Rack %s not found in excel data", rack)
            return []
        return [host for _rack in racks for host in _rack.hosts]

    def parse_networks(self):
        """Extracts vlan network info from raw network data from excel

        :returns: one VLANNetworkData per private and public network
        :rtype: list of models.VLANNetworkData
        """
        vlan_list = []
        # Network data extracted from xl is formatted to have a predictable
        # data type. For e.g. 'VLAN 45' extracted from xl is formatted as 45
        vlan_pattern = r"\d+"
        private_net = self.raw_data["network_data"]["private"]
        public_net = self.raw_data["network_data"]["public"]
        # Extract network information from private and public network data
        for net_type, net_val in itertools.chain(private_net.items(),
                                                 public_net.items()):
            if net_type == "ingress":
                # Ingress is a special network that has no vlan, only a
                # subnet string, so it is passed through as is
                tmp_vlan = {"name": net_type, "subnet": net_val}
            else:
                # Extract the vlan tag, e.g. '45' out of 'VLAN 45'. A
                # missing, empty or digit-less value yields the placeholder
                # instead of raising IndexError.
                raw_vlan = net_val.get("vlan", "")
                if raw_vlan in (None, ""):
                    tags = []
                else:
                    tags = re.findall(vlan_pattern, str(raw_vlan))
                tmp_vlan = {
                    # Standardize the network name as net_type may be
                    # different, e.g. PXE instead of pxe or ksn instead of
                    # calico. Valid network names are pxe, calico, oob,
                    # oam, overlay, storage, ingress
                    "name": self._get_network_name_from_vlan_name(net_type),
                    "vlan": tags[0] if tags else "#CHANGE_ME",
                    "subnet": net_val.get("subnet", "#CHANGE_ME"),
                    "gateway": net_val.get("gateway", "#CHANGE_ME"),
                }
            vlan_list.append(models.VLANNetworkData(**tmp_vlan))
        return vlan_list

    def parse_ips(self, host):
        """Return list of IPs on the host

        :param string host: Host name
        :returns: IPs per network on the host
        :rtype: models.IPList
        :raises KeyError: if the host has no ipmi entry

        The IPList is built from the keyword arguments oob, oam, calico,
        overlay, pxe and storage. Addresses missing from the excel data
        are set to '#CHANGE_ME'; they are expected to be either DHCP or
        internally generated in the next steps by the design rules.
        """
        host_ipmi = self.raw_data["ipmi_data"][0][host]
        return models.IPList(
            oob=host_ipmi.get("ipmi_address", "#CHANGE_ME"),
            oam=host_ipmi.get("oam", "#CHANGE_ME"),
            calico=host_ipmi.get("calico", "#CHANGE_ME"),
            overlay=host_ipmi.get("overlay", "#CHANGE_ME"),
            pxe=host_ipmi.get("pxe", "#CHANGE_ME"),
            storage=host_ipmi.get("storage", "#CHANGE_ME"),
        )

    def parse_ldap_information(self):
        """Extract ldap information from excel

        :returns: dict with ``common_name`` and ``subdomain`` and, when the
            raw url could be split, ``url`` and ``domain``
        :rtype: dict
        """
        ldap_raw_data = self.raw_data["site_info"]["ldap"]
        ldap_info = {}
        # raw url is 'url: ldap://example.com' so we are converting to
        # 'ldap://example.com'
        url = ldap_raw_data.get("url", "#CHANGE_ME")
        try:
            ldap_info["url"] = url.split(" ")[1]
            ldap_info["domain"] = url.split(".")[1]
        except IndexError as e:
            # Best effort: the keys that could not be derived are left out
            LOG.error("url.split:{}".format(e))
        ldap_info["common_name"] = ldap_raw_data.get(
            "common_name", "#CHANGE_ME")
        ldap_info["subdomain"] = ldap_raw_data.get("subdomain", "#CHANGE_ME")
        return ldap_info

    def parse_ntp_servers(self):
        """Returns the ntp servers from excel as a models.ServerList"""
        return self._get_formatted_server_list(
            self.raw_data["site_info"]["ntp"])

    def parse_dns_servers(self):
        """Returns the dns servers from excel as a models.ServerList"""
        return self._get_formatted_server_list(
            self.raw_data["site_info"]["dns"])

    def parse_domain_name(self):
        """Returns domain name extracted from excel file"""
        return self.raw_data["site_info"]["domain"]

    def parse_location_information(self):
        """Prepare location data from information extracted
        by ExcelParser (i.e. raw data)

        :returns: dict with the keys name, physical_location_id, state,
            country and corridor
        :rtype: dict
        :raises KeyError: if the location data has no ``corridor`` entry
        :raises IndexError: if the corridor entry contains no digits
        """
        location_data = self.raw_data["site_info"]["location"]
        # Only the first number of the corridor string is kept; it is
        # re-emitted below with a 'c' prefix
        corridor_number = re.findall(r"\d+", location_data["corridor"])[0]
        return {
            "name": location_data.get("name", "#CHANGE_ME"),
            "physical_location_id": location_data.get(
                "physical_location", ""),
            "state": location_data.get("state", "#CHANGE_ME"),
            "country": location_data.get("country", "#CHANGE_ME"),
            "corridor": "c{}".format(corridor_number),
        }

    # Implement helper functions
    @staticmethod
    def _get_network_name_from_vlan_name(vlan_name):
        """Map a vlan name extracted from excel to a network name

        The first rule that matches applies; matching ignores case:
          vlan_name contains "ksn" or "calico": network name is "calico"
          vlan_name contains "storage": network name is "storage"
          vlan_name contains "oam" or "server": network name is "oam"
          vlan_name contains "ovs" or "overlay": network name is "overlay"
          vlan_name contains "oob": network name is "oob"
          vlan_name contains "pxe": network name is "pxe"

        :param string vlan_name: Name as found in the excel file
        :returns: the network name, or "" if no rule matches
        :rtype: string
        """
        network_patterns = (
            ("ksn|calico", "calico"),
            ("storage", "storage"),
            ("oam|server", "oam"),
            ("ovs|overlay", "overlay"),
            ("oob", "oob"),
            ("pxe", "pxe"),
        )
        for pattern, network_name in network_patterns:
            if re.search(pattern, vlan_name, re.IGNORECASE):
                return network_name
        # if nothing matches
        LOG.error(
            "Unable to recognize VLAN name extracted from Plugin data source")
        return ""

    @staticmethod
    def _get_formatted_server_list(server_list):
        """Wrap the dns/ntp server entries in a models.ServerList

        dns/ntp server info from excel is of the format
        'xxx.xxx.xxx.xxx, (aaa.bbb.ccc.com)'; entries containing '(' are
        dropped so that only the plain addresses are kept.

        :param server_list: iterable of server strings
        :rtype: models.ServerList
        """
        servers = [data for data in server_list if "(" not in data]
        return models.ServerList(servers)

    def _get_rack(self, host):
        """Get rack id from the rack string extracted from xl

        If ``self.region`` is not set yet, it is set to the part of the
        host name that precedes the rack id.

        :param string host: Host name
        :returns: rack id of the form 'r<digits>'
        :raises IndexError: if the host name does not match the rack pattern
        """
        rack_pattern = r"\w.*(r\d+)\w.*"
        rack = re.findall(rack_pattern, host)[0]
        if not self.region:
            self.region = host.split(rack)[0]
        return rack

    def _get_rackwise_hosts(self):
        """Mapping hosts with rack ids

        :returns: dict of rack name to the list of its host names
        :rtype: dict
        """
        hostnames = self.raw_data["ipmi_data"][1]
        racks = self._get_rack_data()
        rackwise_hosts = {rack_name: [] for rack_name in racks.values()}
        for host in hostnames:
            # Group by the rack id parsed from the host name. A substring
            # test ('r1' in host) would also put hosts of r10, r11, ... into
            # rack r1.
            rackwise_hosts[racks[self._get_rack(host)]].append(host)
        LOG.debug("rackwise hosts:\n%s", pprint.pformat(rackwise_hosts))
        return rackwise_hosts

    def _get_rack_data(self):
        """Format rack name

        :returns: dict of rack id ('r<digits>') to rack name
            ('rack<digits>') for every host in the raw ipmi data
        :rtype: dict
        """
        LOG.info("Getting rack data")
        racks = {}
        for host in self.raw_data["ipmi_data"][1]:
            rack = self._get_rack(host)
            racks[rack] = rack.replace("r", "rack")
        return racks