update/sw-patch/cgcs-patch/cgcs_make_patch/make_patch.py
Leonardo Fagundes Luz Serrano 532644c769 make_patch: add tombstone for missing commit
If the reference repo used as basis for a patch
has incomplete history, then there must be
a "tombstone" for the latest missing commit.
Otherwise some ostree operations fail, such
as removing patches from the running system.

Test Plan:
pass - patch contains tombstone
pass - patch removal

Closes-Bug: 2086705

Change-Id: I6a4609e3f668702a15d40cac9e65c13d07850821
Signed-off-by: Leonardo Fagundes Luz Serrano <Leonardo.FagundesLuzSerrano@windriver.com>
2024-11-05 17:43:45 -03:00

1042 lines
43 KiB
Python
Executable File

#!/usr/bin/env python3
#
# Copyright (c) 2023 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
"""
Creates Debian patches
Steps to create a debian patch:
1) Export env variables, e.g:
export PROJECT='stx-debian-desktop'
export STX_BUILD_HOME='/localdisk/designer/${USER}/${PROJECT}'
2) Run prepare to create a clone of the STX_BUILD_HOME ostree_repo
./make_patch.py prepare --clone-repo ostree_test
3) Make changes to the environment
- update a package
- build-pkgs -c -p <package name>
- build-image
This will create a new commit in the build ostree_repo
4) Create your patch.xml (samples provided)
- patch_recipe_rr_sample.xml
- patch_recipe_insvc_sample.xml
5) Run create to build the patch
./make_patch.py create --patch-recipe patch_recipe_rr_sample.xml \
--clone-repo ostree_test
Once the script is done the .patch file can be located at:
$STX_BUILD_HOME/localdisk/deploy/
"""
import argparse
import filecmp
import hashlib
import logging
import re
import tarfile
import time
import tempfile
import os
import shutil
import subprocess
import sys
import yaml
import xml.etree.ElementTree as ET
from xml.dom import minidom
# Signing function
sys.path.insert(0, "../../cgcs-patch")
from cgcs_patch.patch_signing import sign_files # noqa: E402 pylint: disable=wrong-import-position
from cgcs_patch.patch_verify import verify_files # noqa: E402 pylint: disable=wrong-import-position
# Patch lifecycle names mapped to the short status codes used in the
# generated metadata ('DEV' is the default when a recipe gives no STATUS).
PATCH_STATUS = {
'release': 'REL',
'obsolete': 'OBS',
'development': 'DEV'
}
# Tags accepted under <PATCH_RECIPE><METADATA> when parsing a patch recipe.
# REQUIRES and RESTART_SCRIPT get dedicated parsing; the remaining tags are
# stored as plain text.
METADATA_TAGS = ['ID', 'SW_VERSION', 'SUMMARY', 'DESCRIPTION', 'INSTALL_INSTRUCTIONS', 'WARNINGS', 'STATUS',
'UNREMOVABLE', 'REBOOT_REQUIRED', 'REQUIRES', 'RESTART_SCRIPT', 'APPLY_ACTIVE_RELEASE_ONLY']
# Configure the root logger at import time: DEBUG level, timestamps with
# millisecond precision, plus the module and function that emitted the record.
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
)
# Logger shared by every function and class in this module.
log = logging.getLogger("make_patch")
def get_md5(path):
    """
    Compute the md5sum of a file and return it as an integer
    :param path: Path to file
    :return: The md5 hex digest of the file content parsed as a base-16 int
    """
    digest = hashlib.md5()
    chunk_size = 8192
    # Read in fixed-size chunks so large files are never fully loaded in memory
    with open(path, 'rb') as stream:
        while True:
            data = stream.read(chunk_size)
            if not data:
                break
            digest.update(data)
    return int(digest.hexdigest(), 16)
class PatchError(Exception):
    """Root of the patch exception hierarchy; remembers its message text."""

    def __init__(self, message=None):
        super().__init__(message)
        self.message = message

    def __str__(self):
        # A missing or empty message is rendered as an empty string
        if self.message:
            return self.message
        return ""
class PatchBuilderInitError(PatchError):
    """Raised when a PatchBuilder cannot be initialized."""
class PatchRecipeXMLFail(PatchError):
    """Raised when a patch recipe XML has an unknown tag or invalid content."""
class PatchInvalidStatus(PatchError):
    """Raised for an invalid patch status."""
class PatchModifyError(PatchError):
    """Raised when modifying an existing patch fails."""
class PatchValidationFailure(PatchError):
    """Raised when a patch fails validation."""
