fix(paths): deduplicate LoRA path overlap (#871)

This commit is contained in:
Will Miao
2026-03-27 17:27:11 +08:00
parent a5191414cc
commit 76ee59cdb9
6 changed files with 542 additions and 5 deletions

View File

@@ -705,6 +705,120 @@ class Config:
return unique_paths return unique_paths
@staticmethod
def _normalize_path_for_comparison(path: str, *, resolve_realpath: bool = False) -> str:
"""Normalize a path for equality checks across platforms."""
candidate = os.path.realpath(path) if resolve_realpath else path
return os.path.normcase(os.path.normpath(candidate)).replace(os.sep, "/")
def _filter_overlapping_extra_lora_paths(
    self,
    primary_paths: Iterable[str],
    extra_paths: Iterable[str],
) -> List[str]:
    """Drop extra LoRA paths that resolve to the same physical location as primary roots.

    Args:
        primary_paths: LoRA roots configured through the ComfyUI model paths.
        extra_paths: Candidate roots from LoRA Manager's Extra Folder Paths.

    Returns:
        The stripped subset of ``extra_paths`` that does not collide with a
        primary root, nor with a directory already reachable through a
        first-level symlink under a primary root.
    """
    # Map real (symlink-resolved) primary locations back to the configured path.
    primary_map = {
        self._normalize_path_for_comparison(path, resolve_realpath=True): path
        for path in primary_paths
        if isinstance(path, str) and path.strip() and os.path.exists(path)
    }
    # Physical targets already reachable via first-level symlinks under a primary root.
    primary_symlink_map = self._collect_first_level_symlink_targets(primary_paths)

    filtered: List[str] = []
    for original_path in extra_paths:
        if not isinstance(original_path, str):
            continue
        stripped = original_path.strip()
        # Silently skip blank and non-existent entries; they cannot overlap anything.
        if not stripped or not os.path.exists(stripped):
            continue
        real_path = self._normalize_path_for_comparison(
            stripped,
            resolve_realpath=True,
        )
        # A collision with either a primary root or one of its first-level
        # symlink targets is handled identically, so test both maps at once.
        # (Both maps only ever hold non-empty string values, so membership is
        # equivalent to the previous truthy .get() checks.)
        if real_path in primary_map or real_path in primary_symlink_map:
            normalized_path = os.path.normpath(stripped).replace(os.sep, "/")
            # Config loading should stay tolerant of existing invalid state and warn.
            logger.warning(
                "Detected the same LoRA folder in both ComfyUI model paths and "
                "LoRA Manager Extra Folder Paths. This can cause duplicate items or "
                "other unexpected behavior, and it usually means the path setup is "
                "not doing what you intended. LoRA Manager will keep the ComfyUI "
                "path and ignore this Extra Folder Paths entry: '%s'. Please review "
                "your path settings and remove the duplicate entry.",
                normalized_path,
            )
            continue
        filtered.append(stripped)
    return filtered
def _collect_first_level_symlink_targets(
    self, roots: Iterable[str]
) -> Dict[str, str]:
    """Return real-path -> link-path mappings for first-level symlinks under the given roots."""
    mapping: Dict[str, str] = {}
    for raw_root in roots:
        if not isinstance(raw_root, str):
            continue
        root_dir = raw_root.strip()
        if not root_dir or not os.path.isdir(root_dir):
            continue
        try:
            with os.scandir(root_dir) as entries:
                for dir_entry in entries:
                    try:
                        if not self._entry_is_symlink(dir_entry):
                            continue
                        resolved = os.path.realpath(dir_entry.path)
                        # Only directory symlinks matter for root overlap checks.
                        if not os.path.isdir(resolved):
                            continue
                        key = self._normalize_path_for_comparison(
                            resolved,
                            resolve_realpath=True,
                        )
                        link = os.path.normpath(dir_entry.path).replace(
                            os.sep, "/"
                        )
                        # Keep the first link seen for any given physical target.
                        mapping.setdefault(key, link)
                    except Exception as entry_error:
                        logger.debug(
                            "Error collecting LoRA symlink target for %s: %s",
                            dir_entry.path,
                            entry_error,
                        )
        except Exception as scan_error:
            logger.debug(
                "Error scanning first-level LoRA symlinks in %s: %s",
                root_dir,
                scan_error,
            )
    return mapping
def _prepare_checkpoint_paths( def _prepare_checkpoint_paths(
self, checkpoint_paths: Iterable[str], unet_paths: Iterable[str] self, checkpoint_paths: Iterable[str], unet_paths: Iterable[str]
) -> Tuple[List[str], List[str], List[str]]: ) -> Tuple[List[str], List[str], List[str]]:
@@ -796,7 +910,11 @@ class Config:
extra_unet_paths = extra_paths.get("unet", []) or [] extra_unet_paths = extra_paths.get("unet", []) or []
extra_embedding_paths = extra_paths.get("embeddings", []) or [] extra_embedding_paths = extra_paths.get("embeddings", []) or []
self.extra_loras_roots = self._prepare_lora_paths(extra_lora_paths) filtered_extra_lora_paths = self._filter_overlapping_extra_lora_paths(
self.loras_roots,
extra_lora_paths,
)
self.extra_loras_roots = self._prepare_lora_paths(filtered_extra_lora_paths)
( (
_, _,
self.extra_checkpoints_roots, self.extra_checkpoints_roots,

View File

@@ -732,19 +732,24 @@ class ModelScanner:
# Get current cached file paths # Get current cached file paths
cached_paths = {item['file_path'] for item in self._cache.raw_data} cached_paths = {item['file_path'] for item in self._cache.raw_data}
path_to_item = {item['file_path']: item for item in self._cache.raw_data} path_to_item = {item['file_path']: item for item in self._cache.raw_data}
cached_real_paths = {}
for cached_path in cached_paths:
try:
cached_real_paths.setdefault(os.path.realpath(cached_path), cached_path)
except Exception:
continue
# Track found files and new files # Track found files and new files
found_paths = set() found_paths = set()
new_files = [] new_files = []
visited_real_paths = set()
discovered_real_files = set()
# Scan all model roots # Scan all model roots
for root_path in self.get_model_roots(): for root_path in self.get_model_roots():
if not os.path.exists(root_path): if not os.path.exists(root_path):
continue continue
# Track visited real paths to avoid symlink loops
visited_real_paths = set()
# Recursively scan directory # Recursively scan directory
for root, _, files in os.walk(root_path, followlinks=True): for root, _, files in os.walk(root_path, followlinks=True):
real_root = os.path.realpath(root) real_root = os.path.realpath(root)
@@ -757,12 +762,18 @@ class ModelScanner:
if ext in self.file_extensions: if ext in self.file_extensions:
# Construct paths exactly as they would be in cache # Construct paths exactly as they would be in cache
file_path = os.path.join(root, file).replace(os.sep, '/') file_path = os.path.join(root, file).replace(os.sep, '/')
real_file_path = os.path.realpath(os.path.join(root, file))
# Check if this file is already in cache # Check if this file is already in cache
if file_path in cached_paths: if file_path in cached_paths:
found_paths.add(file_path) found_paths.add(file_path)
continue continue
cached_real_match = cached_real_paths.get(real_file_path)
if cached_real_match:
found_paths.add(cached_real_match)
continue
if file_path in self._excluded_models: if file_path in self._excluded_models:
continue continue
@@ -778,6 +789,10 @@ class ModelScanner:
if matched: if matched:
continue continue
if real_file_path in discovered_real_files:
continue
discovered_real_files.add(real_file_path)
# This is a new file to process # This is a new file to process
new_files.append(file_path) new_files.append(file_path)
@@ -1099,6 +1114,8 @@ class ModelScanner:
tags_count: Dict[str, int] = {} tags_count: Dict[str, int] = {}
excluded_models: List[str] = [] excluded_models: List[str] = []
processed_files = 0 processed_files = 0
processed_real_files: Set[str] = set()
visited_real_dirs: Set[str] = set()
async def handle_progress() -> None: async def handle_progress() -> None:
if progress_callback is None: if progress_callback is None:
@@ -1115,9 +1132,10 @@ class ModelScanner:
try: try:
real_path = os.path.realpath(current_path) real_path = os.path.realpath(current_path)
if real_path in visited_paths: if real_path in visited_paths or real_path in visited_real_dirs:
return return
visited_paths.add(real_path) visited_paths.add(real_path)
visited_real_dirs.add(real_path)
with os.scandir(current_path) as iterator: with os.scandir(current_path) as iterator:
entries = list(iterator) entries = list(iterator)
@@ -1130,6 +1148,11 @@ class ModelScanner:
continue continue
file_path = entry.path.replace(os.sep, "/") file_path = entry.path.replace(os.sep, "/")
real_file_path = os.path.realpath(entry.path)
if real_file_path in processed_real_files:
continue
processed_real_files.add(real_file_path)
result = await self._process_model_file( result = await self._process_model_file(
file_path, file_path,
root_path, root_path,

View File

@@ -1046,6 +1046,67 @@ class SettingsManager:
active_name = self.get_active_library_name() active_name = self.get_active_library_name()
self._validate_folder_paths(active_name, extra_folder_paths) self._validate_folder_paths(active_name, extra_folder_paths)
active_library = self.get_active_library()
active_folder_paths = active_library.get("folder_paths", {})
active_lora_paths = active_folder_paths.get("loras", []) or []
requested_extra_lora_paths = extra_folder_paths.get("loras", []) or []
primary_real_paths = set()
for path in active_lora_paths:
if not isinstance(path, str):
continue
stripped = path.strip()
if not stripped:
continue
normalized = os.path.normcase(os.path.normpath(stripped))
if os.path.exists(stripped):
normalized = os.path.normcase(os.path.normpath(os.path.realpath(stripped)))
primary_real_paths.add(normalized)
primary_symlink_targets = set()
for path in active_lora_paths:
if not isinstance(path, str):
continue
stripped = path.strip()
if not stripped or not os.path.isdir(stripped):
continue
try:
with os.scandir(stripped) as iterator:
for entry in iterator:
try:
if not entry.is_symlink():
continue
target_path = os.path.realpath(entry.path)
if not os.path.isdir(target_path):
continue
primary_symlink_targets.add(
os.path.normcase(os.path.normpath(target_path))
)
except Exception:
continue
except Exception:
continue
overlapping_paths = []
for path in requested_extra_lora_paths:
if not isinstance(path, str):
continue
stripped = path.strip()
if not stripped:
continue
normalized = os.path.normcase(os.path.normpath(stripped))
if os.path.exists(stripped):
normalized = os.path.normcase(os.path.normpath(os.path.realpath(stripped)))
if normalized in primary_real_paths or normalized in primary_symlink_targets:
overlapping_paths.append(stripped)
if overlapping_paths:
collisions = ", ".join(sorted(set(overlapping_paths)))
# Settings writes should reject new conflicting configuration instead of tolerating it.
raise ValueError(
f"Extra LoRA path(s) {collisions} overlap with the active library's primary LoRA roots"
)
normalized_paths = self._normalize_folder_paths(extra_folder_paths) normalized_paths = self._normalize_folder_paths(extra_folder_paths)
self.settings["extra_folder_paths"] = normalized_paths self.settings["extra_folder_paths"] = normalized_paths
self._update_active_library_entry(extra_folder_paths=normalized_paths) self._update_active_library_entry(extra_folder_paths=normalized_paths)

View File

@@ -322,3 +322,182 @@ def test_extra_paths_deduplication(monkeypatch, tmp_path):
assert config_instance.loras_roots == [str(loras_dir)] assert config_instance.loras_roots == [str(loras_dir)]
assert config_instance.extra_loras_roots == [str(extra_loras_dir)] assert config_instance.extra_loras_roots == [str(extra_loras_dir)]
def test_apply_library_settings_ignores_extra_lora_path_overlapping_primary_symlink(
    monkeypatch, tmp_path, caplog
):
    """Extra LoRA paths should be ignored when they resolve to the same target as a primary root."""
    # The primary root is a symlink whose target is ALSO listed as an extra path.
    real_loras_dir = tmp_path / "loras_real"
    real_loras_dir.mkdir()
    loras_link = tmp_path / "loras_link"
    loras_link.symlink_to(real_loras_dir, target_is_directory=True)
    config_instance = config_module.Config()
    library_config = {
        "folder_paths": {
            "loras": [str(loras_link)],
            "checkpoints": [],
            "unet": [],
            "embeddings": [],
        },
        "extra_folder_paths": {
            # Same physical directory as the primary symlink above.
            "loras": [str(real_loras_dir)],
            "checkpoints": [],
            "unet": [],
            "embeddings": [],
        },
    }
    with caplog.at_level("WARNING", logger=config_module.logger.name):
        config_instance.apply_library_settings(library_config)
    # The ComfyUI-configured path wins; the overlapping extra entry is dropped.
    assert config_instance.loras_roots == [str(loras_link)]
    assert config_instance.extra_loras_roots == []
    # Exactly one overlap warning should be emitted for the dropped entry.
    warning_messages = [
        record.message
        for record in caplog.records
        if record.levelname == "WARNING"
        and "same lora folder" in record.message.lower()
    ]
    assert len(warning_messages) == 1
    assert "comfyui model paths" in warning_messages[0].lower()
    assert "extra folder paths" in warning_messages[0].lower()
    assert "duplicate items" in warning_messages[0].lower()
def test_apply_library_settings_detects_overlap_case_insensitively(
    monkeypatch, tmp_path, caplog
):
    """Overlap detection should use case-insensitive comparison on Windows-like paths."""
    real_loras_dir = tmp_path / "loras_real"
    real_loras_dir.mkdir()
    loras_link = tmp_path / "loras_link"
    loras_link.symlink_to(real_loras_dir, target_is_directory=True)

    original_exists = config_module.os.path.exists
    original_realpath = config_module.os.path.realpath
    original_normcase = config_module.os.path.normcase

    # Case-insensitive existence check so differently-cased spellings resolve.
    # (The previous set also contained redundant `.upper().lower()` members,
    # which are identical to the `.lower()` ones and have been removed.)
    def fake_exists(path):
        if isinstance(path, str) and path.lower() in {
            str(loras_link).lower(),
            str(real_loras_dir).lower(),
        }:
            return True
        return original_exists(path)

    # Resolve any cased spelling of the link (or the real dir) to the real dir.
    def fake_realpath(path, *args, **kwargs):
        if isinstance(path, str):
            lowered = path.lower()
            if lowered in {str(loras_link).lower(), str(real_loras_dir).lower()}:
                return str(real_loras_dir)
        return original_realpath(path, *args, **kwargs)

    monkeypatch.setattr(config_module.os.path, "exists", fake_exists)
    monkeypatch.setattr(config_module.os.path, "realpath", fake_realpath)
    monkeypatch.setattr(
        config_module.os.path,
        "normcase",
        lambda value: original_normcase(value).lower(),
    )

    config_instance = config_module.Config()
    primary_path = str(loras_link).replace("loras_link", "LORAS_LINK")
    library_config = {
        "folder_paths": {
            "loras": [primary_path],
            "checkpoints": [],
            "unet": [],
            "embeddings": [],
        },
        "extra_folder_paths": {
            # Upper-cased spelling of the real dir; must still be detected.
            # (A dead no-op `.replace("loras_real", "loras_real")` was removed.)
            "loras": [str(real_loras_dir).upper()],
            "checkpoints": [],
            "unet": [],
            "embeddings": [],
        },
    }

    with caplog.at_level("WARNING", logger=config_module.logger.name):
        config_instance.apply_library_settings(library_config)

    assert config_instance.loras_roots == [primary_path]
    assert config_instance.extra_loras_roots == []
    assert any("same lora folder" in record.message.lower() for record in caplog.records)
def test_apply_library_settings_ignores_missing_extra_lora_paths(monkeypatch, tmp_path, caplog):
    """Missing extra paths should be ignored without overlap warnings."""
    primary_dir = tmp_path / "loras"
    primary_dir.mkdir()
    # Deliberately never created on disk.
    absent_extra = tmp_path / "missing_loras"

    cfg = config_module.Config()
    settings = {
        "folder_paths": {
            "loras": [str(primary_dir)],
            "checkpoints": [],
            "unet": [],
            "embeddings": [],
        },
        "extra_folder_paths": {
            "loras": [str(absent_extra)],
            "checkpoints": [],
            "unet": [],
            "embeddings": [],
        },
    }

    with caplog.at_level("WARNING", logger=config_module.logger.name):
        cfg.apply_library_settings(settings)

    # A non-existent extra path is simply dropped, with no overlap warning.
    assert cfg.loras_roots == [str(primary_dir)]
    assert cfg.extra_loras_roots == []
    overlap_warnings = [
        record
        for record in caplog.records
        if "same lora folder" in record.message.lower()
    ]
    assert not overlap_warnings
def test_apply_library_settings_ignores_extra_lora_path_overlapping_primary_root_symlink(
    tmp_path, caplog
):
    """Extra LoRA paths should be ignored when already reachable via a first-level symlink under the primary root."""
    loras_dir = tmp_path / "loras"
    loras_dir.mkdir()
    external_dir = tmp_path / "external_loras"
    external_dir.mkdir()
    # First-level symlink inside the primary root pointing at the external dir.
    link_dir = loras_dir / "link"
    link_dir.symlink_to(external_dir, target_is_directory=True)
    config_instance = config_module.Config()
    library_config = {
        "folder_paths": {
            "loras": [str(loras_dir)],
            "checkpoints": [],
            "unet": [],
            "embeddings": [],
        },
        "extra_folder_paths": {
            # Already reachable through loras/link, so it must be dropped.
            "loras": [str(external_dir)],
            "checkpoints": [],
            "unet": [],
            "embeddings": [],
        },
    }
    with caplog.at_level("WARNING", logger=config_module.logger.name):
        config_instance.apply_library_settings(library_config)
    assert config_instance.loras_roots == [str(loras_dir)]
    assert config_instance.extra_loras_roots == []
    # An overlap warning must be emitted for the ignored extra entry.
    assert any(
        "same lora folder" in record.message.lower()
        for record in caplog.records
    )

View File

@@ -73,6 +73,15 @@ class DummyScanner(ModelScanner):
} }
class MultiRootDummyScanner(DummyScanner):
    """Scanner test double that reports several model roots instead of one."""

    def __init__(self, roots: List[Path]):
        # Keep every root; the base class is initialized with the first one.
        self._roots = [str(root) for root in roots]
        super().__init__(roots[0])

    def get_model_roots(self) -> List[str]:
        """Return a copy of all configured roots rather than the single base root."""
        return list(self._roots)
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def reset_model_scanner_singletons(): def reset_model_scanner_singletons():
ModelScanner._instances.clear() ModelScanner._instances.clear()
@@ -559,3 +568,59 @@ async def test_count_model_files_handles_symlink_loops(tmp_path: Path):
count = scanner._count_model_files() count = scanner._count_model_files()
assert count == 2 assert count == 2
@pytest.mark.asyncio
async def test_initialize_cache_dedupes_files_reachable_via_primary_symlink_and_extra_root(
    tmp_path: Path,
):
    """One physical file reachable as loras/link/one.txt and extra/one.txt is cached once."""
    loras_root = tmp_path / "loras"
    loras_root.mkdir()
    extra_root = tmp_path / "extra"
    extra_root.mkdir()
    (extra_root / "one.txt").write_text("one", encoding="utf-8")
    # loras/link aliases the extra root, creating two paths to the same file.
    (loras_root / "link").symlink_to(extra_root, target_is_directory=True)
    scanner = MultiRootDummyScanner([loras_root, extra_root])
    await scanner._initialize_cache()
    cache = await scanner.get_cached_data()
    # The scan must record the file once, under the first root that found it.
    assert len(cache.raw_data) == 1
    assert cache.raw_data[0]["file_path"] == _normalize_path(loras_root / "link" / "one.txt")
@pytest.mark.asyncio
async def test_reconcile_cache_removes_duplicate_alias_when_same_real_file_seen_once(
    tmp_path: Path,
):
    """Reconciliation drops a stale cache entry that aliases an already-cached real file."""
    loras_root = tmp_path / "loras"
    loras_root.mkdir()
    extra_root = tmp_path / "extra"
    extra_root.mkdir()
    real_file = extra_root / "one.txt"
    real_file.write_text("one", encoding="utf-8")
    (loras_root / "link").symlink_to(extra_root, target_is_directory=True)
    scanner = MultiRootDummyScanner([loras_root, extra_root])
    await scanner._initialize_cache()
    # Inject a second entry aliasing the same physical file via the extra-root
    # path, simulating stale pre-deduplication cache state.
    duplicate_entry = {
        "file_path": _normalize_path(extra_root / "one.txt"),
        "folder": "",
        "sha256": "hash-one",
        "tags": ["alpha"],
        "model_name": "one",
        "size": 1,
        "modified": 1.0,
        "license_flags": DEFAULT_LICENSE_FLAGS,
    }
    scanner._cache.raw_data.append(duplicate_entry)
    scanner._cache.add_to_version_index(duplicate_entry)
    scanner._hash_index.add_entry("hash-one", duplicate_entry["file_path"])
    await scanner._reconcile_cache()
    cache = await scanner.get_cached_data()
    cached_paths = {item["file_path"] for item in cache.raw_data}
    # Only the symlink-path alias seen during the walk survives reconciliation.
    assert cached_paths == {_normalize_path(loras_root / "link" / "one.txt")}

View File

@@ -590,6 +590,97 @@ def test_extra_paths_validation_no_overlap_with_other_libraries(manager, tmp_pat
manager.update_extra_folder_paths({"loras": [str(lora_dir1)]}) manager.update_extra_folder_paths({"loras": [str(lora_dir1)]})
def test_extra_paths_validation_no_overlap_with_active_primary_lora_root(manager, tmp_path):
    """Test that extra LoRA paths cannot overlap the active library primary LoRA roots."""
    # Primary root is a symlink; the extra path is that symlink's real target.
    target_dir = tmp_path / "loras_real"
    target_dir.mkdir()
    alias_link = tmp_path / "loras_link"
    alias_link.symlink_to(target_dir, target_is_directory=True)

    manager.create_library(
        "library1",
        folder_paths={"loras": [str(alias_link)]},
        activate=True,
    )

    # Settings writes must reject the conflicting extra path outright.
    with pytest.raises(ValueError, match="overlap with the active library's primary LoRA roots"):
        manager.update_extra_folder_paths({"loras": [str(target_dir)]})
def test_extra_paths_validation_no_overlap_with_active_primary_lora_root_case_insensitive(
    manager, monkeypatch, tmp_path
):
    """Overlap validation should treat differently-cased Windows-like paths as the same path."""
    real_lora_dir = tmp_path / "loras_real"
    real_lora_dir.mkdir()
    lora_link = tmp_path / "loras_link"
    lora_link.symlink_to(real_lora_dir, target_is_directory=True)
    manager.create_library(
        "library1",
        folder_paths={"loras": [str(lora_link)]},
        activate=True,
    )
    original_exists = settings_manager_module.os.path.exists
    original_realpath = settings_manager_module.os.path.realpath
    original_normcase = settings_manager_module.os.path.normcase
    # Simulate a case-insensitive filesystem: lookups match regardless of case.
    def fake_exists(path):
        if isinstance(path, str) and path.lower() in {str(lora_link).lower(), str(real_lora_dir).lower()}:
            return True
        return original_exists(path)
    # Any cased spelling of the link resolves to the real directory.
    def fake_realpath(path):
        if isinstance(path, str) and path.lower() == str(lora_link).lower():
            return str(real_lora_dir)
        return original_realpath(path)
    monkeypatch.setattr(settings_manager_module.os.path, "exists", fake_exists)
    monkeypatch.setattr(settings_manager_module.os.path, "realpath", fake_realpath)
    monkeypatch.setattr(settings_manager_module.os.path, "normcase", lambda value: original_normcase(value).lower())
    # The upper-cased spelling must still be recognized as the same root.
    with pytest.raises(ValueError, match="overlap with the active library's primary LoRA roots"):
        manager.update_extra_folder_paths({"loras": [str(real_lora_dir).upper()]})
def test_extra_paths_validation_allows_missing_non_overlapping_lora_root(manager, tmp_path):
    """Missing non-overlapping extra LoRA paths should not be rejected."""
    primary_root = tmp_path / "loras"
    primary_root.mkdir()
    # Deliberately never created on disk.
    nonexistent_root = tmp_path / "missing_loras"

    manager.create_library(
        "library1",
        folder_paths={"loras": [str(primary_root)]},
        activate=True,
    )

    # A missing, non-overlapping path is accepted and persisted as-is.
    manager.update_extra_folder_paths({"loras": [str(nonexistent_root)]})

    stored = manager.get_extra_folder_paths()
    assert stored["loras"] == [str(nonexistent_root)]
def test_extra_paths_validation_rejects_primary_root_first_level_symlink_target(manager, tmp_path):
    """Extra LoRA paths should be rejected when already reachable via a first-level symlink under the primary root."""
    lora_dir = tmp_path / "loras"
    lora_dir.mkdir()
    external_dir = tmp_path / "external_loras"
    external_dir.mkdir()
    # loras/link -> external_loras, so external_loras is already covered.
    link_dir = lora_dir / "link"
    link_dir.symlink_to(external_dir, target_is_directory=True)
    manager.create_library(
        "library1",
        folder_paths={"loras": [str(lora_dir)]},
        activate=True,
    )
    with pytest.raises(ValueError, match="overlap with the active library's primary LoRA roots"):
        manager.update_extra_folder_paths({"loras": [str(external_dir)]})
def test_delete_library_switches_active(manager, tmp_path): def test_delete_library_switches_active(manager, tmp_path):
other_dir = tmp_path / "other" other_dir = tmp_path / "other"
other_dir.mkdir() other_dir.mkdir()