Files
ComfyUI-Lora-Manager/py/services/recipe_scanner.py
Will Miao 4d8113464c perf(recipe_scanner): eliminate event loop blocking during cache rebuild
Refactor force_refresh path to use thread pool execution instead of blocking
the event loop shared with ComfyUI. Key changes:

- Fix 1: Route force_refresh through _initialize_recipe_cache_sync() in thread pool
- Fix 2: Add GIL release points (time.sleep(0)) every 100 files in sync loops
- Fix 3: Move RecipeCache.resort() to thread pool via run_in_executor
- Fix 4: Persist cache automatically after force_refresh
- Fix 5: Increase yield frequency in _enrich_cache_metadata (every recipe)

This eliminates the ~5 minute freeze when rebuilding 30K recipe cache.

Fixes performance issue where ComfyUI became unresponsive during recipe
scanning due to shared Python event loop blocking.
2026-03-04 15:10:46 +08:00

2549 lines
96 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import asyncio
import json
import logging
import os
import time
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple
from ..config import config
from .recipe_cache import RecipeCache
from .recipe_fts_index import RecipeFTSIndex
from .persistent_recipe_cache import (
PersistentRecipeCache,
get_persistent_recipe_cache,
PersistedRecipeData,
)
from .service_registry import ServiceRegistry
from .lora_scanner import LoraScanner
from .metadata_service import get_default_metadata_provider
from .checkpoint_scanner import CheckpointScanner
from .recipes.errors import RecipeNotFoundError
from ..utils.utils import calculate_recipe_fingerprint, fuzzy_match
from natsort import natsorted
import sys
import re
from ..recipes.merger import GenParamsMerger
from ..recipes.enrichment import RecipeEnricher
logger = logging.getLogger(__name__)
class RecipeScanner:
"""Service for scanning and managing recipe images"""
_instance = None
_lock = asyncio.Lock()
@classmethod
async def get_instance(
cls,
lora_scanner: Optional[LoraScanner] = None,
checkpoint_scanner: Optional[CheckpointScanner] = None,
):
"""Get singleton instance of RecipeScanner"""
async with cls._lock:
if cls._instance is None:
if not lora_scanner:
# Get lora scanner from service registry if not provided
lora_scanner = await ServiceRegistry.get_lora_scanner()
if not checkpoint_scanner:
checkpoint_scanner = await ServiceRegistry.get_checkpoint_scanner()
cls._instance = cls(lora_scanner, checkpoint_scanner)
return cls._instance
def __new__(
cls,
lora_scanner: Optional[LoraScanner] = None,
checkpoint_scanner: Optional[CheckpointScanner] = None,
):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._lora_scanner = lora_scanner
cls._instance._checkpoint_scanner = checkpoint_scanner
cls._instance._civitai_client = None # Will be lazily initialized
return cls._instance
# Bump this constant when repair logic changes; recipes whose stored
# ``repair_version`` is older are re-processed by the repair methods.
REPAIR_VERSION = 3
def __init__(
self,
lora_scanner: Optional[LoraScanner] = None,
checkpoint_scanner: Optional[CheckpointScanner] = None,
):
# Ensure initialization only happens once
if not hasattr(self, "_initialized"):
self._cache: Optional[RecipeCache] = None
self._initialization_lock = asyncio.Lock()
self._initialization_task: Optional[asyncio.Task] = None
self._is_initializing = False
self._mutation_lock = asyncio.Lock()
self._post_scan_task: Optional[asyncio.Task] = None
self._resort_tasks: Set[asyncio.Task] = set()
self._cancel_requested = False
# FTS index for fast search
self._fts_index: Optional[RecipeFTSIndex] = None
self._fts_index_task: Optional[asyncio.Task] = None
# Persistent cache for fast startup
self._persistent_cache: Optional[PersistentRecipeCache] = None
self._json_path_map: Dict[str, str] = {} # recipe_id -> json_path
if lora_scanner:
self._lora_scanner = lora_scanner
if checkpoint_scanner:
self._checkpoint_scanner = checkpoint_scanner
self._initialized = True
def on_library_changed(self) -> None:
    """Reset cached state when the active library changes.

    Cancels every in-flight background task (initialization, resorts,
    post-scan enrichment, FTS index build), drops the in-memory and
    persistent caches, and finally schedules a fresh background
    initialization if a live event loop is available.
    """
    # Cancel any in-flight initialization or resorting work so the next
    # access rebuilds the cache for the new library.
    if self._initialization_task and not self._initialization_task.done():
        self._initialization_task.cancel()
    for task in list(self._resort_tasks):
        if not task.done():
            task.cancel()
    self._resort_tasks.clear()
    if self._post_scan_task and not self._post_scan_task.done():
        self._post_scan_task.cancel()
    self._post_scan_task = None
    # Cancel FTS index task and clear index
    if self._fts_index_task and not self._fts_index_task.done():
        self._fts_index_task.cancel()
    self._fts_index_task = None
    if self._fts_index:
        self._fts_index.clear()
    self._fts_index = None
    # Reset persistent cache instance for new library
    self._persistent_cache = None
    self._json_path_map = {}
    PersistentRecipeCache.clear_instances()
    self._cache = None
    self._initialization_task = None
    self._is_initializing = False
    # Kick off a rebuild only when called from within a running event loop;
    # otherwise the cache is rebuilt lazily on next access.
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None
    if loop and not loop.is_closed():
        loop.create_task(self.initialize_in_background())
async def _get_civitai_client(self):
"""Lazily initialize CivitaiClient from registry"""
if self._civitai_client is None:
self._civitai_client = await ServiceRegistry.get_civitai_client()
return self._civitai_client
def cancel_task(self) -> None:
    """Request cancellation of the current long-running task.

    Sets a flag that cooperative loops (e.g. ``repair_all_recipes``)
    check between items; it does not interrupt work already in flight.
    """
    self._cancel_requested = True
    logger.info("Recipe Scanner: Cancellation requested")
def reset_cancellation(self) -> None:
    """Reset the cancellation flag (called at the start of a new long task)."""
    self._cancel_requested = False
def is_cancelled(self) -> bool:
    """Check if cancellation has been requested."""
    return self._cancel_requested
async def repair_all_recipes(
    self, progress_callback: Optional[Callable[[Dict], Any]] = None
) -> Dict[str, Any]:
    """Repair all recipes by enrichment with Civitai and embedded metadata.

    Holds the mutation lock for the whole pass and checks the user
    cancellation flag between recipes (see ``cancel_task``).

    Args:
        progress_callback: Optional async callback for progress updates;
            receives dicts with a ``status`` key plus running counters.

    Returns:
        Dict summary of repair results
    """
    if progress_callback:
        await progress_callback({"status": "started"})
    async with self._mutation_lock:
        cache = await self.get_cached_data()
        # Snapshot the list so cache mutations don't disturb iteration.
        all_recipes = list(cache.raw_data)
        total = len(all_recipes)
        repaired_count = 0
        skipped_count = 0
        errors_count = 0
        civitai_client = await self._get_civitai_client()
        # Clear any stale cancellation flag from a previous run.
        self.reset_cancellation()
        for i, recipe in enumerate(all_recipes):
            if self.is_cancelled():
                logger.info("Recipe repair cancelled by user")
                if progress_callback:
                    await progress_callback(
                        {
                            "status": "cancelled",
                            "current": i,
                            "total": total,
                            "repaired": repaired_count,
                            "skipped": skipped_count,
                            "errors": errors_count,
                        }
                    )
                return {
                    "success": False,
                    "status": "cancelled",
                    "repaired": repaired_count,
                    "skipped": skipped_count,
                    "errors": errors_count,
                    "total": total,
                }
            try:
                # Report progress
                if progress_callback:
                    await progress_callback(
                        {
                            "status": "processing",
                            "current": i + 1,
                            "total": total,
                            "recipe_name": recipe.get("name", "Unknown"),
                        }
                    )
                if await self._repair_single_recipe(recipe, civitai_client):
                    repaired_count += 1
                else:
                    skipped_count += 1
            except Exception as e:
                # Best-effort: one broken recipe must not abort the whole pass.
                logger.error(
                    f"Error repairing recipe {recipe.get('file_path')}: {e}"
                )
                errors_count += 1
        # Final progress update
        if progress_callback:
            await progress_callback(
                {
                    "status": "completed",
                    "repaired": repaired_count,
                    "skipped": skipped_count,
                    "errors": errors_count,
                    "total": total,
                }
            )
        return {
            "success": True,
            "repaired": repaired_count,
            "skipped": skipped_count,
            "errors": errors_count,
            "total": total,
        }
async def repair_recipe_by_id(self, recipe_id: str) -> Dict[str, Any]:
    """Repair a single recipe by its ID.

    Args:
        recipe_id: ID of the recipe to repair

    Returns:
        Dict summary of repair result

    Raises:
        RecipeNotFoundError: if no cached raw recipe matches ``recipe_id``.
    """
    async with self._mutation_lock:
        # Work against the raw cache entry so formatted runtime fields
        # never leak into persisted data.
        cache = await self.get_cached_data()
        target = None
        for candidate in cache.raw_data:
            if str(candidate.get("id", "")) == recipe_id:
                target = candidate
                break
        if not target:
            raise RecipeNotFoundError(f"Recipe {recipe_id} not found")
        client = await self._get_civitai_client()
        success = await self._repair_single_recipe(target, client)
        # On success, hand the UI the freshly formatted recipe instead of
        # the raw cache entry.
        if success:
            result_recipe = await self.get_recipe_by_id(recipe_id)
        else:
            result_recipe = target
        return {
            "success": True,
            "repaired": 1 if success else 0,
            "skipped": 0 if success else 1,
            "recipe": result_recipe,
        }
