#!/usr/bin/env python3 # Copyright (C) 2024 Wind River Systems,Inc # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Script for generating a pre-patched ISO matching the USM framework # (stx 10 or later) # # A system installed with a pre-patched ISO will behave the same as a system # installed with the base ISO with the patches applied. # # Requirements: # - Should be run from inside the LAT container, part of the STX build env, # as this requires several env variables and dependencies readily available # in it. # - tools repo present in MY_REPO_ROOT_DIR, as 'base-bullseye.yaml' is used; # # This enables usage of "A|B" type hints, which are not available in py39 yet # This can be removed if LAT is upgraded to py310 or above. from __future__ import annotations from typing import Any import argparse import glob import logging import os import re import shutil import subprocess import tarfile import tempfile import xml.etree.ElementTree as ET import yaml # === Logging === # logger = logging.getLogger(__file__) logging.basicConfig(level=logging.INFO, format='[%(asctime)s - %(levelname)-7s] %(message)s', datefmt='%H:%M:%S') # === Get env variables === # # Note all these variables are automatically set inside a LAT container REQUIRED_ENV_VARIABLES = [ "HTTP_SERVER_IP", "MY_REPO_ROOT_DIR", "MYUNAME", "PROJECT", ] HTTP_SERVER_IP = os.environ.get("HTTP_CONTAINER_IP") MY_REPO_ROOT_DIR = os.environ.get("MY_REPO_ROOT_DIR", default="") MYUNAME = os.environ.get("MYUNAME") PROJECT = os.environ.get("PROJECT") # === Parameters === # BASE_BULLSEYE_YAML_PATH = os.path.join( MY_REPO_ROOT_DIR, "stx-tools", "debian-mirror-tools", "config", "debian", "common", "base-bullseye.yaml") # This is used to help validate content packed into the output ISO EXPECTED_ISO_CONTENTS = { "EFI", "bzImage", "bzImage.sig", "bzImage-rt", "bzImage-rt.sig", "bzImage-std", "bzImage-std.sig", "efi.img", "images", "initrd", "initrd.sig", "isolinux", "kickstart", "ostree_repo", "patches", "pxeboot", "upgrades", } GPG_HOME = "/tmp/.lat_gnupg_root" HTTP_FULL_ADDR = f"http://{HTTP_SERVER_IP}:8088" LAT_SDK_SYSROOT = "/opt/LAT/SDK/sysroots/x86_64-wrlinuxsdk-linux" PATCHES_FEED_PATH = f"/localdisk/loadbuild/{MYUNAME}/{PROJECT}/patches_feed" # Some command outputs are very long (e.g.: ostree history). # Max length before replacing them with "" in the log. MAX_LOG_LENGTH = 2500 # === Exceptions === # class YamlParsingException(Exception): """ Exception class for errors when trying to get a value from a yaml file. """ def __init__(self, msg, key, file) -> None: final_msg = msg final_msg += f" Key: '{key}' ;" final_msg += f" Yaml File: '{file}'." super().__init__(final_msg) # === Functions === # def log_message(label: str, text: str) -> str: """Organize log message to make it easier to read If the message fits a single line, display it as so. If the message has multiple lines, better start on the next line in the log so that all message lines are aligned. """ if ("\n" in text.strip()): return f"{label}\n\"{text}\"" return f"{label}\"{text}\"" def run_command(cmd: list[str], ignore_errors: bool = False, verbose: bool = True) -> str: """Execute bash command :param cmd: Args which compose the command to execute :param ignore_errors: Whether to ignore non-zero return codes or not :param verbose: Whether to log the execution results :returns: Command execution stdout """ logger.info(f"Running command: {cmd}") # Note that the wildcard character (asterisk "*") needs to be put in quotes # when used in the CLI to be passed as input to commands. This is to # prevent shell from using the wildcard itself instead of forwarding it. # But in Python there is no need for quotes. In fact, they shouldn't be # used, as asterisk doesn't have any special meaning to it. result = subprocess.run(args=cmd, capture_output=True, text=True) if result.returncode == 127: raise Exception("Command not available. Please install it.") if result.returncode != 0: logger.error(log_message(" - stdout: ", result.stdout)) logger.error(log_message(" - stderr: ", result.stderr)) logger.error(f"RC: {result.returncode}") if not ignore_errors: raise Exception("Command failed!") else: logger.warning("Command resulted in non-zero return code!") if verbose: if len(result.stdout) > MAX_LOG_LENGTH: output = "" else: output = result.stdout logger.debug(log_message(" - stdout: ", output)) logger.debug(log_message(" - stderr: ", result.stderr)) return result.stdout def get_iso_label_from_isolinux_cfg(isolinux_cfg_path: str) -> str: """Get ISO label from isolinux.cfg file. Open isolunux.cfg file, do a regex search for 'instiso=' (where is composed by any ammount of chars, numbers and dashes) and return the first match for (Note there may be several matches for instiso, but they all seem to have the same value). :param isolinux_cfg_path: Full path name to isolinux.cfg file :returns: First match for the instiso label value """ logger.info("Getting ISO label...") logger.info(f" - isolinux.cfg path: {isolinux_cfg_path}") try: with open(isolinux_cfg_path, mode="r", encoding="utf-8") as file: content = file.read() except Exception: logger.error("Could not read contents from isolinux.cfg") raise # Match "instiso=" and a value composed of chars, numbers and dashes try: first_key_value_match = re.findall(pattern=r"instiso=[\w\d-]+", string=content)[0] except IndexError: # Regex search returned no matches. Label not in file. raise Exception("Could not find ISO label in isolinux.cfg") # Remove the "instiso=" part iso_label = first_key_value_match[8:] logger.info(f"ISO label: {iso_label}") return iso_label def create_iso(iso_contents_dir: str, iso_label: str, output_iso_path: str, expected_iso_contents: set[str] = EXPECTED_ISO_CONTENTS ) -> None: """Create a new ISO or overwrite existing ISO Check if item names in :iso_contents_dir: matches :expected_iso_contents:, use 'mkisofs' with specific parameters to create an ISO, use 'isohybrid --uefi' to make ISO "EFI bootable" and, lastly, use 'implantisomd5' to implant md5sum into the ISO. :param iso_contents_dir: Path to dir containing files to include in the ISO :param iso_label: Value to use as Volume ID :param output_iso_path: Path for the output ISO :param expected_iso_contents: A set with the expected contents from 'ls :iso_contents_dir:' """ logger.info("Generating output ISO...") logger.info(f" - ISO label: {iso_label}") # Logging ISO contents cmd = ["ls", "-l", iso_contents_dir] iso_contents_list = run_command(cmd, verbose=False) logger.debug(f" - ISO contents to include: {iso_contents_list}") # Checking if contents match what is expected iso_contents = set(os.listdir(iso_contents_dir)) if iso_contents != expected_iso_contents: logger.warning("Output ISO contents are different than expected") # Create the output ISO # The parameters are so that the iso is created with eltorito header and # with ISO 9660 format. Some parameters are hard-coded, # as there is no need to customize them. cmd = ["mkisofs", "-o", output_iso_path, "-A", iso_label, "-V", iso_label, "-U", "-J", "-joliet-long", "-r", "-iso-level", "2", "-b", "isolinux/isolinux.bin", "-c", "isolinux/boot.cat", "-no-emul-boot", "-boot-load-size", "4", "-boot-info-table", "-eltorito-alt-boot", "-eltorito-platform", "0xEF", "-eltorito-boot", "efi.img", "-no-emul-boot", iso_contents_dir] run_command(cmd) logger.info("Making output ISO EFI bootable...") cmd = ["isohybrid", "--uefi", output_iso_path] run_command(cmd) logger.info("Implanting new checksum (required for ISO9660 image)...") cmd = ["implantisomd5", output_iso_path] run_command(cmd) logger.info(f"Output ISO: {output_iso_path}") def mount_iso(iso: str, mountpoint: str) -> None: """Mount an ISO on a directory :param iso_path: ISO path :param mount_path: Path to a directory in which to mount the ISO """ logger.info("Mounting ISO...") logger.info(f" - ISO: {iso}") logger.info(f" - Mountpoint: {mountpoint}") # Mount the ISO cmd = ["mount", "-o", "loop", iso, mountpoint] run_command(cmd) def unmount_iso(mountpoint: str) -> None: """Unmount ISO from mountpoint :param mountpoint: Path to directory where an ISO is mounted """ logger.info("Un-mounting ISO...") logger.info(f" - Mountpoint: {mountpoint}") cmd = ["umount", "-l", mountpoint] run_command(cmd) def get_value_from_yaml(concatenated_key: str, yaml_path: str = BASE_BULLSEYE_YAML_PATH) -> Any: """Get value associated to a composed key from a yaml file. :param concatenated_key: Key for searching a value. For selecting values at higher depths, concatenate keys with a '.' between each key. :returns: Value for the key """ with open(yaml_path, mode="r") as stream: data = yaml.safe_load(stream) keys = concatenated_key.split(".") for key in keys: try: data = data.get(key) except AttributeError: error_msg = "Invalid key: Tried treating final value as a dict." raise YamlParsingException(error_msg, keys, yaml_path) if data is None: error_msg = "Invalid key: Dict doesn't have a value for key used." raise YamlParsingException(error_msg, keys, yaml_path) return data def setup_gpg_client(gpg_home: str = GPG_HOME, lat_sdk_sysroot: str = LAT_SDK_SYSROOT) -> None: """Setup GPG client configs - Create and setup GPG config folder (GPG_HOME) if it doesn't exist - Set GNUPGHOME env variable These actions are usually performed automatically by the LAT SDK. :param gpg_home: GPG home config directory path :param lat_sdk_sysroot: LAT SDK sysroot directory path """ logger.info("Setting up GPG configs...") logger.info(f" - GPG home folder: {gpg_home}") ostree_gpg_id = get_value_from_yaml("gpg.ostree.gpgid") ostree_gpg_key = get_value_from_yaml("gpg.ostree.gpgkey") ostree_gpg_pass = get_value_from_yaml("gpg.ostree.gpg_password") if os.path.exists(gpg_home): logger.info("GPG home already exists.") else: logger.info("GPG home dir doesn't exist, creating...") os.makedirs(gpg_home) os.chmod(gpg_home, 0o700) os.environ["OECORE_NATIVE_SYSROOT"] = lat_sdk_sysroot with open(f"{gpg_home}/gpg-agent.conf", mode="w") as file: file.write("allow-loopback-pinentry") cmd = ["gpg-connect-agent", "--homedir", gpg_home, "reloadagent", "/bye"] run_command(cmd) cmd = ["gpg", "--homedir", gpg_home, "--import", ostree_gpg_key] run_command(cmd) cmd = ["gpg", "--homedir", gpg_home, "--list-keys", ostree_gpg_id] run_command(cmd) cmd = ["gpg", "--homedir", gpg_home, "-o", "/dev/null", "-u", f'"{ostree_gpg_id}"', "--pinentry", "loopback", "--passphrase", ostree_gpg_pass, "-s", "/dev/null"] run_command(cmd) os.environ["GNUPGHOME"] = gpg_home def add_tag_xml(parent: ET.Element, name: str, text: str) -> None: """Add tag with text to a parent tag Create an XML tag inside another tag with a text inside it. :param parent: XML parent tag :param name: Name of the tag :param text: Text value inside the tag """ tag = ET.SubElement(parent, name) tag.text = text def update_metadata_info(metadata_xml_path: str, iso_path: str) -> None: """Update ISO's metadata Update the metadata XML with the ostree commit ID and checksum, along with some other adjustments for compatibility with the USM patching system. :param metadata_xml_path: Metadata XML file path :param iso_path: ISO Path """ logger.info("Getting ostree info to add to metadata XML...") cmd = f"ostree --repo={iso_path}/ostree_repo rev-parse starlingx" commit_id = run_command(cmd.split()).strip() repo_history = get_ostree_history(f"{iso_path}/ostree_repo") logger.debug("Ostree repo history:\n{repo_history}") checksum = re.findall(pattern=r"^ContentChecksum:\s*([\w\d]+)", string=repo_history, flags=re.MULTILINE)[0] logger.info("Preparing metadata XML changes...") # Load metadata XML tree = ET.parse(metadata_xml_path) root = tree.getroot() element_contents = ET.SubElement(root, "contents") element_ostree = ET.SubElement(element_contents, "ostree") element_base = ET.SubElement(element_ostree, "base") element_commit1 = ET.SubElement(element_ostree, "commit1") logger.info("Set: prepatched_iso = Y") add_tag_xml(root, "prepatched_iso", "Y") logger.info("Set: ostree.number_of_commits = 1") add_tag_xml(element_ostree, "number_of_commits", "1") logger.info("Set: ostree.base.commit = \"\"") add_tag_xml(element_base, "commit", "") logger.info("Set: ostree.base.checksum = \"\"") add_tag_xml(element_base, "checksum", "") logger.info(f"Set: ostree.commit1.commit = '{commit_id}'") add_tag_xml(element_commit1, "commit", commit_id) logger.info(f"Set: ostree.commit1.checksum = '{checksum}'") add_tag_xml(element_commit1, "checksum", checksum) logger.info("Remove: requires") requires = root.find("requires") if requires is not None: requires.clear() logger.info("Saving metadata XML changes...") tree.write(metadata_xml_path) def get_ostree_history(ostree_repo: str, filtered: bool = True) -> str: """Get ostree repo history Take an ostree repo path and return the ostree history. Has an option to filter out the commit messages. :param ostree_repo: ostree repo path :param filtered: Whether to filter out commit messages :returns: ostree repo history """ if not os.path.isdir(ostree_repo): raise Exception(f"Ostree repo directory does not exist: {ostree_repo}") cmd = f"ostree --repo={ostree_repo} log starlingx" repo_history = run_command(cmd.split()) if not filtered: return repo_history # Strings that identify relevant info in the ostree repo history keywords = ["commit ", "Parent", "Checksum", "Date", "History"] filtered_history = [] for line in repo_history.splitlines(): if any(keyword in line for keyword in keywords): filtered_history.append(line) return "\n".join(filtered_history) def remove_ostree_remotes(ostree_repo: str) -> None: """ Remove all references to remote ostree repos from the target ostree repo :param ostree_repo: Path to ostree repo """ logger.info("Cleaning remotes from ostree repo...") if not os.path.isdir(ostree_repo): raise Exception(f"Ostree repo directory does not exist: {ostree_repo}") cmd = ["ostree", f"--repo={ostree_repo}", "remote", "list"] remote_list = run_command(cmd).split() logger.debug(f"Remotes: {remote_list}") for remote in remote_list: cmd = ["ostree", f"--repo={ostree_repo}", "remote", "delete", remote] run_command(cmd) with open(f"{ostree_repo}/config", mode="r", encoding="utf-8") as file: ostree_config = file.read() logger.debug(log_message("Clean ostree config:", ostree_config)) # TODO (lfagunde): This function, along with all ostree repo manipulations # across this script, can be implemented as a separate file for "ostree utils". # Define a class with the repo path as it's defining property and several # methods to operate on it. def clean_ostree(ostree_repo: str) -> None: """ Delete all commits in the ostree repo except for the latest one. :param ostree_repo: Path to the ostree repository """ logger.info("Cleaning old commits from ostree repo...") logger.info(f"ostree repo: {ostree_repo}") if not os.path.isdir(ostree_repo): raise Exception(f"Ostree repo directory does not exist: {ostree_repo}") repo_history = get_ostree_history(ostree_repo) logger.debug("Ostree repo history before cleaning old commits:\n" f"{repo_history}") commits = re.findall(pattern=r"^commit\s*([\w\d-]+)", string=repo_history, flags=re.MULTILINE) # Delete each commit except the latest one for commit in commits[1:]: cmd = f"ostree --repo={ostree_repo} prune --delete-commit={commit}" run_command(cmd.split()) cmd = ["ostree", "summary", "--update", f"--repo={ostree_repo}"] run_command(cmd) repo_history = get_ostree_history(ostree_repo) logger.debug("Ostree repo history after cleaning old commits:\n" f"{repo_history}") def copy_iso_contents_exclude_selected( iso_path: str, target_dir: str, exclude_list: list[str] | None = None, verbose: bool = False) -> None: """Copy ISO contents to target dir EXCEPT some selected content To copy only specific contents, check copy_specific_iso_contents() :param iso_path: Path to ISO file :param target_dir: Directory where to copy the ISO contents :param exclude_list: List with names of files and directories to exclude. Must contain only their names. E.g: 'patches', 'efi.img', 'upgrades' :param verbose: Whether or not to show the list of transferred items """ logger.info("Copying all ISO contents except selected...") logger.info(f" - ISO: {iso_path}") logger.info(f" - Target dir: {target_dir}") if not exclude_list: exclude_list = [] logger.info(f" - Excluded contents: {exclude_list}") # Create tempdir for mounting mount_tempdir = tempfile.mkdtemp(prefix='mount_tempdir_') mount_iso(iso_path, mount_tempdir) # The slashes at the end of dir names are necessary for rsync cmd = ["rsync", "-a"] if verbose: cmd += ["-v"] for item in exclude_list: path = os.path.join(mount_tempdir, item) if os.path.isfile(path): cmd += ["--exclude", item] elif os.path.isdir(path): # Syntax to account for all internal dir content cmd += ["--exclude", f"{item}/***"] else: raise Exception(f"Item in exclude list not in source ISO: {item}") cmd += [f"{mount_tempdir}/", f"{target_dir}/"] run_command(cmd) # Calculate if copy was successful iso_contents = set(os.listdir(mount_tempdir)) target_dir_contents = set(os.listdir(target_dir)) missing_content = iso_contents - target_dir_contents - set(exclude_list) # Remove mountpoint unmount_iso(mount_tempdir) os.rmdir(mount_tempdir) # Report errors if any if missing_content: raise Exception(f"Failed to copy ISO contents: {missing_content}") def copy_iso_contents_include_selected(iso_path: str, target_dir: str, include_list: list[str] | None, verbose: bool = False) -> None: """Copy ONLY selected ISO contents to target dir :param iso_path: Path to ISO file :param target_dir: Path to directory where to copy the ISO contents :param include_list: List with names of files and directories to copy. Must contain only their names. E.g: 'patches', 'efi.img', 'upgrades' :param verbose: Whether or not to show the list of transfered items """ logger.info("Copying specific ISO contents...") logger.info(f" - ISO: {iso_path}") logger.info(f" - Target dir: {target_dir}") if not include_list: include_list = [] logger.info(f" - Contents to include: {include_list}") # Create tempdir for mounting mount_tempdir = tempfile.mkdtemp(prefix='mount_tempdir_') mount_iso(iso_path, mount_tempdir) cmd = ["rsync", "-a"] if verbose: cmd += ["-v"] for item in include_list: path = os.path.join(mount_tempdir, item) if os.path.isfile(path): cmd += ["--include", item] elif os.path.isdir(path): # Syntax to include all internal dir content cmd += ["--include", f"{item}/***"] else: raise Exception(f"Invalid content to copy: {item}") cmd += ["--exclude", "*", f"{mount_tempdir}/", f"{target_dir}/"] run_command(cmd) # Calculate if copy was successful target_dir_contents = set(os.listdir(target_dir)) missing_content = set(include_list) - target_dir_contents # Remove mountpoint unmount_iso(mount_tempdir) os.rmdir(mount_tempdir) # Report errors if any if missing_content: raise Exception(f"Failed to copy ISO contents: {missing_content}") # === Main === # def main(): # Parse arguments parser = argparse.ArgumentParser( description="Create an ISO with patches already applied. " "Requires some env variables: " f"{REQUIRED_ENV_VARIABLES}", formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument( '-i', '--base-iso', help="Full path to main ISO file. All content used to make the output " "ISO will be pulled from here, unless specified otherwise", type=str, required=True) parser.add_argument( '-si', '--secondary-iso', help="Optional. Full path to a secondary ISO file. Some ISO content " "can be pulled from here to replace equivalent from base input " "ISO, if necessary", type=str) parser.add_argument( '-sc', '--secondary-content', help="Optional. Name of a file or directory to pull from the secondary" " ISO instead of the main ISO. Can be used multiple times", action='append', type=str) parser.add_argument( '-p', '--patch', help="Full path to a patch file to apply. Can be used multiple times", type=str, action='append', required=True) parser.add_argument( '-o', '--output', help="Full path to use for the output pre-patched ISO", type=str, required=True) parser.add_argument( '-v', '--verbose', help="Enable debug logs", action='store_true') parser.add_argument( '-g', '--sign-gpg', help="When adding a new ostree commit corresponding to each patch, " "sign it using the default GPG_HOME from the LAT container.", action='store_true') parser.add_argument( '-b', '--base-ostree-repo', help="Optional. Full path to an ostree repo to use as base to apply " "the patches instead of one from the input ISOs", type=str) args = parser.parse_args() if args.verbose: logger.setLevel(logging.DEBUG) logger.debug("=> Args provided in command line:") for key, value in args._get_kwargs(): logger.debug(f"- {key} = {value}") # Create temporary directories # Tempdir for setting up ISO contents build_tempdir = tempfile.mkdtemp(prefix='build_tempdir_') # Tempdir for patches' metadata and debs patch_tempdir = tempfile.mkdtemp(prefix='patch_tempdir_') # Change permissions on build dir so we can update the files os.chmod(build_tempdir, 0o777) try: # Check if any required env variables are missing missing_env_variables = [var_name for var_name in REQUIRED_ENV_VARIABLES if not globals()[var_name]] if missing_env_variables: raise Exception("Env variables are missing: " f"{missing_env_variables}. Consider executing " "script from inside LAT container.") # Validate parsed arguments if not os.path.isfile(args.base_iso): raise Exception(f"Input ISO doesn't exist: {args.base_iso}") if args.secondary_iso and not os.path.isfile(args.secondary_iso): raise Exception("Secondary ISO doesn't exist: " f"{args.secondary_iso}") if args.secondary_content: if not args.secondary_iso: raise Exception("Can't define secondary content without a " "secondary input ISO to take it from") if any(["/" in item for item in args.secondary_content]): raise Exception("Secondary content must not be a path or " "contain '/', only content names: " f"{args.secondary_content}") if not all([os.path.isfile(patch) for patch in args.patch]): raise Exception("One or more patch files provided do not exist") if os.path.exists(args.output): raise Exception(f"Output filepath already exists: {args.output}") if args.base_ostree_repo and not os.path.isdir(args.base_ostree_repo): raise Exception("Base ostree repo doesn't exist: " f"{args.base_ostree_repo}") # Re-assign args to local variables base_input_iso = args.base_iso secondary_input_iso = args.secondary_iso secondary_content = args.secondary_content output_iso = args.output patches = args.patch sign_gpg = args.sign_gpg base_ostree_repo = args.base_ostree_repo # Assign default values if not secondary_content: secondary_content = [] logger.info("=> Starting execution") # Copy content from base input ISO to build dir # except the ostree_repo and whatever content was selected to be pulled # from the secondary ISO logger.info("=> Taking content from base input ISO...") exclude_list = secondary_content + ['ostree_repo'] copy_iso_contents_exclude_selected(iso_path=base_input_iso, target_dir=build_tempdir, exclude_list=exclude_list, verbose=True) # Copy content from secondary input ISO to build dir if secondary_input_iso and secondary_content: logger.info("=> Taking content from secondary input ISO...") copy_iso_contents_include_selected(iso_path=secondary_input_iso, target_dir=build_tempdir, include_list=secondary_content, verbose=True) # Copy ostree_repo to build dir logger.info("=> Copying base ostree repository from inputs...") if base_ostree_repo: # A custom ostree_repo was provided to serve as base cmd = ["rsync", "-a", f'{base_ostree_repo}/', f"{build_tempdir}/ostree_repo"] run_command(cmd) else: # As fallback, use ostree_repo from main Input ISO copy_iso_contents_include_selected(iso_path=base_input_iso, target_dir=build_tempdir, include_list=['ostree_repo']) # We initiate a reprepo feed in loadbuild because we need to access it # through a http service # TODO: apt-ostree outputs are going directly to the console, # instead of being returned to the caller via stdio logger.info(f'=> Setting up package feed in {PATCHES_FEED_PATH}...') cmd = ["apt-ostree", "repo", "init", "--feed", PATCHES_FEED_PATH, "--release", "bullseye", "--origin", "updates"] run_command(cmd) logger.info('=> Unpacking patches...') latest_patch_number = 0 # For each patch, extract the metadata.xml and the deb files # and save the sw_version and package names to be used on apt-ostree patches_data = [] for patch in patches: with tempfile.TemporaryDirectory() as extract_folder: with tarfile.open(patch) as f: # We extract the metadata.xml from the metadata.tar f.extract('metadata.tar', f"{extract_folder}/") metadata_tar = tarfile.open(f"{extract_folder}/metadata.tar") metadata_tar.extract('metadata.xml', f"{extract_folder}/") # Get sw_version value and save metadata.xml using sw_version as suffix xml_root = ET.parse(f"{extract_folder}/metadata.xml").getroot() sw_version = xml_root.find('sw_version').text component = xml_root.find('component').text os.makedirs(f"{patch_tempdir}/{sw_version}/metadata") metadata_path = (f"{patch_tempdir}/{sw_version}/metadata/{component}-{sw_version}" "-metadata.xml") shutil.copy(f"{extract_folder}/metadata.xml", metadata_path) # From inside software.tar we extract every .deb file f.extract('software.tar', f"{extract_folder}/") software_tar = tarfile.open(f"{extract_folder}/software.tar") software_tar.extractall(f"{patch_tempdir}/{sw_version}/debs/") # Packages names need to include version and revision # e.g.: logmgmt_1.0-1.stx.10 packages = [] for i in xml_root.find('packages').findall('deb'): packages.append(i.text.split("_")[0]) # Patches can contain precheck scripts, we need to verify if # they exist and, if so, move them to the pre-patched iso. precheck = False path_precheck = '' path_upgrade_utils = '' if "deploy-precheck" in f.getnames() and "upgrade_utils.py" in f.getnames(): precheck = True f.extract('deploy-precheck', f"{extract_folder}/") f.extract('upgrade_utils.py', f"{extract_folder}/") precheck_folder = f"{patch_tempdir}/{sw_version}/precheck" os.makedirs(f"{precheck_folder}") path_precheck = f"{precheck_folder}/deploy-precheck" path_upgrade_utils = f"{precheck_folder}/upgrade_utils.py" shutil.copy(f"{extract_folder}/deploy-precheck", path_precheck) shutil.copy(f"{extract_folder}/upgrade_utils.py", path_upgrade_utils) # Now we save the information we extract for later use patches_data.append({ "sw_version": sw_version, "path": f"{patch_tempdir}/{sw_version}", "packages": packages, "metadata": metadata_path, "precheck": precheck, "path_precheck": path_precheck, "path_upgrade_utils": path_upgrade_utils }) # Save the biggest version from the patches we have patch_num = int(sw_version.split(".")[-1]) latest_patch_number = max(patch_num, latest_patch_number) logger.info(f'Patch {sw_version} unpacked sucessfully.') # Here we setup our gpg client if needed if sign_gpg: setup_gpg_client() # We delete the patches folder from the base iso and recreate it # so we may populate with the metadatas from the patches we are using shutil.rmtree(f"{build_tempdir}/patches", ignore_errors=True) os.mkdir(f"{build_tempdir}/patches") # We clean all the metadatas inside upgrades folder for file in glob.glob(f"{build_tempdir}/upgrades/*-metadata.xml"): os.remove(file) # Now we need to populate reprepo feed with every deb from every patch # after that we install it on the ostree repository logger.info('Populate ostree repository with .deb files...') patches_data = sorted(patches_data, key=lambda x: x['sw_version']) for patch in patches_data: # Scan /debs/ folder and load every patch to the reprepo feed deb_dir = os.scandir(os.path.join(patch["path"], "debs/")) for deb in deb_dir: cmd = ["apt-ostree", "repo", "add", "--feed", PATCHES_FEED_PATH, "--release", "bullseye", "--component", patch['sw_version'], os.path.join(f"{patch['path']}/debs/", deb.name)] logger.debug('Running command: %s', cmd) subprocess.check_call(cmd, shell=False) # Now with every deb loaded we commit it in the ostree repository # apt-ostree requires an http connection to access the host files # so we give the full http path using the ip full_feed_path = f'\"{HTTP_FULL_ADDR}{PATCHES_FEED_PATH} bullseye\"' cmd = ["apt-ostree", "compose", "install", "--repo", f"{build_tempdir}/ostree_repo"] # If we have ostree setup we will use the gpg key if sign_gpg: gpg_key = get_value_from_yaml("gpg.ostree.gpgid") cmd += ["--gpg-key", gpg_key] pkgs = " ".join(patch["packages"]) cmd += ["--branch", "starlingx", "--feed", full_feed_path, "--component", patch['sw_version'], pkgs] logger.debug('Running command: %s', cmd) subprocess.check_call(cmd, shell=False) # Check if patch has precheck scripts, if yes move then to the upgrades folder if patch["precheck"]: shutil.copy(patch["path_precheck"], f"{build_tempdir}/upgrades") shutil.copy(patch["path_upgrade_utils"], f"{build_tempdir}/upgrades") # Copy only the patch metadata with the biggest patch version to ISO patch_num = int(patch["sw_version"].split(".")[-1]) if latest_patch_number == patch_num: # Metadata inside upgrades requires ostree information update_metadata_info(patch["metadata"], build_tempdir) shutil.copy(patch["metadata"], f"{build_tempdir}/patches") shutil.copy(patch["metadata"], f"{build_tempdir}/upgrades") # Update ostree summary cmd = ["ostree", "summary", "--update", f"--repo={build_tempdir}/ostree_repo"] logger.debug('Running command: %s', cmd) subprocess.check_call(cmd, shell=False) # Keep only the latest commit in ostree_repo to save storage space clean_ostree(f"{build_tempdir}/ostree_repo") # Remove all references to remote ostree repos used during build remove_ostree_remotes(f"{build_tempdir}/ostree_repo") # Now we get the label and re create the ISO with the new ostree logger.info('Creating new .iso file...') instlabel = get_iso_label_from_isolinux_cfg(f"{build_tempdir}/isolinux/isolinux.cfg") create_iso(build_tempdir, instlabel, output_iso) # Allow to edit and read the newly created iso os.chmod(output_iso, 0o664) logger.info("Pre-patched ISO created sucessfully: %s", output_iso) except Exception as e: logger.error("[EXECUTION FAILED]") logger.exception(f"Summary: {e}") # Clean up temporary folders shutil.rmtree(build_tempdir, ignore_errors=True) shutil.rmtree(patch_tempdir, ignore_errors=True) # Clean reprepro feed if os.path.exists(PATCHES_FEED_PATH): shutil.rmtree(PATCHES_FEED_PATH, ignore_errors=True) if __name__ == "__main__": main()