root/build-tools/create-prepatched-iso
Dostoievski Batista f6d192ae31 Clean requires field from metadata
When rehoming a subcloud to a new system controller, the system will
check the metadata to see which patches are in the requires field and
based on that it will try to find the ostree commits for those patches,
that won't work because the pre-patched iso only holds the latest commit
This change erases the values from the requires field when creating a
pre-patched iso.

Test plan:
    PASS - Create pre-patched ISO with one patch
    PASS - Create pre-patched ISO with two patches

Story: 2010676
Task: 51075

Change-Id: I31da4ae2947bedf9460bec41a9dfb81a40986245
Signed-off-by: Dostoievski Batista <dostoievski.albinobatista@windriver.com>
2024-09-25 19:17:11 -03:00

494 lines
20 KiB
Python
Executable File

#!/usr/bin/python3
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copyright (C) 2024 Wind River Systems,Inc
import argparse
import glob
import logging
import os
import shutil
import subprocess
import sys
import tarfile
import tempfile
import xml.etree.ElementTree as ET
import yaml
BASE_BULLSEYE_PATH = os.path.join(os.environ.get('MY_REPO_ROOT_DIR'),
"stx-tools/debian-mirror-tools/config/debian/common/base-bullseye.yaml")
GPG_HOME = "/tmp/.lat_gnupg_root"
HTTP_SERVER_IP = os.environ.get('HTTP_CONTAINER_IP')
HTTP_FULL_ADDR = f"http://{HTTP_SERVER_IP}:8088"
LAT_SDK_SYSROOT = "/opt/LAT/SDK/sysroots/x86_64-wrlinuxsdk-linux"
MYUNAME = os.environ.get('MYUNAME')
PROJECT = os.environ.get('PROJECT')
FEED_PATH = f"/localdisk/loadbuild/{MYUNAME}/{PROJECT}/patches_feed"
logger = logging.getLogger('create-prepatched-iso')
def get_label_from_isolinux_cfg(path_to_file):
"""Get the iso label from the isolinux.cfg.
This file is not usually formatted so we need to find the exact line
where the value is.
:param path_to_file: Full path name to isolinux.cfg file
:returns: The instiso value
"""
logger.info("Getting instiso label from: %s" % path_to_file)
try:
with open(path_to_file, 'r') as file:
iso_label = None
split_line = []
for line in file:
if 'instiso=' in line:
split_line = line.split()
break
for item in split_line:
if 'instiso=' in item:
split_item = item.split('=')
iso_label = split_item[1]
break
return iso_label
except Exception as e:
logger.error(str(e))
raise Exception(e)
def create_iso(iso_directory, iso_label, output_path):
"""Create a new ISO or overwrite existing ISO
:param iso_directory: Path to files to be part of the ISO
:param iso_label: Value to be usad as volume ID
:param output_path: Path where .iso will be saved
"""
logger.info("Packing new ISO")
try:
# Here we use mkisofs command to create the iso, the parameters
# are so the iso is created with eltorito header and on ISO 9660 format
cmd = ["mkisofs",
"-o", output_path,
"-A", iso_label,
"-V", iso_label,
"-U", "-J",
"-joliet-long",
"-r",
"-iso-level", "2",
"-b", "isolinux/isolinux.bin",
"-c", "isolinux/boot.cat",
"-no-emul-boot",
"-boot-load-size", "4",
"-boot-info-table",
"-eltorito-alt-boot",
"-eltorito-platform", "0xEF",
"-eltorito-boot", "efi.img",
"-no-emul-boot",
iso_directory
]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# Making the iso EFI bootable
cmd = ["isohybrid", "--uefi", output_path]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# Implant new checksum, required for ISO9660 image
cmd = ["implantisomd5", output_path]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
except Exception as e:
logger.error(str(e))
raise Exception(e)
def mount_iso(iso_to_mount, path_to_mount):
"""Tries to mount the ISO in a directory
:param path_to_mount: Path to directory where iso will be mounted
"""
logger.info("Mounting ISO on: %s" % path_to_mount)
if not os.path.isfile(iso_to_mount):
raise Exception("ISO not found: %s" % iso_to_mount)
if not os.path.exists(path_to_mount):
raise Exception("Mount path not found: %s" % path_to_mount)
# We try to mount the iso in the folder
try:
cmd = ["mount", "-o", "loop", iso_to_mount, path_to_mount]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
except Exception as e:
logger.error(str(e))
raise Exception(e)
else:
logger.info("ISO sucessfully mounted")
def umount_iso(mount_point):
"""Tries to umount ISO from directory
:param mount_pount: Path where the mount is on
"""
logger.info("Unmounting: %s" % mount_point)
try:
cmd = ["umount", "-l", mount_point]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
except Exception as e:
logger.error("Unmounting failed")
logger.error(str(e))
else:
logger.info("Unmounted sucessfully")
def get_yaml_value(keys_to_get):
"""Load debian base template and get value from specific key
:param keys_to_get: Name of the key
:returns: Value from the key
"""
with open(BASE_BULLSEYE_PATH) as stream:
try:
keys = keys_to_get.split('.')
data = yaml.safe_load(stream)
for key in keys:
data = data.get(key)
if data is None:
logger.error("keys sequence '%s' not found in %s",
keys_to_get, BASE_BULLSEYE_PATH)
sys.exit(1)
except FileNotFoundError:
logger.error("%s not found", BASE_BULLSEYE_PATH)
sys.exit(1)
return data
def setup_gpg_client():
"""Setup configuration for the GPG client
First we check if GPG configuration folder exist (GPG_HOME)
if it doesn't exist we set it up then we set the env variable
for the GPG client. This is usually not needed because lat sdk
create this folder to us but this is not always the case.
"""
ostree_gpg_id = get_yaml_value("gpg.ostree.gpgid")
ostree_gpg_key = get_yaml_value("gpg.ostree.gpgkey")
ostree_gpg_pass = get_yaml_value("gpg.ostree.gpg_password")
if not os.path.exists(GPG_HOME):
logger.info("GPG home (%s) doesn't exist, creating...", GPG_HOME)
os.environ["OECORE_NATIVE_SYSROOT"] = LAT_SDK_SYSROOT
os.makedirs(GPG_HOME)
cmd = f"chmod 700 {GPG_HOME}"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
cmd = f"echo allow-loopback-pinentry > {GPG_HOME}/gpg-agent.conf"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
cmd = f"gpg-connect-agent --homedir {GPG_HOME} reloadagent /bye"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
cmd = f"gpg --homedir {GPG_HOME} --import {ostree_gpg_key}"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
cmd = f"gpg --homedir {GPG_HOME} --list-keys {ostree_gpg_id}"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
cmd = f"gpg --homedir={GPG_HOME} -o /dev/null -u \"{ostree_gpg_id}\" --pinentry=loopback \
--passphrase {ostree_gpg_pass} -s /dev/null"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
os.environ["GNUPGHOME"] = GPG_HOME
logger.info("GPG homedir created with success.")
else:
logger.info("GPG home (%s) folder already exist.", GPG_HOME)
cmd = f"gpg --homedir={GPG_HOME} -o /dev/null -u \"{ostree_gpg_id}\" --pinentry=loopback \
--passphrase {ostree_gpg_pass} -s /dev/null"
logger.debug('Running command: %s', cmd)
subprocess.call([cmd], shell=True)
os.environ["GNUPGHOME"] = GPG_HOME
def add_tag_xml(parent, name, text):
"""Add tag with text to a parent tag
Utility function that helps us create XML tags inside another
tag with a value inside it without repeating ourselves too much.
:param parent: XML parent tag
:param name: Name of the tag
:param text: Value inside the tag
"""
tag = ET.SubElement(parent, name)
tag.text = text
def update_metadata_info(metadata, iso_path):
"""Update iso's metadata files
This function updates the metadata files with information from
the patches and the ostree repository.
:param metadata: Path to the metadata file
:param iso_path: Path to the ISO
"""
logger.info("Updating metadata's info...")
# Load XML structure and create base
tree = ET.parse(metadata)
root = tree.getroot()
content = ET.SubElement(root, "contents")
ostree = ET.SubElement(content, "ostree")
add_tag_xml(ostree, "number_of_commits", "1")
# Update prepatched iso field
add_tag_xml(root, "prepatched_iso", "Y")
base_element = ET.SubElement(ostree, "base")
# For now we add empty values here as the software
# expect this fields to be in the XML
add_tag_xml(base_element, "commit", "")
add_tag_xml(base_element, "checksum", "")
# Get ostree commit
try:
cmd = f"ostree --repo={iso_path}/ostree_repo rev-parse starlingx"
logger.debug('Running command: %s', cmd)
commit_value = subprocess.check_output(cmd, stderr=subprocess.STDOUT,
shell=True).decode(sys.stdout.encoding).strip()
except subprocess.CalledProcessError as e:
raise Exception(e.output)
except Exception as e:
raise Exception(e)
# Get ostree checksum
try:
cmd = (f"ostree --repo={iso_path}/ostree_repo log starlingx"
'| grep -m 1 -i checksum | sed "s/.* //"')
logger.debug('Running command: %s', cmd)
checksum_value = subprocess.check_output(cmd, stderr=subprocess.STDOUT,
shell=True).decode(sys.stdout.encoding).strip()
except subprocess.CalledProcessError as e:
raise Exception(e.output)
except Exception as e:
raise Exception(e)
# Add info in commit1
commit1_element = ET.SubElement(ostree, "commit1")
add_tag_xml(commit1_element, "commit", commit_value)
add_tag_xml(commit1_element, "checksum", checksum_value)
# Remove requires field from metadata
requires = root.find("requires")
if requires is not None:
requires.clear()
# Save metadata file changes
tree.write(metadata)
def main():
parser = argparse.ArgumentParser(description="Create a valid StarlingX ISO with patches \
already applied.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-i','--iso',type=str,
help="Full path to .iso file to be used as the base.",
required=True)
parser.add_argument('-p','--patch',type=str,
help="""Full path to every .patch file. You can specify more than one.\
e.g.: /localdisk/deploy/starlingx-24.09.1.patch""",
action='append',
required=True)
parser.add_argument('-o','--output',type=str,
help="""Location where the pre-patched iso will be saved. \
e.g.: /localdisk/deploy/prepatch.iso""",
required=True)
parser.add_argument('-v','--verbose',action='store_true',
help="Active debug logging")
args = parser.parse_args()
# Config logging
log_level = logging.INFO
if args.verbose:
log_level = logging.DEBUG
logging.basicConfig(level=log_level)
# Check if every argument is correct
if not os.path.isfile(args.iso):
logger.error(f"ISO file doesn't exist in {args.iso}")
sys.exit(1)
if os.path.isfile(args.output):
logger.error(f"Output file {args.output} already exist, please select another name.")
sys.exit(1)
for patch in args.patch:
if not os.path.isfile(patch):
logger.error(f"Patch file {patch} doesn't exist, please input a valid file.")
sys.exit(1)
# Check if env variables are correctly set
if not MYUNAME:
logger.error("Environment variable UNAME is not correctly set.")
sys.exit(1)
if not PROJECT:
logger.error("Environment variable PROJECT is not correctly set")
sys.exit(1)
if not HTTP_SERVER_IP:
logger.error("Environment variable HTTP_SERVER_IP is not correctly set")
sys.exit(1)
try:
# Create temporary folders to hold the mount point,
# the new iso files and the metadata and debs from patches
logger.info("Creating temporary folders...")
mnt_folder = tempfile.mkdtemp(prefix='mnt_')
iso_folder = tempfile.mkdtemp(prefix='iso_')
ptc_folder = tempfile.mkdtemp(prefix='patch_')
mount_iso(args.iso, mnt_folder)
logger.info('Copying all files from %s to %s', mnt_folder, iso_folder)
# Copy all files from the mount point to the iso temporary folder
cmd = ["rsync", "-a", f'{mnt_folder}/', iso_folder]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# With all files copied, we don't need the mount point anymore
umount_iso(mnt_folder)
# Change permissions on iso folder so we can update the files
os.chmod(iso_folder, 0o777)
# We initiate a reprepo feed in loadbuild because we need to access it
# through a http service
logger.info(f'Setting up package feed in {FEED_PATH}')
cmd = ["apt-ostree", "repo", "init", "--feed", FEED_PATH,
"--release", "bullseye", "--origin", "updates"]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
latest_patch_number = 0
logger.info('Unpacking patches...')
# For every patch we need to extract the metadata.xml, the deb files
# and save the sw_version and packages names to be used on apt-ostree
patches_data = []
for patch in args.patch:
with tempfile.TemporaryDirectory() as extract_folder:
with tarfile.open(patch) as f:
# We extract the metadata.xml from the metadata.tar
f.extract('metadata.tar', f"{extract_folder}/")
metadata_tar = tarfile.open(f"{extract_folder}/metadata.tar")
metadata_tar.extract('metadata.xml', f"{extract_folder}/")
# Get sw_version value and save metadata.xml using sw_version as sufix
xml_root = ET.parse(f"{extract_folder}/metadata.xml").getroot()
sw_version = xml_root.find('sw_version').text
os.makedirs(f"{ptc_folder}/{sw_version}/metadata")
metadata_path = (f"{ptc_folder}/{sw_version}/metadata/starlingx-{sw_version}"
"-metadata.xml")
shutil.copy(f"{extract_folder}/metadata.xml", metadata_path)
# From inside software.tar we extract every .deb file
f.extract('software.tar', f"{extract_folder}/")
software_tar = tarfile.open(f"{extract_folder}/software.tar")
software_tar.extractall(f"{ptc_folder}/{sw_version}/debs/")
# Packages names need to include version and revision
# e.g.: logmgmt_1.0-1.stx.10
packages = []
for i in xml_root.find('packages').findall('deb'):
packages.append(i.text.split("_")[0])
# Now we save the information we extract for later use
patches_data.append({
"sw_version": sw_version,
"path": f"{ptc_folder}/{sw_version}",
"packages": packages,
"metadata": metadata_path
})
# Save the biggest version from the patches we have
patch_num = int(sw_version.split(".")[-1])
if patch_num > latest_patch_number:
latest_patch_number = patch_num
logger.info(f'Patch {sw_version} unpacked sucessfully.')
# Here we setup our gpg client
setup_gpg_client()
# We delete the patches folder from the base iso and recreate it
# so we may populate with the metadatas from the patches we are using
shutil.rmtree(f"{iso_folder}/patches")
os.mkdir(f"{iso_folder}/patches")
# We clean all the metadatas inside upgrades folder
for file in glob.glob(f"{iso_folder}/upgrades/*-metadata.xml"):
os.remove(file)
# Now we need to populate reprepo feed with every deb from every patch
# after that we install it on the ostree repository
logger.info('Populate ostree repository with .deb files...')
for patch in patches_data:
# Scan /debs/ folder and load every patch to the reprepo feed
deb_dir = os.scandir(os.path.join(patch["path"],"debs/"))
for deb in deb_dir:
cmd = ["apt-ostree", "repo", "add", "--feed", FEED_PATH,
"--release", "bullseye", "--component", patch['sw_version'],
os.path.join(f"{patch['path']}/debs/", deb.name)]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# Now with every deb loaded we commit it in the ostree repository
# apt-ostree requires an http connection to access the host files
# so we give the full http path using the ip
full_feed_path = f'\"{HTTP_FULL_ADDR}{FEED_PATH} bullseye\"'
gpg_key = get_yaml_value("gpg.ostree.gpgid")
pkgs = " ".join(patch["packages"])
cmd = ["apt-ostree", "compose", "install", "--repo", f"{iso_folder}/ostree_repo",
"--gpg-key", gpg_key, "--branch", "starlingx", "--feed", full_feed_path,
"--component", patch['sw_version'], pkgs]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# Copy only the patch metadata with the biggest patch version to ISO
patch_num = int(patch["sw_version"].split(".")[-1])
if latest_patch_number == patch_num:
# Metadata inside upgrades requires ostree information
update_metadata_info(patch["metadata"], iso_folder)
shutil.copy(patch["metadata"], f"{iso_folder}/patches")
shutil.copy(patch["metadata"], f"{iso_folder}/upgrades")
# Update ostree summary
cmd = ["ostree", "summary", "--update", f"--repo={iso_folder}/ostree_repo"]
logger.debug('Running command: %s', cmd)
subprocess.check_call(cmd, shell=False)
# TODO(dalbinob): Remember to copy only the latest ostree commit
# Now we get the label and re create the ISO with the new ostree
logger.info('Creating new .iso file...')
instlabel = get_label_from_isolinux_cfg(f"{iso_folder}/isolinux/isolinux.cfg")
create_iso(iso_folder, instlabel, args.output)
# Allow to edit and read the newly created iso
os.chmod(args.output, 0o777)
logger.info("Pre-patched ISO created sucessfully: %s", args.output)
except Exception as e:
logger.error('create-prepatched-iso failed, see error below:')
logger.error(str(e))
finally:
logger.info('Cleaning temporary folders...')
if mnt_folder:
os.system(f'rm -rf {mnt_folder}')
if iso_folder:
os.system(f'rm -rf {iso_folder}')
if ptc_folder:
os.system(f'rm -rf {ptc_folder}')
# Clean reprepro feed
if os.path.exists(FEED_PATH):
shutil.rmtree(FEED_PATH)
if __name__ == "__main__":
main()