Files
root/build-tools/stx/patch/patch-builder
Leonardo Fagundes Luz Serrano 3ae502e380 patch-builder: support for extra content
Adding support for an optional "extra_content" section in the patch xml,
which can have any number of "item" members refering to either files or
directories. These are then put into an "extra.tar" tarball which is
packed inside the patch.

Test Plan:
pass - "extra_content" section is optional
pass - "extra.tar" is included with a variety of files and dirs
pass - pack an ostree_repo inside "extra.tar"
pass - upload patch file in a running system to confirm signature
pass - source-controlled extra content

Story: 2011498
Task: 52894

Change-Id: If4c4d79a044eeaa94743290bcb623c0308fc9a00
Signed-off-by: Leonardo Fagundes Luz Serrano <Leonardo.FagundesLuzSerrano@windriver.com>
2025-10-09 14:56:31 +00:00

400 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
#
# Copyright (c) 2023-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
'''
Builds a Debian patch
'''
import click
import hashlib
import logging
import os
import shutil
import subprocess
import sys
import tarfile
import tempfile
from exceptions import FetchDebsError
import constants
import fetch_debs
import metadata
from signing.patch_signing import sign_files
sys.path.append('..')
import utils
# Patch signature files
DETACHED_SIGNATURE_FILENAME = "signature.v2"
MD5SUM_SIGNATURE_FILENAME = "signature"
# Default patch output directory
DEPLOY_DIR = "/localdisk/deploy"
DEFAULT_PATCH_OUTPUT_DIR = os.path.join(DEPLOY_DIR, "patch_output")
logger = logging.getLogger('patch_builder')
utils.set_logger(logger)
class PatchBuilder(object):
def __init__(self,
patch_recipe_file,
file_name=None,
sign_remote=False):
self.metadata = metadata.PatchMetadata(patch_recipe_file)
self.metadata.parse_input_xml_data()
self.patch_script_paths = self.metadata.patch_script_paths
self.fetch_debs = fetch_debs.FetchDebs()
self.fetch_debs.need_dl_stx_pkgs = self.metadata.stx_packages
self.fetch_debs.need_dl_binary_pkgs = self.metadata.binary_packages
self.patch_name = f'{self.metadata.patch_id}.patch' if file_name == None else file_name
self.sign_remote = sign_remote
self.signing_server = os.environ.get('SIGNING_SERVER')
self.signing_user = os.environ.get('SIGNING_USER')
self.validate_inputs()
def validate_inputs(self):
"""Raise errors if any input is invalid"""
# Formal signature requested without providing auths for signing server
if self.sign_remote and \
(self.signing_server == None or self.signing_user == None):
msg = "Cannot sign patch without signing server info"
logger.error(msg)
raise Exception(msg)
# In-Service patch requested without providing any patch scripts
if self.metadata.reboot_required == 'N' and not any(self.patch_script_paths.values()):
logger.warning("In service patch without any patch scripts!")
def get_md5(self, path):
'''
Utility function for generating the md5sum of a file
:param path: Path to file
'''
md5 = hashlib.md5()
block_size = 8192
with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(block_size), b''):
md5.update(chunk)
return int(md5.hexdigest(), 16)
def build_patch(self):
logger.info(f"Patch patch: {self.patch_name}")
logger.debug("Fetching debs...")
self.fetch_debs.fetch_stx_packages()
self.fetch_debs.fetch_external_binaries()
logger.info("Building patch...")
tmpdir = tempfile.mkdtemp(prefix="patch_")
os.chdir(tmpdir)
tmp_debs_dir = os.path.join(self.fetch_debs.output_dir, "downloads", "binary")
# software tarball
with tarfile.open("software.tar", "w") as software_tar:
for binary_pkg in os.listdir(tmp_debs_dir):
logger.info(f"Saving file {binary_pkg}")
software_tar.add(os.path.join(tmp_debs_dir, binary_pkg), arcname=binary_pkg)
# append deb name into metadata
self.metadata.debs.append(binary_pkg)
# Patch scripts
for script_id, script_path in self.patch_script_paths.items():
if script_path is not None:
self.copy_rename_script(script_path, script_id)
logger.debug(f"Patch script added: {script_id}: {script_path}")
# Copy all activate scripts
if self.metadata.activation_scripts:
for script in self.metadata.activation_scripts:
self.copy_rename_script(path_to_script=script, rename=False)
# Extra content
if self.metadata.extra_content:
logger.info("Adding in extra content...")
with tarfile.open("extra.tar", "w") as extra_tar:
for item in self.metadata.extra_content:
logger.info(f"Adding: {item}")
extra_tar.add(item, arcname=os.path.join("extra", os.path.basename(item)))
# if the patch includes the 'software' package we need to make deploy-precheck
# and upgrade_utils.py from .deb file accessible directly from patch file
if 'software' in self.metadata.stx_packages:
logger.info(f"Patch includes the software package, getting scripts from deb file...")
# create temporary folder to hold our files until we copy them to the patch
tmp_folder = tempfile.mkdtemp(prefix='deb_')
# Collect files
files_to_get = [constants.PRECHECK_SCRIPTS["DEPLOY_PRECHECK"],
constants.PRECHECK_SCRIPTS["UPGRADE_UTILS"]]
path_files = self.get_files_from_deb(tmp_debs_dir, tmp_folder, 'software', files_to_get)
for path in path_files:
self.copy_rename_script(path_to_script=path, rename=False)
# removing the temporary folder
shutil.rmtree(tmp_folder)
# Generate metadata.xml
logger.debug("Generating metadata file")
self.metadata.generate_patch_metadata("metadata.xml")
tar = tarfile.open("metadata.tar", "w")
tar.add("metadata.xml")
tar.close()
os.remove("metadata.xml")
# Pack .patch file
self.__sign_and_pack(self.patch_name)
def copy_rename_script(self, path_to_script, script_type=None, rename=True):
'''
Copy the script to the directory we are in and rename based
on PATCH_SCRIPT, if necessary.
:param path_to_script: Path to the script
:param script_type: Type of the script from the constant PATCH_SCRIPTS
:param rename: Select if we should
'''
if not os.path.isfile(path_to_script):
msg = f"Patch script not found: {path_to_script}"
logger.error(msg)
raise FileNotFoundError(msg)
# check if need a rename or not
if rename:
if script_type == None or script_type not in constants.PATCH_SCRIPTS.keys():
msg = f"Not a valid script type: {script_type}"
logger.error(msg)
raise Exception(msg)
# We check the type to correctly rename the file to a expected value
script_name = constants.PATCH_SCRIPTS[script_type]
logger.info(f"Renaming {path_to_script} to {script_name}")
shutil.copy(path_to_script, f"./{script_name}")
else:
logger.info(f"Copying {path_to_script}...")
shutil.copy(path_to_script, "./")
def get_files_from_deb(self, download_dir, tmp_folder, package_name, files):
'''
Get files from inside the .deb and make it available in temporary folder
:param download_dir: Full path of directory where the deb is downloaded
:param tmp_folder: Temporary folder where file will be available
:param package_name: Name of the package
:param files: List of name of the files to be extracted
:returns list: full path for the script file
'''
# from download dir, search for {package_name}_*.deb package
pkg_name = None
for file in os.listdir(download_dir):
if file.startswith(f'{package_name}_') and file.endswith('.deb'):
pkg_name = file
if not pkg_name:
erro_msg = f'Unable to find {package_name} package inside download folder'
logger.error(erro_msg)
raise FileNotFoundError(erro_msg)
deb_path = os.path.join(download_dir, pkg_name)
# we copy deb to the temporary folder
shutil.copy(deb_path, tmp_folder)
# We first unpack deb file and get data.tar.xz from there
cmd = ['ar', '-x', os.path.join(tmp_folder, pkg_name)]
subprocess.check_call(cmd, cwd=tmp_folder)
# With data.tar.xz, we try to find script file
data_tar = tarfile.open(os.path.join(tmp_folder, 'data.tar.xz'))
paths = []
for f in files:
file_tarpath = None
for member in data_tar.getnames():
if member.endswith(f):
file_tarpath = member
if not file_tarpath:
erro_msg = f"Unable to find {f} inside data tar."
logger.error(erro_msg)
raise FileNotFoundError(erro_msg)
# We extract said file to the temporary folder
data_tar.extract(file_tarpath, path=tmp_folder)
# add it to our return
paths.append(os.path.join(tmp_folder, file_tarpath))
data_tar.close()
return paths
def __sign_and_pack(self, patch_file):
"""
Generates the patch signatures and pack the .patch file
:param patch_file .patch file full path
"""
filelist = ["metadata.tar", "software.tar"]
if self.metadata.extra_content:
filelist.append("extra.tar")
for script_id, script_path in self.patch_script_paths.items():
if script_path is not None:
filelist.append(constants.PATCH_SCRIPTS[script_id])
# Generate the local signature file
logger.debug(f"Generating signature for patch files: {filelist}")
sig = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
for f in filelist:
sig ^= self.get_md5(f)
sigfile = open(MD5SUM_SIGNATURE_FILENAME, "w")
sigfile.write("%x" % sig)
sigfile.close()
# this comes from patch_functions write_patch
# Generate the detached signature
#
# Note: if cert_type requests a formal signature, but the signing key
# is not found, we'll instead sign with the "dev" key and
# need_resign_with_formal is set to True.
need_resign_with_formal = sign_files(
filelist,
DETACHED_SIGNATURE_FILENAME,
cert_type=None)
logger.info(f"Formal signing status: {need_resign_with_formal}")
# Save files into .patch
files = [f for f in os.listdir('.') if os.path.isfile(f)]
if not os.path.exists(DEFAULT_PATCH_OUTPUT_DIR):
os.makedirs(DEFAULT_PATCH_OUTPUT_DIR)
patch_full_path = os.path.join(DEFAULT_PATCH_OUTPUT_DIR, patch_file)
tar = tarfile.open(patch_full_path, "w:gz")
for file in files:
logger.info(f"Saving file {file}")
tar.add(file)
tar.close()
logger.info(f"Patch file created {patch_full_path}")
if self.sign_remote:
self.__sign_patch_remotely(patch_full_path)
def __sign_patch_remotely(self, patch_file):
"""
Send the patch file to be signed remotely by a signing server
:param patch_file full path to the patch file
"""
logger.info("Starting remote signing for: %s", patch_file)
if not self.signing_server:
logger.error("SIGNING_SERVER variable not set, unable to continue.")
sys.exit(1)
if not self.signing_user:
logger.error("SIGNING_USER variable not set, unable to continue.")
sys.exit(1)
try:
conn_string = f"{self.signing_user}@{self.signing_server}"
patch_basename = os.path.basename(patch_file)
# First we get the upload path from the signing server, it should return something
# similar to: "Upload: /tmp/sign_upload.5jR11pS0"
call_path = subprocess.check_output([
"ssh",
"-o StrictHostKeyChecking=no",
conn_string,
f"sudo {constants.GET_UPLOAD_PATH} -r"]).decode(sys.stdout.encoding).strip()
upload_path = call_path.split()[1]
logger.info("Upload path receive from signing server: %s", upload_path)
# We send the patch to the signing server
logger.info("Sending patch to signing server...")
subprocess.check_output([
"scp",
"-q",
patch_file,
f"{conn_string}:{upload_path}"])
# Request the signing server to sign the patch, it should return the full path
# of the patch inside the signing server
logger.info("Signing patch...")
signed_patch_path = subprocess.check_output([
"ssh",
conn_string,
f"sudo {constants.REQUEST_SIGN}",
f"{upload_path}/{patch_basename}",
"usm"]).decode(sys.stdout.encoding).strip()
logger.info("Signing successful, path returned: %s", signed_patch_path)
logger.info("Downloading signed patch...")
subprocess.check_output([
"scp",
"-q",
f"{conn_string}:{signed_patch_path}",
patch_file])
logger.info("Patch successfully signed: %s", patch_file)
except subprocess.CalledProcessError as e:
logger.exception("Failure to sign patch: %s", e)
except Exception as e:
logger.exception("An unexpected error has occurred when signing the patch: %s", e)
@click.command()
@click.option(
'--recipe',
help='Patch recipe input XML file, examples are available under EXAMPLES directory',
required=True)
@click.option(
'--name',
help='Allow user to define name of the patch file. e.g.: test-sample-rr.patch. \
Name will default to patch_id if not defined',
required=False)
@click.option(
'--remote-sign',
help='Send the patch to defined SIGNING SERVER to be sign with an different key.',
is_flag=True,
required=False)
def build(recipe, name=None, remote_sign=False):
try:
patch_builder = PatchBuilder(recipe, name, remote_sign)
patch_builder.build_patch()
except FetchDebsError as e:
logger.exception(e)
sys.exit(1)
if __name__ == '__main__':
build()