"""
Cobbler's SQLite database based object serializer.
"""
# SPDX-License-Identifier: GPL-2.0-or-later
# SPDX-FileCopyrightText: Copyright 2024 Yuriy Chelpanov <yuriy.chelpanov@gmail.com>
import json
import logging
import os
import sqlite3
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from cobbler import settings
from cobbler.cexceptions import CX
from cobbler.modules.serializers import StorageBase
if TYPE_CHECKING:
from cobbler.api import CobblerAPI
from cobbler.cobbler_collections.collection import ITEM, Collection
[docs]def register() -> str:
"""
The mandatory Cobbler module registration hook.
"""
return "serializer"
[docs]def what() -> str:
"""
Module identification function
"""
return "serializer/sqlite"
[docs]class SQLiteSerializer(StorageBase):
"""
Each collection is stored in a separate table named distros, profiles, etc.
Tables are created on demand, when the first object of this type is written.
TABLE name // name from collection.collection_types()
(
name TEXT PRIMARY KEY, // name from item.name
item TEXT // JSON representation of an object
)
"""
def __init__(self, api: "CobblerAPI"):
super().__init__(api)
self.logger = logging.getLogger()
self.connection: Optional[sqlite3.Connection] = None
self.arraysize = 1000
self.database_file = "/var/lib/cobbler/collections/collections.db"
def __connect(self) -> None:
"""
Connect to the sqlite.
"""
if self.connection is not None:
return
is_new_database = not os.path.isfile(self.database_file)
conn = sqlite3.connect(":memory:")
threadsafety_option = conn.execute(
"""
select * from pragma_compile_options
where compile_options like 'THREADSAFE=%'
"""
).fetchone()[0]
conn.close()
threadsafety = int(threadsafety_option.split("=")[1])
if threadsafety != 1:
raise CX(
f"You cannot use SQLite compiled with SQLITE_THREADSAFE={threadsafety} with Cobbler.\n"
"Please compile the code with the option SQLITE_THREADSAFE=1"
)
try:
self.connection = sqlite3.connect(
self.database_file,
detect_types=sqlite3.PARSE_DECLTYPES,
check_same_thread=False,
)
except sqlite3.DatabaseError as error:
raise CX(
f'Unable to connect to SQLite database "{self.database_file}": {error}'
) from error
if is_new_database:
self.logger.info(
'Database with name "{%s}" was not found and will be created.',
self.database_file,
)
def __create_table(self, table_name: str) -> None:
"""
Creates a new SQLite table.
:param table_name: The table name.
"""
try:
self.connection.execute(f"CREATE TABLE {table_name}(name text primary key, item text)") # type: ignore
except sqlite3.DatabaseError as error:
raise CX(f'Unable to create table "{table_name}": {error}') from error
def __is_table_exists(self, table_name: str) -> bool:
"""
Return True if the table exists.
:param table_name: The table name.
:return: True if the table exists. Otherwise false.
"""
cursor = self.connection.execute( # type: ignore
"SELECT name FROM sqlite_master WHERE name=:name", {"name": table_name}
)
if cursor.fetchone() is None:
return False
return True
def __upsert_items(
self, table_name: str, bind_vars: List[Optional[Dict[str, str]]]
) -> None:
"""
Insert/Update values into the table.
:param table_name: The table name.
:param bind_vars: The list of bind variables for SQL statement.
"""
if len(bind_vars) == 0:
return
self.__connect()
if not self.__is_table_exists(table_name):
self.__create_table(table_name)
try:
self.connection.executemany( # type: ignore
f"INSERT INTO {table_name}(name, item) " # nosec
"VALUES(:name, :item) "
"ON CONFLICT(name) DO UPDATE SET item=excluded.item",
bind_vars, # type: ignore
)
self.connection.commit() # type: ignore
except sqlite3.DatabaseError as error:
raise CX(f'Unable to upsert into table "{table_name}": {error}') from error
def __build_bind_vars(self, item: "ITEM") -> Dict[str, str]:
"""
Build the bind variables for Insert/Update.
:param item: The object for Insert/Update.
:return: The bind variables dict.
"""
if self.api.settings().serializer_pretty_json:
sort_keys = True
indent = 4
else:
sort_keys = False
indent = None
_dict = item.serialize()
data = json.dumps(_dict, sort_keys=sort_keys, indent=indent)
return {"name": item.name, "item": data}
[docs] def serialize_item(self, collection: "Collection[ITEM]", item: "ITEM") -> None:
"""
Save a collection item to table
:param collection: The Cobbler collection to know the type of the item.
:param item: The collection item to serialize.
"""
if not item.name:
raise CX("name unset for item!")
self.__connect()
self.__upsert_items(
collection.collection_types(), [self.__build_bind_vars(item)]
)
[docs] def serialize(self, collection: "Collection[ITEM]") -> None:
"""
Save a collection to disk
:param collection: The collection to serialize.
"""
self.__connect()
ctype = collection.collection_types()
if ctype != "settings":
bind_vars: List[Optional[Dict[str, str]]] = []
for item in collection:
bind_vars.append(self.__build_bind_vars(item))
self.__upsert_items(ctype, bind_vars)
[docs] def serialize_delete(self, collection: "Collection[ITEM]", item: "ITEM") -> None:
"""
Delete a collection item from table
:param collection: The Cobbler collection to know the type of the item.
:param item: The collection item to delete.
"""
self.__connect()
table_name = collection.collection_types()
try:
self.connection.execute( # type: ignore
f"DELETE FROM {table_name} WHERE name=:name", # nosec
{"name": item.name},
)
self.connection.commit() # type: ignore
except sqlite3.DatabaseError as error:
raise CX(
f'Unable to delete from table "{table_name}": {error}' # nosec
) from error
[docs] def deserialize_raw(
self, collection_type: str
) -> Union[List[Optional[Dict[str, Any]]], Dict[str, Any]]:
"""
Read the collection from the table or read the settings file.
:param collection_type: The collection type to read.
:return: The list of collection dicts or settings dict.
"""
if collection_type == "settings":
return settings.read_settings_file()
self.__connect()
results: List[Optional[Dict[str, Any]]] = []
if not self.__is_table_exists(collection_type):
return results
projection = "item"
lazy_start = self.api.settings().lazy_start
if lazy_start:
projection = "name"
try:
cursor = self.connection.execute( # type: ignore
f"SELECT {projection} FROM {collection_type}" # nosec
)
except sqlite3.DatabaseError as error:
raise CX(
f'Unable to SELECT from table "{collection_type}": {error}' # nosec
) from error
cursor.arraysize = self.arraysize
for result in cursor.fetchall():
if lazy_start:
_dict = {"name": result[0]}
else:
_dict = json.loads(result[0])
_dict["inmemory"] = not lazy_start
results.append(_dict)
return results
[docs] def deserialize(
self, collection: "Collection[ITEM]", topological: bool = True
) -> None:
"""
Load a collection from disk.
:param collection: The Cobbler collection to know the type of the item.
:param topological: Sort collection based on each items' depth attribute in the list of collection items. This
ensures properly ordered object loading from disk with objects having parent/child
relationships, i.e. profiles/subprofiles. See cobbler/items/item.py
"""
self.__connect()
datastruct = self.deserialize_raw(collection.collection_types())
if topological and isinstance(datastruct, list): # type: ignore
datastruct.sort(key=lambda x: x.get("depth", 1)) # type: ignore
if isinstance(datastruct, dict):
# This is currently the corner case for the settings type.
collection.from_dict(datastruct) # type: ignore
elif isinstance(datastruct, list): # type: ignore
collection.from_list(datastruct) # type: ignore
[docs] def deserialize_item(self, collection_type: str, name: str) -> Dict[str, Any]:
"""
Get a collection item from disk and parse it into an object.
:param collection_type: The collection type to deserialize.
:param item_name: The collection item name to deserialize.
:return: Dictionary of the collection item.
"""
self.__connect()
if not self.__is_table_exists(collection_type):
raise CX(
f"Item {name} of collection {collection_type} was not found in SQLite database {self.database_file}!"
)
try:
cursor = self.connection.execute( # type: ignore
f"SELECT item from {collection_type} WHERE name=:name", # nosec
{"name": name},
)
except sqlite3.DatabaseError as error:
raise CX(
f'Unable to SELECT from table "{collection_type}": {error}' # nosec
) from error
result = cursor.fetchone()
if result is None:
raise CX(
f"Item {name} of collection {collection_type} was not found in SQLite database {self.database_file}!"
)
_dict = json.loads(result[0])
if _dict["name"] != name:
raise CX(
f"The file name {name} does not match the {_dict['name']} {collection_type}!"
)
_dict["inmemory"] = True
return _dict
[docs]def storage_factory(api: "CobblerAPI") -> SQLiteSerializer:
"""
TODO
"""
return SQLiteSerializer(api)