Merge pull request #557 from willmiao/civarc-api-support

CivArchive API support
This commit is contained in:
pixelpaws
2025-10-11 18:10:06 +08:00
committed by GitHub
12 changed files with 1175 additions and 128 deletions

View File

@@ -0,0 +1,529 @@
import os
import json
import logging
import asyncio
from copy import deepcopy
from typing import Optional, Dict, Tuple, List
from .model_metadata_provider import CivArchiveModelMetadataProvider, ModelMetadataProviderManager
from .downloader import get_downloader
try:
from bs4 import BeautifulSoup
except ImportError as exc:
BeautifulSoup = None # type: ignore[assignment]
_BS4_IMPORT_ERROR = exc
else:
_BS4_IMPORT_ERROR = None
def _require_beautifulsoup():
if BeautifulSoup is None:
raise RuntimeError(
"BeautifulSoup (bs4) is required for CivArchive client. "
"Install it with 'pip install beautifulsoup4'."
) from _BS4_IMPORT_ERROR
return BeautifulSoup
logger = logging.getLogger(__name__)
class CivArchiveClient:
    """Async singleton client for the CivArchive API (https://civarchive.com/api).

    On first instantiation via :meth:`get_instance` it registers itself with the
    ModelMetadataProviderManager under the name 'civarchive'.  All public methods
    normalize CivArchive payloads into the Civitai-style "version" schema used by
    the rest of the codebase (stats block, trainedWords, files with hashes, model
    and creator sub-dicts, source marker).
    """

    _instance = None  # Singleton instance, created lazily under _lock
    _lock = asyncio.Lock()  # Serializes concurrent get_instance() calls

    @classmethod
    async def get_instance(cls):
        """Get singleton instance of CivArchiveClient"""
        async with cls._lock:
            if cls._instance is None:
                cls._instance = cls()
                # Register this client as a metadata provider
                provider_manager = await ModelMetadataProviderManager.get_instance()
                # NOTE(review): third positional arg is presumably is_default=False — confirm
                # against register_provider's signature.
                provider_manager.register_provider('civarchive', CivArchiveModelMetadataProvider(cls._instance), False)
            return cls._instance

    def __init__(self):
        # Check if already initialized for singleton pattern
        if hasattr(self, '_initialized'):
            return
        self._initialized = True
        self.base_url = "https://civarchive.com/api"

    async def _request_json(
        self,
        path: str,
        params: Optional[Dict[str, str]] = None
    ) -> Tuple[Optional[Dict], Optional[str]]:
        """Call CivArchive API and return JSON payload.

        Args:
            path: URL path appended to self.base_url (must start with '/').
            params: Optional query parameters; None-valued entries are dropped.

        Returns:
            (payload, None) on success, (None, error_message) on failure.
        """
        downloader = await get_downloader()
        kwargs: Dict[str, Dict[str, str]] = {}
        if params:
            # Coerce keys/values to str and drop None values before sending.
            safe_params = {str(key): str(value) for key, value in params.items() if value is not None}
            if safe_params:
                kwargs["params"] = safe_params
        success, payload = await downloader.make_request(
            "GET",
            f"{self.base_url}{path}",
            use_auth=False,
            **kwargs
        )
        if not success:
            # On failure the downloader may return an error string as the payload.
            error = payload if isinstance(payload, str) else "Request failed"
            return None, error
        if not isinstance(payload, dict):
            return None, "Invalid response structure"
        return payload, None

    @staticmethod
    def _normalize_payload(payload: Dict) -> Dict:
        """Unwrap CivArchive responses that wrap content under a data key"""
        if not isinstance(payload, dict):
            return {}
        data = payload.get("data")
        if isinstance(data, dict):
            return data
        return payload

    @staticmethod
    def _split_context(payload: Dict) -> Tuple[Dict, Dict, List[Dict]]:
        """Separate version payload from surrounding model context.

        Returns:
            (context, version, fallback_files) where context holds model-level
            fields, version is the version dict (possibly empty), and
            fallback_files are files found outside the version block.
        """
        data = CivArchiveClient._normalize_payload(payload)
        context: Dict = {}
        fallback_files: List[Dict] = []
        version: Dict = {}
        # Everything except the "version"/"model" wrappers is model-level context.
        for key, value in data.items():
            if key in {"version", "model"}:
                continue
            context[key] = value
        if isinstance(data.get("version"), dict):
            version = data["version"]
        model_block = data.get("model")
        if isinstance(model_block, dict):
            # Merge nested model fields without overwriting top-level context.
            for key, value in model_block.items():
                if key == "version":
                    if not version and isinstance(value, dict):
                        version = value
                    continue
                context.setdefault(key, value)
            fallback_files = fallback_files or model_block.get("files") or []
        fallback_files = fallback_files or data.get("files") or []
        return context, version, fallback_files

    @staticmethod
    def _ensure_list(value) -> List:
        # Coerce None -> [], scalar -> [scalar], list -> unchanged.
        if isinstance(value, list):
            return value
        if value is None:
            return []
        return [value]

    @staticmethod
    def _build_model_info(context: Dict) -> Dict:
        """Build the Civitai-style 'model' sub-dict from model-level context."""
        tags = context.get("tags")
        if not isinstance(tags, list):
            # Tolerate set/tuple/scalar/None tag values from the API.
            tags = list(tags) if isinstance(tags, (set, tuple)) else ([] if tags is None else [tags])
        return {
            "name": context.get("name"),
            "type": context.get("type"),
            "nsfw": bool(context.get("is_nsfw", context.get("nsfw", False))),
            "description": context.get("description"),
            "tags": tags,
        }

    @staticmethod
    def _build_creator_info(context: Dict) -> Dict:
        """Build the Civitai-style 'creator' sub-dict from model-level context."""
        username = context.get("creator_username") or context.get("username") or ""
        image = context.get("creator_image") or context.get("creator_avatar") or ""
        creator: Dict[str, Optional[str]] = {
            "username": username,
            "image": image,
        }
        # Optional extras only present in some payloads.
        if context.get("creator_name"):
            creator["name"] = context["creator_name"]
        if context.get("creator_url"):
            creator["url"] = context["creator_url"]
        return creator

    @staticmethod
    def _transform_file_entry(file_data: Dict) -> Dict:
        """Normalize one CivArchive file record into the expected file schema."""
        mirrors = file_data.get("mirrors") or []
        if not isinstance(mirrors, list):
            mirrors = [mirrors]
        # First mirror that has not been deleted (deletedAt is None).
        available_mirror = next(
            (mirror for mirror in mirrors if isinstance(mirror, dict) and mirror.get("deletedAt") is None),
            None
        )
        download_url = file_data.get("downloadUrl")
        if not download_url and available_mirror:
            download_url = available_mirror.get("url")
        name = file_data.get("name")
        if not name and available_mirror:
            name = available_mirror.get("filename")
        transformed: Dict = {
            "id": file_data.get("id"),
            "sizeKB": file_data.get("sizeKB"),
            "name": name,
            "type": file_data.get("type"),
            "downloadUrl": download_url,
            "primary": True,
            # TODO: for some reason is_primary is false in CivArchive response, need to figure this out,
            # "primary": bool(file_data.get("is_primary", file_data.get("primary", False))),
            "mirrors": mirrors,
        }
        sha256 = file_data.get("sha256")
        if sha256:
            # Consumers expect uppercase SHA256 under a "hashes" mapping.
            transformed["hashes"] = {"SHA256": str(sha256).upper()}
        elif isinstance(file_data.get("hashes"), dict):
            transformed["hashes"] = file_data["hashes"]
        if "metadata" in file_data:
            transformed["metadata"] = file_data["metadata"]
        # Accept both camelCase and snake_case id fields from the API.
        if file_data.get("modelVersionId") is not None:
            transformed["modelVersionId"] = file_data.get("modelVersionId")
        elif file_data.get("model_version_id") is not None:
            transformed["modelVersionId"] = file_data.get("model_version_id")
        if file_data.get("modelId") is not None:
            transformed["modelId"] = file_data.get("modelId")
        elif file_data.get("model_id") is not None:
            transformed["modelId"] = file_data.get("model_id")
        return transformed

    def _transform_files(
        self,
        files: Optional[List[Dict]],
        fallback_files: Optional[List[Dict]] = None
    ) -> List[Dict]:
        """Transform version files, falling back to model-level files when empty."""
        candidates: List[Dict] = []
        if isinstance(files, list) and files:
            candidates = files
        elif isinstance(fallback_files, list):
            candidates = fallback_files
        transformed_files: List[Dict] = []
        for file_data in candidates:
            if isinstance(file_data, dict):
                transformed_files.append(self._transform_file_entry(file_data))
        return transformed_files

    def _transform_version(
        self,
        context: Dict,
        version: Dict,
        fallback_files: Optional[List[Dict]] = None
    ) -> Optional[Dict]:
        """Produce the Civitai-style version dict, or None when version is empty."""
        if not version:
            return None
        # Deep copy so the caller's payload is never mutated.
        version_copy = deepcopy(version)
        version_copy.pop("model", None)
        version_copy.pop("creator", None)
        # CivArchive uses "trigger"; downstream code expects "trainedWords" as a list.
        if "trigger" in version_copy:
            triggers = version_copy.pop("trigger")
            if isinstance(triggers, list):
                version_copy["trainedWords"] = triggers
            elif triggers is None:
                version_copy["trainedWords"] = []
            else:
                version_copy["trainedWords"] = [triggers]
        if "trainedWords" in version_copy and isinstance(version_copy["trainedWords"], str):
            version_copy["trainedWords"] = [version_copy["trainedWords"]]
        # Normalize nsfw_level -> nsfwLevel, inheriting from model context if absent.
        if "nsfw_level" in version_copy:
            version_copy["nsfwLevel"] = version_copy.pop("nsfw_level")
        elif "nsfwLevel" not in version_copy and context.get("nsfw_level") is not None:
            version_copy["nsfwLevel"] = context.get("nsfw_level")
        # Fold flat counters into a "stats" sub-dict, Civitai style.
        stats_keys = ["downloadCount", "ratingCount", "rating"]
        stats = {key: version_copy.pop(key) for key in stats_keys if key in version_copy}
        if stats:
            version_copy["stats"] = stats
        version_copy["files"] = self._transform_files(version_copy.get("files"), fallback_files)
        version_copy["images"] = self._ensure_list(version_copy.get("images"))
        version_copy["model"] = self._build_model_info(context)
        version_copy["creator"] = self._build_creator_info(context)
        version_copy["source"] = "civarchive"
        version_copy["is_deleted"] = bool(context.get("deletedAt")) or bool(version.get("deletedAt"))
        return version_copy

    async def _resolve_version_from_files(self, payload: Dict) -> Optional[Dict]:
        """Fallback to fetch version data when only file metadata is available"""
        data = self._normalize_payload(payload)
        files = data.get("files") or payload.get("files") or []
        if not isinstance(files, list):
            files = [files]
        # Try each file's (model_id, version_id) pair until one resolves.
        for file_data in files:
            if not isinstance(file_data, dict):
                continue
            model_id = file_data.get("model_id") or file_data.get("modelId")
            version_id = file_data.get("model_version_id") or file_data.get("modelVersionId")
            if model_id is None or version_id is None:
                continue
            resolved = await self.get_model_version(model_id, version_id)
            if resolved:
                return resolved
        return None

    async def get_model_by_hash(self, model_hash: str) -> Tuple[Optional[Dict], Optional[str]]:
        """Find model by SHA256 hash value using CivArchive API.

        Returns:
            (version_data, None) on success, (None, error_message) otherwise.
        """
        try:
            # CivArchive's hash endpoint expects lowercase hex.
            payload, error = await self._request_json(f"/sha256/{model_hash.lower()}")
            if error:
                if "not found" in error.lower():
                    return None, "Model not found"
                return None, error
            context, version_data, fallback_files = self._split_context(payload)
            transformed = self._transform_version(context, version_data, fallback_files)
            if transformed:
                return transformed, None
            # No inline version data: chase the ids embedded in the file records.
            resolved = await self._resolve_version_from_files(payload)
            if resolved:
                return resolved, None
            logger.error("Error fetching version of CivArchive model by hash %s", model_hash[:10])
            return None, "No version data found"
        except Exception as e:
            logger.error(f"Error fetching CivArchive model by hash {model_hash[:10]}: {e}")
            return None, str(e)

    async def get_model_versions(self, model_id: str) -> Optional[Dict]:
        """Get all versions of a model using CivArchive API.

        Returns:
            Dict with "modelVersions" (deduped, primary version first), "type"
            and "name"; None on error or when the model is not found.
        """
        try:
            payload, error = await self._request_json(f"/models/{model_id}")
            if error or payload is None:
                if error and "not found" in error.lower():
                    return None
                logger.error(f"Error fetching CivArchive model versions for {model_id}: {error}")
                return None
            data = self._normalize_payload(payload)
            context, version_data, fallback_files = self._split_context(payload)
            versions_meta = data.get("versions") or []
            transformed_versions: List[Dict] = []
            # The versions list only carries ids; fetch each version individually.
            for meta in versions_meta:
                if not isinstance(meta, dict):
                    continue
                version_id = meta.get("id")
                if version_id is None:
                    continue
                target_model_id = meta.get("modelId") or model_id
                version = await self.get_model_version(target_model_id, version_id)
                if version:
                    transformed_versions.append(version)
            # Ensure the primary version is included even if versions list was empty
            primary_version = self._transform_version(context, version_data, fallback_files)
            if primary_version:
                transformed_versions.insert(0, primary_version)
            # Deduplicate by version id while preserving order (primary stays first).
            ordered_versions: List[Dict] = []
            seen_ids = set()
            for version in transformed_versions:
                version_id = version.get("id")
                if version_id in seen_ids:
                    continue
                seen_ids.add(version_id)
                ordered_versions.append(version)
            return {
                "modelVersions": ordered_versions,
                "type": context.get("type", ""),
                "name": context.get("name", ""),
            }
        except Exception as e:
            logger.error(f"Error fetching CivArchive model versions for {model_id}: {e}")
            return None

    async def get_model_version(self, model_id: int = None, version_id: int = None) -> Optional[Dict]:
        """Get specific model version using CivArchive API
        Args:
            model_id: The model ID (required)
            version_id: Optional specific version ID to filter to
        Returns:
            Optional[Dict]: The model version data or None if not found
        """
        if model_id is None:
            return None
        try:
            params = {"modelVersionId": version_id} if version_id is not None else None
            payload, error = await self._request_json(f"/models/{model_id}", params=params)
            if error or payload is None:
                if error and "not found" in error.lower():
                    return None
                logger.error(f"Error fetching CivArchive model version via API {model_id}/{version_id}: {error}")
                return None
            context, version_data, fallback_files = self._split_context(payload)
            if not version_data:
                return await self._resolve_version_from_files(payload)
            if version_id is not None:
                raw_id = version_data.get("id")
                # NOTE(review): get_model_version_info passes version_id as str while
                # raw_id from JSON is an int — a type mismatch here would make this
                # comparison fail spuriously; confirm and normalize types if needed.
                if raw_id != version_id:
                    logger.warning(
                        "Requested version %s doesn't match default version %s for model %s",
                        version_id,
                        raw_id,
                        model_id,
                    )
                    return None
            actual_model_id = version_data.get("modelId")
            context_model_id = context.get("id")
            # CivArchive can respond with data for a different model id while already
            # returning the fully resolved model context. Only follow the redirect when
            # the context itself still points to the original (wrong) model.
            if (
                actual_model_id is not None
                and str(actual_model_id) != str(model_id)
                and (context_model_id is None or str(context_model_id) != str(actual_model_id))
            ):
                # Recursive re-fetch with the corrected model id.
                return await self.get_model_version(actual_model_id, version_id)
            return self._transform_version(context, version_data, fallback_files)
        except Exception as e:
            logger.error(f"Error fetching CivArchive model version via API {model_id}/{version_id}: {e}")
            return None

    async def get_model_version_info(self, version_id: str) -> Tuple[Optional[Dict], Optional[str]]:
        """Fetch model version metadata by version id alone.

        CivArchive lacks a direct version-lookup endpoint, so this calls
        get_model_version with a deliberately bogus model id (1) and relies on
        the redirect handling there to resolve the real model.

        Args:
            version_id: The model version ID
        Returns:
            Tuple[Optional[Dict], Optional[str]]: (version_data, error_message)
        """
        version = await self.get_model_version(1, version_id)
        if version is None:
            return None, "Model not found"
        return version, None

    async def get_model_by_url(self, url) -> Optional[Dict]:
        """Get specific model version by parsing the CivArchive HTML page (legacy).

        This is the original HTML-scraping implementation, kept for reference and
        for newly added sites not covered by the API; the primary
        get_model_version() now uses the API instead.

        Args:
            url: Path fragment appended to https://civarchive.com/.
        Returns:
            Optional[Dict]: Civitai-style version dict, or None on any failure.
        """
        try:
            # Construct CivArchive URL
            url = f"https://civarchive.com/{url}"
            downloader = await get_downloader()
            session = await downloader.session
            async with session.get(url) as response:
                if response.status != 200:
                    return None
                html_content = await response.text()
                # Parse HTML to extract JSON data
                soup_parser = _require_beautifulsoup()
                soup = soup_parser(html_content, 'html.parser')
                # Next.js embeds the page data as JSON in the __NEXT_DATA__ script tag.
                script_tag = soup.find('script', {'id': '__NEXT_DATA__', 'type': 'application/json'})
                if not script_tag:
                    return None
                # Parse JSON content
                json_data = json.loads(script_tag.string)
                model_data = json_data.get('props', {}).get('pageProps', {}).get('model')
                if not model_data or 'version' not in model_data:
                    return None
                # Extract version data as base
                version = model_data['version'].copy()
                # Restructure stats
                if 'downloadCount' in version and 'ratingCount' in version and 'rating' in version:
                    version['stats'] = {
                        'downloadCount': version.pop('downloadCount'),
                        'ratingCount': version.pop('ratingCount'),
                        'rating': version.pop('rating')
                    }
                # Rename trigger to trainedWords
                if 'trigger' in version:
                    version['trainedWords'] = version.pop('trigger')
                # Transform files data to expected format
                if 'files' in version:
                    transformed_files = []
                    for file_data in version['files']:
                        # Find first available mirror (deletedAt is null)
                        available_mirror = None
                        for mirror in file_data.get('mirrors', []):
                            if mirror.get('deletedAt') is None:
                                available_mirror = mirror
                                break
                        # Create transformed file entry
                        transformed_file = {
                            'id': file_data.get('id'),
                            'sizeKB': file_data.get('sizeKB'),
                            'name': available_mirror.get('filename', file_data.get('name')) if available_mirror else file_data.get('name'),
                            'type': file_data.get('type'),
                            'downloadUrl': available_mirror.get('url') if available_mirror else None,
                            'primary': file_data.get('is_primary', False),
                            'mirrors': file_data.get('mirrors', [])
                        }
                        # Transform hash format
                        if 'sha256' in file_data:
                            transformed_file['hashes'] = {
                                'SHA256': file_data['sha256'].upper()
                            }
                        transformed_files.append(transformed_file)
                    version['files'] = transformed_files
                # Add model information
                version['model'] = {
                    'name': model_data.get('name'),
                    'type': model_data.get('type'),
                    'nsfw': model_data.get('is_nsfw', False),
                    'description': model_data.get('description'),
                    'tags': model_data.get('tags', [])
                }
                version['creator'] = {
                    'username': model_data.get('username'),
                    'image': ''
                }
                # Add source identifier
                version['source'] = 'civarchive'
                version['is_deleted'] = json_data.get('query', {}).get('is_deleted', False)
                return version
        except Exception as e:
            logger.error(f"Error fetching CivArchive model version (scraping) {url}: {e}")
            return None

