Browse Source

Add run and ansible_runner execution

changes/53/712253/1
Mathieu Bultel 2 years ago
parent
commit
138ef53e5b
  1. 352
      validations_libs/ansible.py
  2. 95
      validations_libs/run.py
  3. 225
      validations_libs/utils.py

352
validations_libs/ansible.py

@ -0,0 +1,352 @@
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import ansible_runner
import logging
import os
import six
import tempfile
import yaml
from six.moves import configparser
from validations_libs import constants
from validations_libs import utils
LOG = logging.getLogger(__name__ + ".ansible")
class Ansible(object):
def __init__(self):
self.log = logging.getLogger(__name__ + ".Ansible")
def _playbook_check(self, play):
"""Check if playbook exist"""
if not os.path.exists(play):
play = os.path.join(playbook_dir, play)
if not os.path.exists(play):
raise RuntimeError('No such playbook: {}'.format(play))
self.log.debug('Ansible playbook {} found'.format(play))
return play
def _inventory(self, inventory):
"""Handle inventory for Ansible"""
if inventory:
if isinstance(inventory, six.string_types):
# check is file path
if os.path.exists(inventory):
return inventory
elif isinstance(inventory, dict):
inventory = yaml.safe_dump(
inventory,
default_flow_style=False
)
return ansible_runner.utils.dump_artifact(
inventory,
ansible_artifact_path,
'hosts'
)
def _creates_ansible_fact_dir(self,
temp_suffix='validagions-libs-ansible'):
"""Creates ansible fact dir"""
ansible_fact_path = os.path.join(
os.path.join(
tempfile.gettempdir(),
temp_suffix
),
'fact_cache'
)
try:
os.makedirs(ansible_fact_path)
return ansible_fact_path
except FileExistsError:
self.log.debug(
'Directory "{}" was not created because it'
' already exists.'.format(
ansible_fact_path
)
)
def _get_extra_vars(self, extra_vars):
"""Manage extra_vars into a dict"""
extravars = dict()
if extra_vars:
if isinstance(extra_vars, dict):
extravars.update(extra_vars)
elif os.path.exists(extra_vars) and os.path.isfile(extra_vars):
with open(extra_vars) as f:
extravars.update(yaml.safe_load(f.read()))
return extravars
def _callback_whitelist(self, callback_whitelist):
"""Set callback whitelist"""
if callback_whitelist:
callback_whitelist = ','.join([callback_whitelist, output_callback])
else:
callback_whitelist = output_callback
return ','.join([callback_whitelist, 'profile_tasks'])
def _ansible_env_var(self, output_callback, ssh_user, workdir, connection,
gathering_policy, module_path, key,
extra_env_variables):
"""Handle Ansible env var for Ansible config execution"""
cwd = os.getcwd()
env = os.environ.copy()
env['ANSIBLE_SSH_ARGS'] = (
'-o UserKnownHostsFile={} '
'-o StrictHostKeyChecking=no '
'-o ControlMaster=auto '
'-o ControlPersist=30m '
'-o ServerAliveInterval=64 '
'-o ServerAliveCountMax=1024 '
'-o Compression=no '
'-o TCPKeepAlive=yes '
'-o VerifyHostKeyDNS=no '
'-o ForwardX11=no '
'-o ForwardAgent=yes '
'-o PreferredAuthentications=publickey '
'-T'
).format(os.devnull)
env['ANSIBLE_DISPLAY_FAILED_STDERR'] = True
env['ANSIBLE_FORKS'] = 36
env['ANSIBLE_TIMEOUT'] = ansible_timeout
env['ANSIBLE_GATHER_TIMEOUT'] = 45
env['ANSIBLE_SSH_RETRIES'] = 3
env['ANSIBLE_PIPELINING'] = True
env['ANSIBLE_REMOTE_USER'] = ssh_user
env['ANSIBLE_STDOUT_CALLBACK'] = output_callback
env['ANSIBLE_LIBRARY'] = os.path.expanduser(
'~/.ansible/plugins/modules:'
'{}:{}:'
'/usr/share/ansible/plugins/modules:'
'/usr/share/ceph-ansible/library:'
'{}/library'.format(
os.path.join(workdir, 'modules'),
os.path.join(cwd, 'modules'),
constants.DEFAULT_VALIDATIONS_BASEDIR
)
)
env['ANSIBLE_LOOKUP_PLUGINS'] = os.path.expanduser(
'~/.ansible/plugins/lookup:'
'{}:{}:'
'/usr/share/ansible/plugins/lookup:'
'/usr/share/ceph-ansible/plugins/lookup:'
'{}/lookup_plugins'.format(
os.path.join(workdir, 'lookup'),
os.path.join(cwd, 'lookup'),
constants.DEFAULT_VALIDATIONS_BASEDIR
)
)
env['ANSIBLE_CALLBACK_PLUGINS'] = os.path.expanduser(
'~/.ansible/plugins/callback:'
'{}:{}:'
'/usr/share/ansible/plugins/callback:'
'/usr/share/ceph-ansible/plugins/callback:'
'{}/callback_plugins'.format(
os.path.join(workdir, 'callback'),
os.path.join(cwd, 'callback'),
constants.DEFAULT_VALIDATIONS_BASEDIR
)
)
env['ANSIBLE_ACTION_PLUGINS'] = os.path.expanduser(
'~/.ansible/plugins/action:'
'{}:{}:'
'/usr/share/ansible/plugins/action:'
'/usr/share/ceph-ansible/plugins/actions:'
'{}/action_plugins'.format(
os.path.join(workdir, 'action'),
os.path.join(cwd, 'action'),
constants.DEFAULT_VALIDATIONS_BASEDIR
)
)
env['ANSIBLE_FILTER_PLUGINS'] = os.path.expanduser(
'~/.ansible/plugins/filter:'
'{}:{}:'
'/usr/share/ansible/plugins/filter:'
'/usr/share/ceph-ansible/plugins/filter:'
'{}/filter_plugins'.format(
os.path.join(workdir, 'filter'),
os.path.join(cwd, 'filter'),
constants.DEFAULT_VALIDATIONS_BASEDIR
)
)
env['ANSIBLE_ROLES_PATH'] = os.path.expanduser(
'~/.ansible/roles:'
'{}:{}:'
'/usr/share/ansible/roles:'
'/usr/share/ceph-ansible/roles:'
'/etc/ansible/roles:'
'{}/roles'.format(
os.path.join(workdir, 'roles'),
os.path.join(cwd, 'roles'),
constants.DEFAULT_VALIDATIONS_BASEDIR
)
)
env['ANSIBLE_CALLBACK_WHITELIST'] = callback_whitelist
env['ANSIBLE_RETRY_FILES_ENABLED'] = False
env['ANSIBLE_HOST_KEY_CHECKING'] = False
env['ANSIBLE_TRANSPORT'] = connection
env['ANSIBLE_CACHE_PLUGIN_TIMEOUT'] = 7200
if connection == 'local':
env['ANSIBLE_PYTHON_INTERPRETER'] = sys.executable
if gathering_policy in ('smart', 'explicit', 'implicit'):
env['ANSIBLE_GATHERING'] = gathering_policy
if module_path:
env['ANSIBLE_LIBRARY'] = ':'.join(
[env['ANSIBLE_LIBRARY'], module_path]
)
try:
user_pwd = pwd.getpwuid(int(os.getenv('SUDO_UID', os.getuid())))
except TypeError:
home = os.path.expanduser('~')
else:
home = user_pwd.pw_dir
env['ANSIBLE_LOG_PATH'] = os.path.join(home, 'ansible.log')
if key:
env['ANSIBLE_PRIVATE_KEY_FILE'] = key
if extra_env_variables:
if not isinstance(extra_env_variables, dict):
msg = "extra_env_variables must be a dict"
self.log.error(msg)
raise SystemError(msg)
else:
env.update(extra_env_variables)
return env
def _encode_envvars(self, env):
"""Encode a hash of values.
:param env: A hash of key=value items.
:type env: `dict`.
"""
for key, value in env.items():
env[key] = six.text_type(value)
else:
return env
def run(self, playbook, inventory, workdir, playbook_dir=None,
connection='smart', output_callback='yaml',
ssh_user='root', key=None, module_path=None,
limit_hosts=None, tags=None, skip_tags=None,
verbosity=0, quiet=False, extra_vars=None,
gathering_policy='smart',
extra_env_variables=None, parallel_run=False,
callback_whitelist=None, ansible_cfg=None,
ansible_timeout=30, reproduce_command=False,
fail_on_rc=True):
if not playbook_dir:
playbook_dir = workdir
playbook = self._playbook_check(play=playbook)
self.log.info(
'Running Ansible playbook: {},'
' Working directory: {},'
' Playbook directory: {}'.format(
playbook,
workdir,
playbook_dir
)
)
ansible_fact_path = self._creates_ansible_fact_dir()
extravars = self._get_extra_vars(extra_vars)
callback_whitelist = self._callback_whitelist(callback_whitelist)
# Set ansible environment variables
env = _ansible_env_var(output_callback, ssh_user, workdir, connection,
gathering_policy, module_path, key,
extra_env_variables)
command_path = None
with utils.TempDirs(chdir=False) as ansible_artifact_path:
if 'ANSIBLE_CONFIG' not in env and not ansible_cfg:
ansible_cfg = os.path.join(ansible_artifact_path, 'ansible.cfg')
config = configparser.ConfigParser()
config.add_section('defaults')
config.set('defaults', 'internal_poll_interval', '0.05')
with open(ansible_cfg, 'w') as f:
config.write(f)
env['ANSIBLE_CONFIG'] = ansible_cfg
elif 'ANSIBLE_CONFIG' not in env and ansible_cfg:
env['ANSIBLE_CONFIG'] = ansible_cfg
r_opts = {
'private_data_dir': workdir,
'project_dir': playbook_dir,
'inventory': self._inventory(inventory),
'envvars': self._encode_envvars(env=env),
'playbook': playbook,
'verbosity': verbosity,
'quiet': quiet,
'extravars': extravars,
'fact_cache': ansible_fact_path,
'fact_cache_type': 'jsonfile',
'artifact_dir': ansible_artifact_path,
'rotate_artifacts': 256
}
if skip_tags:
r_opts['skip_tags'] = skip_tags
if tags:
r_opts['tags'] = tags
if limit_hosts:
r_opts['limit'] = limit_hosts
if parallel_run:
r_opts['directory_isolation_base_path'] = ansible_artifact_path
runner_config = ansible_runner.runner_config.RunnerConfig(**r_opts)
runner_config.prepare()
# NOTE(cloudnull): overload the output callback after prepare
# to define the specific format we want.
# This is only required until PR
# https://github.com/ansible/ansible-runner/pull/387
# is merged and released. After this PR has been
# made available to us, this line should be removed.
runner_config.env['ANSIBLE_STDOUT_CALLBACK'] = \
r_opts['envvars']['ANSIBLE_STDOUT_CALLBACK']
runner = ansible_runner.Runner(config=runner_config)
status, rc = runner.run()
return playbook, rc, status

