mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-05-06 16:36:45 -03:00
Compare commits
2 Commits
14cb7fec47
...
9bdb337962
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9bdb337962 | ||
|
|
f93baf5fc0 |
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
61
py/config.py
61
py/config.py
@@ -25,6 +25,31 @@ standalone_mode = (
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _resolve_valid_default_root(
|
||||
current: str, primary_paths: List[str], name: str
|
||||
) -> str:
|
||||
"""Return a valid default root from the current primary path set."""
|
||||
|
||||
valid_paths = [path for path in primary_paths if isinstance(path, str) and path.strip()]
|
||||
if not valid_paths:
|
||||
return ""
|
||||
|
||||
if current in valid_paths:
|
||||
return current
|
||||
|
||||
if current:
|
||||
logger.info(
|
||||
"Repaired stale %s from '%s' to '%s'",
|
||||
name,
|
||||
current,
|
||||
valid_paths[0],
|
||||
)
|
||||
else:
|
||||
logger.info("Auto-setting %s to '%s'", name, valid_paths[0])
|
||||
|
||||
return valid_paths[0]
|
||||
|
||||
|
||||
def _normalize_folder_paths_for_comparison(
|
||||
folder_paths: Mapping[str, Iterable[str]],
|
||||
) -> Dict[str, Set[str]]:
|
||||
@@ -197,25 +222,23 @@ class Config:
|
||||
"Failed to rename legacy 'default' library: %s", rename_error
|
||||
)
|
||||
|
||||
default_lora_root = comfy_library.get("default_lora_root", "")
|
||||
if not default_lora_root and len(self.loras_roots) == 1:
|
||||
default_lora_root = self.loras_roots[0]
|
||||
default_lora_root = _resolve_valid_default_root(
|
||||
comfy_library.get("default_lora_root", ""),
|
||||
list(self.loras_roots or []),
|
||||
"default_lora_root",
|
||||
)
|
||||
|
||||
default_checkpoint_root = comfy_library.get("default_checkpoint_root", "")
|
||||
if (
|
||||
not default_checkpoint_root
|
||||
and self.checkpoints_roots
|
||||
and len(self.checkpoints_roots) == 1
|
||||
):
|
||||
default_checkpoint_root = self.checkpoints_roots[0]
|
||||
default_checkpoint_root = _resolve_valid_default_root(
|
||||
comfy_library.get("default_checkpoint_root", ""),
|
||||
list(self.checkpoints_roots or []),
|
||||
"default_checkpoint_root",
|
||||
)
|
||||
|
||||
default_embedding_root = comfy_library.get("default_embedding_root", "")
|
||||
if (
|
||||
not default_embedding_root
|
||||
and self.embeddings_roots
|
||||
and len(self.embeddings_roots) == 1
|
||||
):
|
||||
default_embedding_root = self.embeddings_roots[0]
|
||||
default_embedding_root = _resolve_valid_default_root(
|
||||
comfy_library.get("default_embedding_root", ""),
|
||||
list(self.embeddings_roots or []),
|
||||
"default_embedding_root",
|
||||
)
|
||||
|
||||
metadata = dict(comfy_library.get("metadata", {}))
|
||||
metadata.setdefault("display_name", "ComfyUI")
|
||||
@@ -706,7 +729,9 @@ class Config:
|
||||
return unique_paths
|
||||
|
||||
@staticmethod
|
||||
def _normalize_path_for_comparison(path: str, *, resolve_realpath: bool = False) -> str:
|
||||
def _normalize_path_for_comparison(
|
||||
path: str, *, resolve_realpath: bool = False
|
||||
) -> str:
|
||||
"""Normalize a path for equality checks across platforms."""
|
||||
candidate = os.path.realpath(path) if resolve_realpath else path
|
||||
return os.path.normcase(os.path.normpath(candidate)).replace(os.sep, "/")
|
||||
|
||||
@@ -7,7 +7,17 @@ import logging
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timezone
|
||||
from threading import Lock
|
||||
from typing import Any, Awaitable, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple
|
||||
from typing import (
|
||||
Any,
|
||||
Awaitable,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Tuple,
|
||||
)
|
||||
|
||||
from platformdirs import user_config_dir
|
||||
|
||||
@@ -17,7 +27,11 @@ from ..utils.constants import (
|
||||
SUPPORTED_DOWNLOAD_SKIP_BASE_MODELS,
|
||||
)
|
||||
from ..utils.preview_selection import VALID_MATURE_BLUR_LEVELS
|
||||
from ..utils.settings_paths import APP_NAME, ensure_settings_file, get_legacy_settings_path
|
||||
from ..utils.settings_paths import (
|
||||
APP_NAME,
|
||||
ensure_settings_file,
|
||||
get_legacy_settings_path,
|
||||
)
|
||||
from ..utils.tag_priorities import (
|
||||
PriorityTagEntry,
|
||||
collect_canonical_tags,
|
||||
@@ -94,7 +108,9 @@ class SettingsManager:
|
||||
self._template_payload_cache_loaded = False
|
||||
self._original_disk_payload: Optional[Dict[str, Any]] = None
|
||||
self._preserve_disk_template = False
|
||||
self._template_path = Path(__file__).resolve().parents[2] / "settings.json.example"
|
||||
self._template_path = (
|
||||
Path(__file__).resolve().parents[2] / "settings.json.example"
|
||||
)
|
||||
self.settings = self._load_settings()
|
||||
self._migrate_setting_keys()
|
||||
self._ensure_default_settings()
|
||||
@@ -120,7 +136,7 @@ class SettingsManager:
|
||||
"""Load settings from file"""
|
||||
if os.path.exists(self.settings_file):
|
||||
try:
|
||||
with open(self.settings_file, 'r', encoding='utf-8') as f:
|
||||
with open(self.settings_file, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
if isinstance(data, dict):
|
||||
self._original_disk_payload = copy.deepcopy(data)
|
||||
@@ -198,7 +214,9 @@ class SettingsManager:
|
||||
return None
|
||||
|
||||
if not isinstance(data, dict):
|
||||
logger.debug("settings.json.example is not a JSON object; ignoring template")
|
||||
logger.debug(
|
||||
"settings.json.example is not a JSON object; ignoring template"
|
||||
)
|
||||
return None
|
||||
|
||||
self._template_payload_cache = copy.deepcopy(data)
|
||||
@@ -274,7 +292,9 @@ class SettingsManager:
|
||||
normalized_skip_paths = self.normalize_metadata_refresh_skip_paths(
|
||||
self.settings.get("metadata_refresh_skip_paths")
|
||||
)
|
||||
if normalized_skip_paths != self.settings.get("metadata_refresh_skip_paths"):
|
||||
if normalized_skip_paths != self.settings.get(
|
||||
"metadata_refresh_skip_paths"
|
||||
):
|
||||
self.settings["metadata_refresh_skip_paths"] = normalized_skip_paths
|
||||
updated_existing = True
|
||||
else:
|
||||
@@ -288,9 +308,7 @@ class SettingsManager:
|
||||
if normalized_skip_base_models != self.settings.get(
|
||||
"download_skip_base_models"
|
||||
):
|
||||
self.settings["download_skip_base_models"] = (
|
||||
normalized_skip_base_models
|
||||
)
|
||||
self.settings["download_skip_base_models"] = normalized_skip_base_models
|
||||
updated_existing = True
|
||||
else:
|
||||
self.settings["download_skip_base_models"] = []
|
||||
@@ -330,19 +348,19 @@ class SettingsManager:
|
||||
raw_top_level_paths = self.settings.get("folder_paths", {})
|
||||
normalized_top_level_paths: Dict[str, List[str]] = {}
|
||||
if isinstance(raw_top_level_paths, Mapping):
|
||||
normalized_top_level_paths = self._normalize_folder_paths(raw_top_level_paths)
|
||||
normalized_top_level_paths = self._normalize_folder_paths(
|
||||
raw_top_level_paths
|
||||
)
|
||||
if normalized_top_level_paths != raw_top_level_paths:
|
||||
self.settings["folder_paths"] = copy.deepcopy(normalized_top_level_paths)
|
||||
self.settings["folder_paths"] = copy.deepcopy(
|
||||
normalized_top_level_paths
|
||||
)
|
||||
|
||||
top_level_has_paths = self._has_configured_paths(normalized_top_level_paths)
|
||||
|
||||
needs_library_bootstrap = not isinstance(libraries, dict) or not libraries
|
||||
|
||||
if (
|
||||
not needs_library_bootstrap
|
||||
and top_level_has_paths
|
||||
and len(libraries) == 1
|
||||
):
|
||||
if not needs_library_bootstrap and top_level_has_paths and len(libraries) == 1:
|
||||
only_library_payload = next(iter(libraries.values()))
|
||||
if isinstance(only_library_payload, Mapping):
|
||||
folder_payload = only_library_payload.get("folder_paths")
|
||||
@@ -354,7 +372,9 @@ class SettingsManager:
|
||||
library_payload = self._build_library_payload(
|
||||
folder_paths=normalized_top_level_paths,
|
||||
default_lora_root=self.settings.get("default_lora_root", ""),
|
||||
default_checkpoint_root=self.settings.get("default_checkpoint_root", ""),
|
||||
default_checkpoint_root=self.settings.get(
|
||||
"default_checkpoint_root", ""
|
||||
),
|
||||
default_unet_root=self.settings.get("default_unet_root", ""),
|
||||
default_embedding_root=self.settings.get("default_embedding_root", ""),
|
||||
)
|
||||
@@ -376,7 +396,11 @@ class SettingsManager:
|
||||
|
||||
if target_name:
|
||||
candidate_payload = libraries.get(target_name)
|
||||
if isinstance(candidate_payload, Mapping) and not self._has_configured_paths(candidate_payload.get("folder_paths")):
|
||||
if isinstance(
|
||||
candidate_payload, Mapping
|
||||
) and not self._has_configured_paths(
|
||||
candidate_payload.get("folder_paths")
|
||||
):
|
||||
seed_library_name = target_name
|
||||
|
||||
sanitized_libraries: Dict[str, Dict[str, Any]] = {}
|
||||
@@ -435,11 +459,17 @@ class SettingsManager:
|
||||
active_library = libraries.get(active_name, {})
|
||||
folder_paths = copy.deepcopy(active_library.get("folder_paths", {}))
|
||||
self.settings["folder_paths"] = folder_paths
|
||||
self.settings["extra_folder_paths"] = copy.deepcopy(active_library.get("extra_folder_paths", {}))
|
||||
self.settings["extra_folder_paths"] = copy.deepcopy(
|
||||
active_library.get("extra_folder_paths", {})
|
||||
)
|
||||
self.settings["default_lora_root"] = active_library.get("default_lora_root", "")
|
||||
self.settings["default_checkpoint_root"] = active_library.get("default_checkpoint_root", "")
|
||||
self.settings["default_checkpoint_root"] = active_library.get(
|
||||
"default_checkpoint_root", ""
|
||||
)
|
||||
self.settings["default_unet_root"] = active_library.get("default_unet_root", "")
|
||||
self.settings["default_embedding_root"] = active_library.get("default_embedding_root", "")
|
||||
self.settings["default_embedding_root"] = active_library.get(
|
||||
"default_embedding_root", ""
|
||||
)
|
||||
|
||||
if save:
|
||||
self._save_settings()
|
||||
@@ -468,7 +498,9 @@ class SettingsManager:
|
||||
payload.setdefault("folder_paths", {})
|
||||
|
||||
if extra_folder_paths is not None:
|
||||
payload["extra_folder_paths"] = self._normalize_folder_paths(extra_folder_paths)
|
||||
payload["extra_folder_paths"] = self._normalize_folder_paths(
|
||||
extra_folder_paths
|
||||
)
|
||||
else:
|
||||
payload.setdefault("extra_folder_paths", {})
|
||||
|
||||
@@ -577,7 +609,9 @@ class SettingsManager:
|
||||
}
|
||||
overlap = existing.intersection(new_paths.keys())
|
||||
if overlap:
|
||||
collisions = ", ".join(sorted(new_paths[value] for value in overlap))
|
||||
collisions = ", ".join(
|
||||
sorted(new_paths[value] for value in overlap)
|
||||
)
|
||||
raise ValueError(
|
||||
f"Folder path(s) {collisions} already assigned to library '{other_name}'"
|
||||
)
|
||||
@@ -612,19 +646,31 @@ class SettingsManager:
|
||||
library["extra_folder_paths"] = normalized_extra_paths
|
||||
changed = True
|
||||
|
||||
if default_lora_root is not None and library.get("default_lora_root") != default_lora_root:
|
||||
if (
|
||||
default_lora_root is not None
|
||||
and library.get("default_lora_root") != default_lora_root
|
||||
):
|
||||
library["default_lora_root"] = default_lora_root
|
||||
changed = True
|
||||
|
||||
if default_checkpoint_root is not None and library.get("default_checkpoint_root") != default_checkpoint_root:
|
||||
if (
|
||||
default_checkpoint_root is not None
|
||||
and library.get("default_checkpoint_root") != default_checkpoint_root
|
||||
):
|
||||
library["default_checkpoint_root"] = default_checkpoint_root
|
||||
changed = True
|
||||
|
||||
if default_unet_root is not None and library.get("default_unet_root") != default_unet_root:
|
||||
if (
|
||||
default_unet_root is not None
|
||||
and library.get("default_unet_root") != default_unet_root
|
||||
):
|
||||
library["default_unet_root"] = default_unet_root
|
||||
changed = True
|
||||
|
||||
if default_embedding_root is not None and library.get("default_embedding_root") != default_embedding_root:
|
||||
if (
|
||||
default_embedding_root is not None
|
||||
and library.get("default_embedding_root") != default_embedding_root
|
||||
):
|
||||
library["default_embedding_root"] = default_embedding_root
|
||||
changed = True
|
||||
|
||||
@@ -637,16 +683,16 @@ class SettingsManager:
|
||||
def _migrate_setting_keys(self) -> None:
|
||||
"""Migrate legacy camelCase setting keys to snake_case"""
|
||||
key_migrations = {
|
||||
'optimizeExampleImages': 'optimize_example_images',
|
||||
'autoDownloadExampleImages': 'auto_download_example_images',
|
||||
'blurMatureContent': 'blur_mature_content',
|
||||
'matureBlurLevel': 'mature_blur_level',
|
||||
'autoplayOnHover': 'autoplay_on_hover',
|
||||
'displayDensity': 'display_density',
|
||||
'cardInfoDisplay': 'card_info_display',
|
||||
'includeTriggerWords': 'include_trigger_words',
|
||||
'compactMode': 'compact_mode',
|
||||
'modelCardFooterAction': 'model_card_footer_action',
|
||||
"optimizeExampleImages": "optimize_example_images",
|
||||
"autoDownloadExampleImages": "auto_download_example_images",
|
||||
"blurMatureContent": "blur_mature_content",
|
||||
"matureBlurLevel": "mature_blur_level",
|
||||
"autoplayOnHover": "autoplay_on_hover",
|
||||
"displayDensity": "display_density",
|
||||
"cardInfoDisplay": "card_info_display",
|
||||
"includeTriggerWords": "include_trigger_words",
|
||||
"compactMode": "compact_mode",
|
||||
"modelCardFooterAction": "model_card_footer_action",
|
||||
}
|
||||
|
||||
updated = False
|
||||
@@ -663,65 +709,77 @@ class SettingsManager:
|
||||
|
||||
def _migrate_download_path_template(self):
|
||||
"""Migrate old download_path_template to new download_path_templates"""
|
||||
old_template = self.settings.get('download_path_template')
|
||||
templates = self.settings.get('download_path_templates')
|
||||
old_template = self.settings.get("download_path_template")
|
||||
templates = self.settings.get("download_path_templates")
|
||||
|
||||
# If old template exists and new templates don't exist, migrate
|
||||
if old_template is not None and not templates:
|
||||
logger.info("Migrating download_path_template to download_path_templates")
|
||||
self.settings['download_path_templates'] = {
|
||||
'lora': old_template,
|
||||
'checkpoint': old_template,
|
||||
'embedding': old_template
|
||||
self.settings["download_path_templates"] = {
|
||||
"lora": old_template,
|
||||
"checkpoint": old_template,
|
||||
"embedding": old_template,
|
||||
}
|
||||
# Remove old setting
|
||||
del self.settings['download_path_template']
|
||||
del self.settings["download_path_template"]
|
||||
self._save_settings()
|
||||
logger.info("Migration completed")
|
||||
|
||||
def _auto_set_default_roots(self):
|
||||
"""Auto set default root paths when the current default is unset or not among the options.
|
||||
"""Ensure default root paths always point at a current valid root.
|
||||
|
||||
For single-path cases, always use that path.
|
||||
For multi-path cases, only set if current default is empty or invalid.
|
||||
Empty or stale defaults are repaired to the first configured root.
|
||||
Skips auto-setting when the settings file matches the template
|
||||
(user hasn't customized yet).
|
||||
"""
|
||||
folder_paths = self.settings.get('folder_paths', {})
|
||||
# Skip auto-setting if the user hasn't customized settings yet (template preserved)
|
||||
if self._preserve_disk_template:
|
||||
return
|
||||
|
||||
folder_paths = self.settings.get("folder_paths", {})
|
||||
updated = False
|
||||
# loras
|
||||
loras = folder_paths.get('loras', [])
|
||||
if isinstance(loras, list) and len(loras) == 1:
|
||||
current_lora_root = self.settings.get('default_lora_root')
|
||||
if current_lora_root not in loras:
|
||||
self.settings['default_lora_root'] = loras[0]
|
||||
updated = True
|
||||
# checkpoints
|
||||
checkpoints = folder_paths.get('checkpoints', [])
|
||||
if isinstance(checkpoints, list) and len(checkpoints) == 1:
|
||||
current_checkpoint_root = self.settings.get('default_checkpoint_root')
|
||||
if current_checkpoint_root not in checkpoints:
|
||||
self.settings['default_checkpoint_root'] = checkpoints[0]
|
||||
updated = True
|
||||
# unet (diffusion models) - auto-set if empty or invalid
|
||||
unet_paths = folder_paths.get('unet', [])
|
||||
if isinstance(unet_paths, list) and len(unet_paths) >= 1:
|
||||
current_unet_root = self.settings.get('default_unet_root')
|
||||
# Set to first path if current is empty or not in the valid paths
|
||||
if not current_unet_root or current_unet_root not in unet_paths:
|
||||
self.settings['default_unet_root'] = unet_paths[0]
|
||||
updated = True
|
||||
# embeddings
|
||||
embeddings = folder_paths.get('embeddings', [])
|
||||
if isinstance(embeddings, list) and len(embeddings) == 1:
|
||||
current_embedding_root = self.settings.get('default_embedding_root')
|
||||
if current_embedding_root not in embeddings:
|
||||
self.settings['default_embedding_root'] = embeddings[0]
|
||||
updated = True
|
||||
|
||||
def _check_and_auto_set(key: str, setting_key: str) -> bool:
|
||||
"""Repair default roots when empty or no longer present."""
|
||||
current = self.settings.get(setting_key, "")
|
||||
candidates = folder_paths.get(key, [])
|
||||
if not isinstance(candidates, list) or not candidates:
|
||||
return False
|
||||
|
||||
# Filter valid path strings
|
||||
valid_paths = [p for p in candidates if isinstance(p, str) and p.strip()]
|
||||
if not valid_paths:
|
||||
return False
|
||||
|
||||
if current in valid_paths:
|
||||
return False
|
||||
|
||||
self.settings[setting_key] = valid_paths[0]
|
||||
if current:
|
||||
logger.info(
|
||||
"Repaired stale %s from '%s' to '%s'",
|
||||
setting_key,
|
||||
current,
|
||||
valid_paths[0],
|
||||
)
|
||||
else:
|
||||
logger.info("Auto-set %s to '%s'", setting_key, valid_paths[0])
|
||||
return True
|
||||
|
||||
# Process all model types
|
||||
updated = _check_and_auto_set("loras", "default_lora_root") or updated
|
||||
updated = (
|
||||
_check_and_auto_set("checkpoints", "default_checkpoint_root") or updated
|
||||
)
|
||||
updated = _check_and_auto_set("unet", "default_unet_root") or updated
|
||||
updated = _check_and_auto_set("embeddings", "default_embedding_root") or updated
|
||||
|
||||
if updated:
|
||||
self._update_active_library_entry(
|
||||
default_lora_root=self.settings.get('default_lora_root'),
|
||||
default_checkpoint_root=self.settings.get('default_checkpoint_root'),
|
||||
default_unet_root=self.settings.get('default_unet_root'),
|
||||
default_embedding_root=self.settings.get('default_embedding_root'),
|
||||
default_lora_root=self.settings.get("default_lora_root"),
|
||||
default_checkpoint_root=self.settings.get("default_checkpoint_root"),
|
||||
default_unet_root=self.settings.get("default_unet_root"),
|
||||
default_embedding_root=self.settings.get("default_embedding_root"),
|
||||
)
|
||||
if self._bootstrap_reason == "missing":
|
||||
self._needs_initial_save = True
|
||||
@@ -730,11 +788,11 @@ class SettingsManager:
|
||||
|
||||
def _check_environment_variables(self) -> None:
|
||||
"""Check for environment variables and update settings if needed"""
|
||||
env_api_key = os.environ.get('CIVITAI_API_KEY')
|
||||
env_api_key = os.environ.get("CIVITAI_API_KEY")
|
||||
if env_api_key: # Check if the environment variable exists and is not empty
|
||||
logger.info("Found CIVITAI_API_KEY environment variable")
|
||||
# Always use the environment variable if it exists
|
||||
self.settings['civitai_api_key'] = env_api_key
|
||||
self.settings["civitai_api_key"] = env_api_key
|
||||
self._save_settings()
|
||||
|
||||
def _default_settings_actions(self) -> List[Dict[str, Any]]:
|
||||
@@ -799,7 +857,9 @@ class SettingsManager:
|
||||
disk_value = self._original_disk_payload.get(key)
|
||||
default_value = defaults.get(key)
|
||||
# Compare using JSON serialization for complex objects
|
||||
if json.dumps(disk_value, sort_keys=True, default=str) == json.dumps(default_value, sort_keys=True, default=str):
|
||||
if json.dumps(disk_value, sort_keys=True, default=str) == json.dumps(
|
||||
default_value, sort_keys=True, default=str
|
||||
):
|
||||
default_value_keys.add(key)
|
||||
|
||||
# Only cleanup if there are "many" default keys (indicating a bloated file)
|
||||
@@ -807,7 +867,7 @@ class SettingsManager:
|
||||
if len(default_value_keys) >= DEFAULT_KEYS_CLEANUP_THRESHOLD:
|
||||
logger.info(
|
||||
"Cleaning up %d default value(s) from settings.json to keep it minimal",
|
||||
len(default_value_keys)
|
||||
len(default_value_keys),
|
||||
)
|
||||
self._save_settings()
|
||||
# Update original payload to match what we just saved
|
||||
@@ -817,8 +877,8 @@ class SettingsManager:
|
||||
if not self._standalone_mode:
|
||||
return
|
||||
|
||||
folder_paths = self.settings.get('folder_paths', {}) or {}
|
||||
monitored_keys = ('loras', 'checkpoints', 'embeddings')
|
||||
folder_paths = self.settings.get("folder_paths", {}) or {}
|
||||
monitored_keys = ("loras", "checkpoints", "embeddings")
|
||||
|
||||
has_valid_paths = False
|
||||
for key in monitored_keys:
|
||||
@@ -829,7 +889,10 @@ class SettingsManager:
|
||||
iterator = list(raw_paths)
|
||||
except TypeError:
|
||||
continue
|
||||
if any(isinstance(path, str) and path and os.path.exists(path) for path in iterator):
|
||||
if any(
|
||||
isinstance(path, str) and path and os.path.exists(path)
|
||||
for path in iterator
|
||||
):
|
||||
has_valid_paths = True
|
||||
break
|
||||
|
||||
@@ -860,13 +923,13 @@ class SettingsManager:
|
||||
def _get_default_settings(self) -> Dict[str, Any]:
|
||||
"""Return default settings"""
|
||||
defaults = copy.deepcopy(DEFAULT_SETTINGS)
|
||||
defaults['base_model_path_mappings'] = {}
|
||||
defaults['download_path_templates'] = {}
|
||||
defaults['priority_tags'] = DEFAULT_PRIORITY_TAG_CONFIG.copy()
|
||||
defaults.setdefault('folder_paths', {})
|
||||
defaults.setdefault('extra_folder_paths', {})
|
||||
defaults['auto_organize_exclusions'] = []
|
||||
defaults['metadata_refresh_skip_paths'] = []
|
||||
defaults["base_model_path_mappings"] = {}
|
||||
defaults["download_path_templates"] = {}
|
||||
defaults["priority_tags"] = DEFAULT_PRIORITY_TAG_CONFIG.copy()
|
||||
defaults.setdefault("folder_paths", {})
|
||||
defaults.setdefault("extra_folder_paths", {})
|
||||
defaults["auto_organize_exclusions"] = []
|
||||
defaults["metadata_refresh_skip_paths"] = []
|
||||
|
||||
library_name = defaults.get("active_library") or "default"
|
||||
default_library = self._build_library_payload(
|
||||
@@ -876,8 +939,8 @@ class SettingsManager:
|
||||
default_checkpoint_root=defaults.get("default_checkpoint_root"),
|
||||
default_embedding_root=defaults.get("default_embedding_root"),
|
||||
)
|
||||
defaults['libraries'] = {library_name: default_library}
|
||||
defaults['active_library'] = library_name
|
||||
defaults["libraries"] = {library_name: default_library}
|
||||
defaults["active_library"] = library_name
|
||||
return defaults
|
||||
|
||||
def _normalize_priority_tag_config(self, value: Any) -> Dict[str, str]:
|
||||
@@ -908,7 +971,9 @@ class SettingsManager:
|
||||
candidates: Iterable[str] = (
|
||||
value.replace("\n", ",").replace(";", ",").split(",")
|
||||
)
|
||||
elif isinstance(value, Sequence) and not isinstance(value, (bytes, bytearray, str)):
|
||||
elif isinstance(value, Sequence) and not isinstance(
|
||||
value, (bytes, bytearray, str)
|
||||
):
|
||||
candidates = value
|
||||
else:
|
||||
return []
|
||||
@@ -954,7 +1019,9 @@ class SettingsManager:
|
||||
candidates: Iterable[str] = (
|
||||
value.replace("\n", ",").replace(";", ",").split(",")
|
||||
)
|
||||
elif isinstance(value, Sequence) and not isinstance(value, (bytes, bytearray, str)):
|
||||
elif isinstance(value, Sequence) and not isinstance(
|
||||
value, (bytes, bytearray, str)
|
||||
):
|
||||
candidates = value
|
||||
else:
|
||||
return []
|
||||
@@ -1060,7 +1127,9 @@ class SettingsManager:
|
||||
continue
|
||||
normalized = os.path.normcase(os.path.normpath(stripped))
|
||||
if os.path.exists(stripped):
|
||||
normalized = os.path.normcase(os.path.normpath(os.path.realpath(stripped)))
|
||||
normalized = os.path.normcase(
|
||||
os.path.normpath(os.path.realpath(stripped))
|
||||
)
|
||||
primary_real_paths.add(normalized)
|
||||
|
||||
primary_symlink_targets = set()
|
||||
@@ -1096,8 +1165,13 @@ class SettingsManager:
|
||||
continue
|
||||
normalized = os.path.normcase(os.path.normpath(stripped))
|
||||
if os.path.exists(stripped):
|
||||
normalized = os.path.normcase(os.path.normpath(os.path.realpath(stripped)))
|
||||
if normalized in primary_real_paths or normalized in primary_symlink_targets:
|
||||
normalized = os.path.normcase(
|
||||
os.path.normpath(os.path.realpath(stripped))
|
||||
)
|
||||
if (
|
||||
normalized in primary_real_paths
|
||||
or normalized in primary_symlink_targets
|
||||
):
|
||||
overlapping_paths.append(stripped)
|
||||
|
||||
if overlapping_paths:
|
||||
@@ -1161,19 +1235,19 @@ class SettingsManager:
|
||||
if key == "use_portable_settings" and isinstance(value, bool):
|
||||
portable_switch_pending = True
|
||||
self._prepare_portable_switch(value)
|
||||
if key == 'folder_paths' and isinstance(value, Mapping):
|
||||
if key == "folder_paths" and isinstance(value, Mapping):
|
||||
self._update_active_library_entry(folder_paths=value) # type: ignore[arg-type]
|
||||
elif key == 'extra_folder_paths' and isinstance(value, Mapping):
|
||||
elif key == "extra_folder_paths" and isinstance(value, Mapping):
|
||||
self._update_active_library_entry(extra_folder_paths=value) # type: ignore[arg-type]
|
||||
elif key == 'default_lora_root':
|
||||
elif key == "default_lora_root":
|
||||
self._update_active_library_entry(default_lora_root=str(value))
|
||||
elif key == 'default_checkpoint_root':
|
||||
elif key == "default_checkpoint_root":
|
||||
self._update_active_library_entry(default_checkpoint_root=str(value))
|
||||
elif key == 'default_unet_root':
|
||||
elif key == "default_unet_root":
|
||||
self._update_active_library_entry(default_unet_root=str(value))
|
||||
elif key == 'default_embedding_root':
|
||||
elif key == "default_embedding_root":
|
||||
self._update_active_library_entry(default_embedding_root=str(value))
|
||||
elif key == 'model_name_display':
|
||||
elif key == "model_name_display":
|
||||
self._notify_model_name_display_change(value)
|
||||
self._save_settings()
|
||||
if portable_switch_pending:
|
||||
@@ -1249,10 +1323,9 @@ class SettingsManager:
|
||||
|
||||
source_cache_dir = os.path.join(source_dir, "model_cache")
|
||||
target_cache_dir = os.path.join(target_dir, "model_cache")
|
||||
if (
|
||||
os.path.isdir(source_cache_dir)
|
||||
and os.path.abspath(source_cache_dir) != os.path.abspath(target_cache_dir)
|
||||
):
|
||||
if os.path.isdir(source_cache_dir) and os.path.abspath(
|
||||
source_cache_dir
|
||||
) != os.path.abspath(target_cache_dir):
|
||||
try:
|
||||
shutil.copytree(
|
||||
source_cache_dir,
|
||||
@@ -1270,10 +1343,9 @@ class SettingsManager:
|
||||
|
||||
source_cache_file = os.path.join(source_dir, "model_cache.sqlite")
|
||||
target_cache_file = os.path.join(target_dir, "model_cache.sqlite")
|
||||
if (
|
||||
os.path.isfile(source_cache_file)
|
||||
and os.path.abspath(source_cache_file) != os.path.abspath(target_cache_file)
|
||||
):
|
||||
if os.path.isfile(source_cache_file) and os.path.abspath(
|
||||
source_cache_file
|
||||
) != os.path.abspath(target_cache_file):
|
||||
try:
|
||||
shutil.copy2(source_cache_file, target_cache_file)
|
||||
except Exception as exc:
|
||||
@@ -1299,7 +1371,9 @@ class SettingsManager:
|
||||
try:
|
||||
os.makedirs(config_dir, exist_ok=True)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to create user config directory %s: %s", config_dir, exc)
|
||||
logger.warning(
|
||||
"Failed to create user config directory %s: %s", config_dir, exc
|
||||
)
|
||||
|
||||
return config_dir
|
||||
|
||||
@@ -1359,7 +1433,9 @@ class SettingsManager:
|
||||
try:
|
||||
asyncio.run(coroutine)
|
||||
except RuntimeError:
|
||||
logger.debug("Skipping name display update due to missing event loop")
|
||||
logger.debug(
|
||||
"Skipping name display update due to missing event loop"
|
||||
)
|
||||
continue
|
||||
|
||||
if loop is not None and target_loop is loop:
|
||||
@@ -1382,7 +1458,7 @@ class SettingsManager:
|
||||
"""Save settings to file"""
|
||||
try:
|
||||
payload = self._serialize_settings_for_disk()
|
||||
with open(self.settings_file, 'w', encoding='utf-8') as f:
|
||||
with open(self.settings_file, "w", encoding="utf-8") as f:
|
||||
json.dump(payload, f, indent=2)
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving settings: {e}")
|
||||
@@ -1423,7 +1499,9 @@ class SettingsManager:
|
||||
minimal[key] = copy.deepcopy(value)
|
||||
# Complex objects need deep comparison
|
||||
elif isinstance(value, (dict, list)) and default_value is not None:
|
||||
if json.dumps(value, sort_keys=True, default=str) != json.dumps(default_value, sort_keys=True, default=str):
|
||||
if json.dumps(value, sort_keys=True, default=str) != json.dumps(
|
||||
default_value, sort_keys=True, default=str
|
||||
):
|
||||
minimal[key] = copy.deepcopy(value)
|
||||
# Simple values use direct comparison
|
||||
elif value != default_value:
|
||||
@@ -1500,9 +1578,15 @@ class SettingsManager:
|
||||
existing = libraries.get(name, {})
|
||||
|
||||
payload = self._build_library_payload(
|
||||
folder_paths=folder_paths if folder_paths is not None else existing.get("folder_paths"),
|
||||
extra_folder_paths=extra_folder_paths if extra_folder_paths is not None else existing.get("extra_folder_paths"),
|
||||
default_lora_root=default_lora_root if default_lora_root is not None else existing.get("default_lora_root"),
|
||||
folder_paths=folder_paths
|
||||
if folder_paths is not None
|
||||
else existing.get("folder_paths"),
|
||||
extra_folder_paths=extra_folder_paths
|
||||
if extra_folder_paths is not None
|
||||
else existing.get("extra_folder_paths"),
|
||||
default_lora_root=default_lora_root
|
||||
if default_lora_root is not None
|
||||
else existing.get("default_lora_root"),
|
||||
default_checkpoint_root=(
|
||||
default_checkpoint_root
|
||||
if default_checkpoint_root is not None
|
||||
@@ -1662,7 +1746,9 @@ class SettingsManager:
|
||||
if service and hasattr(service, "on_library_changed"):
|
||||
try:
|
||||
service.on_library_changed()
|
||||
except Exception as service_exc: # pragma: no cover - defensive logging
|
||||
except (
|
||||
Exception
|
||||
) as service_exc: # pragma: no cover - defensive logging
|
||||
logger.debug(
|
||||
"Service %s failed to handle library change: %s",
|
||||
service_name,
|
||||
@@ -1673,15 +1759,15 @@ class SettingsManager:
|
||||
|
||||
def get_download_path_template(self, model_type: str) -> str:
|
||||
"""Get download path template for specific model type
|
||||
|
||||
|
||||
Args:
|
||||
model_type: The type of model ('lora', 'checkpoint', 'embedding')
|
||||
|
||||
|
||||
Returns:
|
||||
Template string for the model type, defaults to '{base_model}/{first_tag}'
|
||||
"""
|
||||
templates = self.settings.get('download_path_templates', {})
|
||||
|
||||
templates = self.settings.get("download_path_templates", {})
|
||||
|
||||
# Handle edge case where templates might be stored as JSON string
|
||||
if isinstance(templates, str):
|
||||
try:
|
||||
@@ -1689,36 +1775,40 @@ class SettingsManager:
|
||||
parsed_templates = json.loads(templates)
|
||||
if isinstance(parsed_templates, dict):
|
||||
# Update settings with parsed dictionary
|
||||
self.settings['download_path_templates'] = parsed_templates
|
||||
self.settings["download_path_templates"] = parsed_templates
|
||||
self._save_settings()
|
||||
templates = parsed_templates
|
||||
logger.info("Successfully parsed download_path_templates from JSON string")
|
||||
logger.info(
|
||||
"Successfully parsed download_path_templates from JSON string"
|
||||
)
|
||||
else:
|
||||
raise ValueError("Parsed JSON is not a dictionary")
|
||||
except (json.JSONDecodeError, ValueError) as e:
|
||||
# If parsing fails, set default values
|
||||
logger.warning(f"Failed to parse download_path_templates JSON string: {e}. Setting default values.")
|
||||
default_template = '{base_model}/{first_tag}'
|
||||
logger.warning(
|
||||
f"Failed to parse download_path_templates JSON string: {e}. Setting default values."
|
||||
)
|
||||
default_template = "{base_model}/{first_tag}"
|
||||
templates = {
|
||||
'lora': default_template,
|
||||
'checkpoint': default_template,
|
||||
'embedding': default_template
|
||||
"lora": default_template,
|
||||
"checkpoint": default_template,
|
||||
"embedding": default_template,
|
||||
}
|
||||
self.settings['download_path_templates'] = templates
|
||||
self.settings["download_path_templates"] = templates
|
||||
self._save_settings()
|
||||
|
||||
|
||||
# Ensure templates is a dictionary
|
||||
if not isinstance(templates, dict):
|
||||
default_template = '{base_model}/{first_tag}'
|
||||
default_template = "{base_model}/{first_tag}"
|
||||
templates = {
|
||||
'lora': default_template,
|
||||
'checkpoint': default_template,
|
||||
'embedding': default_template
|
||||
"lora": default_template,
|
||||
"checkpoint": default_template,
|
||||
"embedding": default_template,
|
||||
}
|
||||
self.settings['download_path_templates'] = templates
|
||||
self.settings["download_path_templates"] = templates
|
||||
self._save_settings()
|
||||
|
||||
return templates.get(model_type, '{base_model}/{first_tag}')
|
||||
|
||||
return templates.get(model_type, "{base_model}/{first_tag}")
|
||||
|
||||
|
||||
_SETTINGS_MANAGER: Optional["SettingsManager"] = None
|
||||
|
||||
@@ -1246,10 +1246,7 @@ export class SettingsManager {
|
||||
throw new Error('No LoRA roots found');
|
||||
}
|
||||
|
||||
// Clear existing options except the first one (No Default)
|
||||
const noDefaultOption = defaultLoraRootSelect.querySelector('option[value=""]');
|
||||
defaultLoraRootSelect.innerHTML = '';
|
||||
defaultLoraRootSelect.appendChild(noDefaultOption);
|
||||
|
||||
// Add options for each root
|
||||
data.roots.forEach(root => {
|
||||
@@ -1259,9 +1256,8 @@ export class SettingsManager {
|
||||
defaultLoraRootSelect.appendChild(option);
|
||||
});
|
||||
|
||||
// Set selected value from settings
|
||||
const defaultRoot = state.global.settings.default_lora_root || '';
|
||||
defaultLoraRootSelect.value = defaultRoot;
|
||||
defaultLoraRootSelect.value = data.roots.includes(defaultRoot) ? defaultRoot : data.roots[0];
|
||||
|
||||
} catch (error) {
|
||||
console.error('Error loading LoRA roots:', error);
|
||||
@@ -1285,10 +1281,7 @@ export class SettingsManager {
|
||||
throw new Error('No checkpoint roots found');
|
||||
}
|
||||
|
||||
// Clear existing options except first one (No Default)
|
||||
const noDefaultOption = defaultCheckpointRootSelect.querySelector('option[value=""]');
|
||||
defaultCheckpointRootSelect.innerHTML = '';
|
||||
defaultCheckpointRootSelect.appendChild(noDefaultOption);
|
||||
|
||||
// Add options for each root
|
||||
data.roots.forEach(root => {
|
||||
@@ -1298,9 +1291,8 @@ export class SettingsManager {
|
||||
defaultCheckpointRootSelect.appendChild(option);
|
||||
});
|
||||
|
||||
// Set selected value from settings
|
||||
const defaultRoot = state.global.settings.default_checkpoint_root || '';
|
||||
defaultCheckpointRootSelect.value = defaultRoot;
|
||||
defaultCheckpointRootSelect.value = data.roots.includes(defaultRoot) ? defaultRoot : data.roots[0];
|
||||
|
||||
} catch (error) {
|
||||
console.error('Error loading checkpoint roots:', error);
|
||||
@@ -1324,10 +1316,7 @@ export class SettingsManager {
|
||||
throw new Error('No diffusion model roots found');
|
||||
}
|
||||
|
||||
// Clear existing options except first one (No Default)
|
||||
const noDefaultOption = defaultUnetRootSelect.querySelector('option[value=""]');
|
||||
defaultUnetRootSelect.innerHTML = '';
|
||||
defaultUnetRootSelect.appendChild(noDefaultOption);
|
||||
|
||||
// Add options for each root
|
||||
data.roots.forEach(root => {
|
||||
@@ -1337,9 +1326,8 @@ export class SettingsManager {
|
||||
defaultUnetRootSelect.appendChild(option);
|
||||
});
|
||||
|
||||
// Set selected value from settings
|
||||
const defaultRoot = state.global.settings.default_unet_root || '';
|
||||
defaultUnetRootSelect.value = defaultRoot;
|
||||
defaultUnetRootSelect.value = data.roots.includes(defaultRoot) ? defaultRoot : data.roots[0];
|
||||
|
||||
} catch (error) {
|
||||
console.error('Error loading diffusion model roots:', error);
|
||||
@@ -1363,10 +1351,7 @@ export class SettingsManager {
|
||||
throw new Error('No embedding roots found');
|
||||
}
|
||||
|
||||
// Clear existing options except first one (No Default)
|
||||
const noDefaultOption = defaultEmbeddingRootSelect.querySelector('option[value=""]');
|
||||
defaultEmbeddingRootSelect.innerHTML = '';
|
||||
defaultEmbeddingRootSelect.appendChild(noDefaultOption);
|
||||
|
||||
// Add options for each root
|
||||
data.roots.forEach(root => {
|
||||
@@ -1376,9 +1361,8 @@ export class SettingsManager {
|
||||
defaultEmbeddingRootSelect.appendChild(option);
|
||||
});
|
||||
|
||||
// Set selected value from settings
|
||||
const defaultRoot = state.global.settings.default_embedding_root || '';
|
||||
defaultEmbeddingRootSelect.value = defaultRoot;
|
||||
defaultEmbeddingRootSelect.value = data.roots.includes(defaultRoot) ? defaultRoot : data.roots[0];
|
||||
|
||||
} catch (error) {
|
||||
console.error('Error loading embedding roots:', error);
|
||||
|
||||
@@ -484,9 +484,7 @@
|
||||
</label>
|
||||
</div>
|
||||
<div class="setting-control select-control">
|
||||
<select id="defaultLoraRoot" onchange="settingsManager.saveSelectSetting('defaultLoraRoot', 'default_lora_root')">
|
||||
<option value="">{{ t('settings.folderSettings.noDefault') }}</option>
|
||||
</select>
|
||||
<select id="defaultLoraRoot" onchange="settingsManager.saveSelectSetting('defaultLoraRoot', 'default_lora_root')"></select>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
@@ -500,9 +498,7 @@
|
||||
</label>
|
||||
</div>
|
||||
<div class="setting-control select-control">
|
||||
<select id="defaultCheckpointRoot" onchange="settingsManager.saveSelectSetting('defaultCheckpointRoot', 'default_checkpoint_root')">
|
||||
<option value="">{{ t('settings.folderSettings.noDefault') }}</option>
|
||||
</select>
|
||||
<select id="defaultCheckpointRoot" onchange="settingsManager.saveSelectSetting('defaultCheckpointRoot', 'default_checkpoint_root')"></select>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
@@ -516,9 +512,7 @@
|
||||
</label>
|
||||
</div>
|
||||
<div class="setting-control select-control">
|
||||
<select id="defaultUnetRoot" onchange="settingsManager.saveSelectSetting('defaultUnetRoot', 'default_unet_root')">
|
||||
<option value="">{{ t('settings.folderSettings.noDefault') }}</option>
|
||||
</select>
|
||||
<select id="defaultUnetRoot" onchange="settingsManager.saveSelectSetting('defaultUnetRoot', 'default_unet_root')"></select>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
@@ -532,9 +526,7 @@
|
||||
</label>
|
||||
</div>
|
||||
<div class="setting-control select-control">
|
||||
<select id="defaultEmbeddingRoot" onchange="settingsManager.saveSelectSetting('defaultEmbeddingRoot', 'default_embedding_root')">
|
||||
<option value="">{{ t('settings.folderSettings.noDefault') }}</option>
|
||||
</select>
|
||||
<select id="defaultEmbeddingRoot" onchange="settingsManager.saveSelectSetting('defaultEmbeddingRoot', 'default_embedding_root')"></select>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -131,6 +131,102 @@ def test_save_paths_logs_warning_when_upsert_fails(
|
||||
assert "Failed to save folder paths: boom" in caplog.text
|
||||
|
||||
|
||||
def test_save_paths_repairs_empty_default_roots(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
||||
folder_paths = _setup_config_environment(monkeypatch, tmp_path)
|
||||
|
||||
class FakeSettingsService:
|
||||
def get_libraries(self):
|
||||
return {
|
||||
"comfyui": {
|
||||
"folder_paths": {key: list(value) for key, value in folder_paths.items()},
|
||||
"default_lora_root": "",
|
||||
"default_checkpoint_root": "",
|
||||
"default_embedding_root": "",
|
||||
}
|
||||
}
|
||||
|
||||
def rename_library(self, *_):
|
||||
raise AssertionError("rename_library should not be invoked")
|
||||
|
||||
def upsert_library(self, name: str, **payload):
|
||||
self.name = name
|
||||
self.payload = payload
|
||||
|
||||
fake_settings = FakeSettingsService()
|
||||
monkeypatch.setattr(settings_manager_module, "settings", fake_settings)
|
||||
|
||||
config_module.Config()
|
||||
|
||||
assert fake_settings.name == "comfyui"
|
||||
assert fake_settings.payload["default_lora_root"] == folder_paths["loras"][0].replace("\\", "/")
|
||||
assert fake_settings.payload["default_checkpoint_root"] == folder_paths["checkpoints"][0].replace("\\", "/")
|
||||
assert fake_settings.payload["default_embedding_root"] == folder_paths["embeddings"][0].replace("\\", "/")
|
||||
|
||||
|
||||
def test_save_paths_repairs_stale_default_roots(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
||||
folder_paths = _setup_config_environment(monkeypatch, tmp_path)
|
||||
|
||||
class FakeSettingsService:
|
||||
def get_libraries(self):
|
||||
return {
|
||||
"comfyui": {
|
||||
"folder_paths": {key: list(value) for key, value in folder_paths.items()},
|
||||
"default_lora_root": "/stale/loras",
|
||||
"default_checkpoint_root": "/stale/checkpoints",
|
||||
"default_embedding_root": "/stale/embeddings",
|
||||
}
|
||||
}
|
||||
|
||||
def rename_library(self, *_):
|
||||
raise AssertionError("rename_library should not be invoked")
|
||||
|
||||
def upsert_library(self, name: str, **payload):
|
||||
self.name = name
|
||||
self.payload = payload
|
||||
|
||||
fake_settings = FakeSettingsService()
|
||||
monkeypatch.setattr(settings_manager_module, "settings", fake_settings)
|
||||
|
||||
config_module.Config()
|
||||
|
||||
assert fake_settings.name == "comfyui"
|
||||
assert fake_settings.payload["default_lora_root"] == folder_paths["loras"][0].replace("\\", "/")
|
||||
assert fake_settings.payload["default_checkpoint_root"] == folder_paths["checkpoints"][0].replace("\\", "/")
|
||||
assert fake_settings.payload["default_embedding_root"] == folder_paths["embeddings"][0].replace("\\", "/")
|
||||
|
||||
|
||||
def test_save_paths_keeps_valid_default_roots(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
||||
folder_paths = _setup_config_environment(monkeypatch, tmp_path)
|
||||
|
||||
class FakeSettingsService:
|
||||
def get_libraries(self):
|
||||
return {
|
||||
"comfyui": {
|
||||
"folder_paths": {key: list(value) for key, value in folder_paths.items()},
|
||||
"default_lora_root": folder_paths["loras"][0],
|
||||
"default_checkpoint_root": folder_paths["checkpoints"][0],
|
||||
"default_embedding_root": folder_paths["embeddings"][0],
|
||||
}
|
||||
}
|
||||
|
||||
def rename_library(self, *_):
|
||||
raise AssertionError("rename_library should not be invoked")
|
||||
|
||||
def upsert_library(self, name: str, **payload):
|
||||
self.name = name
|
||||
self.payload = payload
|
||||
|
||||
fake_settings = FakeSettingsService()
|
||||
monkeypatch.setattr(settings_manager_module, "settings", fake_settings)
|
||||
|
||||
config_module.Config()
|
||||
|
||||
assert fake_settings.name == "comfyui"
|
||||
assert fake_settings.payload["default_lora_root"] == folder_paths["loras"][0].replace("\\", "/")
|
||||
assert fake_settings.payload["default_checkpoint_root"] == folder_paths["checkpoints"][0].replace("\\", "/")
|
||||
assert fake_settings.payload["default_embedding_root"] == folder_paths["embeddings"][0].replace("\\", "/")
|
||||
|
||||
|
||||
def test_save_paths_removes_template_default_library(monkeypatch, tmp_path):
|
||||
folder_paths = _setup_config_environment(monkeypatch, tmp_path)
|
||||
|
||||
|
||||
@@ -17,7 +17,9 @@ def test_portable_settings_use_project_root(tmp_path, monkeypatch):
|
||||
from importlib import reload
|
||||
|
||||
settings_paths_module = reload(settings_paths)
|
||||
monkeypatch.setattr(settings_paths_module, "get_project_root", lambda: str(tmp_path))
|
||||
monkeypatch.setattr(
|
||||
settings_paths_module, "get_project_root", lambda: str(tmp_path)
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
settings_paths_module,
|
||||
"user_config_dir",
|
||||
@@ -25,7 +27,9 @@ def test_portable_settings_use_project_root(tmp_path, monkeypatch):
|
||||
)
|
||||
|
||||
portable_settings = {"use_portable_settings": True}
|
||||
(tmp_path / "settings.json").write_text(json.dumps(portable_settings), encoding="utf-8")
|
||||
(tmp_path / "settings.json").write_text(
|
||||
json.dumps(portable_settings), encoding="utf-8"
|
||||
)
|
||||
|
||||
config_dir = settings_paths_module.get_settings_dir(create=True)
|
||||
assert config_dir == str(tmp_path)
|
||||
@@ -74,7 +78,9 @@ def test_initial_save_persists_minimal_template(tmp_path, monkeypatch):
|
||||
self._seed_template = copy.deepcopy(template)
|
||||
return copy.deepcopy(template)
|
||||
|
||||
monkeypatch.setattr(SettingsManager, "_load_settings_template", fake_template_loader)
|
||||
monkeypatch.setattr(
|
||||
SettingsManager, "_load_settings_template", fake_template_loader
|
||||
)
|
||||
|
||||
manager = SettingsManager()
|
||||
|
||||
@@ -118,7 +124,10 @@ def test_existing_folder_paths_seed_default_library(tmp_path, monkeypatch):
|
||||
assert "default" in libraries
|
||||
assert libraries["default"]["folder_paths"]["loras"] == [str(lora_dir)]
|
||||
assert libraries["default"]["folder_paths"]["checkpoints"] == [str(checkpoint_dir)]
|
||||
assert libraries["default"]["folder_paths"]["unet"] == [str(diffusion_dir), str(unet_dir)]
|
||||
assert libraries["default"]["folder_paths"]["unet"] == [
|
||||
str(diffusion_dir),
|
||||
str(unet_dir),
|
||||
]
|
||||
assert libraries["default"]["folder_paths"]["embeddings"] == [str(embedding_dir)]
|
||||
|
||||
assert manager.get_startup_messages() == []
|
||||
@@ -138,7 +147,9 @@ def test_environment_variable_overrides_settings(tmp_path, monkeypatch):
|
||||
assert mgr.get("civitai_api_key") == "secret"
|
||||
|
||||
|
||||
def _create_manager_with_settings(tmp_path, monkeypatch, initial_settings, *, save_spy=None):
|
||||
def _create_manager_with_settings(
|
||||
tmp_path, monkeypatch, initial_settings, *, save_spy=None
|
||||
):
|
||||
"""Helper to instantiate SettingsManager with predefined settings."""
|
||||
|
||||
fake_settings_path = tmp_path / "settings.json"
|
||||
@@ -203,7 +214,9 @@ def test_switch_to_portable_mode_copies_cache(tmp_path, monkeypatch):
|
||||
assert manager.settings_file == str(project_root / "settings.json")
|
||||
marker_copy = project_root / "model_cache" / "user_marker.txt"
|
||||
assert marker_copy.read_text(encoding="utf-8") == "user_marker.txt"
|
||||
assert (project_root / "model_cache.sqlite").read_text(encoding="utf-8") == "user_db"
|
||||
assert (project_root / "model_cache.sqlite").read_text(
|
||||
encoding="utf-8"
|
||||
) == "user_db"
|
||||
assert user_settings.exists()
|
||||
|
||||
|
||||
@@ -216,13 +229,17 @@ def test_switching_back_to_user_config_moves_cache(tmp_path, monkeypatch):
|
||||
|
||||
project_cache_dir = project_root / "model_cache"
|
||||
project_cache_dir.mkdir(exist_ok=True)
|
||||
(project_cache_dir / "project_marker.txt").write_text("project_marker", encoding="utf-8")
|
||||
(project_cache_dir / "project_marker.txt").write_text(
|
||||
"project_marker", encoding="utf-8"
|
||||
)
|
||||
(project_root / "model_cache.sqlite").write_text("project_db", encoding="utf-8")
|
||||
|
||||
manager.set("use_portable_settings", False)
|
||||
|
||||
assert manager.settings_file == str(user_settings)
|
||||
assert (user_dir / "model_cache" / "project_marker.txt").read_text(encoding="utf-8") == "project_marker"
|
||||
assert (user_dir / "model_cache" / "project_marker.txt").read_text(
|
||||
encoding="utf-8"
|
||||
) == "project_marker"
|
||||
assert (user_dir / "model_cache.sqlite").read_text(encoding="utf-8") == "project_db"
|
||||
|
||||
|
||||
@@ -242,10 +259,19 @@ def test_download_path_template_invalid_json(manager):
|
||||
template = manager.get_download_path_template("checkpoint")
|
||||
|
||||
assert template == "{base_model}/{first_tag}"
|
||||
assert manager.settings["download_path_templates"]["lora"] == "{base_model}/{first_tag}"
|
||||
assert (
|
||||
manager.settings["download_path_templates"]["lora"]
|
||||
== "{base_model}/{first_tag}"
|
||||
)
|
||||
|
||||
|
||||
def test_auto_set_default_roots(manager):
|
||||
# Clear any previously auto-set values to test fresh behavior
|
||||
manager.settings["default_lora_root"] = ""
|
||||
manager.settings["default_checkpoint_root"] = ""
|
||||
manager.settings["default_embedding_root"] = ""
|
||||
manager.settings["default_unet_root"] = ""
|
||||
|
||||
manager.settings["folder_paths"] = {
|
||||
"loras": ["/loras"],
|
||||
"checkpoints": ["/checkpoints"],
|
||||
@@ -259,6 +285,48 @@ def test_auto_set_default_roots(manager):
|
||||
assert manager.get("default_embedding_root") == "/embeddings"
|
||||
|
||||
|
||||
def test_auto_set_default_roots_repairs_stale_values(manager):
|
||||
manager.settings["default_lora_root"] = "/stale-lora"
|
||||
manager.settings["default_checkpoint_root"] = "/stale-checkpoint"
|
||||
manager.settings["default_embedding_root"] = "/stale-embedding"
|
||||
manager.settings["default_unet_root"] = "/stale-unet"
|
||||
|
||||
manager.settings["folder_paths"] = {
|
||||
"loras": ["/loras"],
|
||||
"checkpoints": ["/checkpoints"],
|
||||
"unet": ["/unet"],
|
||||
"embeddings": ["/embeddings"],
|
||||
}
|
||||
|
||||
manager._auto_set_default_roots()
|
||||
|
||||
assert manager.get("default_lora_root") == "/loras"
|
||||
assert manager.get("default_checkpoint_root") == "/checkpoints"
|
||||
assert manager.get("default_unet_root") == "/unet"
|
||||
assert manager.get("default_embedding_root") == "/embeddings"
|
||||
|
||||
|
||||
def test_auto_set_default_roots_keeps_valid_values(manager):
|
||||
manager.settings["default_lora_root"] = "/loras"
|
||||
manager.settings["default_checkpoint_root"] = "/checkpoints"
|
||||
manager.settings["default_embedding_root"] = "/embeddings"
|
||||
manager.settings["default_unet_root"] = "/unet"
|
||||
|
||||
manager.settings["folder_paths"] = {
|
||||
"loras": ["/loras", "/other-loras"],
|
||||
"checkpoints": ["/checkpoints"],
|
||||
"unet": ["/unet", "/other-unet"],
|
||||
"embeddings": ["/embeddings"],
|
||||
}
|
||||
|
||||
manager._auto_set_default_roots()
|
||||
|
||||
assert manager.get("default_lora_root") == "/loras"
|
||||
assert manager.get("default_checkpoint_root") == "/checkpoints"
|
||||
assert manager.get("default_unet_root") == "/unet"
|
||||
assert manager.get("default_embedding_root") == "/embeddings"
|
||||
|
||||
|
||||
def test_delete_setting(manager):
|
||||
manager.set("example", 1)
|
||||
manager.delete("example")
|
||||
@@ -293,7 +361,14 @@ def test_invalid_mature_blur_level_is_normalized_to_r(tmp_path, monkeypatch):
|
||||
|
||||
def test_model_name_display_setting_notifies_scanners(tmp_path, monkeypatch):
|
||||
initial = {
|
||||
"libraries": {"default": {"folder_paths": {}, "default_lora_root": "", "default_checkpoint_root": "", "default_embedding_root": ""}},
|
||||
"libraries": {
|
||||
"default": {
|
||||
"folder_paths": {},
|
||||
"default_lora_root": "",
|
||||
"default_checkpoint_root": "",
|
||||
"default_embedding_root": "",
|
||||
}
|
||||
},
|
||||
"active_library": "default",
|
||||
"model_name_display": "model_name",
|
||||
}
|
||||
@@ -315,6 +390,7 @@ def test_model_name_display_setting_notifies_scanners(tmp_path, monkeypatch):
|
||||
|
||||
dispatched_loops = []
|
||||
futures = []
|
||||
|
||||
def tracking_run_coroutine_threadsafe(coro, target_loop):
|
||||
dispatched_loops.append(target_loop)
|
||||
future = Future()
|
||||
@@ -335,7 +411,9 @@ def test_model_name_display_setting_notifies_scanners(tmp_path, monkeypatch):
|
||||
"get_service_sync",
|
||||
classmethod(fake_get_service_sync),
|
||||
)
|
||||
monkeypatch.setattr(asyncio, "run_coroutine_threadsafe", tracking_run_coroutine_threadsafe)
|
||||
monkeypatch.setattr(
|
||||
asyncio, "run_coroutine_threadsafe", tracking_run_coroutine_threadsafe
|
||||
)
|
||||
|
||||
try:
|
||||
manager.set("model_name_display", "file_name")
|
||||
@@ -354,12 +432,14 @@ def test_migrates_legacy_settings_file(tmp_path, monkeypatch):
|
||||
legacy_root = tmp_path / "legacy"
|
||||
legacy_root.mkdir()
|
||||
legacy_file = legacy_root / "settings.json"
|
||||
legacy_file.write_text("{\"value\": 1}", encoding="utf-8")
|
||||
legacy_file.write_text('{"value": 1}', encoding="utf-8")
|
||||
|
||||
target_dir = tmp_path / "config"
|
||||
|
||||
monkeypatch.setattr(settings_paths, "get_project_root", lambda: str(legacy_root))
|
||||
monkeypatch.setattr(settings_paths, "user_config_dir", lambda *_, **__: str(target_dir))
|
||||
monkeypatch.setattr(
|
||||
settings_paths, "user_config_dir", lambda *_, **__: str(target_dir)
|
||||
)
|
||||
|
||||
migrated_path = settings_paths.ensure_settings_file()
|
||||
|
||||
@@ -380,7 +460,9 @@ def test_uses_portable_settings_file_when_enabled(tmp_path, monkeypatch):
|
||||
user_dir = tmp_path / "user"
|
||||
|
||||
monkeypatch.setattr(settings_paths, "get_project_root", lambda: str(repo_root))
|
||||
monkeypatch.setattr(settings_paths, "user_config_dir", lambda *_, **__: str(user_dir))
|
||||
monkeypatch.setattr(
|
||||
settings_paths, "user_config_dir", lambda *_, **__: str(user_dir)
|
||||
)
|
||||
|
||||
resolved = settings_paths.ensure_settings_file()
|
||||
|
||||
@@ -393,7 +475,9 @@ def test_migrate_creates_default_library(manager):
|
||||
libraries = manager.get_libraries()
|
||||
assert "default" in libraries
|
||||
assert manager.get_active_library_name() == "default"
|
||||
assert libraries["default"].get("folder_paths", {}) == manager.settings.get("folder_paths", {})
|
||||
assert libraries["default"].get("folder_paths", {}) == manager.settings.get(
|
||||
"folder_paths", {}
|
||||
)
|
||||
|
||||
|
||||
def test_migrate_sanitizes_legacy_libraries(tmp_path, monkeypatch):
|
||||
@@ -464,12 +548,21 @@ def test_refresh_environment_variables_updates_stored_value(tmp_path, monkeypatc
|
||||
|
||||
initial = {
|
||||
"civitai_api_key": "stale",
|
||||
"libraries": {"default": {"folder_paths": {}, "default_lora_root": "", "default_checkpoint_root": "", "default_embedding_root": ""}},
|
||||
"libraries": {
|
||||
"default": {
|
||||
"folder_paths": {},
|
||||
"default_lora_root": "",
|
||||
"default_checkpoint_root": "",
|
||||
"default_embedding_root": "",
|
||||
}
|
||||
},
|
||||
"active_library": "default",
|
||||
}
|
||||
|
||||
monkeypatch.setenv("CIVITAI_API_KEY", "from-init")
|
||||
manager = _create_manager_with_settings(tmp_path, monkeypatch, initial, save_spy=save_spy)
|
||||
manager = _create_manager_with_settings(
|
||||
tmp_path, monkeypatch, initial, save_spy=save_spy
|
||||
)
|
||||
|
||||
assert calls[-1] == "from-init"
|
||||
|
||||
@@ -590,7 +683,9 @@ def test_extra_paths_validation_no_overlap_with_other_libraries(manager, tmp_pat
|
||||
manager.update_extra_folder_paths({"loras": [str(lora_dir1)]})
|
||||
|
||||
|
||||
def test_extra_paths_validation_no_overlap_with_active_primary_lora_root(manager, tmp_path):
|
||||
def test_extra_paths_validation_no_overlap_with_active_primary_lora_root(
|
||||
manager, tmp_path
|
||||
):
|
||||
"""Test that extra LoRA paths cannot overlap the active library primary LoRA roots."""
|
||||
real_lora_dir = tmp_path / "loras_real"
|
||||
real_lora_dir.mkdir()
|
||||
@@ -603,7 +698,9 @@ def test_extra_paths_validation_no_overlap_with_active_primary_lora_root(manager
|
||||
activate=True,
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="overlap with the active library's primary LoRA roots"):
|
||||
with pytest.raises(
|
||||
ValueError, match="overlap with the active library's primary LoRA roots"
|
||||
):
|
||||
manager.update_extra_folder_paths({"loras": [str(real_lora_dir)]})
|
||||
|
||||
|
||||
@@ -627,7 +724,10 @@ def test_extra_paths_validation_no_overlap_with_active_primary_lora_root_case_in
|
||||
original_normcase = settings_manager_module.os.path.normcase
|
||||
|
||||
def fake_exists(path):
|
||||
if isinstance(path, str) and path.lower() in {str(lora_link).lower(), str(real_lora_dir).lower()}:
|
||||
if isinstance(path, str) and path.lower() in {
|
||||
str(lora_link).lower(),
|
||||
str(real_lora_dir).lower(),
|
||||
}:
|
||||
return True
|
||||
return original_exists(path)
|
||||
|
||||
@@ -638,13 +738,21 @@ def test_extra_paths_validation_no_overlap_with_active_primary_lora_root_case_in
|
||||
|
||||
monkeypatch.setattr(settings_manager_module.os.path, "exists", fake_exists)
|
||||
monkeypatch.setattr(settings_manager_module.os.path, "realpath", fake_realpath)
|
||||
monkeypatch.setattr(settings_manager_module.os.path, "normcase", lambda value: original_normcase(value).lower())
|
||||
monkeypatch.setattr(
|
||||
settings_manager_module.os.path,
|
||||
"normcase",
|
||||
lambda value: original_normcase(value).lower(),
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="overlap with the active library's primary LoRA roots"):
|
||||
with pytest.raises(
|
||||
ValueError, match="overlap with the active library's primary LoRA roots"
|
||||
):
|
||||
manager.update_extra_folder_paths({"loras": [str(real_lora_dir).upper()]})
|
||||
|
||||
|
||||
def test_extra_paths_validation_allows_missing_non_overlapping_lora_root(manager, tmp_path):
|
||||
def test_extra_paths_validation_allows_missing_non_overlapping_lora_root(
|
||||
manager, tmp_path
|
||||
):
|
||||
"""Missing non-overlapping extra LoRA paths should not be rejected."""
|
||||
lora_dir = tmp_path / "loras"
|
||||
lora_dir.mkdir()
|
||||
@@ -662,7 +770,9 @@ def test_extra_paths_validation_allows_missing_non_overlapping_lora_root(manager
|
||||
assert extra_paths["loras"] == [str(missing_extra)]
|
||||
|
||||
|
||||
def test_extra_paths_validation_rejects_primary_root_first_level_symlink_target(manager, tmp_path):
|
||||
def test_extra_paths_validation_rejects_primary_root_first_level_symlink_target(
|
||||
manager, tmp_path
|
||||
):
|
||||
"""Extra LoRA paths should be rejected when already reachable via a first-level symlink under the primary root."""
|
||||
lora_dir = tmp_path / "loras"
|
||||
lora_dir.mkdir()
|
||||
@@ -677,7 +787,9 @@ def test_extra_paths_validation_rejects_primary_root_first_level_symlink_target(
|
||||
activate=True,
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="overlap with the active library's primary LoRA roots"):
|
||||
with pytest.raises(
|
||||
ValueError, match="overlap with the active library's primary LoRA roots"
|
||||
):
|
||||
manager.update_extra_folder_paths({"loras": [str(external_dir)]})
|
||||
|
||||
|
||||
@@ -698,7 +810,6 @@ def test_delete_library_switches_active(manager, tmp_path):
|
||||
assert manager.get_active_library_name() == "default"
|
||||
|
||||
|
||||
|
||||
def test_download_skip_base_models_are_normalized(manager):
|
||||
manager.settings["download_skip_base_models"] = [
|
||||
"SDXL 1.0",
|
||||
@@ -715,9 +826,6 @@ def test_download_skip_base_models_are_normalized(manager):
|
||||
|
||||
|
||||
def test_setting_download_skip_base_models_normalizes_string_input(manager):
|
||||
manager.set(
|
||||
"download_skip_base_models",
|
||||
"SDXL 1.0, Pony; Invalid\nSDXL 1.0"
|
||||
)
|
||||
manager.set("download_skip_base_models", "SDXL 1.0, Pony; Invalid\nSDXL 1.0")
|
||||
|
||||
assert manager.get("download_skip_base_models") == ["SDXL 1.0", "Pony"]
|
||||
|
||||
Reference in New Issue
Block a user