Files
ComfyUI-Lora-Manager/py/routes/handlers/recipe_handlers.py

1348 lines
59 KiB
Python

"""Dedicated handler objects for recipe-related routes."""
from __future__ import annotations
import asyncio
import base64
import io
import json
import logging
import os
import tempfile
import time
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, Mapping, Optional
import numpy as np
from aiohttp import web
from PIL import Image
from ...config import config
from ...recipes import RecipeParserFactory
from ...services.downloader import get_downloader
from ...services.server_i18n import server_i18n as default_server_i18n
from ...services.settings_manager import SettingsManager
from ...utils.constants import CARD_PREVIEW_WIDTH
from ...utils.exif_utils import ExifUtils
# Check if running in standalone mode.
# The flag is True when HF_HUB_DISABLE_TELEMETRY is unset or equal to "0".
# NOTE(review): presumably the ComfyUI host sets that variable to a non-"0"
# value, so its absence signals a standalone launch - confirm with the launcher.
standalone_mode = os.getenv("HF_HUB_DISABLE_TELEMETRY", "0") == "0"

if not standalone_mode:
    from ...metadata_collector import get_metadata
    from ...metadata_collector.metadata_processor import MetadataProcessor
    from ...metadata_collector.metadata_registry import MetadataRegistry
else:  # pragma: no cover - optional dependency path
    # The metadata collector is not imported in standalone mode; code that
    # receives these names must check for None before using them.
    get_metadata = MetadataProcessor = MetadataRegistry = None  # type: ignore[assignment]

# Type aliases for the logger and the callables injected into the handlers.
CivitaiClientGetter = Callable[[], Any]
RecipeScannerGetter = Callable[[], Any]
EnsureDependenciesCallable = Callable[[], Awaitable[None]]
Logger = logging.Logger
@dataclass(frozen=True)
class RecipeHandlerSet:
    """Immutable bundle of the handler objects that implement the recipe routes."""

    page_view: "RecipePageView"
    listing: "RecipeListingHandler"
    query: "RecipeQueryHandler"
    management: "RecipeManagementHandler"
    analysis: "RecipeAnalysisHandler"
    sharing: "RecipeSharingHandler"

    def to_route_mapping(self) -> Mapping[str, Callable[[web.Request], Awaitable[web.StreamResponse]]]:
        """Return the handler coroutines keyed by the registrar's handler names.

        The mapping is rebuilt on every call; each value is looked up on the
        handler object that owns the corresponding route.
        """
        view = self.page_view
        listing = self.listing
        query = self.query
        management = self.management
        analysis = self.analysis
        sharing = self.sharing
        # Pairs are listed in the order the resulting mapping should iterate.
        route_pairs = [
            ("render_page", view.render_page),
            ("list_recipes", listing.list_recipes),
            ("get_recipe", listing.get_recipe),
            ("analyze_uploaded_image", analysis.analyze_uploaded_image),
            ("analyze_local_image", analysis.analyze_local_image),
            ("save_recipe", management.save_recipe),
            ("delete_recipe", management.delete_recipe),
            ("get_top_tags", query.get_top_tags),
            ("get_base_models", query.get_base_models),
            ("share_recipe", sharing.share_recipe),
            ("download_shared_recipe", sharing.download_shared_recipe),
            ("get_recipe_syntax", query.get_recipe_syntax),
            ("update_recipe", management.update_recipe),
            ("reconnect_lora", management.reconnect_lora),
            ("find_duplicates", query.find_duplicates),
            ("bulk_delete", management.bulk_delete),
            ("save_recipe_from_widget", management.save_recipe_from_widget),
            ("get_recipes_for_lora", query.get_recipes_for_lora),
            ("scan_recipes", query.scan_recipes),
        ]
        return dict(route_pairs)
class RecipePageView:
    """Serve the HTML shell of the recipes page."""

    def __init__(
        self,
        *,
        ensure_dependencies_ready: EnsureDependenciesCallable,
        settings_service: SettingsManager,
        server_i18n=default_server_i18n,
        template_env,
        template_name: str,
        recipe_scanner_getter: RecipeScannerGetter,
        logger: Logger,
    ) -> None:
        """Keep references to the injected collaborators; no other work is done."""
        self._logger = logger
        self._settings = settings_service
        self._server_i18n = server_i18n
        self._template_env = template_env
        self._template_name = template_name
        self._recipe_scanner_getter = recipe_scanner_getter
        self._ensure_dependencies_ready = ensure_dependencies_ready

    async def render_page(self, request: web.Request) -> web.Response:
        """Render the recipes template and return it as an HTML response.

        The template is rendered with ``is_initializing=False`` once the
        scanner cache loads, and with ``is_initializing=True`` when loading
        or that first render raises. Any other failure is logged and answered
        with a plain-text 500 response.
        """

        def render_template(**state) -> str:
            # The template is looked up on every render, before the context
            # values are evaluated.
            template = self._template_env.get_template(self._template_name)
            return template.render(
                settings=self._settings,
                request=request,
                t=self._server_i18n.get_translation,
                **state,
            )

        try:
            await self._ensure_dependencies_ready()
            scanner = self._recipe_scanner_getter()
            if scanner is None:  # pragma: no cover - defensive guard
                raise RuntimeError("Recipe scanner not available")

            # Apply the configured UI language before translating the template.
            self._server_i18n.set_locale(self._settings.get("language", "en"))

            try:
                await scanner.get_cached_data(force_refresh=False)
                html = render_template(recipes=[], is_initializing=False)
            except Exception as cache_error:  # pragma: no cover - logging path
                self._logger.error("Error loading recipe cache data: %s", cache_error)
                html = render_template(is_initializing=True)

            return web.Response(text=html, content_type="text/html")
        except Exception as exc:  # pragma: no cover - logging path
            self._logger.error("Error handling recipes request: %s", exc, exc_info=True)
            return web.Response(text="Error loading recipes page", status=500)
