Files
Bjornulf_custom_nodes/API_civitai.py
justumen 6a21e32a42 0.75
2025-02-21 17:03:45 +01:00

1679 lines
67 KiB
Python

import os
import time
import requests
from PIL import Image, ImageSequence, ImageOps
import numpy as np
import torch
from io import BytesIO
import json
import threading
import random
import importlib
import folder_paths
import node_helpers
import hashlib
from folder_paths import get_filename_list, get_full_path, models_dir
import nodes
from pathlib import Path
import subprocess
# ======================
# SHARED UTILITY FUNCTIONS
# ======================
def get_civitai_base_paths():
    """Return the directories used by the CivitAI integration.

    Returns:
        A 3-tuple ``(root, civitai_dir, civitai_dir)``:

        * ``root`` - the directory four levels above this file,
        * ``civitai_dir`` - ``ComfyUI/custom_nodes/Bjornulf_custom_nodes/civitai``
          resolved beneath ``root``,
        * the same ``civitai_dir`` again; callers use the last item as the
          parsed-models path.
    """
    root = Path(__file__)
    # Walk up four directory levels from this file.
    for _ in range(4):
        root = root.parent
    civitai_dir = root.joinpath(
        "ComfyUI", "custom_nodes", "Bjornulf_custom_nodes", "civitai"
    )
    return root, civitai_dir, civitai_dir
def setup_checkpoint_directory(model_type):
    """Create the checkpoint folder for ``model_type`` and record it.

    Args:
        model_type: Sub-folder name created under
            ``<models_dir>/checkpoints/Bjornulf_civitAI``.

    Returns:
        ``(checkpoint_dir, parsed_models_path)`` - the created directory as a
        ``Path`` and the third item of ``get_civitai_base_paths()``.
    """
    parsed_models_path = get_civitai_base_paths()[2]

    checkpoint_dir = Path(folder_paths.models_dir).joinpath(
        "checkpoints", "Bjornulf_civitAI", model_type
    )
    checkpoint_dir.mkdir(parents=True, exist_ok=True)

    # NOTE(review): the registry entry is handled here as a flat sequence of
    # path strings - confirm that matches the structure folder_paths keeps
    # under "checkpoints".
    dir_text = str(checkpoint_dir)
    registered = list(folder_paths.folder_names_and_paths["checkpoints"])
    if dir_text not in registered:
        registered.append(dir_text)
        folder_paths.folder_names_and_paths["checkpoints"] = tuple(registered)

    return checkpoint_dir, parsed_models_path
def setup_image_folders(folder_specs, parent_dir=""):
    """Register every preview-image folder and link it into the input dir.

    Args:
        folder_specs: Mapping of folder name -> sub-path below the CivitAI
            base directory.
        parent_dir: Optional sub-directory of the input folder under which
            the links are created; forwarded to ``create_symlink``.
    """
    base_dir = get_civitai_base_paths()[1]
    for name, relative in folder_specs.items():
        folder_dir = base_dir / relative
        # Make the folder known to ComfyUI under its own name ...
        folder_paths.add_model_folder_path(name, str(folder_dir))
        # ... and expose it inside the input directory.
        create_symlink(folder_dir, name, parent_dir)
# Code works, tested on linux and windows
def create_symlink(source, target_name, parent_dir=None):
    """Link ``source`` into the ComfyUI input directory as ``target_name``.

    The input directory is ``ComfyUI/input`` on Windows and ``input``
    elsewhere, both relative to the current working directory. Failures are
    reported with ``print`` and never raised.

    Args:
        source: Path (str or ``Path``) the link should point to.
        target_name: Name of the link to create.
        parent_dir: Optional sub-directory of the input directory in which
            the link is placed (created if missing).

    Behaviour per platform:
        * Windows: an existing target is left alone; otherwise a symlink is
          attempted and, if that raises ``OSError`` and the source is a
          directory, a junction is created through PowerShell.
        * Other platforms: a missing source aborts; any existing target
          (file, link, or real directory tree) is removed and the symlink is
          recreated.
    """
    on_windows = os.name == 'nt'

    comfyui_input = Path("ComfyUI/input") if on_windows else Path("input")
    comfyui_input.mkdir(parents=True, exist_ok=True)

    if parent_dir:
        link_dir = comfyui_input / parent_dir
        link_dir.mkdir(parents=True, exist_ok=True)
    else:
        link_dir = comfyui_input
    target = link_dir / target_name

    if on_windows:
        if target.exists():
            return
        try:
            base_path = Path(__file__).resolve().parent
            # When ``source`` is absolute the join below yields ``source``
            # itself (pathlib discards the left-hand side).
            source_path = base_path / "ComfyUI" / source
            try:
                target.symlink_to(source_path, target_is_directory=source_path.is_dir())
            except OSError:
                if source_path.is_dir():
                    cmd = [
                        "powershell",
                        "New-Item",
                        "-ItemType",
                        "Junction",
                        "-Path",
                        str(target),
                        "-Value",
                        str(source_path),
                    ]
                    # argv list is executed directly; no intermediate shell,
                    # so shell metacharacters in the paths are not interpreted.
                    subprocess.run(cmd, check=True,
                                   stdout=subprocess.DEVNULL,
                                   stderr=subprocess.DEVNULL)
                else:
                    print(f"❌ Failed to create symlink/junction for {target_name}.")
        except Exception as e:
            print(f"❌ Failed to create symlink for {target_name}: {e}")
        return

    try:
        if os.path.isabs(source):
            source_path = Path(source)
            if not source_path.exists():
                # The package folder may be installed with either casing;
                # try the other spelling of the first one found in the path.
                source_text = str(source_path)
                for present, swapped in (
                    ('Bjornulf_custom_nodes', 'bjornulf_custom_nodes'),
                    ('bjornulf_custom_nodes', 'Bjornulf_custom_nodes'),
                ):
                    if present in source_text:
                        candidate = Path(source_text.replace(present, swapped))
                        if candidate.exists():
                            source_path = candidate
                        break
                if not source_path.exists():
                    print(f"❌ Source path doesn't exist (checked both cases): {source}")
                    return
        else:
            source_path = Path(source).absolute()
            if not source_path.exists():
                print(f"❌ Source path doesn't exist: {source_path}")
                return

        # Replace whatever currently occupies the target path. is_symlink()
        # is checked too because exists() is False for a dangling link.
        # WARNING: a real directory at the target is deleted recursively.
        if target.exists() or target.is_symlink():
            try:
                if target.is_dir() and not target.is_symlink():
                    import shutil
                    shutil.rmtree(target)
                else:
                    os.unlink(target)
            except Exception as e:
                print(f"❌ Failed to remove existing target {target}: {e}")
                return

        # target_is_directory is ignored on non-Windows platforms, so a
        # second attempt with a different flag value would be identical.
        try:
            os.symlink(source_path, target, target_is_directory=source_path.is_dir())
        except Exception as e:
            print(f"❌ Failed to create symlink for {target_name}: {e}")
    except Exception as e:
        print(f"❌ Failed to create symlink for {target_name}: {e}")
def download_file(url, destination_path, model_name, api_token=None):
    """Stream ``url`` to ``<destination_path>/<model_name>.safetensors``.

    The payload is first written to a sibling ``.part`` file and only moved
    to its final name once the transfer finished. Callers decide whether a
    model is already present by testing for the final file, so a failed or
    interrupted transfer must never leave a truncated file under that name.

    Args:
        url: Download URL.
        destination_path: Existing directory that receives the file.
        model_name: File name without the ``.safetensors`` suffix.
        api_token: Optional token sent as an ``Authorization: Bearer`` header.

    Returns:
        The final file path as a string.

    Raises:
        RuntimeError: If the request or any file operation fails; the
            original exception is attached as ``__cause__``.
    """
    headers = {'Authorization': f'Bearer {api_token}'} if api_token else {}
    file_path = Path(destination_path) / f"{model_name}.safetensors"
    partial_path = file_path.with_name(file_path.name + ".part")
    try:
        with requests.get(url, headers=headers, stream=True) as response:
            response.raise_for_status()
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        # Same-directory rename: the final name appears only when complete.
        os.replace(partial_path, file_path)
        return str(file_path)
    except Exception as e:
        # Best-effort cleanup of the incomplete download.
        try:
            partial_path.unlink()
        except OSError:
            pass
        raise RuntimeError(f"Download failed: {str(e)}") from e
# Resolve the shared CivitAI directories once for the rest of the module.
_, civitai_base_path, parsed_models_path = get_civitai_base_paths()

# Root folder for every checkpoint downloaded by these nodes.
bjornulf_checkpoint_path = Path(folder_paths.models_dir).joinpath(
    "checkpoints", "Bjornulf_civitAI"
)
bjornulf_checkpoint_path.mkdir(parents=True, exist_ok=True)

