Leonardo Fagundes Luz Serrano 77c6d3109b Tox: Remove upper bound on hacking version
This change is required in order to support
testing on python 3.12

Test Plan:
pass - Run tox -eflake8 in a ubuntu:noble
       container

Change-Id: I427a7cab6c67905e0496cbbe07d4367d080dfb58
Signed-off-by: Leonardo Fagundes Luz Serrano <Leonardo.FagundesLuzSerrano@windriver.com>
2024-11-12 18:36:32 -03:00

364 lines
10 KiB
Python
Executable File

#!/usr/bin/python3
#
# SPDX-License-Identifier: Apache-2.0
#
# Copyright (c) 2023 Wind River Systems, Inc.
#
import ipaddress
import os
import sys
import yaml
# Interface names are limited to 15 characters. Five of them are reserved
# for the VLAN suffix ('.####') and one more for the single digit that
# enumerates the four bridges of the virtual lab, which leaves 9 characters
# for the end user to choose.
BRIDGE_LIMIT = 9
# Keys accepted inside each host dictionary of the host lists
SUPPORTED_HOST_KEYS = ['disk', 'cpu', 'mem']
# Fields that should be set in a Yaml configuration file, grouped by the
# validation that validator() applies to them.
# General fields, checked by general_validate
GEN_FIELDS = ('bridge_interface', 'controller', 'worker',
'domain_dir', 'storage', 'default_disk')
# 'IP/mask' fields, checked by ip_validate
IP_FIELDS = ('ext_network', 'ext_IP')
# Lists of host dictionaries, checked by host_validate
HOST_FIELDS = ('controllerlist', 'workerlist', 'storagelist')
# Integer fields, checked by number_validate
NUMBER_FIELDS = ('aio_default_cpu', 'aio_default_mem', 'default_cpu',
'default_mem')
# Node counts, checked by nodes_validate against the matching host list
NODES_NUM = ('worker_nodes_num', 'storage_nodes_num')
# Every field that validator() expects to find in the configuration
FIELDS = NODES_NUM + GEN_FIELDS + IP_FIELDS + HOST_FIELDS + NUMBER_FIELDS
# Usage summary written to stderr by print_help()
HELP_TEXT = """
use:
./config.py <config_file> <key>
./config.py <config_file> <list key> <index> <key>
./config.py --validate <config_file>
"""
def number_validate(value, field):
    """Check that a numeric configuration field holds an integer.

    Booleans are rejected explicitly: ``bool`` is a subclass of ``int``, so
    a YAML ``true``/``false`` would otherwise be accepted as a number.

    Args:
        value: value read from the configuration for ``field``.
        field: name of the field, used in the error message.

    Returns:
        0 when ``value`` is an integer, 1 otherwise (after reporting the
        problem on stderr).
    """
    if isinstance(value, bool) or not isinstance(value, int):
        print('%s not valid for %s' % (value, field), file=sys.stderr)
        return 1
    return 0
def print_help():
    """Write the usage summary (HELP_TEXT) to stderr."""
    usage_stream = sys.stderr
    print(HELP_TEXT, file=usage_stream)
def existence_check(data):
    """Report every key of ``data`` that is not one of the known FIELDS.

    Unknown keys are only reported on stderr; nothing is returned and no
    error is counted.
    """
    for unknown in (name for name in data if name not in FIELDS):
        print('unexpected field: %s' % unknown, file=sys.stderr)
def host_key_check(listdata, printname):
    """Check the shape of a host list.

    Every entry of ``listdata`` must be a plain dictionary. Entries that
    are not are reported on stderr and counted. Keys outside
    SUPPORTED_HOST_KEYS are reported too, but do not count as errors.

    Args:
        listdata: list of hosts read from the configuration.
        printname: name of the list, used in the messages.

    Returns:
        Number of entries that are not dictionaries.
    """
    bad_entries = 0
    # Hosts are numbered from 1 in the messages.
    for position, entry in enumerate(listdata, start=1):
        if type(entry) is dict:
            for name in entry:
                if name in SUPPORTED_HOST_KEYS:
                    continue
                print('unexpected field %s in host %s of %s' %
                      (name, position, printname), file=sys.stderr)
        else:
            print('list item %s in %s is not dictionary' %
                  (position, printname), file=sys.stderr)
            bad_entries += 1
    return bad_entries
