mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-04-02 10:48:51 -03:00
2597 lines · 98 KiB · Python
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
import time
|
||
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple
|
||
from ..config import config
|
||
from .recipe_cache import RecipeCache
|
||
from .recipe_fts_index import RecipeFTSIndex
|
||
from .persistent_recipe_cache import (
|
||
PersistentRecipeCache,
|
||
get_persistent_recipe_cache,
|
||
PersistedRecipeData,
|
||
)
|
||
from .service_registry import ServiceRegistry
|
||
from .lora_scanner import LoraScanner
|
||
from .metadata_service import get_default_metadata_provider
|
||
from .checkpoint_scanner import CheckpointScanner
|
||
from .recipes.errors import RecipeNotFoundError
|
||
from ..utils.utils import calculate_recipe_fingerprint, fuzzy_match
|
||
from natsort import natsorted
|
||
import sys
|
||
import re
|
||
from ..recipes.merger import GenParamsMerger
|
||
from ..recipes.enrichment import RecipeEnricher
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class RecipeScanner:
|
||
"""Service for scanning and managing recipe images"""
|
||
|
||
_instance = None
|
||
_lock = asyncio.Lock()
|
||
|
||
    @classmethod
    async def get_instance(
        cls,
        lora_scanner: Optional[LoraScanner] = None,
        checkpoint_scanner: Optional[CheckpointScanner] = None,
    ):
        """Get singleton instance of RecipeScanner.

        Args:
            lora_scanner: Optional pre-built LoraScanner; resolved from the
                ServiceRegistry when omitted.
            checkpoint_scanner: Optional pre-built CheckpointScanner; resolved
                from the ServiceRegistry when omitted.

        Returns:
            The shared RecipeScanner instance, created on first call.
        """
        # The class-level asyncio lock serializes concurrent first calls so
        # only one coroutine constructs the singleton.
        async with cls._lock:
            if cls._instance is None:
                if not lora_scanner:
                    # Get lora scanner from service registry if not provided
                    lora_scanner = await ServiceRegistry.get_lora_scanner()
                if not checkpoint_scanner:
                    checkpoint_scanner = await ServiceRegistry.get_checkpoint_scanner()
                cls._instance = cls(lora_scanner, checkpoint_scanner)
            return cls._instance
|
||
|
||
    def __new__(
        cls,
        lora_scanner: Optional[LoraScanner] = None,
        checkpoint_scanner: Optional[CheckpointScanner] = None,
    ):
        # Singleton allocation: scanner references are attached only on the
        # very first construction. Later calls return the existing instance
        # and ignore the arguments here (see __init__ for rebinding).
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._lora_scanner = lora_scanner
            cls._instance._checkpoint_scanner = checkpoint_scanner
            cls._instance._civitai_client = None  # Will be lazily initialized
        return cls._instance
|
||
|
||
    # Version stamp written into each repaired recipe; bump this to force all
    # recipes through repair_all_recipes() again on the next pass.
    REPAIR_VERSION = 3

    def __init__(
        self,
        lora_scanner: Optional[LoraScanner] = None,
        checkpoint_scanner: Optional[CheckpointScanner] = None,
    ):
        """Set up per-instance state; effective only once for the singleton.

        Args:
            lora_scanner: Optional LoraScanner to bind on this instance.
            checkpoint_scanner: Optional CheckpointScanner to bind.
        """
        # Ensure initialization only happens once
        if not hasattr(self, "_initialized"):
            # In-memory recipe cache; built lazily in the background.
            self._cache: Optional[RecipeCache] = None
            self._initialization_lock = asyncio.Lock()
            self._initialization_task: Optional[asyncio.Task] = None
            self._is_initializing = False
            # Serializes cache mutations (bulk repairs, single-recipe saves).
            self._mutation_lock = asyncio.Lock()
            self._post_scan_task: Optional[asyncio.Task] = None
            self._resort_tasks: Set[asyncio.Task] = set()
            self._cancel_requested = False
            # FTS index for fast search
            self._fts_index: Optional[RecipeFTSIndex] = None
            self._fts_index_task: Optional[asyncio.Task] = None
            # Persistent cache for fast startup
            self._persistent_cache: Optional[PersistentRecipeCache] = None
            self._json_path_map: Dict[str, str] = {}  # recipe_id -> json_path
            # Scanners may already have been attached by __new__; overwrite
            # only when explicit instances were passed in.
            if lora_scanner:
                self._lora_scanner = lora_scanner
            if checkpoint_scanner:
                self._checkpoint_scanner = checkpoint_scanner
            self._initialized = True
||
|
||
    def on_library_changed(self) -> None:
        """Reset cached state when the active library changes.

        Cancels all background work tied to the previous library, drops the
        in-memory and persistent caches, and (when an event loop is running)
        immediately kicks off a rebuild for the new library.
        """

        # Cancel any in-flight initialization or resorting work so the next
        # access rebuilds the cache for the new library.
        if self._initialization_task and not self._initialization_task.done():
            self._initialization_task.cancel()

        # Pending resorts operate on the old library's data; cancel them all.
        for task in list(self._resort_tasks):
            if not task.done():
                task.cancel()
        self._resort_tasks.clear()

        if self._post_scan_task and not self._post_scan_task.done():
            self._post_scan_task.cancel()
        self._post_scan_task = None

        # Cancel FTS index task and clear index
        if self._fts_index_task and not self._fts_index_task.done():
            self._fts_index_task.cancel()
        self._fts_index_task = None
        if self._fts_index:
            self._fts_index.clear()
        self._fts_index = None

        # Reset persistent cache instance for new library
        self._persistent_cache = None
        self._json_path_map = {}
        PersistentRecipeCache.clear_instances()

        self._cache = None
        self._initialization_task = None
        self._is_initializing = False

        # Rebuild eagerly only when called from inside a running event loop;
        # otherwise initialization happens lazily on next access.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop and not loop.is_closed():
            loop.create_task(self.initialize_in_background())
|
||
|
||
async def _get_civitai_client(self):
|
||
"""Lazily initialize CivitaiClient from registry"""
|
||
if self._civitai_client is None:
|
||
self._civitai_client = await ServiceRegistry.get_civitai_client()
|
||
return self._civitai_client
|
||
|
||
def cancel_task(self) -> None:
|
||
"""Request cancellation of the current long-running task."""
|
||
self._cancel_requested = True
|
||
logger.info("Recipe Scanner: Cancellation requested")
|
||
|
||
def reset_cancellation(self) -> None:
|
||
"""Reset the cancellation flag."""
|
||
self._cancel_requested = False
|
||
|
||
def is_cancelled(self) -> bool:
|
||
"""Check if cancellation has been requested."""
|
||
return self._cancel_requested
|
||
|
||
    async def repair_all_recipes(
        self, progress_callback: Optional[Callable[[Dict], Any]] = None
    ) -> Dict[str, Any]:
        """Repair all recipes by enrichment with Civitai and embedded metadata.

        Args:
            progress_callback: Optional async callback invoked with progress
                dicts (``status`` plus counters) throughout the run.

        Returns:
            Dict summary of repair results: ``success`` flag plus
            ``repaired`` / ``skipped`` / ``errors`` / ``total`` counters.
        """
        if progress_callback:
            await progress_callback({"status": "started"})
        # Hold the mutation lock for the whole pass so no other cache
        # mutation interleaves with the repairs.
        async with self._mutation_lock:
            cache = await self.get_cached_data()
            # Snapshot the list so iteration stays stable if repairs mutate it.
            all_recipes = list(cache.raw_data)
            total = len(all_recipes)
            repaired_count = 0
            skipped_count = 0
            errors_count = 0

            civitai_client = await self._get_civitai_client()
            # Clear any stale cancellation left over from a previous run.
            self.reset_cancellation()

            for i, recipe in enumerate(all_recipes):
                # Honor user-requested cancellation between recipes.
                if self.is_cancelled():
                    logger.info("Recipe repair cancelled by user")
                    if progress_callback:
                        await progress_callback(
                            {
                                "status": "cancelled",
                                "current": i,
                                "total": total,
                                "repaired": repaired_count,
                                "skipped": skipped_count,
                                "errors": errors_count,
                            }
                        )
                    return {
                        "success": False,
                        "status": "cancelled",
                        "repaired": repaired_count,
                        "skipped": skipped_count,
                        "errors": errors_count,
                        "total": total,
                    }

                try:
                    # Report progress
                    if progress_callback:
                        await progress_callback(
                            {
                                "status": "processing",
                                "current": i + 1,
                                "total": total,
                                "recipe_name": recipe.get("name", "Unknown"),
                            }
                        )

                    if await self._repair_single_recipe(recipe, civitai_client):
                        repaired_count += 1
                    else:
                        skipped_count += 1

                except Exception as e:
                    # A failing recipe is counted but does not abort the pass.
                    logger.error(
                        f"Error repairing recipe {recipe.get('file_path')}: {e}"
                    )
                    errors_count += 1

            # Final progress update
            if progress_callback:
                await progress_callback(
                    {
                        "status": "completed",
                        "repaired": repaired_count,
                        "skipped": skipped_count,
                        "errors": errors_count,
                        "total": total,
                    }
                )

            return {
                "success": True,
                "repaired": repaired_count,
                "skipped": skipped_count,
                "errors": errors_count,
                "total": total,
            }
|
||
|
||
    async def repair_recipe_by_id(self, recipe_id: str) -> Dict[str, Any]:
        """Repair a single recipe by its ID.

        Args:
            recipe_id: ID of the recipe to repair

        Returns:
            Dict summary of repair result, including the recipe itself
            (formatted when the repair succeeded, raw otherwise).

        Raises:
            RecipeNotFoundError: If no cached recipe matches ``recipe_id``.
        """
        async with self._mutation_lock:
            # Get raw recipe from cache directly to avoid formatted fields
            cache = await self.get_cached_data()
            recipe = next(
                (r for r in cache.raw_data if str(r.get("id", "")) == recipe_id), None
            )

            if not recipe:
                raise RecipeNotFoundError(f"Recipe {recipe_id} not found")

            civitai_client = await self._get_civitai_client()
            success = await self._repair_single_recipe(recipe, civitai_client)

            # If successfully repaired, we should return the formatted version for the UI
            return {
                "success": True,
                "repaired": 1 if success else 0,
                "skipped": 0 if success else 1,
                "recipe": await self.get_recipe_by_id(recipe_id) if success else recipe,
            }
