Files
ComfyUI-Lora-Manager/py/utils/models.py
Will Miao ee466113d5 feat: implement batch import recipe functionality (frontend + backend fixes)
Backend fixes:
- Add missing API route for /api/lm/recipes/batch-import/progress (GET)
- Add missing API route for /api/lm/recipes/batch-import/directory (POST)
- Add missing API route for /api/lm/recipes/browse-directory (POST)
- Register WebSocket endpoint for batch import progress
- Fix skip_no_metadata default value (True -> False) to allow no-LoRA imports
- Add items array to BatchImportProgress.to_dict() for detailed results

Frontend implementation:
- Create BatchImportManager.js with complete batch import workflow
- Add directory browser UI for selecting folders
- Add batch import modal with URL list and directory input modes
- Implement real-time progress tracking (WebSocket + HTTP polling)
- Add results summary with success/failed/skipped statistics
- Add expandable details view showing individual item status
- Auto-refresh recipe list after import completion

UI improvements:
- Add spinner animation for importing status
- Simplify results summary UI to match progress stats styling
- Fix current item text alignment
- Fix dark theme styling for directory browser button
- Fix batch import button styling consistency

Translations:
- Add batch import related i18n keys to all locale files
- Run sync_translation_keys.py to sync all translations

Fixes:
- Batch import now allows images without LoRAs (matches single import behavior)
- Progress endpoint now returns complete items array with status details
- Results view correctly displays skipped items with error messages
2026-03-16 09:41:58 +08:00

286 lines
11 KiB
Python

