import os from server import PromptServer import aiohttp.web as web import time import requests from PIL import Image, ImageSequence, ImageOps import numpy as np import torch from io import BytesIO import json import threading import random import importlib import folder_paths import node_helpers import hashlib from folder_paths import get_filename_list, get_full_path, models_dir import nodes from pathlib import Path import subprocess # ====================== # SHARED UTILITY FUNCTIONS # ====================== def get_civitai_base_paths(): """Returns common paths for CivitAI integration""" custom_nodes_dir = Path(__file__).parent.parent.parent.parent civitai_base_path = custom_nodes_dir / "ComfyUI" / "custom_nodes" / "Bjornulf_custom_nodes" / "civitai" return custom_nodes_dir, civitai_base_path, civitai_base_path # Last one is parsed_models_path def setup_checkpoint_directory(model_type): """Creates and registers checkpoint directory for specific model type""" _, _, parsed_models_path = get_civitai_base_paths() checkpoint_dir = Path(folder_paths.models_dir) / "checkpoints" / "Bjornulf_civitAI" / model_type checkpoint_dir.mkdir(parents=True, exist_ok=True) checkpoint_folders = list(folder_paths.folder_names_and_paths["checkpoints"]) if str(checkpoint_dir) not in checkpoint_folders: checkpoint_folders.append(str(checkpoint_dir)) folder_paths.folder_names_and_paths["checkpoints"] = tuple(checkpoint_folders) return checkpoint_dir, parsed_models_path def setup_image_folders(folder_specs, parent_dir=""): """Creates and registers image folders for different model types Args: folder_specs: Dictionary of folder_name -> sub_path parent_dir: Optional subdirectory to place links under in input folder """ _, civitai_base_path, _ = get_civitai_base_paths() for folder_name, sub_path in folder_specs.items(): full_path = civitai_base_path / sub_path folder_paths.add_model_folder_path(folder_name, str(full_path)) create_symlink(full_path, folder_name, parent_dir) # Code works, tested on linux and windows def create_symlink(source, target_name, parent_dir=None): """Creates a symlink inside the ComfyUI/input directory on Linux and Windows.""" if os.name == 'nt': # Windows comfyui_input = Path("ComfyUI/input") else: comfyui_input = Path("input") # Ensure the input directory exists comfyui_input.mkdir(parents=True, exist_ok=True) if parent_dir: parent_path = comfyui_input / parent_dir parent_path.mkdir(parents=True, exist_ok=True) target = parent_path / target_name else: target = comfyui_input / target_name # Windows handling remains unchanged if os.name == 'nt': if not target.exists(): try: base_path = Path(__file__).resolve().parent # Get script location source_path = base_path / "ComfyUI" / source # Ensure it points inside ComfyUI try: target.symlink_to(source_path, target_is_directory=source_path.is_dir()) #print(f"✅ Symlink created: {target} -> {source_path}") except OSError: if source_path.is_dir(): cmd = [ "powershell", "New-Item", "-ItemType", "Junction", "-Path", str(target), "-Value", str(source_path) ] subprocess.run(cmd, check=True, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) #print(f"✅ Junction created: {target} -> {source_path}") else: print(f"❌ Failed to create symlink/junction for {target_name}.") except Exception as e: print(f"❌ Failed to create symlink for {target_name}: {e}") else: # Linux handling with complete error management try: # Check if source is already absolute path if os.path.isabs(source): source_path = Path(source) # Check if the source exists with the given case if not source_path.exists(): # Try case variations for Bjornulf/bjornulf part of the path if 'Bjornulf_custom_nodes' in str(source_path): alt_source_path = Path(str(source_path).replace('Bjornulf_custom_nodes', 'bjornulf_custom_nodes')) if alt_source_path.exists(): source_path = alt_source_path elif 'bjornulf_custom_nodes' in str(source_path): alt_source_path = Path(str(source_path).replace('bjornulf_custom_nodes', 'Bjornulf_custom_nodes')) if alt_source_path.exists(): source_path = alt_source_path # If still doesn't exist after trying case variations if not source_path.exists(): print(f"❌ Source path doesn't exist (checked both cases): {source}") return else: # For relative paths source_path = Path(source).absolute() if not source_path.exists(): print(f"❌ Source path doesn't exist: {source_path}") return # Force remove target if it exists (regardless of type) if target.exists() or target.is_symlink(): try: if target.is_dir() and not target.is_symlink(): import shutil shutil.rmtree(target) else: os.unlink(target) except Exception as e: print(f"❌ Failed to remove existing target {target}: {e}") return # Create the symlink try: os.symlink(source_path, target, target_is_directory=source_path.is_dir()) #print(f"✅ Symlink created: {target} -> {source_path}") except Exception as e: # Try with explicit target_is_directory set based on source try: os.symlink(source_path, target, target_is_directory=True) #print(f"✅ Symlink created with explicit directory flag: {target} -> {source_path}") except Exception as e2: print(f"❌ Failed to create symlink for {target_name}: {e2}") except Exception as e: print(f"❌ Failed to create symlink for {target_name}: {e}") def download_file(url, destination_path, model_name, api_token=None): """Universal downloader with progress tracking""" headers = {'Authorization': f'Bearer {api_token}'} if api_token else {} filename = f"{model_name}.safetensors" file_path = Path(destination_path) / filename try: with requests.get(url, headers=headers, stream=True) as response: response.raise_for_status() file_size = int(response.headers.get('content-length', 0)) # Initialize progress tracking if file size is known if file_size > 0: downloaded = 0 bar_width = 20 # Fixed width for the progress bar with open(file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) # Progress bar logic if file_size > 0: downloaded += len(chunk) progress = min(100, int((downloaded / file_size) * 100)) num_hashes = int(progress / (100 / bar_width)) bar = "[" + "#" * num_hashes + " " * (bar_width - num_hashes) + "]" percentage = f"{progress:3d}%" print(f"\r{bar} {percentage}", end="", flush=True) # Add a newline after download completes to avoid overwriting if file_size > 0: print() # Moves to the next line after completion return str(file_path) except Exception as e: raise RuntimeError(f"Download failed: {str(e)}") # Set up main checkpoint directory _, civitai_base_path, parsed_models_path = get_civitai_base_paths() bjornulf_checkpoint_path = Path(folder_paths.models_dir) / "checkpoints" / "Bjornulf_civitAI" bjornulf_checkpoint_path.mkdir(parents=True, exist_ok=True) # Register the main checkpoint folder checkpoint_folders = list(folder_paths.folder_names_and_paths["checkpoints"]) if str(bjornulf_checkpoint_path) not in checkpoint_folders: checkpoint_folders.append(str(bjornulf_checkpoint_path)) folder_paths.folder_names_and_paths["checkpoints"] = tuple(checkpoint_folders) # Define image folders image_folders = { "sdxl_1.0": "sdxl_1.0", "sd_1.5": "sd_1.5", "pony": "pony", "flux.1_d": "flux.1_d", "flux.1_s": "flux.1_s", "lora_sdxl_1.0": "lora_sdxl_1.0", "lora_sd_1.5": "lora_sd_1.5", "lora_pony": "lora_pony", "lora_flux.1_d": "lora_flux.1_d", "lora_hunyuan_video": "lora_hunyuan_video", # "NSFW_lora_hunyuan_video": "NSFW_lora_hunyuan_video" } # Set up image folders using the function, placing links under input/Bjornulf/ setup_image_folders(image_folders) def get_civitai(): import civitai importlib.reload(civitai) return civitai # Check if the environment variable exists if "CIVITAI_API_TOKEN" not in os.environ: os.environ["CIVITAI_API_TOKEN"] = "d5fc336223a367e6b503a14a10569825" # else: # print("CIVITAI_API_TOKEN already exists in the environment.") import civitai # ====================== # GENERATE WITH CIVITAI # ====================== class APIGenerateCivitAI: @classmethod def INPUT_TYPES(cls): """Define the input types for the node.""" return { "required": { "api_token": ("STRING", {"default": "", "placeholder": "CivitAI API token"}), "prompt": ("STRING", {"multiline": True, "default": "RAW photo, face portrait photo of 26 y.o woman"}), "negative_prompt": ("STRING", { "multiline": True, "default": "low quality, blurry, pixelated, distorted, artifacts" }), "width": ("INT", {"default": 1024, "min": 128, "max": 1024, "step": 64}), "height": ("INT", {"default": 768, "min": 128, "max": 1024, "step": 64}), "steps": ("INT", {"default": 20, "min": 1, "max": 50, "step": 1}), "cfg_scale": ("FLOAT", {"default": 7.0, "min": 1.0, "max": 30.0, "step": 0.1}), "seed": ("INT", {"default": -1, "min": -1, "max": 0x7FFFFFFFFFFFFFFF}), "number_of_images": ("INT", {"default": 1, "min": 1, "max": 10, "step": 1}), "timeout": ("INT", {"default": 300, "min": 60, "max": 1800, "step": 60}), }, "optional": { "model_urn": ("STRING", {"default": "urn:air:sdxl:checkpoint:civitai:101055@128078"}), #SDXL default "add_LORA": ("STRING", {"multiline": True, "default": ""}), "DO_NOT_WAIT": ("BOOLEAN", {"default": False, "label_on": "Save Links Only", "label_off": "Generate Now"}), "links_file": ("STRING", {"default": "", "multiline": False}), "LIST_from_style_selector": ("STRING", { "default": "", "multiline": True, "placeholder": "e.g., Low Poly ;Samaritan 3D Cartoon;urn:air:sdxl:checkpoint:civitai:81270@144566;https://civitai.green/models/81270?modelVersionId=144566" }), } } RETURN_TYPES = ("IMAGE", "STRING") RETURN_NAMES = ("images", "generation_info") FUNCTION = "generate" CATEGORY = "Civitai" def __init__(self): self.links_dir = "Bjornulf/civitai_links" os.makedirs(self.links_dir, exist_ok=True) self._interrupt_event = threading.Event() def generate(self, api_token, prompt, negative_prompt, width, height, steps, cfg_scale, seed, number_of_images, timeout, model_urn="", add_LORA="", DO_NOT_WAIT=False, links_file="", LIST_from_style_selector=""): """Generate images or save links based on DO_NOT_WAIT.""" if not api_token: raise ValueError("API token is required") os.environ["CIVITAI_API_TOKEN"] = api_token civitai = get_civitai() empty_image = torch.zeros((1, 512, 512, 3)) # Extract model_urn from LIST_from_style_selector if model_urn is empty and LIST_from_style_selector is provided if not model_urn and LIST_from_style_selector: parts = LIST_from_style_selector.split(';') if len(parts) >= 3: model_urn = parts[2].strip() else: raise ValueError("Invalid LIST_from_style_selector format: cannot extract model_urn") if not model_urn: raise ValueError("model_urn is required") seed = random.randint(0, 0x7FFFFFFFFFFFFFFF) if seed == -1 else seed jobs = [] # Prepare job requests for i in range(number_of_images): current_seed = seed + i input_data = { "model": model_urn, "params": { "prompt": prompt, "negativePrompt": negative_prompt, "scheduler": "EulerA", "steps": steps, "cfgScale": cfg_scale, "width": width, "height": height, "clipSkip": 2, "seed": current_seed } } if add_LORA: try: lora_data = json.loads(add_LORA) if "additionalNetworks" in lora_data: input_data["additionalNetworks"] = lora_data["additionalNetworks"] except Exception as e: print(f"Error processing LORA data: {str(e)}") response = civitai.image.create(input_data) if 'token' not in response or 'jobs' not in response: raise ValueError("Invalid API response") jobs.append({ 'token': response['token'], 'job_id': response['jobs'][0]['jobId'], 'input_data': input_data }) # Save links if DO_NOT_WAIT is True if DO_NOT_WAIT: date_str = time.strftime("%d_%B_%Y").lower() base_name = f"{date_str}_" existing_files = [f for f in os.listdir(self.links_dir) if f.startswith(base_name) and f.endswith(".txt")] next_number = max([int(f[len(base_name):-4]) for f in existing_files] or [0]) + 1 file_name = f"{date_str}_{next_number:03d}.txt" file_path = os.path.join(self.links_dir, links_file if links_file else file_name) mode = 'a' if links_file else 'w' if not file_path.endswith(".txt"): file_path += ".txt" with open(file_path, mode) as f: for job in jobs: if LIST_from_style_selector: f.write(f"{LIST_from_style_selector};Token: {job['token']};Job ID: {job['job_id']}\n") else: f.write(f"Token: {job['token']};Job ID: {job['job_id']}\n") generation_info = { "status": "links_saved", "links_file": file_path, "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "number_of_jobs": len(jobs) } return (empty_image, json.dumps(generation_info, indent=2)) # Generate images immediately (DO_NOT_WAIT=False) images = [] infos = [] failed_jobs = [] for job in jobs: try: image_url = self.check_job_status(job['token'], job['job_id'], timeout) image_response = requests.get(image_url) if image_response.status_code != 200: raise ConnectionError(f"Image download failed: {image_response.status_code}") img = Image.open(BytesIO(image_response.content)).convert('RGB') img_tensor = torch.from_numpy(np.array(img).astype(np.float32) / 255.0) images.append(img_tensor.unsqueeze(0)) infos.append(self.format_generation_info(job['input_data'], job['token'], job['job_id'], image_url)) except Exception as e: failed_jobs.append({'job': job, 'error': str(e)}) if not images: generation_info = {"error": "All jobs failed", "failed_jobs": failed_jobs} return (empty_image, json.dumps(generation_info, indent=2)) combined_tensor = torch.cat(images, dim=0) combined_info = { "successful_generations": len(images), "total_requested": number_of_images, "individual_results": infos, "failed_jobs": failed_jobs if failed_jobs else None } return (combined_tensor, json.dumps(combined_info, indent=2)) def check_job_status(self, job_token, job_id, timeout=9999): """Check job status with timeout""" start_time = time.time() while time.time() - start_time < timeout and not self._interrupt_event.is_set(): try: response = civitai.jobs.get(token=job_token) job_status = response['jobs'][0] if job_status.get('status') == 'failed': raise Exception(f"Job failed: {job_status.get('error', 'Unknown error')}") if job_status['result'].get('available'): return job_status['result'].get('blobUrl') print(f"Job Status: {job_status['status']}") time.sleep(2) except Exception as e: print(f"Error checking job status: {str(e)}") time.sleep(2) # Check for interruption if self._interrupt_event.is_set(): raise InterruptedError("Generation interrupted by user") if self._interrupt_event.is_set(): raise InterruptedError("Generation interrupted by user") raise TimeoutError(f"Job timed out after {timeout} seconds") def format_generation_info(self, input_data, token, job_id, image_url): """Format generation info (implementation assumed).""" return {"token": token, "job_id": job_id, "image_url": image_url} class LoadCivitAILinks: @classmethod def INPUT_TYPES(cls): """Define the input types for the node.""" return { "required": { "api_token": ("STRING", {"default": "", "placeholder": "CivitAI API token"}), "links_file_path": ("STRING", { "default": "", "placeholder": "Path to links file (priority if not empty)" }), "selected_file": (["Not selected"] + cls.get_links_files(), { "default": "Not selected" }), "direct_links": ("STRING", { "multiline": True, "default": "", "placeholder": "Enter links directly (e.g., Style;Model;URN;Link;Token: ;Job ID: )" }), }, "optional": { "auto_save": ("BOOLEAN", { "default": False, "label_on": "Enable Auto-Save", "label_off": "Disable Auto-Save" }), } } RETURN_TYPES = ("IMAGE", "STRING", "STRING") RETURN_NAMES = ("images", "status_info", "LIST_style") OUTPUT_IS_LIST = (False, False, True) FUNCTION = "load_images" CATEGORY = "Civitai" def __init__(self): """Initialize the node with the links directory.""" self.links_dir = "Bjornulf/civitai_links" os.makedirs(self.links_dir, exist_ok=True) @classmethod def get_links_files(cls): links_dir = "Bjornulf/civitai_links" if not os.path.exists(links_dir): return [] files = [f for f in os.listdir(links_dir) if f.endswith(".txt")] return files def load_images(self, api_token, links_file_path, selected_file, direct_links, auto_save=False): """Load images from links and optionally save them to style-based folders.""" if not api_token: raise ValueError("API token is required") os.environ["CIVITAI_API_TOKEN"] = api_token civitai = get_civitai() # Determine the source of links lines = None if links_file_path: if not os.path.exists(links_file_path): raise ValueError(f"File path '{links_file_path}' does not exist") with open(links_file_path, 'r') as f: lines = f.readlines() elif selected_file != "Not selected": file_path = os.path.join(self.links_dir, selected_file) if not os.path.exists(file_path): raise ValueError(f"Selected file '{file_path}' does not exist") with open(file_path, 'r') as f: lines = f.readlines() elif direct_links: lines = direct_links.splitlines() else: raise ValueError("No valid links source provided") images = [] list_styles = [] # To store LIST_style strings status_info = { "loaded": 0, "failed": 0, "attempted": 0, "timestamp": time.strftime("%Y-%m-%d %H:%M:%S") } for line in lines: line = line.strip() if not line: continue status_info["attempted"] += 1 try: parts = line.split(";") if len(parts) == 6: style = parts[0].strip() model_name = parts[1].strip() model_urn = parts[2].strip() model_link = parts[3].strip() token = parts[4].split("Token: ")[1].strip() job_id = parts[5].split("Job ID: ")[1].strip() list_style = ';'.join(parts[:4]) elif len(parts) == 2 and "Token: " in parts[0] and "Job ID: " in parts[1]: token = parts[0].split("Token: ")[1].strip() job_id = parts[1].split("Job ID: ")[1].strip() list_style = "" else: raise ValueError(f"Invalid link format: {line}") # Fetch job status from CivitAI API response = civitai.jobs.get(token=token) job_status = next((job for job in response['jobs'] if job['jobId'] == job_id), None) if not job_status or not job_status['result'].get('available'): status_info["failed"] += 1 continue # Download and process the image image_url = job_status['result'].get('blobUrl') image_response = requests.get(image_url) if image_response.status_code != 200: status_info["failed"] += 1 continue img = Image.open(BytesIO(image_response.content)) if img.mode != 'RGB': img = img.convert('RGB') # Auto-save if enabled and style is available if auto_save and len(parts) == 6: style_folder = style.replace(" ", "_") # Replace spaces with underscores save_dir = os.path.join(folder_paths.get_output_directory(), "civitai_autosave", style_folder) os.makedirs(save_dir, exist_ok=True) file_name = f"{job_id}.png" file_path = os.path.join(save_dir, file_name) img.save(file_path) # Convert to tensor and collect img_tensor = torch.from_numpy(np.array(img).astype(np.float32) / 255.0) images.append(img_tensor.unsqueeze(0)) list_styles.append(list_style) status_info["loaded"] += 1 except Exception as e: status_info["failed"] += 1 print(f"Error processing link '{line}': {str(e)}") if not images: raise ValueError("No images loaded from the provided links") combined_tensor = torch.cat(images, dim=0) return (combined_tensor, json.dumps(status_info, indent=2), list_styles) @classmethod def IS_CHANGED(cls, api_token, links_file_path, selected_file, direct_links, auto_save): """Force node re-execution when inputs change.""" return float("NaN") @PromptServer.instance.routes.post("/get_civitai_links_files") async def get_civitai_links_files(request): try: links_dir = "Bjornulf/civitai_links" if not os.path.exists(links_dir): return web.json_response({ "success": False, "error": "Links directory does not exist" }, status=404) files = [f for f in os.listdir(links_dir) if f.endswith(".txt")] return web.json_response({ "success": True, "files": files }, status=200) except Exception as e: error_msg = str(e) return web.json_response({ "success": False, "error": error_msg }, status=500) class APIGenerateCivitAIAddLORA: @classmethod def INPUT_TYPES(cls): return { "required": { "lora_urn": ("STRING", { "multiline": False, "default": "urn:air:flux1:lora:civitai:790034@883473" }), "strength": ("FLOAT", { "default": 1.0, "min": 0.0, "max": 2.0, "step": 0.01 }), }, "optional": { "add_LORA": ("add_LORA", {"forceInput": True}), } } RETURN_TYPES = ("add_LORA",) FUNCTION = "add_lora" CATEGORY = "Civitai" def add_lora(self, lora_urn, strength, add_LORA=None): try: request_data = {"additionalNetworks": {}} # Add the new LORA request_data["additionalNetworks"][lora_urn] = { "type": "Lora", "strength": strength } # If add_LORA is provided, concatenate it if add_LORA: additional_loras = json.loads(add_LORA) if "additionalNetworks" in additional_loras: request_data["additionalNetworks"].update(additional_loras["additionalNetworks"]) return (json.dumps(request_data),) except Exception as e: print(f"Error adding LORA: {str(e)}") return (json.dumps({"additionalNetworks": {}}),) # ====================== # MODEL SELECTOR CLASSES # ====================== class CivitAIModelSelectorSD15: @classmethod def INPUT_TYPES(s): # Get list of supported image extensions image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp') files = [f"sd_1.5/{f}" for f in folder_paths.get_filename_list("sd_1.5") if f.lower().endswith(image_extensions)] if not files: # If no files found, provide a default option files = ["none"] return { "required": { "image": (sorted(files), {"image_upload": True}), "civitai_token": ("STRING", {"default": ""}) }, } RETURN_TYPES = ("MODEL", "CLIP", "VAE", "STRING", "STRING") RETURN_NAMES = ("model", "clip", "vae", "name", "civitai_url") FUNCTION = "load_model" CATEGORY = "Bjornulf" def load_model(self, image, civitai_token): if image == "none": raise ValueError("No image selected") # Get the absolute path to the JSON file json_path = os.path.join(parsed_models_path, 'parsed_sd_1.5_models.json') # Load models info try: with open(json_path, 'r', encoding='utf-8') as f: models_info = json.load(f) except UnicodeDecodeError: # Fallback to latin-1 if UTF-8 fails with open(json_path, 'r', encoding='latin-1') as f: models_info = json.load(f) # Extract model name from image path image_name = os.path.basename(image) # Find corresponding model info model_info = next((model for model in models_info if os.path.basename(model['image_path']) == image_name), None) if not model_info: raise ValueError(f"No model information found for image: {image_name}") # Create checkpoints directory if it doesn't exist checkpoint_dir = os.path.join(folder_paths.models_dir, "checkpoints", "Bjornulf_civitAI", "sd1.5") os.makedirs(checkpoint_dir, exist_ok=True) # Expected model filename model_filename = f"{model_info['name']}.safetensors" full_model_path = os.path.join(checkpoint_dir, model_filename) # Check if model is already downloaded if not os.path.exists(full_model_path): print(f"Downloading model {model_info['name']}...") # Construct download URL with token download_url = model_info['download_url'] if civitai_token: download_url += f"?token={civitai_token}" if '?' not in download_url else f"&token={civitai_token}" try: # Download the file using class method download_file(download_url, checkpoint_dir, model_info['name'], civitai_token) except Exception as e: raise ValueError(f"Failed to download model: {e}") # Get relative path relative_model_path = os.path.join("Bjornulf_civitAI", "sd1.5", model_filename) # Try loading with relative path first try: model = nodes.CheckpointLoaderSimple().load_checkpoint(relative_model_path) except Exception as e: print(f"Error loading model with relative path: {e}") print(f"Attempting to load from full path: {full_model_path}") # Fallback to direct loading if needed from comfy.sd import load_checkpoint_guess_config model = load_checkpoint_guess_config(full_model_path) return (model[0], model[1], model[2], model_info['name'], f"https://civitai.com/models/{model_info['model_id']}") @classmethod def IS_CHANGED(s, image, **kwargs): if image == "none": return "" image_path = os.path.join(civitai_base_path, image) if not os.path.exists(image_path): return "" m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) m.update(image.encode('utf-8')) return m.digest().hex() class CivitAIModelSelectorSDXL: @classmethod def INPUT_TYPES(s): # Get list of supported image extensions image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp') files = [f"sdxl_1.0/{f}" for f in folder_paths.get_filename_list("sdxl_1.0") if f.lower().endswith(image_extensions)] if not files: # If no files found, provide a default option files = ["none"] return { "required": { "image": (sorted(files), {"image_upload": True}), "civitai_token": ("STRING", {"default": ""}) }, } RETURN_TYPES = ("MODEL", "CLIP", "VAE", "STRING", "STRING") RETURN_NAMES = ("model", "clip", "vae", "name", "civitai_url") FUNCTION = "load_model" CATEGORY = "Bjornulf" def load_model(self, image, civitai_token): if image == "none": raise ValueError("No image selected") # Get the absolute path to the JSON file json_path = os.path.join(parsed_models_path, 'parsed_sdxl_1.0_models.json') # Load models info try: with open(json_path, 'r', encoding='utf-8') as f: models_info = json.load(f) except UnicodeDecodeError: # Fallback to latin-1 if UTF-8 fails with open(json_path, 'r', encoding='latin-1') as f: models_info = json.load(f) # Extract model name from image path image_name = os.path.basename(image) # Find corresponding model info model_info = next((model for model in models_info if os.path.basename(model['image_path']) == image_name), None) if not model_info: raise ValueError(f"No model information found for image: {image_name}") # Create checkpoints directory if it doesn't exist checkpoint_dir = os.path.join(folder_paths.models_dir, "checkpoints", "Bjornulf_civitAI", "sdxl_1.0") os.makedirs(checkpoint_dir, exist_ok=True) # Expected model filename model_filename = f"{model_info['name']}.safetensors" full_model_path = os.path.join(checkpoint_dir, model_filename) # Check if model is already downloaded if not os.path.exists(full_model_path): print(f"Downloading model {model_info['name']}...") # Construct download URL with token download_url = model_info['download_url'] if civitai_token: download_url += f"?token={civitai_token}" if '?' not in download_url else f"&token={civitai_token}" try: # Download the file using class method download_file(download_url, checkpoint_dir, model_info['name'], civitai_token) except Exception as e: raise ValueError(f"Failed to download model: {e}") # Get relative path relative_model_path = os.path.join("Bjornulf_civitAI", "sdxl_1.0", model_filename) # Try loading with relative path first try: model = nodes.CheckpointLoaderSimple().load_checkpoint(relative_model_path) except Exception as e: print(f"Error loading model with relative path: {e}") print(f"Attempting to load from full path: {full_model_path}") # Fallback to direct loading if needed from comfy.sd import load_checkpoint_guess_config model = load_checkpoint_guess_config(full_model_path) # return (model[0], model[1], model[2], model_info['name'], model_info['download_url']) return (model[0], model[1], model[2], model_info['name'], f"https://civitai.com/models/{model_info['model_id']}") @classmethod def IS_CHANGED(s, image, **kwargs): if image == "none": return "" image_path = os.path.join(civitai_base_path, image) if not os.path.exists(image_path): return "" m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) m.update(image.encode('utf-8')) return m.digest().hex() class CivitAIModelSelectorFLUX_D: @classmethod def INPUT_TYPES(s): # Get list of supported image extensions image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp') files = [f"flux.1_d/{f}" for f in folder_paths.get_filename_list("flux.1_d") if f.lower().endswith(image_extensions)] if not files: # If no files found, provide a default option files = ["none"] return { "required": { "image": (sorted(files), {"image_upload": True}), "civitai_token": ("STRING", {"default": ""}) }, } RETURN_TYPES = ("MODEL", "CLIP", "VAE", "STRING", "STRING") RETURN_NAMES = ("model", "clip", "vae", "name", "civitai_url") FUNCTION = "load_model" CATEGORY = "Bjornulf" def load_model(self, image, civitai_token): if image == "none": raise ValueError("No image selected") # Get the absolute path to the JSON file json_path = os.path.join(parsed_models_path, 'parsed_flux.1_d_models.json') # Load models info try: with open(json_path, 'r', encoding='utf-8') as f: models_info = json.load(f) except UnicodeDecodeError: # Fallback to latin-1 if UTF-8 fails with open(json_path, 'r', encoding='latin-1') as f: models_info = json.load(f) # Extract model name from image path image_name = os.path.basename(image) # Find corresponding model info model_info = next((model for model in models_info if os.path.basename(model['image_path']) == image_name), None) if not model_info: raise ValueError(f"No model information found for image: {image_name}") # Create checkpoints directory if it doesn't exist checkpoint_dir = os.path.join(folder_paths.models_dir, "checkpoints", "Bjornulf_civitAI", "flux_d") os.makedirs(checkpoint_dir, exist_ok=True) # Expected model filename model_filename = f"{model_info['name']}.safetensors" full_model_path = os.path.join(checkpoint_dir, model_filename) # Check if model is already downloaded if not os.path.exists(full_model_path): print(f"Downloading model {model_info['name']}...") # Construct download URL with token download_url = model_info['download_url'] if civitai_token: download_url += f"?token={civitai_token}" if '?' not in download_url else f"&token={civitai_token}" try: # Download the file using class method download_file(download_url, checkpoint_dir, model_info['name'], civitai_token) except Exception as e: raise ValueError(f"Failed to download model: {e}") # Get relative path relative_model_path = os.path.join("Bjornulf_civitAI", "flux_d", model_filename) # Try loading with relative path first try: model = nodes.CheckpointLoaderSimple().load_checkpoint(relative_model_path) except Exception as e: print(f"Error loading model with relative path: {e}") print(f"Attempting to load from full path: {full_model_path}") # Fallback to direct loading if needed from comfy.sd import load_checkpoint_guess_config model = load_checkpoint_guess_config(full_model_path) return (model[0], model[1], model[2], model_info['name'], f"https://civitai.com/models/{model_info['model_id']}") @classmethod def IS_CHANGED(s, image, **kwargs): if image == "none": return "" image_path = os.path.join(civitai_base_path, image) if not os.path.exists(image_path): return "" m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) m.update(image.encode('utf-8')) return m.digest().hex() class CivitAIModelSelectorFLUX_S: @classmethod def INPUT_TYPES(s): # Get list of supported image extensions image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp') files = [f"flux.1_s/{f}" for f in folder_paths.get_filename_list("flux.1_s") if f.lower().endswith(image_extensions)] if not files: # If no files found, provide a default option files = ["none"] return { "required": { "image": (sorted(files), {"image_upload": True}), "civitai_token": ("STRING", {"default": ""}) }, } RETURN_TYPES = ("MODEL", "CLIP", "VAE", "STRING", "STRING") RETURN_NAMES = ("model", "clip", "vae", "name", "civitai_url") FUNCTION = "load_model" CATEGORY = "Bjornulf" def load_model(self, image, civitai_token): if image == "none": raise ValueError("No image selected") # Get the absolute path to the JSON file json_path = os.path.join(parsed_models_path, 'parsed_flux.1_s_models.json') # Load models info try: with open(json_path, 'r', encoding='utf-8') as f: models_info = json.load(f) except UnicodeDecodeError: # Fallback to latin-1 if UTF-8 fails with open(json_path, 'r', encoding='latin-1') as f: models_info = json.load(f) # Extract model name from image path image_name = os.path.basename(image) # Find corresponding model info model_info = next((model for model in models_info if os.path.basename(model['image_path']) == image_name), None) if not model_info: raise ValueError(f"No model information found for image: {image_name}") # Create checkpoints directory if it doesn't exist checkpoint_dir = os.path.join(folder_paths.models_dir, "checkpoints", "Bjornulf_civitAI", "flux_s") os.makedirs(checkpoint_dir, exist_ok=True) # Expected model filename model_filename = f"{model_info['name']}.safetensors" full_model_path = os.path.join(checkpoint_dir, model_filename) # Check if model is already downloaded if not os.path.exists(full_model_path): print(f"Downloading model {model_info['name']}...") # Construct download URL with token download_url = model_info['download_url'] if civitai_token: download_url += f"?token={civitai_token}" if '?' not in download_url else f"&token={civitai_token}" try: # Download the file using class method download_file(download_url, checkpoint_dir, model_info['name'], civitai_token) except Exception as e: raise ValueError(f"Failed to download model: {e}") # Get relative path relative_model_path = os.path.join("Bjornulf_civitAI", "flux_s", model_filename) # Try loading with relative path first try: model = nodes.CheckpointLoaderSimple().load_checkpoint(relative_model_path) except Exception as e: print(f"Error loading model with relative path: {e}") print(f"Attempting to load from full path: {full_model_path}") # Fallback to direct loading if needed from comfy.sd import load_checkpoint_guess_config model = load_checkpoint_guess_config(full_model_path) return (model[0], model[1], model[2], model_info['name'], f"https://civitai.com/models/{model_info['model_id']}") @classmethod def IS_CHANGED(s, image): if image == "none": return "" image_path = os.path.join(civitai_base_path, image) if not os.path.exists(image_path): return "" m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) m.update(image.encode('utf-8')) return m.digest().hex() class CivitAIModelSelectorPony: @classmethod def INPUT_TYPES(s): # Get list of supported image extensions image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp') files = [f"pony/{f}" for f in folder_paths.get_filename_list("pony") if f.lower().endswith(image_extensions)] if not files: # If no files found, provide a default option files = ["none"] return { "required": { "image": (sorted(files), {"image_upload": True}), "civitai_token": ("STRING", {"default": ""}) }, } RETURN_TYPES = ("MODEL", "CLIP", "VAE", "STRING", "STRING") RETURN_NAMES = ("model", "clip", "vae", "name", "civitai_url") FUNCTION = "load_model" CATEGORY = "Bjornulf" def load_model(self, image, civitai_token): if image == "none": raise ValueError("No image selected") # Get the absolute path to the JSON file json_path = os.path.join(parsed_models_path, 'parsed_pony_models.json') # Load models info try: with open(json_path, 'r', encoding='utf-8') as f: models_info = json.load(f) except UnicodeDecodeError: # Fallback to latin-1 if UTF-8 fails with open(json_path, 'r', encoding='latin-1') as f: models_info = json.load(f) # Extract model name from image path image_name = os.path.basename(image) # Find corresponding model info model_info = next((model for model in models_info if os.path.basename(model['image_path']) == image_name), None) if not model_info: raise ValueError(f"No model information found for image: {image_name}") # Create checkpoints directory if it doesn't exist checkpoint_dir = os.path.join(folder_paths.models_dir, "checkpoints", "Bjornulf_civitAI", "pony") os.makedirs(checkpoint_dir, exist_ok=True) # Expected model filename model_filename = f"{model_info['name']}.safetensors" full_model_path = os.path.join(checkpoint_dir, model_filename) # Check if model is already downloaded if not os.path.exists(full_model_path): print(f"Downloading model {model_info['name']}...") # Construct download URL with token download_url = model_info['download_url'] if civitai_token: download_url += f"?token={civitai_token}" if '?' not in download_url else f"&token={civitai_token}" try: # Download the file using class method download_file(download_url, checkpoint_dir, model_info['name'], civitai_token) except Exception as e: raise ValueError(f"Failed to download model: {e}") # Get relative path relative_model_path = os.path.join("Bjornulf_civitAI", "pony", model_filename) # Try loading with relative path first try: model = nodes.CheckpointLoaderSimple().load_checkpoint(relative_model_path) except Exception as e: print(f"Error loading model with relative path: {e}") print(f"Attempting to load from full path: {full_model_path}") # Fallback to direct loading if needed from comfy.sd import load_checkpoint_guess_config model = load_checkpoint_guess_config(full_model_path) return (model[0], model[1], model[2], model_info['name'], f"https://civitai.com/models/{model_info['model_id']}") @classmethod def IS_CHANGED(s, image, **kwargs): if image == "none": return "" image_path = os.path.join(civitai_base_path, image) if not os.path.exists(image_path): return "" m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) m.update(image.encode('utf-8')) return m.digest().hex() class CivitAILoraSelectorSD15: @classmethod def INPUT_TYPES(s): # Get list of supported image extensions image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp') files = [f"lora_sd_1.5/{f}" for f in folder_paths.get_filename_list("lora_sd_1.5") if f.lower().endswith(image_extensions)] if not files: # If no files found, provide a default option files = ["none"] return { "required": { "image": (sorted(files), {"image_upload": True}), "model": ("MODEL",), "clip": ("CLIP",), "strength_model": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}), "strength_clip": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}), "civitai_token": ("STRING", {"default": ""}) }, } RETURN_TYPES = ("MODEL", "CLIP", "STRING", "STRING", "STRING") RETURN_NAMES = ("model", "clip", "name", "civitai_url", "trigger_words") FUNCTION = "load_lora" CATEGORY = "Bjornulf" def load_lora(self, image, model, clip, strength_model, strength_clip, civitai_token): def download_file(url, destination_path, lora_name, api_token=None): """ Download file with proper authentication headers and simple progress bar. """ filename = f"{lora_name}.safetensors" file_path = os.path.join(destination_path, filename) headers = {} if api_token: headers['Authorization'] = f'Bearer {api_token}' try: print(f"Downloading from: {url}") response = requests.get(url, headers=headers, stream=True) response.raise_for_status() file_size = int(response.headers.get('content-length', 0)) block_size = 8192 downloaded = 0 with open(file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=block_size): if chunk: f.write(chunk) downloaded += len(chunk) if file_size > 0: progress = int(50 * downloaded / file_size) bars = '=' * progress + '-' * (50 - progress) percent = (downloaded / file_size) * 100 print(f'\rProgress: [{bars}] {percent:.1f}%', end='') print(f"\nFile downloaded successfully to: {file_path}") return file_path except requests.exceptions.RequestException as e: print(f"Error downloading file: {e}") raise if image == "none": raise ValueError("No image selected") # Get the absolute path to the JSON file json_path = os.path.join(parsed_models_path, 'parsed_lora_sd_1.5_loras.json') # Load loras info try: with open(json_path, 'r', encoding='utf-8') as f: models_info = json.load(f) except UnicodeDecodeError: # Fallback to latin-1 if UTF-8 fails with open(json_path, 'r', encoding='latin-1') as f: models_info = json.load(f) # Extract lora name from image path image_name = os.path.basename(image) # Find corresponding lora info lora_info = next((lora for lora in loras_info if os.path.basename(lora['image_path']) == image_name), None) if not lora_info: raise ValueError(f"No LoRA information found for image: {image_name}") # Create loras directory if it doesn't exist lora_dir = os.path.join(folder_paths.models_dir, "loras", "Bjornulf_civitAI", "sd_1.5") os.makedirs(lora_dir, exist_ok=True) # Expected lora filename lora_filename = f"{lora_info['name']}.safetensors" full_lora_path = os.path.join(lora_dir, lora_filename) # Check if lora is already downloaded if not os.path.exists(full_lora_path): print(f"Downloading LoRA {lora_info['name']}...") # Construct download URL with token download_url = lora_info['download_url'] if civitai_token: download_url += f"?token={civitai_token}" if '?' not in download_url else f"&token={civitai_token}" try: # Download the file download_file(download_url, lora_dir, lora_info['name'], civitai_token) except Exception as e: raise ValueError(f"Failed to download LoRA: {e}") # Get relative path relative_lora_path = os.path.join("Bjornulf_civitAI", "sd_1.5", lora_filename) # Load the LoRA try: lora_loader = nodes.LoraLoader() model_lora, clip_lora = lora_loader.load_lora(model=model, clip=clip, lora_name=relative_lora_path, strength_model=strength_model, strength_clip=strength_clip) except Exception as e: raise ValueError(f"Failed to load LoRA: {e}") # Convert trained words list to comma-separated string trained_words_str = ", ".join(lora_info.get('trained_words', [])) return (model_lora, clip_lora, lora_info['name'], f"https://civitai.com/models/{lora_info['lora_id']}", trained_words_str) @classmethod def IS_CHANGED(s, image, **kwargs): if image == "none": return "" image_path = os.path.join(civitai_base_path, image) if not os.path.exists(image_path): return "" m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) m.update(image.encode('utf-8')) return m.digest().hex() class CivitAILoraSelectorSDXL: @classmethod def INPUT_TYPES(s): # Get list of supported image extensions image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp') files = [f"lora_sdxl_1.0/{f}" for f in folder_paths.get_filename_list("lora_sdxl_1.0") if f.lower().endswith(image_extensions)] if not files: # If no files found, provide a default option files = ["none"] return { "required": { "image": (sorted(files), {"image_upload": True}), "model": ("MODEL",), "clip": ("CLIP",), "strength_model": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}), "strength_clip": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}), "civitai_token": ("STRING", {"default": ""}) }, } RETURN_TYPES = ("MODEL", "CLIP", "STRING", "STRING", "STRING") RETURN_NAMES = ("model", "clip", "name", "civitai_url", "trigger_words") FUNCTION = "load_lora" CATEGORY = "Bjornulf" def load_lora(self, image, model, clip, strength_model, strength_clip, civitai_token): def download_file(url, destination_path, lora_name, api_token=None): """ Download file with proper authentication headers and simple progress bar. """ filename = f"{lora_name}.safetensors" file_path = os.path.join(destination_path, filename) headers = {} if api_token: headers['Authorization'] = f'Bearer {api_token}' try: print(f"Downloading from: {url}") response = requests.get(url, headers=headers, stream=True) response.raise_for_status() file_size = int(response.headers.get('content-length', 0)) block_size = 8192 downloaded = 0 with open(file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=block_size): if chunk: f.write(chunk) downloaded += len(chunk) if file_size > 0: progress = int(50 * downloaded / file_size) bars = '=' * progress + '-' * (50 - progress) percent = (downloaded / file_size) * 100 print(f'\rProgress: [{bars}] {percent:.1f}%', end='') print(f"\nFile downloaded successfully to: {file_path}") return file_path except requests.exceptions.RequestException as e: print(f"Error downloading file: {e}") raise if image == "none": raise ValueError("No image selected") # Get the absolute path to the JSON file json_path = os.path.join(parsed_models_path, 'parsed_lora_sdxl_1.0_loras.json') # Load loras info try: with open(json_path, 'r', encoding='utf-8') as f: models_info = json.load(f) except UnicodeDecodeError: # Fallback to latin-1 if UTF-8 fails with open(json_path, 'r', encoding='latin-1') as f: models_info = json.load(f) # Extract lora name from image path image_name = os.path.basename(image) # Find corresponding lora info lora_info = next((lora for lora in loras_info if os.path.basename(lora['image_path']) == image_name), None) if not lora_info: raise ValueError(f"No LoRA information found for image: {image_name}") # Create loras directory if it doesn't exist lora_dir = os.path.join(folder_paths.models_dir, "loras", "Bjornulf_civitAI", "sdxl_1.0") os.makedirs(lora_dir, exist_ok=True) # Expected lora filename lora_filename = f"{lora_info['name']}.safetensors" full_lora_path = os.path.join(lora_dir, lora_filename) # Check if lora is already downloaded if not os.path.exists(full_lora_path): print(f"Downloading LoRA {lora_info['name']}...") # Construct download URL with token download_url = lora_info['download_url'] if civitai_token: download_url += f"?token={civitai_token}" if '?' not in download_url else f"&token={civitai_token}" try: # Download the file download_file(download_url, lora_dir, lora_info['name'], civitai_token) except Exception as e: raise ValueError(f"Failed to download LoRA: {e}") # Get relative path relative_lora_path = os.path.join("Bjornulf_civitAI", "sdxl_1.0", lora_filename) # Load the LoRA try: lora_loader = nodes.LoraLoader() model_lora, clip_lora = lora_loader.load_lora(model=model, clip=clip, lora_name=relative_lora_path, strength_model=strength_model, strength_clip=strength_clip) except Exception as e: raise ValueError(f"Failed to load LoRA: {e}") # Convert trained words list to comma-separated string trained_words_str = ", ".join(lora_info.get('trained_words', [])) return (model_lora, clip_lora, lora_info['name'], f"https://civitai.com/models/{lora_info['lora_id']}", trained_words_str) @classmethod def IS_CHANGED(s, image, **kwargs): if image == "none": return "" image_path = os.path.join(civitai_base_path, image) if not os.path.exists(image_path): return "" m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) m.update(image.encode('utf-8')) return m.digest().hex() class CivitAILoraSelectorPONY: @classmethod def INPUT_TYPES(s): # Get list of supported image extensions image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp') files = [f"lora_pony/{f}" for f in folder_paths.get_filename_list("lora_pony") if f.lower().endswith(image_extensions)] if not files: # If no files found, provide a default option files = ["none"] return { "required": { "image": (sorted(files), {"image_upload": True}), "model": ("MODEL",), "clip": ("CLIP",), "strength_model": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}), "strength_clip": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}), "civitai_token": ("STRING", {"default": ""}) }, } RETURN_TYPES = ("MODEL", "CLIP", "STRING", "STRING", "STRING") RETURN_NAMES = ("model", "clip", "name", "civitai_url", "trigger_words") FUNCTION = "load_lora" CATEGORY = "Bjornulf" def load_lora(self, image, model, clip, strength_model, strength_clip, civitai_token): def download_file(url, destination_path, lora_name, api_token=None): """ Download file with proper authentication headers and simple progress bar. """ filename = f"{lora_name}.safetensors" file_path = os.path.join(destination_path, filename) headers = {} if api_token: headers['Authorization'] = f'Bearer {api_token}' try: print(f"Downloading from: {url}") response = requests.get(url, headers=headers, stream=True) response.raise_for_status() file_size = int(response.headers.get('content-length', 0)) block_size = 8192 downloaded = 0 with open(file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=block_size): if chunk: f.write(chunk) downloaded += len(chunk) if file_size > 0: progress = int(50 * downloaded / file_size) bars = '=' * progress + '-' * (50 - progress) percent = (downloaded / file_size) * 100 print(f'\rProgress: [{bars}] {percent:.1f}%', end='') print(f"\nFile downloaded successfully to: {file_path}") return file_path except requests.exceptions.RequestException as e: print(f"Error downloading file: {e}") raise if image == "none": raise ValueError("No image selected") # Get the absolute path to the JSON file json_path = os.path.join(parsed_models_path, 'parsed_lora_pony_loras.json') # Load loras info try: with open(json_path, 'r', encoding='utf-8') as f: models_info = json.load(f) except UnicodeDecodeError: # Fallback to latin-1 if UTF-8 fails with open(json_path, 'r', encoding='latin-1') as f: models_info = json.load(f) # Extract lora name from image path image_name = os.path.basename(image) # Find corresponding lora info lora_info = next((lora for lora in loras_info if os.path.basename(lora['image_path']) == image_name), None) if not lora_info: raise ValueError(f"No LoRA information found for image: {image_name}") # Create loras directory if it doesn't exist lora_dir = os.path.join(folder_paths.models_dir, "loras", "Bjornulf_civitAI", "pony") os.makedirs(lora_dir, exist_ok=True) # Expected lora filename lora_filename = f"{lora_info['name']}.safetensors" full_lora_path = os.path.join(lora_dir, lora_filename) # Check if lora is already downloaded if not os.path.exists(full_lora_path): print(f"Downloading LoRA {lora_info['name']}...") # Construct download URL with token download_url = lora_info['download_url'] if civitai_token: download_url += f"?token={civitai_token}" if '?' not in download_url else f"&token={civitai_token}" try: # Download the file download_file(download_url, lora_dir, lora_info['name'], civitai_token) except Exception as e: raise ValueError(f"Failed to download LoRA: {e}") # Get relative path relative_lora_path = os.path.join("Bjornulf_civitAI", "pony", lora_filename) # Load the LoRA try: lora_loader = nodes.LoraLoader() model_lora, clip_lora = lora_loader.load_lora(model=model, clip=clip, lora_name=relative_lora_path, strength_model=strength_model, strength_clip=strength_clip) except Exception as e: raise ValueError(f"Failed to load LoRA: {e}") # Convert trained words list to comma-separated string trained_words_str = ", ".join(lora_info.get('trained_words', [])) return (model_lora, clip_lora, lora_info['name'], f"https://civitai.com/models/{lora_info['lora_id']}", trained_words_str) @classmethod def IS_CHANGED(s, image, **kwargs): if image == "none": return "" image_path = os.path.join(civitai_base_path, image) if not os.path.exists(image_path): return "" m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) m.update(image.encode('utf-8')) return m.digest().hex() class CivitAILoraSelectorHunyuan: @classmethod def INPUT_TYPES(s): image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp') # Try NSFW folder first # nsfw_files = [f"NSFW_lora_hunyuan_video/{f}" for f in folder_paths.get_filename_list("NSFW_lora_hunyuan_video") # if f.lower().endswith(image_extensions)] # If NSFW folder is empty or doesn't exist, try regular folder # if not nsfw_files: files = [f"lora_hunyuan_video/{f}" for f in folder_paths.get_filename_list("lora_hunyuan_video") if f.lower().endswith(image_extensions)] # else: # files = nsfw_files if not files: files = ["none"] return { "required": { "image": (sorted(files), {"image_upload": True}), "model": ("MODEL",), "clip": ("CLIP",), "strength_model": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}), "strength_clip": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}), "civitai_token": ("STRING", {"default": ""}) }, } RETURN_TYPES = ("MODEL", "CLIP", "STRING", "STRING", "STRING") RETURN_NAMES = ("model", "clip", "name", "civitai_url", "trigger_words") FUNCTION = "load_lora" CATEGORY = "Bjornulf" def load_lora(self, image, model, clip, strength_model, strength_clip, civitai_token): def download_file(url, destination_path, lora_name, api_token=None): filename = f"{lora_name}.safetensors" file_path = os.path.join(destination_path, filename) headers = {} if api_token: headers['Authorization'] = f'Bearer {api_token}' try: print(f"Downloading from: {url}") response = requests.get(url, headers=headers, stream=True) response.raise_for_status() file_size = int(response.headers.get('content-length', 0)) block_size = 8192 downloaded = 0 with open(file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=block_size): if chunk: f.write(chunk) downloaded += len(chunk) if file_size > 0: progress = int(50 * downloaded / file_size) bars = '=' * progress + '-' * (50 - progress) percent = (downloaded / file_size) * 100 print(f'\rProgress: [{bars}] {percent:.1f}%', end='') print(f"\nFile downloaded successfully to: {file_path}") return file_path except requests.exceptions.RequestException as e: print(f"Error downloading file: {e}") raise if image == "none": raise ValueError("No image selected") # Try loading NSFW JSON first, fall back to regular JSON if not found # nsfw_json_path = os.path.join(parsed_models_path, 'NSFW_parsed_lora_hunyuan_video_loras.json') # regular_json_path = os.path.join(parsed_models_path, 'parsed_lora_hunyuan_video_loras.json') json_path = os.path.join(parsed_models_path, 'parsed_lora_hunyuan_video_loras.json') # json_path = nsfw_json_path if os.path.exists(nsfw_json_path) else regular_json_path hunYuan = "hunyuan_video" try: with open(json_path, 'r', encoding='utf-8') as f: models_info = json.load(f) except UnicodeDecodeError: # Fallback to latin-1 if UTF-8 fails with open(json_path, 'r', encoding='latin-1') as f: models_info = json.load(f) image_name = os.path.basename(image) lora_info = next((lora for lora in loras_info if os.path.basename(lora['image_path']) == image_name), None) if not lora_info: raise ValueError(f"No LoRA information found for image: {image_name}") lora_dir = os.path.join(folder_paths.models_dir, "loras", "Bjornulf_civitAI", hunYuan) os.makedirs(lora_dir, exist_ok=True) lora_filename = f"{lora_info['name']}.safetensors" full_lora_path = os.path.join(lora_dir, lora_filename) if not os.path.exists(full_lora_path): print(f"Downloading LoRA {lora_info['name']}...") download_url = lora_info['download_url'] if civitai_token: download_url += f"?token={civitai_token}" if '?' not in download_url else f"&token={civitai_token}" try: download_file(download_url, lora_dir, lora_info['name'], civitai_token) except Exception as e: raise ValueError(f"Failed to download LoRA: {e}") relative_lora_path = os.path.join("Bjornulf_civitAI", hunYuan, lora_filename) try: lora_loader = nodes.LoraLoader() model_lora, clip_lora = lora_loader.load_lora(model=model, clip=clip, lora_name=relative_lora_path, strength_model=strength_model, strength_clip=strength_clip) except Exception as e: raise ValueError(f"Failed to load LoRA: {e}") trained_words_str = ", ".join(lora_info.get('trained_words', [])) return (model_lora, clip_lora, lora_info['name'], f"https://civitai.com/models/{lora_info['lora_id']}", trained_words_str) @classmethod def IS_CHANGED(s, image, **kwargs): if image == "none": return "" image_path = os.path.join(civitai_base_path, image) if not os.path.exists(image_path): return "" m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) m.update(image.encode('utf-8')) return m.digest().hex()