Source code for cobbler.modules.managers.dnsmasq

"""
This is some of the code behind 'cobbler sync'.
"""

# SPDX-License-Identifier: GPL-2.0-or-later
# SPDX-FileCopyrightText: Copyright 2006-2009, Red Hat, Inc and Others
# SPDX-FileCopyrightText: Michael DeHaan <michael.dehaan AT gmail>
# SPDX-FileCopyrightText: John Eckersberg <jeckersb@redhat.com>

import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

from cobbler import utils
from cobbler.modules.managers import DhcpManagerModule, DnsManagerModule

if TYPE_CHECKING:
    from cobbler.api import CobblerAPI
    from cobbler.items.distro import Distro
    from cobbler.items.system import System


# Module-level singleton slot: starts empty and is filled on the first call to
# get_manager(), which then keeps returning the same object.
MANAGER = None


def register() -> str:
    """
    The mandatory Cobbler modules registration hook.

    :return: Always "manage".
    """
    return "manage"
class _DnsmasqManager(DnsManagerModule, DhcpManagerModule):
    """
    Manager that renders the dnsmasq configuration file and maintains the hosts
    and ethers files whose paths are taken from the Cobbler settings.
    """

    @staticmethod
    def what() -> str:
        """
        This identifies the module.

        :return: Will always return ``dnsmasq``.
        """
        return "dnsmasq"

    @staticmethod
    def _config_key_for_tag(dhcp_tag: str) -> str:
        """
        Map a DHCP tag to the key under which its entries are stored in ``self.config``.

        :param dhcp_tag: The DHCP tag; ``default`` maps to the un-suffixed key.
        :return: The template variable name for the given tag.
        """
        if dhcp_tag == "default":
            return "insert_cobbler_system_definitions"
        return f"insert_cobbler_system_definitions_{dhcp_tag}"

    def __init__(self, api: "CobblerAPI"):
        """
        Read the hosts/ethers file locations from the settings and make sure both files exist.

        :param api: The API to resolve all information with.
        """
        super().__init__(api)

        # Cache of the last rendered template variables; empty means "no full sync done yet".
        self.config: Dict[str, Any] = {}
        self.cobbler_hosts_file = self.api.settings().dnsmasq_hosts_file
        self.ethers_file = self.api.settings().dnsmasq_ethers_file

        utils.create_files_if_not_existing([self.cobbler_hosts_file, self.ethers_file])

    def write_configs(self) -> None:
        """
        Regenerate the configuration for all systems, cache it and render it to disk.

        :raises OSError: In case the template could not be read.
        :raises ValueError: In case the generated configuration is empty.
        """
        self.config = self.gen_full_config()
        self._write_configs(self.config)

    def _write_configs(self, config_data: Optional[Dict[Any, Any]] = None) -> None:
        """
        Internal function that renders the dnsmasq template with the given data.

        :param config_data: The template variables to render.
        :raises OSError: In case the template could not be read.
        :raises ValueError: In case ``config_data`` is empty or ``None``.
        """
        if not config_data:
            raise ValueError("No config to write.")

        settings_file = "/etc/dnsmasq.conf"
        template_file = "/etc/cobbler/dnsmasq.template"

        try:
            with open(template_file, "r", encoding="UTF-8") as template_file_fd:
                template_data = template_file_fd.read()
        except Exception as error:
            # Any failure while loading the template is reported as OSError, as documented.
            raise OSError(
                f"error reading template from file: {template_file}"
            ) from error

        config_copy = config_data.copy()  # template rendering changes the passed dict
        self.logger.info("Writing %s", settings_file)
        self.templar.render(template_data, config_copy, settings_file)

    def gen_full_config(self) -> Dict[str, str]:
        """
        Generate DHCP configuration for all systems.

        :return: The template variables, including one ``insert_cobbler_system_definitions*``
                 entry per DHCP tag that is in use.
        """
        system_definitions: Dict[str, str] = {}
        for system in self.systems:
            system_config = self._gen_system_config(system)
            system_definitions = utils.merge_dicts_recursive(
                system_definitions, system_config, str_append=True
            )

        metadata = {
            "insert_cobbler_system_definitions": system_definitions.get("default", ""),
            "date": time.asctime(time.gmtime()),
            "cobbler_server": self.settings.server,
            "next_server_v4": self.settings.next_server_v4,
            "next_server_v6": self.settings.next_server_v6,
            "addn_host_file": self.cobbler_hosts_file,
        }

        # now add in other DHCP expansions that are not tagged with "default"
        for dhcp_tag, system_str in system_definitions.items():
            if dhcp_tag == "default":
                continue
            metadata[self._config_key_for_tag(dhcp_tag)] = system_str

        return metadata

    def remove_single_system(self, system_obj: "System") -> None:
        """
        This method removes a single system.

        :param system_obj: System to be removed.
        """
        if not self.config:
            self.config = self.gen_full_config()

        system_config = self._gen_system_config(system_obj)
        for dhcp_tag, system_iface_config in system_config.items():
            config_key = self._config_key_for_tag(dhcp_tag)
            # A tag that is absent from the cached config has nothing to remove;
            # look it up defensively instead of raising KeyError.
            current_entries = self.config.get(config_key, "")
            if system_iface_config not in current_entries:
                continue
            self.config[config_key] = current_entries.replace(system_iface_config, "")

        self._write_configs(self.config)
        self.remove_single_ethers_entry(system_obj)

    def _gen_system_config(
        self,
        system_obj: "System",
    ) -> Dict[str, str]:
        """
        Generate dnsmasq config for a single system.

        :param system_obj: System to generate dnsmasq config for
        :return: Mapping of DHCP tag to the newline-terminated ``dhcp-host`` lines of that tag.
                 Empty if the system does not meet the management precondition.
        :raises ValueError: In case the profile or distro of the system could not be found.
        """
        if not system_obj.is_management_supported(cidr_ok=False):
            self.logger.debug(
                "%s does not meet precondition: MAC, IPv4, or IPv6 address is required.",
                system_obj.name,
            )
            return {}

        profile = system_obj.get_conceptual_parent()
        if profile is None:
            raise ValueError("Profile for system not found!")

        distro: Optional["Distro"] = profile.get_conceptual_parent()  # type: ignore
        if distro is None:
            raise ValueError("Distro for system not found!")

        # we used to just loop through each system, but now we must loop
        # through each network interface of each system.
        system_definitions: Dict[str, str] = {}
        for interface in system_obj.interfaces.values():
            mac = interface.mac_address
            if not mac:
                # can't write a DHCP entry for this system
                continue

            # In many reallife situations there is a need to control the IP address and hostname for a specific
            # client when only the MAC address is available. In addition to that in some scenarios there is a need
            # to explicitly label a host with the applicable architecture in order to correctly handle situations
            # where we need something other than ``pxelinux.0``. So we always write a dhcp-host entry with as much
            # info as possible to allow maximum control and flexibility within the dnsmasq config.
            fields = ["dhcp-host=net:" + distro.arch.value.lower(), mac]
            if interface.dns_name != "":
                fields.append(interface.dns_name)
            if interface.ip_address != "":
                fields.append(interface.ip_address)
            if interface.ipv6_address != "":
                fields.append(f"[{interface.ipv6_address}]")
            systxt = ",".join(fields) + "\n"

            dhcp_tag = interface.dhcp_tag
            if dhcp_tag == "":
                dhcp_tag = "default"
            system_definitions[dhcp_tag] = system_definitions.get(dhcp_tag, "") + systxt

        return system_definitions

    def _find_unique_dhcp_entries(
        self, dhcp_entries: str, config_key: str
    ) -> Tuple[str, str]:
        """
        This method checks the dhcp entries in the current config and returns only those that are unique.

        This is necessary because the sync_single_system method is used for both adding and modifying a single
        system.

        :param dhcp_entries: Newline separated ``dhcp-host`` lines of one DHCP tag.
        :param config_key: Key in ``self.config`` that holds the already known lines of that tag.
        :return: Tuple of (entries not yet in the config, entries already in the config); both are
                 newline-terminated line blocks or empty strings.
        """
        # Compare whole lines: a substring test would treat "...,1.2.3.4" as already
        # present when the config only contains "...,1.2.3.40". A missing key simply
        # means that no entry of this tag is known yet.
        known_lines = set(self.config.get(config_key, "").split("\n"))

        unique_entries = ""
        duplicate_entries = ""
        for dhcp_entry in dhcp_entries.split("\n"):
            if not dhcp_entry:
                # Artifact of splitting a newline-terminated block; not a real entry.
                continue
            if dhcp_entry in known_lines:
                duplicate_entries += f"{dhcp_entry}\n"
            else:
                unique_entries += f"{dhcp_entry}\n"
        return unique_entries, duplicate_entries

    def _parse_mac_from_dnsmasq_entries(self, entries: str) -> List[str]:
        """
        Extract the MAC addresses from a block of ``dhcp-host`` lines.

        :param entries: Newline separated lines as produced by ``_gen_system_config``, where the
                        MAC address is the second comma separated field.
        :return: The MAC address of every non-empty line.
        """
        return [entry.split(",")[1] for entry in entries.split("\n") if entry]

    def sync_single_system(self, system: "System"):
        """
        Synchronize data for a single system.

        :param system: A system to be added.
        :return: The result of the full sync if one had to be triggered, otherwise the return code of
                 the service restart.
        """
        if not self.config:
            # cache miss, need full sync for consistent data
            self.regen_ethers()
            return self.sync()

        system_config = self._gen_system_config(system)
        updated_dhcp_entries: Dict[str, str] = {}
        duplicate_dhcp_entries = ""
        for iface_tag, dhcp_entries in system_config.items():
            config_key = self._config_key_for_tag(iface_tag)
            unique_dhcp_entries, tag_duplicates = self._find_unique_dhcp_entries(
                dhcp_entries, config_key
            )
            # Collect the duplicates of *every* tag; keeping only the last tag's
            # duplicates would re-append already known MACs to the ethers file.
            duplicate_dhcp_entries += tag_duplicates
            updated_dhcp_entries[config_key] = unique_dhcp_entries
            self.config = utils.merge_dicts_recursive(
                self.config, {config_key: unique_dhcp_entries}, str_append=True
            )

        if all(not dhcp_entry for dhcp_entry in updated_dhcp_entries.values()):
            # No entries were updated. Therefore, User removed
            # or modified already existing mac address and we don't
            # know which mac address that was. Consequently, trigger
            # a full sync to keep DNS and DHCP entries consistent.
            self.regen_ethers()
            return self.sync()

        duplicate_macs = self._parse_mac_from_dnsmasq_entries(duplicate_dhcp_entries)
        self.sync_single_ethers_entry(system, duplicate_macs)
        self.config["date"] = time.asctime(time.gmtime())
        self._write_configs(self.config)
        return self.restart_service()

    def regen_ethers(self) -> None:
        """
        This function regenerates the ethers file. To get more information please read ``man ethers``, the format is
        also in there described.
        """
        # dnsmasq knows how to read this database of MACs -> IPs, so we'll keep it up to date every time we add a
        # system.
        with open(self.ethers_file, "w", encoding="UTF-8") as ethers_fh:
            for system in self.systems:
                ethers_entry = self._gen_single_ethers_entry(system)
                if ethers_entry:
                    ethers_fh.write(ethers_entry)

    def _gen_single_ethers_entry(
        self, system_obj: "System", duplicate_macs: Optional[List[str]] = None
    ) -> str:
        """
        Generate an ethers entry for a system: one line per interface that has both a MAC and an IPv4
        address, consisting of the upper-cased MAC, a tab, the IPv4 address and a newline.

        :param system_obj: System to generate the ethers lines for.
        :param duplicate_macs: MAC addresses whose interfaces must be left out.
        :return: The concatenated lines; empty if there is nothing to write.
        """
        if not system_obj.is_management_supported(cidr_ok=False):
            self.logger.debug(
                "%s does not meet precondition: MAC, IPv4, or IPv6 address is required.",
                system_obj.name,
            )
            return ""

        output = ""
        for interface in system_obj.interfaces.values():
            mac = interface.mac_address
            ip_address = interface.ip_address
            if not mac:
                # can't write this w/o a MAC address
                continue
            if duplicate_macs and mac in duplicate_macs:
                # explicitly skipping mac address
                continue
            if ip_address != "":
                output += mac.upper() + "\t" + ip_address + "\n"
        return output

    def sync_single_ethers_entry(
        self, system: "System", duplicate_macs: Optional[List[str]] = None
    ):
        """
        This appends a new single system entry to the ethers file.

        :param system: A system to be added.
        :param duplicate_macs: MAC addresses that must not be appended again.
        """
        # dnsmasq knows how to read this database of MACs -> IPs, so we'll keep it up to date every time we add a
        # system.
        with open(self.ethers_file, "a", encoding="UTF-8") as ethers_fh:
            host_entry = self._gen_single_ethers_entry(system, duplicate_macs)
            if host_entry:
                ethers_fh.write(host_entry)

    def remove_single_ethers_entry(
        self,
        system: "System",
    ):
        """
        This removes the entries of a single system from the ethers file.

        :param system: A system to be removed.
        """
        ethers_entry = self._gen_single_ethers_entry(system)
        if not ethers_entry:
            return
        mac_addresses = self._extract_mac_from_ethers_entry(ethers_entry)
        utils.remove_lines_in_file(self.ethers_file, mac_addresses)

    def _extract_mac_from_ethers_entry(self, ethers_entry: str) -> List[str]:
        """
        One ethers entry can contain multiple lines, each of the form "MAC<TAB>IP". This method
        returns the MAC address (the part before the tab) of every non-empty line.

        :param ethers_entry: Newline separated ethers lines.
        :return: The list of MAC addresses, in the order of the lines.
        """
        return [line.split("\t")[0] for line in ethers_entry.split("\n") if line]

    def remove_single_hosts_entry(self, system: "System"):
        """
        This removes a single system entry from the Cobbler hosts file.

        :param system: A system to be removed.
        """
        host_entry = self._gen_single_host_entry(system)
        if not host_entry:
            return

        # Hand over one keyword per line (newline kept, as before for single-line
        # entries). NOTE(review): this assumes utils.remove_lines_in_file matches
        # its keywords line by line, as the ethers removal above suggests - confirm.
        host_lines = [f"{line}\n" for line in host_entry.split("\n") if line]
        utils.remove_lines_in_file(self.cobbler_hosts_file, host_lines)

    def _gen_single_host_entry(
        self,
        system_obj: "System",
    ):
        """
        Generate the hosts file lines for a single system.

        Every interface with a MAC address, a DNS name and an IP address yields one line of the form
        "IP<TAB>name[ cnames]". The IPv6 address is preferred over the IPv4 address.

        :param system_obj: System to generate the hosts lines for.
        :return: The concatenated, newline-terminated lines; empty if there is nothing to write.
        """
        if not system_obj.is_management_supported(cidr_ok=False):
            self.logger.debug(
                "%s does not meet precondition: MAC, IPv4, or IPv6 address is required.",
                system_obj.name,
            )
            return ""

        output = ""
        for interface in system_obj.interfaces.values():
            host = interface.dns_name
            if not interface.mac_address or host == "":
                continue

            if interface.ipv6_address != "":
                address = interface.ipv6_address
            elif interface.ip_address != "":
                address = interface.ip_address
            else:
                # Without an address there is nothing to map the name to; emitting
                # a blank or cnames-only line would only pollute the hosts file.
                continue

            line = address + "\t" + host
            cnames = " ".join(interface.cnames)
            if cnames:
                line += " " + cnames
            output += line + "\n"
        return output

    def add_single_hosts_entry(
        self,
        system: "System",
    ):
        """
        This adds a single system entry to the hosts file.

        :param system: A system to be added.
        """
        host_entries = self._gen_single_host_entry(system)
        if not host_entries:
            return

        # This method can be used for editing, so
        # remove duplicate entries first
        with open(self.cobbler_hosts_file, "r", encoding="UTF-8") as hosts_fd:
            known_lines = {host_line.strip() for host_line in hosts_fd}

        # Whole-line comparison: substring matching could cut a partially matching
        # line into pieces.
        new_lines = [
            line
            for line in host_entries.split("\n")
            if line.strip() and line.strip() not in known_lines
        ]

        if not new_lines:
            # No new entries present, we should trigger a full sync
            self.regen_hosts()
            return

        with open(self.cobbler_hosts_file, "a", encoding="UTF-8") as regen_hosts_fd:
            regen_hosts_fd.writelines(f"{line}\n" for line in new_lines)

    def regen_hosts(self) -> None:
        """
        This rewrites the hosts file and thus also rewrites the dns config.
        """
        # dnsmasq knows how to read this database for host info (other things may also make use of this later)
        with open(self.cobbler_hosts_file, "w", encoding="UTF-8") as regen_hosts_fd:
            for system in self.systems:
                host_entry = self._gen_single_host_entry(system)
                if host_entry:
                    regen_hosts_fd.write(host_entry)

    def restart_service(self) -> int:
        """
        This restarts the dnsmasq service if ``restart_dhcp`` is enabled in the settings, so that the
        newly written config files are applied.

        :return: The return code of the service restart, or 0 if no restart was requested.
        """
        service_name = "dnsmasq"
        if self.settings.restart_dhcp:
            return_code_service_restart = utils.process_management.service_restart(
                service_name
            )
            if return_code_service_restart != 0:
                self.logger.error("%s service failed", service_name)
            return return_code_service_restart
        return 0
def get_manager(api: "CobblerAPI") -> _DnsmasqManager:
    """
    Creates a manager object to manage a dnsmasq server.

    The object is created on the first call only and stored in the module-level
    ``MANAGER`` variable; later calls return that same object.

    :param api: The API to resolve all information with.
    :return: The object generated from the class.
    """
    # Singleton used, therefore ignoring 'global'
    global MANAGER  # pylint: disable=global-statement

    # Identity check: only a slot that was never filled triggers the creation.
    if MANAGER is None:
        MANAGER = _DnsmasqManager(api)  # type: ignore
    return MANAGER