"""
This module contains the code for the abstract base collection that powers all the other collections.
"""
# SPDX-License-Identifier: GPL-2.0-or-later
# SPDX-FileCopyrightText: Copyright 2006-2009, Red Hat, Inc and Others
# SPDX-FileCopyrightText: Michael DeHaan <michael.dehaan AT gmail>
import logging
import os
import time
from abc import abstractmethod
from threading import Lock
from typing import (
TYPE_CHECKING,
Any,
Dict,
Generic,
Iterator,
List,
Optional,
TypeVar,
Union,
)
from cobbler import enums, utils
from cobbler.cexceptions import CX
from cobbler.items import distro, image, menu, profile, repo, system
from cobbler.items.abstract.base_item import BaseItem
from cobbler.items.abstract.inheritable_item import InheritableItem
if TYPE_CHECKING:
from cobbler.actions.sync import CobblerSync
from cobbler.api import CobblerAPI
from cobbler.cobbler_collections.manager import CollectionManager
# Type variable for the concrete item class stored in a collection; restricted to ``BaseItem`` subclasses.
ITEM = TypeVar("ITEM", bound=BaseItem)
# Value types that are accepted for the keyword search criteria of ``Collection.find()``.
FIND_KWARGS = Union[ # pylint: disable=invalid-name
str, int, bool, Dict[Any, Any], List[Any]
]
class Collection(Generic[ITEM]):
    """
    Base class for any serializable list of things.

    Items are kept in a dictionary keyed by their name. Subclasses have to provide ``factory_produce``, ``remove``,
    ``collection_type`` and ``collection_types``.
    """
def __init__(self, collection_mgr: "CollectionManager"):
    """
    Create an empty collection that is bound to the given collection manager.

    :param collection_mgr: The collection manager to resolve all information with.
    """
    api = collection_mgr.api
    self.collection_mgr = collection_mgr
    # Items of the collection, keyed by item name.
    self.listing: Dict[str, ITEM] = {}
    self.api = api
    # Created on first access of the ``lite_sync`` property.
    self.__lite_sync: Optional["CobblerSync"] = None
    self.lock: Lock = Lock()
    # With ``lazy_start`` enabled the items are not considered to be in memory at construction time.
    lazy_start = api.settings().lazy_start
    self._inmemory: bool = not lazy_start
    self._deserialize_running: bool = False
    # Secondary indexes for the collection.
    # Only unique indexes on string values are supported.
    # Empty strings are not indexed.
    uid_index: Dict[str, str] = {}
    self.indexes: Dict[str, Dict[str, str]] = {"uid": uid_index}
    self.logger = logging.getLogger()
def __iter__(self) -> Iterator[ITEM]:
    """
    Iterate over the items of the collection. Allows list comprehensions, etc.

    The items are copied into a list first, so the iteration works on a snapshot of the collection.
    """
    snapshot = list(self.listing.values())
    yield from snapshot
def __len__(self) -> int:
    """
    Returns size of the collection.

    :return: The number of items that are currently stored in the collection.
    """
    # The dictionary knows its own size, there is no need to copy all values into a list first.
    return len(self.listing)
@property
def lite_sync(self) -> "CobblerSync":
    """
    Provide a ready to use CobblerSync object. It is requested from the API on first access and reused afterwards.

    :getter: Return the object that can update the filesystem state to a new one.
    """
    sync = self.__lite_sync
    if sync is None:
        sync = self.api.get_sync()
        self.__lite_sync = sync
    return sync
@property
def inmemory(self) -> bool:
    r"""
    Flag that tells if all items of the collection are loaded into memory (``true``).

    :getter: The inmemory for the collection.
    :setter: The new inmemory value for the collection.
    """
    return self._inmemory

@inmemory.setter
def inmemory(self, inmemory: bool):
    """
    Store a new inmemory flag for the collection.

    :param inmemory: The new inmemory value.
    """
    self._inmemory = inmemory
@property
def deserialize_running(self) -> bool:
    r"""
    Flag that tells if the collection items are currently being loaded from disk (``true``).

    :getter: The deserialize_running for the collection.
    :setter: The new deserialize_running value for the collection.
    """
    return self._deserialize_running

@deserialize_running.setter
def deserialize_running(self, deserialize_running: bool):
    """
    Store a new deserialize_running flag for the collection.

    :param deserialize_running: The new deserialize_running value.
    """
    self._deserialize_running = deserialize_running
@abstractmethod
def factory_produce(self, api: "CobblerAPI", seed_data: Dict[str, Any]) -> ITEM:
    """
    Build an Item object from a dictionary. This method must be overridden in any subclass.

    :param api: The API to resolve all information with.
    :param seed_data: Unused Parameter in the base collection.
    """