# Record the root folder in the "checkpoints" registry entry.
# NOTE(review): the entry is handled as a flat sequence of path strings -
# confirm that matches the structure folder_paths keeps.
checkpoint_folders = list(folder_paths.folder_names_and_paths["checkpoints"])
if str(bjornulf_checkpoint_path) not in checkpoint_folders:
    checkpoint_folders.append(str(bjornulf_checkpoint_path))
    folder_paths.folder_names_and_paths["checkpoints"] = tuple(checkpoint_folders)

# Preview-image folders; each folder name maps to an identically named
# sub-path of the CivitAI base directory.
image_folders = {
    name: name
    for name in (
        "sdxl_1.0",
        "sd_1.5",
        "pony",
        "flux.1_d",
        "flux.1_s",
        "lora_sdxl_1.0",
        "lora_sd_1.5",
        "lora_pony",
        "lora_flux.1_d",
        "lora_hunyuan_video",
        # "NSFW_lora_hunyuan_video",
    )
}

# Register the folders and link them into the input directory. No parent
# directory is passed, so the links are created directly inside it.
setup_image_folders(image_folders)
def get_civitai():
    """Import the ``civitai`` package, reload it, and return the module.

    Callers invoke this after changing ``CIVITAI_API_TOKEN`` in the
    environment so the module is re-executed with the new value.
    """
    import civitai as civitai_module
    importlib.reload(civitai_module)
    return civitai_module
# Provide a fallback CIVITAI_API_TOKEN when the environment defines none; an
# existing value is left untouched.
# NOTE(review): this is a credential hard-coded in source. Presumably the
# civitai package needs the variable to exist when it is imported below -
# confirm, then replace the literal with a non-secret placeholder.
os.environ.setdefault("CIVITAI_API_TOKEN", "d5fc336223a367e6b503a14a10569825")
import civitai
# ======================
# GENERATE WITH CIVITAI
# ======================
class APIGenerateCivitAI:
    """ComfyUI node that generates images through the CivitAI API.

    One remote job is created per requested image. Each finished image is
    downloaded, written to ``output/API/CivitAI`` next to a JSON metadata
    file, and returned as part of a batched tensor together with a JSON
    report string.
    """

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the node's input widgets."""
        return {
            "required": {
                "api_token": ("STRING", {
                    "multiline": False,
                    "default": "",
                    "placeholder": "Enter your CivitAI API token here"
                }),
                "model_urn": ("STRING", {
                    "multiline": False,
                    "default": "urn:air:sdxl:checkpoint:civitai:133005@782002"
                }),
                "prompt": ("STRING", {
                    "multiline": True,
                    "default": "RAW photo, face portrait photo of 26 y.o woman"
                }),
                "negative_prompt": ("STRING", {
                    "multiline": True,
                    "default": "(deformed iris, deformed pupils, semi-realistic, cgi, 3d, render, sketch, cartoon, drawing, anime)"
                }),
                "width": ("INT", {
                    "default": 1024,
                    "min": 128,
                    "max": 1024,
                    "step": 64
                }),
                "height": ("INT", {
                    "default": 768,
                    "min": 128,
                    "max": 1024,
                    "step": 64
                }),
                "steps": ("INT", {
                    "default": 20,
                    "min": 1,
                    "max": 50,
                    "step": 1
                }),
                "cfg_scale": ("FLOAT", {
                    "default": 7.0,
                    "min": 1.0,
                    "max": 30.0,
                    "step": 0.1
                }),
                "seed": ("INT", {
                    "default": -1,
                    "min": -1,
                    "max": 0x7FFFFFFFFFFFFFFF
                }),
                "number_of_images": ("INT", {
                    "default": 1,
                    "min": 1,
                    "max": 10,
                    "step": 1
                }),
                "timeout": ("INT", {
                    "default": 300,
                    "min": 60,
                    "max": 1800,
                    "step": 60,
                    "display": "Timeout (seconds)"
                }),
            },
            "optional": {
                "add_LORA": ("add_LORA", {"forceInput": True}),
            }
        }

    RETURN_TYPES = ("IMAGE", "STRING",)
    RETURN_NAMES = ("image", "generation_info",)
    FUNCTION = "generate"
    CATEGORY = "Civitai"

    def __init__(self):
        # Both folders are relative to the current working directory.
        self.output_dir = "output/API/CivitAI"
        self.metadata_dir = "output/API/CivitAI/metadata"
        os.makedirs(self.output_dir, exist_ok=True)
        os.makedirs(self.metadata_dir, exist_ok=True)
        # Set by interrupt(); polled while waiting for remote jobs.
        self._interrupt_event = threading.Event()

    def get_next_number(self):
        """Return the next free image number in the output directory.

        Only ``*.png`` files whose name up to the first dot parses as an
        integer are counted; any other file is ignored instead of aborting
        the generation with a ``ValueError``.
        """
        numbers = []
        for name in os.listdir(self.output_dir):
            if not name.endswith('.png'):
                continue
            try:
                numbers.append(int(name.split('.')[0]))
            except ValueError:
                continue
        return max(numbers, default=0) + 1

    def check_job_status(self, job_token, job_id, timeout=9999):
        """Poll a job until its result is available.

        Args:
            job_token: Token identifying the job request.
            job_id: Job identifier (not used for polling; kept for callers).
            timeout: Maximum number of seconds to keep polling.

        Returns:
            The ``blobUrl`` of the finished result.

        Raises:
            RuntimeError: The API reported the job status ``failed``.
            InterruptedError: ``interrupt()`` was called while waiting.
            TimeoutError: No result became available within ``timeout``.
        """
        start_time = time.time()
        while time.time() - start_time < timeout and not self._interrupt_event.is_set():
            job_failed = False
            failure_reason = None
            try:
                response = civitai.jobs.get(token=job_token)
                job_status = response['jobs'][0]
                if job_status.get('status') == 'failed':
                    job_failed = True
                    failure_reason = job_status.get('error', 'Unknown error')
                elif job_status['result'].get('available'):
                    return job_status['result'].get('blobUrl')
                else:
                    print(f"Job Status: {job_status['status']}")
            except Exception as e:
                # Request/parsing problems are treated as transient: report
                # and poll again.
                print(f"Error checking job status: {str(e)}")

            # Raised outside the try block so the retry handler above cannot
            # swallow a definitive failure and keep polling until timeout.
            if job_failed:
                raise RuntimeError(f"Job failed: {failure_reason}")

            # Wait up to 2 s between polls; returns early (True) on interrupt.
            if self._interrupt_event.wait(2):
                raise InterruptedError("Generation interrupted by user")

        if self._interrupt_event.is_set():
            raise InterruptedError("Generation interrupted by user")
        raise TimeoutError(f"Job timed out after {timeout} seconds")

    def save_image_and_metadata(self, img, generation_info, number):
        """Write the image and its metadata JSON.

        Returns:
            ``(image_path, metadata_path)`` of the two files written.
        """
        filepath = os.path.join(self.output_dir, f"{number:04d}.png")
        img.save(filepath)

        metadata_filepath = os.path.join(self.metadata_dir, f"{number:04d}_metadata.json")
        with open(metadata_filepath, 'w') as f:
            json.dump(generation_info, f, indent=4)
        return filepath, metadata_filepath

    def format_generation_info(self, input_data, job_token, job_id, image_url):
        """Build the metadata dict describing how to recover an image."""
        recovery_instructions = (
            "\n"
            "To recover this image:\n"
            "1. Use the provided curl command to download the image\n"
            "2. Or use the image_url directly in a browser\n"
            "3. If the image is no longer available, you can retry generation with the same parameters\n"
        )
        return {
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "generation_parameters": input_data,
            "job_details": {
                "token": job_token,
                "job_id": job_id,
                "image_url": image_url
            },
            "recovery_command": f"curl -X GET '{image_url}' --output recovered_image.png",
            "recovery_instructions": recovery_instructions,
        }

    def generate_single_image(self, input_data, job_token, job_id, timeout):
        """Wait for one job, download and save its image.

        Returns:
            ``(img_tensor, generation_info)`` where ``img_tensor`` is the RGB
            image as float32 values in [0, 1] with a leading batch axis.

        Raises:
            Exception: Any failure, wrapped with a descriptive message; the
                original exception is attached as ``__cause__``.
        """
        try:
            image_url = self.check_job_status(job_token, job_id, timeout)
            if not image_url:
                raise ValueError("No image URL received")

            image_response = requests.get(image_url)
            if image_response.status_code != 200:
                raise ConnectionError(f"Failed to download image: Status code {image_response.status_code}")

            img = Image.open(BytesIO(image_response.content))
            if img.mode != 'RGB':
                img = img.convert('RGB')

            number = self.get_next_number()
            generation_info = self.format_generation_info(input_data, job_token, job_id, image_url)
            self.save_image_and_metadata(img, generation_info, number)

            img_tensor = torch.from_numpy(np.array(img).astype(np.float32) / 255.0)
            img_tensor = img_tensor.unsqueeze(0)
            return img_tensor, generation_info
        except Exception as e:
            raise Exception(f"Error generating single image: {str(e)}") from e

    def generate(self, api_token, prompt, negative_prompt, width, height, model_urn, steps=20,
                 cfg_scale=7.0, seed=-1, number_of_images=1, timeout=300, add_LORA=""):
        """Create the generation jobs and collect their images.

        Args:
            api_token: If non-empty, stored in ``CIVITAI_API_TOKEN`` before
                the civitai module is reloaded.
            prompt, negative_prompt, width, height, model_urn, steps,
            cfg_scale: Forwarded to the API as generation parameters.
            seed: Base seed; ``-1`` picks a random one. Image ``i`` uses
                ``seed + i``.
            number_of_images: Number of jobs to create.
            timeout: Per-job polling timeout in seconds.
            add_LORA: Optional JSON string; its ``additionalNetworks`` entry
                is attached to every job.

        Returns:
            ``(images, info_json)``. On total failure or interruption the
            image is an all-zero tensor of shape ``(1, height, width, 3)``
            and ``info_json`` contains an ``error`` entry.
        """
        if api_token:
            os.environ["CIVITAI_API_TOKEN"] = api_token
        # Reload so the module is re-executed with the current token.
        civitai = get_civitai()

        self._interrupt_event.clear()
        empty_image = torch.zeros((1, height, width, 3))
        try:
            if seed == -1:
                seed = random.randint(0, 0x7FFFFFFFFFFFFFFF)

            # The LoRA payload is identical for every job: parse it once.
            lora_extras = {}
            if add_LORA:
                try:
                    lora_data = json.loads(add_LORA)
                    if "additionalNetworks" in lora_data:
                        lora_extras["additionalNetworks"] = lora_data["additionalNetworks"]
                except Exception as e:
                    print(f"Error processing LORA data: {str(e)}")

            # Submit one job per requested image.
            jobs = []
            for i in range(number_of_images):
                input_data = {
                    "model": model_urn,
                    "params": {
                        "prompt": prompt,
                        "negativePrompt": negative_prompt,
                        "scheduler": "EulerA",
                        "steps": steps,
                        "cfgScale": cfg_scale,
                        "width": width,
                        "height": height,
                        "clipSkip": 2,
                        "seed": seed + i
                    }
                }
                input_data.update(lora_extras)

                response = civitai.image.create(input_data)
                if not response or 'token' not in response or 'jobs' not in response:
                    raise ValueError("Invalid response from Civitai API")
                jobs.append({
                    'token': response['token'],
                    'job_id': response['jobs'][0]['jobId'],
                    'input_data': input_data
                })

            # Collect the results one job after another; a failing job is
            # recorded and does not stop the remaining ones.
            images = []
            infos = []
            failed_jobs = []
            for job in jobs:
                try:
                    img_tensor, generation_info = self.generate_single_image(
                        job['input_data'],
                        job['token'],
                        job['job_id'],
                        timeout
                    )
                    images.append(img_tensor)
                    infos.append(generation_info)
                except Exception as e:
                    failed_jobs.append({
                        'job': job,
                        'error': str(e)
                    })

            if not images:
                generation_info = {
                    "error": "All generation jobs failed",
                    "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                    "failed_jobs": failed_jobs
                }
                return (empty_image, json.dumps(generation_info, indent=2))

            combined_tensor = torch.cat(images, dim=0)
            combined_info = {
                "successful_generations": len(images),
                "total_requested": number_of_images,
                "base_seed": seed,
                "generation_parameters": jobs[0]['input_data'],
                "individual_results": infos,
                "failed_jobs": failed_jobs if failed_jobs else None
            }
            return (combined_tensor, json.dumps(combined_info, indent=2))
        except InterruptedError:
            generation_info = {
                "error": "Generation interrupted by user",
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "base_seed": seed
            }
            return (empty_image, json.dumps(generation_info, indent=2))
        except Exception as e:
            generation_info = {
                "error": f"Civitai generation failed: {str(e)}",
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "base_seed": seed
            }
            return (empty_image, json.dumps(generation_info, indent=2))

    @classmethod
    def IS_CHANGED(cls, **kwargs):
        # NaN never compares equal to itself, so the returned value always
        # differs from the one returned on the previous call.
        return float("NaN")

    def interrupt(self):
        """Signal a running generation to stop waiting for its jobs."""
        print("Interrupting CivitAI generation...")
        self._interrupt_event.set()