class PatchRecipeData(object):
    """
    Patch data read from a patch recipe XML file
    The expected layout is a root element containing a single kind of child,
    <METADATA>, whose children are the tags listed in METADATA_TAGS.
    """

    def __init__(self):
        self.patch_id = None
        self.sw_version = None
        self.requires = []
        self.metadata = {}
        self.restart_script = {}
        self.ostree_content = {}

    @staticmethod
    def __recipe_error(msg):
        """
        Log a recipe problem and build the exception describing it
        log.error is used rather than log.exception because these problems are
        detected outside of any exception handler, so there is no traceback to log.
        :param msg: Description of the problem
        :return: PatchRecipeXMLFail ready to be raised by the caller
        """
        log.error(msg)
        return PatchRecipeXMLFail(msg)

    def __parse_root(self, e):
        """
        Parse the recipe root element and fill patch_id / sw_version
        :param e: Root element of the recipe
        :raises PatchRecipeXMLFail: unknown child tag, or missing ID / SW_VERSION
        """
        for child in e:
            if child.tag != "METADATA":
                raise self.__recipe_error(f"Unknown tag '{child.tag}' under <PATCH_RECIPE>")
            self.__parse_metadata(child)

        if "ID" not in self.metadata:
            raise self.__recipe_error("patch is missing required field <PATCH_RECIPE><METADATA><ID>")
        self.patch_id = self.metadata["ID"]
        # Update restart script name - prefix with patch_id
        if self.restart_script:
            self.restart_script["metadata_name"] = self.patch_id + "_" \
                + self.restart_script["name"]

        if "SW_VERSION" not in self.metadata:
            raise self.__recipe_error(
                f"patch '{self.patch_id}' is missing required field <PATCH_RECIPE><METADATA><SW_VERSION>")
        self.sw_version = self.metadata["SW_VERSION"]

    def __parse_metadata(self, e):
        """
        Parse the children of <METADATA>
        :param e: The <METADATA> element
        :raises PatchRecipeXMLFail: if a child tag is not in METADATA_TAGS
        """
        for child in e:
            if child.tag == "REQUIRES":
                self.__parse_requires(child)
            elif child.tag == "RESTART_SCRIPT":
                self.__parse_restart_script(child)
            elif child.tag in METADATA_TAGS:
                self.metadata[child.tag] = child.text.strip() if child.text else ""
            else:
                raise self.__recipe_error(f"Unknown tag '{child.tag}' under <PATCH_RECIPE><METADATA>")

    def __parse_restart_script(self, e):
        """
        Record the restart script referenced by <RESTART_SCRIPT>
        :param e: The <RESTART_SCRIPT> element; its text is replaced by the stripped path
        :raises PatchRecipeXMLFail: if the tag is empty or the file does not exist
        """
        # An empty tag has text None; treat it as a missing script instead of crashing
        script_path = e.text.strip() if e.text else ""
        e.text = script_path
        if not os.path.isfile(script_path):
            raise self.__recipe_error(f"Restart script not found {script_path}")
        self.restart_script["name"] = os.path.basename(script_path)
        self.restart_script["full_path"] = script_path

    def __parse_requires(self, e):
        """
        Collect the patch ids listed under <REQUIRES>
        :param e: The <REQUIRES> element
        :raises PatchRecipeXMLFail: unknown child tag, or an <ID> without text
        """
        for child in e:
            if child.tag != "ID":
                raise self.__recipe_error(
                    f"Unknown tag '{child.tag}' under <PATCH_RECIPE><METADATA><REQUIRES>")
            # Whitespace-only text is as invalid as no text at all
            req = child.text.strip() if child.text else ""
            if not req:
                raise self.__recipe_error("Patch id missing under <PATCH_RECIPE><METADATA><REQUIRES><ID>")
            self.requires.append(req)

    def parse_xml(self,
                  filename):
        """
        Parse an individual patch recipe XML file
        :param filename: XML file
        :return: Patch ID
        :raises PatchRecipeXMLFail: if the recipe content is invalid
        """
        tree = ET.parse(filename)
        root = tree.getroot()
        self.__parse_root(root)
        log.info("patch_id: %s", str(self.patch_id))
        log.info("metadata: %s", str(self.metadata))
        return self.patch_id
class PatchBuilder(object):
"""
Patch Builder - Create debian patches based on ostree changes
"""
def __init__(self, delta_dir="delta_dir"):
    """
    Set up the paths and state used while building a patch
    :param delta_dir: Name of the delta directory, relative to the deploy dir
    :raises KeyError: if the STX_BUILD_HOME environment variable is not set
    """
    try:
        build_home = os.environ["STX_BUILD_HOME"]
    except KeyError:
        # The environment lookup is the only step that can fail here; report
        # it and let the original exception propagate to the caller.
        log.error("Error while initializing PatchBuilder: STX_BUILD_HOME is not set")
        raise
    # ostree repo location
    self.deploy_dir = os.path.join(build_home, "localdisk/deploy")
    self.ostree_repo = os.path.join(self.deploy_dir, "ostree_repo")
    self.delta_dir = delta_dir
    self.detached_signature_file = "signature.v2"
    self.restart_script = None
    self.patch_path = None
    self.patch_data = None
    self.patch_file_name = None
    self.ostree_content = None
def __add_text_tag_to_xml(self, parent, name, text):
    """
    Append a child element holding only text to an XML element
    :param parent: Element that receives the new child
    :param name: Tag name of the new child
    :param text: Text content of the new child
    :return: The newly created child element
    """
    element = ET.SubElement(parent, name)
    element.text = text
    return element
def __gen_xml(self, file_name="metadata.xml"):
    """
    Generate patch metadata XML file
    The document is built from self.patch_data and self.ostree_content and
    written pretty-printed to file_name.
    :param file_name: Path to output file
    :raises KeyError: if the recipe metadata lacks SUMMARY, DESCRIPTION,
        INSTALL_INSTRUCTIONS, WARNINGS, UNREMOVABLE or REBOOT_REQUIRED
    """
    metadata = self.patch_data.metadata
    top = ET.Element("patch")

    self.__add_text_tag_to_xml(top, "id", self.patch_data.patch_id)
    self.__add_text_tag_to_xml(top, "sw_version", self.patch_data.sw_version)
    for tag, key in (("summary", "SUMMARY"),
                     ("description", "DESCRIPTION"),
                     ("install_instructions", "INSTALL_INSTRUCTIONS"),
                     ("warnings", "WARNINGS")):
        self.__add_text_tag_to_xml(top, tag, metadata[key])

    # A recipe without STATUS produces a development patch
    self.__add_text_tag_to_xml(top, "status", metadata.get("STATUS", PATCH_STATUS['development']))

    self.__add_text_tag_to_xml(top, "unremovable", metadata["UNREMOVABLE"])
    self.__add_text_tag_to_xml(top, "reboot_required", metadata["REBOOT_REQUIRED"])

    if "APPLY_ACTIVE_RELEASE_ONLY" in metadata:
        self.__add_text_tag_to_xml(top, "apply_active_release_only", metadata["APPLY_ACTIVE_RELEASE_ONLY"])

    if self.patch_data.restart_script:
        self.__add_text_tag_to_xml(
            top,
            "restart_script",
            self.patch_data.restart_script["metadata_name"]
        )

    # Parse ostree_content
    content = ET.SubElement(top, "contents")
    ostree = ET.SubElement(content, "ostree")

    self.__add_text_tag_to_xml(ostree, "number_of_commits", str(len(self.ostree_content["commits"])))
    base_commit = ET.SubElement(ostree, "base")
    self.__add_text_tag_to_xml(base_commit, "commit", self.ostree_content["base"]["commit"])
    self.__add_text_tag_to_xml(base_commit, "checksum", self.ostree_content["base"]["checksum"])

    # Commits are emitted as <commit1>, <commit2>, ... in list order
    for i, c in enumerate(self.ostree_content["commits"]):
        commit = ET.SubElement(ostree, "commit" + str(i + 1))
        self.__add_text_tag_to_xml(commit, "commit", c["commit"])
        self.__add_text_tag_to_xml(commit, "checksum", c["checksum"])

    req = ET.SubElement(top, 'requires')
    for req_patch in sorted(self.patch_data.requires):
        self.__add_text_tag_to_xml(req, 'req_patch_id', req_patch)

    self.__add_text_tag_to_xml(top, "semantics", "")

    # Save xml; the context manager guarantees the content is flushed and the
    # handle closed before the caller packs the file into a tarball
    tree = ET.tostring(top)
    with open(file_name, "w") as outfile:
        outfile.write(minidom.parseString(tree).toprettyxml(indent=" "))