# Looks up one per-host value in a host list and returns it.
#
# Parameters:
#   data   - parsed configuration, indexed by field name
#   fields - sequence of [list key, index, host key]; only fields[0] and
#            fields[2] are read here, the position comes from `num`
#   num    - position of the host inside data[fields[0]]
#
# Returns the value found. `item` starts as None and is returned as-is
# when no assignment below succeeds. A TypeError raised by any of the
# lookups is reported on stderr instead of propagating.
#
# NOTE(review): the `else:` branch below is read as belonging to
# `if key2 in SUPPORTED_HOST_KEYS:` (unsupported key -> message and
# exit status 1) - confirm against the intended nesting.
def item_from_list(data, fields, num):
item = None
try:
listdata = data[fields[0]]
key2 = fields[2]
if key2 in SUPPORTED_HOST_KEYS:
try:
item = listdata[num][key2]
except (IndexError, KeyError):
# IndexError: the node index is not in the config yaml
# KeyError: the requested key is not set for that node
# Only 'disk' has a fallback value: the configured default_disk
if key2 == 'disk':
item = data['default_disk']
else:
print('ValueError: key %s is not supported' % key2,
file=sys.stderr)
sys.exit(1)
except TypeError as e:
print('Sanity: incorrect key type for list or dictionary read:'
' % s ' % e, file=sys.stderr)
return item
def check_path(value, field):
    """Check that ``value`` is a string naming an existing path.

    Args:
        value: candidate path read from the configuration.
        field: name of the field, used as prefix of the error message.

    Returns:
        0 when the path exists, 1 otherwise (after reporting on stderr).
    """
    problem = None
    if not isinstance(value, str):
        problem = ' does not contain string value for path'
    elif not os.path.exists(value):
        problem = ' does not contain valid path'

    if problem is None:
        return 0
    print(field + problem, file=sys.stderr)
    return 1
def host_key_validate(data, field, value):
    """Validate the disk, cpu and mem values of every host in a list.

    For each host position and each key in SUPPORTED_HOST_KEYS the value is
    fetched with item_from_list. 'disk' must pass check_path; 'cpu' and
    'mem' must be integers.

    Args:
        data: parsed configuration.
        field: name of the host list being validated.
        value: the host list itself (only its length is used here).

    Returns:
        Number of invalid values found.
    """
    failures = 0
    for position, _host in enumerate(value):
        for name in SUPPORTED_HOST_KEYS:
            found = item_from_list(data, [field, position, name], position)
            if name == 'disk':
                if check_path(found, field) != 0:
                    print('%s index %s has bad path' % (field, position),
                          file=sys.stderr)
                    failures += 1
            elif name in ('cpu', 'mem') and not isinstance(found, int):
                print('%s is not valid for %s in %s' %
                      (found, name, field), file=sys.stderr)
                failures += 1
    return failures
def general_validate(value, field):
    """Validate one of the general (GEN_FIELDS) configuration values.

    'default_disk' must be an existing path (see check_path). Every other
    field must be an alphanumerical string, and 'bridge_interface' must
    also be no longer than BRIDGE_LIMIT characters.

    Args:
        value: value read from the configuration for ``field``.
        field: name of the field being validated.

    Returns:
        0 when the value is valid, 1 otherwise (after reporting on stderr).
    """
    if field == 'default_disk':
        return check_path(value, field)
    # YAML may deliver numbers, lists or None here; report them instead of
    # failing on len()/isalnum().
    if not isinstance(value, str):
        print(field + ' is not a string', file=sys.stderr)
        return 1
    if field == 'bridge_interface' and len(value) > BRIDGE_LIMIT:
        print(field + ' variable too long', file=sys.stderr)
        return 1
    if not value.isalnum():
        print(field + ' is not alphanumerical', file=sys.stderr)
        return 1
    return 0
