fix(settings): enforce valid default model roots

This commit is contained in:
Will Miao
2026-04-01 20:36:37 +08:00
parent f93baf5fc0
commit 9bdb337962
6 changed files with 520 additions and 225 deletions

View File

@@ -25,6 +25,31 @@ standalone_mode = (
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _resolve_valid_default_root(
current: str, primary_paths: List[str], name: str
) -> str:
"""Return a valid default root from the current primary path set."""
valid_paths = [path for path in primary_paths if isinstance(path, str) and path.strip()]
if not valid_paths:
return ""
if current in valid_paths:
return current
if current:
logger.info(
"Repaired stale %s from '%s' to '%s'",
name,
current,
valid_paths[0],
)
else:
logger.info("Auto-setting %s to '%s'", name, valid_paths[0])
return valid_paths[0]
def _normalize_folder_paths_for_comparison( def _normalize_folder_paths_for_comparison(
folder_paths: Mapping[str, Iterable[str]], folder_paths: Mapping[str, Iterable[str]],
) -> Dict[str, Set[str]]: ) -> Dict[str, Set[str]]:
@@ -197,25 +222,23 @@ class Config:
"Failed to rename legacy 'default' library: %s", rename_error "Failed to rename legacy 'default' library: %s", rename_error
) )
default_lora_root = comfy_library.get("default_lora_root", "") default_lora_root = _resolve_valid_default_root(
if not default_lora_root and len(self.loras_roots) == 1: comfy_library.get("default_lora_root", ""),
default_lora_root = self.loras_roots[0] list(self.loras_roots or []),
"default_lora_root",
)
default_checkpoint_root = comfy_library.get("default_checkpoint_root", "") default_checkpoint_root = _resolve_valid_default_root(
if ( comfy_library.get("default_checkpoint_root", ""),
not default_checkpoint_root list(self.checkpoints_roots or []),
and self.checkpoints_roots "default_checkpoint_root",
and len(self.checkpoints_roots) == 1 )
):
default_checkpoint_root = self.checkpoints_roots[0]
default_embedding_root = comfy_library.get("default_embedding_root", "") default_embedding_root = _resolve_valid_default_root(
if ( comfy_library.get("default_embedding_root", ""),
not default_embedding_root list(self.embeddings_roots or []),
and self.embeddings_roots "default_embedding_root",
and len(self.embeddings_roots) == 1 )
):
default_embedding_root = self.embeddings_roots[0]
metadata = dict(comfy_library.get("metadata", {})) metadata = dict(comfy_library.get("metadata", {}))
metadata.setdefault("display_name", "ComfyUI") metadata.setdefault("display_name", "ComfyUI")
@@ -706,7 +729,9 @@ class Config:
return unique_paths return unique_paths
@staticmethod @staticmethod
def _normalize_path_for_comparison(path: str, *, resolve_realpath: bool = False) -> str: def _normalize_path_for_comparison(
path: str, *, resolve_realpath: bool = False
) -> str:
"""Normalize a path for equality checks across platforms.""" """Normalize a path for equality checks across platforms."""
candidate = os.path.realpath(path) if resolve_realpath else path candidate = os.path.realpath(path) if resolve_realpath else path
return os.path.normcase(os.path.normpath(candidate)).replace(os.sep, "/") return os.path.normcase(os.path.normpath(candidate)).replace(os.sep, "/")

View File

