mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-05-06 16:36:45 -03:00
481 lines
20 KiB
Python
481 lines
20 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
from io import BytesIO
|
|
from typing import Any, Optional
|
|
|
|
import piexif
|
|
from PIL import Image, PngImagePlugin
|
|
|
|
# Module-level logger used by the ExifUtils helpers for debug, warning and error reporting.
logger = logging.getLogger(__name__)
|
|
|
|
class ExifUtils:
|
|
"""Utility functions for working with EXIF data in images"""
|
|
|
|
@staticmethod
|
|
def _decode_user_comment(user_comment: Any) -> Optional[str]:
|
|
if user_comment is None:
|
|
return None
|
|
if isinstance(user_comment, bytes):
|
|
if user_comment.startswith(b"UNICODE\0"):
|
|
return user_comment[8:].decode("utf-16be", errors="ignore")
|
|
return user_comment.decode("utf-8", errors="ignore")
|
|
if isinstance(user_comment, str):
|
|
return user_comment
|
|
return str(user_comment)
|
|
|
|
@staticmethod
|
|
def _decode_exif_text(value: Any) -> Optional[str]:
|
|
if value is None:
|
|
return None
|
|
if isinstance(value, bytes):
|
|
return value.decode("utf-8", errors="ignore")
|
|
if isinstance(value, str):
|
|
return value
|
|
return str(value)
|
|
|
|
@staticmethod
def _load_structured_metadata(image_path: str) -> dict[str, Optional[str]]:
    """Collect the generation metadata stored in an image file.

    Sources, in the order they are consulted:

    1. Pillow's ``info`` mapping for the ``parameters``, ``prompt`` and
       ``workflow`` entries.
    2. For formats other than JPEG/TIFF/WEBP, the EXIF data exposed by
       Pillow (UserComment, looked up both at the top level and in the
       Exif sub-IFD).
    3. The EXIF data parsed by piexif (UserComment and ImageDescription).
       An ImageDescription starting with ``Workflow:`` is stored as the
       workflow; otherwise it fills ``prompt`` when that is still empty.

    When no ``parameters`` value was found, the decoded UserComment is
    used in its place.

    Args:
        image_path (str): Path to the image file.

    Returns:
        dict[str, Optional[str]]: Mapping with the keys ``parameters``,
        ``prompt``, ``workflow`` and ``comment``; missing values are None.

    Raises:
        Any exception raised by ``Image.open`` for an unreadable file.
        piexif failures are logged at debug level and swallowed.
    """
    metadata: dict[str, Optional[str]] = {
        "parameters": None,
        "prompt": None,
        "workflow": None,
        "comment": None,
    }

    with Image.open(image_path) as img:
        info = getattr(img, "info", {}) or {}
        for key in ("parameters", "prompt", "workflow"):
            if key in info:
                metadata[key] = info[key]

        if img.format not in ["JPEG", "TIFF", "WEBP"]:
            exif = img.getexif()
            if exif:
                raw_comment = exif.get(piexif.ExifIFD.UserComment)
                # getexif() exposes only the top-level IFD; UserComment is
                # stored in the Exif sub-IFD, so look there as well.
                if raw_comment is None and hasattr(exif, "get_ifd"):
                    try:
                        pil_exif_ifd = exif.get_ifd(piexif.ImageIFD.ExifTag)
                    except Exception as e:
                        logger.debug(f"Error reading Exif sub-IFD: {e}")
                        pil_exif_ifd = {}
                    raw_comment = pil_exif_ifd.get(piexif.ExifIFD.UserComment)
                if raw_comment is not None:
                    metadata["comment"] = ExifUtils._decode_user_comment(raw_comment)

    # piexif rejects formats it does not understand; treat that as "no EXIF".
    try:
        exif_dict = piexif.load(image_path)
    except Exception as e:
        logger.debug(f"Error loading EXIF data: {e}")
        exif_dict = {}

    exif_ifd = exif_dict.get("Exif", {})
    if piexif.ExifIFD.UserComment in exif_ifd:
        metadata["comment"] = ExifUtils._decode_user_comment(
            exif_ifd[piexif.ExifIFD.UserComment]
        )

    image_description = ExifUtils._decode_exif_text(
        exif_dict.get("0th", {}).get(piexif.ImageIFD.ImageDescription)
    )
    if image_description:
        if image_description.startswith("Workflow:"):
            metadata["workflow"] = image_description[len("Workflow:"):]
        elif not metadata["prompt"]:
            metadata["prompt"] = image_description

    # Fall back to the EXIF comment when no explicit parameters were stored.
    if not metadata["parameters"] and metadata["comment"]:
        metadata["parameters"] = metadata["comment"]

    return metadata
