From e2191ab4b42ecbe87b76ab76c6c7b7cdcb5ee045 Mon Sep 17 00:00:00 2001 From: Will Miao <13051207myq@gmail.com> Date: Tue, 18 Mar 2025 18:54:22 +0800 Subject: [PATCH] Refactor recipe metadata processing in RecipeRoutes - Introduced a new RecipeParserFactory to streamline the parsing of recipe metadata from user comments, supporting multiple formats. - Removed legacy metadata extraction logic from RecipeRoutes, delegating responsibilities to the new parser classes. - Enhanced error handling for cases where no valid parser is found, ensuring graceful responses. - Updated the RecipeScanner to improve the handling of LoRA metadata and reduce logging verbosity for better performance. --- py/routes/recipe_routes.py | 203 ++----------------- py/services/recipe_scanner.py | 28 +-- py/utils/exif_utils.py | 71 ------- py/utils/recipe_parsers.py | 355 ++++++++++++++++++++++++++++++++++ 4 files changed, 373 insertions(+), 284 deletions(-) create mode 100644 py/utils/recipe_parsers.py diff --git a/py/routes/recipe_routes.py b/py/routes/recipe_routes.py index f67131f2..09fd6e43 100644 --- a/py/routes/recipe_routes.py +++ b/py/routes/recipe_routes.py @@ -8,6 +8,7 @@ import json import aiohttp import asyncio from ..utils.exif_utils import ExifUtils +from ..utils.recipe_parsers import RecipeParserFactory from ..services.civitai_client import CivitaiClient from ..services.recipe_scanner import RecipeScanner @@ -220,197 +221,27 @@ class RecipeRoutes: "loras": [] # Return empty loras array to prevent client-side errors }, status=200) # Return 200 instead of 400 to handle gracefully - # First, check if this image has recipe metadata from a previous share - recipe_metadata = ExifUtils.extract_recipe_metadata(user_comment) - if recipe_metadata: - logger.info("Found existing recipe metadata in image") - - # Process the recipe metadata - loras = [] - for lora in recipe_metadata.get('loras', []): - # Convert recipe lora format to frontend format - lora_entry = { - 'id': lora.get('modelVersionId', ''), - 'name': lora.get('modelName', ''), - 'version': lora.get('modelVersionName', ''), - 'type': 'lora', - 'weight': lora.get('strength', 1.0), - 'file_name': lora.get('file_name', ''), - 'hash': lora.get('hash', '') - } - - # Check if this LoRA exists locally by SHA256 hash - if lora.get('hash'): - exists_locally = self.recipe_scanner._lora_scanner.has_lora_hash(lora['hash']) - if exists_locally: - lora_entry['existsLocally'] = True - - lora_cache = await self.recipe_scanner._lora_scanner.get_cached_data() - lora_item = next((item for item in lora_cache.raw_data if item['sha256'] == lora['hash']), None) - if lora_item: - lora_entry['localPath'] = lora_item['file_path'] - lora_entry['file_name'] = lora_item['file_name'] - lora_entry['size'] = lora_item['size'] - lora_entry['thumbnailUrl'] = config.get_preview_static_url(lora_item['preview_url']) - - else: - lora_entry['existsLocally'] = False - lora_entry['localPath'] = None - - # Try to get additional info from Civitai if we have a model version ID - if lora.get('modelVersionId'): - try: - civitai_info = await self.civitai_client.get_model_version_info(lora['modelVersionId']) - if civitai_info and civitai_info.get("error") != "Model not found": - # Get thumbnail URL from first image - if 'images' in civitai_info and civitai_info['images']: - lora_entry['thumbnailUrl'] = civitai_info['images'][0].get('url', '') - - # Get base model - lora_entry['baseModel'] = civitai_info.get('baseModel', '') - - # Get download URL - lora_entry['downloadUrl'] = civitai_info.get('downloadUrl', '') - - # Get size from files if available - if 'files' in civitai_info: - model_file = next((file for file in civitai_info.get('files', []) - if file.get('type') == 'Model'), None) - if model_file: - lora_entry['size'] = model_file.get('sizeKB', 0) * 1024 - else: - lora_entry['isDeleted'] = True - lora_entry['thumbnailUrl'] = '/loras_static/images/no-preview.png' - except Exception as e: - logger.error(f"Error fetching Civitai info for LoRA: {e}") - lora_entry['thumbnailUrl'] = '/loras_static/images/no-preview.png' - - loras.append(lora_entry) + # Use the parser factory to get the appropriate parser + parser = RecipeParserFactory.create_parser(user_comment) - logger.info(f"Found {len(loras)} loras in recipe metadata") - + if parser is None: return web.json_response({ - 'base_model': recipe_metadata.get('base_model', ''), - 'loras': loras, - 'gen_params': recipe_metadata.get('gen_params', {}), - 'tags': recipe_metadata.get('tags', []), - 'title': recipe_metadata.get('title', ''), - 'from_recipe_metadata': True - }) + "error": "No parser found for this image", + "loras": [] # Return empty loras array to prevent client-side errors + }, status=200) # Return 200 instead of 400 to handle gracefully - # If no recipe metadata, parse the standard metadata - metadata = ExifUtils.parse_recipe_metadata(user_comment) + # Parse the metadata + result = await parser.parse_metadata( + user_comment, + recipe_scanner=self.recipe_scanner, + civitai_client=self.civitai_client + ) - # Look for Civitai resources in the metadata - civitai_resources = metadata.get('loras', []) - checkpoint = metadata.get('checkpoint') + # Check for errors + if "error" in result and not result.get("loras"): + return web.json_response(result, status=200) - if not civitai_resources and not checkpoint: - return web.json_response({ - "error": "No LoRA information found in this image", - "loras": [] # Return empty loras array - }, status=200) # Return 200 instead of 400 - - # Process LoRAs and collect base models - base_model_counts = {} - loras = [] - - # Process LoRAs - for resource in civitai_resources: - # Get model version ID - model_version_id = resource.get('modelVersionId') - if not model_version_id: - continue - - # Initialize lora entry with default values - lora_entry = { - 'id': model_version_id, - 'name': resource.get('modelName', ''), - 'version': resource.get('modelVersionName', ''), - 'type': resource.get('type', 'lora'), - 'weight': resource.get('weight', 1.0), - 'existsLocally': False, - 'localPath': None, - 'file_name': '', - 'hash': '', - 'thumbnailUrl': '', - 'baseModel': '', - 'size': 0, - 'downloadUrl': '', - 'isDeleted': False # New flag to indicate if the LoRA is deleted from Civitai - } - - # Get additional info from Civitai - civitai_info = await self.civitai_client.get_model_version_info(model_version_id) - - # Check if this LoRA exists locally by SHA256 hash - if civitai_info and civitai_info.get("error") != "Model not found": - # LoRA exists on Civitai, process its information - if 'files' in civitai_info: - # Find the model file (type="Model") in the files list - model_file = next((file for file in civitai_info.get('files', []) - if file.get('type') == 'Model'), None) - - if model_file: - sha256 = model_file.get('hashes', {}).get('SHA256', '') - if sha256: - exists_locally = self.recipe_scanner._lora_scanner.has_lora_hash(sha256) - if exists_locally: - local_path = self.recipe_scanner._lora_scanner.get_lora_path_by_hash(sha256) - lora_entry['existsLocally'] = True - lora_entry['localPath'] = local_path - lora_entry['file_name'] = os.path.splitext(os.path.basename(local_path))[0] - else: - # For missing LoRAs, get file_name from model_file.name - file_name = model_file.get('name', '') - lora_entry['file_name'] = os.path.splitext(file_name)[0] if file_name else '' - - lora_entry['hash'] = sha256 - lora_entry['size'] = model_file.get('sizeKB', 0) * 1024 - - # Get thumbnail URL from first image - if 'images' in civitai_info and civitai_info['images']: - lora_entry['thumbnailUrl'] = civitai_info['images'][0].get('url', '') - - # Get base model and update counts - current_base_model = civitai_info.get('baseModel', '') - lora_entry['baseModel'] = current_base_model - if current_base_model: - base_model_counts[current_base_model] = base_model_counts.get(current_base_model, 0) + 1 - - # Get download URL - lora_entry['downloadUrl'] = civitai_info.get('downloadUrl', '') - else: - # LoRA is deleted from Civitai or not found - lora_entry['isDeleted'] = True - lora_entry['thumbnailUrl'] = '/loras_static/images/no-preview.png' - - loras.append(lora_entry) - - # Set base_model to the most common one from civitai_info - base_model = None - if base_model_counts: - base_model = max(base_model_counts.items(), key=lambda x: x[1])[0] - - # Extract generation parameters for recipe metadata - gen_params = { - 'prompt': metadata.get('prompt', ''), - 'negative_prompt': metadata.get('negative_prompt', ''), - 'checkpoint': checkpoint, - 'steps': metadata.get('steps', ''), - 'sampler': metadata.get('sampler', ''), - 'cfg_scale': metadata.get('cfg_scale', ''), - 'seed': metadata.get('seed', ''), - 'size': metadata.get('size', ''), - 'clip_skip': metadata.get('clip_skip', '') - } - - return web.json_response({ - 'base_model': base_model, - 'loras': loras, - 'gen_params': gen_params, - 'raw_metadata': metadata # Include the raw metadata for saving - }) + return web.json_response(result) except Exception as e: logger.error(f"Error analyzing recipe image: {e}", exc_info=True) diff --git a/py/services/recipe_scanner.py b/py/services/recipe_scanner.py index 7878c2fa..3f948b9c 100644 --- a/py/services/recipe_scanner.py +++ b/py/services/recipe_scanner.py @@ -48,7 +48,6 @@ class RecipeScanner: # config.loras_roots already sorted case-insensitively, use the first one recipes_dir = os.path.join(config.loras_roots[0], "recipes") os.makedirs(recipes_dir, exist_ok=True) - logger.info(f"Using recipes directory: {recipes_dir}") return recipes_dir @@ -60,7 +59,6 @@ class RecipeScanner: # If another initialization is already in progress, wait for it to complete if self._is_initializing and not force_refresh: - logger.info("Initialization already in progress, returning current cache state") return self._cache or RecipeCache(raw_data=[], sorted_by_name=[], sorted_by_date=[]) # Try to acquire the lock with a timeout to prevent deadlocks @@ -79,19 +77,16 @@ class RecipeScanner: # First ensure the lora scanner is initialized if self._lora_scanner: try: - logger.info("Recipe Manager: Waiting for lora scanner initialization") lora_cache = await asyncio.wait_for( self._lora_scanner.get_cached_data(), timeout=10.0 ) - logger.info(f"Recipe Manager: Lora scanner initialized with {len(lora_cache.raw_data)} loras") except asyncio.TimeoutError: logger.error("Timeout waiting for lora scanner initialization") except Exception as e: logger.error(f"Error waiting for lora scanner: {e}") # Scan for recipe data - logger.info("Recipe Manager: Starting recipe scan") raw_data = await self.scan_all_recipes() # Update cache @@ -104,7 +99,6 @@ class RecipeScanner: # Resort cache await self._cache.resort() - logger.info(f"Recipe Manager: Cache initialization completed with {len(raw_data)} recipes") return self._cache except Exception as e: @@ -139,11 +133,9 @@ class RecipeScanner: # Get all recipe JSON files in the recipes directory recipe_files = [] - logger.info(f"Scanning for recipe JSON files in {recipes_dir}") for root, _, files in os.walk(recipes_dir): recipe_count = sum(1 for f in files if f.lower().endswith('.recipe.json')) if recipe_count > 0: - logger.info(f"Found {recipe_count} recipe files in {root}") for file in files: if file.lower().endswith('.recipe.json'): recipe_files.append(os.path.join(root, file)) @@ -153,17 +145,12 @@ class RecipeScanner: recipe_data = await self._load_recipe_file(recipe_path) if recipe_data: recipes.append(recipe_data) - logger.info(f"Processed recipe: {recipe_data.get('title')}") - - logger.info(f"Successfully processed {len(recipes)} recipes") return recipes async def _load_recipe_file(self, recipe_path: str) -> Optional[Dict]: """Load recipe data from a JSON file""" try: - logger.info(f"Loading recipe file: {recipe_path}") - with open(recipe_path, 'r', encoding='utf-8') as f: recipe_data = json.load(f) @@ -188,7 +175,6 @@ class RecipeScanner: image_filename = os.path.basename(image_path) alternative_path = os.path.join(recipe_dir, image_filename) if os.path.exists(alternative_path): - logger.info(f"Found alternative image path: {alternative_path}") recipe_data['file_path'] = alternative_path else: logger.warning(f"Could not find alternative image path for {image_path}") @@ -223,30 +209,23 @@ class RecipeScanner: metadata_updated = False for lora in recipe_data['loras']: - logger.info(f"Processing LoRA: {lora.get('modelName', 'Unknown')}, ID: {lora.get('modelVersionId', 'No ID')}") - # Skip if already has complete information if 'hash' in lora and 'file_name' in lora and lora['file_name']: - logger.info(f"LoRA already has complete information") continue # If has modelVersionId but no hash, look in lora cache first, then fetch from Civitai if 'modelVersionId' in lora and not lora.get('hash'): model_version_id = lora['modelVersionId'] - logger.info(f"Looking up hash for modelVersionId: {model_version_id}") - + # Try to find in lora cache first hash_from_cache = await self._find_hash_in_lora_cache(model_version_id) if hash_from_cache: - logger.info(f"Found hash in lora cache: {hash_from_cache}") lora['hash'] = hash_from_cache metadata_updated = True else: # If not in cache, fetch from Civitai - logger.info(f"Fetching hash from Civitai for {model_version_id}") hash_from_civitai = await self._get_hash_from_civitai(model_version_id) if hash_from_civitai: - logger.info(f"Got hash from Civitai: {hash_from_civitai}") lora['hash'] = hash_from_civitai metadata_updated = True else: @@ -255,18 +234,15 @@ class RecipeScanner: # If has hash but no file_name, look up in lora library if 'hash' in lora and (not lora.get('file_name') or not lora['file_name']): hash_value = lora['hash'] - logger.info(f"Looking up file_name for hash: {hash_value}") if self._lora_scanner.has_lora_hash(hash_value): lora_path = self._lora_scanner.get_lora_path_by_hash(hash_value) if lora_path: file_name = os.path.splitext(os.path.basename(lora_path))[0] - logger.info(f"Found lora in library: {file_name}") lora['file_name'] = file_name metadata_updated = True else: # Lora not in library - logger.info(f"LoRA with hash {hash_value} not found in library") lora['file_name'] = '' metadata_updated = True @@ -300,7 +276,6 @@ class RecipeScanner: if not self._civitai_client: return None - logger.info(f"Fetching model version info from Civitai for ID: {model_version_id}") version_info = await self._civitai_client.get_model_version_info(model_version_id) if not version_info or not version_info.get('files'): @@ -324,7 +299,6 @@ class RecipeScanner: if not self._civitai_client: return None - logger.info(f"Fetching model version info from Civitai for ID: {model_version_id}") version_info = await self._civitai_client.get_model_version_info(model_version_id) if version_info and 'name' in version_info: diff --git a/py/utils/exif_utils.py b/py/utils/exif_utils.py index 04a676ac..a39f7880 100644 --- a/py/utils/exif_utils.py +++ b/py/utils/exif_utils.py @@ -58,77 +58,6 @@ class ExifUtils: except Exception as e: logger.error(f"Error updating EXIF data in {image_path}: {e}") return image_path - - @staticmethod - def parse_recipe_metadata(user_comment: str) -> Dict[str, Any]: - """Parse recipe metadata from UserComment""" - try: - # Split by 'Negative prompt:' to get the prompt - parts = user_comment.split('Negative prompt:', 1) - prompt = parts[0].strip() - - # Initialize metadata with prompt - metadata = {"prompt": prompt, "loras": [], "checkpoint": None} - - # Extract additional fields if available - if len(parts) > 1: - negative_and_params = parts[1] - - # Extract negative prompt - if "Steps:" in negative_and_params: - neg_prompt = negative_and_params.split("Steps:", 1)[0].strip() - metadata["negative_prompt"] = neg_prompt - - # Extract key-value parameters (Steps, Sampler, CFG scale, etc.) - param_pattern = r'([A-Za-z ]+): ([^,]+)' - params = re.findall(param_pattern, negative_and_params) - for key, value in params: - clean_key = key.strip().lower().replace(' ', '_') - metadata[clean_key] = value.strip() - - # Extract Civitai resources - if 'Civitai resources:' in user_comment: - resources_part = user_comment.split('Civitai resources:', 1)[1] - if '],' in resources_part: - resources_json = resources_part.split('],', 1)[0] + ']' - try: - resources = json.loads(resources_json) - # Filter loras and checkpoints - for resource in resources: - if resource.get('type') == 'lora': - # 确保 weight 字段被正确保留 - lora_entry = resource.copy() - # 如果找不到 weight,默认为 1.0 - if 'weight' not in lora_entry: - lora_entry['weight'] = 1.0 - # Ensure modelVersionName is included - if 'modelVersionName' not in lora_entry: - lora_entry['modelVersionName'] = '' - metadata['loras'].append(lora_entry) - elif resource.get('type') == 'checkpoint': - metadata['checkpoint'] = resource - except json.JSONDecodeError: - pass - - return metadata - except Exception as e: - logger.error(f"Error parsing recipe metadata: {e}") - return {"prompt": user_comment, "loras": [], "checkpoint": None} - - @staticmethod - def extract_recipe_metadata(user_comment: str) -> Optional[Dict]: - """Extract recipe metadata section from UserComment if it exists""" - try: - # Look for recipe metadata section - recipe_match = re.search(r'Recipe metadata: (\{.*\})', user_comment, re.IGNORECASE | re.DOTALL) - if not recipe_match: - return None - - recipe_json = recipe_match.group(1) - return json.loads(recipe_json) - except Exception as e: - logger.error(f"Error extracting recipe metadata: {e}") - return None @staticmethod def append_recipe_metadata(image_path, recipe_data) -> str: diff --git a/py/utils/recipe_parsers.py b/py/utils/recipe_parsers.py new file mode 100644 index 00000000..86c3cc57 --- /dev/null +++ b/py/utils/recipe_parsers.py @@ -0,0 +1,355 @@ +import json +import logging +import os +import re +from typing import Dict, List, Any, Optional +from abc import ABC, abstractmethod +from ..config import config + +logger = logging.getLogger(__name__) + +class RecipeMetadataParser(ABC): + """Interface for parsing recipe metadata from image user comments""" + + METADATA_MARKER = None + + @abstractmethod + def is_metadata_matching(self, user_comment: str) -> bool: + """Check if the user comment matches the metadata format""" + pass + + @abstractmethod + async def parse_metadata(self, user_comment: str, recipe_scanner=None, civitai_client=None) -> Dict[str, Any]: + """ + Parse metadata from user comment and return structured recipe data + + Args: + user_comment: The EXIF UserComment string from the image + recipe_scanner: Optional recipe scanner instance for local LoRA lookup + civitai_client: Optional Civitai client for fetching model information + + Returns: + Dict containing parsed recipe data with standardized format + """ + pass + + +class RecipeFormatParser(RecipeMetadataParser): + """Parser for images with dedicated recipe metadata format""" + + # Regular expression pattern for extracting recipe metadata + METADATA_MARKER = r'Recipe metadata: (\{.*\})' + + def is_metadata_matching(self, user_comment: str) -> bool: + """Check if the user comment matches the metadata format""" + return re.search(self.METADATA_MARKER, user_comment, re.IGNORECASE | re.DOTALL) is not None + + async def parse_metadata(self, user_comment: str, recipe_scanner=None, civitai_client=None) -> Dict[str, Any]: + """Parse metadata from images with dedicated recipe metadata format""" + try: + # Extract recipe metadata from user comment + try: + # Look for recipe metadata section + recipe_match = re.search(self.METADATA_MARKER, user_comment, re.IGNORECASE | re.DOTALL) + if not recipe_match: + recipe_metadata = None + else: + recipe_json = recipe_match.group(1) + recipe_metadata = json.loads(recipe_json) + except Exception as e: + logger.error(f"Error extracting recipe metadata: {e}") + recipe_metadata = None + if not recipe_metadata: + return {"error": "No recipe metadata found", "loras": []} + + logger.info("Found existing recipe metadata in image") + + # Process the recipe metadata + loras = [] + for lora in recipe_metadata.get('loras', []): + # Convert recipe lora format to frontend format + lora_entry = { + 'id': lora.get('modelVersionId', ''), + 'name': lora.get('modelName', ''), + 'version': lora.get('modelVersionName', ''), + 'type': 'lora', + 'weight': lora.get('strength', 1.0), + 'file_name': lora.get('file_name', ''), + 'hash': lora.get('hash', '') + } + + # Check if this LoRA exists locally by SHA256 hash + if lora.get('hash') and recipe_scanner: + lora_scanner = recipe_scanner._lora_scanner + exists_locally = lora_scanner.has_lora_hash(lora['hash']) + if exists_locally: + lora_cache = await lora_scanner.get_cached_data() + lora_item = next((item for item in lora_cache.raw_data if item['sha256'] == lora['hash']), None) + if lora_item: + lora_entry['existsLocally'] = True + lora_entry['localPath'] = lora_item['file_path'] + lora_entry['file_name'] = lora_item['file_name'] + lora_entry['size'] = lora_item['size'] + lora_entry['thumbnailUrl'] = config.get_preview_static_url(lora_item['preview_url']) + + else: + lora_entry['existsLocally'] = False + lora_entry['localPath'] = None + + # Try to get additional info from Civitai if we have a model version ID + if lora.get('modelVersionId') and civitai_client: + try: + civitai_info = await civitai_client.get_model_version_info(lora['modelVersionId']) + if civitai_info and civitai_info.get("error") != "Model not found": + # Get thumbnail URL from first image + if 'images' in civitai_info and civitai_info['images']: + lora_entry['thumbnailUrl'] = civitai_info['images'][0].get('url', '') + + # Get base model + lora_entry['baseModel'] = civitai_info.get('baseModel', '') + + # Get download URL + lora_entry['downloadUrl'] = civitai_info.get('downloadUrl', '') + + # Get size from files if available + if 'files' in civitai_info: + model_file = next((file for file in civitai_info.get('files', []) + if file.get('type') == 'Model'), None) + if model_file: + lora_entry['size'] = model_file.get('sizeKB', 0) * 1024 + else: + lora_entry['isDeleted'] = True + lora_entry['thumbnailUrl'] = '/loras_static/images/no-preview.png' + except Exception as e: + logger.error(f"Error fetching Civitai info for LoRA: {e}") + lora_entry['thumbnailUrl'] = '/loras_static/images/no-preview.png' + + loras.append(lora_entry) + + logger.info(f"Found {len(loras)} loras in recipe metadata") + + return { + 'base_model': recipe_metadata.get('base_model', ''), + 'loras': loras, + 'gen_params': recipe_metadata.get('gen_params', {}), + 'tags': recipe_metadata.get('tags', []), + 'title': recipe_metadata.get('title', ''), + 'from_recipe_metadata': True + } + + except Exception as e: + logger.error(f"Error parsing recipe format metadata: {e}", exc_info=True) + return {"error": str(e), "loras": []} + + +class StandardMetadataParser(RecipeMetadataParser): + """Parser for images with standard civitai metadata format (prompt, negative prompt, etc.)""" + + METADATA_MARKER = r'Civitai resources: ' + + def is_metadata_matching(self, user_comment: str) -> bool: + """Check if the user comment matches the metadata format""" + return re.search(self.METADATA_MARKER, user_comment, re.IGNORECASE | re.DOTALL) is not None + + async def parse_metadata(self, user_comment: str, recipe_scanner=None, civitai_client=None) -> Dict[str, Any]: + """Parse metadata from images with standard metadata format""" + try: + # Parse the standard metadata + metadata = self._parse_recipe_metadata(user_comment) + + # Look for Civitai resources in the metadata + civitai_resources = metadata.get('loras', []) + checkpoint = metadata.get('checkpoint') + + if not civitai_resources and not checkpoint: + return { + "error": "No LoRA information found in this image", + "loras": [] + } + + # Process LoRAs and collect base models + base_model_counts = {} + loras = [] + + # Process LoRAs + for resource in civitai_resources: + # Get model version ID + model_version_id = resource.get('modelVersionId') + if not model_version_id: + continue + + # Initialize lora entry with default values + lora_entry = { + 'id': model_version_id, + 'name': resource.get('modelName', ''), + 'version': resource.get('modelVersionName', ''), + 'type': resource.get('type', 'lora'), + 'weight': resource.get('weight', 1.0), + 'existsLocally': False, + 'localPath': None, + 'file_name': '', + 'hash': '', + 'thumbnailUrl': '', + 'baseModel': '', + 'size': 0, + 'downloadUrl': '', + 'isDeleted': False + } + + # Get additional info from Civitai if client is available + if civitai_client: + civitai_info = await civitai_client.get_model_version_info(model_version_id) + + # Check if this LoRA exists locally by SHA256 hash + if civitai_info and civitai_info.get("error") != "Model not found": + # LoRA exists on Civitai, process its information + if 'files' in civitai_info: + # Find the model file (type="Model") in the files list + model_file = next((file for file in civitai_info.get('files', []) + if file.get('type') == 'Model'), None) + + if model_file and recipe_scanner: + sha256 = model_file.get('hashes', {}).get('SHA256', '') + if sha256: + lora_scanner = recipe_scanner._lora_scanner + exists_locally = lora_scanner.has_lora_hash(sha256) + if exists_locally: + local_path = lora_scanner.get_lora_path_by_hash(sha256) + lora_entry['existsLocally'] = True + lora_entry['localPath'] = local_path + lora_entry['file_name'] = os.path.splitext(os.path.basename(local_path))[0] + else: + # For missing LoRAs, get file_name from model_file.name + file_name = model_file.get('name', '') + lora_entry['file_name'] = os.path.splitext(file_name)[0] if file_name else '' + + lora_entry['hash'] = sha256 + lora_entry['size'] = model_file.get('sizeKB', 0) * 1024 + + # Get thumbnail URL from first image + if 'images' in civitai_info and civitai_info['images']: + lora_entry['thumbnailUrl'] = civitai_info['images'][0].get('url', '') + + # Get base model and update counts + current_base_model = civitai_info.get('baseModel', '') + lora_entry['baseModel'] = current_base_model + if current_base_model: + base_model_counts[current_base_model] = base_model_counts.get(current_base_model, 0) + 1 + + # Get download URL + lora_entry['downloadUrl'] = civitai_info.get('downloadUrl', '') + else: + # LoRA is deleted from Civitai or not found + lora_entry['isDeleted'] = True + lora_entry['thumbnailUrl'] = '/loras_static/images/no-preview.png' + + loras.append(lora_entry) + + # Set base_model to the most common one from civitai_info + base_model = None + if base_model_counts: + base_model = max(base_model_counts.items(), key=lambda x: x[1])[0] + + # Extract generation parameters for recipe metadata + gen_params = { + 'prompt': metadata.get('prompt', ''), + 'negative_prompt': metadata.get('negative_prompt', ''), + 'checkpoint': checkpoint, + 'steps': metadata.get('steps', ''), + 'sampler': metadata.get('sampler', ''), + 'cfg_scale': metadata.get('cfg_scale', ''), + 'seed': metadata.get('seed', ''), + 'size': metadata.get('size', ''), + 'clip_skip': metadata.get('clip_skip', '') + } + + return { + 'base_model': base_model, + 'loras': loras, + 'gen_params': gen_params, + 'raw_metadata': metadata + } + + except Exception as e: + logger.error(f"Error parsing standard metadata: {e}", exc_info=True) + return {"error": str(e), "loras": []} + + def _parse_recipe_metadata(self, user_comment: str) -> Dict[str, Any]: + """Parse recipe metadata from UserComment""" + try: + # Split by 'Negative prompt:' to get the prompt + parts = user_comment.split('Negative prompt:', 1) + prompt = parts[0].strip() + + # Initialize metadata with prompt + metadata = {"prompt": prompt, "loras": [], "checkpoint": None} + + # Extract additional fields if available + if len(parts) > 1: + negative_and_params = parts[1] + + # Extract negative prompt + if "Steps:" in negative_and_params: + neg_prompt = negative_and_params.split("Steps:", 1)[0].strip() + metadata["negative_prompt"] = neg_prompt + + # Extract key-value parameters (Steps, Sampler, CFG scale, etc.) + param_pattern = r'([A-Za-z ]+): ([^,]+)' + params = re.findall(param_pattern, negative_and_params) + for key, value in params: + clean_key = key.strip().lower().replace(' ', '_') + metadata[clean_key] = value.strip() + + # Extract Civitai resources + if 'Civitai resources:' in user_comment: + resources_part = user_comment.split('Civitai resources:', 1)[1] + if '],' in resources_part: + resources_json = resources_part.split('],', 1)[0] + ']' + try: + resources = json.loads(resources_json) + # Filter loras and checkpoints + for resource in resources: + if resource.get('type') == 'lora': + # 确保 weight 字段被正确保留 + lora_entry = resource.copy() + # 如果找不到 weight,默认为 1.0 + if 'weight' not in lora_entry: + lora_entry['weight'] = 1.0 + # Ensure modelVersionName is included + if 'modelVersionName' not in lora_entry: + lora_entry['modelVersionName'] = '' + metadata['loras'].append(lora_entry) + elif resource.get('type') == 'checkpoint': + metadata['checkpoint'] = resource + except json.JSONDecodeError: + pass + + return metadata + except Exception as e: + logger.error(f"Error parsing recipe metadata: {e}") + return {"prompt": user_comment, "loras": [], "checkpoint": None} + + +class RecipeParserFactory: + """Factory for creating recipe metadata parsers""" + + @staticmethod + def create_parser(user_comment: str) -> RecipeMetadataParser: + """ + Create appropriate parser based on the user comment content + + Args: + user_comment: The EXIF UserComment string from the image + + Returns: + Appropriate RecipeMetadataParser implementation + """ + if RecipeFormatParser().is_metadata_matching(user_comment): + print("RecipeFormatParser") + return RecipeFormatParser() + elif StandardMetadataParser().is_metadata_matching(user_comment): + print("StandardMetadataParser") + return StandardMetadataParser() + else: + print("None") + return None \ No newline at end of file