# File: efficiency-nodes-comfyui/efficiency_nodes.py
# Snapshot: 2023-04-22 15:50:01 -05:00 (1325 lines, 59 KiB, Python)
# Efficiency Nodes - A collection of my ComfyUI custom nodes to help streamline workflows and reduce total node count.
# by Luciano Cirino (Discord: TSC#9184) - April 2023

# These imports resolve against the ComfyUI installation this node pack lives inside.
from comfy.sd import ModelPatcher, CLIP, VAE
from nodes import common_ksampler
from torch import Tensor

from PIL import Image, ImageOps, ImageDraw, ImageFont
from PIL.PngImagePlugin import PngInfo
import numpy as np
import torch

from pathlib import Path
import os
import sys
import json
import folder_paths

# Get the absolute path of the parent directory of the current script
my_dir = os.path.dirname(os.path.abspath(__file__))

# Construct the absolute path to the ComfyUI directory (two levels up: custom_nodes/<pack>/)
comfy_dir = os.path.abspath(os.path.join(my_dir, '..', '..'))

# Add the ComfyUI directory path to the sys.path list so the imports below resolve
sys.path.append(comfy_dir)

# Construct the path to the font file bundled with this node pack (used for XY-plot labels)
font_path = os.path.join(my_dir, 'arial.ttf')

# Import functions from nodes.py in the ComfyUI directory
import comfy.samplers
import comfy.sd
import comfy.utils

# Upper bound for empty-latent width/height inputs (pixels)
MAX_RESOLUTION=8192
# Tensor to PIL (grabbed from WAS Suite)
def tensor2pil(image: torch.Tensor) -> Image.Image:
    """Convert an image tensor with values in [0, 1] to an 8-bit PIL Image."""
    array = image.cpu().numpy().squeeze()
    scaled = np.clip(255. * array, 0, 255).astype(np.uint8)
    return Image.fromarray(scaled)
# Convert PIL to Tensor (grabbed from WAS Suite)
def pil2tensor(image: Image.Image) -> torch.Tensor:
    """Convert a PIL Image to a batched float tensor with values in [0, 1]."""
    array = np.array(image).astype(np.float32) / 255.0
    return torch.from_numpy(array).unsqueeze(0)
def resolve_input_links(prompt, input_value):
    """Resolve a node-link pair ``[node_id, slot_index]`` to the linked node's input value.

    Anything that is not a two-element list is treated as a literal and
    returned unchanged.
    """
    if not (isinstance(input_value, list) and len(input_value) == 2):
        return input_value
    node_id, slot_index = input_value
    linked_inputs = prompt[node_id]['inputs']
    slot_key = list(linked_inputs.keys())[slot_index]
    return linked_inputs[slot_key]
# Cache models in RAM so repeated prompt executions can reuse loaded objects.
# ckpt/clip/bvae are parallel lists sharing the same index per checkpoint;
# "lora" entries are 5-tuples: (lora_name, model, clip, strength_model, strength_clip).
loaded_objects = {
    "ckpt": [], # (ckpt_name, model)
    "clip": [], # (ckpt_name, clip)
    "bvae": [], # (ckpt_name, baked vae)
    "vae": [],  # (vae_name, vae)
    "lora": []  # (lora_name, model, clip, strength_model, strength_clip)
}
def print_loaded_objects_entries():
    """Pretty-print the contents of the global ``loaded_objects`` cache (debug aid)."""
    separator = "-" * 40
    print("\n" + separator)
    for key in ("ckpt", "clip", "bvae", "vae", "lora"):
        print(f"{key.capitalize()} entries:")
        for entry in loaded_objects[key]:
            truncated_name = entry[0][:20]
            print(f" Name: {truncated_name}\n Location: {entry[1]}")
            if len(entry) == 3:
                print(f" Entry[2]: {entry[2]}")
        print(separator)
    print("\n")
def update_loaded_objects(prompt):
    """Evict cached models that no 'Efficient Loader' node in the prompt still needs."""
    global loaded_objects

    # Gather the checkpoint/vae/lora names requested by every Efficient Loader node.
    wanted_ckpts = set()
    wanted_vaes = set()
    wanted_loras = set()
    for node in prompt.values():
        if node["class_type"] == "Efficient Loader":
            wanted_ckpts.add(node["inputs"]["ckpt_name"])
            wanted_vaes.add(node["inputs"]["vae_name"])
            wanted_loras.add(node["inputs"]["lora_name"])

    # ckpt/clip/bvae are all keyed by checkpoint name; vae and lora by their own names.
    keep_by_list = {"ckpt": wanted_ckpts, "clip": wanted_ckpts, "bvae": wanted_ckpts,
                    "vae": wanted_vaes, "lora": wanted_loras}
    for list_key, keep_names in keep_by_list.items():
        # Slice-assign to prune in place, preserving the original list objects.
        loaded_objects[list_key][:] = [entry for entry in loaded_objects[list_key]
                                       if entry[0] in keep_names]
def load_checkpoint(ckpt_name, output_vae=True, output_clip=True):
    """Return ``(model, clip, vae)`` for ``ckpt_name``, loading and caching on first use.

    Cache hits are looked up in the global ``loaded_objects``; the ckpt/clip/bvae
    lists share the same index for a given checkpoint.
    NOTE: ``output_vae``/``output_clip`` are accepted for API parity but are
    currently ignored — the loader always produces both.
    """
    global loaded_objects

    # Cache hit: the matching clip/bvae entries live at the same index.
    for i, (cached_name, cached_model) in enumerate(loaded_objects["ckpt"]):
        if cached_name == ckpt_name:
            return cached_model, loaded_objects["clip"][i][1], loaded_objects["bvae"][i][1]

    # Cache miss: load from disk and remember the results.
    ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name)
    out = comfy.sd.load_checkpoint_guess_config(
        ckpt_path, output_vae=True, output_clip=True,
        embedding_directory=folder_paths.get_folder_paths("embeddings"))
    model, clip, vae = out[0], out[1], out[2]

    loaded_objects["ckpt"].append((ckpt_name, model))
    loaded_objects["clip"].append((ckpt_name, clip))
    loaded_objects["bvae"].append((ckpt_name, vae))
    return model, clip, vae
def load_vae(vae_name):
    """Return the VAE named ``vae_name``, loading and caching it on first use."""
    global loaded_objects

    for cached_name, cached_vae in loaded_objects["vae"]:
        if cached_name == vae_name:
            return cached_vae

    # Not cached yet: load from disk and remember it.
    vae_path = folder_paths.get_full_path("vae", vae_name)
    vae = comfy.sd.VAE(ckpt_path=vae_path)
    loaded_objects["vae"].append((vae_name, vae))
    return vae
def load_lora(lora_name, model, clip, strength_model, strength_clip):
    """Apply the named LoRA to ``model``/``clip``, caching the patched pair.

    A cached entry is reused only when both strengths match; otherwise the LoRA
    is re-applied to the supplied base model/clip and the stale entry replaced.
    """
    global loaded_objects

    cached = next((entry for entry in loaded_objects["lora"] if entry[0] == lora_name), None)
    if cached is not None:
        _, cached_model, cached_clip, cached_sm, cached_sc = cached
        if (cached_sm, cached_sc) == (strength_model, strength_clip):
            return cached_model, cached_clip

    # Cache miss or strength change: build fresh patched models.
    lora_path = folder_paths.get_full_path("loras", lora_name)
    model_lora, clip_lora = comfy.sd.load_lora_for_models(model, clip, lora_path,
                                                          strength_model, strength_clip)
    # Drop the stale entry before storing the replacement.
    if cached is not None:
        loaded_objects["lora"].remove(cached)
    loaded_objects["lora"].append((lora_name, model_lora, clip_lora, strength_model, strength_clip))
    return model_lora, clip_lora