def __reuse_initramfs(self, rootfs_base_dir, rootfs_new_dir):
    """
    Try to reuse the initramfs file /usr/lib/ostree-boot/initramfs-xxx and its signature
    When the initramfs-trigger package version is the same in both root
    filesystems, the initramfs/initrd files of the new rootfs are replaced by
    the ones from the base rootfs and the checksum-named vmlinuz/initramfs
    files are renamed after the recomputed sha256 checksum.
    Note: this method changes the current working directory.
    :param rootfs_base_dir: original root filesystem
    :param rootfs_new_dir: newest root filesystem
    :return: True if reuse the initramfs.
    :raises Exception: if the new rootfs has no initramfs-trigger package, the
        new initramfs has no install log, or a key boot file is missing
    """
    if "NO_REUSE_INITRAMFS" in os.environ:
        return False

    def trigger_version(rootfs_dir):
        """Return the initramfs-trigger version found in rootfs_dir ('' if none), without the .stx suffix"""
        cmd = f"dpkg --root {rootfs_dir} -l initramfs-trigger | tail -n 1"
        ret = subprocess.check_output(cmd, shell=True).decode(sys.stdout.encoding).strip()
        if not ret:
            return ""
        return ret.split()[2].split(".stx")[0]

    # Compare the version of package initramfs-trigger, not same, not reuse.
    base_trigger_version = trigger_version(rootfs_base_dir)
    new_trigger_version = trigger_version(rootfs_new_dir)

    # If feed rootfs has no package "initramfs-trigger", or the version of this package is not same,
    # do not reuse the initramfs file.
    if not new_trigger_version:
        log.error("New build rootfs %s has no package initramfs-trigger!", rootfs_new_dir)
        raise Exception("New build rootfs has no package initramfs-trigger!")
    if not base_trigger_version or base_trigger_version != new_trigger_version:
        log.info("We can not reuse the initramfs file for the version of initramfs-trigger changed")
        return False

    # unpack two initramfs files, each next to its rootfs
    initramfs_dirs = {}
    for initrd_type, hugefs_dir in [("base", rootfs_base_dir), ("new", rootfs_new_dir)]:
        initramfs_dir = os.path.join(os.path.dirname(hugefs_dir), "initramfs_" + initrd_type)
        initramfs_dirs[initrd_type] = initramfs_dir

        log.info("Unpack %s initramfs into %s", initrd_type, initramfs_dir)
        os.mkdir(initramfs_dir)
        cmd = f"cp {hugefs_dir}/usr/lib/ostree-boot/initramfs-* {initramfs_dir}/initrd.gz"
        subprocess.call([cmd], shell=True)
        cmd = f"gunzip {initramfs_dir}/initrd.gz"
        subprocess.call([cmd], shell=True)
        # cpio extracts into the current directory
        os.chdir(initramfs_dir)
        cmd = "cpio -idm < initrd; rm -f initrd"
        subprocess.call([cmd], shell=True)
    initramfs_base_dir = initramfs_dirs["base"]
    initramfs_new_dir = initramfs_dirs["new"]

    # compare its log file /var/log/rootfs_install.log
    base_install_log = os.path.join(initramfs_base_dir, "var/log/rootfs_install.log")
    new_install_log = os.path.join(initramfs_new_dir, "var/log/rootfs_install.log")
    if not os.path.exists(new_install_log):
        log.error("Log file %s does not exist.", new_install_log)
        raise Exception("Initramfs, install log file does not exist.")
    if not os.path.exists(base_install_log):
        log.info("The feed initramfs file has no install log, please careful.")
    if os.path.exists(base_install_log) and os.path.exists(new_install_log):
        initramfs_delta_dir = os.path.join(os.path.dirname(rootfs_new_dir), "initramfs_delta")
        if filecmp.cmp(base_install_log, new_install_log):
            log.info("Two initramfs have same install log files.")
        else:
            # The difference is only reported; it does not prevent the reuse
            log.warning("install log files of two initramfs are NOT same:")
            log.warning("Log file of feed initramfs: %s", base_install_log)
            log.warning("Log file of new initramfs: %s", new_install_log)
            os.mkdir(initramfs_delta_dir)
            # "-q" keeps the rsync output clean
            subprocess.call(["rsync", "-rpgocq", "--compare-dest", initramfs_base_dir + "/", initramfs_new_dir + "/", initramfs_delta_dir + "/"])
            log.info("The delta folder of two initramfs: %s.", initramfs_delta_dir)

    log.info("Reuse initramfs files to shrink Debian patch...")
    # reuse boot-initramfs files and their signature.
    cmd = " ".join([
        "rm -f",
        os.path.join(rootfs_new_dir, 'usr/lib/ostree-boot/initramfs*'),
        os.path.join(rootfs_new_dir, 'boot/initrd.img*'),
        os.path.join(rootfs_new_dir, 'var/miniboot/initrd-mini*')
    ])
    subprocess.call(cmd, shell=True)
    cmd = " ".join(["cp -a", os.path.join(rootfs_base_dir, 'usr/lib/ostree-boot/initramfs*'), os.path.join(rootfs_new_dir, 'usr/lib/ostree-boot/')])
    subprocess.call(cmd, shell=True)
    cmd = " ".join(["cp -a", os.path.join(rootfs_base_dir, 'var/miniboot/initrd-mini*'), os.path.join(rootfs_new_dir, 'var/miniboot/')])
    subprocess.call(cmd, shell=True)
    cmd = " ".join(["cp -a", os.path.join(rootfs_base_dir, 'boot/initrd.img*'), os.path.join(rootfs_new_dir, 'boot/')])
    subprocess.call(cmd, shell=True)

    # Find and get checksum of necessary images: kernel_rt_file + kernel_file + vmlinuz_file + initramfs_file
    ostree_boot_dir = os.path.join(rootfs_new_dir, 'usr/lib/ostree-boot')
    kernel_rt_file = kernel_file = vmlinuz_file = initramfs_file = ""
    for file_name in os.listdir(ostree_boot_dir):
        file_path = os.path.join(ostree_boot_dir, file_name)
        if not os.path.isfile(file_path):
            continue
        if file_name.endswith(".sig"):
            continue
        if file_name.startswith("vmlinuz-") and file_name.endswith("-amd64"):
            if file_name.find("-rt-") == -1:
                kernel_file = file_path
            else:
                kernel_rt_file = file_path
            continue
        # The checksum-named files carry a 64 character suffix after the prefix
        if file_name.startswith("vmlinuz-") and len(file_name) == len("vmlinuz-") + 64:
            vmlinuz_file = file_path
            continue
        if file_name.startswith("initramfs-") and len(file_name) == len("initramfs-") + 64:
            initramfs_file = file_path
            continue

    # Any file missed, raise exception.
    if not kernel_rt_file or not kernel_file or not vmlinuz_file or not initramfs_file:
        log.error("Miss key file when calculate sha256 checksum")
        log.info("RT kernel image: %s", kernel_rt_file)
        log.info("STD kernel image: %s", kernel_file)
        log.info("vmlinuz image: %s", vmlinuz_file)
        log.info("initramfs image: %s", initramfs_file)
        raise Exception("Miss key file when calculate sha256 checksum")

    # Order: std, rt, vmlinuz, initramfs
    cmd = " ".join(["cat", kernel_file, kernel_rt_file, vmlinuz_file, initramfs_file, "| sha256sum | cut -d' ' -f 1"])
    new_checksum = subprocess.check_output(cmd, shell=True).decode(sys.stdout.encoding).strip()
    log.info("New checksum is %s", new_checksum)
    vmlinuz_new_name = "vmlinuz-" + new_checksum
    initramfs_new_name = "initramfs-" + new_checksum
    os.rename(vmlinuz_file, os.path.join(ostree_boot_dir, vmlinuz_new_name))
    os.rename(initramfs_file, os.path.join(ostree_boot_dir, initramfs_new_name))

    return True
