"""
Builds bootable CD images that have PXE-equivalent behavior for all Cobbler distros/profiles/systems currently in
memory.
"""
# SPDX-License-Identifier: GPL-2.0-or-later
# SPDX-FileCopyrightText: Copyright 2006-2009, Red Hat, Inc and Others
# SPDX-FileCopyrightText: Michael DeHaan <michael.dehaan AT gmail>
import logging
import os
import pathlib
import re
import shutil
from typing import TYPE_CHECKING, Dict, List, NamedTuple, Optional, Union
from cobbler import templar, utils
from cobbler.enums import Archs
from cobbler.utils import filesystem_helpers, input_converters
if TYPE_CHECKING:
from cobbler.api import CobblerAPI
from cobbler.cobbler_collections.collection import ITEM, Collection
from cobbler.items.distro import Distro
from cobbler.items.profile import Profile
[docs]def add_remaining_kopts(kopts: Dict[str, Union[str, List[str]]]) -> str:
"""Add remaining kernel_options to append_line
:param kopts: The kernel options which are not present in append_line.
:return: A single line with all kernel options from the dictionary in the string. Starts with a space.
"""
append_line = [""] # empty str to ensure the returned str starts with a space
for option, args in kopts.items():
if args is None: # type: ignore
append_line.append(f"{option}")
continue
if not isinstance(args, list):
args = [args]
for arg in args:
arg_str = format(arg)
if " " in arg_str:
arg_str = f'"{arg_str}"'
append_line.append(f"{option}={arg_str}")
return " ".join(append_line)
[docs]class BootFilesCopyset(NamedTuple): # pylint: disable=missing-class-docstring
src_kernel: str
src_initrd: str
new_filename: str
[docs]class LoaderCfgsParts(NamedTuple): # pylint: disable=missing-class-docstring
isolinux: List[str]
grub: List[str]
bootfiles_copysets: List[BootFilesCopyset]
[docs]class BuildisoDirsX86_64(
NamedTuple
): # noqa: N801 pylint: disable=invalid-name,missing-class-docstring
root: pathlib.Path
isolinux: pathlib.Path
grub: pathlib.Path
autoinstall: pathlib.Path
repo: pathlib.Path
[docs]class BuildisoDirsPPC64LE(NamedTuple): # pylint: disable=missing-class-docstring
root: pathlib.Path
grub: pathlib.Path
ppc: pathlib.Path
autoinstall: pathlib.Path
repo: pathlib.Path
[docs]class Autoinstall(NamedTuple): # pylint: disable=missing-class-docstring
config: str
repos: List[str]
[docs]class BuildIso:
"""
Handles conversion of internal state to the isolinux tree layout
"""
def __init__(self, api: "CobblerAPI") -> None:
"""Constructor which initializes things here. The collection manager pulls all other dependencies in.
:param api: The API instance which holds all information about objects in Cobbler.
"""
self.api = api
self.distmap: Dict[str, str] = {}
self.distctr = 0
self.logger = logging.getLogger()
self.templar = templar.Templar(api)
self.isolinuxdir = ""
# based on https://uefi.org/sites/default/files/resources/UEFI%20Spec%202.8B%20May%202020.pdf
self.efi_fallback_renames = {
"grubaa64": "bootaa64.efi",
"grubx64.efi": "bootx64.efi",
}
# grab the header from buildiso.header file
self.iso_template = (
pathlib.Path(self.api.settings().iso_template_dir)
.joinpath("buildiso.template")
.read_text(encoding="UTF-8")
)
self.isolinux_menuentry_template = (
pathlib.Path(api.settings().iso_template_dir)
.joinpath("isolinux_menuentry.template")
.read_text(encoding="UTF-8")
)
self.grub_menuentry_template = (
pathlib.Path(api.settings().iso_template_dir)
.joinpath("grub_menuentry.template")
.read_text(encoding="UTF-8")
)
self.bootinfo_template = (
pathlib.Path(api.settings().iso_template_dir)
.joinpath("bootinfo.template")
.read_text(encoding="UTF-8")
)
def _find_distro_source(self, known_file: str, distro_mirror: str) -> str:
"""
Find a distro source tree based on a known file.
:param known_file: Path to a file that's known to be part of the distribution,
commonly the path to the kernel.
:raises ValueError: When no installation source was not found.
:return: Root of the distribution's source tree.
"""
self.logger.debug("Trying to locate source.")
(source_head, source_tail) = os.path.split(known_file)
filesource = None
while source_tail != "":
if source_head == distro_mirror:
filesource = os.path.join(source_head, source_tail)
self.logger.debug("Found source in %s", filesource)
break
(source_head, source_tail) = os.path.split(source_head)
if filesource:
return filesource
else:
raise ValueError(
"No installation source found. When building a standalone (incl. airgapped) ISO"
" you must specify a --source if the distro install tree is not hosted locally"
)
def _copy_boot_files(
self, kernel_path: str, initrd_path: str, destdir: str, new_filename: str = ""
):
"""Copy kernel/initrd to destdir with (optional) newfile prefix
:param kernel_path: Path to a a distro's kernel.
:param initrd_path: Path to a a distro's initrd.
:param destdir: The destination directory.
:param new_filename: The file new filename. Kernel and Initrd have different extensions to seperate them from
each another.
"""
kernel_source = pathlib.Path(kernel_path)
initrd_source = pathlib.Path(initrd_path)
path_destdir = pathlib.Path(destdir)
if new_filename:
kernel_dest = str(path_destdir / f"{new_filename}.krn")
initrd_dest = str(path_destdir / f"{new_filename}.img")
else:
kernel_dest = str(path_destdir / kernel_source.name)
initrd_dest = str(path_destdir / initrd_source.name)
filesystem_helpers.copyfile(str(kernel_source), kernel_dest)
filesystem_helpers.copyfile(str(initrd_source), initrd_dest)
[docs] def filter_profiles(
self, selected_items: Optional[List[str]] = None
) -> List["Profile"]:
"""
Return a list of valid profile objects selected from all profiles by name, or everything if ``selected_items``
is empty.
:param selected_items: A list of names to include in the returned list.
:return: A list of valid profiles. If an error occurred this is logged and an empty list is returned.
"""
if selected_items is None:
selected_items = []
return self.filter_items(self.api.profiles(), selected_items)
[docs] def filter_items(
self, all_objs: "Collection[ITEM]", selected_items: List[str]
) -> List["ITEM"]:
"""Return a list of valid profile or system objects selected from all profiles or systems by name, or everything
if selected_items is empty.
:param all_objs: The collection of items to filter.
:param selected_items: The list of names
:raises ValueError: Second option that this error is raised
when the list of filtered systems or profiles is empty.
:return: A list of valid profiles OR systems. If an error occurred this is logged and an empty list is returned.
"""
# No profiles/systems selection is made, let's return everything.
if len(selected_items) == 0:
return list(all_objs)
filtered_objects: List["ITEM"] = []
for name in selected_items:
item_object = all_objs.find(name=name)
if item_object is not None and not isinstance(item_object, list):
filtered_objects.append(item_object)
selected_items.remove(name)
for bad_name in selected_items:
self.logger.warning('"%s" is not a valid profile or system', bad_name)
if len(filtered_objects) == 0:
raise ValueError("No valid systems or profiles were specified.")
return filtered_objects
[docs] def parse_distro(self, distro_name: str) -> "Distro":
"""
Find and return distro object.
:param distro_name: Name of the distribution to parse.
:raises ValueError: If the distro is not found.
"""
distro_obj = self.api.find_distro(name=distro_name)
if distro_obj is None or isinstance(distro_obj, list):
raise ValueError(f"Distribution {distro_name} not found or ambigous.")
return distro_obj
[docs] def parse_profiles(
self, profiles: Optional[List[str]], distro_obj: "Distro"
) -> List["Profile"]:
"""
TODO
:param profiles: TODO
:param distro_obj: TODO
"""
profile_names = input_converters.input_string_or_list_no_inherit(profiles)
if profile_names:
orphans = set(profile_names) - set(distro_obj.children)
if len(orphans) > 0:
raise ValueError(
"When building a standalone ISO, all --profiles must be"
" under --distro. Extra --profiles: {}".format(
",".join(sorted(str(o for o in orphans)))
)
)
return self.filter_profiles(profile_names)
else:
return self.filter_profiles(distro_obj.children) # type: ignore[reportGeneralTypeIssues]
def _copy_isolinux_files(self):
"""
This method copies the required and optional files from syslinux into the directories we use for building the
ISO.
:param iso_distro: The distro (and thus architecture) to build the ISO for.
:param buildisodir: The directory where the ISO is being built in.
"""
self.logger.info("copying syslinux files")
files_to_copy = [
"isolinux.bin",
"menu.c32",
"chain.c32",
"ldlinux.c32",
"libcom32.c32",
"libutil.c32",
]
optional_files = ["ldlinux.c32", "libcom32.c32", "libutil.c32"]
syslinux_folders = [
pathlib.Path(self.api.settings().syslinux_dir),
pathlib.Path(self.api.settings().syslinux_dir).joinpath("modules/bios/"),
pathlib.Path("/usr/lib/syslinux/"),
pathlib.Path("/usr/lib/ISOLINUX/"),
]
# file_copy_success will be used to check for missing files
file_copy_success: Dict[str, bool] = {
f: False for f in files_to_copy if f not in optional_files
}
for syslinux_folder in syslinux_folders:
if syslinux_folder.exists():
for file_to_copy in files_to_copy:
source_file = syslinux_folder.joinpath(file_to_copy)
if source_file.exists():
filesystem_helpers.copyfile(
str(source_file),
os.path.join(self.isolinuxdir, file_to_copy),
)
file_copy_success[file_to_copy] = True
unsuccessful_copied_files = [k for k, v in file_copy_success.items() if not v]
if len(unsuccessful_copied_files) > 0:
self.logger.error(
'The following files were not found: "%s"',
'", "'.join(unsuccessful_copied_files),
)
raise FileNotFoundError(
"Required file(s) not found. Please check your syslinux installation"
)
def _render_grub_entry(
self, append_line: str, menu_name: str, kernel_path: str, initrd_path: str
) -> str:
"""
TODO
:param append_line: TODO
:param menu_name: TODO
:param kernel_path: TODO
:param initrd_path: TODO
"""
return self.templar.render(
self.grub_menuentry_template,
out_path=None,
search_table={
"menu_name": menu_name,
"kernel_path": kernel_path,
"initrd_path": initrd_path,
"kernel_options": re.sub(r".*initrd=\S+", "", append_line),
},
)
def _render_isolinux_entry(
self, append_line: str, menu_name: str, kernel_path: str, menu_indent: int = 0
) -> str:
"""Render a single isolinux.cfg menu entry."""
return self.templar.render(
self.isolinux_menuentry_template,
out_path=None,
search_table={
"menu_name": menu_name,
"kernel_path": kernel_path,
"append_line": append_line.lstrip(),
"menu_indent": menu_indent,
},
template_type="jinja2",
)
def _render_bootinfo_txt(self, distro_name: str) -> str:
"""Render bootinfo.txt for ppc."""
return self.templar.render(
self.bootinfo_template,
out_path=None,
search_table={"distro_name": distro_name},
template_type="jinja2",
)
def _copy_grub_into_esp(self, esp_image_location: str, arch: Archs):
"""Copy grub boot loader into EFI System Partition.
:param esp_image_location: Path to EFI System Partition.
:param arch: Distribution architecture
"""
grub_name = self.calculate_grub_name(arch)
efi_name = self.efi_fallback_renames.get(grub_name, grub_name)
esp_efi_boot = self._create_efi_boot_dir(esp_image_location)
grub_binary = (
pathlib.Path(self.api.settings().bootloaders_dir) / "grub" / grub_name
)
filesystem_helpers.copyfileimage(
str(grub_binary), esp_image_location, f"{esp_efi_boot}/{efi_name}"
)
[docs] def calculate_grub_name(self, desired_arch: Archs) -> str:
"""
This function checks the bootloaders_formats in our settings and then checks if there is a match between the
architectures and the distribution architecture.
:param distro: The distribution to get the GRUB2 loader name for.
"""
loader_formats = self.api.settings().bootloaders_formats
grub_binary_names: Dict[str, str] = {}
for loader_format, values in loader_formats.items():
name = values.get("binary_name", None)
if name is not None and isinstance(name, str):
grub_binary_names[loader_format.lower()] = name
if desired_arch in (Archs.PPC, Archs.PPC64, Archs.PPC64LE, Archs.PPC64EL):
# GRUB can boot all Power architectures it supports via the following modules directory.
return grub_binary_names["powerpc-ieee1275"]
if desired_arch == Archs.AARCH64:
# GRUB has only one 64-bit variant it can boot, the name is different how we have named it in Cobbler.
return grub_binary_names["arm64-efi"]
if desired_arch == Archs.ARM:
# GRUB has only one 32-bit variant it can boot, the name is different how we have named it in Cobbler.
return grub_binary_names["arm"]
# Now we do the regular stuff: We map the beginning of the Cobbler arch and try to find suitable loaders.
# We do want to drop "grub.0" always as it is not efi bootable.
matches = {
k: v
for (k, v) in grub_binary_names.items()
if k.startswith(desired_arch.value) and v != "grub.0"
}
if len(matches) == 0:
raise ValueError(
f'No matches found for requested Cobbler Arch: "{str(desired_arch.value)}"'
)
if len(matches) == 1:
return next(iter(matches.values()))
raise ValueError(
f'Ambiguous matches for GRUB to Cobbler Arch mapping! Requested: "{str(desired_arch.value)}"'
f' Found: "{str(matches.values())}"'
)
def _write_isolinux_cfg(
self, cfg_parts: List[str], output_dir: pathlib.Path
) -> None:
"""Write isolinux.cfg.
:param cfg_parts: List of str that is written to the config, joined by newlines.
:param output_dir: pathlib.Path that the isolinux.cfg file is written into.
"""
output_file = output_dir / "isolinux.cfg"
self.logger.info("Writing %s", output_file)
with open(output_file, "w") as f:
f.write("\n".join(cfg_parts))
def _write_grub_cfg(self, cfg_parts: List[str], output_dir: pathlib.Path) -> None:
"""Write grub.cfg.
:param cfg_parts: List of str that is written to the config, joined by newlines.
:param output_dir: pathlib.Path that the grub.cfg file is written into.
"""
output_file = output_dir / "grub.cfg"
self.logger.info("Writing %s", output_file)
with open(output_file, "w") as f:
f.write("\n".join(cfg_parts))
def _write_bootinfo(self, bootinfo_txt: str, output_dir: pathlib.Path) -> None:
"""Write ppc/bootinfo.txt
:param bootinfo_parts: List of str that is written to the config, joined by newlines.
:param output_dir: pathlib.Path that the bootinfo.txt is written into.
"""
output_file = output_dir / "bootinfo.txt"
self.logger.info("Writing %s", output_file)
with open(output_file, "w") as f:
f.write(bootinfo_txt)
def _create_esp_image_file(self, tmpdir: str) -> str:
esp = pathlib.Path(tmpdir) / "efi"
mkfs_cmd = ["mkfs.fat", "-C", str(esp), "3528"]
rc = utils.subprocess_call(mkfs_cmd, shell=False)
if rc != 0:
self.logger.error("Could not create ESP image file")
raise Exception # TODO: use proper exception
return str(esp)
def _create_efi_boot_dir(self, esp_mountpoint: str) -> str:
efi_boot = pathlib.Path("EFI") / "BOOT"
self.logger.info("Creating %s", efi_boot)
filesystem_helpers.mkdirimage(efi_boot, esp_mountpoint)
return str(efi_boot)
def _find_esp(self, root_dir: pathlib.Path) -> Optional[str]:
"""Walk root directory and look for an ESP."""
candidates = [str(match) for match in root_dir.glob("**/efi")]
if len(candidates) == 0:
return None
elif len(candidates) == 1:
return candidates[0]
else:
self.logger.info(
"Found multiple ESP (%s), choosing %s", candidates, candidates[0]
)
return candidates[0]
def _prepare_buildisodir(self, buildisodir: str = "") -> str:
"""
This validated the path and type of the buildiso directory and then (re-)creates the apropiate directories.
:param buildisodir: The directory in which the build of the ISO takes place. If an empty string then the default
directory is used.
:raises ValueError: In case the specified directory does not exist.
:raises TypeError: In case the specified argument is not of type str.
:return: The validated and normalized directory with appropriate subfolders provisioned.
"""
if not isinstance(buildisodir, str): # type: ignore
raise TypeError("buildisodir needs to be of type str!")
if not buildisodir:
buildisodir = self.api.settings().buildisodir
else:
if not os.path.isdir(buildisodir):
raise ValueError("The --tempdir specified is not a directory")
(_, buildisodir_tail) = os.path.split(os.path.normpath(buildisodir))
if buildisodir_tail != "buildiso":
buildisodir = os.path.join(buildisodir, "buildiso")
self.logger.info('Deleting and recreating the buildisodir at "%s"', buildisodir)
if os.path.exists(buildisodir):
shutil.rmtree(buildisodir)
os.makedirs(buildisodir)
self.isolinuxdir = os.path.join(buildisodir, "isolinux")
return buildisodir
[docs] def create_buildiso_dirs_x86_64(self, buildiso_root: str) -> BuildisoDirsX86_64:
"""Create directories in the buildiso root.
Layout:
.
├── autoinstall
├── EFI
│ └── BOOT
├── isolinux
└── repo_mirror
"""
root = pathlib.Path(buildiso_root)
isolinuxdir = root / "isolinux"
grubdir = root / "EFI" / "BOOT"
autoinstalldir = root / "autoinstall"
repodir = root / "repo_mirror"
for d in [isolinuxdir, grubdir, autoinstalldir, repodir]:
d.mkdir(parents=True)
return BuildisoDirsX86_64(
root=root,
isolinux=isolinuxdir,
grub=grubdir,
autoinstall=autoinstalldir,
repo=repodir,
)
[docs] def create_buildiso_dirs_ppc64le(self, buildiso_root: str) -> BuildisoDirsPPC64LE:
"""Create directories in the buildiso root.
Layout:
.
├── autoinstall
├── boot
├── ppc
└── repo_mirror
"""
root = pathlib.Path(buildiso_root)
grubdir = root / "boot"
ppcdir = root / "ppc"
autoinstalldir = root / "autoinstall"
repodir = root / "repo_mirror"
for _d in [grubdir, ppcdir, autoinstalldir, repodir]:
_d.mkdir(parents=True)
return BuildisoDirsPPC64LE(
root=root,
grub=grubdir,
ppc=ppcdir,
autoinstall=autoinstalldir,
repo=repodir,
)
def _xorriso_ppc64le(
self,
xorrisofs_opts: str,
iso: str,
buildisodir: str,
esp_path: str = "",
):
"""
Build the final xorrisofs command which is then executed on the disk.
:param xorrisofs_opts: The additional options for xorrisofs.
:param iso: The name of the output iso.
:param buildisodir: The directory in which we build the ISO.
"""
del esp_path # just accepted for polymorphism
cmd = [
"xorriso",
"-as",
"mkisofs",
]
if xorrisofs_opts != "":
cmd.append(xorrisofs_opts)
cmd.extend(
[
"-chrp-boot",
"-hfs-bless-by",
"p",
"boot",
"-V",
"COBBLER_INSTALL",
"-o",
iso,
buildisodir,
]
)
xorrisofs_return_code = utils.subprocess_call(cmd, shell=False)
if xorrisofs_return_code != 0:
self.logger.error("xorrisofs failed with non zero exit code!")
return
self.logger.info("ISO build complete")
self.logger.info("You may wish to delete: %s", buildisodir)
self.logger.info("The output file is: %s", iso)
def _xorriso_x86_64(
self, xorrisofs_opts: str, iso: str, buildisodir: str, esp_path: str
):
"""
Build the final xorrisofs command which is then executed on the disk.
:param xorrisofs_opts: The additional options for xorrisofs.
:param iso: The name of the output iso.
:param buildisodir: The directory in which we build the ISO.
:param esp_path: The absolute path to the EFI system partition.
"""
running_on, _ = utils.os_release()
if running_on in ("suse", "centos", "virtuozzo", "redhat"):
isohdpfx_location = pathlib.Path(self.api.settings().syslinux_dir).joinpath(
"isohdpfx.bin"
)
else:
isohdpfx_location = pathlib.Path(self.api.settings().syslinux_dir).joinpath(
"mbr/isohdpfx.bin"
)
esp_relative_path = pathlib.Path(esp_path).relative_to(buildisodir)
cmd = [
"xorriso",
"-as",
"mkisofs",
]
if xorrisofs_opts != "":
cmd.append(xorrisofs_opts)
cmd += [
"-isohybrid-mbr",
str(isohdpfx_location),
"-c",
"isolinux/boot.cat",
"-b",
"isolinux/isolinux.bin",
"-no-emul-boot",
"-boot-load-size",
"4",
"-boot-info-table",
"-eltorito-alt-boot",
"-e",
str(esp_relative_path),
"-no-emul-boot",
"-isohybrid-gpt-basdat",
"-V",
"COBBLER_INSTALL",
"-o",
iso,
buildisodir,
]
xorrisofs_return_code = utils.subprocess_call(cmd, shell=False)
if xorrisofs_return_code != 0:
self.logger.error("xorrisofs failed with non zero exit code!")
return
self.logger.info("ISO build complete")
self.logger.info("You may wish to delete: %s", buildisodir)
self.logger.info("The output file is: %s", iso)