A collection of python libraries for the Validation Framework
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

318 lines
10 KiB

# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import glob
import json
import logging
import os
import six
import shutil
import tempfile
import yaml
#from prettytable import PrettyTable
from validations_libs import constants
RED = "\033[1;31m"
GREEN = "\033[0;32m"
RESET = "\033[0;0m"
FAILED_VALIDATION = "{}FAILED{}".format(RED, RESET)
PASSED_VALIDATION = "{}PASSED{}".format(GREEN, RESET)
LOG = logging.getLogger(__name__ + ".utils")
class Pushd(object):
"""Simple context manager to change directories and then return."""
def __init__(self, directory):
"""This context manager will enter and exit directories.
>>> with Pushd(directory='/tmp'):
... with open('file', 'w') as f:
... f.write('test')
:param directory: path to change directory to
:type directory: `string`
"""
self.dir = directory
self.pwd = self.cwd = os.getcwd()
def __enter__(self):
os.chdir(self.dir)
self.cwd = os.getcwd()
return self
def __exit__(self, *args):
if self.pwd != self.cwd:
os.chdir(self.pwd)
class TempDirs(object):
"""Simple context manager to manage temp directories."""
def __init__(self, dir_path=None, dir_prefix='validations', cleanup=True,
chdir=True):
"""This context manager will create, push, and cleanup temp directories.
>>> with TempDirs() as t:
... with open('file', 'w') as f:
... f.write('test')
... print(t)
... os.mkdir('testing')
... with open(os.path.join(t, 'file')) as w:
... print(w.read())
... with open('testing/file', 'w') as f:
... f.write('things')
... with open(os.path.join(t, 'testing/file')) as w:
... print(w.read())
:param dir_path: path to create the temp directory
:type dir_path: `string`
:param dir_prefix: prefix to add to a temp directory
:type dir_prefix: `string`
:param cleanup: when enabled the temp directory will be
removed on exit.
:type cleanup: `boolean`
:param chdir: Change to/from the created temporary dir on enter/exit.
:type chdir: `boolean`
"""
# NOTE(cloudnull): kwargs for tempfile.mkdtemp are created
# because args are not processed correctly
# in py2. When we drop py2 support (cent7)
# these args can be removed and used directly
# in the `tempfile.mkdtemp` function.
tempdir_kwargs = dict()
if dir_path:
tempdir_kwargs['dir'] = dir_path
if dir_prefix:
tempdir_kwargs['prefix'] = dir_prefix
self.dir = tempfile.mkdtemp(**tempdir_kwargs)
self.pushd = Pushd(directory=self.dir)
self.cleanup = cleanup
self.chdir = chdir
def __enter__(self):
if self.chdir:
self.pushd.__enter__()
return self.dir
def __exit__(self, *args):
if self.chdir:
self.pushd.__exit__()
if self.cleanup:
self.clean()
else:
LOG.warning("Not cleaning temporary directory "
"[ %s ]" % self.dir)
def clean(self):
shutil.rmtree(self.dir, ignore_errors=True)
LOG.info("Temporary directory [ %s ] cleaned up" % self.dir)
def parse_all_validations_on_disk(path, groups=None):
results = []
validations_abspath = glob.glob("{path}/*.yaml".format(path=path))
for pl in validations_abspath:
validation_id, ext = os.path.splitext(os.path.basename(pl))
with open(pl, 'r') as val_playbook:
contents = yaml.safe_load(val_playbook)
validation_groups = get_validation_metadata(contents, 'groups') or []
if not groups or set.intersection(set(groups), set(validation_groups)):
results.append({
'id': validation_id,
'name': get_validation_metadata(contents, 'name'),
'groups': get_validation_metadata(contents, 'groups'),
'description': get_validation_metadata(contents,
'description'),
'parameters': get_validation_parameters(contents)
})
return results
def parse_all_validation_groups_on_disk(groups_file_path=None):
results = []
if not groups_file_path:
groups_file_path = constants.VALIDATION_GROUPS_INFO
if not os.path.exists(groups_file_path):
return results
with open(groups_file_path, 'r') as grps:
contents = yaml.safe_load(grps)
for grp_name, grp_desc in sorted(contents.items()):
results.append((grp_name, grp_desc[0].get('description')))
return results
def get_validation_metadata(validation, key):
default_metadata = {
'name': 'Unnamed',
'description': 'No description',
'stage': 'No stage',
'groups': [],
}
try:
return validation[0]['vars']['metadata'].get(key,
default_metadata[key])
except KeyError:
LOG.exception("Key '{key}' not even found in "
"default metadata").format(key=key)
except TypeError:
LOG.exception("Failed to get validation metadata.")
def get_validation_parameters(validation):
try:
return {
k: v
for k, v in validation[0]['vars'].items()
if k != 'metadata'
}
except KeyError:
LOG.debug("No parameters found for this validation")
return dict()
def read_validation_groups_file(groups_file_path=None):
"""Load groups.yaml file and return a dictionary with its contents"""
if not groups_file_path:
groups_file_path = constants.VALIDATION_GROUPS_INFO
if not os.path.exists(groups_file_path):
return []
with open(groups_file_path, 'r') as grps:
contents = yaml.safe_load(grps)
return contents
def get_validation_group_name_list():
"""Get the validation group name list only"""
results = []
groups = read_validation_groups_file()
if groups and isinstance(dict, groups):
for grp_name in six.viewkeys(groups):
results.append(grp_name)
return results
def get_new_validations_logs_on_disk():
"""Return a list of new log execution filenames """
files = []
for root, dirs, filenames in os.walk(constants.VALIDATIONS_LOG_BASEDIR):
files = [
f for f in filenames if not f.startswith('processed')
and os.path.splitext(f)[1] == '.json'
]
return files
#def get_results(results):
# """Get validations results and return as PrettytTable format"""
# new_log_files = get_new_validations_logs_on_disk()
# for i in new_log_files:
# val_id = "{}.yaml".format(i.split('_')[1])
# for res in results:
# if res['validation'].get('validation_id') == val_id:
# res['validation']['logfile'] = \
# os.path.join(constants.VALIDATIONS_LOG_BASEDIR, i)
# t = PrettyTable(border=True, header=True, padding_width=1)
# t.field_names = [
# "UUID", "Validations", "Status", "Host Group(s)",
# "Status by Host", "Unreachable Host(s)", "Duration"]
# for validation in results:
# r = []
# logfile = validation['validation'].get('logfile', None)
# if logfile and os.path.exists(logfile):
# with open(logfile, 'r') as val:
# contents = json.load(val)
# for i in contents['plays']:
# host = [
# x.encode('utf-8')
# for x in i['play'].get('host').split(', ')
# ]
# val_id = i['play'].get('validation_id')
# time_elapsed = \
# i['play']['duration'].get('time_elapsed', None)
# r.append(contents['plays'][0]['play'].get('id'))
# r.append(val_id)
# if validation['validation'].get('status') == "PASSED":
# r.append(PASSED_VALIDATION)
# else:
# r.append(FAILED_VALIDATION)
# unreachable_hosts = []
# hosts_result = []
# for h in list(contents['stats'].keys()):
# ht = h.encode('utf-8')
# if contents['stats'][ht]['unreachable'] != 0:
# unreachable_hosts.append(ht)
# elif contents['stats'][ht]['failures'] != 0:
# hosts_result.append("{}{}{}".format(
# RED, ht, RESET))
# else:
# hosts_result.append("{}{}{}".format(
# GREEN, ht, RESET))
# r.append(", ".join(host))
# r.append(", ".join(hosts_result))
# r.append("{}{}{}".format(RED,
# ", ".join(unreachable_hosts),
# RESET))
# r.append(time_elapsed)
# t.add_row(r)
# t.sortby = "UUID"
# for field in t.field_names:
# if field == "Status":
# t.align['Status'] = "l"
# else:
# t.align[field] = "l"
# print(t)
# if len(new_log_files) > len(results):
# LOG.warn('Looks like we have more log files than '
# 'executed validations')
# for i in new_log_files:
# os.rename(
# "{}/{}".format(constants.VALIDATIONS_LOG_BASEDIR,
# i), "{}/processed_{}".format(
# constants.VALIDATIONS_LOG_BASEDIR, i))