def __add_latest_missing_ostree_commit_tombstone(self, ref_ostree_repo):
    """
    Add a tombstone for the most recent commit missing from the reference repo
    An ostree repo may hold an incomplete history (eg.: a copy created with
    ostree pull depth 0 only contains the latest commit of the original repo).
    In that case there must be an ostree commit tombstone for the most recent
    missing commit, to signal that it is missing intentionally. Otherwise
    ostree will throw an error for some operations.
    For example, a depth 0 copy of a repo with 5 commits contains only the
    5th commit and MUST have a tombstone for the 4th one.
    This function inspects the history of :ref_ostree_repo: and, if commits
    are missing, creates the matching empty tombstone file under
    self.delta_dir so it is shipped in the patch and is available in the
    running system.
    :param ref_ostree_repo: Reference ostree repo. The one without the patch content.
    """
    cmd = f"ostree --repo={ref_ostree_repo} log starlingx"
    raw_history = subprocess.check_output(cmd, shell=True)
    repo_history = raw_history.decode(sys.stdout.encoding)

    # ostree prints this marker when the history is truncated
    if "History beyond this commit not fetched" not in repo_history:
        # No commits are missing. Nothing to do
        return

    # The parent of the oldest commit listed is the latest missing commit
    parents = re.findall(pattern=r"[Pp]arent:\s*(.*)", string=repo_history)
    if not parents:
        log.warning("Could not add tombstone for latest missing commit. Ostree history does not show commit parents.")
        return
    latest_missing_commit = parents[-1]
    log.info("Latest missing commit in ref ostree repo: %s", latest_missing_commit)

    tombstone_relative_path = f"objects/{latest_missing_commit[:2]}/{latest_missing_commit[2:]}.tombstone-commit"
    tombstone_to_create = os.path.join(self.delta_dir, tombstone_relative_path)
    tombstone_parent_dir = os.path.dirname(tombstone_to_create)
    os.makedirs(tombstone_parent_dir, exist_ok=True)

    # The tombstone is an empty file; opening it for writing is enough
    with open(file=tombstone_to_create, mode='w'):
        pass
    log.info("Added tombstone to the patch: %s", tombstone_relative_path)
