Fix null-safety issues and apply code formatting

Bug fixes:
- Add null guards for base_models_roots/embeddings_roots in backup cleanup
- Fix null-safety initialization of extra_unet_roots

Formatting:
- Apply consistent code style across Python files
- Fix line wrapping, quote consistency, and trailing commas
- Add type ignore comments for dynamic/platform-specific code
This commit is contained in:
Will Miao
2026-02-28 21:38:41 +08:00
parent b005961ee5
commit c9e5ea42cb
9 changed files with 1097 additions and 760 deletions

View File

@@ -6,23 +6,24 @@ from .parsers import (
ComfyMetadataParser,
MetaFormatParser,
AutomaticMetadataParser,
CivitaiApiMetadataParser
CivitaiApiMetadataParser,
)
from .base import RecipeMetadataParser
logger = logging.getLogger(__name__)
class RecipeParserFactory:
"""Factory for creating recipe metadata parsers"""
@staticmethod
def create_parser(metadata) -> RecipeMetadataParser:
def create_parser(metadata) -> RecipeMetadataParser | None:
"""
Create appropriate parser based on the metadata content
Args:
metadata: The metadata from the image (dict or str)
Returns:
Appropriate RecipeMetadataParser implementation
"""
@@ -34,17 +35,18 @@ class RecipeParserFactory:
except Exception as e:
logger.debug(f"CivitaiApiMetadataParser check failed: {e}")
pass
# Convert dict to string for other parsers that expect string input
try:
import json
metadata_str = json.dumps(metadata)
except Exception as e:
logger.debug(f"Failed to convert dict to JSON string: {e}")
return None
else:
metadata_str = metadata
# Try ComfyMetadataParser which requires valid JSON
try:
if ComfyMetadataParser().is_metadata_matching(metadata_str):
@@ -52,7 +54,7 @@ class RecipeParserFactory:
except Exception:
# If JSON parsing fails, move on to other parsers
pass
# Check other parsers that expect string input
if RecipeFormatParser().is_metadata_matching(metadata_str):
return RecipeFormatParser()

View File

