mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-21 21:22:11 -03:00
feat(example-images): namespace storage by library
This commit is contained in:
@@ -11,7 +11,6 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
@@ -29,6 +28,7 @@ from ...services.settings_manager import settings as default_settings
|
||||
from ...services.websocket_manager import ws_manager
|
||||
from ...services.downloader import get_downloader
|
||||
from ...utils.constants import DEFAULT_NODE_COLOR, NODE_TYPES, SUPPORTED_MEDIA_EXTENSIONS
|
||||
from ...utils.example_images_paths import is_valid_example_images_root
|
||||
from ...utils.lora_metadata import extract_trained_words
|
||||
from ...utils.usage_stats import UsageStats
|
||||
|
||||
@@ -274,21 +274,7 @@ class SettingsHandler:
|
||||
return None
|
||||
|
||||
def _is_dedicated_example_images_folder(self, folder_path: str) -> bool:
|
||||
try:
|
||||
items = os.listdir(folder_path)
|
||||
if not items:
|
||||
return True
|
||||
for item in items:
|
||||
item_path = os.path.join(folder_path, item)
|
||||
if item == ".download_progress.json" and os.path.isfile(item_path):
|
||||
continue
|
||||
if os.path.isdir(item_path) and re.fullmatch(r"[a-fA-F0-9]{64}", item):
|
||||
continue
|
||||
return False
|
||||
return True
|
||||
except Exception as exc: # pragma: no cover - defensive logging
|
||||
logger.error("Error checking if folder is dedicated: %s", exc)
|
||||
return False
|
||||
return is_valid_example_images_root(folder_path)
|
||||
|
||||
|
||||
class UsageStatsHandler:
|
||||
|
||||
@@ -8,10 +8,11 @@ import os
|
||||
import shutil
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Dict, List
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
from .service_registry import ServiceRegistry
|
||||
from .settings_manager import settings
|
||||
from ..utils.example_images_paths import iter_library_roots
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -70,9 +71,9 @@ class ExampleImagesCleanupService:
|
||||
"error_code": "path_not_configured",
|
||||
}
|
||||
|
||||
example_root = Path(example_images_path)
|
||||
if not example_root.exists():
|
||||
logger.debug("Cleanup skipped: example images path missing -> %s", example_root)
|
||||
base_root = Path(example_images_path)
|
||||
if not base_root.exists():
|
||||
logger.debug("Cleanup skipped: example images path missing -> %s", base_root)
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Example images path does not exist.",
|
||||
@@ -91,9 +92,6 @@ class ExampleImagesCleanupService:
|
||||
"error_code": "scanner_initialization_failed",
|
||||
}
|
||||
|
||||
deleted_bucket = example_root / self._deleted_folder_name
|
||||
deleted_bucket.mkdir(exist_ok=True)
|
||||
|
||||
checked_folders = 0
|
||||
moved_empty = 0
|
||||
moved_orphaned = 0
|
||||
@@ -101,45 +99,96 @@ class ExampleImagesCleanupService:
|
||||
move_failures = 0
|
||||
errors: List[str] = []
|
||||
|
||||
for entry in os.scandir(example_root):
|
||||
if not entry.is_dir(follow_symlinks=False):
|
||||
resolved_base = base_root.resolve()
|
||||
library_paths: List[Tuple[str, Path]] = []
|
||||
processed_paths = {resolved_base}
|
||||
|
||||
for library_name, library_path in iter_library_roots():
|
||||
if not library_path:
|
||||
continue
|
||||
|
||||
if entry.name == self._deleted_folder_name:
|
||||
continue
|
||||
|
||||
checked_folders += 1
|
||||
folder_path = Path(entry.path)
|
||||
|
||||
library_root = Path(library_path)
|
||||
try:
|
||||
if self._is_folder_empty(folder_path):
|
||||
if await self._remove_empty_folder(folder_path):
|
||||
moved_empty += 1
|
||||
else:
|
||||
move_failures += 1
|
||||
continue
|
||||
|
||||
if not self._is_hash_folder(entry.name):
|
||||
skipped_non_hash += 1
|
||||
continue
|
||||
|
||||
hash_exists = (
|
||||
lora_scanner.has_hash(entry.name)
|
||||
or checkpoint_scanner.has_hash(entry.name)
|
||||
or embedding_scanner.has_hash(entry.name)
|
||||
resolved = library_root.resolve()
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
if resolved in processed_paths:
|
||||
continue
|
||||
if not library_root.exists():
|
||||
logger.debug(
|
||||
"Skipping cleanup for library '%s': folder missing (%s)",
|
||||
library_name,
|
||||
library_root,
|
||||
)
|
||||
continue
|
||||
processed_paths.add(resolved)
|
||||
library_paths.append((library_name, library_root))
|
||||
|
||||
if not hash_exists:
|
||||
if await self._move_folder(folder_path, deleted_bucket):
|
||||
moved_orphaned += 1
|
||||
else:
|
||||
move_failures += 1
|
||||
deleted_roots: List[Path] = []
|
||||
|
||||
except Exception as exc: # pragma: no cover - filesystem guard
|
||||
move_failures += 1
|
||||
error_message = f"{entry.name}: {exc}"
|
||||
errors.append(error_message)
|
||||
logger.error("Error processing example images folder %s: %s", folder_path, exc, exc_info=True)
|
||||
# Build list of (label, root) pairs including the base root for legacy layouts
|
||||
cleanup_targets: List[Tuple[str, Path]] = [("__base__", base_root)] + library_paths
|
||||
|
||||
library_root_set = {root.resolve() for _, root in library_paths}
|
||||
|
||||
for label, root_path in cleanup_targets:
|
||||
deleted_bucket = root_path / self._deleted_folder_name
|
||||
deleted_bucket.mkdir(exist_ok=True)
|
||||
deleted_roots.append(deleted_bucket)
|
||||
|
||||
for entry in os.scandir(root_path):
|
||||
if not entry.is_dir(follow_symlinks=False):
|
||||
continue
|
||||
|
||||
if entry.name == self._deleted_folder_name:
|
||||
continue
|
||||
|
||||
entry_path = Path(entry.path)
|
||||
|
||||
if label == "__base__":
|
||||
try:
|
||||
resolved_entry = entry_path.resolve()
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
if resolved_entry in library_root_set:
|
||||
# Skip library-specific folders tracked separately
|
||||
continue
|
||||
|
||||
checked_folders += 1
|
||||
|
||||
try:
|
||||
if self._is_folder_empty(entry_path):
|
||||
if await self._remove_empty_folder(entry_path):
|
||||
moved_empty += 1
|
||||
else:
|
||||
move_failures += 1
|
||||
continue
|
||||
|
||||
if not self._is_hash_folder(entry.name):
|
||||
skipped_non_hash += 1
|
||||
continue
|
||||
|
||||
hash_exists = (
|
||||
lora_scanner.has_hash(entry.name)
|
||||
or checkpoint_scanner.has_hash(entry.name)
|
||||
or embedding_scanner.has_hash(entry.name)
|
||||
)
|
||||
|
||||
if not hash_exists:
|
||||
if await self._move_folder(entry_path, deleted_bucket):
|
||||
moved_orphaned += 1
|
||||
else:
|
||||
move_failures += 1
|
||||
|
||||
except Exception as exc: # pragma: no cover - filesystem guard
|
||||
move_failures += 1
|
||||
error_message = f"{entry.name}: {exc}"
|
||||
errors.append(error_message)
|
||||
logger.error(
|
||||
"Error processing example images folder %s: %s",
|
||||
entry_path,
|
||||
exc,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
partial_success = move_failures > 0 and (moved_empty > 0 or moved_orphaned > 0)
|
||||
success = move_failures == 0 and not errors
|
||||
@@ -152,11 +201,12 @@ class ExampleImagesCleanupService:
|
||||
skipped_non_hash=skipped_non_hash,
|
||||
move_failures=move_failures,
|
||||
errors=errors,
|
||||
deleted_root=str(deleted_bucket),
|
||||
deleted_root=str(deleted_roots[0]) if deleted_roots else None,
|
||||
partial_success=partial_success,
|
||||
)
|
||||
|
||||
summary = result.to_dict()
|
||||
summary["deleted_roots"] = [str(path) for path in deleted_roots]
|
||||
if success:
|
||||
logger.info(
|
||||
"Example images cleanup complete: checked=%s, moved_empty=%s, moved_orphaned=%s",
|
||||
|
||||
@@ -8,6 +8,7 @@ import time
|
||||
from typing import Any, Dict
|
||||
|
||||
from ..services.service_registry import ServiceRegistry
|
||||
from ..utils.example_images_paths import ensure_library_root_exists
|
||||
from ..utils.metadata_manager import MetadataManager
|
||||
from .example_images_processor import ExampleImagesProcessor
|
||||
from .example_images_metadata import MetadataUpdater
|
||||
@@ -84,6 +85,13 @@ class DownloadManager:
|
||||
self._ws_manager = ws_manager
|
||||
self._state_lock = state_lock or asyncio.Lock()
|
||||
|
||||
def _resolve_output_dir(self) -> str:
|
||||
base_path = settings.get('example_images_path')
|
||||
if not base_path:
|
||||
return ''
|
||||
library_name = settings.get_active_library_name()
|
||||
return ensure_library_root_exists(library_name)
|
||||
|
||||
async def start_download(self, options: dict):
|
||||
"""Start downloading example images for models."""
|
||||
|
||||
@@ -98,9 +106,9 @@ class DownloadManager:
|
||||
model_types = data.get('model_types', ['lora', 'checkpoint'])
|
||||
delay = float(data.get('delay', 0.2))
|
||||
|
||||
output_dir = settings.get('example_images_path')
|
||||
base_path = settings.get('example_images_path')
|
||||
|
||||
if not output_dir:
|
||||
if not base_path:
|
||||
error_msg = 'Example images path not configured in settings'
|
||||
if auto_mode:
|
||||
logger.debug(error_msg)
|
||||
@@ -110,7 +118,9 @@ class DownloadManager:
|
||||
}
|
||||
raise DownloadConfigurationError(error_msg)
|
||||
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
output_dir = self._resolve_output_dir()
|
||||
if not output_dir:
|
||||
raise DownloadConfigurationError('Example images path not configured in settings')
|
||||
|
||||
self._progress.reset()
|
||||
self._progress['status'] = 'running'
|
||||
@@ -458,13 +468,14 @@ class DownloadManager:
|
||||
if not model_hashes:
|
||||
raise DownloadConfigurationError('Missing model_hashes parameter')
|
||||
|
||||
output_dir = settings.get('example_images_path')
|
||||
base_path = settings.get('example_images_path')
|
||||
|
||||
if not base_path:
|
||||
raise DownloadConfigurationError('Example images path not configured in settings')
|
||||
output_dir = self._resolve_output_dir()
|
||||
if not output_dir:
|
||||
raise DownloadConfigurationError('Example images path not configured in settings')
|
||||
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
self._progress.reset()
|
||||
self._progress['total'] = len(model_hashes)
|
||||
self._progress['status'] = 'running'
|
||||
|
||||
@@ -4,6 +4,10 @@ import sys
|
||||
import subprocess
|
||||
from aiohttp import web
|
||||
from ..services.settings_manager import settings
|
||||
from ..utils.example_images_paths import (
|
||||
get_model_folder,
|
||||
get_model_relative_path,
|
||||
)
|
||||
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -41,8 +45,12 @@ class ExampleImagesFileManager:
|
||||
}, status=400)
|
||||
|
||||
# Construct folder path for this model
|
||||
model_folder = os.path.join(example_images_path, model_hash)
|
||||
model_folder = os.path.abspath(model_folder) # Get absolute path
|
||||
model_folder = get_model_folder(model_hash)
|
||||
if not model_folder:
|
||||
return web.json_response({
|
||||
'success': False,
|
||||
'error': 'Failed to resolve example images folder for this model.'
|
||||
}, status=500)
|
||||
|
||||
# Path validation: ensure model_folder is under example_images_path
|
||||
if not model_folder.startswith(os.path.abspath(example_images_path)):
|
||||
@@ -109,8 +117,13 @@ class ExampleImagesFileManager:
|
||||
}, status=400)
|
||||
|
||||
# Construct folder path for this model
|
||||
model_folder = os.path.join(example_images_path, model_hash)
|
||||
|
||||
model_folder = get_model_folder(model_hash)
|
||||
if not model_folder:
|
||||
return web.json_response({
|
||||
'success': False,
|
||||
'error': 'Failed to resolve example images folder for this model'
|
||||
}, status=500)
|
||||
|
||||
# Check if folder exists
|
||||
if not os.path.exists(model_folder):
|
||||
return web.json_response({
|
||||
@@ -128,9 +141,10 @@ class ExampleImagesFileManager:
|
||||
file_ext = os.path.splitext(file)[1].lower()
|
||||
if (file_ext in SUPPORTED_MEDIA_EXTENSIONS['images'] or
|
||||
file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']):
|
||||
relative_path = get_model_relative_path(model_hash)
|
||||
files.append({
|
||||
'name': file,
|
||||
'path': f'/example_images_static/{model_hash}/{file}',
|
||||
'path': f'/example_images_static/{relative_path}/{file}',
|
||||
'extension': file_ext,
|
||||
'is_video': file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']
|
||||
})
|
||||
@@ -176,8 +190,13 @@ class ExampleImagesFileManager:
|
||||
})
|
||||
|
||||
# Construct folder path for this model
|
||||
model_folder = os.path.join(example_images_path, model_hash)
|
||||
|
||||
model_folder = get_model_folder(model_hash)
|
||||
if not model_folder:
|
||||
return web.json_response({
|
||||
'has_images': False,
|
||||
'error': 'Failed to resolve example images folder for this model'
|
||||
})
|
||||
|
||||
# Check if folder exists
|
||||
if not os.path.exists(model_folder) or not os.path.isdir(model_folder):
|
||||
return web.json_response({
|
||||
|
||||
@@ -5,6 +5,7 @@ import re
|
||||
import json
|
||||
from ..services.settings_manager import settings
|
||||
from ..services.service_registry import ServiceRegistry
|
||||
from ..utils.example_images_paths import iter_library_roots
|
||||
from ..utils.metadata_manager import MetadataManager
|
||||
from ..utils.example_images_processor import ExampleImagesProcessor
|
||||
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
|
||||
@@ -19,29 +20,35 @@ class ExampleImagesMigration:
|
||||
@staticmethod
|
||||
async def check_and_run_migrations():
|
||||
"""Check if migrations are needed and run them in background"""
|
||||
example_images_path = settings.get('example_images_path')
|
||||
if not example_images_path or not os.path.exists(example_images_path):
|
||||
root = settings.get('example_images_path')
|
||||
if not root or not os.path.exists(root):
|
||||
logger.debug("No example images path configured or path doesn't exist, skipping migrations")
|
||||
return
|
||||
|
||||
# Check current version from progress file
|
||||
current_version = 0
|
||||
progress_file = os.path.join(example_images_path, '.download_progress.json')
|
||||
if os.path.exists(progress_file):
|
||||
try:
|
||||
with open(progress_file, 'r', encoding='utf-8') as f:
|
||||
progress_data = json.load(f)
|
||||
current_version = progress_data.get('naming_version', 0)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load progress file for migration check: {e}")
|
||||
|
||||
# If current version is less than target version, start migration
|
||||
if current_version < CURRENT_NAMING_VERSION:
|
||||
logger.info(f"Starting example images naming migration from v{current_version} to v{CURRENT_NAMING_VERSION}")
|
||||
# Start migration in background task
|
||||
asyncio.create_task(
|
||||
ExampleImagesMigration.run_migrations(example_images_path, current_version, CURRENT_NAMING_VERSION)
|
||||
)
|
||||
|
||||
for library_name, library_path in iter_library_roots():
|
||||
if not library_path or not os.path.exists(library_path):
|
||||
continue
|
||||
|
||||
current_version = 0
|
||||
progress_file = os.path.join(library_path, '.download_progress.json')
|
||||
if os.path.exists(progress_file):
|
||||
try:
|
||||
with open(progress_file, 'r', encoding='utf-8') as f:
|
||||
progress_data = json.load(f)
|
||||
current_version = progress_data.get('naming_version', 0)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load progress file for migration check: {e}")
|
||||
|
||||
if current_version < CURRENT_NAMING_VERSION:
|
||||
logger.info(
|
||||
"Starting example images naming migration from v%s to v%s for library '%s'",
|
||||
current_version,
|
||||
CURRENT_NAMING_VERSION,
|
||||
library_name,
|
||||
)
|
||||
asyncio.create_task(
|
||||
ExampleImagesMigration.run_migrations(library_path, current_version, CURRENT_NAMING_VERSION)
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
async def run_migrations(example_images_path, from_version, to_version):
|
||||
|
||||
193
py/utils/example_images_paths.py
Normal file
193
py/utils/example_images_paths.py
Normal file
@@ -0,0 +1,193 @@
|
||||
"""Utility helpers for resolving example image storage paths."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
from typing import Iterable, List, Optional, Tuple
|
||||
|
||||
from ..services.settings_manager import settings
|
||||
|
||||
_HEX_PATTERN = re.compile(r"[a-fA-F0-9]{64}")
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_configured_libraries() -> List[str]:
|
||||
"""Return configured library names if multi-library support is enabled."""
|
||||
|
||||
libraries = settings.get("libraries")
|
||||
if isinstance(libraries, dict) and libraries:
|
||||
return list(libraries.keys())
|
||||
return []
|
||||
|
||||
|
||||
def get_example_images_root() -> str:
|
||||
"""Return the root directory configured for example images."""
|
||||
|
||||
root = settings.get("example_images_path") or ""
|
||||
return os.path.abspath(root) if root else ""
|
||||
|
||||
|
||||
def uses_library_scoped_folders() -> bool:
|
||||
"""Return True when example images should be separated per library."""
|
||||
|
||||
libraries = _get_configured_libraries()
|
||||
return len(libraries) > 1
|
||||
|
||||
|
||||
def sanitize_library_name(library_name: Optional[str]) -> str:
|
||||
"""Return a filesystem safe library name."""
|
||||
|
||||
name = library_name or settings.get_active_library_name() or "default"
|
||||
safe_name = re.sub(r"[^A-Za-z0-9_.-]", "_", name)
|
||||
return safe_name or "default"
|
||||
|
||||
|
||||
def get_library_root(library_name: Optional[str] = None) -> str:
|
||||
"""Return the directory where a library's example images should live."""
|
||||
|
||||
root = get_example_images_root()
|
||||
if not root:
|
||||
return ""
|
||||
|
||||
if uses_library_scoped_folders():
|
||||
return os.path.join(root, sanitize_library_name(library_name))
|
||||
return root
|
||||
|
||||
|
||||
def ensure_library_root_exists(library_name: Optional[str] = None) -> str:
|
||||
"""Ensure the example image directory for a library exists and return it."""
|
||||
|
||||
library_root = get_library_root(library_name)
|
||||
if library_root:
|
||||
os.makedirs(library_root, exist_ok=True)
|
||||
return library_root
|
||||
|
||||
|
||||
def get_model_folder(model_hash: str, library_name: Optional[str] = None) -> str:
|
||||
"""Return the folder path for a model's example images."""
|
||||
|
||||
if not model_hash:
|
||||
return ""
|
||||
|
||||
library_root = ensure_library_root_exists(library_name)
|
||||
if not library_root:
|
||||
return ""
|
||||
|
||||
normalized_hash = (model_hash or "").lower()
|
||||
resolved_folder = os.path.join(library_root, normalized_hash)
|
||||
|
||||
if uses_library_scoped_folders():
|
||||
legacy_root = get_example_images_root()
|
||||
legacy_folder = os.path.join(legacy_root, normalized_hash)
|
||||
if os.path.exists(legacy_folder) and not os.path.exists(resolved_folder):
|
||||
try:
|
||||
os.makedirs(library_root, exist_ok=True)
|
||||
shutil.move(legacy_folder, resolved_folder)
|
||||
logger.info(
|
||||
"Migrated legacy example images folder '%s' to '%s'", legacy_folder, resolved_folder
|
||||
)
|
||||
except OSError as exc:
|
||||
logger.error(
|
||||
"Failed to migrate example images from '%s' to '%s': %s",
|
||||
legacy_folder,
|
||||
resolved_folder,
|
||||
exc,
|
||||
)
|
||||
return legacy_folder
|
||||
|
||||
return resolved_folder
|
||||
|
||||
|
||||
def get_model_relative_path(model_hash: str, library_name: Optional[str] = None) -> str:
|
||||
"""Return the relative URL path from the static mount to a model folder."""
|
||||
|
||||
root = get_example_images_root()
|
||||
folder = get_model_folder(model_hash, library_name)
|
||||
if not root or not folder:
|
||||
return ""
|
||||
|
||||
try:
|
||||
relative = os.path.relpath(folder, root)
|
||||
except ValueError:
|
||||
return ""
|
||||
|
||||
return relative.replace("\\", "/")
|
||||
|
||||
|
||||
def iter_library_roots() -> Iterable[Tuple[str, str]]:
|
||||
"""Yield configured library names and their resolved filesystem roots."""
|
||||
|
||||
root = get_example_images_root()
|
||||
if not root:
|
||||
return []
|
||||
|
||||
libraries = _get_configured_libraries()
|
||||
if uses_library_scoped_folders():
|
||||
results: List[Tuple[str, str]] = []
|
||||
if libraries:
|
||||
for library in libraries:
|
||||
results.append((library, get_library_root(library)))
|
||||
else:
|
||||
# Fall back to the active library to avoid skipping migrations/cleanup
|
||||
active = settings.get_active_library_name() or "default"
|
||||
results.append((active, get_library_root(active)))
|
||||
return results
|
||||
|
||||
active = settings.get_active_library_name() or "default"
|
||||
return [(active, root)]
|
||||
|
||||
|
||||
def is_hash_folder(name: str) -> bool:
|
||||
"""Return True if the provided name looks like a model hash folder."""
|
||||
|
||||
return bool(_HEX_PATTERN.fullmatch(name or ""))
|
||||
|
||||
|
||||
def is_valid_example_images_root(folder_path: str) -> bool:
|
||||
"""Check whether a folder looks like a dedicated example images root."""
|
||||
|
||||
try:
|
||||
items = os.listdir(folder_path)
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
for item in items:
|
||||
item_path = os.path.join(folder_path, item)
|
||||
if item == ".download_progress.json" and os.path.isfile(item_path):
|
||||
continue
|
||||
|
||||
if os.path.isdir(item_path):
|
||||
if is_hash_folder(item):
|
||||
continue
|
||||
if item == "_deleted":
|
||||
# Allow cleanup staging folders
|
||||
continue
|
||||
# When multi-library mode is active we expect nested hash folders
|
||||
if uses_library_scoped_folders():
|
||||
if _library_folder_has_only_hash_dirs(item_path):
|
||||
continue
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def _library_folder_has_only_hash_dirs(path: str) -> bool:
|
||||
"""Return True when a library subfolder only contains hash folders or metadata files."""
|
||||
|
||||
try:
|
||||
for entry in os.listdir(path):
|
||||
entry_path = os.path.join(path, entry)
|
||||
if entry == ".download_progress.json" and os.path.isfile(entry_path):
|
||||
continue
|
||||
if entry == "_deleted" and os.path.isdir(entry_path):
|
||||
continue
|
||||
if not os.path.isdir(entry_path) or not is_hash_folder(entry):
|
||||
return False
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
return True
|
||||
@@ -7,6 +7,7 @@ from aiohttp import web
|
||||
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
|
||||
from ..services.service_registry import ServiceRegistry
|
||||
from ..services.settings_manager import settings
|
||||
from ..utils.example_images_paths import get_model_folder, get_model_relative_path
|
||||
from .example_images_metadata import MetadataUpdater
|
||||
from ..utils.metadata_manager import MetadataManager
|
||||
|
||||
@@ -346,7 +347,9 @@ class ExampleImagesProcessor:
|
||||
)
|
||||
|
||||
# Create model folder
|
||||
model_folder = os.path.join(example_images_path, model_hash)
|
||||
model_folder = get_model_folder(model_hash)
|
||||
if not model_folder:
|
||||
raise ExampleImagesImportError('Failed to resolve model folder for example images')
|
||||
os.makedirs(model_folder, exist_ok=True)
|
||||
|
||||
imported_files = []
|
||||
@@ -383,7 +386,7 @@ class ExampleImagesProcessor:
|
||||
# Add to imported files list
|
||||
imported_files.append({
|
||||
'name': new_filename,
|
||||
'path': f'/example_images_static/{model_hash}/{new_filename}',
|
||||
'path': f'/example_images_static/{get_model_relative_path(model_hash)}/{new_filename}',
|
||||
'extension': file_ext,
|
||||
'is_video': file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']
|
||||
})
|
||||
@@ -497,7 +500,12 @@ class ExampleImagesProcessor:
|
||||
}, status=404)
|
||||
|
||||
# Find and delete the actual file
|
||||
model_folder = os.path.join(example_images_path, model_hash)
|
||||
model_folder = get_model_folder(model_hash)
|
||||
if not model_folder:
|
||||
return web.json_response({
|
||||
'success': False,
|
||||
'error': 'Failed to resolve model folder for example images'
|
||||
}, status=500)
|
||||
file_deleted = False
|
||||
|
||||
if os.path.exists(model_folder):
|
||||
|
||||
76
tests/utils/test_example_images_paths.py
Normal file
76
tests/utils/test_example_images_paths.py
Normal file
@@ -0,0 +1,76 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import copy
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from py.services.settings_manager import settings
|
||||
from py.utils.example_images_paths import get_model_folder, get_model_relative_path
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def restore_settings():
|
||||
original = copy.deepcopy(settings.settings)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
settings.settings.clear()
|
||||
settings.settings.update(original)
|
||||
|
||||
|
||||
def test_get_model_folder_single_library(tmp_path):
|
||||
settings.settings['example_images_path'] = str(tmp_path)
|
||||
settings.settings['libraries'] = {'default': {}}
|
||||
settings.settings['active_library'] = 'default'
|
||||
|
||||
model_hash = 'a' * 64
|
||||
folder = get_model_folder(model_hash)
|
||||
relative = get_model_relative_path(model_hash)
|
||||
|
||||
assert Path(folder) == tmp_path / model_hash
|
||||
assert relative == model_hash
|
||||
|
||||
|
||||
def test_get_model_folder_multi_library(tmp_path):
|
||||
settings.settings['example_images_path'] = str(tmp_path)
|
||||
settings.settings['libraries'] = {
|
||||
'default': {},
|
||||
'Alt Library': {},
|
||||
}
|
||||
settings.settings['active_library'] = 'Alt Library'
|
||||
|
||||
model_hash = 'b' * 64
|
||||
expected_folder = tmp_path / 'Alt_Library' / model_hash
|
||||
|
||||
folder = get_model_folder(model_hash)
|
||||
relative = get_model_relative_path(model_hash)
|
||||
|
||||
assert Path(folder) == expected_folder
|
||||
assert relative == os.path.join('Alt_Library', model_hash).replace('\\', '/')
|
||||
|
||||
|
||||
def test_get_model_folder_migrates_legacy_structure(tmp_path):
|
||||
settings.settings['example_images_path'] = str(tmp_path)
|
||||
settings.settings['libraries'] = {
|
||||
'default': {},
|
||||
'extra': {},
|
||||
}
|
||||
settings.settings['active_library'] = 'extra'
|
||||
|
||||
model_hash = 'c' * 64
|
||||
legacy_folder = tmp_path / model_hash
|
||||
legacy_folder.mkdir()
|
||||
legacy_file = legacy_folder / 'image.png'
|
||||
legacy_file.write_text('data', encoding='utf-8')
|
||||
|
||||
resolved_folder = get_model_folder(model_hash)
|
||||
relative = get_model_relative_path(model_hash)
|
||||
|
||||
expected_folder = tmp_path / 'extra' / model_hash
|
||||
|
||||
assert Path(resolved_folder) == expected_folder
|
||||
assert relative == os.path.join('extra', model_hash).replace('\\', '/')
|
||||
assert not legacy_folder.exists()
|
||||
assert (expected_folder / 'image.png').exists()
|
||||
Reference in New Issue
Block a user