def __create_patch_repo(self, clone_dir="ostree-clone"):
    """
    Create the ostree contains delta content
    Used to compare with the feed ostree repo
    :param clone_dir: clone dir name; a relative name is resolved against the deploy dir
    :return: Tuple (patch repo path, initramfs reused). When nothing can be
        reused the build ostree repo itself is returned with False.
    """
    log.info("Creating patch ostree")

    # Resolve the clone location before any chdir below: a relative name
    # would otherwise be looked up inside the work directory. An absolute
    # clone_dir is left untouched by os.path.join.
    clone_dir = os.path.join(self.deploy_dir, clone_dir)

    workdir = os.path.join(self.deploy_dir, "patch_work")
    if os.path.exists(workdir):
        shutil.rmtree(workdir)
    os.mkdir(workdir)
    os.chdir(workdir)

    def checkout_head(label, repo_dir, rootfs_dir):
        """Checkout the head of branch starlingx of repo_dir into rootfs_dir and return its commit id"""
        cmd = f"cat {repo_dir}/refs/heads/starlingx"
        commit_id = subprocess.check_output(cmd, shell=True).decode(sys.stdout.encoding).strip()
        log.info("Checkout commit %s from %s OSTree %s, it may take several minutes.", commit_id[:6], label, repo_dir)
        cmd = f"ostree --repo={repo_dir} checkout {commit_id} {rootfs_dir}"
        log.info("Command line: %s", cmd)
        subprocess.call([cmd], shell=True)
        log.info("Done. Checkout %s root fs in %s", label, rootfs_dir)
        return commit_id

    # Checkout both ostree repos
    rootfs_base_dir = os.path.join(workdir, "rootfs_base")
    commit_id_base = checkout_head("base", clone_dir, rootfs_base_dir)
    rootfs_new_dir = os.path.join(workdir, "rootfs_new")
    checkout_head("new", self.ostree_repo, rootfs_new_dir)

    self.__remove_ssh_host_keys(rootfs_new_dir)

    # Try to reuse files from feed rootfs.
    try:
        initrd_reused = self.__reuse_initramfs(rootfs_base_dir, rootfs_new_dir)
        # Nothing can be reused, just use the self.ostree_repo as the patch repo
        if not initrd_reused:
            log.info("No file can be reused, we can just use the original repo: %s", self.ostree_repo)
            return self.ostree_repo, False
    except Exception as e:
        # Reuse is an optimization only: fall back to the unmodified build repo
        log.exception("Failed on reusing files of feed repo. %s", e)
        return self.ostree_repo, False

    # create patch repo
    tmp_patch_repo_dir = os.path.join(workdir, "patch_repo_tmp")
    patch_repo_dir = os.path.join(workdir, "patch_repo")
    log.info("Create a new OSTree repo in %s, may take a few minutes", patch_repo_dir)
    cmd = f"ostree --repo={tmp_patch_repo_dir} init --mode=bare"
    subprocess.call([cmd], shell=True)
    self.__check_gnupg_folder()

    # Pull history from ostree prepatch (clone_dir)
    log.info("Pull history from %s.", clone_dir)
    cmd = f"ostree --repo={tmp_patch_repo_dir} pull-local {clone_dir}"
    subprocess.call([cmd], shell=True)

    # Commit the modified new rootfs on top of the base commit
    timestamp = time.asctime()
    gpg_id = self.__get_yaml_info('gpg.ostree.gpgid')
    subject = "Commit-id: starlingx-intel-x86-64-" + time.strftime("%Y%m%d%H%M%S", time.localtime())
    cmd = " ".join(["ostree", "--repo=" + tmp_patch_repo_dir, "commit", "--tree=dir=" + rootfs_new_dir,
                    "--skip-if-unchanged", "--gpg-sign=" + gpg_id + " --gpg-homedir=/tmp/.lat_gnupg_root",
                    "--branch=starlingx", "'--timestamp=" + timestamp + "'",
                    "'--subject=" + subject + "'",
                    "'--parent=" + commit_id_base + "'"])
    subprocess.call([cmd], shell=True)

    cmd = f"ostree --repo={patch_repo_dir} init --mode=archive-z2"
    subprocess.call([cmd], shell=True)
    # Pull with depth=1 to get parent data
    cmd = f"ostree --repo={patch_repo_dir} pull-local --depth=1 {tmp_patch_repo_dir}"
    subprocess.call([cmd], shell=True)
    cmd = f"ostree summary -u --repo={patch_repo_dir}"
    subprocess.call([cmd], shell=True)
    log.info("New ostree repo been created: %s", patch_repo_dir)
    log.info(" Based on bare repo %s", tmp_patch_repo_dir)
    log.info(" Based on root filesystem %s", rootfs_new_dir)
    return patch_repo_dir, True
def __remove_ssh_host_keys(self, rootfs_new_dir):
    """
    Remove the ssh host-key files (public and private) from the new rootfs dir
    :param rootfs_new_dir: rootfs dir name
    """
    host_key_files = (
        'usr/etc/ssh/ssh_host/ssh_host_rsa_key.pub',
        'usr/etc/ssh/ssh_host/ssh_host_rsa_key',
        'usr/etc/ssh/ssh_host/ssh_host_ed25519_key.pub',
        'usr/etc/ssh/ssh_host/ssh_host_ed25519_key',
        'usr/etc/ssh/ssh_host/ssh_host_ecdsa_key.pub',
        'usr/etc/ssh/ssh_host/ssh_host_ecdsa_key',
    )
    # "rm -f" keeps quiet when a key file is not present
    for relative_path in host_key_files:
        key_path = os.path.join(rootfs_new_dir, relative_path)
        subprocess.call(" ".join(["rm -f", key_path]), shell=True)
def __check_gnupg_folder(self):
    """
    Make sure the GPG homedir exists, creating and populating it if needed
    The key id, key file and passphrase come from the base-bullseye yaml file.
    """
    gpg_home = "/tmp/.lat_gnupg_root"
    if os.path.exists(gpg_home):
        log.info("GPG home (%s) folder already exist.", gpg_home)
        return

    log.info("Creating %s", gpg_home)
    os.environ["OECORE_NATIVE_SYSROOT"] = "/opt/LAT/SDK/sysroots/x86_64-wrlinuxsdk-linux"
    ostree_gpg_id = self.__get_yaml_info("gpg.ostree.gpgid")
    ostree_gpg_key = self.__get_yaml_info("gpg.ostree.gpgkey")
    ostree_gpg_pass = self.__get_yaml_info("gpg.ostree.gpg_password")
    os.makedirs(gpg_home)

    # Executed in order: restrict the folder, enable loopback pinentry,
    # reload the agent, import the key, list it and do a test signature
    setup_commands = [
        f"chmod 700 {gpg_home}",
        f"echo allow-loopback-pinentry > {gpg_home}/gpg-agent.conf",
        f"gpg-connect-agent --homedir {gpg_home} reloadagent /bye",
        f"gpg --homedir {gpg_home} --import {ostree_gpg_key}",
        f"gpg --homedir {gpg_home} --list-keys {ostree_gpg_id}",
        f"gpg --homedir={gpg_home} -o /dev/null -u \"{ostree_gpg_id}\" --pinentry=loopback --passphrase {ostree_gpg_pass} -s /dev/null",
    ]
    for cmd in setup_commands:
        subprocess.call([cmd], shell=True)
    log.info("GPG homedir created with success.")
def __get_yaml_info(self, keys_to_get):
    """
    Get data from base-bullseye yaml file
    The file is read from
    $MY_REPO_ROOT_DIR/stx-tools/debian-mirror-tools/config/debian/common/.
    The process exits with status 1 if the file or the key sequence is not found.
    :param keys_to_get: keys sequence to get, separated by dots (e.g. 'gpg.ostree.gpgid')
    :return: The value stored under the key sequence
    :raises KeyError: if the MY_REPO_ROOT_DIR environment variable is not set
    """
    yaml_path = os.path.join(os.environ["MY_REPO_ROOT_DIR"],
                             "stx-tools/debian-mirror-tools/config/debian/common/base-bullseye.yaml")
    # open() is what raises FileNotFoundError, so it must be inside the try
    try:
        with open(yaml_path, "r") as stream:
            data = yaml.safe_load(stream)
    except FileNotFoundError:
        log.error("base-bullseye.yaml not found")
        sys.exit(1)

    for key in keys_to_get.split('.'):
        # An empty file or a scalar in the middle of the path means the
        # sequence cannot be followed any further
        data = data.get(key) if isinstance(data, dict) else None
        if data is None:
            log.error("keys sequence '%s' not found in base-bullseye.yaml", keys_to_get)
            sys.exit(1)
    return data
