fix(metadata): preserve workflow when recipe images convert to webp

This commit is contained in:
Will Miao
2026-04-25 07:50:51 +08:00
parent e81409bea4
commit cc147a1795
3 changed files with 297 additions and 94 deletions

View File

@@ -1,15 +1,142 @@
import piexif
import json import json
import logging import logging
from typing import Optional
from io import BytesIO
import os import os
from io import BytesIO
from typing import Any, Optional
import piexif
from PIL import Image, PngImagePlugin from PIL import Image, PngImagePlugin
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class ExifUtils: class ExifUtils:
"""Utility functions for working with EXIF data in images""" """Utility functions for working with EXIF data in images"""
@staticmethod
def _decode_user_comment(user_comment: Any) -> Optional[str]:
if user_comment is None:
return None
if isinstance(user_comment, bytes):
if user_comment.startswith(b"UNICODE\0"):
return user_comment[8:].decode("utf-16be", errors="ignore")
return user_comment.decode("utf-8", errors="ignore")
if isinstance(user_comment, str):
return user_comment
return str(user_comment)
@staticmethod
def _decode_exif_text(value: Any) -> Optional[str]:
if value is None:
return None
if isinstance(value, bytes):
return value.decode("utf-8", errors="ignore")
if isinstance(value, str):
return value
return str(value)
@staticmethod
def _load_structured_metadata(image_path: str) -> dict[str, Optional[str]]:
metadata = {
"parameters": None,
"prompt": None,
"workflow": None,
"comment": None,
}
with Image.open(image_path) as img:
info = getattr(img, "info", {}) or {}
if "parameters" in info:
metadata["parameters"] = info["parameters"]
if "prompt" in info:
metadata["prompt"] = info["prompt"]
if "workflow" in info:
metadata["workflow"] = info["workflow"]
if img.format not in ["JPEG", "TIFF", "WEBP"]:
exif = img.getexif()
if exif and piexif.ExifIFD.UserComment in exif:
metadata["comment"] = ExifUtils._decode_user_comment(
exif[piexif.ExifIFD.UserComment]
)
try:
exif_dict = piexif.load(image_path)
except Exception as e:
logger.debug(f"Error loading EXIF data: {e}")
exif_dict = {}
if piexif.ExifIFD.UserComment in exif_dict.get("Exif", {}):
metadata["comment"] = ExifUtils._decode_user_comment(
exif_dict["Exif"][piexif.ExifIFD.UserComment]
)
image_description = ExifUtils._decode_exif_text(
exif_dict.get("0th", {}).get(piexif.ImageIFD.ImageDescription)
)
if image_description:
if image_description.startswith("Workflow:"):
metadata["workflow"] = image_description[len("Workflow:") :]
elif not metadata["prompt"]:
metadata["prompt"] = image_description
if not metadata["parameters"] and metadata["comment"]:
metadata["parameters"] = metadata["comment"]
return metadata
@staticmethod
def _build_pnginfo(img: Image.Image, metadata_fields: dict[str, Optional[str]]) -> PngImagePlugin.PngInfo:
png_info = PngImagePlugin.PngInfo()
existing_info = getattr(img, "info", {}) or {}
managed_keys = {"parameters", "prompt", "workflow"}
for key, value in existing_info.items():
if key in {"exif", "dpi", "transparency", "gamma", "aspect"}:
continue
if key in managed_keys:
continue
if isinstance(value, str):
png_info.add_text(key, value)
for key in managed_keys:
value = metadata_fields.get(key)
if value:
png_info.add_text(key, value)
return png_info
@staticmethod
def _build_exif_bytes(
metadata_fields: dict[str, Optional[str]], existing_exif: bytes | None = None
) -> bytes:
try:
exif_dict = piexif.load(existing_exif or b"")
except Exception:
exif_dict = {"0th": {}, "Exif": {}, "GPS": {}, "Interop": {}, "1st": {}}
exif_dict.setdefault("0th", {})
exif_dict.setdefault("Exif", {})
parameters = metadata_fields.get("parameters")
workflow = metadata_fields.get("workflow")
prompt = metadata_fields.get("prompt")
if parameters:
exif_dict["Exif"][piexif.ExifIFD.UserComment] = (
b"UNICODE\0" + parameters.encode("utf-16be")
)
else:
exif_dict["Exif"].pop(piexif.ExifIFD.UserComment, None)
if workflow:
exif_dict["0th"][piexif.ImageIFD.ImageDescription] = f"Workflow:{workflow}"
elif prompt:
exif_dict["0th"][piexif.ImageIFD.ImageDescription] = prompt
else:
exif_dict["0th"].pop(piexif.ImageIFD.ImageDescription, None)
return piexif.dump(exif_dict)
@staticmethod @staticmethod
def extract_image_metadata(image_path: str) -> Optional[str]: def extract_image_metadata(image_path: str) -> Optional[str]:
@@ -28,48 +155,12 @@ class ExifUtils:
if ext in ['.mp4', '.webm']: if ext in ['.mp4', '.webm']:
return None return None
# First try to open the image metadata = ExifUtils._load_structured_metadata(image_path)
with Image.open(image_path) as img: return (
# Method 1: Check for parameters in image info metadata.get("parameters")
if hasattr(img, 'info') and 'parameters' in img.info: or metadata.get("prompt")
return img.info['parameters'] or metadata.get("workflow")
)
# Method 2: Check EXIF UserComment field
if img.format not in ['JPEG', 'TIFF', 'WEBP']:
# For non-JPEG/TIFF/WEBP images, try to get EXIF through PIL
exif = img.getexif()
if exif and piexif.ExifIFD.UserComment in exif:
user_comment = exif[piexif.ExifIFD.UserComment]
if isinstance(user_comment, bytes):
if user_comment.startswith(b'UNICODE\0'):
return user_comment[8:].decode('utf-16be')
return user_comment.decode('utf-8', errors='ignore')
return user_comment
# For JPEG/TIFF/WEBP, use piexif
try:
exif_dict = piexif.load(image_path)
if piexif.ExifIFD.UserComment in exif_dict.get('Exif', {}):
user_comment = exif_dict['Exif'][piexif.ExifIFD.UserComment]
if isinstance(user_comment, bytes):
if user_comment.startswith(b'UNICODE\0'):
user_comment = user_comment[8:].decode('utf-16be')
else:
user_comment = user_comment.decode('utf-8', errors='ignore')
return user_comment
except Exception as e:
logger.debug(f"Error loading EXIF data: {e}")
# Method 3: Check PNG metadata for workflow info (for ComfyUI images)
if img.format == 'PNG':
# Look for workflow or prompt metadata in PNG chunks
for key in img.info:
if key in ['workflow', 'prompt', 'parameters']:
return img.info[key]
return None
except Exception as e: except Exception as e:
logger.error(f"Error extracting image metadata: {e}", exc_info=True) logger.error(f"Error extracting image metadata: {e}", exc_info=True)
return None return None
@@ -92,50 +183,26 @@ class ExifUtils:
if ext in ['.mp4', '.webm']: if ext in ['.mp4', '.webm']:
return image_path return image_path
# Load the image and check its format metadata_fields = ExifUtils._load_structured_metadata(image_path)
metadata_fields["parameters"] = metadata
with Image.open(image_path) as img: with Image.open(image_path) as img:
img_format = img.format img_format = img.format
# For PNG, try to update parameters directly if img_format == "PNG":
if img_format == 'PNG': png_info = ExifUtils._build_pnginfo(img, metadata_fields)
# Use PngInfo instead of plain dictionary img.save(image_path, format="PNG", pnginfo=png_info)
png_info = PngImagePlugin.PngInfo()
png_info.add_text("parameters", metadata)
img.save(image_path, format='PNG', pnginfo=png_info)
return image_path return image_path
# For WebP format, use PIL's exif parameter directly exif_bytes = ExifUtils._build_exif_bytes(
elif img_format == 'WEBP': metadata_fields, img.info.get("exif")
exif_dict = {'Exif': {piexif.ExifIFD.UserComment: b'UNICODE\0' + metadata.encode('utf-16be')}} )
exif_bytes = piexif.dump(exif_dict) save_kwargs = {"exif": exif_bytes}
if img_format == "WEBP":
# Save with the exif data save_kwargs["quality"] = 85
img.save(image_path, format='WEBP', exif=exif_bytes, quality=85)
return image_path img.save(image_path, format=img_format, **save_kwargs)
# For other formats, use standard EXIF approach
else:
try:
exif_dict = piexif.load(img.info.get('exif', b''))
except:
exif_dict = {'0th':{}, 'Exif':{}, 'GPS':{}, 'Interop':{}, '1st':{}}
# If no Exif dictionary exists, create one
if 'Exif' not in exif_dict:
exif_dict['Exif'] = {}
# Update the UserComment field - use UNICODE format
unicode_bytes = metadata.encode('utf-16be')
metadata_bytes = b'UNICODE\0' + unicode_bytes
exif_dict['Exif'][piexif.ExifIFD.UserComment] = metadata_bytes
# Convert EXIF dict back to bytes
exif_bytes = piexif.dump(exif_dict)
# Save the image with updated EXIF data
img.save(image_path, exif=exif_bytes)
return image_path return image_path
except Exception as e: except Exception as e:
logger.error(f"Error updating metadata in {image_path}: {e}") logger.error(f"Error updating metadata in {image_path}: {e}")
@@ -297,12 +364,12 @@ class ExifUtils:
raise ValueError(f"Cannot process corrupt image data: {e}") raise ValueError(f"Cannot process corrupt image data: {e}")
# Extract metadata if needed and valid # Extract metadata if needed and valid
metadata = None metadata_fields = None
if preserve_metadata: if preserve_metadata:
try: try:
if isinstance(image_data, str) and os.path.exists(image_data): if isinstance(image_data, str) and os.path.exists(image_data):
# For file path, extract directly # For file path, extract directly
metadata = ExifUtils.extract_image_metadata(image_data) metadata_fields = ExifUtils._load_structured_metadata(image_data)
else: else:
# For binary data, save to temp file first # For binary data, save to temp file first
import tempfile import tempfile
@@ -310,7 +377,7 @@ class ExifUtils:
temp_path = temp_file.name temp_path = temp_file.name
temp_file.write(image_data) temp_file.write(image_data)
try: try:
metadata = ExifUtils.extract_image_metadata(temp_path) metadata_fields = ExifUtils._load_structured_metadata(temp_path)
except Exception as e: except Exception as e:
logger.warning(f"Failed to extract metadata from temp file: {e}") logger.warning(f"Failed to extract metadata from temp file: {e}")
finally: finally:
@@ -363,14 +430,13 @@ class ExifUtils:
optimized_data = output.getvalue() optimized_data = output.getvalue()
# Handle metadata preservation if requested and available # Handle metadata preservation if requested and available
if preserve_metadata and metadata: if preserve_metadata and metadata_fields:
try: try:
if save_format == 'WEBP': if save_format == 'WEBP':
# For WebP format, directly save with metadata # For WebP format, directly save with metadata
try: try:
output_with_metadata = BytesIO() output_with_metadata = BytesIO()
exif_dict = {'Exif': {piexif.ExifIFD.UserComment: b'UNICODE\0' + metadata.encode('utf-16be')}} exif_bytes = ExifUtils._build_exif_bytes(metadata_fields)
exif_bytes = piexif.dump(exif_dict)
resized_img.save(output_with_metadata, format='WEBP', exif=exif_bytes, quality=quality) resized_img.save(output_with_metadata, format='WEBP', exif=exif_bytes, quality=quality)
optimized_data = output_with_metadata.getvalue() optimized_data = output_with_metadata.getvalue()
except Exception as e: except Exception as e:
@@ -383,8 +449,9 @@ class ExifUtils:
temp_file.write(optimized_data) temp_file.write(optimized_data)
try: try:
# Add metadata ExifUtils.update_image_metadata(
ExifUtils.update_image_metadata(temp_path, metadata) temp_path, metadata_fields.get("parameters") or ""
)
# Read back the file # Read back the file
with open(temp_path, 'rb') as f: with open(temp_path, 'rb') as f:
optimized_data = f.read() optimized_data = f.read()