async def _repair_single_recipe(
    self, recipe: Dict[str, Any], civitai_client: Any
) -> bool:
    """Internal helper to repair a single recipe object.

    Args:
        recipe: The recipe dictionary to repair (modified in-place)
        civitai_client: Authenticated Civitai client

    Returns:
        bool: True if recipe was repaired or updated, False if skipped
    """
    # 1. Skip if already at latest repair version
    if recipe.get("repair_version", 0) >= self.REPAIR_VERSION:
        return False
    # 2. Identification: Is repair needed? A recipe is considered healthy
    # when it has a named checkpoint and a non-empty prompt.
    has_checkpoint = (
        "checkpoint" in recipe
        and recipe["checkpoint"]
        and recipe["checkpoint"].get("name")
    )
    gen_params = recipe.get("gen_params", {})
    has_prompt = bool(gen_params.get("prompt"))
    needs_repair = not has_checkpoint or not has_prompt
    if not needs_repair:
        # Even if no repair needed, we mark it with version if it was processed
        # Always update and save because if we are here, the version is old (checked in step 1)
        recipe["repair_version"] = self.REPAIR_VERSION
        await self._save_recipe_persistently(recipe)
        return True
    # 3. Use Enricher to repair/enrich; enrichment failure is logged and
    # treated as "no update" rather than aborting.
    try:
        updated = await RecipeEnricher.enrich_recipe(recipe, civitai_client)
    except Exception as e:
        logger.error(f"Error enriching recipe {recipe.get('id')}: {e}")
        updated = False
    # 4. Mark version and save if updated or just marking version.
    # If we updated it, OR if the version is old (which we know it is if we are here), save it.
    # Even when ``updated`` is False we still stamp the version so the
    # recipe is not retried until REPAIR_VERSION is bumped.
    if updated or recipe.get("repair_version", 0) < self.REPAIR_VERSION:
        recipe["repair_version"] = self.REPAIR_VERSION
        await self._save_recipe_persistently(recipe)
        return True
    return False
async def _save_recipe_persistently(self, recipe: Dict[str, Any]) -> bool:
    """Helper to save a recipe to both JSON and EXIF metadata.

    The recipe dict is sanitized in place (runtime-only fields removed)
    before being written, so callers see the persisted shape afterwards.

    Returns:
        True when the JSON file was written; False when the recipe has no
        id / known path, or an error occurred (logged, not raised).
    """
    recipe_id = recipe.get("id")
    if not recipe_id:
        return False
    recipe_json_path = await self.get_recipe_json_path(recipe_id)
    if not recipe_json_path:
        return False
    try:
        # 1. Sanitize for storage (remove runtime convenience fields)
        clean_recipe = self._sanitize_recipe_for_storage(recipe)
        # 2. Update the original dictionary so that we persist the clean version
        # globally if needed, effectively overwriting it in-place.
        recipe.clear()
        recipe.update(clean_recipe)
        # 3. Save JSON
        with open(recipe_json_path, "w", encoding="utf-8") as f:
            json.dump(recipe, f, indent=4, ensure_ascii=False)
        # 4. Update persistent SQLite cache
        if self._persistent_cache:
            self._persistent_cache.update_recipe(recipe, recipe_json_path)
            self._json_path_map[str(recipe_id)] = recipe_json_path
        # 5. Update EXIF if image exists — keeps the image self-describing.
        image_path = recipe.get("file_path")
        if image_path and os.path.exists(image_path):
            from ..utils.exif_utils import ExifUtils

            ExifUtils.append_recipe_metadata(image_path, recipe)
        return True
    except Exception as e:
        logger.error(f"Error persisting recipe {recipe_id}: {e}")
        return False
def _sanitize_recipe_for_storage(self, recipe: Dict[str, Any]) -> Dict[str, Any]:
"""Create a clean copy of the recipe without runtime convenience fields."""
import copy
clean = copy.deepcopy(recipe)
# 0. Clean top-level runtime fields
for key in ("file_url", "created_date_formatted", "modified_formatted"):
clean.pop(key, None)
# 1. Clean LORAs
if "loras" in clean and isinstance(clean["loras"], list):
for lora in clean["loras"]:
# Fields to remove (runtime only)
for key in ("inLibrary", "preview_url", "localPath"):
lora.pop(key, None)
# Normalize weight/strength if mapping is desired (standard in persistence_service)
if "weight" in lora and "strength" not in lora:
lora["strength"] = float(lora.pop("weight"))
# 2. Clean Checkpoint
if "checkpoint" in clean and isinstance(clean["checkpoint"], dict):
cp = clean["checkpoint"]
# Fields to remove (runtime only)
for key in (
"inLibrary",
"localPath",
"preview_url",
"thumbnailUrl",
"size",
"downloadUrl",
):
cp.pop(key, None)
return clean
async def initialize_in_background(self) -> None:
    """Initialize cache in background using thread pool.

    Waits for the LoRA scanner, publishes an empty placeholder cache so
    readers never observe ``None``, runs the heavy synchronous scan in
    the default thread pool, then schedules enrichment and FTS-index
    builds as non-blocking background tasks. All errors are logged, not
    raised.
    """
    try:
        await self._wait_for_lora_scanner()
        # Set initial empty cache to avoid None reference errors
        if self._cache is None:
            self._cache = RecipeCache(
                raw_data=[],
                sorted_by_name=[],
                sorted_by_date=[],
                folders=[],
                folder_tree={},
            )
        # Mark as initializing to prevent concurrent initializations
        self._is_initializing = True
        self._initialization_task = asyncio.current_task()
        try:
            # Start timer
            start_time = time.time()
            # Fix: use get_running_loop() — asyncio.get_event_loop() is
            # deprecated inside coroutines (Python 3.10+) and we are
            # guaranteed to be on a running loop here.
            loop = asyncio.get_running_loop()
            cache = await loop.run_in_executor(
                None,  # Use default thread pool
                self._initialize_recipe_cache_sync,  # Run synchronous version in thread
            )
            if cache is not None:
                self._cache = cache
            # Calculate elapsed time and log it
            elapsed_time = time.time() - start_time
            recipe_count = (
                len(cache.raw_data) if cache and hasattr(cache, "raw_data") else 0
            )
            logger.info(
                f"Recipe cache initialized in {elapsed_time:.2f} seconds. Found {recipe_count} recipes"
            )
            self._schedule_post_scan_enrichment()
            # Schedule FTS index build in background (non-blocking)
            self._schedule_fts_index_build()
        finally:
            # Mark initialization as complete regardless of outcome
            self._is_initializing = False
    except Exception as e:
        logger.error(f"Recipe Scanner: Error initializing cache in background: {e}")
def _initialize_recipe_cache_sync(self):
    """Synchronous version of recipe cache initialization for thread pool execution.

    Uses persistent cache for fast startup when available:
    1. Try to load from persistent SQLite cache
    2. Reconcile with filesystem (check mtime/size for changes)
    3. Fall back to full directory scan if cache miss or reconciliation fails
    4. Persist results for next startup
    """
    # Fix: pre-bind ``loop`` so the finally block cannot raise
    # UnboundLocalError (masking the original exception) when setup fails
    # before the event loop is created.
    loop = None
    try:
        # Ensure cache exists to avoid None reference errors
        if self._cache is None:
            self._cache = RecipeCache(
                raw_data=[],
                sorted_by_name=[],
                sorted_by_date=[],
                folders=[],
                folder_tree={},
            )
        # Create a new event loop for this worker thread.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Initialize persistent cache
        if self._persistent_cache is None:
            self._persistent_cache = get_persistent_recipe_cache()
        recipes_dir = self.recipes_dir
        if not recipes_dir or not os.path.exists(recipes_dir):
            logger.warning(f"Recipes directory not found: {recipes_dir}")
            return self._cache
        # Try to load from persistent cache first
        persisted = self._persistent_cache.load_cache()
        if persisted:
            recipes, changed, json_paths = self._reconcile_recipe_cache(
                persisted, recipes_dir
            )
            self._json_path_map = json_paths
            if not changed:
                # Fast path: use cached data directly
                logger.info(
                    "Recipe cache hit: loaded %d recipes from persistent cache",
                    len(recipes),
                )
                self._cache.raw_data = recipes
                self._update_folder_metadata(self._cache)
                self._sort_cache_sync()
                return self._cache
            else:
                # Partial update: some files changed
                logger.info(
                    "Recipe cache partial hit: reconciled %d recipes with filesystem",
                    len(recipes),
                )
                self._cache.raw_data = recipes
                self._update_folder_metadata(self._cache)
                self._sort_cache_sync()
                # Persist updated cache
                self._persistent_cache.save_cache(recipes, json_paths)
                return self._cache
        # Fall back to full directory scan
        logger.info("Recipe cache miss: performing full directory scan")
        recipes, json_paths = self._full_directory_scan_sync(recipes_dir)
        self._json_path_map = json_paths
        # Update cache with the collected data
        self._cache.raw_data = recipes
        self._update_folder_metadata(self._cache)
        self._sort_cache_sync()
        # Persist for next startup
        self._persistent_cache.save_cache(recipes, json_paths)
        return self._cache
    except Exception as e:
        logger.error(f"Error in thread-based recipe cache initialization: {e}")
        import traceback

        traceback.print_exc(file=sys.stderr)
        return self._cache if hasattr(self, "_cache") else None
    finally:
        # Clean up the event loop (may still be None if setup failed early)
        if loop is not None:
            loop.close()
def _reconcile_recipe_cache(
    self,
    persisted: PersistedRecipeData,
    recipes_dir: str,
) -> Tuple[List[Dict], bool, Dict[str, str]]:
    """Reconcile persisted cache with current filesystem state.

    Files whose mtime (within 1s tolerance) and size match the persisted
    stats are reused from the cache; new or changed files are re-read from
    disk; missing files mark the cache as changed.

    Args:
        persisted: The persisted recipe data from SQLite cache.
        recipes_dir: Path to the recipes directory.

    Returns:
        Tuple of (recipes list, changed flag, json_paths dict).
    """
    recipes: List[Dict] = []
    json_paths: Dict[str, str] = {}
    changed = False
    # Build set of current recipe files with their (mtime, size) stats
    current_files: Dict[str, Tuple[float, int]] = {}
    for root, _, files in os.walk(recipes_dir):
        for file in files:
            if file.lower().endswith(".recipe.json"):
                file_path = os.path.join(root, file)
                try:
                    stat = os.stat(file_path)
                    current_files[file_path] = (stat.st_mtime, stat.st_size)
                except OSError:
                    # File disappeared between walk and stat; skip it.
                    continue
    # Build recipe_id -> recipe lookup (O(n) instead of O(n²))
    recipe_by_id: Dict[str, Dict] = {
        str(r.get("id", "")): r for r in persisted.raw_data if r.get("id")
    }
    # Build json_path -> recipe lookup from file_stats (O(m)); the recipe id
    # is derived from the file name ("<id>.recipe.json").
    persisted_by_path: Dict[str, Dict] = {}
    for json_path in persisted.file_stats.keys():
        basename = os.path.basename(json_path)
        if basename.lower().endswith(".recipe.json"):
            recipe_id = basename[: -len(".recipe.json")]
            if recipe_id in recipe_by_id:
                persisted_by_path[json_path] = recipe_by_id[recipe_id]
    # Process current files
    for file_idx, (file_path, (current_mtime, current_size)) in enumerate(
        current_files.items()
    ):
        cached_stats = persisted.file_stats.get(file_path)
        # Extract recipe_id from current file for fallback lookup
        basename = os.path.basename(file_path)
        recipe_id_from_file = (
            basename[: -len(".recipe.json")]
            if basename.lower().endswith(".recipe.json")
            else None
        )
        if cached_stats:
            cached_mtime, cached_size = cached_stats
            # Check if file is unchanged (mtime within 1s, same size)
            if (
                abs(current_mtime - cached_mtime) < 1.0
                and current_size == cached_size
            ):
                # Try direct path lookup first
                cached_recipe = persisted_by_path.get(file_path)
                # Fallback to recipe_id lookup if path lookup fails
                if not cached_recipe and recipe_id_from_file:
                    cached_recipe = recipe_by_id.get(recipe_id_from_file)
                if cached_recipe:
                    recipe_id = str(cached_recipe.get("id", ""))
                    # Track folder from file path
                    cached_recipe["folder"] = cached_recipe.get(
                        "folder"
                    ) or self._calculate_folder(file_path)
                    recipes.append(cached_recipe)
                    json_paths[recipe_id] = file_path
                    continue
        # File is new or changed - need to re-read
        changed = True
        recipe_data = self._load_recipe_file_sync(file_path)
        if recipe_data:
            recipe_id = str(recipe_data.get("id", ""))
            recipes.append(recipe_data)
            json_paths[recipe_id] = file_path
        # Periodically release GIL so the event loop thread can run
        if file_idx % 100 == 0:
            time.sleep(0)
    # Check for deleted files
    for json_path in persisted.file_stats.keys():
        if json_path not in current_files:
            changed = True
            logger.debug("Recipe file deleted: %s", json_path)
    return recipes, changed, json_paths
