Merge pull request #517 from willmiao/codex/determine-example_images_path-structure

feat: namespace example image storage by library
This commit is contained in:
pixelpaws
2025-10-04 20:42:15 +08:00
committed by GitHub
8 changed files with 445 additions and 95 deletions

View File

@@ -11,7 +11,6 @@ from __future__ import annotations
import asyncio
import logging
import os
import re
import subprocess
import sys
from dataclasses import dataclass
@@ -29,6 +28,7 @@ from ...services.settings_manager import settings as default_settings
from ...services.websocket_manager import ws_manager
from ...services.downloader import get_downloader
from ...utils.constants import DEFAULT_NODE_COLOR, NODE_TYPES, SUPPORTED_MEDIA_EXTENSIONS
from ...utils.example_images_paths import is_valid_example_images_root
from ...utils.lora_metadata import extract_trained_words
from ...utils.usage_stats import UsageStats
@@ -274,21 +274,7 @@ class SettingsHandler:
return None
def _is_dedicated_example_images_folder(self, folder_path: str) -> bool:
try:
items = os.listdir(folder_path)
if not items:
return True
for item in items:
item_path = os.path.join(folder_path, item)
if item == ".download_progress.json" and os.path.isfile(item_path):
continue
if os.path.isdir(item_path) and re.fullmatch(r"[a-fA-F0-9]{64}", item):
continue
return False
return True
except Exception as exc: # pragma: no cover - defensive logging
logger.error("Error checking if folder is dedicated: %s", exc)
return False
return is_valid_example_images_root(folder_path)
class UsageStatsHandler:

View File