View File

@@ -294,7 +294,7 @@ class DownloadManager:
await progress_callback(0)
# 2. Get file information
file_info = next((f for f in version_info.get('files', []) if f.get('primary')), None)
file_info = next((f for f in version_info.get('files', []) if f.get('primary') and f.get('type') == 'Model'), None)
if not file_info:
return {'success': False, 'error': 'No primary file found in metadata'}
mirrors = file_info.get('mirrors') or []

View File

@@ -4,6 +4,7 @@ from .model_metadata_provider import (
ModelMetadataProviderManager,
SQLiteModelMetadataProvider,
CivitaiModelMetadataProvider,
CivArchiveModelMetadataProvider,
FallbackMetadataProvider
)
from .settings_manager import get_settings_manager
@@ -54,26 +55,27 @@ async def initialize_metadata_providers():
except Exception as e:
logger.error(f"Failed to initialize Civitai API metadata provider: {e}")
# Register CivArchive provider, but do NOT add to fallback providers
# Register CivArchive provider, and also add it to the fallback providers
try:
from .model_metadata_provider import CivArchiveModelMetadataProvider
civarchive_provider = CivArchiveModelMetadataProvider()
provider_manager.register_provider('civarchive', civarchive_provider)
logger.debug("CivArchive metadata provider registered (not included in fallback)")
civarchive_client = await ServiceRegistry.get_civarchive_client()
civarchive_provider = CivArchiveModelMetadataProvider(civarchive_client)
provider_manager.register_provider('civarchive_api', civarchive_provider)
providers.append(('civarchive_api', civarchive_provider))
logger.debug("CivArchive metadata provider registered (also included in fallback)")
except Exception as e:
logger.error(f"Failed to initialize CivArchive metadata provider: {e}")
# Set up fallback provider based on available providers
if len(providers) > 1:
# Always use Civitai API first, then Archive DB
# Always use Civitai API (it has better metadata), then CivArchive API, then Archive DB
ordered_providers = []
ordered_providers.extend([p[1] for p in providers if p[0] == 'civitai_api'])
ordered_providers.extend([p[1] for p in providers if p[0] == 'civarchive_api'])
ordered_providers.extend([p[1] for p in providers if p[0] == 'sqlite'])
if ordered_providers:
fallback_provider = FallbackMetadataProvider(ordered_providers)
provider_manager.register_provider('fallback', fallback_provider, is_default=True)
logger.debug(f"Fallback metadata provider registered with {len(ordered_providers)} providers, Civitai API first")
elif len(providers) == 1:
# Only one provider available, set it as default
provider_name, provider = providers[0]

