mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-25 07:05:43 -03:00
Merge branch 'main' of https://github.com/willmiao/ComfyUI-Lora-Manager
This commit is contained in:
529
py/services/civarchive_client.py
Normal file
529
py/services/civarchive_client.py
Normal file
@@ -0,0 +1,529 @@
|
|||||||
|
import os
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import asyncio
|
||||||
|
from copy import deepcopy
|
||||||
|
from typing import Optional, Dict, Tuple, List
|
||||||
|
from .model_metadata_provider import CivArchiveModelMetadataProvider, ModelMetadataProviderManager
|
||||||
|
from .downloader import get_downloader
|
||||||
|
|
||||||
|
try:
|
||||||
|
from bs4 import BeautifulSoup
|
||||||
|
except ImportError as exc:
|
||||||
|
BeautifulSoup = None # type: ignore[assignment]
|
||||||
|
_BS4_IMPORT_ERROR = exc
|
||||||
|
else:
|
||||||
|
_BS4_IMPORT_ERROR = None
|
||||||
|
|
||||||
|
def _require_beautifulsoup():
|
||||||
|
if BeautifulSoup is None:
|
||||||
|
raise RuntimeError(
|
||||||
|
"BeautifulSoup (bs4) is required for CivArchive client. "
|
||||||
|
"Install it with 'pip install beautifulsoup4'."
|
||||||
|
) from _BS4_IMPORT_ERROR
|
||||||
|
return BeautifulSoup
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class CivArchiveClient:
|
||||||
|
_instance = None
|
||||||
|
_lock = asyncio.Lock()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def get_instance(cls):
|
||||||
|
"""Get singleton instance of CivArchiveClient"""
|
||||||
|
async with cls._lock:
|
||||||
|
if cls._instance is None:
|
||||||
|
cls._instance = cls()
|
||||||
|
|
||||||
|
# Register this client as a metadata provider
|
||||||
|
provider_manager = await ModelMetadataProviderManager.get_instance()
|
||||||
|
provider_manager.register_provider('civarchive', CivArchiveModelMetadataProvider(cls._instance), False)
|
||||||
|
|
||||||
|
return cls._instance
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
# Check if already initialized for singleton pattern
|
||||||
|
if hasattr(self, '_initialized'):
|
||||||
|
return
|
||||||
|
self._initialized = True
|
||||||
|
|
||||||
|
self.base_url = "https://civarchive.com/api"
|
||||||
|
|
||||||
|
async def _request_json(
|
||||||
|
self,
|
||||||
|
path: str,
|
||||||
|
params: Optional[Dict[str, str]] = None
|
||||||
|
) -> Tuple[Optional[Dict], Optional[str]]:
|
||||||
|
"""Call CivArchive API and return JSON payload"""
|
||||||
|
downloader = await get_downloader()
|
||||||
|
kwargs: Dict[str, Dict[str, str]] = {}
|
||||||
|
if params:
|
||||||
|
safe_params = {str(key): str(value) for key, value in params.items() if value is not None}
|
||||||
|
if safe_params:
|
||||||
|
kwargs["params"] = safe_params
|
||||||
|
success, payload = await downloader.make_request(
|
||||||
|
"GET",
|
||||||
|
f"{self.base_url}{path}",
|
||||||
|
use_auth=False,
|
||||||
|
**kwargs
|
||||||
|
)
|
||||||
|
if not success:
|
||||||
|
error = payload if isinstance(payload, str) else "Request failed"
|
||||||
|
return None, error
|
||||||
|
if not isinstance(payload, dict):
|
||||||
|
return None, "Invalid response structure"
|
||||||
|
return payload, None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _normalize_payload(payload: Dict) -> Dict:
|
||||||
|
"""Unwrap CivArchive responses that wrap content under a data key"""
|
||||||
|
if not isinstance(payload, dict):
|
||||||
|
return {}
|
||||||
|
data = payload.get("data")
|
||||||
|
if isinstance(data, dict):
|
||||||
|
return data
|
||||||
|
return payload
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _split_context(payload: Dict) -> Tuple[Dict, Dict, List[Dict]]:
|
||||||
|
"""Separate version payload from surrounding model context"""
|
||||||
|
data = CivArchiveClient._normalize_payload(payload)
|
||||||
|
context: Dict = {}
|
||||||
|
fallback_files: List[Dict] = []
|
||||||
|
version: Dict = {}
|
||||||
|
|
||||||
|
for key, value in data.items():
|
||||||
|
if key in {"version", "model"}:
|
||||||
|
continue
|
||||||
|
context[key] = value
|
||||||
|
|
||||||
|
if isinstance(data.get("version"), dict):
|
||||||
|
version = data["version"]
|
||||||
|
|
||||||
|
model_block = data.get("model")
|
||||||
|
if isinstance(model_block, dict):
|
||||||
|
for key, value in model_block.items():
|
||||||
|
if key == "version":
|
||||||
|
if not version and isinstance(value, dict):
|
||||||
|
version = value
|
||||||
|
continue
|
||||||
|
context.setdefault(key, value)
|
||||||
|
fallback_files = fallback_files or model_block.get("files") or []
|
||||||
|
|
||||||
|
fallback_files = fallback_files or data.get("files") or []
|
||||||
|
return context, version, fallback_files
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _ensure_list(value) -> List:
|
||||||
|
if isinstance(value, list):
|
||||||
|
return value
|
||||||
|
if value is None:
|
||||||
|
return []
|
||||||
|
return [value]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _build_model_info(context: Dict) -> Dict:
|
||||||
|
tags = context.get("tags")
|
||||||
|
if not isinstance(tags, list):
|
||||||
|
tags = list(tags) if isinstance(tags, (set, tuple)) else ([] if tags is None else [tags])
|
||||||
|
return {
|
||||||
|
"name": context.get("name"),
|
||||||
|
"type": context.get("type"),
|
||||||
|
"nsfw": bool(context.get("is_nsfw", context.get("nsfw", False))),
|
||||||
|
"description": context.get("description"),
|
||||||
|
"tags": tags,
|
||||||
|
}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _build_creator_info(context: Dict) -> Dict:
|
||||||
|
username = context.get("creator_username") or context.get("username") or ""
|
||||||
|
image = context.get("creator_image") or context.get("creator_avatar") or ""
|
||||||
|
creator: Dict[str, Optional[str]] = {
|
||||||
|
"username": username,
|
||||||
|
"image": image,
|
||||||
|
}
|
||||||
|
if context.get("creator_name"):
|
||||||
|
creator["name"] = context["creator_name"]
|
||||||
|
if context.get("creator_url"):
|
||||||
|
creator["url"] = context["creator_url"]
|
||||||
|
return creator
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _transform_file_entry(file_data: Dict) -> Dict:
|
||||||
|
mirrors = file_data.get("mirrors") or []
|
||||||
|
if not isinstance(mirrors, list):
|
||||||
|
mirrors = [mirrors]
|
||||||
|
available_mirror = next(
|
||||||
|
(mirror for mirror in mirrors if isinstance(mirror, dict) and mirror.get("deletedAt") is None),
|
||||||
|
None
|
||||||
|
)
|
||||||
|
download_url = file_data.get("downloadUrl")
|
||||||
|
if not download_url and available_mirror:
|
||||||
|
download_url = available_mirror.get("url")
|
||||||
|
name = file_data.get("name")
|
||||||
|
if not name and available_mirror:
|
||||||
|
name = available_mirror.get("filename")
|
||||||
|
|
||||||
|
transformed: Dict = {
|
||||||
|
"id": file_data.get("id"),
|
||||||
|
"sizeKB": file_data.get("sizeKB"),
|
||||||
|
"name": name,
|
||||||
|
"type": file_data.get("type"),
|
||||||
|
"downloadUrl": download_url,
|
||||||
|
"primary": True,
|
||||||
|
# TODO: for some reason is_primary is false in CivArchive response, need to figure this out,
|
||||||
|
# "primary": bool(file_data.get("is_primary", file_data.get("primary", False))),
|
||||||
|
"mirrors": mirrors,
|
||||||
|
}
|
||||||
|
|
||||||
|
sha256 = file_data.get("sha256")
|
||||||
|
if sha256:
|
||||||
|
transformed["hashes"] = {"SHA256": str(sha256).upper()}
|
||||||
|
elif isinstance(file_data.get("hashes"), dict):
|
||||||
|
transformed["hashes"] = file_data["hashes"]
|
||||||
|
|
||||||
|
if "metadata" in file_data:
|
||||||
|
transformed["metadata"] = file_data["metadata"]
|
||||||
|
|
||||||
|
if file_data.get("modelVersionId") is not None:
|
||||||
|
transformed["modelVersionId"] = file_data.get("modelVersionId")
|
||||||
|
elif file_data.get("model_version_id") is not None:
|
||||||
|
transformed["modelVersionId"] = file_data.get("model_version_id")
|
||||||
|
|
||||||
|
if file_data.get("modelId") is not None:
|
||||||
|
transformed["modelId"] = file_data.get("modelId")
|
||||||
|
elif file_data.get("model_id") is not None:
|
||||||
|
transformed["modelId"] = file_data.get("model_id")
|
||||||
|
|
||||||
|
return transformed
|
||||||
|
|
||||||
|
def _transform_files(
|
||||||
|
self,
|
||||||
|
files: Optional[List[Dict]],
|
||||||
|
fallback_files: Optional[List[Dict]] = None
|
||||||
|
) -> List[Dict]:
|
||||||
|
candidates: List[Dict] = []
|
||||||
|
if isinstance(files, list) and files:
|
||||||
|
candidates = files
|
||||||
|
elif isinstance(fallback_files, list):
|
||||||
|
candidates = fallback_files
|
||||||
|
|
||||||
|
transformed_files: List[Dict] = []
|
||||||
|
for file_data in candidates:
|
||||||
|
if isinstance(file_data, dict):
|
||||||
|
transformed_files.append(self._transform_file_entry(file_data))
|
||||||
|
return transformed_files
|
||||||
|
|
||||||
|
def _transform_version(
|
||||||
|
self,
|
||||||
|
context: Dict,
|
||||||
|
version: Dict,
|
||||||
|
fallback_files: Optional[List[Dict]] = None
|
||||||
|
) -> Optional[Dict]:
|
||||||
|
if not version:
|
||||||
|
return None
|
||||||
|
|
||||||
|
version_copy = deepcopy(version)
|
||||||
|
version_copy.pop("model", None)
|
||||||
|
version_copy.pop("creator", None)
|
||||||
|
|
||||||
|
if "trigger" in version_copy:
|
||||||
|
triggers = version_copy.pop("trigger")
|
||||||
|
if isinstance(triggers, list):
|
||||||
|
version_copy["trainedWords"] = triggers
|
||||||
|
elif triggers is None:
|
||||||
|
version_copy["trainedWords"] = []
|
||||||
|
else:
|
||||||
|
version_copy["trainedWords"] = [triggers]
|
||||||
|
|
||||||
|
if "trainedWords" in version_copy and isinstance(version_copy["trainedWords"], str):
|
||||||
|
version_copy["trainedWords"] = [version_copy["trainedWords"]]
|
||||||
|
|
||||||
|
if "nsfw_level" in version_copy:
|
||||||
|
version_copy["nsfwLevel"] = version_copy.pop("nsfw_level")
|
||||||
|
elif "nsfwLevel" not in version_copy and context.get("nsfw_level") is not None:
|
||||||
|
version_copy["nsfwLevel"] = context.get("nsfw_level")
|
||||||
|
|
||||||
|
stats_keys = ["downloadCount", "ratingCount", "rating"]
|
||||||
|
stats = {key: version_copy.pop(key) for key in stats_keys if key in version_copy}
|
||||||
|
if stats:
|
||||||
|
version_copy["stats"] = stats
|
||||||
|
|
||||||
|
version_copy["files"] = self._transform_files(version_copy.get("files"), fallback_files)
|
||||||
|
version_copy["images"] = self._ensure_list(version_copy.get("images"))
|
||||||
|
|
||||||
|
version_copy["model"] = self._build_model_info(context)
|
||||||
|
version_copy["creator"] = self._build_creator_info(context)
|
||||||
|
|
||||||
|
version_copy["source"] = "civarchive"
|
||||||
|
version_copy["is_deleted"] = bool(context.get("deletedAt")) or bool(version.get("deletedAt"))
|
||||||
|
|
||||||
|
return version_copy
|
||||||
|
|
||||||
|
async def _resolve_version_from_files(self, payload: Dict) -> Optional[Dict]:
|
||||||
|
"""Fallback to fetch version data when only file metadata is available"""
|
||||||
|
data = self._normalize_payload(payload)
|
||||||
|
files = data.get("files") or payload.get("files") or []
|
||||||
|
if not isinstance(files, list):
|
||||||
|
files = [files]
|
||||||
|
for file_data in files:
|
||||||
|
if not isinstance(file_data, dict):
|
||||||
|
continue
|
||||||
|
model_id = file_data.get("model_id") or file_data.get("modelId")
|
||||||
|
version_id = file_data.get("model_version_id") or file_data.get("modelVersionId")
|
||||||
|
if model_id is None or version_id is None:
|
||||||
|
continue
|
||||||
|
resolved = await self.get_model_version(model_id, version_id)
|
||||||
|
if resolved:
|
||||||
|
return resolved
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def get_model_by_hash(self, model_hash: str) -> Tuple[Optional[Dict], Optional[str]]:
|
||||||
|
"""Find model by SHA256 hash value using CivArchive API"""
|
||||||
|
try:
|
||||||
|
payload, error = await self._request_json(f"/sha256/{model_hash.lower()}")
|
||||||
|
if error:
|
||||||
|
if "not found" in error.lower():
|
||||||
|
return None, "Model not found"
|
||||||
|
return None, error
|
||||||
|
|
||||||
|
context, version_data, fallback_files = self._split_context(payload)
|
||||||
|
transformed = self._transform_version(context, version_data, fallback_files)
|
||||||
|
if transformed:
|
||||||
|
return transformed, None
|
||||||
|
|
||||||
|
resolved = await self._resolve_version_from_files(payload)
|
||||||
|
if resolved:
|
||||||
|
return resolved, None
|
||||||
|
|
||||||
|
logger.error("Error fetching version of CivArchive model by hash %s", model_hash[:10])
|
||||||
|
return None, "No version data found"
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error fetching CivArchive model by hash {model_hash[:10]}: {e}")
|
||||||
|
return None, str(e)
|
||||||
|
|
||||||
|
async def get_model_versions(self, model_id: str) -> Optional[Dict]:
|
||||||
|
"""Get all versions of a model using CivArchive API"""
|
||||||
|
try:
|
||||||
|
payload, error = await self._request_json(f"/models/{model_id}")
|
||||||
|
if error or payload is None:
|
||||||
|
if error and "not found" in error.lower():
|
||||||
|
return None
|
||||||
|
logger.error(f"Error fetching CivArchive model versions for {model_id}: {error}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
data = self._normalize_payload(payload)
|
||||||
|
context, version_data, fallback_files = self._split_context(payload)
|
||||||
|
|
||||||
|
versions_meta = data.get("versions") or []
|
||||||
|
transformed_versions: List[Dict] = []
|
||||||
|
for meta in versions_meta:
|
||||||
|
if not isinstance(meta, dict):
|
||||||
|
continue
|
||||||
|
version_id = meta.get("id")
|
||||||
|
if version_id is None:
|
||||||
|
continue
|
||||||
|
target_model_id = meta.get("modelId") or model_id
|
||||||
|
version = await self.get_model_version(target_model_id, version_id)
|
||||||
|
if version:
|
||||||
|
transformed_versions.append(version)
|
||||||
|
|
||||||
|
# Ensure the primary version is included even if versions list was empty
|
||||||
|
primary_version = self._transform_version(context, version_data, fallback_files)
|
||||||
|
if primary_version:
|
||||||
|
transformed_versions.insert(0, primary_version)
|
||||||
|
|
||||||
|
ordered_versions: List[Dict] = []
|
||||||
|
seen_ids = set()
|
||||||
|
for version in transformed_versions:
|
||||||
|
version_id = version.get("id")
|
||||||
|
if version_id in seen_ids:
|
||||||
|
continue
|
||||||
|
seen_ids.add(version_id)
|
||||||
|
ordered_versions.append(version)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"modelVersions": ordered_versions,
|
||||||
|
"type": context.get("type", ""),
|
||||||
|
"name": context.get("name", ""),
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error fetching CivArchive model versions for {model_id}: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def get_model_version(self, model_id: int = None, version_id: int = None) -> Optional[Dict]:
|
||||||
|
"""Get specific model version using CivArchive API
|
||||||
|
|
||||||
|
Args:
|
||||||
|
model_id: The model ID (required)
|
||||||
|
version_id: Optional specific version ID to filter to
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Optional[Dict]: The model version data or None if not found
|
||||||
|
"""
|
||||||
|
if model_id is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
params = {"modelVersionId": version_id} if version_id is not None else None
|
||||||
|
payload, error = await self._request_json(f"/models/{model_id}", params=params)
|
||||||
|
if error or payload is None:
|
||||||
|
if error and "not found" in error.lower():
|
||||||
|
return None
|
||||||
|
logger.error(f"Error fetching CivArchive model version via API {model_id}/{version_id}: {error}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
context, version_data, fallback_files = self._split_context(payload)
|
||||||
|
|
||||||
|
if not version_data:
|
||||||
|
return await self._resolve_version_from_files(payload)
|
||||||
|
|
||||||
|
if version_id is not None:
|
||||||
|
raw_id = version_data.get("id")
|
||||||
|
if raw_id != version_id:
|
||||||
|
logger.warning(
|
||||||
|
"Requested version %s doesn't match default version %s for model %s",
|
||||||
|
version_id,
|
||||||
|
raw_id,
|
||||||
|
model_id,
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
actual_model_id = version_data.get("modelId")
|
||||||
|
context_model_id = context.get("id")
|
||||||
|
# CivArchive can respond with data for a different model id while already
|
||||||
|
# returning the fully resolved model context. Only follow the redirect when
|
||||||
|
# the context itself still points to the original (wrong) model.
|
||||||
|
if (
|
||||||
|
actual_model_id is not None
|
||||||
|
and str(actual_model_id) != str(model_id)
|
||||||
|
and (context_model_id is None or str(context_model_id) != str(actual_model_id))
|
||||||
|
):
|
||||||
|
return await self.get_model_version(actual_model_id, version_id)
|
||||||
|
|
||||||
|
return self._transform_version(context, version_data, fallback_files)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error fetching CivArchive model version via API {model_id}/{version_id}: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def get_model_version_info(self, version_id: str) -> Tuple[Optional[Dict], Optional[str]]:
|
||||||
|
""" Fetch model version metadata using a known bogus model lookup
|
||||||
|
CivArchive lacks a direct version lookup API, this uses a workaround (which we handle in the main model request now)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
version_id: The model version ID
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple[Optional[Dict], Optional[str]]: (version_data, error_message)
|
||||||
|
"""
|
||||||
|
version = await self.get_model_version(1, version_id)
|
||||||
|
if version is None:
|
||||||
|
return None, "Model not found"
|
||||||
|
return version, None
|
||||||
|
|
||||||
|
async def get_model_by_url(self, url) -> Optional[Dict]:
|
||||||
|
"""Get specific model version by parsing CivArchive HTML page (legacy method)
|
||||||
|
|
||||||
|
This is the original HTML scraping implementation, kept for reference and new sites added not in api.
|
||||||
|
The primary get_model_version() now uses the API instead.
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Construct CivArchive URL
|
||||||
|
url = f"https://civarchive.com/{url}"
|
||||||
|
downloader = await get_downloader()
|
||||||
|
session = await downloader.session
|
||||||
|
async with session.get(url) as response:
|
||||||
|
if response.status != 200:
|
||||||
|
return None
|
||||||
|
|
||||||
|
html_content = await response.text()
|
||||||
|
|
||||||
|
# Parse HTML to extract JSON data
|
||||||
|
soup_parser = _require_beautifulsoup()
|
||||||
|
soup = soup_parser(html_content, 'html.parser')
|
||||||
|
script_tag = soup.find('script', {'id': '__NEXT_DATA__', 'type': 'application/json'})
|
||||||
|
|
||||||
|
if not script_tag:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Parse JSON content
|
||||||
|
json_data = json.loads(script_tag.string)
|
||||||
|
model_data = json_data.get('props', {}).get('pageProps', {}).get('model')
|
||||||
|
|
||||||
|
if not model_data or 'version' not in model_data:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Extract version data as base
|
||||||
|
version = model_data['version'].copy()
|
||||||
|
|
||||||
|
# Restructure stats
|
||||||
|
if 'downloadCount' in version and 'ratingCount' in version and 'rating' in version:
|
||||||
|
version['stats'] = {
|
||||||
|
'downloadCount': version.pop('downloadCount'),
|
||||||
|
'ratingCount': version.pop('ratingCount'),
|
||||||
|
'rating': version.pop('rating')
|
||||||
|
}
|
||||||
|
|
||||||
|
# Rename trigger to trainedWords
|
||||||
|
if 'trigger' in version:
|
||||||
|
version['trainedWords'] = version.pop('trigger')
|
||||||
|
|
||||||
|
# Transform files data to expected format
|
||||||
|
if 'files' in version:
|
||||||
|
transformed_files = []
|
||||||
|
for file_data in version['files']:
|
||||||
|
# Find first available mirror (deletedAt is null)
|
||||||
|
available_mirror = None
|
||||||
|
for mirror in file_data.get('mirrors', []):
|
||||||
|
if mirror.get('deletedAt') is None:
|
||||||
|
available_mirror = mirror
|
||||||
|
break
|
||||||
|
|
||||||
|
# Create transformed file entry
|
||||||
|
transformed_file = {
|
||||||
|
'id': file_data.get('id'),
|
||||||
|
'sizeKB': file_data.get('sizeKB'),
|
||||||
|
'name': available_mirror.get('filename', file_data.get('name')) if available_mirror else file_data.get('name'),
|
||||||
|
'type': file_data.get('type'),
|
||||||
|
'downloadUrl': available_mirror.get('url') if available_mirror else None,
|
||||||
|
'primary': file_data.get('is_primary', False),
|
||||||
|
'mirrors': file_data.get('mirrors', [])
|
||||||
|
}
|
||||||
|
|
||||||
|
# Transform hash format
|
||||||
|
if 'sha256' in file_data:
|
||||||
|
transformed_file['hashes'] = {
|
||||||
|
'SHA256': file_data['sha256'].upper()
|
||||||
|
}
|
||||||
|
|
||||||
|
transformed_files.append(transformed_file)
|
||||||
|
|
||||||
|
version['files'] = transformed_files
|
||||||
|
|
||||||
|
# Add model information
|
||||||
|
version['model'] = {
|
||||||
|
'name': model_data.get('name'),
|
||||||
|
'type': model_data.get('type'),
|
||||||
|
'nsfw': model_data.get('is_nsfw', False),
|
||||||
|
'description': model_data.get('description'),
|
||||||
|
'tags': model_data.get('tags', [])
|
||||||
|
}
|
||||||
|
|
||||||
|
version['creator'] = {
|
||||||
|
'username': model_data.get('username'),
|
||||||
|
'image': ''
|
||||||
|
}
|
||||||
|
|
||||||
|
# Add source identifier
|
||||||
|
version['source'] = 'civarchive'
|
||||||
|
version['is_deleted'] = json_data.get('query', {}).get('is_deleted', False)
|
||||||
|
|
||||||
|
return version
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error fetching CivArchive model version (scraping) {url}: {e}")
|
||||||
|
return None
|
||||||
@@ -294,7 +294,7 @@ class DownloadManager:
|
|||||||
await progress_callback(0)
|
await progress_callback(0)
|
||||||
|
|
||||||
# 2. Get file information
|
# 2. Get file information
|
||||||
file_info = next((f for f in version_info.get('files', []) if f.get('primary')), None)
|
file_info = next((f for f in version_info.get('files', []) if f.get('primary') and f.get('type') == 'Model'), None)
|
||||||
if not file_info:
|
if not file_info:
|
||||||
return {'success': False, 'error': 'No primary file found in metadata'}
|
return {'success': False, 'error': 'No primary file found in metadata'}
|
||||||
mirrors = file_info.get('mirrors') or []
|
mirrors = file_info.get('mirrors') or []
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ from .model_metadata_provider import (
|
|||||||
ModelMetadataProviderManager,
|
ModelMetadataProviderManager,
|
||||||
SQLiteModelMetadataProvider,
|
SQLiteModelMetadataProvider,
|
||||||
CivitaiModelMetadataProvider,
|
CivitaiModelMetadataProvider,
|
||||||
|
CivArchiveModelMetadataProvider,
|
||||||
FallbackMetadataProvider
|
FallbackMetadataProvider
|
||||||
)
|
)
|
||||||
from .settings_manager import get_settings_manager
|
from .settings_manager import get_settings_manager
|
||||||
@@ -54,26 +55,27 @@ async def initialize_metadata_providers():
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to initialize Civitai API metadata provider: {e}")
|
logger.error(f"Failed to initialize Civitai API metadata provider: {e}")
|
||||||
|
|
||||||
# Register CivArchive provider, but do NOT add to fallback providers
|
# Register CivArchive provider, and all add to fallback providers
|
||||||
try:
|
try:
|
||||||
from .model_metadata_provider import CivArchiveModelMetadataProvider
|
civarchive_client = await ServiceRegistry.get_civarchive_client()
|
||||||
civarchive_provider = CivArchiveModelMetadataProvider()
|
civarchive_provider = CivArchiveModelMetadataProvider(civarchive_client)
|
||||||
provider_manager.register_provider('civarchive', civarchive_provider)
|
provider_manager.register_provider('civarchive_api', civarchive_provider)
|
||||||
logger.debug("CivArchive metadata provider registered (not included in fallback)")
|
providers.append(('civarchive_api', civarchive_provider))
|
||||||
|
logger.debug("CivArchive metadata provider registered (also included in fallback)")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to initialize CivArchive metadata provider: {e}")
|
logger.error(f"Failed to initialize CivArchive metadata provider: {e}")
|
||||||
|
|
||||||
# Set up fallback provider based on available providers
|
# Set up fallback provider based on available providers
|
||||||
if len(providers) > 1:
|
if len(providers) > 1:
|
||||||
# Always use Civitai API first, then Archive DB
|
# Always use Civitai API (it has better metadata), then CivArchive API, then Archive DB
|
||||||
ordered_providers = []
|
ordered_providers = []
|
||||||
ordered_providers.extend([p[1] for p in providers if p[0] == 'civitai_api'])
|
ordered_providers.extend([p[1] for p in providers if p[0] == 'civitai_api'])
|
||||||
|
ordered_providers.extend([p[1] for p in providers if p[0] == 'civarchive_api'])
|
||||||
ordered_providers.extend([p[1] for p in providers if p[0] == 'sqlite'])
|
ordered_providers.extend([p[1] for p in providers if p[0] == 'sqlite'])
|
||||||
|
|
||||||
if ordered_providers:
|
if ordered_providers:
|
||||||
fallback_provider = FallbackMetadataProvider(ordered_providers)
|
fallback_provider = FallbackMetadataProvider(ordered_providers)
|
||||||
provider_manager.register_provider('fallback', fallback_provider, is_default=True)
|
provider_manager.register_provider('fallback', fallback_provider, is_default=True)
|
||||||
logger.debug(f"Fallback metadata provider registered with {len(ordered_providers)} providers, Civitai API first")
|
|
||||||
elif len(providers) == 1:
|
elif len(providers) == 1:
|
||||||
# Only one provider available, set it as default
|
# Only one provider available, set it as default
|
||||||
provider_name, provider = providers[0]
|
provider_name, provider = providers[0]
|
||||||
|
|||||||
@@ -167,41 +167,101 @@ class MetadataSyncService:
|
|||||||
|
|
||||||
metadata_path = os.path.splitext(file_path)[0] + ".metadata.json"
|
metadata_path = os.path.splitext(file_path)[0] + ".metadata.json"
|
||||||
enable_archive = self._settings.get("enable_metadata_archive_db", False)
|
enable_archive = self._settings.get("enable_metadata_archive_db", False)
|
||||||
|
previous_source = model_data.get("metadata_source") or (model_data.get("civitai") or {}).get("source")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
provider_attempts: list[tuple[Optional[str], MetadataProviderProtocol]] = []
|
||||||
|
sqlite_attempted = False
|
||||||
|
|
||||||
if model_data.get("civitai_deleted") is True:
|
if model_data.get("civitai_deleted") is True:
|
||||||
if not enable_archive or model_data.get("db_checked") is True:
|
if previous_source in (None, "civarchive"):
|
||||||
|
try:
|
||||||
|
provider_attempts.append(("civarchive_api", await self._get_provider("civarchive_api")))
|
||||||
|
except Exception as exc: # pragma: no cover - provider resolution fault
|
||||||
|
logger.debug("Unable to resolve civarchive provider: %s", exc)
|
||||||
|
|
||||||
|
if enable_archive and model_data.get("db_checked") is not True:
|
||||||
|
try:
|
||||||
|
provider_attempts.append(("sqlite", await self._get_provider("sqlite")))
|
||||||
|
except Exception as exc: # pragma: no cover - provider resolution fault
|
||||||
|
logger.debug("Unable to resolve sqlite provider: %s", exc)
|
||||||
|
|
||||||
|
if not provider_attempts:
|
||||||
if not enable_archive:
|
if not enable_archive:
|
||||||
error_msg = "CivitAI model is deleted and metadata archive DB is not enabled"
|
error_msg = "CivitAI model is deleted and metadata archive DB is not enabled"
|
||||||
else:
|
elif model_data.get("db_checked") is True:
|
||||||
error_msg = "CivitAI model is deleted and not found in metadata archive DB"
|
error_msg = "CivitAI model is deleted and not found in metadata archive DB"
|
||||||
return (False, error_msg)
|
else:
|
||||||
metadata_provider = await self._get_provider("sqlite")
|
error_msg = "CivitAI model is deleted and no archive provider is available"
|
||||||
|
return False, error_msg
|
||||||
else:
|
else:
|
||||||
metadata_provider = await self._get_default_provider()
|
provider_attempts.append((None, await self._get_default_provider()))
|
||||||
|
|
||||||
civitai_metadata, error = await metadata_provider.get_model_by_hash(sha256)
|
civitai_metadata: Optional[Dict[str, Any]] = None
|
||||||
|
metadata_provider: Optional[MetadataProviderProtocol] = None
|
||||||
|
provider_used: Optional[str] = None
|
||||||
|
last_error: Optional[str] = None
|
||||||
|
|
||||||
if not civitai_metadata:
|
for provider_name, provider in provider_attempts:
|
||||||
if error == "Model not found":
|
try:
|
||||||
|
civitai_metadata_candidate, error = await provider.get_model_by_hash(sha256)
|
||||||
|
except Exception as exc: # pragma: no cover - defensive logging
|
||||||
|
logger.error("Provider %s failed for hash %s: %s", provider_name, sha256, exc)
|
||||||
|
civitai_metadata_candidate, error = None, str(exc)
|
||||||
|
|
||||||
|
if provider_name == "sqlite":
|
||||||
|
sqlite_attempted = True
|
||||||
|
|
||||||
|
if civitai_metadata_candidate:
|
||||||
|
civitai_metadata = civitai_metadata_candidate
|
||||||
|
metadata_provider = provider
|
||||||
|
provider_used = provider_name
|
||||||
|
break
|
||||||
|
|
||||||
|
last_error = error or last_error
|
||||||
|
|
||||||
|
if civitai_metadata is None or metadata_provider is None:
|
||||||
|
if sqlite_attempted:
|
||||||
|
model_data["db_checked"] = True
|
||||||
|
|
||||||
|
if last_error == "Model not found":
|
||||||
model_data["from_civitai"] = False
|
model_data["from_civitai"] = False
|
||||||
model_data["civitai_deleted"] = True
|
model_data["civitai_deleted"] = True
|
||||||
model_data["db_checked"] = enable_archive
|
model_data["db_checked"] = sqlite_attempted or (enable_archive and model_data.get("db_checked", False))
|
||||||
model_data["last_checked_at"] = datetime.now().timestamp()
|
model_data["last_checked_at"] = datetime.now().timestamp()
|
||||||
|
|
||||||
data_to_save = model_data.copy()
|
data_to_save = model_data.copy()
|
||||||
data_to_save.pop("folder", None)
|
data_to_save.pop("folder", None)
|
||||||
await self._metadata_manager.save_metadata(file_path, data_to_save)
|
await self._metadata_manager.save_metadata(file_path, data_to_save)
|
||||||
|
|
||||||
|
default_error = (
|
||||||
|
"CivitAI model is deleted and metadata archive DB is not enabled"
|
||||||
|
if model_data.get("civitai_deleted") and not enable_archive
|
||||||
|
else "CivitAI model is deleted and not found in metadata archive DB"
|
||||||
|
if model_data.get("civitai_deleted") and (model_data.get("db_checked") is True or sqlite_attempted)
|
||||||
|
else "No provider returned metadata"
|
||||||
|
)
|
||||||
|
|
||||||
error_msg = (
|
error_msg = (
|
||||||
f"Error fetching metadata: {error} (model_name={model_data.get('model_name', '')})"
|
f"Error fetching metadata: {last_error or default_error} "
|
||||||
|
f"(model_name={model_data.get('model_name', '')})"
|
||||||
)
|
)
|
||||||
logger.error(error_msg)
|
logger.error(error_msg)
|
||||||
return False, error_msg
|
return False, error_msg
|
||||||
|
|
||||||
model_data["from_civitai"] = True
|
model_data["from_civitai"] = True
|
||||||
model_data["civitai_deleted"] = civitai_metadata.get("source") == "archive_db"
|
model_data["civitai_deleted"] = civitai_metadata.get("source") == "archive_db" or civitai_metadata.get("source") == "civarchive"
|
||||||
model_data["db_checked"] = enable_archive
|
model_data["db_checked"] = enable_archive and (
|
||||||
|
civitai_metadata.get("source") == "archive_db" or sqlite_attempted
|
||||||
|
)
|
||||||
|
source = civitai_metadata.get("source") or "civitai_api"
|
||||||
|
if source == "api":
|
||||||
|
source = "civitai_api"
|
||||||
|
elif provider_used == "civarchive_api" and source != "civarchive":
|
||||||
|
source = "civarchive"
|
||||||
|
elif provider_used == "sqlite":
|
||||||
|
source = "archive_db"
|
||||||
|
model_data["metadata_source"] = source
|
||||||
model_data["last_checked_at"] = datetime.now().timestamp()
|
model_data["last_checked_at"] = datetime.now().timestamp()
|
||||||
|
|
||||||
local_metadata = model_data.copy()
|
local_metadata = model_data.copy()
|
||||||
|
|||||||
@@ -88,122 +88,22 @@ class CivitaiModelMetadataProvider(ModelMetadataProvider):
|
|||||||
return await self.client.get_user_models(username)
|
return await self.client.get_user_models(username)
|
||||||
|
|
||||||
class CivArchiveModelMetadataProvider(ModelMetadataProvider):
|
class CivArchiveModelMetadataProvider(ModelMetadataProvider):
|
||||||
"""Provider that uses CivArchive HTML page parsing for metadata"""
|
"""Provider that uses CivArchive API for metadata"""
|
||||||
|
|
||||||
|
def __init__(self, civarchive_client):
|
||||||
|
self.client = civarchive_client
|
||||||
|
|
||||||
async def get_model_by_hash(self, model_hash: str) -> Tuple[Optional[Dict], Optional[str]]:
|
async def get_model_by_hash(self, model_hash: str) -> Tuple[Optional[Dict], Optional[str]]:
|
||||||
"""Not supported by CivArchive provider"""
|
return await self.client.get_model_by_hash(model_hash)
|
||||||
return None, "CivArchive provider does not support hash lookup"
|
|
||||||
|
|
||||||
async def get_model_versions(self, model_id: str) -> Optional[Dict]:
|
async def get_model_versions(self, model_id: str) -> Optional[Dict]:
|
||||||
"""Not supported by CivArchive provider"""
|
return await self.client.get_model_versions(model_id)
|
||||||
return None
|
|
||||||
|
|
||||||
async def get_model_version(self, model_id: int = None, version_id: int = None) -> Optional[Dict]:
|
async def get_model_version(self, model_id: int = None, version_id: int = None) -> Optional[Dict]:
|
||||||
"""Get specific model version by parsing CivArchive HTML page"""
|
return await self.client.get_model_version(model_id, version_id)
|
||||||
if model_id is None or version_id is None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Construct CivArchive URL
|
|
||||||
url = f"https://civarchive.com/models/{model_id}?modelVersionId={version_id}"
|
|
||||||
|
|
||||||
downloader = await get_downloader()
|
|
||||||
session = await downloader.session
|
|
||||||
async with session.get(url) as response:
|
|
||||||
if response.status != 200:
|
|
||||||
return None
|
|
||||||
|
|
||||||
html_content = await response.text()
|
|
||||||
|
|
||||||
# Parse HTML to extract JSON data
|
|
||||||
soup_parser = _require_beautifulsoup()
|
|
||||||
soup = soup_parser(html_content, 'html.parser')
|
|
||||||
script_tag = soup.find('script', {'id': '__NEXT_DATA__', 'type': 'application/json'})
|
|
||||||
|
|
||||||
if not script_tag:
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Parse JSON content
|
|
||||||
json_data = json.loads(script_tag.string)
|
|
||||||
model_data = json_data.get('props', {}).get('pageProps', {}).get('model')
|
|
||||||
|
|
||||||
if not model_data or 'version' not in model_data:
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Extract version data as base
|
|
||||||
version = model_data['version'].copy()
|
|
||||||
|
|
||||||
# Restructure stats
|
|
||||||
if 'downloadCount' in version and 'ratingCount' in version and 'rating' in version:
|
|
||||||
version['stats'] = {
|
|
||||||
'downloadCount': version.pop('downloadCount'),
|
|
||||||
'ratingCount': version.pop('ratingCount'),
|
|
||||||
'rating': version.pop('rating')
|
|
||||||
}
|
|
||||||
|
|
||||||
# Rename trigger to trainedWords
|
|
||||||
if 'trigger' in version:
|
|
||||||
version['trainedWords'] = version.pop('trigger')
|
|
||||||
|
|
||||||
# Transform files data to expected format
|
|
||||||
if 'files' in version:
|
|
||||||
transformed_files = []
|
|
||||||
for file_data in version['files']:
|
|
||||||
# Find first available mirror (deletedAt is null)
|
|
||||||
available_mirror = None
|
|
||||||
for mirror in file_data.get('mirrors', []):
|
|
||||||
if mirror.get('deletedAt') is None:
|
|
||||||
available_mirror = mirror
|
|
||||||
break
|
|
||||||
|
|
||||||
# Create transformed file entry
|
|
||||||
transformed_file = {
|
|
||||||
'id': file_data.get('id'),
|
|
||||||
'sizeKB': file_data.get('sizeKB'),
|
|
||||||
'name': available_mirror.get('filename', file_data.get('name')) if available_mirror else file_data.get('name'),
|
|
||||||
'type': file_data.get('type'),
|
|
||||||
'downloadUrl': available_mirror.get('url') if available_mirror else None,
|
|
||||||
'primary': True,
|
|
||||||
'mirrors': file_data.get('mirrors', [])
|
|
||||||
}
|
|
||||||
|
|
||||||
# Transform hash format
|
|
||||||
if 'sha256' in file_data:
|
|
||||||
transformed_file['hashes'] = {
|
|
||||||
'SHA256': file_data['sha256'].upper()
|
|
||||||
}
|
|
||||||
|
|
||||||
transformed_files.append(transformed_file)
|
|
||||||
|
|
||||||
version['files'] = transformed_files
|
|
||||||
|
|
||||||
# Add model information
|
|
||||||
version['model'] = {
|
|
||||||
'name': model_data.get('name'),
|
|
||||||
'type': model_data.get('type'),
|
|
||||||
'nsfw': model_data.get('is_nsfw', False),
|
|
||||||
'description': model_data.get('description'),
|
|
||||||
'tags': model_data.get('tags', [])
|
|
||||||
}
|
|
||||||
|
|
||||||
version['creator'] = {
|
|
||||||
'username': model_data.get('username'),
|
|
||||||
'image': ''
|
|
||||||
}
|
|
||||||
|
|
||||||
# Add source identifier
|
|
||||||
version['source'] = 'civarchive'
|
|
||||||
version['is_deleted'] = json_data.get('query', {}).get('is_deleted', False)
|
|
||||||
|
|
||||||
return version
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error fetching CivArchive model version {model_id}/{version_id}: {e}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
async def get_model_version_info(self, version_id: str) -> Tuple[Optional[Dict], Optional[str]]:
|
async def get_model_version_info(self, version_id: str) -> Tuple[Optional[Dict], Optional[str]]:
|
||||||
"""Not supported by CivArchive provider - requires both model_id and version_id"""
|
return await self.client.get_model_version_info(version_id)
|
||||||
return None, "CivArchive provider requires both model_id and version_id"
|
|
||||||
|
|
||||||
async def get_user_models(self, username: str) -> Optional[List[Dict]]:
|
async def get_user_models(self, username: str) -> Optional[List[Dict]]:
|
||||||
"""Not supported by CivArchive provider"""
|
"""Not supported by CivArchive provider"""
|
||||||
|
|||||||
@@ -144,6 +144,27 @@ class ServiceRegistry:
|
|||||||
cls._services[service_name] = client
|
cls._services[service_name] = client
|
||||||
logger.debug(f"Created and registered {service_name}")
|
logger.debug(f"Created and registered {service_name}")
|
||||||
return client
|
return client
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def get_civarchive_client(cls):
|
||||||
|
"""Get or create CivArchive client instance"""
|
||||||
|
service_name = "civarchive_client"
|
||||||
|
|
||||||
|
if service_name in cls._services:
|
||||||
|
return cls._services[service_name]
|
||||||
|
|
||||||
|
async with cls._get_lock(service_name):
|
||||||
|
# Double-check after acquiring lock
|
||||||
|
if service_name in cls._services:
|
||||||
|
return cls._services[service_name]
|
||||||
|
|
||||||
|
# Import here to avoid circular imports
|
||||||
|
from .civarchive_client import CivArchiveClient
|
||||||
|
|
||||||
|
client = await CivArchiveClient.get_instance()
|
||||||
|
cls._services[service_name] = client
|
||||||
|
logger.debug(f"Created and registered {service_name}")
|
||||||
|
return client
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def get_download_manager(cls):
|
async def get_download_manager(cls):
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ class BaseModelMetadata:
|
|||||||
favorite: bool = False # Whether the model is a favorite
|
favorite: bool = False # Whether the model is a favorite
|
||||||
exclude: bool = False # Whether to exclude this model from the cache
|
exclude: bool = False # Whether to exclude this model from the cache
|
||||||
db_checked: bool = False # Whether checked in archive DB
|
db_checked: bool = False # Whether checked in archive DB
|
||||||
|
metadata_source: Optional[str] = None # Last provider that supplied metadata
|
||||||
last_checked_at: float = 0 # Last checked timestamp
|
last_checked_at: float = 0 # Last checked timestamp
|
||||||
_unknown_fields: Dict[str, Any] = field(default_factory=dict, repr=False, compare=False) # Store unknown fields
|
_unknown_fields: Dict[str, Any] = field(default_factory=dict, repr=False, compare=False) # Store unknown fields
|
||||||
|
|
||||||
|
|||||||
134
refs/civarc_api_model_data.json
Normal file
134
refs/civarc_api_model_data.json
Normal file
@@ -0,0 +1,134 @@
|
|||||||
|
{
|
||||||
|
"id": 1746460,
|
||||||
|
"name": "Mixplin Style [Illustrious]",
|
||||||
|
"type": "LORA",
|
||||||
|
"description": "description",
|
||||||
|
"username": "Ty_Lee",
|
||||||
|
"downloadCount": 4207,
|
||||||
|
"favoriteCount": 0,
|
||||||
|
"commentCount": 8,
|
||||||
|
"ratingCount": 0,
|
||||||
|
"rating": 0,
|
||||||
|
"is_nsfw": true,
|
||||||
|
"nsfw_level": 31,
|
||||||
|
"createdAt": "2025-07-06T01:51:42.859Z",
|
||||||
|
"updatedAt": "2025-10-10T23:15:26.714Z",
|
||||||
|
"deletedAt": null,
|
||||||
|
"tags": [
|
||||||
|
"art",
|
||||||
|
"style",
|
||||||
|
"artist style",
|
||||||
|
"styles",
|
||||||
|
"mixplin",
|
||||||
|
"artiststyle"
|
||||||
|
],
|
||||||
|
"creator_id": "Ty_Lee",
|
||||||
|
"creator_username": "Ty_Lee",
|
||||||
|
"creator_name": "Ty_Lee",
|
||||||
|
"creator_url": "/users/Ty_Lee",
|
||||||
|
"versions": [
|
||||||
|
{
|
||||||
|
"id": 2042594,
|
||||||
|
"name": "v2.0",
|
||||||
|
"href": "/models/1746460?modelVersionId=2042594"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": 1976567,
|
||||||
|
"name": "v1.0",
|
||||||
|
"href": "/models/1746460?modelVersionId=1976567"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"version": {
|
||||||
|
"id": 1976567,
|
||||||
|
"modelId": 1746460,
|
||||||
|
"name": "v1.0",
|
||||||
|
"baseModel": "Illustrious",
|
||||||
|
"baseModelType": "Standard",
|
||||||
|
"description": null,
|
||||||
|
"downloadCount": 437,
|
||||||
|
"ratingCount": 0,
|
||||||
|
"rating": 0,
|
||||||
|
"is_nsfw": true,
|
||||||
|
"nsfw_level": 31,
|
||||||
|
"createdAt": "2025-07-05T10:17:28.716Z",
|
||||||
|
"updatedAt": "2025-10-10T23:15:26.756Z",
|
||||||
|
"deletedAt": null,
|
||||||
|
"files": [
|
||||||
|
{
|
||||||
|
"id": 1874043,
|
||||||
|
"name": "mxpln-illustrious-ty_lee.safetensors",
|
||||||
|
"type": "Model",
|
||||||
|
"sizeKB": 223124.37109375,
|
||||||
|
"downloadUrl": "https://civitai.com/api/download/models/1976567",
|
||||||
|
"modelId": 1746460,
|
||||||
|
"modelName": "Mixplin Style [Illustrious]",
|
||||||
|
"modelVersionId": 1976567,
|
||||||
|
"is_nsfw": true,
|
||||||
|
"nsfw_level": 31,
|
||||||
|
"sha256": "e2b7a280d6539556f23f380b3f71e4e22bc4524445c4c96526e117c6005c6ad3",
|
||||||
|
"createdAt": "2025-07-05T10:17:28.716Z",
|
||||||
|
"updatedAt": "2025-10-10T23:15:26.766Z",
|
||||||
|
"is_primary": false,
|
||||||
|
"mirrors": [
|
||||||
|
{
|
||||||
|
"filename": "mxpln-illustrious-ty_lee.safetensors",
|
||||||
|
"url": "https://civitai.com/api/download/models/1976567",
|
||||||
|
"source": "civitai",
|
||||||
|
"model_id": 1746460,
|
||||||
|
"model_version_id": 1976567,
|
||||||
|
"deletedAt": null,
|
||||||
|
"is_gated": false,
|
||||||
|
"is_paid": false
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"images": [
|
||||||
|
{
|
||||||
|
"id": 86403595,
|
||||||
|
"url": "https://img.genur.art/sig/width:450/quality:85/aHR0cHM6Ly9jLmdlbnVyLmFydC9hNmE3Njc2YS0wMWQ3LTQ1YzAtOWEzYS1mNWJiYTU4MDNiMDE=",
|
||||||
|
"nsfwLevel": 1,
|
||||||
|
"width": 1560,
|
||||||
|
"height": 2280,
|
||||||
|
"hash": "U7G8Zp0w02%IA6%N00-;D]-W~VNG0nMw-.IV",
|
||||||
|
"type": "image",
|
||||||
|
"minor": false,
|
||||||
|
"poi": false,
|
||||||
|
"hasMeta": true,
|
||||||
|
"hasPositivePrompt": true,
|
||||||
|
"onSite": false,
|
||||||
|
"remixOfId": null,
|
||||||
|
"image_url": "https://img.genur.art/sig/width:450/quality:85/aHR0cHM6Ly9jLmdlbnVyLmFydC9hNmE3Njc2YS0wMWQ3LTQ1YzAtOWEzYS1mNWJiYTU4MDNiMDE=",
|
||||||
|
"link": "https://genur.art/posts/86403595"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"trigger": [
|
||||||
|
"mxpln"
|
||||||
|
],
|
||||||
|
"allow_download": true,
|
||||||
|
"download_url": "/api/download/models/1976567",
|
||||||
|
"platform_url": "https://civitai.com/models/1746460?modelVersionId=1976567",
|
||||||
|
"civitai_model_id": 1746460,
|
||||||
|
"civitai_model_version_id": 1976567,
|
||||||
|
"href": "/models/1746460?modelVersionId=1976567",
|
||||||
|
"mirrors": [
|
||||||
|
{
|
||||||
|
"platform": "tensorart",
|
||||||
|
"href": "/tensorart/models/904473536033245448/versions/904473536033245448",
|
||||||
|
"platform_url": "https://tensor.art/models/904473536033245448",
|
||||||
|
"name": "Mixplin Style MXP",
|
||||||
|
"version_name": "Mixplin",
|
||||||
|
"id": "904473536033245448",
|
||||||
|
"version_id": "904473536033245448"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"platform": "civitai",
|
||||||
|
"platform_name": "CivitAI",
|
||||||
|
"meta": {
|
||||||
|
"title": "Mixplin Style [Illustrious] - v1.0 - CivitAI Archive",
|
||||||
|
"description": "Mixplin Style [Illustrious] v1.0 is a Illustrious LORA AI model created by Ty_Lee for generating images of art, style, artist style, styles, mixplin, artiststyle",
|
||||||
|
"image": "https://img.genur.art/sig/width:450/quality:85/aHR0cHM6Ly9jLmdlbnVyLmFydC9hNmE3Njc2YS0wMWQ3LTQ1YzAtOWEzYS1mNWJiYTU4MDNiMDE=",
|
||||||
|
"canonical": "https://civarchive.com/models/1746460?modelVersionId=1976567"
|
||||||
|
}
|
||||||
|
}
|
||||||
38
refs/target_version.json
Normal file
38
refs/target_version.json
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
{
|
||||||
|
"id": 2269146,
|
||||||
|
"modelId": 2004760,
|
||||||
|
"name": "v1.0 Illustrious",
|
||||||
|
"nsfwLevel": 1,
|
||||||
|
"trainedWords": ["PencilSketchDaal"],
|
||||||
|
"baseModel": "Illustrious",
|
||||||
|
"description": "<p>Illustrious. Your pencil may vary with your checkpoint. </p>",
|
||||||
|
"model": {
|
||||||
|
"name": "Pencil Sketch Anime",
|
||||||
|
"type": "LORA",
|
||||||
|
"nsfw": false,
|
||||||
|
"description": "description",
|
||||||
|
"tags": ["style"]
|
||||||
|
},
|
||||||
|
"files": [
|
||||||
|
{
|
||||||
|
"id": 2161260,
|
||||||
|
"sizeKB": 223106.37890625,
|
||||||
|
"name": "Pencil-Sketch-Illustrious.safetensors",
|
||||||
|
"type": "Model",
|
||||||
|
"hashes": {
|
||||||
|
"SHA256": "2C70479CD673B0FE056EAF4FD97C7F33A39F14853805431AC9AB84226ECE3B82"
|
||||||
|
},
|
||||||
|
"primary": true,
|
||||||
|
"downloadUrl": "https://civitai.com/api/download/models/2269146",
|
||||||
|
"mirrors": {}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"images": [
|
||||||
|
{},
|
||||||
|
{}
|
||||||
|
],
|
||||||
|
"creator": {
|
||||||
|
"username": "Daalis",
|
||||||
|
"image": "https://image.civitai.com/xG1nkqKTMzGDvpLrqFT7WA/eb245b49-edc8-4ed6-ad7b-6d61eb8c51de/width=96/Daalis.jpeg"
|
||||||
|
}
|
||||||
|
}
|
||||||
239
tests/services/test_civarchive_client.py
Normal file
239
tests/services/test_civarchive_client.py
Normal file
@@ -0,0 +1,239 @@
|
|||||||
|
import copy
|
||||||
|
from unittest.mock import AsyncMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from py.services import civarchive_client as civarchive_client_module
|
||||||
|
from py.services.civarchive_client import CivArchiveClient
|
||||||
|
from py.services.model_metadata_provider import ModelMetadataProviderManager
|
||||||
|
|
||||||
|
|
||||||
|
class DummyDownloader:
|
||||||
|
def __init__(self):
|
||||||
|
self.calls = []
|
||||||
|
|
||||||
|
async def make_request(self, method, url, use_auth=False, **kwargs):
|
||||||
|
self.calls.append({"method": method, "url": url, "params": kwargs.get("params")})
|
||||||
|
return True, {}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def reset_singletons():
|
||||||
|
CivArchiveClient._instance = None
|
||||||
|
ModelMetadataProviderManager._instance = None
|
||||||
|
yield
|
||||||
|
CivArchiveClient._instance = None
|
||||||
|
ModelMetadataProviderManager._instance = None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def downloader(monkeypatch):
|
||||||
|
instance = DummyDownloader()
|
||||||
|
monkeypatch.setattr(civarchive_client_module, "get_downloader", AsyncMock(return_value=instance))
|
||||||
|
return instance
|
||||||
|
|
||||||
|
|
||||||
|
def _base_civarchive_payload(version_id=1976567, *, trigger="mxpln", nsfw_level=31):
|
||||||
|
version_name = "v2.0" if version_id != 1976567 else "v1.0"
|
||||||
|
file_sha = "e2b7a280d6539556f23f380b3f71e4e22bc4524445c4c96526e117c6005c6ad3"
|
||||||
|
return {
|
||||||
|
"data": {
|
||||||
|
"id": 1746460,
|
||||||
|
"name": "Mixplin Style [Illustrious]",
|
||||||
|
"type": "LORA",
|
||||||
|
"description": "description",
|
||||||
|
"is_nsfw": True,
|
||||||
|
"nsfw_level": nsfw_level,
|
||||||
|
"tags": ["art", "style"],
|
||||||
|
"creator_username": "Ty_Lee",
|
||||||
|
"creator_name": "Ty_Lee",
|
||||||
|
"creator_url": "/users/Ty_Lee",
|
||||||
|
"version": {
|
||||||
|
"id": version_id,
|
||||||
|
"modelId": 1746460,
|
||||||
|
"name": version_name,
|
||||||
|
"baseModel": "Illustrious",
|
||||||
|
"description": "version description",
|
||||||
|
"downloadCount": 437,
|
||||||
|
"ratingCount": 0,
|
||||||
|
"rating": 0,
|
||||||
|
"nsfw_level": nsfw_level,
|
||||||
|
"trigger": [trigger],
|
||||||
|
"files": [
|
||||||
|
{
|
||||||
|
"id": 1874043,
|
||||||
|
"name": "mxpln-illustrious-ty_lee.safetensors",
|
||||||
|
"type": "Model",
|
||||||
|
"sizeKB": 223124.37109375,
|
||||||
|
"downloadUrl": "https://civitai.com/api/download/models/1976567",
|
||||||
|
"sha256": file_sha,
|
||||||
|
"is_primary": False,
|
||||||
|
"mirrors": [
|
||||||
|
{
|
||||||
|
"filename": "mxpln-illustrious-ty_lee.safetensors",
|
||||||
|
"url": "https://civitai.com/api/download/models/1976567",
|
||||||
|
"deletedAt": None,
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"images": [
|
||||||
|
{
|
||||||
|
"id": 86403595,
|
||||||
|
"url": "https://img.genur.art/example.png",
|
||||||
|
"nsfwLevel": 1,
|
||||||
|
}
|
||||||
|
],
|
||||||
|
},
|
||||||
|
"versions": [
|
||||||
|
{"id": 2042594, "name": "v2.0"},
|
||||||
|
{"id": 1976567, "name": "v1.0"},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def test_get_model_by_hash_transforms_payload(downloader):
|
||||||
|
payload = _base_civarchive_payload()
|
||||||
|
|
||||||
|
async def fake_make_request(method, url, use_auth=False, **kwargs):
|
||||||
|
downloader.calls.append({"url": url, "params": kwargs.get("params")})
|
||||||
|
if url.endswith("/sha256/abc"):
|
||||||
|
return True, copy.deepcopy(payload)
|
||||||
|
return False, "unexpected"
|
||||||
|
|
||||||
|
downloader.make_request = fake_make_request
|
||||||
|
|
||||||
|
client = await CivArchiveClient.get_instance()
|
||||||
|
|
||||||
|
result, error = await client.get_model_by_hash("abc")
|
||||||
|
|
||||||
|
assert error is None
|
||||||
|
assert result["id"] == 1976567
|
||||||
|
assert result["nsfwLevel"] == 31
|
||||||
|
assert result["trainedWords"] == ["mxpln"]
|
||||||
|
assert result["stats"] == {"downloadCount": 437, "ratingCount": 0, "rating": 0}
|
||||||
|
assert result["model"]["name"] == "Mixplin Style [Illustrious]"
|
||||||
|
assert result["model"]["nsfw"] is True
|
||||||
|
assert result["creator"]["username"] == "Ty_Lee"
|
||||||
|
assert result["creator"]["image"] == ""
|
||||||
|
file_meta = result["files"][0]
|
||||||
|
assert file_meta["hashes"]["SHA256"] == "E2B7A280D6539556F23F380B3F71E4E22BC4524445C4C96526E117C6005C6AD3"
|
||||||
|
assert file_meta["mirrors"][0]["url"] == "https://civitai.com/api/download/models/1976567"
|
||||||
|
assert file_meta["primary"] is True
|
||||||
|
assert result["source"] == "civarchive"
|
||||||
|
assert result["images"][0]["url"] == "https://img.genur.art/example.png"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_get_model_versions_fetches_each_version(downloader):
|
||||||
|
base_url = "https://civarchive.com/api/models/1746460"
|
||||||
|
base_payload = _base_civarchive_payload(version_id=2042594, trigger="mxpln-new", nsfw_level=5)
|
||||||
|
other_payload = _base_civarchive_payload()
|
||||||
|
|
||||||
|
responses = {
|
||||||
|
(base_url, None): base_payload,
|
||||||
|
(base_url, (("modelVersionId", "2042594"),)): base_payload,
|
||||||
|
(base_url, (("modelVersionId", "1976567"),)): other_payload,
|
||||||
|
}
|
||||||
|
|
||||||
|
async def fake_make_request(method, url, use_auth=False, **kwargs):
|
||||||
|
params = kwargs.get("params")
|
||||||
|
key = (url, tuple(sorted((params or {}).items())) if params else None)
|
||||||
|
downloader.calls.append({"url": url, "params": params})
|
||||||
|
if key in responses:
|
||||||
|
return True, copy.deepcopy(responses[key])
|
||||||
|
return False, "unexpected"
|
||||||
|
|
||||||
|
downloader.make_request = fake_make_request
|
||||||
|
|
||||||
|
client = await CivArchiveClient.get_instance()
|
||||||
|
|
||||||
|
result = await client.get_model_versions("1746460")
|
||||||
|
|
||||||
|
assert result["name"] == "Mixplin Style [Illustrious]"
|
||||||
|
assert result["type"] == "LORA"
|
||||||
|
versions = result["modelVersions"]
|
||||||
|
assert [version["id"] for version in versions] == [2042594, 1976567]
|
||||||
|
assert versions[0]["trainedWords"] == ["mxpln-new"]
|
||||||
|
assert versions[1]["trainedWords"] == ["mxpln"]
|
||||||
|
assert versions[0]["nsfwLevel"] == 5
|
||||||
|
assert versions[1]["nsfwLevel"] == 31
|
||||||
|
assert any(call["params"] == {"modelVersionId": "2042594"} for call in downloader.calls)
|
||||||
|
assert any(call["params"] == {"modelVersionId": "1976567"} for call in downloader.calls)
|
||||||
|
|
||||||
|
|
||||||
|
async def test_get_model_version_redirects_to_actual_model_id(downloader):
|
||||||
|
first_payload = _base_civarchive_payload()
|
||||||
|
first_payload["data"]["version"]["modelId"] = 222
|
||||||
|
|
||||||
|
base_url_request = "https://civarchive.com/api/models/111"
|
||||||
|
redirected_url_request = "https://civarchive.com/api/models/222"
|
||||||
|
|
||||||
|
async def fake_make_request(method, url, use_auth=False, **kwargs):
|
||||||
|
downloader.calls.append({"url": url, "params": kwargs.get("params")})
|
||||||
|
params = kwargs.get("params") or {}
|
||||||
|
if url == base_url_request:
|
||||||
|
return True, copy.deepcopy(first_payload)
|
||||||
|
if url == redirected_url_request and params.get("modelVersionId") == "1976567":
|
||||||
|
return True, copy.deepcopy(_base_civarchive_payload())
|
||||||
|
return False, "unexpected"
|
||||||
|
|
||||||
|
downloader.make_request = fake_make_request
|
||||||
|
|
||||||
|
client = await CivArchiveClient.get_instance()
|
||||||
|
|
||||||
|
result = await client.get_model_version(model_id=111, version_id=1976567)
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert result["model"]["name"] == "Mixplin Style [Illustrious]"
|
||||||
|
assert len(downloader.calls) == 2
|
||||||
|
assert downloader.calls[1]["url"] == redirected_url_request
|
||||||
|
|
||||||
|
|
||||||
|
async def test_get_model_by_hash_uses_file_fallback(downloader, monkeypatch):
|
||||||
|
file_only_payload = {
|
||||||
|
"data": {
|
||||||
|
"files": [
|
||||||
|
{
|
||||||
|
"model_id": 1746460,
|
||||||
|
"model_version_id": 1976567,
|
||||||
|
"source": "civitai",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
version_payload = _base_civarchive_payload()
|
||||||
|
|
||||||
|
async def fake_make_request(method, url, use_auth=False, **kwargs):
|
||||||
|
downloader.calls.append({"url": url, "params": kwargs.get("params")})
|
||||||
|
if "/sha256/" in url:
|
||||||
|
return True, copy.deepcopy(file_only_payload)
|
||||||
|
if "/models/1746460" in url:
|
||||||
|
return True, copy.deepcopy(version_payload)
|
||||||
|
return False, "unexpected"
|
||||||
|
|
||||||
|
downloader.make_request = fake_make_request
|
||||||
|
|
||||||
|
client = await CivArchiveClient.get_instance()
|
||||||
|
|
||||||
|
result, error = await client.get_model_by_hash("fallback")
|
||||||
|
|
||||||
|
assert error is None
|
||||||
|
assert result["id"] == 1976567
|
||||||
|
assert result["model"]["name"] == "Mixplin Style [Illustrious]"
|
||||||
|
assert any("/models/1746460" in call["url"] for call in downloader.calls)
|
||||||
|
|
||||||
|
|
||||||
|
async def test_get_model_by_hash_handles_not_found(downloader):
|
||||||
|
async def fake_make_request(method, url, use_auth=False, **kwargs):
|
||||||
|
return False, "Resource not found"
|
||||||
|
|
||||||
|
downloader.make_request = fake_make_request
|
||||||
|
|
||||||
|
client = await CivArchiveClient.get_instance()
|
||||||
|
|
||||||
|
result, error = await client.get_model_by_hash("missing")
|
||||||
|
|
||||||
|
assert result is None
|
||||||
|
assert error == "Model not found"
|
||||||
@@ -108,6 +108,7 @@ def metadata_provider(monkeypatch):
|
|||||||
"creator": {"username": "Author"},
|
"creator": {"username": "Author"},
|
||||||
"files": [
|
"files": [
|
||||||
{
|
{
|
||||||
|
"type": "Model",
|
||||||
"primary": True,
|
"primary": True,
|
||||||
"downloadUrl": "https://example.invalid/file.safetensors",
|
"downloadUrl": "https://example.invalid/file.safetensors",
|
||||||
"name": "file.safetensors",
|
"name": "file.safetensors",
|
||||||
@@ -206,6 +207,7 @@ async def test_download_uses_active_mirrors(monkeypatch, scanners, metadata_prov
|
|||||||
"creator": {"username": "Author"},
|
"creator": {"username": "Author"},
|
||||||
"files": [
|
"files": [
|
||||||
{
|
{
|
||||||
|
"type": "Model",
|
||||||
"primary": True,
|
"primary": True,
|
||||||
"downloadUrl": "https://example.invalid/file.safetensors",
|
"downloadUrl": "https://example.invalid/file.safetensors",
|
||||||
"mirrors": [
|
"mirrors": [
|
||||||
|
|||||||
@@ -32,6 +32,8 @@ def build_service(
|
|||||||
get_model_by_hash=AsyncMock(),
|
get_model_by_hash=AsyncMock(),
|
||||||
get_model_version=AsyncMock(),
|
get_model_version=AsyncMock(),
|
||||||
)
|
)
|
||||||
|
if default_provider is None:
|
||||||
|
provider.get_model_by_hash.return_value = (None, None)
|
||||||
|
|
||||||
default_provider_factory = AsyncMock(return_value=provider)
|
default_provider_factory = AsyncMock(return_value=provider)
|
||||||
provider_selector = provider_selector or AsyncMock(return_value=provider)
|
provider_selector = provider_selector or AsyncMock(return_value=provider)
|
||||||
@@ -138,6 +140,7 @@ async def test_fetch_and_update_model_success_updates_cache(tmp_path):
|
|||||||
assert model_data["from_civitai"] is True
|
assert model_data["from_civitai"] is True
|
||||||
assert model_data["civitai_deleted"] is False
|
assert model_data["civitai_deleted"] is False
|
||||||
assert "civitai" in model_data
|
assert "civitai" in model_data
|
||||||
|
assert model_data["metadata_source"] == "civitai_api"
|
||||||
|
|
||||||
helpers.metadata_manager.hydrate_model_data.assert_not_awaited()
|
helpers.metadata_manager.hydrate_model_data.assert_not_awaited()
|
||||||
assert model_data["hydrated"] is True
|
assert model_data["hydrated"] is True
|
||||||
@@ -219,6 +222,124 @@ async def test_fetch_and_update_model_respects_deleted_without_archive():
|
|||||||
update_cache.assert_not_awaited()
|
update_cache.assert_not_awaited()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fetch_and_update_model_prefers_civarchive_for_deleted_models(tmp_path):
|
||||||
|
default_provider = SimpleNamespace(
|
||||||
|
get_model_by_hash=AsyncMock(),
|
||||||
|
get_model_version=AsyncMock(),
|
||||||
|
)
|
||||||
|
civarchive_provider = SimpleNamespace(
|
||||||
|
get_model_by_hash=AsyncMock(
|
||||||
|
return_value=(
|
||||||
|
{
|
||||||
|
"source": "civarchive",
|
||||||
|
"model": {"name": "Recovered", "description": "", "tags": []},
|
||||||
|
"images": [],
|
||||||
|
"baseModel": "sdxl",
|
||||||
|
},
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
),
|
||||||
|
get_model_version=AsyncMock(),
|
||||||
|
)
|
||||||
|
|
||||||
|
async def select_provider(name: str):
|
||||||
|
return civarchive_provider if name == "civarchive_api" else default_provider
|
||||||
|
|
||||||
|
provider_selector = AsyncMock(side_effect=select_provider)
|
||||||
|
helpers = build_service(
|
||||||
|
settings_values={"enable_metadata_archive_db": False},
|
||||||
|
default_provider=default_provider,
|
||||||
|
provider_selector=provider_selector,
|
||||||
|
)
|
||||||
|
|
||||||
|
model_path = tmp_path / "model.safetensors"
|
||||||
|
model_data = {
|
||||||
|
"civitai_deleted": True,
|
||||||
|
"metadata_source": "civarchive",
|
||||||
|
"civitai": {"source": "civarchive"},
|
||||||
|
"file_path": str(model_path),
|
||||||
|
}
|
||||||
|
update_cache = AsyncMock()
|
||||||
|
|
||||||
|
ok, error = await helpers.service.fetch_and_update_model(
|
||||||
|
sha256="deadbeef",
|
||||||
|
file_path=str(model_path),
|
||||||
|
model_data=model_data,
|
||||||
|
update_cache_func=update_cache,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert ok
|
||||||
|
assert error is None
|
||||||
|
provider_selector.assert_awaited_with("civarchive_api")
|
||||||
|
helpers.default_provider_factory.assert_not_awaited()
|
||||||
|
civarchive_provider.get_model_by_hash.assert_awaited_once_with("deadbeef")
|
||||||
|
update_cache.assert_awaited()
|
||||||
|
assert model_data["metadata_source"] == "civarchive"
|
||||||
|
helpers.metadata_manager.save_metadata.assert_awaited()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fetch_and_update_model_falls_back_to_sqlite_after_civarchive_failure(tmp_path):
|
||||||
|
default_provider = SimpleNamespace(
|
||||||
|
get_model_by_hash=AsyncMock(),
|
||||||
|
get_model_version=AsyncMock(),
|
||||||
|
)
|
||||||
|
civarchive_provider = SimpleNamespace(
|
||||||
|
get_model_by_hash=AsyncMock(return_value=(None, "Model not found")),
|
||||||
|
get_model_version=AsyncMock(),
|
||||||
|
)
|
||||||
|
sqlite_payload = {
|
||||||
|
"source": "archive_db",
|
||||||
|
"model": {"name": "Recovered", "description": "", "tags": []},
|
||||||
|
"images": [],
|
||||||
|
"baseModel": "sdxl",
|
||||||
|
}
|
||||||
|
sqlite_provider = SimpleNamespace(
|
||||||
|
get_model_by_hash=AsyncMock(return_value=(sqlite_payload, None)),
|
||||||
|
get_model_version=AsyncMock(),
|
||||||
|
)
|
||||||
|
|
||||||
|
async def select_provider(name: str):
|
||||||
|
if name == "civarchive_api":
|
||||||
|
return civarchive_provider
|
||||||
|
if name == "sqlite":
|
||||||
|
return sqlite_provider
|
||||||
|
return default_provider
|
||||||
|
|
||||||
|
provider_selector = AsyncMock(side_effect=select_provider)
|
||||||
|
helpers = build_service(
|
||||||
|
settings_values={"enable_metadata_archive_db": True},
|
||||||
|
default_provider=default_provider,
|
||||||
|
provider_selector=provider_selector,
|
||||||
|
)
|
||||||
|
|
||||||
|
model_path = tmp_path / "model.safetensors"
|
||||||
|
model_data = {
|
||||||
|
"civitai_deleted": True,
|
||||||
|
"db_checked": False,
|
||||||
|
"file_path": str(model_path),
|
||||||
|
}
|
||||||
|
update_cache = AsyncMock()
|
||||||
|
|
||||||
|
ok, error = await helpers.service.fetch_and_update_model(
|
||||||
|
sha256="cafe",
|
||||||
|
file_path=str(model_path),
|
||||||
|
model_data=model_data,
|
||||||
|
update_cache_func=update_cache,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert ok and error is None
|
||||||
|
assert civarchive_provider.get_model_by_hash.await_count == 1
|
||||||
|
assert sqlite_provider.get_model_by_hash.await_count == 1
|
||||||
|
assert model_data["metadata_source"] == "archive_db"
|
||||||
|
assert model_data["db_checked"] is True
|
||||||
|
assert provider_selector.await_args_list[0].args == ("civarchive_api",)
|
||||||
|
assert provider_selector.await_args_list[1].args == ("sqlite",)
|
||||||
|
update_cache.assert_awaited()
|
||||||
|
helpers.metadata_manager.save_metadata.assert_awaited()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_relink_metadata_fetches_version_and_updates_sha(tmp_path):
|
async def test_relink_metadata_fetches_version_and_updates_sha(tmp_path):
|
||||||
provider = SimpleNamespace(
|
provider = SimpleNamespace(
|
||||||
|
|||||||
Reference in New Issue
Block a user