mirror of
https://github.com/jags111/efficiency-nodes-comfyui.git
synced 2026-03-21 21:22:13 -03:00
2667 lines
131 KiB
Python
2667 lines
131 KiB
Python
# Efficiency Nodes - A collection of my ComfyUI custom nodes to help streamline workflows and reduce total node count.
|
|
# by Luciano Cirino (Discord: TSC#9184) - April 2023
|
|
|
|
from comfy.sd import ModelPatcher, CLIP, VAE
|
|
from nodes import common_ksampler, CLIPSetLastLayer
|
|
|
|
from torch import Tensor
|
|
from PIL import Image, ImageOps, ImageDraw, ImageFont
|
|
from PIL.PngImagePlugin import PngInfo
|
|
import numpy as np
|
|
import torch
|
|
|
|
import ast
|
|
from pathlib import Path
|
|
import os
|
|
import sys
|
|
import subprocess
|
|
import json
|
|
import folder_paths
|
|
import psutil
|
|
|
|
# Absolute path of the directory that contains this script.
my_dir = os.path.dirname(os.path.abspath(__file__))

# Absolute path of the ComfyUI directory, resolved as two levels above my_dir.
comfy_dir = os.path.abspath(os.path.join(my_dir, os.pardir, os.pardir))

# Append the ComfyUI directory to sys.path so that its modules can be imported.
sys.path.append(comfy_dir)

# Path of the arial.ttf font file, expected to sit next to this script.
font_path = os.path.join(my_dir, 'arial.ttf')
|
|
|
|
# Import functions from nodes.py in the ComfyUI directory
|
|
import comfy.samplers
|
|
import comfy.sd
|
|
import comfy.utils
|
|
|
|
# Upper bound offered for the empty-latent width/height inputs of the loader node.
MAX_RESOLUTION = 8192
|
|
|
|
# Tensor to PIL (grabbed from WAS Suite)
|
|
def tensor2pil(image: torch.Tensor) -> Image.Image:
    """Convert a tensor into a PIL image.

    The tensor is moved to the CPU, converted to a NumPy array and squeezed
    (all singleton dimensions are dropped). Values are multiplied by 255,
    clipped to the 0-255 range and cast to uint8 before the image is built.
    """
    pixels = image.cpu().numpy().squeeze()
    scaled = np.clip(255. * pixels, 0, 255)
    return Image.fromarray(scaled.astype(np.uint8))
|
|
|
|
# Convert PIL to Tensor (grabbed from WAS Suite)
|
|
def pil2tensor(image: Image.Image) -> torch.Tensor:
    """Convert a PIL image into a float32 tensor with a leading dimension of 1.

    Pixel values are divided by 255.0 after conversion to float32, and
    unsqueeze(0) prepends one dimension to the resulting tensor.
    """
    normalized = np.array(image).astype(np.float32) / 255.0
    return torch.from_numpy(normalized).unsqueeze(0)
|
|
|
|
def extract_node_info(prompt, id, indirect_key=None):
    """Look up a node's 'class_type' in the prompt, directly or through one of its inputs.

    Parameters:
    - prompt: mapping of node-id strings to node dicts (with optional
      'class_type' and 'inputs' keys).
    - id: id of the node to start from; it is converted to a string first.
    - indirect_key (optional): name of an input (like 'script') on the starting
      node. When given, the first element of that input's value is taken as
      the id of another node, and that node is the one reported.

    Returns:
    - Direct lookup: (class_type or None, None).
    - Indirect lookup: (class_type or None, linked node id) when the link can
      be followed, otherwise (None, None).
    """
    node_key = str(id)

    # No indirect key: report the node itself (the second element stays None).
    if not indirect_key:
        return prompt.get(node_key, {}).get('class_type', None), None

    # The starting node must exist and expose the requested input.
    if node_key not in prompt:
        return None, None
    node = prompt[node_key]
    if 'inputs' not in node or indirect_key not in node['inputs']:
        return None, None

    # The input value's first element is the id of the connected node.
    linked_id = node['inputs'][indirect_key][0]
    if linked_id not in prompt:
        return None, None

    return prompt[linked_id].get('class_type', None), linked_id
|
|
|
|
def extract_node_value(prompt, id, key):
    """Return the value stored under `key` in a node's 'inputs', or None.

    None is returned when the node id (converted to a string) is not in the
    prompt, when the node has no 'inputs' entry, or when `key` is absent.
    """
    node = prompt.get(str(id), {})
    inputs = node.get('inputs', {})
    return inputs.get(key, None)
|
|
|
|
# Cache models in RAM
|
|
# Models cached in RAM. Every list holds tuples whose last element is the list
# of node ids that reference the entry:
#   "ckpt": (ckpt_name, ckpt_model, clip, bvae, [id])
#   "vae":  (vae_name, vae, [id])
#   "lora": (lora_name, ckpt_name, lora_model, clip_lora, strength_model, strength_clip, [id])
loaded_objects = dict(ckpt=[], vae=[], lora=[])
|
|
|
|
def print_loaded_objects_entries(id=None, prompt=None, show_id=False):
    """Print the cached checkpoint, VAE and LoRA entries held in loaded_objects.

    Parameters:
    - id (optional): node id whose cache entries should be listed. When None,
      every entry is listed together with the ids that reference it.
    - prompt (optional): prompt mapping used to look up the node's class_type
      for the header line. Only consulted when `id` is given.
    - show_id: when True (and both `prompt` and `id` are given), the node id is
      included in the header line.

    LoRA entries additionally show the model strength, the clip strength and
    the name of the base checkpoint. A single "-" is printed when nothing
    matches.
    """
    print("-" * 40)  # Separator line
    if id is not None:
        id = str(id)  # Convert ID to string

    # Header line
    if id is None:
        print(f"\033[36mGlobal Models Cache:\033[0m")
    elif prompt is not None:
        node_name, _ = extract_node_info(prompt, id)
        if show_id:
            print(f"\033[36m{node_name} Models Cache: (node_id:{int(id)})\033[0m")
        else:
            print(f"\033[36m{node_name} Models Cache:\033[0m")
    else:
        print(f"\033[36mModels Cache: \nnode_id:{int(id)}\033[0m")

    entries_found = False
    for key in ["ckpt", "vae", "lora"]:
        if id is None:
            entries = loaded_objects[key]
        else:
            # Compare ids as strings: `id` was stringified above, while the
            # cached id lists may hold ints or strings depending on the caller.
            entries = [entry for entry in loaded_objects[key]
                       if any(str(owner) == id for owner in entry[-1])]
        if not entries:  # Nothing cached under this key for the chosen id
            continue
        entries_found = True
        print(f"{key.capitalize()}:")
        for i, entry in enumerate(entries, 1):  # Start numbering from 1
            truncated_name = entry[0][:50]  # Truncate at 50 characters
            details = []
            if id is None:
                # Global listing: show every id associated with the entry
                details.append("ids: " + ', '.join(map(str, entry[-1])))
            if key == "lora":
                # Lora tuple layout: index 4 is strength_model, index 5 is strength_clip
                details.append(str(round(entry[4], 2)))
                details.append(str(round(entry[5], 2)))
                details.append(f"base_ckpt: {entry[1]}")
            suffix = f" ({', '.join(details)})" if details else ""
            print(f" [{i}] {truncated_name}{suffix}")
    if not entries_found:
        print("-")
|
|
|
|
# This function cleans global variables associated with nodes that are no longer detected on UI
|
|
def globals_cleanup(prompt):
    """Drop globally stored data that belongs to nodes no longer present in the prompt.

    Parameters:
    - prompt: mapping whose keys are the string ids of the nodes currently in
      the workflow.

    Effects:
    - last_helds: every (value, id) pair whose id is not a key of `prompt` is removed.
    - loaded_objects: ids missing from `prompt` are stripped from each entry's
      id list; an entry whose id list becomes empty is removed altogether.
    """
    global loaded_objects
    global last_helds

    # Step 1: Clean up last_helds
    for key in list(last_helds.keys()):
        last_helds[key] = [(value, id) for value, id in last_helds[key] if str(id) in prompt]

    # Step 2: Clean up loaded_objects
    # Each list is rebuilt rather than patched by index: removing an entry
    # while addressing later ones through indices taken from a snapshot would
    # hit the wrong entry (or run past the end of the list).
    for key in list(loaded_objects.keys()):
        kept = []
        for entry in loaded_objects[key]:
            live_ids = [id for id in entry[-1] if str(id) in prompt]
            if len(live_ids) == len(entry[-1]):
                kept.append(entry)  # Every id is still in use: keep untouched
            elif live_ids:
                kept.append(entry[:-1] + (live_ids,))  # Keep with the trimmed id list
            # else: no node references the entry any more, so it is dropped
        # Slice assignment keeps the same list object in loaded_objects
        loaded_objects[key][:] = kept
|
|
|
|
def load_checkpoint(ckpt_name, id, output_vae=True, cache=None, cache_overwrite=False):
    """Return (model, clip, vae) for a checkpoint, reusing the "ckpt" cache when possible.

    Cache hit (an entry named `ckpt_name` exists in loaded_objects["ckpt"]):
    - if `cache` is set and `id` already references at least `cache` entries,
      clear_cache is called for the id and the id is not added to this entry;
    - otherwise the id is added to the entry's id list if it is missing.

    Cache miss: the checkpoint is loaded from disk. When `cache` is set:
    - if the id references fewer than `cache` entries, a new entry is appended;
    - otherwise clear_cache is called and, only when `cache_overwrite` is True,
      the id is removed from the first entry that holds it (deleting that
      entry if its id list becomes empty) and the new entry is appended.

    Parameters:
    - ckpt_name: name of the checkpoint to load.
    - id: node identifier used to tag cache entries.
    - output_vae: forwarded to the checkpoint loader; if True the VAE is loaded too.
    - cache (optional): how many "ckpt" entries a single id may reference.
      With the default None nothing is cached on a miss.
    - cache_overwrite: whether a full cache may give up its first entry for
      this id in favour of the newly loaded checkpoint.
    """
    global loaded_objects
    ckpt_entries = loaded_objects["ckpt"]

    def owned_count():
        # Number of cached checkpoints currently tagged with this id
        return sum(1 for item in ckpt_entries if id in item[-1])

    # Reuse an already cached checkpoint with the same name
    cached = next((item for item in ckpt_entries if item[0] == ckpt_name), None)
    if cached is not None:
        _, model, clip, vae, ids = cached
        if cache and owned_count() >= cache:
            clear_cache(id, cache, "ckpt")
        elif id not in ids:
            ids.append(id)
        return model, clip, vae

    # Not cached: load the checkpoint from disk
    ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name)
    embeddings_dir = folder_paths.get_folder_paths("embeddings")
    loaded = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae, output_clip=True,
                                                   embedding_directory=embeddings_dir)
    model, clip, vae = loaded[0], loaded[1], loaded[2]  # vae is the baked VAE (bvae)

    if not cache:
        return model, clip, vae

    new_entry = (ckpt_name, model, clip, vae, [id])
    if owned_count() < cache:
        # There is cache space left for this id
        ckpt_entries.append(new_entry)
        return model, clip, vae

    clear_cache(id, cache, "ckpt")
    if cache_overwrite:
        # Release the first entry held by this id to make room for the new one
        for item in ckpt_entries:
            if id in item[-1]:
                item[-1].remove(id)
                if not item[-1]:
                    ckpt_entries.remove(item)
                break
        ckpt_entries.append(new_entry)

    return model, clip, vae
|
|
|
|
def get_bvae_by_ckpt_name(ckpt_name):
    """Return the baked VAE of the first cached checkpoint named `ckpt_name`.

    Returns None when loaded_objects["ckpt"] holds no entry with that name.
    """
    matches = (entry[3] for entry in loaded_objects["ckpt"] if entry[0] == ckpt_name)
    return next(matches, None)
|
|
|
|
def load_vae(vae_name, id, cache=None, cache_overwrite=False):
    """Return the VAE named `vae_name`, reusing the "vae" cache when possible.

    Cache hit (an entry named `vae_name` exists in loaded_objects["vae"]):
    - if the id is not yet on the entry and `cache` is set and the id already
      references at least `cache` entries, the VAE is returned as is;
    - otherwise the id is added to the entry if missing, and clear_cache is
      called for the id when `cache` is set.

    Cache miss: a new VAE object is created from the file. When `cache` is set:
    - if the id references fewer than `cache` entries, a new entry is appended;
    - otherwise clear_cache is called and, only when `cache_overwrite` is True,
      the id is removed from the first entry that holds it (deleting that
      entry if its id list becomes empty) and the new entry is appended.

    Parameters:
    - vae_name: name of the VAE to load.
    - id: node identifier used to tag cache entries.
    - cache (optional): how many "vae" entries a single id may reference.
      With the default None nothing is cached on a miss.
    - cache_overwrite: whether a full cache may give up its first entry for
      this id in favour of the newly loaded VAE.
    """
    global loaded_objects
    vae_entries = loaded_objects["vae"]

    def owned_count():
        # Number of cached VAEs currently tagged with this id
        return sum(1 for item in vae_entries if id in item[-1])

    # Reuse an already cached VAE with the same name
    cached = next((item for item in vae_entries if item[0] == vae_name), None)
    if cached is not None:
        vae, ids = cached[1], cached[2]
        if id not in ids:
            if cache and owned_count() >= cache:
                return vae
            ids.append(id)
        if cache:
            clear_cache(id, cache, "vae")
        return vae

    # Not cached: create the VAE from its file
    vae_path = folder_paths.get_full_path("vae", vae_name)
    vae = comfy.sd.VAE(ckpt_path=vae_path)

    if not cache:
        return vae

    new_entry = (vae_name, vae, [id])
    if owned_count() < cache:
        # There is cache space left for this id
        vae_entries.append(new_entry)
        return vae

    clear_cache(id, cache, "vae")
    if cache_overwrite:
        # Release the first entry held by this id to make room for the new one
        for item in vae_entries:
            if id in item[-1]:
                item[-1].remove(id)
                if not item[-1]:
                    vae_entries.remove(item)
                break
        vae_entries.append(new_entry)

    return vae
|
|
|
|
def load_lora(lora_name, ckpt_name, strength_model, strength_clip, id, cache=None, ckpt_cache=None, cache_overwrite=False):
    """Return (lora_model, lora_clip) for a LoRA applied to a checkpoint, reusing the "lora" cache.

    An entry only counts as a cache hit when the LoRA name, the checkpoint
    name and both strengths all match.

    Cache hit:
    - if `cache` is set and `id` already references at least `cache` entries,
      clear_cache is called for the id and the id is not added to this entry;
    - otherwise the id is added to the entry's id list if it is missing.

    Cache miss: the base checkpoint is obtained through load_checkpoint and the
    LoRA is applied to its model and clip. When `cache` is set:
    - if the id references fewer than `cache` entries, a new entry is appended;
    - otherwise clear_cache is called and, only when `cache_overwrite` is True,
      the id is removed from the first entry that holds it (deleting that
      entry if its id list becomes empty) and the new entry is appended.

    Parameters:
    - lora_name: name of the Lora model to load.
    - ckpt_name: name of the checkpoint the Lora is applied to.
    - strength_model: strength applied to the model.
    - strength_clip: strength applied to the clip.
    - id: node identifier used to tag cache entries.
    - cache (optional): how many "lora" entries a single id may reference.
      With the default None nothing is cached on a miss.
    - ckpt_cache (optional): cache limit forwarded to load_checkpoint.
    - cache_overwrite: forwarded to load_checkpoint, and also controls whether
      a full "lora" cache may give up its first entry for this id.
    """
    global loaded_objects
    lora_entries = loaded_objects["lora"]

    def owned_count():
        # Number of cached Loras currently tagged with this id
        return sum(1 for item in lora_entries if id in item[-1])

    def is_match(item):
        # Same Lora, same base checkpoint and identical strengths
        return (item[0] == lora_name and item[1] == ckpt_name
                and item[4] == strength_model and item[5] == strength_clip)

    # Reuse an already cached Lora built with identical settings
    cached = next((item for item in lora_entries if is_match(item)), None)
    if cached is not None:
        _, _, lora_model, lora_clip, _, _, ids = cached
        if cache and owned_count() >= cache:
            clear_cache(id, cache, "lora")
        elif id not in ids:
            ids.append(id)
        return lora_model, lora_clip

    # Not cached: fetch the base checkpoint and apply the Lora to it
    ckpt, clip, _ = load_checkpoint(ckpt_name, id, cache=ckpt_cache, cache_overwrite=cache_overwrite)
    lora_path = folder_paths.get_full_path("loras", lora_name)
    lora_model, lora_clip = comfy.sd.load_lora_for_models(ckpt, clip, lora_path, strength_model, strength_clip)

    if not cache:
        return lora_model, lora_clip

    new_entry = (lora_name, ckpt_name, lora_model, lora_clip, strength_model, strength_clip, [id])
    if owned_count() < cache:
        # There is cache space left for this id
        lora_entries.append(new_entry)
        return lora_model, lora_clip

    clear_cache(id, cache, "lora")
    if cache_overwrite:
        # Release the first entry held by this id to make room for the new one
        for item in lora_entries:
            if id in item[-1]:
                item[-1].remove(id)
                if not item[-1]:
                    lora_entries.remove(item)
                break
        lora_entries.append(new_entry)

    return lora_model, lora_clip