def _full_directory_scan_sync(
self, recipes_dir: str
) -> Tuple[List[Dict], Dict[str, str]]:
"""Perform a full synchronous directory scan for recipes.
Args:
recipes_dir: Path to the recipes directory.
Returns:
Tuple of (recipes list, json_paths dict).
"""
recipes: List[Dict] = []
json_paths: Dict[str, str] = {}
# Get all recipe JSON files
recipe_files = []
for root, _, files in os.walk(recipes_dir):
for file in files:
if file.lower().endswith(".recipe.json"):
recipe_files.append(os.path.join(root, file))
# Process each recipe file
for i, recipe_path in enumerate(recipe_files):
recipe_data = self._load_recipe_file_sync(recipe_path)
if recipe_data:
recipe_id = str(recipe_data.get("id", ""))
recipes.append(recipe_data)
json_paths[recipe_id] = recipe_path
# Periodically release GIL so the event loop thread can run
if i % 100 == 0:
time.sleep(0)
return recipes, json_paths
def _load_recipe_file_sync(self, recipe_path: str) -> Optional[Dict]:
    """Load a single recipe file synchronously.

    Also performs light self-repair: if the referenced image exists as a
    sibling of the JSON file under a different stored path, the path is
    rewritten to the sibling and the JSON is persisted back to disk.

    Args:
        recipe_path: Path to the recipe JSON file.

    Returns:
        Recipe dictionary if valid, None otherwise.
    """
    try:
        with open(recipe_path, "r", encoding="utf-8") as f:
            recipe_data = json.load(f)
        # Validate recipe data
        if not recipe_data or not isinstance(recipe_data, dict):
            logger.warning(f"Invalid recipe data in {recipe_path}")
            return None
        # Ensure required fields exist
        required_fields = ["id", "file_path", "title"]
        if not all(field in recipe_data for field in required_fields):
            logger.warning(f"Missing required fields in {recipe_path}")
            return None
        # Ensure the image file exists and prioritize local siblings
        image_path = recipe_data.get("file_path")
        path_updated = False
        if image_path:
            recipe_dir = os.path.dirname(recipe_path)
            image_filename = os.path.basename(image_path)
            local_sibling_path = os.path.normpath(
                os.path.join(recipe_dir, image_filename)
            )
            # If local sibling exists and stored path is different, prefer local
            if (
                os.path.exists(local_sibling_path)
                and os.path.normpath(image_path) != local_sibling_path
            ):
                recipe_data["file_path"] = local_sibling_path
                path_updated = True
                logger.info(
                    f"Updated recipe image path to local sibling: {local_sibling_path}"
                )
            elif not os.path.exists(image_path):
                # Missing image is tolerated; the recipe is still returned.
                logger.warning(
                    f"Recipe image not found and no local sibling: {image_path}"
                )
        if path_updated:
            # Best-effort write-back of the repaired path.
            try:
                with open(recipe_path, "w", encoding="utf-8") as f:
                    json.dump(recipe_data, f, indent=4, ensure_ascii=False)
            except Exception as e:
                logger.warning(f"Failed to persist repair for {recipe_path}: {e}")
        # Track folder placement relative to recipes directory
        recipe_data["folder"] = recipe_data.get("folder") or self._calculate_folder(
            recipe_path
        )
        # Ensure loras array exists
        if "loras" not in recipe_data:
            recipe_data["loras"] = []
        # Ensure gen_params exists
        if "gen_params" not in recipe_data:
            recipe_data["gen_params"] = {}
        return recipe_data
    except Exception as e:
        logger.error(f"Error loading recipe file {recipe_path}: {e}")
        import traceback

        traceback.print_exc(file=sys.stderr)
        return None
def _sort_cache_sync(self) -> None:
    """Sort cache data synchronously (by title and by date, newest first)."""
    try:
        data = self._cache.raw_data
        # Natural sort on the lowercase title.
        self._cache.sorted_by_name = natsorted(
            data, key=lambda entry: entry.get("title", "").lower()
        )

        # Newest first: modified time (falling back to created_date),
        # tie-broken by file path for a stable order.
        def _date_key(entry):
            return (
                entry.get("modified", entry.get("created_date", 0)),
                entry.get("file_path", ""),
            )

        self._cache.sorted_by_date = sorted(data, key=_date_key, reverse=True)
    except Exception as e:
        logger.error(f"Error sorting recipe cache: {e}")
async def _wait_for_lora_scanner(self) -> None:
    """Ensure the LoRA scanner has initialized before recipe enrichment.

    Recipes reference LoRA metadata from the scanner's cache, so we wait
    for (or trigger) its initialization first. All failures are tolerated:
    recipes can still be scanned without LoRA data.
    """
    if not getattr(self, "_lora_scanner", None):
        return
    lora_scanner = self._lora_scanner
    cache_ready = getattr(lora_scanner, "_cache", None) is not None
    # If cache is already available, we can proceed
    if cache_ready:
        return
    # Await an existing initialization task if present
    task = getattr(lora_scanner, "_initialization_task", None)
    if task and hasattr(task, "done") and not task.done():
        try:
            await task
        except Exception:  # pragma: no cover - defensive guard
            pass
        # Re-check: the awaited task may have populated the cache.
        if getattr(lora_scanner, "_cache", None) is not None:
            return
    # Otherwise, request initialization and proceed once it completes
    try:
        await lora_scanner.initialize_in_background()
    except Exception as exc:  # pragma: no cover - defensive guard
        logger.debug("Recipe Scanner: LoRA init request failed: %s", exc)
