Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 67 additions & 56 deletions src/manage/aliasutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from .exceptions import FilesInUseError, NoLauncherTemplateError
from .fsutils import atomic_unlink, ensure_tree, unlink
from .logging import LOGGER
from .pathutils import Path
from .pathutils import Path, relative_to
from .tagutils import install_matches_any

_EXE = ".exe".casefold()
Expand Down Expand Up @@ -105,16 +105,19 @@ def _create_alias(
if windowed:
launcher = cmd.launcherw_exe or launcher

chosen_by = "default"
if plat:
LOGGER.debug("Checking for launcher for platform -%s", plat)
launcher = _if_exists(launcher, f"-{plat}")
chosen_by = "platform tag"
if not launcher.is_file():
LOGGER.debug("Checking for launcher for default platform %s", cmd.default_platform)
launcher = _if_exists(launcher, cmd.default_platform)
chosen_by = "default platform"
if not launcher.is_file():
LOGGER.debug("Checking for launcher for -64")
launcher = _if_exists(launcher, "-64")
LOGGER.debug("Create %s linking to %s using %s", name, target, launcher)
chosen_by = "fallback default"
LOGGER.debug("Create %s for %s using %s, chosen by %s", name,
relative_to(target, getattr(cmd, "install_dir", None)),
launcher, chosen_by)
if not launcher or not launcher.is_file():
raise NoLauncherTemplateError()

Expand All @@ -128,18 +131,62 @@ def _create_alias(
LOGGER.debug("Failed to read %s", launcher, exc_info=True)
return

force = getattr(cmd, "force", False)
existing_bytes = b''
try:
with open(p, 'rb') as f:
existing_bytes = f.read(len(launcher_bytes) + 1)
except FileNotFoundError:
pass
except OSError:
LOGGER.debug("Failed to read existing alias launcher.")
if force:
# Only expect InstallCommand to have .force
unlink(p)
else:
try:
with open(p, 'rb') as f:
existing_bytes = f.read(len(launcher_bytes) + 1)
except FileNotFoundError:
pass
except OSError:
LOGGER.debug("Failed to read existing alias launcher.")

launcher_remap = cmd.scratch.setdefault("aliasutils.create_alias.launcher_remap", {})
if not allow_link or not _link:
# If links are disallowed, always replace the target with a copy.

if existing_bytes != launcher_bytes and allow_link and _link:
# Try to find an existing launcher we can hard-link
launcher2 = launcher_remap.get(launcher.name)
if (not launcher2 or not launcher2.is_file()) and not force:
# None known, so search existing files. Or, user is forcing us, so
# we only want to use an existing launcher if we've cached it this
# session.
try:
LOGGER.debug("Searching %s for suitable launcher to link", cmd.global_dir)
for p2 in cmd.global_dir.glob("*.exe"):
try:
with open(p2, 'rb') as f:
existing_bytes2 = f.read(len(launcher_bytes) + 1)
except OSError:
LOGGER.debug("Failed to check %s contents", p2, exc_info=True)
else:
if existing_bytes2 == launcher_bytes:
launcher2 = p2
break
else:
LOGGER.debug("No existing launcher available")
except Exception:
LOGGER.debug("Failed to find existing launcher", exc_info=True)

if launcher2 and launcher2.is_file():
# We know that the target either doesn't exist or needs replacing
unlink(p)
try:
_link(launcher2, p)
existing_bytes = launcher_bytes
launcher_remap[launcher.name] = launcher2
LOGGER.debug("Created %s as hard link to %s", p.name, launcher2.name)
except FileNotFoundError:
raise
except OSError:
LOGGER.debug("Failed to create hard link to %s", launcher2.name)
launcher2 = None

# Recheck - existing_bytes will have been updated if we successfully linked
if existing_bytes != launcher_bytes:
unlink(p)
try:
p.write_bytes(launcher_bytes)
Expand All @@ -148,43 +195,6 @@ def _create_alias(
except OSError:
LOGGER.error("Failed to create global command %s.", name)
LOGGER.debug("TRACEBACK", exc_info=True)
elif existing_bytes == launcher_bytes:
# Valid existing launcher, so save its path in case we need it later
# for a hard link.
launcher_remap.setdefault(launcher.name, p)
else:
# Links are allowed and we need to create one, so try to make a link,
# falling back to a link to another existing alias (that we've checked
# already during this run), and then falling back to a copy.
# This handles the case where our links are on a different volume to the
# install (so hard links don't work), but limits us to only a single
# copy (each) of the redirector(s), thus saving space.
unlink(p)
try:
_link(launcher, p)
LOGGER.debug("Created %s as hard link to %s", p.name, launcher.name)
except OSError as ex:
if ex.winerror != 17:
# Report errors other than cross-drive links
LOGGER.debug("Failed to create hard link for command.", exc_info=True)
launcher2 = launcher_remap.get(launcher.name)
if launcher2:
try:
_link(launcher2, p)
LOGGER.debug("Created %s as hard link to %s", p.name, launcher2.name)
except FileNotFoundError:
raise
except OSError:
LOGGER.debug("Failed to create hard link to fallback launcher")
launcher2 = None
if not launcher2:
try:
p.write_bytes(launcher_bytes)
LOGGER.debug("Created %s as copy of %s", p.name, launcher.name)
launcher_remap[launcher.name] = p
except OSError:
LOGGER.error("Failed to create global command %s.", name)
LOGGER.debug("TRACEBACK", exc_info=True)

p_target = p.with_name(p.name + ".__target__")
do_update = True
Expand Down Expand Up @@ -252,13 +262,14 @@ def _readlines(path):
return


def _scan_one(install, root):
def _scan_one(cmd, install, root):
# Scan d for dist-info directories with entry_points.txt
dist_info = [d for d in root.glob("*.dist-info") if d.is_dir()]
entrypoints = [f for f in [d / "entry_points.txt" for d in dist_info] if f.is_file()]
if len(entrypoints):
LOGGER.debug("Found %i entry_points.txt files in %i dist-info in %s",
len(entrypoints), len(dist_info), root)
len(entrypoints), len(dist_info),
relative_to(root, getattr(cmd, "install_dir", None)))

# Filter down to [console_scripts] and [gui_scripts]
for ep in entrypoints:
Expand All @@ -277,10 +288,10 @@ def _scan_one(install, root):
mod=mod, func=func, **alias)


def _scan(install, prefix, dirs):
def _scan(cmd, install, prefix, dirs):
for dirname in dirs or ():
root = prefix / dirname
yield from _scan_one(install, root)
yield from _scan_one(cmd, install, root)


def calculate_aliases(cmd, install, *, _scan=_scan):
Expand Down Expand Up @@ -322,7 +333,7 @@ def calculate_aliases(cmd, install, *, _scan=_scan):
site_dirs = s.get("dirs", ())
break

for ai in _scan(install, prefix, site_dirs):
for ai in _scan(cmd, install, prefix, site_dirs):
if ai.windowed and default_alias_w:
yield ai.replace(target=default_alias_w.target)
elif not ai.windowed and default_alias:
Expand Down
43 changes: 34 additions & 9 deletions src/manage/pathutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@
import os


def _eq(x, y):
return x == y or x.casefold() == y.casefold()


class PurePath:
def __init__(self, *parts):
total = ""
for p in parts:
try:
p = p.__fspath__().replace("/", "\\")
except AttributeError:
p = str(p).replace("/", "\\")
p = os.fsdecode(p).replace("/", "\\")
p = p.replace("\\\\", "\\")
if p == ".":
continue
Expand All @@ -24,8 +28,11 @@ def __init__(self, *parts):
total += "\\" + p
else:
total += p
self._parent, _, self.name = total.rpartition("\\")
self._p = total.rstrip("\\")
drive, root, tail = os.path.splitroot(total)
parent, _, name = tail.rpartition("\\")
self._parent = drive + root + parent
self.name = name
self._p = drive + root + tail.rstrip("\\")

def __fspath__(self):
return self._p
Expand All @@ -36,6 +43,9 @@ def __repr__(self):
def __str__(self):
return self._p

def __bytes__(self):
return os.fsencode(self)

def __hash__(self):
return hash(self._p.casefold())

Expand Down Expand Up @@ -86,13 +96,13 @@ def __truediv__(self, other):

def __eq__(self, other):
if isinstance(other, PurePath):
return self._p.casefold() == other._p.casefold()
return self._p.casefold() == str(other).casefold()
return _eq(self._p, other._p)
return _eq(self._p, str(other))

def __ne__(self, other):
if isinstance(other, PurePath):
return self._p.casefold() != other._p.casefold()
return self._p.casefold() != str(other).casefold()
return not _eq(self._p, other._p)
return not _eq(self._p, str(other))

def with_name(self, name):
return type(self)(os.path.join(self._parent, name))
Expand All @@ -105,7 +115,7 @@ def with_suffix(self, suffix):
def relative_to(self, base):
base = PurePath(base).parts
parts = self.parts
if not all(x.casefold() == y.casefold() for x, y in zip(base, parts)):
if not all(_eq(x, y) for x, y in zip(base, parts)):
raise ValueError("path not relative to base")
return type(self)("\\".join(parts[len(base):]))

Expand All @@ -128,7 +138,7 @@ def match(self, pattern, full_match=False):
m = m.casefold()

if "*" not in p:
return m.casefold() == p
return m == p or m.casefold() == p

must_start_with = True
for bit in p.split("*"):
Expand Down Expand Up @@ -219,3 +229,18 @@ def write_bytes(self, data):
def write_text(self, text, encoding="utf-8", errors="strict"):
with open(self._p, "w", encoding=encoding, errors=errors) as f:
f.write(text)


def relative_to(path, root):
if not root:
return path
parts_1 = list(PurePath(path).parts)
parts_2 = list(PurePath(root).parts)
while parts_1 and parts_2 and _eq(parts_1[0], parts_2[0]):
parts_1.pop(0)
parts_2.pop(0)
if parts_1 and not parts_2:
if isinstance(path, PurePath):
return type(path)(*parts_1)
return type(path)(PurePath(*parts_1))
return path
4 changes: 2 additions & 2 deletions src/manage/startutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from .fsutils import rmtree, unlink
from .logging import LOGGER
from .pathutils import Path
from .pathutils import Path, relative_to
from .tagutils import install_matches_any


Expand Down Expand Up @@ -36,7 +36,7 @@ def _make(root, prefix, item, allow_warn=True):

lnk = root / (n + ".lnk")
target = _unprefix(item["Target"], prefix)
LOGGER.debug("Creating shortcut %s to %s", lnk, target)
LOGGER.debug("Creating shortcut %s to %s", relative_to(lnk, root), target)
try:
lnk.relative_to(root)
except ValueError:
Expand Down
18 changes: 8 additions & 10 deletions tests/test_alias.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class Cmd:
launcher_exe = "launcher.txt"
launcherw_exe = "launcherw.txt"
default_platform = "-64"
force = False

def __init__(self, platform=None):
self.scratch = {}
Expand Down Expand Up @@ -128,10 +129,7 @@ def test_write_alias_launcher_missing(fake_config, assert_log, tmp_path):
target=tmp_path / "target.exe",
)
assert_log(
"Checking for launcher.*",
"Checking for launcher.*",
"Checking for launcher.*",
"Create %s linking to %s",
"Create %s for %s using %s, chosen by %s",
assert_log.end_of_log(),
)