def ip_validate(value, field):
    """Validate a value of the form ``<IP address>/<prefix length>``.

    The address is parsed with ipaddress.ip_address and the prefix length
    must be an integer between 0 and the maximum for that address family.

    Args:
        value: value read from the configuration for ``field``.
        field: name of the field, used as prefix of the error messages.

    Returns:
        0 when the value is valid, 1 otherwise (after reporting on stderr).
    """
    # Anything but a string cannot be split into address and mask.
    if not isinstance(value, str):
        print(field + ' does not look like IP/mask', file=sys.stderr)
        return 1
    try:
        address, netmask = value.split('/')
    except ValueError:
        print(field + ' does not look like IP/mask', file=sys.stderr)
        return 1
    try:
        prefix_len = int(netmask)
    except ValueError:
        print(field + ': not a valid netmask', file=sys.stderr)
        return 1
    try:
        parsed = ipaddress.ip_address(address)
    except ValueError:
        print(field + ' is not a valid IP address', file=sys.stderr)
        return 1
    # The mask must fit the address family (32 bits for IPv4, 128 for IPv6).
    if not 0 <= prefix_len <= parsed.max_prefixlen:
        print(field + ': not a valid netmask', file=sys.stderr)
        return 1
    return 0
def host_validate(value, field, data):
    """Validate one host list of the configuration.

    The value must be a list. Its entries are first checked with
    host_key_check; only when that reports no error are the individual
    values checked with host_key_validate.

    Args:
        value: value read from the configuration for ``field``.
        field: name of the host list.
        data: parsed configuration.

    Returns:
        Number of errors found.
    """
    if not isinstance(value, list):
        print(field + ' is not a list',
              file=sys.stderr)
        return 1

    # This is a host list, check if the entries are well formed
    shape_errors = host_key_check(value, field)
    if shape_errors:
        return shape_errors

    # validate each recognized key
    return shape_errors + host_key_validate(data, field, value)
def nodes_validate(value, field, data):
    """Validate a node count against the length of its host list.

    The count must be an integer (or convertible to one) and the matching
    host list ('workerlist' for 'worker_nodes_num', 'storagelist' for
    'storage_nodes_num') must hold exactly ``count + 1`` entries.

    Args:
        value: value read from the configuration for ``field``.
        field: name of the node-count field.
        data: parsed configuration, used to look up the host list.

    Returns:
        0 when the count is valid, 1 otherwise.
    """
    nodes_num = value
    if type(value) is not int:
        try:
            nodes_num = int(value)
        except (TypeError, ValueError):
            # TypeError covers None, lists and other non-numeric YAML values
            print(field + ' does not have an integer', file=sys.stderr)
            return 1

    # count field -> (host list key, word used in the error message)
    list_for_count = {
        'worker_nodes_num': ('workerlist', 'worker'),
        'storage_nodes_num': ('storagelist', 'storage'),
    }
    if field not in list_for_count:
        return 0

    list_key, label = list_for_count[field]
    try:
        listdata = data[list_key]
    except KeyError:
        # this error is printed by the key check for the host list
        return 1
    try:
        allocated = len(listdata)
    except TypeError:
        # not a list at all; host_validate reports that for the host list
        return 1
    if allocated != nodes_num + 1:
        print('Amount of %s nodes allocated not equal to amount set' % label,
              file=sys.stderr)
        return 1
    return 0
def reader(config_file):
    """Load the YAML configuration file and return its parsed content.

    YAML parsing errors and OS-level errors are reported on stderr and end
    the program with exit status 1.

    Args:
        config_file: path of the YAML file to read.

    Returns:
        The object produced by the YAML loader.
    """
    # NOTE(review): yaml.Loader can construct arbitrary Python objects. If
    # the configuration file may come from an untrusted source,
    # yaml.safe_load would be the safer choice.
    try:
        with open(config_file, 'r', encoding="utf-8") as stream:
            return yaml.load(stream, Loader=yaml.Loader)
    except yaml.YAMLError as parse_err:
        message = 'YAMLError: %s' % parse_err
    except OSError as io_err:
        message = 'OSError: %s' % io_err
    print(message, file=sys.stderr)
    sys.exit(1)
