Add script to generate pre-patched iso

This change adds the script that allow us to build a pre-patched
ISO inside the LAT container. The script will receive the iso
and patch files as parameters then mount the iso, use apt-ostree
to update the existing ostree repository with deb files from the
patch files and pack everything again in a new iso file.

Test plan:
    PASS: Create ISO with one patch applied
        Execute full AIO-SX install
        Check "software list" output
        Check "software show <release> -packages" output
        Verify if ostree commit has being added
        Verify if package(s) are installed with dpkg
    PASS: Create ISO with two patch applied
        Execute full AIO-SX install
        Check "software list" output
        Check "software show <release> -packages" output
        Verify if ostree commit has being added
        Verify if package(s) are installed with dpkg
    PASS: Test full upgrade process on pre-patched ISO, using:
        software upload <release>
        software deploy precheck <release>
        software deploy start <release>

Depends-On: https://review.opendev.org/c/starlingx/tools/+/922842

Story: 2010676
Task: 50436

Change-Id: I412e2244f1927dc05d41cdec461bc12d620ea71c
Signed-off-by: Dostoievski Batista <dostoievski.albinobatista@windriver.com>
This commit is contained in:
Dostoievski Batista 2024-06-26 10:53:27 -03:00
parent ab1c6d0012
commit 8c7a2fe56e

394
build-tools/create-prepatched-iso Executable file
View File

