feat: enhance symlink detection and cache invalidation

- Add `_entry_is_symlink` method to detect symlinks and Windows junctions
- Include first-level symlinks in fingerprint for better cache invalidation
- Re-enable preview path validation for security
- Update tests to verify retargeted symlinks trigger rescan
This commit is contained in:
Will Miao
2026-01-25 19:14:16 +08:00
parent 6142b3dc0c
commit c9e305397c
4 changed files with 117 additions and 43 deletions

View File

@@ -223,6 +223,20 @@ class Config:
logger.error(f"Error checking link status for {path}: {e}")
return False
def _entry_is_symlink(self, entry: os.DirEntry) -> bool:
"""Check if a directory entry is a symlink, including Windows junctions."""
if entry.is_symlink():
return True
if platform.system() == 'Windows':
try:
import ctypes
FILE_ATTRIBUTE_REPARSE_POINT = 0x400
attrs = ctypes.windll.kernel32.GetFileAttributesW(entry.path)
return attrs != -1 and (attrs & FILE_ATTRIBUTE_REPARSE_POINT)
except Exception:
pass
return False
def _normalize_path(self, path: str) -> str:
return os.path.normpath(path).replace(os.sep, '/')
@@ -241,8 +255,32 @@ class Config:
def _build_symlink_fingerprint(self) -> Dict[str, object]:
roots = [self._normalize_path(path) for path in self._symlink_roots() if path]
unique_roots = sorted(set(roots))
# Fingerprint now only contains the root paths to avoid sensitivity to folder content changes.
return {"roots": unique_roots}
# Include first-level symlinks in fingerprint for change detection.
# This ensures new symlinks under roots trigger a cache invalidation.
# Use lists (not tuples) for JSON serialization compatibility.
direct_symlinks: List[List[str]] = []
for root in unique_roots:
try:
if os.path.isdir(root):
with os.scandir(root) as it:
for entry in it:
if self._entry_is_symlink(entry):
try:
target = os.path.realpath(entry.path)
direct_symlinks.append([
self._normalize_path(entry.path),
self._normalize_path(target)
])
except OSError:
pass
except (OSError, PermissionError):
pass
return {
"roots": unique_roots,
"direct_symlinks": sorted(direct_symlinks)
}
def _initialize_symlink_mappings(self) -> None:
start = time.perf_counter()
@@ -362,10 +400,9 @@ class Config:
with os.scandir(current_display) as it:
for entry in it:
try:
# 1. High speed detection using dirent data (is_symlink)
is_link = entry.is_symlink()
# On Windows, is_symlink handles reparse points
# 1. Detect symlinks including Windows junctions
is_link = self._entry_is_symlink(entry)
if is_link:
# Only resolve realpath when we actually find a link
target_path = os.path.realpath(entry.path)

View File

@@ -41,10 +41,8 @@ class PreviewHandler:
raise web.HTTPBadRequest(text="Unable to resolve preview path") from exc
resolved_str = str(resolved)
# TODO: Temporarily disabled path validation due to issues #772 and #774
# Re-enable after fixing preview root path handling
# if not self._config.is_preview_path_allowed(resolved_str):
# raise web.HTTPForbidden(text="Preview path is not within an allowed directory")
if not self._config.is_preview_path_allowed(resolved_str):
raise web.HTTPForbidden(text="Preview path is not within an allowed directory")
if not resolved.is_file():
logger.debug("Preview file not found at %s", resolved_str)

View File