View File

@@ -167,41 +167,101 @@ class MetadataSyncService:
metadata_path = os.path.splitext(file_path)[0] + ".metadata.json"
enable_archive = self._settings.get("enable_metadata_archive_db", False)
previous_source = model_data.get("metadata_source") or (model_data.get("civitai") or {}).get("source")
try:
provider_attempts: list[tuple[Optional[str], MetadataProviderProtocol]] = []
sqlite_attempted = False
if model_data.get("civitai_deleted") is True:
if not enable_archive or model_data.get("db_checked") is True:
if previous_source in (None, "civarchive"):
try:
provider_attempts.append(("civarchive_api", await self._get_provider("civarchive_api")))
except Exception as exc: # pragma: no cover - provider resolution fault
logger.debug("Unable to resolve civarchive provider: %s", exc)
if enable_archive and model_data.get("db_checked") is not True:
try:
provider_attempts.append(("sqlite", await self._get_provider("sqlite")))
except Exception as exc: # pragma: no cover - provider resolution fault
logger.debug("Unable to resolve sqlite provider: %s", exc)
if not provider_attempts:
if not enable_archive:
error_msg = "CivitAI model is deleted and metadata archive DB is not enabled"
else:
elif model_data.get("db_checked") is True:
error_msg = "CivitAI model is deleted and not found in metadata archive DB"
return (False, error_msg)
metadata_provider = await self._get_provider("sqlite")
else:
error_msg = "CivitAI model is deleted and no archive provider is available"
return False, error_msg
else:
metadata_provider = await self._get_default_provider()
provider_attempts.append((None, await self._get_default_provider()))
civitai_metadata, error = await metadata_provider.get_model_by_hash(sha256)
civitai_metadata: Optional[Dict[str, Any]] = None
metadata_provider: Optional[MetadataProviderProtocol] = None
provider_used: Optional[str] = None
last_error: Optional[str] = None
if not civitai_metadata:
if error == "Model not found":
for provider_name, provider in provider_attempts:
try:
civitai_metadata_candidate, error = await provider.get_model_by_hash(sha256)
except Exception as exc: # pragma: no cover - defensive logging
logger.error("Provider %s failed for hash %s: %s", provider_name, sha256, exc)
civitai_metadata_candidate, error = None, str(exc)
if provider_name == "sqlite":
sqlite_attempted = True
if civitai_metadata_candidate:
civitai_metadata = civitai_metadata_candidate
metadata_provider = provider
provider_used = provider_name
break
last_error = error or last_error
if civitai_metadata is None or metadata_provider is None:
if sqlite_attempted:
model_data["db_checked"] = True
if last_error == "Model not found":
model_data["from_civitai"] = False
model_data["civitai_deleted"] = True
model_data["db_checked"] = enable_archive
model_data["db_checked"] = sqlite_attempted or (enable_archive and model_data.get("db_checked", False))
model_data["last_checked_at"] = datetime.now().timestamp()
data_to_save = model_data.copy()
data_to_save.pop("folder", None)
await self._metadata_manager.save_metadata(file_path, data_to_save)
default_error = (
"CivitAI model is deleted and metadata archive DB is not enabled"
if model_data.get("civitai_deleted") and not enable_archive
else "CivitAI model is deleted and not found in metadata archive DB"
if model_data.get("civitai_deleted") and (model_data.get("db_checked") is True or sqlite_attempted)
else "No provider returned metadata"
)
error_msg = (
f"Error fetching metadata: {error} (model_name={model_data.get('model_name', '')})"
f"Error fetching metadata: {last_error or default_error} "
f"(model_name={model_data.get('model_name', '')})"
)
logger.error(error_msg)
return False, error_msg
model_data["from_civitai"] = True
model_data["civitai_deleted"] = civitai_metadata.get("source") == "archive_db"
model_data["db_checked"] = enable_archive
model_data["civitai_deleted"] = civitai_metadata.get("source") == "archive_db" or civitai_metadata.get("source") == "civarchive"
model_data["db_checked"] = enable_archive and (
civitai_metadata.get("source") == "archive_db" or sqlite_attempted
)
source = civitai_metadata.get("source") or "civitai_api"
if source == "api":
source = "civitai_api"
elif provider_used == "civarchive_api" and source != "civarchive":
source = "civarchive"
elif provider_used == "sqlite":
source = "archive_db"
model_data["metadata_source"] = source
model_data["last_checked_at"] = datetime.now().timestamp()
local_metadata = model_data.copy()