def _schedule_post_scan_enrichment(self) -> None:
    """Kick off a non-blocking enrichment pass to fill remote metadata.

    No-ops when called outside a running event loop or when an enrichment
    task is already in flight.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # Not on an event loop; enrichment will be scheduled later.
        return
    if self._post_scan_task and not self._post_scan_task.done():
        return

    async def _run_enrichment():
        try:
            await self._enrich_cache_metadata()
        except asyncio.CancelledError:
            raise
        except Exception as exc:  # pragma: no cover - defensive guard
            logger.error(
                "Recipe Scanner: error during post-scan enrichment: %s",
                exc,
                exc_info=True,
            )

    self._post_scan_task = loop.create_task(
        _run_enrichment(), name="recipe_cache_enrichment"
    )
def _schedule_fts_index_build(self) -> None:
    """Build FTS index in background without blocking.

    Validates existing index first and reuses it if valid; only rebuilds
    on validation failure. Heavy work runs in the default thread pool.
    """
    if self._fts_index_task and not self._fts_index_task.done():
        return  # Already running
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # Not on an event loop; nothing to schedule.
        return

    async def _build_fts():
        if self._cache is None:
            return
        try:
            self._fts_index = RecipeFTSIndex()
            # Check if existing index is valid against current recipe ids
            recipe_ids = {
                str(r.get("id", "")) for r in self._cache.raw_data if r.get("id")
            }
            recipe_count = len(self._cache.raw_data)
            # Run validation in thread pool
            is_valid = await loop.run_in_executor(
                None, self._fts_index.validate_index, recipe_count, recipe_ids
            )
            if is_valid:
                logger.info(
                    "FTS index validated, reusing existing index with %d recipes",
                    recipe_count,
                )
                self._fts_index._ready.set()
                return
            # Only rebuild if validation fails
            logger.info("FTS index invalid or outdated, rebuilding...")
            await loop.run_in_executor(
                None, self._fts_index.build_index, self._cache.raw_data
            )
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            logger.error(
                "Recipe Scanner: error building FTS index: %s", exc, exc_info=True
            )

    self._fts_index_task = loop.create_task(
        _build_fts(), name="recipe_fts_index_build"
    )
def _search_with_fts(self, search: str, search_options: Dict) -> Optional[Set[str]]:
"""Search recipes using FTS index if available.
Args:
search: The search query string.
search_options: Dictionary of search options (title, tags, lora_name, lora_model, prompt).
Returns:
Set of matching recipe IDs if FTS is available and search succeeded,
None if FTS is not ready (caller should fall back to fuzzy search).
"""
if not self._fts_index or not self._fts_index.is_ready():
return None
# Build the set of fields to search based on search_options
fields: Set[str] = set()
if search_options.get("title", True):
fields.add("title")
if search_options.get("tags", True):
fields.add("tags")
if search_options.get("lora_name", True):
fields.add("lora_name")
if search_options.get("lora_model", True):
fields.add("lora_model")
if search_options.get("prompt", False): # prompt search is opt-in by default
fields.add("prompt")
# If no fields enabled, search all fields
if not fields:
fields = None
try:
result = self._fts_index.search(search, fields)
# Return None if empty to trigger fuzzy fallback
# Empty FTS results may indicate query syntax issues or need for fuzzy matching
if not result:
return None
return result
except Exception as exc:
logger.debug("FTS search failed, falling back to fuzzy search: %s", exc)
return None
def _update_fts_index_for_recipe(
self, recipe: Dict[str, Any], operation: str = "add"
) -> None:
"""Update FTS index for a single recipe (add, update, or remove).
Args:
recipe: The recipe dictionary.
operation: One of 'add', 'update', or 'remove'.
"""
if not self._fts_index or not self._fts_index.is_ready():
return
try:
if operation == "remove":
recipe_id = (
str(recipe.get("id", ""))
if isinstance(recipe, dict)
else str(recipe)
)
self._fts_index.remove_recipe(recipe_id)
elif operation in ("add", "update"):
self._fts_index.update_recipe(recipe)
except Exception as exc:
logger.debug("Failed to update FTS index for recipe: %s", exc)
async def _enrich_cache_metadata(self) -> None:
    """Perform remote metadata enrichment after the initial scan.

    Iterates every cached recipe, refreshing its LoRA information and
    persisting updates back to the recipe JSON file. Yields to the event
    loop after every recipe so the shared ComfyUI loop stays responsive,
    then resorts the cache once at the end.
    """
    cache = self._cache
    if cache is None or not getattr(cache, "raw_data", None):
        return
    # Iterate over a snapshot so concurrent cache mutations are safe.
    for index, recipe in enumerate(list(cache.raw_data)):
        try:
            metadata_updated = await self._update_lora_information(recipe)
            if metadata_updated:
                recipe_id = recipe.get("id")
                if recipe_id:
                    recipe_path = os.path.join(
                        self.recipes_dir, f"{recipe_id}.recipe.json"
                    )
                    if os.path.exists(recipe_path):
                        try:
                            self._write_recipe_file(recipe_path, recipe)
                        except (
                            Exception
                        ) as exc:  # pragma: no cover - best-effort persistence
                            logger.debug(
                                "Recipe Scanner: could not persist recipe %s: %s",
                                recipe_id,
                                exc,
                            )
        except asyncio.CancelledError:
            raise
        except Exception as exc:  # pragma: no cover - defensive logging
            logger.error(
                "Recipe Scanner: error enriching recipe %s: %s",
                recipe.get("id"),
                exc,
                exc_info=True,
            )
        # Yield after every recipe to keep the event loop responsive.
        await asyncio.sleep(0)
    try:
        await cache.resort()
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.debug(
            "Recipe Scanner: error resorting cache after enrichment: %s", exc
        )
def _schedule_resort(self, *, name_only: bool = False) -> None:
"""Schedule a background resort of the recipe cache."""
if not self._cache:
return
# Keep folder metadata up to date alongside sort order
self._update_folder_metadata()
async def _resort_wrapper() -> None:
try:
await self._cache.resort(name_only=name_only)
except Exception as exc: # pragma: no cover - defensive logging
logger.error(
"Recipe Scanner: error resorting cache: %s", exc, exc_info=True
)
task = asyncio.create_task(_resort_wrapper())
self._resort_tasks.add(task)
task.add_done_callback(lambda finished: self._resort_tasks.discard(finished))
def _calculate_folder(self, recipe_path: str) -> str:
    """Calculate a normalized folder path relative to ``recipes_dir``.

    Returns "" for recipes at the root, on any path error, or when no
    recipes directory is configured. Separators are normalized to "/".
    """
    base_dir = self.recipes_dir
    if not base_dir:
        return ""
    try:
        parent = os.path.dirname(os.path.normpath(recipe_path))
        relative = os.path.relpath(parent, base_dir)
        if relative in (".", ""):
            return ""
        return relative.replace(os.path.sep, "/")
    except Exception:
        # e.g. relpath across drives on Windows — treat as root.
        return ""
def _build_folder_tree(self, folders: list[str]) -> dict:
"""Build a nested folder tree structure from relative folder paths."""
tree: dict[str, dict] = {}
for folder in folders:
if not folder:
continue
parts = folder.split("/")
current_level = tree
for part in parts:
if part not in current_level:
current_level[part] = {}
current_level = current_level[part]
return tree
def _update_folder_metadata(self, cache: RecipeCache | None = None) -> None:
"""Ensure folder lists and tree metadata are synchronized with cache contents."""
cache = cache or self._cache
if cache is None:
return
folders: set[str] = set()
for item in cache.raw_data:
folder_value = item.get("folder", "")
if folder_value is None:
folder_value = ""
if folder_value == ".":
folder_value = ""
normalized = str(folder_value).replace("\\", "/")
item["folder"] = normalized
folders.add(normalized)
cache.folders = sorted(folders, key=lambda entry: entry.lower())
cache.folder_tree = self._build_folder_tree(cache.folders)
async def get_folders(self) -> list[str]:
    """Return a sorted list of recipe folders relative to the recipes root.

    Folder metadata is refreshed from the cache contents before returning.
    """
    cache = await self.get_cached_data()
    self._update_folder_metadata(cache)
    return cache.folders
async def get_folder_tree(self) -> dict:
    """Return a hierarchical tree of recipe folders for sidebar navigation.

    Folder metadata is refreshed from the cache contents before returning.
    """
    cache = await self.get_cached_data()
    self._update_folder_metadata(cache)
    return cache.folder_tree
@property
def recipes_dir(self) -> str:
    """Get path to recipes directory (created on demand); "" when unconfigured."""
    roots = config.loras_roots
    if not roots:
        return ""
    # config.loras_roots already sorted case-insensitively, use the first one
    directory = os.path.join(roots[0], "recipes")
    os.makedirs(directory, exist_ok=True)
    return directory
async def get_cached_data(self, force_refresh: bool = False) -> RecipeCache:
    """Get cached recipe data, refresh if needed.

    Fast paths return the existing (possibly still-initializing) cache;
    ``force_refresh`` invalidates the persistent cache and rebuilds from
    disk inside the default thread pool so the event loop shared with
    ComfyUI is never blocked.
    """
    # If cache is already initialized and no refresh is needed, return it immediately
    if self._cache is not None and not force_refresh:
        self._update_folder_metadata()
        return self._cache
    # If another initialization is already in progress, wait for it to complete
    if self._is_initializing and not force_refresh:
        return self._cache or RecipeCache(
            raw_data=[],
            sorted_by_name=[],
            sorted_by_date=[],
            folders=[],
            folder_tree={},
        )
    # If force refresh is requested, re-scan in a thread pool to avoid
    # blocking the event loop (which is shared with ComfyUI).
    if force_refresh:
        try:
            async with self._initialization_lock:
                self._is_initializing = True
                try:
                    # Invalidate persistent cache so the sync path does a
                    # full directory scan instead of reconciling stale data.
                    if self._persistent_cache:
                        self._persistent_cache.save_cache([], {})
                    self._json_path_map = {}
                    start_time = time.time()
                    # Run the heavy lifting in a thread pool same path
                    # used by initialize_in_background().
                    loop = asyncio.get_event_loop()
                    cache = await loop.run_in_executor(
                        None,
                        self._initialize_recipe_cache_sync,
                    )
                    if cache is not None:
                        self._cache = cache
                    elapsed = time.time() - start_time
                    count = len(self._cache.raw_data) if self._cache else 0
                    logger.info(
                        "Recipe cache force-refreshed in %.2f seconds. "
                        "Found %d recipes",
                        elapsed,
                        count,
                    )
                    # Schedule non-blocking background work
                    self._schedule_post_scan_enrichment()
                    self._schedule_fts_index_build()
                    return self._cache
                except Exception as e:
                    # Rebuild failed: publish an empty cache so callers
                    # never see None.
                    logger.error(
                        f"Recipe Manager: Error initializing cache: {e}",
                        exc_info=True,
                    )
                    self._cache = RecipeCache(
                        raw_data=[],
                        sorted_by_name=[],
                        sorted_by_date=[],
                        folders=[],
                        folder_tree={},
                    )
                    return self._cache
                finally:
                    self._is_initializing = False
        except Exception as e:
            logger.error(f"Unexpected error in get_cached_data: {e}")
    # Return the cache (may be empty or partially initialized)
    return self._cache or RecipeCache(
        raw_data=[],
        sorted_by_name=[],
        sorted_by_date=[],
        folders=[],
        folder_tree={},
    )
async def refresh_cache(self, force: bool = False) -> RecipeCache:
    """Public helper to refresh or return the recipe cache.

    Args:
        force: When True, rebuild the cache from disk; otherwise reuse it.
    """
    refreshed = await self.get_cached_data(force_refresh=force)
    return refreshed
async def add_recipe(self, recipe_data: Dict[str, Any]) -> None:
    """Add a recipe to the in-memory cache (and mirrors: FTS + SQLite)."""
    if not recipe_data:
        return
    cache = await self.get_cached_data()
    await cache.add_recipe(recipe_data, resort=False)
    self._update_folder_metadata(cache)
    self._schedule_resort()
    # Keep the full-text-search index in sync with the new entry.
    self._update_fts_index_for_recipe(recipe_data, "add")
    # Mirror the addition into the persistent SQLite cache.
    if self._persistent_cache:
        rid = str(recipe_data.get("id", ""))
        self._persistent_cache.update_recipe(
            recipe_data, self._json_path_map.get(rid, "")
        )
async def remove_recipe(self, recipe_id: str) -> bool:
    """Remove a recipe from the cache by ID.

    Returns:
        True when a matching recipe was removed, False otherwise.
    """
    if not recipe_id:
        return False
    cache = await self.get_cached_data()
    dropped = await cache.remove_recipe(recipe_id, resort=False)
    if dropped is None:
        return False
    self._update_folder_metadata(cache)
    self._schedule_resort()
    # Drop the entry from the FTS index as well.
    self._update_fts_index_for_recipe(recipe_id, "remove")
    # And from the persistent SQLite cache, if one is attached.
    if self._persistent_cache:
        self._persistent_cache.remove_recipe(recipe_id)
        self._json_path_map.pop(recipe_id, None)
    return True
async def bulk_remove(self, recipe_ids: Iterable[str]) -> int:
    """Remove multiple recipes from the cache.

    Returns:
        The number of recipes actually removed.
    """
    cache = await self.get_cached_data()
    removed_entries = await cache.bulk_remove(recipe_ids, resort=False)
    if removed_entries:
        self._schedule_resort()
        # Purge every removed recipe from the FTS index and SQLite mirror.
        for entry in removed_entries:
            rid = str(entry.get("id", ""))
            self._update_fts_index_for_recipe(rid, "remove")
            if self._persistent_cache:
                self._persistent_cache.remove_recipe(rid)
                self._json_path_map.pop(rid, None)
    return len(removed_entries)
async def scan_all_recipes(self) -> List[Dict]:
    """Scan all recipe JSON files and return their metadata.

    Walks the recipes directory recursively, loads every ``*.recipe.json``
    file via ``_load_recipe_file``, and returns the successfully parsed
    entries (failed loads return ``None`` and are skipped).

    Returns:
        List of recipe dicts; empty when the recipes directory is missing.
    """
    recipes: List[Dict] = []
    recipes_dir = self.recipes_dir
    if not recipes_dir or not os.path.exists(recipes_dir):
        logger.warning(f"Recipes directory not found: {recipes_dir}")
        return recipes
    # Collect all *.recipe.json files in a single pass. The previous
    # implementation first counted matches per directory and then re-scanned
    # the same file list, performing the suffix test twice for every file.
    recipe_files = [
        os.path.join(root, file)
        for root, _, files in os.walk(recipes_dir)
        for file in files
        if file.lower().endswith(".recipe.json")
    ]
    # Process each recipe file
    for recipe_path in recipe_files:
        recipe_data = await self._load_recipe_file(recipe_path)
        if recipe_data:
            recipes.append(recipe_data)
    return recipes
async def _load_recipe_file(self, recipe_path: str) -> Optional[Dict]:
    """Load recipe data from a JSON file.

    Validates required fields, repairs the preview-image path when a local
    sibling image exists, normalizes LoRA/checkpoint metadata, and computes
    a missing fingerprint. NOTE: this method may rewrite the recipe file on
    disk when it changes the stored data.

    Returns:
        The (possibly updated) recipe dict, or ``None`` when the file is
        invalid or unreadable.
    """
    try:
        with open(recipe_path, "r", encoding="utf-8") as f:
            recipe_data = json.load(f)
        # Validate recipe data
        if not recipe_data or not isinstance(recipe_data, dict):
            logger.warning(f"Invalid recipe data in {recipe_path}")
            return None
        # Ensure required fields exist
        required_fields = ["id", "file_path", "title"]
        for field in required_fields:
            if field not in recipe_data:
                logger.warning(f"Missing required field '{field}' in {recipe_path}")
                return None
        # Ensure the image file exists and prioritize local siblings
        image_path = recipe_data.get("file_path")
        path_updated = False
        if image_path:
            recipe_dir = os.path.dirname(recipe_path)
            image_filename = os.path.basename(image_path)
            local_sibling_path = os.path.normpath(
                os.path.join(recipe_dir, image_filename)
            )
            # If local sibling exists and stored path is different, prefer local
            if (
                os.path.exists(local_sibling_path)
                and os.path.normpath(image_path) != local_sibling_path
            ):
                recipe_data["file_path"] = local_sibling_path
                image_path = local_sibling_path
                path_updated = True
                logger.info(
                    "Updated recipe image path to local sibling: %s",
                    local_sibling_path,
                )
            elif not os.path.exists(image_path):
                logger.warning(
                    f"Recipe image not found and no local sibling: {image_path}"
                )
        if path_updated:
            # Persist the corrected image path immediately.
            self._write_recipe_file(recipe_path, recipe_data)
        # Track folder placement relative to recipes directory
        recipe_data["folder"] = recipe_data.get("folder") or self._calculate_folder(
            recipe_path
        )
        # Ensure loras array exists
        if "loras" not in recipe_data:
            recipe_data["loras"] = []
        # Ensure gen_params exists
        if "gen_params" not in recipe_data:
            recipe_data["gen_params"] = {}
        # Update lora information with local paths and availability
        lora_metadata_updated = await self._update_lora_information(recipe_data)
        if recipe_data.get("checkpoint"):
            checkpoint_entry = self._normalize_checkpoint_entry(
                recipe_data["checkpoint"]
            )
            if checkpoint_entry:
                recipe_data["checkpoint"] = self._enrich_checkpoint_entry(
                    checkpoint_entry
                )
            else:
                logger.warning(
                    "Dropping invalid checkpoint entry in %s", recipe_path
                )
                recipe_data.pop("checkpoint", None)
        # Calculate and update fingerprint if missing
        if "loras" in recipe_data and "fingerprint" not in recipe_data:
            fingerprint = calculate_recipe_fingerprint(recipe_data["loras"])
            recipe_data["fingerprint"] = fingerprint
            # Write updated recipe data back to file
            try:
                self._write_recipe_file(recipe_path, recipe_data)
                logger.info(f"Added fingerprint to recipe: {recipe_path}")
            except Exception as e:
                logger.error(f"Error writing updated recipe with fingerprint: {e}")
        elif lora_metadata_updated:
            # Persist updates such as marking invalid entries as deleted
            try:
                self._write_recipe_file(recipe_path, recipe_data)
            except Exception as e:
                logger.error(f"Error writing updated recipe metadata: {e}")
        return recipe_data
    except Exception as e:
        logger.error(f"Error loading recipe file {recipe_path}: {e}")
        import traceback
        traceback.print_exc(file=sys.stderr)
        return None
@staticmethod
def _write_recipe_file(recipe_path: str, recipe_data: Dict[str, Any]) -> None:
"""Persist ``recipe_data`` back to ``recipe_path`` with standard formatting."""
with open(recipe_path, "w", encoding="utf-8") as file_obj:
json.dump(recipe_data, file_obj, indent=4, ensure_ascii=False)
async def _update_lora_information(self, recipe_data: Dict) -> bool:
    """Update LoRA information with hash and file_name.

    For each LoRA entry: resolves a missing hash from the local LoRA cache
    or (failing that) Civitai, and resolves a missing ``file_name`` from the
    local library. Entries that cannot be resolved on Civitai are flagged
    ``isDeleted`` so they are not re-queried on every scan.

    Returns:
        bool: True if metadata was updated
    """
    if not recipe_data.get("loras"):
        return False
    metadata_updated = False
    for lora in recipe_data["loras"]:
        # Skip deleted loras that were already marked
        if lora.get("isDeleted", False):
            continue
        # Skip if already has complete information
        if "hash" in lora and "file_name" in lora and lora["file_name"]:
            continue
        # If has modelVersionId but no hash, look in lora cache first, then fetch from Civitai
        if "modelVersionId" in lora and not lora.get("hash"):
            model_version_id = lora["modelVersionId"]
            # Check if model_version_id is an integer and > 0
            if isinstance(model_version_id, int) and model_version_id > 0:
                # Try to find in lora cache first
                hash_from_cache = await self._find_hash_in_lora_cache(
                    model_version_id
                )
                if hash_from_cache:
                    lora["hash"] = hash_from_cache
                    metadata_updated = True
                else:
                    # If not in cache, fetch from Civitai
                    # (a non-tuple result means no metadata provider was
                    # available — skip without marking anything)
                    result = await self._get_hash_from_civitai(model_version_id)
                    if isinstance(result, tuple):
                        hash_from_civitai, is_deleted = result
                        if hash_from_civitai:
                            lora["hash"] = hash_from_civitai
                            metadata_updated = True
                        elif is_deleted:
                            # Mark the lora as deleted if it was not found on Civitai
                            lora["isDeleted"] = True
                            logger.warning(
                                f"Marked lora with modelVersionId {model_version_id} as deleted"
                            )
                            metadata_updated = True
                        else:
                            # No hash returned; mark as deleted to avoid repeated lookups
                            lora["isDeleted"] = True
                            metadata_updated = True
                            logger.warning(
                                "Marked lora with modelVersionId %s as deleted after failed hash lookup",
                                model_version_id,
                            )
        # If has hash but no file_name, look up in lora library
        if "hash" in lora and (not lora.get("file_name") or not lora["file_name"]):
            hash_value = lora["hash"]
            if self._lora_scanner.has_hash(hash_value):
                lora_path = self._lora_scanner.get_path_by_hash(hash_value)
                if lora_path:
                    file_name = os.path.splitext(os.path.basename(lora_path))[0]
                    lora["file_name"] = file_name
                    metadata_updated = True
            else:
                # Lora not in library — record an empty file_name so the
                # entry is treated as complete on subsequent scans.
                lora["file_name"] = ""
                metadata_updated = True
    return metadata_updated
async def _find_hash_in_lora_cache(self, model_version_id: str) -> Optional[str]:
"""Find hash in lora cache based on modelVersionId"""
try:
# Get all loras from cache
if not self._lora_scanner:
return None
cache = await self._lora_scanner.get_cached_data()
if not cache or not cache.raw_data:
return None
# Find lora with matching civitai.id
for lora in cache.raw_data:
civitai_data = lora.get("civitai", {})
if civitai_data and str(civitai_data.get("id", "")) == str(
model_version_id
):
return lora.get("sha256")
return None
except Exception as e:
logger.error(f"Error finding hash in lora cache: {e}")
return None
async def _get_hash_from_civitai(self, model_version_id: str) -> Optional[Tuple[Optional[str], bool]]:
    """Resolve a model version's SHA256 hash via the metadata provider.

    Returns:
        A ``(hash, is_deleted)`` tuple: ``hash`` is the SHA256 string or
        ``None``; ``is_deleted`` is True when the model no longer exists on
        Civitai. Returns bare ``None`` (not a tuple) only when no metadata
        provider is available — callers treat a non-tuple result as a
        transient failure and skip the entry without marking it deleted.
    """
    try:
        # Get metadata provider instead of civitai client directly
        metadata_provider = await get_default_metadata_provider()
        if not metadata_provider:
            logger.error("Failed to get metadata provider")
            # Transient failure: no provider configured right now.
            return None
        version_info, error_msg = await metadata_provider.get_model_version_info(
            model_version_id
        )
        if not version_info:
            if error_msg and "model not found" in error_msg.lower():
                logger.warning(
                    f"Model with version ID {model_version_id} was not found on Civitai - marking as deleted"
                )
                return None, True  # Return None hash and True for isDeleted flag
            else:
                logger.debug(
                    f"Could not get hash for modelVersionId {model_version_id}: {error_msg}"
                )
                return None, False  # Return None hash but not marked as deleted
        # Get hash from the first file
        for file_info in version_info.get("files", []):
            sha256_hash = (file_info.get("hashes") or {}).get("SHA256")
            if sha256_hash:
                return (
                    sha256_hash,
                    False,
                )  # Return hash with False for isDeleted flag
        logger.debug(
            f"No SHA256 hash found in version info for ID: {model_version_id}"
        )
        return None, False
    except Exception as e:
        logger.error(f"Error getting hash from Civitai: {e}")
        return None, False
def _get_lora_from_version_index(
self, model_version_id: Any
) -> Optional[Dict[str, Any]]:
"""Quickly fetch a cached LoRA entry by modelVersionId using the version index."""
if not self._lora_scanner:
return None
cache = getattr(self._lora_scanner, "_cache", None)
if cache is None:
return None
version_index = getattr(cache, "version_index", None)
if not version_index:
return None
try:
normalized_id = int(model_version_id)
except (TypeError, ValueError):
return None
return version_index.get(normalized_id)
def _get_checkpoint_from_version_index(
self, model_version_id: Any
) -> Optional[Dict[str, Any]]:
"""Fetch a cached checkpoint entry by version id."""
if not self._checkpoint_scanner:
return None
cache = getattr(self._checkpoint_scanner, "_cache", None)
if cache is None:
return None
version_index = getattr(cache, "version_index", None)
if not version_index:
return None
try:
normalized_id = int(model_version_id)
except (TypeError, ValueError):
return None
return version_index.get(normalized_id)
async def _determine_base_model(self, loras: List[Dict]) -> Optional[str]:
"""Determine the most common base model among LoRAs"""
base_models = {}
# Count occurrences of each base model
for lora in loras:
if "hash" in lora:
lora_path = self._lora_scanner.get_path_by_hash(lora["hash"])
if lora_path:
base_model = await self._get_base_model_for_lora(lora_path)
if base_model:
base_models[base_model] = base_models.get(base_model, 0) + 1
# Return the most common base model
if base_models:
return max(base_models.items(), key=lambda x: x[1])[0]
return None
async def _get_base_model_for_lora(self, lora_path: str) -> Optional[str]:
"""Get base model for a LoRA from cache"""
try:
if not self._lora_scanner:
return None
cache = await self._lora_scanner.get_cached_data()
if not cache or not cache.raw_data:
return None
# Find matching lora in cache
for lora in cache.raw_data:
if lora.get("file_path") == lora_path:
return lora.get("base_model")
return None
except Exception as e:
logger.error(f"Error getting base model for lora: {e}")
return None
def _normalize_checkpoint_entry(
self, checkpoint_raw: Any
) -> Optional[Dict[str, Any]]:
"""Coerce legacy or malformed checkpoint entries into a dict."""
if isinstance(checkpoint_raw, dict):
return dict(checkpoint_raw)
if isinstance(checkpoint_raw, (list, tuple)) and len(checkpoint_raw) == 1:
return self._normalize_checkpoint_entry(checkpoint_raw[0])
if isinstance(checkpoint_raw, str):
name = checkpoint_raw.strip()
if not name:
return None
file_name = os.path.splitext(os.path.basename(name))[0]
return {
"name": name,
"file_name": file_name,
}
logger.warning(
"Unexpected checkpoint payload type %s", type(checkpoint_raw).__name__
)
return None
def _enrich_checkpoint_entry(self, checkpoint: Dict[str, Any]) -> Dict[str, Any]:
    """Populate convenience fields for a checkpoint entry.

    Mutates ``checkpoint`` in place, adding ``inLibrary``, ``localPath`` and a
    browser-reachable ``preview_url`` where the local checkpoint library can
    resolve them (by hash first, then by modelVersionId via the version
    index). Returns the same dict for chaining.
    """
    if (
        not checkpoint
        or not isinstance(checkpoint, dict)
        or not self._checkpoint_scanner
    ):
        return checkpoint
    hash_value = (checkpoint.get("hash") or "").lower()
    version_entry = None
    model_version_id = checkpoint.get("id") or checkpoint.get("modelVersionId")
    # Only fall back to the version index when no hash is available.
    if not hash_value and model_version_id is not None:
        version_entry = self._get_checkpoint_from_version_index(model_version_id)
    try:
        preview_url = checkpoint.get("preview_url") or checkpoint.get(
            "thumbnailUrl"
        )
        if preview_url:
            checkpoint["preview_url"] = self._normalize_preview_url(preview_url)
        if hash_value:
            # Hash lookup: authoritative for library membership and paths.
            checkpoint["inLibrary"] = self._checkpoint_scanner.has_hash(hash_value)
            checkpoint["preview_url"] = self._normalize_preview_url(
                checkpoint.get("preview_url")
                or self._checkpoint_scanner.get_preview_url_by_hash(hash_value)
            )
            checkpoint["localPath"] = self._checkpoint_scanner.get_path_by_hash(
                hash_value
            )
        elif version_entry:
            # Version-index hit implies the checkpoint is in the library.
            checkpoint["inLibrary"] = True
            cached_path = version_entry.get("file_path") or version_entry.get(
                "path"
            )
            if cached_path:
                checkpoint.setdefault("localPath", cached_path)
                if not checkpoint.get("file_name"):
                    checkpoint["file_name"] = os.path.splitext(
                        os.path.basename(cached_path)
                    )[0]
            if version_entry.get("sha256") and not checkpoint.get("hash"):
                checkpoint["hash"] = version_entry.get("sha256")
            preview_url = self._normalize_preview_url(
                version_entry.get("preview_url")
            )
            if preview_url:
                checkpoint.setdefault("preview_url", preview_url)
            if version_entry.get("model_type"):
                checkpoint.setdefault("model_type", version_entry.get("model_type"))
        else:
            # Unknown checkpoint: mark as not in library, keep existing fields.
            checkpoint.setdefault("inLibrary", False)
        if checkpoint.get("preview_url"):
            # Normalization is idempotent, so re-normalizing is safe here.
            checkpoint["preview_url"] = self._normalize_preview_url(
                checkpoint["preview_url"]
            )
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.debug(
            "Error enriching checkpoint entry %s: %s",
            hash_value or model_version_id,
            exc,
        )
    return checkpoint
def _enrich_lora_entry(self, lora: Dict[str, Any]) -> Dict[str, Any]:
    """Populate convenience fields for a LoRA entry.

    Mutates ``lora`` in place, filling ``inLibrary``, ``localPath`` and a
    browser-reachable ``preview_url`` from the local LoRA library (hash
    lookup first, version index as fallback). Returns the same dict.
    """
    if not lora or not self._lora_scanner:
        return lora
    hash_value = (lora.get("hash") or "").lower()
    version_entry = None
    # Only consult the version index when no hash is recorded.
    if not hash_value and lora.get("modelVersionId") is not None:
        version_entry = self._get_lora_from_version_index(
            lora.get("modelVersionId")
        )
    try:
        if hash_value:
            # Hash lookup is authoritative for library membership and paths.
            lora["inLibrary"] = self._lora_scanner.has_hash(hash_value)
            lora["preview_url"] = self._normalize_preview_url(
                self._lora_scanner.get_preview_url_by_hash(hash_value)
            )
            lora["localPath"] = self._lora_scanner.get_path_by_hash(hash_value)
        elif version_entry:
            # Version-index hit implies the LoRA is in the library.
            lora["inLibrary"] = True
            cached_path = version_entry.get("file_path") or version_entry.get(
                "path"
            )
            if cached_path:
                lora.setdefault("localPath", cached_path)
                if not lora.get("file_name"):
                    lora["file_name"] = os.path.splitext(
                        os.path.basename(cached_path)
                    )[0]
            if version_entry.get("sha256") and not lora.get("hash"):
                lora["hash"] = version_entry.get("sha256")
            preview_url = self._normalize_preview_url(
                version_entry.get("preview_url")
            )
            if preview_url:
                lora.setdefault("preview_url", preview_url)
        else:
            # Unknown LoRA: mark as not in library, keep existing fields.
            lora.setdefault("inLibrary", False)
        if lora.get("preview_url"):
            # Normalization is idempotent, so re-normalizing is safe here.
            lora["preview_url"] = self._normalize_preview_url(lora["preview_url"])
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.debug("Error enriching lora entry %s: %s", hash_value, exc)
    return lora
def _normalize_preview_url(self, preview_url: Optional[str]) -> Optional[str]:
"""Return a preview URL that is reachable from the browser."""
if not preview_url or not isinstance(preview_url, str):
return preview_url
normalized = preview_url.strip()
if normalized.startswith("/api/lm/previews?path="):
return normalized
if os.path.isabs(normalized):
return config.get_preview_static_url(normalized)
return normalized
async def get_local_lora(self, name: str) -> Optional[Dict[str, Any]]:
    """Lookup a local LoRA model by name."""
    scanner = self._lora_scanner
    # Nothing to look up without a scanner or a non-empty name.
    if not scanner or not name:
        return None
    return await scanner.get_model_info_by_name(name)
async def get_paginated_data(
    self,
    page: int,
    page_size: int,
    sort_by: str = "date",
    search: str = None,
    filters: dict = None,
    search_options: dict = None,
    lora_hash: str = None,
    bypass_filters: bool = True,
    folder: str | None = None,
    recursive: bool = True,
):
    """Get paginated and filtered recipe data.

    Args:
        page: Current page number (1-based)
        page_size: Number of items per page
        sort_by: Sort method ('name' or 'date'); may carry an explicit
            direction as 'field:asc' / 'field:desc', or 'loras_count'
        search: Search term
        filters: Dictionary of filters to apply
        search_options: Dictionary of search options to apply
        lora_hash: Optional SHA256 hash of a LoRA to filter recipes by
        bypass_filters: If True, ignore other filters when a lora_hash is provided
        folder: Optional folder filter relative to recipes directory
        recursive: Whether to include recipes in subfolders of the selected folder

    Returns:
        Dict with 'items', 'total', 'page', 'page_size' and 'total_pages'.
    """
    cache = await self.get_cached_data()
    # Get base dataset — start from the pre-sorted list matching the sort
    # field so the common case needs no re-sorting.
    sort_field = sort_by.split(":")[0] if ":" in sort_by else sort_by
    if sort_field == "date":
        filtered_data = list(cache.sorted_by_date)
    elif sort_field == "name":
        filtered_data = list(cache.sorted_by_name)
    else:
        filtered_data = list(cache.raw_data)
    # Apply SFW filtering if enabled
    from .settings_manager import get_settings_manager
    settings = get_settings_manager()
    if settings.get("show_only_sfw", False):
        from ..utils.constants import NSFW_LEVELS
        threshold = NSFW_LEVELS.get("R", 4)  # Default to R level (4) if not found
        filtered_data = [
            item
            for item in filtered_data
            if not item.get("preview_nsfw_level")
            or item.get("preview_nsfw_level") < threshold
        ]
    # Special case: Filter by LoRA hash (takes precedence if bypass_filters is True)
    if lora_hash:
        # Filter recipes that contain this LoRA hash
        filtered_data = [
            item
            for item in filtered_data
            if "loras" in item
            and any(
                lora.get("hash", "").lower() == lora_hash.lower()
                for lora in item["loras"]
            )
        ]
        if bypass_filters:
            # Skip other filters if bypass_filters is True
            pass
        # Otherwise continue with normal filtering after applying LoRA hash filter
    # Skip further filtering if we're only filtering by LoRA hash with bypass enabled
    if not (lora_hash and bypass_filters):
        # Apply folder filter before other criteria
        if folder is not None:
            normalized_folder = folder.strip("/")
            def matches_folder(item_folder: str) -> bool:
                # Exact match, or prefix match when recursion is requested.
                item_path = (item_folder or "").strip("/")
                if recursive:
                    if not normalized_folder:
                        return True
                    return item_path == normalized_folder or item_path.startswith(
                        f"{normalized_folder}/"
                    )
                return item_path == normalized_folder
            filtered_data = [
                item
                for item in filtered_data
                if matches_folder(item.get("folder", ""))
            ]
        # Apply search filter
        if search:
            # Default search options if none provided
            if not search_options:
                search_options = {
                    "title": True,
                    "tags": True,
                    "lora_name": True,
                    "lora_model": True,
                }
            # Try FTS search first if available (much faster)
            fts_matching_ids = self._search_with_fts(search, search_options)
            if fts_matching_ids is not None:
                # FTS search succeeded, filter by matching IDs
                filtered_data = [
                    item
                    for item in filtered_data
                    if str(item.get("id", "")) in fts_matching_ids
                ]
            else:
                # Fallback to fuzzy_match (slower but always available)
                # Build the search predicate based on search options
                def matches_search(item):
                    # Search in title if enabled
                    if search_options.get("title", True):
                        if fuzzy_match(str(item.get("title", "")), search):
                            return True
                    # Search in tags if enabled
                    if search_options.get("tags", True) and "tags" in item:
                        for tag in item["tags"]:
                            if fuzzy_match(tag, search):
                                return True
                    # Search in lora file names if enabled
                    if search_options.get("lora_name", True) and "loras" in item:
                        for lora in item["loras"]:
                            if fuzzy_match(str(lora.get("file_name", "")), search):
                                return True
                    # Search in lora model names if enabled
                    if search_options.get("lora_model", True) and "loras" in item:
                        for lora in item["loras"]:
                            if fuzzy_match(str(lora.get("modelName", "")), search):
                                return True
                    # Search in prompt and negative_prompt if enabled
                    if search_options.get("prompt", True) and "gen_params" in item:
                        gen_params = item["gen_params"]
                        if fuzzy_match(str(gen_params.get("prompt", "")), search):
                            return True
                        if fuzzy_match(
                            str(gen_params.get("negative_prompt", "")), search
                        ):
                            return True
                    # No match found
                    return False
                # Filter the data using the search predicate
                filtered_data = [
                    item for item in filtered_data if matches_search(item)
                ]
        # Apply additional filters
        if filters:
            # Filter by base model
            if "base_model" in filters and filters["base_model"]:
                filtered_data = [
                    item
                    for item in filtered_data
                    if item.get("base_model", "") in filters["base_model"]
                ]
            # Filter by favorite
            if "favorite" in filters and filters["favorite"]:
                filtered_data = [
                    item for item in filtered_data if item.get("favorite") is True
                ]
            # Filter by tags — dict form supports include/exclude states,
            # list form is include-only.
            if "tags" in filters and filters["tags"]:
                tag_spec = filters["tags"]
                include_tags = set()
                exclude_tags = set()
                if isinstance(tag_spec, dict):
                    for tag, state in tag_spec.items():
                        if not tag:
                            continue
                        if state == "exclude":
                            exclude_tags.add(tag)
                        else:
                            include_tags.add(tag)
                else:
                    include_tags = {tag for tag in tag_spec if tag}
                if include_tags:
                    def matches_include(item_tags):
                        # "__no_tags__" sentinel matches untagged recipes.
                        if not item_tags and "__no_tags__" in include_tags:
                            return True
                        return any(tag in include_tags for tag in (item_tags or []))
                    filtered_data = [
                        item
                        for item in filtered_data
                        if matches_include(item.get("tags"))
                    ]
                if exclude_tags:
                    def matches_exclude(item_tags):
                        if not item_tags and "__no_tags__" in exclude_tags:
                            return True
                        return any(tag in exclude_tags for tag in (item_tags or []))
                    filtered_data = [
                        item
                        for item in filtered_data
                        if not matches_exclude(item.get("tags"))
                    ]
    # Apply sorting if not already handled by pre-sorted cache
    if ":" in sort_by or sort_field == "loras_count":
        field, order = (sort_by.split(":") + ["desc"])[:2]
        reverse = order.lower() == "desc"
        if field == "name":
            filtered_data = natsorted(
                filtered_data,
                key=lambda x: x.get("title", "").lower(),
                reverse=reverse,
            )
        elif field == "date":
            # Use modified if available, falling back to created_date
            filtered_data.sort(
                key=lambda x: (
                    x.get("modified", x.get("created_date", 0)),
                    x.get("file_path", ""),
                ),
                reverse=reverse,
            )
        elif field == "loras_count":
            filtered_data.sort(
                key=lambda x: len(x.get("loras", [])), reverse=reverse
            )
    # Calculate pagination
    total_items = len(filtered_data)
    start_idx = (page - 1) * page_size
    end_idx = min(start_idx + page_size, total_items)
    # Get paginated items
    paginated_items = filtered_data[start_idx:end_idx]
    # Add inLibrary information and URLs for each recipe (only for the
    # current page, so enrichment cost stays proportional to page_size)
    for item in paginated_items:
        # Format file path to URL
        if "file_path" in item:
            item["file_url"] = self._format_file_url(item["file_path"])
        # Format dates for display
        for date_field in ["created_date", "modified"]:
            if date_field in item:
                item[f"{date_field}_formatted"] = self._format_timestamp(
                    item[date_field]
                )
        if "loras" in item:
            item["loras"] = [
                self._enrich_lora_entry(dict(lora)) for lora in item["loras"]
            ]
        if item.get("checkpoint"):
            checkpoint_entry = self._normalize_checkpoint_entry(item["checkpoint"])
            if checkpoint_entry:
                item["checkpoint"] = self._enrich_checkpoint_entry(checkpoint_entry)
            else:
                item.pop("checkpoint", None)
    result = {
        "items": paginated_items,
        "total": total_items,
        "page": page,
        "page_size": page_size,
        "total_pages": (total_items + page_size - 1) // page_size,
    }
    return result
async def get_recipe_by_id(self, recipe_id: str) -> dict:
    """Get a single recipe by ID with all metadata and formatted URLs.

    Args:
        recipe_id: The ID of the recipe to retrieve

    Returns:
        Dict containing the recipe data or None if not found
    """
    if not recipe_id:
        return None
    # Get all recipes from cache
    cache = await self.get_cached_data()
    # Find the recipe with the specified ID (IDs compared as strings)
    recipe = next(
        (r for r in cache.raw_data if str(r.get("id", "")) == recipe_id), None
    )
    if not recipe:
        return None
    # Format the recipe with all needed information; shallow copy so the
    # cached entry itself is not mutated by the formatting below.
    formatted_recipe = {**recipe}  # Copy all fields
    # Format file path to URL
    if "file_path" in formatted_recipe:
        formatted_recipe["file_url"] = self._format_file_url(
            formatted_recipe["file_path"]
        )
    # Format dates for display
    for date_field in ["created_date", "modified"]:
        if date_field in formatted_recipe:
            formatted_recipe[f"{date_field}_formatted"] = self._format_timestamp(
                formatted_recipe[date_field]
            )
    # Add lora metadata (enrichment works on per-entry copies)
    if "loras" in formatted_recipe:
        formatted_recipe["loras"] = [
            self._enrich_lora_entry(dict(lora))
            for lora in formatted_recipe["loras"]
        ]
    if formatted_recipe.get("checkpoint"):
        checkpoint_entry = self._normalize_checkpoint_entry(
            formatted_recipe["checkpoint"]
        )
        if checkpoint_entry:
            formatted_recipe["checkpoint"] = self._enrich_checkpoint_entry(
                checkpoint_entry
            )
        else:
            # Invalid checkpoint payloads are dropped from the response.
            formatted_recipe.pop("checkpoint", None)
    return formatted_recipe
def _format_file_url(self, file_path: str) -> str:
"""Format file path as URL for serving in web UI"""
if not file_path:
return "/loras_static/images/no-preview.png"
try:
normalized_path = os.path.normpath(file_path)
static_url = config.get_preview_static_url(normalized_path)
if static_url:
return static_url
except Exception as e:
logger.error(f"Error formatting file URL: {e}")
return "/loras_static/images/no-preview.png"
return "/loras_static/images/no-preview.png"
def _format_timestamp(self, timestamp: float) -> str:
"""Format timestamp for display"""
from datetime import datetime
return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
async def get_recipe_json_path(self, recipe_id: str) -> Optional[str]:
    """Locate the recipe JSON file, accounting for folder placement."""
    recipes_dir = self.recipes_dir
    if not recipes_dir:
        return None
    file_name = f"{recipe_id}.recipe.json"
    # Prefer the folder recorded in the cache for this recipe id.
    cache = await self.get_cached_data()
    folder = next(
        (
            item.get("folder") or ""
            for item in cache.raw_data
            if str(item.get("id")) == str(recipe_id)
        ),
        "",
    )
    candidate = os.path.normpath(os.path.join(recipes_dir, folder, file_name))
    if os.path.exists(candidate):
        return candidate
    # Fall back to a full directory walk in case the cache is stale.
    for root, _, files in os.walk(recipes_dir):
        if file_name in files:
            return os.path.join(root, file_name)
    return None
async def update_recipe_metadata(self, recipe_id: str, metadata: dict) -> bool:
    """Update recipe metadata (like title and tags) in both file system and cache.

    Also refreshes the FTS index, the persistent SQLite cache, and — when the
    recipe image exists — its embedded EXIF recipe metadata.

    Args:
        recipe_id: The ID of the recipe to update
        metadata: Dictionary containing metadata fields to update (title, tags, etc.)

    Returns:
        bool: True if successful, False otherwise
    """
    # First, find the recipe JSON file path
    recipe_json_path = await self.get_recipe_json_path(recipe_id)
    if not recipe_json_path or not os.path.exists(recipe_json_path):
        return False
    try:
        # Load existing recipe data
        with open(recipe_json_path, "r", encoding="utf-8") as f:
            recipe_data = json.load(f)
        # Update fields (shallow merge: incoming values overwrite existing)
        for key, value in metadata.items():
            recipe_data[key] = value
        # Save updated recipe
        with open(recipe_json_path, "w", encoding="utf-8") as f:
            json.dump(recipe_data, f, indent=4, ensure_ascii=False)
        # Update the cache if it exists
        if self._cache is not None:
            await self._cache.update_recipe_metadata(
                recipe_id, metadata, resort=False
            )
            self._schedule_resort()
        # Update FTS index
        self._update_fts_index_for_recipe(recipe_data, "update")
        # Update persistent SQLite cache
        if self._persistent_cache:
            self._persistent_cache.update_recipe(recipe_data, recipe_json_path)
            self._json_path_map[recipe_id] = recipe_json_path
        # If the recipe has an image, update its EXIF metadata
        from ..utils.exif_utils import ExifUtils
        image_path = recipe_data.get("file_path")
        if image_path and os.path.exists(image_path):
            ExifUtils.append_recipe_metadata(image_path, recipe_data)
        return True
    except Exception as e:
        import logging
        logging.getLogger(__name__).error(
            f"Error updating recipe metadata: {e}", exc_info=True
        )
        return False
async def update_lora_entry(
    self,
    recipe_id: str,
    lora_index: int,
    *,
    target_name: str,
    target_lora: Optional[Dict[str, Any]] = None,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Update a specific LoRA entry within a recipe.

    Rewrites the recipe JSON on disk (under the mutation lock), refreshes
    its fingerprint/modified stamp, and syncs the in-memory cache, FTS
    index, and persistent SQLite cache.

    Args:
        recipe_id: ID of the recipe to modify.
        lora_index: Index of the LoRA entry within the recipe's ``loras``.
        target_name: New ``file_name`` for the entry (required).
        target_lora: Optional local LoRA record used to fill hash/civitai
            fields and preview/local path info.

    Returns:
        The updated recipe data and the refreshed LoRA metadata.

    Raises:
        ValueError: If ``target_name`` is None.
        RecipeNotFoundError: If the recipe file or LoRA index is missing.
    """
    if target_name is None:
        raise ValueError("target_name must be provided")
    recipe_json_path = await self.get_recipe_json_path(recipe_id)
    if not recipe_json_path or not os.path.exists(recipe_json_path):
        raise RecipeNotFoundError("Recipe not found")
    # Serialize concurrent mutations of recipe files.
    async with self._mutation_lock:
        with open(recipe_json_path, "r", encoding="utf-8") as file_obj:
            recipe_data = json.load(file_obj)
        loras = recipe_data.get("loras", [])
        if lora_index >= len(loras):
            raise RecipeNotFoundError("LoRA index out of range in recipe")
        lora_entry = loras[lora_index]
        # Reactivate the entry and point it at the chosen local file.
        lora_entry["isDeleted"] = False
        lora_entry["exclude"] = False
        lora_entry["file_name"] = target_name
        if target_lora is not None:
            sha_value = target_lora.get("sha256") or target_lora.get("sha")
            if sha_value:
                lora_entry["hash"] = sha_value.lower()
            civitai_info = target_lora.get("civitai") or {}
            if civitai_info:
                lora_entry["modelName"] = civitai_info.get("model", {}).get(
                    "name", ""
                )
                lora_entry["modelVersionName"] = civitai_info.get("name", "")
                lora_entry["modelVersionId"] = civitai_info.get("id")
        # LoRA set changed, so the fingerprint must be recomputed.
        recipe_data["fingerprint"] = calculate_recipe_fingerprint(
            recipe_data.get("loras", [])
        )
        recipe_data["modified"] = time.time()
        with open(recipe_json_path, "w", encoding="utf-8") as file_obj:
            json.dump(recipe_data, file_obj, indent=4, ensure_ascii=False)
        # Replace (or re-add) the recipe in the in-memory cache.
        cache = await self.get_cached_data()
        replaced = await cache.replace_recipe(recipe_id, recipe_data, resort=False)
        if not replaced:
            await cache.add_recipe(recipe_data, resort=False)
        self._schedule_resort()
        # Update FTS index
        self._update_fts_index_for_recipe(recipe_data, "update")
        # Update persistent SQLite cache
        if self._persistent_cache:
            self._persistent_cache.update_recipe(recipe_data, recipe_json_path)
            self._json_path_map[recipe_id] = recipe_json_path
        # Build the response copy with preview/local-path conveniences.
        updated_lora = dict(lora_entry)
        if target_lora is not None:
            preview_url = target_lora.get("preview_url")
            if preview_url:
                updated_lora["preview_url"] = config.get_preview_static_url(preview_url)
            if target_lora.get("file_path"):
                updated_lora["localPath"] = target_lora["file_path"]
        updated_lora = self._enrich_lora_entry(updated_lora)
        return recipe_data, updated_lora
