"""
Report from a Cobbler master.
FIXME: reinstate functionality for 2.0
"""
# SPDX-License-Identifier: GPL-2.0-or-later
# SPDX-FileCopyrightText: Copyright 2007-2009, Red Hat, Inc and Others
# SPDX-FileCopyrightText: Anderson Silva <ansilva@redhat.com>
# SPDX-FileCopyrightText: Michael DeHaan <michael.dehaan AT gmail>
import re
from typing import TYPE_CHECKING, Any, Dict, List
from cobbler import utils
if TYPE_CHECKING:
from cobbler.api import CobblerAPI
from cobbler.cobbler_collections.collection import ITEM, Collection
[docs]class Report:
"""
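Builds reports about Cobbler collections, either as plain text or as tables such as csv, trac wiki or doku wiki.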
"""
def __init__(self, api: "CobblerAPI"):
"""
Constructor
:param api: The API to hold all information in Cobbler available.
"""
self.settings = api.settings()
self.api = api
self.report_type = None
self.report_what = None
self.report_name = None
self.report_fields = None
self.report_noheaders = None
self.array_re = re.compile(r"([^[]+)\[([^]]+)\]")
def fielder(
    self, structure: Dict[str, Any], fields_list: List[str]
) -> Dict[str, str]:
    """
    Return data from a subset of fields of some item.

    Each requested field is resolved by the first rule that applies:

    1. It is a top-level key of ``structure``: the value is copied unchanged.
    2. It has the form ``outer[inner]`` and ``outer`` is a top-level key: the
       value ``structure[outer][inner]`` is copied if ``structure[outer]`` is
       a dict that contains ``inner``; otherwise the field is skipped.
    3. ``structure`` has an ``interfaces`` entry: every interface that
       contains the field contributes ``"<device>: <value>"``. When several
       interfaces contain it, the last one iterated wins.

    Fields that cannot be resolved are left out of the result.

    :param structure: The item structure to report.
    :param fields_list: The list of fields which should be returned.
    :return: The same item with only the given subset of information.
    """
    item: Dict[str, Any] = {}
    for field in fields_list:
        # check if field is primary field
        if field in structure:
            item[field] = structure[field]
            continue

        # check for an "outer[inner]" sub-field of a primary field
        internal = self.array_re.search(field)
        if internal and internal.group(1) in structure:
            container = structure[internal.group(1)]
            inner = internal.group(2)
            if isinstance(container, dict) and inner in container:
                item[field] = container[inner]
            continue

        # check if subfield in 'interfaces' field
        interfaces = structure.get("interfaces")
        if not isinstance(interfaces, dict):
            continue
        for device, details in interfaces.items():
            if isinstance(details, dict) and field in details:
                # str() keeps non-string values (numbers, booleans, lists)
                # from raising TypeError during concatenation.
                item[field] = str(device) + ": " + str(details[field])
    return item
def reporting_csv(
    self, info: List[Dict[str, str]], order: List[Any], noheaders: bool
) -> str:
    """
    Format the data in ``info`` as comma-separated text.

    Every cell, header cells included, is followed by a comma, so each line
    ends with a trailing separator. Values are written as ``str(value)``
    without quoting or escaping; a field missing from an item becomes ``-``.

    :param info: The list of iterable items for csv output, one per line.
    :param order: The list of fields which are available in the csv file,
                  in column order.
    :param noheaders: Whether headers are printed to the output or not.
    :return: The string with the csv.
    """
    sep = ","
    rows = [
        "".join((str(item[key]) if key in item else "-") + sep for key in order)
        + "\n"
        for item in info
    ]
    body = "".join(rows)
    if noheaders:
        return body
    # Column names are only listed when at least one item is reported; an
    # empty report still yields a bare newline as its header line.
    header_keys = order if rows else []
    return "".join(str(key) + sep for key in header_keys) + "\n" + body
def reporting_trac(
    self, info: List[Dict[str, str]], order: List[Any], noheaders: bool
) -> str:
    """
    Format the data in ``info`` as a trac wiki table.

    Every cell is prefixed with ``||`` and every line is closed with ``||``.
    Values are written as ``str(value)``; a field missing from an item
    becomes ``-``.

    :param info: The list of iterable items for table output, one per row.
    :param order: The list of fields which are available in the table, in
                  column order.
    :param noheaders: Whether headers are printed to the output or not.
    :return: The string with the generated table.
    """
    sep = "||"
    rows = [
        "".join(sep + (str(item[key]) if key in item else "-") for key in order)
        + "||\n"
        for item in info
    ]
    body = "".join(rows)
    if noheaders:
        return body
    # Column names are only listed when at least one item is reported; an
    # empty report still yields the closing "||" line as its header.
    header_keys = order if rows else []
    return "".join(sep + str(key) for key in header_keys) + "||\n" + body
def reporting_doku(
    self, info: List[Dict[str, str]], order: List[Any], noheaders: bool
) -> str:
    """
    Format the data in ``info`` as a doku wiki table.

    Header cells are prefixed with ``^`` and the header line is closed with
    ``^``; data cells are prefixed with ``|`` and each row is closed with
    ``|``. Headers and values are converted with ``str()``, so non-string
    data is accepted; a field missing from an item becomes ``-``.

    :param info: The list of iterable items for table output, one per row.
    :param order: The list of fields which are available in the table, in
                  column order.
    :param noheaders: Whether headers are printed to the output or not.
    :return: The string with the generated table.
    """
    header_sep = "^"
    cell_sep = "|"
    rows = [
        "".join(
            cell_sep + (str(item[key]) if key in item else "-") for key in order
        )
        + cell_sep
        + "\n"
        for item in info
    ]
    body = "".join(rows)
    if noheaders:
        return body
    # Column names are only listed when at least one item is reported; an
    # empty report still yields the closing "^" line as its header.
    header_keys = order if rows else []
    header = "".join(header_sep + str(key) for key in header_keys)
    return header + header_sep + "\n" + body
[docs] @staticmethod
def reporting_print_sorted(collection: "Collection[ITEM]") -> None:
"""
Prints all objects in a collection sorted by name
:param collection: The collection to print.
"""
result = [x for x in collection]
result.sort(key=lambda x: x.name)
for item in result:
print(item.to_string()) # type: ignore
[docs] @staticmethod
def reporting_list_names2(collection: "Collection[ITEM]", name: str) -> None:
"""
Prints a specific object in a collection.
:param collection: The collections object to print a collection from.
:param name: The name of the collection to print.
"""
obj = collection.get(name)
if obj is not None:
print(obj.to_string()) # type: ignore
def reporting_print_all_fields(
    self,
    collection: "Collection[ITEM]",
    report_name: str,
    report_type: str,
    report_noheaders: bool,
) -> None:
    """
    Print all fields of the items in a collection as a table of the given
    report type.

    The items are sorted by name. The column order is taken from the first
    item after sorting, with every column listed once. Interface fields are
    flattened into the row as ``"<device>: <value>"``; when several
    interfaces share a field, the last one iterated wins.

    The rows are handed to ``self.print_formatted_data`` for output.

    :param collection: The collection to report.
    :param report_name: If set, only the item ``collection.find`` returns
                        for this name is reported; nothing is printed when
                        there is no such item.
    :param report_type: The type of report to give.
    :param report_noheaders: Report without the headers. (May be useful for
                             machine parsing)
    """
    # per-item hack
    if report_name:
        found = collection.find(name=report_name)  # type: ignore
        if not found:
            return None
        items = [found]
    else:
        items = list(collection)
    items.sort(key=lambda x: x.name)  # type: ignore

    data: List[Dict[str, str]] = []
    out_order: List[str] = []
    for index, item_obj in enumerate(items):
        # Only the first item defines the column order.
        defines_order = index == 0
        if item_obj.ITEM_TYPE == "settings":  # type: ignore
            structure = item_obj.to_dict()
        else:
            structure = item_obj.to_list()  # type: ignore

        row: Dict[str, str] = {}
        for key, value in structure.items():  # type: ignore
            # exception for systems which could have > 1 interface
            if key == "interfaces":
                for device, info in value.items():
                    for info_header, info_value in info.items():
                        # A header already in the row has its column; adding
                        # it again would duplicate the column per interface.
                        if defines_order and info_header not in row:
                            out_order.append(info_header)
                        row[info_header] = str(device) + ": " + str(info_value)
            else:
                if defines_order and key not in row:
                    out_order.append(key)
                row[key] = value
        data.append(row)

    self.print_formatted_data(
        data=data,
        order=out_order,
        report_type=report_type,
        noheaders=report_noheaders,
    )
def reporting_print_x_fields(
    self,
    collection: "Collection[ITEM]",
    report_name: str,
    report_type: str,
    report_fields: str,
    report_noheaders: bool,
) -> None:
    """
    Print specific fields of the items in a collection as a table of the
    given report type.

    The items are sorted by name, reduced to the requested fields with
    ``self.fielder`` and handed to ``self.print_formatted_data`` for output.

    :param collection: The collection to report.
    :param report_name: If set, only the items ``collection.find`` returns
                        for this name are reported; nothing is printed when
                        there are none.
    :param report_type: The type of report to give.
    :param report_fields: The fields which should be included in the report,
                          as a comma-separated string. Spaces are ignored.
    :param report_noheaders: Report without the headers. (May be useful for
                             machine parsing)
    """
    # per-item hack
    if report_name:
        found = collection.find(name=report_name, return_list=True)  # type: ignore
        if not found:
            return
        # find() is asked for a list; wrap defensively in case a single
        # item comes back instead.
        if isinstance(found, (list, tuple)):
            search_results = list(found)
        else:
            search_results = [found]
    else:
        search_results = list(collection)
    search_results.sort(key=lambda x: x.name)

    fields_list = report_fields.replace(" ", "").split(",")
    data: List[Dict[str, str]] = []
    for item_obj in search_results:
        if item_obj.ITEM_TYPE == "settings":  # type: ignore
            structure = item_obj.to_dict()
        else:
            structure = item_obj.to_list()  # type: ignore
        data.append(self.fielder(structure, fields_list))  # type: ignore

    self.print_formatted_data(
        data=data,
        order=fields_list,
        report_type=report_type,
        noheaders=report_noheaders,
    )
# -------------------------------------------------------
def run(
    self,
    report_what: str = "",
    report_name: str = "",
    report_type: str = "",
    report_fields: str = "",
    report_noheaders: bool = False,
) -> None:
    """
    Print a report about one or all of the reportable collections.

    The collections considered are distro, profile, system, repo, network
    and image, in that order. Depending on the arguments the report is

    1. the original text output (type ``text`` with fields ``all``),
    2. a table of all fields in the given format (any other type with
       fields ``all``),
    3. a table of specific fields in the given format (any other type with
       an explicit field list).

    Type ``text`` combined with an explicit field list is rejected through
    ``utils.die``.

    :param report_what: What should be reported. May be "all" or a
                        collection name, optionally followed by "s" or "es".
    :param report_name: The name of the report.
    :param report_type: The type of report to give.
    :param report_fields: The fields which should be included in the report.
    :param report_noheaders: Report without the headers. (May be useful for
                             machine parsing)
    """
    text_output = report_type == "text"
    all_fields = report_fields == "all"

    if text_output and not all_fields:
        utils.die("The 'text' type can only be used with field set to 'all'")
        # Nothing is reported for this combination, even if die() returns.
        return

    for collection_name in (
        "distro",
        "profile",
        "system",
        "repo",
        "network",
        "image",
    ):
        accepted_names = (
            "all",
            collection_name,
            f"{collection_name}s",
            f"{collection_name}es",
        )
        if report_what not in accepted_names:
            continue

        collection = self.api.get_items(collection_name)  # type: ignore
        if not text_output:
            if all_fields:
                self.reporting_print_all_fields(
                    collection,
                    report_name,
                    report_type,
                    report_noheaders,
                )
            else:
                self.reporting_print_x_fields(
                    collection,
                    report_name,
                    report_type,
                    report_fields,
                    report_noheaders,
                )
        elif report_name:
            self.reporting_list_names2(collection, report_name)
        else:
            self.reporting_print_sorted(collection)