Files
ComfyUI-Lora-Manager/py/nodes/lora_randomizer.py
Will Miao 6a17e75782 docs: add frontend UI architecture and ComfyUI widget guidelines
- Document dual UI systems: standalone web UI and ComfyUI custom node widgets
- Add ComfyUI widget development guidelines including styling and constraints
- Update terminology in LoraRandomizerNode from 'frontend/backend' to 'fixed/always' for clarity
- Include UI constraints for ComfyUI widgets: minimize vertical space, avoid dynamic height changes, keep UI simple
2026-01-13 11:20:50 +08:00

314 lines
11 KiB
Python

"""
Lora Randomizer Node - Randomly selects LoRAs from a pool with configurable settings.
This node accepts optional pool_config input to filter available LoRAs, and outputs
a LORA_STACK with randomly selected LoRAs. Returns UI updates with new random LoRAs
and tracks the last used combination for reuse.
"""
import logging
import random
import os
from ..utils.utils import get_lora_info
from .utils import extract_lora_name
logger = logging.getLogger(__name__)
class LoraRandomizerNode:
"""Node that randomly selects LoRAs from a pool"""
NAME = "Lora Randomizer (LoraManager)"
CATEGORY = "Lora Manager/randomizer"
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"randomizer_config": ("RANDOMIZER_CONFIG", {}),
"loras": ("LORAS", {}),
},
"optional": {
"pool_config": ("POOL_CONFIG", {}),
},
}
RETURN_TYPES = ("LORA_STACK",)
RETURN_NAMES = ("LORA_STACK",)
FUNCTION = "randomize"
OUTPUT_NODE = False
async def randomize(self, randomizer_config, loras, pool_config=None):
"""
Randomize LoRAs based on configuration and pool filters.
Args:
randomizer_config: Dict with randomizer settings (count, strength ranges, roll_mode)
loras: List of LoRA dicts from LORAS widget (includes locked state)
pool_config: Optional config from LoRA Pool node for filtering
Returns:
Dictionary with 'result' (LORA_STACK tuple) and 'ui' (for widget display)
"""
from ..services.service_registry import ServiceRegistry
roll_mode = randomizer_config.get("roll_mode", "always")
logger.debug(f"[LoraRandomizerNode] roll_mode: {roll_mode}")
if roll_mode == "fixed":
ui_loras = loras
else:
scanner = await ServiceRegistry.get_lora_scanner()
ui_loras = await self._generate_random_loras_for_ui(
scanner, randomizer_config, loras, pool_config
)
execution_stack = self._build_execution_stack_from_input(loras)
return {
"result": (execution_stack,),
"ui": {"loras": ui_loras, "last_used": loras},
}
def _build_execution_stack_from_input(self, loras):
"""
Build LORA_STACK tuple from input loras list for execution.
Args:
loras: List of LoRA dicts with name, strength, clipStrength, active
Returns:
List of tuples (lora_path, model_strength, clip_strength)
"""
lora_stack = []
for lora in loras:
if not lora.get("active", False):
continue
# Get file path
lora_path, trigger_words = get_lora_info(lora["name"])
if not lora_path:
logger.warning(
f"[LoraRandomizerNode] Could not find path for LoRA: {lora['name']}"
)
continue
# Normalize path separators
lora_path = lora_path.replace("/", os.sep)
# Extract strengths
model_strength = lora.get("strength", 1.0)
clip_strength = lora.get("clipStrength", model_strength)
lora_stack.append((lora_path, model_strength, clip_strength))
return lora_stack
async def _generate_random_loras_for_ui(
self, scanner, randomizer_config, input_loras, pool_config=None
):
"""
Generate new random loras for UI display.
Args:
scanner: LoraScanner instance
randomizer_config: Dict with randomizer settings
input_loras: Current input loras (for extracting locked loras)
pool_config: Optional pool filters
Returns:
List of LoRA dicts for UI display
"""
# Parse randomizer settings
count_mode = randomizer_config.get("count_mode", "range")
count_fixed = randomizer_config.get("count_fixed", 5)
count_min = randomizer_config.get("count_min", 3)
count_max = randomizer_config.get("count_max", 7)
model_strength_min = randomizer_config.get("model_strength_min", 0.0)
model_strength_max = randomizer_config.get("model_strength_max", 1.0)
use_same_clip_strength = randomizer_config.get("use_same_clip_strength", True)
clip_strength_min = randomizer_config.get("clip_strength_min", 0.0)
clip_strength_max = randomizer_config.get("clip_strength_max", 1.0)
# Determine target count
if count_mode == "fixed":
target_count = count_fixed
else:
target_count = random.randint(count_min, count_max)
# Extract locked LoRAs from input
locked_loras = [lora for lora in input_loras if lora.get("locked", False)]
locked_count = len(locked_loras)
# Get available loras from cache
try:
cache_data = await scanner.get_cached_data(force_refresh=False)
if cache_data and hasattr(cache_data, "raw_data"):
available_loras = cache_data.raw_data
else:
available_loras = []
except Exception as e:
logger.warning(f"[LoraRandomizerNode] Failed to get lora cache: {e}")
available_loras = []
# Apply pool filters if provided
if pool_config:
available_loras = await self._apply_pool_filters(
available_loras, pool_config, scanner
)
# Calculate how many new LoRAs to select
slots_needed = target_count - locked_count
if slots_needed < 0:
slots_needed = 0
# Too many locked, trim to target
locked_loras = locked_loras[:target_count]
locked_count = len(locked_loras)
# Filter out locked LoRAs from available pool
locked_names = {lora["name"] for lora in locked_loras}
available_pool = [
l for l in available_loras if l["file_name"] not in locked_names
]
# Ensure we don't try to select more than available
if slots_needed > len(available_pool):
slots_needed = len(available_pool)
# Random sample
selected = []
if slots_needed > 0:
selected = random.sample(available_pool, slots_needed)
# Generate random strengths for selected LoRAs
result_loras = []
for lora in selected:
model_str = round(random.uniform(model_strength_min, model_strength_max), 2)
if use_same_clip_strength:
clip_str = model_str
else:
clip_str = round(
random.uniform(clip_strength_min, clip_strength_max), 2
)
result_loras.append(
{
"name": lora["file_name"],
"strength": model_str,
"clipStrength": clip_str,
"active": True,
"expanded": abs(model_str - clip_str) > 0.001,
"locked": False,
}
)
# Merge with locked LoRAs
result_loras.extend(locked_loras)
return result_loras
async def _apply_pool_filters(self, available_loras, pool_config, scanner):
"""
Apply pool_config filters to available LoRAs.
Args:
available_loras: List of all LoRA dicts
pool_config: Dict with filter settings from LoRA Pool node
scanner: Scanner instance for accessing filter utilities
Returns:
Filtered list of LoRA dicts
"""
from ..services.lora_service import LoraService
from ..services.model_query import FilterCriteria
# Create lora service instance for filtering
lora_service = LoraService(scanner)
# Extract filter parameters from pool_config
selected_base_models = pool_config.get("baseModels", [])
tags_dict = pool_config.get("tags", {})
include_tags = tags_dict.get("include", [])
exclude_tags = tags_dict.get("exclude", [])
folders_dict = pool_config.get("folders", {})
include_folders = folders_dict.get("include", [])
exclude_folders = folders_dict.get("exclude", [])
license_dict = pool_config.get("license", {})
no_credit_required = license_dict.get("noCreditRequired", False)
allow_selling = license_dict.get("allowSelling", False)
# Build tag filters dict
tag_filters = {}
for tag in include_tags:
tag_filters[tag] = "include"
for tag in exclude_tags:
tag_filters[tag] = "exclude"
# Build folder filter
# LoRA Pool uses include/exclude folders, we need to apply this logic
# For now, we'll filter based on folder path matching
if include_folders or exclude_folders:
filtered = []
for lora in available_loras:
folder = lora.get("folder", "")
# Check exclude folders first
excluded = False
for exclude_folder in exclude_folders:
if folder.startswith(exclude_folder):
excluded = True
break
if excluded:
continue
# Check include folders
if include_folders:
included = False
for include_folder in include_folders:
if folder.startswith(include_folder):
included = True
break
if not included:
continue
filtered.append(lora)
available_loras = filtered
# Apply base model filter
if selected_base_models:
available_loras = [
lora
for lora in available_loras
if lora.get("base_model") in selected_base_models
]
# Apply tag filters
if tag_filters:
criteria = FilterCriteria(tags=tag_filters)
available_loras = lora_service.filter_set.apply(available_loras, criteria)
# Apply license filters
# Note: no_credit_required=True means filter out models where credit is NOT required
# (i.e., keep only models where credit IS required)
if no_credit_required:
available_loras = [
lora
for lora in available_loras
if not (lora.get("license_flags", 127) & (1 << 0))
]
# allow_selling=True means keep only models where selling generated content is allowed
if allow_selling:
available_loras = [
lora
for lora in available_loras
if bool(lora.get("license_flags", 127) & (1 << 1))
]
return available_loras