class APIGenerateCivitAIAddLORA:
    """Node that builds the ``additionalNetworks`` JSON payload for LoRAs.

    Nodes can be chained: the payload of an upstream node is merged into the
    one produced here.
    """

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the node's input widgets."""
        urn_widget = ("STRING", {
            "multiline": False,
            "default": "urn:air:flux1:lora:civitai:790034@883473"
        })
        strength_widget = ("FLOAT", {
            "default": 1.0,
            "min": 0.0,
            "max": 2.0,
            "step": 0.01
        })
        return {
            "required": {
                "lora_urn": urn_widget,
                "strength": strength_widget,
            },
            "optional": {
                "add_LORA": ("add_LORA", {"forceInput": True}),
            }
        }

    RETURN_TYPES = ("add_LORA",)
    FUNCTION = "add_lora"
    CATEGORY = "Civitai"

    def add_lora(self, lora_urn, strength, add_LORA=None):
        """Return a 1-tuple holding the JSON payload with this LoRA added.

        The new entry is inserted first and the upstream entries are merged
        on top of it, so an upstream entry with the same URN takes
        precedence. Any error yields a payload with no networks.
        """
        try:
            networks = {lora_urn: {"type": "Lora", "strength": strength}}
            if add_LORA:
                upstream = json.loads(add_LORA)
                if "additionalNetworks" in upstream:
                    networks.update(upstream["additionalNetworks"])
            payload = {"additionalNetworks": networks}
            return (json.dumps(payload),)
        except Exception as e:
            print(f"Error adding LORA: {str(e)}")
            return (json.dumps({"additionalNetworks": {}}),)
