feat(config): replace symlink scanning with cached mapping system

- Add `get_settings_dir` import for cache directory resolution
- Replace `_scan_symbolic_links` and `_rebuild_preview_roots` with unified `_initialize_symlink_mappings` method
- Implement fingerprint-based cache validation using root mtimes, inodes, and noise-aware timestamps
- Add helper methods for path normalization, cache location, and symlink root aggregation
- Improve performance by avoiding redundant symlink traversal when directory structure is unchanged
This commit is contained in:
Will Miao
2025-12-09 19:49:54 +08:00
parent a3a00bbeed
commit 3fc72d6bc1
4 changed files with 311 additions and 37 deletions

View File

@@ -7,7 +7,7 @@ import logging
import json
import urllib.parse
from .utils.settings_paths import ensure_settings_file, load_settings_template
from .utils.settings_paths import ensure_settings_file, get_settings_dir, load_settings_template
# Use an environment variable to control standalone mode
standalone_mode = os.environ.get("LORA_MANAGER_STANDALONE", "0") == "1" or os.environ.get("HF_HUB_DISABLE_TELEMETRY", "0") == "0"
@@ -87,8 +87,7 @@ class Config:
self.base_models_roots = self._init_checkpoint_paths()
self.embeddings_roots = self._init_embedding_paths()
# Scan symbolic links during initialization
self._scan_symbolic_links()
self._rebuild_preview_roots()
self._initialize_symlink_mappings()
if not standalone_mode:
# Save the paths to settings.json when running in ComfyUI mode
@@ -220,39 +219,212 @@ class Config:
logger.error(f"Error checking link status for {path}: {e}")
return False
def _normalize_path(self, path: str) -> str:
return os.path.normpath(path).replace(os.sep, '/')
def _get_symlink_cache_path(self) -> Path:
cache_dir = Path(get_settings_dir(create=True)) / "cache"
cache_dir.mkdir(parents=True, exist_ok=True)
return cache_dir / "symlink_map.json"
def _compute_noise_mtime(self, root: str) -> Optional[int]:
"""Return the latest mtime of known noisy paths inside ``root``."""
normalized_root = self._normalize_path(root)
noise_paths: List[str] = []
# The first LoRA root hosts recipes and stats files which routinely
# update without changing symlink layout.
first_lora_root = self._normalize_path(self.loras_roots[0]) if self.loras_roots else None
if first_lora_root and normalized_root == first_lora_root:
recipes_dir = os.path.join(root, "recipes")
stats_file = os.path.join(root, "lora_manager_stats.json")
noise_paths.extend([recipes_dir, stats_file])
mtimes: List[int] = []
for path in noise_paths:
try:
stat_result = os.stat(path)
mtimes.append(getattr(stat_result, "st_mtime_ns", int(stat_result.st_mtime * 1e9)))
except OSError:
continue
if not mtimes:
return None
return max(mtimes)
def _symlink_roots(self) -> List[str]:
roots: List[str] = []
roots.extend(self.loras_roots or [])
roots.extend(self.base_models_roots or [])
roots.extend(self.embeddings_roots or [])
return roots
def _build_symlink_fingerprint(self) -> Dict[str, object]:
roots = [self._normalize_path(path) for path in self._symlink_roots() if path]
unique_roots = sorted(set(roots))
stats: Dict[str, Dict[str, int]] = {}
for root in unique_roots:
try:
root_stat = os.stat(root)
noise_mtime = self._compute_noise_mtime(root)
stats[root] = {
"mtime_ns": getattr(root_stat, "st_mtime_ns", int(root_stat.st_mtime * 1e9)),
"inode": getattr(root_stat, "st_ino", 0),
"noise_mtime_ns": noise_mtime,
}
except OSError:
continue
return {"roots": unique_roots, "stats": stats}
def _load_symlink_cache(self) -> bool:
cache_path = self._get_symlink_cache_path()
if not cache_path.exists():
return False
try:
with cache_path.open("r", encoding="utf-8") as handle:
payload = json.load(handle)
except Exception as exc:
logger.debug("Failed to load symlink cache %s: %s", cache_path, exc)
return False
if not isinstance(payload, dict):
return False
cached_fingerprint = payload.get("fingerprint")
cached_mappings = payload.get("path_mappings")
if not isinstance(cached_fingerprint, dict) or not isinstance(cached_mappings, Mapping):
return False
current_fingerprint = self._build_symlink_fingerprint()
cached_roots = cached_fingerprint.get("roots")
cached_stats = cached_fingerprint.get("stats")
if (
not isinstance(cached_roots, list)
or not isinstance(cached_stats, Mapping)
or sorted(cached_roots) != sorted(current_fingerprint["roots"]) # type: ignore[index]
):
return False
for root in current_fingerprint["roots"]: # type: ignore[assignment]
cached_stat = cached_stats.get(root) if isinstance(cached_stats, Mapping) else None
current_stat = current_fingerprint["stats"].get(root) # type: ignore[index]
if not isinstance(cached_stat, Mapping) or not current_stat:
return False
cached_mtime = cached_stat.get("mtime_ns")
cached_inode = cached_stat.get("inode")
current_mtime = current_stat.get("mtime_ns")
current_inode = current_stat.get("inode")
if cached_inode != current_inode:
return False
if cached_mtime != current_mtime:
cached_noise = cached_stat.get("noise_mtime_ns")
current_noise = current_stat.get("noise_mtime_ns")
if not (
cached_noise
and current_noise
and cached_mtime == cached_noise
and current_mtime == current_noise
):
return False
normalized_mappings: Dict[str, str] = {}
for target, link in cached_mappings.items():
if not isinstance(target, str) or not isinstance(link, str):
continue
normalized_mappings[self._normalize_path(target)] = self._normalize_path(link)
self._path_mappings = normalized_mappings
return True
def _save_symlink_cache(self) -> None:
cache_path = self._get_symlink_cache_path()
payload = {
"fingerprint": self._build_symlink_fingerprint(),
"path_mappings": self._path_mappings,
}
try:
with cache_path.open("w", encoding="utf-8") as handle:
json.dump(payload, handle, ensure_ascii=False, indent=2)
except Exception as exc:
logger.debug("Failed to write symlink cache %s: %s", cache_path, exc)
def _initialize_symlink_mappings(self) -> None:
if not self._load_symlink_cache():
self._scan_symbolic_links()
self._save_symlink_cache()
else:
logger.info("Loaded symlink mappings from cache")
self._rebuild_preview_roots()
def _scan_symbolic_links(self):
"""Scan all symbolic links in LoRA, Checkpoint, and Embedding root directories"""
for root in self.loras_roots:
self._scan_directory_links(root)
for root in self.base_models_roots:
self._scan_directory_links(root)
for root in self.embeddings_roots:
self._scan_directory_links(root)
visited_dirs: Set[str] = set()
for root in self._symlink_roots():
self._scan_directory_links(root, visited_dirs)
def _scan_directory_links(self, root: str):
"""Recursively scan symbolic links in a directory"""
def _scan_directory_links(self, root: str, visited_dirs: Set[str]):
"""Iteratively scan directory symlinks to avoid deep recursion."""
try:
with os.scandir(root) as it:
for entry in it:
if self._is_link(entry.path):
target_path = os.path.realpath(entry.path)
if os.path.isdir(target_path):
self.add_path_mapping(entry.path, target_path)
self._scan_directory_links(target_path)
elif entry.is_dir(follow_symlinks=False):
self._scan_directory_links(entry.path)
except Exception as e:
logger.error(f"Error scanning links in {root}: {e}")
root_real = self._normalize_path(os.path.realpath(root))
except OSError:
root_real = self._normalize_path(root)
if root_real in visited_dirs:
return
visited_dirs.add(root_real)
stack: List[str] = [root]
while stack:
current = stack.pop()
try:
with os.scandir(current) as it:
for entry in it:
try:
entry_path = entry.path
if self._is_link(entry_path):
target_path = os.path.realpath(entry_path)
if not os.path.isdir(target_path):
continue
normalized_target = self._normalize_path(target_path)
if normalized_target in visited_dirs:
continue
visited_dirs.add(normalized_target)
self.add_path_mapping(entry_path, target_path)
stack.append(target_path)
continue
if not entry.is_dir(follow_symlinks=False):
continue
normalized_real = self._normalize_path(os.path.realpath(entry_path))
if normalized_real in visited_dirs:
continue
visited_dirs.add(normalized_real)
stack.append(entry_path)
except Exception as inner_exc:
logger.debug(
"Error processing directory entry %s: %s", entry.path, inner_exc
)
except Exception as e:
logger.error(f"Error scanning links in {current}: {e}")
def add_path_mapping(self, link_path: str, target_path: str):
"""Add a symbolic link path mapping
target_path: actual target path
link_path: symbolic link path
"""
normalized_link = os.path.normpath(link_path).replace(os.sep, '/')
normalized_target = os.path.normpath(target_path).replace(os.sep, '/')
normalized_link = self._normalize_path(link_path)
normalized_target = self._normalize_path(target_path)
# Keep the original mapping: target path -> link path
self._path_mappings[normalized_target] = normalized_link
logger.info(f"Added path mapping: {normalized_target} -> {normalized_link}")
@@ -411,8 +583,7 @@ class Config:
self.base_models_roots = self._prepare_checkpoint_paths(checkpoint_paths, unet_paths)
self.embeddings_roots = self._prepare_embedding_paths(embedding_paths)
self._scan_symbolic_links()
self._rebuild_preview_roots()
self._initialize_symlink_mappings()
def _init_lora_paths(self) -> List[str]:
"""Initialize and validate LoRA paths from ComfyUI settings"""