class RecipeListingHandler:
    """Serve the paginated recipe list and single-recipe detail endpoints."""

    def __init__(
        self,
        *,
        ensure_dependencies_ready: EnsureDependenciesCallable,
        recipe_scanner_getter: RecipeScannerGetter,
        logger: Logger,
    ) -> None:
        """Keep references to the injected collaborators."""
        self._logger = logger
        self._recipe_scanner_getter = recipe_scanner_getter
        self._ensure_dependencies_ready = ensure_dependencies_ready

    async def list_recipes(self, request: web.Request) -> web.Response:
        """Return one page of recipes as JSON.

        Query parameters read: ``page``, ``page_size``, ``sort_by``,
        ``search``, the four ``search_*`` toggles, the comma-separated
        ``base_models`` and ``tags`` filters, and ``lora_hash``. Every
        returned item is given ``file_url``, ``loras`` and ``base_model``
        entries if it lacks them. Any exception yields a 500 JSON error.
        """
        try:
            await self._ensure_dependencies_ready()
            scanner = self._recipe_scanner_getter()
            if scanner is None:
                raise RuntimeError("Recipe scanner unavailable")

            params = request.query

            def enabled(option_name: str) -> bool:
                # Search toggles default to on; only the text "true" enables one.
                return params.get(option_name, "true").lower() == "true"

            page = int(params.get("page", "1"))
            page_size = int(params.get("page_size", "20"))
            sort_by = params.get("sort_by", "date")
            search = params.get("search")
            search_options = {
                "title": enabled("search_title"),
                "tags": enabled("search_tags"),
                "lora_name": enabled("search_lora_name"),
                "lora_model": enabled("search_lora_model"),
            }

            # Each filter is only included when its query parameter is non-empty.
            filters: Dict[str, list[str]] = {}
            for param_name, filter_key in (("base_models", "base_model"), ("tags", "tags")):
                raw_value = params.get(param_name)
                if raw_value:
                    filters[filter_key] = raw_value.split(",")

            lora_hash = params.get("lora_hash")

            result = await scanner.get_paginated_data(
                page=page,
                page_size=page_size,
                sort_by=sort_by,
                search=search,
                filters=filters,
                search_options=search_options,
                lora_hash=lora_hash,
            )

            placeholder_url = "/loras_static/images/no-preview.png"
            for entry in result.get("items", []):
                preview_path = entry.get("file_path")
                if not preview_path:
                    entry.setdefault("file_url", placeholder_url)
                else:
                    entry["file_url"] = self.format_recipe_file_url(preview_path)
                entry.setdefault("loras", [])
                entry.setdefault("base_model", "")

            return web.json_response(result)
        except Exception as exc:
            self._logger.error("Error retrieving recipes: %s", exc, exc_info=True)
            return web.json_response({"error": str(exc)}, status=500)

    async def get_recipe(self, request: web.Request) -> web.Response:
        """Return the recipe named by the ``recipe_id`` path segment, or 404 if absent."""
        try:
            await self._ensure_dependencies_ready()
            scanner = self._recipe_scanner_getter()
            if scanner is None:
                raise RuntimeError("Recipe scanner unavailable")

            found = await scanner.get_recipe_by_id(request.match_info["recipe_id"])
            if found:
                return web.json_response(found)
            return web.json_response({"error": "Recipe not found"}, status=404)
        except Exception as exc:
            self._logger.error("Error retrieving recipe details: %s", exc, exc_info=True)
            return web.json_response({"error": str(exc)}, status=500)

    def format_recipe_file_url(self, file_path: str) -> str:
        """Translate a recipe image path into a static preview URL.

        Paths that start with ``<first lora root>/recipes`` keep their path
        relative to that root; any other path is mapped by file name only.
        If anything raises, the error is logged and the placeholder image URL
        is returned.
        """
        try:
            first_root = config.loras_roots[0]
            recipes_prefix = os.path.join(first_root, "recipes").replace(os.sep, "/")
            forward_slash_path = file_path.replace(os.sep, "/")
            if not forward_slash_path.startswith(recipes_prefix):
                return f"/loras_static/root1/preview/recipes/{os.path.basename(file_path)}"
            relative_path = os.path.relpath(file_path, first_root).replace(os.sep, "/")
            return f"/loras_static/root1/preview/{relative_path}"
        except Exception as exc:  # pragma: no cover - logging path
            self._logger.error("Error formatting recipe file URL: %s", exc, exc_info=True)
            return "/loras_static/images/no-preview.png"
