mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-26 15:38:52 -03:00
refactor: remove StandardMetadataParser and ImageSaverMetadataParser, integrate AutomaticMetadataParser for improved metadata handling
This commit is contained in:
@@ -20,6 +20,9 @@ GEN_PARAM_KEYS = [
|
|||||||
'clip_skip',
|
'clip_skip',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# Valid Lora types
|
||||||
|
VALID_LORA_TYPES = ['lora', 'locon']
|
||||||
|
|
||||||
class RecipeMetadataParser(ABC):
|
class RecipeMetadataParser(ABC):
|
||||||
"""Interface for parsing recipe metadata from image user comments"""
|
"""Interface for parsing recipe metadata from image user comments"""
|
||||||
|
|
||||||
@@ -46,7 +49,7 @@ class RecipeMetadataParser(ABC):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
async def populate_lora_from_civitai(self, lora_entry: Dict[str, Any], civitai_info_tuple: Tuple[Dict[str, Any], Optional[str]],
|
async def populate_lora_from_civitai(self, lora_entry: Dict[str, Any], civitai_info_tuple: Tuple[Dict[str, Any], Optional[str]],
|
||||||
recipe_scanner=None, base_model_counts=None, hash_value=None) -> Dict[str, Any]:
|
recipe_scanner=None, base_model_counts=None, hash_value=None) -> Optional[Dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
Populate a lora entry with information from Civitai API response
|
Populate a lora entry with information from Civitai API response
|
||||||
|
|
||||||
@@ -58,13 +61,25 @@ class RecipeMetadataParser(ABC):
|
|||||||
hash_value: Optional hash value to use if not available in civitai_info
|
hash_value: Optional hash value to use if not available in civitai_info
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The populated lora_entry dict
|
The populated lora_entry dict if type is valid, None otherwise
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Unpack the tuple to get the actual data
|
# Unpack the tuple to get the actual data
|
||||||
civitai_info, error_msg = civitai_info_tuple if isinstance(civitai_info_tuple, tuple) else (civitai_info_tuple, None)
|
civitai_info, error_msg = civitai_info_tuple if isinstance(civitai_info_tuple, tuple) else (civitai_info_tuple, None)
|
||||||
|
|
||||||
if civitai_info and civitai_info.get("error") != "Model not found":
|
if not civitai_info or civitai_info.get("error") == "Model not found":
|
||||||
|
# Model not found or deleted
|
||||||
|
lora_entry['isDeleted'] = True
|
||||||
|
lora_entry['thumbnailUrl'] = '/loras_static/images/no-preview.png'
|
||||||
|
return lora_entry
|
||||||
|
|
||||||
|
# Get model type and validate
|
||||||
|
model_type = civitai_info.get('model', {}).get('type', '').lower()
|
||||||
|
lora_entry['type'] = model_type
|
||||||
|
if model_type not in VALID_LORA_TYPES:
|
||||||
|
logger.debug(f"Skipping non-LoRA model type: {model_type}")
|
||||||
|
return None
|
||||||
|
|
||||||
# Check if this is an early access lora
|
# Check if this is an early access lora
|
||||||
if civitai_info.get('earlyAccessEndsAt'):
|
if civitai_info.get('earlyAccessEndsAt'):
|
||||||
# Convert earlyAccessEndsAt to a human-readable date
|
# Convert earlyAccessEndsAt to a human-readable date
|
||||||
@@ -133,10 +148,6 @@ class RecipeMetadataParser(ABC):
|
|||||||
# For missing LoRAs, get file_name from model_file.name
|
# For missing LoRAs, get file_name from model_file.name
|
||||||
file_name = model_file.get('name', '')
|
file_name = model_file.get('name', '')
|
||||||
lora_entry['file_name'] = os.path.splitext(file_name)[0] if file_name else ''
|
lora_entry['file_name'] = os.path.splitext(file_name)[0] if file_name else ''
|
||||||
else:
|
|
||||||
# Model not found or deleted
|
|
||||||
lora_entry['isDeleted'] = True
|
|
||||||
lora_entry['thumbnailUrl'] = '/loras_static/images/no-preview.png'
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error populating lora from Civitai info: {e}")
|
logger.error(f"Error populating lora from Civitai info: {e}")
|
||||||
@@ -247,13 +258,16 @@ class RecipeFormatParser(RecipeMetadataParser):
|
|||||||
try:
|
try:
|
||||||
civitai_info_tuple = await civitai_client.get_model_version_info(lora['modelVersionId'])
|
civitai_info_tuple = await civitai_client.get_model_version_info(lora['modelVersionId'])
|
||||||
# Populate lora entry with Civitai info
|
# Populate lora entry with Civitai info
|
||||||
lora_entry = await self.populate_lora_from_civitai(
|
populated_entry = await self.populate_lora_from_civitai(
|
||||||
lora_entry,
|
lora_entry,
|
||||||
civitai_info_tuple,
|
civitai_info_tuple,
|
||||||
recipe_scanner,
|
recipe_scanner,
|
||||||
None, # No need to track base model counts
|
None, # No need to track base model counts
|
||||||
lora['hash']
|
lora['hash']
|
||||||
)
|
)
|
||||||
|
if populated_entry is None:
|
||||||
|
continue # Skip invalid LoRA types
|
||||||
|
lora_entry = populated_entry
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error fetching Civitai info for LoRA: {e}")
|
logger.error(f"Error fetching Civitai info for LoRA: {e}")
|
||||||
lora_entry['thumbnailUrl'] = '/loras_static/images/no-preview.png'
|
lora_entry['thumbnailUrl'] = '/loras_static/images/no-preview.png'
|
||||||
@@ -282,361 +296,6 @@ class RecipeFormatParser(RecipeMetadataParser):
|
|||||||
logger.error(f"Error parsing recipe format metadata: {e}", exc_info=True)
|
logger.error(f"Error parsing recipe format metadata: {e}", exc_info=True)
|
||||||
return {"error": str(e), "loras": []}
|
return {"error": str(e), "loras": []}
|
||||||
|
|
||||||
|
|
||||||
class StandardMetadataParser(RecipeMetadataParser):
|
|
||||||
"""Parser for images with standard civitai metadata format (prompt, negative prompt, etc.)"""
|
|
||||||
|
|
||||||
METADATA_MARKER = r'Civitai resources: '
|
|
||||||
|
|
||||||
def is_metadata_matching(self, user_comment: str) -> bool:
|
|
||||||
"""Check if the user comment matches the metadata format"""
|
|
||||||
return re.search(self.METADATA_MARKER, user_comment, re.IGNORECASE | re.DOTALL) is not None
|
|
||||||
|
|
||||||
async def parse_metadata(self, user_comment: str, recipe_scanner=None, civitai_client=None) -> Dict[str, Any]:
|
|
||||||
"""Parse metadata from images with standard metadata format"""
|
|
||||||
try:
|
|
||||||
# Parse the standard metadata
|
|
||||||
metadata = self._parse_recipe_metadata(user_comment)
|
|
||||||
|
|
||||||
# Look for Civitai resources in the metadata
|
|
||||||
civitai_resources = metadata.get('loras', [])
|
|
||||||
checkpoint = metadata.get('checkpoint')
|
|
||||||
|
|
||||||
if not civitai_resources and not checkpoint:
|
|
||||||
return {
|
|
||||||
"error": "No LoRA information found in this image",
|
|
||||||
"loras": []
|
|
||||||
}
|
|
||||||
|
|
||||||
# Process LoRAs and collect base models
|
|
||||||
base_model_counts = {}
|
|
||||||
loras = []
|
|
||||||
|
|
||||||
# Process LoRAs
|
|
||||||
for resource in civitai_resources:
|
|
||||||
# Get model version ID
|
|
||||||
model_version_id = resource.get('modelVersionId')
|
|
||||||
if not model_version_id:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Initialize lora entry with default values
|
|
||||||
lora_entry = {
|
|
||||||
'id': model_version_id,
|
|
||||||
'name': resource.get('modelName', ''),
|
|
||||||
'version': resource.get('modelVersionName', ''),
|
|
||||||
'type': resource.get('type', 'lora'),
|
|
||||||
'weight': resource.get('weight', 1.0),
|
|
||||||
'existsLocally': False,
|
|
||||||
'localPath': None,
|
|
||||||
'file_name': '',
|
|
||||||
'hash': '',
|
|
||||||
'thumbnailUrl': '',
|
|
||||||
'baseModel': '',
|
|
||||||
'size': 0,
|
|
||||||
'downloadUrl': '',
|
|
||||||
'isDeleted': False
|
|
||||||
}
|
|
||||||
|
|
||||||
# Get additional info from Civitai if client is available
|
|
||||||
if civitai_client:
|
|
||||||
try:
|
|
||||||
civitai_info_tuple = await civitai_client.get_model_version_info(model_version_id)
|
|
||||||
# Populate lora entry with Civitai info
|
|
||||||
lora_entry = await self.populate_lora_from_civitai(
|
|
||||||
lora_entry,
|
|
||||||
civitai_info_tuple,
|
|
||||||
recipe_scanner,
|
|
||||||
base_model_counts
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error fetching Civitai info for LoRA: {e}")
|
|
||||||
|
|
||||||
loras.append(lora_entry)
|
|
||||||
|
|
||||||
# Set base_model to the most common one from civitai_info
|
|
||||||
base_model = None
|
|
||||||
if base_model_counts:
|
|
||||||
base_model = max(base_model_counts.items(), key=lambda x: x[1])[0]
|
|
||||||
|
|
||||||
# Extract generation parameters for recipe metadata
|
|
||||||
gen_params = {}
|
|
||||||
for key in GEN_PARAM_KEYS:
|
|
||||||
if key in metadata:
|
|
||||||
gen_params[key] = metadata.get(key, '')
|
|
||||||
|
|
||||||
return {
|
|
||||||
'base_model': base_model,
|
|
||||||
'loras': loras,
|
|
||||||
'gen_params': gen_params,
|
|
||||||
'raw_metadata': metadata
|
|
||||||
}
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error parsing standard metadata: {e}", exc_info=True)
|
|
||||||
return {"error": str(e), "loras": []}
|
|
||||||
|
|
||||||
def _parse_recipe_metadata(self, user_comment: str) -> Dict[str, Any]:
|
|
||||||
"""Parse recipe metadata from UserComment"""
|
|
||||||
try:
|
|
||||||
# Split by 'Negative prompt:' to get the prompt
|
|
||||||
parts = user_comment.split('Negative prompt:', 1)
|
|
||||||
prompt = parts[0].strip()
|
|
||||||
|
|
||||||
# Initialize metadata with prompt
|
|
||||||
metadata = {"prompt": prompt, "loras": [], "checkpoint": None}
|
|
||||||
|
|
||||||
# Extract additional fields if available
|
|
||||||
if len(parts) > 1:
|
|
||||||
negative_and_params = parts[1]
|
|
||||||
|
|
||||||
# Extract negative prompt
|
|
||||||
if "Steps:" in negative_and_params:
|
|
||||||
neg_prompt = negative_and_params.split("Steps:", 1)[0].strip()
|
|
||||||
metadata["negative_prompt"] = neg_prompt
|
|
||||||
|
|
||||||
# Extract key-value parameters (Steps, Sampler, CFG scale, etc.)
|
|
||||||
param_pattern = r'([A-Za-z ]+): ([^,]+)'
|
|
||||||
params = re.findall(param_pattern, negative_and_params)
|
|
||||||
for key, value in params:
|
|
||||||
clean_key = key.strip().lower().replace(' ', '_')
|
|
||||||
metadata[clean_key] = value.strip()
|
|
||||||
|
|
||||||
# Extract Civitai resources
|
|
||||||
if 'Civitai resources:' in user_comment:
|
|
||||||
resources_part = user_comment.split('Civitai resources:', 1)[1].strip()
|
|
||||||
|
|
||||||
# Look for the opening and closing brackets to extract the JSON array
|
|
||||||
if resources_part.startswith('['):
|
|
||||||
# Find the position of the closing bracket
|
|
||||||
bracket_count = 0
|
|
||||||
end_pos = -1
|
|
||||||
|
|
||||||
for i, char in enumerate(resources_part):
|
|
||||||
if char == '[':
|
|
||||||
bracket_count += 1
|
|
||||||
elif char == ']':
|
|
||||||
bracket_count -= 1
|
|
||||||
if bracket_count == 0:
|
|
||||||
end_pos = i
|
|
||||||
break
|
|
||||||
|
|
||||||
if end_pos != -1:
|
|
||||||
resources_json = resources_part[:end_pos+1]
|
|
||||||
try:
|
|
||||||
resources = json.loads(resources_json)
|
|
||||||
# Filter loras and checkpoints
|
|
||||||
for resource in resources:
|
|
||||||
# Process both 'lora' and 'lycoris' types as loras
|
|
||||||
if resource.get('type') == 'lora' or resource.get('type') == 'lycoris':
|
|
||||||
# Ensure weight field is properly preserved
|
|
||||||
lora_entry = resource.copy()
|
|
||||||
# Default to 1.0 if weight not found
|
|
||||||
if 'weight' not in lora_entry:
|
|
||||||
lora_entry['weight'] = 1.0
|
|
||||||
# Ensure modelVersionName is included
|
|
||||||
if 'modelVersionName' not in lora_entry:
|
|
||||||
lora_entry['modelVersionName'] = ''
|
|
||||||
metadata['loras'].append(lora_entry)
|
|
||||||
elif resource.get('type') == 'checkpoint':
|
|
||||||
metadata['checkpoint'] = resource
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
return metadata
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error parsing recipe metadata: {e}")
|
|
||||||
return {"prompt": user_comment, "loras": [], "checkpoint": None}
|
|
||||||
|
|
||||||
|
|
||||||
class A1111MetadataParser(RecipeMetadataParser):
|
|
||||||
"""Parser for images with A1111 metadata format (Lora hashes)"""
|
|
||||||
|
|
||||||
METADATA_MARKER = r'Lora hashes:'
|
|
||||||
LORA_PATTERN = r'<lora:([^:]+):([^>]+)>'
|
|
||||||
LORA_HASH_PATTERN = r'([^:]+):\s*([a-fA-F0-9]+)'
|
|
||||||
|
|
||||||
def is_metadata_matching(self, user_comment: str) -> bool:
|
|
||||||
"""Check if the user comment matches the A1111 metadata format"""
|
|
||||||
return 'Lora hashes:' in user_comment
|
|
||||||
|
|
||||||
async def parse_metadata(self, user_comment: str, recipe_scanner=None, civitai_client=None) -> Dict[str, Any]:
|
|
||||||
"""Parse metadata from images with A1111 metadata format"""
|
|
||||||
try:
|
|
||||||
# Initialize metadata with default empty values
|
|
||||||
metadata = {"prompt": "", "loras": []}
|
|
||||||
|
|
||||||
# Check if the user_comment contains prompt and negative prompt
|
|
||||||
if 'Negative prompt:' in user_comment:
|
|
||||||
# Extract prompt and negative prompt
|
|
||||||
parts = user_comment.split('Negative prompt:', 1)
|
|
||||||
metadata["prompt"] = parts[0].strip()
|
|
||||||
|
|
||||||
# Extract negative prompt and parameters
|
|
||||||
if len(parts) > 1:
|
|
||||||
negative_and_params = parts[1]
|
|
||||||
|
|
||||||
# Extract negative prompt
|
|
||||||
param_start = re.search(r'([A-Za-z ]+):', negative_and_params)
|
|
||||||
if param_start:
|
|
||||||
neg_prompt = negative_and_params[:param_start.start()].strip()
|
|
||||||
metadata["negative_prompt"] = neg_prompt
|
|
||||||
params_section = negative_and_params[param_start.start():]
|
|
||||||
else:
|
|
||||||
params_section = negative_and_params
|
|
||||||
|
|
||||||
# Extract parameters from this section
|
|
||||||
self._extract_parameters(params_section, metadata)
|
|
||||||
else:
|
|
||||||
# No prompt/negative prompt - extract parameters directly
|
|
||||||
self._extract_parameters(user_comment, metadata)
|
|
||||||
|
|
||||||
# Extract LoRA information from prompt if available
|
|
||||||
lora_weights = {}
|
|
||||||
if metadata["prompt"]:
|
|
||||||
lora_matches = re.findall(self.LORA_PATTERN, metadata["prompt"])
|
|
||||||
for lora_name, weights in lora_matches:
|
|
||||||
# Take only the first strength value (before the colon)
|
|
||||||
weight = weights.split(':')[0]
|
|
||||||
lora_weights[lora_name.strip()] = float(weight.strip())
|
|
||||||
|
|
||||||
# Remove LoRA patterns from prompt
|
|
||||||
metadata["prompt"] = re.sub(self.LORA_PATTERN, '', metadata["prompt"]).strip()
|
|
||||||
|
|
||||||
# Extract LoRA hashes
|
|
||||||
lora_hashes = {}
|
|
||||||
if 'Lora hashes:' in user_comment:
|
|
||||||
# Get the LoRA hashes section
|
|
||||||
lora_hash_section = user_comment.split('Lora hashes:', 1)[1].strip()
|
|
||||||
|
|
||||||
# Handle various format possibilities
|
|
||||||
if lora_hash_section.startswith('"'):
|
|
||||||
# Extract content within quotes
|
|
||||||
quote_match = re.match(r'"([^"]+)"', lora_hash_section)
|
|
||||||
if quote_match:
|
|
||||||
lora_hash_section = quote_match.group(1)
|
|
||||||
|
|
||||||
# Split by commas and parse each LoRA entry
|
|
||||||
lora_entries = []
|
|
||||||
current_entry = ""
|
|
||||||
for part in lora_hash_section.split(','):
|
|
||||||
# Check if this part contains a colon (indicating a complete entry)
|
|
||||||
if ':' in part:
|
|
||||||
if current_entry:
|
|
||||||
lora_entries.append(current_entry.strip())
|
|
||||||
current_entry = part.strip()
|
|
||||||
else:
|
|
||||||
# This is probably a continuation of the previous entry
|
|
||||||
current_entry += ',' + part
|
|
||||||
|
|
||||||
# Add the last entry if it exists
|
|
||||||
if current_entry:
|
|
||||||
lora_entries.append(current_entry.strip())
|
|
||||||
|
|
||||||
# Process each entry
|
|
||||||
for entry in lora_entries:
|
|
||||||
# Split at the colon to get name and hash
|
|
||||||
if ':' in entry:
|
|
||||||
lora_name, hash_value = entry.split(':', 1)
|
|
||||||
# Clean the values
|
|
||||||
lora_name = lora_name.strip()
|
|
||||||
hash_value = hash_value.strip()
|
|
||||||
# Store in our dictionary
|
|
||||||
lora_hashes[lora_name] = hash_value
|
|
||||||
|
|
||||||
# Alternative backup method using regex if the above parsing fails
|
|
||||||
if not lora_hashes:
|
|
||||||
if 'Lora hashes:' in user_comment:
|
|
||||||
lora_hash_section = user_comment.split('Lora hashes:', 1)[1].strip()
|
|
||||||
if lora_hash_section.startswith('"'):
|
|
||||||
# Extract content within quotes if present
|
|
||||||
quote_match = re.match(r'"([^"]+)"', lora_hash_section)
|
|
||||||
if quote_match:
|
|
||||||
lora_hash_section = quote_match.group(1)
|
|
||||||
|
|
||||||
# Use regex to find all name:hash pairs
|
|
||||||
hash_matches = re.findall(self.LORA_HASH_PATTERN, lora_hash_section)
|
|
||||||
for lora_name, hash_value in hash_matches:
|
|
||||||
# Clean up name by removing any leading comma and spaces
|
|
||||||
clean_name = lora_name.strip().lstrip(',').strip()
|
|
||||||
lora_hashes[clean_name] = hash_value.strip()
|
|
||||||
|
|
||||||
# Process LoRAs and collect base models
|
|
||||||
base_model_counts = {}
|
|
||||||
loras = []
|
|
||||||
|
|
||||||
# Process each LoRA with hash and weight
|
|
||||||
for lora_name, hash_value in lora_hashes.items():
|
|
||||||
weight = lora_weights.get(lora_name, 1.0)
|
|
||||||
|
|
||||||
# Initialize lora entry with default values
|
|
||||||
lora_entry = {
|
|
||||||
'name': lora_name,
|
|
||||||
'type': 'lora',
|
|
||||||
'weight': weight,
|
|
||||||
'existsLocally': False,
|
|
||||||
'localPath': None,
|
|
||||||
'file_name': lora_name,
|
|
||||||
'hash': hash_value.lower(), # Ensure hash is lowercase
|
|
||||||
'thumbnailUrl': '/loras_static/images/no-preview.png',
|
|
||||||
'baseModel': '',
|
|
||||||
'size': 0,
|
|
||||||
'downloadUrl': '',
|
|
||||||
'isDeleted': False
|
|
||||||
}
|
|
||||||
|
|
||||||
# Get info from Civitai by hash if available
|
|
||||||
if civitai_client and hash_value:
|
|
||||||
try:
|
|
||||||
civitai_info = await civitai_client.get_model_by_hash(hash_value)
|
|
||||||
# Populate lora entry with Civitai info
|
|
||||||
lora_entry = await self.populate_lora_from_civitai(
|
|
||||||
lora_entry,
|
|
||||||
civitai_info,
|
|
||||||
recipe_scanner,
|
|
||||||
base_model_counts,
|
|
||||||
hash_value
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error fetching Civitai info for LoRA hash {hash_value}: {e}")
|
|
||||||
|
|
||||||
loras.append(lora_entry)
|
|
||||||
|
|
||||||
# Set base_model to the most common one from civitai_info
|
|
||||||
base_model = None
|
|
||||||
if base_model_counts:
|
|
||||||
base_model = max(base_model_counts.items(), key=lambda x: x[1])[0]
|
|
||||||
|
|
||||||
# Extract generation parameters for recipe metadata
|
|
||||||
gen_params = {}
|
|
||||||
for key in GEN_PARAM_KEYS:
|
|
||||||
if key in metadata:
|
|
||||||
gen_params[key] = metadata.get(key, '')
|
|
||||||
|
|
||||||
# Add model information if available
|
|
||||||
if 'model' in metadata:
|
|
||||||
gen_params['checkpoint'] = metadata['model']
|
|
||||||
|
|
||||||
return {
|
|
||||||
'base_model': base_model,
|
|
||||||
'loras': loras,
|
|
||||||
'gen_params': gen_params,
|
|
||||||
'raw_metadata': metadata
|
|
||||||
}
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error parsing A1111 metadata: {e}", exc_info=True)
|
|
||||||
return {"error": str(e), "loras": []}
|
|
||||||
|
|
||||||
def _extract_parameters(self, text: str, metadata: Dict[str, Any]) -> None:
|
|
||||||
"""Extract parameters from text section and populate metadata dict"""
|
|
||||||
# Extract key-value parameters (Steps, Sampler, CFG scale, etc.)
|
|
||||||
param_pattern = r'([A-Za-z][A-Za-z0-9 _]+): ([^,]+)(?:,|$)'
|
|
||||||
params = re.findall(param_pattern, text)
|
|
||||||
for key, value in params:
|
|
||||||
clean_key = key.strip().lower().replace(' ', '_')
|
|
||||||
metadata[clean_key] = value.strip()
|
|
||||||
|
|
||||||
|
|
||||||
class ComfyMetadataParser(RecipeMetadataParser):
|
class ComfyMetadataParser(RecipeMetadataParser):
|
||||||
"""Parser for Civitai ComfyUI metadata JSON format"""
|
"""Parser for Civitai ComfyUI metadata JSON format"""
|
||||||
|
|
||||||
@@ -706,11 +365,14 @@ class ComfyMetadataParser(RecipeMetadataParser):
|
|||||||
try:
|
try:
|
||||||
civitai_info_tuple = await civitai_client.get_model_version_info(model_version_id)
|
civitai_info_tuple = await civitai_client.get_model_version_info(model_version_id)
|
||||||
# Populate lora entry with Civitai info
|
# Populate lora entry with Civitai info
|
||||||
lora_entry = await self.populate_lora_from_civitai(
|
populated_entry = await self.populate_lora_from_civitai(
|
||||||
lora_entry,
|
lora_entry,
|
||||||
civitai_info_tuple,
|
civitai_info_tuple,
|
||||||
recipe_scanner
|
recipe_scanner
|
||||||
)
|
)
|
||||||
|
if populated_entry is None:
|
||||||
|
continue # Skip invalid LoRA types
|
||||||
|
lora_entry = populated_entry
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error fetching Civitai info for LoRA: {e}")
|
logger.error(f"Error fetching Civitai info for LoRA: {e}")
|
||||||
|
|
||||||
@@ -959,13 +621,16 @@ class MetaFormatParser(RecipeMetadataParser):
|
|||||||
try:
|
try:
|
||||||
civitai_info = await civitai_client.get_model_by_hash(hash_value)
|
civitai_info = await civitai_client.get_model_by_hash(hash_value)
|
||||||
# Populate lora entry with Civitai info
|
# Populate lora entry with Civitai info
|
||||||
lora_entry = await self.populate_lora_from_civitai(
|
populated_entry = await self.populate_lora_from_civitai(
|
||||||
lora_entry,
|
lora_entry,
|
||||||
civitai_info,
|
civitai_info,
|
||||||
recipe_scanner,
|
recipe_scanner,
|
||||||
base_model_counts,
|
base_model_counts,
|
||||||
hash_value
|
hash_value
|
||||||
)
|
)
|
||||||
|
if populated_entry is None:
|
||||||
|
continue # Skip invalid LoRA types
|
||||||
|
lora_entry = populated_entry
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error fetching Civitai info for LoRA hash {hash_value}: {e}")
|
logger.error(f"Error fetching Civitai info for LoRA hash {hash_value}: {e}")
|
||||||
|
|
||||||
@@ -1003,83 +668,145 @@ class MetaFormatParser(RecipeMetadataParser):
|
|||||||
logger.error(f"Error parsing meta format metadata: {e}", exc_info=True)
|
logger.error(f"Error parsing meta format metadata: {e}", exc_info=True)
|
||||||
return {"error": str(e), "loras": []}
|
return {"error": str(e), "loras": []}
|
||||||
|
|
||||||
|
class AutomaticMetadataParser(RecipeMetadataParser):
|
||||||
|
"""Parser for Automatic1111 metadata format"""
|
||||||
|
|
||||||
class ImageSaverMetadataParser(RecipeMetadataParser):
|
METADATA_MARKER = r"Steps: \d+"
|
||||||
"""Parser for ComfyUI Image Saver plugin metadata format"""
|
|
||||||
|
|
||||||
METADATA_MARKER = r'Hashes: \{.*\}'
|
# Regular expressions for extracting specific metadata
|
||||||
LORA_PATTERN = r'<lora:([^:]+):([^>]+)>'
|
HASHES_REGEX = r', Hashes:\s*({[^}]+})'
|
||||||
HASH_PATTERN = r'Hashes: (\{.*?\})'
|
CIVITAI_RESOURCES_REGEX = r', Civitai resources:\s*(\[\{.*?\}\])'
|
||||||
|
CIVITAI_METADATA_REGEX = r', Civitai metadata:\s*(\{.*?\})'
|
||||||
|
EXTRANETS_REGEX = r'<(lora|hypernet):([a-zA-Z0-9_\.\-]+):([0-9.]+)>'
|
||||||
|
MODEL_HASH_PATTERN = r'Model hash: ([a-zA-Z0-9]+)'
|
||||||
|
VAE_HASH_PATTERN = r'VAE hash: ([a-zA-Z0-9]+)'
|
||||||
|
|
||||||
def is_metadata_matching(self, user_comment: str) -> bool:
|
def is_metadata_matching(self, user_comment: str) -> bool:
|
||||||
"""Check if the user comment matches the Image Saver metadata format"""
|
"""Check if the user comment matches the Automatic1111 format"""
|
||||||
return "Hashes:" in user_comment and re.search(self.HASH_PATTERN, user_comment) is not None
|
# Match if it has Steps pattern and either has "Negative prompt:" or "Civitai resources:"
|
||||||
|
return (re.search(self.METADATA_MARKER, user_comment) is not None and
|
||||||
|
("Negative prompt:" in user_comment or re.search(self.CIVITAI_RESOURCES_REGEX, user_comment) is not None))
|
||||||
|
|
||||||
async def parse_metadata(self, user_comment: str, recipe_scanner=None, civitai_client=None) -> Dict[str, Any]:
|
async def parse_metadata(self, user_comment: str, recipe_scanner=None, civitai_client=None) -> Dict[str, Any]:
|
||||||
"""Parse metadata from Image Saver plugin format"""
|
"""Parse metadata from Automatic1111 format"""
|
||||||
try:
|
try:
|
||||||
# Extract prompt and negative prompt
|
# Split on Negative prompt if it exists
|
||||||
|
if "Negative prompt:" in user_comment:
|
||||||
parts = user_comment.split('Negative prompt:', 1)
|
parts = user_comment.split('Negative prompt:', 1)
|
||||||
prompt = parts[0].strip()
|
prompt = parts[0].strip()
|
||||||
|
negative_and_params = parts[1] if len(parts) > 1 else ""
|
||||||
|
else:
|
||||||
|
# No negative prompt section
|
||||||
|
param_start = re.search(self.METADATA_MARKER, user_comment)
|
||||||
|
if param_start:
|
||||||
|
prompt = user_comment[:param_start.start()].strip()
|
||||||
|
negative_and_params = user_comment[param_start.start():]
|
||||||
|
else:
|
||||||
|
prompt = user_comment.strip()
|
||||||
|
negative_and_params = ""
|
||||||
|
|
||||||
# Initialize metadata
|
# Initialize metadata
|
||||||
metadata = {"prompt": prompt, "loras": []}
|
metadata = {
|
||||||
|
"prompt": prompt,
|
||||||
|
"loras": []
|
||||||
|
}
|
||||||
|
|
||||||
# Extract negative prompt and parameters
|
# Extract negative prompt and parameters
|
||||||
if len(parts) > 1:
|
if negative_and_params:
|
||||||
negative_and_params = parts[1]
|
# If we split on "Negative prompt:", check for params section
|
||||||
|
if "Negative prompt:" in user_comment:
|
||||||
# Extract negative prompt
|
param_start = re.search(r'Steps: ', negative_and_params)
|
||||||
if "Steps:" in negative_and_params:
|
if param_start:
|
||||||
neg_prompt = negative_and_params.split("Steps:", 1)[0].strip()
|
neg_prompt = negative_and_params[:param_start.start()].strip()
|
||||||
metadata["negative_prompt"] = neg_prompt
|
metadata["negative_prompt"] = neg_prompt
|
||||||
|
params_section = negative_and_params[param_start.start():]
|
||||||
|
else:
|
||||||
|
metadata["negative_prompt"] = negative_and_params.strip()
|
||||||
|
params_section = ""
|
||||||
|
else:
|
||||||
|
# No negative prompt, entire section is params
|
||||||
|
params_section = negative_and_params
|
||||||
|
|
||||||
|
# Extract generation parameters
|
||||||
|
if params_section:
|
||||||
|
# Extract Civitai resources
|
||||||
|
civitai_resources_match = re.search(self.CIVITAI_RESOURCES_REGEX, params_section)
|
||||||
|
if civitai_resources_match:
|
||||||
|
try:
|
||||||
|
civitai_resources = json.loads(civitai_resources_match.group(1))
|
||||||
|
metadata["civitai_resources"] = civitai_resources
|
||||||
|
params_section = params_section.replace(civitai_resources_match.group(0), '')
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
logger.error("Error parsing Civitai resources JSON")
|
||||||
|
|
||||||
|
# Extract Hashes
|
||||||
|
hashes_match = re.search(self.HASHES_REGEX, params_section)
|
||||||
|
if hashes_match:
|
||||||
|
try:
|
||||||
|
hashes = json.loads(hashes_match.group(1))
|
||||||
|
metadata["hashes"] = hashes
|
||||||
|
# Remove hashes from params section to not interfere with other parsing
|
||||||
|
params_section = params_section.replace(hashes_match.group(0), '')
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
logger.error("Error parsing hashes JSON")
|
||||||
|
|
||||||
|
# Extract basic parameters
|
||||||
|
param_pattern = r'([A-Za-z\s]+): ([^,]+)'
|
||||||
|
params = re.findall(param_pattern, params_section)
|
||||||
|
gen_params = {}
|
||||||
|
|
||||||
# Extract key-value parameters (Steps, Sampler, CFG scale, etc.)
|
|
||||||
param_pattern = r'([A-Za-z ]+): ([^,]+)'
|
|
||||||
params = re.findall(param_pattern, negative_and_params)
|
|
||||||
for key, value in params:
|
for key, value in params:
|
||||||
clean_key = key.strip().lower().replace(' ', '_')
|
clean_key = key.strip().lower().replace(' ', '_')
|
||||||
metadata[clean_key] = value.strip()
|
|
||||||
|
|
||||||
# Extract LoRA information from prompt
|
# Skip if not in recognized gen param keys
|
||||||
lora_weights = {}
|
if clean_key not in GEN_PARAM_KEYS:
|
||||||
lora_matches = re.findall(self.LORA_PATTERN, prompt)
|
continue
|
||||||
for lora_name, weight in lora_matches:
|
|
||||||
lora_weights[lora_name.strip()] = float(weight.split(':')[0].strip())
|
|
||||||
|
|
||||||
# Remove LoRA patterns from prompt
|
# Convert numeric values
|
||||||
metadata["prompt"] = re.sub(self.LORA_PATTERN, '', prompt).strip()
|
if clean_key in ['steps', 'seed']:
|
||||||
|
|
||||||
# Extract LoRA hashes from Hashes section
|
|
||||||
lora_hashes = {}
|
|
||||||
hash_match = re.search(self.HASH_PATTERN, user_comment)
|
|
||||||
if hash_match:
|
|
||||||
try:
|
try:
|
||||||
hashes = json.loads(hash_match.group(1))
|
gen_params[clean_key] = int(value.strip())
|
||||||
for key, hash_value in hashes.items():
|
except ValueError:
|
||||||
if key.startswith('LORA:'):
|
gen_params[clean_key] = value.strip()
|
||||||
lora_name = key[5:] # Remove 'LORA:' prefix
|
elif clean_key in ['cfg_scale']:
|
||||||
lora_hashes[lora_name] = hash_value.strip()
|
try:
|
||||||
except json.JSONDecodeError:
|
gen_params[clean_key] = float(value.strip())
|
||||||
pass
|
except ValueError:
|
||||||
|
gen_params[clean_key] = value.strip()
|
||||||
|
else:
|
||||||
|
gen_params[clean_key] = value.strip()
|
||||||
|
|
||||||
# Process LoRAs and collect base models
|
# Extract size if available and add to gen_params if a recognized key
|
||||||
base_model_counts = {}
|
size_match = re.search(r'Size: (\d+)x(\d+)', params_section)
|
||||||
|
if size_match and 'size' in GEN_PARAM_KEYS:
|
||||||
|
width, height = size_match.groups()
|
||||||
|
gen_params['size'] = f"{width}x{height}"
|
||||||
|
|
||||||
|
# Add prompt and negative_prompt to gen_params if they're in GEN_PARAM_KEYS
|
||||||
|
if 'prompt' in GEN_PARAM_KEYS and 'prompt' in metadata:
|
||||||
|
gen_params['prompt'] = metadata['prompt']
|
||||||
|
if 'negative_prompt' in GEN_PARAM_KEYS and 'negative_prompt' in metadata:
|
||||||
|
gen_params['negative_prompt'] = metadata['negative_prompt']
|
||||||
|
|
||||||
|
metadata["gen_params"] = gen_params
|
||||||
|
|
||||||
|
# Extract LoRA information
|
||||||
loras = []
|
loras = []
|
||||||
|
base_model_counts = {}
|
||||||
|
|
||||||
# Process each LoRA with hash and weight
|
# First use Civitai resources if available (more reliable source)
|
||||||
for lora_name, hash_value in lora_hashes.items():
|
if metadata.get("civitai_resources"):
|
||||||
weight = lora_weights.get(lora_name, 1.0)
|
for resource in metadata.get("civitai_resources", []):
|
||||||
|
if resource.get("type") in ["lora", "hypernet"] and resource.get("modelVersionId"):
|
||||||
# Initialize lora entry with default values
|
# Initialize lora entry
|
||||||
lora_entry = {
|
lora_entry = {
|
||||||
'name': lora_name,
|
'id': str(resource.get("modelVersionId")),
|
||||||
'type': 'lora',
|
'modelId': str(resource.get("modelId")) if resource.get("modelId") else None,
|
||||||
'weight': weight,
|
'name': resource.get("modelName", "Unknown LoRA"),
|
||||||
|
'version': resource.get("modelVersionName", ""),
|
||||||
|
'type': resource.get("type", "lora"),
|
||||||
|
'weight': float(resource.get("weight", 1.0)),
|
||||||
'existsLocally': False,
|
'existsLocally': False,
|
||||||
'localPath': None,
|
|
||||||
'file_name': lora_name,
|
|
||||||
'hash': hash_value,
|
|
||||||
'thumbnailUrl': '/loras_static/images/no-preview.png',
|
'thumbnailUrl': '/loras_static/images/no-preview.png',
|
||||||
'baseModel': '',
|
'baseModel': '',
|
||||||
'size': 0,
|
'size': 0,
|
||||||
@@ -1087,49 +814,137 @@ class ImageSaverMetadataParser(RecipeMetadataParser):
|
|||||||
'isDeleted': False
|
'isDeleted': False
|
||||||
}
|
}
|
||||||
|
|
||||||
# Get info from Civitai by hash if available
|
# Get additional info from Civitai
|
||||||
if civitai_client and hash_value:
|
if civitai_client:
|
||||||
try:
|
try:
|
||||||
civitai_info = await civitai_client.get_model_by_hash(hash_value)
|
civitai_info = await civitai_client.get_model_version_info(resource.get("modelVersionId"))
|
||||||
# Populate lora entry with Civitai info
|
populated_entry = await self.populate_lora_from_civitai(
|
||||||
lora_entry = await self.populate_lora_from_civitai(
|
lora_entry,
|
||||||
|
civitai_info,
|
||||||
|
recipe_scanner,
|
||||||
|
base_model_counts
|
||||||
|
)
|
||||||
|
if populated_entry is None:
|
||||||
|
continue # Skip invalid LoRA types
|
||||||
|
lora_entry = populated_entry
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error fetching Civitai info for LoRA {lora_entry['name']}: {e}")
|
||||||
|
|
||||||
|
loras.append(lora_entry)
|
||||||
|
|
||||||
|
# If no LoRAs from Civitai resources or to supplement, extract from prompt tags
|
||||||
|
if not loras or len(loras) == 0:
|
||||||
|
# Extract LoRAs from extranet tags in prompt
|
||||||
|
lora_matches = re.findall(self.EXTRANETS_REGEX, prompt)
|
||||||
|
for lora_type, lora_name, lora_weight in lora_matches:
|
||||||
|
# Initialize lora entry
|
||||||
|
lora_entry = {
|
||||||
|
'name': lora_name,
|
||||||
|
'type': lora_type, # 'lora' or 'hypernet'
|
||||||
|
'weight': float(lora_weight),
|
||||||
|
'existsLocally': False,
|
||||||
|
'localPath': None,
|
||||||
|
'file_name': lora_name,
|
||||||
|
'thumbnailUrl': '/loras_static/images/no-preview.png',
|
||||||
|
'baseModel': '',
|
||||||
|
'size': 0,
|
||||||
|
'downloadUrl': '',
|
||||||
|
'isDeleted': False
|
||||||
|
}
|
||||||
|
|
||||||
|
# Check for hash from hashes dict
|
||||||
|
lora_hash = None
|
||||||
|
if metadata.get("hashes") and f"{lora_type}:{lora_name}" in metadata["hashes"]:
|
||||||
|
lora_hash = metadata["hashes"][f"{lora_type}:{lora_name}"]
|
||||||
|
lora_entry['hash'] = lora_hash
|
||||||
|
|
||||||
|
# Get additional info from Civitai either by hash or by checking civitai_resources
|
||||||
|
model_version_id = None
|
||||||
|
|
||||||
|
# First check if we have model version ID from civitai_resources
|
||||||
|
if metadata.get("civitai_resources"):
|
||||||
|
for resource in metadata["civitai_resources"]:
|
||||||
|
if (lora_type == resource.get("type") and
|
||||||
|
(lora_name.lower() in resource.get("modelName", "").lower() or
|
||||||
|
resource.get("modelName", "").lower() in lora_name.lower()) and
|
||||||
|
resource.get("modelVersionId")):
|
||||||
|
model_version_id = resource.get("modelVersionId")
|
||||||
|
lora_entry['id'] = str(model_version_id)
|
||||||
|
break
|
||||||
|
|
||||||
|
# Try to get info from Civitai
|
||||||
|
if civitai_client:
|
||||||
|
try:
|
||||||
|
if lora_hash:
|
||||||
|
# If we have hash, use it for lookup
|
||||||
|
civitai_info = await civitai_client.get_model_by_hash(lora_hash)
|
||||||
|
elif model_version_id:
|
||||||
|
# If we have model version ID, use that
|
||||||
|
civitai_info = await civitai_client.get_model_version_info(model_version_id)
|
||||||
|
else:
|
||||||
|
civitai_info = None
|
||||||
|
|
||||||
|
# Populate lora entry with Civitai info if available
|
||||||
|
if civitai_info:
|
||||||
|
populated_entry = await self.populate_lora_from_civitai(
|
||||||
lora_entry,
|
lora_entry,
|
||||||
civitai_info,
|
civitai_info,
|
||||||
recipe_scanner,
|
recipe_scanner,
|
||||||
base_model_counts,
|
base_model_counts,
|
||||||
hash_value
|
lora_hash
|
||||||
)
|
)
|
||||||
|
if populated_entry is None:
|
||||||
|
continue # Skip invalid LoRA types
|
||||||
|
lora_entry = populated_entry
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error fetching Civitai info for LoRA hash {hash_value}: {e}")
|
logger.error(f"Error fetching Civitai info for LoRA {lora_name}: {e}")
|
||||||
|
|
||||||
|
# Check if we can find it locally
|
||||||
|
if lora_hash and recipe_scanner:
|
||||||
|
lora_scanner = recipe_scanner._lora_scanner
|
||||||
|
exists_locally = lora_scanner.has_lora_hash(lora_hash)
|
||||||
|
if exists_locally:
|
||||||
|
try:
|
||||||
|
lora_cache = await lora_scanner.get_cached_data()
|
||||||
|
lora_item = next((item for item in lora_cache.raw_data
|
||||||
|
if item['sha256'].lower() == lora_hash.lower()), None)
|
||||||
|
if lora_item:
|
||||||
|
lora_entry['existsLocally'] = True
|
||||||
|
lora_entry['localPath'] = lora_item['file_path']
|
||||||
|
lora_entry['file_name'] = lora_item['file_name']
|
||||||
|
lora_entry['size'] = lora_item['size']
|
||||||
|
if 'preview_url' in lora_item:
|
||||||
|
lora_entry['thumbnailUrl'] = config.get_preview_static_url(lora_item['preview_url'])
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error getting local lora path: {e}")
|
||||||
|
|
||||||
loras.append(lora_entry)
|
loras.append(lora_entry)
|
||||||
|
|
||||||
# Set base_model to the most common one from civitai_info
|
# Try to get base model from resources or make educated guess
|
||||||
base_model = None
|
base_model = None
|
||||||
if base_model_counts:
|
if base_model_counts:
|
||||||
|
# Use the most common base model from the loras
|
||||||
base_model = max(base_model_counts.items(), key=lambda x: x[1])[0]
|
base_model = max(base_model_counts.items(), key=lambda x: x[1])[0]
|
||||||
|
|
||||||
# Extract generation parameters for recipe metadata
|
# Prepare final result structure
|
||||||
gen_params = {}
|
# Make sure gen_params only contains recognized keys
|
||||||
|
filtered_gen_params = {}
|
||||||
for key in GEN_PARAM_KEYS:
|
for key in GEN_PARAM_KEYS:
|
||||||
if key in metadata:
|
if key in metadata.get("gen_params", {}):
|
||||||
gen_params[key] = metadata.get(key, '')
|
filtered_gen_params[key] = metadata["gen_params"][key]
|
||||||
|
|
||||||
# Add model information if available
|
result = {
|
||||||
if 'model' in metadata:
|
|
||||||
gen_params['checkpoint'] = metadata['model']
|
|
||||||
|
|
||||||
return {
|
|
||||||
'base_model': base_model,
|
'base_model': base_model,
|
||||||
'loras': loras,
|
'loras': loras,
|
||||||
'gen_params': gen_params,
|
'gen_params': filtered_gen_params,
|
||||||
'raw_metadata': metadata
|
'from_automatic_metadata': True
|
||||||
}
|
}
|
||||||
|
|
||||||
except Exception as e:
|
return result
|
||||||
logger.error(f"Error parsing Image Saver metadata: {e}", exc_info=True)
|
|
||||||
return {"error": str(e), "loras": []}
|
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing Automatic1111 metadata: {e}", exc_info=True)
|
||||||
|
return {"error": str(e), "loras": []}
|
||||||
|
|
||||||
class RecipeParserFactory:
|
class RecipeParserFactory:
|
||||||
"""Factory for creating recipe metadata parsers"""
|
"""Factory for creating recipe metadata parsers"""
|
||||||
@@ -1155,13 +970,9 @@ class RecipeParserFactory:
|
|||||||
|
|
||||||
if RecipeFormatParser().is_metadata_matching(user_comment):
|
if RecipeFormatParser().is_metadata_matching(user_comment):
|
||||||
return RecipeFormatParser()
|
return RecipeFormatParser()
|
||||||
elif StandardMetadataParser().is_metadata_matching(user_comment):
|
elif AutomaticMetadataParser().is_metadata_matching(user_comment):
|
||||||
return StandardMetadataParser()
|
return AutomaticMetadataParser()
|
||||||
elif A1111MetadataParser().is_metadata_matching(user_comment):
|
|
||||||
return A1111MetadataParser()
|
|
||||||
elif MetaFormatParser().is_metadata_matching(user_comment):
|
elif MetaFormatParser().is_metadata_matching(user_comment):
|
||||||
return MetaFormatParser()
|
return MetaFormatParser()
|
||||||
elif ImageSaverMetadataParser().is_metadata_matching(user_comment):
|
|
||||||
return ImageSaverMetadataParser()
|
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
|
|||||||
Reference in New Issue
Block a user