mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-24 14:42:11 -03:00
Additional info: Now prioritizes using the Civitai Images API to fetch image and generation metadata. Even NSFW images can now be imported via URL.
1633 lines
72 KiB
Python
1633 lines
72 KiB
Python
import os
|
|
import time
|
|
import base64
|
|
import numpy as np
|
|
from PIL import Image
|
|
import torch
|
|
import io
|
|
import logging
|
|
from aiohttp import web
|
|
from typing import Dict
|
|
import tempfile
|
|
import json
|
|
import asyncio
|
|
import sys
|
|
from ..utils.exif_utils import ExifUtils
|
|
from ..recipes import RecipeParserFactory
|
|
from ..utils.constants import CARD_PREVIEW_WIDTH
|
|
|
|
from ..config import config
|
|
|
|
# Check if running in standalone mode
|
|
standalone_mode = 'nodes' not in sys.modules
|
|
|
|
from ..utils.utils import download_civitai_image
|
|
from ..services.service_registry import ServiceRegistry # Add ServiceRegistry import
|
|
|
|
# Only import MetadataRegistry in non-standalone mode
|
|
if not standalone_mode:
|
|
# Import metadata_collector functions and classes conditionally
|
|
from ..metadata_collector import get_metadata # Add MetadataCollector import
|
|
from ..metadata_collector.metadata_processor import MetadataProcessor # Add MetadataProcessor import
|
|
from ..metadata_collector.metadata_registry import MetadataRegistry
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class RecipeRoutes:
|
|
"""API route handlers for Recipe management"""
|
|
|
|
def __init__(self):
|
|
# Initialize service references as None, will be set during async init
|
|
self.recipe_scanner = None
|
|
self.civitai_client = None
|
|
# Remove WorkflowParser instance
|
|
|
|
# Pre-warm the cache
|
|
self._init_cache_task = None
|
|
|
|
async def init_services(self):
|
|
"""Initialize services from ServiceRegistry"""
|
|
self.recipe_scanner = await ServiceRegistry.get_recipe_scanner()
|
|
self.civitai_client = await ServiceRegistry.get_civitai_client()
|
|
|
|
@classmethod
|
|
def setup_routes(cls, app: web.Application):
|
|
"""Register API routes"""
|
|
routes = cls()
|
|
app.router.add_get('/api/recipes', routes.get_recipes)
|
|
app.router.add_get('/api/recipe/{recipe_id}', routes.get_recipe_detail)
|
|
app.router.add_post('/api/recipes/analyze-image', routes.analyze_recipe_image)
|
|
app.router.add_post('/api/recipes/analyze-local-image', routes.analyze_local_image)
|
|
app.router.add_post('/api/recipes/save', routes.save_recipe)
|
|
app.router.add_delete('/api/recipe/{recipe_id}', routes.delete_recipe)
|
|
|
|
# Add new filter-related endpoints
|
|
app.router.add_get('/api/recipes/top-tags', routes.get_top_tags)
|
|
app.router.add_get('/api/recipes/base-models', routes.get_base_models)
|
|
|
|
# Add new sharing endpoints
|
|
app.router.add_get('/api/recipe/{recipe_id}/share', routes.share_recipe)
|
|
app.router.add_get('/api/recipe/{recipe_id}/share/download', routes.download_shared_recipe)
|
|
|
|
# Add new endpoint for getting recipe syntax
|
|
app.router.add_get('/api/recipe/{recipe_id}/syntax', routes.get_recipe_syntax)
|
|
|
|
# Add new endpoint for updating recipe metadata (name, tags and source_path)
|
|
app.router.add_put('/api/recipe/{recipe_id}/update', routes.update_recipe)
|
|
|
|
# Add new endpoint for reconnecting deleted LoRAs
|
|
app.router.add_post('/api/recipe/lora/reconnect', routes.reconnect_lora)
|
|
|
|
# Add new endpoint for finding duplicate recipes
|
|
app.router.add_get('/api/recipes/find-duplicates', routes.find_duplicates)
|
|
|
|
# Add new endpoint for bulk deletion of recipes
|
|
app.router.add_post('/api/recipes/bulk-delete', routes.bulk_delete)
|
|
|
|
# Start cache initialization
|
|
app.on_startup.append(routes._init_cache)
|
|
|
|
app.router.add_post('/api/recipes/save-from-widget', routes.save_recipe_from_widget)
|
|
|
|
# Add route to get recipes for a specific Lora
|
|
app.router.add_get('/api/recipes/for-lora', routes.get_recipes_for_lora)
|
|
|
|
# Add new endpoint for scanning and rebuilding the recipe cache
|
|
app.router.add_get('/api/recipes/scan', routes.scan_recipes)
|
|
|
|
async def _init_cache(self, app):
|
|
"""Initialize cache on startup"""
|
|
try:
|
|
# Initialize services first
|
|
await self.init_services()
|
|
|
|
# Now that services are initialized, get the lora scanner
|
|
lora_scanner = self.recipe_scanner._lora_scanner
|
|
|
|
# Get lora cache to ensure it's initialized
|
|
lora_cache = await lora_scanner.get_cached_data()
|
|
|
|
# Verify hash index is built
|
|
if hasattr(lora_scanner, '_hash_index'):
|
|
hash_index_size = len(lora_scanner._hash_index._hash_to_path) if hasattr(lora_scanner._hash_index, '_hash_to_path') else 0
|
|
|
|
# Now that lora scanner is initialized, initialize recipe cache
|
|
await self.recipe_scanner.get_cached_data(force_refresh=True)
|
|
except Exception as e:
|
|
logger.error(f"Error pre-warming recipe cache: {e}", exc_info=True)
|
|
|
|
async def get_recipes(self, request: web.Request) -> web.Response:
|
|
"""API endpoint for getting paginated recipes"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
# Get query parameters with defaults
|
|
page = int(request.query.get('page', '1'))
|
|
page_size = int(request.query.get('page_size', '20'))
|
|
sort_by = request.query.get('sort_by', 'date')
|
|
search = request.query.get('search', None)
|
|
|
|
# Get search options (renamed for better clarity)
|
|
search_title = request.query.get('search_title', 'true').lower() == 'true'
|
|
search_tags = request.query.get('search_tags', 'true').lower() == 'true'
|
|
search_lora_name = request.query.get('search_lora_name', 'true').lower() == 'true'
|
|
search_lora_model = request.query.get('search_lora_model', 'true').lower() == 'true'
|
|
|
|
# Get filter parameters
|
|
base_models = request.query.get('base_models', None)
|
|
tags = request.query.get('tags', None)
|
|
|
|
# New parameter: get LoRA hash filter
|
|
lora_hash = request.query.get('lora_hash', None)
|
|
|
|
# Parse filter parameters
|
|
filters = {}
|
|
if base_models:
|
|
filters['base_model'] = base_models.split(',')
|
|
if tags:
|
|
filters['tags'] = tags.split(',')
|
|
|
|
# Add search options to filters
|
|
search_options = {
|
|
'title': search_title,
|
|
'tags': search_tags,
|
|
'lora_name': search_lora_name,
|
|
'lora_model': search_lora_model
|
|
}
|
|
|
|
# Get paginated data with the new lora_hash parameter
|
|
result = await self.recipe_scanner.get_paginated_data(
|
|
page=page,
|
|
page_size=page_size,
|
|
sort_by=sort_by,
|
|
search=search,
|
|
filters=filters,
|
|
search_options=search_options,
|
|
lora_hash=lora_hash
|
|
)
|
|
|
|
# Format the response data with static URLs for file paths
|
|
for item in result['items']:
|
|
# Always ensure file_url is set
|
|
if 'file_path' in item:
|
|
item['file_url'] = self._format_recipe_file_url(item['file_path'])
|
|
else:
|
|
item['file_url'] = '/loras_static/images/no-preview.png'
|
|
|
|
# 确保 loras 数组存在
|
|
if 'loras' not in item:
|
|
item['loras'] = []
|
|
|
|
# 确保有 base_model 字段
|
|
if 'base_model' not in item:
|
|
item['base_model'] = ""
|
|
|
|
return web.json_response(result)
|
|
except Exception as e:
|
|
logger.error(f"Error retrieving recipes: {e}", exc_info=True)
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def get_recipe_detail(self, request: web.Request) -> web.Response:
|
|
"""Get detailed information about a specific recipe"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
recipe_id = request.match_info['recipe_id']
|
|
|
|
# Use the new get_recipe_by_id method from recipe_scanner
|
|
recipe = await self.recipe_scanner.get_recipe_by_id(recipe_id)
|
|
|
|
if not recipe:
|
|
return web.json_response({"error": "Recipe not found"}, status=404)
|
|
|
|
return web.json_response(recipe)
|
|
except Exception as e:
|
|
logger.error(f"Error retrieving recipe details: {e}", exc_info=True)
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
def _format_recipe_file_url(self, file_path: str) -> str:
|
|
"""Format file path for recipe image as a URL"""
|
|
try:
|
|
# Return the file URL directly for the first lora root's preview
|
|
recipes_dir = os.path.join(config.loras_roots[0], "recipes").replace(os.sep, '/')
|
|
if file_path.replace(os.sep, '/').startswith(recipes_dir):
|
|
relative_path = os.path.relpath(file_path, config.loras_roots[0]).replace(os.sep, '/')
|
|
return f"/loras_static/root1/preview/{relative_path}"
|
|
|
|
# If not in recipes dir, try to create a valid URL from the file path
|
|
file_name = os.path.basename(file_path)
|
|
return f"/loras_static/root1/preview/recipes/{file_name}"
|
|
except Exception as e:
|
|
logger.error(f"Error formatting recipe file URL: {e}", exc_info=True)
|
|
return '/loras_static/images/no-preview.png' # Return default image on error
|
|
|
|
def _format_recipe_data(self, recipe: Dict) -> Dict:
|
|
"""Format recipe data for API response"""
|
|
formatted = {**recipe} # Copy all fields
|
|
|
|
# Format file paths to URLs
|
|
if 'file_path' in formatted:
|
|
formatted['file_url'] = self._format_recipe_file_url(formatted['file_path'])
|
|
|
|
# Format dates for display
|
|
for date_field in ['created_date', 'modified']:
|
|
if date_field in formatted:
|
|
formatted[f"{date_field}_formatted"] = self._format_timestamp(formatted[date_field])
|
|
|
|
return formatted
|
|
|
|
def _format_timestamp(self, timestamp: float) -> str:
|
|
"""Format timestamp for display"""
|
|
from datetime import datetime
|
|
return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
async def analyze_recipe_image(self, request: web.Request) -> web.Response:
|
|
"""Analyze an uploaded image or URL for recipe metadata"""
|
|
temp_path = None
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
# Check if request contains multipart data (image) or JSON data (url)
|
|
content_type = request.headers.get('Content-Type', '')
|
|
|
|
is_url_mode = False
|
|
metadata = None # Initialize metadata variable
|
|
|
|
if 'multipart/form-data' in content_type:
|
|
# Handle image upload
|
|
reader = await request.multipart()
|
|
field = await reader.next()
|
|
|
|
if field.name != 'image':
|
|
return web.json_response({
|
|
"error": "No image field found",
|
|
"loras": []
|
|
}, status=400)
|
|
|
|
# Create a temporary file to store the uploaded image
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file:
|
|
while True:
|
|
chunk = await field.read_chunk()
|
|
if not chunk:
|
|
break
|
|
temp_file.write(chunk)
|
|
temp_path = temp_file.name
|
|
|
|
elif 'application/json' in content_type:
|
|
# Handle URL input
|
|
data = await request.json()
|
|
url = data.get('url')
|
|
is_url_mode = True
|
|
|
|
if not url:
|
|
return web.json_response({
|
|
"error": "No URL provided",
|
|
"loras": []
|
|
}, status=400)
|
|
|
|
# Check if this is a Civitai image URL
|
|
import re
|
|
civitai_image_match = re.match(r'https://civitai\.com/images/(\d+)', url)
|
|
|
|
if civitai_image_match:
|
|
# Extract image ID and fetch image info using get_image_info
|
|
image_id = civitai_image_match.group(1)
|
|
image_info = await self.civitai_client.get_image_info(image_id)
|
|
|
|
if not image_info:
|
|
return web.json_response({
|
|
"error": "Failed to fetch image information from Civitai",
|
|
"loras": []
|
|
}, status=400)
|
|
|
|
# Get image URL from response
|
|
image_url = image_info.get('url')
|
|
if not image_url:
|
|
return web.json_response({
|
|
"error": "No image URL found in Civitai response",
|
|
"loras": []
|
|
}, status=400)
|
|
|
|
# Download image directly from URL
|
|
session = await self.civitai_client.session
|
|
# Create a temporary file to save the downloaded image
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file:
|
|
temp_path = temp_file.name
|
|
|
|
async with session.get(image_url) as response:
|
|
if response.status != 200:
|
|
return web.json_response({
|
|
"error": f"Failed to download image from URL: HTTP {response.status}",
|
|
"loras": []
|
|
}, status=400)
|
|
|
|
with open(temp_path, 'wb') as f:
|
|
f.write(await response.read())
|
|
|
|
# Use meta field from image_info as metadata
|
|
if 'meta' in image_info:
|
|
metadata = image_info['meta']
|
|
|
|
else:
|
|
# Not a Civitai image URL, use the original download method
|
|
temp_path = download_civitai_image(url)
|
|
|
|
if not temp_path:
|
|
return web.json_response({
|
|
"error": "Failed to download image from URL",
|
|
"loras": []
|
|
}, status=400)
|
|
|
|
# If metadata wasn't obtained from Civitai API, extract it from the image
|
|
if metadata is None:
|
|
# Extract metadata from the image using ExifUtils
|
|
metadata = ExifUtils.extract_image_metadata(temp_path)
|
|
|
|
# If no metadata found, return a more specific error
|
|
if not metadata:
|
|
result = {
|
|
"error": "No metadata found in this image",
|
|
"loras": [] # Return empty loras array to prevent client-side errors
|
|
}
|
|
|
|
# For URL mode, include the image data as base64
|
|
if is_url_mode and temp_path:
|
|
with open(temp_path, "rb") as image_file:
|
|
result["image_base64"] = base64.b64encode(image_file.read()).decode('utf-8')
|
|
|
|
return web.json_response(result, status=200)
|
|
|
|
# Use the parser factory to get the appropriate parser
|
|
parser = RecipeParserFactory.create_parser(metadata)
|
|
|
|
if parser is None:
|
|
result = {
|
|
"error": "No parser found for this image",
|
|
"loras": [] # Return empty loras array to prevent client-side errors
|
|
}
|
|
|
|
# For URL mode, include the image data as base64
|
|
if is_url_mode and temp_path:
|
|
with open(temp_path, "rb") as image_file:
|
|
result["image_base64"] = base64.b64encode(image_file.read()).decode('utf-8')
|
|
|
|
return web.json_response(result, status=200)
|
|
|
|
# Parse the metadata
|
|
result = await parser.parse_metadata(
|
|
metadata,
|
|
recipe_scanner=self.recipe_scanner,
|
|
civitai_client=self.civitai_client
|
|
)
|
|
|
|
# For URL mode, include the image data as base64
|
|
if is_url_mode and temp_path:
|
|
with open(temp_path, "rb") as image_file:
|
|
result["image_base64"] = base64.b64encode(image_file.read()).decode('utf-8')
|
|
|
|
# Check for errors
|
|
if "error" in result and not result.get("loras"):
|
|
return web.json_response(result, status=200)
|
|
|
|
# Calculate fingerprint from parsed loras
|
|
from ..utils.utils import calculate_recipe_fingerprint
|
|
fingerprint = calculate_recipe_fingerprint(result.get("loras", []))
|
|
|
|
# Add fingerprint to result
|
|
result["fingerprint"] = fingerprint
|
|
|
|
# Find matching recipes with the same fingerprint
|
|
matching_recipes = []
|
|
if fingerprint:
|
|
matching_recipes = await self.recipe_scanner.find_recipes_by_fingerprint(fingerprint)
|
|
|
|
# Add matching recipes to result
|
|
result["matching_recipes"] = matching_recipes
|
|
|
|
return web.json_response(result)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error analyzing recipe image: {e}", exc_info=True)
|
|
return web.json_response({
|
|
"error": str(e),
|
|
"loras": [] # Return empty loras array to prevent client-side errors
|
|
}, status=500)
|
|
finally:
|
|
# Clean up the temporary file in the finally block
|
|
if temp_path and os.path.exists(temp_path):
|
|
try:
|
|
os.unlink(temp_path)
|
|
except Exception as e:
|
|
logger.error(f"Error deleting temporary file: {e}")
|
|
|
|
async def analyze_local_image(self, request: web.Request) -> web.Response:
|
|
"""Analyze a local image file for recipe metadata"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
# Get JSON data from request
|
|
data = await request.json()
|
|
file_path = data.get('path')
|
|
|
|
if not file_path:
|
|
return web.json_response({
|
|
'error': 'No file path provided',
|
|
'loras': []
|
|
}, status=400)
|
|
|
|
# Normalize file path for cross-platform compatibility
|
|
file_path = os.path.normpath(file_path.strip('"').strip("'"))
|
|
|
|
# Validate that the file exists
|
|
if not os.path.isfile(file_path):
|
|
return web.json_response({
|
|
'error': 'File not found',
|
|
'loras': []
|
|
}, status=404)
|
|
|
|
# Extract metadata from the image using ExifUtils
|
|
metadata = ExifUtils.extract_image_metadata(file_path)
|
|
|
|
# If no metadata found, return error
|
|
if not metadata:
|
|
# Get base64 image data
|
|
with open(file_path, "rb") as image_file:
|
|
image_base64 = base64.b64encode(image_file.read()).decode('utf-8')
|
|
|
|
return web.json_response({
|
|
"error": "No metadata found in this image",
|
|
"loras": [], # Return empty loras array to prevent client-side errors
|
|
"image_base64": image_base64
|
|
}, status=200)
|
|
|
|
# Use the parser factory to get the appropriate parser
|
|
parser = RecipeParserFactory.create_parser(metadata)
|
|
|
|
if parser is None:
|
|
# Get base64 image data
|
|
with open(file_path, "rb") as image_file:
|
|
image_base64 = base64.b64encode(image_file.read()).decode('utf-8')
|
|
|
|
return web.json_response({
|
|
"error": "No parser found for this image",
|
|
"loras": [], # Return empty loras array to prevent client-side errors
|
|
"image_base64": image_base64
|
|
}, status=200)
|
|
|
|
# Parse the metadata
|
|
result = await parser.parse_metadata(
|
|
metadata,
|
|
recipe_scanner=self.recipe_scanner,
|
|
civitai_client=self.civitai_client
|
|
)
|
|
|
|
# Add base64 image data to result
|
|
with open(file_path, "rb") as image_file:
|
|
result["image_base64"] = base64.b64encode(image_file.read()).decode('utf-8')
|
|
|
|
# Check for errors
|
|
if "error" in result and not result.get("loras"):
|
|
return web.json_response(result, status=200)
|
|
|
|
# Calculate fingerprint from parsed loras
|
|
from ..utils.utils import calculate_recipe_fingerprint
|
|
fingerprint = calculate_recipe_fingerprint(result.get("loras", []))
|
|
|
|
# Add fingerprint to result
|
|
result["fingerprint"] = fingerprint
|
|
|
|
# Find matching recipes with the same fingerprint
|
|
matching_recipes = []
|
|
if fingerprint:
|
|
matching_recipes = await self.recipe_scanner.find_recipes_by_fingerprint(fingerprint)
|
|
|
|
# Add matching recipes to result
|
|
result["matching_recipes"] = matching_recipes
|
|
|
|
return web.json_response(result)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error analyzing local image: {e}", exc_info=True)
|
|
return web.json_response({
|
|
'error': str(e),
|
|
'loras': [] # Return empty loras array to prevent client-side errors
|
|
}, status=500)
|
|
|
|
async def save_recipe(self, request: web.Request) -> web.Response:
|
|
"""Save a recipe to the recipes folder"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
reader = await request.multipart()
|
|
|
|
# Process form data
|
|
image = None
|
|
image_base64 = None
|
|
image_url = None
|
|
name = None
|
|
tags = []
|
|
metadata = None
|
|
|
|
while True:
|
|
field = await reader.next()
|
|
if field is None:
|
|
break
|
|
|
|
if field.name == 'image':
|
|
# Read image data
|
|
image_data = b''
|
|
while True:
|
|
chunk = await field.read_chunk()
|
|
if not chunk:
|
|
break
|
|
image_data += chunk
|
|
image = image_data
|
|
|
|
elif field.name == 'image_base64':
|
|
# Get base64 image data
|
|
image_base64 = await field.text()
|
|
|
|
elif field.name == 'image_url':
|
|
# Get image URL
|
|
image_url = await field.text()
|
|
|
|
elif field.name == 'name':
|
|
name = await field.text()
|
|
|
|
elif field.name == 'tags':
|
|
tags_text = await field.text()
|
|
try:
|
|
tags = json.loads(tags_text)
|
|
except:
|
|
tags = []
|
|
|
|
elif field.name == 'metadata':
|
|
metadata_text = await field.text()
|
|
try:
|
|
metadata = json.loads(metadata_text)
|
|
except:
|
|
metadata = {}
|
|
|
|
missing_fields = []
|
|
if not name:
|
|
missing_fields.append("name")
|
|
if not metadata:
|
|
missing_fields.append("metadata")
|
|
if missing_fields:
|
|
return web.json_response({"error": f"Missing required fields: {', '.join(missing_fields)}"}, status=400)
|
|
|
|
# Handle different image sources
|
|
if not image:
|
|
if image_base64:
|
|
# Convert base64 to binary
|
|
try:
|
|
# Remove potential data URL prefix
|
|
if ',' in image_base64:
|
|
image_base64 = image_base64.split(',', 1)[1]
|
|
image = base64.b64decode(image_base64)
|
|
except Exception as e:
|
|
return web.json_response({"error": f"Invalid base64 image data: {str(e)}"}, status=400)
|
|
elif image_url:
|
|
# Download image from URL
|
|
temp_path = download_civitai_image(image_url)
|
|
if not temp_path:
|
|
return web.json_response({"error": "Failed to download image from URL"}, status=400)
|
|
|
|
# Read the downloaded image
|
|
with open(temp_path, 'rb') as f:
|
|
image = f.read()
|
|
|
|
# Clean up temp file
|
|
try:
|
|
os.unlink(temp_path)
|
|
except:
|
|
pass
|
|
else:
|
|
return web.json_response({"error": "No image data provided"}, status=400)
|
|
|
|
# Create recipes directory if it doesn't exist
|
|
recipes_dir = self.recipe_scanner.recipes_dir
|
|
os.makedirs(recipes_dir, exist_ok=True)
|
|
|
|
# Generate UUID for the recipe
|
|
import uuid
|
|
recipe_id = str(uuid.uuid4())
|
|
|
|
# Optimize the image (resize and convert to WebP)
|
|
optimized_image, extension = ExifUtils.optimize_image(
|
|
image_data=image,
|
|
target_width=CARD_PREVIEW_WIDTH,
|
|
format='webp',
|
|
quality=85,
|
|
preserve_metadata=True
|
|
)
|
|
|
|
# Save the optimized image
|
|
image_filename = f"{recipe_id}{extension}"
|
|
image_path = os.path.join(recipes_dir, image_filename)
|
|
with open(image_path, 'wb') as f:
|
|
f.write(optimized_image)
|
|
|
|
# Create the recipe data structure
|
|
current_time = time.time()
|
|
|
|
# Format loras data according to the recipe.json format
|
|
loras_data = []
|
|
for lora in metadata.get("loras", []):
|
|
# Modified: Always include deleted LoRAs in the recipe metadata
|
|
# Even if they're marked to be excluded, we still keep their identifying information
|
|
# The exclude flag will only be used to determine if they should be included in recipe syntax
|
|
|
|
# Convert frontend lora format to recipe format
|
|
lora_entry = {
|
|
"file_name": lora.get("file_name", "") or os.path.splitext(os.path.basename(lora.get("localPath", "")))[0] if lora.get("localPath") else "",
|
|
"hash": lora.get("hash", "").lower() if lora.get("hash") else "",
|
|
"strength": float(lora.get("weight", 1.0)),
|
|
"modelVersionId": lora.get("id", ""),
|
|
"modelName": lora.get("name", ""),
|
|
"modelVersionName": lora.get("version", ""),
|
|
"isDeleted": lora.get("isDeleted", False), # Preserve deletion status in saved recipe
|
|
"exclude": lora.get("exclude", False) # Add exclude flag to the recipe
|
|
}
|
|
loras_data.append(lora_entry)
|
|
|
|
# Format gen_params according to the recipe.json format
|
|
gen_params = metadata.get("gen_params", {})
|
|
if not gen_params and "raw_metadata" in metadata:
|
|
# Extract from raw metadata if available
|
|
raw_metadata = metadata.get("raw_metadata", {})
|
|
gen_params = {
|
|
"prompt": raw_metadata.get("prompt", ""),
|
|
"negative_prompt": raw_metadata.get("negative_prompt", ""),
|
|
"checkpoint": raw_metadata.get("checkpoint", {}),
|
|
"steps": raw_metadata.get("steps", ""),
|
|
"sampler": raw_metadata.get("sampler", ""),
|
|
"cfg_scale": raw_metadata.get("cfg_scale", ""),
|
|
"seed": raw_metadata.get("seed", ""),
|
|
"size": raw_metadata.get("size", ""),
|
|
"clip_skip": raw_metadata.get("clip_skip", "")
|
|
}
|
|
|
|
# Calculate recipe fingerprint
|
|
from ..utils.utils import calculate_recipe_fingerprint
|
|
fingerprint = calculate_recipe_fingerprint(loras_data)
|
|
|
|
# Create the recipe data structure
|
|
recipe_data = {
|
|
"id": recipe_id,
|
|
"file_path": image_path,
|
|
"title": name,
|
|
"modified": current_time,
|
|
"created_date": current_time,
|
|
"base_model": metadata.get("base_model", ""),
|
|
"loras": loras_data,
|
|
"gen_params": gen_params,
|
|
"fingerprint": fingerprint
|
|
}
|
|
|
|
# Add tags if provided
|
|
if tags:
|
|
recipe_data["tags"] = tags
|
|
|
|
# Add source_path if provided in metadata
|
|
if metadata.get("source_path"):
|
|
recipe_data["source_path"] = metadata.get("source_path")
|
|
|
|
# Save the recipe JSON
|
|
json_filename = f"{recipe_id}.recipe.json"
|
|
json_path = os.path.join(recipes_dir, json_filename)
|
|
with open(json_path, 'w', encoding='utf-8') as f:
|
|
json.dump(recipe_data, f, indent=4, ensure_ascii=False)
|
|
|
|
# Add recipe metadata to the image
|
|
ExifUtils.append_recipe_metadata(image_path, recipe_data)
|
|
|
|
# Check for duplicates
|
|
matching_recipes = []
|
|
if fingerprint:
|
|
matching_recipes = await self.recipe_scanner.find_recipes_by_fingerprint(fingerprint)
|
|
# Remove current recipe from matches
|
|
if recipe_id in matching_recipes:
|
|
matching_recipes.remove(recipe_id)
|
|
|
|
# Simplified cache update approach
|
|
# Instead of trying to update the cache directly, just set it to None
|
|
# to force a refresh on the next get_cached_data call
|
|
if self.recipe_scanner._cache is not None:
|
|
# Add the recipe to the raw data if the cache exists
|
|
# This is a simple direct update without locks or timeouts
|
|
self.recipe_scanner._cache.raw_data.append(recipe_data)
|
|
# Schedule a background task to resort the cache
|
|
asyncio.create_task(self.recipe_scanner._cache.resort())
|
|
logger.info(f"Added recipe {recipe_id} to cache")
|
|
|
|
return web.json_response({
|
|
'success': True,
|
|
'recipe_id': recipe_id,
|
|
'image_path': image_path,
|
|
'json_path': json_path,
|
|
'matching_recipes': matching_recipes
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error saving recipe: {e}", exc_info=True)
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def delete_recipe(self, request: web.Request) -> web.Response:
|
|
"""Delete a recipe by ID"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
recipe_id = request.match_info['recipe_id']
|
|
|
|
# Get recipes directory
|
|
recipes_dir = self.recipe_scanner.recipes_dir
|
|
if not recipes_dir or not os.path.exists(recipes_dir):
|
|
return web.json_response({"error": "Recipes directory not found"}, status=404)
|
|
|
|
# Find recipe JSON file
|
|
recipe_json_path = os.path.join(recipes_dir, f"{recipe_id}.recipe.json")
|
|
if not os.path.exists(recipe_json_path):
|
|
return web.json_response({"error": "Recipe not found"}, status=404)
|
|
|
|
# Load recipe data to get image path
|
|
with open(recipe_json_path, 'r', encoding='utf-8') as f:
|
|
recipe_data = json.load(f)
|
|
|
|
# Get image path
|
|
image_path = recipe_data.get('file_path')
|
|
|
|
# Delete recipe JSON file
|
|
os.remove(recipe_json_path)
|
|
logger.info(f"Deleted recipe JSON file: {recipe_json_path}")
|
|
|
|
# Delete recipe image if it exists
|
|
if image_path and os.path.exists(image_path):
|
|
os.remove(image_path)
|
|
logger.info(f"Deleted recipe image: {image_path}")
|
|
|
|
# Simplified cache update approach
|
|
if self.recipe_scanner._cache is not None:
|
|
# Remove the recipe from raw_data if it exists
|
|
self.recipe_scanner._cache.raw_data = [
|
|
r for r in self.recipe_scanner._cache.raw_data
|
|
if str(r.get('id', '')) != recipe_id
|
|
]
|
|
# Schedule a background task to resort the cache
|
|
asyncio.create_task(self.recipe_scanner._cache.resort())
|
|
logger.info(f"Removed recipe {recipe_id} from cache")
|
|
|
|
return web.json_response({"success": True, "message": "Recipe deleted successfully"})
|
|
except Exception as e:
|
|
logger.error(f"Error deleting recipe: {e}", exc_info=True)
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def get_top_tags(self, request: web.Request) -> web.Response:
|
|
"""Get top tags used in recipes"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
# Get limit parameter with default
|
|
limit = int(request.query.get('limit', '20'))
|
|
|
|
# Get all recipes from cache
|
|
cache = await self.recipe_scanner.get_cached_data()
|
|
|
|
# Count tag occurrences
|
|
tag_counts = {}
|
|
for recipe in cache.raw_data:
|
|
if 'tags' in recipe and recipe['tags']:
|
|
for tag in recipe['tags']:
|
|
tag_counts[tag] = tag_counts.get(tag, 0) + 1
|
|
|
|
# Sort tags by count and limit results
|
|
sorted_tags = [{'tag': tag, 'count': count} for tag, count in tag_counts.items()]
|
|
sorted_tags.sort(key=lambda x: x['count'], reverse=True)
|
|
top_tags = sorted_tags[:limit]
|
|
|
|
return web.json_response({
|
|
'success': True,
|
|
'tags': top_tags
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Error retrieving top tags: {e}", exc_info=True)
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': str(e)
|
|
}, status=500)
|
|
|
|
async def get_base_models(self, request: web.Request) -> web.Response:
|
|
"""Get base models used in recipes"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
# Get all recipes from cache
|
|
cache = await self.recipe_scanner.get_cached_data()
|
|
|
|
# Count base model occurrences
|
|
base_model_counts = {}
|
|
for recipe in cache.raw_data:
|
|
if 'base_model' in recipe and recipe['base_model']:
|
|
base_model = recipe['base_model']
|
|
base_model_counts[base_model] = base_model_counts.get(base_model, 0) + 1
|
|
|
|
# Sort base models by count
|
|
sorted_models = [{'name': model, 'count': count} for model, count in base_model_counts.items()]
|
|
sorted_models.sort(key=lambda x: x['count'], reverse=True)
|
|
|
|
return web.json_response({
|
|
'success': True,
|
|
'base_models': sorted_models
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Error retrieving base models: {e}", exc_info=True)
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': str(e)}
|
|
, status=500)
|
|
|
|
async def share_recipe(self, request: web.Request) -> web.Response:
|
|
"""Process a recipe image for sharing by adding metadata to EXIF"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
recipe_id = request.match_info['recipe_id']
|
|
|
|
# Get all recipes from cache
|
|
cache = await self.recipe_scanner.get_cached_data()
|
|
|
|
# Find the specific recipe
|
|
recipe = next((r for r in cache.raw_data if str(r.get('id', '')) == recipe_id), None)
|
|
|
|
if not recipe:
|
|
return web.json_response({"error": "Recipe not found"}, status=404)
|
|
|
|
# Get the image path
|
|
image_path = recipe.get('file_path')
|
|
if not image_path or not os.path.exists(image_path):
|
|
return web.json_response({"error": "Recipe image not found"}, status=404)
|
|
|
|
# Create a temporary copy of the image to modify
|
|
import tempfile
|
|
import shutil
|
|
|
|
# Create temp file with same extension
|
|
ext = os.path.splitext(image_path)[1]
|
|
with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as temp_file:
|
|
temp_path = temp_file.name
|
|
|
|
# Copy the original image to temp file
|
|
shutil.copy2(image_path, temp_path)
|
|
processed_path = temp_path
|
|
|
|
# Create a URL for the processed image
|
|
# Use a timestamp to prevent caching
|
|
timestamp = int(time.time())
|
|
url_path = f"/api/recipe/{recipe_id}/share/download?t={timestamp}"
|
|
|
|
# Store the temp path in a dictionary to serve later
|
|
if not hasattr(self, '_shared_recipes'):
|
|
self._shared_recipes = {}
|
|
|
|
self._shared_recipes[recipe_id] = {
|
|
'path': processed_path,
|
|
'timestamp': timestamp,
|
|
'expires': time.time() + 300 # Expire after 5 minutes
|
|
}
|
|
|
|
# Clean up old entries
|
|
self._cleanup_shared_recipes()
|
|
|
|
return web.json_response({
|
|
'success': True,
|
|
'download_url': url_path,
|
|
'filename': f"recipe_{recipe.get('title', '').replace(' ', '_').lower()}{ext}"
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Error sharing recipe: {e}", exc_info=True)
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def download_shared_recipe(self, request: web.Request) -> web.Response:
|
|
"""Serve a processed recipe image for download"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
recipe_id = request.match_info['recipe_id']
|
|
|
|
# Check if we have this shared recipe
|
|
if not hasattr(self, '_shared_recipes') or recipe_id not in self._shared_recipes:
|
|
return web.json_response({"error": "Shared recipe not found or expired"}, status=404)
|
|
|
|
shared_info = self._shared_recipes[recipe_id]
|
|
file_path = shared_info['path']
|
|
|
|
if not os.path.exists(file_path):
|
|
return web.json_response({"error": "Shared recipe file not found"}, status=404)
|
|
|
|
# Get recipe to determine filename
|
|
cache = await self.recipe_scanner.get_cached_data()
|
|
recipe = next((r for r in cache.raw_data if str(r.get('id', '')) == recipe_id), None)
|
|
|
|
# Set filename for download
|
|
filename = f"recipe_{recipe.get('title', '').replace(' ', '_').lower() if recipe else recipe_id}"
|
|
ext = os.path.splitext(file_path)[1]
|
|
download_filename = f"{filename}{ext}"
|
|
|
|
# Serve the file
|
|
return web.FileResponse(
|
|
file_path,
|
|
headers={
|
|
'Content-Disposition': f'attachment; filename="{download_filename}"'
|
|
}
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error downloading shared recipe: {e}", exc_info=True)
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
def _cleanup_shared_recipes(self):
|
|
"""Clean up expired shared recipes"""
|
|
if not hasattr(self, '_shared_recipes'):
|
|
return
|
|
|
|
current_time = time.time()
|
|
expired_ids = [rid for rid, info in self._shared_recipes.items()
|
|
if current_time > info.get('expires', 0)]
|
|
|
|
for rid in expired_ids:
|
|
try:
|
|
# Delete the temporary file
|
|
file_path = self._shared_recipes[rid]['path']
|
|
if os.path.exists(file_path):
|
|
os.unlink(file_path)
|
|
|
|
# Remove from dictionary
|
|
del self._shared_recipes[rid]
|
|
except Exception as e:
|
|
logger.error(f"Error cleaning up shared recipe {rid}: {e}")
|
|
|
|
async def save_recipe_from_widget(self, request: web.Request) -> web.Response:
|
|
"""Save a recipe from the LoRAs widget"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
# Get metadata using the metadata collector instead of workflow parsing
|
|
raw_metadata = get_metadata()
|
|
metadata_dict = MetadataProcessor.to_dict(raw_metadata)
|
|
|
|
# Check if we have valid metadata
|
|
if not metadata_dict:
|
|
return web.json_response({"error": "No generation metadata found"}, status=400)
|
|
|
|
# Get the most recent image from metadata registry instead of temp directory
|
|
if not standalone_mode:
|
|
metadata_registry = MetadataRegistry()
|
|
latest_image = metadata_registry.get_first_decoded_image()
|
|
else:
|
|
latest_image = None
|
|
|
|
if not latest_image:
|
|
return web.json_response({"error": "No recent images found to use for recipe. Try generating an image first."}, status=400)
|
|
|
|
# Convert the image data to bytes - handle tuple and tensor cases
|
|
logger.debug(f"Image type: {type(latest_image)}")
|
|
|
|
try:
|
|
# Handle the tuple case first
|
|
if isinstance(latest_image, tuple):
|
|
# Extract the tensor from the tuple
|
|
if len(latest_image) > 0:
|
|
tensor_image = latest_image[0]
|
|
else:
|
|
return web.json_response({"error": "Empty image tuple received"}, status=400)
|
|
else:
|
|
tensor_image = latest_image
|
|
|
|
# Get the shape info for debugging
|
|
if hasattr(tensor_image, 'shape'):
|
|
shape_info = tensor_image.shape
|
|
logger.debug(f"Tensor shape: {shape_info}, dtype: {tensor_image.dtype}")
|
|
|
|
# Convert tensor to numpy array
|
|
if isinstance(tensor_image, torch.Tensor):
|
|
image_np = tensor_image.cpu().numpy()
|
|
else:
|
|
image_np = np.array(tensor_image)
|
|
|
|
# Handle different tensor shapes
|
|
# Case: (1, 1, H, W, 3) or (1, H, W, 3) - batch or multi-batch
|
|
if len(image_np.shape) > 3:
|
|
# Remove batch dimensions until we get to (H, W, 3)
|
|
while len(image_np.shape) > 3:
|
|
image_np = image_np[0]
|
|
|
|
# If values are in [0, 1] range, convert to [0, 255]
|
|
if image_np.dtype == np.float32 or image_np.dtype == np.float64:
|
|
if image_np.max() <= 1.0:
|
|
image_np = (image_np * 255).astype(np.uint8)
|
|
|
|
# Ensure image is in the right format (HWC with RGB channels)
|
|
if len(image_np.shape) == 3 and image_np.shape[2] == 3:
|
|
pil_image = Image.fromarray(image_np)
|
|
img_byte_arr = io.BytesIO()
|
|
pil_image.save(img_byte_arr, format='PNG')
|
|
image = img_byte_arr.getvalue()
|
|
else:
|
|
return web.json_response({"error": f"Cannot handle this data shape: {image_np.shape}, {image_np.dtype}"}, status=400)
|
|
except Exception as e:
|
|
logger.error(f"Error processing image data: {str(e)}", exc_info=True)
|
|
return web.json_response({"error": f"Error processing image: {str(e)}"}, status=400)
|
|
|
|
# Get the lora stack from the metadata
|
|
lora_stack = metadata_dict.get("loras", "")
|
|
|
|
# Parse the lora stack format: "<lora:name:strength> <lora:name2:strength2> ..."
|
|
import re
|
|
lora_matches = re.findall(r'<lora:([^:]+):([^>]+)>', lora_stack)
|
|
|
|
# Check if any loras were found
|
|
if not lora_matches:
|
|
return web.json_response({"error": "No LoRAs found in the generation metadata"}, status=400)
|
|
|
|
# Generate recipe name from the first 3 loras (or less if fewer are available)
|
|
loras_for_name = lora_matches[:3] # Take at most 3 loras for the name
|
|
|
|
recipe_name_parts = []
|
|
for lora_name, lora_strength in loras_for_name:
|
|
# Get the basename without path or extension
|
|
basename = os.path.basename(lora_name)
|
|
basename = os.path.splitext(basename)[0]
|
|
recipe_name_parts.append(f"{basename}:{lora_strength}")
|
|
|
|
recipe_name = " ".join(recipe_name_parts)
|
|
|
|
# Create recipes directory if it doesn't exist
|
|
recipes_dir = self.recipe_scanner.recipes_dir
|
|
os.makedirs(recipes_dir, exist_ok=True)
|
|
|
|
# Generate UUID for the recipe
|
|
import uuid
|
|
recipe_id = str(uuid.uuid4())
|
|
|
|
# Optimize the image (resize and convert to WebP)
|
|
optimized_image, extension = ExifUtils.optimize_image(
|
|
image_data=image,
|
|
target_width=CARD_PREVIEW_WIDTH,
|
|
format='webp',
|
|
quality=85,
|
|
preserve_metadata=True
|
|
)
|
|
|
|
# Save the optimized image
|
|
image_filename = f"{recipe_id}{extension}"
|
|
image_path = os.path.join(recipes_dir, image_filename)
|
|
with open(image_path, 'wb') as f:
|
|
f.write(optimized_image)
|
|
|
|
# Format loras data from the lora stack
|
|
loras_data = []
|
|
|
|
for lora_name, lora_strength in lora_matches:
|
|
try:
|
|
# Get lora info from scanner
|
|
lora_info = await self.recipe_scanner._lora_scanner.get_lora_info_by_name(lora_name)
|
|
|
|
# Create lora entry
|
|
lora_entry = {
|
|
"file_name": lora_name,
|
|
"hash": lora_info.get("sha256", "").lower() if lora_info else "",
|
|
"strength": float(lora_strength),
|
|
"modelVersionId": lora_info.get("civitai", {}).get("id", "") if lora_info else "",
|
|
"modelName": lora_info.get("civitai", {}).get("model", {}).get("name", "") if lora_info else lora_name,
|
|
"modelVersionName": lora_info.get("civitai", {}).get("name", "") if lora_info else "",
|
|
"isDeleted": False
|
|
}
|
|
loras_data.append(lora_entry)
|
|
except Exception as e:
|
|
logger.warning(f"Error processing LoRA {lora_name}: {e}")
|
|
|
|
# Get base model from lora scanner for the available loras
|
|
base_model_counts = {}
|
|
for lora in loras_data:
|
|
lora_info = await self.recipe_scanner._lora_scanner.get_lora_info_by_name(lora.get("file_name", ""))
|
|
if lora_info and "base_model" in lora_info:
|
|
base_model = lora_info["base_model"]
|
|
base_model_counts[base_model] = base_model_counts.get(base_model, 0) + 1
|
|
|
|
# Get most common base model
|
|
most_common_base_model = ""
|
|
if base_model_counts:
|
|
most_common_base_model = max(base_model_counts.items(), key=lambda x: x[1])[0]
|
|
|
|
# Create the recipe data structure
|
|
recipe_data = {
|
|
"id": recipe_id,
|
|
"file_path": image_path,
|
|
"title": recipe_name, # Use generated recipe name
|
|
"modified": time.time(),
|
|
"created_date": time.time(),
|
|
"base_model": most_common_base_model,
|
|
"loras": loras_data,
|
|
"checkpoint": metadata_dict.get("checkpoint", ""),
|
|
"gen_params": {key: value for key, value in metadata_dict.items()
|
|
if key not in ['checkpoint', 'loras']},
|
|
"loras_stack": lora_stack # Include the original lora stack string
|
|
}
|
|
|
|
# Save the recipe JSON
|
|
json_filename = f"{recipe_id}.recipe.json"
|
|
json_path = os.path.join(recipes_dir, json_filename)
|
|
with open(json_path, 'w', encoding='utf-8') as f:
|
|
json.dump(recipe_data, f, indent=4, ensure_ascii=False)
|
|
|
|
# Add recipe metadata to the image
|
|
ExifUtils.append_recipe_metadata(image_path, recipe_data)
|
|
|
|
# Update cache
|
|
if self.recipe_scanner._cache is not None:
|
|
# Add the recipe to the raw data if the cache exists
|
|
self.recipe_scanner._cache.raw_data.append(recipe_data)
|
|
# Schedule a background task to resort the cache
|
|
asyncio.create_task(self.recipe_scanner._cache.resort())
|
|
logger.info(f"Added recipe {recipe_id} to cache")
|
|
|
|
return web.json_response({
|
|
'success': True,
|
|
'recipe_id': recipe_id,
|
|
'image_path': image_path,
|
|
'json_path': json_path,
|
|
'recipe_name': recipe_name # Include the generated recipe name in the response
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error saving recipe from widget: {e}", exc_info=True)
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def get_recipe_syntax(self, request: web.Request) -> web.Response:
|
|
"""Generate recipe syntax for LoRAs in the recipe, looking up proper file names using hash_index"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
recipe_id = request.match_info['recipe_id']
|
|
|
|
# Get all recipes from cache
|
|
cache = await self.recipe_scanner.get_cached_data()
|
|
|
|
# Find the specific recipe
|
|
recipe = next((r for r in cache.raw_data if str(r.get('id', '')) == recipe_id), None)
|
|
|
|
if not recipe:
|
|
return web.json_response({"error": "Recipe not found"}, status=404)
|
|
|
|
# Get the loras from the recipe
|
|
loras = recipe.get('loras', [])
|
|
|
|
if not loras:
|
|
return web.json_response({"error": "No LoRAs found in this recipe"}, status=400)
|
|
|
|
# Generate recipe syntax for all LoRAs that:
|
|
# 1. Are in the library (not deleted) OR
|
|
# 2. Are deleted but not marked for exclusion
|
|
lora_syntax_parts = []
|
|
|
|
# Access the hash_index from lora_scanner
|
|
hash_index = self.recipe_scanner._lora_scanner._hash_index
|
|
|
|
for lora in loras:
|
|
# Skip loras that are deleted AND marked for exclusion
|
|
if lora.get("isDeleted", False):
|
|
continue
|
|
|
|
if not self.recipe_scanner._lora_scanner.has_lora_hash(lora.get("hash", "")):
|
|
continue
|
|
|
|
# Get the strength
|
|
strength = lora.get("strength", 1.0)
|
|
|
|
# Try to find the actual file name for this lora
|
|
file_name = None
|
|
hash_value = lora.get("hash", "").lower()
|
|
|
|
if hash_value and hasattr(hash_index, "_hash_to_path"):
|
|
# Look up the file path from the hash
|
|
file_path = hash_index._hash_to_path.get(hash_value)
|
|
|
|
if file_path:
|
|
# Extract the file name without extension from the path
|
|
file_name = os.path.splitext(os.path.basename(file_path))[0]
|
|
|
|
# If hash lookup failed, fall back to modelVersionId lookup
|
|
if not file_name and lora.get("modelVersionId"):
|
|
# Search for files with matching modelVersionId
|
|
all_loras = await self.recipe_scanner._lora_scanner.get_cached_data()
|
|
for cached_lora in all_loras.raw_data:
|
|
if not cached_lora.get("civitai"):
|
|
continue
|
|
if cached_lora.get("civitai", {}).get("id") == lora.get("modelVersionId"):
|
|
file_name = os.path.splitext(os.path.basename(cached_lora["path"]))[0]
|
|
break
|
|
|
|
# If all lookups failed, use the file_name from the recipe
|
|
if not file_name:
|
|
file_name = lora.get("file_name", "unknown-lora")
|
|
|
|
# Add to syntax parts
|
|
lora_syntax_parts.append(f"<lora:{file_name}:{strength}>")
|
|
|
|
# Join the LoRA syntax parts
|
|
lora_syntax = " ".join(lora_syntax_parts)
|
|
|
|
return web.json_response({
|
|
'success': True,
|
|
'syntax': lora_syntax
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Error generating recipe syntax: {e}", exc_info=True)
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def update_recipe(self, request: web.Request) -> web.Response:
|
|
"""Update recipe metadata (name and tags)"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
recipe_id = request.match_info['recipe_id']
|
|
data = await request.json()
|
|
|
|
# Validate required fields
|
|
if 'title' not in data and 'tags' not in data and 'source_path' not in data:
|
|
return web.json_response({
|
|
"error": "At least one field to update must be provided (title or tags or source_path)"
|
|
}, status=400)
|
|
|
|
# Use the recipe scanner's update method
|
|
success = await self.recipe_scanner.update_recipe_metadata(recipe_id, data)
|
|
|
|
if not success:
|
|
return web.json_response({"error": "Recipe not found or update failed"}, status=404)
|
|
|
|
return web.json_response({
|
|
"success": True,
|
|
"recipe_id": recipe_id,
|
|
"updates": data
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Error updating recipe: {e}", exc_info=True)
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def reconnect_lora(self, request: web.Request) -> web.Response:
|
|
"""Reconnect a deleted LoRA in a recipe to a local LoRA file"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
# Parse request data
|
|
data = await request.json()
|
|
|
|
# Validate required fields
|
|
required_fields = ['recipe_id', 'lora_data', 'target_name']
|
|
for field in required_fields:
|
|
if field not in data:
|
|
return web.json_response({
|
|
"error": f"Missing required field: {field}"
|
|
}, status=400)
|
|
|
|
recipe_id = data['recipe_id']
|
|
lora_data = data['lora_data']
|
|
target_name = data['target_name']
|
|
|
|
# Get recipe scanner
|
|
scanner = self.recipe_scanner
|
|
lora_scanner = scanner._lora_scanner
|
|
|
|
# Check if recipe exists
|
|
recipe_path = os.path.join(scanner.recipes_dir, f"{recipe_id}.recipe.json")
|
|
if not os.path.exists(recipe_path):
|
|
return web.json_response({"error": "Recipe not found"}, status=404)
|
|
|
|
# Find target LoRA by name
|
|
target_lora = await lora_scanner.get_lora_info_by_name(target_name)
|
|
if not target_lora:
|
|
return web.json_response({"error": f"Local LoRA not found with name: {target_name}"}, status=404)
|
|
|
|
# Load recipe data
|
|
with open(recipe_path, 'r', encoding='utf-8') as f:
|
|
recipe_data = json.load(f)
|
|
|
|
# Find the deleted LoRA in the recipe
|
|
found = False
|
|
updated_lora = None
|
|
|
|
# Identification can be by hash, modelVersionId, or modelName
|
|
for i, lora in enumerate(recipe_data.get('loras', [])):
|
|
match_found = False
|
|
|
|
# Try to match by available identifiers
|
|
if 'hash' in lora and 'hash' in lora_data and lora['hash'] == lora_data['hash']:
|
|
match_found = True
|
|
elif 'modelVersionId' in lora and 'modelVersionId' in lora_data and lora['modelVersionId'] == lora_data['modelVersionId']:
|
|
match_found = True
|
|
elif 'modelName' in lora and 'modelName' in lora_data and lora['modelName'] == lora_data['modelName']:
|
|
match_found = True
|
|
|
|
if match_found:
|
|
# Update LoRA data
|
|
lora['isDeleted'] = False
|
|
lora['file_name'] = target_name
|
|
|
|
# Update with information from the target LoRA
|
|
if 'sha256' in target_lora:
|
|
lora['hash'] = target_lora['sha256'].lower()
|
|
if target_lora.get("civitai"):
|
|
lora['modelName'] = target_lora['civitai']['model']['name']
|
|
lora['modelVersionName'] = target_lora['civitai']['name']
|
|
lora['modelVersionId'] = target_lora['civitai']['id']
|
|
|
|
# Keep original fields for identification
|
|
|
|
# Mark as found and store updated lora
|
|
found = True
|
|
updated_lora = dict(lora) # Make a copy for response
|
|
break
|
|
|
|
if not found:
|
|
return web.json_response({"error": "Could not find matching deleted LoRA in recipe"}, status=404)
|
|
|
|
# Recalculate recipe fingerprint after updating LoRA
|
|
from ..utils.utils import calculate_recipe_fingerprint
|
|
recipe_data['fingerprint'] = calculate_recipe_fingerprint(recipe_data.get('loras', []))
|
|
|
|
# Save updated recipe
|
|
with open(recipe_path, 'w', encoding='utf-8') as f:
|
|
json.dump(recipe_data, f, indent=4, ensure_ascii=False)
|
|
|
|
updated_lora['inLibrary'] = True
|
|
updated_lora['preview_url'] = target_lora['preview_url']
|
|
updated_lora['localPath'] = target_lora['file_path']
|
|
|
|
# Update in cache if it exists
|
|
if scanner._cache is not None:
|
|
for cache_item in scanner._cache.raw_data:
|
|
if cache_item.get('id') == recipe_id:
|
|
# Replace loras array with updated version
|
|
cache_item['loras'] = recipe_data['loras']
|
|
# Update fingerprint in cache
|
|
cache_item['fingerprint'] = recipe_data['fingerprint']
|
|
|
|
# Resort the cache
|
|
asyncio.create_task(scanner._cache.resort())
|
|
break
|
|
|
|
# Update EXIF metadata if image exists
|
|
image_path = recipe_data.get('file_path')
|
|
if image_path and os.path.exists(image_path):
|
|
from ..utils.exif_utils import ExifUtils
|
|
ExifUtils.append_recipe_metadata(image_path, recipe_data)
|
|
|
|
# Find other recipes with the same fingerprint
|
|
matching_recipes = []
|
|
if 'fingerprint' in recipe_data:
|
|
matching_recipes = await scanner.find_recipes_by_fingerprint(recipe_data['fingerprint'])
|
|
# Remove current recipe from matches
|
|
if recipe_id in matching_recipes:
|
|
matching_recipes.remove(recipe_id)
|
|
|
|
return web.json_response({
|
|
"success": True,
|
|
"recipe_id": recipe_id,
|
|
"updated_lora": updated_lora,
|
|
"matching_recipes": matching_recipes
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error reconnecting LoRA: {e}", exc_info=True)
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def get_recipes_for_lora(self, request: web.Request) -> web.Response:
|
|
"""Get recipes that use a specific Lora"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
lora_hash = request.query.get('hash')
|
|
|
|
# Hash is required
|
|
if not lora_hash:
|
|
return web.json_response({'success': False, 'error': 'Lora hash is required'}, status=400)
|
|
|
|
# Log the search parameters
|
|
logger.debug(f"Getting recipes for Lora by hash: {lora_hash}")
|
|
|
|
# Get all recipes from cache
|
|
cache = await self.recipe_scanner.get_cached_data()
|
|
|
|
# Filter recipes that use this Lora by hash
|
|
matching_recipes = []
|
|
for recipe in cache.raw_data:
|
|
# Check if any of the recipe's loras match this hash
|
|
loras = recipe.get('loras', [])
|
|
for lora in loras:
|
|
if lora.get('hash', '').lower() == lora_hash.lower():
|
|
matching_recipes.append(recipe)
|
|
break # No need to check other loras in this recipe
|
|
|
|
# Process the recipes similar to get_paginated_data to ensure all needed data is available
|
|
for recipe in matching_recipes:
|
|
# Add inLibrary information for each lora
|
|
if 'loras' in recipe:
|
|
for lora in recipe['loras']:
|
|
if 'hash' in lora and lora['hash']:
|
|
lora['inLibrary'] = self.recipe_scanner._lora_scanner.has_lora_hash(lora['hash'].lower())
|
|
lora['preview_url'] = self.recipe_scanner._lora_scanner.get_preview_url_by_hash(lora['hash'].lower())
|
|
lora['localPath'] = self.recipe_scanner._lora_scanner.get_lora_path_by_hash(lora['hash'].lower())
|
|
|
|
# Ensure file_url is set (needed by frontend)
|
|
if 'file_path' in recipe:
|
|
recipe['file_url'] = self._format_recipe_file_url(recipe['file_path'])
|
|
else:
|
|
recipe['file_url'] = '/loras_static/images/no-preview.png'
|
|
|
|
return web.json_response({'success': True, 'recipes': matching_recipes})
|
|
except Exception as e:
|
|
logger.error(f"Error getting recipes for Lora: {str(e)}")
|
|
return web.json_response({'success': False, 'error': str(e)}, status=500)
|
|
|
|
async def scan_recipes(self, request: web.Request) -> web.Response:
|
|
"""API endpoint for scanning and rebuilding the recipe cache"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
# Force refresh the recipe cache
|
|
logger.info("Manually triggering recipe cache rebuild")
|
|
await self.recipe_scanner.get_cached_data(force_refresh=True)
|
|
|
|
return web.json_response({
|
|
'success': True,
|
|
'message': 'Recipe cache refreshed successfully'
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Error refreshing recipe cache: {e}", exc_info=True)
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': str(e)
|
|
}, status=500)
|
|
|
|
async def find_duplicates(self, request: web.Request) -> web.Response:
|
|
"""Find all duplicate recipes based on fingerprints"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
# Get all duplicate recipes
|
|
duplicate_groups = await self.recipe_scanner.find_all_duplicate_recipes()
|
|
|
|
# Create response data with additional recipe information
|
|
response_data = []
|
|
|
|
for fingerprint, recipe_ids in duplicate_groups.items():
|
|
# Skip groups with only one recipe (not duplicates)
|
|
if len(recipe_ids) <= 1:
|
|
continue
|
|
|
|
# Get recipe details for each recipe in the group
|
|
recipes = []
|
|
for recipe_id in recipe_ids:
|
|
recipe = await self.recipe_scanner.get_recipe_by_id(recipe_id)
|
|
if recipe:
|
|
# Add only needed fields to keep response size manageable
|
|
recipes.append({
|
|
'id': recipe.get('id'),
|
|
'title': recipe.get('title'),
|
|
'file_url': recipe.get('file_url') or self._format_recipe_file_url(recipe.get('file_path', '')),
|
|
'modified': recipe.get('modified'),
|
|
'created_date': recipe.get('created_date'),
|
|
'lora_count': len(recipe.get('loras', [])),
|
|
})
|
|
|
|
# Only include groups with at least 2 valid recipes
|
|
if len(recipes) >= 2:
|
|
# Sort recipes by modified date (newest first)
|
|
recipes.sort(key=lambda x: x.get('modified', 0), reverse=True)
|
|
|
|
response_data.append({
|
|
'fingerprint': fingerprint,
|
|
'count': len(recipes),
|
|
'recipes': recipes
|
|
})
|
|
|
|
# Sort groups by count (highest first)
|
|
response_data.sort(key=lambda x: x['count'], reverse=True)
|
|
|
|
return web.json_response({
|
|
'success': True,
|
|
'duplicate_groups': response_data
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error finding duplicate recipes: {e}", exc_info=True)
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': str(e)
|
|
}, status=500)
|
|
|
|
async def bulk_delete(self, request: web.Request) -> web.Response:
|
|
"""Delete multiple recipes by ID"""
|
|
try:
|
|
# Ensure services are initialized
|
|
await self.init_services()
|
|
|
|
# Parse request data
|
|
data = await request.json()
|
|
recipe_ids = data.get('recipe_ids', [])
|
|
|
|
if not recipe_ids:
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': 'No recipe IDs provided'
|
|
}, status=400)
|
|
|
|
# Get recipes directory
|
|
recipes_dir = self.recipe_scanner.recipes_dir
|
|
if not recipes_dir or not os.path.exists(recipes_dir):
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': 'Recipes directory not found'
|
|
}, status=404)
|
|
|
|
# Track deleted and failed recipes
|
|
deleted_recipes = []
|
|
failed_recipes = []
|
|
|
|
# Process each recipe ID
|
|
for recipe_id in recipe_ids:
|
|
# Find recipe JSON file
|
|
recipe_json_path = os.path.join(recipes_dir, f"{recipe_id}.recipe.json")
|
|
|
|
if not os.path.exists(recipe_json_path):
|
|
failed_recipes.append({
|
|
'id': recipe_id,
|
|
'reason': 'Recipe not found'
|
|
})
|
|
continue
|
|
|
|
try:
|
|
# Load recipe data to get image path
|
|
with open(recipe_json_path, 'r', encoding='utf-8') as f:
|
|
recipe_data = json.load(f)
|
|
|
|
# Get image path
|
|
image_path = recipe_data.get('file_path')
|
|
|
|
# Delete recipe JSON file
|
|
os.remove(recipe_json_path)
|
|
|
|
# Delete recipe image if it exists
|
|
if image_path and os.path.exists(image_path):
|
|
os.remove(image_path)
|
|
|
|
deleted_recipes.append(recipe_id)
|
|
|
|
except Exception as e:
|
|
failed_recipes.append({
|
|
'id': recipe_id,
|
|
'reason': str(e)
|
|
})
|
|
|
|
# Update cache if any recipes were deleted
|
|
if deleted_recipes and self.recipe_scanner._cache is not None:
|
|
# Remove deleted recipes from raw_data
|
|
self.recipe_scanner._cache.raw_data = [
|
|
r for r in self.recipe_scanner._cache.raw_data
|
|
if r.get('id') not in deleted_recipes
|
|
]
|
|
# Resort the cache
|
|
asyncio.create_task(self.recipe_scanner._cache.resort())
|
|
logger.info(f"Removed {len(deleted_recipes)} recipes from cache")
|
|
|
|
return web.json_response({
|
|
'success': True,
|
|
'deleted': deleted_recipes,
|
|
'failed': failed_recipes,
|
|
'total_deleted': len(deleted_recipes),
|
|
'total_failed': len(failed_recipes)
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error performing bulk delete: {e}", exc_info=True)
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': str(e)
|
|
}, status=500)
|