fix(recipes): offload EXIF to thread pool, throttle concurrent imports, eliminate duplicate Civitai API call

- Wrap ExifUtils.extract_image_metadata() with asyncio.to_thread() in
  both import handlers and analysis_service to prevent Pillow/piexif
  from blocking ComfyUI's event loop during batch imports.
- Add a single asyncio.Semaphore(2) on RecipeManagementHandler, shared by the
  import_remote_recipe and import_from_url endpoints, to cap concurrent heavy
  work and prevent event loop starvation.
- Pre-fetch Civitai image_info during download and pass its meta and
  modelVersionId to the recipe enricher (prefetched_civitai_meta_raw /
  prefetched_model_version_id), eliminating a redundant get_image_info()
  API round-trip.
This commit is contained in:
Will Miao
2026-05-15 18:29:54 +08:00
parent a105cb322b
commit 30b01b8a92
3 changed files with 252 additions and 221 deletions

View File

@@ -16,7 +16,9 @@ class RecipeEnricher:
async def enrich_recipe( async def enrich_recipe(
recipe: Dict[str, Any], recipe: Dict[str, Any],
civitai_client: Any, civitai_client: Any,
request_params: Optional[Dict[str, Any]] = None request_params: Optional[Dict[str, Any]] = None,
prefetched_civitai_meta_raw: Optional[Dict[str, Any]] = None,
prefetched_model_version_id: Optional[int] = None,
) -> bool: ) -> bool:
""" """
Enrich a recipe dictionary in-place with metadata from Civitai and embedded params. Enrich a recipe dictionary in-place with metadata from Civitai and embedded params.
@@ -25,6 +27,9 @@ class RecipeEnricher:
recipe: The recipe dictionary to enrich. Must have 'gen_params' initialized. recipe: The recipe dictionary to enrich. Must have 'gen_params' initialized.
civitai_client: Authenticated Civitai client instance. civitai_client: Authenticated Civitai client instance.
request_params: (Optional) Parameters from a user request (e.g. import). request_params: (Optional) Parameters from a user request (e.g. import).
prefetched_civitai_meta_raw: (Optional) Pre-fetched raw meta from Civitai
get_image_info, avoiding a duplicate API call.
prefetched_model_version_id: (Optional) Pre-fetched model version ID.
Returns: Returns:
bool: True if the recipe was modified, False otherwise. bool: True if the recipe was modified, False otherwise.
@@ -32,13 +37,20 @@ class RecipeEnricher:
updated = False updated = False
gen_params = recipe.get("gen_params", {}) gen_params = recipe.get("gen_params", {})
# 1. Fetch Civitai Info if available # 1. Obtain Civitai metadata
civitai_meta = None civitai_meta = None
model_version_id = None model_version_id = prefetched_model_version_id
source_path = recipe.get("source_path", "") source_path = recipe.get("source_path", "")
# Check if it's a Civitai image URL if prefetched_civitai_meta_raw is not None:
raw_meta = prefetched_civitai_meta_raw
if isinstance(raw_meta, dict):
if "meta" in raw_meta and isinstance(raw_meta["meta"], dict):
civitai_meta = raw_meta["meta"]
else:
civitai_meta = raw_meta
else:
image_id = extract_civitai_image_id(str(source_path)) image_id = extract_civitai_image_id(str(source_path))
if image_id: if image_id:
try: try:
@@ -46,7 +58,6 @@ class RecipeEnricher:
image_id, source_url=str(source_path) image_id, source_url=str(source_path)
) )
if image_info: if image_info:
# Handle nested meta often found in Civitai API responses
raw_meta = image_info.get("meta") raw_meta = image_info.get("meta")
if isinstance(raw_meta, dict): if isinstance(raw_meta, dict):
if "meta" in raw_meta and isinstance(raw_meta["meta"], dict): if "meta" in raw_meta and isinstance(raw_meta["meta"], dict):
@@ -55,16 +66,15 @@ class RecipeEnricher:
civitai_meta = raw_meta civitai_meta = raw_meta
model_version_id = image_info.get("modelVersionId") model_version_id = image_info.get("modelVersionId")
except Exception as e:
logger.warning(f"Failed to fetch Civitai image info: {e}")
# If not at top level, check resources in meta
if not model_version_id and civitai_meta: if not model_version_id and civitai_meta:
resources = civitai_meta.get("civitaiResources", []) resources = civitai_meta.get("civitaiResources", [])
for res in resources: for res in resources:
if res.get("type") == "checkpoint": if res.get("type") == "checkpoint":
model_version_id = res.get("modelVersionId") model_version_id = res.get("modelVersionId")
break break
except Exception as e:
logger.warning(f"Failed to fetch Civitai image info: {e}")
# 2. Merge Parameters # 2. Merge Parameters
# Priority: request_params > civitai_meta > embedded (existing gen_params) # Priority: request_params > civitai_meta > embedded (existing gen_params)

