From cc147a179550114aa59f8b7dbad7d6f5fa9b7b26 Mon Sep 17 00:00:00 2001 From: Will Miao Date: Sat, 25 Apr 2026 07:50:51 +0800 Subject: [PATCH] fix(metadata): preserve workflow when recipe images convert to webp --- py/utils/exif_utils.py | 255 ++++++++++++++++--------- tests/services/test_recipe_services.py | 54 ++++++ tests/utils/test_exif_utils.py | 82 ++++++++ 3 files changed, 297 insertions(+), 94 deletions(-) diff --git a/py/utils/exif_utils.py b/py/utils/exif_utils.py index b8684f99..df0f26d1 100644 --- a/py/utils/exif_utils.py +++ b/py/utils/exif_utils.py @@ -1,15 +1,142 @@ -import piexif import json import logging -from typing import Optional -from io import BytesIO import os +from io import BytesIO +from typing import Any, Optional + +import piexif from PIL import Image, PngImagePlugin logger = logging.getLogger(__name__) class ExifUtils: """Utility functions for working with EXIF data in images""" + + @staticmethod + def _decode_user_comment(user_comment: Any) -> Optional[str]: + if user_comment is None: + return None + if isinstance(user_comment, bytes): + if user_comment.startswith(b"UNICODE\0"): + return user_comment[8:].decode("utf-16be", errors="ignore") + return user_comment.decode("utf-8", errors="ignore") + if isinstance(user_comment, str): + return user_comment + return str(user_comment) + + @staticmethod + def _decode_exif_text(value: Any) -> Optional[str]: + if value is None: + return None + if isinstance(value, bytes): + return value.decode("utf-8", errors="ignore") + if isinstance(value, str): + return value + return str(value) + + @staticmethod + def _load_structured_metadata(image_path: str) -> dict[str, Optional[str]]: + metadata = { + "parameters": None, + "prompt": None, + "workflow": None, + "comment": None, + } + + with Image.open(image_path) as img: + info = getattr(img, "info", {}) or {} + + if "parameters" in info: + metadata["parameters"] = info["parameters"] + if "prompt" in info: + metadata["prompt"] = info["prompt"] + if "workflow" in info: + metadata["workflow"] = info["workflow"] + + if img.format not in ["JPEG", "TIFF", "WEBP"]: + exif = img.getexif() + if exif and piexif.ExifIFD.UserComment in exif: + metadata["comment"] = ExifUtils._decode_user_comment( + exif[piexif.ExifIFD.UserComment] + ) + + try: + exif_dict = piexif.load(image_path) + except Exception as e: + logger.debug(f"Error loading EXIF data: {e}") + exif_dict = {} + + if piexif.ExifIFD.UserComment in exif_dict.get("Exif", {}): + metadata["comment"] = ExifUtils._decode_user_comment( + exif_dict["Exif"][piexif.ExifIFD.UserComment] + ) + + image_description = ExifUtils._decode_exif_text( + exif_dict.get("0th", {}).get(piexif.ImageIFD.ImageDescription) + ) + if image_description: + if image_description.startswith("Workflow:"): + metadata["workflow"] = image_description[len("Workflow:") :] + elif not metadata["prompt"]: + metadata["prompt"] = image_description + + if not metadata["parameters"] and metadata["comment"]: + metadata["parameters"] = metadata["comment"] + + return metadata + + @staticmethod + def _build_pnginfo(img: Image.Image, metadata_fields: dict[str, Optional[str]]) -> PngImagePlugin.PngInfo: + png_info = PngImagePlugin.PngInfo() + existing_info = getattr(img, "info", {}) or {} + managed_keys = {"parameters", "prompt", "workflow"} + + for key, value in existing_info.items(): + if key in {"exif", "dpi", "transparency", "gamma", "aspect"}: + continue + if key in managed_keys: + continue + if isinstance(value, str): + png_info.add_text(key, value) + + for key in managed_keys: + value = metadata_fields.get(key) + if value: + png_info.add_text(key, value) + + return png_info + + @staticmethod + def _build_exif_bytes( + metadata_fields: dict[str, Optional[str]], existing_exif: bytes | None = None + ) -> bytes: + try: + exif_dict = piexif.load(existing_exif or b"") + except Exception: + exif_dict = {"0th": {}, "Exif": {}, "GPS": {}, "Interop": {}, "1st": {}} + + exif_dict.setdefault("0th", {}) + exif_dict.setdefault("Exif", {}) + + parameters = metadata_fields.get("parameters") + workflow = metadata_fields.get("workflow") + prompt = metadata_fields.get("prompt") + + if parameters: + exif_dict["Exif"][piexif.ExifIFD.UserComment] = ( + b"UNICODE\0" + parameters.encode("utf-16be") + ) + else: + exif_dict["Exif"].pop(piexif.ExifIFD.UserComment, None) + + if workflow: + exif_dict["0th"][piexif.ImageIFD.ImageDescription] = f"Workflow:{workflow}" + elif prompt: + exif_dict["0th"][piexif.ImageIFD.ImageDescription] = prompt + else: + exif_dict["0th"].pop(piexif.ImageIFD.ImageDescription, None) + + return piexif.dump(exif_dict) @staticmethod def extract_image_metadata(image_path: str) -> Optional[str]: @@ -28,48 +155,12 @@ class ExifUtils: if ext in ['.mp4', '.webm']: return None - # First try to open the image - with Image.open(image_path) as img: - # Method 1: Check for parameters in image info - if hasattr(img, 'info') and 'parameters' in img.info: - return img.info['parameters'] - - # Method 2: Check EXIF UserComment field - if img.format not in ['JPEG', 'TIFF', 'WEBP']: - # For non-JPEG/TIFF/WEBP images, try to get EXIF through PIL - exif = img.getexif() - if exif and piexif.ExifIFD.UserComment in exif: - user_comment = exif[piexif.ExifIFD.UserComment] - if isinstance(user_comment, bytes): - if user_comment.startswith(b'UNICODE\0'): - return user_comment[8:].decode('utf-16be') - return user_comment.decode('utf-8', errors='ignore') - return user_comment - - # For JPEG/TIFF/WEBP, use piexif - try: - exif_dict = piexif.load(image_path) - - if piexif.ExifIFD.UserComment in exif_dict.get('Exif', {}): - user_comment = exif_dict['Exif'][piexif.ExifIFD.UserComment] - if isinstance(user_comment, bytes): - if user_comment.startswith(b'UNICODE\0'): - user_comment = user_comment[8:].decode('utf-16be') - else: - user_comment = user_comment.decode('utf-8', errors='ignore') - return user_comment - except Exception as e: - logger.debug(f"Error loading EXIF data: {e}") - - # Method 3: Check PNG metadata for workflow info (for ComfyUI images) - if img.format == 'PNG': - # Look for workflow or prompt metadata in PNG chunks - for key in img.info: - if key in ['workflow', 'prompt', 'parameters']: - return img.info[key] - - return None - + metadata = ExifUtils._load_structured_metadata(image_path) + return ( + metadata.get("parameters") + or metadata.get("prompt") + or metadata.get("workflow") + ) except Exception as e: logger.error(f"Error extracting image metadata: {e}", exc_info=True) return None @@ -92,50 +183,26 @@ class ExifUtils: if ext in ['.mp4', '.webm']: return image_path - # Load the image and check its format + metadata_fields = ExifUtils._load_structured_metadata(image_path) + metadata_fields["parameters"] = metadata + with Image.open(image_path) as img: img_format = img.format - - # For PNG, try to update parameters directly - if img_format == 'PNG': - # Use PngInfo instead of plain dictionary - png_info = PngImagePlugin.PngInfo() - png_info.add_text("parameters", metadata) - img.save(image_path, format='PNG', pnginfo=png_info) + + if img_format == "PNG": + png_info = ExifUtils._build_pnginfo(img, metadata_fields) + img.save(image_path, format="PNG", pnginfo=png_info) return image_path - - # For WebP format, use PIL's exif parameter directly - elif img_format == 'WEBP': - exif_dict = {'Exif': {piexif.ExifIFD.UserComment: b'UNICODE\0' + metadata.encode('utf-16be')}} - exif_bytes = piexif.dump(exif_dict) - - # Save with the exif data - img.save(image_path, format='WEBP', exif=exif_bytes, quality=85) - return image_path - - # For other formats, use standard EXIF approach - else: - try: - exif_dict = piexif.load(img.info.get('exif', b'')) - except: - exif_dict = {'0th':{}, 'Exif':{}, 'GPS':{}, 'Interop':{}, '1st':{}} - - # If no Exif dictionary exists, create one - if 'Exif' not in exif_dict: - exif_dict['Exif'] = {} - - # Update the UserComment field - use UNICODE format - unicode_bytes = metadata.encode('utf-16be') - metadata_bytes = b'UNICODE\0' + unicode_bytes - - exif_dict['Exif'][piexif.ExifIFD.UserComment] = metadata_bytes - - # Convert EXIF dict back to bytes - exif_bytes = piexif.dump(exif_dict) - - # Save the image with updated EXIF data - img.save(image_path, exif=exif_bytes) - + + exif_bytes = ExifUtils._build_exif_bytes( + metadata_fields, img.info.get("exif") + ) + save_kwargs = {"exif": exif_bytes} + if img_format == "WEBP": + save_kwargs["quality"] = 85 + + img.save(image_path, format=img_format, **save_kwargs) + return image_path except Exception as e: logger.error(f"Error updating metadata in {image_path}: {e}") @@ -297,12 +364,12 @@ class ExifUtils: raise ValueError(f"Cannot process corrupt image data: {e}") # Extract metadata if needed and valid - metadata = None + metadata_fields = None if preserve_metadata: try: if isinstance(image_data, str) and os.path.exists(image_data): # For file path, extract directly - metadata = ExifUtils.extract_image_metadata(image_data) + metadata_fields = ExifUtils._load_structured_metadata(image_data) else: # For binary data, save to temp file first import tempfile @@ -310,7 +377,7 @@ class ExifUtils: temp_path = temp_file.name temp_file.write(image_data) try: - metadata = ExifUtils.extract_image_metadata(temp_path) + metadata_fields = ExifUtils._load_structured_metadata(temp_path) except Exception as e: logger.warning(f"Failed to extract metadata from temp file: {e}") finally: @@ -363,14 +430,13 @@ class ExifUtils: optimized_data = output.getvalue() # Handle metadata preservation if requested and available - if preserve_metadata and metadata: + if preserve_metadata and metadata_fields: try: if save_format == 'WEBP': # For WebP format, directly save with metadata try: output_with_metadata = BytesIO() - exif_dict = {'Exif': {piexif.ExifIFD.UserComment: b'UNICODE\0' + metadata.encode('utf-16be')}} - exif_bytes = piexif.dump(exif_dict) + exif_bytes = ExifUtils._build_exif_bytes(metadata_fields) resized_img.save(output_with_metadata, format='WEBP', exif=exif_bytes, quality=quality) optimized_data = output_with_metadata.getvalue() except Exception as e: @@ -383,8 +449,9 @@ class ExifUtils: temp_file.write(optimized_data) try: - # Add metadata - ExifUtils.update_image_metadata(temp_path, metadata) + ExifUtils.update_image_metadata( + temp_path, metadata_fields.get("parameters") or "" + ) # Read back the file with open(temp_path, 'rb') as f: optimized_data = f.read() diff --git a/tests/services/test_recipe_services.py b/tests/services/test_recipe_services.py index 417183dc..5353d2ee 100644 --- a/tests/services/test_recipe_services.py +++ b/tests/services/test_recipe_services.py @@ -1,10 +1,13 @@ import json import logging import os +from io import BytesIO from pathlib import Path from types import SimpleNamespace +import piexif import pytest +from PIL import Image, PngImagePlugin from py.services.recipes.analysis_service import RecipeAnalysisService from py.services.recipes.errors import ( @@ -13,6 +16,7 @@ from py.services.recipes.errors import ( RecipeValidationError, ) from py.services.recipes.persistence_service import RecipePersistenceService +from py.utils.exif_utils import ExifUtils class DummyExifUtils: @@ -420,6 +424,56 @@ async def test_save_recipe_derives_allowed_fields_from_raw_metadata(tmp_path): } +@pytest.mark.asyncio +async def test_save_recipe_preserves_workflow_when_png_is_converted_to_webp(tmp_path): + class DummyScanner: + def __init__(self, root): + self.recipes_dir = str(root) + + async def find_recipes_by_fingerprint(self, fingerprint): + return [] + + async def add_recipe(self, recipe_data): + return None + + png_info = PngImagePlugin.PngInfo() + png_info.add_text("parameters", "prompt text\nSteps: 20") + png_info.add_text("workflow", '{"nodes":[{"id":1}]}') + + image_buffer = BytesIO() + Image.new("RGB", (96, 48), color="purple").save( + image_buffer, format="PNG", pnginfo=png_info + ) + + service = RecipePersistenceService( + exif_utils=ExifUtils, + card_preview_width=64, + logger=logging.getLogger("test"), + ) + + result = await service.save_recipe( + recipe_scanner=DummyScanner(tmp_path), + image_bytes=image_buffer.getvalue(), + image_base64=None, + name="Workflow Recipe", + tags=["workflow"], + metadata={"base_model": "sd", "loras": []}, + extension=".png", + ) + + image_path = Path(result.payload["image_path"]) + exif_dict = piexif.load(str(image_path)) + assert ( + exif_dict["0th"][piexif.ImageIFD.ImageDescription].decode("utf-8") + == 'Workflow:{"nodes":[{"id":1}]}' + ) + + user_comment = exif_dict["Exif"][piexif.ExifIFD.UserComment] + decoded_comment = user_comment[8:].decode("utf-16be") + assert "prompt text" in decoded_comment + assert "Recipe metadata:" in decoded_comment + + @pytest.mark.asyncio async def test_save_recipe_strips_checkpoint_local_fields(tmp_path): exif_utils = DummyExifUtils() diff --git a/tests/utils/test_exif_utils.py b/tests/utils/test_exif_utils.py index 9e84d7a5..5d22f521 100644 --- a/tests/utils/test_exif_utils.py +++ b/tests/utils/test_exif_utils.py @@ -1,5 +1,8 @@ import json +import piexif +from PIL import Image, PngImagePlugin + from py.utils.exif_utils import ExifUtils @@ -59,3 +62,82 @@ def test_append_recipe_metadata_includes_checkpoint(monkeypatch, tmp_path): "baseModel": "Illustrious", } assert payload["base_model"] == "Illustrious" + + +def test_optimize_image_preserves_workflow_when_converting_png_to_webp(tmp_path): + image_path = tmp_path / "source.png" + png_info = PngImagePlugin.PngInfo() + png_info.add_text("parameters", "prompt text\nSteps: 20") + png_info.add_text("workflow", json.dumps({"nodes": [{"id": 1}]})) + + Image.new("RGB", (64, 32), color="red").save(image_path, pnginfo=png_info) + + optimized_data, extension = ExifUtils.optimize_image( + str(image_path), + target_width=32, + format="webp", + quality=85, + preserve_metadata=True, + ) + + optimized_path = tmp_path / f"optimized{extension}" + optimized_path.write_bytes(optimized_data) + + exif_dict = piexif.load(str(optimized_path)) + assert ( + exif_dict["0th"][piexif.ImageIFD.ImageDescription].decode("utf-8") + == 'Workflow:{"nodes": [{"id": 1}]}' + ) + user_comment = exif_dict["Exif"][piexif.ExifIFD.UserComment] + assert user_comment.startswith(b"UNICODE\0") + assert user_comment[8:].decode("utf-16be") == "prompt text\nSteps: 20" + + +def test_update_image_metadata_preserves_webp_workflow(tmp_path): + image_path = tmp_path / "recipe.webp" + exif_dict = { + "0th": { + piexif.ImageIFD.ImageDescription: 'Workflow:{"nodes":[{"id":1}]}', + }, + "Exif": { + piexif.ExifIFD.UserComment: b"UNICODE\0" + + "prompt text".encode("utf-16be") + }, + } + Image.new("RGB", (32, 32), color="blue").save( + image_path, format="WEBP", exif=piexif.dump(exif_dict), quality=85 + ) + + ExifUtils.update_image_metadata( + str(image_path), 'prompt text\nRecipe metadata: {"title":"recipe"}' + ) + + updated_exif = piexif.load(str(image_path)) + assert ( + updated_exif["0th"][piexif.ImageIFD.ImageDescription].decode("utf-8") + == 'Workflow:{"nodes":[{"id":1}]}' + ) + updated_comment = updated_exif["Exif"][piexif.ExifIFD.UserComment] + assert ( + updated_comment[8:].decode("utf-16be") + == 'prompt text\nRecipe metadata: {"title":"recipe"}' + ) + + +def test_update_image_metadata_preserves_png_workflow(tmp_path): + image_path = tmp_path / "recipe.png" + png_info = PngImagePlugin.PngInfo() + png_info.add_text("parameters", "prompt text") + png_info.add_text("workflow", '{"nodes":[{"id":1}]}') + Image.new("RGB", (32, 32), color="green").save(image_path, pnginfo=png_info) + + ExifUtils.update_image_metadata( + str(image_path), 'prompt text\nRecipe metadata: {"title":"recipe"}' + ) + + with Image.open(image_path) as img: + assert img.info["workflow"] == '{"nodes":[{"id":1}]}' + assert ( + img.info["parameters"] + == 'prompt text\nRecipe metadata: {"title":"recipe"}' + )