95
validations_libs/run.py

@ -0,0 +1,95 @@
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import constants
import logging
import os
import six
from concurrent.futures import ThreadPoolExecutor
from validations_libs.ansible import Ansible as v_ansible
from validations_libs import utils as v_utils
LOG = logging.getLogger(__name__ + ".run")
class Run(object):
def __init__(self):
self.log = logging.getLogger(__name__ + ".Run")
def run_validations(self, playbook, inventory,
group=None,
extra_vars=None,
extra_vars_file=None,
validations_dir=None,
validation_name=None):
self.log = logging.getLogger(__name__ + ".run_validations")
playbooks = []
extra_vars_input = {}
if extra_vars:
extra_vars_input.update(extra_vars)
if extra_vars_file:
extra_vars_input.update(extra_vars_file)
if group:
self.log.debug('Getting the validations list by group')
try:
validations = v_utils.parse_all_validations_on_disk(
(self.validations_dir if validations_dir
else constants.ANSIBLE_VALIDATION_DIR), group)
for val in validations:
playbooks.append(val.get('id') + '.yaml')
except Exception as e:
raise(e)
else:
for pb in validation_name:
if pb not in v_utils.get_validation_group_name_list():
playbooks.append(pb + '.yaml')
else:
raise("Please, use '--group' argument instead of "
"'--validation' to run validation(s) by their "
"name(s)."
)
failed_val = False
run_ansible = v_ansible()
self.log.debug('Running the validations with Ansible')
results = []
with v_utils.TempDirs(chdir=False) as tmp:
for playbook in playbooks:
_playbook, _rc, _status = run_ansible.run(
workdir=tmp,
playbook=playbook,
playbook_dir=constants.
ANSIBLE_VALIDATION_DIR,
parallel_run=True,
inventory=inventory,
output_callback='validation_json',
quiet=True,
extra_vars=extra_vars_input,
gathering_policy='explicit')
results.append({'validation': {
'playbook': _playbook,
'rc_code': _rc,
'status': _status
}})
return results