def __create_delta_dir(self, patch_repo_dir, clone_dir="ostree-clone"):
    """
    Creates the ostree delta directory
    Contains the changes from the REPO (updated) and the cloned dir (pre update)
    The process exits with status 1 if the delta dir already exists or the
    clone dir is missing. The current working directory is changed to the
    deploy dir.
    :param patch_repo_dir: ostree repo holding the patch content (updated)
    :param clone_dir: clone dir name
    """
    log.info("Creating ostree delta")

    clone_dir = os.path.join(self.deploy_dir, clone_dir)

    os.chdir(self.deploy_dir)
    # self.delta_dir is 'relative' to self.deploy_dir
    if os.path.isdir(self.delta_dir):
        log.error("Delta dir exists '%s', clean it up and try again", self.delta_dir)
        sys.exit(1)

    if not os.path.isdir(clone_dir):
        log.error("Clone dir not found")
        sys.exit(1)

    # --compare-dest copies only what differs from the clone
    subprocess.call(["rsync", "-rcpgo",
                     "--exclude=/.lock",
                     "--exclude=/config",
                     "--no-owner",
                     "--compare-dest",
                     clone_dir,
                     patch_repo_dir + "/",
                     self.delta_dir + "/"])
    log.info("Delta dir created")

    self.__add_latest_missing_ostree_commit_tombstone(ref_ostree_repo=clone_dir)
def __get_commit_checksum(self, commit_id, repo="ostree_repo"):
    """
    Get the checksum of a commit of branch starlingx
    :param commit_id: index of the commit in the output of ostree log
    :param repo: ostree repo to query
    :return: The checksum text found at that index
    """
    # get all checksums, one per line
    cmd = f"ostree --repo={repo} log starlingx | grep -i checksum | sed \"s/.* //\""
    raw_output = subprocess.check_output(cmd, shell=True)
    checksums = raw_output.decode(sys.stdout.encoding).strip().split("\n")
    return checksums[commit_id]
def __get_commits_from_base(self, base_sha, repo="ostree_repo"):
    """
    List the commits of branch starlingx that come before base_sha in ostree log
    :param base_sha: commit id of the base commit
    :param repo: ostree repo to query
    :return: List of dicts with keys "commit" and "checksum"; empty when the
        first commit of the log is base_sha
    """
    cmd = f"ostree --repo={repo} log starlingx | grep \"^commit\" | sed \"s/.* //\""
    raw_output = subprocess.check_output(cmd, shell=True)
    commits = raw_output.decode(sys.stdout.encoding).strip().split("\n")
    log.info("Patch repo commits %s", commits)

    if commits[0] == base_sha:
        log.info("base and top commit are the same")
        return []

    # walk the log until the base commit shows up
    commits_from_base = []
    for position, commit in enumerate(commits):
        if commit == base_sha:
            break
        log.info("saving commit %s", commit)
        # the checksum is looked up by the position of the commit in the log
        entry = {
            "commit": commit,
            "checksum": self.__get_commit_checksum(position, repo),
        }
        commits_from_base.append(entry)
    return commits_from_base
def __sign_official_patches(self, patch_file):
    """
    Sign a formal patch by running sign_patch_formal.sh
    Called internally once a patch is created and formal flag is set to true
    Failures are logged and not propagated to the caller.
    :param patch_file: full path to the patch file
    """
    log.info("Signing patch %s", patch_file)
    signing_cmd = ["sign_patch_formal.sh", patch_file]
    try:
        subprocess.check_call(signing_cmd)
    except FileNotFoundError:
        log.exception("sign_patch_formal.sh not found, make sure $STX_BUILD_HOME/repo/cgcs-root/build-tools is in the $PATH")
    except subprocess.CalledProcessError as err:
        log.exception("Failed to sign official patch. Call to sign_patch_formal.sh process returned non-zero exit status %i", err.returncode)
def __sign_and_pack(self, patch_file, formal=False):
    """
    Generates the patch signatures and pack the .patch file
    Must be called from the directory holding metadata.tar and software.tar;
    every regular file of that directory is packed into the patch.
    :param patch_file: .patch file full path
    :param formal: when True, also sign the resulting patch with sign_patch_formal.sh
    """
    filelist = ["metadata.tar", "software.tar"]
    # Generate the local signature file: md5 of each file XORed into an all-ones mask
    sig = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
    for f in filelist:
        sig ^= get_md5(f)

    with open("signature", "w") as sigfile:
        sigfile.write("%x" % sig)

    # this comes from patch_functions write_patch
    # Generate the detached signature
    #
    # Note: if cert_type requests a formal signature, but the signing key
    # is not found, we'll instead sign with the "dev" key and
    # need_resign_with_formal is set to True.
    need_resign_with_formal = sign_files(
        filelist,
        self.detached_signature_file,
        cert_type=None)
    log.debug("Formal signing status %s", need_resign_with_formal)

    # Save files into .patch; the context manager closes the archive even
    # if adding a file fails
    files = [f for f in os.listdir('.') if os.path.isfile(f)]
    with tarfile.open(patch_file, "w:gz") as tar:
        for file in files:
            log.info("Saving file %s", file)
            tar.add(file)
    log.info("Patch file created %s", patch_file)

    if formal:
        log.info("Trying to sign formal patch")
        self.__sign_official_patches(patch_file)
def prepare_env(self, clone_repo="ostree-clone"):
    """
    Generates a copy of the current ostree_repo which is used
    to create the delta dir during patch creation
    The process exits with status 1 if the clone directory already exists.
    :param clone_repo: name of the cloned directory
    """
    log.info("Preparing ostree clone directory")
    os.chdir(self.deploy_dir)
    clone_dir = os.path.join(self.deploy_dir, clone_repo)
    if os.path.isdir(clone_dir):
        log.error("Clone directory exists %s", clone_repo)
        sys.exit(1)

    os.mkdir(clone_dir)
    # Close the ref file right away and drop its trailing newline for the log
    with open(os.path.join(self.ostree_repo, "refs/heads/starlingx"), "r") as ref_file:
        current_sha = ref_file.read().strip()
    log.info("Current SHA: %s", current_sha)
    log.info("Cloning the directory...")
    # Clone the ostree_repo dir using hard-links (ignores the .lock file)
    subprocess.call(["rsync", "-a", "--exclude", ".lock", "--link-dest", "../ostree_repo", self.ostree_repo + "/", clone_dir])
    log.info("Prepared ostree repo clone at %s", clone_dir)