@@ -8,10 +8,11 @@ import os
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List
from typing import Dict, List, Tuple
from .service_registry import ServiceRegistry
from .settings_manager import settings
from ..utils.example_images_paths import iter_library_roots
logger = logging.getLogger(__name__)
@@ -70,9 +71,9 @@ class ExampleImagesCleanupService:
"error_code": "path_not_configured",
}
example_root = Path(example_images_path)
if not example_root.exists():
logger.debug("Cleanup skipped: example images path missing -> %s", example_root)
base_root = Path(example_images_path)
if not base_root.exists():
logger.debug("Cleanup skipped: example images path missing -> %s", base_root)
return {
"success": False,
"error": "Example images path does not exist.",
@@ -91,9 +92,6 @@ class ExampleImagesCleanupService:
"error_code": "scanner_initialization_failed",
}
deleted_bucket = example_root / self._deleted_folder_name
deleted_bucket.mkdir(exist_ok=True)
checked_folders = 0
moved_empty = 0
moved_orphaned = 0
@@ -101,45 +99,96 @@ class ExampleImagesCleanupService:
move_failures = 0
errors: List[str] = []
for entry in os.scandir(example_root):
if not entry.is_dir(follow_symlinks=False):
resolved_base = base_root.resolve()
library_paths: List[Tuple[str, Path]] = []
processed_paths = {resolved_base}
for library_name, library_path in iter_library_roots():
if not library_path:
continue
if entry.name == self._deleted_folder_name:
continue
checked_folders += 1
folder_path = Path(entry.path)
library_root = Path(library_path)
try:
if self._is_folder_empty(folder_path):
if await self._remove_empty_folder(folder_path):
moved_empty += 1
else:
move_failures += 1
continue
if not self._is_hash_folder(entry.name):
skipped_non_hash += 1
continue
hash_exists = (
lora_scanner.has_hash(entry.name)
or checkpoint_scanner.has_hash(entry.name)
or embedding_scanner.has_hash(entry.name)
resolved = library_root.resolve()
except FileNotFoundError:
continue
if resolved in processed_paths:
continue
if not library_root.exists():
logger.debug(
"Skipping cleanup for library '%s': folder missing (%s)",
library_name,
library_root,
)
continue
processed_paths.add(resolved)
library_paths.append((library_name, library_root))
if not hash_exists:
if await self._move_folder(folder_path, deleted_bucket):
moved_orphaned += 1
else:
move_failures += 1
deleted_roots: List[Path] = []
except Exception as exc: # pragma: no cover - filesystem guard
move_failures += 1
error_message = f"{entry.name}: {exc}"
errors.append(error_message)
logger.error("Error processing example images folder %s: %s", folder_path, exc, exc_info=True)
# Build list of (label, root) pairs including the base root for legacy layouts
cleanup_targets: List[Tuple[str, Path]] = [("__base__", base_root)] + library_paths
library_root_set = {root.resolve() for _, root in library_paths}
for label, root_path in cleanup_targets:
deleted_bucket = root_path / self._deleted_folder_name
deleted_bucket.mkdir(exist_ok=True)
deleted_roots.append(deleted_bucket)
for entry in os.scandir(root_path):
if not entry.is_dir(follow_symlinks=False):
continue
if entry.name == self._deleted_folder_name:
continue
entry_path = Path(entry.path)
if label == "__base__":
try:
resolved_entry = entry_path.resolve()
except FileNotFoundError:
continue
if resolved_entry in library_root_set:
# Skip library-specific folders tracked separately
continue
checked_folders += 1
try:
if self._is_folder_empty(entry_path):
if await self._remove_empty_folder(entry_path):
moved_empty += 1
else:
move_failures += 1
continue
if not self._is_hash_folder(entry.name):
skipped_non_hash += 1
continue
hash_exists = (
lora_scanner.has_hash(entry.name)
or checkpoint_scanner.has_hash(entry.name)
or embedding_scanner.has_hash(entry.name)
)
if not hash_exists:
if await self._move_folder(entry_path, deleted_bucket):
moved_orphaned += 1
else:
move_failures += 1
except Exception as exc: # pragma: no cover - filesystem guard
move_failures += 1
error_message = f"{entry.name}: {exc}"
errors.append(error_message)
logger.error(
"Error processing example images folder %s: %s",
entry_path,
exc,
exc_info=True,
)
partial_success = move_failures > 0 and (moved_empty > 0 or moved_orphaned > 0)
success = move_failures == 0 and not errors
@@ -152,11 +201,12 @@ class ExampleImagesCleanupService:
skipped_non_hash=skipped_non_hash,
move_failures=move_failures,
errors=errors,
deleted_root=str(deleted_bucket),
deleted_root=str(deleted_roots[0]) if deleted_roots else None,
partial_success=partial_success,
)
summary = result.to_dict()
summary["deleted_roots"] = [str(path) for path in deleted_roots]
if success:
logger.info(
"Example images cleanup complete: checked=%s, moved_empty=%s, moved_orphaned=%s",

View File

@@ -8,6 +8,7 @@ import time
from typing import Any, Dict
from ..services.service_registry import ServiceRegistry
from ..utils.example_images_paths import ensure_library_root_exists
from ..utils.metadata_manager import MetadataManager
from .example_images_processor import ExampleImagesProcessor
from .example_images_metadata import MetadataUpdater
@@ -84,6 +85,13 @@ class DownloadManager:
self._ws_manager = ws_manager
self._state_lock = state_lock or asyncio.Lock()
def _resolve_output_dir(self) -> str:
base_path = settings.get('example_images_path')
if not base_path:
return ''
library_name = settings.get_active_library_name()
return ensure_library_root_exists(library_name)
async def start_download(self, options: dict):
"""Start downloading example images for models."""
@@ -98,9 +106,9 @@ class DownloadManager:
model_types = data.get('model_types', ['lora', 'checkpoint'])
delay = float(data.get('delay', 0.2))
output_dir = settings.get('example_images_path')
base_path = settings.get('example_images_path')
if not output_dir:
if not base_path:
error_msg = 'Example images path not configured in settings'
if auto_mode:
logger.debug(error_msg)
@@ -110,7 +118,9 @@ class DownloadManager:
}
raise DownloadConfigurationError(error_msg)
os.makedirs(output_dir, exist_ok=True)
output_dir = self._resolve_output_dir()
if not output_dir:
raise DownloadConfigurationError('Example images path not configured in settings')
self._progress.reset()
self._progress['status'] = 'running'
@@ -458,13 +468,14 @@ class DownloadManager:
if not model_hashes:
raise DownloadConfigurationError('Missing model_hashes parameter')
output_dir = settings.get('example_images_path')
base_path = settings.get('example_images_path')
if not base_path:
raise DownloadConfigurationError('Example images path not configured in settings')
output_dir = self._resolve_output_dir()
if not output_dir:
raise DownloadConfigurationError('Example images path not configured in settings')
os.makedirs(output_dir, exist_ok=True)
self._progress.reset()
self._progress['total'] = len(model_hashes)
self._progress['status'] = 'running'