# ======================
# MODEL SELECTOR CLASSES
# ======================
class CivitAIModelSelectorSD15:
@classmethod
def INPUT_TYPES(s):
# Get list of supported image extensions
image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')
files = [f"sd_1.5/{f}" for f in folder_paths.get_filename_list("sd_1.5")
if f.lower().endswith(image_extensions)]
if not files: # If no files found, provide a default option
files = ["none"]
return {
"required": {
"image": (sorted(files), {"image_upload": True}),
"civitai_token": ("STRING", {"default": ""})
},
}
RETURN_TYPES = ("MODEL", "CLIP", "VAE", "STRING", "STRING")
RETURN_NAMES = ("model", "clip", "vae", "name", "civitai_url")
FUNCTION = "load_model"
CATEGORY = "Bjornulf"
def load_model(self, image, civitai_token):
if image == "none":
raise ValueError("No image selected")
# Get the absolute path to the JSON file
json_path = os.path.join(parsed_models_path, 'parsed_sd_1.5_models.json')
# Load models info
try:
with open(json_path, 'r', encoding='utf-8') as f:
models_info = json.load(f)
except UnicodeDecodeError:
# Fallback to latin-1 if UTF-8 fails
with open(json_path, 'r', encoding='latin-1') as f:
models_info = json.load(f)
# Extract model name from image path
image_name = os.path.basename(image)
# Find corresponding model info
model_info = next((model for model in models_info
if os.path.basename(model['image_path']) == image_name), None)
if not model_info:
raise ValueError(f"No model information found for image: {image_name}")
# Create checkpoints directory if it doesn't exist
checkpoint_dir = os.path.join(folder_paths.models_dir, "checkpoints", "Bjornulf_civitAI", "sd1.5")
os.makedirs(checkpoint_dir, exist_ok=True)
# Expected model filename
model_filename = f"{model_info['name']}.safetensors"
full_model_path = os.path.join(checkpoint_dir, model_filename)
# Check if model is already downloaded
if not os.path.exists(full_model_path):
print(f"Downloading model {model_info['name']}...")
# Construct download URL with token
download_url = model_info['download_url']
if civitai_token:
download_url += f"?token={civitai_token}" if '?' not in download_url else f"&token={civitai_token}"
try:
# Download the file using class method
download_file(download_url, checkpoint_dir, model_info['name'], civitai_token)
except Exception as e:
raise ValueError(f"Failed to download model: {e}")
# Get relative path
relative_model_path = os.path.join("Bjornulf_civitAI", "sd1.5", model_filename)
# Try loading with relative path first
try:
model = nodes.CheckpointLoaderSimple().load_checkpoint(relative_model_path)
except Exception as e:
print(f"Error loading model with relative path: {e}")
print(f"Attempting to load from full path: {full_model_path}")
# Fallback to direct loading if needed
from comfy.sd import load_checkpoint_guess_config
model = load_checkpoint_guess_config(full_model_path)
return (model[0], model[1], model[2], model_info['name'], f"https://civitai.com/models/{model_info['model_id']}")
@classmethod
def IS_CHANGED(s, image, **kwargs):
if image == "none":
return ""
image_path = os.path.join(civitai_base_path, image)
if not os.path.exists(image_path):
return ""
m = hashlib.sha256()
with open(image_path, 'rb') as f:
m.update(f.read())
m.update(image.encode('utf-8'))
return m.digest().hex()
class CivitAIModelSelectorSDXL:
@classmethod
def INPUT_TYPES(s):
# Get list of supported image extensions
image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')
files = [f"sdxl_1.0/{f}" for f in folder_paths.get_filename_list("sdxl_1.0")
if f.lower().endswith(image_extensions)]
if not files: # If no files found, provide a default option
files = ["none"]
return {
"required": {
"image": (sorted(files), {"image_upload": True}),
"civitai_token": ("STRING", {"default": ""})
},
}
RETURN_TYPES = ("MODEL", "CLIP", "VAE", "STRING", "STRING")
RETURN_NAMES = ("model", "clip", "vae", "name", "civitai_url")
FUNCTION = "load_model"
CATEGORY = "Bjornulf"
def load_model(self, image, civitai_token):
if image == "none":
raise ValueError("No image selected")
# Get the absolute path to the JSON file
json_path = os.path.join(parsed_models_path, 'parsed_sdxl_1.0_models.json')
# Load models info
try:
with open(json_path, 'r', encoding='utf-8') as f:
models_info = json.load(f)
except UnicodeDecodeError:
# Fallback to latin-1 if UTF-8 fails
with open(json_path, 'r', encoding='latin-1') as f:
models_info = json.load(f)
# Extract model name from image path
image_name = os.path.basename(image)
# Find corresponding model info
model_info = next((model for model in models_info
if os.path.basename(model['image_path']) == image_name), None)
if not model_info:
raise ValueError(f"No model information found for image: {image_name}")
# Create checkpoints directory if it doesn't exist
checkpoint_dir = os.path.join(folder_paths.models_dir, "checkpoints", "Bjornulf_civitAI", "sdxl_1.0")
os.makedirs(checkpoint_dir, exist_ok=True)
# Expected model filename
model_filename = f"{model_info['name']}.safetensors"
full_model_path = os.path.join(checkpoint_dir, model_filename)
# Check if model is already downloaded
if not os.path.exists(full_model_path):
print(f"Downloading model {model_info['name']}...")
# Construct download URL with token
download_url = model_info['download_url']
if civitai_token:
download_url += f"?token={civitai_token}" if '?' not in download_url else f"&token={civitai_token}"
try:
# Download the file using class method
download_file(download_url, checkpoint_dir, model_info['name'], civitai_token)
except Exception as e:
raise ValueError(f"Failed to download model: {e}")
# Get relative path
relative_model_path = os.path.join("Bjornulf_civitAI", "sdxl_1.0", model_filename)
# Try loading with relative path first
try:
model = nodes.CheckpointLoaderSimple().load_checkpoint(relative_model_path)
except Exception as e:
print(f"Error loading model with relative path: {e}")
print(f"Attempting to load from full path: {full_model_path}")
# Fallback to direct loading if needed
from comfy.sd import load_checkpoint_guess_config
model = load_checkpoint_guess_config(full_model_path)
# return (model[0], model[1], model[2], model_info['name'], model_info['download_url'])
return (model[0], model[1], model[2], model_info['name'], f"https://civitai.com/models/{model_info['model_id']}")
@classmethod
def IS_CHANGED(s, image, **kwargs):
if image == "none":
return ""
image_path = os.path.join(civitai_base_path, image)
if not os.path.exists(image_path):
return ""
m = hashlib.sha256()
with open(image_path, 'rb') as f:
m.update(f.read())
m.update(image.encode('utf-8'))
return m.digest().hex()
class CivitAIModelSelectorFLUX_D:
@classmethod
def INPUT_TYPES(s):
# Get list of supported image extensions
image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')
files = [f"flux.1_d/{f}" for f in folder_paths.get_filename_list("flux.1_d")
if f.lower().endswith(image_extensions)]
if not files: # If no files found, provide a default option
files = ["none"]
return {
"required": {
"image": (sorted(files), {"image_upload": True}),
"civitai_token": ("STRING", {"default": ""})
},
}
RETURN_TYPES = ("MODEL", "CLIP", "VAE", "STRING", "STRING")
RETURN_NAMES = ("model", "clip", "vae", "name", "civitai_url")
FUNCTION = "load_model"
CATEGORY = "Bjornulf"
def load_model(self, image, civitai_token):
if image == "none":
raise ValueError("No image selected")
# Get the absolute path to the JSON file
json_path = os.path.join(parsed_models_path, 'parsed_flux.1_d_models.json')
# Load models info
try:
with open(json_path, 'r', encoding='utf-8') as f:
models_info = json.load(f)
except UnicodeDecodeError:
# Fallback to latin-1 if UTF-8 fails
with open(json_path, 'r', encoding='latin-1') as f:
models_info = json.load(f)
# Extract model name from image path
image_name = os.path.basename(image)
# Find corresponding model info
model_info = next((model for model in models_info
if os.path.basename(model['image_path']) == image_name), None)
if not model_info:
raise ValueError(f"No model information found for image: {image_name}")
# Create checkpoints directory if it doesn't exist
checkpoint_dir = os.path.join(folder_paths.models_dir, "checkpoints", "Bjornulf_civitAI", "flux_d")
os.makedirs(checkpoint_dir, exist_ok=True)
# Expected model filename
model_filename = f"{model_info['name']}.safetensors"
full_model_path = os.path.join(checkpoint_dir, model_filename)
# Check if model is already downloaded
if not os.path.exists(full_model_path):
print(f"Downloading model {model_info['name']}...")
# Construct download URL with token
download_url = model_info['download_url']
if civitai_token:
download_url += f"?token={civitai_token}" if '?' not in download_url else f"&token={civitai_token}"
try:
# Download the file using class method
download_file(download_url, checkpoint_dir, model_info['name'], civitai_token)
except Exception as e:
raise ValueError(f"Failed to download model: {e}")
# Get relative path
relative_model_path = os.path.join("Bjornulf_civitAI", "flux_d", model_filename)
# Try loading with relative path first
try:
model = nodes.CheckpointLoaderSimple().load_checkpoint(relative_model_path)
except Exception as e:
print(f"Error loading model with relative path: {e}")
print(f"Attempting to load from full path: {full_model_path}")
# Fallback to direct loading if needed
from comfy.sd import load_checkpoint_guess_config
model = load_checkpoint_guess_config(full_model_path)
return (model[0], model[1], model[2], model_info['name'], f"https://civitai.com/models/{model_info['model_id']}")
@classmethod
def IS_CHANGED(s, image, **kwargs):
if image == "none":
return ""
image_path = os.path.join(civitai_base_path, image)
if not os.path.exists(image_path):
return ""
m = hashlib.sha256()
with open(image_path, 'rb') as f:
m.update(f.read())
m.update(image.encode('utf-8'))
return m.digest().hex()
class CivitAIModelSelectorFLUX_S:
@classmethod
def INPUT_TYPES(s):
# Get list of supported image extensions
image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')
files = [f"flux.1_s/{f}" for f in folder_paths.get_filename_list("flux.1_s")
if f.lower().endswith(image_extensions)]
if not files: # If no files found, provide a default option
files = ["none"]
return {
"required": {
"image": (sorted(files), {"image_upload": True}),
"civitai_token": ("STRING", {"default": ""})
},
}
RETURN_TYPES = ("MODEL", "CLIP", "VAE", "STRING", "STRING")
RETURN_NAMES = ("model", "clip", "vae", "name", "civitai_url")
FUNCTION = "load_model"
CATEGORY = "Bjornulf"
def load_model(self, image, civitai_token):
if image == "none":
raise ValueError("No image selected")
# Get the absolute path to the JSON file
json_path = os.path.join(parsed_models_path, 'parsed_flux.1_s_models.json')
# Load models info
try:
with open(json_path, 'r', encoding='utf-8') as f:
models_info = json.load(f)
except UnicodeDecodeError:
# Fallback to latin-1 if UTF-8 fails
with open(json_path, 'r', encoding='latin-1') as f:
models_info = json.load(f)
# Extract model name from image path
image_name = os.path.basename(image)
# Find corresponding model info
model_info = next((model for model in models_info
if os.path.basename(model['image_path']) == image_name), None)
if not model_info:
raise ValueError(f"No model information found for image: {image_name}")
# Create checkpoints directory if it doesn't exist
checkpoint_dir = os.path.join(folder_paths.models_dir, "checkpoints", "Bjornulf_civitAI", "flux_s")
os.makedirs(checkpoint_dir, exist_ok=True)
# Expected model filename
model_filename = f"{model_info['name']}.safetensors"
full_model_path = os.path.join(checkpoint_dir, model_filename)
# Check if model is already downloaded
if not os.path.exists(full_model_path):
print(f"Downloading model {model_info['name']}...")
# Construct download URL with token
download_url = model_info['download_url']
if civitai_token:
download_url += f"?token={civitai_token}" if '?' not in download_url else f"&token={civitai_token}"
try:
# Download the file using class method
download_file(download_url, checkpoint_dir, model_info['name'], civitai_token)
except Exception as e:
raise ValueError(f"Failed to download model: {e}")
# Get relative path
relative_model_path = os.path.join("Bjornulf_civitAI", "flux_s", model_filename)
# Try loading with relative path first
try:
model = nodes.CheckpointLoaderSimple().load_checkpoint(relative_model_path)
except Exception as e:
print(f"Error loading model with relative path: {e}")
print(f"Attempting to load from full path: {full_model_path}")
# Fallback to direct loading if needed
from comfy.sd import load_checkpoint_guess_config
model = load_checkpoint_guess_config(full_model_path)
return (model[0], model[1], model[2], model_info['name'], f"https://civitai.com/models/{model_info['model_id']}")
@classmethod
def IS_CHANGED(s, image):
if image == "none":
return ""
image_path = os.path.join(civitai_base_path, image)
if not os.path.exists(image_path):
return ""
m = hashlib.sha256()
with open(image_path, 'rb') as f:
m.update(f.read())
m.update(image.encode('utf-8'))
return m.digest().hex()
class CivitAIModelSelectorPony:
@classmethod
def INPUT_TYPES(s):
# Get list of supported image extensions
image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')
files = [f"pony/{f}" for f in folder_paths.get_filename_list("pony")
if f.lower().endswith(image_extensions)]
if not files: # If no files found, provide a default option
files = ["none"]
return {
"required": {
"image": (sorted(files), {"image_upload": True}),
"civitai_token": ("STRING", {"default": ""})
},
}
RETURN_TYPES = ("MODEL", "CLIP", "VAE", "STRING", "STRING")
RETURN_NAMES = ("model", "clip", "vae", "name", "civitai_url")
FUNCTION = "load_model"
CATEGORY = "Bjornulf"
def load_model(self, image, civitai_token):
    """Load the Pony checkpoint that belongs to the selected preview image.

    The entry is looked up in ``parsed_pony_models.json`` by the preview
    image's file name. If the matching ``.safetensors`` file is not yet in
    ``checkpoints/Bjornulf_civitAI/pony`` it is downloaded first.

    Args:
        image: Selected preview image path, or ``"none"`` when nothing is
            available.
        civitai_token: Optional CivitAI API token; when non-empty it is
            appended to the download URL and passed to ``download_file``.

    Returns:
        A 5-tuple: the first three items of the loaded checkpoint result,
        the model name, and the model's CivitAI URL.

    Raises:
        ValueError: If no image is selected, no metadata entry matches the
            image, or the download fails.
    """
    if image == "none":
        raise ValueError("No image selected")
    # Get the absolute path to the JSON file
    json_path = os.path.join(parsed_models_path, 'parsed_pony_models.json')
    # Load models info
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            models_info = json.load(f)
    except UnicodeDecodeError:
        # Fallback to latin-1 if UTF-8 fails
        with open(json_path, 'r', encoding='latin-1') as f:
            models_info = json.load(f)
    # Entries are matched by the preview image's file name only
    image_name = os.path.basename(image)
    model_info = next((model for model in models_info
                       if os.path.basename(model['image_path']) == image_name), None)
    if not model_info:
        raise ValueError(f"No model information found for image: {image_name}")
    # Create checkpoints directory if it doesn't exist
    checkpoint_dir = os.path.join(folder_paths.models_dir, "checkpoints", "Bjornulf_civitAI", "pony")
    os.makedirs(checkpoint_dir, exist_ok=True)
    # Expected model filename
    model_filename = f"{model_info['name']}.safetensors"
    full_model_path = os.path.join(checkpoint_dir, model_filename)
    # Check if model is already downloaded
    if not os.path.exists(full_model_path):
        print(f"Downloading model {model_info['name']}...")
        # Construct download URL with token
        download_url = model_info['download_url']
        if civitai_token:
            separator = '&' if '?' in download_url else '?'
            download_url += f"{separator}token={civitai_token}"
        try:
            # download_file is resolved from the enclosing module scope
            # (its definition is not part of this method).
            download_file(download_url, checkpoint_dir, model_info['name'], civitai_token)
        except Exception as e:
            # The target did not exist before this attempt, so anything at
            # that path now is the remains of the failed download. Remove it
            # so the next run retries instead of loading a truncated file.
            if os.path.exists(full_model_path):
                try:
                    os.remove(full_model_path)
                except OSError:
                    pass  # best-effort cleanup; the download error matters more
            raise ValueError(f"Failed to download model: {e}") from e
    # Get relative path
    relative_model_path = os.path.join("Bjornulf_civitAI", "pony", model_filename)
    # Try loading with relative path first
    try:
        model = nodes.CheckpointLoaderSimple().load_checkpoint(relative_model_path)
    except Exception as e:
        print(f"Error loading model with relative path: {e}")
        print(f"Attempting to load from full path: {full_model_path}")
        # Fallback to direct loading if needed
        from comfy.sd import load_checkpoint_guess_config
        model = load_checkpoint_guess_config(full_model_path)
    return (model[0], model[1], model[2], model_info['name'], f"https://civitai.com/models/{model_info['model_id']}")
