mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-21 21:22:11 -03:00
Previously, `map_path_to_link` and `map_link_to_path` returned the original input path when no mapping was found, instead of the normalized version. This could cause inconsistencies when paths with different representations (e.g., trailing slashes) were used. Now both methods consistently return the normalized path, ensuring uniform path handling throughout the application.
714 lines
29 KiB
Python
714 lines
29 KiB
Python
import os
|
|
import platform
|
|
import threading
|
|
from pathlib import Path
|
|
import folder_paths # type: ignore
|
|
from typing import Any, Dict, Iterable, List, Mapping, Optional, Set, Tuple
|
|
import logging
|
|
import json
|
|
import urllib.parse
|
|
import time
|
|
|
|
from .utils.settings_paths import ensure_settings_file, get_settings_dir, load_settings_template
|
|
|
|
# Use an environment variable to control standalone mode
|
|
standalone_mode = os.environ.get("LORA_MANAGER_STANDALONE", "0") == "1" or os.environ.get("HF_HUB_DISABLE_TELEMETRY", "0") == "0"
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _normalize_folder_paths_for_comparison(
|
|
folder_paths: Mapping[str, Iterable[str]]
|
|
) -> Dict[str, Set[str]]:
|
|
"""Normalize folder paths for comparison across libraries."""
|
|
|
|
normalized: Dict[str, Set[str]] = {}
|
|
for key, values in folder_paths.items():
|
|
if isinstance(values, str):
|
|
candidate_values: Iterable[str] = [values]
|
|
else:
|
|
try:
|
|
candidate_values = iter(values)
|
|
except TypeError:
|
|
continue
|
|
|
|
normalized_values: Set[str] = set()
|
|
for value in candidate_values:
|
|
if not isinstance(value, str):
|
|
continue
|
|
stripped = value.strip()
|
|
if not stripped:
|
|
continue
|
|
normalized_values.add(os.path.normcase(os.path.normpath(stripped)))
|
|
|
|
if normalized_values:
|
|
normalized[key] = normalized_values
|
|
|
|
return normalized
|
|
|
|
|
|
def _normalize_library_folder_paths(
|
|
library_payload: Mapping[str, Any]
|
|
) -> Dict[str, Set[str]]:
|
|
"""Return normalized folder paths extracted from a library payload."""
|
|
|
|
folder_paths = library_payload.get("folder_paths")
|
|
if isinstance(folder_paths, Mapping):
|
|
return _normalize_folder_paths_for_comparison(folder_paths)
|
|
return {}
|
|
|
|
|
|
def _get_template_folder_paths() -> Dict[str, Set[str]]:
|
|
"""Return normalized folder paths defined in the bundled template."""
|
|
|
|
template_payload = load_settings_template()
|
|
if not template_payload:
|
|
return {}
|
|
|
|
folder_paths = template_payload.get("folder_paths")
|
|
if isinstance(folder_paths, Mapping):
|
|
return _normalize_folder_paths_for_comparison(folder_paths)
|
|
return {}
|
|
|
|
|
|
class Config:
|
|
"""Global configuration for LoRA Manager"""
|
|
|
|
def __init__(self):
|
|
self.templates_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'templates')
|
|
self.static_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'static')
|
|
self.i18n_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'locales')
|
|
# Path mapping dictionary, target to link mapping
|
|
self._path_mappings: Dict[str, str] = {}
|
|
# Normalized preview root directories used to validate preview access
|
|
self._preview_root_paths: Set[Path] = set()
|
|
# Fingerprint of the symlink layout from the last successful scan
|
|
self._cached_fingerprint: Optional[Dict[str, object]] = None
|
|
self.loras_roots = self._init_lora_paths()
|
|
self.checkpoints_roots = None
|
|
self.unet_roots = None
|
|
self.embeddings_roots = None
|
|
self.base_models_roots = self._init_checkpoint_paths()
|
|
self.embeddings_roots = self._init_embedding_paths()
|
|
# Scan symbolic links during initialization
|
|
self._initialize_symlink_mappings()
|
|
|
|
if not standalone_mode:
|
|
# Save the paths to settings.json when running in ComfyUI mode
|
|
self.save_folder_paths_to_settings()
|
|
|
|
def save_folder_paths_to_settings(self):
|
|
"""Persist ComfyUI-derived folder paths to the multi-library settings."""
|
|
try:
|
|
ensure_settings_file(logger)
|
|
from .services.settings_manager import get_settings_manager
|
|
|
|
settings_service = get_settings_manager()
|
|
libraries = settings_service.get_libraries()
|
|
comfy_library = libraries.get("comfyui", {})
|
|
default_library = libraries.get("default", {})
|
|
|
|
template_folder_paths = _get_template_folder_paths()
|
|
default_library_paths: Dict[str, Set[str]] = {}
|
|
if isinstance(default_library, Mapping):
|
|
default_library_paths = _normalize_library_folder_paths(default_library)
|
|
|
|
libraries_changed = False
|
|
if (
|
|
isinstance(default_library, Mapping)
|
|
and template_folder_paths
|
|
and default_library_paths == template_folder_paths
|
|
):
|
|
if "comfyui" in libraries:
|
|
try:
|
|
settings_service.delete_library("default")
|
|
libraries_changed = True
|
|
logger.info("Removed template 'default' library entry")
|
|
except Exception as delete_error:
|
|
logger.debug(
|
|
"Failed to delete template 'default' library: %s",
|
|
delete_error,
|
|
)
|
|
else:
|
|
try:
|
|
settings_service.rename_library("default", "comfyui")
|
|
libraries_changed = True
|
|
logger.info("Renamed template 'default' library to 'comfyui'")
|
|
except Exception as rename_error:
|
|
logger.debug(
|
|
"Failed to rename template 'default' library: %s",
|
|
rename_error,
|
|
)
|
|
|
|
if libraries_changed:
|
|
libraries = settings_service.get_libraries()
|
|
comfy_library = libraries.get("comfyui", {})
|
|
default_library = libraries.get("default", {})
|
|
|
|
target_folder_paths = {
|
|
'loras': list(self.loras_roots),
|
|
'checkpoints': list(self.checkpoints_roots or []),
|
|
'unet': list(self.unet_roots or []),
|
|
'embeddings': list(self.embeddings_roots or []),
|
|
}
|
|
|
|
normalized_target_paths = _normalize_folder_paths_for_comparison(target_folder_paths)
|
|
|
|
normalized_default_paths: Optional[Dict[str, Set[str]]] = None
|
|
if isinstance(default_library, Mapping):
|
|
normalized_default_paths = _normalize_library_folder_paths(default_library)
|
|
|
|
if (
|
|
not comfy_library
|
|
and default_library
|
|
and normalized_target_paths
|
|
and normalized_default_paths == normalized_target_paths
|
|
):
|
|
try:
|
|
settings_service.rename_library("default", "comfyui")
|
|
logger.info("Renamed legacy 'default' library to 'comfyui'")
|
|
libraries = settings_service.get_libraries()
|
|
comfy_library = libraries.get("comfyui", {})
|
|
except Exception as rename_error:
|
|
logger.debug(
|
|
"Failed to rename legacy 'default' library: %s", rename_error
|
|
)
|
|
|
|
default_lora_root = comfy_library.get("default_lora_root", "")
|
|
if not default_lora_root and len(self.loras_roots) == 1:
|
|
default_lora_root = self.loras_roots[0]
|
|
|
|
default_checkpoint_root = comfy_library.get("default_checkpoint_root", "")
|
|
if (not default_checkpoint_root and self.checkpoints_roots and
|
|
len(self.checkpoints_roots) == 1):
|
|
default_checkpoint_root = self.checkpoints_roots[0]
|
|
|
|
default_embedding_root = comfy_library.get("default_embedding_root", "")
|
|
if (not default_embedding_root and self.embeddings_roots and
|
|
len(self.embeddings_roots) == 1):
|
|
default_embedding_root = self.embeddings_roots[0]
|
|
|
|
metadata = dict(comfy_library.get("metadata", {}))
|
|
metadata.setdefault("display_name", "ComfyUI")
|
|
metadata["source"] = "comfyui"
|
|
|
|
settings_service.upsert_library(
|
|
"comfyui",
|
|
folder_paths=target_folder_paths,
|
|
default_lora_root=default_lora_root,
|
|
default_checkpoint_root=default_checkpoint_root,
|
|
default_embedding_root=default_embedding_root,
|
|
metadata=metadata,
|
|
activate=True,
|
|
)
|
|
|
|
logger.info("Updated 'comfyui' library with current folder paths")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to save folder paths: {e}")
|
|
|
|
def _is_link(self, path: str) -> bool:
|
|
try:
|
|
if os.path.islink(path):
|
|
return True
|
|
if platform.system() == 'Windows':
|
|
try:
|
|
import ctypes
|
|
FILE_ATTRIBUTE_REPARSE_POINT = 0x400
|
|
attrs = ctypes.windll.kernel32.GetFileAttributesW(str(path))
|
|
return attrs != -1 and (attrs & FILE_ATTRIBUTE_REPARSE_POINT)
|
|
except Exception as e:
|
|
logger.error(f"Error checking Windows reparse point: {e}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error checking link status for {path}: {e}")
|
|
return False
|
|
|
|
def _normalize_path(self, path: str) -> str:
|
|
return os.path.normpath(path).replace(os.sep, '/')
|
|
|
|
def _get_symlink_cache_path(self) -> Path:
|
|
cache_dir = Path(get_settings_dir(create=True)) / "cache"
|
|
cache_dir.mkdir(parents=True, exist_ok=True)
|
|
return cache_dir / "symlink_map.json"
|
|
|
|
def _symlink_roots(self) -> List[str]:
|
|
roots: List[str] = []
|
|
roots.extend(self.loras_roots or [])
|
|
roots.extend(self.base_models_roots or [])
|
|
roots.extend(self.embeddings_roots or [])
|
|
return roots
|
|
|
|
def _build_symlink_fingerprint(self) -> Dict[str, object]:
|
|
roots = [self._normalize_path(path) for path in self._symlink_roots() if path]
|
|
unique_roots = sorted(set(roots))
|
|
# Fingerprint now only contains the root paths to avoid sensitivity to folder content changes.
|
|
return {"roots": unique_roots}
|
|
|
|
def _initialize_symlink_mappings(self) -> None:
|
|
start = time.perf_counter()
|
|
cache_loaded = self._load_persisted_cache_into_mappings()
|
|
|
|
if cache_loaded:
|
|
logger.info(
|
|
"Symlink mappings restored from cache in %.2f ms",
|
|
(time.perf_counter() - start) * 1000,
|
|
)
|
|
self._rebuild_preview_roots()
|
|
|
|
# Only rescan if target roots have changed.
|
|
# This is stable across file additions/deletions.
|
|
current_fingerprint = self._build_symlink_fingerprint()
|
|
cached_fingerprint = self._cached_fingerprint
|
|
|
|
if cached_fingerprint and current_fingerprint == cached_fingerprint:
|
|
return
|
|
|
|
logger.info("Symlink root paths changed; rescanning symbolic links")
|
|
|
|
self.rebuild_symlink_cache()
|
|
logger.info(
|
|
"Symlink mappings rebuilt and cached in %.2f ms",
|
|
(time.perf_counter() - start) * 1000,
|
|
)
|
|
|
|
def rebuild_symlink_cache(self) -> None:
|
|
"""Force a fresh scan of all symbolic links and update the persistent cache."""
|
|
self._scan_symbolic_links()
|
|
self._save_symlink_cache()
|
|
self._rebuild_preview_roots()
|
|
|
|
def _load_persisted_cache_into_mappings(self) -> bool:
|
|
"""Load the symlink cache and store its fingerprint for comparison."""
|
|
cache_path = self._get_symlink_cache_path()
|
|
if not cache_path.exists():
|
|
return False
|
|
|
|
try:
|
|
with cache_path.open("r", encoding="utf-8") as handle:
|
|
payload = json.load(handle)
|
|
except Exception as exc:
|
|
logger.info("Failed to load symlink cache %s: %s", cache_path, exc)
|
|
return False
|
|
|
|
if not isinstance(payload, dict):
|
|
return False
|
|
|
|
cached_mappings = payload.get("path_mappings")
|
|
if not isinstance(cached_mappings, Mapping):
|
|
return False
|
|
|
|
# Store the cached fingerprint for comparison during initialization
|
|
self._cached_fingerprint = payload.get("fingerprint")
|
|
|
|
normalized_mappings: Dict[str, str] = {}
|
|
for target, link in cached_mappings.items():
|
|
if not isinstance(target, str) or not isinstance(link, str):
|
|
continue
|
|
normalized_mappings[self._normalize_path(target)] = self._normalize_path(link)
|
|
|
|
self._path_mappings = normalized_mappings
|
|
logger.info("Symlink cache loaded with %d mappings", len(self._path_mappings))
|
|
return True
|
|
|
|
def _save_symlink_cache(self) -> None:
|
|
cache_path = self._get_symlink_cache_path()
|
|
payload = {
|
|
"fingerprint": self._build_symlink_fingerprint(),
|
|
"path_mappings": self._path_mappings,
|
|
}
|
|
|
|
try:
|
|
with cache_path.open("w", encoding="utf-8") as handle:
|
|
json.dump(payload, handle, ensure_ascii=False, indent=2)
|
|
logger.debug("Symlink cache saved to %s with %d mappings", cache_path, len(self._path_mappings))
|
|
except Exception as exc:
|
|
logger.info("Failed to write symlink cache %s: %s", cache_path, exc)
|
|
|
|
def _scan_symbolic_links(self):
|
|
"""Scan all symbolic links in LoRA, Checkpoint, and Embedding root directories"""
|
|
start = time.perf_counter()
|
|
|
|
# Reset mappings before rescanning to avoid stale entries
|
|
self._path_mappings.clear()
|
|
self._seed_root_symlink_mappings()
|
|
visited_dirs: Set[str] = set()
|
|
for root in self._symlink_roots():
|
|
self._scan_directory_links(root, visited_dirs)
|
|
logger.debug(
|
|
"Symlink scan finished in %.2f ms with %d mappings",
|
|
(time.perf_counter() - start) * 1000,
|
|
len(self._path_mappings),
|
|
)
|
|
|
|
def _scan_directory_links(self, root: str, visited_dirs: Set[str]):
|
|
"""Iteratively scan directory symlinks to avoid deep recursion."""
|
|
try:
|
|
# Note: We only use realpath for the initial root if it's not already resolved
|
|
# to ensure we have a valid entry point.
|
|
root_real = self._normalize_path(os.path.realpath(root))
|
|
except OSError:
|
|
root_real = self._normalize_path(root)
|
|
|
|
if root_real in visited_dirs:
|
|
return
|
|
|
|
visited_dirs.add(root_real)
|
|
# Stack entries: (display_path, real_resolved_path)
|
|
stack: List[Tuple[str, str]] = [(root, root_real)]
|
|
|
|
while stack:
|
|
current_display, current_real = stack.pop()
|
|
try:
|
|
with os.scandir(current_display) as it:
|
|
for entry in it:
|
|
try:
|
|
# 1. High speed detection using dirent data (is_symlink)
|
|
is_link = entry.is_symlink()
|
|
|
|
# On Windows, is_symlink handles reparse points
|
|
if is_link:
|
|
# Only resolve realpath when we actually find a link
|
|
target_path = os.path.realpath(entry.path)
|
|
if not os.path.isdir(target_path):
|
|
continue
|
|
|
|
normalized_target = self._normalize_path(target_path)
|
|
self.add_path_mapping(entry.path, target_path)
|
|
|
|
if normalized_target in visited_dirs:
|
|
continue
|
|
|
|
visited_dirs.add(normalized_target)
|
|
stack.append((target_path, normalized_target))
|
|
continue
|
|
|
|
# 2. Process normal directories
|
|
if not entry.is_dir(follow_symlinks=False):
|
|
continue
|
|
|
|
# For normal directories, we avoid realpath() call by
|
|
# incrementally building the real path relative to current_real.
|
|
# This is safe because 'entry' is NOT a symlink.
|
|
entry_real = self._normalize_path(os.path.join(current_real, entry.name))
|
|
|
|
if entry_real in visited_dirs:
|
|
continue
|
|
|
|
visited_dirs.add(entry_real)
|
|
stack.append((entry.path, entry_real))
|
|
except Exception as inner_exc:
|
|
logger.debug(
|
|
"Error processing directory entry %s: %s", entry.path, inner_exc
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error scanning links in {current_display}: {e}")
|
|
|
|
|
|
|
|
def add_path_mapping(self, link_path: str, target_path: str):
|
|
"""Add a symbolic link path mapping
|
|
target_path: actual target path
|
|
link_path: symbolic link path
|
|
"""
|
|
normalized_link = self._normalize_path(link_path)
|
|
normalized_target = self._normalize_path(target_path)
|
|
# Keep the original mapping: target path -> link path
|
|
self._path_mappings[normalized_target] = normalized_link
|
|
logger.info(f"Added path mapping: {normalized_target} -> {normalized_link}")
|
|
self._preview_root_paths.update(self._expand_preview_root(normalized_target))
|
|
self._preview_root_paths.update(self._expand_preview_root(normalized_link))
|
|
|
|
def _seed_root_symlink_mappings(self) -> None:
|
|
"""Ensure symlinked root folders are recorded before deep scanning."""
|
|
|
|
for root in self._symlink_roots():
|
|
if not root:
|
|
continue
|
|
try:
|
|
if not self._is_link(root):
|
|
continue
|
|
target_path = os.path.realpath(root)
|
|
if not os.path.isdir(target_path):
|
|
continue
|
|
self.add_path_mapping(root, target_path)
|
|
except Exception as exc:
|
|
logger.debug("Skipping root symlink %s: %s", root, exc)
|
|
|
|
def _expand_preview_root(self, path: str) -> Set[Path]:
|
|
"""Return normalized ``Path`` objects representing a preview root."""
|
|
|
|
roots: Set[Path] = set()
|
|
if not path:
|
|
return roots
|
|
|
|
try:
|
|
raw_path = Path(path).expanduser()
|
|
except Exception:
|
|
return roots
|
|
|
|
if raw_path.is_absolute():
|
|
roots.add(raw_path)
|
|
|
|
try:
|
|
resolved = raw_path.resolve(strict=False)
|
|
except RuntimeError:
|
|
resolved = raw_path.absolute()
|
|
roots.add(resolved)
|
|
|
|
try:
|
|
real_path = raw_path.resolve()
|
|
except (FileNotFoundError, RuntimeError):
|
|
real_path = resolved
|
|
roots.add(real_path)
|
|
|
|
normalized: Set[Path] = set()
|
|
for candidate in roots:
|
|
if candidate.is_absolute():
|
|
normalized.add(candidate)
|
|
else:
|
|
try:
|
|
normalized.add(candidate.resolve(strict=False))
|
|
except RuntimeError:
|
|
normalized.add(candidate.absolute())
|
|
|
|
return normalized
|
|
|
|
def _rebuild_preview_roots(self) -> None:
|
|
"""Recompute the cache of directories permitted for previews."""
|
|
|
|
preview_roots: Set[Path] = set()
|
|
|
|
for root in self.loras_roots or []:
|
|
preview_roots.update(self._expand_preview_root(root))
|
|
for root in self.base_models_roots or []:
|
|
preview_roots.update(self._expand_preview_root(root))
|
|
for root in self.embeddings_roots or []:
|
|
preview_roots.update(self._expand_preview_root(root))
|
|
|
|
for target, link in self._path_mappings.items():
|
|
preview_roots.update(self._expand_preview_root(target))
|
|
preview_roots.update(self._expand_preview_root(link))
|
|
|
|
self._preview_root_paths = {path for path in preview_roots if path.is_absolute()}
|
|
|
|
def map_path_to_link(self, path: str) -> str:
|
|
"""Map a target path back to its symbolic link path"""
|
|
normalized_path = os.path.normpath(path).replace(os.sep, '/')
|
|
# Check if the path is contained in any mapped target path
|
|
for target_path, link_path in self._path_mappings.items():
|
|
# Match whole path components to avoid prefix collisions (e.g., /a/b vs /a/bc)
|
|
if normalized_path == target_path:
|
|
return link_path
|
|
|
|
if normalized_path.startswith(target_path + '/'):
|
|
# If the path starts with the target path, replace with link path
|
|
mapped_path = normalized_path.replace(target_path, link_path, 1)
|
|
return mapped_path
|
|
return normalized_path
|
|
|
|
def map_link_to_path(self, link_path: str) -> str:
|
|
"""Map a symbolic link path back to the actual path"""
|
|
normalized_link = os.path.normpath(link_path).replace(os.sep, '/')
|
|
# Check if the path is contained in any mapped target path
|
|
for target_path, link_path_mapped in self._path_mappings.items():
|
|
# Match whole path components
|
|
if normalized_link == link_path_mapped:
|
|
return target_path
|
|
|
|
if normalized_link.startswith(link_path_mapped + '/'):
|
|
# If the path starts with the link path, replace with actual path
|
|
mapped_path = normalized_link.replace(link_path_mapped, target_path, 1)
|
|
return mapped_path
|
|
return normalized_link
|
|
|
|
def _dedupe_existing_paths(self, raw_paths: Iterable[str]) -> Dict[str, str]:
|
|
dedup: Dict[str, str] = {}
|
|
for path in raw_paths:
|
|
if not isinstance(path, str):
|
|
continue
|
|
if not os.path.exists(path):
|
|
continue
|
|
real_path = os.path.normpath(os.path.realpath(path)).replace(os.sep, '/')
|
|
normalized = os.path.normpath(path).replace(os.sep, '/')
|
|
if real_path not in dedup:
|
|
dedup[real_path] = normalized
|
|
return dedup
|
|
|
|
def _prepare_lora_paths(self, raw_paths: Iterable[str]) -> List[str]:
|
|
path_map = self._dedupe_existing_paths(raw_paths)
|
|
unique_paths = sorted(path_map.values(), key=lambda p: p.lower())
|
|
|
|
for original_path in unique_paths:
|
|
real_path = os.path.normpath(os.path.realpath(original_path)).replace(os.sep, '/')
|
|
if real_path != original_path:
|
|
self.add_path_mapping(original_path, real_path)
|
|
|
|
return unique_paths
|
|
|
|
def _prepare_checkpoint_paths(
|
|
self, checkpoint_paths: Iterable[str], unet_paths: Iterable[str]
|
|
) -> List[str]:
|
|
checkpoint_map = self._dedupe_existing_paths(checkpoint_paths)
|
|
unet_map = self._dedupe_existing_paths(unet_paths)
|
|
|
|
merged_map: Dict[str, str] = {}
|
|
for real_path, original in {**checkpoint_map, **unet_map}.items():
|
|
if real_path not in merged_map:
|
|
merged_map[real_path] = original
|
|
|
|
unique_paths = sorted(merged_map.values(), key=lambda p: p.lower())
|
|
|
|
checkpoint_values = set(checkpoint_map.values())
|
|
unet_values = set(unet_map.values())
|
|
self.checkpoints_roots = [p for p in unique_paths if p in checkpoint_values]
|
|
self.unet_roots = [p for p in unique_paths if p in unet_values]
|
|
|
|
for original_path in unique_paths:
|
|
real_path = os.path.normpath(os.path.realpath(original_path)).replace(os.sep, '/')
|
|
if real_path != original_path:
|
|
self.add_path_mapping(original_path, real_path)
|
|
|
|
return unique_paths
|
|
|
|
def _prepare_embedding_paths(self, raw_paths: Iterable[str]) -> List[str]:
|
|
path_map = self._dedupe_existing_paths(raw_paths)
|
|
unique_paths = sorted(path_map.values(), key=lambda p: p.lower())
|
|
|
|
for original_path in unique_paths:
|
|
real_path = os.path.normpath(os.path.realpath(original_path)).replace(os.sep, '/')
|
|
if real_path != original_path:
|
|
self.add_path_mapping(original_path, real_path)
|
|
|
|
return unique_paths
|
|
|
|
def _apply_library_paths(self, folder_paths: Mapping[str, Iterable[str]]) -> None:
|
|
self._path_mappings.clear()
|
|
self._preview_root_paths = set()
|
|
|
|
lora_paths = folder_paths.get('loras', []) or []
|
|
checkpoint_paths = folder_paths.get('checkpoints', []) or []
|
|
unet_paths = folder_paths.get('unet', []) or []
|
|
embedding_paths = folder_paths.get('embeddings', []) or []
|
|
|
|
self.loras_roots = self._prepare_lora_paths(lora_paths)
|
|
self.base_models_roots = self._prepare_checkpoint_paths(checkpoint_paths, unet_paths)
|
|
self.embeddings_roots = self._prepare_embedding_paths(embedding_paths)
|
|
|
|
self._initialize_symlink_mappings()
|
|
|
|
def _init_lora_paths(self) -> List[str]:
|
|
"""Initialize and validate LoRA paths from ComfyUI settings"""
|
|
try:
|
|
raw_paths = folder_paths.get_folder_paths("loras")
|
|
unique_paths = self._prepare_lora_paths(raw_paths)
|
|
logger.info("Found LoRA roots:" + ("\n - " + "\n - ".join(unique_paths) if unique_paths else "[]"))
|
|
|
|
if not unique_paths:
|
|
logger.warning("No valid loras folders found in ComfyUI configuration")
|
|
return []
|
|
|
|
return unique_paths
|
|
except Exception as e:
|
|
logger.warning(f"Error initializing LoRA paths: {e}")
|
|
return []
|
|
|
|
def _init_checkpoint_paths(self) -> List[str]:
|
|
"""Initialize and validate checkpoint paths from ComfyUI settings"""
|
|
try:
|
|
raw_checkpoint_paths = folder_paths.get_folder_paths("checkpoints")
|
|
raw_unet_paths = folder_paths.get_folder_paths("unet")
|
|
unique_paths = self._prepare_checkpoint_paths(raw_checkpoint_paths, raw_unet_paths)
|
|
|
|
logger.info("Found checkpoint roots:" + ("\n - " + "\n - ".join(unique_paths) if unique_paths else "[]"))
|
|
|
|
if not unique_paths:
|
|
logger.warning("No valid checkpoint folders found in ComfyUI configuration")
|
|
return []
|
|
|
|
return unique_paths
|
|
except Exception as e:
|
|
logger.warning(f"Error initializing checkpoint paths: {e}")
|
|
return []
|
|
|
|
def _init_embedding_paths(self) -> List[str]:
|
|
"""Initialize and validate embedding paths from ComfyUI settings"""
|
|
try:
|
|
raw_paths = folder_paths.get_folder_paths("embeddings")
|
|
unique_paths = self._prepare_embedding_paths(raw_paths)
|
|
logger.info("Found embedding roots:" + ("\n - " + "\n - ".join(unique_paths) if unique_paths else "[]"))
|
|
|
|
if not unique_paths:
|
|
logger.warning("No valid embeddings folders found in ComfyUI configuration")
|
|
return []
|
|
|
|
return unique_paths
|
|
except Exception as e:
|
|
logger.warning(f"Error initializing embedding paths: {e}")
|
|
return []
|
|
|
|
def get_preview_static_url(self, preview_path: str) -> str:
|
|
if not preview_path:
|
|
return ""
|
|
|
|
normalized = os.path.normpath(preview_path).replace(os.sep, '/')
|
|
encoded_path = urllib.parse.quote(normalized, safe='')
|
|
return f'/api/lm/previews?path={encoded_path}'
|
|
|
|
def is_preview_path_allowed(self, preview_path: str) -> bool:
|
|
"""Return ``True`` if ``preview_path`` is within an allowed directory."""
|
|
|
|
if not preview_path:
|
|
return False
|
|
|
|
try:
|
|
candidate = Path(preview_path).expanduser().resolve(strict=False)
|
|
except Exception:
|
|
return False
|
|
|
|
# Use os.path.normcase for case-insensitive comparison on Windows.
|
|
# On Windows, Path.relative_to() is case-sensitive for drive letters,
|
|
# causing paths like 'a:/folder' to not match 'A:/folder'.
|
|
candidate_str = os.path.normcase(str(candidate))
|
|
for root in self._preview_root_paths:
|
|
root_str = os.path.normcase(str(root))
|
|
# Check if candidate is equal to or under the root directory
|
|
if candidate_str == root_str or candidate_str.startswith(root_str + os.sep):
|
|
return True
|
|
|
|
return False
|
|
|
|
def apply_library_settings(self, library_config: Mapping[str, object]) -> None:
|
|
"""Update runtime paths to match the provided library configuration."""
|
|
folder_paths = library_config.get('folder_paths') if isinstance(library_config, Mapping) else {}
|
|
if not isinstance(folder_paths, Mapping):
|
|
folder_paths = {}
|
|
|
|
self._apply_library_paths(folder_paths)
|
|
|
|
logger.info(
|
|
"Applied library settings with %d lora roots, %d checkpoint roots, and %d embedding roots",
|
|
len(self.loras_roots or []),
|
|
len(self.base_models_roots or []),
|
|
len(self.embeddings_roots or []),
|
|
)
|
|
|
|
def get_library_registry_snapshot(self) -> Dict[str, object]:
|
|
"""Return the current library registry and active library name."""
|
|
|
|
try:
|
|
from .services.settings_manager import get_settings_manager
|
|
|
|
settings_service = get_settings_manager()
|
|
libraries = settings_service.get_libraries()
|
|
active_library = settings_service.get_active_library_name()
|
|
return {
|
|
"active_library": active_library,
|
|
"libraries": libraries,
|
|
}
|
|
except Exception as exc: # pragma: no cover - defensive logging
|
|
logger.debug("Failed to collect library registry snapshot: %s", exc)
|
|
return {"active_library": "", "libraries": {}}
|
|
|
|
# Global config instance
|
|
config = Config()
|