|
|
|
|
@staticmethod
def _build_pnginfo(img: Image.Image, metadata_fields: dict[str, Optional[str]]) -> PngImagePlugin.PngInfo:
    """Build the PNG text chunks for a re-saved image.

    Existing string entries of ``img.info`` are carried over, except for
    the skipped keys below and the managed keys, which are always taken
    from ``metadata_fields`` instead.

    Args:
        img: Opened image whose ``info`` entries should be preserved.
        metadata_fields: Values for ``parameters``, ``prompt`` and
            ``workflow``; empty or None values are not written.

    Returns:
        PngImagePlugin.PngInfo: Chunk container to pass as ``pnginfo``
        when saving.
    """
    # A tuple rather than a set: chunk order, and therefore the bytes of
    # the saved file, must not depend on hash randomisation.
    managed_keys = ("parameters", "prompt", "workflow")
    # Entries that are never copied over as text chunks.
    skipped_keys = {"exif", "dpi", "transparency", "gamma", "aspect", *managed_keys}

    png_info = PngImagePlugin.PngInfo()
    existing_info = getattr(img, "info", {}) or {}

    for key, value in existing_info.items():
        if key in skipped_keys:
            continue
        if isinstance(value, str):
            png_info.add_text(key, value)

    for key in managed_keys:
        value = metadata_fields.get(key)
        if value:
            png_info.add_text(key, value)

    return png_info
|
|
|
|
@staticmethod
def _build_exif_bytes(
    metadata_fields: dict[str, Optional[str]], existing_exif: bytes | None = None
) -> bytes:
    """Serialise the metadata fields into an EXIF blob.

    ``parameters`` is written to UserComment with the UNICODE header and
    UTF-16BE payload. ``workflow`` (prefixed with ``Workflow:``) or,
    failing that, ``prompt`` is written to ImageDescription. A tag whose
    source value is empty is removed from the existing data.

    Args:
        metadata_fields: Values for ``parameters``, ``prompt`` and
            ``workflow``.
        existing_exif: EXIF bytes to start from; when missing or
            unparsable an empty EXIF structure is used.

    Returns:
        bytes: EXIF data produced by ``piexif.dump``.
    """
    try:
        exif_dict = piexif.load(existing_exif or b"")
    except Exception:
        exif_dict = {"0th": {}, "Exif": {}, "GPS": {}, "Interop": {}, "1st": {}}

    zeroth_ifd = exif_dict.setdefault("0th", {})
    exif_ifd = exif_dict.setdefault("Exif", {})

    parameters = metadata_fields.get("parameters")
    workflow = metadata_fields.get("workflow")
    prompt = metadata_fields.get("prompt")

    if parameters:
        exif_ifd[piexif.ExifIFD.UserComment] = (
            b"UNICODE\0" + parameters.encode("utf-16be")
        )
    else:
        exif_ifd.pop(piexif.ExifIFD.UserComment, None)

    if workflow:
        description = f"Workflow:{workflow}"
    elif prompt:
        description = prompt
    else:
        description = None

    if description:
        # Store UTF-8 bytes: the reader decodes ImageDescription as UTF-8,
        # and a str value with non-Latin-1 characters cannot be dumped.
        if isinstance(description, str):
            description = description.encode("utf-8")
        zeroth_ifd[piexif.ImageIFD.ImageDescription] = description
    else:
        zeroth_ifd.pop(piexif.ImageIFD.ImageDescription, None)

    return piexif.dump(exif_dict)
