Add method to update model metadata from local example images. Fixes #211

This commit is contained in:
Will Miao
2025-06-08 20:10:36 +08:00
parent 4d6ea0236b
commit 7a4139544c

View File

@@ -374,6 +374,91 @@ class ExampleImagesRoutes:
return model_success, False # (success, is_stale_metadata)
@staticmethod
async def _update_model_metadata_from_local_examples(model, local_images_paths, scanner_type, scanner):
"""Update model metadata with local example images information
Args:
model: Model data dictionary
local_images_paths: List of paths to local example images/videos
scanner_type: Type of scanner ('lora' or 'checkpoint')
scanner: Scanner instance for this model type
Returns:
bool: True if metadata was successfully updated, False otherwise
"""
try:
# Check if we need to update metadata (no civitai field or empty images)
needs_update = not model.get('civitai') or not model.get('civitai', {}).get('images')
if needs_update and local_images_paths:
logger.debug(f"Found {len(local_images_paths)} local example images for {model.get('model_name')}, updating metadata")
# Create or get civitai field
if not model.get('civitai'):
model['civitai'] = {}
# Create images array
images = []
# Generate metadata for each local image/video
for path in local_images_paths:
# Determine if it's a video or image
file_ext = os.path.splitext(path)[1].lower()
is_video = file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']
# Create image metadata entry
image_entry = {
"url": "", # Empty URL as requested
"nsfwLevel": 0,
"width": 720, # Default dimensions
"height": 1280,
"type": "video" if is_video else "image",
"meta": None,
"hasMeta": False,
"hasPositivePrompt": False
}
# Try to get actual dimensions if it's an image (optional enhancement)
try:
from PIL import Image
if not is_video and os.path.exists(path):
with Image.open(path) as img:
image_entry["width"], image_entry["height"] = img.size
except:
# If PIL fails or isn't available, use default dimensions
pass
images.append(image_entry)
# Update the model's civitai.images field
model['civitai']['images'] = images
# Save metadata to the .metadata.json file
file_path = model.get('file_path')
base_path = os.path.splitext(file_path)[0] # Remove .safetensors extension
metadata_path = f"{base_path}.metadata.json"
try:
# Write the metadata to file
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(model, f, indent=2, ensure_ascii=False)
logger.info(f"Saved metadata to {metadata_path}")
except Exception as e:
logger.error(f"Failed to save metadata to {metadata_path}: {str(e)}")
# Save updated metadata to scanner cache
success = await scanner.update_single_model_cache(file_path, file_path, model)
if success:
logger.info(f"Successfully updated metadata for {model.get('model_name')} with {len(images)} local examples")
return True
else:
logger.warning(f"Failed to update metadata for {model.get('model_name')}")
return False
except Exception as e:
logger.error(f"Error updating metadata from local examples: {str(e)}", exc_info=True)
return False
@staticmethod
async def _process_local_example_images(model_file_path, model_file_name, model_name, model_dir, optimize):
"""Process local example images for a model
@@ -443,6 +528,9 @@ class ExampleImagesRoutes:
with open(save_path, 'wb') as dst_file:
dst_file.write(src_file.read())
# Now check if we need to add this information to the model's metadata
# This is handled externally by the caller with the new method
return True
return False
except Exception as e:
@@ -498,13 +586,13 @@ class ExampleImagesRoutes:
cache = await scanner.get_cached_data()
if cache and cache.raw_data:
for model in cache.raw_data:
# Only process models with images and a valid sha256
if model.get('civitai') and model.get('civitai', {}).get('images') and model.get('sha256'):
# Only process models with a valid sha256 (relaxed condition)
if model.get('sha256'):
all_models.append((scanner_type, model, scanner))
# Update total count
download_progress['total'] = len(all_models)
logger.info(f"Found {download_progress['total']} models with example images")
logger.info(f"Found {download_progress['total']} models to process")
# Process each model
for scanner_type, model, scanner in all_models:
@@ -536,17 +624,9 @@ class ExampleImagesRoutes:
model_dir = os.path.join(output_dir, model_hash)
os.makedirs(model_dir, exist_ok=True)
# Process images for this model
images = model.get('civitai', {}).get('images', [])
if not images:
logger.debug(f"No images found for model: {model_name}")
download_progress['processed_models'].add(model_hash)
download_progress['completed'] += 1
continue
# First check if we have local example images for this model
local_images_processed = False
local_image_paths = []
if model_file_path:
local_images_processed = await ExampleImagesRoutes._process_local_example_images(
model_file_path,
@@ -556,14 +636,34 @@ class ExampleImagesRoutes:
optimize
)
# Collect local image paths for potential metadata update
if local_images_processed:
for file in os.listdir(model_dir):
file_path = os.path.join(model_dir, file)
if os.path.isfile(file_path):
file_ext = os.path.splitext(file)[1].lower()
is_supported = (file_ext in SUPPORTED_MEDIA_EXTENSIONS['images'] or
file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos'])
if is_supported:
local_image_paths.append(file_path)
# Update metadata if needed and if we found local images
await ExampleImagesRoutes._update_model_metadata_from_local_examples(
model,
local_image_paths,
scanner_type,
scanner
)
# Mark as successfully processed if all local images were processed
download_progress['processed_models'].add(model_hash)
logger.info(f"Successfully processed local examples for {model_name}")
# If we didn't process local images, download from remote
if not local_images_processed:
# If we didn't process local images, download from remote only if metadata is available
if not local_images_processed and model.get('civitai') and model.get('civitai', {}).get('images'):
# Try to download images
images = model.get('civitai', {}).get('images', [])
model_success, is_stale_metadata = await ExampleImagesRoutes._process_model_images(
model_hash,
model_name,
@@ -860,6 +960,9 @@ class ExampleImagesRoutes:
model_dir = os.path.join(output_dir, model_hash)
os.makedirs(model_dir, exist_ok=True)
# Track local image paths for metadata update
local_image_paths = []
# Migrate each example file
for local_image_path, index in example_files:
# Get file extension
@@ -867,6 +970,9 @@ class ExampleImagesRoutes:
save_filename = f"image_{index}{local_ext}"
save_path = os.path.join(model_dir, save_filename)
# Track all local image paths for potential metadata update
local_image_paths.append(local_image_path)
# Skip if already exists in output directory
if os.path.exists(save_path):
logger.debug(f"File already exists in output: {save_path}")
@@ -884,6 +990,15 @@ class ExampleImagesRoutes:
download_progress['errors'].append(error_msg)
download_progress['last_error'] = error_msg
# Update model metadata if local images were found
if local_image_paths:
await ExampleImagesRoutes._update_model_metadata_from_local_examples(
model,
local_image_paths,
scanner_type,
scanner
)
# Mark this model as processed
download_progress['processed_models'].add(model_hash)