mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-22 05:32:12 -03:00
298 lines
10 KiB
Python
298 lines
10 KiB
Python
"""Service for cleaning up example image folders."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import shutil
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Dict, List, Tuple
|
|
|
|
from .service_registry import ServiceRegistry
|
|
from .settings_manager import get_settings_manager
|
|
from ..utils.example_images_paths import iter_library_roots
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class CleanupResult:
|
|
"""Structured result returned from cleanup operations."""
|
|
|
|
success: bool
|
|
checked_folders: int
|
|
moved_empty_folders: int
|
|
moved_orphaned_folders: int
|
|
skipped_non_hash: int
|
|
move_failures: int
|
|
errors: List[str]
|
|
deleted_root: str | None
|
|
partial_success: bool
|
|
|
|
def to_dict(self) -> Dict[str, object]:
|
|
"""Convert the dataclass to a serialisable dictionary."""
|
|
|
|
data = {
|
|
"success": self.success,
|
|
"checked_folders": self.checked_folders,
|
|
"moved_empty_folders": self.moved_empty_folders,
|
|
"moved_orphaned_folders": self.moved_orphaned_folders,
|
|
"moved_total": self.moved_empty_folders + self.moved_orphaned_folders,
|
|
"skipped_non_hash": self.skipped_non_hash,
|
|
"move_failures": self.move_failures,
|
|
"errors": self.errors,
|
|
"deleted_root": self.deleted_root,
|
|
"partial_success": self.partial_success,
|
|
}
|
|
|
|
return data
|
|
|
|
|
|
class ExampleImagesCleanupService:
|
|
"""Encapsulates logic for cleaning example image folders."""
|
|
|
|
DELETED_FOLDER_NAME = "_deleted"
|
|
|
|
def __init__(self, deleted_folder_name: str | None = None) -> None:
|
|
self._deleted_folder_name = deleted_folder_name or self.DELETED_FOLDER_NAME
|
|
|
|
async def cleanup_example_image_folders(self) -> Dict[str, object]:
|
|
"""Clean empty or orphaned example image folders by moving them under a deleted bucket."""
|
|
|
|
settings_manager = get_settings_manager()
|
|
example_images_path = settings_manager.get("example_images_path")
|
|
if not example_images_path:
|
|
logger.debug("Cleanup skipped: example images path not configured")
|
|
return {
|
|
"success": False,
|
|
"error": "Example images path is not configured.",
|
|
"error_code": "path_not_configured",
|
|
}
|
|
|
|
base_root = Path(example_images_path)
|
|
if not base_root.exists():
|
|
logger.debug("Cleanup skipped: example images path missing -> %s", base_root)
|
|
return {
|
|
"success": False,
|
|
"error": "Example images path does not exist.",
|
|
"error_code": "path_not_found",
|
|
}
|
|
|
|
try:
|
|
lora_scanner = await ServiceRegistry.get_lora_scanner()
|
|
checkpoint_scanner = await ServiceRegistry.get_checkpoint_scanner()
|
|
embedding_scanner = await ServiceRegistry.get_embedding_scanner()
|
|
except Exception as exc: # pragma: no cover - defensive guard
|
|
logger.error("Failed to acquire scanners for cleanup: %s", exc, exc_info=True)
|
|
return {
|
|
"success": False,
|
|
"error": f"Failed to load model scanners: {exc}",
|
|
"error_code": "scanner_initialization_failed",
|
|
}
|
|
|
|
checked_folders = 0
|
|
moved_empty = 0
|
|
moved_orphaned = 0
|
|
skipped_non_hash = 0
|
|
move_failures = 0
|
|
errors: List[str] = []
|
|
|
|
resolved_base = base_root.resolve()
|
|
library_paths: List[Tuple[str, Path]] = []
|
|
processed_paths = {resolved_base}
|
|
|
|
for library_name, library_path in iter_library_roots():
|
|
if not library_path:
|
|
continue
|
|
library_root = Path(library_path)
|
|
try:
|
|
resolved = library_root.resolve()
|
|
except FileNotFoundError:
|
|
continue
|
|
if resolved in processed_paths:
|
|
continue
|
|
if not library_root.exists():
|
|
logger.debug(
|
|
"Skipping cleanup for library '%s': folder missing (%s)",
|
|
library_name,
|
|
library_root,
|
|
)
|
|
continue
|
|
processed_paths.add(resolved)
|
|
library_paths.append((library_name, library_root))
|
|
|
|
deleted_roots: List[Path] = []
|
|
|
|
# Build list of (label, root) pairs including the base root for legacy layouts
|
|
cleanup_targets: List[Tuple[str, Path]] = [("__base__", base_root)] + library_paths
|
|
|
|
library_root_set = {root.resolve() for _, root in library_paths}
|
|
|
|
for label, root_path in cleanup_targets:
|
|
deleted_bucket = root_path / self._deleted_folder_name
|
|
deleted_bucket.mkdir(exist_ok=True)
|
|
deleted_roots.append(deleted_bucket)
|
|
|
|
for entry in os.scandir(root_path):
|
|
if not entry.is_dir(follow_symlinks=False):
|
|
continue
|
|
|
|
if entry.name == self._deleted_folder_name:
|
|
continue
|
|
|
|
entry_path = Path(entry.path)
|
|
|
|
if label == "__base__":
|
|
try:
|
|
resolved_entry = entry_path.resolve()
|
|
except FileNotFoundError:
|
|
continue
|
|
if resolved_entry in library_root_set:
|
|
# Skip library-specific folders tracked separately
|
|
continue
|
|
|
|
checked_folders += 1
|
|
|
|
try:
|
|
if self._is_folder_empty(entry_path):
|
|
if await self._remove_empty_folder(entry_path):
|
|
moved_empty += 1
|
|
else:
|
|
move_failures += 1
|
|
continue
|
|
|
|
if not self._is_hash_folder(entry.name):
|
|
skipped_non_hash += 1
|
|
continue
|
|
|
|
hash_exists = (
|
|
lora_scanner.has_hash(entry.name)
|
|
or checkpoint_scanner.has_hash(entry.name)
|
|
or embedding_scanner.has_hash(entry.name)
|
|
)
|
|
|
|
if not hash_exists:
|
|
if await self._move_folder(entry_path, deleted_bucket):
|
|
moved_orphaned += 1
|
|
else:
|
|
move_failures += 1
|
|
|
|
except Exception as exc: # pragma: no cover - filesystem guard
|
|
move_failures += 1
|
|
error_message = f"{entry.name}: {exc}"
|
|
errors.append(error_message)
|
|
logger.error(
|
|
"Error processing example images folder %s: %s",
|
|
entry_path,
|
|
exc,
|
|
exc_info=True,
|
|
)
|
|
|
|
partial_success = move_failures > 0 and (moved_empty > 0 or moved_orphaned > 0)
|
|
success = move_failures == 0 and not errors
|
|
|
|
result = CleanupResult(
|
|
success=success,
|
|
checked_folders=checked_folders,
|
|
moved_empty_folders=moved_empty,
|
|
moved_orphaned_folders=moved_orphaned,
|
|
skipped_non_hash=skipped_non_hash,
|
|
move_failures=move_failures,
|
|
errors=errors,
|
|
deleted_root=str(deleted_roots[0]) if deleted_roots else None,
|
|
partial_success=partial_success,
|
|
)
|
|
|
|
summary = result.to_dict()
|
|
summary["deleted_roots"] = [str(path) for path in deleted_roots]
|
|
if success:
|
|
logger.info(
|
|
"Example images cleanup complete: checked=%s, moved_empty=%s, moved_orphaned=%s",
|
|
checked_folders,
|
|
moved_empty,
|
|
moved_orphaned,
|
|
)
|
|
elif partial_success:
|
|
logger.warning(
|
|
"Example images cleanup partially complete: moved=%s, failures=%s",
|
|
summary["moved_total"],
|
|
move_failures,
|
|
)
|
|
else:
|
|
logger.error(
|
|
"Example images cleanup failed: move_failures=%s, errors=%s",
|
|
move_failures,
|
|
errors,
|
|
)
|
|
|
|
return summary
|
|
|
|
@staticmethod
|
|
def _is_folder_empty(folder_path: Path) -> bool:
|
|
try:
|
|
with os.scandir(folder_path) as iterator:
|
|
return not any(iterator)
|
|
except FileNotFoundError:
|
|
return True
|
|
except OSError as exc: # pragma: no cover - defensive guard
|
|
logger.debug("Failed to inspect folder %s: %s", folder_path, exc)
|
|
return False
|
|
|
|
@staticmethod
|
|
def _is_hash_folder(name: str) -> bool:
|
|
if len(name) != 64:
|
|
return False
|
|
hex_chars = set("0123456789abcdefABCDEF")
|
|
return all(char in hex_chars for char in name)
|
|
|
|
async def _remove_empty_folder(self, folder_path: Path) -> bool:
|
|
loop = asyncio.get_running_loop()
|
|
|
|
try:
|
|
await loop.run_in_executor(
|
|
None,
|
|
shutil.rmtree,
|
|
str(folder_path),
|
|
)
|
|
logger.debug("Removed empty example images folder %s", folder_path)
|
|
return True
|
|
except Exception as exc: # pragma: no cover - filesystem guard
|
|
logger.error("Failed to remove empty example images folder %s: %s", folder_path, exc, exc_info=True)
|
|
return False
|
|
|
|
async def _move_folder(self, folder_path: Path, deleted_bucket: Path) -> bool:
|
|
destination = self._build_destination(folder_path.name, deleted_bucket)
|
|
loop = asyncio.get_running_loop()
|
|
|
|
try:
|
|
await loop.run_in_executor(
|
|
None,
|
|
shutil.move,
|
|
str(folder_path),
|
|
str(destination),
|
|
)
|
|
logger.debug("Moved example images folder %s -> %s", folder_path, destination)
|
|
return True
|
|
except Exception as exc: # pragma: no cover - filesystem guard
|
|
logger.error(
|
|
"Failed to move example images folder %s to %s: %s",
|
|
folder_path,
|
|
destination,
|
|
exc,
|
|
exc_info=True,
|
|
)
|
|
return False
|
|
|
|
def _build_destination(self, folder_name: str, deleted_bucket: Path) -> Path:
|
|
destination = deleted_bucket / folder_name
|
|
suffix = 1
|
|
|
|
while destination.exists():
|
|
destination = deleted_bucket / f"{folder_name}_{suffix}"
|
|
suffix += 1
|
|
|
|
return destination
|