mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-25 15:15:44 -03:00
Merge pull request #542 from willmiao/codex/investigate-backend-tests-modifying-settings.json
Refactor settings manager to lazy singleton
This commit is contained in:
@@ -74,8 +74,9 @@ class Config:
|
|||||||
"""Persist ComfyUI-derived folder paths to the multi-library settings."""
|
"""Persist ComfyUI-derived folder paths to the multi-library settings."""
|
||||||
try:
|
try:
|
||||||
ensure_settings_file(logger)
|
ensure_settings_file(logger)
|
||||||
from .services.settings_manager import settings as settings_service
|
from .services.settings_manager import get_settings_manager
|
||||||
|
|
||||||
|
settings_service = get_settings_manager()
|
||||||
libraries = settings_service.get_libraries()
|
libraries = settings_service.get_libraries()
|
||||||
comfy_library = libraries.get("comfyui", {})
|
comfy_library = libraries.get("comfyui", {})
|
||||||
default_library = libraries.get("default", {})
|
default_library = libraries.get("default", {})
|
||||||
@@ -442,8 +443,9 @@ class Config:
|
|||||||
"""Return the current library registry and active library name."""
|
"""Return the current library registry and active library name."""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from .services.settings_manager import settings as settings_service
|
from .services.settings_manager import get_settings_manager
|
||||||
|
|
||||||
|
settings_service = get_settings_manager()
|
||||||
libraries = settings_service.get_libraries()
|
libraries = settings_service.get_libraries()
|
||||||
active_library = settings_service.get_active_library_name()
|
active_library = settings_service.get_active_library_name()
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ from .routes.misc_routes import MiscRoutes
|
|||||||
from .routes.preview_routes import PreviewRoutes
|
from .routes.preview_routes import PreviewRoutes
|
||||||
from .routes.example_images_routes import ExampleImagesRoutes
|
from .routes.example_images_routes import ExampleImagesRoutes
|
||||||
from .services.service_registry import ServiceRegistry
|
from .services.service_registry import ServiceRegistry
|
||||||
from .services.settings_manager import settings
|
from .services.settings_manager import get_settings_manager
|
||||||
from .utils.example_images_migration import ExampleImagesMigration
|
from .utils.example_images_migration import ExampleImagesMigration
|
||||||
from .services.websocket_manager import ws_manager
|
from .services.websocket_manager import ws_manager
|
||||||
from .services.example_images_cleanup_service import ExampleImagesCleanupService
|
from .services.example_images_cleanup_service import ExampleImagesCleanupService
|
||||||
@@ -23,6 +23,25 @@ logger = logging.getLogger(__name__)
|
|||||||
# Check if we're in standalone mode
|
# Check if we're in standalone mode
|
||||||
STANDALONE_MODE = 'nodes' not in sys.modules
|
STANDALONE_MODE = 'nodes' not in sys.modules
|
||||||
|
|
||||||
|
|
||||||
|
class _SettingsProxy:
|
||||||
|
def __init__(self):
|
||||||
|
self._manager = None
|
||||||
|
|
||||||
|
def _resolve(self):
|
||||||
|
if self._manager is None:
|
||||||
|
self._manager = get_settings_manager()
|
||||||
|
return self._manager
|
||||||
|
|
||||||
|
def get(self, *args, **kwargs):
|
||||||
|
return self._resolve().get(*args, **kwargs)
|
||||||
|
|
||||||
|
def __getattr__(self, item):
|
||||||
|
return getattr(self._resolve(), item)
|
||||||
|
|
||||||
|
|
||||||
|
settings = _SettingsProxy()
|
||||||
|
|
||||||
class LoraManager:
|
class LoraManager:
|
||||||
"""Main entry point for LoRA Manager plugin"""
|
"""Main entry point for LoRA Manager plugin"""
|
||||||
|
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ from ..services.model_lifecycle_service import ModelLifecycleService
|
|||||||
from ..services.preview_asset_service import PreviewAssetService
|
from ..services.preview_asset_service import PreviewAssetService
|
||||||
from ..services.server_i18n import server_i18n as default_server_i18n
|
from ..services.server_i18n import server_i18n as default_server_i18n
|
||||||
from ..services.service_registry import ServiceRegistry
|
from ..services.service_registry import ServiceRegistry
|
||||||
from ..services.settings_manager import settings as default_settings
|
from ..services.settings_manager import get_settings_manager
|
||||||
from ..services.tag_update_service import TagUpdateService
|
from ..services.tag_update_service import TagUpdateService
|
||||||
from ..services.websocket_manager import ws_manager as default_ws_manager
|
from ..services.websocket_manager import ws_manager as default_ws_manager
|
||||||
from ..services.use_cases import (
|
from ..services.use_cases import (
|
||||||
@@ -56,14 +56,14 @@ class BaseModelRoutes(ABC):
|
|||||||
self,
|
self,
|
||||||
service=None,
|
service=None,
|
||||||
*,
|
*,
|
||||||
settings_service=default_settings,
|
settings_service=None,
|
||||||
ws_manager=default_ws_manager,
|
ws_manager=default_ws_manager,
|
||||||
server_i18n=default_server_i18n,
|
server_i18n=default_server_i18n,
|
||||||
metadata_provider_factory=get_default_metadata_provider,
|
metadata_provider_factory=get_default_metadata_provider,
|
||||||
) -> None:
|
) -> None:
|
||||||
self.service = None
|
self.service = None
|
||||||
self.model_type = ""
|
self.model_type = ""
|
||||||
self._settings = settings_service
|
self._settings = settings_service or get_settings_manager()
|
||||||
self._ws_manager = ws_manager
|
self._ws_manager = ws_manager
|
||||||
self._server_i18n = server_i18n
|
self._server_i18n = server_i18n
|
||||||
self._metadata_provider_factory = metadata_provider_factory
|
self._metadata_provider_factory = metadata_provider_factory
|
||||||
@@ -90,7 +90,7 @@ class BaseModelRoutes(ABC):
|
|||||||
self._metadata_sync_service = MetadataSyncService(
|
self._metadata_sync_service = MetadataSyncService(
|
||||||
metadata_manager=MetadataManager,
|
metadata_manager=MetadataManager,
|
||||||
preview_service=self._preview_service,
|
preview_service=self._preview_service,
|
||||||
settings=settings_service,
|
settings=self._settings,
|
||||||
default_metadata_provider_factory=metadata_provider_factory,
|
default_metadata_provider_factory=metadata_provider_factory,
|
||||||
metadata_provider_selector=get_metadata_provider,
|
metadata_provider_selector=get_metadata_provider,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ from ..services.recipes import (
|
|||||||
)
|
)
|
||||||
from ..services.server_i18n import server_i18n
|
from ..services.server_i18n import server_i18n
|
||||||
from ..services.service_registry import ServiceRegistry
|
from ..services.service_registry import ServiceRegistry
|
||||||
from ..services.settings_manager import settings
|
from ..services.settings_manager import get_settings_manager
|
||||||
from ..utils.constants import CARD_PREVIEW_WIDTH
|
from ..utils.constants import CARD_PREVIEW_WIDTH
|
||||||
from ..utils.exif_utils import ExifUtils
|
from ..utils.exif_utils import ExifUtils
|
||||||
from .handlers.recipe_handlers import (
|
from .handlers.recipe_handlers import (
|
||||||
@@ -48,7 +48,7 @@ class BaseRecipeRoutes:
|
|||||||
self.recipe_scanner = None
|
self.recipe_scanner = None
|
||||||
self.lora_scanner = None
|
self.lora_scanner = None
|
||||||
self.civitai_client = None
|
self.civitai_client = None
|
||||||
self.settings = settings
|
self.settings = get_settings_manager()
|
||||||
self.server_i18n = server_i18n
|
self.server_i18n = server_i18n
|
||||||
self.template_env = jinja2.Environment(
|
self.template_env = jinja2.Environment(
|
||||||
loader=jinja2.FileSystemLoader(config.templates_path),
|
loader=jinja2.FileSystemLoader(config.templates_path),
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ from ...services.metadata_service import (
|
|||||||
update_metadata_providers,
|
update_metadata_providers,
|
||||||
)
|
)
|
||||||
from ...services.service_registry import ServiceRegistry
|
from ...services.service_registry import ServiceRegistry
|
||||||
from ...services.settings_manager import settings as default_settings
|
from ...services.settings_manager import get_settings_manager
|
||||||
from ...services.websocket_manager import ws_manager
|
from ...services.websocket_manager import ws_manager
|
||||||
from ...services.downloader import get_downloader
|
from ...services.downloader import get_downloader
|
||||||
from ...utils.constants import DEFAULT_NODE_COLOR, NODE_TYPES, SUPPORTED_MEDIA_EXTENSIONS
|
from ...utils.constants import DEFAULT_NODE_COLOR, NODE_TYPES, SUPPORTED_MEDIA_EXTENSIONS
|
||||||
@@ -162,11 +162,11 @@ class SettingsHandler:
|
|||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
*,
|
*,
|
||||||
settings_service=default_settings,
|
settings_service=None,
|
||||||
metadata_provider_updater: Callable[[], Awaitable[None]] = update_metadata_providers,
|
metadata_provider_updater: Callable[[], Awaitable[None]] = update_metadata_providers,
|
||||||
downloader_factory: Callable[[], Awaitable[DownloaderProtocol]] = get_downloader,
|
downloader_factory: Callable[[], Awaitable[DownloaderProtocol]] = get_downloader,
|
||||||
) -> None:
|
) -> None:
|
||||||
self._settings = settings_service
|
self._settings = settings_service or get_settings_manager()
|
||||||
self._metadata_provider_updater = metadata_provider_updater
|
self._metadata_provider_updater = metadata_provider_updater
|
||||||
self._downloader_factory = downloader_factory
|
self._downloader_factory = downloader_factory
|
||||||
|
|
||||||
@@ -617,11 +617,11 @@ class MetadataArchiveHandler:
|
|||||||
self,
|
self,
|
||||||
*,
|
*,
|
||||||
metadata_archive_manager_factory: Callable[[], Awaitable[MetadataArchiveManagerProtocol]] = get_metadata_archive_manager,
|
metadata_archive_manager_factory: Callable[[], Awaitable[MetadataArchiveManagerProtocol]] = get_metadata_archive_manager,
|
||||||
settings_service=default_settings,
|
settings_service=None,
|
||||||
metadata_provider_updater: Callable[[], Awaitable[None]] = update_metadata_providers,
|
metadata_provider_updater: Callable[[], Awaitable[None]] = update_metadata_providers,
|
||||||
) -> None:
|
) -> None:
|
||||||
self._metadata_archive_manager_factory = metadata_archive_manager_factory
|
self._metadata_archive_manager_factory = metadata_archive_manager_factory
|
||||||
self._settings = settings_service
|
self._settings = settings_service or get_settings_manager()
|
||||||
self._metadata_provider_updater = metadata_provider_updater
|
self._metadata_provider_updater = metadata_provider_updater
|
||||||
|
|
||||||
async def download_metadata_archive(self, request: web.Request) -> web.Response:
|
async def download_metadata_archive(self, request: web.Request) -> web.Response:
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ from ..services.metadata_service import (
|
|||||||
get_metadata_provider,
|
get_metadata_provider,
|
||||||
update_metadata_providers,
|
update_metadata_providers,
|
||||||
)
|
)
|
||||||
from ..services.settings_manager import settings
|
from ..services.settings_manager import get_settings_manager
|
||||||
from ..services.downloader import get_downloader
|
from ..services.downloader import get_downloader
|
||||||
from ..utils.usage_stats import UsageStats
|
from ..utils.usage_stats import UsageStats
|
||||||
from .handlers.misc_handlers import (
|
from .handlers.misc_handlers import (
|
||||||
@@ -47,7 +47,7 @@ class MiscRoutes:
|
|||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
*,
|
*,
|
||||||
settings_service=settings,
|
settings_service=None,
|
||||||
usage_stats_factory: Callable[[], UsageStats] = UsageStats,
|
usage_stats_factory: Callable[[], UsageStats] = UsageStats,
|
||||||
prompt_server: type[PromptServer] = PromptServer,
|
prompt_server: type[PromptServer] = PromptServer,
|
||||||
service_registry_adapter=build_service_registry_adapter(),
|
service_registry_adapter=build_service_registry_adapter(),
|
||||||
@@ -60,7 +60,7 @@ class MiscRoutes:
|
|||||||
node_registry: NodeRegistry | None = None,
|
node_registry: NodeRegistry | None = None,
|
||||||
standalone_mode_flag: bool = standalone_mode,
|
standalone_mode_flag: bool = standalone_mode,
|
||||||
) -> None:
|
) -> None:
|
||||||
self._settings = settings_service
|
self._settings = settings_service or get_settings_manager()
|
||||||
self._usage_stats_factory = usage_stats_factory
|
self._usage_stats_factory = usage_stats_factory
|
||||||
self._prompt_server = prompt_server
|
self._prompt_server = prompt_server
|
||||||
self._service_registry_adapter = service_registry_adapter
|
self._service_registry_adapter = service_registry_adapter
|
||||||
|
|||||||
@@ -8,13 +8,32 @@ from collections import defaultdict, Counter
|
|||||||
from typing import Dict, List, Any
|
from typing import Dict, List, Any
|
||||||
|
|
||||||
from ..config import config
|
from ..config import config
|
||||||
from ..services.settings_manager import settings
|
from ..services.settings_manager import get_settings_manager
|
||||||
from ..services.server_i18n import server_i18n
|
from ..services.server_i18n import server_i18n
|
||||||
from ..services.service_registry import ServiceRegistry
|
from ..services.service_registry import ServiceRegistry
|
||||||
from ..utils.usage_stats import UsageStats
|
from ..utils.usage_stats import UsageStats
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class _SettingsProxy:
|
||||||
|
def __init__(self):
|
||||||
|
self._manager = None
|
||||||
|
|
||||||
|
def _resolve(self):
|
||||||
|
if self._manager is None:
|
||||||
|
self._manager = get_settings_manager()
|
||||||
|
return self._manager
|
||||||
|
|
||||||
|
def get(self, *args, **kwargs):
|
||||||
|
return self._resolve().get(*args, **kwargs)
|
||||||
|
|
||||||
|
def __getattr__(self, item):
|
||||||
|
return getattr(self._resolve(), item)
|
||||||
|
|
||||||
|
|
||||||
|
settings = _SettingsProxy()
|
||||||
|
|
||||||
class StatsRoutes:
|
class StatsRoutes:
|
||||||
"""Route handlers for Statistics page and API endpoints"""
|
"""Route handlers for Statistics page and API endpoints"""
|
||||||
|
|
||||||
@@ -66,7 +85,9 @@ class StatsRoutes:
|
|||||||
is_initializing = lora_initializing or checkpoint_initializing or embedding_initializing
|
is_initializing = lora_initializing or checkpoint_initializing or embedding_initializing
|
||||||
|
|
||||||
# 获取用户语言设置
|
# 获取用户语言设置
|
||||||
user_language = settings.get('language', 'en')
|
settings_object = settings
|
||||||
|
user_language = settings_object.get('language', 'en')
|
||||||
|
settings_manager = settings_object if not isinstance(settings_object, _SettingsProxy) else settings_object._resolve()
|
||||||
|
|
||||||
# 设置服务端i18n语言
|
# 设置服务端i18n语言
|
||||||
server_i18n.set_locale(user_language)
|
server_i18n.set_locale(user_language)
|
||||||
@@ -79,7 +100,7 @@ class StatsRoutes:
|
|||||||
template = self.template_env.get_template('statistics.html')
|
template = self.template_env.get_template('statistics.html')
|
||||||
rendered = template.render(
|
rendered = template.render(
|
||||||
is_initializing=is_initializing,
|
is_initializing=is_initializing,
|
||||||
settings=settings,
|
settings=settings_manager,
|
||||||
request=request,
|
request=request,
|
||||||
t=server_i18n.get_translation,
|
t=server_i18n.get_translation,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import os
|
|||||||
from ..utils.models import BaseModelMetadata
|
from ..utils.models import BaseModelMetadata
|
||||||
from ..utils.metadata_manager import MetadataManager
|
from ..utils.metadata_manager import MetadataManager
|
||||||
from .model_query import FilterCriteria, ModelCacheRepository, ModelFilterSet, SearchStrategy, SettingsProvider
|
from .model_query import FilterCriteria, ModelCacheRepository, ModelFilterSet, SearchStrategy, SettingsProvider
|
||||||
from .settings_manager import settings as default_settings
|
from .settings_manager import get_settings_manager
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -38,7 +38,7 @@ class BaseModelService(ABC):
|
|||||||
self.model_type = model_type
|
self.model_type = model_type
|
||||||
self.scanner = scanner
|
self.scanner = scanner
|
||||||
self.metadata_class = metadata_class
|
self.metadata_class = metadata_class
|
||||||
self.settings = settings_provider or default_settings
|
self.settings = settings_provider or get_settings_manager()
|
||||||
self.cache_repository = cache_repository or ModelCacheRepository(scanner)
|
self.cache_repository = cache_repository or ModelCacheRepository(scanner)
|
||||||
self.filter_set = filter_set or ModelFilterSet(self.settings)
|
self.filter_set = filter_set or ModelFilterSet(self.settings)
|
||||||
self.search_strategy = search_strategy or SearchStrategy()
|
self.search_strategy = search_strategy or SearchStrategy()
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ from ..utils.constants import CARD_PREVIEW_WIDTH, VALID_LORA_TYPES, CIVITAI_MODE
|
|||||||
from ..utils.exif_utils import ExifUtils
|
from ..utils.exif_utils import ExifUtils
|
||||||
from ..utils.metadata_manager import MetadataManager
|
from ..utils.metadata_manager import MetadataManager
|
||||||
from .service_registry import ServiceRegistry
|
from .service_registry import ServiceRegistry
|
||||||
from .settings_manager import settings
|
from .settings_manager import get_settings_manager
|
||||||
from .metadata_service import get_default_metadata_provider
|
from .metadata_service import get_default_metadata_provider
|
||||||
from .downloader import get_downloader
|
from .downloader import get_downloader
|
||||||
|
|
||||||
@@ -241,19 +241,20 @@ class DownloadManager:
|
|||||||
|
|
||||||
# Handle use_default_paths
|
# Handle use_default_paths
|
||||||
if use_default_paths:
|
if use_default_paths:
|
||||||
|
settings_manager = get_settings_manager()
|
||||||
# Set save_dir based on model type
|
# Set save_dir based on model type
|
||||||
if model_type == 'checkpoint':
|
if model_type == 'checkpoint':
|
||||||
default_path = settings.get('default_checkpoint_root')
|
default_path = settings_manager.get('default_checkpoint_root')
|
||||||
if not default_path:
|
if not default_path:
|
||||||
return {'success': False, 'error': 'Default checkpoint root path not set in settings'}
|
return {'success': False, 'error': 'Default checkpoint root path not set in settings'}
|
||||||
save_dir = default_path
|
save_dir = default_path
|
||||||
elif model_type == 'lora':
|
elif model_type == 'lora':
|
||||||
default_path = settings.get('default_lora_root')
|
default_path = settings_manager.get('default_lora_root')
|
||||||
if not default_path:
|
if not default_path:
|
||||||
return {'success': False, 'error': 'Default lora root path not set in settings'}
|
return {'success': False, 'error': 'Default lora root path not set in settings'}
|
||||||
save_dir = default_path
|
save_dir = default_path
|
||||||
elif model_type == 'embedding':
|
elif model_type == 'embedding':
|
||||||
default_path = settings.get('default_embedding_root')
|
default_path = settings_manager.get('default_embedding_root')
|
||||||
if not default_path:
|
if not default_path:
|
||||||
return {'success': False, 'error': 'Default embedding root path not set in settings'}
|
return {'success': False, 'error': 'Default embedding root path not set in settings'}
|
||||||
save_dir = default_path
|
save_dir = default_path
|
||||||
@@ -360,7 +361,8 @@ class DownloadManager:
|
|||||||
Relative path string
|
Relative path string
|
||||||
"""
|
"""
|
||||||
# Get path template from settings for specific model type
|
# Get path template from settings for specific model type
|
||||||
path_template = settings.get_download_path_template(model_type)
|
settings_manager = get_settings_manager()
|
||||||
|
path_template = settings_manager.get_download_path_template(model_type)
|
||||||
|
|
||||||
# If template is empty, return empty path (flat structure)
|
# If template is empty, return empty path (flat structure)
|
||||||
if not path_template:
|
if not path_template:
|
||||||
@@ -377,7 +379,7 @@ class DownloadManager:
|
|||||||
author = 'Anonymous'
|
author = 'Anonymous'
|
||||||
|
|
||||||
# Apply mapping if available
|
# Apply mapping if available
|
||||||
base_model_mappings = settings.get('base_model_path_mappings', {})
|
base_model_mappings = settings_manager.get('base_model_path_mappings', {})
|
||||||
mapped_base_model = base_model_mappings.get(base_model, base_model)
|
mapped_base_model = base_model_mappings.get(base_model, base_model)
|
||||||
|
|
||||||
# Get model tags
|
# Get model tags
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ import asyncio
|
|||||||
import aiohttp
|
import aiohttp
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Optional, Dict, Tuple, Callable, Union
|
from typing import Optional, Dict, Tuple, Callable, Union
|
||||||
from ..services.settings_manager import settings
|
from ..services.settings_manager import get_settings_manager
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -94,12 +94,13 @@ class Downloader:
|
|||||||
|
|
||||||
# Check for app-level proxy settings
|
# Check for app-level proxy settings
|
||||||
proxy_url = None
|
proxy_url = None
|
||||||
if settings.get('proxy_enabled', False):
|
settings_manager = get_settings_manager()
|
||||||
proxy_host = settings.get('proxy_host', '').strip()
|
if settings_manager.get('proxy_enabled', False):
|
||||||
proxy_port = settings.get('proxy_port', '').strip()
|
proxy_host = settings_manager.get('proxy_host', '').strip()
|
||||||
proxy_type = settings.get('proxy_type', 'http').lower()
|
proxy_port = settings_manager.get('proxy_port', '').strip()
|
||||||
proxy_username = settings.get('proxy_username', '').strip()
|
proxy_type = settings_manager.get('proxy_type', 'http').lower()
|
||||||
proxy_password = settings.get('proxy_password', '').strip()
|
proxy_username = settings_manager.get('proxy_username', '').strip()
|
||||||
|
proxy_password = settings_manager.get('proxy_password', '').strip()
|
||||||
|
|
||||||
if proxy_host and proxy_port:
|
if proxy_host and proxy_port:
|
||||||
# Build proxy URL
|
# Build proxy URL
|
||||||
@@ -146,7 +147,8 @@ class Downloader:
|
|||||||
|
|
||||||
if use_auth:
|
if use_auth:
|
||||||
# Add CivitAI API key if available
|
# Add CivitAI API key if available
|
||||||
api_key = settings.get('civitai_api_key')
|
settings_manager = get_settings_manager()
|
||||||
|
api_key = settings_manager.get('civitai_api_key')
|
||||||
if api_key:
|
if api_key:
|
||||||
headers['Authorization'] = f'Bearer {api_key}'
|
headers['Authorization'] = f'Bearer {api_key}'
|
||||||
headers['Content-Type'] = 'application/json'
|
headers['Content-Type'] = 'application/json'
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ from pathlib import Path
|
|||||||
from typing import Dict, List, Tuple
|
from typing import Dict, List, Tuple
|
||||||
|
|
||||||
from .service_registry import ServiceRegistry
|
from .service_registry import ServiceRegistry
|
||||||
from .settings_manager import settings
|
from .settings_manager import get_settings_manager
|
||||||
from ..utils.example_images_paths import iter_library_roots
|
from ..utils.example_images_paths import iter_library_roots
|
||||||
|
|
||||||
|
|
||||||
@@ -62,7 +62,8 @@ class ExampleImagesCleanupService:
|
|||||||
async def cleanup_example_image_folders(self) -> Dict[str, object]:
|
async def cleanup_example_image_folders(self) -> Dict[str, object]:
|
||||||
"""Clean empty or orphaned example image folders by moving them under a deleted bucket."""
|
"""Clean empty or orphaned example image folders by moving them under a deleted bucket."""
|
||||||
|
|
||||||
example_images_path = settings.get("example_images_path")
|
settings_manager = get_settings_manager()
|
||||||
|
example_images_path = settings_manager.get("example_images_path")
|
||||||
if not example_images_path:
|
if not example_images_path:
|
||||||
logger.debug("Cleanup skipped: example images path not configured")
|
logger.debug("Cleanup skipped: example images path not configured")
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ from .model_metadata_provider import (
|
|||||||
CivitaiModelMetadataProvider,
|
CivitaiModelMetadataProvider,
|
||||||
FallbackMetadataProvider
|
FallbackMetadataProvider
|
||||||
)
|
)
|
||||||
from .settings_manager import settings
|
from .settings_manager import get_settings_manager
|
||||||
from .metadata_archive_manager import MetadataArchiveManager
|
from .metadata_archive_manager import MetadataArchiveManager
|
||||||
from .service_registry import ServiceRegistry
|
from .service_registry import ServiceRegistry
|
||||||
|
|
||||||
@@ -21,7 +21,8 @@ async def initialize_metadata_providers():
|
|||||||
provider_manager.default_provider = None
|
provider_manager.default_provider = None
|
||||||
|
|
||||||
# Get settings
|
# Get settings
|
||||||
enable_archive_db = settings.get('enable_metadata_archive_db', False)
|
settings_manager = get_settings_manager()
|
||||||
|
enable_archive_db = settings_manager.get('enable_metadata_archive_db', False)
|
||||||
|
|
||||||
providers = []
|
providers = []
|
||||||
|
|
||||||
@@ -87,7 +88,8 @@ async def update_metadata_providers():
|
|||||||
"""Update metadata providers based on current settings"""
|
"""Update metadata providers based on current settings"""
|
||||||
try:
|
try:
|
||||||
# Get current settings
|
# Get current settings
|
||||||
enable_archive_db = settings.get('enable_metadata_archive_db', False)
|
settings_manager = get_settings_manager()
|
||||||
|
enable_archive_db = settings_manager.get('enable_metadata_archive_db', False)
|
||||||
|
|
||||||
# Reinitialize all providers with new settings
|
# Reinitialize all providers with new settings
|
||||||
provider_manager = await initialize_metadata_providers()
|
provider_manager = await initialize_metadata_providers()
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ from abc import ABC, abstractmethod
|
|||||||
|
|
||||||
from ..utils.utils import calculate_relative_path_for_model, remove_empty_dirs
|
from ..utils.utils import calculate_relative_path_for_model, remove_empty_dirs
|
||||||
from ..utils.constants import AUTO_ORGANIZE_BATCH_SIZE
|
from ..utils.constants import AUTO_ORGANIZE_BATCH_SIZE
|
||||||
from ..services.settings_manager import settings
|
from ..services.settings_manager import get_settings_manager
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -114,7 +114,8 @@ class ModelFileService:
|
|||||||
raise ValueError('No model roots configured')
|
raise ValueError('No model roots configured')
|
||||||
|
|
||||||
# Check if flat structure is configured for this model type
|
# Check if flat structure is configured for this model type
|
||||||
path_template = settings.get_download_path_template(self.model_type)
|
settings_manager = get_settings_manager()
|
||||||
|
path_template = settings_manager.get_download_path_template(self.model_type)
|
||||||
result.is_flat_structure = not path_template
|
result.is_flat_structure = not path_template
|
||||||
|
|
||||||
# Initialize tracking
|
# Initialize tracking
|
||||||
|
|||||||
@@ -351,7 +351,7 @@ class PersistentModelCache:
|
|||||||
|
|
||||||
|
|
||||||
def get_persistent_cache() -> PersistentModelCache:
|
def get_persistent_cache() -> PersistentModelCache:
|
||||||
from .settings_manager import settings as settings_service # Local import to avoid cycles
|
from .settings_manager import get_settings_manager # Local import to avoid cycles
|
||||||
|
|
||||||
library_name = settings_service.get_active_library_name()
|
library_name = get_settings_manager().get_active_library_name()
|
||||||
return PersistentModelCache.get_default(library_name)
|
return PersistentModelCache.get_default(library_name)
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ import json
|
|||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
|
from threading import Lock
|
||||||
from typing import Any, Dict, Iterable, List, Mapping, Optional
|
from typing import Any, Dict, Iterable, List, Mapping, Optional
|
||||||
|
|
||||||
from ..utils.settings_paths import ensure_settings_file
|
from ..utils.settings_paths import ensure_settings_file
|
||||||
@@ -688,4 +689,38 @@ class SettingsManager:
|
|||||||
|
|
||||||
return templates.get(model_type, '{base_model}/{first_tag}')
|
return templates.get(model_type, '{base_model}/{first_tag}')
|
||||||
|
|
||||||
settings = SettingsManager()
|
|
||||||
|
_SETTINGS_MANAGER: Optional["SettingsManager"] = None
|
||||||
|
_SETTINGS_MANAGER_LOCK = Lock()
|
||||||
|
# Legacy module-level alias for backwards compatibility with callers that
|
||||||
|
# monkeypatch ``py.services.settings_manager.settings`` during tests.
|
||||||
|
settings: Optional["SettingsManager"] = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_settings_manager() -> "SettingsManager":
|
||||||
|
"""Return the lazily initialised global :class:`SettingsManager`."""
|
||||||
|
|
||||||
|
global _SETTINGS_MANAGER, settings
|
||||||
|
if settings is not None:
|
||||||
|
return settings
|
||||||
|
|
||||||
|
if _SETTINGS_MANAGER is None:
|
||||||
|
with _SETTINGS_MANAGER_LOCK:
|
||||||
|
if _SETTINGS_MANAGER is None:
|
||||||
|
_SETTINGS_MANAGER = SettingsManager()
|
||||||
|
|
||||||
|
settings = _SETTINGS_MANAGER
|
||||||
|
return _SETTINGS_MANAGER
|
||||||
|
|
||||||
|
|
||||||
|
def reset_settings_manager() -> None:
|
||||||
|
"""Reset the cached settings manager instance.
|
||||||
|
|
||||||
|
Primarily intended for tests so they can configure the settings
|
||||||
|
directory before the manager touches the filesystem.
|
||||||
|
"""
|
||||||
|
|
||||||
|
global _SETTINGS_MANAGER, settings
|
||||||
|
with _SETTINGS_MANAGER_LOCK:
|
||||||
|
_SETTINGS_MANAGER = None
|
||||||
|
settings = None
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ from ..utils.metadata_manager import MetadataManager
|
|||||||
from .example_images_processor import ExampleImagesProcessor
|
from .example_images_processor import ExampleImagesProcessor
|
||||||
from .example_images_metadata import MetadataUpdater
|
from .example_images_metadata import MetadataUpdater
|
||||||
from ..services.downloader import get_downloader
|
from ..services.downloader import get_downloader
|
||||||
from ..services.settings_manager import settings
|
from ..services.settings_manager import get_settings_manager
|
||||||
|
|
||||||
|
|
||||||
class ExampleImagesDownloadError(RuntimeError):
|
class ExampleImagesDownloadError(RuntimeError):
|
||||||
@@ -107,7 +107,7 @@ class DownloadManager:
|
|||||||
self._state_lock = state_lock or asyncio.Lock()
|
self._state_lock = state_lock or asyncio.Lock()
|
||||||
|
|
||||||
def _resolve_output_dir(self, library_name: str | None = None) -> str:
|
def _resolve_output_dir(self, library_name: str | None = None) -> str:
|
||||||
base_path = settings.get('example_images_path')
|
base_path = get_settings_manager().get('example_images_path')
|
||||||
if not base_path:
|
if not base_path:
|
||||||
return ''
|
return ''
|
||||||
return ensure_library_root_exists(library_name)
|
return ensure_library_root_exists(library_name)
|
||||||
@@ -126,7 +126,8 @@ class DownloadManager:
|
|||||||
model_types = data.get('model_types', ['lora', 'checkpoint'])
|
model_types = data.get('model_types', ['lora', 'checkpoint'])
|
||||||
delay = float(data.get('delay', 0.2))
|
delay = float(data.get('delay', 0.2))
|
||||||
|
|
||||||
base_path = settings.get('example_images_path')
|
settings_manager = get_settings_manager()
|
||||||
|
base_path = settings_manager.get('example_images_path')
|
||||||
|
|
||||||
if not base_path:
|
if not base_path:
|
||||||
error_msg = 'Example images path not configured in settings'
|
error_msg = 'Example images path not configured in settings'
|
||||||
@@ -138,7 +139,7 @@ class DownloadManager:
|
|||||||
}
|
}
|
||||||
raise DownloadConfigurationError(error_msg)
|
raise DownloadConfigurationError(error_msg)
|
||||||
|
|
||||||
active_library = settings.get_active_library_name()
|
active_library = get_settings_manager().get_active_library_name()
|
||||||
output_dir = self._resolve_output_dir(active_library)
|
output_dir = self._resolve_output_dir(active_library)
|
||||||
if not output_dir:
|
if not output_dir:
|
||||||
raise DownloadConfigurationError('Example images path not configured in settings')
|
raise DownloadConfigurationError('Example images path not configured in settings')
|
||||||
@@ -151,7 +152,7 @@ class DownloadManager:
|
|||||||
progress_file = os.path.join(output_dir, '.download_progress.json')
|
progress_file = os.path.join(output_dir, '.download_progress.json')
|
||||||
progress_source = progress_file
|
progress_source = progress_file
|
||||||
if uses_library_scoped_folders():
|
if uses_library_scoped_folders():
|
||||||
legacy_root = settings.get('example_images_path') or ''
|
legacy_root = get_settings_manager().get('example_images_path') or ''
|
||||||
legacy_progress = os.path.join(legacy_root, '.download_progress.json') if legacy_root else ''
|
legacy_progress = os.path.join(legacy_root, '.download_progress.json') if legacy_root else ''
|
||||||
if legacy_progress and os.path.exists(legacy_progress) and not os.path.exists(progress_file):
|
if legacy_progress and os.path.exists(legacy_progress) and not os.path.exists(progress_file):
|
||||||
try:
|
try:
|
||||||
@@ -555,11 +556,12 @@ class DownloadManager:
|
|||||||
if not model_hashes:
|
if not model_hashes:
|
||||||
raise DownloadConfigurationError('Missing model_hashes parameter')
|
raise DownloadConfigurationError('Missing model_hashes parameter')
|
||||||
|
|
||||||
base_path = settings.get('example_images_path')
|
settings_manager = get_settings_manager()
|
||||||
|
base_path = settings_manager.get('example_images_path')
|
||||||
|
|
||||||
if not base_path:
|
if not base_path:
|
||||||
raise DownloadConfigurationError('Example images path not configured in settings')
|
raise DownloadConfigurationError('Example images path not configured in settings')
|
||||||
active_library = settings.get_active_library_name()
|
active_library = settings_manager.get_active_library_name()
|
||||||
output_dir = self._resolve_output_dir(active_library)
|
output_dir = self._resolve_output_dir(active_library)
|
||||||
if not output_dir:
|
if not output_dir:
|
||||||
raise DownloadConfigurationError('Example images path not configured in settings')
|
raise DownloadConfigurationError('Example images path not configured in settings')
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ import os
|
|||||||
import sys
|
import sys
|
||||||
import subprocess
|
import subprocess
|
||||||
from aiohttp import web
|
from aiohttp import web
|
||||||
from ..services.settings_manager import settings
|
from ..services.settings_manager import get_settings_manager
|
||||||
from ..utils.example_images_paths import (
|
from ..utils.example_images_paths import (
|
||||||
get_model_folder,
|
get_model_folder,
|
||||||
get_model_relative_path,
|
get_model_relative_path,
|
||||||
@@ -37,7 +37,8 @@ class ExampleImagesFileManager:
|
|||||||
}, status=400)
|
}, status=400)
|
||||||
|
|
||||||
# Get example images path from settings
|
# Get example images path from settings
|
||||||
example_images_path = settings.get('example_images_path')
|
settings_manager = get_settings_manager()
|
||||||
|
example_images_path = settings_manager.get('example_images_path')
|
||||||
if not example_images_path:
|
if not example_images_path:
|
||||||
return web.json_response({
|
return web.json_response({
|
||||||
'success': False,
|
'success': False,
|
||||||
@@ -109,7 +110,8 @@ class ExampleImagesFileManager:
|
|||||||
}, status=400)
|
}, status=400)
|
||||||
|
|
||||||
# Get example images path from settings
|
# Get example images path from settings
|
||||||
example_images_path = settings.get('example_images_path')
|
settings_manager = get_settings_manager()
|
||||||
|
example_images_path = settings_manager.get('example_images_path')
|
||||||
if not example_images_path:
|
if not example_images_path:
|
||||||
return web.json_response({
|
return web.json_response({
|
||||||
'success': False,
|
'success': False,
|
||||||
@@ -183,7 +185,8 @@ class ExampleImagesFileManager:
|
|||||||
}, status=400)
|
}, status=400)
|
||||||
|
|
||||||
# Get example images path from settings
|
# Get example images path from settings
|
||||||
example_images_path = settings.get('example_images_path')
|
settings_manager = get_settings_manager()
|
||||||
|
example_images_path = settings_manager.get('example_images_path')
|
||||||
if not example_images_path:
|
if not example_images_path:
|
||||||
return web.json_response({
|
return web.json_response({
|
||||||
'has_images': False
|
'has_images': False
|
||||||
|
|||||||
@@ -1,12 +1,13 @@
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
from typing import TYPE_CHECKING, Optional
|
||||||
|
|
||||||
from ..recipes.constants import GEN_PARAM_KEYS
|
from ..recipes.constants import GEN_PARAM_KEYS
|
||||||
from ..services.metadata_service import get_default_metadata_provider, get_metadata_provider
|
from ..services.metadata_service import get_default_metadata_provider, get_metadata_provider
|
||||||
from ..services.metadata_sync_service import MetadataSyncService
|
from ..services.metadata_sync_service import MetadataSyncService
|
||||||
from ..services.preview_asset_service import PreviewAssetService
|
from ..services.preview_asset_service import PreviewAssetService
|
||||||
from ..services.settings_manager import settings
|
from ..services.settings_manager import get_settings_manager
|
||||||
from ..services.downloader import get_downloader
|
from ..services.downloader import get_downloader
|
||||||
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
|
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
|
||||||
from ..utils.exif_utils import ExifUtils
|
from ..utils.exif_utils import ExifUtils
|
||||||
@@ -20,13 +21,46 @@ _preview_service = PreviewAssetService(
|
|||||||
exif_utils=ExifUtils,
|
exif_utils=ExifUtils,
|
||||||
)
|
)
|
||||||
|
|
||||||
_metadata_sync_service = MetadataSyncService(
|
_metadata_sync_service: MetadataSyncService | None = None
|
||||||
|
_metadata_sync_service_settings: Optional["SettingsManager"] = None
|
||||||
|
|
||||||
|
if TYPE_CHECKING: # pragma: no cover - import for type checkers only
|
||||||
|
from ..services.settings_manager import SettingsManager
|
||||||
|
|
||||||
|
|
||||||
|
def _build_metadata_sync_service(settings_manager: "SettingsManager") -> MetadataSyncService:
|
||||||
|
"""Construct a metadata sync service bound to the provided settings."""
|
||||||
|
|
||||||
|
return MetadataSyncService(
|
||||||
metadata_manager=MetadataManager,
|
metadata_manager=MetadataManager,
|
||||||
preview_service=_preview_service,
|
preview_service=_preview_service,
|
||||||
settings=settings,
|
settings=settings_manager,
|
||||||
default_metadata_provider_factory=get_default_metadata_provider,
|
default_metadata_provider_factory=get_default_metadata_provider,
|
||||||
metadata_provider_selector=get_metadata_provider,
|
metadata_provider_selector=get_metadata_provider,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_metadata_sync_service() -> MetadataSyncService:
|
||||||
|
"""Return the shared metadata sync service, initialising it lazily."""
|
||||||
|
|
||||||
|
global _metadata_sync_service, _metadata_sync_service_settings
|
||||||
|
|
||||||
|
settings_manager = get_settings_manager()
|
||||||
|
|
||||||
|
if isinstance(_metadata_sync_service, MetadataSyncService):
|
||||||
|
if _metadata_sync_service_settings is not settings_manager:
|
||||||
|
_metadata_sync_service = _build_metadata_sync_service(settings_manager)
|
||||||
|
_metadata_sync_service_settings = settings_manager
|
||||||
|
elif _metadata_sync_service is None:
|
||||||
|
_metadata_sync_service = _build_metadata_sync_service(settings_manager)
|
||||||
|
_metadata_sync_service_settings = settings_manager
|
||||||
|
else:
|
||||||
|
# Tests may inject stand-ins that do not match the sync service type. Preserve
|
||||||
|
# those injections while still updating our cached settings reference so the
|
||||||
|
# next real service instantiation uses the current configuration.
|
||||||
|
_metadata_sync_service_settings = settings_manager
|
||||||
|
|
||||||
|
return _metadata_sync_service
|
||||||
|
|
||||||
|
|
||||||
class MetadataUpdater:
|
class MetadataUpdater:
|
||||||
@@ -71,7 +105,7 @@ class MetadataUpdater:
|
|||||||
async def update_cache_func(old_path, new_path, metadata):
|
async def update_cache_func(old_path, new_path, metadata):
|
||||||
return await scanner.update_single_model_cache(old_path, new_path, metadata)
|
return await scanner.update_single_model_cache(old_path, new_path, metadata)
|
||||||
|
|
||||||
success, error = await _metadata_sync_service.fetch_and_update_model(
|
success, error = await _get_metadata_sync_service().fetch_and_update_model(
|
||||||
sha256=model_hash,
|
sha256=model_hash,
|
||||||
file_path=file_path,
|
file_path=file_path,
|
||||||
model_data=model_data,
|
model_data=model_data,
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import json
|
import json
|
||||||
from ..services.settings_manager import settings
|
from ..services.settings_manager import get_settings_manager
|
||||||
from ..services.service_registry import ServiceRegistry
|
from ..services.service_registry import ServiceRegistry
|
||||||
from ..utils.example_images_paths import iter_library_roots
|
from ..utils.example_images_paths import iter_library_roots
|
||||||
from ..utils.metadata_manager import MetadataManager
|
from ..utils.metadata_manager import MetadataManager
|
||||||
@@ -14,6 +14,25 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
CURRENT_NAMING_VERSION = 2 # Increment this when naming conventions change
|
CURRENT_NAMING_VERSION = 2 # Increment this when naming conventions change
|
||||||
|
|
||||||
|
|
||||||
|
class _SettingsProxy:
|
||||||
|
def __init__(self):
|
||||||
|
self._manager = None
|
||||||
|
|
||||||
|
def _resolve(self):
|
||||||
|
if self._manager is None:
|
||||||
|
self._manager = get_settings_manager()
|
||||||
|
return self._manager
|
||||||
|
|
||||||
|
def get(self, *args, **kwargs):
|
||||||
|
return self._resolve().get(*args, **kwargs)
|
||||||
|
|
||||||
|
def __getattr__(self, item):
|
||||||
|
return getattr(self._resolve(), item)
|
||||||
|
|
||||||
|
|
||||||
|
settings = _SettingsProxy()
|
||||||
|
|
||||||
class ExampleImagesMigration:
|
class ExampleImagesMigration:
|
||||||
"""Handles migrations for example images naming conventions"""
|
"""Handles migrations for example images naming conventions"""
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import re
|
|||||||
import shutil
|
import shutil
|
||||||
from typing import Iterable, List, Optional, Tuple
|
from typing import Iterable, List, Optional, Tuple
|
||||||
|
|
||||||
from ..services.settings_manager import settings
|
from ..services.settings_manager import get_settings_manager
|
||||||
|
|
||||||
_HEX_PATTERN = re.compile(r"[a-fA-F0-9]{64}")
|
_HEX_PATTERN = re.compile(r"[a-fA-F0-9]{64}")
|
||||||
|
|
||||||
@@ -18,7 +18,8 @@ logger = logging.getLogger(__name__)
|
|||||||
def _get_configured_libraries() -> List[str]:
|
def _get_configured_libraries() -> List[str]:
|
||||||
"""Return configured library names if multi-library support is enabled."""
|
"""Return configured library names if multi-library support is enabled."""
|
||||||
|
|
||||||
libraries = settings.get("libraries")
|
settings_manager = get_settings_manager()
|
||||||
|
libraries = settings_manager.get("libraries")
|
||||||
if isinstance(libraries, dict) and libraries:
|
if isinstance(libraries, dict) and libraries:
|
||||||
return list(libraries.keys())
|
return list(libraries.keys())
|
||||||
return []
|
return []
|
||||||
@@ -27,7 +28,8 @@ def _get_configured_libraries() -> List[str]:
|
|||||||
def get_example_images_root() -> str:
|
def get_example_images_root() -> str:
|
||||||
"""Return the root directory configured for example images."""
|
"""Return the root directory configured for example images."""
|
||||||
|
|
||||||
root = settings.get("example_images_path") or ""
|
settings_manager = get_settings_manager()
|
||||||
|
root = settings_manager.get("example_images_path") or ""
|
||||||
return os.path.abspath(root) if root else ""
|
return os.path.abspath(root) if root else ""
|
||||||
|
|
||||||
|
|
||||||
@@ -41,7 +43,8 @@ def uses_library_scoped_folders() -> bool:
|
|||||||
def sanitize_library_name(library_name: Optional[str]) -> str:
|
def sanitize_library_name(library_name: Optional[str]) -> str:
|
||||||
"""Return a filesystem safe library name."""
|
"""Return a filesystem safe library name."""
|
||||||
|
|
||||||
name = library_name or settings.get_active_library_name() or "default"
|
settings_manager = get_settings_manager()
|
||||||
|
name = library_name or settings_manager.get_active_library_name() or "default"
|
||||||
safe_name = re.sub(r"[^A-Za-z0-9_.-]", "_", name)
|
safe_name = re.sub(r"[^A-Za-z0-9_.-]", "_", name)
|
||||||
return safe_name or "default"
|
return safe_name or "default"
|
||||||
|
|
||||||
@@ -161,11 +164,13 @@ def iter_library_roots() -> Iterable[Tuple[str, str]]:
|
|||||||
results.append((library, get_library_root(library)))
|
results.append((library, get_library_root(library)))
|
||||||
else:
|
else:
|
||||||
# Fall back to the active library to avoid skipping migrations/cleanup
|
# Fall back to the active library to avoid skipping migrations/cleanup
|
||||||
active = settings.get_active_library_name() or "default"
|
settings_manager = get_settings_manager()
|
||||||
|
active = settings_manager.get_active_library_name() or "default"
|
||||||
results.append((active, get_library_root(active)))
|
results.append((active, get_library_root(active)))
|
||||||
return results
|
return results
|
||||||
|
|
||||||
active = settings.get_active_library_name() or "default"
|
settings_manager = get_settings_manager()
|
||||||
|
active = settings_manager.get_active_library_name() or "default"
|
||||||
return [(active, root)]
|
return [(active, root)]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import string
|
|||||||
from aiohttp import web
|
from aiohttp import web
|
||||||
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
|
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
|
||||||
from ..services.service_registry import ServiceRegistry
|
from ..services.service_registry import ServiceRegistry
|
||||||
from ..services.settings_manager import settings
|
from ..services.settings_manager import get_settings_manager
|
||||||
from ..utils.example_images_paths import get_model_folder, get_model_relative_path
|
from ..utils.example_images_paths import get_model_folder, get_model_relative_path
|
||||||
from .example_images_metadata import MetadataUpdater
|
from .example_images_metadata import MetadataUpdater
|
||||||
from ..utils.metadata_manager import MetadataManager
|
from ..utils.metadata_manager import MetadataManager
|
||||||
@@ -318,7 +318,7 @@ class ExampleImagesProcessor:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
# Get example images path
|
# Get example images path
|
||||||
example_images_path = settings.get('example_images_path')
|
example_images_path = get_settings_manager().get('example_images_path')
|
||||||
if not example_images_path:
|
if not example_images_path:
|
||||||
raise ExampleImagesValidationError('No example images path configured')
|
raise ExampleImagesValidationError('No example images path configured')
|
||||||
|
|
||||||
@@ -442,7 +442,7 @@ class ExampleImagesProcessor:
|
|||||||
}, status=400)
|
}, status=400)
|
||||||
|
|
||||||
# Get example images path
|
# Get example images path
|
||||||
example_images_path = settings.get('example_images_path')
|
example_images_path = get_settings_manager().get('example_images_path')
|
||||||
if not example_images_path:
|
if not example_images_path:
|
||||||
return web.json_response({
|
return web.json_response({
|
||||||
'success': False,
|
'success': False,
|
||||||
|
|||||||
@@ -65,6 +65,12 @@ def ensure_settings_file(logger: Optional[logging.Logger] = None) -> str:
|
|||||||
|
|
||||||
logger = logger or _LOGGER
|
logger = logger or _LOGGER
|
||||||
target_path = get_settings_file_path(create_dir=True)
|
target_path = get_settings_file_path(create_dir=True)
|
||||||
|
preferred_dir = user_config_dir(APP_NAME, appauthor=False)
|
||||||
|
preferred_path = os.path.join(preferred_dir, "settings.json")
|
||||||
|
|
||||||
|
if os.path.abspath(target_path) != os.path.abspath(preferred_path):
|
||||||
|
os.makedirs(preferred_dir, exist_ok=True)
|
||||||
|
target_path = preferred_path
|
||||||
legacy_path = get_legacy_settings_path()
|
legacy_path = get_legacy_settings_path()
|
||||||
|
|
||||||
if os.path.exists(legacy_path) and not os.path.exists(target_path):
|
if os.path.exists(legacy_path) and not os.path.exists(target_path):
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ import os
|
|||||||
from typing import Dict
|
from typing import Dict
|
||||||
from ..services.service_registry import ServiceRegistry
|
from ..services.service_registry import ServiceRegistry
|
||||||
from ..config import config
|
from ..config import config
|
||||||
from ..services.settings_manager import settings
|
from ..services.settings_manager import get_settings_manager
|
||||||
from .constants import CIVITAI_MODEL_TAGS
|
from .constants import CIVITAI_MODEL_TAGS
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
@@ -143,7 +143,8 @@ def calculate_relative_path_for_model(model_data: Dict, model_type: str = 'lora'
|
|||||||
Relative path string (empty string for flat structure)
|
Relative path string (empty string for flat structure)
|
||||||
"""
|
"""
|
||||||
# Get path template from settings for specific model type
|
# Get path template from settings for specific model type
|
||||||
path_template = settings.get_download_path_template(model_type)
|
settings_manager = get_settings_manager()
|
||||||
|
path_template = settings_manager.get_download_path_template(model_type)
|
||||||
|
|
||||||
# If template is empty, return empty path (flat structure)
|
# If template is empty, return empty path (flat structure)
|
||||||
if not path_template:
|
if not path_template:
|
||||||
@@ -166,7 +167,7 @@ def calculate_relative_path_for_model(model_data: Dict, model_type: str = 'lora'
|
|||||||
model_tags = model_data.get('tags', [])
|
model_tags = model_data.get('tags', [])
|
||||||
|
|
||||||
# Apply mapping if available
|
# Apply mapping if available
|
||||||
base_model_mappings = settings.get('base_model_path_mappings', {})
|
base_model_mappings = settings_manager.get('base_model_path_mappings', {})
|
||||||
mapped_base_model = base_model_mappings.get(base_model, base_model)
|
mapped_base_model = base_model_mappings.get(base_model, base_model)
|
||||||
|
|
||||||
# Find the first Civitai model tag that exists in model_tags
|
# Find the first Civitai model tag that exists in model_tags
|
||||||
|
|||||||
@@ -73,6 +73,30 @@ nodes_mock.NODE_CLASS_MAPPINGS = {}
|
|||||||
sys.modules['nodes'] = nodes_mock
|
sys.modules['nodes'] = nodes_mock
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _isolate_settings_dir(tmp_path_factory, monkeypatch):
|
||||||
|
"""Redirect settings.json into a temporary directory for each test."""
|
||||||
|
|
||||||
|
settings_dir = tmp_path_factory.mktemp("settings_dir")
|
||||||
|
|
||||||
|
def fake_get_settings_dir(create: bool = True) -> str:
|
||||||
|
if create:
|
||||||
|
settings_dir.mkdir(exist_ok=True)
|
||||||
|
return str(settings_dir)
|
||||||
|
|
||||||
|
monkeypatch.setattr("py.utils.settings_paths.get_settings_dir", fake_get_settings_dir)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"py.utils.settings_paths.user_config_dir",
|
||||||
|
lambda *_args, **_kwargs: str(settings_dir),
|
||||||
|
)
|
||||||
|
|
||||||
|
from py.services import settings_manager as settings_manager_module
|
||||||
|
|
||||||
|
settings_manager_module.reset_settings_manager()
|
||||||
|
yield
|
||||||
|
settings_manager_module.reset_settings_manager()
|
||||||
|
|
||||||
|
|
||||||
def pytest_pyfunc_call(pyfuncitem):
|
def pytest_pyfunc_call(pyfuncitem):
|
||||||
"""Allow bare async tests to run without pytest.mark.asyncio."""
|
"""Allow bare async tests to run without pytest.mark.asyncio."""
|
||||||
test_function = pyfuncitem.function
|
test_function = pyfuncitem.function
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import pytest
|
|||||||
from py.services.download_manager import DownloadManager
|
from py.services.download_manager import DownloadManager
|
||||||
from py.services import download_manager
|
from py.services import download_manager
|
||||||
from py.services.service_registry import ServiceRegistry
|
from py.services.service_registry import ServiceRegistry
|
||||||
from py.services.settings_manager import settings
|
from py.services.settings_manager import SettingsManager, get_settings_manager
|
||||||
from py.utils.metadata_manager import MetadataManager
|
from py.utils.metadata_manager import MetadataManager
|
||||||
|
|
||||||
|
|
||||||
@@ -23,7 +23,8 @@ def reset_download_manager():
|
|||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
def isolate_settings(monkeypatch, tmp_path):
|
def isolate_settings(monkeypatch, tmp_path):
|
||||||
"""Point settings writes at a temporary directory to avoid touching real files."""
|
"""Point settings writes at a temporary directory to avoid touching real files."""
|
||||||
default_settings = settings._get_default_settings()
|
manager = get_settings_manager()
|
||||||
|
default_settings = manager._get_default_settings()
|
||||||
default_settings.update(
|
default_settings.update(
|
||||||
{
|
{
|
||||||
"default_lora_root": str(tmp_path),
|
"default_lora_root": str(tmp_path),
|
||||||
@@ -37,8 +38,8 @@ def isolate_settings(monkeypatch, tmp_path):
|
|||||||
"base_model_path_mappings": {"BaseModel": "MappedModel"},
|
"base_model_path_mappings": {"BaseModel": "MappedModel"},
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
monkeypatch.setattr(settings, "settings", default_settings)
|
monkeypatch.setattr(manager, "settings", default_settings)
|
||||||
monkeypatch.setattr(type(settings), "_save_settings", lambda self: None)
|
monkeypatch.setattr(SettingsManager, "_save_settings", lambda self: None)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
@@ -187,7 +188,7 @@ async def test_successful_download_uses_defaults(monkeypatch, scanners, metadata
|
|||||||
assert manager._active_downloads[result["download_id"]]["status"] == "completed"
|
assert manager._active_downloads[result["download_id"]]["status"] == "completed"
|
||||||
|
|
||||||
assert captured["relative_path"] == "MappedModel/fantasy"
|
assert captured["relative_path"] == "MappedModel/fantasy"
|
||||||
expected_dir = Path(settings.get("default_lora_root")) / "MappedModel" / "fantasy"
|
expected_dir = Path(get_settings_manager().get("default_lora_root")) / "MappedModel" / "fantasy"
|
||||||
assert captured["save_dir"] == expected_dir
|
assert captured["save_dir"] == expected_dir
|
||||||
assert captured["model_type"] == "lora"
|
assert captured["model_type"] == "lora"
|
||||||
assert captured["download_urls"] == [
|
assert captured["download_urls"] == [
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import pytest
|
|||||||
|
|
||||||
from py.services.example_images_cleanup_service import ExampleImagesCleanupService
|
from py.services.example_images_cleanup_service import ExampleImagesCleanupService
|
||||||
from py.services.service_registry import ServiceRegistry
|
from py.services.service_registry import ServiceRegistry
|
||||||
from py.services.settings_manager import settings
|
from py.services.settings_manager import get_settings_manager
|
||||||
|
|
||||||
|
|
||||||
class StubScanner:
|
class StubScanner:
|
||||||
@@ -21,8 +21,9 @@ class StubScanner:
|
|||||||
async def test_cleanup_moves_empty_and_orphaned(tmp_path, monkeypatch):
|
async def test_cleanup_moves_empty_and_orphaned(tmp_path, monkeypatch):
|
||||||
service = ExampleImagesCleanupService()
|
service = ExampleImagesCleanupService()
|
||||||
|
|
||||||
previous_path = settings.get('example_images_path')
|
settings_manager = get_settings_manager()
|
||||||
settings.settings['example_images_path'] = str(tmp_path)
|
previous_path = settings_manager.get('example_images_path')
|
||||||
|
settings_manager.settings['example_images_path'] = str(tmp_path)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
empty_folder = tmp_path / 'empty_folder'
|
empty_folder = tmp_path / 'empty_folder'
|
||||||
@@ -64,23 +65,24 @@ async def test_cleanup_moves_empty_and_orphaned(tmp_path, monkeypatch):
|
|||||||
|
|
||||||
finally:
|
finally:
|
||||||
if previous_path is None:
|
if previous_path is None:
|
||||||
settings.settings.pop('example_images_path', None)
|
settings_manager.settings.pop('example_images_path', None)
|
||||||
else:
|
else:
|
||||||
settings.settings['example_images_path'] = previous_path
|
settings_manager.settings['example_images_path'] = previous_path
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_cleanup_handles_missing_path(monkeypatch):
|
async def test_cleanup_handles_missing_path(monkeypatch):
|
||||||
service = ExampleImagesCleanupService()
|
service = ExampleImagesCleanupService()
|
||||||
|
|
||||||
previous_path = settings.get('example_images_path')
|
settings_manager = get_settings_manager()
|
||||||
settings.settings.pop('example_images_path', None)
|
previous_path = settings_manager.get('example_images_path')
|
||||||
|
settings_manager.settings.pop('example_images_path', None)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result = await service.cleanup_example_image_folders()
|
result = await service.cleanup_example_image_folders()
|
||||||
finally:
|
finally:
|
||||||
if previous_path is not None:
|
if previous_path is not None:
|
||||||
settings.settings['example_images_path'] = previous_path
|
settings_manager.settings['example_images_path'] = previous_path
|
||||||
|
|
||||||
assert result['success'] is False
|
assert result['success'] is False
|
||||||
assert result['error_code'] == 'path_not_configured'
|
assert result['error_code'] == 'path_not_configured'
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ from types import SimpleNamespace
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from py.services.settings_manager import settings
|
from py.services.settings_manager import SettingsManager, get_settings_manager
|
||||||
from py.utils import example_images_download_manager as download_module
|
from py.utils import example_images_download_manager as download_module
|
||||||
|
|
||||||
|
|
||||||
@@ -43,11 +43,15 @@ def _patch_scanner(monkeypatch: pytest.MonkeyPatch, scanner: StubScanner) -> Non
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("tmp_path")
|
@pytest.mark.usefixtures("tmp_path")
|
||||||
async def test_start_download_rejects_parallel_runs(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
async def test_start_download_rejects_parallel_runs(
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
tmp_path,
|
||||||
|
settings_manager,
|
||||||
|
):
|
||||||
ws_manager = RecordingWebSocketManager()
|
ws_manager = RecordingWebSocketManager()
|
||||||
manager = download_module.DownloadManager(ws_manager=ws_manager)
|
manager = download_module.DownloadManager(ws_manager=ws_manager)
|
||||||
|
|
||||||
monkeypatch.setitem(settings.settings, "example_images_path", str(tmp_path))
|
monkeypatch.setitem(settings_manager.settings, "example_images_path", str(tmp_path))
|
||||||
|
|
||||||
model = {
|
model = {
|
||||||
"sha256": "abc123",
|
"sha256": "abc123",
|
||||||
@@ -106,11 +110,15 @@ async def test_start_download_rejects_parallel_runs(monkeypatch: pytest.MonkeyPa
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("tmp_path")
|
@pytest.mark.usefixtures("tmp_path")
|
||||||
async def test_pause_resume_blocks_processing(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
async def test_pause_resume_blocks_processing(
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
tmp_path,
|
||||||
|
settings_manager,
|
||||||
|
):
|
||||||
ws_manager = RecordingWebSocketManager()
|
ws_manager = RecordingWebSocketManager()
|
||||||
manager = download_module.DownloadManager(ws_manager=ws_manager)
|
manager = download_module.DownloadManager(ws_manager=ws_manager)
|
||||||
|
|
||||||
monkeypatch.setitem(settings.settings, "example_images_path", str(tmp_path))
|
monkeypatch.setitem(settings_manager.settings, "example_images_path", str(tmp_path))
|
||||||
|
|
||||||
models = [
|
models = [
|
||||||
{
|
{
|
||||||
@@ -231,13 +239,17 @@ async def test_pause_resume_blocks_processing(monkeypatch: pytest.MonkeyPatch, t
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("tmp_path")
|
@pytest.mark.usefixtures("tmp_path")
|
||||||
async def test_legacy_folder_migrated_and_skipped(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
async def test_legacy_folder_migrated_and_skipped(
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
tmp_path,
|
||||||
|
settings_manager,
|
||||||
|
):
|
||||||
ws_manager = RecordingWebSocketManager()
|
ws_manager = RecordingWebSocketManager()
|
||||||
manager = download_module.DownloadManager(ws_manager=ws_manager)
|
manager = download_module.DownloadManager(ws_manager=ws_manager)
|
||||||
|
|
||||||
monkeypatch.setitem(settings.settings, "example_images_path", str(tmp_path))
|
monkeypatch.setitem(settings_manager.settings, "example_images_path", str(tmp_path))
|
||||||
monkeypatch.setitem(settings.settings, "libraries", {"default": {}, "extra": {}})
|
monkeypatch.setitem(settings_manager.settings, "libraries", {"default": {}, "extra": {}})
|
||||||
monkeypatch.setitem(settings.settings, "active_library", "extra")
|
monkeypatch.setitem(settings_manager.settings, "active_library", "extra")
|
||||||
|
|
||||||
model_hash = "d" * 64
|
model_hash = "d" * 64
|
||||||
model_path = tmp_path / "model.safetensors"
|
model_path = tmp_path / "model.safetensors"
|
||||||
@@ -310,13 +322,17 @@ async def test_legacy_folder_migrated_and_skipped(monkeypatch: pytest.MonkeyPatc
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("tmp_path")
|
@pytest.mark.usefixtures("tmp_path")
|
||||||
async def test_legacy_progress_file_migrates(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
async def test_legacy_progress_file_migrates(
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
tmp_path,
|
||||||
|
settings_manager,
|
||||||
|
):
|
||||||
ws_manager = RecordingWebSocketManager()
|
ws_manager = RecordingWebSocketManager()
|
||||||
manager = download_module.DownloadManager(ws_manager=ws_manager)
|
manager = download_module.DownloadManager(ws_manager=ws_manager)
|
||||||
|
|
||||||
monkeypatch.setitem(settings.settings, "example_images_path", str(tmp_path))
|
monkeypatch.setitem(settings_manager.settings, "example_images_path", str(tmp_path))
|
||||||
monkeypatch.setitem(settings.settings, "libraries", {"default": {}, "extra": {}})
|
monkeypatch.setitem(settings_manager.settings, "libraries", {"default": {}, "extra": {}})
|
||||||
monkeypatch.setitem(settings.settings, "active_library", "extra")
|
monkeypatch.setitem(settings_manager.settings, "active_library", "extra")
|
||||||
|
|
||||||
model_hash = "e" * 64
|
model_hash = "e" * 64
|
||||||
model_path = tmp_path / "model-two.safetensors"
|
model_path = tmp_path / "model-two.safetensors"
|
||||||
@@ -380,20 +396,24 @@ async def test_legacy_progress_file_migrates(monkeypatch: pytest.MonkeyPatch, tm
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("tmp_path")
|
@pytest.mark.usefixtures("tmp_path")
|
||||||
async def test_download_remains_in_initial_library(monkeypatch: pytest.MonkeyPatch, tmp_path):
|
async def test_download_remains_in_initial_library(
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
tmp_path,
|
||||||
|
settings_manager,
|
||||||
|
):
|
||||||
ws_manager = RecordingWebSocketManager()
|
ws_manager = RecordingWebSocketManager()
|
||||||
manager = download_module.DownloadManager(ws_manager=ws_manager)
|
manager = download_module.DownloadManager(ws_manager=ws_manager)
|
||||||
|
|
||||||
monkeypatch.setitem(settings.settings, "example_images_path", str(tmp_path))
|
monkeypatch.setitem(settings_manager.settings, "example_images_path", str(tmp_path))
|
||||||
monkeypatch.setitem(settings.settings, "libraries", {"LibraryA": {}, "LibraryB": {}})
|
monkeypatch.setitem(settings_manager.settings, "libraries", {"LibraryA": {}, "LibraryB": {}})
|
||||||
monkeypatch.setitem(settings.settings, "active_library", "LibraryA")
|
monkeypatch.setitem(settings_manager.settings, "active_library", "LibraryA")
|
||||||
|
|
||||||
state = {"active": "LibraryA"}
|
state = {"active": "LibraryA"}
|
||||||
|
|
||||||
def fake_get_active_library_name(self):
|
def fake_get_active_library_name(self):
|
||||||
return state["active"]
|
return state["active"]
|
||||||
|
|
||||||
monkeypatch.setattr(type(settings), "get_active_library_name", fake_get_active_library_name)
|
monkeypatch.setattr(SettingsManager, "get_active_library_name", fake_get_active_library_name)
|
||||||
|
|
||||||
model_hash = "f" * 64
|
model_hash = "f" * 64
|
||||||
model_path = tmp_path / "example-model.safetensors"
|
model_path = tmp_path / "example-model.safetensors"
|
||||||
@@ -454,3 +474,7 @@ async def test_download_remains_in_initial_library(monkeypatch: pytest.MonkeyPat
|
|||||||
assert (model_dir / "local.txt").exists()
|
assert (model_dir / "local.txt").exists()
|
||||||
assert not (library_b_root / ".download_progress.json").exists()
|
assert not (library_b_root / ".download_progress.json").exists()
|
||||||
assert not (library_b_root / model_hash).exists()
|
assert not (library_b_root / model_hash).exists()
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def settings_manager():
|
||||||
|
return get_settings_manager()
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ from typing import Any, Dict
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from py.services.settings_manager import settings
|
from py.services.settings_manager import get_settings_manager
|
||||||
from py.utils import example_images_download_manager as download_module
|
from py.utils import example_images_download_manager as download_module
|
||||||
|
|
||||||
|
|
||||||
@@ -19,19 +19,21 @@ class RecordingWebSocketManager:
|
|||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
def restore_settings() -> None:
|
def restore_settings() -> None:
|
||||||
original = settings.settings.copy()
|
manager = get_settings_manager()
|
||||||
|
original = manager.settings.copy()
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
settings.settings.clear()
|
manager.settings.clear()
|
||||||
settings.settings.update(original)
|
manager.settings.update(original)
|
||||||
|
|
||||||
|
|
||||||
async def test_start_download_requires_configured_path(monkeypatch: pytest.MonkeyPatch) -> None:
|
async def test_start_download_requires_configured_path(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
manager = download_module.DownloadManager(ws_manager=RecordingWebSocketManager())
|
manager = download_module.DownloadManager(ws_manager=RecordingWebSocketManager())
|
||||||
|
|
||||||
# Ensure example_images_path is not configured
|
# Ensure example_images_path is not configured
|
||||||
settings.settings.pop('example_images_path', None)
|
settings_manager = get_settings_manager()
|
||||||
|
settings_manager.settings.pop('example_images_path', None)
|
||||||
|
|
||||||
with pytest.raises(download_module.DownloadConfigurationError) as exc_info:
|
with pytest.raises(download_module.DownloadConfigurationError) as exc_info:
|
||||||
await manager.start_download({})
|
await manager.start_download({})
|
||||||
@@ -44,9 +46,10 @@ async def test_start_download_requires_configured_path(monkeypatch: pytest.Monke
|
|||||||
|
|
||||||
|
|
||||||
async def test_start_download_bootstraps_progress_and_task(monkeypatch: pytest.MonkeyPatch, tmp_path) -> None:
|
async def test_start_download_bootstraps_progress_and_task(monkeypatch: pytest.MonkeyPatch, tmp_path) -> None:
|
||||||
settings.settings["example_images_path"] = str(tmp_path)
|
settings_manager = get_settings_manager()
|
||||||
settings.settings["libraries"] = {"default": {}}
|
settings_manager.settings["example_images_path"] = str(tmp_path)
|
||||||
settings.settings["active_library"] = "default"
|
settings_manager.settings["libraries"] = {"default": {}}
|
||||||
|
settings_manager.settings["active_library"] = "default"
|
||||||
|
|
||||||
ws_manager = RecordingWebSocketManager()
|
ws_manager = RecordingWebSocketManager()
|
||||||
manager = download_module.DownloadManager(ws_manager=ws_manager)
|
manager = download_module.DownloadManager(ws_manager=ws_manager)
|
||||||
@@ -84,9 +87,10 @@ async def test_start_download_bootstraps_progress_and_task(monkeypatch: pytest.M
|
|||||||
|
|
||||||
|
|
||||||
async def test_pause_and_resume_flow(monkeypatch: pytest.MonkeyPatch, tmp_path) -> None:
|
async def test_pause_and_resume_flow(monkeypatch: pytest.MonkeyPatch, tmp_path) -> None:
|
||||||
settings.settings["example_images_path"] = str(tmp_path)
|
settings_manager = get_settings_manager()
|
||||||
settings.settings["libraries"] = {"default": {}}
|
settings_manager.settings["example_images_path"] = str(tmp_path)
|
||||||
settings.settings["active_library"] = "default"
|
settings_manager.settings["libraries"] = {"default": {}}
|
||||||
|
settings_manager.settings["active_library"] = "default"
|
||||||
|
|
||||||
ws_manager = RecordingWebSocketManager()
|
ws_manager = RecordingWebSocketManager()
|
||||||
manager = download_module.DownloadManager(ws_manager=ws_manager)
|
manager = download_module.DownloadManager(ws_manager=ws_manager)
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ from typing import Any, Dict
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from py.services.settings_manager import settings
|
from py.services.settings_manager import get_settings_manager
|
||||||
from py.utils.example_images_file_manager import ExampleImagesFileManager
|
from py.utils.example_images_file_manager import ExampleImagesFileManager
|
||||||
|
|
||||||
|
|
||||||
@@ -22,16 +22,18 @@ class JsonRequest:
|
|||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
def restore_settings() -> None:
|
def restore_settings() -> None:
|
||||||
original = settings.settings.copy()
|
manager = get_settings_manager()
|
||||||
|
original = manager.settings.copy()
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
settings.settings.clear()
|
manager.settings.clear()
|
||||||
settings.settings.update(original)
|
manager.settings.update(original)
|
||||||
|
|
||||||
|
|
||||||
async def test_open_folder_requires_existing_model_directory(monkeypatch: pytest.MonkeyPatch, tmp_path) -> None:
|
async def test_open_folder_requires_existing_model_directory(monkeypatch: pytest.MonkeyPatch, tmp_path) -> None:
|
||||||
settings.settings["example_images_path"] = str(tmp_path)
|
settings_manager = get_settings_manager()
|
||||||
|
settings_manager.settings["example_images_path"] = str(tmp_path)
|
||||||
model_hash = "a" * 64
|
model_hash = "a" * 64
|
||||||
model_folder = tmp_path / model_hash
|
model_folder = tmp_path / model_hash
|
||||||
model_folder.mkdir()
|
model_folder.mkdir()
|
||||||
@@ -65,7 +67,8 @@ async def test_open_folder_requires_existing_model_directory(monkeypatch: pytest
|
|||||||
|
|
||||||
|
|
||||||
async def test_open_folder_rejects_invalid_paths(monkeypatch: pytest.MonkeyPatch, tmp_path) -> None:
|
async def test_open_folder_rejects_invalid_paths(monkeypatch: pytest.MonkeyPatch, tmp_path) -> None:
|
||||||
settings.settings["example_images_path"] = str(tmp_path)
|
settings_manager = get_settings_manager()
|
||||||
|
settings_manager.settings["example_images_path"] = str(tmp_path)
|
||||||
|
|
||||||
def fake_get_model_folder(_hash):
|
def fake_get_model_folder(_hash):
|
||||||
return str(tmp_path.parent / "outside")
|
return str(tmp_path.parent / "outside")
|
||||||
@@ -81,7 +84,8 @@ async def test_open_folder_rejects_invalid_paths(monkeypatch: pytest.MonkeyPatch
|
|||||||
|
|
||||||
|
|
||||||
async def test_get_files_lists_supported_media(tmp_path) -> None:
|
async def test_get_files_lists_supported_media(tmp_path) -> None:
|
||||||
settings.settings["example_images_path"] = str(tmp_path)
|
settings_manager = get_settings_manager()
|
||||||
|
settings_manager.settings["example_images_path"] = str(tmp_path)
|
||||||
model_hash = "b" * 64
|
model_hash = "b" * 64
|
||||||
model_folder = tmp_path / model_hash
|
model_folder = tmp_path / model_hash
|
||||||
model_folder.mkdir()
|
model_folder.mkdir()
|
||||||
@@ -99,7 +103,8 @@ async def test_get_files_lists_supported_media(tmp_path) -> None:
|
|||||||
|
|
||||||
|
|
||||||
async def test_has_images_reports_presence(tmp_path) -> None:
|
async def test_has_images_reports_presence(tmp_path) -> None:
|
||||||
settings.settings["example_images_path"] = str(tmp_path)
|
settings_manager = get_settings_manager()
|
||||||
|
settings_manager.settings["example_images_path"] = str(tmp_path)
|
||||||
model_hash = "c" * 64
|
model_hash = "c" * 64
|
||||||
model_folder = tmp_path / model_hash
|
model_folder = tmp_path / model_hash
|
||||||
model_folder.mkdir()
|
model_folder.mkdir()
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ from pathlib import Path
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from py.services.settings_manager import settings
|
from py.services.settings_manager import get_settings_manager
|
||||||
from py.utils.example_images_paths import (
|
from py.utils.example_images_paths import (
|
||||||
ensure_library_root_exists,
|
ensure_library_root_exists,
|
||||||
get_model_folder,
|
get_model_folder,
|
||||||
@@ -18,18 +18,24 @@ from py.utils.example_images_paths import (
|
|||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
def restore_settings():
|
def restore_settings():
|
||||||
original = copy.deepcopy(settings.settings)
|
manager = get_settings_manager()
|
||||||
|
original = copy.deepcopy(manager.settings)
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
settings.settings.clear()
|
manager.settings.clear()
|
||||||
settings.settings.update(original)
|
manager.settings.update(original)
|
||||||
|
|
||||||
|
|
||||||
def test_get_model_folder_single_library(tmp_path):
|
@pytest.fixture
|
||||||
settings.settings['example_images_path'] = str(tmp_path)
|
def settings_manager():
|
||||||
settings.settings['libraries'] = {'default': {}}
|
return get_settings_manager()
|
||||||
settings.settings['active_library'] = 'default'
|
|
||||||
|
|
||||||
|
def test_get_model_folder_single_library(tmp_path, settings_manager):
|
||||||
|
settings_manager.settings['example_images_path'] = str(tmp_path)
|
||||||
|
settings_manager.settings['libraries'] = {'default': {}}
|
||||||
|
settings_manager.settings['active_library'] = 'default'
|
||||||
|
|
||||||
model_hash = 'a' * 64
|
model_hash = 'a' * 64
|
||||||
folder = get_model_folder(model_hash)
|
folder = get_model_folder(model_hash)
|
||||||
@@ -39,13 +45,13 @@ def test_get_model_folder_single_library(tmp_path):
|
|||||||
assert relative == model_hash
|
assert relative == model_hash
|
||||||
|
|
||||||
|
|
||||||
def test_get_model_folder_multi_library(tmp_path):
|
def test_get_model_folder_multi_library(tmp_path, settings_manager):
|
||||||
settings.settings['example_images_path'] = str(tmp_path)
|
settings_manager.settings['example_images_path'] = str(tmp_path)
|
||||||
settings.settings['libraries'] = {
|
settings_manager.settings['libraries'] = {
|
||||||
'default': {},
|
'default': {},
|
||||||
'Alt Library': {},
|
'Alt Library': {},
|
||||||
}
|
}
|
||||||
settings.settings['active_library'] = 'Alt Library'
|
settings_manager.settings['active_library'] = 'Alt Library'
|
||||||
|
|
||||||
model_hash = 'b' * 64
|
model_hash = 'b' * 64
|
||||||
expected_folder = tmp_path / 'Alt_Library' / model_hash
|
expected_folder = tmp_path / 'Alt_Library' / model_hash
|
||||||
@@ -57,13 +63,13 @@ def test_get_model_folder_multi_library(tmp_path):
|
|||||||
assert relative == os.path.join('Alt_Library', model_hash).replace('\\', '/')
|
assert relative == os.path.join('Alt_Library', model_hash).replace('\\', '/')
|
||||||
|
|
||||||
|
|
||||||
def test_get_model_folder_migrates_legacy_structure(tmp_path):
|
def test_get_model_folder_migrates_legacy_structure(tmp_path, settings_manager):
|
||||||
settings.settings['example_images_path'] = str(tmp_path)
|
settings_manager.settings['example_images_path'] = str(tmp_path)
|
||||||
settings.settings['libraries'] = {
|
settings_manager.settings['libraries'] = {
|
||||||
'default': {},
|
'default': {},
|
||||||
'extra': {},
|
'extra': {},
|
||||||
}
|
}
|
||||||
settings.settings['active_library'] = 'extra'
|
settings_manager.settings['active_library'] = 'extra'
|
||||||
|
|
||||||
model_hash = 'c' * 64
|
model_hash = 'c' * 64
|
||||||
legacy_folder = tmp_path / model_hash
|
legacy_folder = tmp_path / model_hash
|
||||||
@@ -82,31 +88,31 @@ def test_get_model_folder_migrates_legacy_structure(tmp_path):
|
|||||||
assert (expected_folder / 'image.png').exists()
|
assert (expected_folder / 'image.png').exists()
|
||||||
|
|
||||||
|
|
||||||
def test_ensure_library_root_exists_creates_directories(tmp_path):
|
def test_ensure_library_root_exists_creates_directories(tmp_path, settings_manager):
|
||||||
settings.settings['example_images_path'] = str(tmp_path)
|
settings_manager.settings['example_images_path'] = str(tmp_path)
|
||||||
settings.settings['libraries'] = {'default': {}, 'secondary': {}}
|
settings_manager.settings['libraries'] = {'default': {}, 'secondary': {}}
|
||||||
settings.settings['active_library'] = 'secondary'
|
settings_manager.settings['active_library'] = 'secondary'
|
||||||
|
|
||||||
resolved = ensure_library_root_exists('secondary')
|
resolved = ensure_library_root_exists('secondary')
|
||||||
assert Path(resolved) == tmp_path / 'secondary'
|
assert Path(resolved) == tmp_path / 'secondary'
|
||||||
assert (tmp_path / 'secondary').is_dir()
|
assert (tmp_path / 'secondary').is_dir()
|
||||||
|
|
||||||
|
|
||||||
def test_iter_library_roots_returns_all_configured(tmp_path):
|
def test_iter_library_roots_returns_all_configured(tmp_path, settings_manager):
|
||||||
settings.settings['example_images_path'] = str(tmp_path)
|
settings_manager.settings['example_images_path'] = str(tmp_path)
|
||||||
settings.settings['libraries'] = {'default': {}, 'alt': {}}
|
settings_manager.settings['libraries'] = {'default': {}, 'alt': {}}
|
||||||
settings.settings['active_library'] = 'alt'
|
settings_manager.settings['active_library'] = 'alt'
|
||||||
|
|
||||||
roots = dict(iter_library_roots())
|
roots = dict(iter_library_roots())
|
||||||
assert roots['default'] == str(tmp_path / 'default')
|
assert roots['default'] == str(tmp_path / 'default')
|
||||||
assert roots['alt'] == str(tmp_path / 'alt')
|
assert roots['alt'] == str(tmp_path / 'alt')
|
||||||
|
|
||||||
|
|
||||||
def test_is_valid_example_images_root_accepts_hash_directories(tmp_path):
|
def test_is_valid_example_images_root_accepts_hash_directories(tmp_path, settings_manager):
|
||||||
settings.settings['example_images_path'] = str(tmp_path)
|
settings_manager.settings['example_images_path'] = str(tmp_path)
|
||||||
# Ensure single-library mode (not multi-library mode)
|
# Ensure single-library mode (not multi-library mode)
|
||||||
settings.settings['libraries'] = {'default': {}}
|
settings_manager.settings['libraries'] = {'default': {}}
|
||||||
settings.settings['active_library'] = 'default'
|
settings_manager.settings['active_library'] = 'default'
|
||||||
|
|
||||||
hash_folder = tmp_path / ('d' * 64)
|
hash_folder = tmp_path / ('d' * 64)
|
||||||
hash_folder.mkdir()
|
hash_folder.mkdir()
|
||||||
|
|||||||
@@ -7,18 +7,19 @@ from typing import Any, Dict, Tuple
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from py.services.settings_manager import settings
|
from py.services.settings_manager import get_settings_manager
|
||||||
from py.utils import example_images_processor as processor_module
|
from py.utils import example_images_processor as processor_module
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
def restore_settings() -> None:
|
def restore_settings() -> None:
|
||||||
original = settings.settings.copy()
|
manager = get_settings_manager()
|
||||||
|
original = manager.settings.copy()
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
settings.settings.clear()
|
manager.settings.clear()
|
||||||
settings.settings.update(original)
|
manager.settings.update(original)
|
||||||
|
|
||||||
|
|
||||||
def test_get_file_extension_from_magic_bytes() -> None:
|
def test_get_file_extension_from_magic_bytes() -> None:
|
||||||
@@ -90,9 +91,10 @@ def stub_scanners(monkeypatch: pytest.MonkeyPatch, tmp_path) -> StubScanner:
|
|||||||
|
|
||||||
|
|
||||||
async def test_import_images_creates_hash_directory(monkeypatch: pytest.MonkeyPatch, tmp_path, stub_scanners: StubScanner) -> None:
|
async def test_import_images_creates_hash_directory(monkeypatch: pytest.MonkeyPatch, tmp_path, stub_scanners: StubScanner) -> None:
|
||||||
settings.settings["example_images_path"] = str(tmp_path / "examples")
|
settings_manager = get_settings_manager()
|
||||||
settings.settings["libraries"] = {"default": {}}
|
settings_manager.settings["example_images_path"] = str(tmp_path / "examples")
|
||||||
settings.settings["active_library"] = "default"
|
settings_manager.settings["libraries"] = {"default": {}}
|
||||||
|
settings_manager.settings["active_library"] = "default"
|
||||||
|
|
||||||
source_file = tmp_path / "upload.png"
|
source_file = tmp_path / "upload.png"
|
||||||
source_file.write_bytes(b"PNG data")
|
source_file.write_bytes(b"PNG data")
|
||||||
@@ -112,7 +114,7 @@ async def test_import_images_creates_hash_directory(monkeypatch: pytest.MonkeyPa
|
|||||||
assert result["success"] is True
|
assert result["success"] is True
|
||||||
assert result["files"][0]["name"].startswith("custom_short")
|
assert result["files"][0]["name"].startswith("custom_short")
|
||||||
|
|
||||||
model_folder = Path(settings.settings["example_images_path"]) / ("a" * 64)
|
model_folder = Path(settings_manager.settings["example_images_path"]) / ("a" * 64)
|
||||||
assert model_folder.exists()
|
assert model_folder.exists()
|
||||||
created_files = list(model_folder.glob("custom_short*.png"))
|
created_files = list(model_folder.glob("custom_short*.png"))
|
||||||
assert len(created_files) == 1
|
assert len(created_files) == 1
|
||||||
@@ -132,7 +134,8 @@ async def test_import_images_rejects_missing_parameters(monkeypatch: pytest.Monk
|
|||||||
|
|
||||||
|
|
||||||
async def test_import_images_raises_when_model_not_found(monkeypatch: pytest.MonkeyPatch, tmp_path) -> None:
|
async def test_import_images_raises_when_model_not_found(monkeypatch: pytest.MonkeyPatch, tmp_path) -> None:
|
||||||
settings.settings["example_images_path"] = str(tmp_path)
|
settings_manager = get_settings_manager()
|
||||||
|
settings_manager.settings["example_images_path"] = str(tmp_path)
|
||||||
|
|
||||||
async def _empty_scanner(cls=None):
|
async def _empty_scanner(cls=None):
|
||||||
return StubScanner([])
|
return StubScanner([])
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from py.services.settings_manager import settings
|
from py.services.settings_manager import SettingsManager, get_settings_manager
|
||||||
from py.utils.utils import (
|
from py.utils.utils import (
|
||||||
calculate_recipe_fingerprint,
|
calculate_recipe_fingerprint,
|
||||||
calculate_relative_path_for_model,
|
calculate_relative_path_for_model,
|
||||||
@@ -9,7 +9,8 @@ from py.utils.utils import (
|
|||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def isolated_settings(monkeypatch):
|
def isolated_settings(monkeypatch):
|
||||||
default_settings = settings._get_default_settings()
|
manager = get_settings_manager()
|
||||||
|
default_settings = manager._get_default_settings()
|
||||||
default_settings.update(
|
default_settings.update(
|
||||||
{
|
{
|
||||||
"download_path_templates": {
|
"download_path_templates": {
|
||||||
@@ -20,8 +21,8 @@ def isolated_settings(monkeypatch):
|
|||||||
"base_model_path_mappings": {},
|
"base_model_path_mappings": {},
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
monkeypatch.setattr(settings, "settings", default_settings)
|
monkeypatch.setattr(manager, "settings", default_settings)
|
||||||
monkeypatch.setattr(type(settings), "_save_settings", lambda self: None)
|
monkeypatch.setattr(SettingsManager, "_save_settings", lambda self: None)
|
||||||
return default_settings
|
return default_settings
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user