|
|
|
|
def clear_cache(id, cache, dict_name):
    """Trim the entries an id references in loaded_objects[dict_name] down to `cache`.

    While the id is attached to more than `cache` entries, it is removed from
    the first such entry in list order; an entry whose id list becomes empty
    is deleted from the list.

    Parameters:
    - id: node identifier whose references are trimmed.
    - cache: maximum number of entries the id may stay attached to.
    - dict_name: key of loaded_objects to operate on (the callers in this file
      pass "ckpt", "vae" or "lora").
    """
    bucket = loaded_objects[dict_name]

    def owned():
        # Entries currently tagged with this id, in list order
        return [item for item in bucket if id in item[-1]]

    tagged = owned()
    while len(tagged) > cache:
        # Take an older entry (but not necessarily the oldest) holding the id
        victim = tagged[0]
        victim[-1].remove(id)
        if not victim[-1]:
            # No id references the entry any more: drop it
            bucket.remove(victim)
        tagged = owned()
|
|
|
|
def clear_cache_by_exception(node_id, vae_dict=None, ckpt_dict=None, lora_dict=None):
    """Detach `node_id` from cached entries, except for those listed as exceptions.

    Each optional argument controls one list of the global 'loaded_objects':
    - None: that list is left untouched.
    - empty list: 'node_id' is removed from every entry of that list.
    - non-empty list: entries matching an exception are skipped.

    node_id   : The ID to delete.
    vae_dict  : Names of 'vae' entries to keep.
    ckpt_dict : Names of 'ckpt' entries to keep.
    lora_dict : 'lora' entries to keep, each given as a tuple of
                ('lora_name', 'ckpt_name', 'strength_model', 'strength_clip').

    An entry whose id list becomes empty after the removal is deleted.
    """
    global loaded_objects  # reference the global variable 'loaded_objects'

    # (loaded_objects key, exceptions) pairs, processed in this order
    requested = (("vae", vae_dict), ("ckpt", ckpt_dict), ("lora", lora_dict))

    for dict_name, exceptions in requested:
        if exceptions is None:
            continue

        bucket = loaded_objects[dict_name]

        # Walk a copy so entries can be removed from the real list meanwhile
        for item in list(bucket):
            # Lora exceptions are matched on (name, ckpt, strength_model, strength_clip)
            if dict_name == "lora" and (item[0], item[1], item[4], item[5]) in exceptions:
                continue
            # Otherwise an exception is matched on the entry name alone
            if item[0] in exceptions:
                continue
            if node_id not in item[-1]:
                continue

            item[-1].remove(node_id)
            if not item[-1]:
                # No id references the entry any more: drop it
                bucket.remove(item)
|
|
|
|
# Retrieve the cache number from 'node_settings' json file
|
|
def get_cache_numbers(node_name):
    """Read the model cache sizes configured for a node in node_settings.json.

    The file is looked up in the directory of this script. The values are
    taken from settings[node_name]['model_cache'] and converted with int();
    any missing level or key falls back to 1.

    Returns:
    - (vae_cache, ckpt_cache, lora_cache)
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    settings_file = os.path.join(script_dir, 'node_settings.json')

    with open(settings_file, 'r') as handle:
        node_settings = json.load(handle)

    model_cache_settings = node_settings.get(node_name, {}).get('model_cache', {})
    vae_cache, ckpt_cache, lora_cache = (
        int(model_cache_settings.get(kind, 1)) for kind in ('vae', 'ckpt', 'lora')
    )
    return vae_cache, ckpt_cache, lora_cache
|
|
|
|
########################################################################################################################
|
|
# TSC Efficient Loader
|
|
class TSC_EfficientLoader:
    """Loader node that bundles checkpoint, VAE, LoRA, prompt encoding and empty latent creation.

    INPUT_TYPES, RETURN_TYPES, RETURN_NAMES, FUNCTION and CATEGORY describe the
    node; FUNCTION names the method that does the work.
    """

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input specification: required widgets plus the hidden prompt and node id."""
        return {"required": { "ckpt_name": (folder_paths.get_filename_list("checkpoints"),),
                              "vae_name": (["Baked VAE"] + folder_paths.get_filename_list("vae"),),
                              "clip_skip": ("INT", {"default": -1, "min": -24, "max": -1, "step": 1}),
                              "lora_name": (["None"] + folder_paths.get_filename_list("loras"),),
                              "lora_model_strength": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}),
                              "lora_clip_strength": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}),
                              "positive": ("STRING", {"default": "Positive","multiline": True}),
                              "negative": ("STRING", {"default": "Negative", "multiline": True}),
                              "empty_latent_width": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 64}),
                              "empty_latent_height": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 64}),
                              "batch_size": ("INT", {"default": 1, "min": 1, "max": 64})},
                "hidden": { "prompt": "PROMPT",
                            "my_unique_id": "UNIQUE_ID",},
                }

    RETURN_TYPES = ("MODEL", "CONDITIONING", "CONDITIONING", "LATENT", "VAE", "DEPENDENCIES",)
    RETURN_NAMES = ("MODEL", "CONDITIONING+", "CONDITIONING-", "LATENT", "VAE", "DEPENDENCIES", )
    FUNCTION = "efficientloader"
    CATEGORY = "Efficiency Nodes/Loaders"

    def efficientloader(self, ckpt_name, vae_name, clip_skip, lora_name, lora_model_strength, lora_clip_strength,
                        positive, negative, empty_latent_width, empty_latent_height, batch_size,
                        prompt=None, my_unique_id=None):
        """Load the models, encode both prompts and create an empty latent.

        Parameters:
        - ckpt_name: checkpoint to load.
        - vae_name: "Baked VAE" to use the checkpoint's own VAE, otherwise the
          name of a VAE to load.
        - clip_skip: value passed to clip.clip_layer().
        - lora_name: "None" for no LoRA, otherwise the LoRA to apply.
        - lora_model_strength, lora_clip_strength: strengths passed to load_lora.
        - positive, negative: prompt texts encoded with the CLIP model.
        - empty_latent_width, empty_latent_height, batch_size: size of the
          empty latent, which is created at one eighth of the given
          width/height with 4 channels.
        - prompt, my_unique_id: hidden inputs; the prompt is used to clean the
          global caches and the id tags this node's cache entries.

        Returns:
        - (model, positive conditioning, negative conditioning,
          {"samples": latent}, vae, dependencies), where dependencies is the
          tuple (vae_name, ckpt_name, clip, clip_skip, positive, negative,
          lora_name, lora_model_strength, lora_clip_strength).

        Raises:
        - Exception: if no CLIP model was obtained.
        """
        model: ModelPatcher | None = None
        clip: CLIP | None = None
        vae: VAE | None = None

        # Create Empty Latent
        latent = torch.zeros([batch_size, 4, empty_latent_height // 8, empty_latent_width // 8]).cpu()

        # Clean globally stored objects
        globals_cleanup(prompt)

        # Retrieve cache numbers
        vae_cache, ckpt_cache, lora_cache = get_cache_numbers("Efficient Loader")

        if lora_name != "None":
            model, clip = load_lora(lora_name, ckpt_name, lora_model_strength, lora_clip_strength, my_unique_id,
                                    cache=lora_cache, ckpt_cache=ckpt_cache, cache_overwrite=True)
            if vae_name == "Baked VAE":
                vae = get_bvae_by_ckpt_name(ckpt_name)
                if vae is None:
                    # A Lora cache hit does not touch the checkpoint cache, so the
                    # base checkpoint may no longer be cached. Load it again to
                    # obtain its baked VAE instead of returning None.
                    _, _, vae = load_checkpoint(ckpt_name, my_unique_id, cache=ckpt_cache, cache_overwrite=True)
        else:
            model, clip, vae = load_checkpoint(ckpt_name, my_unique_id, cache=ckpt_cache, cache_overwrite=True)
            lora_name = None  # Reported as None in the dependencies tuple

        # Check for custom VAE
        if vae_name != "Baked VAE":
            vae = load_vae(vae_name, my_unique_id, cache=vae_cache, cache_overwrite=True)

        # CLIP skip
        if not clip:
            raise Exception("No CLIP found")
        # Work on a clone so the cached CLIP object is not modified by clip_layer()
        clip = clip.clone()
        clip.clip_layer(clip_skip)

        # Data for XY Plot
        dependencies = (vae_name, ckpt_name, clip, clip_skip, positive, negative, lora_name, lora_model_strength, lora_clip_strength)

        return (model, [[clip.encode(positive), {}]], [[clip.encode(negative), {}]], {"samples":latent}, vae, dependencies, )
|
|
|
|
########################################################################################################################
|
|
# TSC KSampler (Efficient)
|
|
# Values held per KSampler node between runs. Each list stores (value, id) pairs:
#   "results":    preview images, stored as a pil image list
#   "latent":     latent outputs, stored as a latent tensor list
#   "images":     image outputs, stored as an image tensor list
#   "vae_decode": used to track whether to vae-decode or not
last_helds: dict[str, list] = {
    held_key: [] for held_key in ("results", "latent", "images", "vae_decode")
}
|
|
|
|
def print_last_helds(id=None):
    """Print what is stored in last_helds, globally or for a single node.

    Parameters:
    - id (optional): node id to restrict the listing to. When None, every
      stored pair is listed together with the id it belongs to.

    For each stored value the printed "Output" is the value itself when it is
    a bool, otherwise its length.
    """
    print("\n" + "-" * 40)  # Print an empty line followed by a separator line
    if id is not None:
        id = str(id)  # Convert ID to string
        print(f"Node-specific Last Helds (node_id:{int(id)})")
    else:
        print(f"Global Last Helds:")
    for key in ["results", "latent", "images", "vae_decode"]:
        if id is None:
            entries_with_id = last_helds[key]
        else:
            # Compare ids as strings: `id` was stringified above, while the
            # stored ids may be ints or strings depending on the caller.
            entries_with_id = [entry for entry in last_helds[key] if str(entry[-1]) == id]
        if not entries_with_id:  # Nothing stored under this key for the chosen id
            continue
        print(f"{key.capitalize()}:")
        for i, entry in enumerate(entries_with_id, 1):  # Start numbering from 1
            # Booleans are shown as is; anything else is summarised by its length
            output = entry[0] if isinstance(entry[0], bool) else len(entry[0])
            if id is None:
                print(f" [{i}] Output: {output} (id: {entry[-1]})")
            else:
                print(f" [{i}] Output: {output}")
    print("-" * 40)  # Print a separator line
    print("\n")  # Print an empty line
|
|
|
|
class TSC_KSampler:
|
|
|
|
empty_image = pil2tensor(Image.new('RGBA', (1, 1), (0, 0, 0, 0)))
|
|
|
|
def __init__(self):
|
|
self.output_dir = os.path.join(comfy_dir, 'temp')
|
|
self.type = "temp"
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(cls):
|
|
return {"required":
|
|
{"sampler_state": (["Sample", "Hold", "Script"], ),
|
|
"model": ("MODEL",),
|
|
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
|
|
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
|
|
"cfg": ("FLOAT", {"default": 7.0, "min": 0.0, "max": 100.0}),
|
|
"sampler_name": (comfy.samplers.KSampler.SAMPLERS,),
|
|
"scheduler": (comfy.samplers.KSampler.SCHEDULERS,),
|
|
"positive": ("CONDITIONING",),
|
|
"negative": ("CONDITIONING",),
|
|
"latent_image": ("LATENT",),
|
|
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
|
|
"preview_image": (["Disabled", "Enabled"],),
|
|
},
|
|
"optional": { "optional_vae": ("VAE",),
|
|
"script": ("SCRIPT",),},
|
|
"hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID",},
|
|
}
|
|
|
|
RETURN_TYPES = ("MODEL", "CONDITIONING", "CONDITIONING", "LATENT", "VAE", "IMAGE", )
|
|
RETURN_NAMES = ("MODEL", "CONDITIONING+", "CONDITIONING-", "LATENT", "VAE", "IMAGE", )
|
|
OUTPUT_NODE = True
|
|
FUNCTION = "sample"
|
|
CATEGORY = "Efficiency Nodes/Sampling"
|
|
|
|
def sample(self, sampler_state, model, seed, steps, cfg, sampler_name, scheduler, positive, negative,
|
|
latent_image, preview_image, denoise=1.0, prompt=None, extra_pnginfo=None, my_unique_id=None,
|
|
optional_vae=(None,), script=None):
|
|
|
|
# Extract node_settings from json
|
|
def get_settings():
|
|
# Get the directory path of the current file
|
|
my_dir = os.path.dirname(os.path.abspath(__file__))
|
|
# Construct the file path for node_settings.json
|
|
settings_file = os.path.join(my_dir, 'node_settings.json')
|
|
# Load the settings from the JSON file
|
|
with open(settings_file, 'r') as file:
|
|
node_settings = json.load(file)
|
|
# Retrieve the settings
|
|
xyplot_as_output_image = node_settings.get("KSampler (Efficient)", {}).get('xyplot_as_output_image', False)
|
|
kse_vae_tiled = node_settings.get("KSampler (Efficient)", {}).get('vae_tiled', False)
|
|
xy_vae_tiled = node_settings.get("XY Plot", {}).get('vae_tiled', False)
|
|
return xyplot_as_output_image, kse_vae_tiled, xy_vae_tiled
|
|
|
|
xyplot_as_output_image, kse_vae_tiled, xy_vae_tiled = get_settings()
|
|
|
|
# Functions for previewing images in Ksampler
|
|
def map_filename(filename):
|
|
prefix_len = len(os.path.basename(filename_prefix))
|
|
prefix = filename[:prefix_len + 1]
|
|
try:
|
|
digits = int(filename[prefix_len + 1:].split('_')[0])
|
|
except:
|
|
digits = 0
|
|
return (digits, prefix)
|
|
|
|
def compute_vars(input):
|
|
input = input.replace("%width%", str(images[0].shape[1]))
|
|
input = input.replace("%height%", str(images[0].shape[0]))
|
|
return input
|
|
|
|
def preview_images(images, filename_prefix):
|
|
filename_prefix = compute_vars(filename_prefix)
|
|
|
|
subfolder = os.path.dirname(os.path.normpath(filename_prefix))
|
|
filename = os.path.basename(os.path.normpath(filename_prefix))
|
|
|
|
full_output_folder = os.path.join(self.output_dir, subfolder)
|
|
|
|
try:
|
|
counter = max(filter(lambda a: a[1][:-1] == filename and a[1][-1] == "_",
|
|
map(map_filename, os.listdir(full_output_folder))))[0] + 1
|
|
except ValueError:
|
|
counter = 1
|
|
except FileNotFoundError:
|
|
os.makedirs(full_output_folder, exist_ok=True)
|
|
counter = 1
|
|
|
|
if not os.path.exists(self.output_dir):
|
|
os.makedirs(self.output_dir)
|
|
|
|
results = list()
|
|
for image in images:
|
|
i = 255. * image.cpu().numpy()
|
|
img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
|
|
metadata = PngInfo()
|
|
if prompt is not None:
|
|
metadata.add_text("prompt", json.dumps(prompt))
|
|
if extra_pnginfo is not None:
|
|
for x in extra_pnginfo:
|
|
metadata.add_text(x, json.dumps(extra_pnginfo[x]))
|
|
file = f"{filename}_{counter:05}_.png"
|
|
img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=4)
|
|
results.append({
|
|
"filename": file,
|
|
"subfolder": subfolder,
|
|
"type": self.type
|
|
});
|
|
counter += 1
|
|
return results
|
|
|
|
def get_value_by_id(key: str, my_unique_id):
|
|
global last_helds
|
|
for value, id_ in last_helds[key]:
|
|
if id_ == my_unique_id:
|
|
return value
|
|
return None
|
|
|
|
def update_value_by_id(key: str, my_unique_id, new_value):
|
|
global last_helds
|
|
|
|
for i, (value, id_) in enumerate(last_helds[key]):
|
|
if id_ == my_unique_id:
|
|
last_helds[key][i] = (new_value, id_)
|
|
return True
|
|
|
|
last_helds[key].append((new_value, my_unique_id))
|
|
return True
|
|
|
|
# Clean globally stored objects of non-existant nodes
|
|
globals_cleanup(prompt)
|
|
|
|
# Convert ID string to an integer
|
|
my_unique_id = int(my_unique_id)
|
|
|
|
# Vae input check
|
|
vae = optional_vae
|
|
if vae == (None,):
|
|
print('\033[32mKSampler(Efficient)[{}] Warning:\033[0m No vae input detected, preview and output image disabled.\n'.format(my_unique_id))
|
|
preview_image = "Disabled"
|
|
|
|
# Init last_results
|
|
if get_value_by_id("results", my_unique_id) is None:
|
|
last_results = list()
|
|
else:
|
|
last_results = get_value_by_id("results", my_unique_id)
|
|
|
|
# Init last_latent
|
|
if get_value_by_id("latent", my_unique_id) is None:
|
|
last_latent = latent_image
|
|
else:
|
|
last_latent = {"samples": None}
|
|
last_latent["samples"] = get_value_by_id("latent", my_unique_id)
|
|
|
|
# Init last_images
|
|
if get_value_by_id("images", my_unique_id) == None:
|
|
last_images = TSC_KSampler.empty_image
|
|
else:
|
|
last_images = get_value_by_id("images", my_unique_id)
|
|
|
|
# Initialize latent
|
|
latent: Tensor|None = None
|
|
|
|
# Define filename_prefix
|
|
filename_prefix = "KSeff_{:02d}".format(my_unique_id)
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# Check the current sampler state
|
|
if sampler_state == "Sample":
|
|
|
|
# Sample using the common KSampler function and store the samples
|
|
samples = common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative,
|
|
latent_image, denoise=denoise)
|
|
|
|
# Extract the latent samples from the returned samples dictionary
|
|
latent = samples[0]["samples"]
|
|
|
|
# Store the latent samples in the 'last_helds' dictionary with a unique ID
|
|
update_value_by_id("latent", my_unique_id, latent)
|
|
|
|
# If not in preview mode, return the results in the specified format
|
|
if preview_image == "Disabled":
|
|
# Enable vae decode on next Hold
|
|
update_value_by_id("vae_decode", my_unique_id, True)
|
|
return {"ui": {"images": list()},
|
|
"result": (model, positive, negative, {"samples": latent}, vae, TSC_KSampler.empty_image,)}
|
|
else:
|
|
# Decode images and store
|
|
if kse_vae_tiled == False:
|
|
images = vae.decode(latent).cpu()
|
|
else:
|
|
images = vae.decode_tiled(latent).cpu()
|
|
update_value_by_id("images", my_unique_id, images)
|
|
|
|
# Disable vae decode on next Hold
|
|
update_value_by_id("vae_decode", my_unique_id, False)
|
|
|
|
# Generate image results and store
|
|
results = preview_images(images, filename_prefix)
|
|
update_value_by_id("results", my_unique_id, results)
|
|
|
|
# Output image results to ui and node outputs
|
|
return {"ui": {"images": results},
|
|
"result": (model, positive, negative, {"samples": latent}, vae, images,)}
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# If the sampler state is "Hold"
|
|
elif sampler_state == "Hold":
|
|
|
|
# If not in preview mode, return the results in the specified format
|
|
if preview_image == "Disabled":
|
|
return {"ui": {"images": list()},
|
|
"result": (model, positive, negative, last_latent, vae, TSC_KSampler.empty_image,)}
|
|
|
|
else:
|
|
latent = last_latent["samples"]
|
|
|
|
if get_value_by_id("vae_decode", my_unique_id) == True:
|
|
|
|
# Decode images and store
|
|
if kse_vae_tiled == False:
|
|
images = vae.decode(latent).cpu()
|
|
else:
|
|
images = vae.decode_tiled(latent).cpu()
|
|
update_value_by_id("images", my_unique_id, images)
|
|
|
|
# Disable vae decode on next Hold
|
|
update_value_by_id("vae_decode", my_unique_id, False)
|
|
|
|
# Generate image results and store
|
|
results = preview_images(images, filename_prefix)
|
|
update_value_by_id("results", my_unique_id, results)
|
|
|
|
else:
|
|
images = last_images
|
|
results = last_results
|
|
|
|
# Output image results to ui and node outputs
|
|
return {"ui": {"images": results},
|
|
"result": (model, positive, negative, {"samples": latent}, vae, images,)}
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
elif sampler_state == "Script":
|
|
|
|
# Store name of connected node to script input
|
|
script_node_name, script_node_id = extract_node_info(prompt, my_unique_id, 'script')
|
|
|
|
# If no valid script input connected, error out
|
|
if script == None or script == (None,) or script_node_name!="XY Plot":
|
|
if script_node_name!="XY Plot":
|
|
print('\033[31mKSampler(Efficient) Error:\033[0m No valid script input detected')
|
|
return {"ui": {"images": list()},
|
|
"result": (model, positive, negative, last_latent, vae, last_images,)}
|
|
|
|
# If no vae connected, throw errors
|
|
if vae == (None,):
|
|
print('\033[31mKSampler(Efficient) Error:\033[0m VAE must be connected to use Script mode.')
|
|
return {"ui": {"images": list()},
|
|
"result": (model, positive, negative, last_latent, vae, last_images,)}
|
|
|
|
# Extract the 'samples' tensor and split it into individual image tensors
|
|
image_tensors = torch.split(latent_image['samples'], 1, dim=0)
|
|
|
|
# Get the shape of the first image tensor
|
|
shape = image_tensors[0].shape
|
|
|
|
# Extract the original height and width
|
|
latent_height, latent_width = shape[2] * 8, shape[3] * 8
|
|
|
|
# Set latent only to the first latent of batch
|
|
latent_image = {'samples': image_tensors[0]}
|
|
|
|
#___________________________________________________________________________________________________________
|
|
# Initialize, unpack, and clean variables for the XY Plot script
|
|
if script_node_name == "XY Plot":
|
|
|
|
# Initialize variables
|
|
vae_name = None
|
|
ckpt_name = None
|
|
clip = None
|
|
lora_name = None
|
|
lora_model_wt = None
|
|
lora_clip_wt = None
|
|
positive_prompt = None
|
|
negative_prompt = None
|
|
clip_skip = None
|
|
|
|
# Unpack script Tuple (X_type, X_value, Y_type, Y_value, grid_spacing, Y_label_orientation, cache_models, dependencies)
|
|
X_type, X_value, Y_type, Y_value, grid_spacing, Y_label_orientation, cache_models, dependencies = script
|
|
|
|
# Unpack Efficient Loader dependencies
|
|
if dependencies is not None:
|
|
vae_name, ckpt_name, clip, clip_skip, positive_prompt, negative_prompt,\
|
|
lora_name, lora_model_wt, lora_clip_wt = dependencies
|
|
|
|
# Helper function to process printout values
|
|
def process_xy_for_print(value, replacement):
    """Return `value` with every ``None`` swapped for `replacement` (printout helper).

    A tuple is processed element by element and a new tuple is returned; any
    other value is returned unchanged unless it is ``None`` itself.
    """
    def fill(item):
        if item is None:
            return replacement
        return item

    if not isinstance(value, tuple):
        return fill(value)
    return tuple(map(fill, value))