View File

@@ -88,122 +88,22 @@ class CivitaiModelMetadataProvider(ModelMetadataProvider):
return await self.client.get_user_models(username)
class CivArchiveModelMetadataProvider(ModelMetadataProvider):
"""Provider that uses CivArchive HTML page parsing for metadata"""
"""Provider that uses CivArchive API for metadata"""
def __init__(self, civarchive_client):
self.client = civarchive_client
async def get_model_by_hash(self, model_hash: str) -> Tuple[Optional[Dict], Optional[str]]:
"""Not supported by CivArchive provider"""
return None, "CivArchive provider does not support hash lookup"
return await self.client.get_model_by_hash(model_hash)
async def get_model_versions(self, model_id: str) -> Optional[Dict]:
"""Not supported by CivArchive provider"""
return None
return await self.client.get_model_versions(model_id)
async def get_model_version(self, model_id: int = None, version_id: int = None) -> Optional[Dict]:
"""Get specific model version by parsing CivArchive HTML page"""
if model_id is None or version_id is None:
return None
try:
# Construct CivArchive URL
url = f"https://civarchive.com/models/{model_id}?modelVersionId={version_id}"
downloader = await get_downloader()
session = await downloader.session
async with session.get(url) as response:
if response.status != 200:
return None
html_content = await response.text()
# Parse HTML to extract JSON data
soup_parser = _require_beautifulsoup()
soup = soup_parser(html_content, 'html.parser')
script_tag = soup.find('script', {'id': '__NEXT_DATA__', 'type': 'application/json'})
if not script_tag:
return None
# Parse JSON content
json_data = json.loads(script_tag.string)
model_data = json_data.get('props', {}).get('pageProps', {}).get('model')
if not model_data or 'version' not in model_data:
return None
# Extract version data as base
version = model_data['version'].copy()
# Restructure stats
if 'downloadCount' in version and 'ratingCount' in version and 'rating' in version:
version['stats'] = {
'downloadCount': version.pop('downloadCount'),
'ratingCount': version.pop('ratingCount'),
'rating': version.pop('rating')
}
# Rename trigger to trainedWords
if 'trigger' in version:
version['trainedWords'] = version.pop('trigger')
# Transform files data to expected format
if 'files' in version:
transformed_files = []
for file_data in version['files']:
# Find first available mirror (deletedAt is null)
available_mirror = None
for mirror in file_data.get('mirrors', []):
if mirror.get('deletedAt') is None:
available_mirror = mirror
break
# Create transformed file entry
transformed_file = {
'id': file_data.get('id'),
'sizeKB': file_data.get('sizeKB'),
'name': available_mirror.get('filename', file_data.get('name')) if available_mirror else file_data.get('name'),
'type': file_data.get('type'),
'downloadUrl': available_mirror.get('url') if available_mirror else None,
'primary': True,
'mirrors': file_data.get('mirrors', [])
}
# Transform hash format
if 'sha256' in file_data:
transformed_file['hashes'] = {
'SHA256': file_data['sha256'].upper()
}
transformed_files.append(transformed_file)
version['files'] = transformed_files
# Add model information
version['model'] = {
'name': model_data.get('name'),
'type': model_data.get('type'),
'nsfw': model_data.get('is_nsfw', False),
'description': model_data.get('description'),
'tags': model_data.get('tags', [])
}
version['creator'] = {
'username': model_data.get('username'),
'image': ''
}
# Add source identifier
version['source'] = 'civarchive'
version['is_deleted'] = json_data.get('query', {}).get('is_deleted', False)
return version
except Exception as e:
logger.error(f"Error fetching CivArchive model version {model_id}/{version_id}: {e}")
return None
return await self.client.get_model_version(model_id, version_id)
async def get_model_version_info(self, version_id: str) -> Tuple[Optional[Dict], Optional[str]]:
"""Not supported by CivArchive provider - requires both model_id and version_id"""
return None, "CivArchive provider requires both model_id and version_id"
return await self.client.get_model_version_info(version_id)
async def get_user_models(self, username: str) -> Optional[List[Dict]]:
"""Not supported by CivArchive provider"""