@ -0,0 +1,394 @@
#!/usr/bin/python3
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copyright (C) 2024 Wind River Systems,Inc
import argparse
import logging
import os
import shutil
import subprocess
import sys
import tarfile
import tempfile
import yaml
import xml.etree.ElementTree as ET
BASE_BULLSEYE_PATH = os.path.join(os.environ.get('MY_REPO_ROOT_DIR'),
"stx-tools/debian-mirror-tools/config/debian/common/base-bullseye.yaml")
GPG_HOME = "/tmp/.lat_gnupg_root"
HTTP_SERVER_IP = os.environ.get('HTTP_CONTAINER_IP')
HTTP_FULL_ADDR = f"http://{HTTP_SERVER_IP}:8088"
LAT_SDK_SYSROOT = "/opt/LAT/SDK/sysroots/x86_64-wrlinuxsdk-linux"
MYUNAME = os.environ.get('MYUNAME')
PROJECT = os.environ.get('PROJECT')
FEED_PATH = f"/localdisk/loadbuild/{MYUNAME}/{PROJECT}/patches_feed"
logger = logging.getLogger('create-prepatched-iso')
def get_label_from_isolinux_cfg(path_to_file):
"""Get the iso label from the isolinux.cfg.
This file is not usually formatted so we need to find the exact line
where the value is.
:param path_to_file: Full path name to isolinux.cfg file
:returns: The instiso value
"""
logger.info("Getting instiso label from: %s" % path_to_file)
try:
with open(path_to_file, 'r') as file:
iso_label = None
split_line = []
for line in file:
if 'instiso=' in line:
split_line = line.split()
break
for item in split_line:
if 'instiso=' in item:
split_item = item.split('=')
iso_label = split_item[1]
break
return iso_label
except Exception as e:
logger.error(str(e))
raise Exception(e)
def create_iso(iso_directory, iso_label, output_path):
"""Create a new ISO or overwrite existing ISO
:param iso_directory: Path to files to be part of the ISO
:param iso_label: Value to be usad as volume ID
:param output_path: Path where .iso will be saved
"""
logger.info("Packing new ISO")
try:
# Here we use mkisofs command to create the iso, the parameters
# are so the iso is created with eltorito header and on ISO 9660 format
cmd = ["mkisofs",
"-o", output_path,
"-A", iso_label,
"-V", iso_label,
"-U", "-J",
"-joliet-long",
"-r",
"-iso-level", "2",
"-b", "isolinux/isolinux.bin",
"-c", "isolinux/boot.cat",
"-no-emul-boot",
"-boot-load-size", "4",
"-boot-info-table",
"-eltorito-alt-boot",
"-eltorito-platform", "0xEF",
"-eltorito-boot", "efi.img",
"-no-emul-boot",
iso_directory
]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# Making the iso EFI bootable
cmd = ["isohybrid", "--uefi", output_path]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# Implant new checksum, required for ISO9660 image
cmd = ["implantisomd5", output_path]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
except Exception as e:
logger.error(str(e))
raise Exception(e)
def mount_iso(iso_to_mount, path_to_mount):
"""Tries to mount the ISO in a directory
:param path_to_mount: Path to directory where iso will be mounted
"""
logger.info("Mounting ISO on: %s" % path_to_mount)
if not os.path.isfile(iso_to_mount):
raise Exception("ISO not found: %s" % iso_to_mount)
if not os.path.exists(path_to_mount):
raise Exception("Mount path not found: %s" % path_to_mount)
# We try to mount the iso in the folder
try:
cmd = ["mount", "-o", "loop", iso_to_mount, path_to_mount]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
except Exception as e:
logger.error(str(e))
raise Exception(e)
else:
logger.info("ISO sucessfully mounted")
def umount_iso(mount_point):
"""Tries to umount ISO from directory
:param mount_pount: Path where the mount is on
"""
logger.info("Unmounting: %s" % mount_point)
try:
cmd = ["umount", "-l", mount_point]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
except Exception as e:
logger.error("Unmounting failed")
logger.error(str(e))
else:
logger.info("Unmounted sucessfully")
def get_yaml_value(keys_to_get):
"""Load debian base template and get value from specific key
:param keys_to_get: Name of the key
:returns: Value from the key
"""
with open(BASE_BULLSEYE_PATH) as stream:
try:
keys = keys_to_get.split('.')
data = yaml.safe_load(stream)
for key in keys:
data = data.get(key)
if data is None:
logger.error("keys sequence '%s' not found in %s",
keys_to_get, BASE_BULLSEYE_PATH)
sys.exit(1)
except FileNotFoundError:
logger.error("%s not found", BASE_BULLSEYE_PATH)
sys.exit(1)
return data
def setup_gpg_client():
"""Setup configuration for the GPG client
First we check if GPG configuration folder exist (GPG_HOME)
if it doesn't exist we set it up then we set the env variable
for the GPG client. This is usually not needed because lat sdk
create this folder to us but this is not always the case.
"""
ostree_gpg_id = get_yaml_value("gpg.ostree.gpgid")
ostree_gpg_key = get_yaml_value("gpg.ostree.gpgkey")
ostree_gpg_pass = get_yaml_value("gpg.ostree.gpg_password")
if not os.path.exists(GPG_HOME):
logger.info("GPG home (%s) doesn't exist, creating...", GPG_HOME)
os.environ["OECORE_NATIVE_SYSROOT"] = LAT_SDK_SYSROOT
os.makedirs(GPG_HOME)
cmd = f"chmod 700 {GPG_HOME}"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
cmd = f"echo allow-loopback-pinentry > {GPG_HOME}/gpg-agent.conf"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
cmd = f"gpg-connect-agent --homedir {GPG_HOME} reloadagent /bye"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
cmd = f"gpg --homedir {GPG_HOME} --import {ostree_gpg_key}"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
cmd = f"gpg --homedir {GPG_HOME} --list-keys {ostree_gpg_id}"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
cmd = f"gpg --homedir={GPG_HOME} -o /dev/null -u \"{ostree_gpg_id}\" --pinentry=loopback \
--passphrase {ostree_gpg_pass} -s /dev/null"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
os.environ["GNUPGHOME"] = GPG_HOME
logger.info("GPG homedir created with success.")
else:
logger.info("GPG home (%s) folder already exist.", GPG_HOME)
cmd = f"gpg --homedir={GPG_HOME} -o /dev/null -u \"{ostree_gpg_id}\" --pinentry=loopback \
--passphrase {ostree_gpg_pass} -s /dev/null"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
os.environ["GNUPGHOME"] = GPG_HOME
def main():
parser = argparse.ArgumentParser(description="Create a valid StarlingX ISO with patches \
already applied.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-i','--iso',type=str,
help="Full path to .iso file to be used as the base.",
required=True)
parser.add_argument('-p','--patch',type=str,
help="""Full path to every .patch file. You can specify more than one.\
e.g.: /localdisk/deploy/starlingx-24.09.1.patch""",
action='append',
required=True)
parser.add_argument('-o','--output',type=str,
help="""Location where the pre-patched iso will be saved. \
e.g.: /localdisk/deploy/prepatch.iso""",
required=True)
parser.add_argument('-v','--verbose',action='store_true',
help="Active debug logging")
args = parser.parse_args()
# Config logging
log_level = logging.INFO
if args.verbose:
log_level = logging.DEBUG
logging.basicConfig(level=log_level)
# Check if every argument is correct
if not os.path.isfile(args.iso):
logger.error(f"ISO file doesn't exist in {args.iso}")
sys.exit(1)
if os.path.isfile(args.output):
logger.error(f"Output file {args.output} already exist, please select another name.")
sys.exit(1)
for patch in args.patch:
if not os.path.isfile(patch):
logger.error(f"Patch file {patch} doesn't exist, please input a valid file.")
sys.exit(1)
# Check if env variables are correctly set
if not MYUNAME:
logger.error("Environment variable UNAME is not correctly set.")
sys.exit(1)
if not PROJECT:
logger.error("Environment variable PROJECT is not correctly set")
sys.exit(1)
if not HTTP_SERVER_IP:
logger.error("Environment variable HTTP_SERVER_IP is not correctly set")
sys.exit(1)
try:
# Create temporary folders to hold the mount point,
# the new iso files and the metadata and debs from patches
logger.info("Creating temporary folders...")
mnt_folder = tempfile.mkdtemp(prefix='mnt_')
iso_folder = tempfile.mkdtemp(prefix='iso_')
ptc_folder = tempfile.mkdtemp(prefix='patch_')
mount_iso(args.iso, mnt_folder)
logger.info('Copying all files from %s to %s', mnt_folder, iso_folder)
# Copy all files from the mount point to the iso temporary folder
cmd = ["rsync", "-a", f'{mnt_folder}/', iso_folder]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# With all files copied, we don't need the mount point anymore
umount_iso(mnt_folder)
# Change permissions on iso folder so we can update the files
os.chmod(iso_folder, 0o777)
# We initiate a reprepo feed in loadbuild because we need to access it
# through a http service
logger.info(f'Setting up package feed in {FEED_PATH}')
cmd = ["apt-ostree", "repo", "init", "--feed", FEED_PATH,
"--release", "bullseye", "--origin", "updates"]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
logger.info('Unpacking patches...')
# For every patch we need to extract the metadata.xml, the deb files
# and save the sw_version and packages names to be used on apt-ostree
patches_data = []
for patch in args.patch:
with tempfile.TemporaryDirectory() as extract_folder:
with tarfile.open(patch) as f:
# We extract the metadata.xml from the metadata.tar
f.extract('metadata.tar', f"{extract_folder}/")
metadata_tar = tarfile.open(f"{extract_folder}/metadata.tar")
metadata_tar.extract('metadata.xml', f"{extract_folder}/")
# Get sw_version value and save metadata.xml using sw_version as sufix
xml_root = ET.parse(f"{extract_folder}/metadata.xml").getroot()
sw_version = xml_root.find('sw_version').text
os.makedirs(f"{ptc_folder}/{sw_version}/metadata")
metadata_path = f"{ptc_folder}/{sw_version}/metadata/\
starlingx-{sw_version}-metadata.xml"
shutil.copy(f"{extract_folder}/metadata.xml", metadata_path)
# From inside software.tar we extract every .deb file
f.extract('software.tar', f"{extract_folder}/")
software_tar = tarfile.open(f"{extract_folder}/software.tar")
software_tar.extractall(f"{ptc_folder}/{sw_version}/debs/")
# Packages names need to include version and revision
# e.g.: logmgmt_1.0-1.stx.10
packages = []
for i in xml_root.find('packages').findall('deb'):
packages.append(i.text.split("_")[0])
# Now we save the information we extract for later use
patches_data.append({
"sw_version": sw_version,
"path": f"{ptc_folder}/{sw_version}",
"packages": packages,
"metadata": metadata_path
})
logger.info(f'Patch {sw_version} unpacked sucessfully.')
# Here we setup our gpg client
setup_gpg_client()
# Now we need to populate reprepo feed with every deb from every patch
# after that we install it on the ostree repository
logger.info('Populate ostree repository with .deb files...')
for patch in patches_data:
# Scan /debs/ folder and load every patch to the reprepo feed
deb_dir = os.scandir(os.path.join(patch["path"],"debs/"))
for deb in deb_dir:
cmd = ["apt-ostree", "repo", "add", "--feed", FEED_PATH,
"--release", "bullseye", "--component", patch['sw_version'],
os.path.join(f"{patch['path']}/debs/", deb.name)]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# Now with every deb loaded we commit it in the ostree repository
# apt-ostree requires an http connection to access the host files
# so we give the full http path using the ip
full_feed_path = f'\"{HTTP_FULL_ADDR}{FEED_PATH} bullseye\"'
gpg_key = get_yaml_value("gpg.ostree.gpgid")
pkgs = " ".join(patch["packages"])
cmd = ["apt-ostree", "compose", "install", "--repo", f"{iso_folder}/ostree_repo",
"--gpg-key", gpg_key, "--branch", "starlingx", "--feed", full_feed_path,
"--component", patch['sw_version'], pkgs]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# Copy patch metadata to iso
shutil.copy(patch["metadata"], f"{iso_folder}/patches")
# Update ostree summary
cmd = ["ostree", "summary", "--update", f"--repo={iso_folder}/ostree_repo"]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# TODO(dalbinob): Remember to copy only the latest ostree commit
# Now we get the label and re create the ISO with the new ostree
logger.info('Creating new .iso file...')
instlabel = get_label_from_isolinux_cfg(f"{iso_folder}/isolinux/isolinux.cfg")
create_iso(iso_folder, instlabel, args.output)
# Allow to edit and read the newly created iso
os.chmod(args.output, 0o777)
logger.info("Pre-patched ISO created sucessfully: %s", args.output)
except Exception as e:
logger.error('create-prepatched-iso failed, see error below:')
logger.error(str(e))
finally:
logger.info('Cleaning temporary folders...')
if mnt_folder:
os.system(f'rm -rf {mnt_folder}')
if iso_folder:
os.system(f'rm -rf {iso_folder}')
if ptc_folder:
os.system(f'rm -rf {ptc_folder}')
# Clean reprepro feed
if os.path.exists(FEED_PATH):
shutil.rmtree(FEED_PATH)
if __name__ == "__main__":
main()