@classmethod
def IS_CHANGED(s, image, **kwargs):
    """Return a change token for the selected preview image.

    The token is the SHA-256 hex digest of the image file's bytes followed
    by the UTF-8 encoded ``image`` string. An empty string is returned when
    ``image`` is ``"none"`` or the file does not exist under
    ``civitai_base_path``.
    """
    if image != "none":
        candidate = os.path.join(civitai_base_path, image)
        if os.path.exists(candidate):
            hasher = hashlib.sha256()
            with open(candidate, 'rb') as handle:
                hasher.update(handle.read())
            # Mix in the selection string as well as the file content.
            hasher.update(image.encode('utf-8'))
            return hasher.hexdigest()
    return ""
class CivitAILoraSelectorSD15:
    """Node that applies an SD 1.5 LoRA chosen by its preview image.

    The LoRA file is downloaded from the entry's ``download_url`` on first
    use, stored under ``loras/Bjornulf_civitAI/sd_1.5`` and then applied to
    the incoming model/clip pair through ``nodes.LoraLoader``.
    """

    @classmethod
    def INPUT_TYPES(s):
        """Return the input spec; image choices come from the "lora_sd_1.5" folder."""
        # Get list of supported image extensions
        image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')
        files = [f"lora_sd_1.5/{f}" for f in folder_paths.get_filename_list("lora_sd_1.5")
                 if f.lower().endswith(image_extensions)]
        if not files:  # If no files found, provide a default option
            files = ["none"]
        return {
            "required": {
                "image": (sorted(files), {"image_upload": True}),
                "model": ("MODEL",),
                "clip": ("CLIP",),
                "strength_model": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}),
                "strength_clip": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}),
                "civitai_token": ("STRING", {"default": ""})
            },
        }

    RETURN_TYPES = ("MODEL", "CLIP", "STRING", "STRING", "STRING")
    RETURN_NAMES = ("model", "clip", "name", "civitai_url", "trigger_words")
    FUNCTION = "load_lora"
    CATEGORY = "Bjornulf"

    @staticmethod
    def _download_file(url, destination_path, lora_name, api_token=None):
        """Stream ``url`` to ``<destination_path>/<lora_name>.safetensors``.

        The data is written to a ``.part`` sibling first and only renamed to
        the final name once the transfer completed, so an interrupted
        download never leaves a truncated file that later looks complete.

        Returns:
            The path of the downloaded file.

        Raises:
            requests.exceptions.RequestException: On HTTP/network failure.
        """
        filename = f"{lora_name}.safetensors"
        file_path = os.path.join(destination_path, filename)
        partial_path = file_path + ".part"
        headers = {}
        if api_token:
            headers['Authorization'] = f'Bearer {api_token}'
        try:
            # Log without the query string: it may carry the API token.
            print(f"Downloading from: {url.split('?', 1)[0]}")
            # (connect, read) timeouts so a stalled server cannot hang the node forever.
            with requests.get(url, headers=headers, stream=True, timeout=(10, 120)) as response:
                response.raise_for_status()
                file_size = int(response.headers.get('content-length', 0))
                block_size = 8192
                downloaded = 0
                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=block_size):
                        if chunk:
                            f.write(chunk)
                            downloaded += len(chunk)
                            if file_size > 0:
                                progress = int(50 * downloaded / file_size)
                                bars = '=' * progress + '-' * (50 - progress)
                                percent = (downloaded / file_size) * 100
                                print(f'\rProgress: [{bars}] {percent:.1f}%', end='')
            os.replace(partial_path, file_path)
            print(f"\nFile downloaded successfully to: {file_path}")
            return file_path
        except requests.exceptions.RequestException as e:
            print(f"Error downloading file: {e}")
            raise
        finally:
            # After a successful rename the partial file is gone; otherwise
            # remove whatever the failed attempt left behind.
            if os.path.exists(partial_path):
                try:
                    os.remove(partial_path)
                except OSError:
                    pass  # best-effort cleanup; do not mask the real error

    def load_lora(self, image, model, clip, strength_model, strength_clip, civitai_token):
        """Download (if needed) and apply the LoRA matching the selected image.

        Args:
            image: Selected preview image path, or ``"none"``.
            model: Model to patch.
            clip: CLIP to patch.
            strength_model: LoRA strength applied to the model.
            strength_clip: LoRA strength applied to the CLIP.
            civitai_token: Optional CivitAI API token used for the download.

        Returns:
            ``(model, clip, name, civitai_url, trigger_words)`` where
            ``trigger_words`` is the entry's ``trained_words`` joined with
            ``", "``.

        Raises:
            ValueError: If no image is selected, no metadata entry matches,
                the download fails, or the LoRA cannot be loaded.
        """
        if image == "none":
            raise ValueError("No image selected")
        # Get the absolute path to the JSON file
        json_path = os.path.join(parsed_models_path, 'parsed_lora_sd_1.5_loras.json')
        # Load loras info
        try:
            with open(json_path, 'r', encoding='utf-8') as f:
                loras_info = json.load(f)
        except UnicodeDecodeError:
            # Fallback to latin-1 if UTF-8 fails
            with open(json_path, 'r', encoding='latin-1') as f:
                loras_info = json.load(f)
        # Entries are matched by the preview image's file name only
        image_name = os.path.basename(image)
        lora_info = next((lora for lora in loras_info
                          if os.path.basename(lora['image_path']) == image_name), None)
        if not lora_info:
            raise ValueError(f"No LoRA information found for image: {image_name}")
        # Create loras directory if it doesn't exist
        lora_dir = os.path.join(folder_paths.models_dir, "loras", "Bjornulf_civitAI", "sd_1.5")
        os.makedirs(lora_dir, exist_ok=True)
        # Expected lora filename
        lora_filename = f"{lora_info['name']}.safetensors"
        full_lora_path = os.path.join(lora_dir, lora_filename)
        # Check if lora is already downloaded
        if not os.path.exists(full_lora_path):
            print(f"Downloading LoRA {lora_info['name']}...")
            # Construct download URL with token
            download_url = lora_info['download_url']
            if civitai_token:
                separator = '&' if '?' in download_url else '?'
                download_url += f"{separator}token={civitai_token}"
            try:
                self._download_file(download_url, lora_dir, lora_info['name'], civitai_token)
            except Exception as e:
                raise ValueError(f"Failed to download LoRA: {e}") from e
        # LoraLoader receives the path relative to the loras folder
        relative_lora_path = os.path.join("Bjornulf_civitAI", "sd_1.5", lora_filename)
        # Load the LoRA
        try:
            lora_loader = nodes.LoraLoader()
            model_lora, clip_lora = lora_loader.load_lora(model=model,
                                                          clip=clip,
                                                          lora_name=relative_lora_path,
                                                          strength_model=strength_model,
                                                          strength_clip=strength_clip)
        except Exception as e:
            raise ValueError(f"Failed to load LoRA: {e}") from e
        # Convert trained words list to comma-separated string
        trained_words_str = ", ".join(lora_info.get('trained_words', []))
        return (model_lora, clip_lora, lora_info['name'], f"https://civitai.com/models/{lora_info['lora_id']}", trained_words_str)

    @classmethod
    def IS_CHANGED(s, image, **kwargs):
        """Return a SHA-256 hex digest of the image bytes plus its name, or "" if unavailable."""
        if image == "none":
            return ""
        image_path = os.path.join(civitai_base_path, image)
        if not os.path.exists(image_path):
            return ""
        m = hashlib.sha256()
        with open(image_path, 'rb') as f:
            m.update(f.read())
        m.update(image.encode('utf-8'))
        return m.hexdigest()
