mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-21 21:22:11 -03:00
refactor: simplify symlink cache invalidation by removing background rescan and noise_mtime in favor of a root-path-only fingerprint.
This commit is contained in:
199
py/config.py
199
py/config.py
@@ -82,8 +82,8 @@ class Config:
|
|||||||
self._path_mappings: Dict[str, str] = {}
|
self._path_mappings: Dict[str, str] = {}
|
||||||
# Normalized preview root directories used to validate preview access
|
# Normalized preview root directories used to validate preview access
|
||||||
self._preview_root_paths: Set[Path] = set()
|
self._preview_root_paths: Set[Path] = set()
|
||||||
# Optional background rescan thread
|
# Fingerprint of the symlink layout from the last successful scan
|
||||||
self._rescan_thread: Optional[threading.Thread] = None
|
self._cached_fingerprint: Optional[Dict[str, object]] = None
|
||||||
self.loras_roots = self._init_lora_paths()
|
self.loras_roots = self._init_lora_paths()
|
||||||
self.checkpoints_roots = None
|
self.checkpoints_roots = None
|
||||||
self.unet_roots = None
|
self.unet_roots = None
|
||||||
@@ -231,32 +231,6 @@ class Config:
|
|||||||
cache_dir.mkdir(parents=True, exist_ok=True)
|
cache_dir.mkdir(parents=True, exist_ok=True)
|
||||||
return cache_dir / "symlink_map.json"
|
return cache_dir / "symlink_map.json"
|
||||||
|
|
||||||
def _compute_noise_mtime(self, root: str) -> Optional[int]:
|
|
||||||
"""Return the latest mtime of known noisy paths inside ``root``."""
|
|
||||||
|
|
||||||
normalized_root = self._normalize_path(root)
|
|
||||||
noise_paths: List[str] = []
|
|
||||||
|
|
||||||
# The first LoRA root hosts recipes and stats files which routinely
|
|
||||||
# update without changing symlink layout.
|
|
||||||
first_lora_root = self._normalize_path(self.loras_roots[0]) if self.loras_roots else None
|
|
||||||
if first_lora_root and normalized_root == first_lora_root:
|
|
||||||
recipes_dir = os.path.join(root, "recipes")
|
|
||||||
stats_file = os.path.join(root, "lora_manager_stats.json")
|
|
||||||
noise_paths.extend([recipes_dir, stats_file])
|
|
||||||
|
|
||||||
mtimes: List[int] = []
|
|
||||||
for path in noise_paths:
|
|
||||||
try:
|
|
||||||
stat_result = os.stat(path)
|
|
||||||
mtimes.append(getattr(stat_result, "st_mtime_ns", int(stat_result.st_mtime * 1e9)))
|
|
||||||
except OSError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not mtimes:
|
|
||||||
return None
|
|
||||||
return max(mtimes)
|
|
||||||
|
|
||||||
def _symlink_roots(self) -> List[str]:
|
def _symlink_roots(self) -> List[str]:
|
||||||
roots: List[str] = []
|
roots: List[str] = []
|
||||||
roots.extend(self.loras_roots or [])
|
roots.extend(self.loras_roots or [])
|
||||||
@@ -267,26 +241,46 @@ class Config:
|
|||||||
def _build_symlink_fingerprint(self) -> Dict[str, object]:
|
def _build_symlink_fingerprint(self) -> Dict[str, object]:
|
||||||
roots = [self._normalize_path(path) for path in self._symlink_roots() if path]
|
roots = [self._normalize_path(path) for path in self._symlink_roots() if path]
|
||||||
unique_roots = sorted(set(roots))
|
unique_roots = sorted(set(roots))
|
||||||
|
# Fingerprint now only contains the root paths to avoid sensitivity to folder content changes.
|
||||||
|
return {"roots": unique_roots}
|
||||||
|
|
||||||
stats: Dict[str, Dict[str, int]] = {}
|
def _initialize_symlink_mappings(self) -> None:
|
||||||
for root in unique_roots:
|
start = time.perf_counter()
|
||||||
try:
|
cache_loaded = self._load_persisted_cache_into_mappings()
|
||||||
root_stat = os.stat(root)
|
|
||||||
noise_mtime = self._compute_noise_mtime(root)
|
|
||||||
stats[root] = {
|
|
||||||
"mtime_ns": getattr(root_stat, "st_mtime_ns", int(root_stat.st_mtime * 1e9)),
|
|
||||||
"inode": getattr(root_stat, "st_ino", 0),
|
|
||||||
"noise_mtime_ns": noise_mtime,
|
|
||||||
}
|
|
||||||
except OSError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
return {"roots": unique_roots, "stats": stats}
|
if cache_loaded:
|
||||||
|
logger.info(
|
||||||
|
"Symlink mappings restored from cache in %.2f ms",
|
||||||
|
(time.perf_counter() - start) * 1000,
|
||||||
|
)
|
||||||
|
self._rebuild_preview_roots()
|
||||||
|
|
||||||
def _load_symlink_cache(self) -> bool:
|
# Only rescan if target roots have changed.
|
||||||
|
# This is stable across file additions/deletions.
|
||||||
|
current_fingerprint = self._build_symlink_fingerprint()
|
||||||
|
cached_fingerprint = self._cached_fingerprint
|
||||||
|
|
||||||
|
if cached_fingerprint and current_fingerprint == cached_fingerprint:
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info("Symlink root paths changed; rescanning symbolic links")
|
||||||
|
|
||||||
|
self.rebuild_symlink_cache()
|
||||||
|
logger.info(
|
||||||
|
"Symlink mappings rebuilt and cached in %.2f ms",
|
||||||
|
(time.perf_counter() - start) * 1000,
|
||||||
|
)
|
||||||
|
|
||||||
|
def rebuild_symlink_cache(self) -> None:
|
||||||
|
"""Force a fresh scan of all symbolic links and update the persistent cache."""
|
||||||
|
self._scan_symbolic_links()
|
||||||
|
self._save_symlink_cache()
|
||||||
|
self._rebuild_preview_roots()
|
||||||
|
|
||||||
|
def _load_persisted_cache_into_mappings(self) -> bool:
|
||||||
|
"""Load the symlink cache and store its fingerprint for comparison."""
|
||||||
cache_path = self._get_symlink_cache_path()
|
cache_path = self._get_symlink_cache_path()
|
||||||
if not cache_path.exists():
|
if not cache_path.exists():
|
||||||
logger.info("Symlink cache not found at %s", cache_path)
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -297,14 +291,15 @@ class Config:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
if not isinstance(payload, dict):
|
if not isinstance(payload, dict):
|
||||||
logger.info("Symlink cache payload is not a dict: %s", type(payload))
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
cached_mappings = payload.get("path_mappings")
|
cached_mappings = payload.get("path_mappings")
|
||||||
if not isinstance(cached_mappings, Mapping):
|
if not isinstance(cached_mappings, Mapping):
|
||||||
logger.info("Symlink cache missing path mappings")
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
# Store the cached fingerprint for comparison during initialization
|
||||||
|
self._cached_fingerprint = payload.get("fingerprint")
|
||||||
|
|
||||||
normalized_mappings: Dict[str, str] = {}
|
normalized_mappings: Dict[str, str] = {}
|
||||||
for target, link in cached_mappings.items():
|
for target, link in cached_mappings.items():
|
||||||
if not isinstance(target, str) or not isinstance(link, str):
|
if not isinstance(target, str) or not isinstance(link, str):
|
||||||
@@ -329,30 +324,10 @@ class Config:
|
|||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.info("Failed to write symlink cache %s: %s", cache_path, exc)
|
logger.info("Failed to write symlink cache %s: %s", cache_path, exc)
|
||||||
|
|
||||||
def _initialize_symlink_mappings(self) -> None:
|
|
||||||
start = time.perf_counter()
|
|
||||||
cache_loaded = self._load_symlink_cache()
|
|
||||||
|
|
||||||
if cache_loaded:
|
|
||||||
logger.info(
|
|
||||||
"Symlink mappings restored from cache in %.2f ms",
|
|
||||||
(time.perf_counter() - start) * 1000,
|
|
||||||
)
|
|
||||||
self._rebuild_preview_roots()
|
|
||||||
self._schedule_symlink_rescan()
|
|
||||||
return
|
|
||||||
|
|
||||||
self._scan_symbolic_links()
|
|
||||||
self._save_symlink_cache()
|
|
||||||
self._rebuild_preview_roots()
|
|
||||||
logger.info(
|
|
||||||
"Symlink mappings rebuilt and cached in %.2f ms",
|
|
||||||
(time.perf_counter() - start) * 1000,
|
|
||||||
)
|
|
||||||
|
|
||||||
def _scan_symbolic_links(self):
|
def _scan_symbolic_links(self):
|
||||||
"""Scan all symbolic links in LoRA, Checkpoint, and Embedding root directories"""
|
"""Scan all symbolic links in LoRA, Checkpoint, and Embedding root directories"""
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
|
|
||||||
# Reset mappings before rescanning to avoid stale entries
|
# Reset mappings before rescanning to avoid stale entries
|
||||||
self._path_mappings.clear()
|
self._path_mappings.clear()
|
||||||
self._seed_root_symlink_mappings()
|
self._seed_root_symlink_mappings()
|
||||||
@@ -365,39 +340,11 @@ class Config:
|
|||||||
len(self._path_mappings),
|
len(self._path_mappings),
|
||||||
)
|
)
|
||||||
|
|
||||||
def _schedule_symlink_rescan(self) -> None:
|
|
||||||
"""Trigger a best-effort background rescan to refresh stale caches."""
|
|
||||||
|
|
||||||
if self._rescan_thread and self._rescan_thread.is_alive():
|
|
||||||
return
|
|
||||||
|
|
||||||
def worker():
|
|
||||||
try:
|
|
||||||
self._scan_symbolic_links()
|
|
||||||
self._save_symlink_cache()
|
|
||||||
self._rebuild_preview_roots()
|
|
||||||
logger.debug("Background symlink rescan completed")
|
|
||||||
except Exception as exc: # pragma: no cover - defensive logging
|
|
||||||
logger.info("Background symlink rescan failed: %s", exc)
|
|
||||||
|
|
||||||
thread = threading.Thread(
|
|
||||||
target=worker,
|
|
||||||
name="lora-manager-symlink-rescan",
|
|
||||||
daemon=True,
|
|
||||||
)
|
|
||||||
self._rescan_thread = thread
|
|
||||||
thread.start()
|
|
||||||
|
|
||||||
def _wait_for_rescan(self, timeout: Optional[float] = None) -> None:
|
|
||||||
"""Block until the background rescan completes (testing convenience)."""
|
|
||||||
|
|
||||||
thread = self._rescan_thread
|
|
||||||
if thread:
|
|
||||||
thread.join(timeout=timeout)
|
|
||||||
|
|
||||||
def _scan_directory_links(self, root: str, visited_dirs: Set[str]):
|
def _scan_directory_links(self, root: str, visited_dirs: Set[str]):
|
||||||
"""Iteratively scan directory symlinks to avoid deep recursion."""
|
"""Iteratively scan directory symlinks to avoid deep recursion."""
|
||||||
try:
|
try:
|
||||||
|
# Note: We only use realpath for the initial root if it's not already resolved
|
||||||
|
# to ensure we have a valid entry point.
|
||||||
root_real = self._normalize_path(os.path.realpath(root))
|
root_real = self._normalize_path(os.path.realpath(root))
|
||||||
except OSError:
|
except OSError:
|
||||||
root_real = self._normalize_path(root)
|
root_real = self._normalize_path(root)
|
||||||
@@ -406,45 +353,57 @@ class Config:
|
|||||||
return
|
return
|
||||||
|
|
||||||
visited_dirs.add(root_real)
|
visited_dirs.add(root_real)
|
||||||
stack: List[str] = [root]
|
# Stack entries: (display_path, real_resolved_path)
|
||||||
|
stack: List[Tuple[str, str]] = [(root, root_real)]
|
||||||
|
|
||||||
while stack:
|
while stack:
|
||||||
current = stack.pop()
|
current_display, current_real = stack.pop()
|
||||||
try:
|
try:
|
||||||
with os.scandir(current) as it:
|
with os.scandir(current_display) as it:
|
||||||
for entry in it:
|
for entry in it:
|
||||||
try:
|
try:
|
||||||
entry_path = entry.path
|
# 1. High speed detection using dirent data (is_symlink)
|
||||||
if self._is_link(entry_path):
|
is_link = entry.is_symlink()
|
||||||
target_path = os.path.realpath(entry_path)
|
|
||||||
|
# On Windows, is_symlink handles reparse points
|
||||||
|
if is_link:
|
||||||
|
# Only resolve realpath when we actually find a link
|
||||||
|
target_path = os.path.realpath(entry.path)
|
||||||
if not os.path.isdir(target_path):
|
if not os.path.isdir(target_path):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
normalized_target = self._normalize_path(target_path)
|
normalized_target = self._normalize_path(target_path)
|
||||||
# Always record the mapping even if we already visited
|
self.add_path_mapping(entry.path, target_path)
|
||||||
# the real directory via another path. This prevents the
|
|
||||||
# traversal order from dropping valid link->target pairs.
|
|
||||||
self.add_path_mapping(entry_path, target_path)
|
|
||||||
if normalized_target in visited_dirs:
|
if normalized_target in visited_dirs:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
visited_dirs.add(normalized_target)
|
visited_dirs.add(normalized_target)
|
||||||
stack.append(target_path)
|
stack.append((target_path, normalized_target))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# 2. Process normal directories
|
||||||
if not entry.is_dir(follow_symlinks=False):
|
if not entry.is_dir(follow_symlinks=False):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
normalized_real = self._normalize_path(os.path.realpath(entry_path))
|
# For normal directories, we avoid realpath() call by
|
||||||
if normalized_real in visited_dirs:
|
# incrementally building the real path relative to current_real.
|
||||||
|
# This is safe because 'entry' is NOT a symlink.
|
||||||
|
entry_real = self._normalize_path(os.path.join(current_real, entry.name))
|
||||||
|
|
||||||
|
if entry_real in visited_dirs:
|
||||||
continue
|
continue
|
||||||
visited_dirs.add(normalized_real)
|
|
||||||
stack.append(entry_path)
|
visited_dirs.add(entry_real)
|
||||||
|
stack.append((entry.path, entry_real))
|
||||||
except Exception as inner_exc:
|
except Exception as inner_exc:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Error processing directory entry %s: %s", entry.path, inner_exc
|
"Error processing directory entry %s: %s", entry.path, inner_exc
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error scanning links in {current}: {e}")
|
logger.error(f"Error scanning links in {current_display}: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def add_path_mapping(self, link_path: str, target_path: str):
|
def add_path_mapping(self, link_path: str, target_path: str):
|
||||||
"""Add a symbolic link path mapping
|
"""Add a symbolic link path mapping
|
||||||
@@ -537,7 +496,11 @@ class Config:
|
|||||||
normalized_path = os.path.normpath(path).replace(os.sep, '/')
|
normalized_path = os.path.normpath(path).replace(os.sep, '/')
|
||||||
# Check if the path is contained in any mapped target path
|
# Check if the path is contained in any mapped target path
|
||||||
for target_path, link_path in self._path_mappings.items():
|
for target_path, link_path in self._path_mappings.items():
|
||||||
if normalized_path.startswith(target_path):
|
# Match whole path components to avoid prefix collisions (e.g., /a/b vs /a/bc)
|
||||||
|
if normalized_path == target_path:
|
||||||
|
return link_path
|
||||||
|
|
||||||
|
if normalized_path.startswith(target_path + '/'):
|
||||||
# If the path starts with the target path, replace with link path
|
# If the path starts with the target path, replace with link path
|
||||||
mapped_path = normalized_path.replace(target_path, link_path, 1)
|
mapped_path = normalized_path.replace(target_path, link_path, 1)
|
||||||
return mapped_path
|
return mapped_path
|
||||||
@@ -547,10 +510,14 @@ class Config:
|
|||||||
"""Map a symbolic link path back to the actual path"""
|
"""Map a symbolic link path back to the actual path"""
|
||||||
normalized_link = os.path.normpath(link_path).replace(os.sep, '/')
|
normalized_link = os.path.normpath(link_path).replace(os.sep, '/')
|
||||||
# Check if the path is contained in any mapped target path
|
# Check if the path is contained in any mapped target path
|
||||||
for target_path, link_path in self._path_mappings.items():
|
for target_path, link_path_mapped in self._path_mappings.items():
|
||||||
if normalized_link.startswith(link_path):
|
# Match whole path components
|
||||||
|
if normalized_link == link_path_mapped:
|
||||||
|
return target_path
|
||||||
|
|
||||||
|
if normalized_link.startswith(link_path_mapped + '/'):
|
||||||
# If the path starts with the link path, replace with actual path
|
# If the path starts with the link path, replace with actual path
|
||||||
mapped_path = normalized_link.replace(link_path, target_path, 1)
|
mapped_path = normalized_link.replace(link_path_mapped, target_path, 1)
|
||||||
return mapped_path
|
return mapped_path
|
||||||
return link_path
|
return link_path
|
||||||
|
|
||||||
|
|||||||
@@ -654,6 +654,11 @@ class ModelScanner:
|
|||||||
self._is_initializing = True # Set flag
|
self._is_initializing = True # Set flag
|
||||||
try:
|
try:
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
|
||||||
|
# Manually trigger a symlink rescan during a full rebuild.
|
||||||
|
# This ensures that any new symlink mappings are correctly picked up.
|
||||||
|
config.rebuild_symlink_cache()
|
||||||
|
|
||||||
# Determine the page type based on model type
|
# Determine the page type based on model type
|
||||||
# Scan for new data
|
# Scan for new data
|
||||||
scan_result = await self._gather_model_data()
|
scan_result = await self._gather_model_data()
|
||||||
|
|||||||
@@ -63,7 +63,6 @@ def test_symlink_scan_skips_file_links(monkeypatch: pytest.MonkeyPatch, tmp_path
|
|||||||
|
|
||||||
def test_symlink_cache_reuses_previous_scan(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
def test_symlink_cache_reuses_previous_scan(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
||||||
loras_dir, settings_dir = _setup_paths(monkeypatch, tmp_path)
|
loras_dir, settings_dir = _setup_paths(monkeypatch, tmp_path)
|
||||||
monkeypatch.setattr(config_module.Config, "_schedule_symlink_rescan", lambda self: None)
|
|
||||||
|
|
||||||
target_dir = loras_dir / "target"
|
target_dir = loras_dir / "target"
|
||||||
target_dir.mkdir()
|
target_dir.mkdir()
|
||||||
@@ -87,7 +86,6 @@ def test_symlink_cache_reuses_previous_scan(monkeypatch: pytest.MonkeyPatch, tmp
|
|||||||
|
|
||||||
def test_symlink_cache_survives_noise_mtime(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
def test_symlink_cache_survives_noise_mtime(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
||||||
loras_dir, settings_dir = _setup_paths(monkeypatch, tmp_path)
|
loras_dir, settings_dir = _setup_paths(monkeypatch, tmp_path)
|
||||||
monkeypatch.setattr(config_module.Config, "_schedule_symlink_rescan", lambda self: None)
|
|
||||||
|
|
||||||
target_dir = loras_dir / "target"
|
target_dir = loras_dir / "target"
|
||||||
target_dir.mkdir()
|
target_dir.mkdir()
|
||||||
@@ -114,7 +112,7 @@ def test_symlink_cache_survives_noise_mtime(monkeypatch: pytest.MonkeyPatch, tmp
|
|||||||
assert second_cfg.map_path_to_link(str(target_dir)) == _normalize(str(dir_link))
|
assert second_cfg.map_path_to_link(str(target_dir)) == _normalize(str(dir_link))
|
||||||
|
|
||||||
|
|
||||||
def test_background_rescan_refreshes_cache(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
def test_manual_rescan_refreshes_cache(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
||||||
loras_dir, _ = _setup_paths(monkeypatch, tmp_path)
|
loras_dir, _ = _setup_paths(monkeypatch, tmp_path)
|
||||||
|
|
||||||
target_dir = loras_dir / "target"
|
target_dir = loras_dir / "target"
|
||||||
@@ -135,12 +133,11 @@ def test_background_rescan_refreshes_cache(monkeypatch: pytest.MonkeyPatch, tmp_
|
|||||||
|
|
||||||
second_cfg = config_module.Config()
|
second_cfg = config_module.Config()
|
||||||
|
|
||||||
# Cache may still point at the old real path immediately after load
|
# Cache still point at the old real path immediately after load
|
||||||
initial_mapping = second_cfg.map_path_to_link(str(new_target))
|
assert second_cfg.map_path_to_link(str(new_target)) == _normalize(str(new_target))
|
||||||
assert initial_mapping in {str(new_target), _normalize(str(dir_link))}
|
|
||||||
|
|
||||||
# Background rescan should refresh the mapping to the new target and update the cache file
|
# Manual rescan should refresh the mapping to the new target
|
||||||
second_cfg._wait_for_rescan(timeout=2.0)
|
second_cfg.rebuild_symlink_cache()
|
||||||
new_real = _normalize(os.path.realpath(new_target))
|
new_real = _normalize(os.path.realpath(new_target))
|
||||||
assert second_cfg._path_mappings.get(new_real) == _normalize(str(dir_link))
|
assert second_cfg._path_mappings.get(new_real) == _normalize(str(dir_link))
|
||||||
assert second_cfg.map_path_to_link(str(new_target)) == _normalize(str(dir_link))
|
assert second_cfg.map_path_to_link(str(new_target)) == _normalize(str(dir_link))
|
||||||
@@ -170,7 +167,6 @@ def test_symlink_roots_are_preserved(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
|||||||
monkeypatch.setattr(config_module.folder_paths, "get_folder_paths", fake_get_folder_paths)
|
monkeypatch.setattr(config_module.folder_paths, "get_folder_paths", fake_get_folder_paths)
|
||||||
monkeypatch.setattr(config_module, "standalone_mode", True)
|
monkeypatch.setattr(config_module, "standalone_mode", True)
|
||||||
monkeypatch.setattr(config_module, "get_settings_dir", lambda create=True: str(settings_dir))
|
monkeypatch.setattr(config_module, "get_settings_dir", lambda create=True: str(settings_dir))
|
||||||
monkeypatch.setattr(config_module.Config, "_schedule_symlink_rescan", lambda self: None)
|
|
||||||
|
|
||||||
cfg = config_module.Config()
|
cfg = config_module.Config()
|
||||||
|
|
||||||
|
|||||||
128
tests/config/test_symlink_fingerprint.py
Normal file
128
tests/config/test_symlink_fingerprint.py
Normal file
@@ -0,0 +1,128 @@
|
|||||||
|
import os
|
||||||
|
import pytest
|
||||||
|
from py import config as config_module
|
||||||
|
|
||||||
|
def _setup_paths(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
||||||
|
settings_dir = tmp_path / "settings"
|
||||||
|
loras_dir = tmp_path / "loras"
|
||||||
|
loras_dir.mkdir()
|
||||||
|
checkpoint_dir = tmp_path / "checkpoints"
|
||||||
|
checkpoint_dir.mkdir()
|
||||||
|
embedding_dir = tmp_path / "embeddings"
|
||||||
|
embedding_dir.mkdir()
|
||||||
|
|
||||||
|
def fake_get_folder_paths(kind: str):
|
||||||
|
mapping = {
|
||||||
|
"loras": [str(loras_dir)],
|
||||||
|
"checkpoints": [str(checkpoint_dir)],
|
||||||
|
"unet": [],
|
||||||
|
"embeddings": [str(embedding_dir)],
|
||||||
|
}
|
||||||
|
return mapping.get(kind, [])
|
||||||
|
|
||||||
|
monkeypatch.setattr(config_module.folder_paths, "get_folder_paths", fake_get_folder_paths)
|
||||||
|
monkeypatch.setattr(config_module, "standalone_mode", True)
|
||||||
|
monkeypatch.setattr(config_module, "get_settings_dir", lambda create=True: str(settings_dir))
|
||||||
|
|
||||||
|
return loras_dir, settings_dir
|
||||||
|
|
||||||
|
def test_fingerprint_match_skips_rescan(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
||||||
|
loras_dir, settings_dir = _setup_paths(monkeypatch, tmp_path)
|
||||||
|
|
||||||
|
# Track calls to _scan_symbolic_links
|
||||||
|
scan_calls = 0
|
||||||
|
original_scan = config_module.Config._scan_symbolic_links
|
||||||
|
|
||||||
|
def wrapped_scan(self):
|
||||||
|
nonlocal scan_calls
|
||||||
|
scan_calls += 1
|
||||||
|
return original_scan(self)
|
||||||
|
|
||||||
|
monkeypatch.setattr(config_module.Config, "_scan_symbolic_links", wrapped_scan)
|
||||||
|
|
||||||
|
# 1. First initialization: should scan and cache
|
||||||
|
cfg1 = config_module.Config()
|
||||||
|
assert scan_calls == 1
|
||||||
|
|
||||||
|
# 2. Second initialization: should load from cache and skip rescan because fingerprint matches
|
||||||
|
cfg2 = config_module.Config()
|
||||||
|
# scan_calls should still be 1 because it loaded from cache and skipped the rescan
|
||||||
|
assert scan_calls == 1
|
||||||
|
|
||||||
|
def test_mtime_change_does_not_trigger_rescan(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
||||||
|
loras_dir, settings_dir = _setup_paths(monkeypatch, tmp_path)
|
||||||
|
|
||||||
|
# Track calls to _scan_symbolic_links
|
||||||
|
scan_calls = 0
|
||||||
|
original_scan = config_module.Config._scan_symbolic_links
|
||||||
|
|
||||||
|
def wrapped_scan(self):
|
||||||
|
nonlocal scan_calls
|
||||||
|
scan_calls += 1
|
||||||
|
return original_scan(self)
|
||||||
|
|
||||||
|
monkeypatch.setattr(config_module.Config, "_scan_symbolic_links", wrapped_scan)
|
||||||
|
|
||||||
|
# 1. First initialization: should scan and cache
|
||||||
|
cfg1 = config_module.Config()
|
||||||
|
assert scan_calls == 1
|
||||||
|
|
||||||
|
# 2. Modify a root directory (change mtime) - this should NOT trigger a rescan anymore
|
||||||
|
os.utime(loras_dir, (os.path.getatime(loras_dir), os.path.getmtime(loras_dir) + 100))
|
||||||
|
|
||||||
|
# 3. Third initialization: should load from cache and skip rescan despite mtime change
|
||||||
|
cfg3 = config_module.Config()
|
||||||
|
assert scan_calls == 1
|
||||||
|
|
||||||
|
def test_root_path_change_triggers_rescan(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
||||||
|
loras_dir, settings_dir = _setup_paths(monkeypatch, tmp_path)
|
||||||
|
|
||||||
|
# Track calls to _scan_symbolic_links
|
||||||
|
scan_calls = 0
|
||||||
|
original_scan = config_module.Config._scan_symbolic_links
|
||||||
|
|
||||||
|
def wrapped_scan(self):
|
||||||
|
nonlocal scan_calls
|
||||||
|
scan_calls += 1
|
||||||
|
return original_scan(self)
|
||||||
|
|
||||||
|
monkeypatch.setattr(config_module.Config, "_scan_symbolic_links", wrapped_scan)
|
||||||
|
|
||||||
|
# 1. First initialization
|
||||||
|
config_module.Config()
|
||||||
|
assert scan_calls == 1
|
||||||
|
|
||||||
|
# 2. Change root paths
|
||||||
|
new_lora_dir = tmp_path / "new_loras"
|
||||||
|
new_lora_dir.mkdir()
|
||||||
|
|
||||||
|
def fake_get_folder_paths_modified(kind: str):
|
||||||
|
if kind == "loras":
|
||||||
|
return [str(loras_dir), str(new_lora_dir)]
|
||||||
|
return []
|
||||||
|
|
||||||
|
monkeypatch.setattr(config_module.folder_paths, "get_folder_paths", fake_get_folder_paths_modified)
|
||||||
|
|
||||||
|
# 3. Initialization with different roots should trigger rescan
|
||||||
|
config_module.Config()
|
||||||
|
assert scan_calls == 2
|
||||||
|
|
||||||
|
def test_manual_rebuild_symlink_cache(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
||||||
|
loras_dir, settings_dir = _setup_paths(monkeypatch, tmp_path)
|
||||||
|
|
||||||
|
scan_calls = 0
|
||||||
|
original_scan = config_module.Config._scan_symbolic_links
|
||||||
|
|
||||||
|
def wrapped_scan(self):
|
||||||
|
nonlocal scan_calls
|
||||||
|
scan_calls += 1
|
||||||
|
return original_scan(self)
|
||||||
|
|
||||||
|
monkeypatch.setattr(config_module.Config, "_scan_symbolic_links", wrapped_scan)
|
||||||
|
|
||||||
|
cfg = config_module.Config()
|
||||||
|
assert scan_calls == 1
|
||||||
|
|
||||||
|
# Manual trigger
|
||||||
|
cfg.rebuild_symlink_cache()
|
||||||
|
assert scan_calls == 2
|
||||||
Reference in New Issue
Block a user