# Files
# ComfyUI-Lora-Manager/py/utils/metadata_manager.py
#
# 335 lines
# 13 KiB
# Python

from datetime import datetime
import os
import json
import logging
import time
from typing import Any, Dict, Optional, Type, Union
from .models import BaseModelMetadata, LoraMetadata
from .file_utils import normalize_path, find_preview_file, calculate_sha256
from .lora_metadata import extract_lora_metadata, extract_checkpoint_metadata
logger = logging.getLogger(__name__)
class MetadataManager:
    """
    Centralized manager for all metadata operations.

    This class is responsible for:
    1. Loading metadata safely with fallback mechanisms
    2. Saving metadata with atomic operations
    3. Creating default metadata for models
    4. Handling unknown fields gracefully

    Metadata is stored in a ``<model path without extension>.metadata.json``
    sidecar file. Every method is a ``@staticmethod``; the class holds no state.
    """

    @staticmethod
    async def load_metadata(
        file_path: str,
        model_class: Type[BaseModelMetadata] = LoraMetadata,
    ) -> tuple[Optional[BaseModelMetadata], bool]:
        """
        Load metadata safely.

        The sidecar is parsed with ``model_class.from_dict`` and its path
        fields are then reconciled with ``file_path``; if anything was stale,
        the corrected metadata is written back to disk as a side effect.

        Args:
            file_path: Path to the model file (not to the metadata file)
            model_class: Metadata class used to parse the JSON payload

        Returns:
            tuple: (metadata, should_skip)
            - metadata: BaseModelMetadata instance or None
            - should_skip: True if corrupted metadata file exists and model should be skipped
        """
        metadata_path = f"{os.path.splitext(file_path)[0]}.metadata.json"

        # No sidecar at all is not an error: the caller may create a default one.
        if not os.path.exists(metadata_path):
            return None, False

        try:
            with open(metadata_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            metadata = model_class.from_dict(data)
            await MetadataManager._normalize_metadata_paths(metadata, file_path)
            return metadata, False
        except Exception as e:
            # json.JSONDecodeError is an Exception subclass, so one handler
            # covers both cases; only the log label differs.
            error_type = "Invalid JSON" if isinstance(e, json.JSONDecodeError) else "Parse error"
            logger.error(f"{error_type} in metadata file: {metadata_path}. Error: {str(e)}. Skipping model to preserve existing data.")
            return None, True  # should_skip = True

    @staticmethod
    async def load_metadata_payload(file_path: str) -> Dict:
        """
        Load metadata and return it as a dictionary, including any unknown fields.

        The payload normally comes from ``load_metadata``. When that returns no
        object *and* did not flag the file as corrupted, the raw JSON file is
        read directly instead; ``file_path`` may then itself be a
        ``.metadata.json`` path. A file flagged as corrupted is not re-read.

        Args:
            file_path: Path to the model file or directly to the metadata file

        Returns:
            Dict: Metadata payload (possibly empty). ``file_path`` is filled in
            from the argument when the payload does not already carry one.
        """
        payload: Dict = {}
        metadata_obj, should_skip = await MetadataManager.load_metadata(file_path)

        if metadata_obj:
            payload = metadata_obj.to_dict()
            unknown_fields = getattr(metadata_obj, "_unknown_fields", None)
            if isinstance(unknown_fields, dict):
                payload.update(unknown_fields)
        elif not should_skip:
            metadata_path = (
                file_path
                if file_path.endswith(".metadata.json")
                else f"{os.path.splitext(file_path)[0]}.metadata.json"
            )
            if os.path.exists(metadata_path):
                try:
                    with open(metadata_path, "r", encoding="utf-8") as handle:
                        raw = json.load(handle)
                    # Only a JSON object is a usable payload; anything else is ignored.
                    if isinstance(raw, dict):
                        payload = raw
                except json.JSONDecodeError:
                    logger.warning(
                        "Failed to parse metadata file %s while loading payload",
                        metadata_path,
                    )
                except Exception as exc:  # pragma: no cover - defensive logging
                    logger.warning("Failed to read metadata file %s: %s", metadata_path, exc)

        # Defensive: to_dict() is project code, so make sure we really hold a dict.
        if not isinstance(payload, dict):
            payload = {}

        if file_path:
            payload.setdefault("file_path", normalize_path(file_path))

        return payload

    @staticmethod
    async def hydrate_model_data(model_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Replace the provided model data with the authoritative payload from disk.
        Preserves the cached folder entry if present.

        Args:
            model_data: Cached model entry; must contain ``file_path`` to be hydrated

        Returns:
            Dict: The same ``model_data`` object, mutated in place. It is
            returned unchanged when it has no ``file_path``.
        """
        file_path = model_data.get("file_path")
        if not file_path:
            return model_data

        folder = model_data.get("folder")
        payload = await MetadataManager.load_metadata_payload(file_path)
        if folder is not None:
            payload["folder"] = folder

        # Mutate in place so existing references to the dict see the new data.
        model_data.clear()
        model_data.update(payload)
        return model_data

    @staticmethod
    async def save_metadata(path: str, metadata: Union[BaseModelMetadata, Dict]) -> bool:
        """
        Save metadata with atomic write operations.

        The JSON is written to ``<metadata_path>.tmp`` first and then moved
        over the target with ``os.replace``. Failures are logged and reported
        through the return value; no exception is raised.

        Args:
            path: Path to the model file or directly to the metadata file
            metadata: Metadata to save (either BaseModelMetadata object or dict)

        Returns:
            bool: Success or failure
        """
        # Determine if the input is a metadata path or a model file path
        if path.endswith('.metadata.json'):
            metadata_path = path
        else:
            metadata_path = f"{os.path.splitext(path)[0]}.metadata.json"
        temp_path = f"{metadata_path}.tmp"

        try:
            # Convert to dict if needed
            if isinstance(metadata, BaseModelMetadata):
                metadata_dict = metadata.to_dict()
                # Preserve unknown fields if present. Guarded the same way as
                # load_metadata_payload so a missing/None/non-dict attribute
                # does not abort the whole save.
                unknown_fields = getattr(metadata, '_unknown_fields', None)
                if isinstance(unknown_fields, dict):
                    metadata_dict.update(unknown_fields)
            else:
                # Copy so the caller's dict is not modified by the path normalization below.
                metadata_dict = metadata.copy()

            # Normalize paths
            if 'file_path' in metadata_dict:
                metadata_dict['file_path'] = normalize_path(metadata_dict['file_path'])
            if 'preview_url' in metadata_dict:
                metadata_dict['preview_url'] = normalize_path(metadata_dict['preview_url'])

            # Write to temporary file first
            with open(temp_path, 'w', encoding='utf-8') as f:
                json.dump(metadata_dict, f, indent=2, ensure_ascii=False)

            # Atomic rename operation
            os.replace(temp_path, metadata_path)
            return True
        except Exception as e:
            logger.error(f"Error saving metadata to {metadata_path}: {str(e)}")
            # Best-effort cleanup of the temporary file. Only filesystem errors
            # are tolerated here; a bare ``except`` would also swallow
            # KeyboardInterrupt/SystemExit and task cancellation.
            if os.path.exists(temp_path):
                try:
                    os.remove(temp_path)
                except OSError as cleanup_error:
                    logger.debug("Could not remove temporary file %s: %s", temp_path, cleanup_error)
            return False

    @staticmethod
    async def create_default_metadata(
        file_path: str,
        model_class: Type[BaseModelMetadata] = LoraMetadata,
    ) -> Optional[BaseModelMetadata]:
        """
        Create basic metadata structure for a model file.

        This replaces the old get_file_info function with a more appropriately named method.
        The new metadata is also written to disk via ``save_metadata``.

        Args:
            file_path: Path to the model file
            model_class: Class to instantiate

        Returns:
            BaseModelMetadata instance, or None if the file doesn't exist or
            any step of the creation raised (the error is logged)
        """
        # First check if file actually exists and resolve symlinks
        try:
            real_path = os.path.realpath(file_path)
            if not os.path.exists(real_path):
                return None
        except Exception as e:
            logger.error(f"Error checking file existence for {file_path}: {e}")
            return None

        try:
            base_name = os.path.splitext(os.path.basename(file_path))[0]
            dir_path = os.path.dirname(file_path)

            # Find preview image
            preview_url = find_preview_file(base_name, dir_path)

            # Calculate file hash (on the resolved path) and report how long it took
            start_hash_time = time.perf_counter()
            logger.debug(f"Calculating SHA256 hash for {real_path}...")
            sha256 = await calculate_sha256(real_path)
            hash_duration = time.perf_counter() - start_hash_time
            logger.info(f"SHA256 hash calculated for {real_path} in {hash_duration:.3f}s")

            # Fields shared by every supported metadata class.
            fields: Dict[str, Any] = {
                "file_name": base_name,
                "model_name": base_name,
                "file_path": normalize_path(file_path),
                "size": os.path.getsize(real_path),
                "modified": datetime.now().timestamp(),
                "sha256": sha256,
                "base_model": "Unknown",
                "preview_url": normalize_path(preview_url),
                "tags": [],
                "modelDescription": "",
                "from_civitai": True,
            }

            # Class-specific extras. Classes are matched by name, as before;
            # anything that is neither a checkpoint nor an embedding is
            # treated as a LoRA.
            class_name = model_class.__name__
            if class_name == "CheckpointMetadata":
                fields["model_type"] = "checkpoint"
            elif class_name != "EmbeddingMetadata":
                fields["usage_tips"] = "{}"

            metadata = model_class(**fields)

            # NOTE: model-specific enrichment (_enrich_metadata) is currently
            # not applied when creating default metadata.

            # Save the created metadata
            logger.info(f"Creating new .metadata.json for {file_path} (Reason: No existing metadata found)")
            await MetadataManager.save_metadata(file_path, metadata)
            return metadata
        except Exception as e:
            logger.error(f"Error creating default metadata for {file_path}: {e}")
            return None

    @staticmethod
    async def _enrich_metadata(metadata: BaseModelMetadata, file_path: str) -> None:
        """
        Enrich metadata with model-specific information

        Only ``LoraMetadata`` instances are handled: their ``base_model`` is
        taken from ``extract_lora_metadata``. Errors are logged, not raised.

        Args:
            metadata: Metadata to enrich
            file_path: Path to the model file
        """
        try:
            if metadata.__class__.__name__ == "LoraMetadata":
                model_info = await extract_lora_metadata(file_path)
                metadata.base_model = model_info['base_model']
            # NOTE: checkpoint enrichment (extract_checkpoint_metadata) is not enabled.
        except Exception as e:
            logger.error(f"Error enriching metadata: {str(e)}")

    @staticmethod
    async def _normalize_metadata_paths(metadata: BaseModelMetadata, file_path: str) -> None:
        """
        Normalize paths in metadata object

        Brings ``file_name``, ``file_path`` and ``preview_url`` in line with
        the model's current location and saves the metadata back to disk, but
        only when at least one of those values actually changed.

        Args:
            metadata: Metadata object to update
            file_path: Current file path for the model
        """
        need_update = False
        base_name = os.path.splitext(os.path.basename(file_path))[0]
        normalized_file_path = normalize_path(file_path)

        # Check if file_name matches the actual file name
        if metadata.file_name != base_name:
            metadata.file_name = base_name
            need_update = True

        # Check if file path is different from what's in metadata
        if normalized_file_path != metadata.file_path:
            metadata.file_path = normalized_file_path
            need_update = True

        # Check if preview exists at the current location
        preview_url = metadata.preview_url
        if preview_url:
            # Compare directories in normalized form on both sides. The stored
            # preview_url went through normalize_path when it was saved, while
            # file_path is whatever the caller passed, so comparing the raw
            # strings could report "different directory" for the same folder
            # (presumably a path-separator difference - confirm against
            # file_utils.normalize_path).
            file_dir = os.path.dirname(normalized_file_path)
            preview_dir = os.path.dirname(normalize_path(preview_url))

            # Update preview if it doesn't exist OR if model and preview are in different directories
            if not os.path.exists(preview_url) or file_dir != preview_dir:
                new_preview_url = find_preview_file(base_name, os.path.dirname(file_path))
                if new_preview_url:
                    normalized_preview = normalize_path(new_preview_url)
                    # Only a real change warrants rewriting the metadata file;
                    # re-finding the same preview must not trigger a save.
                    if normalized_preview != preview_url:
                        metadata.preview_url = normalized_preview
                        need_update = True

        # If path attributes were changed, save the metadata back to disk
        if need_update:
            await MetadataManager.save_metadata(file_path, metadata)