Files
ComfyUI-Lora-Manager/tests/services/test_recipe_services.py

896 lines
26 KiB
Python

import json
import logging
import os
from io import BytesIO
from pathlib import Path
from types import SimpleNamespace
import piexif
import pytest
from PIL import Image, PngImagePlugin
from py.services.recipes.analysis_service import RecipeAnalysisService
from py.services.recipes.errors import (
RecipeDownloadError,
RecipeNotFoundError,
RecipeValidationError,
)
from py.services.recipes.persistence_service import RecipePersistenceService
from py.utils.exif_utils import ExifUtils
class DummyExifUtils:
def __init__(self):
self.appended = None
self.optimized_calls = 0
def optimize_image(self, image_data, target_width, format, quality, preserve_metadata):
self.optimized_calls += 1
return image_data, ".webp"
def append_recipe_metadata(self, image_path, recipe_data):
self.appended = (image_path, recipe_data)
def extract_image_metadata(self, path):
return {}
@pytest.mark.asyncio
async def test_save_recipe_video_bypasses_optimization(tmp_path):
exif_utils = DummyExifUtils()
class DummyScanner:
def __init__(self, root):
self.recipes_dir = str(root)
async def find_recipes_by_fingerprint(self, fingerprint):
return []
async def add_recipe(self, recipe_data):
return None
scanner = DummyScanner(tmp_path)
service = RecipePersistenceService(
exif_utils=exif_utils,
card_preview_width=512,
logger=logging.getLogger("test"),
)
metadata = {"base_model": "Flux", "loras": []}
video_bytes = b"mp4-content"
result = await service.save_recipe(
recipe_scanner=scanner,
image_bytes=video_bytes,
image_base64=None,
name="Video Recipe",
tags=[],
metadata=metadata,
extension=".mp4",
)
assert result.payload["image_path"].endswith(".mp4")
assert Path(result.payload["image_path"]).read_bytes() == video_bytes
assert exif_utils.optimized_calls == 0, "Optimization should be bypassed for video"
assert exif_utils.appended is None, "Metadata embedding should be bypassed for video"
@pytest.mark.asyncio
async def test_analyze_remote_image_download_failure_cleans_temp(tmp_path, monkeypatch):
exif_utils = DummyExifUtils()
class DummyFactory:
def create_parser(self, metadata):
return None
async def downloader_factory():
class Downloader:
async def download_file(self, url, path, use_auth=False):
return False, "failure"
return Downloader()
service = RecipeAnalysisService(
exif_utils=exif_utils,
recipe_parser_factory=DummyFactory(),
downloader_factory=downloader_factory,
metadata_collector=None,
metadata_processor_cls=None,
metadata_registry_cls=None,
standalone_mode=False,
logger=logging.getLogger("test"),
)
temp_path = tmp_path / "temp.jpg"
def create_temp_path(suffix=".jpg"):
temp_path.write_bytes(b"")
return str(temp_path)
monkeypatch.setattr(service, "_create_temp_path", create_temp_path)
with pytest.raises(RecipeDownloadError):
await service.analyze_remote_image(
url="https://example.com/image.jpg",
recipe_scanner=SimpleNamespace(),
civitai_client=SimpleNamespace(),
)
assert not temp_path.exists(), "temporary file should be cleaned after failure"
@pytest.mark.asyncio
async def test_analyze_local_image_missing_file(tmp_path):
async def downloader_factory():
return SimpleNamespace()
service = RecipeAnalysisService(
exif_utils=DummyExifUtils(),
recipe_parser_factory=SimpleNamespace(create_parser=lambda metadata: None),
downloader_factory=downloader_factory,
metadata_collector=None,
metadata_processor_cls=None,
metadata_registry_cls=None,
standalone_mode=False,
logger=logging.getLogger("test"),
)
with pytest.raises(RecipeNotFoundError):
await service.analyze_local_image(
file_path=str(tmp_path / "missing.png"),
recipe_scanner=SimpleNamespace(),
)
@pytest.mark.asyncio
async def test_save_recipe_reports_duplicates(tmp_path):
exif_utils = DummyExifUtils()
class DummyCache:
def __init__(self):
self.raw_data = []
async def resort(self):
pass
class DummyScanner:
def __init__(self, root):
self.recipes_dir = str(root)
self._cache = DummyCache()
self.last_fingerprint = None
async def find_recipes_by_fingerprint(self, fingerprint):
self.last_fingerprint = fingerprint
return ["existing"]
async def add_recipe(self, recipe_data):
self._cache.raw_data.append(recipe_data)
await self._cache.resort()
scanner = DummyScanner(tmp_path)
service = RecipePersistenceService(
exif_utils=exif_utils,
card_preview_width=512,
logger=logging.getLogger("test"),
)
metadata = {
"base_model": "sd",
"loras": [
{
"file_name": "sample",
"hash": "abc123",
"weight": 0.5,
"id": 1,
"name": "Sample",
"version": "v1",
"isDeleted": False,
"exclude": False,
}
],
}
result = await service.save_recipe(
recipe_scanner=scanner,
image_bytes=b"image-bytes",
image_base64=None,
name="My Recipe",
tags=["tag"],
metadata=metadata,
)
assert result.payload["matching_recipes"] == ["existing"]
assert scanner.last_fingerprint is not None
assert os.path.exists(result.payload["json_path"])
assert scanner._cache.raw_data
stored = json.loads(Path(result.payload["json_path"]).read_text())
expected_image_path = os.path.normpath(result.payload["image_path"])
assert stored["file_path"] == expected_image_path
assert service._exif_utils.appended[0] == expected_image_path
@pytest.mark.asyncio
async def test_save_recipe_persists_checkpoint_metadata(tmp_path):
exif_utils = DummyExifUtils()
class DummyScanner:
def __init__(self, root):
self.recipes_dir = str(root)
async def find_recipes_by_fingerprint(self, fingerprint):
return []
async def add_recipe(self, recipe_data):
return None
scanner = DummyScanner(tmp_path)
service = RecipePersistenceService(
exif_utils=exif_utils,
card_preview_width=512,
logger=logging.getLogger("test"),
)
checkpoint_meta = {
"type": "checkpoint",
"modelId": 10,
"modelVersionId": 20,
"modelName": "Flux",
"modelVersionName": "Dev",
}
metadata = {
"base_model": "Flux",
"loras": [],
"checkpoint": checkpoint_meta,
}
result = await service.save_recipe(
recipe_scanner=scanner,
image_bytes=b"img",
image_base64=None,
name="Checkpointed",
tags=[],
metadata=metadata,
)
stored = json.loads(Path(result.payload["json_path"]).read_text())
assert stored["checkpoint"] == checkpoint_meta
assert "checkpoint" not in stored["gen_params"]
@pytest.mark.asyncio
async def test_save_recipe_promotes_checkpoint_from_gen_params(tmp_path):
exif_utils = DummyExifUtils()
class DummyScanner:
def __init__(self, root):
self.recipes_dir = str(root)
async def find_recipes_by_fingerprint(self, fingerprint):
return []
async def add_recipe(self, recipe_data):
return None
scanner = DummyScanner(tmp_path)
service = RecipePersistenceService(
exif_utils=exif_utils,
card_preview_width=512,
logger=logging.getLogger("test"),
)
checkpoint_meta = {
"type": "checkpoint",
"modelId": 10,
"modelVersionId": 20,
"modelName": "Flux",
"modelVersionName": "Dev",
}
metadata = {
"base_model": "Flux",
"loras": [],
"gen_params": {
"checkpoint": checkpoint_meta,
},
}
result = await service.save_recipe(
recipe_scanner=scanner,
image_bytes=b"img",
image_base64=None,
name="Checkpointed",
tags=[],
metadata=metadata,
)
stored = json.loads(Path(result.payload["json_path"]).read_text())
assert stored["checkpoint"] == checkpoint_meta
assert "checkpoint" not in stored["gen_params"]
@pytest.mark.asyncio
async def test_save_recipe_strips_non_persistable_gen_params(tmp_path):
exif_utils = DummyExifUtils()
class DummyScanner:
def __init__(self, root):
self.recipes_dir = str(root)
async def find_recipes_by_fingerprint(self, fingerprint):
return []
async def add_recipe(self, recipe_data):
return None
scanner = DummyScanner(tmp_path)
service = RecipePersistenceService(
exif_utils=exif_utils,
card_preview_width=512,
logger=logging.getLogger("test"),
)
metadata = {
"base_model": "Flux",
"loras": [],
"gen_params": {
"prompt": "hello world",
"negative_prompt": "bad hands",
"cfg_scale": 7,
"raw_metadata": {"prompt": "should not persist"},
"Version": "ComfyUI",
"RNG": "cpu",
"Schedule type": "karras",
"Discard penultimate sigma": True,
"eps_scaling_factor": 0.1,
},
}
result = await service.save_recipe(
recipe_scanner=scanner,
image_bytes=b"img",
image_base64=None,
name="Sanitized",
tags=[],
metadata=metadata,
)
stored = json.loads(Path(result.payload["json_path"]).read_text())
assert stored["gen_params"] == {
"prompt": "hello world",
"negative_prompt": "bad hands",
"cfg_scale": 7,
}
@pytest.mark.asyncio
async def test_save_recipe_derives_allowed_fields_from_raw_metadata(tmp_path):
exif_utils = DummyExifUtils()
class DummyScanner:
def __init__(self, root):
self.recipes_dir = str(root)
async def find_recipes_by_fingerprint(self, fingerprint):
return []
async def add_recipe(self, recipe_data):
return None
scanner = DummyScanner(tmp_path)
service = RecipePersistenceService(
exif_utils=exif_utils,
card_preview_width=512,
logger=logging.getLogger("test"),
)
metadata = {
"base_model": "Flux",
"loras": [],
"raw_metadata": {
"prompt": "hello world",
"negative_prompt": "bad hands",
"steps": 30,
"sampler": "Euler",
"cfg_scale": 7,
"seed": 123,
"size": "1024x1024",
"clip_skip": 2,
"Version": "ComfyUI",
"raw_metadata": {"nested": True},
},
}
result = await service.save_recipe(
recipe_scanner=scanner,
image_bytes=b"img",
image_base64=None,
name="Derived",
tags=[],
metadata=metadata,
)
stored = json.loads(Path(result.payload["json_path"]).read_text())
assert stored["gen_params"] == {
"prompt": "hello world",
"negative_prompt": "bad hands",
"steps": 30,
"sampler": "Euler",
"cfg_scale": 7,
"seed": 123,
"size": "1024x1024",
"clip_skip": 2,
}
@pytest.mark.asyncio
async def test_save_recipe_preserves_workflow_when_png_is_converted_to_webp(tmp_path):
class DummyScanner:
def __init__(self, root):
self.recipes_dir = str(root)
async def find_recipes_by_fingerprint(self, fingerprint):
return []
async def add_recipe(self, recipe_data):
return None
png_info = PngImagePlugin.PngInfo()
png_info.add_text("parameters", "prompt text\nSteps: 20")
png_info.add_text("workflow", '{"nodes":[{"id":1}]}')
image_buffer = BytesIO()
Image.new("RGB", (96, 48), color="purple").save(
image_buffer, format="PNG", pnginfo=png_info
)
service = RecipePersistenceService(
exif_utils=ExifUtils,
card_preview_width=64,
logger=logging.getLogger("test"),
)
result = await service.save_recipe(
recipe_scanner=DummyScanner(tmp_path),
image_bytes=image_buffer.getvalue(),
image_base64=None,
name="Workflow Recipe",
tags=["workflow"],
metadata={"base_model": "sd", "loras": []},
extension=".png",
)
image_path = Path(result.payload["image_path"])
exif_dict = piexif.load(str(image_path))
assert (
exif_dict["0th"][piexif.ImageIFD.ImageDescription].decode("utf-8")
== 'Workflow:{"nodes":[{"id":1}]}'
)
user_comment = exif_dict["Exif"][piexif.ExifIFD.UserComment]
decoded_comment = user_comment[8:].decode("utf-16be")
assert "prompt text" in decoded_comment
assert "Recipe metadata:" in decoded_comment
@pytest.mark.asyncio
async def test_save_recipe_strips_checkpoint_local_fields(tmp_path):
exif_utils = DummyExifUtils()
class DummyScanner:
def __init__(self, root):
self.recipes_dir = str(root)
async def find_recipes_by_fingerprint(self, fingerprint):
return []
async def add_recipe(self, recipe_data):
return None
scanner = DummyScanner(tmp_path)
service = RecipePersistenceService(
exif_utils=exif_utils,
card_preview_width=512,
logger=logging.getLogger("test"),
)
checkpoint_meta = {
"type": "checkpoint",
"modelId": 10,
"modelVersionId": 20,
"modelName": "Flux",
"modelVersionName": "Dev",
"existsLocally": False,
"localPath": "/tmp/foo",
"thumbnailUrl": "http://example.com",
"size": 123,
"downloadUrl": "http://example.com/dl",
}
metadata = {
"base_model": "Flux",
"loras": [],
"checkpoint": checkpoint_meta,
}
result = await service.save_recipe(
recipe_scanner=scanner,
image_bytes=b"img",
image_base64=None,
name="Checkpointed",
tags=[],
metadata=metadata,
)
stored = json.loads(Path(result.payload["json_path"]).read_text())
assert stored["checkpoint"] == {
"type": "checkpoint",
"modelId": 10,
"modelVersionId": 20,
"modelName": "Flux",
"modelVersionName": "Dev",
}
@pytest.mark.asyncio
async def test_save_recipe_from_widget_allows_empty_lora(tmp_path):
exif_utils = DummyExifUtils()
class DummyScanner:
def __init__(self, root):
self.recipes_dir = str(root)
self.added = []
async def get_local_lora(self, name): # pragma: no cover - no lookups expected
return None
async def get_local_checkpoint(self, name):
return None
async def add_recipe(self, recipe_data):
self.added.append(recipe_data)
scanner = DummyScanner(tmp_path)
service = RecipePersistenceService(
exif_utils=exif_utils,
card_preview_width=512,
logger=logging.getLogger("test"),
)
metadata = {
"loras": "", # no matches present in the stack
"checkpoint": "base-model.safetensors",
"prompt": "a calm scene",
"negative_prompt": "",
}
result = await service.save_recipe_from_widget(
recipe_scanner=scanner,
metadata=metadata,
image_bytes=b"image-bytes",
)
stored = json.loads(Path(result.payload["json_path"]).read_text())
assert stored["loras"] == []
assert stored["title"] == "recipe"
assert stored["checkpoint"] == {
"type": "checkpoint",
"name": "base-model.safetensors",
"file_name": "base-model",
"hash": "",
}
assert scanner.added and scanner.added[0]["loras"] == []
@pytest.mark.asyncio
async def test_save_recipe_from_widget_enriches_checkpoint_from_local_cache(tmp_path):
exif_utils = DummyExifUtils()
class DummyScanner:
def __init__(self, root):
self.recipes_dir = str(root)
self.added = []
self.checkpoint_queries = []
async def get_local_lora(self, name): # pragma: no cover - no loras
return None
async def get_local_checkpoint(self, name):
self.checkpoint_queries.append(name)
if name != "matched-model":
return None
return {
"file_name": "matched-model",
"file_path": "/models/checkpoints/folder/matched-model.safetensors",
"sha256": "ABC123",
"base_model": "Illustrious",
"civitai": {
"id": 456,
"name": "v1.0",
"baseModel": "Illustrious",
"model": {
"id": 123,
"name": "Matched Model",
},
},
}
async def add_recipe(self, recipe_data):
self.added.append(recipe_data)
scanner = DummyScanner(tmp_path)
service = RecipePersistenceService(
exif_utils=exif_utils,
card_preview_width=512,
logger=logging.getLogger("test"),
)
result = await service.save_recipe_from_widget(
recipe_scanner=scanner,
metadata={
"loras": "",
"checkpoint": "folder/matched-model.safetensors",
"prompt": "a calm scene",
},
image_bytes=b"image-bytes",
)
stored = json.loads(Path(result.payload["json_path"]).read_text())
assert scanner.checkpoint_queries == [
"folder/matched-model.safetensors",
"matched-model.safetensors",
"matched-model",
]
assert stored["base_model"] == "Illustrious"
assert stored["checkpoint"] == {
"type": "checkpoint",
"modelId": 123,
"modelVersionId": 456,
"name": "Matched Model",
"version": "v1.0",
"hash": "abc123",
"file_name": "matched-model",
"modelName": "Matched Model",
"modelVersionName": "v1.0",
"baseModel": "Illustrious",
}
@pytest.mark.asyncio
async def test_move_recipe_updates_paths(tmp_path):
exif_utils = DummyExifUtils()
recipes_dir = tmp_path / "recipes"
recipes_dir.mkdir(parents=True, exist_ok=True)
recipe_id = "move-me"
image_path = recipes_dir / f"{recipe_id}.webp"
json_path = recipes_dir / f"{recipe_id}.recipe.json"
image_path.write_bytes(b"img")
json_path.write_text(
json.dumps(
{
"id": recipe_id,
"file_path": str(image_path),
"title": "Recipe",
"loras": [],
"gen_params": {},
"created_date": 0,
"modified": 0,
}
)
)
class MoveScanner:
def __init__(self, root: Path):
self.recipes_dir = str(root)
self.recipe = {
"id": recipe_id,
"file_path": str(image_path),
"title": "Recipe",
"loras": [],
"gen_params": {},
"created_date": 0,
"modified": 0,
"folder": "",
}
async def get_recipe_by_id(self, target_id: str):
return self.recipe if target_id == recipe_id else None
async def get_recipe_json_path(self, target_id: str):
matches = list(Path(self.recipes_dir).rglob(f"{target_id}.recipe.json"))
return str(matches[0]) if matches else None
async def update_recipe_metadata(self, target_id: str, metadata: dict):
if target_id != recipe_id:
return False
self.recipe.update(metadata)
target_path = await self.get_recipe_json_path(target_id)
if not target_path:
return False
existing = json.loads(Path(target_path).read_text())
existing.update(metadata)
Path(target_path).write_text(json.dumps(existing))
return True
async def get_cached_data(self, force_refresh: bool = False): # noqa: ARG002 - signature parity
return SimpleNamespace(raw_data=[self.recipe])
scanner = MoveScanner(recipes_dir)
service = RecipePersistenceService(
exif_utils=exif_utils,
card_preview_width=512,
logger=logging.getLogger("test"),
)
target_folder = recipes_dir / "nested"
result = await service.move_recipe(
recipe_scanner=scanner, recipe_id=recipe_id, target_path=str(target_folder)
)
assert result.payload["folder"] == "nested"
assert Path(result.payload["json_path"]).parent == target_folder
assert Path(result.payload["new_file_path"]).parent == target_folder
assert not json_path.exists()
stored = json.loads(Path(result.payload["json_path"]).read_text())
assert stored["folder"] == "nested"
assert stored["file_path"] == result.payload["new_file_path"]
@pytest.mark.asyncio
async def test_update_recipe_accepts_gen_params() -> None:
class DummyScanner:
def __init__(self):
self.calls = []
async def update_recipe_metadata(self, recipe_id: str, updates: dict[str, object]):
self.calls.append((recipe_id, updates))
return True
scanner = DummyScanner()
service = RecipePersistenceService(
exif_utils=DummyExifUtils(),
card_preview_width=512,
logger=logging.getLogger("test"),
)
updates = {"gen_params": {"prompt": "updated prompt", "steps": 28}}
result = await service.update_recipe(
recipe_scanner=scanner,
recipe_id="recipe-1",
updates=updates,
)
assert result.payload["success"] is True
assert scanner.calls == [("recipe-1", updates)]
@pytest.mark.asyncio
async def test_update_recipe_rejects_non_object_gen_params() -> None:
service = RecipePersistenceService(
exif_utils=DummyExifUtils(),
card_preview_width=512,
logger=logging.getLogger("test"),
)
with pytest.raises(RecipeValidationError, match="gen_params must be an object"):
await service.update_recipe(
recipe_scanner=SimpleNamespace(),
recipe_id="recipe-1",
updates={"gen_params": "invalid"},
)
@pytest.mark.asyncio
async def test_analyze_remote_video(tmp_path):
exif_utils = DummyExifUtils()
class DummyFactory:
def create_parser(self, metadata):
async def parse_metadata(m, recipe_scanner=None, civitai_client=None):
return {"loras": []}
return SimpleNamespace(parse_metadata=parse_metadata)
async def downloader_factory():
class Downloader:
async def download_file(self, url, path, use_auth=False):
Path(path).write_bytes(b"video-content")
return True, "success"
return Downloader()
service = RecipeAnalysisService(
exif_utils=exif_utils,
recipe_parser_factory=DummyFactory(),
downloader_factory=downloader_factory,
metadata_collector=None,
metadata_processor_cls=None,
metadata_registry_cls=None,
standalone_mode=False,
logger=logging.getLogger("test"),
)
class DummyClient:
async def get_image_info(self, image_id, source_url=None):
return {
"url": "https://civitai.com/video.mp4",
"type": "video",
"meta": {"prompt": "video prompt"},
}
class DummyScanner:
async def find_recipes_by_fingerprint(self, fingerprint):
return []
result = await service.analyze_remote_image(
url="https://civitai.com/images/123",
recipe_scanner=DummyScanner(),
civitai_client=DummyClient(),
)
assert result.payload["is_video"] is True
assert result.payload["extension"] == ".mp4"
assert result.payload["image_base64"] is not None
@pytest.mark.asyncio
async def test_analyze_remote_image_supports_civitai_red():
exif_utils = DummyExifUtils()
class DummyFactory:
def create_parser(self, metadata):
async def parse_metadata(m, recipe_scanner=None, civitai_client=None):
return {"loras": [], "gen_params": {"prompt": "red prompt"}}
return SimpleNamespace(parse_metadata=parse_metadata)
async def downloader_factory():
class Downloader:
async def download_file(self, url, path, use_auth=False):
Path(path).write_bytes(b"fake-image")
return True, "success"
return Downloader()
service = RecipeAnalysisService(
exif_utils=exif_utils,
recipe_parser_factory=DummyFactory(),
downloader_factory=downloader_factory,
metadata_collector=None,
metadata_processor_cls=None,
metadata_registry_cls=None,
standalone_mode=False,
logger=logging.getLogger("test"),
)
class DummyClient:
def __init__(self):
self.calls = []
async def get_image_info(self, image_id, source_url=None):
self.calls.append((image_id, source_url))
return {
"url": "https://image.civitai.com/x/y/original=true/sample.jpeg",
"type": "image",
"meta": {"prompt": "red prompt"},
}
class DummyScanner:
async def find_recipes_by_fingerprint(self, fingerprint):
return []
client = DummyClient()
result = await service.analyze_remote_image(
url="https://civitai.red/images/123",
recipe_scanner=DummyScanner(),
civitai_client=client,
)
assert client.calls == [("123", "https://civitai.red/images/123")]
assert result.payload["loras"] == []