From 234c942f347626c59ec42fe9812ca22c74736fb6 Mon Sep 17 00:00:00 2001 From: Will Miao <13051207myq@gmail.com> Date: Wed, 2 Apr 2025 17:01:10 +0800 Subject: [PATCH] Refactor transform functions and update node mappers - Moved and redefined transform functions for KSampler, EmptyLatentImage, CLIPTextEncode, and FluxGuidance to improve organization and maintainability. - Updated NODE_MAPPERS to include new input tracking for clip_skip in KSampler and added new transform functions for LatentUpscale and CLIPSetLastLayer. - Enhanced the transform_sampler_custom_advanced function to handle clip_skip extraction from model inputs. --- py/workflow/ext/comfyui_core.py | 128 +++++++++++++++++++++++++++++++- py/workflow/mappers.py | 95 ++---------------------- 2 files changed, 130 insertions(+), 93 deletions(-) diff --git a/py/workflow/ext/comfyui_core.py b/py/workflow/ext/comfyui_core.py index 125c29f0..5a116d59 100644 --- a/py/workflow/ext/comfyui_core.py +++ b/py/workflow/ext/comfyui_core.py @@ -94,6 +94,7 @@ def transform_sampler_custom_advanced(inputs: Dict) -> Dict: if "model" in guider and isinstance(guider["model"], dict): result["checkpoint"] = guider["model"].get("checkpoint", "") result["loras"] = guider["model"].get("loras", "") + result["clip_skip"] = str(int(guider["model"].get("clip_skip", "-1")) * -1) # Extract dimensions from latent_image if "latent_image" in inputs and isinstance(inputs["latent_image"], dict): @@ -107,6 +108,73 @@ def transform_sampler_custom_advanced(inputs: Dict) -> Dict: return result +def transform_ksampler(inputs: Dict) -> Dict: + """Transform function for KSampler nodes""" + result = { + "seed": str(inputs.get("seed", "")), + "steps": str(inputs.get("steps", "")), + "cfg": str(inputs.get("cfg", "")), + "sampler": inputs.get("sampler_name", ""), + "scheduler": inputs.get("scheduler", ""), + } + + # Process positive prompt + if "positive" in inputs: + result["prompt"] = inputs["positive"] + + # Process negative prompt + if "negative" in inputs: + result["negative_prompt"] = inputs["negative"] + + # Get dimensions from latent image + if "latent_image" in inputs and isinstance(inputs["latent_image"], dict): + width = inputs["latent_image"].get("width", 0) + height = inputs["latent_image"].get("height", 0) + if width and height: + result["size"] = f"{width}x{height}" + + # Add clip_skip if present + if "clip_skip" in inputs: + result["clip_skip"] = str(inputs.get("clip_skip", "")) + + # Add guidance if present + if "guidance" in inputs: + result["guidance"] = str(inputs.get("guidance", "")) + + # Add model if present + if "model" in inputs: + result["checkpoint"] = inputs.get("model", {}).get("checkpoint", "") + result["loras"] = inputs.get("model", {}).get("loras", "") + result["clip_skip"] = str(inputs.get("model", {}).get("clip_skip", -1) * -1) + + return result + +def transform_empty_latent(inputs: Dict) -> Dict: + """Transform function for EmptyLatentImage nodes""" + width = inputs.get("width", 0) + height = inputs.get("height", 0) + return {"width": width, "height": height, "size": f"{width}x{height}"} + +def transform_clip_text(inputs: Dict) -> Any: + """Transform function for CLIPTextEncode nodes""" + return inputs.get("text", "") + +def transform_flux_guidance(inputs: Dict) -> Dict: + """Transform function for FluxGuidance nodes""" + result = {} + + if "guidance" in inputs: + result["guidance"] = inputs["guidance"] + + if "conditioning" in inputs: + conditioning = inputs["conditioning"] + if isinstance(conditioning, str): + result["prompt"] = conditioning + else: + result["prompt"] = "Unknown prompt" + + return result + def transform_unet_loader(inputs: Dict) -> Dict: """Transform function for UNETLoader node""" unet_name = inputs.get("unet_name", "") @@ -117,6 +185,27 @@ def transform_checkpoint_loader(inputs: Dict) -> Dict: ckpt_name = inputs.get("ckpt_name", "") return {"checkpoint": ckpt_name} if ckpt_name else {} +def transform_latent_upscale_by(inputs: Dict) -> Dict: + """Transform function for LatentUpscaleBy node""" + result = {} + + width = inputs["samples"].get("width", 0) * inputs["scale_by"] + height = inputs["samples"].get("height", 0) * inputs["scale_by"] + result["width"] = width + result["height"] = height + result["size"] = f"{width}x{height}" + + return result + +def transform_clip_set_last_layer(inputs: Dict) -> Dict: + """Transform function for CLIPSetLastLayer node""" + result = {} + + if "stop_at_clip_layer" in inputs: + result["clip_skip"] = inputs["stop_at_clip_layer"] + + return result + # ============================================================================= # Node Mapper Definitions # ============================================================================= @@ -128,6 +217,31 @@ NODE_MAPPERS_EXT = { "inputs_to_track": ["noise", "guider", "sampler", "sigmas", "latent_image"], "transform_func": transform_sampler_custom_advanced }, + "KSampler": { + "inputs_to_track": [ + "seed", "steps", "cfg", "sampler_name", "scheduler", + "denoise", "positive", "negative", "latent_image", + "model", "clip_skip" + ], + "transform_func": transform_ksampler + }, + # ComfyUI core nodes + "EmptyLatentImage": { + "inputs_to_track": ["width", "height", "batch_size"], + "transform_func": transform_empty_latent + }, + "EmptySD3LatentImage": { + "inputs_to_track": ["width", "height", "batch_size"], + "transform_func": transform_empty_latent + }, + "CLIPTextEncode": { + "inputs_to_track": ["text", "clip"], + "transform_func": transform_clip_text + }, + "FluxGuidance": { + "inputs_to_track": ["guidance", "conditioning"], + "transform_func": transform_flux_guidance + }, "RandomNoise": { "inputs_to_track": ["noise_seed"], "transform_func": transform_random_noise @@ -156,8 +270,16 @@ NODE_MAPPERS_EXT = { "inputs_to_track": ["ckpt_name"], "transform_func": transform_checkpoint_loader }, - "CheckpointLoader": { - "inputs_to_track": ["ckpt_name"], - "transform_func": transform_checkpoint_loader + "LatentUpscale": { + "inputs_to_track": ["width", "height"], + "transform_func": transform_empty_latent + }, + "LatentUpscaleBy": { + "inputs_to_track": ["samples", "scale_by"], + "transform_func": transform_latent_upscale_by + }, + "CLIPSetLastLayer": { + "inputs_to_track": ["clip", "stop_at_clip_layer"], + "transform_func": transform_clip_set_last_layer } } \ No newline at end of file diff --git a/py/workflow/mappers.py b/py/workflow/mappers.py index 156afb27..22811ec6 100644 --- a/py/workflow/mappers.py +++ b/py/workflow/mappers.py @@ -97,55 +97,7 @@ def process_node(node_id: str, node_data: Dict, workflow: Dict, parser: 'Workflo # Transform Functions # ============================================================================= -def transform_ksampler(inputs: Dict) -> Dict: - """Transform function for KSampler nodes""" - result = { - "seed": str(inputs.get("seed", "")), - "steps": str(inputs.get("steps", "")), - "cfg": str(inputs.get("cfg", "")), - "sampler": inputs.get("sampler_name", ""), - "scheduler": inputs.get("scheduler", ""), - } - - # Process positive prompt - if "positive" in inputs: - result["prompt"] = inputs["positive"] - - # Process negative prompt - if "negative" in inputs: - result["negative_prompt"] = inputs["negative"] - - # Get dimensions from latent image - if "latent_image" in inputs and isinstance(inputs["latent_image"], dict): - width = inputs["latent_image"].get("width", 0) - height = inputs["latent_image"].get("height", 0) - if width and height: - result["size"] = f"{width}x{height}" - - # Add clip_skip if present - if "clip_skip" in inputs: - result["clip_skip"] = str(inputs.get("clip_skip", "")) - # Add guidance if present - if "guidance" in inputs: - result["guidance"] = str(inputs.get("guidance", "")) - - # Add model if present - if "model" in inputs: - result["checkpoint"] = inputs.get("model", {}).get("checkpoint", "") - result["loras"] = inputs.get("model", {}).get("loras", "") - - return result - -def transform_empty_latent(inputs: Dict) -> Dict: - """Transform function for EmptyLatentImage nodes""" - width = inputs.get("width", 0) - height = inputs.get("height", 0) - return {"width": width, "height": height, "size": f"{width}x{height}"} - -def transform_clip_text(inputs: Dict) -> Any: - """Transform function for CLIPTextEncode nodes""" - return inputs.get("text", "") def transform_lora_loader(inputs: Dict) -> Dict: """Transform function for LoraLoader nodes""" @@ -181,6 +133,9 @@ def transform_lora_loader(inputs: Dict) -> Dict: "checkpoint": inputs.get("model", {}).get("checkpoint", ""), "loras": " ".join(lora_texts) } + + if "clip" in inputs: + result["clip_skip"] = inputs["clip"].get("clip_skip", "-1") return result @@ -241,56 +196,16 @@ def transform_trigger_word_toggle(inputs: Dict) -> str: return ", ".join(active_words) -def transform_flux_guidance(inputs: Dict) -> Dict: - """Transform function for FluxGuidance nodes""" - result = {} - - if "guidance" in inputs: - result["guidance"] = inputs["guidance"] - - if "conditioning" in inputs: - conditioning = inputs["conditioning"] - if isinstance(conditioning, str): - result["prompt"] = conditioning - else: - result["prompt"] = "Unknown prompt" - - return result - # ============================================================================= # Node Mapper Definitions # ============================================================================= # Central definition of all supported node types and their configurations NODE_MAPPERS = { - # ComfyUI core nodes - "KSampler": { - "inputs_to_track": [ - "seed", "steps", "cfg", "sampler_name", "scheduler", - "denoise", "positive", "negative", "latent_image", - "model", "clip_skip" - ], - "transform_func": transform_ksampler - }, - "EmptyLatentImage": { - "inputs_to_track": ["width", "height", "batch_size"], - "transform_func": transform_empty_latent - }, - "EmptySD3LatentImage": { - "inputs_to_track": ["width", "height", "batch_size"], - "transform_func": transform_empty_latent - }, - "CLIPTextEncode": { - "inputs_to_track": ["text", "clip"], - "transform_func": transform_clip_text - }, - "FluxGuidance": { - "inputs_to_track": ["guidance", "conditioning"], - "transform_func": transform_flux_guidance - }, + # LoraManager nodes "Lora Loader (LoraManager)": { - "inputs_to_track": ["model", "loras", "lora_stack"], + "inputs_to_track": ["model", "clip", "loras", "lora_stack"], "transform_func": transform_lora_loader }, "Lora Stacker (LoraManager)": {