View File

@@ -144,6 +144,27 @@ class ServiceRegistry:
cls._services[service_name] = client
logger.debug(f"Created and registered {service_name}")
return client
@classmethod
async def get_civarchive_client(cls):
"""Get or create CivArchive client instance"""
service_name = "civarchive_client"
if service_name in cls._services:
return cls._services[service_name]
async with cls._get_lock(service_name):
# Double-check after acquiring lock
if service_name in cls._services:
return cls._services[service_name]
# Import here to avoid circular imports
from .civarchive_client import CivArchiveClient
client = await CivArchiveClient.get_instance()
cls._services[service_name] = client
logger.debug(f"Created and registered {service_name}")
return client
@classmethod
async def get_download_manager(cls):

View File

@@ -25,6 +25,7 @@ class BaseModelMetadata:
favorite: bool = False # Whether the model is a favorite
exclude: bool = False # Whether to exclude this model from the cache
db_checked: bool = False # Whether checked in archive DB
metadata_source: Optional[str] = None # Last provider that supplied metadata
last_checked_at: float = 0 # Last checked timestamp
_unknown_fields: Dict[str, Any] = field(default_factory=dict, repr=False, compare=False) # Store unknown fields

View File

@@ -0,0 +1,134 @@
{
"id": 1746460,
"name": "Mixplin Style [Illustrious]",
"type": "LORA",
"description": "description",
"username": "Ty_Lee",
"downloadCount": 4207,
"favoriteCount": 0,
"commentCount": 8,
"ratingCount": 0,
"rating": 0,
"is_nsfw": true,
"nsfw_level": 31,
"createdAt": "2025-07-06T01:51:42.859Z",
"updatedAt": "2025-10-10T23:15:26.714Z",
"deletedAt": null,
"tags": [
"art",
"style",
"artist style",
"styles",
"mixplin",
"artiststyle"
],
"creator_id": "Ty_Lee",
"creator_username": "Ty_Lee",
"creator_name": "Ty_Lee",
"creator_url": "/users/Ty_Lee",
"versions": [
{
"id": 2042594,
"name": "v2.0",
"href": "/models/1746460?modelVersionId=2042594"
},
{
"id": 1976567,
"name": "v1.0",
"href": "/models/1746460?modelVersionId=1976567"
}
],
"version": {
"id": 1976567,
"modelId": 1746460,
"name": "v1.0",
"baseModel": "Illustrious",
"baseModelType": "Standard",
"description": null,
"downloadCount": 437,
"ratingCount": 0,
"rating": 0,
"is_nsfw": true,
"nsfw_level": 31,
"createdAt": "2025-07-05T10:17:28.716Z",
"updatedAt": "2025-10-10T23:15:26.756Z",
"deletedAt": null,
"files": [
{
"id": 1874043,
"name": "mxpln-illustrious-ty_lee.safetensors",
"type": "Model",
"sizeKB": 223124.37109375,
"downloadUrl": "https://civitai.com/api/download/models/1976567",
"modelId": 1746460,
"modelName": "Mixplin Style [Illustrious]",
"modelVersionId": 1976567,
"is_nsfw": true,
"nsfw_level": 31,
"sha256": "e2b7a280d6539556f23f380b3f71e4e22bc4524445c4c96526e117c6005c6ad3",
"createdAt": "2025-07-05T10:17:28.716Z",
"updatedAt": "2025-10-10T23:15:26.766Z",
"is_primary": false,
"mirrors": [
{
"filename": "mxpln-illustrious-ty_lee.safetensors",
"url": "https://civitai.com/api/download/models/1976567",
"source": "civitai",
"model_id": 1746460,
"model_version_id": 1976567,
"deletedAt": null,
"is_gated": false,
"is_paid": false
}
]
}
],
"images": [
{
"id": 86403595,
"url": "https://img.genur.art/sig/width:450/quality:85/aHR0cHM6Ly9jLmdlbnVyLmFydC9hNmE3Njc2YS0wMWQ3LTQ1YzAtOWEzYS1mNWJiYTU4MDNiMDE=",
"nsfwLevel": 1,
"width": 1560,
"height": 2280,
"hash": "U7G8Zp0w02%IA6%N00-;D]-W~VNG0nMw-.IV",
"type": "image",
"minor": false,
"poi": false,
"hasMeta": true,
"hasPositivePrompt": true,
"onSite": false,
"remixOfId": null,
"image_url": "https://img.genur.art/sig/width:450/quality:85/aHR0cHM6Ly9jLmdlbnVyLmFydC9hNmE3Njc2YS0wMWQ3LTQ1YzAtOWEzYS1mNWJiYTU4MDNiMDE=",
"link": "https://genur.art/posts/86403595"
}
],
"trigger": [
"mxpln"
],
"allow_download": true,
"download_url": "/api/download/models/1976567",
"platform_url": "https://civitai.com/models/1746460?modelVersionId=1976567",
"civitai_model_id": 1746460,
"civitai_model_version_id": 1976567,
"href": "/models/1746460?modelVersionId=1976567",
"mirrors": [
{
"platform": "tensorart",
"href": "/tensorart/models/904473536033245448/versions/904473536033245448",
"platform_url": "https://tensor.art/models/904473536033245448",
"name": "Mixplin Style MXP",
"version_name": "Mixplin",
"id": "904473536033245448",
"version_id": "904473536033245448"
}
]
},
"platform": "civitai",
"platform_name": "CivitAI",
"meta": {
"title": "Mixplin Style [Illustrious] - v1.0 - CivitAI Archive",
"description": "Mixplin Style [Illustrious] v1.0 is a Illustrious LORA AI model created by Ty_Lee for generating images of art, style, artist style, styles, mixplin, artiststyle",
"image": "https://img.genur.art/sig/width:450/quality:85/aHR0cHM6Ly9jLmdlbnVyLmFydC9hNmE3Njc2YS0wMWQ3LTQ1YzAtOWEzYS1mNWJiYTU4MDNiMDE=",
"canonical": "https://civarchive.com/models/1746460?modelVersionId=1976567"
}
}