# TSC Efficient Loader
class TSC_EfficientLoader:
    """Combined loader node: loads a checkpoint (plus optional LoRA and external
    VAE), encodes the positive/negative prompts, and emits an empty latent batch."""

    @classmethod
    def INPUT_TYPES(cls):
        checkpoints = folder_paths.get_filename_list("checkpoints")
        vaes = ["Baked VAE"] + folder_paths.get_filename_list("vae")
        loras = ["None"] + folder_paths.get_filename_list("loras")
        strength_spec = ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01})
        dim_spec = ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 64})
        return {"required": {"ckpt_name": (checkpoints,),
                             "vae_name": (vaes,),
                             "clip_skip": ("INT", {"default": -1, "min": -24, "max": -1, "step": 1}),
                             "lora_name": (loras,),
                             "lora_model_strength": strength_spec,
                             "lora_clip_strength": strength_spec,
                             "positive": ("STRING", {"default": "Positive", "multiline": True}),
                             "negative": ("STRING", {"default": "Negative", "multiline": True}),
                             "empty_latent_width": dim_spec,
                             "empty_latent_height": dim_spec,
                             "batch_size": ("INT", {"default": 1, "min": 1, "max": 64})},
                "hidden": {"prompt": "PROMPT"}}

    RETURN_TYPES = ("MODEL", "CONDITIONING", "CONDITIONING", "LATENT", "VAE", "CLIP",)
    RETURN_NAMES = ("MODEL", "CONDITIONING+", "CONDITIONING-", "LATENT", "VAE", "CLIP",)
    FUNCTION = "efficientloader"
    CATEGORY = "Efficiency Nodes/Loaders"

    def efficientloader(self, ckpt_name, vae_name, clip_skip, lora_name, lora_model_strength, lora_clip_strength,
                        positive, negative, empty_latent_width, empty_latent_height, batch_size, prompt=None):
        # Empty latent: ComfyUI latents are 1/8 of pixel resolution with 4 channels.
        latent = torch.zeros([batch_size, 4, empty_latent_height // 8, empty_latent_width // 8]).cpu()

        # Evict cached models no longer referenced by any loader in this prompt.
        update_loaded_objects(prompt)

        # Load (or fetch cached) checkpoint, then optionally layer a LoRA on top.
        model, clip, vae = load_checkpoint(ckpt_name)
        if lora_name != "None":
            model, clip = load_lora(lora_name, model, clip, lora_model_strength, lora_clip_strength)

        # Replace the baked VAE with a standalone one when requested.
        if vae_name != "Baked VAE":
            vae = load_vae(vae_name)

        if not clip:
            raise Exception("No CLIP found")
        # Clone before applying clip-skip so the cached CLIP stays untouched.
        clip = clip.clone()
        clip.clip_layer(clip_skip)

        return (model, [[clip.encode(positive), {}]], [[clip.encode(negative), {}]],
                {"samples": latent}, vae, clip,)
# KSampler Efficient ID finder
# Remembers the last node id handed out per parameter signature so that several
# identical 'KSampler (Efficient)' nodes are cycled through on successive calls.
last_returned_ids = {}

def find_k_sampler_id(prompt, sampler_state=None, seed=None, steps=None, cfg=None,
                      sampler_name=None, scheduler=None, denoise=None, preview_image=None):
    """Return the prompt id of the 'KSampler (Efficient)' node matching the given settings.

    Parameters left as ``None`` are ignored during matching. When several nodes
    match, successive calls rotate through them in id order. Returns ``None``
    (and resets the rotation state) when nothing matches.
    """
    global last_returned_ids

    # Insertion order must mirror the original parameter order: the signature
    # tuple below is built from these values.
    criteria = {
        'sampler_state': sampler_state,
        'seed': seed,
        'steps': steps,
        'cfg': cfg,
        'sampler_name': sampler_name,
        'scheduler': scheduler,
        'denoise': denoise,
        'preview_image': preview_image,
    }

    matching_ids = [
        node_id
        for node_id, node in prompt.items()
        if node.get('class_type') == 'KSampler (Efficient)'
        and all(node['inputs'][name] == wanted
                for name, wanted in criteria.items() if wanted is not None)
    ]

    if not matching_ids:
        last_returned_ids.clear()
        return None

    signature = tuple(criteria.values())
    previous = last_returned_ids.get(signature)
    if previous is not None:
        # Smallest id strictly greater than the last one returned; wrap around
        # to the smallest match once every id has been used.
        later_ids = [node_id for node_id in matching_ids if node_id > previous]
        next_id = min(later_ids) if later_ids else min(matching_ids)
    else:
        next_id = min(matching_ids)

    last_returned_ids[signature] = next_id
    return next_id
# TSC KSampler (Efficient)
# Per-node persistent state used by the "Hold" mode: each list holds
# (value, node_id) pairs, looked up via get/update_value_by_id inside sample().
last_helds: dict[str, list] = {
    "results": [],     # UI preview-result dicts from the last run
    "latent": [],      # last sampled latent tensor
    "images": [],      # last decoded image tensor
    "vae_decode": []   # bool: whether the next Hold must re-run VAE decode
}
class TSC_KSampler:
    """KSampler node with three modes selected by ``sampler_state``:

    - "Sample": run the sampler and (optionally) decode/preview the images.
    - "Hold":   skip sampling and re-emit the previously stored latent/images.
    - "Script": run an XY-plot script (built by TSC_XYplot) and output a labeled grid.

    State that must survive between executions is kept in the module-level
    ``last_helds`` dict, keyed by the node's prompt id.
    """

    # 1x1 transparent placeholder emitted whenever no decoded image is available.
    empty_image = pil2tensor(Image.new('RGBA', (1, 1), (0, 0, 0, 0)))

    def __init__(self):
        # Preview images are written into ComfyUI's temp output directory.
        self.output_dir = os.path.join(comfy_dir, 'temp')
        self.type = "temp"

    @classmethod
    def INPUT_TYPES(cls):
        return {"required":
                    {"sampler_state": (["Sample", "Hold", "Script"], ),
                     "model": ("MODEL",),
                     "seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
                     "steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
                     "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}),
                     "sampler_name": (comfy.samplers.KSampler.SAMPLERS,),
                     "scheduler": (comfy.samplers.KSampler.SCHEDULERS,),
                     "positive": ("CONDITIONING",),
                     "negative": ("CONDITIONING",),
                     "latent_image": ("LATENT",),
                     "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
                     "preview_image": (["Disabled", "Enabled"],),
                     },
                "optional": {"optional_vae": ("VAE",),
                             "script": ("SCRIPT",),},
                "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
                }

    RETURN_TYPES = ("MODEL", "CONDITIONING", "CONDITIONING", "LATENT", "VAE", "IMAGE", )
    RETURN_NAMES = ("MODEL", "CONDITIONING+", "CONDITIONING-", "LATENT", "VAE", "IMAGE", )
    OUTPUT_NODE = True
    FUNCTION = "sample"
    CATEGORY = "Efficiency Nodes/Sampling"

    def sample(self, sampler_state, model, seed, steps, cfg, sampler_name, scheduler, positive, negative,
               latent_image, preview_image, denoise=1.0, prompt=None, extra_pnginfo=None, optional_vae=(None,), script=None):
        """Dispatch on ``sampler_state``; returns a ComfyUI node-result dict with
        "ui" (preview image entries) and "result" (the node's output tuple)."""

        # ---- Functions for previewing images in KSampler ----

        def map_filename(filename):
            # Split "<prefix>_<counter>_..." into (counter, prefix) for max().
            # NOTE(review): this closure reads `filename_prefix` from sample()'s
            # scope (assigned below), NOT preview_images' parameter — they carry
            # the same value in practice, but confirm before refactoring.
            prefix_len = len(os.path.basename(filename_prefix))
            prefix = filename[:prefix_len + 1]
            try:
                digits = int(filename[prefix_len + 1:].split('_')[0])
            except:
                digits = 0
            return (digits, prefix)

        def compute_vars(input):
            # Substitute %width%/%height% placeholders using the first image.
            # NOTE(review): `images` resolves from sample()'s scope at call time;
            # it is always assigned before preview_images() is invoked.
            input = input.replace("%width%", str(images[0].shape[1]))
            input = input.replace("%height%", str(images[0].shape[0]))
            return input

        def preview_images(images, filename_prefix):
            """Save each image tensor as a PNG (embedding prompt metadata) into the
            temp dir and return the UI result dicts ComfyUI expects for previews."""
            filename_prefix = compute_vars(filename_prefix)

            subfolder = os.path.dirname(os.path.normpath(filename_prefix))
            filename = os.path.basename(os.path.normpath(filename_prefix))

            full_output_folder = os.path.join(self.output_dir, subfolder)

            # Continue numbering after the highest existing counter for this prefix.
            try:
                counter = max(filter(lambda a: a[1][:-1] == filename and a[1][-1] == "_",
                                     map(map_filename, os.listdir(full_output_folder))))[0] + 1
            except ValueError:
                counter = 1
            except FileNotFoundError:
                os.makedirs(full_output_folder, exist_ok=True)
                counter = 1

            if not os.path.exists(self.output_dir):
                os.makedirs(self.output_dir)

            results = list()
            for image in images:
                # Tensor (H, W, C) in [0, 1] -> 8-bit PIL image.
                i = 255. * image.cpu().numpy()
                img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))

                # Embed the prompt/workflow into the PNG metadata.
                metadata = PngInfo()
                if prompt is not None:
                    metadata.add_text("prompt", json.dumps(prompt))
                if extra_pnginfo is not None:
                    for x in extra_pnginfo:
                        metadata.add_text(x, json.dumps(extra_pnginfo[x]))

                # NOTE(review): "(unknown)" looks like extraction damage — the
                # counter-scan in map_filename expects f"{filename}_{counter:05}_.png";
                # confirm against the upstream repository.
                file = f"(unknown)_{counter:05}_.png"
                img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=4)
                results.append({
                    "filename": file,
                    "subfolder": subfolder,
                    "type": self.type
                });
                counter += 1
            return results

        def get_value_by_id(key: str, my_unique_id):
            """Fetch this node's stored value from last_helds[key], or None."""
            global last_helds
            for value, id_ in last_helds[key]:
                if id_ == my_unique_id:
                    return value
            return None

        def update_value_by_id(key: str, my_unique_id, new_value):
            """Store new_value for this node in last_helds[key] (replace or append)."""
            global last_helds
            for i, (value, id_) in enumerate(last_helds[key]):
                if id_ == my_unique_id:
                    last_helds[key][i] = (new_value, id_)
                    return True
            last_helds[key].append((new_value, my_unique_id))
            return True

        # Identify this node instance within the prompt graph.
        # NOTE(review): find_k_sampler_id can return None when nothing matches,
        # which would make int() raise TypeError here.
        my_unique_id = int(find_k_sampler_id(prompt, sampler_state, seed, steps, cfg, sampler_name, scheduler, denoise, preview_image))

        # Vae input check — without a VAE we cannot decode, so previews are forced off.
        vae = optional_vae
        if vae == (None,):
            print('\033[32mKSampler(Efficient)[{}] Warning:\033[0m No vae input detected, preview and output image disabled.\n'.format(my_unique_id))
            preview_image = "Disabled"

        # Init last_results (UI entries from the previous run, if any)
        if get_value_by_id("results", my_unique_id) is None:
            last_results = list()
        else:
            last_results = get_value_by_id("results", my_unique_id)

        # Init last_latent (falls back to the incoming latent on first run)
        if get_value_by_id("latent", my_unique_id) is None:
            last_latent = latent_image
        else:
            last_latent = {"samples": None}
            last_latent["samples"] = get_value_by_id("latent", my_unique_id)

        # Init last_images (falls back to the 1x1 placeholder on first run)
        if get_value_by_id("images", my_unique_id) == None:
            last_images = TSC_KSampler.empty_image
        else:
            last_images = get_value_by_id("images", my_unique_id)

        # Initialize latent
        latent: Tensor|None = None

        # Define filename_prefix (also read by the map_filename closure above)
        filename_prefix = "KSeff_{:02d}".format(my_unique_id)

        # Check the current sampler state
        if sampler_state == "Sample":
            # Sample using the common KSampler function and store the samples
            samples = common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative,
                                      latent_image, denoise=denoise)

            # Extract the latent samples from the returned samples dictionary
            latent = samples[0]["samples"]

            # Store the latent samples in the 'last_helds' dictionary with a unique ID
            update_value_by_id("latent", my_unique_id, latent)

            # If not in preview mode, return the results in the specified format
            if preview_image == "Disabled":
                # Enable vae decode on next Hold (images were not decoded this run)
                update_value_by_id("vae_decode", my_unique_id, True)
                return {"ui": {"images": list()},
                        "result": (model, positive, negative, {"samples": latent}, vae, TSC_KSampler.empty_image,)}
            else:
                # Decode images and store
                images = vae.decode(latent).cpu()
                update_value_by_id("images", my_unique_id, images)

                # Disable vae decode on next Hold (cached images are fresh)
                update_value_by_id("vae_decode", my_unique_id, False)

                # Generate image results and store
                results = preview_images(images, filename_prefix)
                update_value_by_id("results", my_unique_id, results)

                # Output image results to ui and node outputs
                return {"ui": {"images": results},
                        "result": (model, positive, negative, {"samples": latent}, vae, images,)}

        # If the sampler state is "Hold": re-emit stored outputs without sampling
        elif sampler_state == "Hold":
            # Print a message indicating that the KSampler is in "Hold" state with the unique ID
            print('\033[32mKSampler(Efficient)[{}]:\033[0mHeld'.format(my_unique_id))

            # If not in preview mode, return the results in the specified format
            if preview_image == "Disabled":
                return {"ui": {"images": list()},
                        "result": (model, positive, negative, last_latent, vae, TSC_KSampler.empty_image,)}

            # if preview_image == "Enabled":
            else:
                latent = last_latent["samples"]

                # A pending decode exists when the last Sample ran with previews off.
                if get_value_by_id("vae_decode", my_unique_id) == True:
                    # Decode images and store
                    images = vae.decode(latent).cpu()
                    update_value_by_id("images", my_unique_id, images)

                    # Disable vae decode on next Hold
                    update_value_by_id("vae_decode", my_unique_id, False)

                    # Generate image results and store
                    results = preview_images(images, filename_prefix)
                    update_value_by_id("results", my_unique_id, results)
                else:
                    images = last_images
                    results = last_results

                # Output image results to ui and node outputs
                return {"ui": {"images": results},
                        "result": (model, positive, negative, {"samples": latent}, vae, images,)}

        elif sampler_state == "Script":
            # If no script input connected, set X_type and Y_type to "Nothing"
            if script is None:
                X_type = "Nothing"
                Y_type = "Nothing"
            else:
                # Unpack script Tuple (X_type, X_value, Y_type, Y_value, grid_spacing, latent_id)
                X_type, X_value, Y_type, Y_value, grid_spacing, latent_id = script

            if (X_type == "Nothing" and Y_type == "Nothing"):
                print('\033[31mKSampler(Efficient)[{}] Error:\033[0m No valid script entry detected'.format(my_unique_id))
                return {"ui": {"images": list()},
                        "result": (model, positive, negative, last_latent, vae, last_images,)}

            if vae == (None,):
                print('\033[31mKSampler(Efficient)[{}] Error:\033[0m VAE must be connected to use Script mode.'.format(my_unique_id))
                return {"ui": {"images": list()},
                        "result": (model, positive, negative, last_latent, vae, last_images,)}

            # Extract the 'samples' tensor from the dictionary
            latent_image_tensor = latent_image['samples']

            # Split the tensor into individual image tensors
            image_tensors = torch.split(latent_image_tensor, 1, dim=0)

            # Create a list of dictionaries containing the individual image tensors
            latent_list = [{'samples': image} for image in image_tensors]

            # Clamp latent_id into range, warning when it was out of bounds
            if latent_id >= len(latent_list):
                print(
                    f'\033[31mKSampler(Efficient)[{my_unique_id}] Warning:\033[0m '
                    f'The selected latent_id ({latent_id}) is out of range.\n'
                    f'Automatically setting the latent_id to the last image in the list (index: {len(latent_list) - 1}).')
                latent_id = len(latent_list) - 1

            # Set latent only to the selected latent of the batch
            latent_image = latent_list[latent_id]

            # Define X/Y_values for "Seeds++ Batch" (X_value[0] is the batch count)
            if X_type == "Seeds++ Batch":
                X_value = [latent_image for _ in range(X_value[0])]
            if Y_type == "Seeds++ Batch":
                Y_value = [latent_image for _ in range(Y_value[0])]

            # Define X/Y_values for "Latent Batch"
            if X_type == "Latent Batch":
                X_value = latent_list
            if Y_type == "Latent Batch":
                Y_value = latent_list

            # Embedd information into "Scheduler" X/Y_values for text label
            if X_type == "Scheduler" and Y_type != "Sampler":
                # X_value second list value of each array entry = None
                for i in range(len(X_value)):
                    if len(X_value[i]) == 2:
                        X_value[i][1] = None
                    else:
                        X_value[i] = [X_value[i], None]
            if Y_type == "Scheduler" and X_type != "Sampler":
                # Y_value second list value of each array entry = None
                for i in range(len(Y_value)):
                    if len(Y_value[i]) == 2:
                        Y_value[i][1] = None
                    else:
                        Y_value[i] = [Y_value[i], None]

            def define_variable(var_type, var, seed, steps, cfg, sampler_name, scheduler, latent_image, denoise,
                                vae_name, var_label, num_label):
                """Apply one X/Y axis value to the sampling parameters and append its
                plot label text to var_label. Returns the (possibly updated) params.

                `scheduler` is a 2-list [current, original]; see the caller's setup.
                """
                # If var_type is "Latent Batch", update latent_image; label is the index
                if var_type == "Latent Batch":
                    latent_image = var
                    text = f"{len(var_label)}"
                # If var_type is "Seeds++ Batch", the caller already updated the seed
                elif var_type == "Seeds++ Batch":
                    text = f"seed: {seed}"
                # If var_type is "Steps", update steps and generate labels
                elif var_type == "Steps":
                    steps = var
                    text = f"Steps: {steps}"
                # If var_type is "CFG Scale", update cfg and generate labels
                elif var_type == "CFG Scale":
                    cfg = var
                    text = f"CFG Scale: {cfg}"
                # If var_type is "Sampler", update sampler_name, scheduler, and generate labels
                elif var_type == "Sampler":
                    sampler_name = var[0]
                    if var[1] == "":
                        text = f"{sampler_name}"
                    else:
                        if var[1] != None:
                            scheduler[0] = var[1]
                        else:
                            # Fall back to the KSampler's original scheduler
                            scheduler[0] = scheduler[1]
                        text = f"{sampler_name} ({scheduler[0]})"
                    text = text.replace("ancestral", "a").replace("uniform", "u")
                # If var_type is "Scheduler", update scheduler and generate labels
                elif var_type == "Scheduler":
                    scheduler[0] = var[0]
                    if len(var) == 2:
                        text = f"{sampler_name} ({var[0]})"
                    else:
                        text = f"{var}"
                    text = text.replace("ancestral", "a").replace("uniform", "u")
                # If var_type is "Denoise", update denoise and generate labels
                elif var_type == "Denoise":
                    denoise = var
                    text = f"Denoise: {denoise}"
                # If var_type is "VAE", update vae_name and generate labels
                elif var_type == "VAE":
                    vae_name = var
                    text = f"VAE: {vae_name}"
                # For any other var_type, set text to ""
                else:
                    text = ""

                def truncate_texts(texts, num_label):
                    # Shorten VAE filenames to a 16-24 char prefix so labels fit.
                    min_length = min([len(text) for text in texts])
                    truncate_length = min(min_length, 24)

                    if truncate_length < 16:
                        truncate_length = 16

                    truncated_texts = []
                    for text in texts:
                        if len(text) > truncate_length:
                            text = text[:truncate_length] + "..."
                        truncated_texts.append(text)

                    return truncated_texts

                # Add the generated text to var_label if it's not full
                if len(var_label) < num_label:
                    var_label.append(text)

                # If var_type VAE, truncate entries in the var_label list when it's full
                if len(var_label) == num_label and var_type == "VAE":
                    var_label = truncate_texts(var_label, num_label)

                # Return the modified variables
                return steps, cfg, sampler_name, scheduler, latent_image, denoise, vae_name, var_label

            # Define a helper function to help process X and Y values.
            # NOTE(review): the mutable default arguments (latent_new, image_list,
            # size_list) are what accumulates results across the grid loop below —
            # but they also persist across separate node executions, which can leak
            # stale images into the next run. Confirm against upstream before fixing.
            def process_values(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise,
                               vae, vae_name, latent_new=[], max_width=0, max_height=0, image_list=[], size_list=[]):
                """Sample one grid cell, decode it, and collect the image/size/latent."""
                # Sample
                samples = common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative,
                                          latent_image, denoise=denoise)

                # Decode images and store
                latent = samples[0]["samples"]

                # Add the latent tensor to the tensors list
                latent_new.append(latent)

                # Load custom vae if available
                if vae_name is not None:
                    vae = load_vae(vae_name)

                # Decode the image
                image = vae.decode(latent).cpu()

                # Convert the image from tensor to PIL Image and add it to the list
                pil_image = tensor2pil(image)
                image_list.append(pil_image)
                size_list.append(pil_image.size)

                # Update max dimensions (relative to this call's defaults)
                max_width = max(max_width, pil_image.width)
                max_height = max(max_height, pil_image.height)

                # Return the touched variables
                return image_list, size_list, max_width, max_height, latent_new

            # Initiate Plot label text variables X/Y_label
            X_label = []
            Y_label = []

            # Seed_updated for "Seeds++ Batch" incremental seeds
            seed_updated = seed

            # Store the KSamplers original scheduler inside the same scheduler variable:
            # scheduler[0] is the live value, scheduler[1] the original for fallback.
            scheduler = [scheduler, scheduler]

            # By default set vae_name to None (meaning: use the connected vae)
            vae_name = None

            # Fill Plot Rows (X)
            for X_index, X in enumerate(X_value):
                # Seed control based on loop index during Batch
                if X_type == "Seeds++ Batch":
                    # Update seed based on the outer loop index
                    seed_updated = seed + X_index

                # Define X parameters and generate labels
                steps, cfg, sampler_name, scheduler, latent_image, denoise, vae_name, X_label = \
                    define_variable(X_type, X, seed_updated, steps, cfg, sampler_name, scheduler, latent_image,
                                    denoise, vae_name, X_label, len(X_value))

                if Y_type != "Nothing":
                    # Seed control based on loop index during Batch
                    for Y_index, Y in enumerate(Y_value):
                        if Y_type == "Seeds++ Batch":
                            # Update seed based on the inner loop index
                            seed_updated = seed + Y_index

                        # Define Y parameters and generate labels
                        steps, cfg, sampler_name, scheduler, latent_image, denoise, vae_name, Y_label = \
                            define_variable(Y_type, Y, seed_updated, steps, cfg, sampler_name, scheduler, latent_image,
                                            denoise, vae_name, Y_label, len(Y_value))

                        # Generate images
                        image_list, size_list, max_width, max_height, latent_new = \
                            process_values(model, seed_updated, steps, cfg, sampler_name, scheduler[0],
                                           positive, negative, latent_image, denoise, vae, vae_name)
                else:
                    # Generate images
                    image_list, size_list, max_width, max_height, latent_new = \
                        process_values(model, seed_updated, steps, cfg, sampler_name, scheduler[0],
                                       positive, negative, latent_image, denoise, vae, vae_name)

            def adjusted_font_size(text, initial_font_size, max_width):
                """Shrink the font size so `text` fits within ~90% of max_width.

                NOTE(review): FreeTypeFont.getsize was deprecated and removed in
                Pillow 10 — this breaks on newer Pillow; use getbbox/getlength.
                """
                font = ImageFont.truetype(str(Path(font_path)), initial_font_size)
                text_width, _ = font.getsize(text)

                if text_width > (max_width * 0.9):
                    scaling_factor = 0.9  # A value less than 1 to shrink the font size more aggressively
                    new_font_size = int(initial_font_size * (max_width / text_width) * scaling_factor)
                else:
                    new_font_size = initial_font_size

                return new_font_size

            # Disable vae decode on next Hold
            update_value_by_id("vae_decode", my_unique_id, False)

            # Extract plot dimensions
            num_rows = max(len(Y_value) if Y_value is not None else 0, 1)
            num_cols = max(len(X_value) if X_value is not None else 0, 1)

            def rearrange_tensors(latent, num_cols, num_rows):
                # Transpose from the column-major order the loops produced into the
                # row-major order the preview grid displays.
                new_latent = []
                for i in range(num_rows):
                    for j in range(num_cols):
                        index = j * num_rows + i
                        new_latent.append(latent[index])
                return new_latent

            # Rearrange latent array to match preview image grid
            latent_new = rearrange_tensors(latent_new, num_cols, num_rows)

            # Concatenate the tensors along the first dimension (dim=0)
            latent_new = torch.cat(latent_new, dim=0)

            # Store latent_new as last latent
            update_value_by_id("latent", my_unique_id, latent_new)

            # Calculate the dimensions of the white background image
            border_size = max_width // 15

            # Modify the background width and x_offset initialization based on Y_type
            if Y_type == "Nothing":
                bg_width = num_cols * max_width + (num_cols - 1) * grid_spacing
                x_offset_initial = 0
            else:
                # Extra margin on the left for the Y-axis labels
                bg_width = num_cols * max_width + (num_cols - 1) * grid_spacing + 3 * border_size
                x_offset_initial = border_size * 3

            # Modify the background height based on X_type
            if X_type == "Nothing":
                bg_height = num_rows * max_height + (num_rows - 1) * grid_spacing
                y_offset = 0
            else:
                # Extra margin on top for the X-axis labels
                bg_height = num_rows * max_height + (num_rows - 1) * grid_spacing + 3 * border_size
                y_offset = border_size * 3

            # Create the white background image
            background = Image.new('RGBA', (int(bg_width), int(bg_height)), color=(255, 255, 255, 255))

            for row in range(num_rows):
                # Initialize the X_offset
                x_offset = x_offset_initial

                for col in range(num_cols):
                    # Calculate the index for image_list (column-major storage)
                    index = col * num_rows + row
                    img = image_list[index]

                    # Paste the image
                    background.paste(img, (x_offset, y_offset))

                    if row == 0 and X_type != "Nothing":
                        # Assign text
                        text = X_label[col]

                        # Add the corresponding X_value as a label above the image
                        initial_font_size = int(48 * img.width / 512)
                        font_size = adjusted_font_size(text, initial_font_size, img.width)
                        label_height = int(font_size*1.5)

                        # Create a white background label image
                        label_bg = Image.new('RGBA', (img.width, label_height), color=(255, 255, 255, 0))
                        d = ImageDraw.Draw(label_bg)

                        # Create the font object
                        font = ImageFont.truetype(str(Path(font_path)), font_size)

                        # Calculate the text size and the starting position
                        # NOTE(review): ImageDraw.textsize was removed in Pillow 10
                        text_width, text_height = d.textsize(text, font=font)
                        text_x = (img.width - text_width) // 2
                        text_y = (label_height - text_height) // 2

                        # Add the text to the label image
                        d.text((text_x, text_y), text, fill='black', font=font)

                        # Calculate the available space between the top of the background and the top of the image
                        available_space = y_offset - label_height

                        # Calculate the new Y position for the label image
                        label_y = available_space // 2

                        # Paste the label image above the image on the background using alpha_composite()
                        background.alpha_composite(label_bg, (x_offset, label_y))

                    if col == 0 and Y_type != "Nothing":
                        # Assign text
                        text = Y_label[row]

                        # Add the corresponding Y_value as a label to the left of the image
                        initial_font_size = int(48 * img.height / 512)
                        font_size = adjusted_font_size(text, initial_font_size, img.height)

                        # Create a white background label image
                        label_bg = Image.new('RGBA', (img.height, font_size), color=(255, 255, 255, 0))
                        d = ImageDraw.Draw(label_bg)

                        # Create the font object
                        font = ImageFont.truetype(str(Path(font_path)), font_size)

                        # Calculate the text size and the starting position
                        text_width, text_height = d.textsize(text, font=font)
                        text_x = (img.height - text_width) // 2
                        text_y = (font_size - text_height) // 2

                        # Add the text to the label image
                        d.text((text_x, text_y), text, fill='black', font=font)

                        # Rotate the label_bg 90 degrees counter-clockwise
                        if Y_type != "Latent Batch":
                            label_bg = label_bg.rotate(90, expand=True)

                        # Calculate the available space between the left of the background and the left of the image
                        available_space = x_offset - label_bg.width

                        # Calculate the new X position for the label image
                        label_x = available_space // 2

                        # Calculate the Y position for the label image
                        label_y = y_offset + (img.height - label_bg.height) // 2

                        # Paste the label image to the left of the image on the background using alpha_composite()
                        background.alpha_composite(label_bg, (label_x, label_y))

                    # Update the x_offset
                    x_offset += img.width + grid_spacing

                # Update the y_offset
                y_offset += img.height + grid_spacing

            # Store the assembled grid as the node's image state
            images = pil2tensor(background)
            update_value_by_id("images", my_unique_id, images)

            # Generate image results and store
            results = preview_images(images, filename_prefix)
            update_value_by_id("results", my_unique_id, results)

            # Clean loaded_objects
            update_loaded_objects(prompt)

            # Output image results to ui and node outputs
            return {"ui": {"images": results}, "result": (model, positive, negative, {"samples": latent_new}, vae, images,)}
