Source code for cobbler.services

"""
Mod Python service functions for Cobbler's public interface
(aka cool stuff that works with wget/curl)

based on code copyright 2007 Albert P. Tobey <tobert@gmail.com>
additions: 2007-2009 Michael DeHaan <michael.dehaan AT gmail>

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301  USA
"""

import json
import time
import xmlrpc.client
import yaml

from cobbler import download_manager
from cobbler.cobbler_collections import manager
from cobbler.settings import Settings


[docs]class CobblerSvc: """ Interesting mod python functions are all keyed off the parameter mode, which defaults to index. All options are passed as parameters into the function. """ def __init__(self, server=None, req=None): """ Default constructor which sets up everything to be ready. :param server: The domain to run at. :param req: This parameter is unused. """ # ToDo: Remove req attribute. self.server = server self.remote = None self.req = req self.collection_mgr = manager.CollectionManager(self) self.dlmgr = download_manager.DownloadManager(self.collection_mgr) def __xmlrpc_setup(self): """ Sets up the connection to the Cobbler XMLRPC server. This is the version that does not require a login. """ if self.remote is None: self.remote = xmlrpc.client.Server(self.server, allow_none=True)
[docs] def settings(self) -> Settings: """ Get the application configuration. :return: Settings object. """ self.__xmlrpc_setup() return Settings().from_dict(self.remote.get_settings())
[docs] def index(self, **args) -> str: """ Just a placeholder method as an entry point. :param args: This parameter is unused. :return: "no mode specified" """ return "no mode specified"
[docs] def debug(self, profile=None, **rest): # The purpose of this method could change at any time and is intented for temporary test code only, don't rely # on it. self.__xmlrpc_setup() return self.remote.get_repos_compatible_with_profile(profile)
[docs] def autoinstall(self, profile=None, system=None, REMOTE_ADDR=None, REMOTE_MAC=None, **rest): """ Generate automatic installation files. :param profile: :param system: :param REMOTE_ADDR: :param REMOTE_MAC: :param rest: This parameter is unused. :return: """ self.__xmlrpc_setup() data = self.remote.generate_autoinstall(profile, system, REMOTE_ADDR, REMOTE_MAC) return "%s" % data
[docs] def ks(self, profile=None, system=None, REMOTE_ADDR=None, REMOTE_MAC=None, **rest): """ Generate automatic installation files. This is a legacy function for part backward compability to 2.6.6 releases. :param profile: :param system: :param REMOTE_ADDR: :param REMOTE_MAC: :param rest: This parameter is unused. :return: """ self.__xmlrpc_setup() data = self.remote.generate_autoinstall(profile, system, REMOTE_ADDR, REMOTE_MAC) return "%s" % data
[docs] def ipxe(self, profile=None, image=None, system=None, mac=None, **rest): """ Generates an iPXE configuration. :param profile: A profile. :param image: An image. :param system: A system. :param mac: A MAC address. :param rest: This parameter is unused. """ self.__xmlrpc_setup() if not system and mac: query = {"mac_address": mac} if profile: query["profile"] = profile elif image: query["image"] = image found = self.remote.find_system(query) if found: system = found[0] data = self.remote.generate_ipxe(profile, image, system) return "%s" % data
[docs] def bootcfg(self, profile=None, system=None, **rest): """ Generate a boot.cfg config file. Used primarily for VMware ESXi. :param profile: :param system: :param rest: This parameter is unused. :return: """ self.__xmlrpc_setup() data = self.remote.generate_bootcfg(profile, system) return "%s" % data
[docs] def script(self, profile=None, system=None, **rest) -> str: """ Generate a script based on snippets. Useful for post or late-action scripts where it's difficult to embed the script in the response file. :param profile: The profile to generate the script for. :param system: The system to generate the script for. :param rest: This may contain a parameter with the key "query_string" which has a key "script" which may be an array. The element from position zero is taken. :return: The generated script. """ self.__xmlrpc_setup() data = self.remote.generate_script(profile, system, rest['query_string']['script'][0]) return "%s" % data
[docs] def events(self, user="", **rest) -> str: """ If no user is given then all events are returned. Otherwise only event associated to a user are returned. :param user: Filter the events for a given user. :param rest: This parameter is unused. :return: A JSON object which contains all events. """ self.__xmlrpc_setup() if user == "": data = self.remote.get_events("") else: data = self.remote.get_events(user) # sort it... it looks like { timestamp : [ array of details ] } keylist = list(data.keys()) keylist.sort() results = [] for k in keylist: etime = int(data[k][0]) nowtime = time.time() if ((nowtime - etime) < 30): results.append([k, data[k][0], data[k][1], data[k][2]]) return json.dumps(results)
[docs] def template(self, profile=None, system=None, path=None, **rest) -> str: """ Generate a templated file for the system. Either specify a profile OR a system. :param profile: The profile to provide for the generation of the template. :param system: The system to provide for the generation of the template. :param path: The path to the template. :param rest: This parameter is unused. :return: The rendered template. """ self.__xmlrpc_setup() if path is not None: path = path.replace("_", "/") path = path.replace("//", "_") else: return "# must specify a template path" if profile is not None: data = self.remote.get_template_file_for_profile(profile, path) elif system is not None: data = self.remote.get_template_file_for_system(system, path) else: data = "# must specify profile or system name" return data
[docs] def yum(self, profile=None, system=None, **rest) -> str: """ Generate a repo config. Either specify a profile OR a system. :param profile: The profile to provide for the generation of the template. :param system: The system to provide for the generation of the template. :param rest: This parameter is unused. :return: The generated repository config. """ self.__xmlrpc_setup() if profile is not None: data = self.remote.get_repo_config_for_profile(profile) elif system is not None: data = self.remote.get_repo_config_for_system(system) else: data = "# must specify profile or system name" return data
[docs] def trig(self, mode: str = "?", profile=None, system=None, REMOTE_ADDR=None, **rest) -> str: """ Hook to call install triggers. Only valid for a profile OR a system. :param mode: Can be "pre", "post" or "firstboot". Everything else is invalid. :param profile: The profile object to run triggers for. :param system: The system object to run triggers for. :param REMOTE_ADDR: The ip if the remote system/profile. :param rest: This parameter is unused. :return: The return code of the action. """ self.__xmlrpc_setup() ip = REMOTE_ADDR if profile: rc = self.remote.run_install_triggers(mode, "profile", profile, ip) else: rc = self.remote.run_install_triggers(mode, "system", system, ip) return str(rc)
[docs] def nopxe(self, system=None, **rest) -> str: """ Disables the network boot for the given system. :param system: The system to disable netboot for. :param rest: This parameter is unused. :return: A boolean status if the action succeed or not. """ self.__xmlrpc_setup() return str(self.remote.disable_netboot(system))
[docs] def list(self, what="systems", **rest) -> str: """ Return a list of objects of a desired category. Defaults to "systems". :param what: May be "systems", "profiles", "distros", "images", "repos", "mgmtclasses", "packages", "files" or "menus" :param rest: This parameter is unused. :return: The list of object names. """ self.__xmlrpc_setup() buf = "" if what == "systems": listing = self.remote.get_systems() elif what == "profiles": listing = self.remote.get_profiles() elif what == "distros": listing = self.remote.get_distros() elif what == "images": listing = self.remote.get_images() elif what == "repos": listing = self.remote.get_repos() elif what == "mgmtclasses": listing = self.remote.get_mgmtclasses() elif what == "packages": listing = self.remote.get_packages() elif what == "files": listing = self.remote.get_files() elif what == "menus": listing = self.remote.get_menus() else: return "?" for x in listing: buf += "%s\n" % x["name"] return buf
[docs] def autodetect(self, **rest) -> str: """ This tries to autodect the system with the given information. If more than one candidate is found an error message is returned. :param rest: The keys "REMOTE_MACS", "REMOTE_ADDR" or "interfaces". :return: The name of the possible object or an error message. """ self.__xmlrpc_setup() systems = self.remote.get_systems() # If kssendmac was in the kernel options line, see if a system can be found matching the MAC address. This is # more specific than an IP match. macinput = [mac.split(' ').lower() for mac in rest["REMOTE_MACS"]] ip = rest["REMOTE_ADDR"] candidates = [] for x in systems: for y in x["interfaces"]: if x["interfaces"][y]["mac_address"].lower() in macinput: candidates.append(x) if len(candidates) == 0: for x in systems: for y in x["interfaces"]: if x["interfaces"][y]["ip_address"] == ip: candidates.append(x) if len(candidates) == 0: return "FAILED: no match (%s,%s)" % (ip, macinput) elif len(candidates) > 1: return "FAILED: multiple matches" elif len(candidates) == 1: return candidates[0]["name"]
[docs] def look(self, **rest) -> str: """ Debug only: Show the handed dict via repr to the requester. :param rest: The dict to represent. :return: The dict reformated with repr() """ return repr(rest)
[docs] def find_autoinstall(self, system=None, profile=None, **rest): """ Find an autoinstallation for a system or a profile. If this is not known different parameters can be passed to rest to find it automatically. See "autodetect". :param system: The system to find the autoinstallation for, :param profile: The profile to find the autoinstallation for. :param rest: The metadata to find the autoinstallation automatically. :return: The autoinstall script or error message. """ self.__xmlrpc_setup() serverseg = "http://%s" % self.collection_mgr._settings.server name = "?" if system is not None: url = "%s/cblr/svc/op/autoinstall/system/%s" % (serverseg, name) elif profile is not None: url = "%s/cblr/svc/op/autoinstall/profile/%s" % (serverseg, name) else: name = self.autodetect(**rest) if name.startswith("FAILED"): return "# autodetection %s" % name url = "%s/cblr/svc/op/autoinstall/system/%s" % (serverseg, name) try: return self.dlmgr.urlread(url) except: return "# automatic installation file retrieval failed (%s)" % url
[docs] def findks(self, system=None, profile=None, **rest): """ This is a legacy function which enabled Cobbler partly to be backward compatible to 2.6.6 releases. It should be only be used if you must. Please use find_autoinstall if possible! :param system: If you wish to find a system please set this parameter to not null. Hand over the name of it. :param profile: If you wish to find a system please set this parameter to not null. Hand over the name of it. :param rest: If you wish you can try to let Cobbler autodetect the system with the MAC address. :return: Returns the autoinstall/kickstart profile. """ self.__xmlrpc_setup() serverseg = "http://%s" % self.collection_mgr._settings.server name = "?" if system is not None: url = "%s/cblr/svc/op/ks/system/%s" % (serverseg, name) elif profile is not None: url = "%s/cblr/svc/op/ks/profile/%s" % (serverseg, name) else: name = self.autodetect(**rest) if name.startswith("FAILED"): return "# autodetection %s" % name url = "%s/cblr/svc/op/ks/system/%s" % (serverseg, name) try: return self.dlmgr.urlread(url) except: return "# kickstart retrieval failed (%s)" % url
[docs] def puppet(self, hostname=None, **rest) -> str: """ Dump the puppet data which is available for Cobbler. :param hostname: The hostname for the system which should the puppet data be dumped for. :param rest: This parameter is unused. :return: The yaml for the host. """ self.__xmlrpc_setup() if hostname is None: return "hostname is required" settings = self.remote.get_settings() results = self.remote.find_system_by_dns_name(hostname) classes = results.get("mgmt_classes", {}) params = results.get("mgmt_parameters", {}) environ = results.get("status", "") data = { "classes": classes, "parameters": params, "environment": environ, } if environ == "": data.pop("environment", None) if settings.get("puppet_parameterized_classes", False): for ckey in list(classes.keys()): tmp = {} class_name = classes[ckey].get("class_name", "") if class_name in (None, ""): class_name = ckey if classes[ckey].get("is_definition", False): def_tmp = {} def_name = classes[ckey]["params"].get("name", "") del classes[ckey]["params"]["name"] if def_name != "": for pkey in list(classes[ckey]["params"].keys()): def_tmp[pkey] = classes[ckey]["params"][pkey] tmp["instances"] = {def_name: def_tmp} else: # FIXME: log an error here? # skip silently... continue else: for pkey in list(classes[ckey]["params"].keys()): tmp[pkey] = classes[ckey]["params"][pkey] del classes[ckey] classes[class_name] = tmp else: classes = list(classes.keys()) return yaml.dump(data, default_flow_style=False)