@abstractmethod
def remove(
    self,
    name: str,
    with_delete: bool = True,
    with_sync: bool = True,
    with_triggers: bool = True,
    recursive: bool = False,
) -> None:
    """
    Remove an item from collection. This method must be overridden in any subclass.

    The base implementation is an empty placeholder: it neither removes anything nor raises an exception.

    :param name: Item Name
    :param with_delete: sync and run triggers
    :param with_sync: sync to server file system
    :param with_triggers: run "on delete" triggers
    :param recursive: recursively delete children
    """
def get(self, name: str) -> Optional[ITEM]:
    """
    Look up a single object of the collection by its name.

    :param name: The name of the object to retrieve from the collection.
    :return: The object if it exists. Otherwise, "None".
    """
    items = self.listing
    return items.get(name)
def get_names(self) -> List[str]:
    """
    Collect the names of all objects in the collection.

    :return: list of names in the collection.
    """
    names = self.listing.keys()
    return list(names)
def find(
    self,
    name: str = "",
    return_list: bool = False,
    no_errors: bool = False,
    **kargs: FIND_KWARGS,
) -> Optional[Union[List[ITEM], ITEM]]:
    """
    Return first object in the collection that matches all item='value' pairs passed, else return None if no objects
    can be found. When return_list is set, can also return a list. Empty list would be returned instead of None in
    that case.

    Criteria that can be answered by a secondary index narrow the candidates down first; the remaining criteria are
    then checked only against those candidates, so that every returned object matches *all* criteria.

    :param name: The object name which should be found.
    :param return_list: If a list should be returned or the first match.
    :param no_errors: If errors which are possibly thrown while searching should be ignored or not.
    :param kargs: If name is present, this is optional, otherwise this dict needs to have at least a key with
                  ``name``. You may specify more keys to finetune the search.
    :return: The first item or a list with all matches.
    :raises ValueError: In case no arguments for searching were specified.
    """
    if name:
        kargs["name"] = name

    kargs = self.__rekey(kargs)

    # no arguments is an error, so we don't return a false match
    if not kargs:
        raise ValueError("calling find with no arguments")

    # performance: if the only key is name we can skip the whole loop
    if len(kargs) == 1 and "name" in kargs and not return_list:
        try:
            return self.listing.get(kargs["name"], None)  # type: ignore
        except Exception:
            return self.listing.get(name, None)

    if self.api.settings().lazy_start:
        # Forced deserialization of the entire collection to prevent deadlock in the search loop
        self._deserialize()

    with self.lock:
        # find_by_indexes() removes every criterion it was able to answer from kargs.
        indexed = self.find_by_indexes(kargs)
        # Without an index hit the whole collection has to be searched, otherwise only the index hits are
        # candidates. Scanning everything in both cases would turn the result into a union with duplicates.
        candidates = self if indexed is None else indexed
        if kargs:
            matches: List[ITEM] = [
                obj
                for obj in candidates
                if obj.inmemory and obj.find_match(kargs, no_errors=no_errors)
            ]
        else:
            # Every criterion was answered by an index, nothing is left to verify.
            matches = list(candidates)

    if not return_list:
        if not matches:
            return None
        return matches[0]

    return matches
# Maps alternative search keys (left) to the key that is used for the actual search (right).
# ``__rekey()`` applies this table to the search criteria; keys that are not listed here are used unchanged.
SEARCH_REKEY = {
"kopts": "kernel_options",
"kopts_post": "kernel_options_post",
"inherit": "parent",
"ip": "ip_address",
"mac": "mac_address",
"virt-auto-boot": "virt_auto_boot",
"virt-file-size": "virt_file_size",
"virt-disk-driver": "virt_disk_driver",
"virt-ram": "virt_ram",
"virt-path": "virt_path",
"virt-type": "virt_type",
"virt-bridge": "virt_bridge",
"virt-cpus": "virt_cpus",
"virt-host": "virt_host",
"virt-group": "virt_group",
"dhcp-tag": "dhcp_tag",
"netboot-enabled": "netboot_enabled",
"enable_gpxe": "enable_ipxe",
"boot_loader": "boot_loaders",
}
def __rekey(self, _dict: Dict[str, Any]) -> Dict[str, Any]:
    """
    Translate the keys of a search dict with the help of ``SEARCH_REKEY``.

    Find calls from the command line ("cobbler system find") don't always match with the keys from the datastructs
    and this makes them both line up without breaking compatibility with either.

    :param _dict: The dict which should be remapped. It is not modified.
    :return: A new dict where every key listed in ``SEARCH_REKEY`` is replaced by its mapped key.
    """
    mapping = self.SEARCH_REKEY
    return {mapping.get(key, key): value for key, value in _dict.items()}
