diff --git a/py/routes/handlers/recipe_handlers.py b/py/routes/handlers/recipe_handlers.py index 27afcd82..e338515f 100644 --- a/py/routes/handlers/recipe_handlers.py +++ b/py/routes/handlers/recipe_handlers.py @@ -834,6 +834,7 @@ class RecipeManagementHandler: # Extract embedded EXIF metadata (offloaded to thread pool in this call) embedded_gen_params = {} + parsed_embedded = None try: with tempfile.NamedTemporaryFile( suffix=extension, delete=False @@ -867,10 +868,35 @@ class RecipeManagementHandler: "Failed to extract embedded metadata during import: %s", exc ) + # Fallback: if EXIF extraction yielded nothing, parse Civitai API meta directly + # (same approach as analyze_remote_image — downloaded Civitai images often + # have no embedded EXIF but the API meta contains resources/hashes) + if parsed_embedded is None and civitai_meta_raw: + civitai_inner_meta = civitai_meta_raw + if isinstance(civitai_meta_raw, dict) and "meta" in civitai_meta_raw: + civitai_inner_meta = civitai_meta_raw["meta"] + if isinstance(civitai_inner_meta, dict): + parser = self._analysis_service._recipe_parser_factory.create_parser( + civitai_inner_meta + ) + if parser: + parsed_embedded = await parser.parse_metadata( + civitai_inner_meta, recipe_scanner=recipe_scanner + ) + if parsed_embedded and "gen_params" in parsed_embedded: + embedded_gen_params = parsed_embedded["gen_params"] + if embedded_gen_params: metadata["gen_params"] = embedded_gen_params - # Enrich with Civitai API and merge gen_params + if parsed_embedded: + parsed_loras = parsed_embedded.get("loras") + if parsed_loras and not metadata.get("loras"): + metadata["loras"] = parsed_loras + parsed_model = parsed_embedded.get("model") + if parsed_model and not metadata.get("checkpoint"): + metadata["checkpoint"] = parsed_model + civitai_client = self._civitai_client_getter() await RecipeEnricher.enrich_recipe( recipe=metadata, @@ -1192,7 +1218,7 @@ class RecipeManagementHandler: "exclude": False, } - async def _download_remote_media(self, image_url: str) -> tuple[bytes, str, Any]: + async def _download_remote_media(self, image_url: str) -> tuple[bytes, str, Any, Any]: civitai_client = self._civitai_client_getter() downloader = await self._downloader_factory() temp_path = None @@ -1240,11 +1266,18 @@ class RecipeManagementHandler: extension = ".webp" # Default to webp if unknown with open(temp_path, "rb") as file_obj: + model_ver_id = None + if civitai_image_id and image_info: + model_ver_id = image_info.get("modelVersionId") + if not model_ver_id: + ids = image_info.get("modelVersionIds") + if isinstance(ids, list) and ids: + model_ver_id = ids[0] return ( file_obj.read(), extension, image_info.get("meta") if civitai_image_id and image_info else None, - image_info.get("modelVersionId") if civitai_image_id and image_info else None, + model_ver_id, ) except RecipeDownloadError: raise @@ -1399,6 +1432,7 @@ class RecipeManagementHandler: # Extract embedded EXIF metadata embedded_gen_params = {} + parsed_embedded = None try: with tempfile.NamedTemporaryFile( suffix=extension, delete=False @@ -1430,6 +1464,21 @@ class RecipeManagementHandler: "Failed to extract embedded metadata: %s", exc ) + if parsed_embedded is None and civitai_meta_raw: + civitai_inner_meta = civitai_meta_raw + if isinstance(civitai_meta_raw, dict) and "meta" in civitai_meta_raw: + civitai_inner_meta = civitai_meta_raw["meta"] + if isinstance(civitai_inner_meta, dict): + parser = self._analysis_service._recipe_parser_factory.create_parser( + civitai_inner_meta + ) + if parser: + parsed_embedded = await parser.parse_metadata( + civitai_inner_meta, recipe_scanner=recipe_scanner + ) + if parsed_embedded and "gen_params" in parsed_embedded: + embedded_gen_params = parsed_embedded["gen_params"] + metadata: Dict[str, Any] = { "base_model": "", "loras": [], @@ -1437,6 +1486,14 @@ class RecipeManagementHandler: "source_path": image_url, } + if parsed_embedded: + parsed_loras = parsed_embedded.get("loras") + if parsed_loras and not metadata.get("loras"): + metadata["loras"] = parsed_loras + parsed_model = parsed_embedded.get("model") + if parsed_model and not metadata.get("checkpoint"): + metadata["checkpoint"] = parsed_model + civitai_client = self._civitai_client_getter() await RecipeEnricher.enrich_recipe( recipe=metadata, diff --git a/py/services/recipes/analysis_service.py b/py/services/recipes/analysis_service.py index 793fcb2c..04bca09c 100644 --- a/py/services/recipes/analysis_service.py +++ b/py/services/recipes/analysis_service.py @@ -15,6 +15,7 @@ from PIL import Image from ...utils.utils import calculate_recipe_fingerprint from ...utils.civitai_utils import extract_civitai_image_id, rewrite_preview_url +from ...recipes.enrichment import RecipeEnricher from .errors import ( RecipeDownloadError, RecipeNotFoundError, @@ -175,7 +176,7 @@ class RecipeAnalysisService: self._exif_utils.extract_image_metadata, temp_path ) - return await self._parse_metadata( + result = await self._parse_metadata( metadata or {}, recipe_scanner=recipe_scanner, image_path=temp_path, @@ -183,6 +184,37 @@ class RecipeAnalysisService: is_video=is_video, extension=extension, ) + + if civitai_image_id and image_info and not result.payload.get("error"): + mvid = image_info.get("modelVersionId") + if not mvid: + mvids = image_info.get("modelVersionIds") + if isinstance(mvids, list) and mvids: + mvid = mvids[0] + + recipe_for_enrich = { + "gen_params": result.payload.get("gen_params", {}), + "loras": result.payload.get("loras", []), + "base_model": result.payload.get("base_model", "") or "", + "checkpoint": result.payload.get("checkpoint") or result.payload.get("model"), + "source_path": url, + } + + await RecipeEnricher.enrich_recipe( + recipe=recipe_for_enrich, + civitai_client=civitai_client, + request_params=None, + prefetched_civitai_meta_raw=image_info.get("meta"), + prefetched_model_version_id=mvid, + ) + + result.payload["gen_params"] = recipe_for_enrich["gen_params"] + if recipe_for_enrich.get("checkpoint"): + result.payload["checkpoint"] = recipe_for_enrich["checkpoint"] + if recipe_for_enrich.get("base_model"): + result.payload["base_model"] = recipe_for_enrich["base_model"] + + return result finally: if temp_path: self._safe_cleanup(temp_path)