View File

@@ -1,10 +1,13 @@
import json import json
import logging import logging
import os import os
from io import BytesIO
from pathlib import Path from pathlib import Path
from types import SimpleNamespace from types import SimpleNamespace
import piexif
import pytest import pytest
from PIL import Image, PngImagePlugin
from py.services.recipes.analysis_service import RecipeAnalysisService from py.services.recipes.analysis_service import RecipeAnalysisService
from py.services.recipes.errors import ( from py.services.recipes.errors import (
@@ -13,6 +16,7 @@ from py.services.recipes.errors import (
RecipeValidationError, RecipeValidationError,
) )
from py.services.recipes.persistence_service import RecipePersistenceService from py.services.recipes.persistence_service import RecipePersistenceService
from py.utils.exif_utils import ExifUtils
class DummyExifUtils: class DummyExifUtils:
@@ -420,6 +424,56 @@ async def test_save_recipe_derives_allowed_fields_from_raw_metadata(tmp_path):
} }
@pytest.mark.asyncio
async def test_save_recipe_preserves_workflow_when_png_is_converted_to_webp(tmp_path):
class DummyScanner:
def __init__(self, root):
self.recipes_dir = str(root)
async def find_recipes_by_fingerprint(self, fingerprint):
return []
async def add_recipe(self, recipe_data):
return None
png_info = PngImagePlugin.PngInfo()
png_info.add_text("parameters", "prompt text\nSteps: 20")
png_info.add_text("workflow", '{"nodes":[{"id":1}]}')
image_buffer = BytesIO()
Image.new("RGB", (96, 48), color="purple").save(
image_buffer, format="PNG", pnginfo=png_info
)
service = RecipePersistenceService(
exif_utils=ExifUtils,
card_preview_width=64,
logger=logging.getLogger("test"),
)
result = await service.save_recipe(
recipe_scanner=DummyScanner(tmp_path),
image_bytes=image_buffer.getvalue(),
image_base64=None,
name="Workflow Recipe",
tags=["workflow"],
metadata={"base_model": "sd", "loras": []},
extension=".png",
)
image_path = Path(result.payload["image_path"])
exif_dict = piexif.load(str(image_path))
assert (
exif_dict["0th"][piexif.ImageIFD.ImageDescription].decode("utf-8")
== 'Workflow:{"nodes":[{"id":1}]}'
)
user_comment = exif_dict["Exif"][piexif.ExifIFD.UserComment]
decoded_comment = user_comment[8:].decode("utf-16be")
assert "prompt text" in decoded_comment
assert "Recipe metadata:" in decoded_comment
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_save_recipe_strips_checkpoint_local_fields(tmp_path): async def test_save_recipe_strips_checkpoint_local_fields(tmp_path):
exif_utils = DummyExifUtils() exif_utils = DummyExifUtils()

