This commit is contained in:
justumen
2025-02-21 16:09:13 +01:00
parent 980e86c051
commit 3450ddce72
2 changed files with 195 additions and 380 deletions

View File

@@ -14,68 +14,135 @@ import node_helpers
import hashlib
from folder_paths import get_filename_list, get_full_path, models_dir
import nodes
from pathlib import Path
import subprocess
# ======================
# SHARED UTILITY FUNCTIONS
# ======================
def get_civitai_base_paths():
"""Returns common paths for CivitAI integration"""
custom_nodes_dir = Path(__file__).parent.parent.parent.parent
civitai_base_path = custom_nodes_dir / "ComfyUI" / "custom_nodes" / "Bjornulf_custom_nodes" / "civitai"
return custom_nodes_dir, civitai_base_path, civitai_base_path # Last one is parsed_models_path
def setup_checkpoint_directory(model_type):
"""Creates and registers checkpoint directory for specific model type"""
_, _, parsed_models_path = get_civitai_base_paths()
checkpoint_dir = Path(folder_paths.models_dir) / "checkpoints" / "Bjornulf_civitAI" / model_type
checkpoint_dir.mkdir(parents=True, exist_ok=True)
checkpoint_folders = list(folder_paths.folder_names_and_paths["checkpoints"])
if str(checkpoint_dir) not in checkpoint_folders:
checkpoint_folders.append(str(checkpoint_dir))
folder_paths.folder_names_and_paths["checkpoints"] = tuple(checkpoint_folders)
return checkpoint_dir, parsed_models_path
def setup_image_folders(folder_specs, parent_dir=""):
"""Creates and registers image folders for different model types
Args:
folder_specs: Dictionary of folder_name -> sub_path
parent_dir: Optional subdirectory to place links under in input folder
"""
_, civitai_base_path, _ = get_civitai_base_paths()
for folder_name, sub_path in folder_specs.items():
full_path = civitai_base_path / sub_path
folder_paths.add_model_folder_path(folder_name, str(full_path))
create_symlink(full_path, folder_name, parent_dir)
# Code works, tested on linux and
def create_symlink(source, target_name, parent_dir=None):
"""Creates a symlink inside the ComfyUI/input directory on Linux and Windows."""
if os.name == 'nt': # Windows
comfyui_input = Path("ComfyUI/input")
else:
comfyui_input = Path("input")
if parent_dir:
parent_path = comfyui_input / parent_dir
parent_path.mkdir(parents=True, exist_ok=True)
target = parent_path / target_name
else:
target = comfyui_input / target_name
if not target.exists():
try:
if os.name == 'nt': # Windows
base_path = Path(__file__).resolve().parent # Get script location
source = base_path / "ComfyUI" / source # Ensure it points inside ComfyUI
try:
target.symlink_to(source, target_is_directory=source.is_dir())
#print(f"✅ Symlink created: {target} -> {source}")
except OSError:
if source.is_dir():
cmd = [
"powershell", "New-Item", "-ItemType", "Junction",
"-Path", str(target), "-Value", str(source)
]
subprocess.run(cmd, check=True, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
#print(f"✅ Junction created: {target} -> {source}")
else:
print(f"❌ Failed to create symlink/junction for {target_name}.")
else:
target.symlink_to(source, target_is_directory=True)
#print(f"✅ Symlink created: {target} -> {source}")
except Exception as e:
print(f"❌ Failed to create symlink for {target_name}: {e}")
def download_file(url, destination_path, model_name, api_token=None):
"""Universal downloader with progress tracking"""
headers = {'Authorization': f'Bearer {api_token}'} if api_token else {}
filename = f"{model_name}.safetensors"
file_path = Path(destination_path) / filename
try:
with requests.get(url, headers=headers, stream=True) as response:
response.raise_for_status()
file_size = int(response.headers.get('content-length', 0))
# Register the new checkpoint folder
bjornulf_checkpoint_path = os.path.join(folder_paths.models_dir, "checkpoints", "Bjornulf_civitAI")
os.makedirs(bjornulf_checkpoint_path, exist_ok=True)
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
# Add progress reporting here if needed
return str(file_path)
except Exception as e:
raise RuntimeError(f"Download failed: {str(e)}")
# Convert tuple to list, append new path, and convert back to tuple
# Set up main checkpoint directory
_, civitai_base_path, parsed_models_path = get_civitai_base_paths()
bjornulf_checkpoint_path = Path(folder_paths.models_dir) / "checkpoints" / "Bjornulf_civitAI"
bjornulf_checkpoint_path.mkdir(parents=True, exist_ok=True)
# Register the main checkpoint folder
checkpoint_folders = list(folder_paths.folder_names_and_paths["checkpoints"])
checkpoint_folders.append(bjornulf_checkpoint_path)
folder_paths.folder_names_and_paths["checkpoints"] = tuple(checkpoint_folders)
# Prepare Models
custom_nodes_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
civitai_base_path = os.path.join(custom_nodes_dir, "ComfyUI", "custom_nodes", "Bjornulf_custom_nodes", "civitai")
parsed_models_path = civitai_base_path
if str(bjornulf_checkpoint_path) not in checkpoint_folders:
checkpoint_folders.append(str(bjornulf_checkpoint_path))
folder_paths.folder_names_and_paths["checkpoints"] = tuple(checkpoint_folders)
# Define image folders
image_folders = {
"sdxl_1.0": os.path.join(civitai_base_path, "sdxl_1.0"),
"sd_1.5": os.path.join(civitai_base_path, "sd_1.5"),
"pony": os.path.join(civitai_base_path, "pony"),
"flux.1_d": os.path.join(civitai_base_path, "flux.1_d"),
"flux.1_s": os.path.join(civitai_base_path, "flux.1_s"),
"lora_sdxl_1.0": os.path.join(civitai_base_path, "lora_sdxl_1.0"),
"lora_sd_1.5": os.path.join(civitai_base_path, "lora_sd_1.5"),
"lora_pony": os.path.join(civitai_base_path, "lora_pony"),
"lora_flux.1_d": os.path.join(civitai_base_path, "lora_flux.1_d"),
"lora_hunyuan_video": os.path.join(civitai_base_path, "lora_hunyuan_video"),
"sdxl_1.0": "sdxl_1.0",
"sd_1.5": "sd_1.5",
"pony": "pony",
"flux.1_d": "flux.1_d",
"flux.1_s": "flux.1_s",
"lora_sdxl_1.0": "lora_sdxl_1.0",
"lora_sd_1.5": "lora_sd_1.5",
"lora_pony": "lora_pony",
"lora_flux.1_d": "lora_flux.1_d",
"lora_hunyuan_video": "lora_hunyuan_video",
# "NSFW_lora_hunyuan_video": "NSFW_lora_hunyuan_video"
}
# "NSFW_lora_hunyuan_video": os.path.join(civitai_base_path, "NSFW_lora_hunyuan_video")
# Add folder paths for each image folder
for folder_name, folder_path in image_folders.items():
folder_paths.add_model_folder_path(folder_name, folder_path)
# Create target paths in input directory
target_path = os.path.join('input', folder_name)
# Create link if it doesn't exist
if not os.path.exists(target_path):
# try:
if os.name == 'nt': # Windows
os.system(f'mklink /J "{target_path}" "{folder_path}"')
else: # Unix-like
os.symlink(folder_path, target_path)
#print(f"Successfully created link from {folder_path} to {target_path}")
# except OSError as e:
# print(f"Failed to create link: {e}")
# Prepare Loras
# lora_images_path = os.path.join(custom_nodes_dir, "ComfyUI", "custom_nodes", "Bjornulf_custom_nodes", "civitai", "lora_images")
# folder_paths.add_model_folder_path("lora_images", lora_images_path)
# target_lora_path = os.path.join('input', 'lora_images')
# # Create link if it doesn't exist
# if not os.path.exists(target_lora_path):
# try:
# if os.name == 'nt': # Windows
# os.system(f'mklink /J "{target_lora_path}" "{lora_images_path}"')
# else: # Unix-like
# os.symlink(lora_images_path, target_lora_path)
# print(f"Successfully created link from {lora_images_path} to {target_lora_path}")
# except OSError as e:
# print(f"Failed to create link: {e}")
# Set up image folders using the function, placing links under input/Bjornulf/
setup_image_folders(image_folders)
def get_civitai():
import civitai
@@ -85,10 +152,13 @@ def get_civitai():
# Check if the environment variable exists
if "CIVITAI_API_TOKEN" not in os.environ:
os.environ["CIVITAI_API_TOKEN"] = "d5fc336223a367e6b503a14a10569825"
else:
print("CIVITAI_API_TOKEN already exists in the environment.")
# else:
# print("CIVITAI_API_TOKEN already exists in the environment.")
import civitai
# ======================
# GENERATE WITH CIVITAI
# ======================
class APIGenerateCivitAI:
@classmethod
def INPUT_TYPES(cls):
@@ -170,7 +240,6 @@ class APIGenerateCivitAI:
os.makedirs(self.output_dir, exist_ok=True)
os.makedirs(self.metadata_dir, exist_ok=True)
self._interrupt_event = threading.Event()
def get_next_number(self):
"""Get the next available number for file naming"""
@@ -445,6 +514,10 @@ class APIGenerateCivitAIAddLORA:
print(f"Error adding LORA: {str(e)}")
return (json.dumps({"additionalNetworks": {}}),)
# ======================
# MODEL SELECTOR CLASSES
# ======================
class CivitAIModelSelectorSD15:
@classmethod
def INPUT_TYPES(s):
@@ -469,47 +542,6 @@ class CivitAIModelSelectorSD15:
CATEGORY = "Bjornulf"
def load_model(self, image, civitai_token):
def download_file(url, destination_path, model_name, api_token=None):
"""
Download file with proper authentication headers and simple progress bar.
"""
filename = f"{model_name}.safetensors"
file_path = os.path.join(destination_path, filename)
headers = {}
if api_token:
headers['Authorization'] = f'Bearer {api_token}'
try:
print(f"Downloading from: {url}")
response = requests.get(url, headers=headers, stream=True)
response.raise_for_status()
# Get file size if available
file_size = int(response.headers.get('content-length', 0))
block_size = 8192
downloaded = 0
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=block_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# Calculate progress
if file_size > 0:
progress = int(50 * downloaded / file_size)
bars = '=' * progress + '-' * (50 - progress)
percent = (downloaded / file_size) * 100
print(f'\rProgress: [{bars}] {percent:.1f}%', end='')
print(f"\nFile downloaded successfully to: {file_path}")
return file_path
except requests.exceptions.RequestException as e:
print(f"Error downloading file: {e}")
raise
if image == "none":
raise ValueError("No image selected")
@@ -517,8 +549,13 @@ class CivitAIModelSelectorSD15:
json_path = os.path.join(parsed_models_path, 'parsed_sd_1.5_models.json')
# Load models info
with open(json_path, 'r') as f:
models_info = json.load(f)
try:
with open(json_path, 'r', encoding='utf-8') as f:
models_info = json.load(f)
except UnicodeDecodeError:
# Fallback to latin-1 if UTF-8 fails
with open(json_path, 'r', encoding='latin-1') as f:
models_info = json.load(f)
# Extract model name from image path
image_name = os.path.basename(image)
@@ -606,47 +643,6 @@ class CivitAIModelSelectorSDXL:
CATEGORY = "Bjornulf"
def load_model(self, image, civitai_token):
def download_file(url, destination_path, model_name, api_token=None):
"""
Download file with proper authentication headers and simple progress bar.
"""
filename = f"{model_name}.safetensors"
file_path = os.path.join(destination_path, filename)
headers = {}
if api_token:
headers['Authorization'] = f'Bearer {api_token}'
try:
print(f"Downloading from: {url}")
response = requests.get(url, headers=headers, stream=True)
response.raise_for_status()
# Get file size if available
file_size = int(response.headers.get('content-length', 0))
block_size = 8192
downloaded = 0
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=block_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# Calculate progress
if file_size > 0:
progress = int(50 * downloaded / file_size)
bars = '=' * progress + '-' * (50 - progress)
percent = (downloaded / file_size) * 100
print(f'\rProgress: [{bars}] {percent:.1f}%', end='')
print(f"\nFile downloaded successfully to: {file_path}")
return file_path
except requests.exceptions.RequestException as e:
print(f"Error downloading file: {e}")
raise
if image == "none":
raise ValueError("No image selected")
@@ -654,8 +650,13 @@ class CivitAIModelSelectorSDXL:
json_path = os.path.join(parsed_models_path, 'parsed_sdxl_1.0_models.json')
# Load models info
with open(json_path, 'r') as f:
models_info = json.load(f)
try:
with open(json_path, 'r', encoding='utf-8') as f:
models_info = json.load(f)
except UnicodeDecodeError:
# Fallback to latin-1 if UTF-8 fails
with open(json_path, 'r', encoding='latin-1') as f:
models_info = json.load(f)
# Extract model name from image path
image_name = os.path.basename(image)
@@ -719,6 +720,7 @@ class CivitAIModelSelectorSDXL:
m.update(image.encode('utf-8'))
return m.digest().hex()
class CivitAIModelSelectorFLUX_D:
@classmethod
def INPUT_TYPES(s):
@@ -743,47 +745,6 @@ class CivitAIModelSelectorFLUX_D:
CATEGORY = "Bjornulf"
def load_model(self, image, civitai_token):
def download_file(url, destination_path, model_name, api_token=None):
"""
Download file with proper authentication headers and simple progress bar.
"""
filename = f"{model_name}.safetensors"
file_path = os.path.join(destination_path, filename)
headers = {}
if api_token:
headers['Authorization'] = f'Bearer {api_token}'
try:
print(f"Downloading from: {url}")
response = requests.get(url, headers=headers, stream=True)
response.raise_for_status()
# Get file size if available
file_size = int(response.headers.get('content-length', 0))
block_size = 8192
downloaded = 0
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=block_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# Calculate progress
if file_size > 0:
progress = int(50 * downloaded / file_size)
bars = '=' * progress + '-' * (50 - progress)
percent = (downloaded / file_size) * 100
print(f'\rProgress: [{bars}] {percent:.1f}%', end='')
print(f"\nFile downloaded successfully to: {file_path}")
return file_path
except requests.exceptions.RequestException as e:
print(f"Error downloading file: {e}")
raise
if image == "none":
raise ValueError("No image selected")
@@ -791,8 +752,13 @@ class CivitAIModelSelectorFLUX_D:
json_path = os.path.join(parsed_models_path, 'parsed_flux.1_d_models.json')
# Load models info
with open(json_path, 'r') as f:
models_info = json.load(f)
try:
with open(json_path, 'r', encoding='utf-8') as f:
models_info = json.load(f)
except UnicodeDecodeError:
# Fallback to latin-1 if UTF-8 fails
with open(json_path, 'r', encoding='latin-1') as f:
models_info = json.load(f)
# Extract model name from image path
image_name = os.path.basename(image)
@@ -880,47 +846,6 @@ class CivitAIModelSelectorFLUX_S:
CATEGORY = "Bjornulf"
def load_model(self, image, civitai_token):
def download_file(url, destination_path, model_name, api_token=None):
"""
Download file with proper authentication headers and simple progress bar.
"""
filename = f"{model_name}.safetensors"
file_path = os.path.join(destination_path, filename)
headers = {}
if api_token:
headers['Authorization'] = f'Bearer {api_token}'
try:
print(f"Downloading from: {url}")
response = requests.get(url, headers=headers, stream=True)
response.raise_for_status()
# Get file size if available
file_size = int(response.headers.get('content-length', 0))
block_size = 8192
downloaded = 0
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=block_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# Calculate progress
if file_size > 0:
progress = int(50 * downloaded / file_size)
bars = '=' * progress + '-' * (50 - progress)
percent = (downloaded / file_size) * 100
print(f'\rProgress: [{bars}] {percent:.1f}%', end='')
print(f"\nFile downloaded successfully to: {file_path}")
return file_path
except requests.exceptions.RequestException as e:
print(f"Error downloading file: {e}")
raise
if image == "none":
raise ValueError("No image selected")
@@ -928,8 +853,13 @@ class CivitAIModelSelectorFLUX_S:
json_path = os.path.join(parsed_models_path, 'parsed_flux.1_s_models.json')
# Load models info
with open(json_path, 'r') as f:
models_info = json.load(f)
try:
with open(json_path, 'r', encoding='utf-8') as f:
models_info = json.load(f)
except UnicodeDecodeError:
# Fallback to latin-1 if UTF-8 fails
with open(json_path, 'r', encoding='latin-1') as f:
models_info = json.load(f)
# Extract model name from image path
image_name = os.path.basename(image)
@@ -1017,47 +947,6 @@ class CivitAIModelSelectorPony:
CATEGORY = "Bjornulf"
def load_model(self, image, civitai_token):
def download_file(url, destination_path, model_name, api_token=None):
"""
Download file with proper authentication headers and simple progress bar.
"""
filename = f"{model_name}.safetensors"
file_path = os.path.join(destination_path, filename)
headers = {}
if api_token:
headers['Authorization'] = f'Bearer {api_token}'
try:
print(f"Downloading from: {url}")
response = requests.get(url, headers=headers, stream=True)
response.raise_for_status()
# Get file size if available
file_size = int(response.headers.get('content-length', 0))
block_size = 8192
downloaded = 0
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=block_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# Calculate progress
if file_size > 0:
progress = int(50 * downloaded / file_size)
bars = '=' * progress + '-' * (50 - progress)
percent = (downloaded / file_size) * 100
print(f'\rProgress: [{bars}] {percent:.1f}%', end='')
print(f"\nFile downloaded successfully to: {file_path}")
return file_path
except requests.exceptions.RequestException as e:
print(f"Error downloading file: {e}")
raise
if image == "none":
raise ValueError("No image selected")
@@ -1065,8 +954,13 @@ class CivitAIModelSelectorPony:
json_path = os.path.join(parsed_models_path, 'parsed_pony_models.json')
# Load models info
with open(json_path, 'r') as f:
models_info = json.load(f)
try:
with open(json_path, 'r', encoding='utf-8') as f:
models_info = json.load(f)
except UnicodeDecodeError:
# Fallback to latin-1 if UTF-8 fails
with open(json_path, 'r', encoding='latin-1') as f:
models_info = json.load(f)
# Extract model name from image path
image_name = os.path.basename(image)
@@ -1129,105 +1023,6 @@ class CivitAIModelSelectorPony:
m.update(image.encode('utf-8'))
return m.digest().hex()
# class CivitAILoraSelector:
# @classmethod
# def INPUT_TYPES(s):
# # Get list of supported image extensions
# image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')
# files = [f"lora_images/{f}" for f in folder_paths.get_filename_list("lora_images")
# if f.lower().endswith(image_extensions)]
# if not files: # If no files found, provide a default option
# files = ["none"]
# return {"required":
# {"image": (sorted(files), {"image_upload": True})}, # Added image_upload option here
# }
# RETURN_TYPES = ("IMAGE", "STRING")
# RETURN_NAMES = ("image", "image_name")
# FUNCTION = "load_image"
# CATEGORY = "Bjornulf"
# def load_image(self, image):
# if image == "none":
# # Return a small blank image if no image is selected
# blank_image = torch.zeros((1, 64, 64, 3), dtype=torch.float32)
# return (blank_image, "none")
# image_path = os.path.join(lora_images_path, image)
# if not os.path.exists(image_path):
# raise FileNotFoundError(f"Image not found: {image_path}")
# # Copy the image to ComfyUI/input directory
# input_dir = folder_paths.get_input_directory()
# dest_path = os.path.join(input_dir, os.path.basename(image))
# try:
# shutil.copy2(image_path, dest_path)
# except Exception as e:
# print(f"Warning: Failed to copy image to input directory: {e}")
# img = node_helpers.pillow(Image.open, image_path)
# output_images = []
# w, h = None, None
# excluded_formats = ['MPO']
# for i in ImageSequence.Iterator(img):
# i = node_helpers.pillow(ImageOps.exif_transpose, i)
# if i.mode == 'I':
# i = i.point(lambda i: i * (1 / 255))
# image = i.convert("RGBA")
# if len(output_images) == 0:
# w = image.size[0]
# h = image.size[1]
# if image.size[0] != w or image.size[1] != h:
# continue
# image = np.array(image).astype(np.float32) / 255.0
# image = torch.from_numpy(image)[None,]
# output_images.append(image)
# if len(output_images) > 1 and img.format not in excluded_formats:
# output_image = torch.cat(output_images, dim=0)
# else:
# output_image = output_images[0]
# return (output_image, image)
# @classmethod
# def IS_CHANGED(s, image):
# if image == "none":
# return ""
# # Use the full path for the image
# image_path = os.path.join(lora_images_path, image)
# if not os.path.exists(image_path):
# return ""
# # Calculate hash of the image content
# m = hashlib.sha256()
# with open(image_path, 'rb') as f:
# m.update(f.read())
# # Include the image name in the hash to ensure updates when selection changes
# m.update(image.encode('utf-8'))
# return m.digest().hex()
# @classmethod
# def VALIDATE_INPUTS(s, image):
# if image == "none":
# return True
# image_path = os.path.join(lora_images_path, image)
# if not os.path.exists(image_path):
# return f"Invalid image file: {image}"
# return True
class CivitAILoraSelectorSD15:
@classmethod
def INPUT_TYPES(s):
@@ -1302,8 +1097,13 @@ class CivitAILoraSelectorSD15:
json_path = os.path.join(parsed_models_path, 'parsed_lora_sd_1.5_loras.json')
# Load loras info
with open(json_path, 'r') as f:
loras_info = json.load(f)
try:
with open(json_path, 'r', encoding='utf-8') as f:
models_info = json.load(f)
except UnicodeDecodeError:
# Fallback to latin-1 if UTF-8 fails
with open(json_path, 'r', encoding='latin-1') as f:
models_info = json.load(f)
# Extract lora name from image path
image_name = os.path.basename(image)
@@ -1445,8 +1245,13 @@ class CivitAILoraSelectorSDXL:
json_path = os.path.join(parsed_models_path, 'parsed_lora_sdxl_1.0_loras.json')
# Load loras info
with open(json_path, 'r') as f:
loras_info = json.load(f)
try:
with open(json_path, 'r', encoding='utf-8') as f:
models_info = json.load(f)
except UnicodeDecodeError:
# Fallback to latin-1 if UTF-8 fails
with open(json_path, 'r', encoding='latin-1') as f:
models_info = json.load(f)
# Extract lora name from image path
image_name = os.path.basename(image)
@@ -1588,8 +1393,13 @@ class CivitAILoraSelectorPONY:
json_path = os.path.join(parsed_models_path, 'parsed_lora_pony_loras.json')
# Load loras info
with open(json_path, 'r') as f:
loras_info = json.load(f)
try:
with open(json_path, 'r', encoding='utf-8') as f:
models_info = json.load(f)
except UnicodeDecodeError:
# Fallback to latin-1 if UTF-8 fails
with open(json_path, 'r', encoding='latin-1') as f:
models_info = json.load(f)
# Extract lora name from image path
image_name = os.path.basename(image)
@@ -1742,8 +1552,13 @@ class CivitAILoraSelectorHunyuan:
# json_path = nsfw_json_path if os.path.exists(nsfw_json_path) else regular_json_path
hunYuan = "hunyuan_video"
with open(json_path, 'r') as f:
loras_info = json.load(f)
try:
with open(json_path, 'r', encoding='utf-8') as f:
models_info = json.load(f)
except UnicodeDecodeError:
# Fallback to latin-1 if UTF-8 fails
with open(json_path, 'r', encoding='latin-1') as f:
models_info = json.load(f)
image_name = os.path.basename(image)
lora_info = next((lora for lora in loras_info