From 08c1edba9170b346ef987c08d0d5bb6ed0e98855 Mon Sep 17 00:00:00 2001 From: TSC <112517630+LucianoCirino@users.noreply.github.com> Date: Mon, 17 Apr 2023 22:25:57 -0500 Subject: [PATCH] ____Node Changes____ XY Plot: - A new node that connects to the KSampler Efficient through a "script" type connection. - Allows user to define a 2D grid of variable parameters. - The currently supported XY parameters to plot are: 1. Incremental Seeds Batch (Seeds++ Batch) 2. Latent Batch 3. Steps 4. CFG Scale 5. Sampler, Scheduler 6. Denoise 7. VAE Ksampler (Efficient): - Upgraded the custom KSampler to handle XY Plot script inputs. - Updated Efficient Loader: - Restructured the guts of the loader for future flexibility. ... Rest of the nodes are unchanged. --- efficiency_nodes.py | 916 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 797 insertions(+), 119 deletions(-) diff --git a/efficiency_nodes.py b/efficiency_nodes.py index d1e2430..d2c04ed 100644 --- a/efficiency_nodes.py +++ b/efficiency_nodes.py @@ -4,7 +4,7 @@ from comfy.sd import ModelPatcher, CLIP, VAE from nodes import common_ksampler from torch import Tensor -from PIL import Image, ImageOps +from PIL import Image, ImageOps, ImageDraw, ImageFont from PIL.PngImagePlugin import PngInfo import numpy as np import torch @@ -47,6 +47,62 @@ loaded_objects = { "bvae": [], # (ckpt_name, location) "vae": [] # (vae_name, location) } + +def load_checkpoint(ckpt_name,output_vae=True, output_clip=True): + """ + Searches for tuple index that contains ckpt_name in "ckpt" array of loaded_objects. + If found, extracts the model, clip, and vae from the loaded_objects. + If not found, loads the checkpoint, extracts the model, clip, and vae, and adds them to the loaded_objects. + Returns the model, clip, and vae. + """ + global loaded_objects + + # Search for tuple index that contains ckpt_name in "ckpt" array of loaded_objects + checkpoint_found = False + for i, entry in enumerate(loaded_objects["ckpt"]): + if entry[0] == ckpt_name: + # Extract the second element of the tuple at 'i' in the "ckpt", "clip", "bvae" arrays + model = loaded_objects["ckpt"][i][1] + clip = loaded_objects["clip"][i][1] + vae = loaded_objects["bvae"][i][1] + checkpoint_found = True + break + + # If not found, load ckpt + if checkpoint_found == False: + # Load Checkpoint + ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name) + out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, + embedding_directory=folder_paths.get_folder_paths("embeddings")) + model = out[0] + clip = out[1] + vae = out[2] + + # Update loaded_objects[] array + loaded_objects["ckpt"].append((ckpt_name, out[0])) + loaded_objects["clip"].append((ckpt_name, out[1])) + loaded_objects["bvae"].append((ckpt_name, out[2])) + + return model, clip, vae + +def load_vae(vae_name): + """ + Extracts the vae with a given name from the "vae" array in loaded_objects. + If the vae is not found, creates a new VAE object with the given name and adds it to the "vae" array. + """ + global loaded_objects + + # Check if vae_name exists in "vae" array + if any(entry[0] == vae_name for entry in loaded_objects["vae"]): + # Extract the second tuple entry of the checkpoint + vae = [entry[1] for entry in loaded_objects["vae"] if entry[0] == vae_name][0] + else: + vae_path = folder_paths.get_full_path("vae", vae_name) + vae = comfy.sd.VAE(ckpt_path=vae_path) + # Update loaded_objects[] array + loaded_objects["vae"].append((vae_name, vae)) + return vae + class TSC_EfficientLoader: @classmethod @@ -70,53 +126,22 @@ class TSC_EfficientLoader: def efficientloader(self, ckpt_name, vae_name, clip_skip, positive, negative, empty_latent_width, empty_latent_height, batch_size, output_vae=False, output_clip=True): - # Baked VAE setup - if vae_name == "Baked VAE": - output_vae = True - model: ModelPatcher | None = None clip: CLIP | None = None vae: VAE | None = None - # Search for tuple index that contains ckpt_name in "ckpt" array of loaded_lbjects - checkpoint_found = False - for i, entry in enumerate(loaded_objects["ckpt"]): - if entry[0] == ckpt_name: - # Extract the second element of the tuple at 'i' in the "ckpt", "clip", "bvae" arrays - model = loaded_objects["ckpt"][i][1] - clip = loaded_objects["clip"][i][1] - vae = loaded_objects["bvae"][i][1] - checkpoint_found = True - break - - # If not found, load ckpt - if checkpoint_found == False: - # Load Checkpoint - ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name) - out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) - model = out[0] - clip = out[1] - vae = out[2] - - # Update loaded_objects[] array - loaded_objects["ckpt"].append((ckpt_name, out[0])) - loaded_objects["clip"].append((ckpt_name, out[1])) - loaded_objects["bvae"].append((ckpt_name, out[2])) - # Create Empty Latent latent = torch.zeros([batch_size, 4, empty_latent_height // 8, empty_latent_width // 8]).cpu() + # Check for "Baked VAE" selected + if vae_name == "Baked VAE": + output_vae = True + + model, clip, vae = load_checkpoint(ckpt_name,output_vae) + # Check for custom VAE if vae_name != "Baked VAE": - # Check if vae_name exists in "vae" array - if any(entry[0] == vae_name for entry in loaded_objects["vae"]): - # Extract the second tuple entry of the checkpoint - vae = [entry[1] for entry in loaded_objects["vae"] if entry[0] == vae_name][0] - else: - vae_path = folder_paths.get_full_path("vae", vae_name) - vae = comfy.sd.VAE(ckpt_path=vae_path) - # Update loaded_objects[] array - loaded_objects["vae"].append((vae_name, vae)) + vae = load_vae(vae_name) # CLIP skip if not clip: @@ -126,14 +151,16 @@ class TSC_EfficientLoader: return (model, [[clip.encode(positive), {}]], [[clip.encode(negative), {}]], {"samples":latent}, vae, clip, ) - # TSC KSampler (Efficient) last_helds: dict[str, list] = { "results": [None for _ in range(15)], "latent": [None for _ in range(15)], - "images": [None for _ in range(15)] + "images": [None for _ in range(15)], + "vae_decode": [False for _ in range(15)] } class TSC_KSampler: + + empty_image = pil2tensor(Image.new('RGBA', (1, 1), (0, 0, 0, 0))) def __init__(self): self.output_dir = os.path.join(comfy_dir, 'temp') @@ -142,7 +169,7 @@ class TSC_KSampler: @classmethod def INPUT_TYPES(cls): return {"required": - {"sampler_state": (["Sample", "Hold"], ), + {"sampler_state": (["Sample", "Hold", "Script"], ), "my_unique_id": ("INT", {"default": 0, "min": 0, "max": 15}), "model": ("MODEL",), "seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), @@ -156,7 +183,8 @@ class TSC_KSampler: "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}), "preview_image": (["Disabled", "Enabled"],), }, - "optional": { "optional_vae": ("VAE",), }, + "optional": { "optional_vae": ("VAE",), #change to vae + "script": ("SCRIPT",),}, "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, } @@ -167,19 +195,67 @@ class TSC_KSampler: CATEGORY = "Efficiency Nodes/Sampling" def sample(self, sampler_state, my_unique_id, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, - latent_image, preview_image, denoise=1.0, prompt=None, extra_pnginfo=None, optional_vae=(None,)): + latent_image, preview_image, denoise=1.0, prompt=None, extra_pnginfo=None, optional_vae=(None,), script=None): - empty_image = pil2tensor(Image.new('RGBA', (1, 1), (0, 0, 0, 0))) + # Functions for previewing images in Ksampler + def map_filename(filename): + prefix_len = len(os.path.basename(filename_prefix)) + prefix = filename[:prefix_len + 1] + try: + digits = int(filename[prefix_len + 1:].split('_')[0]) + except: + digits = 0 + return (digits, prefix) + + def compute_vars(input): + input = input.replace("%width%", str(images[0].shape[1])) + input = input.replace("%height%", str(images[0].shape[0])) + return input + + def preview_images(images, filename_prefix): + filename_prefix = compute_vars(filename_prefix) + + subfolder = os.path.dirname(os.path.normpath(filename_prefix)) + filename = os.path.basename(os.path.normpath(filename_prefix)) + + full_output_folder = os.path.join(self.output_dir, subfolder) + + try: + counter = max(filter(lambda a: a[1][:-1] == filename and a[1][-1] == "_", + map(map_filename, os.listdir(full_output_folder))))[0] + 1 + except ValueError: + counter = 1 + except FileNotFoundError: + os.makedirs(full_output_folder, exist_ok=True) + counter = 1 + + if not os.path.exists(self.output_dir): + os.makedirs(self.output_dir) + + results = list() + for image in images: + i = 255. * image.cpu().numpy() + img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8)) + metadata = PngInfo() + if prompt is not None: + metadata.add_text("prompt", json.dumps(prompt)) + if extra_pnginfo is not None: + for x in extra_pnginfo: + metadata.add_text(x, json.dumps(extra_pnginfo[x])) + file = f"{filename}_{counter:05}_.png" + img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=4) + results.append({ + "filename": file, + "subfolder": subfolder, + "type": self.type + }); + counter += 1 + return results + + # Vae input check vae = optional_vae - - # Preview check - preview = True - if vae == (None,) or preview_image == "Disabled": - preview = False - last_helds["results"][my_unique_id] = None - last_helds["images"][my_unique_id] = None - if vae == (None,): - print('\033[32mKSampler(Efficient)[{}]:\033[0m No vae input detected, preview image disabled'.format(my_unique_id)) + if vae == (None,): + print('\033[32mKSampler(Efficient)[{}] Warning:\033[0m No vae input detected, preview image disabled'.format(my_unique_id)) # Init last_results if last_helds["results"][my_unique_id] == None: @@ -196,90 +272,691 @@ class TSC_KSampler: # Init last_images if last_helds["images"][my_unique_id] == None: - last_images = empty_image + last_images = TSC_KSampler.empty_image else: last_images = last_helds["images"][my_unique_id] - + + # Initialize latent latent: Tensor|None = None - if sampler_state == "Sample": - samples = common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise) - latent = samples[0]["samples"] - last_helds["latent"][my_unique_id] = latent - if preview == False: - return {"ui": {"images": list()}, "result": (model, positive, negative, {"samples": latent}, vae, empty_image,)} + # Define filename_prefix + filename_prefix = "KSeff_{:02d}".format(my_unique_id) - # Adjust for KSampler states + # Check the current sampler state + if sampler_state == "Sample": + + # Sample using the common KSampler function and store the samples + samples = common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, + latent_image, denoise=denoise) + + # Extract the latent samples from the returned samples dictionary + latent = samples[0]["samples"] + + # Store the latent samples in the 'last_helds' dictionary with a unique ID + last_helds["latent"][my_unique_id] = latent + + # If not in preview mode, return the results in the specified format + if preview_image == "Disabled": + # Enable vae decode on next Hold + last_helds["vae_decode"][my_unique_id] = True + return {"ui": {"images": list()}, + "result": (model, positive, negative, {"samples": latent}, vae, TSC_KSampler.empty_image,)} + else: + # Decode images and store + images = vae.decode(latent).cpu() + last_helds["images"][my_unique_id] = images + + # Disable vae decode on next Hold + last_helds["vae_decode"][my_unique_id] = False + + # Generate image results and store + results = preview_images(images, filename_prefix) + last_helds["results"][my_unique_id] = results + + # Output image results to ui and node outputs + return {"ui": {"images": results}, + "result": (model, positive, negative, {"samples": latent}, vae, images,)} + + # If the sampler state is "Hold" elif sampler_state == "Hold": - print('\033[32mKSampler(Efficient)[{}] outputs on hold\033[0m'.format(my_unique_id)) - if preview == False: - return {"ui": {"images": last_results}, "result": (model, positive, negative, last_latent, vae, last_images,)} + # Print a message indicating that the KSampler is in "Hold" state with the unique ID + print('\033[32mKSampler(Efficient)[{}]:\033[0mHeld'.format(my_unique_id)) + + # If not in preview mode, return the results in the specified format + if preview_image == "Disabled": + return {"ui": {"images": list()}, + "result": (model, positive, negative, last_latent, vae, TSC_KSampler.empty_image,)} + + # if preview_image == "Enabled": else: latent = last_latent["samples"] - images = vae.decode(latent).cpu() - last_helds["images"][my_unique_id] = images + if last_helds["vae_decode"][my_unique_id] == True: - filename_prefix = "TSC_KS_{:02d}".format(my_unique_id) + # Decode images and store + images = vae.decode(latent).cpu() + last_helds["images"][my_unique_id] = images - def map_filename(filename): - prefix_len = len(os.path.basename(filename_prefix)) - prefix = filename[:prefix_len + 1] - try: - digits = int(filename[prefix_len + 1:].split('_')[0]) - except: - digits = 0 - return (digits, prefix) + # Disable vae decode on next Hold + last_helds["vae_decode"][my_unique_id] = False - def compute_vars(input): - input = input.replace("%width%", str(images[0].shape[1])) - input = input.replace("%height%", str(images[0].shape[0])) - return input + # Generate image results and store + results = preview_images(images, filename_prefix) + last_helds["results"][my_unique_id] = results - filename_prefix = compute_vars(filename_prefix) + else: + images = last_images + results = last_results - subfolder = os.path.dirname(os.path.normpath(filename_prefix)) - filename = os.path.basename(os.path.normpath(filename_prefix)) + # Output image results to ui and node outputs + return {"ui": {"images": results}, + "result": (model, positive, negative, {"samples": latent}, vae, images,)} - full_output_folder = os.path.join(self.output_dir, subfolder) + elif sampler_state == "Script": + # If not in preview mode, return the results in the specified format + if preview_image == "Disabled": + print('\033[31mKSampler(Efficient)[{}] Error:\033[0m Preview must be enabled to use Script mode.'.format(my_unique_id)) + return {"ui": {"images": list()}, + "result": (model, positive, negative, last_latent, vae, TSC_KSampler.empty_image,)} - try: - counter = max(filter(lambda a: a[1][:-1] == filename and a[1][-1] == "_", - map(map_filename, os.listdir(full_output_folder))))[0] + 1 - except ValueError: - counter = 1 - except FileNotFoundError: - os.makedirs(full_output_folder, exist_ok=True) - counter = 1 + # If no script input connected, set X_type and Y_type to "Nothing" + if script is None: + X_type = "Nothing" + Y_type = "Nothing" + else: + # Unpack script Tuple (X_type, X_value, Y_type, Y_value, grid_spacing, latent_id) + X_type, X_value, Y_type, Y_value, grid_spacing, latent_id = script - if not os.path.exists(self.output_dir): - os.makedirs(self.output_dir) + if (X_type == "Nothing" and Y_type == "Nothing"): + print('\033[31mKSampler(Efficient)[{}] Error:\033[0m No valid script input detected'.format(my_unique_id)) + return {"ui": {"images": list()}, + "result": (model, positive, negative, last_latent, vae, TSC_KSampler.empty_image,)} - results = list() - for image in images: - i = 255. * image.cpu().numpy() - img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8)) - metadata = PngInfo() - if prompt is not None: - metadata.add_text("prompt", json.dumps(prompt)) - if extra_pnginfo is not None: - for x in extra_pnginfo: - metadata.add_text(x, json.dumps(extra_pnginfo[x])) - file = f"{filename}_{counter:05}_.png" - img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=4) - results.append({ - "filename": file, - "subfolder": subfolder, - "type": self.type - }); - counter += 1 - last_helds["results"][my_unique_id] = results + # Extract the 'samples' tensor from the dictionary + latent_image_tensor = latent_image['samples'] - #if sampler_state == "Sample": - # Output results to ui and node outputs - return {"ui": {"images": results}, "result": (model, positive, negative, {"samples":latent}, vae, images, )} - #if sampler_state == "Hold": - # return {"ui": {"images": last_results}, "result": (model, positive, negative, last_latent, vae, last_images,)} + # Split the tensor into individual image tensors + image_tensors = torch.split(latent_image_tensor, 1, dim=0) + + # Create a list of dictionaries containing the individual image tensors + latent_list = [{'samples': image} for image in image_tensors] + + # Set latent only to the first latent of batch + if latent_id >= len(latent_list): + print( + f'\033[31mKSampler(Efficient)[{my_unique_id}] Warning:\033[0m ' + f'The selected latent_id ({latent_id}) is out of range.\n' + f'Automatically setting the latent_id to the last image in the list (index: {len(latent_list) - 1}).') + latent_id = len(latent_list) - 1 + + latent_image = latent_list[latent_id] + + # Define X/Y_values for "Seeds++ Batch" + if X_type == "Seeds++ Batch": + X_value = [latent_image for _ in range(X_value[0])] + if Y_type == "Seeds++ Batch": + Y_value = [latent_image for _ in range(Y_value[0])] + + # Define X/Y_values for "Latent Batch" + if X_type == "Latent Batch": + X_value = latent_list + if Y_type == "Latent Batch": + Y_value = latent_list + + def define_variable(var_type, var, seed, steps, cfg,sampler_name, scheduler, latent_image, denoise, + vae_name, var_label, num_label): + + # If var_type is "Seeds++ Batch", update var and seed, and generate labels + if var_type == "Latent Batch": + latent_image = var + text = f"{len(var_label)}" + # If var_type is "Seeds++ Batch", update var and seed, and generate labels + elif var_type == "Seeds++ Batch": + text = f"seed: {seed}" + # If var_type is "Steps", update steps and generate labels + elif var_type == "Steps": + steps = var + text = f"Steps: {steps}" + # If var_type is "CFG Scale", update cfg and generate labels + elif var_type == "CFG Scale": + cfg = var + text = f"CFG Scale: {cfg}" + # If var_type is "Sampler", update sampler_name, scheduler, and generate labels + elif var_type == "Sampler": + sampler_name = var[0] + if var[1] != None: + scheduler[0] = var[1] + else: + scheduler[0] = scheduler[1] + text = f"{sampler_name} ({scheduler[0]})" + text = text.replace("ancestral", "a").replace("uniform", "u") + # If var_type is "Denoise", update denoise and generate labels + elif var_type == "Denoise": + denoise = var + text = f"Denoise: {denoise}" + # For any other var_type, set text to "?" + elif var_type == "VAE": + vae_name = var + text = f"VAE: {vae_name}" + # For any other var_type, set text to "" + else: + text = "" + + def truncate_texts(texts, num_label): + min_length = min([len(text) for text in texts]) + truncate_length = min(min_length, 24) + + if truncate_length < 16: + truncate_length = 16 + + truncated_texts = [] + for text in texts: + if len(text) > truncate_length: + text = text[:truncate_length] + "..." + truncated_texts.append(text) + + return truncated_texts + + # Add the generated text to var_label if it's not full + if len(var_label) < num_label: + var_label.append(text) + + # If var_type VAE , truncate entries in the var_label list when it's full + if len(var_label) == num_label and var_type == "VAE": + var_label = truncate_texts(var_label, num_label) + + # Return the modified variables + return steps, cfg,sampler_name, scheduler, latent_image, denoise, vae_name, var_label + + # Define a helper function to help process X and Y values + def process_values(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise, + vae,vae_name, latent_new=[], max_width=0, max_height=0, image_list=[], size_list=[]): + + # Sample + samples = common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, + latent_image, denoise=denoise) + + # Decode images and store + latent = samples[0]["samples"] + + # Add the latent tensor to the tensors list + latent_new.append(latent) + + # Load custom vae if available + if vae_name is not None: + vae = load_vae(vae_name) + + # Decode the image + image = vae.decode(latent).cpu() + + # Convert the image from tensor to PIL Image and add it to the list + pil_image = tensor2pil(image) + image_list.append(pil_image) + size_list.append(pil_image.size) + + # Update max dimensions + max_width = max(max_width, pil_image.width) + max_height = max(max_height, pil_image.height) + + # Return the touched variables + return image_list, size_list, max_width, max_height, latent_new + + # Initiate Plot label text variables X/Y_label + X_label = [] + Y_label = [] + + # Seed_updated for "Seeds++ Batch" incremental seeds + seed_updated = seed + + # Store the KSamplers original scheduler inside the same scheduler variable + scheduler = [scheduler, scheduler] + + # By default set vae_name to None + vae_name = None + + # Fill Plot Rows (X) + for X_index, X in enumerate(X_value): + # Seed control based on loop index during Batch + if X_type == "Seeds++ Batch": + # Update seed based on the inner loop index + seed_updated = seed + X_index + + # Define X parameters and generate labels + steps, cfg, sampler_name, scheduler, latent_image, denoise, vae_name, X_label = \ + define_variable(X_type, X, seed_updated, steps, cfg, sampler_name, scheduler, latent_image, + denoise, vae_name, X_label, len(X_value)) + + if Y_type != "Nothing": + # Seed control based on loop index during Batch + for Y_index, Y in enumerate(Y_value): + if Y_type == "Seeds++ Batch": + # Update seed based on the inner loop index + seed_updated = seed + Y_index + + # Define Y parameters and generate labels + steps, cfg, sampler_name, scheduler, latent_image, denoise, vae_name, Y_label = \ + define_variable(Y_type, Y, seed_updated, steps, cfg, sampler_name, scheduler, latent_image, + denoise, vae_name, Y_label, len(Y_value)) + + # Generate images + image_list, size_list, max_width, max_height, latent_new = \ + process_values(model, seed_updated, steps, cfg, sampler_name, scheduler[0], + positive, negative, latent_image, denoise, vae, vae_name) + else: + # Generate images + image_list, size_list, max_width, max_height, latent_new = \ + process_values(model, seed_updated, steps, cfg, sampler_name, scheduler[0], + positive, negative, latent_image, denoise, vae, vae_name) + + + def adjusted_font_size(text, initial_font_size, max_width): + font = ImageFont.truetype('arial.ttf', initial_font_size) + text_width, _ = font.getsize(text) + + if text_width > (max_width * 0.9): + scaling_factor = 0.9 # A value less than 1 to shrink the font size more aggressively + new_font_size = int(initial_font_size * (max_width / text_width) * scaling_factor) + else: + new_font_size = initial_font_size + + return new_font_size + + # Disable vae decode on next Hold + last_helds["vae_decode"][my_unique_id] = False + + # Extract plot dimensions + num_rows = max(len(Y_value) if Y_value is not None else 0, 1) + num_cols = max(len(X_value) if X_value is not None else 0, 1) + + def rearrange_tensors(latent, num_cols, num_rows): + new_latent = [] + for i in range(num_rows): + for j in range(num_cols): + index = j * num_rows + i + new_latent.append(latent[index]) + return new_latent + + # Rearrange latent array to match preview image grid + latent_new = rearrange_tensors(latent_new, num_cols, num_rows) + + # Concatenate the tensors along the first dimension (dim=0) + latent_new = torch.cat(latent_new, dim=0) + + # Store latent_new as last latent + last_helds["latent"][my_unique_id] = latent_new + + # Calculate the dimensions of the white background image + border_size = max_width // 15 + + # Modify the background width and x_offset initialization based on Y_type + if Y_type == "Nothing": + bg_width = num_cols * max_width + (num_cols - 1) * grid_spacing + x_offset_initial = 0 + else: + bg_width = num_cols * max_width + (num_cols - 1) * grid_spacing + 3 * border_size + x_offset_initial = border_size * 3 + + # Modify the background height based on X_type + if X_type == "Nothing": + bg_height = num_rows * max_height + (num_rows - 1) * grid_spacing + y_offset = 0 + else: + bg_height = num_rows * max_height + (num_rows - 1) * grid_spacing + 2.3 * border_size + y_offset = border_size * 3 + + # Create the white background image + background = Image.new('RGBA', (int(bg_width), int(bg_height)), color=(255, 255, 255, 255)) + + for row in range(num_rows): + + # Initialize the X_offset + x_offset = x_offset_initial + + for col in range(num_cols): + # Calculate the index for image_list + index = col * num_rows + row + img = image_list[index] + + # Paste the image + background.paste(img, (x_offset, y_offset)) + + if row == 0 and X_type != "Nothing": + # Assign text + text = X_label[col] + + # Add the corresponding X_value as a label above the image + initial_font_size = int(48 * img.width / 512) + font_size = adjusted_font_size(text, initial_font_size, img.width) + label_height = int(font_size*1.5) + + # Create a white background label image + label_bg = Image.new('RGBA', (img.width, label_height), color=(255, 255, 255, 0)) + d = ImageDraw.Draw(label_bg) + + # Create the font object + font = ImageFont.truetype('arial.ttf', font_size) + + # Calculate the text size and the starting position + text_width, text_height = d.textsize(text, font=font) + text_x = (img.width - text_width) // 2 + text_y = (label_height - text_height) // 2 + + # Add the text to the label image + d.text((text_x, text_y), text, fill='black', font=font) + + # Calculate the available space between the top of the background and the top of the image + available_space = y_offset - label_height + + # Calculate the new Y position for the label image + label_y = available_space // 2 + + # Paste the label image above the image on the background using alpha_composite() + background.alpha_composite(label_bg, (x_offset, label_y)) + + if col == 0 and Y_type != "Nothing": + # Assign text + text = Y_label[row] + + # Add the corresponding Y_value as a label to the left of the image + initial_font_size = int(48 * img.height / 512) + font_size = adjusted_font_size(text, initial_font_size, img.height) + + # Create a white background label image + label_bg = Image.new('RGBA', (img.height, font_size), color=(255, 255, 255, 0)) + d = ImageDraw.Draw(label_bg) + + # Create the font object + font = ImageFont.truetype('arial.ttf', font_size) + + # Calculate the text size and the starting position + text_width, text_height = d.textsize(text, font=font) + text_x = (img.height - text_width) // 2 + text_y = (font_size - text_height) // 2 + + # Add the text to the label image + d.text((text_x, text_y), text, fill='black', font=font) + + # Rotate the label_bg 90 degrees counter-clockwise + if Y_type != "Latent Batch": + label_bg = label_bg.rotate(90, expand=True) + + # Calculate the available space between the left of the background and the left of the image + available_space = x_offset - label_bg.width + + # Calculate the new X position for the label image + label_x = available_space // 2 + + # Calculate the Y position for the label image + label_y = y_offset + (img.height - label_bg.height) // 2 + + # Paste the label image to the left of the image on the background using alpha_composite() + background.alpha_composite(label_bg, (label_x, label_y)) + + # Update the x_offset + x_offset += img.width + grid_spacing + + # Update the y_offset + y_offset += img.height + grid_spacing + + images = pil2tensor(background) + last_helds["images"][my_unique_id] = images + + # Generate image results and store + results = preview_images(images, filename_prefix) + last_helds["results"][my_unique_id] = results + + # Output image results to ui and node outputs + return {"ui": {"images": results}, "result": (model, positive, negative, {"samples": latent_new}, vae, images,)} + + +# TSC XY Plot +class TSC_XYplot: + examples = "(X/Y_types) (X/Y_values)\n" \ + "Latent Batch n/a\n" \ + "Seeds++ Batch 3\n" \ + "Steps 15;20;25\n" \ + "CFG Scale 5;10;15;20\n" \ + "Sampler(1) dpmpp_2s_ancestral;euler;ddim\n" \ + "Sampler(2) dpmpp_2m,karras;heun,normal\n" \ + "Denoise .3;.4;.5;.6;.7\n" \ + "VAE vae_1; vae_2; vae_3" + + samplers = ";\n".join(comfy.samplers.KSampler.SAMPLERS) + schedulers = ";\n".join(comfy.samplers.KSampler.SCHEDULERS) + vaes = ";\n".join(folder_paths.get_filename_list("vae")) + notes = "- During a 'Latent Batch', the corresponding X/Y_value is ignored.\n" \ + "- During a 'Latent Batch', the latent_id is ignored.\n" \ + "- For a 'Seeds++ Batch', starting seed is defined by the KSampler.\n" \ + "- Trailing semicolons are ignored in the X/Y_values.\n" \ + "- Parameter types not set by this node are defined in the KSampler." + + @classmethod + def INPUT_TYPES(cls): + return {"required": { + "X_type": (["Nothing", "Latent Batch", "Seeds++ Batch", + "Steps", "CFG Scale", "Sampler", "Denoise", "VAE"],), + "X_value": ("STRING", {"default": "", "multiline": False}), + "Y_type": (["Nothing", "Latent Batch", "Seeds++ Batch", + "Steps", "CFG Scale", "Sampler", "Denoise", "VAE"],), + "Y_value": ("STRING", {"default": "", "multiline": False}), + "grid_spacing": ("INT", {"default": 0, "min": 0, "max": 500, "step": 5}), + "XY_flip": (["False","True"],), + "latent_id": ("INT", {"default": 0, "min": 0, "max": 100}), + "help": ("STRING", {"default": + f"____________EXAMPLES____________\n{cls.examples}\n\n" + f"____________SAMPLERS____________\n{cls.samplers}\n\n" + f"___________SCHEDULERS___________\n{cls.schedulers}\n\n" + f"______________VAE_______________\n{cls.vaes}\n\n" + f"_____________NOTES______________\n{cls.notes}", + "multiline": True}),}, + } + RETURN_TYPES = ("SCRIPT",) + RETURN_NAMES = ("script",) + FUNCTION = "XYplot" + CATEGORY = "Efficiency Nodes/Scripts" + + def XYplot(self, X_type, X_value, Y_type, Y_value, grid_spacing, XY_flip, latent_id, help): + + # Store values as arrays + X_value = X_value.replace(" ", "").replace("\n", "") # Remove spaces and newline characters + X_value = X_value.rstrip(";") # Remove trailing semicolon + X_value = X_value.split(";") # Turn to array + + Y_value = Y_value.replace(" ", "").replace("\n", "") # Remove spaces and newline characters + Y_value = Y_value.rstrip(";") # Remove trailing semicolon + Y_value = Y_value.split(";") # Turn to array + + # Define the valid bounds for each type + bounds = { + "Seeds++ Batch": {"min": 0, "max": 50}, + "Steps": {"min": 0}, + "CFG Scale": {"min": 0, "max": 100}, + "Sampler": {"options": comfy.samplers.KSampler.SAMPLERS}, + "Scheduler": {"options": comfy.samplers.KSampler.SCHEDULERS}, + "Denoise": {"min": 0, "max": 1}, + "VAE": {"options": folder_paths.get_filename_list("vae")} + } + + def validate_value(value, value_type, bounds): + """ + Validates a value based on its corresponding value_type and bounds. + + Parameters: + value (str or int or float): The value to validate. + value_type (str): The type of the value, which determines the valid bounds. + bounds (dict): A dictionary that contains the valid bounds for each value_type. + + Returns: + The validated value. + None if no validation was done or failed. + """ + + if value_type == "Seeds++ Batch": + try: + x = float(value) + except ValueError: + print( + f"\033[31mXY Plot Error:\033[0m '{value}' is not a valid batch count.") + return None + + if not x.is_integer(): + print( + f"\033[31mXY Plot Error:\033[0m '{value}' is not a valid batch count.") + return None + else: + x = int(x) + + if x < bounds["Seeds++ Batch"]["min"]: + x = bounds["Seeds++ Batch"]["min"] + elif x > bounds["Seeds++ Batch"]["max"]: + x = bounds["Seeds++ Batch"]["max"] + + return x + elif value_type == "Steps": + try: + x = int(value) + if x < bounds["Steps"]["min"]: + x = bounds["Steps"]["min"] + return x + except ValueError: + print( + f"\033[31mXY Plot Error:\033[0m '{value}' is not a valid Step count.") + return None + elif value_type == "CFG Scale": + try: + x = float(value) + if x < bounds["CFG Scale"]["min"]: + x = bounds["CFG Scale"]["min"] + elif x > bounds["CFG Scale"]["max"]: + x = bounds["CFG Scale"]["max"] + return x + except ValueError: + print( + f"\033[31mXY Plot Error:\033[0m '{value}' is not a number between {bounds['CFG Scale']['min']}" + f" and {bounds['CFG Scale']['max']} for CFG Scale.") + return None + elif value_type == "Sampler": + if isinstance(value, str) and ',' in value: + value = tuple(map(str.strip, value.split(','))) + + if isinstance(value, tuple): + if len(value) == 2: + sampler, scheduler = value + scheduler = scheduler.lower() # Convert the scheduler name to lowercase + if sampler not in bounds["Sampler"]["options"]: + valid_samplers = '\n'.join(bounds["Sampler"]["options"]) + print( + f"\033[31mXY Plot Error:\033[0m '{sampler}' is not a valid sampler. Valid samplers are:\n{valid_samplers}") + sampler = None + if scheduler not in bounds["Scheduler"]["options"]: + valid_schedulers = '\n'.join(bounds["Scheduler"]["options"]) + print( + f"\033[31mXY Plot Error:\033[0m '{scheduler}' is not a valid scheduler. Valid schedulers are:\n{valid_schedulers}") + scheduler = None + if sampler is None or scheduler is None: + return None + else: + return sampler, scheduler + else: + print( + f"\033[31mXY Plot Error:\033[0m '{value}' is not a valid sampler.'") + return None + else: + if value not in bounds["Sampler"]["options"]: + valid_samplers = '\n'.join(bounds["Sampler"]["options"]) + print( + f"\033[31mXY Plot Error:\033[0m '{value}' is not a valid sampler. Valid samplers are:\n{valid_samplers}") + return None + else: + return value, None + elif value_type == "Denoise": + try: + x = float(value) + if x < bounds["Denoise"]["min"]: + x = bounds["Denoise"]["min"] + elif x > bounds["Denoise"]["max"]: + x = bounds["Denoise"]["max"] + return x + except ValueError: + print( + f"\033[31mXY Plot Error:\033[0m '{value}' is not a number between {bounds['Denoise']['min']} " + f"and {bounds['Denoise']['max']} for Denoise.") + return None + elif value_type == "VAE": + if value not in bounds["VAE"]["options"]: + valid_vaes = '\n'.join(bounds["VAE"]["options"]) + print( + f"\033[31mXY Plot Error:\033[0m '{value}' is not a valid VAE. Valid VAEs are:\n{valid_vaes}") + return None + else: + return value + else: + return None + + def reset_variables(): + X_type = "Nothing" + X_value = [None] + Y_type = "Nothing" + Y_value = [None] + latent_id = None + grid_spacing = None + return X_type, X_value, Y_type, Y_value, grid_spacing, latent_id + + if X_type == Y_type == "Nothing": + return (reset_variables(),) + + # If types are the same, error and return + if (X_type == Y_type) and (X_type != "Nothing"): + print(f"\033[31mXY Plot Error:\033[0m X_type and Y_type must be different.") + # Reset variables to default values and return + return (reset_variables(),) + + # Validate X_value array length is 1 if doing a "Seeds++ Batch" + if len(X_value) != 1 and X_type == "Seeds++ Batch": + print(f"\033[31mXY Plot Error:\033[0m '{';'.join(X_value)}' is not a valid batch count.") + return (reset_variables(),) + + # Validate Y_value array length is 1 if doing a "Seeds++ Batch" + if len(Y_value) != 1 and Y_type == "Seeds++ Batch": + print(f"\033[31mXY Plot Error:\033[0m '{';'.join(Y_value)}' is not a valid batch count.") + return (reset_variables(),) + + # Loop over each entry in X_value and check if it's valid + # Validate X_value based on X_type + if X_type != "Nothing" and X_type != "Latent Batch": + for i in range(len(X_value)): + X_value[i] = validate_value(X_value[i], X_type, bounds) + if X_value[i] == None: + # Reset variables to default values and return + return (reset_variables(),) + + # Loop over each entry in Y_value and check if it's valid + # Validate Y_value based on Y_type + if Y_type != "Nothing" and Y_type != "Latent Batch": + for i in range(len(Y_value)): + Y_value[i] = validate_value(Y_value[i], Y_type, bounds) + if Y_value[i] == None: + # Reset variables to default values and return + return (reset_variables(),) + + # Clean X/Y_values + if X_type == "Nothing" or X_type == "Latent Batch": + X_value = [None] + if Y_type == "Nothing" or Y_type == "Latent Batch": + Y_value = [None] + + # Flip X and Y + if XY_flip == "True": + X_type, Y_type = Y_type, X_type + X_value, Y_value = Y_value, X_value + + # Print the validated values + if X_type != "Nothing" and X_type != "Latent Batch": + print("\033[90m" + f"XY Plot validated values for X_type '{X_type}': {', '.join(map(str, X_value))}\033[0m") + if Y_type != "Nothing" and Y_type != "Latent Batch": + print("\033[90m" + f"XY Plot validated values for Y_type '{Y_type}': {', '.join(map(str, Y_value))}\033[0m") + + return ((X_type, X_value, Y_type, Y_value, grid_spacing, latent_id),) # TSC Image Overlay @@ -420,6 +1097,7 @@ class TSC_EvaluateStrs: NODE_CLASS_MAPPINGS = { "KSampler (Efficient)": TSC_KSampler, "Efficient Loader": TSC_EfficientLoader, + "XY Plot": TSC_XYplot, "Image Overlay": TSC_ImageOverlay, "Evaluate Integers": TSC_EvaluateInts, "Evaluate Strings": TSC_EvaluateStrs,