"""
Utilities for filesystem operations used by Cobbler, including file linking, copying, hashing, and directory management.
"""
import errno
import glob
import hashlib
import json
import logging
import os
import pathlib
import shutil
import subprocess
import urllib.request
from typing import TYPE_CHECKING, Dict, Optional, Tuple, Union
from cobbler import utils
from cobbler.cexceptions import CX
from cobbler.utils import log_exc, mtab
# TYPE_CHECKING is False at runtime, so this import is only seen by static type checkers; the annotations below
# therefore reference "CobblerAPI" as a string.
if TYPE_CHECKING:
    from cobbler.api import CobblerAPI

# getLogger() without a name returns the root logger.
logger = logging.getLogger()
[docs]
def is_safe_to_hardlink(src: str, dst: str, api: "CobblerAPI") -> bool:
    """
    Decide whether ``dst`` may be created as a hardlink to ``src``.

    :param src: The hardlink source path.
    :param dst: The hardlink target path.
    :param api: The api-instance used to find out whether SELinux is enabled.
    :return: False if source and destination are on different devices, if the source is a symbolic link, if no
             device could be determined or if the device name contains a colon (treated as a remote file). Otherwise
             True when SELinux is disabled; with SELinux enabled True only if the basename of the source matches
             ``utils.re_initrd`` or ``utils.re_kernel``.
    """
    # FIXME: Calling this with emtpy strings returns True?!
    src_device, src_path = mtab.get_file_device_path(src)
    dst_device, _ = mtab.get_file_device_path(dst)

    # Different devices can never share a hardlink. Hardlinking to a symbolic link is refused as well because chances
    # are high the new link will be dangling.
    if src_device != dst_device or pathlib.Path(src).is_symlink():
        return False

    # No known device, or a device name with a colon in it (remote file).
    if src_device is None or ":" in src_device:
        return False

    # Note: This is very Cobbler implementation specific!
    if not api.is_selinux_enabled():
        return True

    # With SELinux only kernels and initrds are considered safe, everything else is a file that is not safe to chown.
    basename = pathlib.PurePath(src_path).name
    if utils.re_initrd.match(basename) or utils.re_kernel.match(basename):
        return True
    return False
