"""
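Helper functions for filesystem operations: hashing, linking, copying and removing files as well as creating the
directory structures Cobbler works with.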
"""
import errno
import glob
import hashlib
import json
import logging
import os
import pathlib
import shutil
import subprocess
import urllib.request
from typing import TYPE_CHECKING, Dict, Optional, Tuple, Union
from cobbler import utils
from cobbler.cexceptions import CX
from cobbler.utils import log_exc, mtab
if TYPE_CHECKING:
from cobbler.api import CobblerAPI
# Module-wide logger. ``logging.getLogger()`` called without a name returns the root logger.
logger = logging.getLogger()
def is_safe_to_hardlink(src: str, dst: str, api: "CobblerAPI") -> bool:
    """
    Decide whether ``dst`` may be created as a hardlink to ``src``.

    :param src: The hardlink source path.
    :param dst: The hardlink target path.
    :param api: The api-instance that is asked whether SELinux is enabled.
    :return: False if source and destination are reported on different devices, if the source is a symbolic link,
             if no device could be determined for the source or if the device name contains a colon (remote
             file). Otherwise True if SELinux is disabled. With SELinux enabled, True only if the basename of the
             source path matches ``utils.re_initrd`` or ``utils.re_kernel``.
    """
    # FIXME: Calling this with empty strings returns True?!
    (src_device, src_path) = mtab.get_file_device_path(src)
    (dst_device, _) = mtab.get_file_device_path(dst)

    # The checks short-circuit in this order, so the None check protects the colon lookup.
    hardlink_impossible = (
        src_device != dst_device
        # Do not hardlink to a symbolic link! Chances are high the new link will be dangling.
        or pathlib.Path(src).is_symlink()
        or src_device is None
        # A colon in the device name means we are looking at a remote file.
        or ":" in src_device
    )
    if hardlink_impossible:
        return False

    # Note: This is very Cobbler implementation specific!
    if not api.is_selinux_enabled():
        return True

    # We're dealing with SELinux: only kernels and initrds are treated as safe, everything else is refused.
    src_basename = str(pathlib.PurePath(src_path).name)
    return bool(
        utils.re_initrd.match(src_basename) or utils.re_kernel.match(src_basename)
    )
def sha1_file(file_path: Union[str, pathlib.Path], buffer_size: int = 65536) -> str:
    """
    Compute the SHA1 digest of a file, emulating the output of the ``sha1sum`` tool.

    :param file_path: The path to the file that should be hashed.
    :param buffer_size: The number of bytes that are read from the file per iteration.
    :return: The hexadecimal SHA1 digest of the file content.
    """
    # Highly inspired by: https://stackoverflow.com/a/22058673
    digest = hashlib.sha1()
    with open(file_path, "rb") as stream:
        # iter() with a sentinel stops as soon as read() hands back an empty bytes object (end of file).
        for chunk in iter(lambda: stream.read(buffer_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
def hashfile(file_name: str, lcache: Optional[pathlib.Path] = None) -> Optional[str]:
    r"""
    Return the sha1sum of a file, using ``link_cache.json`` inside ``lcache`` as a lookup cache.

    :param file_name: The file to get the sha1sum of.
    :param lcache: The directory where the ``link_cache.json`` file is stored. A cached hash is reused as long as
                   the modification time stored with it is not older than the current one of the file.
    :return: The sha1 sum. None is returned if ``lcache`` is None or if the file doesn't exist.
    """
    if lcache is None:
        return None

    cache_db_path = pathlib.Path(lcache) / "link_cache.json"
    known_hashes: Dict[str, Tuple[float, str]] = {}
    if cache_db_path.exists():
        known_hashes = json.loads(cache_db_path.read_text(encoding="utf-8"))

    target = pathlib.Path(file_name)
    if not target.exists():
        return None

    modified_at = target.stat().st_mtime
    if file_name in known_hashes and known_hashes[file_name][0] >= modified_at:
        # The file has not been modified since it was hashed the last time.
        return known_hashes[file_name][1]

    # Cache miss or outdated entry: hash the file again and persist the result.
    fresh_hash = sha1_file(file_name)
    known_hashes[file_name] = (modified_at, fresh_hash)
    __create_if_not_exists(lcache)
    cache_db_path.write_text(json.dumps(known_hashes), encoding="utf-8")
    return fresh_hash
def cachefile(src: str, dst: str) -> None:
    """
    Copy a file into a cache directory and hardlink the cached copy to the destination.

    The cache is the directory ``.link_cache`` two levels above ``dst``. Use this with caution, otherwise you could
    end up copying data twice if the cache is not on the same device as the destination.

    :param src: The sourcefile for the copy action.
    :param dst: The destination for the copy action.
    """
    cache_dir = pathlib.Path(dst).parent.parent / ".link_cache"
    if not cache_dir.is_dir():
        cache_dir.mkdir()

    # The name of the cached copy is the hash of the source file.
    digest = hashfile(src, lcache=cache_dir)
    if digest is None:
        logger.info("Cachefile skipped due to missing key for it!")
        return None

    cached_copy = cache_dir / digest
    if not cached_copy.exists():
        logger.info("trying to create cache file %s", cached_copy)
        copyfile(src, str(cached_copy))

    logger.debug("trying cachelink %s -> %s -> %s", src, cached_copy, dst)
    os.link(cached_copy, dst)
def linkfile(
    api: "CobblerAPI", src: str, dst: str, symlink_ok: bool = False, cache: bool = True
) -> None:
    """
    Attempt to create a link dst that points to src. Because file systems suck we attempt several different methods or
    bail to just copying the file.

    The methods are tried in this order: hardlink (only if ``is_safe_to_hardlink`` agrees), symbolic link (only if
    ``symlink_ok`` is set), hardlink to a cached copy (only if ``cache`` is set) and finally a plain copy.

    :param api: This parameter is needed to check if a file can be hardlinked.
    :param src: The source file.
    :param dst: The destination for the link.
    :param symlink_ok: If it is okay to just use a symbolic link.
    :param cache: If it is okay to use a cached file instead of the real one.
    """
    dst_obj = pathlib.Path(dst)
    src_obj = pathlib.Path(src)

    if dst_obj.exists():
        # if the destination exists, is it right in terms of accuracy and context?
        if src_obj.samefile(dst):
            if is_safe_to_hardlink(src, dst, api):
                # The destination already refers to the source, nothing to do.
                return None
            # may have to remove old hardlinks for SELinux reasons as previous implementations were not complete
            logger.info("removing: %s", dst)
            dst_obj.unlink()
        elif dst_obj.is_symlink():
            # existing path exists and is a symlink, update the symlink
            logger.info("removing: %s", dst)
            dst_obj.unlink()
    elif dst_obj.is_symlink():
        # Path.exists() follows symbolic links, so a dangling symlink is reported as "not existing". It still
        # occupies the name and would make every link attempt below fail, thus it has to be removed as well.
        logger.info("removing: %s", dst)
        dst_obj.unlink()

    if is_safe_to_hardlink(src, dst, api):
        # we can try a hardlink if the destination isn't to NFS or Samba this will help save space and sync time.
        try:
            logger.info("trying hardlink %s -> %s", src, dst)
            os.link(src, dst)
            return None
        except OSError:
            # hardlink across devices, or link already exists we'll just symlink it if we can or otherwise copy it
            pass

    if symlink_ok:
        # we can symlink anywhere except for /tftpboot because that is run chroot, so if we can symlink now, try it.
        try:
            logger.info("trying symlink %s -> %s", src, dst)
            dst_obj.symlink_to(src_obj)
            return None
        except OSError:
            pass

    if cache:
        try:
            cachefile(src, dst)
            return None
        except OSError:
            pass

    # we couldn't hardlink and we couldn't symlink so we must copy
    copyfile(src, dst)
def copyfile(src: str, dst: str, symlink: bool = False) -> None:
    """
    Copy a file or a directory tree from source to the destination.

    :param src: The source file. This may also be a folder.
    :param dst: The destination for the file or folder. If ``src`` is a file and ``dst`` is an existing directory,
                the file is copied into that directory under its own name.
    :param symlink: If instead of a copy, a symlink is okay, then this may be set explicitly to "True". Symbolic
                    links found in the source are then recreated as links, otherwise their content is copied.
    :raises OSError: Raised in case ``src`` could not be read. Every other error of the copy operation is re-raised
                     unchanged, unless source and destination already refer to the same file.
    """
    src_obj = pathlib.Path(src)
    dst_obj = pathlib.Path(dst)
    try:
        logger.info("copying: %s -> %s", src, dst)
        if src_obj.is_dir():
            shutil.copytree(src, dst, symlinks=symlink)
        else:
            if dst_obj.is_dir():
                # shutil.copyfile() requires a complete file name as destination.
                dst_obj = dst_obj / src_obj.name
            # "symlink" means "links may be kept", which is the opposite of "follow_symlinks".
            shutil.copyfile(src, str(dst_obj), follow_symlinks=not symlink)
    except Exception as error:
        if not os.access(src, os.R_OK):
            raise OSError(f"Cannot read: {src}") from error
        try:
            already_in_place = src_obj.samefile(dst_obj)
        except OSError:
            # The destination is missing or not accessible, so it cannot be the same file.
            already_in_place = False
        if not already_in_place:
            # Only tolerate the failure if we already copied the file as a symlink/hardlink.
            raise
def copyremotefile(src: str, dst1: str, api: Optional["CobblerAPI"] = None) -> None:
    """
    Copies a file from a remote place to the local destination.

    :param src: The remote file URI.
    :param dst1: The copy destination on the local filesystem.
    :param api: This parameter is not used currently.
    :raises OSError: Raised in case an error occurs when fetching or writing the file.
    """
    try:
        logger.info("copying: %s -> %s", src, dst1)
        with urllib.request.urlopen(src) as srcfile, open(dst1, "wb") as output:
            # Stream the payload in chunks instead of loading the complete remote file into memory.
            shutil.copyfileobj(srcfile, output)
    except Exception as error:
        raise OSError(
            f"Error while getting remote file ({src} -> {dst1}):\n{error}"
        ) from error
def copyfileimage(src: str, image_location: str, dst: str) -> None:
    """
    Copy a file into an image by calling ``mcopy``.

    :param src: The source file.
    :param image_location: The location of the image.
    :param dst: The destination for the file inside the image.
    :raises OSError: Raised in case the call fails with a ``subprocess.CalledProcessError``.
    """
    # mcopy addresses the image given via "-i" with the "::" prefix.
    target_in_image = "::/" + dst
    mcopy_call = ["mcopy", "-n", "-i", image_location, src, target_in_image]
    logger.info('running: "%s"', mcopy_call)
    try:
        utils.subprocess_call(mcopy_call, shell=False)
    except subprocess.CalledProcessError as error:
        raise OSError(
            f"Error while copying file to image ({src} -> {dst}):\n{error.output}"
        ) from error
def rmfile(path: str) -> None:
    """
    Delete a single file. A missing file is ignored and any other failure is only logged as a warning.

    :param path: The file to delete.
    """
    target = pathlib.Path(path)
    try:
        target.unlink()
    except FileNotFoundError:
        # Nothing to delete.
        return
    except OSError as ioe:
        logger.warning('Could not remove file "%s": %s', path, ioe.strerror)
        return
    logger.info('Successfully removed "%s"', path)
def rmtree_contents(path: str) -> None:
    """
    Delete the content of a folder but keep the folder itself.

    :param path: The folder to clean up. The glob pattern ``<path>/*`` is expanded and every match is handed to
                 ``rmtree()``. Entries whose name starts with a dot are not matched by this pattern.
    """
    for entry in glob.glob(f"{path}/*"):
        rmtree(entry)
def rmtree(path: str) -> None:
    """
    Delete a complete directory or just a single file.

    :param path: The directory or file to delete.
    :raises CX: Raised in case an ``OSError`` with an error code other than ``ENOENT`` occurs. Errors that happen
                inside of ``shutil.rmtree`` are suppressed because it is called with ``ignore_errors=True``.
    """
    try:
        if pathlib.Path(path).is_file():
            rmfile(path)
            # The file was handled by rmfile(), there is no directory tree left to remove.
            return
        logger.info("removing: %s", path)
        shutil.rmtree(path, ignore_errors=True)
    except OSError as ioe:
        log_exc()
        if ioe.errno != errno.ENOENT:  # doesn't exist
            raise CX(f"Error deleting {path}") from ioe
def rmglob_files(path: str, glob_pattern: str) -> None:
    """
    Delete every entry below ``path`` that matches ``glob_pattern`` by handing it to ``rmfile()``.

    :param path: The folder of the files to remove.
    :param glob_pattern: The glob pattern for the files to remove in ``path``.
    """
    base_dir = pathlib.Path(path)
    for match in base_dir.glob(glob_pattern):
        rmfile(str(match))
def mkdir(path: str, mode: int = 0o755) -> None:
    """
    Create directory with a given mode. Missing parent directories are not created.

    :param path: The path to create the directory at.
    :param mode: The mode to create the directory with.
    :raises CX: Raised in case creating the directory fails with something different from "already exists"
                (``FileExistsError``, error code ``EEXIST``).
    """
    try:
        pathlib.Path(path).mkdir(mode=mode)
    except FileExistsError:
        # The path is already present, there is nothing left to do. Note that this is also the case if the
        # path exists but is not a directory.
        pass
    except OSError as os_error:
        log_exc()
        raise CX(f"Error creating {path}") from os_error
def mkdirimage(path: pathlib.Path, image_location: str) -> None:
    """
    Create a directory in an image by calling ``mmd`` once for the path and for each of its parents.

    :param path: The path to create the directory at.
    :param image_location: The location of the image.
    :raises OSError: Raised in case a call fails with a ``subprocess.CalledProcessError``.
    """
    components = path.parts
    # Every leading slice of the components, e.g. "a", "a/b" and "a/b/c" for the path "a/b/c".
    directory_chain = [
        "/".join(components[:depth]) for depth in range(1, len(components) + 1)
    ]
    mmd_call = ["mmd", "-i", image_location, str(path)]
    try:
        # Create all parent directories one by one inside the image
        for directory in directory_chain:
            mmd_call[-1] = directory
            logger.info('running: "%s"', mmd_call)
            utils.subprocess_call(mmd_call, shell=False)
    except subprocess.CalledProcessError as error:
        # The last element of the call is the directory that failed to be created.
        raise OSError(
            f"Error while creating directory ({mmd_call[-1]}) in image {image_location}.\n{error.output}"
        ) from error
def path_tail(apath: str, bpath: str) -> str:
    """
    Given two paths (B is longer than A), find the part in B not in A.

    :param apath: The first path.
    :param bpath: The second path.
    :return: The remainder of ``bpath`` after the leading ``apath``, always starting with a slash. If ``bpath`` does
             not start with ``apath`` this function returns an empty string.
    """
    if not bpath.startswith(apath):
        return ""
    remainder: str = bpath[len(apath) :]
    return remainder if remainder.startswith("/") else "/" + remainder
def safe_filter(var: Optional[str]) -> None:
    r"""
    Validate that the argument contains neither two consecutive dots nor a semicolon. Nothing happens for ``None``.

    :param var: The value to check. ``None`` is accepted and skips the check.
    :raises CX: In case any ``..`` or ``;`` is found in ``var``.
    """
    if var is None:
        return None
    if ".." in var or ";" in var:
        raise CX("Invalid characters found in input")
def __create_if_not_exists(path: pathlib.Path) -> None:
    """
    Create a directory via ``mkdir()`` unless the path is already present.

    :param path: The path where the directory should be created. Parents directories must exist.
    """
    if path.exists():
        return
    mkdir(str(path))
def __symlink_if_not_exists(source: pathlib.Path, target: pathlib.Path) -> None:
    """
    Symlinks a directory if the symlink doesn't exist.

    :param source: The source directory, i.e. the location the new symlink points to.
    :param target: The target directory, i.e. the path where the symlink is created. Nothing happens if this path
                   is already present, including the case of a symlink whose destination is missing.
    """
    # Path.exists() follows symlinks and reports False for a dangling link. Without the is_symlink() check
    # symlink_to() would be called for such a link and fail with FileExistsError.
    if target.exists() or target.is_symlink():
        return
    target.symlink_to(source)
def create_web_dirs(api: "CobblerAPI") -> None:
    """
    Create directories for HTTP content and refresh the content of the ``misc`` directory in the webroot.

    :param api: CobblerAPI, used to read the ``webdir`` setting.
    """
    webdir_obj = pathlib.Path(api.settings().webdir)
    webroot_distro_mirror = webdir_obj / "distro_mirror"
    webroot_misc = webdir_obj / "misc"
    # Parents are listed before their children because the directories are not created recursively.
    webroot_directory_paths = [
        webdir_obj / "localmirror",
        webdir_obj / "repo_mirror",
        webroot_distro_mirror,
        webroot_distro_mirror / "config",
        webdir_obj / "links",
        webroot_misc,
        webdir_obj / "pub",
        webdir_obj / "rendered",
        webdir_obj / "images",
    ]
    for directory_path in webroot_directory_paths:
        __create_if_not_exists(directory_path)

    # Copy anamon scripts to the webroot
    misc_path = pathlib.Path("/var/lib/cobbler/misc")
    rmtree_contents(str(webroot_misc))
    # iterdir() already yields complete paths, so the entries can be used directly.
    for misc_entry in misc_path.iterdir():
        if misc_entry.is_file():
            # The destination must be a complete file name, passing only the directory makes the copy fail.
            copyfile(str(misc_entry), str(webroot_misc / misc_entry.name))
def create_tftpboot_dirs(api: "CobblerAPI") -> None:
    """
    Create the directories and the relative symlinks for tftpboot images.

    :param api: CobblerAPI, used to read the ``tftpboot_location`` setting.
    """
    bootloc = pathlib.Path(api.settings().tftpboot_location)
    grub_dir = bootloc / "grub"
    esxi_dir = bootloc / "esxi"

    # Parents are listed before their children because the directories are not created recursively.
    directories = (
        [
            bootloc / name
            for name in ("boot", "etc", "images2", "ppc", "s390x", "pxelinux.cfg")
        ]
        + [grub_dir, grub_dir / "system", grub_dir / "system_link"]
        + [bootloc / "images", bootloc / "ipxe"]
        + [esxi_dir, esxi_dir / "system"]
    )
    for directory in directories:
        __create_if_not_exists(directory)

    # Pairs of (location of the link, relative destination the link points to).
    relative_links = (
        (grub_dir / "images", "../images"),
        (esxi_dir / "images", "../images"),
        (esxi_dir / "pxelinux.cfg", "../pxelinux.cfg"),
    )
    for link_location, link_destination in relative_links:
        __symlink_if_not_exists(pathlib.Path(link_destination), link_location)
def create_trigger_dirs(api: "CobblerAPI") -> None:
    """
    Creates the directories that the user/admin can fill with dynamically executed scripts.

    :param api: CobblerAPI, not used by this function.
    """
    # This is not yet a setting
    trigger_root = pathlib.Path("/var/lib/cobbler") / "triggers"
    item_types = (
        "distro",
        "profile",
        "system",
        "repo",
        "mgmtclass",
        "package",
        "file",
        "menu",
    )

    def per_item_tree(action: str) -> list:
        """Return the action directory followed by a pre and post directory for each item type."""
        action_dir = trigger_root / action
        tree = [action_dir]
        for item_type in item_types:
            item_dir = action_dir / item_type
            tree.extend([item_dir, item_dir / "pre", item_dir / "post"])
        return tree

    install_dir = trigger_root / "install"
    sync_dir = trigger_root / "sync"

    # Parents are listed before their children because the directories are not created recursively.
    trigger_directories = [trigger_root]
    trigger_directories += per_item_tree("add")
    trigger_directories += per_item_tree("delete")
    trigger_directories += [
        install_dir,
        install_dir / "pre",
        install_dir / "post",
        install_dir / "firstboot",
    ]
    trigger_directories += [sync_dir, sync_dir / "pre", sync_dir / "post"]
    trigger_directories.append(trigger_root / "change")
    trigger_directories += per_item_tree("task")

    for directory in trigger_directories:
        __create_if_not_exists(directory)
def create_json_database_dirs(api: "CobblerAPI") -> None:
    """
    Creates the database directories for the file serializer

    :param api: CobblerAPI, not used by this function.
    """
    # This is not yet a setting
    collections_dir = pathlib.Path("/var/lib/cobbler") / "collections"
    collection_names = (
        "distros",
        "images",
        "profiles",
        "repos",
        "systems",
        "mgmtclasses",
        "packages",
        "files",
        "menus",
    )
    # The parent has to be created first because the directories are not created recursively.
    __create_if_not_exists(collections_dir)
    for collection_name in collection_names:
        __create_if_not_exists(collections_dir / collection_name)