View File

@@ -1,5 +1,8 @@
import json import json
import piexif
from PIL import Image, PngImagePlugin
from py.utils.exif_utils import ExifUtils from py.utils.exif_utils import ExifUtils
@@ -59,3 +62,82 @@ def test_append_recipe_metadata_includes_checkpoint(monkeypatch, tmp_path):
"baseModel": "Illustrious", "baseModel": "Illustrious",
} }
assert payload["base_model"] == "Illustrious" assert payload["base_model"] == "Illustrious"
def test_optimize_image_preserves_workflow_when_converting_png_to_webp(tmp_path):
image_path = tmp_path / "source.png"
png_info = PngImagePlugin.PngInfo()
png_info.add_text("parameters", "prompt text\nSteps: 20")
png_info.add_text("workflow", json.dumps({"nodes": [{"id": 1}]}))
Image.new("RGB", (64, 32), color="red").save(image_path, pnginfo=png_info)
optimized_data, extension = ExifUtils.optimize_image(
str(image_path),
target_width=32,
format="webp",
quality=85,
preserve_metadata=True,
)
optimized_path = tmp_path / f"optimized{extension}"
optimized_path.write_bytes(optimized_data)
exif_dict = piexif.load(str(optimized_path))
assert (
exif_dict["0th"][piexif.ImageIFD.ImageDescription].decode("utf-8")
== 'Workflow:{"nodes": [{"id": 1}]}'
)
user_comment = exif_dict["Exif"][piexif.ExifIFD.UserComment]
assert user_comment.startswith(b"UNICODE\0")
assert user_comment[8:].decode("utf-16be") == "prompt text\nSteps: 20"
def test_update_image_metadata_preserves_webp_workflow(tmp_path):
image_path = tmp_path / "recipe.webp"
exif_dict = {
"0th": {
piexif.ImageIFD.ImageDescription: 'Workflow:{"nodes":[{"id":1}]}',
},
"Exif": {
piexif.ExifIFD.UserComment: b"UNICODE\0"
+ "prompt text".encode("utf-16be")
},
}
Image.new("RGB", (32, 32), color="blue").save(
image_path, format="WEBP", exif=piexif.dump(exif_dict), quality=85
)
ExifUtils.update_image_metadata(
str(image_path), 'prompt text\nRecipe metadata: {"title":"recipe"}'
)
updated_exif = piexif.load(str(image_path))
assert (
updated_exif["0th"][piexif.ImageIFD.ImageDescription].decode("utf-8")
== 'Workflow:{"nodes":[{"id":1}]}'
)
updated_comment = updated_exif["Exif"][piexif.ExifIFD.UserComment]
assert (
updated_comment[8:].decode("utf-16be")
== 'prompt text\nRecipe metadata: {"title":"recipe"}'
)
def test_update_image_metadata_preserves_png_workflow(tmp_path):
image_path = tmp_path / "recipe.png"
png_info = PngImagePlugin.PngInfo()
png_info.add_text("parameters", "prompt text")
png_info.add_text("workflow", '{"nodes":[{"id":1}]}')
Image.new("RGB", (32, 32), color="green").save(image_path, pnginfo=png_info)
ExifUtils.update_image_metadata(
str(image_path), 'prompt text\nRecipe metadata: {"title":"recipe"}'
)
with Image.open(image_path) as img:
assert img.info["workflow"] == '{"nodes":[{"id":1}]}'
assert (
img.info["parameters"]
== 'prompt text\nRecipe metadata: {"title":"recipe"}'
)