diff --git a/py/services/civarchive_client.py b/py/services/civarchive_client.py new file mode 100644 index 00000000..76e15e54 --- /dev/null +++ b/py/services/civarchive_client.py @@ -0,0 +1,529 @@ +import os +import json +import logging +import asyncio +from copy import deepcopy +from typing import Optional, Dict, Tuple, List +from .model_metadata_provider import CivArchiveModelMetadataProvider, ModelMetadataProviderManager +from .downloader import get_downloader + +try: + from bs4 import BeautifulSoup +except ImportError as exc: + BeautifulSoup = None # type: ignore[assignment] + _BS4_IMPORT_ERROR = exc +else: + _BS4_IMPORT_ERROR = None + +def _require_beautifulsoup(): + if BeautifulSoup is None: + raise RuntimeError( + "BeautifulSoup (bs4) is required for CivArchive client. " + "Install it with 'pip install beautifulsoup4'." + ) from _BS4_IMPORT_ERROR + return BeautifulSoup + +logger = logging.getLogger(__name__) + +class CivArchiveClient: + _instance = None + _lock = asyncio.Lock() + + @classmethod + async def get_instance(cls): + """Get singleton instance of CivArchiveClient""" + async with cls._lock: + if cls._instance is None: + cls._instance = cls() + + # Register this client as a metadata provider + provider_manager = await ModelMetadataProviderManager.get_instance() + provider_manager.register_provider('civarchive', CivArchiveModelMetadataProvider(cls._instance), False) + + return cls._instance + + def __init__(self): + # Check if already initialized for singleton pattern + if hasattr(self, '_initialized'): + return + self._initialized = True + + self.base_url = "https://civarchive.com/api" + + async def _request_json( + self, + path: str, + params: Optional[Dict[str, str]] = None + ) -> Tuple[Optional[Dict], Optional[str]]: + """Call CivArchive API and return JSON payload""" + downloader = await get_downloader() + kwargs: Dict[str, Dict[str, str]] = {} + if params: + safe_params = {str(key): str(value) for key, value in params.items() if value is not 
None} + if safe_params: + kwargs["params"] = safe_params + success, payload = await downloader.make_request( + "GET", + f"{self.base_url}{path}", + use_auth=False, + **kwargs + ) + if not success: + error = payload if isinstance(payload, str) else "Request failed" + return None, error + if not isinstance(payload, dict): + return None, "Invalid response structure" + return payload, None + + @staticmethod + def _normalize_payload(payload: Dict) -> Dict: + """Unwrap CivArchive responses that wrap content under a data key""" + if not isinstance(payload, dict): + return {} + data = payload.get("data") + if isinstance(data, dict): + return data + return payload + + @staticmethod + def _split_context(payload: Dict) -> Tuple[Dict, Dict, List[Dict]]: + """Separate version payload from surrounding model context""" + data = CivArchiveClient._normalize_payload(payload) + context: Dict = {} + fallback_files: List[Dict] = [] + version: Dict = {} + + for key, value in data.items(): + if key in {"version", "model"}: + continue + context[key] = value + + if isinstance(data.get("version"), dict): + version = data["version"] + + model_block = data.get("model") + if isinstance(model_block, dict): + for key, value in model_block.items(): + if key == "version": + if not version and isinstance(value, dict): + version = value + continue + context.setdefault(key, value) + fallback_files = fallback_files or model_block.get("files") or [] + + fallback_files = fallback_files or data.get("files") or [] + return context, version, fallback_files + + @staticmethod + def _ensure_list(value) -> List: + if isinstance(value, list): + return value + if value is None: + return [] + return [value] + + @staticmethod + def _build_model_info(context: Dict) -> Dict: + tags = context.get("tags") + if not isinstance(tags, list): + tags = list(tags) if isinstance(tags, (set, tuple)) else ([] if tags is None else [tags]) + return { + "name": context.get("name"), + "type": context.get("type"), + "nsfw": 
bool(context.get("is_nsfw", context.get("nsfw", False))), + "description": context.get("description"), + "tags": tags, + } + + @staticmethod + def _build_creator_info(context: Dict) -> Dict: + username = context.get("creator_username") or context.get("username") or "" + image = context.get("creator_image") or context.get("creator_avatar") or "" + creator: Dict[str, Optional[str]] = { + "username": username, + "image": image, + } + if context.get("creator_name"): + creator["name"] = context["creator_name"] + if context.get("creator_url"): + creator["url"] = context["creator_url"] + return creator + + @staticmethod + def _transform_file_entry(file_data: Dict) -> Dict: + mirrors = file_data.get("mirrors") or [] + if not isinstance(mirrors, list): + mirrors = [mirrors] + available_mirror = next( + (mirror for mirror in mirrors if isinstance(mirror, dict) and mirror.get("deletedAt") is None), + None + ) + download_url = file_data.get("downloadUrl") + if not download_url and available_mirror: + download_url = available_mirror.get("url") + name = file_data.get("name") + if not name and available_mirror: + name = available_mirror.get("filename") + + transformed: Dict = { + "id": file_data.get("id"), + "sizeKB": file_data.get("sizeKB"), + "name": name, + "type": file_data.get("type"), + "downloadUrl": download_url, + "primary": True, + # TODO: for some reason is_primary is false in CivArchive response, need to figure this out, + # "primary": bool(file_data.get("is_primary", file_data.get("primary", False))), + "mirrors": mirrors, + } + + sha256 = file_data.get("sha256") + if sha256: + transformed["hashes"] = {"SHA256": str(sha256).upper()} + elif isinstance(file_data.get("hashes"), dict): + transformed["hashes"] = file_data["hashes"] + + if "metadata" in file_data: + transformed["metadata"] = file_data["metadata"] + + if file_data.get("modelVersionId") is not None: + transformed["modelVersionId"] = file_data.get("modelVersionId") + elif file_data.get("model_version_id") is 
not None: + transformed["modelVersionId"] = file_data.get("model_version_id") + + if file_data.get("modelId") is not None: + transformed["modelId"] = file_data.get("modelId") + elif file_data.get("model_id") is not None: + transformed["modelId"] = file_data.get("model_id") + + return transformed + + def _transform_files( + self, + files: Optional[List[Dict]], + fallback_files: Optional[List[Dict]] = None + ) -> List[Dict]: + candidates: List[Dict] = [] + if isinstance(files, list) and files: + candidates = files + elif isinstance(fallback_files, list): + candidates = fallback_files + + transformed_files: List[Dict] = [] + for file_data in candidates: + if isinstance(file_data, dict): + transformed_files.append(self._transform_file_entry(file_data)) + return transformed_files + + def _transform_version( + self, + context: Dict, + version: Dict, + fallback_files: Optional[List[Dict]] = None + ) -> Optional[Dict]: + if not version: + return None + + version_copy = deepcopy(version) + version_copy.pop("model", None) + version_copy.pop("creator", None) + + if "trigger" in version_copy: + triggers = version_copy.pop("trigger") + if isinstance(triggers, list): + version_copy["trainedWords"] = triggers + elif triggers is None: + version_copy["trainedWords"] = [] + else: + version_copy["trainedWords"] = [triggers] + + if "trainedWords" in version_copy and isinstance(version_copy["trainedWords"], str): + version_copy["trainedWords"] = [version_copy["trainedWords"]] + + if "nsfw_level" in version_copy: + version_copy["nsfwLevel"] = version_copy.pop("nsfw_level") + elif "nsfwLevel" not in version_copy and context.get("nsfw_level") is not None: + version_copy["nsfwLevel"] = context.get("nsfw_level") + + stats_keys = ["downloadCount", "ratingCount", "rating"] + stats = {key: version_copy.pop(key) for key in stats_keys if key in version_copy} + if stats: + version_copy["stats"] = stats + + version_copy["files"] = self._transform_files(version_copy.get("files"), fallback_files) + 
version_copy["images"] = self._ensure_list(version_copy.get("images")) + + version_copy["model"] = self._build_model_info(context) + version_copy["creator"] = self._build_creator_info(context) + + version_copy["source"] = "civarchive" + version_copy["is_deleted"] = bool(context.get("deletedAt")) or bool(version.get("deletedAt")) + + return version_copy + + async def _resolve_version_from_files(self, payload: Dict) -> Optional[Dict]: + """Fallback to fetch version data when only file metadata is available""" + data = self._normalize_payload(payload) + files = data.get("files") or payload.get("files") or [] + if not isinstance(files, list): + files = [files] + for file_data in files: + if not isinstance(file_data, dict): + continue + model_id = file_data.get("model_id") or file_data.get("modelId") + version_id = file_data.get("model_version_id") or file_data.get("modelVersionId") + if model_id is None or version_id is None: + continue + resolved = await self.get_model_version(model_id, version_id) + if resolved: + return resolved + return None + + async def get_model_by_hash(self, model_hash: str) -> Tuple[Optional[Dict], Optional[str]]: + """Find model by SHA256 hash value using CivArchive API""" + try: + payload, error = await self._request_json(f"/sha256/{model_hash.lower()}") + if error: + if "not found" in error.lower(): + return None, "Model not found" + return None, error + + context, version_data, fallback_files = self._split_context(payload) + transformed = self._transform_version(context, version_data, fallback_files) + if transformed: + return transformed, None + + resolved = await self._resolve_version_from_files(payload) + if resolved: + return resolved, None + + logger.error("Error fetching version of CivArchive model by hash %s", model_hash[:10]) + return None, "No version data found" + + except Exception as e: + logger.error(f"Error fetching CivArchive model by hash {model_hash[:10]}: {e}") + return None, str(e) + + async def get_model_versions(self, 
model_id: str) -> Optional[Dict]: + """Get all versions of a model using CivArchive API""" + try: + payload, error = await self._request_json(f"/models/{model_id}") + if error or payload is None: + if error and "not found" in error.lower(): + return None + logger.error(f"Error fetching CivArchive model versions for {model_id}: {error}") + return None + + data = self._normalize_payload(payload) + context, version_data, fallback_files = self._split_context(payload) + + versions_meta = data.get("versions") or [] + transformed_versions: List[Dict] = [] + for meta in versions_meta: + if not isinstance(meta, dict): + continue + version_id = meta.get("id") + if version_id is None: + continue + target_model_id = meta.get("modelId") or model_id + version = await self.get_model_version(target_model_id, version_id) + if version: + transformed_versions.append(version) + + # Ensure the primary version is included even if versions list was empty + primary_version = self._transform_version(context, version_data, fallback_files) + if primary_version: + transformed_versions.insert(0, primary_version) + + ordered_versions: List[Dict] = [] + seen_ids = set() + for version in transformed_versions: + version_id = version.get("id") + if version_id in seen_ids: + continue + seen_ids.add(version_id) + ordered_versions.append(version) + + return { + "modelVersions": ordered_versions, + "type": context.get("type", ""), + "name": context.get("name", ""), + } + + except Exception as e: + logger.error(f"Error fetching CivArchive model versions for {model_id}: {e}") + return None + + async def get_model_version(self, model_id: int = None, version_id: int = None) -> Optional[Dict]: + """Get specific model version using CivArchive API + + Args: + model_id: The model ID (required) + version_id: Optional specific version ID to filter to + + Returns: + Optional[Dict]: The model version data or None if not found + """ + if model_id is None: + return None + + try: + params = {"modelVersionId": 
version_id} if version_id is not None else None + payload, error = await self._request_json(f"/models/{model_id}", params=params) + if error or payload is None: + if error and "not found" in error.lower(): + return None + logger.error(f"Error fetching CivArchive model version via API {model_id}/{version_id}: {error}") + return None + + context, version_data, fallback_files = self._split_context(payload) + + if not version_data: + return await self._resolve_version_from_files(payload) + + if version_id is not None: + raw_id = version_data.get("id") + if raw_id != version_id: + logger.warning( + "Requested version %s doesn't match default version %s for model %s", + version_id, + raw_id, + model_id, + ) + return None + actual_model_id = version_data.get("modelId") + context_model_id = context.get("id") + # CivArchive can respond with data for a different model id while already + # returning the fully resolved model context. Only follow the redirect when + # the context itself still points to the original (wrong) model. 
+ if ( + actual_model_id is not None + and str(actual_model_id) != str(model_id) + and (context_model_id is None or str(context_model_id) != str(actual_model_id)) + ): + return await self.get_model_version(actual_model_id, version_id) + + return self._transform_version(context, version_data, fallback_files) + + except Exception as e: + logger.error(f"Error fetching CivArchive model version via API {model_id}/{version_id}: {e}") + return None + + async def get_model_version_info(self, version_id: str) -> Tuple[Optional[Dict], Optional[str]]: + """ Fetch model version metadata using a known bogus model lookup + CivArchive lacks a direct version lookup API, this uses a workaround (which we handle in the main model request now) + + Args: + version_id: The model version ID + + Returns: + Tuple[Optional[Dict], Optional[str]]: (version_data, error_message) + """ + version = await self.get_model_version(1, version_id) + if version is None: + return None, "Model not found" + return version, None + + async def get_model_by_url(self, url) -> Optional[Dict]: + """Get specific model version by parsing CivArchive HTML page (legacy method) + + This is the original HTML scraping implementation, kept for reference and for new sites that are not yet covered by the API. + The primary get_model_version() now uses the API instead. 
+ """ + + try: + # Construct CivArchive URL + url = f"https://civarchive.com/{url}" + downloader = await get_downloader() + session = await downloader.session + async with session.get(url) as response: + if response.status != 200: + return None + + html_content = await response.text() + + # Parse HTML to extract JSON data + soup_parser = _require_beautifulsoup() + soup = soup_parser(html_content, 'html.parser') + script_tag = soup.find('script', {'id': '__NEXT_DATA__', 'type': 'application/json'}) + + if not script_tag: + return None + + # Parse JSON content + json_data = json.loads(script_tag.string) + model_data = json_data.get('props', {}).get('pageProps', {}).get('model') + + if not model_data or 'version' not in model_data: + return None + + # Extract version data as base + version = model_data['version'].copy() + + # Restructure stats + if 'downloadCount' in version and 'ratingCount' in version and 'rating' in version: + version['stats'] = { + 'downloadCount': version.pop('downloadCount'), + 'ratingCount': version.pop('ratingCount'), + 'rating': version.pop('rating') + } + + # Rename trigger to trainedWords + if 'trigger' in version: + version['trainedWords'] = version.pop('trigger') + + # Transform files data to expected format + if 'files' in version: + transformed_files = [] + for file_data in version['files']: + # Find first available mirror (deletedAt is null) + available_mirror = None + for mirror in file_data.get('mirrors', []): + if mirror.get('deletedAt') is None: + available_mirror = mirror + break + + # Create transformed file entry + transformed_file = { + 'id': file_data.get('id'), + 'sizeKB': file_data.get('sizeKB'), + 'name': available_mirror.get('filename', file_data.get('name')) if available_mirror else file_data.get('name'), + 'type': file_data.get('type'), + 'downloadUrl': available_mirror.get('url') if available_mirror else None, + 'primary': file_data.get('is_primary', False), + 'mirrors': file_data.get('mirrors', []) + } + + # Transform 
hash format + if 'sha256' in file_data: + transformed_file['hashes'] = { + 'SHA256': file_data['sha256'].upper() + } + + transformed_files.append(transformed_file) + + version['files'] = transformed_files + + # Add model information + version['model'] = { + 'name': model_data.get('name'), + 'type': model_data.get('type'), + 'nsfw': model_data.get('is_nsfw', False), + 'description': model_data.get('description'), + 'tags': model_data.get('tags', []) + } + + version['creator'] = { + 'username': model_data.get('username'), + 'image': '' + } + + # Add source identifier + version['source'] = 'civarchive' + version['is_deleted'] = json_data.get('query', {}).get('is_deleted', False) + + return version + + except Exception as e: + logger.error(f"Error fetching CivArchive model version (scraping) {url}: {e}") + return None diff --git a/py/services/download_manager.py b/py/services/download_manager.py index ea3a87d0..0a3cb142 100644 --- a/py/services/download_manager.py +++ b/py/services/download_manager.py @@ -294,7 +294,7 @@ class DownloadManager: await progress_callback(0) # 2. 
Get file information - file_info = next((f for f in version_info.get('files', []) if f.get('primary')), None) + file_info = next((f for f in version_info.get('files', []) if f.get('primary') and f.get('type') == 'Model'), None) if not file_info: return {'success': False, 'error': 'No primary file found in metadata'} mirrors = file_info.get('mirrors') or [] diff --git a/py/services/metadata_service.py b/py/services/metadata_service.py index 5f5ae727..730e45b0 100644 --- a/py/services/metadata_service.py +++ b/py/services/metadata_service.py @@ -4,6 +4,7 @@ from .model_metadata_provider import ( ModelMetadataProviderManager, SQLiteModelMetadataProvider, CivitaiModelMetadataProvider, + CivArchiveModelMetadataProvider, FallbackMetadataProvider ) from .settings_manager import get_settings_manager @@ -54,26 +55,27 @@ async def initialize_metadata_providers(): except Exception as e: logger.error(f"Failed to initialize Civitai API metadata provider: {e}") - # Register CivArchive provider, but do NOT add to fallback providers + # Register CivArchive provider, and also add it to the fallback providers try: - from .model_metadata_provider import CivArchiveModelMetadataProvider - civarchive_provider = CivArchiveModelMetadataProvider() - provider_manager.register_provider('civarchive', civarchive_provider) - logger.debug("CivArchive metadata provider registered (not included in fallback)") + civarchive_client = await ServiceRegistry.get_civarchive_client() + civarchive_provider = CivArchiveModelMetadataProvider(civarchive_client) + provider_manager.register_provider('civarchive_api', civarchive_provider) + providers.append(('civarchive_api', civarchive_provider)) + logger.debug("CivArchive metadata provider registered (also included in fallback)") except Exception as e: logger.error(f"Failed to initialize CivArchive metadata provider: {e}") # Set up fallback provider based on available providers if len(providers) > 1: - # Always use Civitai API first, then Archive DB + # Always use 
Civitai API (it has better metadata), then CivArchive API, then Archive DB ordered_providers = [] ordered_providers.extend([p[1] for p in providers if p[0] == 'civitai_api']) + ordered_providers.extend([p[1] for p in providers if p[0] == 'civarchive_api']) ordered_providers.extend([p[1] for p in providers if p[0] == 'sqlite']) if ordered_providers: fallback_provider = FallbackMetadataProvider(ordered_providers) provider_manager.register_provider('fallback', fallback_provider, is_default=True) - logger.debug(f"Fallback metadata provider registered with {len(ordered_providers)} providers, Civitai API first") elif len(providers) == 1: # Only one provider available, set it as default provider_name, provider = providers[0] diff --git a/py/services/metadata_sync_service.py b/py/services/metadata_sync_service.py index 738f3b86..5ce79e3d 100644 --- a/py/services/metadata_sync_service.py +++ b/py/services/metadata_sync_service.py @@ -167,41 +167,101 @@ class MetadataSyncService: metadata_path = os.path.splitext(file_path)[0] + ".metadata.json" enable_archive = self._settings.get("enable_metadata_archive_db", False) + previous_source = model_data.get("metadata_source") or (model_data.get("civitai") or {}).get("source") try: + provider_attempts: list[tuple[Optional[str], MetadataProviderProtocol]] = [] + sqlite_attempted = False + if model_data.get("civitai_deleted") is True: - if not enable_archive or model_data.get("db_checked") is True: + if previous_source in (None, "civarchive"): + try: + provider_attempts.append(("civarchive_api", await self._get_provider("civarchive_api"))) + except Exception as exc: # pragma: no cover - provider resolution fault + logger.debug("Unable to resolve civarchive provider: %s", exc) + + if enable_archive and model_data.get("db_checked") is not True: + try: + provider_attempts.append(("sqlite", await self._get_provider("sqlite"))) + except Exception as exc: # pragma: no cover - provider resolution fault + logger.debug("Unable to resolve 
sqlite provider: %s", exc) + + if not provider_attempts: if not enable_archive: error_msg = "CivitAI model is deleted and metadata archive DB is not enabled" - else: + elif model_data.get("db_checked") is True: error_msg = "CivitAI model is deleted and not found in metadata archive DB" - return (False, error_msg) - metadata_provider = await self._get_provider("sqlite") + else: + error_msg = "CivitAI model is deleted and no archive provider is available" + return False, error_msg else: - metadata_provider = await self._get_default_provider() + provider_attempts.append((None, await self._get_default_provider())) - civitai_metadata, error = await metadata_provider.get_model_by_hash(sha256) + civitai_metadata: Optional[Dict[str, Any]] = None + metadata_provider: Optional[MetadataProviderProtocol] = None + provider_used: Optional[str] = None + last_error: Optional[str] = None - if not civitai_metadata: - if error == "Model not found": + for provider_name, provider in provider_attempts: + try: + civitai_metadata_candidate, error = await provider.get_model_by_hash(sha256) + except Exception as exc: # pragma: no cover - defensive logging + logger.error("Provider %s failed for hash %s: %s", provider_name, sha256, exc) + civitai_metadata_candidate, error = None, str(exc) + + if provider_name == "sqlite": + sqlite_attempted = True + + if civitai_metadata_candidate: + civitai_metadata = civitai_metadata_candidate + metadata_provider = provider + provider_used = provider_name + break + + last_error = error or last_error + + if civitai_metadata is None or metadata_provider is None: + if sqlite_attempted: + model_data["db_checked"] = True + + if last_error == "Model not found": model_data["from_civitai"] = False model_data["civitai_deleted"] = True - model_data["db_checked"] = enable_archive + model_data["db_checked"] = sqlite_attempted or (enable_archive and model_data.get("db_checked", False)) model_data["last_checked_at"] = datetime.now().timestamp() data_to_save = 
model_data.copy() data_to_save.pop("folder", None) await self._metadata_manager.save_metadata(file_path, data_to_save) + default_error = ( + "CivitAI model is deleted and metadata archive DB is not enabled" + if model_data.get("civitai_deleted") and not enable_archive + else "CivitAI model is deleted and not found in metadata archive DB" + if model_data.get("civitai_deleted") and (model_data.get("db_checked") is True or sqlite_attempted) + else "No provider returned metadata" + ) + error_msg = ( - f"Error fetching metadata: {error} (model_name={model_data.get('model_name', '')})" + f"Error fetching metadata: {last_error or default_error} " + f"(model_name={model_data.get('model_name', '')})" ) logger.error(error_msg) return False, error_msg model_data["from_civitai"] = True - model_data["civitai_deleted"] = civitai_metadata.get("source") == "archive_db" - model_data["db_checked"] = enable_archive + model_data["civitai_deleted"] = civitai_metadata.get("source") == "archive_db" or civitai_metadata.get("source") == "civarchive" + model_data["db_checked"] = enable_archive and ( + civitai_metadata.get("source") == "archive_db" or sqlite_attempted + ) + source = civitai_metadata.get("source") or "civitai_api" + if source == "api": + source = "civitai_api" + elif provider_used == "civarchive_api" and source != "civarchive": + source = "civarchive" + elif provider_used == "sqlite": + source = "archive_db" + model_data["metadata_source"] = source model_data["last_checked_at"] = datetime.now().timestamp() local_metadata = model_data.copy() diff --git a/py/services/model_metadata_provider.py b/py/services/model_metadata_provider.py index 99b3488c..73d9b7d8 100644 --- a/py/services/model_metadata_provider.py +++ b/py/services/model_metadata_provider.py @@ -88,122 +88,22 @@ class CivitaiModelMetadataProvider(ModelMetadataProvider): return await self.client.get_user_models(username) class CivArchiveModelMetadataProvider(ModelMetadataProvider): - """Provider that uses CivArchive 
HTML page parsing for metadata""" + """Provider that uses CivArchive API for metadata""" + def __init__(self, civarchive_client): + self.client = civarchive_client + async def get_model_by_hash(self, model_hash: str) -> Tuple[Optional[Dict], Optional[str]]: - """Not supported by CivArchive provider""" - return None, "CivArchive provider does not support hash lookup" + return await self.client.get_model_by_hash(model_hash) async def get_model_versions(self, model_id: str) -> Optional[Dict]: - """Not supported by CivArchive provider""" - return None + return await self.client.get_model_versions(model_id) async def get_model_version(self, model_id: int = None, version_id: int = None) -> Optional[Dict]: - """Get specific model version by parsing CivArchive HTML page""" - if model_id is None or version_id is None: - return None - - try: - # Construct CivArchive URL - url = f"https://civarchive.com/models/{model_id}?modelVersionId={version_id}" - - downloader = await get_downloader() - session = await downloader.session - async with session.get(url) as response: - if response.status != 200: - return None - - html_content = await response.text() - - # Parse HTML to extract JSON data - soup_parser = _require_beautifulsoup() - soup = soup_parser(html_content, 'html.parser') - script_tag = soup.find('script', {'id': '__NEXT_DATA__', 'type': 'application/json'}) - - if not script_tag: - return None - - # Parse JSON content - json_data = json.loads(script_tag.string) - model_data = json_data.get('props', {}).get('pageProps', {}).get('model') - - if not model_data or 'version' not in model_data: - return None - - # Extract version data as base - version = model_data['version'].copy() - - # Restructure stats - if 'downloadCount' in version and 'ratingCount' in version and 'rating' in version: - version['stats'] = { - 'downloadCount': version.pop('downloadCount'), - 'ratingCount': version.pop('ratingCount'), - 'rating': version.pop('rating') - } - - # Rename trigger to 
trainedWords - if 'trigger' in version: - version['trainedWords'] = version.pop('trigger') - - # Transform files data to expected format - if 'files' in version: - transformed_files = [] - for file_data in version['files']: - # Find first available mirror (deletedAt is null) - available_mirror = None - for mirror in file_data.get('mirrors', []): - if mirror.get('deletedAt') is None: - available_mirror = mirror - break - - # Create transformed file entry - transformed_file = { - 'id': file_data.get('id'), - 'sizeKB': file_data.get('sizeKB'), - 'name': available_mirror.get('filename', file_data.get('name')) if available_mirror else file_data.get('name'), - 'type': file_data.get('type'), - 'downloadUrl': available_mirror.get('url') if available_mirror else None, - 'primary': True, - 'mirrors': file_data.get('mirrors', []) - } - - # Transform hash format - if 'sha256' in file_data: - transformed_file['hashes'] = { - 'SHA256': file_data['sha256'].upper() - } - - transformed_files.append(transformed_file) - - version['files'] = transformed_files - - # Add model information - version['model'] = { - 'name': model_data.get('name'), - 'type': model_data.get('type'), - 'nsfw': model_data.get('is_nsfw', False), - 'description': model_data.get('description'), - 'tags': model_data.get('tags', []) - } - - version['creator'] = { - 'username': model_data.get('username'), - 'image': '' - } - - # Add source identifier - version['source'] = 'civarchive' - version['is_deleted'] = json_data.get('query', {}).get('is_deleted', False) - - return version - - except Exception as e: - logger.error(f"Error fetching CivArchive model version {model_id}/{version_id}: {e}") - return None + return await self.client.get_model_version(model_id, version_id) async def get_model_version_info(self, version_id: str) -> Tuple[Optional[Dict], Optional[str]]: - """Not supported by CivArchive provider - requires both model_id and version_id""" - return None, "CivArchive provider requires both model_id and 
version_id" + return await self.client.get_model_version_info(version_id) async def get_user_models(self, username: str) -> Optional[List[Dict]]: """Not supported by CivArchive provider""" diff --git a/py/services/service_registry.py b/py/services/service_registry.py index 2cb102ae..d3d65e65 100644 --- a/py/services/service_registry.py +++ b/py/services/service_registry.py @@ -144,6 +144,27 @@ class ServiceRegistry: cls._services[service_name] = client logger.debug(f"Created and registered {service_name}") return client + + @classmethod + async def get_civarchive_client(cls): + """Get or create CivArchive client instance""" + service_name = "civarchive_client" + + if service_name in cls._services: + return cls._services[service_name] + + async with cls._get_lock(service_name): + # Double-check after acquiring lock + if service_name in cls._services: + return cls._services[service_name] + + # Import here to avoid circular imports + from .civarchive_client import CivArchiveClient + + client = await CivArchiveClient.get_instance() + cls._services[service_name] = client + logger.debug(f"Created and registered {service_name}") + return client @classmethod async def get_download_manager(cls): diff --git a/py/utils/models.py b/py/utils/models.py index 159146d5..4caffa3e 100644 --- a/py/utils/models.py +++ b/py/utils/models.py @@ -25,6 +25,7 @@ class BaseModelMetadata: favorite: bool = False # Whether the model is a favorite exclude: bool = False # Whether to exclude this model from the cache db_checked: bool = False # Whether checked in archive DB + metadata_source: Optional[str] = None # Last provider that supplied metadata last_checked_at: float = 0 # Last checked timestamp _unknown_fields: Dict[str, Any] = field(default_factory=dict, repr=False, compare=False) # Store unknown fields diff --git a/refs/civarc_api_model_data.json b/refs/civarc_api_model_data.json new file mode 100644 index 00000000..9acbe6e2 --- /dev/null +++ b/refs/civarc_api_model_data.json @@ -0,0 
+1,134 @@ +{ + "id": 1746460, + "name": "Mixplin Style [Illustrious]", + "type": "LORA", + "description": "description", + "username": "Ty_Lee", + "downloadCount": 4207, + "favoriteCount": 0, + "commentCount": 8, + "ratingCount": 0, + "rating": 0, + "is_nsfw": true, + "nsfw_level": 31, + "createdAt": "2025-07-06T01:51:42.859Z", + "updatedAt": "2025-10-10T23:15:26.714Z", + "deletedAt": null, + "tags": [ + "art", + "style", + "artist style", + "styles", + "mixplin", + "artiststyle" + ], + "creator_id": "Ty_Lee", + "creator_username": "Ty_Lee", + "creator_name": "Ty_Lee", + "creator_url": "/users/Ty_Lee", + "versions": [ + { + "id": 2042594, + "name": "v2.0", + "href": "/models/1746460?modelVersionId=2042594" + }, + { + "id": 1976567, + "name": "v1.0", + "href": "/models/1746460?modelVersionId=1976567" + } + ], + "version": { + "id": 1976567, + "modelId": 1746460, + "name": "v1.0", + "baseModel": "Illustrious", + "baseModelType": "Standard", + "description": null, + "downloadCount": 437, + "ratingCount": 0, + "rating": 0, + "is_nsfw": true, + "nsfw_level": 31, + "createdAt": "2025-07-05T10:17:28.716Z", + "updatedAt": "2025-10-10T23:15:26.756Z", + "deletedAt": null, + "files": [ + { + "id": 1874043, + "name": "mxpln-illustrious-ty_lee.safetensors", + "type": "Model", + "sizeKB": 223124.37109375, + "downloadUrl": "https://civitai.com/api/download/models/1976567", + "modelId": 1746460, + "modelName": "Mixplin Style [Illustrious]", + "modelVersionId": 1976567, + "is_nsfw": true, + "nsfw_level": 31, + "sha256": "e2b7a280d6539556f23f380b3f71e4e22bc4524445c4c96526e117c6005c6ad3", + "createdAt": "2025-07-05T10:17:28.716Z", + "updatedAt": "2025-10-10T23:15:26.766Z", + "is_primary": false, + "mirrors": [ + { + "filename": "mxpln-illustrious-ty_lee.safetensors", + "url": "https://civitai.com/api/download/models/1976567", + "source": "civitai", + "model_id": 1746460, + "model_version_id": 1976567, + "deletedAt": null, + "is_gated": false, + "is_paid": false + } + ] + } + ], + 
"images": [ + { + "id": 86403595, + "url": "https://img.genur.art/sig/width:450/quality:85/aHR0cHM6Ly9jLmdlbnVyLmFydC9hNmE3Njc2YS0wMWQ3LTQ1YzAtOWEzYS1mNWJiYTU4MDNiMDE=", + "nsfwLevel": 1, + "width": 1560, + "height": 2280, + "hash": "U7G8Zp0w02%IA6%N00-;D]-W~VNG0nMw-.IV", + "type": "image", + "minor": false, + "poi": false, + "hasMeta": true, + "hasPositivePrompt": true, + "onSite": false, + "remixOfId": null, + "image_url": "https://img.genur.art/sig/width:450/quality:85/aHR0cHM6Ly9jLmdlbnVyLmFydC9hNmE3Njc2YS0wMWQ3LTQ1YzAtOWEzYS1mNWJiYTU4MDNiMDE=", + "link": "https://genur.art/posts/86403595" + } + ], + "trigger": [ + "mxpln" + ], + "allow_download": true, + "download_url": "/api/download/models/1976567", + "platform_url": "https://civitai.com/models/1746460?modelVersionId=1976567", + "civitai_model_id": 1746460, + "civitai_model_version_id": 1976567, + "href": "/models/1746460?modelVersionId=1976567", + "mirrors": [ + { + "platform": "tensorart", + "href": "/tensorart/models/904473536033245448/versions/904473536033245448", + "platform_url": "https://tensor.art/models/904473536033245448", + "name": "Mixplin Style MXP", + "version_name": "Mixplin", + "id": "904473536033245448", + "version_id": "904473536033245448" + } + ] + }, + "platform": "civitai", + "platform_name": "CivitAI", + "meta": { + "title": "Mixplin Style [Illustrious] - v1.0 - CivitAI Archive", + "description": "Mixplin Style [Illustrious] v1.0 is a Illustrious LORA AI model created by Ty_Lee for generating images of art, style, artist style, styles, mixplin, artiststyle", + "image": "https://img.genur.art/sig/width:450/quality:85/aHR0cHM6Ly9jLmdlbnVyLmFydC9hNmE3Njc2YS0wMWQ3LTQ1YzAtOWEzYS1mNWJiYTU4MDNiMDE=", + "canonical": "https://civarchive.com/models/1746460?modelVersionId=1976567" + } +} \ No newline at end of file diff --git a/refs/target_version.json b/refs/target_version.json new file mode 100644 index 00000000..6db1f1b1 --- /dev/null +++ b/refs/target_version.json @@ -0,0 +1,38 @@ +{ + "id": 
2269146, + "modelId": 2004760, + "name": "v1.0 Illustrious", + "nsfwLevel": 1, + "trainedWords": ["PencilSketchDaal"], + "baseModel": "Illustrious", + "description": "

Illustrious. Your pencil may vary with your checkpoint.

", + "model": { + "name": "Pencil Sketch Anime", + "type": "LORA", + "nsfw": false, + "description": "description", + "tags": ["style"] + }, + "files": [ + { + "id": 2161260, + "sizeKB": 223106.37890625, + "name": "Pencil-Sketch-Illustrious.safetensors", + "type": "Model", + "hashes": { + "SHA256": "2C70479CD673B0FE056EAF4FD97C7F33A39F14853805431AC9AB84226ECE3B82" + }, + "primary": true, + "downloadUrl": "https://civitai.com/api/download/models/2269146", + "mirrors": {} + } + ], + "images": [ + {}, + {} + ], + "creator": { + "username": "Daalis", + "image": "https://image.civitai.com/xG1nkqKTMzGDvpLrqFT7WA/eb245b49-edc8-4ed6-ad7b-6d61eb8c51de/width=96/Daalis.jpeg" + } +} diff --git a/tests/services/test_civarchive_client.py b/tests/services/test_civarchive_client.py new file mode 100644 index 00000000..6c62f878 --- /dev/null +++ b/tests/services/test_civarchive_client.py @@ -0,0 +1,239 @@ +import copy +from unittest.mock import AsyncMock + +import pytest + +from py.services import civarchive_client as civarchive_client_module +from py.services.civarchive_client import CivArchiveClient +from py.services.model_metadata_provider import ModelMetadataProviderManager + + +class DummyDownloader: + def __init__(self): + self.calls = [] + + async def make_request(self, method, url, use_auth=False, **kwargs): + self.calls.append({"method": method, "url": url, "params": kwargs.get("params")}) + return True, {} + + +@pytest.fixture(autouse=True) +def reset_singletons(): + CivArchiveClient._instance = None + ModelMetadataProviderManager._instance = None + yield + CivArchiveClient._instance = None + ModelMetadataProviderManager._instance = None + + +@pytest.fixture +def downloader(monkeypatch): + instance = DummyDownloader() + monkeypatch.setattr(civarchive_client_module, "get_downloader", AsyncMock(return_value=instance)) + return instance + + +def _base_civarchive_payload(version_id=1976567, *, trigger="mxpln", nsfw_level=31): + version_name = "v2.0" if version_id != 1976567 
else "v1.0" + file_sha = "e2b7a280d6539556f23f380b3f71e4e22bc4524445c4c96526e117c6005c6ad3" + return { + "data": { + "id": 1746460, + "name": "Mixplin Style [Illustrious]", + "type": "LORA", + "description": "description", + "is_nsfw": True, + "nsfw_level": nsfw_level, + "tags": ["art", "style"], + "creator_username": "Ty_Lee", + "creator_name": "Ty_Lee", + "creator_url": "/users/Ty_Lee", + "version": { + "id": version_id, + "modelId": 1746460, + "name": version_name, + "baseModel": "Illustrious", + "description": "version description", + "downloadCount": 437, + "ratingCount": 0, + "rating": 0, + "nsfw_level": nsfw_level, + "trigger": [trigger], + "files": [ + { + "id": 1874043, + "name": "mxpln-illustrious-ty_lee.safetensors", + "type": "Model", + "sizeKB": 223124.37109375, + "downloadUrl": "https://civitai.com/api/download/models/1976567", + "sha256": file_sha, + "is_primary": False, + "mirrors": [ + { + "filename": "mxpln-illustrious-ty_lee.safetensors", + "url": "https://civitai.com/api/download/models/1976567", + "deletedAt": None, + } + ], + } + ], + "images": [ + { + "id": 86403595, + "url": "https://img.genur.art/example.png", + "nsfwLevel": 1, + } + ], + }, + "versions": [ + {"id": 2042594, "name": "v2.0"}, + {"id": 1976567, "name": "v1.0"}, + ], + } + } + + +async def test_get_model_by_hash_transforms_payload(downloader): + payload = _base_civarchive_payload() + + async def fake_make_request(method, url, use_auth=False, **kwargs): + downloader.calls.append({"url": url, "params": kwargs.get("params")}) + if url.endswith("/sha256/abc"): + return True, copy.deepcopy(payload) + return False, "unexpected" + + downloader.make_request = fake_make_request + + client = await CivArchiveClient.get_instance() + + result, error = await client.get_model_by_hash("abc") + + assert error is None + assert result["id"] == 1976567 + assert result["nsfwLevel"] == 31 + assert result["trainedWords"] == ["mxpln"] + assert result["stats"] == {"downloadCount": 437, "ratingCount": 
0, "rating": 0} + assert result["model"]["name"] == "Mixplin Style [Illustrious]" + assert result["model"]["nsfw"] is True + assert result["creator"]["username"] == "Ty_Lee" + assert result["creator"]["image"] == "" + file_meta = result["files"][0] + assert file_meta["hashes"]["SHA256"] == "E2B7A280D6539556F23F380B3F71E4E22BC4524445C4C96526E117C6005C6AD3" + assert file_meta["mirrors"][0]["url"] == "https://civitai.com/api/download/models/1976567" + assert file_meta["primary"] is True + assert result["source"] == "civarchive" + assert result["images"][0]["url"] == "https://img.genur.art/example.png" + + +async def test_get_model_versions_fetches_each_version(downloader): + base_url = "https://civarchive.com/api/models/1746460" + base_payload = _base_civarchive_payload(version_id=2042594, trigger="mxpln-new", nsfw_level=5) + other_payload = _base_civarchive_payload() + + responses = { + (base_url, None): base_payload, + (base_url, (("modelVersionId", "2042594"),)): base_payload, + (base_url, (("modelVersionId", "1976567"),)): other_payload, + } + + async def fake_make_request(method, url, use_auth=False, **kwargs): + params = kwargs.get("params") + key = (url, tuple(sorted((params or {}).items())) if params else None) + downloader.calls.append({"url": url, "params": params}) + if key in responses: + return True, copy.deepcopy(responses[key]) + return False, "unexpected" + + downloader.make_request = fake_make_request + + client = await CivArchiveClient.get_instance() + + result = await client.get_model_versions("1746460") + + assert result["name"] == "Mixplin Style [Illustrious]" + assert result["type"] == "LORA" + versions = result["modelVersions"] + assert [version["id"] for version in versions] == [2042594, 1976567] + assert versions[0]["trainedWords"] == ["mxpln-new"] + assert versions[1]["trainedWords"] == ["mxpln"] + assert versions[0]["nsfwLevel"] == 5 + assert versions[1]["nsfwLevel"] == 31 + assert any(call["params"] == {"modelVersionId": "2042594"} for call 
in downloader.calls) + assert any(call["params"] == {"modelVersionId": "1976567"} for call in downloader.calls) + + +async def test_get_model_version_redirects_to_actual_model_id(downloader): + first_payload = _base_civarchive_payload() + first_payload["data"]["version"]["modelId"] = 222 + + base_url_request = "https://civarchive.com/api/models/111" + redirected_url_request = "https://civarchive.com/api/models/222" + + async def fake_make_request(method, url, use_auth=False, **kwargs): + downloader.calls.append({"url": url, "params": kwargs.get("params")}) + params = kwargs.get("params") or {} + if url == base_url_request: + return True, copy.deepcopy(first_payload) + if url == redirected_url_request and params.get("modelVersionId") == "1976567": + return True, copy.deepcopy(_base_civarchive_payload()) + return False, "unexpected" + + downloader.make_request = fake_make_request + + client = await CivArchiveClient.get_instance() + + result = await client.get_model_version(model_id=111, version_id=1976567) + + assert result is not None + assert result["model"]["name"] == "Mixplin Style [Illustrious]" + assert len(downloader.calls) == 2 + assert downloader.calls[1]["url"] == redirected_url_request + + +async def test_get_model_by_hash_uses_file_fallback(downloader, monkeypatch): + file_only_payload = { + "data": { + "files": [ + { + "model_id": 1746460, + "model_version_id": 1976567, + "source": "civitai", + } + ] + } + } + + version_payload = _base_civarchive_payload() + + async def fake_make_request(method, url, use_auth=False, **kwargs): + downloader.calls.append({"url": url, "params": kwargs.get("params")}) + if "/sha256/" in url: + return True, copy.deepcopy(file_only_payload) + if "/models/1746460" in url: + return True, copy.deepcopy(version_payload) + return False, "unexpected" + + downloader.make_request = fake_make_request + + client = await CivArchiveClient.get_instance() + + result, error = await client.get_model_by_hash("fallback") + + assert error is 
None + assert result["id"] == 1976567 + assert result["model"]["name"] == "Mixplin Style [Illustrious]" + assert any("/models/1746460" in call["url"] for call in downloader.calls) + + +async def test_get_model_by_hash_handles_not_found(downloader): + async def fake_make_request(method, url, use_auth=False, **kwargs): + return False, "Resource not found" + + downloader.make_request = fake_make_request + + client = await CivArchiveClient.get_instance() + + result, error = await client.get_model_by_hash("missing") + + assert result is None + assert error == "Model not found" diff --git a/tests/services/test_download_manager.py b/tests/services/test_download_manager.py index 48b425af..fad366f0 100644 --- a/tests/services/test_download_manager.py +++ b/tests/services/test_download_manager.py @@ -108,6 +108,7 @@ def metadata_provider(monkeypatch): "creator": {"username": "Author"}, "files": [ { + "type": "Model", "primary": True, "downloadUrl": "https://example.invalid/file.safetensors", "name": "file.safetensors", @@ -206,6 +207,7 @@ async def test_download_uses_active_mirrors(monkeypatch, scanners, metadata_prov "creator": {"username": "Author"}, "files": [ { + "type": "Model", "primary": True, "downloadUrl": "https://example.invalid/file.safetensors", "mirrors": [ diff --git a/tests/services/test_metadata_sync_service.py b/tests/services/test_metadata_sync_service.py index 470259f6..cd3ade58 100644 --- a/tests/services/test_metadata_sync_service.py +++ b/tests/services/test_metadata_sync_service.py @@ -32,6 +32,8 @@ def build_service( get_model_by_hash=AsyncMock(), get_model_version=AsyncMock(), ) + if default_provider is None: + provider.get_model_by_hash.return_value = (None, None) default_provider_factory = AsyncMock(return_value=provider) provider_selector = provider_selector or AsyncMock(return_value=provider) @@ -138,6 +140,7 @@ async def test_fetch_and_update_model_success_updates_cache(tmp_path): assert model_data["from_civitai"] is True assert 
model_data["civitai_deleted"] is False assert "civitai" in model_data + assert model_data["metadata_source"] == "civitai_api" helpers.metadata_manager.hydrate_model_data.assert_not_awaited() assert model_data["hydrated"] is True @@ -219,6 +222,124 @@ async def test_fetch_and_update_model_respects_deleted_without_archive(): update_cache.assert_not_awaited() +@pytest.mark.asyncio +async def test_fetch_and_update_model_prefers_civarchive_for_deleted_models(tmp_path): + default_provider = SimpleNamespace( + get_model_by_hash=AsyncMock(), + get_model_version=AsyncMock(), + ) + civarchive_provider = SimpleNamespace( + get_model_by_hash=AsyncMock( + return_value=( + { + "source": "civarchive", + "model": {"name": "Recovered", "description": "", "tags": []}, + "images": [], + "baseModel": "sdxl", + }, + None, + ) + ), + get_model_version=AsyncMock(), + ) + + async def select_provider(name: str): + return civarchive_provider if name == "civarchive_api" else default_provider + + provider_selector = AsyncMock(side_effect=select_provider) + helpers = build_service( + settings_values={"enable_metadata_archive_db": False}, + default_provider=default_provider, + provider_selector=provider_selector, + ) + + model_path = tmp_path / "model.safetensors" + model_data = { + "civitai_deleted": True, + "metadata_source": "civarchive", + "civitai": {"source": "civarchive"}, + "file_path": str(model_path), + } + update_cache = AsyncMock() + + ok, error = await helpers.service.fetch_and_update_model( + sha256="deadbeef", + file_path=str(model_path), + model_data=model_data, + update_cache_func=update_cache, + ) + + assert ok + assert error is None + provider_selector.assert_awaited_with("civarchive_api") + helpers.default_provider_factory.assert_not_awaited() + civarchive_provider.get_model_by_hash.assert_awaited_once_with("deadbeef") + update_cache.assert_awaited() + assert model_data["metadata_source"] == "civarchive" + helpers.metadata_manager.save_metadata.assert_awaited() + + 
+@pytest.mark.asyncio +async def test_fetch_and_update_model_falls_back_to_sqlite_after_civarchive_failure(tmp_path): + default_provider = SimpleNamespace( + get_model_by_hash=AsyncMock(), + get_model_version=AsyncMock(), + ) + civarchive_provider = SimpleNamespace( + get_model_by_hash=AsyncMock(return_value=(None, "Model not found")), + get_model_version=AsyncMock(), + ) + sqlite_payload = { + "source": "archive_db", + "model": {"name": "Recovered", "description": "", "tags": []}, + "images": [], + "baseModel": "sdxl", + } + sqlite_provider = SimpleNamespace( + get_model_by_hash=AsyncMock(return_value=(sqlite_payload, None)), + get_model_version=AsyncMock(), + ) + + async def select_provider(name: str): + if name == "civarchive_api": + return civarchive_provider + if name == "sqlite": + return sqlite_provider + return default_provider + + provider_selector = AsyncMock(side_effect=select_provider) + helpers = build_service( + settings_values={"enable_metadata_archive_db": True}, + default_provider=default_provider, + provider_selector=provider_selector, + ) + + model_path = tmp_path / "model.safetensors" + model_data = { + "civitai_deleted": True, + "db_checked": False, + "file_path": str(model_path), + } + update_cache = AsyncMock() + + ok, error = await helpers.service.fetch_and_update_model( + sha256="cafe", + file_path=str(model_path), + model_data=model_data, + update_cache_func=update_cache, + ) + + assert ok and error is None + assert civarchive_provider.get_model_by_hash.await_count == 1 + assert sqlite_provider.get_model_by_hash.await_count == 1 + assert model_data["metadata_source"] == "archive_db" + assert model_data["db_checked"] is True + assert provider_selector.await_args_list[0].args == ("civarchive_api",) + assert provider_selector.await_args_list[1].args == ("sqlite",) + update_cache.assert_awaited() + helpers.metadata_manager.save_metadata.assert_awaited() + + @pytest.mark.asyncio async def 
test_relink_metadata_fetches_version_and_updates_sha(tmp_path): provider = SimpleNamespace(