Compare commits

...

2 Commits

Author SHA1 Message Date
Will Miao
cc147a1795 fix(metadata): preserve workflow when recipe images convert to webp 2026-04-25 07:50:51 +08:00
Will Miao
e81409bea4 fix(i18n): shorten bulk delete labels 2026-04-25 07:21:42 +08:00
13 changed files with 307 additions and 104 deletions

View File

@@ -668,7 +668,7 @@
"autoOrganize": "Automatisch organisieren",
"skipMetadataRefresh": "Metadaten-Aktualisierung für ausgewählte Modelle überspringen",
"resumeMetadataRefresh": "Metadaten-Aktualisierung für ausgewählte Modelle fortsetzen",
"deleteAll": "Alle Modelle löschen",
"deleteAll": "Ausgewählte löschen",
"downloadMissingLoras": "Fehlende LoRAs herunterladen",
"clear": "Auswahl löschen",
"skipMetadataRefreshCount": "Überspringen{count} Modelle",

View File

@@ -668,7 +668,7 @@
"autoOrganize": "Auto-Organize Selected",
"skipMetadataRefresh": "Skip Metadata Refresh for Selected",
"resumeMetadataRefresh": "Resume Metadata Refresh for Selected",
"deleteAll": "Delete Selected Models",
"deleteAll": "Delete Selected",
"downloadMissingLoras": "Download Missing LoRAs",
"clear": "Clear Selection",
"skipMetadataRefreshCount": "Skip ({count} models)",

View File

@@ -668,7 +668,7 @@
"autoOrganize": "Auto-organizar seleccionados",
"skipMetadataRefresh": "Omitir actualización de metadatos para seleccionados",
"resumeMetadataRefresh": "Reanudar actualización de metadatos para seleccionados",
"deleteAll": "Eliminar todos los modelos",
"deleteAll": "Eliminar seleccionados",
"downloadMissingLoras": "Descargar LoRAs faltantes",
"clear": "Limpiar selección",
"skipMetadataRefreshCount": "Omitir{count} modelos",

View File

@@ -668,7 +668,7 @@
"autoOrganize": "Auto-organiser la sélection",
"skipMetadataRefresh": "Ignorer l'actualisation des métadonnées pour la sélection",
"resumeMetadataRefresh": "Reprendre l'actualisation des métadonnées pour la sélection",
"deleteAll": "Supprimer tous les modèles",
"deleteAll": "Supprimer la sélection",
"downloadMissingLoras": "Télécharger les LoRAs manquants",
"clear": "Effacer la sélection",
"skipMetadataRefreshCount": "Ignorer{count} modèles",

View File

@@ -668,7 +668,7 @@
"autoOrganize": "ארגן אוטומטית נבחרים",
"skipMetadataRefresh": "דילוג על רענון מטא-נתונים לנבחרים",
"resumeMetadataRefresh": "המשך רענון מטא-נתונים לנבחרים",
"deleteAll": "מחק את כל המודלים",
"deleteAll": "מחק נבחרים",
"downloadMissingLoras": "הורדת LoRAs חסרים",
"clear": "נקה בחירה",
"skipMetadataRefreshCount": "דילוג({count} מודלים)",

View File

@@ -668,7 +668,7 @@
"autoOrganize": "自動整理を実行",
"skipMetadataRefresh": "選択したモデルのメタデータ更新をスキップ",
"resumeMetadataRefresh": "選択したモデルのメタデータ更新を再開",
"deleteAll": "すべてのモデルを削除",
"deleteAll": "選択したものを削除",
"downloadMissingLoras": "不足している LoRA をダウンロード",
"clear": "選択をクリア",
"skipMetadataRefreshCount": "スキップ({count}モデル)",

View File

@@ -668,7 +668,7 @@
"autoOrganize": "자동 정리 선택",
"skipMetadataRefresh": "선택한 모델의 메타데이터 새로고침 건너뛰기",
"resumeMetadataRefresh": "선택한 모델의 메타데이터 새로고침 재개",
"deleteAll": "모든 모델 삭제",
"deleteAll": "선택된 항목 삭제",
"downloadMissingLoras": "누락된 LoRA 다운로드",
"clear": "선택 지우기",
"skipMetadataRefreshCount": "건너뛰기({count}개 모델)",

View File