[docs]
def sha1_file(file_path: Union[str, pathlib.Path], buffer_size: int = 65536) -> str:
    """
    Compute the SHA1 digest of a file, like the ``sha1sum`` tool does.

    :param file_path: The path to the file that should be hashed.
    :param buffer_size: The number of bytes that are read from the file per iteration.
    :return: The hexadecimal SHA1 digest of the file content.
    """
    # Highly inspired by: https://stackoverflow.com/a/22058673
    digest = hashlib.sha1()
    with open(file_path, "rb") as stream:
        # read() returns an empty bytes object at end of file, which is the sentinel that stops the iteration.
        for chunk in iter(lambda: stream.read(buffer_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
[docs]
def hashfile(file_name: str, lcache: Optional[pathlib.Path] = None) -> Optional[str]:
    r"""
    Return the sha1sum of a file, using ``link_cache.json`` inside ``lcache`` as a lookup table.

    :param file_name: The file to get the sha1sum of.
    :param lcache: The directory that contains (or will contain) the ``link_cache.json`` file. A cached hash is only
                   used if the modification time stored with it is not older than the current one of the file.
    :return: The sha1 sum, or None if ``lcache`` is None or the file doesn't exist.
    """
    if lcache is None:
        return None

    cache_db_path = pathlib.Path(lcache) / "link_cache.json"
    known_hashes: Dict[str, Tuple[float, str]] = {}
    if cache_db_path.exists():
        known_hashes = json.loads(cache_db_path.read_text(encoding="utf-8"))

    target = pathlib.Path(file_name)
    if not target.exists():
        return None

    mtime = target.stat().st_mtime
    if file_name in known_hashes:
        cached_entry = known_hashes[file_name]
        # Entries are stored as (mtime, sha1); the cached hash is still valid if it is at least as new as the file.
        if cached_entry[0] >= mtime:
            return cached_entry[1]

    # Cache miss or stale entry: hash the file and persist the refreshed table.
    key = sha1_file(file_name)
    known_hashes[file_name] = (mtime, key)
    __create_if_not_exists(lcache)
    cache_db_path.write_text(json.dumps(known_hashes), encoding="utf-8")
    return key
[docs]
def cachefile(src: str, dst: str) -> None:
    """
    Copy a file into a cache directory and hardlink it from there to the destination.

    The cache is the directory ``.link_cache`` located two levels above ``dst``; cached files are named after the
    sha1 sum of their content. Use this with caution, otherwise you could end up copying data twice if the cache is
    not on the same device as the destination.

    :param src: The sourcefile for the copy action.
    :param dst: The destination for the copy action.
    """
    cache_dir = pathlib.Path(dst).parent.parent / ".link_cache"
    if not cache_dir.is_dir():
        cache_dir.mkdir()

    digest = hashfile(src, lcache=cache_dir)
    if digest is None:
        logger.info("Cachefile skipped due to missing key for it!")
        return None

    cached_copy = cache_dir / digest
    if not cached_copy.exists():
        # First time this content is seen: populate the cache.
        logger.info("trying to create cache file %s", cached_copy)
        copyfile(src, str(cached_copy))

    logger.debug("trying cachelink %s -> %s -> %s", src, cached_copy, dst)
    os.link(cached_copy, dst)
[docs]
def linkfile(
    api: "CobblerAPI", src: str, dst: str, symlink_ok: bool = False, cache: bool = True
) -> None:
    """
    Attempt to create a link dst that points to src. Because file systems suck we attempt several different methods or
    bail to just copying the file.

    The strategies are tried in this order and the first one that succeeds ends the call: a hardlink (only if
    ``is_safe_to_hardlink()`` agrees), a symbolic link (only if ``symlink_ok``), a hardlink through the link cache
    (only if ``cache``) and finally a plain copy. ``IOError``/``OSError`` raised by one of the first three strategies
    is swallowed and the next strategy is tried.

    :param api: This parameter is needed to check if a file can be hardlinked; it is passed on to
                ``is_safe_to_hardlink()``.
    :param src: The source file.
    :param dst: The destination for the link.
    :param symlink_ok: If it is okay to just use a symbolic link.
    :param cache: If it is okay to use a cached file instead of the real one.

    This function does not validate ``api`` itself. Errors raised by the final ``copyfile()`` fallback are not caught
    here.
    """
    dst_obj = pathlib.Path(dst)
    src_obj = pathlib.Path(src)
    # Note: Path.exists() follows symbolic links, so a dangling symlink at dst is not handled by this branch.
    if dst_obj.exists():
        # if the destination exists, is it right in terms of accuracy and context?
        if src_obj.samefile(dst):
            if not is_safe_to_hardlink(src, dst, api):
                # may have to remove old hardlinks for SELinux reasons as previous implementations were not complete
                logger.info("removing: %s", dst)
                dst_obj.unlink()
            else:
                # dst already refers to the same file as src and hardlinking is considered safe: nothing to do
                return None
        elif dst_obj.is_symlink():
            # existing path exists and is a symlink, update the symlink
            logger.info("removing: %s", dst)
            dst_obj.unlink()
        # any other existing destination is left in place; the strategies below have to cope with it

    if is_safe_to_hardlink(src, dst, api):
        # we can try a hardlink if the destination isn't to NFS or Samba this will help save space and sync time.
        try:
            logger.info("trying hardlink %s -> %s", src, dst)
            os.link(src, dst)
            return None
        except (IOError, OSError):
            # hardlink across devices, or link already exists we'll just symlink it if we can or otherwise copy it
            pass

    if symlink_ok:
        # we can symlink anywhere except for /tftpboot because that is run chroot, so if we can symlink now, try it.
        try:
            logger.info("trying symlink %s -> %s", src, dst)
            dst_obj.symlink_to(src_obj)
            return None
        except (IOError, OSError):
            # fall through to the next strategy
            pass

    if cache:
        try:
            cachefile(src, dst)
            return None
        except (IOError, OSError):
            # fall through to the plain copy
            pass

    # we couldn't hardlink and we couldn't symlink so we must copy
    copyfile(src, dst)
[docs]
def copyfile(src: str, dst: str, symlink: bool = False) -> None:
    """
    Copy a file or a directory tree from source to the destination.

    :param src: The source file. This may also be a folder.
    :param dst: The destination for the file or folder. For a file this has to be the complete target file name, not
                the directory that should contain it.
    :param symlink: If instead of a copy, a symlink is okay, then this may be set explicitly to "True". Symbolic
                    links found in ``src`` are then recreated as symbolic links, otherwise the content they point to
                    is copied.
    :raises OSError: Raised in case ``src`` could not be read.

    If the copy fails although ``src`` is readable, the original exception is only re-raised when ``src`` and ``dst``
    refer to the same file. Any other failure is logged as a warning and not propagated.
    """
    src_obj = pathlib.Path(src)
    dst_obj = pathlib.Path(dst)
    try:
        logger.info("copying: %s -> %s", src, dst)
        if src_obj.is_dir():
            shutil.copytree(src, dst, symlinks=symlink)
        else:
            # copytree(symlinks=True) preserves links whereas copyfile(follow_symlinks=True) resolves them, so the
            # flag has to be negated to give "symlink" the same meaning for files and for directories.
            shutil.copyfile(src, dst, follow_symlinks=not symlink)
    except Exception as error:
        if not os.access(src, os.R_OK):
            raise OSError(f"Cannot read: {src}") from error
        # samefile() stats both paths, so a missing destination surfaces here as FileNotFoundError.
        if src_obj.samefile(dst_obj):
            # accomodate for the possibility that we already copied
            # the file as a symlink/hardlink
            # NOTE(review): the comment above suggests this case should be tolerated, yet the error is re-raised
            # while all other errors are dropped - confirm the intended behaviour with the callers.
            raise
        # Historic best-effort behaviour: do not propagate, but at least leave a trace of the failure.
        logger.warning("copying %s -> %s failed: %s", src, dst, error)
[docs]
def copyremotefile(src: str, dst1: str, api: Optional["CobblerAPI"] = None) -> None:
    """
    Copy a file from a remote place to the local destination.

    :param src: The remote file URI. It is opened with ``urllib.request.urlopen()``.
    :param dst1: The copy destination on the local filesystem.
    :param api: This parameter is not used currently.
    :raises OSError: Raised in case an error occurs when fetching or writing the file.
    """
    try:
        logger.info("copying: %s -> %s", src, dst1)
        with urllib.request.urlopen(src) as srcfile, open(dst1, "wb") as output:
            # Stream the payload in chunks instead of loading the complete remote file into memory first.
            shutil.copyfileobj(srcfile, output)
    except Exception as error:
        raise OSError(
            f"Error while getting remote file ({src} -> {dst1}):\n{error}"
        ) from error
[docs]
def copyfileimage(src: str, image_location: str, dst: str) -> None:
    """
    Copy a file into an image by calling ``mcopy``.

    :param src: The source file.
    :param image_location: The location of the image.
    :param dst: The destination for the file; it is addressed inside the image as ``::/<dst>``.
    :raises OSError: Raised in case the command fails with a ``subprocess.CalledProcessError``.
    """
    cmd = ["mcopy", "-n", "-i", image_location, src]
    cmd.append("::/" + dst)
    try:
        logger.info('running: "%s"', cmd)
        utils.subprocess_call(cmd, shell=False)
    except subprocess.CalledProcessError as mcopy_error:
        raise OSError(
            f"Error while copying file to image ({src} -> {dst}):\n{mcopy_error.output}"
        ) from mcopy_error
[docs]
def rmfile(path: str) -> None:
    """
    Delete a single file. A file that does not exist is ignored, any other failure is only logged as a warning.

    :param path: The file to delete.
    """
    target = pathlib.Path(path)
    try:
        target.unlink()
    except FileNotFoundError:
        # Already gone, nothing to do.
        return
    except OSError as unlink_error:
        logger.warning('Could not remove file "%s": %s', path, unlink_error.strerror)
    else:
        logger.info('Successfully removed "%s"', path)
[docs]
def rmtree_contents(path: str) -> None:
    """
    Delete everything that the glob pattern ``<path>/*`` matches, using ``rmtree()`` for every match.

    :param path: The folder whose content should be deleted. It is inserted into the glob pattern as is.
    """
    for entry in glob.glob(f"{path}/*"):
        rmtree(entry)
[docs]
def rmtree(path: str) -> None:
    """
    Delete a complete directory or just a single file.

    :param path: The directory or file to delete.
    :raises CX: Raised in case an ``OSError`` with an errno other than ``ENOENT`` occurs. Errors inside
                ``shutil.rmtree()`` itself are suppressed because it is called with ``ignore_errors=True``.
    """
    try:
        if pathlib.Path(path).is_file():
            rmfile(path)
            # The file has been handled by rmfile(); do not log a second removal and run rmtree() on it as well.
            return
        logger.info("removing: %s", path)
        shutil.rmtree(path, ignore_errors=True)
    except OSError as ioe:
        log_exc()
        if ioe.errno != errno.ENOENT:  # doesn't exist
            raise CX(f"Error deleting {path}") from ioe
[docs]
def rmglob_files(path: str, glob_pattern: str) -> None:
    """
    Remove every entry of ``path`` that matches ``glob_pattern`` by calling ``rmfile()`` on it.

    :param path: The folder of the files to remove.
    :param glob_pattern: The glob pattern for the files to remove in ``path``.
    """
    directory = pathlib.Path(path)
    for match in directory.glob(glob_pattern):
        rmfile(str(match))
[docs]
def mkdir(path: str, mode: int = 0o755) -> None:
    """
    Create a directory, including missing parent directories, with a given mode.

    :param path: The path to create the directory at.
    :param mode: The mode to create the directory with. Per the pathlib documentation, parent directories created
                 because of ``parents=True`` get the default permissions and not this mode.
    :raises CX: Raised in case creating the directory fails with something different from ``errno.EEXIST`` (path
                already exists).

    Only the errno is inspected, so an already existing path that is not a directory is tolerated silently as well.
    """
    try:
        pathlib.Path(path).mkdir(mode=mode, parents=True)
    except OSError as os_error:
        # An already existing path is not an error for the callers.
        if os_error.errno != errno.EEXIST:
            log_exc()
            raise CX(f"Error creating {path}") from os_error
[docs]
def mkdirimage(path: pathlib.Path, image_location: str) -> None:
    """
    Create a directory, including all of its parent directories, inside an image by calling ``mmd``.

    :param path: The path to create the directory at.
    :param image_location: The location of the image.
    :raises OSError: Raised in case one of the commands fails with a ``subprocess.CalledProcessError``.
    """
    cmd = ["mmd", "-i", image_location, str(path)]
    created_so_far = []
    try:
        # Create all parent directories one by one inside the image. The last element of the command is replaced
        # with the progressively longer path on every iteration.
        for component in path.parts:
            created_so_far.append(component)
            cmd[-1] = "/".join(created_so_far)
            logger.info('running: "%s"', cmd)
            utils.subprocess_call(cmd, shell=False)
    except subprocess.CalledProcessError as mmd_error:
        raise OSError(
            f"Error while creating directory ({cmd[-1]}) in image {image_location}.\n{mmd_error.output}"
        ) from mmd_error
[docs]
def path_tail(apath: str, bpath: str) -> str:
    """
    Given two paths (B is longer than A), return the part of B that follows A.

    :param apath: The first path.
    :param bpath: The second path.
    :return: The remainder of ``bpath`` after the leading ``apath``, always starting with a slash. If ``bpath`` does
             not start with ``apath`` an empty string is returned.
    """
    if not bpath.startswith(apath):
        return ""
    remainder: str = bpath[len(apath):]
    return remainder if remainder.startswith("/") else "/" + remainder
[docs]
def safe_filter(var: Optional[str]) -> None:
    r"""
    Reject input that contains two consecutive dots or a semicolon. None and any other input pass silently.

    :param var: The string to check. May be None, in which case nothing is checked.
    :raises CX: In case ``..`` or ``;`` is found in ``var``.
    """
    forbidden_tokens = ("..", ";")
    if var is not None and any(token in var for token in forbidden_tokens):
        raise CX("Invalid characters found in input")
    return None
def __create_if_not_exists(path: pathlib.Path) -> None:
    """
    Create a directory via ``mkdir()`` unless the path exists already.

    :param path: The path where the directory should be created.
    """
    if path.exists():
        return
    mkdir(str(path))
def __symlink_if_not_exists(source: pathlib.Path, target: pathlib.Path) -> None:
    """
    Create the symbolic link ``target`` pointing to ``source`` unless something exists at ``target`` already.

    :param source: The path the symbolic link should point to.
    :param target: The path of the symbolic link that should be created.
    """
    # exists() follows symbolic links and reports False for a dangling one, in which case symlink_to() would fail
    # with FileExistsError. Check for the link itself first.
    if target.is_symlink() or target.exists():
        return
    target.symlink_to(source)
[docs]
def create_web_dirs(api: "CobblerAPI") -> None:
    """
    Create directories for HTTP content and refresh the files in the ``misc`` directory of the webroot.

    :param api: CobblerAPI; the webroot is taken from ``api.settings().webdir``.
    """
    webdir_obj = pathlib.Path(api.settings().webdir)
    webroot_distro_mirror = webdir_obj / "distro_mirror"
    webroot_misc = webdir_obj / "misc"
    webroot_directory_paths = [
        webdir_obj / "localmirror",
        webdir_obj / "repo_mirror",
        webroot_distro_mirror,
        webroot_distro_mirror / "config",
        webdir_obj / "links",
        webroot_misc,
        webdir_obj / "pub",
        webdir_obj / "rendered",
        webdir_obj / "images",
    ]
    for directory_path in webroot_directory_paths:
        __create_if_not_exists(directory_path)

    # Copy anamon scripts to the webroot
    misc_path = pathlib.Path("/var/lib/cobbler/misc")
    rmtree_contents(str(webroot_misc))
    # iterdir() already yields paths that include misc_path.
    for entry in misc_path.iterdir():
        if entry.is_file():
            # copyfile() ends up in shutil.copyfile(), which needs the complete target file name. Passing only the
            # directory fails, and copyfile() does not propagate that failure, so no file would be copied at all.
            copyfile(str(entry), str(webroot_misc / entry.name))
[docs]
def create_tftpboot_dirs(api: "CobblerAPI") -> None:
    """
    Create the directories and the relative symbolic links below the TFTP boot location.

    :param api: CobblerAPI; the boot location is taken from ``api.settings().tftpboot_location``.
    """
    bootloc = pathlib.Path(api.settings().tftpboot_location)

    # Directories relative to the boot location, in the order in which they are created.
    relative_directories = (
        "etc",
        "images2",
        "ppc",
        "s390x",
        "pxelinux.cfg",
        "grub",
        "grub/system",
        "grub/system_link",
        "images",
        "ipxe",
        "esxi",
        "esxi/system",
    )
    for relative_directory in relative_directories:
        __create_if_not_exists(bootloc / relative_directory)

    # (link location relative to the boot location, link content) pairs.
    relative_links = (
        ("grub/images", "../images"),
        ("esxi/images", "../images"),
        ("esxi/pxelinux.cfg", "../pxelinux.cfg"),
        ("boot", "."),
    )
    for link_location, link_content in relative_links:
        __symlink_if_not_exists(pathlib.Path(link_content), bootloc / link_location)
[docs]
def create_trigger_dirs(api: "CobblerAPI") -> None:
    """
    Creates the directories that the user/admin can fill with dynamically executed scripts.

    Below ``/var/lib/cobbler/triggers`` the actions ``add``, ``delete`` and ``task`` get one directory per item type,
    each with a ``pre`` and a ``post`` subdirectory. In addition ``install`` (``pre``, ``post``, ``firstboot``),
    ``sync`` (``pre``, ``post``) and ``change`` are created.

    :param api: CobblerAPI (not used by this function).
    """
    # This is not yet a setting
    trigger_directory = pathlib.Path("/var/lib/cobbler") / "triggers"

    item_types = (
        "distro",
        "profile",
        "system",
        "repo",
        "menu",
        "network_interface",
        "template",
        "distro_group",
        "profile_group",
        "system_group",
    )

    def item_type_tree(action: str):
        """Return the action directory followed by <item type>, <item type>/pre and <item type>/post for all types."""
        action_directory = trigger_directory / action
        tree = [action_directory]
        for item_type in item_types:
            item_directory = action_directory / item_type
            tree.extend((item_directory, item_directory / "pre", item_directory / "post"))
        return tree

    install_directory = trigger_directory / "install"
    sync_directory = trigger_directory / "sync"

    # The creation order is kept identical to the explicit list this was generated from.
    trigger_directories = [trigger_directory]
    trigger_directories += item_type_tree("add")
    trigger_directories += item_type_tree("delete")
    trigger_directories += [
        install_directory,
        install_directory / "pre",
        install_directory / "post",
        install_directory / "firstboot",
        sync_directory,
        sync_directory / "pre",
        sync_directory / "post",
        trigger_directory / "change",
    ]
    trigger_directories += item_type_tree("task")

    for directory_path in trigger_directories:
        __create_if_not_exists(directory_path)
[docs]
def create_json_database_dirs(api: "CobblerAPI") -> None:
    """
    Creates the database directories for the file serializer below ``/var/lib/cobbler/collections``.

    :param api: CobblerAPI (not used by this function).
    """
    # This is not yet a setting
    collections_root = pathlib.Path("/var/lib/cobbler") / "collections"
    collection_names = (
        "distros",
        "images",
        "profiles",
        "repos",
        "systems",
        "menus",
        "network_interfaces",
        "templates",
        "distro_groups",
        "profile_groups",
        "system_groups",
    )

    __create_if_not_exists(collections_root)
    for collection_name in collection_names:
        __create_if_not_exists(collections_root / collection_name)