feat(recipe): add Create As Recipe from example images with import dedup check (#945)

This commit is contained in:
Will Miao
2026-06-03 17:50:58 +08:00
parent 98e1d168b0
commit 151a467598
18 changed files with 497 additions and 6 deletions

View File

@@ -16,7 +16,7 @@ from aiohttp import web
from ...config import config
from ...services.server_i18n import server_i18n as default_server_i18n
from ...services.settings_manager import SettingsManager
from ...services.settings_manager import SettingsManager, get_settings_manager
from ...services.recipes import (
RecipeAnalysisService,
RecipeDownloadError,
@@ -26,7 +26,12 @@ from ...services.recipes import (
RecipeValidationError,
)
from ...services.metadata_service import get_default_metadata_provider
from ...utils.civitai_utils import extract_civitai_image_id, rewrite_preview_url
from ...utils.civitai_utils import (
build_civitai_image_page_url,
extract_civitai_image_id,
extract_civitai_image_id_from_cdn_url,
rewrite_preview_url,
)
from ...utils.exif_utils import ExifUtils
from ...recipes.merger import GenParamsMerger
from ...recipes.enrichment import RecipeEnricher
@@ -96,6 +101,7 @@ class RecipeHandlerSet:
"browse_directory": self.batch_import.browse_directory,
"check_image_exists": self.management.check_image_exists,
"import_from_url": self.management.import_from_url,
"create_from_example": self.management.create_from_example,
}
@@ -1668,6 +1674,246 @@ class RecipeManagementHandler:
)
return web.json_response(result.payload, status=result.status)
async def create_from_example(self, request: web.Request) -> web.Response:
"""Create a recipe from a model's example image using cached metadata.
Uses the image's meta data (already cached in .metadata.json from the
CivitAI model-versions API) to create a recipe without additional
CivitAI API calls.
If the image metadata doesn't contain any resources of the parent
model's type (LoRA-type or Checkpoint), the parent model is
auto-populated as a fallback.
Request body:
image_data (dict): The full image object from model-versions API
(includes meta, additionalResources, url, etc.)
model_hash (str): SHA256 hash of the parent model
model_name (str): Filename of the parent model
model_type (str): Page type (``"loras"``, ``"checkpoints"``, etc.)
local_image_path (str, optional): Local filesystem path to read
the image bytes for the recipe preview
"""
try:
await self._ensure_dependencies_ready()
recipe_scanner = self._recipe_scanner_getter()
if recipe_scanner is None:
raise RuntimeError("Recipe scanner unavailable")
data = await request.json()
image_data = data.get("image_data")
model_hash = data.get("model_hash")
model_name = data.get("model_name")
model_type = data.get("model_type", "")
if not image_data or not model_hash or not model_name:
raise RecipeValidationError(
"Missing required fields: image_data, model_hash, model_name"
)
# Merge nested meta into top level so the parser finds everything.
# CivitaiApiMetadataParser expects prompt, seed, resources, etc.
# at the top level or wrapped under a "meta" key.
inner_meta = image_data.get("meta") or {}
parsed_input = {**image_data, **inner_meta}
parsed_input.pop("meta", None)
parser = self._analysis_service._recipe_parser_factory.create_parser(
parsed_input
)
if not parser:
raise RecipeValidationError("Unable to parse image metadata")
parsed = await parser.parse_metadata(
parsed_input, recipe_scanner=recipe_scanner
)
loras = list(parsed.get("loras") or [])
checkpoint = parsed.get("model")
is_lora_type = model_type.startswith("lora")
is_ckpt_type = model_type.startswith("checkpoint")
# Look up parent model's cached CivitAI metadata (version ID,
# version name, model ID) from the scanner cache. Used to fix
# isDeleted entries and enrich auto-populated ones.
parent_civitai_id: int | None = None
parent_model_id: int | None = None
parent_version_name: str | None = None
parent_model_name: str | None = None
lora_scanner = getattr(recipe_scanner, "_lora_scanner", None)
if lora_scanner and model_hash:
try:
parent_cache = await lora_scanner.get_cached_data()
for item in getattr(parent_cache, "raw_data", []):
if item.get("sha256", "").lower() == model_hash.lower():
civ = item.get("civitai") or {}
if isinstance(civ, dict):
parent_civitai_id = civ.get("id")
parent_model_id = civ.get("modelId")
parent_version_name = civ.get("name")
# model_name is a flat SQLite column holding the
# CivitAI model display name (not nested under
# civitai.model which only stores type).
parent_model_name = item.get("model_name")
break
else:
pass
except Exception:
pass
# Reconcile isDeleted entries against the parent model.
# When the CivitAI hash lookup fails (known issue — hashes not
# yet computed), the parser marks the entry isDeleted even though
# the model exists locally.
if is_lora_type:
for lora in loras:
if lora.get("isDeleted") and lora.get("file_name") == model_name:
lora["isDeleted"] = False
lora["existsLocally"] = True
lora["hash"] = model_hash
if parent_civitai_id is not None:
lora["id"] = parent_civitai_id
if parent_model_id is not None:
lora["modelId"] = parent_model_id
if parent_version_name is not None:
lora["version"] = parent_version_name
if parent_model_name is not None:
lora["name"] = parent_model_name
elif is_ckpt_type and checkpoint and checkpoint.get("isDeleted"):
if checkpoint.get("file_name") == model_name:
checkpoint["isDeleted"] = False
checkpoint["existsLocally"] = True
checkpoint["hash"] = model_hash
if parent_civitai_id is not None:
checkpoint["id"] = parent_civitai_id
if parent_model_id is not None:
checkpoint["modelId"] = parent_model_id
if parent_version_name is not None:
checkpoint["version"] = parent_version_name
# Auto-populate parent model only when the image metadata didn't
# contain any resources of that type.
if is_lora_type and not loras:
lora_entry = {
"name": model_name,
"type": "lora",
"weight": 1.0,
"hash": model_hash,
"existsLocally": True,
"localPath": None,
"file_name": model_name,
"thumbnailUrl": "/loras_static/images/no-preview.png",
"baseModel": parsed.get("base_model", ""),
"size": 0,
"downloadUrl": "",
"isDeleted": False,
}
if parent_civitai_id is not None:
lora_entry["id"] = parent_civitai_id
if parent_model_id is not None:
lora_entry["modelId"] = parent_model_id
if parent_version_name is not None:
lora_entry["version"] = parent_version_name
if parent_model_name is not None:
lora_entry["name"] = parent_model_name
loras.insert(0, lora_entry)
elif is_ckpt_type and not checkpoint:
checkpoint = {
"name": model_name,
"type": "checkpoint",
"hash": model_hash,
"file_name": model_name,
"existsLocally": True,
"baseModel": parsed.get("base_model", ""),
"isDeleted": False,
}
if parent_civitai_id is not None:
checkpoint["id"] = parent_civitai_id
if parent_model_id is not None:
checkpoint["modelId"] = parent_model_id
if parent_version_name is not None:
checkpoint["version"] = parent_version_name
if parent_model_name is not None:
checkpoint["name"] = parent_model_name
image_url = image_data.get("url") or ""
image_id = extract_civitai_image_id_from_cdn_url(image_url)
settings_mgr = get_settings_manager()
civitai_host = settings_mgr.get("civitai_host") if settings_mgr else None
page_url = build_civitai_image_page_url(image_id, host=civitai_host) or image_url
recipe_metadata: dict[str, Any] = {
"base_model": parsed.get("base_model") or "",
"loras": loras,
"gen_params": parsed.get("gen_params") or {},
"source_path": page_url,
}
nsfw_level = image_data.get("nsfwLevel")
if isinstance(nsfw_level, int):
recipe_metadata["preview_nsfw_level"] = nsfw_level
if checkpoint:
recipe_metadata["checkpoint"] = checkpoint
image_bytes: bytes | None = None
extension: str | None = None
local_image_path = data.get("local_image_path")
if local_image_path and os.path.exists(local_image_path):
with open(local_image_path, "rb") as f:
image_bytes = f.read()
ext = os.path.splitext(local_image_path)[1].lower()
if ext in (".jpg", ".jpeg", ".png", ".webp", ".gif"):
extension = ext
elif image_data.get("url"):
try:
downloader = await self._downloader_factory()
url = image_data["url"]
tmp = tempfile.NamedTemporaryFile(delete=False)
tmp.close()
success, result = await downloader.download_file(
url, tmp.name, use_auth=False
)
if success:
with open(tmp.name, "rb") as f:
image_bytes = f.read()
url_path = url.split("?")[0].split("#")[0]
ext = os.path.splitext(url_path)[1].lower()
if ext:
extension = ext
if os.path.exists(tmp.name):
os.unlink(tmp.name)
except Exception as exc:
self._logger.warning(
"Failed to download image for recipe: %s", exc
)
prompt = (
(parsed.get("gen_params") or {}).get("prompt") or ""
)
if prompt:
name = " ".join(str(prompt).split()[:10])
else:
name = f"Recipe from {model_name}"
save_result = await self._persistence_service.save_recipe(
recipe_scanner=recipe_scanner,
image_bytes=image_bytes,
image_base64=None,
name=name,
tags=[],
metadata=recipe_metadata,
extension=extension,
)
return web.json_response(save_result.payload, status=save_result.status)
except RecipeValidationError as exc:
return web.json_response({"error": str(exc)}, status=400)
except Exception as exc:
self._logger.error(
"Error creating recipe from example: %s", exc, exc_info=True
)
return web.json_response({"error": str(exc)}, status=500)
class RecipeAnalysisHandler:
"""Analyze images to extract recipe metadata."""