@@ -668,7 +668,7 @@
"autoOrganize": "Автоматически организовать выбранные",
"skipMetadataRefresh": "Пропустить обновление метаданных для выбранных",
"resumeMetadataRefresh": "Возобновить обновление метаданных для выбранных",
"deleteAll": "Удалить все модели",
"deleteAll": "Удалить выбранные",
"downloadMissingLoras": "Скачать отсутствующие LoRAs",
"clear": "Очистить выбор",
"skipMetadataRefreshCount": "Пропустить({count} моделей)",

View File

@@ -668,7 +668,7 @@
"autoOrganize": "自动整理所选模型",
"skipMetadataRefresh": "跳过所选模型的元数据刷新",
"resumeMetadataRefresh": "恢复所选模型的元数据刷新",
"deleteAll": "删除选中模型",
"deleteAll": "删除选",
"downloadMissingLoras": "下载缺失的 LoRAs",
"clear": "清除选择",
"skipMetadataRefreshCount": "跳过({count} 个模型)",

View File

@@ -668,7 +668,7 @@
"autoOrganize": "自動整理所選模型",
"skipMetadataRefresh": "跳過所選模型的元數據更新",
"resumeMetadataRefresh": "恢復所選模型的元數據更新",
"deleteAll": "刪除全部模型",
"deleteAll": "刪除所選",
"downloadMissingLoras": "下載缺失的 LoRAs",
"clear": "清除選取",
"skipMetadataRefreshCount": "跳過({count} 個模型)",

View File

