import asyncio import copy import json import os import shutil import logging from pathlib import Path from datetime import datetime, timezone from threading import Lock from typing import Any, Awaitable, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple from platformdirs import user_config_dir from ..utils.constants import DEFAULT_HASH_CHUNK_SIZE_MB, DEFAULT_PRIORITY_TAG_CONFIG from ..utils.settings_paths import APP_NAME, ensure_settings_file, get_legacy_settings_path from ..utils.tag_priorities import ( PriorityTagEntry, collect_canonical_tags, parse_priority_tag_string, resolve_priority_tag, ) logger = logging.getLogger(__name__) CORE_USER_SETTING_KEYS: Tuple[str, ...] = ( "civitai_api_key", "folder_paths", ) # Threshold for aggressive cleanup: if file contains this many default keys, clean it up DEFAULT_KEYS_CLEANUP_THRESHOLD = 10 DEFAULT_SETTINGS: Dict[str, Any] = { "civitai_api_key": "", "use_portable_settings": False, "hash_chunk_size_mb": DEFAULT_HASH_CHUNK_SIZE_MB, "language": "en", "show_only_sfw": False, "onboarding_completed": False, "dismissed_banners": [], "enable_metadata_archive_db": False, "proxy_enabled": False, "proxy_host": "", "proxy_port": "", "proxy_username": "", "proxy_password": "", "proxy_type": "http", "default_lora_root": "", "default_checkpoint_root": "", "default_unet_root": "", "default_embedding_root": "", "base_model_path_mappings": {}, "download_path_templates": {}, "folder_paths": {}, "extra_folder_paths": {}, "example_images_path": "", "optimize_example_images": True, "auto_download_example_images": False, "blur_mature_content": True, "autoplay_on_hover": False, "display_density": "default", "card_info_display": "always", "show_folder_sidebar": True, "include_trigger_words": False, "compact_mode": False, "priority_tags": DEFAULT_PRIORITY_TAG_CONFIG.copy(), "model_name_display": "model_name", "model_card_footer_action": "replace_preview", "update_flag_strategy": "same_base", "auto_organize_exclusions": [], 
"metadata_refresh_skip_paths": [], } class SettingsManager: def __init__(self): self.settings_file = ensure_settings_file(logger) self._pending_portable_switch: Optional[Dict[str, str]] = None self._standalone_mode = self._detect_standalone_mode() self._startup_messages: List[Dict[str, Any]] = [] self._needs_initial_save = False self._bootstrap_reason: Optional[str] = None self._seed_template: Optional[Dict[str, Any]] = None self._template_payload_cache: Optional[Dict[str, Any]] = None self._template_payload_cache_loaded = False self._original_disk_payload: Optional[Dict[str, Any]] = None self._preserve_disk_template = False self._template_path = Path(__file__).resolve().parents[2] / "settings.json.example" self.settings = self._load_settings() self._migrate_setting_keys() self._ensure_default_settings() self._migrate_to_library_registry() self._migrate_download_path_template() self._auto_set_default_roots() self._check_environment_variables() self._collect_configuration_warnings() if self._needs_initial_save: self._save_settings() self._needs_initial_save = False else: # Clean up existing settings file by removing default values self._cleanup_default_values_from_disk() def _detect_standalone_mode(self) -> bool: """Return ``True`` when running in standalone mode.""" return os.environ.get("LORA_MANAGER_STANDALONE") == "1" def _load_settings(self) -> Dict[str, Any]: """Load settings from file""" if os.path.exists(self.settings_file): try: with open(self.settings_file, 'r', encoding='utf-8') as f: data = json.load(f) if isinstance(data, dict): self._original_disk_payload = copy.deepcopy(data) if self._matches_template_payload(data): self._preserve_disk_template = True return data except json.JSONDecodeError as exc: logger.error("Failed to parse settings.json: %s", exc) self._add_startup_message( code="settings-json-invalid", title="Settings file could not be parsed", message=( "LoRA Manager could not parse settings.json. 
Default settings " "will be used for this session." ), severity="error", actions=self._default_settings_actions(), details=str(exc), dismissible=False, ) self._needs_initial_save = True self._bootstrap_reason = "invalid" except Exception as exc: # pragma: no cover - defensive guard logger.error("Unexpected error loading settings: %s", exc) self._add_startup_message( code="settings-json-unreadable", title="Settings file could not be read", message="LoRA Manager could not read settings.json. Default settings will be used for this session.", severity="error", actions=self._default_settings_actions(), details=str(exc), dismissible=False, ) self._needs_initial_save = True self._bootstrap_reason = "unreadable" if not os.path.exists(self.settings_file): self._needs_initial_save = True self._bootstrap_reason = "missing" seeded = self._load_settings_template() if seeded is not None: defaults = self._get_default_settings() merged = self._merge_template_with_defaults(defaults, seeded) return merged return self._get_default_settings() def _load_settings_template(self) -> Optional[Dict[str, Any]]: """Load the bundled template when no user settings are found.""" payload = self._read_template_payload() if payload is None: return None self._seed_template = copy.deepcopy(payload) return copy.deepcopy(payload) def _read_template_payload(self) -> Optional[Dict[str, Any]]: """Return the cached contents of ``settings.json.example`` when available.""" if self._template_payload_cache_loaded: if self._template_payload_cache is None: return None return copy.deepcopy(self._template_payload_cache) self._template_payload_cache_loaded = True try: with self._template_path.open("r", encoding="utf-8") as handle: data = json.load(handle) except FileNotFoundError: logger.debug("settings.json.example not found at %s", self._template_path) return None except json.JSONDecodeError as exc: logger.warning("Failed to parse settings.json.example: %s", exc) return None if not isinstance(data, dict): 
logger.debug("settings.json.example is not a JSON object; ignoring template") return None self._template_payload_cache = copy.deepcopy(data) return copy.deepcopy(self._template_payload_cache) def _matches_template_payload(self, payload: Mapping[str, Any]) -> bool: """Return ``True`` when ``payload`` matches the bundled template.""" template = self._read_template_payload() if template is None: return False return payload == template def _merge_template_with_defaults( self, defaults: Dict[str, Any], template: Mapping[str, Any] ) -> Dict[str, Any]: """Merge template values into the in-memory defaults.""" merged = copy.deepcopy(defaults) for key, value in template.items(): if key == "folder_paths" and isinstance(value, Mapping): merged[key] = self._normalize_folder_paths(value) else: merged[key] = copy.deepcopy(value) merged.setdefault("language", "en") merged.setdefault("folder_paths", {}) library_name = merged.get("active_library") or "default" merged["libraries"] = { library_name: self._build_library_payload( folder_paths=merged.get("folder_paths", {}), default_lora_root=merged.get("default_lora_root"), default_checkpoint_root=merged.get("default_checkpoint_root"), default_unet_root=merged.get("default_unet_root"), default_embedding_root=merged.get("default_embedding_root"), ) } merged["active_library"] = library_name return merged def _ensure_default_settings(self) -> None: """Ensure all default settings keys exist in memory (but don't save defaults to disk)""" defaults = self._get_default_settings() updated_existing = False inserted_defaults = False if "priority_tags" in self.settings: normalized_priority = self._normalize_priority_tag_config( self.settings.get("priority_tags") ) if normalized_priority != self.settings.get("priority_tags"): self.settings["priority_tags"] = normalized_priority updated_existing = True else: self.settings["priority_tags"] = copy.deepcopy( defaults.get("priority_tags", DEFAULT_PRIORITY_TAG_CONFIG) ) inserted_defaults = True if 
"auto_organize_exclusions" in self.settings: normalized_exclusions = self.normalize_auto_organize_exclusions( self.settings.get("auto_organize_exclusions") ) if normalized_exclusions != self.settings.get("auto_organize_exclusions"): self.settings["auto_organize_exclusions"] = normalized_exclusions updated_existing = True else: self.settings["auto_organize_exclusions"] = [] inserted_defaults = True if "metadata_refresh_skip_paths" in self.settings: normalized_skip_paths = self.normalize_metadata_refresh_skip_paths( self.settings.get("metadata_refresh_skip_paths") ) if normalized_skip_paths != self.settings.get("metadata_refresh_skip_paths"): self.settings["metadata_refresh_skip_paths"] = normalized_skip_paths updated_existing = True else: self.settings["metadata_refresh_skip_paths"] = [] inserted_defaults = True for key, value in defaults.items(): if key == "priority_tags": continue if key not in self.settings: if isinstance(value, dict): self.settings[key] = copy.deepcopy(value) else: self.settings[key] = value inserted_defaults = True # Save only if existing values were normalized/updated if updated_existing: self._save_settings() # Note: inserted_defaults no longer triggers save - defaults stay in memory only def _migrate_to_library_registry(self) -> None: """Ensure settings include the multi-library registry structure.""" libraries = self.settings.get("libraries") active_name = self.settings.get("active_library") initial_bootstrap = self._bootstrap_reason == "missing" raw_top_level_paths = self.settings.get("folder_paths", {}) normalized_top_level_paths: Dict[str, List[str]] = {} if isinstance(raw_top_level_paths, Mapping): normalized_top_level_paths = self._normalize_folder_paths(raw_top_level_paths) if normalized_top_level_paths != raw_top_level_paths: self.settings["folder_paths"] = copy.deepcopy(normalized_top_level_paths) top_level_has_paths = self._has_configured_paths(normalized_top_level_paths) needs_library_bootstrap = not isinstance(libraries, dict) or 
not libraries if ( not needs_library_bootstrap and top_level_has_paths and len(libraries) == 1 ): only_library_payload = next(iter(libraries.values())) if isinstance(only_library_payload, Mapping): folder_payload = only_library_payload.get("folder_paths") if not self._has_configured_paths(folder_payload): needs_library_bootstrap = True if needs_library_bootstrap: library_name = active_name or "default" library_payload = self._build_library_payload( folder_paths=normalized_top_level_paths, default_lora_root=self.settings.get("default_lora_root", ""), default_checkpoint_root=self.settings.get("default_checkpoint_root", ""), default_unet_root=self.settings.get("default_unet_root", ""), default_embedding_root=self.settings.get("default_embedding_root", ""), ) libraries = {library_name: library_payload} self.settings["libraries"] = libraries self.settings["active_library"] = library_name self._sync_active_library_to_root(save=False) if not initial_bootstrap and not self._preserve_disk_template: self._save_settings() return seed_library_name: Optional[str] = None if top_level_has_paths and isinstance(libraries, dict): target_name: Optional[str] = None if active_name and active_name in libraries: target_name = active_name elif len(libraries) == 1: target_name = next(iter(libraries.keys())) if target_name: candidate_payload = libraries.get(target_name) if isinstance(candidate_payload, Mapping) and not self._has_configured_paths(candidate_payload.get("folder_paths")): seed_library_name = target_name sanitized_libraries: Dict[str, Dict[str, Any]] = {} changed = False for name, data in libraries.items(): if not isinstance(data, dict): data = {} changed = True candidate_folder_paths = data.get("folder_paths") if ( seed_library_name == name and not self._has_configured_paths(candidate_folder_paths) and top_level_has_paths ): candidate_folder_paths = normalized_top_level_paths payload = self._build_library_payload( folder_paths=candidate_folder_paths, 
default_lora_root=data.get("default_lora_root"), default_checkpoint_root=data.get("default_checkpoint_root"), default_unet_root=data.get("default_unet_root"), default_embedding_root=data.get("default_embedding_root"), metadata=data.get("metadata"), base=data, ) sanitized_libraries[name] = payload if payload is not data: changed = True if changed: self.settings["libraries"] = sanitized_libraries if not active_name or active_name not in sanitized_libraries: changed = True if sanitized_libraries: self.settings["active_library"] = next(iter(sanitized_libraries.keys())) else: self.settings["active_library"] = "default" self._sync_active_library_to_root(save=changed and not initial_bootstrap) if changed and initial_bootstrap: self._needs_initial_save = True def _sync_active_library_to_root(self, *, save: bool = False) -> None: """Update top-level folder path settings to mirror the active library.""" libraries = self.settings.get("libraries", {}) active_name = self.settings.get("active_library") if not libraries: return if active_name not in libraries: active_name = next(iter(libraries.keys())) self.settings["active_library"] = active_name active_library = libraries.get(active_name, {}) folder_paths = copy.deepcopy(active_library.get("folder_paths", {})) self.settings["folder_paths"] = folder_paths self.settings["extra_folder_paths"] = copy.deepcopy(active_library.get("extra_folder_paths", {})) self.settings["default_lora_root"] = active_library.get("default_lora_root", "") self.settings["default_checkpoint_root"] = active_library.get("default_checkpoint_root", "") self.settings["default_unet_root"] = active_library.get("default_unet_root", "") self.settings["default_embedding_root"] = active_library.get("default_embedding_root", "") if save: self._save_settings() def _current_timestamp(self) -> str: return datetime.now(timezone.utc).replace(microsecond=0).isoformat() def _build_library_payload( self, *, folder_paths: Optional[Mapping[str, Iterable[str]]] = None, 
extra_folder_paths: Optional[Mapping[str, Iterable[str]]] = None, default_lora_root: Optional[str] = None, default_checkpoint_root: Optional[str] = None, default_unet_root: Optional[str] = None, default_embedding_root: Optional[str] = None, metadata: Optional[Mapping[str, Any]] = None, base: Optional[Mapping[str, Any]] = None, ) -> Dict[str, Any]: payload: Dict[str, Any] = dict(base or {}) timestamp = self._current_timestamp() if folder_paths is not None: payload["folder_paths"] = self._normalize_folder_paths(folder_paths) else: payload.setdefault("folder_paths", {}) if extra_folder_paths is not None: payload["extra_folder_paths"] = self._normalize_folder_paths(extra_folder_paths) else: payload.setdefault("extra_folder_paths", {}) if default_lora_root is not None: payload["default_lora_root"] = default_lora_root else: payload.setdefault("default_lora_root", "") if default_checkpoint_root is not None: payload["default_checkpoint_root"] = default_checkpoint_root else: payload.setdefault("default_checkpoint_root", "") if default_unet_root is not None: payload["default_unet_root"] = default_unet_root else: payload.setdefault("default_unet_root", "") if default_embedding_root is not None: payload["default_embedding_root"] = default_embedding_root else: payload.setdefault("default_embedding_root", "") if metadata: merged_meta = dict(payload.get("metadata", {})) merged_meta.update(metadata) payload["metadata"] = merged_meta payload.setdefault("created_at", timestamp) payload["updated_at"] = timestamp return payload def _normalize_folder_paths( self, folder_paths: Mapping[str, Iterable[str]] ) -> Dict[str, List[str]]: normalized: Dict[str, List[str]] = {} for key, values in folder_paths.items(): if not isinstance(values, Iterable): continue cleaned: List[str] = [] seen = set() for value in values: if not isinstance(value, str): continue stripped = value.strip() if not stripped: continue if stripped not in seen: cleaned.append(stripped) seen.add(stripped) normalized[key] = 
cleaned return normalized def _has_configured_paths(self, folder_paths: Any) -> bool: if not isinstance(folder_paths, Mapping): return False for values in folder_paths.values(): if isinstance(values, str): candidate_values = [values] else: try: candidate_values = list(values) # type: ignore[arg-type] except TypeError: continue for path in candidate_values: if isinstance(path, str) and path.strip(): return True return False def _validate_folder_paths( self, library_name: str, folder_paths: Mapping[str, Iterable[str]], ) -> None: """Ensure folder paths do not overlap with other libraries.""" libraries = self.settings.get("libraries", {}) normalized_new: Dict[str, Dict[str, str]] = {} for key, values in folder_paths.items(): path_map: Dict[str, str] = {} for value in values: if not isinstance(value, str): continue stripped = value.strip() if not stripped: continue normalized_value = os.path.normcase(os.path.normpath(stripped)) path_map[normalized_value] = stripped if path_map: normalized_new[key] = path_map if not normalized_new: return for other_name, other in libraries.items(): if other_name == library_name: continue other_paths = other.get("folder_paths", {}) for key, new_paths in normalized_new.items(): existing = { os.path.normcase(os.path.normpath(path)) for path in other_paths.get(key, []) if isinstance(path, str) and path } overlap = existing.intersection(new_paths.keys()) if overlap: collisions = ", ".join(sorted(new_paths[value] for value in overlap)) raise ValueError( f"Folder path(s) {collisions} already assigned to library '{other_name}'" ) def _update_active_library_entry( self, *, folder_paths: Optional[Mapping[str, Iterable[str]]] = None, extra_folder_paths: Optional[Mapping[str, Iterable[str]]] = None, default_lora_root: Optional[str] = None, default_checkpoint_root: Optional[str] = None, default_unet_root: Optional[str] = None, default_embedding_root: Optional[str] = None, ) -> bool: libraries = self.settings.get("libraries", {}) active_name = 
self.settings.get("active_library") if not active_name or active_name not in libraries: return False library = libraries[active_name] changed = False if folder_paths is not None: normalized_paths = self._normalize_folder_paths(folder_paths) if library.get("folder_paths") != normalized_paths: library["folder_paths"] = normalized_paths changed = True if extra_folder_paths is not None: normalized_extra_paths = self._normalize_folder_paths(extra_folder_paths) if library.get("extra_folder_paths") != normalized_extra_paths: library["extra_folder_paths"] = normalized_extra_paths changed = True if default_lora_root is not None and library.get("default_lora_root") != default_lora_root: library["default_lora_root"] = default_lora_root changed = True if default_checkpoint_root is not None and library.get("default_checkpoint_root") != default_checkpoint_root: library["default_checkpoint_root"] = default_checkpoint_root changed = True if default_unet_root is not None and library.get("default_unet_root") != default_unet_root: library["default_unet_root"] = default_unet_root changed = True if default_embedding_root is not None and library.get("default_embedding_root") != default_embedding_root: library["default_embedding_root"] = default_embedding_root changed = True if changed: library.setdefault("created_at", self._current_timestamp()) library["updated_at"] = self._current_timestamp() return changed def _migrate_setting_keys(self) -> None: """Migrate legacy camelCase setting keys to snake_case""" key_migrations = { 'optimizeExampleImages': 'optimize_example_images', 'autoDownloadExampleImages': 'auto_download_example_images', 'blurMatureContent': 'blur_mature_content', 'autoplayOnHover': 'autoplay_on_hover', 'displayDensity': 'display_density', 'cardInfoDisplay': 'card_info_display', 'includeTriggerWords': 'include_trigger_words', 'compactMode': 'compact_mode', 'modelCardFooterAction': 'model_card_footer_action', } updated = False for old_key, new_key in key_migrations.items(): 
if old_key in self.settings: if new_key not in self.settings: self.settings[new_key] = self.settings[old_key] del self.settings[old_key] updated = True if updated: logger.info("Migrated legacy setting keys to snake_case") self._save_settings() def _migrate_download_path_template(self): """Migrate old download_path_template to new download_path_templates""" old_template = self.settings.get('download_path_template') templates = self.settings.get('download_path_templates') # If old template exists and new templates don't exist, migrate if old_template is not None and not templates: logger.info("Migrating download_path_template to download_path_templates") self.settings['download_path_templates'] = { 'lora': old_template, 'checkpoint': old_template, 'embedding': old_template } # Remove old setting del self.settings['download_path_template'] self._save_settings() logger.info("Migration completed") def _auto_set_default_roots(self): """Auto set default root paths when the current default is unset or not among the options. For single-path cases, always use that path. For multi-path cases, only set if current default is empty or invalid. 
""" folder_paths = self.settings.get('folder_paths', {}) updated = False # loras loras = folder_paths.get('loras', []) if isinstance(loras, list) and len(loras) == 1: current_lora_root = self.settings.get('default_lora_root') if current_lora_root not in loras: self.settings['default_lora_root'] = loras[0] updated = True # checkpoints checkpoints = folder_paths.get('checkpoints', []) if isinstance(checkpoints, list) and len(checkpoints) == 1: current_checkpoint_root = self.settings.get('default_checkpoint_root') if current_checkpoint_root not in checkpoints: self.settings['default_checkpoint_root'] = checkpoints[0] updated = True # unet (diffusion models) - auto-set if empty or invalid unet_paths = folder_paths.get('unet', []) if isinstance(unet_paths, list) and len(unet_paths) >= 1: current_unet_root = self.settings.get('default_unet_root') # Set to first path if current is empty or not in the valid paths if not current_unet_root or current_unet_root not in unet_paths: self.settings['default_unet_root'] = unet_paths[0] updated = True # embeddings embeddings = folder_paths.get('embeddings', []) if isinstance(embeddings, list) and len(embeddings) == 1: current_embedding_root = self.settings.get('default_embedding_root') if current_embedding_root not in embeddings: self.settings['default_embedding_root'] = embeddings[0] updated = True if updated: self._update_active_library_entry( default_lora_root=self.settings.get('default_lora_root'), default_checkpoint_root=self.settings.get('default_checkpoint_root'), default_unet_root=self.settings.get('default_unet_root'), default_embedding_root=self.settings.get('default_embedding_root'), ) if self._bootstrap_reason == "missing": self._needs_initial_save = True else: self._save_settings() def _check_environment_variables(self) -> None: """Check for environment variables and update settings if needed""" env_api_key = os.environ.get('CIVITAI_API_KEY') if env_api_key: # Check if the environment variable exists and is not empty 
logger.info("Found CIVITAI_API_KEY environment variable") # Always use the environment variable if it exists self.settings['civitai_api_key'] = env_api_key self._save_settings() def _default_settings_actions(self) -> List[Dict[str, Any]]: return [ { "action": "open-settings-location", "label": "Open settings folder", "type": "primary", "icon": "fas fa-folder-open", } ] def _add_startup_message( self, *, code: str, title: str, message: str, severity: str = "info", actions: Optional[List[Dict[str, Any]]] = None, details: Optional[str] = None, dismissible: bool = False, ) -> None: if any(existing.get("code") == code for existing in self._startup_messages): return payload: Dict[str, Any] = { "code": code, "title": title, "message": message, "severity": severity.lower(), "dismissible": bool(dismissible), } if actions: payload["actions"] = [dict(action) for action in actions] if details: payload["details"] = details payload["settings_file"] = self.settings_file self._startup_messages.append(payload) def _cleanup_default_values_from_disk(self) -> None: """Remove default values from existing settings.json to keep it clean. Only performs cleanup if the file contains a significant number of default values (indicating it's "bloated"). Small files (like template-based configs) are preserved as-is to avoid unexpected changes. 
""" # Only cleanup existing files (not new ones) if self._bootstrap_reason == "missing" or self._original_disk_payload is None: return defaults = self._get_default_settings() disk_keys = set(self._original_disk_payload.keys()) # Count how many keys on disk are set to their default values default_value_keys = set() for key in disk_keys: if key in CORE_USER_SETTING_KEYS: continue # Core keys don't count as "cleanup candidates" disk_value = self._original_disk_payload.get(key) default_value = defaults.get(key) # Compare using JSON serialization for complex objects if json.dumps(disk_value, sort_keys=True, default=str) == json.dumps(default_value, sort_keys=True, default=str): default_value_keys.add(key) # Only cleanup if there are "many" default keys (indicating a bloated file) # This preserves small/template-based configs while cleaning up legacy bloated files if len(default_value_keys) >= DEFAULT_KEYS_CLEANUP_THRESHOLD: logger.info( "Cleaning up %d default value(s) from settings.json to keep it minimal", len(default_value_keys) ) self._save_settings() # Update original payload to match what we just saved self._original_disk_payload = self._serialize_settings_for_disk() def _collect_configuration_warnings(self) -> None: if not self._standalone_mode: return folder_paths = self.settings.get('folder_paths', {}) or {} monitored_keys = ('loras', 'checkpoints', 'embeddings') has_valid_paths = False for key in monitored_keys: raw_paths = folder_paths.get(key) or [] if isinstance(raw_paths, str): raw_paths = [raw_paths] try: iterator = list(raw_paths) except TypeError: continue if any(isinstance(path, str) and path and os.path.exists(path) for path in iterator): has_valid_paths = True break if not has_valid_paths: if self._bootstrap_reason == "missing": message = ( "LoRA Manager created a default settings.json because no configuration was found. " "Edit settings.json to add your model directories so library scanning can run." 
) else: message = ( "LoRA Manager could not locate any configured model directories. " "Edit settings.json to add your model folders so library scanning can run." ) self._add_startup_message( code="missing-model-paths", title="Model folders need setup", message=message, severity="warning", actions=self._default_settings_actions(), dismissible=False, ) def refresh_environment_variables(self) -> None: """Refresh settings from environment variables""" self._check_environment_variables() def _get_default_settings(self) -> Dict[str, Any]: """Return default settings""" defaults = copy.deepcopy(DEFAULT_SETTINGS) defaults['base_model_path_mappings'] = {} defaults['download_path_templates'] = {} defaults['priority_tags'] = DEFAULT_PRIORITY_TAG_CONFIG.copy() defaults.setdefault('folder_paths', {}) defaults.setdefault('extra_folder_paths', {}) defaults['auto_organize_exclusions'] = [] defaults['metadata_refresh_skip_paths'] = [] library_name = defaults.get("active_library") or "default" default_library = self._build_library_payload( folder_paths=defaults.get("folder_paths", {}), extra_folder_paths=defaults.get("extra_folder_paths", {}), default_lora_root=defaults.get("default_lora_root"), default_checkpoint_root=defaults.get("default_checkpoint_root"), default_embedding_root=defaults.get("default_embedding_root"), ) defaults['libraries'] = {library_name: default_library} defaults['active_library'] = library_name return defaults def _normalize_priority_tag_config(self, value: Any) -> Dict[str, str]: normalized: Dict[str, str] = {} if isinstance(value, Mapping): for key, raw in value.items(): if not isinstance(key, str) or not isinstance(raw, str): continue normalized[key] = raw.strip() for model_type, default_value in DEFAULT_PRIORITY_TAG_CONFIG.items(): normalized.setdefault(model_type, default_value) return normalized def normalize_auto_organize_exclusions(self, value: Any) -> List[str]: if value is None: return [] if isinstance(value, str): candidates: Iterable[str] = ( 
value.replace("\n", ",").replace(";", ",").split(",") ) elif isinstance(value, Sequence) and not isinstance(value, (bytes, bytearray, str)): candidates = value else: return [] patterns: List[str] = [] for raw in candidates: if isinstance(raw, str): token = raw.strip() if token: patterns.append(token) unique_patterns: List[str] = [] seen = set() for pattern in patterns: if pattern not in seen: seen.add(pattern) unique_patterns.append(pattern) return unique_patterns def get_priority_tag_config(self) -> Dict[str, str]: stored_value = self.settings.get("priority_tags") normalized = self._normalize_priority_tag_config(stored_value) if normalized != stored_value: self.settings["priority_tags"] = normalized self._save_settings() return normalized.copy() def get_auto_organize_exclusions(self) -> List[str]: exclusions = self.normalize_auto_organize_exclusions( self.settings.get("auto_organize_exclusions") ) if exclusions != self.settings.get("auto_organize_exclusions"): self.settings["auto_organize_exclusions"] = exclusions self._save_settings() return exclusions def normalize_metadata_refresh_skip_paths(self, value: Any) -> List[str]: if value is None: return [] if isinstance(value, str): candidates: Iterable[str] = ( value.replace("\n", ",").replace(";", ",").split(",") ) elif isinstance(value, Sequence) and not isinstance(value, (bytes, bytearray, str)): candidates = value else: return [] paths: List[str] = [] for raw in candidates: if isinstance(raw, str): token = raw.replace("\\", "/").strip().strip("/") if token: paths.append(token) unique_paths: List[str] = [] seen = set() for path in paths: if path not in seen: seen.add(path) unique_paths.append(path) return unique_paths def get_metadata_refresh_skip_paths(self) -> List[str]: skip_paths = self.normalize_metadata_refresh_skip_paths( self.settings.get("metadata_refresh_skip_paths") ) if skip_paths != self.settings.get("metadata_refresh_skip_paths"): self.settings["metadata_refresh_skip_paths"] = skip_paths 
self._save_settings() return skip_paths def get_extra_folder_paths(self) -> Dict[str, List[str]]: """Get extra folder paths for the active library. These paths are only used by LoRA Manager and not shared with ComfyUI. Returns a dictionary with keys like 'loras', 'checkpoints', 'embeddings', 'unet'. """ extra_paths = self.settings.get("extra_folder_paths", {}) if not isinstance(extra_paths, dict): return {} return self._normalize_folder_paths(extra_paths) def update_extra_folder_paths( self, extra_folder_paths: Mapping[str, Iterable[str]], ) -> None: """Update extra folder paths for the active library. These paths are only used by LoRA Manager and not shared with ComfyUI. Validates that extra paths don't overlap with other libraries' paths. """ active_name = self.get_active_library_name() self._validate_folder_paths(active_name, extra_folder_paths) normalized_paths = self._normalize_folder_paths(extra_folder_paths) self.settings["extra_folder_paths"] = normalized_paths self._update_active_library_entry(extra_folder_paths=normalized_paths) self._save_settings() logger.info("Updated extra folder paths for library '%s'", active_name) def get_startup_messages(self) -> List[Dict[str, Any]]: return [message.copy() for message in self._startup_messages] def get_priority_tag_entries(self, model_type: str) -> List[PriorityTagEntry]: config = self.get_priority_tag_config() raw_config = config.get(model_type, "") return parse_priority_tag_string(raw_config) def resolve_priority_tag_for_model( self, tags: Sequence[str] | Iterable[str], model_type: str ) -> str: entries = self.get_priority_tag_entries(model_type) resolved = resolve_priority_tag(tags, entries) if resolved: return resolved for tag in tags: if isinstance(tag, str) and tag: return tag return "" def get_priority_tag_suggestions(self) -> Dict[str, List[str]]: suggestions: Dict[str, List[str]] = {} config = self.get_priority_tag_config() for model_type, raw_value in config.items(): entries = 
def get(self, key: str, default: Any = None) -> Any:
    """Return the setting for *key*, or *default* when absent."""
    return self.settings.get(key, default)


def set(self, key: str, value: Any) -> None:
    """Set a setting value, mirror side effects, and persist to disk.

    Special keys get extra handling:
    - ``auto_organize_exclusions`` / ``metadata_refresh_skip_paths`` are
      normalized before storage.
    - ``use_portable_settings`` (bool) triggers a storage-location switch:
      prepared before the save so the write lands at the new path, mirrored
      to the secondary location afterwards.
    - folder-path / default-root keys are mirrored into the active library
      entry; ``model_name_display`` notifies scanner services.
    """
    if key == "auto_organize_exclusions":
        value = self.normalize_auto_organize_exclusions(value)
    elif key == "metadata_refresh_skip_paths":
        value = self.normalize_metadata_refresh_skip_paths(value)
    self.settings[key] = value
    portable_switch_pending = False
    if key == "use_portable_settings" and isinstance(value, bool):
        # Retarget self.settings_file BEFORE _save_settings() runs below.
        portable_switch_pending = True
        self._prepare_portable_switch(value)
    if key == 'folder_paths' and isinstance(value, Mapping):
        self._update_active_library_entry(folder_paths=value)  # type: ignore[arg-type]
    elif key == 'extra_folder_paths' and isinstance(value, Mapping):
        self._update_active_library_entry(extra_folder_paths=value)  # type: ignore[arg-type]
    elif key == 'default_lora_root':
        self._update_active_library_entry(default_lora_root=str(value))
    elif key == 'default_checkpoint_root':
        self._update_active_library_entry(default_checkpoint_root=str(value))
    elif key == 'default_unet_root':
        self._update_active_library_entry(default_unet_root=str(value))
    elif key == 'default_embedding_root':
        self._update_active_library_entry(default_embedding_root=str(value))
    elif key == 'model_name_display':
        self._notify_model_name_display_change(value)
    self._save_settings()
    if portable_switch_pending:
        # Mirror the freshly-saved file to the old location for fallback.
        self._finalize_portable_switch()


def delete(self, key: str) -> None:
    """Delete setting *key* (if present) and persist the change."""
    if key in self.settings:
        del self.settings[key]
        self._save_settings()
        logger.info(f"Deleted setting: {key}")


def keys(self) -> Iterable[str]:
    """Return all setting keys."""
    return self.settings.keys()


def _prepare_portable_switch(self, use_portable: bool) -> None:
    """Prepare switching the settings storage location.

    Computes the target path (legacy/portable vs. per-user config dir),
    copies model-cache artifacts to the new location when the path actually
    changes, and records the *other* path so the next save can be mirrored.
    """
    legacy_path = get_legacy_settings_path()
    user_dir = self._get_user_config_directory()
    user_settings_path = os.path.join(user_dir, "settings.json")
    target_path = legacy_path if use_portable else user_settings_path
    other_path = user_settings_path if use_portable else legacy_path
    target_dir = os.path.dirname(target_path)
    os.makedirs(target_dir, exist_ok=True)
    previous_path = self.settings_file or target_path
    previous_dir = os.path.dirname(previous_path) or target_dir
    if os.path.abspath(previous_path) != os.path.abspath(target_path):
        # Bring cache data along so the new location starts warm.
        self._copy_model_cache_directory(previous_dir, target_dir)
    logger.info("Switching settings file to: %s", target_path)
    self._pending_portable_switch = {"other_path": other_path}
    self.settings_file = target_path
def _finalize_portable_switch(self) -> None:
    """Mirror the latest settings file to the secondary location.

    Consumes the state recorded by ``_prepare_portable_switch``. Best-effort:
    a failed copy only logs a warning, and the pending marker is always
    cleared so a later save cannot re-trigger a stale switch.
    """
    info = self._pending_portable_switch
    if not info:
        return
    other_path = info.get("other_path")
    current_path = self.settings_file
    if not other_path or not current_path:
        self._pending_portable_switch = None
        return
    if os.path.abspath(other_path) == os.path.abspath(current_path):
        # Both locations resolve to the same file; nothing to mirror.
        self._pending_portable_switch = None
        return
    other_dir = os.path.dirname(other_path) or os.path.dirname(current_path)
    if other_dir:
        os.makedirs(other_dir, exist_ok=True)
    try:
        shutil.copy2(current_path, other_path)
    except Exception as exc:
        logger.warning("Failed to mirror settings.json to %s: %s", other_path, exc)
    finally:
        self._pending_portable_switch = None


def _copy_model_cache_directory(self, source_dir: str, target_dir: str) -> None:
    """Copy model_cache artifacts when switching storage locations.

    Copies both the ``model_cache`` directory (skipping ``*.sqlite-shm`` /
    ``*.sqlite-wal`` — presumably transient SQLite journal files; confirm)
    and the standalone ``model_cache.sqlite`` file. Each copy is best-effort:
    failures are logged but never raised.
    """
    if not source_dir or not target_dir:
        return
    source_cache_dir = os.path.join(source_dir, "model_cache")
    target_cache_dir = os.path.join(target_dir, "model_cache")
    if (
        os.path.isdir(source_cache_dir)
        and os.path.abspath(source_cache_dir) != os.path.abspath(target_cache_dir)
    ):
        try:
            shutil.copytree(
                source_cache_dir,
                target_cache_dir,
                dirs_exist_ok=True,
                ignore=shutil.ignore_patterns("*.sqlite-shm", "*.sqlite-wal"),
            )
        except Exception as exc:
            logger.warning(
                "Failed to copy model_cache directory from %s to %s: %s",
                source_cache_dir,
                target_cache_dir,
                exc,
            )
    source_cache_file = os.path.join(source_dir, "model_cache.sqlite")
    target_cache_file = os.path.join(target_dir, "model_cache.sqlite")
    if (
        os.path.isfile(source_cache_file)
        and os.path.abspath(source_cache_file) != os.path.abspath(target_cache_file)
    ):
        try:
            shutil.copy2(source_cache_file, target_cache_file)
        except Exception as exc:
            logger.warning(
                "Failed to copy model_cache.sqlite from %s to %s: %s",
                source_cache_file,
                target_cache_file,
                exc,
            )
def _get_user_config_directory(self) -> str:
    """Return the user configuration directory, falling back to ~/.config.

    Uses platformdirs' ``user_config_dir`` first; on failure (or an empty
    result) falls back to ``~/.config/<APP_NAME>``. Creation of the directory
    is best-effort — the path is returned even if mkdir fails.
    """
    try:
        config_dir = user_config_dir(APP_NAME, appauthor=False) or ""
    except Exception as exc:  # pragma: no cover - defensive fallback
        logger.warning("Failed to determine user config directory: %s", exc)
        config_dir = ""
    if not config_dir:
        config_dir = os.path.join(os.path.expanduser("~"), f".config/{APP_NAME}")
    try:
        os.makedirs(config_dir, exist_ok=True)
    except Exception as exc:
        logger.warning("Failed to create user config directory %s: %s", config_dir, exc)
    return config_dir


def _notify_model_name_display_change(self, value: Any) -> None:
    """Trigger cache resorting when the model name display preference updates.

    Collects coroutines returned by each scanner's
    ``on_model_name_display_changed`` hook and dispatches each one on the most
    appropriate event loop: the service's own loop when it exposes one, else
    the currently running loop, else a fresh ``asyncio.run``.
    """
    try:
        from .service_registry import ServiceRegistry  # type: ignore
    except Exception:  # pragma: no cover - registry optional in some contexts
        return
    # Non-string values fall back to the default display mode.
    display_mode = value if isinstance(value, str) else "model_name"
    pending: List[Tuple[Optional[asyncio.AbstractEventLoop], Awaitable[Any]]] = []

    def _resolve_service_loop(service: Any) -> Optional[asyncio.AbstractEventLoop]:
        # Services may expose their loop as `.loop` or `._loop`.
        loop = getattr(service, "loop", None)
        if loop is None:
            loop = getattr(service, "_loop", None)
        return loop if isinstance(loop, asyncio.AbstractEventLoop) else None

    for service_name in (
        "lora_scanner",
        "checkpoint_scanner",
        "embedding_scanner",
        "recipe_scanner",
    ):
        service = ServiceRegistry.get_service_sync(service_name)
        if not service or not hasattr(service, "on_model_name_display_changed"):
            continue
        try:
            result = service.on_model_name_display_changed(display_mode)
        except Exception as exc:  # pragma: no cover - defensive guard
            logger.debug(
                "Service %s failed to schedule name display update: %s",
                service_name,
                exc,
            )
            continue
        if asyncio.iscoroutine(result):
            service_loop = _resolve_service_loop(service)
            pending.append((service_loop, result))
    if not pending:
        return
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None
    for service_loop, coroutine in pending:
        target_loop = service_loop or loop
        if target_loop is None:
            # No loop anywhere: run the coroutine to completion inline.
            try:
                asyncio.run(coroutine)
            except RuntimeError:
                logger.debug("Skipping name display update due to missing event loop")
            continue
        if loop is not None and target_loop is loop:
            # Same loop we're currently on: schedule as a task.
            target_loop.create_task(coroutine)
            continue
        if target_loop.is_running():
            # Foreign running loop: hand off thread-safely.
            try:
                asyncio.run_coroutine_threadsafe(coroutine, target_loop)
            except Exception as exc:  # pragma: no cover - defensive guard
                logger.debug("Failed to dispatch name display update: %s", exc)
            continue
        try:
            asyncio.run(coroutine)
        except RuntimeError:
            logger.debug("Skipping name display update due to closed loop")
def _save_settings(self) -> None:
    """Persist the serialized settings payload to ``self.settings_file``.

    Failures are logged, never raised. On a successful save the one-shot
    bootstrap state (``_bootstrap_reason == "missing"`` and the seed
    template) is cleared so later saves use the normal diff-vs-defaults path.
    """
    try:
        payload = self._serialize_settings_for_disk()
        with open(self.settings_file, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2)
    except Exception as e:
        logger.error(f"Error saving settings: {e}")
    else:
        if self._bootstrap_reason == "missing":
            self._bootstrap_reason = None
            self._seed_template = None


def _serialize_settings_for_disk(self) -> Dict[str, Any]:
    """Return the settings payload that should be persisted to disk.

    Only saves settings that differ from defaults, keeping the config file
    clean and focused on user customizations. Default values are still
    available at runtime via _get_default_settings().
    """
    if self._bootstrap_reason == "missing":
        # First save of a brand-new settings file: write only the core keys
        # plus any seed-template values not already covered.
        minimal: Dict[str, Any] = {}
        for key in CORE_USER_SETTING_KEYS:
            if key in self.settings:
                minimal[key] = copy.deepcopy(self.settings[key])
        if self._seed_template:
            for key, value in self._seed_template.items():
                minimal.setdefault(key, copy.deepcopy(value))
        return minimal
    # Only save settings that differ from defaults
    defaults = self._get_default_settings()
    minimal = {}
    for key, value in self.settings.items():
        default_value = defaults.get(key)
        # Core settings are always saved (even if equal to default)
        if key in CORE_USER_SETTING_KEYS:
            minimal[key] = copy.deepcopy(value)
        # Complex objects need deep comparison; sort_keys makes dict ordering
        # irrelevant, default=str covers non-JSON-native leaf values.
        elif isinstance(value, (dict, list)) and default_value is not None:
            if json.dumps(value, sort_keys=True, default=str) != json.dumps(
                default_value, sort_keys=True, default=str
            ):
                minimal[key] = copy.deepcopy(value)
        # Simple values use direct comparison
        elif value != default_value:
            minimal[key] = copy.deepcopy(value)
    return minimal
def get_libraries(self) -> Dict[str, Dict[str, Any]]:
    """Return a deep copy of the registered libraries."""
    return copy.deepcopy(self.settings.get("libraries", {}))


def get_active_library_name(self) -> str:
    """Resolve the active library name.

    Falls back to the first registered library when the stored name is
    missing or stale, and to ``"default"`` when no libraries exist at all.
    """
    registry = self.settings.get("libraries", {})
    chosen = self.settings.get("active_library")
    if chosen and chosen in registry:
        return chosen
    for name in registry:
        return name
    return "default"


def get_active_library(self) -> Dict[str, Any]:
    """Return a deep copy of the active library configuration."""
    registry = self.settings.get("libraries", {})
    active = self.get_active_library_name()
    return copy.deepcopy(registry.get(active, {}))


def activate_library(self, library_name: str) -> None:
    """Activate a library by name and refresh dependent services.

    Raises ``KeyError`` for unknown names. Even when the library is already
    active, root-level settings are re-synced, persisted, and listeners are
    notified so everything stays consistent.
    """
    registry = self.settings.get("libraries", {})
    if library_name not in registry:
        raise KeyError(f"Library '{library_name}' does not exist")
    if self.get_active_library_name() != library_name:
        self.settings["active_library"] = library_name
    self._sync_active_library_to_root(save=False)
    self._save_settings()
    self._notify_library_change(library_name)
def upsert_library(
    self,
    library_name: str,
    *,
    folder_paths: Optional[Mapping[str, Iterable[str]]] = None,
    extra_folder_paths: Optional[Mapping[str, Iterable[str]]] = None,
    default_lora_root: Optional[str] = None,
    default_checkpoint_root: Optional[str] = None,
    default_unet_root: Optional[str] = None,
    default_embedding_root: Optional[str] = None,
    metadata: Optional[Mapping[str, Any]] = None,
    activate: bool = False,
) -> Dict[str, Any]:
    """Create or update a library definition.

    Every keyword left as ``None`` keeps the existing library's value; any
    provided folder-path mapping is validated against other libraries first.
    The library becomes active when *activate* is True or when no library is
    active yet; a change that touches the active library notifies listeners.
    Returns the stored payload.
    """
    name = library_name.strip()
    if not name:
        raise ValueError("Library name cannot be empty")
    if folder_paths is not None:
        self._validate_folder_paths(name, folder_paths)
    if extra_folder_paths is not None:
        self._validate_folder_paths(name, extra_folder_paths)
    libraries = self.settings.setdefault("libraries", {})
    existing = libraries.get(name, {})
    # Merge: explicit arguments win, otherwise fall back to the existing entry.
    payload = self._build_library_payload(
        folder_paths=folder_paths if folder_paths is not None else existing.get("folder_paths"),
        extra_folder_paths=extra_folder_paths if extra_folder_paths is not None else existing.get("extra_folder_paths"),
        default_lora_root=default_lora_root if default_lora_root is not None else existing.get("default_lora_root"),
        default_checkpoint_root=(
            default_checkpoint_root
            if default_checkpoint_root is not None
            else existing.get("default_checkpoint_root")
        ),
        default_unet_root=(
            default_unet_root
            if default_unet_root is not None
            else existing.get("default_unet_root")
        ),
        default_embedding_root=(
            default_embedding_root
            if default_embedding_root is not None
            else existing.get("default_embedding_root")
        ),
        metadata=metadata if metadata is not None else existing.get("metadata"),
        base=existing,
    )
    libraries[name] = payload
    if activate or not self.settings.get("active_library"):
        self.settings["active_library"] = name
        self._sync_active_library_to_root(save=False)
    self._save_settings()
    if self.settings.get("active_library") == name:
        self._notify_library_change(name)
    return payload
def create_library(
    self,
    library_name: str,
    *,
    folder_paths: Mapping[str, Iterable[str]],
    extra_folder_paths: Optional[Mapping[str, Iterable[str]]] = None,
    default_lora_root: str = "",
    default_checkpoint_root: str = "",
    default_unet_root: str = "",
    default_embedding_root: str = "",
    metadata: Optional[Mapping[str, Any]] = None,
    activate: bool = False,
) -> Dict[str, Any]:
    """Create a new library entry.

    Raises ``ValueError`` when the name is already registered; otherwise
    delegates to :meth:`upsert_library` and returns its payload.
    """
    if library_name in self.settings.get("libraries", {}):
        raise ValueError(f"Library '{library_name}' already exists")
    return self.upsert_library(
        library_name,
        folder_paths=folder_paths,
        extra_folder_paths=extra_folder_paths,
        default_lora_root=default_lora_root,
        default_checkpoint_root=default_checkpoint_root,
        default_unet_root=default_unet_root,
        default_embedding_root=default_embedding_root,
        metadata=metadata,
        activate=activate,
    )


def rename_library(self, old_name: str, new_name: str) -> None:
    """Rename an existing library.

    Raises ``KeyError`` when *old_name* is unknown and ``ValueError`` when the
    stripped new name is empty or already taken. When the active library is
    renamed the active pointer follows it and listeners are notified.
    """
    registry = self.settings.get("libraries", {})
    if old_name not in registry:
        raise KeyError(f"Library '{old_name}' does not exist")
    target = new_name.strip()
    if not target:
        raise ValueError("New library name cannot be empty")
    if target in registry:
        raise ValueError(f"Library '{target}' already exists")
    registry[target] = registry.pop(old_name)
    if self.settings.get("active_library") == old_name:
        self.settings["active_library"] = target
        active_name = target
    else:
        active_name = self.settings.get("active_library")
    self._sync_active_library_to_root(save=False)
    self._save_settings()
    if active_name == target:
        self._notify_library_change(target)
def delete_library(self, library_name: str) -> None:
    """Remove a library definition.

    Raises ``KeyError`` for unknown names and ``ValueError`` when only one
    library remains. When the active library is removed, the first surviving
    entry becomes active and listeners are notified.
    """
    registry = self.settings.get("libraries", {})
    if library_name not in registry:
        raise KeyError(f"Library '{library_name}' does not exist")
    if len(registry) == 1:
        raise ValueError("At least one library must remain")
    deleting_active = self.settings.get("active_library") == library_name
    registry.pop(library_name)
    if deleting_active:
        self.settings["active_library"] = next(iter(registry))
        self._sync_active_library_to_root(save=False)
    self._save_settings()
    if deleting_active:
        self._notify_library_change(self.settings["active_library"])


def update_active_library_paths(
    self,
    folder_paths: Mapping[str, Iterable[str]],
    *,
    extra_folder_paths: Optional[Mapping[str, Iterable[str]]] = None,
    default_lora_root: Optional[str] = None,
    default_checkpoint_root: Optional[str] = None,
    default_unet_root: Optional[str] = None,
    default_embedding_root: Optional[str] = None,
) -> None:
    """Update folder paths for the active library via :meth:`upsert_library`."""
    self.upsert_library(
        self.get_active_library_name(),
        folder_paths=folder_paths,
        extra_folder_paths=extra_folder_paths,
        default_lora_root=default_lora_root,
        default_checkpoint_root=default_checkpoint_root,
        default_unet_root=default_unet_root,
        default_embedding_root=default_embedding_root,
        activate=True,
    )


def _notify_library_change(self, library_name: str) -> None:
    """Notify dependent services that the active library changed.

    Best-effort: the config module and each scanner service are updated
    inside try/except so a failing listener never breaks the caller.
    """
    snapshot = copy.deepcopy(
        self.settings.get("libraries", {}).get(library_name, {})
    )
    try:
        from ..config import config  # Local import to avoid circular dependency
        config.apply_library_settings(snapshot)
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.debug("Failed to apply library settings to config: %s", exc)
    try:
        from .service_registry import ServiceRegistry  # type: ignore
        for scanner_name in (
            "lora_scanner",
            "checkpoint_scanner",
            "embedding_scanner",
            "recipe_scanner",
        ):
            handler = ServiceRegistry.get_service_sync(scanner_name)
            if not handler or not hasattr(handler, "on_library_changed"):
                continue
            try:
                handler.on_library_changed()
            except Exception as service_exc:  # pragma: no cover - defensive logging
                logger.debug(
                    "Service %s failed to handle library change: %s",
                    scanner_name,
                    service_exc,
                )
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.debug("Failed to notify services about library change: %s", exc)
def get_download_path_template(self, model_type: str) -> str:
    """Get download path template for specific model type.

    Args:
        model_type: The type of model ('lora', 'checkpoint', 'embedding')

    Returns:
        Template string for the model type, defaults to '{base_model}/{first_tag}'

    Repairs legacy storage formats in place: a JSON-string value is parsed
    back into a dict, and any unusable value is replaced with defaults.
    Fix applied: the default-template dict was previously built in two
    copy-pasted places; it is now produced by one helper so the fallbacks
    cannot drift apart.
    """
    default_template = '{base_model}/{first_tag}'

    def _reset_to_defaults() -> Dict[str, str]:
        # Replace the stored value with per-type defaults and persist.
        fallback = {
            'lora': default_template,
            'checkpoint': default_template,
            'embedding': default_template,
        }
        self.settings['download_path_templates'] = fallback
        self._save_settings()
        return fallback

    templates = self.settings.get('download_path_templates', {})
    # Handle edge case where templates might be stored as JSON string
    if isinstance(templates, str):
        try:
            parsed_templates = json.loads(templates)
            if not isinstance(parsed_templates, dict):
                raise ValueError("Parsed JSON is not a dictionary")
            self.settings['download_path_templates'] = parsed_templates
            self._save_settings()
            templates = parsed_templates
            logger.info("Successfully parsed download_path_templates from JSON string")
        except (json.JSONDecodeError, ValueError) as e:
            logger.warning(f"Failed to parse download_path_templates JSON string: {e}. Setting default values.")
            templates = _reset_to_defaults()
    # Ensure templates is a dictionary
    if not isinstance(templates, dict):
        templates = _reset_to_defaults()
    return templates.get(model_type, default_template)
# Lazily-created singleton instance plus the lock guarding its construction.
_SETTINGS_MANAGER: Optional["SettingsManager"] = None
_SETTINGS_MANAGER_LOCK = Lock()

# Legacy module-level alias for backwards compatibility with callers that
# monkeypatch ``py.services.settings_manager.settings`` during tests.
settings: Optional["SettingsManager"] = None


def get_settings_manager() -> "SettingsManager":
    """Return the lazily initialised global :class:`SettingsManager`.

    A monkeypatched ``settings`` alias takes precedence over the cached
    singleton. Construction uses double-checked locking: the instance check is
    repeated under the lock so concurrent first calls build only one manager.
    """
    global _SETTINGS_MANAGER, settings
    if settings is not None:
        return settings
    if _SETTINGS_MANAGER is None:
        with _SETTINGS_MANAGER_LOCK:
            if _SETTINGS_MANAGER is None:
                _SETTINGS_MANAGER = SettingsManager()
                settings = _SETTINGS_MANAGER
    return _SETTINGS_MANAGER


def reset_settings_manager() -> None:
    """Reset the cached settings manager instance.

    Primarily intended for tests so they can configure the settings
    directory before the manager touches the filesystem.
    """
    global _SETTINGS_MANAGER, settings
    with _SETTINGS_MANAGER_LOCK:
        _SETTINGS_MANAGER = None
        settings = None