def create_patch(self, patch_data: PatchRecipeData, clone_dir="ostree-clone", formal=False):
    """
    Creates a debian patch using ostree delta between 2 repos (rsync)
    The resulting <patch_id>.patch file is written to the deploy dir. The
    process exits with status 0 if no new commit is found on top of the base.
    :param patch_data: PatchRecipeData object
    :param clone_dir: repo cloned before the changes
    :param formal: when True, sign the patch with sign_patch_formal.sh
    """
    self.patch_data = patch_data
    self.patch_file_name = patch_data.patch_id + ".patch"

    os.chdir(self.deploy_dir)
    # read the base sha from the clone/ga directory
    with open(os.path.join(clone_dir, "refs/heads/starlingx"), "r") as ref_file:
        base_sha = ref_file.read().strip()

    log.info("Generating delta ostree repository")
    patch_repo_dir, reuse_initramfs = self.__create_patch_repo(clone_dir=clone_dir)

    log.info("Generating delta dir")
    self.__create_delta_dir(patch_repo_dir, clone_dir=clone_dir)

    # ostree --repo=ostree_repo show starlingx | grep -i checksum | sed "s/.* //"
    cmd = f"ostree --repo={clone_dir} show starlingx | grep -i checksum | sed \"s/.* //\""
    base_checksum = subprocess.check_output(cmd, shell=True).decode(sys.stdout.encoding).strip()
    # Get commits from updated ostree repo
    commits = self.__get_commits_from_base(base_sha, patch_repo_dir)

    if not commits:
        log.info("No changes detected")
        # clean it up delta_dir
        shutil.rmtree(self.delta_dir)
        sys.exit(0)

    self.ostree_content = {
        "base": {
            "commit": base_sha,
            "checksum": base_checksum
        },
        "commits": commits,
    }

    log.info("Generating patch file...")
    # Create software.tar, metadata.tar and signatures
    # Create a temporary working directory
    tmpdir = tempfile.mkdtemp(prefix="patch_")
    # Change to the tmpdir
    os.chdir(tmpdir)
    with tarfile.open("software.tar", "w") as tar:
        tar.add(os.path.join(self.deploy_dir, self.delta_dir), arcname="")

    log.info("Generating xml with ostree content %s", commits)
    self.__gen_xml()
    with tarfile.open("metadata.tar", "w") as tar:
        tar.add("metadata.xml")
    # Only the tarball is shipped; __sign_and_pack packs every file left here
    os.remove("metadata.xml")

    if self.patch_data.restart_script:
        log.info("Saving restart scripts")
        shutil.copyfile(
            self.patch_data.restart_script["full_path"],
            os.path.join(tmpdir, self.patch_data.restart_script["metadata_name"])
        )

    subprocess.call(["ls", "-lhrt"])

    # Sign and create the .patch file
    self.__sign_and_pack(
        os.path.join(self.deploy_dir, self.patch_file_name),
        formal
    )

    os.chdir(self.deploy_dir)
    shutil.rmtree(tmpdir)
    shutil.rmtree(self.delta_dir)

    if reuse_initramfs:
        # If initramfs is reused it needs to update the ostree_repo commit to match the patch_repo
        cmd = (f"ostree --repo={self.ostree_repo} pull-local {patch_repo_dir} starlingx; "
               f"ostree --repo={self.ostree_repo} summary --update")
        try:
            subprocess.check_call([cmd], shell=True)
        except subprocess.CalledProcessError as e:
            # check_call does not capture stderr, so e.stderr is always None;
            # the exception text carries the command and its exit status
            log.exception("Failed pull patch_repo commit into ostree_repo. %s", e)

    log.info("Patch file created %s at %s", self.patch_file_name, self.deploy_dir)
def modify_metadata_text(self, filename, key, value):
    """
    Replace the text of the first element matching 'key' in an xml file
    The file is rewritten pretty-printed through a temporary '<filename>.new'
    file that is then renamed over the original.
    :param filename: xml file to update
    :param key: tag (path) of the element to change, looked up from the root
    :param value: new text of the element
    :raises PatchValidationFailure: if no element matches 'key'
    """
    new_filename = "%s.new" % filename
    tree = ET.parse(filename)

    # Prevent a proliferation of carriage returns when we write this XML back out to file.
    for node in tree.iter():
        if node.text is not None:
            node.text = node.text.rstrip()
        if node.tail is not None:
            node.tail = node.tail.rstrip()

    root = tree.getroot()

    # Make the substitution
    target = root.find(key)
    if target is None:
        msg = "modify_metadata_text: failed to find tag '%s'" % key
        log.error(msg)
        raise PatchValidationFailure(msg)
    target.text = value

    # write the modified file
    pretty_xml = minidom.parseString(ET.tostring(root)).toprettyxml(indent=" ")
    with open(new_filename, 'w') as outfile:
        outfile.write(pretty_xml)
    os.rename(new_filename, filename)
def read_patch(self, path):
    """
    Extract the patch to current dir and validate signature

    Steps, all performed relative to the current working directory:
      1) extract the gzip-compressed tar at 'path'
      2) read the hex value stored in the 'signature' file, XOR it with
         get_md5() of 'metadata.tar' and 'software.tar', and require the
         result to equal the all-ones 128-bit value
      3) require the detached signature file to exist and run
         verify_files() against it
      4) extract 'metadata.tar'

    :param path: path of the .patch file to read
    :raises PatchValidationFailure: if the signature value does not match
        or the detached signature file is missing
    """
    # Open the patch file and extract the contents to the current dir
    # NOTE(review): extractall() trusts the member paths stored in the
    # archive; only use this on patch files from a trusted source.
    with tarfile.open(path, "r:gz") as tar:
        tar.extractall()

    # Checks signature
    with open("signature", "r") as sigfile:
        sig = int(sigfile.read(), 16)

    filelist = ["metadata.tar", "software.tar"]
    expected_sig = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
    for f in filelist:
        sig ^= get_md5(f)

    if sig != expected_sig:
        msg = "Patch failed verification"
        log.error(msg)
        raise PatchValidationFailure(msg)

    # Verify detached signature
    if not os.path.exists(self.detached_signature_file):
        raise PatchValidationFailure("Patch has not been signed")

    sig_valid = verify_files(
        filelist,
        self.detached_signature_file,
        cert_type=None)
    if sig_valid is not True:
        # NOTE(review): the previous code overwrote this result with True,
        # so a failed check was silently accepted. The patch is still
        # accepted here to keep existing workflows working, but the
        # failure is now visible. Confirm whether it should instead raise
        # PatchValidationFailure("Signature check failed").
        log.warning("Detached signature check failed for %s; continuing", path)

    # Extract metadata xml
    with tarfile.open("metadata.tar") as tar:
        tar.extractall()