class CivitAILoraSelectorSDXL:
    """Node that applies an SDXL 1.0 LoRA chosen by its preview image.

    The LoRA file is downloaded from the entry's ``download_url`` on first
    use, stored under ``loras/Bjornulf_civitAI/sdxl_1.0`` and then applied to
    the incoming model/clip pair through ``nodes.LoraLoader``.
    """

    @classmethod
    def INPUT_TYPES(s):
        """Return the input spec; image choices come from the "lora_sdxl_1.0" folder."""
        # Get list of supported image extensions
        image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')
        files = [f"lora_sdxl_1.0/{f}" for f in folder_paths.get_filename_list("lora_sdxl_1.0")
                 if f.lower().endswith(image_extensions)]
        if not files:  # If no files found, provide a default option
            files = ["none"]
        return {
            "required": {
                "image": (sorted(files), {"image_upload": True}),
                "model": ("MODEL",),
                "clip": ("CLIP",),
                "strength_model": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}),
                "strength_clip": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}),
                "civitai_token": ("STRING", {"default": ""})
            },
        }

    RETURN_TYPES = ("MODEL", "CLIP", "STRING", "STRING", "STRING")
    RETURN_NAMES = ("model", "clip", "name", "civitai_url", "trigger_words")
    FUNCTION = "load_lora"
    CATEGORY = "Bjornulf"

    @staticmethod
    def _download_file(url, destination_path, lora_name, api_token=None):
        """Stream ``url`` to ``<destination_path>/<lora_name>.safetensors``.

        The data is written to a ``.part`` sibling first and only renamed to
        the final name once the transfer completed, so an interrupted
        download never leaves a truncated file that later looks complete.

        Returns:
            The path of the downloaded file.

        Raises:
            requests.exceptions.RequestException: On HTTP/network failure.
        """
        filename = f"{lora_name}.safetensors"
        file_path = os.path.join(destination_path, filename)
        partial_path = file_path + ".part"
        headers = {}
        if api_token:
            headers['Authorization'] = f'Bearer {api_token}'
        try:
            # Log without the query string: it may carry the API token.
            print(f"Downloading from: {url.split('?', 1)[0]}")
            # (connect, read) timeouts so a stalled server cannot hang the node forever.
            with requests.get(url, headers=headers, stream=True, timeout=(10, 120)) as response:
                response.raise_for_status()
                file_size = int(response.headers.get('content-length', 0))
                block_size = 8192
                downloaded = 0
                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=block_size):
                        if chunk:
                            f.write(chunk)
                            downloaded += len(chunk)
                            if file_size > 0:
                                progress = int(50 * downloaded / file_size)
                                bars = '=' * progress + '-' * (50 - progress)
                                percent = (downloaded / file_size) * 100
                                print(f'\rProgress: [{bars}] {percent:.1f}%', end='')
            os.replace(partial_path, file_path)
            print(f"\nFile downloaded successfully to: {file_path}")
            return file_path
        except requests.exceptions.RequestException as e:
            print(f"Error downloading file: {e}")
            raise
        finally:
            # After a successful rename the partial file is gone; otherwise
            # remove whatever the failed attempt left behind.
            if os.path.exists(partial_path):
                try:
                    os.remove(partial_path)
                except OSError:
                    pass  # best-effort cleanup; do not mask the real error

    def load_lora(self, image, model, clip, strength_model, strength_clip, civitai_token):
        """Download (if needed) and apply the LoRA matching the selected image.

        Args:
            image: Selected preview image path, or ``"none"``.
            model: Model to patch.
            clip: CLIP to patch.
            strength_model: LoRA strength applied to the model.
            strength_clip: LoRA strength applied to the CLIP.
            civitai_token: Optional CivitAI API token used for the download.

        Returns:
            ``(model, clip, name, civitai_url, trigger_words)`` where
            ``trigger_words`` is the entry's ``trained_words`` joined with
            ``", "``.

        Raises:
            ValueError: If no image is selected, no metadata entry matches,
                the download fails, or the LoRA cannot be loaded.
        """
        if image == "none":
            raise ValueError("No image selected")
        # Get the absolute path to the JSON file
        json_path = os.path.join(parsed_models_path, 'parsed_lora_sdxl_1.0_loras.json')
        # Load loras info
        try:
            with open(json_path, 'r', encoding='utf-8') as f:
                loras_info = json.load(f)
        except UnicodeDecodeError:
            # Fallback to latin-1 if UTF-8 fails
            with open(json_path, 'r', encoding='latin-1') as f:
                loras_info = json.load(f)
        # Entries are matched by the preview image's file name only
        image_name = os.path.basename(image)
        lora_info = next((lora for lora in loras_info
                          if os.path.basename(lora['image_path']) == image_name), None)
        if not lora_info:
            raise ValueError(f"No LoRA information found for image: {image_name}")
        # Create loras directory if it doesn't exist
        lora_dir = os.path.join(folder_paths.models_dir, "loras", "Bjornulf_civitAI", "sdxl_1.0")
        os.makedirs(lora_dir, exist_ok=True)
        # Expected lora filename
        lora_filename = f"{lora_info['name']}.safetensors"
        full_lora_path = os.path.join(lora_dir, lora_filename)
        # Check if lora is already downloaded
        if not os.path.exists(full_lora_path):
            print(f"Downloading LoRA {lora_info['name']}...")
            # Construct download URL with token
            download_url = lora_info['download_url']
            if civitai_token:
                separator = '&' if '?' in download_url else '?'
                download_url += f"{separator}token={civitai_token}"
            try:
                self._download_file(download_url, lora_dir, lora_info['name'], civitai_token)
            except Exception as e:
                raise ValueError(f"Failed to download LoRA: {e}") from e
        # LoraLoader receives the path relative to the loras folder
        relative_lora_path = os.path.join("Bjornulf_civitAI", "sdxl_1.0", lora_filename)
        # Load the LoRA
        try:
            lora_loader = nodes.LoraLoader()
            model_lora, clip_lora = lora_loader.load_lora(model=model,
                                                          clip=clip,
                                                          lora_name=relative_lora_path,
                                                          strength_model=strength_model,
                                                          strength_clip=strength_clip)
        except Exception as e:
            raise ValueError(f"Failed to load LoRA: {e}") from e
        # Convert trained words list to comma-separated string
        trained_words_str = ", ".join(lora_info.get('trained_words', []))
        return (model_lora, clip_lora, lora_info['name'], f"https://civitai.com/models/{lora_info['lora_id']}", trained_words_str)

    @classmethod
    def IS_CHANGED(s, image, **kwargs):
        """Return a SHA-256 hex digest of the image bytes plus its name, or "" if unavailable."""
        if image == "none":
            return ""
        image_path = os.path.join(civitai_base_path, image)
        if not os.path.exists(image_path):
            return ""
        m = hashlib.sha256()
        with open(image_path, 'rb') as f:
            m.update(f.read())
        m.update(image.encode('utf-8'))
        return m.hexdigest()