@@ -112,7 +112,8 @@ def test_symlink_cache_survives_noise_mtime(monkeypatch: pytest.MonkeyPatch, tmp
assert second_cfg.map_path_to_link(str(target_dir)) == _normalize(str(dir_link))
def test_manual_rescan_refreshes_cache(monkeypatch: pytest.MonkeyPatch, tmp_path):
def test_retargeted_symlink_triggers_rescan(monkeypatch: pytest.MonkeyPatch, tmp_path):
"""Changing a symlink's target should trigger automatic cache invalidation."""
loras_dir, _ = _setup_paths(monkeypatch, tmp_path)
target_dir = loras_dir / "target"
@@ -122,22 +123,16 @@ def test_manual_rescan_refreshes_cache(monkeypatch: pytest.MonkeyPatch, tmp_path
# Build initial cache pointing at the first target
first_cfg = config_module.Config()
old_real = _normalize(os.path.realpath(target_dir))
assert first_cfg.map_path_to_link(str(target_dir)) == _normalize(str(dir_link))
# Retarget the symlink to a new directory without touching the cache file
# Retarget the symlink to a new directory
new_target = loras_dir / "target_v2"
new_target.mkdir()
dir_link.unlink()
dir_link.symlink_to(new_target, target_is_directory=True)
# Second config should automatically detect the change and rescan
second_cfg = config_module.Config()
# Cache still point at the old real path immediately after load
assert second_cfg.map_path_to_link(str(new_target)) == _normalize(str(new_target))
# Manual rescan should refresh the mapping to the new target
second_cfg.rebuild_symlink_cache()
new_real = _normalize(os.path.realpath(new_target))
assert second_cfg._path_mappings.get(new_real) == _normalize(str(dir_link))
assert second_cfg.map_path_to_link(str(new_target)) == _normalize(str(dir_link))
@@ -177,3 +172,48 @@ def test_symlink_roots_are_preserved(monkeypatch: pytest.MonkeyPatch, tmp_path):
cache_path = settings_dir / "cache" / "symlink_map.json"
payload = json.loads(cache_path.read_text(encoding="utf-8"))
assert payload["path_mappings"][normalized_real] == normalized_link
def test_symlink_subfolder_to_external_location(monkeypatch: pytest.MonkeyPatch, tmp_path):
"""Symlink under root pointing outside root should be detected and allowed."""
loras_dir, settings_dir = _setup_paths(monkeypatch, tmp_path)
# Create external directory (outside loras_dir)
external_dir = tmp_path / "external_models"
external_dir.mkdir()
preview_file = external_dir / "model.preview.png"
preview_file.write_bytes(b"preview")
# Create symlink under loras_dir pointing to external location
symlink = loras_dir / "characters"
symlink.symlink_to(external_dir, target_is_directory=True)
cfg = config_module.Config()
# Verify symlink was detected
normalized_external = _normalize(str(external_dir))
normalized_link = _normalize(str(symlink))
assert cfg._path_mappings[normalized_external] == normalized_link
# Verify preview path is allowed
assert cfg.is_preview_path_allowed(str(preview_file))
def test_new_symlink_triggers_rescan(monkeypatch: pytest.MonkeyPatch, tmp_path):
"""Adding a new symlink should trigger cache invalidation."""
loras_dir, settings_dir = _setup_paths(monkeypatch, tmp_path)
# Initial scan with no symlinks
first_cfg = config_module.Config()
assert len(first_cfg._path_mappings) == 0
# Create a symlink after initial cache
external_dir = tmp_path / "external"
external_dir.mkdir()
symlink = loras_dir / "new_link"
symlink.symlink_to(external_dir, target_is_directory=True)
# Second config should detect the change and rescan
second_cfg = config_module.Config()
normalized_external = _normalize(str(external_dir))
assert normalized_external in second_cfg._path_mappings

View File

@@ -39,33 +39,32 @@ async def test_preview_handler_serves_preview_from_active_library(tmp_path):
assert response.status == 200
assert Path(response._path) == preview_file
# TODO: disable temporarily. Enable this once the symlink scan bug fixed
# async def test_preview_handler_forbids_paths_outside_active_library(tmp_path):
# allowed_root = tmp_path / "allowed"
# allowed_root.mkdir()
# forbidden_root = tmp_path / "forbidden"
# forbidden_root.mkdir()
# forbidden_file = forbidden_root / "sneaky.webp"
# forbidden_file.write_bytes(b"x")
async def test_preview_handler_forbids_paths_outside_active_library(tmp_path):
allowed_root = tmp_path / "allowed"
allowed_root.mkdir()
forbidden_root = tmp_path / "forbidden"
forbidden_root.mkdir()
forbidden_file = forbidden_root / "sneaky.webp"
forbidden_file.write_bytes(b"x")
# config = Config()
# config.apply_library_settings(
# {
# "folder_paths": {
# "loras": [str(allowed_root)],
# "checkpoints": [],
# "unet": [],
# "embeddings": [],
# }
# }
# )
config = Config()
config.apply_library_settings(
{
"folder_paths": {
"loras": [str(allowed_root)],
"checkpoints": [],
"unet": [],
"embeddings": [],
}
}
)
# handler = PreviewHandler(config=config)
# encoded_path = urllib.parse.quote(str(forbidden_file), safe="")
# request = make_mocked_request("GET", f"/api/lm/previews?path={encoded_path}")
handler = PreviewHandler(config=config)
encoded_path = urllib.parse.quote(str(forbidden_file), safe="")
request = make_mocked_request("GET", f"/api/lm/previews?path={encoded_path}")
# with pytest.raises(web.HTTPForbidden):
# await handler.serve_preview(request)
with pytest.raises(web.HTTPForbidden):
await handler.serve_preview(request)
async def test_config_updates_preview_roots_after_switch(tmp_path):