mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-21 21:22:11 -03:00
275 lines
12 KiB
Python
275 lines
12 KiB
Python
import logging
|
|
import os
|
|
from ..utils.metadata_manager import MetadataManager
|
|
from ..utils.routes_common import ModelRouteUtils
|
|
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class MetadataUpdater:
|
|
"""Handles updating model metadata related to example images"""
|
|
|
|
@staticmethod
|
|
async def refresh_model_metadata(model_hash, model_name, scanner_type, scanner):
|
|
"""Refresh model metadata from CivitAI
|
|
|
|
Args:
|
|
model_hash: SHA256 hash of the model
|
|
model_name: Model name (for logging)
|
|
scanner_type: Scanner type ('lora' or 'checkpoint')
|
|
scanner: Scanner instance for this model type
|
|
|
|
Returns:
|
|
bool: True if metadata was successfully refreshed, False otherwise
|
|
"""
|
|
from ..utils.example_images_download_manager import download_progress
|
|
|
|
try:
|
|
# Find the model in the scanner cache
|
|
cache = await scanner.get_cached_data()
|
|
model_data = None
|
|
|
|
for item in cache.raw_data:
|
|
if item.get('sha256') == model_hash:
|
|
model_data = item
|
|
break
|
|
|
|
if not model_data:
|
|
logger.warning(f"Model {model_name} with hash {model_hash} not found in cache")
|
|
return False
|
|
|
|
file_path = model_data.get('file_path')
|
|
if not file_path:
|
|
logger.warning(f"Model {model_name} has no file path")
|
|
return False
|
|
|
|
# Track that we're refreshing this model
|
|
download_progress['refreshed_models'].add(model_hash)
|
|
|
|
# Use ModelRouteUtils to refresh metadata
|
|
async def update_cache_func(old_path, new_path, metadata):
|
|
return await scanner.update_single_model_cache(old_path, new_path, metadata)
|
|
|
|
success = await ModelRouteUtils.fetch_and_update_model(
|
|
model_hash,
|
|
file_path,
|
|
model_data,
|
|
update_cache_func
|
|
)
|
|
|
|
if success:
|
|
logger.info(f"Successfully refreshed metadata for {model_name}")
|
|
return True
|
|
else:
|
|
logger.warning(f"Failed to refresh metadata for {model_name}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
error_msg = f"Error refreshing metadata for {model_name}: {str(e)}"
|
|
logger.error(error_msg, exc_info=True)
|
|
download_progress['errors'].append(error_msg)
|
|
download_progress['last_error'] = error_msg
|
|
return False
|
|
|
|
@staticmethod
|
|
async def get_updated_model(model_hash, scanner):
|
|
"""Get updated model data
|
|
|
|
Args:
|
|
model_hash: SHA256 hash of the model
|
|
scanner: Scanner instance
|
|
|
|
Returns:
|
|
dict: Updated model data or None if not found
|
|
"""
|
|
cache = await scanner.get_cached_data()
|
|
for item in cache.raw_data:
|
|
if item.get('sha256') == model_hash:
|
|
return item
|
|
return None
|
|
|
|
@staticmethod
|
|
async def update_metadata_from_local_examples(model_hash, model, scanner_type, scanner, model_dir):
|
|
"""Update model metadata with local example image information
|
|
|
|
Args:
|
|
model_hash: SHA256 hash of the model
|
|
model: Model data dictionary
|
|
scanner_type: Scanner type ('lora' or 'checkpoint')
|
|
scanner: Scanner instance for this model type
|
|
model_dir: Model images directory
|
|
|
|
Returns:
|
|
bool: True if metadata was successfully updated, False otherwise
|
|
"""
|
|
try:
|
|
# Collect local image paths
|
|
local_images_paths = []
|
|
if os.path.exists(model_dir):
|
|
for file in os.listdir(model_dir):
|
|
file_path = os.path.join(model_dir, file)
|
|
if os.path.isfile(file_path):
|
|
file_ext = os.path.splitext(file)[1].lower()
|
|
is_supported = (file_ext in SUPPORTED_MEDIA_EXTENSIONS['images'] or
|
|
file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos'])
|
|
if is_supported:
|
|
local_images_paths.append(file_path)
|
|
|
|
# Check if metadata update is needed (no civitai field or empty images)
|
|
needs_update = not model.get('civitai') or not model.get('civitai', {}).get('images')
|
|
|
|
if needs_update and local_images_paths:
|
|
logger.debug(f"Found {len(local_images_paths)} local example images for {model.get('model_name')}, updating metadata")
|
|
|
|
# Create or get civitai field
|
|
if not model.get('civitai'):
|
|
model['civitai'] = {}
|
|
|
|
# Create images array
|
|
images = []
|
|
|
|
# Generate metadata for each local image/video
|
|
for path in local_images_paths:
|
|
# Determine if video or image
|
|
file_ext = os.path.splitext(path)[1].lower()
|
|
is_video = file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']
|
|
|
|
# Create image metadata entry
|
|
image_entry = {
|
|
"url": "", # Empty URL as required
|
|
"nsfwLevel": 0,
|
|
"width": 720, # Default dimensions
|
|
"height": 1280,
|
|
"type": "video" if is_video else "image",
|
|
"meta": None,
|
|
"hasMeta": False,
|
|
"hasPositivePrompt": False
|
|
}
|
|
|
|
# If it's an image, try to get actual dimensions (optional enhancement)
|
|
try:
|
|
from PIL import Image
|
|
if not is_video and os.path.exists(path):
|
|
with Image.open(path) as img:
|
|
image_entry["width"], image_entry["height"] = img.size
|
|
except:
|
|
# If PIL fails or is unavailable, use default dimensions
|
|
pass
|
|
|
|
images.append(image_entry)
|
|
|
|
# Update the model's civitai.images field
|
|
model['civitai']['images'] = images
|
|
|
|
# Save metadata to .metadata.json file
|
|
file_path = model.get('file_path')
|
|
try:
|
|
# Create a copy of model data without 'folder' field
|
|
model_copy = model.copy()
|
|
model_copy.pop('folder', None)
|
|
|
|
# Write metadata to file
|
|
await MetadataManager.save_metadata(file_path, model_copy)
|
|
logger.info(f"Saved metadata for {model.get('model_name')}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to save metadata for {model.get('model_name')}: {str(e)}")
|
|
|
|
# Save updated metadata to scanner cache
|
|
success = await scanner.update_single_model_cache(file_path, file_path, model)
|
|
if success:
|
|
logger.info(f"Successfully updated metadata for {model.get('model_name')} with {len(images)} local examples")
|
|
return True
|
|
else:
|
|
logger.warning(f"Failed to update metadata for {model.get('model_name')}")
|
|
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error updating metadata from local examples: {str(e)}", exc_info=True)
|
|
return False
|
|
|
|
@staticmethod
|
|
async def update_metadata_after_import(model_hash, model_data, scanner, newly_imported_paths):
|
|
"""Update model metadata after importing example images
|
|
|
|
Args:
|
|
model_hash: SHA256 hash of the model
|
|
model_data: Model data dictionary
|
|
scanner: Scanner instance (lora or checkpoint)
|
|
newly_imported_paths: List of paths to newly imported files
|
|
|
|
Returns:
|
|
tuple: (regular_images, custom_images) - Both image arrays
|
|
"""
|
|
try:
|
|
# Ensure civitai field exists in model_data
|
|
if not model_data.get('civitai'):
|
|
model_data['civitai'] = {}
|
|
|
|
# Ensure customImages array exists
|
|
if not model_data['civitai'].get('customImages'):
|
|
model_data['civitai']['customImages'] = []
|
|
|
|
# Get current customImages array
|
|
custom_images = model_data['civitai']['customImages']
|
|
|
|
# Add new image entry for each imported file
|
|
for path_tuple in newly_imported_paths:
|
|
path, short_id = path_tuple
|
|
|
|
# Determine if video or image
|
|
file_ext = os.path.splitext(path)[1].lower()
|
|
is_video = file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']
|
|
|
|
# Create image metadata entry
|
|
image_entry = {
|
|
"url": "", # Empty URL as requested
|
|
"id": short_id,
|
|
"nsfwLevel": 0,
|
|
"width": 720, # Default dimensions
|
|
"height": 1280,
|
|
"type": "video" if is_video else "image",
|
|
"meta": None,
|
|
"hasMeta": False,
|
|
"hasPositivePrompt": False
|
|
}
|
|
|
|
# If it's an image, try to get actual dimensions
|
|
try:
|
|
from PIL import Image
|
|
if not is_video and os.path.exists(path):
|
|
with Image.open(path) as img:
|
|
image_entry["width"], image_entry["height"] = img.size
|
|
except:
|
|
# If PIL fails or is unavailable, use default dimensions
|
|
pass
|
|
|
|
# Append to existing customImages array
|
|
custom_images.append(image_entry)
|
|
|
|
# Save metadata to .metadata.json file
|
|
file_path = model_data.get('file_path')
|
|
if file_path:
|
|
try:
|
|
# Create a copy of model data without 'folder' field
|
|
model_copy = model_data.copy()
|
|
model_copy.pop('folder', None)
|
|
|
|
# Write metadata to file
|
|
await MetadataManager.save_metadata(file_path, model_copy)
|
|
logger.info(f"Saved metadata for {model_data.get('model_name')}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to save metadata: {str(e)}")
|
|
|
|
# Save updated metadata to scanner cache
|
|
if file_path:
|
|
await scanner.update_single_model_cache(file_path, file_path, model_data)
|
|
|
|
# Get regular images array (might be None)
|
|
regular_images = model_data['civitai'].get('images', [])
|
|
|
|
# Return both image arrays
|
|
return regular_images, custom_images
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to update metadata after import: {e}", exc_info=True)
|
|
return [], [] |