|
||
|
||
async def _repair_single_recipe(
|
||
self, recipe: Dict[str, Any], civitai_client: Any
|
||
) -> bool:
|
||
"""Internal helper to repair a single recipe object.
|
||
|
||
Args:
|
||
recipe: The recipe dictionary to repair (modified in-place)
|
||
civitai_client: Authenticated Civitai client
|
||
|
||
Returns:
|
||
bool: True if recipe was repaired or updated, False if skipped
|
||
"""
|
||
# 1. Skip if already at latest repair version
|
||
if recipe.get("repair_version", 0) >= self.REPAIR_VERSION:
|
||
return False
|
||
|
||
# 2. Identification: Is repair needed?
|
||
has_checkpoint = (
|
||
"checkpoint" in recipe
|
||
and recipe["checkpoint"]
|
||
and recipe["checkpoint"].get("name")
|
||
)
|
||
gen_params = recipe.get("gen_params", {})
|
||
has_prompt = bool(gen_params.get("prompt"))
|
||
|
||
needs_repair = not has_checkpoint or not has_prompt
|
||
|
||
if not needs_repair:
|
||
# Even if no repair needed, we mark it with version if it was processed
|
||
# Always update and save because if we are here, the version is old (checked in step 1)
|
||
recipe["repair_version"] = self.REPAIR_VERSION
|
||
await self._save_recipe_persistently(recipe)
|
||
return True
|
||
|
||
# 3. Use Enricher to repair/enrich
|
||
try:
|
||
updated = await RecipeEnricher.enrich_recipe(recipe, civitai_client)
|
||
except Exception as e:
|
||
logger.error(f"Error enriching recipe {recipe.get('id')}: {e}")
|
||
updated = False
|
||
|
||
# 4. Mark version and save if updated or just marking version
|
||
# If we updated it, OR if the version is old (which we know it is if we are here), save it.
|
||
# Actually, if we are here and updated is False, it means we tried to repair but couldn't/didn't need to.
|
||
# But we still want to mark it as processed so we don't try again until version bump.
|
||
if updated or recipe.get("repair_version", 0) < self.REPAIR_VERSION:
|
||
recipe["repair_version"] = self.REPAIR_VERSION
|
||
await self._save_recipe_persistently(recipe)
|
||
return True
|
||
|
||
return False
|
||
|
||
    async def _save_recipe_persistently(self, recipe: Dict[str, Any]) -> bool:
        """Helper to save a recipe to both JSON and EXIF metadata.

        Args:
            recipe: Recipe dict to persist; sanitized in-place before saving.

        Returns:
            bool: True on success; False when the recipe has no id, no known
            JSON path, or persisting raised an exception.
        """
        recipe_id = recipe.get("id")
        if not recipe_id:
            return False

        recipe_json_path = await self.get_recipe_json_path(recipe_id)
        if not recipe_json_path:
            return False

        try:
            # 1. Sanitize for storage (remove runtime convenience fields)
            clean_recipe = self._sanitize_recipe_for_storage(recipe)

            # 2. Update the original dictionary so that we persist the clean version
            # globally if needed, effectively overwriting it in-place.
            recipe.clear()
            recipe.update(clean_recipe)

            # 3. Save JSON
            with open(recipe_json_path, "w", encoding="utf-8") as f:
                json.dump(recipe, f, indent=4, ensure_ascii=False)

            # 4. Update persistent SQLite cache
            if self._persistent_cache:
                self._persistent_cache.update_recipe(recipe, recipe_json_path)
                self._json_path_map[str(recipe_id)] = recipe_json_path

            # 5. Update EXIF if image exists
            image_path = recipe.get("file_path")
            if image_path and os.path.exists(image_path):
                # Deferred import — presumably avoids a module-level import
                # cycle with the utils package; TODO confirm.
                from ..utils.exif_utils import ExifUtils

                ExifUtils.append_recipe_metadata(image_path, recipe)

            return True
        except Exception as e:
            logger.error(f"Error persisting recipe {recipe_id}: {e}")
            return False
