Source code for cobbler.actions.check

"""
Cobbler Trigger Module that checks against a list of hardcoded potential common errors in a Cobbler installation.
"""

# SPDX-License-Identifier: GPL-2.0-or-later
# SPDX-FileCopyrightText: Copyright 2006-2009, Red Hat, Inc and Others
# SPDX-FileCopyrightText: Michael DeHaan <michael.dehaan AT gmail>

import glob
import logging
import os
import re
from typing import TYPE_CHECKING, List
from xmlrpc.client import ServerProxy

from cobbler import utils
from cobbler.utils import process_management

if TYPE_CHECKING:
    from cobbler.api import CobblerAPI


[docs]class CobblerCheck: """ Validates whether the system is reasonably well configured for serving up content. This is the code behind 'cobbler check'. """ def __init__(self, api: "CobblerAPI") -> None: """ Constructor :param api: The API which holds all information. """ self.api = api self.settings = api.settings() self.logger = logging.getLogger() self.checked_family = ""
[docs] def run(self) -> List[str]: """ The CLI usage is "cobbler check" before "cobbler sync". :return: None if there are no errors, otherwise returns a list of things to correct prior to running application 'for real'. """ status: List[str] = [] self.checked_family = utils.get_family() self.check_name(status) self.check_selinux(status) if self.settings.manage_dhcp: mode = self.api.get_sync().dhcp.what() if mode == "isc": self.check_dhcpd_bin(status) self.check_dhcpd_conf(status) self.check_service(status, "dhcpd") elif mode == "dnsmasq": self.check_dnsmasq_bin(status) self.check_service(status, "dnsmasq") if self.settings.manage_dns: mode = self.api.get_sync().dns.what() if mode == "bind": self.check_bind_bin(status) self.check_service(status, "named") elif mode == "dnsmasq" and not self.settings.manage_dhcp: self.check_dnsmasq_bin(status) self.check_service(status, "dnsmasq") mode = self.api.get_sync().tftpd.what() if mode == "in_tftpd": self.check_tftpd_dir(status) elif mode == "tftpd_py": self.check_ctftpd_dir(status) self.check_service(status, "cobblerd") self.check_bootloaders(status) self.check_for_wget_curl(status) self.check_rsync_conf(status) self.check_iptables(status) self.check_yum(status) self.check_debmirror(status) self.check_for_ksvalidator(status) self.check_for_default_password(status) self.check_for_unreferenced_repos(status) self.check_for_unsynced_repos(status) self.check_for_cman(status) return status
[docs] def check_for_ksvalidator(self, status: List[str]) -> None: """ Check if the ``ksvalidator`` is present in ``/usr/bin``. :param status: The status list with possible problems. The status list with possible problems. """ # FIXME: This tools is cross-platform via Python. Thus all distros can have it. # ubuntu also identifies as "debian" if self.checked_family in ["debian", "suse"]: return if not os.path.exists("/usr/bin/ksvalidator"): status.append("ksvalidator was not found, install pykickstart")
[docs] @staticmethod def check_for_cman(status: List[str]) -> None: """ Check if the fence agents are available. This is done through checking if the binary ``fence_ilo`` is present in ``/sbin`` or ``/usr/sbin``. :param status: The status list with possible problems. The status list with possible problems. """ # not doing rpm -q here to be cross-distro friendly if not os.path.exists("/sbin/fence_ilo") and not os.path.exists( "/usr/sbin/fence_ilo" ): status.append( "fencing tools were not found, and are required to use the (optional) power management " "features. install cman or fence-agents to use them" )
[docs] def check_service(self, status: List[str], which: str, notes: str = "") -> None: """ Check if the service command is available or the old init.d system has to be used. :param status: The status list with possible problems. :param which: The service to check for. :param notes: A manual not to attach. """ if notes != "": notes = f" (NOTE: {notes})" return_code = 0 if process_management.is_supervisord(): with ServerProxy("http://localhost:9001/RPC2") as server: process_info = server.supervisor.getProcessInfo(which) if ( isinstance(process_info, dict) and process_info["statename"] != "RUNNING" ): status.append(f"service {which} is not running{notes}") return elif process_management.is_systemd(): return_code = utils.subprocess_call( ["systemctl", "status", which], shell=False ) if return_code != 0: status.append(f'service "{which}" is not running{notes}') return elif self.checked_family in ("redhat", "suse"): if os.path.exists(f"/etc/rc.d/init.d/{which}"): return_code = utils.subprocess_call( ["/sbin/service", which, "status"], shell=False ) if return_code != 0: status.append(f"service {which} is not running{notes}") return elif self.checked_family == "debian": # we still use /etc/init.d if os.path.exists(f"/etc/init.d/{which}"): return_code = utils.subprocess_call( [f"/etc/init.d/{which}", "status"], shell=False ) if return_code != 0: status.append(f"service {which} is not running{notes}") return else: status.append( f"Unknown distribution type, cannot check for running service {which}" ) return
[docs] def check_iptables(self, status: List[str]) -> None: """ Check if iptables is running. If yes print the needed ports. This is unavailable on Debian, SUSE and CentOS7 as a service. However this only indicates that the way of persisting the iptable rules are persisted via other means. :param status: The status list with possible problems. """ # TODO: Rewrite check to be able to verify this is in more cases if os.path.exists("/etc/rc.d/init.d/iptables"): return_code = utils.subprocess_call( ["/sbin/service", "iptables", "status"], shell=False ) if return_code == 0: status.append( f"since iptables may be running, ensure 69, 80/443, and {self.settings.xmlrpc_port} are unblocked" )
[docs] def check_yum(self, status: List[str]) -> None: """ Check if the yum-stack is available. On Debian based distros this will always return without checking. :param status: The status list with possible problems. """ # FIXME: Replace this with calls which check for the path of these tools. if self.checked_family == "debian": return if not os.path.exists("/usr/bin/createrepo"): status.append( "createrepo package is not installed, needed for cobbler import and cobbler reposync, " "install createrepo?" ) if not os.path.exists("/usr/bin/dnf") and not os.path.exists( "/usr/bin/reposync" ): status.append("reposync not installed, install yum-utils") if os.path.exists("/usr/bin/dnf") and not os.path.exists("/usr/bin/reposync"): status.append( "reposync is not installed, install yum-utils or dnf-plugins-core" ) if not os.path.exists("/usr/bin/dnf") and not os.path.exists( "/usr/bin/yumdownloader" ): status.append("yumdownloader is not installed, install yum-utils") if os.path.exists("/usr/bin/dnf") and not os.path.exists( "/usr/bin/yumdownloader" ): status.append( "yumdownloader is not installed, install yum-utils or dnf-plugins-core" )
[docs] def check_debmirror(self, status: List[str]) -> None: """ Check if debmirror is available and the config file for it exists. If the distro family is suse then this will pass without checking. :param status: The status list with possible problems. """ if self.checked_family == "suse": return if not os.path.exists("/usr/bin/debmirror"): status.append( "debmirror package is not installed, it will be required to manage debian deployments and " "repositories" ) if os.path.exists("/etc/debmirror.conf"): with open("/etc/debmirror.conf", encoding="UTF-8") as debmirror_fd: re_dists = re.compile(r"@dists=") re_arches = re.compile(r"@arches=") for line in debmirror_fd.readlines(): if re_dists.search(line) and not line.strip().startswith("#"): status.append( "comment out 'dists' on /etc/debmirror.conf for proper debian support" ) if re_arches.search(line) and not line.strip().startswith("#"): status.append( "comment out 'arches' on /etc/debmirror.conf for proper debian support" )
[docs] def check_name(self, status: List[str]) -> None: """ If the server name in the config file is still set to localhost automatic installations run from koan will not have proper kernel line parameters. :param status: The status list with possible problems. """ if self.settings.server == "127.0.0.1": status.append( "The 'server' field in /etc/cobbler/settings.yaml must be set to something other than localhost, " "or automatic installation features will not work. This should be a resolvable hostname or " "IP for the boot server as reachable by all machines that will use it." ) if self.settings.next_server_v4 == "127.0.0.1": status.append( "For PXE to be functional, the 'next_server_v4' field in /etc/cobbler/settings.yaml must be set to " "something other than 127.0.0.1, and should match the IP of the boot server on the PXE " "network." ) if self.settings.next_server_v6 == "::1": status.append( "For PXE to be functional, the 'next_server_v6' field in /etc/cobbler/settings.yaml must be set to " "something other than ::1, and should match the IP of the boot server on the PXE network." )
[docs] def check_selinux(self, status: List[str]) -> None: """ Suggests various SELinux rules changes to run Cobbler happily with SELinux in enforcing mode. :param status: The status list with possible problems. """ # FIXME: this method could use some refactoring in the future. if self.checked_family == "debian": return enabled = self.api.is_selinux_enabled() if enabled: status.append( "SELinux is enabled. Please review the following wiki page for details on ensuring Cobbler " "works correctly in your SELinux environment:\n " "https://cobbler.readthedocs.io/en/latest/user-guide/selinux.html" )
[docs] def check_for_default_password(self, status: List[str]) -> None: """ Check if the default password of Cobbler was changed. :param status: The status list with possible problems. """ default_pass = self.settings.default_password_crypted if default_pass == "$1$mF86/UHC$WvcIcX2t6crBz2onWxyac.": status.append( "The default password used by the sample templates for newly installed machines (" "default_password_crypted in /etc/cobbler/settings.yaml) is still set to 'cobbler' and should be " "changed, try: \"openssl passwd -1 -salt 'random-phrase-here' 'your-password-here'\" to " "generate new one" )
[docs] def check_for_unreferenced_repos(self, status: List[str]) -> None: """ Check if there are repositories which are not used and thus could be removed. :param status: The status list with possible problems. """ repos: List[str] = [] referenced: List[str] = [] not_found: List[str] = [] for repo in self.api.repos(): repos.append(repo.name) for profile in self.api.profiles(): my_repos = profile.repos if my_repos != "<<inherit>>": referenced.extend(my_repos) for repo in referenced: if repo not in repos and repo != "<<inherit>>": not_found.append(repo) if len(not_found) > 0: status.append( "One or more repos referenced by profile objects is no longer defined in Cobbler:" f" {', '.join(not_found)}" )
[docs] def check_for_unsynced_repos(self, status: List[str]) -> None: """ Check if there are unsynchronized repositories which need an update. :param status: The status list with possible problems. """ need_sync: List[str] = [] for repo in self.api.repos(): if repo.mirror_locally is True: lookfor = os.path.join(self.settings.webdir, "repo_mirror", repo.name) if not os.path.exists(lookfor): need_sync.append(repo.name) if len(need_sync) > 0: status.append( "One or more repos need to be processed by cobbler reposync for the first time before " f"automating installations using them: {', '.join(need_sync)}" )
[docs] @staticmethod def check_dhcpd_bin(status: List[str]) -> None: """ Check if dhcpd is installed. :param status: The status list with possible problems. """ if not os.path.exists("/usr/sbin/dhcpd"): status.append("dhcpd is not installed")
[docs] @staticmethod def check_dnsmasq_bin(status: List[str]) -> None: """ Check if dnsmasq is installed. :param status: The status list with possible problems. """ return_code = utils.subprocess_call(["dnsmasq", "--help"], shell=False) if return_code != 0: status.append("dnsmasq is not installed and/or in path")
[docs] @staticmethod def check_bind_bin(status: List[str]) -> None: """ Check if bind is installed. :param status: The status list with possible problems. """ return_code = utils.subprocess_call(["named", "-v"], shell=False) # it should return something like "BIND 9.6.1-P1-RedHat-9.6.1-6.P1.fc11" if return_code != 0: status.append("named is not installed and/or in path")
[docs] @staticmethod def check_for_wget_curl(status: List[str]) -> None: """ Check to make sure wget or curl is installed :param status: The status list with possible problems. """ rc_wget = utils.subprocess_call(["wget", "--help"], shell=False) rc_curl = utils.subprocess_call(["curl", "--help"], shell=False) if rc_wget != 0 and rc_curl != 0: status.append( "Neither wget nor curl are installed and/or available in $PATH. Cobbler requires that one " "of these utilities be installed." )
[docs] @staticmethod def check_bootloaders(status: List[str]) -> None: """ Check if network bootloaders are installed :param status: The status list with possible problems. """ # FIXME: move zpxe.rexx to loaders bootloaders = { "menu.c32": [ "/usr/share/syslinux/menu.c32", "/usr/lib/syslinux/menu.c32", "/var/lib/cobbler/loaders/menu.c32", ], "pxelinux.0": [ "/usr/share/syslinux/pxelinux.0", "/usr/lib/syslinux/pxelinux.0", "/var/lib/cobbler/loaders/pxelinux.0", ], "efi": [ "/var/lib/cobbler/loaders/grub-x86.efi", "/var/lib/cobbler/loaders/grub-x86_64.efi", ], } # look for bootloaders at the glob locations above found_bootloaders: List[str] = [] items = list(bootloaders.keys()) for loader_name in items: patterns = bootloaders[loader_name] for pattern in patterns: matches = glob.glob(pattern) if len(matches) > 0: found_bootloaders.append(loader_name) not_found: List[str] = [] # invert the list of what we've found so we can report on what we haven't found for loader_name in items: if loader_name not in found_bootloaders: not_found.append(loader_name) if len(not_found) > 0: status.append( "some network boot-loaders are missing from /var/lib/cobbler/loaders. If you only want to " "handle x86/x86_64 netbooting, you may ensure that you have installed a *recent* version " "of the syslinux package installed and can ignore this message entirely. Files in this " "directory, should you want to support all architectures, should include pxelinux.0, and" "menu.c32." )
[docs] def check_tftpd_dir(self, status: List[str]) -> None: """ Check if cobbler.conf's tftpboot directory exists :param status: The status list with possible problems. """ if self.checked_family == "debian": return bootloc = self.settings.tftpboot_location if not os.path.exists(bootloc): status.append(f"please create directory: {bootloc}")
[docs] def check_ctftpd_dir(self, status: List[str]) -> None: """ Check if ``cobbler.conf``'s tftpboot directory exists. :param status: The status list with possible problems. """ if self.checked_family == "debian": return bootloc = self.settings.tftpboot_location if not os.path.exists(bootloc): status.append(f"please create directory: {bootloc}")
[docs] def check_rsync_conf(self, status: List[str]) -> None: """ Check that rsync is enabled to autostart. :param status: The status list with possible problems. """ if self.checked_family == "debian": return if os.path.exists("/usr/lib/systemd/system/rsyncd.service"): if not os.path.exists( "/etc/systemd/system/multi-user.target.wants/rsyncd.service" ): status.append("enable and start rsyncd.service with systemctl")
[docs] def check_dhcpd_conf(self, status: List[str]) -> None: """ NOTE: this code only applies if Cobbler is *NOT* set to generate a ``dhcp.conf`` file. Check that dhcpd *appears* to be configured for pxe booting. We can't assure file correctness. Since a Cobbler user might have dhcp on another server, it's okay if it's not there and/or not configured correctly according to automated scans. :param status: The status list with possible problems. """ if self.settings.manage_dhcp: return if os.path.exists(self.settings.dhcpd_conf): match_next = False match_file = False with open(self.settings.dhcpd_conf, encoding="UTF-8") as dhcpd_conf_fd: for line in dhcpd_conf_fd.readlines(): if line.find("next-server") != -1: match_next = True if line.find("filename") != -1: match_file = True if not match_next: status.append( f"expecting next-server entry in {self.settings.dhcpd_conf}" ) if not match_file: status.append(f"missing file: {self.settings.dhcpd_conf}") else: status.append(f"missing file: {self.settings.dhcpd_conf}")