mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-21 21:22:11 -03:00
568 lines
25 KiB
Python
568 lines
25 KiB
Python
import logging
|
|
import os
|
|
import re
|
|
import tempfile
|
|
import random
|
|
import string
|
|
from aiohttp import web
|
|
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
|
|
from ..services.service_registry import ServiceRegistry
|
|
from ..services.settings_manager import settings
|
|
from .example_images_metadata import MetadataUpdater
|
|
from ..utils.metadata_manager import MetadataManager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class ExampleImagesProcessor:
|
|
"""Processes and manipulates example images"""
|
|
|
|
@staticmethod
|
|
def generate_short_id(length=8):
|
|
"""Generate a short random alphanumeric identifier"""
|
|
chars = string.ascii_lowercase + string.digits
|
|
return ''.join(random.choice(chars) for _ in range(length))
|
|
|
|
@staticmethod
|
|
def get_civitai_optimized_url(image_url):
|
|
"""Convert Civitai image URL to its optimized WebP version"""
|
|
base_pattern = r'(https://image\.civitai\.com/[^/]+/[^/]+)'
|
|
match = re.match(base_pattern, image_url)
|
|
|
|
if match:
|
|
base_url = match.group(1)
|
|
return f"{base_url}/optimized=true/image.webp"
|
|
|
|
return image_url
|
|
|
|
@staticmethod
|
|
async def download_model_images(model_hash, model_name, model_images, model_dir, optimize, independent_session):
|
|
"""Download images for a single model
|
|
|
|
Returns:
|
|
tuple: (success, is_stale_metadata) - whether download was successful, whether metadata is stale
|
|
"""
|
|
model_success = True
|
|
|
|
for i, image in enumerate(model_images):
|
|
image_url = image.get('url')
|
|
if not image_url:
|
|
continue
|
|
|
|
# Get image filename from URL
|
|
image_filename = os.path.basename(image_url.split('?')[0])
|
|
image_ext = os.path.splitext(image_filename)[1].lower()
|
|
|
|
# Handle images and videos
|
|
is_image = image_ext in SUPPORTED_MEDIA_EXTENSIONS['images']
|
|
is_video = image_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']
|
|
|
|
if not (is_image or is_video):
|
|
logger.debug(f"Skipping unsupported file type: {image_filename}")
|
|
continue
|
|
|
|
# Use 0-based indexing instead of 1-based indexing
|
|
save_filename = f"image_{i}{image_ext}"
|
|
|
|
# If optimizing images and this is a Civitai image, use their pre-optimized WebP version
|
|
if is_image and optimize and 'civitai.com' in image_url:
|
|
image_url = ExampleImagesProcessor.get_civitai_optimized_url(image_url)
|
|
save_filename = f"image_{i}.webp"
|
|
|
|
# Check if already downloaded
|
|
save_path = os.path.join(model_dir, save_filename)
|
|
if os.path.exists(save_path):
|
|
logger.debug(f"File already exists: {save_path}")
|
|
continue
|
|
|
|
# Download the file
|
|
try:
|
|
logger.debug(f"Downloading {save_filename} for {model_name}")
|
|
|
|
# Download directly using the independent session
|
|
async with independent_session.get(image_url, timeout=60) as response:
|
|
if response.status == 200:
|
|
with open(save_path, 'wb') as f:
|
|
async for chunk in response.content.iter_chunked(8192):
|
|
if chunk:
|
|
f.write(chunk)
|
|
elif response.status == 404:
|
|
error_msg = f"Failed to download file: {image_url}, status code: 404 - Model metadata might be stale"
|
|
logger.warning(error_msg)
|
|
model_success = False # Mark the model as failed due to 404 error
|
|
# Return early to trigger metadata refresh attempt
|
|
return False, True # (success, is_metadata_stale)
|
|
else:
|
|
error_msg = f"Failed to download file: {image_url}, status code: {response.status}"
|
|
logger.warning(error_msg)
|
|
model_success = False # Mark the model as failed
|
|
except Exception as e:
|
|
error_msg = f"Error downloading file {image_url}: {str(e)}"
|
|
logger.error(error_msg)
|
|
model_success = False # Mark the model as failed
|
|
|
|
return model_success, False # (success, is_metadata_stale)
|
|
|
|
@staticmethod
|
|
async def download_model_images_with_tracking(model_hash, model_name, model_images, model_dir, optimize, independent_session):
|
|
"""Download images for a single model with tracking of failed image URLs
|
|
|
|
Returns:
|
|
tuple: (success, is_stale_metadata, failed_images) - whether download was successful, whether metadata is stale, list of failed image URLs
|
|
"""
|
|
model_success = True
|
|
failed_images = []
|
|
|
|
for i, image in enumerate(model_images):
|
|
image_url = image.get('url')
|
|
if not image_url:
|
|
continue
|
|
|
|
# Get image filename from URL
|
|
image_filename = os.path.basename(image_url.split('?')[0])
|
|
image_ext = os.path.splitext(image_filename)[1].lower()
|
|
|
|
# Handle images and videos
|
|
is_image = image_ext in SUPPORTED_MEDIA_EXTENSIONS['images']
|
|
is_video = image_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']
|
|
|
|
if not (is_image or is_video):
|
|
logger.debug(f"Skipping unsupported file type: {image_filename}")
|
|
continue
|
|
|
|
# Use 0-based indexing instead of 1-based indexing
|
|
save_filename = f"image_{i}{image_ext}"
|
|
|
|
# If optimizing images and this is a Civitai image, use their pre-optimized WebP version
|
|
if is_image and optimize and 'civitai.com' in image_url:
|
|
image_url = ExampleImagesProcessor.get_civitai_optimized_url(image_url)
|
|
save_filename = f"image_{i}.webp"
|
|
|
|
# Check if already downloaded
|
|
save_path = os.path.join(model_dir, save_filename)
|
|
if os.path.exists(save_path):
|
|
logger.debug(f"File already exists: {save_path}")
|
|
continue
|
|
|
|
# Download the file
|
|
try:
|
|
logger.debug(f"Downloading {save_filename} for {model_name}")
|
|
|
|
# Download directly using the independent session
|
|
async with independent_session.get(image_url, timeout=60) as response:
|
|
if response.status == 200:
|
|
with open(save_path, 'wb') as f:
|
|
async for chunk in response.content.iter_chunked(8192):
|
|
if chunk:
|
|
f.write(chunk)
|
|
elif response.status == 404:
|
|
error_msg = f"Failed to download file: {image_url}, status code: 404 - Model metadata might be stale"
|
|
logger.warning(error_msg)
|
|
model_success = False # Mark the model as failed due to 404 error
|
|
failed_images.append(image_url) # Track failed URL
|
|
# Return early to trigger metadata refresh attempt
|
|
return False, True, failed_images # (success, is_metadata_stale, failed_images)
|
|
else:
|
|
error_msg = f"Failed to download file: {image_url}, status code: {response.status}"
|
|
logger.warning(error_msg)
|
|
model_success = False # Mark the model as failed
|
|
failed_images.append(image_url) # Track failed URL
|
|
except Exception as e:
|
|
error_msg = f"Error downloading file {image_url}: {str(e)}"
|
|
logger.error(error_msg)
|
|
model_success = False # Mark the model as failed
|
|
failed_images.append(image_url) # Track failed URL
|
|
|
|
return model_success, False, failed_images # (success, is_metadata_stale, failed_images)
|
|
|
|
@staticmethod
|
|
async def process_local_examples(model_file_path, model_file_name, model_name, model_dir, optimize):
|
|
"""Process local example images
|
|
|
|
Returns:
|
|
bool: True if local images were processed successfully, False otherwise
|
|
"""
|
|
try:
|
|
if not model_file_path or not os.path.exists(os.path.dirname(model_file_path)):
|
|
return False
|
|
|
|
model_dir_path = os.path.dirname(model_file_path)
|
|
local_images = []
|
|
|
|
# Look for files with pattern: filename.example.*.ext
|
|
if model_file_name:
|
|
example_prefix = f"{model_file_name}.example."
|
|
|
|
if os.path.exists(model_dir_path):
|
|
for file in os.listdir(model_dir_path):
|
|
file_lower = file.lower()
|
|
if file_lower.startswith(example_prefix.lower()):
|
|
file_ext = os.path.splitext(file_lower)[1]
|
|
is_supported = (file_ext in SUPPORTED_MEDIA_EXTENSIONS['images'] or
|
|
file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos'])
|
|
|
|
if is_supported:
|
|
local_images.append(os.path.join(model_dir_path, file))
|
|
|
|
# Process local images if found
|
|
if local_images:
|
|
logger.info(f"Found {len(local_images)} local example images for {model_name}")
|
|
|
|
for local_image_path in local_images:
|
|
# Extract index from filename
|
|
file_name = os.path.basename(local_image_path)
|
|
example_prefix = f"{model_file_name}.example."
|
|
|
|
try:
|
|
# Extract the part between '.example.' and the file extension
|
|
index_part = file_name[len(example_prefix):].split('.')[0]
|
|
# Try to parse it as an integer
|
|
index = int(index_part)
|
|
local_ext = os.path.splitext(local_image_path)[1].lower()
|
|
save_filename = f"image_{index}{local_ext}"
|
|
except (ValueError, IndexError):
|
|
# If we can't parse the index, fall back to sequential numbering
|
|
logger.warning(f"Could not extract index from {file_name}, using sequential numbering")
|
|
local_ext = os.path.splitext(local_image_path)[1].lower()
|
|
save_filename = f"image_{len(local_images)}{local_ext}"
|
|
|
|
save_path = os.path.join(model_dir, save_filename)
|
|
|
|
# Skip if already exists in output directory
|
|
if os.path.exists(save_path):
|
|
logger.debug(f"File already exists in output: {save_path}")
|
|
continue
|
|
|
|
# Copy the file
|
|
with open(local_image_path, 'rb') as src_file:
|
|
with open(save_path, 'wb') as dst_file:
|
|
dst_file.write(src_file.read())
|
|
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error processing local examples for {model_name}: {str(e)}")
|
|
return False
|
|
|
|
    @staticmethod
    async def import_images(request):
        """
        Import local example images for a model.

        Accepts:
        - multipart/form-data form with model_hash and files fields
          (model_hash is expected to be the FIRST form field), or
        - JSON request with model_hash and file_paths (legacy path-based method)

        Returns:
        - JSON response with success status, the list of imported files,
          per-file errors, and the model's refreshed image metadata
        """
        try:
            model_hash = None
            files_to_import = []
            # Temp files created for multipart uploads; removed in the finally block
            temp_files_to_cleanup = []

            # Check if it's a multipart form-data request (direct file upload)
            if request.content_type and 'multipart/form-data' in request.content_type:
                reader = await request.multipart()

                # First get model_hash — NOTE(review): assumes the client sends
                # model_hash as the first field; any other first field is dropped
                field = await reader.next()
                if field.name == 'model_hash':
                    model_hash = await field.text()

                # Then process all remaining fields; reader.next() yields None at end
                while True:
                    field = await reader.next()
                    if field is None:
                        break

                    if field.name == 'files':
                        # Create a temporary file with the original suffix so the
                        # extension-based type detection below still works
                        file_name = field.filename
                        file_ext = os.path.splitext(file_name)[1].lower()

                        with tempfile.NamedTemporaryFile(suffix=file_ext, delete=False) as tmp_file:
                            temp_path = tmp_file.name
                            temp_files_to_cleanup.append(temp_path)  # Track for cleanup

                            # Stream the upload to the temporary file in chunks
                            while True:
                                chunk = await field.read_chunk()
                                if not chunk:
                                    break
                                tmp_file.write(chunk)

                        # Add to the list of files to process
                        files_to_import.append(temp_path)
            else:
                # Parse JSON request (legacy method using file paths)
                data = await request.json()
                model_hash = data.get('model_hash')
                files_to_import = data.get('file_paths', [])

            if not model_hash:
                return web.json_response({
                    'success': False,
                    'error': 'Missing model_hash parameter'
                }, status=400)

            if not files_to_import:
                return web.json_response({
                    'success': False,
                    'error': 'No files provided to import'
                }, status=400)

            # Get the configured root folder for example images
            example_images_path = settings.get('example_images_path')
            if not example_images_path:
                return web.json_response({
                    'success': False,
                    'error': 'No example images path configured'
                }, status=400)

            # Find the model and get current metadata
            lora_scanner = await ServiceRegistry.get_lora_scanner()
            checkpoint_scanner = await ServiceRegistry.get_checkpoint_scanner()
            embedding_scanner = await ServiceRegistry.get_embedding_scanner()

            model_data = None
            scanner = None

            # Check all scanners to find the model by its sha256 hash
            for scan_obj in [lora_scanner, checkpoint_scanner, embedding_scanner]:
                cache = await scan_obj.get_cached_data()
                for item in cache.raw_data:
                    if item.get('sha256') == model_hash:
                        model_data = item
                        scanner = scan_obj
                        break
                if model_data:
                    break

            if not model_data:
                return web.json_response({
                    'success': False,
                    'error': f"Model with hash {model_hash} not found in cache"
                }, status=404)

            # Create the model's folder (named after the hash) under the root
            model_folder = os.path.join(example_images_path, model_hash)
            os.makedirs(model_folder, exist_ok=True)

            imported_files = []
            errors = []
            # (dest_path, short_id) pairs handed to the metadata updater below
            newly_imported_paths = []

            # Process each file path
            for file_path in files_to_import:
                try:
                    # Ensure the file exists
                    if not os.path.isfile(file_path):
                        errors.append(f"File not found: {file_path}")
                        continue

                    # Check if file type is supported
                    file_ext = os.path.splitext(file_path)[1].lower()
                    if not (file_ext in SUPPORTED_MEDIA_EXTENSIONS['images'] or
                            file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']):
                        errors.append(f"Unsupported file type: {file_path}")
                        continue

                    # Generate new filename using short ID instead of UUID
                    short_id = ExampleImagesProcessor.generate_short_id()
                    new_filename = f"custom_{short_id}{file_ext}"

                    dest_path = os.path.join(model_folder, new_filename)

                    # Copy the file (copy2 preserves timestamps/metadata)
                    import shutil
                    shutil.copy2(file_path, dest_path)
                    # Store both the dest_path and the short_id
                    newly_imported_paths.append((dest_path, short_id))

                    # Add to imported files list returned to the client
                    imported_files.append({
                        'name': new_filename,
                        'path': f'/example_images_static/{model_hash}/{new_filename}',
                        'extension': file_ext,
                        'is_video': file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']
                    })
                except Exception as e:
                    errors.append(f"Error importing {file_path}: {str(e)}")

            # Update metadata with new example images
            regular_images, custom_images = await MetadataUpdater.update_metadata_after_import(
                model_hash,
                model_data,
                scanner,
                newly_imported_paths
            )

            return web.json_response({
                'success': len(imported_files) > 0,
                'message': f'Successfully imported {len(imported_files)} files' +
                           (f' with {len(errors)} errors' if errors else ''),
                'files': imported_files,
                'errors': errors,
                'regular_images': regular_images,
                'custom_images': custom_images,
                "model_file_path": model_data.get('file_path', ''),
            })

        except Exception as e:
            logger.error(f"Failed to import example images: {e}", exc_info=True)
            return web.json_response({
                'success': False,
                'error': str(e)
            }, status=500)
        finally:
            # Clean up temporary files created for multipart uploads
            for temp_file in temp_files_to_cleanup:
                try:
                    os.remove(temp_file)
                except Exception as e:
                    logger.error(f"Failed to remove temporary file {temp_file}: {e}")
|
|
|
|
    @staticmethod
    async def delete_custom_image(request):
        """
        Delete a custom example image for a model.

        Removes the on-disk file named "custom_<short_id>.*" from the model's
        example-images folder, drops the matching entry from the model's
        civitai customImages metadata, persists the metadata, and refreshes
        the scanner cache.

        Accepts:
        - JSON request with model_hash and short_id

        Returns:
        - JSON response with success status and the updated image lists
        """
        try:
            # Parse request data
            data = await request.json()
            model_hash = data.get('model_hash')
            short_id = data.get('short_id')

            if not model_hash or not short_id:
                return web.json_response({
                    'success': False,
                    'error': 'Missing required parameters: model_hash and short_id'
                }, status=400)

            # Get the configured root folder for example images
            example_images_path = settings.get('example_images_path')
            if not example_images_path:
                return web.json_response({
                    'success': False,
                    'error': 'No example images path configured'
                }, status=400)

            # Find the model and get current metadata
            lora_scanner = await ServiceRegistry.get_lora_scanner()
            checkpoint_scanner = await ServiceRegistry.get_checkpoint_scanner()
            embedding_scanner = await ServiceRegistry.get_embedding_scanner()

            model_data = None
            scanner = None

            # Check all scanners to find the model; has_hash() avoids loading
            # cached data for scanners that can't contain this hash
            for scan_obj in [lora_scanner, checkpoint_scanner, embedding_scanner]:
                if scan_obj.has_hash(model_hash):
                    cache = await scan_obj.get_cached_data()
                    for item in cache.raw_data:
                        if item.get('sha256') == model_hash:
                            model_data = item
                            scanner = scan_obj
                            break
                if model_data:
                    break

            if not model_data:
                return web.json_response({
                    'success': False,
                    'error': f"Model with hash {model_hash} not found in cache"
                }, status=404)

            # Check if model has custom images
            if not model_data.get('civitai', {}).get('customImages'):
                return web.json_response({
                    'success': False,
                    'error': f"Model has no custom images"
                }, status=404)

            # Find the custom image with matching short_id while building the
            # replacement list that excludes it
            custom_images = model_data['civitai']['customImages']
            matching_image = None
            new_custom_images = []

            for image in custom_images:
                if image.get('id') == short_id:
                    matching_image = image
                else:
                    new_custom_images.append(image)

            if not matching_image:
                return web.json_response({
                    'success': False,
                    'error': f"Custom image with id {short_id} not found"
                }, status=404)

            # Find and delete the actual file (named custom_<short_id>.<ext>)
            model_folder = os.path.join(example_images_path, model_hash)
            file_deleted = False

            if os.path.exists(model_folder):
                for filename in os.listdir(model_folder):
                    if f"custom_{short_id}" in filename:
                        file_path = os.path.join(model_folder, filename)
                        try:
                            os.remove(file_path)
                            file_deleted = True
                            logger.info(f"Deleted custom example file: {file_path}")
                            break
                        except Exception as e:
                            return web.json_response({
                                'success': False,
                                'error': f"Failed to delete file: {str(e)}"
                            }, status=500)

            # Missing file is tolerated: metadata is cleaned up regardless
            if not file_deleted:
                logger.warning(f"File for custom example with id {short_id} not found, but metadata will still be updated")

            # Update in-memory metadata
            model_data['civitai']['customImages'] = new_custom_images

            # Save updated metadata to file
            file_path = model_data.get('file_path')
            if file_path:
                try:
                    # Create a copy of model data without 'folder' field
                    # ('folder' is a cache-only attribute, not persisted)
                    model_copy = model_data.copy()
                    model_copy.pop('folder', None)

                    # Write metadata to file
                    await MetadataManager.save_metadata(file_path, model_copy)
                    logger.debug(f"Saved updated metadata for {model_data.get('model_name')}")
                except Exception as e:
                    logger.error(f"Failed to save metadata: {str(e)}")
                    return web.json_response({
                        'success': False,
                        'error': f"Failed to save metadata: {str(e)}"
                    }, status=500)

            # Update the scanner cache so UI queries see the change immediately
            # NOTE(review): runs even when file_path is falsy — confirm the
            # scanner tolerates a None path
            await scanner.update_single_model_cache(file_path, file_path, model_data)

            # Get regular images array (might be None)
            regular_images = model_data['civitai'].get('images', [])

            return web.json_response({
                'success': True,
                'regular_images': regular_images,
                'custom_images': new_custom_images,
                'model_file_path': model_data.get('file_path', '')
            })

        except Exception as e:
            logger.error(f"Failed to delete custom example image: {e}", exc_info=True)
            return web.json_response({
                'success': False,
                'error': str(e)
            }, status=500)