@@ -9,15 +9,16 @@ from ...services.metadata_service import get_default_metadata_provider
logger = logging.getLogger(__name__)
class CivitaiApiMetadataParser(RecipeMetadataParser):
"""Parser for Civitai image metadata format"""
def is_metadata_matching(self, metadata) -> bool:
"""Check if the metadata matches the Civitai image metadata format
Args:
metadata: The metadata from the image (dict)
Returns:
bool: True if this parser can handle the metadata
"""
@@ -28,7 +29,7 @@ class CivitaiApiMetadataParser(RecipeMetadataParser):
# Check for common CivitAI image metadata fields
civitai_image_fields = (
"resources",
"civitaiResources",
"civitaiResources",
"additionalResources",
"hashes",
"prompt",
@@ -40,7 +41,7 @@ class CivitaiApiMetadataParser(RecipeMetadataParser):
"width",
"height",
"Model",
"Model hash"
"Model hash",
)
return any(key in payload for key in civitai_image_fields)
@@ -50,7 +51,9 @@ class CivitaiApiMetadataParser(RecipeMetadataParser):
# Check for LoRA hash patterns
hashes = metadata.get("hashes")
if isinstance(hashes, dict) and any(str(key).lower().startswith("lora:") for key in hashes):
if isinstance(hashes, dict) and any(
str(key).lower().startswith("lora:") for key in hashes
):
return True
# Check nested meta object (common in CivitAI image responses)
@@ -61,22 +64,28 @@ class CivitaiApiMetadataParser(RecipeMetadataParser):
# Also check for LoRA hash patterns in nested meta
hashes = nested_meta.get("hashes")
if isinstance(hashes, dict) and any(str(key).lower().startswith("lora:") for key in hashes):
if isinstance(hashes, dict) and any(
str(key).lower().startswith("lora:") for key in hashes
):
return True
return False
async def parse_metadata(self, metadata, recipe_scanner=None, civitai_client=None) -> Dict[str, Any]:
async def parse_metadata( # type: ignore[override]
self, user_comment, recipe_scanner=None, civitai_client=None
) -> Dict[str, Any]:
"""Parse metadata from Civitai image format
Args:
metadata: The metadata from the image (dict)
user_comment: The metadata from the image (dict)
recipe_scanner: Optional recipe scanner service
civitai_client: Optional Civitai API client (deprecated, use metadata_provider instead)
Returns:
Dict containing parsed recipe data
"""
metadata: Dict[str, Any] = user_comment # type: ignore[assignment]
metadata = user_comment
try:
# Get metadata provider instead of using civitai_client directly
metadata_provider = await get_default_metadata_provider()
@@ -100,19 +109,19 @@ class CivitaiApiMetadataParser(RecipeMetadataParser):
)
):
metadata = inner_meta
# Initialize result structure
result = {
'base_model': None,
'loras': [],
'model': None,
'gen_params': {},
'from_civitai_image': True
"base_model": None,
"loras": [],
"model": None,
"gen_params": {},
"from_civitai_image": True,
}
# Track already added LoRAs to prevent duplicates
added_loras = {} # key: model_version_id or hash, value: index in result["loras"]
# Extract hash information from hashes field for LoRA matching
lora_hashes = {}
if "hashes" in metadata and isinstance(metadata["hashes"], dict):
@@ -121,14 +130,14 @@ class CivitaiApiMetadataParser(RecipeMetadataParser):
if key_str.lower().startswith("lora:"):
lora_name = key_str.split(":", 1)[1]
lora_hashes[lora_name] = hash_value
# Extract prompt and negative prompt
if "prompt" in metadata:
result["gen_params"]["prompt"] = metadata["prompt"]
if "negativePrompt" in metadata:
result["gen_params"]["negative_prompt"] = metadata["negativePrompt"]
# Extract other generation parameters
param_mapping = {
"steps": "steps",
@@ -138,98 +147,117 @@ class CivitaiApiMetadataParser(RecipeMetadataParser):
"Size": "size",
"clipSkip": "clip_skip",
}
for civitai_key, our_key in param_mapping.items():
if civitai_key in metadata and our_key in GEN_PARAM_KEYS:
result["gen_params"][our_key] = metadata[civitai_key]
# Extract base model information - directly if available
if "baseModel" in metadata:
result["base_model"] = metadata["baseModel"]
elif "Model hash" in metadata and metadata_provider:
model_hash = metadata["Model hash"]
model_info, error = await metadata_provider.get_model_by_hash(model_hash)
model_info, error = await metadata_provider.get_model_by_hash(
model_hash
)
if model_info:
result["base_model"] = model_info.get("baseModel", "")
elif "Model" in metadata and isinstance(metadata.get("resources"), list):
# Try to find base model in resources
for resource in metadata.get("resources", []):
if resource.get("type") == "model" and resource.get("name") == metadata.get("Model"):
if resource.get("type") == "model" and resource.get(
"name"
) == metadata.get("Model"):
# This is likely the checkpoint model
if metadata_provider and resource.get("hash"):
model_info, error = await metadata_provider.get_model_by_hash(resource.get("hash"))
(
model_info,
error,
) = await metadata_provider.get_model_by_hash(
resource.get("hash")
)
if model_info:
result["base_model"] = model_info.get("baseModel", "")
base_model_counts = {}
# Process standard resources array
if "resources" in metadata and isinstance(metadata["resources"], list):
for resource in metadata["resources"]:
# Modified to process resources without a type field as potential LoRAs
if resource.get("type", "lora") == "lora":
lora_hash = resource.get("hash", "")
# Try to get hash from the hashes field if not present in resource
if not lora_hash and resource.get("name"):
lora_hash = lora_hashes.get(resource["name"], "")
# Skip LoRAs without proper identification (hash or modelVersionId)
if not lora_hash and not resource.get("modelVersionId"):
logger.debug(f"Skipping LoRA resource '{resource.get('name', 'Unknown')}' - no hash or modelVersionId")
logger.debug(
f"Skipping LoRA resource '{resource.get('name', 'Unknown')}' - no hash or modelVersionId"
)
continue
# Skip if we've already added this LoRA by hash
if lora_hash and lora_hash in added_loras:
continue
lora_entry = {
'name': resource.get("name", "Unknown LoRA"),
'type': "lora",
'weight': float(resource.get("weight", 1.0)),
'hash': lora_hash,
'existsLocally': False,
'localPath': None,
'file_name': resource.get("name", "Unknown"),
'thumbnailUrl': '/loras_static/images/no-preview.png',
'baseModel': '',
'size': 0,
'downloadUrl': '',
'isDeleted': False
"name": resource.get("name", "Unknown LoRA"),
"type": "lora",
"weight": float(resource.get("weight", 1.0)),
"hash": lora_hash,
"existsLocally": False,
"localPath": None,
"file_name": resource.get("name", "Unknown"),
"thumbnailUrl": "/loras_static/images/no-preview.png",
"baseModel": "",
"size": 0,
"downloadUrl": "",
"isDeleted": False,
}
# Try to get info from Civitai if hash is available
if lora_entry['hash'] and metadata_provider:
if lora_entry["hash"] and metadata_provider:
try:
civitai_info = await metadata_provider.get_model_by_hash(lora_hash)
civitai_info = (
await metadata_provider.get_model_by_hash(lora_hash)
)
populated_entry = await self.populate_lora_from_civitai(
lora_entry,
civitai_info,
recipe_scanner,
base_model_counts,
lora_hash
lora_hash,
)
if populated_entry is None:
continue # Skip invalid LoRA types
lora_entry = populated_entry
# If we have a version ID from Civitai, track it for deduplication
if 'id' in lora_entry and lora_entry['id']:
added_loras[str(lora_entry['id'])] = len(result["loras"])
if "id" in lora_entry and lora_entry["id"]:
added_loras[str(lora_entry["id"])] = len(
result["loras"]
)
except Exception as e:
logger.error(f"Error fetching Civitai info for LoRA hash {lora_entry['hash']}: {e}")
logger.error(
f"Error fetching Civitai info for LoRA hash {lora_entry['hash']}: {e}"
)
# Track by hash if we have it
if lora_hash:
added_loras[lora_hash] = len(result["loras"])
result["loras"].append(lora_entry)
# Process civitaiResources array
if "civitaiResources" in metadata and isinstance(metadata["civitaiResources"], list):
if "civitaiResources" in metadata and isinstance(
metadata["civitaiResources"], list
):
for resource in metadata["civitaiResources"]:
# Get resource type and identifier
resource_type = str(resource.get("type") or "").lower()
@@ -237,32 +265,39 @@ class CivitaiApiMetadataParser(RecipeMetadataParser):
if resource_type == "checkpoint":
checkpoint_entry = {
'id': resource.get("modelVersionId", 0),
'modelId': resource.get("modelId", 0),
'name': resource.get("modelName", "Unknown Checkpoint"),
'version': resource.get("modelVersionName", ""),
'type': resource.get("type", "checkpoint"),
'existsLocally': False,
'localPath': None,
'file_name': resource.get("modelName", ""),
'hash': resource.get("hash", "") or "",
'thumbnailUrl': '/loras_static/images/no-preview.png',
'baseModel': '',
'size': 0,
'downloadUrl': '',
'isDeleted': False
"id": resource.get("modelVersionId", 0),
"modelId": resource.get("modelId", 0),
"name": resource.get("modelName", "Unknown Checkpoint"),
"version": resource.get("modelVersionName", ""),
"type": resource.get("type", "checkpoint"),
"existsLocally": False,
"localPath": None,
"file_name": resource.get("modelName", ""),
"hash": resource.get("hash", "") or "",
"thumbnailUrl": "/loras_static/images/no-preview.png",
"baseModel": "",
"size": 0,
"downloadUrl": "",
"isDeleted": False,
}
if version_id and metadata_provider:
try:
civitai_info = await metadata_provider.get_model_version_info(version_id)
civitai_info = (
await metadata_provider.get_model_version_info(
version_id
)
)
checkpoint_entry = await self.populate_checkpoint_from_civitai(
checkpoint_entry,
civitai_info
checkpoint_entry = (
await self.populate_checkpoint_from_civitai(
checkpoint_entry, civitai_info
)
)
except Exception as e:
logger.error(f"Error fetching Civitai info for checkpoint version {version_id}: {e}")
logger.error(
f"Error fetching Civitai info for checkpoint version {version_id}: {e}"
)
if result["model"] is None:
result["model"] = checkpoint_entry
@@ -275,31 +310,35 @@ class CivitaiApiMetadataParser(RecipeMetadataParser):
# Initialize lora entry
lora_entry = {
'id': resource.get("modelVersionId", 0),
'modelId': resource.get("modelId", 0),
'name': resource.get("modelName", "Unknown LoRA"),
'version': resource.get("modelVersionName", ""),
'type': resource.get("type", "lora"),
'weight': round(float(resource.get("weight", 1.0)), 2),
'existsLocally': False,
'thumbnailUrl': '/loras_static/images/no-preview.png',
'baseModel': '',
'size': 0,
'downloadUrl': '',
'isDeleted': False
"id": resource.get("modelVersionId", 0),
"modelId": resource.get("modelId", 0),
"name": resource.get("modelName", "Unknown LoRA"),
"version": resource.get("modelVersionName", ""),
"type": resource.get("type", "lora"),
"weight": round(float(resource.get("weight", 1.0)), 2),
"existsLocally": False,
"thumbnailUrl": "/loras_static/images/no-preview.png",
"baseModel": "",
"size": 0,
"downloadUrl": "",
"isDeleted": False,
}
# Try to get info from Civitai if modelVersionId is available
if version_id and metadata_provider:
try:
# Use get_model_version_info instead of get_model_version
civitai_info = await metadata_provider.get_model_version_info(version_id)
civitai_info = (
await metadata_provider.get_model_version_info(
version_id
)
)
populated_entry = await self.populate_lora_from_civitai(
lora_entry,
civitai_info,
recipe_scanner,
base_model_counts
base_model_counts,
)
if populated_entry is None:
@@ -307,74 +346,87 @@ class CivitaiApiMetadataParser(RecipeMetadataParser):
lora_entry = populated_entry
except Exception as e:
logger.error(f"Error fetching Civitai info for model version {version_id}: {e}")
logger.error(
f"Error fetching Civitai info for model version {version_id}: {e}"
)
# Track this LoRA in our deduplication dict
if version_id:
added_loras[version_id] = len(result["loras"])
result["loras"].append(lora_entry)
# Process additionalResources array
if "additionalResources" in metadata and isinstance(metadata["additionalResources"], list):
if "additionalResources" in metadata and isinstance(
metadata["additionalResources"], list
):
for resource in metadata["additionalResources"]:
# Skip resources that aren't LoRAs or LyCORIS
if resource.get("type") not in ["lora", "lycoris"] and "type" not in resource:
if (
resource.get("type") not in ["lora", "lycoris"]
and "type" not in resource
):
continue
lora_type = resource.get("type", "lora")
name = resource.get("name", "")
# Extract ID from URN format if available
version_id = None
if name and "civitai:" in name:
parts = name.split("@")
if len(parts) > 1:
version_id = parts[1]
# Skip if we've already added this LoRA
if version_id in added_loras:
continue
lora_entry = {
'name': name,
'type': lora_type,
'weight': float(resource.get("strength", 1.0)),
'hash': "",
'existsLocally': False,
'localPath': None,
'file_name': name,
'thumbnailUrl': '/loras_static/images/no-preview.png',
'baseModel': '',
'size': 0,
'downloadUrl': '',
'isDeleted': False
"name": name,
"type": lora_type,
"weight": float(resource.get("strength", 1.0)),
"hash": "",
"existsLocally": False,
"localPath": None,
"file_name": name,
"thumbnailUrl": "/loras_static/images/no-preview.png",
"baseModel": "",
"size": 0,
"downloadUrl": "",
"isDeleted": False,
}
# If we have a version ID and metadata provider, try to get more info
if version_id and metadata_provider:
try:
# Use get_model_version_info with the version ID
civitai_info = await metadata_provider.get_model_version_info(version_id)
civitai_info = (
await metadata_provider.get_model_version_info(
version_id
)
)
populated_entry = await self.populate_lora_from_civitai(
lora_entry,
civitai_info,
recipe_scanner,
base_model_counts
base_model_counts,
)
if populated_entry is None:
continue # Skip invalid LoRA types
lora_entry = populated_entry
# Track this LoRA for deduplication
if version_id:
added_loras[version_id] = len(result["loras"])
except Exception as e:
logger.error(f"Error fetching Civitai info for model ID {version_id}: {e}")
logger.error(
f"Error fetching Civitai info for model ID {version_id}: {e}"
)
result["loras"].append(lora_entry)
# If we found LoRA hashes in the metadata but haven't already
@@ -390,30 +442,32 @@ class CivitaiApiMetadataParser(RecipeMetadataParser):
continue
lora_entry = {
'name': lora_name,
'type': "lora",
'weight': 1.0,
'hash': lora_hash,
'existsLocally': False,
'localPath': None,
'file_name': lora_name,
'thumbnailUrl': '/loras_static/images/no-preview.png',
'baseModel': '',
'size': 0,
'downloadUrl': '',
'isDeleted': False
"name": lora_name,
"type": "lora",
"weight": 1.0,
"hash": lora_hash,
"existsLocally": False,
"localPath": None,
"file_name": lora_name,
"thumbnailUrl": "/loras_static/images/no-preview.png",
"baseModel": "",
"size": 0,
"downloadUrl": "",
"isDeleted": False,
}
if metadata_provider:
try:
civitai_info = await metadata_provider.get_model_by_hash(lora_hash)
civitai_info = await metadata_provider.get_model_by_hash(
lora_hash
)
populated_entry = await self.populate_lora_from_civitai(
lora_entry,
civitai_info,
recipe_scanner,
base_model_counts,
lora_hash
lora_hash,
)
if populated_entry is None:
@@ -421,80 +475,93 @@ class CivitaiApiMetadataParser(RecipeMetadataParser):
lora_entry = populated_entry
if 'id' in lora_entry and lora_entry['id']:
added_loras[str(lora_entry['id'])] = len(result["loras"])
if "id" in lora_entry and lora_entry["id"]:
added_loras[str(lora_entry["id"])] = len(result["loras"])
except Exception as e:
logger.error(f"Error fetching Civitai info for LoRA hash {lora_hash}: {e}")
logger.error(
f"Error fetching Civitai info for LoRA hash {lora_hash}: {e}"
)
added_loras[lora_hash] = len(result["loras"])
result["loras"].append(lora_entry)
# Check for LoRA info in the format "Lora_0 Model hash", "Lora_0 Model name", etc.
lora_index = 0
while f"Lora_{lora_index} Model hash" in metadata and f"Lora_{lora_index} Model name" in metadata:
while (
f"Lora_{lora_index} Model hash" in metadata
and f"Lora_{lora_index} Model name" in metadata
):
lora_hash = metadata[f"Lora_{lora_index} Model hash"]
lora_name = metadata[f"Lora_{lora_index} Model name"]
lora_strength_model = float(metadata.get(f"Lora_{lora_index} Strength model", 1.0))
lora_strength_model = float(
metadata.get(f"Lora_{lora_index} Strength model", 1.0)
)
# Skip if we've already added this LoRA by hash
if lora_hash and lora_hash in added_loras:
lora_index += 1
continue
lora_entry = {
'name': lora_name,
'type': "lora",
'weight': lora_strength_model,
'hash': lora_hash,
'existsLocally': False,
'localPath': None,
'file_name': lora_name,
'thumbnailUrl': '/loras_static/images/no-preview.png',
'baseModel': '',
'size': 0,
'downloadUrl': '',
'isDeleted': False
"name": lora_name,
"type": "lora",
"weight": lora_strength_model,
"hash": lora_hash,
"existsLocally": False,
"localPath": None,
"file_name": lora_name,
"thumbnailUrl": "/loras_static/images/no-preview.png",
"baseModel": "",
"size": 0,
"downloadUrl": "",
"isDeleted": False,
}
# Try to get info from Civitai if hash is available
if lora_entry['hash'] and metadata_provider:
if lora_entry["hash"] and metadata_provider:
try:
civitai_info = await metadata_provider.get_model_by_hash(lora_hash)
civitai_info = await metadata_provider.get_model_by_hash(
lora_hash
)
populated_entry = await self.populate_lora_from_civitai(
lora_entry,
civitai_info,
recipe_scanner,
base_model_counts,
lora_hash
lora_hash,
)
if populated_entry is None:
lora_index += 1
continue # Skip invalid LoRA types
lora_entry = populated_entry
# If we have a version ID from Civitai, track it for deduplication
if 'id' in lora_entry and lora_entry['id']:
added_loras[str(lora_entry['id'])] = len(result["loras"])
if "id" in lora_entry and lora_entry["id"]:
added_loras[str(lora_entry["id"])] = len(result["loras"])
except Exception as e:
logger.error(f"Error fetching Civitai info for LoRA hash {lora_entry['hash']}: {e}")
logger.error(
f"Error fetching Civitai info for LoRA hash {lora_entry['hash']}: {e}"
)
# Track by hash if we have it
if lora_hash:
added_loras[lora_hash] = len(result["loras"])
result["loras"].append(lora_entry)
lora_index += 1
# If base model wasn't found earlier, use the most common one from LoRAs
if not result["base_model"] and base_model_counts:
result["base_model"] = max(base_model_counts.items(), key=lambda x: x[1])[0]
result["base_model"] = max(
base_model_counts.items(), key=lambda x: x[1]
)[0]
return result
except Exception as e:
logger.error(f"Error parsing Civitai image metadata: {e}", exc_info=True)
return {"error": str(e), "loras": []}