|
|
|
|
# Determine the replacements based on X_type and Y_type
|
|
replacement_X = scheduler if X_type == 'Sampler' else clip_skip if X_type == 'Checkpoint' else None
|
|
replacement_Y = scheduler if Y_type == 'Sampler' else clip_skip if Y_type == 'Checkpoint' else None
|
|
|
|
# Process X_value and Y_value
|
|
X_value_processed = [process_xy_for_print(v, replacement_X) for v in X_value]
|
|
Y_value_processed = [process_xy_for_print(v, replacement_Y) for v in Y_value]
|
|
|
|
# Print XY Plot Inputs
|
|
print("-" * 40)
|
|
print("XY Plot Script Inputs:")
|
|
print(f"(X) {X_type}: {X_value_processed}")
|
|
print(f"(Y) {Y_type}: {Y_value_processed}")
|
|
print("-" * 40)
|
|
|
|
# If not caching models, set to 1.
|
|
if cache_models == "False":
|
|
vae_cache = ckpt_cache = lora_cache = 1
|
|
else:
|
|
# Retrieve cache numbers
|
|
vae_cache, ckpt_cache, lora_cache = get_cache_numbers("XY Plot")
|
|
# Pack cache numbers in a tuple
|
|
cache = (vae_cache, ckpt_cache, lora_cache)
|
|
|
|
# Embed original prompts into prompt variables
|
|
positive_prompt = (positive_prompt, positive_prompt)
|
|
negative_prompt = (negative_prompt, negative_prompt)
|
|
|
|
# Define X/Y_values for "Seeds++ Batch"
|
|
if X_type == "Seeds++ Batch":
|
|
X_value = [i for i in range(X_value[0])]
|
|
if Y_type == "Seeds++ Batch":
|
|
Y_value = [i for i in range(Y_value[0])]
|
|
|
|
# Embed information into "Scheduler" X/Y_values for text label
|
|
if X_type == "Scheduler" and Y_type != "Sampler":
|
|
# X_value second list value of each array entry = None
|
|
for i in range(len(X_value)):
|
|
if len(X_value[i]) == 2:
|
|
X_value[i][1] = None
|
|
else:
|
|
X_value[i] = [X_value[i], None]
|
|
if Y_type == "Scheduler" and X_type != "Sampler":
|
|
# Y_value second list value of each array entry = None
|
|
for i in range(len(Y_value)):
|
|
if len(Y_value[i]) == 2:
|
|
Y_value[i][1] = None
|
|
else:
|
|
Y_value[i] = [Y_value[i], None]
|
|
|
|
# Optimize image generation by prioritizing Checkpoint>LoRA>VAE>PromptSR as X in For Loop. Flip back when done.
|
|
if Y_type == "Checkpoint" or \
|
|
Y_type == "LoRA" and X_type not in {"Checkpoint"} or \
|
|
Y_type == "VAE" and X_type not in {"Checkpoint", "LoRA"} or \
|
|
Y_type == "Positive Prompt S/R" and X_type not in {"Checkpoint", "LoRA", "VAE", "Negative Prompt S/R"} or \
|
|
Y_type == "Negative Prompt S/R" and X_type not in {"Checkpoint", "LoRA", "VAE", "Positive Prompt S/R"} or \
|
|
X_type == "Nothing" and Y_type != "Nothing":
|
|
flip_xy = True
|
|
X_type, Y_type = Y_type, X_type
|
|
X_value, Y_value = Y_value, X_value
|
|
else:
|
|
flip_xy = False
|
|
|
|
#_______________________________________________________________________________________________________
|
|
#The below code will clean from the cache any ckpt/vae/lora models it will not be reusing.
|
|
|
|
# Map the type names to the dictionaries
|
|
dict_map = {"VAE": [], "Checkpoint": [], "LoRA": []}
|
|
|
|
# Create a list of tuples with types and values
|
|
type_value_pairs = [(X_type, X_value), (Y_type, Y_value)]
|
|
|
|
# Iterate over type-value pairs
|
|
for t, v in type_value_pairs:
|
|
if t in dict_map:
|
|
dict_map[t] = v
|
|
|
|
ckpt_dict = [t[0] for t in dict_map.get("Checkpoint", [])] if dict_map.get("Checkpoint", []) else []
|
|
|
|
lora_dict = [t for t in dict_map.get("LoRA", [])] if dict_map.get("LoRA", []) else []
|
|
|
|
# If both ckpt_dict and lora_dict are not empty, manipulate lora_dict as described
|
|
if ckpt_dict and lora_dict:
|
|
lora_dict = [(lora_name, ckpt, lora_model_wt, lora_clip_wt) for ckpt in ckpt_dict for
|
|
lora_name, lora_model_wt, lora_clip_wt in lora_dict]
|
|
# If lora_dict is not empty and ckpt_dict is empty, insert ckpt_name into each tuple in lora_dict
|
|
elif lora_dict:
|
|
lora_dict = [(lora_name, ckpt_name, lora_model_wt, lora_clip_wt) for
|
|
lora_name, lora_model_wt, lora_clip_wt in
|
|
lora_dict]
|
|
|
|
vae_dict = dict_map.get("VAE", [])
|
|
|
|
# prioritize Caching Checkpoints over LoRAs but not both.
|
|
if X_type == "LoRA":
|
|
ckpt_dict = []
|
|
if X_type == "Checkpoint":
|
|
lora_dict = []
|
|
|
|
# Print dict_arrays for debugging
|
|
###print(f"vae_dict={vae_dict}\nckpt_dict={ckpt_dict}\nlora_dict={lora_dict}")
|
|
|
|
# Clean values that won't be reused
|
|
clear_cache_by_exception(script_node_id, vae_dict=vae_dict, ckpt_dict=ckpt_dict, lora_dict=lora_dict)
|
|
|
|
#_______________________________________________________________________________________________________
|
|
# Function that changes appropriate variables for next processed generations (also generates XY_labels)
|
|
def define_variable(var_type, var, seed, steps, cfg, sampler_name, scheduler, denoise, vae_name, ckpt_name,
                    clip_skip, positive_prompt, negative_prompt, lora_name, lora_model_wt, lora_clip_wt,
                    var_label, num_label):
    """Apply one XY Plot axis value to the generation settings and record its label.

    `var_type` selects which setting `var` overrides. `scheduler`, `clip_skip`,
    `positive_prompt` and `negative_prompt` are 2-tuples whose element [1] is
    read as the fallback value and whose element [0] is rewritten here.

    The label text is appended to `var_label` (in place) while it holds fewer
    than `num_label` entries. Once it holds exactly `num_label` entries and the
    axis is "VAE", "Checkpoint" or "LoRA", a shortened copy of the list is
    returned instead.

    Returns the tuple
        (steps, cfg, sampler_name, scheduler, denoise, vae_name, ckpt_name, clip_skip,
         positive_prompt, negative_prompt, lora_name, lora_model_wt, lora_clip_wt, var_label)
    """

    def shorten_labels(labels):
        # Cut-off is the shortest label length clamped into the range 16..24.
        limit = max(min(min(len(entry) for entry in labels), 24), 16)
        return [entry[:limit] + "..." if len(entry) > limit else entry for entry in labels]

    if var_type == "Seeds++ Batch":
        # The seed itself is advanced by the caller; only the label is produced here.
        label = f"Seed: {seed}"

    elif var_type == "Steps":
        steps = var
        label = f"steps: {steps}"

    elif var_type == "CFG Scale":
        cfg = var
        label = f"CFG: {round(cfg,2)}"

    elif var_type == "Sampler":
        sampler_name = var[0]
        paired_scheduler = var[1]
        if paired_scheduler == "":
            # An empty string leaves the scheduler untouched and out of the label.
            label = f"{sampler_name}"
        else:
            active = scheduler[1] if paired_scheduler is None else paired_scheduler
            scheduler = (active, scheduler[1])
            label = f"{sampler_name} ({scheduler[0]})"
        label = label.replace("ancestral", "a").replace("uniform", "u")

    elif var_type == "Scheduler":
        scheduler = (var[0], scheduler[1])
        label = f"{sampler_name} ({scheduler[0]})" if len(var) == 2 else f"{var}"
        label = label.replace("ancestral", "a").replace("uniform", "u")

    elif var_type == "Denoise":
        denoise = var
        label = f"denoise: {round(denoise, 2)}"

    elif var_type == "VAE":
        vae_name = var
        label = f"VAE: {os.path.basename(vae_name)}"

    elif var_type == "Positive Prompt S/R":
        search_txt, replace_txt = var
        source_prompt = positive_prompt[1]
        if replace_txt is None:
            # No replacement: keep the original prompt and label with the search text.
            positive_prompt = (source_prompt, source_prompt)
            replace_txt = search_txt
        else:
            positive_prompt = (source_prompt.replace(search_txt, replace_txt, 1), source_prompt)
        label = f"{replace_txt}"

    elif var_type == "Negative Prompt S/R":
        search_txt, replace_txt = var
        source_prompt = negative_prompt[1]
        if not replace_txt:
            # Any falsy replacement (None or "") keeps the original prompt.
            negative_prompt = (source_prompt, source_prompt)
            replace_txt = search_txt
        else:
            negative_prompt = (source_prompt.replace(search_txt, replace_txt, 1), source_prompt)
        label = f"(-) {replace_txt}"

    elif var_type == "Checkpoint":
        ckpt_name = var[0]
        # A missing per-checkpoint clip skip falls back to the stored original.
        clip_skip = (clip_skip[1] if var[1] is None else var[1], clip_skip[1])
        label = f"{os.path.basename(ckpt_name)}"

    elif var_type == "Clip Skip":
        clip_skip = (var, clip_skip[1])
        label = f"Clip Skip ({clip_skip[0]})"

    elif var_type == "LoRA":
        lora_name = var[0]
        lora_model_wt = var[1]
        lora_clip_wt = var[2]
        lora_filename = os.path.basename(lora_name)
        if lora_model_wt == lora_clip_wt:
            label = f"<{round(lora_model_wt, 2)}> {lora_filename}"
        else:
            label = f"<{round(lora_model_wt, 2)},{round(lora_clip_wt, 2)}> {lora_filename}"

    else:
        # Any other axis type gets an empty label.
        label = ""

    # Record the label only while the label list is not yet full.
    if len(var_label) < num_label:
        var_label.append(label)

    # File-name based labels are shortened once the list is complete.
    if len(var_label) == num_label and var_type in ("VAE", "Checkpoint", "LoRA"):
        var_label = shorten_labels(var_label)

    return steps, cfg, sampler_name, scheduler, denoise, vae_name, ckpt_name, clip_skip, \
        positive_prompt, negative_prompt, lora_name, lora_model_wt, lora_clip_wt, var_label