@@ -1,15 +1,142 @@
import piexif
import json
import logging
from typing import Optional
from io import BytesIO
import os
from io import BytesIO
from typing import Any, Optional
import piexif
from PIL import Image, PngImagePlugin
logger = logging.getLogger(__name__)
class ExifUtils:
"""Utility functions for working with EXIF data in images"""
@staticmethod
def _decode_user_comment(user_comment: Any) -> Optional[str]:
if user_comment is None:
return None
if isinstance(user_comment, bytes):
if user_comment.startswith(b"UNICODE\0"):
return user_comment[8:].decode("utf-16be", errors="ignore")
return user_comment.decode("utf-8", errors="ignore")
if isinstance(user_comment, str):
return user_comment
return str(user_comment)
@staticmethod
def _decode_exif_text(value: Any) -> Optional[str]:
if value is None:
return None
if isinstance(value, bytes):
return value.decode("utf-8", errors="ignore")
if isinstance(value, str):
return value
return str(value)
@staticmethod
def _load_structured_metadata(image_path: str) -> dict[str, Optional[str]]:
metadata = {
"parameters": None,
"prompt": None,
"workflow": None,
"comment": None,
}
with Image.open(image_path) as img:
info = getattr(img, "info", {}) or {}
if "parameters" in info:
metadata["parameters"] = info["parameters"]
if "prompt" in info:
metadata["prompt"] = info["prompt"]
if "workflow" in info:
metadata["workflow"] = info["workflow"]
if img.format not in ["JPEG", "TIFF", "WEBP"]:
exif = img.getexif()
if exif and piexif.ExifIFD.UserComment in exif:
metadata["comment"] = ExifUtils._decode_user_comment(
exif[piexif.ExifIFD.UserComment]
)
try:
exif_dict = piexif.load(image_path)
except Exception as e:
logger.debug(f"Error loading EXIF data: {e}")
exif_dict = {}
if piexif.ExifIFD.UserComment in exif_dict.get("Exif", {}):
metadata["comment"] = ExifUtils._decode_user_comment(
exif_dict["Exif"][piexif.ExifIFD.UserComment]
)
image_description = ExifUtils._decode_exif_text(
exif_dict.get("0th", {}).get(piexif.ImageIFD.ImageDescription)
)
if image_description:
if image_description.startswith("Workflow:"):
metadata["workflow"] = image_description[len("Workflow:") :]
elif not metadata["prompt"]:
metadata["prompt"] = image_description
if not metadata["parameters"] and metadata["comment"]:
metadata["parameters"] = metadata["comment"]
return metadata
@staticmethod
def _build_pnginfo(img: Image.Image, metadata_fields: dict[str, Optional[str]]) -> PngImagePlugin.PngInfo:
png_info = PngImagePlugin.PngInfo()
existing_info = getattr(img, "info", {}) or {}
managed_keys = {"parameters", "prompt", "workflow"}
for key, value in existing_info.items():
if key in {"exif", "dpi", "transparency", "gamma", "aspect"}:
continue
if key in managed_keys:
continue
if isinstance(value, str):
png_info.add_text(key, value)
for key in managed_keys:
value = metadata_fields.get(key)
if value:
png_info.add_text(key, value)
return png_info
@staticmethod
def _build_exif_bytes(
metadata_fields: dict[str, Optional[str]], existing_exif: bytes | None = None
) -> bytes:
try:
exif_dict = piexif.load(existing_exif or b"")
except Exception:
exif_dict = {"0th": {}, "Exif": {}, "GPS": {}, "Interop": {}, "1st": {}}
exif_dict.setdefault("0th", {})
exif_dict.setdefault("Exif", {})
parameters = metadata_fields.get("parameters")
workflow = metadata_fields.get("workflow")
prompt = metadata_fields.get("prompt")
if parameters:
exif_dict["Exif"][piexif.ExifIFD.UserComment] = (
b"UNICODE\0" + parameters.encode("utf-16be")
)
else:
exif_dict["Exif"].pop(piexif.ExifIFD.UserComment, None)
if workflow:
exif_dict["0th"][piexif.ImageIFD.ImageDescription] = f"Workflow:{workflow}"
elif prompt:
exif_dict["0th"][piexif.ImageIFD.ImageDescription] = prompt
else:
exif_dict["0th"].pop(piexif.ImageIFD.ImageDescription, None)
return piexif.dump(exif_dict)
@staticmethod
def extract_image_metadata(image_path: str) -> Optional[str]:
@@ -28,48 +155,12 @@ class ExifUtils:
if ext in ['.mp4', '.webm']:
return None
# First try to open the image
with Image.open(image_path) as img:
# Method 1: Check for parameters in image info
if hasattr(img, 'info') and 'parameters' in img.info:
return img.info['parameters']
# Method 2: Check EXIF UserComment field
if img.format not in ['JPEG', 'TIFF', 'WEBP']:
# For non-JPEG/TIFF/WEBP images, try to get EXIF through PIL
exif = img.getexif()
if exif and piexif.ExifIFD.UserComment in exif:
user_comment = exif[piexif.ExifIFD.UserComment]
if isinstance(user_comment, bytes):
if user_comment.startswith(b'UNICODE\0'):
return user_comment[8:].decode('utf-16be')
return user_comment.decode('utf-8', errors='ignore')
return user_comment
# For JPEG/TIFF/WEBP, use piexif
try:
exif_dict = piexif.load(image_path)
if piexif.ExifIFD.UserComment in exif_dict.get('Exif', {}):
user_comment = exif_dict['Exif'][piexif.ExifIFD.UserComment]
if isinstance(user_comment, bytes):
if user_comment.startswith(b'UNICODE\0'):
user_comment = user_comment[8:].decode('utf-16be')
else:
user_comment = user_comment.decode('utf-8', errors='ignore')
return user_comment
except Exception as e:
logger.debug(f"Error loading EXIF data: {e}")
# Method 3: Check PNG metadata for workflow info (for ComfyUI images)
if img.format == 'PNG':
# Look for workflow or prompt metadata in PNG chunks
for key in img.info:
if key in ['workflow', 'prompt', 'parameters']:
return img.info[key]
return None
metadata = ExifUtils._load_structured_metadata(image_path)
return (
metadata.get("parameters")
or metadata.get("prompt")
or metadata.get("workflow")
)
except Exception as e:
logger.error(f"Error extracting image metadata: {e}", exc_info=True)
return None
@@ -92,50 +183,26 @@ class ExifUtils:
if ext in ['.mp4', '.webm']:
return image_path
# Load the image and check its format
metadata_fields = ExifUtils._load_structured_metadata(image_path)
metadata_fields["parameters"] = metadata
with Image.open(image_path) as img:
img_format = img.format
# For PNG, try to update parameters directly
if img_format == 'PNG':
# Use PngInfo instead of plain dictionary
png_info = PngImagePlugin.PngInfo()
png_info.add_text("parameters", metadata)
img.save(image_path, format='PNG', pnginfo=png_info)
if img_format == "PNG":
png_info = ExifUtils._build_pnginfo(img, metadata_fields)
img.save(image_path, format="PNG", pnginfo=png_info)
return image_path
# For WebP format, use PIL's exif parameter directly
elif img_format == 'WEBP':
exif_dict = {'Exif': {piexif.ExifIFD.UserComment: b'UNICODE\0' + metadata.encode('utf-16be')}}
exif_bytes = piexif.dump(exif_dict)
# Save with the exif data
img.save(image_path, format='WEBP', exif=exif_bytes, quality=85)
return image_path
# For other formats, use standard EXIF approach
else:
try:
exif_dict = piexif.load(img.info.get('exif', b''))
except:
exif_dict = {'0th':{}, 'Exif':{}, 'GPS':{}, 'Interop':{}, '1st':{}}
# If no Exif dictionary exists, create one
if 'Exif' not in exif_dict:
exif_dict['Exif'] = {}
# Update the UserComment field - use UNICODE format
unicode_bytes = metadata.encode('utf-16be')
metadata_bytes = b'UNICODE\0' + unicode_bytes
exif_dict['Exif'][piexif.ExifIFD.UserComment] = metadata_bytes
# Convert EXIF dict back to bytes
exif_bytes = piexif.dump(exif_dict)
# Save the image with updated EXIF data
img.save(image_path, exif=exif_bytes)
exif_bytes = ExifUtils._build_exif_bytes(
metadata_fields, img.info.get("exif")
)
save_kwargs = {"exif": exif_bytes}
if img_format == "WEBP":
save_kwargs["quality"] = 85
img.save(image_path, format=img_format, **save_kwargs)
return image_path
except Exception as e:
logger.error(f"Error updating metadata in {image_path}: {e}")
@@ -297,12 +364,12 @@ class ExifUtils:
raise ValueError(f"Cannot process corrupt image data: {e}")
# Extract metadata if needed and valid
metadata = None
metadata_fields = None
if preserve_metadata:
try:
if isinstance(image_data, str) and os.path.exists(image_data):
# For file path, extract directly
metadata = ExifUtils.extract_image_metadata(image_data)
metadata_fields = ExifUtils._load_structured_metadata(image_data)
else:
# For binary data, save to temp file first
import tempfile
@@ -310,7 +377,7 @@ class ExifUtils:
temp_path = temp_file.name
temp_file.write(image_data)
try:
metadata = ExifUtils.extract_image_metadata(temp_path)
metadata_fields = ExifUtils._load_structured_metadata(temp_path)
except Exception as e:
logger.warning(f"Failed to extract metadata from temp file: {e}")
finally:
@@ -363,14 +430,13 @@ class ExifUtils:
optimized_data = output.getvalue()
# Handle metadata preservation if requested and available
if preserve_metadata and metadata:
if preserve_metadata and metadata_fields:
try:
if save_format == 'WEBP':
# For WebP format, directly save with metadata
try:
output_with_metadata = BytesIO()
exif_dict = {'Exif': {piexif.ExifIFD.UserComment: b'UNICODE\0' + metadata.encode('utf-16be')}}
exif_bytes = piexif.dump(exif_dict)
exif_bytes = ExifUtils._build_exif_bytes(metadata_fields)
resized_img.save(output_with_metadata, format='WEBP', exif=exif_bytes, quality=quality)
optimized_data = output_with_metadata.getvalue()
except Exception as e:
@@ -383,8 +449,9 @@ class ExifUtils:
temp_file.write(optimized_data)
try:
# Add metadata
ExifUtils.update_image_metadata(temp_path, metadata)
ExifUtils.update_image_metadata(
temp_path, metadata_fields.get("parameters") or ""
)
# Read back the file
with open(temp_path, 'rb') as f:
optimized_data = f.read()

