Merge branch 'main' into fix-symlink

This commit is contained in:
pixelpaws
2026-01-26 17:29:31 +08:00
committed by GitHub
17 changed files with 224282 additions and 100 deletions

View File

@@ -4,6 +4,7 @@ import os
import pytest
from py import config as config_module
from py.utils import cache_paths as cache_paths_module
def _normalize(path: str) -> str:
@@ -28,9 +29,14 @@ def _setup_paths(monkeypatch: pytest.MonkeyPatch, tmp_path):
}
return mapping.get(kind, [])
def fake_get_settings_dir(create: bool = True) -> str:
return str(settings_dir)
monkeypatch.setattr(config_module.folder_paths, "get_folder_paths", fake_get_folder_paths)
monkeypatch.setattr(config_module, "standalone_mode", True)
monkeypatch.setattr(config_module, "get_settings_dir", lambda create=True: str(settings_dir))
monkeypatch.setattr(config_module, "get_settings_dir", fake_get_settings_dir)
# Also patch cache_paths module which has its own import of get_settings_dir
monkeypatch.setattr(cache_paths_module, "get_settings_dir", fake_get_settings_dir)
return loras_dir, settings_dir
@@ -57,7 +63,7 @@ def test_symlink_scan_skips_file_links(monkeypatch: pytest.MonkeyPatch, tmp_path
normalized_file_real = _normalize(os.path.realpath(file_target))
assert normalized_file_real not in cfg._path_mappings
cache_path = settings_dir / "cache" / "symlink_map.json"
cache_path = settings_dir / "cache" / "symlink" / "symlink_map.json"
assert cache_path.exists()
@@ -71,7 +77,7 @@ def test_symlink_cache_reuses_previous_scan(monkeypatch: pytest.MonkeyPatch, tmp
first_cfg = config_module.Config()
cached_mappings = dict(first_cfg._path_mappings)
cache_path = settings_dir / "cache" / "symlink_map.json"
cache_path = settings_dir / "cache" / "symlink" / "symlink_map.json"
assert cache_path.exists()
def fail_scan(self):
@@ -97,7 +103,7 @@ def test_symlink_cache_survives_noise_mtime(monkeypatch: pytest.MonkeyPatch, tmp
noise_file = recipes_dir / "touchme.txt"
first_cfg = config_module.Config()
cache_path = settings_dir / "cache" / "symlink_map.json"
cache_path = settings_dir / "cache" / "symlink" / "symlink_map.json"
assert cache_path.exists()
# Update a noisy path to bump parent directory mtime
@@ -159,9 +165,14 @@ def test_symlink_roots_are_preserved(monkeypatch: pytest.MonkeyPatch, tmp_path):
}
return mapping.get(kind, [])
def fake_get_settings_dir(create: bool = True) -> str:
return str(settings_dir)
monkeypatch.setattr(config_module.folder_paths, "get_folder_paths", fake_get_folder_paths)
monkeypatch.setattr(config_module, "standalone_mode", True)
monkeypatch.setattr(config_module, "get_settings_dir", lambda create=True: str(settings_dir))
monkeypatch.setattr(config_module, "get_settings_dir", fake_get_settings_dir)
# Also patch cache_paths module which has its own import of get_settings_dir
monkeypatch.setattr(cache_paths_module, "get_settings_dir", fake_get_settings_dir)
cfg = config_module.Config()
@@ -169,7 +180,7 @@ def test_symlink_roots_are_preserved(monkeypatch: pytest.MonkeyPatch, tmp_path):
normalized_link = _normalize(str(loras_link))
assert cfg._path_mappings[normalized_real] == normalized_link
cache_path = settings_dir / "cache" / "symlink_map.json"
cache_path = settings_dir / "cache" / "symlink" / "symlink_map.json"
payload = json.loads(cache_path.read_text(encoding="utf-8"))
assert payload["path_mappings"][normalized_real] == normalized_link
@@ -271,3 +282,60 @@ def test_retargeted_deep_symlink_triggers_rescan(monkeypatch: pytest.MonkeyPatch
second_cfg = config_module.Config()
assert _normalize(str(target_v2)) in second_cfg._path_mappings
assert _normalize(str(target_v1)) not in second_cfg._path_mappings
def test_legacy_symlink_cache_automatic_cleanup(monkeypatch: pytest.MonkeyPatch, tmp_path):
    """Test that legacy symlink cache is automatically cleaned up after migration."""
    settings_dir = tmp_path / "settings"
    loras_dir = tmp_path / "loras"
    loras_dir.mkdir()
    checkpoint_dir = tmp_path / "checkpoints"
    checkpoint_dir.mkdir()
    embedding_dir = tmp_path / "embeddings"
    embedding_dir.mkdir()

    def fake_get_folder_paths(kind: str):
        # Minimal stand-in for the host's folder_paths lookup; "unet" is
        # intentionally empty to mirror an installation with no UNet models.
        mapping = {
            "loras": [str(loras_dir)],
            "checkpoints": [str(checkpoint_dir)],
            "unet": [],
            "embeddings": [str(embedding_dir)],
        }
        return mapping.get(kind, [])

    def fake_get_settings_dir(create: bool = True) -> str:
        return str(settings_dir)

    monkeypatch.setattr(config_module.folder_paths, "get_folder_paths", fake_get_folder_paths)
    monkeypatch.setattr(config_module, "standalone_mode", True)
    monkeypatch.setattr(config_module, "get_settings_dir", fake_get_settings_dir)
    # cache_paths has its own binding of get_settings_dir, so patch it too.
    monkeypatch.setattr(cache_paths_module, "get_settings_dir", fake_get_settings_dir)
    # Create legacy symlink cache at old location
    settings_dir.mkdir(parents=True, exist_ok=True)
    legacy_cache_dir = settings_dir / "cache"
    legacy_cache_dir.mkdir(exist_ok=True)
    legacy_cache_path = legacy_cache_dir / "symlink_map.json"
    # Write some legacy cache data
    legacy_data = {
        "fingerprint": {"roots": []},
        "path_mappings": {
            "/legacy/target": "/legacy/link"
        }
    }
    legacy_cache_path.write_text(json.dumps(legacy_data), encoding="utf-8")
    # Verify legacy file exists
    assert legacy_cache_path.exists()
    # Initialize Config - this should trigger migration and automatic cleanup
    cfg = config_module.Config()
    # New canonical cache should exist
    new_cache_path = settings_dir / "cache" / "symlink" / "symlink_map.json"
    assert new_cache_path.exists()
    # Legacy file should be automatically cleaned up
    assert not legacy_cache_path.exists()
    # Config should still work correctly
    assert isinstance(cfg._path_mappings, dict)

View File

@@ -37,7 +37,7 @@ def test_portable_settings_use_project_root(tmp_path, monkeypatch):
monkeypatch.delenv("LORA_MANAGER_CACHE_DB", raising=False)
cache = cache_module.PersistentModelCache(library_name="portable_lib")
expected_cache_path = tmp_path / "model_cache" / "portable_lib.sqlite"
expected_cache_path = tmp_path / "cache" / "model" / "portable_lib.sqlite"
assert cache.get_database_path() == str(expected_cache_path)
assert expected_cache_path.parent.is_dir()

262
tests/test_tag_fts_index.py Normal file
View File

@@ -0,0 +1,262 @@
"""Tests for TagFTSIndex functionality."""
import os
import tempfile
from typing import List
import pytest
from py.services.tag_fts_index import (
TagFTSIndex,
CATEGORY_NAMES,
CATEGORY_NAME_TO_IDS,
)
@pytest.fixture
def temp_db_path():
    """Yield the path of a temporary SQLite database, deleting it afterwards."""
    with tempfile.NamedTemporaryFile(suffix=".sqlite", delete=False) as handle:
        db_file = handle.name
    yield db_file
    # Remove the database along with any SQLite WAL/SHM sidecar files.
    for candidate in (db_file, db_file + "-wal", db_file + "-shm"):
        if os.path.exists(candidate):
            os.unlink(candidate)
@pytest.fixture
def temp_csv_path():
    """Yield the path of a CSV file populated with sample tag data.

    Rows follow the danbooru_e621_merged.csv layout:
    tag_name,category,post_count,aliases
    """
    rows = [
        '1girl,0,6008644,"1girls,sole_female"\n',
        'highres,5,5256195,"high_res,high_resolution,hires"\n',
        'solo,0,5000954,"alone,female_solo,single"\n',
        'hatsune_miku,4,500000,"miku"\n',
        'konpaku_youmu,4,150000,"youmu"\n',
        'artist_request,1,100000,""\n',
        'touhou,3,300000,"touhou_project"\n',
        'mammal,12,3437444,"cetancodont"\n',
        'anthro,7,3381927,"anthropomorphic"\n',
        'hi_res,14,3116617,"high_res"\n',
    ]
    with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False, encoding="utf-8") as handle:
        handle.writelines(rows)
        csv_file = handle.name
    yield csv_file
    # Best-effort removal of the fixture file.
    if os.path.exists(csv_file):
        os.unlink(csv_file)
class TestTagFTSIndexBasic:
    """Basic tests for TagFTSIndex initialization and schema."""

    def test_initialize_creates_tables(self, temp_db_path, temp_csv_path):
        """Test that initialization creates required tables."""
        fts = TagFTSIndex(db_path=temp_db_path, csv_path=temp_csv_path)
        fts.initialize()
        # The private flag is the observable signal that schema setup ran.
        assert fts._schema_initialized is True

    def test_get_database_path(self, temp_db_path, temp_csv_path):
        """Test get_database_path returns correct path."""
        fts = TagFTSIndex(db_path=temp_db_path, csv_path=temp_csv_path)
        assert fts.get_database_path() == temp_db_path

    def test_get_csv_path(self, temp_db_path, temp_csv_path):
        """Test get_csv_path returns correct path."""
        fts = TagFTSIndex(db_path=temp_db_path, csv_path=temp_csv_path)
        assert fts.get_csv_path() == temp_csv_path

    def test_is_ready_initially_false(self, temp_db_path, temp_csv_path):
        """Test that is_ready returns False before building index."""
        fts = TagFTSIndex(db_path=temp_db_path, csv_path=temp_csv_path)
        assert fts.is_ready() is False
class TestTagFTSIndexBuild:
    """Tests for building the FTS index."""

    def test_build_index_from_csv(self, temp_db_path, temp_csv_path):
        """Test building index from CSV file."""
        fts = TagFTSIndex(db_path=temp_db_path, csv_path=temp_csv_path)
        fts.build_index()
        assert fts.is_ready() is True
        # The temp_csv_path fixture writes exactly 10 rows.
        assert fts.get_indexed_count() == 10

    def test_build_index_nonexistent_csv(self, temp_db_path):
        """Test that build_index handles missing CSV gracefully."""
        fts = TagFTSIndex(db_path=temp_db_path, csv_path="/nonexistent/path.csv")
        # Must not raise even though the CSV cannot be read.
        fts.build_index()
        assert fts.is_ready() is False
        assert fts.get_indexed_count() == 0

    def test_ensure_ready_builds_index(self, temp_db_path, temp_csv_path):
        """Test that ensure_ready builds index if not ready."""
        fts = TagFTSIndex(db_path=temp_db_path, csv_path=temp_csv_path)
        # Initially not ready
        assert fts.is_ready() is False
        # ensure_ready should build the index
        result = fts.ensure_ready()
        assert result is True
        assert fts.is_ready() is True
class TestTagFTSIndexSearch:
    """Tests for searching the FTS index."""

    @pytest.fixture
    def populated_fts(self, temp_db_path, temp_csv_path):
        """Create a populated FTS index."""
        fts = TagFTSIndex(db_path=temp_db_path, csv_path=temp_csv_path)
        fts.build_index()
        return fts

    def test_search_basic(self, populated_fts):
        """Test basic search functionality."""
        results = populated_fts.search("1girl")
        assert len(results) >= 1
        assert any(r["tag_name"] == "1girl" for r in results)

    def test_search_prefix(self, populated_fts):
        """Test prefix matching."""
        results = populated_fts.search("hatsu")
        assert len(results) >= 1
        assert any(r["tag_name"] == "hatsune_miku" for r in results)

    def test_search_returns_enriched_results(self, populated_fts):
        """Test that search returns enriched results with category and post_count."""
        # "miku" only exists as an alias of hatsune_miku in the fixture CSV,
        # so matching it proves aliases are searchable.
        results = populated_fts.search("miku")
        assert len(results) >= 1
        result = results[0]
        assert "tag_name" in result
        assert "category" in result
        assert "post_count" in result
        assert result["tag_name"] == "hatsune_miku"
        assert result["category"] == 4  # Character category
        assert result["post_count"] == 500000

    def test_search_with_category_filter(self, populated_fts):
        """Test searching with category filter."""
        # Search for character tags only (categories 4 and 11)
        results = populated_fts.search("konpaku", categories=[4, 11])
        assert len(results) >= 1
        assert all(r["category"] in [4, 11] for r in results)

    def test_search_with_category_filter_excludes_others(self, populated_fts):
        """Test that category filter excludes other categories."""
        # Search for "hi" but only in general category
        results = populated_fts.search("hi", categories=[0, 7])
        # Should not include "highres" (meta category 5) or "hi_res" (meta category 14)
        assert all(r["category"] in [0, 7] for r in results)

    def test_search_empty_query_returns_empty(self, populated_fts):
        """Test that empty query returns empty results."""
        results = populated_fts.search("")
        assert results == []

    def test_search_no_matches_returns_empty(self, populated_fts):
        """Test that query with no matches returns empty results."""
        results = populated_fts.search("zzzznonexistent")
        assert results == []

    def test_search_results_sorted_by_post_count(self, populated_fts):
        """Test that results are sorted by post_count descending."""
        results = populated_fts.search("1girl", limit=10)
        # Verify results are sorted by post_count descending
        post_counts = [r["post_count"] for r in results]
        assert post_counts == sorted(post_counts, reverse=True)

    def test_search_limit(self, populated_fts):
        """Test search result limiting."""
        results = populated_fts.search("girl", limit=1)
        assert len(results) <= 1
class TestTagFTSIndexClear:
    """Tests for clearing the FTS index."""

    def test_clear_removes_all_data(self, temp_db_path, temp_csv_path):
        """A cleared index reports zero entries and is no longer ready."""
        index = TagFTSIndex(db_path=temp_db_path, csv_path=temp_csv_path)
        index.build_index()
        # Sanity check: the build actually indexed something before clearing.
        assert index.get_indexed_count() > 0
        index.clear()
        assert index.get_indexed_count() == 0
        assert index.is_ready() is False
class TestCategoryMappings:
    """Tests for category name mappings."""

    def test_category_names_complete(self):
        """Every expected category ID has an entry in CATEGORY_NAMES."""
        for category_id in (0, 1, 3, 4, 5, 7, 8, 10, 11, 12, 14, 15):
            assert category_id in CATEGORY_NAMES

    def test_category_name_to_ids_complete(self):
        """Every expected name maps to a non-empty list of category IDs."""
        for label in ("general", "artist", "copyright", "character", "meta", "species", "lore"):
            assert label in CATEGORY_NAME_TO_IDS
            id_list = CATEGORY_NAME_TO_IDS[label]
            assert isinstance(id_list, list)
            assert len(id_list) > 0

    def test_category_name_to_ids_includes_both_platforms(self):
        """Shared categories carry both the Danbooru and the e621 ID."""
        # "general": Danbooru uses 0, e621 uses 7.
        general_ids = CATEGORY_NAME_TO_IDS["general"]
        assert 0 in general_ids
        assert 7 in general_ids
        # "character": Danbooru uses 4, e621 uses 11.
        character_ids = CATEGORY_NAME_TO_IDS["character"]
        assert 4 in character_ids
        assert 11 in character_ids
class TestFTSQueryBuilding:
    """Tests for FTS query building."""

    @pytest.fixture
    def fts_instance(self, temp_db_path, temp_csv_path):
        """Create an FTS instance for testing."""
        return TagFTSIndex(db_path=temp_db_path, csv_path=temp_csv_path)

    def test_build_fts_query_simple(self, fts_instance):
        """Test FTS query building with simple query."""
        query = fts_instance._build_fts_query("test")
        # Each term is turned into a prefix match.
        assert query == "test*"

    def test_build_fts_query_multiple_words(self, fts_instance):
        """Test FTS query building with multiple words."""
        query = fts_instance._build_fts_query("test query")
        assert query == "test* query*"

    def test_build_fts_query_escapes_special_chars(self, fts_instance):
        """Test that special characters are escaped."""
        query = fts_instance._build_fts_query("test:query")
        # Colon should be replaced with space
        assert ":" not in query

    def test_build_fts_query_empty_returns_empty(self, fts_instance):
        """Test that empty query returns empty string."""
        query = fts_instance._build_fts_query("")
        assert query == ""

View File

@@ -0,0 +1,529 @@
"""Unit tests for the cache_paths module."""
import os
import shutil
import tempfile
from pathlib import Path
import pytest
from py.utils.cache_paths import (
CacheType,
cleanup_legacy_cache_files,
get_cache_base_dir,
get_cache_file_path,
get_legacy_cache_files_for_cleanup,
get_legacy_cache_paths,
resolve_cache_path_with_migration,
)
class TestCacheType:
    """Tests for the CacheType enum."""

    def test_enum_values(self):
        """Each enum member carries its expected string value."""
        expected_values = {
            CacheType.MODEL: "model",
            CacheType.RECIPE: "recipe",
            CacheType.RECIPE_FTS: "recipe_fts",
            CacheType.TAG_FTS: "tag_fts",
            CacheType.SYMLINK: "symlink",
        }
        for member, value in expected_values.items():
            assert member.value == value
class TestGetCacheBaseDir:
    """Tests for get_cache_base_dir function."""

    def test_returns_cache_subdirectory(self):
        # NOTE(review): no monkeypatch here, so this hits the real settings
        # directory of the environment running the tests — confirm intended.
        cache_dir = get_cache_base_dir(create=True)
        assert cache_dir.endswith("cache")
        assert os.path.isdir(cache_dir)

    def test_creates_directory_when_requested(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        cache_dir = get_cache_base_dir(create=True)
        # create=True must materialize <settings>/cache on disk.
        assert os.path.isdir(cache_dir)
        assert cache_dir == str(settings_dir / "cache")
class TestGetCacheFilePath:
    """Tests for get_cache_file_path function.

    Expected canonical layout: <settings>/cache/<type subdir>/<file>.
    """

    def test_model_cache_path(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        path = get_cache_file_path(CacheType.MODEL, "my_library", create_dir=True)
        expected = settings_dir / "cache" / "model" / "my_library.sqlite"
        assert path == str(expected)
        # create_dir=True must have created the parent directory.
        assert os.path.isdir(expected.parent)

    def test_recipe_cache_path(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        path = get_cache_file_path(CacheType.RECIPE, "default", create_dir=True)
        expected = settings_dir / "cache" / "recipe" / "default.sqlite"
        assert path == str(expected)

    def test_recipe_fts_path(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        # FTS caches take no library name; both live under cache/fts/.
        path = get_cache_file_path(CacheType.RECIPE_FTS, create_dir=True)
        expected = settings_dir / "cache" / "fts" / "recipe_fts.sqlite"
        assert path == str(expected)

    def test_tag_fts_path(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        path = get_cache_file_path(CacheType.TAG_FTS, create_dir=True)
        expected = settings_dir / "cache" / "fts" / "tag_fts.sqlite"
        assert path == str(expected)

    def test_symlink_path(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        # SYMLINK is the one JSON-backed cache type.
        path = get_cache_file_path(CacheType.SYMLINK, create_dir=True)
        expected = settings_dir / "cache" / "symlink" / "symlink_map.json"
        assert path == str(expected)

    def test_sanitizes_library_name(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        # Path separators and drive characters must not leak into filenames.
        path = get_cache_file_path(CacheType.MODEL, "my/bad:name", create_dir=True)
        assert "my_bad_name" in path

    def test_none_library_name_defaults_to_default(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        path = get_cache_file_path(CacheType.MODEL, None, create_dir=True)
        assert "default.sqlite" in path
class TestGetLegacyCachePaths:
    """Tests for get_legacy_cache_paths function."""

    def test_model_legacy_paths_for_default(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        paths = get_legacy_cache_paths(CacheType.MODEL, "default")
        # "default" has two historical locations: per-library dir and root file.
        assert len(paths) == 2
        assert str(settings_dir / "model_cache" / "default.sqlite") in paths
        assert str(settings_dir / "model_cache.sqlite") in paths

    def test_model_legacy_paths_for_named_library(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        # Non-default libraries only ever lived in the per-library directory.
        paths = get_legacy_cache_paths(CacheType.MODEL, "my_library")
        assert len(paths) == 1
        assert str(settings_dir / "model_cache" / "my_library.sqlite") in paths

    def test_recipe_legacy_paths(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        paths = get_legacy_cache_paths(CacheType.RECIPE, "default")
        assert len(paths) == 2
        assert str(settings_dir / "recipe_cache" / "default.sqlite") in paths
        assert str(settings_dir / "recipe_cache.sqlite") in paths

    def test_recipe_fts_legacy_path(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        paths = get_legacy_cache_paths(CacheType.RECIPE_FTS)
        assert len(paths) == 1
        assert str(settings_dir / "recipe_fts.sqlite") in paths

    def test_tag_fts_legacy_path(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        paths = get_legacy_cache_paths(CacheType.TAG_FTS)
        assert len(paths) == 1
        assert str(settings_dir / "tag_fts.sqlite") in paths

    def test_symlink_legacy_path(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        # Old symlink map lived directly under cache/, not cache/symlink/.
        paths = get_legacy_cache_paths(CacheType.SYMLINK)
        assert len(paths) == 1
        assert str(settings_dir / "cache" / "symlink_map.json") in paths
class TestResolveCachePathWithMigration:
    """Tests for resolve_cache_path_with_migration function."""

    def test_returns_env_override_when_set(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        # An explicit env override must short-circuit all path resolution.
        override_path = "/custom/path/cache.sqlite"
        path = resolve_cache_path_with_migration(
            CacheType.MODEL,
            library_name="default",
            env_override=override_path,
        )
        assert path == override_path

    def test_returns_canonical_path_when_exists(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        # Create the canonical path
        canonical = settings_dir / "cache" / "model" / "default.sqlite"
        canonical.parent.mkdir(parents=True)
        canonical.write_text("existing")
        path = resolve_cache_path_with_migration(CacheType.MODEL, "default")
        assert path == str(canonical)

    def test_migrates_from_legacy_root_level_cache(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        # Create legacy cache at root level
        legacy_path = settings_dir / "model_cache.sqlite"
        legacy_path.write_text("legacy data")
        path = resolve_cache_path_with_migration(CacheType.MODEL, "default")
        # Should return canonical path
        canonical = settings_dir / "cache" / "model" / "default.sqlite"
        assert path == str(canonical)
        # File should be copied to canonical location
        assert canonical.exists()
        assert canonical.read_text() == "legacy data"
        # Legacy file should be automatically cleaned up
        assert not legacy_path.exists()

    def test_migrates_from_legacy_per_library_cache(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        # Create legacy per-library cache
        legacy_dir = settings_dir / "model_cache"
        legacy_dir.mkdir()
        legacy_path = legacy_dir / "my_library.sqlite"
        legacy_path.write_text("legacy library data")
        path = resolve_cache_path_with_migration(CacheType.MODEL, "my_library")
        # Should return canonical path
        canonical = settings_dir / "cache" / "model" / "my_library.sqlite"
        assert path == str(canonical)
        assert canonical.exists()
        assert canonical.read_text() == "legacy library data"
        # Legacy file should be automatically cleaned up
        assert not legacy_path.exists()
        # Empty legacy directory should be cleaned up
        assert not legacy_dir.exists()

    def test_prefers_per_library_over_root_for_migration(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        # Create both legacy caches
        legacy_root = settings_dir / "model_cache.sqlite"
        legacy_root.write_text("root legacy")
        legacy_dir = settings_dir / "model_cache"
        legacy_dir.mkdir()
        legacy_lib = legacy_dir / "default.sqlite"
        legacy_lib.write_text("library legacy")
        path = resolve_cache_path_with_migration(CacheType.MODEL, "default")
        canonical = settings_dir / "cache" / "model" / "default.sqlite"
        assert path == str(canonical)
        # Should migrate from per-library path (first in legacy list)
        assert canonical.read_text() == "library legacy"

    def test_returns_canonical_path_when_no_legacy_exists(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        path = resolve_cache_path_with_migration(CacheType.MODEL, "new_library")
        canonical = settings_dir / "cache" / "model" / "new_library.sqlite"
        assert path == str(canonical)
        # Directory should be created
        assert canonical.parent.exists()
        # But file should not exist yet
        assert not canonical.exists()
class TestLegacyCacheCleanup:
    """Tests for legacy cache cleanup functions."""

    def test_get_legacy_cache_files_for_cleanup(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        # Create canonical and legacy files
        canonical = settings_dir / "cache" / "model" / "default.sqlite"
        canonical.parent.mkdir(parents=True)
        canonical.write_text("canonical")
        legacy = settings_dir / "model_cache.sqlite"
        legacy.write_text("legacy")
        # The legacy file is only a cleanup candidate because its canonical
        # counterpart already exists.
        files = get_legacy_cache_files_for_cleanup()
        assert str(legacy) in files

    def test_cleanup_legacy_cache_files_dry_run(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        # Create canonical and legacy files
        canonical = settings_dir / "cache" / "model" / "default.sqlite"
        canonical.parent.mkdir(parents=True)
        canonical.write_text("canonical")
        legacy = settings_dir / "model_cache.sqlite"
        legacy.write_text("legacy")
        removed = cleanup_legacy_cache_files(dry_run=True)
        assert str(legacy) in removed
        # File should still exist (dry run)
        assert legacy.exists()

    def test_cleanup_legacy_cache_files_actual(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        # Create canonical and legacy files
        canonical = settings_dir / "cache" / "model" / "default.sqlite"
        canonical.parent.mkdir(parents=True)
        canonical.write_text("canonical")
        legacy = settings_dir / "model_cache.sqlite"
        legacy.write_text("legacy")
        removed = cleanup_legacy_cache_files(dry_run=False)
        assert str(legacy) in removed
        # File should be deleted
        assert not legacy.exists()
class TestAutomaticCleanup:
    """Tests for automatic cleanup during migration."""

    def test_automatic_cleanup_on_migration(self, tmp_path, monkeypatch):
        """Test that legacy files are automatically cleaned up after migration."""
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        # Create a legacy cache file
        legacy_dir = settings_dir / "model_cache"
        legacy_dir.mkdir()
        legacy_file = legacy_dir / "default.sqlite"
        legacy_file.write_text("test data")
        # Verify legacy file exists
        assert legacy_file.exists()
        # Trigger migration (this should auto-cleanup)
        resolved_path = resolve_cache_path_with_migration(CacheType.MODEL, "default")
        # Verify canonical file exists
        canonical_path = settings_dir / "cache" / "model" / "default.sqlite"
        assert resolved_path == str(canonical_path)
        assert canonical_path.exists()
        assert canonical_path.read_text() == "test data"
        # Verify legacy file was cleaned up
        assert not legacy_file.exists()
        # Verify empty directory was cleaned up
        assert not legacy_dir.exists()

    def test_automatic_cleanup_with_verification(self, tmp_path, monkeypatch):
        """Test that cleanup verifies file integrity before deletion."""
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        # Create legacy cache
        legacy_dir = settings_dir / "recipe_cache"
        legacy_dir.mkdir()
        legacy_file = legacy_dir / "my_library.sqlite"
        legacy_file.write_text("data")
        # Trigger migration (return value covered by the other tests).
        resolve_cache_path_with_migration(CacheType.RECIPE, "my_library")
        canonical_path = settings_dir / "cache" / "recipe" / "my_library.sqlite"
        # The data lands at the canonical location...
        assert canonical_path.exists()
        # ...and the legacy copy is removed by the automatic cleanup.
        assert not legacy_file.exists()
        # File content should match (integrity check)
        assert canonical_path.read_text() == "data"
        # Directory should be cleaned up
        assert not legacy_dir.exists()

    def test_automatic_cleanup_multiple_cache_types(self, tmp_path, monkeypatch):
        """Test automatic cleanup for different cache types."""
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        # Test RECIPE_FTS migration
        legacy_fts = settings_dir / "recipe_fts.sqlite"
        legacy_fts.write_text("fts data")
        resolve_cache_path_with_migration(CacheType.RECIPE_FTS)
        canonical_fts = settings_dir / "cache" / "fts" / "recipe_fts.sqlite"
        assert canonical_fts.exists()
        assert not legacy_fts.exists()
        # Test TAG_FTS migration
        legacy_tag = settings_dir / "tag_fts.sqlite"
        legacy_tag.write_text("tag data")
        resolve_cache_path_with_migration(CacheType.TAG_FTS)
        canonical_tag = settings_dir / "cache" / "fts" / "tag_fts.sqlite"
        assert canonical_tag.exists()
        assert not legacy_tag.exists()