def validator(config_file):
    """Validate every expected field of a configuration file.

    Unknown fields are reported, then each entry of FIELDS is looked up and
    checked with the validator of its group. Problems are reported on
    stderr; when at least one error is found the total is printed and the
    program ends with exit status 1.

    Args:
        config_file: path of the YAML configuration file.
    """
    error_count = 0
    data = reader(config_file)
    # An empty file or a top-level list/scalar cannot hold any field;
    # report it instead of failing on the lookups below.
    if not isinstance(data, dict):
        print('config file does not contain a mapping of fields',
              file=sys.stderr)
        sys.exit(1)
    existence_check(data)
    # For every field checks if they exist and has various checks to
    # ensure all values are valid
    for field in FIELDS:
        try:
            value = data[field]
        except KeyError:
            print('%s does not exist' % field, file=sys.stderr)
            error_count += 1
            continue
        # Checking that numeric fields hold integers
        if field in NUMBER_FIELDS:
            error_count += number_validate(value, field)
        # Checking that most fields are alphanumerical
        if field in GEN_FIELDS:
            error_count += general_validate(value, field)
        # Checking that the value has the form IP/mask
        elif field in IP_FIELDS:
            error_count += ip_validate(value, field)
        # Checking the host lists and the values of each host
        elif field in HOST_FIELDS:
            error_count += host_validate(value, field, data)
        # Checking that node counts are numbers matching the host lists
        elif field in NODES_NUM:
            error_count += nodes_validate(value, field, data)
    if error_count > 0:
        print('total errors: %s ' % error_count, file=sys.stderr)
        sys.exit(1)
def readvalue(config_file, fieldlist):
    """Print one value of the configuration file on stdout.

    Args:
        config_file: path of the YAML configuration file.
        fieldlist: either ``[key]`` to print a top-level value, or
            ``[list key, index, key]`` to print a value of one host in a
            host list (resolved with item_from_list).

    Any problem (unknown key, non-numeric index, value that is not a list,
    empty result, wrong number of keys) ends the program with exit
    status 1.
    """
    data = reader(config_file)
    # An empty file or a top-level list/scalar cannot be indexed by key;
    # report it instead of failing on the lookups below.
    if not isinstance(data, dict):
        print('TypeError: config file does not contain a mapping of fields',
              file=sys.stderr)
        sys.exit(1)
    if len(fieldlist) == 3:
        try:
            datalist = data[fieldlist[0]]
        except KeyError as e:
            print('KeyError: %s' % e, file=sys.stderr)
            sys.exit(1)
        if not isinstance(datalist, list):
            print('TypeError: %s is not list' % fieldlist[0],
                  file=sys.stderr)
            sys.exit(1)
        try:
            num = int(fieldlist[1])
        except ValueError as e:
            print('ValueError: %s' % e, file=sys.stderr)
            sys.exit(1)
        result = item_from_list(data, fieldlist, num)
        # Nothing usable was found for that host and key
        if not result:
            sys.exit(1)
        print(result)
    elif len(fieldlist) == 1:
        try:
            value = data[fieldlist[0]]
        except KeyError as e:
            print('KeyError: %s' % e, file=sys.stderr)
            sys.exit(1)
        print(value)
    else:
        print_help()
        sys.exit(1)
if __name__ == "__main__":
    # Command-line entry point; the accepted forms are listed in HELP_TEXT.
    args = sys.argv[1:]

    # No argument at all, or an explicit request for help
    if not args or '--help' in args:
        print_help()
        sys.exit(0)

    # Validation mode takes exactly one extra argument: the config file
    if args[0] == '--validate':
        if len(args) != 2:
            print('Validate requires only one parameter: config_file',
                  file=sys.stderr)
            print_help()
            sys.exit(1)
        validator(args[1])
        sys.exit(0)

    # Read mode: a config file followed by the keys to look up
    if len(args) > 4:
        print("Error: too many arguments.",
              file=sys.stderr)
        print_help()
        sys.exit(1)
    if len(args) < 2:
        print("Error: missing required arguments.",
              file=sys.stderr)
        print_help()
        sys.exit(1)
    input_file, input_keys = args[0], args[1:]
    readvalue(input_file, input_keys)