"""
This module is responsible for managing the custom common threading logic Cobbler has.
"""
import logging
import pathlib
from threading import Thread
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional
from cobbler import enums, utils
if TYPE_CHECKING:
from cobbler.api import CobblerAPI
from cobbler.remote import CobblerXMLRPCInterface
[docs]class CobblerThread(Thread):
"""
This is a custom thread that has a custom logger as well as logic to execute Cobbler triggers.
"""
def __init__(
self,
event_id: str,
remote: "CobblerXMLRPCInterface",
options: Dict[str, Any],
task_name: str,
api: "CobblerAPI",
run: Callable[["CobblerThread"], None],
on_done: Optional[Callable[["CobblerThread"], None]] = None,
):
"""
This constructor creates a Cobbler thread which then may be run by calling ``run()``.
:param event_id: The event-id which is associated with this thread. Also used as thread name
:param remote: The Cobbler remote object to execute actions with.
:param options: Additional options which can be passed into the Thread.
:param task_name: The high level task name which is used to trigger pre- and post-task triggers
:param api: The Cobbler api object to resolve information with.
:param run: The callable that is going to be executed with this thread.
:param on_done: An optional callable that is going to be executed after ``run`` but before the triggers.
"""
super().__init__(name=event_id)
self.event_id = event_id
self.remote = remote
self.logger = logging.getLogger()
self.__task_log_handler: Optional[logging.FileHandler] = None
self.__setup_logger()
self._run = run
self.on_done = on_done
self.options = options
self.task_name = task_name
self.api = api
def __setup_logger(self):
"""
Utility function that will set up the Python logger for the tasks in a special directory.
"""
filename = pathlib.Path("/var/log/cobbler/tasks") / f"{self.event_id}.log"
self.__task_log_handler = logging.FileHandler(str(filename), encoding="utf-8")
task_log_formatter = logging.Formatter(
"[%(threadName)s] %(asctime)s - %(levelname)s | %(message)s"
)
self.__task_log_handler.setFormatter(task_log_formatter)
self.logger.setLevel(logging.INFO)
self.logger.addHandler(self.__task_log_handler)
def _set_task_state(self, new_state: enums.EventStatus):
"""
Set the state of the task. (For internal use only)
:param new_state: The new state of the task.
"""
if not isinstance(new_state, enums.EventStatus): # type: ignore
raise TypeError('"new_state" needs to be of type enums.EventStatus!')
if self.event_id not in self.remote.events:
raise ValueError('"event_id" not existing!')
self.remote.events[self.event_id].state = new_state
# clear the list of who has read it
self.remote.events[self.event_id].read_by_who = []
if new_state == enums.EventStatus.COMPLETE:
self.logger.info("### TASK COMPLETE ###")
elif new_state == enums.EventStatus.FAILED:
self.logger.error("### TASK FAILED ###")
[docs] def run(self) -> None:
"""
Run the thread.
:return: The return code of the action. This may a boolean or a Linux return code.
"""
self.logger.info("start_task(%s); event_id(%s)", self.task_name, self.event_id)
lock = "load_items_lock" in self.options and self.task_name != "load_items"
if lock:
# Shared lock to suspend execution of _background_load_items
self.options["load_items_lock"].acquire(blocking=False)
try:
if utils.run_triggers(
api=self.api,
globber=f"/var/lib/cobbler/triggers/task/{self.task_name}/pre/*",
additional=self.options if isinstance(self.options, list) else [],
):
self._set_task_state(enums.EventStatus.FAILED)
return
return_code = self._run(self)
if return_code is not None and not return_code:
self._set_task_state(enums.EventStatus.FAILED)
else:
self._set_task_state(enums.EventStatus.COMPLETE)
if self.on_done is not None:
self.on_done(self)
utils.run_triggers(
api=self.api,
globber=f"/var/lib/cobbler/triggers/task/{self.task_name}/post/*",
additional=self.options if isinstance(self.options, list) else [],
)
return return_code
except Exception:
utils.log_exc()
self._set_task_state(enums.EventStatus.FAILED)
return
finally:
if lock:
self.options["load_items_lock"].release()
if self.__task_log_handler is not None:
self.logger.removeHandler(self.__task_log_handler)