# TSC XY Plot
class TSC_XYplot:
    """Builds the "script" input for the Efficient KSampler's XY-plot mode.

    Parses the user-entered X/Y axis types and their semicolon-separated value
    strings, validates/clamps every value, and returns them packed as a single
    SCRIPT tuple: (X_type, X_value, Y_type, Y_value, grid_spacing, latent_id).
    On any validation failure the defaults ("Nothing" axes) are returned so the
    downstream sampler degrades gracefully instead of crashing.
    """

    # Read-only help-text fragments assembled into the node's "help" widget.
    examples = "(X/Y_types) (X/Y_values)\n" \
               "Latent Batch n/a\n" \
               "Seeds++ Batch 3\n" \
               "Steps 15;20;25\n" \
               "CFG Scale 5;10;15;20\n" \
               "Sampler(1) dpmpp_2s_ancestral;euler;ddim\n" \
               "Sampler(2) dpmpp_2m,karras;heun,normal\n" \
               "Scheduler normal;simple;karras\n" \
               "Denoise .3;.4;.5;.6;.7\n" \
               "VAE vae_1; vae_2; vae_3"
    # Valid sampler/scheduler/VAE names, also rendered into the "help" widget.
    # NOTE: evaluated once at class-definition (module import) time.
    samplers = ";\n".join(comfy.samplers.KSampler.SAMPLERS)
    schedulers = ";\n".join(comfy.samplers.KSampler.SCHEDULERS)
    vaes = ";\n".join(folder_paths.get_filename_list("vae"))
    notes = "- During a 'Latent Batch', the corresponding X/Y_value is ignored.\n" \
            "- During a 'Latent Batch', the latent_id is ignored.\n" \
            "- For a 'Seeds++ Batch', starting seed is defined by the KSampler.\n" \
            "- Trailing semicolons are ignored in the X/Y_values.\n" \
            "- Parameter types not set by this node are defined in the KSampler."

    @classmethod
    def INPUT_TYPES(cls):
        return {"required": {
            "X_type": (["Nothing", "Latent Batch", "Seeds++ Batch", "Steps", "CFG Scale",
                        "Sampler", "Scheduler", "Denoise", "VAE"],),
            "X_value": ("STRING", {"default": "", "multiline": False}),
            "Y_type": (["Nothing", "Latent Batch", "Seeds++ Batch", "Steps", "CFG Scale",
                        "Sampler", "Scheduler", "Denoise", "VAE"],),
            "Y_value": ("STRING", {"default": "", "multiline": False}),
            "grid_spacing": ("INT", {"default": 0, "min": 0, "max": 500, "step": 5}),
            "XY_flip": (["False","True"],),
            "latent_id": ("INT", {"default": 0, "min": 0, "max": 100}),
            "help": ("STRING", {"default":
                                    f"____________EXAMPLES____________\n{cls.examples}\n\n"
                                    f"____________SAMPLERS____________\n{cls.samplers}\n\n"
                                    f"___________SCHEDULERS___________\n{cls.schedulers}\n\n"
                                    f"______________VAE_______________\n{cls.vaes}\n\n"
                                    f"_____________NOTES______________\n{cls.notes}",
                                "multiline": True}),},
        }

    RETURN_TYPES = ("SCRIPT",)
    RETURN_NAMES = ("script",)
    FUNCTION = "XYplot"
    CATEGORY = "Efficiency Nodes/Scripts"

    def XYplot(self, X_type, X_value, Y_type, Y_value, grid_spacing, XY_flip, latent_id, help):
        """Validate the axis definitions and return the packed SCRIPT tuple.

        The raw X/Y value strings are split on ';' into arrays; each entry is
        validated (and clamped) against the bounds of its axis type. The
        'help' parameter is the read-only widget text and is ignored.
        """
        # Store values as arrays
        X_value = X_value.replace(" ", "").replace("\n", "")  # Remove spaces and newline characters
        X_value = X_value.rstrip(";")  # Remove trailing semicolon
        X_value = X_value.split(";")  # Turn to array

        Y_value = Y_value.replace(" ", "").replace("\n", "")  # Remove spaces and newline characters
        Y_value = Y_value.rstrip(";")  # Remove trailing semicolon
        Y_value = Y_value.split(";")  # Turn to array

        # Define the valid bounds for each type
        bounds = {
            "Seeds++ Batch": {"min": 0, "max": 50},
            "Steps": {"min": 0},
            "CFG Scale": {"min": 0, "max": 100},
            "Sampler": {"options": comfy.samplers.KSampler.SAMPLERS},
            "Scheduler": {"options": comfy.samplers.KSampler.SCHEDULERS},
            "Denoise": {"min": 0, "max": 1},
            "VAE": {"options": folder_paths.get_filename_list("vae")}
        }

        def validate_value(value, value_type, bounds):
            """
            Validates a value based on its corresponding value_type and bounds.

            Parameters:
                value (str or int or float): The value to validate.
                value_type (str): The type of the value, which determines the valid bounds.
                bounds (dict): A dictionary that contains the valid bounds for each value_type.

            Returns:
                The validated value (numbers are clamped into bounds; a
                "Sampler" value becomes a (sampler, scheduler-or-None) tuple).
                None if no validation was done or failed.
            """
            # Seeds++ Batch: must be a whole number, clamped to [min, max]
            if value_type == "Seeds++ Batch":
                try:
                    x = float(value)
                except ValueError:
                    print(
                        f"\033[31mXY Plot Error:\033[0m '{value}' is not a valid batch count.")
                    return None
                # Reject fractional counts (e.g. "3.5") instead of rounding.
                if not x.is_integer():
                    print(
                        f"\033[31mXY Plot Error:\033[0m '{value}' is not a valid batch count.")
                    return None
                else:
                    x = int(x)
                    if x < bounds["Seeds++ Batch"]["min"]:
                        x = bounds["Seeds++ Batch"]["min"]
                    elif x > bounds["Seeds++ Batch"]["max"]:
                        x = bounds["Seeds++ Batch"]["max"]
                return x
            # Steps: integer, clamped at the lower bound only
            elif value_type == "Steps":
                try:
                    x = int(value)
                    if x < bounds["Steps"]["min"]:
                        x = bounds["Steps"]["min"]
                    return x
                except ValueError:
                    print(
                        f"\033[31mXY Plot Error:\033[0m '{value}' is not a valid Step count.")
                    return None
            # CFG Scale: float, clamped into [min, max]
            elif value_type == "CFG Scale":
                try:
                    x = float(value)
                    if x < bounds["CFG Scale"]["min"]:
                        x = bounds["CFG Scale"]["min"]
                    elif x > bounds["CFG Scale"]["max"]:
                        x = bounds["CFG Scale"]["max"]
                    return x
                except ValueError:
                    print(
                        f"\033[31mXY Plot Error:\033[0m '{value}' is not a number between {bounds['CFG Scale']['min']}"
                        f" and {bounds['CFG Scale']['max']} for CFG Scale.")
                    return None
            # Sampler: either "sampler" or "sampler,scheduler"; always returns
            # a (sampler, scheduler-or-None) pair on success.
            elif value_type == "Sampler":
                if isinstance(value, str) and ',' in value:
                    value = tuple(map(str.strip, value.split(',')))
                if isinstance(value, tuple):
                    if len(value) == 2:
                        sampler, scheduler = value
                        scheduler = scheduler.lower()  # Convert the scheduler name to lowercase
                        if sampler not in bounds["Sampler"]["options"]:
                            valid_samplers = '\n'.join(bounds["Sampler"]["options"])
                            print(
                                f"\033[31mXY Plot Error:\033[0m '{sampler}' is not a valid sampler. Valid samplers are:\n{valid_samplers}")
                            sampler = None
                        if scheduler not in bounds["Scheduler"]["options"]:
                            valid_schedulers = '\n'.join(bounds["Scheduler"]["options"])
                            print(
                                f"\033[31mXY Plot Error:\033[0m '{scheduler}' is not a valid scheduler. Valid schedulers are:\n{valid_schedulers}")
                            scheduler = None
                        if sampler is None or scheduler is None:
                            return None
                        else:
                            return sampler, scheduler
                    else:
                        print(
                            f"\033[31mXY Plot Error:\033[0m '{value}' is not a valid sampler.'")
                        return None
                else:
                    if value not in bounds["Sampler"]["options"]:
                        valid_samplers = '\n'.join(bounds["Sampler"]["options"])
                        print(
                            f"\033[31mXY Plot Error:\033[0m '{value}' is not a valid sampler. Valid samplers are:\n{valid_samplers}")
                        return None
                    else:
                        return value, None
            # Scheduler: must be one of the known scheduler names
            elif value_type == "Scheduler":
                if value not in bounds["Scheduler"]["options"]:
                    valid_schedulers = '\n'.join(bounds["Scheduler"]["options"])
                    print(
                        f"\033[31mXY Plot Error:\033[0m '{value}' is not a valid Scheduler. Valid Schedulers are:\n{valid_schedulers}")
                    return None
                else:
                    return value
            # Denoise: float, clamped into [min, max]
            elif value_type == "Denoise":
                try:
                    x = float(value)
                    if x < bounds["Denoise"]["min"]:
                        x = bounds["Denoise"]["min"]
                    elif x > bounds["Denoise"]["max"]:
                        x = bounds["Denoise"]["max"]
                    return x
                except ValueError:
                    print(
                        f"\033[31mXY Plot Error:\033[0m '{value}' is not a number between {bounds['Denoise']['min']} "
                        f"and {bounds['Denoise']['max']} for Denoise.")
                    return None
            # VAE: must be one of the discovered VAE filenames
            elif value_type == "VAE":
                if value not in bounds["VAE"]["options"]:
                    valid_vaes = '\n'.join(bounds["VAE"]["options"])
                    print(
                        f"\033[31mXY Plot Error:\033[0m '{value}' is not a valid VAE. Valid VAEs are:\n{valid_vaes}")
                    return None
                else:
                    return value
            else:
                return None

        def reset_variables():
            # Default "no plot" script values used on any validation failure.
            X_type = "Nothing"
            X_value = [None]
            Y_type = "Nothing"
            Y_value = [None]
            latent_id = None
            grid_spacing = None
            return X_type, X_value, Y_type, Y_value, grid_spacing, latent_id

        if X_type == Y_type == "Nothing":
            return (reset_variables(),)

        # If types are the same, error and return
        if (X_type == Y_type) and (X_type != "Nothing"):
            print(f"\033[31mXY Plot Error:\033[0m X_type and Y_type must be different.")
            # Reset variables to default values and return
            return (reset_variables(),)

        # Validate X_value array length is 1 if doing a "Seeds++ Batch"
        if len(X_value) != 1 and X_type == "Seeds++ Batch":
            print(f"\033[31mXY Plot Error:\033[0m '{';'.join(X_value)}' is not a valid batch count.")
            return (reset_variables(),)

        # Validate Y_value array length is 1 if doing a "Seeds++ Batch"
        if len(Y_value) != 1 and Y_type == "Seeds++ Batch":
            print(f"\033[31mXY Plot Error:\033[0m '{';'.join(Y_value)}' is not a valid batch count.")
            return (reset_variables(),)

        # Loop over each entry in X_value and check if it's valid
        # Validate X_value based on X_type
        if X_type != "Nothing" and X_type != "Latent Batch":
            for i in range(len(X_value)):
                X_value[i] = validate_value(X_value[i], X_type, bounds)
                if X_value[i] is None:  # fix: identity check, not '== None'
                    # Reset variables to default values and return
                    return (reset_variables(),)

        # Loop over each entry in Y_value and check if it's valid
        # Validate Y_value based on Y_type
        if Y_type != "Nothing" and Y_type != "Latent Batch":
            for i in range(len(Y_value)):
                Y_value[i] = validate_value(Y_value[i], Y_type, bounds)
                if Y_value[i] is None:  # fix: identity check, not '== None'
                    # Reset variables to default values and return
                    return (reset_variables(),)

        # Clean Schedulers from Sampler data (if other type is Scheduler)
        if X_type == "Sampler" and Y_type == "Scheduler":
            # Clear X_value Scheduler's
            X_value = [[x[0], ""] for x in X_value]
        elif Y_type == "Sampler" and X_type == "Scheduler":
            # Clear Y_value Scheduler's
            Y_value = [[y[0], ""] for y in Y_value]

        # Clean X/Y_values
        if X_type == "Nothing" or X_type == "Latent Batch":
            X_value = [None]
        if Y_type == "Nothing" or Y_type == "Latent Batch":
            Y_value = [None]

        # Flip X and Y
        if XY_flip == "True":
            X_type, Y_type = Y_type, X_type
            X_value, Y_value = Y_value, X_value

        # Print the validated values
        if X_type != "Nothing" and X_type != "Latent Batch":
            print("\033[90m" + f"XY Plot validated values for X_type '{X_type}': {', '.join(map(str, X_value))}\033[0m")
        if Y_type != "Nothing" and Y_type != "Latent Batch":
            print("\033[90m" + f"XY Plot validated values for Y_type '{Y_type}': {', '.join(map(str, Y_value))}\033[0m")

        return ((X_type, X_value, Y_type, Y_value, grid_spacing, latent_id),)
