"""
Misc heavy lifting functions for Cobbler
"""
# SPDX-License-Identifier: GPL-2.0-or-later
# SPDX-FileCopyrightText: Copyright 2006-2009, Red Hat, Inc and Others
# SPDX-FileCopyrightText: Michael DeHaan <michael.dehaan AT gmail>
import contextlib
import fcntl
import glob
import logging
import os
import random
import re
import shutil
import subprocess
import sys
import traceback
import urllib.error
import urllib.parse
import urllib.request
from functools import reduce
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Dict,
Hashable,
List,
Optional,
Pattern,
Tuple,
Union,
)
import distro
from netaddr.ip import IPAddress, IPNetwork
from cobbler import enums, settings
from cobbler.cexceptions import CX
from cobbler.utils import process_management
if TYPE_CHECKING:
from cobbler.api import CobblerAPI
from cobbler.cobbler_collections.collection import ITEM
from cobbler.items.abstract.bootable_item import BootableItem
from cobbler.settings import Settings
CHEETAH_ERROR_DISCLAIMER = """
# *** ERROR ***
#
# There is a templating error preventing this file from rendering correctly.
#
# This is most likely not due to a bug in Cobbler and is something you can fix.
#
# Look at the message below to see what things are causing problems.
#
# (1) Does the template file reference a $variable that is not defined?
# (2) Is there a formatting error in a Cheetah directive?
# (3) Should dollar signs ($) be escaped that are not being escaped?
#
# Try fixing the problem and then investigate to see if this message goes
# away or changes.
#
"""
re_kernel = re.compile(
r"(vmlinu[xz]|(kernel|linux(\.img)?)|pxeboot\.n12|wimboot|mboot\.c32|tboot\.b00|b\.b00|.+\.kernel)"
)
re_initrd = re.compile(r"(initrd(.*)\.img|ramdisk\.image\.gz|boot\.sdi|imgpayld\.tgz)")
# all logging from utils.die goes to the main log even if there is another log.
# logging.getLogger is not annotated fully according to pylance.
logger = logging.getLogger() # type: ignore
def die(msg: str) -> None:
"""
This method lets Cobbler crash with an exception. Log the exception once in the per-task log or the main log if
this is not a background op.
:param msg: The message to send for raising the exception
:raises CX: Raised in all cases with ``msg``.
"""
# log the exception once in the per-task log or the main log if this is not a background op.
try:
raise CX(msg)
except CX:
log_exc()
# now re-raise it so the error can fail the operation
raise CX(msg)
def log_exc() -> None:
"""
Log an exception.
"""
(exception_type, exception_value, exception_traceback) = sys.exc_info()
logger.info("Exception occurred: %s", exception_type)
logger.info("Exception value: %s", exception_value)
logger.info(
"Exception Info:\n%s",
"\n".join(traceback.format_list(traceback.extract_tb(exception_traceback))),
)
def get_exc(exc: Exception, full: bool = True) -> str:
"""
This tries to analyze if an exception comes from Cobbler and potentially enriches or shortens the exception.
:param exc: The exception which should be analyzed.
:param full: If the full exception should be returned or only the most important information.
:return: The exception which has been converted into a string which then can be logged easily.
"""
(exec_type, exec_value, trace) = sys.exc_info()
buf = ""
try:
getattr(exc, "from_cobbler")
buf = str(exc)[1:-1] + "\n"
except Exception:
if not full:
buf += str(exec_type)
buf = f"{buf}\n{exec_value}"
if full:
buf += "\n" + "\n".join(traceback.format_list(traceback.extract_tb(trace)))
return buf
def cheetah_exc(exc: Exception) -> str:
"""
Converts an exception thrown by Cheetah3 into a custom error message.
:param exc: The exception to convert.
:return: The string representation of the Cheetah3 exception.
"""
lines = get_exc(exc).split("\n")
buf = ""
for line in lines:
buf += f"# {line}\n"
return CHEETAH_ERROR_DISCLAIMER + buf
def pretty_hex(ip_address: IPAddress, length: int = 8) -> str:
"""
Pads an IP object with leading zeroes so that the result is _length_ hex digits. Also do an upper().
:param ip_address: The IP address to pretty print.
:param length: The length of the resulting hexstring. If the number is smaller than the resulting hex-string
then no front-padding is done.
"""
hexval = f"{ip_address.value:x}"
if len(hexval) < length:
hexval = "0" * (length - len(hexval)) + hexval
return hexval.upper()
def get_host_ip(ip_address: str, shorten: bool = True) -> str:
"""
Return the IP encoding needed for the TFTP boot tree.
:param ip_address: The IP address to pretty print.
:param shorten: Whether the IP-Address should be shortened or not.
:return: The IP encoded as a hexadecimal value.
"""
ip_address_obj = IPAddress(ip_address)
cidr = IPNetwork(ip_address)
if len(cidr) == 1: # Just an IP, e.g. a /32
return pretty_hex(ip_address_obj)
pretty = pretty_hex(cidr[0])
if not shorten or len(cidr) <= 8:
# not enough to make the last nibble insignificant
return pretty
cutoff = (32 - cidr.prefixlen) // 4
return pretty[0:-cutoff]
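# Illustrative usage (a sketch, not executed at import time): the hex form is what
# PXE-style boot loaders expect when looking up per-host config files in the TFTP
# tree, e.g. pxelinux.cfg/C0A80101.
#   get_host_ip("192.168.1.1")        -> "C0A80101"
#   pretty_hex(IPAddress("10.0.0.1")) -> "0A000001"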
def _IP(ip_address: Union[str, IPAddress]) -> IPAddress:
"""
Returns a netaddr.IP object representing an ip.
If ip is already an netaddr.IP instance just return it.
Else return a new instance
"""
if isinstance(ip_address, IPAddress):
return ip_address
return IPAddress(ip_address)
def is_ip(strdata: str) -> bool:
"""
Return whether the argument is an IP address.
:param strdata: The IP in a string format. This gets passed to netaddr's IPAddress object.
"""
try:
_IP(strdata)
except Exception:
return False
return True
def get_random_mac(api_handle: "CobblerAPI", virt_type: str = "kvm") -> str:
"""
Generate a random MAC address.
The code of this method was taken from xend/server/netif.py
:param api_handle: The main Cobbler api instance.
:param virt_type: The virtualization provider. Currently possible is 'vmware', 'xen', 'qemu', 'kvm'.
:returns: MAC address string
:raises CX: Raised in case unsupported ``virt_type`` given.
"""
if virt_type.startswith("vmware"):
mac = [
0x00,
0x50,
0x56,
random.randint(0x00, 0x3F),
random.randint(0x00, 0xFF),
random.randint(0x00, 0xFF),
]
elif (
virt_type.startswith("xen")
or virt_type.startswith("qemu")
or virt_type.startswith("kvm")
):
mac = [
0x00,
0x16,
0x3E,
random.randint(0x00, 0x7F),
random.randint(0x00, 0xFF),
random.randint(0x00, 0xFF),
]
else:
raise CX("virt mac assignment not yet supported")
result = ":".join([f"{x:02x}" for x in mac])
systems = api_handle.systems()
while systems.find(mac_address=result):
result = get_random_mac(api_handle, virt_type)
return result
def find_matching_files(directory: str, regex: Pattern[str]) -> List[str]:
"""
Find all files in a given directory that match a given regex. Can't use glob directly as glob doesn't take regexen.
The search does not include subdirectories.
:param directory: The directory to search in.
:param regex: The regex to apply to the found files.
:return: An array of files which apply to the regex.
"""
files = glob.glob(os.path.join(directory, "*"))
results: List[str] = []
for file in files:
if regex.match(os.path.basename(file)):
results.append(file)
return results
def find_highest_files(directory: str, unversioned: str, regex: Pattern[str]) -> str:
"""
Find the highest numbered file (kernel or initrd numbering scheme) in a given directory that matches a given
pattern. Used for auto-booting the latest kernel in a directory.
:param directory: The directory to search in.
:param unversioned: The base filename which also acts as a last resort if no numbered files are found.
:param regex: The regex to search for.
:return: The file with the highest number or an empty string.
"""
files = find_matching_files(directory, regex)
get_numbers = re.compile(r"(\d+).(\d+).(\d+)")
def max2(first: str, second: str) -> str:
"""
Returns the larger of the two values
"""
first_match = get_numbers.search(os.path.basename(first))
second_match = get_numbers.search(os.path.basename(second))
if not (first_match and second_match):
raise ValueError("Could not detect version numbers correctly!")
first_value = tuple(map(int, first_match.groups()))
second_value = tuple(map(int, second_match.groups()))
if first_value > second_value:
return first
return second
if len(files) > 0:
return reduce(max2, files)
# Couldn't find a highest numbered file, but maybe there is just a 'vmlinuz' or an 'initrd.img' in this directory?
last_chance = os.path.join(directory, unversioned)
if os.path.exists(last_chance):
return last_chance
return ""
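# Illustrative example (hypothetical file names): with a directory containing
# "vmlinuz-2.6.32-754.el6.x86_64" and "vmlinuz-3.10.0-1160.el7.x86_64",
# find_highest_files(directory, "vmlinuz", re_kernel) returns the path of the
# 3.10.0 kernel; if only a plain "vmlinuz" exists it is returned instead, and
# "" is returned when nothing matches.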
def find_kernel(path: str) -> str:
"""
Given a filename, find if the path can be made to resolve into a kernel, and return that full path if possible.
:param path: The path to check for a kernel.
:return: path if at the specified location a possible match for a kernel was found, otherwise an empty string.
"""
if not isinstance(path, str): # type: ignore
raise TypeError("path must be of type str!")
if os.path.isfile(path):
filename = os.path.basename(path)
if re_kernel.match(filename) or filename == "vmlinuz":
return path
elif os.path.isdir(path):
return find_highest_files(path, "vmlinuz", re_kernel)
# For remote URLs we expect an absolute path, and will not do any searching for the latest:
elif file_is_remote(path) and remote_file_exists(path):
return path
return ""
def remove_yum_olddata(path: Union[str, "os.PathLike[str]"]) -> None:
"""
Delete .olddata folders that might be present from a failed run of createrepo.
:param path: The path to check for .olddata files.
"""
directories_to_try = [
".olddata",
".repodata/.olddata",
"repodata/.olddata",
"repodata/repodata",
]
for pathseg in directories_to_try:
olddata = Path(path, pathseg)
if olddata.is_dir() and olddata.exists():
logger.info('Removing: "%s"', olddata)
shutil.rmtree(olddata, ignore_errors=False, onerror=None)
def find_initrd(path: str) -> Optional[str]:
"""
Given a directory or a filename, see if the path can be made to resolve into an initrd, and return that full path
if possible.
:param path: The path to check for initrd files.
:return: None or the path to the found initrd.
"""
# FUTURE: try to match kernel/initrd pairs?
if path is None: # type: ignore
return None
if os.path.isfile(path):
# filename = os.path.basename(path)
# if re_initrd.match(filename):
# return path
# if filename == "initrd.img" or filename == "initrd":
# return path
return path
if os.path.isdir(path):
return find_highest_files(path, "initrd.img", re_initrd)
# For remote URLs we expect an absolute path, and will not do any searching for the latest:
if file_is_remote(path) and remote_file_exists(path):
return path
return None
def read_file_contents(
file_location: str, fetch_if_remote: bool = False
) -> Optional[str]:
"""
Reads the contents of a file, which could be referenced locally or as a URI.
:param file_location: The location of the file to read.
:param fetch_if_remote: If True a remote file will be tried to read, otherwise remote files are skipped and None is
returned.
:return: The file contents, or None if the file is remote and ``fetch_if_remote`` is False.
:raises FileNotFoundError: if the file does not exist at the specified location.
"""
# Local files:
if file_location.startswith("/"):
if not os.path.exists(file_location):
logger.warning("File does not exist: %s", file_location)
raise FileNotFoundError(f"File not found: {file_location}")
try:
with open(file_location, encoding="UTF-8") as file_fd:
data = file_fd.read()
return data
except:
log_exc()
raise
# Remote files:
if not fetch_if_remote:
return None
if file_is_remote(file_location):
try:
with urllib.request.urlopen(file_location) as handler:
data = handler.read()
return data
except urllib.error.HTTPError as error:
# File likely doesn't exist
logger.warning("File does not exist: %s", file_location)
raise FileNotFoundError(f"File not found: {file_location}") from error
return None
def remote_file_exists(file_url: str) -> bool:
"""
Return True if the remote file exists.
:param file_url: The URL to check.
:return: True if Cobbler can reach the specified URL, otherwise false.
"""
try:
with urllib.request.urlopen(file_url) as _:
pass
return True
except urllib.error.HTTPError:
# File likely doesn't exist
return False
def file_is_remote(file_location: str) -> bool:
"""
Returns true if the file is remote and referenced via a protocol we support.
:param file_location: The URI to check.
:return: True if the URI is http, https or ftp. Otherwise false.
"""
file_loc_lc = file_location.lower()
# Check for urllib2 supported protocols
for prefix in ["http://", "https://", "ftp://"]:
if file_loc_lc.startswith(prefix):
return True
return False
def blender(
api_handle: "CobblerAPI", remove_dicts: bool, root_obj: "BootableItem"
) -> Dict[str, Any]:
"""
Combine all of the data in an object tree from the perspective of that point on the tree, and produce a merged
dictionary containing consolidated data.
:param api_handle: The api to use for collecting the information to blender the item.
:param remove_dicts: Boolean to decide whether dicts should be converted.
:param root_obj: The object which should act as the root-node object.
:return: A dictionary with all the information from the root node downwards.
"""
tree = root_obj.grab_tree()
tree.reverse() # start with top of tree, override going down
results: Dict[str, Any] = {}
for node in tree:
__consolidate(node, results)
# Make interfaces accessible without Cheetah-voodoo in the templates
# EXAMPLE: $ip == $ip0, $ip1, $ip2 and so on.
if root_obj.COLLECTION_TYPE == "system": # type: ignore
for name, interface in root_obj.interfaces.items(): # type: ignore
intf_dict = interface.to_dict() # type: ignore
for key in intf_dict: # type: ignore
results[f"{key}_{name}"] = intf_dict[key] # type: ignore
# If the root object is a profile or system, add in all repo data for repos that belong to the object chain
if root_obj.COLLECTION_TYPE in ("profile", "system"): # type: ignore
repo_data: List[Dict[Any, Any]] = []
for repo in results.get("repos", []):
repo = api_handle.find_repo(name=repo)
if repo and not isinstance(repo, list):
repo_data.append(repo.to_dict())
# Sorting is courtesy of https://stackoverflow.com/a/73050/4730773
results["repo_data"] = sorted(
repo_data, key=lambda repo_dict: repo_dict["priority"], reverse=True
)
http_port = results.get("http_port", 80)
if http_port in (80, "80"):
results["http_server"] = results["server"]
else:
results["http_server"] = f"{results['server']}:{http_port}"
if "children" in results:
child_names = results["children"]
results["children"] = {}
# logger.info("Children: %s", child_names)
for key in child_names:
# We use return_list=False, thus this is only Optional[ITEM]
child = api_handle.find_items("", name=key, return_list=False) # type: ignore
if child is None or isinstance(child, list):
raise ValueError(
f'Child with the name "{key}" of parent object "{root_obj.name}" did not exist!'
)
results["children"][key] = child.to_dict()
# sanitize output for koan and kernel option lines, etc
if remove_dicts:
# We know we pass a dict, thus we will always get the right type!
results = flatten(results) # type: ignore
# Add in some variables for easier templating as these variables change based on object type.
if "interfaces" in results:
# is a system object
results["system_name"] = results["name"]
results["profile_name"] = results["profile"]
if "distro" in results:
results["distro_name"] = results["distro"]
elif "image" in results:
results["distro_name"] = "N/A"
results["image_name"] = results["image"]
elif "distro" in results:
# is a profile or subprofile object
results["profile_name"] = results["name"]
results["distro_name"] = results["distro"]
elif "kernel" in results:
# is a distro object
results["distro_name"] = results["name"]
elif "file" in results:
# is an image object
results["distro_name"] = "N/A"
results["image_name"] = results["name"]
return results
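# Illustrative usage (a sketch; the system name is hypothetical):
#   system = api.find_system(name="testsys")
#   data = blender(api, remove_dicts=True, root_obj=system)
# "data" then contains the system's fields merged with its profile, distro and the
# settings, with dict-valued fields such as "kernel_options" flattened to strings and
# per-interface values exposed as "<field>_<interface name>" keys.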
def flatten(data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
Convert certain nested dicts to strings. This is only really done for the ones koan needs as strings; it should
not be done for everything.
:param data: The dictionary in which various keys should be converted into a string.
:return: None (if data is None or not a dict) or the dictionary with the affected values flattened to strings.
"""
if data is None or not isinstance(data, dict): # type: ignore
return None
if "environment" in data:
data["environment"] = dict_to_string(data["environment"])
if "kernel_options" in data:
data["kernel_options"] = dict_to_string(data["kernel_options"])
if "kernel_options_post" in data:
data["kernel_options_post"] = dict_to_string(data["kernel_options_post"])
if "yumopts" in data:
data["yumopts"] = dict_to_string(data["yumopts"])
if "autoinstall_meta" in data:
data["autoinstall_meta"] = dict_to_string(data["autoinstall_meta"])
if "template_files" in data:
data["template_files"] = dict_to_string(data["template_files"])
if "repos" in data and isinstance(data["repos"], list):
data["repos"] = " ".join(data["repos"]) # type: ignore
if "rpm_list" in data and isinstance(data["rpm_list"], list):
data["rpm_list"] = " ".join(data["rpm_list"]) # type: ignore
# Note -- we do not need to flatten "interfaces" as koan does not expect it to be a string, nor do we use it on a
# kernel options line, etc...
return data
def uniquify(seq: List[Any]) -> List[Any]:
"""
Remove duplicates from the sequence handed over in the args.
:param seq: The sequence to check for duplicates.
:return: The list without duplicates.
"""
# Credit: https://www.peterbe.com/plog/uniqifiers-benchmark
# FIXME: if this is actually slower than some other way, overhaul it
# For above there is a better version: https://www.peterbe.com/plog/fastest-way-to-uniquify-a-list-in-python-3.6
seen = {}
result: List[Any] = []
for item in seq:
if item in seen:
continue
seen[item] = 1
result.append(item)
return result
def __consolidate(node: Union["ITEM", "Settings"], results: Dict[Any, Any]) -> Dict[Any, Any]: # type: ignore
"""
Merge data from a given node with the aggregate of all data from past scanned nodes. Dictionaries and arrays are
treated specially.
:param node: The object to merge data from. The data from the node always wins.
:param results: The dictionary into which the node's data is merged.
:return: A dictionary with the consolidated data.
"""
node_data = node.to_dict()
# If the node has any data items labelled <<inherit>> we need to expunge them so that they do not override the
# supernodes.
node_data_copy: Dict[Any, Any] = {}
for key in node_data:
value = node_data[key]
if value == enums.VALUE_INHERITED:
if key not in results:
# We need to add at least one value per key, use the property getter to resolve to the
# settings or wherever we inherit from.
node_data_copy[key] = getattr(node, key)
# Old keys should have no inherit and thus are not a real property
if key == "kickstart":
node_data_copy[key] = getattr(type(node), "autoinstall").fget(node)
elif key == "ks_meta":
node_data_copy[key] = getattr(type(node), "autoinstall_meta").fget(node)
else:
if isinstance(value, dict):
node_data_copy[key] = value.copy()
elif isinstance(value, list):
node_data_copy[key] = value[:]
else:
node_data_copy[key] = value
for field, data_item in node_data_copy.items():
if field in results:
# Now merge data types separately depending on whether they are dict, list, or scalar.
fielddata = results[field]
if isinstance(fielddata, dict):
# interweave dict results
results[field].update(data_item.copy())
elif isinstance(fielddata, (list, tuple)):
# add to lists (Cobbler doesn't have many lists)
# FIXME: should probably uniquify list after doing this
results[field].extend(data_item)
results[field] = uniquify(results[field])
else:
# distro field gets special handling, since we don't want to overwrite it ever.
# FIXME: should the parent's field too? It will be overwritten if there are multiple sub-profiles in
# the chain of inheritance
if field != "distro":
results[field] = data_item
else:
results[field] = data_item
# Now if we have any "!foo" results in the list, delete corresponding key entry "foo", and also the entry "!foo",
# allowing for removal of kernel options set in a distro later in a profile, etc.
dict_removals(results, "kernel_options")
dict_removals(results, "kernel_options_post")
dict_removals(results, "autoinstall_meta")
dict_removals(results, "template_files")
return results
def dict_removals(results: Dict[Any, Any], subkey: str) -> None:
"""
Remove entries from a dictionary starting with a "!".
:param results: The dictionary to search in
:param subkey: The subkey to search through.
"""
if subkey not in results:
return
return dict_annihilate(results[subkey])
def dict_annihilate(dictionary: Dict[Any, Any]) -> None:
"""
Annihilate entries marked for removal. This method removes all entries with
key names starting with "!". If a ``dictionary`` contains keys "!xxx" and
"xxx", then both will be removed.
:param dictionary: A dictionary to clean up.
"""
for key in list(dictionary.keys()):
if str(key).startswith("!") and key != "!":
short_key = key[1:]
if short_key in dictionary:
del dictionary[short_key]
del dictionary[key]
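# Illustrative example: if results["kernel_options"] is
# {"acpi": None, "quiet": None, "!quiet": None}, then
# dict_removals(results, "kernel_options") reduces it to {"acpi": None},
# because "!quiet" deletes both itself and the plain "quiet" entry.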
def dict_to_string(_dict: Dict[Any, Any]) -> Union[str, Dict[Any, Any]]:
"""
Convert a dictionary to a printable string. Used primarily in the kernel options string and for some legacy stuff
where koan expects strings (though this last part should be changed to dictionaries)
A KV-Pair is joined with a "=". Values are enclosed in single quotes.
:param _dict: The dictionary to convert to a string.
:return: The string which was previously a dictionary.
"""
buffer = ""
if not isinstance(_dict, dict): # type: ignore
return _dict
for key in _dict:
value = _dict[key]
if not value:
buffer += str(key) + " "
elif isinstance(value, list):
# this value is an array, so we print out every
# key=value
item: Any
for item in value:
# strip possible leading and trailing whitespaces
_item = str(item).strip()
if " " in _item:
buffer += str(key) + "='" + _item + "' "
else:
buffer += str(key) + "=" + _item + " "
else:
_value = str(value).strip()
if " " in _value:
buffer += str(key) + "='" + _value + "' "
else:
buffer += str(key) + "=" + _value + " "
return buffer
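# Illustrative example (kernel options rendered as a command line fragment):
#   dict_to_string({"quiet": None, "console": "ttyS0,115200", "append": "a b"})
# yields "quiet console=ttyS0,115200 append='a b' " -- keys with a falsy value
# appear bare and values containing spaces are single-quoted.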
def rsync_files(src: str, dst: str, args: str, quiet: bool = True) -> bool:
r"""
Sync files from src to dst. The extra arguments specified by args are appended to the command.
:param src: The source for the copy process.
:param dst: The destination for the copy process.
:param args: The extra arguments are appended to our standard arguments.
:param quiet: If ``True`` no progress is reported. If ``False`` then progress will be reported by rsync.
:return: ``True`` on success, otherwise ``False``.
"""
if args is None: # type: ignore
args = ""
# Make sure we put a "/" on the end of the source and destination to make sure we don't cause any rsync weirdness.
if not dst.endswith("/"):
dst = f"{dst}/"
if not src.endswith("/"):
src = f"{src}/"
spacer = ""
if not src.startswith("rsync://") and not src.startswith("/"):
spacer = ' -e "ssh" '
rsync_cmd = [
"rsync",
"-a",
spacer,
f"'{src}'",
dst,
args,
"--exclude-from=/etc/cobbler/rsync.exclude",
"--quiet" if quiet else "--progress",
]
try:
res = subprocess_call(rsync_cmd, shell=False)
if res != 0:
die(f"Failed to run the rsync command: '{rsync_cmd}'")
except Exception:
return False
return True
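# Illustrative usage (hypothetical paths): mirror a local tree while showing progress:
#   rsync_files("/var/www/cobbler/images", "/srv/backup/images", "--delete", quiet=False)
# The call returns True when the underlying rsync invocation exits with 0.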
def run_triggers(
api: "CobblerAPI",
ref: Optional["ITEM"] = None,
globber: str = "",
additional: Optional[List[Any]] = None,
) -> None:
"""Runs all the trigger scripts in a given directory.
Example: ``/var/lib/cobbler/triggers/blah/*``
As of Cobbler 1.5.X, this also runs Cobbler modules that match the globbing paths.
Python triggers are always run before shell triggers.
:param api: The api object to use for resolving the actions.
:param ref: Can be a Cobbler object, if not None, the name will be passed to the script. If ref is None, the script
will be called with no arguments.
:param globber: is a wildcard expression indicating which triggers to run.
:param additional: Additional arguments to run the triggers with.
:raises CX: Raised in case the trigger failed.
"""
logger.debug("running python triggers from %s", globber)
modules = api.get_modules_in_category(globber)
if additional is None:
additional = []
for module in modules:
arglist: List[str] = []
if ref:
arglist.append(ref.name)
for argument in additional:
arglist.append(argument)
logger.debug("running python trigger %s", module.__name__)
return_code = module.run(api, arglist)
if return_code != 0:
raise CX(f"Cobbler trigger failed: {module.__name__}")
# Now do the old shell triggers, which are usually going to be slower, but are easier to write and support any
# language.
logger.debug("running shell triggers from %s", globber)
triggers = glob.glob(globber)
triggers.sort()
for file in triggers:
try:
if file.startswith(".") or file.find(".rpm") != -1:
# skip dotfiles or .rpmnew files that may have been installed in the triggers directory
continue
arglist = [file]
if ref:
arglist.append(ref.name)
for argument in additional:
if argument:
arglist.append(argument)
logger.debug("running shell trigger %s", file)
return_code = subprocess_call(arglist, shell=False) # close_fds=True)
except Exception:
logger.warning("failed to execute trigger: %s", file)
continue
if return_code != 0:
raise CX(
"Cobbler trigger failed: %(file)s returns %(code)d"
% {"file": file, "code": return_code}
)
logger.debug("shell trigger %s finished successfully", file)
logger.debug("shell triggers finished successfully")
def get_family() -> str:
"""
Get family of running operating system.
Family is the base Linux distribution of a Linux distribution, with a set of common parents.
:return: May be "redhat", "debian" or "suse" currently. If none of these are detected then just the distro name is
returned.
"""
# TODO: Refactor that this is purely reliant on the distro module or obsolete it.
redhat_list = (
"red hat",
"redhat",
"scientific linux",
"fedora",
"centos",
"virtuozzo",
"almalinux",
"rocky linux",
"anolis os",
"oracle linux server",
)
distro_name = distro.name().lower()
for item in redhat_list:
if item in distro_name:
return "redhat"
if "debian" in distro_name or "ubuntu" in distro_name:
return "debian"
if "suse" in distro.like():
return "suse"
return distro_name
def os_release() -> Tuple[str, float]:
"""
Get the os version of the linux distro. If the get_family() method succeeds then the result is normalized.
:return: The os-name and os version.
"""
family = get_family()
distro_name = distro.name().lower()
distro_version = distro.version()
make = "unknown"
if family == "redhat":
if "fedora" in distro_name:
make = "fedora"
elif "centos" in distro_name:
make = "centos"
elif "almalinux" in distro_name:
make = "centos"
elif "rocky linux" in distro_name:
make = "centos"
elif "anolis os" in distro_name:
make = "centos"
elif "virtuozzo" in distro_name:
make = "virtuozzo"
elif "oracle linux server" in distro_name:
make = "centos"
else:
make = "redhat"
return make, float(distro_version)
if family == "debian":
if "debian" in distro_name:
return "debian", float(distro_version)
if "ubuntu" in distro_name:
return "ubuntu", float(distro_version)
if family == "suse":
make = "suse"
if "suse" not in distro.like():
make = "unknown"
return make, float(distro_version)
return make, 0.0
def is_selinux_enabled() -> bool:
"""
This check is achieved via a subprocess call to ``selinuxenabled``. Default return is false.
:return: Whether selinux is enabled or not.
"""
if not os.path.exists("/usr/sbin/selinuxenabled"):
return False
selinuxenabled = subprocess_call(["/usr/sbin/selinuxenabled"], shell=False)
if selinuxenabled == 0:
return True
return False
def command_existing(cmd: str) -> bool:
r"""
This takes a command which should be known to the system and checks if it is available.
:param cmd: The executable to check
:return: If the binary does not exist ``False``, otherwise ``True``.
"""
# https://stackoverflow.com/a/28909933
return shutil.which(cmd) is not None
def subprocess_sp(
cmd: Union[str, List[str]], shell: bool = True, process_input: Any = None
) -> Tuple[str, int]:
"""
Call a shell process and redirect the output for internal usage.
:param cmd: The command to execute in a subprocess call.
:param shell: Whether to use a shell or not for the execution of the command.
:param process_input: If there is any input needed for that command to stdin.
:return: A tuple of the output and the return code.
"""
logger.info("running: %s", cmd)
stdin = None
if process_input:
stdin = subprocess.PIPE
try:
with subprocess.Popen(
cmd,
shell=shell,
stdin=stdin,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
close_fds=True,
) as subprocess_popen_obj:
(out, err) = subprocess_popen_obj.communicate(process_input)
return_code = subprocess_popen_obj.returncode
except OSError as os_error:
log_exc()
raise ValueError(
f"OS Error, command not found? While running: {cmd}"
) from os_error
logger.info("received on stdout: %s", out)
logger.debug("received on stderr: %s", err)
return out, return_code
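# Illustrative usage (a sketch): capture the output of a command without a shell:
#   out, return_code = subprocess_sp(["uname", "-r"], shell=False)
# "out" holds the captured stdout text and "return_code" the exit status; stderr is
# only written to the log.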
def subprocess_call(
cmd: Union[str, List[str]], shell: bool = False, process_input: Any = None
) -> int:
"""
A simple subprocess call with no output capturing.
:param cmd: The command to execute.
:param shell: Whether to use a shell or not for the execution of the command.
:param process_input: If there is any process_input needed for that command to stdin.
:return: The return code of the process
"""
_, return_code = subprocess_sp(cmd, shell=shell, process_input=process_input)
return return_code
def subprocess_get(
cmd: Union[str, List[str]], shell: bool = True, process_input: Any = None
) -> str:
"""
A simple subprocess call with no return code capturing.
:param cmd: The command to execute.
:param shell: Whether to use a shell or not for the execution of the command.
:param process_input: If there is any process_input needed for that command to stdin.
:return: The data which the subprocess returns.
"""
data, _ = subprocess_sp(cmd, shell=shell, process_input=process_input)
return data
def get_supported_system_boot_loaders() -> List[str]:
"""
Return the list of currently supported bootloaders.
:return: The list of currently supported bootloaders.
"""
return ["grub", "pxe", "ipxe"]
def get_shared_secret() -> Union[str, int]:
"""
The 'web.ss' file is regenerated each time cobblerd restarts and is used to agree on shared secret interchange
between the web server and cobblerd, and also the CLI and cobblerd, when username/password access is not required.
For the CLI, this enables root users to avoid entering username/pass if on the Cobbler server.
:return: The Cobbler secret which enables full access to Cobbler.
"""
try:
with open("/var/lib/cobbler/web.ss", "r", encoding="utf-8") as web_secret_fd:
data = web_secret_fd.read()
except Exception:
return -1
return str(data).strip()
def local_get_cobbler_api_url() -> str:
"""
Get the URL of the Cobbler HTTP API from the Cobbler settings file.
:return: The api entry point. This does not respect modifications from Loadbalancers or API-Gateways.
"""
# Load server and http port
# TODO: Replace with Settings access
data = settings.read_settings_file()
ip_address = data.get("server", "127.0.0.1")
if data.get("client_use_localhost", False):
# this overrides the server setting
ip_address = "127.0.0.1"
port = data.get("http_port", "80")
protocol = "http"
if data.get("client_use_https", False):
protocol = "https"
return f"{protocol}://{ip_address}:{port}/cobbler_api"
def local_get_cobbler_xmlrpc_url() -> str:
"""
Get the URL of the Cobbler XMLRPC API from the Cobbler settings file.
:return: The api entry point.
"""
# Load xmlrpc port
data = settings.read_settings_file()
return f"http://127.0.0.1:{data.get('xmlrpc_port', '25151')}"
def strip_none(
data: Optional[Union[List[Any], Dict[Any, Any], int, str, float]],
omit_none: bool = False,
) -> Union[List[Any], Dict[Any, Any], int, str, float]:
"""
Remove "None" entries from datastructures. Used prior to communicating with XMLRPC.
:param data: The data to strip None away.
:param omit_none: If the datastructure is not a single item then None items will be skipped instead of replaced if
set to "True".
:return: The modified data structure without any occurrence of None.
"""
if data is None:
data = "~"
elif isinstance(data, list):
data2: List[Any] = []
for element in data:
if omit_none and element is None:
pass
else:
data2.append(strip_none(element))
return data2
elif isinstance(data, dict):
data3: Dict[Any, Any] = {}
for key in list(data.keys()):
if omit_none and data[key] is None:
pass
else:
data3[str(key)] = strip_none(data[key])
return data3
return data
def revert_strip_none(
data: Union[str, int, float, bool, List[Any], Dict[Any, Any]]
) -> Optional[Union[str, int, float, bool, List[Any], Dict[Any, Any]]]:
"""
Does the opposite to strip_none. If a value which represents None is detected, it replaces it with None.
:param data: The data to check.
:return: The data with the None values restored.
"""
if isinstance(data, str) and data.strip() == "~":
return None
if isinstance(data, list):
data2: List[Any] = []
for element in data:
data2.append(revert_strip_none(element))
return data2
if isinstance(data, dict):
data3: Dict[Any, Any] = {}
for key in data.keys():
data3[key] = revert_strip_none(data[key])
return data3
return data
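# Illustrative round trip: strip_none({"gateway": None, "dns": ["8.8.8.8", None]})
# returns {"gateway": "~", "dns": ["8.8.8.8", "~"]} so the structure survives XMLRPC,
# and revert_strip_none() applied to that result restores the original None values.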
def lod_to_dod(_list: List[Any], indexkey: Hashable) -> Dict[Any, Any]:
r"""
Things like ``get_distros()`` returns a list of a dictionaries. Convert this to a dict of dicts keyed off of an
arbitrary field.
Example: with ``indexkey="a"``, ``[ { "a" : 2 }, { "a" : 3 } ]`` -> ``{ 2 : { "a" : 2 }, 3 : { "a" : 3 } }``
:param _list: The list of dictionaries to use for the conversion.
:param indexkey: The position to use as dictionary keys.
:return: The converted dictionary. It is not guaranteed that the same key is not used multiple times.
"""
results: Dict[Any, Any] = {}
for item in _list:
results[item[indexkey]] = item
return results
def lod_sort_by_key(list_to_sort: List[Any], indexkey: Hashable) -> List[Any]:
"""
Sorts a list of dictionaries by a given key in the dictionaries.
Note: This returns a new sorted list and does not modify the input list or sort the dictionaries themselves.
:param list_to_sort: The list of dictionaries to sort.
:param indexkey: The key to index to dicts in the list.
:return: The sorted list.
"""
return sorted(list_to_sort, key=lambda k: k[indexkey])
def dhcpconf_location(protocol: enums.DHCP, filename: str = "dhcpd.conf") -> str:
"""
This method returns the location of the dhcpd.conf file.
:param protocol: The DHCP protocol version (v4/v6) that is used.
:param filename: The filename of the DHCP configuration file.
:raises AttributeError: If the protocol is not v4/v6.
:return: The path possibly used for the dhcpd.conf file.
"""
if protocol not in enums.DHCP:
logger.info(
"DHCP configuration location could not be determined due to unknown protocol version."
)
raise AttributeError("DHCP must be version 4 or 6!")
if protocol == enums.DHCP.V6 and filename == "dhcpd.conf":
filename = "dhcpd6.conf"
(dist, version) = os_release()
if (
(dist in ("redhat", "centos") and version < 6)
or (dist == "fedora" and version < 11)
or (dist == "suse")
):
return os.path.join("/etc", filename)
if (dist == "debian" and int(version) < 6) or (
dist == "ubuntu" and version < 11.10
):
return os.path.join("/etc/dhcp3", filename)
return os.path.join("/etc/dhcp/", filename)
def namedconf_location() -> str:
"""
This returns the location of the named.conf file.
:return: If the distro is Debian/Ubuntu then this returns "/etc/bind/named.conf". Otherwise "/etc/named.conf"
"""
(dist, _) = os_release()
if dist in ("debian", "ubuntu"):
return "/etc/bind/named.conf"
return "/etc/named.conf"
def dhcp_service_name() -> str:
"""
Determine the dhcp service which is different on various distros. This is currently a hardcoded detection.
:return: This will return one of the following names: "dhcp3-server", "isc-dhcp-server", "dhcpd"
"""
(dist, version) = os_release()
if dist == "debian" and int(version) < 6:
return "dhcp3-server"
if dist == "debian" and int(version) >= 6:
return "isc-dhcp-server"
if dist == "ubuntu" and version < 11.10:
return "dhcp3-server"
if dist == "ubuntu" and version >= 11.10:
return "isc-dhcp-server"
return "dhcpd"
def named_service_name() -> str:
"""
Determine the named service which is normally different on various distros.
:return: For Debian/Ubuntu this returns "bind9"; on other distros it returns "named-chroot" or "named".
"""
(dist, _) = os_release()
if dist in ("debian", "ubuntu"):
return "bind9"
if process_management.is_systemd():
return_code = subprocess_call(
["/usr/bin/systemctl", "is-active", "named-chroot"], shell=False
)
if return_code == 0:
return "named-chroot"
return "named"
def compare_versions_gt(ver1: str, ver2: str) -> bool:
"""
Compares versions like "0.9.3" with each other and decides if ver1 is greater than ver2.
:param ver1: The first version.
:param ver2: The second version.
:return: True if ver1 is greater, otherwise False.
"""
def versiontuple(version: str) -> Tuple[int, ...]:
return tuple(map(int, (version.split("."))))
return versiontuple(ver1) > versiontuple(ver2)
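# Illustrative example: compare_versions_gt("0.10.1", "0.9.3") is True because the
# comparison is numeric per component rather than lexicographic ("10" > "9").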
def kopts_overwrite(
kopts: Dict[Any, Any],
cobbler_server_hostname: str = "",
distro_breed: str = "",
system_name: str = "",
) -> None:
"""
SUSE is not using 'text'. Instead 'textmode' is used as kernel option.
:param kopts: The kopts of the system.
:param cobbler_server_hostname: The server setting from our Settings.
:param distro_breed: The breed of the distro of the system to change the kopts for.
:param system_name: The system to overwrite the kopts for.
"""
# Type checks
if not isinstance(kopts, dict): # type: ignore
raise TypeError("kopts needs to be of type dict")
if not isinstance(cobbler_server_hostname, str): # type: ignore
raise TypeError("cobbler_server_hostname needs to be of type str")
if not isinstance(distro_breed, str): # type: ignore
raise TypeError("distro_breed needs to be of type str")
if not isinstance(system_name, str): # type: ignore
raise TypeError("system_name needs to be of type str")
# Function logic
if distro_breed == "suse":
if "textmode" in list(kopts.keys()):
kopts.pop("text", None)
elif "text" in list(kopts.keys()):
kopts.pop("text", None)
kopts["textmode"] = ["1"]
if system_name and cobbler_server_hostname:
# only works if pxe_just_once is enabled in global settings
kopts[
"info"
] = f"http://{cobbler_server_hostname}/cblr/svc/op/nopxe/system/{system_name}"
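# Illustrative example: for a SUSE breed, kopts = {"text": ["1"]} is rewritten in
# place to {"textmode": ["1"]}; when both a system name and the server hostname are
# given, an "info=http://<server>/cblr/svc/op/nopxe/system/<name>" option is added.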
def is_str_int(value: str) -> bool:
"""
Checks if the string value could be converted into an integer.
This is necessary since the CLI only works with strings but many methods and checks expect an integer.
:param value: The value to check
:return: True if conversion is successful
"""
if not isinstance(value, str): # type: ignore
raise TypeError("value needs to be of type string")
try:
converted = int(value)
return str(converted) == value
except ValueError:
pass
return False
def is_str_float(value: str) -> bool:
"""
Checks if the string value could be converted into a float.
This is necessary since the CLI only works with strings but many methods and checks expect a float.
:param value: The value to check
:return: True if conversion is successful
"""
if not isinstance(value, str): # type: ignore
raise TypeError("value needs to be of type string")
if is_str_int(value):
return True
try:
converted = float(value)
return str(converted) == value
except ValueError:
pass
return False
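# Illustrative examples: is_str_int("42") is True and is_str_int("42.0") is False;
# is_str_float("42.0") is True, while a value that does not round-trip through
# str(float(value)), such as "1e3", is rejected.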
@contextlib.contextmanager
def filelock(lock_file: str):
"""
Context manager to acquire a file lock and release it afterwards
:param lock_file: Path to the file lock to acquire
:raises OSError: Raised in case of an unexpected error while acquiring the file lock.
"""
fd = None
try:
fd = os.open(lock_file, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o660)
fcntl.flock(fd, fcntl.LOCK_EX)
yield
finally:
if fd is not None:
with contextlib.suppress(OSError):
fcntl.flock(fd, fcntl.LOCK_UN)
os.close(fd)
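# Illustrative usage (hypothetical lock file path): serialize access to a shared
# resource across processes:
#   with filelock("/var/lib/cobbler/lock/collections.lock"):
#       ...  # critical section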
def merge_dicts_recursive(
base_dict: Dict[str, Any], updating_dict: Dict[str, Any], str_append: bool = False
) -> Dict[str, Any]:
Merge updating_dict into base_dict recursively.
:param base_dict: Base dictionary.
:param updating_dict: Updating dict, overrides base_dict.
:param str_append: Append the updating_dict String values to the base_dict
with the same key instead of overwriting the original value.
:returns dict: Merged dict"""
ret = base_dict.copy()
for k, v in updating_dict.items():
if (
k in base_dict
and isinstance(v, dict)
and isinstance(base_dict.get(k), dict)
):
ret[k] = merge_dicts_recursive(base_dict[k], v, str_append) # type: ignore
elif str_append and k in ret and isinstance(v, str):
ret[k] += v
else:
ret[k] = v
return ret
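# Illustrative example:
#   merge_dicts_recursive({"a": {"x": 1}, "s": "foo"}, {"a": {"y": 2}, "s": "bar"}, str_append=True)
# returns {"a": {"x": 1, "y": 2}, "s": "foobar"}; without str_append the string value
# would simply be overwritten with "bar".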
def create_files_if_not_existing(files: List[str]) -> None:
"""
Create empty files if they don't already exist. If they exist, do nothing.
:param files: A list of the files to create. If the base directory doesn't
exist, create it too.
:raises OSError: In case the file cannot be created.
"""
for file in files:
if not file:
logger.warning("Attempted to create an empty file in %s, skipping", files)
continue
if not os.path.exists(file):
# It's possible the file dir doesn't exist yet, create it
basedir = os.path.dirname(file)
if not os.path.exists(basedir):
os.makedirs(basedir)
# Create empty file
open( # pylint: disable=consider-using-with
file, "a", encoding="UTF-8"
).close()
def remove_lines_in_file(filepath: str, remove_keywords: List[str]) -> None:
"""
Remove any line from the file at the given filepath that contains any element of
the `remove_keywords` list as a substring.
:param filepath: A path of the file that you want to modify.
:param remove_keywords: A list of keywords. Any line in filepath that contains
any of the keywords will be removed.
:raises OSError: In case the file cannot be read or modified.
"""
tmp_filepath = filepath + ".tmp"
with open(filepath, "r", encoding="UTF-8") as fh, open(
tmp_filepath, "w", encoding="UTF-8"
) as tmp_fh:
for line in fh:
if any(keyword in line for keyword in remove_keywords):
continue
tmp_fh.write(line)
os.replace(tmp_filepath, filepath)
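# Illustrative usage (hypothetical file and keyword): drop stale host entries from a
# generated configuration fragment:
#   remove_lines_in_file("/etc/dhcpd.conf", ["host retired-node"])
# Lines containing any of the keywords are skipped while the file is rewritten through
# a ".tmp" sibling and swapped into place with os.replace().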