From 68c0a5ba7127a38cf0a75ba53c5e32fb89ac5771 Mon Sep 17 00:00:00 2001 From: scruffynerf Date: Fri, 10 Oct 2025 08:04:01 -0400 Subject: [PATCH 1/7] Better Civ Archive support (adds API) (#549) * add CivArchive API * Oops, missed committing this part when I updated codebase to latest version * Adjust API for version fetching and solve the broken API (hash gives only files, not models - likely to be fixed but in the meantime...) * add asyncio import to allow timeout cooldown --------- Co-authored-by: Scruffy Nerf --- py/services/civarchive_client.py | 419 +++++++++++++++++++++++++ py/services/metadata_service.py | 17 +- py/services/metadata_sync_service.py | 41 +++ py/services/model_metadata_provider.py | 116 +------ py/services/service_registry.py | 21 ++ 5 files changed, 499 insertions(+), 115 deletions(-) create mode 100644 py/services/civarchive_client.py diff --git a/py/services/civarchive_client.py b/py/services/civarchive_client.py new file mode 100644 index 00000000..f9e99935 --- /dev/null +++ b/py/services/civarchive_client.py @@ -0,0 +1,419 @@ +import os +import json +import logging +import asyncio +from typing import Optional, Dict, Tuple, List +from .model_metadata_provider import CivArchiveModelMetadataProvider, ModelMetadataProviderManager +from .downloader import get_downloader + +try: + from bs4 import BeautifulSoup +except ImportError as exc: + BeautifulSoup = None # type: ignore[assignment] + _BS4_IMPORT_ERROR = exc +else: + _BS4_IMPORT_ERROR = None + +def _require_beautifulsoup(): + if BeautifulSoup is None: + raise RuntimeError( + "BeautifulSoup (bs4) is required for CivArchive client. " + "Install it with 'pip install beautifulsoup4'." + ) from _BS4_IMPORT_ERROR + return BeautifulSoup + +logger = logging.getLogger(__name__) + +class CivArchiveClient: + _instance = None + _lock = asyncio.Lock() + + @classmethod + async def get_instance(cls): + """Get singleton instance of CivArchiveClient""" + async with cls._lock: + if cls._instance is None: + cls._instance = cls() + + # Register this client as a metadata provider + provider_manager = await ModelMetadataProviderManager.get_instance() + provider_manager.register_provider('civarchive', CivArchiveModelMetadataProvider(cls._instance), False) + + return cls._instance + + def __init__(self): + # Check if already initialized for singleton pattern + if hasattr(self, '_initialized'): + return + self._initialized = True + + self.base_url = "https://civarchive.com/api" + + async def get_model_by_hash(self, model_hash: str) -> Tuple[Optional[Dict], Optional[str]]: + """Find model by SHA256 hash value using CivArchive API""" + if "/" in model_hash: + metadata = await self.get_model_by_url(model_hash) + if metadata: + return metadata, None + else: + return None, f"Error fetching url: {model_hash}" + try: + # CivArchive only supports SHA256 hashes + url = f"{self.base_url}/sha256/{model_hash.lower()}" + + downloader = await get_downloader() + session = await downloader.session + async with session.get(url) as response: + if response.status != 200: + if response.status == 404: + return None, "Model not found" + return None, f"HTTP {response.status}" + + data = await response.json() + + # Extract the model and version data from CivArchive structure + model_data = data.get('model', {}) + version_data = model_data.get('version', {}) + files_data = data.get('files', {}) + + if not version_data: + if files_data: + logger.error(f"{data}") + # sometimes CivArc returns ONLY file info... but it can then be used to get the rest of the info... + # actually as of now (10/25), api broke and ONLY returns 'files' info... + for file_data in files_data: + logger.error(f"{file_data}") + if file_data["source"] == "civitai": + api_data = await self.get_model_version(file_data["model_id"], file_data["model_version_id"]) + logger.error(f"{api_data}") + logger.error(f"found CivArchive model by hash {model_hash[:10]}") + return api_data, None + else: + logger.error(f"Error fetching version of CivArchive model by hash {model_hash[:10]}") + return None, "No version data found" + + # Transform to match expected format + result = version_data.copy() + + # Add model information + result['model'] = { + 'name': model_data.get('name'), + 'type': model_data.get('type'), + 'nsfw': model_data.get('nsfw', False), + 'description': model_data.get('description'), + 'tags': model_data.get('tags', []) + } + + # Add creator information + result['creator'] = { + 'username': model_data.get('username', model_data.get('creator_username')), + 'image': '' + } + + # Rename trigger to trainedWords for consistency + if 'trigger' in result: + result['trainedWords'] = result.pop('trigger') + + # Transform stats + if 'downloadCount' in result and 'ratingCount' in result and 'rating' in result: + result['stats'] = { + 'downloadCount': result.pop('downloadCount'), + 'ratingCount': result.pop('ratingCount'), + 'rating': result.pop('rating') + } + + # Transform files to match expected format + if 'files' in result: + transformed_files = [] + for file_data in result['files']: + # Find first available mirror + available_mirror = None + for mirror in file_data.get('mirrors', []): + if mirror.get('deletedAt') is None: + available_mirror = mirror + break + + transformed_file = { + 'id': file_data.get('id'), + 'sizeKB': file_data.get('sizeKB'), + 'name': available_mirror.get('filename', file_data.get('name')) if available_mirror else file_data.get('name'), + 'type': file_data.get('type'), + 'downloadUrl': available_mirror.get('url') if available_mirror else file_data.get('downloadUrl'), + 'primary': True, + 'mirrors': file_data.get('mirrors', []) + } + + # Transform hash format + if 'sha256' in file_data: + transformed_file['hashes'] = { + 'SHA256': file_data['sha256'].upper() + } + + transformed_files.append(transformed_file) + + result['files'] = transformed_files + + # Add source identifier + result['source'] = 'civarchive' + + return result, None + + except Exception as e: + logger.error(f"Error fetching CivArchive model by hash {model_hash[:10]}: {e}") + return None, str(e) + + async def get_model_versions(self, model_id: str) -> Optional[Dict]: + """Get all versions of a model using CivArchive API""" + try: + url = f"{self.base_url}/models/{model_id}" + + downloader = await get_downloader() + session = await downloader.session + async with session.get(url) as response: + if response.status != 200: + return None + + data = await response.json() + + # Extract versions list + versions = data.get('versions', []) + + # Return in format similar to Civitai + return { + 'modelVersions': versions, + 'type': data.get('type', ''), + 'name': data.get('name', '') + } + + except Exception as e: + logger.error(f"Error fetching CivArchive model versions for {model_id}: {e}") + return None + + async def get_model_version(self, model_id: int = None, version_id: int = None) -> Optional[Dict]: + """Get specific model version using CivArchive API + + Args: + model_id: The model ID (required) + version_id: Optional specific version ID to filter to + + Returns: + Optional[Dict]: The model version data or None if not found + """ + if model_id is None: + return None + + try: + if version_id is not None: + url = f"{self.base_url}/models/{model_id}?modelVersionId={version_id}" + else: + url = f"{self.base_url}/models/{model_id}" + + downloader = await get_downloader() + session = await downloader.session + async with session.get(url) as response: + if response.status != 200: + return None + + data = await response.json() + + # Get the version data - CivArchive returns the latest/default version in 'version' field + version_data = data.get('version', {}) + versions = data.get('versions', {}) + + # If version_id is specified, check if it matches + if version_id is not None: + if version_data.get('id') != version_id: + # Version mismatch - would need to iterate through versions or make another call + # For now, return None as CivArchive API doesn't provide easy version filtering + logger.warning(f"Requested version {version_id} doesn't match default version {version_data.get('id')} for model {model_id}") + return None + if version_data.get('modelId') != model_id: + # you can pass ANY model id, and a version number, and get the CORRECT model id from this... + # so recall the api with the correct info now + return await self.get_model_version(version_data.get('modelId'), version_id) + + # Transform to expected format + result = version_data.copy() + + # Restructure stats + if 'downloadCount' in result and 'ratingCount' in result and 'rating' in result: + result['stats'] = { + 'downloadCount': result.pop('downloadCount'), + 'ratingCount': result.pop('ratingCount'), + 'rating': result.pop('rating') + } + + # Rename trigger to trainedWords + if 'trigger' in result: + result['trainedWords'] = result.pop('trigger') + + # Transform files data + if 'files' in result: + transformed_files = [] + for file_data in result['files']: + # Find first available mirror + available_mirror = None + for mirror in file_data.get('mirrors', []): + if mirror.get('deletedAt') is None: + available_mirror = mirror + break + + transformed_file = { + 'id': file_data.get('id'), + 'sizeKB': file_data.get('sizeKB'), + 'name': available_mirror.get('filename', file_data.get('name')) if available_mirror else file_data.get('name'), + 'type': file_data.get('type'), + 'downloadUrl': available_mirror.get('url') if available_mirror else file_data.get('downloadUrl'), + 'primary': True, + 'mirrors': file_data.get('mirrors', []) + } + + # Transform hash format + if 'sha256' in file_data: + transformed_file['hashes'] = { + 'SHA256': file_data['sha256'].upper() + } + + transformed_files.append(transformed_file) + + result['files'] = transformed_files + + # Add model information + result['model'] = { + 'name': data.get('name'), + 'type': data.get('type'), + 'nsfw': data.get('is_nsfw', False), + 'description': data.get('description'), + 'tags': data.get('tags', []) + } + + result['creator'] = { + 'username': data.get('username', data.get('creator_username')), + 'image': '' + } + + # Add source identifier + result['source'] = 'civarchive' + result['is_deleted'] = data.get('deletedAt') is not None + + return result + + except Exception as e: + logger.error(f"Error fetching CivArchive model version via API {model_id}/{version_id}: {e}") + return None + + async def get_model_version_info(self, version_id: str) -> Tuple[Optional[Dict], Optional[str]]: + """ Fetch model version metadata using a known bogus model lookup + CivArchive lacks a direct version lookup API, this uses a workaround (which we handle in the main model request now) + + Args: + version_id: The model version ID + + Returns: + Tuple[Optional[Dict], Optional[str]]: (version_data, error_message) + """ + return await self.get_model_version(1, version_id) + + async def get_model_by_url(self, url) -> Optional[Dict]: + """Get specific model version by parsing CivArchive HTML page (legacy method) + + This is the original HTML scraping implementation, kept for reference and new sites added not in api. + The primary get_model_version() now uses the API instead. + """ + + try: + # Construct CivArchive URL + url = f"https://civarchive.com/{url}" + downloader = await get_downloader() + session = await downloader.session + async with session.get(url) as response: + if response.status != 200: + return None + + html_content = await response.text() + + # Parse HTML to extract JSON data + soup_parser = _require_beautifulsoup() + soup = soup_parser(html_content, 'html.parser') + script_tag = soup.find('script', {'id': '__NEXT_DATA__', 'type': 'application/json'}) + + if not script_tag: + return None + + # Parse JSON content + json_data = json.loads(script_tag.string) + model_data = json_data.get('props', {}).get('pageProps', {}).get('model') + + if not model_data or 'version' not in model_data: + return None + + # Extract version data as base + version = model_data['version'].copy() + + # Restructure stats + if 'downloadCount' in version and 'ratingCount' in version and 'rating' in version: + version['stats'] = { + 'downloadCount': version.pop('downloadCount'), + 'ratingCount': version.pop('ratingCount'), + 'rating': version.pop('rating') + } + + # Rename trigger to trainedWords + if 'trigger' in version: + version['trainedWords'] = version.pop('trigger') + + # Transform files data to expected format + if 'files' in version: + transformed_files = [] + for file_data in version['files']: + # Find first available mirror (deletedAt is null) + available_mirror = None + for mirror in file_data.get('mirrors', []): + if mirror.get('deletedAt') is None: + available_mirror = mirror + break + + # Create transformed file entry + transformed_file = { + 'id': file_data.get('id'), + 'sizeKB': file_data.get('sizeKB'), + 'name': available_mirror.get('filename', file_data.get('name')) if available_mirror else file_data.get('name'), + 'type': file_data.get('type'), + 'downloadUrl': available_mirror.get('url') if available_mirror else None, + 'primary': True, + 'mirrors': file_data.get('mirrors', []) + } + + # Transform hash format + if 'sha256' in file_data: + transformed_file['hashes'] = { + 'SHA256': file_data['sha256'].upper() + } + + transformed_files.append(transformed_file) + + version['files'] = transformed_files + + # Add model information + version['model'] = { + 'name': model_data.get('name'), + 'type': model_data.get('type'), + 'nsfw': model_data.get('is_nsfw', False), + 'description': model_data.get('description'), + 'tags': model_data.get('tags', []) + } + + version['creator'] = { + 'username': model_data.get('username'), + 'image': '' + } + + # Add source identifier + version['source'] = 'civarchive' + version['is_deleted'] = json_data.get('query', {}).get('is_deleted', False) + + return version + + except Exception as e: + logger.error(f"Error fetching CivArchive model version (scraping) {model_id}/{version_id}: {e}") + return None diff --git a/py/services/metadata_service.py b/py/services/metadata_service.py index 5f5ae727..5cfd716d 100644 --- a/py/services/metadata_service.py +++ b/py/services/metadata_service.py @@ -4,6 +4,7 @@ from .model_metadata_provider import ( ModelMetadataProviderManager, SQLiteModelMetadataProvider, CivitaiModelMetadataProvider, + CivArchiveModelMetadataProvider, FallbackMetadataProvider ) from .settings_manager import get_settings_manager @@ -54,26 +55,28 @@ async def initialize_metadata_providers(): except Exception as e: logger.error(f"Failed to initialize Civitai API metadata provider: {e}") - # Register CivArchive provider, but do NOT add to fallback providers + # Register CivArchive provider, and all add to fallback providers try: - from .model_metadata_provider import CivArchiveModelMetadataProvider - civarchive_provider = CivArchiveModelMetadataProvider() - provider_manager.register_provider('civarchive', civarchive_provider) - logger.debug("CivArchive metadata provider registered (not included in fallback)") + civarchive_client = await ServiceRegistry.get_civarchive_client() + civarchive_provider = CivitaiModelMetadataProvider(civarchive_client) + provider_manager.register_provider('civarchive_api', civarchive_provider) + providers.append(('civarchive_api', civarchive_provider)) + logger.debug("CivArchive metadata provider registered (also included in fallback)") except Exception as e: logger.error(f"Failed to initialize CivArchive metadata provider: {e}") # Set up fallback provider based on available providers if len(providers) > 1: - # Always use Civitai API first, then Archive DB + # Always use Civarchive, then Civitai API, then Archive DB ordered_providers = [] + ordered_providers.extend([p[1] for p in providers if p[0] == 'civarchive_api']) ordered_providers.extend([p[1] for p in providers if p[0] == 'civitai_api']) ordered_providers.extend([p[1] for p in providers if p[0] == 'sqlite']) if ordered_providers: fallback_provider = FallbackMetadataProvider(ordered_providers) provider_manager.register_provider('fallback', fallback_provider, is_default=True) - logger.debug(f"Fallback metadata provider registered with {len(ordered_providers)} providers, Civitai API first") + logger.info(f"Fallback metadata provider registered with {len(ordered_providers)} providers, Civarchive first") elif len(providers) == 1: # Only one provider available, set it as default provider_name, provider = providers[0] diff --git a/py/services/metadata_sync_service.py b/py/services/metadata_sync_service.py index 738f3b86..def0a010 100644 --- a/py/services/metadata_sync_service.py +++ b/py/services/metadata_sync_service.py @@ -5,6 +5,7 @@ from __future__ import annotations import json import logging import os +import asyncio from datetime import datetime from typing import Any, Awaitable, Callable, Dict, Iterable, Optional @@ -169,6 +170,46 @@ class MetadataSyncService: enable_archive = self._settings.get("enable_metadata_archive_db", False) try: + metadata_provider = await self._get_provider("civarchive_api") + tryagain = True + delay = 5 + + while tryagain: + civitai_metadata, error = await metadata_provider.get_model_by_hash(sha256) + tryagain = False + if not civitai_metadata or error: + if error == "HTTP 429": + error_msg = (f"Error fetching metadata: {error} (model_name={model_data.get('model_name', '')} sha256={sha256})") + logger.error(error_msg) + delay = delay * 2 + await asyncio.sleep(delay) + tryagain = True + continue + if error == "Model not found": + model_data["from_civitai"] = False + model_data["civitai_deleted"] = True + #model_data["db_checked"] = enable_archive + model_data["last_checked_at"] = datetime.now().timestamp() + data_to_save = model_data.copy() + data_to_save.pop("folder", None) + await self._metadata_manager.save_metadata(file_path, data_to_save) + await asyncio.sleep(1) + if error == "No version data found": + error_msg = (f"Error - No civitai version found: (model_name={model_data.get('model_name', '')} sha256={sha256})") + logger.error(error_msg) + error = False + if civitai_metadata.get('files'): + for file in civitai_metadata['files']: + logger.error(f"{file}") + if 'tensorart' in file['url'] or "seaart" in file['url']: + civitai_metadata, error = await metadata_provider.get_model_by_hash(file['url']) + error_msg = (f"Error fetching metadata: {error} {civitai_metadata}") + logger.error(error_msg) + if error or not civitai_metadata: + error_msg = (f"Error fetching metadata: {error} (model_name={model_data.get('model_name', '')} sha256={sha256})") + logger.error(error_msg) + return False, error_msg + if model_data.get("civitai_deleted") is True: if not enable_archive or model_data.get("db_checked") is True: if not enable_archive: diff --git a/py/services/model_metadata_provider.py b/py/services/model_metadata_provider.py index 99b3488c..73d9b7d8 100644 --- a/py/services/model_metadata_provider.py +++ b/py/services/model_metadata_provider.py @@ -88,122 +88,22 @@ class CivitaiModelMetadataProvider(ModelMetadataProvider): return await self.client.get_user_models(username) class CivArchiveModelMetadataProvider(ModelMetadataProvider): - """Provider that uses CivArchive HTML page parsing for metadata""" + """Provider that uses CivArchive API for metadata""" + def __init__(self, civarchive_client): + self.client = civarchive_client + async def get_model_by_hash(self, model_hash: str) -> Tuple[Optional[Dict], Optional[str]]: - """Not supported by CivArchive provider""" - return None, "CivArchive provider does not support hash lookup" + return await self.client.get_model_by_hash(model_hash) async def get_model_versions(self, model_id: str) -> Optional[Dict]: - """Not supported by CivArchive provider""" - return None + return await self.client.get_model_versions(model_id) async def get_model_version(self, model_id: int = None, version_id: int = None) -> Optional[Dict]: - """Get specific model version by parsing CivArchive HTML page""" - if model_id is None or version_id is None: - return None - - try: - # Construct CivArchive URL - url = f"https://civarchive.com/models/{model_id}?modelVersionId={version_id}" - - downloader = await get_downloader() - session = await downloader.session - async with session.get(url) as response: - if response.status != 200: - return None - - html_content = await response.text() - - # Parse HTML to extract JSON data - soup_parser = _require_beautifulsoup() - soup = soup_parser(html_content, 'html.parser') - script_tag = soup.find('script', {'id': '__NEXT_DATA__', 'type': 'application/json'}) - - if not script_tag: - return None - - # Parse JSON content - json_data = json.loads(script_tag.string) - model_data = json_data.get('props', {}).get('pageProps', {}).get('model') - - if not model_data or 'version' not in model_data: - return None - - # Extract version data as base - version = model_data['version'].copy() - - # Restructure stats - if 'downloadCount' in version and 'ratingCount' in version and 'rating' in version: - version['stats'] = { - 'downloadCount': version.pop('downloadCount'), - 'ratingCount': version.pop('ratingCount'), - 'rating': version.pop('rating') - } - - # Rename trigger to trainedWords - if 'trigger' in version: - version['trainedWords'] = version.pop('trigger') - - # Transform files data to expected format - if 'files' in version: - transformed_files = [] - for file_data in version['files']: - # Find first available mirror (deletedAt is null) - available_mirror = None - for mirror in file_data.get('mirrors', []): - if mirror.get('deletedAt') is None: - available_mirror = mirror - break - - # Create transformed file entry - transformed_file = { - 'id': file_data.get('id'), - 'sizeKB': file_data.get('sizeKB'), - 'name': available_mirror.get('filename', file_data.get('name')) if available_mirror else file_data.get('name'), - 'type': file_data.get('type'), - 'downloadUrl': available_mirror.get('url') if available_mirror else None, - 'primary': True, - 'mirrors': file_data.get('mirrors', []) - } - - # Transform hash format - if 'sha256' in file_data: - transformed_file['hashes'] = { - 'SHA256': file_data['sha256'].upper() - } - - transformed_files.append(transformed_file) - - version['files'] = transformed_files - - # Add model information - version['model'] = { - 'name': model_data.get('name'), - 'type': model_data.get('type'), - 'nsfw': model_data.get('is_nsfw', False), - 'description': model_data.get('description'), - 'tags': model_data.get('tags', []) - } - - version['creator'] = { - 'username': model_data.get('username'), - 'image': '' - } - - # Add source identifier - version['source'] = 'civarchive' - version['is_deleted'] = json_data.get('query', {}).get('is_deleted', False) - - return version - - except Exception as e: - logger.error(f"Error fetching CivArchive model version {model_id}/{version_id}: {e}") - return None + return await self.client.get_model_version(model_id, version_id) async def get_model_version_info(self, version_id: str) -> Tuple[Optional[Dict], Optional[str]]: - """Not supported by CivArchive provider - requires both model_id and version_id""" - return None, "CivArchive provider requires both model_id and version_id" + return await self.client.get_model_version_info(version_id) async def get_user_models(self, username: str) -> Optional[List[Dict]]: """Not supported by CivArchive provider""" diff --git a/py/services/service_registry.py b/py/services/service_registry.py index 2cb102ae..d3d65e65 100644 --- a/py/services/service_registry.py +++ b/py/services/service_registry.py @@ -144,6 +144,27 @@ class ServiceRegistry: cls._services[service_name] = client logger.debug(f"Created and registered {service_name}") return client + + @classmethod + async def get_civarchive_client(cls): + """Get or create CivArchive client instance""" + service_name = "civarchive_client" + + if service_name in cls._services: + return cls._services[service_name] + + async with cls._get_lock(service_name): + # Double-check after acquiring lock + if service_name in cls._services: + return cls._services[service_name] + + # Import here to avoid circular imports + from .civarchive_client import CivArchiveClient + + client = await CivArchiveClient.get_instance() + cls._services[service_name] = client + logger.debug(f"Created and registered {service_name}") + return client @classmethod async def get_download_manager(cls): From 7d560bf07a170cfe5199a2f9a094a64d9484d68f Mon Sep 17 00:00:00 2001 From: Will Miao <13051207myq@gmail.com> Date: Sat, 11 Oct 2025 12:59:13 +0800 Subject: [PATCH 2/7] chore: add refs --- refs/civarc_api_model_data.json | 134 ++++++++++++++++++++++++++++++++ refs/target_version.json | 38 +++++++++ 2 files changed, 172 insertions(+) create mode 100644 refs/civarc_api_model_data.json create mode 100644 refs/target_version.json diff --git a/refs/civarc_api_model_data.json b/refs/civarc_api_model_data.json new file mode 100644 index 00000000..9acbe6e2 --- /dev/null +++ b/refs/civarc_api_model_data.json @@ -0,0 +1,134 @@ +{ + "id": 1746460, + "name": "Mixplin Style [Illustrious]", + "type": "LORA", + "description": "description", + "username": "Ty_Lee", + "downloadCount": 4207, + "favoriteCount": 0, + "commentCount": 8, + "ratingCount": 0, + "rating": 0, + "is_nsfw": true, + "nsfw_level": 31, + "createdAt": "2025-07-06T01:51:42.859Z", + "updatedAt": "2025-10-10T23:15:26.714Z", + "deletedAt": null, + "tags": [ + "art", + "style", + "artist style", + "styles", + "mixplin", + "artiststyle" + ], + "creator_id": "Ty_Lee", + "creator_username": "Ty_Lee", + "creator_name": "Ty_Lee", + "creator_url": "/users/Ty_Lee", + "versions": [ + { + "id": 2042594, + "name": "v2.0", + "href": "/models/1746460?modelVersionId=2042594" + }, + { + "id": 1976567, + "name": "v1.0", + "href": "/models/1746460?modelVersionId=1976567" + } + ], + "version": { + "id": 1976567, + "modelId": 1746460, + "name": "v1.0", + "baseModel": "Illustrious", + "baseModelType": "Standard", + "description": null, + "downloadCount": 437, + "ratingCount": 0, + "rating": 0, + "is_nsfw": true, + "nsfw_level": 31, + "createdAt": "2025-07-05T10:17:28.716Z", + "updatedAt": "2025-10-10T23:15:26.756Z", + "deletedAt": null, + "files": [ + { + "id": 1874043, + "name": "mxpln-illustrious-ty_lee.safetensors", + "type": "Model", + "sizeKB": 223124.37109375, + "downloadUrl": "https://civitai.com/api/download/models/1976567", + "modelId": 1746460, + "modelName": "Mixplin Style [Illustrious]", + "modelVersionId": 1976567, + "is_nsfw": true, + "nsfw_level": 31, + "sha256": "e2b7a280d6539556f23f380b3f71e4e22bc4524445c4c96526e117c6005c6ad3", + "createdAt": "2025-07-05T10:17:28.716Z", + "updatedAt": "2025-10-10T23:15:26.766Z", + "is_primary": false, + "mirrors": [ + { + "filename": "mxpln-illustrious-ty_lee.safetensors", + "url": "https://civitai.com/api/download/models/1976567", + "source": "civitai", + "model_id": 1746460, + "model_version_id": 1976567, + "deletedAt": null, + "is_gated": false, + "is_paid": false + } + ] + } + ], + "images": [ + { + "id": 86403595, + "url": "https://img.genur.art/sig/width:450/quality:85/aHR0cHM6Ly9jLmdlbnVyLmFydC9hNmE3Njc2YS0wMWQ3LTQ1YzAtOWEzYS1mNWJiYTU4MDNiMDE=", + "nsfwLevel": 1, + "width": 1560, + "height": 2280, + "hash": "U7G8Zp0w02%IA6%N00-;D]-W~VNG0nMw-.IV", + "type": "image", + "minor": false, + "poi": false, + "hasMeta": true, + "hasPositivePrompt": true, + "onSite": false, + "remixOfId": null, + "image_url": "https://img.genur.art/sig/width:450/quality:85/aHR0cHM6Ly9jLmdlbnVyLmFydC9hNmE3Njc2YS0wMWQ3LTQ1YzAtOWEzYS1mNWJiYTU4MDNiMDE=", + "link": "https://genur.art/posts/86403595" + } + ], + "trigger": [ + "mxpln" + ], + "allow_download": true, + "download_url": "/api/download/models/1976567", + "platform_url": "https://civitai.com/models/1746460?modelVersionId=1976567", + "civitai_model_id": 1746460, + "civitai_model_version_id": 1976567, + "href": "/models/1746460?modelVersionId=1976567", + "mirrors": [ + { + "platform": "tensorart", + "href": "/tensorart/models/904473536033245448/versions/904473536033245448", + "platform_url": "https://tensor.art/models/904473536033245448", + "name": "Mixplin Style MXP", + "version_name": "Mixplin", + "id": "904473536033245448", + "version_id": "904473536033245448" + } + ] + }, + "platform": "civitai", + "platform_name": "CivitAI", + "meta": { + "title": "Mixplin Style [Illustrious] - v1.0 - CivitAI Archive", + "description": "Mixplin Style [Illustrious] v1.0 is a Illustrious LORA AI model created by Ty_Lee for generating images of art, style, artist style, styles, mixplin, artiststyle", + "image": "https://img.genur.art/sig/width:450/quality:85/aHR0cHM6Ly9jLmdlbnVyLmFydC9hNmE3Njc2YS0wMWQ3LTQ1YzAtOWEzYS1mNWJiYTU4MDNiMDE=", + "canonical": "https://civarchive.com/models/1746460?modelVersionId=1976567" + } +} \ No newline at end of file diff --git a/refs/target_version.json b/refs/target_version.json new file mode 100644 index 00000000..6db1f1b1 --- /dev/null +++ b/refs/target_version.json @@ -0,0 +1,38 @@ +{ + "id": 2269146, + "modelId": 2004760, + "name": "v1.0 Illustrious", + "nsfwLevel": 1, + "trainedWords": ["PencilSketchDaal"], + "baseModel": "Illustrious", + "description": "

Illustrious. Your pencil may vary with your checkpoint.

", + "model": { + "name": "Pencil Sketch Anime", + "type": "LORA", + "nsfw": false, + "description": "description", + "tags": ["style"] + }, + "files": [ + { + "id": 2161260, + "sizeKB": 223106.37890625, + "name": "Pencil-Sketch-Illustrious.safetensors", + "type": "Model", + "hashes": { + "SHA256": "2C70479CD673B0FE056EAF4FD97C7F33A39F14853805431AC9AB84226ECE3B82" + }, + "primary": true, + "downloadUrl": "https://civitai.com/api/download/models/2269146", + "mirrors": {} + } + ], + "images": [ + {}, + {} + ], + "creator": { + "username": "Daalis", + "image": "https://image.civitai.com/xG1nkqKTMzGDvpLrqFT7WA/eb245b49-edc8-4ed6-ad7b-6d61eb8c51de/width=96/Daalis.jpeg" + } +} From 1f60160e8b90f333390c142a493a1c21a0592000 Mon Sep 17 00:00:00 2001 From: Will Miao <13051207myq@gmail.com> Date: Sat, 11 Oct 2025 13:07:29 +0800 Subject: [PATCH 3/7] feat(civarchive_client): enhance request handling and context parsing Introduce `_request_json` method for async JSON requests and improved error handling. Add static methods `_normalize_payload`, `_split_context`, `_ensure_list`, and `_build_model_info` to parse and normalize API responses. These changes improve the robustness of the CivArchiveClient by ensuring consistent data structures and handling potential API response issues gracefully. --- py/services/civarchive_client.py | 540 ++++++++++++++--------- py/services/download_manager.py | 2 +- tests/services/test_civarchive_client.py | 239 ++++++++++ 3 files changed, 561 insertions(+), 220 deletions(-) create mode 100644 tests/services/test_civarchive_client.py diff --git a/py/services/civarchive_client.py b/py/services/civarchive_client.py index f9e99935..14b5971e 100644 --- a/py/services/civarchive_client.py +++ b/py/services/civarchive_client.py @@ -2,6 +2,7 @@ import os import json import logging import asyncio +from copy import deepcopy from typing import Optional, Dict, Tuple, List from .model_metadata_provider import CivArchiveModelMetadataProvider, ModelMetadataProviderManager from .downloader import get_downloader @@ -49,115 +50,256 @@ class CivArchiveClient: self.base_url = "https://civarchive.com/api" + async def _request_json( + self, + path: str, + params: Optional[Dict[str, str]] = None + ) -> Tuple[Optional[Dict], Optional[str]]: + """Call CivArchive API and return JSON payload""" + downloader = await get_downloader() + kwargs: Dict[str, Dict[str, str]] = {} + if params: + safe_params = {str(key): str(value) for key, value in params.items() if value is not None} + if safe_params: + kwargs["params"] = safe_params + success, payload = await downloader.make_request( + "GET", + f"{self.base_url}{path}", + use_auth=False, + **kwargs + ) + if not success: + error = payload if isinstance(payload, str) else "Request failed" + return None, error + if not isinstance(payload, dict): + return None, "Invalid response structure" + return payload, None + + @staticmethod + def _normalize_payload(payload: Dict) -> Dict: + """Unwrap CivArchive responses that wrap content under a data key""" + if not isinstance(payload, dict): + return {} + data = payload.get("data") + if isinstance(data, dict): + return data + return payload + + @staticmethod + def _split_context(payload: Dict) -> Tuple[Dict, Dict, List[Dict]]: + """Separate version payload from surrounding model context""" + data = CivArchiveClient._normalize_payload(payload) + context: Dict = {} + fallback_files: List[Dict] = [] + version: Dict = {} + + for key, value in data.items(): + if key in {"version", "model"}: + continue + context[key] = value + + if isinstance(data.get("version"), dict): + version = data["version"] + + model_block = data.get("model") + if isinstance(model_block, dict): + for key, value in model_block.items(): + if key == "version": + if not version and isinstance(value, dict): + version = value + continue + context.setdefault(key, value) + fallback_files = fallback_files or model_block.get("files") or [] + + fallback_files = fallback_files or data.get("files") or [] + return context, version, fallback_files + + @staticmethod + def _ensure_list(value) -> List: + if isinstance(value, list): + return value + if value is None: + return [] + return [value] + + @staticmethod + def _build_model_info(context: Dict) -> Dict: + tags = context.get("tags") + if not isinstance(tags, list): + tags = list(tags) if isinstance(tags, (set, tuple)) else ([] if tags is None else [tags]) + return { + "name": context.get("name"), + "type": context.get("type"), + "nsfw": bool(context.get("is_nsfw", context.get("nsfw", False))), + "description": context.get("description"), + "tags": tags, + } + + @staticmethod + def _build_creator_info(context: Dict) -> Dict: + username = context.get("creator_username") or context.get("username") or "" + image = context.get("creator_image") or context.get("creator_avatar") or "" + creator: Dict[str, Optional[str]] = { + "username": username, + "image": image, + } + if context.get("creator_name"): + creator["name"] = context["creator_name"] + if context.get("creator_url"): + creator["url"] = context["creator_url"] + return creator + + @staticmethod + def _transform_file_entry(file_data: Dict) -> Dict: + mirrors = file_data.get("mirrors") or [] + if not isinstance(mirrors, list): + mirrors = [mirrors] + available_mirror = next( + (mirror for mirror in mirrors if isinstance(mirror, dict) and mirror.get("deletedAt") is None), + None + ) + download_url = file_data.get("downloadUrl") + if not download_url and available_mirror: + download_url = available_mirror.get("url") + name = file_data.get("name") + if not name and available_mirror: + name = available_mirror.get("filename") + + transformed: Dict = { + "id": file_data.get("id"), + "sizeKB": file_data.get("sizeKB"), + "name": name, + "type": file_data.get("type"), + "downloadUrl": download_url, + "primary": True, + # TODO: for some reason is_primary is false in CivArchive response, need to figure this out, + # "primary": bool(file_data.get("is_primary", file_data.get("primary", False))), + "mirrors": mirrors, + } + + sha256 = file_data.get("sha256") + if sha256: + transformed["hashes"] = {"SHA256": str(sha256).upper()} + elif isinstance(file_data.get("hashes"), dict): + transformed["hashes"] = file_data["hashes"] + + if "metadata" in file_data: + transformed["metadata"] = file_data["metadata"] + + if file_data.get("modelVersionId") is not None: + transformed["modelVersionId"] = file_data.get("modelVersionId") + elif file_data.get("model_version_id") is not None: + transformed["modelVersionId"] = file_data.get("model_version_id") + + if file_data.get("modelId") is not None: + transformed["modelId"] = file_data.get("modelId") + elif file_data.get("model_id") is not None: + transformed["modelId"] = file_data.get("model_id") + + return transformed + + def _transform_files( + self, + files: Optional[List[Dict]], + fallback_files: Optional[List[Dict]] = None + ) -> List[Dict]: + candidates: List[Dict] = [] + if isinstance(files, list) and files: + candidates = files + elif isinstance(fallback_files, list): + candidates = fallback_files + + transformed_files: List[Dict] = [] + for file_data in candidates: + if isinstance(file_data, dict): + transformed_files.append(self._transform_file_entry(file_data)) + return transformed_files + + def _transform_version( + self, + context: Dict, + version: Dict, + fallback_files: Optional[List[Dict]] = None + ) -> Optional[Dict]: + if not version: + return None + + version_copy = deepcopy(version) + version_copy.pop("model", None) + version_copy.pop("creator", None) + + if "trigger" in version_copy: + triggers = version_copy.pop("trigger") + if isinstance(triggers, list): + version_copy["trainedWords"] = triggers + elif triggers is None: + version_copy["trainedWords"] = [] + else: + version_copy["trainedWords"] = [triggers] + + if "trainedWords" in version_copy and isinstance(version_copy["trainedWords"], str): + version_copy["trainedWords"] = [version_copy["trainedWords"]] + + if "nsfw_level" in version_copy: + version_copy["nsfwLevel"] = version_copy.pop("nsfw_level") + elif "nsfwLevel" not in version_copy and context.get("nsfw_level") is not None: + version_copy["nsfwLevel"] = context.get("nsfw_level") + + stats_keys = ["downloadCount", "ratingCount", "rating"] + stats = {key: version_copy.pop(key) for key in stats_keys if key in version_copy} + if stats: + version_copy["stats"] = stats + + version_copy["files"] = self._transform_files(version_copy.get("files"), fallback_files) + version_copy["images"] = self._ensure_list(version_copy.get("images")) + + version_copy["model"] = self._build_model_info(context) + version_copy["creator"] = self._build_creator_info(context) + + version_copy["source"] = "civarchive" + version_copy["is_deleted"] = bool(context.get("deletedAt")) or bool(version.get("deletedAt")) + + return version_copy + + async def _resolve_version_from_files(self, payload: Dict) -> Optional[Dict]: + """Fallback to fetch version data when only file metadata is available""" + data = self._normalize_payload(payload) + files = data.get("files") or payload.get("files") or [] + if not isinstance(files, list): + files = [files] + for file_data in files: + if not isinstance(file_data, dict): + continue + model_id = file_data.get("model_id") or file_data.get("modelId") + version_id = file_data.get("model_version_id") or file_data.get("modelVersionId") + if model_id is None or version_id is None: + continue + resolved = await self.get_model_version(model_id, version_id) + if resolved: + return resolved + return None + async def get_model_by_hash(self, model_hash: str) -> Tuple[Optional[Dict], Optional[str]]: """Find model by SHA256 hash value using CivArchive API""" - if "/" in model_hash: - metadata = await self.get_model_by_url(model_hash) - if metadata: - return metadata, None - else: - return None, f"Error fetching url: {model_hash}" try: - # CivArchive only supports SHA256 hashes - url = f"{self.base_url}/sha256/{model_hash.lower()}" - - downloader = await get_downloader() - session = await downloader.session - async with session.get(url) as response: - if response.status != 200: - if response.status == 404: - return None, "Model not found" - return None, f"HTTP {response.status}" - - data = await response.json() - - # Extract the model and version data from CivArchive structure - model_data = data.get('model', {}) - version_data = model_data.get('version', {}) - files_data = data.get('files', {}) + payload, error = await self._request_json(f"/sha256/{model_hash.lower()}") + if error: + if "not found" in error.lower(): + return None, "Model not found" + return None, error + + context, version_data, fallback_files = self._split_context(payload) + transformed = self._transform_version(context, version_data, fallback_files) + if transformed: + return transformed, None + + resolved = await self._resolve_version_from_files(payload) + if resolved: + return resolved, None + + logger.error("Error fetching version of CivArchive model by hash %s", model_hash[:10]) + return None, "No version data found" - if not version_data: - if files_data: - logger.error(f"{data}") - # sometimes CivArc returns ONLY file info... but it can then be used to get the rest of the info... - # actually as of now (10/25), api broke and ONLY returns 'files' info... - for file_data in files_data: - logger.error(f"{file_data}") - if file_data["source"] == "civitai": - api_data = await self.get_model_version(file_data["model_id"], file_data["model_version_id"]) - logger.error(f"{api_data}") - logger.error(f"found CivArchive model by hash {model_hash[:10]}") - return api_data, None - else: - logger.error(f"Error fetching version of CivArchive model by hash {model_hash[:10]}") - return None, "No version data found" - - # Transform to match expected format - result = version_data.copy() - - # Add model information - result['model'] = { - 'name': model_data.get('name'), - 'type': model_data.get('type'), - 'nsfw': model_data.get('nsfw', False), - 'description': model_data.get('description'), - 'tags': model_data.get('tags', []) - } - - # Add creator information - result['creator'] = { - 'username': model_data.get('username', model_data.get('creator_username')), - 'image': '' - } - - # Rename trigger to trainedWords for consistency - if 'trigger' in result: - result['trainedWords'] = result.pop('trigger') - - # Transform stats - if 'downloadCount' in result and 'ratingCount' in result and 'rating' in result: - result['stats'] = { - 'downloadCount': result.pop('downloadCount'), - 'ratingCount': result.pop('ratingCount'), - 'rating': result.pop('rating') - } - - # Transform files to match expected format - if 'files' in result: - transformed_files = [] - for file_data in result['files']: - # Find first available mirror - available_mirror = None - for mirror in file_data.get('mirrors', []): - if mirror.get('deletedAt') is None: - available_mirror = mirror - break - - transformed_file = { - 'id': file_data.get('id'), - 'sizeKB': file_data.get('sizeKB'), - 'name': available_mirror.get('filename', file_data.get('name')) if available_mirror else file_data.get('name'), - 'type': file_data.get('type'), - 'downloadUrl': available_mirror.get('url') if available_mirror else file_data.get('downloadUrl'), - 'primary': True, - 'mirrors': file_data.get('mirrors', []) - } - - # Transform hash format - if 'sha256' in file_data: - transformed_file['hashes'] = { - 'SHA256': file_data['sha256'].upper() - } - - transformed_files.append(transformed_file) - - result['files'] = transformed_files - - # Add source identifier - result['source'] = 'civarchive' - - return result, None - except Exception as e: logger.error(f"Error fetching CivArchive model by hash {model_hash[:10]}: {e}") return None, str(e) @@ -165,26 +307,49 @@ class CivArchiveClient: async def get_model_versions(self, model_id: str) -> Optional[Dict]: """Get all versions of a model using CivArchive API""" try: - url = f"{self.base_url}/models/{model_id}" - - downloader = await get_downloader() - session = await downloader.session - async with session.get(url) as response: - if response.status != 200: + payload, error = await self._request_json(f"/models/{model_id}") + if error or payload is None: + if error and "not found" in error.lower(): return None - - data = await response.json() - - # Extract versions list - versions = data.get('versions', []) - - # Return in format similar to Civitai + logger.error(f"Error fetching CivArchive model versions for {model_id}: {error}") + return None + + data = self._normalize_payload(payload) + context, version_data, fallback_files = self._split_context(payload) + + versions_meta = data.get("versions") or [] + transformed_versions: List[Dict] = [] + for meta in versions_meta: + if not isinstance(meta, dict): + continue + version_id = meta.get("id") + if version_id is None: + continue + target_model_id = meta.get("modelId") or model_id + version = await self.get_model_version(target_model_id, version_id) + if version: + transformed_versions.append(version) + + # Ensure the primary version is included even if versions list was empty + primary_version = self._transform_version(context, version_data, fallback_files) + if primary_version: + transformed_versions.insert(0, primary_version) + + ordered_versions: List[Dict] = [] + seen_ids = set() + for version in transformed_versions: + version_id = version.get("id") + if version_id in seen_ids: + continue + seen_ids.add(version_id) + ordered_versions.append(version) + return { - 'modelVersions': versions, - 'type': data.get('type', ''), - 'name': data.get('name', '') + "modelVersions": ordered_versions, + "type": context.get("type", ""), + "name": context.get("name", ""), } - + except Exception as e: logger.error(f"Error fetching CivArchive model versions for {model_id}: {e}") return None @@ -201,103 +366,37 @@ class CivArchiveClient: """ if model_id is None: return None - + try: - if version_id is not None: - url = f"{self.base_url}/models/{model_id}?modelVersionId={version_id}" - else: - url = f"{self.base_url}/models/{model_id}" - - downloader = await get_downloader() - session = await downloader.session - async with session.get(url) as response: - if response.status != 200: + params = {"modelVersionId": version_id} if version_id is not None else None + payload, error = await self._request_json(f"/models/{model_id}", params=params) + if error or payload is None: + if error and "not found" in error.lower(): return None - - data = await response.json() - - # Get the version data - CivArchive returns the latest/default version in 'version' field - version_data = data.get('version', {}) - versions = data.get('versions', {}) - - # If version_id is specified, check if it matches + logger.error(f"Error fetching CivArchive model version via API {model_id}/{version_id}: {error}") + return None + + context, version_data, fallback_files = self._split_context(payload) + + if not version_data: + return await self._resolve_version_from_files(payload) + if version_id is not None: - if version_data.get('id') != version_id: - # Version mismatch - would need to iterate through versions or make another call - # For now, return None as CivArchive API doesn't provide easy version filtering - logger.warning(f"Requested version {version_id} doesn't match default version {version_data.get('id')} for model {model_id}") - return None - if version_data.get('modelId') != model_id: - # you can pass ANY model id, and a version number, and get the CORRECT model id from this... - # so recall the api with the correct info now - return await self.get_model_version(version_data.get('modelId'), version_id) - - # Transform to expected format - result = version_data.copy() - - # Restructure stats - if 'downloadCount' in result and 'ratingCount' in result and 'rating' in result: - result['stats'] = { - 'downloadCount': result.pop('downloadCount'), - 'ratingCount': result.pop('ratingCount'), - 'rating': result.pop('rating') - } - - # Rename trigger to trainedWords - if 'trigger' in result: - result['trainedWords'] = result.pop('trigger') - - # Transform files data - if 'files' in result: - transformed_files = [] - for file_data in result['files']: - # Find first available mirror - available_mirror = None - for mirror in file_data.get('mirrors', []): - if mirror.get('deletedAt') is None: - available_mirror = mirror - break - - transformed_file = { - 'id': file_data.get('id'), - 'sizeKB': file_data.get('sizeKB'), - 'name': available_mirror.get('filename', file_data.get('name')) if available_mirror else file_data.get('name'), - 'type': file_data.get('type'), - 'downloadUrl': available_mirror.get('url') if available_mirror else file_data.get('downloadUrl'), - 'primary': True, - 'mirrors': file_data.get('mirrors', []) - } - - # Transform hash format - if 'sha256' in file_data: - transformed_file['hashes'] = { - 'SHA256': file_data['sha256'].upper() - } - - transformed_files.append(transformed_file) - - result['files'] = transformed_files - - # Add model information - result['model'] = { - 'name': data.get('name'), - 'type': data.get('type'), - 'nsfw': data.get('is_nsfw', False), - 'description': data.get('description'), - 'tags': data.get('tags', []) - } - - result['creator'] = { - 'username': data.get('username', data.get('creator_username')), - 'image': '' - } - - # Add source identifier - result['source'] = 'civarchive' - result['is_deleted'] = data.get('deletedAt') is not None - - return result - + raw_id = version_data.get("id") + if raw_id != version_id: + logger.warning( + "Requested version %s doesn't match default version %s for model %s", + version_id, + raw_id, + model_id, + ) + return None + actual_model_id = version_data.get("modelId") + if actual_model_id is not None and str(actual_model_id) != str(model_id): + return await self.get_model_version(actual_model_id, version_id) + + return self._transform_version(context, version_data, fallback_files) + except Exception as e: logger.error(f"Error fetching CivArchive model version via API {model_id}/{version_id}: {e}") return None @@ -312,7 +411,10 @@ class CivArchiveClient: Returns: Tuple[Optional[Dict], Optional[str]]: (version_data, error_message) """ - return await self.get_model_version(1, version_id) + version = await self.get_model_version(1, version_id) + if version is None: + return None, "Model not found" + return version, None async def get_model_by_url(self, url) -> Optional[Dict]: """Get specific model version by parsing CivArchive HTML page (legacy method) @@ -380,7 +482,7 @@ class CivArchiveClient: 'name': available_mirror.get('filename', file_data.get('name')) if available_mirror else file_data.get('name'), 'type': file_data.get('type'), 'downloadUrl': available_mirror.get('url') if available_mirror else None, - 'primary': True, + 'primary': file_data.get('is_primary', False), 'mirrors': file_data.get('mirrors', []) } @@ -415,5 +517,5 @@ class CivArchiveClient: return version except Exception as e: - logger.error(f"Error fetching CivArchive model version (scraping) {model_id}/{version_id}: {e}") + logger.error(f"Error fetching CivArchive model version (scraping) {url}: {e}") return None diff --git a/py/services/download_manager.py b/py/services/download_manager.py index 239c17d3..bf57e0a7 100644 --- a/py/services/download_manager.py +++ b/py/services/download_manager.py @@ -294,7 +294,7 @@ class DownloadManager: await progress_callback(0) # 2. Get file information - file_info = next((f for f in version_info.get('files', []) if f.get('primary')), None) + file_info = next((f for f in version_info.get('files', []) if f.get('primary') and f.get('type') == 'Model'), None) if not file_info: return {'success': False, 'error': 'No primary file found in metadata'} mirrors = file_info.get('mirrors') or [] diff --git a/tests/services/test_civarchive_client.py b/tests/services/test_civarchive_client.py new file mode 100644 index 00000000..fe4fa647 --- /dev/null +++ b/tests/services/test_civarchive_client.py @@ -0,0 +1,239 @@ +import copy +from unittest.mock import AsyncMock + +import pytest + +from py.services import civarchive_client as civarchive_client_module +from py.services.civarchive_client import CivArchiveClient +from py.services.model_metadata_provider import ModelMetadataProviderManager + + +class DummyDownloader: + def __init__(self): + self.calls = [] + + async def make_request(self, method, url, use_auth=False, **kwargs): + self.calls.append({"method": method, "url": url, "params": kwargs.get("params")}) + return True, {} + + +@pytest.fixture(autouse=True) +def reset_singletons(): + CivArchiveClient._instance = None + ModelMetadataProviderManager._instance = None + yield + CivArchiveClient._instance = None + ModelMetadataProviderManager._instance = None + + +@pytest.fixture +def downloader(monkeypatch): + instance = DummyDownloader() + monkeypatch.setattr(civarchive_client_module, "get_downloader", AsyncMock(return_value=instance)) + return instance + + +def _base_civarchive_payload(version_id=1976567, *, trigger="mxpln", nsfw_level=31): + version_name = "v2.0" if version_id != 1976567 else "v1.0" + file_sha = "e2b7a280d6539556f23f380b3f71e4e22bc4524445c4c96526e117c6005c6ad3" + return { + "data": { + "id": 1746460, + "name": "Mixplin Style [Illustrious]", + "type": "LORA", + "description": "description", + "is_nsfw": True, + "nsfw_level": nsfw_level, + "tags": ["art", "style"], + "creator_username": "Ty_Lee", + "creator_name": "Ty_Lee", + "creator_url": "/users/Ty_Lee", + "version": { + "id": version_id, + "modelId": 1746460, + "name": version_name, + "baseModel": "Illustrious", + "description": "version description", + "downloadCount": 437, + "ratingCount": 0, + "rating": 0, + "nsfw_level": nsfw_level, + "trigger": [trigger], + "files": [ + { + "id": 1874043, + "name": "mxpln-illustrious-ty_lee.safetensors", + "type": "Model", + "sizeKB": 223124.37109375, + "downloadUrl": "https://civitai.com/api/download/models/1976567", + "sha256": file_sha, + "is_primary": False, + "mirrors": [ + { + "filename": "mxpln-illustrious-ty_lee.safetensors", + "url": "https://civitai.com/api/download/models/1976567", + "deletedAt": None, + } + ], + } + ], + "images": [ + { + "id": 86403595, + "url": "https://img.genur.art/example.png", + "nsfwLevel": 1, + } + ], + }, + "versions": [ + {"id": 2042594, "name": "v2.0"}, + {"id": 1976567, "name": "v1.0"}, + ], + } + } + + +async def test_get_model_by_hash_transforms_payload(downloader): + payload = _base_civarchive_payload() + + async def fake_make_request(method, url, use_auth=False, **kwargs): + downloader.calls.append({"url": url, "params": kwargs.get("params")}) + if url.endswith("/sha256/abc"): + return True, copy.deepcopy(payload) + return False, "unexpected" + + downloader.make_request = fake_make_request + + client = await CivArchiveClient.get_instance() + + result, error = await client.get_model_by_hash("abc") + + assert error is None + assert result["id"] == 1976567 + assert result["nsfwLevel"] == 31 + assert result["trainedWords"] == ["mxpln"] + assert result["stats"] == {"downloadCount": 437, "ratingCount": 0, "rating": 0} + assert result["model"]["name"] == "Mixplin Style [Illustrious]" + assert result["model"]["nsfw"] is True + assert result["creator"]["username"] == "Ty_Lee" + assert result["creator"]["image"] == "" + file_meta = result["files"][0] + assert file_meta["hashes"]["SHA256"] == "E2B7A280D6539556F23F380B3F71E4E22BC4524445C4C96526E117C6005C6AD3" + assert file_meta["mirrors"][0]["url"] == "https://civitai.com/api/download/models/1976567" + assert file_meta["primary"] is False + assert result["source"] == "civarchive" + assert result["images"][0]["url"] == "https://img.genur.art/example.png" + + +async def test_get_model_versions_fetches_each_version(downloader): + base_url = "https://civarchive.com/api/models/1746460" + base_payload = _base_civarchive_payload(version_id=2042594, trigger="mxpln-new", nsfw_level=5) + other_payload = _base_civarchive_payload() + + responses = { + (base_url, None): base_payload, + (base_url, (("modelVersionId", "2042594"),)): base_payload, + (base_url, (("modelVersionId", "1976567"),)): other_payload, + } + + async def fake_make_request(method, url, use_auth=False, **kwargs): + params = kwargs.get("params") + key = (url, tuple(sorted((params or {}).items())) if params else None) + downloader.calls.append({"url": url, "params": params}) + if key in responses: + return True, copy.deepcopy(responses[key]) + return False, "unexpected" + + downloader.make_request = fake_make_request + + client = await CivArchiveClient.get_instance() + + result = await client.get_model_versions("1746460") + + assert result["name"] == "Mixplin Style [Illustrious]" + assert result["type"] == "LORA" + versions = result["modelVersions"] + assert [version["id"] for version in versions] == [2042594, 1976567] + assert versions[0]["trainedWords"] == ["mxpln-new"] + assert versions[1]["trainedWords"] == ["mxpln"] + assert versions[0]["nsfwLevel"] == 5 + assert versions[1]["nsfwLevel"] == 31 + assert any(call["params"] == {"modelVersionId": "2042594"} for call in downloader.calls) + assert any(call["params"] == {"modelVersionId": "1976567"} for call in downloader.calls) + + +async def test_get_model_version_redirects_to_actual_model_id(downloader): + first_payload = _base_civarchive_payload() + first_payload["data"]["version"]["modelId"] = 222 + + base_url_request = "https://civarchive.com/api/models/111" + redirected_url_request = "https://civarchive.com/api/models/222" + + async def fake_make_request(method, url, use_auth=False, **kwargs): + downloader.calls.append({"url": url, "params": kwargs.get("params")}) + params = kwargs.get("params") or {} + if url == base_url_request: + return True, copy.deepcopy(first_payload) + if url == redirected_url_request and params.get("modelVersionId") == "1976567": + return True, copy.deepcopy(_base_civarchive_payload()) + return False, "unexpected" + + downloader.make_request = fake_make_request + + client = await CivArchiveClient.get_instance() + + result = await client.get_model_version(model_id=111, version_id=1976567) + + assert result is not None + assert result["model"]["name"] == "Mixplin Style [Illustrious]" + assert len(downloader.calls) == 2 + assert downloader.calls[1]["url"] == redirected_url_request + + +async def test_get_model_by_hash_uses_file_fallback(downloader, monkeypatch): + file_only_payload = { + "data": { + "files": [ + { + "model_id": 1746460, + "model_version_id": 1976567, + "source": "civitai", + } + ] + } + } + + version_payload = _base_civarchive_payload() + + async def fake_make_request(method, url, use_auth=False, **kwargs): + downloader.calls.append({"url": url, "params": kwargs.get("params")}) + if "/sha256/" in url: + return True, copy.deepcopy(file_only_payload) + if "/models/1746460" in url: + return True, copy.deepcopy(version_payload) + return False, "unexpected" + + downloader.make_request = fake_make_request + + client = await CivArchiveClient.get_instance() + + result, error = await client.get_model_by_hash("fallback") + + assert error is None + assert result["id"] == 1976567 + assert result["model"]["name"] == "Mixplin Style [Illustrious]" + assert any("/models/1746460" in call["url"] for call in downloader.calls) + + +async def test_get_model_by_hash_handles_not_found(downloader): + async def fake_make_request(method, url, use_auth=False, **kwargs): + return False, "Resource not found" + + downloader.make_request = fake_make_request + + client = await CivArchiveClient.get_instance() + + result, error = await client.get_model_by_hash("missing") + + assert result is None + assert error == "Model not found" From c3a66ecf2843c7a24eef4e7b933a8732710065a8 Mon Sep 17 00:00:00 2001 From: Will Miao <13051207myq@gmail.com> Date: Sat, 11 Oct 2025 15:07:42 +0800 Subject: [PATCH 4/7] feat(civarchive_client): update get_model_version_info to resolve the real model/version IDs before fetching the target metadata. --- py/services/civarchive_client.py | 39 ++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/py/services/civarchive_client.py b/py/services/civarchive_client.py index 14b5971e..60813d6f 100644 --- a/py/services/civarchive_client.py +++ b/py/services/civarchive_client.py @@ -411,10 +411,41 @@ class CivArchiveClient: Returns: Tuple[Optional[Dict], Optional[str]]: (version_data, error_message) """ - version = await self.get_model_version(1, version_id) - if version is None: - return None, "Model not found" - return version, None + try: + lookup_payload, error = await self._request_json( + "/models/1", + params={"modelVersionId": version_id}, + ) + if error or lookup_payload is None: + logger.error(f"Error performing CivArchive version lookup for {version_id}: {error}") + return None, error or "Model lookup failed" + + data = self._normalize_payload(lookup_payload) + version_block = data.get("version") + if not isinstance(version_block, dict): + logger.warning(f"CivArchive lookup for version {version_id} returned no version block") + return None, "Model not found" + + actual_version_id = version_block.get("id") + actual_model_id = version_block.get("modelId") + if actual_version_id is None or actual_model_id is None: + logger.warning( + "CivArchive lookup for version %s missing ids (modelId=%s, versionId=%s)", + version_id, + actual_model_id, + actual_version_id, + ) + return None, "Model not found" + + version = await self.get_model_version(actual_model_id, actual_version_id) + if version is None: + return None, "Model not found" + + return version, None + + except Exception as exc: + logger.error(f"Error resolving CivArchive model version info for {version_id}: {exc}") + return None, "Model lookup failed" async def get_model_by_url(self, url) -> Optional[Dict]: """Get specific model version by parsing CivArchive HTML page (legacy method) From 1e8bd88e28aa2ad2fb4c924716b23cfd525ff788 Mon Sep 17 00:00:00 2001 From: Will Miao <13051207myq@gmail.com> Date: Sat, 11 Oct 2025 16:11:13 +0800 Subject: [PATCH 5/7] feat(metadata): improve model ID redirect logic and provider ordering - Fix CivArchive model ID redirect logic to only follow redirects when context points to original model - Rename CivitaiModelMetadataProvider to CivArchiveModelMetadataProvider for consistency - Reorder fallback metadata providers to prioritize Civitai API over CivArchive API for better metadata quality - Remove unused asyncio import and redundant logging from metadata sync service --- py/services/civarchive_client.py | 10 +++++- py/services/metadata_service.py | 7 ++-- py/services/metadata_sync_service.py | 45 ++---------------------- tests/services/test_civarchive_client.py | 2 +- tests/services/test_download_manager.py | 2 ++ 5 files changed, 17 insertions(+), 49 deletions(-) diff --git a/py/services/civarchive_client.py b/py/services/civarchive_client.py index 60813d6f..06b0310d 100644 --- a/py/services/civarchive_client.py +++ b/py/services/civarchive_client.py @@ -392,7 +392,15 @@ class CivArchiveClient: ) return None actual_model_id = version_data.get("modelId") - if actual_model_id is not None and str(actual_model_id) != str(model_id): + context_model_id = context.get("id") + # CivArchive can respond with data for a different model id while already + # returning the fully resolved model context. Only follow the redirect when + # the context itself still points to the original (wrong) model. + if ( + actual_model_id is not None + and str(actual_model_id) != str(model_id) + and (context_model_id is None or str(context_model_id) != str(actual_model_id)) + ): return await self.get_model_version(actual_model_id, version_id) return self._transform_version(context, version_data, fallback_files) diff --git a/py/services/metadata_service.py b/py/services/metadata_service.py index 5cfd716d..730e45b0 100644 --- a/py/services/metadata_service.py +++ b/py/services/metadata_service.py @@ -58,7 +58,7 @@ async def initialize_metadata_providers(): # Register CivArchive provider, and all add to fallback providers try: civarchive_client = await ServiceRegistry.get_civarchive_client() - civarchive_provider = CivitaiModelMetadataProvider(civarchive_client) + civarchive_provider = CivArchiveModelMetadataProvider(civarchive_client) provider_manager.register_provider('civarchive_api', civarchive_provider) providers.append(('civarchive_api', civarchive_provider)) logger.debug("CivArchive metadata provider registered (also included in fallback)") @@ -67,16 +67,15 @@ async def initialize_metadata_providers(): # Set up fallback provider based on available providers if len(providers) > 1: - # Always use Civarchive, then Civitai API, then Archive DB + # Always use Civitai API (it has better metadata), then CivArchive API, then Archive DB ordered_providers = [] - ordered_providers.extend([p[1] for p in providers if p[0] == 'civarchive_api']) ordered_providers.extend([p[1] for p in providers if p[0] == 'civitai_api']) + ordered_providers.extend([p[1] for p in providers if p[0] == 'civarchive_api']) ordered_providers.extend([p[1] for p in providers if p[0] == 'sqlite']) if ordered_providers: fallback_provider = FallbackMetadataProvider(ordered_providers) provider_manager.register_provider('fallback', fallback_provider, is_default=True) - logger.info(f"Fallback metadata provider registered with {len(ordered_providers)} providers, Civarchive first") elif len(providers) == 1: # Only one provider available, set it as default provider_name, provider = providers[0] diff --git a/py/services/metadata_sync_service.py b/py/services/metadata_sync_service.py index def0a010..0a228079 100644 --- a/py/services/metadata_sync_service.py +++ b/py/services/metadata_sync_service.py @@ -5,7 +5,6 @@ from __future__ import annotations import json import logging import os -import asyncio from datetime import datetime from typing import Any, Awaitable, Callable, Dict, Iterable, Optional @@ -170,46 +169,6 @@ class MetadataSyncService: enable_archive = self._settings.get("enable_metadata_archive_db", False) try: - metadata_provider = await self._get_provider("civarchive_api") - tryagain = True - delay = 5 - - while tryagain: - civitai_metadata, error = await metadata_provider.get_model_by_hash(sha256) - tryagain = False - if not civitai_metadata or error: - if error == "HTTP 429": - error_msg = (f"Error fetching metadata: {error} (model_name={model_data.get('model_name', '')} sha256={sha256})") - logger.error(error_msg) - delay = delay * 2 - await asyncio.sleep(delay) - tryagain = True - continue - if error == "Model not found": - model_data["from_civitai"] = False - model_data["civitai_deleted"] = True - #model_data["db_checked"] = enable_archive - model_data["last_checked_at"] = datetime.now().timestamp() - data_to_save = model_data.copy() - data_to_save.pop("folder", None) - await self._metadata_manager.save_metadata(file_path, data_to_save) - await asyncio.sleep(1) - if error == "No version data found": - error_msg = (f"Error - No civitai version found: (model_name={model_data.get('model_name', '')} sha256={sha256})") - logger.error(error_msg) - error = False - if civitai_metadata.get('files'): - for file in civitai_metadata['files']: - logger.error(f"{file}") - if 'tensorart' in file['url'] or "seaart" in file['url']: - civitai_metadata, error = await metadata_provider.get_model_by_hash(file['url']) - error_msg = (f"Error fetching metadata: {error} {civitai_metadata}") - logger.error(error_msg) - if error or not civitai_metadata: - error_msg = (f"Error fetching metadata: {error} (model_name={model_data.get('model_name', '')} sha256={sha256})") - logger.error(error_msg) - return False, error_msg - if model_data.get("civitai_deleted") is True: if not enable_archive or model_data.get("db_checked") is True: if not enable_archive: @@ -241,8 +200,8 @@ class MetadataSyncService: return False, error_msg model_data["from_civitai"] = True - model_data["civitai_deleted"] = civitai_metadata.get("source") == "archive_db" - model_data["db_checked"] = enable_archive + model_data["civitai_deleted"] = civitai_metadata.get("source") == "archive_db" or civitai_metadata.get("source") == "civarchive" + model_data["db_checked"] = enable_archive and civitai_metadata.get("source") == "archive_db" model_data["last_checked_at"] = datetime.now().timestamp() local_metadata = model_data.copy() diff --git a/tests/services/test_civarchive_client.py b/tests/services/test_civarchive_client.py index fe4fa647..6c62f878 100644 --- a/tests/services/test_civarchive_client.py +++ b/tests/services/test_civarchive_client.py @@ -120,7 +120,7 @@ async def test_get_model_by_hash_transforms_payload(downloader): file_meta = result["files"][0] assert file_meta["hashes"]["SHA256"] == "E2B7A280D6539556F23F380B3F71E4E22BC4524445C4C96526E117C6005C6AD3" assert file_meta["mirrors"][0]["url"] == "https://civitai.com/api/download/models/1976567" - assert file_meta["primary"] is False + assert file_meta["primary"] is True assert result["source"] == "civarchive" assert result["images"][0]["url"] == "https://img.genur.art/example.png" diff --git a/tests/services/test_download_manager.py b/tests/services/test_download_manager.py index 48b425af..fad366f0 100644 --- a/tests/services/test_download_manager.py +++ b/tests/services/test_download_manager.py @@ -108,6 +108,7 @@ def metadata_provider(monkeypatch): "creator": {"username": "Author"}, "files": [ { + "type": "Model", "primary": True, "downloadUrl": "https://example.invalid/file.safetensors", "name": "file.safetensors", @@ -206,6 +207,7 @@ async def test_download_uses_active_mirrors(monkeypatch, scanners, metadata_prov "creator": {"username": "Author"}, "files": [ { + "type": "Model", "primary": True, "downloadUrl": "https://example.invalid/file.safetensors", "mirrors": [ From ddb30dbb17cc8fc2920feb05fd2826a2e5cf7e35 Mon Sep 17 00:00:00 2001 From: Will Miao <13051207myq@gmail.com> Date: Sat, 11 Oct 2025 16:11:17 +0800 Subject: [PATCH 6/7] Revert "feat(civarchive_client): update get_model_version_info to resolve the real model/version IDs before fetching the target metadata." This reverts commit c3a66ecf2843c7a24eef4e7b933a8732710065a8. --- py/services/civarchive_client.py | 39 ++++---------------------------- 1 file changed, 4 insertions(+), 35 deletions(-) diff --git a/py/services/civarchive_client.py b/py/services/civarchive_client.py index 06b0310d..76e15e54 100644 --- a/py/services/civarchive_client.py +++ b/py/services/civarchive_client.py @@ -419,41 +419,10 @@ class CivArchiveClient: Returns: Tuple[Optional[Dict], Optional[str]]: (version_data, error_message) """ - try: - lookup_payload, error = await self._request_json( - "/models/1", - params={"modelVersionId": version_id}, - ) - if error or lookup_payload is None: - logger.error(f"Error performing CivArchive version lookup for {version_id}: {error}") - return None, error or "Model lookup failed" - - data = self._normalize_payload(lookup_payload) - version_block = data.get("version") - if not isinstance(version_block, dict): - logger.warning(f"CivArchive lookup for version {version_id} returned no version block") - return None, "Model not found" - - actual_version_id = version_block.get("id") - actual_model_id = version_block.get("modelId") - if actual_version_id is None or actual_model_id is None: - logger.warning( - "CivArchive lookup for version %s missing ids (modelId=%s, versionId=%s)", - version_id, - actual_model_id, - actual_version_id, - ) - return None, "Model not found" - - version = await self.get_model_version(actual_model_id, actual_version_id) - if version is None: - return None, "Model not found" - - return version, None - - except Exception as exc: - logger.error(f"Error resolving CivArchive model version info for {version_id}: {exc}") - return None, "Model lookup failed" + version = await self.get_model_version(1, version_id) + if version is None: + return None, "Model not found" + return version, None async def get_model_by_url(self, url) -> Optional[Dict]: """Get specific model version by parsing CivArchive HTML page (legacy method) From 1dc189eb39c2db4fde14921041d7feccf43224ac Mon Sep 17 00:00:00 2001 From: Will Miao <13051207myq@gmail.com> Date: Sat, 11 Oct 2025 17:44:38 +0800 Subject: [PATCH 7/7] feat(metadata): implement fallback provider strategy for deleted models Refactor metadata sync service to use a prioritized provider fallback system when handling deleted CivitAI models. The new approach: 1. Attempts civarchive_api provider first for deleted models 2. Falls back to sqlite provider if archive DB is enabled 3. Maintains existing default provider behavior for non-deleted models 4. Tracks provider attempts and errors for better debugging This improves reliability when fetching metadata for deleted models by trying multiple sources before giving up, and provides clearer error messages based on which providers were attempted. --- py/services/metadata_sync_service.py | 82 +++++++++++-- py/utils/models.py | 1 + tests/services/test_metadata_sync_service.py | 121 +++++++++++++++++++ 3 files changed, 193 insertions(+), 11 deletions(-) diff --git a/py/services/metadata_sync_service.py b/py/services/metadata_sync_service.py index 0a228079..5ce79e3d 100644 --- a/py/services/metadata_sync_service.py +++ b/py/services/metadata_sync_service.py @@ -167,41 +167,101 @@ class MetadataSyncService: metadata_path = os.path.splitext(file_path)[0] + ".metadata.json" enable_archive = self._settings.get("enable_metadata_archive_db", False) + previous_source = model_data.get("metadata_source") or (model_data.get("civitai") or {}).get("source") try: + provider_attempts: list[tuple[Optional[str], MetadataProviderProtocol]] = [] + sqlite_attempted = False + if model_data.get("civitai_deleted") is True: - if not enable_archive or model_data.get("db_checked") is True: + if previous_source in (None, "civarchive"): + try: + provider_attempts.append(("civarchive_api", await self._get_provider("civarchive_api"))) + except Exception as exc: # pragma: no cover - provider resolution fault + logger.debug("Unable to resolve civarchive provider: %s", exc) + + if enable_archive and model_data.get("db_checked") is not True: + try: + provider_attempts.append(("sqlite", await self._get_provider("sqlite"))) + except Exception as exc: # pragma: no cover - provider resolution fault + logger.debug("Unable to resolve sqlite provider: %s", exc) + + if not provider_attempts: if not enable_archive: error_msg = "CivitAI model is deleted and metadata archive DB is not enabled" - else: + elif model_data.get("db_checked") is True: error_msg = "CivitAI model is deleted and not found in metadata archive DB" - return (False, error_msg) - metadata_provider = await self._get_provider("sqlite") + else: + error_msg = "CivitAI model is deleted and no archive provider is available" + return False, error_msg else: - metadata_provider = await self._get_default_provider() + provider_attempts.append((None, await self._get_default_provider())) - civitai_metadata, error = await metadata_provider.get_model_by_hash(sha256) + civitai_metadata: Optional[Dict[str, Any]] = None + metadata_provider: Optional[MetadataProviderProtocol] = None + provider_used: Optional[str] = None + last_error: Optional[str] = None - if not civitai_metadata: - if error == "Model not found": + for provider_name, provider in provider_attempts: + try: + civitai_metadata_candidate, error = await provider.get_model_by_hash(sha256) + except Exception as exc: # pragma: no cover - defensive logging + logger.error("Provider %s failed for hash %s: %s", provider_name, sha256, exc) + civitai_metadata_candidate, error = None, str(exc) + + if provider_name == "sqlite": + sqlite_attempted = True + + if civitai_metadata_candidate: + civitai_metadata = civitai_metadata_candidate + metadata_provider = provider + provider_used = provider_name + break + + last_error = error or last_error + + if civitai_metadata is None or metadata_provider is None: + if sqlite_attempted: + model_data["db_checked"] = True + + if last_error == "Model not found": model_data["from_civitai"] = False model_data["civitai_deleted"] = True - model_data["db_checked"] = enable_archive + model_data["db_checked"] = sqlite_attempted or (enable_archive and model_data.get("db_checked", False)) model_data["last_checked_at"] = datetime.now().timestamp() data_to_save = model_data.copy() data_to_save.pop("folder", None) await self._metadata_manager.save_metadata(file_path, data_to_save) + default_error = ( + "CivitAI model is deleted and metadata archive DB is not enabled" + if model_data.get("civitai_deleted") and not enable_archive + else "CivitAI model is deleted and not found in metadata archive DB" + if model_data.get("civitai_deleted") and (model_data.get("db_checked") is True or sqlite_attempted) + else "No provider returned metadata" + ) + error_msg = ( - f"Error fetching metadata: {error} (model_name={model_data.get('model_name', '')})" + f"Error fetching metadata: {last_error or default_error} " + f"(model_name={model_data.get('model_name', '')})" ) logger.error(error_msg) return False, error_msg model_data["from_civitai"] = True model_data["civitai_deleted"] = civitai_metadata.get("source") == "archive_db" or civitai_metadata.get("source") == "civarchive" - model_data["db_checked"] = enable_archive and civitai_metadata.get("source") == "archive_db" + model_data["db_checked"] = enable_archive and ( + civitai_metadata.get("source") == "archive_db" or sqlite_attempted + ) + source = civitai_metadata.get("source") or "civitai_api" + if source == "api": + source = "civitai_api" + elif provider_used == "civarchive_api" and source != "civarchive": + source = "civarchive" + elif provider_used == "sqlite": + source = "archive_db" + model_data["metadata_source"] = source model_data["last_checked_at"] = datetime.now().timestamp() local_metadata = model_data.copy() diff --git a/py/utils/models.py b/py/utils/models.py index 159146d5..4caffa3e 100644 --- a/py/utils/models.py +++ b/py/utils/models.py @@ -25,6 +25,7 @@ class BaseModelMetadata: favorite: bool = False # Whether the model is a favorite exclude: bool = False # Whether to exclude this model from the cache db_checked: bool = False # Whether checked in archive DB + metadata_source: Optional[str] = None # Last provider that supplied metadata last_checked_at: float = 0 # Last checked timestamp _unknown_fields: Dict[str, Any] = field(default_factory=dict, repr=False, compare=False) # Store unknown fields diff --git a/tests/services/test_metadata_sync_service.py b/tests/services/test_metadata_sync_service.py index 470259f6..cd3ade58 100644 --- a/tests/services/test_metadata_sync_service.py +++ b/tests/services/test_metadata_sync_service.py @@ -32,6 +32,8 @@ def build_service( get_model_by_hash=AsyncMock(), get_model_version=AsyncMock(), ) + if default_provider is None: + provider.get_model_by_hash.return_value = (None, None) default_provider_factory = AsyncMock(return_value=provider) provider_selector = provider_selector or AsyncMock(return_value=provider) @@ -138,6 +140,7 @@ async def test_fetch_and_update_model_success_updates_cache(tmp_path): assert model_data["from_civitai"] is True assert model_data["civitai_deleted"] is False assert "civitai" in model_data + assert model_data["metadata_source"] == "civitai_api" helpers.metadata_manager.hydrate_model_data.assert_not_awaited() assert model_data["hydrated"] is True @@ -219,6 +222,124 @@ async def test_fetch_and_update_model_respects_deleted_without_archive(): update_cache.assert_not_awaited() +@pytest.mark.asyncio +async def test_fetch_and_update_model_prefers_civarchive_for_deleted_models(tmp_path): + default_provider = SimpleNamespace( + get_model_by_hash=AsyncMock(), + get_model_version=AsyncMock(), + ) + civarchive_provider = SimpleNamespace( + get_model_by_hash=AsyncMock( + return_value=( + { + "source": "civarchive", + "model": {"name": "Recovered", "description": "", "tags": []}, + "images": [], + "baseModel": "sdxl", + }, + None, + ) + ), + get_model_version=AsyncMock(), + ) + + async def select_provider(name: str): + return civarchive_provider if name == "civarchive_api" else default_provider + + provider_selector = AsyncMock(side_effect=select_provider) + helpers = build_service( + settings_values={"enable_metadata_archive_db": False}, + default_provider=default_provider, + provider_selector=provider_selector, + ) + + model_path = tmp_path / "model.safetensors" + model_data = { + "civitai_deleted": True, + "metadata_source": "civarchive", + "civitai": {"source": "civarchive"}, + "file_path": str(model_path), + } + update_cache = AsyncMock() + + ok, error = await helpers.service.fetch_and_update_model( + sha256="deadbeef", + file_path=str(model_path), + model_data=model_data, + update_cache_func=update_cache, + ) + + assert ok + assert error is None + provider_selector.assert_awaited_with("civarchive_api") + helpers.default_provider_factory.assert_not_awaited() + civarchive_provider.get_model_by_hash.assert_awaited_once_with("deadbeef") + update_cache.assert_awaited() + assert model_data["metadata_source"] == "civarchive" + helpers.metadata_manager.save_metadata.assert_awaited() + + +@pytest.mark.asyncio +async def test_fetch_and_update_model_falls_back_to_sqlite_after_civarchive_failure(tmp_path): + default_provider = SimpleNamespace( + get_model_by_hash=AsyncMock(), + get_model_version=AsyncMock(), + ) + civarchive_provider = SimpleNamespace( + get_model_by_hash=AsyncMock(return_value=(None, "Model not found")), + get_model_version=AsyncMock(), + ) + sqlite_payload = { + "source": "archive_db", + "model": {"name": "Recovered", "description": "", "tags": []}, + "images": [], + "baseModel": "sdxl", + } + sqlite_provider = SimpleNamespace( + get_model_by_hash=AsyncMock(return_value=(sqlite_payload, None)), + get_model_version=AsyncMock(), + ) + + async def select_provider(name: str): + if name == "civarchive_api": + return civarchive_provider + if name == "sqlite": + return sqlite_provider + return default_provider + + provider_selector = AsyncMock(side_effect=select_provider) + helpers = build_service( + settings_values={"enable_metadata_archive_db": True}, + default_provider=default_provider, + provider_selector=provider_selector, + ) + + model_path = tmp_path / "model.safetensors" + model_data = { + "civitai_deleted": True, + "db_checked": False, + "file_path": str(model_path), + } + update_cache = AsyncMock() + + ok, error = await helpers.service.fetch_and_update_model( + sha256="cafe", + file_path=str(model_path), + model_data=model_data, + update_cache_func=update_cache, + ) + + assert ok and error is None + assert civarchive_provider.get_model_by_hash.await_count == 1 + assert sqlite_provider.get_model_by_hash.await_count == 1 + assert model_data["metadata_source"] == "archive_db" + assert model_data["db_checked"] is True + assert provider_selector.await_args_list[0].args == ("civarchive_api",) + assert provider_selector.await_args_list[1].args == ("sqlite",) + update_cache.assert_awaited() + helpers.metadata_manager.save_metadata.assert_awaited() + + @pytest.mark.asyncio async def test_relink_metadata_fetches_version_and_updates_sha(tmp_path): provider = SimpleNamespace(