root/build-tools/create-prepatched-iso
Commit f9823bffcd by Dostoievski Batista

Add option to sign using gpg key

When running apt-ostree to generate the commit for the prepatched
ISO, we can use the GPG key from the LAT container to sign the commit.
This change allows us to choose when to do this via the
'--sign-gpg' argument.

Test plan:
    PASS: Run create-prepatched-iso without --sign-gpg, test full
        install of AIO-SX.
    PASS: Run create-prepatched-iso with --sign-gpg, test full install
        of AIO-SX.
    PASS: Run patch-iso sub-job from patch pipeline.

Story: 2010676
Task: 51485

Change-Id: I90650c5550c812955fa57baae3044c89e427a34d
Signed-off-by: Dostoievski Batista <dostoievski.albinobatista@windriver.com>
Date: 2024-12-18 17:06:07 -03:00

#!/usr/bin/python3
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copyright (C) 2024 Wind River Systems, Inc.
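# An illustrative invocation (file names are examples only; the real ISO and
# patch names depend on the build). The command must run with privileges
# that allow loop-mounting the ISO:
#   ./create-prepatched-iso \
#       --iso /localdisk/deploy/starlingx-intel-x86-64-cd.iso \
#       --patch /localdisk/deploy/starlingx-24.09.1.patch \
#       --output /localdisk/deploy/prepatch.iso \
#       --sign-gpg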
import argparse
import glob
import logging
import os
import re
import shutil
import subprocess
import sys
import tarfile
import tempfile
import xml.etree.ElementTree as ET
import yaml
BASE_BULLSEYE_PATH = os.path.join(os.environ.get('MY_REPO_ROOT_DIR'),
"stx-tools/debian-mirror-tools/config/debian/common/base-bullseye.yaml")
GPG_HOME = "/tmp/.lat_gnupg_root"
HTTP_SERVER_IP = os.environ.get('HTTP_CONTAINER_IP')
HTTP_FULL_ADDR = f"http://{HTTP_SERVER_IP}:8088"
LAT_SDK_SYSROOT = "/opt/LAT/SDK/sysroots/x86_64-wrlinuxsdk-linux"
MYUNAME = os.environ.get('MYUNAME')
PROJECT = os.environ.get('PROJECT')
FEED_PATH = f"/localdisk/loadbuild/{MYUNAME}/{PROJECT}/patches_feed"
DEPLOY_DIR = os.environ.get('DEPLOY_DIR')
logger = logging.getLogger('create-prepatched-iso')
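# get_label_from_isolinux_cfg() below looks for a token of the form
# "instiso=<label>" inside isolinux.cfg. An illustrative (not verbatim) line
# it would match:
#   append initrd=initrd instiso=instboot inst.repo=...
# for which it returns "instboot".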
def get_label_from_isolinux_cfg(path_to_file):
"""Get the iso label from the isolinux.cfg.
This file does not have a consistent format, so we need to find the exact
line where the value is.
:param path_to_file: Full path name to isolinux.cfg file
:returns: The instiso value
"""
logger.info("Getting instiso label from: %s" % path_to_file)
try:
with open(path_to_file, 'r') as file:
iso_label = None
split_line = []
for line in file:
if 'instiso=' in line:
split_line = line.split()
break
for item in split_line:
if 'instiso=' in item:
split_item = item.split('=')
iso_label = split_item[1]
break
return iso_label
except Exception as e:
logger.error(str(e))
raise Exception(e)
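# Note: implantisomd5, used at the end of create_iso() below, embeds an md5
# checksum in the image; if a sanity check is ever needed, the companion tool
# "checkisomd5 <iso>" can verify it.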
def create_iso(iso_directory, iso_label, output_path):
"""Create a new ISO or overwrite existing ISO
:param iso_directory: Path to files to be part of the ISO
:param iso_label: Value to be used as volume ID
:param output_path: Path where .iso will be saved
"""
logger.info("Packing new ISO")
try:
# Here we use the mkisofs command to create the iso; the parameters ensure
# the iso is created with an El Torito boot header and in ISO 9660 format
cmd = ["mkisofs",
"-o", output_path,
"-A", iso_label,
"-V", iso_label,
"-U", "-J",
"-joliet-long",
"-r",
"-iso-level", "2",
"-b", "isolinux/isolinux.bin",
"-c", "isolinux/boot.cat",
"-no-emul-boot",
"-boot-load-size", "4",
"-boot-info-table",
"-eltorito-alt-boot",
"-eltorito-platform", "0xEF",
"-eltorito-boot", "efi.img",
"-no-emul-boot",
iso_directory
]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# Making the iso EFI bootable
cmd = ["isohybrid", "--uefi", output_path]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# Implant new checksum, required for ISO9660 image
cmd = ["implantisomd5", output_path]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
except Exception as e:
logger.error(str(e))
raise Exception(e)
def mount_iso(iso_to_mount, path_to_mount):
"""Tries to mount the ISO in a directory
:param iso_to_mount: Path to the ISO file to be mounted
:param path_to_mount: Path to directory where iso will be mounted
"""
logger.info("Mounting ISO on: %s" % path_to_mount)
if not os.path.isfile(iso_to_mount):
raise Exception("ISO not found: %s" % iso_to_mount)
if not os.path.exists(path_to_mount):
raise Exception("Mount path not found: %s" % path_to_mount)
# We try to mount the iso in the folder
try:
cmd = ["mount", "-o", "loop", iso_to_mount, path_to_mount]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
except Exception as e:
logger.error(str(e))
raise Exception(e)
else:
logger.info("ISO sucessfully mounted")
def umount_iso(mount_point):
"""Tries to umount ISO from directory
:param mount_pount: Path where the mount is on
"""
logger.info("Unmounting: %s" % mount_point)
try:
cmd = ["umount", "-l", mount_point]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
except Exception as e:
logger.error("Unmounting failed")
logger.error(str(e))
else:
logger.info("Unmounted sucessfully")
def get_yaml_value(keys_to_get):
"""Load debian base template and get value from specific key
:param keys_to_get: Name of the key
:returns: Value from the key
"""
with open(BASE_BULLSEYE_PATH) as stream:
try:
keys = keys_to_get.split('.')
data = yaml.safe_load(stream)
for key in keys:
data = data.get(key)
if data is None:
logger.error("keys sequence '%s' not found in %s",
keys_to_get, BASE_BULLSEYE_PATH)
sys.exit(1)
except FileNotFoundError:
logger.error("%s not found", BASE_BULLSEYE_PATH)
sys.exit(1)
return data
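# The "gpg --homedir ... -o /dev/null ... -s /dev/null" calls in
# setup_gpg_client() below act as a non-interactive signing smoke test: they
# confirm the imported key can sign using loopback pinentry and the
# configured passphrase before ostree/apt-ostree rely on it.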
def setup_gpg_client():
"""Setup configuration for the GPG client
First we check if GPG configuration folder exist (GPG_HOME)
if it doesn't exist we set it up then we set the env variable
for the GPG client. This is usually not needed because lat sdk
create this folder to us but this is not always the case.
"""
ostree_gpg_id = get_yaml_value("gpg.ostree.gpgid")
ostree_gpg_key = get_yaml_value("gpg.ostree.gpgkey")
ostree_gpg_pass = get_yaml_value("gpg.ostree.gpg_password")
if not os.path.exists(GPG_HOME):
logger.info("GPG home (%s) doesn't exist, creating...", GPG_HOME)
os.environ["OECORE_NATIVE_SYSROOT"] = LAT_SDK_SYSROOT
os.makedirs(GPG_HOME)
cmd = f"chmod 700 {GPG_HOME}"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
cmd = f"echo allow-loopback-pinentry > {GPG_HOME}/gpg-agent.conf"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
cmd = f"gpg-connect-agent --homedir {GPG_HOME} reloadagent /bye"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
cmd = f"gpg --homedir {GPG_HOME} --import {ostree_gpg_key}"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
cmd = f"gpg --homedir {GPG_HOME} --list-keys {ostree_gpg_id}"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
cmd = f"gpg --homedir={GPG_HOME} -o /dev/null -u \"{ostree_gpg_id}\" --pinentry=loopback \
--passphrase {ostree_gpg_pass} -s /dev/null"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
os.environ["GNUPGHOME"] = GPG_HOME
logger.info("GPG homedir created with success.")
else:
logger.info("GPG home (%s) folder already exist.", GPG_HOME)
cmd = f"gpg --homedir={GPG_HOME} -o /dev/null -u \"{ostree_gpg_id}\" --pinentry=loopback \
--passphrase {ostree_gpg_pass} -s /dev/null"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
os.environ["GNUPGHOME"] = GPG_HOME
def add_tag_xml(parent, name, text):
"""Add tag with text to a parent tag
Utility function that creates a child XML tag with text content inside a
parent tag, so we don't repeat ourselves too much.
:param parent: XML parent tag
:param name: Name of the tag
:param text: Value inside the tag
"""
tag = ET.SubElement(parent, name)
tag.text = text
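# update_metadata_info() below appends a <contents> block to the patch
# metadata, roughly like this (illustrative, hashes shortened):
#   <contents>
#     <ostree>
#       <number_of_commits>1</number_of_commits>
#       <base><commit></commit><checksum></checksum></base>
#       <commit1>
#         <commit>3b5f19c2...</commit>
#         <checksum>9c1e7a44...</checksum>
#       </commit1>
#     </ostree>
#   </contents>
# It also sets <prepatched_iso>Y</prepatched_iso> and clears <requires>.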
def update_metadata_info(metadata, iso_path):
"""Update iso's metadata files
This function updates the metadata files with information from
the patches and the ostree repository.
:param metadata: Path to the metadata file
:param iso_path: Path to the ISO
"""
logger.info("Updating metadata's info...")
# Load XML structure and create base
tree = ET.parse(metadata)
root = tree.getroot()
content = ET.SubElement(root, "contents")
ostree = ET.SubElement(content, "ostree")
add_tag_xml(ostree, "number_of_commits", "1")
# Update prepatched iso field
add_tag_xml(root, "prepatched_iso", "Y")
base_element = ET.SubElement(ostree, "base")
# For now we add empty values here as the software
# expects these fields to be present in the XML
add_tag_xml(base_element, "commit", "")
add_tag_xml(base_element, "checksum", "")
# Get ostree commit
try:
cmd = f"ostree --repo={iso_path}/ostree_repo rev-parse starlingx"
logger.debug('Running command: %s', cmd)
commit_value = subprocess.check_output(cmd, stderr=subprocess.STDOUT,
shell=True).decode(sys.stdout.encoding).strip()
except subprocess.CalledProcessError as e:
raise Exception(e.output)
except Exception as e:
raise Exception(e)
# Get ostree checksum
try:
cmd = (f"ostree --repo={iso_path}/ostree_repo log starlingx"
'| grep -m 1 -i checksum | sed "s/.* //"')
logger.debug('Running command: %s', cmd)
checksum_value = subprocess.check_output(cmd, stderr=subprocess.STDOUT,
shell=True).decode(sys.stdout.encoding).strip()
except subprocess.CalledProcessError as e:
raise Exception(e.output)
except Exception as e:
raise Exception(e)
# Add info in commit1
commit1_element = ET.SubElement(ostree, "commit1")
add_tag_xml(commit1_element, "commit", commit_value)
add_tag_xml(commit1_element, "checksum", checksum_value)
# Remove requires field from metadata
requires = root.find("requires")
if requires is not None:
requires.clear()
# Save metadata file changes
tree.write(metadata)
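# clean_ostree() below parses "ostree log starlingx" output, which looks
# roughly like this (illustrative):
#   commit 3b5f19c2...
#   ContentChecksum:  9c1e7a44...
#   Date:  2024-12-18 18:00:00 +0000
#   ...
# Only the newest commit hash is kept; every older commit is pruned.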
def clean_ostree(ostree_repo_path):
'''
This function takes an ostree repository path as input and deletes all
commits in that repository except for the latest one.
:param ostree_repo_path: Path to the ostree repository
'''
cmd = f"ostree --repo='{ostree_repo_path}' log starlingx"
repo_history = subprocess.check_output(cmd, shell=True).decode(sys.stdout.encoding)
commits = re.findall(pattern=r"^commit\s*(.*)", string=repo_history, flags=re.MULTILINE)
# Delete each commit except the latest one
for commit in commits[1:]:
cmd = f"ostree --repo='{ostree_repo_path}' prune --delete-commit='{commit}'"
_ = subprocess.check_output(cmd, shell=True).decode(sys.stdout.encoding)
cmd = ["ostree", "summary", "--update", f"--repo={ostree_repo_path}"]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
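# High-level flow of main():
#   1. Mount the base ISO, copy its contents (minus ostree_repo) to a
#      temporary folder and copy the base ostree repository next to them.
#   2. Unpack each .patch: metadata.xml, the .deb payload and any optional
#      precheck scripts.
#   3. Publish the .deb files to a local reprepro feed served over HTTP and
#      install them into the ISO's ostree repo with apt-ostree, optionally
#      GPG-signing the new commit.
#   4. Update the metadata, refresh the ostree summary, prune old commits
#      and repack everything into the output ISO.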
def main():
parser = argparse.ArgumentParser(description="Create a valid StarlingX ISO with patches \
already applied.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-i','--iso',type=str,
help="Full path to .iso file to be used as the base.",
required=True)
parser.add_argument('-p','--patch',type=str,
help="""Full path to every .patch file. You can specify more than one.\
e.g.: /localdisk/deploy/starlingx-24.09.1.patch""",
action='append',
required=True)
parser.add_argument('-o','--output',type=str,
help="""Location where the pre-patched iso will be saved. \
e.g.: /localdisk/deploy/prepatch.iso""",
required=True)
parser.add_argument('-v','--verbose',action='store_true',
help="Active debug logging")
parser.add_argument('-b','--base',type=str,
help="Full path to ostree repository to be used as base to the \
pre-patched iso. Default value is: $DEPLOY_DIR/ostree_repo")
parser.add_argument('-g','--sign-gpg',action='store_true',
help="Sign the commit created by apt-ostree using the default \
GPG_HOME from the LAT container.")
args = parser.parse_args()
# Config logging
log_level = logging.INFO
if args.verbose:
log_level = logging.DEBUG
logging.basicConfig(level=log_level)
# Check if every argument is correct
if not os.path.isfile(args.iso):
logger.error(f"ISO file doesn't exist in {args.iso}")
sys.exit(1)
if os.path.isfile(args.output):
logger.error(f"Output file {args.output} already exist, please select another name.")
sys.exit(1)
for patch in args.patch:
if not os.path.isfile(patch):
logger.error(f"Patch file {patch} doesn't exist, please input a valid file.")
sys.exit(1)
path_ostree_repo = f"{DEPLOY_DIR}/ostree_repo"
if args.base:
path_ostree_repo = args.base
if not os.path.isdir(f"{path_ostree_repo}"):
logger.error("Not able to find %s folder.", path_ostree_repo)
sys.exit(1)
# Check if env variables are correctly set
if not MYUNAME:
logger.error("Environment variable UNAME is not correctly set.")
sys.exit(1)
if not PROJECT:
logger.error("Environment variable PROJECT is not correctly set")
sys.exit(1)
if not HTTP_SERVER_IP:
logger.error("Environment variable HTTP_SERVER_IP is not correctly set")
sys.exit(1)
try:
# Create temporary folders to hold the mount point,
# the new iso files and the metadata and debs from patches
logger.info("Creating temporary folders...")
mnt_folder = tempfile.mkdtemp(prefix='mnt_')
iso_folder = tempfile.mkdtemp(prefix='iso_')
ptc_folder = tempfile.mkdtemp(prefix='patch_')
mount_iso(args.iso, mnt_folder)
logger.info('Copying all files from %s to %s', mnt_folder, iso_folder)
# Copy all files, except ostree_repo, from the mount point to the iso temporary folder
cmd = ["rsync", "-a", "--exclude", "ostree_repo/", f'{mnt_folder}/', iso_folder]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# With all files copied, we don't need the mount point anymore
umount_iso(mnt_folder)
# Change permissions on iso folder so we can update the files
os.chmod(iso_folder, 0o777)
# Copy the base ostree repository to the iso folder
logger.info('Copying ostree repository...')
cmd = ["rsync", "-a", f'{path_ostree_repo}/', f"{iso_folder}/ostree_repo"]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# We initialize a reprepro feed in loadbuild because we need to access it
# through an HTTP service
logger.info(f'Setting up package feed in {FEED_PATH}')
cmd = ["apt-ostree", "repo", "init", "--feed", FEED_PATH,
"--release", "bullseye", "--origin", "updates"]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
latest_patch_number = 0
logger.info('Unpacking patches...')
# For every patch we need to extract the metadata.xml and the deb files,
# and save the sw_version and package names to be used with apt-ostree
patches_data = []
for patch in args.patch:
with tempfile.TemporaryDirectory() as extract_folder:
with tarfile.open(patch) as f:
# We extract the metadata.xml from the metadata.tar
f.extract('metadata.tar', f"{extract_folder}/")
metadata_tar = tarfile.open(f"{extract_folder}/metadata.tar")
metadata_tar.extract('metadata.xml', f"{extract_folder}/")
# Get sw_version value and save metadata.xml using sw_version as suffix
xml_root = ET.parse(f"{extract_folder}/metadata.xml").getroot()
sw_version = xml_root.find('sw_version').text
component = xml_root.find('component').text
os.makedirs(f"{ptc_folder}/{sw_version}/metadata")
metadata_path = (f"{ptc_folder}/{sw_version}/metadata/{component}-{sw_version}"
"-metadata.xml")
shutil.copy(f"{extract_folder}/metadata.xml", metadata_path)
# From inside software.tar we extract every .deb file
f.extract('software.tar', f"{extract_folder}/")
software_tar = tarfile.open(f"{extract_folder}/software.tar")
software_tar.extractall(f"{ptc_folder}/{sw_version}/debs/")
# Package names in the metadata include version and revision
# (e.g. logmgmt_1.0-1.stx.10); here we keep only the name part
packages = []
for i in xml_root.find('packages').findall('deb'):
packages.append(i.text.split("_")[0])
# Patches can contain precheck scripts; we need to verify whether
# they exist and, if so, copy them to the pre-patched iso.
precheck = False
path_precheck = ''
path_upgrade_utils = ''
if "deploy-precheck" in f.getnames() and "upgrade_utils.py" in f.getnames():
precheck = True
f.extract('deploy-precheck', f"{extract_folder}/")
f.extract('upgrade_utils.py', f"{extract_folder}/")
precheck_folder = f"{ptc_folder}/{sw_version}/precheck"
os.makedirs(f"{precheck_folder}")
path_precheck = f"{precheck_folder}/deploy-precheck"
path_upgrade_utils = f"{precheck_folder}/upgrade_utils.py"
shutil.copy(f"{extract_folder}/deploy-precheck", path_precheck)
shutil.copy(f"{extract_folder}/upgrade_utils.py", path_upgrade_utils)
# Now we save the information we extracted for later use
patches_data.append({
"sw_version": sw_version,
"path": f"{ptc_folder}/{sw_version}",
"packages": packages,
"metadata": metadata_path,
"precheck": precheck,
"path_precheck": path_precheck,
"path_upgrade_utils": path_upgrade_utils
})
# Keep track of the highest patch number among the patches we have
patch_num = int(sw_version.split(".")[-1])
if patch_num > latest_patch_number:
latest_patch_number = patch_num
logger.info(f'Patch {sw_version} unpacked successfully.')
# Here we set up our gpg client if needed
if args.sign_gpg:
setup_gpg_client()
# We delete the patches folder from the base iso and recreate it
# so we can populate it with the metadata from the patches we are using
shutil.rmtree(f"{iso_folder}/patches")
os.mkdir(f"{iso_folder}/patches")
# We remove all the metadata files inside the upgrades folder
for file in glob.glob(f"{iso_folder}/upgrades/*-metadata.xml"):
os.remove(file)
# Now we need to populate the reprepro feed with every deb from every patch;
# after that we install them into the ostree repository
logger.info('Populate ostree repository with .deb files...')
patches_data = sorted(patches_data, key=lambda x:x['sw_version'])
for patch in patches_data:
# Scan the debs/ folder and load every package into the reprepro feed
deb_dir = os.scandir(os.path.join(patch["path"],"debs/"))
for deb in deb_dir:
cmd = ["apt-ostree", "repo", "add", "--feed", FEED_PATH,
"--release", "bullseye", "--component", patch['sw_version'],
os.path.join(f"{patch['path']}/debs/", deb.name)]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# Now, with every deb loaded, we commit them into the ostree repository.
# apt-ostree requires an HTTP connection to access the host files,
# so we give it the full HTTP path using the container IP
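# The assembled command ends up roughly like this (illustrative):
#   apt-ostree compose install --repo <iso_folder>/ostree_repo \
#       [--gpg-key <id>] --branch starlingx \
#       --feed "http://<container-ip>:8088<feed_path> bullseye" \
#       --component <sw_version> <package names>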
full_feed_path = f'\"{HTTP_FULL_ADDR}{FEED_PATH} bullseye\"'
cmd = ["apt-ostree", "compose", "install", "--repo", f"{iso_folder}/ostree_repo"]
# If GPG signing was requested we pass the gpg key to apt-ostree
if args.sign_gpg:
gpg_key = get_yaml_value("gpg.ostree.gpgid")
cmd += ["--gpg-key", gpg_key]
pkgs = " ".join(patch["packages"])
cmd += ["--branch", "starlingx", "--feed", full_feed_path, "--component",
patch['sw_version'], pkgs]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# Check if the patch has precheck scripts; if so, copy them to the upgrades folder
if patch["precheck"]:
shutil.copy(patch["path_precheck"], f"{iso_folder}/upgrades")
shutil.copy(patch["path_upgrade_utils"], f"{iso_folder}/upgrades")
# Copy only the patch metadata with the highest patch version to the ISO
patch_num = int(patch["sw_version"].split(".")[-1])
if latest_patch_number == patch_num:
# Metadata inside upgrades requires ostree information
update_metadata_info(patch["metadata"], iso_folder)
shutil.copy(patch["metadata"], f"{iso_folder}/patches")
shutil.copy(patch["metadata"], f"{iso_folder}/upgrades")
# Update ostree summary
cmd = ["ostree", "summary", "--update", f"--repo={iso_folder}/ostree_repo"]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# Keep only the latest commit in ostree_repo to save storage space
clean_ostree(f"{iso_folder}/ostree_repo")
# Now we get the label and recreate the ISO with the new ostree repository
logger.info('Creating new .iso file...')
instlabel = get_label_from_isolinux_cfg(f"{iso_folder}/isolinux/isolinux.cfg")
create_iso(iso_folder, instlabel, args.output)
# Allow the newly created iso to be read and edited
os.chmod(args.output, 0o777)
logger.info("Pre-patched ISO created sucessfully: %s", args.output)
except Exception as e:
logger.error('create-prepatched-iso failed, see error below:')
logger.error(str(e))
finally:
logger.info('Cleaning temporary folders...')
if mnt_folder:
os.system(f'rm -rf {mnt_folder}')
if iso_folder:
os.system(f'rm -rf {iso_folder}')
if ptc_folder:
os.system(f'rm -rf {ptc_folder}')
# Clean reprepro feed
if os.path.exists(FEED_PATH):
shutil.rmtree(FEED_PATH)
if __name__ == "__main__":
main()