View File

@@ -609,6 +609,7 @@ class RecipeManagementHandler:
self._downloader_factory = downloader_factory self._downloader_factory = downloader_factory
self._civitai_client_getter = civitai_client_getter self._civitai_client_getter = civitai_client_getter
self._ws_manager = ws_manager self._ws_manager = ws_manager
self._import_semaphore = asyncio.Semaphore(2)
async def save_recipe(self, request: web.Request) -> web.Response: async def save_recipe(self, request: web.Request) -> web.Response:
try: try:
@@ -769,21 +770,53 @@ class RecipeManagementHandler:
sorted(checkpoint_entry.keys()) if isinstance(checkpoint_entry, dict) else [], sorted(checkpoint_entry.keys()) if isinstance(checkpoint_entry, dict) else [],
) )
# 2. Initial Metadata Construction # Throttle concurrent imports to avoid starving ComfyUI's event loop
async with self._import_semaphore:
return await self._do_import_remote_recipe(
image_url=image_url,
name=name,
lora_entries=lora_entries,
checkpoint_entry=checkpoint_entry,
gen_params_request=gen_params_request,
tags=self._parse_tags(params.get("tags")),
base_model=params.get("base_model", "") or "",
source_path=params.get("source_path") or image_url,
)
except RecipeValidationError as exc:
return web.json_response({"error": str(exc)}, status=400)
except RecipeDownloadError as exc:
return web.json_response({"error": str(exc)}, status=400)
except Exception as exc:
self._logger.error(
"Error importing recipe from remote source: %s", exc, exc_info=True
)
return web.json_response({"error": str(exc)}, status=500)
async def _do_import_remote_recipe(
self,
*,
image_url: str,
name: str,
lora_entries: list,
checkpoint_entry: dict,
gen_params_request: dict,
tags: list,
base_model: str,
source_path: str,
) -> web.Response:
recipe_scanner = self._recipe_scanner_getter()
if recipe_scanner is None:
raise RuntimeError("Recipe scanner unavailable")
metadata: Dict[str, Any] = { metadata: Dict[str, Any] = {
"base_model": params.get("base_model", "") or "", "base_model": base_model,
"loras": lora_entries, "loras": lora_entries,
"gen_params": gen_params_request or {}, "gen_params": gen_params_request or {},
"source_path": params.get("source_path") or image_url, "source_path": source_path,
} }
# Checkpoint handling
if checkpoint_entry: if checkpoint_entry:
metadata["checkpoint"] = checkpoint_entry metadata["checkpoint"] = checkpoint_entry
# Ensure checkpoint is also in gen_params for consistency if needed by enricher?
# Actually enricher looks at metadata['checkpoint'], so this is fine.
# Try to resolve base model from checkpoint if not explicitly provided
if not metadata["base_model"]: if not metadata["base_model"]:
base_model_from_metadata = ( base_model_from_metadata = (
await self._resolve_base_model_from_checkpoint(checkpoint_entry) await self._resolve_base_model_from_checkpoint(checkpoint_entry)
@@ -791,29 +824,15 @@ class RecipeManagementHandler:
if base_model_from_metadata: if base_model_from_metadata:
metadata["base_model"] = base_model_from_metadata metadata["base_model"] = base_model_from_metadata
tags = self._parse_tags(params.get("tags")) # Download image
# 3. Download Image
( (
image_bytes, image_bytes,
extension, extension,
civitai_meta_from_download, civitai_meta_raw,
model_version_id,
) = await self._download_remote_media(image_url) ) = await self._download_remote_media(image_url)
# 4. Extract Embedded Metadata # Extract embedded EXIF metadata (offloaded to thread pool in this call)
# Note: We still extract this here because Enricher currently expects 'gen_params' to already be populated
# with embedded data if we want it to merge it.
# However, logic in Enricher merges: request > civitai > embedded.
# So we should gather embedded params and put them into the recipe's gen_params (as initial state)
# OR pass them to enricher to handle?
# The interface of Enricher.enrich_recipe takes `recipe` (with gen_params) and `request_params`.
# So let's extract embedded and put it into recipe['gen_params'] but careful not to overwrite request params.
# Actually, `GenParamsMerger` which `Enricher` uses handles 3 layers.
# But `Enricher` interface is: recipe['gen_params'] (as embedded) + request_params + civitai (fetched internally).
# Wait, `Enricher` fetches Civitai info internally based on URL.
# `civitai_meta_from_download` is returned by `_download_remote_media` which might be useful if URL didn't have ID.
# Let's extract embedded metadata first
embedded_gen_params = {} embedded_gen_params = {}
try: try:
with tempfile.NamedTemporaryFile( with tempfile.NamedTemporaryFile(
@@ -823,7 +842,9 @@ class RecipeManagementHandler:
temp_img_path = temp_img.name temp_img_path = temp_img.name
try: try:
raw_embedded = ExifUtils.extract_image_metadata(temp_img_path) raw_embedded = await asyncio.to_thread(
ExifUtils.extract_image_metadata, temp_img_path
)
if raw_embedded: if raw_embedded:
parser = ( parser = (
self._analysis_service._recipe_parser_factory.create_parser( self._analysis_service._recipe_parser_factory.create_parser(
@@ -846,27 +867,19 @@ class RecipeManagementHandler:
"Failed to extract embedded metadata during import: %s", exc "Failed to extract embedded metadata during import: %s", exc
) )
# Pre-populate gen_params with embedded data so Enricher treats it as the "base" layer
if embedded_gen_params: if embedded_gen_params:
# Merge embedded into existing gen_params (which currently only has request params if any)
# But wait, we want request params to override everything.
# So we should set recipe['gen_params'] = embedded, and pass request params to enricher.
metadata["gen_params"] = embedded_gen_params metadata["gen_params"] = embedded_gen_params
# 5. Enrich with unified logic # Enrich with Civitai API and merge gen_params
# This will fetch Civitai info (if URL matches) and merge: request > civitai > embedded
civitai_client = self._civitai_client_getter() civitai_client = self._civitai_client_getter()
await RecipeEnricher.enrich_recipe( await RecipeEnricher.enrich_recipe(
recipe=metadata, recipe=metadata,
civitai_client=civitai_client, civitai_client=civitai_client,
request_params=gen_params_request, # Pass explicit request params here to override request_params=gen_params_request,
prefetched_civitai_meta_raw=civitai_meta_raw,
prefetched_model_version_id=model_version_id,
) )
# If we got civitai_meta from download but Enricher didn't fetch it (e.g. not a civitai URL or failed),
# we might want to manually merge it?
# But usually `import_remote_recipe` is used with Civitai URLs.
# For now, relying on Enricher's internal fetch is consistent with repair.
result = await self._persistence_service.save_recipe( result = await self._persistence_service.save_recipe(
recipe_scanner=recipe_scanner, recipe_scanner=recipe_scanner,
image_bytes=image_bytes, image_bytes=image_bytes,
@@ -877,15 +890,6 @@ class RecipeManagementHandler:
extension=extension, extension=extension,
) )
return web.json_response(result.payload, status=result.status) return web.json_response(result.payload, status=result.status)
except RecipeValidationError as exc:
return web.json_response({"error": str(exc)}, status=400)
except RecipeDownloadError as exc:
return web.json_response({"error": str(exc)}, status=400)
except Exception as exc:
self._logger.error(
"Error importing recipe from remote source: %s", exc, exc_info=True
)
return web.json_response({"error": str(exc)}, status=500)
async def delete_recipe(self, request: web.Request) -> web.Response: async def delete_recipe(self, request: web.Request) -> web.Response:
try: try:
@@ -1240,6 +1244,7 @@ class RecipeManagementHandler:
file_obj.read(), file_obj.read(),
extension, extension,
image_info.get("meta") if civitai_image_id and image_info else None, image_info.get("meta") if civitai_image_id and image_info else None,
image_info.get("modelVersionId") if civitai_image_id and image_info else None,
) )
except RecipeDownloadError: except RecipeDownloadError:
raise raise
@@ -1351,7 +1356,7 @@ class RecipeManagementHandler:
"Could not extract Civitai image ID from URL" "Could not extract Civitai image ID from URL"
) )
# Check for duplicate # Check for duplicate (fast, before acquiring semaphore)
cache = await recipe_scanner.get_cached_data() cache = await recipe_scanner.get_cached_data()
for recipe in getattr(cache, "raw_data", []): for recipe in getattr(cache, "raw_data", []):
source = recipe.get("source_path") source = recipe.get("source_path")
@@ -1365,8 +1370,30 @@ class RecipeManagementHandler:
"already_exists": True, "already_exists": True,
}) })
# Download image and extract metadata async with self._import_semaphore:
image_bytes, extension, civitai_meta = ( return await self._do_import_from_url(image_url, recipe_scanner)
except RecipeValidationError as exc:
return web.json_response({"error": str(exc)}, status=400)
except RecipeDownloadError as exc:
return web.json_response({"error": str(exc)}, status=400)
except Exception as exc:
self._logger.error(
"Error importing recipe from URL: %s", exc, exc_info=True
)
return web.json_response({"error": str(exc)}, status=500)
async def _do_import_from_url(
self,
image_url: str,
recipe_scanner: Any,
) -> web.Response:
image_id = extract_civitai_image_id(image_url)
if not image_id:
raise RecipeValidationError(
"Could not extract Civitai image ID from URL"
)
image_bytes, extension, civitai_meta_raw, model_version_id = (
await self._download_remote_media(image_url) await self._download_remote_media(image_url)
) )
@@ -1380,7 +1407,9 @@ class RecipeManagementHandler:
temp_img_path = temp_img.name temp_img_path = temp_img.name
try: try:
raw_embedded = ExifUtils.extract_image_metadata(temp_img_path) raw_embedded = await asyncio.to_thread(
ExifUtils.extract_image_metadata, temp_img_path
)
if raw_embedded: if raw_embedded:
parser = ( parser = (
self._analysis_service._recipe_parser_factory.create_parser( self._analysis_service._recipe_parser_factory.create_parser(
@@ -1401,7 +1430,6 @@ class RecipeManagementHandler:
"Failed to extract embedded metadata: %s", exc "Failed to extract embedded metadata: %s", exc
) )
# Build metadata
metadata: Dict[str, Any] = { metadata: Dict[str, Any] = {
"base_model": "", "base_model": "",
"loras": [], "loras": [],
@@ -1409,15 +1437,15 @@ class RecipeManagementHandler:
"source_path": image_url, "source_path": image_url,
} }
# Enrich via Civitai API
civitai_client = self._civitai_client_getter() civitai_client = self._civitai_client_getter()
await RecipeEnricher.enrich_recipe( await RecipeEnricher.enrich_recipe(
recipe=metadata, recipe=metadata,
civitai_client=civitai_client, civitai_client=civitai_client,
request_params={}, request_params={},
prefetched_civitai_meta_raw=civitai_meta_raw,
prefetched_model_version_id=model_version_id,
) )
# Auto-generate name from prompt or fallback
prompt = ( prompt = (
metadata.get("gen_params", {}).get("prompt") metadata.get("gen_params", {}).get("prompt")
or metadata.get("gen_params", {}).get("positivePrompt") or metadata.get("gen_params", {}).get("positivePrompt")
@@ -1428,28 +1456,16 @@ class RecipeManagementHandler:
else: else:
name = f"Civitai Image {image_id}" name = f"Civitai Image {image_id}"
# Parse tags from params if available
tags = self._parse_tags(request.query.get("tags"))
result = await self._persistence_service.save_recipe( result = await self._persistence_service.save_recipe(
recipe_scanner=recipe_scanner, recipe_scanner=recipe_scanner,
image_bytes=image_bytes, image_bytes=image_bytes,
image_base64=None, image_base64=None,
name=name, name=name,
tags=tags, tags=[],
metadata=metadata, metadata=metadata,
extension=extension, extension=extension,
) )
return web.json_response(result.payload, status=result.status) return web.json_response(result.payload, status=result.status)
except RecipeValidationError as exc:
return web.json_response({"error": str(exc)}, status=400)
except RecipeDownloadError as exc:
return web.json_response({"error": str(exc)}, status=400)
except Exception as exc:
self._logger.error(
"Error importing recipe from URL: %s", exc, exc_info=True
)
return web.json_response({"error": str(exc)}, status=500)
class RecipeAnalysisHandler: class RecipeAnalysisHandler:

View File

@@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import base64 import base64
import io import io
import os import os
@@ -170,7 +171,9 @@ class RecipeAnalysisService:
await self._download_image(url, temp_path) await self._download_image(url, temp_path)
if metadata is None and not is_video: if metadata is None and not is_video:
metadata = self._exif_utils.extract_image_metadata(temp_path) metadata = await asyncio.to_thread(
self._exif_utils.extract_image_metadata, temp_path
)
return await self._parse_metadata( return await self._parse_metadata(
metadata or {}, metadata or {},
@@ -199,7 +202,9 @@ class RecipeAnalysisService:
if not os.path.isfile(normalized_path): if not os.path.isfile(normalized_path):
raise RecipeNotFoundError("File not found") raise RecipeNotFoundError("File not found")
metadata = self._exif_utils.extract_image_metadata(normalized_path) metadata = await asyncio.to_thread(
self._exif_utils.extract_image_metadata, normalized_path
)
if not metadata: if not metadata:
return self._metadata_not_found_response(normalized_path) return self._metadata_not_found_response(normalized_path)