mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-24 14:42:11 -03:00
Refactor ExifUtils by removing unused methods and imports
- Removed the extract_user_comment and update_user_comment methods to streamline the ExifUtils class. - Cleaned up unnecessary imports and reduced code complexity, focusing on essential functionality for image metadata extraction.
This commit is contained in:
@@ -1,51 +1,16 @@
|
|||||||
import piexif
|
import piexif
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
from typing import Dict, Optional, Any
|
from typing import Optional
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
import os
|
import os
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
import re
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class ExifUtils:
|
class ExifUtils:
|
||||||
"""Utility functions for working with EXIF data in images"""
|
"""Utility functions for working with EXIF data in images"""
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def extract_user_comment(image_path: str) -> Optional[str]:
|
|
||||||
"""Extract UserComment field from image EXIF data"""
|
|
||||||
try:
|
|
||||||
# First try to open as image to check format
|
|
||||||
with Image.open(image_path) as img:
|
|
||||||
if img.format not in ['JPEG', 'TIFF', 'WEBP']:
|
|
||||||
# For non-JPEG/TIFF/WEBP images, try to get EXIF through PIL
|
|
||||||
exif = img._getexif()
|
|
||||||
if exif and piexif.ExifIFD.UserComment in exif:
|
|
||||||
user_comment = exif[piexif.ExifIFD.UserComment]
|
|
||||||
if isinstance(user_comment, bytes):
|
|
||||||
if user_comment.startswith(b'UNICODE\0'):
|
|
||||||
return user_comment[8:].decode('utf-16be')
|
|
||||||
return user_comment.decode('utf-8', errors='ignore')
|
|
||||||
return user_comment
|
|
||||||
return None
|
|
||||||
|
|
||||||
# For JPEG/TIFF/WEBP, use piexif
|
|
||||||
exif_dict = piexif.load(image_path)
|
|
||||||
|
|
||||||
if piexif.ExifIFD.UserComment in exif_dict.get('Exif', {}):
|
|
||||||
user_comment = exif_dict['Exif'][piexif.ExifIFD.UserComment]
|
|
||||||
if isinstance(user_comment, bytes):
|
|
||||||
if user_comment.startswith(b'UNICODE\0'):
|
|
||||||
user_comment = user_comment[8:].decode('utf-16be')
|
|
||||||
else:
|
|
||||||
user_comment = user_comment.decode('utf-8', errors='ignore')
|
|
||||||
return user_comment
|
|
||||||
return None
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
return None
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def extract_image_metadata(image_path: str) -> Optional[str]:
|
def extract_image_metadata(image_path: str) -> Optional[str]:
|
||||||
"""Extract metadata from image including UserComment or parameters field
|
"""Extract metadata from image including UserComment or parameters field
|
||||||
@@ -103,53 +68,6 @@ class ExifUtils:
|
|||||||
logger.error(f"Error extracting image metadata: {e}", exc_info=True)
|
logger.error(f"Error extracting image metadata: {e}", exc_info=True)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def update_user_comment(image_path: str, user_comment: str) -> str:
|
|
||||||
"""Update UserComment field in image EXIF data"""
|
|
||||||
try:
|
|
||||||
# Load the image and its EXIF data
|
|
||||||
with Image.open(image_path) as img:
|
|
||||||
# Get original format
|
|
||||||
img_format = img.format
|
|
||||||
|
|
||||||
# For WebP format, we need a different approach
|
|
||||||
if img_format == 'WEBP':
|
|
||||||
# WebP doesn't support standard EXIF through piexif
|
|
||||||
# We'll use PIL's exif parameter directly
|
|
||||||
exif_dict = {'Exif': {piexif.ExifIFD.UserComment: b'UNICODE\0' + user_comment.encode('utf-16be')}}
|
|
||||||
exif_bytes = piexif.dump(exif_dict)
|
|
||||||
|
|
||||||
# Save with the exif data
|
|
||||||
img.save(image_path, format='WEBP', exif=exif_bytes, quality=85)
|
|
||||||
return image_path
|
|
||||||
|
|
||||||
# For other formats, use the standard approach
|
|
||||||
try:
|
|
||||||
exif_dict = piexif.load(img.info.get('exif', b''))
|
|
||||||
except:
|
|
||||||
exif_dict = {'0th':{}, 'Exif':{}, 'GPS':{}, 'Interop':{}, '1st':{}}
|
|
||||||
|
|
||||||
# If no Exif dictionary exists, create one
|
|
||||||
if 'Exif' not in exif_dict:
|
|
||||||
exif_dict['Exif'] = {}
|
|
||||||
|
|
||||||
# Update the UserComment field - use UNICODE format
|
|
||||||
unicode_bytes = user_comment.encode('utf-16be')
|
|
||||||
user_comment_bytes = b'UNICODE\0' + unicode_bytes
|
|
||||||
|
|
||||||
exif_dict['Exif'][piexif.ExifIFD.UserComment] = user_comment_bytes
|
|
||||||
|
|
||||||
# Convert EXIF dict back to bytes
|
|
||||||
exif_bytes = piexif.dump(exif_dict)
|
|
||||||
|
|
||||||
# Save the image with updated EXIF data
|
|
||||||
img.save(image_path, exif=exif_bytes)
|
|
||||||
|
|
||||||
return image_path
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error updating EXIF data in {image_path}: {e}")
|
|
||||||
return image_path
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def update_image_metadata(image_path: str, metadata: str) -> str:
|
def update_image_metadata(image_path: str, metadata: str) -> str:
|
||||||
"""Update metadata in image's EXIF data or parameters fields
|
"""Update metadata in image's EXIF data or parameters fields
|
||||||
@@ -394,210 +312,4 @@ class ExifUtils:
|
|||||||
if isinstance(image_data, str) and os.path.exists(image_data):
|
if isinstance(image_data, str) and os.path.exists(image_data):
|
||||||
with open(image_data, 'rb') as f:
|
with open(image_data, 'rb') as f:
|
||||||
return f.read(), os.path.splitext(image_data)[1]
|
return f.read(), os.path.splitext(image_data)[1]
|
||||||
return image_data, '.jpg'
|
return image_data, '.jpg'
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _parse_comfyui_workflow(workflow_data: Any) -> Dict[str, Any]:
|
|
||||||
"""
|
|
||||||
Parse ComfyUI workflow data and extract relevant generation parameters
|
|
||||||
|
|
||||||
Args:
|
|
||||||
workflow_data: Raw workflow data (string or dict)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Formatted generation parameters dictionary
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
# If workflow_data is a string, try to parse it as JSON
|
|
||||||
if isinstance(workflow_data, str):
|
|
||||||
try:
|
|
||||||
workflow_data = json.loads(workflow_data)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
logger.error("Failed to parse workflow data as JSON")
|
|
||||||
return {}
|
|
||||||
|
|
||||||
# Now workflow_data should be a dictionary
|
|
||||||
if not isinstance(workflow_data, dict):
|
|
||||||
logger.error(f"Workflow data is not a dictionary: {type(workflow_data)}")
|
|
||||||
return {}
|
|
||||||
|
|
||||||
# Initialize parameters dictionary with only the required fields
|
|
||||||
gen_params = {
|
|
||||||
"prompt": "",
|
|
||||||
"negative_prompt": "",
|
|
||||||
"steps": "",
|
|
||||||
"sampler": "",
|
|
||||||
"cfg_scale": "",
|
|
||||||
"seed": "",
|
|
||||||
"size": "",
|
|
||||||
"clip_skip": ""
|
|
||||||
}
|
|
||||||
|
|
||||||
# First pass: find the KSampler node to get basic parameters and node references
|
|
||||||
# Store node references to follow for prompts
|
|
||||||
positive_ref = None
|
|
||||||
negative_ref = None
|
|
||||||
|
|
||||||
for node_id, node_data in workflow_data.items():
|
|
||||||
if not isinstance(node_data, dict):
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Extract node inputs if available
|
|
||||||
inputs = node_data.get("inputs", {})
|
|
||||||
if not inputs:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# KSampler nodes contain most generation parameters and references to prompt nodes
|
|
||||||
if "KSampler" in node_data.get("class_type", ""):
|
|
||||||
# Extract basic sampling parameters
|
|
||||||
gen_params["steps"] = inputs.get("steps", "")
|
|
||||||
gen_params["cfg_scale"] = inputs.get("cfg", "")
|
|
||||||
gen_params["sampler"] = inputs.get("sampler_name", "")
|
|
||||||
gen_params["seed"] = inputs.get("seed", "")
|
|
||||||
if isinstance(gen_params["seed"], list) and len(gen_params["seed"]) > 1:
|
|
||||||
gen_params["seed"] = gen_params["seed"][1] # Use the actual value if it's a list
|
|
||||||
|
|
||||||
# Get references to positive and negative prompt nodes
|
|
||||||
positive_ref = inputs.get("positive", "")
|
|
||||||
negative_ref = inputs.get("negative", "")
|
|
||||||
|
|
||||||
# CLIPSetLastLayer contains clip_skip information
|
|
||||||
elif "CLIPSetLastLayer" in node_data.get("class_type", ""):
|
|
||||||
gen_params["clip_skip"] = inputs.get("stop_at_clip_layer", "")
|
|
||||||
if isinstance(gen_params["clip_skip"], int) and gen_params["clip_skip"] < 0:
|
|
||||||
# Convert negative layer index to positive clip skip value
|
|
||||||
gen_params["clip_skip"] = abs(gen_params["clip_skip"])
|
|
||||||
|
|
||||||
# Look for resolution information
|
|
||||||
elif "LatentImage" in node_data.get("class_type", "") or "Empty" in node_data.get("class_type", ""):
|
|
||||||
width = inputs.get("width", 0)
|
|
||||||
height = inputs.get("height", 0)
|
|
||||||
if width and height:
|
|
||||||
gen_params["size"] = f"{width}x{height}"
|
|
||||||
|
|
||||||
# Some nodes have resolution as a string like "832x1216 (0.68)"
|
|
||||||
resolution = inputs.get("resolution", "")
|
|
||||||
if isinstance(resolution, str) and "x" in resolution:
|
|
||||||
gen_params["size"] = resolution.split(" ")[0] # Extract just the dimensions
|
|
||||||
|
|
||||||
# Helper function to follow node references and extract text content
|
|
||||||
def get_text_from_node_ref(node_ref, workflow_data):
|
|
||||||
if not node_ref or not isinstance(node_ref, list) or len(node_ref) < 2:
|
|
||||||
return ""
|
|
||||||
|
|
||||||
node_id, slot_idx = node_ref
|
|
||||||
|
|
||||||
# If we can't find the node, return empty string
|
|
||||||
if node_id not in workflow_data:
|
|
||||||
return ""
|
|
||||||
|
|
||||||
node = workflow_data[node_id]
|
|
||||||
inputs = node.get("inputs", {})
|
|
||||||
|
|
||||||
# Direct text input in CLIP Text Encode nodes
|
|
||||||
if "CLIPTextEncode" in node.get("class_type", ""):
|
|
||||||
text = inputs.get("text", "")
|
|
||||||
if isinstance(text, str):
|
|
||||||
return text
|
|
||||||
elif isinstance(text, list) and len(text) >= 2:
|
|
||||||
# If text is a reference to another node, follow it
|
|
||||||
return get_text_from_node_ref(text, workflow_data)
|
|
||||||
|
|
||||||
# Other nodes might have text input with different field names
|
|
||||||
for field_name, field_value in inputs.items():
|
|
||||||
if field_name == "text" and isinstance(field_value, str):
|
|
||||||
return field_value
|
|
||||||
elif isinstance(field_value, list) and len(field_value) >= 2 and field_name in ["text"]:
|
|
||||||
# If it's a reference to another node, follow it
|
|
||||||
return get_text_from_node_ref(field_value, workflow_data)
|
|
||||||
|
|
||||||
return ""
|
|
||||||
|
|
||||||
# Extract prompts by following references from KSampler node
|
|
||||||
if positive_ref:
|
|
||||||
gen_params["prompt"] = get_text_from_node_ref(positive_ref, workflow_data)
|
|
||||||
|
|
||||||
if negative_ref:
|
|
||||||
gen_params["negative_prompt"] = get_text_from_node_ref(negative_ref, workflow_data)
|
|
||||||
|
|
||||||
# Fallback: if we couldn't extract prompts via references, use the traditional method
|
|
||||||
if not gen_params["prompt"] or not gen_params["negative_prompt"]:
|
|
||||||
for node_id, node_data in workflow_data.items():
|
|
||||||
if not isinstance(node_data, dict):
|
|
||||||
continue
|
|
||||||
|
|
||||||
inputs = node_data.get("inputs", {})
|
|
||||||
if not inputs:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if "CLIPTextEncode" in node_data.get("class_type", ""):
|
|
||||||
# Check for negative prompt nodes
|
|
||||||
title = node_data.get("_meta", {}).get("title", "").lower()
|
|
||||||
prompt_text = inputs.get("text", "")
|
|
||||||
|
|
||||||
if isinstance(prompt_text, str):
|
|
||||||
if "negative" in title and not gen_params["negative_prompt"]:
|
|
||||||
gen_params["negative_prompt"] = prompt_text
|
|
||||||
elif prompt_text and not "negative" in title and not gen_params["prompt"]:
|
|
||||||
gen_params["prompt"] = prompt_text
|
|
||||||
|
|
||||||
return gen_params
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error parsing ComfyUI workflow: {e}", exc_info=True)
|
|
||||||
return {}
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def extract_comfyui_gen_params(image_path: str) -> Dict[str, Any]:
|
|
||||||
"""
|
|
||||||
Extract ComfyUI workflow data from PNG images and format for recipe data
|
|
||||||
Only extracts the specific generation parameters needed for recipes.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
image_path: Path to the ComfyUI-generated PNG image
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Dictionary containing formatted generation parameters
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
# Check if the file exists and is accessible
|
|
||||||
if not os.path.exists(image_path):
|
|
||||||
logger.error(f"Image file not found: {image_path}")
|
|
||||||
return {}
|
|
||||||
|
|
||||||
# Open the image to extract embedded workflow data
|
|
||||||
with Image.open(image_path) as img:
|
|
||||||
workflow_data = None
|
|
||||||
|
|
||||||
# For PNG images, look for the ComfyUI workflow data in PNG chunks
|
|
||||||
if img.format == 'PNG':
|
|
||||||
# Check standard metadata fields that might contain workflow
|
|
||||||
if 'parameters' in img.info:
|
|
||||||
workflow_data = img.info['parameters']
|
|
||||||
elif 'prompt' in img.info:
|
|
||||||
workflow_data = img.info['prompt']
|
|
||||||
else:
|
|
||||||
# Look for other potential field names that might contain workflow data
|
|
||||||
for key in img.info:
|
|
||||||
if isinstance(key, str) and ('workflow' in key.lower() or 'comfy' in key.lower()):
|
|
||||||
workflow_data = img.info[key]
|
|
||||||
break
|
|
||||||
|
|
||||||
# If no workflow data found in PNG chunks, try extract_image_metadata as fallback
|
|
||||||
if not workflow_data:
|
|
||||||
metadata = ExifUtils.extract_image_metadata(image_path)
|
|
||||||
if metadata and '{' in metadata and '}' in metadata:
|
|
||||||
# Try to extract JSON part
|
|
||||||
json_start = metadata.find('{')
|
|
||||||
json_end = metadata.rfind('}') + 1
|
|
||||||
workflow_data = metadata[json_start:json_end]
|
|
||||||
|
|
||||||
# Parse workflow data if found
|
|
||||||
if workflow_data:
|
|
||||||
return ExifUtils._parse_comfyui_workflow(workflow_data)
|
|
||||||
|
|
||||||
return {}
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error extracting ComfyUI gen params from {image_path}: {e}", exc_info=True)
|
|
||||||
return {}
|
|
||||||
Reference in New Issue
Block a user