mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-21 21:22:11 -03:00
feat: Add support for remote video analysis and preview for recipe imports. see #420
This commit is contained in:
@@ -13,6 +13,7 @@ import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
from ...utils.utils import calculate_recipe_fingerprint
|
||||
from ...utils.civitai_utils import rewrite_preview_url
|
||||
from .errors import (
|
||||
RecipeDownloadError,
|
||||
RecipeNotFoundError,
|
||||
@@ -94,18 +95,39 @@ class RecipeAnalysisService:
|
||||
if civitai_client is None:
|
||||
raise RecipeServiceError("Civitai client unavailable")
|
||||
|
||||
temp_path = self._create_temp_path()
|
||||
temp_path = None
|
||||
metadata: Optional[dict[str, Any]] = None
|
||||
is_video = False
|
||||
extension = ".jpg" # Default
|
||||
|
||||
try:
|
||||
civitai_match = re.match(r"https://civitai\.com/images/(\d+)", url)
|
||||
if civitai_match:
|
||||
image_info = await civitai_client.get_image_info(civitai_match.group(1))
|
||||
if not image_info:
|
||||
raise RecipeDownloadError("Failed to fetch image information from Civitai")
|
||||
|
||||
image_url = image_info.get("url")
|
||||
if not image_url:
|
||||
raise RecipeDownloadError("No image URL found in Civitai response")
|
||||
|
||||
is_video = image_info.get("type") == "video"
|
||||
|
||||
# Use optimized preview URLs if possible
|
||||
rewritten_url, _ = rewrite_preview_url(image_url, media_type=image_info.get("type"))
|
||||
if rewritten_url:
|
||||
image_url = rewritten_url
|
||||
|
||||
if is_video:
|
||||
# Extract extension from URL
|
||||
url_path = image_url.split('?')[0].split('#')[0]
|
||||
extension = os.path.splitext(url_path)[1].lower() or ".mp4"
|
||||
else:
|
||||
extension = ".jpg"
|
||||
|
||||
temp_path = self._create_temp_path(suffix=extension)
|
||||
await self._download_image(image_url, temp_path)
|
||||
|
||||
metadata = image_info.get("meta") if "meta" in image_info else None
|
||||
if (
|
||||
isinstance(metadata, dict)
|
||||
@@ -114,22 +136,31 @@ class RecipeAnalysisService:
|
||||
):
|
||||
metadata = metadata["meta"]
|
||||
else:
|
||||
# Basic extension detection for non-Civitai URLs
|
||||
url_path = url.split('?')[0].split('#')[0]
|
||||
extension = os.path.splitext(url_path)[1].lower()
|
||||
if extension in [".mp4", ".webm"]:
|
||||
is_video = True
|
||||
else:
|
||||
extension = ".jpg"
|
||||
|
||||
temp_path = self._create_temp_path(suffix=extension)
|
||||
await self._download_image(url, temp_path)
|
||||
|
||||
if metadata is None:
|
||||
if metadata is None and not is_video:
|
||||
metadata = self._exif_utils.extract_image_metadata(temp_path)
|
||||
|
||||
if not metadata:
|
||||
return self._metadata_not_found_response(temp_path)
|
||||
|
||||
return await self._parse_metadata(
|
||||
metadata,
|
||||
metadata or {},
|
||||
recipe_scanner=recipe_scanner,
|
||||
image_path=temp_path,
|
||||
include_image_base64=True,
|
||||
is_video=is_video,
|
||||
extension=extension,
|
||||
)
|
||||
finally:
|
||||
self._safe_cleanup(temp_path)
|
||||
if temp_path:
|
||||
self._safe_cleanup(temp_path)
|
||||
|
||||
async def analyze_local_image(
|
||||
self,
|
||||
@@ -198,12 +229,16 @@ class RecipeAnalysisService:
|
||||
recipe_scanner,
|
||||
image_path: Optional[str],
|
||||
include_image_base64: bool,
|
||||
is_video: bool = False,
|
||||
extension: str = ".jpg",
|
||||
) -> AnalysisResult:
|
||||
parser = self._recipe_parser_factory.create_parser(metadata)
|
||||
if parser is None:
|
||||
payload = {"error": "No parser found for this image", "loras": []}
|
||||
if include_image_base64 and image_path:
|
||||
payload["image_base64"] = self._encode_file(image_path)
|
||||
payload["is_video"] = is_video
|
||||
payload["extension"] = extension
|
||||
return AnalysisResult(payload)
|
||||
|
||||
result = await parser.parse_metadata(metadata, recipe_scanner=recipe_scanner)
|
||||
@@ -211,6 +246,9 @@ class RecipeAnalysisService:
|
||||
if include_image_base64 and image_path:
|
||||
result["image_base64"] = self._encode_file(image_path)
|
||||
|
||||
result["is_video"] = is_video
|
||||
result["extension"] = extension
|
||||
|
||||
if "error" in result and not result.get("loras"):
|
||||
return AnalysisResult(result)
|
||||
|
||||
@@ -241,8 +279,8 @@ class RecipeAnalysisService:
|
||||
temp_file.write(data)
|
||||
return temp_file.name
|
||||
|
||||
def _create_temp_path(self) -> str:
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
|
||||
def _create_temp_path(self, suffix: str = ".jpg") -> str:
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
|
||||
return temp_file.name
|
||||
|
||||
def _safe_cleanup(self, path: Optional[str]) -> None:
|
||||
|
||||
Reference in New Issue
Block a user