"""
TODO
"""
import binascii
import logging
import os
import re
import tempfile
from typing import TYPE_CHECKING, Any, Dict, Optional
from cobbler import templar, tftpgen, utils
from cobbler.utils import filesystem_helpers
try:
import hivex # type: ignore
import pefile # type: ignore
from hivex.hive_types import REG_BINARY # type: ignore
from hivex.hive_types import REG_DWORD # type: ignore
from hivex.hive_types import REG_MULTI_SZ # type: ignore
from hivex.hive_types import REG_SZ # type: ignore
HAS_HIVEX = True
except Exception:
# This is only defined once in each case.
HAS_HIVEX = False # type: ignore
if TYPE_CHECKING:
from cobbler.api import CobblerAPI
from cobbler.items.distro import Distro
ANSWERFILE_TEMPLATE_NAME = "answerfile.template"
POST_INST_CMD_TEMPLATE_NAME = "post_inst_cmd.template"
STARTNET_TEMPLATE_NAME = "startnet.template"
WIMUPDATE = "/usr/bin/wimupdate"
logger = logging.getLogger()
[docs]def register() -> Optional[str]:
"""
This pure python trigger acts as if it were a legacy shell-trigger, but is much faster. The return of this method
indicates the trigger type
:return: Always ``/var/lib/cobbler/triggers/sync/post/*``
"""
if not HAS_HIVEX:
logging.info(
"python3-hivex not found. If you need Automatic Windows Installation support, please install."
)
return None
return "/var/lib/cobbler/triggers/sync/post/*"
[docs]def bcdedit(
orig_bcd: str, new_bcd: str, wim: str, sdi: str, startoptions: Optional[str] = None
):
"""
TODO
:param orig_bcd: TODO
:param new_bcd: TODO
:param wim: TODO
:param sdi: TODO
:param startoptions: TODO
:return: TODO
"""
def winpath_length(wp: str, add: int):
wpl = add + 2 * len(wp)
return wpl.to_bytes((wpl.bit_length() + 7) // 8, "big")
def guid2binary(g: str):
guid = (
g[7]
+ g[8]
+ g[5]
+ g[6]
+ g[3]
+ g[4]
+ g[1]
+ g[2]
+ g[12]
+ g[13]
+ g[10]
+ g[11]
+ g[17]
+ g[18]
)
guid += (
g[15]
+ g[16]
+ g[20]
+ g[21]
+ g[22]
+ g[23]
+ g[25]
+ g[26]
+ g[27]
+ g[28]
+ g[29]
+ g[30]
+ g[31]
)
guid += g[32] + g[33] + g[34] + g[35] + g[36]
return binascii.unhexlify(guid)
wim = wim.replace("/", "\\")
sdi = sdi.replace("/", "\\")
h = hivex.Hivex(orig_bcd, write=True) # type: ignore
root = h.root() # type: ignore
objs = h.node_get_child(root, "Objects") # type: ignore
for n in h.node_children(objs): # type: ignore
h.node_delete_child(n) # type: ignore
b = h.node_add_child(objs, "{9dea862c-5cdd-4e70-acc1-f32b344d4795}") # type: ignore
d = h.node_add_child(b, "Description") # type: ignore
h.node_set_value(d, {"key": "Type", "t": REG_DWORD, "value": b"\x02\x00\x10\x10"}) # type: ignore
e = h.node_add_child(b, "Elements") # type: ignore
e1 = h.node_add_child(e, "25000004") # type: ignore
h.node_set_value( # type: ignore
e1,
{
"key": "Element",
"t": REG_BINARY,
"value": b"\x1e\x00\x00\x00\x00\x00\x00\x00",
},
)
e1 = h.node_add_child(e, "12000004") # type: ignore
h.node_set_value( # type: ignore
e1,
{
"key": "Element",
"t": REG_SZ,
"value": "Windows Boot Manager\0".encode(encoding="utf-16le"),
},
)
e1 = h.node_add_child(e, "24000001") # type: ignore
h.node_set_value( # type: ignore
e1,
{
"key": "Element",
"t": REG_MULTI_SZ,
"value": "{65c31250-afa2-11df-8045-000c29f37d88}\0\0".encode(
encoding="utf-16le"
),
},
)
e1 = h.node_add_child(e, "16000048") # type: ignore
h.node_set_value(e1, {"key": "Element", "t": REG_BINARY, "value": b"\x01"}) # type: ignore
b = h.node_add_child(objs, "{65c31250-afa2-11df-8045-000c29f37d88}") # type: ignore
d = h.node_add_child(b, "Description") # type: ignore
h.node_set_value(d, {"key": "Type", "t": REG_DWORD, "value": b"\x03\x00\x20\x13"}) # type: ignore
e = h.node_add_child(b, "Elements") # type: ignore
e1 = h.node_add_child(e, "12000004") # type: ignore
h.node_set_value( # type: ignore
e1,
{
"key": "Element",
"t": REG_SZ,
"value": "Windows PE\0".encode(encoding="utf-16le"),
},
)
e1 = h.node_add_child(e, "22000002") # type: ignore
h.node_set_value( # type: ignore
e1,
{
"key": "Element",
"t": REG_SZ,
"value": "\\Windows\0".encode(encoding="utf-16le"),
},
)
e1 = h.node_add_child(e, "26000010") # type: ignore
h.node_set_value(e1, {"key": "Element", "t": REG_BINARY, "value": b"\x01"}) # type: ignore
e1 = h.node_add_child(e, "26000022") # type: ignore
h.node_set_value(e1, {"key": "Element", "t": REG_BINARY, "value": b"\x01"}) # type: ignore
e1 = h.node_add_child(e, "11000001") # type: ignore
guid = guid2binary("{ae5534e0-a924-466c-b836-758539a3ee3a}")
wimval = { # type: ignore
"key": "Element",
"t": REG_BINARY,
"value": guid
+ b"\x00\x00\x00\x00\x01\x00\x00\x00"
+ winpath_length(wim, 126)
+ b"\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00"
+ winpath_length(wim, 86)
+ b"\x00\x00\x00\x05\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x48\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00"
+ wim.encode(encoding="utf_16_le")
+ b"\x00\x00",
}
h.node_set_value(e1, wimval) # type: ignore
e1 = h.node_add_child(e, "21000001") # type: ignore
h.node_set_value(e1, wimval) # type: ignore
if startoptions:
e1 = h.node_add_child(e, "12000030") # type: ignore
h.node_set_value( # type: ignore
e1,
{
"key": "Element",
"t": REG_SZ,
"value": startoptions.join("\0").encode(encoding="utf-16le"),
},
)
b = h.node_add_child(objs, "{ae5534e0-a924-466c-b836-758539a3ee3a}") # type: ignore
d = h.node_add_child(b, "Description") # type: ignore
h.node_set_value(d, {"key": "Type", "t": REG_DWORD, "value": b"\x00\x00\x00\x30"}) # type: ignore
e = h.node_add_child(b, "Elements") # type: ignore
e1 = h.node_add_child(e, "12000004") # type: ignore
h.node_set_value( # type: ignore
e1,
{
"key": "Element",
"t": REG_SZ,
"value": "Ramdisk Options\0".encode(encoding="utf-16le"),
},
)
e1 = h.node_add_child(e, "32000004") # type: ignore
h.node_set_value( # type: ignore
e1,
{
"key": "Element",
"t": REG_SZ,
"value": sdi.encode(encoding="utf-16le") + b"\x00\x00",
},
)
e1 = h.node_add_child(e, "31000003") # type: ignore
h.node_set_value( # type: ignore
e1,
{
"key": "Element",
"t": REG_BINARY,
"value": b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00"
b"\x00\x00\x00\x00\x48\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00",
},
)
h.commit(new_bcd) # type: ignore
[docs]def run(api: "CobblerAPI", args: Any):
"""
TODO
:param api: TODO
:param args: TODO
:return: TODO
"""
settings = api.settings()
if not settings.windows_enabled:
return 0
if not HAS_HIVEX:
logger.info(
"python3-hivex or python3-pefile not found. If you need Automatic Windows Installation support, "
"please install."
)
return 0
profiles = api.profiles()
systems = api.systems()
templ = templar.Templar(api)
tgen = tftpgen.TFTPGen(api)
with open(
os.path.join(settings.windows_template_dir, POST_INST_CMD_TEMPLATE_NAME),
encoding="UTF-8",
) as template_win:
post_tmpl_data = template_win.read()
with open(
os.path.join(settings.windows_template_dir, ANSWERFILE_TEMPLATE_NAME),
encoding="UTF-8",
) as template_win:
tmpl_data = template_win.read()
with open(
os.path.join(settings.windows_template_dir, STARTNET_TEMPLATE_NAME),
encoding="UTF-8",
) as template_start:
tmplstart_data = template_start.read()
def gen_win_files(distro: "Distro", meta: Dict[str, Any]):
(kernel_path, kernel_name) = os.path.split(distro.kernel)
distro_path = distro.find_distro_path()
distro_dir = wim_file_name = os.path.join(
settings.tftpboot_location, "images", distro.name
)
web_dir = os.path.join(settings.webdir, "images", distro.name)
is_winpe = "winpe" in meta and meta["winpe"] != ""
is_bcd = "bcd" in meta and meta["bcd"] != ""
if "kernel" in meta:
kernel_name = meta["kernel"]
kernel_name = os.path.basename(kernel_name)
is_wimboot = "wimboot" in kernel_name
if is_wimboot:
distro_path = os.path.join(settings.webdir, "distro_mirror", distro.name)
kernel_path = os.path.join(distro_path, "boot")
if "kernel" in meta and "wimboot" not in distro.kernel:
tgen.copy_single_distro_file(
os.path.join(settings.tftpboot_location, kernel_name),
distro_dir,
False,
)
tgen.copy_single_distro_file(
os.path.join(distro_dir, kernel_name), web_dir, True
)
if "post_install_script" in meta:
post_install_dir = distro_path
if distro.os_version not in ("XP", "2003"):
post_install_dir = os.path.join(post_install_dir, "sources")
post_install_dir = os.path.join(post_install_dir, "$OEM$", "$1")
if not os.path.exists(post_install_dir):
filesystem_helpers.mkdir(post_install_dir)
data = templ.render(post_tmpl_data, meta, None)
post_install_script = os.path.join(
post_install_dir, meta["post_install_script"]
)
logger.info("Build post install script: %s", post_install_script)
with open(post_install_script, "w+", encoding="UTF-8") as pi_file:
pi_file.write(data)
if "answerfile" in meta:
data = templ.render(tmpl_data, meta, None)
answerfile_name = os.path.join(distro_dir, meta["answerfile"])
logger.info("Build answer file: %s", answerfile_name)
with open(answerfile_name, "w+", encoding="UTF-8") as answerfile:
answerfile.write(data)
tgen.copy_single_distro_file(answerfile_name, distro_path, False)
tgen.copy_single_distro_file(answerfile_name, web_dir, True)
if "kernel" in meta and "bootmgr" in meta:
wk_file_name = os.path.join(distro_dir, kernel_name)
wl_file_name = os.path.join(distro_dir, meta["bootmgr"])
tl_file_name = os.path.join(kernel_path, "bootmgr.exe")
if distro.os_version in ("XP", "2003") and not is_winpe:
tl_file_name = os.path.join(kernel_path, "setupldr.exe")
if len(meta["bootmgr"]) != 5:
logger.error("The loader name should be EXACTLY 5 character")
return 1
pat1 = re.compile(rb"NTLDR", re.IGNORECASE)
pat2 = re.compile(rb"winnt\.sif", re.IGNORECASE)
with open(tl_file_name, "rb") as file:
out = data = file.read()
if "answerfile" in meta:
if len(meta["answerfile"]) != 9:
logger.error(
"The response file name should be EXACTLY 9 character"
)
return 1
out = pat2.sub(bytes(meta["answerfile"], "utf-8"), data)
else:
if len(meta["bootmgr"]) != 11:
logger.error(
"The Boot manager file name should be EXACTLY 11 character"
)
return 1
bcd_name = "bcd"
if is_bcd:
bcd_name = meta["bcd"]
if len(bcd_name) != 3:
logger.error("The BCD file name should be EXACTLY 3 character")
return 1
if not os.path.isfile(tl_file_name):
logger.error("File not found: %s", tl_file_name)
return 1
pat1 = re.compile(rb"bootmgr\.exe", re.IGNORECASE)
pat2 = re.compile(rb"(\\.B.o.o.t.\\.)(B)(.)(C)(.)(D)", re.IGNORECASE)
bcd_name = bytes(
"\\g<1>"
+ bcd_name[0]
+ "\\g<3>"
+ bcd_name[1]
+ "\\g<5>"
+ bcd_name[2],
"utf-8",
)
with open(tl_file_name, "rb") as file:
out = file.read()
if not is_wimboot:
logger.info("Patching build Loader: %s", wl_file_name)
out = pat2.sub(bcd_name, out)
if tl_file_name != wl_file_name:
logger.info("Build Loader: %s from %s", wl_file_name, tl_file_name)
with open(wl_file_name, "wb+") as file:
file.write(out)
tgen.copy_single_distro_file(wl_file_name, web_dir, True)
if not is_wimboot:
if distro.os_version not in ("XP", "2003") or is_winpe:
pe = pefile.PE(wl_file_name, fast_load=True) # type: ignore
pe.OPTIONAL_HEADER.CheckSum = pe.generate_checksum() # type: ignore
pe.write(filename=wl_file_name) # type: ignore
with open(distro.kernel, "rb") as file:
data = file.read()
out = pat1.sub(bytes(meta["bootmgr"], "utf-8"), data)
if wk_file_name != distro.kernel:
logger.info(
"Build PXEBoot: %s from %s", wk_file_name, distro.kernel
)
with open(wk_file_name, "wb+") as file:
file.write(out)
tgen.copy_single_distro_file(wk_file_name, web_dir, True)
if is_bcd:
obcd_file_name = os.path.join(kernel_path, "bcd")
bcd_file_name = os.path.join(distro_dir, meta["bcd"])
wim_file_name = "winpe.wim"
if not os.path.isfile(obcd_file_name):
logger.error("File not found: %s", obcd_file_name)
return 1
if is_winpe:
wim_file_name = meta["winpe"]
if is_wimboot:
wim_file_name = "\\Boot\\" + wim_file_name
sdi_file_name = "\\Boot\\" + "boot.sdi"
else:
wim_file_name = os.path.join("/images", distro.name, wim_file_name)
sdi_file_name = os.path.join(
"/images", distro.name, os.path.basename(distro.initrd)
)
logger.info(
"Build BCD: %s from %s for %s",
bcd_file_name,
obcd_file_name,
wim_file_name,
)
bcdedit(obcd_file_name, bcd_file_name, wim_file_name, sdi_file_name)
tgen.copy_single_distro_file(bcd_file_name, web_dir, True)
if is_winpe:
ps_file_name = os.path.join(distro_dir, meta["winpe"])
wim_pl_name = os.path.join(kernel_path, "winpe.wim")
cmd = ["/usr/bin/cp", "--reflink=auto", wim_pl_name, ps_file_name]
utils.subprocess_call(cmd, shell=False)
tgen.copy_single_distro_file(ps_file_name, web_dir, True)
if os.path.exists(WIMUPDATE):
data = templ.render(tmplstart_data, meta, None)
with tempfile.NamedTemporaryFile() as pi_file:
pi_file.write(bytes(data, "utf-8"))
pi_file.flush()
cmd = ["/usr/bin/wimdir", ps_file_name, "1"]
wimdir_result = utils.subprocess_get(cmd, shell=False)
wimdir_file_list = wimdir_result.split("\n")
# grep -i for /Windows/System32/startnet.cmd
startnet_path = "/Windows/System32/startnet.cmd"
for file in wimdir_file_list:
if file.lower() == startnet_path.lower():
startnet_path = file
cmd = [
WIMUPDATE,
ps_file_name,
f"--command=add {pi_file.name} {startnet_path}",
]
utils.subprocess_call(cmd, shell=False)
for profile in profiles:
distro: Optional["Distro"] = profile.get_conceptual_parent() # type: ignore
if distro is None:
raise ValueError("Distro not found!")
if distro and distro.breed == "windows":
logger.info("Profile: %s", profile.name)
meta = utils.blender(api, False, profile)
autoinstall_meta = meta.get("autoinstall_meta", {})
meta.update(autoinstall_meta)
gen_win_files(distro, meta)
for system in systems:
profile = system.get_conceptual_parent()
autoinstall_meta = system.autoinstall_meta
if not profile or not autoinstall_meta or autoinstall_meta == {}:
continue
distro = profile.get_conceptual_parent() # type: ignore
if distro and distro.breed == "windows":
logger.info("System: %s", system.name)
meta = utils.blender(api, False, system)
gen_win_files(distro, autoinstall_meta)
return 0