mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-21 21:22:11 -03:00
Introduce extra_folder_paths feature to allow users to add additional model roots that are managed by LoRA Manager but not shared with ComfyUI. Changes: - Add extra_folder_paths support in SettingsManager (stored per library) - Add extra path attributes in Config class (extra_loras_roots, etc.) - Merge folder_paths with extra_folder_paths when applying library settings - Update LoraScanner, CheckpointScanner, EmbeddingScanner to include extra paths in their model roots - Add comprehensive tests for the new functionality This enables users to manage models from additional directories without modifying ComfyUI's model folder configuration.
1614 lines
65 KiB
Python
1614 lines
65 KiB
Python
import asyncio
|
|
import copy
|
|
import json
|
|
import os
|
|
import shutil
|
|
import logging
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
from threading import Lock
|
|
from typing import Any, Awaitable, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple
|
|
|
|
from platformdirs import user_config_dir
|
|
|
|
from ..utils.constants import DEFAULT_HASH_CHUNK_SIZE_MB, DEFAULT_PRIORITY_TAG_CONFIG
|
|
from ..utils.settings_paths import APP_NAME, ensure_settings_file, get_legacy_settings_path
|
|
from ..utils.tag_priorities import (
|
|
PriorityTagEntry,
|
|
collect_canonical_tags,
|
|
parse_priority_tag_string,
|
|
resolve_priority_tag,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
CORE_USER_SETTING_KEYS: Tuple[str, ...] = (
|
|
"civitai_api_key",
|
|
"folder_paths",
|
|
)
|
|
|
|
# Threshold for aggressive cleanup: if file contains this many default keys, clean it up
|
|
DEFAULT_KEYS_CLEANUP_THRESHOLD = 10
|
|
|
|
|
|
DEFAULT_SETTINGS: Dict[str, Any] = {
|
|
"civitai_api_key": "",
|
|
"use_portable_settings": False,
|
|
"hash_chunk_size_mb": DEFAULT_HASH_CHUNK_SIZE_MB,
|
|
"language": "en",
|
|
"show_only_sfw": False,
|
|
"onboarding_completed": False,
|
|
"dismissed_banners": [],
|
|
"enable_metadata_archive_db": False,
|
|
"proxy_enabled": False,
|
|
"proxy_host": "",
|
|
"proxy_port": "",
|
|
"proxy_username": "",
|
|
"proxy_password": "",
|
|
"proxy_type": "http",
|
|
"default_lora_root": "",
|
|
"default_checkpoint_root": "",
|
|
"default_unet_root": "",
|
|
"default_embedding_root": "",
|
|
"base_model_path_mappings": {},
|
|
"download_path_templates": {},
|
|
"folder_paths": {},
|
|
"extra_folder_paths": {},
|
|
"example_images_path": "",
|
|
"optimize_example_images": True,
|
|
"auto_download_example_images": False,
|
|
"blur_mature_content": True,
|
|
"autoplay_on_hover": False,
|
|
"display_density": "default",
|
|
"card_info_display": "always",
|
|
"show_folder_sidebar": True,
|
|
"include_trigger_words": False,
|
|
"compact_mode": False,
|
|
"priority_tags": DEFAULT_PRIORITY_TAG_CONFIG.copy(),
|
|
"model_name_display": "model_name",
|
|
"model_card_footer_action": "replace_preview",
|
|
"update_flag_strategy": "same_base",
|
|
"auto_organize_exclusions": [],
|
|
"metadata_refresh_skip_paths": [],
|
|
}
|
|
|
|
|
|
class SettingsManager:
|
|
def __init__(self):
|
|
self.settings_file = ensure_settings_file(logger)
|
|
self._pending_portable_switch: Optional[Dict[str, str]] = None
|
|
self._standalone_mode = self._detect_standalone_mode()
|
|
self._startup_messages: List[Dict[str, Any]] = []
|
|
self._needs_initial_save = False
|
|
self._bootstrap_reason: Optional[str] = None
|
|
self._seed_template: Optional[Dict[str, Any]] = None
|
|
self._template_payload_cache: Optional[Dict[str, Any]] = None
|
|
self._template_payload_cache_loaded = False
|
|
self._original_disk_payload: Optional[Dict[str, Any]] = None
|
|
self._preserve_disk_template = False
|
|
self._template_path = Path(__file__).resolve().parents[2] / "settings.json.example"
|
|
self.settings = self._load_settings()
|
|
self._migrate_setting_keys()
|
|
self._ensure_default_settings()
|
|
self._migrate_to_library_registry()
|
|
self._migrate_download_path_template()
|
|
self._auto_set_default_roots()
|
|
self._check_environment_variables()
|
|
self._collect_configuration_warnings()
|
|
|
|
if self._needs_initial_save:
|
|
self._save_settings()
|
|
self._needs_initial_save = False
|
|
else:
|
|
# Clean up existing settings file by removing default values
|
|
self._cleanup_default_values_from_disk()
|
|
|
|
def _detect_standalone_mode(self) -> bool:
|
|
"""Return ``True`` when running in standalone mode."""
|
|
|
|
return os.environ.get("LORA_MANAGER_STANDALONE") == "1"
|
|
|
|
def _load_settings(self) -> Dict[str, Any]:
|
|
"""Load settings from file"""
|
|
if os.path.exists(self.settings_file):
|
|
try:
|
|
with open(self.settings_file, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
if isinstance(data, dict):
|
|
self._original_disk_payload = copy.deepcopy(data)
|
|
if self._matches_template_payload(data):
|
|
self._preserve_disk_template = True
|
|
return data
|
|
except json.JSONDecodeError as exc:
|
|
logger.error("Failed to parse settings.json: %s", exc)
|
|
self._add_startup_message(
|
|
code="settings-json-invalid",
|
|
title="Settings file could not be parsed",
|
|
message=(
|
|
"LoRA Manager could not parse settings.json. Default settings "
|
|
"will be used for this session."
|
|
),
|
|
severity="error",
|
|
actions=self._default_settings_actions(),
|
|
details=str(exc),
|
|
dismissible=False,
|
|
)
|
|
self._needs_initial_save = True
|
|
self._bootstrap_reason = "invalid"
|
|
except Exception as exc: # pragma: no cover - defensive guard
|
|
logger.error("Unexpected error loading settings: %s", exc)
|
|
self._add_startup_message(
|
|
code="settings-json-unreadable",
|
|
title="Settings file could not be read",
|
|
message="LoRA Manager could not read settings.json. Default settings will be used for this session.",
|
|
severity="error",
|
|
actions=self._default_settings_actions(),
|
|
details=str(exc),
|
|
dismissible=False,
|
|
)
|
|
self._needs_initial_save = True
|
|
self._bootstrap_reason = "unreadable"
|
|
|
|
if not os.path.exists(self.settings_file):
|
|
self._needs_initial_save = True
|
|
self._bootstrap_reason = "missing"
|
|
seeded = self._load_settings_template()
|
|
if seeded is not None:
|
|
defaults = self._get_default_settings()
|
|
merged = self._merge_template_with_defaults(defaults, seeded)
|
|
return merged
|
|
return self._get_default_settings()
|
|
|
|
def _load_settings_template(self) -> Optional[Dict[str, Any]]:
|
|
"""Load the bundled template when no user settings are found."""
|
|
|
|
payload = self._read_template_payload()
|
|
if payload is None:
|
|
return None
|
|
|
|
self._seed_template = copy.deepcopy(payload)
|
|
return copy.deepcopy(payload)
|
|
|
|
def _read_template_payload(self) -> Optional[Dict[str, Any]]:
|
|
"""Return the cached contents of ``settings.json.example`` when available."""
|
|
|
|
if self._template_payload_cache_loaded:
|
|
if self._template_payload_cache is None:
|
|
return None
|
|
return copy.deepcopy(self._template_payload_cache)
|
|
|
|
self._template_payload_cache_loaded = True
|
|
|
|
try:
|
|
with self._template_path.open("r", encoding="utf-8") as handle:
|
|
data = json.load(handle)
|
|
except FileNotFoundError:
|
|
logger.debug("settings.json.example not found at %s", self._template_path)
|
|
return None
|
|
except json.JSONDecodeError as exc:
|
|
logger.warning("Failed to parse settings.json.example: %s", exc)
|
|
return None
|
|
|
|
if not isinstance(data, dict):
|
|
logger.debug("settings.json.example is not a JSON object; ignoring template")
|
|
return None
|
|
|
|
self._template_payload_cache = copy.deepcopy(data)
|
|
return copy.deepcopy(self._template_payload_cache)
|
|
|
|
def _matches_template_payload(self, payload: Mapping[str, Any]) -> bool:
|
|
"""Return ``True`` when ``payload`` matches the bundled template."""
|
|
|
|
template = self._read_template_payload()
|
|
if template is None:
|
|
return False
|
|
|
|
return payload == template
|
|
|
|
def _merge_template_with_defaults(
|
|
self, defaults: Dict[str, Any], template: Mapping[str, Any]
|
|
) -> Dict[str, Any]:
|
|
"""Merge template values into the in-memory defaults."""
|
|
|
|
merged = copy.deepcopy(defaults)
|
|
for key, value in template.items():
|
|
if key == "folder_paths" and isinstance(value, Mapping):
|
|
merged[key] = self._normalize_folder_paths(value)
|
|
else:
|
|
merged[key] = copy.deepcopy(value)
|
|
|
|
merged.setdefault("language", "en")
|
|
merged.setdefault("folder_paths", {})
|
|
library_name = merged.get("active_library") or "default"
|
|
merged["libraries"] = {
|
|
library_name: self._build_library_payload(
|
|
folder_paths=merged.get("folder_paths", {}),
|
|
default_lora_root=merged.get("default_lora_root"),
|
|
default_checkpoint_root=merged.get("default_checkpoint_root"),
|
|
default_unet_root=merged.get("default_unet_root"),
|
|
default_embedding_root=merged.get("default_embedding_root"),
|
|
)
|
|
}
|
|
merged["active_library"] = library_name
|
|
return merged
|
|
|
|
def _ensure_default_settings(self) -> None:
|
|
"""Ensure all default settings keys exist in memory (but don't save defaults to disk)"""
|
|
defaults = self._get_default_settings()
|
|
updated_existing = False
|
|
inserted_defaults = False
|
|
|
|
if "priority_tags" in self.settings:
|
|
normalized_priority = self._normalize_priority_tag_config(
|
|
self.settings.get("priority_tags")
|
|
)
|
|
if normalized_priority != self.settings.get("priority_tags"):
|
|
self.settings["priority_tags"] = normalized_priority
|
|
updated_existing = True
|
|
else:
|
|
self.settings["priority_tags"] = copy.deepcopy(
|
|
defaults.get("priority_tags", DEFAULT_PRIORITY_TAG_CONFIG)
|
|
)
|
|
inserted_defaults = True
|
|
|
|
if "auto_organize_exclusions" in self.settings:
|
|
normalized_exclusions = self.normalize_auto_organize_exclusions(
|
|
self.settings.get("auto_organize_exclusions")
|
|
)
|
|
if normalized_exclusions != self.settings.get("auto_organize_exclusions"):
|
|
self.settings["auto_organize_exclusions"] = normalized_exclusions
|
|
updated_existing = True
|
|
else:
|
|
self.settings["auto_organize_exclusions"] = []
|
|
inserted_defaults = True
|
|
|
|
if "metadata_refresh_skip_paths" in self.settings:
|
|
normalized_skip_paths = self.normalize_metadata_refresh_skip_paths(
|
|
self.settings.get("metadata_refresh_skip_paths")
|
|
)
|
|
if normalized_skip_paths != self.settings.get("metadata_refresh_skip_paths"):
|
|
self.settings["metadata_refresh_skip_paths"] = normalized_skip_paths
|
|
updated_existing = True
|
|
else:
|
|
self.settings["metadata_refresh_skip_paths"] = []
|
|
inserted_defaults = True
|
|
|
|
for key, value in defaults.items():
|
|
if key == "priority_tags":
|
|
continue
|
|
if key not in self.settings:
|
|
if isinstance(value, dict):
|
|
self.settings[key] = copy.deepcopy(value)
|
|
else:
|
|
self.settings[key] = value
|
|
inserted_defaults = True
|
|
|
|
# Save only if existing values were normalized/updated
|
|
if updated_existing:
|
|
self._save_settings()
|
|
# Note: inserted_defaults no longer triggers save - defaults stay in memory only
|
|
|
|
def _migrate_to_library_registry(self) -> None:
|
|
"""Ensure settings include the multi-library registry structure."""
|
|
libraries = self.settings.get("libraries")
|
|
active_name = self.settings.get("active_library")
|
|
initial_bootstrap = self._bootstrap_reason == "missing"
|
|
|
|
raw_top_level_paths = self.settings.get("folder_paths", {})
|
|
normalized_top_level_paths: Dict[str, List[str]] = {}
|
|
if isinstance(raw_top_level_paths, Mapping):
|
|
normalized_top_level_paths = self._normalize_folder_paths(raw_top_level_paths)
|
|
if normalized_top_level_paths != raw_top_level_paths:
|
|
self.settings["folder_paths"] = copy.deepcopy(normalized_top_level_paths)
|
|
|
|
top_level_has_paths = self._has_configured_paths(normalized_top_level_paths)
|
|
|
|
needs_library_bootstrap = not isinstance(libraries, dict) or not libraries
|
|
|
|
if (
|
|
not needs_library_bootstrap
|
|
and top_level_has_paths
|
|
and len(libraries) == 1
|
|
):
|
|
only_library_payload = next(iter(libraries.values()))
|
|
if isinstance(only_library_payload, Mapping):
|
|
folder_payload = only_library_payload.get("folder_paths")
|
|
if not self._has_configured_paths(folder_payload):
|
|
needs_library_bootstrap = True
|
|
|
|
if needs_library_bootstrap:
|
|
library_name = active_name or "default"
|
|
library_payload = self._build_library_payload(
|
|
folder_paths=normalized_top_level_paths,
|
|
default_lora_root=self.settings.get("default_lora_root", ""),
|
|
default_checkpoint_root=self.settings.get("default_checkpoint_root", ""),
|
|
default_unet_root=self.settings.get("default_unet_root", ""),
|
|
default_embedding_root=self.settings.get("default_embedding_root", ""),
|
|
)
|
|
libraries = {library_name: library_payload}
|
|
self.settings["libraries"] = libraries
|
|
self.settings["active_library"] = library_name
|
|
self._sync_active_library_to_root(save=False)
|
|
if not initial_bootstrap and not self._preserve_disk_template:
|
|
self._save_settings()
|
|
return
|
|
|
|
seed_library_name: Optional[str] = None
|
|
if top_level_has_paths and isinstance(libraries, dict):
|
|
target_name: Optional[str] = None
|
|
if active_name and active_name in libraries:
|
|
target_name = active_name
|
|
elif len(libraries) == 1:
|
|
target_name = next(iter(libraries.keys()))
|
|
|
|
if target_name:
|
|
candidate_payload = libraries.get(target_name)
|
|
if isinstance(candidate_payload, Mapping) and not self._has_configured_paths(candidate_payload.get("folder_paths")):
|
|
seed_library_name = target_name
|
|
|
|
sanitized_libraries: Dict[str, Dict[str, Any]] = {}
|
|
changed = False
|
|
for name, data in libraries.items():
|
|
if not isinstance(data, dict):
|
|
data = {}
|
|
changed = True
|
|
|
|
candidate_folder_paths = data.get("folder_paths")
|
|
if (
|
|
seed_library_name == name
|
|
and not self._has_configured_paths(candidate_folder_paths)
|
|
and top_level_has_paths
|
|
):
|
|
candidate_folder_paths = normalized_top_level_paths
|
|
|
|
payload = self._build_library_payload(
|
|
folder_paths=candidate_folder_paths,
|
|
default_lora_root=data.get("default_lora_root"),
|
|
default_checkpoint_root=data.get("default_checkpoint_root"),
|
|
default_unet_root=data.get("default_unet_root"),
|
|
default_embedding_root=data.get("default_embedding_root"),
|
|
metadata=data.get("metadata"),
|
|
base=data,
|
|
)
|
|
sanitized_libraries[name] = payload
|
|
if payload is not data:
|
|
changed = True
|
|
|
|
if changed:
|
|
self.settings["libraries"] = sanitized_libraries
|
|
|
|
if not active_name or active_name not in sanitized_libraries:
|
|
changed = True
|
|
if sanitized_libraries:
|
|
self.settings["active_library"] = next(iter(sanitized_libraries.keys()))
|
|
else:
|
|
self.settings["active_library"] = "default"
|
|
|
|
self._sync_active_library_to_root(save=changed and not initial_bootstrap)
|
|
if changed and initial_bootstrap:
|
|
self._needs_initial_save = True
|
|
|
|
def _sync_active_library_to_root(self, *, save: bool = False) -> None:
|
|
"""Update top-level folder path settings to mirror the active library."""
|
|
libraries = self.settings.get("libraries", {})
|
|
active_name = self.settings.get("active_library")
|
|
if not libraries:
|
|
return
|
|
|
|
if active_name not in libraries:
|
|
active_name = next(iter(libraries.keys()))
|
|
self.settings["active_library"] = active_name
|
|
|
|
active_library = libraries.get(active_name, {})
|
|
folder_paths = copy.deepcopy(active_library.get("folder_paths", {}))
|
|
self.settings["folder_paths"] = folder_paths
|
|
self.settings["extra_folder_paths"] = copy.deepcopy(active_library.get("extra_folder_paths", {}))
|
|
self.settings["default_lora_root"] = active_library.get("default_lora_root", "")
|
|
self.settings["default_checkpoint_root"] = active_library.get("default_checkpoint_root", "")
|
|
self.settings["default_unet_root"] = active_library.get("default_unet_root", "")
|
|
self.settings["default_embedding_root"] = active_library.get("default_embedding_root", "")
|
|
|
|
if save:
|
|
self._save_settings()
|
|
|
|
def _current_timestamp(self) -> str:
|
|
return datetime.now(timezone.utc).replace(microsecond=0).isoformat()
|
|
|
|
def _build_library_payload(
|
|
self,
|
|
*,
|
|
folder_paths: Optional[Mapping[str, Iterable[str]]] = None,
|
|
extra_folder_paths: Optional[Mapping[str, Iterable[str]]] = None,
|
|
default_lora_root: Optional[str] = None,
|
|
default_checkpoint_root: Optional[str] = None,
|
|
default_unet_root: Optional[str] = None,
|
|
default_embedding_root: Optional[str] = None,
|
|
metadata: Optional[Mapping[str, Any]] = None,
|
|
base: Optional[Mapping[str, Any]] = None,
|
|
) -> Dict[str, Any]:
|
|
payload: Dict[str, Any] = dict(base or {})
|
|
timestamp = self._current_timestamp()
|
|
|
|
if folder_paths is not None:
|
|
payload["folder_paths"] = self._normalize_folder_paths(folder_paths)
|
|
else:
|
|
payload.setdefault("folder_paths", {})
|
|
|
|
if extra_folder_paths is not None:
|
|
payload["extra_folder_paths"] = self._normalize_folder_paths(extra_folder_paths)
|
|
else:
|
|
payload.setdefault("extra_folder_paths", {})
|
|
|
|
if default_lora_root is not None:
|
|
payload["default_lora_root"] = default_lora_root
|
|
else:
|
|
payload.setdefault("default_lora_root", "")
|
|
|
|
if default_checkpoint_root is not None:
|
|
payload["default_checkpoint_root"] = default_checkpoint_root
|
|
else:
|
|
payload.setdefault("default_checkpoint_root", "")
|
|
|
|
if default_unet_root is not None:
|
|
payload["default_unet_root"] = default_unet_root
|
|
else:
|
|
payload.setdefault("default_unet_root", "")
|
|
|
|
if default_embedding_root is not None:
|
|
payload["default_embedding_root"] = default_embedding_root
|
|
else:
|
|
payload.setdefault("default_embedding_root", "")
|
|
|
|
if metadata:
|
|
merged_meta = dict(payload.get("metadata", {}))
|
|
merged_meta.update(metadata)
|
|
payload["metadata"] = merged_meta
|
|
|
|
payload.setdefault("created_at", timestamp)
|
|
payload["updated_at"] = timestamp
|
|
return payload
|
|
|
|
def _normalize_folder_paths(
|
|
self, folder_paths: Mapping[str, Iterable[str]]
|
|
) -> Dict[str, List[str]]:
|
|
normalized: Dict[str, List[str]] = {}
|
|
for key, values in folder_paths.items():
|
|
if not isinstance(values, Iterable):
|
|
continue
|
|
cleaned: List[str] = []
|
|
seen = set()
|
|
for value in values:
|
|
if not isinstance(value, str):
|
|
continue
|
|
stripped = value.strip()
|
|
if not stripped:
|
|
continue
|
|
if stripped not in seen:
|
|
cleaned.append(stripped)
|
|
seen.add(stripped)
|
|
normalized[key] = cleaned
|
|
return normalized
|
|
|
|
def _has_configured_paths(self, folder_paths: Any) -> bool:
|
|
if not isinstance(folder_paths, Mapping):
|
|
return False
|
|
|
|
for values in folder_paths.values():
|
|
if isinstance(values, str):
|
|
candidate_values = [values]
|
|
else:
|
|
try:
|
|
candidate_values = list(values) # type: ignore[arg-type]
|
|
except TypeError:
|
|
continue
|
|
|
|
for path in candidate_values:
|
|
if isinstance(path, str) and path.strip():
|
|
return True
|
|
|
|
return False
|
|
|
|
def _validate_folder_paths(
|
|
self,
|
|
library_name: str,
|
|
folder_paths: Mapping[str, Iterable[str]],
|
|
) -> None:
|
|
"""Ensure folder paths do not overlap with other libraries."""
|
|
libraries = self.settings.get("libraries", {})
|
|
normalized_new: Dict[str, Dict[str, str]] = {}
|
|
for key, values in folder_paths.items():
|
|
path_map: Dict[str, str] = {}
|
|
for value in values:
|
|
if not isinstance(value, str):
|
|
continue
|
|
stripped = value.strip()
|
|
if not stripped:
|
|
continue
|
|
normalized_value = os.path.normcase(os.path.normpath(stripped))
|
|
path_map[normalized_value] = stripped
|
|
if path_map:
|
|
normalized_new[key] = path_map
|
|
|
|
if not normalized_new:
|
|
return
|
|
|
|
for other_name, other in libraries.items():
|
|
if other_name == library_name:
|
|
continue
|
|
other_paths = other.get("folder_paths", {})
|
|
for key, new_paths in normalized_new.items():
|
|
existing = {
|
|
os.path.normcase(os.path.normpath(path))
|
|
for path in other_paths.get(key, [])
|
|
if isinstance(path, str) and path
|
|
}
|
|
overlap = existing.intersection(new_paths.keys())
|
|
if overlap:
|
|
collisions = ", ".join(sorted(new_paths[value] for value in overlap))
|
|
raise ValueError(
|
|
f"Folder path(s) {collisions} already assigned to library '{other_name}'"
|
|
)
|
|
|
|
def _update_active_library_entry(
|
|
self,
|
|
*,
|
|
folder_paths: Optional[Mapping[str, Iterable[str]]] = None,
|
|
extra_folder_paths: Optional[Mapping[str, Iterable[str]]] = None,
|
|
default_lora_root: Optional[str] = None,
|
|
default_checkpoint_root: Optional[str] = None,
|
|
default_unet_root: Optional[str] = None,
|
|
default_embedding_root: Optional[str] = None,
|
|
) -> bool:
|
|
libraries = self.settings.get("libraries", {})
|
|
active_name = self.settings.get("active_library")
|
|
if not active_name or active_name not in libraries:
|
|
return False
|
|
|
|
library = libraries[active_name]
|
|
changed = False
|
|
|
|
if folder_paths is not None:
|
|
normalized_paths = self._normalize_folder_paths(folder_paths)
|
|
if library.get("folder_paths") != normalized_paths:
|
|
library["folder_paths"] = normalized_paths
|
|
changed = True
|
|
|
|
if extra_folder_paths is not None:
|
|
normalized_extra_paths = self._normalize_folder_paths(extra_folder_paths)
|
|
if library.get("extra_folder_paths") != normalized_extra_paths:
|
|
library["extra_folder_paths"] = normalized_extra_paths
|
|
changed = True
|
|
|
|
if default_lora_root is not None and library.get("default_lora_root") != default_lora_root:
|
|
library["default_lora_root"] = default_lora_root
|
|
changed = True
|
|
|
|
if default_checkpoint_root is not None and library.get("default_checkpoint_root") != default_checkpoint_root:
|
|
library["default_checkpoint_root"] = default_checkpoint_root
|
|
changed = True
|
|
|
|
if default_unet_root is not None and library.get("default_unet_root") != default_unet_root:
|
|
library["default_unet_root"] = default_unet_root
|
|
changed = True
|
|
|
|
if default_embedding_root is not None and library.get("default_embedding_root") != default_embedding_root:
|
|
library["default_embedding_root"] = default_embedding_root
|
|
changed = True
|
|
|
|
if changed:
|
|
library.setdefault("created_at", self._current_timestamp())
|
|
library["updated_at"] = self._current_timestamp()
|
|
|
|
return changed
|
|
|
|
def _migrate_setting_keys(self) -> None:
|
|
"""Migrate legacy camelCase setting keys to snake_case"""
|
|
key_migrations = {
|
|
'optimizeExampleImages': 'optimize_example_images',
|
|
'autoDownloadExampleImages': 'auto_download_example_images',
|
|
'blurMatureContent': 'blur_mature_content',
|
|
'autoplayOnHover': 'autoplay_on_hover',
|
|
'displayDensity': 'display_density',
|
|
'cardInfoDisplay': 'card_info_display',
|
|
'includeTriggerWords': 'include_trigger_words',
|
|
'compactMode': 'compact_mode',
|
|
'modelCardFooterAction': 'model_card_footer_action',
|
|
}
|
|
|
|
updated = False
|
|
for old_key, new_key in key_migrations.items():
|
|
if old_key in self.settings:
|
|
if new_key not in self.settings:
|
|
self.settings[new_key] = self.settings[old_key]
|
|
del self.settings[old_key]
|
|
updated = True
|
|
|
|
if updated:
|
|
logger.info("Migrated legacy setting keys to snake_case")
|
|
self._save_settings()
|
|
|
|
def _migrate_download_path_template(self):
|
|
"""Migrate old download_path_template to new download_path_templates"""
|
|
old_template = self.settings.get('download_path_template')
|
|
templates = self.settings.get('download_path_templates')
|
|
|
|
# If old template exists and new templates don't exist, migrate
|
|
if old_template is not None and not templates:
|
|
logger.info("Migrating download_path_template to download_path_templates")
|
|
self.settings['download_path_templates'] = {
|
|
'lora': old_template,
|
|
'checkpoint': old_template,
|
|
'embedding': old_template
|
|
}
|
|
# Remove old setting
|
|
del self.settings['download_path_template']
|
|
self._save_settings()
|
|
logger.info("Migration completed")
|
|
|
|
def _auto_set_default_roots(self):
|
|
"""Auto set default root paths when the current default is unset or not among the options.
|
|
|
|
For single-path cases, always use that path.
|
|
For multi-path cases, only set if current default is empty or invalid.
|
|
"""
|
|
folder_paths = self.settings.get('folder_paths', {})
|
|
updated = False
|
|
# loras
|
|
loras = folder_paths.get('loras', [])
|
|
if isinstance(loras, list) and len(loras) == 1:
|
|
current_lora_root = self.settings.get('default_lora_root')
|
|
if current_lora_root not in loras:
|
|
self.settings['default_lora_root'] = loras[0]
|
|
updated = True
|
|
# checkpoints
|
|
checkpoints = folder_paths.get('checkpoints', [])
|
|
if isinstance(checkpoints, list) and len(checkpoints) == 1:
|
|
current_checkpoint_root = self.settings.get('default_checkpoint_root')
|
|
if current_checkpoint_root not in checkpoints:
|
|
self.settings['default_checkpoint_root'] = checkpoints[0]
|
|
updated = True
|
|
# unet (diffusion models) - auto-set if empty or invalid
|
|
unet_paths = folder_paths.get('unet', [])
|
|
if isinstance(unet_paths, list) and len(unet_paths) >= 1:
|
|
current_unet_root = self.settings.get('default_unet_root')
|
|
# Set to first path if current is empty or not in the valid paths
|
|
if not current_unet_root or current_unet_root not in unet_paths:
|
|
self.settings['default_unet_root'] = unet_paths[0]
|
|
updated = True
|
|
# embeddings
|
|
embeddings = folder_paths.get('embeddings', [])
|
|
if isinstance(embeddings, list) and len(embeddings) == 1:
|
|
current_embedding_root = self.settings.get('default_embedding_root')
|
|
if current_embedding_root not in embeddings:
|
|
self.settings['default_embedding_root'] = embeddings[0]
|
|
updated = True
|
|
if updated:
|
|
self._update_active_library_entry(
|
|
default_lora_root=self.settings.get('default_lora_root'),
|
|
default_checkpoint_root=self.settings.get('default_checkpoint_root'),
|
|
default_unet_root=self.settings.get('default_unet_root'),
|
|
default_embedding_root=self.settings.get('default_embedding_root'),
|
|
)
|
|
if self._bootstrap_reason == "missing":
|
|
self._needs_initial_save = True
|
|
else:
|
|
self._save_settings()
|
|
|
|
def _check_environment_variables(self) -> None:
|
|
"""Check for environment variables and update settings if needed"""
|
|
env_api_key = os.environ.get('CIVITAI_API_KEY')
|
|
if env_api_key: # Check if the environment variable exists and is not empty
|
|
logger.info("Found CIVITAI_API_KEY environment variable")
|
|
# Always use the environment variable if it exists
|
|
self.settings['civitai_api_key'] = env_api_key
|
|
self._save_settings()
|
|
|
|
def _default_settings_actions(self) -> List[Dict[str, Any]]:
|
|
return [
|
|
{
|
|
"action": "open-settings-location",
|
|
"label": "Open settings folder",
|
|
"type": "primary",
|
|
"icon": "fas fa-folder-open",
|
|
}
|
|
]
|
|
|
|
def _add_startup_message(
|
|
self,
|
|
*,
|
|
code: str,
|
|
title: str,
|
|
message: str,
|
|
severity: str = "info",
|
|
actions: Optional[List[Dict[str, Any]]] = None,
|
|
details: Optional[str] = None,
|
|
dismissible: bool = False,
|
|
) -> None:
|
|
if any(existing.get("code") == code for existing in self._startup_messages):
|
|
return
|
|
|
|
payload: Dict[str, Any] = {
|
|
"code": code,
|
|
"title": title,
|
|
"message": message,
|
|
"severity": severity.lower(),
|
|
"dismissible": bool(dismissible),
|
|
}
|
|
|
|
if actions:
|
|
payload["actions"] = [dict(action) for action in actions]
|
|
if details:
|
|
payload["details"] = details
|
|
payload["settings_file"] = self.settings_file
|
|
|
|
self._startup_messages.append(payload)
|
|
|
|
def _cleanup_default_values_from_disk(self) -> None:
|
|
"""Remove default values from existing settings.json to keep it clean.
|
|
|
|
Only performs cleanup if the file contains a significant number of default
|
|
values (indicating it's "bloated"). Small files (like template-based configs)
|
|
are preserved as-is to avoid unexpected changes.
|
|
"""
|
|
# Only cleanup existing files (not new ones)
|
|
if self._bootstrap_reason == "missing" or self._original_disk_payload is None:
|
|
return
|
|
|
|
defaults = self._get_default_settings()
|
|
disk_keys = set(self._original_disk_payload.keys())
|
|
|
|
# Count how many keys on disk are set to their default values
|
|
default_value_keys = set()
|
|
for key in disk_keys:
|
|
if key in CORE_USER_SETTING_KEYS:
|
|
continue # Core keys don't count as "cleanup candidates"
|
|
disk_value = self._original_disk_payload.get(key)
|
|
default_value = defaults.get(key)
|
|
# Compare using JSON serialization for complex objects
|
|
if json.dumps(disk_value, sort_keys=True, default=str) == json.dumps(default_value, sort_keys=True, default=str):
|
|
default_value_keys.add(key)
|
|
|
|
# Only cleanup if there are "many" default keys (indicating a bloated file)
|
|
# This preserves small/template-based configs while cleaning up legacy bloated files
|
|
if len(default_value_keys) >= DEFAULT_KEYS_CLEANUP_THRESHOLD:
|
|
logger.info(
|
|
"Cleaning up %d default value(s) from settings.json to keep it minimal",
|
|
len(default_value_keys)
|
|
)
|
|
self._save_settings()
|
|
# Update original payload to match what we just saved
|
|
self._original_disk_payload = self._serialize_settings_for_disk()
|
|
|
|
def _collect_configuration_warnings(self) -> None:
|
|
if not self._standalone_mode:
|
|
return
|
|
|
|
folder_paths = self.settings.get('folder_paths', {}) or {}
|
|
monitored_keys = ('loras', 'checkpoints', 'embeddings')
|
|
|
|
has_valid_paths = False
|
|
for key in monitored_keys:
|
|
raw_paths = folder_paths.get(key) or []
|
|
if isinstance(raw_paths, str):
|
|
raw_paths = [raw_paths]
|
|
try:
|
|
iterator = list(raw_paths)
|
|
except TypeError:
|
|
continue
|
|
if any(isinstance(path, str) and path and os.path.exists(path) for path in iterator):
|
|
has_valid_paths = True
|
|
break
|
|
|
|
if not has_valid_paths:
|
|
if self._bootstrap_reason == "missing":
|
|
message = (
|
|
"LoRA Manager created a default settings.json because no configuration was found. "
|
|
"Edit settings.json to add your model directories so library scanning can run."
|
|
)
|
|
else:
|
|
message = (
|
|
"LoRA Manager could not locate any configured model directories. "
|
|
"Edit settings.json to add your model folders so library scanning can run."
|
|
)
|
|
self._add_startup_message(
|
|
code="missing-model-paths",
|
|
title="Model folders need setup",
|
|
message=message,
|
|
severity="warning",
|
|
actions=self._default_settings_actions(),
|
|
dismissible=False,
|
|
)
|
|
|
|
def refresh_environment_variables(self) -> None:
|
|
"""Refresh settings from environment variables"""
|
|
self._check_environment_variables()
|
|
|
|
def _get_default_settings(self) -> Dict[str, Any]:
|
|
"""Return default settings"""
|
|
defaults = copy.deepcopy(DEFAULT_SETTINGS)
|
|
defaults['base_model_path_mappings'] = {}
|
|
defaults['download_path_templates'] = {}
|
|
defaults['priority_tags'] = DEFAULT_PRIORITY_TAG_CONFIG.copy()
|
|
defaults.setdefault('folder_paths', {})
|
|
defaults.setdefault('extra_folder_paths', {})
|
|
defaults['auto_organize_exclusions'] = []
|
|
defaults['metadata_refresh_skip_paths'] = []
|
|
|
|
library_name = defaults.get("active_library") or "default"
|
|
default_library = self._build_library_payload(
|
|
folder_paths=defaults.get("folder_paths", {}),
|
|
extra_folder_paths=defaults.get("extra_folder_paths", {}),
|
|
default_lora_root=defaults.get("default_lora_root"),
|
|
default_checkpoint_root=defaults.get("default_checkpoint_root"),
|
|
default_embedding_root=defaults.get("default_embedding_root"),
|
|
)
|
|
defaults['libraries'] = {library_name: default_library}
|
|
defaults['active_library'] = library_name
|
|
return defaults
|
|
|
|
def _normalize_priority_tag_config(self, value: Any) -> Dict[str, str]:
|
|
normalized: Dict[str, str] = {}
|
|
if isinstance(value, Mapping):
|
|
for key, raw in value.items():
|
|
if not isinstance(key, str) or not isinstance(raw, str):
|
|
continue
|
|
normalized[key] = raw.strip()
|
|
|
|
for model_type, default_value in DEFAULT_PRIORITY_TAG_CONFIG.items():
|
|
normalized.setdefault(model_type, default_value)
|
|
|
|
return normalized
|
|
|
|
def normalize_auto_organize_exclusions(self, value: Any) -> List[str]:
|
|
if value is None:
|
|
return []
|
|
|
|
if isinstance(value, str):
|
|
candidates: Iterable[str] = (
|
|
value.replace("\n", ",").replace(";", ",").split(",")
|
|
)
|
|
elif isinstance(value, Sequence) and not isinstance(value, (bytes, bytearray, str)):
|
|
candidates = value
|
|
else:
|
|
return []
|
|
|
|
patterns: List[str] = []
|
|
for raw in candidates:
|
|
if isinstance(raw, str):
|
|
token = raw.strip()
|
|
if token:
|
|
patterns.append(token)
|
|
|
|
unique_patterns: List[str] = []
|
|
seen = set()
|
|
for pattern in patterns:
|
|
if pattern not in seen:
|
|
seen.add(pattern)
|
|
unique_patterns.append(pattern)
|
|
|
|
return unique_patterns
|
|
|
|
def get_priority_tag_config(self) -> Dict[str, str]:
|
|
stored_value = self.settings.get("priority_tags")
|
|
normalized = self._normalize_priority_tag_config(stored_value)
|
|
if normalized != stored_value:
|
|
self.settings["priority_tags"] = normalized
|
|
self._save_settings()
|
|
return normalized.copy()
|
|
|
|
def get_auto_organize_exclusions(self) -> List[str]:
|
|
exclusions = self.normalize_auto_organize_exclusions(
|
|
self.settings.get("auto_organize_exclusions")
|
|
)
|
|
if exclusions != self.settings.get("auto_organize_exclusions"):
|
|
self.settings["auto_organize_exclusions"] = exclusions
|
|
self._save_settings()
|
|
return exclusions
|
|
|
|
def normalize_metadata_refresh_skip_paths(self, value: Any) -> List[str]:
|
|
if value is None:
|
|
return []
|
|
|
|
if isinstance(value, str):
|
|
candidates: Iterable[str] = (
|
|
value.replace("\n", ",").replace(";", ",").split(",")
|
|
)
|
|
elif isinstance(value, Sequence) and not isinstance(value, (bytes, bytearray, str)):
|
|
candidates = value
|
|
else:
|
|
return []
|
|
|
|
paths: List[str] = []
|
|
for raw in candidates:
|
|
if isinstance(raw, str):
|
|
token = raw.replace("\\", "/").strip().strip("/")
|
|
if token:
|
|
paths.append(token)
|
|
|
|
unique_paths: List[str] = []
|
|
seen = set()
|
|
for path in paths:
|
|
if path not in seen:
|
|
seen.add(path)
|
|
unique_paths.append(path)
|
|
|
|
return unique_paths
|
|
|
|
def get_metadata_refresh_skip_paths(self) -> List[str]:
|
|
skip_paths = self.normalize_metadata_refresh_skip_paths(
|
|
self.settings.get("metadata_refresh_skip_paths")
|
|
)
|
|
if skip_paths != self.settings.get("metadata_refresh_skip_paths"):
|
|
self.settings["metadata_refresh_skip_paths"] = skip_paths
|
|
self._save_settings()
|
|
return skip_paths
|
|
|
|
def get_extra_folder_paths(self) -> Dict[str, List[str]]:
|
|
"""Get extra folder paths for the active library.
|
|
|
|
These paths are only used by LoRA Manager and not shared with ComfyUI.
|
|
Returns a dictionary with keys like 'loras', 'checkpoints', 'embeddings', 'unet'.
|
|
"""
|
|
extra_paths = self.settings.get("extra_folder_paths", {})
|
|
if not isinstance(extra_paths, dict):
|
|
return {}
|
|
return self._normalize_folder_paths(extra_paths)
|
|
|
|
def update_extra_folder_paths(
|
|
self,
|
|
extra_folder_paths: Mapping[str, Iterable[str]],
|
|
) -> None:
|
|
"""Update extra folder paths for the active library.
|
|
|
|
These paths are only used by LoRA Manager and not shared with ComfyUI.
|
|
Validates that extra paths don't overlap with other libraries' paths.
|
|
"""
|
|
active_name = self.get_active_library_name()
|
|
self._validate_folder_paths(active_name, extra_folder_paths)
|
|
|
|
normalized_paths = self._normalize_folder_paths(extra_folder_paths)
|
|
self.settings["extra_folder_paths"] = normalized_paths
|
|
self._update_active_library_entry(extra_folder_paths=normalized_paths)
|
|
self._save_settings()
|
|
logger.info("Updated extra folder paths for library '%s'", active_name)
|
|
|
|
def get_startup_messages(self) -> List[Dict[str, Any]]:
|
|
return [message.copy() for message in self._startup_messages]
|
|
|
|
def get_priority_tag_entries(self, model_type: str) -> List[PriorityTagEntry]:
|
|
config = self.get_priority_tag_config()
|
|
raw_config = config.get(model_type, "")
|
|
return parse_priority_tag_string(raw_config)
|
|
|
|
def resolve_priority_tag_for_model(
|
|
self, tags: Sequence[str] | Iterable[str], model_type: str
|
|
) -> str:
|
|
entries = self.get_priority_tag_entries(model_type)
|
|
resolved = resolve_priority_tag(tags, entries)
|
|
if resolved:
|
|
return resolved
|
|
|
|
for tag in tags:
|
|
if isinstance(tag, str) and tag:
|
|
return tag
|
|
return ""
|
|
|
|
def get_priority_tag_suggestions(self) -> Dict[str, List[str]]:
|
|
suggestions: Dict[str, List[str]] = {}
|
|
config = self.get_priority_tag_config()
|
|
for model_type, raw_value in config.items():
|
|
entries = parse_priority_tag_string(raw_value)
|
|
suggestions[model_type] = collect_canonical_tags(entries)
|
|
return suggestions
|
|
|
|
def get(self, key: str, default: Any = None) -> Any:
|
|
"""Get setting value"""
|
|
return self.settings.get(key, default)
|
|
|
|
def set(self, key: str, value: Any) -> None:
|
|
"""Set setting value and save"""
|
|
if key == "auto_organize_exclusions":
|
|
value = self.normalize_auto_organize_exclusions(value)
|
|
elif key == "metadata_refresh_skip_paths":
|
|
value = self.normalize_metadata_refresh_skip_paths(value)
|
|
self.settings[key] = value
|
|
portable_switch_pending = False
|
|
if key == "use_portable_settings" and isinstance(value, bool):
|
|
portable_switch_pending = True
|
|
self._prepare_portable_switch(value)
|
|
if key == 'folder_paths' and isinstance(value, Mapping):
|
|
self._update_active_library_entry(folder_paths=value) # type: ignore[arg-type]
|
|
elif key == 'extra_folder_paths' and isinstance(value, Mapping):
|
|
self._update_active_library_entry(extra_folder_paths=value) # type: ignore[arg-type]
|
|
elif key == 'default_lora_root':
|
|
self._update_active_library_entry(default_lora_root=str(value))
|
|
elif key == 'default_checkpoint_root':
|
|
self._update_active_library_entry(default_checkpoint_root=str(value))
|
|
elif key == 'default_unet_root':
|
|
self._update_active_library_entry(default_unet_root=str(value))
|
|
elif key == 'default_embedding_root':
|
|
self._update_active_library_entry(default_embedding_root=str(value))
|
|
elif key == 'model_name_display':
|
|
self._notify_model_name_display_change(value)
|
|
self._save_settings()
|
|
if portable_switch_pending:
|
|
self._finalize_portable_switch()
|
|
|
|
def delete(self, key: str) -> None:
|
|
"""Delete setting key and save"""
|
|
if key in self.settings:
|
|
del self.settings[key]
|
|
self._save_settings()
|
|
logger.info(f"Deleted setting: {key}")
|
|
|
|
def keys(self) -> Iterable[str]:
|
|
"""Return all setting keys."""
|
|
return self.settings.keys()
|
|
|
|
def _prepare_portable_switch(self, use_portable: bool) -> None:
|
|
"""Prepare switching the settings storage location."""
|
|
|
|
legacy_path = get_legacy_settings_path()
|
|
user_dir = self._get_user_config_directory()
|
|
user_settings_path = os.path.join(user_dir, "settings.json")
|
|
|
|
target_path = legacy_path if use_portable else user_settings_path
|
|
other_path = user_settings_path if use_portable else legacy_path
|
|
target_dir = os.path.dirname(target_path)
|
|
os.makedirs(target_dir, exist_ok=True)
|
|
|
|
previous_path = self.settings_file or target_path
|
|
previous_dir = os.path.dirname(previous_path) or target_dir
|
|
|
|
if os.path.abspath(previous_path) != os.path.abspath(target_path):
|
|
self._copy_model_cache_directory(previous_dir, target_dir)
|
|
logger.info("Switching settings file to: %s", target_path)
|
|
|
|
self._pending_portable_switch = {"other_path": other_path}
|
|
self.settings_file = target_path
|
|
|
|
def _finalize_portable_switch(self) -> None:
|
|
"""Mirror the latest settings file to the secondary location."""
|
|
|
|
info = self._pending_portable_switch
|
|
if not info:
|
|
return
|
|
|
|
other_path = info.get("other_path")
|
|
current_path = self.settings_file
|
|
|
|
if not other_path or not current_path:
|
|
self._pending_portable_switch = None
|
|
return
|
|
|
|
if os.path.abspath(other_path) == os.path.abspath(current_path):
|
|
self._pending_portable_switch = None
|
|
return
|
|
|
|
other_dir = os.path.dirname(other_path) or os.path.dirname(current_path)
|
|
if other_dir:
|
|
os.makedirs(other_dir, exist_ok=True)
|
|
|
|
try:
|
|
shutil.copy2(current_path, other_path)
|
|
except Exception as exc:
|
|
logger.warning("Failed to mirror settings.json to %s: %s", other_path, exc)
|
|
finally:
|
|
self._pending_portable_switch = None
|
|
|
|
def _copy_model_cache_directory(self, source_dir: str, target_dir: str) -> None:
|
|
"""Copy model_cache artifacts when switching storage locations."""
|
|
|
|
if not source_dir or not target_dir:
|
|
return
|
|
|
|
source_cache_dir = os.path.join(source_dir, "model_cache")
|
|
target_cache_dir = os.path.join(target_dir, "model_cache")
|
|
if (
|
|
os.path.isdir(source_cache_dir)
|
|
and os.path.abspath(source_cache_dir) != os.path.abspath(target_cache_dir)
|
|
):
|
|
try:
|
|
shutil.copytree(
|
|
source_cache_dir,
|
|
target_cache_dir,
|
|
dirs_exist_ok=True,
|
|
ignore=shutil.ignore_patterns("*.sqlite-shm", "*.sqlite-wal"),
|
|
)
|
|
except Exception as exc:
|
|
logger.warning(
|
|
"Failed to copy model_cache directory from %s to %s: %s",
|
|
source_cache_dir,
|
|
target_cache_dir,
|
|
exc,
|
|
)
|
|
|
|
source_cache_file = os.path.join(source_dir, "model_cache.sqlite")
|
|
target_cache_file = os.path.join(target_dir, "model_cache.sqlite")
|
|
if (
|
|
os.path.isfile(source_cache_file)
|
|
and os.path.abspath(source_cache_file) != os.path.abspath(target_cache_file)
|
|
):
|
|
try:
|
|
shutil.copy2(source_cache_file, target_cache_file)
|
|
except Exception as exc:
|
|
logger.warning(
|
|
"Failed to copy model_cache.sqlite from %s to %s: %s",
|
|
source_cache_file,
|
|
target_cache_file,
|
|
exc,
|
|
)
|
|
|
|
def _get_user_config_directory(self) -> str:
|
|
"""Return the user configuration directory, falling back to ~/.config."""
|
|
|
|
try:
|
|
config_dir = user_config_dir(APP_NAME, appauthor=False) or ""
|
|
except Exception as exc: # pragma: no cover - defensive fallback
|
|
logger.warning("Failed to determine user config directory: %s", exc)
|
|
config_dir = ""
|
|
|
|
if not config_dir:
|
|
config_dir = os.path.join(os.path.expanduser("~"), f".config/{APP_NAME}")
|
|
|
|
try:
|
|
os.makedirs(config_dir, exist_ok=True)
|
|
except Exception as exc:
|
|
logger.warning("Failed to create user config directory %s: %s", config_dir, exc)
|
|
|
|
return config_dir
|
|
|
|
def _notify_model_name_display_change(self, value: Any) -> None:
|
|
"""Trigger cache resorting when the model name display preference updates."""
|
|
|
|
try:
|
|
from .service_registry import ServiceRegistry # type: ignore
|
|
except Exception: # pragma: no cover - registry optional in some contexts
|
|
return
|
|
|
|
display_mode = value if isinstance(value, str) else "model_name"
|
|
pending: List[Tuple[Optional[asyncio.AbstractEventLoop], Awaitable[Any]]] = []
|
|
|
|
def _resolve_service_loop(service: Any) -> Optional[asyncio.AbstractEventLoop]:
|
|
loop = getattr(service, "loop", None)
|
|
if loop is None:
|
|
loop = getattr(service, "_loop", None)
|
|
return loop if isinstance(loop, asyncio.AbstractEventLoop) else None
|
|
|
|
for service_name in (
|
|
"lora_scanner",
|
|
"checkpoint_scanner",
|
|
"embedding_scanner",
|
|
"recipe_scanner",
|
|
):
|
|
service = ServiceRegistry.get_service_sync(service_name)
|
|
if not service or not hasattr(service, "on_model_name_display_changed"):
|
|
continue
|
|
|
|
try:
|
|
result = service.on_model_name_display_changed(display_mode)
|
|
except Exception as exc: # pragma: no cover - defensive guard
|
|
logger.debug(
|
|
"Service %s failed to schedule name display update: %s",
|
|
service_name,
|
|
exc,
|
|
)
|
|
continue
|
|
|
|
if asyncio.iscoroutine(result):
|
|
service_loop = _resolve_service_loop(service)
|
|
pending.append((service_loop, result))
|
|
|
|
if not pending:
|
|
return
|
|
|
|
try:
|
|
loop = asyncio.get_running_loop()
|
|
except RuntimeError:
|
|
loop = None
|
|
|
|
for service_loop, coroutine in pending:
|
|
target_loop = service_loop or loop
|
|
|
|
if target_loop is None:
|
|
try:
|
|
asyncio.run(coroutine)
|
|
except RuntimeError:
|
|
logger.debug("Skipping name display update due to missing event loop")
|
|
continue
|
|
|
|
if loop is not None and target_loop is loop:
|
|
target_loop.create_task(coroutine)
|
|
continue
|
|
|
|
if target_loop.is_running():
|
|
try:
|
|
asyncio.run_coroutine_threadsafe(coroutine, target_loop)
|
|
except Exception as exc: # pragma: no cover - defensive guard
|
|
logger.debug("Failed to dispatch name display update: %s", exc)
|
|
continue
|
|
|
|
try:
|
|
asyncio.run(coroutine)
|
|
except RuntimeError:
|
|
logger.debug("Skipping name display update due to closed loop")
|
|
|
|
def _save_settings(self) -> None:
|
|
"""Save settings to file"""
|
|
try:
|
|
payload = self._serialize_settings_for_disk()
|
|
with open(self.settings_file, 'w', encoding='utf-8') as f:
|
|
json.dump(payload, f, indent=2)
|
|
except Exception as e:
|
|
logger.error(f"Error saving settings: {e}")
|
|
else:
|
|
if self._bootstrap_reason == "missing":
|
|
self._bootstrap_reason = None
|
|
self._seed_template = None
|
|
|
|
def _serialize_settings_for_disk(self) -> Dict[str, Any]:
|
|
"""Return the settings payload that should be persisted to disk.
|
|
|
|
Only saves settings that differ from defaults, keeping the config file
|
|
clean and focused on user customizations. Default values are still
|
|
available at runtime via _get_default_settings().
|
|
"""
|
|
|
|
if self._bootstrap_reason == "missing":
|
|
minimal: Dict[str, Any] = {}
|
|
for key in CORE_USER_SETTING_KEYS:
|
|
if key in self.settings:
|
|
minimal[key] = copy.deepcopy(self.settings[key])
|
|
|
|
if self._seed_template:
|
|
for key, value in self._seed_template.items():
|
|
minimal.setdefault(key, copy.deepcopy(value))
|
|
|
|
return minimal
|
|
|
|
# Only save settings that differ from defaults
|
|
defaults = self._get_default_settings()
|
|
minimal = {}
|
|
|
|
for key, value in self.settings.items():
|
|
default_value = defaults.get(key)
|
|
|
|
# Core settings are always saved (even if equal to default)
|
|
if key in CORE_USER_SETTING_KEYS:
|
|
minimal[key] = copy.deepcopy(value)
|
|
# Complex objects need deep comparison
|
|
elif isinstance(value, (dict, list)) and default_value is not None:
|
|
if json.dumps(value, sort_keys=True, default=str) != json.dumps(default_value, sort_keys=True, default=str):
|
|
minimal[key] = copy.deepcopy(value)
|
|
# Simple values use direct comparison
|
|
elif value != default_value:
|
|
minimal[key] = copy.deepcopy(value)
|
|
|
|
return minimal
|
|
|
|
def get_libraries(self) -> Dict[str, Dict[str, Any]]:
|
|
"""Return a copy of the registered libraries."""
|
|
libraries = self.settings.get("libraries", {})
|
|
return copy.deepcopy(libraries)
|
|
|
|
def get_active_library_name(self) -> str:
|
|
"""Return the currently active library name."""
|
|
libraries = self.settings.get("libraries", {})
|
|
active_name = self.settings.get("active_library")
|
|
if active_name and active_name in libraries:
|
|
return active_name
|
|
if libraries:
|
|
return next(iter(libraries.keys()))
|
|
return "default"
|
|
|
|
def get_active_library(self) -> Dict[str, Any]:
|
|
"""Return a copy of the active library configuration."""
|
|
libraries = self.settings.get("libraries", {})
|
|
active_name = self.get_active_library_name()
|
|
return copy.deepcopy(libraries.get(active_name, {}))
|
|
|
|
def activate_library(self, library_name: str) -> None:
|
|
"""Activate a library by name and refresh dependent services."""
|
|
libraries = self.settings.get("libraries", {})
|
|
if library_name not in libraries:
|
|
raise KeyError(f"Library '{library_name}' does not exist")
|
|
|
|
current_active = self.get_active_library_name()
|
|
if current_active == library_name:
|
|
# Ensure root settings stay in sync even if already active
|
|
self._sync_active_library_to_root(save=False)
|
|
self._save_settings()
|
|
self._notify_library_change(library_name)
|
|
return
|
|
|
|
self.settings["active_library"] = library_name
|
|
self._sync_active_library_to_root(save=False)
|
|
self._save_settings()
|
|
self._notify_library_change(library_name)
|
|
|
|
def upsert_library(
|
|
self,
|
|
library_name: str,
|
|
*,
|
|
folder_paths: Optional[Mapping[str, Iterable[str]]] = None,
|
|
extra_folder_paths: Optional[Mapping[str, Iterable[str]]] = None,
|
|
default_lora_root: Optional[str] = None,
|
|
default_checkpoint_root: Optional[str] = None,
|
|
default_unet_root: Optional[str] = None,
|
|
default_embedding_root: Optional[str] = None,
|
|
metadata: Optional[Mapping[str, Any]] = None,
|
|
activate: bool = False,
|
|
) -> Dict[str, Any]:
|
|
"""Create or update a library definition."""
|
|
|
|
name = library_name.strip()
|
|
if not name:
|
|
raise ValueError("Library name cannot be empty")
|
|
|
|
if folder_paths is not None:
|
|
self._validate_folder_paths(name, folder_paths)
|
|
|
|
if extra_folder_paths is not None:
|
|
self._validate_folder_paths(name, extra_folder_paths)
|
|
|
|
libraries = self.settings.setdefault("libraries", {})
|
|
existing = libraries.get(name, {})
|
|
|
|
payload = self._build_library_payload(
|
|
folder_paths=folder_paths if folder_paths is not None else existing.get("folder_paths"),
|
|
extra_folder_paths=extra_folder_paths if extra_folder_paths is not None else existing.get("extra_folder_paths"),
|
|
default_lora_root=default_lora_root if default_lora_root is not None else existing.get("default_lora_root"),
|
|
default_checkpoint_root=(
|
|
default_checkpoint_root
|
|
if default_checkpoint_root is not None
|
|
else existing.get("default_checkpoint_root")
|
|
),
|
|
default_unet_root=(
|
|
default_unet_root
|
|
if default_unet_root is not None
|
|
else existing.get("default_unet_root")
|
|
),
|
|
default_embedding_root=(
|
|
default_embedding_root
|
|
if default_embedding_root is not None
|
|
else existing.get("default_embedding_root")
|
|
),
|
|
metadata=metadata if metadata is not None else existing.get("metadata"),
|
|
base=existing,
|
|
)
|
|
|
|
libraries[name] = payload
|
|
|
|
if activate or not self.settings.get("active_library"):
|
|
self.settings["active_library"] = name
|
|
|
|
self._sync_active_library_to_root(save=False)
|
|
self._save_settings()
|
|
|
|
if self.settings.get("active_library") == name:
|
|
self._notify_library_change(name)
|
|
|
|
return payload
|
|
|
|
def create_library(
|
|
self,
|
|
library_name: str,
|
|
*,
|
|
folder_paths: Mapping[str, Iterable[str]],
|
|
extra_folder_paths: Optional[Mapping[str, Iterable[str]]] = None,
|
|
default_lora_root: str = "",
|
|
default_checkpoint_root: str = "",
|
|
default_unet_root: str = "",
|
|
default_embedding_root: str = "",
|
|
metadata: Optional[Mapping[str, Any]] = None,
|
|
activate: bool = False,
|
|
) -> Dict[str, Any]:
|
|
"""Create a new library entry."""
|
|
|
|
libraries = self.settings.get("libraries", {})
|
|
if library_name in libraries:
|
|
raise ValueError(f"Library '{library_name}' already exists")
|
|
|
|
return self.upsert_library(
|
|
library_name,
|
|
folder_paths=folder_paths,
|
|
extra_folder_paths=extra_folder_paths,
|
|
default_lora_root=default_lora_root,
|
|
default_checkpoint_root=default_checkpoint_root,
|
|
default_unet_root=default_unet_root,
|
|
default_embedding_root=default_embedding_root,
|
|
metadata=metadata,
|
|
activate=activate,
|
|
)
|
|
|
|
def rename_library(self, old_name: str, new_name: str) -> None:
|
|
"""Rename an existing library."""
|
|
|
|
libraries = self.settings.get("libraries", {})
|
|
if old_name not in libraries:
|
|
raise KeyError(f"Library '{old_name}' does not exist")
|
|
new_name_stripped = new_name.strip()
|
|
if not new_name_stripped:
|
|
raise ValueError("New library name cannot be empty")
|
|
if new_name_stripped in libraries:
|
|
raise ValueError(f"Library '{new_name_stripped}' already exists")
|
|
|
|
libraries[new_name_stripped] = libraries.pop(old_name)
|
|
if self.settings.get("active_library") == old_name:
|
|
self.settings["active_library"] = new_name_stripped
|
|
active_name = new_name_stripped
|
|
else:
|
|
active_name = self.settings.get("active_library")
|
|
|
|
self._sync_active_library_to_root(save=False)
|
|
self._save_settings()
|
|
|
|
if active_name == new_name_stripped:
|
|
self._notify_library_change(new_name_stripped)
|
|
|
|
def delete_library(self, library_name: str) -> None:
|
|
"""Remove a library definition."""
|
|
|
|
libraries = self.settings.get("libraries", {})
|
|
if library_name not in libraries:
|
|
raise KeyError(f"Library '{library_name}' does not exist")
|
|
if len(libraries) == 1:
|
|
raise ValueError("At least one library must remain")
|
|
|
|
was_active = self.settings.get("active_library") == library_name
|
|
libraries.pop(library_name)
|
|
|
|
if was_active:
|
|
new_active = next(iter(libraries.keys()))
|
|
self.settings["active_library"] = new_active
|
|
self._sync_active_library_to_root(save=False)
|
|
self._save_settings()
|
|
|
|
if was_active:
|
|
self._notify_library_change(self.settings["active_library"])
|
|
|
|
def update_active_library_paths(
|
|
self,
|
|
folder_paths: Mapping[str, Iterable[str]],
|
|
*,
|
|
extra_folder_paths: Optional[Mapping[str, Iterable[str]]] = None,
|
|
default_lora_root: Optional[str] = None,
|
|
default_checkpoint_root: Optional[str] = None,
|
|
default_unet_root: Optional[str] = None,
|
|
default_embedding_root: Optional[str] = None,
|
|
) -> None:
|
|
"""Update folder paths for the active library."""
|
|
|
|
active_name = self.get_active_library_name()
|
|
self.upsert_library(
|
|
active_name,
|
|
folder_paths=folder_paths,
|
|
extra_folder_paths=extra_folder_paths,
|
|
default_lora_root=default_lora_root,
|
|
default_checkpoint_root=default_checkpoint_root,
|
|
default_unet_root=default_unet_root,
|
|
default_embedding_root=default_embedding_root,
|
|
activate=True,
|
|
)
|
|
|
|
def _notify_library_change(self, library_name: str) -> None:
|
|
"""Notify dependent services that the active library changed."""
|
|
libraries = self.settings.get("libraries", {})
|
|
library_config = libraries.get(library_name, {})
|
|
library_snapshot = copy.deepcopy(library_config)
|
|
|
|
try:
|
|
from ..config import config # Local import to avoid circular dependency
|
|
|
|
config.apply_library_settings(library_snapshot)
|
|
except Exception as exc: # pragma: no cover - defensive logging
|
|
logger.debug("Failed to apply library settings to config: %s", exc)
|
|
|
|
try:
|
|
from .service_registry import ServiceRegistry # type: ignore
|
|
|
|
for service_name in (
|
|
"lora_scanner",
|
|
"checkpoint_scanner",
|
|
"embedding_scanner",
|
|
"recipe_scanner",
|
|
):
|
|
service = ServiceRegistry.get_service_sync(service_name)
|
|
if service and hasattr(service, "on_library_changed"):
|
|
try:
|
|
service.on_library_changed()
|
|
except Exception as service_exc: # pragma: no cover - defensive logging
|
|
logger.debug(
|
|
"Service %s failed to handle library change: %s",
|
|
service_name,
|
|
service_exc,
|
|
)
|
|
except Exception as exc: # pragma: no cover - defensive logging
|
|
logger.debug("Failed to notify services about library change: %s", exc)
|
|
|
|
def get_download_path_template(self, model_type: str) -> str:
|
|
"""Get download path template for specific model type
|
|
|
|
Args:
|
|
model_type: The type of model ('lora', 'checkpoint', 'embedding')
|
|
|
|
Returns:
|
|
Template string for the model type, defaults to '{base_model}/{first_tag}'
|
|
"""
|
|
templates = self.settings.get('download_path_templates', {})
|
|
|
|
# Handle edge case where templates might be stored as JSON string
|
|
if isinstance(templates, str):
|
|
try:
|
|
# Try to parse JSON string
|
|
parsed_templates = json.loads(templates)
|
|
if isinstance(parsed_templates, dict):
|
|
# Update settings with parsed dictionary
|
|
self.settings['download_path_templates'] = parsed_templates
|
|
self._save_settings()
|
|
templates = parsed_templates
|
|
logger.info("Successfully parsed download_path_templates from JSON string")
|
|
else:
|
|
raise ValueError("Parsed JSON is not a dictionary")
|
|
except (json.JSONDecodeError, ValueError) as e:
|
|
# If parsing fails, set default values
|
|
logger.warning(f"Failed to parse download_path_templates JSON string: {e}. Setting default values.")
|
|
default_template = '{base_model}/{first_tag}'
|
|
templates = {
|
|
'lora': default_template,
|
|
'checkpoint': default_template,
|
|
'embedding': default_template
|
|
}
|
|
self.settings['download_path_templates'] = templates
|
|
self._save_settings()
|
|
|
|
# Ensure templates is a dictionary
|
|
if not isinstance(templates, dict):
|
|
default_template = '{base_model}/{first_tag}'
|
|
templates = {
|
|
'lora': default_template,
|
|
'checkpoint': default_template,
|
|
'embedding': default_template
|
|
}
|
|
self.settings['download_path_templates'] = templates
|
|
self._save_settings()
|
|
|
|
return templates.get(model_type, '{base_model}/{first_tag}')
|
|
|
|
|
|
_SETTINGS_MANAGER: Optional["SettingsManager"] = None
|
|
_SETTINGS_MANAGER_LOCK = Lock()
|
|
# Legacy module-level alias for backwards compatibility with callers that
|
|
# monkeypatch ``py.services.settings_manager.settings`` during tests.
|
|
settings: Optional["SettingsManager"] = None
|
|
|
|
|
|
def get_settings_manager() -> "SettingsManager":
|
|
"""Return the lazily initialised global :class:`SettingsManager`."""
|
|
|
|
global _SETTINGS_MANAGER, settings
|
|
if settings is not None:
|
|
return settings
|
|
|
|
if _SETTINGS_MANAGER is None:
|
|
with _SETTINGS_MANAGER_LOCK:
|
|
if _SETTINGS_MANAGER is None:
|
|
_SETTINGS_MANAGER = SettingsManager()
|
|
|
|
settings = _SETTINGS_MANAGER
|
|
return _SETTINGS_MANAGER
|
|
|
|
|
|
def reset_settings_manager() -> None:
|
|
"""Reset the cached settings manager instance.
|
|
|
|
Primarily intended for tests so they can configure the settings
|
|
directory before the manager touches the filesystem.
|
|
"""
|
|
|
|
global _SETTINGS_MANAGER, settings
|
|
with _SETTINGS_MANAGER_LOCK:
|
|
_SETTINGS_MANAGER = None
|
|
settings = None
|