# TSC Image Overlay
class TSC_ImageOverlay:
    """Composites an overlay image onto a base image, with optional resizing,
    rotation, an opacity fade, and an optional mask that shapes the overlay's
    alpha channel."""

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "base_image": ("IMAGE",),
                "overlay_image": ("IMAGE",),
                # NOTE: the "heigth" typo is preserved (and matched below in
                # apply_overlay) so previously-saved workflows keep loading.
                "overlay_resize": (["None", "Fit", "Resize by rescale_factor", "Resize to width & heigth"],),
                "resize_method": (["nearest-exact", "bilinear", "area"],),
                "rescale_factor": ("FLOAT", {"default": 1, "min": 0.01, "max": 16.0, "step": 0.1}),
                "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 64}),
                "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 64}),
                "x_offset": ("INT", {"default": 0, "min": -48000, "max": 48000, "step": 10}),
                "y_offset": ("INT", {"default": 0, "min": -48000, "max": 48000, "step": 10}),
                "rotation": ("INT", {"default": 0, "min": -180, "max": 180, "step": 5}),
                # 0 = fully opaque, 100 = fully transparent
                "opacity": ("FLOAT", {"default": 0, "min": 0, "max": 100, "step": 5}),
            },
            "optional": {"optional_mask": ("MASK",),}
        }

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "overlayimage"
    CATEGORY = "Efficiency Nodes/Image"

    def overlayimage(self, base_image, overlay_image, overlay_resize, resize_method, rescale_factor, width, height, x_offset, y_offset, rotation, opacity, optional_mask=None):
        """Node entry point: converts tensors/params and delegates to apply_overlay."""
        result = self.apply_overlay(tensor2pil(base_image), overlay_image, overlay_resize, resize_method, rescale_factor, (int(width), int(height)),
                                    (int(x_offset), int(y_offset)), int(rotation), opacity, optional_mask)
        return (pil2tensor(result),)

    def apply_overlay(self, base, overlay, size_option, resize_method, rescale_factor, size, location, rotation, opacity, mask):
        """Composite 'overlay' (IMAGE tensor) onto 'base' (PIL image).

        Returns the edited base PIL image. 'size' is (width, height) and
        'location' is the (x, y) paste offset.
        """
        # Check for different sizing options
        if size_option != "None":
            # Extract overlay size and store in Tuple "overlay_size" (WxH)
            overlay_size = overlay.size()
            overlay_size = (overlay_size[2], overlay_size[1])
            if size_option == "Fit":
                overlay_size = (base.size[0], base.size[1])
            elif size_option == "Resize by rescale_factor":
                overlay_size = tuple(int(dimension * rescale_factor) for dimension in overlay_size)
            elif size_option == "Resize to width & heigth":
                overlay_size = (size[0], size[1])

            # Rescale while the overlay is still a tensor (channels-first)
            samples = overlay.movedim(-1, 1)
            overlay = comfy.utils.common_upscale(samples, overlay_size[0], overlay_size[1], resize_method, False)
            overlay = overlay.movedim(1, -1)

        overlay = tensor2pil(overlay)

        # Add Alpha channel to overlay
        overlay = overlay.convert('RGBA')
        overlay.putalpha(Image.new("L", overlay.size, 255))

        # If mask connected, check if the overlay image has an alpha channel
        if mask is not None:
            # Convert mask to pil and resize
            mask = tensor2pil(mask)
            mask = mask.resize(overlay.size)
            # Apply mask as overlay's alpha (inverted: white mask = hidden)
            overlay.putalpha(ImageOps.invert(mask))

        # Rotate the overlay image (expand=True adds transparent margins)
        overlay = overlay.rotate(rotation, expand=True)

        # Apply opacity on overlay image
        r, g, b, a = overlay.split()
        a = a.point(lambda x: max(0, int(x * (1 - opacity / 100))))
        overlay.putalpha(a)

        # Paste using the overlay itself as the paste mask so its alpha channel
        # (opacity fade, rotation margins, optional mask) is honored.
        # Fix: previously the no-mask branch called base.paste(overlay, location)
        # without a mask, which makes PIL ignore the alpha channel — the opacity
        # setting had no effect and rotate(expand=True) margins pasted as black.
        base.paste(overlay, location, overlay)

        # Return the edited base image
        return base