class RecipeQueryHandler:
    """Provide read-only insights on recipe data.

    Every public coroutine is an aiohttp handler: it waits for dependencies,
    fetches the recipe scanner, and converts any exception into a logged
    500 JSON response.
    """

    def __init__(
        self,
        *,
        ensure_dependencies_ready: EnsureDependenciesCallable,
        recipe_scanner_getter: RecipeScannerGetter,
        format_recipe_file_url: Callable[[str], str],
        logger: Logger,
    ) -> None:
        """Keep references to the injected collaborators."""
        self._ensure_dependencies_ready = ensure_dependencies_ready
        self._recipe_scanner_getter = recipe_scanner_getter
        self._format_recipe_file_url = format_recipe_file_url
        self._logger = logger

    async def _ready_scanner(self) -> Any:
        """Wait for dependencies and return the recipe scanner.

        Raises:
            RuntimeError: if the getter returns ``None``.
        """
        await self._ensure_dependencies_ready()
        recipe_scanner = self._recipe_scanner_getter()
        if recipe_scanner is None:
            raise RuntimeError("Recipe scanner unavailable")
        return recipe_scanner

    async def get_top_tags(self, request: web.Request) -> web.Response:
        """Return the most frequent recipe tags, capped by the ``limit`` query parameter (default 20)."""
        try:
            recipe_scanner = await self._ready_scanner()
            limit = int(request.query.get("limit", "20"))
            cache = await recipe_scanner.get_cached_data()

            tag_counts: Dict[str, int] = {}
            for recipe in getattr(cache, "raw_data", []):
                for tag in recipe.get("tags", []) or []:
                    tag_counts[tag] = tag_counts.get(tag, 0) + 1

            # The sort is stable, so tags with equal counts keep first-seen order.
            sorted_tags = [{"tag": tag, "count": count} for tag, count in tag_counts.items()]
            sorted_tags.sort(key=lambda entry: entry["count"], reverse=True)
            return web.json_response({"success": True, "tags": sorted_tags[:limit]})
        except Exception as exc:
            self._logger.error("Error retrieving top tags: %s", exc, exc_info=True)
            return web.json_response({"success": False, "error": str(exc)}, status=500)

    async def get_base_models(self, request: web.Request) -> web.Response:
        """Return every non-empty ``base_model`` value with its recipe count, most used first."""
        try:
            recipe_scanner = await self._ready_scanner()
            cache = await recipe_scanner.get_cached_data()

            base_model_counts: Dict[str, int] = {}
            for recipe in getattr(cache, "raw_data", []):
                base_model = recipe.get("base_model")
                if base_model:
                    base_model_counts[base_model] = base_model_counts.get(base_model, 0) + 1

            sorted_models = [
                {"name": model, "count": count} for model, count in base_model_counts.items()
            ]
            sorted_models.sort(key=lambda entry: entry["count"], reverse=True)
            return web.json_response({"success": True, "base_models": sorted_models})
        except Exception as exc:
            self._logger.error("Error retrieving base models: %s", exc, exc_info=True)
            return web.json_response({"success": False, "error": str(exc)}, status=500)

    async def get_recipes_for_lora(self, request: web.Request) -> web.Response:
        """Return the cached recipes that reference the LoRA given by the ``hash`` query parameter.

        Hashes are compared case-insensitively, and a LoRA entry whose stored
        hash is missing or ``None`` never matches. The matched recipe dicts
        are the cache's own objects and are annotated in place with
        ``file_url`` and, when a LoRA scanner is available, per-LoRA
        ``inLibrary`` / ``preview_url`` / ``localPath``. Responds 400 when
        ``hash`` is absent.
        """
        try:
            recipe_scanner = await self._ready_scanner()
            lora_hash = request.query.get("hash")
            if not lora_hash:
                return web.json_response(
                    {"success": False, "error": "Lora hash is required"}, status=400
                )

            wanted_hash = lora_hash.lower()
            cache = await recipe_scanner.get_cached_data()
            # ``or ""`` / ``or []`` guard against keys that are present but None.
            matching_recipes = [
                recipe
                for recipe in getattr(cache, "raw_data", [])
                if any(
                    (lora.get("hash") or "").lower() == wanted_hash
                    for lora in recipe.get("loras", []) or []
                )
            ]

            lora_scanner = getattr(recipe_scanner, "_lora_scanner", None)
            for recipe in matching_recipes:
                for lora in recipe.get("loras", []) or []:
                    hash_value = (lora.get("hash") or "").lower()
                    if hash_value and lora_scanner is not None:
                        lora["inLibrary"] = lora_scanner.has_hash(hash_value)
                        lora["preview_url"] = lora_scanner.get_preview_url_by_hash(hash_value)
                        lora["localPath"] = lora_scanner.get_path_by_hash(hash_value)
                if recipe.get("file_path"):
                    recipe["file_url"] = self._format_recipe_file_url(recipe["file_path"])
                else:
                    recipe["file_url"] = "/loras_static/images/no-preview.png"

            return web.json_response({"success": True, "recipes": matching_recipes})
        except Exception as exc:
            self._logger.error("Error getting recipes for Lora: %s", exc, exc_info=True)
            return web.json_response({"success": False, "error": str(exc)}, status=500)

    async def scan_recipes(self, request: web.Request) -> web.Response:
        """Ask the scanner for its cached data with ``force_refresh=True``."""
        try:
            recipe_scanner = await self._ready_scanner()
            self._logger.info("Manually triggering recipe cache rebuild")
            await recipe_scanner.get_cached_data(force_refresh=True)
            return web.json_response(
                {"success": True, "message": "Recipe cache refreshed successfully"}
            )
        except Exception as exc:
            self._logger.error("Error refreshing recipe cache: %s", exc, exc_info=True)
            return web.json_response({"success": False, "error": str(exc)}, status=500)

    async def find_duplicates(self, request: web.Request) -> web.Response:
        """Return groups of recipes that share a fingerprint.

        Only groups with at least two resolvable recipes are reported.
        Recipes inside a group are ordered by ``modified`` descending, and
        groups are ordered by size descending.
        """
        try:
            recipe_scanner = await self._ready_scanner()
            duplicate_groups = await recipe_scanner.find_all_duplicate_recipes()

            response_data = []
            for fingerprint, recipe_ids in duplicate_groups.items():
                if len(recipe_ids) <= 1:
                    continue

                recipes = []
                for recipe_id in recipe_ids:
                    recipe = await recipe_scanner.get_recipe_by_id(recipe_id)
                    if not recipe:
                        continue
                    recipes.append(
                        {
                            "id": recipe.get("id"),
                            "title": recipe.get("title"),
                            "file_url": recipe.get("file_url")
                            or self._format_recipe_file_url(recipe.get("file_path", "")),
                            "modified": recipe.get("modified"),
                            "created_date": recipe.get("created_date"),
                            "lora_count": len(recipe.get("loras", [])),
                        }
                    )

                if len(recipes) < 2:
                    continue
                # "modified" is always present in the summary but may be None;
                # treat None as 0 so the comparison cannot raise TypeError.
                recipes.sort(key=lambda entry: entry.get("modified") or 0, reverse=True)
                response_data.append(
                    {
                        "fingerprint": fingerprint,
                        "count": len(recipes),
                        "recipes": recipes,
                    }
                )

            response_data.sort(key=lambda entry: entry["count"], reverse=True)
            return web.json_response({"success": True, "duplicate_groups": response_data})
        except Exception as exc:
            self._logger.error("Error finding duplicate recipes: %s", exc, exc_info=True)
            return web.json_response({"success": False, "error": str(exc)}, status=500)

    async def get_recipe_syntax(self, request: web.Request) -> web.Response:
        """Build the ``<lora:name:strength>`` prompt syntax for one recipe.

        LoRAs flagged ``isDeleted`` and LoRAs whose hash the LoRA scanner does
        not know are left out. The file name is taken from the scanner's
        hash index, then from a cached LoRA with a matching Civitai version
        id, then from the recipe's own ``file_name``. Responds 404 when the
        recipe is unknown and 400 when it lists no LoRAs.
        """
        try:
            recipe_scanner = await self._ready_scanner()
            recipe_id = request.match_info["recipe_id"]
            cache = await recipe_scanner.get_cached_data()
            recipe = next(
                (r for r in getattr(cache, "raw_data", []) if str(r.get("id", "")) == recipe_id),
                None,
            )
            if not recipe:
                return web.json_response({"error": "Recipe not found"}, status=404)

            loras = recipe.get("loras", [])
            if not loras:
                return web.json_response({"error": "No LoRAs found in this recipe"}, status=400)

            lora_scanner = getattr(recipe_scanner, "_lora_scanner", None)
            hash_index = getattr(lora_scanner, "_hash_index", None)
            # Fetched at most once, and only if some LoRA needs the fallback lookup.
            library_loras = None

            lora_syntax_parts = []
            for lora in loras:
                if lora.get("isDeleted", False):
                    continue
                hash_value = (lora.get("hash") or "").lower()
                if not hash_value or lora_scanner is None or not lora_scanner.has_hash(hash_value):
                    continue

                file_name = None
                if hash_index is not None and hasattr(hash_index, "_hash_to_path"):
                    file_path = hash_index._hash_to_path.get(hash_value)
                    if file_path:
                        file_name = os.path.splitext(os.path.basename(file_path))[0]

                if not file_name and lora.get("modelVersionId"):
                    if library_loras is None:
                        library_loras = await lora_scanner.get_cached_data()
                    for cached_lora in getattr(library_loras, "raw_data", []):
                        civitai_info = cached_lora.get("civitai")
                        if civitai_info and civitai_info.get("id") == lora.get("modelVersionId"):
                            # NOTE(review): other code in this file reads "file_path"
                            # from LoRA info dicts; confirm cache entries expose "path".
                            file_name = os.path.splitext(os.path.basename(cached_lora["path"]))[0]
                            break

                if not file_name:
                    file_name = lora.get("file_name", "unknown-lora")

                strength = lora.get("strength", 1.0)
                lora_syntax_parts.append(f"<lora:{file_name}:{strength}>")

            return web.json_response({"success": True, "syntax": " ".join(lora_syntax_parts)})
        except Exception as exc:
            self._logger.error("Error generating recipe syntax: %s", exc, exc_info=True)
            return web.json_response({"error": str(exc)}, status=500)
class RecipeManagementHandler:
"""Handle create/update/delete style recipe operations."""
def __init__(
    self,
    *,
    ensure_dependencies_ready: EnsureDependenciesCallable,
    recipe_scanner_getter: RecipeScannerGetter,
    logger: Logger,
    exif_utils=ExifUtils,
    card_preview_width: int = CARD_PREVIEW_WIDTH,
    metadata_collector: Optional[Callable[[], Any]] = get_metadata,
    metadata_processor_cls: Optional[type] = MetadataProcessor,
    metadata_registry_cls: Optional[type] = MetadataRegistry,
    standalone_mode: bool = standalone_mode,
) -> None:
    """Keep references to the injected collaborators and settings.

    The optional arguments default to the module-level values, so callers
    only need to override them to substitute their own implementations.
    """
    # Request plumbing shared with the other handler classes.
    self._logger = logger
    self._recipe_scanner_getter = recipe_scanner_getter
    self._ensure_dependencies_ready = ensure_dependencies_ready

    # Image helpers.
    self._card_preview_width = card_preview_width
    self._exif_utils = exif_utils

    # Metadata-collection hooks; these may be None.
    self._standalone_mode = standalone_mode
    self._metadata_registry_cls = metadata_registry_cls
    self._metadata_processor_cls = metadata_processor_cls
    self._metadata_collector = metadata_collector
