mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-21 21:22:11 -03:00
Refactor metadata handling by introducing MetadataManager for centralized operations and improving error handling
This commit is contained in:
@@ -14,6 +14,7 @@ import asyncio
|
||||
from .update_routes import UpdateRoutes
|
||||
from ..utils.constants import PREVIEW_EXTENSIONS, CARD_PREVIEW_WIDTH, VALID_LORA_TYPES
|
||||
from ..utils.exif_utils import ExifUtils
|
||||
from ..utils.metadata_manager import MetadataManager
|
||||
from ..services.service_registry import ServiceRegistry
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -289,22 +290,6 @@ class ApiRoutes:
|
||||
|
||||
return preview_path
|
||||
|
||||
async def _update_preview_metadata(self, model_path: str, preview_path: str):
|
||||
"""Update preview path in metadata"""
|
||||
metadata_path = os.path.splitext(model_path)[0] + '.metadata.json'
|
||||
if os.path.exists(metadata_path):
|
||||
try:
|
||||
with open(metadata_path, 'r', encoding='utf-8') as f:
|
||||
metadata = json.load(f)
|
||||
|
||||
# Update preview_url directly in the metadata dict
|
||||
metadata['preview_url'] = preview_path
|
||||
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata, f, indent=2, ensure_ascii=False)
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating metadata: {e}")
|
||||
|
||||
async def fetch_all_civitai(self, request: web.Request) -> web.Response:
|
||||
"""Fetch CivitAI metadata for all loras in the background"""
|
||||
try:
|
||||
@@ -640,8 +625,7 @@ class ApiRoutes:
|
||||
metadata[key] = value
|
||||
|
||||
# Save updated metadata
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata, f, indent=2, ensure_ascii=False)
|
||||
await MetadataManager.save_metadata(file_path, metadata)
|
||||
|
||||
# Update cache
|
||||
await self.scanner.update_single_model_cache(file_path, file_path, metadata)
|
||||
@@ -854,9 +838,7 @@ class ApiRoutes:
|
||||
metadata['tags'] = tags
|
||||
metadata['creator'] = creator
|
||||
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata, f, indent=2, ensure_ascii=False)
|
||||
logger.info(f"Saved model metadata to file for {file_path}")
|
||||
await MetadataManager.save_metadata(file_path, metadata)
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving model metadata: {e}")
|
||||
|
||||
@@ -972,6 +954,7 @@ class ApiRoutes:
|
||||
patterns = [
|
||||
f"{old_file_name}.safetensors", # Required
|
||||
f"{old_file_name}.metadata.json",
|
||||
f"{old_file_name}.metadata.json.bak",
|
||||
]
|
||||
|
||||
# Add all preview file extensions
|
||||
@@ -1027,8 +1010,7 @@ class ApiRoutes:
|
||||
metadata['preview_url'] = new_preview
|
||||
|
||||
# Save updated metadata
|
||||
with open(new_metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata, f, indent=2, ensure_ascii=False)
|
||||
await MetadataManager.save_metadata(new_file_path, metadata)
|
||||
|
||||
# Update the scanner cache
|
||||
if metadata:
|
||||
|
||||
@@ -7,6 +7,7 @@ import asyncio
|
||||
|
||||
from ..utils.routes_common import ModelRouteUtils
|
||||
from ..utils.constants import NSFW_LEVELS
|
||||
from ..utils.metadata_manager import MetadataManager
|
||||
from ..services.websocket_manager import ws_manager
|
||||
from ..services.service_registry import ServiceRegistry
|
||||
from ..config import config
|
||||
@@ -650,8 +651,7 @@ class CheckpointsRoutes:
|
||||
metadata.update(metadata_updates)
|
||||
|
||||
# Save updated metadata
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata, f, indent=2, ensure_ascii=False)
|
||||
await MetadataManager.save_metadata(file_path, metadata)
|
||||
|
||||
# Update cache
|
||||
await self.scanner.update_single_model_cache(file_path, file_path, metadata)
|
||||
|
||||
@@ -13,6 +13,7 @@ from ..services.settings_manager import settings
|
||||
from ..services.service_registry import ServiceRegistry
|
||||
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
|
||||
from ..utils.routes_common import ModelRouteUtils
|
||||
from ..utils.metadata_manager import MetadataManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -446,8 +447,7 @@ class ExampleImagesRoutes:
|
||||
model_copy.pop('folder', None)
|
||||
|
||||
# Write the metadata to file without the folder field
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(model_copy, f, indent=2, ensure_ascii=False)
|
||||
await MetadataManager.save_metadata(file_path, model_copy)
|
||||
logger.info(f"Saved metadata to {metadata_path}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save metadata to {metadata_path}: {str(e)}")
|
||||
@@ -1231,8 +1231,7 @@ class ExampleImagesRoutes:
|
||||
model_copy.pop('folder', None)
|
||||
|
||||
# Write the metadata to file
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(model_copy, f, indent=2, ensure_ascii=False)
|
||||
await MetadataManager.save_metadata(file_path, model_copy)
|
||||
logger.info(f"Saved metadata to {metadata_path}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save metadata to {metadata_path}: {str(e)}")
|
||||
|
||||
@@ -6,6 +6,7 @@ from typing import Dict
|
||||
from ..utils.models import LoraMetadata, CheckpointMetadata
|
||||
from ..utils.constants import CARD_PREVIEW_WIDTH
|
||||
from ..utils.exif_utils import ExifUtils
|
||||
from ..utils.metadata_manager import MetadataManager
|
||||
from .service_registry import ServiceRegistry
|
||||
|
||||
# Download to temporary file first
|
||||
@@ -198,8 +199,6 @@ class DownloadManager:
|
||||
if await civitai_client.download_preview_image(images[0]['url'], preview_path):
|
||||
metadata.preview_url = preview_path.replace(os.sep, '/')
|
||||
metadata.preview_nsfw_level = images[0].get('nsfwLevel', 0)
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata.to_dict(), f, indent=2, ensure_ascii=False)
|
||||
else:
|
||||
# For images, use WebP format for better performance
|
||||
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file:
|
||||
@@ -226,8 +225,6 @@ class DownloadManager:
|
||||
# Update metadata
|
||||
metadata.preview_url = preview_path.replace(os.sep, '/')
|
||||
metadata.preview_nsfw_level = images[0].get('nsfwLevel', 0)
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata.to_dict(), f, indent=2, ensure_ascii=False)
|
||||
|
||||
# Remove temporary file
|
||||
try:
|
||||
@@ -258,8 +255,7 @@ class DownloadManager:
|
||||
metadata.update_file_info(save_path)
|
||||
|
||||
# 5. Final metadata update
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata.to_dict(), f, indent=2, ensure_ascii=False)
|
||||
await MetadataManager.save_metadata(save_path, metadata)
|
||||
|
||||
# 6. Update cache based on model type
|
||||
if model_type == "checkpoint":
|
||||
|
||||
@@ -9,7 +9,8 @@ import msgpack # Add MessagePack import for efficient serialization
|
||||
|
||||
from ..utils.models import BaseModelMetadata
|
||||
from ..config import config
|
||||
from ..utils.file_utils import load_metadata, get_file_info, find_preview_file, save_metadata
|
||||
from ..utils.file_utils import find_preview_file
|
||||
from ..utils.metadata_manager import MetadataManager
|
||||
from .model_cache import ModelCache
|
||||
from .model_hash_index import ModelHashIndex
|
||||
from ..utils.constants import PREVIEW_EXTENSIONS
|
||||
@@ -752,9 +753,9 @@ class ModelScanner:
|
||||
"""Get model root directories"""
|
||||
raise NotImplementedError("Subclasses must implement get_model_roots")
|
||||
|
||||
async def _get_file_info(self, file_path: str) -> Optional[BaseModelMetadata]:
|
||||
async def _create_default_metadata(self, file_path: str) -> Optional[BaseModelMetadata]:
|
||||
"""Get model file info and metadata (extensible for different model types)"""
|
||||
return await get_file_info(file_path, self.model_class)
|
||||
return await MetadataManager.create_default_metadata(file_path, self.model_class)
|
||||
|
||||
def _calculate_folder(self, file_path: str) -> str:
|
||||
"""Calculate the folder path for a model file"""
|
||||
@@ -767,7 +768,7 @@ class ModelScanner:
|
||||
# Common methods shared between scanners
|
||||
async def _process_model_file(self, file_path: str, root_path: str) -> Dict:
|
||||
"""Process a single model file and return its metadata"""
|
||||
metadata = await load_metadata(file_path, self.model_class)
|
||||
metadata = await MetadataManager.load_metadata(file_path, self.model_class)
|
||||
|
||||
if metadata is None:
|
||||
civitai_info_path = f"{os.path.splitext(file_path)[0]}.civitai.info"
|
||||
@@ -783,7 +784,7 @@ class ModelScanner:
|
||||
|
||||
metadata = self.model_class.from_civitai_info(version_info, file_info, file_path)
|
||||
metadata.preview_url = find_preview_file(file_name, os.path.dirname(file_path))
|
||||
await save_metadata(file_path, metadata)
|
||||
await MetadataManager.save_metadata(file_path, metadata)
|
||||
logger.debug(f"Created metadata from .civitai.info for {file_path}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating metadata from .civitai.info for {file_path}: {e}")
|
||||
@@ -810,13 +811,13 @@ class ModelScanner:
|
||||
metadata.modelDescription = version_info['model']['description']
|
||||
|
||||
# Save the updated metadata
|
||||
await save_metadata(file_path, metadata)
|
||||
await MetadataManager.save_metadata(file_path, metadata)
|
||||
logger.debug(f"Updated metadata with civitai info for {file_path}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error restoring civitai data from .civitai.info for {file_path}: {e}")
|
||||
|
||||
if metadata is None:
|
||||
metadata = await self._get_file_info(file_path)
|
||||
metadata = await self._create_default_metadata(file_path)
|
||||
|
||||
model_data = metadata.to_dict()
|
||||
|
||||
@@ -866,9 +867,7 @@ class ModelScanner:
|
||||
logger.warning(f"Model {model_id} appears to be deleted from Civitai (404 response)")
|
||||
model_data['civitai_deleted'] = True
|
||||
|
||||
metadata_path = os.path.splitext(file_path)[0] + '.metadata.json'
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(model_data, f, indent=2, ensure_ascii=False)
|
||||
await MetadataManager.save_metadata(file_path, model_data)
|
||||
|
||||
elif model_metadata:
|
||||
logger.debug(f"Updating metadata for {file_path} with model ID {model_id}")
|
||||
@@ -881,9 +880,7 @@ class ModelScanner:
|
||||
|
||||
model_data['civitai']['creator'] = model_metadata['creator']
|
||||
|
||||
metadata_path = os.path.splitext(file_path)[0] + '.metadata.json'
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(model_data, f, indent=2, ensure_ascii=False)
|
||||
await MetadataManager.save_metadata(file_path, model_data)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to update metadata from Civitai for {file_path}: {e}")
|
||||
|
||||
@@ -1049,8 +1046,7 @@ class ModelScanner:
|
||||
new_preview_path = os.path.join(preview_dir, f"{preview_name}{preview_ext}")
|
||||
metadata['preview_url'] = new_preview_path.replace(os.sep, '/')
|
||||
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata, f, indent=2, ensure_ascii=False)
|
||||
await MetadataManager.save_metadata(metadata_path, metadata)
|
||||
|
||||
return metadata
|
||||
|
||||
|
||||
@@ -172,7 +172,7 @@ async def save_metadata(file_path: str, metadata: BaseModelMetadata) -> None:
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata_dict, f, indent=2, ensure_ascii=False)
|
||||
except Exception as e:
|
||||
print(f"Error saving metadata to {metadata_path}: {str(e)}")
|
||||
logger.error(f"Error saving metadata to {metadata_path}: {str(e)}")
|
||||
|
||||
async def load_metadata(file_path: str, model_class: Type[BaseModelMetadata] = LoraMetadata) -> Optional[BaseModelMetadata]:
|
||||
"""Load metadata from .metadata.json file"""
|
||||
@@ -251,11 +251,5 @@ async def load_metadata(file_path: str, model_class: Type[BaseModelMetadata] = L
|
||||
return model_class.from_dict(data)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error loading metadata from {metadata_path}: {str(e)}")
|
||||
return None
|
||||
|
||||
async def update_civitai_metadata(file_path: str, civitai_data: Dict) -> None:
|
||||
"""Update metadata file with Civitai data"""
|
||||
metadata = await load_metadata(file_path)
|
||||
metadata['civitai'] = civitai_data
|
||||
await save_metadata(file_path, metadata)
|
||||
logger.error(f"Error loading metadata from {metadata_path}: {str(e)}")
|
||||
return None
|
||||
275
py/utils/metadata_manager.py
Normal file
275
py/utils/metadata_manager.py
Normal file
@@ -0,0 +1,275 @@
|
||||
import os
|
||||
import json
|
||||
import shutil
|
||||
import logging
|
||||
from typing import Dict, Optional, Type, Union
|
||||
|
||||
from .models import BaseModelMetadata, LoraMetadata
|
||||
from .file_utils import normalize_path, find_preview_file, calculate_sha256
|
||||
from .lora_metadata import extract_lora_metadata, extract_checkpoint_metadata
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class MetadataManager:
|
||||
"""
|
||||
Centralized manager for all metadata operations.
|
||||
|
||||
This class is responsible for:
|
||||
1. Loading metadata safely with fallback mechanisms
|
||||
2. Saving metadata with atomic operations and backups
|
||||
3. Creating default metadata for models
|
||||
4. Handling unknown fields gracefully
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
async def load_metadata(file_path: str, model_class: Type[BaseModelMetadata] = LoraMetadata) -> Optional[BaseModelMetadata]:
|
||||
"""
|
||||
Load metadata with robust error handling and data preservation.
|
||||
|
||||
Args:
|
||||
file_path: Path to the model file
|
||||
model_class: Class to instantiate (LoraMetadata, CheckpointMetadata, etc.)
|
||||
|
||||
Returns:
|
||||
BaseModelMetadata instance or None if file doesn't exist
|
||||
"""
|
||||
metadata_path = f"{os.path.splitext(file_path)[0]}.metadata.json"
|
||||
backup_path = f"{metadata_path}.bak"
|
||||
|
||||
# Try loading the main metadata file
|
||||
if os.path.exists(metadata_path):
|
||||
try:
|
||||
with open(metadata_path, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
# Create model instance
|
||||
metadata = model_class.from_dict(data)
|
||||
|
||||
# Normalize paths
|
||||
await MetadataManager._normalize_metadata_paths(metadata, file_path)
|
||||
|
||||
return metadata
|
||||
|
||||
except json.JSONDecodeError:
|
||||
# JSON parsing error - try to restore from backup
|
||||
logger.warning(f"Invalid JSON in metadata file: {metadata_path}")
|
||||
return await MetadataManager._restore_from_backup(backup_path, file_path, model_class)
|
||||
|
||||
except Exception as e:
|
||||
# Other errors might be due to unknown fields or schema changes
|
||||
logger.error(f"Error loading metadata from {metadata_path}: {str(e)}")
|
||||
return await MetadataManager._restore_from_backup(backup_path, file_path, model_class)
|
||||
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
async def _restore_from_backup(backup_path: str, file_path: str, model_class: Type[BaseModelMetadata]) -> Optional[BaseModelMetadata]:
|
||||
"""
|
||||
Try to restore metadata from backup file
|
||||
|
||||
Args:
|
||||
backup_path: Path to backup file
|
||||
file_path: Path to the original model file
|
||||
model_class: Class to instantiate
|
||||
|
||||
Returns:
|
||||
BaseModelMetadata instance or None if restoration fails
|
||||
"""
|
||||
if os.path.exists(backup_path):
|
||||
try:
|
||||
logger.info(f"Attempting to restore metadata from backup: {backup_path}")
|
||||
with open(backup_path, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
# Process data similarly to normal loading
|
||||
metadata = model_class.from_dict(data)
|
||||
await MetadataManager._normalize_metadata_paths(metadata, file_path)
|
||||
return metadata
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to restore from backup: {str(e)}")
|
||||
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
async def save_metadata(path: str, metadata: Union[BaseModelMetadata, Dict], create_backup: bool = True) -> bool:
|
||||
"""
|
||||
Save metadata with atomic write operations and backup creation.
|
||||
|
||||
Args:
|
||||
path: Path to the model file or directly to the metadata file
|
||||
metadata: Metadata to save (either BaseModelMetadata object or dict)
|
||||
create_backup: Whether to create a backup of existing file
|
||||
|
||||
Returns:
|
||||
bool: Success or failure
|
||||
"""
|
||||
# Determine if the input is a metadata path or a model file path
|
||||
if path.endswith('.metadata.json'):
|
||||
metadata_path = path
|
||||
else:
|
||||
# Use existing logic for model file paths
|
||||
file_path = path
|
||||
metadata_path = f"{os.path.splitext(file_path)[0]}.metadata.json"
|
||||
temp_path = f"{metadata_path}.tmp"
|
||||
backup_path = f"{metadata_path}.bak"
|
||||
|
||||
try:
|
||||
# Create backup if requested and file exists
|
||||
if create_backup and os.path.exists(metadata_path):
|
||||
try:
|
||||
shutil.copy2(metadata_path, backup_path)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to create metadata backup: {str(e)}")
|
||||
|
||||
# Convert to dict if needed
|
||||
if isinstance(metadata, BaseModelMetadata):
|
||||
metadata_dict = metadata.to_dict()
|
||||
# Preserve unknown fields if present
|
||||
if hasattr(metadata, '_unknown_fields'):
|
||||
metadata_dict.update(metadata._unknown_fields)
|
||||
else:
|
||||
metadata_dict = metadata.copy()
|
||||
|
||||
# Normalize paths
|
||||
if 'file_path' in metadata_dict:
|
||||
metadata_dict['file_path'] = normalize_path(metadata_dict['file_path'])
|
||||
if 'preview_url' in metadata_dict:
|
||||
metadata_dict['preview_url'] = normalize_path(metadata_dict['preview_url'])
|
||||
|
||||
# Write to temporary file first
|
||||
with open(temp_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata_dict, f, indent=2, ensure_ascii=False)
|
||||
|
||||
# Atomic rename operation
|
||||
os.replace(temp_path, metadata_path)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving metadata to {metadata_path}: {str(e)}")
|
||||
# Clean up temporary file if it exists
|
||||
if os.path.exists(temp_path):
|
||||
try:
|
||||
os.remove(temp_path)
|
||||
except:
|
||||
pass
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
async def create_default_metadata(file_path: str, model_class: Type[BaseModelMetadata] = LoraMetadata) -> Optional[BaseModelMetadata]:
|
||||
"""
|
||||
Create basic metadata structure for a model file.
|
||||
This replaces the old get_file_info function with a more appropriately named method.
|
||||
|
||||
Args:
|
||||
file_path: Path to the model file
|
||||
model_class: Class to instantiate
|
||||
|
||||
Returns:
|
||||
BaseModelMetadata instance or None if file doesn't exist
|
||||
"""
|
||||
# First check if file actually exists and resolve symlinks
|
||||
try:
|
||||
real_path = os.path.realpath(file_path)
|
||||
if not os.path.exists(real_path):
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking file existence for {file_path}: {e}")
|
||||
return None
|
||||
|
||||
try:
|
||||
base_name = os.path.splitext(os.path.basename(file_path))[0]
|
||||
dir_path = os.path.dirname(file_path)
|
||||
|
||||
# Find preview image
|
||||
preview_url = find_preview_file(base_name, dir_path)
|
||||
|
||||
# Calculate file hash
|
||||
sha256 = await calculate_sha256(real_path)
|
||||
|
||||
# Create instance based on model type
|
||||
if model_class.__name__ == "CheckpointMetadata":
|
||||
metadata = model_class(
|
||||
file_name=base_name,
|
||||
model_name=base_name,
|
||||
file_path=normalize_path(file_path),
|
||||
size=os.path.getsize(real_path),
|
||||
modified=os.path.getmtime(real_path),
|
||||
sha256=sha256,
|
||||
base_model="Unknown",
|
||||
preview_url=normalize_path(preview_url),
|
||||
tags=[],
|
||||
modelDescription="",
|
||||
model_type="checkpoint",
|
||||
from_civitai=False
|
||||
)
|
||||
else: # Default to LoraMetadata
|
||||
metadata = model_class(
|
||||
file_name=base_name,
|
||||
model_name=base_name,
|
||||
file_path=normalize_path(file_path),
|
||||
size=os.path.getsize(real_path),
|
||||
modified=os.path.getmtime(real_path),
|
||||
sha256=sha256,
|
||||
base_model="Unknown",
|
||||
preview_url=normalize_path(preview_url),
|
||||
tags=[],
|
||||
modelDescription="",
|
||||
from_civitai=False,
|
||||
usage_tips="{}"
|
||||
)
|
||||
|
||||
# Try to extract model-specific metadata
|
||||
await MetadataManager._enrich_metadata(metadata, real_path)
|
||||
|
||||
# Save the created metadata
|
||||
await MetadataManager.save_metadata(file_path, metadata, create_backup=False)
|
||||
|
||||
return metadata
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating default metadata for {file_path}: {e}")
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
async def _enrich_metadata(metadata: BaseModelMetadata, file_path: str) -> None:
|
||||
"""
|
||||
Enrich metadata with model-specific information
|
||||
|
||||
Args:
|
||||
metadata: Metadata to enrich
|
||||
file_path: Path to the model file
|
||||
"""
|
||||
try:
|
||||
if metadata.__class__.__name__ == "LoraMetadata":
|
||||
model_info = await extract_lora_metadata(file_path)
|
||||
metadata.base_model = model_info['base_model']
|
||||
|
||||
elif metadata.__class__.__name__ == "CheckpointMetadata":
|
||||
model_info = await extract_checkpoint_metadata(file_path)
|
||||
metadata.base_model = model_info['base_model']
|
||||
if 'model_type' in model_info:
|
||||
metadata.model_type = model_info['model_type']
|
||||
except Exception as e:
|
||||
logger.error(f"Error enriching metadata: {str(e)}")
|
||||
|
||||
@staticmethod
|
||||
async def _normalize_metadata_paths(metadata: BaseModelMetadata, file_path: str) -> None:
|
||||
"""
|
||||
Normalize paths in metadata object
|
||||
|
||||
Args:
|
||||
metadata: Metadata object to update
|
||||
file_path: Current file path for the model
|
||||
"""
|
||||
# Check if file path is different from what's in metadata
|
||||
if normalize_path(file_path) != metadata.file_path:
|
||||
metadata.file_path = normalize_path(file_path)
|
||||
|
||||
# Check if preview exists at the current location
|
||||
preview_url = metadata.preview_url
|
||||
if preview_url and not os.path.exists(preview_url):
|
||||
base_name = os.path.splitext(os.path.basename(file_path))[0]
|
||||
dir_path = os.path.dirname(file_path)
|
||||
new_preview_url = find_preview_file(base_name, dir_path)
|
||||
if new_preview_url:
|
||||
metadata.preview_url = normalize_path(new_preview_url)
|
||||
@@ -1,5 +1,5 @@
|
||||
from dataclasses import dataclass, asdict
|
||||
from typing import Dict, Optional, List
|
||||
from dataclasses import dataclass, asdict, field
|
||||
from typing import Dict, Optional, List, Any
|
||||
from datetime import datetime
|
||||
import os
|
||||
from .model_utils import determine_base_model
|
||||
@@ -24,6 +24,7 @@ class BaseModelMetadata:
|
||||
civitai_deleted: bool = False # Whether deleted from Civitai
|
||||
favorite: bool = False # Whether the model is a favorite
|
||||
exclude: bool = False # Whether to exclude this model from the cache
|
||||
_unknown_fields: Dict[str, Any] = field(default_factory=dict, repr=False, compare=False) # Store unknown fields
|
||||
|
||||
def __post_init__(self):
|
||||
# Initialize empty lists to avoid mutable default parameter issue
|
||||
@@ -34,11 +35,43 @@ class BaseModelMetadata:
|
||||
def from_dict(cls, data: Dict) -> 'BaseModelMetadata':
|
||||
"""Create instance from dictionary"""
|
||||
data_copy = data.copy()
|
||||
return cls(**data_copy)
|
||||
|
||||
# Use cached fields if available, otherwise compute them
|
||||
if not hasattr(cls, '_known_fields_cache'):
|
||||
known_fields = set()
|
||||
for c in cls.mro():
|
||||
if hasattr(c, '__annotations__'):
|
||||
known_fields.update(c.__annotations__.keys())
|
||||
cls._known_fields_cache = known_fields
|
||||
|
||||
known_fields = cls._known_fields_cache
|
||||
|
||||
# Extract fields that match our class attributes
|
||||
fields_to_use = {k: v for k, v in data_copy.items() if k in known_fields}
|
||||
|
||||
# Store unknown fields separately
|
||||
unknown_fields = {k: v for k, v in data_copy.items() if k not in known_fields and not k.startswith('_')}
|
||||
|
||||
# Create instance with known fields
|
||||
instance = cls(**fields_to_use)
|
||||
|
||||
# Add unknown fields as a separate attribute
|
||||
instance._unknown_fields = unknown_fields
|
||||
|
||||
return instance
|
||||
|
||||
def to_dict(self) -> Dict:
|
||||
"""Convert to dictionary for JSON serialization"""
|
||||
return asdict(self)
|
||||
result = asdict(self)
|
||||
|
||||
# Remove private fields
|
||||
result = {k: v for k, v in result.items() if not k.startswith('_')}
|
||||
|
||||
# Add back unknown fields if they exist
|
||||
if hasattr(self, '_unknown_fields'):
|
||||
result.update(self._unknown_fields)
|
||||
|
||||
return result
|
||||
|
||||
@property
|
||||
def modified_datetime(self) -> datetime:
|
||||
|
||||
@@ -9,6 +9,7 @@ from .constants import PREVIEW_EXTENSIONS, CARD_PREVIEW_WIDTH
|
||||
from ..config import config
|
||||
from ..services.civitai_client import CivitaiClient
|
||||
from ..utils.exif_utils import ExifUtils
|
||||
from ..utils.metadata_manager import MetadataManager
|
||||
from ..services.download_manager import DownloadManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -32,8 +33,7 @@ class ModelRouteUtils:
|
||||
async def handle_not_found_on_civitai(metadata_path: str, local_metadata: Dict) -> None:
|
||||
"""Handle case when model is not found on CivitAI"""
|
||||
local_metadata['from_civitai'] = False
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(local_metadata, f, indent=2, ensure_ascii=False)
|
||||
await MetadataManager.save_metadata(metadata_path, local_metadata)
|
||||
|
||||
@staticmethod
|
||||
async def update_model_metadata(metadata_path: str, local_metadata: Dict,
|
||||
@@ -138,8 +138,7 @@ class ModelRouteUtils:
|
||||
local_metadata['preview_nsfw_level'] = first_preview.get('nsfwLevel', 0)
|
||||
|
||||
# Save updated metadata
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(local_metadata, f, indent=2, ensure_ascii=False)
|
||||
await MetadataManager.save_metadata(metadata_path, local_metadata)
|
||||
|
||||
@staticmethod
|
||||
async def fetch_and_update_model(
|
||||
@@ -177,8 +176,7 @@ class ModelRouteUtils:
|
||||
# Mark as not from CivitAI if not found
|
||||
local_metadata['from_civitai'] = False
|
||||
model_data['from_civitai'] = False
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(local_metadata, f, indent=2, ensure_ascii=False)
|
||||
await MetadataManager.save_metadata(file_path, local_metadata)
|
||||
return False
|
||||
|
||||
# Update metadata
|
||||
@@ -270,10 +268,12 @@ class ModelRouteUtils:
|
||||
|
||||
@staticmethod
|
||||
def get_multipart_ext(filename):
|
||||
"""Get extension that may have multiple parts like .metadata.json"""
|
||||
"""Get extension that may have multiple parts like .metadata.json or .metadata.json.bak"""
|
||||
parts = filename.split(".")
|
||||
if len(parts) > 2: # If contains multi-part extension
|
||||
if len(parts) == 3: # If contains 2-part extension
|
||||
return "." + ".".join(parts[-2:]) # Take the last two parts, like ".metadata.json"
|
||||
elif len(parts) >= 4: # If contains 3-part or more extensions
|
||||
return "." + ".".join(parts[-3:]) # Take the last three parts, like ".metadata.json.bak"
|
||||
return os.path.splitext(filename)[1] # Otherwise take the regular extension, like ".safetensors"
|
||||
|
||||
# New common endpoint handlers
|
||||
@@ -428,8 +428,7 @@ class ModelRouteUtils:
|
||||
# Update preview_url directly in the metadata dict
|
||||
metadata['preview_url'] = preview_path
|
||||
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata, f, indent=2, ensure_ascii=False)
|
||||
await MetadataManager.save_metadata(model_path, metadata)
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating metadata: {e}")
|
||||
|
||||
@@ -469,8 +468,7 @@ class ModelRouteUtils:
|
||||
metadata['exclude'] = True
|
||||
|
||||
# Save updated metadata
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata, f, indent=2, ensure_ascii=False)
|
||||
await MetadataManager.save_metadata(file_path, metadata)
|
||||
|
||||
# Update cache
|
||||
cache = await scanner.get_cached_data()
|
||||
@@ -759,8 +757,7 @@ class ModelRouteUtils:
|
||||
metadata['sha256'] = actual_hash
|
||||
|
||||
# Save updated metadata
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata, f, indent=2, ensure_ascii=False)
|
||||
await MetadataManager.save_metadata(file_path, metadata)
|
||||
|
||||
# Update cache
|
||||
await scanner.update_single_model_cache(file_path, file_path, metadata)
|
||||
|
||||
Reference in New Issue
Block a user