diff --git a/py/utils/recipe_parsers.py b/py/utils/recipe_parsers.py index e88bb189..81610a95 100644 --- a/py/utils/recipe_parsers.py +++ b/py/utils/recipe_parsers.py @@ -20,6 +20,9 @@ GEN_PARAM_KEYS = [ 'clip_skip', ] +# Valid Lora types +VALID_LORA_TYPES = ['lora', 'locon'] + class RecipeMetadataParser(ABC): """Interface for parsing recipe metadata from image user comments""" @@ -46,7 +49,7 @@ class RecipeMetadataParser(ABC): pass async def populate_lora_from_civitai(self, lora_entry: Dict[str, Any], civitai_info_tuple: Tuple[Dict[str, Any], Optional[str]], - recipe_scanner=None, base_model_counts=None, hash_value=None) -> Dict[str, Any]: + recipe_scanner=None, base_model_counts=None, hash_value=None) -> Optional[Dict[str, Any]]: """ Populate a lora entry with information from Civitai API response @@ -58,85 +61,93 @@ class RecipeMetadataParser(ABC): hash_value: Optional hash value to use if not available in civitai_info Returns: - The populated lora_entry dict + The populated lora_entry dict if type is valid, None otherwise """ try: # Unpack the tuple to get the actual data civitai_info, error_msg = civitai_info_tuple if isinstance(civitai_info_tuple, tuple) else (civitai_info_tuple, None) - if civitai_info and civitai_info.get("error") != "Model not found": - # Check if this is an early access lora - if civitai_info.get('earlyAccessEndsAt'): - # Convert earlyAccessEndsAt to a human-readable date - early_access_date = civitai_info.get('earlyAccessEndsAt', '') - lora_entry['isEarlyAccess'] = True - lora_entry['earlyAccessEndsAt'] = early_access_date - - # Update model name if available - if 'model' in civitai_info and 'name' in civitai_info['model']: - lora_entry['name'] = civitai_info['model']['name'] - - # Update version if available - if 'name' in civitai_info: - lora_entry['version'] = civitai_info.get('name', '') - - # Get thumbnail URL from first image - if 'images' in civitai_info and civitai_info['images']: - lora_entry['thumbnailUrl'] = civitai_info['images'][0].get('url', '') - - # Get base model - current_base_model = civitai_info.get('baseModel', '') - lora_entry['baseModel'] = current_base_model - - # Update base model counts if tracking them - if base_model_counts is not None and current_base_model: - base_model_counts[current_base_model] = base_model_counts.get(current_base_model, 0) + 1 - - # Get download URL - lora_entry['downloadUrl'] = civitai_info.get('downloadUrl', '') - - # Process file information if available - if 'files' in civitai_info: - # Find the primary model file (type="Model" and primary=true) in the files list - model_file = next((file for file in civitai_info.get('files', []) - if file.get('type') == 'Model' and file.get('primary') == True), None) - - if model_file: - # Get size - lora_entry['size'] = model_file.get('sizeKB', 0) * 1024 - - # Get SHA256 hash - sha256 = model_file.get('hashes', {}).get('SHA256', hash_value) - if sha256: - lora_entry['hash'] = sha256.lower() - - # Check if exists locally - if recipe_scanner and lora_entry['hash']: - lora_scanner = recipe_scanner._lora_scanner - exists_locally = lora_scanner.has_lora_hash(lora_entry['hash']) - if exists_locally: - try: - local_path = lora_scanner.get_lora_path_by_hash(lora_entry['hash']) - lora_entry['existsLocally'] = True - lora_entry['localPath'] = local_path - lora_entry['file_name'] = os.path.splitext(os.path.basename(local_path))[0] - - # Get thumbnail from local preview if available - lora_cache = await lora_scanner.get_cached_data() - lora_item = next((item for item in lora_cache.raw_data - if item['sha256'].lower() == lora_entry['hash'].lower()), None) - if lora_item and 'preview_url' in lora_item: - lora_entry['thumbnailUrl'] = config.get_preview_static_url(lora_item['preview_url']) - except Exception as e: - logger.error(f"Error getting local lora path: {e}") - else: - # For missing LoRAs, get file_name from model_file.name - file_name = model_file.get('name', '') - lora_entry['file_name'] = os.path.splitext(file_name)[0] if file_name else '' - else: + if not civitai_info or civitai_info.get("error") == "Model not found": # Model not found or deleted lora_entry['isDeleted'] = True lora_entry['thumbnailUrl'] = '/loras_static/images/no-preview.png' + return lora_entry + + # Get model type and validate + model_type = civitai_info.get('model', {}).get('type', '').lower() + lora_entry['type'] = model_type + if model_type not in VALID_LORA_TYPES: + logger.debug(f"Skipping non-LoRA model type: {model_type}") + return None + + # Check if this is an early access lora + if civitai_info.get('earlyAccessEndsAt'): + # Convert earlyAccessEndsAt to a human-readable date + early_access_date = civitai_info.get('earlyAccessEndsAt', '') + lora_entry['isEarlyAccess'] = True + lora_entry['earlyAccessEndsAt'] = early_access_date + + # Update model name if available + if 'model' in civitai_info and 'name' in civitai_info['model']: + lora_entry['name'] = civitai_info['model']['name'] + + # Update version if available + if 'name' in civitai_info: + lora_entry['version'] = civitai_info.get('name', '') + + # Get thumbnail URL from first image + if 'images' in civitai_info and civitai_info['images']: + lora_entry['thumbnailUrl'] = civitai_info['images'][0].get('url', '') + + # Get base model + current_base_model = civitai_info.get('baseModel', '') + lora_entry['baseModel'] = current_base_model + + # Update base model counts if tracking them + if base_model_counts is not None and current_base_model: + base_model_counts[current_base_model] = base_model_counts.get(current_base_model, 0) + 1 + + # Get download URL + lora_entry['downloadUrl'] = civitai_info.get('downloadUrl', '') + + # Process file information if available + if 'files' in civitai_info: + # Find the primary model file (type="Model" and primary=true) in the files list + model_file = next((file for file in civitai_info.get('files', []) + if file.get('type') == 'Model' and file.get('primary') == True), None) + + if model_file: + # Get size + lora_entry['size'] = model_file.get('sizeKB', 0) * 1024 + + # Get SHA256 hash + sha256 = model_file.get('hashes', {}).get('SHA256', hash_value) + if sha256: + lora_entry['hash'] = sha256.lower() + + # Check if exists locally + if recipe_scanner and lora_entry['hash']: + lora_scanner = recipe_scanner._lora_scanner + exists_locally = lora_scanner.has_lora_hash(lora_entry['hash']) + if exists_locally: + try: + local_path = lora_scanner.get_lora_path_by_hash(lora_entry['hash']) + lora_entry['existsLocally'] = True + lora_entry['localPath'] = local_path + lora_entry['file_name'] = os.path.splitext(os.path.basename(local_path))[0] + + # Get thumbnail from local preview if available + lora_cache = await lora_scanner.get_cached_data() + lora_item = next((item for item in lora_cache.raw_data + if item['sha256'].lower() == lora_entry['hash'].lower()), None) + if lora_item and 'preview_url' in lora_item: + lora_entry['thumbnailUrl'] = config.get_preview_static_url(lora_item['preview_url']) + except Exception as e: + logger.error(f"Error getting local lora path: {e}") + else: + # For missing LoRAs, get file_name from model_file.name + file_name = model_file.get('name', '') + lora_entry['file_name'] = os.path.splitext(file_name)[0] if file_name else '' except Exception as e: logger.error(f"Error populating lora from Civitai info: {e}") @@ -247,13 +258,16 @@ class RecipeFormatParser(RecipeMetadataParser): try: civitai_info_tuple = await civitai_client.get_model_version_info(lora['modelVersionId']) # Populate lora entry with Civitai info - lora_entry = await self.populate_lora_from_civitai( + populated_entry = await self.populate_lora_from_civitai( lora_entry, civitai_info_tuple, recipe_scanner, None, # No need to track base model counts lora['hash'] ) + if populated_entry is None: + continue # Skip invalid LoRA types + lora_entry = populated_entry except Exception as e: logger.error(f"Error fetching Civitai info for LoRA: {e}") lora_entry['thumbnailUrl'] = '/loras_static/images/no-preview.png' @@ -282,361 +296,6 @@ class RecipeFormatParser(RecipeMetadataParser): logger.error(f"Error parsing recipe format metadata: {e}", exc_info=True) return {"error": str(e), "loras": []} - -class StandardMetadataParser(RecipeMetadataParser): - """Parser for images with standard civitai metadata format (prompt, negative prompt, etc.)""" - - METADATA_MARKER = r'Civitai resources: ' - - def is_metadata_matching(self, user_comment: str) -> bool: - """Check if the user comment matches the metadata format""" - return re.search(self.METADATA_MARKER, user_comment, re.IGNORECASE | re.DOTALL) is not None - - async def parse_metadata(self, user_comment: str, recipe_scanner=None, civitai_client=None) -> Dict[str, Any]: - """Parse metadata from images with standard metadata format""" - try: - # Parse the standard metadata - metadata = self._parse_recipe_metadata(user_comment) - - # Look for Civitai resources in the metadata - civitai_resources = metadata.get('loras', []) - checkpoint = metadata.get('checkpoint') - - if not civitai_resources and not checkpoint: - return { - "error": "No LoRA information found in this image", - "loras": [] - } - - # Process LoRAs and collect base models - base_model_counts = {} - loras = [] - - # Process LoRAs - for resource in civitai_resources: - # Get model version ID - model_version_id = resource.get('modelVersionId') - if not model_version_id: - continue - - # Initialize lora entry with default values - lora_entry = { - 'id': model_version_id, - 'name': resource.get('modelName', ''), - 'version': resource.get('modelVersionName', ''), - 'type': resource.get('type', 'lora'), - 'weight': resource.get('weight', 1.0), - 'existsLocally': False, - 'localPath': None, - 'file_name': '', - 'hash': '', - 'thumbnailUrl': '', - 'baseModel': '', - 'size': 0, - 'downloadUrl': '', - 'isDeleted': False - } - - # Get additional info from Civitai if client is available - if civitai_client: - try: - civitai_info_tuple = await civitai_client.get_model_version_info(model_version_id) - # Populate lora entry with Civitai info - lora_entry = await self.populate_lora_from_civitai( - lora_entry, - civitai_info_tuple, - recipe_scanner, - base_model_counts - ) - except Exception as e: - logger.error(f"Error fetching Civitai info for LoRA: {e}") - - loras.append(lora_entry) - - # Set base_model to the most common one from civitai_info - base_model = None - if base_model_counts: - base_model = max(base_model_counts.items(), key=lambda x: x[1])[0] - - # Extract generation parameters for recipe metadata - gen_params = {} - for key in GEN_PARAM_KEYS: - if key in metadata: - gen_params[key] = metadata.get(key, '') - - return { - 'base_model': base_model, - 'loras': loras, - 'gen_params': gen_params, - 'raw_metadata': metadata - } - - except Exception as e: - logger.error(f"Error parsing standard metadata: {e}", exc_info=True) - return {"error": str(e), "loras": []} - - def _parse_recipe_metadata(self, user_comment: str) -> Dict[str, Any]: - """Parse recipe metadata from UserComment""" - try: - # Split by 'Negative prompt:' to get the prompt - parts = user_comment.split('Negative prompt:', 1) - prompt = parts[0].strip() - - # Initialize metadata with prompt - metadata = {"prompt": prompt, "loras": [], "checkpoint": None} - - # Extract additional fields if available - if len(parts) > 1: - negative_and_params = parts[1] - - # Extract negative prompt - if "Steps:" in negative_and_params: - neg_prompt = negative_and_params.split("Steps:", 1)[0].strip() - metadata["negative_prompt"] = neg_prompt - - # Extract key-value parameters (Steps, Sampler, CFG scale, etc.) - param_pattern = r'([A-Za-z ]+): ([^,]+)' - params = re.findall(param_pattern, negative_and_params) - for key, value in params: - clean_key = key.strip().lower().replace(' ', '_') - metadata[clean_key] = value.strip() - - # Extract Civitai resources - if 'Civitai resources:' in user_comment: - resources_part = user_comment.split('Civitai resources:', 1)[1].strip() - - # Look for the opening and closing brackets to extract the JSON array - if resources_part.startswith('['): - # Find the position of the closing bracket - bracket_count = 0 - end_pos = -1 - - for i, char in enumerate(resources_part): - if char == '[': - bracket_count += 1 - elif char == ']': - bracket_count -= 1 - if bracket_count == 0: - end_pos = i - break - - if end_pos != -1: - resources_json = resources_part[:end_pos+1] - try: - resources = json.loads(resources_json) - # Filter loras and checkpoints - for resource in resources: - # Process both 'lora' and 'lycoris' types as loras - if resource.get('type') == 'lora' or resource.get('type') == 'lycoris': - # Ensure weight field is properly preserved - lora_entry = resource.copy() - # Default to 1.0 if weight not found - if 'weight' not in lora_entry: - lora_entry['weight'] = 1.0 - # Ensure modelVersionName is included - if 'modelVersionName' not in lora_entry: - lora_entry['modelVersionName'] = '' - metadata['loras'].append(lora_entry) - elif resource.get('type') == 'checkpoint': - metadata['checkpoint'] = resource - except json.JSONDecodeError: - pass - - return metadata - except Exception as e: - logger.error(f"Error parsing recipe metadata: {e}") - return {"prompt": user_comment, "loras": [], "checkpoint": None} - - -class A1111MetadataParser(RecipeMetadataParser): - """Parser for images with A1111 metadata format (Lora hashes)""" - - METADATA_MARKER = r'Lora hashes:' - LORA_PATTERN = r']+)>' - LORA_HASH_PATTERN = r'([^:]+):\s*([a-fA-F0-9]+)' - - def is_metadata_matching(self, user_comment: str) -> bool: - """Check if the user comment matches the A1111 metadata format""" - return 'Lora hashes:' in user_comment - - async def parse_metadata(self, user_comment: str, recipe_scanner=None, civitai_client=None) -> Dict[str, Any]: - """Parse metadata from images with A1111 metadata format""" - try: - # Initialize metadata with default empty values - metadata = {"prompt": "", "loras": []} - - # Check if the user_comment contains prompt and negative prompt - if 'Negative prompt:' in user_comment: - # Extract prompt and negative prompt - parts = user_comment.split('Negative prompt:', 1) - metadata["prompt"] = parts[0].strip() - - # Extract negative prompt and parameters - if len(parts) > 1: - negative_and_params = parts[1] - - # Extract negative prompt - param_start = re.search(r'([A-Za-z ]+):', negative_and_params) - if param_start: - neg_prompt = negative_and_params[:param_start.start()].strip() - metadata["negative_prompt"] = neg_prompt - params_section = negative_and_params[param_start.start():] - else: - params_section = negative_and_params - - # Extract parameters from this section - self._extract_parameters(params_section, metadata) - else: - # No prompt/negative prompt - extract parameters directly - self._extract_parameters(user_comment, metadata) - - # Extract LoRA information from prompt if available - lora_weights = {} - if metadata["prompt"]: - lora_matches = re.findall(self.LORA_PATTERN, metadata["prompt"]) - for lora_name, weights in lora_matches: - # Take only the first strength value (before the colon) - weight = weights.split(':')[0] - lora_weights[lora_name.strip()] = float(weight.strip()) - - # Remove LoRA patterns from prompt - metadata["prompt"] = re.sub(self.LORA_PATTERN, '', metadata["prompt"]).strip() - - # Extract LoRA hashes - lora_hashes = {} - if 'Lora hashes:' in user_comment: - # Get the LoRA hashes section - lora_hash_section = user_comment.split('Lora hashes:', 1)[1].strip() - - # Handle various format possibilities - if lora_hash_section.startswith('"'): - # Extract content within quotes - quote_match = re.match(r'"([^"]+)"', lora_hash_section) - if quote_match: - lora_hash_section = quote_match.group(1) - - # Split by commas and parse each LoRA entry - lora_entries = [] - current_entry = "" - for part in lora_hash_section.split(','): - # Check if this part contains a colon (indicating a complete entry) - if ':' in part: - if current_entry: - lora_entries.append(current_entry.strip()) - current_entry = part.strip() - else: - # This is probably a continuation of the previous entry - current_entry += ',' + part - - # Add the last entry if it exists - if current_entry: - lora_entries.append(current_entry.strip()) - - # Process each entry - for entry in lora_entries: - # Split at the colon to get name and hash - if ':' in entry: - lora_name, hash_value = entry.split(':', 1) - # Clean the values - lora_name = lora_name.strip() - hash_value = hash_value.strip() - # Store in our dictionary - lora_hashes[lora_name] = hash_value - - # Alternative backup method using regex if the above parsing fails - if not lora_hashes: - if 'Lora hashes:' in user_comment: - lora_hash_section = user_comment.split('Lora hashes:', 1)[1].strip() - if lora_hash_section.startswith('"'): - # Extract content within quotes if present - quote_match = re.match(r'"([^"]+)"', lora_hash_section) - if quote_match: - lora_hash_section = quote_match.group(1) - - # Use regex to find all name:hash pairs - hash_matches = re.findall(self.LORA_HASH_PATTERN, lora_hash_section) - for lora_name, hash_value in hash_matches: - # Clean up name by removing any leading comma and spaces - clean_name = lora_name.strip().lstrip(',').strip() - lora_hashes[clean_name] = hash_value.strip() - - # Process LoRAs and collect base models - base_model_counts = {} - loras = [] - - # Process each LoRA with hash and weight - for lora_name, hash_value in lora_hashes.items(): - weight = lora_weights.get(lora_name, 1.0) - - # Initialize lora entry with default values - lora_entry = { - 'name': lora_name, - 'type': 'lora', - 'weight': weight, - 'existsLocally': False, - 'localPath': None, - 'file_name': lora_name, - 'hash': hash_value.lower(), # Ensure hash is lowercase - 'thumbnailUrl': '/loras_static/images/no-preview.png', - 'baseModel': '', - 'size': 0, - 'downloadUrl': '', - 'isDeleted': False - } - - # Get info from Civitai by hash if available - if civitai_client and hash_value: - try: - civitai_info = await civitai_client.get_model_by_hash(hash_value) - # Populate lora entry with Civitai info - lora_entry = await self.populate_lora_from_civitai( - lora_entry, - civitai_info, - recipe_scanner, - base_model_counts, - hash_value - ) - except Exception as e: - logger.error(f"Error fetching Civitai info for LoRA hash {hash_value}: {e}") - - loras.append(lora_entry) - - # Set base_model to the most common one from civitai_info - base_model = None - if base_model_counts: - base_model = max(base_model_counts.items(), key=lambda x: x[1])[0] - - # Extract generation parameters for recipe metadata - gen_params = {} - for key in GEN_PARAM_KEYS: - if key in metadata: - gen_params[key] = metadata.get(key, '') - - # Add model information if available - if 'model' in metadata: - gen_params['checkpoint'] = metadata['model'] - - return { - 'base_model': base_model, - 'loras': loras, - 'gen_params': gen_params, - 'raw_metadata': metadata - } - - except Exception as e: - logger.error(f"Error parsing A1111 metadata: {e}", exc_info=True) - return {"error": str(e), "loras": []} - - def _extract_parameters(self, text: str, metadata: Dict[str, Any]) -> None: - """Extract parameters from text section and populate metadata dict""" - # Extract key-value parameters (Steps, Sampler, CFG scale, etc.) - param_pattern = r'([A-Za-z][A-Za-z0-9 _]+): ([^,]+)(?:,|$)' - params = re.findall(param_pattern, text) - for key, value in params: - clean_key = key.strip().lower().replace(' ', '_') - metadata[clean_key] = value.strip() - - class ComfyMetadataParser(RecipeMetadataParser): """Parser for Civitai ComfyUI metadata JSON format""" @@ -706,11 +365,14 @@ class ComfyMetadataParser(RecipeMetadataParser): try: civitai_info_tuple = await civitai_client.get_model_version_info(model_version_id) # Populate lora entry with Civitai info - lora_entry = await self.populate_lora_from_civitai( + populated_entry = await self.populate_lora_from_civitai( lora_entry, civitai_info_tuple, recipe_scanner ) + if populated_entry is None: + continue # Skip invalid LoRA types + lora_entry = populated_entry except Exception as e: logger.error(f"Error fetching Civitai info for LoRA: {e}") @@ -959,13 +621,16 @@ class MetaFormatParser(RecipeMetadataParser): try: civitai_info = await civitai_client.get_model_by_hash(hash_value) # Populate lora entry with Civitai info - lora_entry = await self.populate_lora_from_civitai( + populated_entry = await self.populate_lora_from_civitai( lora_entry, civitai_info, recipe_scanner, base_model_counts, hash_value ) + if populated_entry is None: + continue # Skip invalid LoRA types + lora_entry = populated_entry except Exception as e: logger.error(f"Error fetching Civitai info for LoRA hash {hash_value}: {e}") @@ -1003,134 +668,284 @@ class MetaFormatParser(RecipeMetadataParser): logger.error(f"Error parsing meta format metadata: {e}", exc_info=True) return {"error": str(e), "loras": []} - -class ImageSaverMetadataParser(RecipeMetadataParser): - """Parser for ComfyUI Image Saver plugin metadata format""" +class AutomaticMetadataParser(RecipeMetadataParser): + """Parser for Automatic1111 metadata format""" - METADATA_MARKER = r'Hashes: \{.*\}' - LORA_PATTERN = r']+)>' - HASH_PATTERN = r'Hashes: (\{.*?\})' + METADATA_MARKER = r"Steps: \d+" + + # Regular expressions for extracting specific metadata + HASHES_REGEX = r', Hashes:\s*({[^}]+})' + CIVITAI_RESOURCES_REGEX = r', Civitai resources:\s*(\[\{.*?\}\])' + CIVITAI_METADATA_REGEX = r', Civitai metadata:\s*(\{.*?\})' + EXTRANETS_REGEX = r'<(lora|hypernet):([a-zA-Z0-9_\.\-]+):([0-9.]+)>' + MODEL_HASH_PATTERN = r'Model hash: ([a-zA-Z0-9]+)' + VAE_HASH_PATTERN = r'VAE hash: ([a-zA-Z0-9]+)' def is_metadata_matching(self, user_comment: str) -> bool: - """Check if the user comment matches the Image Saver metadata format""" - return "Hashes:" in user_comment and re.search(self.HASH_PATTERN, user_comment) is not None + """Check if the user comment matches the Automatic1111 format""" + # Match if it has Steps pattern and either has "Negative prompt:" or "Civitai resources:" + return (re.search(self.METADATA_MARKER, user_comment) is not None and + ("Negative prompt:" in user_comment or re.search(self.CIVITAI_RESOURCES_REGEX, user_comment) is not None)) async def parse_metadata(self, user_comment: str, recipe_scanner=None, civitai_client=None) -> Dict[str, Any]: - """Parse metadata from Image Saver plugin format""" + """Parse metadata from Automatic1111 format""" try: - # Extract prompt and negative prompt - parts = user_comment.split('Negative prompt:', 1) - prompt = parts[0].strip() + # Split on Negative prompt if it exists + if "Negative prompt:" in user_comment: + parts = user_comment.split('Negative prompt:', 1) + prompt = parts[0].strip() + negative_and_params = parts[1] if len(parts) > 1 else "" + else: + # No negative prompt section + param_start = re.search(self.METADATA_MARKER, user_comment) + if param_start: + prompt = user_comment[:param_start.start()].strip() + negative_and_params = user_comment[param_start.start():] + else: + prompt = user_comment.strip() + negative_and_params = "" # Initialize metadata - metadata = {"prompt": prompt, "loras": []} - - # Extract negative prompt and parameters - if len(parts) > 1: - negative_and_params = parts[1] - - # Extract negative prompt - if "Steps:" in negative_and_params: - neg_prompt = negative_and_params.split("Steps:", 1)[0].strip() - metadata["negative_prompt"] = neg_prompt - - # Extract key-value parameters (Steps, Sampler, CFG scale, etc.) - param_pattern = r'([A-Za-z ]+): ([^,]+)' - params = re.findall(param_pattern, negative_and_params) - for key, value in params: - clean_key = key.strip().lower().replace(' ', '_') - metadata[clean_key] = value.strip() - - # Extract LoRA information from prompt - lora_weights = {} - lora_matches = re.findall(self.LORA_PATTERN, prompt) - for lora_name, weight in lora_matches: - lora_weights[lora_name.strip()] = float(weight.split(':')[0].strip()) - - # Remove LoRA patterns from prompt - metadata["prompt"] = re.sub(self.LORA_PATTERN, '', prompt).strip() - - # Extract LoRA hashes from Hashes section - lora_hashes = {} - hash_match = re.search(self.HASH_PATTERN, user_comment) - if hash_match: - try: - hashes = json.loads(hash_match.group(1)) - for key, hash_value in hashes.items(): - if key.startswith('LORA:'): - lora_name = key[5:] # Remove 'LORA:' prefix - lora_hashes[lora_name] = hash_value.strip() - except json.JSONDecodeError: - pass - - # Process LoRAs and collect base models - base_model_counts = {} - loras = [] - - # Process each LoRA with hash and weight - for lora_name, hash_value in lora_hashes.items(): - weight = lora_weights.get(lora_name, 1.0) - - # Initialize lora entry with default values - lora_entry = { - 'name': lora_name, - 'type': 'lora', - 'weight': weight, - 'existsLocally': False, - 'localPath': None, - 'file_name': lora_name, - 'hash': hash_value, - 'thumbnailUrl': '/loras_static/images/no-preview.png', - 'baseModel': '', - 'size': 0, - 'downloadUrl': '', - 'isDeleted': False - } - - # Get info from Civitai by hash if available - if civitai_client and hash_value: - try: - civitai_info = await civitai_client.get_model_by_hash(hash_value) - # Populate lora entry with Civitai info - lora_entry = await self.populate_lora_from_civitai( - lora_entry, - civitai_info, - recipe_scanner, - base_model_counts, - hash_value - ) - except Exception as e: - logger.error(f"Error fetching Civitai info for LoRA hash {hash_value}: {e}") - - loras.append(lora_entry) - - # Set base_model to the most common one from civitai_info - base_model = None - if base_model_counts: - base_model = max(base_model_counts.items(), key=lambda x: x[1])[0] - - # Extract generation parameters for recipe metadata - gen_params = {} - for key in GEN_PARAM_KEYS: - if key in metadata: - gen_params[key] = metadata.get(key, '') - - # Add model information if available - if 'model' in metadata: - gen_params['checkpoint'] = metadata['model'] - - return { - 'base_model': base_model, - 'loras': loras, - 'gen_params': gen_params, - 'raw_metadata': metadata + metadata = { + "prompt": prompt, + "loras": [] } + # Extract negative prompt and parameters + if negative_and_params: + # If we split on "Negative prompt:", check for params section + if "Negative prompt:" in user_comment: + param_start = re.search(r'Steps: ', negative_and_params) + if param_start: + neg_prompt = negative_and_params[:param_start.start()].strip() + metadata["negative_prompt"] = neg_prompt + params_section = negative_and_params[param_start.start():] + else: + metadata["negative_prompt"] = negative_and_params.strip() + params_section = "" + else: + # No negative prompt, entire section is params + params_section = negative_and_params + + # Extract generation parameters + if params_section: + # Extract Civitai resources + civitai_resources_match = re.search(self.CIVITAI_RESOURCES_REGEX, params_section) + if civitai_resources_match: + try: + civitai_resources = json.loads(civitai_resources_match.group(1)) + metadata["civitai_resources"] = civitai_resources + params_section = params_section.replace(civitai_resources_match.group(0), '') + except json.JSONDecodeError: + logger.error("Error parsing Civitai resources JSON") + + # Extract Hashes + hashes_match = re.search(self.HASHES_REGEX, params_section) + if hashes_match: + try: + hashes = json.loads(hashes_match.group(1)) + metadata["hashes"] = hashes + # Remove hashes from params section to not interfere with other parsing + params_section = params_section.replace(hashes_match.group(0), '') + except json.JSONDecodeError: + logger.error("Error parsing hashes JSON") + + # Extract basic parameters + param_pattern = r'([A-Za-z\s]+): ([^,]+)' + params = re.findall(param_pattern, params_section) + gen_params = {} + + for key, value in params: + clean_key = key.strip().lower().replace(' ', '_') + + # Skip if not in recognized gen param keys + if clean_key not in GEN_PARAM_KEYS: + continue + + # Convert numeric values + if clean_key in ['steps', 'seed']: + try: + gen_params[clean_key] = int(value.strip()) + except ValueError: + gen_params[clean_key] = value.strip() + elif clean_key in ['cfg_scale']: + try: + gen_params[clean_key] = float(value.strip()) + except ValueError: + gen_params[clean_key] = value.strip() + else: + gen_params[clean_key] = value.strip() + + # Extract size if available and add to gen_params if a recognized key + size_match = re.search(r'Size: (\d+)x(\d+)', params_section) + if size_match and 'size' in GEN_PARAM_KEYS: + width, height = size_match.groups() + gen_params['size'] = f"{width}x{height}" + + # Add prompt and negative_prompt to gen_params if they're in GEN_PARAM_KEYS + if 'prompt' in GEN_PARAM_KEYS and 'prompt' in metadata: + gen_params['prompt'] = metadata['prompt'] + if 'negative_prompt' in GEN_PARAM_KEYS and 'negative_prompt' in metadata: + gen_params['negative_prompt'] = metadata['negative_prompt'] + + metadata["gen_params"] = gen_params + + # Extract LoRA information + loras = [] + base_model_counts = {} + + # First use Civitai resources if available (more reliable source) + if metadata.get("civitai_resources"): + for resource in metadata.get("civitai_resources", []): + if resource.get("type") in ["lora", "hypernet"] and resource.get("modelVersionId"): + # Initialize lora entry + lora_entry = { + 'id': str(resource.get("modelVersionId")), + 'modelId': str(resource.get("modelId")) if resource.get("modelId") else None, + 'name': resource.get("modelName", "Unknown LoRA"), + 'version': resource.get("modelVersionName", ""), + 'type': resource.get("type", "lora"), + 'weight': float(resource.get("weight", 1.0)), + 'existsLocally': False, + 'thumbnailUrl': '/loras_static/images/no-preview.png', + 'baseModel': '', + 'size': 0, + 'downloadUrl': '', + 'isDeleted': False + } + + # Get additional info from Civitai + if civitai_client: + try: + civitai_info = await civitai_client.get_model_version_info(resource.get("modelVersionId")) + populated_entry = await self.populate_lora_from_civitai( + lora_entry, + civitai_info, + recipe_scanner, + base_model_counts + ) + if populated_entry is None: + continue # Skip invalid LoRA types + lora_entry = populated_entry + except Exception as e: + logger.error(f"Error fetching Civitai info for LoRA {lora_entry['name']}: {e}") + + loras.append(lora_entry) + + # If no LoRAs from Civitai resources or to supplement, extract from prompt tags + if not loras or len(loras) == 0: + # Extract LoRAs from extranet tags in prompt + lora_matches = re.findall(self.EXTRANETS_REGEX, prompt) + for lora_type, lora_name, lora_weight in lora_matches: + # Initialize lora entry + lora_entry = { + 'name': lora_name, + 'type': lora_type, # 'lora' or 'hypernet' + 'weight': float(lora_weight), + 'existsLocally': False, + 'localPath': None, + 'file_name': lora_name, + 'thumbnailUrl': '/loras_static/images/no-preview.png', + 'baseModel': '', + 'size': 0, + 'downloadUrl': '', + 'isDeleted': False + } + + # Check for hash from hashes dict + lora_hash = None + if metadata.get("hashes") and f"{lora_type}:{lora_name}" in metadata["hashes"]: + lora_hash = metadata["hashes"][f"{lora_type}:{lora_name}"] + lora_entry['hash'] = lora_hash + + # Get additional info from Civitai either by hash or by checking civitai_resources + model_version_id = None + + # First check if we have model version ID from civitai_resources + if metadata.get("civitai_resources"): + for resource in metadata["civitai_resources"]: + if (lora_type == resource.get("type") and + (lora_name.lower() in resource.get("modelName", "").lower() or + resource.get("modelName", "").lower() in lora_name.lower()) and + resource.get("modelVersionId")): + model_version_id = resource.get("modelVersionId") + lora_entry['id'] = str(model_version_id) + break + + # Try to get info from Civitai + if civitai_client: + try: + if lora_hash: + # If we have hash, use it for lookup + civitai_info = await civitai_client.get_model_by_hash(lora_hash) + elif model_version_id: + # If we have model version ID, use that + civitai_info = await civitai_client.get_model_version_info(model_version_id) + else: + civitai_info = None + + # Populate lora entry with Civitai info if available + if civitai_info: + populated_entry = await self.populate_lora_from_civitai( + lora_entry, + civitai_info, + recipe_scanner, + base_model_counts, + lora_hash + ) + if populated_entry is None: + continue # Skip invalid LoRA types + lora_entry = populated_entry + except Exception as e: + logger.error(f"Error fetching Civitai info for LoRA {lora_name}: {e}") + + # Check if we can find it locally + if lora_hash and recipe_scanner: + lora_scanner = recipe_scanner._lora_scanner + exists_locally = lora_scanner.has_lora_hash(lora_hash) + if exists_locally: + try: + lora_cache = await lora_scanner.get_cached_data() + lora_item = next((item for item in lora_cache.raw_data + if item['sha256'].lower() == lora_hash.lower()), None) + if lora_item: + lora_entry['existsLocally'] = True + lora_entry['localPath'] = lora_item['file_path'] + lora_entry['file_name'] = lora_item['file_name'] + lora_entry['size'] = lora_item['size'] + if 'preview_url' in lora_item: + lora_entry['thumbnailUrl'] = config.get_preview_static_url(lora_item['preview_url']) + except Exception as e: + logger.error(f"Error getting local lora path: {e}") + + loras.append(lora_entry) + + # Try to get base model from resources or make educated guess + base_model = None + if base_model_counts: + # Use the most common base model from the loras + base_model = max(base_model_counts.items(), key=lambda x: x[1])[0] + + # Prepare final result structure + # Make sure gen_params only contains recognized keys + filtered_gen_params = {} + for key in GEN_PARAM_KEYS: + if key in metadata.get("gen_params", {}): + filtered_gen_params[key] = metadata["gen_params"][key] + + result = { + 'base_model': base_model, + 'loras': loras, + 'gen_params': filtered_gen_params, + 'from_automatic_metadata': True + } + + return result + except Exception as e: - logger.error(f"Error parsing Image Saver metadata: {e}", exc_info=True) + logger.error(f"Error parsing Automatic1111 metadata: {e}", exc_info=True) return {"error": str(e), "loras": []} - class RecipeParserFactory: """Factory for creating recipe metadata parsers""" @@ -1155,13 +970,9 @@ class RecipeParserFactory: if RecipeFormatParser().is_metadata_matching(user_comment): return RecipeFormatParser() - elif StandardMetadataParser().is_metadata_matching(user_comment): - return StandardMetadataParser() - elif A1111MetadataParser().is_metadata_matching(user_comment): - return A1111MetadataParser() + elif AutomaticMetadataParser().is_metadata_matching(user_comment): + return AutomaticMetadataParser() elif MetaFormatParser().is_metadata_matching(user_comment): return MetaFormatParser() - elif ImageSaverMetadataParser().is_metadata_matching(user_comment): - return ImageSaverMetadataParser() else: return None