|
|
|
|
@staticmethod
|
|
def extract_image_metadata(image_path: str) -> Optional[str]:
|
|
"""Extract metadata from image including UserComment or parameters field
|
|
|
|
Args:
|
|
image_path (str): Path to the image file
|
|
|
|
Returns:
|
|
Optional[str]: Extracted metadata or None if not found
|
|
"""
|
|
try:
|
|
# Skip for video files
|
|
if image_path:
|
|
ext = os.path.splitext(image_path)[1].lower()
|
|
if ext in ['.mp4', '.webm']:
|
|
return None
|
|
|
|
metadata = ExifUtils._load_structured_metadata(image_path)
|
|
return (
|
|
metadata.get("parameters")
|
|
or metadata.get("prompt")
|
|
or metadata.get("workflow")
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error extracting image metadata: {e}", exc_info=True)
|
|
return None
|
|
|
|
@staticmethod
|
|
def update_image_metadata(image_path: str, metadata: str) -> str:
|
|
"""Update metadata in image's EXIF data or parameters fields
|
|
|
|
Args:
|
|
image_path (str): Path to the image file
|
|
metadata (str): Metadata string to save
|
|
|
|
Returns:
|
|
str: Path to the updated image
|
|
"""
|
|
try:
|
|
# Skip for video files
|
|
if image_path:
|
|
ext = os.path.splitext(image_path)[1].lower()
|
|
if ext in ['.mp4', '.webm']:
|
|
return image_path
|
|
|
|
metadata_fields = ExifUtils._load_structured_metadata(image_path)
|
|
metadata_fields["parameters"] = metadata
|
|
|
|
with Image.open(image_path) as img:
|
|
img_format = img.format
|
|
|
|
if img_format == "PNG":
|
|
png_info = ExifUtils._build_pnginfo(img, metadata_fields)
|
|
img.save(image_path, format="PNG", pnginfo=png_info)
|
|
return image_path
|
|
|
|
exif_bytes = ExifUtils._build_exif_bytes(
|
|
metadata_fields, img.info.get("exif")
|
|
)
|
|
save_kwargs = {"exif": exif_bytes}
|
|
if img_format == "WEBP":
|
|
save_kwargs["quality"] = 85
|
|
|
|
img.save(image_path, format=img_format, **save_kwargs)
|
|
|
|
return image_path
|
|
except Exception as e:
|
|
logger.error(f"Error updating metadata in {image_path}: {e}")
|
|
return image_path
|
|
|
|
@staticmethod
|
|
def append_recipe_metadata(image_path, recipe_data) -> str:
|
|
"""Append recipe metadata to an image's EXIF data"""
|
|
try:
|
|
# Skip for video files
|
|
if image_path:
|
|
ext = os.path.splitext(image_path)[1].lower()
|
|
if ext in ['.mp4', '.webm']:
|
|
return image_path
|
|
|
|
# First, extract existing metadata
|
|
metadata = ExifUtils.extract_image_metadata(image_path)
|
|
|
|
# Check if there's already recipe metadata
|
|
if metadata:
|
|
# Remove any existing recipe metadata
|
|
metadata = ExifUtils.remove_recipe_metadata(metadata)
|
|
|
|
# Prepare checkpoint data
|
|
checkpoint_data = recipe_data.get("checkpoint") or {}
|
|
simplified_checkpoint = None
|
|
if isinstance(checkpoint_data, dict) and checkpoint_data:
|
|
simplified_checkpoint = {
|
|
"type": checkpoint_data.get("type", "checkpoint"),
|
|
"modelId": checkpoint_data.get("modelId", 0),
|
|
"modelVersionId": checkpoint_data.get("modelVersionId")
|
|
or checkpoint_data.get("id", 0),
|
|
"modelName": checkpoint_data.get(
|
|
"modelName", checkpoint_data.get("name", "")
|
|
),
|
|
"modelVersionName": checkpoint_data.get(
|
|
"modelVersionName", checkpoint_data.get("version", "")
|
|
),
|
|
"hash": checkpoint_data.get("hash", "").lower()
|
|
if checkpoint_data.get("hash")
|
|
else "",
|
|
"file_name": checkpoint_data.get("file_name", ""),
|
|
"baseModel": checkpoint_data.get("baseModel", ""),
|
|
}
|
|
|
|
# Prepare simplified loras data
|
|
simplified_loras = []
|
|
for lora in recipe_data.get("loras", []):
|
|
simplified_lora = {
|
|
"file_name": lora.get("file_name", ""),
|
|
"hash": lora.get("hash", "").lower() if lora.get("hash") else "",
|
|
"strength": float(lora.get("strength", 1.0)),
|
|
"modelVersionId": lora.get("modelVersionId", 0),
|
|
"modelName": lora.get("modelName", ""),
|
|
"modelVersionName": lora.get("modelVersionName", ""),
|
|
}
|
|
simplified_loras.append(simplified_lora)
|
|
|
|
# Create recipe metadata JSON
|
|
recipe_metadata = {
|
|
'title': recipe_data.get('title', ''),
|
|
'base_model': recipe_data.get('base_model', ''),
|
|
'loras': simplified_loras,
|
|
'gen_params': recipe_data.get('gen_params', {}),
|
|
'tags': recipe_data.get('tags', []),
|
|
**({'checkpoint': simplified_checkpoint} if simplified_checkpoint else {})
|
|
}
|
|
|
|
# Convert to JSON string
|
|
recipe_metadata_json = json.dumps(recipe_metadata)
|
|
|
|
# Create the recipe metadata marker
|
|
recipe_metadata_marker = f"Recipe metadata: {recipe_metadata_json}"
|
|
|
|
# Append to existing metadata or create new one
|
|
new_metadata = f"{metadata} \n {recipe_metadata_marker}" if metadata else recipe_metadata_marker
|
|
|
|
# Write back to the image
|
|
return ExifUtils.update_image_metadata(image_path, new_metadata)
|
|
except Exception as e:
|
|
logger.error(f"Error appending recipe metadata: {e}", exc_info=True)
|
|
return image_path
|
|
|
|
@staticmethod
|
|
def remove_recipe_metadata(user_comment):
|
|
"""Remove recipe metadata from user comment"""
|
|
if not user_comment:
|
|
return ""
|
|
|
|
# Find the recipe metadata marker
|
|
recipe_marker_index = user_comment.find("Recipe metadata: ")
|
|
if recipe_marker_index == -1:
|
|
return user_comment
|
|
|
|
# If recipe metadata is not at the start, remove the preceding ", "
|
|
if recipe_marker_index >= 2 and user_comment[recipe_marker_index-2:recipe_marker_index] == ", ":
|
|
recipe_marker_index -= 2
|
|
|
|
# Remove the recipe metadata part
|
|
# First, find where the metadata ends (next line or end of string)
|
|
next_line_index = user_comment.find("\n", recipe_marker_index)
|
|
if next_line_index == -1:
|
|
# Metadata is at the end of the string
|
|
return user_comment[:recipe_marker_index].rstrip()
|
|
else:
|
|
# Metadata is in the middle of the string
|
|
return user_comment[:recipe_marker_index] + user_comment[next_line_index:]
|
|
|
|
@staticmethod
|
|
def optimize_image(image_data, target_width=250, format='webp', quality=85, preserve_metadata=False):
|
|
"""
|
|
Optimize an image by resizing and converting to WebP format
|
|
|
|
Args:
|
|
image_data: Binary image data or path to image file
|
|
target_width: Width to resize the image to (preserves aspect ratio)
|
|
format: Output format (default: webp)
|
|
quality: Output quality (0-100)
|
|
preserve_metadata: Whether to preserve EXIF metadata
|
|
|
|
Returns:
|
|
Tuple of (optimized_image_data, extension)
|
|
"""
|
|
try:
|
|
# Skip for video files early if it's a file path
|
|
if isinstance(image_data, str) and os.path.exists(image_data):
|
|
ext = os.path.splitext(image_data)[1].lower()
|
|
if ext in ['.mp4', '.webm']:
|
|
try:
|
|
with open(image_data, 'rb') as f:
|
|
return f.read(), ext
|
|
except Exception:
|
|
return image_data, ext
|
|
|
|
# First validate the image data is usable
|
|
img = None
|
|
if isinstance(image_data, str) and os.path.exists(image_data):
|
|
# It's a file path - validate file
|
|
try:
|
|
with Image.open(image_data) as test_img:
|
|
# Verify the image can be fully loaded by accessing its size
|
|
width, height = test_img.size
|
|
# If we got here, the image is valid
|
|
img = Image.open(image_data)
|
|
except (IOError, OSError) as e:
|
|
logger.error(f"Invalid or corrupt image file: {image_data}: {e}")
|
|
raise ValueError(f"Cannot process corrupt image: {e}")
|
|
else:
|
|
# It's binary data - validate data
|
|
try:
|
|
with BytesIO(image_data) as temp_buf:
|
|
test_img = Image.open(temp_buf)
|
|
# Verify the image can be fully loaded
|
|
width, height = test_img.size
|
|
# If successful, reopen for processing
|
|
img = Image.open(BytesIO(image_data))
|
|
except Exception as e:
|
|
logger.error(f"Invalid binary image data: {e}")
|
|
raise ValueError(f"Cannot process corrupt image data: {e}")
|
|
|
|
# Extract metadata if needed and valid
|
|
metadata_fields = None
|
|
if preserve_metadata:
|
|
try:
|
|
if isinstance(image_data, str) and os.path.exists(image_data):
|
|
# For file path, extract directly
|
|
metadata_fields = ExifUtils._load_structured_metadata(image_data)
|
|
else:
|
|
# For binary data, save to temp file first
|
|
import tempfile
|
|
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp_file:
|
|
temp_path = temp_file.name
|
|
temp_file.write(image_data)
|
|
try:
|
|
metadata_fields = ExifUtils._load_structured_metadata(temp_path)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to extract metadata from temp file: {e}")
|
|
finally:
|
|
# Clean up temp file
|
|
try:
|
|
os.unlink(temp_path)
|
|
except Exception:
|
|
pass
|
|
except Exception as e:
|
|
logger.warning(f"Failed to extract metadata, continuing without it: {e}")
|
|
# Continue without metadata
|
|
|
|
# Calculate new height to maintain aspect ratio
|
|
width, height = img.size
|
|
new_height = int(height * (target_width / width))
|
|
|
|
# Resize the image with error handling
|
|
try:
|
|
resized_img = img.resize((target_width, new_height), Image.LANCZOS)
|
|
except Exception as e:
|
|
logger.error(f"Failed to resize image: {e}")
|
|
# Return original image if resize fails
|
|
return image_data, '.jpg' if not isinstance(image_data, str) else os.path.splitext(image_data)[1]
|
|
|
|
# Save to BytesIO in the specified format
|
|
output = BytesIO()
|
|
|
|
# Set format and extension
|
|
if format.lower() == 'webp':
|
|
save_format, extension = 'WEBP', '.webp'
|
|
elif format.lower() in ('jpg', 'jpeg'):
|
|
save_format, extension = 'JPEG', '.jpg'
|
|
elif format.lower() == 'png':
|
|
save_format, extension = 'PNG', '.png'
|
|
else:
|
|
save_format, extension = 'WEBP', '.webp'
|
|
|
|
# Save with error handling
|
|
try:
|
|
if save_format == 'PNG':
|
|
resized_img.save(output, format=save_format, optimize=True)
|
|
else:
|
|
resized_img.save(output, format=save_format, quality=quality)
|
|
except Exception as e:
|
|
logger.error(f"Failed to save optimized image: {e}")
|
|
# Return original image if save fails
|
|
return image_data, '.jpg' if not isinstance(image_data, str) else os.path.splitext(image_data)[1]
|
|
|
|
# Get the optimized image data
|
|
optimized_data = output.getvalue()
|
|
|
|
# Handle metadata preservation if requested and available
|
|
if preserve_metadata and metadata_fields:
|
|
try:
|
|
if save_format == 'WEBP':
|
|
# For WebP format, directly save with metadata
|
|
try:
|
|
output_with_metadata = BytesIO()
|
|
exif_bytes = ExifUtils._build_exif_bytes(metadata_fields)
|
|
resized_img.save(output_with_metadata, format='WEBP', exif=exif_bytes, quality=quality)
|
|
optimized_data = output_with_metadata.getvalue()
|
|
except Exception as e:
|
|
logger.warning(f"Failed to add metadata to WebP, continuing without it: {e}")
|
|
else:
|
|
# For other formats, use temporary file
|
|
import tempfile
|
|
with tempfile.NamedTemporaryFile(suffix=extension, delete=False) as temp_file:
|
|
temp_path = temp_file.name
|
|
temp_file.write(optimized_data)
|
|
|
|
try:
|
|
ExifUtils.update_image_metadata(
|
|
temp_path, metadata_fields.get("parameters") or ""
|
|
)
|
|
# Read back the file
|
|
with open(temp_path, 'rb') as f:
|
|
optimized_data = f.read()
|
|
except Exception as e:
|
|
logger.warning(f"Failed to add metadata to image, continuing without it: {e}")
|
|
finally:
|
|
# Clean up temp file
|
|
try:
|
|
os.unlink(temp_path)
|
|
except Exception:
|
|
pass
|
|
except Exception as e:
|
|
logger.warning(f"Failed to preserve metadata: {e}, continuing with unmodified output")
|
|
|
|
return optimized_data, extension
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error optimizing image: {e}", exc_info=True)
|
|
# Return original data if optimization completely fails
|
|
if isinstance(image_data, str) and os.path.exists(image_data):
|
|
try:
|
|
with open(image_data, 'rb') as f:
|
|
return f.read(), os.path.splitext(image_data)[1]
|
|
except Exception:
|
|
return image_data, '.jpg' # Last resort fallback
|
|
return image_data, '.jpg'
|