View File

@@ -4,6 +4,10 @@ import sys
import subprocess
from aiohttp import web
from ..services.settings_manager import settings
from ..utils.example_images_paths import (
get_model_folder,
get_model_relative_path,
)
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
logger = logging.getLogger(__name__)
@@ -41,8 +45,12 @@ class ExampleImagesFileManager:
}, status=400)
# Construct folder path for this model
model_folder = os.path.join(example_images_path, model_hash)
model_folder = os.path.abspath(model_folder) # Get absolute path
model_folder = get_model_folder(model_hash)
if not model_folder:
return web.json_response({
'success': False,
'error': 'Failed to resolve example images folder for this model.'
}, status=500)
# Path validation: ensure model_folder is under example_images_path
if not model_folder.startswith(os.path.abspath(example_images_path)):
@@ -109,8 +117,13 @@ class ExampleImagesFileManager:
}, status=400)
# Construct folder path for this model
model_folder = os.path.join(example_images_path, model_hash)
model_folder = get_model_folder(model_hash)
if not model_folder:
return web.json_response({
'success': False,
'error': 'Failed to resolve example images folder for this model'
}, status=500)
# Check if folder exists
if not os.path.exists(model_folder):
return web.json_response({
@@ -128,9 +141,10 @@ class ExampleImagesFileManager:
file_ext = os.path.splitext(file)[1].lower()
if (file_ext in SUPPORTED_MEDIA_EXTENSIONS['images'] or
file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']):
relative_path = get_model_relative_path(model_hash)
files.append({
'name': file,
'path': f'/example_images_static/{model_hash}/{file}',
'path': f'/example_images_static/{relative_path}/{file}',
'extension': file_ext,
'is_video': file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']
})
@@ -176,8 +190,13 @@ class ExampleImagesFileManager:
})
# Construct folder path for this model
model_folder = os.path.join(example_images_path, model_hash)
model_folder = get_model_folder(model_hash)
if not model_folder:
return web.json_response({
'has_images': False,
'error': 'Failed to resolve example images folder for this model'
})
# Check if folder exists
if not os.path.exists(model_folder) or not os.path.isdir(model_folder):
return web.json_response({

View File

@@ -5,6 +5,7 @@ import re
import json
from ..services.settings_manager import settings
from ..services.service_registry import ServiceRegistry
from ..utils.example_images_paths import iter_library_roots
from ..utils.metadata_manager import MetadataManager
from ..utils.example_images_processor import ExampleImagesProcessor
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
@@ -19,29 +20,35 @@ class ExampleImagesMigration:
@staticmethod
async def check_and_run_migrations():
"""Check if migrations are needed and run them in background"""
example_images_path = settings.get('example_images_path')
if not example_images_path or not os.path.exists(example_images_path):
root = settings.get('example_images_path')
if not root or not os.path.exists(root):
logger.debug("No example images path configured or path doesn't exist, skipping migrations")
return
# Check current version from progress file
current_version = 0
progress_file = os.path.join(example_images_path, '.download_progress.json')
if os.path.exists(progress_file):
try:
with open(progress_file, 'r', encoding='utf-8') as f:
progress_data = json.load(f)
current_version = progress_data.get('naming_version', 0)
except Exception as e:
logger.error(f"Failed to load progress file for migration check: {e}")
# If current version is less than target version, start migration
if current_version < CURRENT_NAMING_VERSION:
logger.info(f"Starting example images naming migration from v{current_version} to v{CURRENT_NAMING_VERSION}")
# Start migration in background task
asyncio.create_task(
ExampleImagesMigration.run_migrations(example_images_path, current_version, CURRENT_NAMING_VERSION)
)
for library_name, library_path in iter_library_roots():
if not library_path or not os.path.exists(library_path):
continue
current_version = 0
progress_file = os.path.join(library_path, '.download_progress.json')
if os.path.exists(progress_file):
try:
with open(progress_file, 'r', encoding='utf-8') as f:
progress_data = json.load(f)
current_version = progress_data.get('naming_version', 0)
except Exception as e:
logger.error(f"Failed to load progress file for migration check: {e}")
if current_version < CURRENT_NAMING_VERSION:
logger.info(
"Starting example images naming migration from v%s to v%s for library '%s'",
current_version,
CURRENT_NAMING_VERSION,
library_name,
)
asyncio.create_task(
ExampleImagesMigration.run_migrations(library_path, current_version, CURRENT_NAMING_VERSION)
)
@staticmethod
async def run_migrations(example_images_path, from_version, to_version):

View File

@@ -0,0 +1,193 @@
"""Utility helpers for resolving example image storage paths."""
from __future__ import annotations
import logging
import os
import re
import shutil
from typing import Iterable, List, Optional, Tuple
from ..services.settings_manager import settings
_HEX_PATTERN = re.compile(r"[a-fA-F0-9]{64}")
logger = logging.getLogger(__name__)
def _get_configured_libraries() -> List[str]:
    """Return configured library names if multi-library support is enabled."""
    configured = settings.get("libraries")
    # Anything other than a non-empty dict means no multi-library config.
    if not isinstance(configured, dict) or not configured:
        return []
    return list(configured)
def get_example_images_root() -> str:
    """Return the root directory configured for example images ("" if unset)."""
    configured = settings.get("example_images_path") or ""
    if not configured:
        return ""
    return os.path.abspath(configured)
def uses_library_scoped_folders() -> bool:
    """Return True when example images should be separated per library."""
    # Only more than one configured library triggers per-library subfolders.
    return len(_get_configured_libraries()) > 1
def sanitize_library_name(library_name: Optional[str]) -> str:
    """Return a filesystem safe library name.

    Falls back to the active library (or "default") when no name is given,
    and replaces every character outside [A-Za-z0-9_.-] with an underscore.
    """
    raw = library_name or settings.get_active_library_name() or "default"
    cleaned = re.sub(r"[^A-Za-z0-9_.-]", "_", raw)
    return cleaned if cleaned else "default"
def get_library_root(library_name: Optional[str] = None) -> str:
    """Return the directory where a library's example images should live."""
    base = get_example_images_root()
    if not base:
        return ""
    # Single-library setups keep the flat legacy layout under the root.
    if not uses_library_scoped_folders():
        return base
    return os.path.join(base, sanitize_library_name(library_name))
def ensure_library_root_exists(library_name: Optional[str] = None) -> str:
    """Ensure the example image directory for a library exists and return it.

    Returns "" when no example images root is configured.
    """
    resolved = get_library_root(library_name)
    if not resolved:
        return ""
    os.makedirs(resolved, exist_ok=True)
    return resolved
def get_model_folder(model_hash: str, library_name: Optional[str] = None) -> str:
    """Return the folder path for a model's example images.

    In multi-library mode the folder lives under the sanitized library
    subdirectory; a legacy flat folder (``<root>/<hash>``) is migrated into
    the library-scoped location on first access. When that move fails, the
    legacy folder path is returned so callers can still read existing images.

    Args:
        model_hash: SHA-256 hex digest identifying the model.
        library_name: Optional library override; defaults to the active library.

    Returns:
        Absolute folder path, or "" when the hash or configured root is missing.
    """
    if not model_hash:
        return ""
    library_root = ensure_library_root_exists(library_name)
    if not library_root:
        return ""
    # Folders are keyed by the lowercased hash; assumes legacy folders were
    # also created lowercase on disk — TODO confirm against older installs.
    normalized_hash = model_hash.lower()
    resolved_folder = os.path.join(library_root, normalized_hash)
    if uses_library_scoped_folders():
        legacy_root = get_example_images_root()
        legacy_folder = os.path.join(legacy_root, normalized_hash)
        if os.path.exists(legacy_folder) and not os.path.exists(resolved_folder):
            try:
                # library_root already exists (ensured above), so we can move directly.
                shutil.move(legacy_folder, resolved_folder)
                logger.info(
                    "Migrated legacy example images folder '%s' to '%s'", legacy_folder, resolved_folder
                )
            except OSError as exc:
                logger.error(
                    "Failed to migrate example images from '%s' to '%s': %s",
                    legacy_folder,
                    resolved_folder,
                    exc,
                )
                return legacy_folder
    return resolved_folder
def get_model_relative_path(model_hash: str, library_name: Optional[str] = None) -> str:
    """Return the relative URL path from the static mount to a model folder."""
    base = get_example_images_root()
    target = get_model_folder(model_hash, library_name)
    if not base or not target:
        return ""
    try:
        rel = os.path.relpath(target, base)
    except ValueError:
        # relpath raises when the paths live on different Windows drives.
        return ""
    # URLs always use forward slashes, regardless of platform.
    return rel.replace("\\", "/")
def iter_library_roots() -> Iterable[Tuple[str, str]]:
    """Yield configured library names and their resolved filesystem roots."""
    base = get_example_images_root()
    if not base:
        return []
    # Legacy/single-library layout: everything lives directly under the root.
    if not uses_library_scoped_folders():
        active = settings.get_active_library_name() or "default"
        return [(active, base)]
    names = _get_configured_libraries()
    if not names:
        # Fall back to the active library to avoid skipping migrations/cleanup
        names = [settings.get_active_library_name() or "default"]
    return [(name, get_library_root(name)) for name in names]
def is_hash_folder(name: str) -> bool:
    """Return True if the provided name looks like a model hash folder (64 hex chars)."""
    candidate = name or ""
    return _HEX_PATTERN.fullmatch(candidate) is not None
def is_valid_example_images_root(folder_path: str) -> bool:
    """Check whether a folder looks like a dedicated example images root."""
    try:
        entries = os.listdir(folder_path)
    except OSError:
        return False
    for entry in entries:
        entry_path = os.path.join(folder_path, entry)
        # The downloader's progress file is the only regular file tolerated.
        if entry == ".download_progress.json" and os.path.isfile(entry_path):
            continue
        if not os.path.isdir(entry_path):
            return False
        # Hash folders and the cleanup staging folder are always allowed.
        if is_hash_folder(entry) or entry == "_deleted":
            continue
        # When multi-library mode is active we expect nested hash folders
        if uses_library_scoped_folders() and _library_folder_has_only_hash_dirs(entry_path):
            continue
        return False
    return True
def _library_folder_has_only_hash_dirs(path: str) -> bool:
    """Return True when a library subfolder only contains hash folders or metadata files."""
    try:
        entries = os.listdir(path)
    except OSError:
        return False
    for entry in entries:
        entry_path = os.path.join(path, entry)
        if entry == ".download_progress.json" and os.path.isfile(entry_path):
            continue
        if entry == "_deleted" and os.path.isdir(entry_path):
            continue
        if os.path.isdir(entry_path) and is_hash_folder(entry):
            continue
        # Anything else (stray file, non-hash directory) disqualifies it.
        return False
    return True

View File

@@ -7,6 +7,7 @@ from aiohttp import web
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
from ..services.service_registry import ServiceRegistry
from ..services.settings_manager import settings
from ..utils.example_images_paths import get_model_folder, get_model_relative_path
from .example_images_metadata import MetadataUpdater
from ..utils.metadata_manager import MetadataManager
@@ -346,7 +347,9 @@ class ExampleImagesProcessor:
)
# Create model folder
model_folder = os.path.join(example_images_path, model_hash)
model_folder = get_model_folder(model_hash)
if not model_folder:
raise ExampleImagesImportError('Failed to resolve model folder for example images')
os.makedirs(model_folder, exist_ok=True)
imported_files = []
@@ -383,7 +386,7 @@ class ExampleImagesProcessor:
# Add to imported files list
imported_files.append({
'name': new_filename,
'path': f'/example_images_static/{model_hash}/{new_filename}',
'path': f'/example_images_static/{get_model_relative_path(model_hash)}/{new_filename}',
'extension': file_ext,
'is_video': file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']
})
@@ -497,7 +500,12 @@ class ExampleImagesProcessor:
}, status=404)
# Find and delete the actual file
model_folder = os.path.join(example_images_path, model_hash)
model_folder = get_model_folder(model_hash)
if not model_folder:
return web.json_response({
'success': False,
'error': 'Failed to resolve model folder for example images'
}, status=500)
file_deleted = False
if os.path.exists(model_folder):