def write_patch(self, patch_file, formal=False):
    """
    Write files into .patch file and sign

    Packs 'metadata.xml' from the current directory into 'metadata.tar',
    deletes the xml, then hands over to __sign_and_pack.

    :param patch_file: patch file path passed on to __sign_and_pack
    :param formal: formal patch flag passed on to __sign_and_pack
    """
    log.info("Saving patch file")

    # The context manager closes the archive even if adding the xml fails.
    with tarfile.open("metadata.tar", "w") as tar:
        tar.add("metadata.xml")

    # remove the xml
    os.remove("metadata.xml")

    # Sign and create the .patch file
    self.__sign_and_pack(patch_file, formal)
def handle_create(params):
    """
    Build a patch from a patch recipe

    :param params: argparser parameters; reads patch_recipe, delta_dir,
        clone_repo and formal
    :raises FileNotFoundError: if the patch recipe is not an existing file
    """
    log.info("Creating patch")

    # Bail out early when the recipe is missing
    if not os.path.isfile(params.patch_recipe):
        raise FileNotFoundError("Patch recipe not found")

    recipe = PatchRecipeData()
    recipe.parse_xml(params.patch_recipe)

    # continue steps to create a patch
    builder = PatchBuilder(params.delta_dir)
    builder.create_patch(recipe, params.clone_repo, params.formal)
def handle_prepare(params):
    """
    Set up the patching environment by cloning the ostree_repo

    :param params: argparser parameters; reads clone_repo
    """
    log.info("Preparing environment")

    builder = PatchBuilder()
    deploy_dir = builder.deploy_dir
    log.info("Deploy dir %s", deploy_dir)

    clone_name = params.clone_repo
    builder.prepare_env(clone_name)
def handle_modify(params):
    """
    Modify patch status and resigns

    The patch is unpacked into a temporary working directory, its
    metadata 'status' element is updated and the patch is written back to
    the same file.

    :param params: argparser parameters; reads patch_file, status and formal
    :raises FileNotFoundError: if the patch file does not exist
    :raises PatchInvalidStatus: if the status is not a key of PATCH_STATUS
    """
    log.info("Modifying patch %s", params.patch_file)

    if not os.path.isfile(params.patch_file):
        raise FileNotFoundError("Patch file not found")

    if params.status not in PATCH_STATUS:
        raise PatchInvalidStatus(f"Supported status are {PATCH_STATUS}")

    # Resolve the path before changing directory: a relative path would
    # otherwise be looked up (and rewritten) inside the temporary workdir.
    patch_file = os.path.abspath(params.patch_file)

    # Modify patch
    orig_wd = os.getcwd()
    workdir = tempfile.mkdtemp(prefix="patch_modify_")

    try:
        os.chdir(workdir)
        p = PatchBuilder()
        # extract and validate signatures
        p.read_patch(patch_file)
        log.info("Updating patch status to %s", PATCH_STATUS[params.status])
        # Update Status
        p.modify_metadata_text("metadata.xml", "status", PATCH_STATUS[params.status])
        p.write_patch(patch_file, params.formal)
    except PatchModifyError:
        log.exception("Error while modifying patch")
    finally:
        # Leave the working directory before deleting it
        os.chdir(orig_wd)
        shutil.rmtree(workdir)
if __name__ == "__main__":
    # Command line entry point: one sub-command per action
    parser = argparse.ArgumentParser(description="Debian make_patch helper")
    subparsers = parser.add_subparsers(dest="cmd")

    # Prepare env Action
    prepare_parser = subparsers.add_parser(
        "prepare",
        add_help=False,
        description="Prepare environment",
        help="Prepare the environment for patching by creating a clone of ostree_repo",
    )
    prepare_parser.add_argument(
        "-c", "--clone-repo",
        type=str,
        help="Clone repo directory name",
        default=None,
        required=True,
    )

    # Create Patch Action
    create_parser = subparsers.add_parser(
        "create",
        add_help=False,
        description="Create patch",
        help="Create patch, should be executed after changes are done to the environment",
    )
    create_parser.add_argument(
        "-p", "--patch-recipe",
        type=str,
        help="Patch recipe xml",
        required=True,
    )
    create_parser.add_argument(
        "-f", "--formal",
        action="store_true",
        help="Formal patch flag",
    )
    create_parser.add_argument(
        "-d", "--delta-dir",
        type=str,
        help="Delta dir name",
        default="delta-dir",
    )
    create_parser.add_argument(
        "-c", "--clone-repo",
        type=str,
        help="Clone repo directory name",
        default=None,
        required=True,
    )

    # Modify Patch action
    modify_parser = subparsers.add_parser(
        "modify",
        add_help=False,
        description="modify patch status",
        help="Modify patch status - DEV, REL, OBS",
    )
    modify_parser.add_argument(
        "-s", "--status",
        type=str,
        help="Patch status",
        required=True,
    )
    modify_parser.add_argument(
        "-f", "--formal",
        action="store_true",
        help="Formal patch flag",
    )
    modify_parser.add_argument(
        "-pf", "--patch-file",
        type=str,
        help="Patch file",
        required=True,
    )

    args = parser.parse_args()
    log.debug("Args: %s", args)

    # Map each sub-command name to its handler; an absent or unknown
    # command runs nothing, as before.
    handlers = {
        "create": handle_create,
        "prepare": handle_prepare,
        "modify": handle_modify,
    }
    handler = handlers.get(args.cmd)
    if handler is not None:
        handler(args)

    log.info("Done")