async def get_recipes_for_lora(self, lora_hash: str) -> List[Dict[str, Any]]:
"""Return recipes that reference a given LoRA hash."""
if not lora_hash:
return []
normalized_hash = lora_hash.lower()
cache = await self.get_cached_data()
matching_recipes: List[Dict[str, Any]] = []
for recipe in cache.raw_data:
loras = recipe.get("loras", [])
if any(
(entry.get("hash") or "").lower() == normalized_hash for entry in loras
):
recipe_copy = {**recipe}
recipe_copy["loras"] = [
self._enrich_lora_entry(dict(entry)) for entry in loras
]
recipe_copy["file_url"] = self._format_file_url(recipe.get("file_path"))
matching_recipes.append(recipe_copy)
return matching_recipes
async def get_recipe_syntax_tokens(self, recipe_id: str) -> List[str]:
"""Build LoRA syntax tokens for a recipe."""
cache = await self.get_cached_data()
recipe = await cache.get_recipe(recipe_id)
if recipe is None:
raise RecipeNotFoundError("Recipe not found")
loras = recipe.get("loras", [])
if not loras:
return []
lora_cache = None
if self._lora_scanner is not None:
lora_cache = await self._lora_scanner.get_cached_data()
syntax_parts: List[str] = []
for lora in loras:
if lora.get("isDeleted", False):
continue
file_name = None
hash_value = (lora.get("hash") or "").lower()
if (
hash_value
and self._lora_scanner is not None
and hasattr(self._lora_scanner, "_hash_index")
):
file_path = self._lora_scanner._hash_index.get_path(hash_value)
if file_path:
file_name = os.path.splitext(os.path.basename(file_path))[0]
if not file_name and lora.get("modelVersionId") and lora_cache is not None:
for cached_lora in getattr(lora_cache, "raw_data", []):
civitai_info = cached_lora.get("civitai")
if civitai_info and civitai_info.get("id") == lora.get(
"modelVersionId"
):
cached_path = cached_lora.get("path") or cached_lora.get(
"file_path"
)
if cached_path:
file_name = os.path.splitext(os.path.basename(cached_path))[
0
]
break
if not file_name:
file_name = lora.get("file_name", "unknown-lora")
strength = lora.get("strength", 1.0)
syntax_parts.append(f"<lora:{file_name}:{strength}>")
return syntax_parts
async def update_lora_filename_by_hash(
self, hash_value: str, new_file_name: str
) -> Tuple[int, int]:
"""Update file_name in all recipes that contain a LoRA with the specified hash.
Args:
hash_value: The SHA256 hash value of the LoRA
new_file_name: The new file_name to set
Returns:
Tuple[int, int]: (number of recipes updated in files, number of recipes updated in cache)
"""
if not hash_value or not new_file_name:
return 0, 0
# Always use lowercase hash for consistency
hash_value = hash_value.lower()
# Get cache
cache = await self.get_cached_data()
if not cache or not cache.raw_data:
return 0, 0
file_updated_count = 0
cache_updated_count = 0
# Find recipes that need updating from the cache
recipes_to_update = []
for recipe in cache.raw_data:
loras = recipe.get("loras", [])
if not isinstance(loras, list):
continue
has_match = False
for lora in loras:
if not isinstance(lora, dict):
continue
if (lora.get("hash") or "").lower() == hash_value:
if lora.get("file_name") != new_file_name:
lora["file_name"] = new_file_name
has_match = True
if has_match:
recipes_to_update.append(recipe)
cache_updated_count += 1
if not recipes_to_update:
return 0, 0
# Persist changes to disk and SQLite cache
async with self._mutation_lock:
for recipe in recipes_to_update:
recipe_id = str(recipe.get("id", ""))
if not recipe_id:
continue
recipe_path = os.path.join(self.recipes_dir, f"{recipe_id}.recipe.json")
try:
self._write_recipe_file(recipe_path, recipe)
file_updated_count += 1
logger.info(
f"Updated file_name in recipe {recipe_path}: -> {new_file_name}"
)
# Update persistent SQLite cache
if self._persistent_cache:
self._persistent_cache.update_recipe(recipe, recipe_path)
self._json_path_map[recipe_id] = recipe_path
except Exception as e:
logger.error(f"Error updating recipe file {recipe_path}: {e}")
# We don't necessarily need to resort because LoRA file_name isn't a sort key,
# but we might want to schedule a resort if we're paranoid or if searching relies on sorted state.
# Given it's a rename of a dependency, search results might change if searching by LoRA name.
self._schedule_resort()
return file_updated_count, cache_updated_count
async def find_recipes_by_fingerprint(self, fingerprint: str) -> list:
"""Find recipes with a matching fingerprint
Args:
fingerprint: The recipe fingerprint to search for
Returns:
List of recipe details that match the fingerprint
"""
if not fingerprint:
return []
# Get all recipes from cache
cache = await self.get_cached_data()
# Find recipes with matching fingerprint
matching_recipes = []
for recipe in cache.raw_data:
if recipe.get("fingerprint") == fingerprint:
recipe_details = {
"id": recipe.get("id"),
"title": recipe.get("title"),
"file_url": self._format_file_url(recipe.get("file_path")),
"modified": recipe.get("modified"),
"created_date": recipe.get("created_date"),
"lora_count": len(recipe.get("loras", [])),
}
matching_recipes.append(recipe_details)
return matching_recipes
async def find_all_duplicate_recipes(self) -> dict:
"""Find all recipe duplicates based on fingerprints
Returns:
Dictionary where keys are fingerprints and values are lists of recipe IDs
"""
# Get all recipes from cache
cache = await self.get_cached_data()
# Group recipes by fingerprint
fingerprint_groups = {}
for recipe in cache.raw_data:
fingerprint = recipe.get("fingerprint")
if not fingerprint:
continue
if fingerprint not in fingerprint_groups:
fingerprint_groups[fingerprint] = []
fingerprint_groups[fingerprint].append(recipe.get("id"))
# Filter to only include groups with more than one recipe
duplicate_groups = {k: v for k, v in fingerprint_groups.items() if len(v) > 1}
return duplicate_groups
async def find_duplicate_recipes_by_source(self) -> dict:
"""Find all recipe duplicates based on source_path (Civitai image URLs)
Returns:
Dictionary where keys are source URLs and values are lists of recipe IDs
"""
cache = await self.get_cached_data()
url_groups = {}
for recipe in cache.raw_data:
source_url = recipe.get("source_path", "").strip()
if not source_url:
continue
if source_url not in url_groups:
url_groups[source_url] = []
url_groups[source_url].append(recipe.get("id"))
duplicate_groups = {k: v for k, v in url_groups.items() if len(v) > 1}
return duplicate_groups