diff --git a/py/recipes/enrichment.py b/py/recipes/enrichment.py index 34acdfcd..908548ab 100644 --- a/py/recipes/enrichment.py +++ b/py/recipes/enrichment.py @@ -1,11 +1,11 @@ import logging import json -import re import os from typing import Any, Dict, Optional from .merger import GenParamsMerger from .base import RecipeMetadataParser from ..services.metadata_service import get_default_metadata_provider +from ..utils.civitai_utils import extract_civitai_image_id logger = logging.getLogger(__name__) @@ -39,11 +39,12 @@ class RecipeEnricher: source_url = recipe.get("source_url") or recipe.get("source_path", "") # Check if it's a Civitai image URL - image_id_match = re.search(r'civitai\.com/images/(\d+)', str(source_url)) - if image_id_match: - image_id = image_id_match.group(1) + image_id = extract_civitai_image_id(str(source_url)) + if image_id: try: - image_info = await civitai_client.get_image_info(image_id) + image_info = await civitai_client.get_image_info( + image_id, source_url=str(source_url) + ) if image_info: # Handle nested meta often found in Civitai API responses raw_meta = image_info.get("meta") diff --git a/py/routes/handlers/recipe_handlers.py b/py/routes/handlers/recipe_handlers.py index 28025fa6..5b477115 100644 --- a/py/routes/handlers/recipe_handlers.py +++ b/py/routes/handlers/recipe_handlers.py @@ -1202,7 +1202,9 @@ class RecipeManagementHandler: raise RecipeDownloadError( "Civitai client unavailable for image download" ) - image_info = await civitai_client.get_image_info(civitai_image_id) + image_info = await civitai_client.get_image_info( + civitai_image_id, source_url=image_url + ) if not image_info: raise RecipeDownloadError( "Failed to fetch image information from Civitai" diff --git a/py/services/civitai_client.py b/py/services/civitai_client.py index 2bdd46c7..c7e5d6c0 100644 --- a/py/services/civitai_client.py +++ b/py/services/civitai_client.py @@ -9,7 +9,7 @@ from .model_metadata_provider import ( ) from .downloader import get_downloader from .errors import RateLimitError, ResourceNotFoundError -from ..utils.civitai_utils import resolve_license_payload +from ..utils.civitai_utils import extract_civitai_page_host, resolve_license_payload logger = logging.getLogger(__name__) @@ -40,6 +40,23 @@ class CivitaiClient: self._initialized = True self.base_url = "https://civitai.com/api/v1" + self._image_info_api_hosts = ("civitai.com", "civitai.red") + + def _build_image_info_url(self, host: str, image_id: str) -> str: + return f"https://{host}/api/v1/images?imageId={image_id}&nsfw=X" + + def _resolve_image_info_hosts(self, source_url: str | None) -> List[str]: + preferred_host = extract_civitai_page_host(source_url) + if preferred_host in self._image_info_api_hosts: + return [ + preferred_host, + *[ + host + for host in self._image_info_api_hosts + if host != preferred_host + ], + ] + return list(self._image_info_api_hosts) async def _make_request( self, @@ -479,48 +496,106 @@ class CivitaiClient: logger.error(error_msg) return None, error_msg - async def get_image_info(self, image_id: str) -> Optional[Dict]: + async def get_image_info( + self, image_id: str, source_url: str | None = None + ) -> Optional[Dict]: """Fetch image information from Civitai API Args: image_id: The Civitai image ID + source_url: Optional original image page URL used to prioritize + ``civitai.com`` vs ``civitai.red`` image lookups. Returns: Optional[Dict]: The image data or None if not found """ try: - url = f"{self.base_url}/images?imageId={image_id}&nsfw=X" requested_id = int(image_id) + candidate_hosts = self._resolve_image_info_hosts(source_url) + last_error: Any = None + logger.debug( + "Fetching image info for ID %s with host order %s", + image_id, + candidate_hosts, + ) - logger.debug(f"Fetching image info for ID: {image_id}") - success, result = await self._make_request("GET", url, use_auth=True) + for index, host in enumerate(candidate_hosts): + url = self._build_image_info_url(host, image_id) + success, result = await self._make_request("GET", url, use_auth=True) - if success: - if result and "items" in result and isinstance(result["items"], list): - items = result["items"] + if not success: + last_error = result + if index < len(candidate_hosts) - 1: + logger.warning( + "Failed to fetch image info for ID %s from %s: %s. Trying fallback host.", + image_id, + host, + result, + ) + continue - # First, try to find the item with matching ID - for item in items: - if isinstance(item, dict) and item.get("id") == requested_id: - logger.debug(f"Successfully fetched image info for ID: {image_id}") - return item - - # No matching ID found - log warning with details about returned items - returned_ids = [ - item.get("id") for item in items - if isinstance(item, dict) and "id" in item - ] - logger.warning( - f"CivitAI API returned no matching image for requested ID {image_id}. " - f"Returned {len(items)} item(s) with IDs: {returned_ids}. " - f"This may indicate the image was deleted, hidden, or there is a database lag." + logger.error( + "Failed to fetch image info for ID %s from %s: %s", + image_id, + host, + result, ) return None - logger.warning(f"No image found with ID: {image_id}") + if result and "items" in result and isinstance(result["items"], list): + items = result["items"] + + for item in items: + if isinstance(item, dict) and item.get("id") == requested_id: + logger.debug( + "Successfully fetched image info for ID %s from %s", + image_id, + host, + ) + return item + + returned_ids = [ + item.get("id") + for item in items + if isinstance(item, dict) and "id" in item + ] + + if index < len(candidate_hosts) - 1: + logger.info( + "No matching image for requested ID %s from %s; trying fallback host. Returned %d item(s) with IDs: %s", + image_id, + host, + len(items), + returned_ids, + ) + continue + + logger.warning( + "CivitAI API returned no matching image for requested ID %s from %s. Returned %d item(s) with IDs: %s. This may indicate the image was deleted, hidden, or there is a database lag.", + image_id, + host, + len(items), + returned_ids, + ) + return None + + if index < len(candidate_hosts) - 1: + logger.info( + "No image found with ID %s from %s; trying fallback host", + image_id, + host, + ) + continue + + logger.warning("No image found with ID: %s", image_id) return None - logger.error(f"Failed to fetch image info for ID: {image_id}: {result}") + if last_error is not None: + logger.error( + "Failed to fetch image info for ID %s from all candidate hosts: %s", + image_id, + last_error, + ) return None except RateLimitError: raise diff --git a/py/services/recipes/analysis_service.py b/py/services/recipes/analysis_service.py index c2f4577a..97f1c6c2 100644 --- a/py/services/recipes/analysis_service.py +++ b/py/services/recipes/analysis_service.py @@ -5,7 +5,6 @@ from __future__ import annotations import base64 import io import os -import re import tempfile from dataclasses import dataclass from typing import Any, Callable, Optional @@ -14,7 +13,7 @@ import numpy as np from PIL import Image from ...utils.utils import calculate_recipe_fingerprint -from ...utils.civitai_utils import rewrite_preview_url +from ...utils.civitai_utils import extract_civitai_image_id, rewrite_preview_url from .errors import ( RecipeDownloadError, RecipeNotFoundError, @@ -104,9 +103,11 @@ class RecipeAnalysisService: extension = ".jpg" # Default try: - civitai_match = re.match(r"https://civitai\.com/images/(\d+)", url) - if civitai_match: - image_info = await civitai_client.get_image_info(civitai_match.group(1)) + civitai_image_id = extract_civitai_image_id(url) + if civitai_image_id: + image_info = await civitai_client.get_image_info( + civitai_image_id, source_url=url + ) if not image_info: raise RecipeDownloadError( "Failed to fetch image information from Civitai" diff --git a/py/utils/civitai_utils.py b/py/utils/civitai_utils.py index 0a24f84d..3c7326d8 100644 --- a/py/utils/civitai_utils.py +++ b/py/utils/civitai_utils.py @@ -80,6 +80,16 @@ def extract_civitai_image_id(url: str | None) -> str | None: return path_match.group(1) +def extract_civitai_page_host(url: str | None) -> str | None: + """Extract the supported Civitai page host from a URL.""" + + parsed = _parse_supported_civitai_page_url(url) + if parsed is None: + return None + + return parsed.hostname.lower() if parsed.hostname else None + + def _normalize_commercial_values(value: Any) -> Sequence[str]: """Return a normalized list of commercial permissions preserving source values.""" @@ -263,6 +273,7 @@ def rewrite_preview_url( __all__ = [ "build_license_flags", "extract_civitai_image_id", + "extract_civitai_page_host", "extract_civitai_model_url_parts", "is_supported_civitai_page_host", "resolve_license_payload", diff --git a/tests/routes/test_recipe_routes.py b/tests/routes/test_recipe_routes.py index 94cd0384..19920078 100644 --- a/tests/routes/test_recipe_routes.py +++ b/tests/routes/test_recipe_routes.py @@ -274,7 +274,9 @@ class StubCivitaiClient: def __init__(self) -> None: self.image_info: Dict[str, Any] = {} - async def get_image_info(self, image_id: str) -> Optional[Dict[str, Any]]: + async def get_image_info( + self, image_id: str, source_url: str | None = None + ) -> Optional[Dict[str, Any]]: return self.image_info.get(image_id) @@ -668,6 +670,25 @@ async def test_import_remote_recipe_supports_civitai_red(monkeypatch, tmp_path: assert "width=450,optimized=true" in harness.downloader.urls[0] +async def test_analyze_remote_image_supports_civitai_red( + monkeypatch, tmp_path: Path +) -> None: + async with recipe_harness(monkeypatch, tmp_path) as harness: + harness.analysis.result = SimpleNamespace(payload={"loras": []}, status=200) + + response = await harness.client.post( + "/api/lm/recipes/analyze-image", + json={"url": "https://civitai.red/images/126920345"}, + ) + payload = await response.json() + + assert response.status == 200 + assert payload == {"loras": []} + assert harness.analysis.remote_calls == [ + "https://civitai.red/images/126920345" + ] + + async def test_analyze_uploaded_image_error_path(monkeypatch, tmp_path: Path) -> None: async with recipe_harness(monkeypatch, tmp_path) as harness: harness.analysis.raise_for_uploaded = RecipeValidationError( diff --git a/tests/services/test_civitai_client.py b/tests/services/test_civitai_client.py index f9b2ee7e..c42df364 100644 --- a/tests/services/test_civitai_client.py +++ b/tests/services/test_civitai_client.py @@ -530,6 +530,97 @@ async def test_get_image_info_handles_missing(monkeypatch, downloader): assert result is None +async def test_get_image_info_prefers_red_host_for_red_source(monkeypatch, downloader): + requested_urls = [] + + async def fake_make_request(method, url, use_auth=True, **kwargs): + requested_urls.append(url) + return True, {"items": [{"id": 124950237, "name": "target"}]} + + downloader.make_request = fake_make_request + + client = await CivitaiClient.get_instance() + + result = await client.get_image_info( + "124950237", source_url="https://civitai.red/images/124950237" + ) + + assert result == {"id": 124950237, "name": "target"} + assert requested_urls == [ + "https://civitai.red/api/v1/images?imageId=124950237&nsfw=X" + ] + + +async def test_get_image_info_falls_back_from_com_to_red(monkeypatch, downloader): + requested_urls = [] + + async def fake_make_request(method, url, use_auth=True, **kwargs): + requested_urls.append(url) + if url.startswith("https://civitai.com/"): + return True, {"items": []} + return True, {"items": [{"id": 124950237, "name": "fallback"}]} + + downloader.make_request = fake_make_request + + client = await CivitaiClient.get_instance() + + result = await client.get_image_info("124950237") + + assert result == {"id": 124950237, "name": "fallback"} + assert requested_urls == [ + "https://civitai.com/api/v1/images?imageId=124950237&nsfw=X", + "https://civitai.red/api/v1/images?imageId=124950237&nsfw=X", + ] + + +async def test_get_image_info_falls_back_from_red_to_com(monkeypatch, downloader): + requested_urls = [] + + async def fake_make_request(method, url, use_auth=True, **kwargs): + requested_urls.append(url) + if url.startswith("https://civitai.red/"): + return True, {"items": []} + return True, {"items": [{"id": 124950237, "name": "fallback"}]} + + downloader.make_request = fake_make_request + + client = await CivitaiClient.get_instance() + + result = await client.get_image_info( + "124950237", source_url="https://civitai.red/images/124950237" + ) + + assert result == {"id": 124950237, "name": "fallback"} + assert requested_urls == [ + "https://civitai.red/api/v1/images?imageId=124950237&nsfw=X", + "https://civitai.com/api/v1/images?imageId=124950237&nsfw=X", + ] + + +async def test_get_image_info_falls_back_after_request_failure(monkeypatch, downloader): + requested_urls = [] + + async def fake_make_request(method, url, use_auth=True, **kwargs): + requested_urls.append(url) + if url.startswith("https://civitai.red/"): + return False, "403 forbidden" + return True, {"items": [{"id": 124950237, "name": "fallback"}]} + + downloader.make_request = fake_make_request + + client = await CivitaiClient.get_instance() + + result = await client.get_image_info( + "124950237", source_url="https://civitai.red/images/124950237" + ) + + assert result == {"id": 124950237, "name": "fallback"} + assert requested_urls == [ + "https://civitai.red/api/v1/images?imageId=124950237&nsfw=X", + "https://civitai.com/api/v1/images?imageId=124950237&nsfw=X", + ] + + async def test_get_image_info_handles_invalid_id(monkeypatch, downloader, caplog): """When given a non-numeric image ID, return None and log error.""" client = await CivitaiClient.get_instance() diff --git a/tests/services/test_recipe_repair.py b/tests/services/test_recipe_repair.py index bc1020cd..681573b9 100644 --- a/tests/services/test_recipe_repair.py +++ b/tests/services/test_recipe_repair.py @@ -6,7 +6,7 @@ from types import SimpleNamespace # We define these here to help with spec= if needed class MockCivitaiClient: - async def get_image_info(self, image_id): + async def get_image_info(self, image_id, source_url=None): pass class MockPersistenceService: @@ -119,6 +119,50 @@ async def test_repair_all_recipes_with_enriched_checkpoint_id(setup_scanner): assert "hash" not in checkpoint assert "file_name" not in checkpoint + +@pytest.mark.asyncio +async def test_repair_all_recipes_supports_civitai_red_source_url(setup_scanner): + recipe_scanner, mock_civitai_client, mock_metadata_provider = setup_scanner + + recipe = { + "id": "r1", + "title": "Red Recipe", + "source_url": "https://civitai.red/images/12345", + "checkpoint": None, + "gen_params": {"prompt": ""}, + } + recipe_scanner._cache = SimpleNamespace(raw_data=[recipe]) + + mock_civitai_client.get_image_info.return_value = { + "modelVersionId": 5678, + "meta": {"prompt": "from red"}, + } + mock_metadata_provider.get_model_version_info.return_value = ( + { + "id": 5678, + "modelId": 1234, + "name": "v1.0", + "model": {"name": "Full Model Name"}, + "baseModel": "SDXL 1.0", + "images": [{"url": "https://image.url/thumb.jpg"}], + "files": [ + { + "type": "Model", + "hashes": {"SHA256": "ABCDEF"}, + "name": "full_filename.safetensors", + } + ], + }, + None, + ) + + results = await recipe_scanner.repair_all_recipes() + + assert results["repaired"] == 1 + mock_civitai_client.get_image_info.assert_called_with( + "12345", source_url="https://civitai.red/images/12345" + ) + @pytest.mark.asyncio async def test_repair_all_recipes_with_enriched_checkpoint_hash(setup_scanner): recipe_scanner, mock_civitai_client, mock_metadata_provider = setup_scanner diff --git a/tests/services/test_recipe_services.py b/tests/services/test_recipe_services.py index 32e93ee1..255441f7 100644 --- a/tests/services/test_recipe_services.py +++ b/tests/services/test_recipe_services.py @@ -678,7 +678,7 @@ async def test_analyze_remote_video(tmp_path): ) class DummyClient: - async def get_image_info(self, image_id): + async def get_image_info(self, image_id, source_url=None): return { "url": "https://civitai.com/video.mp4", "type": "video", @@ -698,3 +698,60 @@ async def test_analyze_remote_video(tmp_path): assert result.payload["is_video"] is True assert result.payload["extension"] == ".mp4" assert result.payload["image_base64"] is not None + + +@pytest.mark.asyncio +async def test_analyze_remote_image_supports_civitai_red(): + exif_utils = DummyExifUtils() + + class DummyFactory: + def create_parser(self, metadata): + async def parse_metadata(m, recipe_scanner=None, civitai_client=None): + return {"loras": [], "gen_params": {"prompt": "red prompt"}} + + return SimpleNamespace(parse_metadata=parse_metadata) + + async def downloader_factory(): + class Downloader: + async def download_file(self, url, path, use_auth=False): + Path(path).write_bytes(b"fake-image") + return True, "success" + + return Downloader() + + service = RecipeAnalysisService( + exif_utils=exif_utils, + recipe_parser_factory=DummyFactory(), + downloader_factory=downloader_factory, + metadata_collector=None, + metadata_processor_cls=None, + metadata_registry_cls=None, + standalone_mode=False, + logger=logging.getLogger("test"), + ) + + class DummyClient: + def __init__(self): + self.calls = [] + + async def get_image_info(self, image_id, source_url=None): + self.calls.append((image_id, source_url)) + return { + "url": "https://image.civitai.com/x/y/original=true/sample.jpeg", + "type": "image", + "meta": {"prompt": "red prompt"}, + } + + class DummyScanner: + async def find_recipes_by_fingerprint(self, fingerprint): + return [] + + client = DummyClient() + result = await service.analyze_remote_image( + url="https://civitai.red/images/123", + recipe_scanner=DummyScanner(), + civitai_client=client, + ) + + assert client.calls == [("123", "https://civitai.red/images/123")] + assert result.payload["loras"] == []