View File

@@ -1,10 +1,13 @@
import json
import logging
import os
from io import BytesIO
from pathlib import Path
from types import SimpleNamespace
import piexif
import pytest
from PIL import Image, PngImagePlugin
from py.services.recipes.analysis_service import RecipeAnalysisService
from py.services.recipes.errors import (
@@ -13,6 +16,7 @@ from py.services.recipes.errors import (
RecipeValidationError,
)
from py.services.recipes.persistence_service import RecipePersistenceService
from py.utils.exif_utils import ExifUtils
class DummyExifUtils:
@@ -420,6 +424,56 @@ async def test_save_recipe_derives_allowed_fields_from_raw_metadata(tmp_path):
}
@pytest.mark.asyncio
async def test_save_recipe_preserves_workflow_when_png_is_converted_to_webp(tmp_path):
class DummyScanner:
def __init__(self, root):
self.recipes_dir = str(root)
async def find_recipes_by_fingerprint(self, fingerprint):
return []
async def add_recipe(self, recipe_data):
return None
png_info = PngImagePlugin.PngInfo()
png_info.add_text("parameters", "prompt text\nSteps: 20")
png_info.add_text("workflow", '{"nodes":[{"id":1}]}')
image_buffer = BytesIO()
Image.new("RGB", (96, 48), color="purple").save(
image_buffer, format="PNG", pnginfo=png_info
)
service = RecipePersistenceService(
exif_utils=ExifUtils,
card_preview_width=64,
logger=logging.getLogger("test"),
)
result = await service.save_recipe(
recipe_scanner=DummyScanner(tmp_path),
image_bytes=image_buffer.getvalue(),
image_base64=None,
name="Workflow Recipe",
tags=["workflow"],
metadata={"base_model": "sd", "loras": []},
extension=".png",
)
image_path = Path(result.payload["image_path"])
exif_dict = piexif.load(str(image_path))
assert (
exif_dict["0th"][piexif.ImageIFD.ImageDescription].decode("utf-8")
== 'Workflow:{"nodes":[{"id":1}]}'
)
user_comment = exif_dict["Exif"][piexif.ExifIFD.UserComment]
decoded_comment = user_comment[8:].decode("utf-16be")
assert "prompt text" in decoded_comment
assert "Recipe metadata:" in decoded_comment
@pytest.mark.asyncio
async def test_save_recipe_strips_checkpoint_local_fields(tmp_path):
exif_utils = DummyExifUtils()

