mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-05-06 16:36:45 -03:00
Compare commits
2 Commits
v1.0.3
...
af2146f96c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
af2146f96c | ||
|
|
bdc8dec860 |
@@ -685,9 +685,9 @@
|
|||||||
"title": "Import a recipe from image or URL",
|
"title": "Import a recipe from image or URL",
|
||||||
"urlLocalPath": "URL / Local Path",
|
"urlLocalPath": "URL / Local Path",
|
||||||
"uploadImage": "Upload Image",
|
"uploadImage": "Upload Image",
|
||||||
"urlSectionDescription": "Input a Civitai image URL or local file path to import as a recipe.",
|
"urlSectionDescription": "Input a Civitai image URL from civitai.com or civitai.red, or a local file path, to import as a recipe.",
|
||||||
"imageUrlOrPath": "Image URL or File Path:",
|
"imageUrlOrPath": "Image URL or File Path:",
|
||||||
"urlPlaceholder": "https://civitai.com/images/... or C:/path/to/image.png",
|
"urlPlaceholder": "https://civitai.com/images/... or https://civitai.red/images/... or C:/path/to/image.png",
|
||||||
"fetchImage": "Fetch Image",
|
"fetchImage": "Fetch Image",
|
||||||
"uploadSectionDescription": "Upload an image with LoRA metadata to import as a recipe.",
|
"uploadSectionDescription": "Upload an image with LoRA metadata to import as a recipe.",
|
||||||
"selectImage": "Select Image",
|
"selectImage": "Select Image",
|
||||||
@@ -1090,9 +1090,9 @@
|
|||||||
},
|
},
|
||||||
"proceedText": "Only proceed if you're sure this is what you want.",
|
"proceedText": "Only proceed if you're sure this is what you want.",
|
||||||
"urlLabel": "Civitai Model URL:",
|
"urlLabel": "Civitai Model URL:",
|
||||||
"urlPlaceholder": "https://civitai.com/models/649516/model-name?modelVersionId=726676",
|
"urlPlaceholder": "https://civitai.com/models/649516/model-name?modelVersionId=726676 or https://civitai.red/models/649516/model-name?modelVersionId=726676",
|
||||||
"helpText": {
|
"helpText": {
|
||||||
"title": "Paste any Civitai model URL. Supported formats:",
|
"title": "Paste any Civitai model URL from civitai.com or civitai.red. Supported formats:",
|
||||||
"format1": "https://civitai.com/models/649516",
|
"format1": "https://civitai.com/models/649516",
|
||||||
"format2": "https://civitai.com/models/649516?modelVersionId=726676",
|
"format2": "https://civitai.com/models/649516?modelVersionId=726676",
|
||||||
"format3": "https://civitai.com/models/649516/model-name?modelVersionId=726676",
|
"format3": "https://civitai.com/models/649516/model-name?modelVersionId=726676",
|
||||||
|
|||||||
@@ -685,9 +685,9 @@
|
|||||||
"title": "从图片或 URL 导入配方",
|
"title": "从图片或 URL 导入配方",
|
||||||
"urlLocalPath": "URL / 本地路径",
|
"urlLocalPath": "URL / 本地路径",
|
||||||
"uploadImage": "上传图片",
|
"uploadImage": "上传图片",
|
||||||
"urlSectionDescription": "输入 Civitai 图片 URL 或本地文件路径以导入为配方。",
|
"urlSectionDescription": "输入来自 civitai.com 或 civitai.red 的 Civitai 图片 URL,或本地文件路径以导入为配方。",
|
||||||
"imageUrlOrPath": "图片 URL 或文件路径:",
|
"imageUrlOrPath": "图片 URL 或文件路径:",
|
||||||
"urlPlaceholder": "https://civitai.com/images/... 或 C:/path/to/image.png",
|
"urlPlaceholder": "https://civitai.com/images/... 或 https://civitai.red/images/... 或 C:/path/to/image.png",
|
||||||
"fetchImage": "获取图片",
|
"fetchImage": "获取图片",
|
||||||
"uploadSectionDescription": "上传带有 LoRA 元数据的图片以导入为配方。",
|
"uploadSectionDescription": "上传带有 LoRA 元数据的图片以导入为配方。",
|
||||||
"selectImage": "选择图片",
|
"selectImage": "选择图片",
|
||||||
@@ -1090,9 +1090,9 @@
|
|||||||
},
|
},
|
||||||
"proceedText": "仅在你确定需要此操作时继续。",
|
"proceedText": "仅在你确定需要此操作时继续。",
|
||||||
"urlLabel": "Civitai 模型 URL:",
|
"urlLabel": "Civitai 模型 URL:",
|
||||||
"urlPlaceholder": "https://civitai.com/models/649516/model-name?modelVersionId=726676",
|
"urlPlaceholder": "https://civitai.com/models/649516/model-name?modelVersionId=726676 或 https://civitai.red/models/649516/model-name?modelVersionId=726676",
|
||||||
"helpText": {
|
"helpText": {
|
||||||
"title": "粘贴任意 Civitai 模型 URL。支持格式:",
|
"title": "粘贴任意来自 civitai.com 或 civitai.red 的 Civitai 模型 URL。支持格式:",
|
||||||
"format1": "https://civitai.com/models/649516",
|
"format1": "https://civitai.com/models/649516",
|
||||||
"format2": "https://civitai.com/models/649516?modelVersionId=726676",
|
"format2": "https://civitai.com/models/649516?modelVersionId=726676",
|
||||||
"format3": "https://civitai.com/models/649516/model-name?modelVersionId=726676",
|
"format3": "https://civitai.com/models/649516/model-name?modelVersionId=726676",
|
||||||
|
|||||||
@@ -1,11 +1,11 @@
|
|||||||
import logging
|
import logging
|
||||||
import json
|
import json
|
||||||
import re
|
|
||||||
import os
|
import os
|
||||||
from typing import Any, Dict, Optional
|
from typing import Any, Dict, Optional
|
||||||
from .merger import GenParamsMerger
|
from .merger import GenParamsMerger
|
||||||
from .base import RecipeMetadataParser
|
from .base import RecipeMetadataParser
|
||||||
from ..services.metadata_service import get_default_metadata_provider
|
from ..services.metadata_service import get_default_metadata_provider
|
||||||
|
from ..utils.civitai_utils import extract_civitai_image_id
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -39,11 +39,12 @@ class RecipeEnricher:
|
|||||||
source_url = recipe.get("source_url") or recipe.get("source_path", "")
|
source_url = recipe.get("source_url") or recipe.get("source_path", "")
|
||||||
|
|
||||||
# Check if it's a Civitai image URL
|
# Check if it's a Civitai image URL
|
||||||
image_id_match = re.search(r'civitai\.com/images/(\d+)', str(source_url))
|
image_id = extract_civitai_image_id(str(source_url))
|
||||||
if image_id_match:
|
if image_id:
|
||||||
image_id = image_id_match.group(1)
|
|
||||||
try:
|
try:
|
||||||
image_info = await civitai_client.get_image_info(image_id)
|
image_info = await civitai_client.get_image_info(
|
||||||
|
image_id, source_url=str(source_url)
|
||||||
|
)
|
||||||
if image_info:
|
if image_info:
|
||||||
# Handle nested meta often found in Civitai API responses
|
# Handle nested meta often found in Civitai API responses
|
||||||
raw_meta = image_info.get("meta")
|
raw_meta = image_info.get("meta")
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ from ...services.recipes import (
|
|||||||
RecipeValidationError,
|
RecipeValidationError,
|
||||||
)
|
)
|
||||||
from ...services.metadata_service import get_default_metadata_provider
|
from ...services.metadata_service import get_default_metadata_provider
|
||||||
from ...utils.civitai_utils import rewrite_preview_url
|
from ...utils.civitai_utils import extract_civitai_image_id, rewrite_preview_url
|
||||||
from ...utils.exif_utils import ExifUtils
|
from ...utils.exif_utils import ExifUtils
|
||||||
from ...recipes.merger import GenParamsMerger
|
from ...recipes.merger import GenParamsMerger
|
||||||
from ...recipes.enrichment import RecipeEnricher
|
from ...recipes.enrichment import RecipeEnricher
|
||||||
@@ -1196,13 +1196,15 @@ class RecipeManagementHandler:
|
|||||||
temp_path = temp_file.name
|
temp_path = temp_file.name
|
||||||
download_url = image_url
|
download_url = image_url
|
||||||
image_info = None
|
image_info = None
|
||||||
civitai_match = re.match(r"https://civitai\.com/images/(\d+)", image_url)
|
civitai_image_id = extract_civitai_image_id(image_url)
|
||||||
if civitai_match:
|
if civitai_image_id:
|
||||||
if civitai_client is None:
|
if civitai_client is None:
|
||||||
raise RecipeDownloadError(
|
raise RecipeDownloadError(
|
||||||
"Civitai client unavailable for image download"
|
"Civitai client unavailable for image download"
|
||||||
)
|
)
|
||||||
image_info = await civitai_client.get_image_info(civitai_match.group(1))
|
image_info = await civitai_client.get_image_info(
|
||||||
|
civitai_image_id, source_url=image_url
|
||||||
|
)
|
||||||
if not image_info:
|
if not image_info:
|
||||||
raise RecipeDownloadError(
|
raise RecipeDownloadError(
|
||||||
"Failed to fetch image information from Civitai"
|
"Failed to fetch image information from Civitai"
|
||||||
@@ -1236,7 +1238,7 @@ class RecipeManagementHandler:
|
|||||||
return (
|
return (
|
||||||
file_obj.read(),
|
file_obj.read(),
|
||||||
extension,
|
extension,
|
||||||
image_info.get("meta") if civitai_match and image_info else None,
|
image_info.get("meta") if civitai_image_id and image_info else None,
|
||||||
)
|
)
|
||||||
except RecipeDownloadError:
|
except RecipeDownloadError:
|
||||||
raise
|
raise
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ from .model_metadata_provider import (
|
|||||||
)
|
)
|
||||||
from .downloader import get_downloader
|
from .downloader import get_downloader
|
||||||
from .errors import RateLimitError, ResourceNotFoundError
|
from .errors import RateLimitError, ResourceNotFoundError
|
||||||
from ..utils.civitai_utils import resolve_license_payload
|
from ..utils.civitai_utils import extract_civitai_page_host, resolve_license_payload
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -40,6 +40,23 @@ class CivitaiClient:
|
|||||||
self._initialized = True
|
self._initialized = True
|
||||||
|
|
||||||
self.base_url = "https://civitai.com/api/v1"
|
self.base_url = "https://civitai.com/api/v1"
|
||||||
|
self._image_info_api_hosts = ("civitai.com", "civitai.red")
|
||||||
|
|
||||||
|
def _build_image_info_url(self, host: str, image_id: str) -> str:
|
||||||
|
return f"https://{host}/api/v1/images?imageId={image_id}&nsfw=X"
|
||||||
|
|
||||||
|
def _resolve_image_info_hosts(self, source_url: str | None) -> List[str]:
|
||||||
|
preferred_host = extract_civitai_page_host(source_url)
|
||||||
|
if preferred_host in self._image_info_api_hosts:
|
||||||
|
return [
|
||||||
|
preferred_host,
|
||||||
|
*[
|
||||||
|
host
|
||||||
|
for host in self._image_info_api_hosts
|
||||||
|
if host != preferred_host
|
||||||
|
],
|
||||||
|
]
|
||||||
|
return list(self._image_info_api_hosts)
|
||||||
|
|
||||||
async def _make_request(
|
async def _make_request(
|
||||||
self,
|
self,
|
||||||
@@ -479,48 +496,106 @@ class CivitaiClient:
|
|||||||
logger.error(error_msg)
|
logger.error(error_msg)
|
||||||
return None, error_msg
|
return None, error_msg
|
||||||
|
|
||||||
async def get_image_info(self, image_id: str) -> Optional[Dict]:
|
async def get_image_info(
|
||||||
|
self, image_id: str, source_url: str | None = None
|
||||||
|
) -> Optional[Dict]:
|
||||||
"""Fetch image information from Civitai API
|
"""Fetch image information from Civitai API
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
image_id: The Civitai image ID
|
image_id: The Civitai image ID
|
||||||
|
source_url: Optional original image page URL used to prioritize
|
||||||
|
``civitai.com`` vs ``civitai.red`` image lookups.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Optional[Dict]: The image data or None if not found
|
Optional[Dict]: The image data or None if not found
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
url = f"{self.base_url}/images?imageId={image_id}&nsfw=X"
|
|
||||||
requested_id = int(image_id)
|
requested_id = int(image_id)
|
||||||
|
candidate_hosts = self._resolve_image_info_hosts(source_url)
|
||||||
|
last_error: Any = None
|
||||||
|
logger.debug(
|
||||||
|
"Fetching image info for ID %s with host order %s",
|
||||||
|
image_id,
|
||||||
|
candidate_hosts,
|
||||||
|
)
|
||||||
|
|
||||||
logger.debug(f"Fetching image info for ID: {image_id}")
|
for index, host in enumerate(candidate_hosts):
|
||||||
success, result = await self._make_request("GET", url, use_auth=True)
|
url = self._build_image_info_url(host, image_id)
|
||||||
|
success, result = await self._make_request("GET", url, use_auth=True)
|
||||||
|
|
||||||
if success:
|
if not success:
|
||||||
if result and "items" in result and isinstance(result["items"], list):
|
last_error = result
|
||||||
items = result["items"]
|
if index < len(candidate_hosts) - 1:
|
||||||
|
logger.warning(
|
||||||
|
"Failed to fetch image info for ID %s from %s: %s. Trying fallback host.",
|
||||||
|
image_id,
|
||||||
|
host,
|
||||||
|
result,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
# First, try to find the item with matching ID
|
logger.error(
|
||||||
for item in items:
|
"Failed to fetch image info for ID %s from %s: %s",
|
||||||
if isinstance(item, dict) and item.get("id") == requested_id:
|
image_id,
|
||||||
logger.debug(f"Successfully fetched image info for ID: {image_id}")
|
host,
|
||||||
return item
|
result,
|
||||||
|
|
||||||
# No matching ID found - log warning with details about returned items
|
|
||||||
returned_ids = [
|
|
||||||
item.get("id") for item in items
|
|
||||||
if isinstance(item, dict) and "id" in item
|
|
||||||
]
|
|
||||||
logger.warning(
|
|
||||||
f"CivitAI API returned no matching image for requested ID {image_id}. "
|
|
||||||
f"Returned {len(items)} item(s) with IDs: {returned_ids}. "
|
|
||||||
f"This may indicate the image was deleted, hidden, or there is a database lag."
|
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
logger.warning(f"No image found with ID: {image_id}")
|
if result and "items" in result and isinstance(result["items"], list):
|
||||||
|
items = result["items"]
|
||||||
|
|
||||||
|
for item in items:
|
||||||
|
if isinstance(item, dict) and item.get("id") == requested_id:
|
||||||
|
logger.debug(
|
||||||
|
"Successfully fetched image info for ID %s from %s",
|
||||||
|
image_id,
|
||||||
|
host,
|
||||||
|
)
|
||||||
|
return item
|
||||||
|
|
||||||
|
returned_ids = [
|
||||||
|
item.get("id")
|
||||||
|
for item in items
|
||||||
|
if isinstance(item, dict) and "id" in item
|
||||||
|
]
|
||||||
|
|
||||||
|
if index < len(candidate_hosts) - 1:
|
||||||
|
logger.info(
|
||||||
|
"No matching image for requested ID %s from %s; trying fallback host. Returned %d item(s) with IDs: %s",
|
||||||
|
image_id,
|
||||||
|
host,
|
||||||
|
len(items),
|
||||||
|
returned_ids,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
logger.warning(
|
||||||
|
"CivitAI API returned no matching image for requested ID %s from %s. Returned %d item(s) with IDs: %s. This may indicate the image was deleted, hidden, or there is a database lag.",
|
||||||
|
image_id,
|
||||||
|
host,
|
||||||
|
len(items),
|
||||||
|
returned_ids,
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
if index < len(candidate_hosts) - 1:
|
||||||
|
logger.info(
|
||||||
|
"No image found with ID %s from %s; trying fallback host",
|
||||||
|
image_id,
|
||||||
|
host,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
logger.warning("No image found with ID: %s", image_id)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
logger.error(f"Failed to fetch image info for ID: {image_id}: {result}")
|
if last_error is not None:
|
||||||
|
logger.error(
|
||||||
|
"Failed to fetch image info for ID %s from all candidate hosts: %s",
|
||||||
|
image_id,
|
||||||
|
last_error,
|
||||||
|
)
|
||||||
return None
|
return None
|
||||||
except RateLimitError:
|
except RateLimitError:
|
||||||
raise
|
raise
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ from __future__ import annotations
|
|||||||
import base64
|
import base64
|
||||||
import io
|
import io
|
||||||
import os
|
import os
|
||||||
import re
|
|
||||||
import tempfile
|
import tempfile
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any, Callable, Optional
|
from typing import Any, Callable, Optional
|
||||||
@@ -14,7 +13,7 @@ import numpy as np
|
|||||||
from PIL import Image
|
from PIL import Image
|
||||||
|
|
||||||
from ...utils.utils import calculate_recipe_fingerprint
|
from ...utils.utils import calculate_recipe_fingerprint
|
||||||
from ...utils.civitai_utils import rewrite_preview_url
|
from ...utils.civitai_utils import extract_civitai_image_id, rewrite_preview_url
|
||||||
from .errors import (
|
from .errors import (
|
||||||
RecipeDownloadError,
|
RecipeDownloadError,
|
||||||
RecipeNotFoundError,
|
RecipeNotFoundError,
|
||||||
@@ -104,9 +103,11 @@ class RecipeAnalysisService:
|
|||||||
extension = ".jpg" # Default
|
extension = ".jpg" # Default
|
||||||
|
|
||||||
try:
|
try:
|
||||||
civitai_match = re.match(r"https://civitai\.com/images/(\d+)", url)
|
civitai_image_id = extract_civitai_image_id(url)
|
||||||
if civitai_match:
|
if civitai_image_id:
|
||||||
image_info = await civitai_client.get_image_info(civitai_match.group(1))
|
image_info = await civitai_client.get_image_info(
|
||||||
|
civitai_image_id, source_url=url
|
||||||
|
)
|
||||||
if not image_info:
|
if not image_info:
|
||||||
raise RecipeDownloadError(
|
raise RecipeDownloadError(
|
||||||
"Failed to fetch image information from Civitai"
|
"Failed to fetch image information from Civitai"
|
||||||
|
|||||||
@@ -2,10 +2,12 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
from typing import Any, Dict, Iterable, Mapping, Sequence
|
from typing import Any, Dict, Iterable, Mapping, Sequence
|
||||||
from urllib.parse import urlparse, urlunparse
|
from urllib.parse import parse_qs, urlparse, urlunparse
|
||||||
|
|
||||||
|
|
||||||
|
_SUPPORTED_CIVITAI_PAGE_HOSTS = frozenset({"civitai.com", "civitai.red"})
|
||||||
_DEFAULT_ALLOW_COMMERCIAL_USE: Sequence[str] = ("Sell",)
|
_DEFAULT_ALLOW_COMMERCIAL_USE: Sequence[str] = ("Sell",)
|
||||||
_LICENSE_DEFAULTS: Dict[str, Any] = {
|
_LICENSE_DEFAULTS: Dict[str, Any] = {
|
||||||
"allowNoCredit": True,
|
"allowNoCredit": True,
|
||||||
@@ -17,6 +19,77 @@ _COMMERCIAL_ALLOWED_VALUES = {"sell", "rent", "rentcivit", "image"}
|
|||||||
_COMMERCIAL_SHIFT = 1
|
_COMMERCIAL_SHIFT = 1
|
||||||
|
|
||||||
|
|
||||||
|
def is_supported_civitai_page_host(hostname: str | None) -> bool:
|
||||||
|
"""Return whether the hostname is a supported Civitai page domain."""
|
||||||
|
|
||||||
|
if not hostname:
|
||||||
|
return False
|
||||||
|
return hostname.lower() in _SUPPORTED_CIVITAI_PAGE_HOSTS
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_supported_civitai_page_url(url: str | None):
|
||||||
|
if not url:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
parsed = urlparse(url)
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if parsed.scheme not in {"http", "https"}:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if not is_supported_civitai_page_host(parsed.hostname):
|
||||||
|
return None
|
||||||
|
|
||||||
|
return parsed
|
||||||
|
|
||||||
|
|
||||||
|
def extract_civitai_model_url_parts(
|
||||||
|
url: str | None,
|
||||||
|
) -> tuple[str | None, str | None]:
|
||||||
|
"""Extract model and version identifiers from a supported Civitai model URL."""
|
||||||
|
|
||||||
|
parsed = _parse_supported_civitai_page_url(url)
|
||||||
|
if parsed is None:
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
path_match = re.search(r"/models/(\d+)", parsed.path)
|
||||||
|
if not path_match:
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
model_id = path_match.group(1)
|
||||||
|
|
||||||
|
query_params = parse_qs(parsed.query)
|
||||||
|
version_values = query_params.get("modelVersionId") or []
|
||||||
|
version_id = version_values[0] if version_values else None
|
||||||
|
return model_id, version_id
|
||||||
|
|
||||||
|
|
||||||
|
def extract_civitai_image_id(url: str | None) -> str | None:
|
||||||
|
"""Extract the image identifier from a supported Civitai image page URL."""
|
||||||
|
|
||||||
|
parsed = _parse_supported_civitai_page_url(url)
|
||||||
|
if parsed is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
path_match = re.search(r"/images/(\d+)", parsed.path)
|
||||||
|
if not path_match:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return path_match.group(1)
|
||||||
|
|
||||||
|
|
||||||
|
def extract_civitai_page_host(url: str | None) -> str | None:
|
||||||
|
"""Extract the supported Civitai page host from a URL."""
|
||||||
|
|
||||||
|
parsed = _parse_supported_civitai_page_url(url)
|
||||||
|
if parsed is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return parsed.hostname.lower() if parsed.hostname else None
|
||||||
|
|
||||||
|
|
||||||
def _normalize_commercial_values(value: Any) -> Sequence[str]:
|
def _normalize_commercial_values(value: Any) -> Sequence[str]:
|
||||||
"""Return a normalized list of commercial permissions preserving source values."""
|
"""Return a normalized list of commercial permissions preserving source values."""
|
||||||
|
|
||||||
@@ -199,6 +272,10 @@ def rewrite_preview_url(
|
|||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"build_license_flags",
|
"build_license_flags",
|
||||||
|
"extract_civitai_image_id",
|
||||||
|
"extract_civitai_page_host",
|
||||||
|
"extract_civitai_model_url_parts",
|
||||||
|
"is_supported_civitai_page_host",
|
||||||
"resolve_license_payload",
|
"resolve_license_payload",
|
||||||
"resolve_license_info",
|
"resolve_license_info",
|
||||||
"rewrite_preview_url",
|
"rewrite_preview_url",
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import { bulkManager } from '../../managers/BulkManager.js';
|
|||||||
import { MODEL_CONFIG } from '../../api/apiConfig.js';
|
import { MODEL_CONFIG } from '../../api/apiConfig.js';
|
||||||
import { translate } from '../../utils/i18nHelpers.js';
|
import { translate } from '../../utils/i18nHelpers.js';
|
||||||
import { getNsfwLevelSelector } from '../shared/NsfwLevelSelector.js';
|
import { getNsfwLevelSelector } from '../shared/NsfwLevelSelector.js';
|
||||||
|
import { extractCivitaiModelUrlParts } from '../../utils/civitaiUtils.js';
|
||||||
|
|
||||||
// Mixin with shared functionality for LoraContextMenu and CheckpointContextMenu
|
// Mixin with shared functionality for LoraContextMenu and CheckpointContextMenu
|
||||||
export const ModelContextMenuMixin = {
|
export const ModelContextMenuMixin = {
|
||||||
@@ -154,25 +155,7 @@ export const ModelContextMenuMixin = {
|
|||||||
},
|
},
|
||||||
|
|
||||||
extractModelVersionId(url) {
|
extractModelVersionId(url) {
|
||||||
try {
|
return extractCivitaiModelUrlParts(url);
|
||||||
// Handle all three URL formats:
|
|
||||||
// 1. https://civitai.com/models/649516
|
|
||||||
// 2. https://civitai.com/models/649516?modelVersionId=726676
|
|
||||||
// 3. https://civitai.com/models/649516/cynthia-pokemon-diamond-and-pearl-pdxl-lora?modelVersionId=726676
|
|
||||||
|
|
||||||
const parsedUrl = new URL(url);
|
|
||||||
|
|
||||||
// Extract model ID from path
|
|
||||||
const pathMatch = parsedUrl.pathname.match(/\/models\/(\d+)/);
|
|
||||||
const modelId = pathMatch ? pathMatch[1] : null;
|
|
||||||
|
|
||||||
// Extract model version ID from query parameters
|
|
||||||
const modelVersionId = parsedUrl.searchParams.get('modelVersionId');
|
|
||||||
|
|
||||||
return { modelId, modelVersionId };
|
|
||||||
} catch (e) {
|
|
||||||
return { modelId: null, modelVersionId: null };
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
|
|
||||||
parseModelId(value) {
|
parseModelId(value) {
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import { getModelApiClient, resetAndReload } from '../api/modelApiFactory.js';
|
|||||||
import { getStorageItem, setStorageItem } from '../utils/storageHelpers.js';
|
import { getStorageItem, setStorageItem } from '../utils/storageHelpers.js';
|
||||||
import { FolderTreeManager } from '../components/FolderTreeManager.js';
|
import { FolderTreeManager } from '../components/FolderTreeManager.js';
|
||||||
import { translate } from '../utils/i18nHelpers.js';
|
import { translate } from '../utils/i18nHelpers.js';
|
||||||
|
import { extractCivitaiModelUrlParts } from '../utils/civitaiUtils.js';
|
||||||
|
|
||||||
export class DownloadManager {
|
export class DownloadManager {
|
||||||
constructor() {
|
constructor() {
|
||||||
@@ -197,21 +198,22 @@ export class DownloadManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
extractModelId(url) {
|
extractModelId(url) {
|
||||||
const versionMatch = url.match(/modelVersionId=(\d+)/i);
|
|
||||||
this.modelVersionId = versionMatch ? versionMatch[1] : null;
|
|
||||||
|
|
||||||
const civarchiveMatch = url.match(/https?:\/\/(?:www\.)?(?:civitaiarchive|civarchive)\.com\/models\/(\d+)/i);
|
const civarchiveMatch = url.match(/https?:\/\/(?:www\.)?(?:civitaiarchive|civarchive)\.com\/models\/(\d+)/i);
|
||||||
if (civarchiveMatch) {
|
if (civarchiveMatch) {
|
||||||
|
const versionMatch = url.match(/modelVersionId=(\d+)/i);
|
||||||
|
this.modelVersionId = versionMatch ? versionMatch[1] : null;
|
||||||
this.source = 'civarchive';
|
this.source = 'civarchive';
|
||||||
return civarchiveMatch[1];
|
return civarchiveMatch[1];
|
||||||
}
|
}
|
||||||
|
|
||||||
const civitaiMatch = url.match(/https?:\/\/(?:www\.)?civitai\.com\/models\/(\d+)/i);
|
const { modelId, modelVersionId } = extractCivitaiModelUrlParts(url);
|
||||||
if (civitaiMatch) {
|
if (modelId) {
|
||||||
|
this.modelVersionId = modelVersionId;
|
||||||
this.source = null;
|
this.source = null;
|
||||||
return civitaiMatch[1];
|
return modelId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.modelVersionId = null;
|
||||||
this.source = null;
|
this.source = null;
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,6 +13,11 @@ export const OptimizationMode = {
|
|||||||
THUMBNAIL: 'thumbnail',
|
THUMBNAIL: 'thumbnail',
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const SUPPORTED_CIVITAI_PAGE_HOSTS = new Set([
|
||||||
|
'civitai.com',
|
||||||
|
'civitai.red',
|
||||||
|
]);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Rewrite Civitai preview URLs to use optimized renditions.
|
* Rewrite Civitai preview URLs to use optimized renditions.
|
||||||
* Mirrors the backend's rewrite_preview_url() function from py/utils/civitai_utils.py
|
* Mirrors the backend's rewrite_preview_url() function from py/utils/civitai_utils.py
|
||||||
@@ -119,3 +124,50 @@ export function isCivitaiUrl(url) {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function isSupportedCivitaiPageHost(hostname) {
|
||||||
|
if (!hostname) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return SUPPORTED_CIVITAI_PAGE_HOSTS.has(hostname.toLowerCase());
|
||||||
|
}
|
||||||
|
|
||||||
|
export function extractCivitaiModelUrlParts(url) {
|
||||||
|
if (!url) {
|
||||||
|
return { modelId: null, modelVersionId: null };
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const parsedUrl = new URL(url);
|
||||||
|
if (!isSupportedCivitaiPageHost(parsedUrl.hostname)) {
|
||||||
|
return { modelId: null, modelVersionId: null };
|
||||||
|
}
|
||||||
|
|
||||||
|
const pathMatch = parsedUrl.pathname.match(/\/models\/(\d+)/);
|
||||||
|
const modelId = pathMatch ? pathMatch[1] : null;
|
||||||
|
const modelVersionId = parsedUrl.searchParams.get('modelVersionId');
|
||||||
|
|
||||||
|
return { modelId, modelVersionId };
|
||||||
|
} catch (e) {
|
||||||
|
return { modelId: null, modelVersionId: null };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function extractCivitaiImageId(url) {
|
||||||
|
if (!url) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const parsedUrl = new URL(url);
|
||||||
|
if (!isSupportedCivitaiPageHost(parsedUrl.hostname)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const pathMatch = parsedUrl.pathname.match(/\/images\/(\d+)/);
|
||||||
|
return pathMatch ? pathMatch[1] : null;
|
||||||
|
} catch (e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
21
tests/frontend/components/modelContextMenuMixin.test.js
Normal file
21
tests/frontend/components/modelContextMenuMixin.test.js
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
import { describe, expect, it } from 'vitest';
|
||||||
|
|
||||||
|
import { ModelContextMenuMixin } from '../../../static/js/components/ContextMenu/ModelContextMenuMixin.js';
|
||||||
|
|
||||||
|
describe('ModelContextMenuMixin.extractModelVersionId', () => {
|
||||||
|
it('accepts civitai.red model URLs', () => {
|
||||||
|
expect(
|
||||||
|
ModelContextMenuMixin.extractModelVersionId(
|
||||||
|
'https://civitai.red/models/65423/nijimecha-artstyle?modelVersionId=777'
|
||||||
|
)
|
||||||
|
).toEqual({ modelId: '65423', modelVersionId: '777' });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('rejects model-like URLs from unsupported hosts', () => {
|
||||||
|
expect(
|
||||||
|
ModelContextMenuMixin.extractModelVersionId(
|
||||||
|
'https://example.com/models/65423?modelVersionId=777'
|
||||||
|
)
|
||||||
|
).toEqual({ modelId: null, modelVersionId: null });
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -136,4 +136,14 @@ describe('DownloadManager version history badges', () => {
|
|||||||
expect(items[1].querySelector('.local-path')?.textContent).toContain('/models/still-local.safetensors');
|
expect(items[1].querySelector('.local-path')?.textContent).toContain('/models/still-local.safetensors');
|
||||||
expect(items[1].querySelector('.downloaded-badge')).toBeNull();
|
expect(items[1].querySelector('.downloaded-badge')).toBeNull();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('extracts model and version ids from civitai.red URLs', () => {
|
||||||
|
const manager = new DownloadManager();
|
||||||
|
|
||||||
|
expect(
|
||||||
|
manager.extractModelId('https://civitai.red/models/65423/nijimecha-artstyle?modelVersionId=777')
|
||||||
|
).toBe('65423');
|
||||||
|
expect(manager.modelVersionId).toBe('777');
|
||||||
|
expect(manager.source).toBeNull();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -4,7 +4,10 @@ import {
|
|||||||
getOptimizedUrl,
|
getOptimizedUrl,
|
||||||
getShowcaseUrl,
|
getShowcaseUrl,
|
||||||
getThumbnailUrl,
|
getThumbnailUrl,
|
||||||
|
extractCivitaiImageId,
|
||||||
|
extractCivitaiModelUrlParts,
|
||||||
isCivitaiUrl,
|
isCivitaiUrl,
|
||||||
|
isSupportedCivitaiPageHost,
|
||||||
OptimizationMode
|
OptimizationMode
|
||||||
} from '../../../static/js/utils/civitaiUtils.js';
|
} from '../../../static/js/utils/civitaiUtils.js';
|
||||||
|
|
||||||
@@ -217,4 +220,43 @@ describe('civitaiUtils', () => {
|
|||||||
expect(isCivitaiUrl('not-a-url')).toBe(false);
|
expect(isCivitaiUrl('not-a-url')).toBe(false);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('isSupportedCivitaiPageHost', () => {
|
||||||
|
it('accepts civitai.com and civitai.red page hosts', () => {
|
||||||
|
expect(isSupportedCivitaiPageHost('civitai.com')).toBe(true);
|
||||||
|
expect(isSupportedCivitaiPageHost('civitai.red')).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('rejects unrelated hosts', () => {
|
||||||
|
expect(isSupportedCivitaiPageHost('www.civitai.com')).toBe(false);
|
||||||
|
expect(isSupportedCivitaiPageHost('www.civitai.red')).toBe(false);
|
||||||
|
expect(isSupportedCivitaiPageHost('example.com')).toBe(false);
|
||||||
|
expect(isSupportedCivitaiPageHost('')).toBe(false);
|
||||||
|
expect(isSupportedCivitaiPageHost(null)).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('extractCivitaiModelUrlParts', () => {
|
||||||
|
it('extracts model and version ids from civitai.red model URLs', () => {
|
||||||
|
expect(
|
||||||
|
extractCivitaiModelUrlParts('https://civitai.red/models/65423/name?modelVersionId=98765')
|
||||||
|
).toEqual({ modelId: '65423', modelVersionId: '98765' });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('rejects model-like URLs from unsupported hosts', () => {
|
||||||
|
expect(
|
||||||
|
extractCivitaiModelUrlParts('https://example.com/models/65423?modelVersionId=98765')
|
||||||
|
).toEqual({ modelId: null, modelVersionId: null });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('extractCivitaiImageId', () => {
|
||||||
|
it('extracts image ids from civitai.red image URLs', () => {
|
||||||
|
expect(extractCivitaiImageId('https://civitai.red/images/126920345')).toBe('126920345');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('rejects image-like URLs from unsupported hosts', () => {
|
||||||
|
expect(extractCivitaiImageId('https://example.com/images/126920345')).toBe(null);
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -274,7 +274,9 @@ class StubCivitaiClient:
|
|||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self.image_info: Dict[str, Any] = {}
|
self.image_info: Dict[str, Any] = {}
|
||||||
|
|
||||||
async def get_image_info(self, image_id: str) -> Optional[Dict[str, Any]]:
|
async def get_image_info(
|
||||||
|
self, image_id: str, source_url: str | None = None
|
||||||
|
) -> Optional[Dict[str, Any]]:
|
||||||
return self.image_info.get(image_id)
|
return self.image_info.get(image_id)
|
||||||
|
|
||||||
|
|
||||||
@@ -635,6 +637,58 @@ async def test_import_remote_video_recipe(monkeypatch, tmp_path: Path) -> None:
|
|||||||
assert call["extension"] == ".mp4"
|
assert call["extension"] == ".mp4"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_import_remote_recipe_supports_civitai_red(monkeypatch, tmp_path: Path) -> None:
|
||||||
|
async def fake_get_default_metadata_provider():
|
||||||
|
return SimpleNamespace(get_model_version_info=lambda id: ({}, None))
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"py.recipes.enrichment.get_default_metadata_provider",
|
||||||
|
fake_get_default_metadata_provider,
|
||||||
|
)
|
||||||
|
|
||||||
|
async with recipe_harness(monkeypatch, tmp_path) as harness:
|
||||||
|
harness.civitai.image_info["126920345"] = {
|
||||||
|
"id": 126920345,
|
||||||
|
"url": "https://image.civitai.com/x/y/original=true/sample.jpeg",
|
||||||
|
"type": "image",
|
||||||
|
}
|
||||||
|
|
||||||
|
response = await harness.client.get(
|
||||||
|
"/api/lm/recipes/import-remote",
|
||||||
|
params={
|
||||||
|
"image_url": "https://civitai.red/images/126920345",
|
||||||
|
"name": "Red Recipe",
|
||||||
|
"resources": json.dumps([]),
|
||||||
|
"base_model": "Flux",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
payload = await response.json()
|
||||||
|
assert response.status == 200
|
||||||
|
assert payload["success"] is True
|
||||||
|
assert harness.downloader.urls
|
||||||
|
assert "width=450,optimized=true" in harness.downloader.urls[0]
|
||||||
|
|
||||||
|
|
||||||
|
async def test_analyze_remote_image_supports_civitai_red(
|
||||||
|
monkeypatch, tmp_path: Path
|
||||||
|
) -> None:
|
||||||
|
async with recipe_harness(monkeypatch, tmp_path) as harness:
|
||||||
|
harness.analysis.result = SimpleNamespace(payload={"loras": []}, status=200)
|
||||||
|
|
||||||
|
response = await harness.client.post(
|
||||||
|
"/api/lm/recipes/analyze-image",
|
||||||
|
json={"url": "https://civitai.red/images/126920345"},
|
||||||
|
)
|
||||||
|
payload = await response.json()
|
||||||
|
|
||||||
|
assert response.status == 200
|
||||||
|
assert payload == {"loras": []}
|
||||||
|
assert harness.analysis.remote_calls == [
|
||||||
|
"https://civitai.red/images/126920345"
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
async def test_analyze_uploaded_image_error_path(monkeypatch, tmp_path: Path) -> None:
|
async def test_analyze_uploaded_image_error_path(monkeypatch, tmp_path: Path) -> None:
|
||||||
async with recipe_harness(monkeypatch, tmp_path) as harness:
|
async with recipe_harness(monkeypatch, tmp_path) as harness:
|
||||||
harness.analysis.raise_for_uploaded = RecipeValidationError(
|
harness.analysis.raise_for_uploaded = RecipeValidationError(
|
||||||
|
|||||||
@@ -581,6 +581,7 @@ class TestInputValidation:
|
|||||||
assert service._validate_url("https://example.com/image.png") is True
|
assert service._validate_url("https://example.com/image.png") is True
|
||||||
assert service._validate_url("http://example.com/image.png") is True
|
assert service._validate_url("http://example.com/image.png") is True
|
||||||
assert service._validate_url("https://civitai.com/images/123") is True
|
assert service._validate_url("https://civitai.com/images/123") is True
|
||||||
|
assert service._validate_url("https://civitai.red/images/123") is True
|
||||||
|
|
||||||
def test_validate_invalid_url(self, service):
|
def test_validate_invalid_url(self, service):
|
||||||
assert service._validate_url("not-a-url") is False
|
assert service._validate_url("not-a-url") is False
|
||||||
|
|||||||
@@ -530,6 +530,97 @@ async def test_get_image_info_handles_missing(monkeypatch, downloader):
|
|||||||
assert result is None
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
async def test_get_image_info_prefers_red_host_for_red_source(monkeypatch, downloader):
|
||||||
|
requested_urls = []
|
||||||
|
|
||||||
|
async def fake_make_request(method, url, use_auth=True, **kwargs):
|
||||||
|
requested_urls.append(url)
|
||||||
|
return True, {"items": [{"id": 124950237, "name": "target"}]}
|
||||||
|
|
||||||
|
downloader.make_request = fake_make_request
|
||||||
|
|
||||||
|
client = await CivitaiClient.get_instance()
|
||||||
|
|
||||||
|
result = await client.get_image_info(
|
||||||
|
"124950237", source_url="https://civitai.red/images/124950237"
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result == {"id": 124950237, "name": "target"}
|
||||||
|
assert requested_urls == [
|
||||||
|
"https://civitai.red/api/v1/images?imageId=124950237&nsfw=X"
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
async def test_get_image_info_falls_back_from_com_to_red(monkeypatch, downloader):
|
||||||
|
requested_urls = []
|
||||||
|
|
||||||
|
async def fake_make_request(method, url, use_auth=True, **kwargs):
|
||||||
|
requested_urls.append(url)
|
||||||
|
if url.startswith("https://civitai.com/"):
|
||||||
|
return True, {"items": []}
|
||||||
|
return True, {"items": [{"id": 124950237, "name": "fallback"}]}
|
||||||
|
|
||||||
|
downloader.make_request = fake_make_request
|
||||||
|
|
||||||
|
client = await CivitaiClient.get_instance()
|
||||||
|
|
||||||
|
result = await client.get_image_info("124950237")
|
||||||
|
|
||||||
|
assert result == {"id": 124950237, "name": "fallback"}
|
||||||
|
assert requested_urls == [
|
||||||
|
"https://civitai.com/api/v1/images?imageId=124950237&nsfw=X",
|
||||||
|
"https://civitai.red/api/v1/images?imageId=124950237&nsfw=X",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
async def test_get_image_info_falls_back_from_red_to_com(monkeypatch, downloader):
|
||||||
|
requested_urls = []
|
||||||
|
|
||||||
|
async def fake_make_request(method, url, use_auth=True, **kwargs):
|
||||||
|
requested_urls.append(url)
|
||||||
|
if url.startswith("https://civitai.red/"):
|
||||||
|
return True, {"items": []}
|
||||||
|
return True, {"items": [{"id": 124950237, "name": "fallback"}]}
|
||||||
|
|
||||||
|
downloader.make_request = fake_make_request
|
||||||
|
|
||||||
|
client = await CivitaiClient.get_instance()
|
||||||
|
|
||||||
|
result = await client.get_image_info(
|
||||||
|
"124950237", source_url="https://civitai.red/images/124950237"
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result == {"id": 124950237, "name": "fallback"}
|
||||||
|
assert requested_urls == [
|
||||||
|
"https://civitai.red/api/v1/images?imageId=124950237&nsfw=X",
|
||||||
|
"https://civitai.com/api/v1/images?imageId=124950237&nsfw=X",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
async def test_get_image_info_falls_back_after_request_failure(monkeypatch, downloader):
|
||||||
|
requested_urls = []
|
||||||
|
|
||||||
|
async def fake_make_request(method, url, use_auth=True, **kwargs):
|
||||||
|
requested_urls.append(url)
|
||||||
|
if url.startswith("https://civitai.red/"):
|
||||||
|
return False, "403 forbidden"
|
||||||
|
return True, {"items": [{"id": 124950237, "name": "fallback"}]}
|
||||||
|
|
||||||
|
downloader.make_request = fake_make_request
|
||||||
|
|
||||||
|
client = await CivitaiClient.get_instance()
|
||||||
|
|
||||||
|
result = await client.get_image_info(
|
||||||
|
"124950237", source_url="https://civitai.red/images/124950237"
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result == {"id": 124950237, "name": "fallback"}
|
||||||
|
assert requested_urls == [
|
||||||
|
"https://civitai.red/api/v1/images?imageId=124950237&nsfw=X",
|
||||||
|
"https://civitai.com/api/v1/images?imageId=124950237&nsfw=X",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
async def test_get_image_info_handles_invalid_id(monkeypatch, downloader, caplog):
|
async def test_get_image_info_handles_invalid_id(monkeypatch, downloader, caplog):
|
||||||
"""When given a non-numeric image ID, return None and log error."""
|
"""When given a non-numeric image ID, return None and log error."""
|
||||||
client = await CivitaiClient.get_instance()
|
client = await CivitaiClient.get_instance()
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ from types import SimpleNamespace
|
|||||||
|
|
||||||
# We define these here to help with spec= if needed
|
# We define these here to help with spec= if needed
|
||||||
class MockCivitaiClient:
|
class MockCivitaiClient:
|
||||||
async def get_image_info(self, image_id):
|
async def get_image_info(self, image_id, source_url=None):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
class MockPersistenceService:
|
class MockPersistenceService:
|
||||||
@@ -119,6 +119,50 @@ async def test_repair_all_recipes_with_enriched_checkpoint_id(setup_scanner):
|
|||||||
assert "hash" not in checkpoint
|
assert "hash" not in checkpoint
|
||||||
assert "file_name" not in checkpoint
|
assert "file_name" not in checkpoint
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_repair_all_recipes_supports_civitai_red_source_url(setup_scanner):
|
||||||
|
recipe_scanner, mock_civitai_client, mock_metadata_provider = setup_scanner
|
||||||
|
|
||||||
|
recipe = {
|
||||||
|
"id": "r1",
|
||||||
|
"title": "Red Recipe",
|
||||||
|
"source_url": "https://civitai.red/images/12345",
|
||||||
|
"checkpoint": None,
|
||||||
|
"gen_params": {"prompt": ""},
|
||||||
|
}
|
||||||
|
recipe_scanner._cache = SimpleNamespace(raw_data=[recipe])
|
||||||
|
|
||||||
|
mock_civitai_client.get_image_info.return_value = {
|
||||||
|
"modelVersionId": 5678,
|
||||||
|
"meta": {"prompt": "from red"},
|
||||||
|
}
|
||||||
|
mock_metadata_provider.get_model_version_info.return_value = (
|
||||||
|
{
|
||||||
|
"id": 5678,
|
||||||
|
"modelId": 1234,
|
||||||
|
"name": "v1.0",
|
||||||
|
"model": {"name": "Full Model Name"},
|
||||||
|
"baseModel": "SDXL 1.0",
|
||||||
|
"images": [{"url": "https://image.url/thumb.jpg"}],
|
||||||
|
"files": [
|
||||||
|
{
|
||||||
|
"type": "Model",
|
||||||
|
"hashes": {"SHA256": "ABCDEF"},
|
||||||
|
"name": "full_filename.safetensors",
|
||||||
|
}
|
||||||
|
],
|
||||||
|
},
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
|
||||||
|
results = await recipe_scanner.repair_all_recipes()
|
||||||
|
|
||||||
|
assert results["repaired"] == 1
|
||||||
|
mock_civitai_client.get_image_info.assert_called_with(
|
||||||
|
"12345", source_url="https://civitai.red/images/12345"
|
||||||
|
)
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_repair_all_recipes_with_enriched_checkpoint_hash(setup_scanner):
|
async def test_repair_all_recipes_with_enriched_checkpoint_hash(setup_scanner):
|
||||||
recipe_scanner, mock_civitai_client, mock_metadata_provider = setup_scanner
|
recipe_scanner, mock_civitai_client, mock_metadata_provider = setup_scanner
|
||||||
|
|||||||
@@ -678,7 +678,7 @@ async def test_analyze_remote_video(tmp_path):
|
|||||||
)
|
)
|
||||||
|
|
||||||
class DummyClient:
|
class DummyClient:
|
||||||
async def get_image_info(self, image_id):
|
async def get_image_info(self, image_id, source_url=None):
|
||||||
return {
|
return {
|
||||||
"url": "https://civitai.com/video.mp4",
|
"url": "https://civitai.com/video.mp4",
|
||||||
"type": "video",
|
"type": "video",
|
||||||
@@ -698,3 +698,60 @@ async def test_analyze_remote_video(tmp_path):
|
|||||||
assert result.payload["is_video"] is True
|
assert result.payload["is_video"] is True
|
||||||
assert result.payload["extension"] == ".mp4"
|
assert result.payload["extension"] == ".mp4"
|
||||||
assert result.payload["image_base64"] is not None
|
assert result.payload["image_base64"] is not None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_analyze_remote_image_supports_civitai_red():
|
||||||
|
exif_utils = DummyExifUtils()
|
||||||
|
|
||||||
|
class DummyFactory:
|
||||||
|
def create_parser(self, metadata):
|
||||||
|
async def parse_metadata(m, recipe_scanner=None, civitai_client=None):
|
||||||
|
return {"loras": [], "gen_params": {"prompt": "red prompt"}}
|
||||||
|
|
||||||
|
return SimpleNamespace(parse_metadata=parse_metadata)
|
||||||
|
|
||||||
|
async def downloader_factory():
|
||||||
|
class Downloader:
|
||||||
|
async def download_file(self, url, path, use_auth=False):
|
||||||
|
Path(path).write_bytes(b"fake-image")
|
||||||
|
return True, "success"
|
||||||
|
|
||||||
|
return Downloader()
|
||||||
|
|
||||||
|
service = RecipeAnalysisService(
|
||||||
|
exif_utils=exif_utils,
|
||||||
|
recipe_parser_factory=DummyFactory(),
|
||||||
|
downloader_factory=downloader_factory,
|
||||||
|
metadata_collector=None,
|
||||||
|
metadata_processor_cls=None,
|
||||||
|
metadata_registry_cls=None,
|
||||||
|
standalone_mode=False,
|
||||||
|
logger=logging.getLogger("test"),
|
||||||
|
)
|
||||||
|
|
||||||
|
class DummyClient:
|
||||||
|
def __init__(self):
|
||||||
|
self.calls = []
|
||||||
|
|
||||||
|
async def get_image_info(self, image_id, source_url=None):
|
||||||
|
self.calls.append((image_id, source_url))
|
||||||
|
return {
|
||||||
|
"url": "https://image.civitai.com/x/y/original=true/sample.jpeg",
|
||||||
|
"type": "image",
|
||||||
|
"meta": {"prompt": "red prompt"},
|
||||||
|
}
|
||||||
|
|
||||||
|
class DummyScanner:
|
||||||
|
async def find_recipes_by_fingerprint(self, fingerprint):
|
||||||
|
return []
|
||||||
|
|
||||||
|
client = DummyClient()
|
||||||
|
result = await service.analyze_remote_image(
|
||||||
|
url="https://civitai.red/images/123",
|
||||||
|
recipe_scanner=DummyScanner(),
|
||||||
|
civitai_client=client,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert client.calls == [("123", "https://civitai.red/images/123")]
|
||||||
|
assert result.payload["loras"] == []
|
||||||
|
|||||||
@@ -1,4 +1,11 @@
|
|||||||
from py.utils.civitai_utils import build_license_flags, resolve_license_info, resolve_license_payload
|
from py.utils.civitai_utils import (
|
||||||
|
build_license_flags,
|
||||||
|
extract_civitai_image_id,
|
||||||
|
extract_civitai_model_url_parts,
|
||||||
|
is_supported_civitai_page_host,
|
||||||
|
resolve_license_info,
|
||||||
|
resolve_license_payload,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_resolve_license_payload_defaults():
|
def test_resolve_license_payload_defaults():
|
||||||
@@ -78,3 +85,40 @@ def test_build_license_flags_parses_aggregate_inside_list():
|
|||||||
flags = build_license_flags(source)
|
flags = build_license_flags(source)
|
||||||
expected_flags = (1 << 0) | (7 << 1) | (1 << 5)
|
expected_flags = (1 << 0) | (7 << 1) | (1 << 5)
|
||||||
assert flags == expected_flags
|
assert flags == expected_flags
|
||||||
|
|
||||||
|
|
||||||
|
def test_supported_civitai_page_hosts_include_red():
|
||||||
|
assert is_supported_civitai_page_host("civitai.com") is True
|
||||||
|
assert is_supported_civitai_page_host("civitai.red") is True
|
||||||
|
assert is_supported_civitai_page_host("www.civitai.com") is False
|
||||||
|
assert is_supported_civitai_page_host("www.civitai.red") is False
|
||||||
|
assert is_supported_civitai_page_host("example.com") is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_civitai_model_url_parts_supports_red():
|
||||||
|
model_id, version_id = extract_civitai_model_url_parts(
|
||||||
|
"https://civitai.red/models/65423/nijimecha-artstyle?modelVersionId=777"
|
||||||
|
)
|
||||||
|
|
||||||
|
assert model_id == "65423"
|
||||||
|
assert version_id == "777"
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_civitai_model_url_parts_rejects_non_civitai_host():
|
||||||
|
model_id, version_id = extract_civitai_model_url_parts(
|
||||||
|
"https://example.com/models/65423?modelVersionId=777"
|
||||||
|
)
|
||||||
|
|
||||||
|
assert model_id is None
|
||||||
|
assert version_id is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_civitai_image_id_supports_red():
|
||||||
|
assert (
|
||||||
|
extract_civitai_image_id("https://civitai.red/images/126920345")
|
||||||
|
== "126920345"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_civitai_image_id_rejects_non_civitai_host():
|
||||||
|
assert extract_civitai_image_id("https://example.com/images/126920345") is None
|
||||||
|
|||||||
Reference in New Issue
Block a user