|
|
|
|
# _______________________________________________________________________________________________________
|
|
# The function below is used to smartly load Checkpoint/LoRA/VAE models between generations.
|
|
def define_model(model, clip, positive, negative, positive_prompt, negative_prompt, clip_skip, vae,
                 vae_name, ckpt_name, lora_name, lora_model_wt, lora_clip_wt, index, types, script_node_id, cache):
    """Load whichever VAE / Checkpoint / LoRA the current plot cell needs and re-encode the prompts.

    `types` is the (X_type, Y_type) pair and `cache` the (vae, ckpt, lora) cache
    sizes. X-axis driven reloads happen only when `index` is 0, while Y-axis
    driven reloads happen on every call. The prompts are re-encoded whenever a
    model was (re)loaded or a prompt / clip-skip axis requires it.

    Returns (model, positive, negative, vae).
    """
    X_type, Y_type = types

    # Note: index is held at 0 by the caller when Y_type == "Nothing".
    first_pass = index == 0

    # VAE
    if Y_type == "VAE" or (X_type == "VAE" and first_pass):
        vae = load_vae(vae_name, script_node_id, cache=cache[0])

    # Checkpoint / LoRA. When Y_type is "LoRA" the checkpoint is loaded by load_lora.
    models_reloaded = True
    if X_type == "Checkpoint" and first_pass and Y_type != "LoRA":
        if lora_name is not None:
            # A LoRA was supplied alongside the checkpoint: apply it on top.
            model, clip = load_lora(lora_name, ckpt_name, lora_model_wt, lora_clip_wt, script_node_id,
                                    cache=None, ckpt_cache=cache[1])
        else:
            model, clip, _ = load_checkpoint(ckpt_name, script_node_id, False, cache=cache[1])
    elif X_type == "LoRA" and first_pass:
        # LoRA is the cached object here; checkpoints are not cached.
        model, clip = load_lora(lora_name, ckpt_name, lora_model_wt, lora_clip_wt, script_node_id, cache=cache[2])
    elif Y_type == "LoRA":
        # X_type must be Checkpoint, so cache those as defined.
        model, clip = load_lora(lora_name, ckpt_name, lora_model_wt, lora_clip_wt, script_node_id,
                                cache=None, ckpt_cache=cache[1])
    else:
        models_reloaded = False

    # Axis types that change the conditioning without touching the model.
    prompt_types = ("Positive Prompt S/R", "Negative Prompt S/R", "Clip Skip")
    must_encode = models_reloaded or (X_type in prompt_types and first_pass) or Y_type in prompt_types

    if must_encode:
        positive_text, negative_text = positive_prompt[0], negative_prompt[0]
        # Apply clip skip, then build fresh conditioning from the active prompt strings.
        skipped_clip = CLIPSetLastLayer().set_last_layer(clip, clip_skip)[0]
        positive = [[skipped_clip.encode(positive_text), {}]]
        negative = [[skipped_clip.encode(negative_text), {}]]

    return model, positive, negative, vae
|
|
|
|
# ______________________________________________________________________________________________________
|
|
# The below function is used to generate the results based on all the processed variables
|
|
def process_values(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image,
                   denoise, vae, latent_list=[], image_tensor_list=[], image_pil_list=[]):
    """Sample one plot cell, decode it, and append the outputs to the running result lists.

    The sampled latent goes into `latent_list`, the decoded image tensor into
    `image_tensor_list` and its PIL conversion into `image_pil_list`; the three
    lists are returned.

    NOTE: the mutable list defaults are kept on purpose. The call sites in the
    XY loops pass no lists, so these defaults are what accumulates the results
    from one call to the next.
    """
    sampled = common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative,
                              latent_image, denoise=denoise)
    cell_latent = sampled[0]["samples"]
    latent_list.append(cell_latent)

    # `xy_vae_tiled` is not defined in this function; it is resolved from an outer scope.
    decode = vae.decode if xy_vae_tiled == False else vae.decode_tiled
    cell_image = decode(cell_latent).cpu()

    image_tensor_list.append(cell_image)
    image_pil_list.append(tensor2pil(cell_image))

    return latent_list, image_tensor_list, image_pil_list
|
|
|
|
# ______________________________________________________________________________________________________
|
|
# The below section is the heart of the XY Plot image generation
|
|
|
|
# Initiate Plot label text variables X/Y_label
|
|
X_label = []
|
|
Y_label = []
|
|
|
|
# Seed_updated for "Seeds++ Batch" incremental seeds
|
|
seed_updated = seed
|
|
|
|
# Store the KSamplers original scheduler inside the same scheduler variable
|
|
scheduler = (scheduler, scheduler)
|
|
|
|
# Store the Eff Loaders original clip_skip inside the same clip_skip variable
|
|
clip_skip = (clip_skip, clip_skip)
|
|
|
|
# Store types in a Tuple for easy function passing
|
|
types = (X_type, Y_type)
|
|
|
|
# Fill Plot Rows (X)
|
|
for X_index, X in enumerate(X_value):
|
|
|
|
# Seed control based on loop index during Batch
|
|
if X_type == "Seeds++ Batch":
|
|
# Update seed based on the outer (X) loop index
|
|
seed_updated = seed + X_index
|
|
|
|
# Define X parameters and generate labels
|
|
steps, cfg, sampler_name, scheduler, denoise, vae_name, ckpt_name, clip_skip, positive_prompt, negative_prompt, \
|
|
lora_name, lora_model_wt, lora_clip_wt, X_label = \
|
|
define_variable(X_type, X, seed_updated, steps, cfg, sampler_name, scheduler, denoise, vae_name, ckpt_name,
|
|
clip_skip, positive_prompt, negative_prompt, lora_name, lora_model_wt, lora_clip_wt,
|
|
X_label, len(X_value))
|
|
|
|
if X_type != "Nothing" and Y_type == "Nothing":
|
|
|
|
# Models & Conditionings
|
|
model, positive, negative , vae = \
|
|
define_model(model, clip, positive, negative, positive_prompt, negative_prompt, clip_skip[0], vae,
|
|
vae_name, ckpt_name, lora_name, lora_model_wt, lora_clip_wt, 0, types, script_node_id, cache)
|
|
|
|
# Generate Results
|
|
latent_list, image_tensor_list, image_pil_list = \
|
|
process_values(model, seed_updated, steps, cfg, sampler_name, scheduler[0],
|
|
positive, negative, latent_image, denoise, vae)
|
|
|
|
elif X_type != "Nothing" and Y_type != "Nothing":
|
|
# Seed control based on loop index during Batch
|
|
for Y_index, Y in enumerate(Y_value):
|
|
|
|
if Y_type == "Seeds++ Batch":
|
|
# Update seed based on the inner loop index
|
|
seed_updated = seed + Y_index
|
|
|
|
# Define Y parameters and generate labels
|
|
steps, cfg, sampler_name, scheduler, denoise, vae_name, ckpt_name, clip_skip, positive_prompt, negative_prompt, \
|
|
lora_name, lora_model_wt, lora_clip_wt, Y_label = \
|
|
define_variable(Y_type, Y, seed_updated, steps, cfg, sampler_name, scheduler, denoise, vae_name, ckpt_name,
|
|
clip_skip, positive_prompt, negative_prompt, lora_name, lora_model_wt, lora_clip_wt,
|
|
Y_label, len(Y_value))
|
|
|
|
# Models & Conditionings
|
|
model, positive, negative, vae = \
|
|
define_model(model, clip, positive, negative, positive_prompt, negative_prompt, clip_skip[0], vae,
|
|
vae_name, ckpt_name, lora_name, lora_model_wt, lora_clip_wt, Y_index, types, script_node_id, cache)
|
|
|
|
# Generate Results
|
|
latent_list, image_tensor_list, image_pil_list = \
|
|
process_values(model, seed_updated, steps, cfg, sampler_name, scheduler[0],
|
|
positive, negative, latent_image, denoise, vae)
|
|
|
|
# Clean up cache
|
|
if cache_models == "False":
|
|
clear_cache_by_exception(script_node_id, vae_dict=[], ckpt_dict=[], lora_dict=[])
|
|
#
|
|
else:
|
|
# Prioritize Caching Checkpoints over LoRAs.
|
|
if X_type == "LoRA":
|
|
clear_cache_by_exception(script_node_id, ckpt_dict=[])
|
|
elif X_type == "Checkpoint":
|
|
clear_cache_by_exception(script_node_id, lora_dict=[])
|
|
|
|
# ______________________________________________________________________________________________________
|
|
def print_plot_variables(X_type, Y_type, X_value, Y_value, seed, ckpt_name, lora_name, lora_model_wt, lora_clip_wt,
                         vae_name, clip_skip, steps, cfg, sampler_name, scheduler, denoise,
                         num_rows, num_cols, latent_height, latent_width):
    """Print a console summary of the finished XY Plot.

    For every setting that is plotted on an axis, the comma separated list of
    plotted values is shown; otherwise the single value passed in is shown.

    `clip_skip` and `scheduler` are expected as 2-tuples (as built by the
    caller); element [1] of `clip_skip` and element [0] of `scheduler` are the
    values printed when the setting is not on an axis.

    Fixes over the previous version:
      * With a "Clip Skip" axis the joined value list is printed in full; it
        used to be indexed with [1], which printed a single character (or
        raised IndexError for a one-character list).
      * A "Checkpoint" axis is summarised whether it sits on X or on Y, like
        every other setting.

    Returns None; the function only prints.
    """

    def axis_values(axis_type):
        # Values plotted for `axis_type`, or None when it is on neither axis.
        if X_type == axis_type:
            return X_value
        if Y_type == axis_type:
            return Y_value
        return None

    def joined(axis_type, fallback, fmt=str):
        # Comma separated axis values, or `fallback` when the type is not plotted.
        values = axis_values(axis_type)
        if values is None:
            return fallback
        return ", ".join(fmt(v) for v in values)

    print("-" * 40)  # Separator line
    print("\033[32mXY Plot Results:\033[0m")

    # --- Checkpoint / clip skip -------------------------------------------------
    ckpt_values = axis_values("Checkpoint")
    clip_skip_values = axis_values("Clip Skip")
    # True when each checkpoint is printed together with its own clip skip.
    ckpt_shows_clip_skip = False
    if ckpt_values is not None:
        if clip_skip_values is not None:
            # Clip skip has its own axis, so list bare checkpoint names.
            ckpt_name = ", ".join(os.path.basename(str(entry[0])) for entry in ckpt_values)
        else:
            ckpt_name = ", ".join(
                f"{os.path.basename(str(entry[0]))}"
                f"({entry[1] if entry[1] is not None else clip_skip[1]})"
                for entry in ckpt_values)
            ckpt_shows_clip_skip = True

    if clip_skip_values is not None:
        clip_skip_text = ", ".join(map(str, clip_skip_values))
    elif isinstance(clip_skip, (tuple, list)):
        clip_skip_text = clip_skip[1]
    else:
        clip_skip_text = '' if clip_skip is None else clip_skip

    # --- LoRA -------------------------------------------------------------------
    lora_values = axis_values("LoRA")
    if lora_values is not None:
        lora_name = ", ".join(
            f"{os.path.basename(str(entry[0]))}({entry[1]},{entry[2]})" for entry in lora_values)
    elif lora_name:
        lora_name = f"{os.path.basename(lora_name)}({lora_model_wt},{lora_clip_wt})"

    # --- Simple one-value settings ----------------------------------------------
    vae_name = joined("VAE", vae_name, lambda v: os.path.basename(str(v)))
    seed = joined("Seeds++ Batch", str(seed), lambda offset: str(seed + offset))
    steps = joined("Steps", steps)
    cfg = joined("CFG Scale", cfg)
    denoise = joined("Denoise", denoise)

    # --- Sampler / scheduler ----------------------------------------------------
    sampler_values = axis_values("Sampler")
    scheduler_values = axis_values("Scheduler")
    if sampler_values is not None:
        sampler_name = ", ".join(
            f"{entry[0]}({entry[1] if entry[1] is not None else scheduler[1]})"
            for entry in sampler_values)
    if scheduler_values is not None:
        scheduler_text = ", ".join(str(entry[0]) for entry in scheduler_values)
    elif isinstance(scheduler, tuple):
        scheduler_text = scheduler[0]
    else:
        scheduler_text = scheduler
    # The sampler line already carries the schedulers unless Scheduler has its own axis.
    sampler_shows_scheduler = sampler_values is not None and scheduler_values is None

    # --- Printouts --------------------------------------------------------------
    print(f"img_count: {len(X_value)*len(Y_value)}")
    print(f"img_dims: {latent_height} x {latent_width}")
    print(f"plot_dim: {num_cols} x {num_rows}")
    if ckpt_shows_clip_skip:
        print(f"ckpt(clipskip): {ckpt_name if ckpt_name is not None else ''}")
    else:
        print(f"ckpt: {ckpt_name if ckpt_name is not None else ''}")
        print(f"clip_skip: {clip_skip_text}")
    if lora_name:
        print(f"lora(mod,clip): {lora_name}")
    print(f"vae: {vae_name if vae_name is not None else ''}")
    print(f"seed: {seed}")
    print(f"steps: {steps}")
    print(f"cfg: {cfg}")
    if sampler_shows_scheduler:
        print(f"sampler(schr): {sampler_name}")
    else:
        print(f"sampler: {sampler_name}")
        print(f"scheduler: {scheduler_text}")
    print(f"denoise: {denoise}")

    # Prompt search/replace: the first entry shows its search text, later ones their replacement.
    for axis_type, tag in (("Positive Prompt S/R", "+prompt_s/r"), ("Negative Prompt S/R", "-prompt_s/r")):
        pairs = axis_values(axis_type)
        if pairs is not None:
            shown = ", ".join(str(pair[0]) if i == 0 else str(pair[1]) for i, pair in enumerate(pairs))
            print(f"{tag}: {shown}")