from dataclasses import dataclass, asdict, field
from typing import Dict, Optional, List, Any
from datetime import datetime
import os
from .model_utils import determine_base_model
@dataclass
class BaseModelMetadata:
"""Base class for all model metadata structures"""
file_name: str # The filename without extension
model_name: str # The model's name defined by the creator
file_path: str # Full path to the model file
size: int # File size in bytes
modified: float # Timestamp when the model was added to the management system
sha256: str # SHA256 hash of the file
base_model: str # Base model type (SD1.5/SD2.1/SDXL/etc.)
preview_url: str # Preview image URL
preview_nsfw_level: int = 0 # NSFW level of the preview image
notes: str = "" # Additional notes
from_civitai: bool = True # Whether from Civitai
civitai: Dict[str, Any] = field(
default_factory=dict
) # Civitai API data if available
tags: List[str] = None # Model tags
modelDescription: str = "" # Full model description
civitai_deleted: bool = False # Whether deleted from Civitai
favorite: bool = False # Whether the model is a favorite
exclude: bool = False # Whether to exclude this model from the cache
db_checked: bool = False # Whether checked in archive DB
skip_metadata_refresh: bool = (
False # Whether to skip this model during bulk metadata refresh
)
metadata_source: Optional[str] = None # Last provider that supplied metadata
last_checked_at: float = 0 # Last checked timestamp
hash_status: str = "completed" # Hash calculation status: pending | calculating | completed | failed
_unknown_fields: Dict[str, Any] = field(
default_factory=dict, repr=False, compare=False
) # Store unknown fields
def __post_init__(self):
# Initialize empty lists to avoid mutable default parameter issue
if self.civitai is None:
self.civitai = {}
if self.tags is None:
self.tags = []
@classmethod
def from_dict(cls, data: Dict) -> "BaseModelMetadata":
"""Create instance from dictionary"""
data_copy = data.copy()
# Use cached fields if available, otherwise compute them
if not hasattr(cls, "_known_fields_cache"):
known_fields = set()
for c in cls.mro():
if hasattr(c, "__annotations__"):
known_fields.update(c.__annotations__.keys())
cls._known_fields_cache = known_fields
known_fields = cls._known_fields_cache
# Extract fields that match our class attributes
fields_to_use = {k: v for k, v in data_copy.items() if k in known_fields}
# Store unknown fields separately
unknown_fields = {
k: v
for k, v in data_copy.items()
if k not in known_fields and not k.startswith("_")
}
# Create instance with known fields
instance = cls(**fields_to_use)
# Add unknown fields as a separate attribute
instance._unknown_fields = unknown_fields
return instance
def to_dict(self) -> Dict:
"""Convert to dictionary for JSON serialization"""
result = asdict(self)
# Remove private fields
result = {k: v for k, v in result.items() if not k.startswith("_")}
# Add back unknown fields if they exist
if hasattr(self, "_unknown_fields"):
result.update(self._unknown_fields)
return result
def update_civitai_info(self, civitai_data: Dict) -> None:
"""Update Civitai information"""
self.civitai = civitai_data
def update_file_info(self, file_path: str, update_timestamps: bool = False) -> None:
"""
Update metadata with actual file information.
Args:
file_path: Path to the model file
update_timestamps: If True, update size and modified from filesystem.
If False (default), only update file_path and file_name.
Set to True only when file has been moved/relocated.
"""
if os.path.exists(file_path):
if update_timestamps:
# Only update size and modified when file has been relocated
self.size = os.path.getsize(file_path)
self.modified = os.path.getmtime(file_path)
# Always update paths when this method is called
self.file_path = file_path.replace(os.sep, "/")
self.file_name = os.path.splitext(os.path.basename(file_path))[0]
@staticmethod
def generate_unique_filename(
target_dir: str, base_name: str, extension: str, hash_provider: callable = None
) -> str:
"""Generate a unique filename to avoid conflicts
Args:
target_dir: Target directory path
base_name: Base filename without extension
extension: File extension including the dot
hash_provider: A callable that returns the SHA256 hash when needed
Returns:
str: Unique filename that doesn't conflict with existing files
"""
original_filename = f"{base_name}{extension}"
target_path = os.path.join(target_dir, original_filename)
# If no conflict, return original filename
if not os.path.exists(target_path):
return original_filename
# Only compute hash when needed
if hash_provider:
sha256_hash = hash_provider()
else:
sha256_hash = "0000"
# Generate short hash (first 4 characters of SHA256)
short_hash = sha256_hash[:4] if sha256_hash else "0000"
# Try with short hash suffix
unique_filename = f"{base_name}-{short_hash}{extension}"
unique_path = os.path.join(target_dir, unique_filename)
# If still conflicts, add incremental number
counter = 1
while os.path.exists(unique_path):
unique_filename = f"{base_name}-{short_hash}-{counter}{extension}"
unique_path = os.path.join(target_dir, unique_filename)
counter += 1
return unique_filename
@dataclass
class LoraMetadata(BaseModelMetadata):
"""Represents the metadata structure for a Lora model"""
usage_tips: str = "{}" # Usage tips for the model, json string
@classmethod
def from_civitai_info(
cls, version_info: Dict, file_info: Dict, save_path: str
) -> "LoraMetadata":
"""Create LoraMetadata instance from Civitai version info"""
file_name = file_info.get("name", "")
base_model = determine_base_model(version_info.get("baseModel", ""))
# Extract tags and description if available
tags = []
description = ""
model_data = version_info.get("model") or {}
if "tags" in model_data:
tags = model_data["tags"]
if "description" in model_data:
description = model_data["description"]
return cls(
file_name=os.path.splitext(file_name)[0],
model_name=model_data.get("name", os.path.splitext(file_name)[0]),
file_path=save_path.replace(os.sep, "/"),
size=file_info.get("sizeKB", 0) * 1024,
modified=datetime.now().timestamp(),
sha256=(file_info.get("hashes") or {}).get("SHA256", "").lower(),
base_model=base_model,
preview_url="", # Will be updated after preview download
preview_nsfw_level=0, # Will be updated after preview download
from_civitai=True,
civitai=version_info,
tags=tags,
modelDescription=description,
)
@dataclass
class CheckpointMetadata(BaseModelMetadata):
"""Represents the metadata structure for a Checkpoint model"""
sub_type: str = "checkpoint" # Model sub-type (checkpoint, diffusion_model, etc.)
@classmethod
def from_civitai_info(
cls, version_info: Dict, file_info: Dict, save_path: str
) -> "CheckpointMetadata":
"""Create CheckpointMetadata instance from Civitai version info"""
file_name = file_info.get("name", "")
base_model = determine_base_model(version_info.get("baseModel", ""))
sub_type = version_info.get("type", "checkpoint")
# Extract tags and description if available
tags = []
description = ""
model_data = version_info.get("model") or {}
if "tags" in model_data:
tags = model_data["tags"]
if "description" in model_data:
description = model_data["description"]
return cls(
file_name=os.path.splitext(file_name)[0],
model_name=model_data.get("name", os.path.splitext(file_name)[0]),
file_path=save_path.replace(os.sep, "/"),
size=file_info.get("sizeKB", 0) * 1024,
modified=datetime.now().timestamp(),
sha256=(file_info.get("hashes") or {}).get("SHA256", "").lower(),
base_model=base_model,
preview_url="", # Will be updated after preview download
preview_nsfw_level=0,
from_civitai=True,
civitai=version_info,
sub_type=sub_type,
tags=tags,
modelDescription=description,
)
@dataclass
class EmbeddingMetadata(BaseModelMetadata):
"""Represents the metadata structure for an Embedding model"""
sub_type: str = "embedding"
@classmethod
def from_civitai_info(
cls, version_info: Dict, file_info: Dict, save_path: str
) -> "EmbeddingMetadata":
"""Create EmbeddingMetadata instance from Civitai version info"""
file_name = file_info.get("name", "")
base_model = determine_base_model(version_info.get("baseModel", ""))
sub_type = version_info.get("type", "embedding")
# Extract tags and description if available
tags = []
description = ""
model_data = version_info.get("model") or {}
if "tags" in model_data:
tags = model_data["tags"]
if "description" in model_data:
description = model_data["description"]
return cls(
file_name=os.path.splitext(file_name)[0],
model_name=model_data.get("name", os.path.splitext(file_name)[0]),
file_path=save_path.replace(os.sep, "/"),
size=file_info.get("sizeKB", 0) * 1024,
modified=datetime.now().timestamp(),
sha256=(file_info.get("hashes") or {}).get("SHA256", "").lower(),
base_model=base_model,
preview_url="", # Will be updated after preview download
preview_nsfw_level=0,
from_civitai=True,
civitai=version_info,
sub_type=sub_type,
tags=tags,
modelDescription=description,
)