def to_list(self) -> List[Dict[str, Any]]:
    """
    Serialize the collection by converting every item into its dictionary form.

    :return: All elements of the collection as a list.
    """
    serialized: List[Dict[str, Any]] = []
    # Work on a copy of the values, like the iterator of the collection does.
    for entry in list(self.listing.values()):
        serialized.append(entry.to_dict())
    return serialized
def from_list(self, _list: List[Dict[str, Any]]) -> None:
    """
    Create all collection object items from ``_list``.

    An entry that cannot be turned into an item or cannot be added is logged and skipped; the remaining entries are
    still processed.

    :param _list: The list with all item dictionaries.
    """
    if _list is None:  # type: ignore
        return
    for seed_data in _list:
        try:
            self.add(self.factory_produce(self.api, seed_data))
        except Exception as error:
            self.logger.error(
                "Error while loading a collection: %s. Skipping collection %s!",
                error,
                self.collection_type(),
            )
def copy(self, ref: ITEM, newname: str):
    """
    Clone an object and store the clone under a new name in the same collection.

    The clone gets the current time as creation time and is added with saving, triggers, sync and the check for
    duplicate names enabled.

    :param ref: The reference to the object which should be copied.
    :param newname: The new name for the copied object.
    """
    clone: ITEM = ref.make_clone()
    clone.ctime = time.time()
    clone.name = newname
    add_options = {
        "save": True,
        "with_copy": True,
        "with_triggers": True,
        "with_sync": True,
        "check_for_duplicate_names": True,
    }
    self.add(clone, **add_options)
def rename(
    self,
    ref: "ITEM",
    newname: str,
    with_sync: bool = True,
    with_triggers: bool = True,
):
    """
    Allows an object "ref" to be given a new name without affecting the rest of the object tree.

    The item is removed from disk, listing and indexes under its old name and stored again under the new one.
    Afterwards every item that references the old name (as listed in ``InheritableItem.TYPE_DEPENDENCIES``) is
    pointed to the new name and saved. For repos and distros the mirror directory that is named after the item is
    renamed as well.

    :param ref: The reference to the object which should be renamed.
    :param newname: The new name for the object.
    :param with_sync: If a sync should be triggered when the object is renamed.
    :param with_triggers: If triggers should be run when the object is renamed.
    :raises ValueError: In case the search for dependent items returns something that is not a list.
    :raises CX: In case a dependent item stores the reference in an attribute of an unknown type.
    """
    # Nothing to do when it is the same name
    if newname == ref.name:
        return

    # Save the old name
    oldname = ref.name
    with self.lock:
        # Delete the old item
        self.collection_mgr.serialize_delete_one_item(ref)
        self.remove_from_indexes(ref)
        self.listing.pop(oldname)
        # Change the name of the object
        ref.name = newname
        # Save just this item
        self.collection_mgr.serialize_one_item(ref)
        self.listing[newname] = ref
        self.add_to_indexes(ref)

    # Each dependency is a pair of (collection type, attribute name that holds the reference).
    for dep_type in InheritableItem.TYPE_DEPENDENCIES[ref.COLLECTION_TYPE]:
        items = self.api.find_items(
            dep_type[0], {dep_type[1]: oldname}, return_list=True
        )
        if items is None:
            continue
        if not isinstance(items, list):
            raise ValueError("Unexepcted return value from find_items!")
        for item in items:
            attr = getattr(item, "_" + dep_type[1])
            if isinstance(attr, (str, BaseItem)):
                setattr(item, dep_type[1], newname)
            elif isinstance(attr, list):
                # The list is updated in place, thus the stored attribute of the item changes as well.
                for i, attr_val in enumerate(attr):  # type: ignore
                    if attr_val == oldname:
                        attr[i] = newname
            else:
                raise CX(
                    f'Internal error, unknown attribute type {type(attr)} for "{item.name}"!'
                )
            self.api.get_items(item.COLLECTION_TYPE).add(
                item,  # type: ignore
                save=True,
                with_sync=with_sync,
                with_triggers=with_triggers,
            )

    # for a repo, rename the mirror directory
    if isinstance(ref, repo.Repo):
        path = os.path.join(self.api.settings().webdir, "repo_mirror")
        old_path = os.path.join(path, oldname)
        if os.path.exists(old_path):
            new_path = os.path.join(path, ref.name)
            os.renames(old_path, new_path)

    # for a distro, rename the mirror and references to it
    if isinstance(ref, distro.Distro):
        mirror_root = os.path.join(self.api.settings().webdir, "distro_mirror")
        path = ref.find_distro_path()  # type: ignore

        # Test to see if the distro path is based directly on the name of the distro. If it is, things need to be
        # updated accordingly. The directory still carries the *old* name at this point; comparing against the new
        # name would never match a directory that actually needs to be moved.
        if os.path.exists(path) and path == str(os.path.join(mirror_root, oldname)):
            newpath = os.path.join(mirror_root, ref.name)
            os.renames(path, newpath)

            # update any reference to this path ...
            prefix = path + os.sep
            for distro_obj in self.api.distros():
                # Only touch kernels below the moved directory, not below a sibling with a longer name.
                if distro_obj.kernel == path or distro_obj.kernel.startswith(prefix):
                    distro_obj.kernel = distro_obj.kernel.replace(path, newpath)
                    distro_obj.initrd = distro_obj.initrd.replace(path, newpath)
                    self.collection_mgr.serialize_one_item(distro_obj)

        # create a symlink for the new distro name
        # NOTE(review): done after the paths were updated so the link is not created for a location that is moved
        # away right afterwards - confirm against the implementation of link_distro().
        ref.link_distro()  # type: ignore
