"""
Cobbler's Mongo database based object serializer.
"""
# SPDX-License-Identifier: GPL-2.0-or-later
# SPDX-FileCopyrightText: Copyright 2006-2009, Red Hat, Inc and Others
# SPDX-FileCopyrightText: Michael DeHaan <michael.dehaan AT gmail>
# SPDX-FileCopyrightText: James Cammarata <jimi@sngx.net>
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional
from cobbler.cexceptions import CX
from cobbler.modules.serializers import StorageBase
if TYPE_CHECKING:
from pymongo.database import Database
from cobbler.api import CobblerAPI
from cobbler.cobbler_collections.collection import ITEM, Collection
try:
# pylint: disable-next=ungrouped-imports
from pymongo.errors import ConfigurationError, ConnectionFailure, OperationFailure
from pymongo.mongo_client import MongoClient
PYMONGO_LOADED = True
except ModuleNotFoundError:
# pylint: disable=invalid-name
ConfigurationError = None
ConnectionFailure = None
OperationFailure = None
# This is a constant! pyright just doesn't understand it.
PYMONGO_LOADED = False # type: ignore
[docs]def register() -> str:
"""
The mandatory Cobbler module registration hook.
"""
# FIXME: only run this if enabled.
if not PYMONGO_LOADED:
return ""
return "serializer"
[docs]def what() -> str:
"""
Module identification function
"""
return "serializer/mongodb"
[docs]class MongoDBSerializer(StorageBase):
"""
TODO
"""
def __init__(self, api: "CobblerAPI"):
super().__init__(api)
self.logger = logging.getLogger()
self.mongodb: Optional["MongoClient[Mapping[str, Any]]"] = None
self.mongodb_database: Optional["Database[Mapping[str, Any]]"] = None
self.database_name = "cobbler"
self.__connect()
def __connect(self) -> None:
"""
Reads the config file for mongodb and then connects to the mongodb.
"""
if ConnectionFailure is None or ConfigurationError is None:
raise ImportError("MongoDB is not correctly imported!")
host = self.api.settings().mongodb.get("host", "localhost")
port = self.api.settings().mongodb.get("port", 27017)
# TODO: Make database name configurable in settings
# TODO: Make authentication configurable
self.mongodb = MongoClient(host, port) # type: ignore
try:
# The ismaster command is cheap and doesn't require auth.
self.mongodb.admin.command("ping") # type: ignore
except ConnectionFailure as error:
raise CX("Unable to connect to Mongo database.") from error
except ConfigurationError as error:
raise CX(
"The configuration of the MongoDB connection isn't correct, please check the Cobbler settings."
) from error
if self.database_name not in self.mongodb.list_database_names(): # type: ignore
self.logger.info(
'Database with name "%s" was not found and will be created.',
self.database_name,
)
self.mongodb_database = self.mongodb["cobbler"] # type: ignore
def _rename_collection(self, old_collection: str, new_collection: str) -> None:
"""
Rename a collection in database.
:param old_collection: Previous collection name.
:param old_collection: New collection name.
"""
if OperationFailure is None:
raise ImportError("MongoDB not correctly imported!")
if (
old_collection != "setting"
and old_collection in self.mongodb_database.list_collection_names() # type: ignore
):
try:
self.mongodb_database[old_collection].rename(new_collection) # type: ignore
except OperationFailure as error:
raise CX(
f'Cannot rename MongoDB collection from "{old_collection}" to "{new_collection}": {error}.'
) from error
[docs] def serialize_item(self, collection: "Collection[ITEM]", item: "ITEM") -> None:
if self.mongodb_database is None:
raise ValueError("Database not available!")
mongodb_collection = self.mongodb_database[collection.collection_types()]
data = mongodb_collection.find_one({"name": item.name})
if data:
mongodb_collection.replace_one({"name": item.name}, item.serialize()) # type: ignore
else:
mongodb_collection.insert_one(item.serialize()) # type: ignore
[docs] def serialize_delete(self, collection: "Collection[ITEM]", item: "ITEM") -> None:
if self.mongodb_database is None:
raise ValueError("Database not available!")
mongodb_collection = self.mongodb_database[collection.collection_types()]
mongodb_collection.delete_one({"name": item.name}) # type: ignore
[docs] def serialize(self, collection: "Collection[ITEM]") -> None:
# TODO: error detection
ctype = collection.collection_types()
if ctype != "settings":
for item in collection:
self.serialize_item(collection, item)
[docs] def deserialize_raw(self, collection_type: str) -> List[Dict[str, Any]]:
if self.mongodb_database is None:
raise ValueError("Database not available!")
results = []
projection = None
collection = self.mongodb_database[collection_type]
lazy_start = self.api.settings().lazy_start
if lazy_start:
projection = ["name"]
# pymongo.cursor.Cursor
cursor = collection.find(projection=projection)
for result in cursor:
self._remove_id(result)
result["inmemory"] = not lazy_start # type: ignore
results.append(result) # type: ignore
return results # type: ignore
[docs] def deserialize(
self, collection: "Collection[ITEM]", topological: bool = True
) -> None:
self._rename_collection(
collection.collection_type(), collection.collection_types()
)
datastruct = self.deserialize_raw(collection.collection_types())
if topological and isinstance(datastruct, list): # type: ignore
datastruct.sort(key=lambda x: x.get("depth", 1)) # type: ignore
if isinstance(datastruct, dict):
# This is currently the corner case for the settings type.
collection.from_dict(datastruct) # type: ignore
elif isinstance(datastruct, list): # type: ignore
collection.from_list(datastruct) # type: ignore
[docs] def deserialize_item(self, collection_type: str, name: str) -> Dict[str, Any]:
"""
Get a collection item from database.
:param collection_type: The collection type to fetch.
:param name: collection Item name
:return: Dictionary of the collection item.
"""
if self.mongodb_database is None:
raise ValueError("Database not available!")
mongodb_collection = self.mongodb_database[collection_type]
result = mongodb_collection.find_one({"name": name})
if result is None:
raise CX(
f"Item {name} of collection {collection_type} was not found in MongoDB database {self.database_name}!"
)
self._remove_id(result)
result["inmemory"] = True # type: ignore
return result # type: ignore
@staticmethod
def _remove_id(_dict: Mapping[str, Any]):
if "_id" in _dict:
_dict.pop("_id") # type: ignore
[docs]def storage_factory(api: "CobblerAPI") -> MongoDBSerializer:
"""
TODO
"""
return MongoDBSerializer(api)