"""
Builds out and synchronizes yum repo mirrors.
Initial support for rsync, perhaps reposync coming later.
"""
# SPDX-License-Identifier: GPL-2.0-or-later
# SPDX-FileCopyrightText: Copyright 2006-2007, Red Hat, Inc and Others
# SPDX-FileCopyrightText: Michael DeHaan <michael.dehaan AT gmail>
import logging
import os
import os.path
import shlex
import shutil
import stat
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple
from cobbler import utils
from cobbler.cexceptions import CX
from cobbler.enums import MirrorType, RepoArchs, RepoBreeds
from cobbler.utils import filesystem_helpers, os_release
if TYPE_CHECKING:
from cobbler.api import CobblerAPI
from cobbler.items.repo import Repo
try:
import librepo # type: ignore
HAS_LIBREPO = True
except ModuleNotFoundError:
# This is a constant since it is only defined once.
HAS_LIBREPO = False # type: ignore
[docs]def repo_walker(
top: str, func: Callable[[Any, str, List[str]], None], arg: Any
) -> None:
"""
Directory tree walk with callback function.
For each directory in the directory tree rooted at top (including top itself, but excluding '.' and '..'), call
func(arg, dirname, fnames). dirname is the name of the directory, and fnames a list of the names of the files and
subdirectories in dirname (excluding '.' and '..'). func may modify the fnames list in-place (e.g. via del or slice
assignment), and walk will only recurse into the subdirectories whose names remain in fnames; this can be used to
implement a filter, or to impose a specific order of visiting. No semantics are defined for, or required of, arg,
beyond that arg is always passed to func. It can be used, e.g., to pass a filename pattern, or a mutable object
designed to accumulate statistics. Passing None for arg is common.
:param top: The directory that should be taken as root. The root dir will also be included in the processing.
:param func: The function that should be executed.
:param arg: The arguments for that function.
"""
try:
names = os.listdir(top)
# The order of the return is not guaranteed and seems to depend on the fileystem thus sort the list
names.sort()
except os.error:
return
func(arg, top, names)
for name in names:
path_name = os.path.join(top, name)
try:
file_stats = os.lstat(path_name)
except os.error:
continue
if stat.S_ISDIR(file_stats.st_mode):
repo_walker(path_name, func, arg)
[docs]class RepoSync:
"""
Handles conversion of internal state to the tftpboot tree layout.
"""
# ==================================================================================
def __init__(self, api: "CobblerAPI", tries: int = 1, nofail: bool = False) -> None:
"""
Constructor
:param api: The object which holds all information in Cobbler.
:param tries: The number of tries before the operation fails.
:param nofail: This sets the strictness of the reposync result handling.
"""
self.verbose = True
self.api = api
self.settings = api.settings()
self.repos = api.repos()
self.rflags = self.settings.reposync_flags.split()
self.tries = tries
self.nofail = nofail
self.logger = logging.getLogger()
self.logger.info("hello, reposync")
# ===================================================================
[docs] def run(self, name: Optional[str] = None, verbose: bool = True) -> None:
"""
Syncs the current repo configuration file with the filesystem.
:param name: The name of the repository to synchronize.
:param verbose: If the action should be logged verbose or not.
"""
self.logger.info("run, reposync, run!")
self.verbose = verbose
report_failure = False
for repo in self.repos:
if name is not None and repo.name != name:
# Invoked to sync only a specific repo, this is not the one
continue
if name is None and not repo.keep_updated:
# Invoked to run against all repos, but this one is off
self.logger.info("%s is set to not be updated", repo.name)
continue
repo_mirror = os.path.join(self.settings.webdir, "repo_mirror")
repo_path = os.path.join(repo_mirror, repo.name)
if not os.path.isdir(repo_path) and not repo.mirror.lower().startswith(
"rhn://"
):
os.makedirs(repo_path)
# Set the environment keys specified for this repo and save the old one if they modify an existing variable.
env = repo.environment
old_env: Dict[str, Any] = {}
for k in list(env.keys()):
self.logger.debug("setting repo environment: %s=%s", k, env[k])
if env[k] is not None:
if os.getenv(k):
old_env[k] = os.getenv(k)
else:
os.environ[k] = env[k]
# Which may actually NOT reposync if the repo is set to not mirror locally but that's a technicality.
success = False
for reposync_try in range(self.tries + 1, 1, -1):
try:
self.sync(repo)
success = True
break
except Exception:
success = False
utils.log_exc()
self.logger.warning(
"reposync failed, tries left: %s", (reposync_try - 2)
)
# Cleanup/restore any environment variables that were added or changed above.
for k in list(env.keys()):
if env[k] is not None:
if k in old_env:
self.logger.debug(
"resetting repo environment: %s=%s", k, old_env[k]
)
os.environ[k] = old_env[k]
else:
self.logger.debug("removing repo environment: %s=%s", k, env[k])
del os.environ[k]
if not success:
report_failure = True
if not self.nofail:
raise CX("reposync failed, retry limit reached, aborting")
self.logger.error("reposync failed, retry limit reached, skipping")
self.update_permissions(repo_path)
if report_failure:
raise CX("overall reposync failed, at least one repo failed to synchronize")
# ==================================================================================
[docs] def sync(self, repo: "Repo") -> None:
"""
Conditionally sync a repo, based on type.
:param repo: The repo to sync.
"""
if repo.breed == RepoBreeds.RHN:
self.rhn_sync(repo)
elif repo.breed == RepoBreeds.YUM:
self.yum_sync(repo)
elif repo.breed == RepoBreeds.APT:
self.apt_sync(repo)
elif repo.breed == RepoBreeds.RSYNC:
self.rsync_sync(repo)
elif repo.breed == RepoBreeds.WGET:
self.wget_sync(repo)
else:
raise CX(
f"unable to sync repo ({repo.name}), unknown or unsupported repo type ({repo.breed.value})"
)
# ====================================================================================
[docs] def librepo_getinfo(self, dirname: str) -> Dict[Any, Any]:
"""
Used to get records from a repomd.xml file of downloaded rpmmd repository.
:param dirname: The local path of rpmmd repository.
:return: The dict representing records from a repomd.xml file of rpmmd repository.
"""
# FIXME: Librepo has no typing stubs.
librepo_handle = librepo.Handle() # type: ignore
librepo_result = librepo.Result() # type: ignore
librepo_handle.setopt(librepo.LRO_REPOTYPE, librepo.LR_YUMREPO) # type: ignore
librepo_handle.setopt(librepo.LRO_URLS, [dirname]) # type: ignore
librepo_handle.setopt(librepo.LRO_LOCAL, True) # type: ignore
librepo_handle.setopt(librepo.LRO_CHECKSUM, True) # type: ignore
librepo_handle.setopt(librepo.LRO_IGNOREMISSING, True) # type: ignore
try:
librepo_handle.perform(librepo_result) # type: ignore
except librepo.LibrepoException as error: # type: ignore
raise CX("librepo error: " + dirname + " - " + error.args[1]) from error # type: ignore
return librepo_result.getinfo(librepo.LRR_RPMMD_REPOMD).get("records", {}) # type: ignore
# ====================================================================================
[docs] def createrepo_walker(self, repo: "Repo", dirname: str, fnames: Any) -> None:
"""
Used to run createrepo on a copied Yum mirror.
:param repo: The repository object to run for.
:param dirname: The directory to run in.
:param fnames: Not known what this is for.
"""
if os.path.exists(dirname) or repo.breed == RepoBreeds.RSYNC:
utils.remove_yum_olddata(dirname)
# add any repo metadata we can use
mdoptions: List[str] = []
origin_path = os.path.join(dirname, ".origin")
repodata_path = os.path.join(origin_path, "repodata")
if os.path.isfile(os.path.join(repodata_path, "repomd.xml")):
repo_data = self.librepo_getinfo(origin_path)
if "group" in repo_data:
groupmdfile = repo_data["group"]["location_href"]
mdoptions += ["-g", os.path.join(origin_path, groupmdfile)]
if "prestodelta" in repo_data:
# need createrepo >= 0.9.7 to add deltas
if utils.get_family() in ("redhat", "suse"):
cmd = [
"/usr/bin/rpmquery",
"--queryformat=%{VERSION}",
"createrepo",
]
createrepo_ver = utils.subprocess_get(cmd, shell=False)
if not createrepo_ver[0:1].isdigit():
cmd = [
"/usr/bin/rpmquery",
"--queryformat=%{VERSION}",
"createrepo_c",
]
createrepo_ver = utils.subprocess_get(cmd, shell=False)
if utils.compare_versions_gt(createrepo_ver, "0.9.7"):
mdoptions.append("--deltas")
else:
self.logger.error(
"this repo has presto metadata; you must upgrade createrepo to >= 0.9.7 "
"first and then need to resync the repo through Cobbler."
)
blended = utils.blender(self.api, False, repo)
flags = blended.get("createrepo_flags", "(ERROR: FLAGS)").split()
try:
cmd = ["createrepo"] + mdoptions + flags + [shlex.quote(dirname)]
utils.subprocess_call(cmd, shell=False)
except Exception:
utils.log_exc()
self.logger.error("createrepo failed.")
del fnames[:] # we're in the right place
# ====================================================================================
[docs] def wget_sync(self, repo: "Repo") -> None:
"""
Handle mirroring of directories using wget
:param repo: The repo object to sync via wget.
"""
mirror_program = "/usr/bin/wget"
if not os.path.exists(mirror_program):
raise CX(f"no {mirror_program} found, please install it")
if repo.mirror_type != MirrorType.BASEURL:
raise CX(
"mirrorlist and metalink mirror types is not supported for wget'd repositories"
)
if repo.rpm_list not in ("", []):
self.logger.warning("--rpm-list is not supported for wget'd repositories")
dest_path = os.path.join(self.settings.webdir, "repo_mirror", repo.name)
# FIXME: wrapper for subprocess that logs to logger
cmd = [
"wget",
"-N",
"-np",
"-r",
"-l",
"inf",
"-nd",
"-P",
shlex.quote(dest_path),
shlex.quote(repo.mirror),
]
return_value = utils.subprocess_call(cmd, shell=False)
if return_value != 0:
raise CX("cobbler reposync failed")
repo_walker(dest_path, self.createrepo_walker, repo)
self.create_local_file(dest_path, repo)
# ====================================================================================
[docs] def rsync_sync(self, repo: "Repo") -> None:
"""
Handle copying of rsync:// and rsync-over-ssh repos.
:param repo: The repo to sync via rsync.
"""
if not repo.mirror_locally:
raise CX(
"rsync:// urls must be mirrored locally, yum cannot access them directly"
)
if repo.mirror_type != MirrorType.BASEURL:
raise CX(
"mirrorlist and metalink mirror types is not supported for rsync'd repositories"
)
if repo.rpm_list not in ("", []):
self.logger.warning("--rpm-list is not supported for rsync'd repositories")
dest_path = os.path.join(self.settings.webdir, "repo_mirror", repo.name)
spacer: List[str] = []
if not repo.mirror.startswith("rsync://") and not repo.mirror.startswith("/"):
spacer = ["-e ssh"]
if not repo.mirror.endswith("/"):
repo.mirror = f"{repo.mirror}/"
flags: List[str] = []
for repo_option in repo.rsyncopts:
if repo.rsyncopts[repo_option]:
flags.append(f"{repo_option} {repo.rsyncopts[repo_option]}")
else:
flags.append(f"{repo_option}")
if not flags:
flags = self.settings.reposync_rsync_flags.split()
cmd = ["rsync"] + flags + ["--delete-after"]
cmd += spacer + [
"--delete",
"--exclude-from=/etc/cobbler/rsync.exclude",
shlex.quote(repo.mirror),
shlex.quote(dest_path),
]
return_code = utils.subprocess_call(cmd, shell=False)
if return_code != 0:
raise CX("cobbler reposync failed")
# If ran in archive mode then repo should already contain all repodata and does not need createrepo run
archive = False
if "--archive" in flags:
archive = True
else:
# skip all flags --{options} as we need to look for combined flags like -vaH
for option in flags:
if option.startswith("--"):
pass
else:
if "a" in option:
archive = True
break
if not archive:
repo_walker(dest_path, self.createrepo_walker, repo)
self.create_local_file(dest_path, repo)
# ====================================================================================
[docs] @staticmethod
def reposync_cmd() -> List[str]:
"""
Determine reposync command
:return: The path to the reposync command. If dnf exists it is used instead of reposync.
"""
if not HAS_LIBREPO:
raise CX("no librepo found, please install python3-librepo")
if os.path.exists("/usr/bin/reposync"):
cmd = ["/usr/bin/reposync"]
# DNF5 does not have a reposync subcommand
elif os.path.exists("/usr/bin/dnf"):
cmd = ["/usr/bin/dnf", "reposync"]
else:
# Warn about not having yum-utils. We don't want to require it in the package because Fedora 22+ has moved
# to dnf.
raise CX("no /usr/bin/reposync found, please install yum-utils")
return cmd
# ====================================================================================
[docs] def rhn_sync(self, repo: "Repo") -> None:
"""
Handle mirroring of RHN repos.
:param repo: The repo object to synchronize.
"""
# flag indicating not to pull the whole repo
has_rpm_list = False
# detect cases that require special handling
if repo.rpm_list not in ("", []):
has_rpm_list = True
# Create yum config file for use by reposync
# FIXME: don't hardcode
repos_path = os.path.join(self.settings.webdir, "repo_mirror")
dest_path = os.path.join(repos_path, repo.name)
temp_path = os.path.join(dest_path, ".origin")
if not os.path.isdir(temp_path):
# FIXME: there's a chance this might break the RHN D/L case
os.makedirs(temp_path)
# how we invoke reposync depends on whether this is RHN content or not.
# This is the somewhat more-complex RHN case.
# NOTE: this requires that you have entitlements for the server and you give the mirror as rhn://$channelname
if not repo.mirror_locally:
raise CX("rhn:// repos do not work with --mirror-locally=False")
if has_rpm_list:
self.logger.warning("warning: --rpm-list is not supported for RHN content")
rest = repo.mirror[6:] # everything after rhn://
if repo.name != rest:
args = {"name": repo.name, "rest": rest}
raise CX(
"ERROR: repository %(name)s needs to be renamed %(rest)s as the name of the cobbler repository "
"must match the name of the RHN channel" % args
)
arch = repo.arch.value
if arch == "i386":
# Counter-intuitive, but we want the newish kernels too
arch = "i686"
cmd = self.reposync_cmd()
cmd += self.rflags + [
f"--repo={shlex.quote(rest)}",
f"--download-path={shlex.quote(repos_path)}",
]
if arch != "none":
cmd.append(f'--arch="{arch}"')
# Now regardless of whether we're doing yumdownloader or reposync or whether the repo was http://, ftp://, or
# rhn://, execute all queued commands here. Any failure at any point stops the operation.
if repo.mirror_locally:
utils.subprocess_call(cmd, shell=False)
# Some more special case handling for RHN. Create the config file now, because the directory didn't exist
# earlier.
self.create_local_file(temp_path, repo, output=False)
# Now run createrepo to rebuild the index
if repo.mirror_locally:
repo_walker(dest_path, self.createrepo_walker, repo)
# Create the config file the hosts will use to access the repository.
self.create_local_file(dest_path, repo)
# ====================================================================================
[docs] def gen_urlgrab_ssl_opts(
self, yumopts: Dict[str, Any]
) -> Tuple[Optional[Tuple[Any, ...]], bool]:
"""
This function translates yum repository options into the appropriate options for python-requests
:param yumopts: The options to convert.
:return: A tuple with the cert and a boolean if it should be verified or not.
"""
# use SSL options if specified in yum opts
cert = None
sslcacert = None
verify = False
if "sslcacert" in yumopts:
sslcacert = yumopts["sslcacert"]
if "sslclientkey" and "sslclientcert" in yumopts:
cert = (sslcacert, yumopts["sslclientcert"], yumopts["sslclientkey"])
# Note that the default of requests is to verify the peer and host but the default here is NOT to verify them
# unless sslverify is explicitly set to 1 in yumopts.
if "sslverify" in yumopts:
verify = yumopts["sslverify"] == 1
return cert, verify
# ====================================================================================
[docs] def yum_sync(self, repo: "Repo") -> None:
"""
Handle copying of http:// and ftp:// yum repos.
:param repo: The yum reporitory to sync.
"""
# create the config file the hosts will use to access the repository.
repos_path = os.path.join(self.settings.webdir, "repo_mirror")
dest_path = os.path.join(repos_path, repo.name)
self.create_local_file(dest_path, repo)
if not repo.mirror_locally:
return
# command to run
cmd = self.reposync_cmd()
# flag indicating not to pull the whole repo
has_rpm_list = False
# detect cases that require special handling
if repo.rpm_list not in ("", []):
has_rpm_list = True
# create yum config file for use by reposync
temp_path = os.path.join(dest_path, ".origin")
if not os.path.isdir(temp_path):
# FIXME: there's a chance this might break the RHN D/L case
os.makedirs(temp_path)
temp_file = self.create_local_file(temp_path, repo, output=False)
arch = repo.arch.value
if arch == "i386":
# Counter-intuitive, but we want the newish kernels too
arch = "i686"
if not has_rpm_list:
# If we have not requested only certain RPMs, use reposync
cmd += self.rflags + [
f"--config={temp_file}",
f"--repoid={shlex.quote(repo.name)}",
f"--download-path={shlex.quote(repos_path)}",
]
if arch != "none":
cmd.append(f"--arch={arch}")
else:
# Create the output directory if it doesn't exist
if not os.path.exists(dest_path):
os.makedirs(dest_path)
# Older yumdownloader sometimes explodes on --resolvedeps if this happens to you, upgrade yum & yum-utils
extra_flags = self.settings.yumdownloader_flags.split()
cmd = [
"/usr/bin/dnf",
"download",
] + extra_flags
if arch == "src":
cmd.append("--source")
cmd += [
"--disablerepo=*",
f"--enablerepo={shlex.quote(repo.name)}",
f"-c={temp_file}",
f"--destdir={shlex.quote(dest_path)}",
]
cmd += repo.rpm_list
# Now regardless of whether we're doing yumdownloader or reposync or whether the repo was http://, ftp://, or
# rhn://, execute all queued commands here. Any failure at any point stops the operation.
return_code = utils.subprocess_call(cmd, shell=False)
if return_code != 0:
raise CX("cobbler reposync failed")
# download any metadata we can use
proxy = None
if repo.proxy not in ("<<None>>", ""):
proxy = repo.proxy
(cert, verify) = self.gen_urlgrab_ssl_opts(repo.yumopts)
repodata_path = os.path.join(dest_path, "repodata")
repomd_path = os.path.join(repodata_path, "repomd.xml")
if os.path.exists(repodata_path) and not os.path.isfile(repomd_path):
shutil.rmtree(repodata_path, ignore_errors=False, onerror=None)
repodata_path = os.path.join(temp_path, "repodata")
if os.path.exists(repodata_path):
self.logger.info("Deleted old repo metadata for %s", repodata_path)
shutil.rmtree(repodata_path, ignore_errors=False, onerror=None)
librepo_handle = librepo.Handle() # type: ignore
librepo_result = librepo.Result() # type: ignore
librepo_handle.setopt(librepo.LRO_REPOTYPE, librepo.LR_YUMREPO) # type: ignore
librepo_handle.setopt(librepo.LRO_CHECKSUM, True) # type: ignore
librepo_handle.setopt(librepo.LRO_DESTDIR, temp_path) # type: ignore
if repo.mirror_type == MirrorType.METALINK:
librepo_handle.setopt(librepo.LRO_METALINKURL, repo.mirror) # type: ignore
elif repo.mirror_type == MirrorType.MIRRORLIST:
librepo_handle.setopt(librepo.LRO_MIRRORLISTURL, repo.mirror) # type: ignore
elif repo.mirror_type == MirrorType.BASEURL:
librepo_handle.setopt(librepo.LRO_URLS, [repo.mirror]) # type: ignore
if verify:
librepo_handle.setopt(librepo.LRO_SSLVERIFYPEER, True) # type: ignore
librepo_handle.setopt(librepo.LRO_SSLVERIFYHOST, True) # type: ignore
if cert:
sslcacert, sslclientcert, sslclientkey = cert
librepo_handle.setopt(librepo.LRO_SSLCACERT, sslcacert) # type: ignore
librepo_handle.setopt(librepo.LRO_SSLCLIENTCERT, sslclientcert) # type: ignore
librepo_handle.setopt(librepo.LRO_SSLCLIENTKEY, sslclientkey) # type: ignore
if proxy:
librepo_handle.setopt(librepo.LRO_PROXY, proxy) # type: ignore
librepo_handle.setopt(librepo.LRO_PROXYTYPE, librepo.PROXY_HTTP) # type: ignore
try:
librepo_handle.perform(librepo_result) # type: ignore
except librepo.LibrepoException as exception: # type: ignore
raise CX(
"librepo error: " + temp_path + " - " + exception.args[1] # type: ignore
) from exception
# now run createrepo to rebuild the index
if repo.mirror_locally:
repo_walker(dest_path, self.createrepo_walker, repo)
# ====================================================================================
[docs] def apt_sync(self, repo: "Repo") -> None:
"""
Handle copying of http:// and ftp:// debian repos.
:param repo: The apt repository to sync.
"""
# Warn about not having mirror program.
mirror_program = "/usr/bin/debmirror"
if not os.path.exists(mirror_program):
raise CX(f"no {mirror_program} found, please install it")
# detect cases that require special handling
if repo.rpm_list not in ("", []):
raise CX("has_rpm_list not yet supported on apt repos")
if repo.arch == RepoArchs.NONE:
raise CX("Architecture is required for apt repositories")
if repo.mirror_type != MirrorType.BASEURL:
raise CX(
"mirrorlist and metalink mirror types is not supported for apt repositories"
)
# built destination path for the repo
dest_path = os.path.join(self.settings.webdir, "repo_mirror", repo.name)
if repo.mirror_locally:
# NOTE: Dropping @@suite@@ replace as it is also dropped from "from manage_import_debian"_ubuntu.py due that
# repo has no os_version attribute. If it is added again it will break the Web UI!
# mirror = repo.mirror.replace("@@suite@@",repo.os_version)
mirror = repo.mirror
idx = mirror.find("://")
method = mirror[:idx]
mirror = mirror[idx + 3 :]
idx = mirror.find("/")
host = mirror[:idx]
mirror = mirror[idx:]
dists = ",".join(repo.apt_dists)
components = ",".join(repo.apt_components)
mirror_data = [
f"--method={shlex.quote(method)}",
f"--host={shlex.quote(host)}",
f"--root={shlex.quote(mirror)}",
f"--dist={shlex.quote(dists)}",
f"--section={shlex.quote(components)}",
]
rflags = ["--nocleanup"]
for repo_yumoption in repo.yumopts:
if repo.yumopts[repo_yumoption]:
rflags.append(f"{repo_yumoption}={repo.yumopts[repo_yumoption]}")
else:
rflags.append(repo_yumoption)
cmd = [mirror_program] + rflags + mirror_data + [shlex.quote(dest_path)]
if repo.arch == RepoArchs.SRC:
cmd.append("--source")
else:
arch = repo.arch.value
if arch == "x86_64":
arch = "amd64" # FIX potential arch errors
cmd.append("--nosource")
cmd.append(f"-a={arch}")
# Set's an environment variable for subprocess, otherwise debmirror will fail as it needs this variable to
# exist.
# FIXME: might this break anything? So far it doesn't
os.putenv("HOME", "/var/lib/cobbler")
return_code = utils.subprocess_call(cmd, shell=False)
if return_code != 0:
raise CX("cobbler reposync failed")
[docs] def create_local_file(
self, dest_path: str, repo: "Repo", output: bool = True
) -> str:
"""
Creates Yum config files for use by reposync
Two uses:
(A) output=True, Create local files that can be used with yum on provisioned clients to make use of this mirror.
(B) output=False, Create a temporary file for yum to feed into yum for mirroring
:param dest_path: The destination path to create the file at.
:param repo: The repository object to create a file for.
:param output: See described above.
:return: The name of the file which was written.
"""
# The output case will generate repo configuration files which are usable for the installed systems. They need
# to be made compatible with --server-override which means they are actually templates, which need to be
# rendered by a Cobbler-sync on per profile/system basis.
if output:
fname = os.path.join(dest_path, "config.repo")
else:
fname = os.path.join(dest_path, f"{repo.name}.repo")
self.logger.debug("creating: %s", fname)
if not os.path.exists(dest_path):
filesystem_helpers.mkdir(dest_path)
with open(fname, "w", encoding="UTF-8") as config_file:
if not output:
config_file.write("[main]\nreposdir=/dev/null\n")
config_file.write(f"[{repo.name}]\n")
config_file.write(f"name={repo.name}\n")
optenabled = False
optgpgcheck = False
if output:
if repo.mirror_locally:
protocol = self.api.settings().autoinstall_scheme
line = f"baseurl={protocol}://${{http_server}}/cobbler/repo_mirror/{repo.name}\n"
else:
mstr = repo.mirror
if mstr.startswith("/"):
mstr = f"file://{mstr}"
line = f"{repo.mirror_type.value}={mstr}\n"
config_file.write(line)
# User may have options specific to certain yum plugins add them to the file
for repo_yumoption in repo.yumopts:
if repo_yumoption == "enabled":
optenabled = True
if repo_yumoption == "gpgcheck":
optgpgcheck = True
else:
mstr = repo.mirror
if mstr.startswith("/"):
mstr = f"file://{mstr}"
line = repo.mirror_type.value + f"={mstr}\n"
if self.settings.http_port not in (80, "80"):
http_server = f"{self.settings.server}:{self.settings.http_port}"
else:
http_server = self.settings.server
line = line.replace("@@server@@", http_server)
config_file.write(line)
config_proxy = None
if repo.proxy == "<<inherit>>":
config_proxy = self.settings.proxy_url_ext
elif repo.proxy not in ("", "<<None>>"):
config_proxy = repo.proxy
if config_proxy is not None:
config_file.write(f"proxy={config_proxy}\n")
if "exclude" in list(repo.yumopts.keys()):
self.logger.debug("excluding: %s", repo.yumopts["exclude"])
config_file.write(f"exclude={repo.yumopts['exclude']}\n")
if not optenabled:
config_file.write("enabled=1\n")
config_file.write(f"priority={repo.priority}\n")
# FIXME: potentially might want a way to turn this on/off on a per-repo basis
if not optgpgcheck:
config_file.write("gpgcheck=0\n")
# user may have options specific to certain yum plugins
# add them to the file
for repo_yumoption in repo.yumopts:
if not (
output and repo.mirror_locally and repo_yumoption.startswith("ssl")
):
config_file.write(
f"{repo_yumoption}={repo.yumopts[repo_yumoption]}\n"
)
return fname
# ==================================================================================
[docs] def update_permissions(self, repo_path: str) -> None:
"""
Verifies that permissions and contexts after an rsync are as expected.
Sending proper rsync flags should prevent the need for this, though this is largely a safeguard.
:param repo_path: The path to update the permissions of.
"""
# all_path = os.path.join(repo_path, "*")
owner = "root:apache"
(dist, _) = os_release()
if dist == "suse":
owner = "root:www"
elif dist in ("debian", "ubuntu"):
owner = "root:www-data"
cmd1 = ["chown", "-R", owner, repo_path]
utils.subprocess_call(cmd1, shell=False)
cmd2 = ["chmod", "-R", "755", repo_path]
utils.subprocess_call(cmd2, shell=False)