def add(
    self,
    ref: ITEM,
    save: bool = False,
    with_copy: bool = False,
    with_triggers: bool = True,
    with_sync: bool = True,
    quick_pxe_update: bool = False,
    check_for_duplicate_names: bool = False,
) -> None:
    """
    Add an object to the collection

    :param ref: The reference to the object.
    :param save: If this is true then the objet is persisted on the disk.
    :param with_copy: Is a bit of a misnomer, but lots of internal add operations can run with "with_copy" as False.
                      True means a real final commit, as if entered from the command line (or basically, by a user).
                      With with_copy as False, the particular add call might just be being run during
                      deserialization, in which case extra semantics around the add don't really apply. So, in that
                      case, don't run any triggers and don't deal with any actual files.
                      Kept as an alias of ``save`` for backward compatibility.
    :param with_sync: If a sync should be triggered when the object is added. Ignored if the object is not saved.
    :param with_triggers: If triggers should be run when the object is added. Ignored if the object is not saved.
    :param quick_pxe_update: This decides if there should be run a quick or full update after the add was done.
    :param check_for_duplicate_names: If the name of an object should be unique or not.
    :raises TypeError: Raised in case ``ref`` is None or belongs to a different collection type.
    :raises CX: Raised in case ``check_for_duplicate_names`` is set and the name is already in use.
    """
    if ref is None:  # type: ignore
        raise TypeError("Unable to add a None object")

    ref.check_if_valid()

    # migration path for old API parameter that I've renamed.
    # This has to be resolved before the timestamps are handled, otherwise "with_copy" saves the object without
    # updating them.
    if with_copy and not save:
        save = with_copy

    if save:
        now = float(time.time())
        if ref.ctime == 0.0:
            ref.ctime = now
        ref.mtime = now
    else:
        # For people that aren't quite aware of the API if not saving the object, you can't run these features.
        with_triggers = False
        with_sync = False

    # Avoid adding objects to the collection with the same name
    if check_for_duplicate_names and any(
        item_obj.name == ref.name for item_obj in list(self.listing.values())
    ):
        raise CX(
            f'An object with that name "{ref.name}" exists already. Try "edit"?'
        )

    if ref.COLLECTION_TYPE != self.collection_type():
        raise TypeError("API error: storing wrong data type in collection")

    # failure of a pre trigger will prevent the object from being added
    if save and with_triggers:
        utils.run_triggers(
            self.api,
            ref,
            f"/var/lib/cobbler/triggers/add/{self.collection_type()}/pre/*",
        )

    with self.lock:
        self.listing[ref.name] = ref
        self.add_to_indexes(ref)

    # perform filesystem operations
    if save:
        # Save just this item if possible, if not, save the whole collection
        self.collection_mgr.serialize_one_item(ref)

        if with_sync:
            if isinstance(ref, system.System):
                # we don't need openvz containers to be network bootable
                if ref.virt_type == enums.VirtType.OPENVZ:
                    ref.netboot_enabled = False
                self.lite_sync.add_single_system(ref)
            elif isinstance(ref, profile.Profile):
                # we don't need openvz containers to be network bootable
                # The enum member is checked like in the system branch; the plain string is kept for
                # backward compatibility.
                if ref.virt_type in (enums.VirtType.OPENVZ, "openvz"):  # type: ignore
                    ref.enable_menu = False
                self.lite_sync.add_single_profile(ref)
                # The systems of this profile live in the system collection. Searching this collection for an
                # item called "system" can never find them.
                dependent_systems = self.api.find_items(
                    "system", {"profile": ref.name}, return_list=True
                )
                if not isinstance(dependent_systems, list):
                    dependent_systems = []
                # NOTE(review): assumes sync_systems() expects the names of the systems - confirm against the API.
                self.api.sync_systems(
                    systems=[system_obj.name for system_obj in dependent_systems]
                )
            elif isinstance(ref, distro.Distro):
                self.lite_sync.add_single_distro(ref)
            elif isinstance(ref, image.Image):
                self.lite_sync.add_single_image(ref)
            elif isinstance(ref, (repo.Repo, menu.Menu)):
                # Nothing to sync for repos and menus.
                pass
            else:
                self.logger.error(
                    "Internal error. Object type not recognized: %s", type(ref)
                )
        if not with_sync and quick_pxe_update:
            if isinstance(ref, system.System):
                self.lite_sync.update_system_netboot_status(ref.name)

        # save the tree, so if neccessary, scripts can examine it.
        if with_triggers:
            utils.run_triggers(
                self.api, ref, "/var/lib/cobbler/triggers/change/*", []
            )
            utils.run_triggers(
                self.api,
                ref,
                f"/var/lib/cobbler/triggers/add/{self.collection_type()}/post/*",
                [],
            )
