From d6bb66a0337fc88da940077e122fe874d8dede68 Mon Sep 17 00:00:00 2001 From: Heitor Matsui Date: Wed, 11 Oct 2023 09:10:49 -0300 Subject: [PATCH] Create wrapper to run commands and log structured output This commit creates a class to run commands and functions for the USM endpoints and capture the rc, stdout and stderr from them, so that this information is logged into a json file. The purpose of this file is to enable each deployment history to be recovered in an easier way on the future. The json files are stored inside directories named under the corresponding release, and have a file named under the deployment stage in which the object was instantiated, e.g.: /opt/software/summary/starlingx-24.03.0/deploy-precheck.json These files can grow incrementally in case multiple commands are executed under the same deployment stage. The parts of the code, currently, that can benefit from the implementation added by this commit will be changed in follow-up commits. Test Plan PASS: manually replace subprocess functions on USM code, run the respective commands and verify: - Command is executed successfully - Output and behavior is maintained - The json file is created within the expected directory, with correct filename and content PASS: execute the previous step multiple times with different commands and verify the json files are appended with the new operations Story: 2010676 Task: 48955 Change-Id: Iccf1aef1b0cc064399163eeb58c23fa065a6dab5 Signed-off-by: Heitor Matsui --- software/software/constants.py | 3 + software/software/software_worker.py | 245 +++++++++++++++++++++++++++ 2 files changed, 248 insertions(+) create mode 100644 software/software/software_worker.py diff --git a/software/software/constants.py b/software/software/constants.py index 8ad29731..adc0cc2c 100644 --- a/software/software/constants.py +++ b/software/software/constants.py @@ -111,3 +111,6 @@ LICENSE_FILE = "/etc/platform/.license" VERIFY_LICENSE_BINARY = "/usr/bin/verify-license" SOFTWARE_JSON_FILE = "/opt/software/software.json" + +WORKER_SUMMARY_DIR = "%s/summary" % SOFTWARE_STORAGE_DIR +WORKER_DATETIME_FORMAT = "%Y%m%dT%H%M%S%f" diff --git a/software/software/software_worker.py b/software/software/software_worker.py new file mode 100644 index 00000000..f8753490 --- /dev/null +++ b/software/software/software_worker.py @@ -0,0 +1,245 @@ +""" +Copyright (c) 2023 Wind River Systems, Inc. + +SPDX-License-Identifier: Apache-2.0 + +""" +import asyncio +import json +import os +import re +import subprocess +from datetime import datetime + +import software.constants as constants + + +class SoftwareWorker(object): + """This class wraps the subprocess commands used by USM + modules to run a command with parameters and write its + return code, stdout, stderr and other useful information into + a structured json file that can be later recovered to create + a deployment summary report. + """ + def __init__(self, release, stage): + """SoftwareWorker constructor + + :param release: target release name, used to define + the directory in which json files will be created + :param stage: deployment stage which the commands + are being executed, used to define the json filename + """ + self._release = release + self._stage = stage + self._directory = os.path.join(constants.WORKER_SUMMARY_DIR, self._release) + os.makedirs(self._directory, exist_ok=True) + self._filename = os.path.join(self._directory, self._stage) + ".json" + operations = self._read_file() + self._run = str(SoftwareWorker._get_key(operations)) + + def _read_file(self): + """Reads the file and returns its content in a dictionary. + + :returns: dictionary loaded with content from json file + """ + try: + with open(self._filename, "r") as f: + return json.loads(f.read()) + except (FileNotFoundError, json.decoder.JSONDecodeError): + return {} + + def _write_file(self, operation, cmd, rc, output): + """Writes the command in a structured format in the file. + + :param cmd: command that was run via subprocess + :param rc: command return code + :param output: output (stdout + stderr) returned by the command + """ + operations = self._read_file() + command = SoftwareWorker._suppress_text(cmd) + if not isinstance(cmd, list): + command = [command] + with open(self._filename, "w") as f: + if self._run not in operations: + operations[self._run] = {} + operations[self._run][operation] = { + "timestamp": datetime.strftime(datetime.utcnow(), + constants.WORKER_DATETIME_FORMAT), + "command": " ".join(command), + "rc": rc, + "output": output, + } + f.write(json.dumps(operations)) + + async def _run_async(self, operation, cmd, *args, **kwargs): + """Run a command with asyncio lib, which allows returning + a line-by-line output for stdout and stderr that is then + written to a json file. + + :param operation: operation name written to json file + :param cmd: command to be executed in string format + :param args: list of arguments passed along with the command + :param kwargs: extra arguments to change the behavior of the output + :returns: instance of CompletedProcess object + """ + if "env" in kwargs: + env = kwargs["env"] + else: + env = {} + + # concatenate params for shell command format + cmd_str = " ".join([cmd] + list(args)) + + # create process, capture output and wait it to end + if "shell" in kwargs and kwargs["shell"]: + process = await asyncio.create_subprocess_shell( + cmd_str, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env) + else: + process = await asyncio.create_subprocess_exec( + cmd, + *args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env) + stdout, stderr = await asyncio.gather( + SoftwareWorker._read_pipe(process.stdout, "stdout"), + SoftwareWorker._read_pipe(process.stderr, "stderr") + ) + await process.wait() + + # sort pipes by timestamp to write to json + rc = process.returncode + output = stdout + stderr + sorted_output = sorted(output, key=lambda item: list(item.values())[0]) + self._write_file(operation, cmd_str, process.returncode, sorted_output) + + # join stdout and stderr to return + stdout_str, stderr_str = SoftwareWorker._join_stdout_stderr(sorted_output) + + # do some validation to simulate subprocess behavior + if "check" in kwargs and kwargs["check"]: + if rc != 0: + raise subprocess.CalledProcessError(cmd=cmd, returncode=rc, + output=stdout_str, stderr=stderr_str) + cp = subprocess.CompletedProcess(args=args, returncode=rc, + stdout=stdout_str, stderr=stderr_str) + return cp + + def run(self, operation, cmd, *args, **kwargs): + """Run the _run_async() method with asyncio.run() + to hide asyncio complexity details from the user. + + :param operation: operation name written to json file + :param cmd: command to be run + :param args: extra arguments passes to the command + :param kwargs: extra keyword arguments + :returns: command output + """ + return asyncio.run(self._run_async(operation, cmd, *args, **kwargs)) + + def run_func(self, operation, function, *args, **kwargs): + """Runs a function, capture its output and writes + to a json file. + + :param operation: operation name written to json file + :param function: function to be executed + :param args: args to pass to function + :param kwargs: kwargs to pass to function + :returns: executed function return + """ + str_args = [str(arg) for arg in args] + str_kwargs = [str(arg) + "=" + str(kwargs[arg]) for arg in kwargs] + cmd = function.__name__ + "(" + ", ".join(str_args + str_kwargs) + ")" + msg = "'%s' executed " % function.__name__ + ret, rc = None, 0 + try: + ret = function(*args, **kwargs) + msg = msg + "with success: %s" % str(ret) + except Exception as e: + rc = 1 + msg = msg + "with failure: %s" % str(e) + raise e + finally: + msg_type = "stdout" if rc == 0 else "stderr" + self._write_file(operation, cmd, rc, [{ + "timestamp": datetime.strftime(datetime.utcnow(), + constants.WORKER_DATETIME_FORMAT), + "type": msg_type, + "output": msg + }]) + return ret + + @staticmethod + async def _read_pipe(stream, pipe): + """Read an IO stream created by asyncio line-by-line + + :param stream: stream of data to be read + :param pipe: type of the output (e.g. stdio) + :returns: list of dictionaries containing + each line of the stream marked + with date and type + """ + output_list = [] + while True: + chunk = await stream.readline() + if len(chunk) == 0: + break + line = str(chunk.decode('utf-8')) + output_list.append({ + "timestamp": datetime.strftime(datetime.utcnow(), + constants.WORKER_DATETIME_FORMAT), + "type": pipe, + "output": line + }) + return output_list + + @staticmethod + def _join_stdout_stderr(output_list): + """Join a list of lines with two different types + + :param output_list: list of lines to be merged + :returns: two strings, one with all stdio output + and one with all stderr output + """ + stderr, stdout = "", "" + for output in output_list: + if output["type"] == "stdout": + stdout += output["output"] + else: + stderr += output["output"] + return stdout, stderr + + @staticmethod + def _get_key(d): + """Receive a dictionary with integer keys + and return the next valid integer + + :param d: dictionary with integer keys + :returns: next valid integer key + """ + if not d: + return 1 + keys = sorted(list(d.keys())) + last = keys[-1] + return int(last) + 1 + + @staticmethod + def _suppress_text(_str): + """Suppress a set of patterns from a string + + :param _str: source string + :returns: suppressed string + """ + search_patterns = [ + r".*(?:password|pass|pw)[= ]+(\S+)\s", + ] + + suppressed = _str + for sp in search_patterns: + match = re.match(sp, _str) + if match: + suppressed = suppressed.replace(match.group(1), "xxxxxxx") + return suppressed