Expand Down Expand Up @@ -160,7 +158,7 @@ def read_bytes():
target=tmp_path / "target.exe",
)
assert_log(
"Create %s linking to %s",
"Create %s for %s",
"Failed to read launcher template at %s\\.",
"Failed to read %s",
assert_log.end_of_log(),
Expand All @@ -183,8 +181,9 @@ def fake_link(x, y):
_link=fake_link
)
assert_log(
"Create %s linking to %s",
"Failed to create hard link.+",
"Create %s for %s",
"Searching %s for suitable launcher to link",
"No existing launcher available",
"Created %s as copy of %s",
assert_log.end_of_log(),
)
Expand Down Expand Up @@ -217,8 +216,7 @@ def fake_link(x, y):
_link=fake_link
)
assert_log(
"Create %s linking to %s",
"Failed to create hard link.+",
"Create %s for %s",
("Created %s as hard link to %s", ("test.exe", "actual_launcher.txt")),
assert_log.end_of_log(),
)
Expand All @@ -240,7 +238,7 @@ def test_write_alias_launcher_no_linking(fake_config, assert_log, tmp_path):
_link=None
)
assert_log(
"Create %s linking to %s",
"Create %s for %s",
("Created %s as copy of %s", ("test.exe", "launcher.txt")),
assert_log.end_of_log(),
)
Expand Down
21 changes: 20 additions & 1 deletion tests/test_pathutils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from manage.pathutils import Path, PurePath
from manage.pathutils import Path, PurePath, relative_to

def test_path_match():
p = Path("python3.12.exe")
Expand Down Expand Up @@ -30,3 +30,22 @@ def test_path_stem():
p = Path(".exe")
assert p.stem == ""
assert p.suffix == ".exe"


def test_path_relative_to():
p = Path(r"C:\A\B\C\python.exe")
actual = relative_to(p, r"C:\A\B\C")
assert isinstance(actual, Path)
assert str(actual) == "python.exe"
actual = relative_to(p, "C:\\")
assert isinstance(actual, Path)
assert str(actual) == r"A\B\C\python.exe"
actual = relative_to(str(p), r"C:\A\B")
assert isinstance(actual, str)
assert actual == r"C\python.exe"
actual = relative_to(bytes(p), r"C:\A\B")
assert isinstance(actual, bytes)
assert actual == rb"C\python.exe"

assert relative_to(p, r"C:\A\B\C\D") is p
assert relative_to(p, None) is p