Merge branch 'main' into fix-crash-on-symlinks

This commit is contained in:
pixelpaws
2025-06-19 18:33:40 +08:00
committed by GitHub
50 changed files with 4226 additions and 3353 deletions

View File

@@ -1,5 +1,10 @@
import asyncio
import sys
import os
import logging
from pathlib import Path
from server import PromptServer # type: ignore
from .config import config
from .routes.lora_routes import LoraRoutes
from .routes.api_routes import ApiRoutes
@@ -10,10 +15,7 @@ from .routes.misc_routes import MiscRoutes
from .routes.example_images_routes import ExampleImagesRoutes
from .services.service_registry import ServiceRegistry
from .services.settings_manager import settings
from pathlib import Path
import logging
import sys
import os
from .utils.example_images_migration import ExampleImagesMigration
logger = logging.getLogger(__name__)
@@ -135,13 +137,13 @@ class LoraManager:
logging.getLogger('aiohttp.access').setLevel(logging.WARNING)
# Initialize CivitaiClient first to ensure it's ready for other services
civitai_client = await ServiceRegistry.get_civitai_client()
await ServiceRegistry.get_civitai_client()
# Register DownloadManager with ServiceRegistry
download_manager = await ServiceRegistry.get_download_manager()
await ServiceRegistry.get_download_manager()
# Initialize WebSocket manager
ws_manager = await ServiceRegistry.get_websocket_manager()
await ServiceRegistry.get_websocket_manager()
# Initialize scanners in background
lora_scanner = await ServiceRegistry.get_lora_scanner()
@@ -160,6 +162,8 @@ class LoraManager:
asyncio.create_task(lora_scanner.initialize_in_background(), name='lora_cache_init')
asyncio.create_task(checkpoint_scanner.initialize_in_background(), name='checkpoint_cache_init')
asyncio.create_task(recipe_scanner.initialize_in_background(), name='recipe_cache_init')
await ExampleImagesMigration.check_and_run_migrations()
logger.info("LoRA Manager: All services initialized and background tasks scheduled")

View File

@@ -14,6 +14,7 @@ import asyncio
from .update_routes import UpdateRoutes
from ..utils.constants import PREVIEW_EXTENSIONS, CARD_PREVIEW_WIDTH, VALID_LORA_TYPES
from ..utils.exif_utils import ExifUtils
from ..utils.metadata_manager import MetadataManager
from ..services.service_registry import ServiceRegistry
logger = logging.getLogger(__name__)
@@ -289,22 +290,6 @@ class ApiRoutes:
return preview_path
async def _update_preview_metadata(self, model_path: str, preview_path: str):
"""Update preview path in metadata"""
metadata_path = os.path.splitext(model_path)[0] + '.metadata.json'
if os.path.exists(metadata_path):
try:
with open(metadata_path, 'r', encoding='utf-8') as f:
metadata = json.load(f)
# Update preview_url directly in the metadata dict
metadata['preview_url'] = preview_path
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
except Exception as e:
logger.error(f"Error updating metadata: {e}")
async def fetch_all_civitai(self, request: web.Request) -> web.Response:
"""Fetch CivitAI metadata for all loras in the background"""
try:
@@ -640,8 +625,7 @@ class ApiRoutes:
metadata[key] = value
# Save updated metadata
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
await MetadataManager.save_metadata(file_path, metadata)
# Update cache
await self.scanner.update_single_model_cache(file_path, file_path, metadata)
@@ -854,9 +838,7 @@ class ApiRoutes:
metadata['tags'] = tags
metadata['creator'] = creator
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
logger.info(f"Saved model metadata to file for {file_path}")
await MetadataManager.save_metadata(file_path, metadata)
except Exception as e:
logger.error(f"Error saving model metadata: {e}")
@@ -972,6 +954,7 @@ class ApiRoutes:
patterns = [
f"{old_file_name}.safetensors", # Required
f"{old_file_name}.metadata.json",
f"{old_file_name}.metadata.json.bak",
]
# Add all preview file extensions
@@ -1027,8 +1010,7 @@ class ApiRoutes:
metadata['preview_url'] = new_preview
# Save updated metadata
with open(new_metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
await MetadataManager.save_metadata(new_file_path, metadata)
# Update the scanner cache
if metadata:

View File

@@ -7,6 +7,7 @@ import asyncio
from ..utils.routes_common import ModelRouteUtils
from ..utils.constants import NSFW_LEVELS
from ..utils.metadata_manager import MetadataManager
from ..services.websocket_manager import ws_manager
from ..services.service_registry import ServiceRegistry
from ..config import config
@@ -650,8 +651,7 @@ class CheckpointsRoutes:
metadata.update(metadata_updates)
# Save updated metadata
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
await MetadataManager.save_metadata(file_path, metadata)
# Update cache
await self.scanner.update_single_model_cache(file_path, file_path, metadata)

File diff suppressed because it is too large Load Diff

View File

@@ -70,8 +70,7 @@ class LoraRoutes:
# It's initializing if the cache object doesn't exist yet,
# OR if the scanner explicitly says it's initializing (background task running).
is_initializing = (
self.scanner._cache is None or
(hasattr(self.scanner, '_is_initializing') and self.scanner._is_initializing)
self.scanner._cache is None or self.scanner.is_initializing()
)
if is_initializing:

View File

@@ -6,6 +6,7 @@ from typing import Dict
from ..utils.models import LoraMetadata, CheckpointMetadata
from ..utils.constants import CARD_PREVIEW_WIDTH
from ..utils.exif_utils import ExifUtils
from ..utils.metadata_manager import MetadataManager
from .service_registry import ServiceRegistry
# Download to temporary file first
@@ -198,8 +199,6 @@ class DownloadManager:
if await civitai_client.download_preview_image(images[0]['url'], preview_path):
metadata.preview_url = preview_path.replace(os.sep, '/')
metadata.preview_nsfw_level = images[0].get('nsfwLevel', 0)
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata.to_dict(), f, indent=2, ensure_ascii=False)
else:
# For images, use WebP format for better performance
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file:
@@ -226,8 +225,6 @@ class DownloadManager:
# Update metadata
metadata.preview_url = preview_path.replace(os.sep, '/')
metadata.preview_nsfw_level = images[0].get('nsfwLevel', 0)
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata.to_dict(), f, indent=2, ensure_ascii=False)
# Remove temporary file
try:
@@ -258,8 +255,7 @@ class DownloadManager:
metadata.update_file_info(save_path)
# 5. Final metadata update
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata.to_dict(), f, indent=2, ensure_ascii=False)
await MetadataManager.save_metadata(save_path, metadata)
# 6. Update cache based on model type
if model_type == "checkpoint":

View File

@@ -32,12 +32,13 @@ class ModelCache:
all_folders = set(l['folder'] for l in self.raw_data)
self.folders = sorted(list(all_folders), key=lambda x: x.lower())
async def update_preview_url(self, file_path: str, preview_url: str) -> bool:
async def update_preview_url(self, file_path: str, preview_url: str, preview_nsfw_level: int) -> bool:
"""Update preview_url for a specific model in all cached data
Args:
file_path: The file path of the model to update
preview_url: The new preview URL
preview_nsfw_level: The NSFW level of the preview
Returns:
bool: True if the update was successful, False if the model wasn't found
@@ -47,19 +48,9 @@ class ModelCache:
for item in self.raw_data:
if item['file_path'] == file_path:
item['preview_url'] = preview_url
item['preview_nsfw_level'] = preview_nsfw_level
break
else:
return False # Model not found
# Update in sorted lists (references to the same dict objects)
for item in self.sorted_by_name:
if item['file_path'] == file_path:
item['preview_url'] = preview_url
break
for item in self.sorted_by_date:
if item['file_path'] == file_path:
item['preview_url'] = preview_url
break
return True

View File

@@ -9,7 +9,8 @@ import msgpack # Add MessagePack import for efficient serialization
from ..utils.models import BaseModelMetadata
from ..config import config
from ..utils.file_utils import load_metadata, get_file_info, find_preview_file, save_metadata
from ..utils.file_utils import find_preview_file
from ..utils.metadata_manager import MetadataManager
from .model_cache import ModelCache
from .model_hash_index import ModelHashIndex
from ..utils.constants import PREVIEW_EXTENSIONS
@@ -748,13 +749,17 @@ class ModelScanner:
"""Scan all model directories and return metadata"""
raise NotImplementedError("Subclasses must implement scan_all_models")
def is_initializing(self) -> bool:
"""Check if the scanner is currently initializing"""
return self._is_initializing
def get_model_roots(self) -> List[str]:
"""Get model root directories"""
raise NotImplementedError("Subclasses must implement get_model_roots")
async def _get_file_info(self, file_path: str) -> Optional[BaseModelMetadata]:
async def _create_default_metadata(self, file_path: str) -> Optional[BaseModelMetadata]:
"""Get model file info and metadata (extensible for different model types)"""
return await get_file_info(file_path, self.model_class)
return await MetadataManager.create_default_metadata(file_path, self.model_class)
def _calculate_folder(self, file_path: str) -> str:
"""Calculate the folder path for a model file"""
@@ -767,7 +772,7 @@ class ModelScanner:
# Common methods shared between scanners
async def _process_model_file(self, file_path: str, root_path: str) -> Dict:
"""Process a single model file and return its metadata"""
metadata = await load_metadata(file_path, self.model_class)
metadata = await MetadataManager.load_metadata(file_path, self.model_class)
if metadata is None:
civitai_info_path = f"{os.path.splitext(file_path)[0]}.civitai.info"
@@ -783,7 +788,7 @@ class ModelScanner:
metadata = self.model_class.from_civitai_info(version_info, file_info, file_path)
metadata.preview_url = find_preview_file(file_name, os.path.dirname(file_path))
await save_metadata(file_path, metadata)
await MetadataManager.save_metadata(file_path, metadata)
logger.debug(f"Created metadata from .civitai.info for {file_path}")
except Exception as e:
logger.error(f"Error creating metadata from .civitai.info for {file_path}: {e}")
@@ -810,13 +815,13 @@ class ModelScanner:
metadata.modelDescription = version_info['model']['description']
# Save the updated metadata
await save_metadata(file_path, metadata)
await MetadataManager.save_metadata(file_path, metadata)
logger.debug(f"Updated metadata with civitai info for {file_path}")
except Exception as e:
logger.error(f"Error restoring civitai data from .civitai.info for {file_path}: {e}")
if metadata is None:
metadata = await self._get_file_info(file_path)
metadata = await self._create_default_metadata(file_path)
model_data = metadata.to_dict()
@@ -866,9 +871,7 @@ class ModelScanner:
logger.warning(f"Model {model_id} appears to be deleted from Civitai (404 response)")
model_data['civitai_deleted'] = True
metadata_path = os.path.splitext(file_path)[0] + '.metadata.json'
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(model_data, f, indent=2, ensure_ascii=False)
await MetadataManager.save_metadata(file_path, model_data)
elif model_metadata:
logger.debug(f"Updating metadata for {file_path} with model ID {model_id}")
@@ -881,9 +884,7 @@ class ModelScanner:
model_data['civitai']['creator'] = model_metadata['creator']
metadata_path = os.path.splitext(file_path)[0] + '.metadata.json'
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(model_data, f, indent=2, ensure_ascii=False)
await MetadataManager.save_metadata(file_path, model_data)
except Exception as e:
logger.error(f"Failed to update metadata from Civitai for {file_path}: {e}")
@@ -1049,8 +1050,7 @@ class ModelScanner:
new_preview_path = os.path.join(preview_dir, f"{preview_name}{preview_ext}")
metadata['preview_url'] = new_preview_path.replace(os.sep, '/')
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
await MetadataManager.save_metadata(metadata_path, metadata)
return metadata
@@ -1184,12 +1184,13 @@ class ModelScanner:
"""Get list of excluded model file paths"""
return self._excluded_models.copy()
async def update_preview_in_cache(self, file_path: str, preview_url: str) -> bool:
async def update_preview_in_cache(self, file_path: str, preview_url: str, preview_nsfw_level: int) -> bool:
"""Update preview URL in cache for a specific lora
Args:
file_path: The file path of the lora to update
preview_url: The new preview URL
preview_nsfw_level: The NSFW level of the preview
Returns:
bool: True if the update was successful, False if cache doesn't exist or lora wasn't found
@@ -1197,7 +1198,7 @@ class ModelScanner:
if self._cache is None:
return False
updated = await self._cache.update_preview_url(file_path, preview_url)
updated = await self._cache.update_preview_url(file_path, preview_url, preview_nsfw_level)
if updated:
# Save updated cache to disk
await self._save_cache_to_disk()