async def save_recipe(self, request: web.Request) -> web.Response:
    """Create a recipe from a multipart upload and write it to disk.

    Reads the ``image`` / ``image_base64``, ``name``, ``tags`` and
    ``metadata`` form fields, writes an optimized image and a
    ``<id>.recipe.json`` file into the scanner's recipes directory, passes
    the recipe to ``append_recipe_metadata`` for that image, and appends it
    to the scanner cache when one exists.

    Returns:
        JSON with ``recipe_id``, both written paths and ``matching_recipes``
        (the ids returned by ``find_recipes_by_fingerprint`` minus the new
        one) on success; status 400 for missing fields or image data;
        status 500 for any other failure.
    """
    try:
        await self._ensure_dependencies_ready()
        scanner = self._recipe_scanner_getter()
        if scanner is None:
            raise RuntimeError("Recipe scanner unavailable")

        multipart = await request.multipart()
        image: Optional[bytes] = None
        image_base64: Optional[str] = None
        name: Optional[str] = None
        tags: list[str] = []
        metadata: Dict[str, Any] | None = None

        # Walk the form parts until the reader reports there are no more.
        part = await multipart.next()
        while part is not None:
            part_name = part.name
            if part_name == "image":
                buffer = bytearray()
                chunk = await part.read_chunk()
                while chunk:
                    buffer.extend(chunk)
                    chunk = await part.read_chunk()
                image = bytes(buffer)
            elif part_name == "image_base64":
                image_base64 = await part.text()
            elif part_name == "name":
                name = await part.text()
            elif part_name == "tags":
                tags_text = await part.text()
                try:
                    decoded_tags = json.loads(tags_text)
                except Exception:
                    tags = []
                else:
                    # Anything other than a JSON list is treated as "no tags".
                    tags = decoded_tags if isinstance(decoded_tags, list) else []
            elif part_name == "metadata":
                metadata_text = await part.text()
                try:
                    metadata = json.loads(metadata_text)
                except Exception:
                    metadata = {}
            part = await multipart.next()

        missing_fields = [
            label for label, value in (("name", name), ("metadata", metadata)) if not value
        ]
        if missing_fields:
            return web.json_response(
                {"error": f"Missing required fields: {', '.join(missing_fields)}"},
                status=400,
            )

        if image is None:
            if not image_base64:
                return web.json_response({"error": "No image data provided"}, status=400)
            try:
                # Drop a "data:...;base64," style prefix if one is present.
                if "," in image_base64:
                    image_base64 = image_base64.split(",", 1)[1]
                image = base64.b64decode(image_base64)
            except Exception as exc:
                return web.json_response({"error": f"Invalid base64 image data: {exc}"}, status=400)

        recipes_dir = scanner.recipes_dir
        os.makedirs(recipes_dir, exist_ok=True)

        import uuid

        recipe_id = str(uuid.uuid4())

        optimized_image, extension = self._exif_utils.optimize_image(
            image_data=image,
            target_width=self._card_preview_width,
            format="webp",
            quality=85,
            preserve_metadata=True,
        )
        image_path = os.path.join(recipes_dir, f"{recipe_id}{extension}")
        with open(image_path, "wb") as image_file:
            image_file.write(optimized_image)

        now = time.time()

        def to_recipe_lora(entry: Dict[str, Any]) -> Dict[str, Any]:
            # Fall back to the stem of localPath when no file_name is supplied.
            file_name = entry.get("file_name", "")
            if not file_name:
                file_name = (
                    os.path.splitext(os.path.basename(entry.get("localPath", "")))[0]
                    if entry.get("localPath")
                    else ""
                )
            return {
                "file_name": file_name,
                "hash": (entry.get("hash") or "").lower(),
                "strength": float(entry.get("weight", 1.0)),
                "modelVersionId": entry.get("id", 0),
                "modelName": entry.get("name", ""),
                "modelVersionName": entry.get("version", ""),
                "isDeleted": entry.get("isDeleted", False),
                "exclude": entry.get("exclude", False),
            }

        loras_data = [to_recipe_lora(entry) for entry in metadata.get("loras", [])]

        gen_params = metadata.get("gen_params", {})
        if not gen_params and "raw_metadata" in metadata:
            raw_metadata = metadata.get("raw_metadata", {})
            # Key -> value used when raw_metadata lacks that key.
            fallbacks = {
                "prompt": "",
                "negative_prompt": "",
                "checkpoint": {},
                "steps": "",
                "sampler": "",
                "cfg_scale": "",
                "seed": "",
                "size": "",
                "clip_skip": "",
            }
            gen_params = {key: raw_metadata.get(key, fallback) for key, fallback in fallbacks.items()}

        from ...utils.utils import calculate_recipe_fingerprint

        fingerprint = calculate_recipe_fingerprint(loras_data)

        recipe_data = {
            "id": recipe_id,
            "file_path": image_path,
            "title": name,
            "modified": now,
            "created_date": now,
            "base_model": metadata.get("base_model", ""),
            "loras": loras_data,
            "gen_params": gen_params,
            "fingerprint": fingerprint,
        }
        if tags:
            recipe_data["tags"] = tags
        source_path = metadata.get("source_path")
        if source_path:
            recipe_data["source_path"] = source_path

        json_path = os.path.join(recipes_dir, f"{recipe_id}.recipe.json")
        with open(json_path, "w", encoding="utf-8") as json_file:
            json.dump(recipe_data, json_file, indent=4, ensure_ascii=False)

        self._exif_utils.append_recipe_metadata(image_path, recipe_data)

        matching_recipes = []
        if fingerprint:
            matching_recipes = await scanner.find_recipes_by_fingerprint(fingerprint)
            if recipe_id in matching_recipes:
                matching_recipes.remove(recipe_id)

        cache = getattr(scanner, "_cache", None)
        if cache is not None:
            cache.raw_data.append(recipe_data)
            # The resort is scheduled as a task and not awaited here.
            asyncio.create_task(cache.resort())
            self._logger.info("Added recipe %s to cache", recipe_id)

        return web.json_response(
            {
                "success": True,
                "recipe_id": recipe_id,
                "image_path": image_path,
                "json_path": json_path,
                "matching_recipes": matching_recipes,
            }
        )
    except Exception as exc:
        self._logger.error("Error saving recipe: %s", exc, exc_info=True)
        return web.json_response({"error": str(exc)}, status=500)
async def delete_recipe(self, request: web.Request) -> web.Response:
    """Delete one recipe's JSON file and image, then drop it from the cache.

    The recipe is named by the ``recipe_id`` path segment. Responds 404 when
    the recipes directory or the recipe JSON file does not exist, and 500
    when anything raises.
    """
    try:
        await self._ensure_dependencies_ready()
        scanner = self._recipe_scanner_getter()
        if scanner is None:
            raise RuntimeError("Recipe scanner unavailable")

        recipe_id = request.match_info["recipe_id"]
        recipes_dir = scanner.recipes_dir
        if not (recipes_dir and os.path.exists(recipes_dir)):
            return web.json_response({"error": "Recipes directory not found"}, status=404)

        json_path = os.path.join(recipes_dir, f"{recipe_id}.recipe.json")
        if not os.path.exists(json_path):
            return web.json_response({"error": "Recipe not found"}, status=404)

        # Read the image location before the JSON that records it is removed.
        with open(json_path, "r", encoding="utf-8") as json_file:
            stored_recipe = json.load(json_file)
        preview_path = stored_recipe.get("file_path")

        os.remove(json_path)
        self._logger.info("Deleted recipe JSON file: %s", json_path)

        if preview_path and os.path.exists(preview_path):
            os.remove(preview_path)
            self._logger.info("Deleted recipe image: %s", preview_path)

        cache = getattr(scanner, "_cache", None)
        if cache is not None:
            remaining = []
            for entry in cache.raw_data:
                if str(entry.get("id", "")) != recipe_id:
                    remaining.append(entry)
            cache.raw_data = remaining
            # The resort is scheduled as a task and not awaited here.
            asyncio.create_task(cache.resort())
            self._logger.info("Removed recipe %s from cache", recipe_id)

        return web.json_response({"success": True, "message": "Recipe deleted successfully"})
    except Exception as exc:
        self._logger.error("Error deleting recipe: %s", exc, exc_info=True)
        return web.json_response({"error": str(exc)}, status=500)
