refactor: Rename to_comfyui_format method to to_dict and update references in save_image.py

This commit is contained in:
Will Miao
2025-04-16 21:42:54 +08:00
parent 181f78421b
commit d4b2dd0ec1
3 changed files with 45 additions and 49 deletions

View File

@@ -157,7 +157,7 @@ class MetadataProcessor:
return params
@staticmethod
def to_comfyui_format(metadata):
def to_dict(metadata):
"""Convert extracted metadata to the ComfyUI output.json format"""
params = MetadataProcessor.extract_generation_params(metadata)
@@ -171,5 +171,5 @@ class MetadataProcessor:
@staticmethod
def to_json(metadata):
"""Convert metadata to JSON string"""
params = MetadataProcessor.to_comfyui_format(metadata)
params = MetadataProcessor.to_dict(metadata)
return json.dumps(params, indent=4)

View File

@@ -167,8 +167,6 @@ class LoraLoaderManagerExtractor(NodeMetadataExtractor):
"node_id": node_id
}
print(f"Active LoRAs for node {node_id}: {active_loras}")
# Registry of node-specific extractors
NODE_EXTRACTORS = {
"CheckpointLoaderSimple": CheckpointLoaderExtractor,

View File

@@ -5,7 +5,8 @@ import re
import numpy as np
import folder_paths # type: ignore
from ..services.lora_scanner import LoraScanner
from ..workflow.parser import WorkflowParser
from ..metadata_collector.metadata_processor import MetadataProcessor
from ..metadata_collector import get_metadata
from PIL import Image, PngImagePlugin
import piexif
from io import BytesIO
@@ -61,21 +62,21 @@ class SaveImage:
return item.get('sha256')
return None
async def format_metadata(self, parsed_workflow, custom_prompt=None):
async def format_metadata(self, metadata_dict, custom_prompt=None):
"""Format metadata in the requested format similar to userComment example"""
if not parsed_workflow:
if not metadata_dict:
return ""
# Extract the prompt and negative prompt
prompt = parsed_workflow.get('prompt', '')
negative_prompt = parsed_workflow.get('negative_prompt', '')
prompt = metadata_dict.get('prompt', '')
negative_prompt = metadata_dict.get('negative_prompt', '')
# Override prompt with custom_prompt if provided
if custom_prompt:
prompt = custom_prompt
# Extract loras from the prompt if present
loras_text = parsed_workflow.get('loras', '')
loras_text = metadata_dict.get('loras', '')
lora_hashes = {}
# If loras are found, add them on a new line after the prompt
@@ -104,11 +105,11 @@ class SaveImage:
params = []
# Add standard parameters in the correct order
if 'steps' in parsed_workflow:
params.append(f"Steps: {parsed_workflow.get('steps')}")
if 'steps' in metadata_dict:
params.append(f"Steps: {metadata_dict.get('steps')}")
if 'sampler' in parsed_workflow:
sampler = parsed_workflow.get('sampler')
if 'sampler' in metadata_dict:
sampler = metadata_dict.get('sampler')
# Convert ComfyUI sampler names to user-friendly names
sampler_mapping = {
'euler': 'Euler',
@@ -130,8 +131,8 @@ class SaveImage:
sampler_name = sampler_mapping.get(sampler, sampler)
params.append(f"Sampler: {sampler_name}")
if 'scheduler' in parsed_workflow:
scheduler = parsed_workflow.get('scheduler')
if 'scheduler' in metadata_dict:
scheduler = metadata_dict.get('scheduler')
scheduler_mapping = {
'normal': 'Simple',
'karras': 'Karras',
@@ -142,24 +143,24 @@ class SaveImage:
scheduler_name = scheduler_mapping.get(scheduler, scheduler)
params.append(f"Schedule type: {scheduler_name}")
# CFG scale (cfg in parsed_workflow)
if 'cfg_scale' in parsed_workflow:
params.append(f"CFG scale: {parsed_workflow.get('cfg_scale')}")
elif 'cfg' in parsed_workflow:
params.append(f"CFG scale: {parsed_workflow.get('cfg')}")
# CFG scale (cfg_scale in metadata_dict)
if 'cfg_scale' in metadata_dict:
params.append(f"CFG scale: {metadata_dict.get('cfg_scale')}")
elif 'cfg' in metadata_dict:
params.append(f"CFG scale: {metadata_dict.get('cfg')}")
# Seed
if 'seed' in parsed_workflow:
params.append(f"Seed: {parsed_workflow.get('seed')}")
if 'seed' in metadata_dict:
params.append(f"Seed: {metadata_dict.get('seed')}")
# Size
if 'size' in parsed_workflow:
params.append(f"Size: {parsed_workflow.get('size')}")
if 'size' in metadata_dict:
params.append(f"Size: {metadata_dict.get('size')}")
# Model info
if 'checkpoint' in parsed_workflow:
if 'checkpoint' in metadata_dict:
# Extract basename without path
checkpoint = os.path.basename(parsed_workflow.get('checkpoint', ''))
checkpoint = os.path.basename(metadata_dict.get('checkpoint', ''))
# Remove extension if present
checkpoint = os.path.splitext(checkpoint)[0]
params.append(f"Model: {checkpoint}")
@@ -181,9 +182,9 @@ class SaveImage:
# credit to nkchocoai
# Add format_filename method to handle pattern substitution
def format_filename(self, filename, parsed_workflow):
def format_filename(self, filename, metadata_dict):
"""Format filename with metadata values"""
if not parsed_workflow:
if not metadata_dict:
return filename
result = re.findall(self.pattern_format, filename)
@@ -191,30 +192,30 @@ class SaveImage:
parts = segment.replace("%", "").split(":")
key = parts[0]
if key == "seed" and 'seed' in parsed_workflow:
filename = filename.replace(segment, str(parsed_workflow.get('seed', '')))
elif key == "width" and 'size' in parsed_workflow:
size = parsed_workflow.get('size', 'x')
if key == "seed" and 'seed' in metadata_dict:
filename = filename.replace(segment, str(metadata_dict.get('seed', '')))
elif key == "width" and 'size' in metadata_dict:
size = metadata_dict.get('size', 'x')
w = size.split('x')[0] if isinstance(size, str) else size[0]
filename = filename.replace(segment, str(w))
elif key == "height" and 'size' in parsed_workflow:
size = parsed_workflow.get('size', 'x')
elif key == "height" and 'size' in metadata_dict:
size = metadata_dict.get('size', 'x')
h = size.split('x')[1] if isinstance(size, str) else size[1]
filename = filename.replace(segment, str(h))
elif key == "pprompt" and 'prompt' in parsed_workflow:
prompt = parsed_workflow.get('prompt', '').replace("\n", " ")
elif key == "pprompt" and 'prompt' in metadata_dict:
prompt = metadata_dict.get('prompt', '').replace("\n", " ")
if len(parts) >= 2:
length = int(parts[1])
prompt = prompt[:length]
filename = filename.replace(segment, prompt.strip())
elif key == "nprompt" and 'negative_prompt' in parsed_workflow:
prompt = parsed_workflow.get('negative_prompt', '').replace("\n", " ")
elif key == "nprompt" and 'negative_prompt' in metadata_dict:
prompt = metadata_dict.get('negative_prompt', '').replace("\n", " ")
if len(parts) >= 2:
length = int(parts[1])
prompt = prompt[:length]
filename = filename.replace(segment, prompt.strip())
elif key == "model" and 'checkpoint' in parsed_workflow:
model = parsed_workflow.get('checkpoint', '')
elif key == "model" and 'checkpoint' in metadata_dict:
model = metadata_dict.get('checkpoint', '')
model = os.path.splitext(os.path.basename(model))[0]
if len(parts) >= 2:
length = int(parts[1])
@@ -251,18 +252,15 @@ class SaveImage:
"""Save images with metadata"""
results = []
# Parse the workflow using the WorkflowParser
parser = WorkflowParser()
if prompt:
parsed_workflow = parser.parse_workflow(prompt)
else:
parsed_workflow = {}
# Get metadata using the metadata collector
raw_metadata = get_metadata()
metadata_dict = MetadataProcessor.to_dict(raw_metadata)
# Get or create metadata asynchronously
metadata = asyncio.run(self.format_metadata(parsed_workflow, custom_prompt))
metadata = asyncio.run(self.format_metadata(metadata_dict, custom_prompt))
# Process filename_prefix with pattern substitution
filename_prefix = self.format_filename(filename_prefix, parsed_workflow)
filename_prefix = self.format_filename(filename_prefix, metadata_dict)
# Get initial save path info once for the batch
full_output_folder, filename, counter, subfolder, processed_prefix = folder_paths.get_save_image_path(