diff --git a/py/utils/exif_utils.py b/py/utils/exif_utils.py index 0a9550f5..c0de350e 100644 --- a/py/utils/exif_utils.py +++ b/py/utils/exif_utils.py @@ -1,51 +1,16 @@ import piexif import json import logging -from typing import Dict, Optional, Any +from typing import Optional from io import BytesIO import os from PIL import Image -import re logger = logging.getLogger(__name__) class ExifUtils: """Utility functions for working with EXIF data in images""" - @staticmethod - def extract_user_comment(image_path: str) -> Optional[str]: - """Extract UserComment field from image EXIF data""" - try: - # First try to open as image to check format - with Image.open(image_path) as img: - if img.format not in ['JPEG', 'TIFF', 'WEBP']: - # For non-JPEG/TIFF/WEBP images, try to get EXIF through PIL - exif = img._getexif() - if exif and piexif.ExifIFD.UserComment in exif: - user_comment = exif[piexif.ExifIFD.UserComment] - if isinstance(user_comment, bytes): - if user_comment.startswith(b'UNICODE\0'): - return user_comment[8:].decode('utf-16be') - return user_comment.decode('utf-8', errors='ignore') - return user_comment - return None - - # For JPEG/TIFF/WEBP, use piexif - exif_dict = piexif.load(image_path) - - if piexif.ExifIFD.UserComment in exif_dict.get('Exif', {}): - user_comment = exif_dict['Exif'][piexif.ExifIFD.UserComment] - if isinstance(user_comment, bytes): - if user_comment.startswith(b'UNICODE\0'): - user_comment = user_comment[8:].decode('utf-16be') - else: - user_comment = user_comment.decode('utf-8', errors='ignore') - return user_comment - return None - - except Exception as e: - return None - @staticmethod def extract_image_metadata(image_path: str) -> Optional[str]: """Extract metadata from image including UserComment or parameters field @@ -103,53 +68,6 @@ class ExifUtils: logger.error(f"Error extracting image metadata: {e}", exc_info=True) return None - @staticmethod - def update_user_comment(image_path: str, user_comment: str) -> str: - """Update UserComment field in image EXIF data""" - try: - # Load the image and its EXIF data - with Image.open(image_path) as img: - # Get original format - img_format = img.format - - # For WebP format, we need a different approach - if img_format == 'WEBP': - # WebP doesn't support standard EXIF through piexif - # We'll use PIL's exif parameter directly - exif_dict = {'Exif': {piexif.ExifIFD.UserComment: b'UNICODE\0' + user_comment.encode('utf-16be')}} - exif_bytes = piexif.dump(exif_dict) - - # Save with the exif data - img.save(image_path, format='WEBP', exif=exif_bytes, quality=85) - return image_path - - # For other formats, use the standard approach - try: - exif_dict = piexif.load(img.info.get('exif', b'')) - except: - exif_dict = {'0th':{}, 'Exif':{}, 'GPS':{}, 'Interop':{}, '1st':{}} - - # If no Exif dictionary exists, create one - if 'Exif' not in exif_dict: - exif_dict['Exif'] = {} - - # Update the UserComment field - use UNICODE format - unicode_bytes = user_comment.encode('utf-16be') - user_comment_bytes = b'UNICODE\0' + unicode_bytes - - exif_dict['Exif'][piexif.ExifIFD.UserComment] = user_comment_bytes - - # Convert EXIF dict back to bytes - exif_bytes = piexif.dump(exif_dict) - - # Save the image with updated EXIF data - img.save(image_path, exif=exif_bytes) - - return image_path - except Exception as e: - logger.error(f"Error updating EXIF data in {image_path}: {e}") - return image_path - @staticmethod def update_image_metadata(image_path: str, metadata: str) -> str: """Update metadata in image's EXIF data or parameters fields @@ -394,210 +312,4 @@ class ExifUtils: if isinstance(image_data, str) and os.path.exists(image_data): with open(image_data, 'rb') as f: return f.read(), os.path.splitext(image_data)[1] - return image_data, '.jpg' - - @staticmethod - def _parse_comfyui_workflow(workflow_data: Any) -> Dict[str, Any]: - """ - Parse ComfyUI workflow data and extract relevant generation parameters - - Args: - workflow_data: Raw workflow data (string or dict) - - Returns: - Formatted generation parameters dictionary - """ - try: - # If workflow_data is a string, try to parse it as JSON - if isinstance(workflow_data, str): - try: - workflow_data = json.loads(workflow_data) - except json.JSONDecodeError: - logger.error("Failed to parse workflow data as JSON") - return {} - - # Now workflow_data should be a dictionary - if not isinstance(workflow_data, dict): - logger.error(f"Workflow data is not a dictionary: {type(workflow_data)}") - return {} - - # Initialize parameters dictionary with only the required fields - gen_params = { - "prompt": "", - "negative_prompt": "", - "steps": "", - "sampler": "", - "cfg_scale": "", - "seed": "", - "size": "", - "clip_skip": "" - } - - # First pass: find the KSampler node to get basic parameters and node references - # Store node references to follow for prompts - positive_ref = None - negative_ref = None - - for node_id, node_data in workflow_data.items(): - if not isinstance(node_data, dict): - continue - - # Extract node inputs if available - inputs = node_data.get("inputs", {}) - if not inputs: - continue - - # KSampler nodes contain most generation parameters and references to prompt nodes - if "KSampler" in node_data.get("class_type", ""): - # Extract basic sampling parameters - gen_params["steps"] = inputs.get("steps", "") - gen_params["cfg_scale"] = inputs.get("cfg", "") - gen_params["sampler"] = inputs.get("sampler_name", "") - gen_params["seed"] = inputs.get("seed", "") - if isinstance(gen_params["seed"], list) and len(gen_params["seed"]) > 1: - gen_params["seed"] = gen_params["seed"][1] # Use the actual value if it's a list - - # Get references to positive and negative prompt nodes - positive_ref = inputs.get("positive", "") - negative_ref = inputs.get("negative", "") - - # CLIPSetLastLayer contains clip_skip information - elif "CLIPSetLastLayer" in node_data.get("class_type", ""): - gen_params["clip_skip"] = inputs.get("stop_at_clip_layer", "") - if isinstance(gen_params["clip_skip"], int) and gen_params["clip_skip"] < 0: - # Convert negative layer index to positive clip skip value - gen_params["clip_skip"] = abs(gen_params["clip_skip"]) - - # Look for resolution information - elif "LatentImage" in node_data.get("class_type", "") or "Empty" in node_data.get("class_type", ""): - width = inputs.get("width", 0) - height = inputs.get("height", 0) - if width and height: - gen_params["size"] = f"{width}x{height}" - - # Some nodes have resolution as a string like "832x1216 (0.68)" - resolution = inputs.get("resolution", "") - if isinstance(resolution, str) and "x" in resolution: - gen_params["size"] = resolution.split(" ")[0] # Extract just the dimensions - - # Helper function to follow node references and extract text content - def get_text_from_node_ref(node_ref, workflow_data): - if not node_ref or not isinstance(node_ref, list) or len(node_ref) < 2: - return "" - - node_id, slot_idx = node_ref - - # If we can't find the node, return empty string - if node_id not in workflow_data: - return "" - - node = workflow_data[node_id] - inputs = node.get("inputs", {}) - - # Direct text input in CLIP Text Encode nodes - if "CLIPTextEncode" in node.get("class_type", ""): - text = inputs.get("text", "") - if isinstance(text, str): - return text - elif isinstance(text, list) and len(text) >= 2: - # If text is a reference to another node, follow it - return get_text_from_node_ref(text, workflow_data) - - # Other nodes might have text input with different field names - for field_name, field_value in inputs.items(): - if field_name == "text" and isinstance(field_value, str): - return field_value - elif isinstance(field_value, list) and len(field_value) >= 2 and field_name in ["text"]: - # If it's a reference to another node, follow it - return get_text_from_node_ref(field_value, workflow_data) - - return "" - - # Extract prompts by following references from KSampler node - if positive_ref: - gen_params["prompt"] = get_text_from_node_ref(positive_ref, workflow_data) - - if negative_ref: - gen_params["negative_prompt"] = get_text_from_node_ref(negative_ref, workflow_data) - - # Fallback: if we couldn't extract prompts via references, use the traditional method - if not gen_params["prompt"] or not gen_params["negative_prompt"]: - for node_id, node_data in workflow_data.items(): - if not isinstance(node_data, dict): - continue - - inputs = node_data.get("inputs", {}) - if not inputs: - continue - - if "CLIPTextEncode" in node_data.get("class_type", ""): - # Check for negative prompt nodes - title = node_data.get("_meta", {}).get("title", "").lower() - prompt_text = inputs.get("text", "") - - if isinstance(prompt_text, str): - if "negative" in title and not gen_params["negative_prompt"]: - gen_params["negative_prompt"] = prompt_text - elif prompt_text and not "negative" in title and not gen_params["prompt"]: - gen_params["prompt"] = prompt_text - - return gen_params - - except Exception as e: - logger.error(f"Error parsing ComfyUI workflow: {e}", exc_info=True) - return {} - - @staticmethod - def extract_comfyui_gen_params(image_path: str) -> Dict[str, Any]: - """ - Extract ComfyUI workflow data from PNG images and format for recipe data - Only extracts the specific generation parameters needed for recipes. - - Args: - image_path: Path to the ComfyUI-generated PNG image - - Returns: - Dictionary containing formatted generation parameters - """ - try: - # Check if the file exists and is accessible - if not os.path.exists(image_path): - logger.error(f"Image file not found: {image_path}") - return {} - - # Open the image to extract embedded workflow data - with Image.open(image_path) as img: - workflow_data = None - - # For PNG images, look for the ComfyUI workflow data in PNG chunks - if img.format == 'PNG': - # Check standard metadata fields that might contain workflow - if 'parameters' in img.info: - workflow_data = img.info['parameters'] - elif 'prompt' in img.info: - workflow_data = img.info['prompt'] - else: - # Look for other potential field names that might contain workflow data - for key in img.info: - if isinstance(key, str) and ('workflow' in key.lower() or 'comfy' in key.lower()): - workflow_data = img.info[key] - break - - # If no workflow data found in PNG chunks, try extract_image_metadata as fallback - if not workflow_data: - metadata = ExifUtils.extract_image_metadata(image_path) - if metadata and '{' in metadata and '}' in metadata: - # Try to extract JSON part - json_start = metadata.find('{') - json_end = metadata.rfind('}') + 1 - workflow_data = metadata[json_start:json_end] - - # Parse workflow data if found - if workflow_data: - return ExifUtils._parse_comfyui_workflow(workflow_data) - - return {} - - except Exception as e: - logger.error(f"Error extracting ComfyUI gen params from {image_path}: {e}", exc_info=True) - return {} \ No newline at end of file + return image_data, '.jpg' \ No newline at end of file