async def update_recipe(self, request: web.Request) -> web.Response:
    """Forward a JSON metadata update for one recipe to the scanner.

    The body must contain at least one of ``title``, ``tags``,
    ``source_path`` or ``preview_nsfw_level`` (otherwise 400); the whole
    body is passed to ``update_recipe_metadata``. Responds 404 when the
    scanner reports failure and 500 when anything raises.
    """
    try:
        await self._ensure_dependencies_ready()
        scanner = self._recipe_scanner_getter()
        if scanner is None:
            raise RuntimeError("Recipe scanner unavailable")

        recipe_id = request.match_info["recipe_id"]
        payload = await request.json()

        accepted_keys = ("title", "tags", "source_path", "preview_nsfw_level")
        if all(key not in payload for key in accepted_keys):
            message = (
                "At least one field to update must be provided (title or tags or "
                "source_path or preview_nsfw_level)"
            )
            return web.json_response({"error": message}, status=400)

        updated = await scanner.update_recipe_metadata(recipe_id, payload)
        if updated:
            return web.json_response({"success": True, "recipe_id": recipe_id, "updates": payload})
        return web.json_response({"error": "Recipe not found or update failed"}, status=404)
    except Exception as exc:
        self._logger.error("Error updating recipe: %s", exc, exc_info=True)
        return web.json_response({"error": str(exc)}, status=500)
async def reconnect_lora(self, request: web.Request) -> web.Response:
    """Point one LoRA entry of a stored recipe at a LoRA from the local library.

    The JSON body must provide ``recipe_id``, ``lora_index`` and
    ``target_name``. The chosen entry is marked as present, given the
    target's name, hash and Civitai fields, and the recipe's fingerprint is
    recomputed. The recipe JSON, the matching cache item and the recipe
    image's metadata (when the image exists) are all updated.

    Returns:
        JSON with the updated LoRA and ``matching_recipes`` on success;
        400 for a missing body field; 404 when the recipe file, the target
        LoRA or the index cannot be resolved; 500 for any other failure.
    """
    try:
        await self._ensure_dependencies_ready()
        recipe_scanner = self._recipe_scanner_getter()
        if recipe_scanner is None:
            raise RuntimeError("Recipe scanner unavailable")

        data = await request.json()
        for field in ("recipe_id", "lora_index", "target_name"):
            if field not in data:
                return web.json_response({"error": f"Missing required field: {field}"}, status=400)

        recipe_id = data["recipe_id"]
        lora_index = int(data["lora_index"])
        target_name = data["target_name"]

        recipe_path = os.path.join(recipe_scanner.recipes_dir, f"{recipe_id}.recipe.json")
        if not os.path.exists(recipe_path):
            return web.json_response({"error": "Recipe not found"}, status=404)

        lora_scanner = getattr(recipe_scanner, "_lora_scanner", None)
        target_lora = (
            None if lora_scanner is None else await lora_scanner.get_model_info_by_name(target_name)
        )
        if not target_lora:
            return web.json_response(
                {"error": f"Local LoRA not found with name: {target_name}"}, status=404
            )

        with open(recipe_path, "r", encoding="utf-8") as file_obj:
            recipe_data = json.load(file_obj)

        loras = recipe_data.get("loras", [])
        # Check both bounds: a negative index would otherwise wrap around to
        # another entry or raise IndexError instead of being rejected.
        lora = loras[lora_index] if 0 <= lora_index < len(loras) else None
        if lora is None:
            return web.json_response({"error": "LoRA index out of range in recipe"}, status=404)

        lora["isDeleted"] = False
        lora["exclude"] = False
        lora["file_name"] = target_name
        if "sha256" in target_lora:
            lora["hash"] = target_lora["sha256"].lower()
        if target_lora.get("civitai"):
            lora["modelName"] = target_lora["civitai"]["model"]["name"]
            lora["modelVersionName"] = target_lora["civitai"]["name"]
            lora["modelVersionId"] = target_lora["civitai"]["id"]

        from ...utils.utils import calculate_recipe_fingerprint

        recipe_data["fingerprint"] = calculate_recipe_fingerprint(recipe_data.get("loras", []))

        with open(recipe_path, "w", encoding="utf-8") as file_obj:
            json.dump(recipe_data, file_obj, indent=4, ensure_ascii=False)

        # The library-only fields go on a copy so they are not stored in the recipe.
        updated_lora = dict(lora)
        updated_lora["inLibrary"] = True
        updated_lora["preview_url"] = config.get_preview_static_url(target_lora["preview_url"])
        updated_lora["localPath"] = target_lora["file_path"]

        cache = getattr(recipe_scanner, "_cache", None)
        if cache is not None:
            for cache_item in cache.raw_data:
                if cache_item.get("id") == recipe_id:
                    cache_item["loras"] = recipe_data["loras"]
                    cache_item["fingerprint"] = recipe_data["fingerprint"]
                    # The resort is scheduled as a task and not awaited here.
                    asyncio.create_task(cache.resort())
                    break

        image_path = recipe_data.get("file_path")
        if image_path and os.path.exists(image_path):
            self._exif_utils.append_recipe_metadata(image_path, recipe_data)

        matching_recipes = []
        if "fingerprint" in recipe_data:
            matching_recipes = await recipe_scanner.find_recipes_by_fingerprint(
                recipe_data["fingerprint"]
            )
            if recipe_id in matching_recipes:
                matching_recipes.remove(recipe_id)

        return web.json_response(
            {
                "success": True,
                "recipe_id": recipe_id,
                "updated_lora": updated_lora,
                "matching_recipes": matching_recipes,
            }
        )
    except Exception as exc:
        self._logger.error("Error reconnecting LoRA: %s", exc, exc_info=True)
        return web.json_response({"error": str(exc)}, status=500)
async def bulk_delete(self, request: web.Request) -> web.Response:
    """Delete every recipe listed under ``recipe_ids`` in the JSON body.

    Each id is handled independently: its JSON file and image are removed,
    and a failure is recorded without stopping the rest. Ids that contain a
    directory component are rejected, because they come from the request
    and are joined into a filesystem path. Deleted recipes are then removed
    from the scanner cache, comparing ids as strings.

    Returns:
        JSON with the ``deleted`` ids, the ``failed`` entries (``id`` and
        ``reason``) and both totals; 400 when no ids are supplied; 404 when
        the recipes directory is missing; 500 for any other failure.
    """
    try:
        await self._ensure_dependencies_ready()
        recipe_scanner = self._recipe_scanner_getter()
        if recipe_scanner is None:
            raise RuntimeError("Recipe scanner unavailable")

        data = await request.json()
        recipe_ids = data.get("recipe_ids", [])
        if not recipe_ids:
            return web.json_response(
                {"success": False, "error": "No recipe IDs provided"},
                status=400,
            )

        recipes_dir = recipe_scanner.recipes_dir
        if not recipes_dir or not os.path.exists(recipes_dir):
            return web.json_response(
                {"success": False, "error": "Recipes directory not found"},
                status=404,
            )

        deleted_recipes: list[str] = []
        failed_recipes: list[Dict[str, Any]] = []

        for recipe_id in recipe_ids:
            id_text = str(recipe_id)
            # An id that differs from its own basename carries a directory
            # part and could point outside recipes_dir once joined.
            if os.path.basename(id_text) != id_text:
                failed_recipes.append({"id": recipe_id, "reason": "Invalid recipe ID"})
                continue

            recipe_json_path = os.path.join(recipes_dir, f"{id_text}.recipe.json")
            if not os.path.exists(recipe_json_path):
                failed_recipes.append({"id": recipe_id, "reason": "Recipe not found"})
                continue

            try:
                with open(recipe_json_path, "r", encoding="utf-8") as file_obj:
                    recipe_data = json.load(file_obj)
                image_path = recipe_data.get("file_path")
                os.remove(recipe_json_path)
                if image_path and os.path.exists(image_path):
                    os.remove(image_path)
                deleted_recipes.append(recipe_id)
            except Exception as exc:
                failed_recipes.append({"id": recipe_id, "reason": str(exc)})

        cache = getattr(recipe_scanner, "_cache", None)
        if deleted_recipes and cache is not None:
            # String-normalized set: constant-time lookups, and the same id
            # comparison that delete_recipe uses.
            deleted_ids = {str(deleted_id) for deleted_id in deleted_recipes}
            cache.raw_data = [
                item for item in cache.raw_data if str(item.get("id", "")) not in deleted_ids
            ]
            # The resort is scheduled as a task and not awaited here.
            asyncio.create_task(cache.resort())
            self._logger.info("Removed %s recipes from cache", len(deleted_recipes))

        return web.json_response(
            {
                "success": True,
                "deleted": deleted_recipes,
                "failed": failed_recipes,
                "total_deleted": len(deleted_recipes),
                "total_failed": len(failed_recipes),
            }
        )
    except Exception as exc:
        self._logger.error("Error performing bulk delete: %s", exc, exc_info=True)
        return web.json_response({"success": False, "error": str(exc)}, status=500)
