import logging import json import os from typing import Dict, List, Optional from .base_model_service import BaseModelService from .model_query import resolve_sub_type from .auto_tag_service import extract_auto_tags from ..utils.models import LoraMetadata from ..config import config logger = logging.getLogger(__name__) class LoraService(BaseModelService): """LoRA-specific service implementation""" def __init__(self, scanner, update_service=None): """Initialize LoRA service Args: scanner: LoRA scanner instance update_service: Optional service for remote update tracking. """ super().__init__("lora", scanner, LoraMetadata, update_service=update_service) async def format_response(self, lora_data: Dict) -> Optional[Dict]: """Format LoRA data for API response. Returns None when the entry is missing critical fields (corrupted cache row), so the handler layer can filter it out instead of crashing the whole listing request. See issue #730. """ # Guard against corrupted cache entries missing critical fields file_path = lora_data.get("file_path") if not file_path or not isinstance(file_path, str): logger.warning( "Skipping corrupted LoRA entry (missing file_path): %s", lora_data.get("file_name", ""), ) return None # Resolve sub_type using priority: sub_type > model_type > civitai.model.type > default # Normalize to lowercase for consistent API responses sub_type = resolve_sub_type(lora_data).lower() file_name = lora_data.get("file_name") or "" model_name = lora_data.get("model_name") or file_name folder = lora_data.get("folder") or "" return { "model_name": model_name, "file_name": file_name, "preview_url": config.get_preview_static_url( lora_data.get("preview_url", "") ), "preview_nsfw_level": lora_data.get("preview_nsfw_level", 0), "base_model": lora_data.get("base_model", ""), "folder": folder, "sha256": lora_data.get("sha256", ""), "file_path": file_path.replace(os.sep, "/"), "file_size": lora_data.get("size", 0), "modified": lora_data.get("modified", ""), "tags": lora_data.get("tags", []), "from_civitai": lora_data.get("from_civitai", True), "usage_count": lora_data.get("usage_count", 0), "usage_tips": lora_data.get("usage_tips", ""), "notes": lora_data.get("notes", ""), "favorite": lora_data.get("favorite", False), "exclude": bool(lora_data.get("exclude", False)), "update_available": bool(lora_data.get("update_available", False)), "skip_metadata_refresh": bool( lora_data.get("skip_metadata_refresh", False) ), "sub_type": sub_type, "civitai": self.filter_civitai_data( lora_data.get("civitai", {}), minimal=True ), "auto_tags": lora_data.get("auto_tags") or extract_auto_tags(lora_data), "version_count": lora_data.get("version_count"), } async def _apply_specific_filters(self, data: List[Dict], **kwargs) -> List[Dict]: """Apply LoRA-specific filters""" # Handle first_letter filter for LoRAs first_letter = kwargs.get("first_letter") if first_letter: data = self._filter_by_first_letter(data, first_letter) # Handle name pattern filters name_pattern_include = kwargs.get("name_pattern_include", []) name_pattern_exclude = kwargs.get("name_pattern_exclude", []) name_pattern_use_regex = kwargs.get("name_pattern_use_regex", False) if name_pattern_include or name_pattern_exclude: import re def matches_pattern(name, pattern, use_regex): """Check if name matches pattern (regex or substring)""" if not name: return False if use_regex: try: return bool(re.search(pattern, name, re.IGNORECASE)) except re.error: # Invalid regex, fall back to substring match return pattern.lower() in name.lower() else: return pattern.lower() in name.lower() def matches_any_pattern(name, patterns, use_regex): """Check if name matches any of the patterns""" if not patterns: return True return any(matches_pattern(name, p, use_regex) for p in patterns) filtered = [] for lora in data: model_name = lora.get("model_name", "") file_name = lora.get("file_name", "") names_to_check = [n for n in [model_name, file_name] if n] # Check exclude patterns first excluded = False if name_pattern_exclude: for name in names_to_check: if matches_any_pattern( name, name_pattern_exclude, name_pattern_use_regex ): excluded = True break if excluded: continue # Check include patterns if name_pattern_include: included = False for name in names_to_check: if matches_any_pattern( name, name_pattern_include, name_pattern_use_regex ): included = True break if not included: continue filtered.append(lora) data = filtered return data def _filter_by_first_letter(self, data: List[Dict], letter: str) -> List[Dict]: """Filter data by first letter of model name Special handling: - '#': Numbers (0-9) - '@': Special characters (not alphanumeric) - '漢': CJK characters """ filtered_data = [] for lora in data: model_name = lora.get("model_name", "") if not model_name: continue first_char = model_name[0].upper() if letter == "#" and first_char.isdigit(): filtered_data.append(lora) elif letter == "@" and not first_char.isalnum(): # Special characters (not alphanumeric) filtered_data.append(lora) elif letter == "漢" and self._is_cjk_character(first_char): # CJK characters filtered_data.append(lora) elif letter.upper() == first_char: # Regular alphabet matching filtered_data.append(lora) return filtered_data def _is_cjk_character(self, char: str) -> bool: """Check if character is a CJK character""" # Define Unicode ranges for CJK characters cjk_ranges = [ (0x4E00, 0x9FFF), # CJK Unified Ideographs (0x3400, 0x4DBF), # CJK Unified Ideographs Extension A (0x20000, 0x2A6DF), # CJK Unified Ideographs Extension B (0x2A700, 0x2B73F), # CJK Unified Ideographs Extension C (0x2B740, 0x2B81F), # CJK Unified Ideographs Extension D (0x2B820, 0x2CEAF), # CJK Unified Ideographs Extension E (0x2CEB0, 0x2EBEF), # CJK Unified Ideographs Extension F (0x30000, 0x3134F), # CJK Unified Ideographs Extension G (0xF900, 0xFAFF), # CJK Compatibility Ideographs (0x3300, 0x33FF), # CJK Compatibility (0x3200, 0x32FF), # Enclosed CJK Letters and Months (0x3100, 0x312F), # Bopomofo (0x31A0, 0x31BF), # Bopomofo Extended (0x3040, 0x309F), # Hiragana (0x30A0, 0x30FF), # Katakana (0x31F0, 0x31FF), # Katakana Phonetic Extensions (0xAC00, 0xD7AF), # Hangul Syllables (0x1100, 0x11FF), # Hangul Jamo (0xA960, 0xA97F), # Hangul Jamo Extended-A (0xD7B0, 0xD7FF), # Hangul Jamo Extended-B ] code_point = ord(char) return any(start <= code_point <= end for start, end in cjk_ranges) # LoRA-specific methods async def get_letter_counts(self) -> Dict[str, int]: """Get count of LoRAs for each letter of the alphabet""" cache = await self.scanner.get_cached_data() data = cache.raw_data # Define letter categories letters = { "#": 0, # Numbers "A": 0, "B": 0, "C": 0, "D": 0, "E": 0, "F": 0, "G": 0, "H": 0, "I": 0, "J": 0, "K": 0, "L": 0, "M": 0, "N": 0, "O": 0, "P": 0, "Q": 0, "R": 0, "S": 0, "T": 0, "U": 0, "V": 0, "W": 0, "X": 0, "Y": 0, "Z": 0, "@": 0, # Special characters "漢": 0, # CJK characters } # Count models for each letter for lora in data: model_name = lora.get("model_name", "") if not model_name: continue first_char = model_name[0].upper() if first_char.isdigit(): letters["#"] += 1 elif first_char in letters: letters[first_char] += 1 elif self._is_cjk_character(first_char): letters["漢"] += 1 elif not first_char.isalnum(): letters["@"] += 1 return letters async def get_lora_trigger_words(self, lora_name: str) -> List[str]: """Get trigger words for a specific LoRA file""" cache = await self.scanner.get_cached_data() for lora in cache.raw_data: if lora["file_name"] == lora_name: civitai_data = lora.get("civitai", {}) return civitai_data.get("trainedWords", []) return [] async def get_lora_usage_tips_by_relative_path( self, relative_path: str ) -> Optional[str]: """Get usage tips for a LoRA by its relative path""" cache = await self.scanner.get_cached_data() for lora in cache.raw_data: file_path = lora.get("file_path", "") if file_path: # Convert to forward slashes and extract relative path file_path_normalized = file_path.replace("\\", "/") relative_path = relative_path.replace("\\", "/") # Find the relative path part by looking for the relative_path in the full path if ( file_path_normalized.endswith(relative_path) or relative_path in file_path_normalized ): return lora.get("usage_tips", "") return None @staticmethod def get_recommended_strength_from_lora_data(lora_data: Dict) -> Optional[float]: """Parse usage_tips JSON and extract recommended model strength.""" try: usage_tips = lora_data.get("usage_tips", "") if not usage_tips: return None tips_data = json.loads(usage_tips) return tips_data.get("strength") except (json.JSONDecodeError, TypeError, AttributeError): return None @staticmethod def get_recommended_clip_strength_from_lora_data( lora_data: Dict, ) -> Optional[float]: """Parse usage_tips JSON and extract recommended clip strength.""" try: usage_tips = lora_data.get("usage_tips", "") if not usage_tips: return None tips_data = json.loads(usage_tips) return tips_data.get("clipStrength") except (json.JSONDecodeError, TypeError, AttributeError): return None async def get_lora_metadata_by_filename(self, filename: str) -> Optional[Dict]: """Return cached raw metadata for a LoRA matching the given filename.""" cache = await self.scanner.get_cached_data(force_refresh=False) fn_normalized = filename.replace("\\", "/") fn_no_ext = fn_normalized for ext in (".safetensors", ".ckpt", ".pt", ".bin"): if fn_no_ext.lower().endswith(ext): fn_no_ext = fn_no_ext[: -len(ext)] break for lora in cache.raw_data if cache else []: file_name = lora.get("file_name", "") folder = lora.get("folder", "") file_name_no_ext = file_name for ext in (".safetensors", ".ckpt", ".pt", ".bin"): if file_name_no_ext.lower().endswith(ext): file_name_no_ext = file_name_no_ext[: -len(ext)] break path_name = f"{folder}/{file_name_no_ext}".replace("\\", "/") if folder else file_name_no_ext if fn_no_ext in (file_name_no_ext, path_name): return lora return None def find_duplicate_hashes(self) -> Dict: """Find LoRAs with duplicate SHA256 hashes""" return self.scanner._hash_index.get_duplicate_hashes() def find_duplicate_filenames(self) -> Dict: """Find LoRAs with conflicting filenames""" return self.scanner._hash_index.get_duplicate_filenames() async def get_random_loras( self, count: int, model_strength_min: float = 0.0, model_strength_max: float = 1.0, use_same_clip_strength: bool = True, clip_strength_min: float = 0.0, clip_strength_max: float = 1.0, locked_loras: Optional[List[Dict]] = None, pool_config: Optional[Dict] = None, count_mode: str = "fixed", count_min: int = 3, count_max: int = 7, use_recommended_strength: bool = False, recommended_strength_scale_min: float = 0.5, recommended_strength_scale_max: float = 1.0, seed: Optional[int] = None, ) -> List[Dict]: """ Get random LoRAs with specified strength ranges. Args: count: Number of LoRAs to select (if count_mode='fixed') model_strength_min: Minimum model strength model_strength_max: Maximum model strength use_same_clip_strength: Whether to use same strength for clip clip_strength_min: Minimum clip strength clip_strength_max: Maximum clip strength locked_loras: List of locked LoRA dicts to preserve pool_config: Optional pool config for filtering count_mode: How to determine count ('fixed' or 'range') count_min: Minimum count for range mode count_max: Maximum count for range mode use_recommended_strength: Whether to use recommended strength from usage_tips recommended_strength_scale_min: Minimum scale factor for recommended strength recommended_strength_scale_max: Maximum scale factor for recommended strength seed: Optional random seed for reproducible/unique randomization per execution Returns: List of LoRA dicts with randomized strengths """ import random # Use a local Random instance to avoid affecting global random state # This ensures each execution with a different seed produces different results rng = random.Random(seed) if locked_loras is None: locked_loras = [] # Determine target count based on count_mode if count_mode == "fixed": target_count = count else: target_count = rng.randint(count_min, count_max) # Get available loras from cache cache = await self.scanner.get_cached_data(force_refresh=False) available_loras = cache.raw_data if cache else [] # Apply pool filters if provided if pool_config: available_loras = await self._apply_pool_filters( available_loras, pool_config ) # Calculate slots needed (total - locked) locked_count = len(locked_loras) slots_needed = target_count - locked_count if slots_needed < 0: slots_needed = 0 # Too many locked, trim to target locked_loras = locked_loras[:target_count] # Filter out locked LoRAs from available pool locked_names = { os.path.basename(lora["name"]) if "/" in str(lora.get("name", "")) else lora["name"] for lora in locked_loras } available_pool = [ l for l in available_loras if l["file_name"] not in locked_names ] # Ensure we don't try to select more than available if slots_needed > len(available_pool): slots_needed = len(available_pool) # Random sample selected = [] if slots_needed > 0: selected = rng.sample(available_pool, slots_needed) # Generate random strengths for selected LoRAs result_loras = [] for lora in selected: if use_recommended_strength: recommended_strength = self.get_recommended_strength_from_lora_data( lora ) if recommended_strength is not None: scale = rng.uniform( recommended_strength_scale_min, recommended_strength_scale_max ) model_str = round(recommended_strength * scale, 2) else: model_str = round( rng.uniform(model_strength_min, model_strength_max), 2 ) else: model_str = round( rng.uniform(model_strength_min, model_strength_max), 2 ) if use_same_clip_strength: clip_str = model_str elif use_recommended_strength: recommended_clip_strength = ( self.get_recommended_clip_strength_from_lora_data(lora) ) if recommended_clip_strength is not None: scale = rng.uniform( recommended_strength_scale_min, recommended_strength_scale_max ) clip_str = round(recommended_clip_strength * scale, 2) else: clip_str = round( rng.uniform(clip_strength_min, clip_strength_max), 2 ) else: clip_str = round(rng.uniform(clip_strength_min, clip_strength_max), 2) result_loras.append( { "name": f"{lora['folder']}/{lora['file_name']}" if lora.get("folder") else lora["file_name"], "strength": model_str, "clipStrength": clip_str, "active": True, "expanded": abs(model_str - clip_str) > 0.001, "locked": False, } ) # Merge with locked LoRAs result_loras.extend(locked_loras) return result_loras async def _apply_pool_filters( self, available_loras: List[Dict], pool_config: Dict ) -> List[Dict]: """ Apply pool_config filters to available LoRAs. Args: available_loras: List of all LoRA dicts pool_config: Dict with filter settings from LoRA Pool node Returns: Filtered list of LoRA dicts """ from .model_query import FilterCriteria filter_section = pool_config # Extract filter parameters selected_base_models = filter_section.get("baseModels", []) tags_dict = filter_section.get("tags", {}) include_tags = tags_dict.get("include", []) exclude_tags = tags_dict.get("exclude", []) folders_dict = filter_section.get("folders", {}) include_folders = folders_dict.get("include", []) exclude_folders = folders_dict.get("exclude", []) license_dict = filter_section.get("license", {}) no_credit_required = license_dict.get("noCreditRequired", False) allow_selling = license_dict.get("allowSelling", False) # Build tag filters dict tag_filters = {} for tag in include_tags: tag_filters[tag] = "include" for tag in exclude_tags: tag_filters[tag] = "exclude" # Build folder filter if include_folders or exclude_folders: filtered = [] for lora in available_loras: folder = lora.get("folder", "") # Check exclude folders first excluded = False for exclude_folder in exclude_folders: if folder.startswith(exclude_folder): excluded = True break if excluded: continue # Check include folders if include_folders: included = False for include_folder in include_folders: if folder.startswith(include_folder): included = True break if not included: continue filtered.append(lora) available_loras = filtered # Apply base model filter if selected_base_models: available_loras = [ lora for lora in available_loras if lora.get("base_model") in selected_base_models ] # Apply tag filters if tag_filters: criteria = FilterCriteria(tags=tag_filters) available_loras = self.filter_set.apply(available_loras, criteria) # Apply license filters # no_credit_required=True means keep only models where credit is NOT required # (i.e., allowNoCredit=True, which is bit 0 = 1 in license_flags) if no_credit_required: available_loras = [ lora for lora in available_loras if bool(lora.get("license_flags", 127) & (1 << 0)) ] # allow_selling=True means keep only models where selling generated content is allowed if allow_selling: available_loras = [ lora for lora in available_loras if bool(lora.get("license_flags", 127) & (1 << 1)) ] # Apply name pattern filters name_patterns = filter_section.get("namePatterns", {}) include_patterns = name_patterns.get("include", []) exclude_patterns = name_patterns.get("exclude", []) use_regex = name_patterns.get("useRegex", False) if include_patterns or exclude_patterns: import re def matches_pattern(name, pattern, use_regex): """Check if name matches pattern (regex or substring)""" if not name: return False if use_regex: try: return bool(re.search(pattern, name, re.IGNORECASE)) except re.error: # Invalid regex, fall back to substring match return pattern.lower() in name.lower() else: return pattern.lower() in name.lower() def matches_any_pattern(name, patterns, use_regex): """Check if name matches any of the patterns""" if not patterns: return True return any(matches_pattern(name, p, use_regex) for p in patterns) filtered = [] for lora in available_loras: model_name = lora.get("model_name", "") file_name = lora.get("file_name", "") names_to_check = [n for n in [model_name, file_name] if n] # Check exclude patterns first excluded = False if exclude_patterns: for name in names_to_check: if matches_any_pattern(name, exclude_patterns, use_regex): excluded = True break if excluded: continue # Check include patterns if include_patterns: included = False for name in names_to_check: if matches_any_pattern(name, include_patterns, use_regex): included = True break if not included: continue filtered.append(lora) available_loras = filtered return available_loras async def get_cycler_list( self, pool_config: Optional[Dict] = None, sort_by: str = "filename" ) -> List[Dict]: """ Get filtered and sorted LoRA list for cycling. Args: pool_config: Optional pool config for filtering (filters dict) sort_by: Sort field - 'filename' or 'model_name' Returns: List of LoRA dicts with file_name and model_name """ # Get cached data cache = await self.scanner.get_cached_data(force_refresh=False) available_loras = cache.raw_data if cache else [] # Apply pool filters if provided if pool_config: available_loras = await self._apply_pool_filters( available_loras, pool_config ) # Sort by specified field if sort_by == "model_name": available_loras = sorted( available_loras, key=lambda x: ( (x.get("model_name") or x.get("file_name", "")).lower(), x.get("file_path", "").lower(), ), ) else: # Default to filename available_loras = sorted( available_loras, key=lambda x: ( x.get("file_name", "").lower(), x.get("file_path", "").lower(), ), ) # Return minimal data needed for cycling return [ { "file_name": f"{lora['folder']}/{lora['file_name']}" if lora.get("folder") else lora["file_name"], "model_name": lora.get("model_name", lora["file_name"]), "folder": lora.get("folder", ""), } for lora in available_loras ]