# TSC Evaluate Integers
class TSC_EvaluateInts:
    """Evaluates a Python arithmetic expression over integer inputs a, b, c
    and returns the result as both INT and FLOAT."""

    @classmethod
    def INPUT_TYPES(cls):
        return {"required": {
            "python_expression": ("STRING", {"default": "((a + b) - c) / 2", "multiline": False}),
            "print_to_console": (["False", "True"],),},
            "optional": {
                "a": ("INT", {"default": 0, "min": -48000, "max": 48000, "step": 1}),
                "b": ("INT", {"default": 0, "min": -48000, "max": 48000, "step": 1}),
                "c": ("INT", {"default": 0, "min": -48000, "max": 48000, "step": 1}),}
        }

    RETURN_TYPES = ("INT", "FLOAT",)
    OUTPUT_NODE = True
    FUNCTION = "evaluate"
    CATEGORY = "Efficiency Nodes/Math"

    def evaluate(self, python_expression, print_to_console, a=0, b=0, c=0):
        """Evaluate 'python_expression' (with a, b, c in scope) and return
        (int_result, float_result). Optionally echoes a debug line."""
        # SECURITY NOTE: eval() executes arbitrary Python. Here the expression
        # comes from the user's own workflow UI; never route untrusted input.
        result = eval(python_expression)  # fix: evaluate once instead of twice
        int_result = int(result)
        float_result = float(result)
        if print_to_console == "True":
            print("\n\033[31mEvaluate Integers Debug:\033[0m")
            print(f"\033[90m{{a = {a} , b = {b} , c = {c}}} \033[0m")
            print(f"{python_expression} = \033[92m INT: " + str(int_result) + " , FLOAT: " + str(float_result) + "\033[0m")
        return (int_result, float_result,)
