Source code for cobbler.services

"""
Mod Python service functions for Cobbler's public interface (aka cool stuff that works with wget/curl)

Changelog:

Schema: From -> To

Current Schema: Please refer to the documentation visible of the individual methods.

V3.4.0 (unreleased)
    * No changes

V3.3.4 (unreleased)
    * No changes

V3.3.3
    * Removed:
        * ``look``

V3.3.2
    * No changes

V3.3.1
    * No changes

V3.3.0
    * Added:
        * ``settings``
    * Changed:
        * ``gpxe``: Renamed to ``ipxe``

V3.2.2
    * No changes

V3.2.1
    * No changes

V3.2.0
    * No changes

V3.1.2
    * No changes

V3.1.1
    * No changes

V3.1.0
    * No changes

V3.0.1
    * No changes

V3.0.0
    * Added:
        * ``autoinstall``
        * ``find_autoinstall``

V2.8.5
    * Inital tracking of changes.

"""

# SPDX-License-Identifier: GPL-2.0-or-later
# SPDX-FileCopyrightText: based on code copyright 2007 Albert P. Tobey <tobert@gmail.com>
# SPDX-FileCopyrightText: additions: 2007-2009 Michael DeHaan <michael.dehaan AT gmail>

import json
import time
import xmlrpc.client
from typing import Any, Callable, Dict, List, Optional, Union
from urllib import parse

import yaml

from cobbler import download_manager