|
|
|
|
# ______________________________________________________________________________________________________
|
|
def adjusted_font_size(text, initial_font_size, latent_width):
    """Return a font size at which `text` fits within `latent_width`.

    The text is measured at `initial_font_size` using the font file at
    `font_path`. If it is wider than 90% of `latent_width`, the size is scaled
    down proportionally (with an extra 0.9 factor); otherwise
    `initial_font_size` is returned unchanged. The result is never below 1 so
    it can always be handed back to `ImageFont.truetype`.
    """
    font = ImageFont.truetype(str(Path(font_path)), initial_font_size)

    # FreeTypeFont.getsize() was removed in Pillow 10; getbbox() is its documented
    # replacement. Fall back to getsize() only on Pillow versions that predate getbbox().
    if hasattr(font, "getbbox"):
        left, _, right, _ = font.getbbox(text)
        text_width = right - left
    else:
        text_width, _ = font.getsize(text)

    if text_width > (latent_width * 0.9):
        scaling_factor = 0.9  # A value less than 1 to shrink the font size more aggressively
        new_font_size = int(initial_font_size * (latent_width / text_width) * scaling_factor)
        # Very long labels could otherwise round down to a size of 0.
        new_font_size = max(new_font_size, 1)
    else:
        new_font_size = initial_font_size

    return new_font_size
|
|
|
|
# ______________________________________________________________________________________________________
|
|
|
|
# Disable vae decode on next Hold
|
|
update_value_by_id("vae_decode", my_unique_id, False)
|
|
|
|
def rearrange_list_A(arr, num_cols, num_rows):
    """Return the elements of `arr` read at index ``col * num_rows + row``, row by row.

    The result has ``num_rows * num_cols`` entries: for each row in turn, one
    entry per column.
    """
    return [arr[col * num_rows + row] for row in range(num_rows) for col in range(num_cols)]
|
|
|
|
def rearrange_list_B(arr, num_rows, num_cols):
    """Return the elements of `arr` read at index ``row * num_cols + col``, row by row.

    The result has ``num_rows * num_cols`` entries, i.e. the leading part of
    `arr` in its existing order.
    """
    return [arr[row * num_cols + col] for row in range(num_rows) for col in range(num_cols)]
|
|
|
|
# Extract plot dimensions
|
|
num_rows = max(len(Y_value) if Y_value is not None else 0, 1)
|
|
num_cols = max(len(X_value) if X_value is not None else 0, 1)
|
|
|
|
# Flip X & Y results back if flipped earlier (for Checkpoint/LoRA For loop optimizations)
|
|
if flip_xy == True:
|
|
X_type, Y_type = Y_type, X_type
|
|
X_value, Y_value = Y_value, X_value
|
|
X_label, Y_label = Y_label, X_label
|
|
num_rows, num_cols = num_cols, num_rows
|
|
image_pil_list = rearrange_list_A(image_pil_list, num_rows, num_cols)
|
|
else:
|
|
image_pil_list = rearrange_list_B(image_pil_list, num_rows, num_cols)
|
|
image_tensor_list = rearrange_list_A(image_tensor_list, num_cols, num_rows)
|
|
latent_list = rearrange_list_A(latent_list, num_cols, num_rows)
|
|
|
|
# Print XY Plot Results
|
|
print_plot_variables(X_type, Y_type, X_value, Y_value, seed, ckpt_name, lora_name, lora_model_wt, lora_clip_wt,
|
|
vae_name, clip_skip, steps, cfg, sampler_name, scheduler, denoise,
|
|
num_rows, num_cols, latent_height, latent_width)
|
|
|
|
# Concatenate the tensors along the first dimension (dim=0)
|
|
latent_list = torch.cat(latent_list, dim=0)
|
|
|
|
# Store latent_list as last latent
|
|
update_value_by_id("latent", my_unique_id, latent_list)
|
|
|
|
# Calculate the dimensions of the white background image
|
|
border_size_top = latent_width // 15
|
|
|
|
# Longest Y-label length
|
|
if len(Y_label) > 0:
|
|
Y_label_longest = max(len(s) for s in Y_label)
|
|
else:
|
|
# Handle the case when the sequence is empty
|
|
Y_label_longest = 0 # or any other appropriate value
|
|
|
|
Y_label_scale = min(Y_label_longest + 4,24) / 24
|
|
|
|
if Y_label_orientation == "Vertical":
|
|
border_size_left = border_size_top
|
|
else: # Assuming Y_label_orientation is "Horizontal"
|
|
# border_size_left is now min(latent_width, latent_height) plus 20% of the difference between the two
|
|
border_size_left = min(latent_width, latent_height) + int(0.2 * abs(latent_width - latent_height))
|
|
border_size_left = int(border_size_left * Y_label_scale)
|
|
|
|
# Modify the border size, background width and x_offset initialization based on Y_type and Y_label_orientation
|
|
if Y_type == "Nothing":
|
|
bg_width = num_cols * latent_width + (num_cols - 1) * grid_spacing
|
|
x_offset_initial = 0
|
|
else:
|
|
if Y_label_orientation == "Vertical":
|
|
bg_width = num_cols * latent_width + (num_cols - 1) * grid_spacing + 3 * border_size_left
|
|
x_offset_initial = border_size_left * 3
|
|
else: # Assuming Y_label_orientation is "Horizontal"
|
|
bg_width = num_cols * latent_width + (num_cols - 1) * grid_spacing + border_size_left
|
|
x_offset_initial = border_size_left
|
|
|
|
# Modify the background height based on X_type
|
|
if X_type == "Nothing":
|
|
bg_height = num_rows * latent_height + (num_rows - 1) * grid_spacing
|
|
y_offset = 0
|
|
else:
|
|
bg_height = num_rows * latent_height + (num_rows - 1) * grid_spacing + 3 * border_size_top
|
|
y_offset = border_size_top * 3
|
|
|
|
# Create the white background image
|
|
background = Image.new('RGBA', (int(bg_width), int(bg_height)), color=(255, 255, 255, 255))
|
|
|
|
for row in range(num_rows):
|
|
|
|
# Initialize the X_offset
|
|
x_offset = x_offset_initial
|
|
|
|
for col in range(num_cols):
|
|
# Calculate the index for image_pil_list
|
|
index = col * num_rows + row
|
|
img = image_pil_list[index]
|
|
|
|
# Paste the image
|
|
background.paste(img, (x_offset, y_offset))
|
|
|
|
if row == 0 and X_type != "Nothing":
|
|
# Assign text
|
|
text = X_label[col]
|
|
|
|
# Add the corresponding X_value as a label above the image
|
|
initial_font_size = int(48 * img.width / 512)
|
|
font_size = adjusted_font_size(text, initial_font_size, img.width)
|
|
label_height = int(font_size*1.5)
|
|
|
|
# Create a white background label image
|
|
label_bg = Image.new('RGBA', (img.width, label_height), color=(255, 255, 255, 0))
|
|
d = ImageDraw.Draw(label_bg)
|
|
|
|
# Create the font object
|
|
font = ImageFont.truetype(str(Path(font_path)), font_size)
|
|
|
|
# Calculate the text size and the starting position
|
|
text_width, text_height = d.textsize(text, font=font)
|
|
text_x = (img.width - text_width) // 2
|
|
text_y = (label_height - text_height) // 2
|
|
|
|
# Add the text to the label image
|
|
d.text((text_x, text_y), text, fill='black', font=font)
|
|
|
|
# Calculate the available space between the top of the background and the top of the image
|
|
available_space = y_offset - label_height
|
|
|
|
# Calculate the new Y position for the label image
|
|
label_y = available_space // 2
|
|
|
|
# Paste the label image above the image on the background using alpha_composite()
|
|
background.alpha_composite(label_bg, (x_offset, label_y))
|
|
|
|
if col == 0 and Y_type != "Nothing":
|
|
# Assign text
|
|
text = Y_label[row]
|
|
|
|
# Add the corresponding Y_value as a label to the left of the image
|
|
if Y_label_orientation == "Vertical":
|
|
initial_font_size = int(48 * latent_width / 512) # Adjusting this to be same as X_label size
|
|
font_size = adjusted_font_size(text, initial_font_size, latent_width)
|
|
else: # Assuming Y_label_orientation is "Horizontal"
|
|
initial_font_size = int(48 * (border_size_left/Y_label_scale) / 512) # Adjusting this to be same as X_label size
|
|
font_size = adjusted_font_size(text, initial_font_size, int(border_size_left/Y_label_scale))
|
|
|
|
# Create a white background label image
|
|
label_bg = Image.new('RGBA', (img.height, int(font_size*1.2)), color=(255, 255, 255, 0))
|
|
d = ImageDraw.Draw(label_bg)
|
|
|
|
# Create the font object
|
|
font = ImageFont.truetype(str(Path(font_path)), font_size)
|
|
|
|
# Calculate the text size and the starting position
|
|
text_width, text_height = d.textsize(text, font=font)
|
|
text_x = (img.height - text_width) // 2
|
|
text_y = (font_size - text_height) // 2
|
|
|
|
# Add the text to the label image
|
|
d.text((text_x, text_y), text, fill='black', font=font)
|
|
|
|
# Rotate the label_bg 90 degrees counter-clockwise only if Y_label_orientation is "Vertical"
|
|
if Y_label_orientation == "Vertical":
|
|
label_bg = label_bg.rotate(90, expand=True)
|
|
|
|
# Calculate the available space between the left of the background and the left of the image
|
|
available_space = x_offset - label_bg.width
|
|
|
|
# Calculate the new X position for the label image
|
|
label_x = available_space // 2
|
|
|
|
# Calculate the Y position for the label image based on its orientation
|
|
if Y_label_orientation == "Vertical":
|
|
label_y = y_offset + (img.height - label_bg.height) // 2
|
|
else: # Assuming Y_label_orientation is "Horizontal"
|
|
label_y = y_offset + img.height - (img.height - label_bg.height) // 2
|
|
|
|
# Paste the label image to the left of the image on the background using alpha_composite()
|
|
background.alpha_composite(label_bg, (label_x, label_y))
|
|
|
|
# Update the x_offset
|
|
x_offset += img.width + grid_spacing
|
|
|
|
# Update the y_offset
|
|
y_offset += img.height + grid_spacing
|
|
|
|
images = pil2tensor(background)
|
|
|
|
# Generate image results and store
|
|
results = preview_images(images, filename_prefix)
|
|
update_value_by_id("results", my_unique_id, results)
|
|
|
|
# Squeeze and Stack the tensors, and store results
|
|
if xyplot_as_output_image == False:
|
|
image_tensor_list = torch.stack([tensor.squeeze() for tensor in image_tensor_list])
|
|
else:
|
|
image_tensor_list = images
|
|
update_value_by_id("images", my_unique_id, image_tensor_list)
|
|
|
|
# Print cache if set to true
|
|
if cache_models == "True":
|
|
print_loaded_objects_entries(script_node_id, prompt)
|
|
|
|
print("-" * 40) # Print an empty line followed by a separator line
|
|
|
|
# Output image results to ui and node outputs
|
|
return {"ui": {"images": results}, "result": (model, positive, negative, {"samples": latent_list}, vae, image_tensor_list,)}
|
|
|
|
########################################################################################################################
|
|
# TSC XY Plot
|
|
class TSC_XYplot:
    """ComfyUI node that packs the X/Y axis inputs and plot options into one SCRIPT tuple."""

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the node's required widgets and optional input sockets."""
        required = {
            "grid_spacing": ("INT", {"default": 0, "min": 0, "max": 500, "step": 5}),
            "XY_flip": (["False","True"],),
            "Y_label_orientation": (["Horizontal", "Vertical"],),
            "cache_models": (["True", "False"],),
        }
        optional = {
            "dependencies": ("DEPENDENCIES", ),
            "X": ("XY", ),
            "Y": ("XY", ),
        }
        return {"required": required, "optional": optional}

    RETURN_TYPES = ("SCRIPT",)
    RETURN_NAMES = ("SCRIPT",)
    FUNCTION = "XYplot"
    CATEGORY = "Efficiency Nodes/XY Plot"

    def XYplot(self, grid_spacing, XY_flip, Y_label_orientation, cache_models, dependencies=None, X=None, Y=None):
        """Validate the axis inputs and build the script tuple.

        Returns a 1-tuple holding
            (X_type, X_value, Y_type, Y_value, grid_spacing, Y_label_orientation,
             cache_models, dependencies)
        or ``(None,)`` when both axes have the same type, or when a plot type
        that needs `dependencies` is used without it being connected.
        """
        # An unconnected axis is represented as type "Nothing" with a single empty value.
        X_type, X_value = X if X is not None else ("Nothing", [""])
        Y_type, Y_value = Y if Y is not None else ("Nothing", [""])

        # Identical types always exit; only the non-"Nothing" case is reported as an error.
        if X_type == Y_type:
            if X_type == "Nothing":
                print("-" * 40)
                print("XY Plot Script Inputs:")
                print(f"(X) {X_type}: {X_value}")
                print(f"(Y) {Y_type}: {Y_value}")
                print("-" * 40)
            else:
                print("\033[31mXY Plot Error:\033[0m X and Y input types must be different.")
            return (None,)

        # These plot types require the dependencies input to be connected.
        needs_dependencies = ("Checkpoint", "LoRA", "Positive Prompt S/R", "Negative Prompt S/R")
        uses_dependencies = X_type in needs_dependencies or Y_type in needs_dependencies
        if uses_dependencies and dependencies is None:
            print("\033[31mXY Plot Error:\033[0m The dependencies input must be connected for certain plot types.")
            return (None,)

        # When Sampler is plotted against Scheduler, blank the scheduler held in each Sampler entry.
        axis_types = (X_type, Y_type)
        if axis_types == ("Sampler", "Scheduler"):
            X_value = [[entry[0], ""] for entry in X_value]
        elif axis_types == ("Scheduler", "Sampler"):
            Y_value = [[entry[0], ""] for entry in Y_value]

        # Optional swap of the two axes.
        if XY_flip == "True":
            X_type, X_value, Y_type, Y_value = Y_type, Y_value, X_type, X_value

        script = (X_type, X_value, Y_type, Y_value, grid_spacing, Y_label_orientation, cache_models, dependencies)
        return (script,)
|
|
|
|
|
|
# TSC XY Plot: Seeds Values
|
|
class TSC_XYplot_SeedsBatch:
    """XY input node that turns a batch count into a "Seeds++ Batch" axis."""

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the single batch-count widget."""
        required = {"batch_count": ("INT", {"default": 1, "min": 0, "max": 50})}
        return {"required": required}

    RETURN_TYPES = ("XY",)
    RETURN_NAMES = ("X or Y",)
    FUNCTION = "xy_value"
    CATEGORY = "Efficiency Nodes/XY Plot/XY Inputs"

    def xy_value(self, batch_count):
        """Return ``(("Seeds++ Batch", [batch_count]),)``, or ``(None,)`` for a count of 0."""
        if batch_count != 0:
            return (("Seeds++ Batch", [batch_count]),)
        return (None,)
|
|
|
|
# TSC XY Plot: Step Values
|
|
class TSC_XYplot_Steps:
    """XY input node supplying up to five step counts for one plot axis."""

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the selection counter followed by the five step widgets."""
        required = {"selection_count": ("INT", {"default": 0, "min": 0, "max": 5})}
        for slot in range(1, 6):
            required[f"steps_{slot}"] = ("INT", {"default": 20, "min": 1, "max": 10000})
        return {"required": required}

    RETURN_TYPES = ("XY",)
    RETURN_NAMES = ("X or Y",)
    FUNCTION = "xy_value"
    CATEGORY = "Efficiency Nodes/XY Plot/XY Inputs"

    def xy_value(self, selection_count, steps_1, steps_2, steps_3, steps_4, steps_5):
        """Return the first ``selection_count`` step values as a "Steps" axis.

        Returns ``(None,)`` when no value is selected.
        """
        chosen = []
        for position, steps in enumerate((steps_1, steps_2, steps_3, steps_4, steps_5), start=1):
            if position > selection_count:
                break
            chosen.append(steps)

        if chosen:
            return (("Steps", chosen),)
        return (None,)