# TSC Evaluate Strings
class TSC_EvaluateStrs:
    """Evaluates a Python expression over string inputs a, b, c and returns
    the stringified result."""

    @classmethod
    def INPUT_TYPES(cls):
        return {"required": {
            "python_expression": ("STRING", {"default": "a + b + c", "multiline": False}),
            "print_to_console": (["False", "True"],)},
            "optional": {
                "a": ("STRING", {"default": "Hello", "multiline": False}),
                "b": ("STRING", {"default": " World", "multiline": False}),
                "c": ("STRING", {"default": "!", "multiline": False}),}
        }

    RETURN_TYPES = ("STRING",)
    OUTPUT_NODE = True
    FUNCTION = "evaluate"
    CATEGORY = "Efficiency Nodes/Math"

    def evaluate(self, python_expression, print_to_console, a="", b="", c=""):
        """Evaluate 'python_expression' (with a, b, c in scope), coerce the
        outcome to a string, and return it as a one-element tuple."""
        evaluated = str(eval(python_expression))
        if print_to_console == "True":
            # Echo the inputs and the evaluated expression for debugging
            print("\n\033[31mEvaluate Strings Debug:\033[0m")
            print(f"\033[90ma = {a} \nb = {b} \nc = {c}\033[0m")
            print(f"{python_expression} = \033[92m" + evaluated + "\033[0m")
        return (evaluated,)
# NODE MAPPING
# Registry consumed by ComfyUI at load time: maps each node's display name
# (as shown in the UI / saved in workflow JSON) to its implementing class.
# TSC_KSampler and TSC_EfficientLoader are defined earlier in this file.
NODE_CLASS_MAPPINGS = {
    "KSampler (Efficient)": TSC_KSampler,
    "Efficient Loader": TSC_EfficientLoader,
    "XY Plot": TSC_XYplot,
    "Image Overlay": TSC_ImageOverlay,
    "Evaluate Integers": TSC_EvaluateInts,
    "Evaluate Strings": TSC_EvaluateStrs,
}