validations-libs/validations_libs/callback_plugins/vf_validation_json.py
Jiri Podivin 9163d8bcbc Moving callbacks to validations-libs
Callback plugins were transferred from validations-common
repository to validations-libs.

Necessary adjustments were made to the module structure,
requierements, as well as installation and documentation generator config.

Associated tests were moved as well, with removal of superfluous inheritance
and imports included.

Demonstration http server module for communication with `http_json`
callback was moved with directory structure preserved.

Signed-off-by: Jiri Podivin <jpodivin@redhat.com>
Change-Id: I31768375430a2f29da71aae8f3db3882c373ced5
2022-03-10 08:59:40 +00:00

239 lines
7.6 KiB
Python

# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__metaclass__ = type
import datetime
import json
import time
import os
from functools import partial
from functools import reduce
from ansible.parsing.ajson import AnsibleJSONEncoder
from ansible.plugins.callback import CallbackBase
DOCUMENTATION = '''
callback: json
short_description: Log Ansible results on filesystem
version_added: "1.0"
description:
- This callback converts all events into a JSON file
stored in the selected validations logging directory,
as defined by the $VALIDATIONS_LOG_DIR env variable,
or the $HOME/validations by default.
type: aggregate
requirements: None
'''
VALIDATIONS_LOG_DIR = os.environ.get(
'VALIDATIONS_LOG_DIR',
os.path.expanduser('~/validations'))
def current_time():
return '%sZ' % datetime.datetime.utcnow().isoformat()
def secondsToStr(t):
def rediv(ll, b):
return list(divmod(ll[0], b)) + ll[1:]
return "%d:%02d:%02d.%03d" % tuple(
reduce(rediv, [[
t * 1000,
], 1000, 60, 60]))
class CallbackModule(CallbackBase):
CALLBACK_VERSION = 2.0
CALLBACK_TYPE = 'aggregate'
CALLBACK_NAME = 'validation_json'
CALLBACK_NEEDS_WHITELIST = True
def __init__(self, display=None):
super(CallbackModule, self).__init__(display)
self.results = []
self.simple_results = []
self.env = {}
self.start_time = None
self.current_time = current_time()
def _new_play(self, play):
return {
'play': {
'host': play.get_name(),
'validation_id': self.env['playbook_name'],
'validation_path': self.env['playbook_path'],
'id': (os.getenv('ANSIBLE_UUID') if os.getenv('ANSIBLE_UUID')
else str(play._uuid)),
'duration': {
'start': current_time()
}
},
'tasks': []
}
def _new_task(self, task):
return {
'task': {
'name': task.get_name(),
'id': str(task._uuid),
'duration': {
'start': current_time()
}
},
'hosts': {}
}
def _val_task(self, task_name):
return {
'task': {
'name': task_name,
'hosts': {}
}
}
def _val_task_host(self, task_name):
return {
'task': {
'name': task_name,
'hosts': {}
}
}
def v2_playbook_on_start(self, playbook):
self.start_time = time.time()
pl = playbook._file_name
validation_id = os.path.splitext(os.path.basename(pl))[0]
self.env = {
"playbook_name": validation_id,
"playbook_path": playbook._basedir
}
def v2_playbook_on_play_start(self, play):
self.results.append(self._new_play(play))
def v2_playbook_on_task_start(self, task, is_conditional):
self.results[-1]['tasks'].append(self._new_task(task))
def v2_playbook_on_handler_task_start(self, task):
self.results[-1]['tasks'].append(self._new_task(task))
def v2_playbook_on_stats(self, stats):
"""Display info about playbook statistics"""
hosts = sorted(stats.processed.keys())
summary = {}
for h in hosts:
s = stats.summarize(h)
summary[h] = s
output = {
'plays': self.results,
'stats': summary,
'validation_output': self.simple_results
}
log_file = "{}/{}_{}_{}.json".format(
VALIDATIONS_LOG_DIR,
(os.getenv('ANSIBLE_UUID') if os.getenv('ANSIBLE_UUID') else
self.results[0].get('play').get('id')),
self.env['playbook_name'],
self.current_time)
with open(log_file, 'w') as js:
js.write(json.dumps(output,
cls=AnsibleJSONEncoder,
indent=4,
sort_keys=True))
def _record_task_result(self, on_info, result, **kwargs):
"""This function is used as a partial to add info in a single method
"""
host = result._host
task = result._task
task_result = result._result.copy()
task_result.update(on_info)
task_result['action'] = task.action
self.results[-1]['tasks'][-1]['hosts'][host.name] = task_result
if 'failed' in task_result.keys():
self.simple_results.append(self._val_task(task.name))
self.simple_results[-1]['task']['status'] = "FAILED"
self.simple_results[-1]['task']['hosts'][host.name] = task_result
if 'warnings' in task_result.keys() and task_result.get('warnings'):
self.simple_results.append(self._val_task(task.name))
self.simple_results[-1]['task']['status'] = "WARNING"
self.simple_results[-1]['task']['hosts'][host.name] = task_result
end_time = current_time()
time_elapsed = secondsToStr(time.time() - self.start_time)
for result in self.results:
if len(result['tasks']) > 1:
result['tasks'][-1]['task']['duration']['end'] = end_time
result['play']['duration']['end'] = end_time
result['play']['duration']['time_elapsed'] = time_elapsed
def v2_playbook_on_no_hosts_matched(self):
no_match_result = self._val_task('No tasks run')
no_match_result['task']['status'] = "FAILED"
no_match_result['task']['info'] = (
"None of the hosts specified"
" were matched in the inventory file")
output = {
'plays': self.results,
'stats': {
'No host matched': {
'changed': 0,
'failures': 1,
'ignored': 0,
'ok': 0,
'rescued': 0,
'skipped': 0,
'unreachable': 0}},
'validation_output': self.simple_results + [no_match_result]
}
log_file = "{}/{}_{}_{}.json".format(
VALIDATIONS_LOG_DIR,
os.getenv(
'ANSIBLE_UUID',
self.results[0].get('play').get('id')),
self.env['playbook_name'],
self.current_time)
with open(log_file, 'w') as js:
js.write(json.dumps(output,
cls=AnsibleJSONEncoder,
indent=4,
sort_keys=True))
def __getattribute__(self, name):
"""Return ``_record_task_result`` partial with a dict
containing skipped/failed if necessary
"""
if name not in ('v2_runner_on_ok', 'v2_runner_on_failed',
'v2_runner_on_unreachable', 'v2_runner_on_skipped'):
return object.__getattribute__(self, name)
on = name.rsplit('_', 1)[1]
on_info = {}
on_info[on] = True
return partial(self._record_task_result, on_info)