Source code for cobbler.modules.nsupdate_add_system_post

"""
Replace (or remove) records in the DNS zone for systems created (or removed) by Cobbler.
"""

# SPDX-License-Identifier: GPL-2.0-or-later
# SPDX-FileCopyrightText: Adrian Brzezinski <adrbxx@gmail.com>

# DNS toolkit for Python
#   - python-dnspython (Debian)
#   - python-dns (RH/CentOS)

import time
from typing import IO, TYPE_CHECKING, Any, List, Optional

import dns.query
import dns.resolver
import dns.tsigkeyring
import dns.update

from cobbler.cexceptions import CX

if TYPE_CHECKING:
    from cobbler.api import CobblerAPI


# Module-level handle of the nsupdate log file. It starts out as ``None``, is
# assigned by ``run()`` and is written to by ``nslog()``.
LOGF: Optional[IO[str]] = None


def nslog(msg: str) -> None:
    """
    Append a message to the module-level log file, if one is set.

    Nothing is written while ``LOGF`` is ``None``. The message is passed to the
    file unchanged, so the caller has to supply any trailing newline.

    :param msg: The text to write to the log file.
    """
    if LOGF is None:
        return
    LOGF.write(msg)
def register() -> str:
    """
    Cobbler registration hook: report which trigger this module serves.

    The same code is loaded under two module names; the name it was imported as
    selects the trigger glob.

    :return: The trigger glob for the add-system-post or delete-system-pre module,
             or an empty string when loaded under any other module name.
    """
    trigger_by_module = {
        "cobbler.modules.nsupdate_add_system_post": "/var/lib/cobbler/triggers/add/system/post/*",
        "cobbler.modules.nsupdate_delete_system_pre": "/var/lib/cobbler/triggers/delete/system/pre/*",
    }
    return trigger_by_module.get(__name__, "")
def run(api: "CobblerAPI", args: List[Any]) -> int:
    """
    This method executes the trigger, meaning in this case that it updates the dns configuration.

    For every interface of the system that has a fully qualified ``dns_name``, the master nameserver of the
    domain is looked up via its SOA record and a dynamic DNS update is sent to it over TCP. When the module is
    loaded as ``nsupdate_add_system_post`` the A and TXT records are replaced; when it is loaded as
    ``nsupdate_delete_system_pre`` they are deleted.

    :param api: The api to read the settings and the system from.
    :param args: Trigger arguments; ``args[0]`` is passed to ``api.find_system`` to look up the system.
    :return: ``0`` on success or a skipped task.
    :raises ValueError: If the system was not found or the search returned more than one system.
    :raises CX: If the nameserver refuses the TSIG key or answers with a response code other than NOERROR.
    """
    # Module level log file descriptor
    global LOGF  # pylint: disable=global-statement

    # The module name this code was imported under selects the DNS operation.
    if __name__ == "cobbler.modules.nsupdate_add_system_post":
        action = "replace"
    elif __name__ == "cobbler.modules.nsupdate_delete_system_pre":
        action = "delete"
    else:
        return 0

    settings = api.settings()

    if not settings.nsupdate_enabled:
        return 0

    # read our settings
    # NOTE: test the setting itself. ``str(value) is not None`` is always true, which made the fallbacks
    # unreachable and opened a file literally called "None" when no log was configured.
    if settings.nsupdate_log:
        LOGF = open(str(settings.nsupdate_log), "a", encoding="UTF-8")  # type: ignore
    else:
        LOGF = None

    try:
        nslog(f">> starting {__name__} {args}\n")

        tsig_key = settings.nsupdate_tsig_key
        if tsig_key:
            keyring = dns.tsigkeyring.from_text(
                {str(tsig_key[0]): str(tsig_key[1])}
            )
        else:
            keyring = None

        if settings.nsupdate_tsig_algorithm:
            keyring_algo = str(settings.nsupdate_tsig_algorithm)
        else:
            keyring_algo = "HMAC-MD5.SIG-ALG.REG.INT"

        # get information about this system
        system = api.find_system(args[0])
        if system is None:
            raise ValueError(f"System '{args[0]}' was not found!")
        if isinstance(system, list):
            raise ValueError("Search result was ambiguous!")

        # process all interfaces and perform dynamic update for those with --dns-name
        for name, interface in system.interfaces.items():
            host = interface.dns_name
            host_ip = interface.ip_address

            if not system.is_management_supported(cidr_ok=False):
                continue
            if not host:
                continue
            # a dns name without a dot carries no domain to look the zone up with
            if host.find(".") == -1:
                continue

            domain = ".".join(host.split(".")[1:])  # get domain from host name
            host = host.split(".")[0]  # strip domain

            nslog(f"processing interface {name} : {interface}\n")
            nslog(f"lookup for '{domain}' domain master nameserver... ")

            # get master nameserver ip address
            answers = dns.resolver.query(domain + ".", dns.rdatatype.SOA)  # type: ignore
            soa_mname = answers[0].mname  # type: ignore
            soa_mname_ip = None

            # Use an address from the additional section of the SOA response when one is present.
            for rrset in answers.response.additional:  # type: ignore
                if rrset.name == soa_mname:  # type: ignore
                    # Index the rrset itself: ``rrset.items`` is a list in dnspython 1.x but a dict in 2.x.
                    soa_mname_ip = str(rrset[0].address)  # type: ignore

            # Otherwise resolve the nameserver's A record explicitly.
            if soa_mname_ip is None:
                ip_address = dns.resolver.query(soa_mname, "A")  # type: ignore
                for answer in ip_address:  # type: ignore
                    soa_mname_ip = answer.to_text()  # type: ignore

            nslog(f"{soa_mname} [{soa_mname_ip}]\n")
            nslog(f"{action} dns record for {host}.{domain} [{host_ip}] .. ")

            # try to update zone with new record
            update = dns.update.Update(
                domain + ".", keyring=keyring, keyalgorithm=keyring_algo  # type: ignore
            )

            if action == "replace":
                update.replace(host, 3600, dns.rdatatype.A, host_ip)  # type: ignore
                update.replace(  # type: ignore
                    host,
                    3600,
                    dns.rdatatype.TXT,  # type: ignore
                    f'"cobbler (date: {time.strftime("%c")})"',
                )
            else:
                update.delete(host, dns.rdatatype.A, host_ip)  # type: ignore
                update.delete(host, dns.rdatatype.TXT)  # type: ignore

            try:
                response = dns.query.tcp(update, soa_mname_ip)  # type: ignore
                rcode_txt = dns.rcode.to_text(response.rcode())  # type: ignore
            except dns.tsig.PeerBadKey as error:  # type: ignore
                nslog("failed (refused key)\n>> done\n")
                raise CX(
                    f"nsupdate failed, server '{soa_mname}' refusing our key"
                ) from error

            nslog(f"response code: {rcode_txt}\n")

            # notice user about update failure
            if response.rcode() != dns.rcode.NOERROR:  # type: ignore
                nslog(">> done\n")
                raise CX(
                    f"nsupdate failed (response: {rcode_txt}, name: {host}.{domain}, ip {host_ip}, name server {soa_mname})"
                )

        nslog(">> done\n")
    finally:
        # Close the log on every exit path (including unexpected resolver/network errors) and drop the
        # reference so later nslog() calls do not write to a closed file.
        if LOGF is not None:
            LOGF.close()
            LOGF = None

    return 0