diff --git a/py/routes/base_recipe_routes.py b/py/routes/base_recipe_routes.py index e2b726da..59d4e7ec 100644 --- a/py/routes/base_recipe_routes.py +++ b/py/routes/base_recipe_routes.py @@ -11,6 +11,15 @@ from ..config import config from ..services.server_i18n import server_i18n from ..services.service_registry import ServiceRegistry from ..services.settings_manager import settings +from .handlers.recipe_handlers import ( + RecipeAnalysisHandler, + RecipeHandlerSet, + RecipeListingHandler, + RecipeManagementHandler, + RecipePageView, + RecipeQueryHandler, + RecipeSharingHandler, +) from .recipe_route_registrar import ROUTE_DEFINITIONS logger = logging.getLogger(__name__) @@ -23,6 +32,8 @@ class BaseRecipeRoutes: definition.handler_name for definition in ROUTE_DEFINITIONS ) + template_name: str = "recipes.html" + def __init__(self) -> None: self.recipe_scanner = None self.lora_scanner = None @@ -36,6 +47,7 @@ class BaseRecipeRoutes: self._i18n_registered = False self._startup_hooks_registered = False + self._handler_set: RecipeHandlerSet | None = None self._handler_mapping: dict[str, Callable] | None = None async def attach_dependencies(self, app: web.Application | None = None) -> None: @@ -81,10 +93,9 @@ class BaseRecipeRoutes: """Return a mapping of handler name to coroutine for registrar binding.""" if self._handler_mapping is None: - owner = self.get_handler_owner() - self._handler_mapping = { - name: getattr(owner, name) for name in self._HANDLER_NAMES - } + handler_set = self._create_handler_set() + self._handler_set = handler_set + self._handler_mapping = handler_set.to_route_mapping() return self._handler_mapping # Internal helpers ------------------------------------------------- @@ -105,5 +116,57 @@ class BaseRecipeRoutes: def get_handler_owner(self): """Return the object supplying bound handler coroutines.""" - return self + if self._handler_set is None: + self._handler_set = self._create_handler_set() + return self._handler_set + + def _create_handler_set(self) -> RecipeHandlerSet: + recipe_scanner_getter = lambda: self.recipe_scanner + civitai_client_getter = lambda: self.civitai_client + + page_view = RecipePageView( + ensure_dependencies_ready=self.ensure_dependencies_ready, + settings_service=self.settings, + server_i18n=self.server_i18n, + template_env=self.template_env, + template_name=self.template_name, + recipe_scanner_getter=recipe_scanner_getter, + logger=logger, + ) + listing = RecipeListingHandler( + ensure_dependencies_ready=self.ensure_dependencies_ready, + recipe_scanner_getter=recipe_scanner_getter, + logger=logger, + ) + query = RecipeQueryHandler( + ensure_dependencies_ready=self.ensure_dependencies_ready, + recipe_scanner_getter=recipe_scanner_getter, + format_recipe_file_url=listing.format_recipe_file_url, + logger=logger, + ) + management = RecipeManagementHandler( + ensure_dependencies_ready=self.ensure_dependencies_ready, + recipe_scanner_getter=recipe_scanner_getter, + logger=logger, + ) + analysis = RecipeAnalysisHandler( + ensure_dependencies_ready=self.ensure_dependencies_ready, + recipe_scanner_getter=recipe_scanner_getter, + civitai_client_getter=civitai_client_getter, + logger=logger, + ) + sharing = RecipeSharingHandler( + ensure_dependencies_ready=self.ensure_dependencies_ready, + recipe_scanner_getter=recipe_scanner_getter, + logger=logger, + ) + + return RecipeHandlerSet( + page_view=page_view, + listing=listing, + query=query, + management=management, + analysis=analysis, + sharing=sharing, + ) diff --git a/py/routes/handlers/recipe_handlers.py b/py/routes/handlers/recipe_handlers.py new file mode 100644 index 00000000..8d8f96bf --- /dev/null +++ b/py/routes/handlers/recipe_handlers.py @@ -0,0 +1,1347 @@ +"""Dedicated handler objects for recipe-related routes.""" +from __future__ import annotations + +import asyncio +import base64 +import io +import json +import logging +import os +import tempfile +import time +from dataclasses import dataclass +from typing import Any, Awaitable, Callable, Dict, Mapping, Optional + +import numpy as np +from aiohttp import web +from PIL import Image + +from ...config import config +from ...recipes import RecipeParserFactory +from ...services.downloader import get_downloader +from ...services.server_i18n import server_i18n as default_server_i18n +from ...services.settings_manager import SettingsManager +from ...utils.constants import CARD_PREVIEW_WIDTH +from ...utils.exif_utils import ExifUtils + +# Check if running in standalone mode +standalone_mode = os.environ.get("HF_HUB_DISABLE_TELEMETRY", "0") == "0" + +if not standalone_mode: + from ...metadata_collector import get_metadata + from ...metadata_collector.metadata_processor import MetadataProcessor + from ...metadata_collector.metadata_registry import MetadataRegistry +else: # pragma: no cover - optional dependency path + get_metadata = None # type: ignore[assignment] + MetadataProcessor = None # type: ignore[assignment] + MetadataRegistry = None # type: ignore[assignment] + +Logger = logging.Logger +EnsureDependenciesCallable = Callable[[], Awaitable[None]] +RecipeScannerGetter = Callable[[], Any] +CivitaiClientGetter = Callable[[], Any] + + +@dataclass(frozen=True) +class RecipeHandlerSet: + """Group of handlers providing recipe route implementations.""" + + page_view: "RecipePageView" + listing: "RecipeListingHandler" + query: "RecipeQueryHandler" + management: "RecipeManagementHandler" + analysis: "RecipeAnalysisHandler" + sharing: "RecipeSharingHandler" + + def to_route_mapping(self) -> Mapping[str, Callable[[web.Request], Awaitable[web.StreamResponse]]]: + """Expose handler coroutines keyed by registrar handler names.""" + + return { + "render_page": self.page_view.render_page, + "list_recipes": self.listing.list_recipes, + "get_recipe": self.listing.get_recipe, + "analyze_uploaded_image": self.analysis.analyze_uploaded_image, + "analyze_local_image": self.analysis.analyze_local_image, + "save_recipe": self.management.save_recipe, + "delete_recipe": self.management.delete_recipe, + "get_top_tags": self.query.get_top_tags, + "get_base_models": self.query.get_base_models, + "share_recipe": self.sharing.share_recipe, + "download_shared_recipe": self.sharing.download_shared_recipe, + "get_recipe_syntax": self.query.get_recipe_syntax, + "update_recipe": self.management.update_recipe, + "reconnect_lora": self.management.reconnect_lora, + "find_duplicates": self.query.find_duplicates, + "bulk_delete": self.management.bulk_delete, + "save_recipe_from_widget": self.management.save_recipe_from_widget, + "get_recipes_for_lora": self.query.get_recipes_for_lora, + "scan_recipes": self.query.scan_recipes, + } + + +class RecipePageView: + """Render the recipe shell page.""" + + def __init__( + self, + *, + ensure_dependencies_ready: EnsureDependenciesCallable, + settings_service: SettingsManager, + server_i18n=default_server_i18n, + template_env, + template_name: str, + recipe_scanner_getter: RecipeScannerGetter, + logger: Logger, + ) -> None: + self._ensure_dependencies_ready = ensure_dependencies_ready + self._settings = settings_service + self._server_i18n = server_i18n + self._template_env = template_env + self._template_name = template_name + self._recipe_scanner_getter = recipe_scanner_getter + self._logger = logger + + async def render_page(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: # pragma: no cover - defensive guard + raise RuntimeError("Recipe scanner not available") + + user_language = self._settings.get("language", "en") + self._server_i18n.set_locale(user_language) + + try: + await recipe_scanner.get_cached_data(force_refresh=False) + rendered = self._template_env.get_template(self._template_name).render( + recipes=[], + is_initializing=False, + settings=self._settings, + request=request, + t=self._server_i18n.get_translation, + ) + except Exception as cache_error: # pragma: no cover - logging path + self._logger.error("Error loading recipe cache data: %s", cache_error) + rendered = self._template_env.get_template(self._template_name).render( + is_initializing=True, + settings=self._settings, + request=request, + t=self._server_i18n.get_translation, + ) + return web.Response(text=rendered, content_type="text/html") + except Exception as exc: # pragma: no cover - logging path + self._logger.error("Error handling recipes request: %s", exc, exc_info=True) + return web.Response(text="Error loading recipes page", status=500) + + +class RecipeListingHandler: + """Provide listing and detail APIs for recipes.""" + + def __init__( + self, + *, + ensure_dependencies_ready: EnsureDependenciesCallable, + recipe_scanner_getter: RecipeScannerGetter, + logger: Logger, + ) -> None: + self._ensure_dependencies_ready = ensure_dependencies_ready + self._recipe_scanner_getter = recipe_scanner_getter + self._logger = logger + + async def list_recipes(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + page = int(request.query.get("page", "1")) + page_size = int(request.query.get("page_size", "20")) + sort_by = request.query.get("sort_by", "date") + search = request.query.get("search") + + search_options = { + "title": request.query.get("search_title", "true").lower() == "true", + "tags": request.query.get("search_tags", "true").lower() == "true", + "lora_name": request.query.get("search_lora_name", "true").lower() == "true", + "lora_model": request.query.get("search_lora_model", "true").lower() == "true", + } + + filters: Dict[str, list[str]] = {} + base_models = request.query.get("base_models") + if base_models: + filters["base_model"] = base_models.split(",") + + tags = request.query.get("tags") + if tags: + filters["tags"] = tags.split(",") + + lora_hash = request.query.get("lora_hash") + + result = await recipe_scanner.get_paginated_data( + page=page, + page_size=page_size, + sort_by=sort_by, + search=search, + filters=filters, + search_options=search_options, + lora_hash=lora_hash, + ) + + for item in result.get("items", []): + file_path = item.get("file_path") + if file_path: + item["file_url"] = self.format_recipe_file_url(file_path) + else: + item.setdefault("file_url", "/loras_static/images/no-preview.png") + item.setdefault("loras", []) + item.setdefault("base_model", "") + + return web.json_response(result) + except Exception as exc: + self._logger.error("Error retrieving recipes: %s", exc, exc_info=True) + return web.json_response({"error": str(exc)}, status=500) + + async def get_recipe(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + recipe_id = request.match_info["recipe_id"] + recipe = await recipe_scanner.get_recipe_by_id(recipe_id) + + if not recipe: + return web.json_response({"error": "Recipe not found"}, status=404) + return web.json_response(recipe) + except Exception as exc: + self._logger.error("Error retrieving recipe details: %s", exc, exc_info=True) + return web.json_response({"error": str(exc)}, status=500) + + def format_recipe_file_url(self, file_path: str) -> str: + try: + recipes_dir = os.path.join(config.loras_roots[0], "recipes").replace(os.sep, "/") + normalized_path = file_path.replace(os.sep, "/") + if normalized_path.startswith(recipes_dir): + relative_path = os.path.relpath(file_path, config.loras_roots[0]).replace(os.sep, "/") + return f"/loras_static/root1/preview/{relative_path}" + + file_name = os.path.basename(file_path) + return f"/loras_static/root1/preview/recipes/{file_name}" + except Exception as exc: # pragma: no cover - logging path + self._logger.error("Error formatting recipe file URL: %s", exc, exc_info=True) + return "/loras_static/images/no-preview.png" + + +class RecipeQueryHandler: + """Provide read-only insights on recipe data.""" + + def __init__( + self, + *, + ensure_dependencies_ready: EnsureDependenciesCallable, + recipe_scanner_getter: RecipeScannerGetter, + format_recipe_file_url: Callable[[str], str], + logger: Logger, + ) -> None: + self._ensure_dependencies_ready = ensure_dependencies_ready + self._recipe_scanner_getter = recipe_scanner_getter + self._format_recipe_file_url = format_recipe_file_url + self._logger = logger + + async def get_top_tags(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + limit = int(request.query.get("limit", "20")) + cache = await recipe_scanner.get_cached_data() + + tag_counts: Dict[str, int] = {} + for recipe in getattr(cache, "raw_data", []): + for tag in recipe.get("tags", []) or []: + tag_counts[tag] = tag_counts.get(tag, 0) + 1 + + sorted_tags = [{"tag": tag, "count": count} for tag, count in tag_counts.items()] + sorted_tags.sort(key=lambda entry: entry["count"], reverse=True) + return web.json_response({"success": True, "tags": sorted_tags[:limit]}) + except Exception as exc: + self._logger.error("Error retrieving top tags: %s", exc, exc_info=True) + return web.json_response({"success": False, "error": str(exc)}, status=500) + + async def get_base_models(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + cache = await recipe_scanner.get_cached_data() + + base_model_counts: Dict[str, int] = {} + for recipe in getattr(cache, "raw_data", []): + base_model = recipe.get("base_model") + if base_model: + base_model_counts[base_model] = base_model_counts.get(base_model, 0) + 1 + + sorted_models = [{"name": model, "count": count} for model, count in base_model_counts.items()] + sorted_models.sort(key=lambda entry: entry["count"], reverse=True) + return web.json_response({"success": True, "base_models": sorted_models}) + except Exception as exc: + self._logger.error("Error retrieving base models: %s", exc, exc_info=True) + return web.json_response({"success": False, "error": str(exc)}, status=500) + + async def get_recipes_for_lora(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + lora_hash = request.query.get("hash") + if not lora_hash: + return web.json_response({"success": False, "error": "Lora hash is required"}, status=400) + + cache = await recipe_scanner.get_cached_data() + matching_recipes = [] + for recipe in getattr(cache, "raw_data", []): + for lora in recipe.get("loras", []): + if lora.get("hash", "").lower() == lora_hash.lower(): + matching_recipes.append(recipe) + break + + lora_scanner = getattr(recipe_scanner, "_lora_scanner", None) + for recipe in matching_recipes: + for lora in recipe.get("loras", []): + hash_value = (lora.get("hash") or "").lower() + if hash_value and lora_scanner is not None: + lora["inLibrary"] = lora_scanner.has_hash(hash_value) + lora["preview_url"] = lora_scanner.get_preview_url_by_hash(hash_value) + lora["localPath"] = lora_scanner.get_path_by_hash(hash_value) + if recipe.get("file_path"): + recipe["file_url"] = self._format_recipe_file_url(recipe["file_path"]) + else: + recipe["file_url"] = "/loras_static/images/no-preview.png" + + return web.json_response({"success": True, "recipes": matching_recipes}) + except Exception as exc: + self._logger.error("Error getting recipes for Lora: %s", exc) + return web.json_response({"success": False, "error": str(exc)}, status=500) + + async def scan_recipes(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + self._logger.info("Manually triggering recipe cache rebuild") + await recipe_scanner.get_cached_data(force_refresh=True) + return web.json_response({"success": True, "message": "Recipe cache refreshed successfully"}) + except Exception as exc: + self._logger.error("Error refreshing recipe cache: %s", exc, exc_info=True) + return web.json_response({"success": False, "error": str(exc)}, status=500) + + async def find_duplicates(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + duplicate_groups = await recipe_scanner.find_all_duplicate_recipes() + response_data = [] + + for fingerprint, recipe_ids in duplicate_groups.items(): + if len(recipe_ids) <= 1: + continue + + recipes = [] + for recipe_id in recipe_ids: + recipe = await recipe_scanner.get_recipe_by_id(recipe_id) + if recipe: + recipes.append( + { + "id": recipe.get("id"), + "title": recipe.get("title"), + "file_url": recipe.get("file_url") + or self._format_recipe_file_url(recipe.get("file_path", "")), + "modified": recipe.get("modified"), + "created_date": recipe.get("created_date"), + "lora_count": len(recipe.get("loras", [])), + } + ) + + if len(recipes) >= 2: + recipes.sort(key=lambda entry: entry.get("modified", 0), reverse=True) + response_data.append( + { + "fingerprint": fingerprint, + "count": len(recipes), + "recipes": recipes, + } + ) + + response_data.sort(key=lambda entry: entry["count"], reverse=True) + return web.json_response({"success": True, "duplicate_groups": response_data}) + except Exception as exc: + self._logger.error("Error finding duplicate recipes: %s", exc, exc_info=True) + return web.json_response({"success": False, "error": str(exc)}, status=500) + + async def get_recipe_syntax(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + recipe_id = request.match_info["recipe_id"] + cache = await recipe_scanner.get_cached_data() + recipe = next( + (r for r in getattr(cache, "raw_data", []) if str(r.get("id", "")) == recipe_id), + None, + ) + if not recipe: + return web.json_response({"error": "Recipe not found"}, status=404) + + loras = recipe.get("loras", []) + if not loras: + return web.json_response({"error": "No LoRAs found in this recipe"}, status=400) + + lora_scanner = getattr(recipe_scanner, "_lora_scanner", None) + hash_index = getattr(lora_scanner, "_hash_index", None) + + lora_syntax_parts = [] + for lora in loras: + if lora.get("isDeleted", False): + continue + hash_value = (lora.get("hash") or "").lower() + if not hash_value or lora_scanner is None or not lora_scanner.has_hash(hash_value): + continue + + file_name = None + if hash_value and hash_index is not None and hasattr(hash_index, "_hash_to_path"): + file_path = hash_index._hash_to_path.get(hash_value) + if file_path: + file_name = os.path.splitext(os.path.basename(file_path))[0] + + if not file_name and lora.get("modelVersionId") and lora_scanner is not None: + all_loras = await lora_scanner.get_cached_data() + for cached_lora in getattr(all_loras, "raw_data", []): + civitai_info = cached_lora.get("civitai") + if civitai_info and civitai_info.get("id") == lora.get("modelVersionId"): + file_name = os.path.splitext(os.path.basename(cached_lora["path"]))[0] + break + + if not file_name: + file_name = lora.get("file_name", "unknown-lora") + + strength = lora.get("strength", 1.0) + lora_syntax_parts.append(f"") + + return web.json_response({"success": True, "syntax": " ".join(lora_syntax_parts)}) + except Exception as exc: + self._logger.error("Error generating recipe syntax: %s", exc, exc_info=True) + return web.json_response({"error": str(exc)}, status=500) + + +class RecipeManagementHandler: + """Handle create/update/delete style recipe operations.""" + + def __init__( + self, + *, + ensure_dependencies_ready: EnsureDependenciesCallable, + recipe_scanner_getter: RecipeScannerGetter, + logger: Logger, + exif_utils=ExifUtils, + card_preview_width: int = CARD_PREVIEW_WIDTH, + metadata_collector: Optional[Callable[[], Any]] = get_metadata, + metadata_processor_cls: Optional[type] = MetadataProcessor, + metadata_registry_cls: Optional[type] = MetadataRegistry, + standalone_mode: bool = standalone_mode, + ) -> None: + self._ensure_dependencies_ready = ensure_dependencies_ready + self._recipe_scanner_getter = recipe_scanner_getter + self._logger = logger + self._exif_utils = exif_utils + self._card_preview_width = card_preview_width + self._metadata_collector = metadata_collector + self._metadata_processor_cls = metadata_processor_cls + self._metadata_registry_cls = metadata_registry_cls + self._standalone_mode = standalone_mode + + async def save_recipe(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + reader = await request.multipart() + + image: Optional[bytes] = None + image_base64: Optional[str] = None + name: Optional[str] = None + tags: list[str] = [] + metadata: Dict[str, Any] | None = None + + while True: + field = await reader.next() + if field is None: + break + + if field.name == "image": + image_chunks = bytearray() + while True: + chunk = await field.read_chunk() + if not chunk: + break + image_chunks.extend(chunk) + image = bytes(image_chunks) + elif field.name == "image_base64": + image_base64 = await field.text() + elif field.name == "name": + name = await field.text() + elif field.name == "tags": + tags_text = await field.text() + try: + parsed_tags = json.loads(tags_text) + tags = parsed_tags if isinstance(parsed_tags, list) else [] + except Exception: + tags = [] + elif field.name == "metadata": + metadata_text = await field.text() + try: + metadata = json.loads(metadata_text) + except Exception: + metadata = {} + + missing_fields = [] + if not name: + missing_fields.append("name") + if not metadata: + missing_fields.append("metadata") + if missing_fields: + return web.json_response( + {"error": f"Missing required fields: {', '.join(missing_fields)}"}, + status=400, + ) + + if image is None: + if image_base64: + try: + if "," in image_base64: + image_base64 = image_base64.split(",", 1)[1] + image = base64.b64decode(image_base64) + except Exception as exc: + return web.json_response({"error": f"Invalid base64 image data: {exc}"}, status=400) + else: + return web.json_response({"error": "No image data provided"}, status=400) + + recipes_dir = recipe_scanner.recipes_dir + os.makedirs(recipes_dir, exist_ok=True) + + import uuid + + recipe_id = str(uuid.uuid4()) + optimized_image, extension = self._exif_utils.optimize_image( + image_data=image, + target_width=self._card_preview_width, + format="webp", + quality=85, + preserve_metadata=True, + ) + + image_filename = f"{recipe_id}{extension}" + image_path = os.path.join(recipes_dir, image_filename) + with open(image_path, "wb") as file_obj: + file_obj.write(optimized_image) + + current_time = time.time() + loras_data = [] + for lora in metadata.get("loras", []): + loras_data.append( + { + "file_name": lora.get("file_name", "") + or ( + os.path.splitext(os.path.basename(lora.get("localPath", "")))[0] + if lora.get("localPath") + else "" + ), + "hash": (lora.get("hash") or "").lower(), + "strength": float(lora.get("weight", 1.0)), + "modelVersionId": lora.get("id", 0), + "modelName": lora.get("name", ""), + "modelVersionName": lora.get("version", ""), + "isDeleted": lora.get("isDeleted", False), + "exclude": lora.get("exclude", False), + } + ) + + gen_params = metadata.get("gen_params", {}) + if not gen_params and "raw_metadata" in metadata: + raw_metadata = metadata.get("raw_metadata", {}) + gen_params = { + "prompt": raw_metadata.get("prompt", ""), + "negative_prompt": raw_metadata.get("negative_prompt", ""), + "checkpoint": raw_metadata.get("checkpoint", {}), + "steps": raw_metadata.get("steps", ""), + "sampler": raw_metadata.get("sampler", ""), + "cfg_scale": raw_metadata.get("cfg_scale", ""), + "seed": raw_metadata.get("seed", ""), + "size": raw_metadata.get("size", ""), + "clip_skip": raw_metadata.get("clip_skip", ""), + } + + from ...utils.utils import calculate_recipe_fingerprint + + fingerprint = calculate_recipe_fingerprint(loras_data) + + recipe_data = { + "id": recipe_id, + "file_path": image_path, + "title": name, + "modified": current_time, + "created_date": current_time, + "base_model": metadata.get("base_model", ""), + "loras": loras_data, + "gen_params": gen_params, + "fingerprint": fingerprint, + } + + if tags: + recipe_data["tags"] = tags + + if metadata.get("source_path"): + recipe_data["source_path"] = metadata.get("source_path") + + json_filename = f"{recipe_id}.recipe.json" + json_path = os.path.join(recipes_dir, json_filename) + with open(json_path, "w", encoding="utf-8") as file_obj: + json.dump(recipe_data, file_obj, indent=4, ensure_ascii=False) + + self._exif_utils.append_recipe_metadata(image_path, recipe_data) + + matching_recipes = [] + if fingerprint: + matching_recipes = await recipe_scanner.find_recipes_by_fingerprint(fingerprint) + if recipe_id in matching_recipes: + matching_recipes.remove(recipe_id) + + cache = getattr(recipe_scanner, "_cache", None) + if cache is not None: + cache.raw_data.append(recipe_data) + asyncio.create_task(cache.resort()) + self._logger.info("Added recipe %s to cache", recipe_id) + + return web.json_response( + { + "success": True, + "recipe_id": recipe_id, + "image_path": image_path, + "json_path": json_path, + "matching_recipes": matching_recipes, + } + ) + except Exception as exc: + self._logger.error("Error saving recipe: %s", exc, exc_info=True) + return web.json_response({"error": str(exc)}, status=500) + + async def delete_recipe(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + recipe_id = request.match_info["recipe_id"] + recipes_dir = recipe_scanner.recipes_dir + if not recipes_dir or not os.path.exists(recipes_dir): + return web.json_response({"error": "Recipes directory not found"}, status=404) + + recipe_json_path = os.path.join(recipes_dir, f"{recipe_id}.recipe.json") + if not os.path.exists(recipe_json_path): + return web.json_response({"error": "Recipe not found"}, status=404) + + with open(recipe_json_path, "r", encoding="utf-8") as file_obj: + recipe_data = json.load(file_obj) + + image_path = recipe_data.get("file_path") + os.remove(recipe_json_path) + self._logger.info("Deleted recipe JSON file: %s", recipe_json_path) + + if image_path and os.path.exists(image_path): + os.remove(image_path) + self._logger.info("Deleted recipe image: %s", image_path) + + cache = getattr(recipe_scanner, "_cache", None) + if cache is not None: + cache.raw_data = [ + item for item in cache.raw_data if str(item.get("id", "")) != recipe_id + ] + asyncio.create_task(cache.resort()) + self._logger.info("Removed recipe %s from cache", recipe_id) + + return web.json_response({"success": True, "message": "Recipe deleted successfully"}) + except Exception as exc: + self._logger.error("Error deleting recipe: %s", exc, exc_info=True) + return web.json_response({"error": str(exc)}, status=500) + + async def update_recipe(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + recipe_id = request.match_info["recipe_id"] + data = await request.json() + + if not any( + key in data for key in ("title", "tags", "source_path", "preview_nsfw_level") + ): + return web.json_response( + { + "error": ( + "At least one field to update must be provided (title or tags or " + "source_path or preview_nsfw_level)" + ) + }, + status=400, + ) + + success = await recipe_scanner.update_recipe_metadata(recipe_id, data) + if not success: + return web.json_response({"error": "Recipe not found or update failed"}, status=404) + + return web.json_response({"success": True, "recipe_id": recipe_id, "updates": data}) + except Exception as exc: + self._logger.error("Error updating recipe: %s", exc, exc_info=True) + return web.json_response({"error": str(exc)}, status=500) + + async def reconnect_lora(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + data = await request.json() + required_fields = ["recipe_id", "lora_index", "target_name"] + for field in required_fields: + if field not in data: + return web.json_response({"error": f"Missing required field: {field}"}, status=400) + + recipe_id = data["recipe_id"] + lora_index = int(data["lora_index"]) + target_name = data["target_name"] + + recipe_path = os.path.join(recipe_scanner.recipes_dir, f"{recipe_id}.recipe.json") + if not os.path.exists(recipe_path): + return web.json_response({"error": "Recipe not found"}, status=404) + + lora_scanner = getattr(recipe_scanner, "_lora_scanner", None) + target_lora = None if lora_scanner is None else await lora_scanner.get_model_info_by_name(target_name) + if not target_lora: + return web.json_response({"error": f"Local LoRA not found with name: {target_name}"}, status=404) + + with open(recipe_path, "r", encoding="utf-8") as file_obj: + recipe_data = json.load(file_obj) + + loras = recipe_data.get("loras", []) + lora = loras[lora_index] if lora_index < len(loras) else None + if lora is None: + return web.json_response({"error": "LoRA index out of range in recipe"}, status=404) + + lora["isDeleted"] = False + lora["exclude"] = False + lora["file_name"] = target_name + if "sha256" in target_lora: + lora["hash"] = target_lora["sha256"].lower() + if target_lora.get("civitai"): + lora["modelName"] = target_lora["civitai"]["model"]["name"] + lora["modelVersionName"] = target_lora["civitai"]["name"] + lora["modelVersionId"] = target_lora["civitai"]["id"] + + from ...utils.utils import calculate_recipe_fingerprint + + recipe_data["fingerprint"] = calculate_recipe_fingerprint(recipe_data.get("loras", [])) + + with open(recipe_path, "w", encoding="utf-8") as file_obj: + json.dump(recipe_data, file_obj, indent=4, ensure_ascii=False) + + updated_lora = dict(lora) + updated_lora["inLibrary"] = True + updated_lora["preview_url"] = config.get_preview_static_url(target_lora["preview_url"]) + updated_lora["localPath"] = target_lora["file_path"] + + cache = getattr(recipe_scanner, "_cache", None) + if cache is not None: + for cache_item in cache.raw_data: + if cache_item.get("id") == recipe_id: + cache_item["loras"] = recipe_data["loras"] + cache_item["fingerprint"] = recipe_data["fingerprint"] + asyncio.create_task(cache.resort()) + break + + image_path = recipe_data.get("file_path") + if image_path and os.path.exists(image_path): + self._exif_utils.append_recipe_metadata(image_path, recipe_data) + + matching_recipes = [] + if "fingerprint" in recipe_data: + matching_recipes = await recipe_scanner.find_recipes_by_fingerprint(recipe_data["fingerprint"]) + if recipe_id in matching_recipes: + matching_recipes.remove(recipe_id) + + return web.json_response( + { + "success": True, + "recipe_id": recipe_id, + "updated_lora": updated_lora, + "matching_recipes": matching_recipes, + } + ) + except Exception as exc: + self._logger.error("Error reconnecting LoRA: %s", exc, exc_info=True) + return web.json_response({"error": str(exc)}, status=500) + + async def bulk_delete(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + data = await request.json() + recipe_ids = data.get("recipe_ids", []) + if not recipe_ids: + return web.json_response( + {"success": False, "error": "No recipe IDs provided"}, + status=400, + ) + + recipes_dir = recipe_scanner.recipes_dir + if not recipes_dir or not os.path.exists(recipes_dir): + return web.json_response( + {"success": False, "error": "Recipes directory not found"}, + status=404, + ) + + deleted_recipes: list[str] = [] + failed_recipes: list[Dict[str, Any]] = [] + + for recipe_id in recipe_ids: + recipe_json_path = os.path.join(recipes_dir, f"{recipe_id}.recipe.json") + if not os.path.exists(recipe_json_path): + failed_recipes.append({"id": recipe_id, "reason": "Recipe not found"}) + continue + + try: + with open(recipe_json_path, "r", encoding="utf-8") as file_obj: + recipe_data = json.load(file_obj) + image_path = recipe_data.get("file_path") + os.remove(recipe_json_path) + if image_path and os.path.exists(image_path): + os.remove(image_path) + deleted_recipes.append(recipe_id) + except Exception as exc: + failed_recipes.append({"id": recipe_id, "reason": str(exc)}) + + cache = getattr(recipe_scanner, "_cache", None) + if deleted_recipes and cache is not None: + cache.raw_data = [item for item in cache.raw_data if item.get("id") not in deleted_recipes] + asyncio.create_task(cache.resort()) + self._logger.info("Removed %s recipes from cache", len(deleted_recipes)) + + return web.json_response( + { + "success": True, + "deleted": deleted_recipes, + "failed": failed_recipes, + "total_deleted": len(deleted_recipes), + "total_failed": len(failed_recipes), + } + ) + except Exception as exc: + self._logger.error("Error performing bulk delete: %s", exc, exc_info=True) + return web.json_response({"success": False, "error": str(exc)}, status=500) + + async def save_recipe_from_widget(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + if self._metadata_collector is None or self._metadata_processor_cls is None: + return web.json_response({"error": "Metadata collection not available"}, status=400) + + raw_metadata = self._metadata_collector() + metadata_dict = self._metadata_processor_cls.to_dict(raw_metadata) + if not metadata_dict: + return web.json_response({"error": "No generation metadata found"}, status=400) + + if not self._standalone_mode and self._metadata_registry_cls is not None: + metadata_registry = self._metadata_registry_cls() + latest_image = metadata_registry.get_first_decoded_image() + else: + latest_image = None + + if latest_image is None: + return web.json_response( + {"error": "No recent images found to use for recipe. Try generating an image first."}, + status=400, + ) + + self._logger.debug("Image type: %s", type(latest_image)) + + try: + if isinstance(latest_image, tuple): + tensor_image = latest_image[0] if latest_image else None + if tensor_image is None: + return web.json_response({"error": "Empty image tuple received"}, status=400) + else: + tensor_image = latest_image + + if hasattr(tensor_image, "shape"): + shape_info = tensor_image.shape + self._logger.debug("Tensor shape: %s, dtype: %s", shape_info, tensor_image.dtype) + + import torch # type: ignore[import-not-found] + + if isinstance(tensor_image, torch.Tensor): + image_np = tensor_image.cpu().numpy() + else: + image_np = np.array(tensor_image) + + while len(image_np.shape) > 3: + image_np = image_np[0] + + if image_np.dtype in (np.float32, np.float64) and image_np.max() <= 1.0: + image_np = (image_np * 255).astype(np.uint8) + + if len(image_np.shape) == 3 and image_np.shape[2] == 3: + pil_image = Image.fromarray(image_np) + img_byte_arr = io.BytesIO() + pil_image.save(img_byte_arr, format="PNG") + image_bytes = img_byte_arr.getvalue() + else: + return web.json_response( + {"error": f"Cannot handle this data shape: {image_np.shape}, {image_np.dtype}"}, + status=400, + ) + except Exception as exc: + self._logger.error("Error processing image data: %s", exc, exc_info=True) + return web.json_response({"error": f"Error processing image: {exc}"}, status=400) + + lora_stack = metadata_dict.get("loras", "") + import re + + lora_matches = re.findall(r"]+)>", lora_stack) + if not lora_matches: + return web.json_response({"error": "No LoRAs found in the generation metadata"}, status=400) + + loras_for_name = lora_matches[:3] + recipe_name_parts = [] + for name, strength in loras_for_name: + recipe_name_parts.append(f"{name.strip()}-{float(strength):.2f}") + recipe_name = "_".join(recipe_name_parts) + + recipe_name = recipe_name or "recipe" + + recipes_dir = recipe_scanner.recipes_dir + os.makedirs(recipes_dir, exist_ok=True) + + import uuid + + recipe_id = str(uuid.uuid4()) + image_filename = f"{recipe_id}.png" + image_path = os.path.join(recipes_dir, image_filename) + with open(image_path, "wb") as file_obj: + file_obj.write(image_bytes) + + loras_data = [] + lora_scanner = getattr(recipe_scanner, "_lora_scanner", None) + base_model_counts: Dict[str, int] = {} + + for name, strength in lora_matches: + lora_info = None + if lora_scanner is not None: + lora_info = await lora_scanner.get_model_info_by_name(name) + lora_data = { + "file_name": name, + "strength": float(strength), + "hash": (lora_info.get("sha256") or "").lower() if lora_info else "", + "modelVersionId": lora_info.get("civitai", {}).get("id") if lora_info else 0, + "modelName": lora_info.get("civitai", {}).get("model", {}).get("name") if lora_info else "", + "modelVersionName": lora_info.get("civitai", {}).get("name") if lora_info else "", + "isDeleted": False, + "exclude": False, + } + loras_data.append(lora_data) + + if lora_info and "base_model" in lora_info: + base_model = lora_info["base_model"] + base_model_counts[base_model] = base_model_counts.get(base_model, 0) + 1 + + most_common_base_model = "" + if base_model_counts: + most_common_base_model = max(base_model_counts.items(), key=lambda item: item[1])[0] + + recipe_data = { + "id": recipe_id, + "file_path": image_path, + "title": recipe_name, + "modified": time.time(), + "created_date": time.time(), + "base_model": most_common_base_model, + "loras": loras_data, + "checkpoint": metadata_dict.get("checkpoint", ""), + "gen_params": { + key: value + for key, value in metadata_dict.items() + if key not in ["checkpoint", "loras"] + }, + "loras_stack": lora_stack, + } + + json_filename = f"{recipe_id}.recipe.json" + json_path = os.path.join(recipes_dir, json_filename) + with open(json_path, "w", encoding="utf-8") as file_obj: + json.dump(recipe_data, file_obj, indent=4, ensure_ascii=False) + + self._exif_utils.append_recipe_metadata(image_path, recipe_data) + + cache = getattr(recipe_scanner, "_cache", None) + if cache is not None: + cache.raw_data.append(recipe_data) + asyncio.create_task(cache.resort()) + self._logger.info("Added recipe %s to cache", recipe_id) + + return web.json_response( + { + "success": True, + "recipe_id": recipe_id, + "image_path": image_path, + "json_path": json_path, + "recipe_name": recipe_name, + } + ) + except Exception as exc: + self._logger.error("Error saving recipe from widget: %s", exc, exc_info=True) + return web.json_response({"error": str(exc)}, status=500) + + +class RecipeAnalysisHandler: + """Analyze images to extract recipe metadata.""" + + def __init__( + self, + *, + ensure_dependencies_ready: EnsureDependenciesCallable, + recipe_scanner_getter: RecipeScannerGetter, + civitai_client_getter: CivitaiClientGetter, + logger: Logger, + exif_utils=ExifUtils, + recipe_parser_factory=RecipeParserFactory, + downloader_factory=get_downloader, + ) -> None: + self._ensure_dependencies_ready = ensure_dependencies_ready + self._recipe_scanner_getter = recipe_scanner_getter + self._civitai_client_getter = civitai_client_getter + self._logger = logger + self._exif_utils = exif_utils + self._recipe_parser_factory = recipe_parser_factory + self._downloader_factory = downloader_factory + + async def analyze_uploaded_image(self, request: web.Request) -> web.Response: + temp_path: Optional[str] = None + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + civitai_client = self._civitai_client_getter() + if recipe_scanner is None or civitai_client is None: + raise RuntimeError("Required services unavailable") + + content_type = request.headers.get("Content-Type", "") + is_url_mode = False + metadata: Optional[Dict[str, Any]] = None + + if "multipart/form-data" in content_type: + reader = await request.multipart() + field = await reader.next() + if field is None or field.name != "image": + return web.json_response({"error": "No image field found", "loras": []}, status=400) + + with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file: + while True: + chunk = await field.read_chunk() + if not chunk: + break + temp_file.write(chunk) + temp_path = temp_file.name + elif "application/json" in content_type: + data = await request.json() + url = data.get("url") + is_url_mode = True + if not url: + return web.json_response({"error": "No URL provided", "loras": []}, status=400) + + import re + + civitai_image_match = re.match(r"https://civitai\.com/images/(\d+)", url) + if civitai_image_match: + image_id = civitai_image_match.group(1) + image_info = await civitai_client.get_image_info(image_id) + if not image_info: + return web.json_response( + {"error": "Failed to fetch image information from Civitai", "loras": []}, + status=400, + ) + image_url = image_info.get("url") + if not image_url: + return web.json_response( + {"error": "No image URL found in Civitai response", "loras": []}, + status=400, + ) + + downloader = await self._downloader_factory() + with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file: + temp_path = temp_file.name + + success, result = await downloader.download_file( + image_url, + temp_path, + use_auth=False, + ) + if not success: + return web.json_response( + {"error": f"Failed to download image from URL: {result}", "loras": []}, + status=400, + ) + metadata = image_info.get("meta") if "meta" in image_info else None + else: + return web.json_response({"error": "Unsupported content type", "loras": []}, status=400) + + if metadata is None and temp_path: + metadata = self._exif_utils.extract_image_metadata(temp_path) + + if not metadata: + response: Dict[str, Any] = {"error": "No metadata found in this image", "loras": []} + if is_url_mode and temp_path: + with open(temp_path, "rb") as image_file: + response["image_base64"] = base64.b64encode(image_file.read()).decode("utf-8") + return web.json_response(response, status=200) + + parser = self._recipe_parser_factory.create_parser(metadata) + if parser is None: + response = {"error": "No parser found for this image", "loras": []} + if is_url_mode and temp_path: + with open(temp_path, "rb") as image_file: + response["image_base64"] = base64.b64encode(image_file.read()).decode("utf-8") + return web.json_response(response, status=200) + + result = await parser.parse_metadata(metadata, recipe_scanner=recipe_scanner) + + if is_url_mode and temp_path: + with open(temp_path, "rb") as image_file: + result["image_base64"] = base64.b64encode(image_file.read()).decode("utf-8") + + if "error" in result and not result.get("loras"): + return web.json_response(result, status=200) + + from ...utils.utils import calculate_recipe_fingerprint + + fingerprint = calculate_recipe_fingerprint(result.get("loras", [])) + result["fingerprint"] = fingerprint + + matching_recipes = [] + if fingerprint: + matching_recipes = await recipe_scanner.find_recipes_by_fingerprint(fingerprint) + + result["matching_recipes"] = matching_recipes + return web.json_response(result) + except Exception as exc: + self._logger.error("Error analyzing recipe image: %s", exc, exc_info=True) + return web.json_response({"error": str(exc), "loras": []}, status=500) + finally: + if temp_path and os.path.exists(temp_path): + try: + os.unlink(temp_path) + except Exception as cleanup_exc: # pragma: no cover - logging path + self._logger.error("Error deleting temporary file: %s", cleanup_exc) + + async def analyze_local_image(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + data = await request.json() + file_path = data.get("path") + if not file_path: + return web.json_response({"error": "No file path provided", "loras": []}, status=400) + + file_path = os.path.normpath(file_path.strip('"').strip("'")) + if not os.path.isfile(file_path): + return web.json_response({"error": "File not found", "loras": []}, status=404) + + metadata = self._exif_utils.extract_image_metadata(file_path) + if not metadata: + with open(file_path, "rb") as image_file: + image_base64 = base64.b64encode(image_file.read()).decode("utf-8") + return web.json_response( + {"error": "No metadata found in this image", "loras": [], "image_base64": image_base64}, + status=200, + ) + + parser = self._recipe_parser_factory.create_parser(metadata) + if parser is None: + with open(file_path, "rb") as image_file: + image_base64 = base64.b64encode(image_file.read()).decode("utf-8") + return web.json_response( + {"error": "No parser found for this image", "loras": [], "image_base64": image_base64}, + status=200, + ) + + result = await parser.parse_metadata(metadata, recipe_scanner=recipe_scanner) + with open(file_path, "rb") as image_file: + result["image_base64"] = base64.b64encode(image_file.read()).decode("utf-8") + + if "error" in result and not result.get("loras"): + return web.json_response(result, status=200) + + from ...utils.utils import calculate_recipe_fingerprint + + fingerprint = calculate_recipe_fingerprint(result.get("loras", [])) + result["fingerprint"] = fingerprint + + matching_recipes = [] + if fingerprint: + matching_recipes = await recipe_scanner.find_recipes_by_fingerprint(fingerprint) + result["matching_recipes"] = matching_recipes + + return web.json_response(result) + except Exception as exc: + self._logger.error("Error analyzing local image: %s", exc, exc_info=True) + return web.json_response({"error": str(exc), "loras": []}, status=500) + + +class RecipeSharingHandler: + """Serve endpoints related to recipe sharing.""" + + def __init__( + self, + *, + ensure_dependencies_ready: EnsureDependenciesCallable, + recipe_scanner_getter: RecipeScannerGetter, + logger: Logger, + ) -> None: + self._ensure_dependencies_ready = ensure_dependencies_ready + self._recipe_scanner_getter = recipe_scanner_getter + self._logger = logger + self._shared_recipes: Dict[str, Dict[str, Any]] = {} + + async def share_recipe(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + recipe_id = request.match_info["recipe_id"] + cache = await recipe_scanner.get_cached_data() + recipe = next( + (r for r in getattr(cache, "raw_data", []) if str(r.get("id", "")) == recipe_id), + None, + ) + if not recipe: + return web.json_response({"error": "Recipe not found"}, status=404) + + image_path = recipe.get("file_path") + if not image_path or not os.path.exists(image_path): + return web.json_response({"error": "Recipe image not found"}, status=404) + + import shutil + + ext = os.path.splitext(image_path)[1] + with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as temp_file: + temp_path = temp_file.name + shutil.copy2(image_path, temp_path) + processed_path = temp_path + + timestamp = int(time.time()) + url_path = f"/api/recipe/{recipe_id}/share/download?t={timestamp}" + self._shared_recipes[recipe_id] = { + "path": processed_path, + "timestamp": timestamp, + "expires": time.time() + 300, + } + self._cleanup_shared_recipes() + + filename = f"recipe_{recipe.get('title', '').replace(' ', '_').lower()}{ext}" + return web.json_response({"success": True, "download_url": url_path, "filename": filename}) + except Exception as exc: + self._logger.error("Error sharing recipe: %s", exc, exc_info=True) + return web.json_response({"error": str(exc)}, status=500) + + async def download_shared_recipe(self, request: web.Request) -> web.Response: + try: + await self._ensure_dependencies_ready() + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + recipe_id = request.match_info["recipe_id"] + shared_info = self._shared_recipes.get(recipe_id) + if not shared_info: + return web.json_response({"error": "Shared recipe not found or expired"}, status=404) + + file_path = shared_info["path"] + if not os.path.exists(file_path): + return web.json_response({"error": "Shared recipe file not found"}, status=404) + + cache = await recipe_scanner.get_cached_data() + recipe = next( + (r for r in getattr(cache, "raw_data", []) if str(r.get("id", "")) == recipe_id), + None, + ) + filename_base = ( + f"recipe_{recipe.get('title', '').replace(' ', '_').lower()}" + if recipe + else recipe_id + ) + ext = os.path.splitext(file_path)[1] + download_filename = f"{filename_base}{ext}" + + return web.FileResponse( + file_path, + headers={"Content-Disposition": f'attachment; filename="{download_filename}"'}, + ) + except Exception as exc: + self._logger.error("Error downloading shared recipe: %s", exc, exc_info=True) + return web.json_response({"error": str(exc)}, status=500) + + def _cleanup_shared_recipes(self) -> None: + current_time = time.time() + expired_ids = [ + recipe_id + for recipe_id, info in self._shared_recipes.items() + if current_time > info.get("expires", 0) + ] + + for recipe_id in expired_ids: + try: + file_path = self._shared_recipes[recipe_id]["path"] + if os.path.exists(file_path): + os.unlink(file_path) + except Exception as exc: # pragma: no cover - logging path + self._logger.error("Error cleaning up shared recipe %s: %s", recipe_id, exc) + finally: + self._shared_recipes.pop(recipe_id, None) diff --git a/py/routes/recipe_routes.py b/py/routes/recipe_routes.py index 55208552..2c233d01 100644 --- a/py/routes/recipe_routes.py +++ b/py/routes/recipe_routes.py @@ -1,41 +1,16 @@ -import os -import time -import base64 -import numpy as np -from PIL import Image -import io -import logging +"""Concrete recipe route configuration.""" + from aiohttp import web -from typing import Dict -import tempfile -import json -import asyncio -import sys from .base_recipe_routes import BaseRecipeRoutes from .recipe_route_registrar import RecipeRouteRegistrar -from ..utils.exif_utils import ExifUtils -from ..recipes import RecipeParserFactory -from ..utils.constants import CARD_PREVIEW_WIDTH -from ..config import config -from ..services.downloader import get_downloader - -# Check if running in standalone mode -standalone_mode = os.environ.get("HF_HUB_DISABLE_TELEMETRY", "0") == "0" - -# Only import MetadataRegistry in non-standalone mode -if not standalone_mode: - # Import metadata_collector functions and classes conditionally - from ..metadata_collector import get_metadata # Add MetadataCollector import - from ..metadata_collector.metadata_processor import MetadataProcessor # Add MetadataProcessor import - from ..metadata_collector.metadata_registry import MetadataRegistry - -logger = logging.getLogger(__name__) class RecipeRoutes(BaseRecipeRoutes): """API route handlers for Recipe management.""" + template_name = "recipes.html" + @classmethod def setup_routes(cls, app: web.Application): """Register API routes using the declarative registrar.""" @@ -44,1531 +19,3 @@ class RecipeRoutes(BaseRecipeRoutes): registrar = RecipeRouteRegistrar(app) registrar.register_routes(routes.to_route_mapping()) routes.register_startup_hooks(app) - - async def render_page(self, request: web.Request) -> web.Response: - """Handle GET /loras/recipes request.""" - try: - await self.ensure_dependencies_ready() - - # 获取用户语言设置 - user_language = self.settings.get('language', 'en') - - # 设置服务端i18n语言 - self.server_i18n.set_locale(user_language) - - # Skip initialization check and directly try to get cached data - try: - # Recipe scanner will initialize cache if needed - await self.recipe_scanner.get_cached_data(force_refresh=False) - template = self.template_env.get_template('recipes.html') - rendered = template.render( - recipes=[], # Frontend will load recipes via API - is_initializing=False, - settings=self.settings, - request=request, - # 添加服务端翻译函数 - t=self.server_i18n.get_translation, - ) - except Exception as cache_error: - logger.error(f"Error loading recipe cache data: {cache_error}") - # Still keep error handling - show initializing page on error - template = self.template_env.get_template('recipes.html') - rendered = template.render( - is_initializing=True, - settings=self.settings, - request=request, - # 添加服务端翻译函数 - t=self.server_i18n.get_translation, - ) - logger.info("Recipe cache error, returning initialization page") - - return web.Response( - text=rendered, - content_type='text/html' - ) - - except Exception as e: - logger.error(f"Error handling recipes request: {e}", exc_info=True) - return web.Response( - text="Error loading recipes page", - status=500 - ) - - async def list_recipes(self, request: web.Request) -> web.Response: - """API endpoint for getting paginated recipes.""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - # Get query parameters with defaults - page = int(request.query.get('page', '1')) - page_size = int(request.query.get('page_size', '20')) - sort_by = request.query.get('sort_by', 'date') - search = request.query.get('search', None) - - # Get search options (renamed for better clarity) - search_title = request.query.get('search_title', 'true').lower() == 'true' - search_tags = request.query.get('search_tags', 'true').lower() == 'true' - search_lora_name = request.query.get('search_lora_name', 'true').lower() == 'true' - search_lora_model = request.query.get('search_lora_model', 'true').lower() == 'true' - - # Get filter parameters - base_models = request.query.get('base_models', None) - tags = request.query.get('tags', None) - - # New parameter: get LoRA hash filter - lora_hash = request.query.get('lora_hash', None) - - # Parse filter parameters - filters = {} - if base_models: - filters['base_model'] = base_models.split(',') - if tags: - filters['tags'] = tags.split(',') - - # Add search options to filters - search_options = { - 'title': search_title, - 'tags': search_tags, - 'lora_name': search_lora_name, - 'lora_model': search_lora_model - } - - # Get paginated data with the new lora_hash parameter - result = await self.recipe_scanner.get_paginated_data( - page=page, - page_size=page_size, - sort_by=sort_by, - search=search, - filters=filters, - search_options=search_options, - lora_hash=lora_hash - ) - - # Format the response data with static URLs for file paths - for item in result['items']: - # Always ensure file_url is set - if 'file_path' in item: - item['file_url'] = self._format_recipe_file_url(item['file_path']) - else: - item['file_url'] = '/loras_static/images/no-preview.png' - - # 确保 loras 数组存在 - if 'loras' not in item: - item['loras'] = [] - - # 确保有 base_model 字段 - if 'base_model' not in item: - item['base_model'] = "" - - return web.json_response(result) - except Exception as e: - logger.error(f"Error retrieving recipes: {e}", exc_info=True) - return web.json_response({"error": str(e)}, status=500) - - async def get_recipe(self, request: web.Request) -> web.Response: - """Get detailed information about a specific recipe.""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - recipe_id = request.match_info['recipe_id'] - - # Use the new get_recipe_by_id method from recipe_scanner - recipe = await self.recipe_scanner.get_recipe_by_id(recipe_id) - - if not recipe: - return web.json_response({"error": "Recipe not found"}, status=404) - - return web.json_response(recipe) - except Exception as e: - logger.error(f"Error retrieving recipe details: {e}", exc_info=True) - return web.json_response({"error": str(e)}, status=500) - - def _format_recipe_file_url(self, file_path: str) -> str: - """Format file path for recipe image as a URL""" - try: - # Return the file URL directly for the first lora root's preview - recipes_dir = os.path.join(config.loras_roots[0], "recipes").replace(os.sep, '/') - if file_path.replace(os.sep, '/').startswith(recipes_dir): - relative_path = os.path.relpath(file_path, config.loras_roots[0]).replace(os.sep, '/') - return f"/loras_static/root1/preview/{relative_path}" - - # If not in recipes dir, try to create a valid URL from the file path - file_name = os.path.basename(file_path) - return f"/loras_static/root1/preview/recipes/{file_name}" - except Exception as e: - logger.error(f"Error formatting recipe file URL: {e}", exc_info=True) - return '/loras_static/images/no-preview.png' # Return default image on error - - def _format_recipe_data(self, recipe: Dict) -> Dict: - """Format recipe data for API response""" - formatted = {**recipe} # Copy all fields - - # Format file paths to URLs - if 'file_path' in formatted: - formatted['file_url'] = self._format_recipe_file_url(formatted['file_path']) - - # Format dates for display - for date_field in ['created_date', 'modified']: - if date_field in formatted: - formatted[f"{date_field}_formatted"] = self._format_timestamp(formatted[date_field]) - - return formatted - - def _format_timestamp(self, timestamp: float) -> str: - """Format timestamp for display""" - from datetime import datetime - return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') - - async def analyze_uploaded_image(self, request: web.Request) -> web.Response: - """Analyze an uploaded image or URL for recipe metadata.""" - temp_path = None - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - # Check if request contains multipart data (image) or JSON data (url) - content_type = request.headers.get('Content-Type', '') - - is_url_mode = False - metadata = None # Initialize metadata variable - - if 'multipart/form-data' in content_type: - # Handle image upload - reader = await request.multipart() - field = await reader.next() - - if field.name != 'image': - return web.json_response({ - "error": "No image field found", - "loras": [] - }, status=400) - - # Create a temporary file to store the uploaded image - with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file: - while True: - chunk = await field.read_chunk() - if not chunk: - break - temp_file.write(chunk) - temp_path = temp_file.name - - elif 'application/json' in content_type: - # Handle URL input - data = await request.json() - url = data.get('url') - is_url_mode = True - - if not url: - return web.json_response({ - "error": "No URL provided", - "loras": [] - }, status=400) - - # Check if this is a Civitai image URL - import re - civitai_image_match = re.match(r'https://civitai\.com/images/(\d+)', url) - - if civitai_image_match: - # Extract image ID and fetch image info using get_image_info - image_id = civitai_image_match.group(1) - image_info = await self.civitai_client.get_image_info(image_id) - - if not image_info: - return web.json_response({ - "error": "Failed to fetch image information from Civitai", - "loras": [] - }, status=400) - - # Get image URL from response - image_url = image_info.get('url') - if not image_url: - return web.json_response({ - "error": "No image URL found in Civitai response", - "loras": [] - }, status=400) - - # Download image using unified downloader - downloader = await get_downloader() - # Create a temporary file to save the downloaded image - with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file: - temp_path = temp_file.name - - success, result = await downloader.download_file( - image_url, - temp_path, - use_auth=False # Image downloads typically don't need auth - ) - - if not success: - return web.json_response({ - "error": f"Failed to download image from URL: {result}", - "loras": [] - }, status=400) - - # Use meta field from image_info as metadata - if 'meta' in image_info: - metadata = image_info['meta'] - - # If metadata wasn't obtained from Civitai API, extract it from the image - if metadata is None: - # Extract metadata from the image using ExifUtils - metadata = ExifUtils.extract_image_metadata(temp_path) - - # If no metadata found, return a more specific error - if not metadata: - result = { - "error": "No metadata found in this image", - "loras": [] # Return empty loras array to prevent client-side errors - } - - # For URL mode, include the image data as base64 - if is_url_mode and temp_path: - with open(temp_path, "rb") as image_file: - result["image_base64"] = base64.b64encode(image_file.read()).decode('utf-8') - - return web.json_response(result, status=200) - - # Use the parser factory to get the appropriate parser - parser = RecipeParserFactory.create_parser(metadata) - - if parser is None: - result = { - "error": "No parser found for this image", - "loras": [] # Return empty loras array to prevent client-side errors - } - - # For URL mode, include the image data as base64 - if is_url_mode and temp_path: - with open(temp_path, "rb") as image_file: - result["image_base64"] = base64.b64encode(image_file.read()).decode('utf-8') - - return web.json_response(result, status=200) - - # Parse the metadata - result = await parser.parse_metadata( - metadata, - recipe_scanner=self.recipe_scanner - ) - - # For URL mode, include the image data as base64 - if is_url_mode and temp_path: - with open(temp_path, "rb") as image_file: - result["image_base64"] = base64.b64encode(image_file.read()).decode('utf-8') - - # Check for errors - if "error" in result and not result.get("loras"): - return web.json_response(result, status=200) - - # Calculate fingerprint from parsed loras - from ..utils.utils import calculate_recipe_fingerprint - fingerprint = calculate_recipe_fingerprint(result.get("loras", [])) - - # Add fingerprint to result - result["fingerprint"] = fingerprint - - # Find matching recipes with the same fingerprint - matching_recipes = [] - if fingerprint: - matching_recipes = await self.recipe_scanner.find_recipes_by_fingerprint(fingerprint) - - # Add matching recipes to result - result["matching_recipes"] = matching_recipes - - return web.json_response(result) - - except Exception as e: - logger.error(f"Error analyzing recipe image: {e}", exc_info=True) - return web.json_response({ - "error": str(e), - "loras": [] # Return empty loras array to prevent client-side errors - }, status=500) - finally: - # Clean up the temporary file in the finally block - if temp_path and os.path.exists(temp_path): - try: - os.unlink(temp_path) - except Exception as e: - logger.error(f"Error deleting temporary file: {e}") - - async def analyze_local_image(self, request: web.Request) -> web.Response: - """Analyze a local image file for recipe metadata""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - # Get JSON data from request - data = await request.json() - file_path = data.get('path') - - if not file_path: - return web.json_response({ - 'error': 'No file path provided', - 'loras': [] - }, status=400) - - # Normalize file path for cross-platform compatibility - file_path = os.path.normpath(file_path.strip('"').strip("'")) - - # Validate that the file exists - if not os.path.isfile(file_path): - return web.json_response({ - 'error': 'File not found', - 'loras': [] - }, status=404) - - # Extract metadata from the image using ExifUtils - metadata = ExifUtils.extract_image_metadata(file_path) - - # If no metadata found, return error - if not metadata: - # Get base64 image data - with open(file_path, "rb") as image_file: - image_base64 = base64.b64encode(image_file.read()).decode('utf-8') - - return web.json_response({ - "error": "No metadata found in this image", - "loras": [], # Return empty loras array to prevent client-side errors - "image_base64": image_base64 - }, status=200) - - # Use the parser factory to get the appropriate parser - parser = RecipeParserFactory.create_parser(metadata) - - if parser is None: - # Get base64 image data - with open(file_path, "rb") as image_file: - image_base64 = base64.b64encode(image_file.read()).decode('utf-8') - - return web.json_response({ - "error": "No parser found for this image", - "loras": [], # Return empty loras array to prevent client-side errors - "image_base64": image_base64 - }, status=200) - - # Parse the metadata - result = await parser.parse_metadata( - metadata, - recipe_scanner=self.recipe_scanner - ) - - # Add base64 image data to result - with open(file_path, "rb") as image_file: - result["image_base64"] = base64.b64encode(image_file.read()).decode('utf-8') - - # Check for errors - if "error" in result and not result.get("loras"): - return web.json_response(result, status=200) - - # Calculate fingerprint from parsed loras - from ..utils.utils import calculate_recipe_fingerprint - fingerprint = calculate_recipe_fingerprint(result.get("loras", [])) - - # Add fingerprint to result - result["fingerprint"] = fingerprint - - # Find matching recipes with the same fingerprint - matching_recipes = [] - if fingerprint: - matching_recipes = await self.recipe_scanner.find_recipes_by_fingerprint(fingerprint) - - # Add matching recipes to result - result["matching_recipes"] = matching_recipes - - return web.json_response(result) - - except Exception as e: - logger.error(f"Error analyzing local image: {e}", exc_info=True) - return web.json_response({ - 'error': str(e), - 'loras': [] # Return empty loras array to prevent client-side errors - }, status=500) - - async def save_recipe(self, request: web.Request) -> web.Response: - """Save a recipe to the recipes folder""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - reader = await request.multipart() - - # Process form data - image = None - image_base64 = None - image_url = None - name = None - tags = [] - metadata = None - - while True: - field = await reader.next() - if field is None: - break - - if field.name == 'image': - # Read image data - image_data = b'' - while True: - chunk = await field.read_chunk() - if not chunk: - break - image_data += chunk - image = image_data - - elif field.name == 'image_base64': - # Get base64 image data - image_base64 = await field.text() - - elif field.name == 'image_url': - # Get image URL - image_url = await field.text() - - elif field.name == 'name': - name = await field.text() - - elif field.name == 'tags': - tags_text = await field.text() - try: - tags = json.loads(tags_text) - except: - tags = [] - - elif field.name == 'metadata': - metadata_text = await field.text() - try: - metadata = json.loads(metadata_text) - except: - metadata = {} - - missing_fields = [] - if not name: - missing_fields.append("name") - if not metadata: - missing_fields.append("metadata") - if missing_fields: - return web.json_response({"error": f"Missing required fields: {', '.join(missing_fields)}"}, status=400) - - # Handle different image sources - if not image: - if image_base64: - # Convert base64 to binary - try: - # Remove potential data URL prefix - if ',' in image_base64: - image_base64 = image_base64.split(',', 1)[1] - image = base64.b64decode(image_base64) - except Exception as e: - return web.json_response({"error": f"Invalid base64 image data: {str(e)}"}, status=400) - else: - return web.json_response({"error": "No image data provided"}, status=400) - - # Create recipes directory if it doesn't exist - recipes_dir = self.recipe_scanner.recipes_dir - os.makedirs(recipes_dir, exist_ok=True) - - # Generate UUID for the recipe - import uuid - recipe_id = str(uuid.uuid4()) - - # Optimize the image (resize and convert to WebP) - optimized_image, extension = ExifUtils.optimize_image( - image_data=image, - target_width=CARD_PREVIEW_WIDTH, - format='webp', - quality=85, - preserve_metadata=True - ) - - # Save the optimized image - image_filename = f"{recipe_id}{extension}" - image_path = os.path.join(recipes_dir, image_filename) - with open(image_path, 'wb') as f: - f.write(optimized_image) - - # Create the recipe data structure - current_time = time.time() - - # Format loras data according to the recipe.json format - loras_data = [] - for lora in metadata.get("loras", []): - # Modified: Always include deleted LoRAs in the recipe metadata - # Even if they're marked to be excluded, we still keep their identifying information - # The exclude flag will only be used to determine if they should be included in recipe syntax - - # Convert frontend lora format to recipe format - lora_entry = { - "file_name": lora.get("file_name", "") or os.path.splitext(os.path.basename(lora.get("localPath", "")))[0] if lora.get("localPath") else "", - "hash": lora.get("hash", "").lower() if lora.get("hash") else "", - "strength": float(lora.get("weight", 1.0)), - "modelVersionId": lora.get("id", 0), - "modelName": lora.get("name", ""), - "modelVersionName": lora.get("version", ""), - "isDeleted": lora.get("isDeleted", False), # Preserve deletion status in saved recipe - "exclude": lora.get("exclude", False) # Add exclude flag to the recipe - } - loras_data.append(lora_entry) - - # Format gen_params according to the recipe.json format - gen_params = metadata.get("gen_params", {}) - if not gen_params and "raw_metadata" in metadata: - # Extract from raw metadata if available - raw_metadata = metadata.get("raw_metadata", {}) - gen_params = { - "prompt": raw_metadata.get("prompt", ""), - "negative_prompt": raw_metadata.get("negative_prompt", ""), - "checkpoint": raw_metadata.get("checkpoint", {}), - "steps": raw_metadata.get("steps", ""), - "sampler": raw_metadata.get("sampler", ""), - "cfg_scale": raw_metadata.get("cfg_scale", ""), - "seed": raw_metadata.get("seed", ""), - "size": raw_metadata.get("size", ""), - "clip_skip": raw_metadata.get("clip_skip", "") - } - - # Calculate recipe fingerprint - from ..utils.utils import calculate_recipe_fingerprint - fingerprint = calculate_recipe_fingerprint(loras_data) - - # Create the recipe data structure - recipe_data = { - "id": recipe_id, - "file_path": image_path, - "title": name, - "modified": current_time, - "created_date": current_time, - "base_model": metadata.get("base_model", ""), - "loras": loras_data, - "gen_params": gen_params, - "fingerprint": fingerprint - } - - # Add tags if provided - if tags: - recipe_data["tags"] = tags - - # Add source_path if provided in metadata - if metadata.get("source_path"): - recipe_data["source_path"] = metadata.get("source_path") - - # Save the recipe JSON - json_filename = f"{recipe_id}.recipe.json" - json_path = os.path.join(recipes_dir, json_filename) - with open(json_path, 'w', encoding='utf-8') as f: - json.dump(recipe_data, f, indent=4, ensure_ascii=False) - - # Add recipe metadata to the image - ExifUtils.append_recipe_metadata(image_path, recipe_data) - - # Check for duplicates - matching_recipes = [] - if fingerprint: - matching_recipes = await self.recipe_scanner.find_recipes_by_fingerprint(fingerprint) - # Remove current recipe from matches - if recipe_id in matching_recipes: - matching_recipes.remove(recipe_id) - - # Simplified cache update approach - # Instead of trying to update the cache directly, just set it to None - # to force a refresh on the next get_cached_data call - if self.recipe_scanner._cache is not None: - # Add the recipe to the raw data if the cache exists - # This is a simple direct update without locks or timeouts - self.recipe_scanner._cache.raw_data.append(recipe_data) - # Schedule a background task to resort the cache - asyncio.create_task(self.recipe_scanner._cache.resort()) - logger.info(f"Added recipe {recipe_id} to cache") - - return web.json_response({ - 'success': True, - 'recipe_id': recipe_id, - 'image_path': image_path, - 'json_path': json_path, - 'matching_recipes': matching_recipes - }) - - except Exception as e: - logger.error(f"Error saving recipe: {e}", exc_info=True) - return web.json_response({"error": str(e)}, status=500) - - async def delete_recipe(self, request: web.Request) -> web.Response: - """Delete a recipe by ID""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - recipe_id = request.match_info['recipe_id'] - - # Get recipes directory - recipes_dir = self.recipe_scanner.recipes_dir - if not recipes_dir or not os.path.exists(recipes_dir): - return web.json_response({"error": "Recipes directory not found"}, status=404) - - # Find recipe JSON file - recipe_json_path = os.path.join(recipes_dir, f"{recipe_id}.recipe.json") - if not os.path.exists(recipe_json_path): - return web.json_response({"error": "Recipe not found"}, status=404) - - # Load recipe data to get image path - with open(recipe_json_path, 'r', encoding='utf-8') as f: - recipe_data = json.load(f) - - # Get image path - image_path = recipe_data.get('file_path') - - # Delete recipe JSON file - os.remove(recipe_json_path) - logger.info(f"Deleted recipe JSON file: {recipe_json_path}") - - # Delete recipe image if it exists - if image_path and os.path.exists(image_path): - os.remove(image_path) - logger.info(f"Deleted recipe image: {image_path}") - - # Simplified cache update approach - if self.recipe_scanner._cache is not None: - # Remove the recipe from raw_data if it exists - self.recipe_scanner._cache.raw_data = [ - r for r in self.recipe_scanner._cache.raw_data - if str(r.get('id', '')) != recipe_id - ] - # Schedule a background task to resort the cache - asyncio.create_task(self.recipe_scanner._cache.resort()) - logger.info(f"Removed recipe {recipe_id} from cache") - - return web.json_response({"success": True, "message": "Recipe deleted successfully"}) - except Exception as e: - logger.error(f"Error deleting recipe: {e}", exc_info=True) - return web.json_response({"error": str(e)}, status=500) - - async def get_top_tags(self, request: web.Request) -> web.Response: - """Get top tags used in recipes""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - # Get limit parameter with default - limit = int(request.query.get('limit', '20')) - - # Get all recipes from cache - cache = await self.recipe_scanner.get_cached_data() - - # Count tag occurrences - tag_counts = {} - for recipe in cache.raw_data: - if 'tags' in recipe and recipe['tags']: - for tag in recipe['tags']: - tag_counts[tag] = tag_counts.get(tag, 0) + 1 - - # Sort tags by count and limit results - sorted_tags = [{'tag': tag, 'count': count} for tag, count in tag_counts.items()] - sorted_tags.sort(key=lambda x: x['count'], reverse=True) - top_tags = sorted_tags[:limit] - - return web.json_response({ - 'success': True, - 'tags': top_tags - }) - except Exception as e: - logger.error(f"Error retrieving top tags: {e}", exc_info=True) - return web.json_response({ - 'success': False, - 'error': str(e) - }, status=500) - - async def get_base_models(self, request: web.Request) -> web.Response: - """Get base models used in recipes""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - # Get all recipes from cache - cache = await self.recipe_scanner.get_cached_data() - - # Count base model occurrences - base_model_counts = {} - for recipe in cache.raw_data: - if 'base_model' in recipe and recipe['base_model']: - base_model = recipe['base_model'] - base_model_counts[base_model] = base_model_counts.get(base_model, 0) + 1 - - # Sort base models by count - sorted_models = [{'name': model, 'count': count} for model, count in base_model_counts.items()] - sorted_models.sort(key=lambda x: x['count'], reverse=True) - - return web.json_response({ - 'success': True, - 'base_models': sorted_models - }) - except Exception as e: - logger.error(f"Error retrieving base models: {e}", exc_info=True) - return web.json_response({ - 'success': False, - 'error': str(e)} - , status=500) - - async def share_recipe(self, request: web.Request) -> web.Response: - """Process a recipe image for sharing by adding metadata to EXIF""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - recipe_id = request.match_info['recipe_id'] - - # Get all recipes from cache - cache = await self.recipe_scanner.get_cached_data() - - # Find the specific recipe - recipe = next((r for r in cache.raw_data if str(r.get('id', '')) == recipe_id), None) - - if not recipe: - return web.json_response({"error": "Recipe not found"}, status=404) - - # Get the image path - image_path = recipe.get('file_path') - if not image_path or not os.path.exists(image_path): - return web.json_response({"error": "Recipe image not found"}, status=404) - - # Create a temporary copy of the image to modify - import tempfile - import shutil - - # Create temp file with same extension - ext = os.path.splitext(image_path)[1] - with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as temp_file: - temp_path = temp_file.name - - # Copy the original image to temp file - shutil.copy2(image_path, temp_path) - processed_path = temp_path - - # Create a URL for the processed image - # Use a timestamp to prevent caching - timestamp = int(time.time()) - url_path = f"/api/recipe/{recipe_id}/share/download?t={timestamp}" - - # Store the temp path in a dictionary to serve later - if not hasattr(self, '_shared_recipes'): - self._shared_recipes = {} - - self._shared_recipes[recipe_id] = { - 'path': processed_path, - 'timestamp': timestamp, - 'expires': time.time() + 300 # Expire after 5 minutes - } - - # Clean up old entries - self._cleanup_shared_recipes() - - return web.json_response({ - 'success': True, - 'download_url': url_path, - 'filename': f"recipe_{recipe.get('title', '').replace(' ', '_').lower()}{ext}" - }) - except Exception as e: - logger.error(f"Error sharing recipe: {e}", exc_info=True) - return web.json_response({"error": str(e)}, status=500) - - async def download_shared_recipe(self, request: web.Request) -> web.Response: - """Serve a processed recipe image for download""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - recipe_id = request.match_info['recipe_id'] - - # Check if we have this shared recipe - if not hasattr(self, '_shared_recipes') or recipe_id not in self._shared_recipes: - return web.json_response({"error": "Shared recipe not found or expired"}, status=404) - - shared_info = self._shared_recipes[recipe_id] - file_path = shared_info['path'] - - if not os.path.exists(file_path): - return web.json_response({"error": "Shared recipe file not found"}, status=404) - - # Get recipe to determine filename - cache = await self.recipe_scanner.get_cached_data() - recipe = next((r for r in cache.raw_data if str(r.get('id', '')) == recipe_id), None) - - # Set filename for download - filename = f"recipe_{recipe.get('title', '').replace(' ', '_').lower() if recipe else recipe_id}" - ext = os.path.splitext(file_path)[1] - download_filename = f"{filename}{ext}" - - # Serve the file - return web.FileResponse( - file_path, - headers={ - 'Content-Disposition': f'attachment; filename="{download_filename}"' - } - ) - except Exception as e: - logger.error(f"Error downloading shared recipe: {e}", exc_info=True) - return web.json_response({"error": str(e)}, status=500) - - def _cleanup_shared_recipes(self): - """Clean up expired shared recipes""" - if not hasattr(self, '_shared_recipes'): - return - - current_time = time.time() - expired_ids = [rid for rid, info in self._shared_recipes.items() - if current_time > info.get('expires', 0)] - - for rid in expired_ids: - try: - # Delete the temporary file - file_path = self._shared_recipes[rid]['path'] - if os.path.exists(file_path): - os.unlink(file_path) - - # Remove from dictionary - del self._shared_recipes[rid] - except Exception as e: - logger.error(f"Error cleaning up shared recipe {rid}: {e}") - - async def save_recipe_from_widget(self, request: web.Request) -> web.Response: - """Save a recipe from the LoRAs widget""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - # Get metadata using the metadata collector instead of workflow parsing - raw_metadata = get_metadata() - metadata_dict = MetadataProcessor.to_dict(raw_metadata) - - # Check if we have valid metadata - if not metadata_dict: - return web.json_response({"error": "No generation metadata found"}, status=400) - - # Get the most recent image from metadata registry instead of temp directory - if not standalone_mode: - metadata_registry = MetadataRegistry() - latest_image = metadata_registry.get_first_decoded_image() - else: - latest_image = None - - if latest_image is None: - return web.json_response({"error": "No recent images found to use for recipe. Try generating an image first."}, status=400) - - # Convert the image data to bytes - handle tuple and tensor cases - logger.debug(f"Image type: {type(latest_image)}") - - try: - # Handle the tuple case first - if isinstance(latest_image, tuple): - # Extract the tensor from the tuple - if len(latest_image) > 0: - tensor_image = latest_image[0] - else: - return web.json_response({"error": "Empty image tuple received"}, status=400) - else: - tensor_image = latest_image - - # Get the shape info for debugging - if hasattr(tensor_image, 'shape'): - shape_info = tensor_image.shape - logger.debug(f"Tensor shape: {shape_info}, dtype: {tensor_image.dtype}") - - import torch - - # Convert tensor to numpy array - if isinstance(tensor_image, torch.Tensor): - image_np = tensor_image.cpu().numpy() - else: - image_np = np.array(tensor_image) - - # Handle different tensor shapes - # Case: (1, 1, H, W, 3) or (1, H, W, 3) - batch or multi-batch - if len(image_np.shape) > 3: - # Remove batch dimensions until we get to (H, W, 3) - while len(image_np.shape) > 3: - image_np = image_np[0] - - # If values are in [0, 1] range, convert to [0, 255] - if image_np.dtype == np.float32 or image_np.dtype == np.float64: - if image_np.max() <= 1.0: - image_np = (image_np * 255).astype(np.uint8) - - # Ensure image is in the right format (HWC with RGB channels) - if len(image_np.shape) == 3 and image_np.shape[2] == 3: - pil_image = Image.fromarray(image_np) - img_byte_arr = io.BytesIO() - pil_image.save(img_byte_arr, format='PNG') - image = img_byte_arr.getvalue() - else: - return web.json_response({"error": f"Cannot handle this data shape: {image_np.shape}, {image_np.dtype}"}, status=400) - except Exception as e: - logger.error(f"Error processing image data: {str(e)}", exc_info=True) - return web.json_response({"error": f"Error processing image: {str(e)}"}, status=400) - - # Get the lora stack from the metadata - lora_stack = metadata_dict.get("loras", "") - - # Parse the lora stack format: " ..." - import re - lora_matches = re.findall(r']+)>', lora_stack) - - # Check if any loras were found - if not lora_matches: - return web.json_response({"error": "No LoRAs found in the generation metadata"}, status=400) - - # Generate recipe name from the first 3 loras (or less if fewer are available) - loras_for_name = lora_matches[:3] # Take at most 3 loras for the name - - recipe_name_parts = [] - for lora_name, lora_strength in loras_for_name: - # Get the basename without path or extension - basename = os.path.basename(lora_name) - basename = os.path.splitext(basename)[0] - recipe_name_parts.append(f"{basename}:{lora_strength}") - - recipe_name = " ".join(recipe_name_parts) - - # Create recipes directory if it doesn't exist - recipes_dir = self.recipe_scanner.recipes_dir - os.makedirs(recipes_dir, exist_ok=True) - - # Generate UUID for the recipe - import uuid - recipe_id = str(uuid.uuid4()) - - # Optimize the image (resize and convert to WebP) - optimized_image, extension = ExifUtils.optimize_image( - image_data=image, - target_width=CARD_PREVIEW_WIDTH, - format='webp', - quality=85, - preserve_metadata=True - ) - - # Save the optimized image - image_filename = f"{recipe_id}{extension}" - image_path = os.path.join(recipes_dir, image_filename) - with open(image_path, 'wb') as f: - f.write(optimized_image) - - # Format loras data from the lora stack - loras_data = [] - - for lora_name, lora_strength in lora_matches: - try: - # Get lora info from scanner - lora_info = await self.recipe_scanner._lora_scanner.get_model_info_by_name(lora_name) - - # Create lora entry - lora_entry = { - "file_name": lora_name, - "hash": lora_info.get("sha256", "").lower() if lora_info else "", - "strength": float(lora_strength), - "modelVersionId": lora_info.get("civitai", {}).get("id", 0) if lora_info else 0, - "modelName": lora_info.get("civitai", {}).get("model", {}).get("name", "") if lora_info else lora_name, - "modelVersionName": lora_info.get("civitai", {}).get("name", "") if lora_info else "", - "isDeleted": False - } - loras_data.append(lora_entry) - except Exception as e: - logger.warning(f"Error processing LoRA {lora_name}: {e}") - - # Get base model from lora scanner for the available loras - base_model_counts = {} - for lora in loras_data: - lora_info = await self.recipe_scanner._lora_scanner.get_model_info_by_name(lora.get("file_name", "")) - if lora_info and "base_model" in lora_info: - base_model = lora_info["base_model"] - base_model_counts[base_model] = base_model_counts.get(base_model, 0) + 1 - - # Get most common base model - most_common_base_model = "" - if base_model_counts: - most_common_base_model = max(base_model_counts.items(), key=lambda x: x[1])[0] - - # Create the recipe data structure - recipe_data = { - "id": recipe_id, - "file_path": image_path, - "title": recipe_name, # Use generated recipe name - "modified": time.time(), - "created_date": time.time(), - "base_model": most_common_base_model, - "loras": loras_data, - "checkpoint": metadata_dict.get("checkpoint", ""), - "gen_params": {key: value for key, value in metadata_dict.items() - if key not in ['checkpoint', 'loras']}, - "loras_stack": lora_stack # Include the original lora stack string - } - - # Save the recipe JSON - json_filename = f"{recipe_id}.recipe.json" - json_path = os.path.join(recipes_dir, json_filename) - with open(json_path, 'w', encoding='utf-8') as f: - json.dump(recipe_data, f, indent=4, ensure_ascii=False) - - # Add recipe metadata to the image - ExifUtils.append_recipe_metadata(image_path, recipe_data) - - # Update cache - if self.recipe_scanner._cache is not None: - # Add the recipe to the raw data if the cache exists - self.recipe_scanner._cache.raw_data.append(recipe_data) - # Schedule a background task to resort the cache - asyncio.create_task(self.recipe_scanner._cache.resort()) - logger.info(f"Added recipe {recipe_id} to cache") - - return web.json_response({ - 'success': True, - 'recipe_id': recipe_id, - 'image_path': image_path, - 'json_path': json_path, - 'recipe_name': recipe_name # Include the generated recipe name in the response - }) - - except Exception as e: - logger.error(f"Error saving recipe from widget: {e}", exc_info=True) - return web.json_response({"error": str(e)}, status=500) - - async def get_recipe_syntax(self, request: web.Request) -> web.Response: - """Generate recipe syntax for LoRAs in the recipe, looking up proper file names using hash_index""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - recipe_id = request.match_info['recipe_id'] - - # Get all recipes from cache - cache = await self.recipe_scanner.get_cached_data() - - # Find the specific recipe - recipe = next((r for r in cache.raw_data if str(r.get('id', '')) == recipe_id), None) - - if not recipe: - return web.json_response({"error": "Recipe not found"}, status=404) - - # Get the loras from the recipe - loras = recipe.get('loras', []) - - if not loras: - return web.json_response({"error": "No LoRAs found in this recipe"}, status=400) - - # Generate recipe syntax for all LoRAs that: - # 1. Are in the library (not deleted) OR - # 2. Are deleted but not marked for exclusion - lora_syntax_parts = [] - - # Access the hash_index from lora_scanner - hash_index = self.recipe_scanner._lora_scanner._hash_index - - for lora in loras: - # Skip loras that are deleted AND marked for exclusion - if lora.get("isDeleted", False): - continue - - if not self.recipe_scanner._lora_scanner.has_hash(lora.get("hash", "")): - continue - - # Get the strength - strength = lora.get("strength", 1.0) - - # Try to find the actual file name for this lora - file_name = None - hash_value = lora.get("hash", "").lower() - - if hash_value and hasattr(hash_index, "_hash_to_path"): - # Look up the file path from the hash - file_path = hash_index._hash_to_path.get(hash_value) - - if file_path: - # Extract the file name without extension from the path - file_name = os.path.splitext(os.path.basename(file_path))[0] - - # If hash lookup failed, fall back to modelVersionId lookup - if not file_name and lora.get("modelVersionId"): - # Search for files with matching modelVersionId - all_loras = await self.recipe_scanner._lora_scanner.get_cached_data() - for cached_lora in all_loras.raw_data: - if not cached_lora.get("civitai"): - continue - if cached_lora.get("civitai", {}).get("id") == lora.get("modelVersionId"): - file_name = os.path.splitext(os.path.basename(cached_lora["path"]))[0] - break - - # If all lookups failed, use the file_name from the recipe - if not file_name: - file_name = lora.get("file_name", "unknown-lora") - - # Add to syntax parts - lora_syntax_parts.append(f"") - - # Join the LoRA syntax parts - lora_syntax = " ".join(lora_syntax_parts) - - return web.json_response({ - 'success': True, - 'syntax': lora_syntax - }) - except Exception as e: - logger.error(f"Error generating recipe syntax: {e}", exc_info=True) - return web.json_response({"error": str(e)}, status=500) - - async def update_recipe(self, request: web.Request) -> web.Response: - """Update recipe metadata (name and tags)""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - recipe_id = request.match_info['recipe_id'] - data = await request.json() - - # Validate required fields - if 'title' not in data and 'tags' not in data and 'source_path' not in data and 'preview_nsfw_level' not in data: - return web.json_response({ - "error": "At least one field to update must be provided (title or tags or source_path or preview_nsfw_level)" - }, status=400) - - # Use the recipe scanner's update method - success = await self.recipe_scanner.update_recipe_metadata(recipe_id, data) - - if not success: - return web.json_response({"error": "Recipe not found or update failed"}, status=404) - - return web.json_response({ - "success": True, - "recipe_id": recipe_id, - "updates": data - }) - except Exception as e: - logger.error(f"Error updating recipe: {e}", exc_info=True) - return web.json_response({"error": str(e)}, status=500) - - async def reconnect_lora(self, request: web.Request) -> web.Response: - """Reconnect a deleted LoRA in a recipe to a local LoRA file""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - # Parse request data - data = await request.json() - - # Validate required fields - required_fields = ['recipe_id', 'lora_index', 'target_name'] - for field in required_fields: - if field not in data: - return web.json_response({ - "error": f"Missing required field: {field}" - }, status=400) - - recipe_id = data['recipe_id'] - lora_index = int(data['lora_index']) - target_name = data['target_name'] - - # Get recipe scanner - scanner = self.recipe_scanner - lora_scanner = scanner._lora_scanner - - # Check if recipe exists - recipe_path = os.path.join(scanner.recipes_dir, f"{recipe_id}.recipe.json") - if not os.path.exists(recipe_path): - return web.json_response({"error": "Recipe not found"}, status=404) - - # Find target LoRA by name - target_lora = await lora_scanner.get_model_info_by_name(target_name) - if not target_lora: - return web.json_response({"error": f"Local LoRA not found with name: {target_name}"}, status=404) - - # Load recipe data - with open(recipe_path, 'r', encoding='utf-8') as f: - recipe_data = json.load(f) - - lora = recipe_data.get("loras", [])[lora_index] if lora_index < len(recipe_data.get('loras', [])) else None - - if lora is None: - return web.json_response({"error": "LoRA index out of range in recipe"}, status=404) - - # Update LoRA data - lora['isDeleted'] = False - lora['exclude'] = False - lora['file_name'] = target_name - - # Update with information from the target LoRA - if 'sha256' in target_lora: - lora['hash'] = target_lora['sha256'].lower() - if target_lora.get("civitai"): - lora['modelName'] = target_lora['civitai']['model']['name'] - lora['modelVersionName'] = target_lora['civitai']['name'] - lora['modelVersionId'] = target_lora['civitai']['id'] - - updated_lora = dict(lora) # Make a copy for response - - # Recalculate recipe fingerprint after updating LoRA - from ..utils.utils import calculate_recipe_fingerprint - recipe_data['fingerprint'] = calculate_recipe_fingerprint(recipe_data.get('loras', [])) - - # Save updated recipe - with open(recipe_path, 'w', encoding='utf-8') as f: - json.dump(recipe_data, f, indent=4, ensure_ascii=False) - - updated_lora['inLibrary'] = True - updated_lora['preview_url'] = config.get_preview_static_url(target_lora['preview_url']) - updated_lora['localPath'] = target_lora['file_path'] - - # Update in cache if it exists - if scanner._cache is not None: - for cache_item in scanner._cache.raw_data: - if cache_item.get('id') == recipe_id: - # Replace loras array with updated version - cache_item['loras'] = recipe_data['loras'] - # Update fingerprint in cache - cache_item['fingerprint'] = recipe_data['fingerprint'] - - # Resort the cache - asyncio.create_task(scanner._cache.resort()) - break - - # Update EXIF metadata if image exists - image_path = recipe_data.get('file_path') - if image_path and os.path.exists(image_path): - from ..utils.exif_utils import ExifUtils - ExifUtils.append_recipe_metadata(image_path, recipe_data) - - # Find other recipes with the same fingerprint - matching_recipes = [] - if 'fingerprint' in recipe_data: - matching_recipes = await scanner.find_recipes_by_fingerprint(recipe_data['fingerprint']) - # Remove current recipe from matches - if recipe_id in matching_recipes: - matching_recipes.remove(recipe_id) - - return web.json_response({ - "success": True, - "recipe_id": recipe_id, - "updated_lora": updated_lora, - "matching_recipes": matching_recipes - }) - - except Exception as e: - logger.error(f"Error reconnecting LoRA: {e}", exc_info=True) - return web.json_response({"error": str(e)}, status=500) - - async def get_recipes_for_lora(self, request: web.Request) -> web.Response: - """Get recipes that use a specific Lora""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - lora_hash = request.query.get('hash') - - # Hash is required - if not lora_hash: - return web.json_response({'success': False, 'error': 'Lora hash is required'}, status=400) - - # Log the search parameters - logger.debug(f"Getting recipes for Lora by hash: {lora_hash}") - - # Get all recipes from cache - cache = await self.recipe_scanner.get_cached_data() - - # Filter recipes that use this Lora by hash - matching_recipes = [] - for recipe in cache.raw_data: - # Check if any of the recipe's loras match this hash - loras = recipe.get('loras', []) - for lora in loras: - if lora.get('hash', '').lower() == lora_hash.lower(): - matching_recipes.append(recipe) - break # No need to check other loras in this recipe - - # Process the recipes similar to get_paginated_data to ensure all needed data is available - for recipe in matching_recipes: - # Add inLibrary information for each lora - if 'loras' in recipe: - for lora in recipe['loras']: - if 'hash' in lora and lora['hash']: - lora['inLibrary'] = self.recipe_scanner._lora_scanner.has_hash(lora['hash'].lower()) - lora['preview_url'] = self.recipe_scanner._lora_scanner.get_preview_url_by_hash(lora['hash'].lower()) - lora['localPath'] = self.recipe_scanner._lora_scanner.get_path_by_hash(lora['hash'].lower()) - - # Ensure file_url is set (needed by frontend) - if 'file_path' in recipe: - recipe['file_url'] = self._format_recipe_file_url(recipe['file_path']) - else: - recipe['file_url'] = '/loras_static/images/no-preview.png' - - return web.json_response({'success': True, 'recipes': matching_recipes}) - except Exception as e: - logger.error(f"Error getting recipes for Lora: {str(e)}") - return web.json_response({'success': False, 'error': str(e)}, status=500) - - async def scan_recipes(self, request: web.Request) -> web.Response: - """API endpoint for scanning and rebuilding the recipe cache""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - # Force refresh the recipe cache - logger.info("Manually triggering recipe cache rebuild") - await self.recipe_scanner.get_cached_data(force_refresh=True) - - return web.json_response({ - 'success': True, - 'message': 'Recipe cache refreshed successfully' - }) - except Exception as e: - logger.error(f"Error refreshing recipe cache: {e}", exc_info=True) - return web.json_response({ - 'success': False, - 'error': str(e) - }, status=500) - - async def find_duplicates(self, request: web.Request) -> web.Response: - """Find all duplicate recipes based on fingerprints""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - # Get all duplicate recipes - duplicate_groups = await self.recipe_scanner.find_all_duplicate_recipes() - - # Create response data with additional recipe information - response_data = [] - - for fingerprint, recipe_ids in duplicate_groups.items(): - # Skip groups with only one recipe (not duplicates) - if len(recipe_ids) <= 1: - continue - - # Get recipe details for each recipe in the group - recipes = [] - for recipe_id in recipe_ids: - recipe = await self.recipe_scanner.get_recipe_by_id(recipe_id) - if recipe: - # Add only needed fields to keep response size manageable - recipes.append({ - 'id': recipe.get('id'), - 'title': recipe.get('title'), - 'file_url': recipe.get('file_url') or self._format_recipe_file_url(recipe.get('file_path', '')), - 'modified': recipe.get('modified'), - 'created_date': recipe.get('created_date'), - 'lora_count': len(recipe.get('loras', [])), - }) - - # Only include groups with at least 2 valid recipes - if len(recipes) >= 2: - # Sort recipes by modified date (newest first) - recipes.sort(key=lambda x: x.get('modified', 0), reverse=True) - - response_data.append({ - 'fingerprint': fingerprint, - 'count': len(recipes), - 'recipes': recipes - }) - - # Sort groups by count (highest first) - response_data.sort(key=lambda x: x['count'], reverse=True) - - return web.json_response({ - 'success': True, - 'duplicate_groups': response_data - }) - - except Exception as e: - logger.error(f"Error finding duplicate recipes: {e}", exc_info=True) - return web.json_response({ - 'success': False, - 'error': str(e) - }, status=500) - - async def bulk_delete(self, request: web.Request) -> web.Response: - """Delete multiple recipes by ID""" - try: - # Ensure services are initialized - await self.ensure_dependencies_ready() - - # Parse request data - data = await request.json() - recipe_ids = data.get('recipe_ids', []) - - if not recipe_ids: - return web.json_response({ - 'success': False, - 'error': 'No recipe IDs provided' - }, status=400) - - # Get recipes directory - recipes_dir = self.recipe_scanner.recipes_dir - if not recipes_dir or not os.path.exists(recipes_dir): - return web.json_response({ - 'success': False, - 'error': 'Recipes directory not found' - }, status=404) - - # Track deleted and failed recipes - deleted_recipes = [] - failed_recipes = [] - - # Process each recipe ID - for recipe_id in recipe_ids: - # Find recipe JSON file - recipe_json_path = os.path.join(recipes_dir, f"{recipe_id}.recipe.json") - - if not os.path.exists(recipe_json_path): - failed_recipes.append({ - 'id': recipe_id, - 'reason': 'Recipe not found' - }) - continue - - try: - # Load recipe data to get image path - with open(recipe_json_path, 'r', encoding='utf-8') as f: - recipe_data = json.load(f) - - # Get image path - image_path = recipe_data.get('file_path') - - # Delete recipe JSON file - os.remove(recipe_json_path) - - # Delete recipe image if it exists - if image_path and os.path.exists(image_path): - os.remove(image_path) - - deleted_recipes.append(recipe_id) - - except Exception as e: - failed_recipes.append({ - 'id': recipe_id, - 'reason': str(e) - }) - - # Update cache if any recipes were deleted - if deleted_recipes and self.recipe_scanner._cache is not None: - # Remove deleted recipes from raw_data - self.recipe_scanner._cache.raw_data = [ - r for r in self.recipe_scanner._cache.raw_data - if r.get('id') not in deleted_recipes - ] - # Resort the cache - asyncio.create_task(self.recipe_scanner._cache.resort()) - logger.info(f"Removed {len(deleted_recipes)} recipes from cache") - - return web.json_response({ - 'success': True, - 'deleted': deleted_recipes, - 'failed': failed_recipes, - 'total_deleted': len(deleted_recipes), - 'total_failed': len(failed_recipes) - }) - - except Exception as e: - logger.error(f"Error performing bulk delete: {e}", exc_info=True) - return web.json_response({ - 'success': False, - 'error': str(e) - }, status=500) - - # Legacy method aliases retained for compatibility with existing imports. - handle_recipes_page = render_page - get_recipes = list_recipes - get_recipe_detail = get_recipe - analyze_recipe_image = analyze_uploaded_image diff --git a/tests/routes/test_recipe_route_scaffolding.py b/tests/routes/test_recipe_route_scaffolding.py index 1f0e723d..59765d36 100644 --- a/tests/routes/test_recipe_route_scaffolding.py +++ b/tests/routes/test_recipe_route_scaffolding.py @@ -128,31 +128,38 @@ def test_register_startup_hooks_appends_once(): assert len(startup_bound_to_routes) == 2 -def test_to_route_mapping_uses_handler_owner(monkeypatch: pytest.MonkeyPatch): - class DummyOwner: - async def render_page(self, request): - return web.Response(text="ok") +def test_to_route_mapping_uses_handler_set(): + class DummyHandlerSet: + def __init__(self): + self.calls = 0 - async def list_recipes(self, request): # pragma: no cover - invoked via mapping - return web.json_response({}) + def to_route_mapping(self): + self.calls += 1 + + async def render_page(request): # pragma: no cover - simple coroutine + return web.Response(text="ok") + + return {"render_page": render_page} class DummyRoutes(base_routes_module.BaseRecipeRoutes): - def get_handler_owner(self): # noqa: D401 - simple override for test - return DummyOwner() + def __init__(self): + super().__init__() + self.created = 0 - monkeypatch.setattr( - base_routes_module.BaseRecipeRoutes, - "_HANDLER_NAMES", - ("render_page", "list_recipes"), - ) + def _create_handler_set(self): # noqa: D401 - simple override for test + self.created += 1 + return DummyHandlerSet() routes = DummyRoutes() mapping = routes.to_route_mapping() - assert set(mapping.keys()) == {"render_page", "list_recipes"} + assert set(mapping.keys()) == {"render_page"} assert asyncio.iscoroutinefunction(mapping["render_page"]) # Cached mapping reused on subsequent calls assert routes.to_route_mapping() is mapping + # Handler set cached for get_handler_owner callers + assert isinstance(routes.get_handler_owner(), DummyHandlerSet) + assert routes.created == 1 def test_recipe_route_registrar_binds_every_route():