class CivitAILoraSelectorPONY:
    """Node that applies a Pony LoRA chosen by its preview image.

    The LoRA file is downloaded from the entry's ``download_url`` on first
    use, stored under ``loras/Bjornulf_civitAI/pony`` and then applied to
    the incoming model/clip pair through ``nodes.LoraLoader``.
    """

    @classmethod
    def INPUT_TYPES(s):
        """Return the input spec; image choices come from the "lora_pony" folder."""
        # Get list of supported image extensions
        image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')
        files = [f"lora_pony/{f}" for f in folder_paths.get_filename_list("lora_pony")
                 if f.lower().endswith(image_extensions)]
        if not files:  # If no files found, provide a default option
            files = ["none"]
        return {
            "required": {
                "image": (sorted(files), {"image_upload": True}),
                "model": ("MODEL",),
                "clip": ("CLIP",),
                "strength_model": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}),
                "strength_clip": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}),
                "civitai_token": ("STRING", {"default": ""})
            },
        }

    RETURN_TYPES = ("MODEL", "CLIP", "STRING", "STRING", "STRING")
    RETURN_NAMES = ("model", "clip", "name", "civitai_url", "trigger_words")
    FUNCTION = "load_lora"
    CATEGORY = "Bjornulf"

    @staticmethod
    def _download_file(url, destination_path, lora_name, api_token=None):
        """Stream ``url`` to ``<destination_path>/<lora_name>.safetensors``.

        The data is written to a ``.part`` sibling first and only renamed to
        the final name once the transfer completed, so an interrupted
        download never leaves a truncated file that later looks complete.

        Returns:
            The path of the downloaded file.

        Raises:
            requests.exceptions.RequestException: On HTTP/network failure.
        """
        filename = f"{lora_name}.safetensors"
        file_path = os.path.join(destination_path, filename)
        partial_path = file_path + ".part"
        headers = {}
        if api_token:
            headers['Authorization'] = f'Bearer {api_token}'
        try:
            # Log without the query string: it may carry the API token.
            print(f"Downloading from: {url.split('?', 1)[0]}")
            # (connect, read) timeouts so a stalled server cannot hang the node forever.
            with requests.get(url, headers=headers, stream=True, timeout=(10, 120)) as response:
                response.raise_for_status()
                file_size = int(response.headers.get('content-length', 0))
                block_size = 8192
                downloaded = 0
                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=block_size):
                        if chunk:
                            f.write(chunk)
                            downloaded += len(chunk)
                            if file_size > 0:
                                progress = int(50 * downloaded / file_size)
                                bars = '=' * progress + '-' * (50 - progress)
                                percent = (downloaded / file_size) * 100
                                print(f'\rProgress: [{bars}] {percent:.1f}%', end='')
            os.replace(partial_path, file_path)
            print(f"\nFile downloaded successfully to: {file_path}")
            return file_path
        except requests.exceptions.RequestException as e:
            print(f"Error downloading file: {e}")
            raise
        finally:
            # After a successful rename the partial file is gone; otherwise
            # remove whatever the failed attempt left behind.
            if os.path.exists(partial_path):
                try:
                    os.remove(partial_path)
                except OSError:
                    pass  # best-effort cleanup; do not mask the real error

    def load_lora(self, image, model, clip, strength_model, strength_clip, civitai_token):
        """Download (if needed) and apply the LoRA matching the selected image.

        Args:
            image: Selected preview image path, or ``"none"``.
            model: Model to patch.
            clip: CLIP to patch.
            strength_model: LoRA strength applied to the model.
            strength_clip: LoRA strength applied to the CLIP.
            civitai_token: Optional CivitAI API token used for the download.

        Returns:
            ``(model, clip, name, civitai_url, trigger_words)`` where
            ``trigger_words`` is the entry's ``trained_words`` joined with
            ``", "``.

        Raises:
            ValueError: If no image is selected, no metadata entry matches,
                the download fails, or the LoRA cannot be loaded.
        """
        if image == "none":
            raise ValueError("No image selected")
        # Get the absolute path to the JSON file
        json_path = os.path.join(parsed_models_path, 'parsed_lora_pony_loras.json')
        # Load loras info
        try:
            with open(json_path, 'r', encoding='utf-8') as f:
                loras_info = json.load(f)
        except UnicodeDecodeError:
            # Fallback to latin-1 if UTF-8 fails
            with open(json_path, 'r', encoding='latin-1') as f:
                loras_info = json.load(f)
        # Entries are matched by the preview image's file name only
        image_name = os.path.basename(image)
        lora_info = next((lora for lora in loras_info
                          if os.path.basename(lora['image_path']) == image_name), None)
        if not lora_info:
            raise ValueError(f"No LoRA information found for image: {image_name}")
        # Create loras directory if it doesn't exist
        lora_dir = os.path.join(folder_paths.models_dir, "loras", "Bjornulf_civitAI", "pony")
        os.makedirs(lora_dir, exist_ok=True)
        # Expected lora filename
        lora_filename = f"{lora_info['name']}.safetensors"
        full_lora_path = os.path.join(lora_dir, lora_filename)
        # Check if lora is already downloaded
        if not os.path.exists(full_lora_path):
            print(f"Downloading LoRA {lora_info['name']}...")
            # Construct download URL with token
            download_url = lora_info['download_url']
            if civitai_token:
                separator = '&' if '?' in download_url else '?'
                download_url += f"{separator}token={civitai_token}"
            try:
                self._download_file(download_url, lora_dir, lora_info['name'], civitai_token)
            except Exception as e:
                raise ValueError(f"Failed to download LoRA: {e}") from e
        # LoraLoader receives the path relative to the loras folder
        relative_lora_path = os.path.join("Bjornulf_civitAI", "pony", lora_filename)
        # Load the LoRA
        try:
            lora_loader = nodes.LoraLoader()
            model_lora, clip_lora = lora_loader.load_lora(model=model,
                                                          clip=clip,
                                                          lora_name=relative_lora_path,
                                                          strength_model=strength_model,
                                                          strength_clip=strength_clip)
        except Exception as e:
            raise ValueError(f"Failed to load LoRA: {e}") from e
        # Convert trained words list to comma-separated string
        trained_words_str = ", ".join(lora_info.get('trained_words', []))
        return (model_lora, clip_lora, lora_info['name'], f"https://civitai.com/models/{lora_info['lora_id']}", trained_words_str)

    @classmethod
    def IS_CHANGED(s, image, **kwargs):
        """Return a SHA-256 hex digest of the image bytes plus its name, or "" if unavailable."""
        if image == "none":
            return ""
        image_path = os.path.join(civitai_base_path, image)
        if not os.path.exists(image_path):
            return ""
        m = hashlib.sha256()
        with open(image_path, 'rb') as f:
            m.update(f.read())
        m.update(image.encode('utf-8'))
        return m.hexdigest()