def _deserialize(self) -> None:
    """
    Loading all collection items from disk in case of lazy start.

    Does nothing if all items are already in memory or if a load is already running. The collection is only marked
    as being in memory if every item was loaded successfully.
    """
    if self.inmemory or self.deserialize_running:
        # Preventing infinite recursion if a collection search is required when loading item properties.
        # Also prevents unnecessary looping through the collection if all items are already in memory.
        return

    self.deserialize_running = True
    try:
        for obj_name in self.get_names():
            obj = self.get(obj_name)
            if obj is not None and not obj.inmemory:
                obj.deserialize()
        self.inmemory = True
    finally:
        # Always reset the guard. Otherwise a single failing item would turn every later call into a no-op.
        self.deserialize_running = False
def add_to_indexes(self, ref: ITEM) -> None:
    """
    Add indexes for the object.

    Currently only the ``uid`` index exists, it maps the uid of an item to its name. Empty uids are not indexed.

    :param ref: The reference to the object whose indexes are updated.
    """
    uid = ref.uid
    if uid == "":
        # Empty strings are not indexed, otherwise all items without a value would share a single index entry.
        return
    self.indexes["uid"][uid] = ref.name
def remove_from_indexes(self, ref: ITEM) -> None:
    """
    Drop the index keys of the object. A uid that is not part of the index is ignored.

    :param ref: The reference to the object whose index keys are removed.
    """
    self.indexes["uid"].pop(ref.uid, None)
def find_by_indexes(self, kargs: Dict[str, Any]) -> Optional[List[ITEM]]:
    """
    Searching for items in the collection by indexes.

    Every criterion that produced an index hit is removed from ``kargs``, so only the criteria that still have to
    be checked by the caller remain in the dict.

    :param kargs: The dict to match for the items. It is modified in place.
    :return: The items that were found with the help of the indexes or ``None`` in case no criterion produced an
             index hit. The list is empty if an index entry points to an item that is not part of the listing.
    """
    result: Optional[List[ITEM]] = None
    found_keys: List[str] = []

    for key, value in kargs.items():
        indx_dict = self.indexes.get(key)
        # Only string values are indexed. Values of other types (int, bool, dict, list) can neither be sliced nor
        # used for the lookup in a safe way, thus they are left for the regular search.
        if indx_dict is None or not isinstance(value, str):
            continue
        # fnmatch and "~" are not supported
        if value.startswith("~") or any(char in value for char in "?*["):
            continue
        if value not in indx_dict:
            continue

        if result is None:
            result = []
        obj = self.listing.get(indx_dict[value])
        if obj is not None:
            result.append(obj)
        else:
            self.logger.error(
                'Internal error. The "%s" index is corrupted.', key
            )
        found_keys.append(key)

    # The dict must not change its size while it is iterated, so the keys are removed afterwards.
    for key in found_keys:
        kargs.pop(key)

    return result
@staticmethod
@abstractmethod
def collection_type() -> str:
    """
    Provide the string key for the name of the collection (used by serializer etc). Must be overridden in any
    subclass.
    """
@staticmethod
@abstractmethod
def collection_types() -> str:
    """
    Provide the string key for the plural name of the collection (used by serializer). Must be overridden in any
    subclass.
    """