225
validations_libs/utils.py

@ -12,18 +12,119 @@
# License for the specific language governing permissions and limitations
# under the License.
#
import glob
import json
import logging
import os
import six
import shutil
import tempfile
import yaml
from prettytable import PrettyTable
from validations_libs import constants
RED = "\033[1;31m"
GREEN = "\033[0;32m"
RESET = "\033[0;0m"
FAILED_VALIDATION = "{}FAILED{}".format(RED, RESET)
PASSED_VALIDATION = "{}PASSED{}".format(GREEN, RESET)
LOG = logging.getLogger(__name__ + ".utils")
class Pushd(object):
"""Simple context manager to change directories and then return."""
def __init__(self, directory):
"""This context manager will enter and exit directories.
>>> with Pushd(directory='/tmp'):
... with open('file', 'w') as f:
... f.write('test')
:param directory: path to change directory to
:type directory: `string`
"""
self.dir = directory
self.pwd = self.cwd = os.getcwd()
def __enter__(self):
os.chdir(self.dir)
self.cwd = os.getcwd()
return self
def __exit__(self, *args):
if self.pwd != self.cwd:
os.chdir(self.pwd)
class TempDirs(object):
"""Simple context manager to manage temp directories."""
def __init__(self, dir_path=None, dir_prefix='tripleo', cleanup=True,
chdir=True):
"""This context manager will create, push, and cleanup temp directories.
>>> with TempDirs() as t:
... with open('file', 'w') as f:
... f.write('test')
... print(t)
... os.mkdir('testing')
... with open(os.path.join(t, 'file')) as w:
... print(w.read())
... with open('testing/file', 'w') as f:
... f.write('things')
... with open(os.path.join(t, 'testing/file')) as w:
... print(w.read())
:param dir_path: path to create the temp directory
:type dir_path: `string`
:param dir_prefix: prefix to add to a temp directory
:type dir_prefix: `string`
:param cleanup: when enabled the temp directory will be
removed on exit.
:type cleanup: `boolean`
:param chdir: Change to/from the created temporary dir on enter/exit.
:type chdir: `boolean`
"""
# NOTE(cloudnull): kwargs for tempfile.mkdtemp are created
# because args are not processed correctly
# in py2. When we drop py2 support (cent7)
# these args can be removed and used directly
# in the `tempfile.mkdtemp` function.
tempdir_kwargs = dict()
if dir_path:
tempdir_kwargs['dir'] = dir_path
if dir_prefix:
tempdir_kwargs['prefix'] = dir_prefix
self.dir = tempfile.mkdtemp(**tempdir_kwargs)
self.pushd = Pushd(directory=self.dir)
self.cleanup = cleanup
self.chdir = chdir
def __enter__(self):
if self.chdir:
self.pushd.__enter__()
return self.dir
def __exit__(self, *args):
if self.chdir:
self.pushd.__exit__()
if self.cleanup:
self.clean()
else:
LOG.warning("Not cleaning temporary directory [ %s ]" % self.dir)
def clean(self):
shutil.rmtree(self.dir, ignore_errors=True)
LOG.info("Temporary directory [ %s ] cleaned up" % self.dir)
def parse_all_validations_on_disk(path, groups=None):
results = []
validations_abspath = glob.glob("{path}/*.yaml".format(path=path))
@ -94,3 +195,123 @@ def get_validation_parameters(validation):
except KeyError:
LOG.debug("No parameters found for this validation")
return dict()
def read_validation_groups_file(groups_file_path=None):
"""Load groups.yaml file and return a dictionary with its contents"""
if not groups_file_path:
groups_file_path = constants.VALIDATION_GROUPS_INFO
if not os.path.exists(groups_file_path):
return []
with open(groups_file_path, 'r') as grps:
contents = yaml.safe_load(grps)
return contents
def get_validation_group_name_list():
"""Get the validation group name list only"""
results = []
groups = read_validation_groups_file()
if groups and isinstance(dict, groups):
for grp_name in six.viewkeys(groups):
results.append(grp_name)
return results
def get_new_validations_logs_on_disk():
"""Return a list of new log execution filenames """
files = []
for root, dirs, filenames in os.walk(constants.VALIDATIONS_LOG_BASEDIR):
files = [
f for f in filenames if not f.startswith('processed')
and os.path.splitext(f)[1] == '.json'
]
return files
def get_results(results):
"""Get validations results and return as PrettytTable format"""
new_log_files = get_new_validations_logs_on_disk()
for i in new_log_files:
val_id = "{}.yaml".format(i.split('_')[1])
for res in results:
if res['validation'].get('validation_id') == val_id:
res['validation']['logfile'] = \
os.path.join(constants.VALIDATIONS_LOG_BASEDIR, i)
t = PrettyTable(border=True, header=True, padding_width=1)
t.field_names = [
"UUID", "Validations", "Status", "Host Group(s)",
"Status by Host", "Unreachable Host(s)", "Duration"]
for validation in results:
r = []
logfile = validation['validation'].get('logfile', None)
if logfile and os.path.exists(logfile):
with open(logfile, 'r') as val:
contents = json.load(val)
for i in contents['plays']:
host = [
x.encode('utf-8')
for x in i['play'].get('host').split(', ')
]
val_id = i['play'].get('validation_id')
time_elapsed = \
i['play']['duration'].get('time_elapsed', None)
r.append(contents['plays'][0]['play'].get('id'))
r.append(val_id)
if validation['validation'].get('status') == "PASSED":
r.append(PASSED_VALIDATION)
else:
r.append(FAILED_VALIDATION)
unreachable_hosts = []
hosts_result = []
for h in list(contents['stats'].keys()):
ht = h.encode('utf-8')
if contents['stats'][ht]['unreachable'] != 0:
unreachable_hosts.append(ht)
elif contents['stats'][ht]['failures'] != 0:
hosts_result.append("{}{}{}".format(
RED, ht, RESET))
else:
hosts_result.append("{}{}{}".format(
GREEN, ht, RESET))
r.append(", ".join(host))
r.append(", ".join(hosts_result))
r.append("{}{}{}".format(RED,
", ".join(unreachable_hosts),
RESET))
r.append(time_elapsed)
t.add_row(r)
t.sortby = "UUID"
for field in t.field_names:
if field == "Status":
t.align['Status'] = "l"
else:
t.align[field] = "l"
print(t)
if len(new_log_files) > len(results):
LOG.warn('Looks like we have more log files than '
'executed validations')
for i in new_log_files:
os.rename(
"{}/{}".format(constants.VALIDATIONS_LOG_BASEDIR,
i), "{}/processed_{}".format(
constants.VALIDATIONS_LOG_BASEDIR, i))
Loading…
Cancel
Save