class CivitAILoraSelectorHunyuan:
    """Node that applies a Hunyuan Video LoRA chosen by its preview image.

    The LoRA file is downloaded from the entry's ``download_url`` on first
    use, stored under ``loras/Bjornulf_civitAI/hunyuan_video`` and then
    applied to the incoming model/clip pair through ``nodes.LoraLoader``.
    """

    @classmethod
    def INPUT_TYPES(s):
        """Return the input spec; image choices come from the "lora_hunyuan_video" folder."""
        image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')
        # Try NSFW folder first
        # nsfw_files = [f"NSFW_lora_hunyuan_video/{f}" for f in folder_paths.get_filename_list("NSFW_lora_hunyuan_video")
        #               if f.lower().endswith(image_extensions)]
        # If NSFW folder is empty or doesn't exist, try regular folder
        # if not nsfw_files:
        files = [f"lora_hunyuan_video/{f}" for f in folder_paths.get_filename_list("lora_hunyuan_video")
                 if f.lower().endswith(image_extensions)]
        # else:
        #     files = nsfw_files
        if not files:
            files = ["none"]
        return {
            "required": {
                "image": (sorted(files), {"image_upload": True}),
                "model": ("MODEL",),
                "clip": ("CLIP",),
                "strength_model": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}),
                "strength_clip": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}),
                "civitai_token": ("STRING", {"default": ""})
            },
        }

    RETURN_TYPES = ("MODEL", "CLIP", "STRING", "STRING", "STRING")
    RETURN_NAMES = ("model", "clip", "name", "civitai_url", "trigger_words")
    FUNCTION = "load_lora"
    CATEGORY = "Bjornulf"

    @staticmethod
    def _download_file(url, destination_path, lora_name, api_token=None):
        """Stream ``url`` to ``<destination_path>/<lora_name>.safetensors``.

        The data is written to a ``.part`` sibling first and only renamed to
        the final name once the transfer completed, so an interrupted
        download never leaves a truncated file that later looks complete.

        Returns:
            The path of the downloaded file.

        Raises:
            requests.exceptions.RequestException: On HTTP/network failure.
        """
        filename = f"{lora_name}.safetensors"
        file_path = os.path.join(destination_path, filename)
        partial_path = file_path + ".part"
        headers = {}
        if api_token:
            headers['Authorization'] = f'Bearer {api_token}'
        try:
            # Log without the query string: it may carry the API token.
            print(f"Downloading from: {url.split('?', 1)[0]}")
            # (connect, read) timeouts so a stalled server cannot hang the node forever.
            with requests.get(url, headers=headers, stream=True, timeout=(10, 120)) as response:
                response.raise_for_status()
                file_size = int(response.headers.get('content-length', 0))
                block_size = 8192
                downloaded = 0
                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=block_size):
                        if chunk:
                            f.write(chunk)
                            downloaded += len(chunk)
                            if file_size > 0:
                                progress = int(50 * downloaded / file_size)
                                bars = '=' * progress + '-' * (50 - progress)
                                percent = (downloaded / file_size) * 100
                                print(f'\rProgress: [{bars}] {percent:.1f}%', end='')
            os.replace(partial_path, file_path)
            print(f"\nFile downloaded successfully to: {file_path}")
            return file_path
        except requests.exceptions.RequestException as e:
            print(f"Error downloading file: {e}")
            raise
        finally:
            # After a successful rename the partial file is gone; otherwise
            # remove whatever the failed attempt left behind.
            if os.path.exists(partial_path):
                try:
                    os.remove(partial_path)
                except OSError:
                    pass  # best-effort cleanup; do not mask the real error

    def load_lora(self, image, model, clip, strength_model, strength_clip, civitai_token):
        """Download (if needed) and apply the LoRA matching the selected image.

        Args:
            image: Selected preview image path, or ``"none"``.
            model: Model to patch.
            clip: CLIP to patch.
            strength_model: LoRA strength applied to the model.
            strength_clip: LoRA strength applied to the CLIP.
            civitai_token: Optional CivitAI API token used for the download.

        Returns:
            ``(model, clip, name, civitai_url, trigger_words)`` where
            ``trigger_words`` is the entry's ``trained_words`` joined with
            ``", "``.

        Raises:
            ValueError: If no image is selected, no metadata entry matches,
                the download fails, or the LoRA cannot be loaded.
        """
        if image == "none":
            raise ValueError("No image selected")
        # Try loading NSFW JSON first, fall back to regular JSON if not found
        # nsfw_json_path = os.path.join(parsed_models_path, 'NSFW_parsed_lora_hunyuan_video_loras.json')
        # regular_json_path = os.path.join(parsed_models_path, 'parsed_lora_hunyuan_video_loras.json')
        json_path = os.path.join(parsed_models_path, 'parsed_lora_hunyuan_video_loras.json')
        # json_path = nsfw_json_path if os.path.exists(nsfw_json_path) else regular_json_path
        subfolder = "hunyuan_video"
        try:
            with open(json_path, 'r', encoding='utf-8') as f:
                loras_info = json.load(f)
        except UnicodeDecodeError:
            # Fallback to latin-1 if UTF-8 fails
            with open(json_path, 'r', encoding='latin-1') as f:
                loras_info = json.load(f)
        # Entries are matched by the preview image's file name only
        image_name = os.path.basename(image)
        lora_info = next((lora for lora in loras_info
                          if os.path.basename(lora['image_path']) == image_name), None)
        if not lora_info:
            raise ValueError(f"No LoRA information found for image: {image_name}")
        lora_dir = os.path.join(folder_paths.models_dir, "loras", "Bjornulf_civitAI", subfolder)
        os.makedirs(lora_dir, exist_ok=True)
        lora_filename = f"{lora_info['name']}.safetensors"
        full_lora_path = os.path.join(lora_dir, lora_filename)
        # Download only when the file is not already present
        if not os.path.exists(full_lora_path):
            print(f"Downloading LoRA {lora_info['name']}...")
            download_url = lora_info['download_url']
            if civitai_token:
                separator = '&' if '?' in download_url else '?'
                download_url += f"{separator}token={civitai_token}"
            try:
                self._download_file(download_url, lora_dir, lora_info['name'], civitai_token)
            except Exception as e:
                raise ValueError(f"Failed to download LoRA: {e}") from e
        # LoraLoader receives the path relative to the loras folder
        relative_lora_path = os.path.join("Bjornulf_civitAI", subfolder, lora_filename)
        try:
            lora_loader = nodes.LoraLoader()
            model_lora, clip_lora = lora_loader.load_lora(model=model,
                                                          clip=clip,
                                                          lora_name=relative_lora_path,
                                                          strength_model=strength_model,
                                                          strength_clip=strength_clip)
        except Exception as e:
            raise ValueError(f"Failed to load LoRA: {e}") from e
        trained_words_str = ", ".join(lora_info.get('trained_words', []))
        return (model_lora, clip_lora, lora_info['name'], f"https://civitai.com/models/{lora_info['lora_id']}", trained_words_str)

    @classmethod
    def IS_CHANGED(s, image, **kwargs):
        """Return a SHA-256 hex digest of the image bytes plus its name, or "" if unavailable."""
        if image == "none":
            return ""
        image_path = os.path.join(civitai_base_path, image)
        if not os.path.exists(image_path):
            return ""
        m = hashlib.sha256()
        with open(image_path, 'rb') as f:
            m.update(f.read())
        m.update(image.encode('utf-8'))
        return m.hexdigest()