Source code for cobbler.modules.managers.isc

"""
This is some of the code behind 'cobbler sync'.
"""

# SPDX-License-Identifier: GPL-2.0-or-later
# SPDX-FileCopyrightText: Copyright 2006-2009, Red Hat, Inc and Others
# SPDX-FileCopyrightText: Michael DeHaan <michael.dehaan AT gmail>
# SPDX-FileCopyrightText: John Eckersberg <jeckersb@redhat.com>

import shutil
import time
from typing import TYPE_CHECKING, Any, Dict, Optional, Set

from cobbler import enums, utils
from cobbler.enums import Archs
from cobbler.modules.managers import DhcpManagerModule
from cobbler.utils import process_management

if TYPE_CHECKING:
    from cobbler.api import CobblerAPI
    from cobbler.items.distro import Distro
    from cobbler.items.profile import Profile
    from cobbler.items.system import NetworkInterface, System

MANAGER = None


[docs]def register() -> str: """ The mandatory Cobbler module registration hook. """ return "manage"
class _IscManager(DhcpManagerModule): @staticmethod def what() -> str: """ Static method to identify the manager. :return: Always "isc". """ return "isc" def __init__(self, api: "CobblerAPI"): super().__init__(api) self.settings_file_v4 = utils.dhcpconf_location(enums.DHCP.V4) self.settings_file_v6 = utils.dhcpconf_location(enums.DHCP.V6) # cache config to allow adding systems incrementally self.config: Dict[str, Any] = {} self.generic_entry_cnt = 0 def sync_single_system(self, system: "System"): """ Update the config with data for a single system, write it to the filesysemt, and restart DHCP service. :param system: System object to generate the config for. """ if not self.config: # cache miss, need full sync for consistent data return self.sync() profile: Optional["Profile"] = system.get_conceptual_parent() # type: ignore distro: Optional["Distro"] = profile.get_conceptual_parent() # type: ignore blend_data = utils.blender(self.api, False, system) system_config = self._gen_system_config(system, blend_data, distro) if all( mac in self.config.get("dhcp_tags", {}).get(dhcp_tag, {}) for dhcp_tag, interface in system_config.items() for mac in interface ): # All interfaces in the added system are already cached. Therefore, # user might have removed an interface and we don't know which. # Trigger full sync. return self.sync() self.config = utils.merge_dicts_recursive( self.config, {"dhcp_tags": system_config}, ) self.config["date"] = time.asctime(time.gmtime()) self._write_configs(self.config) return self.restart_service() def remove_single_system(self, system_obj: "System") -> None: if not self.config: self.write_configs() return profile: Optional["Profile"] = system_obj.get_conceptual_parent() # type: ignore distro: Optional["Distro"] = profile.get_conceptual_parent() # type: ignore blend_data = utils.blender(self.api, False, system_obj) system_config = self._gen_system_config(system_obj, blend_data, distro) for dhcp_tag, mac_addresses in system_config.items(): for mac_address in mac_addresses: self.config.get("dhcp_tags", {}).get(dhcp_tag, {}).pop(mac_address, "") self.config["date"] = time.asctime(time.gmtime()) self._write_configs(self.config) self.restart_service() def _gen_system_config( self, system_obj: "System", system_blend_data: Dict[str, Any], distro_obj: Optional["Distro"], ) -> Dict[str, Any]: """ Generate DHCP config for a single system. :param system_obj: System to generate DHCP config for :param system_blend_data: utils.blender() data for the System :param distro_object: Optional, is used to access distro-specific information like arch when present """ dhcp_tags: Dict[str, Any] = {"default": {}} processed_system_master_interfaces: Set[str] = set() ignore_macs: Set[str] = set() if not system_obj.is_management_supported(cidr_ok=False): self.logger.debug( "%s does not meet precondition: MAC, IPv4, or IPv6 address is required.", system_obj.name, ) return {} profile: Optional["Profile"] = system_obj.get_conceptual_parent() # type: ignore for iface_name, iface_obj in system_obj.interfaces.items(): iface = iface_obj.to_dict() mac = iface_obj.mac_address if ( not self.settings.always_write_dhcp_entries and not system_blend_data["netboot_enabled"] and iface["static"] ): continue if not mac: self.logger.warning("%s has no MAC address", system_obj.name) continue iface["gateway"] = iface_obj.if_gateway or system_obj.gateway if iface["interface_type"] in ( "bond_slave", "bridge_slave", "bonded_bridge_slave", ): if iface["interface_master"] not in system_obj.interfaces: # Can't write DHCP entry: master interface does not exist continue master_name = iface["interface_master"] master_iface = system_obj.interfaces[master_name] # There may be multiple bonded interfaces, need composite index system_master_name = f"{system_obj.name}-{master_name}" if system_master_name not in processed_system_master_interfaces: processed_system_master_interfaces.add(system_master_name) else: ignore_macs.add(mac) # IPv4 iface["netmask"] = master_iface.netmask iface["ip_address"] = master_iface.ip_address if not iface["ip_address"]: iface["ip_address"] = self._find_ip_addr( system_obj.interfaces, prefix=master_name, ip_version="ipv4" ) # IPv6 iface["ipv6_address"] = master_iface.ipv6_address if not iface["ipv6_address"]: iface["ipv6_address"] = self._find_ip_addr( system_obj.interfaces, prefix=master_name, ip_version="ipv6" ) # common host = master_iface.dns_name dhcp_tag = master_iface.dhcp_tag else: # TODO: simplify _slave / non_slave branches host = iface["dns_name"] dhcp_tag = iface["dhcp_tag"] if distro_obj is not None: iface["distro"] = distro_obj.to_dict() if profile is not None: iface["profile"] = profile.to_dict() # type: ignore if host: if iface_name == "eth0": iface["name"] = host else: iface["name"] = f"{host}-{iface_name}" else: self.generic_entry_cnt += 1 iface["name"] = f"generic{self.generic_entry_cnt:d}" for key in ( "next_server_v6", "next_server_v4", "filename", "netboot_enabled", "hostname", "enable_ipxe", "name_servers", ): iface[key] = system_blend_data[key] iface["owner"] = system_blend_data["name"] # esxi if distro_obj is not None and distro_obj.os_version.startswith("esxi"): iface["filename_esxi"] = ( "esxi/system", # config filename can be None system_obj.get_config_filename(interface=iface_name, loader="pxe") or "", "mboot.efi", ) elif distro_obj is not None and not iface["filename"]: if distro_obj.arch in ( Archs.PPC, Archs.PPC64, Archs.PPC64LE, Archs.PPC64EL, ): iface["filename"] = "grub/grub.ppc64le" elif distro_obj.arch == Archs.AARCH64: iface["filename"] = "grub/grubaa64.efi" if not dhcp_tag: dhcp_tag = system_blend_data.get("dhcp_tag", "") if dhcp_tag == "": dhcp_tag = "default" if dhcp_tag not in dhcp_tags: dhcp_tags[dhcp_tag] = {mac: iface} else: dhcp_tags[dhcp_tag][mac] = iface for macs in dhcp_tags.values(): for mac in macs: if mac in ignore_macs: del macs[mac] return dhcp_tags def _find_ip_addr( self, interfaces: Dict[str, "NetworkInterface"], prefix: str, ip_version: str, ) -> str: """Find the first interface with an IP address that begins with prefix.""" if ip_version.lower() == "ipv4": attr_name = "ip_address" elif ip_version.lower() == "ipv6": attr_name = "ipv6_address" else: return "" for name, obj in interfaces: if name.startswith(prefix + ".") and hasattr(obj, attr_name): return getattr(obj, attr_name) return "" def gen_full_config(self) -> Dict[str, Any]: """Generate DHCP configuration for all systems.""" dhcp_tags: Dict[str, Any] = {"default": {}} self.generic_entry_cnt = 0 for system in self.systems: profile: Optional["Profile"] = system.get_conceptual_parent() # type: ignore if profile is None: continue distro: Optional["Distro"] = profile.get_conceptual_parent() # type: ignore blended_system = utils.blender(self.api, False, system) new_tags = self._gen_system_config(system, blended_system, distro) dhcp_tags = utils.merge_dicts_recursive(dhcp_tags, new_tags) metadata = { "date": time.asctime(time.gmtime()), "cobbler_server": f"{self.settings.server}:{self.settings.http_port}", "next_server_v4": self.settings.next_server_v4, "next_server_v6": self.settings.next_server_v6, "dhcp_tags": dhcp_tags, } return metadata def _write_config( self, config_data: Dict[Any, Any], template_file: str, settings_file: str, ) -> None: """DHCP files are written when ``manage_dhcp_v4`` or ``manage_dhcp_v6`` is set in the settings for the respective version. DHCPv4 files are written when ``manage_dhcp_v4`` is set in our settings. :param config_data: DHCP data to write. :param template_file: The location of the DHCP template. :param settings_file: The location of the final config file. """ try: with open(template_file, "r", encoding="UTF-8") as template_fd: template_data = template_fd.read() except OSError as e: self.logger.error("Can't read dhcp template '%s':\n%s", template_file, e) return config_copy = config_data.copy() # template rendering changes the passed dict self.logger.info("Writing %s", settings_file) self.templar.render(template_data, config_copy, settings_file) def write_v4_config( self, config_data: Optional[Dict[Any, Any]] = None, template_file: str = "/etc/cobbler/dhcp.template", ): """Write DHCP files for IPv4. :param config_data: DHCP data to write. :param template_file: The location of the DHCP template. :param settings_file: The location of the final config file. """ if not config_data: raise ValueError("No config to write.") self._write_config(config_data, template_file, self.settings_file_v4) def write_v6_config( self, config_data: Optional[Dict[Any, Any]] = None, template_file: str = "/etc/cobbler/dhcp6.template", ): """Write DHCP files for IPv6. :param config_data: DHCP data to write. :param template_file: The location of the DHCP template. :param settings_file: The location of the final config file. """ if not config_data: raise ValueError("No config to write.") self._write_config(config_data, template_file, self.settings_file_v6) def restart_dhcp(self, service_name: str, version: int) -> int: """ This syncs the dhcp server with it's new config files. Basically this restarts the service to apply the changes. :param service_name: The name of the DHCP service. """ dhcpd_path = shutil.which(service_name) if dhcpd_path is None: self.logger.error("%s path could not be found", service_name) return -1 return_code_service_restart = utils.subprocess_call( [dhcpd_path, f"-{version}", "-t", "-q"], shell=False ) if return_code_service_restart != 0: self.logger.error("Testing config - %s -t failed", service_name) if version == 4: return_code_service_restart = process_management.service_restart( service_name ) else: return_code_service_restart = process_management.service_restart( f"{service_name}{version}" ) if return_code_service_restart != 0: self.logger.error("%s service failed", service_name) return return_code_service_restart def write_configs(self) -> None: """ DHCP files are written when ``manage_dhcp`` is set in our settings. :raises OSError :raises ValueError """ self.generic_entry_cnt = 0 self.config = self.gen_full_config() self._write_configs(self.config) def _write_configs(self, data: Optional[Dict[Any, Any]] = None) -> None: if not data: raise ValueError("No config to write.") if self.settings.manage_dhcp_v4: self.write_v4_config(data) if self.settings.manage_dhcp_v6: self.write_v6_config(data) def restart_service(self) -> int: if not self.settings.restart_dhcp: return 0 # Even if one fails, try both and return an error ret = 0 service = utils.dhcp_service_name() if self.settings.manage_dhcp_v4: ret |= self.restart_dhcp(service, 4) if self.settings.manage_dhcp_v6: ret |= self.restart_dhcp(service, 6) return ret
[docs]def get_manager(api: "CobblerAPI") -> _IscManager: """ Creates a manager object to manage an isc dhcp server. :param api: The API which holds all information in the current Cobbler instance. :return: The object to manage the server with. """ # Singleton used, therefore ignoring 'global' global MANAGER # pylint: disable=global-statement if not MANAGER: MANAGER = _IscManager(api) # type: ignore return MANAGER