|
|
|
|
|
|
# TSC XY Plot: CFG Values
|
|
class TSC_XYplot_CFG:
    """XY input node supplying up to five CFG scale values for one plot axis."""

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the selection counter followed by the five CFG widgets."""
        required = {"selection_count": ("INT", {"default": 0, "min": 0, "max": 5})}
        for slot in range(1, 6):
            required[f"cfg_{slot}"] = ("FLOAT", {"default": 7.0, "min": 0.0, "max": 100.0})
        return {"required": required}

    RETURN_TYPES = ("XY",)
    RETURN_NAMES = ("X or Y",)
    FUNCTION = "xy_value"
    CATEGORY = "Efficiency Nodes/XY Plot/XY Inputs"

    def xy_value(self, selection_count, cfg_1, cfg_2, cfg_3, cfg_4, cfg_5):
        """Return the first ``selection_count`` CFG values as a "CFG Scale" axis.

        Returns ``(None,)`` when no value is selected.
        """
        selected = []
        for position, scale in enumerate((cfg_1, cfg_2, cfg_3, cfg_4, cfg_5), start=1):
            if position > selection_count:
                break
            selected.append(scale)

        if not selected:
            return (None,)
        return (("CFG Scale", selected),)
|
|
|
|
|
|
# TSC XY Plot: Sampler Values
|
|
class TSC_XYplot_Sampler:
    """XY input node supplying up to five (sampler, scheduler) pairs for one plot axis."""

    # Dropdown choices; "None" marks an unused slot
    samplers = ["None"] + comfy.samplers.KSampler.SAMPLERS
    schedulers = ["None"] + comfy.samplers.KSampler.SCHEDULERS

    @classmethod
    def INPUT_TYPES(cls):
        """Declare five sampler/scheduler dropdown pairs, interleaved per slot."""
        required = {}
        for slot in range(1, 6):
            required[f"sampler_{slot}"] = (cls.samplers,)
            required[f"scheduler_{slot}"] = (cls.schedulers,)
        return {"required": required}

    RETURN_TYPES = ("XY",)
    RETURN_NAMES = ("X or Y",)
    FUNCTION = "xy_value"
    CATEGORY = "Efficiency Nodes/XY Plot/XY Inputs"

    def xy_value(self, sampler_1, scheduler_1, sampler_2, scheduler_2, sampler_3, scheduler_3,
                 sampler_4, scheduler_4, sampler_5, scheduler_5):
        """Return the selected pairs as a "Sampler" axis.

        Slots whose sampler is "None" are skipped; a scheduler of "None" is
        stored as ``None`` in the pair. Returns ``(None,)`` when every slot is
        unused.
        """
        slots = (
            (sampler_1, scheduler_1),
            (sampler_2, scheduler_2),
            (sampler_3, scheduler_3),
            (sampler_4, scheduler_4),
            (sampler_5, scheduler_5),
        )
        pairs = [
            (sampler, scheduler if scheduler != "None" else None)
            for sampler, scheduler in slots
            if sampler != "None"
        ]

        if pairs:
            return (("Sampler", pairs),)
        return (None,)
|
|
|
|
|
|
# TSC XY Plot: Scheduler Values
|
|
class TSC_XYplot_Scheduler:
    """XY input node supplying up to five schedulers for one plot axis."""

    # Dropdown choices; "None" marks an unused slot
    schedulers = ["None"] + comfy.samplers.KSampler.SCHEDULERS

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the five scheduler dropdowns."""
        required = {f"scheduler_{slot}": (cls.schedulers,) for slot in range(1, 6)}
        return {"required": required}

    RETURN_TYPES = ("XY",)
    RETURN_NAMES = ("X or Y",)
    FUNCTION = "xy_value"
    CATEGORY = "Efficiency Nodes/XY Plot/XY Inputs"

    def xy_value(self, scheduler_1, scheduler_2, scheduler_3, scheduler_4, scheduler_5):
        """Return every scheduler that is not "None" as a "Scheduler" axis.

        Returns ``(None,)`` when all slots are "None".
        """
        chosen = []
        for name in (scheduler_1, scheduler_2, scheduler_3, scheduler_4, scheduler_5):
            if name != "None":
                chosen.append(name)

        if not chosen:
            return (None,)
        return (("Scheduler", chosen),)
|
|
|
|
|
|
# TSC XY Plot: Denoise Values
|
|
class TSC_XYplot_Denoise:
    """XY input node supplying up to five denoise values for one plot axis."""

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the selection counter followed by the five denoise widgets."""
        required = {"select_count": ("INT", {"default": 0, "min": 0, "max": 5})}
        for slot in range(1, 6):
            required[f"denoise_{slot}"] = ("FLOAT", {"default": 1.0, "min": 0.00, "max": 1.0, "step": 0.01})
        return {"required": required}

    RETURN_TYPES = ("XY",)
    RETURN_NAMES = ("X or Y",)
    FUNCTION = "xy_value"
    CATEGORY = "Efficiency Nodes/XY Plot/XY Inputs"

    def xy_value(self, select_count, denoise_1, denoise_2, denoise_3, denoise_4, denoise_5):
        """Return the first ``select_count`` denoise values as a "Denoise" axis.

        Returns ``(None,)`` when no value is selected.
        """
        picked = []
        for position, amount in enumerate((denoise_1, denoise_2, denoise_3, denoise_4, denoise_5), start=1):
            if position > select_count:
                break
            picked.append(amount)

        if picked:
            return (("Denoise", picked),)
        return (None,)
|
|
|
|
|
|
# TSC XY Plot: VAE Values
|
|
class TSC_XYplot_VAE:
    """XY input node supplying up to five VAE file names for one plot axis."""

    # Dropdown choices; "None" marks an unused slot
    vaes = ["None"] + folder_paths.get_filename_list("vae")

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the five VAE dropdowns."""
        required = {f"vae_name_{slot}": (cls.vaes,) for slot in range(1, 6)}
        return {"required": required}

    RETURN_TYPES = ("XY",)
    RETURN_NAMES = ("X or Y",)
    FUNCTION = "xy_value"
    CATEGORY = "Efficiency Nodes/XY Plot/XY Inputs"

    def xy_value(self, vae_name_1, vae_name_2, vae_name_3, vae_name_4, vae_name_5):
        """Return every VAE name that is not "None" as a "VAE" axis.

        Returns ``(None,)`` when all slots are "None".
        """
        names = []
        for candidate in (vae_name_1, vae_name_2, vae_name_3, vae_name_4, vae_name_5):
            if candidate != "None":
                names.append(candidate)

        if names:
            return (("VAE", names),)
        return (None,)
|
|
|
|
# TSC XY Plot: Prompt S/R
|
|
class TSC_XYplot_PromptSR_Positive:
    """XY input node describing search/replace variations of the positive prompt."""

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the search text, the replacement counter and four replacement fields."""
        required = {
            "search_txt": ("STRING", {"default": "", "multiline": False}),
            "replace_count": ("INT", {"default": 0, "min": 0, "max": 4}),
        }
        for slot in range(1, 5):
            required[f"replace_{slot}"] = ("STRING", {"default": "", "multiline": False})
        return {"required": required}

    RETURN_TYPES = ("XY",)
    RETURN_NAMES = ("X or Y",)
    FUNCTION = "xy_value"
    CATEGORY = "Efficiency Nodes/XY Plot/XY Inputs"

    def xy_value(self, search_txt, replace_count, replace_1, replace_2, replace_3, replace_4):
        """Return ``(search_txt, replacement)`` pairs as a "Positive Prompt S/R" axis.

        The first pair always carries ``None`` as replacement (the unmodified
        prompt); it is followed by the first ``replace_count`` replacements.
        Returns ``(None,)`` when ``search_txt`` is empty.
        """
        if search_txt == "":
            return (None,)

        candidates = (replace_1, replace_2, replace_3, replace_4)

        # Base entry first, then one entry per requested replacement
        entries = [(search_txt, None)]
        if replace_count > 0:
            for index in range(replace_count):
                entries.append((search_txt, candidates[index]))

        return (("Positive Prompt S/R", entries),)
|
|
|
|
class TSC_XYplot_PromptSR_Negative:
    """XY input node describing search/replace variations of the negative prompt."""

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the search text, the replacement counter and four replacement fields."""
        required = {
            "search_txt": ("STRING", {"default": "", "multiline": False}),
            "replace_count": ("INT", {"default": 0, "min": 0, "max": 4}),
        }
        for slot in range(1, 5):
            required[f"replace_{slot}"] = ("STRING", {"default": "", "multiline": False})
        return {"required": required}

    RETURN_TYPES = ("XY",)
    RETURN_NAMES = ("X or Y",)
    FUNCTION = "xy_value"
    CATEGORY = "Efficiency Nodes/XY Plot/XY Inputs"

    def xy_value(self, search_txt, replace_count, replace_1, replace_2, replace_3, replace_4):
        """Return ``(search_txt, replacement)`` pairs as a "Negative Prompt S/R" axis.

        The first pair always carries ``None`` as replacement (the unmodified
        prompt); it is followed by the first ``replace_count`` replacements.
        Returns ``(None,)`` when ``search_txt`` is empty.
        """
        if search_txt == "":
            return (None,)

        candidates = (replace_1, replace_2, replace_3, replace_4)

        # Base entry first, then one entry per requested replacement
        entries = [(search_txt, None)]
        if replace_count > 0:
            for index in range(replace_count):
                entries.append((search_txt, candidates[index]))

        return (("Negative Prompt S/R", entries),)
|
|
|
|
# TSC XY Plot: Checkpoint Values
|
|
class TSC_XYplot_Checkpoint:
    """XY input node supplying up to five (checkpoint, clip skip) pairs for one plot axis."""

    # Dropdown choices; "None" marks an unused slot
    checkpoints = ["None"] + folder_paths.get_filename_list("checkpoints")

    @classmethod
    def INPUT_TYPES(cls):
        """Declare five checkpoint dropdowns, each followed by its clip-skip widget."""
        required = {}
        for slot in range(1, 6):
            required[f"ckpt_name_{slot}"] = (cls.checkpoints,)
            # Widget name has no underscore before the slot number
            required[f"clip_skip{slot}"] = ("INT", {"default": -1, "min": -24, "max": -1, "step": 1})
        return {"required": required}

    RETURN_TYPES = ("XY",)
    RETURN_NAMES = ("X or Y",)
    FUNCTION = "xy_value"
    CATEGORY = "Efficiency Nodes/XY Plot/XY Inputs"

    def xy_value(self, ckpt_name_1, clip_skip1, ckpt_name_2, clip_skip2, ckpt_name_3, clip_skip3,
                 ckpt_name_4, clip_skip4, ckpt_name_5, clip_skip5):
        """Return the selected ``(checkpoint, clip_skip)`` pairs as a "Checkpoint" axis.

        Slots whose checkpoint is "None" are skipped. Returns ``(None,)`` when
        every slot is unused.
        """
        slots = (
            (ckpt_name_1, clip_skip1),
            (ckpt_name_2, clip_skip2),
            (ckpt_name_3, clip_skip3),
            (ckpt_name_4, clip_skip4),
            (ckpt_name_5, clip_skip5),
        )
        selected = [slot for slot in slots if slot[0] != "None"]

        if selected:
            return (("Checkpoint", selected),)
        return (None,)
|
|
|
|
# TSC XY Plot: Clip Skip
|
|
class TSC_XYplot_ClipSkip:
    """XY input node supplying up to five clip-skip values for one plot axis."""

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the selection counter and five clip-skip widgets defaulting to -1..-5."""
        required = {"select_count": ("INT", {"default": 0, "min": 0, "max": 5})}
        for slot in range(1, 6):
            required[f"clip_skip_{slot}"] = ("INT", {"default": -slot, "min": -24, "max": -1, "step": 1})
        return {"required": required}

    RETURN_TYPES = ("XY",)
    RETURN_NAMES = ("X or Y",)
    FUNCTION = "xy_value"
    CATEGORY = "Efficiency Nodes/XY Plot/XY Inputs"

    def xy_value(self, select_count, clip_skip_1, clip_skip_2, clip_skip_3, clip_skip_4, clip_skip_5):
        """Return the first ``select_count`` clip-skip values as a "Clip Skip" axis.

        Returns ``(None,)`` when no value is selected.
        """
        picked = []
        for position, skip in enumerate((clip_skip_1, clip_skip_2, clip_skip_3, clip_skip_4, clip_skip_5), start=1):
            if position > select_count:
                break
            picked.append(skip)

        if not picked:
            return (None,)
        return (("Clip Skip", picked),)
|
|
|
|
# TSC XY Plot: LoRA Values
|
|
class TSC_XYplot_LoRA:
    """XY input node supplying up to five LoRAs that share one model/clip strength."""

    # Dropdown choices; "None" marks an unused slot
    loras = ["None"] + folder_paths.get_filename_list("loras")

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the two shared strength widgets followed by five LoRA dropdowns."""
        required = {
            "model_strengths": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}),
            "clip_strengths": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}),
        }
        for slot in range(1, 6):
            required[f"lora_name_{slot}"] = (cls.loras,)
        return {"required": required}

    RETURN_TYPES = ("XY",)
    RETURN_NAMES = ("X or Y",)
    FUNCTION = "xy_value"
    CATEGORY = "Efficiency Nodes/XY Plot/XY Inputs"

    def xy_value(self, model_strengths, clip_strengths, lora_name_1, lora_name_2, lora_name_3, lora_name_4, lora_name_5):
        """Return ``(lora, model_strengths, clip_strengths)`` triples as a "LoRA" axis.

        Slots set to "None" are skipped. Returns ``(None,)`` when every slot is
        unused.
        """
        triples = []
        for name in (lora_name_1, lora_name_2, lora_name_3, lora_name_4, lora_name_5):
            if name != "None":
                triples.append((name, model_strengths, clip_strengths))

        if triples:
            return (("LoRA", triples),)
        return (None,)
|
|
|
|
# TSC XY Plot: LoRA Advanced
|
|
class TSC_XYplot_LoRA_Adv:
    """XY input node supplying up to five LoRAs, each with its own model/clip strength."""

    # Dropdown choices; "None" marks an unused slot
    loras = ["None"] + folder_paths.get_filename_list("loras")

    @classmethod
    def INPUT_TYPES(cls):
        """Declare five groups of LoRA dropdown, model strength and clip strength."""
        required = {}
        for slot in range(1, 6):
            required[f"lora_name_{slot}"] = (cls.loras,)
            required[f"model_str_{slot}"] = ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01})
            required[f"clip_str_{slot}"] = ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01})
        return {"required": required}

    RETURN_TYPES = ("XY",)
    RETURN_NAMES = ("X or Y",)
    FUNCTION = "xy_value"
    CATEGORY = "Efficiency Nodes/XY Plot/XY Inputs"

    def xy_value(self, lora_name_1, model_str_1, clip_str_1, lora_name_2, model_str_2, clip_str_2,
                 lora_name_3, model_str_3, clip_str_3, lora_name_4, model_str_4, clip_str_4, lora_name_5, model_str_5, clip_str_5):
        """Return ``(lora, model_str, clip_str)`` triples as a "LoRA" axis.

        Slots whose LoRA is "None" are skipped. Returns ``(None,)`` when every
        slot is unused.
        """
        slots = (
            (lora_name_1, model_str_1, clip_str_1),
            (lora_name_2, model_str_2, clip_str_2),
            (lora_name_3, model_str_3, clip_str_3),
            (lora_name_4, model_str_4, clip_str_4),
            (lora_name_5, model_str_5, clip_str_5),
        )
        triples = [slot for slot in slots if slot[0] != "None"]

        if not triples:
            return (None,)
        return (("LoRA", triples),)
