mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-21 21:22:11 -03:00
180 lines
5.5 KiB
Python
180 lines
5.5 KiB
Python
import asyncio
|
|
import copy
|
|
import json
|
|
from types import SimpleNamespace
|
|
|
|
import pytest
|
|
|
|
from py.utils import example_images_migration as migration_module
|
|
|
|
|
|
class FakeScanner:
|
|
def __init__(self, data_map, init_cycles=0):
|
|
self._data_map = data_map
|
|
self._init_cycles = init_cycles
|
|
self.update_calls = []
|
|
|
|
def is_initializing(self):
|
|
if self._init_cycles > 0:
|
|
self._init_cycles -= 1
|
|
return True
|
|
return False
|
|
|
|
def has_hash(self, hash_value):
|
|
return hash_value in self._data_map
|
|
|
|
async def get_cached_data(self):
|
|
payload = [copy.deepcopy(item) for item in self._data_map.values()]
|
|
return SimpleNamespace(raw_data=payload)
|
|
|
|
async def update_single_model_cache(self, *args):
|
|
self.update_calls.append(args)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_migrations_run_and_update_progress(tmp_path, monkeypatch):
|
|
example_root = tmp_path / "example_images"
|
|
library_root = example_root / "main"
|
|
missing_library = tmp_path / "missing"
|
|
library_root.mkdir(parents=True)
|
|
|
|
progress_path = library_root / ".download_progress.json"
|
|
progress_path.write_text(json.dumps({"naming_version": 0, "keep": "value"}))
|
|
|
|
migrating_hash = "a" * 64
|
|
migrating_folder = library_root / migrating_hash
|
|
migrating_folder.mkdir()
|
|
(migrating_folder / "image_1.png").write_bytes(b"one")
|
|
(migrating_folder / "image_2.png").write_bytes(b"two")
|
|
|
|
zero_based_hash = "b" * 64
|
|
zero_based_folder = library_root / zero_based_hash
|
|
zero_based_folder.mkdir()
|
|
(zero_based_folder / "image_0.png").write_bytes(b"zero")
|
|
(zero_based_folder / "image_1.png").write_bytes(b"one")
|
|
|
|
rename_only_hash = "c" * 64
|
|
rename_only_folder = library_root / rename_only_hash
|
|
rename_only_folder.mkdir()
|
|
(rename_only_folder / "image_1.png").write_bytes(b"needs rename")
|
|
|
|
metadata_path = tmp_path / "models" / "model.safetensors"
|
|
metadata_path.parent.mkdir()
|
|
|
|
metadata_entry = {
|
|
"sha256": migrating_hash,
|
|
"file_path": str(metadata_path),
|
|
"civitai": {
|
|
"images": [
|
|
{"url": "", "type": "image", "prompt": "custom"},
|
|
{"url": "https://example.com/remote.png", "type": "image"},
|
|
],
|
|
"customImages": [],
|
|
},
|
|
}
|
|
|
|
lora_scanner = FakeScanner({migrating_hash: metadata_entry}, init_cycles=1)
|
|
checkpoint_scanner = FakeScanner({}, init_cycles=0)
|
|
|
|
async def fake_get_lora_scanner(cls):
|
|
return lora_scanner
|
|
|
|
async def fake_get_checkpoint_scanner(cls):
|
|
return checkpoint_scanner
|
|
|
|
monkeypatch.setattr(
|
|
migration_module.ServiceRegistry,
|
|
"get_lora_scanner",
|
|
classmethod(fake_get_lora_scanner),
|
|
)
|
|
monkeypatch.setattr(
|
|
migration_module.ServiceRegistry,
|
|
"get_checkpoint_scanner",
|
|
classmethod(fake_get_checkpoint_scanner),
|
|
)
|
|
|
|
monkeypatch.setattr(
|
|
migration_module.settings,
|
|
"get",
|
|
lambda key, default=None: str(example_root) if key == "example_images_path" else default,
|
|
)
|
|
|
|
monkeypatch.setattr(
|
|
migration_module,
|
|
"iter_library_roots",
|
|
lambda: [("main", str(library_root)), ("missing", str(missing_library))],
|
|
)
|
|
|
|
saved_metadata = []
|
|
|
|
async def fake_save_metadata(path, metadata):
|
|
saved_metadata.append((path, metadata))
|
|
return True
|
|
|
|
monkeypatch.setattr(
|
|
migration_module.MetadataManager,
|
|
"save_metadata",
|
|
staticmethod(fake_save_metadata),
|
|
)
|
|
|
|
short_ids = iter(["short1234"])
|
|
monkeypatch.setattr(
|
|
migration_module.ExampleImagesProcessor,
|
|
"generate_short_id",
|
|
staticmethod(lambda: next(short_ids)),
|
|
)
|
|
|
|
sleep_calls = []
|
|
|
|
async def fake_sleep(delay):
|
|
sleep_calls.append(delay)
|
|
return None
|
|
|
|
monkeypatch.setattr(migration_module.asyncio, "sleep", fake_sleep)
|
|
|
|
scheduled_tasks = []
|
|
original_create_task = asyncio.create_task
|
|
|
|
def capture_create_task(coro, *args, **kwargs):
|
|
task = original_create_task(coro, *args, **kwargs)
|
|
scheduled_tasks.append(task)
|
|
return task
|
|
|
|
monkeypatch.setattr(migration_module.asyncio, "create_task", capture_create_task)
|
|
|
|
await migration_module.ExampleImagesMigration.check_and_run_migrations()
|
|
|
|
assert len(scheduled_tasks) == 1
|
|
await asyncio.gather(*scheduled_tasks)
|
|
|
|
progress_data = json.loads(progress_path.read_text())
|
|
assert progress_data["naming_version"] == migration_module.CURRENT_NAMING_VERSION
|
|
assert progress_data["keep"] == "value"
|
|
|
|
assert not (migrating_folder / "image_0.png").exists()
|
|
assert (migrating_folder / "custom_short1234.png").exists()
|
|
assert (migrating_folder / "image_1.png").exists()
|
|
|
|
assert (zero_based_folder / "image_0.png").exists()
|
|
assert (zero_based_folder / "image_1.png").exists()
|
|
|
|
assert (rename_only_folder / "image_0.png").exists()
|
|
assert not (rename_only_folder / "image_1.png").exists()
|
|
|
|
assert not missing_library.exists()
|
|
|
|
assert any(delay == 1 for delay in sleep_calls)
|
|
|
|
assert len(saved_metadata) == 1
|
|
saved_path, saved_payload = saved_metadata[0]
|
|
assert saved_path == str(metadata_path)
|
|
assert saved_payload["civitai"]["customImages"][0]["id"] == "short1234"
|
|
assert saved_payload["civitai"]["images"] == [
|
|
{"url": "https://example.com/remote.png", "type": "image"}
|
|
]
|
|
|
|
assert len(lora_scanner.update_calls) == 1
|
|
update_args = lora_scanner.update_calls[0]
|
|
assert update_args[0] == str(metadata_path)
|
|
assert update_args[2]["civitai"]["customImages"][0]["id"] == "short1234"
|