From 30b01b8a92bed28dcedc9b98d288bd95e11fe4b8 Mon Sep 17 00:00:00 2001 From: Will Miao Date: Fri, 15 May 2026 18:29:54 +0800 Subject: [PATCH] fix(recipes): offload EXIF to thread pool, throttle concurrent imports, eliminate duplicate Civitai API call - Wrap ExifUtils.extract_image_metadata() with asyncio.to_thread() in both import handlers and analysis_service to prevent Pillow/piexif from blocking ComfyUI's event loop during batch imports. - Add asyncio.Semaphore(2) to import_remote_recipe and import_from_url endpoints to cap concurrent heavy work and prevent event loop starvation. - Pre-fetch Civitai image_info during download and pass it to the recipe enricher, eliminating a redundant get_image_info() API round-trip. --- py/recipes/enrichment.py | 80 ++--- py/routes/handlers/recipe_handlers.py | 384 ++++++++++++------------ py/services/recipes/analysis_service.py | 9 +- 3 files changed, 252 insertions(+), 221 deletions(-) diff --git a/py/recipes/enrichment.py b/py/recipes/enrichment.py index 2df0c8e4..f640bb32 100644 --- a/py/recipes/enrichment.py +++ b/py/recipes/enrichment.py @@ -16,55 +16,65 @@ class RecipeEnricher: async def enrich_recipe( recipe: Dict[str, Any], civitai_client: Any, - request_params: Optional[Dict[str, Any]] = None + request_params: Optional[Dict[str, Any]] = None, + prefetched_civitai_meta_raw: Optional[Dict[str, Any]] = None, + prefetched_model_version_id: Optional[int] = None, ) -> bool: """ Enrich a recipe dictionary in-place with metadata from Civitai and embedded params. - + Args: recipe: The recipe dictionary to enrich. Must have 'gen_params' initialized. civitai_client: Authenticated Civitai client instance. request_params: (Optional) Parameters from a user request (e.g. import). - + prefetched_civitai_meta_raw: (Optional) Pre-fetched raw meta from Civitai + get_image_info, avoiding a duplicate API call. + prefetched_model_version_id: (Optional) Pre-fetched model version ID. + Returns: bool: True if the recipe was modified, False otherwise. """ updated = False gen_params = recipe.get("gen_params", {}) - - # 1. Fetch Civitai Info if available + + # 1. Obtain Civitai metadata civitai_meta = None - model_version_id = None - + model_version_id = prefetched_model_version_id + source_path = recipe.get("source_path", "") - - # Check if it's a Civitai image URL - image_id = extract_civitai_image_id(str(source_path)) - if image_id: - try: - image_info = await civitai_client.get_image_info( - image_id, source_url=str(source_path) - ) - if image_info: - # Handle nested meta often found in Civitai API responses - raw_meta = image_info.get("meta") - if isinstance(raw_meta, dict): - if "meta" in raw_meta and isinstance(raw_meta["meta"], dict): - civitai_meta = raw_meta["meta"] - else: - civitai_meta = raw_meta - - model_version_id = image_info.get("modelVersionId") - - # If not at top level, check resources in meta - if not model_version_id and civitai_meta: - resources = civitai_meta.get("civitaiResources", []) - for res in resources: - if res.get("type") == "checkpoint": - model_version_id = res.get("modelVersionId") - break - except Exception as e: - logger.warning(f"Failed to fetch Civitai image info: {e}") + + if prefetched_civitai_meta_raw is not None: + raw_meta = prefetched_civitai_meta_raw + if isinstance(raw_meta, dict): + if "meta" in raw_meta and isinstance(raw_meta["meta"], dict): + civitai_meta = raw_meta["meta"] + else: + civitai_meta = raw_meta + else: + image_id = extract_civitai_image_id(str(source_path)) + if image_id: + try: + image_info = await civitai_client.get_image_info( + image_id, source_url=str(source_path) + ) + if image_info: + raw_meta = image_info.get("meta") + if isinstance(raw_meta, dict): + if "meta" in raw_meta and isinstance(raw_meta["meta"], dict): + civitai_meta = raw_meta["meta"] + else: + civitai_meta = raw_meta + + model_version_id = image_info.get("modelVersionId") + except Exception as e: + logger.warning(f"Failed to fetch Civitai image info: {e}") + + if not model_version_id and civitai_meta: + resources = civitai_meta.get("civitaiResources", []) + for res in resources: + if res.get("type") == "checkpoint": + model_version_id = res.get("modelVersionId") + break # 2. Merge Parameters # Priority: request_params > civitai_meta > embedded (existing gen_params) diff --git a/py/routes/handlers/recipe_handlers.py b/py/routes/handlers/recipe_handlers.py index 2a7b778b..27afcd82 100644 --- a/py/routes/handlers/recipe_handlers.py +++ b/py/routes/handlers/recipe_handlers.py @@ -609,6 +609,7 @@ class RecipeManagementHandler: self._downloader_factory = downloader_factory self._civitai_client_getter = civitai_client_getter self._ws_manager = ws_manager + self._import_semaphore = asyncio.Semaphore(2) async def save_recipe(self, request: web.Request) -> web.Response: try: @@ -769,114 +770,18 @@ class RecipeManagementHandler: sorted(checkpoint_entry.keys()) if isinstance(checkpoint_entry, dict) else [], ) - # 2. Initial Metadata Construction - metadata: Dict[str, Any] = { - "base_model": params.get("base_model", "") or "", - "loras": lora_entries, - "gen_params": gen_params_request or {}, - "source_path": params.get("source_path") or image_url, - } - - # Checkpoint handling - if checkpoint_entry: - metadata["checkpoint"] = checkpoint_entry - # Ensure checkpoint is also in gen_params for consistency if needed by enricher? - # Actually enricher looks at metadata['checkpoint'], so this is fine. - - # Try to resolve base model from checkpoint if not explicitly provided - if not metadata["base_model"]: - base_model_from_metadata = ( - await self._resolve_base_model_from_checkpoint(checkpoint_entry) - ) - if base_model_from_metadata: - metadata["base_model"] = base_model_from_metadata - - tags = self._parse_tags(params.get("tags")) - - # 3. Download Image - ( - image_bytes, - extension, - civitai_meta_from_download, - ) = await self._download_remote_media(image_url) - - # 4. Extract Embedded Metadata - # Note: We still extract this here because Enricher currently expects 'gen_params' to already be populated - # with embedded data if we want it to merge it. - # However, logic in Enricher merges: request > civitai > embedded. - # So we should gather embedded params and put them into the recipe's gen_params (as initial state) - # OR pass them to enricher to handle? - # The interface of Enricher.enrich_recipe takes `recipe` (with gen_params) and `request_params`. - # So let's extract embedded and put it into recipe['gen_params'] but careful not to overwrite request params. - # Actually, `GenParamsMerger` which `Enricher` uses handles 3 layers. - # But `Enricher` interface is: recipe['gen_params'] (as embedded) + request_params + civitai (fetched internally). - # Wait, `Enricher` fetches Civitai info internally based on URL. - # `civitai_meta_from_download` is returned by `_download_remote_media` which might be useful if URL didn't have ID. - - # Let's extract embedded metadata first - embedded_gen_params = {} - try: - with tempfile.NamedTemporaryFile( - suffix=extension, delete=False - ) as temp_img: - temp_img.write(image_bytes) - temp_img_path = temp_img.name - - try: - raw_embedded = ExifUtils.extract_image_metadata(temp_img_path) - if raw_embedded: - parser = ( - self._analysis_service._recipe_parser_factory.create_parser( - raw_embedded - ) - ) - if parser: - parsed_embedded = await parser.parse_metadata( - raw_embedded, recipe_scanner=recipe_scanner - ) - if parsed_embedded and "gen_params" in parsed_embedded: - embedded_gen_params = parsed_embedded["gen_params"] - else: - embedded_gen_params = {"raw_metadata": raw_embedded} - finally: - if os.path.exists(temp_img_path): - os.unlink(temp_img_path) - except Exception as exc: - self._logger.warning( - "Failed to extract embedded metadata during import: %s", exc + # Throttle concurrent imports to avoid starving ComfyUI's event loop + async with self._import_semaphore: + return await self._do_import_remote_recipe( + image_url=image_url, + name=name, + lora_entries=lora_entries, + checkpoint_entry=checkpoint_entry, + gen_params_request=gen_params_request, + tags=self._parse_tags(params.get("tags")), + base_model=params.get("base_model", "") or "", + source_path=params.get("source_path") or image_url, ) - - # Pre-populate gen_params with embedded data so Enricher treats it as the "base" layer - if embedded_gen_params: - # Merge embedded into existing gen_params (which currently only has request params if any) - # But wait, we want request params to override everything. - # So we should set recipe['gen_params'] = embedded, and pass request params to enricher. - metadata["gen_params"] = embedded_gen_params - - # 5. Enrich with unified logic - # This will fetch Civitai info (if URL matches) and merge: request > civitai > embedded - civitai_client = self._civitai_client_getter() - await RecipeEnricher.enrich_recipe( - recipe=metadata, - civitai_client=civitai_client, - request_params=gen_params_request, # Pass explicit request params here to override - ) - - # If we got civitai_meta from download but Enricher didn't fetch it (e.g. not a civitai URL or failed), - # we might want to manually merge it? - # But usually `import_remote_recipe` is used with Civitai URLs. - # For now, relying on Enricher's internal fetch is consistent with repair. - - result = await self._persistence_service.save_recipe( - recipe_scanner=recipe_scanner, - image_bytes=image_bytes, - image_base64=None, - name=name, - tags=tags, - metadata=metadata, - extension=extension, - ) - return web.json_response(result.payload, status=result.status) except RecipeValidationError as exc: return web.json_response({"error": str(exc)}, status=400) except RecipeDownloadError as exc: @@ -887,6 +792,105 @@ class RecipeManagementHandler: ) return web.json_response({"error": str(exc)}, status=500) + async def _do_import_remote_recipe( + self, + *, + image_url: str, + name: str, + lora_entries: list, + checkpoint_entry: dict, + gen_params_request: dict, + tags: list, + base_model: str, + source_path: str, + ) -> web.Response: + recipe_scanner = self._recipe_scanner_getter() + if recipe_scanner is None: + raise RuntimeError("Recipe scanner unavailable") + + metadata: Dict[str, Any] = { + "base_model": base_model, + "loras": lora_entries, + "gen_params": gen_params_request or {}, + "source_path": source_path, + } + + if checkpoint_entry: + metadata["checkpoint"] = checkpoint_entry + if not metadata["base_model"]: + base_model_from_metadata = ( + await self._resolve_base_model_from_checkpoint(checkpoint_entry) + ) + if base_model_from_metadata: + metadata["base_model"] = base_model_from_metadata + + # Download image + ( + image_bytes, + extension, + civitai_meta_raw, + model_version_id, + ) = await self._download_remote_media(image_url) + + # Extract embedded EXIF metadata (offloaded to thread pool in this call) + embedded_gen_params = {} + try: + with tempfile.NamedTemporaryFile( + suffix=extension, delete=False + ) as temp_img: + temp_img.write(image_bytes) + temp_img_path = temp_img.name + + try: + raw_embedded = await asyncio.to_thread( + ExifUtils.extract_image_metadata, temp_img_path + ) + if raw_embedded: + parser = ( + self._analysis_service._recipe_parser_factory.create_parser( + raw_embedded + ) + ) + if parser: + parsed_embedded = await parser.parse_metadata( + raw_embedded, recipe_scanner=recipe_scanner + ) + if parsed_embedded and "gen_params" in parsed_embedded: + embedded_gen_params = parsed_embedded["gen_params"] + else: + embedded_gen_params = {"raw_metadata": raw_embedded} + finally: + if os.path.exists(temp_img_path): + os.unlink(temp_img_path) + except Exception as exc: + self._logger.warning( + "Failed to extract embedded metadata during import: %s", exc + ) + + if embedded_gen_params: + metadata["gen_params"] = embedded_gen_params + + # Enrich with Civitai API and merge gen_params + civitai_client = self._civitai_client_getter() + await RecipeEnricher.enrich_recipe( + recipe=metadata, + civitai_client=civitai_client, + request_params=gen_params_request, + prefetched_civitai_meta_raw=civitai_meta_raw, + prefetched_model_version_id=model_version_id, + ) + + result = await self._persistence_service.save_recipe( + recipe_scanner=recipe_scanner, + image_bytes=image_bytes, + image_base64=None, + name=name, + tags=tags, + metadata=metadata, + extension=extension, + ) + return web.json_response(result.payload, status=result.status) + async def delete_recipe(self, request: web.Request) -> web.Response: try: await self._ensure_dependencies_ready() @@ -1240,6 +1244,7 @@ class RecipeManagementHandler: file_obj.read(), extension, image_info.get("meta") if civitai_image_id and image_info else None, + image_info.get("modelVersionId") if civitai_image_id and image_info else None, ) except RecipeDownloadError: raise @@ -1351,7 +1356,7 @@ class RecipeManagementHandler: "Could not extract Civitai image ID from URL" ) - # Check for duplicate + # Check for duplicate (fast, before acquiring semaphore) cache = await recipe_scanner.get_cached_data() for recipe in getattr(cache, "raw_data", []): source = recipe.get("source_path") @@ -1365,82 +1370,8 @@ class RecipeManagementHandler: "already_exists": True, }) - # Download image and extract metadata - image_bytes, extension, civitai_meta = ( - await self._download_remote_media(image_url) - ) - - # Extract embedded EXIF metadata - embedded_gen_params = {} - try: - with tempfile.NamedTemporaryFile( - suffix=extension, delete=False - ) as temp_img: - temp_img.write(image_bytes) - temp_img_path = temp_img.name - - try: - raw_embedded = ExifUtils.extract_image_metadata(temp_img_path) - if raw_embedded: - parser = ( - self._analysis_service._recipe_parser_factory.create_parser( - raw_embedded - ) - ) - if parser: - parsed_embedded = await parser.parse_metadata( - raw_embedded, recipe_scanner=recipe_scanner - ) - if parsed_embedded and "gen_params" in parsed_embedded: - embedded_gen_params = parsed_embedded["gen_params"] - finally: - if os.path.exists(temp_img_path): - os.unlink(temp_img_path) - except Exception as exc: - self._logger.warning( - "Failed to extract embedded metadata: %s", exc - ) - - # Build metadata - metadata: Dict[str, Any] = { - "base_model": "", - "loras": [], - "gen_params": embedded_gen_params or {}, - "source_path": image_url, - } - - # Enrich via Civitai API - civitai_client = self._civitai_client_getter() - await RecipeEnricher.enrich_recipe( - recipe=metadata, - civitai_client=civitai_client, - request_params={}, - ) - - # Auto-generate name from prompt or fallback - prompt = ( - metadata.get("gen_params", {}).get("prompt") - or metadata.get("gen_params", {}).get("positivePrompt") - or "" - ) - if prompt: - name = " ".join(str(prompt).split()[:10]) - else: - name = f"Civitai Image {image_id}" - - # Parse tags from params if available - tags = self._parse_tags(request.query.get("tags")) - - result = await self._persistence_service.save_recipe( - recipe_scanner=recipe_scanner, - image_bytes=image_bytes, - image_base64=None, - name=name, - tags=tags, - metadata=metadata, - extension=extension, - ) - return web.json_response(result.payload, status=result.status) + async with self._import_semaphore: + return await self._do_import_from_url(image_url, recipe_scanner) except RecipeValidationError as exc: return web.json_response({"error": str(exc)}, status=400) except RecipeDownloadError as exc: @@ -1451,6 +1382,91 @@ class RecipeManagementHandler: ) return web.json_response({"error": str(exc)}, status=500) + async def _do_import_from_url( + self, + image_url: str, + recipe_scanner: Any, + ) -> web.Response: + image_id = extract_civitai_image_id(image_url) + if not image_id: + raise RecipeValidationError( + "Could not extract Civitai image ID from URL" + ) + + image_bytes, extension, civitai_meta_raw, model_version_id = ( + await self._download_remote_media(image_url) + ) + + # Extract embedded EXIF metadata + embedded_gen_params = {} + try: + with tempfile.NamedTemporaryFile( + suffix=extension, delete=False + ) as temp_img: + temp_img.write(image_bytes) + temp_img_path = temp_img.name + + try: + raw_embedded = await asyncio.to_thread( + ExifUtils.extract_image_metadata, temp_img_path + ) + if raw_embedded: + parser = ( + self._analysis_service._recipe_parser_factory.create_parser( + raw_embedded + ) + ) + if parser: + parsed_embedded = await parser.parse_metadata( + raw_embedded, recipe_scanner=recipe_scanner + ) + if parsed_embedded and "gen_params" in parsed_embedded: + embedded_gen_params = parsed_embedded["gen_params"] + finally: + if os.path.exists(temp_img_path): + os.unlink(temp_img_path) + except Exception as exc: + self._logger.warning( + "Failed to extract embedded metadata: %s", exc + ) + + metadata: Dict[str, Any] = { + "base_model": "", + "loras": [], + "gen_params": embedded_gen_params or {}, + "source_path": image_url, + } + + civitai_client = self._civitai_client_getter() + await RecipeEnricher.enrich_recipe( + recipe=metadata, + civitai_client=civitai_client, + request_params={}, + prefetched_civitai_meta_raw=civitai_meta_raw, + prefetched_model_version_id=model_version_id, + ) + + prompt = ( + metadata.get("gen_params", {}).get("prompt") + or metadata.get("gen_params", {}).get("positivePrompt") + or "" + ) + if prompt: + name = " ".join(str(prompt).split()[:10]) + else: + name = f"Civitai Image {image_id}" + + result = await self._persistence_service.save_recipe( + recipe_scanner=recipe_scanner, + image_bytes=image_bytes, + image_base64=None, + name=name, + tags=[], + metadata=metadata, + extension=extension, + ) + return web.json_response(result.payload, status=result.status) + class RecipeAnalysisHandler: """Analyze images to extract recipe metadata.""" diff --git a/py/services/recipes/analysis_service.py b/py/services/recipes/analysis_service.py index 97f1c6c2..793fcb2c 100644 --- a/py/services/recipes/analysis_service.py +++ b/py/services/recipes/analysis_service.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import base64 import io import os @@ -170,7 +171,9 @@ class RecipeAnalysisService: await self._download_image(url, temp_path) if metadata is None and not is_video: - metadata = self._exif_utils.extract_image_metadata(temp_path) + metadata = await asyncio.to_thread( + self._exif_utils.extract_image_metadata, temp_path + ) return await self._parse_metadata( metadata or {}, @@ -199,7 +202,9 @@ class RecipeAnalysisService: if not os.path.isfile(normalized_path): raise RecipeNotFoundError("File not found") - metadata = self._exif_utils.extract_image_metadata(normalized_path) + metadata = await asyncio.to_thread( + self._exif_utils.extract_image_metadata, normalized_path + ) if not metadata: return self._metadata_not_found_response(normalized_path)