|
|
|
|
|
|
# TSC XY Plot: Manual Entry Notes
|
|
class TSC_XYplot_Manual_XY_Entry_Info:
    """Notes-only node whose text box lists the manual-entry syntax and available options."""

    # Cheat sheet of the value syntax accepted for each X/Y type
    syntax = (
        "(X/Y_types) (X/Y_values)\n"
        "Seeds++ Batch batch_count\n"
        "Steps steps_1;steps_2;...\n"
        "CFG Scale cfg_1;cfg_2;...\n"
        "Sampler(1) sampler_1;sampler_2;...\n"
        "Sampler(2) sampler_1,scheduler_1;...\n"
        "Sampler(3) sampler_1;...;,default_scheduler\n"
        "Scheduler scheduler_1;scheduler_2;...\n"
        "Denoise denoise_1;denoise_2;...\n"
        "VAE vae_1;vae_2;vae_3;...\n"
        "+Prompt S/R search_txt;replace_1;replace_2;...\n"
        "-Prompt S/R search_txt;replace_1;replace_2;...\n"
        "Checkpoint(1) ckpt_1;ckpt_2;ckpt_3;...\n"
        "Checkpoint(2) ckpt_1,clip_skip_1;...\n"
        "Checkpoint(3) ckpt_1;ckpt_2;...;,default_clip_skip\n"
        "Clip Skip clip_skip_1;clip_skip_2;...\n"
        "LoRA(1) lora_1;lora_2;lora_3;...\n"
        "LoRA(2) lora_1;...;,default_model_str,default_clip_str\n"
        "LoRA(3) lora_1,model_str_1,clip_str_1;..."
    )

    # Option listings, one name per line, each followed by ";"
    samplers = ";\n".join(comfy.samplers.KSampler.SAMPLERS)
    schedulers = ";\n".join(comfy.samplers.KSampler.SCHEDULERS)
    vaes = ";\n".join(folder_paths.get_filename_list("vae"))
    ckpts = ";\n".join(folder_paths.get_filename_list("checkpoints"))
    loras = ";\n".join(folder_paths.get_filename_list("loras"))

    @classmethod
    def INPUT_TYPES(cls):
        """Declare one multiline "notes" widget pre-filled with the reference text."""
        sections = (
            ("_____________SYNTAX_____________", cls.syntax),
            ("____________SAMPLERS____________", cls.samplers),
            ("___________SCHEDULERS___________", cls.schedulers),
            ("_____________VAES_______________", cls.vaes),
            ("___________CHECKPOINTS__________", cls.ckpts),
            ("_____________LORAS______________", cls.loras),
        )
        # Sections are separated by a blank line; the text ends with one newline
        notes = "\n\n".join(f"{header}\n{listing}" for header, listing in sections) + "\n"
        return {"required": {"notes": ("STRING", {"default": notes, "multiline": True})}}

    RETURN_TYPES = ()
    CATEGORY = "Efficiency Nodes/XY Plot/XY Inputs"
|
|
|
|
# TSC XY Plot: Manual Entry
|
|
class TSC_XYplot_Manual_XY_Entry:
    """XY input node that parses hand-typed, ";"-separated values for both plot axes."""

    _ERROR_PREFIX = "\033[31mXY Plot Error:\033[0m"
    _PROMPT_SR_TYPES = ("Positive Prompt S/R", "Negative Prompt S/R")
    # Types whose values may legitimately contain spaces (prompt text, file names)
    _KEEP_SPACES_TYPES = _PROMPT_SR_TYPES + ("VAE", "Checkpoint", "LoRA")
    # Types that accept the trailing ",default,..." shortcut entry
    _SHORTCUT_TYPES = ("Sampler", "Checkpoint", "LoRA")

    @classmethod
    def INPUT_TYPES(cls):
        """Declare a type dropdown and a multiline value box for each axis."""
        axis_types = ["Nothing", "Seeds++ Batch", "Steps", "CFG Scale", "Sampler", "Scheduler", "Denoise", "VAE",
                      "Positive Prompt S/R", "Negative Prompt S/R", "Checkpoint", "Clip Skip", "LoRA"]
        return {"required": {
                    "X_type": (list(axis_types),),
                    "X_value": ("STRING", {"default": "", "multiline": True}),
                    "Y_type": (list(axis_types),),
                    "Y_value": ("STRING", {"default": "", "multiline": True}),},}

    RETURN_TYPES = ("XY", "XY",)
    RETURN_NAMES = ("X", "Y",)
    FUNCTION = "xy_value"
    CATEGORY = "Efficiency Nodes/XY Plot/XY Inputs"

    def xy_value(self, X_type, X_value, Y_type, Y_value, prompt=None, my_unique_id=None):
        """Parse and validate both axis strings.

        Args:
            X_type: Axis type selected for X.
            X_value: Raw ";"-separated text for X.
            Y_type: Axis type selected for Y.
            Y_value: Raw ";"-separated text for Y.
            prompt: Unused by this method.
            my_unique_id: Unused by this method.

        Returns:
            ``((X_type, X_values), (Y_type, Y_values))`` on success, or
            ``(None, None)`` after printing an error when any value is invalid.
        """
        types = (X_type, Y_type)

        # Store X and Y values as arrays
        entries = [self._split_entries(axis_type, raw) for axis_type, raw in zip(types, (X_value, Y_value))]

        # A "Seeds++ Batch" axis takes exactly one value (X is checked before Y)
        for axis_type, axis_entries in zip(types, entries):
            if axis_type == "Seeds++ Batch" and len(axis_entries) != 1:
                self._report(f"'{';'.join(axis_entries)}' is not a valid batch count.")
                return (None, None,)

        # Apply allowed shortcut syntax to certain input types
        entries = [self._expand_shortcut(axis_type, axis_entries) for axis_type, axis_entries in zip(types, entries)]

        # Prompt S/R clean-up: first entry is the search text, the rest are replacements
        for index, axis_type in enumerate(types):
            if axis_type not in self._PROMPT_SR_TYPES:
                continue
            search_txt = entries[index][0]
            if search_txt == '':
                self._report("Prompt S/R value can not be empty.")
                return (None, None,)
            entries[index] = [(search_txt, None) if i == 0 else (search_txt, text)
                              for i, text in enumerate(entries[index])]

        # Validate every remaining entry, stopping at the first invalid one
        bounds = self._build_bounds()
        for index, axis_type in enumerate(types):
            if axis_type == "Nothing" or axis_type in self._PROMPT_SR_TYPES:
                continue
            axis_entries = entries[index]
            for i, raw in enumerate(axis_entries):
                axis_entries[i] = self._validate_value(raw, axis_type, bounds)
                if axis_entries[i] is None:
                    return (None, None,)

        X_entries, Y_entries = entries

        # Clean Schedulers from Sampler data (if other type is Scheduler)
        if X_type == "Sampler" and Y_type == "Scheduler":
            X_entries = [[x[0], ""] for x in X_entries]
        elif Y_type == "Sampler" and X_type == "Scheduler":
            Y_entries = [[y[0], ""] for y in Y_entries]

        # An unused axis always carries the single placeholder value
        if X_type == "Nothing":
            X_entries = [""]
        if Y_type == "Nothing":
            Y_entries = [""]

        return ((X_type, X_entries), (Y_type, Y_entries),)

    @classmethod
    def _report(cls, message):
        """Print ``message`` behind the red "XY Plot Error:" prefix."""
        print(f"{cls._ERROR_PREFIX} {message}")

    @classmethod
    def _split_entries(cls, axis_type, text):
        """Split raw widget text into a list of entries on ";"."""
        # NOTE(review): indentation was lost in the reviewed copy of this file;
        # only the space removal is assumed to be conditional on the type.
        if axis_type not in cls._KEEP_SPACES_TYPES:
            text = text.replace(" ", "")  # Remove spaces
        return text.replace("\n", "").rstrip(";").split(";")

    @classmethod
    def _expand_shortcut(cls, axis_type, entries):
        """Append the defaults from a trailing ",a,b" entry to every preceding entry."""
        if axis_type not in cls._SHORTCUT_TYPES or not entries[-1].startswith(','):
            return entries
        # Remove the leading comma from the last entry and store it as suffixes
        suffixes = entries[-1].lstrip(',').split(',')
        # Split all preceding entries into subentries
        rows = [entry.split(',') for entry in entries[:-1]]
        # Only fill in the fields an entry does not already specify
        for row in rows:
            row += suffixes[len(row) - 1:]
        return [','.join(row) for row in rows]

    @staticmethod
    def _build_bounds():
        """Return the numeric limits and option lists used for validation."""
        return {
            "Seeds++ Batch": {"min": 0, "max": 50},
            "Steps": {"min": 1, "max": 10000},
            "CFG Scale": {"min": 0, "max": 100},
            "Sampler": {"options": comfy.samplers.KSampler.SAMPLERS},
            "Scheduler": {"options": comfy.samplers.KSampler.SCHEDULERS},
            "Denoise": {"min": 0, "max": 1},
            "VAE": {"options": folder_paths.get_filename_list("vae")},
            "Checkpoint": {"options": folder_paths.get_filename_list("checkpoints")},
            "Clip Skip": {"min": -24, "max": -1},
            # NOTE(review): strengths are limited to 0..10 here although the
            # LoRA widget nodes in this file accept -10..10; confirm intent.
            "LoRA": {"options": folder_paths.get_filename_list("loras"),
                     "model_str": {"min": 0, "max": 10},"clip_str": {"min": 0, "max": 10},},
        }

    @staticmethod
    def _clamp(number, limits):
        """Return ``number`` limited to ``limits["min"]``..``limits["max"]``."""
        if number < limits["min"]:
            return limits["min"]
        if number > limits["max"]:
            return limits["max"]
        return number

    @staticmethod
    def _out_of_range(number, limits):
        """Return True when ``number`` lies outside ``limits["min"]``..``limits["max"]``."""
        return number < limits["min"] or number > limits["max"]

    @staticmethod
    def _split_fields(value):
        """Turn a comma-separated string into a tuple of stripped fields."""
        if isinstance(value, str) and ',' in value:
            return tuple(map(str.strip, value.split(',')))
        return value

    @classmethod
    def _is_known(cls, value, options, label):
        """Return True if ``value`` is in ``options``; otherwise report it with the valid list."""
        if value in options:
            return True
        listing = '\n'.join(options)
        cls._report(f"'{value}' is not a valid {label}. Valid {label}s are:\n{listing}")
        return False

    @classmethod
    def _validate_value(cls, value, value_type, bounds):
        """Validate one entry against ``bounds`` for its ``value_type``.

        Returns the converted value (number, string or tuple depending on the
        type), or ``None`` after printing an error when the entry is invalid.
        """
        # ________________________________________________________________________
        # Seeds++ Batch: must be a whole number inside the allowed range
        if value_type == "Seeds++ Batch":
            try:
                number = float(value)
                # int() raises OverflowError for "inf" and ValueError for "nan"
                count = cls._clamp(int(number), bounds["Seeds++ Batch"])
            except (ValueError, OverflowError):
                cls._report(f"'{value}' is not a valid batch count.")
                return None
            # A mismatch means the input was fractional or had to be clamped
            if number != count:
                cls._report(f"'{value}' is not a valid batch count.")
                return None
            return count

        # ________________________________________________________________________
        # Steps / Clip Skip: integers, clamped into range
        if value_type in ("Steps", "Clip Skip"):
            try:
                return cls._clamp(int(value), bounds[value_type])
            except ValueError:
                label = "Step count" if value_type == "Steps" else "Clip Skip"
                cls._report(f"'{value}' is not a valid {label}.")
                return None

        # ________________________________________________________________________
        # CFG Scale / Denoise: floats, clamped into range
        if value_type in ("CFG Scale", "Denoise"):
            limits = bounds[value_type]
            try:
                return cls._clamp(float(value), limits)
            except ValueError:
                cls._report(f"'{value}' is not a number between {limits['min']}"
                            f" and {limits['max']} for {value_type}.")
                return None

        # ________________________________________________________________________
        # Sampler: "sampler" or "sampler,scheduler"
        if value_type == "Sampler":
            value = cls._split_fields(value)
            if not isinstance(value, tuple):
                if not cls._is_known(value, bounds["Sampler"]["options"], "sampler"):
                    return None
                return value, None
            if len(value) < 2:
                cls._report(f"'{value}' is not a valid sampler.'")
                return None
            sampler, scheduler = value[:2]  # Keep only the first two elements
            scheduler = scheduler.lower()  # Convert the scheduler name to lowercase
            # Check both halves so that both errors get reported
            sampler_ok = cls._is_known(sampler, bounds["Sampler"]["options"], "sampler")
            scheduler_ok = cls._is_known(scheduler, bounds["Scheduler"]["options"], "scheduler")
            if sampler_ok and scheduler_ok:
                return sampler, scheduler
            return None

        # ________________________________________________________________________
        # Scheduler / VAE: plain option lookup
        if value_type in ("Scheduler", "VAE"):
            if cls._is_known(value, bounds[value_type]["options"], value_type):
                return value
            return None

        # ________________________________________________________________________
        # Checkpoint: "ckpt" or "ckpt,clip_skip"
        if value_type == "Checkpoint":
            value = cls._split_fields(value)
            if not isinstance(value, tuple):
                if not cls._is_known(value, bounds["Checkpoint"]["options"], "checkpoint"):
                    return None
                return value, None
            if len(value) < 2:
                cls._report(f"'{value}' is not a valid checkpoint.'")
                return None
            checkpoint, clip_skip = value[:2]  # Keep only the first two elements
            skip_limits = bounds["Clip Skip"]
            try:
                clip_skip = int(clip_skip)  # Convert the clip_skip to integer
            except ValueError:
                cls._report(f"'{clip_skip}' is not a valid clip_skip. "
                            f"Valid clip skip values are integers between {skip_limits['min']} and {skip_limits['max']}.")
                return None
            checkpoint_ok = cls._is_known(checkpoint, bounds["Checkpoint"]["options"], "checkpoint")
            clip_skip_ok = not cls._out_of_range(clip_skip, skip_limits)
            if not clip_skip_ok:
                cls._report(f"'{clip_skip}' is not a valid clip skip. "
                            f"Valid clip skip values are integers between {skip_limits['min']} and {skip_limits['max']}.")
            if checkpoint_ok and clip_skip_ok:
                return checkpoint, clip_skip
            return None

        # ________________________________________________________________________
        # LoRA: "lora", "lora,model_str" or "lora,model_str,clip_str"
        if value_type == "LoRA":
            value = cls._split_fields(value)
            lora_options = bounds["LoRA"]["options"]
            if not isinstance(value, tuple):
                if not cls._is_known(value, lora_options, "LoRA"):
                    return None
                return value, 1.0, 1.0

            # Defaults model_str and clip_str to 1 if not provided
            lora_name, model_str, clip_str = (value + (1.0, 1.0))[:3]
            model_limits = bounds["LoRA"]["model_str"]
            clip_limits = bounds["LoRA"]["clip_str"]

            name_ok = cls._is_known(lora_name, lora_options, "LoRA")

            try:
                model_str = float(model_str)
                clip_str = float(clip_str)
            except ValueError:
                cls._report(f"The LoRA model strength and clip strength values should be numbers"
                            f" between {model_limits['min']} and {model_limits['max']}.")
                return None

            model_ok = not cls._out_of_range(model_str, model_limits)
            if not model_ok:
                cls._report(f"'{model_str}' is not a valid LoRA model strength value. "
                            f"Valid lora model strength values are between {model_limits['min']} and {model_limits['max']}.")

            clip_ok = not cls._out_of_range(clip_str, clip_limits)
            if not clip_ok:
                cls._report(f"'{clip_str}' is not a valid LoRA clip strength value. "
                            f"Valid lora clip strength values are between {clip_limits['min']} and {clip_limits['max']}.")

            if name_ok and model_ok and clip_ok:
                return lora_name, model_str, clip_str
            return None

        # ________________________________________________________________________
        # Unknown type
        return None