|
||
|
||
def _sanitize_recipe_for_storage(self, recipe: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""Create a clean copy of the recipe without runtime convenience fields."""
|
||
import copy
|
||
|
||
clean = copy.deepcopy(recipe)
|
||
|
||
# 0. Clean top-level runtime fields
|
||
for key in ("file_url", "created_date_formatted", "modified_formatted"):
|
||
clean.pop(key, None)
|
||
|
||
# 1. Clean LORAs
|
||
if "loras" in clean and isinstance(clean["loras"], list):
|
||
for lora in clean["loras"]:
|
||
# Fields to remove (runtime only)
|
||
for key in ("inLibrary", "preview_url", "localPath"):
|
||
lora.pop(key, None)
|
||
|
||
# Normalize weight/strength if mapping is desired (standard in persistence_service)
|
||
if "weight" in lora and "strength" not in lora:
|
||
lora["strength"] = float(lora.pop("weight"))
|
||
|
||
# 2. Clean Checkpoint
|
||
if "checkpoint" in clean and isinstance(clean["checkpoint"], dict):
|
||
cp = clean["checkpoint"]
|
||
# Fields to remove (runtime only)
|
||
for key in (
|
||
"inLibrary",
|
||
"localPath",
|
||
"preview_url",
|
||
"thumbnailUrl",
|
||
"size",
|
||
"downloadUrl",
|
||
):
|
||
cp.pop(key, None)
|
||
|
||
return clean
|
||
|
||
    async def initialize_in_background(self) -> None:
        """Initialize cache in background using thread pool.

        Waits for the LoRA scanner, installs an empty placeholder cache,
        runs the synchronous scan in the default executor, then schedules
        post-scan enrichment and the FTS index build.
        """
        try:
            # Recipes reference LoRA metadata, so wait for the LoRA scanner
            # before building the recipe cache.
            await self._wait_for_lora_scanner()

            # Set initial empty cache to avoid None reference errors
            if self._cache is None:
                self._cache = RecipeCache(
                    raw_data=[],
                    sorted_by_name=[],
                    sorted_by_date=[],
                    folders=[],
                    folder_tree={},
                )

            # Mark as initializing to prevent concurrent initializations
            self._is_initializing = True
            self._initialization_task = asyncio.current_task()

            try:
                # Start timer
                start_time = time.time()

                # Use thread pool to execute CPU-intensive operations
                loop = asyncio.get_event_loop()
                cache = await loop.run_in_executor(
                    None,  # Use default thread pool
                    self._initialize_recipe_cache_sync,  # Run synchronous version in thread
                )
                if cache is not None:
                    self._cache = cache

                # Calculate elapsed time and log it
                elapsed_time = time.time() - start_time
                recipe_count = (
                    len(cache.raw_data) if cache and hasattr(cache, "raw_data") else 0
                )
                logger.info(
                    f"Recipe cache initialized in {elapsed_time:.2f} seconds. Found {recipe_count} recipes"
                )
                self._schedule_post_scan_enrichment()
                # Schedule FTS index build in background (non-blocking)
                self._schedule_fts_index_build()
            finally:
                # Mark initialization as complete regardless of outcome
                self._is_initializing = False
        except Exception as e:
            logger.error(f"Recipe Scanner: Error initializing cache in background: {e}")
|
||
|
||
def _initialize_recipe_cache_sync(self):
|
||
"""Synchronous version of recipe cache initialization for thread pool execution.
|
||
|
||
Uses persistent cache for fast startup when available:
|
||
1. Try to load from persistent SQLite cache
|
||
2. Reconcile with filesystem (check mtime/size for changes)
|
||
3. Fall back to full directory scan if cache miss or reconciliation fails
|
||
4. Persist results for next startup
|
||
"""
|
||
try:
|
||
# Ensure cache exists to avoid None reference errors
|
||
if self._cache is None:
|
||
self._cache = RecipeCache(
|
||
raw_data=[],
|
||
sorted_by_name=[],
|
||
sorted_by_date=[],
|
||
folders=[],
|
||
folder_tree={},
|
||
)
|
||
|
||
# Create a new event loop for this thread
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
|
||
# Initialize persistent cache
|
||
if self._persistent_cache is None:
|
||
self._persistent_cache = get_persistent_recipe_cache()
|
||
|
||
recipes_dir = self.recipes_dir
|
||
if not recipes_dir or not os.path.exists(recipes_dir):
|
||
logger.warning(f"Recipes directory not found: {recipes_dir}")
|
||
return self._cache
|
||
|
||
# Try to load from persistent cache first
|
||
persisted = self._persistent_cache.load_cache()
|
||
if persisted:
|
||
recipes, changed, json_paths = self._reconcile_recipe_cache(
|
||
persisted, recipes_dir
|
||
)
|
||
self._json_path_map = json_paths
|
||
|
||
if not changed:
|
||
# Fast path: use cached data directly
|
||
logger.info(
|
||
"Recipe cache hit: loaded %d recipes from persistent cache",
|
||
len(recipes),
|
||
)
|
||
self._cache.raw_data = recipes
|
||
self._update_folder_metadata(self._cache)
|
||
self._sort_cache_sync()
|
||
return self._cache
|
||
else:
|
||
# Partial update: some files changed
|
||
logger.info(
|
||
"Recipe cache partial hit: reconciled %d recipes with filesystem",
|
||
len(recipes),
|
||
)
|
||
self._cache.raw_data = recipes
|
||
self._update_folder_metadata(self._cache)
|
||
self._sort_cache_sync()
|
||
# Persist updated cache
|
||
self._persistent_cache.save_cache(recipes, json_paths)
|
||
return self._cache
|
||
|
||
# Fall back to full directory scan
|
||
logger.info("Recipe cache miss: performing full directory scan")
|
||
recipes, json_paths = self._full_directory_scan_sync(recipes_dir)
|
||
self._json_path_map = json_paths
|
||
|
||
# Update cache with the collected data
|
||
self._cache.raw_data = recipes
|
||
self._update_folder_metadata(self._cache)
|
||
self._sort_cache_sync()
|
||
|
||
# Persist for next startup
|
||
self._persistent_cache.save_cache(recipes, json_paths)
|
||
|
||
return self._cache
|
||
except Exception as e:
|
||
logger.error(f"Error in thread-based recipe cache initialization: {e}")
|
||
import traceback
|
||
|
||
traceback.print_exc(file=sys.stderr)
|
||
return self._cache if hasattr(self, "_cache") else None
|
||
finally:
|
||
# Clean up the event loop
|
||
loop.close()
|
||
|
||
    def _reconcile_recipe_cache(
        self,
        persisted: PersistedRecipeData,
        recipes_dir: str,
    ) -> Tuple[List[Dict], bool, Dict[str, str]]:
        """Reconcile persisted cache with current filesystem state.

        Args:
            persisted: The persisted recipe data from SQLite cache.
            recipes_dir: Path to the recipes directory.

        Returns:
            Tuple of (recipes list, changed flag, json_paths dict).
            ``changed`` is True when any recipe file was added, modified,
            or deleted since the cache was written.
        """
        recipes: List[Dict] = []
        json_paths: Dict[str, str] = {}
        changed = False

        # Build set of current recipe files with their (mtime, size) stats
        current_files: Dict[str, Tuple[float, int]] = {}
        for root, _, files in os.walk(recipes_dir):
            for file in files:
                if file.lower().endswith(".recipe.json"):
                    file_path = os.path.join(root, file)
                    try:
                        stat = os.stat(file_path)
                        current_files[file_path] = (stat.st_mtime, stat.st_size)
                    except OSError:
                        # File vanished between walk and stat; ignore it
                        continue

        # Build recipe_id -> recipe lookup (O(n) instead of O(n²))
        recipe_by_id: Dict[str, Dict] = {
            str(r.get("id", "")): r for r in persisted.raw_data if r.get("id")
        }

        # Build json_path -> recipe lookup from file_stats (O(m)); the file
        # basename is expected to be "<recipe_id>.recipe.json"
        persisted_by_path: Dict[str, Dict] = {}
        for json_path in persisted.file_stats.keys():
            basename = os.path.basename(json_path)
            if basename.lower().endswith(".recipe.json"):
                recipe_id = basename[: -len(".recipe.json")]
                if recipe_id in recipe_by_id:
                    persisted_by_path[json_path] = recipe_by_id[recipe_id]

        # Process current files
        for file_idx, (file_path, (current_mtime, current_size)) in enumerate(
            current_files.items()
        ):
            cached_stats = persisted.file_stats.get(file_path)

            # Extract recipe_id from current file for fallback lookup
            basename = os.path.basename(file_path)
            recipe_id_from_file = (
                basename[: -len(".recipe.json")]
                if basename.lower().endswith(".recipe.json")
                else None
            )

            if cached_stats:
                cached_mtime, cached_size = cached_stats
                # Check if file is unchanged; the 1-second mtime tolerance
                # absorbs filesystem timestamp granularity differences
                if (
                    abs(current_mtime - cached_mtime) < 1.0
                    and current_size == cached_size
                ):
                    # Try direct path lookup first
                    cached_recipe = persisted_by_path.get(file_path)
                    # Fallback to recipe_id lookup if path lookup fails
                    if not cached_recipe and recipe_id_from_file:
                        cached_recipe = recipe_by_id.get(recipe_id_from_file)
                    if cached_recipe:
                        recipe_id = str(cached_recipe.get("id", ""))
                        # Track folder from file path
                        cached_recipe["folder"] = cached_recipe.get(
                            "folder"
                        ) or self._calculate_folder(file_path)
                        recipes.append(cached_recipe)
                        json_paths[recipe_id] = file_path
                        continue

            # File is new or changed - need to re-read
            changed = True
            recipe_data = self._load_recipe_file_sync(file_path)
            if recipe_data:
                recipe_id = str(recipe_data.get("id", ""))
                recipes.append(recipe_data)
                json_paths[recipe_id] = file_path

            # Periodically release GIL so the event loop thread can run
            if file_idx % 100 == 0:
                time.sleep(0)

        # Check for deleted files
        for json_path in persisted.file_stats.keys():
            if json_path not in current_files:
                changed = True
                logger.debug("Recipe file deleted: %s", json_path)

        return recipes, changed, json_paths
|
||
|
||
def _full_directory_scan_sync(
|
||
self, recipes_dir: str
|
||
) -> Tuple[List[Dict], Dict[str, str]]:
|
||
"""Perform a full synchronous directory scan for recipes.
|
||
|
||
Args:
|
||
recipes_dir: Path to the recipes directory.
|
||
|
||
Returns:
|
||
Tuple of (recipes list, json_paths dict).
|
||
"""
|
||
recipes: List[Dict] = []
|
||
json_paths: Dict[str, str] = {}
|
||
|
||
# Get all recipe JSON files
|
||
recipe_files = []
|
||
for root, _, files in os.walk(recipes_dir):
|
||
for file in files:
|
||
if file.lower().endswith(".recipe.json"):
|
||
recipe_files.append(os.path.join(root, file))
|
||
|
||
# Process each recipe file
|
||
for i, recipe_path in enumerate(recipe_files):
|
||
recipe_data = self._load_recipe_file_sync(recipe_path)
|
||
if recipe_data:
|
||
recipe_id = str(recipe_data.get("id", ""))
|
||
recipes.append(recipe_data)
|
||
json_paths[recipe_id] = recipe_path
|
||
# Periodically release GIL so the event loop thread can run
|
||
if i % 100 == 0:
|
||
time.sleep(0)
|
||
|
||
return recipes, json_paths
|
||
|
||
    def _load_recipe_file_sync(self, recipe_path: str) -> Optional[Dict]:
        """Load a single recipe file synchronously.

        Also performs light self-repair: prefers a sibling image with the
        same filename next to the JSON file (persisting that fix back to
        disk) and backfills ``folder``, ``loras`` and ``gen_params``.

        Args:
            recipe_path: Path to the recipe JSON file.

        Returns:
            Recipe dictionary if valid, None otherwise.
        """
        try:
            with open(recipe_path, "r", encoding="utf-8") as f:
                recipe_data = json.load(f)

            # Validate recipe data
            if not recipe_data or not isinstance(recipe_data, dict):
                logger.warning(f"Invalid recipe data in {recipe_path}")
                return None

            # Ensure required fields exist
            required_fields = ["id", "file_path", "title"]
            if not all(field in recipe_data for field in required_fields):
                logger.warning(f"Missing required fields in {recipe_path}")
                return None

            # Ensure the image file exists and prioritize local siblings
            image_path = recipe_data.get("file_path")
            path_updated = False
            if image_path:
                recipe_dir = os.path.dirname(recipe_path)
                image_filename = os.path.basename(image_path)
                local_sibling_path = os.path.normpath(
                    os.path.join(recipe_dir, image_filename)
                )

                # If local sibling exists and stored path is different, prefer local
                if (
                    os.path.exists(local_sibling_path)
                    and os.path.normpath(image_path) != local_sibling_path
                ):
                    recipe_data["file_path"] = local_sibling_path
                    path_updated = True
                    logger.info(
                        f"Updated recipe image path to local sibling: {local_sibling_path}"
                    )
                elif not os.path.exists(image_path):
                    # Recipe is kept; only the preview image is missing
                    logger.warning(
                        f"Recipe image not found and no local sibling: {image_path}"
                    )

            if path_updated:
                # Best-effort write-back of the repaired path; on failure the
                # repair simply happens again on the next scan
                try:
                    with open(recipe_path, "w", encoding="utf-8") as f:
                        json.dump(recipe_data, f, indent=4, ensure_ascii=False)
                except Exception as e:
                    logger.warning(f"Failed to persist repair for {recipe_path}: {e}")

            # Track folder placement relative to recipes directory
            recipe_data["folder"] = recipe_data.get("folder") or self._calculate_folder(
                recipe_path
            )

            # Ensure loras array exists
            if "loras" not in recipe_data:
                recipe_data["loras"] = []

            # Ensure gen_params exists
            if "gen_params" not in recipe_data:
                recipe_data["gen_params"] = {}

            return recipe_data
        except Exception as e:
            logger.error(f"Error loading recipe file {recipe_path}: {e}")
            import traceback

            traceback.print_exc(file=sys.stderr)
            return None
|
||
|
||
    def _sort_cache_sync(self) -> None:
        """Sort cache data synchronously into the name- and date-ordered views."""
        try:
            # Sort by name using natural ordering (so "2" < "10"),
            # case-insensitive on the recipe title
            self._cache.sorted_by_name = natsorted(
                self._cache.raw_data, key=lambda x: x.get("title", "").lower()
            )

            # Sort by date (modified or created), newest first; file_path
            # acts as a deterministic tie-breaker.
            # NOTE(review): assumes "modified"/"created_date" hold one
            # comparable type across all recipes — mixed types would raise
            # a TypeError here (caught and logged below). Confirm.
            self._cache.sorted_by_date = sorted(
                self._cache.raw_data,
                key=lambda x: (
                    x.get("modified", x.get("created_date", 0)),
                    x.get("file_path", ""),
                ),
                reverse=True,
            )
        except Exception as e:
            logger.error(f"Error sorting recipe cache: {e}")
|
||
|
||
    async def _wait_for_lora_scanner(self) -> None:
        """Ensure the LoRA scanner has initialized before recipe enrichment."""

        if not getattr(self, "_lora_scanner", None):
            # No scanner bound; nothing to wait for
            return

        lora_scanner = self._lora_scanner
        cache_ready = getattr(lora_scanner, "_cache", None) is not None

        # If cache is already available, we can proceed
        if cache_ready:
            return

        # Await an existing initialization task if present
        task = getattr(lora_scanner, "_initialization_task", None)
        if task and hasattr(task, "done") and not task.done():
            try:
                await task
            except Exception:  # pragma: no cover - defensive guard
                pass
            # Re-check: the awaited task may have populated the cache
            if getattr(lora_scanner, "_cache", None) is not None:
                return

        # Otherwise, request initialization and proceed once it completes
        try:
            await lora_scanner.initialize_in_background()
        except Exception as exc:  # pragma: no cover - defensive guard
            logger.debug("Recipe Scanner: LoRA init request failed: %s", exc)
|
||
|
||
    def _schedule_post_scan_enrichment(self) -> None:
        """Kick off a non-blocking enrichment pass to fill remote metadata.

        No-op when there is no running event loop or when a previous
        enrichment task is still in flight.
        """

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Not inside an event loop; enrichment cannot be scheduled
            return

        if self._post_scan_task and not self._post_scan_task.done():
            return

        async def _run_enrichment():
            # Wrapper so errors are logged instead of lost with the task
            try:
                await self._enrich_cache_metadata()
            except asyncio.CancelledError:
                raise
            except Exception as exc:  # pragma: no cover - defensive guard
                logger.error(
                    "Recipe Scanner: error during post-scan enrichment: %s",
                    exc,
                    exc_info=True,
                )

        self._post_scan_task = loop.create_task(
            _run_enrichment(), name="recipe_cache_enrichment"
        )
|
||
|
||
    def _schedule_fts_index_build(self) -> None:
        """Build FTS index in background without blocking.

        Validates existing index first and reuses it if valid; otherwise the
        index is rebuilt from the in-memory cache. No-op when a build is
        already running or no event loop is available.
        """

        if self._fts_index_task and not self._fts_index_task.done():
            return  # Already running

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return

        async def _build_fts():
            if self._cache is None:
                return

            try:
                self._fts_index = RecipeFTSIndex()

                # Check if existing index is valid against the cached ids
                recipe_ids = {
                    str(r.get("id", "")) for r in self._cache.raw_data if r.get("id")
                }
                recipe_count = len(self._cache.raw_data)

                # Run validation in thread pool to keep the loop responsive
                is_valid = await loop.run_in_executor(
                    None, self._fts_index.validate_index, recipe_count, recipe_ids
                )

                if is_valid:
                    logger.info(
                        "FTS index validated, reusing existing index with %d recipes",
                        recipe_count,
                    )
                    # NOTE(review): reaches into RecipeFTSIndex's private
                    # _ready event to mark it usable without a rebuild —
                    # confirm this is the intended API.
                    self._fts_index._ready.set()
                    return

                # Only rebuild if validation fails
                logger.info("FTS index invalid or outdated, rebuilding...")
                await loop.run_in_executor(
                    None, self._fts_index.build_index, self._cache.raw_data
                )
            except asyncio.CancelledError:
                raise
            except Exception as exc:
                logger.error(
                    "Recipe Scanner: error building FTS index: %s", exc, exc_info=True
                )

        self._fts_index_task = loop.create_task(
            _build_fts(), name="recipe_fts_index_build"
        )
|
||
|
||
def _search_with_fts(self, search: str, search_options: Dict) -> Optional[Set[str]]:
|
||
"""Search recipes using FTS index if available.
|
||
|
||
Args:
|
||
search: The search query string.
|
||
search_options: Dictionary of search options (title, tags, lora_name, lora_model, prompt).
|
||
|
||
Returns:
|
||
Set of matching recipe IDs if FTS is available and search succeeded,
|
||
None if FTS is not ready (caller should fall back to fuzzy search).
|
||
"""
|
||
if not self._fts_index or not self._fts_index.is_ready():
|
||
return None
|
||
|
||
# Build the set of fields to search based on search_options
|
||
fields: Set[str] = set()
|
||
if search_options.get("title", True):
|
||
fields.add("title")
|
||
if search_options.get("tags", True):
|
||
fields.add("tags")
|
||
if search_options.get("lora_name", True):
|
||
fields.add("lora_name")
|
||
if search_options.get("lora_model", True):
|
||
fields.add("lora_model")
|
||
if search_options.get("prompt", False): # prompt search is opt-in by default
|
||
fields.add("prompt")
|
||
|
||
# If no fields enabled, search all fields
|
||
if not fields:
|
||
fields = None
|
||
|
||
try:
|
||
result = self._fts_index.search(search, fields)
|
||
# Return None if empty to trigger fuzzy fallback
|
||
# Empty FTS results may indicate query syntax issues or need for fuzzy matching
|
||
if not result:
|
||
return None
|
||
return result
|
||
except Exception as exc:
|
||
logger.debug("FTS search failed, falling back to fuzzy search: %s", exc)
|
||
return None
|
||
|
||
def _update_fts_index_for_recipe(
|
||
self, recipe: Dict[str, Any], operation: str = "add"
|
||
) -> None:
|
||
"""Update FTS index for a single recipe (add, update, or remove).
|
||
|
||
Args:
|
||
recipe: The recipe dictionary.
|
||
operation: One of 'add', 'update', or 'remove'.
|
||
"""
|
||
if not self._fts_index or not self._fts_index.is_ready():
|
||
return
|
||
|
||
try:
|
||
if operation == "remove":
|
||
recipe_id = (
|
||
str(recipe.get("id", ""))
|
||
if isinstance(recipe, dict)
|
||
else str(recipe)
|
||
)
|
||
self._fts_index.remove_recipe(recipe_id)
|
||
elif operation in ("add", "update"):
|
||
self._fts_index.update_recipe(recipe)
|
||
except Exception as exc:
|
||
logger.debug("Failed to update FTS index for recipe: %s", exc)
|
||
|
||
    async def _enrich_cache_metadata(self) -> None:
        """Perform remote metadata enrichment after the initial scan.

        Refreshes LoRA information for each cached recipe, best-effort
        persists any recipe whose metadata changed, and resorts the cache.
        """

        cache = self._cache
        if cache is None or not getattr(cache, "raw_data", None):
            return

        # Iterate over a snapshot so concurrent cache mutations are safe
        for index, recipe in enumerate(list(cache.raw_data)):
            try:
                metadata_updated = await self._update_lora_information(recipe)
                if metadata_updated:
                    recipe_id = recipe.get("id")
                    if recipe_id:
                        recipe_path = os.path.join(
                            self.recipes_dir, f"{recipe_id}.recipe.json"
                        )
                        if os.path.exists(recipe_path):
                            try:
                                self._write_recipe_file(recipe_path, recipe)
                            except (
                                Exception
                            ) as exc:  # pragma: no cover - best-effort persistence
                                logger.debug(
                                    "Recipe Scanner: could not persist recipe %s: %s",
                                    recipe_id,
                                    exc,
                                )
            except asyncio.CancelledError:
                # Propagate cancellation (e.g. library switch) immediately
                raise
            except Exception as exc:  # pragma: no cover - defensive logging
                logger.error(
                    "Recipe Scanner: error enriching recipe %s: %s",
                    recipe.get("id"),
                    exc,
                    exc_info=True,
                )

            # Yield to the event loop between recipes to stay responsive
            await asyncio.sleep(0)

        try:
            await cache.resort()
        except Exception as exc:  # pragma: no cover - defensive logging
            logger.debug(
                "Recipe Scanner: error resorting cache after enrichment: %s", exc
            )
|
||
|
||
def _schedule_resort(self, *, name_only: bool = False) -> None:
|
||
"""Schedule a background resort of the recipe cache."""
|
||
|
||
if not self._cache:
|
||
return
|
||
|
||
# Keep folder metadata up to date alongside sort order
|
||
self._update_folder_metadata()
|
||
|
||
async def _resort_wrapper() -> None:
|
||
try:
|
||
await self._cache.resort(name_only=name_only)
|
||
except Exception as exc: # pragma: no cover - defensive logging
|
||
logger.error(
|
||
"Recipe Scanner: error resorting cache: %s", exc, exc_info=True
|
||
)
|
||
|
||
task = asyncio.create_task(_resort_wrapper())
|
||
self._resort_tasks.add(task)
|
||
task.add_done_callback(lambda finished: self._resort_tasks.discard(finished))
|
||
|
||
def _calculate_folder(self, recipe_path: str) -> str:
|
||
"""Calculate a normalized folder path relative to ``recipes_dir``."""
|
||
|
||
recipes_dir = self.recipes_dir
|
||
if not recipes_dir:
|
||
return ""
|
||
|
||
try:
|
||
recipe_dir = os.path.dirname(os.path.normpath(recipe_path))
|
||
relative_dir = os.path.relpath(recipe_dir, recipes_dir)
|
||
if relative_dir in (".", ""):
|
||
return ""
|
||
return relative_dir.replace(os.path.sep, "/")
|
||
except Exception:
|
||
return ""
|
||
|
||
def _build_folder_tree(self, folders: list[str]) -> dict:
|
||
"""Build a nested folder tree structure from relative folder paths."""
|
||
|
||
tree: dict[str, dict] = {}
|
||
for folder in folders:
|
||
if not folder:
|
||
continue
|
||
|
||
parts = folder.split("/")
|
||
current_level = tree
|
||
|
||
for part in parts:
|
||
if part not in current_level:
|
||
current_level[part] = {}
|
||
current_level = current_level[part]
|
||
|
||
return tree
|
||
|
||
def _update_folder_metadata(self, cache: RecipeCache | None = None) -> None:
|
||
"""Ensure folder lists and tree metadata are synchronized with cache contents."""
|
||
|
||
cache = cache or self._cache
|
||
if cache is None:
|
||
return
|
||
|
||
folders: set[str] = set()
|
||
for item in cache.raw_data:
|
||
folder_value = item.get("folder", "")
|
||
if folder_value is None:
|
||
folder_value = ""
|
||
if folder_value == ".":
|
||
folder_value = ""
|
||
normalized = str(folder_value).replace("\\", "/")
|
||
item["folder"] = normalized
|
||
folders.add(normalized)
|
||
|
||
cache.folders = sorted(folders, key=lambda entry: entry.lower())
|
||
cache.folder_tree = self._build_folder_tree(cache.folders)
|
||
|
||
async def get_folders(self) -> list[str]:
    """Return the sorted list of recipe folders relative to the recipes root."""

    data = await self.get_cached_data()
    # Refresh folder metadata so the returned list reflects current contents.
    self._update_folder_metadata(data)
    return data.folders
async def get_folder_tree(self) -> dict:
    """Return the hierarchical folder tree used for sidebar navigation."""

    data = await self.get_cached_data()
    # Keep the tree consistent with whatever is currently cached.
    self._update_folder_metadata(data)
    return data.folder_tree
@property
def recipes_dir(self) -> str:
    """Path of the recipes directory, created on demand.

    Lives under the first configured LoRA root (roots are pre-sorted
    case-insensitively); empty string when no roots are configured.
    """

    roots = config.loras_roots
    if not roots:
        return ""

    path = os.path.join(roots[0], "recipes")
    os.makedirs(path, exist_ok=True)
    return path
async def get_cached_data(self, force_refresh: bool = False) -> RecipeCache:
    """Get cached recipe data, refresh if needed.

    Args:
        force_refresh: When True, drop the persistent cache and re-scan the
            recipes directory in a worker thread.

    Returns:
        The current :class:`RecipeCache`; an empty cache object is returned
        when initialization is still in progress or has failed.
    """
    # If cache is already initialized and no refresh is needed, return it immediately
    if self._cache is not None and not force_refresh:
        self._update_folder_metadata()
        return self._cache

    # If another initialization is already in progress, wait for it to complete
    # (callers get an empty placeholder rather than blocking).
    if self._is_initializing and not force_refresh:
        return self._cache or RecipeCache(
            raw_data=[],
            sorted_by_name=[],
            sorted_by_date=[],
            folders=[],
            folder_tree={},
        )

    # If force refresh is requested, re-scan in a thread pool to avoid
    # blocking the event loop (which is shared with ComfyUI).
    if force_refresh:
        try:
            async with self._initialization_lock:
                # Flag checked by concurrent callers above.
                self._is_initializing = True

                try:
                    # Invalidate persistent cache so the sync path does a
                    # full directory scan instead of reconciling stale data.
                    if self._persistent_cache:
                        self._persistent_cache.save_cache([], {})
                    self._json_path_map = {}

                    start_time = time.time()

                    # Run the heavy lifting in a thread pool – same path
                    # used by initialize_in_background().
                    # NOTE(review): asyncio.get_event_loop() is deprecated in
                    # running coroutines — consider get_running_loop(); confirm
                    # the minimum supported Python version before changing.
                    loop = asyncio.get_event_loop()
                    cache = await loop.run_in_executor(
                        None,
                        self._initialize_recipe_cache_sync,
                    )
                    if cache is not None:
                        self._cache = cache

                    elapsed = time.time() - start_time
                    count = len(self._cache.raw_data) if self._cache else 0
                    logger.info(
                        "Recipe cache force-refreshed in %.2f seconds. "
                        "Found %d recipes",
                        elapsed,
                        count,
                    )

                    # Schedule non-blocking background work
                    self._schedule_post_scan_enrichment()
                    self._schedule_fts_index_build()

                    return self._cache

                except Exception as e:
                    # Scan failed: fall back to an empty cache so callers
                    # always receive a usable object.
                    logger.error(
                        f"Recipe Manager: Error initializing cache: {e}",
                        exc_info=True,
                    )
                    self._cache = RecipeCache(
                        raw_data=[],
                        sorted_by_name=[],
                        sorted_by_date=[],
                        folders=[],
                        folder_tree={},
                    )
                    return self._cache
                finally:
                    # Always clear the flag, even on error/cancellation.
                    self._is_initializing = False

        except Exception as e:
            logger.error(f"Unexpected error in get_cached_data: {e}")

    # Return the cache (may be empty or partially initialized)
    return self._cache or RecipeCache(
        raw_data=[],
        sorted_by_name=[],
        sorted_by_date=[],
        folders=[],
        folder_tree={},
    )
async def refresh_cache(self, force: bool = False) -> RecipeCache:
    """Refresh (when ``force``) or simply return the recipe cache."""

    # Thin delegation: all refresh semantics live in get_cached_data().
    return await self.get_cached_data(force_refresh=force)
async def add_recipe(self, recipe_data: Dict[str, Any]) -> None:
    """Insert a recipe into the in-memory cache and all secondary indexes."""

    if not recipe_data:
        return

    cache = await self.get_cached_data()
    # Defer the (expensive) resort to a background task below.
    await cache.add_recipe(recipe_data, resort=False)
    self._update_folder_metadata(cache)
    self._schedule_resort()

    # Keep the full-text search index in sync.
    self._update_fts_index_for_recipe(recipe_data, "add")

    # Mirror the change into the SQLite-backed persistent cache.
    if self._persistent_cache:
        rid = str(recipe_data.get("id", ""))
        self._persistent_cache.update_recipe(
            recipe_data, self._json_path_map.get(rid, "")
        )
async def remove_recipe(self, recipe_id: str) -> bool:
    """Remove one recipe by ID; returns True when something was removed."""

    if not recipe_id:
        return False

    cache = await self.get_cached_data()
    removed = await cache.remove_recipe(recipe_id, resort=False)
    if removed is None:
        # Nothing matched the ID — leave every index untouched.
        return False

    self._update_folder_metadata(cache)
    self._schedule_resort()

    # Drop from the full-text search index.
    self._update_fts_index_for_recipe(recipe_id, "remove")

    # Drop from the SQLite-backed persistent cache and path map.
    if self._persistent_cache:
        self._persistent_cache.remove_recipe(recipe_id)
        self._json_path_map.pop(recipe_id, None)

    return True
async def bulk_remove(self, recipe_ids: Iterable[str]) -> int:
    """Remove several recipes at once; returns the number actually removed."""

    cache = await self.get_cached_data()
    removed = await cache.bulk_remove(recipe_ids, resort=False)
    if removed:
        # One resort covers the whole batch (it also refreshes folder data).
        self._schedule_resort()
        for entry in removed:
            rid = str(entry.get("id", ""))
            self._update_fts_index_for_recipe(rid, "remove")
            if self._persistent_cache:
                self._persistent_cache.remove_recipe(rid)
                self._json_path_map.pop(rid, None)
    return len(removed)
async def scan_all_recipes(self) -> List[Dict]:
    """Scan every ``*.recipe.json`` under the recipes directory.

    Walks ``recipes_dir`` recursively, loads each recipe file through
    ``_load_recipe_file`` and returns the successfully parsed recipe dicts.
    Files that fail validation/loading (``_load_recipe_file`` returns a
    falsy value) are skipped.

    Returns:
        List of recipe metadata dictionaries; empty when the recipes
        directory is missing or unset.
    """
    recipes = []
    recipes_dir = self.recipes_dir

    if not recipes_dir or not os.path.exists(recipes_dir):
        logger.warning(f"Recipes directory not found: {recipes_dir}")
        return recipes

    # Collect all recipe JSON files in a single pass.  (The previous
    # implementation counted matching files first and then scanned the same
    # listing again — a redundant second pass over every directory.)
    recipe_files = [
        os.path.join(root, file)
        for root, _, files in os.walk(recipes_dir)
        for file in files
        if file.lower().endswith(".recipe.json")
    ]

    # Process each recipe file
    for recipe_path in recipe_files:
        recipe_data = await self._load_recipe_file(recipe_path)
        if recipe_data:
            recipes.append(recipe_data)

    return recipes
async def _load_recipe_file(self, recipe_path: str) -> Optional[Dict]:
    """Load, validate, and normalize a single ``*.recipe.json`` file.

    Besides parsing, this performs several in-place repairs and may write
    the updated recipe back to disk: preferring a sibling image file over a
    stale stored path, computing the relative ``folder``, filling default
    ``loras`` / ``gen_params``, refreshing LoRA metadata, normalizing the
    checkpoint entry, and adding a missing ``fingerprint``.

    Returns:
        The (possibly updated) recipe dict, or ``None`` when the file is
        invalid or unreadable.
    """
    try:
        with open(recipe_path, "r", encoding="utf-8") as f:
            recipe_data = json.load(f)

        # Validate recipe data
        if not recipe_data or not isinstance(recipe_data, dict):
            logger.warning(f"Invalid recipe data in {recipe_path}")
            return None

        # Ensure required fields exist
        required_fields = ["id", "file_path", "title"]
        for field in required_fields:
            if field not in recipe_data:
                logger.warning(f"Missing required field '{field}' in {recipe_path}")
                return None

        # Ensure the image file exists and prioritize local siblings
        image_path = recipe_data.get("file_path")
        path_updated = False
        if image_path:
            recipe_dir = os.path.dirname(recipe_path)
            image_filename = os.path.basename(image_path)
            local_sibling_path = os.path.normpath(
                os.path.join(recipe_dir, image_filename)
            )

            # If local sibling exists and stored path is different, prefer local
            # (handles recipes copied between machines/roots).
            if (
                os.path.exists(local_sibling_path)
                and os.path.normpath(image_path) != local_sibling_path
            ):
                recipe_data["file_path"] = local_sibling_path
                image_path = local_sibling_path
                path_updated = True
                logger.info(
                    "Updated recipe image path to local sibling: %s",
                    local_sibling_path,
                )
            elif not os.path.exists(image_path):
                logger.warning(
                    f"Recipe image not found and no local sibling: {image_path}"
                )

        # Persist the corrected image path immediately.
        if path_updated:
            self._write_recipe_file(recipe_path, recipe_data)

        # Track folder placement relative to recipes directory
        recipe_data["folder"] = recipe_data.get("folder") or self._calculate_folder(
            recipe_path
        )

        # Ensure loras array exists
        if "loras" not in recipe_data:
            recipe_data["loras"] = []

        # Ensure gen_params exists
        if "gen_params" not in recipe_data:
            recipe_data["gen_params"] = {}

        # Update lora information with local paths and availability
        # (may call out to Civitai for missing hashes).
        lora_metadata_updated = await self._update_lora_information(recipe_data)

        if recipe_data.get("checkpoint"):
            checkpoint_entry = self._normalize_checkpoint_entry(
                recipe_data["checkpoint"]
            )
            if checkpoint_entry:
                recipe_data["checkpoint"] = self._enrich_checkpoint_entry(
                    checkpoint_entry
                )
            else:
                logger.warning(
                    "Dropping invalid checkpoint entry in %s", recipe_path
                )
                recipe_data.pop("checkpoint", None)

        # Calculate and update fingerprint if missing
        if "loras" in recipe_data and "fingerprint" not in recipe_data:
            fingerprint = calculate_recipe_fingerprint(recipe_data["loras"])
            recipe_data["fingerprint"] = fingerprint

            # Write updated recipe data back to file
            try:
                self._write_recipe_file(recipe_path, recipe_data)
                logger.info(f"Added fingerprint to recipe: {recipe_path}")
            except Exception as e:
                logger.error(f"Error writing updated recipe with fingerprint: {e}")
        elif lora_metadata_updated:
            # Persist updates such as marking invalid entries as deleted
            try:
                self._write_recipe_file(recipe_path, recipe_data)
            except Exception as e:
                logger.error(f"Error writing updated recipe metadata: {e}")

        return recipe_data
    except Exception as e:
        logger.error(f"Error loading recipe file {recipe_path}: {e}")
        import traceback

        traceback.print_exc(file=sys.stderr)
        return None
@staticmethod
|
||
def _write_recipe_file(recipe_path: str, recipe_data: Dict[str, Any]) -> None:
|
||
"""Persist ``recipe_data`` back to ``recipe_path`` with standard formatting."""
|
||
|
||
with open(recipe_path, "w", encoding="utf-8") as file_obj:
|
||
json.dump(recipe_data, file_obj, indent=4, ensure_ascii=False)
|
||
|
||
async def _update_lora_information(self, recipe_data: Dict) -> bool:
    """Fill in missing ``hash`` / ``file_name`` for each LoRA of a recipe.

    Resolution order for a missing hash: the local LoRA cache first, then the
    Civitai API.  LoRAs that cannot be resolved remotely are flagged
    ``isDeleted`` so they are not looked up again.  Entries already marked
    deleted or already complete are skipped.  Mutates ``recipe_data`` in
    place.

    Returns:
        bool: True if metadata was updated
    """
    if not recipe_data.get("loras"):
        return False

    metadata_updated = False

    for lora in recipe_data["loras"]:
        # Skip deleted loras that were already marked
        if lora.get("isDeleted", False):
            continue

        # Skip if already has complete information
        if "hash" in lora and "file_name" in lora and lora["file_name"]:
            continue

        # If has modelVersionId but no hash, look in lora cache first, then fetch from Civitai
        if "modelVersionId" in lora and not lora.get("hash"):
            model_version_id = lora["modelVersionId"]
            # Check if model_version_id is an integer and > 0
            if isinstance(model_version_id, int) and model_version_id > 0:
                # Try to find in lora cache first
                hash_from_cache = await self._find_hash_in_lora_cache(
                    model_version_id
                )
                if hash_from_cache:
                    lora["hash"] = hash_from_cache
                    metadata_updated = True
                else:
                    # If not in cache, fetch from Civitai
                    # NOTE: a non-tuple result (no metadata provider) is
                    # deliberately ignored so the entry can be retried later.
                    result = await self._get_hash_from_civitai(model_version_id)
                    if isinstance(result, tuple):
                        hash_from_civitai, is_deleted = result
                        if hash_from_civitai:
                            lora["hash"] = hash_from_civitai
                            metadata_updated = True
                        elif is_deleted:
                            # Mark the lora as deleted if it was not found on Civitai
                            lora["isDeleted"] = True
                            logger.warning(
                                f"Marked lora with modelVersionId {model_version_id} as deleted"
                            )
                            metadata_updated = True
                        else:
                            # No hash returned; mark as deleted to avoid repeated lookups
                            lora["isDeleted"] = True
                            metadata_updated = True
                            logger.warning(
                                "Marked lora with modelVersionId %s as deleted after failed hash lookup",
                                model_version_id,
                            )

        # If has hash but no file_name, look up in lora library
        if "hash" in lora and (not lora.get("file_name") or not lora["file_name"]):
            hash_value = lora["hash"]

            if self._lora_scanner.has_hash(hash_value):
                lora_path = self._lora_scanner.get_path_by_hash(hash_value)
                if lora_path:
                    file_name = os.path.splitext(os.path.basename(lora_path))[0]
                    lora["file_name"] = file_name
                    metadata_updated = True
            else:
                # Lora not in library — record the miss with an empty name.
                lora["file_name"] = ""
                metadata_updated = True

    return metadata_updated
async def _find_hash_in_lora_cache(self, model_version_id: str) -> Optional[str]:
|
||
"""Find hash in lora cache based on modelVersionId"""
|
||
try:
|
||
# Get all loras from cache
|
||
if not self._lora_scanner:
|
||
return None
|
||
|
||
cache = await self._lora_scanner.get_cached_data()
|
||
if not cache or not cache.raw_data:
|
||
return None
|
||
|
||
# Find lora with matching civitai.id
|
||
for lora in cache.raw_data:
|
||
civitai_data = lora.get("civitai", {})
|
||
if civitai_data and str(civitai_data.get("id", "")) == str(
|
||
model_version_id
|
||
):
|
||
return lora.get("sha256")
|
||
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"Error finding hash in lora cache: {e}")
|
||
return None
|
||
|
||
async def _get_hash_from_civitai(
    self, model_version_id: str
) -> Optional[Tuple[Optional[str], bool]]:
    """Resolve a model version id to a SHA256 hash via the metadata provider.

    Returns:
        ``(sha256, is_deleted)`` — ``sha256`` is ``None`` when no hash could
        be obtained, and ``is_deleted`` is True only when the model was
        reported as not found on Civitai.  Returns a bare ``None`` (no tuple)
        when no metadata provider is available; callers distinguish this via
        an ``isinstance(result, tuple)`` check.
    """
    try:
        # Get metadata provider instead of civitai client directly
        metadata_provider = await get_default_metadata_provider()
        if not metadata_provider:
            logger.error("Failed to get metadata provider")
            # Bare None (not a tuple): signals "provider unavailable" so the
            # caller does NOT mark the lora as deleted.
            return None

        version_info, error_msg = await metadata_provider.get_model_version_info(
            model_version_id
        )

        if not version_info:
            if error_msg and "model not found" in error_msg.lower():
                logger.warning(
                    f"Model with version ID {model_version_id} was not found on Civitai - marking as deleted"
                )
                return None, True  # Return None hash and True for isDeleted flag
            else:
                logger.debug(
                    f"Could not get hash for modelVersionId {model_version_id}: {error_msg}"
                )
                return None, False  # Return None hash but not marked as deleted

        # Get hash from the first file
        for file_info in version_info.get("files", []):
            sha256_hash = (file_info.get("hashes") or {}).get("SHA256")
            if sha256_hash:
                return (
                    sha256_hash,
                    False,
                )  # Return hash with False for isDeleted flag

        logger.debug(
            f"No SHA256 hash found in version info for ID: {model_version_id}"
        )
        return None, False
    except Exception as e:
        logger.error(f"Error getting hash from Civitai: {e}")
        return None, False
def _get_lora_from_version_index(
|
||
self, model_version_id: Any
|
||
) -> Optional[Dict[str, Any]]:
|
||
"""Quickly fetch a cached LoRA entry by modelVersionId using the version index."""
|
||
|
||
if not self._lora_scanner:
|
||
return None
|
||
|
||
cache = getattr(self._lora_scanner, "_cache", None)
|
||
if cache is None:
|
||
return None
|
||
|
||
version_index = getattr(cache, "version_index", None)
|
||
if not version_index:
|
||
return None
|
||
|
||
try:
|
||
normalized_id = int(model_version_id)
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
return version_index.get(normalized_id)
|
||
|
||
def _get_checkpoint_from_version_index(
|
||
self, model_version_id: Any
|
||
) -> Optional[Dict[str, Any]]:
|
||
"""Fetch a cached checkpoint entry by version id."""
|
||
|
||
if not self._checkpoint_scanner:
|
||
return None
|
||
|
||
cache = getattr(self._checkpoint_scanner, "_cache", None)
|
||
if cache is None:
|
||
return None
|
||
|
||
version_index = getattr(cache, "version_index", None)
|
||
if not version_index:
|
||
return None
|
||
|
||
try:
|
||
normalized_id = int(model_version_id)
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
return version_index.get(normalized_id)
|
||
|
||
async def _determine_base_model(self, loras: List[Dict]) -> Optional[str]:
|
||
"""Determine the most common base model among LoRAs"""
|
||
base_models = {}
|
||
|
||
# Count occurrences of each base model
|
||
for lora in loras:
|
||
if "hash" in lora:
|
||
lora_path = self._lora_scanner.get_path_by_hash(lora["hash"])
|
||
if lora_path:
|
||
base_model = await self._get_base_model_for_lora(lora_path)
|
||
if base_model:
|
||
base_models[base_model] = base_models.get(base_model, 0) + 1
|
||
|
||
# Return the most common base model
|
||
if base_models:
|
||
return max(base_models.items(), key=lambda x: x[1])[0]
|
||
return None
|
||
|
||
async def _get_base_model_for_lora(self, lora_path: str) -> Optional[str]:
|
||
"""Get base model for a LoRA from cache"""
|
||
try:
|
||
if not self._lora_scanner:
|
||
return None
|
||
|
||
cache = await self._lora_scanner.get_cached_data()
|
||
if not cache or not cache.raw_data:
|
||
return None
|
||
|
||
# Find matching lora in cache
|
||
for lora in cache.raw_data:
|
||
if lora.get("file_path") == lora_path:
|
||
return lora.get("base_model")
|
||
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"Error getting base model for lora: {e}")
|
||
return None
|
||
|
||
def _normalize_checkpoint_entry(
|
||
self, checkpoint_raw: Any
|
||
) -> Optional[Dict[str, Any]]:
|
||
"""Coerce legacy or malformed checkpoint entries into a dict."""
|
||
|
||
if checkpoint_raw is None:
|
||
return None
|
||
|
||
if isinstance(checkpoint_raw, dict):
|
||
return dict(checkpoint_raw)
|
||
|
||
if isinstance(checkpoint_raw, (list, tuple)) and len(checkpoint_raw) == 1:
|
||
return self._normalize_checkpoint_entry(checkpoint_raw[0])
|
||
|
||
if isinstance(checkpoint_raw, str):
|
||
name = checkpoint_raw.strip()
|
||
if not name:
|
||
return None
|
||
|
||
file_name = os.path.splitext(os.path.basename(name))[0]
|
||
return {
|
||
"name": name,
|
||
"file_name": file_name,
|
||
}
|
||
|
||
return None
|
||
|
||
def _enrich_checkpoint_entry(self, checkpoint: Dict[str, Any]) -> Dict[str, Any]:
    """Populate convenience fields for a checkpoint entry.

    Mutates ``checkpoint`` in place (and returns it): fills ``inLibrary``,
    ``preview_url`` and ``localPath`` from the checkpoint scanner, using the
    hash when present and otherwise the Civitai version index.  Enrichment
    errors are logged at debug level and swallowed.
    """

    # Nothing to do without a dict payload or a scanner to consult.
    if (
        not checkpoint
        or not isinstance(checkpoint, dict)
        or not self._checkpoint_scanner
    ):
        return checkpoint

    hash_value = (checkpoint.get("hash") or "").lower()
    version_entry = None
    # No hash: fall back to resolving by Civitai model-version id.
    model_version_id = checkpoint.get("id") or checkpoint.get("modelVersionId")
    if not hash_value and model_version_id is not None:
        version_entry = self._get_checkpoint_from_version_index(model_version_id)

    try:
        # Normalize any preview URL the entry already carries.
        preview_url = checkpoint.get("preview_url") or checkpoint.get(
            "thumbnailUrl"
        )
        if preview_url:
            checkpoint["preview_url"] = self._normalize_preview_url(preview_url)

        if hash_value:
            # Hash lookup is authoritative for library membership.
            checkpoint["inLibrary"] = self._checkpoint_scanner.has_hash(hash_value)
            checkpoint["preview_url"] = self._normalize_preview_url(
                checkpoint.get("preview_url")
                or self._checkpoint_scanner.get_preview_url_by_hash(hash_value)
            )
            checkpoint["localPath"] = self._checkpoint_scanner.get_path_by_hash(
                hash_value
            )
        elif version_entry:
            # A version-index hit implies the checkpoint exists locally.
            checkpoint["inLibrary"] = True
            cached_path = version_entry.get("file_path") or version_entry.get(
                "path"
            )
            if cached_path:
                checkpoint.setdefault("localPath", cached_path)
                if not checkpoint.get("file_name"):
                    checkpoint["file_name"] = os.path.splitext(
                        os.path.basename(cached_path)
                    )[0]

            # Backfill the hash from the cache so later lookups are direct.
            if version_entry.get("sha256") and not checkpoint.get("hash"):
                checkpoint["hash"] = version_entry.get("sha256")

            preview_url = self._normalize_preview_url(
                version_entry.get("preview_url")
            )
            if preview_url:
                checkpoint.setdefault("preview_url", preview_url)

            if version_entry.get("model_type"):
                checkpoint.setdefault("model_type", version_entry.get("model_type"))
        else:
            # Unknown checkpoint: mark absent unless already set by a caller.
            checkpoint.setdefault("inLibrary", False)

        # Final pass keeps preview_url normalized whichever branch ran.
        if checkpoint.get("preview_url"):
            checkpoint["preview_url"] = self._normalize_preview_url(
                checkpoint["preview_url"]
            )
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.debug(
            "Error enriching checkpoint entry %s: %s",
            hash_value or model_version_id,
            exc,
        )

    return checkpoint
def _enrich_lora_entry(self, lora: Dict[str, Any]) -> Dict[str, Any]:
    """Populate convenience fields for a LoRA entry.

    Mutates ``lora`` in place (and returns it): fills ``inLibrary``,
    ``preview_url`` and ``localPath`` from the LoRA scanner, preferring the
    hash and falling back to the Civitai version index.  Errors are logged
    at debug level and swallowed.
    """

    if not lora or not self._lora_scanner:
        return lora

    hash_value = (lora.get("hash") or "").lower()
    version_entry = None
    # No hash: try resolving via the Civitai model-version id instead.
    if not hash_value and lora.get("modelVersionId") is not None:
        version_entry = self._get_lora_from_version_index(
            lora.get("modelVersionId")
        )

    try:
        if hash_value:
            # Hash lookup is authoritative for library membership.
            lora["inLibrary"] = self._lora_scanner.has_hash(hash_value)
            lora["preview_url"] = self._normalize_preview_url(
                self._lora_scanner.get_preview_url_by_hash(hash_value)
            )
            lora["localPath"] = self._lora_scanner.get_path_by_hash(hash_value)
        elif version_entry:
            # A version-index hit implies the LoRA exists locally.
            lora["inLibrary"] = True
            cached_path = version_entry.get("file_path") or version_entry.get(
                "path"
            )
            if cached_path:
                lora.setdefault("localPath", cached_path)
                if not lora.get("file_name"):
                    lora["file_name"] = os.path.splitext(
                        os.path.basename(cached_path)
                    )[0]

            # Backfill the hash from the cache so later lookups are direct.
            if version_entry.get("sha256") and not lora.get("hash"):
                lora["hash"] = version_entry.get("sha256")

            preview_url = self._normalize_preview_url(
                version_entry.get("preview_url")
            )
            if preview_url:
                lora.setdefault("preview_url", preview_url)
        else:
            lora.setdefault("inLibrary", False)

        # Keep preview_url normalized whichever branch ran.
        if lora.get("preview_url"):
            lora["preview_url"] = self._normalize_preview_url(lora["preview_url"])
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.debug("Error enriching lora entry %s: %s", hash_value, exc)

    return lora
def _normalize_preview_url(self, preview_url: Optional[str]) -> Optional[str]:
|
||
"""Return a preview URL that is reachable from the browser."""
|
||
|
||
if not preview_url or not isinstance(preview_url, str):
|
||
return preview_url
|
||
|
||
normalized = preview_url.strip()
|
||
if normalized.startswith("/api/lm/previews?path="):
|
||
return normalized
|
||
|
||
if os.path.isabs(normalized):
|
||
return config.get_preview_static_url(normalized)
|
||
|
||
return normalized
|
||
|
||
async def get_local_lora(self, name: str) -> Optional[Dict[str, Any]]:
    """Resolve a locally installed LoRA by name via the LoRA scanner."""

    scanner = self._lora_scanner
    if not scanner or not name:
        return None

    return await scanner.get_model_info_by_name(name)
async def get_paginated_data(
|
||
self,
|
||
page: int,
|
||
page_size: int,
|
||
sort_by: str = "date",
|
||
search: str = None,
|
||
filters: dict = None,
|
||
search_options: dict = None,
|
||
lora_hash: str = None,
|
||
checkpoint_hash: str = None,
|
||
bypass_filters: bool = True,
|
||
folder: str | None = None,
|
||
recursive: bool = True,
|
||
):
|
||
"""Get paginated and filtered recipe data
|
||
|
||
Args:
|
||
page: Current page number (1-based)
|
||
page_size: Number of items per page
|
||
sort_by: Sort method ('name' or 'date')
|
||
search: Search term
|
||
filters: Dictionary of filters to apply
|
||
search_options: Dictionary of search options to apply
|
||
lora_hash: Optional SHA256 hash of a LoRA to filter recipes by
|
||
checkpoint_hash: Optional SHA256 hash of a checkpoint to filter recipes by
|
||
bypass_filters: If True, ignore other filters when a hash filter is provided
|
||
folder: Optional folder filter relative to recipes directory
|
||
recursive: Whether to include recipes in subfolders of the selected folder
|
||
"""
|
||
cache = await self.get_cached_data()
|
||
|
||
# Get base dataset
|
||
sort_field = sort_by.split(":")[0] if ":" in sort_by else sort_by
|
||
|
||
if sort_field == "date":
|
||
filtered_data = list(cache.sorted_by_date)
|
||
elif sort_field == "name":
|
||
filtered_data = list(cache.sorted_by_name)
|
||
else:
|
||
filtered_data = list(cache.raw_data)
|
||
|
||
# Apply SFW filtering if enabled
|
||
from .settings_manager import get_settings_manager
|
||
|
||
settings = get_settings_manager()
|
||
if settings.get("show_only_sfw", False):
|
||
from ..utils.constants import NSFW_LEVELS
|
||
|
||
threshold = NSFW_LEVELS.get("R", 4) # Default to R level (4) if not found
|
||
filtered_data = [
|
||
item
|
||
for item in filtered_data
|
||
if not item.get("preview_nsfw_level")
|
||
or item.get("preview_nsfw_level") < threshold
|
||
]
|
||
|
||
# Special case: Filter by LoRA hash (takes precedence if bypass_filters is True)
|
||
if lora_hash:
|
||
# Filter recipes that contain this LoRA hash
|
||
filtered_data = [
|
||
item
|
||
for item in filtered_data
|
||
if "loras" in item
|
||
and any(
|
||
lora.get("hash", "").lower() == lora_hash.lower()
|
||
for lora in item["loras"]
|
||
)
|
||
]
|
||
|
||
if bypass_filters:
|
||
# Skip other filters if bypass_filters is True
|
||
pass
|
||
# Otherwise continue with normal filtering after applying LoRA hash filter
|
||
elif checkpoint_hash:
|
||
normalized_checkpoint_hash = checkpoint_hash.lower()
|
||
filtered_data = [
|
||
item
|
||
for item in filtered_data
|
||
if isinstance(item.get("checkpoint"), dict)
|
||
and (item["checkpoint"].get("hash", "") or "").lower()
|
||
== normalized_checkpoint_hash
|
||
]
|
||
|
||
if bypass_filters:
|
||
pass
|
||
|
||
has_hash_filter = bool(lora_hash or checkpoint_hash)
|
||
|
||
# Skip further filtering if we're only filtering by model hash with bypass enabled
|
||
if not (has_hash_filter and bypass_filters):
|
||
# Apply folder filter before other criteria
|
||
if folder is not None:
|
||
normalized_folder = folder.strip("/")
|
||
|
||
def matches_folder(item_folder: str) -> bool:
|
||
item_path = (item_folder or "").strip("/")
|
||
if recursive:
|
||
if not normalized_folder:
|
||
return True
|
||
return item_path == normalized_folder or item_path.startswith(
|
||
f"{normalized_folder}/"
|
||
)
|
||
return item_path == normalized_folder
|
||
|
||
filtered_data = [
|
||
item
|
||
for item in filtered_data
|
||
if matches_folder(item.get("folder", ""))
|
||
]
|
||
|
||
# Apply search filter
|
||
if search:
|
||
# Default search options if none provided
|
||
if not search_options:
|
||
search_options = {
|
||
"title": True,
|
||
"tags": True,
|
||
"lora_name": True,
|
||
"lora_model": True,
|
||
}
|
||
|
||
# Try FTS search first if available (much faster)
|
||
fts_matching_ids = self._search_with_fts(search, search_options)
|
||
if fts_matching_ids is not None:
|
||
# FTS search succeeded, filter by matching IDs
|
||
filtered_data = [
|
||
item
|
||
for item in filtered_data
|
||
if str(item.get("id", "")) in fts_matching_ids
|
||
]
|
||
else:
|
||
# Fallback to fuzzy_match (slower but always available)
|
||
# Build the search predicate based on search options
|
||
def matches_search(item):
|
||
# Search in title if enabled
|
||
if search_options.get("title", True):
|
||
if fuzzy_match(str(item.get("title", "")), search):
|
||
return True
|
||
|
||
# Search in tags if enabled
|
||
if search_options.get("tags", True) and "tags" in item:
|
||
for tag in item["tags"]:
|
||
if fuzzy_match(tag, search):
|
||
return True
|
||
|
||
# Search in lora file names if enabled
|
||
if search_options.get("lora_name", True) and "loras" in item:
|
||
for lora in item["loras"]:
|
||
if fuzzy_match(str(lora.get("file_name", "")), search):
|
||
return True
|
||
|
||
# Search in lora model names if enabled
|
||
if search_options.get("lora_model", True) and "loras" in item:
|
||
for lora in item["loras"]:
|
||
if fuzzy_match(str(lora.get("modelName", "")), search):
|
||
return True
|
||
|
||
# Search in prompt and negative_prompt if enabled
|
||
if search_options.get("prompt", True) and "gen_params" in item:
|
||
gen_params = item["gen_params"]
|
||
if fuzzy_match(str(gen_params.get("prompt", "")), search):
|
||
return True
|
||
if fuzzy_match(
|
||
str(gen_params.get("negative_prompt", "")), search
|
||
):
|
||
return True
|
||
|
||
# No match found
|
||
return False
|
||
|
||
# Filter the data using the search predicate
|
||
filtered_data = [
|
||
item for item in filtered_data if matches_search(item)
|
||
]
|
||
|
||
# Apply additional filters
|
||
if filters:
|
||
# Filter by base model
|
||
if "base_model" in filters and filters["base_model"]:
|
||
filtered_data = [
|
||
item
|
||
for item in filtered_data
|
||
if item.get("base_model", "") in filters["base_model"]
|
||
]
|
||
|
||
# Filter by favorite
|
||
if "favorite" in filters and filters["favorite"]:
|
||
filtered_data = [
|
||
item for item in filtered_data if item.get("favorite") is True
|
||
]
|
||
|
||
# Filter by tags
|
||
if "tags" in filters and filters["tags"]:
|
||
tag_spec = filters["tags"]
|
||
include_tags = set()
|
||
exclude_tags = set()
|
||
|
||
if isinstance(tag_spec, dict):
|
||
for tag, state in tag_spec.items():
|
||
if not tag:
|
||
continue
|
||
if state == "exclude":
|
||
exclude_tags.add(tag)
|
||
else:
|
||
include_tags.add(tag)
|
||
else:
|
||
include_tags = {tag for tag in tag_spec if tag}
|
||
|
||
if include_tags:
|
||
|
||
def matches_include(item_tags):
|
||
if not item_tags and "__no_tags__" in include_tags:
|
||
return True
|
||
return any(tag in include_tags for tag in (item_tags or []))
|
||
|
||
filtered_data = [
|
||
item
|
||
for item in filtered_data
|
||
if matches_include(item.get("tags"))
|
||
]
|
||
|
||
if exclude_tags:
|
||
|
||
def matches_exclude(item_tags):
|
||
if not item_tags and "__no_tags__" in exclude_tags:
|
||
return True
|
||
return any(tag in exclude_tags for tag in (item_tags or []))
|
||
|
||
filtered_data = [
|
||
item
|
||
for item in filtered_data
|
||
if not matches_exclude(item.get("tags"))
|
||
]
|
||
|
||
# Apply sorting if not already handled by pre-sorted cache
|
||
if ":" in sort_by or sort_field == "loras_count":
|
||
field, order = (sort_by.split(":") + ["desc"])[:2]
|
||
reverse = order.lower() == "desc"
|
||
|
||
if field == "name":
|
||
filtered_data = natsorted(
|
||
filtered_data,
|
||
key=lambda x: x.get("title", "").lower(),
|
||
reverse=reverse,
|
||
)
|
||
elif field == "date":
|
||
# Use modified if available, falling back to created_date
|
||
filtered_data.sort(
|
||
key=lambda x: (
|
||
x.get("modified", x.get("created_date", 0)),
|
||
x.get("file_path", ""),
|
||
),
|
||
reverse=reverse,
|
||
)
|
||
elif field == "loras_count":
|
||
filtered_data.sort(
|
||
key=lambda x: len(x.get("loras", [])), reverse=reverse
|
||
)
|
||
|
||
# Calculate pagination
|
||
total_items = len(filtered_data)
|
||
start_idx = (page - 1) * page_size
|
||
end_idx = min(start_idx + page_size, total_items)
|
||
|
||
# Get paginated items
|
||
paginated_items = filtered_data[start_idx:end_idx]
|
||
|
||
# Add inLibrary information and URLs for each recipe
|
||
for item in paginated_items:
|
||
# Format file path to URL
|
||
if "file_path" in item:
|
||
item["file_url"] = self._format_file_url(item["file_path"])
|
||
|
||
# Format dates for display
|
||
for date_field in ["created_date", "modified"]:
|
||
if date_field in item:
|
||
item[f"{date_field}_formatted"] = self._format_timestamp(
|
||
item[date_field]
|
||
)
|
||
|
||
if "loras" in item:
|
||
item["loras"] = [
|
||
self._enrich_lora_entry(dict(lora)) for lora in item["loras"]
|
||
]
|
||
if item.get("checkpoint"):
|
||
checkpoint_entry = self._normalize_checkpoint_entry(item["checkpoint"])
|
||
if checkpoint_entry:
|
||
item["checkpoint"] = self._enrich_checkpoint_entry(checkpoint_entry)
|
||
else:
|
||
item.pop("checkpoint", None)
|
||
|
||
result = {
|
||
"items": paginated_items,
|
||
"total": total_items,
|
||
"page": page,
|
||
"page_size": page_size,
|
||
"total_pages": (total_items + page_size - 1) // page_size,
|
||
}
|
||
|
||
return result
|
||
|
||
async def get_recipe_by_id(self, recipe_id: str) -> Optional[dict]:
    """Get a single recipe by ID with all metadata and formatted URLs.

    Args:
        recipe_id: The ID of the recipe to retrieve.

    Returns:
        Dict containing the recipe data, or None if the ID is empty or no
        matching recipe exists in the cache. (Annotation fixed to
        Optional[dict] to match this documented None return.)
    """
    if not recipe_id:
        return None

    # Get all recipes from cache
    cache = await self.get_cached_data()

    # Find the recipe with the specified ID (IDs are compared as strings)
    recipe = next(
        (r for r in cache.raw_data if str(r.get("id", "")) == recipe_id), None
    )

    if not recipe:
        return None

    # Work on a shallow copy so the cached entry is not mutated
    formatted_recipe = {**recipe}

    # Format file path to URL for serving in the web UI
    if "file_path" in formatted_recipe:
        formatted_recipe["file_url"] = self._format_file_url(
            formatted_recipe["file_path"]
        )

    # Format dates for display
    for date_field in ["created_date", "modified"]:
        if date_field in formatted_recipe:
            formatted_recipe[f"{date_field}_formatted"] = self._format_timestamp(
                formatted_recipe[date_field]
            )

    # Enrich each LoRA entry with local library metadata (copies, not originals)
    if "loras" in formatted_recipe:
        formatted_recipe["loras"] = [
            self._enrich_lora_entry(dict(lora))
            for lora in formatted_recipe["loras"]
        ]

    # Normalize and enrich the checkpoint entry; drop it when normalization fails
    if formatted_recipe.get("checkpoint"):
        checkpoint_entry = self._normalize_checkpoint_entry(
            formatted_recipe["checkpoint"]
        )
        if checkpoint_entry:
            formatted_recipe["checkpoint"] = self._enrich_checkpoint_entry(
                checkpoint_entry
            )
        else:
            formatted_recipe.pop("checkpoint", None)

    return formatted_recipe
|
||
|
||
def _format_file_url(self, file_path: str) -> str:
|
||
"""Format file path as URL for serving in web UI"""
|
||
if not file_path:
|
||
return "/loras_static/images/no-preview.png"
|
||
|
||
try:
|
||
normalized_path = os.path.normpath(file_path)
|
||
static_url = config.get_preview_static_url(normalized_path)
|
||
if static_url:
|
||
return static_url
|
||
except Exception as e:
|
||
logger.error(f"Error formatting file URL: {e}")
|
||
return "/loras_static/images/no-preview.png"
|
||
|
||
return "/loras_static/images/no-preview.png"
|
||
|
||
def _format_timestamp(self, timestamp: float) -> str:
|
||
"""Format timestamp for display"""
|
||
from datetime import datetime
|
||
|
||
return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
async def get_recipe_json_path(self, recipe_id: str) -> Optional[str]:
    """Locate the recipe JSON file, accounting for folder placement."""

    base_dir = self.recipes_dir
    if not base_dir:
        return None

    file_name = f"{recipe_id}.recipe.json"

    # Prefer the folder recorded for this recipe in the cache.
    cache = await self.get_cached_data()
    folder = next(
        (
            entry.get("folder") or ""
            for entry in cache.raw_data
            if str(entry.get("id")) == str(recipe_id)
        ),
        "",
    )

    expected = os.path.normpath(os.path.join(base_dir, folder, file_name))
    if os.path.exists(expected):
        return expected

    # Fall back to a full directory walk in case the cached folder is stale.
    for root, _, files in os.walk(base_dir):
        if file_name in files:
            return os.path.join(root, file_name)

    return None
|
||
|
||
async def update_recipe_metadata(self, recipe_id: str, metadata: dict) -> bool:
    """Update recipe metadata (like title and tags) in both file system and cache

    Args:
        recipe_id: The ID of the recipe to update
        metadata: Dictionary containing metadata fields to update (title, tags, etc.)

    Returns:
        bool: True if successful, False otherwise
    """
    # First, find the recipe JSON file path
    recipe_json_path = await self.get_recipe_json_path(recipe_id)
    if not recipe_json_path or not os.path.exists(recipe_json_path):
        return False

    try:
        # Load existing recipe data
        with open(recipe_json_path, "r", encoding="utf-8") as f:
            recipe_data = json.load(f)

        # Merge the supplied fields over the existing data
        for key, value in metadata.items():
            recipe_data[key] = value

        # Save updated recipe
        with open(recipe_json_path, "w", encoding="utf-8") as f:
            json.dump(recipe_data, f, indent=4, ensure_ascii=False)

        # Update the in-memory cache if it exists; re-sorting is deferred
        if self._cache is not None:
            await self._cache.update_recipe_metadata(
                recipe_id, metadata, resort=False
            )
            self._schedule_resort()

        # Update FTS index
        self._update_fts_index_for_recipe(recipe_data, "update")

        # Update persistent SQLite cache
        if self._persistent_cache:
            self._persistent_cache.update_recipe(recipe_data, recipe_json_path)
            self._json_path_map[recipe_id] = recipe_json_path

        # If the recipe has an image, update its EXIF metadata
        from ..utils.exif_utils import ExifUtils

        image_path = recipe_data.get("file_path")
        if image_path and os.path.exists(image_path):
            ExifUtils.append_recipe_metadata(image_path, recipe_data)

        return True
    except Exception as e:
        # Fix: use the module-level logger instead of re-importing logging
        # and building a fresh logger inside the handler.
        logger.error(f"Error updating recipe metadata: {e}", exc_info=True)
        return False
|
||
|
||
async def update_lora_entry(
    self,
    recipe_id: str,
    lora_index: int,
    *,
    target_name: str,
    target_lora: Optional[Dict[str, Any]] = None,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Update a specific LoRA entry within a recipe.

    Args:
        recipe_id: ID of the recipe containing the entry.
        lora_index: Zero-based position of the LoRA entry to update.
        target_name: New ``file_name`` for the entry (required).
        target_lora: Optional local LoRA metadata used to refresh the
            entry's hash and Civitai fields.

    Returns:
        The updated recipe data and the refreshed LoRA metadata.

    Raises:
        ValueError: If ``target_name`` is not provided.
        RecipeNotFoundError: If the recipe file cannot be located or the
            LoRA index is out of range.
    """

    if target_name is None:
        raise ValueError("target_name must be provided")

    recipe_json_path = await self.get_recipe_json_path(recipe_id)
    if not recipe_json_path or not os.path.exists(recipe_json_path):
        raise RecipeNotFoundError("Recipe not found")

    async with self._mutation_lock:
        with open(recipe_json_path, "r", encoding="utf-8") as file_obj:
            recipe_data = json.load(file_obj)

        loras = recipe_data.get("loras", [])
        # Fix: reject negative indices too — Python would otherwise index
        # from the end of the list and silently update the wrong entry.
        if not 0 <= lora_index < len(loras):
            raise RecipeNotFoundError("LoRA index out of range in recipe")

        lora_entry = loras[lora_index]
        lora_entry["isDeleted"] = False
        lora_entry["exclude"] = False
        lora_entry["file_name"] = target_name

        if target_lora is not None:
            # Prefer the full sha256 hash; fall back to a "sha" field.
            sha_value = target_lora.get("sha256") or target_lora.get("sha")
            if sha_value:
                lora_entry["hash"] = sha_value.lower()

            civitai_info = target_lora.get("civitai") or {}
            if civitai_info:
                lora_entry["modelName"] = civitai_info.get("model", {}).get(
                    "name", ""
                )
                lora_entry["modelVersionName"] = civitai_info.get("name", "")
                lora_entry["modelVersionId"] = civitai_info.get("id")

        # The fingerprint is derived from the LoRA set, so recompute it.
        recipe_data["fingerprint"] = calculate_recipe_fingerprint(
            recipe_data.get("loras", [])
        )
        recipe_data["modified"] = time.time()

        with open(recipe_json_path, "w", encoding="utf-8") as file_obj:
            json.dump(recipe_data, file_obj, indent=4, ensure_ascii=False)

        cache = await self.get_cached_data()
        replaced = await cache.replace_recipe(recipe_id, recipe_data, resort=False)
        if not replaced:
            await cache.add_recipe(recipe_data, resort=False)
        self._schedule_resort()

        # Update FTS index
        self._update_fts_index_for_recipe(recipe_data, "update")

        # Update persistent SQLite cache
        if self._persistent_cache:
            self._persistent_cache.update_recipe(recipe_data, recipe_json_path)
            self._json_path_map[recipe_id] = recipe_json_path

    updated_lora = dict(lora_entry)
    if target_lora is not None:
        preview_url = target_lora.get("preview_url")
        if preview_url:
            updated_lora["preview_url"] = config.get_preview_static_url(preview_url)
        if target_lora.get("file_path"):
            updated_lora["localPath"] = target_lora["file_path"]

    updated_lora = self._enrich_lora_entry(updated_lora)
    return recipe_data, updated_lora
|
||
|
||
async def get_recipes_for_lora(self, lora_hash: str) -> List[Dict[str, Any]]:
    """Return recipes that reference a given LoRA hash."""

    if not lora_hash:
        return []

    wanted = lora_hash.lower()
    cache = await self.get_cached_data()

    results: List[Dict[str, Any]] = []
    for recipe in cache.raw_data:
        lora_entries = recipe.get("loras", [])
        # Case-insensitive hash comparison against every LoRA in the recipe.
        found = any(
            (entry.get("hash") or "").lower() == wanted for entry in lora_entries
        )
        if not found:
            continue

        # Shallow-copy so cached entries are never mutated.
        enriched = {**recipe}
        enriched["loras"] = [
            self._enrich_lora_entry(dict(entry)) for entry in lora_entries
        ]
        enriched["file_url"] = self._format_file_url(recipe.get("file_path"))
        results.append(enriched)

    return results
|
||
|
||
async def get_recipes_for_checkpoint(
    self, checkpoint_hash: str
) -> List[Dict[str, Any]]:
    """Return recipes that reference a given checkpoint hash."""

    if not checkpoint_hash:
        return []

    wanted = checkpoint_hash.lower()
    cache = await self.get_cached_data()

    results: List[Dict[str, Any]] = []
    for recipe in cache.raw_data:
        # Skip recipes whose checkpoint entry cannot be normalized.
        normalized = self._normalize_checkpoint_entry(recipe.get("checkpoint"))
        if not normalized:
            continue

        # Enrich first: the comparable hash may only exist after enrichment.
        enriched_checkpoint = self._enrich_checkpoint_entry(dict(normalized))
        if (enriched_checkpoint.get("hash") or "").lower() != wanted:
            continue

        # Shallow-copy so cached entries are never mutated.
        enriched = {**recipe}
        enriched["checkpoint"] = enriched_checkpoint
        enriched["loras"] = [
            self._enrich_lora_entry(dict(entry))
            for entry in recipe.get("loras", [])
        ]
        enriched["file_url"] = self._format_file_url(recipe.get("file_path"))
        results.append(enriched)

    return results
|
||
|
||
async def get_recipe_syntax_tokens(self, recipe_id: str) -> List[str]:
    """Build LoRA syntax tokens for a recipe."""

    cache = await self.get_cached_data()
    recipe = await cache.get_recipe(recipe_id)
    if recipe is None:
        raise RecipeNotFoundError("Recipe not found")

    lora_entries = recipe.get("loras", [])
    if not lora_entries:
        return []

    scanner = self._lora_scanner
    lora_cache = await scanner.get_cached_data() if scanner is not None else None

    tokens: List[str] = []
    for entry in lora_entries:
        if entry.get("isDeleted", False):
            continue

        resolved_name = None

        # 1) Resolve the local file via the scanner's hash index.
        entry_hash = (entry.get("hash") or "").lower()
        if entry_hash and scanner is not None and hasattr(scanner, "_hash_index"):
            local_path = scanner._hash_index.get_path(entry_hash)
            if local_path:
                resolved_name = os.path.splitext(os.path.basename(local_path))[0]

        # 2) Fall back to matching the Civitai model version id in the cache.
        version_id = entry.get("modelVersionId")
        if not resolved_name and version_id and lora_cache is not None:
            for cached in getattr(lora_cache, "raw_data", []):
                civitai_info = cached.get("civitai")
                if civitai_info and civitai_info.get("id") == version_id:
                    cached_path = cached.get("path") or cached.get("file_path")
                    if cached_path:
                        resolved_name = os.path.splitext(
                            os.path.basename(cached_path)
                        )[0]
                    # Stop at the first version-id match regardless of path.
                    break

        # 3) Last resort: the file name stored on the recipe entry itself.
        if not resolved_name:
            resolved_name = entry.get("file_name", "unknown-lora")

        strength = entry.get("strength", 1.0)
        tokens.append(f"<lora:{resolved_name}:{strength}>")

    return tokens
|
||
|
||
async def update_lora_filename_by_hash(
    self, hash_value: str, new_file_name: str
) -> Tuple[int, int]:
    """Update file_name in all recipes that contain a LoRA with the specified hash.

    Args:
        hash_value: The SHA256 hash value of the LoRA
        new_file_name: The new file_name to set

    Returns:
        Tuple[int, int]: (number of recipes updated in files, number of recipes updated in cache)
    """
    if not hash_value or not new_file_name:
        return 0, 0

    # Always use lowercase hash for consistency
    hash_value = hash_value.lower()

    # Get cache
    cache = await self.get_cached_data()
    if not cache or not cache.raw_data:
        return 0, 0

    file_updated_count = 0
    cache_updated_count = 0

    # Find recipes that need updating; cached entries are mutated in place,
    # so the in-memory cache is updated as part of this scan.
    recipes_to_update = []
    for recipe in cache.raw_data:
        loras = recipe.get("loras", [])
        if not isinstance(loras, list):
            continue

        has_match = False
        for lora in loras:
            if not isinstance(lora, dict):
                continue
            if (lora.get("hash") or "").lower() == hash_value:
                if lora.get("file_name") != new_file_name:
                    lora["file_name"] = new_file_name
                    has_match = True

        if has_match:
            recipes_to_update.append(recipe)
            cache_updated_count += 1

    if not recipes_to_update:
        return 0, 0

    # Persist changes to disk and SQLite cache
    async with self._mutation_lock:
        for recipe in recipes_to_update:
            recipe_id = str(recipe.get("id", ""))
            if not recipe_id:
                continue

            # Fix: consult the recorded JSON location first so recipes stored
            # in subfolders are rewritten in place instead of a duplicate file
            # being created at the recipes root.
            recipe_path = self._json_path_map.get(recipe_id) or os.path.join(
                self.recipes_dir, f"{recipe_id}.recipe.json"
            )
            try:
                self._write_recipe_file(recipe_path, recipe)
                file_updated_count += 1
                logger.info(
                    f"Updated file_name in recipe {recipe_path}: -> {new_file_name}"
                )

                # Update persistent SQLite cache
                if self._persistent_cache:
                    self._persistent_cache.update_recipe(recipe, recipe_path)
                    self._json_path_map[recipe_id] = recipe_path
            except Exception as e:
                logger.error(f"Error updating recipe file {recipe_path}: {e}")

        # LoRA file_name is not a sort key, but searching by LoRA name depends
        # on cache state, so schedule a background resort to be safe.
        self._schedule_resort()

    return file_updated_count, cache_updated_count
|
||
|
||
async def find_recipes_by_fingerprint(self, fingerprint: str) -> list:
    """Find recipes with a matching fingerprint

    Args:
        fingerprint: The recipe fingerprint to search for

    Returns:
        List of recipe details that match the fingerprint
    """
    if not fingerprint:
        return []

    cache = await self.get_cached_data()

    # Summarize every cached recipe whose fingerprint matches exactly.
    return [
        {
            "id": recipe.get("id"),
            "title": recipe.get("title"),
            "file_url": self._format_file_url(recipe.get("file_path")),
            "modified": recipe.get("modified"),
            "created_date": recipe.get("created_date"),
            "lora_count": len(recipe.get("loras", [])),
        }
        for recipe in cache.raw_data
        if recipe.get("fingerprint") == fingerprint
    ]
|
||
|
||
async def find_all_duplicate_recipes(self) -> dict:
    """Find all recipe duplicates based on fingerprints

    Returns:
        Dictionary where keys are fingerprints and values are lists of recipe IDs
    """
    cache = await self.get_cached_data()

    # Bucket recipe IDs by fingerprint, skipping recipes without one.
    buckets = {}
    for recipe in cache.raw_data:
        fingerprint = recipe.get("fingerprint")
        if fingerprint:
            buckets.setdefault(fingerprint, []).append(recipe.get("id"))

    # Only fingerprints shared by two or more recipes count as duplicates.
    return {
        fingerprint: ids for fingerprint, ids in buckets.items() if len(ids) > 1
    }
|
||
|
||
async def find_duplicate_recipes_by_source(self) -> dict:
    """Find all recipe duplicates based on source_path (Civitai image URLs)

    Returns:
        Dictionary where keys are source URLs and values are lists of recipe IDs
    """
    cache = await self.get_cached_data()

    url_groups = {}
    for recipe in cache.raw_data:
        # Fix: guard against an explicit None value — dict.get's default is
        # only used when the key is missing, so `.strip()` would otherwise
        # raise AttributeError for recipes with "source_path": null.
        source_url = (recipe.get("source_path") or "").strip()
        if not source_url:
            continue

        url_groups.setdefault(source_url, []).append(recipe.get("id"))

    # Keep only URLs referenced by more than one recipe.
    return {url: ids for url, ids in url_groups.items() if len(ids) > 1}
|