38
refs/target_version.json Normal file
View File

@@ -0,0 +1,38 @@
{
"id": 2269146,
"modelId": 2004760,
"name": "v1.0 Illustrious",
"nsfwLevel": 1,
"trainedWords": ["PencilSketchDaal"],
"baseModel": "Illustrious",
"description": "<p>Illustrious. Your pencil may vary with your checkpoint. </p>",
"model": {
"name": "Pencil Sketch Anime",
"type": "LORA",
"nsfw": false,
"description": "description",
"tags": ["style"]
},
"files": [
{
"id": 2161260,
"sizeKB": 223106.37890625,
"name": "Pencil-Sketch-Illustrious.safetensors",
"type": "Model",
"hashes": {
"SHA256": "2C70479CD673B0FE056EAF4FD97C7F33A39F14853805431AC9AB84226ECE3B82"
},
"primary": true,
"downloadUrl": "https://civitai.com/api/download/models/2269146",
"mirrors": {}
}
],
"images": [
{},
{}
],
"creator": {
"username": "Daalis",
"image": "https://image.civitai.com/xG1nkqKTMzGDvpLrqFT7WA/eb245b49-edc8-4ed6-ad7b-6d61eb8c51de/width=96/Daalis.jpeg"
}
}

View File

@@ -0,0 +1,239 @@
import copy
from unittest.mock import AsyncMock
import pytest
from py.services import civarchive_client as civarchive_client_module
from py.services.civarchive_client import CivArchiveClient
from py.services.model_metadata_provider import ModelMetadataProviderManager
class DummyDownloader:
    """Recording stub for the downloader dependency.

    Every request is appended to ``calls`` (method, url, params) and answered
    with a successful empty payload so tests can inspect the request shape.
    """

    def __init__(self):
        # One dict per make_request invocation, in call order.
        self.calls = []

    async def make_request(self, method, url, use_auth=False, **kwargs):
        record = dict(method=method, url=url, params=kwargs.get("params"))
        self.calls.append(record)
        return True, {}
@pytest.fixture(autouse=True)
def reset_singletons():
    """Clear cached singletons before and after every test for isolation."""

    def _clear():
        CivArchiveClient._instance = None
        ModelMetadataProviderManager._instance = None

    _clear()
    yield
    _clear()
@pytest.fixture
def downloader(monkeypatch):
    """Patch the module-level downloader factory with a recording stub."""
    stub = DummyDownloader()
    monkeypatch.setattr(
        civarchive_client_module, "get_downloader", AsyncMock(return_value=stub)
    )
    return stub