View File

@@ -0,0 +1,399 @@
import logging
import os
import asyncio
import json
import time
import aiohttp
from aiohttp import web
from ..services.service_registry import ServiceRegistry
from .example_images_processor import ExampleImagesProcessor
from .example_images_metadata import MetadataUpdater
logger = logging.getLogger(__name__)
# Download status tracking
download_task = None
is_downloading = False
download_progress = {
'total': 0,
'completed': 0,
'current_model': '',
'status': 'idle', # idle, running, paused, completed, error
'errors': [],
'last_error': None,
'start_time': None,
'end_time': None,
'processed_models': set(), # Track models that have been processed
'refreshed_models': set() # Track models that had metadata refreshed
}
class DownloadManager:
"""Manages downloading example images for models"""
@staticmethod
async def start_download(request):
"""
Start downloading example images for models
Expects a JSON body with:
{
"output_dir": "path/to/output", # Base directory to save example images
"optimize": true, # Whether to optimize images (default: true)
"model_types": ["lora", "checkpoint"], # Model types to process (default: both)
"delay": 1.0 # Delay between downloads to avoid rate limiting (default: 1.0)
}
"""
global download_task, is_downloading, download_progress
if is_downloading:
# Create a copy for JSON serialization
response_progress = download_progress.copy()
response_progress['processed_models'] = list(download_progress['processed_models'])
response_progress['refreshed_models'] = list(download_progress['refreshed_models'])
return web.json_response({
'success': False,
'error': 'Download already in progress',
'status': response_progress
}, status=400)
try:
# Parse the request body
data = await request.json()
output_dir = data.get('output_dir')
optimize = data.get('optimize', True)
model_types = data.get('model_types', ['lora', 'checkpoint'])
delay = float(data.get('delay', 0.2)) # Default to 0.2 seconds
if not output_dir:
return web.json_response({
'success': False,
'error': 'Missing output_dir parameter'
}, status=400)
# Create the output directory
os.makedirs(output_dir, exist_ok=True)
# Initialize progress tracking
download_progress['total'] = 0
download_progress['completed'] = 0
download_progress['current_model'] = ''
download_progress['status'] = 'running'
download_progress['errors'] = []
download_progress['last_error'] = None
download_progress['start_time'] = time.time()
download_progress['end_time'] = None
# Get the processed models list from a file if it exists
progress_file = os.path.join(output_dir, '.download_progress.json')
if os.path.exists(progress_file):
try:
with open(progress_file, 'r', encoding='utf-8') as f:
saved_progress = json.load(f)
download_progress['processed_models'] = set(saved_progress.get('processed_models', []))
logger.info(f"Loaded previous progress, {len(download_progress['processed_models'])} models already processed")
except Exception as e:
logger.error(f"Failed to load progress file: {e}")
download_progress['processed_models'] = set()
else:
download_progress['processed_models'] = set()
# Start the download task
is_downloading = True
download_task = asyncio.create_task(
DownloadManager._download_all_example_images(
output_dir,
optimize,
model_types,
delay
)
)
# Create a copy for JSON serialization
response_progress = download_progress.copy()
response_progress['processed_models'] = list(download_progress['processed_models'])
response_progress['refreshed_models'] = list(download_progress['refreshed_models'])
return web.json_response({
'success': True,
'message': 'Download started',
'status': response_progress
})
except Exception as e:
logger.error(f"Failed to start example images download: {e}", exc_info=True)
return web.json_response({
'success': False,
'error': str(e)
}, status=500)
@staticmethod
async def get_status(request):
"""Get the current status of example images download"""
global download_progress
# Create a copy of the progress dict with the set converted to a list for JSON serialization
response_progress = download_progress.copy()
response_progress['processed_models'] = list(download_progress['processed_models'])
response_progress['refreshed_models'] = list(download_progress['refreshed_models'])
return web.json_response({
'success': True,
'is_downloading': is_downloading,
'status': response_progress
})
@staticmethod
async def pause_download(request):
"""Pause the example images download"""
global download_progress
if not is_downloading:
return web.json_response({
'success': False,
'error': 'No download in progress'
}, status=400)
download_progress['status'] = 'paused'
return web.json_response({
'success': True,
'message': 'Download paused'
})
@staticmethod
async def resume_download(request):
"""Resume the example images download"""
global download_progress
if not is_downloading:
return web.json_response({
'success': False,
'error': 'No download in progress'
}, status=400)
if download_progress['status'] == 'paused':
download_progress['status'] = 'running'
return web.json_response({
'success': True,
'message': 'Download resumed'
})
else:
return web.json_response({
'success': False,
'error': f"Download is in '{download_progress['status']}' state, cannot resume"
}, status=400)
@staticmethod
async def _download_all_example_images(output_dir, optimize, model_types, delay):
"""Download example images for all models"""
global is_downloading, download_progress
# Create independent download session
connector = aiohttp.TCPConnector(
ssl=True,
limit=3,
force_close=False,
enable_cleanup_closed=True
)
timeout = aiohttp.ClientTimeout(total=None, connect=60, sock_read=60)
independent_session = aiohttp.ClientSession(
connector=connector,
trust_env=True,
timeout=timeout
)
try:
# Get scanners
scanners = []
if 'lora' in model_types:
lora_scanner = await ServiceRegistry.get_lora_scanner()
scanners.append(('lora', lora_scanner))
if 'checkpoint' in model_types:
checkpoint_scanner = await ServiceRegistry.get_checkpoint_scanner()
scanners.append(('checkpoint', checkpoint_scanner))
# Get all models
all_models = []
for scanner_type, scanner in scanners:
cache = await scanner.get_cached_data()
if cache and cache.raw_data:
for model in cache.raw_data:
if model.get('sha256'):
all_models.append((scanner_type, model, scanner))
# Update total count
download_progress['total'] = len(all_models)
logger.info(f"Found {download_progress['total']} models to process")
# Process each model
for i, (scanner_type, model, scanner) in enumerate(all_models):
# Main logic for processing model is here, but actual operations are delegated to other classes
was_remote_download = await DownloadManager._process_model(
scanner_type, model, scanner,
output_dir, optimize, independent_session
)
# Update progress
download_progress['completed'] += 1
# Only add delay after remote download of models, and not after processing the last model
if was_remote_download and i < len(all_models) - 1 and download_progress['status'] == 'running':
await asyncio.sleep(delay)
# Mark as completed
download_progress['status'] = 'completed'
download_progress['end_time'] = time.time()
logger.info(f"Example images download completed: {download_progress['completed']}/{download_progress['total']} models processed")
except Exception as e:
error_msg = f"Error during example images download: {str(e)}"
logger.error(error_msg, exc_info=True)
download_progress['errors'].append(error_msg)
download_progress['last_error'] = error_msg
download_progress['status'] = 'error'
download_progress['end_time'] = time.time()
finally:
# Close the independent session
try:
await independent_session.close()
except Exception as e:
logger.error(f"Error closing download session: {e}")
# Save final progress to file
try:
DownloadManager._save_progress(output_dir)
except Exception as e:
logger.error(f"Failed to save progress file: {e}")
# Set download status to not downloading
is_downloading = False
@staticmethod
async def _process_model(scanner_type, model, scanner, output_dir, optimize, independent_session):
"""Process a single model download"""
global download_progress
# Check if download is paused
while download_progress['status'] == 'paused':
await asyncio.sleep(1)
# Check if download should continue
if download_progress['status'] != 'running':
logger.info(f"Download stopped: {download_progress['status']}")
return False # Return False to indicate no remote download happened
model_hash = model.get('sha256', '').lower()
model_name = model.get('model_name', 'Unknown')
model_file_path = model.get('file_path', '')
model_file_name = model.get('file_name', '')
try:
# Update current model info
download_progress['current_model'] = f"{model_name} ({model_hash[:8]})"
# Skip if already processed
if model_hash in download_progress['processed_models']:
logger.debug(f"Skipping already processed model: {model_name}")
return False
# Create model directory
model_dir = os.path.join(output_dir, model_hash)
os.makedirs(model_dir, exist_ok=True)
# First check for local example images - local processing doesn't need delay
local_images_processed = await ExampleImagesProcessor.process_local_examples(
model_file_path, model_file_name, model_name, model_dir, optimize
)
# If we processed local images, update metadata
if local_images_processed:
await MetadataUpdater.update_metadata_from_local_examples(
model_hash, model, scanner_type, scanner, model_dir
)
download_progress['processed_models'].add(model_hash)
return False # Return False to indicate no remote download happened
# If no local images, try to download from remote
elif model.get('civitai') and model.get('civitai', {}).get('images'):
images = model.get('civitai', {}).get('images', [])
success, is_stale = await ExampleImagesProcessor.download_model_images(
model_hash, model_name, images, model_dir, optimize, independent_session
)
# If metadata is stale, try to refresh it
if is_stale and model_hash not in download_progress['refreshed_models']:
await MetadataUpdater.refresh_model_metadata(
model_hash, model_name, scanner_type, scanner
)
# Get the updated model data
updated_model = await MetadataUpdater.get_updated_model(
model_hash, scanner
)
if updated_model and updated_model.get('civitai', {}).get('images'):
# Retry download with updated metadata
updated_images = updated_model.get('civitai', {}).get('images', [])
success, _ = await ExampleImagesProcessor.download_model_images(
model_hash, model_name, updated_images, model_dir, optimize, independent_session
)
# Only mark as processed if all images were downloaded successfully
if success:
download_progress['processed_models'].add(model_hash)
return True # Return True to indicate a remote download happened
# Save progress periodically
if download_progress['completed'] % 10 == 0 or download_progress['completed'] == download_progress['total'] - 1:
DownloadManager._save_progress(output_dir)
return False # Default return if no conditions met
except Exception as e:
error_msg = f"Error processing model {model.get('model_name')}: {str(e)}"
logger.error(error_msg, exc_info=True)
download_progress['errors'].append(error_msg)
download_progress['last_error'] = error_msg
return False # Return False on exception
@staticmethod
def _save_progress(output_dir):
"""Save download progress to file"""
global download_progress
try:
progress_file = os.path.join(output_dir, '.download_progress.json')
# Read existing progress file if it exists
existing_data = {}
if os.path.exists(progress_file):
try:
with open(progress_file, 'r', encoding='utf-8') as f:
existing_data = json.load(f)
except Exception as e:
logger.warning(f"Failed to read existing progress file: {e}")
# Create new progress data
progress_data = {
'processed_models': list(download_progress['processed_models']),
'refreshed_models': list(download_progress['refreshed_models']),
'completed': download_progress['completed'],
'total': download_progress['total'],
'last_update': time.time()
}
# Preserve existing fields (especially naming_version)
for key, value in existing_data.items():
if key not in progress_data:
progress_data[key] = value
# Write updated progress data
with open(progress_file, 'w', encoding='utf-8') as f:
json.dump(progress_data, f, indent=2)
except Exception as e:
logger.error(f"Failed to save progress file: {e}")