View File

@@ -0,0 +1,76 @@
from __future__ import annotations
import copy
import os
from pathlib import Path
import pytest
from py.services.settings_manager import settings
from py.utils.example_images_paths import get_model_folder, get_model_relative_path
@pytest.fixture(autouse=True)
def restore_settings():
    """Snapshot the global settings dict and restore it after each test."""
    snapshot = copy.deepcopy(settings.settings)
    try:
        yield
    finally:
        settings.settings.clear()
        settings.settings.update(snapshot)
def test_get_model_folder_single_library(tmp_path):
    """With one library configured, model folders live directly under the root."""
    settings.settings['example_images_path'] = str(tmp_path)
    settings.settings['libraries'] = {'default': {}}
    settings.settings['active_library'] = 'default'
    digest = 'a' * 64
    assert Path(get_model_folder(digest)) == tmp_path / digest
    assert get_model_relative_path(digest) == digest
def test_get_model_folder_multi_library(tmp_path):
    """With several libraries, folders are namespaced by the sanitized name."""
    settings.settings['example_images_path'] = str(tmp_path)
    settings.settings['libraries'] = {'default': {}, 'Alt Library': {}}
    settings.settings['active_library'] = 'Alt Library'
    digest = 'b' * 64
    expected = tmp_path / 'Alt_Library' / digest
    assert Path(get_model_folder(digest)) == expected
    assert get_model_relative_path(digest) == os.path.join('Alt_Library', digest).replace('\\', '/')
def test_get_model_folder_migrates_legacy_structure(tmp_path):
    """A flat legacy hash folder is moved into the library-scoped subfolder."""
    settings.settings['example_images_path'] = str(tmp_path)
    settings.settings['libraries'] = {'default': {}, 'extra': {}}
    settings.settings['active_library'] = 'extra'
    digest = 'c' * 64
    legacy_dir = tmp_path / digest
    legacy_dir.mkdir()
    (legacy_dir / 'image.png').write_text('data', encoding='utf-8')
    migrated = tmp_path / 'extra' / digest
    # First access should trigger the migration move.
    assert Path(get_model_folder(digest)) == migrated
    assert get_model_relative_path(digest) == os.path.join('extra', digest).replace('\\', '/')
    assert not legacy_dir.exists()
    assert (migrated / 'image.png').exists()