def _base_civarchive_payload(version_id=1976567, *, trigger="mxpln", nsfw_level=31):
version_name = "v2.0" if version_id != 1976567 else "v1.0"
file_sha = "e2b7a280d6539556f23f380b3f71e4e22bc4524445c4c96526e117c6005c6ad3"
return {
"data": {
"id": 1746460,
"name": "Mixplin Style [Illustrious]",
"type": "LORA",
"description": "description",
"is_nsfw": True,
"nsfw_level": nsfw_level,
"tags": ["art", "style"],
"creator_username": "Ty_Lee",
"creator_name": "Ty_Lee",
"creator_url": "/users/Ty_Lee",
"version": {
"id": version_id,
"modelId": 1746460,
"name": version_name,
"baseModel": "Illustrious",
"description": "version description",
"downloadCount": 437,
"ratingCount": 0,
"rating": 0,
"nsfw_level": nsfw_level,
"trigger": [trigger],
"files": [
{
"id": 1874043,
"name": "mxpln-illustrious-ty_lee.safetensors",
"type": "Model",
"sizeKB": 223124.37109375,
"downloadUrl": "https://civitai.com/api/download/models/1976567",
"sha256": file_sha,
"is_primary": False,
"mirrors": [
{
"filename": "mxpln-illustrious-ty_lee.safetensors",
"url": "https://civitai.com/api/download/models/1976567",
"deletedAt": None,
}
],
}
],
"images": [
{
"id": 86403595,
"url": "https://img.genur.art/example.png",
"nsfwLevel": 1,
}
],
},
"versions": [
{"id": 2042594, "name": "v2.0"},
{"id": 1976567, "name": "v1.0"},
],
}
}
async def test_get_model_by_hash_transforms_payload(downloader):
    """A sha256 hash hit is normalized into the Civitai-style version schema."""
    payload = _base_civarchive_payload()

    async def fake_make_request(method, url, use_auth=False, **kwargs):
        downloader.calls.append({"url": url, "params": kwargs.get("params")})
        if not url.endswith("/sha256/abc"):
            return False, "unexpected"
        return True, copy.deepcopy(payload)

    downloader.make_request = fake_make_request
    client = await CivArchiveClient.get_instance()

    result, error = await client.get_model_by_hash("abc")

    assert error is None
    # Version-level fields are lifted to the top level of the result.
    assert result["id"] == 1976567
    assert result["nsfwLevel"] == 31
    assert result["trainedWords"] == ["mxpln"]
    assert result["stats"] == {"downloadCount": 437, "ratingCount": 0, "rating": 0}
    # Model-level metadata and creator info.
    assert result["model"]["name"] == "Mixplin Style [Illustrious]"
    assert result["model"]["nsfw"] is True
    assert result["creator"]["username"] == "Ty_Lee"
    assert result["creator"]["image"] == ""
    # sha256 is upper-cased, mirrors kept, and the primary flag is set even
    # though the payload had is_primary=False.
    primary = result["files"][0]
    assert primary["hashes"]["SHA256"] == "E2B7A280D6539556F23F380B3F71E4E22BC4524445C4C96526E117C6005C6AD3"
    assert primary["mirrors"][0]["url"] == "https://civitai.com/api/download/models/1976567"
    assert primary["primary"] is True
    assert result["source"] == "civarchive"
    assert result["images"][0]["url"] == "https://img.genur.art/example.png"
async def test_get_model_versions_fetches_each_version(downloader):
    """Each listed version is fetched individually and merged into modelVersions."""
    base_url = "https://civarchive.com/api/models/1746460"
    newest = _base_civarchive_payload(version_id=2042594, trigger="mxpln-new", nsfw_level=5)
    oldest = _base_civarchive_payload()
    # Keyed by (url, sorted-params-tuple) so the fake can match exact requests.
    responses = {
        (base_url, None): newest,
        (base_url, (("modelVersionId", "2042594"),)): newest,
        (base_url, (("modelVersionId", "1976567"),)): oldest,
    }

    async def fake_make_request(method, url, use_auth=False, **kwargs):
        params = kwargs.get("params")
        downloader.calls.append({"url": url, "params": params})
        key = (url, tuple(sorted(params.items())) if params else None)
        try:
            return True, copy.deepcopy(responses[key])
        except KeyError:
            return False, "unexpected"

    downloader.make_request = fake_make_request
    client = await CivArchiveClient.get_instance()

    result = await client.get_model_versions("1746460")

    assert result["name"] == "Mixplin Style [Illustrious]"
    assert result["type"] == "LORA"
    versions = result["modelVersions"]
    # Both versions are present, newest first, each with its own metadata.
    assert [entry["id"] for entry in versions] == [2042594, 1976567]
    assert versions[0]["trainedWords"] == ["mxpln-new"]
    assert versions[1]["trainedWords"] == ["mxpln"]
    assert versions[0]["nsfwLevel"] == 5
    assert versions[1]["nsfwLevel"] == 31
    # The client issued one request per version id.
    requested = [call["params"] for call in downloader.calls]
    assert {"modelVersionId": "2042594"} in requested
    assert {"modelVersionId": "1976567"} in requested
async def test_get_model_version_redirects_to_actual_model_id(downloader):
    """A payload pointing at a different modelId makes the client re-request it."""
    stale_payload = _base_civarchive_payload()
    # Mismatching modelId in the first response triggers the redirect.
    stale_payload["data"]["version"]["modelId"] = 222
    requested_url = "https://civarchive.com/api/models/111"
    redirect_url = "https://civarchive.com/api/models/222"

    async def fake_make_request(method, url, use_auth=False, **kwargs):
        downloader.calls.append({"url": url, "params": kwargs.get("params")})
        params = kwargs.get("params") or {}
        if url == requested_url:
            return True, copy.deepcopy(stale_payload)
        wants_version = params.get("modelVersionId") == "1976567"
        if url == redirect_url and wants_version:
            return True, copy.deepcopy(_base_civarchive_payload())
        return False, "unexpected"

    downloader.make_request = fake_make_request
    client = await CivArchiveClient.get_instance()

    result = await client.get_model_version(model_id=111, version_id=1976567)

    assert result is not None
    assert result["model"]["name"] == "Mixplin Style [Illustrious]"
    # Exactly two requests: the original id, then the redirected id.
    assert len(downloader.calls) == 2
    assert downloader.calls[1]["url"] == redirect_url
async def test_get_model_by_hash_uses_file_fallback(downloader, monkeypatch):
    """A hash hit containing only file records is resolved via the model endpoint."""
    fallback_file = {
        "model_id": 1746460,
        "model_version_id": 1976567,
        "source": "civitai",
    }
    file_only_payload = {"data": {"files": [fallback_file]}}
    version_payload = _base_civarchive_payload()

    async def fake_make_request(method, url, use_auth=False, **kwargs):
        downloader.calls.append({"url": url, "params": kwargs.get("params")})
        if "/sha256/" in url:
            return True, copy.deepcopy(file_only_payload)
        if "/models/1746460" in url:
            return True, copy.deepcopy(version_payload)
        return False, "unexpected"

    downloader.make_request = fake_make_request
    client = await CivArchiveClient.get_instance()

    result, error = await client.get_model_by_hash("fallback")

    assert error is None
    assert result["id"] == 1976567
    assert result["model"]["name"] == "Mixplin Style [Illustrious]"
    # The client followed up with a model-endpoint request.
    assert any("/models/1746460" in call["url"] for call in downloader.calls)