@@ -7,7 +7,17 @@ import logging
from pathlib import Path from pathlib import Path
from datetime import datetime, timezone from datetime import datetime, timezone
from threading import Lock from threading import Lock
from typing import Any, Awaitable, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple from typing import (
Any,
Awaitable,
Dict,
Iterable,
List,
Mapping,
Optional,
Sequence,
Tuple,
)
from platformdirs import user_config_dir from platformdirs import user_config_dir
@@ -17,7 +27,11 @@ from ..utils.constants import (
SUPPORTED_DOWNLOAD_SKIP_BASE_MODELS, SUPPORTED_DOWNLOAD_SKIP_BASE_MODELS,
) )
from ..utils.preview_selection import VALID_MATURE_BLUR_LEVELS from ..utils.preview_selection import VALID_MATURE_BLUR_LEVELS
from ..utils.settings_paths import APP_NAME, ensure_settings_file, get_legacy_settings_path from ..utils.settings_paths import (
APP_NAME,
ensure_settings_file,
get_legacy_settings_path,
)
from ..utils.tag_priorities import ( from ..utils.tag_priorities import (
PriorityTagEntry, PriorityTagEntry,
collect_canonical_tags, collect_canonical_tags,
@@ -94,7 +108,9 @@ class SettingsManager:
self._template_payload_cache_loaded = False self._template_payload_cache_loaded = False
self._original_disk_payload: Optional[Dict[str, Any]] = None self._original_disk_payload: Optional[Dict[str, Any]] = None
self._preserve_disk_template = False self._preserve_disk_template = False
self._template_path = Path(__file__).resolve().parents[2] / "settings.json.example" self._template_path = (
Path(__file__).resolve().parents[2] / "settings.json.example"
)
self.settings = self._load_settings() self.settings = self._load_settings()
self._migrate_setting_keys() self._migrate_setting_keys()
self._ensure_default_settings() self._ensure_default_settings()
@@ -120,7 +136,7 @@ class SettingsManager:
"""Load settings from file""" """Load settings from file"""
if os.path.exists(self.settings_file): if os.path.exists(self.settings_file):
try: try:
with open(self.settings_file, 'r', encoding='utf-8') as f: with open(self.settings_file, "r", encoding="utf-8") as f:
data = json.load(f) data = json.load(f)
if isinstance(data, dict): if isinstance(data, dict):
self._original_disk_payload = copy.deepcopy(data) self._original_disk_payload = copy.deepcopy(data)
@@ -198,7 +214,9 @@ class SettingsManager:
return None return None
if not isinstance(data, dict): if not isinstance(data, dict):
logger.debug("settings.json.example is not a JSON object; ignoring template") logger.debug(
"settings.json.example is not a JSON object; ignoring template"
)
return None return None
self._template_payload_cache = copy.deepcopy(data) self._template_payload_cache = copy.deepcopy(data)
@@ -274,7 +292,9 @@ class SettingsManager:
normalized_skip_paths = self.normalize_metadata_refresh_skip_paths( normalized_skip_paths = self.normalize_metadata_refresh_skip_paths(
self.settings.get("metadata_refresh_skip_paths") self.settings.get("metadata_refresh_skip_paths")
) )
if normalized_skip_paths != self.settings.get("metadata_refresh_skip_paths"): if normalized_skip_paths != self.settings.get(
"metadata_refresh_skip_paths"
):
self.settings["metadata_refresh_skip_paths"] = normalized_skip_paths self.settings["metadata_refresh_skip_paths"] = normalized_skip_paths
updated_existing = True updated_existing = True
else: else:
@@ -288,9 +308,7 @@ class SettingsManager:
if normalized_skip_base_models != self.settings.get( if normalized_skip_base_models != self.settings.get(
"download_skip_base_models" "download_skip_base_models"
): ):
self.settings["download_skip_base_models"] = ( self.settings["download_skip_base_models"] = normalized_skip_base_models
normalized_skip_base_models
)
updated_existing = True updated_existing = True
else: else:
self.settings["download_skip_base_models"] = [] self.settings["download_skip_base_models"] = []
@@ -330,19 +348,19 @@ class SettingsManager:
raw_top_level_paths = self.settings.get("folder_paths", {}) raw_top_level_paths = self.settings.get("folder_paths", {})
normalized_top_level_paths: Dict[str, List[str]] = {} normalized_top_level_paths: Dict[str, List[str]] = {}
if isinstance(raw_top_level_paths, Mapping): if isinstance(raw_top_level_paths, Mapping):
normalized_top_level_paths = self._normalize_folder_paths(raw_top_level_paths) normalized_top_level_paths = self._normalize_folder_paths(
raw_top_level_paths
)
if normalized_top_level_paths != raw_top_level_paths: if normalized_top_level_paths != raw_top_level_paths:
self.settings["folder_paths"] = copy.deepcopy(normalized_top_level_paths) self.settings["folder_paths"] = copy.deepcopy(
normalized_top_level_paths
)
top_level_has_paths = self._has_configured_paths(normalized_top_level_paths) top_level_has_paths = self._has_configured_paths(normalized_top_level_paths)
needs_library_bootstrap = not isinstance(libraries, dict) or not libraries needs_library_bootstrap = not isinstance(libraries, dict) or not libraries
if ( if not needs_library_bootstrap and top_level_has_paths and len(libraries) == 1:
not needs_library_bootstrap
and top_level_has_paths
and len(libraries) == 1
):
only_library_payload = next(iter(libraries.values())) only_library_payload = next(iter(libraries.values()))
if isinstance(only_library_payload, Mapping): if isinstance(only_library_payload, Mapping):
folder_payload = only_library_payload.get("folder_paths") folder_payload = only_library_payload.get("folder_paths")
@@ -354,7 +372,9 @@ class SettingsManager:
library_payload = self._build_library_payload( library_payload = self._build_library_payload(
folder_paths=normalized_top_level_paths, folder_paths=normalized_top_level_paths,
default_lora_root=self.settings.get("default_lora_root", ""), default_lora_root=self.settings.get("default_lora_root", ""),
default_checkpoint_root=self.settings.get("default_checkpoint_root", ""), default_checkpoint_root=self.settings.get(
"default_checkpoint_root", ""
),
default_unet_root=self.settings.get("default_unet_root", ""), default_unet_root=self.settings.get("default_unet_root", ""),
default_embedding_root=self.settings.get("default_embedding_root", ""), default_embedding_root=self.settings.get("default_embedding_root", ""),
) )
@@ -376,7 +396,11 @@ class SettingsManager:
if target_name: if target_name:
candidate_payload = libraries.get(target_name) candidate_payload = libraries.get(target_name)
if isinstance(candidate_payload, Mapping) and not self._has_configured_paths(candidate_payload.get("folder_paths")): if isinstance(
candidate_payload, Mapping
) and not self._has_configured_paths(
candidate_payload.get("folder_paths")
):
seed_library_name = target_name seed_library_name = target_name
sanitized_libraries: Dict[str, Dict[str, Any]] = {} sanitized_libraries: Dict[str, Dict[str, Any]] = {}
@@ -435,11 +459,17 @@ class SettingsManager:
active_library = libraries.get(active_name, {}) active_library = libraries.get(active_name, {})
folder_paths = copy.deepcopy(active_library.get("folder_paths", {})) folder_paths = copy.deepcopy(active_library.get("folder_paths", {}))
self.settings["folder_paths"] = folder_paths self.settings["folder_paths"] = folder_paths
self.settings["extra_folder_paths"] = copy.deepcopy(active_library.get("extra_folder_paths", {})) self.settings["extra_folder_paths"] = copy.deepcopy(
active_library.get("extra_folder_paths", {})
)
self.settings["default_lora_root"] = active_library.get("default_lora_root", "") self.settings["default_lora_root"] = active_library.get("default_lora_root", "")
self.settings["default_checkpoint_root"] = active_library.get("default_checkpoint_root", "") self.settings["default_checkpoint_root"] = active_library.get(
"default_checkpoint_root", ""
)
self.settings["default_unet_root"] = active_library.get("default_unet_root", "") self.settings["default_unet_root"] = active_library.get("default_unet_root", "")
self.settings["default_embedding_root"] = active_library.get("default_embedding_root", "") self.settings["default_embedding_root"] = active_library.get(
"default_embedding_root", ""
)
if save: if save:
self._save_settings() self._save_settings()
@@ -468,7 +498,9 @@ class SettingsManager:
payload.setdefault("folder_paths", {}) payload.setdefault("folder_paths", {})
if extra_folder_paths is not None: if extra_folder_paths is not None:
payload["extra_folder_paths"] = self._normalize_folder_paths(extra_folder_paths) payload["extra_folder_paths"] = self._normalize_folder_paths(
extra_folder_paths
)
else: else:
payload.setdefault("extra_folder_paths", {}) payload.setdefault("extra_folder_paths", {})
@@ -577,7 +609,9 @@ class SettingsManager:
} }
overlap = existing.intersection(new_paths.keys()) overlap = existing.intersection(new_paths.keys())
if overlap: if overlap:
collisions = ", ".join(sorted(new_paths[value] for value in overlap)) collisions = ", ".join(
sorted(new_paths[value] for value in overlap)
)
raise ValueError( raise ValueError(
f"Folder path(s) {collisions} already assigned to library '{other_name}'" f"Folder path(s) {collisions} already assigned to library '{other_name}'"
) )
@@ -612,19 +646,31 @@ class SettingsManager:
library["extra_folder_paths"] = normalized_extra_paths library["extra_folder_paths"] = normalized_extra_paths
changed = True changed = True
if default_lora_root is not None and library.get("default_lora_root") != default_lora_root: if (
default_lora_root is not None
and library.get("default_lora_root") != default_lora_root
):
library["default_lora_root"] = default_lora_root library["default_lora_root"] = default_lora_root
changed = True changed = True
if default_checkpoint_root is not None and library.get("default_checkpoint_root") != default_checkpoint_root: if (
default_checkpoint_root is not None
and library.get("default_checkpoint_root") != default_checkpoint_root
):
library["default_checkpoint_root"] = default_checkpoint_root library["default_checkpoint_root"] = default_checkpoint_root
changed = True changed = True
if default_unet_root is not None and library.get("default_unet_root") != default_unet_root: if (
default_unet_root is not None
and library.get("default_unet_root") != default_unet_root
):
library["default_unet_root"] = default_unet_root library["default_unet_root"] = default_unet_root
changed = True changed = True
if default_embedding_root is not None and library.get("default_embedding_root") != default_embedding_root: if (
default_embedding_root is not None
and library.get("default_embedding_root") != default_embedding_root
):
library["default_embedding_root"] = default_embedding_root library["default_embedding_root"] = default_embedding_root
changed = True changed = True
@@ -637,16 +683,16 @@ class SettingsManager:
def _migrate_setting_keys(self) -> None: def _migrate_setting_keys(self) -> None:
"""Migrate legacy camelCase setting keys to snake_case""" """Migrate legacy camelCase setting keys to snake_case"""
key_migrations = { key_migrations = {
'optimizeExampleImages': 'optimize_example_images', "optimizeExampleImages": "optimize_example_images",
'autoDownloadExampleImages': 'auto_download_example_images', "autoDownloadExampleImages": "auto_download_example_images",
'blurMatureContent': 'blur_mature_content', "blurMatureContent": "blur_mature_content",
'matureBlurLevel': 'mature_blur_level', "matureBlurLevel": "mature_blur_level",
'autoplayOnHover': 'autoplay_on_hover', "autoplayOnHover": "autoplay_on_hover",
'displayDensity': 'display_density', "displayDensity": "display_density",
'cardInfoDisplay': 'card_info_display', "cardInfoDisplay": "card_info_display",
'includeTriggerWords': 'include_trigger_words', "includeTriggerWords": "include_trigger_words",
'compactMode': 'compact_mode', "compactMode": "compact_mode",
'modelCardFooterAction': 'model_card_footer_action', "modelCardFooterAction": "model_card_footer_action",
} }
updated = False updated = False
@@ -663,65 +709,77 @@ class SettingsManager:
def _migrate_download_path_template(self): def _migrate_download_path_template(self):
"""Migrate old download_path_template to new download_path_templates""" """Migrate old download_path_template to new download_path_templates"""
old_template = self.settings.get('download_path_template') old_template = self.settings.get("download_path_template")
templates = self.settings.get('download_path_templates') templates = self.settings.get("download_path_templates")
# If old template exists and new templates don't exist, migrate # If old template exists and new templates don't exist, migrate
if old_template is not None and not templates: if old_template is not None and not templates:
logger.info("Migrating download_path_template to download_path_templates") logger.info("Migrating download_path_template to download_path_templates")
self.settings['download_path_templates'] = { self.settings["download_path_templates"] = {
'lora': old_template, "lora": old_template,
'checkpoint': old_template, "checkpoint": old_template,
'embedding': old_template "embedding": old_template,
} }
# Remove old setting # Remove old setting
del self.settings['download_path_template'] del self.settings["download_path_template"]
self._save_settings() self._save_settings()
logger.info("Migration completed") logger.info("Migration completed")
def _auto_set_default_roots(self): def _auto_set_default_roots(self):
"""Auto set default root paths when the current default is unset or not among the options. """Ensure default root paths always point at a current valid root.
For single-path cases, always use that path. Empty or stale defaults are repaired to the first configured root.
For multi-path cases, only set if current default is empty or invalid. Skips auto-setting when the settings file matches the template
(user hasn't customized yet).
""" """
folder_paths = self.settings.get('folder_paths', {}) # Skip auto-setting if the user hasn't customized settings yet (template preserved)
if self._preserve_disk_template:
return
folder_paths = self.settings.get("folder_paths", {})
updated = False updated = False
# loras
loras = folder_paths.get('loras', []) def _check_and_auto_set(key: str, setting_key: str) -> bool:
if isinstance(loras, list) and len(loras) == 1: """Repair default roots when empty or no longer present."""
current_lora_root = self.settings.get('default_lora_root') current = self.settings.get(setting_key, "")
if current_lora_root not in loras: candidates = folder_paths.get(key, [])
self.settings['default_lora_root'] = loras[0] if not isinstance(candidates, list) or not candidates:
updated = True return False
# checkpoints
checkpoints = folder_paths.get('checkpoints', []) # Filter valid path strings
if isinstance(checkpoints, list) and len(checkpoints) == 1: valid_paths = [p for p in candidates if isinstance(p, str) and p.strip()]
current_checkpoint_root = self.settings.get('default_checkpoint_root') if not valid_paths:
if current_checkpoint_root not in checkpoints: return False
self.settings['default_checkpoint_root'] = checkpoints[0]
updated = True if current in valid_paths:
# unet (diffusion models) - auto-set if empty or invalid return False
unet_paths = folder_paths.get('unet', [])
if isinstance(unet_paths, list) and len(unet_paths) >= 1: self.settings[setting_key] = valid_paths[0]
current_unet_root = self.settings.get('default_unet_root') if current:
# Set to first path if current is empty or not in the valid paths logger.info(
if not current_unet_root or current_unet_root not in unet_paths: "Repaired stale %s from '%s' to '%s'",
self.settings['default_unet_root'] = unet_paths[0] setting_key,
updated = True current,
# embeddings valid_paths[0],
embeddings = folder_paths.get('embeddings', []) )
if isinstance(embeddings, list) and len(embeddings) == 1: else:
current_embedding_root = self.settings.get('default_embedding_root') logger.info("Auto-set %s to '%s'", setting_key, valid_paths[0])
if current_embedding_root not in embeddings: return True
self.settings['default_embedding_root'] = embeddings[0]
updated = True # Process all model types
updated = _check_and_auto_set("loras", "default_lora_root") or updated
updated = (
_check_and_auto_set("checkpoints", "default_checkpoint_root") or updated
)
updated = _check_and_auto_set("unet", "default_unet_root") or updated
updated = _check_and_auto_set("embeddings", "default_embedding_root") or updated
if updated: if updated:
self._update_active_library_entry( self._update_active_library_entry(
default_lora_root=self.settings.get('default_lora_root'), default_lora_root=self.settings.get("default_lora_root"),
default_checkpoint_root=self.settings.get('default_checkpoint_root'), default_checkpoint_root=self.settings.get("default_checkpoint_root"),
default_unet_root=self.settings.get('default_unet_root'), default_unet_root=self.settings.get("default_unet_root"),
default_embedding_root=self.settings.get('default_embedding_root'), default_embedding_root=self.settings.get("default_embedding_root"),
) )
if self._bootstrap_reason == "missing": if self._bootstrap_reason == "missing":
self._needs_initial_save = True self._needs_initial_save = True
@@ -730,11 +788,11 @@ class SettingsManager:
def _check_environment_variables(self) -> None: def _check_environment_variables(self) -> None:
"""Check for environment variables and update settings if needed""" """Check for environment variables and update settings if needed"""
env_api_key = os.environ.get('CIVITAI_API_KEY') env_api_key = os.environ.get("CIVITAI_API_KEY")
if env_api_key: # Check if the environment variable exists and is not empty if env_api_key: # Check if the environment variable exists and is not empty
logger.info("Found CIVITAI_API_KEY environment variable") logger.info("Found CIVITAI_API_KEY environment variable")
# Always use the environment variable if it exists # Always use the environment variable if it exists
self.settings['civitai_api_key'] = env_api_key self.settings["civitai_api_key"] = env_api_key
self._save_settings() self._save_settings()
def _default_settings_actions(self) -> List[Dict[str, Any]]: def _default_settings_actions(self) -> List[Dict[str, Any]]:
@@ -799,7 +857,9 @@ class SettingsManager:
disk_value = self._original_disk_payload.get(key) disk_value = self._original_disk_payload.get(key)
default_value = defaults.get(key) default_value = defaults.get(key)
# Compare using JSON serialization for complex objects # Compare using JSON serialization for complex objects
if json.dumps(disk_value, sort_keys=True, default=str) == json.dumps(default_value, sort_keys=True, default=str): if json.dumps(disk_value, sort_keys=True, default=str) == json.dumps(
default_value, sort_keys=True, default=str
):
default_value_keys.add(key) default_value_keys.add(key)
# Only cleanup if there are "many" default keys (indicating a bloated file) # Only cleanup if there are "many" default keys (indicating a bloated file)
@@ -807,7 +867,7 @@ class SettingsManager:
if len(default_value_keys) >= DEFAULT_KEYS_CLEANUP_THRESHOLD: if len(default_value_keys) >= DEFAULT_KEYS_CLEANUP_THRESHOLD:
logger.info( logger.info(
"Cleaning up %d default value(s) from settings.json to keep it minimal", "Cleaning up %d default value(s) from settings.json to keep it minimal",
len(default_value_keys) len(default_value_keys),
) )
self._save_settings() self._save_settings()
# Update original payload to match what we just saved # Update original payload to match what we just saved
@@ -817,8 +877,8 @@ class SettingsManager:
if not self._standalone_mode: if not self._standalone_mode:
return return
folder_paths = self.settings.get('folder_paths', {}) or {} folder_paths = self.settings.get("folder_paths", {}) or {}
monitored_keys = ('loras', 'checkpoints', 'embeddings') monitored_keys = ("loras", "checkpoints", "embeddings")
has_valid_paths = False has_valid_paths = False
for key in monitored_keys: for key in monitored_keys:
@@ -829,7 +889,10 @@ class SettingsManager:
iterator = list(raw_paths) iterator = list(raw_paths)
except TypeError: except TypeError:
continue continue
if any(isinstance(path, str) and path and os.path.exists(path) for path in iterator): if any(
isinstance(path, str) and path and os.path.exists(path)
for path in iterator
):
has_valid_paths = True has_valid_paths = True
break break
@@ -860,13 +923,13 @@ class SettingsManager:
def _get_default_settings(self) -> Dict[str, Any]: def _get_default_settings(self) -> Dict[str, Any]:
"""Return default settings""" """Return default settings"""
defaults = copy.deepcopy(DEFAULT_SETTINGS) defaults = copy.deepcopy(DEFAULT_SETTINGS)
defaults['base_model_path_mappings'] = {} defaults["base_model_path_mappings"] = {}
defaults['download_path_templates'] = {} defaults["download_path_templates"] = {}
defaults['priority_tags'] = DEFAULT_PRIORITY_TAG_CONFIG.copy() defaults["priority_tags"] = DEFAULT_PRIORITY_TAG_CONFIG.copy()
defaults.setdefault('folder_paths', {}) defaults.setdefault("folder_paths", {})
defaults.setdefault('extra_folder_paths', {}) defaults.setdefault("extra_folder_paths", {})
defaults['auto_organize_exclusions'] = [] defaults["auto_organize_exclusions"] = []
defaults['metadata_refresh_skip_paths'] = [] defaults["metadata_refresh_skip_paths"] = []
library_name = defaults.get("active_library") or "default" library_name = defaults.get("active_library") or "default"
default_library = self._build_library_payload( default_library = self._build_library_payload(
@@ -876,8 +939,8 @@ class SettingsManager:
default_checkpoint_root=defaults.get("default_checkpoint_root"), default_checkpoint_root=defaults.get("default_checkpoint_root"),
default_embedding_root=defaults.get("default_embedding_root"), default_embedding_root=defaults.get("default_embedding_root"),
) )
defaults['libraries'] = {library_name: default_library} defaults["libraries"] = {library_name: default_library}
defaults['active_library'] = library_name defaults["active_library"] = library_name
return defaults return defaults
def _normalize_priority_tag_config(self, value: Any) -> Dict[str, str]: def _normalize_priority_tag_config(self, value: Any) -> Dict[str, str]:
@@ -908,7 +971,9 @@ class SettingsManager:
candidates: Iterable[str] = ( candidates: Iterable[str] = (
value.replace("\n", ",").replace(";", ",").split(",") value.replace("\n", ",").replace(";", ",").split(",")
) )
elif isinstance(value, Sequence) and not isinstance(value, (bytes, bytearray, str)): elif isinstance(value, Sequence) and not isinstance(
value, (bytes, bytearray, str)
):
candidates = value candidates = value
else: else:
return [] return []
@@ -954,7 +1019,9 @@ class SettingsManager:
candidates: Iterable[str] = ( candidates: Iterable[str] = (
value.replace("\n", ",").replace(";", ",").split(",") value.replace("\n", ",").replace(";", ",").split(",")
) )
elif isinstance(value, Sequence) and not isinstance(value, (bytes, bytearray, str)): elif isinstance(value, Sequence) and not isinstance(
value, (bytes, bytearray, str)
):
candidates = value candidates = value
else: else:
return [] return []
@@ -1060,7 +1127,9 @@ class SettingsManager:
continue continue
normalized = os.path.normcase(os.path.normpath(stripped)) normalized = os.path.normcase(os.path.normpath(stripped))
if os.path.exists(stripped): if os.path.exists(stripped):
normalized = os.path.normcase(os.path.normpath(os.path.realpath(stripped))) normalized = os.path.normcase(
os.path.normpath(os.path.realpath(stripped))
)
primary_real_paths.add(normalized) primary_real_paths.add(normalized)
primary_symlink_targets = set() primary_symlink_targets = set()
@@ -1096,8 +1165,13 @@ class SettingsManager:
continue continue
normalized = os.path.normcase(os.path.normpath(stripped)) normalized = os.path.normcase(os.path.normpath(stripped))
if os.path.exists(stripped): if os.path.exists(stripped):
normalized = os.path.normcase(os.path.normpath(os.path.realpath(stripped))) normalized = os.path.normcase(
if normalized in primary_real_paths or normalized in primary_symlink_targets: os.path.normpath(os.path.realpath(stripped))
)
if (
normalized in primary_real_paths
or normalized in primary_symlink_targets
):
overlapping_paths.append(stripped) overlapping_paths.append(stripped)
if overlapping_paths: if overlapping_paths:
@@ -1161,19 +1235,19 @@ class SettingsManager:
if key == "use_portable_settings" and isinstance(value, bool): if key == "use_portable_settings" and isinstance(value, bool):
portable_switch_pending = True portable_switch_pending = True
self._prepare_portable_switch(value) self._prepare_portable_switch(value)
if key == 'folder_paths' and isinstance(value, Mapping): if key == "folder_paths" and isinstance(value, Mapping):
self._update_active_library_entry(folder_paths=value) # type: ignore[arg-type] self._update_active_library_entry(folder_paths=value) # type: ignore[arg-type]
elif key == 'extra_folder_paths' and isinstance(value, Mapping): elif key == "extra_folder_paths" and isinstance(value, Mapping):
self._update_active_library_entry(extra_folder_paths=value) # type: ignore[arg-type] self._update_active_library_entry(extra_folder_paths=value) # type: ignore[arg-type]
elif key == 'default_lora_root': elif key == "default_lora_root":
self._update_active_library_entry(default_lora_root=str(value)) self._update_active_library_entry(default_lora_root=str(value))
elif key == 'default_checkpoint_root': elif key == "default_checkpoint_root":
self._update_active_library_entry(default_checkpoint_root=str(value)) self._update_active_library_entry(default_checkpoint_root=str(value))
elif key == 'default_unet_root': elif key == "default_unet_root":
self._update_active_library_entry(default_unet_root=str(value)) self._update_active_library_entry(default_unet_root=str(value))
elif key == 'default_embedding_root': elif key == "default_embedding_root":
self._update_active_library_entry(default_embedding_root=str(value)) self._update_active_library_entry(default_embedding_root=str(value))
elif key == 'model_name_display': elif key == "model_name_display":
self._notify_model_name_display_change(value) self._notify_model_name_display_change(value)
self._save_settings() self._save_settings()
if portable_switch_pending: if portable_switch_pending:
@@ -1249,10 +1323,9 @@ class SettingsManager:
source_cache_dir = os.path.join(source_dir, "model_cache") source_cache_dir = os.path.join(source_dir, "model_cache")
target_cache_dir = os.path.join(target_dir, "model_cache") target_cache_dir = os.path.join(target_dir, "model_cache")
if ( if os.path.isdir(source_cache_dir) and os.path.abspath(
os.path.isdir(source_cache_dir) source_cache_dir
and os.path.abspath(source_cache_dir) != os.path.abspath(target_cache_dir) ) != os.path.abspath(target_cache_dir):
):
try: try:
shutil.copytree( shutil.copytree(
source_cache_dir, source_cache_dir,
@@ -1270,10 +1343,9 @@ class SettingsManager:
source_cache_file = os.path.join(source_dir, "model_cache.sqlite") source_cache_file = os.path.join(source_dir, "model_cache.sqlite")
target_cache_file = os.path.join(target_dir, "model_cache.sqlite") target_cache_file = os.path.join(target_dir, "model_cache.sqlite")
if ( if os.path.isfile(source_cache_file) and os.path.abspath(
os.path.isfile(source_cache_file) source_cache_file
and os.path.abspath(source_cache_file) != os.path.abspath(target_cache_file) ) != os.path.abspath(target_cache_file):
):
try: try:
shutil.copy2(source_cache_file, target_cache_file) shutil.copy2(source_cache_file, target_cache_file)
except Exception as exc: except Exception as exc:
@@ -1299,7 +1371,9 @@ class SettingsManager:
try: try:
os.makedirs(config_dir, exist_ok=True) os.makedirs(config_dir, exist_ok=True)
except Exception as exc: except Exception as exc:
logger.warning("Failed to create user config directory %s: %s", config_dir, exc) logger.warning(
"Failed to create user config directory %s: %s", config_dir, exc
)
return config_dir return config_dir
@@ -1359,7 +1433,9 @@ class SettingsManager:
try: try:
asyncio.run(coroutine) asyncio.run(coroutine)
except RuntimeError: except RuntimeError:
logger.debug("Skipping name display update due to missing event loop") logger.debug(
"Skipping name display update due to missing event loop"
)
continue continue
if loop is not None and target_loop is loop: if loop is not None and target_loop is loop:
@@ -1382,7 +1458,7 @@ class SettingsManager:
"""Save settings to file""" """Save settings to file"""
try: try:
payload = self._serialize_settings_for_disk() payload = self._serialize_settings_for_disk()
with open(self.settings_file, 'w', encoding='utf-8') as f: with open(self.settings_file, "w", encoding="utf-8") as f:
json.dump(payload, f, indent=2) json.dump(payload, f, indent=2)
except Exception as e: except Exception as e:
logger.error(f"Error saving settings: {e}") logger.error(f"Error saving settings: {e}")
@@ -1423,7 +1499,9 @@ class SettingsManager:
minimal[key] = copy.deepcopy(value) minimal[key] = copy.deepcopy(value)
# Complex objects need deep comparison # Complex objects need deep comparison
elif isinstance(value, (dict, list)) and default_value is not None: elif isinstance(value, (dict, list)) and default_value is not None:
if json.dumps(value, sort_keys=True, default=str) != json.dumps(default_value, sort_keys=True, default=str): if json.dumps(value, sort_keys=True, default=str) != json.dumps(
default_value, sort_keys=True, default=str
):
minimal[key] = copy.deepcopy(value) minimal[key] = copy.deepcopy(value)
# Simple values use direct comparison # Simple values use direct comparison
elif value != default_value: elif value != default_value:
@@ -1500,9 +1578,15 @@ class SettingsManager:
existing = libraries.get(name, {}) existing = libraries.get(name, {})
payload = self._build_library_payload( payload = self._build_library_payload(
folder_paths=folder_paths if folder_paths is not None else existing.get("folder_paths"), folder_paths=folder_paths
extra_folder_paths=extra_folder_paths if extra_folder_paths is not None else existing.get("extra_folder_paths"), if folder_paths is not None
default_lora_root=default_lora_root if default_lora_root is not None else existing.get("default_lora_root"), else existing.get("folder_paths"),
extra_folder_paths=extra_folder_paths
if extra_folder_paths is not None
else existing.get("extra_folder_paths"),
default_lora_root=default_lora_root
if default_lora_root is not None
else existing.get("default_lora_root"),
default_checkpoint_root=( default_checkpoint_root=(
default_checkpoint_root default_checkpoint_root
if default_checkpoint_root is not None if default_checkpoint_root is not None
@@ -1662,7 +1746,9 @@ class SettingsManager:
if service and hasattr(service, "on_library_changed"): if service and hasattr(service, "on_library_changed"):
try: try:
service.on_library_changed() service.on_library_changed()
except Exception as service_exc: # pragma: no cover - defensive logging except (
Exception
) as service_exc: # pragma: no cover - defensive logging
logger.debug( logger.debug(
"Service %s failed to handle library change: %s", "Service %s failed to handle library change: %s",
service_name, service_name,
@@ -1673,15 +1759,15 @@ class SettingsManager:
def get_download_path_template(self, model_type: str) -> str: def get_download_path_template(self, model_type: str) -> str:
"""Get download path template for specific model type """Get download path template for specific model type
Args: Args:
model_type: The type of model ('lora', 'checkpoint', 'embedding') model_type: The type of model ('lora', 'checkpoint', 'embedding')
Returns: Returns:
Template string for the model type, defaults to '{base_model}/{first_tag}' Template string for the model type, defaults to '{base_model}/{first_tag}'
""" """
templates = self.settings.get('download_path_templates', {}) templates = self.settings.get("download_path_templates", {})
# Handle edge case where templates might be stored as JSON string # Handle edge case where templates might be stored as JSON string
if isinstance(templates, str): if isinstance(templates, str):
try: try:
@@ -1689,36 +1775,40 @@ class SettingsManager:
parsed_templates = json.loads(templates) parsed_templates = json.loads(templates)
if isinstance(parsed_templates, dict): if isinstance(parsed_templates, dict):
# Update settings with parsed dictionary # Update settings with parsed dictionary
self.settings['download_path_templates'] = parsed_templates self.settings["download_path_templates"] = parsed_templates
self._save_settings() self._save_settings()
templates = parsed_templates templates = parsed_templates
logger.info("Successfully parsed download_path_templates from JSON string") logger.info(
"Successfully parsed download_path_templates from JSON string"
)
else: else:
raise ValueError("Parsed JSON is not a dictionary") raise ValueError("Parsed JSON is not a dictionary")
except (json.JSONDecodeError, ValueError) as e: except (json.JSONDecodeError, ValueError) as e:
# If parsing fails, set default values # If parsing fails, set default values
logger.warning(f"Failed to parse download_path_templates JSON string: {e}. Setting default values.") logger.warning(
default_template = '{base_model}/{first_tag}' f"Failed to parse download_path_templates JSON string: {e}. Setting default values."
)
default_template = "{base_model}/{first_tag}"
templates = { templates = {
'lora': default_template, "lora": default_template,
'checkpoint': default_template, "checkpoint": default_template,
'embedding': default_template "embedding": default_template,
} }
self.settings['download_path_templates'] = templates self.settings["download_path_templates"] = templates
self._save_settings() self._save_settings()
# Ensure templates is a dictionary # Ensure templates is a dictionary
if not isinstance(templates, dict): if not isinstance(templates, dict):
default_template = '{base_model}/{first_tag}' default_template = "{base_model}/{first_tag}"
templates = { templates = {
'lora': default_template, "lora": default_template,
'checkpoint': default_template, "checkpoint": default_template,
'embedding': default_template "embedding": default_template,
} }
self.settings['download_path_templates'] = templates self.settings["download_path_templates"] = templates
self._save_settings() self._save_settings()
return templates.get(model_type, '{base_model}/{first_tag}') return templates.get(model_type, "{base_model}/{first_tag}")
_SETTINGS_MANAGER: Optional["SettingsManager"] = None _SETTINGS_MANAGER: Optional["SettingsManager"] = None

View File

@@ -1246,10 +1246,7 @@ export class SettingsManager {
throw new Error('No LoRA roots found'); throw new Error('No LoRA roots found');
} }
// Clear existing options except the first one (No Default)
const noDefaultOption = defaultLoraRootSelect.querySelector('option[value=""]');
defaultLoraRootSelect.innerHTML = ''; defaultLoraRootSelect.innerHTML = '';
defaultLoraRootSelect.appendChild(noDefaultOption);
// Add options for each root // Add options for each root
data.roots.forEach(root => { data.roots.forEach(root => {
@@ -1259,9 +1256,8 @@ export class SettingsManager {
defaultLoraRootSelect.appendChild(option); defaultLoraRootSelect.appendChild(option);
}); });
// Set selected value from settings
const defaultRoot = state.global.settings.default_lora_root || ''; const defaultRoot = state.global.settings.default_lora_root || '';
defaultLoraRootSelect.value = defaultRoot; defaultLoraRootSelect.value = data.roots.includes(defaultRoot) ? defaultRoot : data.roots[0];
} catch (error) { } catch (error) {
console.error('Error loading LoRA roots:', error); console.error('Error loading LoRA roots:', error);
@@ -1285,10 +1281,7 @@ export class SettingsManager {
throw new Error('No checkpoint roots found'); throw new Error('No checkpoint roots found');
} }
// Clear existing options except first one (No Default)
const noDefaultOption = defaultCheckpointRootSelect.querySelector('option[value=""]');
defaultCheckpointRootSelect.innerHTML = ''; defaultCheckpointRootSelect.innerHTML = '';
defaultCheckpointRootSelect.appendChild(noDefaultOption);
// Add options for each root // Add options for each root
data.roots.forEach(root => { data.roots.forEach(root => {
@@ -1298,9 +1291,8 @@ export class SettingsManager {
defaultCheckpointRootSelect.appendChild(option); defaultCheckpointRootSelect.appendChild(option);
}); });
// Set selected value from settings
const defaultRoot = state.global.settings.default_checkpoint_root || ''; const defaultRoot = state.global.settings.default_checkpoint_root || '';
defaultCheckpointRootSelect.value = defaultRoot; defaultCheckpointRootSelect.value = data.roots.includes(defaultRoot) ? defaultRoot : data.roots[0];
} catch (error) { } catch (error) {
console.error('Error loading checkpoint roots:', error); console.error('Error loading checkpoint roots:', error);
@@ -1324,10 +1316,7 @@ export class SettingsManager {
throw new Error('No diffusion model roots found'); throw new Error('No diffusion model roots found');
} }
// Clear existing options except first one (No Default)
const noDefaultOption = defaultUnetRootSelect.querySelector('option[value=""]');
defaultUnetRootSelect.innerHTML = ''; defaultUnetRootSelect.innerHTML = '';
defaultUnetRootSelect.appendChild(noDefaultOption);
// Add options for each root // Add options for each root
data.roots.forEach(root => { data.roots.forEach(root => {
@@ -1337,9 +1326,8 @@ export class SettingsManager {
defaultUnetRootSelect.appendChild(option); defaultUnetRootSelect.appendChild(option);
}); });
// Set selected value from settings
const defaultRoot = state.global.settings.default_unet_root || ''; const defaultRoot = state.global.settings.default_unet_root || '';
defaultUnetRootSelect.value = defaultRoot; defaultUnetRootSelect.value = data.roots.includes(defaultRoot) ? defaultRoot : data.roots[0];
} catch (error) { } catch (error) {
console.error('Error loading diffusion model roots:', error); console.error('Error loading diffusion model roots:', error);
@@ -1363,10 +1351,7 @@ export class SettingsManager {
throw new Error('No embedding roots found'); throw new Error('No embedding roots found');
} }
// Clear existing options except first one (No Default)
const noDefaultOption = defaultEmbeddingRootSelect.querySelector('option[value=""]');
defaultEmbeddingRootSelect.innerHTML = ''; defaultEmbeddingRootSelect.innerHTML = '';
defaultEmbeddingRootSelect.appendChild(noDefaultOption);
// Add options for each root // Add options for each root
data.roots.forEach(root => { data.roots.forEach(root => {
@@ -1376,9 +1361,8 @@ export class SettingsManager {
defaultEmbeddingRootSelect.appendChild(option); defaultEmbeddingRootSelect.appendChild(option);
}); });
// Set selected value from settings
const defaultRoot = state.global.settings.default_embedding_root || ''; const defaultRoot = state.global.settings.default_embedding_root || '';
defaultEmbeddingRootSelect.value = defaultRoot; defaultEmbeddingRootSelect.value = data.roots.includes(defaultRoot) ? defaultRoot : data.roots[0];
} catch (error) { } catch (error) {
console.error('Error loading embedding roots:', error); console.error('Error loading embedding roots:', error);

View File

@@ -484,9 +484,7 @@
</label> </label>
</div> </div>
<div class="setting-control select-control"> <div class="setting-control select-control">
<select id="defaultLoraRoot" onchange="settingsManager.saveSelectSetting('defaultLoraRoot', 'default_lora_root')"> <select id="defaultLoraRoot" onchange="settingsManager.saveSelectSetting('defaultLoraRoot', 'default_lora_root')"></select>
<option value="">{{ t('settings.folderSettings.noDefault') }}</option>
</select>
</div> </div>
</div> </div>
</div> </div>
@@ -500,9 +498,7 @@
</label> </label>
</div> </div>
<div class="setting-control select-control"> <div class="setting-control select-control">
<select id="defaultCheckpointRoot" onchange="settingsManager.saveSelectSetting('defaultCheckpointRoot', 'default_checkpoint_root')"> <select id="defaultCheckpointRoot" onchange="settingsManager.saveSelectSetting('defaultCheckpointRoot', 'default_checkpoint_root')"></select>
<option value="">{{ t('settings.folderSettings.noDefault') }}</option>
</select>
</div> </div>
</div> </div>
</div> </div>
@@ -516,9 +512,7 @@
</label> </label>
</div> </div>
<div class="setting-control select-control"> <div class="setting-control select-control">
<select id="defaultUnetRoot" onchange="settingsManager.saveSelectSetting('defaultUnetRoot', 'default_unet_root')"> <select id="defaultUnetRoot" onchange="settingsManager.saveSelectSetting('defaultUnetRoot', 'default_unet_root')"></select>
<option value="">{{ t('settings.folderSettings.noDefault') }}</option>
</select>
</div> </div>
</div> </div>
</div> </div>
@@ -532,9 +526,7 @@
</label> </label>
</div> </div>
<div class="setting-control select-control"> <div class="setting-control select-control">
<select id="defaultEmbeddingRoot" onchange="settingsManager.saveSelectSetting('defaultEmbeddingRoot', 'default_embedding_root')"> <select id="defaultEmbeddingRoot" onchange="settingsManager.saveSelectSetting('defaultEmbeddingRoot', 'default_embedding_root')"></select>
<option value="">{{ t('settings.folderSettings.noDefault') }}</option>
</select>
</div> </div>
</div> </div>
</div> </div>

View File

@@ -131,6 +131,102 @@ def test_save_paths_logs_warning_when_upsert_fails(
assert "Failed to save folder paths: boom" in caplog.text assert "Failed to save folder paths: boom" in caplog.text
def test_save_paths_repairs_empty_default_roots(monkeypatch: pytest.MonkeyPatch, tmp_path):
folder_paths = _setup_config_environment(monkeypatch, tmp_path)
class FakeSettingsService:
def get_libraries(self):
return {
"comfyui": {
"folder_paths": {key: list(value) for key, value in folder_paths.items()},
"default_lora_root": "",
"default_checkpoint_root": "",
"default_embedding_root": "",
}
}
def rename_library(self, *_):
raise AssertionError("rename_library should not be invoked")
def upsert_library(self, name: str, **payload):
self.name = name
self.payload = payload
fake_settings = FakeSettingsService()
monkeypatch.setattr(settings_manager_module, "settings", fake_settings)
config_module.Config()
assert fake_settings.name == "comfyui"
assert fake_settings.payload["default_lora_root"] == folder_paths["loras"][0].replace("\\", "/")
assert fake_settings.payload["default_checkpoint_root"] == folder_paths["checkpoints"][0].replace("\\", "/")
assert fake_settings.payload["default_embedding_root"] == folder_paths["embeddings"][0].replace("\\", "/")
def test_save_paths_repairs_stale_default_roots(monkeypatch: pytest.MonkeyPatch, tmp_path):
folder_paths = _setup_config_environment(monkeypatch, tmp_path)
class FakeSettingsService:
def get_libraries(self):
return {
"comfyui": {
"folder_paths": {key: list(value) for key, value in folder_paths.items()},
"default_lora_root": "/stale/loras",
"default_checkpoint_root": "/stale/checkpoints",
"default_embedding_root": "/stale/embeddings",
}
}
def rename_library(self, *_):
raise AssertionError("rename_library should not be invoked")
def upsert_library(self, name: str, **payload):
self.name = name
self.payload = payload
fake_settings = FakeSettingsService()
monkeypatch.setattr(settings_manager_module, "settings", fake_settings)
config_module.Config()
assert fake_settings.name == "comfyui"
assert fake_settings.payload["default_lora_root"] == folder_paths["loras"][0].replace("\\", "/")
assert fake_settings.payload["default_checkpoint_root"] == folder_paths["checkpoints"][0].replace("\\", "/")
assert fake_settings.payload["default_embedding_root"] == folder_paths["embeddings"][0].replace("\\", "/")
def test_save_paths_keeps_valid_default_roots(monkeypatch: pytest.MonkeyPatch, tmp_path):
folder_paths = _setup_config_environment(monkeypatch, tmp_path)
class FakeSettingsService:
def get_libraries(self):
return {
"comfyui": {
"folder_paths": {key: list(value) for key, value in folder_paths.items()},
"default_lora_root": folder_paths["loras"][0],
"default_checkpoint_root": folder_paths["checkpoints"][0],
"default_embedding_root": folder_paths["embeddings"][0],
}
}
def rename_library(self, *_):
raise AssertionError("rename_library should not be invoked")
def upsert_library(self, name: str, **payload):
self.name = name
self.payload = payload
fake_settings = FakeSettingsService()
monkeypatch.setattr(settings_manager_module, "settings", fake_settings)
config_module.Config()
assert fake_settings.name == "comfyui"
assert fake_settings.payload["default_lora_root"] == folder_paths["loras"][0].replace("\\", "/")
assert fake_settings.payload["default_checkpoint_root"] == folder_paths["checkpoints"][0].replace("\\", "/")
assert fake_settings.payload["default_embedding_root"] == folder_paths["embeddings"][0].replace("\\", "/")
def test_save_paths_removes_template_default_library(monkeypatch, tmp_path): def test_save_paths_removes_template_default_library(monkeypatch, tmp_path):
folder_paths = _setup_config_environment(monkeypatch, tmp_path) folder_paths = _setup_config_environment(monkeypatch, tmp_path)

View File

@@ -17,7 +17,9 @@ def test_portable_settings_use_project_root(tmp_path, monkeypatch):
from importlib import reload from importlib import reload
settings_paths_module = reload(settings_paths) settings_paths_module = reload(settings_paths)
monkeypatch.setattr(settings_paths_module, "get_project_root", lambda: str(tmp_path)) monkeypatch.setattr(
settings_paths_module, "get_project_root", lambda: str(tmp_path)
)
monkeypatch.setattr( monkeypatch.setattr(
settings_paths_module, settings_paths_module,
"user_config_dir", "user_config_dir",
@@ -25,7 +27,9 @@ def test_portable_settings_use_project_root(tmp_path, monkeypatch):
) )
portable_settings = {"use_portable_settings": True} portable_settings = {"use_portable_settings": True}
(tmp_path / "settings.json").write_text(json.dumps(portable_settings), encoding="utf-8") (tmp_path / "settings.json").write_text(
json.dumps(portable_settings), encoding="utf-8"
)
config_dir = settings_paths_module.get_settings_dir(create=True) config_dir = settings_paths_module.get_settings_dir(create=True)
assert config_dir == str(tmp_path) assert config_dir == str(tmp_path)
@@ -74,7 +78,9 @@ def test_initial_save_persists_minimal_template(tmp_path, monkeypatch):
self._seed_template = copy.deepcopy(template) self._seed_template = copy.deepcopy(template)
return copy.deepcopy(template) return copy.deepcopy(template)
monkeypatch.setattr(SettingsManager, "_load_settings_template", fake_template_loader) monkeypatch.setattr(
SettingsManager, "_load_settings_template", fake_template_loader
)
manager = SettingsManager() manager = SettingsManager()
@@ -118,7 +124,10 @@ def test_existing_folder_paths_seed_default_library(tmp_path, monkeypatch):
assert "default" in libraries assert "default" in libraries
assert libraries["default"]["folder_paths"]["loras"] == [str(lora_dir)] assert libraries["default"]["folder_paths"]["loras"] == [str(lora_dir)]
assert libraries["default"]["folder_paths"]["checkpoints"] == [str(checkpoint_dir)] assert libraries["default"]["folder_paths"]["checkpoints"] == [str(checkpoint_dir)]
assert libraries["default"]["folder_paths"]["unet"] == [str(diffusion_dir), str(unet_dir)] assert libraries["default"]["folder_paths"]["unet"] == [
str(diffusion_dir),
str(unet_dir),
]
assert libraries["default"]["folder_paths"]["embeddings"] == [str(embedding_dir)] assert libraries["default"]["folder_paths"]["embeddings"] == [str(embedding_dir)]
assert manager.get_startup_messages() == [] assert manager.get_startup_messages() == []
@@ -138,7 +147,9 @@ def test_environment_variable_overrides_settings(tmp_path, monkeypatch):
assert mgr.get("civitai_api_key") == "secret" assert mgr.get("civitai_api_key") == "secret"
def _create_manager_with_settings(tmp_path, monkeypatch, initial_settings, *, save_spy=None): def _create_manager_with_settings(
tmp_path, monkeypatch, initial_settings, *, save_spy=None
):
"""Helper to instantiate SettingsManager with predefined settings.""" """Helper to instantiate SettingsManager with predefined settings."""
fake_settings_path = tmp_path / "settings.json" fake_settings_path = tmp_path / "settings.json"
@@ -203,7 +214,9 @@ def test_switch_to_portable_mode_copies_cache(tmp_path, monkeypatch):
assert manager.settings_file == str(project_root / "settings.json") assert manager.settings_file == str(project_root / "settings.json")
marker_copy = project_root / "model_cache" / "user_marker.txt" marker_copy = project_root / "model_cache" / "user_marker.txt"
assert marker_copy.read_text(encoding="utf-8") == "user_marker.txt" assert marker_copy.read_text(encoding="utf-8") == "user_marker.txt"
assert (project_root / "model_cache.sqlite").read_text(encoding="utf-8") == "user_db" assert (project_root / "model_cache.sqlite").read_text(
encoding="utf-8"
) == "user_db"
assert user_settings.exists() assert user_settings.exists()
@@ -216,13 +229,17 @@ def test_switching_back_to_user_config_moves_cache(tmp_path, monkeypatch):
project_cache_dir = project_root / "model_cache" project_cache_dir = project_root / "model_cache"
project_cache_dir.mkdir(exist_ok=True) project_cache_dir.mkdir(exist_ok=True)
(project_cache_dir / "project_marker.txt").write_text("project_marker", encoding="utf-8") (project_cache_dir / "project_marker.txt").write_text(
"project_marker", encoding="utf-8"
)
(project_root / "model_cache.sqlite").write_text("project_db", encoding="utf-8") (project_root / "model_cache.sqlite").write_text("project_db", encoding="utf-8")
manager.set("use_portable_settings", False) manager.set("use_portable_settings", False)
assert manager.settings_file == str(user_settings) assert manager.settings_file == str(user_settings)
assert (user_dir / "model_cache" / "project_marker.txt").read_text(encoding="utf-8") == "project_marker" assert (user_dir / "model_cache" / "project_marker.txt").read_text(
encoding="utf-8"
) == "project_marker"
assert (user_dir / "model_cache.sqlite").read_text(encoding="utf-8") == "project_db" assert (user_dir / "model_cache.sqlite").read_text(encoding="utf-8") == "project_db"
@@ -242,10 +259,19 @@ def test_download_path_template_invalid_json(manager):
template = manager.get_download_path_template("checkpoint") template = manager.get_download_path_template("checkpoint")
assert template == "{base_model}/{first_tag}" assert template == "{base_model}/{first_tag}"
assert manager.settings["download_path_templates"]["lora"] == "{base_model}/{first_tag}" assert (
manager.settings["download_path_templates"]["lora"]
== "{base_model}/{first_tag}"
)
def test_auto_set_default_roots(manager): def test_auto_set_default_roots(manager):
# Clear any previously auto-set values to test fresh behavior
manager.settings["default_lora_root"] = ""
manager.settings["default_checkpoint_root"] = ""
manager.settings["default_embedding_root"] = ""
manager.settings["default_unet_root"] = ""
manager.settings["folder_paths"] = { manager.settings["folder_paths"] = {
"loras": ["/loras"], "loras": ["/loras"],
"checkpoints": ["/checkpoints"], "checkpoints": ["/checkpoints"],
@@ -259,6 +285,48 @@ def test_auto_set_default_roots(manager):
assert manager.get("default_embedding_root") == "/embeddings" assert manager.get("default_embedding_root") == "/embeddings"
def test_auto_set_default_roots_repairs_stale_values(manager):
manager.settings["default_lora_root"] = "/stale-lora"
manager.settings["default_checkpoint_root"] = "/stale-checkpoint"
manager.settings["default_embedding_root"] = "/stale-embedding"
manager.settings["default_unet_root"] = "/stale-unet"
manager.settings["folder_paths"] = {
"loras": ["/loras"],
"checkpoints": ["/checkpoints"],
"unet": ["/unet"],
"embeddings": ["/embeddings"],
}
manager._auto_set_default_roots()
assert manager.get("default_lora_root") == "/loras"
assert manager.get("default_checkpoint_root") == "/checkpoints"
assert manager.get("default_unet_root") == "/unet"
assert manager.get("default_embedding_root") == "/embeddings"
def test_auto_set_default_roots_keeps_valid_values(manager):
manager.settings["default_lora_root"] = "/loras"
manager.settings["default_checkpoint_root"] = "/checkpoints"
manager.settings["default_embedding_root"] = "/embeddings"
manager.settings["default_unet_root"] = "/unet"
manager.settings["folder_paths"] = {
"loras": ["/loras", "/other-loras"],
"checkpoints": ["/checkpoints"],
"unet": ["/unet", "/other-unet"],
"embeddings": ["/embeddings"],
}
manager._auto_set_default_roots()
assert manager.get("default_lora_root") == "/loras"
assert manager.get("default_checkpoint_root") == "/checkpoints"
assert manager.get("default_unet_root") == "/unet"
assert manager.get("default_embedding_root") == "/embeddings"
def test_delete_setting(manager): def test_delete_setting(manager):
manager.set("example", 1) manager.set("example", 1)
manager.delete("example") manager.delete("example")
@@ -293,7 +361,14 @@ def test_invalid_mature_blur_level_is_normalized_to_r(tmp_path, monkeypatch):
def test_model_name_display_setting_notifies_scanners(tmp_path, monkeypatch): def test_model_name_display_setting_notifies_scanners(tmp_path, monkeypatch):
initial = { initial = {
"libraries": {"default": {"folder_paths": {}, "default_lora_root": "", "default_checkpoint_root": "", "default_embedding_root": ""}}, "libraries": {
"default": {
"folder_paths": {},
"default_lora_root": "",
"default_checkpoint_root": "",
"default_embedding_root": "",
}
},
"active_library": "default", "active_library": "default",
"model_name_display": "model_name", "model_name_display": "model_name",
} }
@@ -315,6 +390,7 @@ def test_model_name_display_setting_notifies_scanners(tmp_path, monkeypatch):
dispatched_loops = [] dispatched_loops = []
futures = [] futures = []
def tracking_run_coroutine_threadsafe(coro, target_loop): def tracking_run_coroutine_threadsafe(coro, target_loop):
dispatched_loops.append(target_loop) dispatched_loops.append(target_loop)
future = Future() future = Future()
@@ -335,7 +411,9 @@ def test_model_name_display_setting_notifies_scanners(tmp_path, monkeypatch):
"get_service_sync", "get_service_sync",
classmethod(fake_get_service_sync), classmethod(fake_get_service_sync),
) )
monkeypatch.setattr(asyncio, "run_coroutine_threadsafe", tracking_run_coroutine_threadsafe) monkeypatch.setattr(
asyncio, "run_coroutine_threadsafe", tracking_run_coroutine_threadsafe
)
try: try:
manager.set("model_name_display", "file_name") manager.set("model_name_display", "file_name")
@@ -354,12 +432,14 @@ def test_migrates_legacy_settings_file(tmp_path, monkeypatch):
legacy_root = tmp_path / "legacy" legacy_root = tmp_path / "legacy"
legacy_root.mkdir() legacy_root.mkdir()
legacy_file = legacy_root / "settings.json" legacy_file = legacy_root / "settings.json"
legacy_file.write_text("{\"value\": 1}", encoding="utf-8") legacy_file.write_text('{"value": 1}', encoding="utf-8")
target_dir = tmp_path / "config" target_dir = tmp_path / "config"
monkeypatch.setattr(settings_paths, "get_project_root", lambda: str(legacy_root)) monkeypatch.setattr(settings_paths, "get_project_root", lambda: str(legacy_root))
monkeypatch.setattr(settings_paths, "user_config_dir", lambda *_, **__: str(target_dir)) monkeypatch.setattr(
settings_paths, "user_config_dir", lambda *_, **__: str(target_dir)
)
migrated_path = settings_paths.ensure_settings_file() migrated_path = settings_paths.ensure_settings_file()
@@ -380,7 +460,9 @@ def test_uses_portable_settings_file_when_enabled(tmp_path, monkeypatch):
user_dir = tmp_path / "user" user_dir = tmp_path / "user"
monkeypatch.setattr(settings_paths, "get_project_root", lambda: str(repo_root)) monkeypatch.setattr(settings_paths, "get_project_root", lambda: str(repo_root))
monkeypatch.setattr(settings_paths, "user_config_dir", lambda *_, **__: str(user_dir)) monkeypatch.setattr(
settings_paths, "user_config_dir", lambda *_, **__: str(user_dir)
)
resolved = settings_paths.ensure_settings_file() resolved = settings_paths.ensure_settings_file()
@@ -393,7 +475,9 @@ def test_migrate_creates_default_library(manager):
libraries = manager.get_libraries() libraries = manager.get_libraries()
assert "default" in libraries assert "default" in libraries
assert manager.get_active_library_name() == "default" assert manager.get_active_library_name() == "default"
assert libraries["default"].get("folder_paths", {}) == manager.settings.get("folder_paths", {}) assert libraries["default"].get("folder_paths", {}) == manager.settings.get(
"folder_paths", {}
)
def test_migrate_sanitizes_legacy_libraries(tmp_path, monkeypatch): def test_migrate_sanitizes_legacy_libraries(tmp_path, monkeypatch):
@@ -464,12 +548,21 @@ def test_refresh_environment_variables_updates_stored_value(tmp_path, monkeypatc
initial = { initial = {
"civitai_api_key": "stale", "civitai_api_key": "stale",
"libraries": {"default": {"folder_paths": {}, "default_lora_root": "", "default_checkpoint_root": "", "default_embedding_root": ""}}, "libraries": {
"default": {
"folder_paths": {},
"default_lora_root": "",
"default_checkpoint_root": "",
"default_embedding_root": "",
}
},
"active_library": "default", "active_library": "default",
} }
monkeypatch.setenv("CIVITAI_API_KEY", "from-init") monkeypatch.setenv("CIVITAI_API_KEY", "from-init")
manager = _create_manager_with_settings(tmp_path, monkeypatch, initial, save_spy=save_spy) manager = _create_manager_with_settings(
tmp_path, monkeypatch, initial, save_spy=save_spy
)
assert calls[-1] == "from-init" assert calls[-1] == "from-init"
@@ -590,7 +683,9 @@ def test_extra_paths_validation_no_overlap_with_other_libraries(manager, tmp_pat
manager.update_extra_folder_paths({"loras": [str(lora_dir1)]}) manager.update_extra_folder_paths({"loras": [str(lora_dir1)]})
def test_extra_paths_validation_no_overlap_with_active_primary_lora_root(manager, tmp_path): def test_extra_paths_validation_no_overlap_with_active_primary_lora_root(
manager, tmp_path
):
"""Test that extra LoRA paths cannot overlap the active library primary LoRA roots.""" """Test that extra LoRA paths cannot overlap the active library primary LoRA roots."""
real_lora_dir = tmp_path / "loras_real" real_lora_dir = tmp_path / "loras_real"
real_lora_dir.mkdir() real_lora_dir.mkdir()
@@ -603,7 +698,9 @@ def test_extra_paths_validation_no_overlap_with_active_primary_lora_root(manager
activate=True, activate=True,
) )
with pytest.raises(ValueError, match="overlap with the active library's primary LoRA roots"): with pytest.raises(
ValueError, match="overlap with the active library's primary LoRA roots"
):
manager.update_extra_folder_paths({"loras": [str(real_lora_dir)]}) manager.update_extra_folder_paths({"loras": [str(real_lora_dir)]})
@@ -627,7 +724,10 @@ def test_extra_paths_validation_no_overlap_with_active_primary_lora_root_case_in
original_normcase = settings_manager_module.os.path.normcase original_normcase = settings_manager_module.os.path.normcase
def fake_exists(path): def fake_exists(path):
if isinstance(path, str) and path.lower() in {str(lora_link).lower(), str(real_lora_dir).lower()}: if isinstance(path, str) and path.lower() in {
str(lora_link).lower(),
str(real_lora_dir).lower(),
}:
return True return True
return original_exists(path) return original_exists(path)
@@ -638,13 +738,21 @@ def test_extra_paths_validation_no_overlap_with_active_primary_lora_root_case_in
monkeypatch.setattr(settings_manager_module.os.path, "exists", fake_exists) monkeypatch.setattr(settings_manager_module.os.path, "exists", fake_exists)
monkeypatch.setattr(settings_manager_module.os.path, "realpath", fake_realpath) monkeypatch.setattr(settings_manager_module.os.path, "realpath", fake_realpath)
monkeypatch.setattr(settings_manager_module.os.path, "normcase", lambda value: original_normcase(value).lower()) monkeypatch.setattr(
settings_manager_module.os.path,
"normcase",
lambda value: original_normcase(value).lower(),
)
with pytest.raises(ValueError, match="overlap with the active library's primary LoRA roots"): with pytest.raises(
ValueError, match="overlap with the active library's primary LoRA roots"
):
manager.update_extra_folder_paths({"loras": [str(real_lora_dir).upper()]}) manager.update_extra_folder_paths({"loras": [str(real_lora_dir).upper()]})
def test_extra_paths_validation_allows_missing_non_overlapping_lora_root(manager, tmp_path): def test_extra_paths_validation_allows_missing_non_overlapping_lora_root(
manager, tmp_path
):
"""Missing non-overlapping extra LoRA paths should not be rejected.""" """Missing non-overlapping extra LoRA paths should not be rejected."""
lora_dir = tmp_path / "loras" lora_dir = tmp_path / "loras"
lora_dir.mkdir() lora_dir.mkdir()
@@ -662,7 +770,9 @@ def test_extra_paths_validation_allows_missing_non_overlapping_lora_root(manager
assert extra_paths["loras"] == [str(missing_extra)] assert extra_paths["loras"] == [str(missing_extra)]
def test_extra_paths_validation_rejects_primary_root_first_level_symlink_target(manager, tmp_path): def test_extra_paths_validation_rejects_primary_root_first_level_symlink_target(
manager, tmp_path
):
"""Extra LoRA paths should be rejected when already reachable via a first-level symlink under the primary root.""" """Extra LoRA paths should be rejected when already reachable via a first-level symlink under the primary root."""
lora_dir = tmp_path / "loras" lora_dir = tmp_path / "loras"
lora_dir.mkdir() lora_dir.mkdir()
@@ -677,7 +787,9 @@ def test_extra_paths_validation_rejects_primary_root_first_level_symlink_target(
activate=True, activate=True,
) )
with pytest.raises(ValueError, match="overlap with the active library's primary LoRA roots"): with pytest.raises(
ValueError, match="overlap with the active library's primary LoRA roots"
):
manager.update_extra_folder_paths({"loras": [str(external_dir)]}) manager.update_extra_folder_paths({"loras": [str(external_dir)]})
@@ -698,7 +810,6 @@ def test_delete_library_switches_active(manager, tmp_path):
assert manager.get_active_library_name() == "default" assert manager.get_active_library_name() == "default"
def test_download_skip_base_models_are_normalized(manager): def test_download_skip_base_models_are_normalized(manager):
manager.settings["download_skip_base_models"] = [ manager.settings["download_skip_base_models"] = [
"SDXL 1.0", "SDXL 1.0",
@@ -715,9 +826,6 @@ def test_download_skip_base_models_are_normalized(manager):
def test_setting_download_skip_base_models_normalizes_string_input(manager): def test_setting_download_skip_base_models_normalizes_string_input(manager):
manager.set( manager.set("download_skip_base_models", "SDXL 1.0, Pony; Invalid\nSDXL 1.0")
"download_skip_base_models",
"SDXL 1.0, Pony; Invalid\nSDXL 1.0"
)
assert manager.get("download_skip_base_models") == ["SDXL 1.0", "Pony"] assert manager.get("download_skip_base_models") == ["SDXL 1.0", "Pony"]