View File

@@ -1,5 +1,8 @@
import json
import piexif
from PIL import Image, PngImagePlugin
from py.utils.exif_utils import ExifUtils
@@ -59,3 +62,82 @@ def test_append_recipe_metadata_includes_checkpoint(monkeypatch, tmp_path):
"baseModel": "Illustrious",
}
assert payload["base_model"] == "Illustrious"
def test_optimize_image_preserves_workflow_when_converting_png_to_webp(tmp_path):
image_path = tmp_path / "source.png"
png_info = PngImagePlugin.PngInfo()
png_info.add_text("parameters", "prompt text\nSteps: 20")
png_info.add_text("workflow", json.dumps({"nodes": [{"id": 1}]}))
Image.new("RGB", (64, 32), color="red").save(image_path, pnginfo=png_info)
optimized_data, extension = ExifUtils.optimize_image(
str(image_path),
target_width=32,
format="webp",
quality=85,
preserve_metadata=True,
)
optimized_path = tmp_path / f"optimized{extension}"
optimized_path.write_bytes(optimized_data)
exif_dict = piexif.load(str(optimized_path))
assert (
exif_dict["0th"][piexif.ImageIFD.ImageDescription].decode("utf-8")
== 'Workflow:{"nodes": [{"id": 1}]}'
)
user_comment = exif_dict["Exif"][piexif.ExifIFD.UserComment]
assert user_comment.startswith(b"UNICODE\0")
assert user_comment[8:].decode("utf-16be") == "prompt text\nSteps: 20"
def test_update_image_metadata_preserves_webp_workflow(tmp_path):
image_path = tmp_path / "recipe.webp"
exif_dict = {
"0th": {
piexif.ImageIFD.ImageDescription: 'Workflow:{"nodes":[{"id":1}]}',
},
"Exif": {
piexif.ExifIFD.UserComment: b"UNICODE\0"
+ "prompt text".encode("utf-16be")
},
}
Image.new("RGB", (32, 32), color="blue").save(
image_path, format="WEBP", exif=piexif.dump(exif_dict), quality=85
)
ExifUtils.update_image_metadata(
str(image_path), 'prompt text\nRecipe metadata: {"title":"recipe"}'
)
updated_exif = piexif.load(str(image_path))
assert (
updated_exif["0th"][piexif.ImageIFD.ImageDescription].decode("utf-8")
== 'Workflow:{"nodes":[{"id":1}]}'
)
updated_comment = updated_exif["Exif"][piexif.ExifIFD.UserComment]
assert (
updated_comment[8:].decode("utf-16be")
== 'prompt text\nRecipe metadata: {"title":"recipe"}'
)
def test_update_image_metadata_preserves_png_workflow(tmp_path):
image_path = tmp_path / "recipe.png"
png_info = PngImagePlugin.PngInfo()
png_info.add_text("parameters", "prompt text")
png_info.add_text("workflow", '{"nodes":[{"id":1}]}')
Image.new("RGB", (32, 32), color="green").save(image_path, pnginfo=png_info)
ExifUtils.update_image_metadata(
str(image_path), 'prompt text\nRecipe metadata: {"title":"recipe"}'
)
with Image.open(image_path) as img:
assert img.info["workflow"] == '{"nodes":[{"id":1}]}'
assert (
img.info["parameters"]
== 'prompt text\nRecipe metadata: {"title":"recipe"}'
)