async def test_get_model_by_hash_handles_not_found(downloader):
    """A 'Resource not found' response surfaces as the 'Model not found' error."""

    async def fake_make_request(method, url, use_auth=False, **kwargs):
        return False, "Resource not found"

    downloader.make_request = fake_make_request
    client = await CivArchiveClient.get_instance()

    result, error = await client.get_model_by_hash("missing")

    assert result is None
    assert error == "Model not found"

View File

@@ -108,6 +108,7 @@ def metadata_provider(monkeypatch):
"creator": {"username": "Author"},
"files": [
{
"type": "Model",
"primary": True,
"downloadUrl": "https://example.invalid/file.safetensors",
"name": "file.safetensors",
@@ -206,6 +207,7 @@ async def test_download_uses_active_mirrors(monkeypatch, scanners, metadata_prov
"creator": {"username": "Author"},
"files": [
{
"type": "Model",
"primary": True,
"downloadUrl": "https://example.invalid/file.safetensors",
"mirrors": [

View File

@@ -32,6 +32,8 @@ def build_service(
get_model_by_hash=AsyncMock(),
get_model_version=AsyncMock(),
)
if default_provider is None:
provider.get_model_by_hash.return_value = (None, None)
default_provider_factory = AsyncMock(return_value=provider)
provider_selector = provider_selector or AsyncMock(return_value=provider)
@@ -138,6 +140,7 @@ async def test_fetch_and_update_model_success_updates_cache(tmp_path):
assert model_data["from_civitai"] is True
assert model_data["civitai_deleted"] is False
assert "civitai" in model_data
assert model_data["metadata_source"] == "civitai_api"
helpers.metadata_manager.hydrate_model_data.assert_not_awaited()
assert model_data["hydrated"] is True
@@ -219,6 +222,124 @@ async def test_fetch_and_update_model_respects_deleted_without_archive():
update_cache.assert_not_awaited()
@pytest.mark.asyncio
async def test_fetch_and_update_model_prefers_civarchive_for_deleted_models(tmp_path):
    """A model already marked civitai_deleted with civarchive metadata is
    refreshed through the 'civarchive_api' provider, not the default one.
    """
    default_provider = SimpleNamespace(
        get_model_by_hash=AsyncMock(),
        get_model_version=AsyncMock(),
    )
    # Stubbed CivArchive provider returning recovered metadata for the hash.
    civarchive_provider = SimpleNamespace(
        get_model_by_hash=AsyncMock(
            return_value=(
                {
                    "source": "civarchive",
                    "model": {"name": "Recovered", "description": "", "tags": []},
                    "images": [],
                    "baseModel": "sdxl",
                },
                None,
            )
        ),
        get_model_version=AsyncMock(),
    )

    async def select_provider(name: str):
        # Route only the 'civarchive_api' lookup to the stub; everything
        # else falls back to the default provider.
        return civarchive_provider if name == "civarchive_api" else default_provider

    provider_selector = AsyncMock(side_effect=select_provider)
    helpers = build_service(
        settings_values={"enable_metadata_archive_db": False},
        default_provider=default_provider,
        provider_selector=provider_selector,
    )
    model_path = tmp_path / "model.safetensors"
    # Pre-existing state: deleted on Civitai, metadata previously sourced
    # from civarchive — this combination should trigger the preference.
    model_data = {
        "civitai_deleted": True,
        "metadata_source": "civarchive",
        "civitai": {"source": "civarchive"},
        "file_path": str(model_path),
    }
    update_cache = AsyncMock()
    ok, error = await helpers.service.fetch_and_update_model(
        sha256="deadbeef",
        file_path=str(model_path),
        model_data=model_data,
        update_cache_func=update_cache,
    )
    assert ok
    assert error is None
    # The civarchive provider was selected and queried with the hash, and
    # the default provider factory was never even constructed.
    provider_selector.assert_awaited_with("civarchive_api")
    helpers.default_provider_factory.assert_not_awaited()
    civarchive_provider.get_model_by_hash.assert_awaited_once_with("deadbeef")
    update_cache.assert_awaited()
    assert model_data["metadata_source"] == "civarchive"
    helpers.metadata_manager.save_metadata.assert_awaited()
@pytest.mark.asyncio
async def test_fetch_and_update_model_falls_back_to_sqlite_after_civarchive_failure(tmp_path):
    """When the CivArchive lookup misses, the service falls back to the
    sqlite archive-db provider (enable_metadata_archive_db=True) and records
    the archive result.
    """
    default_provider = SimpleNamespace(
        get_model_by_hash=AsyncMock(),
        get_model_version=AsyncMock(),
    )
    # CivArchive misses for this hash.
    civarchive_provider = SimpleNamespace(
        get_model_by_hash=AsyncMock(return_value=(None, "Model not found")),
        get_model_version=AsyncMock(),
    )
    sqlite_payload = {
        "source": "archive_db",
        "model": {"name": "Recovered", "description": "", "tags": []},
        "images": [],
        "baseModel": "sdxl",
    }
    # The sqlite archive provider supplies the recovered metadata instead.
    sqlite_provider = SimpleNamespace(
        get_model_by_hash=AsyncMock(return_value=(sqlite_payload, None)),
        get_model_version=AsyncMock(),
    )

    async def select_provider(name: str):
        if name == "civarchive_api":
            return civarchive_provider
        if name == "sqlite":
            return sqlite_provider
        return default_provider

    provider_selector = AsyncMock(side_effect=select_provider)
    helpers = build_service(
        settings_values={"enable_metadata_archive_db": True},
        default_provider=default_provider,
        provider_selector=provider_selector,
    )
    model_path = tmp_path / "model.safetensors"
    model_data = {
        "civitai_deleted": True,
        "db_checked": False,
        "file_path": str(model_path),
    }
    update_cache = AsyncMock()
    ok, error = await helpers.service.fetch_and_update_model(
        sha256="cafe",
        file_path=str(model_path),
        model_data=model_data,
        update_cache_func=update_cache,
    )
    assert ok and error is None
    # Exactly one attempt against each provider, in civarchive -> sqlite order.
    assert civarchive_provider.get_model_by_hash.await_count == 1
    assert sqlite_provider.get_model_by_hash.await_count == 1
    assert model_data["metadata_source"] == "archive_db"
    assert model_data["db_checked"] is True
    assert provider_selector.await_args_list[0].args == ("civarchive_api",)
    assert provider_selector.await_args_list[1].args == ("sqlite",)
    update_cache.assert_awaited()
    helpers.metadata_manager.save_metadata.assert_awaited()
@pytest.mark.asyncio
async def test_relink_metadata_fetches_version_and_updates_sha(tmp_path):
provider = SimpleNamespace(