|
|
|
|
# TSC XY Plot: Join Inputs
|
|
class TSC_XYplot_JoinInputs:
    """Node that merges two XY inputs of the same type into a single XY input."""

    @classmethod
    def INPUT_TYPES(cls):
        required_inputs = {
            "XY_1": ("XY",),
            "XY_2": ("XY",),
        }
        return {"required": required_inputs}

    RETURN_TYPES = ("XY",)
    RETURN_NAMES = ("X or Y",)
    FUNCTION = "xy_value"
    CATEGORY = "Efficiency Nodes/XY Plot/XY Inputs"

    def xy_value(self, XY_1, XY_2):
        """Join two ``(type, values)`` pairs.

        Returns a 1-tuple holding the joined ``(type, values)`` pair, or
        ``(None,)`` (after printing an error) when the two types differ.
        """
        first_type, first_values = XY_1
        second_type, second_values = XY_2

        # Guard clause: only inputs of the same type can be joined
        if first_type != second_type:
            print(f"\033[31mJoin XY Inputs Error:\033[0m Input types must match")
            return (None,)

        if first_type == "Seeds++ Batch":
            # The first element of each value list is added together
            joined = [first_values[0] + second_values[0]]
        elif first_type in ("Positive Prompt S/R", "Negative Prompt S/R"):
            # Skip the second input's first entry and re-key its remaining
            # entries with the first element of the first input's first entry
            joined = first_values + [(first_values[0][0], pair[1]) for pair in second_values[1:]]
        else:
            # Every other type is a plain list concatenation
            joined = first_values + second_values

        return ((first_type, joined),)
|
|
|
|
########################################################################################################################
|
|
# TSC Image Overlay
|
|
class TSC_ImageOverlay:
    """Node that pastes an overlay image onto every image of a base batch.

    The overlay can optionally be resized, rotated, masked and faded before
    it is pasted at ``(x_offset, y_offset)`` on each base image.
    """

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "base_image": ("IMAGE",),
                "overlay_image": ("IMAGE",),
                "overlay_resize": (["None", "Fit", "Resize by rescale_factor", "Resize to width & heigth"],),
                "resize_method": (["nearest-exact", "bilinear", "area"],),
                "rescale_factor": ("FLOAT", {"default": 1, "min": 0.01, "max": 16.0, "step": 0.1}),
                "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 64}),
                "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 64}),
                "x_offset": ("INT", {"default": 0, "min": -48000, "max": 48000, "step": 10}),
                "y_offset": ("INT", {"default": 0, "min": -48000, "max": 48000, "step": 10}),
                "rotation": ("INT", {"default": 0, "min": -180, "max": 180, "step": 5}),
                "opacity": ("FLOAT", {"default": 0, "min": 0, "max": 100, "step": 5}),
            },
            "optional": {"optional_mask": ("MASK",),}
        }

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "apply_overlay_image"
    CATEGORY = "Efficiency Nodes/Image"

    def apply_overlay_image(self, base_image, overlay_image, overlay_resize, resize_method, rescale_factor,
                            width, height, x_offset, y_offset, rotation, opacity, optional_mask=None):
        """Paste ``overlay_image`` onto each image in ``base_image``.

        Args:
            base_image: Batch tensor of base images; split along dim 0.
            overlay_image: Overlay tensor; its ``size()[2]`` / ``size()[1]``
                are used as width / height.
            overlay_resize: One of the ``overlay_resize`` choices declared in
                ``INPUT_TYPES``; anything other than "None" triggers a resize.
            resize_method: Upscale method forwarded to
                ``comfy.utils.common_upscale``.
            rescale_factor: Multiplier used by "Resize by rescale_factor".
            width, height: Target size used by "Resize to width & heigth".
            x_offset, y_offset: Paste location on the base image.
            rotation: Rotation passed to ``PIL.Image.rotate`` (expand=True).
            opacity: 0-100; the overlay alpha is scaled by
                ``1 - opacity / 100``, so 0 leaves the alpha untouched and
                100 makes it fully transparent.
            optional_mask: Optional mask tensor. Defaults to ``None`` because
                the input is declared optional and may not be supplied.

        Returns:
            A 1-tuple containing the edited base image batch tensor.
        """
        # Pack tuples and assign variables
        size = width, height
        location = x_offset, y_offset
        mask = optional_mask

        # Check for different sizing options
        if overlay_resize != "None":
            # Extract overlay_image size and store it as (W, H)
            overlay_dims = overlay_image.size()
            overlay_image_size = (overlay_dims[2], overlay_dims[1])
            if overlay_resize == "Fit":
                # Tensor.size is a method: call it, then take (W, H) from the
                # same indices used for the overlay above.
                # Assumes base_image shares the overlay's dimension order
                # (both are IMAGE inputs) -- TODO confirm.
                base_dims = base_image.size()
                overlay_image_size = (base_dims[2], base_dims[1])
            elif overlay_resize == "Resize by rescale_factor":
                overlay_image_size = tuple(int(dimension * rescale_factor) for dimension in overlay_image_size)
            elif overlay_resize == "Resize to width & heigth":
                overlay_image_size = (size[0], size[1])

            # common_upscale is fed the tensor with its last dim moved to
            # position 1; the result is moved back afterwards
            samples = overlay_image.movedim(-1, 1)
            overlay_image = comfy.utils.common_upscale(samples, overlay_image_size[0], overlay_image_size[1],
                                                       resize_method, False)
            overlay_image = overlay_image.movedim(1, -1)

        overlay_image = tensor2pil(overlay_image)

        # Add a fully opaque alpha channel to the overlay
        overlay_image = overlay_image.convert('RGBA')
        overlay_image.putalpha(Image.new("L", overlay_image.size, 255))

        # If a mask is connected, use its inverse as the overlay's alpha
        if mask is not None:
            # Convert mask to PIL and resize it to the overlay
            mask = tensor2pil(mask)
            mask = mask.resize(overlay_image.size)
            overlay_image.putalpha(ImageOps.invert(mask))

        # Rotate the overlay image
        overlay_image = overlay_image.rotate(rotation, expand=True)

        # Apply opacity on the overlay's alpha channel
        _, _, _, alpha = overlay_image.split()
        alpha = alpha.point(lambda x: max(0, int(x * (1 - opacity / 100))))
        overlay_image.putalpha(alpha)

        # Split the base_image tensor along the first dimension to get a list of tensors
        base_image_list = torch.unbind(base_image, dim=0)

        # Convert each tensor to a PIL image, apply the overlay, and convert it back to a tensor
        processed_base_image_list = []
        for tensor in base_image_list:
            image = tensor2pil(tensor)

            # NOTE(review): without a mask the paste passes no alpha mask, so
            # the opacity setting appears to have no visible effect -- confirm
            # this is intended.
            if mask is None:
                image.paste(overlay_image, location)
            else:
                image.paste(overlay_image, location, overlay_image)

            processed_base_image_list.append(pil2tensor(image))

        # Combine the processed images back into a single tensor
        base_image = torch.stack([tensor.squeeze() for tensor in processed_base_image_list])

        # Return the edited base image
        return (base_image,)
|
|
|
|
########################################################################################################################
|
|
# Install simple_eval if missing from packages
|
|
def install_simpleeval():
    """Install the ``simpleeval`` package with pip when it cannot be imported.

    The import system is queried directly instead of parsing the output of
    ``pip freeze``: that avoids spawning a subprocess whenever the package is
    already present and does not depend on how pip formats package names.

    Raises:
        subprocess.CalledProcessError: if the ``pip install`` command fails.
    """
    # Function-scope import so this block does not rely on a file-level import
    import importlib.util

    if importlib.util.find_spec('simpleeval') is None:
        print("\033[32mEfficiency Nodes:\033[0m")
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'simpleeval'])
        # Let the import system notice the package installed while running
        importlib.invalidate_caches()
|
|
|
|
def packages(versions=False):
    """Return the whitespace-separated tokens printed by ``pip freeze``.

    With ``versions`` false (the default) each token is cut at its first
    ``==`` so only the part before it is kept; otherwise the decoded tokens
    are returned unchanged.
    """
    freeze_output = subprocess.check_output([sys.executable, '-m', 'pip', 'freeze'])
    tokens = [raw.decode() for raw in freeze_output.split()]
    if versions:
        return tokens
    return [token.split('==')[0] for token in tokens]
|
|
|
|
# Run the installer first so the simpleeval import below can succeed
install_simpleeval()
from simpleeval import simple_eval
|
|
|
|
# TSC Evaluate Integers (https://github.com/danthedeckie/simpleeval)
|
|
class TSC_EvaluateInts:
    """Node that evaluates an expression over the integer inputs a, b and c."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "python_expression": ("STRING", {"default": "((a + b) - c) / 2", "multiline": False}),
            "print_to_console": (["False", "True"],),
        }
        optional = {
            "a": ("INT", {"default": 0, "min": -48000, "max": 48000, "step": 1}),
            "b": ("INT", {"default": 0, "min": -48000, "max": 48000, "step": 1}),
            "c": ("INT", {"default": 0, "min": -48000, "max": 48000, "step": 1}),
        }
        return {"required": required, "optional": optional}

    RETURN_TYPES = ("INT", "FLOAT", "STRING",)
    OUTPUT_NODE = True
    FUNCTION = "evaluate"
    CATEGORY = "Efficiency Nodes/Simple Eval"

    def evaluate(self, python_expression, print_to_console, a=0, b=0, c=0):
        """Evaluate ``python_expression`` with simple_eval and return the
        result converted to int, float and str (in that order)."""
        # The raw simple_eval result is converted three ways below
        result = simple_eval(python_expression, names={'a': a, 'b': b, 'c': c})
        outputs = (int(result), float(result), str(result),)

        if print_to_console != "True":
            return outputs

        int_result, float_result, string_result = outputs
        summary = "".join((
            f"{python_expression} = \033[92m INT: ",
            str(int_result),
            " , FLOAT: ",
            str(float_result),
            ", STRING: ",
            string_result,
            "\033[0m",
        ))
        print("\n\033[31mEvaluate Integers:\033[0m")
        print(f"\033[90m{{a = {a} , b = {b} , c = {c}}} \033[0m")
        print(summary)
        return outputs
|
|
|
|
# TSC Evaluate Floats (https://github.com/danthedeckie/simpleeval)
|
|
class TSC_EvaluateFloats:
    """Node that evaluates an expression over the float inputs a, b and c."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "python_expression": ("STRING", {"default": "((a + b) - c) / 2", "multiline": False}),
            "print_to_console": (["False", "True"],),
        }
        optional = {
            "a": ("FLOAT", {"default": 0, "min": -sys.float_info.max, "max": sys.float_info.max, "step": 1}),
            "b": ("FLOAT", {"default": 0, "min": -sys.float_info.max, "max": sys.float_info.max, "step": 1}),
            "c": ("FLOAT", {"default": 0, "min": -sys.float_info.max, "max": sys.float_info.max, "step": 1}),
        }
        return {"required": required, "optional": optional}

    RETURN_TYPES = ("INT", "FLOAT", "STRING",)
    OUTPUT_NODE = True
    FUNCTION = "evaluate"
    CATEGORY = "Efficiency Nodes/Simple Eval"

    def evaluate(self, python_expression, print_to_console, a=0, b=0, c=0):
        """Evaluate ``python_expression`` with simple_eval and return the
        result converted to int, float and str (in that order)."""
        # The raw simple_eval result is converted three ways below
        result = simple_eval(python_expression, names={'a': a, 'b': b, 'c': c})
        outputs = (int(result), float(result), str(result),)

        if print_to_console != "True":
            return outputs

        int_result, float_result, string_result = outputs
        summary = "".join((
            f"{python_expression} = \033[92m INT: ",
            str(int_result),
            " , FLOAT: ",
            str(float_result),
            ", STRING: ",
            string_result,
            "\033[0m",
        ))
        print("\n\033[31mEvaluate Floats:\033[0m")
        print(f"\033[90m{{a = {a} , b = {b} , c = {c}}} \033[0m")
        print(summary)
        return outputs
|
|
|
|
# TSC Evaluate Strings (https://github.com/danthedeckie/simpleeval)
|
|
class TSC_EvaluateStrs:
    """Node that evaluates an expression over the string inputs a, b and c."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "python_expression": ("STRING", {"default": "a + b + c", "multiline": False}),
            "print_to_console": (["False", "True"],),
        }
        optional = {
            "a": ("STRING", {"default": "Hello", "multiline": False}),
            "b": ("STRING", {"default": " World", "multiline": False}),
            "c": ("STRING", {"default": "!", "multiline": False}),
        }
        return {"required": required, "optional": optional}

    RETURN_TYPES = ("STRING",)
    OUTPUT_NODE = True
    FUNCTION = "evaluate"
    CATEGORY = "Efficiency Nodes/Simple Eval"

    def evaluate(self, python_expression, print_to_console, a="", b="", c=""):
        """Evaluate ``python_expression`` with simple_eval (names a, b, c and
        the ``len`` function available) and return the result as a string."""
        result = simple_eval(
            python_expression,
            names={'a': a, 'b': b, 'c': c},   # variables visible to the expression
            functions={"len": len},           # functions visible to the expression
        )

        wants_console_output = print_to_console == "True"
        if wants_console_output:
            print("\n\033[31mEvaluate Strings:\033[0m")
            print(f"\033[90ma = {a} \nb = {b} \nc = {c}\033[0m")
            print(f"{python_expression} = \033[92m" + str(result) + "\033[0m")

        # The node output is always the string form of the result
        return (str(result),)
|
|
|
|
# TSC Simple Eval Examples (https://github.com/danthedeckie/simpleeval)
|
|
class TSC_EvalExamples:
    """Node exposing the Simple Eval example text as a multiline string widget."""

    # The example text is read once, while the class body executes
    filepath = os.path.join(my_dir, 'workflows', 'SimpleEval_Node_Examples.txt')
    with open(filepath, 'r') as file:
        examples = file.read()

    @classmethod
    def INPUT_TYPES(cls):
        # The loaded example text is the widget's default value
        text_widget = ("STRING", {"default": cls.examples, "multiline": True})
        return {"required": {"models_text": text_widget}}

    RETURN_TYPES = ()
    CATEGORY = "Efficiency Nodes/Simple Eval"
|
|
|
|
# NODE MAPPING
|
|
# Maps each node's display name (key) to the class implementing it (value).
NODE_CLASS_MAPPINGS = {
    # Sampler, loader and XY Plot nodes
    "KSampler (Efficient)": TSC_KSampler,
    "Efficient Loader": TSC_EfficientLoader,
    "XY Plot": TSC_XYplot,
    # XY Plot input nodes
    "XY Input: Seeds++ Batch": TSC_XYplot_SeedsBatch,
    "XY Input: Steps": TSC_XYplot_Steps,
    "XY Input: CFG Scale": TSC_XYplot_CFG,
    "XY Input: Sampler": TSC_XYplot_Sampler,
    "XY Input: Scheduler": TSC_XYplot_Scheduler,
    "XY Input: Denoise": TSC_XYplot_Denoise,
    "XY Input: VAE": TSC_XYplot_VAE,
    "XY Input: Positive Prompt S/R": TSC_XYplot_PromptSR_Positive,
    "XY Input: Negative Prompt S/R": TSC_XYplot_PromptSR_Negative,
    "XY Input: Checkpoint": TSC_XYplot_Checkpoint,
    "XY Input: Clip Skip": TSC_XYplot_ClipSkip,
    "XY Input: LoRA": TSC_XYplot_LoRA,
    "XY Input: LoRA (Advanced)": TSC_XYplot_LoRA_Adv,
    "XY Input: Manual XY Entry": TSC_XYplot_Manual_XY_Entry,
    "Manual XY Entry Info": TSC_XYplot_Manual_XY_Entry_Info,
    "Join XY Inputs of Same Type": TSC_XYplot_JoinInputs,
    # Image nodes
    "Image Overlay": TSC_ImageOverlay,
    # Simple Eval nodes
    "Evaluate Integers": TSC_EvaluateInts,
    "Evaluate Floats": TSC_EvaluateFloats,
    "Evaluate Strings": TSC_EvaluateStrs,
    "Simple Eval Examples": TSC_EvalExamples
}