From 2d39b848066f263be74b18c276b8f368bde57913 Mon Sep 17 00:00:00 2001 From: Will Miao <13051207myq@gmail.com> Date: Tue, 3 Jun 2025 14:58:43 +0800 Subject: [PATCH] Add CivitaiApiMetadataParser and improve recipe parsing logic for Civitai images. Also fixes #197 Additional info: Now prioritizes using the Civitai Images API to fetch image and generation metadata. Even NSFW images can now be imported via URL. --- py/recipes/__init__.py | 6 +- py/recipes/factory.py | 41 +++-- py/recipes/parsers/__init__.py | 2 + py/recipes/parsers/civitai_image.py | 248 ++++++++++++++++++++++++++++ py/routes/recipe_routes.py | 65 +++++++- py/services/civitai_client.py | 31 ++++ static/js/managers/ImportManager.js | 13 +- 7 files changed, 383 insertions(+), 23 deletions(-) create mode 100644 py/recipes/parsers/civitai_image.py diff --git a/py/recipes/__init__.py b/py/recipes/__init__.py index 80f294bf..2e209981 100644 --- a/py/recipes/__init__.py +++ b/py/recipes/__init__.py @@ -7,7 +7,8 @@ from .parsers import ( RecipeFormatParser, ComfyMetadataParser, MetaFormatParser, - AutomaticMetadataParser + AutomaticMetadataParser, + CivitaiApiMetadataParser ) __all__ = [ @@ -18,5 +19,6 @@ __all__ = [ 'RecipeFormatParser', 'ComfyMetadataParser', 'MetaFormatParser', - 'AutomaticMetadataParser' + 'AutomaticMetadataParser', + 'CivitaiApiMetadataParser' ] diff --git a/py/recipes/factory.py b/py/recipes/factory.py index f4f0017b..cab1a6be 100644 --- a/py/recipes/factory.py +++ b/py/recipes/factory.py @@ -5,7 +5,8 @@ from .parsers import ( RecipeFormatParser, ComfyMetadataParser, MetaFormatParser, - AutomaticMetadataParser + AutomaticMetadataParser, + CivitaiApiMetadataParser ) from .base import RecipeMetadataParser @@ -15,29 +16,49 @@ class RecipeParserFactory: """Factory for creating recipe metadata parsers""" @staticmethod - def create_parser(user_comment: str) -> RecipeMetadataParser: + def create_parser(metadata) -> RecipeMetadataParser: """ - Create appropriate parser based on the user comment content + Create appropriate parser based on the metadata content Args: - user_comment: The EXIF UserComment string from the image + metadata: The metadata from the image (dict or str) Returns: Appropriate RecipeMetadataParser implementation """ - # Try ComfyMetadataParser first since it requires valid JSON + # First, try CivitaiApiMetadataParser for dict input + if isinstance(metadata, dict): + try: + if CivitaiApiMetadataParser().is_metadata_matching(metadata): + return CivitaiApiMetadataParser() + except Exception as e: + logger.debug(f"CivitaiApiMetadataParser check failed: {e}") + pass + + # Convert dict to string for other parsers that expect string input + try: + import json + metadata_str = json.dumps(metadata) + except Exception as e: + logger.debug(f"Failed to convert dict to JSON string: {e}") + return None + else: + metadata_str = metadata + + # Try ComfyMetadataParser which requires valid JSON try: - if ComfyMetadataParser().is_metadata_matching(user_comment): + if ComfyMetadataParser().is_metadata_matching(metadata_str): return ComfyMetadataParser() except Exception: # If JSON parsing fails, move on to other parsers pass - - if RecipeFormatParser().is_metadata_matching(user_comment): + + # Check other parsers that expect string input + if RecipeFormatParser().is_metadata_matching(metadata_str): return RecipeFormatParser() - elif AutomaticMetadataParser().is_metadata_matching(user_comment): + elif AutomaticMetadataParser().is_metadata_matching(metadata_str): return AutomaticMetadataParser() - elif MetaFormatParser().is_metadata_matching(user_comment): + elif MetaFormatParser().is_metadata_matching(metadata_str): return MetaFormatParser() else: return None diff --git a/py/recipes/parsers/__init__.py b/py/recipes/parsers/__init__.py index 9cc02fa4..436737a4 100644 --- a/py/recipes/parsers/__init__.py +++ b/py/recipes/parsers/__init__.py @@ -4,10 +4,12 @@ from .recipe_format import RecipeFormatParser from .comfy import ComfyMetadataParser from .meta_format import MetaFormatParser from .automatic import AutomaticMetadataParser +from .civitai_image import CivitaiApiMetadataParser __all__ = [ 'RecipeFormatParser', 'ComfyMetadataParser', 'MetaFormatParser', 'AutomaticMetadataParser', + 'CivitaiApiMetadataParser', ] diff --git a/py/recipes/parsers/civitai_image.py b/py/recipes/parsers/civitai_image.py new file mode 100644 index 00000000..824abd3e --- /dev/null +++ b/py/recipes/parsers/civitai_image.py @@ -0,0 +1,248 @@ +"""Parser for Civitai image metadata format.""" + +import json +import logging +from typing import Dict, Any, Union +from ..base import RecipeMetadataParser +from ..constants import GEN_PARAM_KEYS + +logger = logging.getLogger(__name__) + +class CivitaiApiMetadataParser(RecipeMetadataParser): + """Parser for Civitai image metadata format""" + + def is_metadata_matching(self, metadata) -> bool: + """Check if the metadata matches the Civitai image metadata format + + Args: + metadata: The metadata from the image (dict) + + Returns: + bool: True if this parser can handle the metadata + """ + if not metadata or not isinstance(metadata, dict): + return False + + # Check for key markers specific to Civitai image metadata + return any([ + "resources" in metadata, + "civitaiResources" in metadata, + "additionalResources" in metadata + ]) + + async def parse_metadata(self, metadata, recipe_scanner=None, civitai_client=None) -> Dict[str, Any]: + """Parse metadata from Civitai image format + + Args: + metadata: The metadata from the image (dict) + recipe_scanner: Optional recipe scanner service + civitai_client: Optional Civitai API client + + Returns: + Dict containing parsed recipe data + """ + try: + # Initialize result structure + result = { + 'base_model': None, + 'loras': [], + 'gen_params': {}, + 'from_civitai_image': True + } + + # Extract prompt and negative prompt + if "prompt" in metadata: + result["gen_params"]["prompt"] = metadata["prompt"] + + if "negativePrompt" in metadata: + result["gen_params"]["negative_prompt"] = metadata["negativePrompt"] + + # Extract other generation parameters + param_mapping = { + "steps": "steps", + "sampler": "sampler", + "cfgScale": "cfg_scale", + "seed": "seed", + "Size": "size", + "clipSkip": "clip_skip", + } + + for civitai_key, our_key in param_mapping.items(): + if civitai_key in metadata and our_key in GEN_PARAM_KEYS: + result["gen_params"][our_key] = metadata[civitai_key] + + # Extract base model information - directly if available + if "baseModel" in metadata: + result["base_model"] = metadata["baseModel"] + elif "Model hash" in metadata and civitai_client: + model_hash = metadata["Model hash"] + model_info = await civitai_client.get_model_by_hash(model_hash) + if model_info: + result["base_model"] = model_info.get("baseModel", "") + elif "Model" in metadata and isinstance(metadata.get("resources"), list): + # Try to find base model in resources + for resource in metadata.get("resources", []): + if resource.get("type") == "model" and resource.get("name") == metadata.get("Model"): + # This is likely the checkpoint model + if civitai_client and resource.get("hash"): + model_info = await civitai_client.get_model_by_hash(resource.get("hash")) + if model_info: + result["base_model"] = model_info.get("baseModel", "") + + base_model_counts = {} + + # Process standard resources array + if "resources" in metadata and isinstance(metadata["resources"], list): + for resource in metadata["resources"]: + # Modified to process resources without a type field as potential LoRAs + if resource.get("type", "lora") == "lora": + lora_entry = { + 'name': resource.get("name", "Unknown LoRA"), + 'type': "lora", + 'weight': float(resource.get("weight", 1.0)), + 'hash': resource.get("hash", ""), + 'existsLocally': False, + 'localPath': None, + 'file_name': resource.get("name", "Unknown"), + 'thumbnailUrl': '/loras_static/images/no-preview.png', + 'baseModel': '', + 'size': 0, + 'downloadUrl': '', + 'isDeleted': False + } + + # Try to get info from Civitai if hash is available + if lora_entry['hash'] and civitai_client: + try: + lora_hash = lora_entry['hash'] + civitai_info = await civitai_client.get_model_by_hash(lora_hash) + + populated_entry = await self.populate_lora_from_civitai( + lora_entry, + civitai_info, + recipe_scanner, + base_model_counts, + lora_hash + ) + + if populated_entry is None: + continue # Skip invalid LoRA types + + lora_entry = populated_entry + except Exception as e: + logger.error(f"Error fetching Civitai info for LoRA hash {lora_entry['hash']}: {e}") + + result["loras"].append(lora_entry) + + # Process civitaiResources array + if "civitaiResources" in metadata and isinstance(metadata["civitaiResources"], list): + for resource in metadata["civitaiResources"]: + # Modified to process resources without a type field as potential LoRAs + if resource.get("type") in ["lora", "lycoris"] or "type" not in resource: + # Initialize lora entry with the same structure as in automatic.py + lora_entry = { + 'id': str(resource.get("modelVersionId")), + 'modelId': str(resource.get("modelId")) if resource.get("modelId") else None, + 'name': resource.get("modelName", "Unknown LoRA"), + 'version': resource.get("modelVersionName", ""), + 'type': resource.get("type", "lora"), + 'weight': round(float(resource.get("weight", 1.0)), 2), + 'existsLocally': False, + 'thumbnailUrl': '/loras_static/images/no-preview.png', + 'baseModel': '', + 'size': 0, + 'downloadUrl': '', + 'isDeleted': False + } + + # Try to get info from Civitai if modelVersionId is available + if resource.get('modelVersionId') and civitai_client: + try: + version_id = str(resource.get('modelVersionId')) + # Use get_model_version_info instead of get_model_version + civitai_info, error = await civitai_client.get_model_version_info(version_id) + + if error: + logger.warning(f"Error getting model version info: {error}") + continue + + populated_entry = await self.populate_lora_from_civitai( + lora_entry, + civitai_info, + recipe_scanner, + base_model_counts + ) + + if populated_entry is None: + continue # Skip invalid LoRA types + + lora_entry = populated_entry + except Exception as e: + logger.error(f"Error fetching Civitai info for model version {resource.get('modelVersionId')}: {e}") + + result["loras"].append(lora_entry) + + # Process additionalResources array + if "additionalResources" in metadata and isinstance(metadata["additionalResources"], list): + for resource in metadata["additionalResources"]: + # Modified to process resources without a type field as potential LoRAs + if resource.get("type") in ["lora", "lycoris"] or "type" not in resource: + lora_type = resource.get("type", "lora") + name = resource.get("name", "") + + # Extract ID from URN format if available + model_id = None + if name and "civitai:" in name: + parts = name.split("@") + if len(parts) > 1: + model_id = parts[1] + + lora_entry = { + 'name': name, + 'type': lora_type, + 'weight': float(resource.get("strength", 1.0)), + 'hash': "", + 'existsLocally': False, + 'localPath': None, + 'file_name': name, + 'thumbnailUrl': '/loras_static/images/no-preview.png', + 'baseModel': '', + 'size': 0, + 'downloadUrl': '', + 'isDeleted': False + } + + # If we have a model ID and civitai client, try to get more info + if model_id and civitai_client: + try: + # Use get_model_version_info with the model ID + civitai_info, error = await civitai_client.get_model_version_info(model_id) + + if error: + logger.warning(f"Error getting model version info: {error}") + else: + populated_entry = await self.populate_lora_from_civitai( + lora_entry, + civitai_info, + recipe_scanner, + base_model_counts + ) + + if populated_entry is None: + continue # Skip invalid LoRA types + + lora_entry = populated_entry + except Exception as e: + logger.error(f"Error fetching Civitai info for model ID {model_id}: {e}") + + result["loras"].append(lora_entry) + + # If base model wasn't found earlier, use the most common one from LoRAs + if not result["base_model"] and base_model_counts: + result["base_model"] = max(base_model_counts.items(), key=lambda x: x[1])[0] + + return result + + except Exception as e: + logger.error(f"Error parsing Civitai image metadata: {e}", exc_info=True) + return {"error": str(e), "loras": []} diff --git a/py/routes/recipe_routes.py b/py/routes/recipe_routes.py index 955689e3..b0444603 100644 --- a/py/routes/recipe_routes.py +++ b/py/routes/recipe_routes.py @@ -254,6 +254,7 @@ class RecipeRoutes: content_type = request.headers.get('Content-Type', '') is_url_mode = False + metadata = None # Initialize metadata variable if 'multipart/form-data' in content_type: # Handle image upload @@ -287,17 +288,63 @@ class RecipeRoutes: "loras": [] }, status=400) - # Download image from URL - temp_path = download_civitai_image(url) + # Check if this is a Civitai image URL + import re + civitai_image_match = re.match(r'https://civitai\.com/images/(\d+)', url) - if not temp_path: - return web.json_response({ - "error": "Failed to download image from URL", - "loras": [] - }, status=400) + if civitai_image_match: + # Extract image ID and fetch image info using get_image_info + image_id = civitai_image_match.group(1) + image_info = await self.civitai_client.get_image_info(image_id) + + if not image_info: + return web.json_response({ + "error": "Failed to fetch image information from Civitai", + "loras": [] + }, status=400) + + # Get image URL from response + image_url = image_info.get('url') + if not image_url: + return web.json_response({ + "error": "No image URL found in Civitai response", + "loras": [] + }, status=400) + + # Download image directly from URL + session = await self.civitai_client.session + # Create a temporary file to save the downloaded image + with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file: + temp_path = temp_file.name + + async with session.get(image_url) as response: + if response.status != 200: + return web.json_response({ + "error": f"Failed to download image from URL: HTTP {response.status}", + "loras": [] + }, status=400) + + with open(temp_path, 'wb') as f: + f.write(await response.read()) + + # Use meta field from image_info as metadata + if 'meta' in image_info: + metadata = image_info['meta'] + + else: + # Not a Civitai image URL, use the original download method + temp_path = download_civitai_image(url) + + if not temp_path: + return web.json_response({ + "error": "Failed to download image from URL", + "loras": [] + }, status=400) - # Extract metadata from the image using ExifUtils - metadata = ExifUtils.extract_image_metadata(temp_path) + # If metadata wasn't obtained from Civitai API, extract it from the image + if metadata is None: + # Extract metadata from the image using ExifUtils + metadata = ExifUtils.extract_image_metadata(temp_path) # If no metadata found, return a more specific error if not metadata: diff --git a/py/services/civitai_client.py b/py/services/civitai_client.py index 1198716c..59abd7d7 100644 --- a/py/services/civitai_client.py +++ b/py/services/civitai_client.py @@ -346,3 +346,34 @@ class CivitaiClient: except Exception as e: logger.error(f"Error getting hash from Civitai: {e}") return None + + async def get_image_info(self, image_id: str) -> Optional[Dict]: + """Fetch image information from Civitai API + + Args: + image_id: The Civitai image ID + + Returns: + Optional[Dict]: The image data or None if not found + """ + try: + session = await self._ensure_fresh_session() + headers = self._get_request_headers() + url = f"{self.base_url}/images?imageId={image_id}&nsfw=X" + + logger.debug(f"Fetching image info for ID: {image_id}") + async with session.get(url, headers=headers) as response: + if response.status == 200: + data = await response.json() + if data and "items" in data and len(data["items"]) > 0: + logger.debug(f"Successfully fetched image info for ID: {image_id}") + return data["items"][0] + logger.warning(f"No image found with ID: {image_id}") + return None + + logger.error(f"Failed to fetch image info for ID: {image_id} (status {response.status})") + return None + except Exception as e: + error_msg = f"Error fetching image info: {e}" + logger.error(error_msg) + return None diff --git a/static/js/managers/ImportManager.js b/static/js/managers/ImportManager.js index 831ae148..5a5f1fb7 100644 --- a/static/js/managers/ImportManager.js +++ b/static/js/managers/ImportManager.js @@ -58,8 +58,17 @@ export class ImportManager { this.stepManager.removeInjectedStyles(); }); - // Verify visibility - setTimeout(() => this.ensureModalVisible(), 50); + // Verify visibility and focus on URL input + setTimeout(() => { + this.ensureModalVisible(); + + // Ensure URL option is selected and focus on the input + this.toggleImportMode('url'); + const urlInput = document.getElementById('imageUrlInput'); + if (urlInput) { + urlInput.focus(); + } + }, 50); } resetSteps() {