View File

@@ -0,0 +1,201 @@
import logging
import os
import re
import sys
import subprocess
from aiohttp import web
from ..services.settings_manager import settings
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
logger = logging.getLogger(__name__)
class ExampleImagesFileManager:
"""Manages access and operations for example image files"""
@staticmethod
async def open_folder(request):
"""
Open the example images folder for a specific model
Expects a JSON request body with:
{
"model_hash": "sha256_hash" # SHA256 hash of the model
}
"""
try:
# Parse request body
data = await request.json()
model_hash = data.get('model_hash')
if not model_hash:
return web.json_response({
'success': False,
'error': 'Missing model_hash parameter'
}, status=400)
# Get example images path from settings
example_images_path = settings.get('example_images_path')
if not example_images_path:
return web.json_response({
'success': False,
'error': 'No example images path configured. Please set it in the settings panel first.'
}, status=400)
# Construct folder path for this model
model_folder = os.path.join(example_images_path, model_hash)
# Check if folder exists
if not os.path.exists(model_folder):
return web.json_response({
'success': False,
'error': 'No example images found for this model. Download example images first.'
}, status=404)
# Open folder in file explorer
if os.name == 'nt': # Windows
os.startfile(model_folder)
elif os.name == 'posix': # macOS and Linux
if sys.platform == 'darwin': # macOS
subprocess.Popen(['open', model_folder])
else: # Linux
subprocess.Popen(['xdg-open', model_folder])
return web.json_response({
'success': True,
'message': f'Opened example images folder for model {model_hash}'
})
except Exception as e:
logger.error(f"Failed to open example images folder: {e}", exc_info=True)
return web.json_response({
'success': False,
'error': str(e)
}, status=500)
@staticmethod
async def get_files(request):
"""
Get the list of example image files for a specific model
Expects:
- model_hash in query parameters
Returns:
- List of image files and their paths
"""
try:
# Get model_hash from query parameters
model_hash = request.query.get('model_hash')
if not model_hash:
return web.json_response({
'success': False,
'error': 'Missing model_hash parameter'
}, status=400)
# Get example images path from settings
example_images_path = settings.get('example_images_path')
if not example_images_path:
return web.json_response({
'success': False,
'error': 'No example images path configured'
}, status=400)
# Construct folder path for this model
model_folder = os.path.join(example_images_path, model_hash)
# Check if folder exists
if not os.path.exists(model_folder):
return web.json_response({
'success': False,
'error': 'No example images found for this model',
'files': []
}, status=404)
# Get list of files in the folder
files = []
for file in os.listdir(model_folder):
file_path = os.path.join(model_folder, file)
if os.path.isfile(file_path):
# Check if file is a supported media file
file_ext = os.path.splitext(file)[1].lower()
if (file_ext in SUPPORTED_MEDIA_EXTENSIONS['images'] or
file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']):
files.append({
'name': file,
'path': f'/example_images_static/{model_hash}/{file}',
'extension': file_ext,
'is_video': file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']
})
return web.json_response({
'success': True,
'files': files
})
except Exception as e:
logger.error(f"Failed to get example image files: {e}", exc_info=True)
return web.json_response({
'success': False,
'error': str(e)
}, status=500)
@staticmethod
async def has_images(request):
"""
Check if the example images folder for a model exists and is not empty
Expects:
- model_hash in query parameters
Returns:
- Boolean indicating whether the folder exists and contains images/videos
"""
try:
# Get model_hash from query parameters
model_hash = request.query.get('model_hash')
if not model_hash:
return web.json_response({
'success': False,
'error': 'Missing model_hash parameter'
}, status=400)
# Get example images path from settings
example_images_path = settings.get('example_images_path')
if not example_images_path:
return web.json_response({
'has_images': False
})
# Construct folder path for this model
model_folder = os.path.join(example_images_path, model_hash)
# Check if folder exists
if not os.path.exists(model_folder) or not os.path.isdir(model_folder):
return web.json_response({
'has_images': False
})
# Check if folder contains any supported media files
for file in os.listdir(model_folder):
file_path = os.path.join(model_folder, file)
if os.path.isfile(file_path):
file_ext = os.path.splitext(file)[1].lower()
if (file_ext in SUPPORTED_MEDIA_EXTENSIONS['images'] or
file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']):
return web.json_response({
'has_images': True
})
# If reached here, folder exists but has no supported media files
return web.json_response({
'has_images': False
})
except Exception as e:
logger.error(f"Failed to check example images folder: {e}", exc_info=True)
return web.json_response({
'has_images': False,
'error': str(e)
})

View File

@@ -0,0 +1,390 @@
import logging
import os
import re
from ..utils.metadata_manager import MetadataManager
from ..utils.routes_common import ModelRouteUtils
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
from ..utils.exif_utils import ExifUtils
from ..recipes.constants import GEN_PARAM_KEYS
logger = logging.getLogger(__name__)
class MetadataUpdater:
"""Handles updating model metadata related to example images"""
@staticmethod
async def refresh_model_metadata(model_hash, model_name, scanner_type, scanner):
"""Refresh model metadata from CivitAI
Args:
model_hash: SHA256 hash of the model
model_name: Model name (for logging)
scanner_type: Scanner type ('lora' or 'checkpoint')
scanner: Scanner instance for this model type
Returns:
bool: True if metadata was successfully refreshed, False otherwise
"""
from ..utils.example_images_download_manager import download_progress
try:
# Find the model in the scanner cache
cache = await scanner.get_cached_data()
model_data = None
for item in cache.raw_data:
if item.get('sha256') == model_hash:
model_data = item
break
if not model_data:
logger.warning(f"Model {model_name} with hash {model_hash} not found in cache")
return False
file_path = model_data.get('file_path')
if not file_path:
logger.warning(f"Model {model_name} has no file path")
return False
# Track that we're refreshing this model
download_progress['refreshed_models'].add(model_hash)
# Use ModelRouteUtils to refresh metadata
async def update_cache_func(old_path, new_path, metadata):
return await scanner.update_single_model_cache(old_path, new_path, metadata)
success = await ModelRouteUtils.fetch_and_update_model(
model_hash,
file_path,
model_data,
update_cache_func
)
if success:
logger.info(f"Successfully refreshed metadata for {model_name}")
return True
else:
logger.warning(f"Failed to refresh metadata for {model_name}")
return False
except Exception as e:
error_msg = f"Error refreshing metadata for {model_name}: {str(e)}"
logger.error(error_msg, exc_info=True)
download_progress['errors'].append(error_msg)
download_progress['last_error'] = error_msg
return False
@staticmethod
async def get_updated_model(model_hash, scanner):
"""Get updated model data
Args:
model_hash: SHA256 hash of the model
scanner: Scanner instance
Returns:
dict: Updated model data or None if not found
"""
cache = await scanner.get_cached_data()
for item in cache.raw_data:
if item.get('sha256') == model_hash:
return item
return None
@staticmethod
async def update_metadata_from_local_examples(model_hash, model, scanner_type, scanner, model_dir):
"""Update model metadata with local example image information
Args:
model_hash: SHA256 hash of the model
model: Model data dictionary
scanner_type: Scanner type ('lora' or 'checkpoint')
scanner: Scanner instance for this model type
model_dir: Model images directory
Returns:
bool: True if metadata was successfully updated, False otherwise
"""
try:
# Collect local image paths
local_images_paths = []
if os.path.exists(model_dir):
for file in os.listdir(model_dir):
file_path = os.path.join(model_dir, file)
if os.path.isfile(file_path):
file_ext = os.path.splitext(file)[1].lower()
is_supported = (file_ext in SUPPORTED_MEDIA_EXTENSIONS['images'] or
file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos'])
if is_supported:
local_images_paths.append(file_path)
# Check if metadata update is needed (no civitai field or empty images)
needs_update = not model.get('civitai') or not model.get('civitai', {}).get('images')
if needs_update and local_images_paths:
logger.debug(f"Found {len(local_images_paths)} local example images for {model.get('model_name')}, updating metadata")
# Create or get civitai field
if not model.get('civitai'):
model['civitai'] = {}
# Create images array
images = []
# Generate metadata for each local image/video
for path in local_images_paths:
# Determine if video or image
file_ext = os.path.splitext(path)[1].lower()
is_video = file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']
# Create image metadata entry
image_entry = {
"url": "", # Empty URL as required
"nsfwLevel": 0,
"width": 720, # Default dimensions
"height": 1280,
"type": "video" if is_video else "image",
"meta": None,
"hasMeta": False,
"hasPositivePrompt": False
}
# If it's an image, try to get actual dimensions (optional enhancement)
try:
from PIL import Image
if not is_video and os.path.exists(path):
with Image.open(path) as img:
image_entry["width"], image_entry["height"] = img.size
except:
# If PIL fails or is unavailable, use default dimensions
pass
images.append(image_entry)
# Update the model's civitai.images field
model['civitai']['images'] = images
# Save metadata to .metadata.json file
file_path = model.get('file_path')
try:
# Create a copy of model data without 'folder' field
model_copy = model.copy()
model_copy.pop('folder', None)
# Write metadata to file
await MetadataManager.save_metadata(file_path, model_copy)
logger.info(f"Saved metadata for {model.get('model_name')}")
except Exception as e:
logger.error(f"Failed to save metadata for {model.get('model_name')}: {str(e)}")
# Save updated metadata to scanner cache
success = await scanner.update_single_model_cache(file_path, file_path, model)
if success:
logger.info(f"Successfully updated metadata for {model.get('model_name')} with {len(images)} local examples")
return True
else:
logger.warning(f"Failed to update metadata for {model.get('model_name')}")
return False
except Exception as e:
logger.error(f"Error updating metadata from local examples: {str(e)}", exc_info=True)
return False
@staticmethod
async def update_metadata_after_import(model_hash, model_data, scanner, newly_imported_paths):
"""Update model metadata after importing example images
Args:
model_hash: SHA256 hash of the model
model_data: Model data dictionary
scanner: Scanner instance (lora or checkpoint)
newly_imported_paths: List of paths to newly imported files
Returns:
tuple: (regular_images, custom_images) - Both image arrays
"""
try:
# Ensure civitai field exists in model_data
if not model_data.get('civitai'):
model_data['civitai'] = {}
# Ensure customImages array exists
if not model_data['civitai'].get('customImages'):
model_data['civitai']['customImages'] = []
# Get current customImages array
custom_images = model_data['civitai']['customImages']
# Add new image entry for each imported file
for path_tuple in newly_imported_paths:
path, short_id = path_tuple
# Determine if video or image
file_ext = os.path.splitext(path)[1].lower()
is_video = file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']
# Create image metadata entry
image_entry = {
"url": "", # Empty URL as requested
"id": short_id,
"nsfwLevel": 0,
"width": 720, # Default dimensions
"height": 1280,
"type": "video" if is_video else "image",
"meta": None,
"hasMeta": False,
"hasPositivePrompt": False
}
# Extract and parse metadata if this is an image
if not is_video:
try:
# Extract metadata from image
extracted_metadata = ExifUtils.extract_image_metadata(path)
if extracted_metadata:
# Parse the extracted metadata to get generation parameters
parsed_meta = MetadataUpdater._parse_image_metadata(extracted_metadata)
if parsed_meta:
image_entry["meta"] = parsed_meta
image_entry["hasMeta"] = True
image_entry["hasPositivePrompt"] = bool(parsed_meta.get("prompt", ""))
logger.debug(f"Extracted metadata from {os.path.basename(path)}")
except Exception as e:
logger.warning(f"Failed to extract metadata from {os.path.basename(path)}: {e}")
# If it's an image, try to get actual dimensions
try:
from PIL import Image
if not is_video and os.path.exists(path):
with Image.open(path) as img:
image_entry["width"], image_entry["height"] = img.size
except:
# If PIL fails or is unavailable, use default dimensions
pass
# Append to existing customImages array
custom_images.append(image_entry)
# Save metadata to .metadata.json file
file_path = model_data.get('file_path')
if file_path:
try:
# Create a copy of model data without 'folder' field
model_copy = model_data.copy()
model_copy.pop('folder', None)
# Write metadata to file
await MetadataManager.save_metadata(file_path, model_copy)
logger.info(f"Saved metadata for {model_data.get('model_name')}")
except Exception as e:
logger.error(f"Failed to save metadata: {str(e)}")
# Save updated metadata to scanner cache
if file_path:
await scanner.update_single_model_cache(file_path, file_path, model_data)
# Get regular images array (might be None)
regular_images = model_data['civitai'].get('images', [])
# Return both image arrays
return regular_images, custom_images
except Exception as e:
logger.error(f"Failed to update metadata after import: {e}", exc_info=True)
return [], []
@staticmethod
def _parse_image_metadata(user_comment):
"""Parse metadata from image to extract generation parameters
Args:
user_comment: Metadata string extracted from image
Returns:
dict: Parsed metadata with generation parameters
"""
if not user_comment:
return None
try:
# Initialize metadata dictionary
metadata = {}
# Split on Negative prompt if it exists
if "Negative prompt:" in user_comment:
parts = user_comment.split('Negative prompt:', 1)
prompt = parts[0].strip()
negative_and_params = parts[1] if len(parts) > 1 else ""
else:
# No negative prompt section
param_start = re.search(r'Steps: \d+', user_comment)
if param_start:
prompt = user_comment[:param_start.start()].strip()
negative_and_params = user_comment[param_start.start():]
else:
prompt = user_comment.strip()
negative_and_params = ""
# Add prompt if it's in GEN_PARAM_KEYS
if 'prompt' in GEN_PARAM_KEYS:
metadata['prompt'] = prompt
# Extract negative prompt and parameters
if negative_and_params:
# If we split on "Negative prompt:", check for params section
if "Negative prompt:" in user_comment:
param_start = re.search(r'Steps: ', negative_and_params)
if param_start:
neg_prompt = negative_and_params[:param_start.start()].strip()
if 'negative_prompt' in GEN_PARAM_KEYS:
metadata['negative_prompt'] = neg_prompt
params_section = negative_and_params[param_start.start():]
else:
if 'negative_prompt' in GEN_PARAM_KEYS:
metadata['negative_prompt'] = negative_and_params.strip()
params_section = ""
else:
# No negative prompt, entire section is params
params_section = negative_and_params
# Extract generation parameters
if params_section:
# Extract basic parameters
param_pattern = r'([A-Za-z\s]+): ([^,]+)'
params = re.findall(param_pattern, params_section)
for key, value in params:
clean_key = key.strip().lower().replace(' ', '_')
# Skip if not in recognized gen param keys
if clean_key not in GEN_PARAM_KEYS:
continue
# Convert numeric values
if clean_key in ['steps', 'seed']:
try:
metadata[clean_key] = int(value.strip())
except ValueError:
metadata[clean_key] = value.strip()
elif clean_key in ['cfg_scale']:
try:
metadata[clean_key] = float(value.strip())
except ValueError:
metadata[clean_key] = value.strip()
else:
metadata[clean_key] = value.strip()
# Extract size if available and add if a recognized key
size_match = re.search(r'Size: (\d+)x(\d+)', params_section)
if size_match and 'size' in GEN_PARAM_KEYS:
width, height = size_match.groups()
metadata['size'] = f"{width}x{height}"
# Return metadata if we have any entries
return metadata if metadata else None
except Exception as e:
logger.error(f"Error parsing image metadata: {e}", exc_info=True)
return None

View File

@@ -0,0 +1,318 @@
import asyncio
import logging
import os
import re
import json
from ..services.settings_manager import settings
from ..services.service_registry import ServiceRegistry
from ..utils.metadata_manager import MetadataManager
from ..utils.example_images_processor import ExampleImagesProcessor
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
logger = logging.getLogger(__name__)
CURRENT_NAMING_VERSION = 2 # Increment this when naming conventions change
class ExampleImagesMigration:
"""Handles migrations for example images naming conventions"""
@staticmethod
async def check_and_run_migrations():
"""Check if migrations are needed and run them in background"""
example_images_path = settings.get('example_images_path')
if not example_images_path or not os.path.exists(example_images_path):
logger.debug("No example images path configured or path doesn't exist, skipping migrations")
return
# Check current version from progress file
current_version = 0
progress_file = os.path.join(example_images_path, '.download_progress.json')
if os.path.exists(progress_file):
try:
with open(progress_file, 'r', encoding='utf-8') as f:
progress_data = json.load(f)
current_version = progress_data.get('naming_version', 0)
except Exception as e:
logger.error(f"Failed to load progress file for migration check: {e}")
# If current version is less than target version, start migration
if current_version < CURRENT_NAMING_VERSION:
logger.info(f"Starting example images naming migration from v{current_version} to v{CURRENT_NAMING_VERSION}")
# Start migration in background task
asyncio.create_task(
ExampleImagesMigration.run_migrations(example_images_path, current_version, CURRENT_NAMING_VERSION)
)
@staticmethod
async def run_migrations(example_images_path, from_version, to_version):
"""Run necessary migrations based on version difference"""
try:
# Get all model folders
model_folders = []
for item in os.listdir(example_images_path):
item_path = os.path.join(example_images_path, item)
if os.path.isdir(item_path) and len(item) == 64: # SHA256 hash is 64 chars
model_folders.append(item_path)
logger.info(f"Found {len(model_folders)} model folders to check for migration")
# Apply migrations sequentially
if from_version < 1 and to_version >= 1:
await ExampleImagesMigration._migrate_to_v1(model_folders)
if from_version < 2 and to_version >= 2:
await ExampleImagesMigration._migrate_to_v2(model_folders)
# Update version in progress file
progress_file = os.path.join(example_images_path, '.download_progress.json')
try:
progress_data = {}
if os.path.exists(progress_file):
with open(progress_file, 'r', encoding='utf-8') as f:
progress_data = json.load(f)
progress_data['naming_version'] = to_version
with open(progress_file, 'w', encoding='utf-8') as f:
json.dump(progress_data, f, indent=2)
logger.info(f"Example images naming migration to v{to_version} completed")
except Exception as e:
logger.error(f"Failed to update version in progress file: {e}")
except Exception as e:
logger.error(f"Error during migration: {e}", exc_info=True)
@staticmethod
async def _migrate_to_v1(model_folders):
"""Migrate from 1-based to 0-based indexing"""
count = 0
for folder in model_folders:
has_one_based = False
has_zero_based = False
files_to_rename = []
# Check naming pattern in this folder
for file in os.listdir(folder):
if re.match(r'image_1\.\w+$', file):
has_one_based = True
if re.match(r'image_0\.\w+$', file):
has_zero_based = True
# Only migrate folders with 1-based indexing and no 0-based
if has_one_based and not has_zero_based:
# Create rename mapping
for file in os.listdir(folder):
match = re.match(r'image_(\d+)\.(\w+)$', file)
if match:
index = int(match.group(1))
ext = match.group(2)
if index > 0: # Only rename if index is positive
files_to_rename.append((
file,
f"image_{index-1}.{ext}"
))
# Use temporary names to avoid conflicts
for old_name, new_name in files_to_rename:
old_path = os.path.join(folder, old_name)
temp_path = os.path.join(folder, f"temp_{old_name}")
try:
os.rename(old_path, temp_path)
except Exception as e:
logger.error(f"Failed to rename {old_path} to {temp_path}: {e}")
# Rename from temporary names to final names
for old_name, new_name in files_to_rename:
temp_path = os.path.join(folder, f"temp_{old_name}")
new_path = os.path.join(folder, new_name)
try:
os.rename(temp_path, new_path)
logger.debug(f"Renamed {old_name} to {new_name} in {folder}")
except Exception as e:
logger.error(f"Failed to rename {temp_path} to {new_path}: {e}")
count += 1
# Give other tasks a chance to run
if count % 10 == 0:
await asyncio.sleep(0)
logger.info(f"Migrated {count} folders from 1-based to 0-based indexing")
@staticmethod
async def _migrate_to_v2(model_folders):
"""
Migrate to v2 naming scheme:
- Move custom examples from images array to customImages array
- Rename files from image_<index>.<ext> to custom_<short_id>.<ext>
- Add id field to each custom image entry
"""
count = 0
updated_models = 0
migration_errors = 0
# Get scanner instances
lora_scanner = await ServiceRegistry.get_lora_scanner()
checkpoint_scanner = await ServiceRegistry.get_checkpoint_scanner()
# Wait until scanners are initialized
scanners = [lora_scanner, checkpoint_scanner]
for scanner in scanners:
if scanner.is_initializing():
logger.info("Waiting for scanners to complete initialization before starting migration...")
initialized = False
retry_count = 0
while not initialized and retry_count < 120: # Wait up to 120 seconds
await asyncio.sleep(1)
initialized = not scanner.is_initializing()
retry_count += 1
if not initialized:
logger.warning("Scanner initialization timeout - proceeding with migration anyway")
logger.info(f"Starting migration to v2 naming scheme for {len(model_folders)} model folders")
for folder in model_folders:
try:
# Extract model hash from folder name
model_hash = os.path.basename(folder)
if not model_hash or len(model_hash) != 64:
continue
# Find the model in scanner cache
model_data = None
scanner = None
for scan_obj in scanners:
if scan_obj.has_hash(model_hash):
cache = await scan_obj.get_cached_data()
for item in cache.raw_data:
if item.get('sha256') == model_hash:
model_data = item
scanner = scan_obj
break
if model_data:
break
if not model_data or not scanner:
logger.debug(f"Model with hash {model_hash} not found in cache, skipping migration")
continue
# Clone model data to avoid modifying the cache directly
model_metadata = model_data.copy()
# Check if model has civitai metadata
if not model_metadata.get('civitai'):
continue
# Get images array
images = model_metadata.get('civitai', {}).get('images', [])
if not images:
continue
# Initialize customImages array if it doesn't exist
if not model_metadata['civitai'].get('customImages'):
model_metadata['civitai']['customImages'] = []
# Find custom examples (entries with empty url)
custom_indices = []
for i, image in enumerate(images):
if image.get('url') == "":
custom_indices.append(i)
if not custom_indices:
continue
logger.debug(f"Found {len(custom_indices)} custom examples in {model_hash}")
# Process each custom example
for index in custom_indices:
try:
image_entry = images[index]
# Determine media type based on the entry type
media_type = 'videos' if image_entry.get('type') == 'video' else 'images'
extensions_to_try = SUPPORTED_MEDIA_EXTENSIONS[media_type]
# Find the image file by trying possible extensions
old_path = None
old_filename = None
found = False
for ext in extensions_to_try:
test_path = os.path.join(folder, f"image_{index}{ext}")
if os.path.exists(test_path):
old_path = test_path
old_filename = f"image_{index}{ext}"
found = True
break
if not found:
logger.warning(f"Could not find file for index {index} in {model_hash}, skipping")
continue
# Generate short ID for the custom example
short_id = ExampleImagesProcessor.generate_short_id()
# Get file extension
file_ext = os.path.splitext(old_path)[1]
# Create new filename
new_filename = f"custom_{short_id}{file_ext}"
new_path = os.path.join(folder, new_filename)
# Rename the file
try:
os.rename(old_path, new_path)
logger.debug(f"Renamed {old_filename} to {new_filename} in {folder}")
except Exception as e:
logger.error(f"Failed to rename {old_path} to {new_path}: {e}")
continue
# Create a copy of the image entry with the id field
custom_entry = image_entry.copy()
custom_entry['id'] = short_id
# Add to customImages array
model_metadata['civitai']['customImages'].append(custom_entry)
count += 1
except Exception as e:
logger.error(f"Error migrating custom example at index {index} for {model_hash}: {e}")
# Remove custom examples from the original images array
model_metadata['civitai']['images'] = [
img for i, img in enumerate(images) if i not in custom_indices
]
# Save the updated metadata
file_path = model_data.get('file_path')
if file_path:
try:
# Create a copy of model data without 'folder' field
model_copy = model_metadata.copy()
model_copy.pop('folder', None)
# Save metadata to file
await MetadataManager.save_metadata(file_path, model_copy)
# Update scanner cache
await scanner.update_single_model_cache(file_path, file_path, model_metadata)
updated_models += 1
except Exception as e:
logger.error(f"Failed to save metadata for {model_hash}: {e}")
migration_errors += 1
# Give other tasks a chance to run
if count % 10 == 0:
await asyncio.sleep(0)
except Exception as e:
logger.error(f"Error migrating folder {folder}: {e}")
migration_errors += 1
logger.info(f"Migration to v2 complete: migrated {count} custom examples across {updated_models} models with {migration_errors} errors")

View File

@@ -0,0 +1,494 @@
import logging
import os
import re
import tempfile
import random
import string
from aiohttp import web
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
from ..services.service_registry import ServiceRegistry
from ..services.settings_manager import settings
from .example_images_metadata import MetadataUpdater
from ..utils.metadata_manager import MetadataManager
logger = logging.getLogger(__name__)
class ExampleImagesProcessor:
"""Processes and manipulates example images"""
@staticmethod
def generate_short_id(length=8):
"""Generate a short random alphanumeric identifier"""
chars = string.ascii_lowercase + string.digits
return ''.join(random.choice(chars) for _ in range(length))
@staticmethod
def get_civitai_optimized_url(image_url):
"""Convert Civitai image URL to its optimized WebP version"""
base_pattern = r'(https://image\.civitai\.com/[^/]+/[^/]+)'
match = re.match(base_pattern, image_url)
if match:
base_url = match.group(1)
return f"{base_url}/optimized=true/image.webp"
return image_url
@staticmethod
async def download_model_images(model_hash, model_name, model_images, model_dir, optimize, independent_session):
"""Download images for a single model
Returns:
tuple: (success, is_stale_metadata) - whether download was successful, whether metadata is stale
"""
model_success = True
for i, image in enumerate(model_images):
image_url = image.get('url')
if not image_url:
continue
# Get image filename from URL
image_filename = os.path.basename(image_url.split('?')[0])
image_ext = os.path.splitext(image_filename)[1].lower()
# Handle images and videos
is_image = image_ext in SUPPORTED_MEDIA_EXTENSIONS['images']
is_video = image_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']
if not (is_image or is_video):
logger.debug(f"Skipping unsupported file type: {image_filename}")
continue
# Use 0-based indexing instead of 1-based indexing
save_filename = f"image_{i}{image_ext}"
# If optimizing images and this is a Civitai image, use their pre-optimized WebP version
if is_image and optimize and 'civitai.com' in image_url:
image_url = ExampleImagesProcessor.get_civitai_optimized_url(image_url)
save_filename = f"image_{i}.webp"
# Check if already downloaded
save_path = os.path.join(model_dir, save_filename)
if os.path.exists(save_path):
logger.debug(f"File already exists: {save_path}")
continue
# Download the file
try:
logger.debug(f"Downloading {save_filename} for {model_name}")
# Download directly using the independent session
async with independent_session.get(image_url, timeout=60) as response:
if response.status == 200:
with open(save_path, 'wb') as f:
async for chunk in response.content.iter_chunked(8192):
if chunk:
f.write(chunk)
elif response.status == 404:
error_msg = f"Failed to download file: {image_url}, status code: 404 - Model metadata might be stale"
logger.warning(error_msg)
model_success = False # Mark the model as failed due to 404 error
# Return early to trigger metadata refresh attempt
return False, True # (success, is_metadata_stale)
else:
error_msg = f"Failed to download file: {image_url}, status code: {response.status}"
logger.warning(error_msg)
model_success = False # Mark the model as failed
except Exception as e:
error_msg = f"Error downloading file {image_url}: {str(e)}"
logger.error(error_msg)
model_success = False # Mark the model as failed
return model_success, False # (success, is_metadata_stale)
@staticmethod
async def process_local_examples(model_file_path, model_file_name, model_name, model_dir, optimize):
"""Process local example images
Returns:
bool: True if local images were processed successfully, False otherwise
"""
try:
if not model_file_path or not os.path.exists(os.path.dirname(model_file_path)):
return False
model_dir_path = os.path.dirname(model_file_path)
local_images = []
# Look for files with pattern: filename.example.*.ext
if model_file_name:
example_prefix = f"{model_file_name}.example."
if os.path.exists(model_dir_path):
for file in os.listdir(model_dir_path):
file_lower = file.lower()
if file_lower.startswith(example_prefix.lower()):
file_ext = os.path.splitext(file_lower)[1]
is_supported = (file_ext in SUPPORTED_MEDIA_EXTENSIONS['images'] or
file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos'])
if is_supported:
local_images.append(os.path.join(model_dir_path, file))
# Process local images if found
if local_images:
logger.info(f"Found {len(local_images)} local example images for {model_name}")
for local_image_path in local_images:
# Extract index from filename
file_name = os.path.basename(local_image_path)
example_prefix = f"{model_file_name}.example."
try:
# Extract the part between '.example.' and the file extension
index_part = file_name[len(example_prefix):].split('.')[0]
# Try to parse it as an integer
index = int(index_part)
local_ext = os.path.splitext(local_image_path)[1].lower()
save_filename = f"image_{index}{local_ext}"
except (ValueError, IndexError):
# If we can't parse the index, fall back to sequential numbering
logger.warning(f"Could not extract index from {file_name}, using sequential numbering")
local_ext = os.path.splitext(local_image_path)[1].lower()
save_filename = f"image_{len(local_images)}{local_ext}"
save_path = os.path.join(model_dir, save_filename)
# Skip if already exists in output directory
if os.path.exists(save_path):
logger.debug(f"File already exists in output: {save_path}")
continue
# Copy the file
with open(local_image_path, 'rb') as src_file:
with open(save_path, 'wb') as dst_file:
dst_file.write(src_file.read())
return True
return False
except Exception as e:
logger.error(f"Error processing local examples for {model_name}: {str(e)}")
return False
@staticmethod
async def import_images(request):
"""
Import local example images
Accepts:
- multipart/form-data form with model_hash and files fields
or
- JSON request with model_hash and file_paths
Returns:
- Success status and list of imported files
"""
try:
model_hash = None
files_to_import = []
temp_files_to_cleanup = []
# Check if it's a multipart form-data request (direct file upload)
if request.content_type and 'multipart/form-data' in request.content_type:
reader = await request.multipart()
# First get model_hash
field = await reader.next()
if field.name == 'model_hash':
model_hash = await field.text()
# Then process all files
while True:
field = await reader.next()
if field is None:
break
if field.name == 'files':
# Create a temporary file with appropriate suffix for type detection
file_name = field.filename
file_ext = os.path.splitext(file_name)[1].lower()
with tempfile.NamedTemporaryFile(suffix=file_ext, delete=False) as tmp_file:
temp_path = tmp_file.name
temp_files_to_cleanup.append(temp_path) # Track for cleanup
# Write chunks to the temporary file
while True:
chunk = await field.read_chunk()
if not chunk:
break
tmp_file.write(chunk)
# Add to the list of files to process
files_to_import.append(temp_path)
else:
# Parse JSON request (legacy method using file paths)
data = await request.json()
model_hash = data.get('model_hash')
files_to_import = data.get('file_paths', [])
if not model_hash:
return web.json_response({
'success': False,
'error': 'Missing model_hash parameter'
}, status=400)
if not files_to_import:
return web.json_response({
'success': False,
'error': 'No files provided to import'
}, status=400)
# Get example images path
example_images_path = settings.get('example_images_path')
if not example_images_path:
return web.json_response({
'success': False,
'error': 'No example images path configured'
}, status=400)
# Find the model and get current metadata
lora_scanner = await ServiceRegistry.get_lora_scanner()
checkpoint_scanner = await ServiceRegistry.get_checkpoint_scanner()
model_data = None
scanner = None
# Check both scanners to find the model
for scan_obj in [lora_scanner, checkpoint_scanner]:
cache = await scan_obj.get_cached_data()
for item in cache.raw_data:
if item.get('sha256') == model_hash:
model_data = item
scanner = scan_obj
break
if model_data:
break
if not model_data:
return web.json_response({
'success': False,
'error': f"Model with hash {model_hash} not found in cache"
}, status=404)
# Create model folder
model_folder = os.path.join(example_images_path, model_hash)
os.makedirs(model_folder, exist_ok=True)
imported_files = []
errors = []
newly_imported_paths = []
# Process each file path
for file_path in files_to_import:
try:
# Ensure the file exists
if not os.path.isfile(file_path):
errors.append(f"File not found: {file_path}")
continue
# Check if file type is supported
file_ext = os.path.splitext(file_path)[1].lower()
if not (file_ext in SUPPORTED_MEDIA_EXTENSIONS['images'] or
file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']):
errors.append(f"Unsupported file type: {file_path}")
continue
# Generate new filename using short ID instead of UUID
short_id = ExampleImagesProcessor.generate_short_id()
new_filename = f"custom_{short_id}{file_ext}"
dest_path = os.path.join(model_folder, new_filename)
# Copy the file
import shutil
shutil.copy2(file_path, dest_path)
# Store both the dest_path and the short_id
newly_imported_paths.append((dest_path, short_id))
# Add to imported files list
imported_files.append({
'name': new_filename,
'path': f'/example_images_static/{model_hash}/{new_filename}',
'extension': file_ext,
'is_video': file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']
})
except Exception as e:
errors.append(f"Error importing {file_path}: {str(e)}")
# Update metadata with new example images
regular_images, custom_images = await MetadataUpdater.update_metadata_after_import(
model_hash,
model_data,
scanner,
newly_imported_paths
)
return web.json_response({
'success': len(imported_files) > 0,
'message': f'Successfully imported {len(imported_files)} files' +
(f' with {len(errors)} errors' if errors else ''),
'files': imported_files,
'errors': errors,
'regular_images': regular_images,
'custom_images': custom_images,
"model_file_path": model_data.get('file_path', ''),
})
except Exception as e:
logger.error(f"Failed to import example images: {e}", exc_info=True)
return web.json_response({
'success': False,
'error': str(e)
}, status=500)
finally:
# Clean up temporary files
for temp_file in temp_files_to_cleanup:
try:
os.remove(temp_file)
except Exception as e:
logger.error(f"Failed to remove temporary file {temp_file}: {e}")
@staticmethod
async def delete_custom_image(request):
"""
Delete a custom example image for a model
Accepts:
- JSON request with model_hash and short_id
Returns:
- Success status and updated image lists
"""
try:
# Parse request data
data = await request.json()
model_hash = data.get('model_hash')
short_id = data.get('short_id')
if not model_hash or not short_id:
return web.json_response({
'success': False,
'error': 'Missing required parameters: model_hash and short_id'
}, status=400)
# Get example images path
example_images_path = settings.get('example_images_path')
if not example_images_path:
return web.json_response({
'success': False,
'error': 'No example images path configured'
}, status=400)
# Find the model and get current metadata
lora_scanner = await ServiceRegistry.get_lora_scanner()
checkpoint_scanner = await ServiceRegistry.get_checkpoint_scanner()
model_data = None
scanner = None
# Check both scanners to find the model
for scan_obj in [lora_scanner, checkpoint_scanner]:
if scan_obj.has_hash(model_hash):
cache = await scan_obj.get_cached_data()
for item in cache.raw_data:
if item.get('sha256') == model_hash:
model_data = item
scanner = scan_obj
break
if model_data:
break
if not model_data:
return web.json_response({
'success': False,
'error': f"Model with hash {model_hash} not found in cache"
}, status=404)
# Check if model has custom images
if not model_data.get('civitai', {}).get('customImages'):
return web.json_response({
'success': False,
'error': f"Model has no custom images"
}, status=404)
# Find the custom image with matching short_id
custom_images = model_data['civitai']['customImages']
matching_image = None
new_custom_images = []
for image in custom_images:
if image.get('id') == short_id:
matching_image = image
else:
new_custom_images.append(image)
if not matching_image:
return web.json_response({
'success': False,
'error': f"Custom image with id {short_id} not found"
}, status=404)
# Find and delete the actual file
model_folder = os.path.join(example_images_path, model_hash)
file_deleted = False
if os.path.exists(model_folder):
for filename in os.listdir(model_folder):
if f"custom_{short_id}" in filename:
file_path = os.path.join(model_folder, filename)
try:
os.remove(file_path)
file_deleted = True
logger.info(f"Deleted custom example file: {file_path}")
break
except Exception as e:
return web.json_response({
'success': False,
'error': f"Failed to delete file: {str(e)}"
}, status=500)
if not file_deleted:
logger.warning(f"File for custom example with id {short_id} not found, but metadata will still be updated")
# Update metadata
model_data['civitai']['customImages'] = new_custom_images
# Save updated metadata to file
file_path = model_data.get('file_path')
if file_path:
try:
# Create a copy of model data without 'folder' field
model_copy = model_data.copy()
model_copy.pop('folder', None)
# Write metadata to file
await MetadataManager.save_metadata(file_path, model_copy)
logger.debug(f"Saved updated metadata for {model_data.get('model_name')}")
except Exception as e:
logger.error(f"Failed to save metadata: {str(e)}")
return web.json_response({
'success': False,
'error': f"Failed to save metadata: {str(e)}"
}, status=500)
# Update cache
await scanner.update_single_model_cache(file_path, file_path, model_data)
# Get regular images array (might be None)
regular_images = model_data['civitai'].get('images', [])
return web.json_response({
'success': True,
'regular_images': regular_images,
'custom_images': new_custom_images,
'model_file_path': model_data.get('file_path', '')
})
except Exception as e:
logger.error(f"Failed to delete custom example image: {e}", exc_info=True)
return web.json_response({
'success': False,
'error': str(e)
}, status=500)

View File

@@ -63,199 +63,4 @@ def find_preview_file(base_name: str, dir_path: str) -> str:
def normalize_path(path: str) -> str:
"""Normalize file path to use forward slashes"""
return path.replace(os.sep, "/") if path else path
async def get_file_info(file_path: str, model_class: Type[BaseModelMetadata] = LoraMetadata) -> Optional[BaseModelMetadata]:
"""Get basic file information as a model metadata object"""
# First check if file actually exists and resolve symlinks
try:
real_path = os.path.realpath(file_path)
if not os.path.exists(real_path):
return None
except Exception as e:
logger.error(f"Error checking file existence for {file_path}: {e}")
return None
base_name = os.path.splitext(os.path.basename(file_path))[0]
dir_path = os.path.dirname(file_path)
preview_url = find_preview_file(base_name, dir_path)
# Check if a .json file exists with SHA256 hash to avoid recalculation
json_path = f"{os.path.splitext(file_path)[0]}.json"
sha256 = None
if os.path.exists(json_path):
try:
with open(json_path, 'r', encoding='utf-8') as f:
json_data = json.load(f)
if 'sha256' in json_data:
sha256 = json_data['sha256'].lower()
logger.debug(f"Using SHA256 from .json file for {file_path}")
except Exception as e:
logger.error(f"Error reading .json file for {file_path}: {e}")
# If SHA256 is still not found, check for a .sha256 file
if sha256 is None:
sha256_file = f"{os.path.splitext(file_path)[0]}.sha256"
if os.path.exists(sha256_file):
try:
with open(sha256_file, 'r', encoding='utf-8') as f:
sha256 = f.read().strip().lower()
logger.debug(f"Using SHA256 from .sha256 file for {file_path}")
except Exception as e:
logger.error(f"Error reading .sha256 file for {file_path}: {e}")
try:
# If we didn't get SHA256 from the .json file, calculate it
if not sha256:
start_time = time.time()
sha256 = await calculate_sha256(real_path)
logger.debug(f"Calculated SHA256 for {file_path} in {time.time() - start_time:.2f} seconds")
# Create default metadata based on model class
if model_class == CheckpointMetadata:
metadata = CheckpointMetadata(
file_name=base_name,
model_name=base_name,
file_path=normalize_path(file_path),
size=os.path.getsize(real_path),
modified=os.path.getmtime(real_path),
sha256=sha256,
base_model="Unknown", # Will be updated later
preview_url=normalize_path(preview_url),
tags=[],
modelDescription="",
model_type="checkpoint"
)
# Extract checkpoint-specific metadata
# model_info = await extract_checkpoint_metadata(real_path)
# metadata.base_model = model_info['base_model']
# if 'model_type' in model_info:
# metadata.model_type = model_info['model_type']
else: # Default to LoraMetadata
metadata = LoraMetadata(
file_name=base_name,
model_name=base_name,
file_path=normalize_path(file_path),
size=os.path.getsize(real_path),
modified=os.path.getmtime(real_path),
sha256=sha256,
base_model="Unknown", # Will be updated later
usage_tips="{}",
preview_url=normalize_path(preview_url),
tags=[],
modelDescription=""
)
# Extract lora-specific metadata
model_info = await extract_lora_metadata(real_path)
metadata.base_model = model_info['base_model']
# Save metadata to file
await save_metadata(file_path, metadata)
return metadata
except Exception as e:
logger.error(f"Error getting file info for {file_path}: {e}")
return None
async def save_metadata(file_path: str, metadata: BaseModelMetadata) -> None:
"""Save metadata to .metadata.json file"""
metadata_path = f"{os.path.splitext(file_path)[0]}.metadata.json"
try:
metadata_dict = metadata.to_dict()
metadata_dict['file_path'] = normalize_path(metadata_dict['file_path'])
metadata_dict['preview_url'] = normalize_path(metadata_dict['preview_url'])
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata_dict, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f"Error saving metadata to {metadata_path}: {str(e)}")
async def load_metadata(file_path: str, model_class: Type[BaseModelMetadata] = LoraMetadata) -> Optional[BaseModelMetadata]:
"""Load metadata from .metadata.json file"""
metadata_path = f"{os.path.splitext(file_path)[0]}.metadata.json"
try:
if os.path.exists(metadata_path):
with open(metadata_path, 'r', encoding='utf-8') as f:
data = json.load(f)
needs_update = False
# Check and normalize base model name
normalized_base_model = determine_base_model(data['base_model'])
if data['base_model'] != normalized_base_model:
data['base_model'] = normalized_base_model
needs_update = True
# Compare paths without extensions
stored_path_base = os.path.splitext(data['file_path'])[0]
current_path_base = os.path.splitext(normalize_path(file_path))[0]
if stored_path_base != current_path_base:
data['file_path'] = normalize_path(file_path)
needs_update = True
# TODO: optimize preview image to webp format if not already done
preview_url = data.get('preview_url', '')
if not preview_url or not os.path.exists(preview_url):
base_name = os.path.splitext(os.path.basename(file_path))[0]
dir_path = os.path.dirname(file_path)
new_preview_url = normalize_path(find_preview_file(base_name, dir_path))
if new_preview_url != preview_url:
data['preview_url'] = new_preview_url
needs_update = True
else:
if stored_path_base != current_path_base:
# If model location changed, update preview path by replacing old path with new path
preview_file = os.path.basename(preview_url)
new_preview_url = os.path.join(os.path.dirname(file_path), preview_file)
data['preview_url'] = normalize_path(new_preview_url)
needs_update = True
# Ensure all fields are present
if 'tags' not in data:
data['tags'] = []
needs_update = True
if 'modelDescription' not in data:
data['modelDescription'] = ""
needs_update = True
# For checkpoint metadata
if model_class == CheckpointMetadata and 'model_type' not in data:
data['model_type'] = "checkpoint"
needs_update = True
# For lora metadata
if model_class == LoraMetadata and 'usage_tips' not in data:
data['usage_tips'] = "{}"
needs_update = True
# Update preview_nsfw_level if needed
civitai_data = data.get('civitai', {})
civitai_images = civitai_data.get('images', []) if civitai_data else []
if (data.get('preview_url') and
data.get('preview_nsfw_level', 0) == 0 and
civitai_images and
civitai_images[0].get('nsfwLevel', 0) != 0):
data['preview_nsfw_level'] = civitai_images[0]['nsfwLevel']
# TODO: write to metadata file
# needs_update = True
if needs_update:
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
return model_class.from_dict(data)
except Exception as e:
print(f"Error loading metadata from {metadata_path}: {str(e)}")
return None
async def update_civitai_metadata(file_path: str, civitai_data: Dict) -> None:
"""Update metadata file with Civitai data"""
metadata = await load_metadata(file_path)
metadata['civitai'] = civitai_data
await save_metadata(file_path, metadata)
return path.replace(os.sep, "/") if path else path

View File

@@ -0,0 +1,275 @@
import os
import json
import shutil
import logging
from typing import Dict, Optional, Type, Union
from .models import BaseModelMetadata, LoraMetadata
from .file_utils import normalize_path, find_preview_file, calculate_sha256
from .lora_metadata import extract_lora_metadata, extract_checkpoint_metadata
logger = logging.getLogger(__name__)
class MetadataManager:
"""
Centralized manager for all metadata operations.
This class is responsible for:
1. Loading metadata safely with fallback mechanisms
2. Saving metadata with atomic operations and backups
3. Creating default metadata for models
4. Handling unknown fields gracefully
"""
@staticmethod
async def load_metadata(file_path: str, model_class: Type[BaseModelMetadata] = LoraMetadata) -> Optional[BaseModelMetadata]:
"""
Load metadata with robust error handling and data preservation.
Args:
file_path: Path to the model file
model_class: Class to instantiate (LoraMetadata, CheckpointMetadata, etc.)
Returns:
BaseModelMetadata instance or None if file doesn't exist
"""
metadata_path = f"{os.path.splitext(file_path)[0]}.metadata.json"
backup_path = f"{metadata_path}.bak"
# Try loading the main metadata file
if os.path.exists(metadata_path):
try:
with open(metadata_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Create model instance
metadata = model_class.from_dict(data)
# Normalize paths
await MetadataManager._normalize_metadata_paths(metadata, file_path)
return metadata
except json.JSONDecodeError:
# JSON parsing error - try to restore from backup
logger.warning(f"Invalid JSON in metadata file: {metadata_path}")
return await MetadataManager._restore_from_backup(backup_path, file_path, model_class)
except Exception as e:
# Other errors might be due to unknown fields or schema changes
logger.error(f"Error loading metadata from {metadata_path}: {str(e)}")
return await MetadataManager._restore_from_backup(backup_path, file_path, model_class)
return None
@staticmethod
async def _restore_from_backup(backup_path: str, file_path: str, model_class: Type[BaseModelMetadata]) -> Optional[BaseModelMetadata]:
"""
Try to restore metadata from backup file
Args:
backup_path: Path to backup file
file_path: Path to the original model file
model_class: Class to instantiate
Returns:
BaseModelMetadata instance or None if restoration fails
"""
if os.path.exists(backup_path):
try:
logger.info(f"Attempting to restore metadata from backup: {backup_path}")
with open(backup_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Process data similarly to normal loading
metadata = model_class.from_dict(data)
await MetadataManager._normalize_metadata_paths(metadata, file_path)
return metadata
except Exception as e:
logger.error(f"Failed to restore from backup: {str(e)}")
return None
@staticmethod
async def save_metadata(path: str, metadata: Union[BaseModelMetadata, Dict], create_backup: bool = True) -> bool:
"""
Save metadata with atomic write operations and backup creation.
Args:
path: Path to the model file or directly to the metadata file
metadata: Metadata to save (either BaseModelMetadata object or dict)
create_backup: Whether to create a backup of existing file
Returns:
bool: Success or failure
"""
# Determine if the input is a metadata path or a model file path
if path.endswith('.metadata.json'):
metadata_path = path
else:
# Use existing logic for model file paths
file_path = path
metadata_path = f"{os.path.splitext(file_path)[0]}.metadata.json"
temp_path = f"{metadata_path}.tmp"
backup_path = f"{metadata_path}.bak"
try:
# Create backup if requested and file exists
if create_backup and os.path.exists(metadata_path):
try:
shutil.copy2(metadata_path, backup_path)
except Exception as e:
logger.warning(f"Failed to create metadata backup: {str(e)}")
# Convert to dict if needed
if isinstance(metadata, BaseModelMetadata):
metadata_dict = metadata.to_dict()
# Preserve unknown fields if present
if hasattr(metadata, '_unknown_fields'):
metadata_dict.update(metadata._unknown_fields)
else:
metadata_dict = metadata.copy()
# Normalize paths
if 'file_path' in metadata_dict:
metadata_dict['file_path'] = normalize_path(metadata_dict['file_path'])
if 'preview_url' in metadata_dict:
metadata_dict['preview_url'] = normalize_path(metadata_dict['preview_url'])
# Write to temporary file first
with open(temp_path, 'w', encoding='utf-8') as f:
json.dump(metadata_dict, f, indent=2, ensure_ascii=False)
# Atomic rename operation
os.replace(temp_path, metadata_path)
return True
except Exception as e:
logger.error(f"Error saving metadata to {metadata_path}: {str(e)}")
# Clean up temporary file if it exists
if os.path.exists(temp_path):
try:
os.remove(temp_path)
except:
pass
return False
@staticmethod
async def create_default_metadata(file_path: str, model_class: Type[BaseModelMetadata] = LoraMetadata) -> Optional[BaseModelMetadata]:
"""
Create basic metadata structure for a model file.
This replaces the old get_file_info function with a more appropriately named method.
Args:
file_path: Path to the model file
model_class: Class to instantiate
Returns:
BaseModelMetadata instance or None if file doesn't exist
"""
# First check if file actually exists and resolve symlinks
try:
real_path = os.path.realpath(file_path)
if not os.path.exists(real_path):
return None
except Exception as e:
logger.error(f"Error checking file existence for {file_path}: {e}")
return None
try:
base_name = os.path.splitext(os.path.basename(file_path))[0]
dir_path = os.path.dirname(file_path)
# Find preview image
preview_url = find_preview_file(base_name, dir_path)
# Calculate file hash
sha256 = await calculate_sha256(real_path)
# Create instance based on model type
if model_class.__name__ == "CheckpointMetadata":
metadata = model_class(
file_name=base_name,
model_name=base_name,
file_path=normalize_path(file_path),
size=os.path.getsize(real_path),
modified=os.path.getmtime(real_path),
sha256=sha256,
base_model="Unknown",
preview_url=normalize_path(preview_url),
tags=[],
modelDescription="",
model_type="checkpoint",
from_civitai=False
)
else: # Default to LoraMetadata
metadata = model_class(
file_name=base_name,
model_name=base_name,
file_path=normalize_path(file_path),
size=os.path.getsize(real_path),
modified=os.path.getmtime(real_path),
sha256=sha256,
base_model="Unknown",
preview_url=normalize_path(preview_url),
tags=[],
modelDescription="",
from_civitai=False,
usage_tips="{}"
)
# Try to extract model-specific metadata
await MetadataManager._enrich_metadata(metadata, real_path)
# Save the created metadata
await MetadataManager.save_metadata(file_path, metadata, create_backup=False)
return metadata
except Exception as e:
logger.error(f"Error creating default metadata for {file_path}: {e}")
return None
@staticmethod
async def _enrich_metadata(metadata: BaseModelMetadata, file_path: str) -> None:
"""
Enrich metadata with model-specific information
Args:
metadata: Metadata to enrich
file_path: Path to the model file
"""
try:
if metadata.__class__.__name__ == "LoraMetadata":
model_info = await extract_lora_metadata(file_path)
metadata.base_model = model_info['base_model']
elif metadata.__class__.__name__ == "CheckpointMetadata":
model_info = await extract_checkpoint_metadata(file_path)
metadata.base_model = model_info['base_model']
if 'model_type' in model_info:
metadata.model_type = model_info['model_type']
except Exception as e:
logger.error(f"Error enriching metadata: {str(e)}")
@staticmethod
async def _normalize_metadata_paths(metadata: BaseModelMetadata, file_path: str) -> None:
"""
Normalize paths in metadata object
Args:
metadata: Metadata object to update
file_path: Current file path for the model
"""
# Check if file path is different from what's in metadata
if normalize_path(file_path) != metadata.file_path:
metadata.file_path = normalize_path(file_path)
# Check if preview exists at the current location
preview_url = metadata.preview_url
if preview_url and not os.path.exists(preview_url):
base_name = os.path.splitext(os.path.basename(file_path))[0]
dir_path = os.path.dirname(file_path)
new_preview_url = find_preview_file(base_name, dir_path)
if new_preview_url:
metadata.preview_url = normalize_path(new_preview_url)

View File

@@ -1,5 +1,5 @@
from dataclasses import dataclass, asdict
from typing import Dict, Optional, List
from dataclasses import dataclass, asdict, field
from typing import Dict, Optional, List, Any
from datetime import datetime
import os
from .model_utils import determine_base_model
@@ -24,6 +24,7 @@ class BaseModelMetadata:
civitai_deleted: bool = False # Whether deleted from Civitai
favorite: bool = False # Whether the model is a favorite
exclude: bool = False # Whether to exclude this model from the cache
_unknown_fields: Dict[str, Any] = field(default_factory=dict, repr=False, compare=False) # Store unknown fields
def __post_init__(self):
# Initialize empty lists to avoid mutable default parameter issue
@@ -34,11 +35,43 @@ class BaseModelMetadata:
def from_dict(cls, data: Dict) -> 'BaseModelMetadata':
"""Create instance from dictionary"""
data_copy = data.copy()
return cls(**data_copy)
# Use cached fields if available, otherwise compute them
if not hasattr(cls, '_known_fields_cache'):
known_fields = set()
for c in cls.mro():
if hasattr(c, '__annotations__'):
known_fields.update(c.__annotations__.keys())
cls._known_fields_cache = known_fields
known_fields = cls._known_fields_cache
# Extract fields that match our class attributes
fields_to_use = {k: v for k, v in data_copy.items() if k in known_fields}
# Store unknown fields separately
unknown_fields = {k: v for k, v in data_copy.items() if k not in known_fields and not k.startswith('_')}
# Create instance with known fields
instance = cls(**fields_to_use)
# Add unknown fields as a separate attribute
instance._unknown_fields = unknown_fields
return instance
def to_dict(self) -> Dict:
"""Convert to dictionary for JSON serialization"""
return asdict(self)
result = asdict(self)
# Remove private fields
result = {k: v for k, v in result.items() if not k.startswith('_')}
# Add back unknown fields if they exist
if hasattr(self, '_unknown_fields'):
result.update(self._unknown_fields)
return result
@property
def modified_datetime(self) -> datetime:

View File

@@ -9,6 +9,7 @@ from .constants import PREVIEW_EXTENSIONS, CARD_PREVIEW_WIDTH
from ..config import config
from ..services.civitai_client import CivitaiClient
from ..utils.exif_utils import ExifUtils
from ..utils.metadata_manager import MetadataManager
from ..services.download_manager import DownloadManager
logger = logging.getLogger(__name__)
@@ -32,14 +33,29 @@ class ModelRouteUtils:
async def handle_not_found_on_civitai(metadata_path: str, local_metadata: Dict) -> None:
"""Handle case when model is not found on CivitAI"""
local_metadata['from_civitai'] = False
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(local_metadata, f, indent=2, ensure_ascii=False)
await MetadataManager.save_metadata(metadata_path, local_metadata)
@staticmethod
async def update_model_metadata(metadata_path: str, local_metadata: Dict,
civitai_metadata: Dict, client: CivitaiClient) -> None:
"""Update local metadata with CivitAI data"""
local_metadata['civitai'] = civitai_metadata
# Save existing trainedWords and customImages if they exist
existing_civitai = local_metadata.get('civitai') or {} # Use empty dict if None
# Create a new civitai metadata by updating existing with new
merged_civitai = existing_civitai.copy()
merged_civitai.update(civitai_metadata)
# Special handling for trainedWords - ensure we don't lose any existing trained words
if 'trainedWords' in existing_civitai:
existing_trained_words = existing_civitai.get('trainedWords', [])
new_trained_words = civitai_metadata.get('trainedWords', [])
# Use a set to combine words without duplicates, then convert back to list
merged_trained_words = list(set(existing_trained_words + new_trained_words))
merged_civitai['trainedWords'] = merged_trained_words
# Update local metadata with merged civitai data
local_metadata['civitai'] = merged_civitai
local_metadata['from_civitai'] = True
# Update model name if available
@@ -138,8 +154,7 @@ class ModelRouteUtils:
local_metadata['preview_nsfw_level'] = first_preview.get('nsfwLevel', 0)
# Save updated metadata
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(local_metadata, f, indent=2, ensure_ascii=False)
await MetadataManager.save_metadata(metadata_path, local_metadata)
@staticmethod
async def fetch_and_update_model(
@@ -177,8 +192,7 @@ class ModelRouteUtils:
# Mark as not from CivitAI if not found
local_metadata['from_civitai'] = False
model_data['from_civitai'] = False
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(local_metadata, f, indent=2, ensure_ascii=False)
await MetadataManager.save_metadata(file_path, local_metadata)
return False
# Update metadata
@@ -221,7 +235,7 @@ class ModelRouteUtils:
fields = [
"id", "modelId", "name", "createdAt", "updatedAt",
"publishedAt", "trainedWords", "baseModel", "description",
"model", "images", "creator"
"model", "images", "customImages", "creator"
]
return {k: data[k] for k in fields if k in data}
@@ -270,10 +284,12 @@ class ModelRouteUtils:
@staticmethod
def get_multipart_ext(filename):
"""Get extension that may have multiple parts like .metadata.json"""
"""Get extension that may have multiple parts like .metadata.json or .metadata.json.bak"""
parts = filename.split(".")
if len(parts) > 2: # If contains multi-part extension
if len(parts) == 3: # If contains 2-part extension
return "." + ".".join(parts[-2:]) # Take the last two parts, like ".metadata.json"
elif len(parts) >= 4: # If contains 3-part or more extensions
return "." + ".".join(parts[-3:]) # Take the last three parts, like ".metadata.json.bak"
return os.path.splitext(filename)[1] # Otherwise take the regular extension, like ".safetensors"
# New common endpoint handlers
@@ -393,6 +409,15 @@ class ModelRouteUtils:
raise ValueError("Expected 'model_path' field")
model_path = (await field.read()).decode()
# Read NSFW level (new parameter)
nsfw_level = 0 # Default to 0 (unknown)
field = await reader.next()
if field and field.name == 'nsfw_level':
try:
nsfw_level = int((await field.read()).decode())
except (ValueError, TypeError):
logger.warning("Invalid NSFW level format, using default 0")
# Save preview file
base_name = os.path.splitext(os.path.basename(model_path))[0]
folder = os.path.dirname(model_path)
@@ -413,33 +438,43 @@ class ModelRouteUtils:
)
extension = '.webp' # Use .webp without .preview part
# Delete any existing preview files for this model
for ext in PREVIEW_EXTENSIONS:
existing_preview = os.path.join(folder, base_name + ext)
if os.path.exists(existing_preview):
try:
os.remove(existing_preview)
logger.debug(f"Deleted existing preview: {existing_preview}")
except Exception as e:
logger.warning(f"Failed to delete existing preview {existing_preview}: {e}")
preview_path = os.path.join(folder, base_name + extension).replace(os.sep, '/')
with open(preview_path, 'wb') as f:
f.write(optimized_data)
# Update preview path in metadata
# Update preview path and NSFW level in metadata
metadata_path = os.path.splitext(model_path)[0] + '.metadata.json'
if os.path.exists(metadata_path):
try:
with open(metadata_path, 'r', encoding='utf-8') as f:
metadata = json.load(f)
# Update preview_url directly in the metadata dict
# Update preview_url and preview_nsfw_level in the metadata dict
metadata['preview_url'] = preview_path
metadata['preview_nsfw_level'] = nsfw_level
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
await MetadataManager.save_metadata(model_path, metadata)
except Exception as e:
logger.error(f"Error updating metadata: {e}")
# Update preview URL in scanner cache
if hasattr(scanner, 'update_preview_in_cache'):
await scanner.update_preview_in_cache(model_path, preview_path)
await scanner.update_preview_in_cache(model_path, preview_path, nsfw_level)
return web.json_response({
"success": True,
"preview_url": config.get_preview_static_url(preview_path)
"preview_url": config.get_preview_static_url(preview_path),
"preview_nsfw_level": nsfw_level
})
except Exception as e:
@@ -469,8 +504,7 @@ class ModelRouteUtils:
metadata['exclude'] = True
# Save updated metadata
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
await MetadataManager.save_metadata(file_path, metadata)
# Update cache
cache = await scanner.get_cached_data()
@@ -759,8 +793,7 @@ class ModelRouteUtils:
metadata['sha256'] = actual_hash
# Save updated metadata
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
await MetadataManager.save_metadata(file_path, metadata)
# Update cache
await scanner.update_single_model_cache(file_path, file_path, metadata)