[docs]class CobblerSvc: """ Interesting mod python functions are all keyed off the parameter mode, which defaults to index. All options are passed as parameters into the function. """ def __init__(self, server: str = "") -> None: """ Default constructor which sets up everything to be ready. :param server: The domain to run at. :param req: This parameter is unused. """ self.server = server self.__remote: Optional[xmlrpc.client.Server] = None self.dlmgr = download_manager.DownloadManager() @property def remote(self) -> xmlrpc.client.ServerProxy: """ Sets up the connection to the Cobbler XMLRPC server. This is the version that does not require a login. """ if self.__remote is None: self.__remote = xmlrpc.client.Server(self.server, allow_none=True) return self.__remote
[docs] def settings(self, **kwargs: Any) -> str: """ Get the application configuration. :return: Settings object. """ return json.dumps(self.remote.get_settings(), indent=4)
[docs] def index(self, **args: Any) -> str: """ Just a placeholder method as an entry point. :param args: This parameter is unused. :return: "no mode specified" """ return "no mode specified"
[docs] def autoinstall( self, profile: Optional[str] = None, system: Optional[str] = None, REMOTE_ADDR: Optional[str] = None, REMOTE_MAC: Optional[str] = None, **rest: Any, ) -> str: """ Generate automatic installation files. :param profile: :param system: :param REMOTE_ADDR: :param REMOTE_MAC: :param rest: This parameter is unused. :return: """ data = self.remote.generate_autoinstall( profile, system, REMOTE_ADDR, REMOTE_MAC ) if isinstance(data, str): return data return "ERROR: Server returned unexpected data!"
[docs] def ks( self, profile: Optional[str] = None, system: Optional[str] = None, REMOTE_ADDR: Optional[str] = None, REMOTE_MAC: Optional[str] = None, **rest: Any, ) -> str: """ Generate automatic installation files. This is a legacy function for part backward compatibility to 2.6.6 releases. :param profile: :param system: :param REMOTE_ADDR: :param REMOTE_MAC: :param rest: This parameter is unused. :return: """ data = self.remote.generate_autoinstall( profile, system, REMOTE_ADDR, REMOTE_MAC ) if isinstance(data, str): return data return "ERROR: Server returned unexpected data!"
[docs] def ipxe( self, profile: Optional[str] = None, image: Optional[str] = None, system: Optional[str] = None, mac: Optional[str] = None, **rest: Any, ) -> str: """ Generates an iPXE configuration. :param profile: A profile. :param image: An image. :param system: A system. :param mac: A MAC address. :param rest: This parameter is unused. """ if not system and mac: query = {"mac_address": mac} if profile: query["profile"] = profile elif image: query["image"] = image # mypy and xmlrpc don't play well together found: List[Any] = self.remote.find_system(query) # type: ignore if found: system = found[0] data = self.remote.generate_ipxe(profile, image, system) if isinstance(data, str): return data return "ERROR: Server returned unexpected data!"
[docs] def bootcfg( self, profile: Optional[str] = None, system: Optional[str] = None, **rest: Any ) -> str: """ Generate a boot.cfg config file. Used primarily for VMware ESXi. :param profile: :param system: :param rest: This parameter is unused. :return: """ data = self.remote.generate_bootcfg(profile, system) if isinstance(data, str): return data return "ERROR: Server returned unexpected data!"
[docs] def script( self, profile: Optional[str] = None, system: Optional[str] = None, **rest: Any ) -> str: """ Generate a script based on snippets. Useful for post or late-action scripts where it's difficult to embed the script in the response file. :param profile: The profile to generate the script for. :param system: The system to generate the script for. :param rest: This may contain a parameter with the key "query_string" which has a key "script" which may be an array. The element from position zero is taken. :return: The generated script. """ data = self.remote.generate_script( profile, system, rest["query_string"]["script"][0] ) if isinstance(data, str): return data return "ERROR: Server returned unexpected data!"
[docs] def events(self, user: str = "", **rest: Any) -> str: """ If no user is given then all events are returned. Otherwise only event associated to a user are returned. :param user: Filter the events for a given user. :param rest: This parameter is unused. :return: A JSON object which contains all events. """ if user == "": data = self.remote.get_events("") else: data = self.remote.get_events(user) if not isinstance(data, dict): raise ValueError("Server returned incorrect data!") # sort it... it looks like { timestamp : [ array of details ] } keylist = list(data.keys()) keylist.sort() results: List[List[Union[str, float]]] = [] for k in keylist: etime = int(data[k][0]) nowtime = time.time() if (nowtime - etime) < 30: results.append([k, data[k][0], data[k][1], data[k][2]]) return json.dumps(results)
[docs] def template( self, profile: Optional[str] = None, system: Optional[str] = None, path: Optional[str] = None, **rest: Any, ) -> str: """ Generate a templated file for the system. Either specify a profile OR a system. :param profile: The profile to provide for the generation of the template. :param system: The system to provide for the generation of the template. :param path: The path to the template. :param rest: This parameter is unused. :return: The rendered template. """ if path is not None: path = path.replace("_", "/") path = path.replace("//", "_") else: return "# must specify a template path" if profile is not None: data = self.remote.get_template_file_for_profile(profile, path) elif system is not None: data = self.remote.get_template_file_for_system(system, path) else: data = "# must specify profile or system name" if not isinstance(data, str): raise ValueError("Server returned an unexpected data type!") return data
[docs] def yum( self, profile: Optional[str] = None, system: Optional[str] = None, **rest: Any ) -> str: """ Generate a repo config. Either specify a profile OR a system. :param profile: The profile to provide for the generation of the template. :param system: The system to provide for the generation of the template. :param rest: This parameter is unused. :return: The generated repository config. """ if profile is not None: data = self.remote.get_repo_config_for_profile(profile) elif system is not None: data = self.remote.get_repo_config_for_system(system) else: data = "# must specify profile or system name" if not isinstance(data, str): raise ValueError("Server returned an unexpected data type!") return data
[docs] def trig( self, mode: str = "?", profile: Optional[str] = None, system: Optional[str] = None, REMOTE_ADDR: Optional[str] = None, **rest: Any, ) -> str: """ Hook to call install triggers. Only valid for a profile OR a system. :param mode: Can be "pre", "post" or "firstboot". Everything else is invalid. :param profile: The profile object to run triggers for. :param system: The system object to run triggers for. :param REMOTE_ADDR: The ip if the remote system/profile. :param rest: This parameter is unused. :return: The return code of the action. """ ip_address = REMOTE_ADDR if profile: return_code = self.remote.run_install_triggers( mode, "profile", profile, ip_address ) else: return_code = self.remote.run_install_triggers( mode, "system", system, ip_address ) return str(return_code)
[docs] def nopxe(self, system: Optional[str] = None, **rest: Any) -> str: """ Disables the network boot for the given system. :param system: The system to disable netboot for. :param rest: This parameter is unused. :return: A boolean status if the action succeed or not. """ return str(self.remote.disable_netboot(system))
[docs] def list(self, what: str = "systems", **rest: Any) -> str: """ Return a list of objects of a desired category. Defaults to "systems". :param what: May be "systems", "profiles", "distros", "images", "repos" or "menus" :param rest: This parameter is unused. :return: The list of object names. """ # mypy and xmlrpc don't play well together listing: List[Dict[str, Any]] if what == "systems": listing = self.remote.get_systems() # type: ignore elif what == "profiles": listing = self.remote.get_profiles() # type: ignore elif what == "distros": listing = self.remote.get_distros() # type: ignore elif what == "images": listing = self.remote.get_images() # type: ignore elif what == "repos": listing = self.remote.get_repos() # type: ignore elif what == "menus": listing = self.remote.get_menus() # type: ignore else: return "?" names = [x["name"] for x in listing] if len(names) > 0: return "\n".join(names) + "\n" return ""
[docs] def autodetect(self, **rest: Union[str, int, List[str]]) -> str: """ This tries to autodect the system with the given information. If more than one candidate is found an error message is returned. :param rest: The keys "REMOTE_MACS", "REMOTE_ADDR" or "interfaces". :return: The name of the possible object or an error message. """ # mypy and xmlrpc don't play well together systems: List[Dict[str, Any]] = self.remote.get_systems() # type: ignore # If kssendmac was in the kernel options line, see if a system can be found matching the MAC address. This is # more specific than an IP match. # We cannot be certain that this header is included, thus we can't add a type check (potential breaking change). mac_addresses: List[str] = rest["REMOTE_MACS"] # type: ignore macinput: List[str] = [] for mac in mac_addresses: macinput.extend(mac.lower().split(" ")) ip_address = rest["REMOTE_ADDR"] candidates: List[Dict[str, Any]] = [] for system in systems: for interface in system["interfaces"]: if system["interfaces"][interface]["mac_address"].lower() in macinput: candidates.append(system) if len(candidates) == 0: for system in systems: for interface in system["interfaces"]: if system["interfaces"][interface]["ip_address"] == ip_address: candidates.append(system) if len(candidates) == 0: return f"FAILED: no match ({ip_address},{macinput})" if len(candidates) > 1: return "FAILED: multiple matches" if len(candidates) == 1: return candidates[0]["name"] return "FAILED: Negative amount of matches!"
[docs] def find_autoinstall( self, system: Optional[str] = None, profile: Optional[str] = None, **rest: Union[str, int], ) -> str: """ Find an autoinstallation for a system or a profile. If this is not known different parameters can be passed to rest to find it automatically. See "autodetect". :param system: The system to find the autoinstallation for, :param profile: The profile to find the autoinstallation for. :param rest: The metadata to find the autoinstallation automatically. :return: The autoinstall script or error message. """ name = "?" if system is not None: url = f"{self.server}/cblr/svc/op/autoinstall/system/{name}" elif profile is not None: url = f"{self.server}/cblr/svc/op/autoinstall/profile/{name}" else: name = self.autodetect(**rest) if name.startswith("FAILED"): return f"# autodetection {name}" url = f"{self.server}/cblr/svc/op/autoinstall/system/{name}" try: return self.dlmgr.urlread(url).content.decode("UTF-8") except Exception: return f"# automatic installation file retrieval failed ({url})"
[docs] def findks( self, system: Optional[str] = None, profile: Optional[str] = None, **rest: Union[str, int], ) -> str: """ This is a legacy function which enabled Cobbler partly to be backward compatible to 2.6.6 releases. It should be only be used if you must. Please use find_autoinstall if possible! :param system: If you wish to find a system please set this parameter to not null. Hand over the name of it. :param profile: If you wish to find a system please set this parameter to not null. Hand over the name of it. :param rest: If you wish you can try to let Cobbler autodetect the system with the MAC address. :return: Returns the autoinstall/kickstart profile. """ name = "?" if system is not None: url = f"{self.server}/cblr/svc/op/ks/system/{name}" elif profile is not None: url = f"{self.server}/cblr/svc/op/ks/profile/{name}" else: name = self.autodetect(**rest) if name.startswith("FAILED"): return f"# autodetection {name}" url = f"{self.server}/cblr/svc/op/ks/system/{name}" try: return self.dlmgr.urlread(url).content.decode("UTF-8") except Exception: return f"# kickstart retrieval failed ({url})"
def __fillup_form_dict(form: Dict[Any, Any], my_uri: str) -> str: """ Helper function to fillup the form dict with required mode information. :param form: The form dict to manipulate :param my_uri: The URI to work with. :return: The normalized URI. """ my_uri = parse.unquote(my_uri) tokens = my_uri.split("/") tokens = tokens[1:] label = True field = "" for token in tokens: if label: field = token else: form[field] = token label = not label return my_uri def __generate_remote_mac_list(environ: Dict[str, Any]) -> List[Any]: # This MAC header is set by anaconda during a kickstart booted with the # kssendmac kernel option. The field will appear here as something # like: eth0 XX:XX:XX:XX:XX:XX mac_counter = 0 remote_macs: List[Any] = [] mac_header = f"HTTP_X_RHN_PROVISIONING_MAC_{mac_counter:d}" while environ.get(mac_header, None): remote_macs.append(environ[mac_header]) mac_counter = mac_counter + 1 mac_header = f"HTTP_X_RHN_PROVISIONING_MAC_{mac_counter:d}" return remote_macs
[docs]def application( environ: Dict[str, Any], start_response: Callable[[str, List[Any]], None] ) -> List[bytes]: """ UWSGI entrypoint for Gunicorn :param environ: :param start_response: :return: """ form: Dict[str, Any] = {} my_uri = __fillup_form_dict(form, environ["RAW_URI"]) form["query_string"] = parse.parse_qs(environ["QUERY_STRING"]) form["REMOTE_MACS"] = __generate_remote_mac_list(environ) # REMOTE_ADDR isn't a required wsgi attribute so it may be naive to assume it's always present in this context. form["REMOTE_ADDR"] = environ.get("REMOTE_ADDR", None) # Read config for the XMLRPC port to connect to: with open("/etc/cobbler/settings.yaml", encoding="UTF-8") as main_settingsfile: ydata = yaml.safe_load(main_settingsfile) # Instantiate a CobblerWeb object http_api = CobblerSvc(server=f'http://127.0.0.1:{ydata.get("xmlrpc_port", 25151)}') # Check for a valid path/mode; handle invalid paths gracefully mode = form.get("op", "index") # TODO: We could do proper exception handling here and return # Corresponding HTTP status codes: status = "200 OK" # Execute corresponding operation on the CobblerSvc object: func = getattr(http_api, mode) try: content = func(**form) if content.find("# *** ERROR ***") != -1: status = "500 SERVER ERROR" print("possible cheetah template error") # TODO: Not sure these strings are the right ones to look for... elif ( content.find("# profile not found") != -1 or content.find("# system not found") != -1 or content.find("# object not found") != -1 ): print(f"content not found: {my_uri}") status = "404 NOT FOUND" except xmlrpc.client.Fault as err: status = "500 SERVER ERROR" content = err.faultString content = content.encode("utf-8") response_headers = [ ("Content-type", "text/plain;charset=utf-8"), ("Content-Length", str(len(content))), ] start_response(status, response_headers) return [content]