Enhance SaveImage class with filename formatting and multiple image support

- Updated the INPUT_TYPES to accept multiple images and modified the corresponding processing methods.
- Introduced a new format_filename method to handle dynamic filename generation using metadata patterns.
- Replaced save_workflow_json with embed_workflow for better clarity in saving workflow metadata.
- Improved directory handling and filename generation logic to ensure proper file saving.
This commit is contained in:
Will Miao
2025-04-02 15:08:36 +08:00
parent b508f51fcf
commit aec218ba00

View File

@@ -3,11 +3,8 @@ import os
import asyncio import asyncio
import re import re
import numpy as np import numpy as np
import time
from server import PromptServer # type: ignore
import folder_paths # type: ignore import folder_paths # type: ignore
from ..services.lora_scanner import LoraScanner from ..services.lora_scanner import LoraScanner
from ..config import config
from ..workflow.parser import WorkflowParser from ..workflow.parser import WorkflowParser
from PIL import Image, PngImagePlugin from PIL import Image, PngImagePlugin
import piexif import piexif
@@ -25,18 +22,21 @@ class SaveImage:
self.compress_level = 4 self.compress_level = 4
self.counter = 0 self.counter = 0
# Add pattern format regex for filename substitution
pattern_format = re.compile(r"(%[^%]+%)")
@classmethod @classmethod
def INPUT_TYPES(cls): def INPUT_TYPES(cls):
return { return {
"required": { "required": {
"image": ("IMAGE",), "images": ("IMAGE",),
"filename_prefix": ("STRING", {"default": "ComfyUI"}), "filename_prefix": ("STRING", {"default": "ComfyUI"}),
"file_format": (["png", "jpeg", "webp"],), "file_format": (["png", "jpeg", "webp"],),
}, },
"optional": { "optional": {
"lossless_webp": ("BOOLEAN", {"default": True}), "lossless_webp": ("BOOLEAN", {"default": True}),
"quality": ("INT", {"default": 100, "min": 1, "max": 100}), "quality": ("INT", {"default": 100, "min": 1, "max": 100}),
"save_workflow_json": ("BOOLEAN", {"default": False}), "embed_workflow": ("BOOLEAN", {"default": False}),
"add_counter_to_filename": ("BOOLEAN", {"default": True}), "add_counter_to_filename": ("BOOLEAN", {"default": True}),
}, },
"hidden": { "hidden": {
@@ -45,8 +45,8 @@ class SaveImage:
}, },
} }
RETURN_TYPES = ("IMAGE", "STRING") RETURN_TYPES = ("IMAGE",)
RETURN_NAMES = ("image", "filename") RETURN_NAMES = ("images",)
FUNCTION = "process_image" FUNCTION = "process_image"
OUTPUT_NODE = True OUTPUT_NODE = True
@@ -174,8 +174,73 @@ class SaveImage:
# Join all parts with a new line # Join all parts with a new line
return "\n".join(metadata_parts) return "\n".join(metadata_parts)
# credit to nkchocoai
# Add format_filename method to handle pattern substitution
def format_filename(self, filename, parsed_workflow):
"""Format filename with metadata values"""
if not parsed_workflow:
return filename
result = re.findall(self.pattern_format, filename)
for segment in result:
parts = segment.replace("%", "").split(":")
key = parts[0]
if key == "seed" and 'seed' in parsed_workflow:
filename = filename.replace(segment, str(parsed_workflow.get('seed', '')))
elif key == "width" and 'size' in parsed_workflow:
size = parsed_workflow.get('size', 'x')
w = size.split('x')[0] if isinstance(size, str) else size[0]
filename = filename.replace(segment, str(w))
elif key == "height" and 'size' in parsed_workflow:
size = parsed_workflow.get('size', 'x')
h = size.split('x')[1] if isinstance(size, str) else size[1]
filename = filename.replace(segment, str(h))
elif key == "pprompt" and 'prompt' in parsed_workflow:
prompt = parsed_workflow.get('prompt', '').replace("\n", " ")
if len(parts) >= 2:
length = int(parts[1])
prompt = prompt[:length]
filename = filename.replace(segment, prompt.strip())
elif key == "nprompt" and 'negative_prompt' in parsed_workflow:
prompt = parsed_workflow.get('negative_prompt', '').replace("\n", " ")
if len(parts) >= 2:
length = int(parts[1])
prompt = prompt[:length]
filename = filename.replace(segment, prompt.strip())
elif key == "model" and 'checkpoint' in parsed_workflow:
model = parsed_workflow.get('checkpoint', '')
model = os.path.splitext(os.path.basename(model))[0]
if len(parts) >= 2:
length = int(parts[1])
model = model[:length]
filename = filename.replace(segment, model)
elif key == "date":
from datetime import datetime
now = datetime.now()
date_table = {
"yyyy": str(now.year),
"MM": str(now.month).zfill(2),
"dd": str(now.day).zfill(2),
"hh": str(now.hour).zfill(2),
"mm": str(now.minute).zfill(2),
"ss": str(now.second).zfill(2),
}
if len(parts) >= 2:
date_format = parts[1]
for k, v in date_table.items():
date_format = date_format.replace(k, v)
filename = filename.replace(segment, date_format)
else:
date_format = "yyyyMMddhhmmss"
for k, v in date_table.items():
date_format = date_format.replace(k, v)
filename = filename.replace(segment, date_format)
return filename
def save_images(self, images, filename_prefix, file_format, prompt=None, extra_pnginfo=None, def save_images(self, images, filename_prefix, file_format, prompt=None, extra_pnginfo=None,
lossless_webp=True, quality=100, save_workflow_json=False, add_counter_to_filename=True): lossless_webp=True, quality=100, embed_workflow=False, add_counter_to_filename=True):
"""Save images with metadata""" """Save images with metadata"""
results = [] results = []
@@ -189,44 +254,54 @@ class SaveImage:
# Get or create metadata asynchronously # Get or create metadata asynchronously
metadata = asyncio.run(self.format_metadata(parsed_workflow)) metadata = asyncio.run(self.format_metadata(parsed_workflow))
# Process filename_prefix with pattern substitution
filename_prefix = self.format_filename(filename_prefix, parsed_workflow)
# Process each image # Process each image
for i, image in enumerate(images): for i, image in enumerate(images):
# Convert the tensor image to numpy array # Convert the tensor image to numpy array
img = 255. * image.cpu().numpy() img = 255. * image.cpu().numpy()
img = Image.fromarray(np.clip(img, 0, 255).astype(np.uint8)) img = Image.fromarray(np.clip(img, 0, 255).astype(np.uint8))
# Create directory if filename_prefix contains path separators
output_path = os.path.join(self.output_dir, filename_prefix)
if not os.path.exists(os.path.dirname(output_path)):
os.makedirs(os.path.dirname(output_path), exist_ok=True)
# Use folder_paths.get_save_image_path for better counter handling
full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(
filename_prefix, self.output_dir, img.width, img.height
)
# Generate filename with counter if needed # Generate filename with counter if needed
if add_counter_to_filename: if add_counter_to_filename:
filename = f"{filename_prefix}_{self.counter:05d}" filename += f"_{counter:05}"
self.counter += 1
else:
filename = f"{filename_prefix}"
# Set file extension and prepare saving parameters # Set file extension and prepare saving parameters
if file_format == "png": if file_format == "png":
filename += ".png" file = filename + ".png"
file_extension = ".png" file_extension = ".png"
save_kwargs = {"optimize": True, "compress_level": self.compress_level} save_kwargs = {"optimize": True, "compress_level": self.compress_level}
pnginfo = PngImagePlugin.PngInfo() pnginfo = PngImagePlugin.PngInfo()
elif file_format == "jpeg": elif file_format == "jpeg":
filename += ".jpg" file = filename + ".jpg"
file_extension = ".jpg" file_extension = ".jpg"
save_kwargs = {"quality": quality, "optimize": True} save_kwargs = {"quality": quality, "optimize": True}
elif file_format == "webp": elif file_format == "webp":
filename += ".webp" file = filename + ".webp"
file_extension = ".webp" file_extension = ".webp"
save_kwargs = {"quality": quality, "lossless": lossless_webp} save_kwargs = {"quality": quality, "lossless": lossless_webp}
# Full save path # Full save path
file_path = os.path.join(self.output_dir, filename) file_path = os.path.join(full_output_folder, file)
# Save the image with metadata # Save the image with metadata
try: try:
if file_format == "png": if file_format == "png":
if metadata: if metadata:
pnginfo.add_text("parameters", metadata) pnginfo.add_text("parameters", metadata)
if save_workflow_json and extra_pnginfo is not None: if embed_workflow and extra_pnginfo is not None:
workflow_json = json.dumps(extra_pnginfo) workflow_json = json.dumps(extra_pnginfo["workflow"])
pnginfo.add_text("workflow", workflow_json) pnginfo.add_text("workflow", workflow_json)
save_kwargs["pnginfo"] = pnginfo save_kwargs["pnginfo"] = pnginfo
img.save(file_path, format="PNG", **save_kwargs) img.save(file_path, format="PNG", **save_kwargs)
@@ -252,31 +327,24 @@ class SaveImage:
img.save(file_path, format="WEBP", **save_kwargs) img.save(file_path, format="WEBP", **save_kwargs)
results.append({ results.append({
"filename": filename, "filename": file,
"subfolder": "", "subfolder": subfolder,
"type": self.type "type": self.type
}) })
# Notify UI about saved image
PromptServer.instance.send_sync("image", {
"filename": filename,
"subfolder": "",
"type": self.type,
})
except Exception as e: except Exception as e:
print(f"Error saving image: {e}") print(f"Error saving image: {e}")
return results return results
def process_image(self, image, filename_prefix="ComfyUI", file_format="png", prompt=None, extra_pnginfo=None, def process_image(self, images, filename_prefix="ComfyUI", file_format="png", prompt=None, extra_pnginfo=None,
lossless_webp=True, quality=100, save_workflow_json=False, add_counter_to_filename=True): lossless_webp=True, quality=100, embed_workflow=False, add_counter_to_filename=True):
"""Process and save image with metadata""" """Process and save image with metadata"""
# Make sure the output directory exists # Make sure the output directory exists
os.makedirs(self.output_dir, exist_ok=True) os.makedirs(self.output_dir, exist_ok=True)
# Convert single image to list for consistent processing # Convert single image to list for consistent processing
images = [image[0]] if len(image.shape) == 3 else [img for img in image] images = [images[0]] if len(images.shape) == 3 else [img for img in images]
# Save all images # Save all images
results = self.save_images( results = self.save_images(
@@ -287,10 +355,8 @@ class SaveImage:
extra_pnginfo, extra_pnginfo,
lossless_webp, lossless_webp,
quality, quality,
save_workflow_json, embed_workflow,
add_counter_to_filename add_counter_to_filename
) )
# Return the first saved filename and the original image return (images,)
filename = results[0]["filename"] if results else ""
return (image, filename)