Compare commits

..

2 Commits

Author SHA1 Message Date
Will Miao
af2146f96c fix(civitai): fallback image info hosts on request failure 2026-04-16 09:29:03 +08:00
Will Miao
bdc8dec860 fix(civitai): support civitai.red URLs (#897) 2026-04-16 08:54:12 +08:00
19 changed files with 635 additions and 78 deletions

View File

@@ -685,9 +685,9 @@
"title": "Import a recipe from image or URL", "title": "Import a recipe from image or URL",
"urlLocalPath": "URL / Local Path", "urlLocalPath": "URL / Local Path",
"uploadImage": "Upload Image", "uploadImage": "Upload Image",
"urlSectionDescription": "Input a Civitai image URL or local file path to import as a recipe.", "urlSectionDescription": "Input a Civitai image URL from civitai.com or civitai.red, or a local file path, to import as a recipe.",
"imageUrlOrPath": "Image URL or File Path:", "imageUrlOrPath": "Image URL or File Path:",
"urlPlaceholder": "https://civitai.com/images/... or C:/path/to/image.png", "urlPlaceholder": "https://civitai.com/images/... or https://civitai.red/images/... or C:/path/to/image.png",
"fetchImage": "Fetch Image", "fetchImage": "Fetch Image",
"uploadSectionDescription": "Upload an image with LoRA metadata to import as a recipe.", "uploadSectionDescription": "Upload an image with LoRA metadata to import as a recipe.",
"selectImage": "Select Image", "selectImage": "Select Image",
@@ -1090,9 +1090,9 @@
}, },
"proceedText": "Only proceed if you're sure this is what you want.", "proceedText": "Only proceed if you're sure this is what you want.",
"urlLabel": "Civitai Model URL:", "urlLabel": "Civitai Model URL:",
"urlPlaceholder": "https://civitai.com/models/649516/model-name?modelVersionId=726676", "urlPlaceholder": "https://civitai.com/models/649516/model-name?modelVersionId=726676 or https://civitai.red/models/649516/model-name?modelVersionId=726676",
"helpText": { "helpText": {
"title": "Paste any Civitai model URL. Supported formats:", "title": "Paste any Civitai model URL from civitai.com or civitai.red. Supported formats:",
"format1": "https://civitai.com/models/649516", "format1": "https://civitai.com/models/649516",
"format2": "https://civitai.com/models/649516?modelVersionId=726676", "format2": "https://civitai.com/models/649516?modelVersionId=726676",
"format3": "https://civitai.com/models/649516/model-name?modelVersionId=726676", "format3": "https://civitai.com/models/649516/model-name?modelVersionId=726676",

View File

@@ -685,9 +685,9 @@
"title": "从图片或 URL 导入配方", "title": "从图片或 URL 导入配方",
"urlLocalPath": "URL / 本地路径", "urlLocalPath": "URL / 本地路径",
"uploadImage": "上传图片", "uploadImage": "上传图片",
"urlSectionDescription": "输入 Civitai 图片 URL 或本地文件路径以导入为配方。", "urlSectionDescription": "输入来自 civitai.com 或 civitai.red 的 Civitai 图片 URL或本地文件路径以导入为配方。",
"imageUrlOrPath": "图片 URL 或文件路径:", "imageUrlOrPath": "图片 URL 或文件路径:",
"urlPlaceholder": "https://civitai.com/images/... 或 C:/path/to/image.png", "urlPlaceholder": "https://civitai.com/images/... 或 https://civitai.red/images/... 或 C:/path/to/image.png",
"fetchImage": "获取图片", "fetchImage": "获取图片",
"uploadSectionDescription": "上传带有 LoRA 元数据的图片以导入为配方。", "uploadSectionDescription": "上传带有 LoRA 元数据的图片以导入为配方。",
"selectImage": "选择图片", "selectImage": "选择图片",
@@ -1090,9 +1090,9 @@
}, },
"proceedText": "仅在你确定需要此操作时继续。", "proceedText": "仅在你确定需要此操作时继续。",
"urlLabel": "Civitai 模型 URL", "urlLabel": "Civitai 模型 URL",
"urlPlaceholder": "https://civitai.com/models/649516/model-name?modelVersionId=726676", "urlPlaceholder": "https://civitai.com/models/649516/model-name?modelVersionId=726676 或 https://civitai.red/models/649516/model-name?modelVersionId=726676",
"helpText": { "helpText": {
"title": "粘贴任意 Civitai 模型 URL。支持格式", "title": "粘贴任意来自 civitai.com 或 civitai.red 的 Civitai 模型 URL。支持格式",
"format1": "https://civitai.com/models/649516", "format1": "https://civitai.com/models/649516",
"format2": "https://civitai.com/models/649516?modelVersionId=726676", "format2": "https://civitai.com/models/649516?modelVersionId=726676",
"format3": "https://civitai.com/models/649516/model-name?modelVersionId=726676", "format3": "https://civitai.com/models/649516/model-name?modelVersionId=726676",

View File

@@ -1,11 +1,11 @@
import logging import logging
import json import json
import re
import os import os
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from .merger import GenParamsMerger from .merger import GenParamsMerger
from .base import RecipeMetadataParser from .base import RecipeMetadataParser
from ..services.metadata_service import get_default_metadata_provider from ..services.metadata_service import get_default_metadata_provider
from ..utils.civitai_utils import extract_civitai_image_id
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -39,11 +39,12 @@ class RecipeEnricher:
source_url = recipe.get("source_url") or recipe.get("source_path", "") source_url = recipe.get("source_url") or recipe.get("source_path", "")
# Check if it's a Civitai image URL # Check if it's a Civitai image URL
image_id_match = re.search(r'civitai\.com/images/(\d+)', str(source_url)) image_id = extract_civitai_image_id(str(source_url))
if image_id_match: if image_id:
image_id = image_id_match.group(1)
try: try:
image_info = await civitai_client.get_image_info(image_id) image_info = await civitai_client.get_image_info(
image_id, source_url=str(source_url)
)
if image_info: if image_info:
# Handle nested meta often found in Civitai API responses # Handle nested meta often found in Civitai API responses
raw_meta = image_info.get("meta") raw_meta = image_info.get("meta")

View File

@@ -26,7 +26,7 @@ from ...services.recipes import (
RecipeValidationError, RecipeValidationError,
) )
from ...services.metadata_service import get_default_metadata_provider from ...services.metadata_service import get_default_metadata_provider
from ...utils.civitai_utils import rewrite_preview_url from ...utils.civitai_utils import extract_civitai_image_id, rewrite_preview_url
from ...utils.exif_utils import ExifUtils from ...utils.exif_utils import ExifUtils
from ...recipes.merger import GenParamsMerger from ...recipes.merger import GenParamsMerger
from ...recipes.enrichment import RecipeEnricher from ...recipes.enrichment import RecipeEnricher
@@ -1196,13 +1196,15 @@ class RecipeManagementHandler:
temp_path = temp_file.name temp_path = temp_file.name
download_url = image_url download_url = image_url
image_info = None image_info = None
civitai_match = re.match(r"https://civitai\.com/images/(\d+)", image_url) civitai_image_id = extract_civitai_image_id(image_url)
if civitai_match: if civitai_image_id:
if civitai_client is None: if civitai_client is None:
raise RecipeDownloadError( raise RecipeDownloadError(
"Civitai client unavailable for image download" "Civitai client unavailable for image download"
) )
image_info = await civitai_client.get_image_info(civitai_match.group(1)) image_info = await civitai_client.get_image_info(
civitai_image_id, source_url=image_url
)
if not image_info: if not image_info:
raise RecipeDownloadError( raise RecipeDownloadError(
"Failed to fetch image information from Civitai" "Failed to fetch image information from Civitai"
@@ -1236,7 +1238,7 @@ class RecipeManagementHandler:
return ( return (
file_obj.read(), file_obj.read(),
extension, extension,
image_info.get("meta") if civitai_match and image_info else None, image_info.get("meta") if civitai_image_id and image_info else None,
) )
except RecipeDownloadError: except RecipeDownloadError:
raise raise

View File

@@ -9,7 +9,7 @@ from .model_metadata_provider import (
) )
from .downloader import get_downloader from .downloader import get_downloader
from .errors import RateLimitError, ResourceNotFoundError from .errors import RateLimitError, ResourceNotFoundError
from ..utils.civitai_utils import resolve_license_payload from ..utils.civitai_utils import extract_civitai_page_host, resolve_license_payload
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -40,6 +40,23 @@ class CivitaiClient:
self._initialized = True self._initialized = True
self.base_url = "https://civitai.com/api/v1" self.base_url = "https://civitai.com/api/v1"
self._image_info_api_hosts = ("civitai.com", "civitai.red")
def _build_image_info_url(self, host: str, image_id: str) -> str:
return f"https://{host}/api/v1/images?imageId={image_id}&nsfw=X"
def _resolve_image_info_hosts(self, source_url: str | None) -> List[str]:
preferred_host = extract_civitai_page_host(source_url)
if preferred_host in self._image_info_api_hosts:
return [
preferred_host,
*[
host
for host in self._image_info_api_hosts
if host != preferred_host
],
]
return list(self._image_info_api_hosts)
async def _make_request( async def _make_request(
self, self,
@@ -479,48 +496,106 @@ class CivitaiClient:
logger.error(error_msg) logger.error(error_msg)
return None, error_msg return None, error_msg
async def get_image_info(self, image_id: str) -> Optional[Dict]: async def get_image_info(
self, image_id: str, source_url: str | None = None
) -> Optional[Dict]:
"""Fetch image information from Civitai API """Fetch image information from Civitai API
Args: Args:
image_id: The Civitai image ID image_id: The Civitai image ID
source_url: Optional original image page URL used to prioritize
``civitai.com`` vs ``civitai.red`` image lookups.
Returns: Returns:
Optional[Dict]: The image data or None if not found Optional[Dict]: The image data or None if not found
""" """
try: try:
url = f"{self.base_url}/images?imageId={image_id}&nsfw=X"
requested_id = int(image_id) requested_id = int(image_id)
candidate_hosts = self._resolve_image_info_hosts(source_url)
last_error: Any = None
logger.debug(
"Fetching image info for ID %s with host order %s",
image_id,
candidate_hosts,
)
logger.debug(f"Fetching image info for ID: {image_id}") for index, host in enumerate(candidate_hosts):
success, result = await self._make_request("GET", url, use_auth=True) url = self._build_image_info_url(host, image_id)
success, result = await self._make_request("GET", url, use_auth=True)
if success: if not success:
if result and "items" in result and isinstance(result["items"], list): last_error = result
items = result["items"] if index < len(candidate_hosts) - 1:
logger.warning(
"Failed to fetch image info for ID %s from %s: %s. Trying fallback host.",
image_id,
host,
result,
)
continue
# First, try to find the item with matching ID logger.error(
for item in items: "Failed to fetch image info for ID %s from %s: %s",
if isinstance(item, dict) and item.get("id") == requested_id: image_id,
logger.debug(f"Successfully fetched image info for ID: {image_id}") host,
return item result,
# No matching ID found - log warning with details about returned items
returned_ids = [
item.get("id") for item in items
if isinstance(item, dict) and "id" in item
]
logger.warning(
f"CivitAI API returned no matching image for requested ID {image_id}. "
f"Returned {len(items)} item(s) with IDs: {returned_ids}. "
f"This may indicate the image was deleted, hidden, or there is a database lag."
) )
return None return None
logger.warning(f"No image found with ID: {image_id}") if result and "items" in result and isinstance(result["items"], list):
items = result["items"]
for item in items:
if isinstance(item, dict) and item.get("id") == requested_id:
logger.debug(
"Successfully fetched image info for ID %s from %s",
image_id,
host,
)
return item
returned_ids = [
item.get("id")
for item in items
if isinstance(item, dict) and "id" in item
]
if index < len(candidate_hosts) - 1:
logger.info(
"No matching image for requested ID %s from %s; trying fallback host. Returned %d item(s) with IDs: %s",
image_id,
host,
len(items),
returned_ids,
)
continue
logger.warning(
"CivitAI API returned no matching image for requested ID %s from %s. Returned %d item(s) with IDs: %s. This may indicate the image was deleted, hidden, or there is a database lag.",
image_id,
host,
len(items),
returned_ids,
)
return None
if index < len(candidate_hosts) - 1:
logger.info(
"No image found with ID %s from %s; trying fallback host",
image_id,
host,
)
continue
logger.warning("No image found with ID: %s", image_id)
return None return None
logger.error(f"Failed to fetch image info for ID: {image_id}: {result}") if last_error is not None:
logger.error(
"Failed to fetch image info for ID %s from all candidate hosts: %s",
image_id,
last_error,
)
return None return None
except RateLimitError: except RateLimitError:
raise raise

View File

@@ -5,7 +5,6 @@ from __future__ import annotations
import base64 import base64
import io import io
import os import os
import re
import tempfile import tempfile
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Callable, Optional from typing import Any, Callable, Optional
@@ -14,7 +13,7 @@ import numpy as np
from PIL import Image from PIL import Image
from ...utils.utils import calculate_recipe_fingerprint from ...utils.utils import calculate_recipe_fingerprint
from ...utils.civitai_utils import rewrite_preview_url from ...utils.civitai_utils import extract_civitai_image_id, rewrite_preview_url
from .errors import ( from .errors import (
RecipeDownloadError, RecipeDownloadError,
RecipeNotFoundError, RecipeNotFoundError,
@@ -104,9 +103,11 @@ class RecipeAnalysisService:
extension = ".jpg" # Default extension = ".jpg" # Default
try: try:
civitai_match = re.match(r"https://civitai\.com/images/(\d+)", url) civitai_image_id = extract_civitai_image_id(url)
if civitai_match: if civitai_image_id:
image_info = await civitai_client.get_image_info(civitai_match.group(1)) image_info = await civitai_client.get_image_info(
civitai_image_id, source_url=url
)
if not image_info: if not image_info:
raise RecipeDownloadError( raise RecipeDownloadError(
"Failed to fetch image information from Civitai" "Failed to fetch image information from Civitai"

View File

@@ -2,10 +2,12 @@
from __future__ import annotations from __future__ import annotations
import re
from typing import Any, Dict, Iterable, Mapping, Sequence from typing import Any, Dict, Iterable, Mapping, Sequence
from urllib.parse import urlparse, urlunparse from urllib.parse import parse_qs, urlparse, urlunparse
_SUPPORTED_CIVITAI_PAGE_HOSTS = frozenset({"civitai.com", "civitai.red"})
_DEFAULT_ALLOW_COMMERCIAL_USE: Sequence[str] = ("Sell",) _DEFAULT_ALLOW_COMMERCIAL_USE: Sequence[str] = ("Sell",)
_LICENSE_DEFAULTS: Dict[str, Any] = { _LICENSE_DEFAULTS: Dict[str, Any] = {
"allowNoCredit": True, "allowNoCredit": True,
@@ -17,6 +19,77 @@ _COMMERCIAL_ALLOWED_VALUES = {"sell", "rent", "rentcivit", "image"}
_COMMERCIAL_SHIFT = 1 _COMMERCIAL_SHIFT = 1
def is_supported_civitai_page_host(hostname: str | None) -> bool:
"""Return whether the hostname is a supported Civitai page domain."""
if not hostname:
return False
return hostname.lower() in _SUPPORTED_CIVITAI_PAGE_HOSTS
def _parse_supported_civitai_page_url(url: str | None):
if not url:
return None
try:
parsed = urlparse(url)
except ValueError:
return None
if parsed.scheme not in {"http", "https"}:
return None
if not is_supported_civitai_page_host(parsed.hostname):
return None
return parsed
def extract_civitai_model_url_parts(
url: str | None,
) -> tuple[str | None, str | None]:
"""Extract model and version identifiers from a supported Civitai model URL."""
parsed = _parse_supported_civitai_page_url(url)
if parsed is None:
return None, None
path_match = re.search(r"/models/(\d+)", parsed.path)
if not path_match:
return None, None
model_id = path_match.group(1)
query_params = parse_qs(parsed.query)
version_values = query_params.get("modelVersionId") or []
version_id = version_values[0] if version_values else None
return model_id, version_id
def extract_civitai_image_id(url: str | None) -> str | None:
"""Extract the image identifier from a supported Civitai image page URL."""
parsed = _parse_supported_civitai_page_url(url)
if parsed is None:
return None
path_match = re.search(r"/images/(\d+)", parsed.path)
if not path_match:
return None
return path_match.group(1)
def extract_civitai_page_host(url: str | None) -> str | None:
"""Extract the supported Civitai page host from a URL."""
parsed = _parse_supported_civitai_page_url(url)
if parsed is None:
return None
return parsed.hostname.lower() if parsed.hostname else None
def _normalize_commercial_values(value: Any) -> Sequence[str]: def _normalize_commercial_values(value: Any) -> Sequence[str]:
"""Return a normalized list of commercial permissions preserving source values.""" """Return a normalized list of commercial permissions preserving source values."""
@@ -199,6 +272,10 @@ def rewrite_preview_url(
__all__ = [ __all__ = [
"build_license_flags", "build_license_flags",
"extract_civitai_image_id",
"extract_civitai_page_host",
"extract_civitai_model_url_parts",
"is_supported_civitai_page_host",
"resolve_license_payload", "resolve_license_payload",
"resolve_license_info", "resolve_license_info",
"rewrite_preview_url", "rewrite_preview_url",

View File

@@ -6,6 +6,7 @@ import { bulkManager } from '../../managers/BulkManager.js';
import { MODEL_CONFIG } from '../../api/apiConfig.js'; import { MODEL_CONFIG } from '../../api/apiConfig.js';
import { translate } from '../../utils/i18nHelpers.js'; import { translate } from '../../utils/i18nHelpers.js';
import { getNsfwLevelSelector } from '../shared/NsfwLevelSelector.js'; import { getNsfwLevelSelector } from '../shared/NsfwLevelSelector.js';
import { extractCivitaiModelUrlParts } from '../../utils/civitaiUtils.js';
// Mixin with shared functionality for LoraContextMenu and CheckpointContextMenu // Mixin with shared functionality for LoraContextMenu and CheckpointContextMenu
export const ModelContextMenuMixin = { export const ModelContextMenuMixin = {
@@ -154,25 +155,7 @@ export const ModelContextMenuMixin = {
}, },
extractModelVersionId(url) { extractModelVersionId(url) {
try { return extractCivitaiModelUrlParts(url);
// Handle all three URL formats:
// 1. https://civitai.com/models/649516
// 2. https://civitai.com/models/649516?modelVersionId=726676
// 3. https://civitai.com/models/649516/cynthia-pokemon-diamond-and-pearl-pdxl-lora?modelVersionId=726676
const parsedUrl = new URL(url);
// Extract model ID from path
const pathMatch = parsedUrl.pathname.match(/\/models\/(\d+)/);
const modelId = pathMatch ? pathMatch[1] : null;
// Extract model version ID from query parameters
const modelVersionId = parsedUrl.searchParams.get('modelVersionId');
return { modelId, modelVersionId };
} catch (e) {
return { modelId: null, modelVersionId: null };
}
}, },
parseModelId(value) { parseModelId(value) {

View File

@@ -6,6 +6,7 @@ import { getModelApiClient, resetAndReload } from '../api/modelApiFactory.js';
import { getStorageItem, setStorageItem } from '../utils/storageHelpers.js'; import { getStorageItem, setStorageItem } from '../utils/storageHelpers.js';
import { FolderTreeManager } from '../components/FolderTreeManager.js'; import { FolderTreeManager } from '../components/FolderTreeManager.js';
import { translate } from '../utils/i18nHelpers.js'; import { translate } from '../utils/i18nHelpers.js';
import { extractCivitaiModelUrlParts } from '../utils/civitaiUtils.js';
export class DownloadManager { export class DownloadManager {
constructor() { constructor() {
@@ -197,21 +198,22 @@ export class DownloadManager {
} }
extractModelId(url) { extractModelId(url) {
const versionMatch = url.match(/modelVersionId=(\d+)/i);
this.modelVersionId = versionMatch ? versionMatch[1] : null;
const civarchiveMatch = url.match(/https?:\/\/(?:www\.)?(?:civitaiarchive|civarchive)\.com\/models\/(\d+)/i); const civarchiveMatch = url.match(/https?:\/\/(?:www\.)?(?:civitaiarchive|civarchive)\.com\/models\/(\d+)/i);
if (civarchiveMatch) { if (civarchiveMatch) {
const versionMatch = url.match(/modelVersionId=(\d+)/i);
this.modelVersionId = versionMatch ? versionMatch[1] : null;
this.source = 'civarchive'; this.source = 'civarchive';
return civarchiveMatch[1]; return civarchiveMatch[1];
} }
const civitaiMatch = url.match(/https?:\/\/(?:www\.)?civitai\.com\/models\/(\d+)/i); const { modelId, modelVersionId } = extractCivitaiModelUrlParts(url);
if (civitaiMatch) { if (modelId) {
this.modelVersionId = modelVersionId;
this.source = null; this.source = null;
return civitaiMatch[1]; return modelId;
} }
this.modelVersionId = null;
this.source = null; this.source = null;
return null; return null;
} }

View File

@@ -13,6 +13,11 @@ export const OptimizationMode = {
THUMBNAIL: 'thumbnail', THUMBNAIL: 'thumbnail',
}; };
const SUPPORTED_CIVITAI_PAGE_HOSTS = new Set([
'civitai.com',
'civitai.red',
]);
/** /**
* Rewrite Civitai preview URLs to use optimized renditions. * Rewrite Civitai preview URLs to use optimized renditions.
* Mirrors the backend's rewrite_preview_url() function from py/utils/civitai_utils.py * Mirrors the backend's rewrite_preview_url() function from py/utils/civitai_utils.py
@@ -119,3 +124,50 @@ export function isCivitaiUrl(url) {
return false; return false;
} }
} }
export function isSupportedCivitaiPageHost(hostname) {
if (!hostname) {
return false;
}
return SUPPORTED_CIVITAI_PAGE_HOSTS.has(hostname.toLowerCase());
}
export function extractCivitaiModelUrlParts(url) {
if (!url) {
return { modelId: null, modelVersionId: null };
}
try {
const parsedUrl = new URL(url);
if (!isSupportedCivitaiPageHost(parsedUrl.hostname)) {
return { modelId: null, modelVersionId: null };
}
const pathMatch = parsedUrl.pathname.match(/\/models\/(\d+)/);
const modelId = pathMatch ? pathMatch[1] : null;
const modelVersionId = parsedUrl.searchParams.get('modelVersionId');
return { modelId, modelVersionId };
} catch (e) {
return { modelId: null, modelVersionId: null };
}
}
export function extractCivitaiImageId(url) {
if (!url) {
return null;
}
try {
const parsedUrl = new URL(url);
if (!isSupportedCivitaiPageHost(parsedUrl.hostname)) {
return null;
}
const pathMatch = parsedUrl.pathname.match(/\/images\/(\d+)/);
return pathMatch ? pathMatch[1] : null;
} catch (e) {
return null;
}
}

View File

@@ -0,0 +1,21 @@
import { describe, expect, it } from 'vitest';
import { ModelContextMenuMixin } from '../../../static/js/components/ContextMenu/ModelContextMenuMixin.js';
describe('ModelContextMenuMixin.extractModelVersionId', () => {
it('accepts civitai.red model URLs', () => {
expect(
ModelContextMenuMixin.extractModelVersionId(
'https://civitai.red/models/65423/nijimecha-artstyle?modelVersionId=777'
)
).toEqual({ modelId: '65423', modelVersionId: '777' });
});
it('rejects model-like URLs from unsupported hosts', () => {
expect(
ModelContextMenuMixin.extractModelVersionId(
'https://example.com/models/65423?modelVersionId=777'
)
).toEqual({ modelId: null, modelVersionId: null });
});
});

View File

@@ -136,4 +136,14 @@ describe('DownloadManager version history badges', () => {
expect(items[1].querySelector('.local-path')?.textContent).toContain('/models/still-local.safetensors'); expect(items[1].querySelector('.local-path')?.textContent).toContain('/models/still-local.safetensors');
expect(items[1].querySelector('.downloaded-badge')).toBeNull(); expect(items[1].querySelector('.downloaded-badge')).toBeNull();
}); });
it('extracts model and version ids from civitai.red URLs', () => {
const manager = new DownloadManager();
expect(
manager.extractModelId('https://civitai.red/models/65423/nijimecha-artstyle?modelVersionId=777')
).toBe('65423');
expect(manager.modelVersionId).toBe('777');
expect(manager.source).toBeNull();
});
}); });

View File

@@ -4,7 +4,10 @@ import {
getOptimizedUrl, getOptimizedUrl,
getShowcaseUrl, getShowcaseUrl,
getThumbnailUrl, getThumbnailUrl,
extractCivitaiImageId,
extractCivitaiModelUrlParts,
isCivitaiUrl, isCivitaiUrl,
isSupportedCivitaiPageHost,
OptimizationMode OptimizationMode
} from '../../../static/js/utils/civitaiUtils.js'; } from '../../../static/js/utils/civitaiUtils.js';
@@ -217,4 +220,43 @@ describe('civitaiUtils', () => {
expect(isCivitaiUrl('not-a-url')).toBe(false); expect(isCivitaiUrl('not-a-url')).toBe(false);
}); });
}); });
describe('isSupportedCivitaiPageHost', () => {
it('accepts civitai.com and civitai.red page hosts', () => {
expect(isSupportedCivitaiPageHost('civitai.com')).toBe(true);
expect(isSupportedCivitaiPageHost('civitai.red')).toBe(true);
});
it('rejects unrelated hosts', () => {
expect(isSupportedCivitaiPageHost('www.civitai.com')).toBe(false);
expect(isSupportedCivitaiPageHost('www.civitai.red')).toBe(false);
expect(isSupportedCivitaiPageHost('example.com')).toBe(false);
expect(isSupportedCivitaiPageHost('')).toBe(false);
expect(isSupportedCivitaiPageHost(null)).toBe(false);
});
});
describe('extractCivitaiModelUrlParts', () => {
it('extracts model and version ids from civitai.red model URLs', () => {
expect(
extractCivitaiModelUrlParts('https://civitai.red/models/65423/name?modelVersionId=98765')
).toEqual({ modelId: '65423', modelVersionId: '98765' });
});
it('rejects model-like URLs from unsupported hosts', () => {
expect(
extractCivitaiModelUrlParts('https://example.com/models/65423?modelVersionId=98765')
).toEqual({ modelId: null, modelVersionId: null });
});
});
describe('extractCivitaiImageId', () => {
it('extracts image ids from civitai.red image URLs', () => {
expect(extractCivitaiImageId('https://civitai.red/images/126920345')).toBe('126920345');
});
it('rejects image-like URLs from unsupported hosts', () => {
expect(extractCivitaiImageId('https://example.com/images/126920345')).toBe(null);
});
});
}); });

View File

@@ -274,7 +274,9 @@ class StubCivitaiClient:
def __init__(self) -> None: def __init__(self) -> None:
self.image_info: Dict[str, Any] = {} self.image_info: Dict[str, Any] = {}
async def get_image_info(self, image_id: str) -> Optional[Dict[str, Any]]: async def get_image_info(
self, image_id: str, source_url: str | None = None
) -> Optional[Dict[str, Any]]:
return self.image_info.get(image_id) return self.image_info.get(image_id)
@@ -635,6 +637,58 @@ async def test_import_remote_video_recipe(monkeypatch, tmp_path: Path) -> None:
assert call["extension"] == ".mp4" assert call["extension"] == ".mp4"
async def test_import_remote_recipe_supports_civitai_red(monkeypatch, tmp_path: Path) -> None:
async def fake_get_default_metadata_provider():
return SimpleNamespace(get_model_version_info=lambda id: ({}, None))
monkeypatch.setattr(
"py.recipes.enrichment.get_default_metadata_provider",
fake_get_default_metadata_provider,
)
async with recipe_harness(monkeypatch, tmp_path) as harness:
harness.civitai.image_info["126920345"] = {
"id": 126920345,
"url": "https://image.civitai.com/x/y/original=true/sample.jpeg",
"type": "image",
}
response = await harness.client.get(
"/api/lm/recipes/import-remote",
params={
"image_url": "https://civitai.red/images/126920345",
"name": "Red Recipe",
"resources": json.dumps([]),
"base_model": "Flux",
},
)
payload = await response.json()
assert response.status == 200
assert payload["success"] is True
assert harness.downloader.urls
assert "width=450,optimized=true" in harness.downloader.urls[0]
async def test_analyze_remote_image_supports_civitai_red(
monkeypatch, tmp_path: Path
) -> None:
async with recipe_harness(monkeypatch, tmp_path) as harness:
harness.analysis.result = SimpleNamespace(payload={"loras": []}, status=200)
response = await harness.client.post(
"/api/lm/recipes/analyze-image",
json={"url": "https://civitai.red/images/126920345"},
)
payload = await response.json()
assert response.status == 200
assert payload == {"loras": []}
assert harness.analysis.remote_calls == [
"https://civitai.red/images/126920345"
]
async def test_analyze_uploaded_image_error_path(monkeypatch, tmp_path: Path) -> None: async def test_analyze_uploaded_image_error_path(monkeypatch, tmp_path: Path) -> None:
async with recipe_harness(monkeypatch, tmp_path) as harness: async with recipe_harness(monkeypatch, tmp_path) as harness:
harness.analysis.raise_for_uploaded = RecipeValidationError( harness.analysis.raise_for_uploaded = RecipeValidationError(

View File

@@ -581,6 +581,7 @@ class TestInputValidation:
assert service._validate_url("https://example.com/image.png") is True assert service._validate_url("https://example.com/image.png") is True
assert service._validate_url("http://example.com/image.png") is True assert service._validate_url("http://example.com/image.png") is True
assert service._validate_url("https://civitai.com/images/123") is True assert service._validate_url("https://civitai.com/images/123") is True
assert service._validate_url("https://civitai.red/images/123") is True
def test_validate_invalid_url(self, service): def test_validate_invalid_url(self, service):
assert service._validate_url("not-a-url") is False assert service._validate_url("not-a-url") is False

View File

@@ -530,6 +530,97 @@ async def test_get_image_info_handles_missing(monkeypatch, downloader):
assert result is None assert result is None
async def test_get_image_info_prefers_red_host_for_red_source(monkeypatch, downloader):
requested_urls = []
async def fake_make_request(method, url, use_auth=True, **kwargs):
requested_urls.append(url)
return True, {"items": [{"id": 124950237, "name": "target"}]}
downloader.make_request = fake_make_request
client = await CivitaiClient.get_instance()
result = await client.get_image_info(
"124950237", source_url="https://civitai.red/images/124950237"
)
assert result == {"id": 124950237, "name": "target"}
assert requested_urls == [
"https://civitai.red/api/v1/images?imageId=124950237&nsfw=X"
]
async def test_get_image_info_falls_back_from_com_to_red(monkeypatch, downloader):
requested_urls = []
async def fake_make_request(method, url, use_auth=True, **kwargs):
requested_urls.append(url)
if url.startswith("https://civitai.com/"):
return True, {"items": []}
return True, {"items": [{"id": 124950237, "name": "fallback"}]}
downloader.make_request = fake_make_request
client = await CivitaiClient.get_instance()
result = await client.get_image_info("124950237")
assert result == {"id": 124950237, "name": "fallback"}
assert requested_urls == [
"https://civitai.com/api/v1/images?imageId=124950237&nsfw=X",
"https://civitai.red/api/v1/images?imageId=124950237&nsfw=X",
]
async def test_get_image_info_falls_back_from_red_to_com(monkeypatch, downloader):
requested_urls = []
async def fake_make_request(method, url, use_auth=True, **kwargs):
requested_urls.append(url)
if url.startswith("https://civitai.red/"):
return True, {"items": []}
return True, {"items": [{"id": 124950237, "name": "fallback"}]}
downloader.make_request = fake_make_request
client = await CivitaiClient.get_instance()
result = await client.get_image_info(
"124950237", source_url="https://civitai.red/images/124950237"
)
assert result == {"id": 124950237, "name": "fallback"}
assert requested_urls == [
"https://civitai.red/api/v1/images?imageId=124950237&nsfw=X",
"https://civitai.com/api/v1/images?imageId=124950237&nsfw=X",
]
async def test_get_image_info_falls_back_after_request_failure(monkeypatch, downloader):
requested_urls = []
async def fake_make_request(method, url, use_auth=True, **kwargs):
requested_urls.append(url)
if url.startswith("https://civitai.red/"):
return False, "403 forbidden"
return True, {"items": [{"id": 124950237, "name": "fallback"}]}
downloader.make_request = fake_make_request
client = await CivitaiClient.get_instance()
result = await client.get_image_info(
"124950237", source_url="https://civitai.red/images/124950237"
)
assert result == {"id": 124950237, "name": "fallback"}
assert requested_urls == [
"https://civitai.red/api/v1/images?imageId=124950237&nsfw=X",
"https://civitai.com/api/v1/images?imageId=124950237&nsfw=X",
]
async def test_get_image_info_handles_invalid_id(monkeypatch, downloader, caplog): async def test_get_image_info_handles_invalid_id(monkeypatch, downloader, caplog):
"""When given a non-numeric image ID, return None and log error.""" """When given a non-numeric image ID, return None and log error."""
client = await CivitaiClient.get_instance() client = await CivitaiClient.get_instance()

View File

@@ -6,7 +6,7 @@ from types import SimpleNamespace
# We define these here to help with spec= if needed # We define these here to help with spec= if needed
class MockCivitaiClient: class MockCivitaiClient:
async def get_image_info(self, image_id): async def get_image_info(self, image_id, source_url=None):
pass pass
class MockPersistenceService: class MockPersistenceService:
@@ -119,6 +119,50 @@ async def test_repair_all_recipes_with_enriched_checkpoint_id(setup_scanner):
assert "hash" not in checkpoint assert "hash" not in checkpoint
assert "file_name" not in checkpoint assert "file_name" not in checkpoint
@pytest.mark.asyncio
async def test_repair_all_recipes_supports_civitai_red_source_url(setup_scanner):
recipe_scanner, mock_civitai_client, mock_metadata_provider = setup_scanner
recipe = {
"id": "r1",
"title": "Red Recipe",
"source_url": "https://civitai.red/images/12345",
"checkpoint": None,
"gen_params": {"prompt": ""},
}
recipe_scanner._cache = SimpleNamespace(raw_data=[recipe])
mock_civitai_client.get_image_info.return_value = {
"modelVersionId": 5678,
"meta": {"prompt": "from red"},
}
mock_metadata_provider.get_model_version_info.return_value = (
{
"id": 5678,
"modelId": 1234,
"name": "v1.0",
"model": {"name": "Full Model Name"},
"baseModel": "SDXL 1.0",
"images": [{"url": "https://image.url/thumb.jpg"}],
"files": [
{
"type": "Model",
"hashes": {"SHA256": "ABCDEF"},
"name": "full_filename.safetensors",
}
],
},
None,
)
results = await recipe_scanner.repair_all_recipes()
assert results["repaired"] == 1
mock_civitai_client.get_image_info.assert_called_with(
"12345", source_url="https://civitai.red/images/12345"
)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_repair_all_recipes_with_enriched_checkpoint_hash(setup_scanner): async def test_repair_all_recipes_with_enriched_checkpoint_hash(setup_scanner):
recipe_scanner, mock_civitai_client, mock_metadata_provider = setup_scanner recipe_scanner, mock_civitai_client, mock_metadata_provider = setup_scanner

View File

@@ -678,7 +678,7 @@ async def test_analyze_remote_video(tmp_path):
) )
class DummyClient: class DummyClient:
async def get_image_info(self, image_id): async def get_image_info(self, image_id, source_url=None):
return { return {
"url": "https://civitai.com/video.mp4", "url": "https://civitai.com/video.mp4",
"type": "video", "type": "video",
@@ -698,3 +698,60 @@ async def test_analyze_remote_video(tmp_path):
assert result.payload["is_video"] is True assert result.payload["is_video"] is True
assert result.payload["extension"] == ".mp4" assert result.payload["extension"] == ".mp4"
assert result.payload["image_base64"] is not None assert result.payload["image_base64"] is not None
@pytest.mark.asyncio
async def test_analyze_remote_image_supports_civitai_red():
exif_utils = DummyExifUtils()
class DummyFactory:
def create_parser(self, metadata):
async def parse_metadata(m, recipe_scanner=None, civitai_client=None):
return {"loras": [], "gen_params": {"prompt": "red prompt"}}
return SimpleNamespace(parse_metadata=parse_metadata)
async def downloader_factory():
class Downloader:
async def download_file(self, url, path, use_auth=False):
Path(path).write_bytes(b"fake-image")
return True, "success"
return Downloader()
service = RecipeAnalysisService(
exif_utils=exif_utils,
recipe_parser_factory=DummyFactory(),
downloader_factory=downloader_factory,
metadata_collector=None,
metadata_processor_cls=None,
metadata_registry_cls=None,
standalone_mode=False,
logger=logging.getLogger("test"),
)
class DummyClient:
def __init__(self):
self.calls = []
async def get_image_info(self, image_id, source_url=None):
self.calls.append((image_id, source_url))
return {
"url": "https://image.civitai.com/x/y/original=true/sample.jpeg",
"type": "image",
"meta": {"prompt": "red prompt"},
}
class DummyScanner:
async def find_recipes_by_fingerprint(self, fingerprint):
return []
client = DummyClient()
result = await service.analyze_remote_image(
url="https://civitai.red/images/123",
recipe_scanner=DummyScanner(),
civitai_client=client,
)
assert client.calls == [("123", "https://civitai.red/images/123")]
assert result.payload["loras"] == []

View File

@@ -1,4 +1,11 @@
from py.utils.civitai_utils import build_license_flags, resolve_license_info, resolve_license_payload from py.utils.civitai_utils import (
build_license_flags,
extract_civitai_image_id,
extract_civitai_model_url_parts,
is_supported_civitai_page_host,
resolve_license_info,
resolve_license_payload,
)
def test_resolve_license_payload_defaults(): def test_resolve_license_payload_defaults():
@@ -78,3 +85,40 @@ def test_build_license_flags_parses_aggregate_inside_list():
flags = build_license_flags(source) flags = build_license_flags(source)
expected_flags = (1 << 0) | (7 << 1) | (1 << 5) expected_flags = (1 << 0) | (7 << 1) | (1 << 5)
assert flags == expected_flags assert flags == expected_flags
def test_supported_civitai_page_hosts_include_red():
assert is_supported_civitai_page_host("civitai.com") is True
assert is_supported_civitai_page_host("civitai.red") is True
assert is_supported_civitai_page_host("www.civitai.com") is False
assert is_supported_civitai_page_host("www.civitai.red") is False
assert is_supported_civitai_page_host("example.com") is False
def test_extract_civitai_model_url_parts_supports_red():
model_id, version_id = extract_civitai_model_url_parts(
"https://civitai.red/models/65423/nijimecha-artstyle?modelVersionId=777"
)
assert model_id == "65423"
assert version_id == "777"
def test_extract_civitai_model_url_parts_rejects_non_civitai_host():
model_id, version_id = extract_civitai_model_url_parts(
"https://example.com/models/65423?modelVersionId=777"
)
assert model_id is None
assert version_id is None
def test_extract_civitai_image_id_supports_red():
assert (
extract_civitai_image_id("https://civitai.red/images/126920345")
== "126920345"
)
def test_extract_civitai_image_id_rejects_non_civitai_host():
assert extract_civitai_image_id("https://example.com/images/126920345") is None