perf(recipe_scanner): eliminate event loop blocking during cache rebuild

Refactor force_refresh path to use thread pool execution instead of blocking
the event loop shared with ComfyUI. Key changes:

- Fix 1: Route force_refresh through _initialize_recipe_cache_sync() in thread pool
- Fix 2: Add GIL release points (time.sleep(0)) every 100 files in sync loops
- Fix 3: Move RecipeCache.resort() to thread pool via run_in_executor
- Fix 4: Persist cache automatically after force_refresh
- Fix 5: Increase yield frequency in _enrich_cache_metadata (every recipe)

This eliminates the ~5-minute freeze when rebuilding a 30K-recipe cache.

Fixes a performance issue where ComfyUI became unresponsive during recipe
scanning because the scan blocked the event loop shared with ComfyUI.
This commit is contained in:
Will Miao
2026-03-04 15:10:46 +08:00
parent 1ed503a6b5
commit 4d8113464c
3 changed files with 982 additions and 634 deletions

View File

@@ -4,6 +4,7 @@ from dataclasses import dataclass
from operator import itemgetter
from natsort import natsorted
@dataclass
class RecipeCache:
"""Cache structure for Recipe data"""
@@ -21,11 +22,18 @@ class RecipeCache:
self.folder_tree = self.folder_tree or {}
async def resort(self, name_only: bool = False):
"""Resort all cached data views"""
"""Resort all cached data views in a thread pool to avoid blocking the event loop."""
async with self._lock:
self._resort_locked(name_only=name_only)
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
self._resort_locked,
name_only,
)
async def update_recipe_metadata(self, recipe_id: str, metadata: Dict, *, resort: bool = True) -> bool:
async def update_recipe_metadata(
self, recipe_id: str, metadata: Dict, *, resort: bool = True
) -> bool:
"""Update metadata for a specific recipe in all cached data
Args:
@@ -37,7 +45,7 @@ class RecipeCache:
"""
async with self._lock:
for item in self.raw_data:
if str(item.get('id')) == str(recipe_id):
if str(item.get("id")) == str(recipe_id):
item.update(metadata)
if resort:
self._resort_locked()
@@ -52,7 +60,9 @@ class RecipeCache:
if resort:
self._resort_locked()
async def remove_recipe(self, recipe_id: str, *, resort: bool = False) -> Optional[Dict]:
async def remove_recipe(
self, recipe_id: str, *, resort: bool = False
) -> Optional[Dict]:
"""Remove a recipe from the cache by ID.
Args:
@@ -64,14 +74,16 @@ class RecipeCache:
async with self._lock:
for index, recipe in enumerate(self.raw_data):
if str(recipe.get('id')) == str(recipe_id):
if str(recipe.get("id")) == str(recipe_id):
removed = self.raw_data.pop(index)
if resort:
self._resort_locked()
return removed
return None
async def bulk_remove(self, recipe_ids: Iterable[str], *, resort: bool = False) -> List[Dict]:
async def bulk_remove(
self, recipe_ids: Iterable[str], *, resort: bool = False
) -> List[Dict]:
"""Remove multiple recipes from the cache."""
id_set = {str(recipe_id) for recipe_id in recipe_ids}
@@ -79,21 +91,25 @@ class RecipeCache:
return []
async with self._lock:
removed = [item for item in self.raw_data if str(item.get('id')) in id_set]
removed = [item for item in self.raw_data if str(item.get("id")) in id_set]
if not removed:
return []
self.raw_data = [item for item in self.raw_data if str(item.get('id')) not in id_set]
self.raw_data = [
item for item in self.raw_data if str(item.get("id")) not in id_set
]
if resort:
self._resort_locked()
return removed
async def replace_recipe(self, recipe_id: str, new_data: Dict, *, resort: bool = False) -> bool:
async def replace_recipe(
self, recipe_id: str, new_data: Dict, *, resort: bool = False
) -> bool:
"""Replace cached data for a recipe."""
async with self._lock:
for index, recipe in enumerate(self.raw_data):
if str(recipe.get('id')) == str(recipe_id):
if str(recipe.get("id")) == str(recipe_id):
self.raw_data[index] = new_data
if resort:
self._resort_locked()
@@ -105,7 +121,7 @@ class RecipeCache:
async with self._lock:
for recipe in self.raw_data:
if str(recipe.get('id')) == str(recipe_id):
if str(recipe.get("id")) == str(recipe_id):
return dict(recipe)
return None
@@ -115,16 +131,13 @@ class RecipeCache:
async with self._lock:
return [dict(item) for item in self.raw_data]
def _resort_locked(self, *, name_only: bool = False) -> None:
def _resort_locked(self, name_only: bool = False) -> None:
"""Sort cached views. Caller must hold ``_lock``."""
self.sorted_by_name = natsorted(
self.raw_data,
key=lambda x: x.get('title', '').lower()
self.raw_data, key=lambda x: x.get("title", "").lower()
)
if not name_only:
self.sorted_by_date = sorted(
self.raw_data,
key=itemgetter('created_date', 'file_path'),
reverse=True
)
self.raw_data, key=itemgetter("created_date", "file_path"), reverse=True
)

File diff suppressed because it is too large Load Diff

View File

@@ -60,11 +60,13 @@ class StubLoraScanner:
"preview_url": info.get("preview_url", ""),
"civitai": info.get("civitai", {}),
}
self._cache.raw_data.append({
"sha256": info.get("sha256", ""),
"path": info.get("file_path", ""),
"civitai": info.get("civitai", {}),
})
self._cache.raw_data.append(
{
"sha256": info.get("sha256", ""),
"path": info.get("file_path", ""),
"civitai": info.get("civitai", {}),
}
)
@pytest.fixture
@@ -107,7 +109,8 @@ async def test_add_recipe_during_concurrent_reads(recipe_scanner):
await asyncio.sleep(0)
await asyncio.gather(reader_task(), reader_task(), scanner.add_recipe(new_recipe))
await asyncio.sleep(0)
# Wait a bit longer for the thread-pool resort to complete
await asyncio.sleep(0.1)
cache = await scanner.get_cached_data()
assert {item["id"] for item in cache.raw_data} == {"one", "two"}
@@ -119,14 +122,16 @@ async def test_remove_recipe_during_reads(recipe_scanner):
recipe_ids = ["alpha", "beta", "gamma"]
for index, recipe_id in enumerate(recipe_ids):
await scanner.add_recipe({
"id": recipe_id,
"file_path": f"path/{recipe_id}.png",
"title": recipe_id,
"modified": float(index),
"created_date": float(index),
"loras": [],
})
await scanner.add_recipe(
{
"id": recipe_id,
"file_path": f"path/{recipe_id}.png",
"title": recipe_id,
"modified": float(index),
"created_date": float(index),
"loras": [],
}
)
async def reader_task():
for _ in range(5):
@@ -155,7 +160,13 @@ async def test_update_lora_entry_updates_cache_and_file(tmp_path: Path, recipe_s
"modified": 0.0,
"created_date": 0.0,
"loras": [
{"file_name": "old", "strength": 1.0, "hash": "", "isDeleted": True, "exclude": True},
{
"file_name": "old",
"strength": 1.0,
"hash": "",
"isDeleted": True,
"exclude": True,
},
],
}
recipe_path.write_text(json.dumps(recipe_data))
@@ -380,7 +391,9 @@ async def test_initialize_waits_for_lora_scanner(monkeypatch):
@pytest.mark.asyncio
async def test_invalid_model_version_marked_deleted_and_not_retried(monkeypatch, recipe_scanner):
async def test_invalid_model_version_marked_deleted_and_not_retried(
monkeypatch, recipe_scanner
):
scanner, _ = recipe_scanner
recipes_dir = Path(config.loras_roots[0]) / "recipes"
recipes_dir.mkdir(parents=True, exist_ok=True)
@@ -417,7 +430,9 @@ async def test_invalid_model_version_marked_deleted_and_not_retried(monkeypatch,
@pytest.mark.asyncio
async def test_load_recipe_persists_deleted_flag_on_invalid_version(monkeypatch, recipe_scanner, tmp_path):
async def test_load_recipe_persists_deleted_flag_on_invalid_version(
monkeypatch, recipe_scanner, tmp_path
):
scanner, _ = recipe_scanner
recipes_dir = Path(config.loras_roots[0]) / "recipes"
recipes_dir.mkdir(parents=True, exist_ok=True)
@@ -448,7 +463,9 @@ async def test_load_recipe_persists_deleted_flag_on_invalid_version(monkeypatch,
@pytest.mark.asyncio
async def test_update_lora_filename_by_hash_updates_affected_recipes(tmp_path: Path, recipe_scanner):
async def test_update_lora_filename_by_hash_updates_affected_recipes(
tmp_path: Path, recipe_scanner
):
scanner, _ = recipe_scanner
recipes_dir = Path(config.loras_roots[0]) / "recipes"
recipes_dir.mkdir(parents=True, exist_ok=True)
@@ -464,7 +481,7 @@ async def test_update_lora_filename_by_hash_updates_affected_recipes(tmp_path: P
"created_date": 0.0,
"loras": [
{"file_name": "old_name", "hash": "hash1"},
{"file_name": "other_lora", "hash": "hash2"}
{"file_name": "other_lora", "hash": "hash2"},
],
}
recipe1_path.write_text(json.dumps(recipe1_data))
@@ -479,16 +496,16 @@ async def test_update_lora_filename_by_hash_updates_affected_recipes(tmp_path: P
"title": "Recipe 2",
"modified": 0.0,
"created_date": 0.0,
"loras": [
{"file_name": "other_lora", "hash": "hash2"}
],
"loras": [{"file_name": "other_lora", "hash": "hash2"}],
}
recipe2_path.write_text(json.dumps(recipe2_data))
await scanner.add_recipe(dict(recipe2_data))
# Update LoRA name for "hash1" (using different case to test normalization)
new_name = "new_name"
file_count, cache_count = await scanner.update_lora_filename_by_hash("HASH1", new_name)
file_count, cache_count = await scanner.update_lora_filename_by_hash(
"HASH1", new_name
)
assert file_count == 1
assert cache_count == 1
@@ -510,92 +527,100 @@ async def test_update_lora_filename_by_hash_updates_affected_recipes(tmp_path: P
@pytest.mark.asyncio
async def test_get_paginated_data_filters_by_favorite(recipe_scanner):
scanner, _ = recipe_scanner
# Add a normal recipe
await scanner.add_recipe({
"id": "regular",
"file_path": "path/regular.png",
"title": "Regular Recipe",
"modified": 1.0,
"created_date": 1.0,
"loras": [],
})
await scanner.add_recipe(
{
"id": "regular",
"file_path": "path/regular.png",
"title": "Regular Recipe",
"modified": 1.0,
"created_date": 1.0,
"loras": [],
}
)
# Add a favorite recipe
await scanner.add_recipe({
"id": "favorite",
"file_path": "path/favorite.png",
"title": "Favorite Recipe",
"modified": 2.0,
"created_date": 2.0,
"loras": [],
"favorite": True
})
await scanner.add_recipe(
{
"id": "favorite",
"file_path": "path/favorite.png",
"title": "Favorite Recipe",
"modified": 2.0,
"created_date": 2.0,
"loras": [],
"favorite": True,
}
)
# Wait for cache update (it's async in some places, add_recipe is usually enough but let's be safe)
await asyncio.sleep(0)
# Test without filter (should return both)
result_all = await scanner.get_paginated_data(page=1, page_size=10)
assert len(result_all["items"]) == 2
# Test with favorite filter
result_fav = await scanner.get_paginated_data(page=1, page_size=10, filters={"favorite": True})
result_fav = await scanner.get_paginated_data(
page=1, page_size=10, filters={"favorite": True}
)
assert len(result_fav["items"]) == 1
assert result_fav["items"][0]["id"] == "favorite"
# Test with favorite filter set to False (should return both or at least not filter if it's the default)
# Actually our implementation checks if 'favorite' in filters and filters['favorite']
result_fav_false = await scanner.get_paginated_data(page=1, page_size=10, filters={"favorite": False})
result_fav_false = await scanner.get_paginated_data(
page=1, page_size=10, filters={"favorite": False}
)
assert len(result_fav_false["items"]) == 2
@pytest.mark.asyncio
async def test_get_paginated_data_filters_by_prompt(recipe_scanner):
scanner, _ = recipe_scanner
# Add a recipe with a specific prompt
await scanner.add_recipe({
"id": "prompt-recipe",
"file_path": "path/prompt.png",
"title": "Prompt Recipe",
"modified": 1.0,
"created_date": 1.0,
"loras": [],
"gen_params": {
"prompt": "a beautiful forest landscape"
await scanner.add_recipe(
{
"id": "prompt-recipe",
"file_path": "path/prompt.png",
"title": "Prompt Recipe",
"modified": 1.0,
"created_date": 1.0,
"loras": [],
"gen_params": {"prompt": "a beautiful forest landscape"},
}
})
)
# Add a recipe with a specific negative prompt
await scanner.add_recipe({
"id": "neg-prompt-recipe",
"file_path": "path/neg.png",
"title": "Negative Prompt Recipe",
"modified": 2.0,
"created_date": 2.0,
"loras": [],
"gen_params": {
"negative_prompt": "ugly, blurry mountains"
await scanner.add_recipe(
{
"id": "neg-prompt-recipe",
"file_path": "path/neg.png",
"title": "Negative Prompt Recipe",
"modified": 2.0,
"created_date": 2.0,
"loras": [],
"gen_params": {"negative_prompt": "ugly, blurry mountains"},
}
})
)
await asyncio.sleep(0)
# Test search in prompt
result_prompt = await scanner.get_paginated_data(
page=1, page_size=10, search="forest", search_options={"prompt": True}
)
assert len(result_prompt["items"]) == 1
assert result_prompt["items"][0]["id"] == "prompt-recipe"
# Test search in negative prompt
result_neg = await scanner.get_paginated_data(
page=1, page_size=10, search="mountains", search_options={"prompt": True}
)
assert len(result_neg["items"]) == 1
assert result_neg["items"][0]["id"] == "neg-prompt-recipe"
# Test search disabled (should not find by prompt)
result_disabled = await scanner.get_paginated_data(
page=1, page_size=10, search="forest", search_options={"prompt": False}
@@ -606,38 +631,57 @@ async def test_get_paginated_data_filters_by_prompt(recipe_scanner):
@pytest.mark.asyncio
async def test_get_paginated_data_sorting(recipe_scanner):
scanner, _ = recipe_scanner
# Add test recipes
# Recipe A: Name "Alpha", Date 10, LoRAs 2
await scanner.add_recipe({
"id": "A", "title": "Alpha", "created_date": 10.0,
"loras": [{}, {}], "file_path": "a.png"
})
await scanner.add_recipe(
{
"id": "A",
"title": "Alpha",
"created_date": 10.0,
"loras": [{}, {}],
"file_path": "a.png",
}
)
# Recipe B: Name "Beta", Date 20, LoRAs 1
await scanner.add_recipe({
"id": "B", "title": "Beta", "created_date": 20.0,
"loras": [{}], "file_path": "b.png"
})
await scanner.add_recipe(
{
"id": "B",
"title": "Beta",
"created_date": 20.0,
"loras": [{}],
"file_path": "b.png",
}
)
# Recipe C: Name "Gamma", Date 5, LoRAs 3
await scanner.add_recipe({
"id": "C", "title": "Gamma", "created_date": 5.0,
"loras": [{}, {}, {}], "file_path": "c.png"
})
await scanner.add_recipe(
{
"id": "C",
"title": "Gamma",
"created_date": 5.0,
"loras": [{}, {}, {}],
"file_path": "c.png",
}
)
await asyncio.sleep(0)
# Test Name DESC: Gamma, Beta, Alpha
res = await scanner.get_paginated_data(page=1, page_size=10, sort_by="name:desc")
assert [i["id"] for i in res["items"]] == ["C", "B", "A"]
# Test LoRA Count DESC: Gamma (3), Alpha (2), Beta (1)
res = await scanner.get_paginated_data(page=1, page_size=10, sort_by="loras_count:desc")
res = await scanner.get_paginated_data(
page=1, page_size=10, sort_by="loras_count:desc"
)
assert [i["id"] for i in res["items"]] == ["C", "A", "B"]
# Test LoRA Count ASC: Beta (1), Alpha (2), Gamma (3)
res = await scanner.get_paginated_data(page=1, page_size=10, sort_by="loras_count:asc")
res = await scanner.get_paginated_data(
page=1, page_size=10, sort_by="loras_count:asc"
)
assert [i["id"] for i in res["items"]] == ["B", "A", "C"]
# Test Date ASC: Gamma (5), Alpha (10), Beta (20)
res = await scanner.get_paginated_data(page=1, page_size=10, sort_by="date:asc")
assert [i["id"] for i in res["items"]] == ["C", "A", "B"]