mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-22 13:42:12 -03:00
- Change `model_type` field to `sub_type` for checkpoint models to improve naming consistency
- Add `sub_type="embedding"` for embedding models to properly categorize model subtypes
- Maintain backward compatibility with existing metadata structure
336 lines
13 KiB
Python
336 lines
13 KiB
Python
from datetime import datetime
|
|
import os
|
|
import json
|
|
import logging
|
|
import time
|
|
from typing import Any, Dict, Optional, Type, Union
|
|
|
|
from .models import BaseModelMetadata, LoraMetadata
|
|
from .file_utils import normalize_path, find_preview_file, calculate_sha256
|
|
from .lora_metadata import extract_lora_metadata, extract_checkpoint_metadata
|
|
|
|
# Module-level logger namespaced to this module (standard logging convention).
logger = logging.getLogger(__name__)
|
|
|
|
class MetadataManager:
    """
    Centralized manager for all metadata operations.

    This class is responsible for:
    1. Loading metadata safely with fallback mechanisms
    2. Saving metadata with atomic operations
    3. Creating default metadata for models
    4. Handling unknown fields gracefully

    Metadata lives in a ``<model name>.metadata.json`` sidecar file next to
    each model file. All methods are async static methods; the class itself
    carries no state.
    """
@staticmethod
|
|
async def load_metadata(file_path: str, model_class: Type[BaseModelMetadata] = LoraMetadata) -> tuple[Optional[BaseModelMetadata], bool]:
|
|
"""
|
|
Load metadata safely.
|
|
|
|
Returns:
|
|
tuple: (metadata, should_skip)
|
|
- metadata: BaseModelMetadata instance or None
|
|
- should_skip: True if corrupted metadata file exists and model should be skipped
|
|
"""
|
|
metadata_path = f"{os.path.splitext(file_path)[0]}.metadata.json"
|
|
|
|
# Check if metadata file exists
|
|
if not os.path.exists(metadata_path):
|
|
return None, False
|
|
|
|
try:
|
|
with open(metadata_path, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
|
|
# Create model instance
|
|
metadata = model_class.from_dict(data)
|
|
|
|
# Normalize paths
|
|
await MetadataManager._normalize_metadata_paths(metadata, file_path)
|
|
|
|
return metadata, False
|
|
|
|
except (json.JSONDecodeError, Exception) as e:
|
|
error_type = "Invalid JSON" if isinstance(e, json.JSONDecodeError) else "Parse error"
|
|
logger.error(f"{error_type} in metadata file: {metadata_path}. Error: {str(e)}. Skipping model to preserve existing data.")
|
|
return None, True # should_skip = True
|
|
|
|
@staticmethod
|
|
async def load_metadata_payload(file_path: str) -> Dict:
|
|
"""
|
|
Load metadata and return it as a dictionary, including any unknown fields.
|
|
Falls back to reading the raw JSON file if parsing into a model class fails.
|
|
"""
|
|
|
|
payload: Dict = {}
|
|
metadata_obj, should_skip = await MetadataManager.load_metadata(file_path)
|
|
|
|
if metadata_obj:
|
|
payload = metadata_obj.to_dict()
|
|
unknown_fields = getattr(metadata_obj, "_unknown_fields", None)
|
|
if isinstance(unknown_fields, dict):
|
|
payload.update(unknown_fields)
|
|
else:
|
|
if not should_skip:
|
|
metadata_path = (
|
|
file_path
|
|
if file_path.endswith(".metadata.json")
|
|
else f"{os.path.splitext(file_path)[0]}.metadata.json"
|
|
)
|
|
if os.path.exists(metadata_path):
|
|
try:
|
|
with open(metadata_path, "r", encoding="utf-8") as handle:
|
|
raw = json.load(handle)
|
|
if isinstance(raw, dict):
|
|
payload = raw
|
|
except json.JSONDecodeError:
|
|
logger.warning(
|
|
"Failed to parse metadata file %s while loading payload",
|
|
metadata_path,
|
|
)
|
|
except Exception as exc: # pragma: no cover - defensive logging
|
|
logger.warning("Failed to read metadata file %s: %s", metadata_path, exc)
|
|
|
|
if not isinstance(payload, dict):
|
|
payload = {}
|
|
|
|
if file_path:
|
|
payload.setdefault("file_path", normalize_path(file_path))
|
|
|
|
return payload
|
|
|
|
@staticmethod
|
|
async def hydrate_model_data(model_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Replace the provided model data with the authoritative payload from disk.
|
|
Preserves the cached folder entry if present.
|
|
"""
|
|
|
|
file_path = model_data.get("file_path")
|
|
if not file_path:
|
|
return model_data
|
|
|
|
folder = model_data.get("folder")
|
|
payload = await MetadataManager.load_metadata_payload(file_path)
|
|
if folder is not None:
|
|
payload["folder"] = folder
|
|
|
|
model_data.clear()
|
|
model_data.update(payload)
|
|
return model_data
|
|
|
|
@staticmethod
|
|
async def save_metadata(path: str, metadata: Union[BaseModelMetadata, Dict]) -> bool:
|
|
"""
|
|
Save metadata with atomic write operations.
|
|
|
|
Args:
|
|
path: Path to the model file or directly to the metadata file
|
|
metadata: Metadata to save (either BaseModelMetadata object or dict)
|
|
|
|
Returns:
|
|
bool: Success or failure
|
|
"""
|
|
# Determine if the input is a metadata path or a model file path
|
|
if path.endswith('.metadata.json'):
|
|
metadata_path = path
|
|
else:
|
|
# Use existing logic for model file paths
|
|
file_path = path
|
|
metadata_path = f"{os.path.splitext(file_path)[0]}.metadata.json"
|
|
temp_path = f"{metadata_path}.tmp"
|
|
|
|
try:
|
|
# Convert to dict if needed
|
|
if isinstance(metadata, BaseModelMetadata):
|
|
metadata_dict = metadata.to_dict()
|
|
# Preserve unknown fields if present
|
|
if hasattr(metadata, '_unknown_fields'):
|
|
metadata_dict.update(metadata._unknown_fields)
|
|
else:
|
|
metadata_dict = metadata.copy()
|
|
|
|
# Normalize paths
|
|
if 'file_path' in metadata_dict:
|
|
metadata_dict['file_path'] = normalize_path(metadata_dict['file_path'])
|
|
if 'preview_url' in metadata_dict:
|
|
metadata_dict['preview_url'] = normalize_path(metadata_dict['preview_url'])
|
|
|
|
# Write to temporary file first
|
|
with open(temp_path, 'w', encoding='utf-8') as f:
|
|
json.dump(metadata_dict, f, indent=2, ensure_ascii=False)
|
|
|
|
# Atomic rename operation
|
|
os.replace(temp_path, metadata_path)
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error saving metadata to {metadata_path}: {str(e)}")
|
|
# Clean up temporary file if it exists
|
|
if os.path.exists(temp_path):
|
|
try:
|
|
os.remove(temp_path)
|
|
except:
|
|
pass
|
|
return False
|
|
|
|
@staticmethod
|
|
async def create_default_metadata(file_path: str, model_class: Type[BaseModelMetadata] = LoraMetadata) -> Optional[BaseModelMetadata]:
|
|
"""
|
|
Create basic metadata structure for a model file.
|
|
This replaces the old get_file_info function with a more appropriately named method.
|
|
|
|
Args:
|
|
file_path: Path to the model file
|
|
model_class: Class to instantiate
|
|
|
|
Returns:
|
|
BaseModelMetadata instance or None if file doesn't exist
|
|
"""
|
|
# First check if file actually exists and resolve symlinks
|
|
try:
|
|
real_path = os.path.realpath(file_path)
|
|
if not os.path.exists(real_path):
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error checking file existence for {file_path}: {e}")
|
|
return None
|
|
|
|
try:
|
|
base_name = os.path.splitext(os.path.basename(file_path))[0]
|
|
dir_path = os.path.dirname(file_path)
|
|
|
|
# Find preview image
|
|
preview_url = find_preview_file(base_name, dir_path)
|
|
|
|
# Calculate file hash
|
|
start_hash_time = time.perf_counter()
|
|
logger.debug(f"Calculating SHA256 hash for {real_path}...")
|
|
sha256 = await calculate_sha256(real_path)
|
|
hash_duration = time.perf_counter() - start_hash_time
|
|
logger.info(f"SHA256 hash calculated for {real_path} in {hash_duration:.3f}s")
|
|
|
|
# Create instance based on model type
|
|
if model_class.__name__ == "CheckpointMetadata":
|
|
metadata = model_class(
|
|
file_name=base_name,
|
|
model_name=base_name,
|
|
file_path=normalize_path(file_path),
|
|
size=os.path.getsize(real_path),
|
|
modified=datetime.now().timestamp(),
|
|
sha256=sha256,
|
|
base_model="Unknown",
|
|
preview_url=normalize_path(preview_url),
|
|
tags=[],
|
|
modelDescription="",
|
|
sub_type="checkpoint",
|
|
from_civitai=True
|
|
)
|
|
elif model_class.__name__ == "EmbeddingMetadata":
|
|
metadata = model_class(
|
|
file_name=base_name,
|
|
model_name=base_name,
|
|
file_path=normalize_path(file_path),
|
|
size=os.path.getsize(real_path),
|
|
modified=datetime.now().timestamp(),
|
|
sha256=sha256,
|
|
base_model="Unknown",
|
|
preview_url=normalize_path(preview_url),
|
|
tags=[],
|
|
modelDescription="",
|
|
sub_type="embedding",
|
|
from_civitai=True
|
|
)
|
|
else: # Default to LoraMetadata
|
|
metadata = model_class(
|
|
file_name=base_name,
|
|
model_name=base_name,
|
|
file_path=normalize_path(file_path),
|
|
size=os.path.getsize(real_path),
|
|
modified=datetime.now().timestamp(),
|
|
sha256=sha256,
|
|
base_model="Unknown",
|
|
preview_url=normalize_path(preview_url),
|
|
tags=[],
|
|
modelDescription="",
|
|
from_civitai=True,
|
|
usage_tips="{}"
|
|
)
|
|
|
|
# Try to extract model-specific metadata
|
|
# await MetadataManager._enrich_metadata(metadata, real_path)
|
|
|
|
# Save the created metadata
|
|
logger.info(f"Creating new .metadata.json for {file_path} (Reason: No existing metadata found)")
|
|
await MetadataManager.save_metadata(file_path, metadata)
|
|
|
|
return metadata
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating default metadata for {file_path}: {e}")
|
|
return None
|
|
|
|
@staticmethod
|
|
async def _enrich_metadata(metadata: BaseModelMetadata, file_path: str) -> None:
|
|
"""
|
|
Enrich metadata with model-specific information
|
|
|
|
Args:
|
|
metadata: Metadata to enrich
|
|
file_path: Path to the model file
|
|
"""
|
|
try:
|
|
if metadata.__class__.__name__ == "LoraMetadata":
|
|
model_info = await extract_lora_metadata(file_path)
|
|
metadata.base_model = model_info['base_model']
|
|
|
|
# elif metadata.__class__.__name__ == "CheckpointMetadata":
|
|
# model_info = await extract_checkpoint_metadata(file_path)
|
|
# metadata.base_model = model_info['base_model']
|
|
# if 'model_type' in model_info:
|
|
# metadata.model_type = model_info['model_type']
|
|
except Exception as e:
|
|
logger.error(f"Error enriching metadata: {str(e)}")
|
|
|
|
@staticmethod
|
|
async def _normalize_metadata_paths(metadata: BaseModelMetadata, file_path: str) -> None:
|
|
"""
|
|
Normalize paths in metadata object
|
|
|
|
Args:
|
|
metadata: Metadata object to update
|
|
file_path: Current file path for the model
|
|
"""
|
|
need_update = False
|
|
|
|
# Check if file_name matches the actual file name
|
|
base_name = os.path.splitext(os.path.basename(file_path))[0]
|
|
if metadata.file_name != base_name:
|
|
metadata.file_name = base_name
|
|
need_update = True
|
|
|
|
# Check if file path is different from what's in metadata
|
|
if normalize_path(file_path) != metadata.file_path:
|
|
metadata.file_path = normalize_path(file_path)
|
|
need_update = True
|
|
|
|
# Check if preview exists at the current location
|
|
preview_url = metadata.preview_url
|
|
if preview_url:
|
|
# Get directory parts of both paths
|
|
file_dir = os.path.dirname(file_path)
|
|
preview_dir = os.path.dirname(preview_url)
|
|
|
|
# Update preview if it doesn't exist OR if model and preview are in different directories
|
|
if not os.path.exists(preview_url) or file_dir != preview_dir:
|
|
base_name = os.path.splitext(os.path.basename(file_path))[0]
|
|
dir_path = os.path.dirname(file_path)
|
|
new_preview_url = find_preview_file(base_name, dir_path)
|
|
if new_preview_url:
|
|
metadata.preview_url = normalize_path(new_preview_url)
|
|
need_update = True
|
|
|
|
# If path attributes were changed, save the metadata back to disk
|
|
if need_update:
|
|
await MetadataManager.save_metadata(file_path, metadata)
|