mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-21 21:22:11 -03:00
Add support for importing recipes from remote sources by: - Adding import_remote_recipe endpoint to RecipeHandlerSet - Injecting downloader_factory and civitai_client_getter dependencies - Implementing image download and resource parsing logic - Supporting Civitai resource payloads with checkpoints and LoRAs - Adding required imports for regex and temporary file handling This enables users to import recipes directly from external sources like Civitai without manual file downloads.
249 lines
7.0 KiB
Python
249 lines
7.0 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
|
|
import pytest
|
|
|
|
from py.services.recipes.analysis_service import RecipeAnalysisService
|
|
from py.services.recipes.errors import RecipeDownloadError, RecipeNotFoundError
|
|
from py.services.recipes.persistence_service import RecipePersistenceService
|
|
|
|
|
|
class DummyExifUtils:
|
|
def optimize_image(self, image_data, target_width, format, quality, preserve_metadata):
|
|
return image_data, ".webp"
|
|
|
|
def append_recipe_metadata(self, image_path, recipe_data):
|
|
self.appended = (image_path, recipe_data)
|
|
|
|
def extract_image_metadata(self, path):
|
|
return {}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_analyze_remote_image_download_failure_cleans_temp(tmp_path, monkeypatch):
|
|
exif_utils = DummyExifUtils()
|
|
|
|
class DummyFactory:
|
|
def create_parser(self, metadata):
|
|
return None
|
|
|
|
async def downloader_factory():
|
|
class Downloader:
|
|
async def download_file(self, url, path, use_auth=False):
|
|
return False, "failure"
|
|
|
|
return Downloader()
|
|
|
|
service = RecipeAnalysisService(
|
|
exif_utils=exif_utils,
|
|
recipe_parser_factory=DummyFactory(),
|
|
downloader_factory=downloader_factory,
|
|
metadata_collector=None,
|
|
metadata_processor_cls=None,
|
|
metadata_registry_cls=None,
|
|
standalone_mode=False,
|
|
logger=logging.getLogger("test"),
|
|
)
|
|
|
|
temp_path = tmp_path / "temp.jpg"
|
|
|
|
def create_temp_path():
|
|
temp_path.write_bytes(b"")
|
|
return str(temp_path)
|
|
|
|
monkeypatch.setattr(service, "_create_temp_path", create_temp_path)
|
|
|
|
with pytest.raises(RecipeDownloadError):
|
|
await service.analyze_remote_image(
|
|
url="https://example.com/image.jpg",
|
|
recipe_scanner=SimpleNamespace(),
|
|
civitai_client=SimpleNamespace(),
|
|
)
|
|
|
|
assert not temp_path.exists(), "temporary file should be cleaned after failure"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_analyze_local_image_missing_file(tmp_path):
|
|
async def downloader_factory():
|
|
return SimpleNamespace()
|
|
|
|
service = RecipeAnalysisService(
|
|
exif_utils=DummyExifUtils(),
|
|
recipe_parser_factory=SimpleNamespace(create_parser=lambda metadata: None),
|
|
downloader_factory=downloader_factory,
|
|
metadata_collector=None,
|
|
metadata_processor_cls=None,
|
|
metadata_registry_cls=None,
|
|
standalone_mode=False,
|
|
logger=logging.getLogger("test"),
|
|
)
|
|
|
|
with pytest.raises(RecipeNotFoundError):
|
|
await service.analyze_local_image(
|
|
file_path=str(tmp_path / "missing.png"),
|
|
recipe_scanner=SimpleNamespace(),
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_save_recipe_reports_duplicates(tmp_path):
|
|
exif_utils = DummyExifUtils()
|
|
|
|
class DummyCache:
|
|
def __init__(self):
|
|
self.raw_data = []
|
|
|
|
async def resort(self):
|
|
pass
|
|
|
|
class DummyScanner:
|
|
def __init__(self, root):
|
|
self.recipes_dir = str(root)
|
|
self._cache = DummyCache()
|
|
self.last_fingerprint = None
|
|
|
|
async def find_recipes_by_fingerprint(self, fingerprint):
|
|
self.last_fingerprint = fingerprint
|
|
return ["existing"]
|
|
|
|
async def add_recipe(self, recipe_data):
|
|
self._cache.raw_data.append(recipe_data)
|
|
await self._cache.resort()
|
|
|
|
scanner = DummyScanner(tmp_path)
|
|
service = RecipePersistenceService(
|
|
exif_utils=exif_utils,
|
|
card_preview_width=512,
|
|
logger=logging.getLogger("test"),
|
|
)
|
|
|
|
metadata = {
|
|
"base_model": "sd",
|
|
"loras": [
|
|
{
|
|
"file_name": "sample",
|
|
"hash": "abc123",
|
|
"weight": 0.5,
|
|
"id": 1,
|
|
"name": "Sample",
|
|
"version": "v1",
|
|
"isDeleted": False,
|
|
"exclude": False,
|
|
}
|
|
],
|
|
}
|
|
|
|
result = await service.save_recipe(
|
|
recipe_scanner=scanner,
|
|
image_bytes=b"image-bytes",
|
|
image_base64=None,
|
|
name="My Recipe",
|
|
tags=["tag"],
|
|
metadata=metadata,
|
|
)
|
|
|
|
assert result.payload["matching_recipes"] == ["existing"]
|
|
assert scanner.last_fingerprint is not None
|
|
assert os.path.exists(result.payload["json_path"])
|
|
assert scanner._cache.raw_data
|
|
|
|
stored = json.loads(Path(result.payload["json_path"]).read_text())
|
|
expected_image_path = os.path.normpath(result.payload["image_path"])
|
|
assert stored["file_path"] == expected_image_path
|
|
assert service._exif_utils.appended[0] == expected_image_path
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_save_recipe_persists_checkpoint_metadata(tmp_path):
|
|
exif_utils = DummyExifUtils()
|
|
|
|
class DummyScanner:
|
|
def __init__(self, root):
|
|
self.recipes_dir = str(root)
|
|
|
|
async def find_recipes_by_fingerprint(self, fingerprint):
|
|
return []
|
|
|
|
async def add_recipe(self, recipe_data):
|
|
return None
|
|
|
|
scanner = DummyScanner(tmp_path)
|
|
service = RecipePersistenceService(
|
|
exif_utils=exif_utils,
|
|
card_preview_width=512,
|
|
logger=logging.getLogger("test"),
|
|
)
|
|
|
|
checkpoint_meta = {
|
|
"type": "checkpoint",
|
|
"modelId": 10,
|
|
"modelVersionId": 20,
|
|
"modelName": "Flux",
|
|
"modelVersionName": "Dev",
|
|
}
|
|
|
|
metadata = {
|
|
"base_model": "Flux",
|
|
"loras": [],
|
|
"checkpoint": checkpoint_meta,
|
|
}
|
|
|
|
result = await service.save_recipe(
|
|
recipe_scanner=scanner,
|
|
image_bytes=b"img",
|
|
image_base64=None,
|
|
name="Checkpointed",
|
|
tags=[],
|
|
metadata=metadata,
|
|
)
|
|
|
|
stored = json.loads(Path(result.payload["json_path"]).read_text())
|
|
assert stored["checkpoint"] == checkpoint_meta
|
|
assert stored["gen_params"]["checkpoint"] == checkpoint_meta
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_save_recipe_from_widget_allows_empty_lora(tmp_path):
|
|
exif_utils = DummyExifUtils()
|
|
|
|
class DummyScanner:
|
|
def __init__(self, root):
|
|
self.recipes_dir = str(root)
|
|
self.added = []
|
|
|
|
async def get_local_lora(self, name): # pragma: no cover - no lookups expected
|
|
return None
|
|
|
|
async def add_recipe(self, recipe_data):
|
|
self.added.append(recipe_data)
|
|
|
|
scanner = DummyScanner(tmp_path)
|
|
service = RecipePersistenceService(
|
|
exif_utils=exif_utils,
|
|
card_preview_width=512,
|
|
logger=logging.getLogger("test"),
|
|
)
|
|
|
|
metadata = {
|
|
"loras": "", # no matches present in the stack
|
|
"checkpoint": "base-model.safetensors",
|
|
"prompt": "a calm scene",
|
|
"negative_prompt": "",
|
|
}
|
|
|
|
result = await service.save_recipe_from_widget(
|
|
recipe_scanner=scanner,
|
|
metadata=metadata,
|
|
image_bytes=b"image-bytes",
|
|
)
|
|
|
|
stored = json.loads(Path(result.payload["json_path"]).read_text())
|
|
|
|
assert stored["loras"] == []
|
|
assert stored["title"] == "recipe"
|
|
assert scanner.added and scanner.added[0]["loras"] == []
|