feat: add cleanup example image folders functionality and UI integration

This commit is contained in:
Will Miao
2025-09-23 20:35:35 +08:00
parent 49fa37f00d
commit 656f1755fd
18 changed files with 568 additions and 126 deletions

View File

@@ -122,6 +122,15 @@
"noRemoteImagesAvailable": "Keine Remote-Beispielbilder für dieses Modell auf Civitai verfügbar"
}
},
"globalContextMenu": {
"cleanupExampleImages": {
"label": "Clean up example image folders",
"success": "Moved {count} folder(s) to the deleted folder",
"none": "No example image folders needed cleanup",
"partial": "Cleanup completed with {failures} folder(s) skipped",
"error": "Failed to clean example image folders: {message}"
}
},
"header": {
"appTitle": "LoRA Manager",
"navigation": {

View File

@@ -122,6 +122,15 @@
"noRemoteImagesAvailable": "No remote example images available for this model on Civitai"
}
},
"globalContextMenu": {
"cleanupExampleImages": {
"label": "Clean up example image folders",
"success": "Moved {count} folder(s) to the deleted folder",
"none": "No example image folders needed cleanup",
"partial": "Cleanup completed with {failures} folder(s) skipped",
"error": "Failed to clean example image folders: {message}"
}
},
"header": {
"appTitle": "LoRA Manager",
"navigation": {

View File

@@ -122,6 +122,15 @@
"noRemoteImagesAvailable": "No hay imágenes de ejemplo remotas disponibles para este modelo en Civitai"
}
},
"globalContextMenu": {
"cleanupExampleImages": {
"label": "Clean up example image folders",
"success": "Moved {count} folder(s) to the deleted folder",
"none": "No example image folders needed cleanup",
"partial": "Cleanup completed with {failures} folder(s) skipped",
"error": "Failed to clean example image folders: {message}"
}
},
"header": {
"appTitle": "LoRA Manager",
"navigation": {

View File

@@ -122,6 +122,15 @@
"noRemoteImagesAvailable": "Aucune image d'exemple distante disponible pour ce modèle sur Civitai"
}
},
"globalContextMenu": {
"cleanupExampleImages": {
"label": "Clean up example image folders",
"success": "Moved {count} folder(s) to the deleted folder",
"none": "No example image folders needed cleanup",
"partial": "Cleanup completed with {failures} folder(s) skipped",
"error": "Failed to clean example image folders: {message}"
}
},
"header": {
"appTitle": "LoRA Manager",
"navigation": {

View File

@@ -122,6 +122,15 @@
"noRemoteImagesAvailable": "このモデルのCivitaiでのリモート例画像は利用できません"
}
},
"globalContextMenu": {
"cleanupExampleImages": {
"label": "Clean up example image folders",
"success": "Moved {count} folder(s) to the deleted folder",
"none": "No example image folders needed cleanup",
"partial": "Cleanup completed with {failures} folder(s) skipped",
"error": "Failed to clean example image folders: {message}"
}
},
"header": {
"appTitle": "LoRA Manager",
"navigation": {

View File

@@ -122,6 +122,15 @@
"noRemoteImagesAvailable": "Civitai에서 이 모델의 원격 예시 이미지를 사용할 수 없습니다"
}
},
"globalContextMenu": {
"cleanupExampleImages": {
"label": "Clean up example image folders",
"success": "Moved {count} folder(s) to the deleted folder",
"none": "No example image folders needed cleanup",
"partial": "Cleanup completed with {failures} folder(s) skipped",
"error": "Failed to clean example image folders: {message}"
}
},
"header": {
"appTitle": "LoRA Manager",
"navigation": {

View File

@@ -122,6 +122,15 @@
"noRemoteImagesAvailable": "Нет удаленных примеров изображений для этой модели на Civitai"
}
},
"globalContextMenu": {
"cleanupExampleImages": {
"label": "Clean up example image folders",
"success": "Moved {count} folder(s) to the deleted folder",
"none": "No example image folders needed cleanup",
"partial": "Cleanup completed with {failures} folder(s) skipped",
"error": "Failed to clean example image folders: {message}"
}
},
"header": {
"appTitle": "LoRA Manager",
"navigation": {

View File

@@ -122,6 +122,15 @@
"noRemoteImagesAvailable": "此模型在 Civitai 上没有远程示例图片"
}
},
"globalContextMenu": {
"cleanupExampleImages": {
"label": "Clean up example image folders",
"success": "Moved {count} folder(s) to the deleted folder",
"none": "No example image folders needed cleanup",
"partial": "Cleanup completed with {failures} folder(s) skipped",
"error": "Failed to clean example image folders: {message}"
}
},
"header": {
"appTitle": "LoRA 管理器",
"navigation": {

View File

@@ -122,6 +122,15 @@
"noRemoteImagesAvailable": "此模型在 Civitai 上無遠端範例圖片"
}
},
"globalContextMenu": {
"cleanupExampleImages": {
"label": "Clean up example image folders",
"success": "Moved {count} folder(s) to the deleted folder",
"none": "No example image folders needed cleanup",
"partial": "Cleanup completed with {failures} folder(s) skipped",
"error": "Failed to clean example image folders: {message}"
}
},
"header": {
"appTitle": "LoRA 管理器",
"navigation": {

View File

@@ -16,6 +16,7 @@ from .services.service_registry import ServiceRegistry
from .services.settings_manager import settings
from .utils.example_images_migration import ExampleImagesMigration
from .services.websocket_manager import ws_manager
from .services.example_images_cleanup_service import ExampleImagesCleanupService
logger = logging.getLogger(__name__)
@@ -240,7 +241,6 @@ class LoraManager:
# Run post-initialization tasks
post_tasks = [
asyncio.create_task(cls._cleanup_backup_files(), name='cleanup_bak_files'),
asyncio.create_task(cls._cleanup_example_images_folders(), name='cleanup_example_images'),
# Add more post-initialization tasks here as needed
# asyncio.create_task(cls._another_post_task(), name='another_task'),
]
@@ -352,116 +352,37 @@ class LoraManager:
@classmethod
async def _cleanup_example_images_folders(cls):
"""Clean up invalid or empty folders in example images directory"""
"""Invoke the example images cleanup service for manual execution."""
try:
example_images_path = settings.get('example_images_path')
if not example_images_path or not os.path.exists(example_images_path):
logger.debug("Example images path not configured or doesn't exist, skipping cleanup")
return
logger.debug(f"Starting cleanup of example images folders in: {example_images_path}")
# Get all scanner instances to check hash validity
lora_scanner = await ServiceRegistry.get_lora_scanner()
checkpoint_scanner = await ServiceRegistry.get_checkpoint_scanner()
embedding_scanner = await ServiceRegistry.get_embedding_scanner()
total_folders_checked = 0
empty_folders_removed = 0
orphaned_folders_removed = 0
# Scan the example images directory
try:
with os.scandir(example_images_path) as it:
for entry in it:
if not entry.is_dir(follow_symlinks=False):
continue
folder_name = entry.name
folder_path = entry.path
total_folders_checked += 1
try:
# Check if folder is empty
is_empty = cls._is_folder_empty(folder_path)
if is_empty:
logger.debug(f"Removing empty example images folder: {folder_name}")
await cls._remove_folder_safely(folder_path)
empty_folders_removed += 1
continue
# Check if folder name is a valid SHA256 hash (64 hex characters)
if len(folder_name) != 64 or not all(c in '0123456789abcdefABCDEF' for c in folder_name):
# Skip non-hash folders to avoid deleting other content
logger.debug(f"Skipping non-hash folder: {folder_name}")
continue
# Check if hash exists in any of the scanners
hash_exists = (
lora_scanner.has_hash(folder_name) or
checkpoint_scanner.has_hash(folder_name) or
embedding_scanner.has_hash(folder_name)
)
if not hash_exists:
logger.debug(f"Removing example images folder for deleted model: {folder_name}")
await cls._remove_folder_safely(folder_path)
orphaned_folders_removed += 1
continue
service = ExampleImagesCleanupService()
result = await service.cleanup_example_image_folders()
except Exception as e:
logger.error(f"Error processing example images folder {folder_name}: {e}")
# Yield control periodically
await asyncio.sleep(0.01)
except Exception as e:
logger.error(f"Error scanning example images directory: {e}")
return
# Log final cleanup report
total_removed = empty_folders_removed + orphaned_folders_removed
if total_removed > 0:
logger.info(f"Example images cleanup completed: checked {total_folders_checked} folders, "
f"removed {empty_folders_removed} empty folders and {orphaned_folders_removed} "
f"folders for deleted models (total: {total_removed} removed)")
if result.get('success'):
logger.debug(
"Manual example images cleanup completed: moved=%s",
result.get('moved_total'),
)
elif result.get('partial_success'):
logger.warning(
"Manual example images cleanup partially succeeded: moved=%s failures=%s",
result.get('moved_total'),
result.get('move_failures'),
)
else:
logger.debug(f"Example images cleanup completed: checked {total_folders_checked} folders, "
f"no cleanup needed")
except Exception as e:
logger.debug(
"Manual example images cleanup skipped or failed: %s",
result.get('error', 'no changes'),
)
return result
except Exception as e: # pragma: no cover - defensive guard
logger.error(f"Error during example images cleanup: {e}", exc_info=True)
@classmethod
def _is_folder_empty(cls, folder_path: str) -> bool:
"""Check if a folder is empty
Args:
folder_path: Path to the folder to check
Returns:
bool: True if folder is empty, False otherwise
"""
try:
with os.scandir(folder_path) as it:
return not any(it)
except Exception as e:
logger.debug(f"Error checking if folder is empty {folder_path}: {e}")
return False
@classmethod
async def _remove_folder_safely(cls, folder_path: str):
"""Safely remove a folder and all its contents
Args:
folder_path: Path to the folder to remove
"""
try:
import shutil
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, shutil.rmtree, folder_path)
except Exception as e:
logger.warning(f"Failed to remove folder {folder_path}: {e}")
return {
'success': False,
'error': str(e),
'error_code': 'unexpected_error',
}
@classmethod
async def _cleanup(cls, app):

View File

@@ -27,6 +27,7 @@ ROUTE_DEFINITIONS: tuple[RouteDefinition, ...] = (
RouteDefinition("GET", "/api/lm/has-example-images", "has_example_images"),
RouteDefinition("POST", "/api/lm/delete-example-image", "delete_example_image"),
RouteDefinition("POST", "/api/lm/force-download-example-images", "force_download_example_images"),
RouteDefinition("POST", "/api/lm/cleanup-example-image-folders", "cleanup_example_image_folders"),
)

View File

@@ -22,6 +22,7 @@ from ..utils.example_images_download_manager import (
)
from ..utils.example_images_file_manager import ExampleImagesFileManager
from ..utils.example_images_processor import ExampleImagesProcessor
from ..services.example_images_cleanup_service import ExampleImagesCleanupService
logger = logging.getLogger(__name__)
@@ -36,12 +37,14 @@ class ExampleImagesRoutes:
download_manager: DownloadManager | None = None,
processor=ExampleImagesProcessor,
file_manager=ExampleImagesFileManager,
cleanup_service: ExampleImagesCleanupService | None = None,
) -> None:
if ws_manager is None:
raise ValueError("ws_manager is required")
self._download_manager = download_manager or get_default_download_manager(ws_manager)
self._processor = processor
self._file_manager = file_manager
self._cleanup_service = cleanup_service or ExampleImagesCleanupService()
self._handler_set: ExampleImagesHandlerSet | None = None
self._handler_mapping: Mapping[str, Callable[[web.Request], web.StreamResponse]] | None = None
@@ -72,7 +75,11 @@ class ExampleImagesRoutes:
download_use_case = DownloadExampleImagesUseCase(download_manager=self._download_manager)
download_handler = ExampleImagesDownloadHandler(download_use_case, self._download_manager)
import_use_case = ImportExampleImagesUseCase(processor=self._processor)
management_handler = ExampleImagesManagementHandler(import_use_case, self._processor)
management_handler = ExampleImagesManagementHandler(
import_use_case,
self._processor,
self._cleanup_service,
)
file_handler = ExampleImagesFileHandler(self._file_manager)
return ExampleImagesHandlerSet(
download=download_handler,

View File

@@ -89,9 +89,10 @@ class ExampleImagesDownloadHandler:
class ExampleImagesManagementHandler:
"""HTTP adapters for import/delete endpoints."""
def __init__(self, import_use_case: ImportExampleImagesUseCase, processor) -> None:
def __init__(self, import_use_case: ImportExampleImagesUseCase, processor, cleanup_service) -> None:
self._import_use_case = import_use_case
self._processor = processor
self._cleanup_service = cleanup_service
async def import_example_images(self, request: web.Request) -> web.StreamResponse:
try:
@@ -105,6 +106,16 @@ class ExampleImagesManagementHandler:
async def delete_example_image(self, request: web.Request) -> web.StreamResponse:
return await self._processor.delete_custom_image(request)
async def cleanup_example_image_folders(self, request: web.Request) -> web.StreamResponse:
result = await self._cleanup_service.cleanup_example_image_folders()
if result.get('success') or result.get('partial_success'):
return web.json_response(result, status=200)
error_code = result.get('error_code')
status = 400 if error_code in {'path_not_configured', 'path_not_found'} else 500
return web.json_response(result, status=status)
class ExampleImagesFileHandler:
"""HTTP adapters for filesystem-centric endpoints."""
@@ -141,6 +152,7 @@ class ExampleImagesHandlerSet:
"force_download_example_images": self.download.force_download_example_images,
"import_example_images": self.management.import_example_images,
"delete_example_image": self.management.delete_example_image,
"cleanup_example_image_folders": self.management.cleanup_example_image_folders,
"open_example_images_folder": self.files.open_example_images_folder,
"get_example_image_files": self.files.get_example_image_files,
"has_example_images": self.files.has_example_images,

View File

@@ -0,0 +1,246 @@
"""Service for cleaning up example image folders."""
from __future__ import annotations
import asyncio
import logging
import os
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List
from .service_registry import ServiceRegistry
from .settings_manager import settings
logger = logging.getLogger(__name__)
@dataclass(slots=True)
class CleanupResult:
"""Structured result returned from cleanup operations."""
success: bool
checked_folders: int
moved_empty_folders: int
moved_orphaned_folders: int
skipped_non_hash: int
move_failures: int
errors: List[str]
deleted_root: str | None
partial_success: bool
def to_dict(self) -> Dict[str, object]:
"""Convert the dataclass to a serialisable dictionary."""
data = {
"success": self.success,
"checked_folders": self.checked_folders,
"moved_empty_folders": self.moved_empty_folders,
"moved_orphaned_folders": self.moved_orphaned_folders,
"moved_total": self.moved_empty_folders + self.moved_orphaned_folders,
"skipped_non_hash": self.skipped_non_hash,
"move_failures": self.move_failures,
"errors": self.errors,
"deleted_root": self.deleted_root,
"partial_success": self.partial_success,
}
return data
class ExampleImagesCleanupService:
"""Encapsulates logic for cleaning example image folders."""
DELETED_FOLDER_NAME = "_deleted"
def __init__(self, deleted_folder_name: str | None = None) -> None:
self._deleted_folder_name = deleted_folder_name or self.DELETED_FOLDER_NAME
async def cleanup_example_image_folders(self) -> Dict[str, object]:
"""Clean empty or orphaned example image folders by moving them under a deleted bucket."""
example_images_path = settings.get("example_images_path")
if not example_images_path:
logger.debug("Cleanup skipped: example images path not configured")
return {
"success": False,
"error": "Example images path is not configured.",
"error_code": "path_not_configured",
}
example_root = Path(example_images_path)
if not example_root.exists():
logger.debug("Cleanup skipped: example images path missing -> %s", example_root)
return {
"success": False,
"error": "Example images path does not exist.",
"error_code": "path_not_found",
}
try:
lora_scanner = await ServiceRegistry.get_lora_scanner()
checkpoint_scanner = await ServiceRegistry.get_checkpoint_scanner()
embedding_scanner = await ServiceRegistry.get_embedding_scanner()
except Exception as exc: # pragma: no cover - defensive guard
logger.error("Failed to acquire scanners for cleanup: %s", exc, exc_info=True)
return {
"success": False,
"error": f"Failed to load model scanners: {exc}",
"error_code": "scanner_initialization_failed",
}
deleted_bucket = example_root / self._deleted_folder_name
deleted_bucket.mkdir(exist_ok=True)
checked_folders = 0
moved_empty = 0
moved_orphaned = 0
skipped_non_hash = 0
move_failures = 0
errors: List[str] = []
for entry in os.scandir(example_root):
if not entry.is_dir(follow_symlinks=False):
continue
if entry.name == self._deleted_folder_name:
continue
checked_folders += 1
folder_path = Path(entry.path)
try:
if self._is_folder_empty(folder_path):
if await self._remove_empty_folder(folder_path):
moved_empty += 1
else:
move_failures += 1
continue
if not self._is_hash_folder(entry.name):
skipped_non_hash += 1
continue
hash_exists = (
lora_scanner.has_hash(entry.name)
or checkpoint_scanner.has_hash(entry.name)
or embedding_scanner.has_hash(entry.name)
)
if not hash_exists:
if await self._move_folder(folder_path, deleted_bucket):
moved_orphaned += 1
else:
move_failures += 1
except Exception as exc: # pragma: no cover - filesystem guard
move_failures += 1
error_message = f"{entry.name}: {exc}"
errors.append(error_message)
logger.error("Error processing example images folder %s: %s", folder_path, exc, exc_info=True)
partial_success = move_failures > 0 and (moved_empty > 0 or moved_orphaned > 0)
success = move_failures == 0 and not errors
result = CleanupResult(
success=success,
checked_folders=checked_folders,
moved_empty_folders=moved_empty,
moved_orphaned_folders=moved_orphaned,
skipped_non_hash=skipped_non_hash,
move_failures=move_failures,
errors=errors,
deleted_root=str(deleted_bucket),
partial_success=partial_success,
)
summary = result.to_dict()
if success:
logger.info(
"Example images cleanup complete: checked=%s, moved_empty=%s, moved_orphaned=%s",
checked_folders,
moved_empty,
moved_orphaned,
)
elif partial_success:
logger.warning(
"Example images cleanup partially complete: moved=%s, failures=%s",
summary["moved_total"],
move_failures,
)
else:
logger.error(
"Example images cleanup failed: move_failures=%s, errors=%s",
move_failures,
errors,
)
return summary
@staticmethod
def _is_folder_empty(folder_path: Path) -> bool:
try:
with os.scandir(folder_path) as iterator:
return not any(iterator)
except FileNotFoundError:
return True
except OSError as exc: # pragma: no cover - defensive guard
logger.debug("Failed to inspect folder %s: %s", folder_path, exc)
return False
@staticmethod
def _is_hash_folder(name: str) -> bool:
if len(name) != 64:
return False
hex_chars = set("0123456789abcdefABCDEF")
return all(char in hex_chars for char in name)
async def _remove_empty_folder(self, folder_path: Path) -> bool:
loop = asyncio.get_running_loop()
try:
await loop.run_in_executor(
None,
shutil.rmtree,
str(folder_path),
)
logger.debug("Removed empty example images folder %s", folder_path)
return True
except Exception as exc: # pragma: no cover - filesystem guard
logger.error("Failed to remove empty example images folder %s: %s", folder_path, exc, exc_info=True)
return False
async def _move_folder(self, folder_path: Path, deleted_bucket: Path) -> bool:
destination = self._build_destination(folder_path.name, deleted_bucket)
loop = asyncio.get_running_loop()
try:
await loop.run_in_executor(
None,
shutil.move,
str(folder_path),
str(destination),
)
logger.debug("Moved example images folder %s -> %s", folder_path, destination)
return True
except Exception as exc: # pragma: no cover - filesystem guard
logger.error(
"Failed to move example images folder %s to %s: %s",
folder_path,
destination,
exc,
exc_info=True,
)
return False
def _build_destination(self, folder_name: str, deleted_bucket: Path) -> Path:
destination = deleted_bucket / folder_name
suffix = 1
while destination.exists():
destination = deleted_bucket / f"{folder_name}_{suffix}"
suffix += 1
return destination

View File

@@ -1,24 +1,75 @@
import { BaseContextMenu } from './BaseContextMenu.js';
import { showToast } from '../../utils/uiHelpers.js';
export class GlobalContextMenu extends BaseContextMenu {
constructor() {
super('globalContextMenu');
this._cleanupInProgress = false;
}
showMenu(x, y) {
super.showMenu(x, y, null);
showMenu(x, y, origin = null) {
const contextOrigin = origin || { type: 'global' };
super.showMenu(x, y, contextOrigin);
}
handleMenuAction(action, menuItem) {
switch (action) {
case 'placeholder-one':
case 'placeholder-two':
case 'placeholder-three':
console.info(`Global context menu action triggered: ${action}`);
case 'cleanup-example-images-folders':
this.cleanupExampleImagesFolders(menuItem).catch((error) => {
console.error('Failed to trigger example images cleanup:', error);
});
break;
default:
console.warn(`Unhandled global context menu action: ${action}`);
break;
}
}
async cleanupExampleImagesFolders(menuItem) {
if (this._cleanupInProgress) {
return;
}
this._cleanupInProgress = true;
menuItem?.classList.add('disabled');
try {
const response = await fetch('/api/lm/cleanup-example-image-folders', {
method: 'POST',
});
let payload;
try {
payload = await response.json();
} catch (parseError) {
payload = { error: 'Unexpected response format.' };
}
if (response.ok && (payload.success || payload.partial_success)) {
const movedTotal = payload.moved_total || 0;
if (movedTotal > 0) {
showToast('globalContextMenu.cleanupExampleImages.success', { count: movedTotal }, 'success');
} else {
showToast('globalContextMenu.cleanupExampleImages.none', {}, 'info');
}
if (payload.partial_success) {
showToast(
'globalContextMenu.cleanupExampleImages.partial',
{ failures: payload.move_failures ?? 0 },
'warning',
);
}
} else {
const message = payload?.error || 'Unknown error';
showToast('globalContextMenu.cleanupExampleImages.error', { message }, 'error');
}
} catch (error) {
showToast('globalContextMenu.cleanupExampleImages.error', { message: error.message || 'Unknown error' }, 'error');
} finally {
this._cleanupInProgress = false;
menuItem?.classList.remove('disabled');
}
}
}

View File

@@ -85,14 +85,8 @@
</div>
<div id="globalContextMenu" class="context-menu">
<div class="context-menu-item" data-action="placeholder-one">
<i class="fas fa-bolt"></i> <span>Global action one</span>
</div>
<div class="context-menu-item" data-action="placeholder-two">
<i class="fas fa-layer-group"></i> <span>Global action two</span>
</div>
<div class="context-menu-item" data-action="placeholder-three">
<i class="fas fa-cog"></i> <span>Global action three</span>
<div class="context-menu-item" data-action="cleanup-example-images-folders">
<i class="fas fa-trash-restore"></i> <span>{{ t('globalContextMenu.cleanupExampleImages.label') }}</span>
</div>
</div>
@@ -115,4 +109,4 @@
<div id="nodeSelector" class="node-selector">
<!-- Dynamic node list will be populated here -->
</div>
</div>

View File

@@ -27,6 +27,7 @@ class ExampleImagesHarness:
download_manager: "StubDownloadManager"
processor: "StubExampleImagesProcessor"
file_manager: "StubExampleImagesFileManager"
cleanup_service: "StubExampleImagesCleanupService"
controller: ExampleImagesRoutes
@@ -88,6 +89,21 @@ class StubExampleImagesFileManager:
return web.json_response({"operation": "has_images", "query": dict(request.query)})
class StubExampleImagesCleanupService:
def __init__(self) -> None:
self.calls: List[Dict[str, Any]] = []
self.result: Dict[str, Any] = {
"success": True,
"moved_total": 0,
"moved_empty_folders": 0,
"moved_orphaned_folders": 0,
}
async def cleanup_example_image_folders(self) -> Dict[str, Any]:
self.calls.append({})
return self.result
class StubWebSocketManager:
def __init__(self) -> None:
self.broadcast_calls: List[Dict[str, Any]] = []
@@ -103,6 +119,7 @@ async def example_images_app() -> ExampleImagesHarness:
download_manager = StubDownloadManager()
processor = StubExampleImagesProcessor()
file_manager = StubExampleImagesFileManager()
cleanup_service = StubExampleImagesCleanupService()
ws_manager = StubWebSocketManager()
controller = ExampleImagesRoutes(
@@ -110,6 +127,7 @@ async def example_images_app() -> ExampleImagesHarness:
download_manager=download_manager,
processor=processor,
file_manager=file_manager,
cleanup_service=cleanup_service,
)
app = web.Application()
@@ -125,6 +143,7 @@ async def example_images_app() -> ExampleImagesHarness:
download_manager=download_manager,
processor=processor,
file_manager=file_manager,
cleanup_service=cleanup_service,
controller=controller,
)
finally:
@@ -255,6 +274,23 @@ async def test_file_routes_delegate_to_file_manager():
]
async def test_cleanup_route_delegates_to_service():
async with example_images_app() as harness:
harness.cleanup_service.result = {
"success": True,
"moved_total": 2,
"moved_empty_folders": 1,
"moved_orphaned_folders": 1,
}
response = await harness.client.post("/api/lm/cleanup-example-image-folders")
body = await response.json()
assert response.status == 200
assert body == harness.cleanup_service.result
assert len(harness.cleanup_service.calls) == 1
@pytest.mark.asyncio
async def test_download_handler_methods_delegate() -> None:
class Recorder:
@@ -337,15 +373,20 @@ async def test_management_handler_methods_delegate() -> None:
return "delete"
recorder = Recorder()
cleanup_service = StubExampleImagesCleanupService()
use_case = StubImportUseCase()
handler = ExampleImagesManagementHandler(use_case, recorder)
handler = ExampleImagesManagementHandler(use_case, recorder, cleanup_service)
request = object()
import_response = await handler.import_example_images(request)
assert json.loads(import_response.text) == {"status": "imported"}
assert await handler.delete_example_image(request) == "delete"
cleanup_service.result = {"success": True}
cleanup_response = await handler.cleanup_example_image_folders(request)
assert json.loads(cleanup_response.text) == {"success": True}
assert use_case.requests == [request]
assert recorder.calls == [("delete_custom_image", request)]
assert len(cleanup_service.calls) == 1
@pytest.mark.asyncio
@@ -403,7 +444,8 @@ def test_handler_set_route_mapping_includes_all_handlers() -> None:
return {}
download = ExampleImagesDownloadHandler(DummyUseCase(), DummyManager())
management = ExampleImagesManagementHandler(DummyUseCase(), DummyProcessor())
cleanup_service = StubExampleImagesCleanupService()
management = ExampleImagesManagementHandler(DummyUseCase(), DummyProcessor(), cleanup_service)
files = ExampleImagesFileHandler(object())
handler_set = ExampleImagesHandlerSet(
download=download,
@@ -421,6 +463,7 @@ def test_handler_set_route_mapping_includes_all_handlers() -> None:
"force_download_example_images",
"import_example_images",
"delete_example_image",
"cleanup_example_image_folders",
"open_example_images_folder",
"get_example_image_files",
"has_example_images",

View File

@@ -0,0 +1,86 @@
from __future__ import annotations
from pathlib import Path
import pytest
from py.services.example_images_cleanup_service import ExampleImagesCleanupService
from py.services.service_registry import ServiceRegistry
from py.services.settings_manager import settings
class StubScanner:
def __init__(self, valid_hashes: set[str] | None = None) -> None:
self._valid_hashes = valid_hashes or set()
def has_hash(self, value: str) -> bool:
return value in self._valid_hashes
@pytest.mark.asyncio
async def test_cleanup_moves_empty_and_orphaned(tmp_path, monkeypatch):
service = ExampleImagesCleanupService()
previous_path = settings.get('example_images_path')
settings.settings['example_images_path'] = str(tmp_path)
try:
empty_folder = tmp_path / 'empty_folder'
empty_folder.mkdir()
orphan_hash = 'a' * 64
orphan_folder = tmp_path / orphan_hash
orphan_folder.mkdir()
(orphan_folder / 'image.png').write_text('data', encoding='utf-8')
valid_hash = 'b' * 64
valid_folder = tmp_path / valid_hash
valid_folder.mkdir()
(valid_folder / 'image.png').write_text('data', encoding='utf-8')
matching_scanner = StubScanner({valid_hash})
empty_scanner = StubScanner()
async def get_matching_scanner(*_args, **_kwargs):
return matching_scanner
async def get_empty_scanner(*_args, **_kwargs):
return empty_scanner
monkeypatch.setattr(ServiceRegistry, 'get_lora_scanner', get_matching_scanner)
monkeypatch.setattr(ServiceRegistry, 'get_checkpoint_scanner', get_empty_scanner)
monkeypatch.setattr(ServiceRegistry, 'get_embedding_scanner', get_empty_scanner)
result = await service.cleanup_example_image_folders()
deleted_bucket = Path(result['deleted_root'])
assert result['success'] is True
assert result['moved_total'] == 2
assert not empty_folder.exists()
assert not (deleted_bucket / 'empty_folder').exists()
assert (deleted_bucket / orphan_hash).exists()
assert not orphan_folder.exists()
assert valid_folder.exists()
finally:
if previous_path is None:
settings.settings.pop('example_images_path', None)
else:
settings.settings['example_images_path'] = previous_path
@pytest.mark.asyncio
async def test_cleanup_handles_missing_path(monkeypatch):
service = ExampleImagesCleanupService()
previous_path = settings.get('example_images_path')
settings.settings.pop('example_images_path', None)
try:
result = await service.cleanup_example_image_folders()
finally:
if previous_path is not None:
settings.settings['example_images_path'] = previous_path
assert result['success'] is False
assert result['error_code'] == 'path_not_configured'