#!/usr/bin/python3 # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Copyright (C) 2024 Wind River Systems,Inc import argparse import glob import logging import os import shutil import subprocess import sys import tarfile import tempfile import xml.etree.ElementTree as ET import yaml BASE_BULLSEYE_PATH = os.path.join(os.environ.get('MY_REPO_ROOT_DIR'), "stx-tools/debian-mirror-tools/config/debian/common/base-bullseye.yaml") GPG_HOME = "/tmp/.lat_gnupg_root" HTTP_SERVER_IP = os.environ.get('HTTP_CONTAINER_IP') HTTP_FULL_ADDR = f"http://{HTTP_SERVER_IP}:8088" LAT_SDK_SYSROOT = "/opt/LAT/SDK/sysroots/x86_64-wrlinuxsdk-linux" MYUNAME = os.environ.get('MYUNAME') PROJECT = os.environ.get('PROJECT') FEED_PATH = f"/localdisk/loadbuild/{MYUNAME}/{PROJECT}/patches_feed" logger = logging.getLogger('create-prepatched-iso') def get_label_from_isolinux_cfg(path_to_file): """Get the iso label from the isolinux.cfg. This file is not usually formatted so we need to find the exact line where the value is. :param path_to_file: Full path name to isolinux.cfg file :returns: The instiso value """ logger.info("Getting instiso label from: %s" % path_to_file) try: with open(path_to_file, 'r') as file: iso_label = None split_line = [] for line in file: if 'instiso=' in line: split_line = line.split() break for item in split_line: if 'instiso=' in item: split_item = item.split('=') iso_label = split_item[1] break return iso_label except Exception as e: logger.error(str(e)) raise Exception(e) def create_iso(iso_directory, iso_label, output_path): """Create a new ISO or overwrite existing ISO :param iso_directory: Path to files to be part of the ISO :param iso_label: Value to be usad as volume ID :param output_path: Path where .iso will be saved """ logger.info("Packing new ISO") try: # Here we use mkisofs command to create the iso, the parameters # are so the iso is created with eltorito header and on ISO 9660 format cmd = ["mkisofs", "-o", output_path, "-A", iso_label, "-V", iso_label, "-U", "-J", "-joliet-long", "-r", "-iso-level", "2", "-b", "isolinux/isolinux.bin", "-c", "isolinux/boot.cat", "-no-emul-boot", "-boot-load-size", "4", "-boot-info-table", "-eltorito-alt-boot", "-eltorito-platform", "0xEF", "-eltorito-boot", "efi.img", "-no-emul-boot", iso_directory ] logger.debug('Running command: %s', cmd) subprocess.check_call(cmd, shell=False) # Making the iso EFI bootable cmd = ["isohybrid", "--uefi", output_path] logger.debug('Running command: %s', cmd) subprocess.check_call(cmd, shell=False) # Implant new checksum, required for ISO9660 image cmd = ["implantisomd5", output_path] logger.debug('Running command: %s', cmd) subprocess.check_call(cmd, shell=False) except Exception as e: logger.error(str(e)) raise Exception(e) def mount_iso(iso_to_mount, path_to_mount): """Tries to mount the ISO in a directory :param path_to_mount: Path to directory where iso will be mounted """ logger.info("Mounting ISO on: %s" % path_to_mount) if not os.path.isfile(iso_to_mount): raise Exception("ISO not found: %s" % iso_to_mount) if not os.path.exists(path_to_mount): raise Exception("Mount path not found: %s" % path_to_mount) # We try to mount the iso in the folder try: cmd = ["mount", "-o", "loop", iso_to_mount, path_to_mount] logger.debug('Running command: %s', cmd) subprocess.check_call(cmd, shell=False) except Exception as e: logger.error(str(e)) raise Exception(e) else: logger.info("ISO sucessfully mounted") def umount_iso(mount_point): """Tries to umount ISO from directory :param mount_pount: Path where the mount is on """ logger.info("Unmounting: %s" % mount_point) try: cmd = ["umount", "-l", mount_point] logger.debug('Running command: %s', cmd) subprocess.check_call(cmd, shell=False) except Exception as e: logger.error("Unmounting failed") logger.error(str(e)) else: logger.info("Unmounted sucessfully") def get_yaml_value(keys_to_get): """Load debian base template and get value from specific key :param keys_to_get: Name of the key :returns: Value from the key """ with open(BASE_BULLSEYE_PATH) as stream: try: keys = keys_to_get.split('.') data = yaml.safe_load(stream) for key in keys: data = data.get(key) if data is None: logger.error("keys sequence '%s' not found in %s", keys_to_get, BASE_BULLSEYE_PATH) sys.exit(1) except FileNotFoundError: logger.error("%s not found", BASE_BULLSEYE_PATH) sys.exit(1) return data def setup_gpg_client(): """Setup configuration for the GPG client First we check if GPG configuration folder exist (GPG_HOME) if it doesn't exist we set it up then we set the env variable for the GPG client. This is usually not needed because lat sdk create this folder to us but this is not always the case. """ ostree_gpg_id = get_yaml_value("gpg.ostree.gpgid") ostree_gpg_key = get_yaml_value("gpg.ostree.gpgkey") ostree_gpg_pass = get_yaml_value("gpg.ostree.gpg_password") if not os.path.exists(GPG_HOME): logger.info("GPG home (%s) doesn't exist, creating...", GPG_HOME) os.environ["OECORE_NATIVE_SYSROOT"] = LAT_SDK_SYSROOT os.makedirs(GPG_HOME) cmd = f"chmod 700 {GPG_HOME}" logger.debug('Running command: %s', cmd) subprocess.call([cmd], shell=True) cmd = f"echo allow-loopback-pinentry > {GPG_HOME}/gpg-agent.conf" logger.debug('Running command: %s', cmd) subprocess.call([cmd], shell=True) cmd = f"gpg-connect-agent --homedir {GPG_HOME} reloadagent /bye" logger.debug('Running command: %s', cmd) subprocess.call([cmd], shell=True) cmd = f"gpg --homedir {GPG_HOME} --import {ostree_gpg_key}" logger.debug('Running command: %s', cmd) subprocess.call([cmd], shell=True) cmd = f"gpg --homedir {GPG_HOME} --list-keys {ostree_gpg_id}" logger.debug('Running command: %s', cmd) subprocess.call([cmd], shell=True) cmd = f"gpg --homedir={GPG_HOME} -o /dev/null -u \"{ostree_gpg_id}\" --pinentry=loopback \ --passphrase {ostree_gpg_pass} -s /dev/null" logger.debug('Running command: %s', cmd) subprocess.call([cmd], shell=True) os.environ["GNUPGHOME"] = GPG_HOME logger.info("GPG homedir created with success.") else: logger.info("GPG home (%s) folder already exist.", GPG_HOME) cmd = f"gpg --homedir={GPG_HOME} -o /dev/null -u \"{ostree_gpg_id}\" --pinentry=loopback \ --passphrase {ostree_gpg_pass} -s /dev/null" logger.debug('Running command: %s', cmd) subprocess.call([cmd], shell=True) os.environ["GNUPGHOME"] = GPG_HOME def add_tag_xml(parent, name, text): """Add tag with text to a parent tag Utility function that helps us create XML tags inside another tag with a value inside it without repeating ourselves too much. :param parent: XML parent tag :param name: Name of the tag :param text: Value inside the tag """ tag = ET.SubElement(parent, name) tag.text = text def update_metadata_info(metadata, iso_path): """Update iso's metadata files This function updates the metadata files with information from the patches and the ostree repository. :param metadata: Path to the metadata file :param iso_path: Path to the ISO """ logger.info("Updating metadata's info...") # Load XML structure and create base tree = ET.parse(metadata) root = tree.getroot() content = ET.SubElement(root, "contents") ostree = ET.SubElement(content, "ostree") add_tag_xml(ostree, "number_of_commits", "1") # Update prepatched iso field add_tag_xml(root, "prepatched_iso", "Y") base_element = ET.SubElement(ostree, "base") # For now we add empty values here as the software # expect this fields to be in the XML add_tag_xml(base_element, "commit", "") add_tag_xml(base_element, "checksum", "") # Get ostree commit try: cmd = f"ostree --repo={iso_path}/ostree_repo rev-parse starlingx" logger.debug('Running command: %s', cmd) commit_value = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True).decode(sys.stdout.encoding).strip() except subprocess.CalledProcessError as e: raise Exception(e.output) except Exception as e: raise Exception(e) # Get ostree checksum try: cmd = (f"ostree --repo={iso_path}/ostree_repo log starlingx" '| grep -m 1 -i checksum | sed "s/.* //"') logger.debug('Running command: %s', cmd) checksum_value = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True).decode(sys.stdout.encoding).strip() except subprocess.CalledProcessError as e: raise Exception(e.output) except Exception as e: raise Exception(e) # Add info in commit1 commit1_element = ET.SubElement(ostree, "commit1") add_tag_xml(commit1_element, "commit", commit_value) add_tag_xml(commit1_element, "checksum", checksum_value) # Remove requires field from metadata requires = root.find("requires") if requires is not None: requires.clear() # Save metadata file changes tree.write(metadata) def main(): parser = argparse.ArgumentParser(description="Create a valid StarlingX ISO with patches \ already applied.", formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('-i','--iso',type=str, help="Full path to .iso file to be used as the base.", required=True) parser.add_argument('-p','--patch',type=str, help="""Full path to every .patch file. You can specify more than one.\ e.g.: /localdisk/deploy/starlingx-24.09.1.patch""", action='append', required=True) parser.add_argument('-o','--output',type=str, help="""Location where the pre-patched iso will be saved. \ e.g.: /localdisk/deploy/prepatch.iso""", required=True) parser.add_argument('-v','--verbose',action='store_true', help="Active debug logging") args = parser.parse_args() # Config logging log_level = logging.INFO if args.verbose: log_level = logging.DEBUG logging.basicConfig(level=log_level) # Check if every argument is correct if not os.path.isfile(args.iso): logger.error(f"ISO file doesn't exist in {args.iso}") sys.exit(1) if os.path.isfile(args.output): logger.error(f"Output file {args.output} already exist, please select another name.") sys.exit(1) for patch in args.patch: if not os.path.isfile(patch): logger.error(f"Patch file {patch} doesn't exist, please input a valid file.") sys.exit(1) # Check if env variables are correctly set if not MYUNAME: logger.error("Environment variable UNAME is not correctly set.") sys.exit(1) if not PROJECT: logger.error("Environment variable PROJECT is not correctly set") sys.exit(1) if not HTTP_SERVER_IP: logger.error("Environment variable HTTP_SERVER_IP is not correctly set") sys.exit(1) try: # Create temporary folders to hold the mount point, # the new iso files and the metadata and debs from patches logger.info("Creating temporary folders...") mnt_folder = tempfile.mkdtemp(prefix='mnt_') iso_folder = tempfile.mkdtemp(prefix='iso_') ptc_folder = tempfile.mkdtemp(prefix='patch_') mount_iso(args.iso, mnt_folder) logger.info('Copying all files from %s to %s', mnt_folder, iso_folder) # Copy all files from the mount point to the iso temporary folder cmd = ["rsync", "-a", f'{mnt_folder}/', iso_folder] logger.debug('Running command: %s', cmd) subprocess.check_call(cmd, shell=False) # With all files copied, we don't need the mount point anymore umount_iso(mnt_folder) # Change permissions on iso folder so we can update the files os.chmod(iso_folder, 0o777) # We initiate a reprepo feed in loadbuild because we need to access it # through a http service logger.info(f'Setting up package feed in {FEED_PATH}') cmd = ["apt-ostree", "repo", "init", "--feed", FEED_PATH, "--release", "bullseye", "--origin", "updates"] logger.debug('Running command: %s', cmd) subprocess.check_call(cmd, shell=False) latest_patch_number = 0 logger.info('Unpacking patches...') # For every patch we need to extract the metadata.xml, the deb files # and save the sw_version and packages names to be used on apt-ostree patches_data = [] for patch in args.patch: with tempfile.TemporaryDirectory() as extract_folder: with tarfile.open(patch) as f: # We extract the metadata.xml from the metadata.tar f.extract('metadata.tar', f"{extract_folder}/") metadata_tar = tarfile.open(f"{extract_folder}/metadata.tar") metadata_tar.extract('metadata.xml', f"{extract_folder}/") # Get sw_version value and save metadata.xml using sw_version as sufix xml_root = ET.parse(f"{extract_folder}/metadata.xml").getroot() sw_version = xml_root.find('sw_version').text os.makedirs(f"{ptc_folder}/{sw_version}/metadata") metadata_path = (f"{ptc_folder}/{sw_version}/metadata/starlingx-{sw_version}" "-metadata.xml") shutil.copy(f"{extract_folder}/metadata.xml", metadata_path) # From inside software.tar we extract every .deb file f.extract('software.tar', f"{extract_folder}/") software_tar = tarfile.open(f"{extract_folder}/software.tar") software_tar.extractall(f"{ptc_folder}/{sw_version}/debs/") # Packages names need to include version and revision # e.g.: logmgmt_1.0-1.stx.10 packages = [] for i in xml_root.find('packages').findall('deb'): packages.append(i.text.split("_")[0]) # Now we save the information we extract for later use patches_data.append({ "sw_version": sw_version, "path": f"{ptc_folder}/{sw_version}", "packages": packages, "metadata": metadata_path }) # Save the biggest version from the patches we have patch_num = int(sw_version.split(".")[-1]) if patch_num > latest_patch_number: latest_patch_number = patch_num logger.info(f'Patch {sw_version} unpacked sucessfully.') # Here we setup our gpg client setup_gpg_client() # We delete the patches folder from the base iso and recreate it # so we may populate with the metadatas from the patches we are using shutil.rmtree(f"{iso_folder}/patches") os.mkdir(f"{iso_folder}/patches") # We clean all the metadatas inside upgrades folder for file in glob.glob(f"{iso_folder}/upgrades/*-metadata.xml"): os.remove(file) # Now we need to populate reprepo feed with every deb from every patch # after that we install it on the ostree repository logger.info('Populate ostree repository with .deb files...') for patch in patches_data: # Scan /debs/ folder and load every patch to the reprepo feed deb_dir = os.scandir(os.path.join(patch["path"],"debs/")) for deb in deb_dir: cmd = ["apt-ostree", "repo", "add", "--feed", FEED_PATH, "--release", "bullseye", "--component", patch['sw_version'], os.path.join(f"{patch['path']}/debs/", deb.name)] logger.debug('Running command: %s', cmd) subprocess.check_call(cmd, shell=False) # Now with every deb loaded we commit it in the ostree repository # apt-ostree requires an http connection to access the host files # so we give the full http path using the ip full_feed_path = f'\"{HTTP_FULL_ADDR}{FEED_PATH} bullseye\"' gpg_key = get_yaml_value("gpg.ostree.gpgid") pkgs = " ".join(patch["packages"]) cmd = ["apt-ostree", "compose", "install", "--repo", f"{iso_folder}/ostree_repo", "--gpg-key", gpg_key, "--branch", "starlingx", "--feed", full_feed_path, "--component", patch['sw_version'], pkgs] logger.debug('Running command: %s', cmd) subprocess.check_call(cmd, shell=False) # Copy only the patch metadata with the biggest patch version to ISO patch_num = int(patch["sw_version"].split(".")[-1]) if latest_patch_number == patch_num: # Metadata inside upgrades requires ostree information update_metadata_info(patch["metadata"], iso_folder) shutil.copy(patch["metadata"], f"{iso_folder}/patches") shutil.copy(patch["metadata"], f"{iso_folder}/upgrades") # Update ostree summary cmd = ["ostree", "summary", "--update", f"--repo={iso_folder}/ostree_repo"] logger.debug('Running command: %s', cmd) subprocess.check_call(cmd, shell=False) # TODO(dalbinob): Remember to copy only the latest ostree commit # Now we get the label and re create the ISO with the new ostree logger.info('Creating new .iso file...') instlabel = get_label_from_isolinux_cfg(f"{iso_folder}/isolinux/isolinux.cfg") create_iso(iso_folder, instlabel, args.output) # Allow to edit and read the newly created iso os.chmod(args.output, 0o777) logger.info("Pre-patched ISO created sucessfully: %s", args.output) except Exception as e: logger.error('create-prepatched-iso failed, see error below:') logger.error(str(e)) finally: logger.info('Cleaning temporary folders...') if mnt_folder: os.system(f'rm -rf {mnt_folder}') if iso_folder: os.system(f'rm -rf {iso_folder}') if ptc_folder: os.system(f'rm -rf {ptc_folder}') # Clean reprepro feed if os.path.exists(FEED_PATH): shutil.rmtree(FEED_PATH) if __name__ == "__main__": main()