validations-libs/validations_libs/utils.py

459 lines
14 KiB
Python

# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import datetime
import glob
import logging
import os
import six
import uuid
from pathlib import Path
from os.path import join
from validations_libs import constants
from validations_libs.group import Group
from validations_libs.validation import Validation
LOG = logging.getLogger(__name__ + ".utils")
def current_time():
"""Return current time"""
return '%sZ' % datetime.datetime.utcnow().isoformat()
def _create_log_dir():
"""
"""
try:
log_path = "{}".format(
os.path.join(os.environ.get('HOME'), 'validations'))
os.makedirs(log_path)
except (OSError, PermissionError):
LOG.error(
(
"Error while creating the log directory. "
"Please check the access rights for: '{}'"
).format(log_path)
)
raise RuntimeError()
def get_log_dir(log_path):
"""Get validations log directory.
"""
if not os.path.exists(log_path):
LOG.warning(
(
"Requested log folder unavailable, defaulting to: '{}'"
).format(constants.VALIDATIONS_LOG_BASEDIR)
)
log_path = constants.VALIDATIONS_LOG_BASEDIR
if not os.path.exists(constants.VALIDATIONS_LOG_BASEDIR):
LOG.warning(
(
"Default log folder unavailable, creating: '{}'"
).format(log_path)
)
_create_log_dir()
return log_path
def _setup_artifacts_dir(log_path):
artifact_dir = os.path.join(log_path, 'artifacts')
if not os.path.exists(artifact_dir):
os.makedirs(artifact_dir)
marker = Path(
os.path.join(artifact_dir, '.validationsartifacts'))
marker.touch()
return artifact_dir
def _create_artifacts_subdir(artifacts_dir_path, prefix):
validation_uuid = str(uuid.uuid4())
artifact_sub_dir = "{}_{}_{}".format(
validation_uuid,
prefix,
current_time())
artifact_sub_dir = os.path.join(artifacts_dir_path, artifact_sub_dir)
os.makedirs(artifact_sub_dir)
return validation_uuid, artifact_sub_dir
def get_artifacts_dir(log_dir):
"""Retrieve Ansible artifacts directory
This function contains additional checks for the log directory
access as well as possible fallbacks.
In order, following locations are considered for artifacts directory:
`log_dir` parameter
`validations_libs.constants.VALIDATION_ANSIBLE_ARTIFACT_PATH`
In the last two cases, an error and warning is logged,
to notify the user about remediation, and possible complexities
it may lead to.
:param log_dir: Directory asbolute path
:type log_dir: `string`
:return: The UUID of the validation and the absolute Path of the log file
:rtype: `string`, `string`
"""
try:
artifacts_dir = _setup_artifacts_dir(log_dir)
except (OSError, PermissionError):
LOG.error(
(
"Error while creating Ansible artifacts directory. "
"Please check the access rights for: '{}'"
).format(log_dir)
)
artifacts_dir = _setup_artifacts_dir(
constants.VALIDATION_ANSIBLE_ARTIFACT_PATH)
LOG.warning(
(
"Selected artifact folder unavailable, defaulting to: '{}'"
).format(artifacts_dir)
)
return artifacts_dir
def get_results_dirs(log_path):
log_path = get_log_dir(log_path)
artifacts_path = get_artifacts_dir(log_path)
return log_path, artifacts_path
def get_artifacts_subdir(log_dir, prefix=''):
"""Create Ansible artifacts directory
This function contains additional checks for the log directory
access as well as possible fallbacks.
In order, following locations are considered for artifacts directory:
`dir_path` parameter
`validations_libs.constants.VALIDATION_ANSIBLE_ARTIFACT_PATH`
`$HOME/log/validations/artifacts`
`/tmp/log/validations/artifacts`
In the last two cases, an error and warning is logged,
to notify the user about remediation, and possible complexities
it may lead to.
:param dir_path: Directory asbolute path
:type dir_path: `string`
:param prefix: Playbook name
:type prefix: `string`
:return: The UUID of the validation and the absolute Path of the log file
:rtype: `string`, `string`
"""
try:
validation_uuid, artifacts_dir = _create_artifacts_subdir(
log_dir,
prefix)
except (OSError, PermissionError):
LOG.exception(
(
"Error while creating Ansible artifacts directory. "
"Please check the access rights for: '{}'"
).format(log_dir)
)
raise RuntimeError()
return validation_uuid, artifacts_dir
def parse_all_validations_on_disk(path, groups=None):
"""Return a list of validations metadata which can be sorted by Groups
:param path: The absolute path of the validations directory
:type path: `string`
:param groups: Groups of validations. Could be a `list` or a
comma-separated `string` of groups
:return: A list of validations metadata.
:rtype: `list`
:Example:
>>> path = '/foo/bar'
>>> parse_all_validations_on_disk(path)
[{'description': 'Detect whether the node disks use Advanced Format.',
'groups': ['prep', 'pre-deployment'],
'id': '512e',
'name': 'Advanced Format 512e Support'},
{'description': 'Make sure that the server has enough CPU cores.',
'groups': ['prep', 'pre-introspection'],
'id': 'check-cpu',
'name': 'Verify if the server fits the CPU core requirements'}]
"""
results = []
if not groups:
groups = []
else:
groups = convert_data(groups)
validations_abspath = glob.glob("{path}/*.yaml".format(path=path))
for pl in validations_abspath:
val = Validation(pl)
if not groups or set(groups).intersection(val.groups):
results.append(val.get_metadata)
return results
def get_validations_playbook(path, validation_id=None, groups=None):
"""Get a list of validations playbooks paths either by their names
or their groups
:param path: Path of the validations playbooks
:type path: `string`
:param validation_id: List of validation name
:type validation_id: `list` or a `string` of comma-separated validations
:param groups: List of validation group
:type groups: `list` or a `string` of comma-separated groups
:return: A list of absolute validations playbooks path
:rtype: `list`
:Example:
>>> path = '/usr/share/validation-playbooks'
>>> validation_id = ['512e','check-cpu']
>>> groups = None
>>> get_validations_playbook(path, validation_id, groups)
['/usr/share/ansible/validation-playbooks/512e.yaml',
'/usr/share/ansible/validation-playbooks/check-cpu.yaml',]
"""
if not validation_id:
validation_id = []
else:
validation_id = convert_data(validation_id)
if not groups:
groups = []
else:
groups = convert_data(groups)
pl = []
for f in os.listdir(path):
pl_path = join(path, f)
if os.path.isfile(pl_path):
if validation_id:
if os.path.splitext(f)[0] in validation_id or \
os.path.basename(f) in validation_id:
pl.append(pl_path)
if groups:
val = Validation(pl_path)
if set(groups).intersection(val.groups):
pl.append(pl_path)
return pl
def get_validation_parameters(validation):
"""Return dictionary of parameters"""
return Validation(validation).get_vars
def read_validation_groups_file(groups_path=None):
"""Load groups.yaml file and return a dictionary with its contents
:params groups_path: The path the groups.yaml file
:type groups_path: `string`
:return: The group list with their descriptions
:rtype: `dict`
:Example:
>>> read_validation_groups_file()
{'group1': [{'description': 'Group1 description.'}],
'group2': [{'description': 'Group2 description.'}]}
"""
gp = Group((groups_path if groups_path else
constants.VALIDATION_GROUPS_INFO))
return gp.get_data
def get_validation_group_name_list(groups_path=None):
"""Get the validation group name list only
:params groups_path: The path the groups.yaml file
:type groups_path: `string`
:return: The group name list
:rtype: `list`
:Example:
>>> get_validation_group_name_list()
['group1',
'group2',
'group3',
'group4']
"""
gp = Group((groups_path if groups_path else
constants.VALIDATION_GROUPS_INFO))
return gp.get_groups_keys_list
def get_validations_details(validation):
"""Return information details for a validation
:param validation: Name of the validation
:type validation: `string`
:return: The information of the validation
:rtype: `dict`
:raises: a `TypeError` exception if `validation` is not a string
:Example:
>>> validation = "check-something"
>>> get_validations_details(validation)
{'description': 'Verify that the server has enough something.',
'groups': ['group1', 'group2'],
'id': 'check-something',
'name': 'Verify the server fits the something requirements'}
"""
if not isinstance(validation, six.string_types):
raise TypeError("The input data should be a String")
results = parse_all_validations_on_disk(constants.ANSIBLE_VALIDATION_DIR)
for r in results:
if r['id'] == validation:
return r
return {}
def get_validations_data(validation, path=constants.ANSIBLE_VALIDATION_DIR):
"""Return validation data with format:
ID, Name, Description, Groups, Parameters
:param validation: Name of the validation without the `yaml` extension.
Defaults to `constants.ANSIBLE_VALIDATION_DIR`
:type validation: `string`
:param path: The path to the validations directory
:type path: `string`
:return: The validation data with the format
(ID, Name, Description, Groups, Parameters)
:rtype: `dict`
:Example:
>>> validation = 'check-something'
>>> get_validations_data(validation)
{'Description': 'Verify that the server has enough something',
'Groups': ['group1', 'group2'],
'ID': 'check-something',
'Name': 'Verify the server fits the something requirements',
'Parameters': {'param1': 24}}
"""
if not isinstance(validation, six.string_types):
raise TypeError("The input data should be a String")
data = {}
val_path = "{}/{}.yaml".format(path, validation)
if os.path.exists(val_path):
val = Validation(val_path)
data.update(val.get_formated_data)
data.update({'Parameters': val.get_vars})
return data
def get_validations_parameters(validations_data, validation_name=[],
groups=[]):
"""Return parameters for a list of validations
:param validations_data: A list of absolute validations playbooks path
:type validations_data: `list`
:param validation_name: A list of validation name
:type validation_name: `list`
:param groups: A list of validation groups
:type groups: `list`
:return: a dictionary containing the current parameters for
each `validation_name` or `groups`
:rtype: `dict`
:Example:
>>> validations_data = ['/foo/bar/check-ram.yaml',
'/foo/bar/check-cpu.yaml']
>>> validation_name = ['check-ram', 'check-cpu']
>>> get_validations_parameters(validations_data, validation_name)
{'check-cpu': {'parameters': {'minimal_cpu_count': 8}},
'check-ram': {'parameters': {'minimal_ram_gb': 24}}}
"""
params = {}
for val in validations_data:
v = Validation(val)
if v.id in validation_name or set(groups).intersection(v.groups):
params[v.id] = {
'parameters': v.get_vars
}
return params
def convert_data(data=''):
"""Transform a string containing comma-separated validation or group name
into a list. If `data` is already a list, it will simply return `data`.
:param data: A string or a list
:type data: `string` or `list`
:return: A list of data
:rtype: `list`
:raises: a `TypeError` exception if `data` is not a list or a string
:Example:
>>> data = "check-cpu,check-ram,check-disk-space"
>>> convert_data(data)
['check-cpu', 'check-ram', 'check-disk-space']
...
>>> data = "check-cpu , check-ram , check-disk-space"
>>> convert_data(data)
['check-cpu', 'check-ram', 'check-disk-space']
...
>>> data = "check-cpu,"
>>> convert_data(data)
['check-cpu']
...
>>> data = ['check-cpu', 'check-ram', 'check-disk-space']
>>> convert_data(data)
['check-cpu', 'check-ram', 'check-disk-space']
"""
if isinstance(data, six.string_types):
return [
conv_data.strip() for conv_data in data.split(',') if conv_data
]
elif not isinstance(data, list):
raise TypeError("The input data should be either a List or a String")
else:
return data