async def save_recipe_from_widget(self, request: web.Request) -> web.Response:
try:
await self._ensure_dependencies_ready()
recipe_scanner = self._recipe_scanner_getter()
if recipe_scanner is None:
raise RuntimeError("Recipe scanner unavailable")
if self._metadata_collector is None or self._metadata_processor_cls is None:
return web.json_response({"error": "Metadata collection not available"}, status=400)
raw_metadata = self._metadata_collector()
metadata_dict = self._metadata_processor_cls.to_dict(raw_metadata)
if not metadata_dict:
return web.json_response({"error": "No generation metadata found"}, status=400)
if not self._standalone_mode and self._metadata_registry_cls is not None:
metadata_registry = self._metadata_registry_cls()
latest_image = metadata_registry.get_first_decoded_image()
else:
latest_image = None
if latest_image is None:
return web.json_response(
{"error": "No recent images found to use for recipe. Try generating an image first."},
status=400,
)
self._logger.debug("Image type: %s", type(latest_image))
try:
if isinstance(latest_image, tuple):
tensor_image = latest_image[0] if latest_image else None
if tensor_image is None:
return web.json_response({"error": "Empty image tuple received"}, status=400)
else:
tensor_image = latest_image
if hasattr(tensor_image, "shape"):
shape_info = tensor_image.shape
self._logger.debug("Tensor shape: %s, dtype: %s", shape_info, tensor_image.dtype)
import torch # type: ignore[import-not-found]
if isinstance(tensor_image, torch.Tensor):
image_np = tensor_image.cpu().numpy()
else:
image_np = np.array(tensor_image)
while len(image_np.shape) > 3:
image_np = image_np[0]
if image_np.dtype in (np.float32, np.float64) and image_np.max() <= 1.0:
image_np = (image_np * 255).astype(np.uint8)
if len(image_np.shape) == 3 and image_np.shape[2] == 3:
pil_image = Image.fromarray(image_np)
img_byte_arr = io.BytesIO()
pil_image.save(img_byte_arr, format="PNG")
image_bytes = img_byte_arr.getvalue()
else:
return web.json_response(
{"error": f"Cannot handle this data shape: {image_np.shape}, {image_np.dtype}"},
status=400,
)
except Exception as exc:
self._logger.error("Error processing image data: %s", exc, exc_info=True)
return web.json_response({"error": f"Error processing image: {exc}"}, status=400)
lora_stack = metadata_dict.get("loras", "")
import re
lora_matches = re.findall(r"<lora:([^:]+):([^>]+)>", lora_stack)
if not lora_matches:
return web.json_response({"error": "No LoRAs found in the generation metadata"}, status=400)
loras_for_name = lora_matches[:3]
recipe_name_parts = []
for name, strength in loras_for_name:
recipe_name_parts.append(f"{name.strip()}-{float(strength):.2f}")
recipe_name = "_".join(recipe_name_parts)
recipe_name = recipe_name or "recipe"
recipes_dir = recipe_scanner.recipes_dir
os.makedirs(recipes_dir, exist_ok=True)
import uuid
recipe_id = str(uuid.uuid4())
image_filename = f"{recipe_id}.png"
image_path = os.path.join(recipes_dir, image_filename)
with open(image_path, "wb") as file_obj:
file_obj.write(image_bytes)
loras_data = []
lora_scanner = getattr(recipe_scanner, "_lora_scanner", None)
base_model_counts: Dict[str, int] = {}
for name, strength in lora_matches:
lora_info = None
if lora_scanner is not None:
lora_info = await lora_scanner.get_model_info_by_name(name)
lora_data = {
"file_name": name,
"strength": float(strength),
"hash": (lora_info.get("sha256") or "").lower() if lora_info else "",
"modelVersionId": lora_info.get("civitai", {}).get("id") if lora_info else 0,
"modelName": lora_info.get("civitai", {}).get("model", {}).get("name") if lora_info else "",
"modelVersionName": lora_info.get("civitai", {}).get("name") if lora_info else "",
"isDeleted": False,
"exclude": False,
}
loras_data.append(lora_data)
if lora_info and "base_model" in lora_info:
base_model = lora_info["base_model"]
base_model_counts[base_model] = base_model_counts.get(base_model, 0) + 1
most_common_base_model = ""
if base_model_counts:
most_common_base_model = max(base_model_counts.items(), key=lambda item: item[1])[0]
recipe_data = {
"id": recipe_id,
"file_path": image_path,
"title": recipe_name,
"modified": time.time(),
"created_date": time.time(),
"base_model": most_common_base_model,
"loras": loras_data,
"checkpoint": metadata_dict.get("checkpoint", ""),
"gen_params": {
key: value
for key, value in metadata_dict.items()
if key not in ["checkpoint", "loras"]
},
"loras_stack": lora_stack,
}
json_filename = f"{recipe_id}.recipe.json"
json_path = os.path.join(recipes_dir, json_filename)
with open(json_path, "w", encoding="utf-8") as file_obj:
json.dump(recipe_data, file_obj, indent=4, ensure_ascii=False)
self._exif_utils.append_recipe_metadata(image_path, recipe_data)
cache = getattr(recipe_scanner, "_cache", None)
if cache is not None:
cache.raw_data.append(recipe_data)
asyncio.create_task(cache.resort())
self._logger.info("Added recipe %s to cache", recipe_id)
return web.json_response(
{
"success": True,
"recipe_id": recipe_id,
"image_path": image_path,
"json_path": json_path,
"recipe_name": recipe_name,
}
)
except Exception as exc:
self._logger.error("Error saving recipe from widget: %s", exc, exc_info=True)
return web.json_response({"error": str(exc)}, status=500)
class RecipeAnalysisHandler:
    """Analyze images to extract recipe metadata.

    Two entry points are exposed as aiohttp handlers:

    * ``analyze_uploaded_image`` accepts either a multipart upload (field
      ``image``) or a JSON body carrying a ``url``.
    * ``analyze_local_image`` accepts a JSON body carrying a filesystem ``path``.

    Both return a JSON payload that always contains a ``loras`` list and, on
    failure, an ``error`` string.
    """

    def __init__(
        self,
        *,
        ensure_dependencies_ready: EnsureDependenciesCallable,
        recipe_scanner_getter: RecipeScannerGetter,
        civitai_client_getter: CivitaiClientGetter,
        logger: Logger,
        exif_utils=ExifUtils,
        recipe_parser_factory=RecipeParserFactory,
        downloader_factory=get_downloader,
    ) -> None:
        """Store the collaborators used by the analysis endpoints.

        Args:
            ensure_dependencies_ready: Awaited at the start of every request.
            recipe_scanner_getter: Returns the recipe scanner, or ``None``.
            civitai_client_getter: Returns the Civitai client, or ``None``.
            logger: Receives error reports for failed requests.
            exif_utils: Provides ``extract_image_metadata(path)``.
            recipe_parser_factory: Provides ``create_parser(metadata)``.
            downloader_factory: Awaitable factory returning an object with
                ``download_file(url, path, use_auth=...)``.
        """
        self._ensure_dependencies_ready = ensure_dependencies_ready
        self._recipe_scanner_getter = recipe_scanner_getter
        self._civitai_client_getter = civitai_client_getter
        self._logger = logger
        self._exif_utils = exif_utils
        self._recipe_parser_factory = recipe_parser_factory
        self._downloader_factory = downloader_factory

    @staticmethod
    def _encode_image_file(image_path: str) -> str:
        """Return the content of ``image_path`` as a base64 text string."""
        with open(image_path, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode("utf-8")

    def _error_payload(self, message: str, image_path: Optional[str] = None) -> Dict[str, Any]:
        """Build an error payload, embedding the image when a path is given."""
        payload: Dict[str, Any] = {"error": message, "loras": []}
        if image_path:
            payload["image_base64"] = self._encode_image_file(image_path)
        return payload

    async def _finalize_analysis(self, result: Dict[str, Any], recipe_scanner: Any) -> web.Response:
        """Attach fingerprint and matching recipes to ``result`` and respond.

        A parser result that carries an ``error`` and no LoRAs is returned
        unchanged, without fingerprinting.
        """
        if "error" in result and not result.get("loras"):
            return web.json_response(result, status=200)

        from ...utils.utils import calculate_recipe_fingerprint

        fingerprint = calculate_recipe_fingerprint(result.get("loras", []))
        result["fingerprint"] = fingerprint
        matching_recipes = []
        if fingerprint:
            matching_recipes = await recipe_scanner.find_recipes_by_fingerprint(fingerprint)
        result["matching_recipes"] = matching_recipes
        return web.json_response(result)

    async def analyze_uploaded_image(self, request: web.Request) -> web.Response:
        """Analyze an uploaded image or a Civitai image URL.

        The request is either ``multipart/form-data`` whose first field is
        named ``image``, or ``application/json`` with a ``url`` key. For URLs
        of the form ``https://civitai.com/images/<id>`` the image is downloaded
        and the ``meta`` field of the Civitai image info is used as metadata
        when present; otherwise metadata is extracted from the stored file.

        Returns:
            A JSON response. Status 400 for malformed requests and failed
            downloads, 500 for unexpected errors, 200 otherwise (including the
            "no metadata" and "no parser" outcomes). In URL mode the payload
            also carries the downloaded image as ``image_base64``.
        """
        temp_path: Optional[str] = None
        try:
            await self._ensure_dependencies_ready()
            recipe_scanner = self._recipe_scanner_getter()
            civitai_client = self._civitai_client_getter()
            if recipe_scanner is None or civitai_client is None:
                raise RuntimeError("Required services unavailable")

            content_type = request.headers.get("Content-Type", "")
            is_url_mode = False
            metadata: Optional[Dict[str, Any]] = None

            if "multipart/form-data" in content_type:
                reader = await request.multipart()
                field = await reader.next()
                if field is None or field.name != "image":
                    return web.json_response({"error": "No image field found", "loras": []}, status=400)

                with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
                    # Record the path before streaming so the ``finally`` clause
                    # can remove the file even if reading the upload fails.
                    temp_path = temp_file.name
                    while True:
                        chunk = await field.read_chunk()
                        if not chunk:
                            break
                        temp_file.write(chunk)
            elif "application/json" in content_type:
                data = await request.json()
                # A JSON body that is not an object cannot carry a URL.
                url = data.get("url") if isinstance(data, dict) else None
                is_url_mode = True
                if not url:
                    return web.json_response({"error": "No URL provided", "loras": []}, status=400)

                import re

                civitai_image_match = re.match(r"https://civitai\.com/images/(\d+)", url)
                # NOTE(review): a URL that does not match falls through with no
                # temp file and ends in the "No metadata found" response below.
                if civitai_image_match:
                    image_id = civitai_image_match.group(1)
                    image_info = await civitai_client.get_image_info(image_id)
                    if not image_info:
                        return web.json_response(
                            {"error": "Failed to fetch image information from Civitai", "loras": []},
                            status=400,
                        )

                    image_url = image_info.get("url")
                    if not image_url:
                        return web.json_response(
                            {"error": "No image URL found in Civitai response", "loras": []},
                            status=400,
                        )

                    downloader = await self._downloader_factory()
                    # Only reserve a file name here; the handle is closed before
                    # the downloader writes to that path.
                    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
                        temp_path = temp_file.name

                    success, download_result = await downloader.download_file(
                        image_url,
                        temp_path,
                        use_auth=False,
                    )
                    if not success:
                        return web.json_response(
                            {"error": f"Failed to download image from URL: {download_result}", "loras": []},
                            status=400,
                        )

                    metadata = image_info.get("meta")
            else:
                return web.json_response({"error": "Unsupported content type", "loras": []}, status=400)

            if metadata is None and temp_path:
                metadata = self._exif_utils.extract_image_metadata(temp_path)

            # Only URL mode echoes the image back; an uploader already has it.
            preview_path = temp_path if is_url_mode else None

            if not metadata:
                return web.json_response(
                    self._error_payload("No metadata found in this image", preview_path),
                    status=200,
                )

            parser = self._recipe_parser_factory.create_parser(metadata)
            if parser is None:
                return web.json_response(
                    self._error_payload("No parser found for this image", preview_path),
                    status=200,
                )

            result = await parser.parse_metadata(metadata, recipe_scanner=recipe_scanner)
            if preview_path:
                result["image_base64"] = self._encode_image_file(preview_path)

            return await self._finalize_analysis(result, recipe_scanner)
        except Exception as exc:
            self._logger.error("Error analyzing recipe image: %s", exc, exc_info=True)
            return web.json_response({"error": str(exc), "loras": []}, status=500)
        finally:
            if temp_path and os.path.exists(temp_path):
                try:
                    os.unlink(temp_path)
                except Exception as cleanup_exc:  # pragma: no cover - logging path
                    self._logger.error("Error deleting temporary file: %s", cleanup_exc)

    async def analyze_local_image(self, request: web.Request) -> web.Response:
        """Analyze an image that already exists on the server's filesystem.

        The JSON body must contain ``path``; surrounding single or double
        quotes are stripped and the path is normalized before use.

        Returns:
            A JSON response. Status 400 when no usable path is supplied, 404
            when the path is not a file, 500 for unexpected errors, 200
            otherwise. Every 200 payload carries the image as ``image_base64``.
        """
        try:
            await self._ensure_dependencies_ready()
            recipe_scanner = self._recipe_scanner_getter()
            if recipe_scanner is None:
                raise RuntimeError("Recipe scanner unavailable")

            data = await request.json()
            file_path = data.get("path") if isinstance(data, dict) else None
            # Reject missing, empty and non-string paths up front instead of
            # failing later with an AttributeError.
            if not isinstance(file_path, str) or not file_path:
                return web.json_response({"error": "No file path provided", "loras": []}, status=400)

            # NOTE(review): the path comes straight from the request and any
            # readable file is echoed back base64-encoded; confirm that this
            # endpoint is only reachable by trusted local clients.
            file_path = os.path.normpath(file_path.strip('"').strip("'"))
            if not os.path.isfile(file_path):
                return web.json_response({"error": "File not found", "loras": []}, status=404)

            metadata = self._exif_utils.extract_image_metadata(file_path)
            if not metadata:
                return web.json_response(
                    self._error_payload("No metadata found in this image", file_path),
                    status=200,
                )

            parser = self._recipe_parser_factory.create_parser(metadata)
            if parser is None:
                return web.json_response(
                    self._error_payload("No parser found for this image", file_path),
                    status=200,
                )

            result = await parser.parse_metadata(metadata, recipe_scanner=recipe_scanner)
            result["image_base64"] = self._encode_image_file(file_path)

            return await self._finalize_analysis(result, recipe_scanner)
        except Exception as exc:
            self._logger.error("Error analyzing local image: %s", exc, exc_info=True)
            return web.json_response({"error": str(exc), "loras": []}, status=500)
class RecipeSharingHandler:
    """Serve endpoints related to recipe sharing.

    Sharing copies the recipe image to a temporary file and registers it in an
    in-memory table keyed by recipe id. Each entry records the temp file path,
    the creation timestamp and an expiry time 300 seconds in the future.
    """

    def __init__(
        self,
        *,
        ensure_dependencies_ready: EnsureDependenciesCallable,
        recipe_scanner_getter: RecipeScannerGetter,
        logger: Logger,
    ) -> None:
        """Store the collaborators and start with an empty share table.

        Args:
            ensure_dependencies_ready: Awaited at the start of every request.
            recipe_scanner_getter: Returns the recipe scanner, or ``None``.
            logger: Receives error reports for failed requests and cleanup.
        """
        self._ensure_dependencies_ready = ensure_dependencies_ready
        self._recipe_scanner_getter = recipe_scanner_getter
        self._logger = logger
        # recipe_id -> {"path": temp file, "timestamp": int, "expires": float}
        self._shared_recipes: Dict[str, Dict[str, Any]] = {}

    @staticmethod
    def _is_expired(info: Dict[str, Any], now: Optional[float] = None) -> bool:
        """Return True when ``info``'s expiry time lies before ``now``."""
        if now is None:
            now = time.time()
        return now > info.get("expires", 0)

    @staticmethod
    def _download_basename(recipe: Optional[Dict[str, Any]], recipe_id: str) -> str:
        """Return the download name without extension.

        Uses ``recipe_<title>`` (spaces replaced by underscores, lower-cased)
        when the recipe is known, and falls back to the recipe id otherwise.
        """
        if not recipe:
            return recipe_id
        # Tolerate a missing, ``None`` or non-string title.
        title = str(recipe.get("title") or "")
        return f"recipe_{title.replace(' ', '_').lower()}"

    @staticmethod
    def _header_safe(filename: str) -> str:
        """Replace characters that would break a quoted header parameter."""
        return "".join(
            "_" if ch in '"\\' or ord(ch) < 32 or ord(ch) == 127 else ch
            for ch in filename
        )

    def _remove_temp_file(self, file_path: Optional[str], recipe_id: str) -> None:
        """Delete ``file_path`` if it exists, logging instead of raising."""
        try:
            if file_path and os.path.exists(file_path):
                os.unlink(file_path)
        except Exception as exc:  # pragma: no cover - logging path
            self._logger.error("Error cleaning up shared recipe %s: %s", recipe_id, exc)

    def _discard_shared_recipe(self, recipe_id: str) -> None:
        """Drop the share entry for ``recipe_id`` and delete its temp file."""
        info = self._shared_recipes.pop(recipe_id, None)
        if info:
            self._remove_temp_file(info.get("path"), recipe_id)

    async def share_recipe(self, request: web.Request) -> web.Response:
        """Prepare a recipe image for download and return its download URL.

        Returns:
            A JSON response with ``success``, ``download_url`` and ``filename``
            on success; 404 when the recipe or its image is missing; 500 on
            unexpected errors.
        """
        try:
            await self._ensure_dependencies_ready()
            recipe_scanner = self._recipe_scanner_getter()
            if recipe_scanner is None:
                raise RuntimeError("Recipe scanner unavailable")

            recipe_id = request.match_info["recipe_id"]
            cache = await recipe_scanner.get_cached_data()
            recipe = next(
                (r for r in getattr(cache, "raw_data", []) if str(r.get("id", "")) == recipe_id),
                None,
            )
            if not recipe:
                return web.json_response({"error": "Recipe not found"}, status=404)

            image_path = recipe.get("file_path")
            if not image_path or not os.path.exists(image_path):
                return web.json_response({"error": "Recipe image not found"}, status=404)

            import shutil

            # A previous share of the same recipe becomes unreachable once its
            # entry is overwritten, so remove its temp copy first.
            self._discard_shared_recipe(recipe_id)

            ext = os.path.splitext(image_path)[1]
            with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as temp_file:
                temp_path = temp_file.name
            try:
                shutil.copy2(image_path, temp_path)
            except Exception:
                # Do not leave the placeholder file behind when copying fails.
                self._remove_temp_file(temp_path, recipe_id)
                raise

            timestamp = int(time.time())
            url_path = f"/api/recipe/{recipe_id}/share/download?t={timestamp}"
            self._shared_recipes[recipe_id] = {
                "path": temp_path,
                "timestamp": timestamp,
                "expires": time.time() + 300,
            }
            self._cleanup_shared_recipes()

            filename = f"{self._download_basename(recipe, recipe_id)}{ext}"
            return web.json_response({"success": True, "download_url": url_path, "filename": filename})
        except Exception as exc:
            self._logger.error("Error sharing recipe: %s", exc, exc_info=True)
            return web.json_response({"error": str(exc)}, status=500)

    async def download_shared_recipe(self, request: web.Request) -> web.StreamResponse:
        """Send the temp copy registered by ``share_recipe`` as an attachment.

        Returns:
            A file response on success; 404 when the share is unknown, expired
            or its file is gone; 500 on unexpected errors.
        """
        try:
            await self._ensure_dependencies_ready()
            recipe_scanner = self._recipe_scanner_getter()
            if recipe_scanner is None:
                raise RuntimeError("Recipe scanner unavailable")

            recipe_id = request.match_info["recipe_id"]
            shared_info = self._shared_recipes.get(recipe_id)
            if shared_info and self._is_expired(shared_info):
                # Enforce the expiry here too; otherwise an expired share stays
                # downloadable until another share triggers cleanup.
                self._discard_shared_recipe(recipe_id)
                shared_info = None
            if not shared_info:
                return web.json_response({"error": "Shared recipe not found or expired"}, status=404)

            file_path = shared_info["path"]
            if not os.path.exists(file_path):
                return web.json_response({"error": "Shared recipe file not found"}, status=404)

            cache = await recipe_scanner.get_cached_data()
            recipe = next(
                (r for r in getattr(cache, "raw_data", []) if str(r.get("id", "")) == recipe_id),
                None,
            )
            ext = os.path.splitext(file_path)[1]
            download_filename = self._header_safe(f"{self._download_basename(recipe, recipe_id)}{ext}")
            return web.FileResponse(
                file_path,
                headers={"Content-Disposition": f'attachment; filename="{download_filename}"'},
            )
        except Exception as exc:
            self._logger.error("Error downloading shared recipe: %s", exc, exc_info=True)
            return web.json_response({"error": str(exc)}, status=500)

    def _cleanup_shared_recipes(self) -> None:
        """Remove every expired share entry together with its temp file."""
        current_time = time.time()
        # Collect ids first: the table must not change size while iterated.
        expired_ids = [
            recipe_id
            for recipe_id, info in self._shared_recipes.items()
            if self._is_expired(info, current_time)
        ]
        for recipe_id in expired_ids:
            self._discard_shared_recipe(recipe_id)