mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-21 21:22:11 -03:00
672 lines
30 KiB
Python
672 lines
30 KiB
Python
import os
|
|
import logging
|
|
import asyncio
|
|
import json
|
|
import re
|
|
from typing import List, Dict, Optional, Any
|
|
from datetime import datetime
|
|
from ..utils.exif_utils import ExifUtils
|
|
from ..config import config
|
|
from .recipe_cache import RecipeCache
|
|
from .lora_scanner import LoraScanner
|
|
from .civitai_client import CivitaiClient
|
|
import sys
|
|
|
|
print("Recipe Scanner module loaded", file=sys.stderr)
|
|
|
|
def setup_logger():
|
|
"""Configure logger for recipe scanner"""
|
|
# First, print directly to stderr
|
|
print("Setting up recipe scanner logger", file=sys.stderr)
|
|
|
|
# Create a stderr handler
|
|
handler = logging.StreamHandler(sys.stderr)
|
|
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
handler.setFormatter(formatter)
|
|
|
|
# Configure recipe logger
|
|
recipe_logger = logging.getLogger(__name__)
|
|
recipe_logger.setLevel(logging.INFO)
|
|
|
|
# Remove existing handlers if any
|
|
for h in recipe_logger.handlers:
|
|
recipe_logger.removeHandler(h)
|
|
|
|
recipe_logger.addHandler(handler)
|
|
recipe_logger.propagate = False
|
|
|
|
# Also ensure the root logger has a handler
|
|
root_logger = logging.getLogger()
|
|
root_logger.setLevel(logging.INFO)
|
|
|
|
# Check if the root logger already has handlers
|
|
if not root_logger.handlers:
|
|
root_logger.addHandler(handler)
|
|
|
|
print(f"Logger setup complete: {__name__}", file=sys.stderr)
|
|
return recipe_logger
|
|
|
|
# Use our configured logger
|
|
logger = setup_logger()
|
|
|
|
class RecipeScanner:
|
|
"""Service for scanning and managing recipe images"""
|
|
|
|
_instance = None
|
|
_lock = asyncio.Lock()
|
|
|
|
def __new__(cls, lora_scanner: Optional[LoraScanner] = None):
|
|
if cls._instance is None:
|
|
cls._instance = super().__new__(cls)
|
|
cls._instance._lora_scanner = lora_scanner
|
|
cls._instance._civitai_client = CivitaiClient()
|
|
return cls._instance
|
|
|
|
def __init__(self, lora_scanner: Optional[LoraScanner] = None):
|
|
# Ensure initialization only happens once
|
|
if not hasattr(self, '_initialized'):
|
|
self._cache: Optional[RecipeCache] = None
|
|
self._initialization_lock = asyncio.Lock()
|
|
self._initialization_task: Optional[asyncio.Task] = None
|
|
if lora_scanner:
|
|
self._lora_scanner = lora_scanner
|
|
self._initialized = True
|
|
|
|
@property
|
|
def recipes_dir(self) -> str:
|
|
"""Get path to recipes directory"""
|
|
if not config.loras_roots:
|
|
return ""
|
|
|
|
# config.loras_roots already sorted case-insensitively, use the first one
|
|
recipes_dir = os.path.join(config.loras_roots[0], "recipes")
|
|
os.makedirs(recipes_dir, exist_ok=True)
|
|
logger.info(f"Using recipes directory: {recipes_dir}")
|
|
|
|
return recipes_dir
|
|
|
|
async def get_cached_data(self, force_refresh: bool = False) -> RecipeCache:
|
|
"""Get cached recipe data, refresh if needed"""
|
|
async with self._initialization_lock:
|
|
|
|
# If cache is unitialized but needs to respond to request, return empty cache
|
|
if self._cache is None and not force_refresh:
|
|
return RecipeCache(
|
|
raw_data=[],
|
|
sorted_by_name=[],
|
|
sorted_by_date=[]
|
|
)
|
|
|
|
# If initializing, wait for completion
|
|
if self._initialization_task and not self._initialization_task.done():
|
|
try:
|
|
await self._initialization_task
|
|
except Exception as e:
|
|
logger.error(f"Recipe cache initialization failed: {e}")
|
|
self._initialization_task = None
|
|
|
|
if (self._cache is None or force_refresh):
|
|
|
|
# Create new initialization task
|
|
if not self._initialization_task or self._initialization_task.done():
|
|
# First ensure the lora scanner is initialized
|
|
if self._lora_scanner:
|
|
await self._lora_scanner.get_cached_data()
|
|
|
|
self._initialization_task = asyncio.create_task(self._initialize_cache())
|
|
|
|
try:
|
|
await self._initialization_task
|
|
except Exception as e:
|
|
logger.error(f"Recipe cache initialization failed: {e}")
|
|
# If cache already exists, continue using old cache
|
|
if self._cache is None:
|
|
raise # If no cache, raise exception
|
|
|
|
return self._cache
|
|
|
|
async def _initialize_cache(self) -> None:
|
|
"""Initialize or refresh the cache"""
|
|
try:
|
|
# Ensure lora scanner is fully initialized first
|
|
if self._lora_scanner:
|
|
logger.info("Recipe Manager: Waiting for lora scanner initialization to complete")
|
|
|
|
# Force a fresh initialization of the lora scanner to ensure it's complete
|
|
lora_cache = await self._lora_scanner.get_cached_data(force_refresh=True)
|
|
|
|
# Add a delay to ensure any background tasks complete
|
|
await asyncio.sleep(2)
|
|
|
|
# Get the cache again to ensure we have the latest data
|
|
lora_cache = await self._lora_scanner.get_cached_data()
|
|
logger.info(f"Recipe Manager: Lora scanner initialized with {len(lora_cache.raw_data)} loras")
|
|
|
|
# Verify hash index is built
|
|
if hasattr(self._lora_scanner, '_hash_index'):
|
|
hash_index_size = len(self._lora_scanner._hash_index._hash_to_path) if hasattr(self._lora_scanner._hash_index, '_hash_to_path') else 0
|
|
logger.info(f"Recipe Manager: Lora hash index contains {hash_index_size} entries")
|
|
|
|
# If hash index is empty but we have loras, consider this an error condition
|
|
if hash_index_size == 0 and len(lora_cache.raw_data) > 0:
|
|
logger.error("Recipe Manager: Lora hash index is empty despite having loras in cache")
|
|
await self._lora_scanner.diagnose_hash_index()
|
|
|
|
# Wait another moment for hash index to potentially initialize
|
|
await asyncio.sleep(1)
|
|
|
|
# Try to check again
|
|
hash_index_size = len(self._lora_scanner._hash_index._hash_to_path) if hasattr(self._lora_scanner._hash_index, '_hash_to_path') else 0
|
|
logger.info(f"Recipe Manager: Lora hash index now contains {hash_index_size} entries")
|
|
else:
|
|
logger.warning("Recipe Manager: No lora hash index available")
|
|
else:
|
|
logger.warning("Recipe Manager: No lora scanner available")
|
|
|
|
# Scan for recipe data
|
|
raw_data = await self.scan_all_recipes()
|
|
|
|
# Update cache
|
|
self._cache = RecipeCache(
|
|
raw_data=raw_data,
|
|
sorted_by_name=[],
|
|
sorted_by_date=[]
|
|
)
|
|
|
|
# Resort cache
|
|
await self._cache.resort()
|
|
|
|
self._initialization_task = None
|
|
logger.info("Recipe Manager: Cache initialization completed")
|
|
except Exception as e:
|
|
logger.error(f"Recipe Manager: Error initializing cache: {e}", exc_info=True)
|
|
self._cache = RecipeCache(
|
|
raw_data=[],
|
|
sorted_by_name=[],
|
|
sorted_by_date=[]
|
|
)
|
|
|
|
async def scan_all_recipes(self) -> List[Dict]:
|
|
"""Scan all recipe images and return metadata"""
|
|
recipes = []
|
|
recipes_dir = self.recipes_dir
|
|
|
|
if not recipes_dir or not os.path.exists(recipes_dir):
|
|
logger.warning(f"Recipes directory not found: {recipes_dir}")
|
|
return recipes
|
|
|
|
# Get all jpg/jpeg files in the recipes directory
|
|
image_files = []
|
|
logger.info(f"Scanning for recipe images in {recipes_dir}")
|
|
for root, _, files in os.walk(recipes_dir):
|
|
image_count = sum(1 for f in files if f.lower().endswith(('.jpg', '.jpeg')))
|
|
if image_count > 0:
|
|
logger.info(f"Found {image_count} potential recipe images in {root}")
|
|
for file in files:
|
|
if file.lower().endswith(('.jpg', '.jpeg')):
|
|
image_files.append(os.path.join(root, file))
|
|
|
|
# Process each image
|
|
for image_path in image_files:
|
|
recipe_data = await self._process_recipe_image(image_path)
|
|
if recipe_data:
|
|
recipes.append(recipe_data)
|
|
logger.info(f"Processed recipe: {recipe_data.get('title')}")
|
|
|
|
logger.info(f"Successfully processed {len(recipes)} recipes")
|
|
|
|
return recipes
|
|
|
|
async def _process_recipe_image(self, image_path: str) -> Optional[Dict]:
|
|
"""Process a single recipe image and return metadata"""
|
|
try:
|
|
print(f"Processing recipe image: {image_path}", file=sys.stderr)
|
|
logger.info(f"Processing recipe image: {image_path}")
|
|
|
|
# Extract EXIF UserComment
|
|
user_comment = ExifUtils.extract_user_comment(image_path)
|
|
if not user_comment:
|
|
print(f"No EXIF UserComment found in {image_path}", file=sys.stderr)
|
|
logger.warning(f"No EXIF UserComment found in {image_path}")
|
|
return None
|
|
else:
|
|
print(f"Found UserComment: {user_comment[:50]}...", file=sys.stderr)
|
|
|
|
# Parse generation parameters from UserComment
|
|
gen_params = ExifUtils.parse_recipe_metadata(user_comment)
|
|
if not gen_params:
|
|
print(f"Failed to parse recipe metadata from {image_path}", file=sys.stderr)
|
|
logger.warning(f"Failed to parse recipe metadata from {image_path}")
|
|
return None
|
|
|
|
# Get file info
|
|
stat = os.stat(image_path)
|
|
file_name = os.path.basename(image_path)
|
|
title = os.path.splitext(file_name)[0]
|
|
|
|
# Check for existing recipe metadata
|
|
recipe_data = self._extract_recipe_metadata(user_comment)
|
|
if not recipe_data:
|
|
# Create new recipe data
|
|
recipe_data = {
|
|
'id': file_name,
|
|
'file_path': image_path,
|
|
'title': title,
|
|
'modified': stat.st_mtime,
|
|
'created_date': stat.st_ctime,
|
|
'file_size': stat.st_size,
|
|
'loras': [],
|
|
'gen_params': {}
|
|
}
|
|
|
|
# Copy loras from gen_params to recipe_data with proper structure
|
|
for lora in gen_params.get('loras', []):
|
|
recipe_lora = {
|
|
'file_name': '',
|
|
'hash': lora.get('hash', '').lower() if lora.get('hash') else '',
|
|
'strength': lora.get('weight', 1.0),
|
|
'modelVersionId': lora.get('modelVersionId', ''),
|
|
'modelName': lora.get('modelName', ''),
|
|
'modelVersionName': lora.get('modelVersionName', '')
|
|
}
|
|
recipe_data['loras'].append(recipe_lora)
|
|
|
|
# Add generation parameters to recipe_data.gen_params instead of top level
|
|
recipe_data['gen_params'] = {
|
|
'prompt': gen_params.get('prompt', ''),
|
|
'negative_prompt': gen_params.get('negative_prompt', ''),
|
|
'checkpoint': gen_params.get('checkpoint', None),
|
|
'steps': gen_params.get('steps', ''),
|
|
'sampler': gen_params.get('sampler', ''),
|
|
'cfg_scale': gen_params.get('cfg_scale', ''),
|
|
'seed': gen_params.get('seed', ''),
|
|
'size': gen_params.get('size', ''),
|
|
'clip_skip': gen_params.get('clip_skip', '')
|
|
}
|
|
|
|
# Update recipe metadata with missing information
|
|
metadata_updated = await self._update_recipe_metadata(recipe_data, user_comment)
|
|
recipe_data['_metadata_updated'] = metadata_updated
|
|
|
|
# If metadata was updated, save back to image
|
|
if metadata_updated:
|
|
print(f"Updating metadata for {image_path}", file=sys.stderr)
|
|
logger.info(f"Updating metadata for {image_path}")
|
|
self._save_updated_metadata(image_path, user_comment, recipe_data)
|
|
|
|
return recipe_data
|
|
except Exception as e:
|
|
print(f"Error processing recipe image {image_path}: {e}", file=sys.stderr)
|
|
logger.error(f"Error processing recipe image {image_path}: {e}")
|
|
import traceback
|
|
traceback.print_exc(file=sys.stderr)
|
|
return None
|
|
|
|
def _create_basic_recipe_data(self, image_path: str) -> Dict:
|
|
"""Create basic recipe data from file information"""
|
|
file_name = os.path.basename(image_path)
|
|
title = os.path.splitext(file_name)[0]
|
|
|
|
return {
|
|
'file_path': image_path.replace(os.sep, '/'),
|
|
'title': title,
|
|
'file_name': file_name,
|
|
'modified': os.path.getmtime(image_path),
|
|
'created_date': os.path.getctime(image_path),
|
|
'loras': []
|
|
}
|
|
|
|
def _extract_created_date(self, user_comment: str) -> Optional[float]:
|
|
"""Extract creation date from UserComment if present"""
|
|
try:
|
|
# Look for Created Date pattern
|
|
created_date_match = re.search(r'Created Date: ([^,}]+)', user_comment)
|
|
if created_date_match:
|
|
date_str = created_date_match.group(1).strip()
|
|
# Parse ISO format date
|
|
dt = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
|
|
return dt.timestamp()
|
|
except Exception as e:
|
|
logger.error(f"Error extracting creation date: {e}")
|
|
|
|
return None
|
|
|
|
async def _update_lora_information(self, recipe_data: Dict) -> bool:
|
|
"""Update LoRA information with hash and file_name
|
|
|
|
Returns:
|
|
bool: True if metadata was updated
|
|
"""
|
|
if not recipe_data.get('loras'):
|
|
return False
|
|
|
|
metadata_updated = False
|
|
|
|
for lora in recipe_data['loras']:
|
|
logger.info(f"Processing LoRA: {lora.get('modelName', 'Unknown')}, ID: {lora.get('modelVersionId', 'No ID')}")
|
|
|
|
# Skip if already has complete information
|
|
if 'hash' in lora and 'file_name' in lora and lora['file_name']:
|
|
logger.info(f"LoRA already has complete information")
|
|
continue
|
|
|
|
# If has modelVersionId but no hash, look in lora cache first, then fetch from Civitai
|
|
if 'modelVersionId' in lora and not lora.get('hash'):
|
|
model_version_id = lora['modelVersionId']
|
|
logger.info(f"Looking up hash for modelVersionId: {model_version_id}")
|
|
|
|
# Try to find in lora cache first
|
|
hash_from_cache = await self._find_hash_in_lora_cache(model_version_id)
|
|
if hash_from_cache:
|
|
logger.info(f"Found hash in lora cache: {hash_from_cache}")
|
|
lora['hash'] = hash_from_cache
|
|
metadata_updated = True
|
|
else:
|
|
# If not in cache, fetch from Civitai
|
|
logger.info(f"Fetching hash from Civitai for {model_version_id}")
|
|
hash_from_civitai = await self._get_hash_from_civitai(model_version_id)
|
|
if hash_from_civitai:
|
|
logger.info(f"Got hash from Civitai: {hash_from_civitai}")
|
|
lora['hash'] = hash_from_civitai
|
|
metadata_updated = True
|
|
else:
|
|
logger.warning(f"Could not get hash for modelVersionId {model_version_id}")
|
|
|
|
# If has hash but no file_name, look up in lora library
|
|
if 'hash' in lora and (not lora.get('file_name') or not lora['file_name']):
|
|
hash_value = lora['hash']
|
|
logger.info(f"Looking up file_name for hash: {hash_value}")
|
|
|
|
if self._lora_scanner.has_lora_hash(hash_value):
|
|
lora_path = self._lora_scanner.get_lora_path_by_hash(hash_value)
|
|
if lora_path:
|
|
file_name = os.path.splitext(os.path.basename(lora_path))[0]
|
|
logger.info(f"Found lora in library: {file_name}")
|
|
lora['file_name'] = file_name
|
|
metadata_updated = True
|
|
else:
|
|
# Lora not in library
|
|
logger.info(f"LoRA with hash {hash_value} not found in library")
|
|
lora['file_name'] = ''
|
|
metadata_updated = True
|
|
|
|
return metadata_updated
|
|
|
|
async def _find_hash_in_lora_cache(self, model_version_id: str) -> Optional[str]:
|
|
"""Find hash in lora cache based on modelVersionId"""
|
|
try:
|
|
# Get all loras from cache
|
|
if not self._lora_scanner:
|
|
return None
|
|
|
|
cache = await self._lora_scanner.get_cached_data()
|
|
if not cache or not cache.raw_data:
|
|
return None
|
|
|
|
# Find lora with matching civitai.id
|
|
for lora in cache.raw_data:
|
|
civitai_data = lora.get('civitai', {})
|
|
if civitai_data and str(civitai_data.get('id', '')) == str(model_version_id):
|
|
return lora.get('sha256')
|
|
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error finding hash in lora cache: {e}")
|
|
return None
|
|
|
|
async def _get_hash_from_civitai(self, model_version_id: str) -> Optional[str]:
|
|
"""Get hash from Civitai API"""
|
|
try:
|
|
if not self._civitai_client:
|
|
return None
|
|
|
|
logger.info(f"Fetching model version info from Civitai for ID: {model_version_id}")
|
|
version_info = await self._civitai_client.get_model_version_info(model_version_id)
|
|
|
|
if not version_info or not version_info.get('files'):
|
|
logger.warning(f"No files found in version info for ID: {model_version_id}")
|
|
return None
|
|
|
|
# Get hash from the first file
|
|
for file_info in version_info.get('files', []):
|
|
if file_info.get('hashes', {}).get('SHA256'):
|
|
return file_info['hashes']['SHA256']
|
|
|
|
logger.warning(f"No SHA256 hash found in version info for ID: {model_version_id}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error getting hash from Civitai: {e}")
|
|
return None
|
|
|
|
def _save_updated_metadata(self, image_path: str, original_comment: str, recipe_data: Dict) -> None:
|
|
"""Save updated metadata back to image file"""
|
|
try:
|
|
# Check if we already have a recipe metadata section
|
|
recipe_metadata_exists = "recipe metadata:" in original_comment.lower()
|
|
|
|
# Prepare recipe metadata
|
|
recipe_metadata = {
|
|
'id': recipe_data.get('id', ''),
|
|
'file_path': recipe_data.get('file_path', ''),
|
|
'title': recipe_data.get('title', ''),
|
|
'modified': recipe_data.get('modified', 0),
|
|
'created_date': recipe_data.get('created_date', 0),
|
|
'base_model': recipe_data.get('base_model', ''),
|
|
'loras': [],
|
|
'gen_params': recipe_data.get('gen_params', {})
|
|
}
|
|
|
|
# Add lora data with only necessary fields (removing weight, adding modelVersionName)
|
|
for lora in recipe_data.get('loras', []):
|
|
lora_entry = {
|
|
'file_name': lora.get('file_name', ''),
|
|
'hash': lora.get('hash', '').lower() if lora.get('hash') else '',
|
|
'strength': lora.get('strength', 1.0),
|
|
'modelVersionId': lora.get('modelVersionId', ''),
|
|
'modelName': lora.get('modelName', ''),
|
|
'modelVersionName': lora.get('modelVersionName', '')
|
|
}
|
|
recipe_metadata['loras'].append(lora_entry)
|
|
|
|
# Convert to JSON
|
|
recipe_metadata_json = json.dumps(recipe_metadata)
|
|
|
|
# Create or update the recipe metadata section
|
|
if recipe_metadata_exists:
|
|
# Replace existing recipe metadata
|
|
updated_comment = re.sub(
|
|
r'recipe metadata: \{.*\}',
|
|
f'recipe metadata: {recipe_metadata_json}',
|
|
original_comment,
|
|
flags=re.IGNORECASE | re.DOTALL
|
|
)
|
|
else:
|
|
# Append recipe metadata to the end
|
|
updated_comment = f"{original_comment}, recipe metadata: {recipe_metadata_json}"
|
|
|
|
# Save back to image
|
|
logger.info(f"Saving updated metadata to {image_path}")
|
|
ExifUtils.update_user_comment(image_path, updated_comment)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error saving updated metadata: {e}", exc_info=True)
|
|
|
|
async def get_paginated_data(self, page: int, page_size: int, sort_by: str = 'date', search: str = None):
|
|
"""Get paginated and filtered recipe data
|
|
|
|
Args:
|
|
page: Current page number (1-based)
|
|
page_size: Number of items per page
|
|
sort_by: Sort method ('name' or 'date')
|
|
search: Search term
|
|
"""
|
|
cache = await self.get_cached_data()
|
|
|
|
# Get base dataset
|
|
filtered_data = cache.sorted_by_date if sort_by == 'date' else cache.sorted_by_name
|
|
|
|
# Apply search filter
|
|
if search:
|
|
filtered_data = [
|
|
item for item in filtered_data
|
|
if search.lower() in str(item.get('title', '')).lower() or
|
|
search.lower() in str(item.get('prompt', '')).lower()
|
|
]
|
|
|
|
# Calculate pagination
|
|
total_items = len(filtered_data)
|
|
start_idx = (page - 1) * page_size
|
|
end_idx = min(start_idx + page_size, total_items)
|
|
|
|
result = {
|
|
'items': filtered_data[start_idx:end_idx],
|
|
'total': total_items,
|
|
'page': page,
|
|
'page_size': page_size,
|
|
'total_pages': (total_items + page_size - 1) // page_size
|
|
}
|
|
|
|
return result
|
|
|
|
async def _update_recipe_metadata(self, recipe_data: Dict, original_comment: str) -> bool:
|
|
"""Update recipe metadata with missing information
|
|
|
|
Returns:
|
|
bool: True if metadata was updated
|
|
"""
|
|
metadata_updated = False
|
|
|
|
# Update lora information
|
|
for lora in recipe_data.get('loras', []):
|
|
# First check if modelVersionId exists and hash doesn't
|
|
if 'modelVersionId' in lora and not lora.get('hash'):
|
|
model_version_id = str(lora['modelVersionId'])
|
|
# Try to find hash in lora cache first
|
|
hash_from_cache = await self._find_hash_in_lora_cache(model_version_id)
|
|
if hash_from_cache:
|
|
logger.info(f"Found hash in cache for modelVersionId {model_version_id}")
|
|
lora['hash'] = hash_from_cache.lower() # Standardize to lowercase
|
|
metadata_updated = True
|
|
else:
|
|
# If not in cache, fetch from Civitai
|
|
logger.info(f"Fetching hash from Civitai for {model_version_id}")
|
|
hash_from_civitai = await self._get_hash_from_civitai(model_version_id)
|
|
if hash_from_civitai:
|
|
logger.info(f"Got hash from Civitai")
|
|
lora['hash'] = hash_from_civitai.lower() # Standardize to lowercase
|
|
metadata_updated = True
|
|
else:
|
|
logger.warning(f"Could not get hash for modelVersionId {model_version_id}")
|
|
|
|
# If modelVersionId exists but no modelVersionName, try to get it from Civitai
|
|
if 'modelVersionId' in lora and not lora.get('modelVersionName'):
|
|
model_version_id = str(lora['modelVersionId'])
|
|
model_version_name = await self._get_model_version_name(model_version_id)
|
|
if model_version_name:
|
|
lora['modelVersionName'] = model_version_name
|
|
metadata_updated = True
|
|
|
|
# If has hash, check if it's in library
|
|
if 'hash' in lora:
|
|
hash_value = lora['hash'].lower() # Ensure lowercase when comparing
|
|
in_library = self._lora_scanner.has_lora_hash(hash_value)
|
|
lora['inLibrary'] = in_library
|
|
|
|
# If hash is in library but no file_name, look up and set file_name
|
|
if in_library and (not lora.get('file_name') or not lora['file_name']):
|
|
lora_path = self._lora_scanner.get_lora_path_by_hash(hash_value)
|
|
if lora_path:
|
|
file_name = os.path.splitext(os.path.basename(lora_path))[0]
|
|
logger.info(f"Found lora in library: {file_name}")
|
|
lora['file_name'] = file_name
|
|
metadata_updated = True
|
|
elif not in_library:
|
|
# Lora not in library
|
|
logger.info(f"LoRA with hash {hash_value[:8]}... not found in library")
|
|
lora['file_name'] = ''
|
|
metadata_updated = True
|
|
|
|
# Determine the base_model for the recipe based on loras
|
|
if recipe_data.get('loras'):
|
|
base_model = await self._determine_base_model(recipe_data.get('loras', []))
|
|
if base_model and (not recipe_data.get('base_model') or recipe_data['base_model'] != base_model):
|
|
recipe_data['base_model'] = base_model
|
|
metadata_updated = True
|
|
|
|
return metadata_updated
|
|
|
|
async def _get_model_version_name(self, model_version_id: str) -> Optional[str]:
|
|
"""Get model version name from Civitai API"""
|
|
try:
|
|
if not self._civitai_client:
|
|
return None
|
|
|
|
logger.info(f"Fetching model version info from Civitai for ID: {model_version_id}")
|
|
version_info = await self._civitai_client.get_model_version_info(model_version_id)
|
|
|
|
if version_info and 'name' in version_info:
|
|
return version_info['name']
|
|
|
|
logger.warning(f"No version name found for modelVersionId {model_version_id}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error getting model version name from Civitai: {e}")
|
|
return None
|
|
|
|
async def _determine_base_model(self, loras: List[Dict]) -> Optional[str]:
|
|
"""Determine the most common base model among LoRAs"""
|
|
base_models = {}
|
|
|
|
# Count occurrences of each base model
|
|
for lora in loras:
|
|
if 'hash' in lora:
|
|
lora_path = self._lora_scanner.get_lora_path_by_hash(lora['hash'])
|
|
if lora_path:
|
|
base_model = await self._get_base_model_for_lora(lora_path)
|
|
if base_model:
|
|
base_models[base_model] = base_models.get(base_model, 0) + 1
|
|
|
|
# Return the most common base model
|
|
if base_models:
|
|
return max(base_models.items(), key=lambda x: x[1])[0]
|
|
return None
|
|
|
|
async def _get_base_model_for_lora(self, lora_path: str) -> Optional[str]:
|
|
"""Get base model for a LoRA from cache"""
|
|
try:
|
|
if not self._lora_scanner:
|
|
return None
|
|
|
|
cache = await self._lora_scanner.get_cached_data()
|
|
if not cache or not cache.raw_data:
|
|
return None
|
|
|
|
# Find matching lora in cache
|
|
for lora in cache.raw_data:
|
|
if lora.get('file_path') == lora_path:
|
|
return lora.get('base_model')
|
|
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error getting base model for lora: {e}")
|
|
return None
|
|
|
|
def _extract_recipe_metadata(self, user_comment: str) -> Optional[Dict]:
|
|
"""Extract recipe metadata section from UserComment if it exists"""
|
|
try:
|
|
# Look for recipe metadata section
|
|
recipe_match = re.search(r'recipe metadata: (\{.*\})', user_comment, re.IGNORECASE | re.DOTALL)
|
|
if not recipe_match:
|
|
return None
|
|
|
|
recipe_json = recipe_match.group(1)
|
|
recipe_data = json.loads(recipe_json)
|
|
|
|
# Ensure loras array exists
|
|
if 'loras' not in recipe_data:
|
|
recipe_data['loras'] = []
|
|
|
|
return recipe_data
|
|
except Exception as e:
|
|
logger.error(f"Error extracting recipe metadata: {e}")
|
|
return None |